diff --git a/.flake8 b/.flake8 index d87d01485..171b2bd42 100644 --- a/.flake8 +++ b/.flake8 @@ -78,7 +78,7 @@ select = E275, # missing whitespace after keyword E301, # expected 1 blank line, found 0 E302, # expected 2 blank lines, found 0 - E303, # too many blank lines + E303, # too many blank lines E304, # blank lines found after function decorator E305, # expected 2 blank lines after end of function or class E306, # expected 1 blank line before a nested definition @@ -166,9 +166,6 @@ exclude = components/ulp/esp32ulp_mapgen.py, components/wifi_provisioning/python/wifi_config_pb2.py, components/wifi_provisioning/python/wifi_constants_pb2.py, - examples/provisioning/ble_prov/ble_prov_test.py, - examples/provisioning/softap_prov/softap_prov_test.py, - examples/provisioning/softap_prov/utils/wifi_tools.py, tools/ci/apply_bot_filter.py, tools/cmake/convert_to_cmake.py, tools/esp_app_trace/apptrace_proc.py, @@ -180,7 +177,6 @@ exclude = tools/esp_app_trace/pylibelf/types/__init__.py, tools/esp_app_trace/pylibelf/util/__init__.py, tools/esp_app_trace/pylibelf/util/syms/__init__.py, - tools/esp_prov/esp_prov.py, tools/esp_prov/proto/__init__.py, tools/esp_prov/prov/__init__.py, tools/esp_prov/prov/custom_prov.py, @@ -190,7 +186,6 @@ exclude = tools/esp_prov/security/security0.py, tools/esp_prov/security/security1.py, tools/esp_prov/transport/__init__.py, - tools/esp_prov/transport/ble_cli.py, tools/esp_prov/transport/transport.py, tools/esp_prov/transport/transport_ble.py, tools/esp_prov/transport/transport_console.py, diff --git a/examples/provisioning/ble_prov/ble_prov_test.py b/examples/provisioning/ble_prov/ble_prov_test.py index 367dcc555..f379e7d3a 100644 --- a/examples/provisioning/ble_prov/ble_prov_test.py +++ b/examples/provisioning/ble_prov/ble_prov_test.py @@ -15,32 +15,31 @@ # limitations under the License. from __future__ import print_function -import imp import re import os import sys import time -# This environment variable is expected on the host machine -test_fw_path = os.getenv("TEST_FW_PATH") -if test_fw_path and test_fw_path not in sys.path: - sys.path.insert(0, test_fw_path) +try: + import IDF +except ImportError: + test_fw_path = os.getenv("TEST_FW_PATH") + if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + import IDF -# When running on local machine execute the following before running this script -# > export TEST_FW_PATH='~/esp/esp-idf/tools/tiny-test-fw' -# > make print_flash_cmd | tail -n 1 > build/download.config -# > make app bootloader - -import TinyFW -import IDF - -# Import esp_prov tool -idf_path = os.environ['IDF_PATH'] -esp_prov = imp.load_source("esp_prov", idf_path + "/tools/esp_prov/esp_prov.py") +try: + import esp_prov +except ImportError: + esp_prov_path = os.getenv("IDF_PATH") + "/tools/esp_prov" + if esp_prov_path and esp_prov_path not in sys.path: + sys.path.insert(0, esp_prov_path) + import esp_prov # Have esp_prov throw exception esp_prov.config_throw_except = True + @IDF.idf_example_test(env_tag="Example_WIFI_BT") def test_examples_provisioning_ble(env, extra_data): # Acquire DUT @@ -49,8 +48,8 @@ def test_examples_provisioning_ble(env, extra_data): # Get binary file binary_file = os.path.join(dut1.app.binary_path, "ble_prov.bin") bin_size = os.path.getsize(binary_file) - IDF.log_performance("ble_prov_bin_size", "{}KB".format(bin_size//1024)) - IDF.check_performance("ble_prov_bin_size", bin_size//1024) + IDF.log_performance("ble_prov_bin_size", "{}KB".format(bin_size // 1024)) + IDF.check_performance("ble_prov_bin_size", bin_size // 1024) # Upload binary and start testing dut1.start_app() @@ -73,12 +72,12 @@ def test_examples_provisioning_ble(env, extra_data): print("Getting security") security = esp_prov.get_security(secver, pop, verbose) - if security == None: + if security is None: raise RuntimeError("Failed to get security") print("Getting transport") transport = esp_prov.get_transport(provmode, None, devname) - if transport == None: + if transport is None: raise RuntimeError("Failed to get transport") print("Verifying protocol version") @@ -112,5 +111,6 @@ def test_examples_provisioning_ble(env, extra_data): if not success: raise RuntimeError("Provisioning failed") + if __name__ == '__main__': test_examples_provisioning_ble() diff --git a/examples/provisioning/softap_prov/softap_prov_test.py b/examples/provisioning/softap_prov/softap_prov_test.py index e79bbeae4..b8d8d9c7f 100644 --- a/examples/provisioning/softap_prov/softap_prov_test.py +++ b/examples/provisioning/softap_prov/softap_prov_test.py @@ -15,33 +15,39 @@ # limitations under the License. from __future__ import print_function -import imp import re import os import sys import time -# This environment variable is expected on the host machine -test_fw_path = os.getenv("TEST_FW_PATH") -if test_fw_path and test_fw_path not in sys.path: - sys.path.insert(0, test_fw_path) +try: + import IDF +except ImportError: + test_fw_path = os.getenv("TEST_FW_PATH") + if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + import IDF -# When running on local machine execute the following before running this script -# > export TEST_FW_PATH='~/esp/esp-idf/tools/tiny-test-fw' -# > make print_flash_cmd | tail -n 1 > build/download.config -# > make app bootloader +try: + import esp_prov +except ImportError: + esp_prov_path = os.getenv("IDF_PATH") + "/tools/esp_prov" + if esp_prov_path and esp_prov_path not in sys.path: + sys.path.insert(0, esp_prov_path) + import esp_prov -import TinyFW -import IDF - -# Import esp_prov tool -idf_path = os.environ['IDF_PATH'] -esp_prov = imp.load_source("esp_prov", idf_path + "/tools/esp_prov/esp_prov.py") -wifi_tools = imp.load_source("wifi_tools", idf_path + "/examples/provisioning/softap_prov/utils/wifi_tools.py") +try: + import wifi_tools +except ImportError: + wifi_tools_path = os.getenv("IDF_PATH") + "/examples/provisioning/softap_prov/utils" + if wifi_tools_path and wifi_tools_path not in sys.path: + sys.path.insert(0, wifi_tools_path) + import wifi_tools # Have esp_prov throw exception esp_prov.config_throw_except = True + @IDF.idf_example_test(env_tag="Example_WIFI_BT") def test_examples_provisioning_softap(env, extra_data): # Acquire DUT @@ -50,8 +56,8 @@ def test_examples_provisioning_softap(env, extra_data): # Get binary file binary_file = os.path.join(dut1.app.binary_path, "softap_prov.bin") bin_size = os.path.getsize(binary_file) - IDF.log_performance("softap_prov_bin_size", "{}KB".format(bin_size//1024)) - IDF.check_performance("softap_prov_bin_size", bin_size//1024) + IDF.log_performance("softap_prov_bin_size", "{}KB".format(bin_size // 1024)) + IDF.check_performance("softap_prov_bin_size", bin_size // 1024) # Upload binary and start testing dut1.start_app() @@ -62,13 +68,13 @@ def test_examples_provisioning_softap(env, extra_data): [ssid, password] = dut1.expect(re.compile(r"SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"), timeout=30) iface = wifi_tools.get_wiface_name() - if iface == None: + if iface is None: raise RuntimeError("Failed to get Wi-Fi interface on host") print("Interface name : " + iface) print("SoftAP SSID : " + ssid) print("SoftAP Password : " + password) - ctrl = wifi_tools.wpa_cli(iface, reset_on_exit = True) + ctrl = wifi_tools.wpa_cli(iface, reset_on_exit=True) print("Connecting to DUT SoftAP...") ip = ctrl.connect(ssid, password) got_ip = dut1.expect(re.compile(r"softAP assign IP to station,IP is: (\d+.\d+.\d+.\d+)"), timeout=30)[0] @@ -84,16 +90,16 @@ def test_examples_provisioning_softap(env, extra_data): provmode = "softap" ap_ssid = "myssid" ap_password = "mypassword" - softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1]+ "." + ip.split('.')[2] + ".1:80" + softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1] + "." + ip.split('.')[2] + ".1:80" print("Getting security") security = esp_prov.get_security(secver, pop, verbose) - if security == None: + if security is None: raise RuntimeError("Failed to get security") print("Getting transport") transport = esp_prov.get_transport(provmode, softap_endpoint, None) - if transport == None: + if transport is None: raise RuntimeError("Failed to get transport") print("Verifying protocol version") @@ -127,5 +133,6 @@ def test_examples_provisioning_softap(env, extra_data): if not success: raise RuntimeError("Provisioning failed") + if __name__ == '__main__': test_examples_provisioning_softap() diff --git a/examples/provisioning/softap_prov/utils/wifi_tools.py b/examples/provisioning/softap_prov/utils/wifi_tools.py index 1d40111d5..13376e52e 100644 --- a/examples/provisioning/softap_prov/utils/wifi_tools.py +++ b/examples/provisioning/softap_prov/utils/wifi_tools.py @@ -18,12 +18,14 @@ import dbus.mainloop.glib import netifaces import time + def get_wiface_name(): for iface in netifaces.interfaces(): if iface.startswith('w'): return iface return None + def get_wiface_IPv4(iface): try: [info] = netifaces.ifaddresses(iface)[netifaces.AF_INET] @@ -31,8 +33,9 @@ def get_wiface_IPv4(iface): except KeyError: return None + class wpa_cli: - def __init__(self, iface, reset_on_exit = False): + def __init__(self, iface, reset_on_exit=False): self.iface_name = iface self.iface_obj = None self.iface_ifc = None @@ -43,26 +46,27 @@ class wpa_cli: dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() - service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"), "fi.w1.wpa_supplicant1") - paths = service.Get("fi.w1.wpa_supplicant1", "Interfaces", dbus_interface='org.freedesktop.DBus.Properties') + service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"), + "fi.w1.wpa_supplicant1") iface_path = service.GetInterface(self.iface_name) self.iface_obj = bus.get_object("fi.w1.wpa_supplicant1", iface_path) self.iface_ifc = dbus.Interface(self.iface_obj, "fi.w1.wpa_supplicant1.Interface") - if self.iface_ifc == None: + if self.iface_ifc is None: raise RuntimeError('supplicant : Failed to fetch interface') - self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork", dbus_interface='org.freedesktop.DBus.Properties') + self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork", + dbus_interface='org.freedesktop.DBus.Properties') if self.old_network == '/': self.old_network = None else: self.connected = True def connect(self, ssid, password): - if self.connected == True: + if self.connected is True: self.iface_ifc.Disconnect() self.connected = False - if self.new_network != None: + if self.new_network is not None: self.iface_ifc.RemoveNetwork(self.new_network) self.new_network = self.iface_ifc.AddNetwork({"ssid": ssid, "psk": password}) @@ -73,7 +77,7 @@ class wpa_cli: while retry > 0: time.sleep(5) ip = get_wiface_IPv4(self.iface_name) - if ip != None: + if ip is not None: self.connected = True return ip retry -= 1 @@ -82,17 +86,17 @@ class wpa_cli: raise RuntimeError('wpa_cli : Connection failed') def reset(self): - if self.iface_ifc != None: - if self.connected == True: + if self.iface_ifc is not None: + if self.connected is True: self.iface_ifc.Disconnect() self.connected = False - if self.new_network != None: + if self.new_network is not None: self.iface_ifc.RemoveNetwork(self.new_network) self.new_network = None - if self.old_network != None: + if self.old_network is not None: self.iface_ifc.SelectNetwork(self.old_network) self.old_network = None def __del__(self): - if self.reset_on_exit == True: + if self.reset_on_exit is True: self.reset() diff --git a/tools/esp_prov/esp_prov.py b/tools/esp_prov/esp_prov.py index 4d7c0fee9..dbc8cd2c2 100644 --- a/tools/esp_prov/esp_prov.py +++ b/tools/esp_prov/esp_prov.py @@ -21,23 +21,31 @@ import time import os import sys -idf_path = os.environ['IDF_PATH'] -sys.path.insert(0, idf_path + "/components/protocomm/python") -sys.path.insert(1, idf_path + "/tools/esp_prov") +try: + import security + import transport + import prov -import security -import transport -import prov +except ImportError: + idf_path = os.environ['IDF_PATH'] + sys.path.insert(0, idf_path + "/components/protocomm/python") + sys.path.insert(1, idf_path + "/tools/esp_prov") + + import security + import transport + import prov # Set this to true to allow exceptions to be thrown config_throw_except = False + def on_except(err): if config_throw_except: raise RuntimeError(err) else: print(err) + def get_security(secver, pop=None, verbose=False): if secver == 1: return security.Security1(pop, verbose) @@ -45,19 +53,19 @@ def get_security(secver, pop=None, verbose=False): return security.Security0(verbose) return None + def get_transport(sel_transport, softap_endpoint=None, ble_devname=None): try: tp = None if (sel_transport == 'softap'): tp = transport.Transport_Softap(softap_endpoint) elif (sel_transport == 'ble'): - tp = transport.Transport_BLE(devname = ble_devname, - service_uuid = '0000ffff-0000-1000-8000-00805f9b34fb', - nu_lookup = { - 'prov-session': 'ff51', - 'prov-config' : 'ff52', - 'proto-ver' : 'ff53' - }) + tp = transport.Transport_BLE(devname=ble_devname, + service_uuid='0000ffff-0000-1000-8000-00805f9b34fb', + nu_lookup={'prov-session': 'ff51', + 'prov-config': 'ff52', + 'proto-ver': 'ff53' + }) elif (sel_transport == 'console'): tp = transport.Transport_Console() return tp @@ -65,6 +73,7 @@ def get_transport(sel_transport, softap_endpoint=None, ble_devname=None): on_except(e) return None + def version_match(tp, protover): try: response = tp.send_data('proto-ver', protover) @@ -75,21 +84,23 @@ def version_match(tp, protover): on_except(e) return None + def establish_session(tp, sec): try: response = None while True: request = sec.security_session(response) - if request == None: + if request is None: break response = tp.send_data('prov-session', request) - if (response == None): + if (response is None): return False return True except RuntimeError as e: on_except(e) return None + def custom_config(tp, sec, custom_info, custom_ver): try: message = prov.custom_config_request(sec, custom_info, custom_ver) @@ -99,6 +110,7 @@ def custom_config(tp, sec, custom_info, custom_ver): on_except(e) return None + def send_wifi_config(tp, sec, ssid, passphrase): try: message = prov.config_set_config_request(sec, ssid, passphrase) @@ -108,6 +120,7 @@ def send_wifi_config(tp, sec, ssid, passphrase): on_except(e) return None + def apply_wifi_config(tp, sec): try: message = prov.config_apply_config_request(sec) @@ -117,6 +130,7 @@ def apply_wifi_config(tp, sec): on_except(e) return None + def get_wifi_config(tp, sec): try: message = prov.config_get_status_request(sec) @@ -126,80 +140,80 @@ def get_wifi_config(tp, sec): on_except(e) return None + if __name__ == '__main__': parser = argparse.ArgumentParser(description="Generate ESP prov payload") - parser.add_argument("--ssid", dest = 'ssid', type = str, - help = "SSID of Wi-Fi Network", required = True) - parser.add_argument("--passphrase", dest = 'passphrase', type = str, - help = "Passphrase of Wi-Fi network", default = '') + parser.add_argument("--ssid", dest='ssid', type=str, + help="SSID of Wi-Fi Network", required=True) + parser.add_argument("--passphrase", dest='passphrase', type=str, + help="Passphrase of Wi-Fi network", default='') - parser.add_argument("--sec_ver", dest = 'secver', type = int, - help = "Security scheme version", default = 1) - parser.add_argument("--proto_ver", dest = 'protover', type = str, - help = "Protocol version", default = 'V0.1') - parser.add_argument("--pop", dest = 'pop', type = str, - help = "Proof of possession", default = '') + parser.add_argument("--sec_ver", dest='secver', type=int, + help="Security scheme version", default=1) + parser.add_argument("--proto_ver", dest='protover', type=str, + help="Protocol version", default='V0.1') + parser.add_argument("--pop", dest='pop', type=str, + help="Proof of possession", default='') - parser.add_argument("--softap_endpoint", dest = 'softap_endpoint', type = str, - help = ", http(s):// shouldn't be included", default = '192.168.4.1:80') + parser.add_argument("--softap_endpoint", dest='softap_endpoint', type=str, + help=", http(s):// shouldn't be included", default='192.168.4.1:80') - parser.add_argument("--ble_devname", dest = 'ble_devname', type = str, - help = "BLE Device Name", default = '') + parser.add_argument("--ble_devname", dest='ble_devname', type=str, + help="BLE Device Name", default='') - parser.add_argument("--transport", dest = 'provmode', type = str, - help = "provisioning mode i.e console or softap or ble", default = 'softap') + parser.add_argument("--transport", dest='provmode', type=str, + help="provisioning mode i.e console or softap or ble", default='softap') parser.add_argument("--custom_config", help="Provision Custom Configuration", - action = "store_true") - parser.add_argument("--custom_info", dest = 'custom_info', type = str, - help = "Custom Config Info String", default = '') - parser.add_argument("--custom_ver", dest = 'custom_ver', type = int, - help = "Custom Config Version Number", default = 2) + action="store_true") + parser.add_argument("--custom_info", dest='custom_info', type=str, + help="Custom Config Info String", default='') + parser.add_argument("--custom_ver", dest='custom_ver', type=int, + help="Custom Config Version Number", default=2) - parser.add_argument("-v","--verbose", help = "increase output verbosity", - action = "store_true") + parser.add_argument("-v","--verbose", help="increase output verbosity", action="store_true") args = parser.parse_args() print("==== Esp_Prov Version: " + args.protover + " ====") - security = get_security(args.secver, args.pop, args.verbose) - if security == None: + obj_security = get_security(args.secver, args.pop, args.verbose) + if obj_security is None: print("---- Invalid Security Version ----") exit(1) - transport = get_transport(args.provmode, args.softap_endpoint, args.ble_devname) - if transport == None: + obj_transport = get_transport(args.provmode, args.softap_endpoint, args.ble_devname) + if obj_transport is None: print("---- Invalid provisioning mode ----") exit(2) print("\n==== Verifying protocol version ====") - if not version_match(transport, args.protover): + if not version_match(obj_transport, args.protover): print("---- Error in protocol version matching ----") exit(3) print("==== Verified protocol version successfully ====") print("\n==== Starting Session ====") - if not establish_session(transport, security): + if not establish_session(obj_transport, obj_security): print("---- Error in establishing session ----") exit(4) print("==== Session Established ====") if args.custom_config: print("\n==== Sending Custom config to esp32 ====") - if not custom_config(transport, security, args.custom_info, args.custom_ver): + if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver): print("---- Error in custom config ----") exit(5) print("==== Custom config sent successfully ====") print("\n==== Sending Wi-Fi credential to esp32 ====") - if not send_wifi_config(transport, security, args.ssid, args.passphrase): + if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase): print("---- Error in send Wi-Fi config ----") exit(6) print("==== Wi-Fi Credentials sent successfully ====") print("\n==== Applying config to esp32 ====") - if not apply_wifi_config(transport, security): + if not apply_wifi_config(obj_transport, obj_security): print("---- Error in apply Wi-Fi config ----") exit(7) print("==== Apply config sent successfully ====") @@ -207,7 +221,7 @@ if __name__ == '__main__': while True: time.sleep(5) print("\n==== Wi-Fi connection state ====") - ret = get_wifi_config(transport, security) + ret = get_wifi_config(obj_transport, obj_security) if (ret == 1): continue elif (ret == 0): diff --git a/tools/esp_prov/transport/ble_cli.py b/tools/esp_prov/transport/ble_cli.py index 9ba46a695..b6ea225c6 100644 --- a/tools/esp_prov/transport/ble_cli.py +++ b/tools/esp_prov/transport/ble_cli.py @@ -22,6 +22,7 @@ import utils fallback = True + # Check if platform is Linux and required packages are installed # else fallback to console mode if platform.system() == 'Linux': @@ -30,10 +31,12 @@ if platform.system() == 'Linux': import dbus.mainloop.glib import time fallback = False - except: + except ImportError: pass -#-------------------------------------------------------------------- + +# -------------------------------------------------------------------- + # BLE client (Linux Only) using Bluez and DBus class BLE_Bluez_Client: @@ -52,13 +55,13 @@ class BLE_Bluez_Client: for path, interfaces in objects.items(): adapter = interfaces.get("org.bluez.Adapter1") - if adapter != None: + if adapter is not None: if path.endswith(iface): self.adapter = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Adapter1") self.adapter_props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") break - if self.adapter == None: + if self.adapter is None: raise RuntimeError("Bluetooth adapter not found") self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) @@ -67,7 +70,7 @@ class BLE_Bluez_Client: retry = 10 while (retry > 0): try: - if self.device == None: + if self.device is None: print("Connecting...") # Wait for device to be discovered time.sleep(5) @@ -98,7 +101,7 @@ class BLE_Bluez_Client: dev_path = path break - if dev_path == None: + if dev_path is None: raise RuntimeError("BLE device not found") try: @@ -120,12 +123,12 @@ class BLE_Bluez_Client: if path.startswith(self.device.object_path): service = bus.get_object("org.bluez", path) uuid = service.Get('org.bluez.GattService1', 'UUID', - dbus_interface='org.freedesktop.DBus.Properties') + dbus_interface='org.freedesktop.DBus.Properties') if uuid == self.srv_uuid: srv_path = path break - if srv_path == None: + if srv_path is None: self.device.Disconnect(dbus_interface='org.bluez.Device1') self.device = None raise RuntimeError("Provisioning service not found") @@ -137,7 +140,7 @@ class BLE_Bluez_Client: if path.startswith(srv_path): chrc = bus.get_object("org.bluez", path) uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', - dbus_interface='org.freedesktop.DBus.Properties') + dbus_interface='org.freedesktop.DBus.Properties') self.characteristics[uuid] = chrc def has_characteristic(self, uuid): @@ -162,7 +165,9 @@ class BLE_Bluez_Client: path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1') return ''.join(chr(b) for b in path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')) -#-------------------------------------------------------------------- + +# -------------------------------------------------------------------- + # Console based BLE client for Cross Platform support class BLE_Console_Client: @@ -196,7 +201,9 @@ class BLE_Console_Client: resp = input("\t<< ") return utils.hexstr_to_str(resp) -#-------------------------------------------------------------------- + +# -------------------------------------------------------------------- + # Function to get client instance depending upon platform def get_client():