esp_prov : Runtime discovery of Service UUID and endpoint name mapping

List of changes:
* Retrieve UUID property from Bluez device object before connecting to retrieve UUID contained in advertisement
* Read Characteristic User Descriptions attribute of each UUID for mapping endpoint names
* To support older implementations with hardcoded Name-UUID map, revert to fallback mode in order if advertisement data has no UUID field
This commit is contained in:
Anurag Kar 2019-05-10 03:06:56 +05:30 committed by bot
parent bc83d470e3
commit 56866567ae
3 changed files with 122 additions and 34 deletions

View file

@ -60,6 +60,13 @@ def get_transport(sel_transport, softap_endpoint=None, ble_devname=None):
if (sel_transport == 'softap'): if (sel_transport == 'softap'):
tp = transport.Transport_Softap(softap_endpoint) tp = transport.Transport_Softap(softap_endpoint)
elif (sel_transport == 'ble'): elif (sel_transport == 'ble'):
# BLE client is now capable of automatically figuring out
# the primary service from the advertisement data and the
# characteristics corresponding to each endpoint.
# Below, the service_uuid field and 16bit UUIDs in the nu_lookup
# table are provided only to support devices running older firmware,
# in which case, the automated discovery will fail and the client
# will fallback to using the provided UUIDs instead
tp = transport.Transport_BLE(devname=ble_devname, tp = transport.Transport_BLE(devname=ble_devname,
service_uuid='0000ffff-0000-1000-8000-00805f9b34fb', service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
nu_lookup={'prov-session': 'ff51', nu_lookup={'prov-session': 'ff51',

View file

@ -15,6 +15,7 @@
from __future__ import print_function from __future__ import print_function
from builtins import input from builtins import input
from future.utils import iteritems
import platform import platform
@ -40,20 +41,24 @@ if platform.system() == 'Linux':
# BLE client (Linux Only) using Bluez and DBus # BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client: class BLE_Bluez_Client:
def connect(self, devname, iface, srv_uuid): def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
self.devname = devname self.devname = devname
self.srv_uuid = srv_uuid self.srv_uuid_fallback = fallback_srv_uuid
self.chrc_names = [name.lower() for name in chrc_names]
self.device = None self.device = None
self.adapter = None self.adapter = None
self.adapter_props = None self.adapter_props = None
self.services = None self.services = None
self.nu_lookup = None
self.characteristics = dict()
self.srv_uuid_adv = None
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus() bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects() objects = manager.GetManagedObjects()
for path, interfaces in objects.items(): for path, interfaces in iteritems(objects):
adapter = interfaces.get("org.bluez.Adapter1") adapter = interfaces.get("org.bluez.Adapter1")
if adapter is not None: if adapter is not None:
if path.endswith(iface): if path.endswith(iface):
@ -94,8 +99,8 @@ class BLE_Bluez_Client:
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects() objects = manager.GetManagedObjects()
dev_path = None dev_path = None
for path, interfaces in objects.items(): for path, interfaces in iteritems(objects):
if "org.bluez.Device1" not in interfaces.keys(): if "org.bluez.Device1" not in interfaces:
continue continue
if interfaces["org.bluez.Device1"].get("Name") == self.devname: if interfaces["org.bluez.Device1"].get("Name") == self.devname:
dev_path = path dev_path = path
@ -106,6 +111,19 @@ class BLE_Bluez_Client:
try: try:
self.device = bus.get_object("org.bluez", dev_path) self.device = bus.get_object("org.bluez", dev_path)
try:
uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
dbus_interface='org.freedesktop.DBus.Properties')
# There should be 1 service UUID in advertising data
# If bluez had cached an old version of the advertisement data
# the list of uuids may be incorrect, in which case connection
# or service discovery may fail the first time. If that happens
# the cache will be refreshed before next retry
if len(uuids) == 1:
self.srv_uuid_adv = uuids[0]
except dbus.exceptions.DBusException as e:
print(e)
self.device.Connect(dbus_interface='org.bluez.Device1') self.device.Connect(dbus_interface='org.bluez.Device1')
except Exception as e: except Exception as e:
print(e) print(e)
@ -116,35 +134,84 @@ class BLE_Bluez_Client:
bus = dbus.SystemBus() bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects() objects = manager.GetManagedObjects()
srv_path = None service_found = False
for path, interfaces in objects.items(): for srv_path, srv_interfaces in iteritems(objects):
if "org.bluez.GattService1" not in interfaces.keys(): if "org.bluez.GattService1" not in srv_interfaces:
continue continue
if path.startswith(self.device.object_path): if not srv_path.startswith(self.device.object_path):
service = bus.get_object("org.bluez", path) continue
uuid = service.Get('org.bluez.GattService1', 'UUID', service = bus.get_object("org.bluez", srv_path)
srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties') dbus_interface='org.freedesktop.DBus.Properties')
if uuid == self.srv_uuid:
srv_path = path
break
if srv_path is None: # If service UUID doesn't match the one found in advertisement data
self.device.Disconnect(dbus_interface='org.bluez.Device1') # then also check if it matches the fallback UUID
self.device = None if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
raise RuntimeError("Provisioning service not found")
self.characteristics = dict()
for path, interfaces in objects.items():
if "org.bluez.GattCharacteristic1" not in interfaces.keys():
continue continue
if path.startswith(srv_path):
chrc = bus.get_object("org.bluez", path) nu_lookup = dict()
characteristics = dict()
for chrc_path, chrc_interfaces in iteritems(objects):
if "org.bluez.GattCharacteristic1" not in chrc_interfaces:
continue
if not chrc_path.startswith(service.object_path):
continue
chrc = bus.get_object("org.bluez", chrc_path)
uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties') dbus_interface='org.freedesktop.DBus.Properties')
self.characteristics[uuid] = chrc characteristics[uuid] = chrc
for desc_path, desc_interfaces in iteritems(objects):
if "org.bluez.GattDescriptor1" not in desc_interfaces:
continue
if not desc_path.startswith(chrc.object_path):
continue
desc = bus.get_object("org.bluez", desc_path)
desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
if desc_uuid[4:8] != '2901':
continue
try:
readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
except dbus.exceptions.DBusException:
break
found_name = ''.join(chr(b) for b in readval).lower()
nu_lookup[found_name] = uuid
break
match_found = True
for name in self.chrc_names:
if name not in nu_lookup:
# Endpoint name not present
match_found = False
break
# Create lookup table only if all endpoint names found
self.nu_lookup = [None, nu_lookup][match_found]
self.characteristics = characteristics
service_found = True
# If the service UUID matches that in the advertisement
# we can stop the search now. If it doesn't match, we
# have found the service corresponding to the fallback
# UUID, in which case don't break and keep searching
# for the advertised service
if srv_uuid == self.srv_uuid_adv:
break
if not service_found:
self.device.Disconnect(dbus_interface='org.bluez.Device1')
if self.adapter:
self.adapter.RemoveDevice(self.device)
self.device = None
self.nu_lookup = None
self.characteristics = dict()
raise RuntimeError("Provisioning service not found")
def get_nu_lookup(self):
return self.nu_lookup
def has_characteristic(self, uuid): def has_characteristic(self, uuid):
if uuid in self.characteristics.keys(): if uuid in self.characteristics:
return True return True
return False return False
@ -154,6 +221,8 @@ class BLE_Bluez_Client:
if self.adapter: if self.adapter:
self.adapter.RemoveDevice(self.device) self.adapter.RemoveDevice(self.device)
self.device = None self.device = None
self.nu_lookup = None
self.characteristics = dict()
if self.adapter_props: if self.adapter_props:
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0)) self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0))
@ -180,7 +249,7 @@ class BLE_Bluez_Client:
# Console based BLE client for Cross Platform support # Console based BLE client for Cross Platform support
class BLE_Console_Client: class BLE_Console_Client:
def connect(self, devname, iface, srv_uuid): def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
print("BLE client is running in console mode") print("BLE client is running in console mode")
print("\tThis could be due to your platform not being supported or dependencies not being met") print("\tThis could be due to your platform not being supported or dependencies not being met")
print("\tPlease ensure all pre-requisites are met to run the full fledged client") print("\tPlease ensure all pre-requisites are met to run the full fledged client")
@ -189,11 +258,14 @@ class BLE_Console_Client:
if resp != 'Y' and resp != 'y': if resp != 'Y' and resp != 'y':
return False return False
print("BLECLI >> List available attributes of the connected device") print("BLECLI >> List available attributes of the connected device")
resp = input("BLECLI >> Is the service UUID '" + srv_uuid + "' listed among available attributes? [y/n] ") resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
if resp != 'Y' and resp != 'y': if resp != 'Y' and resp != 'y':
return False return False
return True return True
def get_nu_lookup(self):
return None
def has_characteristic(self, uuid): def has_characteristic(self, uuid):
resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ") resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
if resp != 'Y' and resp != 'y': if resp != 'Y' and resp != 'y':

View file

@ -27,19 +27,28 @@ class Transport_BLE(Transport):
# Calculate characteristic UUID for each endpoint # Calculate characteristic UUID for each endpoint
nu_lookup[name] = service_uuid[:4] + '{:02x}'.format( nu_lookup[name] = service_uuid[:4] + '{:02x}'.format(
int(nu_lookup[name], 16) & int(service_uuid[4:8], 16)) + service_uuid[8:] int(nu_lookup[name], 16) & int(service_uuid[4:8], 16)) + service_uuid[8:]
self.name_uuid_lookup = nu_lookup
# Get BLE client module # Get BLE client module
self.cli = ble_cli.get_client() self.cli = ble_cli.get_client()
# Use client to connect to BLE device and bind to service # Use client to connect to BLE device and bind to service
if not self.cli.connect(devname=devname, iface='hci0', srv_uuid=service_uuid): if not self.cli.connect(devname=devname, iface='hci0',
chrc_names=nu_lookup.keys(),
fallback_srv_uuid=service_uuid):
raise RuntimeError("Failed to initialize transport") raise RuntimeError("Failed to initialize transport")
# Check if expected characteristics are provided by the service # Irrespective of provided parameters, let the client
for name in self.name_uuid_lookup.keys(): # generate a lookup table by reading advertisement data
if not self.cli.has_characteristic(self.name_uuid_lookup[name]): # and characteristic user descriptors
raise RuntimeError("'" + name + "' endpoint not found") self.name_uuid_lookup = self.cli.get_nu_lookup()
# If that doesn't work, use the lookup table provided as parameter
if self.name_uuid_lookup is None:
self.name_uuid_lookup = nu_lookup
# Check if expected characteristics are provided by the service
for name in self.name_uuid_lookup.keys():
if not self.cli.has_characteristic(self.name_uuid_lookup[name]):
raise RuntimeError("'" + name + "' endpoint not found")
def __del__(self): def __del__(self):
# Make sure device is disconnected before application gets closed # Make sure device is disconnected before application gets closed