#!/usr/bin/env python # # esp-idf NVS partition generation tool. Tool helps in generating NVS-compatible # partition binary, with key-value pair entries provided via a CSV file. # # Copyright 2018 Espressif Systems (Shanghai) PTE LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from __future__ import division, print_function from builtins import int, range, bytes from io import open import sys import argparse import binascii import random import struct import os import array import csv import zlib import codecs import datetime import distutils.dir_util try: from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend except ImportError: print('The cryptography package is not installed.' 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' 'setting up the required packages.') raise VERSION1_PRINT = "V1 - Multipage Blob Support Disabled" VERSION2_PRINT = "V2 - Multipage Blob Support Enabled" def reverse_hexbytes(addr_tmp): addr = [] reversed_bytes = "" for i in range(0, len(addr_tmp), 2): addr.append(addr_tmp[i:i + 2]) reversed_bytes = "".join(reversed(addr)) return reversed_bytes """ Class for standard NVS page structure """ class Page(object): PAGE_PARAMS = { "max_size": 4096, "max_old_blob_size": 1984, "max_new_blob_size": 4000, "max_entries": 126 } # Item type codes U8 = 0x01 I8 = 0x11 U16 = 0x02 I16 = 0x12 U32 = 0x04 I32 = 0x14 SZ = 0x21 BLOB = 0x41 BLOB_DATA = 0x42 BLOB_IDX = 0x48 # Few Page constants HEADER_SIZE = 32 BITMAPARRAY_OFFSET = 32 BITMAPARRAY_SIZE_IN_BYTES = 32 FIRST_ENTRY_OFFSET = 64 SINGLE_ENTRY_SIZE = 32 CHUNK_ANY = 0xFF ACTIVE = 0xFFFFFFFE FULL = 0xFFFFFFFC VERSION1 = 0xFF VERSION2 = 0xFE def __init__(self, page_num, version, is_rsrv_page=False): self.entry_num = 0 self.bitmap_array = array.array('B') self.version = version self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS["max_size"] if not is_rsrv_page: self.bitmap_array = self.create_bitmap_array() self.set_header(page_num, version) def set_header(self, page_num, version): # set page state to active page_header = bytearray(b'\xff') * 32 page_state_active_seq = Page.ACTIVE struct.pack_into(' 2: if not addr_len % 2: addr_tmp = addr else: addr_tmp = init_tweak_val + addr tweak_tmp = reverse_hexbytes(addr_tmp) tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp)))) else: tweak_val = addr + (init_tweak_val * (tweak_len_needed - len(addr))) # Encrypt data data_bytes = data_input[start_idx:end_idx] if type(data_bytes) == bytes: data_bytes = data_bytes.decode() data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes))) encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input) encr_data_to_write = encr_data_to_write + encr_data_ret # Update values for encrypting next set of data bytes start_idx = end_idx end_idx = start_idx + 64 entry_no += 1 return encr_data_to_write def write_entry_to_buf(self, data, entrycount,nvs_obj): encr_data = bytearray() if nvs_obj.encrypt: encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj) encr_data[0:len(encr_data_ret)] = encr_data_ret data = encr_data data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num) start_idx = data_offset end_idx = data_offset + len(data) self.page_buf[start_idx:end_idx] = data # Set bitmap array for entries in current page for i in range(0, entrycount): self.write_bitmaparray() self.entry_num += 1 def set_crc_header(self, entry_struct): crc_data = bytearray(b'28') crc_data[0:4] = entry_struct[0:4] crc_data[4:28] = entry_struct[8:32] crc_data = bytes(crc_data) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into('= 0, "Page overflow!!" # Split the binary data into two and store a chunk of available size onto curr page if tailroom < remaining_size: chunk_size = tailroom else: chunk_size = remaining_size remaining_size = remaining_size - chunk_size # Change type of data to BLOB_DATA entry_struct[1] = Page.BLOB_DATA # Calculate no. of entries data chunk will require datachunk_rounded_size = (chunk_size + 31) & ~31 datachunk_entry_count = datachunk_rounded_size // 32 datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header # Set Span entry_struct[2] = datachunk_total_entry_count # Update the chunkIndex chunk_index = chunk_start + chunk_count entry_struct[3] = chunk_index # Set data chunk data_chunk = data[offset:offset + chunk_size] # Compute CRC of data chunk struct.pack_into(' Page.PAGE_PARAMS["max_old_blob_size"]: if self.version == Page.VERSION1: raise InputError(" Input File: Size exceeds max allowed length `%s` bytes for key `%s`." % (Page.PAGE_PARAMS["max_old_blob_size"], key)) else: if encoding == "string": raise InputError(" Input File: Size exceeds max allowed length `%s` bytes for key `%s`." % (Page.PAGE_PARAMS["max_old_blob_size"], key)) # Calculate no. of entries data will require rounded_size = (datalen + 31) & ~31 data_entry_count = rounded_size // 32 total_entry_count = data_entry_count + 1 # +1 for the entry header # Check if page is already full and new page is needed to be created right away if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: raise PageFullError() elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]: if not (self.version == Page.VERSION2 and encoding in ["hex2bin", "binary", "base64"]): raise PageFullError() # Entry header entry_struct = bytearray(b'\xff') * 32 # Set Namespace Index entry_struct[0] = ns_index # Set Span if self.version == Page.VERSION2: if encoding == "string": entry_struct[2] = data_entry_count + 1 # Set Chunk Index chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index else: entry_struct[2] = data_entry_count + 1 # set key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() # set Type if encoding == "string": entry_struct[1] = Page.SZ elif encoding in ["hex2bin", "binary", "base64"]: entry_struct[1] = Page.BLOB if self.version == Page.VERSION2 and (encoding in ["hex2bin", "binary", "base64"]): entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data, datalen,total_entry_count, encoding, nvs_obj) else: self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj) """ Low-level function to write data of primitive type into page buffer. """ def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj): # Check if entry exceeds max number of entries allowed per page if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: raise PageFullError() entry_struct = bytearray(b'\xff') * 32 entry_struct[0] = ns_index # namespace index entry_struct[2] = 0x01 # Span chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index # write key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() if encoding == "u8": entry_struct[1] = Page.U8 struct.pack_into('/ :param outdir: Target output dir to store files :param filepath: Path of target file ''' bin_ext = '.bin' # Expand if tilde(~) provided in path outdir = os.path.expanduser(outdir) if filepath: key_file_name, ext = os.path.splitext(filepath) if not ext: filepath = key_file_name + bin_ext elif bin_ext not in ext: sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext)) # Create dir if does not exist if not (os.path.isdir(outdir)): distutils.dir_util.mkpath(outdir) filedir, filename = os.path.split(filepath) filedir = os.path.join(outdir,filedir,'') if filedir and not os.path.isdir(filedir): distutils.dir_util.mkpath(filedir) if os.path.isabs(filepath): if not outdir == os.getcwd(): print("\nWarning: `%s` \n\t==> absolute path given so outdir is ignored for this file." % filepath) # Set to empty as outdir is ignored here outdir = '' # Set full path - outdir + filename filepath = os.path.join(outdir, '') + filepath return outdir, filepath def encrypt(args): ''' Generate encrypted NVS Partition :param args: Command line arguments given ''' key = None bin_ext = '.bin' check_size(args.size) if (args.keygen is False) and (not args.inputkey): sys.exit("Error. --keygen or --inputkey argument needed.") elif args.keygen and args.inputkey: sys.exit("Error. --keygen and --inputkey both are not allowed.") elif not args.keygen and args.keyfile: print("\nWarning:","--inputkey argument is given. --keyfile argument will be ignored...") if args.inputkey: # Check if key file has .bin extension filename, ext = os.path.splitext(args.inputkey) if bin_ext not in ext: sys.exit('Error: `%s`. Only `%s` extension allowed.' % (args.inputkey, bin_ext)) key = bytearray() with open(args.inputkey, 'rb') as key_f: key = key_f.read(64) # Generate encrypted NVS Partition generate(args, is_encr_enabled=True, encr_key=key) def decrypt_data(data_input, decr_key, page_num, entry_no, entry_size): ''' Decrypt NVS data entry ''' page_max_size = 4096 first_entry_offset = 64 init_tweak_val = '0' tweak_len_needed = 32 # in hex tweak_tmp = '' data_input = binascii.hexlify(data_input) rel_addr = page_num * page_max_size + first_entry_offset # Set tweak value offset = entry_no * entry_size addr = hex(rel_addr + offset)[2:] addr_len = len(addr) if addr_len > 2: if not addr_len % 2: addr_tmp = addr else: addr_tmp = init_tweak_val + addr tweak_tmp = reverse_hexbytes(addr_tmp) tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp)))) else: tweak_val = addr + (init_tweak_val * (tweak_len_needed - len(addr))) if type(data_input) == bytes: data_input = data_input.decode() # Decrypt 32 bytes of data using AES-XTS decryption backend = default_backend() plain_text = codecs.decode(data_input, 'hex') tweak = codecs.decode(tweak_val, 'hex') cipher = Cipher(algorithms.AES(decr_key), modes.XTS(tweak), backend=backend) decryptor = cipher.decryptor() decrypted_data = decryptor.update(plain_text) return decrypted_data def decrypt(args): ''' Decrypt encrypted NVS Partition :param args: Command line arguments given ''' bin_ext = '.bin' nvs_read_bytes = 32 decrypted_entry_no = 0 file_entry_no = 0 page_num = 0 page_max_size = 4096 start_entry_offset = 0 empty_data_entry = bytearray('\xff') * 32 # Check if key file has .bin extension input_files = [args.input, args.key, args.output] for filepath in input_files: filename, ext = os.path.splitext(filepath) if bin_ext not in ext: sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext)) with open(args.key,'rb') as decr_key_file: decr_key = decr_key_file.read(64) args.outdir, args.output = set_target_filepath(args.outdir, args.output) output_buf = bytearray(b'\xff') with open(args.input, 'rb') as input_file, open(args.output,'wb') as output_file: while True: if file_entry_no == 128: decrypted_entry_no = 0 file_entry_no = 0 page_num += 1 data_entry = input_file.read(nvs_read_bytes) if not data_entry: break if data_entry != empty_data_entry and file_entry_no not in [0,1]: data_entry = decrypt_data(data_entry, decr_key, page_num, decrypted_entry_no, nvs_read_bytes) decrypted_entry_no += 1 write_entry_no = ((page_num * page_max_size) + file_entry_no) start_idx = start_entry_offset + (write_entry_no * nvs_read_bytes) end_idx = nvs_read_bytes output_buf[start_idx:end_idx] = data_entry file_entry_no += 1 start_entry_offset += nvs_read_bytes output_file.write(output_buf) print("\nCreated NVS decrypted binary: ===>", args.output) def generate_key(args): ''' Generate encryption keys :param args: Command line arguments given ''' page_max_size = 4096 keys_dir = 'keys' output_keyfile = None bin_ext = '.bin' if not args.keyfile: timestamp = datetime.datetime.now().strftime('%m-%d_%H-%M') args.keyfile = "keys-" + timestamp + bin_ext keys_outdir = os.path.join(args.outdir,keys_dir, '') # Create keys/ dir in if does not exist if not (os.path.isdir(keys_outdir)): distutils.dir_util.mkpath(keys_outdir) keys_outdir, output_keyfile = set_target_filepath(keys_outdir, args.keyfile) key = ''.join(random.choice('0123456789abcdef') for _ in range(128)).strip() encr_key_bytes = codecs.decode(key, 'hex') key_len = len(encr_key_bytes) keys_buf = bytearray(b'\xff') * page_max_size keys_buf[0:key_len] = encr_key_bytes crc_data = keys_buf[0:key_len] crc_data = bytes(crc_data) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into(' ", output_keyfile) return key def generate(args, is_encr_enabled=False, encr_key=None): ''' Generate NVS Partition :param args: Command line arguments given :param is_encr_enabled: Encryption enabled/disabled :param encr_key: Key to encrypt NVS partition ''' is_dir_new = False bin_ext = '.bin' input_size = check_size(args.size) if args.version == 1: args.version = Page.VERSION1 elif args.version == 2: args.version = Page.VERSION2 # Check if key file has .bin extension filename, ext = os.path.splitext(args.output) if bin_ext not in ext: sys.exit('Error: `%s`. Only `.bin` extension allowed.' % args.output) args.outdir, args.output = set_target_filepath(args.outdir, args.output) if is_encr_enabled and not encr_key: encr_key = generate_key(args) input_file = open(args.input, 'rt', encoding='utf8') output_file = open(args.output, 'wb') with open(args.input, 'rt', encoding='utf8') as input_file,\ open(args.output, 'wb') as output_file,\ nvs_open(output_file, input_size, args.version, is_encrypt=is_encr_enabled, key=encr_key) as nvs_obj: # Comments are skipped reader = csv.DictReader(filter(lambda row: row[0] != '#',input_file), delimiter=',') if nvs_obj.version == Page.VERSION1: version_set = VERSION1_PRINT else: version_set = VERSION2_PRINT print("\nCreating NVS binary with version:", version_set) for row in reader: try: # Check key length if len(row["key"]) > 15: raise InputError("Length of key `%s` should be <= 15 characters." % row["key"]) write_entry(nvs_obj, row["key"], row["type"], row["encoding"], row["value"]) except InputError as e: print(e) filedir, filename = os.path.split(args.output) if filename: print("\nWarning: NVS binary not created...") os.remove(args.output) if is_dir_new and not filedir == os.getcwd(): print("\nWarning: Output dir not created...") os.rmdir(filedir) sys.exit(-2) print("\nCreated NVS binary: ===>", args.output) def main(): parser = argparse.ArgumentParser(description="\nESP NVS partition generation utility", formatter_class=argparse.RawTextHelpFormatter) subparser = parser.add_subparsers(title='Commands', dest='command', help='\nRun nvs_partition_gen.py {command} -h for additional help\n\n') parser_gen = subparser.add_parser('generate', help='Generate NVS partition', formatter_class=argparse.RawTextHelpFormatter) parser_gen.set_defaults(func=generate) parser_gen.add_argument('input', default=None, help='Path to CSV file to parse') parser_gen.add_argument('output', default=None, help='Path to output NVS binary file') parser_gen.add_argument('size', default=None, help='Size of NVS partition in bytes\ \n(must be multiple of 4096)') parser_gen.add_argument('--version', choices=[1,2], default=2, type=int, help='''Set multipage blob version.\ \nVersion 1 - Multipage blob support disabled.\ \nVersion 2 - Multipage blob support enabled.\ \nDefault: Version 2''') parser_gen.add_argument('--outdir', default=os.getcwd(), help='Output directory to store files created\ \n(Default: current directory)') parser_gen_key = subparser.add_parser('generate-key', help='Generate keys for encryption', formatter_class=argparse.RawTextHelpFormatter) parser_gen_key.set_defaults(func=generate_key) parser_gen_key.add_argument('--keyfile', default=None, help='Path to output encryption keys file') parser_gen_key.add_argument('--outdir', default=os.getcwd(), help='Output directory to store files created.\ \n(Default: current directory)') parser_encr = subparser.add_parser('encrypt', help='Generate NVS encrypted partition', formatter_class=argparse.RawTextHelpFormatter) parser_encr.set_defaults(func=encrypt) parser_encr.add_argument('input', default=None, help='Path to CSV file to parse') parser_encr.add_argument('output', default=None, help='Path to output NVS binary file') parser_encr.add_argument('size', default=None, help='Size of NVS partition in bytes\ \n(must be multiple of 4096)') parser_encr.add_argument('--version', choices=[1,2], default=2, type=int, help='''Set multipage blob version.\ \nVersion 1 - Multipage blob support disabled.\ \nVersion 2 - Multipage blob support enabled.\ \nDefault: Version 2''') parser_encr.add_argument('--keygen', action="store_true", default=False, help='Generates key for encrypting NVS partition') parser_encr.add_argument('--keyfile', default=None, help='Path to output encryption keys file') parser_encr.add_argument('--inputkey', default=None, help='File having key for encrypting NVS partition') parser_encr.add_argument('--outdir', default=os.getcwd(), help='Output directory to store files created.\ \n(Default: current directory)') parser_decr = subparser.add_parser('decrypt', help='Decrypt NVS encrypted partition', formatter_class=argparse.RawTextHelpFormatter) parser_decr.set_defaults(func=decrypt) parser_decr.add_argument('input', default=None, help='Path to encrypted NVS partition file to parse') parser_decr.add_argument('key', default=None, help='Path to file having keys for decryption') parser_decr.add_argument('output', default=None, help='Path to output decrypted binary file') parser_decr.add_argument('--outdir', default=os.getcwd(), help='Output directory to store files created.\ \n(Default: current directory)') args = parser.parse_args() args.func(args) if __name__ == "__main__": main()