Merge branch 'test/detect_exception_in_idf_dut_v3.1' into 'release/v3.1'

tiny-test-fw: support detect exception in IDFDUT (backport v3.1)

See merge request idf/esp-idf!4544
commit 7313da42b5
Author: Angus Gratton
Date:   2019-03-21 07:54:37 +08:00

6 changed files with 187 additions and 51 deletions
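For orientation, a minimal sketch of the workflow this merge enables, assuming an already-configured Env instance "env" whose default DUT class is IDFDUT (the DUT name and app paths below are illustrative, not taken from the commit):

dut = env.get_dut("dut1", app_path="examples/get-started/hello_world")
dut.start_app()
dut.expect("Hello world!")

# While the test runs, the receive thread scans serial output for panics and backtraces.
# close() raises IDFDUTException if a panic was detected and allow_dut_exception is False,
# which the test framework turns into a test failure.
dut.close()

# Tests that expect the DUT to reset (e.g. the unit test runner) opt out per DUT:
crash_dut = env.get_dut("unit-test-app", app_path="configs/default", allow_dut_exception=True)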


@@ -41,19 +41,19 @@ import time
 import re
 import threading
 import copy
-import sys
 import functools

+# python2 and python3 queue package name is different
+try:
+    import Queue as _queue
+except ImportError:
+    import queue as _queue
+
 import serial
 from serial.tools import list_ports

 import Utility

-if sys.version_info[0] == 2:
-    import Queue as _queue
-else:
-    import queue as _queue
-

 class ExpectTimeout(ValueError):
     """ timeout for expect method """
@@ -200,55 +200,62 @@ class _LogThread(threading.Thread, _queue.Queue):
         self.flush_data()


-class _RecvThread(threading.Thread):
+class RecvThread(threading.Thread):

-    PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
+    CHECK_FUNCTIONS = []
+    """ DUT subclass can define a few check functions to process received data. """

-    def __init__(self, read, data_cache, recorded_data, record_data_lock):
-        super(_RecvThread, self).__init__()
+    def __init__(self, read, dut):
+        super(RecvThread, self).__init__()
         self.exit_event = threading.Event()
         self.setDaemon(True)
         self.read = read
-        self.data_cache = data_cache
-        self.recorded_data = recorded_data
-        self.record_data_lock = record_data_lock
-        # cache the last line of recv data for collecting performance
+        self.dut = dut
+        self.data_cache = dut.data_cache
+        self.recorded_data = dut.recorded_data
+        self.record_data_lock = dut.record_data_lock
         self._line_cache = str()

-    def collect_performance(self, data):
-        """ collect performance """
-        if data:
-            decoded_data = _decode_data(data)
-
-            matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data)
-            for match in matches:
-                Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
-                                    color="orange")
-
-            # cache incomplete line to later process
-            lines = decoded_data.splitlines(True)
-            last_line = lines[-1]
-
-            if last_line[-1] != "\n":
-                if len(lines) == 1:
-                    # only one line and the line is not finished, then append this to cache
-                    self._line_cache += lines[-1]
-                else:
-                    # more than one line and not finished, replace line cache
-                    self._line_cache = lines[-1]
-            else:
-                # line finishes, flush cache
-                self._line_cache = str()
+    def _line_completion(self, data):
+        """
+        Check functions usually need to work on complete lines.
+        This method completes the first line using cached data and strips the incomplete last line.
+        """
+        ret = self._line_cache
+        decoded_data = _decode_data(data)
+
+        # cache incomplete line to later process
+        lines = decoded_data.splitlines(True)
+        last_line = lines[-1]
+
+        if last_line[-1] != "\n":
+            if len(lines) == 1:
+                # only one line and the line is not finished, then append this to cache
+                self._line_cache += lines[-1]
+                ret = str()
+            else:
+                # more than one line and not finished, replace line cache
+                self._line_cache = lines[-1]
+                ret += "".join(lines[:-1])
+        else:
+            # line finishes, flush cache
+            self._line_cache = str()
+            ret += decoded_data
+        return ret

     def run(self):
         while not self.exit_event.isSet():
-            data = self.read(1000)
-            if data:
+            raw_data = self.read(1000)
+            if raw_data:
                 with self.record_data_lock:
-                    self.data_cache.put(data)
+                    self.data_cache.put(raw_data)
                     for capture_id in self.recorded_data:
-                        self.recorded_data[capture_id].put(data)
-                self.collect_performance(data)
+                        self.recorded_data[capture_id].put(raw_data)
+                # do line completion before calling the check functions
+                comp_data = self._line_completion(raw_data)
+                for check_function in self.CHECK_FUNCTIONS:
+                    check_function(self, comp_data)

     def exit(self):
         self.exit_event.set()
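The buffering done by _line_completion() can be illustrated with a small standalone sketch (the LineBuffer class and the sample chunks below are illustrative only, not framework code):

class LineBuffer(object):
    """ Mimics RecvThread._line_completion(): release complete lines, cache the partial tail. """
    def __init__(self):
        self._line_cache = ""

    def feed(self, decoded_data):
        ret = self._line_cache
        lines = decoded_data.splitlines(True)
        if lines[-1][-1] != "\n":
            if len(lines) == 1:
                # single unfinished line: keep caching, release nothing
                self._line_cache += lines[-1]
                ret = ""
            else:
                # several lines, last one unfinished: release all complete lines
                self._line_cache = lines[-1]
                ret += "".join(lines[:-1])
        else:
            # chunk ends on a line boundary: release everything
            self._line_cache = ""
            ret += decoded_data
        return ret

buf = LineBuffer()
print(repr(buf.feed("Guru Meditation Err")))                   # '' -- incomplete, cached
print(repr(buf.feed("or: Core  0 panic'ed (X)\nBacktrace:")))  # completed first line is released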
@@ -266,6 +273,9 @@ class BaseDUT(object):

     DEFAULT_EXPECT_TIMEOUT = 5
     MAX_EXPECT_FAILURES_TO_SAVED = 10
+    RECV_THREAD_CLS = RecvThread
+    """ DUT subclass can specify RECV_THREAD_CLS to add extra processing when data is received.
+        For example, a DUT can implement exception detection & analysis logic in its receive thread subclass. """

     LOG_THREAD = _LogThread()
     LOG_THREAD.start()
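A hedged sketch of how a DUT subclass can use this hook. BrownoutRecvThread, its pattern, and MyDUT are made-up names for illustration; DUT.RecvThread and DUT.SerialDUT are the classes from this module:

import re
import DUT

class BrownoutRecvThread(DUT.RecvThread):
    BROWNOUT_PATTERN = re.compile(r"Brownout detector was triggered")

    def check_brownout(self, comp_data):
        # comp_data contains only complete lines thanks to _line_completion()
        if self.BROWNOUT_PATTERN.search(comp_data):
            print("brownout reset observed on {}".format(self.dut))

    CHECK_FUNCTIONS = [check_brownout]

class MyDUT(DUT.SerialDUT):
    # the framework instantiates this class as RECV_THREAD_CLS(port_read, dut) when the port is opened
    RECV_THREAD_CLS = BrownoutRecvThread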
@@ -398,8 +408,7 @@ class BaseDUT(object):
         :return: None
         """
         self._port_open()
-        self.receive_thread = _RecvThread(self._port_read, self.data_cache,
-                                          self.recorded_data, self.record_data_lock)
+        self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self)
         self.receive_thread.start()

     def close(self):


@@ -62,7 +62,7 @@ class Env(object):
         self.lock = threading.RLock()

     @_synced
-    def get_dut(self, dut_name, app_path, dut_class=None, app_class=None):
+    def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, **dut_init_args):
         """
         get_dut(dut_name, app_path, dut_class=None, app_class=None)
@@ -70,6 +70,7 @@ class Env(object):
         :param app_path: application path, app instance will use this path to process application info
         :param dut_class: dut class, if not specified will use default dut class of env
         :param app_class: app class, if not specified will use default app of env
+        :keyword dut_init_args: extra kwargs used when creating the DUT instance
         :return: dut instance
         """
         if dut_name in self.allocated_duts:
@@ -97,6 +98,7 @@ class Env(object):
                 dut_config = self.get_variable(dut_name + "_port_config")
             except ValueError:
                 dut_config = dict()
+            dut_config.update(dut_init_args)
             dut = self.default_dut_cls(dut_name, port,
                                        os.path.join(self.log_path, dut_name + ".log"),
                                        app_inst,
@@ -168,11 +170,16 @@ class Env(object):
         close all DUTs of the Env.

         :param dut_debug: if dut_debug is True, then print all dut expect failures before close it
-        :return: None
+        :return: exceptions raised during DUT close
         """
+        dut_close_errors = []
         for dut_name in self.allocated_duts:
             dut = self.allocated_duts[dut_name]["dut"]
             if dut_debug:
                 dut.print_debug_info()
-            dut.close()
+            try:
+                dut.close()
+            except Exception as e:
+                dut_close_errors.append(e)
         self.allocated_duts = dict()
+        return dut_close_errors
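A short sketch of the new Env behaviour from a caller's point of view, assuming an existing Env instance "env" (the DUT name and app path are illustrative):

dut = env.get_dut("dut1", app_path="examples/get-started/hello_world",
                  allow_dut_exception=True)   # extra kwargs are merged into dut_config and reach the DUT constructor
dut.start_app()
# ... run test steps ...

errors = env.close(dut_debug=True)            # close() now returns the exceptions raised by DUT close()
for e in errors:
    print("error while closing DUT: {}".format(e))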


@@ -31,6 +31,7 @@ class IDFApp(App.BaseApp):
         super(IDFApp, self).__init__(app_path)
         self.idf_path = self.get_sdk_path()
         self.binary_path = self.get_binary_path(app_path)
+        self.elf_file = self._get_elf_file_path(self.binary_path)
         assert os.path.exists(self.binary_path)
         assert self.IDF_DOWNLOAD_CONFIG_FILE in os.listdir(self.binary_path)
         self.esptool, self.partition_tool = self.get_tools()
@@ -64,6 +65,15 @@ class IDFApp(App.BaseApp):
         """
         pass

+    @staticmethod
+    def _get_elf_file_path(binary_path):
+        ret = ""
+        file_names = os.listdir(binary_path)
+        for fn in file_names:
+            if os.path.splitext(fn)[1] == ".elf":
+                ret = os.path.join(binary_path, fn)
+        return ret
+
     def process_arg(self, arg):
         """
         process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs.


@@ -19,16 +19,85 @@ import subprocess
 import functools
 import random
 import tempfile
+import subprocess
+
+# python2 and python3 queue package name is different
+try:
+    import Queue as _queue
+except ImportError:
+    import queue as _queue
+
 from serial.tools import list_ports

 import DUT
+import Utility


 class IDFToolError(OSError):
     pass


+class IDFDUTException(RuntimeError):
+    pass
+
+
+class IDFRecvThread(DUT.RecvThread):
+
+    PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
+    EXCEPTION_PATTERNS = [
+        re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
+        re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
+        re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
+    ]
+    BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
+    BACKTRACE_ADDRESS_PATTERN = re.compile(r"(0x[0-9a-f]{8}):0x[0-9a-f]{8}")
+
+    def __init__(self, read, dut):
+        super(IDFRecvThread, self).__init__(read, dut)
+        self.exceptions = _queue.Queue()
+
+    def collect_performance(self, comp_data):
+        matches = self.PERFORMANCE_PATTERN.findall(comp_data)
+        for match in matches:
+            Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
+                                color="orange")
+
+    def detect_exception(self, comp_data):
+        for pattern in self.EXCEPTION_PATTERNS:
+            start = 0
+            while True:
+                match = pattern.search(comp_data, pos=start)
+                if match:
+                    start = match.end()
+                    self.exceptions.put(match.group(0))
+                    Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
+                else:
+                    break
+
+    def detect_backtrace(self, comp_data):
+        start = 0
+        while True:
+            match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
+            if match:
+                start = match.end()
+                Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
+                # translate backtrace
+                addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
+                translated_backtrace = ""
+                for addr in addresses:
+                    ret = self.dut.lookup_pc_address(addr)
+                    if ret:
+                        translated_backtrace += ret + "\n"
+                if translated_backtrace:
+                    Utility.console_log("Translated backtrace\n:" + translated_backtrace, color="yellow")
+                else:
+                    Utility.console_log("Failed to translate backtrace", color="yellow")
+            else:
+                break
+
+    CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
+
+
 def _tool_method(func):
     """ close port, execute tool method and then reopen port """
     @functools.wraps(func)
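To make the patterns concrete, here is a quick sketch of what EXCEPTION_PATTERNS catches; the sample log lines are invented but follow the usual ESP32 panic/abort wording:

import re

patterns = [
    re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
    re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
]
sample = ("Guru Meditation Error: Core  0 panic'ed (LoadProhibited). Exception was unhandled.\n"
          "abort() was called at PC 0x400d315f on core 1\n")
for pattern in patterns:
    for match in pattern.finditer(sample):
        print("[Exception]: {}".format(match.group(1)))
# -> [Exception]: Guru Meditation Error: Core  0 panic'ed (LoadProhibited)
# -> [Exception]: abort() was called at PC 0x400d315f on core 1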
@@ -46,10 +115,13 @@ class IDFDUT(DUT.SerialDUT):
     CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
     # if need to erase NVS partition in start app
     ERASE_NVS = True
+    RECV_THREAD_CLS = IDFRecvThread
+    TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"

-    def __init__(self, name, port, log_file, app, **kwargs):
+    def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
         self.download_config, self.partition_table = app.process_app_info()
         super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
+        self.allow_dut_exception = allow_dut_exception

     @classmethod
     def get_chip(cls, app, port):
@@ -156,3 +228,31 @@ class IDFDUT(DUT.SerialDUT):
             "--before", "default_reset", "--after", "hard_reset", "read_flash",
             _address, _size, output_file]
        )
+
+    def lookup_pc_address(self, pc_addr):
+        cmd = ["%saddr2line" % self.TOOLCHAIN_PREFIX,
+               "-pfiaC", "-e", self.app.elf_file, pc_addr]
+        ret = ""
+        try:
+            translation = subprocess.check_output(cmd)
+            ret = translation.decode()
+        except OSError:
+            pass
+        return ret
+
+    def get_exceptions(self):
+        """ Get exceptions detected by the DUT receive thread. """
+        exceptions = []
+        if self.receive_thread:
+            while True:
+                try:
+                    exceptions.append(self.receive_thread.exceptions.get(timeout=0))
+                except _queue.Empty:
+                    break
+        return exceptions
+
+    def close(self):
+        super(IDFDUT, self).close()
+        if not self.allow_dut_exception and self.get_exceptions():
+            Utility.console_log("DUT exception detected on {}".format(self), color="red")
+            raise IDFDUTException()
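When a DUT is created with allow_dut_exception=True, close() will not raise; detected panics can instead be inspected manually. A minimal sketch, assuming an already-opened IDFDUT instance "dut":

for exc in dut.get_exceptions():
    print("observed during test: {}".format(exc))
dut.close()   # does not raise IDFDUTException because allow_dut_exception is True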


@@ -184,10 +184,20 @@ def test_method(**kwargs):
                 # log failure
                 junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
             finally:
+                # do close all DUTs, if result is False then print DUT debug info
+                close_errors = env_inst.close(dut_debug=(not result))
+                # We have a hook in DUT close, allowing the DUT to raise an error to fail the test case.
+                # For example, we don't allow DUT exceptions (resets) during test execution.
+                # We don't want to implement exception detection in the test function logic,
+                # as we would need to add it to every test case.
+                # Instead we implement it in the DUT receive thread,
+                # and raise the exception in DUT close to fail the test case if a reset is detected.
+                if close_errors:
+                    for error in close_errors:
+                        junit_test_case.add_failure_info(str(error))
+                    result = False
                 if not case_info["junit_report_by_case"]:
                     JunitReport.test_case_finish(junit_test_case)
-                # do close all DUTs, if result is False then print DUT debug info
-                env_inst.close(dut_debug=(not result))

             # end case and output result
             JunitReport.output_report(junit_file_path)


@@ -237,7 +237,7 @@ def run_unit_test_cases(env, extra_data):
     for ut_config in case_config:
         Utility.console_log("Running unit test for config: " + ut_config, "O")

-        dut = env.get_dut("unit-test-app", app_path=ut_config)
+        dut = env.get_dut("unit-test-app", app_path=ut_config, allow_dut_exception=True)
         dut.start_app()
         Utility.console_log("Download finished, start running test cases", "O")

@@ -360,7 +360,7 @@ def get_dut(duts, env, name, ut_config):
     if name in duts:
         dut = duts[name]
     else:
-        dut = env.get_dut(name, app_path=ut_config)
+        dut = env.get_dut(name, app_path=ut_config, allow_dut_exception=True)
         duts[name] = dut
         dut.start_app()
     return dut

@@ -565,7 +565,7 @@ def run_multiple_stage_cases(env, extra_data):
     for ut_config in case_config:
         Utility.console_log("Running unit test for config: " + ut_config, "O")

-        dut = env.get_dut("unit-test-app", app_path=ut_config)
+        dut = env.get_dut("unit-test-app", app_path=ut_config, allow_dut_exception=True)
         dut.start_app()

         for one_case in case_config[ut_config]: