diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 72d571b8a..9ebf2d086 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -691,7 +691,9 @@ assign_test: when: always paths: - $LOG_PATH - expire_in: 1 mos + expire_in: 1 week + reports: + junit: $LOG_PATH/*/XUNIT_RESULT.xml variables: TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw" TEST_CASE_PATH: "$CI_PROJECT_DIR/examples" diff --git a/tools/tiny-test-fw/DUT.py b/tools/tiny-test-fw/DUT.py index 0193cb6c7..b782b5e1e 100644 --- a/tools/tiny-test-fw/DUT.py +++ b/tools/tiny-test-fw/DUT.py @@ -204,12 +204,14 @@ class _RecvThread(threading.Thread): PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n") - def __init__(self, read, data_cache): + def __init__(self, read, data_cache, recorded_data, record_data_lock): super(_RecvThread, self).__init__() self.exit_event = threading.Event() self.setDaemon(True) self.read = read self.data_cache = data_cache + self.recorded_data = recorded_data + self.record_data_lock = record_data_lock # cache the last line of recv data for collecting performance self._line_cache = str() @@ -242,7 +244,10 @@ class _RecvThread(threading.Thread): while not self.exit_event.isSet(): data = self.read(1000) if data: - self.data_cache.put(data) + with self.record_data_lock: + self.data_cache.put(data) + for capture_id in self.recorded_data: + self.recorded_data[capture_id].put(data) self.collect_performance(data) def exit(self): @@ -273,6 +278,11 @@ class BaseDUT(object): self.log_file = log_file self.app = app self.data_cache = _DataCache() + # the main process of recorded data are done in receive thread + # but receive thread could be closed in DUT lifetime (tool methods) + # so we keep it in BaseDUT, as their life cycle are same + self.recorded_data = dict() + self.record_data_lock = threading.RLock() self.receive_thread = None self.expect_failures = [] # open and start during init @@ -388,7 +398,8 @@ class BaseDUT(object): :return: None """ self._port_open() - self.receive_thread = _RecvThread(self._port_read, self.data_cache) + self.receive_thread = _RecvThread(self._port_read, self.data_cache, + self.recorded_data, self.record_data_lock) self.receive_thread.start() def close(self): @@ -432,6 +443,42 @@ class BaseDUT(object): self.data_cache.flush(size) return data + def start_capture_raw_data(self, capture_id="default"): + """ + Sometime application want to get DUT raw data and use ``expect`` method at the same time. + Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method. + + If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID. + + :param capture_id: ID of capture. You can use different IDs to do different captures at the same time. + """ + with self.record_data_lock: + try: + # if start capture on existed ID, we do flush data and restart capture + self.recorded_data[capture_id].flush() + except KeyError: + # otherwise, create new data cache + self.recorded_data[capture_id] = _DataCache() + + def stop_capture_raw_data(self, capture_id="default"): + """ + Stop capture and get raw data. + This method should be used after ``start_capture_raw_data`` on the same capture ID. + + :param capture_id: ID of capture. + :return: captured raw data between start capture and stop capture. + """ + with self.record_data_lock: + try: + ret = self.recorded_data[capture_id].get_data() + self.recorded_data.pop(capture_id) + except KeyError as e: + e.message = "capture_id does not exist. 
" \ + "You should call start_capture_raw_data with same ID " \ + "before calling stop_capture_raw_data" + raise e + return ret + # expect related methods @staticmethod diff --git a/tools/tiny-test-fw/IDF/__init__.py b/tools/tiny-test-fw/IDF/__init__.py index c7480c43f..0e342e844 100644 --- a/tools/tiny-test-fw/IDF/__init__.py +++ b/tools/tiny-test-fw/IDF/__init__.py @@ -77,7 +77,11 @@ def log_performance(item, value): :param item: performance item name :param value: performance value """ - Utility.console_log("[Performance][{}]: {}".format(item, value), "orange") + performance_msg = "[Performance][{}]: {}".format(item, value) + Utility.console_log(performance_msg, "orange") + # update to junit test report + current_junit_case = TinyFW.JunitReport.get_current_test_case() + current_junit_case.stdout += performance_msg + "\r\n" def check_performance(item, value): diff --git a/tools/tiny-test-fw/TinyFW.py b/tools/tiny-test-fw/TinyFW.py index c475f3824..e9f9289d3 100644 --- a/tools/tiny-test-fw/TinyFW.py +++ b/tools/tiny-test-fw/TinyFW.py @@ -13,14 +13,12 @@ # limitations under the License. """ Interface for test cases. """ -import sys import os import time import traceback -import inspect import functools -import xunitgen +import junit_xml import Env import DUT @@ -28,11 +26,6 @@ import App import Utility -XUNIT_FILE_NAME = "XUNIT_RESULT.xml" -XUNIT_RECEIVER = xunitgen.EventReceiver() -XUNIT_DEFAULT_TEST_SUITE = "test-suite" - - class DefaultEnvConfig(object): """ default test configs. There're 3 places to set configs, priority is (high -> low): @@ -69,40 +62,6 @@ set_default_config = DefaultEnvConfig.set_default_config get_default_config = DefaultEnvConfig.get_default_config -class TestResult(object): - TEST_RESULT = { - "pass": [], - "fail": [], - } - - @classmethod - def get_failed_cases(cls): - """ - :return: failed test cases - """ - return cls.TEST_RESULT["fail"] - - @classmethod - def get_passed_cases(cls): - """ - :return: passed test cases - """ - return cls.TEST_RESULT["pass"] - - @classmethod - def set_result(cls, result, case_name): - """ - :param result: True or False - :param case_name: test case name - :return: None - """ - cls.TEST_RESULT["pass" if result else "fail"].append(case_name) - - -get_failed_cases = TestResult.get_failed_cases -get_passed_cases = TestResult.get_passed_cases - - MANDATORY_INFO = { "execution_time": 1, "env_tag": "default", @@ -111,6 +70,61 @@ MANDATORY_INFO = { } +class JunitReport(object): + # wrapper for junit test report + # TODO: Don't support by multi-thread (although not likely to be used this way). + + JUNIT_FILE_NAME = "XUNIT_RESULT.xml" + JUNIT_DEFAULT_TEST_SUITE = "test-suite" + JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE) + JUNIT_CURRENT_TEST_CASE = None + _TEST_CASE_CREATED_TS = 0 + + @classmethod + def output_report(cls, junit_file_path): + """ Output current test result to file. """ + with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f: + cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False) + + @classmethod + def get_current_test_case(cls): + """ + By default, the test framework will handle junit test report automatically. + While some test case might want to update some info to test report. + They can use this method to get current test case created by test framework. 
+ + :return: current junit test case instance created by ``JunitTestReport.create_test_case`` + """ + return cls.JUNIT_CURRENT_TEST_CASE + + @classmethod + def test_case_finish(cls, test_case): + """ + Append the test case to test suite so it can be output to file. + Execution time will be automatically updated (compared to ``create_test_case``). + """ + test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS + cls.JUNIT_TEST_SUITE.test_cases.append(test_case) + + @classmethod + def create_test_case(cls, name): + """ + Extend ``junit_xml.TestCase`` with: + + 1. save create test case so it can be get by ``get_current_test_case`` + 2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``. + + :param name: test case name + :return: instance of ``junit_xml.TestCase`` + """ + # set stdout to empty string, so we can always append string to stdout. + # It won't affect output logic. If stdout is empty, it won't be put to report. + test_case = junit_xml.TestCase(name, stdout="") + cls.JUNIT_CURRENT_TEST_CASE = test_case + cls._TEST_CASE_CREATED_TS = time.time() + return test_case + + def test_method(**kwargs): """ decorator for test case function. @@ -124,14 +138,15 @@ def test_method(**kwargs): :keyword env_config_file: test env config file. usually will not set this keyword when define case :keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result. usually will not set this keyword when define case + :keyword junit_report_by_case: By default the test fw will handle junit report generation. + In some cases, one test function might test many test cases. + If this flag is set, test case can update junit report by its own. """ def test(test_func): - # get test function file name - frame = inspect.stack() - test_func_file_name = frame[1][1] case_info = MANDATORY_INFO.copy() case_info["name"] = case_info["ID"] = test_func.__name__ + case_info["junit_report_by_case"] = False case_info.update(kwargs) @functools.wraps(test_func) @@ -151,11 +166,12 @@ def test_method(**kwargs): env_config.update(overwrite) env_inst = Env.Env(**env_config) + # prepare for xunit test results - xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]), - XUNIT_FILE_NAME) - XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name) + junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"]) + junit_test_case = JunitReport.create_test_case(case_info["name"]) result = False + try: Utility.console_log("starting running test: " + test_func.__name__, color="green") # execute test function @@ -166,21 +182,20 @@ def test_method(**kwargs): # handle all the exceptions here traceback.print_exc() # log failure - XUNIT_RECEIVER.failure(str(e), test_func_file_name) + junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc()) finally: + if not case_info["junit_report_by_case"]: + JunitReport.test_case_finish(junit_test_case) # do close all DUTs, if result is False then print DUT debug info env_inst.close(dut_debug=(not result)) + # end case and output result - XUNIT_RECEIVER.end_case(test_func.__name__, time.time()) - with open(xunit_file, "ab+") as f: - f.write(xunitgen.toxml(XUNIT_RECEIVER.results(), - XUNIT_DEFAULT_TEST_SUITE)) + JunitReport.output_report(junit_file_path) if result: Utility.console_log("Test Succeed: " + test_func.__name__, color="green") else: Utility.console_log(("Test Fail: " + test_func.__name__), color="red") - 
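
Note: the JunitReport wrapper and the reworked test_method decorator above replace the old xunitgen flow with the junit_xml package. The sketch below is a minimal, self-contained illustration of the same create / finish / output sequence, calling junit_xml directly instead of going through TinyFW; the suite name, case name and the simulated failure are placeholders for illustration only.

    import time
    import junit_xml

    # equivalent of JunitReport.JUNIT_TEST_SUITE
    suite = junit_xml.TestSuite("test-suite")

    # create_test_case(): stdout starts as an empty string so helpers such as
    # IDF.log_performance() can append "[Performance][item]: value" lines to it
    created_ts = time.time()
    case = junit_xml.TestCase("example_case", stdout="")

    try:
        # ... the decorated test function would run here ...
        raise RuntimeError("simulated failure")
    except Exception as e:
        # on exception the decorator records the failure on the case
        case.add_failure_info(str(e))

    # test_case_finish(): record elapsed time and append the case to the suite
    case.elapsed_sec = time.time() - created_ts
    suite.test_cases.append(case)

    # output_report(): write XUNIT_RESULT.xml, which GitLab picks up through
    # the new "reports: junit:" artifact entry in .gitlab-ci.yml
    with open("XUNIT_RESULT.xml", "w") as f:
        junit_xml.TestSuite.to_file(f, [suite], prettyprint=False)
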
TestResult.set_result(result, test_func.__name__)
             return result
 
         handle_test.case_info = case_info
diff --git a/tools/tiny-test-fw/Utility/CIAssignTest.py b/tools/tiny-test-fw/Utility/CIAssignTest.py
index 2df66fe81..9d727b5eb 100644
--- a/tools/tiny-test-fw/Utility/CIAssignTest.py
+++ b/tools/tiny-test-fw/Utility/CIAssignTest.py
@@ -143,6 +143,7 @@ class AssignTest(object):
         for job_name in ci_config:
             if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
                 job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
+        job_list.sort(key=lambda x: x["name"])
         return job_list
 
     def _search_cases(self, test_case_path, case_filter=None):
diff --git a/tools/tiny-test-fw/docs/index.rst b/tools/tiny-test-fw/docs/index.rst
index b83254cb1..df132bc09 100644
--- a/tools/tiny-test-fw/docs/index.rst
+++ b/tools/tiny-test-fw/docs/index.rst
@@ -127,7 +127,7 @@ The following 3rd party lib is required:
 
     * pyserial
    * pyyaml
-    * xunitgen
+    * junit_xml
     * netifaces
     * matplotlib (if use Utility.LineChart)
 
diff --git a/tools/tiny-test-fw/requirements.txt b/tools/tiny-test-fw/requirements.txt
new file mode 100644
index 000000000..aa6b53b4b
--- /dev/null
+++ b/tools/tiny-test-fw/requirements.txt
@@ -0,0 +1,5 @@
+pyserial
+pyyaml
+junit_xml
+netifaces
+matplotlib
diff --git a/tools/unit-test-app/tools/UnitTestParser.py b/tools/unit-test-app/tools/UnitTestParser.py
index aa488bc55..781f38734 100644
--- a/tools/unit-test-app/tools/UnitTestParser.py
+++ b/tools/unit-test-app/tools/UnitTestParser.py
@@ -220,7 +220,7 @@ class Parser(object):
             config_output_folder = os.path.join(output_folder, config)
             if os.path.exists(config_output_folder):
                 test_cases.extend(self.parse_test_cases_for_one_config(config_output_folder, config))
-
+        test_cases.sort(key=lambda x: x["config"] + x["summary"])
         self.dump_test_cases(test_cases)
 
diff --git a/tools/unit-test-app/unit_test.py b/tools/unit-test-app/unit_test.py
index bf913c278..f7ea874e7 100644
--- a/tools/unit-test-app/unit_test.py
+++ b/tools/unit-test-app/unit_test.py
@@ -6,7 +6,6 @@ import re
 import os
 import sys
 import time
-
 import threading
 
 # if we want to run test case outside `tiny-test-fw` folder,
@@ -105,7 +104,112 @@ def format_test_case_config(test_case_data):
     return case_config
 
 
-@IDF.idf_unit_test(env_tag="UT_T1_1")
+def replace_app_bin(dut, name, new_app_bin):
+    if new_app_bin is None:
+        return
+    search_pattern = '/{}.bin'.format(name)
+    for i, config in enumerate(dut.download_config):
+        if config.endswith(search_pattern):
+            dut.download_config[i] = new_app_bin
+            Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
+            break
+
+
+def reset_dut(dut):
+    dut.reset()
+    # esptool ``run`` cmd takes quite long time.
+    # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
+    # this could cause checking bootup print failed.
+    # now use input cmd `-` and check test history to check if DUT is bootup.
+    # we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
+ for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT): + dut.write("-") + try: + dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTROY_CHECK_TIMEOUT) + break + except ExpectTimeout: + pass + else: + raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port)) + + +def run_one_normal_case(dut, one_case, junit_test_case, failed_cases): + + reset_dut(dut) + + dut.start_capture_raw_data() + # run test case + dut.write("\"{}\"".format(one_case["name"])) + dut.expect("Running " + one_case["name"] + "...") + + exception_reset_list = [] + + # we want to set this flag in callbacks (inner functions) + # use list here so we can use append to set this flag + test_finish = list() + + # expect callbacks + def one_case_finish(result): + """ one test finished, let expect loop break and log result """ + test_finish.append(True) + output = dut.stop_capture_raw_data() + if result: + Utility.console_log("Success: " + one_case["name"], color="green") + else: + failed_cases.append(one_case["name"]) + Utility.console_log("Failed: " + one_case["name"], color="red") + junit_test_case.add_failure_info(output) + + def handle_exception_reset(data): + """ + just append data to exception list. + exception list will be checked in ``handle_reset_finish``, once reset finished. + """ + exception_reset_list.append(data[0]) + + def handle_test_finish(data): + """ test finished without reset """ + # in this scenario reset should not happen + assert not exception_reset_list + if int(data[1]): + # case ignored + Utility.console_log("Ignored: " + one_case["name"], color="orange") + junit_test_case.add_skipped_info("ignored") + one_case_finish(not int(data[0])) + + def handle_reset_finish(data): + """ reset happened and reboot finished """ + assert exception_reset_list # reboot but no exception/reset logged. should never happen + result = False + if len(one_case["reset"]) == len(exception_reset_list): + for i, exception in enumerate(exception_reset_list): + if one_case["reset"][i] not in exception: + break + else: + result = True + if not result: + err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"], + exception_reset_list) + Utility.console_log(err_msg, color="orange") + junit_test_case.add_error_info(err_msg) + one_case_finish(result) + + while not test_finish: + try: + dut.expect_any((RESET_PATTERN, handle_exception_reset), + (EXCEPTION_PATTERN, handle_exception_reset), + (ABORT_PATTERN, handle_exception_reset), + (FINISH_PATTERN, handle_test_finish), + (UT_APP_BOOT_UP_DONE, handle_reset_finish), + timeout=one_case["timeout"]) + except ExpectTimeout: + Utility.console_log("Timeout in expect", color="orange") + junit_test_case.add_error_info("timeout") + one_case_finish(False) + break + + +@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True) def run_unit_test_cases(env, extra_data): """ extra_data can be three types of value @@ -133,82 +237,17 @@ def run_unit_test_cases(env, extra_data): Utility.console_log("Running unit test for config: " + ut_config, "O") dut = env.get_dut("unit-test-app", app_path=ut_config) dut.start_app() + Utility.console_log("Download finished, start running test cases", "O") for one_case in case_config[ut_config]: - dut.reset() - # esptool ``run`` cmd takes quite long time. - # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened. - # this could cause checking bootup print failed. 
- # now we input cmd `-`, and check either bootup print or test history, - # to determine if DUT is ready to test. - dut.write("-", flush=False) - dut.expect_any(UT_APP_BOOT_UP_DONE, - "0 Tests 0 Failures 0 Ignored", timeout=STARTUP_TIMEOUT) - - # run test case - dut.write("\"{}\"".format(one_case["name"])) - dut.expect("Running " + one_case["name"] + "...") - - exception_reset_list = [] - - # we want to set this flag in callbacks (inner functions) - # use list here so we can use append to set this flag - test_finish = list() - - # expect callbacks - def one_case_finish(result): - """ one test finished, let expect loop break and log result """ - test_finish.append(True) - if result: - Utility.console_log("Success: " + one_case["name"], color="green") - else: - failed_cases.append(one_case["name"]) - Utility.console_log("Failed: " + one_case["name"], color="red") - - def handle_exception_reset(data): - """ - just append data to exception list. - exception list will be checked in ``handle_reset_finish``, once reset finished. - """ - exception_reset_list.append(data[0]) - - def handle_test_finish(data): - """ test finished without reset """ - # in this scenario reset should not happen - assert not exception_reset_list - if int(data[1]): - # case ignored - Utility.console_log("Ignored: " + one_case["name"], color="orange") - one_case_finish(not int(data[0])) - - def handle_reset_finish(data): - """ reset happened and reboot finished """ - assert exception_reset_list # reboot but no exception/reset logged. should never happen - result = False - if len(one_case["reset"]) == len(exception_reset_list): - for i, exception in enumerate(exception_reset_list): - if one_case["reset"][i] not in exception: - break - else: - result = True - if not result: - Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}""" - .format(one_case["reset"], exception_reset_list), - color="orange") - one_case_finish(result) - - while not test_finish: - try: - dut.expect_any((RESET_PATTERN, handle_exception_reset), - (EXCEPTION_PATTERN, handle_exception_reset), - (ABORT_PATTERN, handle_exception_reset), - (FINISH_PATTERN, handle_test_finish), - (UT_APP_BOOT_UP_DONE, handle_reset_finish), - timeout=one_case["timeout"]) - except ExpectTimeout: - Utility.console_log("Timeout in expect", color="orange") - one_case_finish(False) - break + # create junit report test case + junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) + try: + run_one_normal_case(dut, one_case, junit_test_case, failed_cases) + TinyFW.JunitReport.test_case_finish(junit_test_case) + except Exception as e: + junit_test_case.add_error_info("Unexpected exception: " + str(e)) + TinyFW.JunitReport.test_case_finish(junit_test_case) # raise exception if any case fails if failed_cases: @@ -233,11 +272,15 @@ class Handler(threading.Thread): self.child_case_index = child_case_index + 1 self.finish = False self.result = False + self.output = "" self.fail_name = None self.timeout = timeout threading.Thread.__init__(self, name="{} Handler".format(dut)) def run(self): + + self.dut.start_capture_raw_data() + def get_child_case_name(data): self.child_case_name = data[0] time.sleep(1) @@ -247,6 +290,8 @@ class Handler(threading.Thread): """ one test finished, let expect loop break and log result """ self.finish = True self.result = result + self.output = "[{}]\n\n{}\n".format(self.child_case_name, + self.dut.stop_capture_raw_data()) if not result: self.fail_name = self.child_case_name @@ -312,7 +357,7 @@ 
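
Note: the failure output attached to the junit cases comes from the new raw-data capture API added to BaseDUT above (start_capture_raw_data / stop_capture_raw_data), used by both run_one_normal_case() and Handler.run(). A rough usage sketch follows; it assumes ``dut`` is an already-started DUT instance, and the capture id, command and expect pattern are placeholders.

    def run_case_with_capture(dut, junit_test_case):
        # capture everything the DUT prints from this point on, without
        # interfering with expect()/read() (each capture id gets its own _DataCache)
        dut.start_capture_raw_data("my_case")
        dut.write("\"my_case_name\"")              # placeholder command
        try:
            dut.expect("Test ran in", timeout=30)  # placeholder pattern
            result = True
        except Exception:                          # e.g. an expect timeout
            result = False
        # stop_capture_raw_data() returns everything received since the capture started
        output = dut.stop_capture_raw_data("my_case")
        if not result:
            # attach the raw log to the junit entry, as the test runners above do
            junit_test_case.add_failure_info(output)
        return result
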
def get_dut(duts, env, name, ut_config): return dut -def case_run(duts, ut_config, env, one_case, failed_cases): +def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case): lock = threading.RLock() threads = [] send_signal_list = [] @@ -326,19 +371,22 @@ def case_run(duts, ut_config, env, one_case, failed_cases): for thread in threads: thread.setDaemon(True) thread.start() + output = "Multiple Device Failed\n" for thread in threads: thread.join() result = result and thread.result + output += thread.output if not thread.result: failed_device.append(thread.fail_name) if result: Utility.console_log("Success: " + one_case["name"], color="green") else: failed_cases.append(one_case["name"]) + junit_test_case.add_failure_info(output) Utility.console_log("Failed: " + one_case["name"], color="red") -@IDF.idf_unit_test(env_tag="UT_T2_1") +@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True) def run_multiple_devices_cases(env, extra_data): """ extra_data can be two types of value @@ -362,11 +410,17 @@ def run_multiple_devices_cases(env, extra_data): """ failed_cases = [] case_config = format_test_case_config(extra_data) - DUTS = {} + duts = {} for ut_config in case_config: Utility.console_log("Running unit test for config: " + ut_config, "O") for one_case in case_config[ut_config]: - case_run(DUTS, ut_config, env, one_case, failed_cases) + junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) + try: + run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case) + TinyFW.JunitReport.test_case_finish(junit_test_case) + except Exception as e: + junit_test_case.add_error_info("Unexpected exception: " + str(e)) + TinyFW.JunitReport.test_case_finish(junit_test_case) if failed_cases: Utility.console_log("Failed Cases:", color="red") @@ -375,7 +429,109 @@ def run_multiple_devices_cases(env, extra_data): raise AssertionError("Unit Test Failed") -@IDF.idf_unit_test(env_tag="UT_T1_1") +def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case): + reset_dut(dut) + + dut.start_capture_raw_data() + + exception_reset_list = [] + + for test_stage in range(one_case["child case num"]): + # select multi stage test case name + dut.write("\"{}\"".format(one_case["name"])) + dut.expect("Running " + one_case["name"] + "...") + # select test function for current stage + dut.write(str(test_stage + 1)) + + # we want to set this flag in callbacks (inner functions) + # use list here so we can use append to set this flag + stage_finish = list() + + def last_stage(): + return test_stage == one_case["child case num"] - 1 + + def check_reset(): + if one_case["reset"]: + assert exception_reset_list # reboot but no exception/reset logged. 
should never happen + result = False + if len(one_case["reset"]) == len(exception_reset_list): + for i, exception in enumerate(exception_reset_list): + if one_case["reset"][i] not in exception: + break + else: + result = True + if not result: + err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"], + exception_reset_list) + Utility.console_log(err_msg, color="orange") + junit_test_case.add_error_info(err_msg) + else: + # we allow omit reset in multi stage cases + result = True + return result + + # expect callbacks + def one_case_finish(result): + """ one test finished, let expect loop break and log result """ + # handle test finish + result = result and check_reset() + output = dut.stop_capture_raw_data() + if result: + Utility.console_log("Success: " + one_case["name"], color="green") + else: + failed_cases.append(one_case["name"]) + Utility.console_log("Failed: " + one_case["name"], color="red") + junit_test_case.add_failure_info(output) + stage_finish.append("break") + + def handle_exception_reset(data): + """ + just append data to exception list. + exception list will be checked in ``handle_reset_finish``, once reset finished. + """ + exception_reset_list.append(data[0]) + + def handle_test_finish(data): + """ test finished without reset """ + # in this scenario reset should not happen + if int(data[1]): + # case ignored + Utility.console_log("Ignored: " + one_case["name"], color="orange") + junit_test_case.add_skipped_info("ignored") + # only passed in last stage will be regarded as real pass + if last_stage(): + one_case_finish(not int(data[0])) + else: + Utility.console_log("test finished before enter last stage", color="orange") + one_case_finish(False) + + def handle_next_stage(data): + """ reboot finished. we goto next stage """ + if last_stage(): + # already last stage, should never goto next stage + Utility.console_log("didn't finish at last stage", color="orange") + one_case_finish(False) + else: + stage_finish.append("continue") + + while not stage_finish: + try: + dut.expect_any((RESET_PATTERN, handle_exception_reset), + (EXCEPTION_PATTERN, handle_exception_reset), + (ABORT_PATTERN, handle_exception_reset), + (FINISH_PATTERN, handle_test_finish), + (UT_APP_BOOT_UP_DONE, handle_next_stage), + timeout=one_case["timeout"]) + except ExpectTimeout: + Utility.console_log("Timeout in expect", color="orange") + one_case_finish(False) + break + if stage_finish[0] == "break": + # test breaks on current stage + break + + +@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True) def run_multiple_stage_cases(env, extra_data): """ extra_data can be 2 types of value @@ -400,102 +556,13 @@ def run_multiple_stage_cases(env, extra_data): dut.start_app() for one_case in case_config[ut_config]: - dut.reset() - dut.write("-", flush=False) - dut.expect_any(UT_APP_BOOT_UP_DONE, - "0 Tests 0 Failures 0 Ignored") - - exception_reset_list = [] - - for test_stage in range(one_case["child case num"]): - # select multi stage test case name - dut.write("\"{}\"".format(one_case["name"])) - dut.expect("Running " + one_case["name"] + "...") - # select test function for current stage - dut.write(str(test_stage + 1)) - - # we want to set this flag in callbacks (inner functions) - # use list here so we can use append to set this flag - stage_finish = list() - - def last_stage(): - return test_stage == one_case["child case num"] - 1 - - def check_reset(): - if one_case["reset"]: - assert exception_reset_list # reboot but no exception/reset logged. 
should never happen - result = False - if len(one_case["reset"]) == len(exception_reset_list): - for i, exception in enumerate(exception_reset_list): - if one_case["reset"][i] not in exception: - break - else: - result = True - if not result: - Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}""" - .format(one_case["reset"], exception_reset_list), - color="orange") - else: - # we allow omit reset in multi stage cases - result = True - return result - - # expect callbacks - def one_case_finish(result): - """ one test finished, let expect loop break and log result """ - # handle test finish - result = result and check_reset() - if result: - Utility.console_log("Success: " + one_case["name"], color="green") - else: - failed_cases.append(one_case["name"]) - Utility.console_log("Failed: " + one_case["name"], color="red") - stage_finish.append("break") - - def handle_exception_reset(data): - """ - just append data to exception list. - exception list will be checked in ``handle_reset_finish``, once reset finished. - """ - exception_reset_list.append(data[0]) - - def handle_test_finish(data): - """ test finished without reset """ - # in this scenario reset should not happen - if int(data[1]): - # case ignored - Utility.console_log("Ignored: " + one_case["name"], color="orange") - # only passed in last stage will be regarded as real pass - if last_stage(): - one_case_finish(not int(data[0])) - else: - Utility.console_log("test finished before enter last stage", color="orange") - one_case_finish(False) - - def handle_next_stage(data): - """ reboot finished. we goto next stage """ - if last_stage(): - # already last stage, should never goto next stage - Utility.console_log("didn't finish at last stage", color="orange") - one_case_finish(False) - else: - stage_finish.append("continue") - - while not stage_finish: - try: - dut.expect_any((RESET_PATTERN, handle_exception_reset), - (EXCEPTION_PATTERN, handle_exception_reset), - (ABORT_PATTERN, handle_exception_reset), - (FINISH_PATTERN, handle_test_finish), - (UT_APP_BOOT_UP_DONE, handle_next_stage), - timeout=one_case["timeout"]) - except ExpectTimeout: - Utility.console_log("Timeout in expect", color="orange") - one_case_finish(False) - break - if stage_finish[0] == "break": - # test breaks on current stage - break + junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) + try: + run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case) + TinyFW.JunitReport.test_case_finish(junit_test_case) + except Exception as e: + junit_test_case.add_error_info("Unexpected exception: " + str(e)) + TinyFW.JunitReport.test_case_finish(junit_test_case) # raise exception if any case fails if failed_cases:
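
Note: run_unit_test_cases(), run_multiple_devices_cases() and run_multiple_stage_cases() now all follow the same per-case reporting pattern enabled by junit_report_by_case=True. A condensed sketch of that pattern is shown below; the case names, app path and the DUT-driving code are placeholders.

    import TinyFW
    import IDF


    @IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
    def run_my_cases(env, extra_data):
        failed_cases = []
        dut = env.get_dut("unit-test-app", app_path="default")   # placeholder config
        dut.start_app()
        for name in ["case_a", "case_b"]:                        # placeholder case names
            junit_test_case = TinyFW.JunitReport.create_test_case(name)
            try:
                # ... drive the DUT for this case; on failure call
                # junit_test_case.add_failure_info(...) and append to failed_cases ...
                TinyFW.JunitReport.test_case_finish(junit_test_case)
            except Exception as e:
                # an unexpected exception must still end up in the report
                junit_test_case.add_error_info("Unexpected exception: " + str(e))
                TinyFW.JunitReport.test_case_finish(junit_test_case)
                failed_cases.append(name)
        if failed_cases:
            raise AssertionError("Unit Test Failed")
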