123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- import logging
- import cmath
- import math
- import os
- import re
- from collections import defaultdict, UserDict
- from typing import List
- from scipy.interpolate import interp1d
- from .RFTools import Datapoint
# Pattern for one data row of a calibration file: an integer frequency
# followed by real/imaginary pairs for short, open and load, and optionally
# for through and isolation as well.  The named groups are "<name>r" and
# "<name>i" for each calibration step.
#
# The optional through/isolation part is separated by one *or more*
# whitespace characters (like every other column), and the numeric character
# class accepts "+" so that exponents such as "1e+16" can be read back.
RXP_CAL_LINE = re.compile(r"""^\s*
    (?P<freq>\d+) \s+
    (?P<shortr>[-+0-9Ee.]+) \s+ (?P<shorti>[-+0-9Ee.]+) \s+
    (?P<openr>[-+0-9Ee.]+) \s+ (?P<openi>[-+0-9Ee.]+) \s+
    (?P<loadr>[-+0-9Ee.]+) \s+ (?P<loadi>[-+0-9Ee.]+)(?: \s+
    (?P<throughr>[-+0-9Ee.]+) \s+ (?P<throughi>[-+0-9Ee.]+) \s+
    (?P<isolationr>[-+0-9Ee.]+) \s+ (?P<isolationi>[-+0-9Ee.]+)
    )?
    """, re.VERBOSE)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def correct_delay(d: Datapoint, delay: float, reflect: bool = False):
    """Return a copy of *d* with its phase rotated to compensate a delay.

    The complex value ``d.z`` is multiplied by
    ``exp(-j * 2 * pi * d.freq * delay * passes)`` where ``passes`` is 2
    when *reflect* is true and 1 otherwise.

    Args:
        d: Measured point; its ``freq`` and ``z`` attributes are read.
        delay: Delay to remove (multiplied directly with ``d.freq``).
        reflect: When true the delay is counted twice.

    Returns:
        A new Datapoint at the same frequency holding the rotated value.
    """
    passes = 2 if reflect else 1
    # Same factor order as the product is always evaluated left to right.
    phase = complex(0, 1) * 2 * math.pi * d.freq * delay * -1 * passes
    rotated = d.z * cmath.exp(phase)
    return Datapoint(d.freq, rotated.real, rotated.imag)
class CalData(UserDict):
    """Calibration record for a single frequency.

    A dictionary pre-populated with a fixed set of keys:

    * ``short``, ``open``, ``load``, ``through``, ``isolation`` -- the
      measured points for each calibration step, ``None`` until set;
    * ``freq`` -- the frequency this record belongs to;
    * ``e00``, ``e11``, ``delta_e``, ``e30``, ``e10e32`` -- computed
      correction terms, initialised to zero.
    """

    def __init__(self):
        data = {
            "short": None,
            "open": None,
            "load": None,
            "through": None,
            "isolation": None,

            "freq": 0,

            "e00": 0.0,
            "e11": 0.0,
            "delta_e": 0.0,

            "e30": 0.0,
            "e10e32": 0.0,
        }
        super().__init__(data)

    def __str__(self):
        """Format the record as one whitespace-separated text row.

        The row holds the frequency followed by the ``re``/``im`` attributes
        of short, open and load.  The through and isolation pairs are
        appended only when *both* are present, so a record with a through
        measurement but no isolation measurement is written as a
        three-step row instead of failing.

        Raises:
            AttributeError: if short, open or load has not been set.
        """
        d = self.data
        names = ["short", "open", "load"]
        if d["through"] is not None and d["isolation"] is not None:
            names += ["through", "isolation"]
        return f'{d["freq"]}' + "".join(
            f' {d[name].re} {d[name].im}' for name in names)
class CalDataSet:
    """Collection of CalData records keyed by frequency."""

    def __init__(self):
        # Indexing an unknown frequency creates an empty CalData record.
        self.data = defaultdict(CalData)

    def insert(self, name: str, dp: Datapoint):
        """Store *dp* under key *name* in the record for ``dp.freq``.

        The name is validated before the dataset is touched, so a rejected
        insert does not leave an empty record behind.

        Raises:
            KeyError: if *name* is not a key of a CalData record.
        """
        # dict.get() does not trigger the defaultdict factory.
        caldata = self.data.get(dp.freq)
        if caldata is None:
            caldata = CalData()
        if name not in caldata:
            raise KeyError(name)
        caldata["freq"] = dp.freq
        caldata[name] = dp
        self.data[dp.freq] = caldata

    def frequencies(self) -> List[int]:
        """Return all stored frequencies in ascending order."""
        return sorted(self.data.keys())

    def get(self, freq: int) -> CalData:
        """Return the record for *freq*, creating an empty one if absent."""
        return self.data[freq]

    def items(self):
        """Yield ``(frequency, record)`` pairs in storage order."""
        yield from self.data.items()

    def values(self):
        """Yield the records ordered by ascending frequency."""
        for freq in self.frequencies():
            yield self.get(freq)

    def size_of(self, name: str) -> int:
        """Return how many records have a non-None value for *name*."""
        return sum(1 for v in self.data.values() if v[name] is not None)

    def _complete(self, names) -> bool:
        """Return True if the set is non-empty and every record has *names*."""
        if not self.data:
            return False
        return all(val[name] is not None
                   for val in self.data.values()
                   for name in names)

    def complete1port(self) -> bool:
        """Return True if short, open and load exist for every frequency.

        An empty dataset is never complete.
        """
        return self._complete(("short", "open", "load"))

    def complete2port(self) -> bool:
        """Return True if all five calibration steps exist for every frequency.

        An empty dataset is never complete.
        """
        return self._complete(
            ("short", "open", "load", "through", "isolation"))
class Calibration:
    """Holds calibration measurements and derives correction terms from them.

    Measurements for the steps listed in ``CAL_NAMES`` are collected in a
    CalDataSet.  ``calc_corrections`` turns them into per-frequency error
    terms and builds interpolators, which ``correct11`` and ``correct21``
    then apply to measured points.  ``save`` and ``load`` exchange the raw
    measurements with a text file.
    """

    CAL_NAMES = ("short", "open", "load", "through", "isolation",)
    # Reflection coefficients used when the "ideal" flags are set.
    IDEAL_SHORT = complex(-1, 0)
    IDEAL_OPEN = complex(1, 0)
    IDEAL_LOAD = complex(0, 0)

    def __init__(self):
        self.notes = []
        self.dataset = CalDataSet()
        self.interp = {}

        self.useIdealShort = True
        # Polynomial coefficients (in frequency) used by gamma_short.
        # NOTE(review): the literal 10E-12 equals 1e-11, not 1e-12 --
        # confirm the intended scaling of these defaults.
        self.shortL0 = 5.7 * 10E-12
        self.shortL1 = -8960 * 10E-24
        self.shortL2 = -1100 * 10E-33
        self.shortL3 = -41200 * 10E-42
        # NOTE(review): gamma_short multiplies this directly with the
        # frequency; presumably callers rescale it -- confirm the unit.
        self.shortLength = -34.2

        self.useIdealOpen = True
        # Polynomial coefficients (in frequency) used by gamma_open.
        self.openC0 = 2.1 * 10E-14
        self.openC1 = 5.67 * 10E-23
        self.openC2 = -2.39 * 10E-31
        self.openC3 = 2.0 * 10E-40
        self.openLength = 0

        self.useIdealLoad = True
        self.loadR = 25
        self.loadL = 0
        # Not read by any method visible in this class.
        self.loadC = 0
        self.loadLength = 0

        self.useIdealThrough = True
        self.throughLength = 0

        self.isCalculated = False

        self.source = "Manual"

    def insert(self, name: str, data: List[Datapoint]):
        """Insert every point of *data* as calibration step *name*."""
        for dp in data:
            self.dataset.insert(name, dp)

    def size(self) -> int:
        """Return the number of frequencies in the dataset."""
        return len(self.dataset.frequencies())

    def data_size(self, name) -> int:
        """Return how many frequencies have a measurement for *name*."""
        return self.dataset.size_of(name)

    def isValid1Port(self) -> bool:
        """Return True if short, open and load exist for every frequency."""
        return self.dataset.complete1port()

    def isValid2Port(self) -> bool:
        """Return True if all five steps exist for every frequency."""
        return self.dataset.complete2port()

    def calc_corrections(self):
        """Compute the error terms for every frequency and build interpolators.

        For each record the terms ``e00``, ``e11`` and ``delta_e`` are
        derived from the measured short/open/load values and the reference
        reflection coefficients returned by the ``gamma_*`` methods.  When
        the dataset is also a complete 2-port set, ``e30`` and ``e10e32``
        are derived from the isolation and through measurements.

        Raises:
            ValueError: if short, open and load are not all available, or
                if the measurements make the common denominator zero.
        """
        if not self.isValid1Port():
            logger.warning(
                "Tried to calibrate from insufficient data.")
            raise ValueError(
                "All of short, open and load calibration steps "
                "must be completed for calibration to be applied.")
        logger.debug("Calculating calibration for %d points.", self.size())
        # The dataset does not change while looping, so check this once.
        two_port = self.isValid2Port()
        for freq, caldata in self.dataset.items():
            g1 = self.gamma_short(freq)
            g2 = self.gamma_open(freq)
            g3 = self.gamma_load(freq)

            gm1 = caldata["short"].z
            gm2 = caldata["open"].z
            gm3 = caldata["load"].z

            try:
                denominator = (g1 * (g2 - g3) * gm1 +
                               g2 * g3 * gm2 - g2 * g3 * gm3 -
                               (g2 * gm2 - g3 * gm3) * g1)
                caldata["e00"] = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 -
                                    (g2 * g3 * gm2 - g2 * g3 * gm3 -
                                     (g3 * gm2 - g2 * gm3) * g1) * gm1
                                    ) / denominator
                caldata["e11"] = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) +
                                  g3 * gm2 - g2 * gm3) / denominator
                caldata["delta_e"] = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 *
                                         gm3) * gm1 + (g2 * gm3 - g3 * gm3) *
                                        gm2) / denominator
            except ZeroDivisionError as exc:
                self.isCalculated = False
                logger.error(
                    "Division error - did you use the same measurement"
                    " for two of short, open and load?")
                raise ValueError(
                    f"Two of short, open and load returned the same"
                    f" values at frequency {freq}Hz.") from exc

            if two_port:
                caldata["e30"] = caldata["isolation"].z

                gt = self.gamma_through(freq)
                caldata["e10e32"] = (caldata["through"].z / gt - caldata["e30"]
                                     ) * (1 - caldata["e11"]**2)

        self.gen_interpolation()
        self.isCalculated = True
        logger.debug("Calibration correctly calculated.")

    def gamma_short(self, freq: int) -> complex:
        """Return the reference reflection coefficient of the short.

        Returns ``IDEAL_SHORT`` when ``useIdealShort`` is set.  Otherwise an
        impedance ``j*2*pi*freq*(L0 + L1*f + L2*f**2 + L3*f**3)`` is
        normalised to 50, converted to a reflection coefficient and rotated
        by ``exp(-j*2*pi*2*freq*shortLength)``.
        """
        g = Calibration.IDEAL_SHORT
        if not self.useIdealShort:
            logger.debug("Using short calibration set values.")
            Zsp = complex(0, 1) * 2 * math.pi * freq * (
                self.shortL0 + self.shortL1 * freq +
                self.shortL2 * freq**2 + self.shortL3 * freq**3)
            g = (Zsp / 50 - 1) / (Zsp / 50 + 1) * cmath.exp(
                complex(0, 1) * 2 * math.pi * 2 * freq *
                self.shortLength * -1)
        return g

    def gamma_open(self, freq: int) -> complex:
        """Return the reference reflection coefficient of the open.

        Returns ``IDEAL_OPEN`` when ``useIdealOpen`` is set, and also when
        the polynomial term ``2*pi*freq*(C0 + C1*f + C2*f**2 + C3*f**3)``
        evaluates to zero (for example at ``freq == 0``).  Otherwise the
        impedance ``-j / term`` is normalised to 50, converted to a
        reflection coefficient and rotated by
        ``exp(-j*2*pi*2*freq*openLength)``.
        """
        g = Calibration.IDEAL_OPEN
        if not self.useIdealOpen:
            logger.debug("Using open calibration set values.")
            divisor = (2 * math.pi * freq * (
                self.openC0 + self.openC1 * freq +
                self.openC2 * freq**2 + self.openC3 * freq**3))
            # A zero divisor would make the impedance undefined; keep the
            # ideal value in that case.
            if divisor != 0:
                Zop = complex(0, -1) / divisor
                g = ((Zop / 50 - 1) / (Zop / 50 + 1)) * cmath.exp(
                    complex(0, 1) * 2 * math.pi *
                    2 * freq * self.openLength * -1)
        return g

    def gamma_load(self, freq: int) -> complex:
        """Return the reference reflection coefficient of the load.

        Returns ``IDEAL_LOAD`` when ``useIdealLoad`` is set.  Otherwise the
        impedance ``loadR + j*2*pi*freq*loadL`` is normalised to 50,
        converted to a reflection coefficient and rotated by
        ``exp(-j*2*pi*2*freq*loadLength)``.
        """
        g = Calibration.IDEAL_LOAD
        if not self.useIdealLoad:
            logger.debug("Using load calibration set values.")
            Zl = self.loadR + (complex(0, 1) * 2 *
                               math.pi * freq * self.loadL)
            g = (Zl / 50 - 1) / (Zl / 50 + 1) * cmath.exp(
                complex(0, 1) * 2 * math.pi *
                2 * freq * self.loadLength * -1)
        return g

    def gamma_through(self, freq: int) -> complex:
        """Return the reference transmission factor of the through.

        Returns ``1 + 0j`` when ``useIdealThrough`` is set, otherwise
        ``exp(-j*2*pi*throughLength*freq)``.
        """
        g = complex(1, 0)
        if not self.useIdealThrough:
            logger.debug("Using through calibration set values.")
            g = cmath.exp(complex(0, 1) * 2 * math.pi *
                          self.throughLength * freq * -1)
        return g

    def gen_interpolation(self):
        """Build one interpolator per error term over the stored frequencies.

        Each interpolator is piecewise linear ("slinear") and, outside the
        stored frequency range, returns the first or last stored value
        instead of raising.  ``self.interp`` is replaced only after all
        interpolators have been built.
        """
        records = list(self.dataset.values())
        freq = [caldata["freq"] for caldata in records]

        interp = {}
        for key in ("e00", "e11", "delta_e", "e30", "e10e32"):
            values = [caldata[key] for caldata in records]
            interp[key] = interp1d(freq, values,
                                   kind="slinear", bounds_error=False,
                                   fill_value=(values[0], values[-1]))
        self.interp = interp

    def correct11(self, dp: Datapoint):
        """Return *dp* corrected with the e00, e11 and delta_e terms.

        Computes ``(z - e00) / (z * e11 - delta_e)`` with the terms
        interpolated at ``dp.freq``.
        """
        i = self.interp
        s11 = (dp.z - i["e00"](dp.freq)) / (
            (dp.z * i["e11"](dp.freq)) - i["delta_e"](dp.freq))
        return Datapoint(dp.freq, s11.real, s11.imag)

    def correct21(self, dp: Datapoint):
        """Return *dp* corrected with the e30 and e10e32 terms.

        Computes ``(z - e30) / e10e32`` with the terms interpolated at
        ``dp.freq``.
        """
        i = self.interp
        s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
        return Datapoint(dp.freq, s21.real, s21.imag)

    def save(self, filename: str):
        """Write the notes and raw calibration measurements to *filename*.

        The file starts with a comment line, then one ``! note`` line per
        note, the column header, and one data row per frequency in
        ascending order.

        Raises:
            ValueError: if the dataset is not a complete 1-port calibration.
        """
        if not self.isValid1Port():
            raise ValueError("Not a valid 1-Port calibration")
        with open(filename, "w") as calfile:
            calfile.write("# Calibration data for NanoVNA-Saver\n")
            for note in self.notes:
                calfile.write(f"! {note}\n")
            calfile.write(
                "# Hz ShortR ShortI OpenR OpenI LoadR LoadI"
                " ThroughR ThroughI IsolationR IsolationI\n")
            for freq in self.dataset.frequencies():
                calfile.write(f"{self.dataset.get(freq)}\n")

    def load(self, filename):
        """Replace the dataset and notes with the contents of *filename*.

        Lines starting with ``!`` are stored as notes (without their first
        two characters).  Lines starting with ``#`` are comments; the exact
        column header must be seen before data rows are accepted.  Rows
        read before the header, and rows that do not match
        ``RXP_CAL_LINE``, are logged as warnings and skipped.
        """
        self.source = os.path.basename(filename)
        self.dataset = CalDataSet()
        self.notes = []

        parsed_header = False
        with open(filename) as calfile:
            # Count from 1 so reported line numbers match a text editor.
            for i, line in enumerate(calfile, 1):
                line = line.strip()
                if line.startswith("!"):
                    note = line[2:]
                    self.notes.append(note)
                    continue
                if line.startswith("#"):
                    if not parsed_header and line == (
                            "# Hz ShortR ShortI OpenR OpenI LoadR LoadI"
                            " ThroughR ThroughI IsolationR IsolationI"):
                        parsed_header = True
                    continue
                if not parsed_header:
                    logger.warning(
                        "Warning: Read line without having read header: %s",
                        line)
                    continue

                m = RXP_CAL_LINE.search(line)
                if not m:
                    logger.warning("Illegal data in cal file. Line %i", i)
                    # Nothing to extract from a non-matching row.
                    continue
                cal = m.groupdict()

                # Rows without through/isolation carry only the first
                # three calibration steps.
                nr_cals = 5 if cal["throughr"] else 3
                for name in Calibration.CAL_NAMES[:nr_cals]:
                    self.dataset.insert(
                        name,
                        Datapoint(int(cal["freq"]),
                                  float(cal[f"{name}r"]),
                                  float(cal[f"{name}i"])))
|