diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e61c77b84..22466d712 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -183,13 +183,13 @@ build_ssc_02: # If you want to add new build ssc jobs, please add it into dependencies of `assign_test` and `.test_template` -build_esp_idf_tests: + +.build_esp_idf_unit_test_template: &build_esp_idf_unit_test_template <<: *build_template artifacts: paths: - tools/unit-test-app/output - components/idf_test/unit_test/TestCaseAll.yml - - components/idf_test/unit_test/CIConfigs/*.yml expire_in: 2 days only: variables: @@ -197,11 +197,30 @@ build_esp_idf_tests: - $BOT_LABEL_BUILD - $BOT_LABEL_UNIT_TEST - $BOT_LABEL_REGULAR_TEST + +build_esp_idf_tests_make: + <<: *build_esp_idf_unit_test_template script: - - export PATH="$IDF_PATH/tools:$PATH" - - cd $CI_PROJECT_DIR/tools/unit-test-app - export EXTRA_CFLAGS="-Werror -Werror=deprecated-declarations" - export EXTRA_CXXFLAGS=${EXTRA_CFLAGS} + - cd $CI_PROJECT_DIR/tools/unit-test-app + - MAKEFLAGS= make help # make sure kconfig tools are built in single process + - make ut-clean-all-configs + - make ut-build-all-configs + - python tools/UnitTestParser.py + - if [ "$UNIT_TEST_BUILD_SYSTEM" == "make" ]; then exit 0; fi + # If Make, delete the CMake built artifacts + - rm -rf builds output sdkconfig + - rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/TestCaseAll.yml + - rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/*.yml + +build_esp_idf_tests_cmake: + <<: *build_esp_idf_unit_test_template + script: + - export PATH="$IDF_PATH/tools:$PATH" + - export EXTRA_CFLAGS="-Werror -Werror=deprecated-declarations" + - export EXTRA_CXXFLAGS=${EXTRA_CFLAGS} + - cd $CI_PROJECT_DIR/tools/unit-test-app # Build with CMake first - idf.py ut-clean-all-configs - idf.py ut-build-all-configs @@ -210,14 +229,8 @@ build_esp_idf_tests: - if [ "$UNIT_TEST_BUILD_SYSTEM" == "cmake" ]; then exit 0; fi # If Make, delete the CMake built artifacts - rm -rf builds output sdkconfig - - rm -rf components/idf_test/unit_test/TestCaseAll.yml - - rm -rf components/idf_test/unit_test/CIConfigs/*.yml - # Then build with Make - - cd $CI_PROJECT_DIR/tools/unit-test-app - - MAKEFLAGS= make help # make sure kconfig tools are built in single process - - make ut-clean-all-configs - - make ut-build-all-configs - - python tools/UnitTestParser.py + - rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/TestCaseAll.yml + - rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/*.yml .build_examples_make_template: &build_examples_make_template <<: *build_template @@ -752,7 +765,8 @@ assign_test: - build_ssc_00 - build_ssc_01 - build_ssc_02 - - build_esp_idf_tests + - build_esp_idf_tests_make + - build_esp_idf_tests_cmake variables: TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw" EXAMPLE_CONFIG_OUTPUT_PATH: "$CI_PROJECT_DIR/examples/test_configs" @@ -817,6 +831,8 @@ assign_test: paths: - $LOG_PATH expire_in: 1 week + reports: + junit: $LOG_PATH/*/XUNIT_RESULT.xml variables: TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw" TEST_CASE_PATH: "$CI_PROJECT_DIR/examples" @@ -839,7 +855,8 @@ assign_test: stage: unit_test dependencies: - assign_test - - build_esp_idf_tests + - build_esp_idf_tests_make + - build_esp_idf_tests_cmake only: refs: - master diff --git a/tools/tiny-test-fw/DUT.py b/tools/tiny-test-fw/DUT.py index e49a121f3..88894fd49 100644 --- a/tools/tiny-test-fw/DUT.py +++ b/tools/tiny-test-fw/DUT.py @@ -205,12 +205,14 @@ class _RecvThread(threading.Thread): PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n") - def __init__(self, read, data_cache): + def __init__(self, read, data_cache, recorded_data, record_data_lock): super(_RecvThread, self).__init__() self.exit_event = threading.Event() self.setDaemon(True) self.read = read self.data_cache = data_cache + self.recorded_data = recorded_data + self.record_data_lock = record_data_lock # cache the last line of recv data for collecting performance self._line_cache = str() @@ -243,7 +245,10 @@ class _RecvThread(threading.Thread): while not self.exit_event.isSet(): data = self.read(1000) if data: - self.data_cache.put(data) + with self.record_data_lock: + self.data_cache.put(data) + for capture_id in self.recorded_data: + self.recorded_data[capture_id].put(data) self.collect_performance(data) def exit(self): @@ -274,6 +279,11 @@ class BaseDUT(object): self.log_file = log_file self.app = app self.data_cache = _DataCache() + # the main process of recorded data are done in receive thread + # but receive thread could be closed in DUT lifetime (tool methods) + # so we keep it in BaseDUT, as their life cycle are same + self.recorded_data = dict() + self.record_data_lock = threading.RLock() self.receive_thread = None self.expect_failures = [] # open and start during init @@ -389,7 +399,8 @@ class BaseDUT(object): :return: None """ self._port_open() - self.receive_thread = _RecvThread(self._port_read, self.data_cache) + self.receive_thread = _RecvThread(self._port_read, self.data_cache, + self.recorded_data, self.record_data_lock) self.receive_thread.start() def close(self): @@ -448,6 +459,42 @@ class BaseDUT(object): self.data_cache.flush(size) return data + def start_capture_raw_data(self, capture_id="default"): + """ + Sometime application want to get DUT raw data and use ``expect`` method at the same time. + Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method. + + If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID. + + :param capture_id: ID of capture. You can use different IDs to do different captures at the same time. + """ + with self.record_data_lock: + try: + # if start capture on existed ID, we do flush data and restart capture + self.recorded_data[capture_id].flush() + except KeyError: + # otherwise, create new data cache + self.recorded_data[capture_id] = _DataCache() + + def stop_capture_raw_data(self, capture_id="default"): + """ + Stop capture and get raw data. + This method should be used after ``start_capture_raw_data`` on the same capture ID. + + :param capture_id: ID of capture. + :return: captured raw data between start capture and stop capture. + """ + with self.record_data_lock: + try: + ret = self.recorded_data[capture_id].get_data() + self.recorded_data.pop(capture_id) + except KeyError as e: + e.message = "capture_id does not exist. " \ + "You should call start_capture_raw_data with same ID " \ + "before calling stop_capture_raw_data" + raise e + return ret + # expect related methods @staticmethod diff --git a/tools/tiny-test-fw/IDF/__init__.py b/tools/tiny-test-fw/IDF/__init__.py index c7480c43f..0e342e844 100644 --- a/tools/tiny-test-fw/IDF/__init__.py +++ b/tools/tiny-test-fw/IDF/__init__.py @@ -77,7 +77,11 @@ def log_performance(item, value): :param item: performance item name :param value: performance value """ - Utility.console_log("[Performance][{}]: {}".format(item, value), "orange") + performance_msg = "[Performance][{}]: {}".format(item, value) + Utility.console_log(performance_msg, "orange") + # update to junit test report + current_junit_case = TinyFW.JunitReport.get_current_test_case() + current_junit_case.stdout += performance_msg + "\r\n" def check_performance(item, value): diff --git a/tools/tiny-test-fw/TinyFW.py b/tools/tiny-test-fw/TinyFW.py index c475f3824..e9f9289d3 100644 --- a/tools/tiny-test-fw/TinyFW.py +++ b/tools/tiny-test-fw/TinyFW.py @@ -13,14 +13,12 @@ # limitations under the License. """ Interface for test cases. """ -import sys import os import time import traceback -import inspect import functools -import xunitgen +import junit_xml import Env import DUT @@ -28,11 +26,6 @@ import App import Utility -XUNIT_FILE_NAME = "XUNIT_RESULT.xml" -XUNIT_RECEIVER = xunitgen.EventReceiver() -XUNIT_DEFAULT_TEST_SUITE = "test-suite" - - class DefaultEnvConfig(object): """ default test configs. There're 3 places to set configs, priority is (high -> low): @@ -69,40 +62,6 @@ set_default_config = DefaultEnvConfig.set_default_config get_default_config = DefaultEnvConfig.get_default_config -class TestResult(object): - TEST_RESULT = { - "pass": [], - "fail": [], - } - - @classmethod - def get_failed_cases(cls): - """ - :return: failed test cases - """ - return cls.TEST_RESULT["fail"] - - @classmethod - def get_passed_cases(cls): - """ - :return: passed test cases - """ - return cls.TEST_RESULT["pass"] - - @classmethod - def set_result(cls, result, case_name): - """ - :param result: True or False - :param case_name: test case name - :return: None - """ - cls.TEST_RESULT["pass" if result else "fail"].append(case_name) - - -get_failed_cases = TestResult.get_failed_cases -get_passed_cases = TestResult.get_passed_cases - - MANDATORY_INFO = { "execution_time": 1, "env_tag": "default", @@ -111,6 +70,61 @@ MANDATORY_INFO = { } +class JunitReport(object): + # wrapper for junit test report + # TODO: Don't support by multi-thread (although not likely to be used this way). + + JUNIT_FILE_NAME = "XUNIT_RESULT.xml" + JUNIT_DEFAULT_TEST_SUITE = "test-suite" + JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE) + JUNIT_CURRENT_TEST_CASE = None + _TEST_CASE_CREATED_TS = 0 + + @classmethod + def output_report(cls, junit_file_path): + """ Output current test result to file. """ + with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f: + cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False) + + @classmethod + def get_current_test_case(cls): + """ + By default, the test framework will handle junit test report automatically. + While some test case might want to update some info to test report. + They can use this method to get current test case created by test framework. + + :return: current junit test case instance created by ``JunitTestReport.create_test_case`` + """ + return cls.JUNIT_CURRENT_TEST_CASE + + @classmethod + def test_case_finish(cls, test_case): + """ + Append the test case to test suite so it can be output to file. + Execution time will be automatically updated (compared to ``create_test_case``). + """ + test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS + cls.JUNIT_TEST_SUITE.test_cases.append(test_case) + + @classmethod + def create_test_case(cls, name): + """ + Extend ``junit_xml.TestCase`` with: + + 1. save create test case so it can be get by ``get_current_test_case`` + 2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``. + + :param name: test case name + :return: instance of ``junit_xml.TestCase`` + """ + # set stdout to empty string, so we can always append string to stdout. + # It won't affect output logic. If stdout is empty, it won't be put to report. + test_case = junit_xml.TestCase(name, stdout="") + cls.JUNIT_CURRENT_TEST_CASE = test_case + cls._TEST_CASE_CREATED_TS = time.time() + return test_case + + def test_method(**kwargs): """ decorator for test case function. @@ -124,14 +138,15 @@ def test_method(**kwargs): :keyword env_config_file: test env config file. usually will not set this keyword when define case :keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result. usually will not set this keyword when define case + :keyword junit_report_by_case: By default the test fw will handle junit report generation. + In some cases, one test function might test many test cases. + If this flag is set, test case can update junit report by its own. """ def test(test_func): - # get test function file name - frame = inspect.stack() - test_func_file_name = frame[1][1] case_info = MANDATORY_INFO.copy() case_info["name"] = case_info["ID"] = test_func.__name__ + case_info["junit_report_by_case"] = False case_info.update(kwargs) @functools.wraps(test_func) @@ -151,11 +166,12 @@ def test_method(**kwargs): env_config.update(overwrite) env_inst = Env.Env(**env_config) + # prepare for xunit test results - xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]), - XUNIT_FILE_NAME) - XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name) + junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"]) + junit_test_case = JunitReport.create_test_case(case_info["name"]) result = False + try: Utility.console_log("starting running test: " + test_func.__name__, color="green") # execute test function @@ -166,21 +182,20 @@ def test_method(**kwargs): # handle all the exceptions here traceback.print_exc() # log failure - XUNIT_RECEIVER.failure(str(e), test_func_file_name) + junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc()) finally: + if not case_info["junit_report_by_case"]: + JunitReport.test_case_finish(junit_test_case) # do close all DUTs, if result is False then print DUT debug info env_inst.close(dut_debug=(not result)) + # end case and output result - XUNIT_RECEIVER.end_case(test_func.__name__, time.time()) - with open(xunit_file, "ab+") as f: - f.write(xunitgen.toxml(XUNIT_RECEIVER.results(), - XUNIT_DEFAULT_TEST_SUITE)) + JunitReport.output_report(junit_file_path) if result: Utility.console_log("Test Succeed: " + test_func.__name__, color="green") else: Utility.console_log(("Test Fail: " + test_func.__name__), color="red") - TestResult.set_result(result, test_func.__name__) return result handle_test.case_info = case_info diff --git a/tools/tiny-test-fw/Utility/CIAssignTest.py b/tools/tiny-test-fw/Utility/CIAssignTest.py index 2df66fe81..9d727b5eb 100644 --- a/tools/tiny-test-fw/Utility/CIAssignTest.py +++ b/tools/tiny-test-fw/Utility/CIAssignTest.py @@ -143,6 +143,7 @@ class AssignTest(object): for job_name in ci_config: if self.CI_TEST_JOB_PATTERN.search(job_name) is not None: job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name)) + job_list.sort(key=lambda x: x["name"]) return job_list def _search_cases(self, test_case_path, case_filter=None): diff --git a/tools/tiny-test-fw/docs/index.rst b/tools/tiny-test-fw/docs/index.rst index af5115a92..fac61f101 100644 --- a/tools/tiny-test-fw/docs/index.rst +++ b/tools/tiny-test-fw/docs/index.rst @@ -186,7 +186,7 @@ The following 3rd party lib is required: * pyserial * pyyaml - * xunitgen + * junit_xml * netifaces * matplotlib (if use Utility.LineChart) diff --git a/tools/tiny-test-fw/requirements.txt b/tools/tiny-test-fw/requirements.txt index e71c928fd..aa6b53b4b 100644 --- a/tools/tiny-test-fw/requirements.txt +++ b/tools/tiny-test-fw/requirements.txt @@ -1,5 +1,5 @@ pyserial pyyaml -xunitgen +junit_xml netifaces matplotlib diff --git a/tools/unit-test-app/tools/UnitTestParser.py b/tools/unit-test-app/tools/UnitTestParser.py index ec16583eb..b4c11c2a0 100644 --- a/tools/unit-test-app/tools/UnitTestParser.py +++ b/tools/unit-test-app/tools/UnitTestParser.py @@ -254,7 +254,7 @@ class Parser(object): config_output_folder = os.path.join(output_folder, config) if os.path.exists(config_output_folder): test_cases.extend(self.parse_test_cases_for_one_config(config_output_folder, config)) - + test_cases.sort(key=lambda x: x["config"] + x["summary"]) self.dump_test_cases(test_cases) diff --git a/tools/unit-test-app/unit_test.py b/tools/unit-test-app/unit_test.py index 884512a34..1aae0315b 100755 --- a/tools/unit-test-app/unit_test.py +++ b/tools/unit-test-app/unit_test.py @@ -23,7 +23,6 @@ import os import sys import time import argparse - import threading # if we want to run test case outside `tiny-test-fw` folder, @@ -53,7 +52,7 @@ SIMPLE_TEST_ID = 0 MULTI_STAGE_ID = 1 MULTI_DEVICE_ID = 2 -DEFAULT_TIMEOUT=20 +DEFAULT_TIMEOUT = 20 DUT_STARTUP_CHECK_RETRY_COUNT = 5 TEST_HISTROY_CHECK_TIMEOUT = 1 @@ -132,6 +131,7 @@ def format_test_case_config(test_case_data): return case_config + def replace_app_bin(dut, name, new_app_bin): if new_app_bin is None: return @@ -142,13 +142,15 @@ def replace_app_bin(dut, name, new_app_bin): Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O") break + def reset_dut(dut): dut.reset() # esptool ``run`` cmd takes quite long time. # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened. # this could cause checking bootup print failed. # now use input cmd `-` and check test history to check if DUT is bootup. - # we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command). + # we'll retry this step for a few times, + # in case `dut.reset` returns during DUT bootup (when DUT can't process any command). for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT): dut.write("-") try: @@ -157,10 +159,86 @@ def reset_dut(dut): except ExpectTimeout: pass else: - raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port)) + raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port)) -@IDF.idf_unit_test(env_tag="UT_T1_1") +def run_one_normal_case(dut, one_case, junit_test_case, failed_cases): + + reset_dut(dut) + + dut.start_capture_raw_data() + # run test case + dut.write("\"{}\"".format(one_case["name"])) + dut.expect("Running " + one_case["name"] + "...") + + exception_reset_list = [] + + # we want to set this flag in callbacks (inner functions) + # use list here so we can use append to set this flag + test_finish = list() + + # expect callbacks + def one_case_finish(result): + """ one test finished, let expect loop break and log result """ + test_finish.append(True) + output = dut.stop_capture_raw_data() + if result: + Utility.console_log("Success: " + one_case["name"], color="green") + else: + failed_cases.append(one_case["name"]) + Utility.console_log("Failed: " + one_case["name"], color="red") + junit_test_case.add_failure_info(output) + + def handle_exception_reset(data): + """ + just append data to exception list. + exception list will be checked in ``handle_reset_finish``, once reset finished. + """ + exception_reset_list.append(data[0]) + + def handle_test_finish(data): + """ test finished without reset """ + # in this scenario reset should not happen + assert not exception_reset_list + if int(data[1]): + # case ignored + Utility.console_log("Ignored: " + one_case["name"], color="orange") + junit_test_case.add_skipped_info("ignored") + one_case_finish(not int(data[0])) + + def handle_reset_finish(data): + """ reset happened and reboot finished """ + assert exception_reset_list # reboot but no exception/reset logged. should never happen + result = False + if len(one_case["reset"]) == len(exception_reset_list): + for i, exception in enumerate(exception_reset_list): + if one_case["reset"][i] not in exception: + break + else: + result = True + if not result: + err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"], + exception_reset_list) + Utility.console_log(err_msg, color="orange") + junit_test_case.add_error_info(err_msg) + one_case_finish(result) + + while not test_finish: + try: + dut.expect_any((RESET_PATTERN, handle_exception_reset), + (EXCEPTION_PATTERN, handle_exception_reset), + (ABORT_PATTERN, handle_exception_reset), + (FINISH_PATTERN, handle_test_finish), + (UT_APP_BOOT_UP_DONE, handle_reset_finish), + timeout=one_case["timeout"]) + except ExpectTimeout: + Utility.console_log("Timeout in expect", color="orange") + junit_test_case.add_error_info("timeout") + one_case_finish(False) + break + + +@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True) def run_unit_test_cases(env, extra_data): """ extra_data can be three types of value @@ -173,6 +251,7 @@ def run_unit_test_cases(env, extra_data): 3. as list of string or dict: [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...] + :param env: test env instance :param extra_data: the case name or case list or case dictionary :return: None """ @@ -190,74 +269,17 @@ def run_unit_test_cases(env, extra_data): if len(case_config[ut_config]) > 0: replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin')) dut.start_app() + Utility.console_log("Download finished, start running test cases", "O") for one_case in case_config[ut_config]: - reset_dut(dut) - - # run test case - dut.write("\"{}\"".format(one_case["name"])) - dut.expect("Running " + one_case["name"] + "...") - - exception_reset_list = [] - - # we want to set this flag in callbacks (inner functions) - # use list here so we can use append to set this flag - test_finish = list() - - # expect callbacks - def one_case_finish(result): - """ one test finished, let expect loop break and log result """ - test_finish.append(True) - if result: - Utility.console_log("Success: " + one_case["name"], color="green") - else: - failed_cases.append(one_case["name"]) - Utility.console_log("Failed: " + one_case["name"], color="red") - - def handle_exception_reset(data): - """ - just append data to exception list. - exception list will be checked in ``handle_reset_finish``, once reset finished. - """ - exception_reset_list.append(data[0]) - - def handle_test_finish(data): - """ test finished without reset """ - # in this scenario reset should not happen - assert not exception_reset_list - if int(data[1]): - # case ignored - Utility.console_log("Ignored: " + one_case["name"], color="orange") - one_case_finish(not int(data[0])) - - def handle_reset_finish(data): - """ reset happened and reboot finished """ - assert exception_reset_list # reboot but no exception/reset logged. should never happen - result = False - if len(one_case["reset"]) == len(exception_reset_list): - for i, exception in enumerate(exception_reset_list): - if one_case["reset"][i] not in exception: - break - else: - result = True - if not result: - Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}""" - .format(one_case["reset"], exception_reset_list), - color="orange") - one_case_finish(result) - - while not test_finish: - try: - dut.expect_any((RESET_PATTERN, handle_exception_reset), - (EXCEPTION_PATTERN, handle_exception_reset), - (ABORT_PATTERN, handle_exception_reset), - (FINISH_PATTERN, handle_test_finish), - (UT_APP_BOOT_UP_DONE, handle_reset_finish), - timeout=one_case["timeout"]) - except ExpectTimeout: - Utility.console_log("Timeout in expect", color="orange") - one_case_finish(False) - break + # create junit report test case + junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) + try: + run_one_normal_case(dut, one_case, junit_test_case, failed_cases) + TinyFW.JunitReport.test_case_finish(junit_test_case) + except Exception as e: + junit_test_case.add_error_info("Unexpected exception: " + str(e)) + TinyFW.JunitReport.test_case_finish(junit_test_case) # raise exception if any case fails if failed_cases: @@ -267,7 +289,6 @@ def run_unit_test_cases(env, extra_data): raise AssertionError("Unit Test Failed") - class Handler(threading.Thread): WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!') @@ -283,6 +304,7 @@ class Handler(threading.Thread): self.child_case_index = child_case_index + 1 self.finish = False self.result = False + self.output = "" self.fail_name = None self.timeout = timeout self.force_stop = threading.Event() # it show the running status @@ -292,6 +314,9 @@ class Handler(threading.Thread): threading.Thread.__init__(self, name="{} Handler".format(dut)) def run(self): + + self.dut.start_capture_raw_data() + def get_child_case_name(data): self.child_case_name = data[0] time.sleep(1) @@ -301,6 +326,8 @@ class Handler(threading.Thread): """ one test finished, let expect loop break and log result """ self.finish = True self.result = result + self.output = "[{}]\n\n{}\n".format(self.child_case_name, + self.dut.stop_capture_raw_data()) if not result: self.fail_name = self.child_case_name @@ -309,7 +336,7 @@ class Handler(threading.Thread): expected_signal = data[0] while 1: if time.time() > start_time + self.timeout: - Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange") + Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange") break with self.lock: if expected_signal in self.sent_signal_list: @@ -330,7 +357,6 @@ class Handler(threading.Thread): Utility.console_log("Ignored: " + self.child_case_name, color="orange") one_device_case_finish(not int(data[0])) - try: time.sleep(1) self.dut.write("\"{}\"".format(self.parent_case_name)) @@ -339,7 +365,8 @@ class Handler(threading.Thread): Utility.console_log("No case detected!", color="orange") while not self.finish and not self.force_stop.isSet(): try: - self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name), + self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), + get_child_case_name), (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern @@ -366,11 +393,11 @@ def get_dut(duts, env, name, ut_config, app_bin=None): dut = env.get_dut(name, app_path=ut_config) duts[name] = dut replace_app_bin(dut, "unit-test-app", app_bin) - dut.start_app() # download bin to board + dut.start_app() # download bin to board return dut -def case_run(duts, ut_config, env, one_case, failed_cases, app_bin): +def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case): lock = threading.RLock() threads = [] send_signal_list = [] @@ -384,9 +411,11 @@ def case_run(duts, ut_config, env, one_case, failed_cases, app_bin): for thread in threads: thread.setDaemon(True) thread.start() + output = "Multiple Device Failed\n" for thread in threads: thread.join() result = result and thread.result + output += thread.output if not thread.result: [thd.stop() for thd in threads] @@ -394,10 +423,11 @@ def case_run(duts, ut_config, env, one_case, failed_cases, app_bin): Utility.console_log("Success: " + one_case["name"], color="green") else: failed_cases.append(one_case["name"]) + junit_test_case.add_failure_info(output) Utility.console_log("Failed: " + one_case["name"], color="red") -@IDF.idf_unit_test(env_tag="UT_T2_1") +@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True) def run_multiple_devices_cases(env, extra_data): """ extra_data can be two types of value @@ -421,11 +451,18 @@ def run_multiple_devices_cases(env, extra_data): """ failed_cases = [] case_config = format_test_case_config(extra_data) - DUTS = {} + duts = {} for ut_config in case_config: Utility.console_log("Running unit test for config: " + ut_config, "O") for one_case in case_config[ut_config]: - case_run(DUTS, ut_config, env, one_case, failed_cases, one_case.get('app_bin')) + junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) + try: + run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, + one_case.get('app_bin'), junit_test_case) + TinyFW.JunitReport.test_case_finish(junit_test_case) + except Exception as e: + junit_test_case.add_error_info("Unexpected exception: " + str(e)) + TinyFW.JunitReport.test_case_finish(junit_test_case) if failed_cases: Utility.console_log("Failed Cases:", color="red") @@ -434,7 +471,109 @@ def run_multiple_devices_cases(env, extra_data): raise AssertionError("Unit Test Failed") -@IDF.idf_unit_test(env_tag="UT_T1_1") +def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case): + reset_dut(dut) + + dut.start_capture_raw_data() + + exception_reset_list = [] + + for test_stage in range(one_case["child case num"]): + # select multi stage test case name + dut.write("\"{}\"".format(one_case["name"])) + dut.expect("Running " + one_case["name"] + "...") + # select test function for current stage + dut.write(str(test_stage + 1)) + + # we want to set this flag in callbacks (inner functions) + # use list here so we can use append to set this flag + stage_finish = list() + + def last_stage(): + return test_stage == one_case["child case num"] - 1 + + def check_reset(): + if one_case["reset"]: + assert exception_reset_list # reboot but no exception/reset logged. should never happen + result = False + if len(one_case["reset"]) == len(exception_reset_list): + for i, exception in enumerate(exception_reset_list): + if one_case["reset"][i] not in exception: + break + else: + result = True + if not result: + err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"], + exception_reset_list) + Utility.console_log(err_msg, color="orange") + junit_test_case.add_error_info(err_msg) + else: + # we allow omit reset in multi stage cases + result = True + return result + + # expect callbacks + def one_case_finish(result): + """ one test finished, let expect loop break and log result """ + # handle test finish + result = result and check_reset() + output = dut.stop_capture_raw_data() + if result: + Utility.console_log("Success: " + one_case["name"], color="green") + else: + failed_cases.append(one_case["name"]) + Utility.console_log("Failed: " + one_case["name"], color="red") + junit_test_case.add_failure_info(output) + stage_finish.append("break") + + def handle_exception_reset(data): + """ + just append data to exception list. + exception list will be checked in ``handle_reset_finish``, once reset finished. + """ + exception_reset_list.append(data[0]) + + def handle_test_finish(data): + """ test finished without reset """ + # in this scenario reset should not happen + if int(data[1]): + # case ignored + Utility.console_log("Ignored: " + one_case["name"], color="orange") + junit_test_case.add_skipped_info("ignored") + # only passed in last stage will be regarded as real pass + if last_stage(): + one_case_finish(not int(data[0])) + else: + Utility.console_log("test finished before enter last stage", color="orange") + one_case_finish(False) + + def handle_next_stage(data): + """ reboot finished. we goto next stage """ + if last_stage(): + # already last stage, should never goto next stage + Utility.console_log("didn't finish at last stage", color="orange") + one_case_finish(False) + else: + stage_finish.append("continue") + + while not stage_finish: + try: + dut.expect_any((RESET_PATTERN, handle_exception_reset), + (EXCEPTION_PATTERN, handle_exception_reset), + (ABORT_PATTERN, handle_exception_reset), + (FINISH_PATTERN, handle_test_finish), + (UT_APP_BOOT_UP_DONE, handle_next_stage), + timeout=one_case["timeout"]) + except ExpectTimeout: + Utility.console_log("Timeout in expect", color="orange") + one_case_finish(False) + break + if stage_finish[0] == "break": + # test breaks on current stage + break + + +@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True) def run_multiple_stage_cases(env, extra_data): """ extra_data can be 2 types of value @@ -442,6 +581,7 @@ def run_multiple_stage_cases(env, extra_data): 3. as list of string or dict: [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...] + :param env: test env instance :param extra_data: the case name or case list or case dictionary :return: None """ @@ -461,98 +601,13 @@ def run_multiple_stage_cases(env, extra_data): dut.start_app() for one_case in case_config[ut_config]: - reset_dut(dut) - exception_reset_list = [] - - for test_stage in range(one_case["child case num"]): - # select multi stage test case name - dut.write("\"{}\"".format(one_case["name"])) - dut.expect("Running " + one_case["name"] + "...") - # select test function for current stage - dut.write(str(test_stage + 1)) - - # we want to set this flag in callbacks (inner functions) - # use list here so we can use append to set this flag - stage_finish = list() - - def last_stage(): - return test_stage == one_case["child case num"] - 1 - - def check_reset(): - if one_case["reset"]: - assert exception_reset_list # reboot but no exception/reset logged. should never happen - result = False - if len(one_case["reset"]) == len(exception_reset_list): - for i, exception in enumerate(exception_reset_list): - if one_case["reset"][i] not in exception: - break - else: - result = True - if not result: - Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}""" - .format(one_case["reset"], exception_reset_list), - color="orange") - else: - # we allow omit reset in multi stage cases - result = True - return result - - # expect callbacks - def one_case_finish(result): - """ one test finished, let expect loop break and log result """ - # handle test finish - result = result and check_reset() - if result: - Utility.console_log("Success: " + one_case["name"], color="green") - else: - failed_cases.append(one_case["name"]) - Utility.console_log("Failed: " + one_case["name"], color="red") - stage_finish.append("break") - - def handle_exception_reset(data): - """ - just append data to exception list. - exception list will be checked in ``handle_reset_finish``, once reset finished. - """ - exception_reset_list.append(data[0]) - - def handle_test_finish(data): - """ test finished without reset """ - # in this scenario reset should not happen - if int(data[1]): - # case ignored - Utility.console_log("Ignored: " + one_case["name"], color="orange") - # only passed in last stage will be regarded as real pass - if last_stage(): - one_case_finish(not int(data[0])) - else: - Utility.console_log("test finished before enter last stage", color="orange") - one_case_finish(False) - - def handle_next_stage(data): - """ reboot finished. we goto next stage """ - if last_stage(): - # already last stage, should never goto next stage - Utility.console_log("didn't finish at last stage", color="orange") - one_case_finish(False) - else: - stage_finish.append("continue") - - while not stage_finish: - try: - dut.expect_any((RESET_PATTERN, handle_exception_reset), - (EXCEPTION_PATTERN, handle_exception_reset), - (ABORT_PATTERN, handle_exception_reset), - (FINISH_PATTERN, handle_test_finish), - (UT_APP_BOOT_UP_DONE, handle_next_stage), - timeout=one_case["timeout"]) - except ExpectTimeout: - Utility.console_log("Timeout in expect", color="orange") - one_case_finish(False) - break - if stage_finish[0] == "break": - # test breaks on current stage - break + junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"])) + try: + run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case) + TinyFW.JunitReport.test_case_finish(junit_test_case) + except Exception as e: + junit_test_case.add_error_info("Unexpected exception: " + str(e)) + TinyFW.JunitReport.test_case_finish(junit_test_case) # raise exception if any case fails if failed_cases: @@ -561,6 +616,7 @@ def run_multiple_stage_cases(env, extra_data): Utility.console_log("\t" + _case_name, color="red") raise AssertionError("Unit Test Failed") + def detect_update_unit_test_info(env, extra_data, app_bin): case_config = format_test_case_config(extra_data) @@ -576,14 +632,14 @@ def detect_update_unit_test_info(env, extra_data, app_bin): dut.write("") dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT) - def find_update_dic(name, t, timeout, child_case_num=None): - for dic in extra_data: - if dic['name'] == name: - dic['type'] = t - if 'timeout' not in dic: - dic['timeout'] = timeout + def find_update_dic(name, _t, _timeout, child_case_num=None): + for _case_data in extra_data: + if _case_data['name'] == name: + _case_data['type'] = _t + if 'timeout' not in _case_data: + _case_data['timeout'] = _timeout if child_case_num: - dic['child case num'] = child_case_num + _case_data['child case num'] = child_case_num try: while True: @@ -613,9 +669,9 @@ def detect_update_unit_test_info(env, extra_data, app_bin): if data[1] and re.search(END_LIST_STR, data[1]): break # check if the unit test case names are correct, i.e. they could be found in the device - for dic in extra_data: - if 'type' not in dic: - raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(dic.get('name'))) + for _dic in extra_data: + if 'type' not in _dic: + raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name'))) except ExpectTimeout: Utility.console_log("Timeout during getting the test list", color="red") finally: @@ -624,6 +680,7 @@ def detect_update_unit_test_info(env, extra_data, app_bin): # These options are the same for all configs, therefore there is no need to continue break + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( @@ -633,13 +690,13 @@ if __name__ == '__main__': default=1 ) parser.add_argument("--env_config_file", "-e", - help="test env config file", - default=None - ) + help="test env config file", + default=None + ) parser.add_argument("--app_bin", "-b", - help="application binary file for flashing the chip", - default=None - ) + help="application binary file for flashing the chip", + default=None + ) parser.add_argument( 'test', help='Comma separated list of