Merge branch 'test/enhanced_junit_test_result_v3.2' into 'release/v3.2'

test: enhanced junit test result (backport v3.2)

See merge request idf/esp-idf!3794
Angus Gratton 2018-11-30 11:39:31 +08:00
commit 7658b8ab62
9 changed files with 409 additions and 268 deletions

View file

@@ -183,13 +183,13 @@ build_ssc_02:
# If you want to add new build ssc jobs, please add it into dependencies of `assign_test` and `.test_template`
build_esp_idf_tests:
.build_esp_idf_unit_test_template: &build_esp_idf_unit_test_template
<<: *build_template
artifacts:
paths:
- tools/unit-test-app/output
- components/idf_test/unit_test/TestCaseAll.yml
- components/idf_test/unit_test/CIConfigs/*.yml
expire_in: 2 days
only:
variables:
@@ -197,11 +197,30 @@ build_esp_idf_tests:
- $BOT_LABEL_BUILD
- $BOT_LABEL_UNIT_TEST
- $BOT_LABEL_REGULAR_TEST
build_esp_idf_tests_make:
<<: *build_esp_idf_unit_test_template
script:
- export PATH="$IDF_PATH/tools:$PATH"
- cd $CI_PROJECT_DIR/tools/unit-test-app
- export EXTRA_CFLAGS="-Werror -Werror=deprecated-declarations"
- export EXTRA_CXXFLAGS=${EXTRA_CFLAGS}
- cd $CI_PROJECT_DIR/tools/unit-test-app
- MAKEFLAGS= make help # make sure kconfig tools are built in single process
- make ut-clean-all-configs
- make ut-build-all-configs
- python tools/UnitTestParser.py
- if [ "$UNIT_TEST_BUILD_SYSTEM" == "make" ]; then exit 0; fi
# If Make, delete the CMake built artifacts
- rm -rf builds output sdkconfig
- rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/TestCaseAll.yml
- rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/*.yml
build_esp_idf_tests_cmake:
<<: *build_esp_idf_unit_test_template
script:
- export PATH="$IDF_PATH/tools:$PATH"
- export EXTRA_CFLAGS="-Werror -Werror=deprecated-declarations"
- export EXTRA_CXXFLAGS=${EXTRA_CFLAGS}
- cd $CI_PROJECT_DIR/tools/unit-test-app
# Build with CMake first
- idf.py ut-clean-all-configs
- idf.py ut-build-all-configs
@@ -210,14 +229,8 @@ build_esp_idf_tests:
- if [ "$UNIT_TEST_BUILD_SYSTEM" == "cmake" ]; then exit 0; fi
# If Make, delete the CMake built artifacts
- rm -rf builds output sdkconfig
- rm -rf components/idf_test/unit_test/TestCaseAll.yml
- rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/TestCaseAll.yml
- rm -rf components/idf_test/unit_test/CIConfigs/*.yml
- rm -rf $CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/*.yml
# Then build with Make
- cd $CI_PROJECT_DIR/tools/unit-test-app
- MAKEFLAGS= make help # make sure kconfig tools are built in single process
- make ut-clean-all-configs
- make ut-build-all-configs
- python tools/UnitTestParser.py
.build_examples_make_template: &build_examples_make_template
<<: *build_template
@@ -752,7 +765,8 @@ assign_test:
- build_ssc_00
- build_ssc_01
- build_ssc_02
- build_esp_idf_tests
- build_esp_idf_tests_make
- build_esp_idf_tests_cmake
variables:
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
EXAMPLE_CONFIG_OUTPUT_PATH: "$CI_PROJECT_DIR/examples/test_configs"
@@ -817,6 +831,8 @@ assign_test:
paths:
- $LOG_PATH
expire_in: 1 week
reports:
junit: $LOG_PATH/*/XUNIT_RESULT.xml
variables:
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
TEST_CASE_PATH: "$CI_PROJECT_DIR/examples"
@@ -839,7 +855,8 @@ assign_test:
stage: unit_test
dependencies:
- assign_test
- build_esp_idf_tests
- build_esp_idf_tests_make
- build_esp_idf_tests_cmake
only:
refs:
- master

View file

@@ -205,12 +205,14 @@ class _RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
def __init__(self, read, data_cache):
def __init__(self, read, data_cache, recorded_data, record_data_lock):
super(_RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
self.recorded_data = recorded_data
self.record_data_lock = record_data_lock
# cache the last line of recv data for collecting performance
self._line_cache = str()
@@ -243,7 +245,10 @@ class _RecvThread(threading.Thread):
while not self.exit_event.isSet():
data = self.read(1000)
if data:
self.data_cache.put(data)
with self.record_data_lock:
self.data_cache.put(data)
for capture_id in self.recorded_data:
self.recorded_data[capture_id].put(data)
self.collect_performance(data)
def exit(self):
@@ -274,6 +279,11 @@ class BaseDUT(object):
self.log_file = log_file
self.app = app
self.data_cache = _DataCache()
# the main process of recorded data are done in receive thread
# but receive thread could be closed in DUT lifetime (tool methods)
# so we keep it in BaseDUT, as their life cycle are same
self.recorded_data = dict()
self.record_data_lock = threading.RLock()
self.receive_thread = None
self.expect_failures = []
# open and start during init
@@ -389,7 +399,8 @@ class BaseDUT(object):
:return: None
"""
self._port_open()
self.receive_thread = _RecvThread(self._port_read, self.data_cache)
self.receive_thread = _RecvThread(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def close(self):
@@ -448,6 +459,42 @@ class BaseDUT(object):
self.data_cache.flush(size)
return data
def start_capture_raw_data(self, capture_id="default"):
"""
Sometime application want to get DUT raw data and use ``expect`` method at the same time.
Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method.
If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID.
:param capture_id: ID of capture. You can use different IDs to do different captures at the same time.
"""
with self.record_data_lock:
try:
# if start capture on existed ID, we do flush data and restart capture
self.recorded_data[capture_id].flush()
except KeyError:
# otherwise, create new data cache
self.recorded_data[capture_id] = _DataCache()
def stop_capture_raw_data(self, capture_id="default"):
"""
Stop capture and get raw data.
This method should be used after ``start_capture_raw_data`` on the same capture ID.
:param capture_id: ID of capture.
:return: captured raw data between start capture and stop capture.
"""
with self.record_data_lock:
try:
ret = self.recorded_data[capture_id].get_data()
self.recorded_data.pop(capture_id)
except KeyError as e:
e.message = "capture_id does not exist. " \
"You should call start_capture_raw_data with same ID " \
"before calling stop_capture_raw_data"
raise e
return ret
# expect related methods
@staticmethod

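The two capture methods added above are the hook that the reworked unit-test runner uses to attach raw DUT output to JUnit failure entries. A minimal usage sketch follows; the `dut` object, capture ID and expected string are illustrative, and `env.get_dut()` is the accessor shown in unit_test.py further below.

# Sketch only: assumes `dut` is a BaseDUT subclass instance obtained from the test Env,
# e.g. dut = env.get_dut("unit-test-app", app_path=ut_config).
dut.start_capture_raw_data(capture_id="boot_check")    # opens an extra _DataCache for this ID
dut.write("-")                                          # normal interaction; expect()/read() are unaffected
# ... drive the test as usual ...
raw_output = dut.stop_capture_raw_data(capture_id="boot_check")  # everything received since start
print(raw_output)
# calling start_capture_raw_data() again with the same ID flushes and restarts that capture;
# stop_capture_raw_data() on an unknown ID raises KeyError.
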
View file

@@ -77,7 +77,11 @@ def log_performance(item, value):
:param item: performance item name
:param value: performance value
"""
Utility.console_log("[Performance][{}]: {}".format(item, value), "orange")
performance_msg = "[Performance][{}]: {}".format(item, value)
Utility.console_log(performance_msg, "orange")
# update to junit test report
current_junit_case = TinyFW.JunitReport.get_current_test_case()
current_junit_case.stdout += performance_msg + "\r\n"
def check_performance(item, value):

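With this change a performance entry is no longer only printed to the console: the same message is appended to the stdout of the JUnit test case currently open in TinyFW.JunitReport, so it ends up in XUNIT_RESULT.xml. A hedged sketch of the effect; the item name and value are illustrative.

# Inside a test function wrapped by the TinyFW test decorator, the framework has already
# created the current junit test case, so the helper above can append to its stdout.
log_performance("https_request_time", "1.2 s")        # console output, as before
case = TinyFW.JunitReport.get_current_test_case()     # same junit_xml.TestCase the framework created
assert "[Performance][https_request_time]: 1.2 s" in case.stdout
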
View file

@@ -13,14 +13,12 @@
# limitations under the License.
""" Interface for test cases. """
import sys
import os
import time
import traceback
import inspect
import functools
import xunitgen
import junit_xml
import Env
import DUT
@@ -28,11 +26,6 @@ import App
import Utility
XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
XUNIT_RECEIVER = xunitgen.EventReceiver()
XUNIT_DEFAULT_TEST_SUITE = "test-suite"
class DefaultEnvConfig(object):
"""
default test configs. There're 3 places to set configs, priority is (high -> low):
@@ -69,40 +62,6 @@ set_default_config = DefaultEnvConfig.set_default_config
get_default_config = DefaultEnvConfig.get_default_config
class TestResult(object):
TEST_RESULT = {
"pass": [],
"fail": [],
}
@classmethod
def get_failed_cases(cls):
"""
:return: failed test cases
"""
return cls.TEST_RESULT["fail"]
@classmethod
def get_passed_cases(cls):
"""
:return: passed test cases
"""
return cls.TEST_RESULT["pass"]
@classmethod
def set_result(cls, result, case_name):
"""
:param result: True or False
:param case_name: test case name
:return: None
"""
cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
get_failed_cases = TestResult.get_failed_cases
get_passed_cases = TestResult.get_passed_cases
MANDATORY_INFO = {
"execution_time": 1,
"env_tag": "default",
@@ -111,6 +70,61 @@ MANDATORY_INFO = {
}
class JunitReport(object):
# wrapper for junit test report
# TODO: Don't support by multi-thread (although not likely to be used this way).
JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
JUNIT_DEFAULT_TEST_SUITE = "test-suite"
JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE)
JUNIT_CURRENT_TEST_CASE = None
_TEST_CASE_CREATED_TS = 0
@classmethod
def output_report(cls, junit_file_path):
""" Output current test result to file. """
with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f:
cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False)
@classmethod
def get_current_test_case(cls):
"""
By default, the test framework will handle junit test report automatically.
While some test case might want to update some info to test report.
They can use this method to get current test case created by test framework.
:return: current junit test case instance created by ``JunitTestReport.create_test_case``
"""
return cls.JUNIT_CURRENT_TEST_CASE
@classmethod
def test_case_finish(cls, test_case):
"""
Append the test case to test suite so it can be output to file.
Execution time will be automatically updated (compared to ``create_test_case``).
"""
test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS
cls.JUNIT_TEST_SUITE.test_cases.append(test_case)
@classmethod
def create_test_case(cls, name):
"""
Extend ``junit_xml.TestCase`` with:
1. save create test case so it can be get by ``get_current_test_case``
2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``.
:param name: test case name
:return: instance of ``junit_xml.TestCase``
"""
# set stdout to empty string, so we can always append string to stdout.
# It won't affect output logic. If stdout is empty, it won't be put to report.
test_case = junit_xml.TestCase(name, stdout="")
cls.JUNIT_CURRENT_TEST_CASE = test_case
cls._TEST_CASE_CREATED_TS = time.time()
return test_case
def test_method(**kwargs):
"""
decorator for test case function.
@@ -124,14 +138,15 @@ def test_method(**kwargs):
:keyword env_config_file: test env config file. usually will not set this keyword when define case
:keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
usually will not set this keyword when define case
:keyword junit_report_by_case: By default the test fw will handle junit report generation.
In some cases, one test function might test many test cases.
If this flag is set, test case can update junit report by its own.
"""
def test(test_func):
# get test function file name
frame = inspect.stack()
test_func_file_name = frame[1][1]
case_info = MANDATORY_INFO.copy()
case_info["name"] = case_info["ID"] = test_func.__name__
case_info["junit_report_by_case"] = False
case_info.update(kwargs)
@functools.wraps(test_func)
@@ -151,11 +166,12 @@ def test_method(**kwargs):
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
# prepare for xunit test results
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
XUNIT_FILE_NAME)
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"])
junit_test_case = JunitReport.create_test_case(case_info["name"])
result = False
try:
Utility.console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
@@ -166,21 +182,20 @@ def test_method(**kwargs):
# handle all the exceptions here
traceback.print_exc()
# log failure
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
finally:
if not case_info["junit_report_by_case"]:
JunitReport.test_case_finish(junit_test_case)
# do close all DUTs, if result is False then print DUT debug info
env_inst.close(dut_debug=(not result))
# end case and output result
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
with open(xunit_file, "ab+") as f:
f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
XUNIT_DEFAULT_TEST_SUITE))
JunitReport.output_report(junit_file_path)
if result:
Utility.console_log("Test Succeed: " + test_func.__name__, color="green")
else:
Utility.console_log(("Test Fail: " + test_func.__name__), color="red")
TestResult.set_result(result, test_func.__name__)
return result
handle_test.case_info = case_info

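Taken together, JunitReport replaces the old xunitgen-based TestResult bookkeeping: the decorator creates one junit_xml.TestCase per test function, and junit_report_by_case=True tells it not to auto-finish that case so a test function that iterates over many DUT cases can report each one individually. A condensed sketch of the per-case pattern, mirroring the runner functions further below; the env tag, function name and case fields are illustrative.

@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_my_cases(env, extra_data):
    # one junit test case per DUT case, instead of one per test function
    for one_case in extra_data:
        junit_test_case = TinyFW.JunitReport.create_test_case(one_case["name"])
        try:
            # ... drive the DUT; call junit_test_case.add_failure_info(output) on failure ...
            TinyFW.JunitReport.test_case_finish(junit_test_case)
        except Exception as e:
            junit_test_case.add_error_info("Unexpected exception: " + str(e))
            TinyFW.JunitReport.test_case_finish(junit_test_case)
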
View file

@@ -143,6 +143,7 @@ class AssignTest(object):
for job_name in ci_config:
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
job_list.sort(key=lambda x: x["name"])
return job_list
def _search_cases(self, test_case_path, case_filter=None):

View file

@@ -186,7 +186,7 @@ The following 3rd party lib is required:
* pyserial
* pyyaml
* xunitgen
* junit_xml
* netifaces
* matplotlib (if use Utility.LineChart)

View file

@@ -1,5 +1,5 @@
pyserial
pyyaml
xunitgen
junit_xml
netifaces
matplotlib

View file

@@ -254,7 +254,7 @@ class Parser(object):
config_output_folder = os.path.join(output_folder, config)
if os.path.exists(config_output_folder):
test_cases.extend(self.parse_test_cases_for_one_config(config_output_folder, config))
test_cases.sort(key=lambda x: x["config"] + x["summary"])
self.dump_test_cases(test_cases)

View file

@@ -23,7 +23,6 @@ import os
import sys
import time
import argparse
import threading
# if we want to run test case outside `tiny-test-fw` folder, # if we want to run test case outside `tiny-test-fw` folder,
@@ -53,7 +52,7 @@ SIMPLE_TEST_ID = 0
MULTI_STAGE_ID = 1
MULTI_DEVICE_ID = 2
DEFAULT_TIMEOUT=20
DEFAULT_TIMEOUT = 20
DUT_STARTUP_CHECK_RETRY_COUNT = 5
TEST_HISTROY_CHECK_TIMEOUT = 1
@@ -132,6 +131,7 @@ def format_test_case_config(test_case_data):
return case_config
def replace_app_bin(dut, name, new_app_bin):
if new_app_bin is None:
return
@@ -142,13 +142,15 @@ def replace_app_bin(dut, name, new_app_bin):
Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
break
def reset_dut(dut):
dut.reset()
# esptool ``run`` cmd takes quite long time.
# before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
# this could cause checking bootup print failed.
# now use input cmd `-` and check test history to check if DUT is bootup.
# we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
# we'll retry this step for a few times,
# in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
dut.write("-")
try:
@@ -157,10 +159,86 @@ def reset_dut(dut):
except ExpectTimeout:
pass
else:
raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port))
raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
@IDF.idf_unit_test(env_tag="UT_T1_1")
def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
reset_dut(dut)
dut.start_capture_raw_data()
# run test case
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
exception_reset_list = []
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
test_finish = list()
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
test_finish.append(True)
output = dut.stop_capture_raw_data()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
junit_test_case.add_failure_info(output)
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
assert not exception_reset_list
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
junit_test_case.add_skipped_info("ignored")
one_case_finish(not int(data[0]))
def handle_reset_finish(data):
""" reset happened and reboot finished """
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
exception_reset_list)
Utility.console_log(err_msg, color="orange")
junit_test_case.add_error_info(err_msg)
one_case_finish(result)
while not test_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_reset_finish),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
junit_test_case.add_error_info("timeout")
one_case_finish(False)
break
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_unit_test_cases(env, extra_data):
"""
extra_data can be three types of value
@@ -173,6 +251,7 @@ def run_unit_test_cases(env, extra_data):
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
:param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
@@ -190,74 +269,17 @@ def run_unit_test_cases(env, extra_data):
if len(case_config[ut_config]) > 0:
replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin'))
dut.start_app()
Utility.console_log("Download finished, start running test cases", "O")
for one_case in case_config[ut_config]:
# create junit report test case
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_normal_case(dut, one_case, junit_test_case, failed_cases)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
reset_dut(dut)
# run test case
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
exception_reset_list = []
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
test_finish = list()
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
test_finish.append(True)
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
assert not exception_reset_list
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
one_case_finish(not int(data[0]))
def handle_reset_finish(data):
""" reset happened and reboot finished """
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
.format(one_case["reset"], exception_reset_list),
color="orange")
one_case_finish(result)
while not test_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_reset_finish),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
# raise exception if any case fails
if failed_cases:
@@ -267,7 +289,6 @@ def run_unit_test_cases(env, extra_data):
raise AssertionError("Unit Test Failed")
class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
@@ -283,6 +304,7 @@ class Handler(threading.Thread):
self.child_case_index = child_case_index + 1
self.finish = False
self.result = False
self.output = ""
self.fail_name = None
self.timeout = timeout
self.force_stop = threading.Event() # it show the running status
@@ -292,6 +314,9 @@ class Handler(threading.Thread):
threading.Thread.__init__(self, name="{} Handler".format(dut))
def run(self):
self.dut.start_capture_raw_data()
def get_child_case_name(data):
self.child_case_name = data[0]
time.sleep(1)
@@ -301,6 +326,8 @@ class Handler(threading.Thread):
""" one test finished, let expect loop break and log result """
self.finish = True
self.result = result
self.output = "[{}]\n\n{}\n".format(self.child_case_name,
self.dut.stop_capture_raw_data())
if not result:
self.fail_name = self.child_case_name
@@ -309,7 +336,7 @@ class Handler(threading.Thread):
expected_signal = data[0]
while 1:
if time.time() > start_time + self.timeout:
Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
break
with self.lock:
if expected_signal in self.sent_signal_list:
@@ -330,7 +357,6 @@ class Handler(threading.Thread):
Utility.console_log("Ignored: " + self.child_case_name, color="orange")
one_device_case_finish(not int(data[0]))
try:
time.sleep(1)
self.dut.write("\"{}\"".format(self.parent_case_name))
@@ -339,7 +365,8 @@ class Handler(threading.Thread):
Utility.console_log("No case detected!", color="orange")
while not self.finish and not self.force_stop.isSet():
try:
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'),
get_child_case_name),
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
(self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
(self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
@@ -366,11 +393,11 @@ def get_dut(duts, env, name, ut_config, app_bin=None):
dut = env.get_dut(name, app_path=ut_config)
duts[name] = dut
replace_app_bin(dut, "unit-test-app", app_bin)
dut.start_app() # download bin to board
return dut
def case_run(duts, ut_config, env, one_case, failed_cases, app_bin):
def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case):
lock = threading.RLock()
threads = []
send_signal_list = []
@@ -384,9 +411,11 @@ def case_run(duts, ut_config, env, one_case, failed_cases, app_bin):
for thread in threads:
thread.setDaemon(True)
thread.start()
output = "Multiple Device Failed\n"
for thread in threads:
thread.join()
result = result and thread.result
output += thread.output
if not thread.result:
[thd.stop() for thd in threads]
@@ -394,10 +423,11 @@ def case_run(duts, ut_config, env, one_case, failed_cases, app_bin):
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
junit_test_case.add_failure_info(output)
Utility.console_log("Failed: " + one_case["name"], color="red")
@IDF.idf_unit_test(env_tag="UT_T2_1")
@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
def run_multiple_devices_cases(env, extra_data):
"""
extra_data can be two types of value
@@ -421,11 +451,18 @@ def run_multiple_devices_cases(env, extra_data):
"""
failed_cases = []
case_config = format_test_case_config(extra_data)
DUTS = {}
duts = {}
for ut_config in case_config:
Utility.console_log("Running unit test for config: " + ut_config, "O")
for one_case in case_config[ut_config]:
case_run(DUTS, ut_config, env, one_case, failed_cases, one_case.get('app_bin'))
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases,
one_case.get('app_bin'), junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
@@ -434,7 +471,109 @@ def run_multiple_devices_cases(env, extra_data):
raise AssertionError("Unit Test Failed")
@IDF.idf_unit_test(env_tag="UT_T1_1")
def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case):
reset_dut(dut)
dut.start_capture_raw_data()
exception_reset_list = []
for test_stage in range(one_case["child case num"]):
# select multi stage test case name
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
# select test function for current stage
dut.write(str(test_stage + 1))
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
stage_finish = list()
def last_stage():
return test_stage == one_case["child case num"] - 1
def check_reset():
if one_case["reset"]:
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
exception_reset_list)
Utility.console_log(err_msg, color="orange")
junit_test_case.add_error_info(err_msg)
else:
# we allow omit reset in multi stage cases
result = True
return result
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
# handle test finish
result = result and check_reset()
output = dut.stop_capture_raw_data()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
junit_test_case.add_failure_info(output)
stage_finish.append("break")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
junit_test_case.add_skipped_info("ignored")
# only passed in last stage will be regarded as real pass
if last_stage():
one_case_finish(not int(data[0]))
else:
Utility.console_log("test finished before enter last stage", color="orange")
one_case_finish(False)
def handle_next_stage(data):
""" reboot finished. we goto next stage """
if last_stage():
# already last stage, should never goto next stage
Utility.console_log("didn't finish at last stage", color="orange")
one_case_finish(False)
else:
stage_finish.append("continue")
while not stage_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_next_stage),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
if stage_finish[0] == "break":
# test breaks on current stage
break
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_multiple_stage_cases(env, extra_data):
"""
extra_data can be 2 types of value
@@ -442,6 +581,7 @@ def run_multiple_stage_cases(env, extra_data):
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
:param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
@@ -461,98 +601,13 @@ def run_multiple_stage_cases(env, extra_data):
dut.start_app()
for one_case in case_config[ut_config]:
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
reset_dut(dut)
exception_reset_list = []
for test_stage in range(one_case["child case num"]):
# select multi stage test case name
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
# select test function for current stage
dut.write(str(test_stage + 1))
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
stage_finish = list()
def last_stage():
return test_stage == one_case["child case num"] - 1
def check_reset():
if one_case["reset"]:
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
.format(one_case["reset"], exception_reset_list),
color="orange")
else:
# we allow omit reset in multi stage cases
result = True
return result
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
# handle test finish
result = result and check_reset()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
stage_finish.append("break")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
# only passed in last stage will be regarded as real pass
if last_stage():
one_case_finish(not int(data[0]))
else:
Utility.console_log("test finished before enter last stage", color="orange")
one_case_finish(False)
def handle_next_stage(data):
""" reboot finished. we goto next stage """
if last_stage():
# already last stage, should never goto next stage
Utility.console_log("didn't finish at last stage", color="orange")
one_case_finish(False)
else:
stage_finish.append("continue")
while not stage_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_next_stage),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
if stage_finish[0] == "break":
# test breaks on current stage
break
# raise exception if any case fails
if failed_cases:
@@ -561,6 +616,7 @@ def run_multiple_stage_cases(env, extra_data):
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
def detect_update_unit_test_info(env, extra_data, app_bin):
case_config = format_test_case_config(extra_data)
@@ -576,14 +632,14 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
dut.write("")
dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
def find_update_dic(name, t, timeout, child_case_num=None):
for dic in extra_data:
if dic['name'] == name:
dic['type'] = t
if 'timeout' not in dic:
dic['timeout'] = timeout
if child_case_num:
dic['child case num'] = child_case_num
def find_update_dic(name, _t, _timeout, child_case_num=None):
for _case_data in extra_data:
if _case_data['name'] == name:
_case_data['type'] = _t
if 'timeout' not in _case_data:
_case_data['timeout'] = _timeout
if child_case_num:
_case_data['child case num'] = child_case_num
try:
while True:
@@ -613,9 +669,9 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
if data[1] and re.search(END_LIST_STR, data[1]):
break
# check if the unit test case names are correct, i.e. they could be found in the device
for dic in extra_data:
if 'type' not in dic:
raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(dic.get('name')))
for _dic in extra_data:
if 'type' not in _dic:
raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
except ExpectTimeout:
Utility.console_log("Timeout during getting the test list", color="red")
finally:
@@ -624,6 +680,7 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
# These options are the same for all configs, therefore there is no need to continue
break
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -633,13 +690,13 @@ if __name__ == '__main__':
default=1
)
parser.add_argument("--env_config_file", "-e",
help="test env config file",
default=None
)
parser.add_argument("--app_bin", "-b",
help="application binary file for flashing the chip",
default=None
)
parser.add_argument(
'test',
help='Comma separated list of <option>:<argument> where option can be "name" (default), "child case num", \
@@ -673,12 +730,12 @@ if __name__ == '__main__':
env_config['app'] = UT
env_config['dut'] = IDF.IDFDUT
env_config['test_suite_name'] = 'unit_test_parsing'
env = Env.Env(**env_config)
detect_update_unit_test_info(env, extra_data=list_of_dicts, app_bin=args.app_bin)
for i in range(1, args.repeat+1):
test_env = Env.Env(**env_config)
detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
for index in range(1, args.repeat+1):
if args.repeat > 1:
Utility.console_log("Repetition {}".format(i), color="green")
Utility.console_log("Repetition {}".format(index), color="green")
for dic in list_of_dicts:
t = dic.get('type', SIMPLE_TEST_ID)
if t == SIMPLE_TEST_ID: