from __future__ import print_function
import os
import sys

try:
    import IDF
    from IDF.IDFDUT import ESP32DUT
except ImportError:
    # This is a test case written with tiny-test-fw.
    # To run test cases outside tiny-test-fw, we need to set the environment
    # variable `TEST_FW_PATH`, then insert `TEST_FW_PATH` into sys.path before
    # importing the framework modules.
    test_fw_path = os.getenv('TEST_FW_PATH')
    if test_fw_path and test_fw_path not in sys.path:
        sys.path.insert(0, test_fw_path)
    import IDF
    # ESP32DUT is used by the test below, so it must be imported on the
    # fallback path as well; otherwise the test fails with a NameError.
    from IDF.IDFDUT import ESP32DUT

# Timer events
TIMER_EVENT_LIMIT = 3

TIMER_EXPIRY_HANDLING = "TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_expiry_handler, executed {} out of " + str(TIMER_EVENT_LIMIT) + " times"

# Task events
TASK_ITERATION_LIMIT = 5
TASK_UNREGISTRATION_LIMIT = 3

TASK_ITERATION_POST = "TASK_EVENTS:TASK_ITERATION_EVENT: posting to default loop, {} out of " + str(TASK_ITERATION_LIMIT)
TASK_ITERATION_HANDLING = "TASK_EVENTS:TASK_ITERATION_EVENT: task_iteration_handler, executed {} times"


def _test_timer_events(dut):
    """Start the app on ``dut`` and check the timer event log sequence.

    Expects, in order: the setup messages, the timer-started event being
    posted and handled, ``TIMER_EVENT_LIMIT`` expiry events being posted and
    handled, and finally the timer-stopped event being handled.

    :param dut: device-under-test object providing ``start_app()`` and
        ``expect()`` (obtained from ``env.get_dut`` by the caller).
    """
    dut.start_app()

    print("Checking timer events posting and handling")

    dut.expect("setting up")
    dut.expect("starting event sources")

    print("Finished setup")

    dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: posting to default loop")
    print("Posted timer started event")
    dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: timer_started_handler")
    dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: timer_any_handler")
    dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: all_event_handler")
    print("Handled timer started event")

    for expiries in range(1, TIMER_EVENT_LIMIT + 1):
        dut.expect("TIMER_EVENTS:TIMER_EVENT_EXPIRY: posting to default loop")
        print("Posted timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT))

        # On the final expiry the stop event is expected to be posted before
        # the expiry handlers' output for that same expiry.
        if expiries >= TIMER_EVENT_LIMIT:
            dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: posting to default loop")
            print("Posted timer stopped event")

        dut.expect(TIMER_EXPIRY_HANDLING.format(expiries))
        dut.expect("TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler")
        dut.expect("TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler")

        print("Handled timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT))

    dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: timer_stopped_handler")
    dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: deleted timer event source")
    print("Handled timer stopped event")


def _test_iteration_events(dut):
    """Start the app on ``dut`` and check the task iteration event log sequence.

    For each of the ``TASK_ITERATION_LIMIT`` iterations the post message is
    expected first. Before ``TASK_UNREGISTRATION_LIMIT`` the dedicated
    iteration handler output is expected; at that limit the unregistration
    message is expected; afterwards only the catch-all handler output is
    expected. Finally the deletion of the task event source is expected.

    :param dut: device-under-test object providing ``start_app()`` and
        ``expect()`` (obtained from ``env.get_dut`` by the caller).
    """
    dut.start_app()

    print("Checking iteration events posting and handling")
    dut.expect("setting up")
    dut.expect("starting event sources")
    print("Finished setup")

    for iteration in range(1, TASK_ITERATION_LIMIT + 1):
        dut.expect(TASK_ITERATION_POST.format(iteration))
        print("Posted iteration {} out of {}".format(iteration, TASK_ITERATION_LIMIT))

        if iteration < TASK_UNREGISTRATION_LIMIT:
            dut.expect(TASK_ITERATION_HANDLING.format(iteration))
            dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: all_event_handler")
        elif iteration == TASK_UNREGISTRATION_LIMIT:
            dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: unregistering task_iteration_handler")
            dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: all_event_handler")
            print("Unregistered handler at iteration {} out of {}".format(iteration, TASK_ITERATION_LIMIT))
        else:
            dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: all_event_handler")

        print("Handled iteration {} out of {}".format(iteration, TASK_ITERATION_LIMIT))

    dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: deleting task event source")
    print("Deleted task event source")


@IDF.idf_example_test(env_tag='Example_WIFI')
def test_default_event_loop_example(env, extra_data):
    """Run the iteration-event and timer-event checks on the example app.

    :param env: test environment object; used to obtain the DUT via
        ``env.get_dut``.
    :param extra_data: not used by this test.
    """
    dut = env.get_dut('default_event_loop', 'examples/system/esp_event/default_event_loop', dut_class=ESP32DUT)

    # Each helper calls dut.start_app() itself before checking the output.
    _test_iteration_events(dut)
    _test_timer_events(dut)


if __name__ == '__main__':
    test_default_event_loop_example()