NanoVNA_V2.py 14 KB


  1. # NanoVNASaver
  2. #
  3. # A python program to view and export Touchstone data from a NanoVNA
  4. # Copyright (C) 2019, 2020 Rune B. Broberg
  5. # Copyright (C) 2020 NanoVNA-Saver Authors
  6. #
  7. # This program is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This program is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  19. import logging
  20. import platform
  21. import time
  22. from struct import pack, unpack_from
  23. from time import sleep
  24. from typing import List
  25. from Serial import Interface
  26. from Calibration import Calibration
  27. from RFTools import Datapoint
  28. __all__ = ['NanoVNA_V2_H4']
  29. if platform.system() != "Windows":
  30. import tty
  31. logger = logging.getLogger(__name__)
  32. _CMD_NOP = 0x00
  33. _CMD_INDICATE = 0x0D
  34. _CMD_READ = 0x10
  35. _CMD_READ2 = 0x11
  36. _CMD_READ4 = 0x12
  37. _CMD_READFIFO = 0x18
  38. _CMD_WRITE = 0x20
  39. _CMD_WRITE2 = 0x21
  40. _CMD_WRITE4 = 0x22
  41. _CMD_WRITE8 = 0x23
  42. _CMD_WRITEFIFO = 0x28
  43. WRITE_SLEEP = 0.05
  44. _ADF4350_TXPOWER_DESC_MAP = {
  45. 0: "9dattenuation",
  46. 1: "6dB attenuation",
  47. 2: "3dB attenuation",
  48. 3: "Maximum",
  49. }
  50. _ADF4350_TXPOWER_DESC_REV_MAP = {
  51. value: key for key, value in _ADF4350_TXPOWER_DESC_MAP.items()
  52. }
  53. # _NANOVNA_V2_CMDS = {
  54. # 'NOP' : {'val': 0x, 'len':, 'resp': },
  55. # 'INDICATE' : {'val': 0x, 'len':, 'resp': },
  56. # 'READ' : {'val': 0x, 'len':, 'resp': },
  57. # 'READ2' : {'val': 0x, 'len':, 'resp': },
  58. # 'READ4' : {'val': 0x, 'len':, 'resp': },
  59. # 'READFIFO' : {'val': 0x, 'len':, 'resp': },
  60. # 'NOP' : {'val': 0x, 'len':, 'resp': },
  61. # 'NOP' : {'val': 0x, 'len':, 'resp': },
  62. # }
  63. _NANOVNA_V2_REGS = {
  64. "sweepStartHz": {"addr": 0x00, "len": 8},
  65. "sweepStepHz": {"addr": 0x10, "len": 8},
  66. "sweepPoints": {"addr": 0x20, "len": 2},
  67. "valuesPerFrequency": {"addr": 0x22, "len": 2},
  68. "rawSamplesMode": {"addr": 0x26, "len": 1},
  69. "valuesFIFO": {"addr": 0x30, "len": 4},
  70. "deviceVariant": {"addr": 0xF0, "len": 1},
  71. "protocolVersion": {"addr": 0xF1, "len": 1},
  72. "hardwareRevision": {"addr": 0xF2, "len": 1},
  73. "firmwareMajor": {"addr": 0xF3, "len": 1},
  74. "firmwareMinor": {"addr": 0xF4, "len": 1},
  75. }
  76. """
  77. NanoVNA FIFO Format (32 bytes)
  78. fwd0Re (uint32)
  79. fw0Im (uint32)
  80. rev0Re (uint32)
  81. rev0Re (uint32)
  82. rev1Re (uint32)
  83. rev1Im (uint32)
  84. freqIndex (uint16)
  85. reserved (6 bytes)
  86. """
  87. _NANOVAN_V2_FIFO_FORMAT = "<iiiiiiHxxxxxx"
  88. class NanoVNA_V2_H4:
  89. name = "NanoVNA V2 Plus4"
  90. def __init__(self, iface: Interface = None):
  91. self.iface = iface
  92. self.iface.open()
  93. if platform.system() != "Windows":
  94. tty.setraw(self.iface.fd)
  95. # reset protocol to known state
  96. with self.iface.lock:
  97. self.iface.write(pack("<Q", 0))
  98. sleep(WRITE_SLEEP)
  99. # # firmware major version of 0xff indicates dfu mode
  100. # if self.version.data["major"] == 0xff:
  101. # raise IOError('Device is in DFU mode')
  102. # Write Parameters
  103. self.write_sleep = 0.05
  104. # Device Ranges
  105. self.valid_freq = range(
  106. int(50e3), int(4.4e9 + 1)
  107. ) # Frequency Range: 50kHz - 4.4GHz, 1Hz Resolution
  108. self.valid_power = None # Not Implemented / Fixed
  109. self.valid_bw = 800 # Fixed
  110. self.valid_points = range(1, 10000) # TODO: Increase to actual limit
  111. # Device Defaults
  112. self.freq_start = 140e6
  113. self.freq_step = None
  114. self.freq_end = 4.4e9
  115. self.bw = 800
  116. self.points = 100
  117. # Device Memory
  118. self._fwd = None
  119. self._refl = None
  120. self._thru = None
  121. self._f = None
  122. self._s11 = None
  123. self._s22 = None
  124. self._cal = Calibration()
  125. self._cal_valid = False
  126. # Load Defaults
  127. self.set_sweep(self.freq_start, self.freq_end, self.points)
  128. def __del__(self):
  129. self.iface.close()
  130. def connect(self) -> bool:
  131. self.iface.open()
  132. return self.iface.isOpen()
  133. def disconnect(self) -> bool:
  134. self.iface.close()
  135. return not self.iface.isOpen()
  136. def reconnect(self) -> bool:
  137. self.iface.close()
  138. sleep(self.wait)
  139. self.iface.open()
  140. return self.iface.isOpen()
  141. def _read_register(self, reg_name: str):
  142. if reg_name not in _NANOVNA_V2_REGS:
  143. raise ValueError(
  144. f"must be name of nanoVNA v2 register: {_NANOVNA_V2_REGS.keys()}"
  145. )
  146. size = _NANOVNA_V2_REGS[reg_name]["len"]
  147. addr = _NANOVNA_V2_REGS[reg_name]["addr"]
  148. if size == 1:
  149. packet = pack("<BB", _CMD_READ, addr)
  150. elif size == 2:
  151. packet = pack("<BB", _CMD_READ2, addr)
  152. elif size == 4:
  153. packet = pack("<BB", _CMD_READ4, addr)
  154. elif size == 8:
  155. packet = pack("<BBBB", _CMD_READ4, addr, _CMD_READ4, addr + 4)
  156. self.iface.flush()
  157. self.iface.write(packet)
  158. resp = self.iface.read(size)
  159. if len(resp) != size:
  160. return None
  161. return int.from_bytes(resp, byteorder="little")
  162. def _write_register(self, reg_name, value):
  163. if reg_name not in _NANOVNA_V2_REGS:
  164. raise ValueError("must be name of nanoVNA v2 register")
  165. size = _NANOVNA_V2_REGS[reg_name]["len"]
  166. addr = _NANOVNA_V2_REGS[reg_name]["addr"]
  167. if size == 1:
  168. packet = pack("<BBB", _CMD_WRITE, addr, value)
  169. elif size == 2:
  170. packet = pack("<BBH", _CMD_WRITE2, addr, value)
  171. elif size == 4:
  172. packet = pack("<BBI", _CMD_WRITE4, addr, value)
  173. elif size == 8:
  174. packet = pack("<BBQ", _CMD_WRITE8, addr, value)
  175. self.iface.write(packet)
  176. def _read_fifo(self, points: int):
  177. # clear something
  178. self.iface.write(pack("<Q", 0))
  179. # clear FIFO
  180. self._write_register("valuesFIFO", 0x00)
  181. points_remaining = points
  182. fifo_bytes = bytes()
  183. while points_remaining > 0:
  184. # calculate the number of points to read (255) at a time
  185. points_to_read = min(255, points_remaining)
  186. bytes_to_read = points_to_read * 32
  187. # TODO: Fix
  188. self.iface.write(pack("<BBB", _CMD_READFIFO, 0x30, points_to_read))
  189. timeout = self.iface.timeout
  190. self.iface.timeout = points_to_read * 0.032 + 0.1
  191. resp = self.iface.read(bytes_to_read)
  192. if len(resp) != bytes_to_read:
  193. self.iface.timeout = timeout
  194. return None
  195. fifo_bytes = fifo_bytes + resp
  196. points_remaining = points_remaining - points_to_read
  197. # restore timeout
  198. self.iface.timeout = timeout
  199. return fifo_bytes
  200. def read_registers(self):
  201. i = dict()
  202. for reg in _NANOVNA_V2_REGS.keys():
  203. if reg == "valuesFIFO": # skip
  204. continue
  205. i[reg] = self._read_register(reg)
  206. return i
  207. def set_sweep(
  208. self, freq_start: float, freq_end: float, points: float = 101, avg: int = 1
  209. ):
  210. # input checking
  211. if int(freq_start) not in self.valid_freq:
  212. raise ValueError("outside of operating frequency range")
  213. if int(freq_end) not in self.valid_freq:
  214. raise ValueError("outside of operating frequency range")
  215. if freq_end <= freq_start:
  216. raise ValueError("start frequency must be < end frequency")
  217. if int(points) not in self.valid_points:
  218. raise ValueError("invalid number of data points")
  219. # store and calc registers
  220. self.points = int(points)
  221. self.freq_start = int(freq_start)
  222. self.freq_step = int((int(freq_end) - freq_start) / (self.points - 1))
  223. self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
  224. self.avg = avg
  225. # write registers
  226. self._write_register("sweepStartHz", self.freq_start)
  227. self._write_register("sweepStepHz", self.freq_step)
  228. self._write_register("sweepPoints", self.points)
  229. self._write_register("valuesPerFrequency", self.avg)
  230. # allocate memory for sweep data
  231. self._fwd = [complex()] * self.points
  232. self._refl = [complex()] * self.points
  233. self._thru = [complex()] * self.points
  234. self._freq_index = [int()] * self.points
  235. self._f = [int()] * self.points
  236. self._s11 = [complex()] * self.points
  237. self._s21 = [complex()] * self.points
  238. def get_sweep(self) -> List:
  239. reg = self.read_registers()
  240. self.freq_start = reg["sweepStartHz"]
  241. self.freq_step = reg["sweepStepHz"]
  242. self.points = reg["sweepPoints"]
  243. self.freq_end = self.freq_start + self.freq_step * (self.points - 1)
  244. self.avg = reg["valuesPerFrequency"]
  245. return (self.freq_start, self.freq_end, self.points, self.avg)
  246. def wizard_cal(self, port1_only:bool=False):
  247. # wipe old cal because I'm scared
  248. self._cal = Calibration()
  249. # short
  250. input("connect Short to port 1...")
  251. fs, s11_point, _ = self.measure(skip_calibration=True)
  252. s11_dp = []
  253. for (f, s11_point) in zip(fs, s11_point):
  254. s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
  255. self._cal.insert("short", s11_dp)
  256. # open
  257. input("connect Open to port 1...")
  258. fs, s11, _ = self.measure(skip_calibration=True)
  259. s11_dp = []
  260. for (f, s11_point) in zip(fs, s11):
  261. s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
  262. self._cal.insert("open", s11_dp)
  263. # load
  264. input("connect Load to port 1...")
  265. fs, s11, _ = self.measure(skip_calibration=True)
  266. s11_dp = []
  267. for (f, s11_point) in zip(fs, s11):
  268. s11_dp.append(Datapoint(f, s11_point.real, s11_point.imag))
  269. self._cal.insert("load", s11_dp)
  270. # through
  271. input("connect port 1 to port 2...")
  272. fs, _, s21 = self.measure(skip_calibration=True)
  273. s21_dp = []
  274. for (f, s21_point) in zip(fs, s21):
  275. s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
  276. self._cal.insert("through", s21_dp)
  277. # isolation
  278. input("connect load to both ports...")
  279. fs, _, s21 = self.measure(skip_calibration=True)
  280. s21_dp = []
  281. for (f, s21_point) in zip(fs, s21):
  282. s21_dp.append(Datapoint(f, s21_point.real, s21_point.imag))
  283. self._cal.insert("isolation", s21_dp)
  284. # calculate corrections
  285. self._cal.calc_corrections()
  286. if self._cal.isValid1Port():
  287. self._cal_valid = True
  288. else:
  289. self._cal_valid = False
  290. # Store Notes
  291. notes = dict()
  292. notes['Date'] = time.ctime()
  293. start, stop, points, avg = self.get_sweep()
  294. notes['Sweep Start'] = start
  295. notes['Sweep Stop'] = stop
  296. notes['Sweep Points'] = points
  297. notes['Point Average'] = avg
  298. notes['Device Name'] = self.name
  299. notes['Device Settings'] = self.read_registers()
  300. self._cal.notes.append(notes) # date
  301. return self._cal_valid
  302. def save_cal(self, filename: str):
  303. if self._cal_valid:
  304. self._cal.save(filename)
  305. def load_cal(self, filename: str) -> bool:
  306. self._cal = Calibration()
  307. self._cal.load(filename)
  308. self._cal.calc_corrections()
  309. if self._cal.isValid1Port():
  310. self._cal_valid = True
  311. else:
  312. self._cal_valid = False
  313. return self._cal_valid
  314. def measure(self, skip_calibration=False):
  315. # retrive data from fifo
  316. fifo_bytes = self._read_fifo(self.points)
  317. if fifo_bytes == None:
  318. return None
  319. # unpack data and convert to complex
  320. for n_point in range(self.points):
  321. # TODO: clean this up
  322. (
  323. fwd_real,
  324. fwd_imag,
  325. rev0_real,
  326. rev0_imag,
  327. rev1_real,
  328. rev1_imag,
  329. freq_index,
  330. ) = unpack_from(_NANOVAN_V2_FIFO_FORMAT, fifo_bytes, n_point * 32)
  331. fwd = complex(fwd_real, fwd_imag)
  332. refl = complex(rev0_real, rev0_imag)
  333. thru = complex(rev1_real, rev1_imag)
  334. # store internally and calculate s11, s21
  335. self._fwd[freq_index] = fwd
  336. self._refl[freq_index] = refl
  337. self._thru[freq_index] = thru
  338. self._f[freq_index] = self.freq_start + self.freq_step * freq_index
  339. self._s11[freq_index] = refl / fwd
  340. self._s21[freq_index] = thru / fwd
  341. # skip calibration if requested
  342. if skip_calibration==True:
  343. return self._f, self._s11, self._s21
  344. # apply calibration
  345. # TODO: Implement this better (skrf or custom modifications to calibration import)
  346. cal_s11 = None
  347. cal_s21 = None
  348. if self._cal.isValid1Port():
  349. cal_s11 = []
  350. for f, s11_point in zip(self._f, self._s11):
  351. dp = Datapoint(f, s11_point.real, s11_point.imag)
  352. cal_s11.append(self._cal.correct11(dp).z)
  353. if self._cal.isValid2Port():
  354. cal_s21 = []
  355. for f, s21_point in zip(self._f, self._s21):
  356. dp = Datapoint(f, s21_point.real, s21_point.imag)
  357. cal_s21.append(self._cal.correct21(dp).z)
  358. if cal_s11 == None:
  359. return self._f, None, None
  360. if cal_s21 == None:
  361. return self._f, cal_s11, None
  362. return self._f, cal_s11, cal_s21
  363. # def setTXPower(self, freq_range, power_desc):
  364. # if freq_range[0] != 140e6:
  365. # raise ValueError('Invalid TX power frequency range')
  366. # # 140MHz..max => ADF4350
  367. # self._set_register(0x42, _ADF4350_TXPOWER_DESC_REV_MAP[power_desc], 1)