123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- import logging
- import sys
- import time
- import sip
- from wsc_tools import ParsingTools
- from wsc_tools import Switch
- from PyQt5 import QtCore
- from PyQt5 import QtGui
- from PyQt5 import QtWidgets
- from pyqt.ideasxdevice import Ui_IdeasXDevice
- from pyqt.encoderconfigurationdialog import Ui_SwitchConfigDialog
- from pyqt.devicedialog import Ui_Dialog
- pt = ParsingTools()
- logging.basicConfig( level=logging.DEBUG)
- log = logging.getLogger("wsc_device_encoder")
- class Encoder():
- FAIL = 0
- SUCCESS = 1
- ALIVE_VALUE = b'1'
- ALIVE_TOPIC = "alive"
- DEVICE_HEALTH_TOPIC = "encoder/+/health/#"
- DEVICE_HEALTH_QOS = 0
- SHUTDOWN_COMMAND_TOPIC = "shutdown"
- RESTART_COMMAND_TOPIC = "restart"
- OTA_COMMAND_TOPIC = "ota"
- UART_TX_COMMAND_TOPIC = "uart"
- COMMAND_RETAIN = False
- COMMAND_QOS = 1
- DATA_BUTTON_QOS = 0
- def __init__(self, device_id, mqttc):
- self.__device_id = device_id
- self.__hw_version_default = b"0,0"
- self.__fw_version_default = b"0,0"
- self.__alive_default = b"0"
- self.__vcell_default = b"0"
- self.__charge_default = b"0"
- self.__lbi_default = b"0"
- self.__soc_default = b"0"
- self.__rom_default = b"0"
- self.__ota_default = b"0"
- self.__wireless_default = b"0"
- self.__ssid_default = b""
- self.__bssid_default = b""
- self.__rssi_default = b"0"
- self.__auth_default = b"0"
- self.__time_default = time.time()
- self.__fields = {"device_id": self.__device_id,
- "hw_ver": self.__hw_version_default,
- "fw_ver": self.__fw_version_default,
- "alive": self.__alive_default,
- "vcell": self.__vcell_default,
- "charge": self.__charge_default,
- "lbi": self.__lbi_default,
- "soc": self.__soc_default,
- "rom": self.__rom_default,
- "ota": self.__ota_default,
- "wireless": self.__wireless_default,
- "ssid": self.__ssid_default,
- "bssid": self.__bssid_default,
- "rssi": self.__rssi_default,
- "auth": self.__auth_default,
- "time": self.__time_default}
- self.__commands = {'update': self.update,
- 'restart': self.restart,
- 'shutdown': self.shutdown}
- self.__mqttc = mqttc
- self.switchOne = Switch()
- self.switchOne.setConfig("1")
- self.switchTwo = Switch()
- self.switchTwo.setConfig("2")
- self.switchAdaptive = Switch()
- self.RAW_COMMAND_TOPIC = "encoder/" + device_id + "/command/"
- self.RAW_DATA_TOPIC = "encoder/" + device_id + "/data/"
- self.__previous_button_payload = 0b0
- def getDeviceID(self):
- return self.__device_id
- def updateField(self, field, value):
- if field in self.__fields.keys():
- self.__fields[field] = value
- self.__fields['time'] = time.time()
- return Encoder.SUCCESS
- else:
- return Encoder.FAIL
-
- def listFieldNames(self):
- return self.__fields.keys()
-
- def listFields(self):
- return self.__fields
- def listCommandNames(self):
- return self.__commands.keys()
- def listCommands(self):
- return self.__commmands
-
- def getField(self, field):
- return self.__fields[field]
- def activate(self):
- self.__mqttc.subscribe(self.RAW_DATA_TOPIC + "button", qos=self.DATA_BUTTON_QOS)
- self.__mqttc.message_callback_add(self.RAW_DATA_TOPIC + "button", self.data_button_cb)
- log.info("activated device" + self.__device_id)
- def deactivate(self):
- self.__mqttc.unsubscribe(self.RAW_DATA_TOPIC + "button")
- log.info("deactivated device" + self.__device_id)
- # This needs to be modified for the following:
- # 1. only activate switch if there is change....i.e. don't activating switch 1 again which switch 1 is already
- # depressed and switch 2 is pressed
- # 2. don't be pushing mad buttons because shit is broken
- def data_button_cb(self, mqttc, backend_data, msg):
- topic = msg.topic
- payload = int.from_bytes(msg.payload, 'little')
- switchTwo = 0b100
- switchOne = 0b100000
-
- # press / release key
- if ((payload & switchOne) ^ (self.__previous_button_payload & switchOne)):
- print("switchOne Change")
- if (payload & switchOne):
- self.switchOne.releaseKey()
- else:
- print("Activated!")
- self.switchOne.pressKey()
- if ((payload & switchTwo) ^ (self.__previous_button_payload & switchTwo)):
- print("switchTwo Change")
- if (payload & switchTwo):
- self.switchTwo.releaseKey()
- else:
- print("Activated!")
- self.switchTwo.pressKey()
- self.__previous_button_payload = payload
- log.debug("encoder " + self.__device_id + " button payload: " + bin(payload))
- def update(self):
- self.__mqttc.publish(self.RAW_COMMAND_TOPIC+self.OTA_COMMAND_TOPIC, b'1', qos=self.COMMAND_QOS, retain=False)
- log.info("sent OTA command")
- def restart(self):
- self.__mqttc.publish(self.RAW_COMMAND_TOPIC+self.RESTART_COMMAND_TOPIC, b'1', qos=self.COMMAND_QOS, retain=False)
- log.info("sent restart command")
- def shutdown(self):
- self.__mqttc.publish(self.RAW_COMMAND_TOPIC+self.SHUTDOWN_COMMAND_TOPIC, b'1', qos=self.COMMAND_QOS, retain=False)
- log.info("sent shutdown command")
-
- def locate(self):
- self.__mqttc.publish(self.RAW_COMMAND_TOPIC+self.LOCATE_COMMAND_TOPIC, b'1', qos=self.COMMAND_QOS, retain=False)
- class EncoderUI(QtWidgets.QWidget):
- sendCommand = QtCore.pyqtSignal(['QString'], name='sendCommand')
- activateDevice = QtCore.pyqtSignal(['QString', 'QString'], name='deactivateDevice')
- deactivateDevice = QtCore.pyqtSignal(['QString', 'QString'], name='activateDevice')
- __pathToIcon = {'network': './icon/network/',
- 'battery': './icon/battery/',
- 'battery_charging': './icon/battery/',
- 'switch': './icon/switch/'
- }
- __icon = {'network': ['network-wireless-offline-symbolic.png',
- 'network-wireless-signal-weak-symbolic.png',
- 'network-wireless-signal-ok-symbolic.png',
- 'network-wireless-signal-good-symbolic.png',
- 'network-wireless-signal-excellent-symbolic.png'],
- 'battery': ['battery-empty-symbolic.png',
- 'battery-caution-symbolic.png',
- 'battery-low-symbolic.png',
- 'battery-good-symbolic.png',
- 'battery-full-symbolic.png'],
- 'battery_charging': ['battery-empty-charging-symbolic.png',
- 'battery-caution-charging-symbolic.png',
- 'battery-low-charging-symbolic.png',
- 'battery-good-charging-symbolic.png',
- 'battery-full-charged-symbolic.png'],
- 'switch': ['switch-one-enabled.png',
- 'switch-one-disabled.png',
- 'switch-two-enabled.png',
- 'switch-two-disabled.png',
- 'switch-adaptive-enabled.png',
- 'switch-adaptive-disabled.png']
- }
- __deviceType = 'encoder/'
- def __init__(self, encoder):
- self.__deviceName = None
- self.__encoder = encoder
- self.__org = 'IdeasX'
- self.__app = 'Workstation-Client'
- self.restoreSettings()
- # Setup UI components
- super(EncoderUI, self).__init__()
- self.__ui = Ui_IdeasXDevice()
- self.__ui.setupUi(self)
- self.updateDevice(encoder)
- self.updateSwitchIcons()
- # Setup Signals
- self.setupMenu()
- self.__ui.buttonActivate.clicked.connect(self.activateEncoder)
- self.__ui.buttonSwitchOne.clicked.connect(lambda: self.openSwitchDialog(self.__encoder.switchOne))
- self.__ui.buttonSwitchTwo.clicked.connect(lambda: self.openSwitchDialog(self.__encoder.switchTwo))
- #self.activateDevice.connect(self.__wsc.activateEncoder)
- #self.deactivateDevice.connect(self.__wsc.deactivateEncoder)
- def saveSettings(self, device_name):
- settings = QtCore.QSettings(self.__org, self.__app)
- settings.beginGroup(self.__encoder.getDeviceID())
- settings.setValue("name", device_name)
- settings.setValue("type", "encoder")
- settings.endGroup()
- self.setDeviceAlisas(device_name)
- def restoreSettings(self):
- settings = QtCore.QSettings(self.__org, self.__app)
- settings = QtCore.QSettings(self.__org, self.__app)
- settings.beginGroup(self.__encoder.getDeviceID())
- self.__deviceName = settings.value("name", self.__encoder.getDeviceID())
- settings.endGroup()
- def openDeviceInformation(self):
- dialog = InfoUI()
- dialog.updateDisplay(self.__encoder.listFields())
- dialog.newDeviceName.connect(self.saveSettings)
- dialog.exec()
- def openSwitchDialog(self, switch):
- dialog = SwitchUI(switch)
- if dialog.exec_():
- if dialog.key != None and len(dialog.key) == 1:
- switch.setConfig(dialog.key, latch=False,interval=0.0, release=False, enable=dialog.enable)
- self.updateSwitchIcons()
- def setupSwitchIcon(self, path):
- icon = QtGui.QIcon()
- iconPath = self.__pathToIcon['switch']
- iconPath = iconPath + path
- icon.addPixmap(QtGui.QPixmap(iconPath), QtGui.QIcon.Normal, QtGui.QIcon.Off)
- return icon
- def setupMenu(self):
- # create menu actions
- shutdownAction = QtWidgets.QAction('Shutdown', self)
- resetAction = QtWidgets.QAction("Reset", self)
- testKeysAction = QtWidgets.QAction("Test Keys", self)
- openInfoAction = QtWidgets.QAction("Information", self)
- startOTAAction = QtWidgets.QAction("OTA Update", self)
- # connect signals to funcitons
- #testKeysAction.triggered.connect(self.testKeys)
- openInfoAction.triggered.connect(self.openDeviceInformation)
- startOTAAction.triggered.connect(lambda: self.__encoder.update())
- shutdownAction.triggered.connect(lambda: self.__encoder.shutdown())
- resetAction.triggered.connect(lambda: self.__encoder.restart())
- # create menu options
- deviceMenu = QtWidgets.QMenu()
- deviceMenu.addAction(shutdownAction)
- deviceMenu.addAction(resetAction)
- deviceMenu.addAction(openInfoAction)
- deviceMenu.addSection("Engineering Tools")
- #deviceMenu.addAction(testKeysAction)
- #deviceMenu.addAction(startOTAAction)
- self.__ui.buttonMenu.setPopupMode(2)
- self.__ui.buttonMenu.setMenu(deviceMenu)
- self.__ui.buttonMenu.setStyleSheet("* { padding-right: 3px } QToolButton::menu-indicator { image: none }")
- def activateEncoder(self):
- if self.__ui.buttonActivate.text() == "Activate":
- self.__encoder.activate()
- self.__ui.buttonActivate.setText("Deactivate")
- else:
- self.__encoder.deactivate()
- self.__ui.buttonActivate.setText("Activate")
- def updateDevice(self, encoder):
- self.__encoder = encoder
- self.__rssi = encoder.getField('rssi')
- self.__soc = pt.calculateSOC(encoder.getField('soc'))
- self.__vcell = pt.calculateVCell(encoder.getField('vcell'))
- self.__strModuleID = encoder.getField('device_id')
- self.__updateTime = encoder.getField('time')
- self.__ota = encoder.getField('ota')
- if self.__deviceName == None:
- self.setModuleID(self.__strModuleID)
- else:
- self.setDeviceAlisas(self.__deviceName)
- self.setSOCIcon(self.__soc)
- self.setRSSIIcon(self.__rssi)
- self.setStatusTime(self.__updateTime)
- self.setOTAIcon(self.__ota)
- def setOTAIcon(self, ota):
- if ota == '1':
- self.__ui.labelOTA.show()
- else:
- self.__ui.labelOTA.hide()
- def setModuleID(self, strModuleID):
- self.__ui.labelModuleID.setText(strModuleID)
- def setDeviceAlisas(self, label):
- self.__deviceName = label
- if label != None or label != "":
- self.__ui.labelModuleID.setText(label)
- else:
- self.__ui.labelModuleID.setText(self.__strModuleID)
- def setSOCIcon(self, soc):
- soc = int(soc)
- if soc >= 75:
- batteryIcon = 4
- elif soc >= 50 and soc < 75:
- batteryIcon = 3
- elif soc >= 25 and soc < 50:
- batteryIcon = 2
- elif soc >=10 and soc < 25:
- batteryIcon = 1
- elif soc < 10:
- batteryIcon = 0
- batteryIcon = self.__pathToIcon['battery']+self.__icon['battery'][batteryIcon]
- self.__ui.labelBattery.setPixmap(QtGui.QPixmap(batteryIcon))
- self.__ui.labelBattery.setToolTip(str(soc) + "%")
- def setStatusTime(self, updateTime):
- lastUpdate = time.ctime(updateTime).replace(" ", " ").split(" ")
- currentTime = time.ctime().replace(" ", " ").split(" ")
- if currentTime[1] != lastUpdate[1] or currentTime[2] != lastUpdate[2] or currentTime[4] != lastUpdate[4]:
- lastUpdate = lastUpdate[1] + " " + lastUpdate[2] + " " + lastUpdate[4]
- else:
- lastUpdate = lastUpdate[3]
- self.__ui.labelStatus.setText("Last Update: " + lastUpdate)
- def setRSSIIcon(self, rssi):
- rssi = int(rssi)
- if rssi >= -50:
- rssiIcon = 4
- elif rssi >= -60 and rssi < -50:
- rssiIcon = 3
- elif rssi >= -70 and rssi < -60:
- rssiIcon = 2
- elif rssi < -70:
- rssiIcon = 1
- rssiIcon = self.__pathToIcon['network'] + self.__icon['network'][rssiIcon]
- self.__ui.labelSignal.setPixmap(QtGui.QPixmap(rssiIcon))
- self.__ui.labelSignal.setToolTip(str(rssi) + " dBm")
- def updateSwitchIcons(self):
- switch1 = self.__encoder.switchOne
- switch2 = self.__encoder.switchTwo
- switchA = self.__encoder.switchAdaptive
- switchA.deactivate()
- if switch1.active:
- iconPath = self.__icon['switch'][0]
- else:
- iconPath = self.__icon['switch'][1]
- self.__ui.buttonSwitchOne.setIcon(self.setupSwitchIcon(iconPath))
- if switch2.active:
- iconPath = self.__icon['switch'][2]
- else:
- iconPath = self.__icon['switch'][3]
- self.__ui.buttonSwitchTwo.setIcon(self.setupSwitchIcon(iconPath))
- if switchA.active:
- iconPath = self.__icon['switch'][4]
- else:
- iconPath = self.__icon['switch'][5]
- self.__ui.buttonSwitchAdaptive.setIcon(self.setupSwitchIcon(iconPath))
- def testKeys(self):
- time.sleep(3)
- for payload in [1, 0, 2, 0, 4, 0]:
- self.__wsc.keyEmulator.emulateKey(self.__strModuleID, payload)
- time.sleep(0.1)
- class SwitchUI(QtWidgets.QDialog):
- def __init__(self, switch):
- super(SwitchUI, self).__init__()
- self.__ui = Ui_SwitchConfigDialog()
- self.__ui.setupUi(self)
- self.__ui.buttonApply.clicked.connect(self.submitOnClose)
- self.key = switch.getKey()
- self.enable = switch.getActive()
- self.switch = switch
- self.__ui.checkSwitchEnable.setChecked(self.enable)
- self.__ui.lineSwitchKey.setText(self.key)
- def submitOnClose(self):
- self.key = self.__ui.lineSwitchKey.text()
- self.enable = self.__ui.checkSwitchEnable.isChecked()
- self.accept()
- class InfoUI(QtWidgets.QDialog):
- newDeviceName = QtCore.pyqtSignal(['QString'], name='newDeviceName')
- def __init__(self):
- super(InfoUI, self).__init__()
- self.__ui = Ui_Dialog()
- self.__ui.setupUi(self)
- self.__ui.lineAlias.textEdited.connect(lambda: self.newDeviceName.emit(self.__ui.lineAlias.text()))
- def updateDisplay(self, encoder):
- self.__ui.labelBatteryCapacity.setText(encoder['soc'].decode('utf-8'))
- self.__ui.labelBatteryVoltage.setText(str(encoder['vcell'].decode('utf-8')))
- self.__ui.labelLowBattery.setText(str(encoder['lbi'].decode('utf-8')))
- self.__ui.labelChargeState.setText('N/A')
- self.__ui.labelActiveFlag.setText('N/A')
- self.__ui.labelAliveFlag.setText(str(encoder['alive'].decode('utf-8')))
- self.__ui.labelFirmwareVersion.setText(str(encoder['fw_ver'].decode('utf-8')))
- self.__ui.labelHardwareVersion.setText(str(encoder['hw_ver'].decode('utf-8')))
- self.__ui.labelOTAFlag.setText(str(encoder['ota'].decode('utf-8')))
- self.__ui.labelROMSlot.setText(str(encoder['rom'].decode('utf-8')))
- self.__ui.labelMAC.setText(str(encoder['device_id']))
- self.__ui.labelRSSI.setText(str(encoder['rssi'].decode('utf-8')))
- self.__ui.labelSSID.setText(str(encoder['ssid'].decode('utf-8')))
- if __name__ == "__main__":
- deviceID = '23:45:21:23:32'
-
- encoder = Encoder(deviceID, None)
- print(pt.calculateSOC(encoder.getField('soc')))
- app = QtWidgets.QApplication(sys.argv)
- encoderUI = EncoderUI(encoder)
-
- encoderUI.show()
- sys.exit(app.exec_())
|