Merge branch 'feature/pystyle_examples' into 'master'

examples: Fix Python coding style

See merge request idf/esp-idf!3881
This commit is contained in:
Angus Gratton 2018-12-07 09:12:56 +08:00
commit 560e92b101
36 changed files with 645 additions and 561 deletions

38
.flake8
View file

@ -149,6 +149,8 @@ exclude =
components/expat/expat,
components/unity/unity,
examples/build_system/cmake/import_lib/main/lib/tinyxml2
# autogenerated scripts
examples/provisioning/custom_config/components/custom_provisioning/python/custom_config_pb2.py,
# temporary list (should be empty)
components/app_update/dump_otadata.py,
components/app_update/gen_empty_partition.py,
@ -164,45 +166,9 @@ exclude =
components/ulp/esp32ulp_mapgen.py,
components/wifi_provisioning/python/wifi_config_pb2.py,
components/wifi_provisioning/python/wifi_constants_pb2.py,
examples/peripherals/can/can_alert_and_recovery/example_test.py,
examples/peripherals/can/can_network/example_test.py,
examples/peripherals/can/can_self_test/example_test.py,
examples/peripherals/i2s_adc_dac/tools/generate_audio_file.py,
examples/peripherals/sdio/sdio_test.py,
examples/protocols/asio/chat_client/asio_chat_client_test.py,
examples/protocols/asio/chat_server/asio_chat_server_test.py,
examples/protocols/asio/tcp_echo_server/asio_tcp_server_test.py,
examples/protocols/asio/udp_echo_server/asio_udp_server_test.py,
examples/protocols/esp_http_client/esp_http_client_test.py,
examples/protocols/http_server/advanced_tests/http_server_advanced_test.py,
examples/protocols/http_server/advanced_tests/scripts/test.py,
examples/protocols/http_server/persistent_sockets/http_server_persistence_test.py,
examples/protocols/http_server/persistent_sockets/scripts/adder.py,
examples/protocols/http_server/simple/http_server_simple_test.py,
examples/protocols/http_server/simple/scripts/client.py,
examples/protocols/https_request/example_test.py,
examples/protocols/mdns/mdns_example_test.py,
examples/protocols/mqtt/ssl/mqtt_ssl_example_test.py,
examples/protocols/mqtt/tcp/mqtt_tcp_example_test.py,
examples/protocols/mqtt/ws/mqtt_ws_example_test.py,
examples/protocols/mqtt/wss/mqtt_wss_example_test.py,
examples/protocols/sockets/scripts/tcpclient.py,
examples/protocols/sockets/scripts/tcpserver.py,
examples/protocols/sockets/scripts/udpclient.py,
examples/protocols/sockets/scripts/udpserver.py,
examples/provisioning/ble_prov/ble_prov_test.py,
examples/provisioning/custom_config/components/custom_provisioning/python/custom_config_pb2.py,
examples/provisioning/softap_prov/softap_prov_test.py,
examples/provisioning/softap_prov/utils/wifi_tools.py,
examples/storage/parttool/example_test.py,
examples/system/cpp_exceptions/example_test.py,
examples/system/esp_event/default_event_loop/example_test.py,
examples/system/esp_event/user_event_loops/example_test.py,
examples/system/esp_timer/example_test.py,
examples/system/light_sleep/example_test.py,
examples/system/ota/otatool/example_test.py,
examples/wifi/iperf/iperf_test.py,
examples/wifi/iperf/test_report.py,
tools/ci/apply_bot_filter.py,
tools/cmake/convert_to_cmake.py,
tools/esp_app_trace/apptrace_proc.py,

View file

@ -1,29 +1,33 @@
#Need Python 3 string formatting functions
# Need Python 3 string formatting functions
from __future__ import print_function
import re
import os
import sys
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# CAN Self Test Example constants
STR_EXPECT = ("CAN Alert and Recovery: Driver installed", "CAN Alert and Recovery: Driver uninstalled")
EXPECT_TIMEOUT = 20
@IDF.idf_example_test(env_tag='Example_CAN')
def test_can_alert_and_recovery_example(env, extra_data):
#Get device under test, flash and start example. "dut4" must be defined in EnvConfig
# Get device under test, flash and start example. "dut4" must be defined in EnvConfig
dut = env.get_dut('dut4', 'examples/peripherals/can/can_alert_and_recovery')
dut.start_app()
for string in STR_EXPECT:
dut.expect(string, timeout = EXPECT_TIMEOUT)
dut.expect(string, timeout=EXPECT_TIMEOUT)
if __name__ == '__main__':
test_can_alert_and_recovery_example()

View file

@ -1,77 +1,80 @@
#Need Python 3 string formatting functions
# Need Python 3 string formatting functions
from __future__ import print_function
import re
import os
import sys
import time
from threading import Thread
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
#Define tuple of strings to expect for each DUT.
# Define tuple of strings to expect for each DUT.
master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled")
slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled")
listen_only_expect = ("CAN Listen Only: Driver installed", "Listen Only: Driver uninstalled")
def dut_thread_callback(**kwargs):
#Parse keyword arguments
dut = kwargs['dut'] #Get DUT from kwargs
expected = kwargs['expected']
result = kwargs['result'] #Get result[out] from kwargs. MUST be of mutable type e.g. list
#Must reset again as flashing during start_app will reset multiple times, causing unexpected results
def dut_thread_callback(**kwargs):
# Parse keyword arguments
dut = kwargs['dut'] # Get DUT from kwargs
expected = kwargs['expected']
result = kwargs['result'] # Get result[out] from kwargs. MUST be of mutable type e.g. list
# Must reset again as flashing during start_app will reset multiple times, causing unexpected results
dut.reset()
for string in expected:
dut.expect(string, 20)
#Mark thread has run to completion without any exceptions
# Mark thread has run to completion without any exceptions
result[0] = True
@IDF.idf_example_test(env_tag='Example_CAN')
def test_can_network_example(env, extra_data):
#Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
# Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master")
dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave")
dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only")
#Flash app onto each DUT, each DUT is reset again at the start of each thread
# Flash app onto each DUT, each DUT is reset again at the start of each thread
dut_master.start_app()
dut_slave.start_app()
dut_listen_only.start_app()
#Create dict of keyword arguments for each dut
# Create dict of keyword arguments for each dut
results = [[False], [False], [False]]
master_kwargs = {"dut" : dut_master, "result" : results[0], "expected" : master_expect}
slave_kwargs = {"dut" : dut_slave, "result" : results[1], "expected" : slave_expect}
listen_only_kwargs = {"dut" : dut_listen_only, "result" : results[2], "expected" : listen_only_expect}
master_kwargs = {"dut": dut_master, "result": results[0], "expected": master_expect}
slave_kwargs = {"dut": dut_slave, "result": results[1], "expected": slave_expect}
listen_only_kwargs = {"dut": dut_listen_only, "result": results[2], "expected": listen_only_expect}
#Create thread for each dut
dut_master_thread = Thread(target = dut_thread_callback, name = "Master Thread", kwargs = master_kwargs)
dut_slave_thread = Thread(target = dut_thread_callback, name = "Slave Thread", kwargs = slave_kwargs)
dut_listen_only_thread = Thread(target = dut_thread_callback, name = "Listen Only Thread", kwargs = listen_only_kwargs)
# Create thread for each dut
dut_master_thread = Thread(target=dut_thread_callback, name="Master Thread", kwargs=master_kwargs)
dut_slave_thread = Thread(target=dut_thread_callback, name="Slave Thread", kwargs=slave_kwargs)
dut_listen_only_thread = Thread(target=dut_thread_callback, name="Listen Only Thread", kwargs=listen_only_kwargs)
#Start each thread
# Start each thread
dut_listen_only_thread.start()
dut_master_thread.start()
dut_slave_thread.start()
#Wait for threads to complete
# Wait for threads to complete
dut_listen_only_thread.join()
dut_master_thread.join()
dut_slave_thread.join()
#check each thread ran to completion
# check each thread ran to completion
for result in results:
if result[0] != True:
if result[0] is not True:
raise Exception("One or more threads did not run successfully")
if __name__ == '__main__':
test_can_network_example()

View file

@ -1,29 +1,33 @@
#Need Python 3 string formatting functions
# Need Python 3 string formatting functions
from __future__ import print_function
import re
import os
import sys
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# CAN Self Test Example constants
STR_EXPECT = ("CAN Self Test: Driver installed", "CAN Self Test: Driver uninstalled")
EXPECT_TIMEOUT = 20
@IDF.idf_example_test(env_tag='Example_CAN')
def test_can_self_test_example(env, extra_data):
#Get device under test, flash and start example. "dut4" must be defined in EnvConfig
# Get device under test, flash and start example. "dut4" must be defined in EnvConfig
dut = env.get_dut('dut4', 'examples/peripherals/can/can_self_test')
dut.start_app()
for string in STR_EXPECT:
dut.expect(string, timeout = EXPECT_TIMEOUT)
dut.expect(string, timeout=EXPECT_TIMEOUT)
if __name__ == '__main__':
test_can_self_test_example()

View file

@ -4,6 +4,7 @@ import os
import wave
import struct
def get_wave_array_str(filename, target_bits):
wave_read = wave.open(filename, "r")
array_str = ""
@ -13,28 +14,30 @@ def get_wave_array_str(filename, target_bits):
val, = struct.unpack("<H", wave_read.readframes(1))
scale_val = (1 << target_bits) - 1
cur_lim = (1 << sampwidth) - 1
#scale current data to 8-bit data
# scale current data to 8-bit data
val = val * scale_val / cur_lim
val = int(val + ((scale_val + 1) // 2)) & scale_val
array_str += "0x%x, "%(val)
array_str += "0x%x, " % (val)
if (i + 1) % 16 == 0:
array_str += "\n"
return array_str
def gen_wave_table(wav_file_list, target_file_name, scale_bits = 8):
def gen_wave_table(wav_file_list, target_file_name, scale_bits=8):
with open(target_file_name, "w") as audio_table:
print('#include <stdio.h>', file=audio_table)
print('const unsigned char audio_table[] = {', file=audio_table)
for wav in wav_file_list:
print("processing: {}".format(wav))
print(get_wave_array_str(filename = wav, target_bits = scale_bits), file=audio_table)
print(get_wave_array_str(filename=wav, target_bits=scale_bits), file=audio_table)
print('};\n', file=audio_table)
print("Done...")
if __name__=='__main__':
if __name__ == '__main__':
print("Generating audio array...")
wav_list = []
for filename in os.listdir("./"):
if filename.endswith(".wav"):
wav_list.append(filename)
gen_wave_table(wav_file_list = wav_list, target_file_name = "audio_example_file.h")
gen_wave_table(wav_file_list=wav_list, target_file_name="audio_example_file.h")

View file

@ -13,17 +13,19 @@
# limitations under the License.
""" example of writing test with TinyTestFW """
import re
import os
import sys
# if we want to run test case outside `tiny-test-fw` folder,
# we need to insert tiny-test-fw path into sys path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import TinyFW
except ImportError:
# if we want to run test case outside `tiny-test-fw` folder,
# we need to insert tiny-test-fw path into sys path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import TinyFW
import IDF
@ -50,7 +52,7 @@ def test_example_sdio_communication(env, extra_data):
dut1 = env.get_dut("sdio_host", "examples/peripherals/sdio/host")
dut2 = env.get_dut("sdio_slave", "examples/peripherals/sdio/slave")
dut1.start_app()
#wait until the master is ready to setup the slave
# wait until the master is ready to setup the slave
dut1.expect("host ready, start initializing slave...")
dut2.start_app()
@ -96,12 +98,12 @@ def test_example_sdio_communication(env, extra_data):
dut2.expect("recv len: 6")
dut2.expect("recv len: 12")
dut2.expect("recv len: 128")
#511
# 511
dut2.expect("recv len: 128")
dut2.expect("recv len: 128")
dut2.expect("recv len: 128")
dut2.expect("recv len: 127")
#512
# 512
dut2.expect("recv len: 128")
dut2.expect("recv len: 128")
dut2.expect("recv len: 128")
@ -122,9 +124,9 @@ def test_example_sdio_communication(env, extra_data):
dut1.expect("receive data, size: 128")
dut1.expect("receive data, size: 128")
#the last valid line of one round
# the last valid line of one round
dut1.expect("ce d3 d8 dd e2 e7 ec f1 f6 fb 00 05 0a 0f 14 19")
#the first 2 lines of the second round
# the first 2 lines of the second round
dut1.expect("46 4b 50")
dut1.expect("5a 5f 64 69 6e 73")

View file

@ -1,49 +1,52 @@
import re
import os
import sys
from socket import *
import socket
from threading import Thread
import time
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW
import IDF
global g_client_response;
global g_msg_to_client;
global g_client_response
global g_msg_to_client
g_client_response = b""
g_msg_to_client = b" 3XYZ"
def get_my_ip():
s1 = socket(AF_INET, SOCK_DGRAM)
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(("8.8.8.8", 80))
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
def chat_server_sketch(my_ip):
global g_client_response
print("Starting the server on {}".format(my_ip))
port=2222
s=socket(AF_INET, SOCK_STREAM)
port = 2222
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(600)
s.bind((my_ip, port))
s.listen(1)
q,addr=s.accept()
q,addr = s.accept()
print("connection accepted")
q.settimeout(30)
q.send(g_msg_to_client)
data = q.recv(1024)
# check if received initial empty message
if (len(data)>4):
if (len(data) > 4):
g_client_response = data
else:
g_client_response = q.recv(1024)
@ -51,6 +54,7 @@ def chat_server_sketch(my_ip):
s.close()
print("server closed")
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_asio_chat_client(env, extra_data):
"""
@ -64,24 +68,24 @@ def test_examples_protocol_asio_chat_client(env, extra_data):
"""
global g_client_response
global g_msg_to_client
test_msg="ABC"
test_msg = "ABC"
dut1 = env.get_dut("chat_client", "examples/protocols/asio/chat_client")
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_chat_client.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_chat_client_size", "{}KB".format(bin_size//1024))
IDF.check_performance("asio_chat_client_size", bin_size//1024)
IDF.log_performance("asio_chat_client_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_chat_client_size", bin_size // 1024)
# 1. start a tcp server on the host
host_ip = get_my_ip()
thread1 = Thread(target = chat_server_sketch, args = (host_ip,))
thread1 = Thread(target=chat_server_sketch, args=(host_ip,))
thread1.start()
# 2. start the dut test and wait till client gets IP address
dut1.start_app()
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. send host's IP to the client i.e. the `dut1`
dut1.write(host_ip)
# 4. client `dut1` should receive a message
dut1.expect(g_msg_to_client[4:].decode()) # Strip out the front 4 bytes of message len (see chat_message protocol)
dut1.expect(g_msg_to_client[4:].decode()) # Strip out the front 4 bytes of message len (see chat_message protocol)
# 5. write test message from `dut1` chat_client to the server
dut1.write(test_msg)
while len(g_client_response) == 0:
@ -97,5 +101,6 @@ def test_examples_protocol_asio_chat_client(env, extra_data):
raise ValueError('Wrong data received from asi tcp server: {} (expected:{})'.format(g_client_response[4:7], test_msg))
thread1.join()
if __name__ == '__main__':
test_examples_protocol_asio_chat_client()

View file

@ -1,21 +1,20 @@
import re
import os
import sys
from socket import *
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import socket
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI")
@ -27,19 +26,19 @@ def test_examples_protocol_asio_chat_server(env, extra_data):
3. Test connects to server and sends a test message
4. Test evaluates received test message from server
"""
test_msg=b" 4ABC\n"
test_msg = b" 4ABC\n"
dut1 = env.get_dut("chat_server", "examples/protocols/asio/chat_server")
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_chat_server.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_chat_server_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("asio_chat_server_size", bin_size//1024)
IDF.log_performance("asio_chat_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_chat_server_size", bin_size // 1024)
# 1. start test
dut1.start_app()
# 2. get the server IP address
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. create tcp client and connect to server
cli = socket(AF_INET,SOCK_STREAM)
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli.settimeout(30)
cli.connect((data[0],80))
cli.send(test_msg)

View file

@ -1,21 +1,21 @@
import re
import os
import sys
from socket import *
import socket
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI")
@ -28,19 +28,19 @@ def test_examples_protocol_asio_tcp_server(env, extra_data):
4. Test evaluates received test message from server
5. Test evaluates received test message on server stdout
"""
test_msg=b"echo message from client to server"
test_msg = b"echo message from client to server"
dut1 = env.get_dut("tcp_echo_server", "examples/protocols/asio/tcp_echo_server")
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_tcp_echo_server.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_tcp_echo_server_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("asio_tcp_echo_server_size", bin_size//1024)
IDF.log_performance("asio_tcp_echo_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_tcp_echo_server_size", bin_size // 1024)
# 1. start test
dut1.start_app()
# 2. get the server IP address
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. create tcp client and connect to server
cli = socket(AF_INET,SOCK_STREAM)
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli.settimeout(30)
cli.connect((data[0],80))
cli.send(test_msg)

View file

@ -1,21 +1,21 @@
import re
import os
import sys
from socket import *
import socket
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI")
@ -28,19 +28,19 @@ def test_examples_protocol_asio_udp_server(env, extra_data):
4. Test evaluates received test message from server
5. Test evaluates received test message on server stdout
"""
test_msg=b"echo message from client to server"
test_msg = b"echo message from client to server"
dut1 = env.get_dut("udp_echo_server", "examples/protocols/asio/udp_echo_server")
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_udp_echo_server.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_udp_echo_server_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("asio_udp_echo_server_size", bin_size//1024)
IDF.log_performance("asio_udp_echo_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_udp_echo_server_size", bin_size // 1024)
# 1. start test
dut1.start_app()
# 2. get the server IP address
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. create tcp client and connect to server
cli = socket(AF_INET, SOCK_DGRAM)
cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cli.settimeout(30)
cli.connect((data[0], 80))
cli.send(test_msg)
@ -55,5 +55,6 @@ def test_examples_protocol_asio_udp_server(env, extra_data):
# 5. check the client message appears also on server terminal
dut1.expect(test_msg.decode())
if __name__ == '__main__':
test_examples_protocol_asio_udp_server()

View file

@ -2,16 +2,18 @@ import re
import os
import sys
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI", ignore=True)
@ -25,8 +27,8 @@ def test_examples_protocol_esp_http_client(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "esp-http-client-example.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("esp_http_client_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("esp_http_client_bin_size", bin_size//1024)
IDF.log_performance("esp_http_client_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("esp_http_client_bin_size", bin_size // 1024)
# start test
dut1.start_app()
dut1.expect("Connected to AP, begin http example", timeout=30)

View file

@ -21,28 +21,29 @@ import imp
import re
import os
import sys
import string
import random
import socket
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import Utility
# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
import TinyFW
import IDF
import Utility
# Import client module
expath = os.path.dirname(os.path.realpath(__file__))
client = imp.load_source("client", expath + "/scripts/test.py")
# Due to connectivity issues (between runner host and DUT) in the runner environment,
# some of the `advanced_tests` are ignored. These tests are intended for verifying
# the expected limits of the http_server capabilities, and implement sending and receiving
@ -57,8 +58,8 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
# Get binary file
binary_file = os.path.join(dut1.app.binary_path, "tests.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("http_server_bin_size", bin_size//1024)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("http_server_bin_size", bin_size // 1024)
# Upload binary and start testing
Utility.console_log("Starting http_server advanced test app")
@ -69,8 +70,10 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
got_ip = dut1.expect(re.compile(r"(?:[\s\S]*)Got IP: '(\d+.\d+.\d+.\d+)'"), timeout=30)[0]
got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'"), timeout=15)[0]
result = dut1.expect(re.compile(r"(?:[\s\S]*)Max URI handlers: '(\d+)'(?:[\s\S]*)Max Open Sessions: '(\d+)'(?:[\s\S]*)Max Header Length: '(\d+)'(?:[\s\S]*)Max URI Length: '(\d+)'(?:[\s\S]*)Max Stack Size: '(\d+)'"), timeout=15)
max_uri_handlers = int(result[0])
result = dut1.expect(re.compile(r"(?:[\s\S]*)Max URI handlers: '(\d+)'(?:[\s\S]*)Max Open Sessions: " # noqa: W605
r"'(\d+)'(?:[\s\S]*)Max Header Length: '(\d+)'(?:[\s\S]*)Max URI Length: "
r"'(\d+)'(?:[\s\S]*)Max Stack Size: '(\d+)'"), timeout=15)
# max_uri_handlers = int(result[0])
max_sessions = int(result[1])
max_hdr_len = int(result[2])
max_uri_len = int(result[3])
@ -95,9 +98,9 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
if not client.recv_timeout_test(got_ip, got_port):
failed = True
## This test fails a lot! Enable when connection is stable
#test_size = 50*1024 # 50KB
#if not client.packet_size_limit_test(got_ip, got_port, test_size):
# This test fails a lot! Enable when connection is stable
# test_size = 50*1024 # 50KB
# if not client.packet_size_limit_test(got_ip, got_port, test_size):
# Utility.console_log("Ignoring failure")
Utility.console_log("Getting initial stack usage...")
@ -106,7 +109,7 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
inital_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0])
if inital_stack < 0.1*max_stack_size:
if inital_stack < 0.1 * max_stack_size:
Utility.console_log("More than 90% of stack being used on server start")
failed = True
@ -158,12 +161,13 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
final_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0])
if final_stack < 0.05*max_stack_size:
if final_stack < 0.05 * max_stack_size:
Utility.console_log("More than 95% of stack got used during tests")
failed = True
if failed:
raise RuntimeError
if __name__ == '__main__':
test_examples_protocol_http_server_advanced()

View file

@ -94,7 +94,7 @@
# - Server should automatically close the socket
############# TODO TESTS #############
# ############ TODO TESTS #############
# 3. Stress Tests
#
@ -131,8 +131,6 @@
from __future__ import division
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from builtins import object
@ -148,9 +146,10 @@ import Utility
_verbose_ = False
class Session(object):
def __init__(self, addr, port, timeout = 15):
self.client = socket.create_connection((addr, int(port)), timeout = timeout)
def __init__(self, addr, port, timeout=15):
self.client = socket.create_connection((addr, int(port)), timeout=timeout)
self.target = addr
self.status = 0
self.encoding = ''
@ -160,7 +159,7 @@ class Session(object):
def send_err_check(self, request, data=None):
rval = True
try:
self.client.sendall(request.encode());
self.client.sendall(request.encode())
if data:
self.client.sendall(data.encode())
except socket.error as err:
@ -173,24 +172,24 @@ class Session(object):
request = "GET " + path + " HTTP/1.1\r\nHost: " + self.target
if headers:
for field, value in headers.items():
request += "\r\n"+field+": "+value
request += "\r\n" + field + ": " + value
request += "\r\n\r\n"
return self.send_err_check(request)
def send_put(self, path, data, headers=None):
request = "PUT " + path + " HTTP/1.1\r\nHost: " + self.target
request = "PUT " + path + " HTTP/1.1\r\nHost: " + self.target
if headers:
for field, value in headers.items():
request += "\r\n"+field+": "+value
request += "\r\nContent-Length: " + str(len(data)) +"\r\n\r\n"
request += "\r\n" + field + ": " + value
request += "\r\nContent-Length: " + str(len(data)) + "\r\n\r\n"
return self.send_err_check(request, data)
def send_post(self, path, data, headers=None):
request = "POST " + path + " HTTP/1.1\r\nHost: " + self.target
request = "POST " + path + " HTTP/1.1\r\nHost: " + self.target
if headers:
for field, value in headers.items():
request += "\r\n"+field+": "+value
request += "\r\nContent-Length: " + str(len(data)) +"\r\n\r\n"
request += "\r\n" + field + ": " + value
request += "\r\nContent-Length: " + str(len(data)) + "\r\n\r\n"
return self.send_err_check(request, data)
def read_resp_hdrs(self):
@ -284,6 +283,7 @@ class Session(object):
def close(self):
self.client.close()
def test_val(text, expected, received):
if expected != received:
Utility.console_log(" Fail!")
@ -293,6 +293,7 @@ def test_val(text, expected, received):
return False
return True
class adder_thread (threading.Thread):
def __init__(self, id, dut, port):
threading.Thread.__init__(self)
@ -329,6 +330,7 @@ class adder_thread (threading.Thread):
def close(self):
self.session.close()
def get_hello(dut, port):
# GET /hello should return 'Hello World!'
Utility.console_log("[test] GET /hello returns 'Hello World!' =>", end=' ')
@ -348,6 +350,7 @@ def get_hello(dut, port):
conn.close()
return True
def put_hello(dut, port):
# PUT /hello returns 405'
Utility.console_log("[test] PUT /hello returns 405 =>", end=' ')
@ -361,6 +364,7 @@ def put_hello(dut, port):
conn.close()
return True
def post_hello(dut, port):
# POST /hello returns 405'
Utility.console_log("[test] POST /hello returns 404 =>", end=' ')
@ -374,6 +378,7 @@ def post_hello(dut, port):
conn.close()
return True
def post_echo(dut, port):
# POST /echo echoes data'
Utility.console_log("[test] POST /echo echoes data =>", end=' ')
@ -390,6 +395,7 @@ def post_echo(dut, port):
conn.close()
return True
def put_echo(dut, port):
# PUT /echo echoes data'
Utility.console_log("[test] PUT /echo echoes data =>", end=' ')
@ -406,6 +412,7 @@ def put_echo(dut, port):
conn.close()
return True
def get_echo(dut, port):
# GET /echo returns 404'
Utility.console_log("[test] GET /echo returns 405 =>", end=' ')
@ -419,6 +426,7 @@ def get_echo(dut, port):
conn.close()
return True
def get_hello_type(dut, port):
# GET /hello/type_html returns text/html as Content-Type'
Utility.console_log("[test] GET /hello/type_html has Content-Type of text/html =>", end=' ')
@ -438,6 +446,7 @@ def get_hello_type(dut, port):
conn.close()
return True
def get_hello_status(dut, port):
# GET /hello/status_500 returns status 500'
Utility.console_log("[test] GET /hello/status_500 returns status 500 =>", end=' ')
@ -451,6 +460,7 @@ def get_hello_status(dut, port):
conn.close()
return True
def get_false_uri(dut, port):
# GET /false_uri returns status 404'
Utility.console_log("[test] GET /false_uri returns status 404 =>", end=' ')
@ -464,6 +474,7 @@ def get_false_uri(dut, port):
conn.close()
return True
def parallel_sessions_adder(dut, port, max_sessions):
# POSTs on /adder in parallel sessions
Utility.console_log("[test] POST {pipelined} on /adder in " + str(max_sessions) + " sessions =>", end=' ')
@ -487,6 +498,7 @@ def parallel_sessions_adder(dut, port, max_sessions):
Utility.console_log("Success")
return res
def async_response_test(dut, port):
# Test that an asynchronous work is executed in the HTTPD's context
# This is tested by reading two responses over the same session
@ -506,6 +518,7 @@ def async_response_test(dut, port):
Utility.console_log("Success")
return True
def leftover_data_test(dut, port):
# Leftover data in POST is purged (valid and invalid URIs)
Utility.console_log("[test] Leftover data in POST is purged (valid and invalid URIs) =>", end=' ')
@ -540,6 +553,7 @@ def leftover_data_test(dut, port):
Utility.console_log("Success")
return True
def spillover_session(dut, port, max_sess):
# Session max_sess_sessions + 1 is rejected
Utility.console_log("[test] Session max_sess_sessions (" + str(max_sess) + ") + 1 is rejected =>", end=' ')
@ -556,7 +570,7 @@ def spillover_session(dut, port, max_sess):
a.close()
break
s.append(a)
except:
except Exception:
if (_verbose_):
Utility.console_log("Connection " + str(i) + " rejected")
a.close()
@ -570,6 +584,7 @@ def spillover_session(dut, port, max_sess):
Utility.console_log(["Fail","Success"][len(s) == max_sess])
return (len(s) == max_sess)
def recv_timeout_test(dut, port):
Utility.console_log("[test] Timeout occurs if partial packet sent =>", end=' ')
s = Session(dut, port)
@ -583,6 +598,7 @@ def recv_timeout_test(dut, port):
Utility.console_log("Success")
return True
def packet_size_limit_test(dut, port, test_size):
Utility.console_log("[test] send size limit test =>", end=' ')
retry = 5
@ -590,14 +606,14 @@ def packet_size_limit_test(dut, port, test_size):
retry -= 1
Utility.console_log("data size = ", test_size)
s = http.client.HTTPConnection(dut + ":" + port, timeout=15)
random_data = ''.join(string.printable[random.randint(0,len(string.printable))-1] for _ in list(range(test_size)))
random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in list(range(test_size)))
path = "/echo"
s.request("POST", url=path, body=random_data)
resp = s.getresponse()
if not test_val("Error", "200", str(resp.status)):
if test_val("Error", "500", str(resp.status)):
Utility.console_log("Data too large to be allocated")
test_size = test_size//10
test_size = test_size // 10
else:
Utility.console_log("Unexpected error")
s.close()
@ -616,6 +632,7 @@ def packet_size_limit_test(dut, port, test_size):
Utility.console_log("Failed")
return False
def code_500_server_error_test(dut, port):
Utility.console_log("[test] 500 Server Error test =>", end=' ')
s = Session(dut, port)
@ -623,7 +640,7 @@ def code_500_server_error_test(dut, port):
content_len = 2**31
s.client.sendall(("POST /echo HTTP/1.1\r\nHost: " + dut + "\r\nContent-Length: " + str(content_len) + "\r\n\r\nABCD").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Server Error", "500", s.status):
s.close()
return False
@ -631,17 +648,18 @@ def code_500_server_error_test(dut, port):
Utility.console_log("Success")
return True
def code_501_method_not_impl(dut, port):
Utility.console_log("[test] 501 Method Not Implemented =>", end=' ')
s = Session(dut, port)
path = "/hello"
s.client.sendall(("ABC " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
# Presently server sends back 400 Bad Request
#if not test_val("Server Error", "501", s.status):
#s.close()
#return False
# if not test_val("Server Error", "501", s.status):
# s.close()
# return False
if not test_val("Server Error", "400", s.status):
s.close()
return False
@ -649,13 +667,14 @@ def code_501_method_not_impl(dut, port):
Utility.console_log("Success")
return True
def code_505_version_not_supported(dut, port):
Utility.console_log("[test] 505 Version Not Supported =>", end=' ')
s = Session(dut, port)
path = "/hello"
s.client.sendall(("GET " + path + " HTTP/2.0\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Server Error", "505", s.status):
s.close()
return False
@ -663,13 +682,14 @@ def code_505_version_not_supported(dut, port):
Utility.console_log("Success")
return True
def code_400_bad_request(dut, port):
Utility.console_log("[test] 400 Bad Request =>", end=' ')
s = Session(dut, port)
path = "/hello"
s.client.sendall(("XYZ " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Client Error", "400", s.status):
s.close()
return False
@ -677,13 +697,14 @@ def code_400_bad_request(dut, port):
Utility.console_log("Success")
return True
def code_404_not_found(dut, port):
Utility.console_log("[test] 404 Not Found =>", end=' ')
s = Session(dut, port)
path = "/dummy"
s.client.sendall(("GET " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Client Error", "404", s.status):
s.close()
return False
@ -691,13 +712,14 @@ def code_404_not_found(dut, port):
Utility.console_log("Success")
return True
def code_405_method_not_allowed(dut, port):
Utility.console_log("[test] 405 Method Not Allowed =>", end=' ')
s = Session(dut, port)
path = "/hello"
s.client.sendall(("POST " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Client Error", "405", s.status):
s.close()
return False
@ -705,12 +727,13 @@ def code_405_method_not_allowed(dut, port):
Utility.console_log("Success")
return True
def code_408_req_timeout(dut, port):
Utility.console_log("[test] 408 Request Timeout =>", end=' ')
s = Session(dut, port)
s.client.sendall(("POST /echo HTTP/1.1\r\nHost: " + dut + "\r\nContent-Length: 10\r\n\r\nABCD").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Client Error", "408", s.status):
s.close()
return False
@ -718,17 +741,18 @@ def code_408_req_timeout(dut, port):
Utility.console_log("Success")
return True
def code_411_length_required(dut, port):
Utility.console_log("[test] 411 Length Required =>", end=' ')
s = Session(dut, port)
path = "/echo"
s.client.sendall(("POST " + path + " HTTP/1.1\r\nHost: " + dut + "\r\nContent-Type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
# Presently server sends back 400 Bad Request
#if not test_val("Client Error", "411", s.status):
#s.close()
#return False
# if not test_val("Client Error", "411", s.status):
# s.close()
# return False
if not test_val("Client Error", "400", s.status):
s.close()
return False
@ -736,21 +760,23 @@ def code_411_length_required(dut, port):
Utility.console_log("Success")
return True
def send_getx_uri_len(dut, port, length):
s = Session(dut, port)
method = "GET "
version = " HTTP/1.1\r\n"
path = "/"+"x"*(length - len(method) - len(version) - len("/"))
path = "/" + "x" * (length - len(method) - len(version) - len("/"))
s.client.sendall(method.encode())
time.sleep(1)
s.client.sendall(path.encode())
time.sleep(1)
s.client.sendall((version + "Host: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
s.close()
return s.status
def code_414_uri_too_long(dut, port, max_uri_len):
Utility.console_log("[test] 414 URI Too Long =>", end=' ')
status = send_getx_uri_len(dut, port, max_uri_len)
@ -762,16 +788,17 @@ def code_414_uri_too_long(dut, port, max_uri_len):
Utility.console_log("Success")
return True
def send_postx_hdr_len(dut, port, length):
s = Session(dut, port)
path = "/echo"
host = "Host: " + dut
custom_hdr_field = "\r\nCustom: "
custom_hdr_val = "x"*(length - len(host) - len(custom_hdr_field) - len("\r\n\r\n") + len("0"))
custom_hdr_val = "x" * (length - len(host) - len(custom_hdr_field) - len("\r\n\r\n") + len("0"))
request = ("POST " + path + " HTTP/1.1\r\n" + host + custom_hdr_field + custom_hdr_val + "\r\n\r\n").encode()
s.client.sendall(request[:length//2])
s.client.sendall(request[:length // 2])
time.sleep(1)
s.client.sendall(request[length//2:])
s.client.sendall(request[length // 2:])
hdr = s.read_resp_hdrs()
resp = s.read_resp_data()
s.close()
@ -779,6 +806,7 @@ def send_postx_hdr_len(dut, port, length):
return (hdr["Custom"] == custom_hdr_val), resp
return False, s.status
def code_431_hdr_too_long(dut, port, max_hdr_len):
Utility.console_log("[test] 431 Header Too Long =>", end=' ')
res, status = send_postx_hdr_len(dut, port, max_hdr_len)
@ -790,13 +818,14 @@ def code_431_hdr_too_long(dut, port, max_hdr_len):
Utility.console_log("Success")
return True
def test_upgrade_not_supported(dut, port):
Utility.console_log("[test] Upgrade Not Supported =>", end=' ')
s = Session(dut, port)
path = "/hello"
# path = "/hello"
s.client.sendall(("OPTIONS * HTTP/1.1\r\nHost:" + dut + "\r\nUpgrade: TLS/1.0\r\nConnection: Upgrade\r\n\r\n").encode())
s.read_resp_hdrs()
resp = s.read_resp_data()
s.read_resp_data()
if not test_val("Client Error", "200", s.status):
s.close()
return False
@ -804,8 +833,9 @@ def test_upgrade_not_supported(dut, port):
Utility.console_log("Success")
return True
if __name__ == '__main__':
########### Execution begins here...
# Execution begins here...
# Configuration
# Max number of threads/sessions
max_sessions = 7
@ -849,7 +879,7 @@ if __name__ == '__main__':
test_upgrade_not_supported(dut, port)
# Not supported yet (Error on chunked request)
###code_411_length_required(dut, port)
# code_411_length_required(dut, port)
Utility.console_log("### Sessions and Context Tests")
parallel_sessions_adder(dut, port, max_sessions)
@ -857,7 +887,7 @@ if __name__ == '__main__':
async_response_test(dut, port)
spillover_session(dut, port, max_sessions)
recv_timeout_test(dut, port)
packet_size_limit_test(dut, port, 50*1024)
packet_size_limit_test(dut, port, 50 * 1024)
get_hello(dut, port)
sys.exit()

View file

@ -23,28 +23,29 @@ import imp
import re
import os
import sys
import string
import random
import socket
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import Utility
# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
import TinyFW
import IDF
import Utility
# Import client module
expath = os.path.dirname(os.path.realpath(__file__))
client = imp.load_source("client", expath + "/scripts/adder.py")
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_http_server_persistence(env, extra_data):
# Acquire DUT
@ -53,8 +54,8 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
# Get binary file
binary_file = os.path.join(dut1.app.binary_path, "persistent_sockets.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("http_server_bin_size", bin_size//1024)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("http_server_bin_size", bin_size // 1024)
# Upload binary and start testing
Utility.console_log("Starting http_server persistance test app")
@ -97,7 +98,7 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
dut1.expect("PUT allocating new session", timeout=30)
# Not expected
raise RuntimeError
except:
except Exception:
# As expected
pass
@ -126,7 +127,7 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
Utility.console_log("Validating user context data")
# Start another session to check user context data
conn2 = client.start_session(got_ip, got_port)
client.start_session(got_ip, got_port)
num = random.randint(0,100)
client.putreq(conn, "/adder", str(num))
visitor += 1
@ -136,5 +137,6 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
client.end_session(conn)
dut1.expect("/adder Free Context function called", timeout=30)
if __name__ == '__main__':
test_examples_protocol_http_server_persistence()

View file

@ -16,20 +16,22 @@
from __future__ import print_function
from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
import http.client
import argparse
import Utility
def start_session (ip, port):
def start_session(ip, port):
return http.client.HTTPConnection(ip, int(port), timeout=15)
def end_session (conn):
def end_session(conn):
conn.close()
def getreq (conn, path, verbose = False):
def getreq(conn, path, verbose=False):
conn.request("GET", path)
resp = conn.getresponse()
data = resp.read()
@ -41,7 +43,8 @@ def getreq (conn, path, verbose = False):
Utility.console_log("Data content : " + data)
return data
def postreq (conn, path, data, verbose = False):
def postreq(conn, path, data, verbose=False):
conn.request("POST", path, data)
resp = conn.getresponse()
data = resp.read()
@ -53,7 +56,8 @@ def postreq (conn, path, data, verbose = False):
Utility.console_log("Data content : " + data)
return data
def putreq (conn, path, body, verbose = False):
def putreq(conn, path, body, verbose=False):
conn.request("PUT", path, body)
resp = conn.getresponse()
data = resp.read()
@ -65,12 +69,13 @@ def putreq (conn, path, body, verbose = False):
Utility.console_log("Data content : " + data)
return data
if __name__ == '__main__':
# Configure argument parser
parser = argparse.ArgumentParser(description='Run HTTPd Test')
parser.add_argument('IP' , metavar='IP' , type=str, help='Server IP')
parser.add_argument('IP', metavar='IP', type=str, help='Server IP')
parser.add_argument('port', metavar='port', type=str, help='Server port')
parser.add_argument('N' , metavar='integer', type=int, help='Integer to sum upto')
parser.add_argument('N', metavar='integer', type=int, help='Integer to sum upto')
args = vars(parser.parse_args())
# Get arguments
@ -80,21 +85,21 @@ if __name__ == '__main__':
# Establish HTTP connection
Utility.console_log("Connecting to => " + ip + ":" + port)
conn = start_session (ip, port)
conn = start_session(ip, port)
# Reset adder context to specified value(0)
# -- Not needed as new connection will always
# -- have zero value of the accumulator
Utility.console_log("Reset the accumulator to 0")
putreq (conn, "/adder", str(0))
putreq(conn, "/adder", str(0))
# Sum numbers from 1 to specified value(N)
Utility.console_log("Summing numbers from 1 to " + str(N))
for i in range(1, N+1):
postreq (conn, "/adder", str(i))
for i in range(1, N + 1):
postreq(conn, "/adder", str(i))
# Fetch the result
Utility.console_log("Result :" + getreq (conn, "/adder"))
Utility.console_log("Result :" + getreq(conn, "/adder"))
# Close HTTP connection
end_session (conn)
end_session(conn)

View file

@ -24,26 +24,29 @@ import os
import sys
import string
import random
import socket
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import Utility
# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
import TinyFW
import IDF
import Utility
# Import client module
expath = os.path.dirname(os.path.realpath(__file__))
client = imp.load_source("client", expath + "/scripts/client.py")
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_http_server_simple(env, extra_data):
# Acquire DUT
@ -52,8 +55,8 @@ def test_examples_protocol_http_server_simple(env, extra_data):
# Get binary file
binary_file = os.path.join(dut1.app.binary_path, "simple.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("http_server_bin_size", bin_size//1024)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("http_server_bin_size", bin_size // 1024)
# Upload binary and start testing
Utility.console_log("Starting http_server simple test app")
@ -77,7 +80,7 @@ def test_examples_protocol_http_server_simple(env, extra_data):
raise RuntimeError
# Acquire host IP. Need a way to check it
host_ip = dut1.expect(re.compile(r"(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)"), timeout=30)[0]
dut1.expect(re.compile(r"(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)"), timeout=30)[0]
# Match additional headers sent in the request
dut1.expect("Found header => Test-Header-2: Test-Value-2", timeout=30)
@ -94,7 +97,7 @@ def test_examples_protocol_http_server_simple(env, extra_data):
dut1.expect("Registering /hello and /echo URIs", timeout=30)
# Generate random data of 10KB
random_data = ''.join(string.printable[random.randint(0,len(string.printable))-1] for _ in range(10*1024))
random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in range(10 * 1024))
Utility.console_log("Test /echo POST handler with random data")
if not client.test_post_handler(got_ip, got_port, random_data):
raise RuntimeError
@ -117,5 +120,6 @@ def test_examples_protocol_http_server_simple(env, extra_data):
raise RuntimeError
dut1.expect("400 Bad Request - Server unable to understand request due to invalid syntax", timeout=30)
if __name__ == '__main__':
test_examples_protocol_http_server_simple()

View file

@ -16,21 +16,22 @@
from __future__ import print_function
from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import str
import http.client
import argparse
import Utility
def verbose_print(verbosity, *args):
if (verbosity):
Utility.console_log(''.join(str(elems) for elems in args))
def test_get_handler(ip, port, verbosity = False):
def test_get_handler(ip, port, verbosity=False):
verbose_print(verbosity, "======== GET HANDLER TEST =============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
uri = "/hello?query1=value1&query2=value2&query3=value3"
# GET hello response
@ -48,7 +49,7 @@ def test_get_handler(ip, port, verbosity = False):
return False
if resp.getheader("Custom-Header-2") != "Custom-Value-2":
return False
except:
except Exception:
return False
verbose_print(verbosity, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
@ -63,11 +64,12 @@ def test_get_handler(ip, port, verbosity = False):
sess.close()
return (resp_data == "Hello World!")
def test_post_handler(ip, port, msg, verbosity = False):
def test_post_handler(ip, port, msg, verbosity=False):
verbose_print(verbosity, "======== POST HANDLER TEST ============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# POST message to /echo and get back response
sess.request("POST", url="/echo", body=msg)
@ -82,11 +84,12 @@ def test_post_handler(ip, port, msg, verbosity = False):
sess.close()
return (resp_data == msg)
def test_put_handler(ip, port, verbosity = False):
def test_put_handler(ip, port, verbosity=False):
verbose_print(verbosity, "======== PUT HANDLER TEST =============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# PUT message to /ctrl to disable /hello URI handler
verbose_print(verbosity, "Disabling /hello handler")
@ -114,11 +117,12 @@ def test_put_handler(ip, port, verbosity = False):
sess.close()
return ((resp_data2 == "Hello World!") and (resp_data1 == "This URI doesn't exist"))
def test_custom_uri_query(ip, port, query, verbosity = False):
def test_custom_uri_query(ip, port, query, verbosity=False):
verbose_print(verbosity, "======== GET HANDLER TEST =============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
uri = "/hello?" + query
# GET hello response
@ -136,10 +140,11 @@ def test_custom_uri_query(ip, port, query, verbosity = False):
sess.close()
return (resp_data == "Hello World!")
if __name__ == '__main__':
# Configure argument parser
parser = argparse.ArgumentParser(description='Run HTTPd Test')
parser.add_argument('IP' , metavar='IP' , type=str, help='Server IP')
parser.add_argument('IP', metavar='IP', type=str, help='Server IP')
parser.add_argument('port', metavar='port', type=str, help='Server port')
parser.add_argument('msg', metavar='message', type=str, help='Message to be sent to server')
args = vars(parser.parse_args())
@ -149,9 +154,9 @@ if __name__ == '__main__':
port = args['port']
msg = args['msg']
if not test_get_handler (ip, port, True):
if not test_get_handler(ip, port, True):
Utility.console_log("Failed!")
if not test_post_handler(ip, port, msg, True):
Utility.console_log("Failed!")
if not test_put_handler (ip, port, True):
if not test_put_handler(ip, port, True):
Utility.console_log("Failed!")

View file

@ -2,16 +2,17 @@ import re
import os
import sys
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI", ignore=True)
@ -26,8 +27,8 @@ def test_examples_protocol_https_request(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "https_request.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("https_request_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("https_request_bin_size", bin_size//1024)
IDF.log_performance("https_request_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("https_request_bin_size", bin_size // 1024)
# start test
dut1.start_app()
dut1.expect("Connection established...", timeout=30)

View file

@ -3,38 +3,41 @@ import os
import sys
import socket
import time
import imp
import struct
import dpkt, dpkt.dns
import dpkt
import dpkt.dns
from threading import Thread
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW
import IDF
import DUT
g_run_server = True
g_done = False
def mdns_server(esp_host):
global g_run_server
global g_done
UDP_IP="0.0.0.0"
UDP_PORT=5353
UDP_IP = "0.0.0.0"
UDP_PORT = 5353
MCAST_GRP = '224.0.0.251'
sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind( (UDP_IP,UDP_PORT) )
sock.bind((UDP_IP,UDP_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
@ -47,30 +50,31 @@ def mdns_server(esp_host):
arr.cls = dpkt.dns.DNS_IN
arr.type = dpkt.dns.DNS_A
arr.name = u'tinytester.local'
arr.ip =socket.inet_aton('127.0.0.1')
arr.ip = socket.inet_aton('127.0.0.1')
resp_dns. an.append(arr)
sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
while g_run_server:
try:
m=sock.recvfrom( 1024 );
m = sock.recvfrom(1024)
dns = dpkt.dns.DNS(m[0])
if len(dns.qd)>0 and dns.qd[0].type == dpkt.dns.DNS_A:
if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
if dns.qd[0].name == u'tinytester.local':
print (dns.__repr__(),dns.qd[0].name)
print(dns.__repr__(),dns.qd[0].name)
sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
if len(dns.an)>0 and dns.an[0].type == dpkt.dns.DNS_A:
if dns.an[0].name == esp_host + u'.local':
if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
if dns.an[0].name == esp_host + u'.local':
print("Received answer esp32-mdns query")
g_done = True
print (dns.an[0].name)
print(dns.an[0].name)
dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
dns.qd[0].name= esp_host + u'.local'
dns.qd[0].name = esp_host + u'.local'
sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
print("Sending esp32-mdns query")
time.sleep(0.5)
except socket.timeout:
break
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_mdns(env, extra_data):
global g_run_server
@ -86,20 +90,19 @@ def test_examples_protocol_mdns(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("mdns-test_bin_size", bin_size//1024)
IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mdns-test_bin_size", bin_size // 1024)
# 1. start mdns application
dut1.start_app()
# 2. get the dut host name (and IP address)
specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
specific_host = str(specific_host[0])
dut_ip = ""
try:
dut_ip = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
# 3. check the mdns name is accessible
thread1 = Thread(target = mdns_server, args = (specific_host,))
thread1 = Thread(target=mdns_server, args=(specific_host,))
thread1.start()
start = time.time()
while (time.time() - start) <= 60:
@ -108,10 +111,11 @@ def test_examples_protocol_mdns(env, extra_data):
break
g_run_server = False
thread1.join()
if g_done == False:
if g_done is False:
raise ValueError('Test has failed: did not receive mdns answer within timeout')
# 4. check DUT output if mdns advertized host is resolved
dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
if __name__ == '__main__':
test_examples_protocol_mdns()

View file

@ -2,45 +2,46 @@ import re
import os
import sys
import time
import socket
import imp
import ssl
import paho.mqtt.client as mqtt
g_recv_data=""
g_recv_topic=""
g_broker_connected=0
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
g_recv_data = ""
g_recv_topic = ""
g_broker_connected = 0
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
global g_broker_connected
print("Connected with result code "+str(rc))
print("Connected with result code " + str(rc))
g_broker_connected = 1
client.subscribe("/topic/qos0")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
global g_recv_topic
global g_recv_data
payload = msg.payload.decode()
if g_recv_data == "" and payload == "data":
if g_recv_data == "" and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
g_recv_topic = msg.topic
g_recv_data = payload
print(msg.topic+" "+str(payload))
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
print(msg.topic + " " + str(payload))
@IDF.idf_example_test(env_tag="Example_WIFI")
@ -48,7 +49,7 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
global g_recv_topic
global g_recv_data
global g_broker_connected
broker_url="iot.eclipse.org"
broker_url = "iot.eclipse.org"
"""
steps: |
1. join AP and connects to ssl broker
@ -60,8 +61,8 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_ssl.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_ssl_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("mqtt_ssl_size", bin_size//1024)
IDF.log_performance("mqtt_ssl_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_ssl_size", bin_size // 1024)
# 1. start test (and check the environment is healthy)
dut1.start_app()
client = None
@ -73,15 +74,15 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(None,
None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
client.tls_insecure_set(True)
print("Connecting...")
client.connect(broker_url, 8883, 60)
print("...done")
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except:
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
raise
print("Start Looping...")
@ -92,7 +93,7 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
if g_broker_connected == 0:
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
# 3. check the message received back from the server
if g_recv_topic == "/topic/qos0" and g_recv_data == "data" :
if g_recv_topic == "/topic/qos0" and g_recv_data == "data":
print("PASS: Received correct message")
else:
print("Failure!")
@ -100,5 +101,6 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
# 4. check that the esp32 client received data sent by this python client
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
if __name__ == '__main__':
test_examples_protocol_mqtt_ssl()

View file

@ -1,36 +1,53 @@
import re
import os
import sys
from socket import *
import socket
from threading import Thread
import struct
import time
msgid=-1
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
msgid = -1
def get_my_ip():
s1 = socket(AF_INET, SOCK_DGRAM)
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(("8.8.8.8", 80))
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
def mqqt_server_sketch(my_ip, port):
global msgid
print("Starting the server on {}".format(my_ip))
s = None
try:
s=socket(AF_INET, SOCK_STREAM)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(60)
s.bind((my_ip, port))
s.listen(1)
q,addr=s.accept()
q,addr = s.accept()
q.settimeout(30)
print("connection accepted")
except:
except Exception:
print("Local server on {}:{} listening/accepting failure: {}"
"Possibly check permissions or firewall settings"
"to accept connections on this address".format(my_ip, port, sys.exc_info()[0]))
"Possibly check permissions or firewall settings"
"to accept connections on this address".format(my_ip, port, sys.exc_info()[0]))
raise
data = q.recv(1024)
# check if received initial empty message
@ -47,20 +64,6 @@ def mqqt_server_sketch(my_ip, port):
s.close()
print("server closed")
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_mqtt_qos1(env, extra_data):
@ -76,11 +79,11 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_tcp.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("mqtt_tcp_size", bin_size//1024)
IDF.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_tcp_size", bin_size // 1024)
# 1. start mqtt broker sketch
host_ip = get_my_ip()
thread1 = Thread(target = mqqt_server_sketch, args = (host_ip,1883))
thread1 = Thread(target=mqqt_server_sketch, args=(host_ip,1883))
thread1.start()
# 2. start the dut test and wait till client gets IP address
dut1.start_app()
@ -91,10 +94,10 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
print ("writing to device: {}".format("mqtt://" + host_ip + "\n"))
print("writing to device: {}".format("mqtt://" + host_ip + "\n"))
dut1.write("mqtt://" + host_ip + "\n")
thread1.join()
print ("Message id received from server: {}".format(msgid))
print("Message id received from server: {}".format(msgid))
# 3. check the message id was enqueued and then deleted
msgid_enqueued = dut1.expect(re.compile(r"OUTBOX: ENQUEUE msgid=([0-9]+)"), timeout=30)
msgid_deleted = dut1.expect(re.compile(r"OUTBOX: DELETED msgid=([0-9]+)"), timeout=30)
@ -105,5 +108,6 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
print("Failure!")
raise ValueError('Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted))
if __name__ == '__main__':
test_examples_protocol_mqtt_qos1()

View file

@ -5,43 +5,46 @@ import re
import os
import sys
import time
import socket
import imp
import paho.mqtt.client as mqtt
g_recv_data=""
g_recv_topic=""
g_broker_connected=0
try:
import IDF
except Exception:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
g_recv_data = ""
g_recv_topic = ""
g_broker_connected = 0
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
global g_broker_connected
print("Connected with result code "+str(rc))
print("Connected with result code " + str(rc))
g_broker_connected = 1
client.subscribe("/topic/qos0")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
global g_recv_topic
global g_recv_data
payload = msg.payload.decode()
if g_recv_data == "" and payload == "data":
if g_recv_data == "" and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
g_recv_topic = msg.topic
g_recv_data = payload
print(msg.topic+" "+payload)
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
print(msg.topic + " " + payload)
@IDF.idf_example_test(env_tag="Example_WIFI")
@ -49,7 +52,7 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
global g_recv_topic
global g_recv_data
global g_broker_connected
broker_url="iot.eclipse.org"
broker_url = "iot.eclipse.org"
"""
steps: |
1. join AP and connects to ws broker
@ -61,8 +64,8 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_websocket_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("mqtt_websocket_size", bin_size//1024)
IDF.log_performance("mqtt_websocket_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_websocket_size", bin_size // 1024)
# 1. start test (and check the environment is healthy)
dut1.start_app()
client = None
@ -79,7 +82,7 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
print("...done")
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except:
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
raise
print("Start Looping...")
@ -90,7 +93,7 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
if g_broker_connected == 0:
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
# 3. check the message received back from the server
if g_recv_topic == "/topic/qos0" and g_recv_data == "data" :
if g_recv_topic == "/topic/qos0" and g_recv_data == "data":
print("PASS: Received correct message")
else:
print("Failure!")
@ -98,5 +101,6 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
# 4. check that the esp32 client received data sent by this python client
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
if __name__ == '__main__':
test_examples_protocol_mqtt_ws()

View file

@ -3,45 +3,46 @@ import re
import os
import sys
import time
import socket
import imp
import ssl
import paho.mqtt.client as mqtt
g_recv_data=""
g_recv_topic=""
g_broker_connected=0
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
g_recv_data = ""
g_recv_topic = ""
g_broker_connected = 0
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
global g_broker_connected
print("Connected with result code "+str(rc))
print("Connected with result code " + str(rc))
g_broker_connected = 1
client.subscribe("/topic/qos0")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
global g_recv_topic
global g_recv_data
payload = msg.payload.decode()
if g_recv_data == "" and payload == "data":
if g_recv_data == "" and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
g_recv_topic = msg.topic
g_recv_data = payload
print(msg.topic+" "+str(payload))
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
print(msg.topic + " " + str(payload))
@IDF.idf_example_test(env_tag="Example_WIFI")
@ -49,7 +50,7 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
global g_recv_topic
global g_recv_data
global g_broker_connected
broker_url="iot.eclipse.org"
broker_url = "iot.eclipse.org"
"""
steps: |
1. join AP and connects to wss broker
@ -61,8 +62,8 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket_secure.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_websocket_secure_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("mqtt_websocket_secure_size", bin_size//1024)
IDF.log_performance("mqtt_websocket_secure_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_websocket_secure_size", bin_size // 1024)
# 1. start test (and check the environment is healthy)
dut1.start_app()
client = None
@ -74,14 +75,14 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(None,
None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
print("Connecting...")
client.connect(broker_url, 443, 60)
print("...done")
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except:
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
raise
print("Start Looping...")
@ -92,7 +93,7 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
if g_broker_connected == 0:
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
# 3. check the message received back from the server
if g_recv_topic == "/topic/qos0" and g_recv_data == "data" :
if g_recv_topic == "/topic/qos0" and g_recv_data == "data":
print("PASS: Received correct message")
else:
print("Failure!")
@ -100,5 +101,6 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
# 4. check that the esp32 client received data sent by this python client
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
if __name__ == '__main__':
test_examples_protocol_mqtt_wss()

View file

@ -11,7 +11,7 @@ import socket
import sys
# ----------- Config ----------
PORT = 3333;
PORT = 3333
IP_VERSION = 'IPv4'
IPV4 = '192.168.0.167'
IPV6 = 'FE80::32AE:A4FF:FE80:5288'
@ -31,14 +31,14 @@ try:
sock = socket.socket(family_addr, socket.SOCK_STREAM)
except socket.error as msg:
print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1])
sys.exit(1);
sys.exit(1)
try:
sock.connect((host, PORT))
except socket.error as msg:
print('Could not open socket: ', msg)
sock.close()
sys.exit(1);
sys.exit(1)
while True:
msg = input('Enter message to send: ')
@ -46,6 +46,7 @@ while True:
msg = msg.encode()
sock.sendall(msg)
data = sock.recv(1024)
if not data: break;
print( 'Reply: ' + data.decode())
if not data:
break
print('Reply: ' + data.decode())
sock.close()

View file

@ -11,7 +11,7 @@ import sys
# ----------- Config ----------
IP_VERSION = 'IPv4'
PORT = 3333;
PORT = 3333
# -------------------------------
if IP_VERSION == 'IPv4':
@ -45,7 +45,8 @@ except socket.error as msg:
while True:
data = conn.recv(128)
if not data: break
if not data:
break
data = data.decode()
print('Received data: ' + data)
reply = 'OK: ' + data

View file

@ -30,7 +30,7 @@ else:
try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM)
except socket.error as msg:
except socket.error:
print('Failed to create socket')
sys.exit()
@ -39,7 +39,8 @@ while True:
try:
sock.sendto(msg.encode(), (host, PORT))
reply, addr = sock.recvfrom(128)
if not reply: break
if not reply:
break
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
except socket.error as msg:
print('Error Code : ' + str(msg[0]) + ' Message: ' + msg[1])

View file

@ -11,7 +11,7 @@ import sys
# ----------- Config ----------
IP_VERSION = 'IPv4'
PORT = 3333;
PORT = 3333
# -------------------------------
if IP_VERSION == 'IPv4':
@ -23,10 +23,10 @@ else:
sys.exit(1)
try :
try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error as msg :
except socket.error as msg:
print('Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1])
sys.exit()
@ -37,10 +37,11 @@ except socket.error as msg:
sys.exit()
while True:
try :
try:
print('Waiting for data...')
data, addr = sock.recvfrom(1024)
if not data: break
if not data:
break
data = data.decode()
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data)
reply = 'OK ' + data

View file

@ -3,12 +3,13 @@ import os
import sys
import subprocess
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag='Example_WIFI')

View file

@ -2,12 +2,14 @@ from __future__ import print_function
import os
import sys
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW
import IDF
@IDF.idf_example_test(env_tag='Example_WIFI')
def test_examples_system_cpp_exceptions(env, extra_data):
@ -20,9 +22,10 @@ def test_examples_system_cpp_exceptions(env, extra_data):
'In destructor, m_arg=42',
'Exception caught: Exception in constructor',
'app_main done'
]
]
for line in lines:
dut.expect(line, timeout=2)
if __name__ == '__main__':
test_examples_system_cpp_exceptions()

View file

@ -1,18 +1,18 @@
from __future__ import print_function
import re
import os
import sys
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# Timer events
TIMER_EVENT_LIMIT = 3
@ -26,6 +26,7 @@ TASK_UNREGISTRATION_LIMIT = 3
TASK_ITERATION_POST = "TASK_EVENTS:TASK_ITERATION_EVENT: posting to default loop, {} out of " + str(TASK_ITERATION_LIMIT)
TASK_ITERATION_HANDLING = "TASK_EVENTS:TASK_ITERATION_EVENT: task_iteration_handler, executed {} times"
def _test_timer_events(dut):
dut.start_app()
@ -49,13 +50,13 @@ def _test_timer_events(dut):
if expiries < TIMER_EVENT_LIMIT:
dut.expect_all("TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler",
TIMER_EXPIRY_HANDLING.format(expiries))
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler",
TIMER_EXPIRY_HANDLING.format(expiries))
else:
dut.expect_all("TIMER_EVENTS:TIMER_EVENT_STOPPED: posting to default loop",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler",
TIMER_EXPIRY_HANDLING.format(expiries))
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler",
TIMER_EXPIRY_HANDLING.format(expiries))
print("Posted timer stopped event")
print("Handled timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT))
@ -64,6 +65,7 @@ def _test_timer_events(dut):
dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: deleted timer event source")
print("Handled timer stopped event")
def _test_iteration_events(dut):
dut.start_app()
@ -89,6 +91,7 @@ def _test_iteration_events(dut):
dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: deleting task event source")
print("Deleted task event source")
@IDF.idf_example_test(env_tag='Example_WIFI')
def test_default_event_loop_example(env, extra_data):
dut = env.get_dut('default_event_loop', 'examples/system/event/default_event_loop')
@ -96,5 +99,6 @@ def test_default_event_loop_example(env, extra_data):
_test_iteration_events(dut)
_test_timer_events(dut)
if __name__ == '__main__':
test_default_event_loop_example()

View file

@ -1,24 +1,25 @@
from __future__ import print_function
import re
import os
import sys
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
TASK_ITERATION_LIMIT = 10
TASK_ITERATION_POSTING = "posting TASK_EVENTS:TASK_ITERATION_EVENT to {}, iteration {} out of " + str(TASK_ITERATION_LIMIT)
TASK_ITERATION_HANDLING = "handling TASK_EVENTS:TASK_ITERATION_EVENT from {}, iteration {}"
@IDF.idf_example_test(env_tag='Example_WIFI')
def test_user_event_loops_example(env, extra_data):
dut = env.get_dut('user_event_loops', 'examples/system/event/user_event_loops')
@ -46,5 +47,6 @@ def test_user_event_loops_example(env, extra_data):
dut.expect("deleting task event source")
print("Deleted task event source")
if __name__ == '__main__':
test_user_event_loops_example()

View file

@ -3,16 +3,17 @@ import re
import os
import sys
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
STARTING_TIMERS_REGEX = re.compile(r'Started timers, time since boot: (\d+) us')
@ -35,6 +36,7 @@ FINAL_TIMER_PERIOD = 1000000
LIGHT_SLEEP_TIME = 500000
ONE_SHOT_TIMER_PERIOD = 5000000
@IDF.idf_example_test(env_tag='Example_WIFI')
def test_examples_system_esp_timer(env, extra_data):
dut = env.get_dut('esp_timer_example', 'examples/system/esp_timer')
@ -93,5 +95,6 @@ def test_examples_system_esp_timer(env, extra_data):
dut.expect(STOP_REGEX, timeout=2)
if __name__ == '__main__':
test_examples_system_esp_timer()

View file

@ -4,12 +4,13 @@ import os
import sys
import time
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
ENTERING_SLEEP_STR = 'Entering light sleep'
EXIT_SLEEP_REGEX = re.compile(r'Returned from light sleep, reason: (\w+), t=(\d+) ms, slept for (\d+) ms')
@ -17,6 +18,7 @@ WAITING_FOR_GPIO_STR = 'Waiting for GPIO0 to go high...'
WAKEUP_INTERVAL_MS = 2000
@IDF.idf_example_test(env_tag='Example_WIFI')
def test_examples_system_light_sleep(env, extra_data):
dut = env.get_dut('light_sleep_example', 'examples/system/light_sleep')
@ -58,5 +60,6 @@ def test_examples_system_light_sleep(env, extra_data):
assert(groups[0] == 'timer' and int(groups[2]) == WAKEUP_INTERVAL_MS)
print('Woke up from timer again')
if __name__ == '__main__':
test_examples_system_light_sleep()

View file

@ -3,16 +3,17 @@ import os
import sys
import subprocess
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag='Example_WIFI')

View file

@ -30,21 +30,28 @@ import sys
import time
import subprocess
# add current folder to system path for importing test_report
sys.path.append(os.path.dirname(__file__))
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import IDF
import DUT
import Utility
from Utility import (Attenuator, PowerControl, LineChart)
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
try:
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
except ImportError:
# add current folder to system path for importing test_report
sys.path.append(os.path.dirname(__file__))
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
# configurations
TEST_TIME = TEST_TIMEOUT = 60
@ -80,7 +87,7 @@ class TestResult(object):
BAD_POINT_PERCENTAGE_THRESHOLD = 0.3
# we need at least 1/2 valid points to qualify the test result
THROUGHPUT_QUALIFY_COUNT = TEST_TIME//2
THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2
def __init__(self, proto, direction, config_name):
self.proto = proto

View file

@ -88,7 +88,7 @@ class ThroughputForConfigsReport(object):
current_config = self.sdkconfigs[_config_name]
if i > 0:
previous_config_name = self.sort_order[i-1]
previous_config_name = self.sort_order[i - 1]
previous_config = self.sdkconfigs[previous_config_name]
else:
previous_config = previous_config_name = None