Source code for sensirion_shdlc_driver.connection

# -*- coding: utf-8 -*-
# (c) Copyright 2019 Sensirion AG, Switzerland

from __future__ import absolute_import, division, print_function
from .errors import ShdlcResponseError, ShdlcDeviceError
import time

import logging
log = logging.getLogger(__name__)


[docs]class ShdlcConnection(object): """ This class represents the connection to an SHDLC bus. So you need to instantiate one object per bus, no matter how many devices are connected to that bus system. The basic functionality of the class is to send SHDLC frames to devices and receive their response. Handling of communication errors (e.g. timeout or checksum errors) and device errors is done in this class. """
[docs] def __init__(self, port): """ Open an SHDLC connection on a specific port. .. note:: This constructor does not send or receive any data to resp. from the specified port. :param ~sensirion_shdlc_driver.port.ShdlcPort port: The port used for communication (must implement the :py:class:`~sensirion_shdlc_driver.port.ShdlcPort` interface) """ super(ShdlcConnection, self).__init__() self._port = port log.debug("Opened ShdlcConnection on '{}'.".format(port.description))
@property def port(self): """ Get the underlying port. :return: An :py:class:`~sensirion_shdlc_driver.port.ShdlcPort` object. :rtype: ~sensirion_shdlc_driver.port.ShdlcPort """ return self._port
[docs] def execute(self, slave_address, command, wait_post_process=True): """ Execute an ShdlcCommand and return the interpreted response. Executing a command means: - Send request (MOSI) frame - Receive response (MISO) frame - Validate and interpret response data - Wait until post processing is done (optional, and only if needed) :param byte slave_address: Slave address. :param ~sensirion_shdlc_driver.command.ShdlcCommand command: SHDLC command to execute. :param bool wait_post_process: If true and the command needs some time for post processing, this thread blocks until post processing is done. :return: Received response (interpreted) and error state flag. :rtype: object, bool """ data, error = self.transceive(slave_address, command.id, command.data, command.max_response_time) if wait_post_process and command.post_processing_time > 0.0: # Wait for post processing in the device (to be sure the device is # ready for receiving the next command). time.sleep(command.post_processing_time) command.check_response_length(data) # Raises if length was wrong return command.interpret_response(data), error
[docs] def transceive(self, slave_address, command_id, data, response_timeout): """ Send a raw SHDLC command and return the received raw response. :param byte slave_address: Slave address. :param byte command_id: SHDLC command ID. :param bytes-like data: Payload (may be empty). :param float response_timeout: Response timeout in seconds (maximum time until the first byte is received). :return: Received response payload and error state flag. :rtype: bytes, bool """ rx_addr, rx_cmd, rx_state, rx_data = self._port.transceive( slave_address, command_id, data, response_timeout) if rx_addr != slave_address: raise ShdlcResponseError("Received slave address {} instead of {}." .format(rx_addr, slave_address)) if rx_cmd != command_id: raise ShdlcResponseError("Received command ID 0x{:02X} instead of " "0x{:02X}.".format(rx_cmd, command_id)) error_state = True if rx_state & 0x80 else False if error_state: log.warning("SHDLC device with address {} is in error state." .format(slave_address)) error_code = rx_state & 0x7F if error_code: log.warning("SHDLC device with address {} returned error {}." .format(slave_address, error_code)) raise ShdlcDeviceError(error_code) # Command failed to execute return rx_data, error_state