#!/usr/bin/env python # # esp-idf NVS partition generation tool. Tool helps in generating NVS-compatible # partition binary, with key-value pair entries provided via a CSV file. # # Copyright 2018 Espressif Systems (Shanghai) PTE LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from __future__ import division, print_function from builtins import int, range, bytes from io import open import sys import argparse import binascii import random import struct import os import array import csv import zlib import codecs import datetime import distutils.dir_util try: from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend except ImportError: print('The cryptography package is not installed.' 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' 'setting up the required packages.') raise VERSION1_PRINT = "v1 - Multipage Blob Support Disabled" VERSION2_PRINT = "v2 - Multipage Blob Support Enabled" """ Class for standard NVS page structure """ class Page(object): PAGE_PARAMS = { "max_size": 4096, "max_old_blob_size": 1984, "max_new_blob_size": 4000, "max_entries": 126 } # Item type codes U8 = 0x01 I8 = 0x11 U16 = 0x02 I16 = 0x12 U32 = 0x04 I32 = 0x14 SZ = 0x21 BLOB = 0x41 BLOB_DATA = 0x42 BLOB_IDX = 0x48 # Few Page constants HEADER_SIZE = 32 BITMAPARRAY_OFFSET = 32 BITMAPARRAY_SIZE_IN_BYTES = 32 FIRST_ENTRY_OFFSET = 64 SINGLE_ENTRY_SIZE = 32 CHUNK_ANY = 0xFF ACTIVE = 0xFFFFFFFE FULL = 0xFFFFFFFC VERSION1 = 0xFF VERSION2 = 0xFE def __init__(self, page_num, is_rsrv_page=False): self.entry_num = 0 self.is_encrypt = False self.encr_key = None self.bitmap_array = array.array('B') self.version = Page.VERSION2 self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS["max_size"] if not is_rsrv_page: self.bitmap_array = self.create_bitmap_array() self.set_header(page_num) def set_header(self, page_num): global page_header # set page state to active page_header = bytearray(b'\xff') * 32 page_state_active_seq = Page.ACTIVE struct.pack_into(' 2: if not addr_len % 2: addr_tmp = addr tweak_tmp = self.reverse_hexbytes(addr_tmp) tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp)))) else: addr_tmp = init_tweak_val + addr tweak_tmp = self.reverse_hexbytes(addr_tmp) tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp)))) else: tweak_val = addr + (init_tweak_val * (tweak_len_needed - len(addr))) # Encrypt data data_bytes = data_input[start_idx:end_idx] if type(data_bytes) == bytes: data_bytes = data_bytes.decode() data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes))) encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input) encr_data_to_write = encr_data_to_write + encr_data_ret # Update values for encrypting next set of data bytes start_idx = end_idx end_idx = start_idx + 64 entry_no += 1 return encr_data_to_write def write_entry_to_buf(self, data, entrycount,nvs_obj): encr_data = bytearray() if self.is_encrypt: encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj) encr_data[0:len(encr_data_ret)] = encr_data_ret data = encr_data data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num) start_idx = data_offset end_idx = data_offset + len(data) self.page_buf[start_idx:end_idx] = data # Set bitmap array for entries in current page for i in range(0, entrycount): self.write_bitmaparray() self.entry_num += 1 def set_crc_header(self, entry_struct): crc_data = bytearray(b'28') crc_data[0:4] = entry_struct[0:4] crc_data[4:28] = entry_struct[8:32] crc_data = bytes(crc_data) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into('= 0, "Page overflow!!" # Split the binary data into two and store a chunk of available size onto curr page if tailroom < remaining_size: chunk_size = tailroom else: chunk_size = remaining_size remaining_size = remaining_size - chunk_size # Change type of data to BLOB_DATA entry_struct[1] = Page.BLOB_DATA # Calculate no. of entries data chunk will require datachunk_rounded_size = (chunk_size + 31) & ~31 datachunk_entry_count = datachunk_rounded_size // 32 datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header # Set Span entry_struct[2] = datachunk_total_entry_count # Update the chunkIndex chunk_index = chunk_start + chunk_count entry_struct[3] = chunk_index # Set data chunk data_chunk = data[offset:offset + chunk_size] # Compute CRC of data chunk struct.pack_into(' Page.PAGE_PARAMS["max_old_blob_size"]: if version == Page.VERSION1: raise InputError("Version %s\n%s: Size exceeds max allowed length." % (VERSION1_PRINT,key)) else: if encoding == "string": raise InputError("Version %s\n%s: Size exceeds max allowed length." % (VERSION2_PRINT,key)) # Calculate no. of entries data will require rounded_size = (datalen + 31) & ~31 data_entry_count = rounded_size // 32 total_entry_count = data_entry_count + 1 # +1 for the entry header # Check if page is already full and new page is needed to be created right away if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: raise PageFullError() elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]: if not (version == Page.VERSION2 and encoding in ["hex2bin", "binary", "base64"]): raise PageFullError() # Entry header entry_struct = bytearray(b'\xff') * 32 # Set Namespace Index entry_struct[0] = ns_index # Set Span if version == Page.VERSION2: if encoding == "string": entry_struct[2] = data_entry_count + 1 # Set Chunk Index chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index else: entry_struct[2] = data_entry_count + 1 # set key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() # set Type if encoding == "string": entry_struct[1] = Page.SZ elif encoding in ["hex2bin", "binary", "base64"]: entry_struct[1] = Page.BLOB if version == Page.VERSION2 and (encoding in ["hex2bin", "binary", "base64"]): entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data, datalen,total_entry_count, encoding, nvs_obj) else: self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj) """ Low-level function to write data of primitive type into page buffer. """ def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj): # Check if entry exceeds max number of entries allowed per page if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: raise PageFullError() entry_struct = bytearray(b'\xff') * 32 entry_struct[0] = ns_index # namespace index entry_struct[2] = 0x01 # Span chunk_index = Page.CHUNK_ANY entry_struct[3] = chunk_index # write key key_array = b'\x00' * 16 entry_struct[8:24] = key_array entry_struct[8:8 + len(key)] = key.encode() if encoding == "u8": entry_struct[1] = Page.U8 struct.pack_into(' 15: raise InputError("Error: Length of key `%s` should be <= 15 characters." % row["key"]) write_entry(nvs_obj, row["key"], row["type"], row["encoding"], row["value"]) except (InputError) as e: print(e) input_file.close() output_file.close() sys.exit(-2) input_file.close() output_file.close() print("NVS binary created: " + output_filename) if key_gen: keys_page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS["max_size"] key_bytes = bytearray() if len(key_input) == key_len_needed: key_bytes = key_input else: key_bytes = codecs.decode(key_input, 'hex') key_len = len(key_bytes) keys_page_buf[0:key_len] = key_bytes crc_data = keys_page_buf[0:key_len] crc_data = bytes(crc_data) crc = zlib.crc32(crc_data, 0xFFFFFFFF) struct.pack_into('