Merge branch 'update/nvs_partition_utility_v3.1' into 'release/v3.1'

Update/nvs partition utility v3.1 (backport v3.1)

See merge request idf/esp-idf!4275
This commit is contained in:
Angus Gratton 2019-04-03 11:12:23 +08:00
commit 64169e7d12
6 changed files with 188 additions and 120 deletions

View file

@ -17,6 +17,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from __future__ import division, print_function
from builtins import int, range, bytes
from io import open
import sys import sys
import argparse import argparse
import binascii import binascii
@ -26,10 +29,13 @@ import os
import array import array
import csv import csv
import zlib import zlib
import codecs
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
""" Class for standard NVS page structure """ """ Class for standard NVS page structure """
class Page(object): class Page(object):
PAGE_PARAMS = { PAGE_PARAMS = {
"max_size": 4096, "max_size": 4096,
@ -59,8 +65,8 @@ class Page(object):
CHUNK_ANY = 0xFF CHUNK_ANY = 0xFF
ACTIVE = 0xFFFFFFFE ACTIVE = 0xFFFFFFFE
FULL = 0xFFFFFFFC FULL = 0xFFFFFFFC
VERSION1=0xFF VERSION1 = 0xFF
VERSION2=0xFE VERSION2 = 0xFE
def __init__(self, page_num): def __init__(self, page_num):
self.entry_num = 0 self.entry_num = 0
@ -76,23 +82,22 @@ class Page(object):
global page_header global page_header
# set page state to active # set page state to active
page_header= bytearray(b'\xff')*32 page_header = bytearray(b'\xff') * 32
page_state_active_seq = Page.ACTIVE page_state_active_seq = Page.ACTIVE
page_header[0:4] = struct.pack('<I', page_state_active_seq) struct.pack_into('<I', page_header, 0, page_state_active_seq)
# set page sequence number # set page sequence number
page_header[4:8] = struct.pack('<I', page_num) struct.pack_into('<I', page_header, 4, page_num)
# set version # set version
if version == Page.VERSION2: if version == Page.VERSION2:
page_header[8] = Page.VERSION2 page_header[8] = Page.VERSION2
elif version == Page.VERSION1: elif version == Page.VERSION1:
page_header[8] = Page.VERSION1 page_header[8] = Page.VERSION1
# set header's CRC # set header's CRC
crc_data = page_header[4:28] crc_data = bytes(page_header[4:28])
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF) crc = zlib.crc32(crc_data, 0xFFFFFFFF)
page_header[28:32] = struct.pack('<I', crc & 0xFFFFFFFF) struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
self.page_buf[0:len(page_header)] = page_header self.page_buf[0:len(page_header)] = page_header
def create_bitmap_array(self): def create_bitmap_array(self):
bitarray = array.array('B') bitarray = array.array('B')
charsize = 32 # bitmaparray has 256 bits, hence 32 bytes charsize = 32 # bitmaparray has 256 bits, hence 32 bytes
@ -100,10 +105,9 @@ class Page(object):
bitarray.extend((fill,) * charsize) bitarray.extend((fill,) * charsize)
return bitarray return bitarray
def write_bitmaparray(self): def write_bitmaparray(self):
bitnum = self.entry_num * 2 bitnum = self.entry_num * 2
byte_idx = bitnum / 8 # Find byte index in the array byte_idx = bitnum // 8 # Find byte index in the array
bit_offset = bitnum & 7 # Find bit offset in given byte index bit_offset = bitnum & 7 # Find bit offset in given byte index
mask = ~(1 << bit_offset) mask = ~(1 << bit_offset)
self.bitmap_array[byte_idx] &= mask self.bitmap_array[byte_idx] &= mask
@ -111,50 +115,50 @@ class Page(object):
end_idx = Page.BITMAPARRAY_OFFSET + Page.BITMAPARRAY_SIZE_IN_BYTES end_idx = Page.BITMAPARRAY_OFFSET + Page.BITMAPARRAY_SIZE_IN_BYTES
self.page_buf[start_idx:end_idx] = self.bitmap_array self.page_buf[start_idx:end_idx] = self.bitmap_array
def encrypt_entry(self, data_arr, tweak_arr, encr_key): def encrypt_entry(self, data_arr, tweak_arr, encr_key):
# Encrypt 32 bytes of data using AES-XTS encryption # Encrypt 32 bytes of data using AES-XTS encryption
backend = default_backend() backend = default_backend()
plain_text = data_arr.decode('hex') plain_text = codecs.decode(data_arr, 'hex')
tweak = tweak_arr.decode('hex') tweak = codecs.decode(tweak_arr, 'hex')
cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend) cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plain_text) encrypted_data = encryptor.update(plain_text)
return encrypted_data return encrypted_data
def reverse_hexbytes(self, addr_tmp): def reverse_hexbytes(self, addr_tmp):
addr = [] addr = []
reversed_bytes = "" reversed_bytes = ""
for i in range(0, len(addr_tmp), 2): for i in range(0, len(addr_tmp), 2):
addr.append(addr_tmp[i:i+2]) addr.append(addr_tmp[i:i + 2])
reversed_bytes = "".join(reversed(addr)) reversed_bytes = "".join(reversed(addr))
return reversed_bytes return reversed_bytes
def encrypt_data(self, data_input, no_of_entries, nvs_obj): def encrypt_data(self, data_input, no_of_entries, nvs_obj):
# Set values needed for encryption and encrypt data byte wise # Set values needed for encryption and encrypt data byte wise
encr_data_to_write = '' encr_data_to_write = bytearray()
data_len_needed = 64 #in hex data_len_needed = 64 # in hex
tweak_len_needed = 32 #in hex tweak_len_needed = 32 # in hex
init_tweak_val = '0' init_tweak_val = '0'
init_data_val = 'f' init_data_val = 'f'
tweak_tmp = '' tweak_tmp = ''
encr_key_input = None encr_key_input = None
# Extract encryption key and tweak key from given key input # Extract encryption key and tweak key from given key input
encr_key_input = self.encr_key.decode('hex') if len(self.encr_key) == key_len_needed:
encr_key_input = self.encr_key
else:
encr_key_input = codecs.decode(self.encr_key, 'hex')
rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET
if type(data_input) != bytearray: if not isinstance(data_input, bytearray):
byte_arr = bytearray('\xff') * 32 byte_arr = bytearray(b'\xff') * 32
byte_arr[0:len(data_input)] = data_input byte_arr[0:len(data_input)] = data_input
data_input = byte_arr data_input = byte_arr
data_input = binascii.hexlify(bytearray(data_input)) data_input = binascii.hexlify(data_input)
entry_no = self.entry_num entry_no = self.entry_num
start_idx = 0 start_idx = 0
@ -179,6 +183,8 @@ class Page(object):
# Encrypt data # Encrypt data
data_bytes = data_input[start_idx:end_idx] data_bytes = data_input[start_idx:end_idx]
if type(data_bytes) == bytes:
data_bytes = data_bytes.decode()
data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes))) data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input) encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
encr_data_to_write = encr_data_to_write + encr_data_ret encr_data_to_write = encr_data_to_write + encr_data_ret
@ -190,7 +196,6 @@ class Page(object):
return encr_data_to_write return encr_data_to_write
def write_entry_to_buf(self, data, entrycount,nvs_obj): def write_entry_to_buf(self, data, entrycount,nvs_obj):
encr_data = bytearray() encr_data = bytearray()
if self.is_encrypt: if self.is_encrypt:
@ -208,16 +213,16 @@ class Page(object):
self.write_bitmaparray() self.write_bitmaparray()
self.entry_num += 1 self.entry_num += 1
def set_crc_header(self, entry_struct): def set_crc_header(self, entry_struct):
crc_data = bytearray(28) crc_data = bytearray(b'28')
crc_data[0:4] = entry_struct[0:4] crc_data[0:4] = entry_struct[0:4]
crc_data[4:28] = entry_struct[8:32] crc_data[4:28] = entry_struct[8:32]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF) crc_data = bytes(crc_data)
entry_struct[4:8] = struct.pack('<I', crc & 0xFFFFFFFF) crc = zlib.crc32(crc_data, 0xFFFFFFFF)
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
return entry_struct return entry_struct
def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count,nvs_obj): def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count, encoding, nvs_obj):
chunk_start = 0 chunk_start = 0
chunk_count = 0 chunk_count = 0
chunk_index = Page.CHUNK_ANY chunk_index = Page.CHUNK_ANY
@ -229,8 +234,8 @@ class Page(object):
chunk_size = 0 chunk_size = 0
# Get the size available in current page # Get the size available in current page
if self.entry_num < (Page.PAGE_PARAMS["max_entries"] - 1):
tailroom = (Page.PAGE_PARAMS["max_entries"] - self.entry_num - 1) * Page.SINGLE_ENTRY_SIZE tailroom = (Page.PAGE_PARAMS["max_entries"] - self.entry_num - 1) * Page.SINGLE_ENTRY_SIZE
assert tailroom >= 0, "Page overflow!!"
# Split the binary data into two and store a chunk of available size onto curr page # Split the binary data into two and store a chunk of available size onto curr page
if tailroom < remaining_size: if tailroom < remaining_size:
@ -245,7 +250,7 @@ class Page(object):
# Calculate no. of entries data chunk will require # Calculate no. of entries data chunk will require
datachunk_rounded_size = (chunk_size + 31) & ~31 datachunk_rounded_size = (chunk_size + 31) & ~31
datachunk_entry_count = datachunk_rounded_size / 32 datachunk_entry_count = datachunk_rounded_size // 32
datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header datachunk_total_entry_count = datachunk_entry_count + 1 # +1 for the entry header
# Set Span # Set Span
@ -259,9 +264,12 @@ class Page(object):
data_chunk = data[offset:offset + chunk_size] data_chunk = data[offset:offset + chunk_size]
# Compute CRC of data chunk # Compute CRC of data chunk
entry_struct[24:26] = struct.pack('<H', chunk_size) struct.pack_into('<H', entry_struct, 24, chunk_size)
if type(data) != bytes:
data_chunk = bytes(data_chunk, encoding='utf8')
crc = zlib.crc32(data_chunk, 0xFFFFFFFF) crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
entry_struct[28:32] = struct.pack('<I', crc & 0xFFFFFFFF) struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
# compute crc of entry header # compute crc of entry header
entry_struct = self.set_crc_header(entry_struct) entry_struct = self.set_crc_header(entry_struct)
@ -276,17 +284,16 @@ class Page(object):
if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE: if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE:
if page_header[0:4] != Page.FULL: if page_header[0:4] != Page.FULL:
page_state_full_seq = Page.FULL page_state_full_seq = Page.FULL
page_header[0:4] = struct.pack('<I', page_state_full_seq) struct.pack_into('<I', page_header, 0, page_state_full_seq)
nvs_obj.create_new_page() nvs_obj.create_new_page()
self = nvs_obj.cur_page self = nvs_obj.cur_page
offset = offset + chunk_size offset = offset + chunk_size
# All chunks are stored, now store the index # All chunks are stored, now store the index
if not remaining_size: if not remaining_size:
# Initialise data field to 0xff # Initialise data field to 0xff
data_array = bytearray('\xff')*8 data_array = bytearray(b'\xff') * 8
entry_struct[24:32] = data_array entry_struct[24:32] = data_array
# change type of data to BLOB_IDX # change type of data to BLOB_IDX
@ -299,7 +306,7 @@ class Page(object):
chunk_index = Page.CHUNK_ANY chunk_index = Page.CHUNK_ANY
entry_struct[3] = chunk_index entry_struct[3] = chunk_index
entry_struct[24:28] = struct.pack('<I', data_size) struct.pack_into('<I', entry_struct, 24, data_size)
entry_struct[28] = chunk_count entry_struct[28] = chunk_count
entry_struct[29] = chunk_start entry_struct[29] = chunk_start
@ -312,12 +319,11 @@ class Page(object):
return entry_struct return entry_struct
def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj): def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
# compute CRC of data # compute CRC of data
entry_struct[24:26] = struct.pack('<H', datalen) struct.pack_into('<H', entry_struct, 24, datalen)
crc = zlib.crc32(data, 0xFFFFFFFF) crc = zlib.crc32(data, 0xFFFFFFFF)
entry_struct[28:32] = struct.pack('<I', crc & 0xFFFFFFFF) struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
# compute crc of entry header # compute crc of entry header
entry_struct = self.set_crc_header(entry_struct) entry_struct = self.set_crc_header(entry_struct)
@ -327,7 +333,6 @@ class Page(object):
# write actual data # write actual data
self.write_entry_to_buf(data, data_entry_count, nvs_obj) self.write_entry_to_buf(data, data_entry_count, nvs_obj)
""" """
Low-level function to write variable length data into page buffer. Data should be formatted Low-level function to write variable length data into page buffer. Data should be formatted
according to encoding specified. according to encoding specified.
@ -336,27 +341,27 @@ class Page(object):
# Set size of data # Set size of data
datalen = len(data) datalen = len(data)
if version == Page.VERSION1:
if datalen > Page.PAGE_PARAMS["max_old_blob_size"]: if datalen > Page.PAGE_PARAMS["max_old_blob_size"]:
raise InputError("%s: Size exceeds max allowed length." % key) if version == Page.VERSION1:
raise InputError("Version %s\n%s: Size exceeds max allowed length." % (VERSION1_PRINT,key))
if version == Page.VERSION2: else:
if encoding == "string": if encoding == "string":
if datalen > Page.PAGE_PARAMS["max_new_blob_size"]: raise InputError("Version %s\n%s: Size exceeds max allowed length." % (VERSION2_PRINT,key))
raise InputError("%s: Size exceeds max allowed length." % key)
# Calculate no. of entries data will require # Calculate no. of entries data will require
rounded_size = (datalen + 31) & ~31 rounded_size = (datalen + 31) & ~31
data_entry_count = rounded_size / 32 data_entry_count = rounded_size // 32
total_entry_count = data_entry_count + 1 # +1 for the entry header total_entry_count = data_entry_count + 1 # +1 for the entry header
# Check if page is already full and new page is needed to be created right away # Check if page is already full and new page is needed to be created right away
if encoding == "string": if self.entry_num >= Page.PAGE_PARAMS["max_entries"]:
if (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]: raise PageFullError()
elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS["max_entries"]:
if not (version == Page.VERSION2 and encoding in ["hex2bin", "binary", "base64"]):
raise PageFullError() raise PageFullError()
# Entry header # Entry header
entry_struct = bytearray('\xff')*32 entry_struct = bytearray(b'\xff') * 32
# Set Namespace Index # Set Namespace Index
entry_struct[0] = ns_index entry_struct[0] = ns_index
# Set Span # Set Span
@ -370,9 +375,9 @@ class Page(object):
entry_struct[2] = data_entry_count + 1 entry_struct[2] = data_entry_count + 1
# set key # set key
key_array = bytearray('\x00')*16 key_array = b'\x00' * 16
entry_struct[8:24] = key_array entry_struct[8:24] = key_array
entry_struct[8:8 + len(key)] = key entry_struct[8:8 + len(key)] = key.encode()
# set Type # set Type
if encoding == "string": if encoding == "string":
@ -381,52 +386,51 @@ class Page(object):
entry_struct[1] = Page.BLOB entry_struct[1] = Page.BLOB
if version == Page.VERSION2 and (encoding in ["hex2bin", "binary", "base64"]): if version == Page.VERSION2 and (encoding in ["hex2bin", "binary", "base64"]):
entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data,\ entry_struct = self.write_varlen_binary_data(entry_struct,ns_index,key,data,
datalen,total_entry_count, nvs_obj) datalen,total_entry_count, encoding, nvs_obj)
else: else:
self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj) self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj)
""" Low-level function to write data of primitive type into page buffer. """ """ Low-level function to write data of primitive type into page buffer. """
def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj): def write_primitive_data(self, key, data, encoding, ns_index,nvs_obj):
# Check if entry exceeds max number of entries allowed per page # Check if entry exceeds max number of entries allowed per page
if self.entry_num >= Page.PAGE_PARAMS["max_entries"]: if self.entry_num >= Page.PAGE_PARAMS["max_entries"]:
raise PageFullError() raise PageFullError()
entry_struct = bytearray('\xff')*32 entry_struct = bytearray(b'\xff') * 32
entry_struct[0] = ns_index # namespace index entry_struct[0] = ns_index # namespace index
entry_struct[2] = 0x01 # Span entry_struct[2] = 0x01 # Span
chunk_index = Page.CHUNK_ANY chunk_index = Page.CHUNK_ANY
entry_struct[3] = chunk_index entry_struct[3] = chunk_index
# write key # write key
key_array = bytearray('\x00')*16 key_array = b'\x00' * 16
entry_struct[8:24] = key_array entry_struct[8:24] = key_array
entry_struct[8:8 + len(key)] = key entry_struct[8:8 + len(key)] = key.encode()
if encoding == "u8": if encoding == "u8":
entry_struct[1] = Page.U8 entry_struct[1] = Page.U8
entry_struct[24] = struct.pack('<B', data) struct.pack_into('<B', entry_struct, 24, data)
elif encoding == "i8": elif encoding == "i8":
entry_struct[1] = Page.I8 entry_struct[1] = Page.I8
entry_struct[24] = struct.pack('<b', data) struct.pack_into('<b', entry_struct, 24, data)
elif encoding == "u16": elif encoding == "u16":
entry_struct[1] = Page.U16 entry_struct[1] = Page.U16
entry_struct[24:26] = struct.pack('<H', data) struct.pack_into('<H', entry_struct, 24, data)
elif encoding == "u32": elif encoding == "u32":
entry_struct[1] = Page.U32 entry_struct[1] = Page.U32
entry_struct[24:28] = struct.pack('<I', data) struct.pack_into('<I', entry_struct, 24, data)
elif encoding == "i32": elif encoding == "i32":
entry_struct[1] = Page.I32 entry_struct[1] = Page.I32
entry_struct[24:28] = struct.pack('<i', data) struct.pack_into('<i', entry_struct, 24, data)
# Compute CRC # Compute CRC
crc_data = bytearray(28) crc_data = bytearray(b'28')
crc_data[0:4] = entry_struct[0:4] crc_data[0:4] = entry_struct[0:4]
crc_data[4:28] = entry_struct[8:32] crc_data[4:28] = entry_struct[8:32]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF) crc_data = bytes(crc_data)
entry_struct[4:8] = struct.pack('<I', crc & 0xFFFFFFFF) crc = zlib.crc32(crc_data, 0xFFFFFFFF)
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
# write to file # write to file
self.write_entry_to_buf(entry_struct, 1,nvs_obj) self.write_entry_to_buf(entry_struct, 1,nvs_obj)
@ -435,9 +439,13 @@ class Page(object):
def get_data(self): def get_data(self):
return self.page_buf return self.page_buf
""" """
NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs. Binary can later be flashed onto device via a flashing utility. NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs.
Binary can later be flashed onto device via a flashing utility.
""" """
class NVS(object): class NVS(object):
def __init__(self, fout): def __init__(self, fout):
self.namespace_idx = 0 self.namespace_idx = 0
@ -450,7 +458,7 @@ class NVS(object):
return self return self
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
if exc_type == None and exc_value == None: if exc_type is None and exc_value is None:
result = self.get_binary_data() result = self.get_binary_data()
self.fout.write(result) self.fout.write(result)
@ -459,6 +467,7 @@ class NVS(object):
new_page = Page(self.page_num) new_page = Page(self.page_num)
new_page.version = version new_page.version = version
new_page.is_encrypt = is_encrypt_data new_page.is_encrypt = is_encrypt_data
if new_page.is_encrypt:
new_page.encr_key = key_input new_page.encr_key = key_input
self.pages.append(new_page) self.pages.append(new_page)
self.cur_page = new_page self.cur_page = new_page
@ -475,7 +484,6 @@ class NVS(object):
except PageFullError: except PageFullError:
new_page = self.create_new_page() new_page = self.create_new_page()
new_page.write_primitive_data(key, self.namespace_idx, "u8", 0,self) new_page.write_primitive_data(key, self.namespace_idx, "u8", 0,self)
pass
""" """
Write key-value pair. Function accepts value in the form of ascii character and converts Write key-value pair. Function accepts value in the form of ascii character and converts
@ -493,6 +501,8 @@ class NVS(object):
value = binascii.a2b_base64(value) value = binascii.a2b_base64(value)
if encoding == "string": if encoding == "string":
if type(value) == bytes:
value = value.decode()
value += '\0' value += '\0'
encoding = encoding.lower() encoding = encoding.lower()
@ -504,15 +514,12 @@ class NVS(object):
except PageFullError: except PageFullError:
new_page = self.create_new_page() new_page = self.create_new_page()
new_page.write_varlen_data(key, value, encoding, self.namespace_idx,self) new_page.write_varlen_data(key, value, encoding, self.namespace_idx,self)
pass
elif encoding in primitive_encodings: elif encoding in primitive_encodings:
try: try:
self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self) self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
except PageFullError: except PageFullError:
new_page = self.create_new_page() new_page = self.create_new_page()
new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self) new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
sys.exc_clear()
pass
else: else:
raise InputError("%s: Unsupported encoding" % encoding) raise InputError("%s: Unsupported encoding" % encoding)
@ -523,6 +530,7 @@ class NVS(object):
data += page.get_data() data += page.get_data()
return data return data
class PageFullError(RuntimeError): class PageFullError(RuntimeError):
""" """
Represents error when current page doesn't have sufficient entries left Represents error when current page doesn't have sufficient entries left
@ -531,6 +539,7 @@ class PageFullError(RuntimeError):
def __init__(self): def __init__(self):
super(PageFullError, self).__init__() super(PageFullError, self).__init__()
class InputError(RuntimeError): class InputError(RuntimeError):
""" """
Represents error on the input Represents error on the input
@ -546,6 +555,7 @@ def nvs_open(result_obj):
""" """
return NVS(result_obj) return NVS(result_obj)
def write_entry(nvs_instance, key, datatype, encoding, value): def write_entry(nvs_instance, key, datatype, encoding, value):
""" Wrapper to set key-value pair in NVS format """ Wrapper to set key-value pair in NVS format
@ -559,7 +569,7 @@ def write_entry(nvs_instance, key, datatype, encoding, value):
if datatype == "file": if datatype == "file":
abs_file_path = value abs_file_path = value
if os.path.isabs(value) == False: if os.path.isabs(value) is False:
script_dir = os.path.dirname(__file__) script_dir = os.path.dirname(__file__)
abs_file_path = os.path.join(script_dir, value) abs_file_path = os.path.join(script_dir, value)
with open(abs_file_path, 'rb') as f: with open(abs_file_path, 'rb') as f:
@ -570,6 +580,7 @@ def write_entry(nvs_instance, key, datatype, encoding, value):
else: else:
nvs_instance.write_entry(key, value, encoding) nvs_instance.write_entry(key, value, encoding)
def nvs_close(nvs_instance): def nvs_close(nvs_instance):
""" Wrapper to finish writing to NVS and write data to file/stream object provided to nvs_open method """ Wrapper to finish writing to NVS and write data to file/stream object provided to nvs_open method
@ -589,7 +600,9 @@ def nvs_part_gen(input_filename=None, output_filename=None, key_gen=None, encryp
:param version_no: NVS format version :param version_no: NVS format version
:return: None :return: None
""" """
global version, is_encrypt_data, key_input global version, is_encrypt_data, key_input, key_len_needed
key_len_needed = 64
version = version_no version = version_no
key_input = None key_input = None
is_encrypt_data = encrypt_mode is_encrypt_data = encrypt_mode
@ -645,15 +658,20 @@ def nvs_part_gen(input_filename=None, output_filename=None, key_gen=None, encryp
input_file.close() input_file.close()
output_file.close() output_file.close()
if is_encrypt_data: if key_gen:
output_keys_file = open("encryption_keys.bin",'wb') keys_page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS["max_size"]
keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"] key_bytes = bytearray()
key_bytes = key_input.decode('hex') if len(key_input) == key_len_needed:
key_bytes = key_input
else:
key_bytes = codecs.decode(key_input, 'hex')
key_len = len(key_bytes) key_len = len(key_bytes)
keys_page_buf[0:key_len] = key_bytes keys_page_buf[0:key_len] = key_bytes
crc_data = keys_page_buf[0:key_len] crc_data = keys_page_buf[0:key_len]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF) crc_data = bytes(crc_data)
keys_page_buf[64:68] = struct.pack('<I', crc & 0xFFFFFFFF) crc = zlib.crc32(crc_data, 0xFFFFFFFF)
struct.pack_into('<I', keys_page_buf, key_len, crc & 0xFFFFFFFF)
with open("encryption_keys.bin",'wb') as output_keys_file:
output_keys_file.write(keys_page_buf) output_keys_file.write(keys_page_buf)
@ -704,6 +722,5 @@ def main():
nvs_part_gen(input_filename, output_filename, key_gen, is_encrypt_data, key_file, version_no) nvs_part_gen(input_filename, output_filename, key_gen, is_encrypt_data, key_file, version_no)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -11,4 +11,6 @@ dummyBase64Key,data,base64,MTIzYWJj
hexFileKey,file,hex2bin,testdata/sample.hex hexFileKey,file,hex2bin,testdata/sample.hex
base64FileKey,file,base64,testdata/sample.base64 base64FileKey,file,base64,testdata/sample.base64
stringFileKey,file,string,testdata/sample.txt stringFileKey,file,string,testdata/sample.txt
blobFileAKey,file,binary,testdata/sample_blob.bin
blobFileBKey,file,binary,testdata/sample_blob.bin
binFileKey,file,binary,testdata/sample_multipage_blob.bin binFileKey,file,binary,testdata/sample_multipage_blob.bin

1 key type encoding value
11 hexFileKey file hex2bin testdata/sample.hex
12 base64FileKey file base64 testdata/sample.base64
13 stringFileKey file string testdata/sample.txt
14 blobFileAKey file binary testdata/sample_blob.bin
15 blobFileBKey file binary testdata/sample_blob.bin
16 binFileKey file binary testdata/sample_multipage_blob.bin

View file

@ -11,4 +11,6 @@ dummyBase64Key,data,base64,MTIzYWJj
hexFileKey,file,hex2bin,testdata/sample.hex hexFileKey,file,hex2bin,testdata/sample.hex
base64FileKey,file,base64,testdata/sample.base64 base64FileKey,file,base64,testdata/sample.base64
stringFileKey,file,string,testdata/sample.txt stringFileKey,file,string,testdata/sample.txt
blobFileAKey,file,binary,testdata/sample_blob.bin
blobFileBKey,file,binary,testdata/sample_blob.bin
binFileKey,file,binary,testdata/sample_singlepage_blob.bin binFileKey,file,binary,testdata/sample_singlepage_blob.bin

1 key type encoding value
11 hexFileKey file hex2bin testdata/sample.hex
12 base64FileKey file base64 testdata/sample.base64
13 stringFileKey file string testdata/sample.txt
14 blobFileAKey file binary testdata/sample_blob.bin
15 blobFileBKey file binary testdata/sample_blob.bin
16 binFileKey file binary testdata/sample_singlepage_blob.bin

View file

@ -0,0 +1 @@
start0000000000000000000000start0123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef00000000000000000123456789abcdef0000000000000000end00000000000000000000000000end

View file

@ -2131,8 +2131,10 @@ TEST_CASE("read data from partition generated via partition generation utility w
{ {
SpiFlashEmulator emu("../nvs_partition_generator/partition_single_page.bin"); SpiFlashEmulator emu("../nvs_partition_generator/partition_single_page.bin");
nvs_handle handle; nvs_handle handle;
TEST_ESP_OK( nvs_flash_init_custom("test", 0, 2) ); TEST_ESP_OK( nvs_flash_init_custom("test", 0, 3) );
TEST_ESP_OK( nvs_open_from_partition("test", "dummyNamespace", NVS_READONLY, &handle)); TEST_ESP_OK( nvs_open_from_partition("test", "dummyNamespace", NVS_READONLY, &handle));
uint8_t u8v; uint8_t u8v;
TEST_ESP_OK( nvs_get_u8(handle, "dummyU8Key", &u8v)); TEST_ESP_OK( nvs_get_u8(handle, "dummyU8Key", &u8v));
CHECK(u8v == 127); CHECK(u8v == 127);
@ -2174,16 +2176,30 @@ TEST_CASE("read data from partition generated via partition generation utility w
TEST_ESP_OK( nvs_get_str(handle, "stringFileKey", buf, &buflen)); TEST_ESP_OK( nvs_get_str(handle, "stringFileKey", buf, &buflen));
CHECK(memcmp(buf, strfiledata, buflen) == 0); CHECK(memcmp(buf, strfiledata, buflen) == 0);
char bin_data[5200]; char bin_data[1984];
size_t bin_len = sizeof(bin_data); size_t bin_len = sizeof(bin_data);
char binfiledata[5200]; char binfiledata[1984];
ifstream file; ifstream file;
file.open("../nvs_partition_generator/testdata/sample_singlepage_blob.bin"); file.open("../nvs_partition_generator/testdata/sample_blob.bin");
file.read(binfiledata,5200); file.read(binfiledata,1984);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", bin_data, &bin_len)); TEST_ESP_OK( nvs_get_blob(handle, "blobFileAKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0); CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
file.close(); bin_len = sizeof(bin_data);
file.open("../nvs_partition_generator/testdata/sample_blob.bin");
file.read(binfiledata,1984);
TEST_ESP_OK( nvs_get_blob(handle, "blobFileBKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
char blob_data[5200];
size_t blob_len = sizeof(blob_data);
char blobfiledata[5200];
ifstream file1;
file1.open("../nvs_partition_generator/testdata/sample_singlepage_blob.bin");
file1.read(blobfiledata,5200);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", blob_data, &blob_len));
CHECK(memcmp(blob_data, blobfiledata, blob_len) == 0);
file1.close();
nvs_close(handle); nvs_close(handle);
} }
@ -2210,7 +2226,7 @@ TEST_CASE("read data from partition generated via partition generation utility w
{ {
SpiFlashEmulator emu("../nvs_partition_generator/partition_multipage_blob.bin"); SpiFlashEmulator emu("../nvs_partition_generator/partition_multipage_blob.bin");
nvs_handle handle; nvs_handle handle;
TEST_ESP_OK( nvs_flash_init_custom("test", 0, 3) ); TEST_ESP_OK( nvs_flash_init_custom("test", 0, 4) );
TEST_ESP_OK( nvs_open_from_partition("test", "dummyNamespace", NVS_READONLY, &handle)); TEST_ESP_OK( nvs_open_from_partition("test", "dummyNamespace", NVS_READONLY, &handle));
uint8_t u8v; uint8_t u8v;
TEST_ESP_OK( nvs_get_u8(handle, "dummyU8Key", &u8v)); TEST_ESP_OK( nvs_get_u8(handle, "dummyU8Key", &u8v));
@ -2253,16 +2269,31 @@ TEST_CASE("read data from partition generated via partition generation utility w
TEST_ESP_OK( nvs_get_str(handle, "stringFileKey", buf, &buflen)); TEST_ESP_OK( nvs_get_str(handle, "stringFileKey", buf, &buflen));
CHECK(memcmp(buf, strfiledata, buflen) == 0); CHECK(memcmp(buf, strfiledata, buflen) == 0);
char bin_data[5200]; char bin_data[1984];
size_t bin_len = sizeof(bin_data); size_t bin_len = sizeof(bin_data);
char binfiledata[5200]; char binfiledata[1984];
ifstream file; ifstream file;
file.open("../nvs_partition_generator/testdata/sample_multipage_blob.bin"); file.open("../nvs_partition_generator/testdata/sample_blob.bin");
file.read(binfiledata,5200); file.read(binfiledata,1984);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", bin_data, &bin_len)); TEST_ESP_OK( nvs_get_blob(handle, "blobFileAKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0); CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
file.close(); bin_len = sizeof(bin_data);
file.open("../nvs_partition_generator/testdata/sample_blob.bin");
file.read(binfiledata,1984);
TEST_ESP_OK( nvs_get_blob(handle, "blobFileBKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
char blob_data[5200];
size_t blob_len = sizeof(blob_data);
char blobfiledata[5200];
ifstream file1;
file1.open("../nvs_partition_generator/testdata/sample_multipage_blob.bin");
file1.read(blobfiledata,5200);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", blob_data, &blob_len));
CHECK(memcmp(blob_data, blobfiledata, blob_len) == 0);
file1.close();
} }
#endif #endif
@ -2438,7 +2469,7 @@ TEST_CASE("test nvs apis for nvs partition generator utility with encryption ena
xts_cfg.tky[count] = 0x22; xts_cfg.tky[count] = 0x22;
} }
TEST_ESP_OK(nvs_flash_secure_init_custom(NVS_DEFAULT_PART_NAME, 0, 3, &xts_cfg)); TEST_ESP_OK(nvs_flash_secure_init_custom(NVS_DEFAULT_PART_NAME, 0, 4, &xts_cfg));
TEST_ESP_OK(nvs_open_from_partition(NVS_DEFAULT_PART_NAME, "dummyNamespace", NVS_READONLY, &handle)); TEST_ESP_OK(nvs_open_from_partition(NVS_DEFAULT_PART_NAME, "dummyNamespace", NVS_READONLY, &handle));
@ -2488,15 +2519,30 @@ TEST_CASE("test nvs apis for nvs partition generator utility with encryption ena
TEST_ESP_OK( nvs_get_str(handle, "stringFileKey", buf, &buflen)); TEST_ESP_OK( nvs_get_str(handle, "stringFileKey", buf, &buflen));
CHECK(memcmp(buf, strfiledata, buflen) == 0); CHECK(memcmp(buf, strfiledata, buflen) == 0);
char bin_data[5120]; char bin_data[1984];
size_t bin_len = sizeof(bin_data); size_t bin_len = sizeof(bin_data);
char binfiledata[5200]; char binfiledata[1984];
ifstream file; ifstream file;
file.open("../nvs_partition_generator/testdata/sample_multipage_blob.bin"); file.open("../nvs_partition_generator/testdata/sample_blob.bin");
file.read(binfiledata,5120); file.read(binfiledata,1984);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", bin_data, &bin_len)); TEST_ESP_OK( nvs_get_blob(handle, "blobFileAKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0); CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
bin_len = sizeof(bin_data);
file.open("../nvs_partition_generator/testdata/sample_blob.bin");
file.read(binfiledata,1984);
TEST_ESP_OK( nvs_get_blob(handle, "blobFileBKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
char blob_data[5120];
size_t blob_len = sizeof(blob_data);
char blobfiledata[5200];
ifstream file1;
file1.open("../nvs_partition_generator/testdata/sample_multipage_blob.bin");
file1.read(blobfiledata,5120);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", blob_data, &blob_len));
CHECK(memcmp(blob_data, blobfiledata, blob_len) == 0);
file1.close();
nvs_close(handle); nvs_close(handle);
TEST_ESP_OK(nvs_flash_deinit()); TEST_ESP_OK(nvs_flash_deinit());

View file

@ -7,4 +7,4 @@ setuptools
# #
pyserial>=3.0 pyserial>=3.0
future>=0.15.2 future>=0.15.2
cryptography cryptography>=2.1.4