# Source code for sensirion_shdlc_driver.firmware_update

# -*- coding: utf-8 -*-
# (c) Copyright 2019 Sensirion AG, Switzerland

from __future__ import absolute_import, division, print_function
from .commands.bootloader import ShdlcCmdEnterBootloader, \
    ShdlcCmdFirmwareUpdateStart, ShdlcCmdFirmwareUpdateData, \
    ShdlcCmdFirmwareUpdateStop
from .errors import ShdlcFirmwareImageIncompatibilityError

import logging
log = logging.getLogger(__name__)


class ShdlcFirmwareUpdate(object):
    """
    Helper class to perform firmware updates over SHDLC.

    .. note:: Device-specific drivers should provide a simple wrapper method
              to easily perform firmware updates. So usually you don't have
              to use this class directly.
    """

    # The SHDLC bootloader has a fixed bitrate and slave address, independent
    # of the actual device settings or rotary switches.
    BOOTLOADER_BITRATE = 115200
    BOOTLOADER_SLAVE_ADDRESS = 0

    def __init__(self, device, image, status_callback=None,
                 progress_callback=None):
        """
        Constructor.

        :param ~sensirion_shdlc_driver.device.ShdlcDevice device:
            The device to update.
        :param ~sensirion_shdlc_driver.firmware_image.ShdlcFirmwareImage image:
            The image to flash.
        :param callable status_callback:
            Optional callback for status report, taking a string as
            parameter.
        :param callable progress_callback:
            Optional callback for progress report, taking a float (progress
            in percent) as parameter.
        """  # noqa: E501
        self._device = device
        self._image = image
        self._status_callback = status_callback
        self._progress_callback = progress_callback

    def execute(self, emergency=False):
        """
        Perform the firmware update.

        .. note:: This can take several minutes, don't abort it! If aborted,
                  the device stays in the bootloader and you need to restart
                  the update with ``emergency=True`` to recover.

        :param bool emergency:
            Must be set to ``True`` if the device is already in bootloader
            mode, ``False`` otherwise.
        :raises ~sensirion_shdlc_driver.errors.ShdlcFirmwareImageIncompatibilityError:
            If the image is not compatible with the connected device.
        :raises NotImplementedError:
            If the underlying port does not support changing the bitrate.
        :raises Exception:
            On other errors.
        """  # noqa: E501
        port = self._device.connection.port

        # Lock the port during the whole firmware update procedure.
        with port.lock:
            # Check if the underlying port supports changing the bitrate. If it
            # doesn't support changing the bitrate (e.g. like ShdlcTcpPort), we
            # must raise an exception *before* the "enter bootloader" command
            # is sent to the device. Otherwise the device enters the bootloader
            # but the user will not be able to update the firmware, which would
            # be a bad situation. Thus we just read and set the bitrate to
            # check if the underlying port has a bitrate getter and setter.
            try:
                old_bitrate = port.bitrate
                port.bitrate = old_bitrate
            except NotImplementedError:
                raise NotImplementedError(
                    "The used port '{}' does not support changing the bitrate,"
                    " therefore it's not possible to update the firmware."
                    .format(port.__class__.__name__))

            # Only check product type and enter the bootloader if it's not an
            # emergency update, i.e. the application firmware is still running.
            if not emergency:
                self._check_product_type(progress=4.0)
                self._enter_bootloader(progress=7.0)

            # Switch to the bootloader bitrate and send the update commands.
            port.bitrate = self.BOOTLOADER_BITRATE
            try:
                self._send_start(progress=10.0)
                self._send_data(progress_start=10.0, progress_end=90.0)
                self._send_stop(progress=100.0)
            finally:
                # Always restore the original bitrate, even if the update
                # failed half-way through.
                port.bitrate = old_bitrate
            self._status("Finished!")

    def _check_product_type(self, progress):
        """
        Check the product type.

        Checks if the product type of the loaded firmware image corresponds to
        the product type of the connected device. Raises an exception if it
        doesn't match.

        :param float progress: The progress after executing this check.
        :raises ~sensirion_shdlc_driver.errors.ShdlcFirmwareImageIncompatibilityError:
            If the product types of image and device differ.
        """  # noqa: E501
        self._status("Check compatibility...")
        actual = self._device.get_product_type(as_int=True)
        expected = self._image.product_type
        if actual != expected:
            raise ShdlcFirmwareImageIncompatibilityError(expected, actual)
        self._progress(progress)

    def _enter_bootloader(self, progress):
        """
        Enter the bootloader on the device.

        The command is sent to the device's own slave address, not to the
        bootloader address.

        :param float progress: The progress after executing this command.
        """
        self._status("Enter bootloader...")
        self._execute(self._device.slave_address, ShdlcCmdEnterBootloader())
        self._progress(progress)

    def _send_start(self, progress):
        """
        Send the update start command to the bootloader.

        :param float progress: The progress after executing this command.
        """
        self._status("Clear flash...")
        self._execute(self.BOOTLOADER_SLAVE_ADDRESS,
                      ShdlcCmdFirmwareUpdateStart())
        self._progress(progress)

    def _send_data(self, progress_start, progress_end):
        """
        Send the new firmware to the bootloader.

        :param float progress_start: The progress before sending the data.
        :param float progress_end: The progress after sending the data.
        """
        self._status("Write new firmware...")
        progress_diff = progress_end - progress_start
        while self._image.available_bytes > 0:
            # Request 254 bytes from the image for each data command.
            cmd = ShdlcCmdFirmwareUpdateData(data=self._image.read(254))
            self._execute(self.BOOTLOADER_SLAVE_ADDRESS, cmd)

            # Map the number of bytes consumed so far linearly onto the
            # [progress_start, progress_end] range.
            count = self._image.size - self._image.available_bytes
            ratio = count / float(self._image.size)
            percent = progress_start + (ratio * progress_diff)
            self._status("Write new firmware: {:.2f} kB of {:.2f} kB".format(
                count / 1024.0, self._image.size / 1024.0))
            self._progress(percent)

    def _send_stop(self, progress):
        """
        Send the update stop command to the bootloader.

        :param float progress: The progress after executing this command.
        """
        self._status("Verify checksum...")
        cmd = ShdlcCmdFirmwareUpdateStop(checksum=self._image.checksum)
        self._execute(self.BOOTLOADER_SLAVE_ADDRESS, cmd)
        self._progress(progress)

    def _execute(self, slave_address, command):
        """
        Execute an SHDLC command.

        :param int slave_address:
            The slave address where to send the command.
        :param ~sensirion_shdlc_driver.command.ShdlcCommand command:
            The command to execute.
        """
        self._device.connection.execute(slave_address, command)

    def _status(self, status):
        """
        Update the status message.

        The message is always logged at debug level and additionally passed
        to the status callback, if one was provided.

        :param string status: The new status message.
        """
        # Pass the status as a logging argument so the message is only
        # formatted when debug logging is actually enabled.
        log.debug("SHDLC firmware update: %s", status)
        if self._status_callback:
            self._status_callback(status)

    def _progress(self, percent):
        """
        Update the progress position.

        Does nothing if no progress callback was provided.

        :param float percent: The new progress in percent.
        """
        if self._progress_callback:
            self._progress_callback(percent)