Add protocol example tests

This commit is contained in:
Roland Dobai 2020-05-07 12:47:46 +02:00
parent 56c307b356
commit 5f024e1cd4
14 changed files with 390 additions and 54 deletions

View file

@ -0,0 +1,42 @@
from __future__ import unicode_literals
import re
import textwrap
import ttfw_idf
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_cbor(env, extra_data):
dut = env.get_dut('cbor', 'examples/protocols/cbor')
dut.start_app()
dut.expect(re.compile(r'example: encoded buffer size \d+'))
dut.expect('example: convert CBOR to JSON')
parsed_info = dut.expect(re.compile(r'\[\{"chip":"(\w+)","unicore":(\w+),"ip":\[(\d+),(\d+),(\d+),(\d+)\]\},'
r'3.1400001049041748'
r',"simple\(99\)","2019-07-10 09:00:00\+0000","undefined"\]'))
dut.expect('example: decode CBOR manually')
dut.expect(re.compile(textwrap.dedent(r'''
Array\[\s+
Map{{\s+
chip\s+
{}\s+
unicore\s+
{}\s+
ip\s+
Array\[\s+
{}\s+
{}\s+
{}\s+
{}\s+
\]\s+
}}\s+
3.14\s+
simple\(99\)\s+
2019-07-10 09:00:00\+0000\s+
undefined\s+
\]'''.format(*parsed_info)).replace('{', r'\{').replace('}', r'\}')))
if __name__ == '__main__':
test_examples_cbor()

View file

@ -2,9 +2,5 @@
# in this exact order for cmake to work correctly
cmake_minimum_required(VERSION 3.5)
# (Not part of the boilerplate)
# This example uses an extra component for common functions such as Wi-Fi and Ethernet connection.
set(EXTRA_COMPONENT_DIRS $ENV{IDF_PATH}/examples/common_components/protocol_examples_common)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
project(esp_local_ctrl)

View file

@ -10,7 +10,22 @@ Before using the example, run `idf.py menuconfig` (or `idf.py menuconfig` if usi
A python test script `scripts/esp_local_ctrl.py` has been provided for as a client side application for controlling the device over the same Wi-Fi network. The script relies on a pre-generated `main/certs/rootCA.pem` to verify the server certificate. The server side private key and certificate can also be found under `main/certs`, namely `prvtkey.pem` and `cacert.pem`.
After configuring the Wi-Fi, flashing and booting the device, run:
After configuring the Wi-Fi, flashing and booting the device, run the following command to test the device name
resolution through mDNS:
```
ping my_esp_ctrl_device.local
```
Sample output:
```
64 bytes from 192.168.32.156 (192.168.32.156): icmp_seq=1 ttl=255 time=58.1 ms
64 bytes from 192.168.32.156 (192.168.32.156): icmp_seq=2 ttl=255 time=89.9 ms
64 bytes from 192.168.32.156 (192.168.32.156): icmp_seq=3 ttl=255 time=123 ms
```
After you've tested the name resolution, run:
```
python scripts/esp_local_ctrl.py

View file

@ -0,0 +1,78 @@
from __future__ import unicode_literals
import os
import re
import sys
import ttfw_idf
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_esp_local_ctrl(env, extra_data):
rel_project_path = os.path.join('examples', 'protocols', 'esp_local_ctrl')
dut = env.get_dut('esp_local_ctrl', rel_project_path)
idf_path = dut.app.get_sdk_path()
dut.start_app()
dut_ip = dut.expect(re.compile(r'esp_netif_handlers: sta ip: (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'))[0]
dut.expect('esp_https_server: Starting server')
dut.expect('esp_https_server: Server listening on port 443')
dut.expect('control: esp_local_ctrl service started with name : my_esp_ctrl_device')
def dut_expect_read():
dut.expect('control: Reading property : timestamp (us)')
dut.expect('control: Reading property : property1')
dut.expect('control: Reading property : property2')
dut.expect('control: Reading property : property3')
# Running mDNS services in docker is not a trivial task. Therefore, the script won't connect to the host name but
# to IP address. However, the certificates were generated for the host name and will be rejected.
cmd = ' '.join([sys.executable, os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
'--name', dut_ip,
'--dont-check-hostname']) # don't reject the certificate because of the hostname
esp_local_ctrl_log = os.path.join(idf_path, rel_project_path, 'esp_local_ctrl.log')
with ttfw_idf.CustomProcess(cmd, esp_local_ctrl_log) as ctrl_py:
def expect_properties(prop1, prop3):
dut_expect_read()
ctrl_py.pexpect_proc.expect_exact('==== Available Properties ====')
ctrl_py.pexpect_proc.expect(re.compile(r'S.N. Name\s+Type\s+Flags\s+Value'))
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 1\] timestamp \(us\)\s+TIME\(us\)\s+Read-Only\s+\d+'))
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 2\] property1\s+INT32\s+{}'.format(prop1)))
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 3\] property2\s+BOOLEAN\s+Read-Only\s+(True)|(False)'))
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 4\] property3\s+STRING\s+{}'.format(prop3)))
ctrl_py.pexpect_proc.expect_exact('Select properties to set (0 to re-read, \'q\' to quit) :')
property1 = 123456789
property3 = ''
ctrl_py.pexpect_proc.expect_exact('Connecting to {}'.format(dut_ip))
dut.expect('esp_https_server: performing session handshake', timeout=60)
expect_properties(property1, property3)
ctrl_py.pexpect_proc.sendline('1')
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (timestamp (us)) :')
ctrl_py.pexpect_proc.sendline('2')
ctrl_py.pexpect_proc.expect_exact('Failed to set values!')
dut.expect('control: timestamp (us) is read-only')
expect_properties(property1, property3)
property1 = 638
ctrl_py.pexpect_proc.sendline('2')
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property1) :')
ctrl_py.pexpect_proc.sendline(str(property1))
dut.expect('control: Setting property1 value to {}'.format(property1))
expect_properties(property1, property3)
property3 = 'test'
ctrl_py.pexpect_proc.sendline('4')
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property3) :')
ctrl_py.pexpect_proc.sendline(property3)
dut.expect('control: Setting property3 value to {}'.format(property3))
expect_properties(property1, property3)
ctrl_py.pexpect_proc.sendline('q')
ctrl_py.pexpect_proc.expect_exact('Quitting...')
if __name__ == '__main__':
test_examples_esp_local_ctrl()

View file

@ -1,18 +1,18 @@
menu "Example Configuration"
config ESP_WIFI_SSID
config EXAMPLE_WIFI_SSID
string "WiFi SSID"
default "myssid"
help
SSID (network name) for the example to connect to.
config ESP_WIFI_PASSWORD
config EXAMPLE_WIFI_PASSWORD
string "WiFi Password"
default "mypassword"
help
WiFi password (WPA or WPA2) for the example to use.
config ESP_MAXIMUM_RETRY
config EXAMPLE_MAXIMUM_RETRY
int "Maximum retry"
default 5
help

View file

@ -16,6 +16,7 @@
#include "esp_event.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "sdkconfig.h"
#include "lwip/err.h"
#include "lwip/sys.h"
@ -25,9 +26,9 @@
If you'd rather not, just change the below entries to strings with
the config you want - ie #define EXAMPLE_WIFI_SSID "mywifissid"
*/
#define EXAMPLE_ESP_WIFI_SSID CONFIG_ESP_WIFI_SSID
#define EXAMPLE_ESP_WIFI_PASS CONFIG_ESP_WIFI_PASSWORD
#define EXAMPLE_ESP_MAXIMUM_RETRY CONFIG_ESP_MAXIMUM_RETRY
#define EXAMPLE_ESP_WIFI_SSID CONFIG_EXAMPLE_WIFI_SSID
#define EXAMPLE_ESP_WIFI_PASS CONFIG_EXAMPLE_WIFI_PASSWORD
#define EXAMPLE_ESP_MAXIMUM_RETRY CONFIG_EXAMPLE_MAXIMUM_RETRY
/* FreeRTOS event group to signal when we are connected*/
static EventGroupHandle_t s_wifi_event_group;

View file

@ -22,16 +22,14 @@ import os
import sys
import struct
import argparse
import ssl
import proto
try:
import esp_prov
except ImportError:
idf_path = os.environ['IDF_PATH']
sys.path.insert(1, idf_path + "/tools/esp_prov")
import esp_prov
# The tools directory is already in the PATH in environment prepared by install.sh which would allow to import
# esp_prov as file but not as complete module.
sys.path.insert(0, os.path.join(os.environ['IDF_PATH'], 'tools/esp_prov'))
import esp_prov # noqa: E402
# Set this to true to allow exceptions to be thrown
@ -110,7 +108,7 @@ def str_to_prop_value(prop, strval):
def prop_is_readonly(prop):
return (prop["flags"] & PROP_FLAG_READONLY) is not 0
return (prop["flags"] & PROP_FLAG_READONLY) != 0
def on_except(err):
@ -120,13 +118,15 @@ def on_except(err):
print(err)
def get_transport(sel_transport, service_name):
def get_transport(sel_transport, service_name, check_hostname):
try:
tp = None
if (sel_transport == 'http'):
example_path = os.environ['IDF_PATH'] + "/examples/protocols/esp_local_ctrl"
cert_path = example_path + "/main/certs/rootCA.pem"
tp = esp_prov.transport.Transport_HTTP(service_name, cert_path)
ssl_ctx = ssl.create_default_context(cafile=cert_path)
ssl_ctx.check_hostname = check_hostname
tp = esp_prov.transport.Transport_HTTP(service_name, ssl_ctx)
elif (sel_transport == 'ble'):
tp = esp_prov.transport.Transport_BLE(
devname=service_name, service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
@ -199,6 +199,11 @@ if __name__ == '__main__':
parser.add_argument("--name", dest='service_name', type=str,
help="BLE Device Name / HTTP Server hostname or IP", default='')
parser.add_argument("--dont-check-hostname", action="store_true",
# If enabled, the certificate won't be rejected for hostname mismatch.
# This option is hidden because it should be used only for testing purposes.
help=argparse.SUPPRESS)
parser.add_argument("-v", "--verbose", dest='verbose', help="increase output verbosity", action="store_true")
args = parser.parse_args()
@ -210,7 +215,7 @@ if __name__ == '__main__':
if args.transport == 'http':
args.service_name += '.local'
obj_transport = get_transport(args.transport, args.service_name)
obj_transport = get_transport(args.transport, args.service_name, not args.dont_check_hostname)
if obj_transport is None:
print("---- Invalid transport ----")
exit(1)

View file

@ -0,0 +1,27 @@
from __future__ import unicode_literals
import re
import ttfw_idf
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_icmp_echo(env, extra_data):
dut = env.get_dut('icmp_echo', 'examples/protocols/icmp_echo')
dut.start_app()
dut.expect('example_connect: Connected to')
dut.expect('esp>')
dut.write('ping www.espressif.com')
ip_re = r'\.'.join((r'\d{1,3}',) * 4)
ip = dut.expect(re.compile(r'64 bytes from ({}) icmp_seq=1 ttl=\d+ time=\d+ ms'.format(ip_re)))[0]
# expect at least one more (there could be lost packets)
dut.expect(re.compile(r'64 bytes from {} icmp_seq=[2-5] ttl=49 time='.format(ip)))
dut.expect(re.compile(r'5 packets transmitted, [2-5] received, \d{1,3}% packet loss'))
dut.write('')
dut.expect('esp>')
if __name__ == '__main__':
test_examples_icmp_echo()

View file

@ -0,0 +1,102 @@
from __future__ import unicode_literals
from tiny_test_fw import Utility
import os
import serial
import threading
import time
import ttfw_idf
class SerialThread(object):
'''
Connect to serial port and fake responses just like from a real modem
'''
# Dictionary for transforming received AT command to expected response
AT_FSM = {b'AT+CGMM': b'0G Dummy Model',
b'AT+CGSN': b'0123456789',
b'AT+CIMI': b'ESP',
b'AT+COPS?': b'+COPS: 0,0,"ESP Network"',
b'AT+CSQ': b'+CSQ: 4,0',
b'AT+CBC': b'+CBC: 0,50',
b'ATD*99***1#': b'CONNECT',
}
def run(self, log_path, exit_event):
with serial.Serial(self.port, 115200) as ser, open(log_path, 'w') as f:
buff = b''
while not exit_event.is_set():
time.sleep(0.1)
buff += ser.read(ser.in_waiting)
if not buff.endswith(b'\r'):
continue # read more because the complete command wasn't yet received
cmd_list = buff.split(b'\r')
buff = b''
for cmd in cmd_list:
if len(cmd) == 0:
continue
snd = self.AT_FSM.get(cmd, b'')
if snd != b'':
snd += b'\n'
snd += b'OK\n'
f.write('Received: {}\n'.format(repr(cmd.decode())))
f.write('Sent: {}\n'.format(repr(snd.decode())))
ser.write(snd)
def __init__(self, port, log_path):
self.port = port
self.exit_event = threading.Event()
self.t = threading.Thread(target=self.run, args=(log_path, self.exit_event,))
self.t.start()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.exit_event.set()
self.t.join(60)
if self.t.is_alive():
Utility.console_log('The serial thread is still alive', 'O')
@ttfw_idf.idf_example_test(env_tag='Example_PPP')
def test_examples_pppos_client(env, extra_data):
rel_project_path = 'examples/protocols/pppos_client'
dut = env.get_dut('pppos_client', rel_project_path)
project_path = os.path.join(dut.app.get_sdk_path(), rel_project_path)
modem_port = '/dev/ttyUSB{}'.format(0 if dut.port.endswith('1') else 1)
with SerialThread(modem_port, os.path.join(project_path, 'serial.log')):
dut.start_app()
dut.expect_all('pppos_example: Module: 0G Dummy Model',
'pppos_example: Operator: "ESP Network"',
'pppos_example: IMEI: 0123456789',
'pppos_example: IMSI: ESP',
'pppos_example: rssi: 4, ber: 0',
'pppos_example: Battery voltage: 0 mV',
'pppos_example: Modem PPP Started',
timeout=60)
cmd = ('pppd {} 115200 10.0.0.1:10.0.0.2 logfile {} local noauth debug nocrtscts nodetach +ipv6'
''.format(modem_port, os.path.join(project_path, 'ppp.log')))
with ttfw_idf.CustomProcess(cmd, '/dev/null'): # Nothing is printed here
dut.expect_all('pppos_example: Modem Connect to PPP Server',
'pppos_example: IP : 10.0.0.2',
'pppos_example: Netmask : 255.255.255.255',
'pppos_example: Gateway : 10.0.0.1',
'pppos_example: Name Server1: 0.0.0.0',
'pppos_example: Name Server2: 0.0.0.0',
'pppos_example: GOT ip event!!!',
'pppos_example: MQTT other event id: 7',
# There are no fake DNS server and MQTT server set up so the example fails at this point
'TRANS_TCP: DNS lookup failed err=202 res=0x0',
'MQTT_CLIENT: Error transport connect',
'pppos_example: MQTT_EVENT_ERROR',
'pppos_example: MQTT_EVENT_DISCONNECTED')
if __name__ == '__main__':
test_examples_pppos_client()

View file

@ -257,6 +257,7 @@ void app_main(void)
#else
#error "Unsupported DCE"
#endif
assert(dce != NULL);
ESP_ERROR_CHECK(dce->set_flow_ctrl(dce, MODEM_FLOW_CONTROL_NONE));
ESP_ERROR_CHECK(dce->store_profile(dce));
/* Print Module ID, Operator, IMEI, IMSI */

View file

@ -0,0 +1 @@
CONFIG_ESP_SYSTEM_PANIC_PRINT_HALT=y

View file

@ -0,0 +1,49 @@
from __future__ import unicode_literals
from tiny_test_fw import Utility
import datetime
import re
import ttfw_idf
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_sntp(env, extra_data):
dut = env.get_dut('sntp', 'examples/protocols/sntp')
dut.start_app()
dut.expect_all('Time is not set yet. Connecting to WiFi and getting time over NTP.',
'Initializing SNTP',
'Waiting for system time to be set... (1/10)',
'Notification of a time synchronization event',
timeout=60)
TIME_FORMAT = '%a %b %d %H:%M:%S %Y'
TIME_FORMAT_REGEX = r'\w+ \w+ \d{1,2} \d{2}:\d{2}:\d{2} \d{4}'
TIME_DIFF = datetime.timedelta(seconds=10 + 2) # cpu spends 10 seconds in deep sleep
NY_time = None
SH_time = None
def check_time(prev_NY_time, prev_SH_time):
NY_str = dut.expect(re.compile(r'The current date/time in New York is: ({})'.format(TIME_FORMAT_REGEX)))[0]
SH_str = dut.expect(re.compile(r'The current date/time in Shanghai is: ({})'.format(TIME_FORMAT_REGEX)))[0]
Utility.console_log('New York: "{}"'.format(NY_str))
Utility.console_log('Shanghai: "{}"'.format(SH_str))
dut.expect('Entering deep sleep for 10 seconds')
Utility.console_log('Sleeping...')
new_NY_time = datetime.datetime.strptime(NY_str, TIME_FORMAT)
new_SH_time = datetime.datetime.strptime(SH_str, TIME_FORMAT)
# The initial time is not checked because datetime has problems with timezones
assert prev_NY_time is None or new_NY_time - prev_NY_time < TIME_DIFF
assert prev_SH_time is None or new_SH_time - prev_SH_time < TIME_DIFF
return (new_NY_time, new_SH_time)
NY_time, SH_time = check_time(NY_time, SH_time)
for i in range(2, 4):
dut.expect('example: Boot count: {}'.format(i), timeout=30)
NY_time, SH_time = check_time(NY_time, SH_time)
if __name__ == '__main__':
test_examples_sntp()

View file

@ -207,6 +207,11 @@ test_weekend_network:
example_test_001A:
extends: .example_test_template
parallel: 5
artifacts:
when: always
paths:
- $CI_PROJECT_DIR/examples/*/*/*.log
expire_in: 1 week
tags:
- ESP32
- Example_WIFI
@ -300,31 +305,6 @@ example_test_010:
- ESP32
- Example_ExtFlash
test_app_test_001:
extends: .test_app_template
tags:
- ESP32
- test_jtag_arm
artifacts:
when: always
paths:
- $CI_PROJECT_DIR/tools/test_apps/system/gdb_loadable_elf/*.log
expire_in: 1 week
variables:
SETUP_TOOLS: "1"
test_app_test_002:
extends: .test_app_template
tags:
- ESP32
- Example_WIFI
test_app_test_003:
extends: .test_app_template
tags:
- ESP32
- Example_PPP
example_test_011:
extends: .example_debug_template
tags:
@ -357,6 +337,43 @@ example_test_014:
- ESP32
- 8Mpsram
example_test_015:
extends: .example_test_template
tags:
- ESP32
- Example_PPP
artifacts:
when: always
expire_in: 1 week
paths:
- $CI_PROJECT_DIR/examples/*/*/*.log
- $LOG_PATH
test_app_test_001:
extends: .test_app_template
tags:
- ESP32
- test_jtag_arm
artifacts:
when: always
paths:
- $CI_PROJECT_DIR/tools/test_apps/system/gdb_loadable_elf/*.log
expire_in: 1 week
variables:
SETUP_TOOLS: "1"
test_app_test_002:
extends: .test_app_template
tags:
- ESP32
- Example_WIFI
test_app_test_003:
extends: .test_app_template
tags:
- ESP32
- Example_PPP
UT_001:
extends: .unit_test_template
parallel: 38

View file

@ -17,24 +17,26 @@ from __future__ import print_function
from future.utils import tobytes
import socket
import http.client
import ssl
try:
from http.client import HTTPConnection, HTTPSConnection
except ImportError:
# Python 2 fallback
from httplib import HTTPConnection, HTTPSConnection
from .transport import Transport
class Transport_HTTP(Transport):
def __init__(self, hostname, certfile=None):
def __init__(self, hostname, ssl_context=None):
try:
socket.gethostbyname(hostname.split(':')[0])
except socket.gaierror:
raise RuntimeError("Unable to resolve hostname :" + hostname)
if certfile is None:
self.conn = http.client.HTTPConnection(hostname, timeout=30)
if ssl_context is None:
self.conn = HTTPConnection(hostname, timeout=30)
else:
ssl_ctx = ssl.create_default_context(cafile=certfile)
self.conn = http.client.HTTPSConnection(hostname, context=ssl_ctx, timeout=30)
self.conn = HTTPSConnection(hostname, context=ssl_context, timeout=30)
try:
print("Connecting to " + hostname)
self.conn.connect()