nvs_part_gen: Update to make python version compatible

(cherry picked from commit 3506b2d60792e5d4ed4f744b28f2da6733c6aae7)
This commit is contained in:
Shivani Tipnis 2018-10-08 10:32:09 +05:30
parent 807e72adce
commit 4bc87a414f
2 changed files with 154 additions and 58 deletions

View file

@ -28,6 +28,7 @@ import os
import array
import csv
import zlib
import codecs
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
@ -81,9 +82,9 @@ class Page(object):
# set page state to active
page_header= bytearray(b'\xff')*32
page_state_active_seq = Page.ACTIVE
page_header[0:4] = struct.pack('<I', page_state_active_seq)
struct.pack_into('<I', page_header, 0, page_state_active_seq)
# set page sequence number
page_header[4:8] = struct.pack('<I', page_num)
struct.pack_into('<I', page_header, 4, page_num)
# set version
if version == Page.VERSION2:
page_header[8] = Page.VERSION2
@ -91,8 +92,11 @@ class Page(object):
page_header[8] = Page.VERSION1
# set header's CRC
crc_data = page_header[4:28]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
page_header[28:32] = struct.pack('<I', crc & 0xFFFFFFFF)
if sys.version_info[0] < 3:
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
else:
crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
self.page_buf[0:len(page_header)] = page_header
@ -118,8 +122,13 @@ class Page(object):
def encrypt_entry(self, data_arr, tweak_arr, encr_key):
# Encrypt 32 bytes of data using AES-XTS encryption
backend = default_backend()
plain_text = data_arr.decode('hex')
tweak = tweak_arr.decode('hex')
if sys.version_info[0] < 3:
plain_text = data_arr.decode('hex')
tweak = tweak_arr.decode('hex')
else:
plain_text = codecs.decode(data_arr, 'hex')
tweak = codecs.decode(tweak_arr, 'hex')
cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plain_text)
@ -139,7 +148,7 @@ class Page(object):
def encrypt_data(self, data_input, no_of_entries, nvs_obj):
# Set values needed for encryption and encrypt data byte wise
encr_data_to_write = ''
encr_data_to_write = bytearray()
data_len_needed = 64 #in hex
tweak_len_needed = 32 #in hex
init_tweak_val = '0'
@ -147,17 +156,34 @@ class Page(object):
tweak_tmp = ''
encr_key_input = None
# Extract encryption key and tweak key from given key input
encr_key_input = self.encr_key.decode('hex')
if sys.version_info[0] < 3:
encr_key_input = self.encr_key.decode('hex')
else:
encr_key_input = codecs.decode(self.encr_key, 'hex')
rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET
if type(data_input) != bytearray:
byte_arr = bytearray('\xff') * 32
byte_arr[0:len(data_input)] = data_input
byte_arr = bytearray(b'\xff') * 32
if sys.version_info[0] < 3:
byte_arr[0:len(data_input)] = data_input
else:
if type(data_input) == str:
byte_arr[0:len(data_input)] = data_input
else:
byte_arr[0:len(data_input)] = data_input
data_input = byte_arr
data_input = binascii.hexlify(bytearray(data_input))
if sys.version_info[0] < 3:
data_input = binascii.hexlify(bytearray(data_input))
else:
data_input = data_input.hex()
entry_no = self.entry_num
start_idx = 0
@ -182,8 +208,13 @@ class Page(object):
# Encrypt data
data_bytes = data_input[start_idx:end_idx]
data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
if sys.version_info[0] < 3:
data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
else:
data_val = str(data_bytes) + (init_data_val * (data_len_needed - len(data_bytes)))
encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
#print("\n<<<\n")
encr_data_to_write = encr_data_to_write + encr_data_ret
# Update values for encrypting next set of data bytes
start_idx = end_idx
@ -198,13 +229,24 @@ class Page(object):
encr_data = bytearray()
if self.is_encrypt:
encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj)
encr_data[0:len(encr_data_ret)] = encr_data_ret
if sys.version_info[0] < 3:
encr_data[0:len(encr_data_ret)] = encr_data_ret
else:
encr_data[0:len(encr_data_ret)] = encr_data_ret
data = encr_data
data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
start_idx = data_offset
end_idx = data_offset + len(data)
self.page_buf[start_idx:end_idx] = data
if not sys.version_info[0] < 3:
if type(data) == str:
self.page_buf[start_idx:end_idx] = data
else:
self.page_buf[start_idx:end_idx] = data
else:
self.page_buf[start_idx:end_idx] = data
# Set bitmap array for entries in current page
for i in range(0, entrycount):
@ -213,11 +255,14 @@ class Page(object):
def set_crc_header(self, entry_struct):
crc_data = bytearray(28)
crc_data = bytearray(b'28')
crc_data[0:4] = entry_struct[0:4]
crc_data[4:28] = entry_struct[8:32]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
entry_struct[4:8] = struct.pack('<I', crc & 0xFFFFFFFF)
if sys.version_info[0] < 3:
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
else:
crc = zlib.crc32(crc_data, 0xFFFFFFFF)
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
return entry_struct
def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count,nvs_obj):
@ -262,9 +307,13 @@ class Page(object):
data_chunk = data[offset:offset + chunk_size]
# Compute CRC of data chunk
entry_struct[24:26] = struct.pack('<H', chunk_size)
struct.pack_into('<H', entry_struct, 24, chunk_size)
if not sys.version_info[0] < 3:
if type(data_chunk) == str:
data_chunk = codecs.encode(data_chunk, 'utf8')
crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
entry_struct[28:32] = struct.pack('<I', crc & 0xFFFFFFFF)
struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
# compute crc of entry header
entry_struct = self.set_crc_header(entry_struct)
@ -279,7 +328,7 @@ class Page(object):
if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE:
if page_header[0:4] != Page.FULL:
page_state_full_seq = Page.FULL
page_header[0:4] = struct.pack('<I', page_state_full_seq)
struct.pack_into('<I', page_header, 0, page_state_full_seq)
nvs_obj.create_new_page()
self = nvs_obj.cur_page
@ -289,7 +338,7 @@ class Page(object):
# All chunks are stored, now store the index
if not remaining_size:
# Initialise data field to 0xff
data_array = bytearray('\xff')*8
data_array = bytearray(b'\xff')*8
entry_struct[24:32] = data_array
# change type of data to BLOB_IDX
@ -302,7 +351,7 @@ class Page(object):
chunk_index = Page.CHUNK_ANY
entry_struct[3] = chunk_index
entry_struct[24:28] = struct.pack('<I', data_size)
struct.pack_into('<I', entry_struct, 24, data_size)
entry_struct[28] = chunk_count
entry_struct[29] = chunk_start
@ -318,9 +367,17 @@ class Page(object):
def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
# compute CRC of data
entry_struct[24:26] = struct.pack('<H', datalen)
crc = zlib.crc32(data, 0xFFFFFFFF)
entry_struct[28:32] = struct.pack('<I', crc & 0xFFFFFFFF)
struct.pack_into('<H', entry_struct, 24, datalen)
if sys.version_info[0] < 3:
crc = zlib.crc32(data, 0xFFFFFFFF)
else:
if (type(data)) == str:
data = data.encode('utf8')
crc = zlib.crc32(data, 0xFFFFFFFF)
else:
crc = zlib.crc32(data, 0xFFFFFFFF)
struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
# compute crc of entry header
entry_struct = self.set_crc_header(entry_struct)
@ -359,7 +416,7 @@ class Page(object):
raise PageFullError()
# Entry header
entry_struct = bytearray('\xff')*32
entry_struct = bytearray(b'\xff')*32
# Set Namespace Index
entry_struct[0] = ns_index
# Set Span
@ -373,9 +430,12 @@ class Page(object):
entry_struct[2] = data_entry_count + 1
# set key
key_array = bytearray('\x00')*16
key_array = bytearray(b'\x00')*16
entry_struct[8:24] = key_array
entry_struct[8:8 + len(key)] = key
if sys.version_info[0] < 3:
entry_struct[8:8 + len(key)] = key
else:
entry_struct[8:8 + len(key)] = key.encode('utf8')
# set Type
if encoding == "string":
@ -397,39 +457,47 @@ class Page(object):
if self.entry_num >= Page.PAGE_PARAMS["max_entries"]:
raise PageFullError()
entry_struct = bytearray('\xff')*32
entry_struct = bytearray(b'\xff')*32
entry_struct[0] = ns_index # namespace index
entry_struct[2] = 0x01 # Span
chunk_index = Page.CHUNK_ANY
entry_struct[3] = chunk_index
# write key
key_array = bytearray('\x00')*16
key_array = bytearray(b'\x00')*16
entry_struct[8:24] = key_array
entry_struct[8:8 + len(key)] = key
if sys.version_info[0] < 3:
entry_struct[8:8 + len(key)] = key
else:
entry_struct[8:8 + len(key)] = key.encode('utf8')
if encoding == "u8":
entry_struct[1] = Page.U8
entry_struct[24] = struct.pack('<B', data)
struct.pack_into('<B', entry_struct, 24, data)
elif encoding == "i8":
entry_struct[1] = Page.I8
entry_struct[24] = struct.pack('<b', data)
struct.pack_into('<b', entry_struct, 24, data)
elif encoding == "u16":
entry_struct[1] = Page.U16
entry_struct[24:26] = struct.pack('<H', data)
struct.pack_into('<H', entry_struct, 24, data)
elif encoding == "u32":
entry_struct[1] = Page.U32
entry_struct[24:28] = struct.pack('<I', data)
struct.pack_into('<I', entry_struct, 24, data)
elif encoding == "i32":
entry_struct[1] = Page.I32
entry_struct[24:28] = struct.pack('<i', data)
struct.pack_into('<i', entry_struct, 24, data)
# Compute CRC
crc_data = bytearray(28)
crc_data = bytearray(b'28')
crc_data[0:4] = entry_struct[0:4]
crc_data[4:28] = entry_struct[8:32]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
entry_struct[4:8] = struct.pack('<I', crc & 0xFFFFFFFF)
if sys.version_info[0] < 3:
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
else:
crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
# write to file
self.write_entry_to_buf(entry_struct, 1,nvs_obj)
@ -478,7 +546,8 @@ class NVS(object):
new_page = Page(self.page_num, is_rsrv_page)
new_page.version = version
new_page.is_encrypt = is_encrypt_data
new_page.encr_key = key_input
if new_page.is_encrypt:
new_page.encr_key = key_input
self.pages.append(new_page)
self.cur_page = new_page
return new_page
@ -590,8 +659,12 @@ def write_entry(nvs_instance, key, datatype, encoding, value):
if os.path.isabs(value) == False:
script_dir = os.path.dirname(__file__)
abs_file_path = os.path.join(script_dir, value)
with open(abs_file_path, 'rb') as f:
value = f.read()
if sys.version_info[0] < 3:
with open(abs_file_path, 'rb') as f:
value = f.read()
else:
with open(abs_file_path, 'r', newline='') as f:
value = f.read()
if datatype == "namespace":
nvs_instance.write_namespace(key)
@ -611,9 +684,9 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
:param input_filename: Name of input file containing data
:param output_filename: Name of output file to store generated binary
:param input_size: Size of partition
:param input_size: Size of partition (must be multiple of 4096)
:param key_gen: Enable encryption key generation in encryption mode
:param encryption_mode: Enable/Disable encryption mode
:param encrypt_mode: Enable/Disable encryption mode
:param key_file: Input file having encryption keys in encryption mode
:return: None
"""
@ -623,16 +696,16 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
is_encrypt_data = encrypt_mode
# Set size
input_size = int(input_size.split('KB')[0]) * 1024
input_size = int(input_size, 16)
if input_size % 4096 !=0:
sys.exit("Size parameter should be a multiple of 4KB.")
sys.exit("Size of partition (must be multiple of 4096)")
if version == 'v1':
version = Page.VERSION1
elif version == 'v2':
version = Page.VERSION2
# Update size as a page needs to be reserved of size 4KB
input_size = input_size - Page.PAGE_PARAMS["max_size"]
@ -662,13 +735,20 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
sys.exit("Invalid. Cannot give --key_file as --encrypt is set to False.")
if key_gen:
key_input = ''.join(random.choice('0123456789abcdef') for _ in xrange(128))
if sys.version_info[0] < 3:
key_input = ''.join(random.choice('0123456789abcdef') for _ in xrange(128))
else:
key_input = ''.join(random.choice('0123456789abcdef') for _ in range(128)).strip()
elif key_file:
with open(key_file) as key_f:
key_input = key_f.readline()
key_input = key_input.strip()
input_file = open(input_filename, 'rb')
if sys.version_info[0] < 3:
input_file = open(input_filename, 'rb')
else:
input_file = open(input_filename, 'r', newline='')
output_file = open(output_filename, 'wb')
with nvs_open(output_file, input_size) as nvs_obj:
@ -688,12 +768,25 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
if is_encrypt_data:
output_keys_file = open("encryption_keys.bin",'wb')
keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"]
key_bytes = key_input.decode('hex')
if sys.version_info[0] < 3:
key_bytes = key_input.decode('hex')
else:
key_bytes = bytearray()
key_bytes = codecs.decode(key_input, 'hex')
key_len = len(key_bytes)
keys_page_buf[0:key_len] = key_bytes
crc_data = keys_page_buf[0:key_len]
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
keys_page_buf[64:68] = struct.pack('<I', crc & 0xFFFFFFFF)
if sys.version_info[0] < 3:
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
else:
crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
struct.pack_into('<I', keys_page_buf, key_len, crc & 0xFFFFFFFF)
output_keys_file.write(keys_page_buf)
@ -712,7 +805,7 @@ def main():
parser.add_argument(
"size",
help='Size of NVS Partition in KB. Eg. 12KB')
help='Size of NVS Partition in hex (must be multiple of 4096). Eg. 0x1000')
parser.add_argument(
"--version",

View file

@ -2003,7 +2003,7 @@ TEST_CASE("check partition generation utility with multipage blob support disabl
"../nvs_partition_generator/nvs_partition_gen.py",
"../nvs_partition_generator/sample_singlepage_blob.csv",
"../nvs_partition_generator/partition_single_page.bin",
"12KB",
"0x3000",
"--version",
"v1",NULL));
} else {
@ -2071,6 +2071,7 @@ TEST_CASE("read data from partition generated via partition generation utility w
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
file.close();
nvs_close(handle);
}
@ -2082,7 +2083,7 @@ TEST_CASE("check partition generation utility with multipage blob support enable
"../nvs_partition_generator/nvs_partition_gen.py",
"../nvs_partition_generator/sample_multipage_blob.csv",
"../nvs_partition_generator/partition_multipage_blob.bin",
"12KB",
"0x3000",
"--version",
"v2",NULL));
} else {
@ -2148,8 +2149,9 @@ TEST_CASE("read data from partition generated via partition generation utility w
file.read(binfiledata,5200);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
file.close();
nvs_close(handle);
}
@ -2300,7 +2302,7 @@ TEST_CASE("test nvs apis for nvs partition generator utility with encryption ena
"../nvs_partition_generator/nvs_partition_gen.py",
"../nvs_partition_generator/sample_multipage_blob.csv",
"../nvs_partition_generator/partition_encrypted.bin",
"12KB",
"0x3000",
"--encrypt",
"True",
"--keyfile",
@ -2329,6 +2331,7 @@ TEST_CASE("test nvs apis for nvs partition generator utility with encryption ena
uint8_t u8v;
TEST_ESP_OK( nvs_get_u8(handle, "dummyU8Key", &u8v));
CHECK(u8v == 127);
int8_t i8v;
TEST_ESP_OK( nvs_get_i8(handle, "dummyI8Key", &i8v));
CHECK(i8v == -128);
@ -2380,7 +2383,7 @@ TEST_CASE("test nvs apis for nvs partition generator utility with encryption ena
file.read(binfiledata,5120);
TEST_ESP_OK( nvs_get_blob(handle, "binFileKey", bin_data, &bin_len));
CHECK(memcmp(bin_data, binfiledata, bin_len) == 0);
nvs_close(handle);
TEST_ESP_OK(nvs_flash_deinit());