|
@@ -0,0 +1,446 @@
|
|
|
+# NanoVNASaver
|
|
|
+#
|
|
|
+# A python program to view and export Touchstone data from a NanoVNA
|
|
|
+# Copyright (C) 2019, 2020 Rune B. Broberg
|
|
|
+# Copyright (C) 2020 NanoVNA-Saver Authors
|
|
|
+#
|
|
|
+# This program is free software: you can redistribute it and/or modify
|
|
|
+# it under the terms of the GNU General Public License as published by
|
|
|
+# the Free Software Foundation, either version 3 of the License, or
|
|
|
+# (at your option) any later version.
|
|
|
+#
|
|
|
+# This program is distributed in the hope that it will be useful,
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
+# GNU General Public License for more details.
|
|
|
+#
|
|
|
+# You should have received a copy of the GNU General Public License
|
|
|
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
+import logging
|
|
|
+import platform
|
|
|
+import time
|
|
|
+from struct import pack, unpack_from
|
|
|
+
|
|
|
+from time import sleep
|
|
|
+from typing import List
|
|
|
+from Serial import Interface
|
|
|
+from Calibration import Calibration
|
|
|
+from RFTools import Datapoint
|
|
|
+
|
|
|
+__all__ = ['NanoVNA_V2_H4']
|
|
|
+
|
|
|
+if platform.system() != "Windows":
|
|
|
+ import tty
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+_CMD_NOP = 0x00
|
|
|
+_CMD_INDICATE = 0x0D
|
|
|
+_CMD_READ = 0x10
|
|
|
+_CMD_READ2 = 0x11
|
|
|
+_CMD_READ4 = 0x12
|
|
|
+_CMD_READFIFO = 0x18
|
|
|
+_CMD_WRITE = 0x20
|
|
|
+_CMD_WRITE2 = 0x21
|
|
|
+_CMD_WRITE4 = 0x22
|
|
|
+_CMD_WRITE8 = 0x23
|
|
|
+_CMD_WRITEFIFO = 0x28
|
|
|
+
|
|
|
+WRITE_SLEEP = 0.05
|
|
|
+
|
|
|
+_ADF4350_TXPOWER_DESC_MAP = {
|
|
|
+ 0: "9dattenuation",
|
|
|
+ 1: "6dB attenuation",
|
|
|
+ 2: "3dB attenuation",
|
|
|
+ 3: "Maximum",
|
|
|
+}
|
|
|
+_ADF4350_TXPOWER_DESC_REV_MAP = {
|
|
|
+ value: key for key, value in _ADF4350_TXPOWER_DESC_MAP.items()
|
|
|
+}
|
|
|
+
|
|
|
+# _NANOVNA_V2_CMDS = {
|
|
|
+# 'NOP' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'INDICATE' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'READ' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'READ2' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'READ4' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'READFIFO' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'NOP' : {'val': 0x, 'len':, 'resp': },
|
|
|
+# 'NOP' : {'val': 0x, 'len':, 'resp': },
|
|
|
+
|
|
|
+# }
|
|
|
+
|
|
|
+_NANOVNA_V2_REGS = {
|
|
|
+ "sweepStartHz": {"addr": 0x00, "len": 8},
|
|
|
+ "sweepStepHz": {"addr": 0x10, "len": 8},
|
|
|
+ "sweepPoints": {"addr": 0x20, "len": 2},
|
|
|
+ "valuesPerFrequency": {"addr": 0x22, "len": 2},
|
|
|
+ "rawSamplesMode": {"addr": 0x26, "len": 1},
|
|
|
+ "valuesFIFO": {"addr": 0x30, "len": 4},
|
|
|
+ "deviceVariant": {"addr": 0xF0, "len": 1},
|
|
|
+ "protocolVersion": {"addr": 0xF1, "len": 1},
|
|
|
+ "hardwareRevision": {"addr": 0xF2, "len": 1},
|
|
|
+ "firmwareMajor": {"addr": 0xF3, "len": 1},
|
|
|
+ "firmwareMinor": {"addr": 0xF4, "len": 1},
|
|
|
+}
|
|
|
+
|
|
|
+"""
|
|
|
+NanoVNA FIFO Format (32 bytes)
|
|
|
+
|
|
|
+fwd0Re (uint32)
|
|
|
+fw0Im (uint32)
|
|
|
+rev0Re (uint32)
|
|
|
+rev0Re (uint32)
|
|
|
+rev1Re (uint32)
|
|
|
+rev1Im (uint32)
|
|
|
+freqIndex (uint16)
|
|
|
+reserved (6 bytes)
|
|
|
+"""
|
|
|
+_NANOVAN_V2_FIFO_FORMAT = "<iiiiiiHxxxxxx"
|
|
|
+
|
|
|
+
|
|
|
+class NanoVNA_V2_H4:
|
|
|
+ name = "NanoVNA V2 Plus4"
|
|
|
+
|
|
|
+ def __init__(self, iface: Interface = None):
|
|
|
+ self.iface = iface
|
|
|
+ self.iface.open()
|
|
|
+
|
|
|
+ if platform.system() != "Windows":
|
|
|
+ tty.setraw(self.iface.fd)
|
|
|
+
|
|
|
+ # reset protocol to known state
|
|
|
+ with self.iface.lock:
|
|
|
+ self.iface.write(pack("<Q", 0))
|
|
|
+ sleep(WRITE_SLEEP)
|
|
|
+ # # firmware major version of 0xff indicates dfu mode
|
|
|
+ # if self.version.data["major"] == 0xff:
|
|
|
+ # raise IOError('Device is in DFU mode')
|
|
|
+
|
|
|
+ # Write Parameters
|
|
|
+ self.write_sleep = 0.05
|
|
|
+
|
|
|
+ # Device Ranges
|
|
|
+ self.valid_freq = range(
|
|
|
+ int(50e3), int(4.4e9 + 1)
|
|
|
+ ) # Frequency Range: 50kHz - 4.4GHz, 1Hz Resolution
|
|
|
+ self.valid_power = None # Not Implemented / Fixed
|
|
|
+ self.valid_bw = 800 # Fixed
|
|
|
+ self.valid_points = range(1, 10000) # TODO: Increase to actual limit
|
|
|
+
|
|
|
+ # Device Defaults
|
|
|
+ self.freq_start = 140e6
|
|
|
+ self.freq_step = None
|
|
|
+ self.freq_end = 4.4e9
|
|
|
+ self.bw = 800
|
|
|
+ self.points = 100
|
|
|
+
|
|
|
+ # Device Memory
|
|
|
+ self._fwd = None
|
|
|
+ self._refl = None
|
|
|
+ self._thru = None
|
|
|
+ self._f = None
|
|
|
+ self._s11 = None
|
|
|
+ self._s22 = None
|
|
|
+ self._cal = Calibration()
|
|
|
+ self._cal_valid = False
|
|
|
+
|
|
|
+ # Load Defaults
|
|
|
+ self.set_sweep(self.freq_start, self.freq_end, self.points)
|
|
|
+
|
|
|
+ def __del__(self):
|
|
|
+ self.iface.close()
|
|
|
+
|
|
|
+ def connect(self) -> bool:
|
|
|
+ self.iface.open()
|
|
|
+ return self.iface.isOpen()
|
|
|
+
|
|
|
+ def disconnect(self) -> bool:
|
|
|
+ self.iface.close()
|
|
|
+ return not self.iface.isOpen()
|
|
|
+
|
|
|
+ def reconnect(self) -> bool:
|
|
|
+ self.iface.close()
|
|
|
+ sleep(self.wait)
|
|
|
+ self.iface.open()
|
|
|
+ return self.iface.isOpen()
|
|
|
+
|
|
|
+ def _read_register(self, reg_name: str):
|
|
|
+ if reg_name not in _NANOVNA_V2_REGS:
|
|
|
+ raise ValueError(
|
|
|
+ f"must be name of nanoVNA v2 register: {_NANOVNA_V2_REGS.keys()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ size = _NANOVNA_V2_REGS[reg_name]["len"]
|
|
|
+ addr = _NANOVNA_V2_REGS[reg_name]["addr"]
|
|
|
+
|
|
|
+ if size == 1:
|
|
|
+ packet = pack("<BB", _CMD_READ, addr)
|
|
|
+ elif size == 2:
|
|
|
+ packet = pack("<BB", _CMD_READ2, addr)
|
|
|
+ elif size == 4:
|
|
|
+ packet = pack("<BB", _CMD_READ4, addr)
|
|
|
+ elif size == 8:
|
|
|
+ packet = pack("<BBBB", _CMD_READ4, addr, _CMD_READ4, addr + 4)
|
|
|
+
|
|
|
+ self.iface.flush()
|
|
|
+ self.iface.write(packet)
|
|
|
+ resp = self.iface.read(size)
|
|
|
+
|
|
|
+ if len(resp) != size:
|
|
|
+ return None
|
|
|
+ return int.from_bytes(resp, byteorder="little")
|
|
|
+
|
|
|
+ def _write_register(self, reg_name, value):
|
|
|
+ if reg_name not in _NANOVNA_V2_REGS:
|
|
|
+ raise ValueError("must be name of nanoVNA v2 register")
|
|
|
+
|
|
|
+ size = _NANOVNA_V2_REGS[reg_name]["len"]
|
|
|
+ addr = _NANOVNA_V2_REGS[reg_name]["addr"]
|
|
|
+
|
|
|
+ if size == 1:
|
|
|
+ packet = pack("<BBB", _CMD_WRITE, addr, value)
|
|
|
+ elif size == 2:
|
|
|
+ packet = pack("<BBH", _CMD_WRITE2, addr, value)
|
|
|
+ elif size == 4:
|
|
|
+ packet = pack("<BBI", _CMD_WRITE4, addr, value)
|
|
|
+ elif size == 8:
|
|
|
+ packet = pack("<BBQ", _CMD_WRITE8, addr, value)
|
|
|
+
|
|
|
+ self.iface.write(packet)
|
|
|
+
|
|
|
+ def _read_fifo(self, points: int):
|
|
|
+ # clear something
|
|
|
+ self.iface.write(pack("<Q", 0))
|
|
|
+
|
|
|
+ # clear FIFO
|
|
|
+ self._write_register("valuesFIFO", 0x00)
|
|
|
+
|
|
|
+ points_remaining = points
|
|
|
+ fifo_bytes = bytes()
|
|
|
+
|
|
|
+ while points_remaining > 0:
|
|
|
+
|
|
|
+ # calculate the number of points to read (255) at a time
|
|
|
+ points_to_read = min(255, points_remaining)
|
|
|
+ bytes_to_read = points_to_read * 32
|
|
|
+
|
|
|
+ # TODO: Fix
|
|
|
+ self.iface.write(pack("<BBB", _CMD_READFIFO, 0x30, points_to_read))
|
|
|
+ timeout = self.iface.timeout
|
|
|
+ self.iface.timeout = points_to_read * 0.032 + 0.1
|
|
|
+ resp = self.iface.read(bytes_to_read)
|
|
|
+
|
|
|
+ if len(resp) != bytes_to_read:
|
|
|
+ self.iface.timeout = timeout
|
|
|
+ return None
|
|
|
+
|
|
|
+ fifo_bytes = fifo_bytes + resp
|
|
|
+ points_remaining = points_remaining - points_to_read
|
|
|
+
|
|
|
+ # restore timeout
|
|
|
+ self.iface.timeout = timeout
|
|
|
+ return fifo_bytes
|
|
|
+
|
|
|
+ def read_registers(self):
|
|
|
+ i = dict()
|
|
|
+ for reg in _NANOVNA_V2_REGS.keys():
|
|
|
+ if reg == "valuesFIFO": # skip
|
|
|
+ continue
|
|
|
+ i[reg] = self._read_register(reg)
|
|
|
+ return i
|
|
|
+
|
|
|
+ def set_sweep(
|
|
|
+ self, freq_start: float, freq_end: float, points: float = 101, avg: int = 1
|
|
|
+ ):
|
|
|
+
|
|
|
+ # input checking
|
|
|
+ if int(freq_start) not in self.valid_freq:
|
|
|
+ raise ValueError("outside of operating frequency range")
|
|
|
+ if int(freq_end) not in self.valid_freq:
|
|
|
+ raise ValueError("outside of operating frequency range")
|
|
|
+ if freq_end <= freq_start:
|
|
|
+ raise ValueError("start frequency must be < end frequency")
|
|
|
+ if int(points) not in self.valid_points:
|
|
|
+ raise ValueError("invalid number of data points")
|
|
|
+
|
|
|
+ # store and calc registers
|
|
|
+ self.points = int(points)
|
|
|
+ self.freq_start = int(freq_start)
|
|
|
+ self.freq_step = int((int(freq_end) - freq_start) / (self.points - 1))
|
|
|
+ self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
|
|
|
+ self.avg = avg
|
|
|
+
|
|
|
+ # write registers
|
|
|
+ self._write_register("sweepStartHz", self.freq_start)
|
|
|
+ self._write_register("sweepStepHz", self.freq_step)
|
|
|
+ self._write_register("sweepPoints", self.points)
|
|
|
+ self._write_register("valuesPerFrequency", self.avg)
|
|
|
+
|
|
|
+ # allocate memory for sweep data
|
|
|
+ self._fwd = [complex()] * self.points
|
|
|
+ self._refl = [complex()] * self.points
|
|
|
+ self._thru = [complex()] * self.points
|
|
|
+ self._freq_index = [int()] * self.points
|
|
|
+ self._f = [int()] * self.points
|
|
|
+ self._s11 = [complex()] * self.points
|
|
|
+ self._s21 = [complex()] * self.points
|
|
|
+
|
|
|
+ def get_sweep(self) -> List:
|
|
|
+ reg = self.read_registers()
|
|
|
+ self.freq_start = reg["sweepStartHz"]
|
|
|
+ self.freq_step = reg["sweepStepHz"]
|
|
|
+ self.points = reg["sweepPoints"]
|
|
|
+ self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
|
|
|
+ self.avg = reg["valuesPerFrequency"]
|
|
|
+ return (self.freq_start, self.freq_end, self.points, self.avg)
|
|
|
+
|
|
|
+ def wizard_cal(self, port1_only:bool=False):
|
|
|
+ # wipe old cal because I'm scared
|
|
|
+ self._cal = Calibration()
|
|
|
+
|
|
|
+ # short
|
|
|
+ input("connect Short to port 1...")
|
|
|
+ fs, s11_point, _ = self.measure(skip_calibration=True)
|
|
|
+ s11_dp = []
|
|
|
+ for (f, s11_point) in zip(fs, s11_point):
|
|
|
+ s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
|
|
|
+ self._cal.insert("short", s11_dp)
|
|
|
+
|
|
|
+ # open
|
|
|
+ input("connect Open to port 1...")
|
|
|
+ fs, s11, _ = self.measure(skip_calibration=True)
|
|
|
+ s11_dp = []
|
|
|
+ for (f, s11_point) in zip(fs, s11):
|
|
|
+ s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
|
|
|
+ self._cal.insert("open", s11_dp)
|
|
|
+
|
|
|
+ # load
|
|
|
+ input("connect Load to port 1...")
|
|
|
+ fs, s11, _ = self.measure(skip_calibration=True)
|
|
|
+ s11_dp = []
|
|
|
+ for (f, s11_point) in zip(fs, s11):
|
|
|
+ s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
|
|
|
+ self._cal.insert("load", s11_dp)
|
|
|
+
|
|
|
+ # through
|
|
|
+ input("connect port 1 to port 2...")
|
|
|
+ fs, _, s21 = self.measure(skip_calibration=True)
|
|
|
+ s21_dp = []
|
|
|
+ for (f, s21_point) in zip(fs, s21):
|
|
|
+ s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
|
|
|
+ self._cal.insert("through", s21_dp)
|
|
|
+
|
|
|
+ # isolation
|
|
|
+ input("connect load to both ports...")
|
|
|
+ fs, _, s21 = self.measure(skip_calibration=True)
|
|
|
+ s21_dp = []
|
|
|
+ for (f, s21_point) in zip(fs, s21):
|
|
|
+ s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
|
|
|
+ self._cal.insert("isolation", s21_dp)
|
|
|
+
|
|
|
+ # calculate corrections
|
|
|
+ self._cal.calc_corrections()
|
|
|
+
|
|
|
+ if self._cal.isValid1Port():
|
|
|
+ self._cal_valid = True
|
|
|
+ else:
|
|
|
+ self._cal_valid = False
|
|
|
+
|
|
|
+ # Store Notes
|
|
|
+ notes = dict()
|
|
|
+ notes['Date'] = time.ctime()
|
|
|
+ start, stop, points, avg = self.get_sweep()
|
|
|
+ notes['Sweep Start'] = start
|
|
|
+ notes['Sweep Stop'] = stop
|
|
|
+ notes['Sweep Points'] = points
|
|
|
+ notes['Point Average'] = avg
|
|
|
+ notes['Device Name'] = self.name
|
|
|
+ notes['Device Settings'] = self.read_registers()
|
|
|
+
|
|
|
+ self._cal.notes.append(notes) # date
|
|
|
+
|
|
|
+
|
|
|
+ return self._cal_valid
|
|
|
+
|
|
|
+ def save_cal(self, filename: str):
|
|
|
+ if self._cal_valid:
|
|
|
+ self._cal.save(filename)
|
|
|
+
|
|
|
+ def load_cal(self, filename: str) -> bool:
|
|
|
+ self._cal = Calibration()
|
|
|
+ self._cal.load(filename)
|
|
|
+ self._cal.calc_corrections()
|
|
|
+ if self._cal.isValid1Port():
|
|
|
+ self._cal_valid = True
|
|
|
+ else:
|
|
|
+ self._cal_valid = False
|
|
|
+ return self._cal_valid
|
|
|
+
|
|
|
+ def measure(self, skip_calibration=False):
|
|
|
+
|
|
|
+ # retrive data from fifo
|
|
|
+ fifo_bytes = self._read_fifo(self.points)
|
|
|
+
|
|
|
+ if fifo_bytes == None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # unpack data and convert to complex
|
|
|
+ for n_point in range(self.points):
|
|
|
+
|
|
|
+ # TODO: clean this up
|
|
|
+ (
|
|
|
+ fwd_real,
|
|
|
+ fwd_imag,
|
|
|
+ rev0_real,
|
|
|
+ rev0_imag,
|
|
|
+ rev1_real,
|
|
|
+ rev1_imag,
|
|
|
+ freq_index,
|
|
|
+ ) = unpack_from(_NANOVAN_V2_FIFO_FORMAT, fifo_bytes, n_point * 32)
|
|
|
+ fwd = complex(fwd_real, fwd_imag)
|
|
|
+ refl = complex(rev0_real, rev0_imag)
|
|
|
+ thru = complex(rev1_real, rev1_imag)
|
|
|
+
|
|
|
+ # store internally and calculate s11, s21
|
|
|
+ self._fwd[freq_index] = fwd
|
|
|
+ self._refl[freq_index] = refl
|
|
|
+ self._thru[freq_index] = thru
|
|
|
+
|
|
|
+ self._f[freq_index] = self.freq_start + self.freq_step * freq_index
|
|
|
+ self._s11[freq_index] = refl / fwd
|
|
|
+ self._s21[freq_index] = thru / fwd
|
|
|
+
|
|
|
+ # skip calibration if requested
|
|
|
+ if skip_calibration==True:
|
|
|
+ return self._f, self._s11, self._s21
|
|
|
+
|
|
|
+ # apply calibration
|
|
|
+ # TODO: Implement this better (skrf or custom modifications to calibration import)
|
|
|
+ cal_s11 = None
|
|
|
+ cal_s21 = None
|
|
|
+ if self._cal.isValid1Port():
|
|
|
+ cal_s11 = []
|
|
|
+ for f, s11_point in zip(self._f, self._s11):
|
|
|
+ dp = Datapoint(f, s11_point.real, s11_point.imag)
|
|
|
+ cal_s11.append(self._cal.correct11(dp).z)
|
|
|
+
|
|
|
+ if self._cal.isValid2Port():
|
|
|
+ cal_s21 = []
|
|
|
+ for f, s21_point in zip(self._f, self._s21):
|
|
|
+ dp = Datapoint(f, s21_point.real, s21_point.imag)
|
|
|
+ cal_s21.append(self._cal.correct21(dp).z)
|
|
|
+
|
|
|
+ if cal_s11 == None:
|
|
|
+ return self._f, None, None
|
|
|
+
|
|
|
+ if cal_s21 == None:
|
|
|
+ return self._f, cal_s11, None
|
|
|
+
|
|
|
+ return self._f, cal_s11, cal_s21
|
|
|
+
|
|
|
+ # def setTXPower(self, freq_range, power_desc):
|
|
|
+ # if freq_range[0] != 140e6:
|
|
|
+ # raise ValueError('Invalid TX power frequency range')
|
|
|
+ # # 140MHz..max => ADF4350
|
|
|
+ # self._set_register(0x42, _ADF4350_TXPOWER_DESC_REV_MAP[power_desc], 1)
|