Merge branch 'bugfix/invoke_ota_operations_on_windows' into 'master'

Fix permission denied error on Windows for otatool, parttool

See merge request idf/esp-idf!4086
This commit is contained in:
Angus Gratton 2019-02-05 09:15:22 +08:00
commit 985e1c4c7f
2 changed files with 103 additions and 83 deletions

View file

@ -83,10 +83,18 @@ def _get_otadata_contents(args, check=True):
if not output: if not output:
raise RuntimeError("No ota_data partition found") raise RuntimeError("No ota_data partition found")
with tempfile.NamedTemporaryFile() as otadata_file: with tempfile.NamedTemporaryFile(delete=False) as f:
invoke_args = ["read_partition", "--output", otadata_file.name] f_name = f.name
try:
invoke_args = ["read_partition", "--output", f_name]
_invoke_parttool(invoke_args, args) _invoke_parttool(invoke_args, args)
return otadata_file.read() with open(f_name, "rb") as f:
contents = f.read()
finally:
os.unlink(f_name)
return contents
def _get_otadata_status(otadata_contents): def _get_otadata_status(otadata_contents):
@ -130,112 +138,116 @@ def switch_otadata(args):
sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table")) sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table"))
import gen_esp32part as gen import gen_esp32part as gen
def is_otadata_status_valid(status): with tempfile.NamedTemporaryFile(delete=False) as f:
seq = status.seq % (1 << 32) f_name = f.name
crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
status("Looking for ota app partitions...") try:
def is_otadata_status_valid(status):
seq = status.seq % (1 << 32)
crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32))
return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
# In order to get the number of ota app partitions, we need the partition table status("Looking for ota app partitions...")
partition_table = None
with tempfile.NamedTemporaryFile() as partition_table_file: # In order to get the number of ota app partitions, we need the partition table
invoke_args = ["get_partition_info", "--table", partition_table_file.name] partition_table = None
invoke_args = ["get_partition_info", "--table", f_name]
_invoke_parttool(invoke_args, args) _invoke_parttool(invoke_args, args)
partition_table = partition_table_file.read() partition_table = open(f_name, "rb").read()
partition_table = gen.PartitionTable.from_binary(partition_table) partition_table = gen.PartitionTable.from_binary(partition_table)
ota_partitions = list() ota_partitions = list()
for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA): for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table) ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
try:
ota_partitions.append(list(ota_partition)[0])
except IndexError:
break
ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
if not ota_partitions:
raise RuntimeError("No ota app partitions found")
status("Verifying partition to switch to exists...")
# Look for the app partition to switch to
ota_partition_next = None
try: try:
ota_partitions.append(list(ota_partition)[0]) if args.name:
ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions)
else:
ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
ota_partition_next = list(ota_partition_next)[0]
except IndexError: except IndexError:
break raise RuntimeError("Partition to switch to not found")
ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype) otadata_contents = _get_otadata_contents(args)
otadata_status = _get_otadata_status(otadata_contents)
if not ota_partitions: # Find the copy to base the computation for ota sequence number on
raise RuntimeError("No ota app partitions found") otadata_compute_base = -1
status("Verifying partition to switch to exists...") # Both are valid, take the max as computation base
if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
# Look for the app partition to switch to if otadata_status[0].seq >= otadata_status[1].seq:
ota_partition_next = None otadata_compute_base = 0
else:
try: otadata_compute_base = 1
if args.name: # Only one copy is valid, use that
ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions) elif is_otadata_status_valid(otadata_status[0]):
else:
ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions)
ota_partition_next = list(ota_partition_next)[0]
except IndexError:
raise RuntimeError("Partition to switch to not found")
otadata_contents = _get_otadata_contents(args)
otadata_status = _get_otadata_status(otadata_contents)
# Find the copy to base the computation for ota sequence number on
otadata_compute_base = -1
# Both are valid, take the max as computation base
if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]):
if otadata_status[0].seq >= otadata_status[1].seq:
otadata_compute_base = 0 otadata_compute_base = 0
else: elif is_otadata_status_valid(otadata_status[1]):
otadata_compute_base = 1 otadata_compute_base = 1
# Only one copy is valid, use that # Both are invalid (could be initial state - all 0xFF's)
elif is_otadata_status_valid(otadata_status[0]): else:
otadata_compute_base = 0 pass
elif is_otadata_status_valid(otadata_status[1]):
otadata_compute_base = 1
# Both are invalid (could be initial state - all 0xFF's)
else:
pass
ota_seq_next = 0 ota_seq_next = 0
ota_partitions_num = len(ota_partitions) ota_partitions_num = len(ota_partitions)
target_seq = (ota_partition_next.subtype & 0x0F) + 1 target_seq = (ota_partition_next.subtype & 0x0F) + 1
# Find the next ota sequence number # Find the next ota sequence number
if otadata_compute_base == 0 or otadata_compute_base == 1: if otadata_compute_base == 0 or otadata_compute_base == 1:
base_seq = otadata_status[otadata_compute_base].seq % (1 << 32) base_seq = otadata_status[otadata_compute_base].seq % (1 << 32)
i = 0 i = 0
while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num: while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
i += 1 i += 1
ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
else: else:
ota_seq_next = target_seq ota_seq_next = target_seq
# Create binary data from computed values # Create binary data from computed values
ota_seq_next = struct.pack("I", ota_seq_next) ota_seq_next = struct.pack("I", ota_seq_next)
ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32) ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
ota_seq_crc_next = struct.pack("I", ota_seq_crc_next) ota_seq_crc_next = struct.pack("I", ota_seq_crc_next)
with tempfile.NamedTemporaryFile() as otadata_next_file: with open(f_name, "wb") as otadata_next_file:
start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1) start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1)
otadata_next_file.write(otadata_contents) otadata_next_file.write(otadata_contents)
otadata_next_file.seek(start) otadata_next_file.seek(start)
otadata_next_file.write(ota_seq_next) otadata_next_file.write(ota_seq_next)
otadata_next_file.seek(start + 28) otadata_next_file.seek(start + 28)
otadata_next_file.write(ota_seq_crc_next) otadata_next_file.write(ota_seq_crc_next)
otadata_next_file.flush() otadata_next_file.flush()
_invoke_parttool(["write_partition", "--input", otadata_next_file.name], args) _invoke_parttool(["write_partition", "--input", f_name], args)
status("Updated ota_data partition") status("Updated ota_data partition")
finally:
os.unlink(f_name)
def _get_partition_specifier(args): def _get_partition_specifier(args):

View file

@ -74,11 +74,19 @@ def _get_partition_table(args):
else: else:
port_info = (" on port " + args.port if args.port else "") port_info = (" on port " + args.port if args.port else "")
status("Reading partition table from device{}...".format(port_info)) status("Reading partition table from device{}...".format(port_info))
with tempfile.NamedTemporaryFile() as partition_table_file:
invoke_args = ["read_flash", str(gen.offset_part_table), str(gen.MAX_PARTITION_LENGTH), partition_table_file.name] f_name = None
with tempfile.NamedTemporaryFile(delete=False) as f:
f_name = f.name
try:
invoke_args = ["read_flash", str(gen.offset_part_table), str(gen.MAX_PARTITION_LENGTH), f_name]
_invoke_esptool(invoke_args, args) _invoke_esptool(invoke_args, args)
partition_table = gen.PartitionTable.from_binary(partition_table_file.read()) with open(f_name, "rb") as f:
status("Partition table read from device" + port_info) partition_table = gen.PartitionTable.from_binary(f.read())
status("Partition table read from device" + port_info)
finally:
os.unlink(f_name)
return partition_table return partition_table