unit tests: Keep serial port open when running esptool

* Call esptool directly not via subprocess
* Use the same serial port instance for listener thread and esptool
* Includes some refactoring for encapsulation of App vs DUT members
This commit is contained in:
Angus Gratton 2018-12-05 11:13:33 +11:00 committed by Angus Gratton
parent 0a27cfa850
commit f6e857c2b9
5 changed files with 177 additions and 109 deletions

View File

@ -2,6 +2,11 @@
"write_flash_args" : [ "--flash_mode", "${ESPFLASHMODE}",
"--flash_size", "${ESPFLASHSIZE}",
"--flash_freq", "${ESPFLASHFREQ}" ],
"flash_settings" : {
"flash_mode": "${ESPFLASHMODE}",
"flash_size": "${ESPFLASHSIZE}",
"flash_freq": "${ESPFLASHFREQ}"
},
"flash_files" : {
"${BOOTLOADER_OFFSET}" : "bootloader/bootloader.bin",
"${PARTITION_TABLE_OFFSET}" : "partition_table/partition-table.bin",

View File

@ -286,8 +286,8 @@ class BaseDUT(object):
self.record_data_lock = threading.RLock()
self.receive_thread = None
self.expect_failures = []
# open and start during init
self.open()
self._port_open()
self.start_receive()
def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port))
@ -392,27 +392,32 @@ class BaseDUT(object):
pass
# methods that features raw port methods
def open(self):
def start_receive(self):
"""
open port and create thread to receive data.
Start thread to receive data.
:return: None
"""
self._port_open()
self.receive_thread = _RecvThread(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def close(self):
def stop_receive(self):
"""
close receive thread and then close port.
stop the receiving thread for the port
:return: None
"""
if self.receive_thread:
self.receive_thread.exit()
self._port_close()
self.LOG_THREAD.flush_data()
self.receive_thread = None
def close(self):
"""
permanently close the port
"""
self.stop_receive()
self._port_close()
@staticmethod
def u_to_bytearray(data):

View File

@ -16,6 +16,7 @@
import subprocess
import os
import json
import App
@ -26,7 +27,7 @@ class IDFApp(App.BaseApp):
"""
IDF_DOWNLOAD_CONFIG_FILE = "download.config"
IDF_FLASH_ARGS_FILE = "flash_project_args"
IDF_FLASH_ARGS_FILE = "flasher_args.json"
def __init__(self, app_path):
super(IDFApp, self).__init__(app_path)
@ -43,7 +44,8 @@ class IDFApp(App.BaseApp):
self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
raise AssertionError(msg)
self.esptool, self.partition_tool = self.get_tools()
self.flash_files, self.flash_settings = self._parse_flash_download_config()
self.partition_table = self._parse_partition_table()
@classmethod
def get_sdk_path(cls):
@ -52,16 +54,6 @@ class IDFApp(App.BaseApp):
assert os.path.exists(idf_path)
return idf_path
@classmethod
def get_tools(cls):
idf_path = cls.get_sdk_path()
# get esptool and partition tool for esp-idf
esptool = os.path.join(idf_path, "components",
"esptool_py", "esptool", "esptool.py")
partition_tool = os.path.join(idf_path, "components",
"partition_table", "gen_esp32part.py")
assert os.path.exists(esptool) and os.path.exists(partition_tool)
return esptool, partition_tool
def get_binary_path(self, app_path):
"""
@ -74,47 +66,64 @@ class IDFApp(App.BaseApp):
"""
pass
def process_arg(self, arg):
def _parse_flash_download_config(self):
"""
process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs.
"""
if ".bin" in arg:
ret = os.path.join(self.binary_path, arg)
else:
ret = arg
return ret.strip("\r\n ")
Parse flash download config from build metadata files
def process_app_info(self):
"""
get app download config and partition info from a specific app path
Sets self.flash_files, self.flash_settings
:return: download config, partition info
(Called from constructor)
Returns (flash_files, flash_settings)
"""
if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
# CMake version using build metadata file
with open(os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE), "r") as f:
configs = []
for line in f:
line = line.strip()
if len(line) > 0:
configs += line.split()
args = json.load(f)
flash_files = [ (offs,file) for (offs,file) in args["flash_files"].items() if offs != "" ]
flash_settings = args["flash_settings"]
else:
# GNU Make version uses download.config arguments file
with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f:
configs = f.read().split(" ")
args = f.readlines()[-1].split(" ")
flash_files = []
flash_settings = {}
for idx in range(0, len(args), 2): # process arguments in pairs
if args[idx].startswith("--"):
# strip the -- from the command line argument
flash_settings[args[idx][2:]] = args[idx+1]
else:
# offs, filename
flash_files.append( (args[idx], args[idx+1]) )
download_configs = ["--chip", "auto", "--before", "default_reset",
"--after", "hard_reset", "write_flash", "-z"]
download_configs += [self.process_arg(x) for x in configs]
# make file offsets into integers, make paths absolute
flash_files = [ (int(offs, 0), os.path.join(self.binary_path, path.strip())) for (offs, path) in flash_files ]
# handle partition table
for partition_file in download_configs:
if "partition" in partition_file:
partition_file = os.path.join(self.binary_path, partition_file)
return (flash_files, flash_settings)
def _parse_partition_table(self):
"""
Parse partition table contents based on app binaries
Returns partition_table data
(Called from constructor)
"""
partition_tool = os.path.join(self.idf_path,
"components",
"partition_table",
"gen_esp32part.py")
assert os.path.exists(partition_tool)
for (_, path) in self.flash_files:
if "partition" in path:
partition_file = os.path.join(self.binary_path, path)
break
else:
raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path))
process = subprocess.Popen(["python", self.partition_tool, partition_file],
process = subprocess.Popen(["python", partition_tool, partition_file],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data = process.stdout.read()
if isinstance(raw_data, bytes):
@ -140,7 +149,8 @@ class IDFApp(App.BaseApp):
"size": _size,
"flags": _flags
}
return download_configs, partition_table
return partition_table
class Example(IDFApp):

View File

@ -14,37 +14,64 @@
""" DUT for IDF applications """
import os
import os.path
import sys
import re
import subprocess
import functools
import random
import tempfile
import time
from serial.tools import list_ports
from collections import namedtuple
import DUT
try:
import esptool
except ImportError: # cheat and use IDF's copy of esptool if available
idf_path = os.getenv("IDF_PATH")
if not idf_path or not os.path.exists(idf_path):
raise
sys.path.insert(0, os.path.join(idf_path, "components", "esptool_py", "esptool"))
import esptool
class IDFToolError(OSError):
pass
def _tool_method(func):
""" close port, execute tool method and then reopen port """
def _uses_esptool(func):
""" Suspend listener thread, connect with esptool,
call target function with esptool instance,
then resume listening for output
"""
@functools.wraps(func)
def handler(self, *args, **kwargs):
self.close()
ret = func(self, *args, **kwargs)
self.open()
self.stop_receive()
settings = self.port_inst.get_settings()
rom = esptool.ESP32ROM(self.port_inst)
rom.connect('hard_reset')
esp = rom.run_stub()
ret = func(self, esp, *args, **kwargs)
self.port_inst.apply_settings(settings)
self.start_receive()
return ret
return handler
class IDFDUT(DUT.SerialDUT):
""" IDF DUT, extends serial with ESPTool methods """
""" IDF DUT, extends serial with esptool methods
(Becomes aware of IDFApp instance which holds app-specific data)
"""
CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
# /dev/ttyAMA0 port is listed in Raspberry Pi
# /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
@ -52,88 +79,109 @@ class IDFDUT(DUT.SerialDUT):
ERASE_NVS = True
def __init__(self, name, port, log_file, app, **kwargs):
self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
@classmethod
def get_chip(cls, app, port):
def get_mac(cls, app, port):
"""
get chip id via esptool
get MAC address via esptool
:param app: application instance (to get tool)
:param port: comport
:return: chip ID or None
:param port: serial port as string
:return: MAC address or None
"""
try:
output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"])
except subprocess.CalledProcessError:
output = bytes()
if isinstance(output, bytes):
output = output.decode()
chip_type = cls.CHIP_TYPE_PATTERN.search(output)
return chip_type.group(1) if chip_type else None
esp = esptool.ESP32ROM(port)
esp.connect()
return esp.read_mac()
except RuntimeError as e:
return None
finally:
esp._port.close()
@classmethod
def confirm_dut(cls, port, app, **kwargs):
return cls.get_chip(app, port) is not None
return cls.get_mac(app, port) is not None
@_tool_method
def start_app(self, erase_nvs=ERASE_NVS):
@_uses_esptool
def start_app(self, esp, erase_nvs=ERASE_NVS):
"""
download and start app.
:param: erase_nvs: whether erase NVS partition during flash
:return: None
"""
if erase_nvs:
address = self.partition_table["nvs"]["offset"]
size = self.partition_table["nvs"]["size"]
nvs_file = tempfile.NamedTemporaryFile()
nvs_file.write(b'\xff' * size)
nvs_file.flush()
download_config = self.download_config + [address, nvs_file.name]
else:
download_config = self.download_config
flash_files = [ (offs, open(path, "rb")) for (offs, path) in self.app.flash_files ]
if erase_nvs:
address = self.app.partition_table["nvs"]["offset"]
size = self.app.partition_table["nvs"]["size"]
nvs_file = tempfile.TemporaryFile()
nvs_file.write(b'\xff' * size)
nvs_file.seek(0)
flash_files.append( (int(address, 0), nvs_file) )
# fake flasher args object, this is a hack until
# esptool Python API is improved
Flash_Args = namedtuple('write_flash_args',
['flash_size',
'flash_mode',
'flash_freq',
'addr_filename',
'no_stub',
'compress',
'verify',
'encrypt'])
flash_args = Flash_Args(
self.app.flash_settings["flash_size"],
self.app.flash_settings["flash_mode"],
self.app.flash_settings["flash_freq"],
flash_files,
False,
True,
False,
False
)
retry_baud_rates = ["921600", "115200"]
error = IDFToolError()
try:
for baud_rate in retry_baud_rates:
for baud_rate in [ 921600, 115200 ]:
try:
subprocess.check_output(["python", self.app.esptool,
"--port", self.port, "--baud", baud_rate]
+ download_config)
esp.change_baud(baud_rate)
esptool.write_flash(esp, flash_args)
break
except subprocess.CalledProcessError as error:
except RuntimeError:
continue
else:
raise error
raise IDFToolError()
finally:
if erase_nvs:
nvs_file.close()
for (_,f) in flash_files:
f.close()
@_tool_method
def reset(self):
@_uses_esptool
def reset(self, esp):
"""
reset DUT with esptool
hard reset DUT
:return: None
"""
subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"])
esp.hard_reset()
@_tool_method
def erase_partition(self, partition):
@_uses_esptool
def erase_partition(self, esp, partition):
"""
:param partition: partition name to erase
:return: None
"""
address = self.partition_table[partition]["offset"]
size = self.partition_table[partition]["size"]
raise NotImplementedError() # TODO: implement this
address = self.app.partition_table[partition]["offset"]
size = self.app.partition_table[partition]["size"]
# TODO can use esp.erase_region() instead of this, I think
with open(".erase_partition.tmp", "wb") as f:
f.write(chr(0xFF) * size)
@_tool_method
def dump_flush(self, output_file, **kwargs):
@_uses_esptool
def dump_flush(self, esp, output_file, **kwargs):
"""
dump flush
@ -147,7 +195,7 @@ class IDFDUT(DUT.SerialDUT):
if os.path.isabs(output_file) is False:
output_file = os.path.relpath(output_file, self.app.get_log_folder())
if "partition" in kwargs:
partition = self.partition_table[kwargs["partition"]]
partition = self.app.partition_table[kwargs["partition"]]
_address = partition["offset"]
_size = partition["size"]
elif "address" in kwargs and "size" in kwargs:
@ -155,11 +203,10 @@ class IDFDUT(DUT.SerialDUT):
_size = kwargs["size"]
else:
raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
subprocess.check_output(
["python", self.app.esptool, "--port", self.port, "--baud", "921600",
"--before", "default_reset", "--after", "hard_reset", "read_flash",
_address, _size, output_file]
)
content = esp.read_flash(_address, _size)
with open(output_file, "wb") as f:
f.write(content)
@classmethod
def list_available_ports(cls):

View File

@ -122,7 +122,8 @@ Class Diagram
{method} expect_all
{method} read
{method} write
{method} open
{method} start_receive
{method} stop_receive
{method} close
}
class SerialDUT {
@ -137,12 +138,12 @@ Class Diagram
}
class BaseApp {
{method} get_sdk_path
{method} get_tools
{method} process_app_info
{method} get_log_folder
}
class IDFApp {
{method} process_app_info
{field} flash_files
{field} flash_settings
{field} partition_table
}
class Example {
{method} get_binary_path