app_update: implement Python API for otatool

This commit is contained in:
Renz Christian Bagaporo 2019-05-27 11:08:28 +08:00
parent 63bd57c1d7
commit 1de627e68a
4 changed files with 277 additions and 157 deletions

View file

@ -33,10 +33,8 @@ if(NOT BOOTLOADER_BUILD)
idf_build_get_property(idf_path IDF_PATH)
idf_build_get_property(python PYTHON)
add_custom_command(OUTPUT ${blank_otadata_file}
COMMAND ${python} ${idf_path}/components/partition_table/parttool.py
--partition-type data --partition-subtype ota -q
--partition-table-file ${PARTITION_CSV_PATH} generate_blank_partition_file
--output ${blank_otadata_file})
COMMAND ${python} ${idf_path}/components/partition_table/gen_empty_partition.py
${otadata_size} ${blank_otadata_file})
add_custom_target(blank_ota_data ALL DEPENDS ${blank_otadata_file})
add_dependencies(app blank_ota_data)

View file

@ -17,8 +17,7 @@ endif
$(BLANK_OTA_DATA_FILE): partition_table_get_info $(PARTITION_TABLE_CSV_PATH) | check_python_dependencies
$(shell if [ "$(OTA_DATA_OFFSET)" != "" ] && [ "$(OTA_DATA_SIZE)" != "" ]; then \
$(PARTTOOL_PY) --partition-type data --partition-subtype ota --partition-table-file $(PARTITION_TABLE_CSV_PATH) \
-q generate_blank_partition_file --output $(BLANK_OTA_DATA_FILE); \
$(PYTHON) $(IDF_PATH)/components/partition_table/gen_empty_partition.py $(OTA_DATA_SIZE) $(BLANK_OTA_DATA_FILE); \
fi; )
$(eval BLANK_OTA_DATA_FILE = $(shell if [ "$(OTA_DATA_OFFSET)" != "" ] && [ "$(OTA_DATA_SIZE)" != "" ]; then \
echo $(BLANK_OTA_DATA_FILE); else echo " "; fi) )

View file

@ -21,16 +21,20 @@ import argparse
import os
import sys
import binascii
import subprocess
import tempfile
import collections
import struct
__version__ = '1.0'
try:
from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET
except ImportError:
COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, "partition_table")
IDF_COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components"))
sys.path.append(PARTTOOL_DIR)
from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET
PARTTOOL_PY = os.path.join(IDF_COMPONENTS_PATH, "partition_table", "parttool.py")
__version__ = '2.0'
SPI_FLASH_SEC_SIZE = 0x2000
@ -42,121 +46,69 @@ def status(msg):
print(msg)
def _invoke_parttool(parttool_args, args, output=False, partition=None):
invoke_args = []
class OtatoolTarget():
if partition:
invoke_args += [sys.executable, PARTTOOL_PY] + partition
else:
invoke_args += [sys.executable, PARTTOOL_PY, "--partition-type", "data", "--partition-subtype", "ota"]
OTADATA_PARTITION = PartitionType("data", "ota")
if quiet:
invoke_args += ["-q"]
def __init__(self, port=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None, spi_flash_sec_size=SPI_FLASH_SEC_SIZE):
self.target = ParttoolTarget(port, partition_table_offset, partition_table_file)
self.spi_flash_sec_size = spi_flash_sec_size
if args.port != "":
invoke_args += ["--port", args.port]
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.close()
try:
self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
with open(temp_file.name, "rb") as f:
self.otadata = f.read()
except Exception:
self.otadata = None
finally:
os.unlink(temp_file.name)
if args.partition_table_file:
invoke_args += ["--partition-table-file", args.partition_table_file]
def _check_otadata_partition(self):
if not self.otadata:
raise Exception("No otadata partition found")
if args.partition_table_offset:
invoke_args += ["--partition-table-offset", args.partition_table_offset]
def erase_otadata(self):
self._check_otadata_partition()
self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
invoke_args += parttool_args
def _get_otadata_info(self):
info = []
if output:
return subprocess.check_output(invoke_args)
else:
return subprocess.check_call(invoke_args)
otadata_info = collections.namedtuple("otadata_info", "seq crc")
for i in range(2):
start = i * (self.spi_flash_sec_size >> 1)
def _get_otadata_contents(args, check=True):
global quiet
seq = bytearray(self.otadata[start:start + 4])
crc = bytearray(self.otadata[start + 28:start + 32])
if check:
check_args = ["get_partition_info", "--info", "offset", "size"]
seq = struct.unpack('>I', seq)
crc = struct.unpack('>I', crc)
quiet = True
output = _invoke_parttool(check_args, args, True).split(b" ")
quiet = args.quiet
info.append(otadata_info(seq[0], crc[0]))
if not output:
raise RuntimeError("No ota_data partition found")
return info
with tempfile.NamedTemporaryFile(delete=False) as f:
f_name = f.name
def _get_partition_id_from_ota_id(self, ota_id):
if isinstance(ota_id, int):
return PartitionType("app", "ota_" + str(ota_id))
else:
return PartitionName(ota_id)
try:
invoke_args = ["read_partition", "--output", f_name]
_invoke_parttool(invoke_args, args)
with open(f_name, "rb") as f:
contents = f.read()
finally:
os.unlink(f_name)
def switch_ota_partition(self, ota_id):
self._check_otadata_partition()
return contents
sys.path.append(PARTTOOL_DIR)
import gen_esp32part as gen
def _get_otadata_status(otadata_contents):
status = []
otadata_status = collections.namedtuple("otadata_status", "seq crc")
for i in range(2):
start = i * (SPI_FLASH_SEC_SIZE >> 1)
seq = bytearray(otadata_contents[start:start + 4])
crc = bytearray(otadata_contents[start + 28:start + 32])
seq = struct.unpack('>I', seq)
crc = struct.unpack('>I', crc)
status.append(otadata_status(seq[0], crc[0]))
return status
def read_otadata(args):
status("Reading ota_data partition contents...")
otadata_info = _get_otadata_contents(args)
otadata_info = _get_otadata_status(otadata_info)
print(otadata_info)
print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
otadata_info[1].seq, otadata_info[1].crc))
def erase_otadata(args):
status("Erasing ota_data partition contents...")
_invoke_parttool(["erase_partition"], args)
status("Erased ota_data partition contents")
def switch_otadata(args):
sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
import gen_esp32part as gen
with tempfile.NamedTemporaryFile(delete=False) as f:
f_name = f.name
try:
def is_otadata_status_valid(status):
def is_otadata_info_valid(status):
seq = status.seq % (1 << 32)
crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
status("Looking for ota app partitions...")
# In order to get the number of ota app partitions, we need the partition table
partition_table = None
invoke_args = ["get_partition_info", "--table", f_name]
_invoke_parttool(invoke_args, args)
partition_table = open(f_name, "rb").read()
partition_table = gen.PartitionTable.from_binary(partition_table)
partition_table = self.target.partition_table
ota_partitions = list()
@ -171,39 +123,36 @@ def switch_otadata(args):
ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
if not ota_partitions:
raise RuntimeError("No ota app partitions found")
status("Verifying partition to switch to exists...")
raise Exception("No ota app partitions found")
# Look for the app partition to switch to
ota_partition_next = None
try:
if args.name:
ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
if isinstance(ota_id, int):
ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions)
else:
ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions)
ota_partition_next = list(ota_partition_next)[0]
except IndexError:
raise RuntimeError("Partition to switch to not found")
raise Exception("Partition to switch to not found")
otadata_contents = _get_otadata_contents(args)
otadata_status = _get_otadata_status(otadata_contents)
otadata_info = self._get_otadata_info()
# Find the copy to base the computation for ota sequence number on
otadata_compute_base = -1
# Both are valid, take the max as computation base
if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
if otadata_status[0].seq >= otadata_status[1].seq:
if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]):
if otadata_info[0].seq >= otadata_info[1].seq:
otadata_compute_base = 0
else:
otadata_compute_base = 1
# Only one copy is valid, use that
elif is_otadata_status_valid(otadata_status[0]):
elif is_otadata_info_valid(otadata_info[0]):
otadata_compute_base = 0
elif is_otadata_status_valid(otadata_status[1]):
elif is_otadata_info_valid(otadata_info[1]):
otadata_compute_base = 1
# Both are invalid (could be initial state - all 0xFF's)
else:
@ -216,7 +165,7 @@ def switch_otadata(args):
# Find the next ota sequence number
if otadata_compute_base == 0 or otadata_compute_base == 1:
base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
i = 0
while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
@ -231,47 +180,68 @@ def switch_otadata(args):
ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
with open(f_name, "wb") as otadata_next_file:
start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.close()
otadata_next_file.write(otadata_contents)
try:
with open(temp_file.name, "wb") as otadata_next_file:
start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
otadata_next_file.seek(start)
otadata_next_file.write(ota_seq_next)
otadata_next_file.write(self.otadata)
otadata_next_file.seek(start + 28)
otadata_next_file.write(ota_seq_crc_next)
otadata_next_file.seek(start)
otadata_next_file.write(ota_seq_next)
otadata_next_file.flush()
otadata_next_file.seek(start + 28)
otadata_next_file.write(ota_seq_crc_next)
_invoke_parttool(["write_partition", "--input", f_name], args)
status("Updated ota_data partition")
finally:
os.unlink(f_name)
otadata_next_file.flush()
self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
finally:
os.unlink(temp_file.name)
def read_ota_partition(self, ota_id, output):
self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)
def write_ota_partition(self, ota_id, input):
self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)
def erase_ota_partition(self, ota_id):
self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
def _get_partition_specifier(args):
if args.name:
return ["--partition-name", args.name]
else:
return ["--partition-type", "app", "--partition-subtype", "ota_" + str(args.slot)]
def _read_otadata(target):
target._check_otadata_partition()
otadata_info = target._get_otadata_info(target.otadata)
print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC"))
print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc,
otadata_info[1].seq, otadata_info[1].crc))
def read_ota_partition(args):
invoke_args = ["read_partition", "--output", args.output]
_invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
status("Read ota partition contents to file {}".format(args.output))
def _erase_otadata(target):
target.erase_otadata()
status("Erased ota_data partition contents")
def write_ota_partition(args):
invoke_args = ["write_partition", "--input", args.input]
_invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
status("Written contents of file {} to ota partition".format(args.input))
def _switch_ota_partition(target, ota_id):
target.switch_ota_partition(ota_id)
def erase_ota_partition(args):
invoke_args = ["erase_partition"]
_invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args))
def _read_ota_partition(target, ota_id, output):
target.read_ota_partition(ota_id, output)
status("Read ota partition contents to file {}".format(output))
def _write_ota_partition(target, ota_id, input):
target.write_ota_partition(ota_id, input)
status("Written contents of file {} to ota partition".format(input))
def _erase_ota_partition(target, ota_id):
target.erase_ota_partition(ota_id)
status("Erased contents of ota partition")
@ -284,17 +254,20 @@ def main():
# There are two possible sources for the partition table: a device attached to the host
# or a partition table CSV/binary file. These sources are mutually exclusive.
partition_table_info_source_args = parser.add_mutually_exclusive_group()
parser.add_argument("--port", "-p", help="port where the device to read the partition table from is attached")
partition_table_info_source_args.add_argument("--port", "-p", help="port where the device to read the partition table from is attached", default="")
partition_table_info_source_args.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from", default="")
parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", type=str)
parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", default="0x8000")
parser.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from; \
overrides device attached to specified port as the partition table source when defined")
subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help")
spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
spi_flash_sec_size.add_argument("--spi-flash-sec-size", help="value of SPI_FLASH_SEC_SIZE macro", type=str)
# Specify the supported operations
subparsers.add_parser("read_otadata", help="read otadata partition")
subparsers.add_parser("read_otadata", help="read otadata partition", parents=[spi_flash_sec_size])
subparsers.add_parser("erase_otadata", help="erase otadata partition")
slot_or_name_parser = argparse.ArgumentParser(add_help=False)
@ -302,7 +275,7 @@ def main():
slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int)
slot_or_name_parser_args.add_argument("--name", help="name of the ota partition")
subparsers.add_parser("switch_otadata", help="switch otadata partition", parents=[slot_or_name_parser])
subparsers.add_parser("switch_ota_partition", help="switch otadata partition", parents=[slot_or_name_parser, spi_flash_sec_size])
read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser])
read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to")
@ -322,17 +295,69 @@ def main():
parser.print_help()
sys.exit(1)
# Else execute the operation
operation_func = globals()[args.operation]
target_args = {}
if args.port:
target_args["port"] = args.port
if args.partition_table_file:
target_args["partition_table_file"] = args.partition_table_file
if args.partition_table_offset:
target_args["partition_table_offset"] = int(args.partition_table_offset, 0)
try:
if args.spi_flash_sec_size:
target_args["spi_flash_sec_size"] = int(args.spi_flash_sec_size, 0)
except AttributeError:
pass
target = OtatoolTarget(**target_args)
# Create the operation table and execute the operation
common_args = {'target':target}
ota_id = []
try:
if args.name is not None:
ota_id = ["name"]
else:
if args.slot is not None:
ota_id = ["slot"]
except AttributeError:
pass
otatool_ops = {
'read_otadata':(_read_otadata, []),
'erase_otadata':(_erase_otadata, []),
'switch_ota_partition':(_switch_ota_partition, ota_id),
'read_ota_partition':(_read_ota_partition, ["output"] + ota_id),
'write_ota_partition':(_write_ota_partition, ["input"] + ota_id),
'erase_ota_partition':(_erase_ota_partition, ota_id)
}
(op, op_args) = otatool_ops[args.operation]
for op_arg in op_args:
common_args.update({op_arg:vars(args)[op_arg]})
try:
common_args['ota_id'] = common_args.pop('name')
except KeyError:
try:
common_args['ota_id'] = common_args.pop('slot')
except KeyError:
pass
if quiet:
# If exceptions occur, suppress and exit quietly
try:
operation_func(args)
op(**common_args)
except Exception:
sys.exit(2)
else:
operation_func(args)
op(**common_args)
if __name__ == '__main__':

View file

@ -199,6 +199,104 @@ Secure OTA Updates Without Secure boot
The verification of signed OTA updates can be performed even without enabling hardware secure boot. For doing so, refer :ref:`signed-app-verify`
OTA Tool (otatool.py)
---------------------
The component `app_update` provides a tool :component_file:`otatool.py<app_update/otatool.py>` for performing OTA partition-related operations on a target device. The following operations can be performed using the tool:
- read contents of otadata partition (read_otadata)
- erase otadata partition, effectively resetting device to factory app (erase_otadata)
- switch OTA partitions (switch_ota_partition)
- erasing OTA partition (erase_ota_partition)
- write to OTA partition (write_ota_partition)
- read contents of OTA partition (read_ota_partition)
The tool can either be imported and used from another Python script or invoked from shell script for users wanting to perform operation programmatically. This is facilitated by the tool's Python API
and command-line interface, respectively.
Python API
^^^^^^^^^^
Before anything else, make sure that the `otatool` module is imported.
.. code-block:: python
import sys
import os
idf_path = os.environ["IDF_PATH"] # get value of IDF_PATH from environment
otatool_dir = os.path.join(idf_path, "components", "app_update") # otatool.py lives in $IDF_PATH/components/app_update
sys.path.append(otatool_dir) # this enables Python to find otatool module
from otatool import * # import all names inside otatool module
The starting point for using the tool's Python API to do is create a `OtatoolTarget` object:
.. code-block:: python
# Create a partool.py target device connected on serial port /dev/ttyUSB1
target = OtatoolTarget("/dev/ttyUSB1")
The created object can now be used to perform operations on the target device:
.. code-block:: python
# Erase otadata, reseting the device to factory app
target.erase_otadata()
# Erase contents of OTA app slot 0
target.erase_ota_partition(0)
# Switch boot partition to that of app slot 1
target.switch_ota_partition(1)
# Read OTA partition 'ota_3' and save contents to a file named 'ota_3.bin'
target.read_ota_partition("ota_3", "ota_3.bin")
The OTA partition to operate on is specified using either the app slot number or the partition name.
More information on the Python API is available in the docstrings for the tool.
Command-line Interface
^^^^^^^^^^^^^^^^^^^^^^
The command-line interface of `otatool.py` has the following structure:
.. code-block:: bash
otatool.py [command-args] [subcommand] [subcommand-args]
- command-args - these are arguments that are needed for executing the main command (parttool.py), mostly pertaining to the target device
- subcommand - this is the operation to be performed
- subcommand-args - these are arguments that are specific to the chosen operation
.. code-block:: bash
# Erase otadata, resetting the device to factory app
otatool.py --port "/dev/ttyUSB1" erase_otadata
# Erase contents of OTA app slot 0
otatool.py --port "/dev/ttyUSB1" erase_ota_partition --slot 0
# Switch boot partition to that of app slot 1
otatool.py --port "/dev/ttyUSB1" switch_ota_partition --slot 1
# Read OTA partition 'ota_3' and save contents to a file named 'ota_3.bin'
otatool.py --port "/dev/ttyUSB1" read_ota_partition --name=ota_3
More information can be obtained by specifying `--help` as argument:
.. code-block:: bash
# Display possible subcommands and show main command argument descriptions
otatool.py --help
# Show descriptions for specific subcommand arguments
otatool.py [subcommand] --help
See also
--------