NanoVNA_V2.py 14 KB


  1. # NanoVNASaver
  2. #
  3. # A python program to view and export Touchstone data from a NanoVNA
  4. # Copyright (C) 2019, 2020 Rune B. Broberg
  5. # Copyright (C) 2020 NanoVNA-Saver Authors
  6. #
  7. # This program is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This program is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  19. import logging
  20. import platform
  21. import time
  22. from struct import pack, unpack_from
  23. from time import sleep
  24. from typing import Dict, List
  25. from .Serial import Interface
  26. from .Calibration import Calibration
  27. from .RFTools import Datapoint
  28. __all__ = ['NanoVNA_V2_H4']
  29. if platform.system() != "Windows":
  30. import tty
  31. logger = logging.getLogger(__name__)
  32. _CMD_NOP = 0x00
  33. _CMD_INDICATE = 0x0D
  34. _CMD_READ = 0x10
  35. _CMD_READ2 = 0x11
  36. _CMD_READ4 = 0x12
  37. _CMD_READFIFO = 0x18
  38. _CMD_WRITE = 0x20
  39. _CMD_WRITE2 = 0x21
  40. _CMD_WRITE4 = 0x22
  41. _CMD_WRITE8 = 0x23
  42. _CMD_WRITEFIFO = 0x28
  43. WRITE_SLEEP = 0.05
  44. _ADF4350_TXPOWER_DESC_MAP = {
  45. 0: "9dattenuation",
  46. 1: "6dB attenuation",
  47. 2: "3dB attenuation",
  48. 3: "Maximum",
  49. }
  50. _ADF4350_TXPOWER_DESC_REV_MAP = {
  51. value: key for key, value in _ADF4350_TXPOWER_DESC_MAP.items()
  52. }
  53. # _NANOVNA_V2_CMDS = {
  54. # 'NOP' : {'val': 0x, 'len':, 'resp': },
  55. # 'INDICATE' : {'val': 0x, 'len':, 'resp': },
  56. # 'READ' : {'val': 0x, 'len':, 'resp': },
  57. # 'READ2' : {'val': 0x, 'len':, 'resp': },
  58. # 'READ4' : {'val': 0x, 'len':, 'resp': },
  59. # 'READFIFO' : {'val': 0x, 'len':, 'resp': },
  60. # 'NOP' : {'val': 0x, 'len':, 'resp': },
  61. # 'NOP' : {'val': 0x, 'len':, 'resp': },
  62. # }
  63. _NANOVNA_V2_REGS = {
  64. "sweepStartHz": {"addr": 0x00, "len": 8},
  65. "sweepStepHz": {"addr": 0x10, "len": 8},
  66. "sweepPoints": {"addr": 0x20, "len": 2},
  67. "valuesPerFrequency": {"addr": 0x22, "len": 2},
  68. "rawSamplesMode": {"addr": 0x26, "len": 1},
  69. "valuesFIFO": {"addr": 0x30, "len": 4},
  70. "deviceVariant": {"addr": 0xF0, "len": 1},
  71. "protocolVersion": {"addr": 0xF1, "len": 1},
  72. "hardwareRevision": {"addr": 0xF2, "len": 1},
  73. "firmwareMajor": {"addr": 0xF3, "len": 1},
  74. "firmwareMinor": {"addr": 0xF4, "len": 1},
  75. }
  76. """
  77. NanoVNA FIFO Format (32 bytes)
  78. fwd0Re (uint32)
  79. fw0Im (uint32)
  80. rev0Re (uint32)
  81. rev0Re (uint32)
  82. rev1Re (uint32)
  83. rev1Im (uint32)
  84. freqIndex (uint16)
  85. reserved (6 bytes)
  86. """
  87. _NANOVAN_V2_FIFO_FORMAT = "<iiiiiiHxxxxxx"
  88. class NanoVNA_V2_H4:
  89. name = "NanoVNA V2 Plus4"
  90. def __init__(self, iface: Interface = None):
  91. self.iface = iface
  92. #self.iface.open()
  93. if platform.system() != "Windows":
  94. tty.setraw(self.iface.fd)
  95. # reset protocol to known state
  96. with self.iface.lock:
  97. self.iface.write(pack("<Q", 0))
  98. sleep(WRITE_SLEEP)
  99. # # firmware major version of 0xff indicates dfu mode
  100. # if self.version.data["major"] == 0xff:
  101. # raise IOError('Device is in DFU mode')
  102. # Write Parameters
  103. self.write_sleep = 0.05
  104. # Device Ranges
  105. self.valid_freq = range(
  106. int(50e3), int(4.4e9 + 1)
  107. ) # Frequency Range: 50kHz - 4.4GHz, 1Hz Resolution
  108. self.valid_power = None # Not Implemented / Fixed
  109. self.valid_bw = 800 # Fixed
  110. self.valid_points = range(1, 10000) # TODO: Increase to actual limit
  111. # Device Defaults
  112. self.freq_start = 140e6
  113. self.freq_step = None
  114. self.freq_end = 4.4e9
  115. self.bw = 800
  116. self.points = 100
  117. # Device Memory
  118. self._fwd = None
  119. self._refl = None
  120. self._thru = None
  121. self._f = None
  122. self._s11 = None
  123. self._s22 = None
  124. self._cal = Calibration()
  125. self._cal_valid = False
  126. # Load Defaults
  127. self.set_sweep(self.freq_start, self.freq_end, self.points)
  128. def __del__(self):
  129. self.iface.close()
  130. def connect(self) -> bool:
  131. self.iface.open()
  132. return self.iface.isOpen()
  133. def disconnect(self) -> bool:
  134. self.iface.close()
  135. return not self.iface.isOpen()
  136. def reconnect(self) -> bool:
  137. self.iface.close()
  138. sleep(self.wait)
  139. self.iface.open()
  140. return self.iface.isOpen()
  141. def get_info(self):
  142. reg = self.read_registers()
  143. info = Dict()
  144. def _read_register(self, reg_name: str):
  145. if reg_name not in _NANOVNA_V2_REGS:
  146. raise ValueError(
  147. f"must be name of nanoVNA v2 register: {_NANOVNA_V2_REGS.keys()}"
  148. )
  149. size = _NANOVNA_V2_REGS[reg_name]["len"]
  150. addr = _NANOVNA_V2_REGS[reg_name]["addr"]
  151. if size == 1:
  152. packet = pack("<BB", _CMD_READ, addr)
  153. elif size == 2:
  154. packet = pack("<BB", _CMD_READ2, addr)
  155. elif size == 4:
  156. packet = pack("<BB", _CMD_READ4, addr)
  157. elif size == 8:
  158. packet = pack("<BBBB", _CMD_READ4, addr, _CMD_READ4, addr + 4)
  159. self.iface.flush()
  160. self.iface.write(packet)
  161. resp = self.iface.read(size)
  162. if len(resp) != size:
  163. return None
  164. return int.from_bytes(resp, byteorder="little")
  165. def _write_register(self, reg_name, value):
  166. if reg_name not in _NANOVNA_V2_REGS:
  167. raise ValueError("must be name of nanoVNA v2 register")
  168. size = _NANOVNA_V2_REGS[reg_name]["len"]
  169. addr = _NANOVNA_V2_REGS[reg_name]["addr"]
  170. if size == 1:
  171. packet = pack("<BBB", _CMD_WRITE, addr, value)
  172. elif size == 2:
  173. packet = pack("<BBH", _CMD_WRITE2, addr, value)
  174. elif size == 4:
  175. packet = pack("<BBI", _CMD_WRITE4, addr, value)
  176. elif size == 8:
  177. packet = pack("<BBQ", _CMD_WRITE8, addr, value)
  178. self.iface.write(packet)
  179. def _read_fifo(self, points: int):
  180. # clear something
  181. self.iface.write(pack("<Q", 0))
  182. # clear FIFO
  183. self._write_register("valuesFIFO", 0x00)
  184. points_remaining = points
  185. fifo_bytes = bytes()
  186. while points_remaining > 0:
  187. # calculate the number of points to read (255) at a time
  188. points_to_read = min(255, points_remaining)
  189. bytes_to_read = points_to_read * 32
  190. # TODO: Fix
  191. self.iface.write(pack("<BBB", _CMD_READFIFO, 0x30, points_to_read))
  192. timeout = self.iface.timeout
  193. self.iface.timeout = points_to_read * 0.032 + 0.1
  194. resp = self.iface.read(bytes_to_read)
  195. if len(resp) != bytes_to_read:
  196. self.iface.timeout = timeout
  197. return None
  198. fifo_bytes = fifo_bytes + resp
  199. points_remaining = points_remaining - points_to_read
  200. # restore timeout
  201. self.iface.timeout = timeout
  202. return fifo_bytes
  203. def read_registers(self):
  204. i = dict()
  205. for reg in _NANOVNA_V2_REGS.keys():
  206. if reg == "valuesFIFO": # skip
  207. continue
  208. i[reg] = self._read_register(reg)
  209. return i
  210. def set_sweep(
  211. self, freq_start: float, freq_end: float, points: float = 101, avg: int = 1
  212. ):
  213. # input checking
  214. if int(freq_start) not in self.valid_freq:
  215. raise ValueError("outside of operating frequency range")
  216. if int(freq_end) not in self.valid_freq:
  217. raise ValueError("outside of operating frequency range")
  218. if freq_end <= freq_start:
  219. raise ValueError("start frequency must be < end frequency")
  220. if int(points) not in self.valid_points:
  221. raise ValueError("invalid number of data points")
  222. # store and calc registers
  223. self.points = int(points)
  224. self.freq_start = int(freq_start)
  225. self.freq_step = int((int(freq_end) - freq_start) / (self.points - 1))
  226. self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
  227. self.avg = avg
  228. # write registers
  229. self._write_register("sweepStartHz", self.freq_start)
  230. self._write_register("sweepStepHz", self.freq_step)
  231. self._write_register("sweepPoints", self.points)
  232. self._write_register("valuesPerFrequency", self.avg)
  233. # allocate memory for sweep data
  234. self._fwd = [complex()] * self.points
  235. self._refl = [complex()] * self.points
  236. self._thru = [complex()] * self.points
  237. self._freq_index = [int()] * self.points
  238. self._f = [int()] * self.points
  239. self._s11 = [complex()] * self.points
  240. self._s21 = [complex()] * self.points
  241. def get_sweep(self) -> List:
  242. reg = self.read_registers()
  243. self.freq_start = reg["sweepStartHz"]
  244. self.freq_step = reg["sweepStepHz"]
  245. self.points = reg["sweepPoints"]
  246. self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
  247. self.avg = reg["valuesPerFrequency"]
  248. return (self.freq_start, self.freq_end, self.points, self.avg)
  249. def wizard_cal(self, port1_only:bool=False):
  250. # wipe old cal because I'm scared
  251. self._cal = Calibration()
  252. # short
  253. input("connect Short to port 1...")
  254. fs, s11_point, _ = self.measure(skip_calibration=True)
  255. s11_dp = []
  256. for (f, s11_point) in zip(fs, s11_point):
  257. s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
  258. self._cal.insert("short", s11_dp)
  259. # open
  260. input("connect Open to port 1...")
  261. fs, s11, _ = self.measure(skip_calibration=True)
  262. s11_dp = []
  263. for (f, s11_point) in zip(fs, s11):
  264. s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
  265. self._cal.insert("open", s11_dp)
  266. # load
  267. input("connect Load to port 1...")
  268. fs, s11, _ = self.measure(skip_calibration=True)
  269. s11_dp = []
  270. for (f, s11_point) in zip(fs, s11):
  271. s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
  272. self._cal.insert("load", s11_dp)
  273. # through
  274. input("connect port 1 to port 2...")
  275. fs, _, s21 = self.measure(skip_calibration=True)
  276. s21_dp = []
  277. for (f, s21_point) in zip(fs, s21):
  278. s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
  279. self._cal.insert("through", s21_dp)
  280. # isolation
  281. input("connect load to both ports...")
  282. fs, _, s21 = self.measure(skip_calibration=True)
  283. s21_dp = []
  284. for (f, s21_point) in zip(fs, s21):
  285. s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
  286. self._cal.insert("isolation", s21_dp)
  287. # calculate corrections
  288. self._cal.calc_corrections()
  289. if self._cal.isValid1Port():
  290. self._cal_valid = True
  291. else:
  292. self._cal_valid = False
  293. # Store Notes
  294. notes = dict()
  295. notes['Date'] = time.ctime()
  296. start, stop, points, avg = self.get_sweep()
  297. notes['Sweep Start'] = start
  298. notes['Sweep Stop'] = stop
  299. notes['Sweep Points'] = points
  300. notes['Point Average'] = avg
  301. notes['Device Name'] = self.name
  302. notes['Device Settings'] = self.read_registers()
  303. self._cal.notes.append(notes) # date
  304. return self._cal_valid
  305. def save_cal(self, filename: str):
  306. if self._cal_valid:
  307. self._cal.save(filename)
  308. def load_cal(self, filename: str) -> bool:
  309. self._cal = Calibration()
  310. self._cal.load(filename)
  311. self._cal.calc_corrections()
  312. if self._cal.isValid1Port():
  313. self._cal_valid = True
  314. else:
  315. self._cal_valid = False
  316. return self._cal_valid
  317. def measure(self, skip_calibration=False):
  318. # retrive data from fifo
  319. fifo_bytes = self._read_fifo(self.points)
  320. if fifo_bytes == None:
  321. return None
  322. # unpack data and convert to complex
  323. for n_point in range(self.points):
  324. # TODO: clean this up
  325. (
  326. fwd_real,
  327. fwd_imag,
  328. rev0_real,
  329. rev0_imag,
  330. rev1_real,
  331. rev1_imag,
  332. freq_index,
  333. ) = unpack_from(_NANOVAN_V2_FIFO_FORMAT, fifo_bytes, n_point * 32)
  334. fwd = complex(fwd_real, fwd_imag)
  335. refl = complex(rev0_real, rev0_imag)
  336. thru = complex(rev1_real, rev1_imag)
  337. # store internally and calculate s11, s21
  338. self._fwd[freq_index] = fwd
  339. self._refl[freq_index] = refl
  340. self._thru[freq_index] = thru
  341. self._f[freq_index] = self.freq_start + self.freq_step * freq_index
  342. self._s11[freq_index] = refl / fwd
  343. self._s21[freq_index] = thru / fwd
  344. # skip calibration if requested
  345. if skip_calibration==True:
  346. return self._f, self._s11, self._s21
  347. # apply calibration
  348. # TODO: Implement this better (skrf or custom modifications to calibration import)
  349. cal_s11 = None
  350. cal_s21 = None
  351. if self._cal.isValid1Port():
  352. cal_s11 = []
  353. for f, s11_point in zip(self._f, self._s11):
  354. dp = Datapoint(f, s11_point.real, s11_point.imag)
  355. cal_s11.append(self._cal.correct11(dp).z)
  356. if self._cal.isValid2Port():
  357. cal_s21 = []
  358. for f, s21_point in zip(self._f, self._s21):
  359. dp = Datapoint(f, s21_point.real, s21_point.imag)
  360. cal_s21.append(self._cal.correct21(dp).z)
  361. if cal_s11 == None:
  362. return self._f, None, None
  363. if cal_s21 == None:
  364. return self._f, cal_s11, None
  365. return self._f, cal_s11, cal_s21
  366. # def setTXPower(self, freq_range, power_desc):
  367. # if freq_range[0] != 140e6:
  368. # raise ValueError('Invalid TX power frequency range')
  369. # # 140MHz..max => ADF4350
  370. # self._set_register(0x42, _ADF4350_TXPOWER_DESC_REV_MAP[power_desc], 1)