test: add test fw for example test

This commit is contained in:
He Yin Ling 2017-10-10 10:44:55 +08:00
parent 19aa3c72e9
commit 47a9a4a614
21 changed files with 2496 additions and 0 deletions

93
tools/tiny-test-fw/App.py Normal file
View file

@@ -0,0 +1,93 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
class for handling Test Apps. Currently it provides the following features:
1. get SDK path
2. get SDK tools
3. parse application info from its path. for example:
* provide download info
* provide partition table info
Test Apps should inherit from the BaseApp class and override its methods.
"""
import os
import sys
import time
# timestamp used to calculate the log folder name
LOG_FOLDER_TIMESTAMP = time.time()
class BaseApp(object):
"""
Base Class for App.
Defines the mandatory methods that an App needs to implement.
Also implements some common methods.
:param app_path: the path for app.
"""
def __init__(self, app_path):
pass
@classmethod
def get_sdk_path(cls):
"""
get sdk path.
subclass must overwrite this method.
:return: abs sdk path
"""
pass
@classmethod
def get_tools(cls):
"""
get SDK related tools for applications
subclass must overwrite this method.
:return: tuple, abs path of each tool
"""
pass
@classmethod
def get_log_folder(cls, test_suite_name):
"""
By default log folder is ``${SDK_PATH}/TEST_LOGS/${test_suite_name}_${timestamp}``.
The log folder name is fixed once the test starts running, ensuring all logs will be put into the same folder.
:param test_suite_name: the test suite name, by default it's the base file name for main module
:return: the log folder path
"""
if not test_suite_name:
test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
sdk_path = cls.get_sdk_path()
return os.path.join(sdk_path, "TEST_LOGS",
test_suite_name +
time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
def process_app_info(self):
"""
parse built app info for DUTTool
subclass must overwrite this method.
:return: required info for specific DUTTool
"""
pass
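
To make the interface above concrete, here is a minimal, hedged sketch of an App subclass; the environment variable ``DUMMY_SDK_PATH``, the tool file name, and the binary path are illustrative assumptions, not part of this commit.

import os
import App

class DummyApp(App.BaseApp):
    """ illustrative subclass only; all paths/names below are assumptions """
    @classmethod
    def get_sdk_path(cls):
        # resolve the SDK root from an (assumed) environment variable
        return os.path.abspath(os.getenv("DUMMY_SDK_PATH", "."))

    @classmethod
    def get_tools(cls):
        # return abs paths of SDK tools as a tuple (single assumed tool here)
        return (os.path.join(cls.get_sdk_path(), "tools", "flash_tool.py"),)

    def process_app_info(self):
        # return whatever download/start info the matching DUTTool needs
        return {"binary": os.path.join(self.get_sdk_path(), "build", "app.bin")}

With this in place, ``DummyApp.get_log_folder("demo")`` would resolve to ``<sdk>/TEST_LOGS/demo_<timestamp>`` as described above.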

View file

@@ -0,0 +1,179 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command line tool to assign example tests to CI test jobs.
"""
# TODO: Need to handle running examples on different chips
import os
import sys
import re
import argparse
import yaml
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path:
sys.path.insert(0, test_fw_path)
from Utility import CaseConfig, SearchCases, GitlabCIJob
class Group(object):
MAX_EXECUTION_TIME = 30
MAX_CASE = 15
SORT_KEYS = ["env_tag"]
def __init__(self, case):
self.execution_time = 0
self.case_list = [case]
self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS]))
def accept_new_case(self):
"""
check if allowed to add any case to this group
:return: True or False
"""
max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME)
max_case = (len(self.case_list) < self.MAX_CASE)
return max_time and max_case
def add_case(self, case):
"""
add case to current group
:param case: test case
:return: True if the case was added successfully, else False
"""
added = False
if self.accept_new_case():
for key in self.filters:
if case.case_info[key] != self.filters[key]:
break
else:
self.case_list.append(case)
added = True
return added
def output(self):
"""
output data for job configs
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
"""
output_data = {
"Filter": self.filters,
"CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list],
}
return output_data
class AssignTest(object):
"""
Auto assign tests to CI jobs.
:param test_case: path of test case file(s)
:param ci_config_file: path of ``.gitlab-ci.yml``
"""
CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+")
def __init__(self, test_case, ci_config_file):
self.test_cases = self._search_cases(test_case)
self.jobs = self._parse_gitlab_ci_config(ci_config_file)
def _parse_gitlab_ci_config(self, ci_config_file):
with open(ci_config_file, "r") as f:
ci_config = yaml.load(f)
job_list = list()
for job_name in ci_config:
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
return job_list
@staticmethod
def _search_cases(test_case, case_filter=None):
"""
:param test_case: path contains test case folder
:param case_filter: filter for test cases
:return: filtered test case list
"""
test_methods = SearchCases.Search.search_test_cases(test_case)
return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
def _group_cases(self):
"""
separate all cases into groups according to group rules. Each group will be executed by one CI job.
:return: test case groups.
"""
groups = []
for case in self.test_cases:
for group in groups:
# add to current group
if group.add_case(case):
break
else:
# create new group
groups.append(Group(case))
return groups
def assign_cases(self):
"""
separate test cases into groups and assign them to CI jobs.
:raise AssertionError: if it fails to assign any case to a CI job.
:return: None
"""
failed_to_assign = []
test_groups = self._group_cases()
for group in test_groups:
for job in self.jobs:
if job.match_group(group):
job.assign_group(group)
break
else:
failed_to_assign.append(group)
assert not failed_to_assign
def output_configs(self, output_path):
"""
:param output_path: path to output config files for each CI job
:return: None
"""
if not os.path.exists(output_path):
os.makedirs(output_path)
for job in self.jobs:
job.output_config(output_path)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("test_case",
help="test case folder or file")
parser.add_argument("ci_config_file",
help="gitlab ci config file")
parser.add_argument("output_path",
help="output path of config files")
args = parser.parse_args()
assign_test = AssignTest(args.test_case, args.ci_config_file)
assign_test.assign_cases()
assign_test.output_configs(args.output_path)
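
As a hedged illustration of what one assigned group serializes to, the sketch below builds a Group from a stand-in case object; the case name is a placeholder and the fields mirror the mandatory case info used elsewhere in this commit.

# given the Group class defined above; FakeCase is a stand-in for a decorated test method
class FakeCase(object):
    def __init__(self, **case_info):
        self.case_info = case_info

case = FakeCase(name="demo_case", env_tag="default", execution_time=1)
group = Group(case)
print(group.output())
# -> {'Filter': {'env_tag': 'default'}, 'CaseConfig': [{'name': 'demo_case'}]}

Each such dict is later written to ``<job_name>.yml`` by ``Job.output_config`` and consumed as a case config file by ``Runner.py``.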

550
tools/tiny-test-fw/DUT.py Normal file
View file

@@ -0,0 +1,550 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DUT provides 3 major groups of features:
* DUT port feature, provide basic open/close/read/write features
* DUT tools, provide extra methods to control the device, like download and start app
* DUT expect method, provide features for users to check DUT outputs
The current design of DUT has 3 classes for one DUT: BaseDUT, DUTPort, DUTTool.
* BaseDUT class:
* defines the methods that DUT port and DUT tool classes need to override
* provides the expect methods and some other methods based on DUTPort
* DUTPort class:
* inherits from the BaseDUT class
* implements the port features by overriding the port methods defined in BaseDUT
* DUTTool class:
* inherits from one of the DUTPort classes
* implements the tool features by overriding the tool methods defined in BaseDUT
* could add some new methods provided by the tool
This module implements the BaseDUT class and one of the port classes, SerialDUT.
Users should implement their own DUTTool classes.
If they use a different port, they need to implement their DUTPort class as well.
"""
import time
import re
import threading
import copy
import sys
import functools
import serial
from serial.tools import list_ports
if sys.version_info[0] == 2:
import Queue as _queue
else:
import queue as _queue
class ExpectTimeout(ValueError):
""" timeout for expect method """
pass
class UnsupportedExpectItem(ValueError):
""" expect item not supported by the expect method """
pass
def _expect_lock(func):
@functools.wraps(func)
def handler(self, *args, **kwargs):
with self.expect_lock:
ret = func(self, *args, **kwargs)
return ret
return handler
class _DataCache(_queue.Queue):
"""
Data cache based on Queue. Allows users to process the data cache as a string instead of a Queue.
"""
def __init__(self, maxsize=0):
_queue.Queue.__init__(self, maxsize=maxsize)
self.data_cache = str()
def get_data(self, timeout=0):
"""
get a copy of data from cache.
:param timeout: timeout for waiting new queue item
:return: copy of data cache
"""
# make sure timeout is non-negative
if timeout < 0:
timeout = 0
try:
data = self.get(timeout=timeout)
if isinstance(data, bytes):
# convert bytes to string
try:
data = data.decode("utf-8", "ignore")
except UnicodeDecodeError:
data = data.decode("iso8859-1",)
self.data_cache += data
except _queue.Empty:
# don't do anything when there is no update for the cache
pass
return copy.deepcopy(self.data_cache)
def flush(self, index=0xFFFFFFFF):
"""
flush data from cache.
:param index: if < 0 then don't do flush, otherwise flush data before index
:return: None
"""
# first add data in queue to cache
self.get_data()
if index > 0:
self.data_cache = self.data_cache[index:]
class _RecvThread(threading.Thread):
def __init__(self, read, data_cache):
super(_RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
def run(self):
while not self.exit_event.isSet():
data = self.read(1000)
if data:
self.data_cache.put(data)
def exit(self):
self.exit_event.set()
self.join()
class BaseDUT(object):
"""
:param name: application defined name for port
:param port: comport name, used to create DUT port
:param log_file: log file name
:param app: test app instance
:param kwargs: extra args for DUT to create ports
"""
DEFAULT_EXPECT_TIMEOUT = 5
def __init__(self, name, port, log_file, app, **kwargs):
self.expect_lock = threading.Lock()
self.name = name
self.port = port
self.log_file = log_file
self.app = app
self.data_cache = _DataCache()
self.receive_thread = None
# open and start during init
self.open()
def __str__(self):
return "DUT({}: {})".format(self.name, str(self.port))
# define for methods need to be overwritten by Port
@classmethod
def list_available_ports(cls):
"""
list all available ports.
subclass (port) must overwrite this method.
:return: list of available comports
"""
pass
def _port_open(self):
"""
open the port.
subclass (port) must overwrite this method.
:return: None
"""
pass
def _port_read(self, size=1):
"""
read from port. This method should not block for a long time, otherwise the receive thread cannot exit.
subclass (port) must overwrite this method.
:param size: max size to read.
:return: read data.
"""
pass
def _port_write(self, data):
"""
write to port.
subclass (port) must overwrite this method.
:param data: data to write
:return: None
"""
pass
def _port_close(self):
"""
close port.
subclass (port) must overwrite this method.
:return: None
"""
pass
# methods that need to be overwritten by Tool
@classmethod
def confirm_dut(cls, port, app, **kwargs):
"""
confirm if it's a DUT, usually used when auto-detecting the DUT via Env config.
subclass (tool) must overwrite this method.
:param port: comport
:param app: app instance
:return: True or False
"""
pass
def start_app(self):
"""
usually after we get the DUT, we need to do some extra work to let the App start.
For example, we need to reset->download->reset to let the IDF application start on the DUT.
subclass (tool) must overwrite this method.
:return: None
"""
pass
# methods that features raw port methods
def open(self):
"""
open port and create thread to receive data.
:return: None
"""
self._port_open()
self.receive_thread = _RecvThread(self._port_read, self.data_cache)
self.receive_thread.start()
def close(self):
"""
close receive thread and then close port.
:return: None
"""
if self.receive_thread:
self.receive_thread.exit()
self._port_close()
def write(self, data, eol="\r\n", flush=True):
"""
:param data: data
:param eol: end of line pattern.
:param flush: whether to flush the received data cache before writing.
Usually we need to flush before writing,
to make sure we only process outputs generated by this write.
:return: None
"""
# do flush before write
if flush:
self.data_cache.flush()
# do write if data is not empty
if data:
self._port_write(data + eol if eol else data)
@_expect_lock
def read(self, size=0xFFFFFFFF):
"""
read(size=0xFFFFFFFF)
read raw data. Using this method is NOT suggested.
Only use it if the expect methods don't meet your requirements.
:param size: read size. default read all data
:return: read data
"""
data = self.data_cache.get_data(0)[:size]
self.data_cache.flush(size)
return data
# expect related methods
@staticmethod
def _expect_str(data, pattern):
"""
protected method. check if string is matched in data cache.
:param data: data to process
:param pattern: string
:return: pattern if match succeed otherwise None
"""
index = data.find(pattern)
if index != -1:
ret = pattern
index += len(pattern)
else:
ret = None
return ret, index
@staticmethod
def _expect_re(data, pattern):
"""
protected method. check if re pattern is matched in data cache
:param data: data to process
:param pattern: compiled RegEx pattern
:return: match groups if match succeed otherwise None
"""
ret = None
match = pattern.search(data)
if match:
ret = match.groups()
index = match.end()
else:
index = -1
return ret, index
EXPECT_METHOD = [
[type(re.compile("")), "_expect_re"],
[str, "_expect_str"],
]
def _get_expect_method(self, pattern):
"""
protected method. get expect method according to pattern type.
:param pattern: expect pattern, string or compiled RegEx
:return: ``_expect_str`` or ``_expect_re``
"""
for expect_method in self.EXPECT_METHOD:
if isinstance(pattern, expect_method[0]):
method = expect_method[1]
break
else:
raise UnsupportedExpectItem()
return self.__getattribute__(method)
@_expect_lock
def expect(self, pattern, timeout=DEFAULT_EXPECT_TIMEOUT):
"""
expect(pattern, timeout=DEFAULT_EXPECT_TIMEOUT)
expect the received data on the DUT to match the pattern. Will raise an exception on expect timeout.
:raise ExpectTimeout: failed to find the pattern before timeout
:raise UnsupportedExpectItem: pattern is not string or compiled RegEx
:param pattern: string or compiled RegEx(string pattern)
:param timeout: timeout for expect
:return: string if pattern is string; matched groups if pattern is RegEx
"""
method = self._get_expect_method(pattern)
# non-blocking get data for first time
data = self.data_cache.get_data(0)
start_time = time.time()
while True:
ret, index = method(data, pattern)
if ret is not None or time.time() - start_time > timeout:
self.data_cache.flush(index)
break
# wait for new data from cache
data = self.data_cache.get_data(time.time() + timeout - start_time)
if ret is None:
raise ExpectTimeout(self.name + ": " + str(pattern))
return ret
def _expect_multi(self, expect_all, expect_item_list, timeout):
"""
protected method. Internal logic for expect multi.
:param expect_all: True or False, expect all items in the list or any in the list
:param expect_item_list: expect item list
:param timeout: timeout
:return: None
"""
def process_expected_item(item_raw):
# convert item raw data to standard dict
item = {
"pattern": item_raw[0] if isinstance(item_raw, tuple) else item_raw,
"method": self._get_expect_method(item_raw[0] if isinstance(item_raw, tuple)
else item_raw),
"callback": item_raw[1] if isinstance(item_raw, tuple) else None,
"index": -1,
"ret": None,
}
return item
expect_items = [process_expected_item(x) for x in expect_item_list]
# non-blocking get data for first time
data = self.data_cache.get_data(0)
start_time = time.time()
matched_expect_items = list()
while True:
for expect_item in expect_items:
if expect_item not in matched_expect_items:
# exclude those already matched
expect_item["ret"], expect_item["index"] = \
expect_item["method"](data, expect_item["pattern"])
if expect_item["ret"] is not None:
# match succeed for one item
matched_expect_items.append(expect_item)
break
# if expect all, then all items need to be matched,
# else only one item needs to be matched
if expect_all:
match_succeed = (matched_expect_items == expect_items)
else:
match_succeed = True if matched_expect_items else False
if time.time() - start_time > timeout or match_succeed:
break
else:
data = self.data_cache.get_data(time.time() + timeout - start_time)
if match_succeed:
# do callback and flush matched data cache
slice_index = -1
for expect_item in matched_expect_items:
# trigger callback
if expect_item["callback"]:
expect_item["callback"](expect_item["ret"])
slice_index = max(slice_index, expect_item["index"])
# flush already matched data
self.data_cache.flush(slice_index)
else:
raise ExpectTimeout(self.name + ": " + str(expect_items))
@_expect_lock
def expect_any(self, *expect_items, **timeout):
"""
expect_any(*expect_items, timeout=DEFAULT_TIMEOUT)
expect any one of the patterns.
Will call the callback (if provided) when a pattern matches, and then return.
Will pass the match result to the callback.
:raise ExpectTimeout: failed to match any one of the expect items before timeout
:raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx
:arg expect_items: one or more expect items.
string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
:keyword timeout: timeout for expect
:return: None
"""
# to be compatible with python2
# in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
if "timeout" not in timeout:
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(False, expect_items, **timeout)
@_expect_lock
def expect_all(self, *expect_items, **timeout):
"""
expect_all(*expect_items, timeout=DEFAULT_TIMEOUT)
expect all of the patterns.
Will call the callbacks (if provided) when all patterns match, and then return.
Will pass the match results to the callbacks.
:raise ExpectTimeout: failed to match all of the expect items before timeout
:raise UnsupportedExpectItem: pattern in expect_item is not string or compiled RegEx
:arg expect_items: one or more expect items.
string, compiled RegEx pattern or (string or RegEx(string pattern), callback)
:keyword timeout: timeout for expect
:return: None
"""
# to be compatible with python2
# in python3 we can write f(self, *expect_items, timeout=DEFAULT_TIMEOUT)
if "timeout" not in timeout:
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
return self._expect_multi(True, expect_items, **timeout)
class SerialDUT(BaseDUT):
""" serial with logging received data feature """
DEFAULT_UART_CONFIG = {
"baudrate": 115200,
"bytesize": serial.EIGHTBITS,
"parity": serial.PARITY_NONE,
"stopbits": serial.STOPBITS_ONE,
"timeout": 0.05,
"xonxoff": False,
"rtscts": False,
}
def __init__(self, name, port, log_file, app, **kwargs):
self.port_inst = None
self.serial_configs = self.DEFAULT_UART_CONFIG.copy()
self.serial_configs.update(kwargs)
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
@staticmethod
def _format_data(data):
"""
format data for logging. do decode and add timestamp.
:param data: raw data from read
:return: formatted data (str)
"""
timestamp = time.time()
timestamp = "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
str(timestamp % 1)[2:5])
try:
formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, data.decode("utf-8", "ignore"))
except UnicodeDecodeError:
# if utf-8 fail, use iso-8859-1 (single char codec with range 0-255)
formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, data.decode("iso8859-1",))
return formatted_data
def _port_open(self):
self.port_inst = serial.Serial(self.port, **self.serial_configs)
def _port_close(self):
self.port_inst.close()
def _port_read(self, size=1):
data = self.port_inst.read(size)
if data:
with open(self.log_file, "a+") as _log_file:
_log_file.write(self._format_data(data))
return data
def _port_write(self, data):
self.port_inst.write(data)
@classmethod
def list_available_ports(cls):
return [x.device for x in list_ports.comports()]
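
A hedged usage sketch of the expect API above, assuming a hypothetical ``DemoDUT`` tool class that fills in the two tool methods; the serial port, log file name, and expected patterns are placeholders.

import re
import DUT

class DemoDUT(DUT.SerialDUT):
    """ hypothetical DUT tool: accepts any port and needs no download step """
    @classmethod
    def confirm_dut(cls, port, app, **kwargs):
        return True

    def start_app(self):
        pass

def on_error(groups):
    # callback gets the regex match groups
    pass

dut = DemoDUT("dut1", "/dev/ttyUSB0", "dut1.log", None)       # placeholder port/log names
dut.write("version")                                          # flushes cache, then writes "version\r\n"
dut.expect("OK")                                              # plain string pattern
groups = dut.expect(re.compile(r"v(\d+\.\d+)"))               # regex pattern, returns match groups
dut.expect_any("ready", (re.compile(r"error: (.+)"), on_error))  # tuple form attaches a callback
dut.close()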

156
tools/tiny-test-fw/Env.py Normal file
View file

@@ -0,0 +1,156 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Test Env, manages DUT, App and EnvConfig, interface for test cases to access these components """
import os
import threading
import functools
import EnvConfig
def _synced(func):
@functools.wraps(func)
def decorator(self, *args, **kwargs):
with self.lock:
ret = func(self, *args, **kwargs)
return ret
decorator.__doc__ = func.__doc__
return decorator
class Env(object):
"""
test env, manages DUTs and env configs.
:keyword app: class for default application
:keyword dut: class for default DUT
:keyword env_tag: test env tag, used to select configs from env config file
:keyword env_config_file: test env config file path
:keyword test_name: test suite name, used when generate log folder name
"""
def __init__(self,
app=None,
dut=None,
env_tag=None,
env_config_file=None,
test_name=None,
**kwargs):
self.app_cls = app
self.default_dut_cls = dut
self.config = EnvConfig.Config(env_config_file, env_tag)
self.log_path = self.app_cls.get_log_folder(test_name)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path)
self.allocated_duts = dict()
self.lock = threading.RLock()
@_synced
def get_dut(self, dut_name, app_path, dut_class=None, app_class=None):
"""
get_dut(dut_name, app_path, dut_class=None, app_class=None)
:param dut_name: user defined name for DUT
:param app_path: application path, app instance will use this path to process application info
:param dut_class: dut class, if not specified will use default dut class of env
:param app_class: app class, if not specified will use default app of env
:return: dut instance
"""
if dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
else:
if dut_class is None:
dut_class = self.default_dut_cls
if app_class is None:
app_class = self.app_cls
app_inst = app_class(app_path)
try:
port = self.config.get_variable(dut_name)
except ValueError:
# try to auto detect ports
allocated_ports = [self.allocated_duts[x]["port"] for x in self.allocated_duts]
available_ports = dut_class.list_available_ports()
for port in available_ports:
if port not in allocated_ports:
if dut_class.confirm_dut(port, app_inst):
break
else:
port = None
if port:
try:
dut_config = self.get_variable(dut_name + "_port_config")
except ValueError:
dut_config = dict()
dut = self.default_dut_cls(dut_name, port,
os.path.join(self.log_path, dut_name + ".log"),
app_inst,
**dut_config)
self.allocated_duts[dut_name] = {"port": port, "dut": dut}
else:
raise ValueError("Failed to get DUT")
return dut
@_synced
def close_dut(self, dut_name):
"""
close_dut(dut_name)
close one DUT by name if the DUT name is valid (the name used by ``get_dut``); otherwise do nothing.
:param dut_name: user defined name for DUT
:return: None
"""
try:
dut = self.allocated_duts.pop(dut_name)["dut"]
dut.close()
except KeyError:
pass
@_synced
def get_variable(self, variable_name):
"""
get_variable(variable_name)
get variable from config file. If that fails, try to auto-detect it.
:param variable_name: name of the variable
:return: value of the variable if successfully found, otherwise raises ``ValueError``
"""
return self.config.get_variable(variable_name)
@_synced
def get_pc_nic_info(self, nic_name="pc_nic"):
"""
get_pc_nic_info(nic_name="pc_nic")
try to get nic info (ip address, ipv6 address, mac address)
:param nic_name: PC NIC name. A variable name or a NIC name value can be passed; it can also be omitted (to get the default NIC info).
:return: a dict of addresses ("ipv4", "ipv6", "mac") if successfully found, otherwise None.
"""
# TODO: need to implement auto get nic info method
return self.config.get_variable("nic_info/" + nic_name)
@_synced
def close(self):
"""
close()
close all DUTs of the Env.
:return: None
"""
for dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
dut.close()
self.allocated_duts = dict()
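
A hedged sketch of using Env directly in a test body, with the IDF App/DUT classes added later in this commit; the DUT name, example path, and expected log line are placeholders, and IDF_PATH must point at an esp-idf checkout with the example built.

import Env
from IDF.IDFApp import Example
from IDF.IDFDUT import IDFDUT

env = Env.Env(app=Example, dut=IDFDUT, env_tag="default", test_name="demo")
dut = env.get_dut("dut1", "examples/get-started/hello_world")   # placeholder name/path
dut.start_app()                                                 # reset -> download -> reset via esptool
dut.expect("Hello world!")                                      # placeholder pattern
env.close()                                                     # closes every allocated DUT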

View file

@@ -0,0 +1,74 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The test env could change when we run tests from different computers.
Test env config provides the ``get_variable`` method to allow users to get test-environment-related variables.
It will first try to get the variable from the config file.
If that fails, it will try to auto-detect it (not supported yet).
The config file format is YAML. It's a set of key-value pairs. The following is an example of a config file::
Example_WIFI:
  ap_ssid: "myssid"
  ap_password: "mypassword"
Example_ShieldBox:
  attenuator_port: "/dev/ttyUSB2"
  ap_ssid: "myssid"
  ap_password: "mypassword"
The file first defines the env tag for each environment, then adds its key-value pairs under it.
This prevents test cases from getting configs of other environments when there are configs for multiple environments in one file.
"""
import yaml
class Config(object):
""" Test Env Config """
def __init__(self, config_file, env_tag):
self.configs = self.load_config_file(config_file, env_tag)
@staticmethod
def load_config_file(config_file, env_name):
"""
load configs from config file.
:param config_file: config file path
:param env_name: env tag name
:return: configs for the test env
"""
try:
with open(config_file) as f:
configs = yaml.load(f)[env_name]
except (OSError, TypeError):
configs = dict()
return configs
def get_variable(self, variable_name):
"""
first try to get from config file. if not found, try to auto detect the variable.
:param variable_name: name of variable
:return: value or None
"""
try:
value = self.configs[variable_name]
except KeyError:
#TODO: to support auto get variable here
value = None
if value is None:
raise ValueError("Failed to get variable")
return value
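
A hedged sketch of reading variables with the Config class above; the file name echoes the docstring example and is a placeholder.

import EnvConfig

config = EnvConfig.Config("EnvConfig.yml", "Example_WIFI")   # placeholder config file name
ssid = config.get_variable("ap_ssid")                        # "myssid" with the docstring example
try:
    config.get_variable("does_not_exist")                    # missing keys raise ValueError
except ValueError:
    pass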

View file

@@ -0,0 +1,6 @@
.external_ap: &external_ap
  ap_ssid: "myssid"
  ap_password: "mypassword"
Examples_WIFI:
  <<: *external_ap

View file

@@ -0,0 +1,164 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" IDF Test Applications """
import subprocess
import os
import App
class IDFApp(App.BaseApp):
"""
Implements common esp-idf application behavior.
IDF applications should inherit from this class and override the method get_binary_path.
"""
IDF_DOWNLOAD_CONFIG_FILE = "download.config"
def __init__(self, app_path):
super(IDFApp, self).__init__(app_path)
self.idf_path = self.get_sdk_path()
self.binary_path = self.get_binary_path(app_path)
assert os.path.exists(self.binary_path)
assert self.IDF_DOWNLOAD_CONFIG_FILE in os.listdir(self.binary_path)
self.esptool, self.partition_tool = self.get_tools()
@classmethod
def get_sdk_path(cls):
idf_path = os.getenv("IDF_PATH")
assert idf_path
assert os.path.exists(idf_path)
return idf_path
@classmethod
def get_tools(cls):
idf_path = cls.get_sdk_path()
# get esptool and partition tool for esp-idf
esptool = os.path.join(idf_path, "components",
"esptool_py", "esptool", "esptool.py")
partition_tool = os.path.join(idf_path, "components",
"partition_table", "gen_esp32part.py")
assert os.path.exists(esptool) and os.path.exists(partition_tool)
return esptool, partition_tool
def get_binary_path(self, app_path):
"""
get binary path according to input app_path.
subclass must overwrite this method.
:param app_path: path of application
:return: abs app binary path
"""
pass
def process_arg(self, arg):
"""
process args in download.config. convert to abs path for .bin args. strip spaces and CRLFs.
"""
if ".bin" in arg:
ret = os.path.join(self.binary_path, arg)
else:
ret = arg
return ret.strip("\r\n ")
def process_app_info(self):
"""
get app download config and partition info from a specific app path
:return: download config, partition info
"""
with open(os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE), "r") as f:
configs = f.read().split(" ")
download_configs = ["--chip", "auto", "--before", "default_reset",
"--after", "hard_reset", "write_flash", "-z"]
download_configs += [self.process_arg(x) for x in configs]
# handle partition table
for partition_file in download_configs:
if "partition" in partition_file:
partition_file = os.path.join(self.binary_path, partition_file)
break
else:
raise ValueError("No partition table found for IDF binary path: {}".format(self.binary_path))
process = subprocess.Popen(["python", self.partition_tool, partition_file],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data = process.stdout.read()
if isinstance(raw_data, bytes):
raw_data = raw_data.decode()
partition_table = dict()
for line in raw_data.splitlines():
if line[0] != "#":
try:
_name, _type, _subtype, _offset, _size, _flags = line.split(",")
if _size[-1] == "K":
_size = int(_size[:-1]) * 1024
elif _size[-1] == "M":
_size = int(_size[:-1]) * 1024 * 1024
else:
_size = int(_size)
except ValueError:
continue
partition_table[_name] = {
"type": _type,
"subtype": _subtype,
"offset": _offset,
"size": _size,
"flags": _flags
}
return download_configs, partition_table
class Example(IDFApp):
def get_binary_path(self, app_path):
# build folder of example path
path = os.path.join(self.idf_path, app_path, "build")
if not os.path.exists(path):
# search for CI build folders
app = os.path.basename(app_path)
example_path = os.path.join(self.idf_path, "build_examples", "example_builds")
for dirpath, dirnames, files in os.walk(example_path):
if dirnames:
if dirnames[0] == app:
path = os.path.join(example_path, dirpath, dirnames[0], "build")
break
else:
raise OSError("Failed to find example binary")
return path
class UT(IDFApp):
def get_binary_path(self, app_path):
if app_path:
# specified path, join it and the idf path
path = os.path.join(self.idf_path, app_path)
else:
path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
return path
class SSC(IDFApp):
def get_binary_path(self, app_path):
# TODO: to implement SSC get binary path
return app_path
class AT(IDFApp):
def get_binary_path(self, app_path):
# TODO: to implement AT get binary path
return app_path
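
A hedged sketch of what ``process_app_info`` yields for an Example app; the example path is a placeholder, and it assumes IDF_PATH is set and the example has been built.

from IDF.IDFApp import Example

app = Example("examples/get-started/hello_world")        # placeholder example path
download_configs, partition_table = app.process_app_info()
# download_configs: esptool write_flash arguments (flash addresses and abs .bin paths)
# partition_table:  dict keyed by partition name, with type/subtype/offset/size/flags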

View file

@@ -0,0 +1,126 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" DUT for IDF applications """
import os
import re
import subprocess
import functools
import DUT
class IDFToolError(OSError):
pass
def _tool_method(func):
""" close port, execute tool method and then reopen port """
@functools.wraps(func)
def handler(self, *args, **kwargs):
self.close()
ret = func(self, *args, **kwargs)
self.open()
return ret
return handler
class IDFDUT(DUT.SerialDUT):
""" IDF DUT, extends serial with ESPTool methods """
CHIP_TYPE_PATTERN = re.compile(r"Detecting chip type[.:\s]+(.+)")
def __init__(self, name, port, log_file, app, **kwargs):
self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
@classmethod
def get_chip(cls, app, port):
"""
get chip id via esptool
:param app: application instance (to get tool)
:param port: comport
:return: chip ID or None
"""
try:
output = subprocess.check_output(["python", app.esptool, "--port", port, "chip_id"])
except subprocess.CalledProcessError:
output = bytes()
if isinstance(output, bytes):
output = output.decode()
chip_type = cls.CHIP_TYPE_PATTERN.search(output)
return chip_type.group(1) if chip_type else None
@classmethod
def confirm_dut(cls, port, app, **kwargs):
return cls.get_chip(app, port) is not None
@_tool_method
def start_app(self):
"""
download and start app.
:return: None
"""
retry_baud_rates = ["921600", "115200"]
error = IDFToolError()
for baud_rate in retry_baud_rates:
try:
subprocess.check_output(["python", self.app.esptool,
"--port", self.port, "--baud", baud_rate]
+ self.download_config)
break
except subprocess.CalledProcessError as error:
continue
else:
raise error
@_tool_method
def reset(self):
"""
reset DUT with esptool
:return: None
"""
subprocess.check_output(["python", self.app.esptool, "--port", self.port, "run"])
@_tool_method
def dump_flush(self, output_file, **kwargs):
"""
dump flash
:param output_file: output file name, if relative path, will use sdk path as base path.
:keyword partition: partition name, dump the partition.
``partition`` is preferred over using ``address`` and ``size``.
:keyword address: dump from address (need to be used with size)
:keyword size: dump size (need to be used with address)
:return: None
"""
if os.path.isabs(output_file) is False:
output_file = os.path.relpath(output_file, self.app.get_log_folder())
if "partition" in kwargs:
partition = self.partition_table[kwargs["partition"]]
_address = partition["offset"]
_size = partition["size"]
elif "address" in kwargs and "size" in kwargs:
_address = kwargs["address"]
_size = kwargs["size"]
else:
raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
subprocess.check_output(
["python", self.app.esptool, "--port", self.port, "--baud", "921600",
"--before", "default_reset", "--after", "hard_reset", "read_flash",
_address, _size, output_file]
)
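
A hedged sketch of the flash-dump tool method above, reusing a ``dut`` obtained as in the Env sketch earlier; the partition name, addresses, and output file names are placeholders.

# assuming "dut" is an IDFDUT instance obtained via Env.get_dut (see the Env sketch above)
dut.reset()                                                       # restart the app via esptool "run"
dut.dump_flush("nvs_dump.bin", partition="nvs")                   # dump a named (placeholder) partition
dut.dump_flush("boot_dump.bin", address="0x1000", size="0x7000")  # or a raw address + size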

View file

@@ -0,0 +1,36 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import TinyFW
from IDF.IDFApp import Example, UT
from IDF.IDFDUT import IDFDUT
def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32",
module="examples", execution_time=1,
**kwargs):
"""
decorator for testing idf examples (with default values for some keyword args).
:param app: test application class
:param dut: dut class
:param chip: chip supported, string or tuple
:param module: module, string
:param execution_time: execution time in minutes, int
:param kwargs: other keyword args
:return: test method
"""
# not using a partial function here; defining it as a function supports auto-generating documentation
return TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
execution_time=execution_time, **kwargs)
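
A hedged sketch of a test case written with the decorator above; the function name, example path, and expected output are placeholders, and it assumes the module above is the ``IDF`` package ``__init__`` (its file header is not shown in this view).

import IDF   # assumption: the decorator above is exported by the IDF package

@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_placeholder(env, extra_data):
    # placeholder body: get a DUT, start the app, check its output
    dut = env.get_dut("dut1", "examples/get-started/hello_world")
    dut.start_app()
    dut.expect("Hello world!")

if __name__ == '__main__':
    test_examples_placeholder()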

View file

@@ -0,0 +1,80 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command line interface to run test cases from a given path.
* search and run test cases of a given path
* config file which supports filtering test cases and passing data to test cases
Use ``python Runner.py test_case_path -c config_file -e env_config_file`` to run test cases.
"""
import os
import sys
import argparse
import threading
import TinyFW
from Utility import SearchCases, CaseConfig
class Runner(threading.Thread):
"""
:param test_case: test case file or folder
:param case_config: case config file, allows filtering test cases and passing data to test cases
:param env_config_file: env config file
"""
def __init__(self, test_case, case_config, env_config_file=None):
super(Runner, self).__init__()
self.setDaemon(True)
test_methods = SearchCases.Search.search_test_cases(test_case)
self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
self.test_result = True
if case_config:
test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
else:
test_suite_name = "TestRunner"
TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
def run(self):
for case in self.test_cases:
self.test_result = self.test_result and case.run()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("test_case",
help="test case folder or file")
parser.add_argument("--case_config", "-c", default=None,
help="case filter/config file")
parser.add_argument("--env_config_file", "-e", default=None,
help="test env config file")
args = parser.parse_args()
runner = Runner(args.test_case, args.case_config, args.env_config_file)
runner.start()
while True:
try:
runner.join(1)
if not runner.isAlive():
break
except KeyboardInterrupt:
print("exit by Ctrl-C")
break
if not runner.test_result:
sys.exit(1)

View file

@@ -0,0 +1,53 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
class TestCase(object):
"""
Test Case Object, mainly used with runner.
The runner can parse all test cases from a given path, and set data and config for test cases in the prepare stage.
A TestCase instance will record these data and provide a run method to let the runner execute the test case.
:param test_method: test function
:param extra_data: data passed to test function
:param overwrite_args: kwargs that overwrite original test case configs
"""
DEFAULT_CASE_DOC = dict()
def __init__(self, test_method, extra_data, **overwrite_args):
self.test_method = test_method
self.extra_data = extra_data
self.overwrite_args = overwrite_args
def run(self):
""" execute the test case """
return self.test_method(self.extra_data, **self.overwrite_args)
def document(self):
"""
generate test case document.
parse the case doc with yaml parser and update to original case attributes.
:return: case document, dict of case attributes and values
"""
doc_string = self.test_method.__doc__
try:
doc = yaml.load(doc_string)
except (AttributeError, OSError, UnicodeDecodeError):
doc = self.DEFAULT_CASE_DOC
doc.update(self.test_method.env_args)
doc.update(self.test_method.accepted_filter)
return doc
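
A hedged sketch of wrapping and running a test method with the TestCase class above; the stand-in function and extra data are placeholders (a real runner passes decorated TinyFW test methods here).

import TestCase

def placeholder_case(extra_data, **overwrite):
    # stand-in for a decorated test method; just report success
    return True

case = TestCase.TestCase(placeholder_case, extra_data={"answer": 42})
assert case.run() is True          # run() forwards extra_data and the overwrite args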

View file

@@ -0,0 +1,220 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Interface for test cases. """
import sys
import os
import time
import traceback
import inspect
import functools
import xunitgen
import Env
import DUT
import App
XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
XUNIT_RECEIVER = xunitgen.EventReceiver()
XUNIT_DEFAULT_TEST_SUITE = "test-suite"
_COLOR_CODES = {
"white": '\033[0m',
"red": '\033[31m',
"green": '\033[32m',
"orange": '\033[33m',
"blue": '\033[34m',
"purple": '\033[35m',
"W": '\033[0m',
"R": '\033[31m',
"G": '\033[32m',
"O": '\033[33m',
"B": '\033[34m',
"P": '\033[35m'
}
def console_log(data, color="white"):
"""
log data to console.
(if the console log is not flushed, GitLab CI won't update logs during job execution)
:param data: data content
:param color: color
"""
if color not in _COLOR_CODES:
color = "white"
color_codes = _COLOR_CODES[color]
print(color_codes + data)
if color not in ["white", "W"]:
# reset color to white for later logs
print(_COLOR_CODES["white"] + "\r")
sys.stdout.flush()
class DefaultEnvConfig(object):
"""
default test configs. There are 3 places to set configs; the priority is (high -> low):
1. overwrite set by caller of test method
2. values set by test_method decorator
3. default env config get from this class
"""
DEFAULT_CONFIG = {
"app": App.BaseApp,
"dut": DUT.BaseDUT,
"env_tag": "default",
"env_config_file": None,
"test_suite_name": None,
}
@classmethod
def set_default_config(cls, **kwargs):
"""
:param kwargs: configs need to be updated
:return: None
"""
cls.DEFAULT_CONFIG.update(kwargs)
@classmethod
def get_default_config(cls):
"""
:return: current default config
"""
return cls.DEFAULT_CONFIG.copy()
set_default_config = DefaultEnvConfig.set_default_config
get_default_config = DefaultEnvConfig.get_default_config
class TestResult(object):
TEST_RESULT = {
"pass": [],
"fail": [],
}
@classmethod
def get_failed_cases(cls):
"""
:return: failed test cases
"""
return cls.TEST_RESULT["fail"]
@classmethod
def get_passed_cases(cls):
"""
:return: passed test cases
"""
return cls.TEST_RESULT["pass"]
@classmethod
def set_result(cls, result, case_name):
"""
:param result: True or False
:param case_name: test case name
:return: None
"""
cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
get_failed_cases = TestResult.get_failed_cases
get_passed_cases = TestResult.get_passed_cases
MANDATORY_INFO = {
"execution_time": 1,
"env_tag": "default",
}
def test_method(**kwargs):
"""
decorator for test case function.
The following keyword arguments are pre-defined.
Any other keyword arguments will be regarded as filters for the test case;
they can be accessed via the ``case_info`` attribute of the test method.
:keyword app: class for test app. see :doc:`App <App>` for details
:keyword dut: class for current dut. see :doc:`DUT <DUT>` for details
:keyword env_tag: name for test environment, used to select configs from config file
:keyword env_config_file: test env config file. Usually this keyword is not set when defining a case
:keyword test_suite_name: test suite name, used for generating the log folder name and adding xunit format test results.
Usually this keyword is not set when defining a case
"""
def test(test_func):
# get test function file name
frame = inspect.stack()
test_func_file_name = frame[1][1]
case_info = MANDATORY_INFO.copy()
case_info["name"] = test_func.__name__
case_info.update(kwargs)
# create env instance
env_config = DefaultEnvConfig.get_default_config()
for key in kwargs:
if key in env_config:
env_config[key] = kwargs[key]
@functools.wraps(test_func)
def handle_test(extra_data=None, **overwrite):
"""
create env, run test and record test results
:param extra_data: extra data that runner or main passed to test case
:param overwrite: args that runner or main want to overwrite
:return: None
"""
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
# prepare for xunit test results
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
XUNIT_FILE_NAME)
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
try:
console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
test_func(env_inst, extra_data)
# if finish without exception, test result is True
result = True
except Exception as e:
# handle all the exceptions here
traceback.print_exc()
result = False
# log failure
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
finally:
# do close all DUTs
env_inst.close()
# end case and output result
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
with open(xunit_file, "ab+") as f:
f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
XUNIT_DEFAULT_TEST_SUITE))
if result:
console_log("Test Succeed: " + test_func.__name__, color="green")
else:
console_log(("Test Fail: " + test_func.__name__), color="red")
TestResult.set_result(result, test_func.__name__)
return result
handle_test.case_info = case_info
handle_test.test_method = True
return handle_test
return test
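
A hedged sketch of using ``test_method`` and the default-config/result helpers directly; the suite name and case body are placeholders, and the IDF classes are only one possible choice for ``app``/``dut`` (IDF_PATH must be set for them).

import TinyFW
from IDF.IDFApp import Example
from IDF.IDFDUT import IDFDUT

TinyFW.set_default_config(env_config_file=None, test_suite_name="demo_suite")

@TinyFW.test_method(app=Example, dut=IDFDUT, env_tag="default",
                    chip="ESP32", module="examples", execution_time=1)
def demo_case(env, extra_data):
    # placeholder body; a real case would get DUTs from env and assert on their output
    pass

if __name__ == '__main__':
    demo_case()                          # returns True/False and records the result
    print(TinyFW.get_failed_cases())     # [] if the case passed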

View file

@@ -0,0 +1,199 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processing case config files.
This is mainly designed for CI, where we need to auto-create and assign test jobs.
Template Config File::
TestConfig:
  app:
    path: Users/Test/TinyTestFW/IDF/IDFApp.py
    class: Example
  dut:
    path:
    class:
  config_file: /somewhere/config_file_for_runner
  test_name: CI_test_job_1
Filter:
  chip: ESP32
  env_tag: default
CaseConfig:
- name: test_examples_protocol_https_request
  # optional
  extra_data: some extra data passed to case with kwarg extra_data
  overwrite: # overwrite test configs
    app:
      path: Users/Test/TinyTestFW/IDF/IDFApp.py
      class: Example
- name: xxx
"""
# TODO: add a function to use suitable import lib for python2 and python3
import imp
import yaml
import TestCase
def _filter_one_case(test_method, case_filter):
""" Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
filter_result = True
for key in case_filter:
if key in test_method.case_info:
# the filter key is both in case and filter
# we need to check if they match
filter_item, accepted_item = case_filter[key], test_method.case_info[key]
if isinstance(filter_item, (tuple, list)) \
and isinstance(accepted_item, (tuple, list)):
# both list/tuple, check if they have common item
filter_result = True if set(filter_item) & set(accepted_item) else False
elif isinstance(filter_item, (tuple, list)):
# filter item list/tuple, check if case accepted value in filter item list/tuple
filter_result = True if accepted_item in filter_item else False
elif isinstance(accepted_item, (tuple, list)):
# accepted item list/tuple, check if case filter value is in accept item list/tuple
filter_result = True if filter_item in accepted_item else False
else:
# both string/int, just do string compare
filter_result = (filter_item == accepted_item)
else:
# key in filter only, which means the case supports all values for this filter key, match succeed
pass
if not filter_result:
# match failed
break
return filter_result
def filter_test_cases(test_methods, case_filter):
"""
filter test cases. Filter logic:
1. if the filter key is in both the case attribute and the filter:
* if both values are string/int, compare them directly
* if one is a list/tuple and the other is a string/int, check if the string/int is in the list/tuple
* if both are list/tuple, check if they have a common item
2. if only the case attribute or only the filter has the key, the filter succeeds
For example, the following are scenarios where the match succeeds
(the rule is symmetric; the result is the same if the values of the user filter and case attribute are exchanged):
* user case filter is ``chip: ["esp32", "esp32c"]``, case doesn't have ``chip`` attribute
* user case filter is ``chip: ["esp32", "esp32c"]``, case attribute is ``chip: "esp32"``
* user case filter is ``chip: "esp32"``, case attribute is ``chip: "esp32"``
:param test_methods: a list of test methods functions
:param case_filter: case filter
:return: filtered test methods
"""
filtered_test_methods = []
for test_method in test_methods:
if _filter_one_case(test_method, case_filter):
filtered_test_methods.append(test_method)
return filtered_test_methods
class Parser(object):
DEFAULT_CONFIG = {
"TestConfig": dict(),
"Filter": dict(),
"CaseConfig": [{"extra_data": None}],
}
@classmethod
def parse_config_file(cls, config_file):
"""
parse from config file and then update to default config.
:param config_file: config file path
:return: configs
"""
configs = cls.DEFAULT_CONFIG.copy()
if config_file:
with open(config_file, "r") as f:
configs.update(yaml.load(f))
return configs
@classmethod
def handle_overwrite_args(cls, overwrite):
"""
handle overwrite configs. import module from path and then get the required class.
:param overwrite: overwrite args
:return: dict of (original key: class)
"""
output = dict()
for key in overwrite:
_path = overwrite[key]["path"]
# TODO: add a function to use suitable import lib for python2 and python3
_module = imp.load_source(str(hash(_path)), overwrite[key]["path"])
output[key] = _module.__getattribute__(overwrite[key]["class"])
return output
@classmethod
def apply_config(cls, test_methods, config_file):
"""
apply config for test methods
:param test_methods: a list of test methods functions
:param config_file: case filter file
:return: filtered cases
"""
configs = cls.parse_config_file(config_file)
test_case_list = []
for _config in configs["CaseConfig"]:
_filter = configs["Filter"].copy()
_filter.update(_config)
_overwrite = cls.handle_overwrite_args(_filter.pop("overwrite", dict()))
_extra_data = _filter.pop("extra_data", None)
for test_method in test_methods:
if _filter_one_case(test_method, _filter):
test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite))
return test_case_list
class Generator(object):
""" Case config file generator """
def __init__(self):
self.default_config = {
"TestConfig": dict(),
"Filter": dict(),
}
def set_default_configs(self, test_config, case_filter):
"""
:param test_config: "TestConfig" value
:param case_filter: "Filter" value
:return: None
"""
self.default_config = {"TestConfig": test_config, "Filter": case_filter}
def generate_config(self, case_configs, output_file):
"""
:param case_configs: "CaseConfig" value
:param output_file: output file path
:return: None
"""
config = self.default_config.copy()
config.update({"CaseConfig": case_configs})
with open(output_file, "w") as f:
yaml.dump(config, f)
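
A hedged sketch of the filter semantics implemented above, applied to stand-in "test methods"; the chip values echo the docstring examples.

import CaseConfig

class FakeCase(object):
    """ stand-in for a decorated test method: only case_info matters for filtering """
    def __init__(self, **case_info):
        self.case_info = case_info

cases = [FakeCase(name="a", chip="ESP32"), FakeCase(name="b", chip=("ESP32C",))]
selected = CaseConfig.filter_test_cases(cases, {"chip": "ESP32"})
assert [c.case_info["name"] for c in selected] == ["a"]    # only the matching chip survives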

View file

@@ -0,0 +1,73 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import yaml
class Job(dict):
"""
Gitlab CI job
:param job: job data loaded from .gitlab-ci.yml
:param job_name: job name
"""
def __init__(self, job, job_name):
super(Job, self).__init__(job)
self["name"] = job_name
def match_group(self, group):
"""
Match a group by the tags of the job.
All filter values of the group should be included in the tags.
:param group: case group to match
:return: True or False
"""
match_result = False
for _ in range(1):
if "case group" in self:
# this job is already assigned
break
for value in group.filters.values():
if value not in self["tags"]:
break
else:
continue
break
else:
match_result = True
return match_result
def assign_group(self, group):
"""
assign a case group to a test job.
:param group: the case group to assign
"""
self["case group"] = group
def output_config(self, file_path):
"""
output test config to the given path.
file name will be job_name.yml
:param file_path: output file path
:return: None
"""
file_name = os.path.join(file_path, self["name"] + ".yml")
if "case group" in self:
with open(file_name, "w") as f:
yaml.dump(self["case group"].output(), f)

View file

@@ -0,0 +1,112 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" search test cases from a given file or path """
import os
import fnmatch
import types
import copy
# TODO: add a function to use suitable import lib for python2 and python3
import imp
class Search(object):
TEST_CASE_FILE_PATTERN = "*_test.py"
@classmethod
def _search_cases_from_file(cls, file_name):
""" get test cases from test case .py file """
print("Try to get cases from: " + file_name)
test_functions = []
try:
# TODO: add a function to use suitable import lib for python2 and python3
mod = imp.load_source(str(hash(file_name)), file_name)
for func in [mod.__getattribute__(x) for x in dir(mod)
if isinstance(mod.__getattribute__(x), types.FunctionType)]:
try:
# test method decorator will add test_method attribute to test function
if func.test_method:
test_functions.append(func)
except AttributeError:
continue
except ImportError as e:
print("ImportError: \r\n\tFile:" + file_name + "\r\n\tError:" + str(e))
for i, test_function in enumerate(test_functions):
print("\t{}. ".format(i+1) + test_function.case_info["name"])
return test_functions
@classmethod
def _search_test_case_files(cls, test_case, file_pattern):
""" search all test case files recursively of a path """
if not os.path.exists(test_case):
raise OSError("test case path not exist")
if os.path.isdir(test_case):
test_case_files = []
for root, _, file_names in os.walk(test_case):
for filename in fnmatch.filter(file_names, file_pattern):
test_case_files.append(os.path.join(root, filename))
else:
test_case_files = [test_case]
return test_case_files
@classmethod
def replicate_case(cls, case):
"""
        Replicate a case according to its filter values.
        If a case specifies the filter chip=(ESP32, ESP32C),
        it will create 2 cases, one for ESP32 and one for ESP32C.
        Once the cases are replicated, it's easy to filter the ones we want to execute.
:param case: the original case
:return: a list of replicated cases
"""
replicate_config = []
for key in case.case_info:
if isinstance(case.case_info[key], (list, tuple)):
replicate_config.append(key)
def _replicate_for_key(case_list, replicate_key, replicate_list):
case_out = []
for _case in case_list:
for value in replicate_list:
new_case = copy.deepcopy(_case)
new_case.case_info[replicate_key] = value
case_out.append(new_case)
return case_out
replicated_cases = [case]
for key in replicate_config:
replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key])
return replicated_cases
@classmethod
def search_test_cases(cls, test_case):
"""
        search all test cases from a folder or file, and then replicate the cases.
:param test_case: test case file(s) path
:return: a list of replicated test methods
"""
test_case_files = cls._search_test_case_files(test_case, cls.TEST_CASE_FILE_PATTERN)
test_cases = []
for test_case_file in test_case_files:
test_cases += cls._search_cases_from_file(test_case_file)
# handle replicate cases
test_case_out = []
for case in test_cases:
test_case_out += cls.replicate_case(case)
return test_case_out
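
As a rough illustration (not part of this commit), the search helper above might be invoked like this; the ``examples`` folder name is made up, and the ``case_info["name"]`` access mirrors what ``_search_cases_from_file`` prints:

# hypothetical driver for the Search helper; the folder name is illustrative
if __name__ == "__main__":
    # collects every function carrying the test_method attribute from *_test.py files,
    # then replicates cases whose filter values are lists/tuples (e.g. chip=(ESP32, ESP32C))
    cases = Search.search_test_cases("examples")
    for case in cases:
        print(case.case_info["name"])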


@ -0,0 +1,26 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXAPI = sphinx-apidoc
SPHINXAPISRC = ..
SPHINXBUILD = python -msphinx
SPHINXPROJ = TinyTestFW
SOURCEDIR = .
BUILDDIR = _build
# define the files to be excluded here
EXCLUDELIST = "$(SPHINXAPISRC)/example.py"
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
	@$(SPHINXAPI) -o $(SOURCEDIR) $(SPHINXAPISRC) $(EXCLUDELIST)
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

0
tools/tiny-test-fw/docs/_static/.keep vendored Normal file

@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
#
# TinyTestFW documentation build configuration file, created by
# sphinx-quickstart on Thu Sep 21 20:19:12 2017.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
# import sphinx_rtd_theme
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc',
'sphinx.ext.viewcode']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'TinyTestFW'
copyright = u'2017, Espressif'
author = u'Espressif'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = u'0.1'
# The full version, including alpha/beta/rc tags.
release = u'0.1'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = False
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'TinyTestFWdoc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'TinyTestFW.tex', u'TinyTestFW Documentation',
u'He Yinling', 'manual'),
]
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'tinytestfw', u'TinyTestFW Documentation',
[author], 1)
]
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'TinyTestFW', u'TinyTestFW Documentation',
author, 'TinyTestFW', 'One line description of project.',
'Miscellaneous'),
]


@ -0,0 +1,139 @@
.. TinyTestFW documentation master file, created by
sphinx-quickstart on Thu Sep 21 20:19:12 2017.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to TinyTestFW's documentation!
======================================
We have a lot of tests which depend on interacting with the DUT via a communication port.
Usually we send commands to the port and then check the response to see if the test succeeds.
TinyTestFW is designed for such scenarios.
It supports ESP-IDF applications and can be extended to other applications by writing new tool bundles.
Test FW features
----------------
1. Test Environment:
    1. DUT: provides methods to interact with the DUT
        * read/write through the port
        * ``expect`` methods which support waiting for one or multiple strings or RegEx patterns
* tool methods provided by the tool bundle, like ``start_app``, ``reset``
2. App:
        * provides some features specific to the DUT's test application, for example:
* SDK path
* SDK tools
* application information like partition table, download configs
3. Environment Configs:
        * supports getting env configs from a config file or auto-detecting them from the current PC
        * provides a ``get_variable`` method to get variables (see the short sketch after this list)
2. allows customizing components (DUT, App) to support different devices
3. Integration with CI:
    * provides interfaces for Gitlab-CI
    * provides ``search case`` and ``runner`` interfaces, so it can also be integrated with other CI systems
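
For example, a test case can read such a variable from the env instance it receives. This is only a rough sketch: the ``ap_ssid`` key and the ``join`` command are made-up illustrations, while ``get_variable``, ``get_dut``, ``start_app`` and writing to the port are the features listed above::

    def test_join_configured_ap(env, extra_data):
        # variables come from the env config file, or are auto-detected on the current PC
        ap_ssid = env.get_variable("ap_ssid")
        dut = env.get_dut("https_request", "examples/protocols/https_request")
        dut.start_app()
        # the "join" command here is purely illustrative
        dut.write("join " + ap_ssid)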
Example
-------
Let's first look at a simple example::
import re
import os
import sys
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path:
sys.path.insert(0, test_fw_path)
import TinyFW
from IDF import IDFApp, IDFDUT
@TinyFW.test_method(app=IDFApp.Example, dut=IDFDUT.IDFDUT, env_tag="Example_WIFI",
chip="ESP32", module="examples", execution_time=1)
def test_examples_protocol_https_request(env, extra_data):
"""
steps: |
1. join AP
2. connect to www.howsmyssl.com:443
3. send http request
"""
dut1 = env.get_dut("https_request", "examples/protocols/https_request")
dut1.start_app()
dut1.expect("Connecting to www.howsmyssl.com:443", timeout=30)
dut1.expect("Performing the SSL/TLS handshake")
dut1.expect("Certificate verified.", timeout=15)
dut1.expect_all(re.compile(r"Cipher suite is TLS-ECDHE-RSA-WITH-AES-128-GCM-SHA256"),
"Reading HTTP response",
timeout=20)
dut1.expect(re.compile(r"Completed (\d) requests"))
if __name__ == '__main__':
TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
test_examples_protocol_https_request()
SOP for adding test cases
-------------------------
1. import test framework:
^^^^^^^^^^^^^^^^^^^^^^^^^
* we assume ``TEST_FW_PATH`` is pre-defined before running the tests
* then we can import Python packages and files from ``TEST_FW_PATH``
2. define test case:
^^^^^^^^^^^^^^^^^^^^
1. define test case ``test_xxx(env, extra_data)``
* env: instance of test env, see :doc:`Test Env <Env>` for details
* extra_data: extra data passed from test case caller
2. add decorator for test case
* add decorator ``TinyFW.test_method`` to test method
* define default case configs and filters in decorator, see :doc:`TinyFW.test_method <TinyFW>`
3. execute test cases:
^^^^^^^^^^^^^^^^^^^^^^
* call the test cases in the ``main`` section and execute them from this file
    1. set preset configs (optional). If a config is not defined in the case decorator, the preset configs will be used.
2. call test case method:
* if you don't pass any arguments, it will use default values
        * you can pass ``extra_data`` to the test case by adding ``extra_data=some_data`` as a kwarg of the test case method.
          The default value of ``extra_data`` is ``None``.
        * you can overwrite test case configs by adding them as kwargs of the test case method.
          They will overwrite both preset configs and case default configs.
Examples::
test_examples_protocol_https_request(extra_data=["data1", "data2"], dut=SomeOtherDUT, env_tag="OtherEnv")
* or use ``runner`` to execute. See :doc:`runner <Runner>` for details
.. toctree::
:maxdepth: 2
:caption: Contents:
modules
Dependency
==========
Both Python 2 and Python 3 are supported (tested with Python 2.7.13 and 3.6.2).
The following third-party libs are required:
* pyserial
* pyyaml
* xunitgen
To build the documentation, we need to install ``Sphinx`` and ``sphinx-rtd-theme`` (you may replace this with your own theme).
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`


@ -0,0 +1,51 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" example of writing test with TinyTestFW """
import re
import os
import sys
# if we want to run the test case outside the `tiny-test-fw` folder,
# we need to insert the tiny-test-fw path into the sys path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
@IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_https_request(env, extra_data):
"""
steps: |
1. join AP
2. connect to www.howsmyssl.com:443
3. send http request
"""
dut1 = env.get_dut("https_request", "examples/protocols/https_request")
dut1.start_app()
dut1.expect(re.compile(r"Connecting to www.howsmyssl.com:443"), timeout=30)
dut1.expect("Performing the SSL/TLS handshake")
dut1.expect("Certificate verified.", timeout=15)
dut1.expect_all(re.compile(r"Cipher suite is TLS-ECDHE-RSA-WITH-AES-128-GCM-SHA256"),
"Reading HTTP response",
timeout=20)
dut1.expect(re.compile(r"Completed (\d) requests"))
if __name__ == '__main__':
TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
test_examples_protocol_https_request()