Merge branch 'test/add_iperf_example_test' into 'master'

test: add iperf example test

See merge request idf/esp-idf!1753
This commit is contained in:
He Yin Ling 2018-05-15 14:58:08 +08:00
commit 0ae6a83768
27 changed files with 1562 additions and 42 deletions

View file

@ -493,10 +493,17 @@ check_submodule_sync:
- git submodule update --init --recursive
assign_test:
<<: *build_template
tags:
- assign_test
image: $CI_DOCKER_REGISTRY/ubuntu-test-env$BOT_DOCKER_IMAGE_TAG
stage: assign_test
# gitlab ci do not support match job with RegEx or wildcard now in dependencies.
# we have a lot build example jobs. now we don't use dependencies, just download all artificats of build stage.
dependencies:
- build_ssc_00
- build_ssc_01
- build_ssc_02
- build_esp_idf_tests
variables:
UT_BIN_PATH: "tools/unit-test-app/output"
OUTPUT_BIN_PATH: "test_bins/ESP32_IDF"
@ -548,12 +555,17 @@ assign_test:
TEST_CASE_PATH: "$CI_PROJECT_DIR/examples"
CONFIG_FILE: "$CI_PROJECT_DIR/examples/test_configs/$CI_JOB_NAME.yml"
LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
ENV_FILE: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/EnvConfig.yml"
script:
# first test if config file exists, if not exist, exit 0
- test -e $CONFIG_FILE || exit 0
# clone test env configs
- git clone $TEST_ENV_CONFIG_REPOSITORY
- cd ci-test-runner-configs
- python $CHECKOUT_REF_SCRIPT ci-test-runner-configs
- cd $TEST_FW_PATH
# run test
- python Runner.py $TEST_CASE_PATH -c $CONFIG_FILE
- python Runner.py $TEST_CASE_PATH -c $CONFIG_FILE -e $ENV_FILE
.unit_test_template: &unit_test_template
<<: *example_test_template
@ -565,6 +577,7 @@ assign_test:
TEST_CASE_PATH: "$CI_PROJECT_DIR/tools/unit-test-app"
CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml"
LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
ENV_FILE: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/EnvConfig.yml"
.test_template: &test_template
stage: test
@ -638,6 +651,12 @@ example_test_001_01:
- ESP32
- Example_WIFI
example_test_002_01:
<<: *example_test_template
tags:
- ESP32
- Example_ShieldBox
UT_001_01:
<<: *unit_test_template
tags:

View file

@ -17,4 +17,8 @@
#define IDF_PERFORMANCE_MAX_SPI_PER_TRANS_NO_POLLING 30
#define IDF_PERFORMANCE_MAX_SPI_PER_TRANS_NO_POLLING_NO_DMA 27
#define IDF_PERFORMANCE_MAX_VFS_OPEN_WRITE_CLOSE_TIME 20000
// throughput performance by iperf
#define IDF_PERFORMANCE_MIN_TCP_RX_THROUGHPUT 50
#define IDF_PERFORMANCE_MIN_TCP_TX_THROUGHPUT 40
#define IDF_PERFORMANCE_MIN_UDP_RX_THROUGHPUT 80
#define IDF_PERFORMANCE_MIN_UDP_TX_THROUGHPUT 50

View file

@ -0,0 +1,663 @@
"""
Test case for iperf example.
This test case might have problem running on windows:
1. direct use of `make`
2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
The test env Example_ShieldBox do need the following config::
Example_ShieldBox:
ap_list:
- ssid: "ssid"
password: "password"
outlet: 1
apc_ip: "192.168.1.88"
attenuator_port: "/dev/ttyUSB0"
iperf: "/dev/ttyUSB1"
apc_ip: "192.168.1.88"
pc_nic: "eth0"
"""
import re
import os
import sys
import time
import subprocess
# add current folder to system path for importing test_report
sys.path.append(os.path.dirname(__file__))
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
import Utility
from Utility import (Attenuator, PowerControl, LineChart)
from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
# configurations
TEST_TIME = TEST_TIMEOUT = 60
WAIT_AP_POWER_ON_TIMEOUT = 90
SCAN_TIMEOUT = 3
SCAN_RETRY_COUNT = 3
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)
# constants
FAILED_TO_SCAN_RSSI = -97
INVALID_HEAP_SIZE = 0xFFFFFFFF
PC_IPERF_TEMP_LOG_FILE = ".tmp_iperf.log"
CONFIG_NAME_PATTERN = re.compile(r"sdkconfig\.defaults\.(.+)")
# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
# Using numbers for config will make this easy.
# Use default value `99` for config with best performance.
BEST_PERFORMANCE_CONFIG = "99"
class TestResult(object):
""" record, analysis test result and convert data to output format """
PC_BANDWIDTH_LOG_PATTERN = re.compile(r"(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec")
DUT_BANDWIDTH_LOG_PATTERN = re.compile(r"(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec")
ZERO_POINT_THRESHOLD = -88 # RSSI, dbm
ZERO_THROUGHPUT_THRESHOLD = -92 # RSSI, dbm
BAD_POINT_RSSI_THRESHOLD = -85 # RSSI, dbm
BAD_POINT_MIN_THRESHOLD = 3 # Mbps
BAD_POINT_PERCENTAGE_THRESHOLD = 0.3
# we need at least 1/2 valid points to qualify the test result
THROUGHPUT_QUALIFY_COUNT = TEST_TIME / 2
def __init__(self, proto, direction, config_name):
self.proto = proto
self.direction = direction
self.config_name = config_name
self.throughput_by_rssi = dict()
self.throughput_by_att = dict()
self.att_rssi_map = dict()
self.heap_size = INVALID_HEAP_SIZE
self.error_list = []
def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):
"""
save the test results:
* record the better throughput if att/rssi is the same.
* record the min heap size.
"""
if ap_ssid not in self.att_rssi_map:
# for new ap, create empty dict()
self.throughput_by_att[ap_ssid] = dict()
self.throughput_by_rssi[ap_ssid] = dict()
self.att_rssi_map[ap_ssid] = dict()
self.att_rssi_map[ap_ssid][att] = rssi
def record_throughput(database, key_value):
try:
# we save the larger value for same att
if throughput > database[ap_ssid][key_value]:
database[ap_ssid][key_value] = throughput
except KeyError:
database[ap_ssid][key_value] = throughput
record_throughput(self.throughput_by_att, att)
record_throughput(self.throughput_by_rssi, rssi)
if int(heap_size) < self.heap_size:
self.heap_size = int(heap_size)
def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):
"""
add result for one test
:param raw_data: iperf raw data
:param ap_ssid: ap ssid that tested
:param att: attenuate value
:param rssi: AP RSSI
:param heap_size: min heap size during test
:return: throughput
"""
fall_to_0_recorded = 0
throughput_list = []
result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
if not result_list:
# failed to find raw data by PC pattern, it might be DUT pattern
result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)
for result in result_list:
if int(result[1]) - int(result[0]) != 1:
# this could be summary, ignore this
continue
throughput_list.append(float(result[2]))
if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
and fall_to_0_recorded < 1:
# throughput fall to 0 error. we only record 1 records for one test
self.error_list.append("[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}"
.format(ap_ssid, att, rssi, result[0], result[1]))
fall_to_0_recorded += 1
if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
throughput = sum(throughput_list) / len(throughput_list)
else:
throughput = 0.0
if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
self.error_list.append("[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found"
.format(ap_ssid, att, rssi))
self._save_result(throughput, ap_ssid, att, rssi, heap_size)
return throughput
def post_analysis(self):
"""
some rules need to be checked after we collected all test raw data:
1. throughput value 30% worse than the next point with lower RSSI
2. throughput value 30% worse than the next point with larger attenuate
"""
def analysis_bad_point(data, index_type):
for ap_ssid in data:
result_dict = data[ap_ssid]
index_list = result_dict.keys()
index_list.sort()
if index_type == "att":
index_list.reverse()
for i, index_value in enumerate(index_list[1:]):
if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
continue
_percentage = result_dict[index_value] / result_dict[index_list[i]]
if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
self.error_list.append("[Error][Bad point][{}][{}: {}]: drop {:.02f}%"
.format(ap_ssid, index_type, index_value,
(1 - _percentage) * 100))
analysis_bad_point(self.throughput_by_rssi, "rssi")
analysis_bad_point(self.throughput_by_att, "att")
@staticmethod
def _convert_to_draw_format(data, label):
keys = data.keys()
keys.sort()
return {
"x-axis": keys,
"y-axis": [data[x] for x in keys],
"label": label,
}
def draw_throughput_figure(self, path, ap_ssid, draw_type):
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap ssid string or a list of ap ssid string
:param draw_type: "att" or "rssi"
:return: file_name
"""
if draw_type == "rssi":
type_name = "RSSI"
data = self.throughput_by_rssi
elif draw_type == "att":
type_name = "Att"
data = self.throughput_by_att
else:
raise AssertionError("draw type not supported")
if isinstance(ap_ssid, list):
file_name = "ThroughputVs{}_{}_{}_{}.png".format(type_name, self.proto, self.direction,
hash(ap_ssid)[:6])
data_list = [self._convert_to_draw_format(data[_ap_ssid], _ap_ssid)
for _ap_ssid in ap_ssid]
else:
file_name = "ThroughputVs{}_{}_{}_{}.png".format(type_name, self.proto, self.direction, ap_ssid)
data_list = [self._convert_to_draw_format(data[ap_ssid], ap_ssid)]
LineChart.draw_line_chart(os.path.join(path, file_name),
"Throughput Vs {} ({} {})".format(type_name, self.proto, self.direction),
"Throughput (Mbps)",
"{} (dbm)".format(type_name),
data_list)
return file_name
def draw_rssi_vs_att_figure(self, path, ap_ssid):
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap to use
:return: file_name
"""
if isinstance(ap_ssid, list):
file_name = "AttVsRSSI_{}.png".format(hash(ap_ssid)[:6])
data_list = [self._convert_to_draw_format(self.att_rssi_map[_ap_ssid], _ap_ssid)
for _ap_ssid in ap_ssid]
else:
file_name = "AttVsRSSI_{}.png".format(ap_ssid)
data_list = [self._convert_to_draw_format(self.att_rssi_map[ap_ssid], ap_ssid)]
LineChart.draw_line_chart(os.path.join(path, file_name),
"Att Vs RSSI",
"Att (dbm)",
"RSSI (dbm)",
data_list)
return file_name
def get_best_throughput(self):
""" get the best throughput during test """
best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
for ap_ssid in self.throughput_by_att]
return max(best_for_aps)
def __str__(self):
"""
returns summary for this test:
1. test result (success or fail)
2. best performance for each AP
3. min free heap size during test
"""
if self.throughput_by_att:
ret = "[{}_{}][{}]: {}\r\n\r\n".format(self.proto, self.direction, self.config_name,
"Fail" if self.error_list else "Success")
ret += "Performance for each AP:\r\n"
for ap_ssid in self.throughput_by_att:
ret += "[{}]: {:.02f} Mbps\r\n".format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
if self.heap_size != INVALID_HEAP_SIZE:
ret += "Minimum heap size: {}".format(self.heap_size)
else:
ret = ""
return ret
class IperfTestUtility(object):
""" iperf test implementation """
def __init__(self, dut, config_name, ap_ssid, ap_password,
pc_nic_ip, pc_iperf_log_file, test_result=None):
self.config_name = config_name
self.dut = dut
self.pc_iperf_log_file = pc_iperf_log_file
self.ap_ssid = ap_ssid
self.ap_password = ap_password
self.pc_nic_ip = pc_nic_ip
if test_result:
self.test_result = test_result
else:
self.test_result = {
"tcp_tx": TestResult("tcp", "tx", config_name),
"tcp_rx": TestResult("tcp", "rx", config_name),
"udp_tx": TestResult("udp", "tx", config_name),
"udp_rx": TestResult("udp", "rx", config_name),
}
def setup(self):
"""
setup iperf test:
1. kill current iperf process
2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
3. scan to get AP RSSI
4. connect to AP
"""
try:
subprocess.check_output("sudo killall iperf 2>&1 > /dev/null", shell=True)
except subprocess.CalledProcessError:
pass
self.dut.write("restart")
self.dut.expect("esp32>")
self.dut.write("scan {}".format(self.ap_ssid))
for _ in range(SCAN_RETRY_COUNT):
try:
rssi = int(self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)[0])
break
except DUT.ExpectTimeout:
continue
else:
raise AssertionError("Failed to scan AP")
self.dut.write("sta {} {}".format(self.ap_ssid, self.ap_password))
dut_ip = self.dut.expect(re.compile(r"event: sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)"))[0]
return dut_ip, rssi
def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):
return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
def _test_once(self, proto, direction):
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
assert direction in ["rx", "tx"]
assert proto in ["tcp", "udp"]
# run iperf test
if direction == "tx":
with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
if proto == "tcp":
process = subprocess.Popen(["iperf", "-s", "-B", self.pc_nic_ip,
"-t", str(TEST_TIME), "-i", "1", "-f", "m"],
stdout=f, stderr=f)
self.dut.write("iperf -c {} -i 1 -t {}".format(self.pc_nic_ip, TEST_TIME))
else:
process = subprocess.Popen(["iperf", "-s", "-u", "-B", self.pc_nic_ip,
"-t", str(TEST_TIME), "-i", "1", "-f", "m"],
stdout=f, stderr=f)
self.dut.write("iperf -c {} -u -i 1 -t {}".format(self.pc_nic_ip, TEST_TIME))
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
with open(PC_IPERF_TEMP_LOG_FILE, "r") as f:
pc_raw_data = server_raw_data = f.read()
else:
with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
if proto == "tcp":
self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME))
process = subprocess.Popen(["iperf", "-c", dut_ip,
"-t", str(TEST_TIME), "-f", "m"],
stdout=f, stderr=f)
else:
self.dut.write("iperf -s -u -i 1 -t {}".format(TEST_TIME))
process = subprocess.Popen(["iperf", "-c", dut_ip, "-u", "-b", "100M",
"-t", str(TEST_TIME), "-f", "m"],
stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
server_raw_data = self.dut.read()
with open(PC_IPERF_TEMP_LOG_FILE, "r") as f:
pc_raw_data = f.read()
# save PC iperf logs to console
with open(self.pc_iperf_log_file, "a+") as f:
f.write("## [{}] `{}`\r\n##### {}"
.format(self.config_name,
"{}_{}".format(proto, direction),
time.strftime("%m-%d %H:%M:%S", time.localtime(time.time()))))
f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
self.dut.write("heap")
heap_size = self.dut.expect(re.compile(r"min heap size: (\d+)\D"))[0]
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
def run_test(self, proto, direction, atten_val):
"""
run test for one type, with specified atten_value and save the test result
:param proto: tcp or udp
:param direction: tx or rx
:param atten_val: attenuate value
"""
rssi = FAILED_TO_SCAN_RSSI
heap_size = INVALID_HEAP_SIZE
try:
server_raw_data, rssi, heap_size = self._test_once(proto, direction)
throughput = self._save_test_result("{}_{}".format(proto, direction),
server_raw_data, atten_val,
rssi, heap_size)
Utility.console_log("[{}][{}_{}][{}][{}]: {:.02f}"
.format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
except Exception as e:
self._save_test_result("{}_{}".format(proto, direction), "", atten_val, rssi, heap_size)
Utility.console_log("Failed during test: {}".format(e))
def run_all_cases(self, atten_val):
"""
run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
:param atten_val: attenuate value
"""
self.run_test("tcp", "tx", atten_val)
self.run_test("tcp", "rx", atten_val)
self.run_test("udp", "tx", atten_val)
self.run_test("udp", "rx", atten_val)
def wait_ap_power_on(self):
"""
AP need to take sometime to power on. It changes for different APs.
This method will scan to check if the AP powers on.
:return: True or False
"""
self.dut.write("restart")
self.dut.expect("esp32>")
for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
try:
self.dut.write("scan {}".format(self.ap_ssid))
self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)
ret = True
break
except DUT.ExpectTimeout:
pass
else:
ret = False
return ret
def build_iperf_with_config(config_name):
"""
we need to build iperf example with different configurations.
:param config_name: sdkconfig we want to build
"""
# switch to iperf example path before build when we're running test with Runner
example_path = os.path.dirname(__file__)
cwd = os.getcwd()
if cwd != example_path and example_path:
os.chdir(example_path)
try:
subprocess.check_call("make clean > /dev/null", shell=True)
subprocess.check_call(["cp", "sdkconfig.defaults.{}".format(config_name), "sdkconfig.defaults"])
subprocess.check_call(["rm", "-f", "sdkconfig"])
subprocess.check_call("make defconfig > /dev/null", shell=True)
# save sdkconfig to generate config comparision report
subprocess.check_call(["cp", "sdkconfig", "sdkconfig.{}".format(config_name)])
subprocess.check_call("make -j5 > /dev/null", shell=True)
subprocess.check_call("make print_flash_cmd | tail -n 1 > build/download.config", shell=True)
finally:
os.chdir(cwd)
def get_configs(env):
att_port = env.get_variable("attenuator_port")
ap_list = env.get_variable("ap_list")
pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
apc_ip = env.get_variable("apc_ip")
pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")
return att_port, ap_list, pc_nic_ip, apc_ip, pc_iperf_log_file
@IDF.idf_example_test(env_tag="Example_ShieldBox", category="stress")
def test_wifi_throughput_with_different_configs(env, extra_data):
"""
steps: |
1. build iperf with specified configs
2. test throughput for all routers
"""
att_port, ap_list, pc_nic_ip, apc_ip, pc_iperf_log_file = get_configs(env)
ap_info = ap_list[0]
config_names_raw = subprocess.check_output(["ls", os.path.dirname(os.path.abspath(__file__))])
test_result = dict()
sdkconfig_files = dict()
for config_name in CONFIG_NAME_PATTERN.findall(config_names_raw):
# 1. build config
build_iperf_with_config(config_name)
sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
"sdkconfig.{}".format(config_name))
# 2. get DUT and download
dut = env.get_dut("iperf", "examples/wifi/iperf")
dut.start_app()
dut.expect("esp32>")
# 3. run test for each required att value
test_result[config_name] = {
"tcp_tx": TestResult("tcp", "tx", config_name),
"tcp_rx": TestResult("tcp", "rx", config_name),
"udp_tx": TestResult("udp", "tx", config_name),
"udp_rx": TestResult("udp", "rx", config_name),
}
test_utility = IperfTestUtility(dut, config_name, ap_info["ssid"],
ap_info["password"], pc_nic_ip, pc_iperf_log_file, test_result[config_name])
PowerControl.Control.control_rest(apc_ip, ap_info["outlet"], "OFF")
PowerControl.Control.control(apc_ip, {ap_info["outlet"]: "ON"})
assert Attenuator.set_att(att_port, 0) is True
if not test_utility.wait_ap_power_on():
Utility.console_log("[{}] failed to power on, skip testing this AP"
.format(ap_info["ssid"]), color="red")
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0)
for result_type in test_result[config_name]:
summary = str(test_result[config_name][result_type])
if summary:
Utility.console_log(summary, color="orange")
# 4. check test results
env.close_dut("iperf")
# 5. generate report
report = ThroughputForConfigsReport(os.path.join(env.log_path, "ThroughputForConfigsReport"),
ap_info["ssid"], test_result, sdkconfig_files)
report.generate_report()
@IDF.idf_example_test(env_tag="Example_ShieldBox", category="stress")
def test_wifi_throughput_vs_rssi(env, extra_data):
"""
steps: |
1. build with best performance config
2. switch on one router
3. set attenuator value from 0-60 for each router
4. test TCP tx rx and UDP tx rx throughput
"""
att_port, ap_list, pc_nic_ip, apc_ip, pc_iperf_log_file = get_configs(env)
pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")
test_result = {
"tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG),
"tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG),
"udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG),
"udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG),
}
# 1. build config
build_iperf_with_config(BEST_PERFORMANCE_CONFIG)
# 2. get DUT and download
dut = env.get_dut("iperf", "examples/wifi/iperf")
dut.start_app()
dut.expect("esp32>")
# 3. run test for each required att value
for ap_info in ap_list:
test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info["ssid"], ap_info["password"],
pc_nic_ip, pc_iperf_log_file, test_result)
PowerControl.Control.control_rest(apc_ip, ap_info["outlet"], "OFF")
PowerControl.Control.control(apc_ip, {ap_info["outlet"]: "ON"})
Attenuator.set_att(att_port, 0)
if not test_utility.wait_ap_power_on():
Utility.console_log("[{}] failed to power on, skip testing this AP"
.format(ap_info["ssid"]), color="red")
continue
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
test_utility.run_all_cases(atten_val)
# 4. check test results
env.close_dut("iperf")
# 5. generate report
report = ThroughputVsRssiReport(os.path.join(env.log_path, "ThroughputVsRssiReport"),
test_result)
report.generate_report()
@IDF.idf_example_test(env_tag="Example_ShieldBox")
def test_wifi_throughput_basic(env, extra_data):
"""
steps: |
1. test TCP tx rx and UDP tx rx throughput
2. compare with the pre-defined pass standard
"""
att_port, ap_list, pc_nic_ip, apc_ip, pc_iperf_log_file = get_configs(env)
ap_info = ap_list[0]
# 1. build iperf with best config
build_iperf_with_config(BEST_PERFORMANCE_CONFIG)
# 2. get DUT
dut = env.get_dut("iperf", "examples/wifi/iperf")
dut.start_app()
dut.expect("esp32>")
# 3. preparing
test_result = {
"tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG),
"tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG),
"udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG),
"udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG),
}
test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info["ssid"],
ap_info["password"], pc_nic_ip, pc_iperf_log_file, test_result)
PowerControl.Control.control_rest(apc_ip, ap_info["outlet"], "OFF")
PowerControl.Control.control(apc_ip, {ap_info["outlet"]: "ON"})
assert Attenuator.set_att(att_port, 0) is True
if not test_utility.wait_ap_power_on():
Utility.console_log("[{}] failed to power on, skip testing this AP"
.format(ap_info["ssid"]), color="red")
# 4. run test for TCP Tx, Rx and UDP Tx, Rx
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0)
# 5. log performance and compare with pass standard
for throughput_type in test_result:
IDF.log_performance("{}_throughput".format(throughput_type),
"{:.02f} Mbps".format(test_result[throughput_type].get_best_throughput()))
# do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
for throughput_type in test_result:
IDF.check_performance("{}_throughput".format(throughput_type),
test_result[throughput_type].get_best_throughput())
env.close_dut("iperf")
if __name__ == '__main__':
test_wifi_throughput_basic(env_config_file="EnvConfig.yml")
test_wifi_throughput_with_different_configs(env_config_file="EnvConfig.yml")
test_wifi_throughput_vs_rssi(env_config_file="EnvConfig.yml")

View file

@ -37,7 +37,14 @@ typedef struct {
struct arg_str *password;
struct arg_end *end;
} wifi_args_t;
typedef struct {
struct arg_str *ssid;
struct arg_end *end;
} wifi_scan_arg_t;
static wifi_args_t sta_args;
static wifi_scan_arg_t scan_args;
static wifi_args_t ap_args;
static bool reconnect = true;
static const char *TAG="iperf";
@ -46,6 +53,27 @@ static EventGroupHandle_t wifi_event_group;
const int CONNECTED_BIT = BIT0;
const int DISCONNECTED_BIT = BIT1;
static void scan_done_handler(void)
{
uint16_t sta_number = 0;
uint8_t i;
wifi_ap_record_t *ap_list_buffer;
esp_wifi_scan_get_ap_num(&sta_number);
ap_list_buffer = malloc(sta_number * sizeof(wifi_ap_record_t));
if (ap_list_buffer == NULL) {
ESP_LOGE(TAG, "Failed to malloc buffer to print scan results");
return;
}
if (esp_wifi_scan_get_ap_records(&sta_number,(wifi_ap_record_t *)ap_list_buffer) == ESP_OK) {
for(i=0; i<sta_number; i++) {
ESP_LOGI(TAG, "[%s][rssi=%d]", ap_list_buffer[i].ssid, ap_list_buffer[i].rssi);
}
}
free(ap_list_buffer);
}
static esp_err_t event_handler(void *ctx, system_event_t *event)
{
switch(event->event_id) {
@ -53,6 +81,10 @@ static esp_err_t event_handler(void *ctx, system_event_t *event)
xEventGroupClearBits(wifi_event_group, DISCONNECTED_BIT);
xEventGroupSetBits(wifi_event_group, CONNECTED_BIT);
break;
case SYSTEM_EVENT_SCAN_DONE:
scan_done_handler();
ESP_LOGI(TAG, "sta scan done");
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
if (reconnect) {
ESP_LOGI(TAG, "sta disconnect, reconnect...");
@ -131,6 +163,35 @@ static int wifi_cmd_sta(int argc, char** argv)
return 0;
}
static bool wifi_cmd_sta_scan(const char* ssid)
{
wifi_scan_config_t scan_config = { 0 };
scan_config.ssid = (uint8_t *) ssid;
ESP_ERROR_CHECK( esp_wifi_set_mode(WIFI_MODE_STA) );
ESP_ERROR_CHECK( esp_wifi_scan_start(&scan_config, false) );
return true;
}
static int wifi_cmd_scan(int argc, char** argv)
{
int nerrors = arg_parse(argc, argv, (void**) &scan_args);
if (nerrors != 0) {
arg_print_errors(stderr, scan_args.end, argv[0]);
return 1;
}
ESP_LOGI(TAG, "sta start to scan");
if ( scan_args.ssid->count == 1 ) {
wifi_cmd_sta_scan(scan_args.ssid->sval[0]);
} else {
wifi_cmd_sta_scan(NULL);
}
return 0;
}
static bool wifi_cmd_ap_set(const char* ssid, const char* pass)
{
@ -314,6 +375,13 @@ static int restart(int argc, char** argv)
ESP_LOGI(TAG, "Restarting");
esp_restart();
}
static int heap_size(int argc, char** argv)
{
uint32_t heap_size = heap_caps_get_minimum_free_size(MALLOC_CAP_DEFAULT);
ESP_LOGI(TAG, "min heap size: %u", heap_size);
return 0;
}
void register_wifi()
{
@ -331,10 +399,24 @@ void register_wifi()
ESP_ERROR_CHECK( esp_console_cmd_register(&sta_cmd) );
scan_args.ssid = arg_str0(NULL, NULL, "<ssid>", "SSID of AP want to be scanned");
scan_args.end = arg_end(1);
const esp_console_cmd_t scan_cmd = {
.command = "scan",
.help = "WiFi is station mode, start scan ap",
.hint = NULL,
.func = &wifi_cmd_scan,
.argtable = &scan_args
};
ap_args.ssid = arg_str1(NULL, NULL, "<ssid>", "SSID of AP");
ap_args.password = arg_str0(NULL, NULL, "<pass>", "password of AP");
ap_args.end = arg_end(2);
ESP_ERROR_CHECK( esp_console_cmd_register(&scan_cmd) );
const esp_console_cmd_t ap_cmd = {
.command = "ap",
.help = "AP mode, configure ssid and password",
@ -378,4 +460,12 @@ void register_wifi()
};
ESP_ERROR_CHECK( esp_console_cmd_register(&iperf_cmd) );
const esp_console_cmd_t heap_cmd = {
.command = "heap",
.help = "get min free heap size druing test",
.hint = NULL,
.func = &heap_size,
};
ESP_ERROR_CHECK( esp_console_cmd_register(&heap_cmd) );
}

View file

@ -2,6 +2,8 @@ CONFIG_ESP32_DEFAULT_CPU_FREQ_240=y
CONFIG_ESP32_DEFAULT_CPU_FREQ_MHZ=240
CONFIG_MEMMAP_SMP=y
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64

View file

@ -0,0 +1,6 @@
CONFIG_MEMMAP_SMP=y
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_INT_WDT=
CONFIG_TASK_WDT=

View file

@ -0,0 +1,25 @@
CONFIG_MEMMAP_SMP=y
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=12
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=48
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=48
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=12
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=12
CONFIG_TCP_SND_BUF_DEFAULT=11488
CONFIG_TCP_WND_DEFAULT=11488
CONFIG_TCP_RECVMBOX_SIZE=12
CONFIG_UDP_RECVMBOX_SIZE=12
CONFIG_TCPIP_RECVMBOX_SIZE=48
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=

View file

@ -0,0 +1,25 @@
CONFIG_MEMMAP_SMP=y
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_TCP_SND_BUF_DEFAULT=11488
CONFIG_TCP_WND_DEFAULT=11488
CONFIG_TCP_RECVMBOX_SIZE=12
CONFIG_UDP_RECVMBOX_SIZE=12
CONFIG_TCPIP_RECVMBOX_SIZE=48
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=

View file

@ -0,0 +1,25 @@
CONFIG_MEMMAP_SMP=y
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_TCP_SND_BUF_DEFAULT=32768
CONFIG_TCP_WND_DEFAULT=32768
CONFIG_TCP_RECVMBOX_SIZE=64
CONFIG_UDP_RECVMBOX_SIZE=64
CONFIG_TCPIP_RECVMBOX_SIZE=64
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=

View file

@ -0,0 +1,25 @@
CONFIG_MEMMAP_SMP=y
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_TCP_SND_BUF_DEFAULT=65535
CONFIG_TCP_WND_DEFAULT=65535
CONFIG_TCP_RECVMBOX_SIZE=64
CONFIG_UDP_RECVMBOX_SIZE=64
CONFIG_TCPIP_RECVMBOX_SIZE=64
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=

View file

@ -0,0 +1,29 @@
CONFIG_MEMMAP_SMP=y
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_TCP_SND_BUF_DEFAULT=65535
CONFIG_TCP_WND_DEFAULT=65535
CONFIG_TCP_RECVMBOX_SIZE=64
CONFIG_UDP_RECVMBOX_SIZE=64
CONFIG_TCPIP_RECVMBOX_SIZE=64
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_LWIP_IRAM_OPTIMIZATION=y

View file

@ -0,0 +1,26 @@
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_FREERTOS_UNICORE=y
CONFIG_FREERTOS_HZ=1000
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_TCP_SND_BUF_DEFAULT=65535
CONFIG_TCP_WND_DEFAULT=65535
CONFIG_TCP_RECVMBOX_SIZE=64
CONFIG_UDP_RECVMBOX_SIZE=64
CONFIG_TCPIP_RECVMBOX_SIZE=64
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_LWIP_IRAM_OPTIMIZATION=y

View file

@ -0,0 +1,30 @@
CONFIG_ESP32_DEFAULT_CPU_FREQ_80=y
CONFIG_ESP32_DEFAULT_CPU_FREQ_MHZ=80
CONFIG_MEMMAP_SMP=y
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_TCP_SND_BUF_DEFAULT=65535
CONFIG_TCP_WND_DEFAULT=65535
CONFIG_TCP_RECVMBOX_SIZE=64
CONFIG_UDP_RECVMBOX_SIZE=64
CONFIG_TCPIP_RECVMBOX_SIZE=64
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_LWIP_IRAM_OPTIMIZATION=y

View file

@ -0,0 +1,30 @@
CONFIG_ESP32_DEFAULT_CPU_FREQ_240=y
CONFIG_ESP32_DEFAULT_CPU_FREQ_MHZ=240
CONFIG_MEMMAP_SMP=y
CONFIG_SYSTEM_EVENT_TASK_STACK_SIZE=4096
CONFIG_ESP32_WIFI_STATIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_DYNAMIC_TX_BUFFER_NUM=64
CONFIG_ESP32_WIFI_AMPDU_TX_ENABLED=y
CONFIG_ESP32_WIFI_TX_BA_WIN=32
CONFIG_ESP32_WIFI_AMPDU_RX_ENABLED=y
CONFIG_ESP32_WIFI_RX_BA_WIN=32
CONFIG_FREERTOS_UNICORE=
CONFIG_FREERTOS_HZ=1000
CONFIG_INT_WDT=
CONFIG_TASK_WDT=
CONFIG_TCP_SND_BUF_DEFAULT=65535
CONFIG_TCP_WND_DEFAULT=65535
CONFIG_TCP_RECVMBOX_SIZE=64
CONFIG_UDP_RECVMBOX_SIZE=64
CONFIG_TCPIP_RECVMBOX_SIZE=64
CONFIG_LWIP_ETHARP_TRUST_IP_MAC=
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_LWIP_IRAM_OPTIMIZATION=y

View file

@ -0,0 +1,245 @@
"""
this module generates markdown format test report for throughput test.
The test report contains 2 parts:
1. throughput with different configs
2. throughput with RSSI
"""
import os
class ThroughputForConfigsReport(object):
THROUGHPUT_TYPES = ["tcp_tx", "tcp_rx", "udp_tx", "udp_rx"]
REPORT_FILE_NAME = "ThroughputForConfigs.md"
def __init__(self, output_path, ap_ssid, throughput_results, sdkconfig_files):
"""
:param ap_ssid: the ap we expected to use
:param throughput_results: config with the following type::
{
"config_name": {
"tcp_tx": result,
"tcp_rx": result,
"udp_tx": result,
"udp_rx": result,
},
"config_name2": {},
}
"""
self.output_path = output_path
self.ap_ssid = ap_ssid
self.results = throughput_results
self.sdkconfigs = dict()
for config_name in sdkconfig_files:
self.sdkconfigs[config_name] = self._parse_config_file(sdkconfig_files[config_name])
if not os.path.exists(output_path):
os.makedirs(output_path)
self.sort_order = self.sdkconfigs.keys()
self.sort_order.sort()
@staticmethod
def _parse_config_file(config_file_path):
sdkconfig = {}
with open(config_file_path, "r") as f:
for line in f:
if not line.isspace():
if line[0] == "#":
continue
name, value = line.split("=")
value = value.strip("\r\n")
sdkconfig[name] = value if value else "n"
return sdkconfig
def _generate_the_difference_between_configs(self):
"""
generate markdown list for different configs::
default: esp-idf default
low:
* `config name 1`: old value -> new value
* `config name 2`: old value -> new value
* ...
...
"""
data = "## Config Definition:\r\n\r\n"
def find_difference(base, new):
_difference = {}
all_configs = set(base.keys())
all_configs.update(set(new.keys()))
for _config in all_configs:
try:
_base_value = base[_config]
except KeyError:
_base_value = "null"
try:
_new_value = new[_config]
except KeyError:
_new_value = "null"
if _base_value != _new_value:
_difference[_config] = "{} -> {}".format(_base_value, _new_value)
return _difference
for i, _config_name in enumerate(self.sort_order):
current_config = self.sdkconfigs[_config_name]
if i > 0:
previous_config_name = self.sort_order[i-1]
previous_config = self.sdkconfigs[previous_config_name]
else:
previous_config = previous_config_name = None
if previous_config:
# log the difference
difference = find_difference(previous_config, current_config)
data += "* {} (compared to {}):\r\n".format(_config_name, previous_config_name)
for diff_name in difference:
data += " * `{}`: {}\r\n".format(diff_name, difference[diff_name])
return data
def _generate_report_for_one_type(self, throughput_type):
"""
generate markdown table with the following format::
| config name | throughput (Mbps) | free heap size (bytes) |
|-------------|-------------------|------------------------|
| default | 32.11 | 147500 |
| low | 32.11 | 147000 |
| medium | 33.22 | 120000 |
| high | 43.11 | 100000 |
| max | 45.22 | 79000 |
"""
empty = True
ret = "\r\n### {} {}\r\n\r\n".format(*throughput_type.split("_"))
ret += "| config name | throughput (Mbps) | free heap size (bytes) |\r\n"
ret += "|-------------|-------------------|------------------------|\r\n"
for config in self.sort_order:
try:
result = self.results[config][throughput_type]
throughput = "{:.02f}".format(max(result.throughput_by_att[self.ap_ssid].values()))
heap_size = str(result.heap_size)
# although markdown table will do alignment
# do align here for better text editor presentation
ret += "| {:<12}| {:<18}| {:<23}|\r\n".format(config, throughput, heap_size)
empty = False
except KeyError:
pass
return ret if not empty else ""
def generate_report(self):
data = "# Throughput for different configs\r\n"
data += "\r\nAP: {}\r\n".format(self.ap_ssid)
for throughput_type in self.THROUGHPUT_TYPES:
data += self._generate_report_for_one_type(throughput_type)
data += "\r\n------\r\n"
data += self._generate_the_difference_between_configs()
with open(os.path.join(self.output_path, self.REPORT_FILE_NAME), "w") as f:
f.write(data)
class ThroughputVsRssiReport(object):
REPORT_FILE_NAME = "ThroughputVsRssi.md"
def __init__(self, output_path, throughput_results):
"""
:param throughput_results: config with the following type::
{
"tcp_tx": result,
"tcp_rx": result,
"udp_tx": result,
"udp_rx": result,
}
"""
self.output_path = output_path
self.raw_data_path = os.path.join(output_path, "raw_data")
self.results = throughput_results
self.throughput_types = self.results.keys()
self.throughput_types.sort()
if not os.path.exists(self.raw_data_path):
os.makedirs(self.raw_data_path)
def _generate_summary(self):
"""
generate summary with the following format::
| item | curve analysis | max throughput (Mbps) |
|---------|----------------|-----------------------|
| tcp tx | Success | 32.11 |
| tcp rx | Success | 32.11 |
| udp tx | Success | 45.22 |
| udp rx | Failed | 55.44 |
"""
ret = "\r\n### Summary\r\n\r\n"
ret += "| item | curve analysis | max throughput (Mbps) |\r\n"
ret += "|---------|----------------|-----------------------|\r\n"
for _type in self.throughput_types:
result = self.results[_type]
max_throughput = 0.0
curve_analysis = "Failed" if result.error_list else "Success"
for ap_ssid in result.throughput_by_att:
_max_for_ap = max(result.throughput_by_rssi[ap_ssid].values())
if _max_for_ap > max_throughput:
max_throughput = _max_for_ap
max_throughput = "{:.02f}".format(max_throughput)
ret += "| {:<8}| {:<15}| {:<22}|\r\n".format("{}_{}".format(result.proto, result.direction),
curve_analysis, max_throughput)
return ret
def _generate_report_for_one_type(self, result):
"""
generate markdown table with the following format::
### tcp rx
Errors:
* detected error 1
* ...
AP: ap_ssid
![throughput Vs RSSI](path to figure)
AP: ap_ssid
![throughput Vs RSSI](path to figure)
"""
result.post_analysis()
ret = "\r\n### {} {}\r\n".format(result.proto, result.direction)
if result.error_list:
ret += "\r\nErrors:\r\n\r\n"
for error in result.error_list:
ret += "* " + error + "\r\n"
for ap_ssid in result.throughput_by_rssi:
ret += "\r\nAP: {}\r\n".format(ap_ssid)
# draw figure
file_name = result.draw_throughput_figure(self.raw_data_path, ap_ssid, "rssi")
result.draw_throughput_figure(self.raw_data_path, ap_ssid, "att")
result.draw_rssi_vs_att_figure(self.raw_data_path, ap_ssid)
ret += "\r\n![throughput Vs RSSI]({})\r\n".format(os.path.join("raw_data", file_name))
return ret
def generate_report(self):
data = "# Throughput Vs RSSI\r\n"
data += self._generate_summary()
for _type in self.throughput_types:
data += self._generate_report_for_one_type(self.results[_type])
with open(os.path.join(self.output_path, self.REPORT_FILE_NAME), "w") as f:
f.write(data)

View file

@ -78,9 +78,12 @@ class BaseApp(object):
if not test_suite_name:
test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
sdk_path = cls.get_sdk_path()
return os.path.join(sdk_path, "TEST_LOGS",
test_suite_name +
time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
log_folder = os.path.join(sdk_path, "TEST_LOGS",
test_suite_name +
time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
if not os.path.exists(log_folder):
os.makedirs(log_folder)
return log_folder
def process_app_info(self):
"""

View file

@ -85,6 +85,14 @@ def _decode_data(data):
return data
def _pattern_to_string(pattern):
try:
ret = "RegEx: " + pattern.pattern
except AttributeError:
ret = pattern
return ret
class _DataCache(_queue.Queue):
"""
Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
@ -94,6 +102,21 @@ class _DataCache(_queue.Queue):
_queue.Queue.__init__(self, maxsize=maxsize)
self.data_cache = str()
def _move_from_queue_to_cache(self):
"""
move all of the available data in the queue to cache
:return: True if moved any item from queue to data cache, else False
"""
ret = False
while True:
try:
self.data_cache += _decode_data(self.get(0))
ret = True
except _queue.Empty:
break
return ret
def get_data(self, timeout=0):
"""
get a copy of data from cache.
@ -105,12 +128,16 @@ class _DataCache(_queue.Queue):
if timeout < 0:
timeout = 0
try:
data = self.get(timeout=timeout)
self.data_cache += _decode_data(data)
except _queue.Empty:
# don't do anything when on update for cache
pass
ret = self._move_from_queue_to_cache()
if not ret:
# we only wait for new data if we can't provide a new data_cache
try:
data = self.get(timeout=timeout)
self.data_cache += _decode_data(data)
except _queue.Empty:
# don't do anything when on update for cache
pass
return copy.deepcopy(self.data_cache)
def flush(self, index=0xFFFFFFFF):
@ -417,7 +444,7 @@ class BaseDUT(object):
data = self.data_cache.get_data(time.time() + timeout - start_time)
if ret is None:
raise ExpectTimeout(self.name + ": " + str(pattern))
raise ExpectTimeout(self.name + ": " + _pattern_to_string(pattern))
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
@ -457,12 +484,11 @@ class BaseDUT(object):
if expect_item["ret"] is not None:
# match succeed for one item
matched_expect_items.append(expect_item)
break
# if expect all, then all items need to be matched,
# else only one item need to matched
if expect_all:
match_succeed = (matched_expect_items == expect_items)
match_succeed = len(matched_expect_items) == len(expect_items)
else:
match_succeed = True if matched_expect_items else False
@ -482,7 +508,7 @@ class BaseDUT(object):
# flush already matched data
self.data_cache.flush(slice_index)
else:
raise ExpectTimeout(self.name + ": " + str(expect_items))
raise ExpectTimeout(self.name + ": " + str([_pattern_to_string(x) for x in expect_items]))
@_expect_lock
def expect_any(self, *expect_items, **timeout):

View file

@ -17,6 +17,8 @@ import os
import threading
import functools
import netifaces
import EnvConfig
@ -47,12 +49,12 @@ class Env(object):
dut=None,
env_tag=None,
env_config_file=None,
test_name=None,
test_suite_name=None,
**kwargs):
self.app_cls = app
self.default_dut_cls = dut
self.config = EnvConfig.Config(env_config_file, env_tag)
self.log_path = self.app_cls.get_log_folder(test_name)
self.log_path = self.app_cls.get_log_folder(test_suite_name)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path)
@ -130,17 +132,34 @@ class Env(object):
"""
return self.config.get_variable(variable_name)
PROTO_MAP = {
"ipv4": netifaces.AF_INET,
"ipv6": netifaces.AF_INET6,
"mac": netifaces.AF_LINK,
}
@_synced
def get_pc_nic_info(self, nic_name="pc_nic"):
def get_pc_nic_info(self, nic_name="pc_nic", proto="ipv4"):
"""
get_pc_nic_info(nic_name="pc_nic")
try to get nic info (ip address, ipv6 address, mac address)
try to get info of a specified NIC and protocol.
:param nic_name: pc nic name. allows passing variable name, nic name value or omitted (to get default nic info).
:return: a dict of address ("ipv4", "ipv6", "mac") if successfully found. otherwise None.
:param nic_name: pc nic name. allows passing variable name, nic name value.
:param proto: "ipv4", "ipv6" or "mac"
:return: a dict of nic info if successfully found. otherwise None.
nic info keys could be different for different protocols.
key "addr" is available for both mac, ipv4 and ipv6 pic info.
"""
# TODO: need to implement auto get nic info method
return self.config.get_variable("nic_info/" + nic_name)
interfaces = netifaces.interfaces()
if nic_name in interfaces:
# the name is in the interface list, we regard it as NIC name
if_addr = netifaces.ifaddresses(nic_name)
else:
# it's not in interface name list, we assume it's variable name
_nic_name = self.get_variable(nic_name)
if_addr = netifaces.ifaddresses(_nic_name)
return if_addr[self.PROTO_MAP[proto]][0]
@_synced
def close(self):

View file

@ -53,7 +53,7 @@ class Config(object):
try:
with open(config_file) as f:
configs = yaml.load(f)[env_name]
except (OSError, TypeError):
except (OSError, TypeError, IOError):
configs = dict()
return configs

View file

@ -40,18 +40,22 @@ class Runner(threading.Thread):
def __init__(self, test_case, case_config, env_config_file=None):
super(Runner, self).__init__()
self.setDaemon(True)
test_methods = SearchCases.Search.search_test_cases(test_case)
self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
self.test_result = True
if case_config:
test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
else:
test_suite_name = "TestRunner"
TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
test_methods = SearchCases.Search.search_test_cases(test_case)
self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
self.test_result = []
def run(self):
for case in self.test_cases:
self.test_result = self.test_result and case.run()
result = case.run()
self.test_result.append(result)
def get_test_result(self):
return self.test_result and all(self.test_result)
if __name__ == '__main__':
@ -76,5 +80,5 @@ if __name__ == '__main__':
except KeyboardInterrupt:
print("exit by Ctrl-C")
break
if not runner.test_result:
if not runner.get_test_result():
sys.exit(1)

View file

@ -106,6 +106,7 @@ get_passed_cases = TestResult.get_passed_cases
MANDATORY_INFO = {
"execution_time": 1,
"env_tag": "default",
"category": "function",
}
@ -132,12 +133,6 @@ def test_method(**kwargs):
case_info["name"] = test_func.__name__
case_info.update(kwargs)
# create env instance
env_config = DefaultEnvConfig.get_default_config()
for key in kwargs:
if key in env_config:
env_config[key] = kwargs[key]
@functools.wraps(test_func)
def handle_test(extra_data=None, **overwrite):
"""
@ -147,6 +142,12 @@ def test_method(**kwargs):
:param overwrite: args that runner or main want to overwrite
:return: None
"""
# create env instance
env_config = DefaultEnvConfig.get_default_config()
for key in kwargs:
if key in env_config:
env_config[key] = kwargs[key]
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
# prepare for xunit test results

View file

@ -0,0 +1,70 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Internal use only.
This file provide method to control programmable attenuator.
"""
import time
import serial
def set_att(port, att, att_fix=False):
"""
set attenuation value on the attenuator
:param port: serial port for attenuator
:param att: attenuation value we want to set
:param att_fix: fix the deviation with experience value
:return: True or False
"""
assert 0 <= att <= 62
# fix att
if att_fix:
if att >= 33 and (att - 30 + 1) % 4 == 0:
att_t = att - 1
elif att >= 33 and (att - 30) % 4 == 0:
att_t = att + 1
else:
att_t = att
else:
att_t = att
serial_port = serial.Serial(port, baudrate=9600, rtscts=False, timeout=0.1)
if serial_port.isOpen() is False:
raise IOError("attenuator control, failed to open att port")
cmd_hex = "7e7e10{:02x}{:x}".format(att_t, 0x10 + att_t)
exp_res_hex = "7e7e20{:02x}00{:x}".format(att_t, 0x20 + att_t)
cmd = cmd_hex.decode("hex")
exp_res = exp_res_hex.decode("hex")
serial_port.write(cmd)
res = ""
for i in range(5):
res += serial_port.read(20)
if res == exp_res:
result = True
break
time.sleep(0.1)
else:
result = False
serial_port.close()
return result

View file

@ -122,6 +122,10 @@ class AssignTest(object):
"""
# subclass need to rewrite CI test job pattern, to filter all test jobs
CI_TEST_JOB_PATTERN = re.compile(r"^test_.+")
# by default we only run function in CI, as other tests could take long time
DEFAULT_FILTER = {
"category": "function",
}
def __init__(self, test_case_path, ci_config_file, case_group=Group):
self.test_case_path = test_case_path
@ -140,15 +144,17 @@ class AssignTest(object):
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
return job_list
@staticmethod
def _search_cases(test_case_path, case_filter=None):
def _search_cases(self, test_case_path, case_filter=None):
"""
:param test_case_path: path contains test case folder
:param case_filter: filter for test cases
:param case_filter: filter for test cases. the filter to use is default filter updated with case_filter param.
:return: filtered test case list
"""
_case_filter = self.DEFAULT_FILTER.copy()
if case_filter:
_case_filter.update(case_filter)
test_methods = SearchCases.Search.search_test_cases(test_case_path)
return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
return CaseConfig.filter_test_cases(test_methods, _case_filter)
def _group_cases(self):
"""

View file

@ -0,0 +1,50 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import matplotlib
# fix can't draw figure with docker
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# candidate colors
LINE_STYLE_CANDIDATE = ['b-o', 'r-o', 'k-o', 'm-o', 'c-o', 'g-o', 'y-o',
'b-s', 'r-s', 'k-s', 'm-s', 'c-s', 'g-s', 'y-s']
def draw_line_chart(file_name, title, x_label, y_label, data_list):
"""
draw line chart and save to file.
:param file_name: abs/relative file name to save chart figure
:param title: chart title
:param x_label: x-axis label
:param y_label: y-axis label
:param data_list: a list of line data.
each line is a dict of ("x-axis": list, "y-axis": list, "label": string)
"""
plt.figure(figsize=(12, 6))
plt.grid(True)
for i, data in enumerate(data_list):
plt.plot(data["x-axis"], data["y-axis"], LINE_STYLE_CANDIDATE[i], label=data["label"])
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.legend(fontsize=12)
plt.title(title)
plt.tight_layout(pad=3, w_pad=3, h_pad=3)
plt.savefig(file_name)
plt.close()

View file

@ -0,0 +1,95 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Internal use only.
This file implements controlling APC PDU via telnet.
"""
import telnetlib
class Control(object):
""" control APC via telnet """
@classmethod
def apc_telnet_make_choice(cls, telnet, choice):
""" select a choice """
telnet.read_until("Event Log")
telnet.read_until(">")
telnet.write(choice + "\r\n")
@classmethod
def apc_telnet_common_action(cls, telnet, check_str, action):
""" wait until a pattern and then write a line """
telnet.read_until(check_str)
telnet.write(action + "\r\n")
@classmethod
def control(cls, apc_ip, control_dict):
"""
control APC
:param apc_ip: IP of APC
:param control_dict: dict with outlet ID and "ON" or "OFF"
"""
for _outlet in control_dict:
assert 0 < _outlet < 9
assert control_dict[_outlet] in ["ON", "OFF"]
# telnet
# set timeout as 2s so that it won't waste time even can't access APC
tn = telnetlib.Telnet(host=apc_ip, timeout=5)
# log on
cls.apc_telnet_common_action(tn, "User Name :", "apc")
cls.apc_telnet_common_action(tn, "Password :", "apc")
# go to Device Manager
cls.apc_telnet_make_choice(tn, "1")
# go to Outlet Management
cls.apc_telnet_make_choice(tn, "2")
# go to Outlet Control/Configuration
cls.apc_telnet_make_choice(tn, "1")
# do select Outlet and control
for _outlet in control_dict:
# choose Outlet
cls.apc_telnet_make_choice(tn, str(_outlet))
# choose Control Outlet
cls.apc_telnet_make_choice(tn, "1")
# choose action
_action = control_dict[_outlet]
if "ON" in _action:
cls.apc_telnet_make_choice(tn, "1")
else:
cls.apc_telnet_make_choice(tn, "2")
# do confirm
cls.apc_telnet_common_action(tn, "cancel :", "YES")
cls.apc_telnet_common_action(tn, "continue...", "")
# return to Outlet Control/Configuration
cls.apc_telnet_make_choice(tn, "\033")
cls.apc_telnet_make_choice(tn, "\033")
# exit to main menu and logout
tn.write("\033\r\n")
tn.write("\033\r\n")
tn.write("\033\r\n")
tn.write("4\r\n")
@classmethod
def control_rest(cls, apc_ip, outlet, action):
outlet_list = range(1, 9)
outlet_list.remove(outlet)
cls.control(apc_ip, dict.fromkeys(outlet_list, action))

View file

@ -69,7 +69,7 @@ Let's first check a simple simple::
if __name__ == '__main__':
TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml")
test_examples_protocol_https_request()
@ -128,6 +128,8 @@ The following 3rd party lib is required:
* pyserial
* pyyaml
* xunitgen
* netifaces
* matplotlib (if use Utility.LineChart)
To build document, we need to install ``Sphinx`` and ``sphinx-rtd-theme`` (you may replace this with your own theme).

View file

@ -47,5 +47,5 @@ def test_examples_protocol_https_request(env, extra_data):
if __name__ == '__main__':
TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
test_examples_protocol_https_request()