# -*- coding: utf-8 -*-
# (c) Copyright 2019 Sensirion AG, Switzerland
from __future__ import absolute_import, division, print_function
from .errors import ShdlcTimeoutError
from .serial_frame_builder import ShdlcSerialMosiFrameBuilder, \
ShdlcSerialMisoFrameBuilder
from threading import RLock
import serial
import socket
import time
import logging
log = logging.getLogger(__name__)
[docs]class ShdlcPort(object):
"""
Common interface for all communication ports for transceiving SHDLC frames.
Concrete implementations may use the serial port or another interface for
transceiving SHDLC frames. All methods must be implemented thread-safe,
i.e. allowing them to be called from multiple threads at the same time.
"""
@property
def description(self):
"""
Get a description of the port.
:return: Description string.
:rtype: string
"""
raise NotImplementedError()
@property
def bitrate(self):
"""
The current bitrate in bit/s.
:type: int
"""
raise NotImplementedError()
@bitrate.setter
def bitrate(self, bitrate):
raise NotImplementedError()
@property
def lock(self):
"""
Get the lock object of the port to allow locking it, i.e. to get
exclusive access across multiple method calls.
:return: The lock object.
:rtype: threading.RLock
"""
raise NotImplementedError()
@property
def is_open(self):
"""
Indicates whether the port is open.
:return: If ``True`` the port is open, if ``False`` the port is closed.
:rtype: bool
"""
raise NotImplementedError()
[docs] def open(self):
"""
Open the port. Only needs to be called if the port is not already
opened. Does nothing if the port is already opened.
"""
raise NotImplementedError()
[docs] def close(self):
"""
Close the port to release the underlying resources. Does nothing if
the port is already closed.
"""
raise NotImplementedError()
[docs] def transceive(self, slave_address, command_id, data, response_timeout):
"""
Send SHDLC frame to port and return received response frame.
.. note:: The specified response timeout defines the maximum time the
device needs until it starts to send the response after
receiving the last byte from the master request. The time
needed for the transmission itself and other possible
overhead or delays depends on hardware, drivers, bitrate etc.
and must be taken into account in the implementation of this
method.
:param byte slave_address: Slave address.
:param byte command_id: SHDLC command ID.
:param bytes-like data: Payload.
:param float response_timeout: Response timeout in seconds (maximum
time until the first byte is received).
:return: Received address, command_id, state, and payload.
:rtype: byte, byte, byte, bytes
:raise ~sensirion_shdlc_driver.errors.ShdlcTimeoutError:
If no response received within timeout.
:raise ~sensirion_shdlc_driver.errors.ShdlcResponseError:
If the received response is invalid.
"""
raise NotImplementedError()
[docs]class ShdlcSerialPort(ShdlcPort):
"""
SHDLC transceiver for the serial port (e.g. UART/RS232/RS485).
This class implements the ShdlcPort interface for the serial port.
.. note:: This class can be used in a "with"-statement, and it's
recommended to do so as it automatically closes the port after
using it.
"""
[docs] def __init__(self, port, baudrate, additional_response_time=0.1,
do_open=True):
"""
Create and optionally open a serial port. Throws an exception if the
port cannot be opened.
:param string port: The serial port (e.g. "COM2" or "/dev/ttyUSB0")
:param int baudrate: The baudrate in bit/s.
:param float additional_response_time: Additional response time (in
Seconds) used when receiving frames. See property
:py:attr:`~sensirion_shdlc_driver.port.ShdlcSerialPort.additional_response_time`
for details. Defaults to 0.1 (i.e. 100ms) which should be enough
in most cases.
:param bool do_open:
Whether the serial port should be opened immediately or not.
If ``False``, you will have to call
:py:meth:`~sensirion_shdlc_driver.port.ShdlcSerialPort.open`
manually before using this object. Defaults to ``True``.
"""
super(ShdlcSerialPort, self).__init__()
log.debug("Open ShdlcSerialPort on '{}' with {} bit/s."
.format(port, baudrate))
self._additional_response_time = float(additional_response_time)
self._lock = RLock()
self._serial = serial.Serial(port=None, baudrate=baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.01, xonxoff=False)
self._serial.port = port
if do_open:
self.open()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@property
def description(self):
"""
Get a description of the port.
:return: Description string.
:rtype: string
"""
with self._lock:
return self._serial.name + '@' + str(self._serial.baudrate)
@property
def bitrate(self):
"""
The current bitrate in bit/s.
:type: int
"""
with self._lock:
return self._serial.baudrate
@bitrate.setter
def bitrate(self, bitrate):
with self._lock:
self._serial.baudrate = bitrate
@property
def additional_response_time(self):
"""
The additional response time (in Seconds) used when receiving frames.
Since the timeout measurement of serial communication is typically
very inaccurate (e.g. USB-UART converter drivers often buffer I/O
data for 16ms), this class adds some extra time to the specified
response timeout to avoid timeout errors even if the device responded
within the given timeout. If needed, this extra time can be changed
either with this property, or with the parameter
``additional_response_time`` of
:py:meth:`~sensirion_shdlc_driver.port.ShdlcSerialPort.__init__`.
:type: float
"""
with self._lock:
return self._additional_response_time
@additional_response_time.setter
def additional_response_time(self, additional_response_time):
with self._lock:
self._additional_response_time = float(additional_response_time)
@property
def lock(self):
"""
Get the lock object of the port to allow locking it, i.e. to get
exclusive access across multiple method calls.
:return: The lock object.
:rtype: threading.RLock
"""
return self._lock
@property
def is_open(self):
"""
Indicates whether the port is open.
:return: If ``True`` the port is open, if ``False`` the port is closed.
:rtype: bool
"""
return self._serial.is_open
[docs] def open(self):
"""
Open the serial port (only needs to be called if ``do_open`` in
:py:meth:`~sensirion_shdlc_driver.port.ShdlcSerialPort.__init__`
was set to ``False``). Does nothing if the port is already opened.
"""
if self._serial.is_open is False:
self._serial.open()
[docs] def close(self):
"""
Close (release) the serial port. Does nothing if the port is already
closed.
"""
if self._serial.is_open is True:
self._serial.close()
[docs] def transceive(self, slave_address, command_id, data, response_timeout):
"""
Send SHDLC frame to port and return received response frame.
:param byte slave_address: Slave address.
:param byte command_id: SHDLC command ID.
:param bytes-like data: Payload.
:param float response_timeout: Response timeout in seconds (maximum
time until the first byte is received).
:return: Received address, command_id, state, and payload.
:rtype: byte, byte, byte, bytes
:raise ~sensirion_shdlc_driver.errors.ShdlcTimeoutError:
If no response received within timeout.
:raise ~sensirion_shdlc_driver.errors.ShdlcResponseError:
If the received response is invalid.
"""
with self._lock:
self._serial.flushInput()
self._send_frame(slave_address, command_id, data)
self._serial.flush()
return self._receive_frame(response_timeout)
def _send_frame(self, slave_address, command_id, data):
"""
Send a frame to the serial port.
:param byte slave_address: Slave address.
:param byte command_id: SHDLC command ID.
:param bytes-like data: Payload.
"""
builder = ShdlcSerialMosiFrameBuilder(slave_address, command_id, data)
tx_data = builder.to_bytes()
log.debug("ShdlcSerialPort send raw: [{}]".format(
", ".join(["0x%.2X" % i for i in bytearray(tx_data)])))
self._serial.write(tx_data)
def _receive_frame(self, response_timeout):
"""
Wait for the response frame and return it.
:param float response_timeout: Response timeout in seconds (maximum
time until the first byte is received).
:return: Received address, command_id, state, and payload.
:rtype: byte, byte, byte, bytes
"""
start_time = time.time()
response_timeout += self._additional_response_time # add extra time
total_timeout = response_timeout + self._calculate_maximum_frame_time()
builder = ShdlcSerialMisoFrameBuilder()
while True:
# Fetch all received bytes at once (to get maximum performance) or
# wait for at least one byte (with timeout) if the buffer is empty.
new_data = self._serial.read(max(self._serial.inWaiting(), 1))
# Process received data and return if the frame is complete.
if builder.add_data(new_data):
log.debug("ShdlcSerialPort received raw: [{}]".format(
", ".join(["0x%.2X" % i for i in builder.data])))
return builder.interpret_data()
# Frame not (completely) received yet, check timeout conditions.
elapsed_time = time.time() - start_time
timeout = \
total_timeout if builder.start_received else response_timeout
if elapsed_time > timeout:
log.warning("ShdlcSerialPort timed out while waiting for "
"response after {:.0f} ms.".format(
elapsed_time * 1000.0))
log.debug("ShdlcSerialPort received raw until timeout: [{}]"
.format(", ".join(["0x%.2X" % i
for i in builder.data])))
raise ShdlcTimeoutError()
def _calculate_maximum_frame_time(self):
"""
Calculate the time required for receiving the longest possible frame,
respecting the used bitrate and with some extra time for inter-byte
spaces etc.
:return: Maximum frame time in Seconds
:rtype: float
"""
# Calculate theoretical transmission time of longest possible frame:
# 600 bytes * (start bit + 8 data bits + stop bit) / bitrate
max_frame_time = (600.0 * 10.0) / self.bitrate
# Add 200ms extra, e.g. for inter-byte spaces.
return max_frame_time + 0.2
[docs]class ShdlcTcpPort(ShdlcPort):
"""
SHDLC transceiver for a TCP/IP port in client connection mode.
This class implements the ShdlcPort interface for a client connection on a
TCP/IP port.
.. note:: This class can be used in a "with"-statement, and it's
recommended to do so as it automatically closes the port after
using it.
"""
[docs] def __init__(self, ip, port, socket_timeout=5.0, do_open=True):
"""
Create and optionally open a TCP socket. Throws an exception if the
socket cannot be opened.
:param string ip: The IP address (e.g. "192.168.100.200").
:param int port: The TCP port.
:param float socket_timeout: General TCP socket base timeout. Upon data
transmission, the socket timeout is adjusted for each command, i.e.
the timeout is increased with the parameter ``response_timeout`` of
:py:meth:`~sensirion_shdlc_driver.port.ShdlcTcpPort.transceive`.
:param bool do_open:
Whether the port should be opened immediately or not. If ``False``,
you will have to call
:py:meth:`~sensirion_shdlc_driver.port.ShdlcTcpPort.open`
manually before using this object. Defaults to ``True``.
"""
super(ShdlcTcpPort, self).__init__()
log.debug("Open ShdlcTcpPort as TCP client to '{}' on port {}."
.format(ip, port))
self._ip = str(ip)
self._port = int(port)
self._socket_timeout = float(socket_timeout)
self._is_open = False
self._lock = RLock()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self._socket_timeout)
if do_open:
self.open()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@property
def description(self):
"""
Get the description of the TCP socket (address and port).
:return: Description string.
:rtype: string
"""
with self._lock:
return '{}:{}'.format(self._ip, self._port)
@property
def socket_timeout(self):
"""
The base timeout of the TCP socket (in seconds) during transmission.
The actual socket timeout is adjusted for each command. There are
commands (e.g. flash erase) which require several seconds to be
successfully executed. Therefore, the actual socket timeout value
is calculated by the sum of the base timeout, plus the parameter
``response_timeout`` of
:py:meth:`~sensirion_shdlc_driver.port.ShdlcTcpPort.transceive`.
:type: float
"""
with self._lock:
return self._socket_timeout
@socket_timeout.setter
def socket_timeout(self, socket_timeout):
with self._lock:
self._socket_timeout = float(socket_timeout)
@property
def lock(self):
"""
Get the lock object of the port to allow locking it, i.e. to get
exclusive access across multiple method calls.
:return: The lock object.
:rtype: threading.RLock
"""
return self._lock
@property
def is_open(self):
"""
Indicates whether the port is open.
:return: If ``True`` the port is open, if ``False`` the port is closed.
:rtype: bool
"""
return self._is_open
[docs] def open(self):
"""
Open the TCP socket (only needs to be called if ``do_open`` in
:py:meth:`~sensirion_shdlc_driver.port.ShdlcSerialPort.__init__`
was set to ``False``). Does nothing if the socket is already opened.
"""
if self._is_open is False:
self._socket.connect((self._ip, self._port))
self._is_open = True
[docs] def close(self):
"""
Close the TCP socket. Does nothing if the socket is already closed.
"""
if self._is_open is True:
self._socket.close()
self._is_open = False
[docs] def transceive(self, slave_address, command_id, data, response_timeout):
"""
Send SHDLC frame to the TCP socket and return received response frame.
:param byte slave_address: Slave address.
:param byte command_id: SHDLC command ID.
:param bytes-like data: Payload.
:param float response_timeout: Response timeout in seconds. The actual
command response timeout is defined by the sum of this parameter
and the socket base timeout.
:return: Received address, command_id, state, and payload.
:rtype: byte, byte, byte, bytes
:raise ~sensirion_shdlc_driver.errors.ShdlcTimeoutError:
If no response received within timeout.
:raise ~sensirion_shdlc_driver.errors.ShdlcResponseError:
If the received response is invalid.
"""
with self._lock:
self._socket.settimeout(self._socket_timeout + response_timeout)
self._send_frame(slave_address, command_id, data)
return self._receive_frame()
def _send_frame(self, slave_address, command_id, data):
"""
Send a frame to the TCP socket.
:param byte slave_address: Slave address.
:param byte command_id: SHDLC command ID.
:param bytes-like data: Payload.
"""
builder = ShdlcSerialMosiFrameBuilder(slave_address, command_id, data)
tx_data = builder.to_bytes()
log.debug("ShdlcTcpPort send raw: [{}]".format(
", ".join(["0x%.2X" % i for i in bytearray(tx_data)])))
self._socket.send(tx_data)
def _receive_frame(self):
"""
Wait for the response frame and return it.
:return: Received address, command_id, state, and payload.
:rtype: byte, byte, byte, bytes
"""
builder = ShdlcSerialMisoFrameBuilder()
try:
while True:
# Receive data from socket
# Note: recv buffer size should be a relatively small power
# of 2. See: https://docs.python.org/3/library/socket.html
new_data = self._socket.recv(1024)
if len(new_data) == 0:
raise ShdlcTimeoutError()
# Process received data and return if the frame is complete
if builder.add_data(new_data):
log.debug("ShdlcTcpPort received raw: [{}]".format(
", ".join(["0x%.2X" % i for i in builder.data])))
return builder.interpret_data()
except socket.timeout:
raise ShdlcTimeoutError()