# wsc_tools.py
  1. import pyautogui as ue
  2. import serial
  3. import serial.tools.list_ports
  4. import logging
  5. import time
  6. class EncoderConfig():
  7. BAUD_RATE = 115200
  8. SET_AP_COMMAND = "AT+WA="
  9. CLEAR_APS_COMMAND = "AT+WC="
  10. LIST_APS_COMMAND = "AT+WL"
  11. def __init__(self):
  12. self.__ports = None
  13. self.__ser = None
  14. self.__port = None
  15. def setPort(self, port):
  16. if self.__ser != None:
  17. self.__ser.close()
  18. self.__port = port
  19. def close(self):
  20. if self.__ser != None:
  21. self.__ser.close()
  22. def connect(self, port=None):
  23. if port != None:
  24. self.port = port
  25. self.__ser = serial.Serial(port = self.port, baudrate=EncoderConfig.BAUD_RATE, timeout=3)
  26. #self.__ser.open()
  27. def reset(self):
  28. self.__ser.setDTR(value=1)
  29. time.sleep(0.5)
  30. self.__ser.setDTR(value=0)
  31. time.sleep(0.5)
  32. def swReset(self):
  33. self.connect(port)
  34. clear_str = "\n\r"
  35. clear_str = clear_str.encode('utf-8')
  36. time.sleep(0.5)
  37. self.__ser.write(clear_str)
  38. time.sleep(0.5)
  39. def getPort(self):
  40. return self.__port
  41. def getPorts(self):
  42. temp = []
  43. self.__ports = serial.tools.list_ports.comports()
  44. for port in self.__ports:
  45. temp.append(port[0])
  46. return temp
  47. def clearWifiAPs(self, port):
  48. self.connect(port)
  49. clear_str = "\n\r"
  50. clear_str = clear_str.encode('utf-8')
  51. self.__ser.write(clear_str)
  52. def setWifiAPs(self, aps, port):
  53. MAX_APS = 1
  54. self.connect(port)
  55. clear_str = "\n\r"
  56. clear_str = clear_str.encode('utf-8')
  57. time.sleep(0.5)
  58. self.__ser.write(clear_str)
  59. time.sleep(0.5)
  60. self.setWifiAP(aps[0][0],aps[0][1])
  61. time.sleep(1)
  62. # for creds in aps[MAX_APS-1]:
  63. # self.setWifiAP(creds[0], creds[1])
  64. # time.sleep(1)
  65. self.__ser.write(clear_str)
  66. time.sleep(0.5)
  67. ## do hard reset
  68. self.reset()
  69. self.__ser.close()
  70. def setWifiAP(self, ssid, password=None):
  71. if not self.__ser.isOpen():
  72. return
  73. command_str = EncoderConfig.SET_AP_COMMAND + ssid + "," + password + "\r\n"
  74. command_str = command_str.encode('utf-8')
  75. self.__ser.write(command_str)
  76. def clearWifiAPs(self, port):
  77. pass
  78. def getWifiConfig(self):
  79. pass
  80. def setBrokerConfig(self):
  81. pass
  82. def getBrokerConifg(self):
  83. pass
  84. class ParsingTools():
  85. def macToString(self, mac_bytes):
  86. ''' Convert uint8 byte string to "XX:XX:XX:XX:XX"
  87. '''
  88. mac_str = ""
  89. for byte in mac_bytes:
  90. mac_str = mac_str + format(byte, '02x') + ':'
  91. return mac_str[:-1].format('utf-8')
  92. def calculateVCell(self, raw_Vcell):
  93. raw_Vcell = int(raw_Vcell.decode('utf-8'))
  94. return raw_Vcell*1.25e-3
  95. def calculateSOC(self, raw_SOC):
  96. raw_SOC = int(raw_SOC.decode('utf-8'))
  97. soc = raw_SOC.to_bytes(2, 'big')[0]
  98. if (soc > 100):
  99. return 100
  100. else:
  101. return raw_SOC.to_bytes(2, 'big')[0]
  102. def getIDfromTopic(self, topic):
  103. return topic.split('/')[1]
  104. def getFieldfromTopic(self, topic):
  105. return topic.split('/')[3]
  106. def getStr(self, byteCode):
  107. return byteCode.decode('utf-8')
  108. class FieldGenerator():
  109. def generateMACID(self):
  110. import numpy as np
  111. macID = np.random.randint(255, size=5)
  112. macStr = ""
  113. for val in macID:
  114. macStr = macStr + format(val, 'x') + ":"
  115. return macStr[:-1]
  116. def generateRSSI(self):
  117. import numpy as np
  118. rssi = np.random.randint(80)
  119. rssiStr = "RSSI: -" + str(rssi) + "dBm"
  120. return rssiStr
  121. def generateSOC(self):
  122. import numpy as np
  123. soc = np.random.randint(100)
  124. socStr = "Battery: " + str(soc) + "%"
  125. return socStr
  126. def generateStatus(self):
  127. import numpy as np
  128. hr = np.random.randint(12) + 1
  129. min = np.random.randint(60)
  130. ampm = np.random.randint(1)
  131. statusStr = "Last Update: " + str(hr) + ":" + str(min)
  132. return statusStr
  133. class Switch():
  134. def __init__(self):
  135. self.__key = "1"
  136. self.__latch = False
  137. self.__latchState = 0
  138. self.__timer = 0
  139. self.__release = True
  140. self.__interval = 0.0 # milliseconds
  141. self.active = True
  142. def getKey(self):
  143. return self.__key
  144. def getActive(self):
  145. return self.active
  146. def setConfig(self, key, latch=False, interval=0.0, release=False, enable=True):
  147. # release old key if currently held down
  148. self.releaseKey(force=True)
  149. self.__latchState = 0
  150. self.active = enable
  151. self.__key = key
  152. self.__latch = latch
  153. self.__interval = interval
  154. self.__release = release
  155. def activate(self):
  156. self.active = True
  157. def deactivate(self):
  158. self.active = False
  159. def pressKey(self, force=False):
  160. if self.active or force:
  161. if self.__release:
  162. ue.keyDown(self.__key)
  163. else:
  164. ue.typewrite(self.__key, interval=self.__interval)
  165. def releaseKey(self, force=False):
  166. if self.active or force:
  167. if self.__release:
  168. ue.keyUp(self.__key)
  169. if __name__ == '__main__':
  170. print("I never developed self-test, but if I did they would go here.")
  171. pt = ParsingTools()
  172. print("Testing macToString")
  173. print(pt.macToString(b'023430'))
  174. print("Testing getModuleIDfromTopic")
  175. print(pt.getIDfromTopic("encoder/12:23:32:32:32/health/ota"))
  176. print("Testing getFieldfromTopic")
  177. print(pt.getFieldfromTopic("encoder/12:23:32:32:32/health/ota"))