diff --git a/components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py b/components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py index 47242302e..99ee53ff6 100755 --- a/components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py +++ b/components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py @@ -39,6 +39,8 @@ VERSION2_PRINT = "v2 - Multipage Blob Support Enabled" """ Class for standard NVS page structure """ + + class Page(object): PAGE_PARAMS = { "max_size": 4096, @@ -68,8 +70,8 @@ class Page(object): CHUNK_ANY = 0xFF ACTIVE = 0xFFFFFFFE FULL = 0xFFFFFFFC - VERSION1=0xFF - VERSION2=0xFE + VERSION1 = 0xFF + VERSION2 = 0xFE def __init__(self, page_num, is_rsrv_page=False): self.entry_num = 0 @@ -77,7 +79,7 @@ class Page(object): self.encr_key = None self.bitmap_array = array.array('B') self.version = Page.VERSION2 - self.page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"] + self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS["max_size"] if not is_rsrv_page: self.bitmap_array = self.create_bitmap_array() self.set_header(page_num) @@ -86,7 +88,7 @@ class Page(object): global page_header # set page state to active - page_header= bytearray(b'\xff') *32 + page_header = bytearray(b'\xff') * 32 page_state_active_seq = Page.ACTIVE struct.pack_into('=0, "Page overflow!!" + assert tailroom >= 0, "Page overflow!!" # Split the binary data into two and store a chunk of available size onto curr page if tailroom < remaining_size: @@ -266,7 +259,7 @@ class Page(object): # Calculate no. of entries data chunk will require datachunk_rounded_size = (chunk_size + 31) & ~31 datachunk_entry_count = datachunk_rounded_size // 32 - datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header + datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header # Set Span entry_struct[2] = datachunk_total_entry_count @@ -276,7 +269,7 @@ class Page(object): entry_struct[3] = chunk_index # Set data chunk - data_chunk = data[offset:offset + chunk_size] + data_chunk = data[offset:offset + chunk_size] # Compute CRC of data chunk struct.pack_into(' Page.PAGE_PARAMS["max_old_blob_size"]: + if datalen > Page.PAGE_PARAMS["max_old_blob_size"]: + if version == Page.VERSION1: raise InputError("Version %s\n%s: Size exceeds max allowed length." % (VERSION1_PRINT,key)) - - if version == Page.VERSION2: - if encoding == "string": - if datalen > Page.PAGE_PARAMS["max_new_blob_size"]: + else: + if encoding == "string": raise InputError("Version %s\n%s: Size exceeds max allowed length." % (VERSION2_PRINT,key)) # Calculate no. of entries data will require rounded_size = (datalen + 31) & ~31 data_entry_count = rounded_size // 32 - total_entry_count = data_entry_count + 1 # +1 for the entry header + total_entry_count = data_entry_count + 1 # +1 for the entry header # Check if page is already full and new page is needed to be created right away - if version == Page.VERSION1: - if encoding in ["string", "hex2bin", "binary", "base64"]: - if (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]: - raise PageFullError() - else: - if encoding == "string": - if (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]: - raise PageFullError() + if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: + raise PageFullError() + elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]: + if not (version == Page.VERSION2 and encoding in ["hex2bin", "binary", "base64"]): + raise PageFullError() # Entry header - entry_struct = bytearray(b'\xff')*32 + entry_struct = bytearray(b'\xff') * 32 # Set Namespace Index entry_struct[0] = ns_index # Set Span @@ -414,27 +399,25 @@ class Page(object): entry_struct[1] = Page.BLOB if version == Page.VERSION2 and (encoding in ["hex2bin", "binary", "base64"]): - entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data,\ - datalen,total_entry_count, encoding, nvs_obj) + entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data, + datalen,total_entry_count, encoding, nvs_obj) else: self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj) - - """ Low-level function to write data of primitive type into page buffer. """ def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj): # Check if entry exceeds max number of entries allowed per page if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: raise PageFullError() - entry_struct = bytearray(b'\xff')*32 - entry_struct[0] = ns_index # namespace index - entry_struct[2] = 0x01 # Span + entry_struct = bytearray(b'\xff') * 32 + entry_struct[0] = ns_index # namespace index + entry_struct[2] = 0x01 # Span chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index # write key - key_array = b'\x00' *16 + key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() @@ -469,9 +452,13 @@ class Page(object): def get_data(self): return self.page_buf + """ -NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs. Binary can later be flashed onto device via a flashing utility. +NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs. +Binary can later be flashed onto device via a flashing utility. """ + + class NVS(object): def __init__(self, fout, input_size): self.size = input_size @@ -485,11 +472,11 @@ class NVS(object): return self def __exit__(self, exc_type, exc_value, traceback): - if exc_type == None and exc_value == None: + if exc_type is None and exc_value is None: # Create pages for remaining available size while True: try: - new_page = self.create_new_page() + self.create_new_page() except InsufficientSizeError: self.size = None # Creating the last reserved page @@ -577,13 +564,15 @@ class NVS(object): data += page.get_data() return data + class PageFullError(RuntimeError): """ Represents error when current page doesn't have sufficient entries left to accommodate current request """ def __init__(self): - super(PageFullError, self).__init__() + super(PageFullError, self).__init__() + class InputError(RuntimeError): """ @@ -592,6 +581,7 @@ class InputError(RuntimeError): def __init__(self, e): super(InputError, self).__init__(e) + class InsufficientSizeError(RuntimeError): """ Represents error when NVS Partition size given is insufficient @@ -600,6 +590,7 @@ class InsufficientSizeError(RuntimeError): def __init__(self, e): super(InsufficientSizeError, self).__init__(e) + def nvs_open(result_obj, input_size): """ Wrapper to create and NVS class object. This object can later be used to set key-value pairs @@ -609,6 +600,7 @@ def nvs_open(result_obj, input_size): """ return NVS(result_obj, input_size) + def write_entry(nvs_instance, key, datatype, encoding, value): """ Wrapper to set key-value pair in NVS format @@ -622,7 +614,7 @@ def write_entry(nvs_instance, key, datatype, encoding, value): if datatype == "file": abs_file_path = value - if os.path.isabs(value) == False: + if os.path.isabs(value) is False: script_dir = os.path.dirname(__file__) abs_file_path = os.path.join(script_dir, value) @@ -634,6 +626,7 @@ def write_entry(nvs_instance, key, datatype, encoding, value): else: nvs_instance.write_entry(key, value, encoding) + def nvs_close(nvs_instance): """ Wrapper to finish writing to NVS and write data to file/stream object provided to nvs_open method @@ -643,8 +636,8 @@ def nvs_close(nvs_instance): nvs_instance.__exit__(None, None, None) -def check_input_args(input_filename=None, output_filename=None, input_part_size=None, is_key_gen=None,\ -encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_encrypt_arg_str=None): +def check_input_args(input_filename=None, output_filename=None, input_part_size=None, is_key_gen=None, + encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_encrypt_arg_str=None): global version, is_encrypt_data, input_size, key_gen @@ -658,7 +651,6 @@ encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_enc elif is_encrypt_data.lower() == 'false': is_encrypt_data = False - if version == 'v1': version = Page.VERSION1 elif version == 'v2': @@ -669,7 +661,6 @@ encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_enc elif key_gen.lower() == 'false': key_gen = False - if key_gen: if all(arg is not None for arg in [input_filename, output_filename, input_size]): if not is_encrypt_data: @@ -681,7 +672,6 @@ encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_enc if not all(arg is not None for arg in [input_filename, output_filename]): sys.exit(print_arg_str) - if is_encrypt_data and not key_gen and not key_file: sys.exit(print_encrypt_arg_str) @@ -695,7 +685,7 @@ encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_enc # Set size input_size = int(input_size, 0) - if input_size % 4096 !=0: + if input_size % 4096 != 0: sys.exit("Size of partition must be multiple of 4096") # Update size as a page needs to be reserved of size 4KB @@ -705,8 +695,6 @@ encrypt_mode=None, key_file=None, version_no=None, print_arg_str=None, print_enc sys.exit("Minimum NVS partition size needed is 0x3000 bytes.") - - def nvs_part_gen(input_filename=None, output_filename=None, input_part_size=None, is_key_gen=None, encrypt_mode=None, key_file=None, version_no=None): """ Wrapper to generate nvs partition binary @@ -749,9 +737,8 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_part_size=None input_file.close() output_file.close() - if key_gen: - keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"] + keys_page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS["max_size"] key_bytes = bytearray() if len(key_input) == key_len_needed: key_bytes = key_input @@ -773,44 +760,44 @@ def main(): parser = argparse.ArgumentParser(description="ESP32 NVS partition generation utility") nvs_part_gen_group = parser.add_argument_group('To generate NVS partition') nvs_part_gen_group.add_argument( - "--input", - help="Path to CSV file to parse.", - default=None) + "--input", + help="Path to CSV file to parse.", + default=None) nvs_part_gen_group.add_argument( - "--output", - help='Path to output converted binary file.', - default=None) + "--output", + help='Path to output converted binary file.', + default=None) nvs_part_gen_group.add_argument( - "--size", - help='Size of NVS Partition in bytes (must be multiple of 4096)') + "--size", + help='Size of NVS Partition in bytes (must be multiple of 4096)') nvs_part_gen_group.add_argument( - "--version", - help='Set version. Default: v2', - choices=['v1','v2'], - default='v2', - type=str.lower) + "--version", + help='Set version. Default: v2', + choices=['v1','v2'], + default='v2', + type=str.lower) - keygen_action=nvs_part_gen_group.add_argument( - "--keygen", - help='Generate keys for encryption. Creates an `encryption_keys.bin` file. Default: false', - choices=['true','false'], - default= 'false', - type=str.lower) + keygen_action = nvs_part_gen_group.add_argument( + "--keygen", + help='Generate keys for encryption. Creates an `encryption_keys.bin` file. Default: false', + choices=['true','false'], + default='false', + type=str.lower) nvs_part_gen_group.add_argument( - "--encrypt", - help='Set encryption mode. Default: false', - choices=['true','false'], - default='false', - type=str.lower) + "--encrypt", + help='Set encryption mode. Default: false', + choices=['true','false'], + default='false', + type=str.lower) nvs_part_gen_group.add_argument( - "--keyfile", - help='File having key for encryption (Applicable only if encryption mode is true)', - default = None) + "--keyfile", + help='File having key for encryption (Applicable only if encryption mode is true)', + default=None) key_gen_group = parser.add_argument_group('To generate encryption keys') key_gen_group._group_actions.append(keygen_action) @@ -824,13 +811,13 @@ def main(): is_encrypt_data = args.encrypt key_file = args.keyfile - print_arg_str = "Invalid.\nTo generate nvs partition binary --input, --output and --size arguments are mandatory.\nTo generate encryption keys --keygen argument is mandatory." + print_arg_str = "Invalid.\nTo generate nvs partition binary --input, --output and --size arguments are mandatory.\n \ + To generate encryption keys --keygen argument is mandatory." print_encrypt_arg_str = "Missing parameter. Enter --keyfile or --keygen." check_input_args(input_filename,output_filename, part_size, is_key_gen, is_encrypt_data, key_file, version_no, print_arg_str, print_encrypt_arg_str) nvs_part_gen(input_filename, output_filename, part_size, is_key_gen, is_encrypt_data, key_file, version_no) - if __name__ == "__main__": main() diff --git a/tools/ci/setup_python.sh b/tools/ci/setup_python.sh index 296d00735..c134ba816 100644 --- a/tools/ci/setup_python.sh +++ b/tools/ci/setup_python.sh @@ -2,8 +2,10 @@ # Regexp for matching job names which are incompatible with Python 3 # - assign_test, nvs_compatible_test, IT - auto_test_script causes the incompatibility -# - UT_009_ - RS485 multi-device test is not started properly -py3_incomp='assign_test|nvs_compatible_test|IT|UT_009_' +# - UT_009_ - multi-device tests are not compatible +# - UT_014_ - multi-device tests are not compatible +# - UT_017_ - multi-device tests are not compatible +py3_incomp='assign_test|nvs_compatible_test|IT|UT_009_|UT_013_|UT_014_|UT_017_' if [ -z ${PYTHON_VER+x} ] || [[ $CI_JOB_NAME =~ $py3_incomp ]]; then # Use this version of the Python interpreter if it was not defined before or