123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
- # NanoVNASaver
- #
- # A python program to view and export Touchstone data from a NanoVNA
- # Copyright (C) 2019, 2020 Rune B. Broberg
- # Copyright (C) 2020 NanoVNA-Saver Authors
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- import logging
- import platform
- import time
- from struct import pack, unpack_from
- from time import sleep
- from typing import Dict, List
- from .Serial import Interface
- from .Calibration import Calibration
- from .RFTools import Datapoint
- __all__ = ['NanoVNA_V2_H4']
- if platform.system() != "Windows":
- import tty
- logger = logging.getLogger(__name__)
- _CMD_NOP = 0x00
- _CMD_INDICATE = 0x0D
- _CMD_READ = 0x10
- _CMD_READ2 = 0x11
- _CMD_READ4 = 0x12
- _CMD_READFIFO = 0x18
- _CMD_WRITE = 0x20
- _CMD_WRITE2 = 0x21
- _CMD_WRITE4 = 0x22
- _CMD_WRITE8 = 0x23
- _CMD_WRITEFIFO = 0x28
- WRITE_SLEEP = 0.05
- _ADF4350_TXPOWER_DESC_MAP = {
- 0: "9dattenuation",
- 1: "6dB attenuation",
- 2: "3dB attenuation",
- 3: "Maximum",
- }
- _ADF4350_TXPOWER_DESC_REV_MAP = {
- value: key for key, value in _ADF4350_TXPOWER_DESC_MAP.items()
- }
- # _NANOVNA_V2_CMDS = {
- # 'NOP' : {'val': 0x, 'len':, 'resp': },
- # 'INDICATE' : {'val': 0x, 'len':, 'resp': },
- # 'READ' : {'val': 0x, 'len':, 'resp': },
- # 'READ2' : {'val': 0x, 'len':, 'resp': },
- # 'READ4' : {'val': 0x, 'len':, 'resp': },
- # 'READFIFO' : {'val': 0x, 'len':, 'resp': },
- # 'NOP' : {'val': 0x, 'len':, 'resp': },
- # 'NOP' : {'val': 0x, 'len':, 'resp': },
- # }
- _NANOVNA_V2_REGS = {
- "sweepStartHz": {"addr": 0x00, "len": 8},
- "sweepStepHz": {"addr": 0x10, "len": 8},
- "sweepPoints": {"addr": 0x20, "len": 2},
- "valuesPerFrequency": {"addr": 0x22, "len": 2},
- "rawSamplesMode": {"addr": 0x26, "len": 1},
- "valuesFIFO": {"addr": 0x30, "len": 4},
- "deviceVariant": {"addr": 0xF0, "len": 1},
- "protocolVersion": {"addr": 0xF1, "len": 1},
- "hardwareRevision": {"addr": 0xF2, "len": 1},
- "firmwareMajor": {"addr": 0xF3, "len": 1},
- "firmwareMinor": {"addr": 0xF4, "len": 1},
- }
- """
- NanoVNA FIFO Format (32 bytes)
- fwd0Re (uint32)
- fw0Im (uint32)
- rev0Re (uint32)
- rev0Re (uint32)
- rev1Re (uint32)
- rev1Im (uint32)
- freqIndex (uint16)
- reserved (6 bytes)
- """
- _NANOVAN_V2_FIFO_FORMAT = "<iiiiiiHxxxxxx"
- class NanoVNA_V2_H4:
- name = "NanoVNA V2 Plus4"
- def __init__(self, iface: Interface = None):
- self.iface = iface
- #self.iface.open()
- if platform.system() != "Windows":
- tty.setraw(self.iface.fd)
- # reset protocol to known state
- with self.iface.lock:
- self.iface.write(pack("<Q", 0))
- sleep(WRITE_SLEEP)
- # # firmware major version of 0xff indicates dfu mode
- # if self.version.data["major"] == 0xff:
- # raise IOError('Device is in DFU mode')
- # Write Parameters
- self.write_sleep = 0.05
- # Device Ranges
- self.valid_freq = range(
- int(50e3), int(4.4e9 + 1)
- ) # Frequency Range: 50kHz - 4.4GHz, 1Hz Resolution
- self.valid_power = None # Not Implemented / Fixed
- self.valid_bw = 800 # Fixed
- self.valid_points = range(1, 10000) # TODO: Increase to actual limit
- # Device Defaults
- self.freq_start = 140e6
- self.freq_step = None
- self.freq_end = 4.4e9
- self.bw = 800
- self.points = 100
- # Device Memory
- self._fwd = None
- self._refl = None
- self._thru = None
- self._f = None
- self._s11 = None
- self._s22 = None
- self._cal = Calibration()
- self._cal_valid = False
- # Load Defaults
- self.set_sweep(self.freq_start, self.freq_end, self.points)
- def __del__(self):
- self.iface.close()
- def connect(self) -> bool:
- self.iface.open()
- return self.iface.isOpen()
- def disconnect(self) -> bool:
- self.iface.close()
- return not self.iface.isOpen()
- def reconnect(self) -> bool:
- self.iface.close()
- sleep(self.wait)
- self.iface.open()
- return self.iface.isOpen()
- def get_info(self):
- reg = self.read_registers()
- info = Dict()
- def _read_register(self, reg_name: str):
- if reg_name not in _NANOVNA_V2_REGS:
- raise ValueError(
- f"must be name of nanoVNA v2 register: {_NANOVNA_V2_REGS.keys()}"
- )
- size = _NANOVNA_V2_REGS[reg_name]["len"]
- addr = _NANOVNA_V2_REGS[reg_name]["addr"]
- if size == 1:
- packet = pack("<BB", _CMD_READ, addr)
- elif size == 2:
- packet = pack("<BB", _CMD_READ2, addr)
- elif size == 4:
- packet = pack("<BB", _CMD_READ4, addr)
- elif size == 8:
- packet = pack("<BBBB", _CMD_READ4, addr, _CMD_READ4, addr + 4)
- self.iface.flush()
- self.iface.write(packet)
- resp = self.iface.read(size)
- if len(resp) != size:
- return None
- return int.from_bytes(resp, byteorder="little")
- def _write_register(self, reg_name, value):
- if reg_name not in _NANOVNA_V2_REGS:
- raise ValueError("must be name of nanoVNA v2 register")
- size = _NANOVNA_V2_REGS[reg_name]["len"]
- addr = _NANOVNA_V2_REGS[reg_name]["addr"]
- if size == 1:
- packet = pack("<BBB", _CMD_WRITE, addr, value)
- elif size == 2:
- packet = pack("<BBH", _CMD_WRITE2, addr, value)
- elif size == 4:
- packet = pack("<BBI", _CMD_WRITE4, addr, value)
- elif size == 8:
- packet = pack("<BBQ", _CMD_WRITE8, addr, value)
- self.iface.write(packet)
- def _read_fifo(self, points: int):
- # clear something
- self.iface.write(pack("<Q", 0))
-
- # clear FIFO
- self._write_register("valuesFIFO", 0x00)
- points_remaining = points
- fifo_bytes = bytes()
- while points_remaining > 0:
- # calculate the number of points to read (255) at a time
- points_to_read = min(255, points_remaining)
- bytes_to_read = points_to_read * 32
- # TODO: Fix
- self.iface.write(pack("<BBB", _CMD_READFIFO, 0x30, points_to_read))
- timeout = self.iface.timeout
- self.iface.timeout = points_to_read * 0.032 + 0.1
- resp = self.iface.read(bytes_to_read)
- if len(resp) != bytes_to_read:
- self.iface.timeout = timeout
- return None
- fifo_bytes = fifo_bytes + resp
- points_remaining = points_remaining - points_to_read
- # restore timeout
- self.iface.timeout = timeout
- return fifo_bytes
- def read_registers(self):
- i = dict()
- for reg in _NANOVNA_V2_REGS.keys():
- if reg == "valuesFIFO": # skip
- continue
- i[reg] = self._read_register(reg)
- return i
- def set_sweep(
- self, freq_start: float, freq_end: float, points: float = 101, avg: int = 1
- ):
- # input checking
- if int(freq_start) not in self.valid_freq:
- raise ValueError("outside of operating frequency range")
- if int(freq_end) not in self.valid_freq:
- raise ValueError("outside of operating frequency range")
- if freq_end <= freq_start:
- raise ValueError("start frequency must be < end frequency")
- if int(points) not in self.valid_points:
- raise ValueError("invalid number of data points")
- # store and calc registers
- self.points = int(points)
- self.freq_start = int(freq_start)
- self.freq_step = int((int(freq_end) - freq_start) / (self.points - 1))
- self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
- self.avg = avg
- # write registers
- self._write_register("sweepStartHz", self.freq_start)
- self._write_register("sweepStepHz", self.freq_step)
- self._write_register("sweepPoints", self.points)
- self._write_register("valuesPerFrequency", self.avg)
- # allocate memory for sweep data
- self._fwd = [complex()] * self.points
- self._refl = [complex()] * self.points
- self._thru = [complex()] * self.points
- self._freq_index = [int()] * self.points
- self._f = [int()] * self.points
- self._s11 = [complex()] * self.points
- self._s21 = [complex()] * self.points
- def get_sweep(self) -> List:
- reg = self.read_registers()
- self.freq_start = reg["sweepStartHz"]
- self.freq_step = reg["sweepStepHz"]
- self.points = reg["sweepPoints"]
- self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
- self.avg = reg["valuesPerFrequency"]
- return (self.freq_start, self.freq_end, self.points, self.avg)
- def wizard_cal(self, port1_only:bool=False):
- # wipe old cal because I'm scared
- self._cal = Calibration()
- # short
- input("connect Short to port 1...")
- fs, s11_point, _ = self.measure(skip_calibration=True)
- s11_dp = []
- for (f, s11_point) in zip(fs, s11_point):
- s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
- self._cal.insert("short", s11_dp)
- # open
- input("connect Open to port 1...")
- fs, s11, _ = self.measure(skip_calibration=True)
- s11_dp = []
- for (f, s11_point) in zip(fs, s11):
- s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
- self._cal.insert("open", s11_dp)
- # load
- input("connect Load to port 1...")
- fs, s11, _ = self.measure(skip_calibration=True)
- s11_dp = []
- for (f, s11_point) in zip(fs, s11):
- s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
- self._cal.insert("load", s11_dp)
- # through
- input("connect port 1 to port 2...")
- fs, _, s21 = self.measure(skip_calibration=True)
- s21_dp = []
- for (f, s21_point) in zip(fs, s21):
- s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
- self._cal.insert("through", s21_dp)
- # isolation
- input("connect load to both ports...")
- fs, _, s21 = self.measure(skip_calibration=True)
- s21_dp = []
- for (f, s21_point) in zip(fs, s21):
- s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
- self._cal.insert("isolation", s21_dp)
- # calculate corrections
- self._cal.calc_corrections()
- if self._cal.isValid1Port():
- self._cal_valid = True
- else:
- self._cal_valid = False
- # Store Notes
- notes = dict()
- notes['Date'] = time.ctime()
- start, stop, points, avg = self.get_sweep()
- notes['Sweep Start'] = start
- notes['Sweep Stop'] = stop
- notes['Sweep Points'] = points
- notes['Point Average'] = avg
- notes['Device Name'] = self.name
- notes['Device Settings'] = self.read_registers()
-
- self._cal.notes.append(notes) # date
- return self._cal_valid
- def save_cal(self, filename: str):
- if self._cal_valid:
- self._cal.save(filename)
- def load_cal(self, filename: str) -> bool:
- self._cal = Calibration()
- self._cal.load(filename)
- self._cal.calc_corrections()
- if self._cal.isValid1Port():
- self._cal_valid = True
- else:
- self._cal_valid = False
- return self._cal_valid
- def measure(self, skip_calibration=False):
- # retrive data from fifo
- fifo_bytes = self._read_fifo(self.points)
- if fifo_bytes == None:
- return None
- # unpack data and convert to complex
- for n_point in range(self.points):
- # TODO: clean this up
- (
- fwd_real,
- fwd_imag,
- rev0_real,
- rev0_imag,
- rev1_real,
- rev1_imag,
- freq_index,
- ) = unpack_from(_NANOVAN_V2_FIFO_FORMAT, fifo_bytes, n_point * 32)
- fwd = complex(fwd_real, fwd_imag)
- refl = complex(rev0_real, rev0_imag)
- thru = complex(rev1_real, rev1_imag)
- # store internally and calculate s11, s21
- self._fwd[freq_index] = fwd
- self._refl[freq_index] = refl
- self._thru[freq_index] = thru
- self._f[freq_index] = self.freq_start + self.freq_step * freq_index
- self._s11[freq_index] = refl / fwd
- self._s21[freq_index] = thru / fwd
- # skip calibration if requested
- if skip_calibration==True:
- return self._f, self._s11, self._s21
- # apply calibration
- # TODO: Implement this better (skrf or custom modifications to calibration import)
- cal_s11 = None
- cal_s21 = None
- if self._cal.isValid1Port():
- cal_s11 = []
- for f, s11_point in zip(self._f, self._s11):
- dp = Datapoint(f, s11_point.real, s11_point.imag)
- cal_s11.append(self._cal.correct11(dp).z)
- if self._cal.isValid2Port():
- cal_s21 = []
- for f, s21_point in zip(self._f, self._s21):
- dp = Datapoint(f, s21_point.real, s21_point.imag)
- cal_s21.append(self._cal.correct21(dp).z)
- if cal_s11 == None:
- return self._f, None, None
-
- if cal_s21 == None:
- return self._f, cal_s11, None
- return self._f, cal_s11, cal_s21
- # def setTXPower(self, freq_range, power_desc):
- # if freq_range[0] != 140e6:
- # raise ValueError('Invalid TX power frequency range')
- # # 140MHz..max => ADF4350
- # self._set_register(0x42, _ADF4350_TXPOWER_DESC_REV_MAP[power_desc], 1)
|