1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- import logging
- from threading import Lock
- import serial
- logger = logging.getLogger(__name__)
def drain_serial(serial_port: serial.Serial):
    """Drain up to 64k outstanding data in the serial incoming buffer.

    The port's read timeout is temporarily lowered to 0.05 s, then data is
    read and discarded in ``read(128)`` calls until a read returns nothing
    or 512 reads (512 * 128 = 64 KiB) have been performed.

    The caller's original timeout is always restored before returning,
    including when a read raises. If the read budget is exhausted without
    ever seeing an empty read, a warning is logged.

    :param serial_port: object exposing a writable ``timeout`` attribute and
        a ``read(size)`` method returning a sized, falsy-when-empty value.
    """
    saved_timeout = serial_port.timeout
    serial_port.timeout = 0.05
    try:
        for _ in range(512):
            # An empty read means nothing arrived within the short timeout:
            # the incoming buffer is considered drained.
            if not serial_port.read(128):
                return
    finally:
        # Restore the caller's timeout on every exit path (drained, budget
        # exhausted, or exception) so the port is never left at 0.05 s.
        serial_port.timeout = saved_timeout
    logger.warning("unable to drain all data")
class Interface(serial.Serial):
    """A ``serial.Serial`` tagged with an interface type and a comment.

    Each instance also carries its own ``threading.Lock`` in ``self.lock``.
    """

    def __init__(self, interface_type: str, comment, *args, **kwargs):
        """Create the interface with the module's default serial settings.

        :param interface_type: one of ``'serial'``, ``'usb'``, ``'bt'`` or
            ``'network'``.
        :param comment: free-form description, shown by ``__str__``.
        :param args: forwarded unchanged to ``serial.Serial.__init__``.
        :param kwargs: forwarded unchanged to ``serial.Serial.__init__``.
        :raises ValueError: if ``interface_type`` is not a supported value.
        """
        # Validate explicitly (a bare ``assert`` disappears under ``-O``) and
        # before the base initializer runs, so a bad type never reaches it.
        if interface_type not in ('serial', 'usb', 'bt', 'network'):
            raise ValueError(
                f"unsupported interface type: {interface_type!r}"
            )
        super().__init__(*args, **kwargs)
        self.type = interface_type
        self.comment = comment
        # These assignments run after the base initializer, so they replace
        # any port / baudrate / timeout supplied through args or kwargs.
        self.port = None
        self.baudrate = 115200
        self.timeout = 0.05
        self.lock = Lock()

    def __str__(self):
        """Return the port followed by the comment in parentheses."""
        return f"{self.port} ({self.comment})"
|