Merge branch 'test/print_debug_info_when_test_failed' into 'master'

tiny-test-fw: print debug info when test failed

See merge request idf/esp-idf!2564
This commit is contained in:
He Yin Ling 2018-07-13 21:22:29 +08:00
commit be81d2c16d
5 changed files with 117 additions and 21 deletions

View file

@ -371,4 +371,4 @@ static void check_time_deepsleep(void)
TEST_ASSERT_MESSAGE(dt_ms > 0, "Time in deep sleep is negative");
}
TEST_CASE_MULTIPLE_STAGES("check a time after wakeup from deep sleep", "[deepsleep][reset=DEEPSLEEP_RESET]", trigger_deepsleep, check_time_deepsleep);
TEST_CASE_MULTIPLE_STAGES("check a time after wakeup from deep sleep", "[deepsleep][reset=DEEPSLEEP_RESET][timeout=60]", trigger_deepsleep, check_time_deepsleep);

View file

@ -117,7 +117,7 @@ class _DataCache(_queue.Queue):
break
return ret
def get_data(self, timeout=0):
def get_data(self, timeout=0.0):
"""
get a copy of data from cache.
@ -154,6 +154,52 @@ class _DataCache(_queue.Queue):
self.data_cache = self.data_cache[index:]
class _LogThread(threading.Thread, _queue.Queue):
"""
We found some SD card on Raspberry Pi could have very bad performance.
It could take seconds to save small amount of data.
If the DUT receives data and save it as log, then it stops receiving data until log is saved.
This could lead to expect timeout.
As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs.
Then data will be passed to ``expect`` as soon as received.
"""
def __init__(self):
threading.Thread.__init__(self, name="LogThread")
_queue.Queue.__init__(self, maxsize=0)
self.setDaemon(True)
self.flush_lock = threading.Lock()
def save_log(self, filename, data):
"""
:param filename: log file name
:param data: log data. Must be ``bytes``.
"""
self.put({"filename": filename, "data": data})
def flush_data(self):
with self.flush_lock:
data_cache = dict()
while True:
# move all data from queue to data cache
try:
log = self.get_nowait()
try:
data_cache[log["filename"]] += log["data"]
except KeyError:
data_cache[log["filename"]] = log["data"]
except _queue.Empty:
break
# flush data
for filename in data_cache:
with open(filename, "ab+") as f:
f.write(data_cache[filename])
def run(self):
while True:
time.sleep(1)
self.flush_data()
class _RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
@ -214,6 +260,10 @@ class BaseDUT(object):
"""
DEFAULT_EXPECT_TIMEOUT = 5
MAX_EXPECT_FAILURES_TO_SAVED = 10
LOG_THREAD = _LogThread()
LOG_THREAD.start()
def __init__(self, name, port, log_file, app, **kwargs):
@ -224,12 +274,33 @@ class BaseDUT(object):
self.app = app
self.data_cache = _DataCache()
self.receive_thread = None
self.expect_failures = []
# open and start during init
self.open()
def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port))
def _save_expect_failure(self, pattern, data, start_time):
"""
Save expect failure. If the test fails, then it will print the expect failures.
In some cases, user will handle expect exceptions.
The expect failures could be false alarm, and test case might generate a lot of such failures.
Therefore, we don't print the failure immediately and limit the max size of failure list.
"""
self.expect_failures.insert(0, {"pattern": pattern, "data": data,
"start": start_time, "end": time.time()})
self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
def _save_dut_log(self, data):
"""
Save DUT log into file using another thread.
This is a workaround for some devices takes long time for file system operations.
See descriptions in ``_LogThread`` for details.
"""
self.LOG_THREAD.save_log(self.log_file, data)
# define for methods need to be overwritten by Port
@classmethod
def list_available_ports(cls):
@ -329,6 +400,7 @@ class BaseDUT(object):
if self.receive_thread:
self.receive_thread.exit()
self._port_close()
self.LOG_THREAD.flush_data()
def write(self, data, eol="\r\n", flush=True):
"""
@ -437,14 +509,19 @@ class BaseDUT(object):
start_time = time.time()
while True:
ret, index = method(data, pattern)
if ret is not None or time.time() - start_time > timeout:
if ret is not None:
self.data_cache.flush(index)
break
time_remaining = start_time + timeout - time.time()
if time_remaining < 0:
break
# wait for new data from cache
data = self.data_cache.get_data(time.time() + timeout - start_time)
data = self.data_cache.get_data(time_remaining)
if ret is None:
raise ExpectTimeout(self.name + ": " + _pattern_to_string(pattern))
pattern = _pattern_to_string(pattern)
self._save_expect_failure(pattern, data, start_time)
raise ExpectTimeout(self.name + ": " + pattern)
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
@ -492,10 +569,11 @@ class BaseDUT(object):
else:
match_succeed = True if matched_expect_items else False
if time.time() - start_time > timeout or match_succeed:
time_remaining = start_time + timeout - time.time()
if time_remaining < 0 or match_succeed:
break
else:
data = self.data_cache.get_data(time.time() + timeout - start_time)
data = self.data_cache.get_data(time_remaining)
if match_succeed:
# do callback and flush matched data cache
@ -508,7 +586,9 @@ class BaseDUT(object):
# flush already matched data
self.data_cache.flush(slice_index)
else:
raise ExpectTimeout(self.name + ": " + str([_pattern_to_string(x) for x in expect_items]))
pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
self._save_expect_failure(pattern, data, start_time)
raise ExpectTimeout(self.name + ": " + pattern)
@_expect_lock
def expect_any(self, *expect_items, **timeout):
@ -554,6 +634,22 @@ class BaseDUT(object):
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(True, expect_items, **timeout)
@staticmethod
def _format_ts(ts):
return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
def print_debug_info(self):
"""
Print debug info of current DUT. Currently we will print debug info for expect failures.
"""
Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
for failure in self.expect_failures:
Utility.console_log("\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
.format(failure["pattern"], failure["data"],
self._format_ts(failure["start"]), self._format_ts(failure["end"])),
color="orange")
class SerialDUT(BaseDUT):
""" serial with logging received data feature """
@ -574,17 +670,14 @@ class SerialDUT(BaseDUT):
self.serial_configs.update(kwargs)
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
@staticmethod
def _format_data(data):
def _format_data(self, data):
"""
format data for logging. do decode and add timestamp.
:param data: raw data from read
:return: formatted data (str)
"""
timestamp = time.time()
timestamp = "[{}:{}]".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
str(timestamp % 1)[2:5])
timestamp = "[{}]".format(self._format_ts(time.time()))
formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
return formatted_data
@ -597,8 +690,7 @@ class SerialDUT(BaseDUT):
def _port_read(self, size=1):
data = self.port_inst.read(size)
if data:
with open(self.log_file, "ab+") as _log_file:
_log_file.write(self._format_data(data))
self._save_dut_log(self._format_data(data))
return data
def _port_write(self, data):

View file

@ -162,14 +162,17 @@ class Env(object):
return if_addr[self.PROTO_MAP[proto]][0]
@_synced
def close(self):
def close(self, dut_debug=False):
"""
close()
close all DUTs of the Env.
:param dut_debug: if dut_debug is True, then print all dut expect failures before close it
:return: None
"""
for dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
if dut_debug:
dut.print_debug_info()
dut.close()
self.allocated_duts = dict()

View file

@ -21,7 +21,7 @@ from IDF.IDFDUT import IDFDUT
def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32",
module="examples", execution_time=1,
module="examples", execution_time=1, level="example",
**kwargs):
"""
decorator for testing idf examples (with default values for some keyword args).
@ -31,12 +31,13 @@ def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32",
:param chip: chip supported, string or tuple
:param module: module, string
:param execution_time: execution time in minutes, int
:param level: test level, could be used to filter test cases, string
:param kwargs: other keyword args
:return: test method
"""
# not use partial function as define as function support auto generating document
return TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
execution_time=execution_time, **kwargs)
execution_time=execution_time, level=level, **kwargs)
def log_performance(item, value):

View file

@ -154,6 +154,7 @@ def test_method(**kwargs):
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
XUNIT_FILE_NAME)
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
result = False
try:
Utility.console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
@ -163,12 +164,11 @@ def test_method(**kwargs):
except Exception as e:
# handle all the exceptions here
traceback.print_exc()
result = False
# log failure
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
finally:
# do close all DUTs
env_inst.close()
# do close all DUTs, if result is False then print DUT debug info
env_inst.close(dut_debug=(not result))
# end case and output result
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
with open(xunit_file, "ab+") as f: