From 015922f8d9939347044d2002b8a490bb27cbf8aa Mon Sep 17 00:00:00 2001 From: Anurag Kar Date: Fri, 10 May 2019 03:06:56 +0530 Subject: [PATCH] esp_prov : Runtime discovery of Service UUID and endpoint name mapping List of changes: * Retrieve UUID property from Bluez device object before connecting to retrieve UUID contained in advertisement * Read Characteristic User Descriptions attribute of each UUID for mapping endpoint names * To support older implementations with hardcoded Name-UUID map, revert to fallback mode in order if advertisement data has no UUID field --- tools/esp_prov/esp_prov.py | 7 ++ tools/esp_prov/transport/ble_cli.py | 128 +++++++++++++++++----- tools/esp_prov/transport/transport_ble.py | 21 +++- 3 files changed, 122 insertions(+), 34 deletions(-) diff --git a/tools/esp_prov/esp_prov.py b/tools/esp_prov/esp_prov.py index 21d15e084..88b58c1bb 100644 --- a/tools/esp_prov/esp_prov.py +++ b/tools/esp_prov/esp_prov.py @@ -60,6 +60,13 @@ def get_transport(sel_transport, softap_endpoint=None, ble_devname=None): if (sel_transport == 'softap'): tp = transport.Transport_Softap(softap_endpoint) elif (sel_transport == 'ble'): + # BLE client is now capable of automatically figuring out + # the primary service from the advertisement data and the + # characteristics corresponding to each endpoint. + # Below, the service_uuid field and 16bit UUIDs in the nu_lookup + # table are provided only to support devices running older firmware, + # in which case, the automated discovery will fail and the client + # will fallback to using the provided UUIDs instead tp = transport.Transport_BLE(devname=ble_devname, service_uuid='0000ffff-0000-1000-8000-00805f9b34fb', nu_lookup={'prov-session': 'ff51', diff --git a/tools/esp_prov/transport/ble_cli.py b/tools/esp_prov/transport/ble_cli.py index 04d2921ac..ad80bf2dd 100644 --- a/tools/esp_prov/transport/ble_cli.py +++ b/tools/esp_prov/transport/ble_cli.py @@ -15,6 +15,7 @@ from __future__ import print_function from builtins import input +from future.utils import iteritems import platform @@ -40,20 +41,24 @@ if platform.system() == 'Linux': # BLE client (Linux Only) using Bluez and DBus class BLE_Bluez_Client: - def connect(self, devname, iface, srv_uuid): + def connect(self, devname, iface, chrc_names, fallback_srv_uuid): self.devname = devname - self.srv_uuid = srv_uuid + self.srv_uuid_fallback = fallback_srv_uuid + self.chrc_names = [name.lower() for name in chrc_names] self.device = None self.adapter = None self.adapter_props = None self.services = None + self.nu_lookup = None + self.characteristics = dict() + self.srv_uuid_adv = None dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() - for path, interfaces in objects.items(): + for path, interfaces in iteritems(objects): adapter = interfaces.get("org.bluez.Adapter1") if adapter is not None: if path.endswith(iface): @@ -94,8 +99,8 @@ class BLE_Bluez_Client: manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() dev_path = None - for path, interfaces in objects.items(): - if "org.bluez.Device1" not in interfaces.keys(): + for path, interfaces in iteritems(objects): + if "org.bluez.Device1" not in interfaces: continue if interfaces["org.bluez.Device1"].get("Name") == self.devname: dev_path = path @@ -106,6 +111,19 @@ class BLE_Bluez_Client: try: self.device = bus.get_object("org.bluez", dev_path) + try: + uuids = self.device.Get('org.bluez.Device1', 'UUIDs', + dbus_interface='org.freedesktop.DBus.Properties') + # There should be 1 service UUID in advertising data + # If bluez had cached an old version of the advertisement data + # the list of uuids may be incorrect, in which case connection + # or service discovery may fail the first time. If that happens + # the cache will be refreshed before next retry + if len(uuids) == 1: + self.srv_uuid_adv = uuids[0] + except dbus.exceptions.DBusException as e: + print(e) + self.device.Connect(dbus_interface='org.bluez.Device1') except Exception as e: print(e) @@ -116,35 +134,84 @@ class BLE_Bluez_Client: bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() - srv_path = None - for path, interfaces in objects.items(): - if "org.bluez.GattService1" not in interfaces.keys(): + service_found = False + for srv_path, srv_interfaces in iteritems(objects): + if "org.bluez.GattService1" not in srv_interfaces: continue - if path.startswith(self.device.object_path): - service = bus.get_object("org.bluez", path) - uuid = service.Get('org.bluez.GattService1', 'UUID', + if not srv_path.startswith(self.device.object_path): + continue + service = bus.get_object("org.bluez", srv_path) + srv_uuid = service.Get('org.bluez.GattService1', 'UUID', dbus_interface='org.freedesktop.DBus.Properties') - if uuid == self.srv_uuid: - srv_path = path - break - if srv_path is None: - self.device.Disconnect(dbus_interface='org.bluez.Device1') - self.device = None - raise RuntimeError("Provisioning service not found") - - self.characteristics = dict() - for path, interfaces in objects.items(): - if "org.bluez.GattCharacteristic1" not in interfaces.keys(): + # If service UUID doesn't match the one found in advertisement data + # then also check if it matches the fallback UUID + if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]: continue - if path.startswith(srv_path): - chrc = bus.get_object("org.bluez", path) + + nu_lookup = dict() + characteristics = dict() + for chrc_path, chrc_interfaces in iteritems(objects): + if "org.bluez.GattCharacteristic1" not in chrc_interfaces: + continue + if not chrc_path.startswith(service.object_path): + continue + chrc = bus.get_object("org.bluez", chrc_path) uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', dbus_interface='org.freedesktop.DBus.Properties') - self.characteristics[uuid] = chrc + characteristics[uuid] = chrc + for desc_path, desc_interfaces in iteritems(objects): + if "org.bluez.GattDescriptor1" not in desc_interfaces: + continue + if not desc_path.startswith(chrc.object_path): + continue + desc = bus.get_object("org.bluez", desc_path) + desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID', + dbus_interface='org.freedesktop.DBus.Properties') + if desc_uuid[4:8] != '2901': + continue + try: + readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1') + except dbus.exceptions.DBusException: + break + found_name = ''.join(chr(b) for b in readval).lower() + nu_lookup[found_name] = uuid + break + + match_found = True + for name in self.chrc_names: + if name not in nu_lookup: + # Endpoint name not present + match_found = False + break + + # Create lookup table only if all endpoint names found + self.nu_lookup = [None, nu_lookup][match_found] + self.characteristics = characteristics + service_found = True + + # If the service UUID matches that in the advertisement + # we can stop the search now. If it doesn't match, we + # have found the service corresponding to the fallback + # UUID, in which case don't break and keep searching + # for the advertised service + if srv_uuid == self.srv_uuid_adv: + break + + if not service_found: + self.device.Disconnect(dbus_interface='org.bluez.Device1') + if self.adapter: + self.adapter.RemoveDevice(self.device) + self.device = None + self.nu_lookup = None + self.characteristics = dict() + raise RuntimeError("Provisioning service not found") + + def get_nu_lookup(self): + return self.nu_lookup def has_characteristic(self, uuid): - if uuid in self.characteristics.keys(): + if uuid in self.characteristics: return True return False @@ -154,6 +221,8 @@ class BLE_Bluez_Client: if self.adapter: self.adapter.RemoveDevice(self.device) self.device = None + self.nu_lookup = None + self.characteristics = dict() if self.adapter_props: self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0)) @@ -180,7 +249,7 @@ class BLE_Bluez_Client: # Console based BLE client for Cross Platform support class BLE_Console_Client: - def connect(self, devname, iface, srv_uuid): + def connect(self, devname, iface, chrc_names, fallback_srv_uuid): print("BLE client is running in console mode") print("\tThis could be due to your platform not being supported or dependencies not being met") print("\tPlease ensure all pre-requisites are met to run the full fledged client") @@ -189,11 +258,14 @@ class BLE_Console_Client: if resp != 'Y' and resp != 'y': return False print("BLECLI >> List available attributes of the connected device") - resp = input("BLECLI >> Is the service UUID '" + srv_uuid + "' listed among available attributes? [y/n] ") + resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ") if resp != 'Y' and resp != 'y': return False return True + def get_nu_lookup(self): + return None + def has_characteristic(self, uuid): resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ") if resp != 'Y' and resp != 'y': diff --git a/tools/esp_prov/transport/transport_ble.py b/tools/esp_prov/transport/transport_ble.py index d0d26a836..333d95105 100644 --- a/tools/esp_prov/transport/transport_ble.py +++ b/tools/esp_prov/transport/transport_ble.py @@ -27,19 +27,28 @@ class Transport_BLE(Transport): # Calculate characteristic UUID for each endpoint nu_lookup[name] = service_uuid[:4] + '{:02x}'.format( int(nu_lookup[name], 16) & int(service_uuid[4:8], 16)) + service_uuid[8:] - self.name_uuid_lookup = nu_lookup # Get BLE client module self.cli = ble_cli.get_client() # Use client to connect to BLE device and bind to service - if not self.cli.connect(devname=devname, iface='hci0', srv_uuid=service_uuid): + if not self.cli.connect(devname=devname, iface='hci0', + chrc_names=nu_lookup.keys(), + fallback_srv_uuid=service_uuid): raise RuntimeError("Failed to initialize transport") - # Check if expected characteristics are provided by the service - for name in self.name_uuid_lookup.keys(): - if not self.cli.has_characteristic(self.name_uuid_lookup[name]): - raise RuntimeError("'" + name + "' endpoint not found") + # Irrespective of provided parameters, let the client + # generate a lookup table by reading advertisement data + # and characteristic user descriptors + self.name_uuid_lookup = self.cli.get_nu_lookup() + + # If that doesn't work, use the lookup table provided as parameter + if self.name_uuid_lookup is None: + self.name_uuid_lookup = nu_lookup + # Check if expected characteristics are provided by the service + for name in self.name_uuid_lookup.keys(): + if not self.cli.has_characteristic(self.name_uuid_lookup[name]): + raise RuntimeError("'" + name + "' endpoint not found") def __del__(self): # Make sure device is disconnected before application gets closed