74d768fe6d
Added default sdkconfig for qemu build for the mqtt publish example, Added environment configuration for running the same test on target or in qemu Updated missing example tests per latest ttfw refactoring
151 lines
5.6 KiB
Python
151 lines
5.6 KiB
Python
import re
|
|
import os
|
|
import socket
|
|
from threading import Thread, Event
|
|
import subprocess
|
|
import time
|
|
from shutil import copyfile
|
|
|
|
from tiny_test_fw import Utility, DUT
|
|
import ttfw_idf
|
|
|
|
stop_sock_listener = Event()
|
|
stop_io_listener = Event()
|
|
sock = None
|
|
client_address = None
|
|
manual_test = False
|
|
|
|
|
|
def io_listener(dut1):
|
|
global sock
|
|
global client_address
|
|
data = b''
|
|
while not stop_io_listener.is_set():
|
|
try:
|
|
data = dut1.expect(re.compile(r"PacketOut:\[([a-fA-F0-9]+)\]"), timeout=5)
|
|
except DUT.ExpectTimeout:
|
|
continue
|
|
if data != () and data[0] != b'':
|
|
packet_data = data[0]
|
|
print("Packet_data>{}<".format(packet_data))
|
|
response = bytearray.fromhex(packet_data.decode())
|
|
print("Sending to socket:")
|
|
packet = ' '.join(format(x, '02x') for x in bytearray(response))
|
|
print("Packet>{}<".format(packet))
|
|
if client_address is not None:
|
|
sock.sendto(response, ('127.0.0.1', 7777))
|
|
|
|
|
|
def sock_listener(dut1):
|
|
global sock
|
|
global client_address
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.settimeout(5)
|
|
server_address = '0.0.0.0'
|
|
server_port = 7771
|
|
server = (server_address, server_port)
|
|
sock.bind(server)
|
|
try:
|
|
while not stop_sock_listener.is_set():
|
|
try:
|
|
payload, client_address = sock.recvfrom(1024)
|
|
packet = ' '.join(format(x, '02x') for x in bytearray(payload))
|
|
print("Received from address {}, data {}".format(client_address, packet))
|
|
dut1.write(str.encode(packet))
|
|
except socket.timeout:
|
|
pass
|
|
finally:
|
|
sock.close()
|
|
sock = None
|
|
|
|
|
|
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
|
|
def lwip_test_suite(env, extra_data):
|
|
global stop_io_listener
|
|
global stop_sock_listener
|
|
"""
|
|
steps: |
|
|
1. Rebuilds test suite with esp32_netsuite.ttcn
|
|
2. Starts listeners on stdout and socket
|
|
3. Execute ttcn3 test suite
|
|
4. Collect result from ttcn3
|
|
"""
|
|
dut1 = env.get_dut("net_suite", "examples/system/network_tests", dut_class=ttfw_idf.ESP32DUT)
|
|
# check and log bin size
|
|
binary_file = os.path.join(dut1.app.binary_path, "net_suite.bin")
|
|
bin_size = os.path.getsize(binary_file)
|
|
ttfw_idf.log_performance("net_suite", "{}KB".format(bin_size // 1024))
|
|
ttfw_idf.check_performance("net_suite", bin_size // 1024)
|
|
dut1.start_app()
|
|
thread1 = Thread(target=sock_listener, args=(dut1, ))
|
|
thread2 = Thread(target=io_listener, args=(dut1, ))
|
|
if not manual_test:
|
|
# Variables refering to esp32 ttcn test suite
|
|
TTCN_SRC = 'esp32_netsuite.ttcn'
|
|
TTCN_CFG = 'esp32_netsuite.cfg'
|
|
# System Paths
|
|
netsuite_path = os.getenv("NETSUITE_PATH")
|
|
netsuite_src_path = os.path.join(netsuite_path, "src")
|
|
test_dir = os.path.dirname(os.path.realpath(__file__))
|
|
# Building the suite
|
|
print("Rebuilding the test suite")
|
|
print("-------------------------")
|
|
# copy esp32 specific files to ttcn net-suite dir
|
|
copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC))
|
|
copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG))
|
|
proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'],
|
|
cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
output = proc.stdout.read()
|
|
print("Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)")
|
|
print(output)
|
|
proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'],
|
|
cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
print("Note: This time all dependencies shall be generated -- multijob make shall pass")
|
|
output = proc.stdout.read()
|
|
print(output)
|
|
# Executing the test suite
|
|
thread1.start()
|
|
thread2.start()
|
|
time.sleep(2)
|
|
print("Executing the test suite")
|
|
print("------------------------")
|
|
proc = subprocess.Popen(['ttcn3_start', os.path.join(netsuite_src_path,'test_suite'), os.path.join(netsuite_src_path, TTCN_CFG)],
|
|
stdout=subprocess.PIPE)
|
|
output = proc.stdout.read()
|
|
print(output)
|
|
print("Collecting results")
|
|
print("------------------")
|
|
verdict_stats = re.search('(Verdict statistics:.*)', output)
|
|
if verdict_stats:
|
|
verdict_stats = verdict_stats.group(1)
|
|
else:
|
|
verdict_stats = b""
|
|
verdict = re.search('Overall verdict: pass', output)
|
|
if verdict:
|
|
print("Test passed!")
|
|
Utility.console_log(verdict_stats, "green")
|
|
else:
|
|
Utility.console_log(verdict_stats, "red")
|
|
raise ValueError('Test failed with: {}'.format(verdict_stats))
|
|
else:
|
|
try:
|
|
# Executing the test suite
|
|
thread1.start()
|
|
thread2.start()
|
|
time.sleep(2)
|
|
while True:
|
|
time.sleep(0.5)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
print("Executing done, waiting for tests to finish")
|
|
print("-------------------------------------------")
|
|
stop_io_listener.set()
|
|
stop_sock_listener.set()
|
|
thread1.join()
|
|
thread2.join()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("Manual execution, please build and start ttcn in a separate console")
|
|
manual_test = True
|
|
lwip_test_suite()
|