wsc_tools.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import pyautogui as ue
  2. import serial
  3. import serial.tools.list_ports
  4. import logging
  5. import time
  6. class EncoderConfig():
  7. BAUD_RATE = 115200
  8. SET_AP_COMMAND = "AT+WA="
  9. CLEAR_APS_COMMAND = "AT+WC="
  10. LIST_APS_COMMAND = "AT+WL"
  11. def __init__(self):
  12. self.__ports = None
  13. self.__ser = None
  14. self.__port = None
  15. def setPort(self, port):
  16. if self.__ser != None:
  17. self.__ser.close()
  18. self.__port = port
  19. def close(self):
  20. if self.__ser != None:
  21. self.__ser.close()
  22. def connect(self, port=None):
  23. if port != None:
  24. self.port = port
  25. self.__ser = serial.Serial(port = self.port, baudrate=EncoderConfig.BAUD_RATE, timeout=3)
  26. #self.__ser.open()
  27. def reset(self):
  28. self.__ser.setDTR(value=1)
  29. time.sleep(0.5)
  30. self.__ser.setDTR(value=0)
  31. time.sleep(0.5)
  32. def getPort(self):
  33. return self.__port
  34. def getPorts(self):
  35. temp = []
  36. self.__ports = serial.tools.list_ports.comports()
  37. for port in self.__ports:
  38. temp.append(port[0])
  39. return temp
  40. def clearWifiAPs(self, port):
  41. self.connect(port)
  42. clear_str = "\n\r"
  43. clear_str = clear_str.encode('utf-8')
  44. self.__ser.write(clear_str)
  45. def setWifiAPs(self, aps, port):
  46. self.connect(port)
  47. clear_str = "\n\r"
  48. clear_str = clear_str.encode('utf-8')
  49. time.sleep(0.5)
  50. self.__ser.write(clear_str)
  51. time.sleep(0.5)
  52. factory_reset_str = "\n\rAT+FRST\n\r"
  53. factory_reset_str = factory_reset_str.encode('utf-8')
  54. self.__ser.write(factory_reset_str)
  55. time.sleep(0.5)
  56. for creds in aps:
  57. self.setWifiAP(creds[0], creds[1])
  58. time.sleep(1)
  59. self.__ser.write(clear_str)
  60. time.sleep(0.5)
  61. ## do hard reset
  62. self.reset()
  63. ##reset_str = "\n\rAT+RST\r\n"
  64. ##reset_str = reset_str.encode('utf-8')
  65. ##self.__ser.write(reset_str)
  66. self.__ser.close()
  67. def setWifiAP(self, ssid, password=None):
  68. if not self.__ser.isOpen():
  69. return
  70. command_str = EncoderConfig.SET_AP_COMMAND + ssid + "," + password + "\r\n"
  71. command_str = command_str.encode('utf-8')
  72. self.__ser.write(command_str)
  73. def clearWifiAPs(self, port):
  74. pass
  75. def getWifiConfig(self):
  76. pass
  77. def setBrokerConfig(self):
  78. pass
  79. def getBrokerConifg(self):
  80. pass
  81. class ParsingTools():
  82. def macToString(self, mac_bytes):
  83. ''' Convert uint8 byte string to "XX:XX:XX:XX:XX"
  84. '''
  85. mac_str = ""
  86. for byte in mac_bytes:
  87. mac_str = mac_str + format(byte, '02x') + ':'
  88. return mac_str[:-1].format('utf-8')
  89. def calculateVCell(self, raw_Vcell):
  90. raw_Vcell = int(raw_Vcell.decode('utf-8'))
  91. return raw_Vcell*1.25e-3
  92. def calculateSOC(self, raw_SOC):
  93. raw_SOC = int(raw_SOC.decode('utf-8'))
  94. soc = raw_SOC.to_bytes(2, 'big')[0]
  95. if (soc > 100):
  96. return 100
  97. else:
  98. return raw_SOC.to_bytes(2, 'big')[0]
  99. def getIDfromTopic(self, topic):
  100. return topic.split('/')[1]
  101. def getFieldfromTopic(self, topic):
  102. return topic.split('/')[3]
  103. def getStr(self, byteCode):
  104. return byteCode.decode('utf-8')
  105. class FieldGenerator():
  106. def generateMACID(self):
  107. import numpy as np
  108. macID = np.random.randint(255, size=5)
  109. macStr = ""
  110. for val in macID:
  111. macStr = macStr + format(val, 'x') + ":"
  112. return macStr[:-1]
  113. def generateRSSI(self):
  114. import numpy as np
  115. rssi = np.random.randint(80)
  116. rssiStr = "RSSI: -" + str(rssi) + "dBm"
  117. return rssiStr
  118. def generateSOC(self):
  119. import numpy as np
  120. soc = np.random.randint(100)
  121. socStr = "Battery: " + str(soc) + "%"
  122. return socStr
  123. def generateStatus(self):
  124. import numpy as np
  125. hr = np.random.randint(12) + 1
  126. min = np.random.randint(60)
  127. ampm = np.random.randint(1)
  128. statusStr = "Last Update: " + str(hr) + ":" + str(min)
  129. return statusStr
  130. class Switch():
  131. def __init__(self):
  132. self.__key = "1"
  133. self.__latch = False
  134. self.__latchState = 0
  135. self.__timer = 0
  136. self.__release = True
  137. self.__interval = 0.0 # milliseconds
  138. self.active = True
  139. def getKey(self):
  140. return self.__key
  141. def getActive(self):
  142. return self.active
  143. def setConfig(self, key, latch=False, interval=0.0, release=False, enable=True):
  144. # release old key if currently held down
  145. self.releaseKey(force=True)
  146. self.__latchState = 0
  147. self.active = enable
  148. self.__key = key
  149. self.__latch = latch
  150. self.__interval = interval
  151. self.__release = release
  152. def activate(self):
  153. self.active = True
  154. def deactivate(self):
  155. self.active = False
  156. def pressKey(self, force=False):
  157. if self.active or force:
  158. if self.__release:
  159. ue.keyDown(self.__key)
  160. else:
  161. ue.typewrite(self.__key, interval=self.__interval)
  162. def releaseKey(self, force=False):
  163. if self.active or force:
  164. if self.__release:
  165. ue.keyUp(self.__key)
  166. if __name__ == '__main__':
  167. print("I never developed self-test, but if I did they would go here.")
  168. pt = ParsingTools()
  169. print("Testing macToString")
  170. print(pt.macToString(b'023430'))
  171. print("Testing getModuleIDfromTopic")
  172. print(pt.getIDfromTopic("encoder/12:23:32:32:32/health/ota"))
  173. print("Testing getFieldfromTopic")
  174. print(pt.getFieldfromTopic("encoder/12:23:32:32:32/health/ota"))