wsc_tools.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import pyautogui as ue
  2. import serial
  3. import serial.tools.list_ports
  4. import logging
  5. import time
  6. class EncoderConfig():
  7. BAUD_RATE = 115200
  8. SET_AP_COMMAND = "AT+WA="
  9. CLEAR_APS_COMMAND = "AT+WC="
  10. LIST_APS_COMMAND = "AT+WL"
  11. def __init__(self):
  12. self.__ports = None
  13. self.__ser = None
  14. self.__port = None
  15. def setPort(self, port):
  16. if self.__ser != None:
  17. self.__ser.close()
  18. self.__port = port
  19. def close(self):
  20. if self.__ser != None:
  21. self.__ser.close()
  22. def connect(self, port=None):
  23. if port != None:
  24. self.port = port
  25. self.__ser = serial.Serial(port = self.port, baudrate=EncoderConfig.BAUD_RATE, timeout=3)
  26. #self.__ser.open()
  27. def reset(self):
  28. self.__ser.setDTR(value=1)
  29. time.sleep(0.5)
  30. self.__ser.setDTR(value=0)
  31. time.sleep(0.5)
  32. def getPort(self):
  33. return self.__port
  34. def getPorts(self):
  35. temp = []
  36. self.__ports = serial.tools.list_ports.comports()
  37. for port in self.__ports:
  38. temp.append(port[0])
  39. return temp
  40. def clearWifiAPs(self, port):
  41. self.connect(port)
  42. clear_str = "\n\r"
  43. clear_str = clear_str.encode('utf-8')
  44. self.__ser.write(clear_str)
  45. def setWifiAPs(self, aps, port):
  46. self.connect(port)
  47. clear_str = "\n\r"
  48. clear_str = clear_str.encode('utf-8')
  49. time.sleep(0.5)
  50. self.__ser.write(clear_str)
  51. time.sleep(0.5)
  52. for creds in aps:
  53. self.setWifiAP(creds[0], creds[1])
  54. time.sleep(1)
  55. self.__ser.write(clear_str)
  56. time.sleep(0.5)
  57. ## do hard reset
  58. self.reset()
  59. ##reset_str = "\n\rAT+RST\r\n"
  60. ##reset_str = reset_str.encode('utf-8')
  61. ##self.__ser.write(reset_str)
  62. self.__ser.close()
  63. def setWifiAP(self, ssid, password=None):
  64. if not self.__ser.isOpen():
  65. return
  66. command_str = EncoderConfig.SET_AP_COMMAND + ssid + "," + password + "\r\n"
  67. command_str = command_str.encode('utf-8')
  68. self.__ser.write(command_str)
  69. def clearWifiAPs(self, port):
  70. pass
  71. def getWifiConfig(self):
  72. pass
  73. def setBrokerConfig(self):
  74. pass
  75. def getBrokerConifg(self):
  76. pass
  77. class ParsingTools():
  78. def macToString(self, mac_bytes):
  79. ''' Convert uint8 byte string to "XX:XX:XX:XX:XX"
  80. '''
  81. mac_str = ""
  82. for byte in mac_bytes:
  83. mac_str = mac_str + format(byte, '02x') + ':'
  84. return mac_str[:-1].format('utf-8')
  85. def calculateVCell(self, raw_Vcell):
  86. raw_Vcell = int(raw_Vcell.decode('utf-8'))
  87. return raw_Vcell*1.25e-3
  88. def calculateSOC(self, raw_SOC):
  89. raw_SOC = int(raw_SOC.decode('utf-8'))
  90. soc = raw_SOC.to_bytes(2, 'big')[0]
  91. if (soc > 100):
  92. return 100
  93. else:
  94. return raw_SOC.to_bytes(2, 'big')[0]
  95. def getIDfromTopic(self, topic):
  96. return topic.split('/')[1]
  97. def getFieldfromTopic(self, topic):
  98. return topic.split('/')[3]
  99. def getStr(self, byteCode):
  100. return byteCode.decode('utf-8')
  101. class FieldGenerator():
  102. def generateMACID(self):
  103. import numpy as np
  104. macID = np.random.randint(255, size=5)
  105. macStr = ""
  106. for val in macID:
  107. macStr = macStr + format(val, 'x') + ":"
  108. return macStr[:-1]
  109. def generateRSSI(self):
  110. import numpy as np
  111. rssi = np.random.randint(80)
  112. rssiStr = "RSSI: -" + str(rssi) + "dBm"
  113. return rssiStr
  114. def generateSOC(self):
  115. import numpy as np
  116. soc = np.random.randint(100)
  117. socStr = "Battery: " + str(soc) + "%"
  118. return socStr
  119. def generateStatus(self):
  120. import numpy as np
  121. hr = np.random.randint(12) + 1
  122. min = np.random.randint(60)
  123. ampm = np.random.randint(1)
  124. statusStr = "Last Update: " + str(hr) + ":" + str(min)
  125. return statusStr
  126. class Switch():
  127. def __init__(self):
  128. self.__key = "1"
  129. self.__latch = False
  130. self.__latchState = 0
  131. self.__timer = 0
  132. self.__release = True
  133. self.__interval = 0.0 # milliseconds
  134. self.active = True
  135. def getKey(self):
  136. return self.__key
  137. def getActive(self):
  138. return self.active
  139. def setConfig(self, key, latch=False, interval=0.0, release=False, enable=True):
  140. # release old key if currently held down
  141. self.releaseKey(force=True)
  142. self.__latchState = 0
  143. self.active = enable
  144. self.__key = key
  145. self.__latch = latch
  146. self.__interval = interval
  147. self.__release = release
  148. def activate(self):
  149. self.active = True
  150. def deactivate(self):
  151. self.active = False
  152. def pressKey(self, force=False):
  153. if self.active or force:
  154. if self.__release:
  155. ue.keyDown(self.__key)
  156. else:
  157. ue.typewrite(self.__key, interval=self.__interval)
  158. def releaseKey(self, force=False):
  159. if self.active or force:
  160. if self.__release:
  161. ue.keyUp(self.__key)
  162. if __name__ == '__main__':
  163. print("I never developed self-test, but if I did they would go here.")
  164. pt = ParsingTools()
  165. print("Testing macToString")
  166. print(pt.macToString(b'023430'))
  167. print("Testing getModuleIDfromTopic")
  168. print(pt.getIDfromTopic("encoder/12:23:32:32:32/health/ota"))
  169. print("Testing getFieldfromTopic")
  170. print(pt.getFieldfromTopic("encoder/12:23:32:32:32/health/ota"))