wsc_tools.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import pyautogui as ue
  2. import serial
  3. import serial.tools.list_ports
  4. import logging
  5. import time
  6. from pyHS100 import SmartPlug, SmartBulb
  7. class EncoderConfig():
  8. BAUD_RATE = 115200
  9. SET_AP_COMMAND = "AT+WA="
  10. CLEAR_APS_COMMAND = "AT+WC="
  11. LIST_APS_COMMAND = "AT+WL"
  12. def __init__(self):
  13. self.__ports = None
  14. self.__ser = None
  15. self.__port = None
  16. def setPort(self, port):
  17. if self.__ser != None:
  18. self.__ser.close()
  19. self.__port = port
  20. def close(self):
  21. if self.__ser != None:
  22. self.__ser.close()
  23. def connect(self, port=None):
  24. if port != None:
  25. self.port = port
  26. self.__ser = serial.Serial(port = self.port, baudrate=EncoderConfig.BAUD_RATE, timeout=3)
  27. #self.__ser.open()
  28. def reset(self):
  29. self.__ser.setDTR(value=1)
  30. time.sleep(0.5)
  31. self.__ser.setDTR(value=0)
  32. time.sleep(0.5)
  33. def swReset(self):
  34. self.connect(port)
  35. clear_str = "\n\r"
  36. clear_str = clear_str.encode('utf-8')
  37. time.sleep(0.5)
  38. self.__ser.write(clear_str)
  39. time.sleep(0.5)
  40. def getPort(self):
  41. return self.__port
  42. def getPorts(self):
  43. temp = []
  44. self.__ports = serial.tools.list_ports.comports()
  45. for port in self.__ports:
  46. temp.append(port[0])
  47. return temp
  48. def clearWifiAPs(self, port):
  49. self.connect(port)
  50. clear_str = "\n\r"
  51. clear_str = clear_str.encode('utf-8')
  52. self.__ser.write(clear_str)
  53. def setWifiAPs(self, aps, port):
  54. MAX_APS = 1
  55. self.connect(port)
  56. clear_str = "\n\r"
  57. clear_str = clear_str.encode('utf-8')
  58. time.sleep(0.5)
  59. self.__ser.write(clear_str)
  60. time.sleep(0.5)
  61. self.setWifiAP(aps[0][0],aps[0][1])
  62. time.sleep(1)
  63. # for creds in aps[MAX_APS-1]:
  64. # self.setWifiAP(creds[0], creds[1])
  65. # time.sleep(1)
  66. self.__ser.write(clear_str)
  67. time.sleep(0.5)
  68. ## do hard reset
  69. self.reset()
  70. self.__ser.close()
  71. def setWifiAP(self, ssid, password=None):
  72. if not self.__ser.isOpen():
  73. return
  74. command_str = EncoderConfig.SET_AP_COMMAND + ssid + "," + password + "\r\n"
  75. command_str = command_str.encode('utf-8')
  76. self.__ser.write(command_str)
  77. def clearWifiAPs(self, port):
  78. pass
  79. def getWifiConfig(self):
  80. pass
  81. def setBrokerConfig(self):
  82. pass
  83. def getBrokerConifg(self):
  84. pass
  85. class ParsingTools():
  86. def macToString(self, mac_bytes):
  87. ''' Convert uint8 byte string to "XX:XX:XX:XX:XX"
  88. '''
  89. mac_str = ""
  90. for byte in mac_bytes:
  91. mac_str = mac_str + format(byte, '02x') + ':'
  92. return mac_str[:-1].format('utf-8')
  93. def calculateVCell(self, raw_Vcell):
  94. raw_Vcell = int(raw_Vcell.decode('utf-8'))
  95. return raw_Vcell*1.25e-3
  96. def calculateSOC(self, raw_SOC):
  97. raw_SOC = int(raw_SOC.decode('utf-8'))
  98. soc = raw_SOC.to_bytes(2, 'big')[0]
  99. if (soc > 100):
  100. return 100
  101. else:
  102. return raw_SOC.to_bytes(2, 'big')[0]
  103. def getIDfromTopic(self, topic):
  104. return topic.split('/')[1]
  105. def getFieldfromTopic(self, topic):
  106. return topic.split('/')[3]
  107. def getStr(self, byteCode):
  108. return byteCode.decode('utf-8')
  109. class FieldGenerator():
  110. def generateMACID(self):
  111. import numpy as np
  112. macID = np.random.randint(255, size=5)
  113. macStr = ""
  114. for val in macID:
  115. macStr = macStr + format(val, 'x') + ":"
  116. return macStr[:-1]
  117. def generateRSSI(self):
  118. import numpy as np
  119. rssi = np.random.randint(80)
  120. rssiStr = "RSSI: -" + str(rssi) + "dBm"
  121. return rssiStr
  122. def generateSOC(self):
  123. import numpy as np
  124. soc = np.random.randint(100)
  125. socStr = "Battery: " + str(soc) + "%"
  126. return socStr
  127. def generateStatus(self):
  128. import numpy as np
  129. hr = np.random.randint(12) + 1
  130. min = np.random.randint(60)
  131. ampm = np.random.randint(1)
  132. statusStr = "Last Update: " + str(hr) + ":" + str(min)
  133. return statusStr
  134. plug = SmartPlug('192.168.8.154')
  135. class Switch():
  136. def __init__(self):
  137. self.__key = "1"
  138. self.__latch = False
  139. self.__latchState = 0
  140. self.__timer = 0
  141. self.__release = True
  142. self.__interval = 0.0 # milliseconds
  143. self.active = True
  144. self.outletActive = False
  145. def getKey(self):
  146. return self.__key
  147. def getActive(self):
  148. return self.active
  149. def getOutletActive(self):
  150. return self.outletActive
  151. def setConfig(self, key, latch=False, interval=0.0, release=False, enable=True, outletEnable=False):
  152. # release old key if currently held down
  153. self.releaseKey(force=True)
  154. #self.turnOffOutlet(force=True)
  155. self.__latchState = 0
  156. self.active = enable
  157. self.outletActive = outletEnable
  158. self.__key = key
  159. self.__latch = latch
  160. self.__interval = interval
  161. self.__release = release
  162. def activateOutlet(self):
  163. self.outletActive = True
  164. def deactivateOutlet(self):
  165. self.outletActive = False
  166. def activate(self):
  167. self.active = True
  168. def deactivate(self):
  169. self.active = False
  170. def turnOnOutlet(self, force=False):
  171. if self.outletActive or force:
  172. try:
  173. plug.turn_on()
  174. except Exception as e:
  175. print("Error: Failed to access Wi-Fi Outlet")
  176. def turnOffOutlet(self, force=False):
  177. if self.outletActive or force:
  178. try:
  179. plug.turn_off()
  180. except Exception as e:
  181. print("Error: Failed to access Wi-Fi Outlet")
  182. def pressKey(self, force=False):
  183. if self.active or force:
  184. if self.__release:
  185. ue.keyDown(self.__key)
  186. else:
  187. ue.typewrite(self.__key, interval=self.__interval)
  188. def releaseKey(self, force=False):
  189. if self.active or force:
  190. if self.__release:
  191. ue.keyUp(self.__key)
  192. if __name__ == '__main__':
  193. print("I never developed self-test, but if I did they would go here.")
  194. pt = ParsingTools()
  195. print("Testing macToString")
  196. print(pt.macToString(b'023430'))
  197. print("Testing getModuleIDfromTopic")
  198. print(pt.getIDfromTopic("encoder/12:23:32:32:32/health/ota"))
  199. print("Testing getFieldfromTopic")
  200. print(pt.getFieldfromTopic("encoder/12:23:32:32:32/health/ota"))