diff --git a/tools/tiny-test-fw/DUT.py b/tools/tiny-test-fw/DUT.py index 899c7e885..dc6c7f8e6 100644 --- a/tools/tiny-test-fw/DUT.py +++ b/tools/tiny-test-fw/DUT.py @@ -42,19 +42,19 @@ import time import re import threading import copy -import sys import functools +# python2 and python3 queue package name is different +try: + import Queue as _queue +except ImportError: + import queue as _queue + import serial from serial.tools import list_ports import Utility -if sys.version_info[0] == 2: - import Queue as _queue -else: - import queue as _queue - class ExpectTimeout(ValueError): """ timeout for expect method """ @@ -201,55 +201,61 @@ class _LogThread(threading.Thread, _queue.Queue): self.flush_data() -class _RecvThread(threading.Thread): +class RecvThread(threading.Thread): - PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n") + CHECK_FUNCTIONS = [] + """ DUT subclass can define a few check functions to process received data. """ def __init__(self, read, data_cache, recorded_data, record_data_lock): - super(_RecvThread, self).__init__() + super(RecvThread, self).__init__() self.exit_event = threading.Event() self.setDaemon(True) self.read = read self.data_cache = data_cache self.recorded_data = recorded_data self.record_data_lock = record_data_lock - # cache the last line of recv data for collecting performance self._line_cache = str() - def collect_performance(self, data): - """ collect performance """ - if data: - decoded_data = _decode_data(data) + def _line_completion(self, data): + """ + Usually check functions requires to check for one complete line. + This method will do line completion for the first line, and strip incomplete last line. + """ + ret = self._line_cache + decoded_data = _decode_data(data) - matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data) - for match in matches: - Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]), - color="orange") + # cache incomplete line to later process + lines = decoded_data.splitlines(True) + last_line = lines[-1] - # cache incomplete line to later process - lines = decoded_data.splitlines(True) - last_line = lines[-1] - - if last_line[-1] != "\n": - if len(lines) == 1: - # only one line and the line is not finished, then append this to cache - self._line_cache += lines[-1] - else: - # more than one line and not finished, replace line cache - self._line_cache = lines[-1] + if last_line[-1] != "\n": + if len(lines) == 1: + # only one line and the line is not finished, then append this to cache + self._line_cache += lines[-1] + ret = str() else: - # line finishes, flush cache - self._line_cache = str() + # more than one line and not finished, replace line cache + self._line_cache = lines[-1] + ret += "".join(lines[:-1]) + else: + # line finishes, flush cache + self._line_cache = str() + ret += decoded_data + return ret def run(self): while not self.exit_event.isSet(): - data = self.read(1000) - if data: + raw_data = self.read(1000) + if raw_data: with self.record_data_lock: - self.data_cache.put(data) + self.data_cache.put(raw_data) for capture_id in self.recorded_data: - self.recorded_data[capture_id].put(data) - self.collect_performance(data) + self.recorded_data[capture_id].put(raw_data) + + # we need to do line completion before call check functions + comp_data = self._line_completion(raw_data) + for check_function in self.CHECK_FUNCTIONS: + check_function(self, comp_data) def exit(self): self.exit_event.set() @@ -267,6 +273,9 @@ class BaseDUT(object): DEFAULT_EXPECT_TIMEOUT = 10 MAX_EXPECT_FAILURES_TO_SAVED = 10 + RECV_THREAD_CLS = RecvThread + """ DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data. + For example, DUT can implement exception detect & analysis logic in receive thread subclass. """ LOG_THREAD = _LogThread() LOG_THREAD.start() @@ -399,8 +408,8 @@ class BaseDUT(object): :return: None """ self._port_open() - self.receive_thread = _RecvThread(self._port_read, self.data_cache, - self.recorded_data, self.record_data_lock) + self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self.data_cache, + self.recorded_data, self.record_data_lock) self.receive_thread.start() def close(self): @@ -424,9 +433,9 @@ class BaseDUT(object): if type(data) is type(u''): try: data = data.encode('utf-8') - except: + except Exception as e: print(u'Cannot encode {} of type {}'.format(data, type(data))) - raise + raise e return data def write(self, data, eol="\r\n", flush=True): diff --git a/tools/tiny-test-fw/Env.py b/tools/tiny-test-fw/Env.py index b18df2273..3622ba382 100644 --- a/tools/tiny-test-fw/Env.py +++ b/tools/tiny-test-fw/Env.py @@ -62,7 +62,7 @@ class Env(object): self.lock = threading.RLock() @_synced - def get_dut(self, dut_name, app_path, dut_class=None, app_class=None): + def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, **dut_init_args): """ get_dut(dut_name, app_path, dut_class=None, app_class=None) @@ -70,6 +70,7 @@ class Env(object): :param app_path: application path, app instance will use this path to process application info :param dut_class: dut class, if not specified will use default dut class of env :param app_class: app class, if not specified will use default app of env + :keyword dut_init_args: extra kwargs used when creating DUT instance :return: dut instance """ if dut_name in self.allocated_duts: @@ -97,6 +98,7 @@ class Env(object): dut_config = self.get_variable(dut_name + "_port_config") except ValueError: dut_config = dict() + dut_config.update(dut_init_args) dut = self.default_dut_cls(dut_name, port, os.path.join(self.log_path, dut_name + ".log"), app_inst, @@ -168,11 +170,16 @@ class Env(object): close all DUTs of the Env. :param dut_debug: if dut_debug is True, then print all dut expect failures before close it - :return: None + :return: exceptions during close DUT """ + dut_close_errors = [] for dut_name in self.allocated_duts: dut = self.allocated_duts[dut_name]["dut"] if dut_debug: dut.print_debug_info() - dut.close() + try: + dut.close() + except Exception as e: + dut_close_errors.append(e) self.allocated_duts = dict() + return dut_close_errors diff --git a/tools/tiny-test-fw/IDF/IDFDUT.py b/tools/tiny-test-fw/IDF/IDFDUT.py index 1a660931b..8e05df651 100644 --- a/tools/tiny-test-fw/IDF/IDFDUT.py +++ b/tools/tiny-test-fw/IDF/IDFDUT.py @@ -21,15 +21,72 @@ import functools import random import tempfile +# python2 and python3 queue package name is different +try: + import Queue as _queue +except ImportError: + import queue as _queue + from serial.tools import list_ports import DUT +import Utility class IDFToolError(OSError): pass +class IDFDUTException(RuntimeError): + pass + + +class IDFRecvThread(DUT.RecvThread): + + PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n") + EXCEPTION_PATTERNS = [ + re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"), + re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"), + re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))") + ] + BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)") + + def __init__(self, read, data_cache, recorded_data, record_data_lock): + super(IDFRecvThread, self).__init__(read, data_cache, recorded_data, record_data_lock) + self.exceptions = _queue.Queue() + + def collect_performance(self, comp_data): + matches = self.PERFORMANCE_PATTERN.findall(comp_data) + for match in matches: + Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]), + color="orange") + + def detect_exception(self, comp_data): + for pattern in self.EXCEPTION_PATTERNS: + start = 0 + while True: + match = pattern.search(comp_data, pos=start) + if match: + start = match.end() + self.exceptions.put(match.group(0)) + Utility.console_log("[Exception]: {}".format(match.group(0)), color="red") + else: + break + + def detect_backtrace(self, comp_data): + # TODO: to support auto parse backtrace + start = 0 + while True: + match = self.BACKTRACE_PATTERN.search(comp_data, pos=start) + if match: + start = match.end() + Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red") + else: + break + + CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace] + + def _tool_method(func): """ close port, execute tool method and then reopen port """ @functools.wraps(func) @@ -50,10 +107,12 @@ class IDFDUT(DUT.SerialDUT): INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth") # if need to erase NVS partition in start app ERASE_NVS = True + RECV_THREAD_CLS = IDFRecvThread - def __init__(self, name, port, log_file, app, **kwargs): + def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs): self.download_config, self.partition_table = app.process_app_info() super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs) + self.allow_dut_exception = allow_dut_exception @classmethod def get_chip(cls, app, port): @@ -194,3 +253,19 @@ class IDFDUT(DUT.SerialDUT): return [port_hint] + ports return ports + + def get_exceptions(self): + """ Get exceptions detected by DUT receive thread. """ + exceptions = [] + if self.receive_thread: + while True: + try: + exceptions.append(self.receive_thread.exceptions.get(timeout=0)) + except _queue.Empty: + break + return exceptions + + def close(self): + super(IDFDUT, self).close() + if not self.allow_dut_exception and self.get_exceptions(): + raise IDFDUTException() diff --git a/tools/tiny-test-fw/TinyFW.py b/tools/tiny-test-fw/TinyFW.py index e9f9289d3..219851535 100644 --- a/tools/tiny-test-fw/TinyFW.py +++ b/tools/tiny-test-fw/TinyFW.py @@ -184,10 +184,20 @@ def test_method(**kwargs): # log failure junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc()) finally: + # do close all DUTs, if result is False then print DUT debug info + close_errors = env_inst.close(dut_debug=(not result)) + # We have a hook in DUT close, allow DUT to raise error to fail test case. + # For example, we don't allow DUT exception (reset) during test execution. + # We don't want to implement in exception detection in test function logic, + # as we need to add it to every test case. + # We can implement it in DUT receive thread, + # and raise exception in DUT close to fail test case if reset detected. + if close_errors: + for error in close_errors: + junit_test_case.add_failure_info(str(error)) + result = False if not case_info["junit_report_by_case"]: JunitReport.test_case_finish(junit_test_case) - # do close all DUTs, if result is False then print DUT debug info - env_inst.close(dut_debug=(not result)) # end case and output result JunitReport.output_report(junit_file_path)