examples: Fix Python coding style

This commit is contained in:
Roland Dobai 2018-12-04 08:32:48 +01:00
parent a36d714d1a
commit 57c54f96f1
36 changed files with 645 additions and 561 deletions

38
.flake8
View file

@ -149,6 +149,8 @@ exclude =
components/expat/expat, components/expat/expat,
components/unity/unity, components/unity/unity,
examples/build_system/cmake/import_lib/main/lib/tinyxml2 examples/build_system/cmake/import_lib/main/lib/tinyxml2
# autogenerated scripts
examples/provisioning/custom_config/components/custom_provisioning/python/custom_config_pb2.py,
# temporary list (should be empty) # temporary list (should be empty)
components/app_update/dump_otadata.py, components/app_update/dump_otadata.py,
components/app_update/gen_empty_partition.py, components/app_update/gen_empty_partition.py,
@ -164,45 +166,9 @@ exclude =
components/ulp/esp32ulp_mapgen.py, components/ulp/esp32ulp_mapgen.py,
components/wifi_provisioning/python/wifi_config_pb2.py, components/wifi_provisioning/python/wifi_config_pb2.py,
components/wifi_provisioning/python/wifi_constants_pb2.py, components/wifi_provisioning/python/wifi_constants_pb2.py,
examples/peripherals/can/can_alert_and_recovery/example_test.py,
examples/peripherals/can/can_network/example_test.py,
examples/peripherals/can/can_self_test/example_test.py,
examples/peripherals/i2s_adc_dac/tools/generate_audio_file.py,
examples/peripherals/sdio/sdio_test.py,
examples/protocols/asio/chat_client/asio_chat_client_test.py,
examples/protocols/asio/chat_server/asio_chat_server_test.py,
examples/protocols/asio/tcp_echo_server/asio_tcp_server_test.py,
examples/protocols/asio/udp_echo_server/asio_udp_server_test.py,
examples/protocols/esp_http_client/esp_http_client_test.py,
examples/protocols/http_server/advanced_tests/http_server_advanced_test.py,
examples/protocols/http_server/advanced_tests/scripts/test.py,
examples/protocols/http_server/persistent_sockets/http_server_persistence_test.py,
examples/protocols/http_server/persistent_sockets/scripts/adder.py,
examples/protocols/http_server/simple/http_server_simple_test.py,
examples/protocols/http_server/simple/scripts/client.py,
examples/protocols/https_request/example_test.py,
examples/protocols/mdns/mdns_example_test.py,
examples/protocols/mqtt/ssl/mqtt_ssl_example_test.py,
examples/protocols/mqtt/tcp/mqtt_tcp_example_test.py,
examples/protocols/mqtt/ws/mqtt_ws_example_test.py,
examples/protocols/mqtt/wss/mqtt_wss_example_test.py,
examples/protocols/sockets/scripts/tcpclient.py,
examples/protocols/sockets/scripts/tcpserver.py,
examples/protocols/sockets/scripts/udpclient.py,
examples/protocols/sockets/scripts/udpserver.py,
examples/provisioning/ble_prov/ble_prov_test.py, examples/provisioning/ble_prov/ble_prov_test.py,
examples/provisioning/custom_config/components/custom_provisioning/python/custom_config_pb2.py,
examples/provisioning/softap_prov/softap_prov_test.py, examples/provisioning/softap_prov/softap_prov_test.py,
examples/provisioning/softap_prov/utils/wifi_tools.py, examples/provisioning/softap_prov/utils/wifi_tools.py,
examples/storage/parttool/example_test.py,
examples/system/cpp_exceptions/example_test.py,
examples/system/esp_event/default_event_loop/example_test.py,
examples/system/esp_event/user_event_loops/example_test.py,
examples/system/esp_timer/example_test.py,
examples/system/light_sleep/example_test.py,
examples/system/ota/otatool/example_test.py,
examples/wifi/iperf/iperf_test.py,
examples/wifi/iperf/test_report.py,
tools/ci/apply_bot_filter.py, tools/ci/apply_bot_filter.py,
tools/cmake/convert_to_cmake.py, tools/cmake/convert_to_cmake.py,
tools/esp_app_trace/apptrace_proc.py, tools/esp_app_trace/apptrace_proc.py,

View file

@ -1,29 +1,33 @@
#Need Python 3 string formatting functions # Need Python 3 string formatting functions
from __future__ import print_function from __future__ import print_function
import re
import os import os
import sys import sys
# The test cause is dependent on the Tiny Test Framework. Ensure the
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw` try:
test_fw_path = os.getenv("TEST_FW_PATH") import IDF
if test_fw_path and test_fw_path not in sys.path: except ImportError:
sys.path.insert(0, test_fw_path) # The test cause is dependent on the Tiny Test Framework. Ensure the
import TinyFW # `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
import IDF test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# CAN Self Test Example constants # CAN Self Test Example constants
STR_EXPECT = ("CAN Alert and Recovery: Driver installed", "CAN Alert and Recovery: Driver uninstalled") STR_EXPECT = ("CAN Alert and Recovery: Driver installed", "CAN Alert and Recovery: Driver uninstalled")
EXPECT_TIMEOUT = 20 EXPECT_TIMEOUT = 20
@IDF.idf_example_test(env_tag='Example_CAN') @IDF.idf_example_test(env_tag='Example_CAN')
def test_can_alert_and_recovery_example(env, extra_data): def test_can_alert_and_recovery_example(env, extra_data):
#Get device under test, flash and start example. "dut4" must be defined in EnvConfig # Get device under test, flash and start example. "dut4" must be defined in EnvConfig
dut = env.get_dut('dut4', 'examples/peripherals/can/can_alert_and_recovery') dut = env.get_dut('dut4', 'examples/peripherals/can/can_alert_and_recovery')
dut.start_app() dut.start_app()
for string in STR_EXPECT: for string in STR_EXPECT:
dut.expect(string, timeout = EXPECT_TIMEOUT) dut.expect(string, timeout=EXPECT_TIMEOUT)
if __name__ == '__main__': if __name__ == '__main__':
test_can_alert_and_recovery_example() test_can_alert_and_recovery_example()

View file

@ -1,77 +1,80 @@
#Need Python 3 string formatting functions # Need Python 3 string formatting functions
from __future__ import print_function from __future__ import print_function
import re
import os import os
import sys import sys
import time
from threading import Thread from threading import Thread
# The test cause is dependent on the Tiny Test Framework. Ensure the try:
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw` import IDF
test_fw_path = os.getenv("TEST_FW_PATH") except ImportError:
if test_fw_path and test_fw_path not in sys.path: # The test cause is dependent on the Tiny Test Framework. Ensure the
sys.path.insert(0, test_fw_path) # `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
import TinyFW test_fw_path = os.getenv("TEST_FW_PATH")
import IDF if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
#Define tuple of strings to expect for each DUT. # Define tuple of strings to expect for each DUT.
master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled") master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled")
slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled") slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled")
listen_only_expect = ("CAN Listen Only: Driver installed", "Listen Only: Driver uninstalled") listen_only_expect = ("CAN Listen Only: Driver installed", "Listen Only: Driver uninstalled")
def dut_thread_callback(**kwargs): def dut_thread_callback(**kwargs):
#Parse keyword arguments # Parse keyword arguments
dut = kwargs['dut'] #Get DUT from kwargs dut = kwargs['dut'] # Get DUT from kwargs
expected = kwargs['expected'] expected = kwargs['expected']
result = kwargs['result'] #Get result[out] from kwargs. MUST be of mutable type e.g. list result = kwargs['result'] # Get result[out] from kwargs. MUST be of mutable type e.g. list
#Must reset again as flashing during start_app will reset multiple times, causing unexpected results # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
dut.reset() dut.reset()
for string in expected: for string in expected:
dut.expect(string, 20) dut.expect(string, 20)
#Mark thread has run to completion without any exceptions # Mark thread has run to completion without any exceptions
result[0] = True result[0] = True
@IDF.idf_example_test(env_tag='Example_CAN') @IDF.idf_example_test(env_tag='Example_CAN')
def test_can_network_example(env, extra_data): def test_can_network_example(env, extra_data):
#Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig # Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master") dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master")
dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave") dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave")
dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only") dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only")
#Flash app onto each DUT, each DUT is reset again at the start of each thread # Flash app onto each DUT, each DUT is reset again at the start of each thread
dut_master.start_app() dut_master.start_app()
dut_slave.start_app() dut_slave.start_app()
dut_listen_only.start_app() dut_listen_only.start_app()
#Create dict of keyword arguments for each dut # Create dict of keyword arguments for each dut
results = [[False], [False], [False]] results = [[False], [False], [False]]
master_kwargs = {"dut" : dut_master, "result" : results[0], "expected" : master_expect} master_kwargs = {"dut": dut_master, "result": results[0], "expected": master_expect}
slave_kwargs = {"dut" : dut_slave, "result" : results[1], "expected" : slave_expect} slave_kwargs = {"dut": dut_slave, "result": results[1], "expected": slave_expect}
listen_only_kwargs = {"dut" : dut_listen_only, "result" : results[2], "expected" : listen_only_expect} listen_only_kwargs = {"dut": dut_listen_only, "result": results[2], "expected": listen_only_expect}
#Create thread for each dut # Create thread for each dut
dut_master_thread = Thread(target = dut_thread_callback, name = "Master Thread", kwargs = master_kwargs) dut_master_thread = Thread(target=dut_thread_callback, name="Master Thread", kwargs=master_kwargs)
dut_slave_thread = Thread(target = dut_thread_callback, name = "Slave Thread", kwargs = slave_kwargs) dut_slave_thread = Thread(target=dut_thread_callback, name="Slave Thread", kwargs=slave_kwargs)
dut_listen_only_thread = Thread(target = dut_thread_callback, name = "Listen Only Thread", kwargs = listen_only_kwargs) dut_listen_only_thread = Thread(target=dut_thread_callback, name="Listen Only Thread", kwargs=listen_only_kwargs)
#Start each thread # Start each thread
dut_listen_only_thread.start() dut_listen_only_thread.start()
dut_master_thread.start() dut_master_thread.start()
dut_slave_thread.start() dut_slave_thread.start()
#Wait for threads to complete # Wait for threads to complete
dut_listen_only_thread.join() dut_listen_only_thread.join()
dut_master_thread.join() dut_master_thread.join()
dut_slave_thread.join() dut_slave_thread.join()
#check each thread ran to completion # check each thread ran to completion
for result in results: for result in results:
if result[0] != True: if result[0] is not True:
raise Exception("One or more threads did not run successfully") raise Exception("One or more threads did not run successfully")
if __name__ == '__main__': if __name__ == '__main__':
test_can_network_example() test_can_network_example()

View file

@ -1,29 +1,33 @@
#Need Python 3 string formatting functions # Need Python 3 string formatting functions
from __future__ import print_function from __future__ import print_function
import re
import os import os
import sys import sys
# The test cause is dependent on the Tiny Test Framework. Ensure the try:
# `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw` import IDF
test_fw_path = os.getenv("TEST_FW_PATH") except ImportError:
if test_fw_path and test_fw_path not in sys.path: # The test cause is dependent on the Tiny Test Framework. Ensure the
sys.path.insert(0, test_fw_path) # `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
import TinyFW test_fw_path = os.getenv("TEST_FW_PATH")
import IDF if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# CAN Self Test Example constants # CAN Self Test Example constants
STR_EXPECT = ("CAN Self Test: Driver installed", "CAN Self Test: Driver uninstalled") STR_EXPECT = ("CAN Self Test: Driver installed", "CAN Self Test: Driver uninstalled")
EXPECT_TIMEOUT = 20 EXPECT_TIMEOUT = 20
@IDF.idf_example_test(env_tag='Example_CAN') @IDF.idf_example_test(env_tag='Example_CAN')
def test_can_self_test_example(env, extra_data): def test_can_self_test_example(env, extra_data):
#Get device under test, flash and start example. "dut4" must be defined in EnvConfig # Get device under test, flash and start example. "dut4" must be defined in EnvConfig
dut = env.get_dut('dut4', 'examples/peripherals/can/can_self_test') dut = env.get_dut('dut4', 'examples/peripherals/can/can_self_test')
dut.start_app() dut.start_app()
for string in STR_EXPECT: for string in STR_EXPECT:
dut.expect(string, timeout = EXPECT_TIMEOUT) dut.expect(string, timeout=EXPECT_TIMEOUT)
if __name__ == '__main__': if __name__ == '__main__':
test_can_self_test_example() test_can_self_test_example()

View file

@ -4,6 +4,7 @@ import os
import wave import wave
import struct import struct
def get_wave_array_str(filename, target_bits): def get_wave_array_str(filename, target_bits):
wave_read = wave.open(filename, "r") wave_read = wave.open(filename, "r")
array_str = "" array_str = ""
@ -13,28 +14,30 @@ def get_wave_array_str(filename, target_bits):
val, = struct.unpack("<H", wave_read.readframes(1)) val, = struct.unpack("<H", wave_read.readframes(1))
scale_val = (1 << target_bits) - 1 scale_val = (1 << target_bits) - 1
cur_lim = (1 << sampwidth) - 1 cur_lim = (1 << sampwidth) - 1
#scale current data to 8-bit data # scale current data to 8-bit data
val = val * scale_val / cur_lim val = val * scale_val / cur_lim
val = int(val + ((scale_val + 1) // 2)) & scale_val val = int(val + ((scale_val + 1) // 2)) & scale_val
array_str += "0x%x, "%(val) array_str += "0x%x, " % (val)
if (i + 1) % 16 == 0: if (i + 1) % 16 == 0:
array_str += "\n" array_str += "\n"
return array_str return array_str
def gen_wave_table(wav_file_list, target_file_name, scale_bits = 8):
def gen_wave_table(wav_file_list, target_file_name, scale_bits=8):
with open(target_file_name, "w") as audio_table: with open(target_file_name, "w") as audio_table:
print('#include <stdio.h>', file=audio_table) print('#include <stdio.h>', file=audio_table)
print('const unsigned char audio_table[] = {', file=audio_table) print('const unsigned char audio_table[] = {', file=audio_table)
for wav in wav_file_list: for wav in wav_file_list:
print("processing: {}".format(wav)) print("processing: {}".format(wav))
print(get_wave_array_str(filename = wav, target_bits = scale_bits), file=audio_table) print(get_wave_array_str(filename=wav, target_bits=scale_bits), file=audio_table)
print('};\n', file=audio_table) print('};\n', file=audio_table)
print("Done...") print("Done...")
if __name__=='__main__':
if __name__ == '__main__':
print("Generating audio array...") print("Generating audio array...")
wav_list = [] wav_list = []
for filename in os.listdir("./"): for filename in os.listdir("./"):
if filename.endswith(".wav"): if filename.endswith(".wav"):
wav_list.append(filename) wav_list.append(filename)
gen_wave_table(wav_file_list = wav_list, target_file_name = "audio_example_file.h") gen_wave_table(wav_file_list=wav_list, target_file_name="audio_example_file.h")

View file

@ -13,17 +13,19 @@
# limitations under the License. # limitations under the License.
""" example of writing test with TinyTestFW """ """ example of writing test with TinyTestFW """
import re
import os import os
import sys import sys
# if we want to run test case outside `tiny-test-fw` folder, try:
# we need to insert tiny-test-fw path into sys path import TinyFW
test_fw_path = os.getenv("TEST_FW_PATH") except ImportError:
if test_fw_path and test_fw_path not in sys.path: # if we want to run test case outside `tiny-test-fw` folder,
sys.path.insert(0, test_fw_path) # we need to insert tiny-test-fw path into sys path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import TinyFW
import IDF import IDF
@ -50,7 +52,7 @@ def test_example_sdio_communication(env, extra_data):
dut1 = env.get_dut("sdio_host", "examples/peripherals/sdio/host") dut1 = env.get_dut("sdio_host", "examples/peripherals/sdio/host")
dut2 = env.get_dut("sdio_slave", "examples/peripherals/sdio/slave") dut2 = env.get_dut("sdio_slave", "examples/peripherals/sdio/slave")
dut1.start_app() dut1.start_app()
#wait until the master is ready to setup the slave # wait until the master is ready to setup the slave
dut1.expect("host ready, start initializing slave...") dut1.expect("host ready, start initializing slave...")
dut2.start_app() dut2.start_app()
@ -96,12 +98,12 @@ def test_example_sdio_communication(env, extra_data):
dut2.expect("recv len: 6") dut2.expect("recv len: 6")
dut2.expect("recv len: 12") dut2.expect("recv len: 12")
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
#511 # 511
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
dut2.expect("recv len: 127") dut2.expect("recv len: 127")
#512 # 512
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
dut2.expect("recv len: 128") dut2.expect("recv len: 128")
@ -122,9 +124,9 @@ def test_example_sdio_communication(env, extra_data):
dut1.expect("receive data, size: 128") dut1.expect("receive data, size: 128")
dut1.expect("receive data, size: 128") dut1.expect("receive data, size: 128")
#the last valid line of one round # the last valid line of one round
dut1.expect("ce d3 d8 dd e2 e7 ec f1 f6 fb 00 05 0a 0f 14 19") dut1.expect("ce d3 d8 dd e2 e7 ec f1 f6 fb 00 05 0a 0f 14 19")
#the first 2 lines of the second round # the first 2 lines of the second round
dut1.expect("46 4b 50") dut1.expect("46 4b 50")
dut1.expect("5a 5f 64 69 6e 73") dut1.expect("5a 5f 64 69 6e 73")

View file

@ -1,49 +1,52 @@
import re import re
import os import os
import sys import sys
from socket import * import socket
from threading import Thread from threading import Thread
import time import time
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv("TEST_FW_PATH") # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW global g_client_response
import IDF global g_msg_to_client
global g_client_response;
global g_msg_to_client;
g_client_response = b"" g_client_response = b""
g_msg_to_client = b" 3XYZ" g_msg_to_client = b" 3XYZ"
def get_my_ip(): def get_my_ip():
s1 = socket(AF_INET, SOCK_DGRAM) s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(("8.8.8.8", 80)) s1.connect(("8.8.8.8", 80))
my_ip = s1.getsockname()[0] my_ip = s1.getsockname()[0]
s1.close() s1.close()
return my_ip return my_ip
def chat_server_sketch(my_ip): def chat_server_sketch(my_ip):
global g_client_response global g_client_response
print("Starting the server on {}".format(my_ip)) print("Starting the server on {}".format(my_ip))
port=2222 port = 2222
s=socket(AF_INET, SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(600) s.settimeout(600)
s.bind((my_ip, port)) s.bind((my_ip, port))
s.listen(1) s.listen(1)
q,addr=s.accept() q,addr = s.accept()
print("connection accepted") print("connection accepted")
q.settimeout(30) q.settimeout(30)
q.send(g_msg_to_client) q.send(g_msg_to_client)
data = q.recv(1024) data = q.recv(1024)
# check if received initial empty message # check if received initial empty message
if (len(data)>4): if (len(data) > 4):
g_client_response = data g_client_response = data
else: else:
g_client_response = q.recv(1024) g_client_response = q.recv(1024)
@ -51,6 +54,7 @@ def chat_server_sketch(my_ip):
s.close() s.close()
print("server closed") print("server closed")
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_asio_chat_client(env, extra_data): def test_examples_protocol_asio_chat_client(env, extra_data):
""" """
@ -64,24 +68,24 @@ def test_examples_protocol_asio_chat_client(env, extra_data):
""" """
global g_client_response global g_client_response
global g_msg_to_client global g_msg_to_client
test_msg="ABC" test_msg = "ABC"
dut1 = env.get_dut("chat_client", "examples/protocols/asio/chat_client") dut1 = env.get_dut("chat_client", "examples/protocols/asio/chat_client")
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_chat_client.bin") binary_file = os.path.join(dut1.app.binary_path, "asio_chat_client.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_chat_client_size", "{}KB".format(bin_size//1024)) IDF.log_performance("asio_chat_client_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_chat_client_size", bin_size//1024) IDF.check_performance("asio_chat_client_size", bin_size // 1024)
# 1. start a tcp server on the host # 1. start a tcp server on the host
host_ip = get_my_ip() host_ip = get_my_ip()
thread1 = Thread(target = chat_server_sketch, args = (host_ip,)) thread1 = Thread(target=chat_server_sketch, args=(host_ip,))
thread1.start() thread1.start()
# 2. start the dut test and wait till client gets IP address # 2. start the dut test and wait till client gets IP address
dut1.start_app() dut1.start_app()
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. send host's IP to the client i.e. the `dut1` # 3. send host's IP to the client i.e. the `dut1`
dut1.write(host_ip) dut1.write(host_ip)
# 4. client `dut1` should receive a message # 4. client `dut1` should receive a message
dut1.expect(g_msg_to_client[4:].decode()) # Strip out the front 4 bytes of message len (see chat_message protocol) dut1.expect(g_msg_to_client[4:].decode()) # Strip out the front 4 bytes of message len (see chat_message protocol)
# 5. write test message from `dut1` chat_client to the server # 5. write test message from `dut1` chat_client to the server
dut1.write(test_msg) dut1.write(test_msg)
while len(g_client_response) == 0: while len(g_client_response) == 0:
@ -97,5 +101,6 @@ def test_examples_protocol_asio_chat_client(env, extra_data):
raise ValueError('Wrong data received from asi tcp server: {} (expected:{})'.format(g_client_response[4:7], test_msg)) raise ValueError('Wrong data received from asi tcp server: {} (expected:{})'.format(g_client_response[4:7], test_msg))
thread1.join() thread1.join()
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_asio_chat_client() test_examples_protocol_asio_chat_client()

View file

@ -1,21 +1,20 @@
import re import re
import os import os
import sys import sys
from socket import * import socket
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
@ -27,19 +26,19 @@ def test_examples_protocol_asio_chat_server(env, extra_data):
3. Test connects to server and sends a test message 3. Test connects to server and sends a test message
4. Test evaluates received test message from server 4. Test evaluates received test message from server
""" """
test_msg=b" 4ABC\n" test_msg = b" 4ABC\n"
dut1 = env.get_dut("chat_server", "examples/protocols/asio/chat_server") dut1 = env.get_dut("chat_server", "examples/protocols/asio/chat_server")
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_chat_server.bin") binary_file = os.path.join(dut1.app.binary_path, "asio_chat_server.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_chat_server_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("asio_chat_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_chat_server_size", bin_size//1024) IDF.check_performance("asio_chat_server_size", bin_size // 1024)
# 1. start test # 1. start test
dut1.start_app() dut1.start_app()
# 2. get the server IP address # 2. get the server IP address
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. create tcp client and connect to server # 3. create tcp client and connect to server
cli = socket(AF_INET,SOCK_STREAM) cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli.settimeout(30) cli.settimeout(30)
cli.connect((data[0],80)) cli.connect((data[0],80))
cli.send(test_msg) cli.send(test_msg)

View file

@ -1,21 +1,21 @@
import re import re
import os import os
import sys import sys
from socket import * import socket
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv("TEST_FW_PATH") # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
@ -28,19 +28,19 @@ def test_examples_protocol_asio_tcp_server(env, extra_data):
4. Test evaluates received test message from server 4. Test evaluates received test message from server
5. Test evaluates received test message on server stdout 5. Test evaluates received test message on server stdout
""" """
test_msg=b"echo message from client to server" test_msg = b"echo message from client to server"
dut1 = env.get_dut("tcp_echo_server", "examples/protocols/asio/tcp_echo_server") dut1 = env.get_dut("tcp_echo_server", "examples/protocols/asio/tcp_echo_server")
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_tcp_echo_server.bin") binary_file = os.path.join(dut1.app.binary_path, "asio_tcp_echo_server.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_tcp_echo_server_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("asio_tcp_echo_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_tcp_echo_server_size", bin_size//1024) IDF.check_performance("asio_tcp_echo_server_size", bin_size // 1024)
# 1. start test # 1. start test
dut1.start_app() dut1.start_app()
# 2. get the server IP address # 2. get the server IP address
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. create tcp client and connect to server # 3. create tcp client and connect to server
cli = socket(AF_INET,SOCK_STREAM) cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli.settimeout(30) cli.settimeout(30)
cli.connect((data[0],80)) cli.connect((data[0],80))
cli.send(test_msg) cli.send(test_msg)

View file

@ -1,21 +1,21 @@
import re import re
import os import os
import sys import sys
from socket import * import socket
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv("TEST_FW_PATH") # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
@ -28,19 +28,19 @@ def test_examples_protocol_asio_udp_server(env, extra_data):
4. Test evaluates received test message from server 4. Test evaluates received test message from server
5. Test evaluates received test message on server stdout 5. Test evaluates received test message on server stdout
""" """
test_msg=b"echo message from client to server" test_msg = b"echo message from client to server"
dut1 = env.get_dut("udp_echo_server", "examples/protocols/asio/udp_echo_server") dut1 = env.get_dut("udp_echo_server", "examples/protocols/asio/udp_echo_server")
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "asio_udp_echo_server.bin") binary_file = os.path.join(dut1.app.binary_path, "asio_udp_echo_server.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("asio_udp_echo_server_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("asio_udp_echo_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("asio_udp_echo_server_size", bin_size//1024) IDF.check_performance("asio_udp_echo_server_size", bin_size // 1024)
# 1. start test # 1. start test
dut1.start_app() dut1.start_app()
# 2. get the server IP address # 2. get the server IP address
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
# 3. create tcp client and connect to server # 3. create tcp client and connect to server
cli = socket(AF_INET, SOCK_DGRAM) cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cli.settimeout(30) cli.settimeout(30)
cli.connect((data[0], 80)) cli.connect((data[0], 80))
cli.send(test_msg) cli.send(test_msg)
@ -55,5 +55,6 @@ def test_examples_protocol_asio_udp_server(env, extra_data):
# 5. check the client message appears also on server terminal # 5. check the client message appears also on server terminal
dut1.expect(test_msg.decode()) dut1.expect(test_msg.decode())
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_asio_udp_server() test_examples_protocol_asio_udp_server()

View file

@ -2,16 +2,18 @@ import re
import os import os
import sys import sys
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv("TEST_FW_PATH") # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW import IDF
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI", ignore=True) @IDF.idf_example_test(env_tag="Example_WIFI", ignore=True)
@ -25,8 +27,8 @@ def test_examples_protocol_esp_http_client(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "esp-http-client-example.bin") binary_file = os.path.join(dut1.app.binary_path, "esp-http-client-example.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("esp_http_client_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("esp_http_client_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("esp_http_client_bin_size", bin_size//1024) IDF.check_performance("esp_http_client_bin_size", bin_size // 1024)
# start test # start test
dut1.start_app() dut1.start_app()
dut1.expect("Connected to AP, begin http example", timeout=30) dut1.expect("Connected to AP, begin http example", timeout=30)

View file

@ -21,28 +21,29 @@ import imp
import re import re
import os import os
import sys import sys
import string
import random
import socket
# This environment variable is expected on the host machine try:
test_fw_path = os.getenv("TEST_FW_PATH") import IDF
if test_fw_path and test_fw_path not in sys.path: except ImportError:
sys.path.insert(0, test_fw_path) # This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import Utility
# When running on local machine execute the following before running this script # When running on local machine execute the following before running this script
# > make app bootloader # > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config # > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw # > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
import TinyFW
import IDF
import Utility
# Import client module # Import client module
expath = os.path.dirname(os.path.realpath(__file__)) expath = os.path.dirname(os.path.realpath(__file__))
client = imp.load_source("client", expath + "/scripts/test.py") client = imp.load_source("client", expath + "/scripts/test.py")
# Due to connectivity issues (between runner host and DUT) in the runner environment, # Due to connectivity issues (between runner host and DUT) in the runner environment,
# some of the `advanced_tests` are ignored. These tests are intended for verifying # some of the `advanced_tests` are ignored. These tests are intended for verifying
# the expected limits of the http_server capabilities, and implement sending and receiving # the expected limits of the http_server capabilities, and implement sending and receiving
@ -57,8 +58,8 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
# Get binary file # Get binary file
binary_file = os.path.join(dut1.app.binary_path, "tests.bin") binary_file = os.path.join(dut1.app.binary_path, "tests.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("http_server_bin_size", bin_size//1024) IDF.check_performance("http_server_bin_size", bin_size // 1024)
# Upload binary and start testing # Upload binary and start testing
Utility.console_log("Starting http_server advanced test app") Utility.console_log("Starting http_server advanced test app")
@ -69,8 +70,10 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
got_ip = dut1.expect(re.compile(r"(?:[\s\S]*)Got IP: '(\d+.\d+.\d+.\d+)'"), timeout=30)[0] got_ip = dut1.expect(re.compile(r"(?:[\s\S]*)Got IP: '(\d+.\d+.\d+.\d+)'"), timeout=30)[0]
got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'"), timeout=15)[0] got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'"), timeout=15)[0]
result = dut1.expect(re.compile(r"(?:[\s\S]*)Max URI handlers: '(\d+)'(?:[\s\S]*)Max Open Sessions: '(\d+)'(?:[\s\S]*)Max Header Length: '(\d+)'(?:[\s\S]*)Max URI Length: '(\d+)'(?:[\s\S]*)Max Stack Size: '(\d+)'"), timeout=15) result = dut1.expect(re.compile(r"(?:[\s\S]*)Max URI handlers: '(\d+)'(?:[\s\S]*)Max Open Sessions: " # noqa: W605
max_uri_handlers = int(result[0]) r"'(\d+)'(?:[\s\S]*)Max Header Length: '(\d+)'(?:[\s\S]*)Max URI Length: "
r"'(\d+)'(?:[\s\S]*)Max Stack Size: '(\d+)'"), timeout=15)
# max_uri_handlers = int(result[0])
max_sessions = int(result[1]) max_sessions = int(result[1])
max_hdr_len = int(result[2]) max_hdr_len = int(result[2])
max_uri_len = int(result[3]) max_uri_len = int(result[3])
@ -95,9 +98,9 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
if not client.recv_timeout_test(got_ip, got_port): if not client.recv_timeout_test(got_ip, got_port):
failed = True failed = True
## This test fails a lot! Enable when connection is stable # This test fails a lot! Enable when connection is stable
#test_size = 50*1024 # 50KB # test_size = 50*1024 # 50KB
#if not client.packet_size_limit_test(got_ip, got_port, test_size): # if not client.packet_size_limit_test(got_ip, got_port, test_size):
# Utility.console_log("Ignoring failure") # Utility.console_log("Ignoring failure")
Utility.console_log("Getting initial stack usage...") Utility.console_log("Getting initial stack usage...")
@ -106,7 +109,7 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
inital_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0]) inital_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0])
if inital_stack < 0.1*max_stack_size: if inital_stack < 0.1 * max_stack_size:
Utility.console_log("More than 90% of stack being used on server start") Utility.console_log("More than 90% of stack being used on server start")
failed = True failed = True
@ -158,12 +161,13 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
final_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0]) final_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0])
if final_stack < 0.05*max_stack_size: if final_stack < 0.05 * max_stack_size:
Utility.console_log("More than 95% of stack got used during tests") Utility.console_log("More than 95% of stack got used during tests")
failed = True failed = True
if failed: if failed:
raise RuntimeError raise RuntimeError
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_http_server_advanced() test_examples_protocol_http_server_advanced()

View file

@ -94,7 +94,7 @@
# - Server should automatically close the socket # - Server should automatically close the socket
############# TODO TESTS ############# # ############ TODO TESTS #############
# 3. Stress Tests # 3. Stress Tests
# #
@ -131,8 +131,6 @@
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import str from builtins import str
from builtins import range from builtins import range
from builtins import object from builtins import object
@ -148,9 +146,10 @@ import Utility
_verbose_ = False _verbose_ = False
class Session(object): class Session(object):
def __init__(self, addr, port, timeout = 15): def __init__(self, addr, port, timeout=15):
self.client = socket.create_connection((addr, int(port)), timeout = timeout) self.client = socket.create_connection((addr, int(port)), timeout=timeout)
self.target = addr self.target = addr
self.status = 0 self.status = 0
self.encoding = '' self.encoding = ''
@ -160,7 +159,7 @@ class Session(object):
def send_err_check(self, request, data=None): def send_err_check(self, request, data=None):
rval = True rval = True
try: try:
self.client.sendall(request.encode()); self.client.sendall(request.encode())
if data: if data:
self.client.sendall(data.encode()) self.client.sendall(data.encode())
except socket.error as err: except socket.error as err:
@ -173,24 +172,24 @@ class Session(object):
request = "GET " + path + " HTTP/1.1\r\nHost: " + self.target request = "GET " + path + " HTTP/1.1\r\nHost: " + self.target
if headers: if headers:
for field, value in headers.items(): for field, value in headers.items():
request += "\r\n"+field+": "+value request += "\r\n" + field + ": " + value
request += "\r\n\r\n" request += "\r\n\r\n"
return self.send_err_check(request) return self.send_err_check(request)
def send_put(self, path, data, headers=None): def send_put(self, path, data, headers=None):
request = "PUT " + path + " HTTP/1.1\r\nHost: " + self.target request = "PUT " + path + " HTTP/1.1\r\nHost: " + self.target
if headers: if headers:
for field, value in headers.items(): for field, value in headers.items():
request += "\r\n"+field+": "+value request += "\r\n" + field + ": " + value
request += "\r\nContent-Length: " + str(len(data)) +"\r\n\r\n" request += "\r\nContent-Length: " + str(len(data)) + "\r\n\r\n"
return self.send_err_check(request, data) return self.send_err_check(request, data)
def send_post(self, path, data, headers=None): def send_post(self, path, data, headers=None):
request = "POST " + path + " HTTP/1.1\r\nHost: " + self.target request = "POST " + path + " HTTP/1.1\r\nHost: " + self.target
if headers: if headers:
for field, value in headers.items(): for field, value in headers.items():
request += "\r\n"+field+": "+value request += "\r\n" + field + ": " + value
request += "\r\nContent-Length: " + str(len(data)) +"\r\n\r\n" request += "\r\nContent-Length: " + str(len(data)) + "\r\n\r\n"
return self.send_err_check(request, data) return self.send_err_check(request, data)
def read_resp_hdrs(self): def read_resp_hdrs(self):
@ -284,6 +283,7 @@ class Session(object):
def close(self): def close(self):
self.client.close() self.client.close()
def test_val(text, expected, received): def test_val(text, expected, received):
if expected != received: if expected != received:
Utility.console_log(" Fail!") Utility.console_log(" Fail!")
@ -293,6 +293,7 @@ def test_val(text, expected, received):
return False return False
return True return True
class adder_thread (threading.Thread): class adder_thread (threading.Thread):
def __init__(self, id, dut, port): def __init__(self, id, dut, port):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -329,6 +330,7 @@ class adder_thread (threading.Thread):
def close(self): def close(self):
self.session.close() self.session.close()
def get_hello(dut, port): def get_hello(dut, port):
# GET /hello should return 'Hello World!' # GET /hello should return 'Hello World!'
Utility.console_log("[test] GET /hello returns 'Hello World!' =>", end=' ') Utility.console_log("[test] GET /hello returns 'Hello World!' =>", end=' ')
@ -348,6 +350,7 @@ def get_hello(dut, port):
conn.close() conn.close()
return True return True
def put_hello(dut, port): def put_hello(dut, port):
# PUT /hello returns 405' # PUT /hello returns 405'
Utility.console_log("[test] PUT /hello returns 405 =>", end=' ') Utility.console_log("[test] PUT /hello returns 405 =>", end=' ')
@ -361,6 +364,7 @@ def put_hello(dut, port):
conn.close() conn.close()
return True return True
def post_hello(dut, port): def post_hello(dut, port):
# POST /hello returns 405' # POST /hello returns 405'
Utility.console_log("[test] POST /hello returns 404 =>", end=' ') Utility.console_log("[test] POST /hello returns 404 =>", end=' ')
@ -374,6 +378,7 @@ def post_hello(dut, port):
conn.close() conn.close()
return True return True
def post_echo(dut, port): def post_echo(dut, port):
# POST /echo echoes data' # POST /echo echoes data'
Utility.console_log("[test] POST /echo echoes data =>", end=' ') Utility.console_log("[test] POST /echo echoes data =>", end=' ')
@ -390,6 +395,7 @@ def post_echo(dut, port):
conn.close() conn.close()
return True return True
def put_echo(dut, port): def put_echo(dut, port):
# PUT /echo echoes data' # PUT /echo echoes data'
Utility.console_log("[test] PUT /echo echoes data =>", end=' ') Utility.console_log("[test] PUT /echo echoes data =>", end=' ')
@ -406,6 +412,7 @@ def put_echo(dut, port):
conn.close() conn.close()
return True return True
def get_echo(dut, port): def get_echo(dut, port):
# GET /echo returns 404' # GET /echo returns 404'
Utility.console_log("[test] GET /echo returns 405 =>", end=' ') Utility.console_log("[test] GET /echo returns 405 =>", end=' ')
@ -419,6 +426,7 @@ def get_echo(dut, port):
conn.close() conn.close()
return True return True
def get_hello_type(dut, port): def get_hello_type(dut, port):
# GET /hello/type_html returns text/html as Content-Type' # GET /hello/type_html returns text/html as Content-Type'
Utility.console_log("[test] GET /hello/type_html has Content-Type of text/html =>", end=' ') Utility.console_log("[test] GET /hello/type_html has Content-Type of text/html =>", end=' ')
@ -438,6 +446,7 @@ def get_hello_type(dut, port):
conn.close() conn.close()
return True return True
def get_hello_status(dut, port): def get_hello_status(dut, port):
# GET /hello/status_500 returns status 500' # GET /hello/status_500 returns status 500'
Utility.console_log("[test] GET /hello/status_500 returns status 500 =>", end=' ') Utility.console_log("[test] GET /hello/status_500 returns status 500 =>", end=' ')
@ -451,6 +460,7 @@ def get_hello_status(dut, port):
conn.close() conn.close()
return True return True
def get_false_uri(dut, port): def get_false_uri(dut, port):
# GET /false_uri returns status 404' # GET /false_uri returns status 404'
Utility.console_log("[test] GET /false_uri returns status 404 =>", end=' ') Utility.console_log("[test] GET /false_uri returns status 404 =>", end=' ')
@ -464,6 +474,7 @@ def get_false_uri(dut, port):
conn.close() conn.close()
return True return True
def parallel_sessions_adder(dut, port, max_sessions): def parallel_sessions_adder(dut, port, max_sessions):
# POSTs on /adder in parallel sessions # POSTs on /adder in parallel sessions
Utility.console_log("[test] POST {pipelined} on /adder in " + str(max_sessions) + " sessions =>", end=' ') Utility.console_log("[test] POST {pipelined} on /adder in " + str(max_sessions) + " sessions =>", end=' ')
@ -487,6 +498,7 @@ def parallel_sessions_adder(dut, port, max_sessions):
Utility.console_log("Success") Utility.console_log("Success")
return res return res
def async_response_test(dut, port): def async_response_test(dut, port):
# Test that an asynchronous work is executed in the HTTPD's context # Test that an asynchronous work is executed in the HTTPD's context
# This is tested by reading two responses over the same session # This is tested by reading two responses over the same session
@ -506,6 +518,7 @@ def async_response_test(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def leftover_data_test(dut, port): def leftover_data_test(dut, port):
# Leftover data in POST is purged (valid and invalid URIs) # Leftover data in POST is purged (valid and invalid URIs)
Utility.console_log("[test] Leftover data in POST is purged (valid and invalid URIs) =>", end=' ') Utility.console_log("[test] Leftover data in POST is purged (valid and invalid URIs) =>", end=' ')
@ -540,6 +553,7 @@ def leftover_data_test(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def spillover_session(dut, port, max_sess): def spillover_session(dut, port, max_sess):
# Session max_sess_sessions + 1 is rejected # Session max_sess_sessions + 1 is rejected
Utility.console_log("[test] Session max_sess_sessions (" + str(max_sess) + ") + 1 is rejected =>", end=' ') Utility.console_log("[test] Session max_sess_sessions (" + str(max_sess) + ") + 1 is rejected =>", end=' ')
@ -556,7 +570,7 @@ def spillover_session(dut, port, max_sess):
a.close() a.close()
break break
s.append(a) s.append(a)
except: except Exception:
if (_verbose_): if (_verbose_):
Utility.console_log("Connection " + str(i) + " rejected") Utility.console_log("Connection " + str(i) + " rejected")
a.close() a.close()
@ -570,6 +584,7 @@ def spillover_session(dut, port, max_sess):
Utility.console_log(["Fail","Success"][len(s) == max_sess]) Utility.console_log(["Fail","Success"][len(s) == max_sess])
return (len(s) == max_sess) return (len(s) == max_sess)
def recv_timeout_test(dut, port): def recv_timeout_test(dut, port):
Utility.console_log("[test] Timeout occurs if partial packet sent =>", end=' ') Utility.console_log("[test] Timeout occurs if partial packet sent =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
@ -583,6 +598,7 @@ def recv_timeout_test(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def packet_size_limit_test(dut, port, test_size): def packet_size_limit_test(dut, port, test_size):
Utility.console_log("[test] send size limit test =>", end=' ') Utility.console_log("[test] send size limit test =>", end=' ')
retry = 5 retry = 5
@ -590,14 +606,14 @@ def packet_size_limit_test(dut, port, test_size):
retry -= 1 retry -= 1
Utility.console_log("data size = ", test_size) Utility.console_log("data size = ", test_size)
s = http.client.HTTPConnection(dut + ":" + port, timeout=15) s = http.client.HTTPConnection(dut + ":" + port, timeout=15)
random_data = ''.join(string.printable[random.randint(0,len(string.printable))-1] for _ in list(range(test_size))) random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in list(range(test_size)))
path = "/echo" path = "/echo"
s.request("POST", url=path, body=random_data) s.request("POST", url=path, body=random_data)
resp = s.getresponse() resp = s.getresponse()
if not test_val("Error", "200", str(resp.status)): if not test_val("Error", "200", str(resp.status)):
if test_val("Error", "500", str(resp.status)): if test_val("Error", "500", str(resp.status)):
Utility.console_log("Data too large to be allocated") Utility.console_log("Data too large to be allocated")
test_size = test_size//10 test_size = test_size // 10
else: else:
Utility.console_log("Unexpected error") Utility.console_log("Unexpected error")
s.close() s.close()
@ -616,6 +632,7 @@ def packet_size_limit_test(dut, port, test_size):
Utility.console_log("Failed") Utility.console_log("Failed")
return False return False
def code_500_server_error_test(dut, port): def code_500_server_error_test(dut, port):
Utility.console_log("[test] 500 Server Error test =>", end=' ') Utility.console_log("[test] 500 Server Error test =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
@ -623,7 +640,7 @@ def code_500_server_error_test(dut, port):
content_len = 2**31 content_len = 2**31
s.client.sendall(("POST /echo HTTP/1.1\r\nHost: " + dut + "\r\nContent-Length: " + str(content_len) + "\r\n\r\nABCD").encode()) s.client.sendall(("POST /echo HTTP/1.1\r\nHost: " + dut + "\r\nContent-Length: " + str(content_len) + "\r\n\r\nABCD").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Server Error", "500", s.status): if not test_val("Server Error", "500", s.status):
s.close() s.close()
return False return False
@ -631,17 +648,18 @@ def code_500_server_error_test(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_501_method_not_impl(dut, port): def code_501_method_not_impl(dut, port):
Utility.console_log("[test] 501 Method Not Implemented =>", end=' ') Utility.console_log("[test] 501 Method Not Implemented =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/hello" path = "/hello"
s.client.sendall(("ABC " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode()) s.client.sendall(("ABC " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
# Presently server sends back 400 Bad Request # Presently server sends back 400 Bad Request
#if not test_val("Server Error", "501", s.status): # if not test_val("Server Error", "501", s.status):
#s.close() # s.close()
#return False # return False
if not test_val("Server Error", "400", s.status): if not test_val("Server Error", "400", s.status):
s.close() s.close()
return False return False
@ -649,13 +667,14 @@ def code_501_method_not_impl(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_505_version_not_supported(dut, port): def code_505_version_not_supported(dut, port):
Utility.console_log("[test] 505 Version Not Supported =>", end=' ') Utility.console_log("[test] 505 Version Not Supported =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/hello" path = "/hello"
s.client.sendall(("GET " + path + " HTTP/2.0\r\nHost: " + dut + "\r\n\r\n").encode()) s.client.sendall(("GET " + path + " HTTP/2.0\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Server Error", "505", s.status): if not test_val("Server Error", "505", s.status):
s.close() s.close()
return False return False
@ -663,13 +682,14 @@ def code_505_version_not_supported(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_400_bad_request(dut, port): def code_400_bad_request(dut, port):
Utility.console_log("[test] 400 Bad Request =>", end=' ') Utility.console_log("[test] 400 Bad Request =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/hello" path = "/hello"
s.client.sendall(("XYZ " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode()) s.client.sendall(("XYZ " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Client Error", "400", s.status): if not test_val("Client Error", "400", s.status):
s.close() s.close()
return False return False
@ -677,13 +697,14 @@ def code_400_bad_request(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_404_not_found(dut, port): def code_404_not_found(dut, port):
Utility.console_log("[test] 404 Not Found =>", end=' ') Utility.console_log("[test] 404 Not Found =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/dummy" path = "/dummy"
s.client.sendall(("GET " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode()) s.client.sendall(("GET " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Client Error", "404", s.status): if not test_val("Client Error", "404", s.status):
s.close() s.close()
return False return False
@ -691,13 +712,14 @@ def code_404_not_found(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_405_method_not_allowed(dut, port): def code_405_method_not_allowed(dut, port):
Utility.console_log("[test] 405 Method Not Allowed =>", end=' ') Utility.console_log("[test] 405 Method Not Allowed =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/hello" path = "/hello"
s.client.sendall(("POST " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode()) s.client.sendall(("POST " + path + " HTTP/1.1\r\nHost: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Client Error", "405", s.status): if not test_val("Client Error", "405", s.status):
s.close() s.close()
return False return False
@ -705,12 +727,13 @@ def code_405_method_not_allowed(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_408_req_timeout(dut, port): def code_408_req_timeout(dut, port):
Utility.console_log("[test] 408 Request Timeout =>", end=' ') Utility.console_log("[test] 408 Request Timeout =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
s.client.sendall(("POST /echo HTTP/1.1\r\nHost: " + dut + "\r\nContent-Length: 10\r\n\r\nABCD").encode()) s.client.sendall(("POST /echo HTTP/1.1\r\nHost: " + dut + "\r\nContent-Length: 10\r\n\r\nABCD").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Client Error", "408", s.status): if not test_val("Client Error", "408", s.status):
s.close() s.close()
return False return False
@ -718,17 +741,18 @@ def code_408_req_timeout(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def code_411_length_required(dut, port): def code_411_length_required(dut, port):
Utility.console_log("[test] 411 Length Required =>", end=' ') Utility.console_log("[test] 411 Length Required =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/echo" path = "/echo"
s.client.sendall(("POST " + path + " HTTP/1.1\r\nHost: " + dut + "\r\nContent-Type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n").encode()) s.client.sendall(("POST " + path + " HTTP/1.1\r\nHost: " + dut + "\r\nContent-Type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
# Presently server sends back 400 Bad Request # Presently server sends back 400 Bad Request
#if not test_val("Client Error", "411", s.status): # if not test_val("Client Error", "411", s.status):
#s.close() # s.close()
#return False # return False
if not test_val("Client Error", "400", s.status): if not test_val("Client Error", "400", s.status):
s.close() s.close()
return False return False
@ -736,21 +760,23 @@ def code_411_length_required(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def send_getx_uri_len(dut, port, length): def send_getx_uri_len(dut, port, length):
s = Session(dut, port) s = Session(dut, port)
method = "GET " method = "GET "
version = " HTTP/1.1\r\n" version = " HTTP/1.1\r\n"
path = "/"+"x"*(length - len(method) - len(version) - len("/")) path = "/" + "x" * (length - len(method) - len(version) - len("/"))
s.client.sendall(method.encode()) s.client.sendall(method.encode())
time.sleep(1) time.sleep(1)
s.client.sendall(path.encode()) s.client.sendall(path.encode())
time.sleep(1) time.sleep(1)
s.client.sendall((version + "Host: " + dut + "\r\n\r\n").encode()) s.client.sendall((version + "Host: " + dut + "\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
s.close() s.close()
return s.status return s.status
def code_414_uri_too_long(dut, port, max_uri_len): def code_414_uri_too_long(dut, port, max_uri_len):
Utility.console_log("[test] 414 URI Too Long =>", end=' ') Utility.console_log("[test] 414 URI Too Long =>", end=' ')
status = send_getx_uri_len(dut, port, max_uri_len) status = send_getx_uri_len(dut, port, max_uri_len)
@ -762,16 +788,17 @@ def code_414_uri_too_long(dut, port, max_uri_len):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def send_postx_hdr_len(dut, port, length): def send_postx_hdr_len(dut, port, length):
s = Session(dut, port) s = Session(dut, port)
path = "/echo" path = "/echo"
host = "Host: " + dut host = "Host: " + dut
custom_hdr_field = "\r\nCustom: " custom_hdr_field = "\r\nCustom: "
custom_hdr_val = "x"*(length - len(host) - len(custom_hdr_field) - len("\r\n\r\n") + len("0")) custom_hdr_val = "x" * (length - len(host) - len(custom_hdr_field) - len("\r\n\r\n") + len("0"))
request = ("POST " + path + " HTTP/1.1\r\n" + host + custom_hdr_field + custom_hdr_val + "\r\n\r\n").encode() request = ("POST " + path + " HTTP/1.1\r\n" + host + custom_hdr_field + custom_hdr_val + "\r\n\r\n").encode()
s.client.sendall(request[:length//2]) s.client.sendall(request[:length // 2])
time.sleep(1) time.sleep(1)
s.client.sendall(request[length//2:]) s.client.sendall(request[length // 2:])
hdr = s.read_resp_hdrs() hdr = s.read_resp_hdrs()
resp = s.read_resp_data() resp = s.read_resp_data()
s.close() s.close()
@ -779,6 +806,7 @@ def send_postx_hdr_len(dut, port, length):
return (hdr["Custom"] == custom_hdr_val), resp return (hdr["Custom"] == custom_hdr_val), resp
return False, s.status return False, s.status
def code_431_hdr_too_long(dut, port, max_hdr_len): def code_431_hdr_too_long(dut, port, max_hdr_len):
Utility.console_log("[test] 431 Header Too Long =>", end=' ') Utility.console_log("[test] 431 Header Too Long =>", end=' ')
res, status = send_postx_hdr_len(dut, port, max_hdr_len) res, status = send_postx_hdr_len(dut, port, max_hdr_len)
@ -790,13 +818,14 @@ def code_431_hdr_too_long(dut, port, max_hdr_len):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
def test_upgrade_not_supported(dut, port): def test_upgrade_not_supported(dut, port):
Utility.console_log("[test] Upgrade Not Supported =>", end=' ') Utility.console_log("[test] Upgrade Not Supported =>", end=' ')
s = Session(dut, port) s = Session(dut, port)
path = "/hello" # path = "/hello"
s.client.sendall(("OPTIONS * HTTP/1.1\r\nHost:" + dut + "\r\nUpgrade: TLS/1.0\r\nConnection: Upgrade\r\n\r\n").encode()) s.client.sendall(("OPTIONS * HTTP/1.1\r\nHost:" + dut + "\r\nUpgrade: TLS/1.0\r\nConnection: Upgrade\r\n\r\n").encode())
s.read_resp_hdrs() s.read_resp_hdrs()
resp = s.read_resp_data() s.read_resp_data()
if not test_val("Client Error", "200", s.status): if not test_val("Client Error", "200", s.status):
s.close() s.close()
return False return False
@ -804,8 +833,9 @@ def test_upgrade_not_supported(dut, port):
Utility.console_log("Success") Utility.console_log("Success")
return True return True
if __name__ == '__main__': if __name__ == '__main__':
########### Execution begins here... # Execution begins here...
# Configuration # Configuration
# Max number of threads/sessions # Max number of threads/sessions
max_sessions = 7 max_sessions = 7
@ -849,7 +879,7 @@ if __name__ == '__main__':
test_upgrade_not_supported(dut, port) test_upgrade_not_supported(dut, port)
# Not supported yet (Error on chunked request) # Not supported yet (Error on chunked request)
###code_411_length_required(dut, port) # code_411_length_required(dut, port)
Utility.console_log("### Sessions and Context Tests") Utility.console_log("### Sessions and Context Tests")
parallel_sessions_adder(dut, port, max_sessions) parallel_sessions_adder(dut, port, max_sessions)
@ -857,7 +887,7 @@ if __name__ == '__main__':
async_response_test(dut, port) async_response_test(dut, port)
spillover_session(dut, port, max_sessions) spillover_session(dut, port, max_sessions)
recv_timeout_test(dut, port) recv_timeout_test(dut, port)
packet_size_limit_test(dut, port, 50*1024) packet_size_limit_test(dut, port, 50 * 1024)
get_hello(dut, port) get_hello(dut, port)
sys.exit() sys.exit()

View file

@ -23,28 +23,29 @@ import imp
import re import re
import os import os
import sys import sys
import string
import random import random
import socket
# This environment variable is expected on the host machine try:
test_fw_path = os.getenv("TEST_FW_PATH") import IDF
if test_fw_path and test_fw_path not in sys.path: except ImportError:
sys.path.insert(0, test_fw_path) # This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import Utility
# When running on local machine execute the following before running this script # When running on local machine execute the following before running this script
# > make app bootloader # > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config # > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw # > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
import TinyFW
import IDF
import Utility
# Import client module # Import client module
expath = os.path.dirname(os.path.realpath(__file__)) expath = os.path.dirname(os.path.realpath(__file__))
client = imp.load_source("client", expath + "/scripts/adder.py") client = imp.load_source("client", expath + "/scripts/adder.py")
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_http_server_persistence(env, extra_data): def test_examples_protocol_http_server_persistence(env, extra_data):
# Acquire DUT # Acquire DUT
@ -53,8 +54,8 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
# Get binary file # Get binary file
binary_file = os.path.join(dut1.app.binary_path, "persistent_sockets.bin") binary_file = os.path.join(dut1.app.binary_path, "persistent_sockets.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("http_server_bin_size", bin_size//1024) IDF.check_performance("http_server_bin_size", bin_size // 1024)
# Upload binary and start testing # Upload binary and start testing
Utility.console_log("Starting http_server persistance test app") Utility.console_log("Starting http_server persistance test app")
@ -97,7 +98,7 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
dut1.expect("PUT allocating new session", timeout=30) dut1.expect("PUT allocating new session", timeout=30)
# Not expected # Not expected
raise RuntimeError raise RuntimeError
except: except Exception:
# As expected # As expected
pass pass
@ -126,7 +127,7 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
Utility.console_log("Validating user context data") Utility.console_log("Validating user context data")
# Start another session to check user context data # Start another session to check user context data
conn2 = client.start_session(got_ip, got_port) client.start_session(got_ip, got_port)
num = random.randint(0,100) num = random.randint(0,100)
client.putreq(conn, "/adder", str(num)) client.putreq(conn, "/adder", str(num))
visitor += 1 visitor += 1
@ -136,5 +137,6 @@ def test_examples_protocol_http_server_persistence(env, extra_data):
client.end_session(conn) client.end_session(conn)
dut1.expect("/adder Free Context function called", timeout=30) dut1.expect("/adder Free Context function called", timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_http_server_persistence() test_examples_protocol_http_server_persistence()

View file

@ -16,20 +16,22 @@
from __future__ import print_function from __future__ import print_function
from __future__ import unicode_literals from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import str from builtins import str
from builtins import range from builtins import range
import http.client import http.client
import argparse import argparse
import Utility
def start_session (ip, port):
def start_session(ip, port):
return http.client.HTTPConnection(ip, int(port), timeout=15) return http.client.HTTPConnection(ip, int(port), timeout=15)
def end_session (conn):
def end_session(conn):
conn.close() conn.close()
def getreq (conn, path, verbose = False):
def getreq(conn, path, verbose=False):
conn.request("GET", path) conn.request("GET", path)
resp = conn.getresponse() resp = conn.getresponse()
data = resp.read() data = resp.read()
@ -41,7 +43,8 @@ def getreq (conn, path, verbose = False):
Utility.console_log("Data content : " + data) Utility.console_log("Data content : " + data)
return data return data
def postreq (conn, path, data, verbose = False):
def postreq(conn, path, data, verbose=False):
conn.request("POST", path, data) conn.request("POST", path, data)
resp = conn.getresponse() resp = conn.getresponse()
data = resp.read() data = resp.read()
@ -53,7 +56,8 @@ def postreq (conn, path, data, verbose = False):
Utility.console_log("Data content : " + data) Utility.console_log("Data content : " + data)
return data return data
def putreq (conn, path, body, verbose = False):
def putreq(conn, path, body, verbose=False):
conn.request("PUT", path, body) conn.request("PUT", path, body)
resp = conn.getresponse() resp = conn.getresponse()
data = resp.read() data = resp.read()
@ -65,12 +69,13 @@ def putreq (conn, path, body, verbose = False):
Utility.console_log("Data content : " + data) Utility.console_log("Data content : " + data)
return data return data
if __name__ == '__main__': if __name__ == '__main__':
# Configure argument parser # Configure argument parser
parser = argparse.ArgumentParser(description='Run HTTPd Test') parser = argparse.ArgumentParser(description='Run HTTPd Test')
parser.add_argument('IP' , metavar='IP' , type=str, help='Server IP') parser.add_argument('IP', metavar='IP', type=str, help='Server IP')
parser.add_argument('port', metavar='port', type=str, help='Server port') parser.add_argument('port', metavar='port', type=str, help='Server port')
parser.add_argument('N' , metavar='integer', type=int, help='Integer to sum upto') parser.add_argument('N', metavar='integer', type=int, help='Integer to sum upto')
args = vars(parser.parse_args()) args = vars(parser.parse_args())
# Get arguments # Get arguments
@ -80,21 +85,21 @@ if __name__ == '__main__':
# Establish HTTP connection # Establish HTTP connection
Utility.console_log("Connecting to => " + ip + ":" + port) Utility.console_log("Connecting to => " + ip + ":" + port)
conn = start_session (ip, port) conn = start_session(ip, port)
# Reset adder context to specified value(0) # Reset adder context to specified value(0)
# -- Not needed as new connection will always # -- Not needed as new connection will always
# -- have zero value of the accumulator # -- have zero value of the accumulator
Utility.console_log("Reset the accumulator to 0") Utility.console_log("Reset the accumulator to 0")
putreq (conn, "/adder", str(0)) putreq(conn, "/adder", str(0))
# Sum numbers from 1 to specified value(N) # Sum numbers from 1 to specified value(N)
Utility.console_log("Summing numbers from 1 to " + str(N)) Utility.console_log("Summing numbers from 1 to " + str(N))
for i in range(1, N+1): for i in range(1, N + 1):
postreq (conn, "/adder", str(i)) postreq(conn, "/adder", str(i))
# Fetch the result # Fetch the result
Utility.console_log("Result :" + getreq (conn, "/adder")) Utility.console_log("Result :" + getreq(conn, "/adder"))
# Close HTTP connection # Close HTTP connection
end_session (conn) end_session(conn)

View file

@ -24,26 +24,29 @@ import os
import sys import sys
import string import string
import random import random
import socket
# This environment variable is expected on the host machine try:
test_fw_path = os.getenv("TEST_FW_PATH") import IDF
if test_fw_path and test_fw_path not in sys.path: except ImportError:
sys.path.insert(0, test_fw_path) # This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import Utility
# When running on local machine execute the following before running this script # When running on local machine execute the following before running this script
# > make app bootloader # > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config # > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw # > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
import TinyFW
import IDF
import Utility
# Import client module # Import client module
expath = os.path.dirname(os.path.realpath(__file__)) expath = os.path.dirname(os.path.realpath(__file__))
client = imp.load_source("client", expath + "/scripts/client.py") client = imp.load_source("client", expath + "/scripts/client.py")
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_http_server_simple(env, extra_data): def test_examples_protocol_http_server_simple(env, extra_data):
# Acquire DUT # Acquire DUT
@ -52,8 +55,8 @@ def test_examples_protocol_http_server_simple(env, extra_data):
# Get binary file # Get binary file
binary_file = os.path.join(dut1.app.binary_path, "simple.bin") binary_file = os.path.join(dut1.app.binary_path, "simple.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("http_server_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("http_server_bin_size", bin_size//1024) IDF.check_performance("http_server_bin_size", bin_size // 1024)
# Upload binary and start testing # Upload binary and start testing
Utility.console_log("Starting http_server simple test app") Utility.console_log("Starting http_server simple test app")
@ -77,7 +80,7 @@ def test_examples_protocol_http_server_simple(env, extra_data):
raise RuntimeError raise RuntimeError
# Acquire host IP. Need a way to check it # Acquire host IP. Need a way to check it
host_ip = dut1.expect(re.compile(r"(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)"), timeout=30)[0] dut1.expect(re.compile(r"(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)"), timeout=30)[0]
# Match additional headers sent in the request # Match additional headers sent in the request
dut1.expect("Found header => Test-Header-2: Test-Value-2", timeout=30) dut1.expect("Found header => Test-Header-2: Test-Value-2", timeout=30)
@ -94,7 +97,7 @@ def test_examples_protocol_http_server_simple(env, extra_data):
dut1.expect("Registering /hello and /echo URIs", timeout=30) dut1.expect("Registering /hello and /echo URIs", timeout=30)
# Generate random data of 10KB # Generate random data of 10KB
random_data = ''.join(string.printable[random.randint(0,len(string.printable))-1] for _ in range(10*1024)) random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in range(10 * 1024))
Utility.console_log("Test /echo POST handler with random data") Utility.console_log("Test /echo POST handler with random data")
if not client.test_post_handler(got_ip, got_port, random_data): if not client.test_post_handler(got_ip, got_port, random_data):
raise RuntimeError raise RuntimeError
@ -117,5 +120,6 @@ def test_examples_protocol_http_server_simple(env, extra_data):
raise RuntimeError raise RuntimeError
dut1.expect("400 Bad Request - Server unable to understand request due to invalid syntax", timeout=30) dut1.expect("400 Bad Request - Server unable to understand request due to invalid syntax", timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_http_server_simple() test_examples_protocol_http_server_simple()

View file

@ -16,21 +16,22 @@
from __future__ import print_function from __future__ import print_function
from __future__ import unicode_literals from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import str from builtins import str
import http.client import http.client
import argparse import argparse
import Utility
def verbose_print(verbosity, *args): def verbose_print(verbosity, *args):
if (verbosity): if (verbosity):
Utility.console_log(''.join(str(elems) for elems in args)) Utility.console_log(''.join(str(elems) for elems in args))
def test_get_handler(ip, port, verbosity = False):
def test_get_handler(ip, port, verbosity=False):
verbose_print(verbosity, "======== GET HANDLER TEST =============") verbose_print(verbosity, "======== GET HANDLER TEST =============")
# Establish HTTP connection # Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port) verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15) sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
uri = "/hello?query1=value1&query2=value2&query3=value3" uri = "/hello?query1=value1&query2=value2&query3=value3"
# GET hello response # GET hello response
@ -48,7 +49,7 @@ def test_get_handler(ip, port, verbosity = False):
return False return False
if resp.getheader("Custom-Header-2") != "Custom-Value-2": if resp.getheader("Custom-Header-2") != "Custom-Value-2":
return False return False
except: except Exception:
return False return False
verbose_print(verbosity, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv") verbose_print(verbosity, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
@ -63,11 +64,12 @@ def test_get_handler(ip, port, verbosity = False):
sess.close() sess.close()
return (resp_data == "Hello World!") return (resp_data == "Hello World!")
def test_post_handler(ip, port, msg, verbosity = False):
def test_post_handler(ip, port, msg, verbosity=False):
verbose_print(verbosity, "======== POST HANDLER TEST ============") verbose_print(verbosity, "======== POST HANDLER TEST ============")
# Establish HTTP connection # Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port) verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15) sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# POST message to /echo and get back response # POST message to /echo and get back response
sess.request("POST", url="/echo", body=msg) sess.request("POST", url="/echo", body=msg)
@ -82,11 +84,12 @@ def test_post_handler(ip, port, msg, verbosity = False):
sess.close() sess.close()
return (resp_data == msg) return (resp_data == msg)
def test_put_handler(ip, port, verbosity = False):
def test_put_handler(ip, port, verbosity=False):
verbose_print(verbosity, "======== PUT HANDLER TEST =============") verbose_print(verbosity, "======== PUT HANDLER TEST =============")
# Establish HTTP connection # Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port) verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15) sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# PUT message to /ctrl to disable /hello URI handler # PUT message to /ctrl to disable /hello URI handler
verbose_print(verbosity, "Disabling /hello handler") verbose_print(verbosity, "Disabling /hello handler")
@ -114,11 +117,12 @@ def test_put_handler(ip, port, verbosity = False):
sess.close() sess.close()
return ((resp_data2 == "Hello World!") and (resp_data1 == "This URI doesn't exist")) return ((resp_data2 == "Hello World!") and (resp_data1 == "This URI doesn't exist"))
def test_custom_uri_query(ip, port, query, verbosity = False):
def test_custom_uri_query(ip, port, query, verbosity=False):
verbose_print(verbosity, "======== GET HANDLER TEST =============") verbose_print(verbosity, "======== GET HANDLER TEST =============")
# Establish HTTP connection # Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port) verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout = 15) sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
uri = "/hello?" + query uri = "/hello?" + query
# GET hello response # GET hello response
@ -136,10 +140,11 @@ def test_custom_uri_query(ip, port, query, verbosity = False):
sess.close() sess.close()
return (resp_data == "Hello World!") return (resp_data == "Hello World!")
if __name__ == '__main__': if __name__ == '__main__':
# Configure argument parser # Configure argument parser
parser = argparse.ArgumentParser(description='Run HTTPd Test') parser = argparse.ArgumentParser(description='Run HTTPd Test')
parser.add_argument('IP' , metavar='IP' , type=str, help='Server IP') parser.add_argument('IP', metavar='IP', type=str, help='Server IP')
parser.add_argument('port', metavar='port', type=str, help='Server port') parser.add_argument('port', metavar='port', type=str, help='Server port')
parser.add_argument('msg', metavar='message', type=str, help='Message to be sent to server') parser.add_argument('msg', metavar='message', type=str, help='Message to be sent to server')
args = vars(parser.parse_args()) args = vars(parser.parse_args())
@ -149,9 +154,9 @@ if __name__ == '__main__':
port = args['port'] port = args['port']
msg = args['msg'] msg = args['msg']
if not test_get_handler (ip, port, True): if not test_get_handler(ip, port, True):
Utility.console_log("Failed!") Utility.console_log("Failed!")
if not test_post_handler(ip, port, msg, True): if not test_post_handler(ip, port, msg, True):
Utility.console_log("Failed!") Utility.console_log("Failed!")
if not test_put_handler (ip, port, True): if not test_put_handler(ip, port, True):
Utility.console_log("Failed!") Utility.console_log("Failed!")

View file

@ -2,16 +2,17 @@ import re
import os import os
import sys import sys
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv("TEST_FW_PATH") # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI", ignore=True) @IDF.idf_example_test(env_tag="Example_WIFI", ignore=True)
@ -26,8 +27,8 @@ def test_examples_protocol_https_request(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "https_request.bin") binary_file = os.path.join(dut1.app.binary_path, "https_request.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("https_request_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("https_request_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("https_request_bin_size", bin_size//1024) IDF.check_performance("https_request_bin_size", bin_size // 1024)
# start test # start test
dut1.start_app() dut1.start_app()
dut1.expect("Connection established...", timeout=30) dut1.expect("Connection established...", timeout=30)

View file

@ -3,38 +3,41 @@ import os
import sys import sys
import socket import socket
import time import time
import imp
import struct import struct
import dpkt, dpkt.dns import dpkt
import dpkt.dns
from threading import Thread from threading import Thread
# this is a test case write with tiny-test-fw. # this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw, # to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`, # we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH") try:
if test_fw_path and test_fw_path not in sys.path: import IDF
sys.path.insert(0, test_fw_path) except ImportError:
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW import DUT
import IDF
g_run_server = True g_run_server = True
g_done = False g_done = False
def mdns_server(esp_host): def mdns_server(esp_host):
global g_run_server global g_run_server
global g_done global g_done
UDP_IP="0.0.0.0" UDP_IP = "0.0.0.0"
UDP_PORT=5353 UDP_PORT = 5353
MCAST_GRP = '224.0.0.251' MCAST_GRP = '224.0.0.251'
sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind( (UDP_IP,UDP_PORT) ) sock.bind((UDP_IP,UDP_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01') dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
@ -47,30 +50,31 @@ def mdns_server(esp_host):
arr.cls = dpkt.dns.DNS_IN arr.cls = dpkt.dns.DNS_IN
arr.type = dpkt.dns.DNS_A arr.type = dpkt.dns.DNS_A
arr.name = u'tinytester.local' arr.name = u'tinytester.local'
arr.ip =socket.inet_aton('127.0.0.1') arr.ip = socket.inet_aton('127.0.0.1')
resp_dns. an.append(arr) resp_dns. an.append(arr)
sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT)) sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
while g_run_server: while g_run_server:
try: try:
m=sock.recvfrom( 1024 ); m = sock.recvfrom(1024)
dns = dpkt.dns.DNS(m[0]) dns = dpkt.dns.DNS(m[0])
if len(dns.qd)>0 and dns.qd[0].type == dpkt.dns.DNS_A: if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
if dns.qd[0].name == u'tinytester.local': if dns.qd[0].name == u'tinytester.local':
print (dns.__repr__(),dns.qd[0].name) print(dns.__repr__(),dns.qd[0].name)
sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT)) sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
if len(dns.an)>0 and dns.an[0].type == dpkt.dns.DNS_A: if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
if dns.an[0].name == esp_host + u'.local': if dns.an[0].name == esp_host + u'.local':
print("Received answer esp32-mdns query") print("Received answer esp32-mdns query")
g_done = True g_done = True
print (dns.an[0].name) print(dns.an[0].name)
dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01') dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
dns.qd[0].name= esp_host + u'.local' dns.qd[0].name = esp_host + u'.local'
sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT)) sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
print("Sending esp32-mdns query") print("Sending esp32-mdns query")
time.sleep(0.5) time.sleep(0.5)
except socket.timeout: except socket.timeout:
break break
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_mdns(env, extra_data): def test_examples_protocol_mdns(env, extra_data):
global g_run_server global g_run_server
@ -86,20 +90,19 @@ def test_examples_protocol_mdns(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin") binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mdns-test_bin_size", bin_size//1024) IDF.check_performance("mdns-test_bin_size", bin_size // 1024)
# 1. start mdns application # 1. start mdns application
dut1.start_app() dut1.start_app()
# 2. get the dut host name (and IP address) # 2. get the dut host name (and IP address)
specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30) specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
specific_host = str(specific_host[0]) specific_host = str(specific_host[0])
dut_ip = ""
try: try:
dut_ip = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
except DUT.ExpectTimeout: except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
# 3. check the mdns name is accessible # 3. check the mdns name is accessible
thread1 = Thread(target = mdns_server, args = (specific_host,)) thread1 = Thread(target=mdns_server, args=(specific_host,))
thread1.start() thread1.start()
start = time.time() start = time.time()
while (time.time() - start) <= 60: while (time.time() - start) <= 60:
@ -108,10 +111,11 @@ def test_examples_protocol_mdns(env, extra_data):
break break
g_run_server = False g_run_server = False
thread1.join() thread1.join()
if g_done == False: if g_done is False:
raise ValueError('Test has failed: did not receive mdns answer within timeout') raise ValueError('Test has failed: did not receive mdns answer within timeout')
# 4. check DUT output if mdns advertized host is resolved # 4. check DUT output if mdns advertized host is resolved
dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30) dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_mdns() test_examples_protocol_mdns()

View file

@ -2,45 +2,46 @@ import re
import os import os
import sys import sys
import time import time
import socket
import imp
import ssl import ssl
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
g_recv_data="" try:
g_recv_topic="" import IDF
g_broker_connected=0 except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
g_recv_data = ""
g_recv_topic = ""
g_broker_connected = 0
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
global g_broker_connected global g_broker_connected
print("Connected with result code "+str(rc)) print("Connected with result code " + str(rc))
g_broker_connected = 1 g_broker_connected = 1
client.subscribe("/topic/qos0") client.subscribe("/topic/qos0")
# The callback for when a PUBLISH message is received from the server. # The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
global g_recv_topic global g_recv_topic
global g_recv_data global g_recv_data
payload = msg.payload.decode() payload = msg.payload.decode()
if g_recv_data == "" and payload == "data": if g_recv_data == "" and payload == "data":
client.publish("/topic/qos0", "data_to_esp32") client.publish("/topic/qos0", "data_to_esp32")
g_recv_topic = msg.topic g_recv_topic = msg.topic
g_recv_data = payload g_recv_data = payload
print(msg.topic+" "+str(payload)) print(msg.topic + " " + str(payload))
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
@ -48,7 +49,7 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
global g_recv_topic global g_recv_topic
global g_recv_data global g_recv_data
global g_broker_connected global g_broker_connected
broker_url="iot.eclipse.org" broker_url = "iot.eclipse.org"
""" """
steps: | steps: |
1. join AP and connects to ssl broker 1. join AP and connects to ssl broker
@ -60,8 +61,8 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_ssl.bin") binary_file = os.path.join(dut1.app.binary_path, "mqtt_ssl.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_ssl_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("mqtt_ssl_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_ssl_size", bin_size//1024) IDF.check_performance("mqtt_ssl_size", bin_size // 1024)
# 1. start test (and check the environment is healthy) # 1. start test (and check the environment is healthy)
dut1.start_app() dut1.start_app()
client = None client = None
@ -73,15 +74,15 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
client.on_connect = on_connect client.on_connect = on_connect
client.on_message = on_message client.on_message = on_message
client.tls_set(None, client.tls_set(None,
None, None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None) None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
client.tls_insecure_set(True) client.tls_insecure_set(True)
print("Connecting...") print("Connecting...")
client.connect(broker_url, 8883, 60) client.connect(broker_url, 8883, 60)
print("...done") print("...done")
except DUT.ExpectTimeout: except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except: except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0])) print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
raise raise
print("Start Looping...") print("Start Looping...")
@ -92,7 +93,7 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
if g_broker_connected == 0: if g_broker_connected == 0:
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url)) raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
# 3. check the message received back from the server # 3. check the message received back from the server
if g_recv_topic == "/topic/qos0" and g_recv_data == "data" : if g_recv_topic == "/topic/qos0" and g_recv_data == "data":
print("PASS: Received correct message") print("PASS: Received correct message")
else: else:
print("Failure!") print("Failure!")
@ -100,5 +101,6 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
# 4. check that the esp32 client received data sent by this python client # 4. check that the esp32 client received data sent by this python client
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30) dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_mqtt_ssl() test_examples_protocol_mqtt_ssl()

View file

@ -1,36 +1,53 @@
import re import re
import os import os
import sys import sys
from socket import * import socket
from threading import Thread from threading import Thread
import struct import struct
import time import time
msgid=-1
try:
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
msgid = -1
def get_my_ip(): def get_my_ip():
s1 = socket(AF_INET, SOCK_DGRAM) s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(("8.8.8.8", 80)) s1.connect(("8.8.8.8", 80))
my_ip = s1.getsockname()[0] my_ip = s1.getsockname()[0]
s1.close() s1.close()
return my_ip return my_ip
def mqqt_server_sketch(my_ip, port): def mqqt_server_sketch(my_ip, port):
global msgid global msgid
print("Starting the server on {}".format(my_ip)) print("Starting the server on {}".format(my_ip))
s = None s = None
try: try:
s=socket(AF_INET, SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(60) s.settimeout(60)
s.bind((my_ip, port)) s.bind((my_ip, port))
s.listen(1) s.listen(1)
q,addr=s.accept() q,addr = s.accept()
q.settimeout(30) q.settimeout(30)
print("connection accepted") print("connection accepted")
except: except Exception:
print("Local server on {}:{} listening/accepting failure: {}" print("Local server on {}:{} listening/accepting failure: {}"
"Possibly check permissions or firewall settings" "Possibly check permissions or firewall settings"
"to accept connections on this address".format(my_ip, port, sys.exc_info()[0])) "to accept connections on this address".format(my_ip, port, sys.exc_info()[0]))
raise raise
data = q.recv(1024) data = q.recv(1024)
# check if received initial empty message # check if received initial empty message
@ -47,20 +64,6 @@ def mqqt_server_sketch(my_ip, port):
s.close() s.close()
print("server closed") print("server closed")
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_mqtt_qos1(env, extra_data): def test_examples_protocol_mqtt_qos1(env, extra_data):
@ -76,14 +79,14 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_tcp.bin") binary_file = os.path.join(dut1.app.binary_path, "mqtt_tcp.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_tcp_size", bin_size//1024) IDF.check_performance("mqtt_tcp_size", bin_size // 1024)
# 1. start mqtt broker sketch # 1. start mqtt broker sketch
host_ip = get_my_ip() host_ip = get_my_ip()
thread1 = Thread(target = mqqt_server_sketch, args = (host_ip,1883)) thread1 = Thread(target=mqqt_server_sketch, args=(host_ip,1883))
thread1.start() thread1.start()
# 2. start the dut test and wait till client gets IP address # 2. start the dut test and wait till client gets IP address
dut1.start_app() dut1.start_app()
# waiting for getting the IP address # waiting for getting the IP address
try: try:
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
@ -91,10 +94,10 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
except DUT.ExpectTimeout: except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
print ("writing to device: {}".format("mqtt://" + host_ip + "\n")) print("writing to device: {}".format("mqtt://" + host_ip + "\n"))
dut1.write("mqtt://" + host_ip + "\n") dut1.write("mqtt://" + host_ip + "\n")
thread1.join() thread1.join()
print ("Message id received from server: {}".format(msgid)) print("Message id received from server: {}".format(msgid))
# 3. check the message id was enqueued and then deleted # 3. check the message id was enqueued and then deleted
msgid_enqueued = dut1.expect(re.compile(r"OUTBOX: ENQUEUE msgid=([0-9]+)"), timeout=30) msgid_enqueued = dut1.expect(re.compile(r"OUTBOX: ENQUEUE msgid=([0-9]+)"), timeout=30)
msgid_deleted = dut1.expect(re.compile(r"OUTBOX: DELETED msgid=([0-9]+)"), timeout=30) msgid_deleted = dut1.expect(re.compile(r"OUTBOX: DELETED msgid=([0-9]+)"), timeout=30)
@ -105,5 +108,6 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
print("Failure!") print("Failure!")
raise ValueError('Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted)) raise ValueError('Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted))
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_mqtt_qos1() test_examples_protocol_mqtt_qos1()

View file

@ -5,43 +5,46 @@ import re
import os import os
import sys import sys
import time import time
import socket
import imp
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
g_recv_data="" try:
g_recv_topic="" import IDF
g_broker_connected=0 except Exception:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
g_recv_data = ""
g_recv_topic = ""
g_broker_connected = 0
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
global g_broker_connected global g_broker_connected
print("Connected with result code "+str(rc)) print("Connected with result code " + str(rc))
g_broker_connected = 1 g_broker_connected = 1
client.subscribe("/topic/qos0") client.subscribe("/topic/qos0")
# The callback for when a PUBLISH message is received from the server. # The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
global g_recv_topic global g_recv_topic
global g_recv_data global g_recv_data
payload = msg.payload.decode() payload = msg.payload.decode()
if g_recv_data == "" and payload == "data": if g_recv_data == "" and payload == "data":
client.publish("/topic/qos0", "data_to_esp32") client.publish("/topic/qos0", "data_to_esp32")
g_recv_topic = msg.topic g_recv_topic = msg.topic
g_recv_data = payload g_recv_data = payload
print(msg.topic+" "+payload) print(msg.topic + " " + payload)
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
@ -49,7 +52,7 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
global g_recv_topic global g_recv_topic
global g_recv_data global g_recv_data
global g_broker_connected global g_broker_connected
broker_url="iot.eclipse.org" broker_url = "iot.eclipse.org"
""" """
steps: | steps: |
1. join AP and connects to ws broker 1. join AP and connects to ws broker
@ -61,8 +64,8 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket.bin") binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_websocket_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("mqtt_websocket_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_websocket_size", bin_size//1024) IDF.check_performance("mqtt_websocket_size", bin_size // 1024)
# 1. start test (and check the environment is healthy) # 1. start test (and check the environment is healthy)
dut1.start_app() dut1.start_app()
client = None client = None
@ -79,7 +82,7 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
print("...done") print("...done")
except DUT.ExpectTimeout: except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except: except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0])) print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
raise raise
print("Start Looping...") print("Start Looping...")
@ -90,7 +93,7 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
if g_broker_connected == 0: if g_broker_connected == 0:
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url)) raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
# 3. check the message received back from the server # 3. check the message received back from the server
if g_recv_topic == "/topic/qos0" and g_recv_data == "data" : if g_recv_topic == "/topic/qos0" and g_recv_data == "data":
print("PASS: Received correct message") print("PASS: Received correct message")
else: else:
print("Failure!") print("Failure!")
@ -98,5 +101,6 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
# 4. check that the esp32 client received data sent by this python client # 4. check that the esp32 client received data sent by this python client
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30) dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_mqtt_ws() test_examples_protocol_mqtt_ws()

View file

@ -3,45 +3,46 @@ import re
import os import os
import sys import sys
import time import time
import socket
import imp
import ssl import ssl
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
g_recv_data="" try:
g_recv_topic="" import IDF
g_broker_connected=0 except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
g_recv_data = ""
g_recv_topic = ""
g_broker_connected = 0
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
global g_broker_connected global g_broker_connected
print("Connected with result code "+str(rc)) print("Connected with result code " + str(rc))
g_broker_connected = 1 g_broker_connected = 1
client.subscribe("/topic/qos0") client.subscribe("/topic/qos0")
# The callback for when a PUBLISH message is received from the server. # The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
global g_recv_topic global g_recv_topic
global g_recv_data global g_recv_data
payload = msg.payload.decode() payload = msg.payload.decode()
if g_recv_data == "" and payload == "data": if g_recv_data == "" and payload == "data":
client.publish("/topic/qos0", "data_to_esp32") client.publish("/topic/qos0", "data_to_esp32")
g_recv_topic = msg.topic g_recv_topic = msg.topic
g_recv_data = payload g_recv_data = payload
print(msg.topic+" "+str(payload)) print(msg.topic + " " + str(payload))
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import DUT
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
@ -49,7 +50,7 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
global g_recv_topic global g_recv_topic
global g_recv_data global g_recv_data
global g_broker_connected global g_broker_connected
broker_url="iot.eclipse.org" broker_url = "iot.eclipse.org"
""" """
steps: | steps: |
1. join AP and connects to wss broker 1. join AP and connects to wss broker
@ -61,8 +62,8 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket_secure.bin") binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket_secure.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_websocket_secure_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("mqtt_websocket_secure_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mqtt_websocket_secure_size", bin_size//1024) IDF.check_performance("mqtt_websocket_secure_size", bin_size // 1024)
# 1. start test (and check the environment is healthy) # 1. start test (and check the environment is healthy)
dut1.start_app() dut1.start_app()
client = None client = None
@ -74,14 +75,14 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
client.on_connect = on_connect client.on_connect = on_connect
client.on_message = on_message client.on_message = on_message
client.tls_set(None, client.tls_set(None,
None, None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None) None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
print("Connecting...") print("Connecting...")
client.connect(broker_url, 443, 60) client.connect(broker_url, 443, 60)
print("...done") print("...done")
except DUT.ExpectTimeout: except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except: except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0])) print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
raise raise
print("Start Looping...") print("Start Looping...")
@ -92,7 +93,7 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
if g_broker_connected == 0: if g_broker_connected == 0:
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url)) raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
# 3. check the message received back from the server # 3. check the message received back from the server
if g_recv_topic == "/topic/qos0" and g_recv_data == "data" : if g_recv_topic == "/topic/qos0" and g_recv_data == "data":
print("PASS: Received correct message") print("PASS: Received correct message")
else: else:
print("Failure!") print("Failure!")
@ -100,5 +101,6 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
# 4. check that the esp32 client received data sent by this python client # 4. check that the esp32 client received data sent by this python client
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30) dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_mqtt_wss() test_examples_protocol_mqtt_wss()

View file

@ -10,8 +10,8 @@ from builtins import input
import socket import socket
import sys import sys
# ----------- Config ---------- # ----------- Config ----------
PORT = 3333; PORT = 3333
IP_VERSION = 'IPv4' IP_VERSION = 'IPv4'
IPV4 = '192.168.0.167' IPV4 = '192.168.0.167'
IPV6 = 'FE80::32AE:A4FF:FE80:5288' IPV6 = 'FE80::32AE:A4FF:FE80:5288'
@ -31,14 +31,14 @@ try:
sock = socket.socket(family_addr, socket.SOCK_STREAM) sock = socket.socket(family_addr, socket.SOCK_STREAM)
except socket.error as msg: except socket.error as msg:
print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1]) print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1])
sys.exit(1); sys.exit(1)
try: try:
sock.connect((host, PORT)) sock.connect((host, PORT))
except socket.error as msg: except socket.error as msg:
print('Could not open socket: ', msg) print('Could not open socket: ', msg)
sock.close() sock.close()
sys.exit(1); sys.exit(1)
while True: while True:
msg = input('Enter message to send: ') msg = input('Enter message to send: ')
@ -46,6 +46,7 @@ while True:
msg = msg.encode() msg = msg.encode()
sock.sendall(msg) sock.sendall(msg)
data = sock.recv(1024) data = sock.recv(1024)
if not data: break; if not data:
print( 'Reply: ' + data.decode()) break
sock.close() print('Reply: ' + data.decode())
sock.close()

View file

@ -9,9 +9,9 @@
import socket import socket
import sys import sys
# ----------- Config ---------- # ----------- Config ----------
IP_VERSION = 'IPv4' IP_VERSION = 'IPv4'
PORT = 3333; PORT = 3333
# ------------------------------- # -------------------------------
if IP_VERSION == 'IPv4': if IP_VERSION == 'IPv4':
@ -30,7 +30,7 @@ except socket.error as msg:
sys.exit(1) sys.exit(1)
print('Socket created') print('Socket created')
try: try:
sock.bind(('', PORT)) sock.bind(('', PORT))
print('Socket binded') print('Socket binded')
@ -45,7 +45,8 @@ except socket.error as msg:
while True: while True:
data = conn.recv(128) data = conn.recv(128)
if not data: break if not data:
break
data = data.decode() data = data.decode()
print('Received data: ' + data) print('Received data: ' + data)
reply = 'OK: ' + data reply = 'OK: ' + data

View file

@ -8,9 +8,9 @@
from builtins import input from builtins import input
import socket import socket
import sys import sys
# ----------- Config ---------- # ----------- Config ----------
PORT = 3333 PORT = 3333
IP_VERSION = 'IPv4' IP_VERSION = 'IPv4'
IPV4 = '192.168.0.167' IPV4 = '192.168.0.167'
@ -18,7 +18,7 @@ IPV6 = 'FE80::32AE:A4FF:FE80:5288'
# ------------------------------- # -------------------------------
if IP_VERSION == 'IPv4': if IP_VERSION == 'IPv4':
host = IPV4 host = IPV4
family_addr = socket.AF_INET family_addr = socket.AF_INET
elif IP_VERSION == 'IPv6': elif IP_VERSION == 'IPv6':
host = IPV6 host = IPV6
@ -29,18 +29,19 @@ else:
try: try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM) sock = socket.socket(family_addr, socket.SOCK_DGRAM)
except socket.error as msg: except socket.error:
print('Failed to create socket') print('Failed to create socket')
sys.exit() sys.exit()
while True: while True:
msg = input('Enter message to send : ') msg = input('Enter message to send : ')
try: try:
sock.sendto(msg.encode(), (host, PORT)) sock.sendto(msg.encode(), (host, PORT))
reply, addr = sock.recvfrom(128) reply, addr = sock.recvfrom(128)
if not reply: break if not reply:
break
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply)) print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply))
except socket.error as msg: except socket.error as msg:
print('Error Code : ' + str(msg[0]) + ' Message: ' + msg[1]) print('Error Code : ' + str(msg[0]) + ' Message: ' + msg[1])
sys.exit() sys.exit()

View file

@ -9,9 +9,9 @@
import socket import socket
import sys import sys
# ----------- Config ---------- # ----------- Config ----------
IP_VERSION = 'IPv4' IP_VERSION = 'IPv4'
PORT = 3333; PORT = 3333
# ------------------------------- # -------------------------------
if IP_VERSION == 'IPv4': if IP_VERSION == 'IPv4':
@ -23,10 +23,10 @@ else:
sys.exit(1) sys.exit(1)
try : try:
sock = socket.socket(family_addr, socket.SOCK_DGRAM) sock = socket.socket(family_addr, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error as msg : except socket.error as msg:
print('Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]) print('Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1])
sys.exit() sys.exit()
@ -37,10 +37,11 @@ except socket.error as msg:
sys.exit() sys.exit()
while True: while True:
try : try:
print('Waiting for data...') print('Waiting for data...')
data, addr = sock.recvfrom(1024) data, addr = sock.recvfrom(1024)
if not data: break if not data:
break
data = data.decode() data = data.decode()
print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data) print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data)
reply = 'OK ' + data reply = 'OK ' + data

View file

@ -3,12 +3,13 @@ import os
import sys import sys
import subprocess import subprocess
test_fw_path = os.getenv('TEST_FW_PATH') try:
if test_fw_path and test_fw_path not in sys.path: import IDF
sys.path.insert(0, test_fw_path) except ImportError:
test_fw_path = os.getenv('TEST_FW_PATH')
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')

View file

@ -2,12 +2,14 @@ from __future__ import print_function
import os import os
import sys import sys
test_fw_path = os.getenv('TEST_FW_PATH') try:
if test_fw_path and test_fw_path not in sys.path: import IDF
sys.path.insert(0, test_fw_path) except ImportError:
test_fw_path = os.getenv('TEST_FW_PATH')
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW
import IDF
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')
def test_examples_system_cpp_exceptions(env, extra_data): def test_examples_system_cpp_exceptions(env, extra_data):
@ -20,9 +22,10 @@ def test_examples_system_cpp_exceptions(env, extra_data):
'In destructor, m_arg=42', 'In destructor, m_arg=42',
'Exception caught: Exception in constructor', 'Exception caught: Exception in constructor',
'app_main done' 'app_main done'
] ]
for line in lines: for line in lines:
dut.expect(line, timeout=2) dut.expect(line, timeout=2)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_system_cpp_exceptions() test_examples_system_cpp_exceptions()

View file

@ -1,18 +1,18 @@
from __future__ import print_function from __future__ import print_function
import re
import os import os
import sys import sys
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv('TEST_FW_PATH') # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
# Timer events # Timer events
TIMER_EVENT_LIMIT = 3 TIMER_EVENT_LIMIT = 3
@ -26,6 +26,7 @@ TASK_UNREGISTRATION_LIMIT = 3
TASK_ITERATION_POST = "TASK_EVENTS:TASK_ITERATION_EVENT: posting to default loop, {} out of " + str(TASK_ITERATION_LIMIT) TASK_ITERATION_POST = "TASK_EVENTS:TASK_ITERATION_EVENT: posting to default loop, {} out of " + str(TASK_ITERATION_LIMIT)
TASK_ITERATION_HANDLING = "TASK_EVENTS:TASK_ITERATION_EVENT: task_iteration_handler, executed {} times" TASK_ITERATION_HANDLING = "TASK_EVENTS:TASK_ITERATION_EVENT: task_iteration_handler, executed {} times"
def _test_timer_events(dut): def _test_timer_events(dut):
dut.start_app() dut.start_app()
@ -49,13 +50,13 @@ def _test_timer_events(dut):
if expiries < TIMER_EVENT_LIMIT: if expiries < TIMER_EVENT_LIMIT:
dut.expect_all("TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler", dut.expect_all("TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler", "TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler",
TIMER_EXPIRY_HANDLING.format(expiries)) TIMER_EXPIRY_HANDLING.format(expiries))
else: else:
dut.expect_all("TIMER_EVENTS:TIMER_EVENT_STOPPED: posting to default loop", dut.expect_all("TIMER_EVENTS:TIMER_EVENT_STOPPED: posting to default loop",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler", "TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler",
"TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler", "TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler",
TIMER_EXPIRY_HANDLING.format(expiries)) TIMER_EXPIRY_HANDLING.format(expiries))
print("Posted timer stopped event") print("Posted timer stopped event")
print("Handled timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT)) print("Handled timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT))
@ -64,6 +65,7 @@ def _test_timer_events(dut):
dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: deleted timer event source") dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: deleted timer event source")
print("Handled timer stopped event") print("Handled timer stopped event")
def _test_iteration_events(dut): def _test_iteration_events(dut):
dut.start_app() dut.start_app()
@ -89,6 +91,7 @@ def _test_iteration_events(dut):
dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: deleting task event source") dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: deleting task event source")
print("Deleted task event source") print("Deleted task event source")
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')
def test_default_event_loop_example(env, extra_data): def test_default_event_loop_example(env, extra_data):
dut = env.get_dut('default_event_loop', 'examples/system/event/default_event_loop') dut = env.get_dut('default_event_loop', 'examples/system/event/default_event_loop')
@ -96,5 +99,6 @@ def test_default_event_loop_example(env, extra_data):
_test_iteration_events(dut) _test_iteration_events(dut)
_test_timer_events(dut) _test_timer_events(dut)
if __name__ == '__main__': if __name__ == '__main__':
test_default_event_loop_example() test_default_event_loop_example()

View file

@ -1,24 +1,25 @@
from __future__ import print_function from __future__ import print_function
import re
import os import os
import sys import sys
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv('TEST_FW_PATH') # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
TASK_ITERATION_LIMIT = 10 TASK_ITERATION_LIMIT = 10
TASK_ITERATION_POSTING = "posting TASK_EVENTS:TASK_ITERATION_EVENT to {}, iteration {} out of " + str(TASK_ITERATION_LIMIT) TASK_ITERATION_POSTING = "posting TASK_EVENTS:TASK_ITERATION_EVENT to {}, iteration {} out of " + str(TASK_ITERATION_LIMIT)
TASK_ITERATION_HANDLING = "handling TASK_EVENTS:TASK_ITERATION_EVENT from {}, iteration {}" TASK_ITERATION_HANDLING = "handling TASK_EVENTS:TASK_ITERATION_EVENT from {}, iteration {}"
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')
def test_user_event_loops_example(env, extra_data): def test_user_event_loops_example(env, extra_data):
dut = env.get_dut('user_event_loops', 'examples/system/event/user_event_loops') dut = env.get_dut('user_event_loops', 'examples/system/event/user_event_loops')
@ -46,5 +47,6 @@ def test_user_event_loops_example(env, extra_data):
dut.expect("deleting task event source") dut.expect("deleting task event source")
print("Deleted task event source") print("Deleted task event source")
if __name__ == '__main__': if __name__ == '__main__':
test_user_event_loops_example() test_user_event_loops_example()

View file

@ -3,16 +3,17 @@ import re
import os import os
import sys import sys
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv('TEST_FW_PATH') # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
STARTING_TIMERS_REGEX = re.compile(r'Started timers, time since boot: (\d+) us') STARTING_TIMERS_REGEX = re.compile(r'Started timers, time since boot: (\d+) us')
@ -35,6 +36,7 @@ FINAL_TIMER_PERIOD = 1000000
LIGHT_SLEEP_TIME = 500000 LIGHT_SLEEP_TIME = 500000
ONE_SHOT_TIMER_PERIOD = 5000000 ONE_SHOT_TIMER_PERIOD = 5000000
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')
def test_examples_system_esp_timer(env, extra_data): def test_examples_system_esp_timer(env, extra_data):
dut = env.get_dut('esp_timer_example', 'examples/system/esp_timer') dut = env.get_dut('esp_timer_example', 'examples/system/esp_timer')
@ -43,12 +45,12 @@ def test_examples_system_esp_timer(env, extra_data):
groups = dut.expect(STARTING_TIMERS_REGEX, timeout=30) groups = dut.expect(STARTING_TIMERS_REGEX, timeout=30)
start_time = int(groups[0]) start_time = int(groups[0])
print('Start time: {} us'.format(start_time)) print('Start time: {} us'.format(start_time))
groups = dut.expect(TIMER_DUMP_LINE_REGEX, timeout=2) groups = dut.expect(TIMER_DUMP_LINE_REGEX, timeout=2)
assert(groups[0] == 'periodic' and int(groups[1]) == INITIAL_TIMER_PERIOD) assert(groups[0] == 'periodic' and int(groups[1]) == INITIAL_TIMER_PERIOD)
groups = dut.expect(TIMER_DUMP_LINE_REGEX, timeout=2) groups = dut.expect(TIMER_DUMP_LINE_REGEX, timeout=2)
assert(groups[0] == 'one-shot' and int(groups[1]) == 0) assert(groups[0] == 'one-shot' and int(groups[1]) == 0)
for i in range(0, 5): for i in range(0, 5):
groups = dut.expect(PERIODIC_TIMER_REGEX, timeout=2) groups = dut.expect(PERIODIC_TIMER_REGEX, timeout=2)
cur_time = int(groups[0]) cur_time = int(groups[0])
@ -93,5 +95,6 @@ def test_examples_system_esp_timer(env, extra_data):
dut.expect(STOP_REGEX, timeout=2) dut.expect(STOP_REGEX, timeout=2)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_system_esp_timer() test_examples_system_esp_timer()

View file

@ -4,12 +4,13 @@ import os
import sys import sys
import time import time
test_fw_path = os.getenv('TEST_FW_PATH') try:
if test_fw_path and test_fw_path not in sys.path: import IDF
sys.path.insert(0, test_fw_path) except ImportError:
test_fw_path = os.getenv('TEST_FW_PATH')
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
ENTERING_SLEEP_STR = 'Entering light sleep' ENTERING_SLEEP_STR = 'Entering light sleep'
EXIT_SLEEP_REGEX = re.compile(r'Returned from light sleep, reason: (\w+), t=(\d+) ms, slept for (\d+) ms') EXIT_SLEEP_REGEX = re.compile(r'Returned from light sleep, reason: (\w+), t=(\d+) ms, slept for (\d+) ms')
@ -17,6 +18,7 @@ WAITING_FOR_GPIO_STR = 'Waiting for GPIO0 to go high...'
WAKEUP_INTERVAL_MS = 2000 WAKEUP_INTERVAL_MS = 2000
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')
def test_examples_system_light_sleep(env, extra_data): def test_examples_system_light_sleep(env, extra_data):
dut = env.get_dut('light_sleep_example', 'examples/system/light_sleep') dut = env.get_dut('light_sleep_example', 'examples/system/light_sleep')
@ -58,5 +60,6 @@ def test_examples_system_light_sleep(env, extra_data):
assert(groups[0] == 'timer' and int(groups[2]) == WAKEUP_INTERVAL_MS) assert(groups[0] == 'timer' and int(groups[2]) == WAKEUP_INTERVAL_MS)
print('Woke up from timer again') print('Woke up from timer again')
if __name__ == '__main__': if __name__ == '__main__':
test_examples_system_light_sleep() test_examples_system_light_sleep()

View file

@ -3,16 +3,17 @@ import os
import sys import sys
import subprocess import subprocess
# this is a test case write with tiny-test-fw. try:
# to run test cases outside tiny-test-fw, import IDF
# we need to set environment variable `TEST_FW_PATH`, except ImportError:
# then get and insert `TEST_FW_PATH` to sys path before import FW module # this is a test case write with tiny-test-fw.
test_fw_path = os.getenv('TEST_FW_PATH') # to run test cases outside tiny-test-fw,
if test_fw_path and test_fw_path not in sys.path: # we need to set environment variable `TEST_FW_PATH`,
sys.path.insert(0, test_fw_path) # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv('TEST_FW_PATH')
import TinyFW if test_fw_path and test_fw_path not in sys.path:
import IDF sys.path.insert(0, test_fw_path)
import IDF
@IDF.idf_example_test(env_tag='Example_WIFI') @IDF.idf_example_test(env_tag='Example_WIFI')

View file

@ -30,21 +30,28 @@ import sys
import time import time
import subprocess import subprocess
# add current folder to system path for importing test_report try:
sys.path.append(os.path.dirname(__file__)) import IDF
# this is a test case write with tiny-test-fw. except ImportError:
# to run test cases outside tiny-test-fw, # this is a test case write with tiny-test-fw.
# we need to set environment variable `TEST_FW_PATH`, # to run test cases outside tiny-test-fw,
# then get and insert `TEST_FW_PATH` to sys path before import FW module # we need to set environment variable `TEST_FW_PATH`,
test_fw_path = os.getenv("TEST_FW_PATH") # then get and insert `TEST_FW_PATH` to sys path before import FW module
if test_fw_path and test_fw_path not in sys.path: test_fw_path = os.getenv("TEST_FW_PATH")
sys.path.insert(0, test_fw_path) if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import IDF
import DUT import DUT
import Utility import Utility
from Utility import (Attenuator, PowerControl, LineChart) from Utility import (Attenuator, PowerControl, LineChart)
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
try:
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
except ImportError:
# add current folder to system path for importing test_report
sys.path.append(os.path.dirname(__file__))
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
# configurations # configurations
TEST_TIME = TEST_TIMEOUT = 60 TEST_TIME = TEST_TIMEOUT = 60
@ -80,7 +87,7 @@ class TestResult(object):
BAD_POINT_PERCENTAGE_THRESHOLD = 0.3 BAD_POINT_PERCENTAGE_THRESHOLD = 0.3
# we need at least 1/2 valid points to qualify the test result # we need at least 1/2 valid points to qualify the test result
THROUGHPUT_QUALIFY_COUNT = TEST_TIME//2 THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2
def __init__(self, proto, direction, config_name): def __init__(self, proto, direction, config_name):
self.proto = proto self.proto = proto

View file

@ -88,7 +88,7 @@ class ThroughputForConfigsReport(object):
current_config = self.sdkconfigs[_config_name] current_config = self.sdkconfigs[_config_name]
if i > 0: if i > 0:
previous_config_name = self.sort_order[i-1] previous_config_name = self.sort_order[i - 1]
previous_config = self.sdkconfigs[previous_config_name] previous_config = self.sdkconfigs[previous_config_name]
else: else:
previous_config = previous_config_name = None previous_config = previous_config_name = None