OVMS3-idf/components/lwip/weekend_test/net_suite_test.py

165 lines
5.9 KiB
Python
Raw Normal View History

import re
import os
import sys
import socket
from threading import Thread, Event
import subprocess
import time
from shutil import copyfile
try:
import IDF
except ImportError:
# this is a test case written with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
import Utility
# Set to True by the ``__main__`` entry point; lwip_test_suite() then skips the
# rebuild/execute steps and only runs the listeners until interrupted.
manual_test = False

# UDP socket created by sock_listener() and used by io_listener() to send data,
# plus the address of the most recent sender seen by sock_listener().
# Both stay None until sock_listener() has set them.
sock = client_address = None

# Events polled by the two listener loops; setting one ends the matching loop.
stop_sock_listener, stop_io_listener = Event(), Event()
def io_listener(dut1):
    """Forward packets printed by the DUT to the local UDP endpoint.

    Loops until ``stop_io_listener`` is set. Each iteration waits up to 5
    seconds for a ``PacketOut:[<hex digits>]`` line on the DUT output, turns
    the hex digits into raw bytes and, once ``client_address`` has been set,
    sends those bytes through the module-level ``sock`` to 127.0.0.1:7777.

    :param dut1: DUT object providing ``expect(pattern, timeout=...)``.
    """
    packet_pattern = re.compile(r"PacketOut:\[([a-fA-F0-9]+)\]")
    while not stop_io_listener.is_set():
        try:
            matched = dut1.expect(packet_pattern, timeout=5)
        except DUT.ExpectTimeout:
            # Nothing arrived within the timeout; go back and re-check the stop event.
            continue
        if matched == () or matched[0] == b'':
            continue
        hex_payload = matched[0]
        print("Packet_data>{}<".format(hex_payload))
        raw_packet = bytearray.fromhex(hex_payload.decode())
        print("Sending to socket:")
        readable = ' '.join('{:02x}'.format(octet) for octet in raw_packet)
        print("Packet>{}<".format(readable))
        if client_address is None:
            # No datagram has been received yet, so nothing is sent.
            continue
        sock.sendto(raw_packet, ('127.0.0.1', 7777))
def sock_listener(dut1):
    """Relay UDP datagrams received on port 7771 to the DUT.

    Creates the module-level ``sock`` (UDP, bound to 0.0.0.0:7771, 5 second
    receive timeout) and loops until ``stop_sock_listener`` is set. Every
    datagram received is formatted as space-separated two-digit hex bytes and
    written to the DUT; the sender is remembered in ``client_address``.

    The socket is always closed and ``sock`` reset to None on exit. Socket
    setup happens inside the ``try`` so that a failing ``bind`` (for example
    when the port is already in use) does not leak the socket or leave a
    half-initialised object in the global.

    :param dut1: DUT object providing ``write(data)``.
    """
    global sock
    global client_address
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The timeout lets the loop wake up regularly to re-check the stop event.
        sock.settimeout(5)
        server_address = '0.0.0.0'
        server_port = 7771
        sock.bind((server_address, server_port))
        while not stop_sock_listener.is_set():
            try:
                payload, client_address = sock.recvfrom(1024)
                packet = ' '.join(format(x, '02x') for x in bytearray(payload))
                print("Received from address {}, data {}".format(client_address, packet))
                dut1.write(str.encode(packet))
            except socket.timeout:
                pass
    finally:
        sock.close()
        sock = None
def _run_and_capture(command, **popen_kwargs):
    """Run ``command`` to completion and return what it wrote to stdout as text.

    ``communicate()`` is used instead of ``stdout.read()`` so that a piped
    stderr is drained together with stdout (a child writing a lot to stderr
    could otherwise block forever on a full pipe) and so that the child is
    waited for once it exits.

    :param command: argument list handed to ``subprocess.Popen``.
    :param popen_kwargs: extra keyword arguments for ``subprocess.Popen``.
    :return: captured stdout as ``str``.
    """
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, **popen_kwargs)
    captured, _ = proc.communicate()
    if not isinstance(captured, str):
        # Pipes deliver bytes on Python 3; the verdict regexes below use str patterns.
        captured = captured.decode("utf-8", "replace")
    return captured


@IDF.idf_example_test(env_tag="Example_WIFI")
def lwip_test_suite(env, extra_data):
    """
    steps: |
      1. Rebuilds test suite with esp32_netsuite.ttcn
      2. Starts listeners on stdout and socket
      3. Execute ttcn3 test suite
      4. Collect result from ttcn3
    """
    # NOTE(review): the docstring keeps the original ``steps: |`` layout in case
    # the test framework parses it -- confirm before rewording it.
    #
    # Raises ValueError when NETSUITE_PATH is unset or when the ttcn3 output does
    # not contain "Overall verdict: pass".
    dut1 = env.get_dut("net_suite", "examples/system/network_tests")
    # check and log bin size
    binary_file = os.path.join(dut1.app.binary_path, "net_suite.bin")
    bin_size = os.path.getsize(binary_file)
    IDF.log_performance("net_suite", "{}KB".format(bin_size // 1024))
    IDF.check_performance("net_suite", bin_size // 1024)
    dut1.start_app()
    listeners = [
        Thread(target=sock_listener, args=(dut1, )),
        Thread(target=io_listener, args=(dut1, )),
    ]
    # Only threads that were actually started may be joined during cleanup.
    started = []

    def start_listeners():
        for listener in listeners:
            listener.start()
            started.append(listener)

    try:
        if not manual_test:
            # Variables referring to esp32 ttcn test suite
            TTCN_SRC = 'esp32_netsuite.ttcn'
            TTCN_CFG = 'esp32_netsuite.cfg'
            # System Paths
            netsuite_path = os.getenv("NETSUITE_PATH")
            if netsuite_path is None:
                raise ValueError("NETSUITE_PATH environment variable is not set")
            netsuite_src_path = os.path.join(netsuite_path, "src")
            test_dir = os.path.dirname(os.path.realpath(__file__))
            # Building the suite
            print("Rebuilding the test suite")
            print("-------------------------")
            # copy esp32 specific files to ttcn net-suite dir
            copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC))
            copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG))
            output = _run_and_capture(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'],
                                      cwd=netsuite_path, stderr=subprocess.PIPE)
            print("Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)")
            print(output)
            print("Note: This time all dependencies shall be generated -- multijob make shall pass")
            output = _run_and_capture(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'],
                                      cwd=netsuite_path, stderr=subprocess.PIPE)
            print(output)
            # Executing the test suite
            start_listeners()
            # Pause before launching ttcn3 (presumably to let the listeners come up).
            time.sleep(2)
            print("Executing the test suite")
            print("------------------------")
            output = _run_and_capture(['ttcn3_start',
                                       os.path.join(netsuite_src_path, 'test_suite'),
                                       os.path.join(netsuite_src_path, TTCN_CFG)])
            print(output)
            print("Collecting results")
            print("------------------")
            stats_match = re.search('(Verdict statistics:.*)', output)
            verdict_stats = stats_match.group(1) if stats_match else ""
            if re.search('Overall verdict: pass', output):
                print("Test passed!")
                Utility.console_log(verdict_stats, "green")
            else:
                Utility.console_log(verdict_stats, "red")
                raise ValueError('Test failed with: {}'.format(verdict_stats))
        else:
            try:
                # Executing the test suite
                start_listeners()
                time.sleep(2)
                # Manual mode: keep relaying until the operator presses Ctrl-C.
                while True:
                    time.sleep(0.5)
            except KeyboardInterrupt:
                pass
    finally:
        # Always stop the listeners, including when the verdict check raises;
        # otherwise the non-daemon threads would keep the process alive.
        print("Executing done, waiting for tests to finish")
        print("-------------------------------------------")
        stop_io_listener.set()
        stop_sock_listener.set()
        for listener in started:
            listener.join()
if __name__ == '__main__':
    # Direct invocation: switch lwip_test_suite() to manual mode, where it only
    # runs the listeners and leaves building/starting ttcn to the operator.
    manual_test = True
    print("Manual execution, please build and start ttcn in a separate console")
    lwip_test_suite()