Merge branch 'fix/nimble_example_test' into 'master'

nimble_example_test: Fix BlePeripehral Test thread stuck issue

See merge request espressif/esp-idf!5510
This commit is contained in:
Angus Gratton 2019-09-19 11:11:08 +08:00
commit b48daafb0c
4 changed files with 297 additions and 221 deletions

View file

@ -64,6 +64,8 @@ def test_example_app_ble_central(env, extra_data):
adv_type = 'peripheral'
adv_uuid = '1811'
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig','hci0','reset'])
# Acquire DUT
dut = env.get_dut("blecent", "examples/bluetooth/nimble/blecent")
@ -75,8 +77,8 @@ def test_example_app_ble_central(env, extra_data):
# Upload binary and start testing
Utility.console_log("Starting blecent example test app")
dut.start_app()
dut.reset()
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
device_addr = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
# Get BLE client module

View file

@ -18,7 +18,9 @@ from __future__ import print_function
import os
import sys
import re
from threading import Thread
import threading
import traceback
import Queue
import subprocess
try:
@ -51,7 +53,7 @@ import Utility
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
def blehr_client_task(dut_addr, dut):
def blehr_client_task(hr_obj, dut_addr):
interface = 'hci0'
ble_devname = 'blehr_sensor_1.0'
hr_srv_uuid = '180d'
@ -70,20 +72,18 @@ def blehr_client_task(dut_addr, dut):
# Connect BLE Device
is_connected = ble_client_obj.connect()
if not is_connected:
Utility.console_log("Connection to device ", ble_devname, "failed !!")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
return
raise RuntimeError("Connection to device " + str(ble_devname) + " failed !!")
# Read Services
services_ret = ble_client_obj.get_services()
if services_ret:
print("\nServices\n")
print(services_ret)
Utility.console_log("\nServices\n")
Utility.console_log(str(services_ret))
else:
print("Failure: Read Services failed")
ble_client_obj.disconnect()
return
raise RuntimeError("Failure: Read Services failed")
'''
Blehr application run:
@ -93,14 +93,27 @@ def blehr_client_task(dut_addr, dut):
'''
blehr_ret = ble_client_obj.hr_update_simulation(hr_srv_uuid, hr_char_uuid)
if blehr_ret:
print("Success: blehr example test passed")
Utility.console_log("Success: blehr example test passed")
else:
print("Failure: blehr example test failed")
raise RuntimeError("Failure: blehr example test failed")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
class BleHRThread(threading.Thread):
def __init__(self, dut_addr, exceptions_queue):
threading.Thread.__init__(self)
self.dut_addr = dut_addr
self.exceptions_queue = exceptions_queue
def run(self):
try:
blehr_client_task(self, self.dut_addr)
except Exception:
self.exceptions_queue.put(traceback.format_exc(), block=False)
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_hr(env, extra_data):
"""
@ -111,38 +124,47 @@ def test_example_app_ble_hr(env, extra_data):
4. Updated value is retrieved
5. Stop Notifications
"""
try:
# Acquire DUT
dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr")
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig','hci0','reset'])
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "blehr.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("blehr_bin_size", bin_size // 1024)
# Acquire DUT
dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr")
# Upload binary and start testing
Utility.console_log("Starting blehr simple example test app")
dut.start_app()
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "blehr.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("blehr_bin_size", bin_size // 1024)
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
# Upload binary and start testing
Utility.console_log("Starting blehr simple example test app")
dut.start_app()
dut.reset()
# Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
# Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
exceptions_queue = Queue.Queue()
# Starting a py-client in a separate thread
blehr_thread_obj = BleHRThread(dut_addr, exceptions_queue)
blehr_thread_obj.start()
blehr_thread_obj.join()
# Starting a py-client in a separate thread
thread1 = Thread(target=blehr_client_task, args=(dut_addr,dut,))
thread1.start()
thread1.join()
exception_msg = None
while True:
try:
exception_msg = exceptions_queue.get(block=False)
except Queue.Empty:
break
else:
Utility.console_log("\n" + exception_msg)
# Check dut responses
dut.expect("subscribe event; cur_notify=1", timeout=30)
dut.expect("GATT procedure initiated: notify;", timeout=30)
dut.expect("subscribe event; cur_notify=0", timeout=30)
dut.expect("disconnect;", timeout=30)
if exception_msg:
raise Exception("Thread did not run successfully")
except Exception as e:
sys.exit(e)
# Check dut responses
dut.expect("subscribe event; cur_notify=1", timeout=30)
dut.expect("subscribe event; cur_notify=0", timeout=30)
dut.expect("disconnect;", timeout=30)
if __name__ == '__main__':

View file

@ -18,7 +18,9 @@ from __future__ import print_function
import os
import sys
import re
from threading import Thread
import Queue
import traceback
import threading
import subprocess
try:
@ -50,7 +52,7 @@ import Utility
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
def bleprph_client_task(dut_addr, dut):
def bleprph_client_task(prph_obj, dut, dut_addr):
interface = 'hci0'
ble_devname = 'nimble-bleprph'
srv_uuid = '2f12'
@ -68,10 +70,9 @@ def bleprph_client_task(dut_addr, dut):
# Connect BLE Device
is_connected = ble_client_obj.connect()
if not is_connected:
Utility.console_log("Connection to device ", ble_devname, "failed !!")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
return
raise RuntimeError("Connection to device " + ble_devname + " failed !!")
# Check dut responses
dut.expect("GAP procedure initiated: advertise;", timeout=30)
@ -79,12 +80,11 @@ def bleprph_client_task(dut_addr, dut):
# Read Services
services_ret = ble_client_obj.get_services(srv_uuid)
if services_ret:
print("\nServices\n")
print(services_ret)
Utility.console_log("\nServices\n")
Utility.console_log(str(services_ret))
else:
print("Failure: Read Services failed")
ble_client_obj.disconnect()
return
raise RuntimeError("Failure: Read Services failed")
# Read Characteristics
chars_ret = {}
@ -92,14 +92,13 @@ def bleprph_client_task(dut_addr, dut):
if chars_ret:
Utility.console_log("\nCharacteristics retrieved")
for path, props in chars_ret.items():
print("\n\tCharacteristic: ", path)
print("\tCharacteristic UUID: ", props[2])
print("\tValue: ", props[0])
print("\tProperties: : ", props[1])
Utility.console_log("\n\tCharacteristic: " + str(path))
Utility.console_log("\tCharacteristic UUID: " + str(props[2]))
Utility.console_log("\tValue: " + str(props[0]))
Utility.console_log("\tProperties: : " + str(props[1]))
else:
print("Failure: Read Characteristics failed")
ble_client_obj.disconnect()
return
raise RuntimeError("Failure: Read Characteristics failed")
'''
Write Characteristics
@ -110,19 +109,32 @@ def bleprph_client_task(dut_addr, dut):
if chars_ret_on_write:
Utility.console_log("\nCharacteristics after write operation")
for path, props in chars_ret_on_write.items():
print("\n\tCharacteristic:", path)
print("\tCharacteristic UUID: ", props[2])
print("\tValue:", props[0])
print("\tProperties: : ", props[1])
Utility.console_log("\n\tCharacteristic:" + str(path))
Utility.console_log("\tCharacteristic UUID: " + str(props[2]))
Utility.console_log("\tValue:" + str(props[0]))
Utility.console_log("\tProperties: : " + str(props[1]))
else:
print("Failure: Write Characteristics failed")
ble_client_obj.disconnect()
return
raise RuntimeError("Failure: Write Characteristics failed")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
class BlePrphThread(threading.Thread):
def __init__(self, dut, dut_addr, exceptions_queue):
threading.Thread.__init__(self)
self.dut = dut
self.dut_addr = dut_addr
self.exceptions_queue = exceptions_queue
def run(self):
try:
bleprph_client_task(self, self.dut, self.dut_addr)
except Exception:
self.exceptions_queue.put(traceback.format_exc(), block=False)
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_peripheral(env, extra_data):
"""
@ -133,36 +145,47 @@ def test_example_app_ble_peripheral(env, extra_data):
4. Read Characteristics
5. Write Characteristics
"""
try:
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
subprocess.check_output(['hciconfig','hci0','reset'])
# Acquire DUT
dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph")
# Acquire DUT
dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph")
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "bleprph.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("bleprph_bin_size", bin_size // 1024)
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "bleprph.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("bleprph_bin_size", bin_size // 1024)
# Upload binary and start testing
Utility.console_log("Starting bleprph simple example test app")
dut.start_app()
# Upload binary and start testing
Utility.console_log("Starting bleprph simple example test app")
dut.start_app()
dut.reset()
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
# Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
# Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
exceptions_queue = Queue.Queue()
# Starting a py-client in a separate thread
bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue)
bleprph_thread_obj.start()
bleprph_thread_obj.join()
# Starting a py-client in a separate thread
thread1 = Thread(target=bleprph_client_task, args=(dut_addr,dut,))
thread1.start()
thread1.join()
exception_msg = None
while True:
try:
exception_msg = exceptions_queue.get(block=False)
except Queue.Empty:
break
else:
Utility.console_log("\n" + exception_msg)
# Check dut responses
dut.expect("connection established; status=0", timeout=30)
dut.expect("disconnect;", timeout=30)
except Exception as e:
sys.exit(e)
if exception_msg:
raise Exception("Thread did not run successfully")
# Check dut responses
dut.expect("connection established; status=0", timeout=30)
dut.expect("disconnect;", timeout=30)
if __name__ == '__main__':

View file

@ -20,6 +20,7 @@
from __future__ import print_function
import sys
import time
import traceback
try:
from future.moves.itertools import zip_longest
@ -39,11 +40,11 @@ import lib_gap
srv_added_old_cnt = 0
srv_added_new_cnt = 0
verify_signal_check = 0
blecent_retry_check_cnt = 0
verify_service_cnt = 0
verify_readchars_cnt = 0
blecent_adv_uuid = '1811'
iface_added = False
gatt_app_obj_check = False
gatt_app_reg_check = False
adv_data_check = False
@ -54,6 +55,7 @@ subscribe_req_check = False
ble_hr_chrc = False
DISCOVERY_START = False
SIGNAL_CAUGHT = False
TEST_CHECKS_PASS = False
ADV_STOP = False
@ -89,26 +91,42 @@ dbus.mainloop.glib.threads_init()
event_loop = GLib.MainLoop()
def verify_signal_is_caught():
global verify_signal_check
verify_signal_check += 1
if (not SIGNAL_CAUGHT and verify_signal_check == 15) or (SIGNAL_CAUGHT):
if event_loop.is_running():
event_loop.quit()
return False # polling for checks will stop
return True # polling will continue
def set_props_status(props):
"""
Set Adapter status if it is powered on or off
"""
global ADAPTER_ON, SERVICES_RESOLVED, GATT_OBJ_REMOVED, GATT_APP_REGISTERED, \
ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT
ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT, \
SIGNAL_CAUGHT
is_service_uuid = False
# Signal caught for change in Adapter Powered property
if 'Powered' in props:
if props['Powered'] == 1:
SIGNAL_CAUGHT = True
ADAPTER_ON = True
else:
SIGNAL_CAUGHT = True
ADAPTER_ON = False
event_loop.quit()
elif 'ServicesResolved' in props:
if 'ServicesResolved' in props:
if props['ServicesResolved'] == 1:
SIGNAL_CAUGHT = True
SERVICES_RESOLVED = True
else:
SIGNAL_CAUGHT = True
SERVICES_RESOLVED = False
elif 'UUIDs' in props:
if 'UUIDs' in props:
# Signal caught for add/remove GATT data having service uuid
for uuid in props['UUIDs']:
if blecent_adv_uuid in uuid:
@ -118,7 +136,7 @@ def set_props_status(props):
# and for unregistering GATT application
GATT_APP_REGISTERED = False
lib_gatt.GATT_APP_OBJ = False
elif 'ActiveInstances' in props:
if 'ActiveInstances' in props:
# Signal caught for Advertising - add/remove Instances property
if props['ActiveInstances'] == 1:
ADV_ACTIVE_INSTANCE = True
@ -126,20 +144,24 @@ def set_props_status(props):
ADV_ACTIVE_INSTANCE = False
ADV_REGISTERED = False
lib_gap.ADV_OBJ = False
elif 'Connected' in props:
if 'Connected' in props:
# Signal caught for device connect/disconnect
if props['Connected'] is True:
if props['Connected'] == 1:
SIGNAL_CAUGHT = True
DEVICE_CONNECTED = True
event_loop.quit()
else:
SIGNAL_CAUGHT = True
DEVICE_CONNECTED = False
elif 'Value' in props:
if 'Value' in props:
# Signal caught for change in chars value
if ble_hr_chrc:
CHRC_VALUE_CNT += 1
print(props['Value'])
if CHRC_VALUE_CNT == 10:
event_loop.quit()
SIGNAL_CAUGHT = True
return False
return False
def props_change_handler(iface, changed_props, invalidated):
@ -194,6 +216,9 @@ class BLE_Bluez_Client:
Discover Bluetooth Adapter
Power On Bluetooth Adapter
'''
global verify_signal_check, SIGNAL_CAUGHT, ADAPTER_ON
verify_signal_check = 0
ADAPTER_ON = False
try:
print("discovering adapter...")
for path, interfaces in self.ble_objs.items():
@ -209,45 +234,45 @@ class BLE_Bluez_Client:
break
if self.adapter is None:
raise RuntimeError("\nError: bluetooth adapter not found")
raise Exception("Bluetooth adapter not found")
if self.props_iface_obj is None:
raise RuntimeError("\nError: properties interface not found")
raise Exception("Properties interface not found")
print("bluetooth adapter discovered")
self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
# Check if adapter is already powered on
if ADAPTER_ON:
print("bluetooth adapter is already on")
print("Adapter already powered on")
return True
# Power On Adapter
print("powering on adapter...")
self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1))
SIGNAL_CAUGHT = False
GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
if ADAPTER_ON:
print("bluetooth adapter powered on")
return True
else:
print("Failure: bluetooth adapter not powered on")
return False
raise Exception("Failure: bluetooth adapter not powered on")
except Exception as e:
print(e)
sys.exit(1)
except Exception:
print(traceback.format_exc())
return False
def connect(self):
'''
Connect to the device discovered
Retry 10 times to discover and connect to device
'''
global DISCOVERY_START
global DISCOVERY_START, SIGNAL_CAUGHT, DEVICE_CONNECTED, verify_signal_check
device_found = False
DEVICE_CONNECTED = False
try:
self.adapter.StartDiscovery()
print("\nStarted Discovery")
@ -255,6 +280,7 @@ class BLE_Bluez_Client:
DISCOVERY_START = True
for retry_cnt in range(10,0,-1):
verify_signal_check = 0
try:
if self.device is None:
print("\nConnecting to device...")
@ -263,9 +289,15 @@ class BLE_Bluez_Client:
device_found = self.get_device()
if device_found:
self.device.Connect(dbus_interface=DEVICE_IFACE)
event_loop.quit()
print("\nConnected to device")
return True
time.sleep(15)
SIGNAL_CAUGHT = False
GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
if DEVICE_CONNECTED:
print("\nConnected to device")
return True
else:
raise Exception
except Exception as e:
print(e)
print("\nRetries left", retry_cnt - 1)
@ -274,8 +306,8 @@ class BLE_Bluez_Client:
# Device not found
return False
except Exception as e:
print(e)
except Exception:
print(traceback.format_exc())
self.device = None
return False
@ -299,8 +331,7 @@ class BLE_Bluez_Client:
break
if dev_path is None:
print("\nBLE device not found")
return False
raise Exception("\nBLE device not found")
device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
@ -321,16 +352,7 @@ class BLE_Bluez_Client:
if path not in self.services:
self.services.append(path)
def verify_get_services(self):
global SERVICE_SCAN_FAIL, verify_service_cnt
verify_service_cnt += 1
if iface_added and self.services and SERVICES_RESOLVED:
event_loop.quit()
if verify_service_cnt == 10:
event_loop.quit()
def verify_service_uuid_found(self):
def verify_service_uuid_found(self, service_uuid):
'''
Verify service uuid found
'''
@ -340,40 +362,45 @@ class BLE_Bluez_Client:
if srv_uuid_found:
SERVICE_UUID_FOUND = True
def get_services(self, srv_uuid=None):
def get_services(self, service_uuid=None):
'''
Retrieve Services found in the device connected
'''
global service_uuid, iface_added, SERVICE_UUID_FOUND
service_uuid = srv_uuid
iface_added = False
global SIGNAL_CAUGHT, SERVICE_UUID_FOUND, SERVICES_RESOLVED, verify_signal_check
verify_signal_check = 0
SERVICE_UUID_FOUND = False
SERVICES_RESOLVED = False
SIGNAL_CAUGHT = False
try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
self.srvc_iface_added_handler(path, interfaces)
# If services not found, then they may not have been added yet on dbus
if not self.services:
iface_added = True
GLib.timeout_add_seconds(2, self.verify_get_services)
GLib.timeout_add_seconds(5, verify_signal_is_caught)
om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler)
event_loop.run()
if not SERVICES_RESOLVED:
raise Exception("Services not found...")
if service_uuid:
self.verify_service_uuid_found()
self.verify_service_uuid_found(service_uuid)
if not SERVICE_UUID_FOUND:
raise Exception("Service with uuid: %s not found !!!" % service_uuid)
raise Exception("Service with uuid: %s not found..." % service_uuid)
# Services found
return self.srv_uuid
except Exception as e:
print("Error: ", e)
except Exception:
print(traceback.format_exc())
return False
def chrc_iface_added_handler(self, path, interfaces):
'''
Add services found as lib_ble_client obj
'''
global chrc, chrc_discovered
global chrc, chrc_discovered, SIGNAL_CAUGHT
chrc_val = None
if self.device and path.startswith(self.device.object_path):
@ -386,22 +413,17 @@ class BLE_Bluez_Client:
chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
uuid = chrc_props['UUID']
self.chars[path] = chrc_val, chrc_flags, uuid
def verify_get_chars(self):
global verify_readchars_cnt
verify_readchars_cnt += 1
if iface_added and self.chars:
event_loop.quit()
if verify_readchars_cnt == 10:
event_loop.quit()
SIGNAL_CAUGHT = True
def read_chars(self):
'''
Read characteristics found in the device connected
'''
global iface_added, chrc_discovered
global iface_added, chrc_discovered, SIGNAL_CAUGHT, verify_signal_check
verify_signal_check = 0
chrc_discovered = False
iface_added = False
SIGNAL_CAUGHT = False
try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
@ -410,14 +432,15 @@ class BLE_Bluez_Client:
self.chrc_iface_added_handler(path, interfaces)
# If chars not found, then they have not been added yet to interface
time.sleep(15)
if not self.chars:
iface_added = True
GLib.timeout_add_seconds(2, self.verify_get_chars)
GLib.timeout_add_seconds(5, verify_signal_is_caught)
om_iface_obj.connect_to_signal('InterfacesAdded', self.chars_iface_added_handler)
event_loop.run()
return self.chars
except Exception as e:
print("Error: ", e)
except Exception:
print(traceback.format_exc())
return False
def write_chars(self, write_val):
@ -443,12 +466,11 @@ class BLE_Bluez_Client:
return False
self.chars[path] = chrc_val, props[1], props[2] # update value
if not char_write_props:
print("Failure: Cannot perform write operation. Characteristic does not have write permission.")
return False
raise Exception("Failure: Cannot perform write operation. Characteristic does not have write permission.")
return self.chars
except Exception as e:
print(e)
except Exception:
print(traceback.format_exc())
return False
def hr_update_simulation(self, hr_srv_uuid, hr_char_uuid):
@ -457,7 +479,7 @@ class BLE_Bluez_Client:
Retrieve updated value
Stop Notifications
'''
global ble_hr_chrc
global ble_hr_chrc, verify_signal_check, SIGNAL_CAUGHT, CHRC_VALUE_CNT
srv_path = None
chrc = None
@ -465,6 +487,7 @@ class BLE_Bluez_Client:
chrc_path = None
chars_ret = None
ble_hr_chrc = True
CHRC_VALUE_CNT = 0
try:
# Get HR Measurement characteristic
@ -475,8 +498,7 @@ class BLE_Bluez_Client:
break
if srv_path is None:
print("Failure: HR UUID:", hr_srv_uuid, "not found")
return False
raise Exception("Failure: HR UUID:", hr_srv_uuid, "not found")
chars_ret = self.read_chars()
@ -487,8 +509,8 @@ class BLE_Bluez_Client:
if hr_char_uuid in props[2]: # uuid
break
if chrc is None:
print("Failure: Characteristics for service: ", srv_path, "not found")
return False
raise Exception("Failure: Characteristics for service: ", srv_path, "not found")
# Subscribe to notifications
print("\nSubscribe to notifications: On")
chrc.StartNotify(dbus_interface=GATT_CHRC_IFACE)
@ -496,6 +518,9 @@ class BLE_Bluez_Client:
chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE)
chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
SIGNAL_CAUGHT = False
verify_signal_check = 0
GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
chrc.StopNotify(dbus_interface=GATT_CHRC_IFACE)
time.sleep(2)
@ -504,8 +529,8 @@ class BLE_Bluez_Client:
ble_hr_chrc = False
return True
except Exception as e:
print(e)
except Exception:
print(traceback.format_exc())
return False
def create_gatt_app(self):
@ -513,10 +538,12 @@ class BLE_Bluez_Client:
Create GATT data
Register GATT Application
'''
global gatt_app_obj, gatt_manager_iface_obj
global gatt_app_obj, gatt_manager_iface_obj, GATT_APP_REGISTERED
gatt_app_obj = None
gatt_manager_iface_obj = None
GATT_APP_REGISTERED = False
lib_gatt.GATT_APP_OBJ = False
try:
gatt_app_obj = lib_gatt.Application(self.bus, self.adapter_path[0])
@ -526,8 +553,8 @@ class BLE_Bluez_Client:
reply_handler=self.gatt_app_handler,
error_handler=self.gatt_app_error_handler)
return True
except Exception as e:
print(e)
except Exception:
print(traceback.format_exc())
return False
def gatt_app_handler(self):
@ -550,25 +577,28 @@ class BLE_Bluez_Client:
Register Advertisement
Start Advertising
'''
global le_adv_obj, le_adv_manager_iface_obj
global le_adv_obj, le_adv_manager_iface_obj, ADV_ACTIVE_INSTANCE, ADV_REGISTERED
le_adv_obj = None
le_adv_manager_iface_obj = None
le_adv_iface_path = None
ADV_ACTIVE_INSTANCE = False
ADV_REGISTERED = False
lib_gap.ADV_OBJ = False
try:
print("Advertising started")
gatt_app_ret = self.create_gatt_app()
if not gatt_app_ret:
return False
raise Exception
for path,interface in self.ble_objs.items():
if LE_ADVERTISING_MANAGER_IFACE in interface:
le_adv_iface_path = path
if le_adv_iface_path is None:
print('\n Cannot start advertising. LEAdvertisingManager1 Interface not found')
return False
raise Exception('\n Cannot start advertising. LEAdvertisingManager1 Interface not found')
le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name)
le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE)
@ -583,11 +613,10 @@ class BLE_Bluez_Client:
if TEST_CHECKS_PASS:
return True
else:
return False
raise Exception
except Exception as e:
print("in Exception")
print(e)
except Exception:
print(traceback.format_exc())
return False
def adv_handler(self):
@ -646,39 +675,41 @@ class BLE_Bluez_Client:
TEST_CHECKS_PASS = False
if subscribe_req_check:
lib_gatt.alert_status_char_obj.StopNotify()
event_loop.quit()
return False # polling for checks will stop
else:
# Check for success
if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ:
print("GATT Data created")
gatt_app_obj_check = True
if not gatt_app_reg_check and GATT_APP_REGISTERED:
print("GATT Application registered")
gatt_app_reg_check = True
if not adv_data_check and lib_gap.ADV_OBJ:
print("Advertising data created")
adv_data_check = True
if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE:
print("Advertisement registered")
adv_reg_check = True
if not read_req_check and lib_gatt.CHAR_READ:
read_req_check = True
if not write_req_check and lib_gatt.CHAR_WRITE:
write_req_check = True
if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE:
subscribe_req_check = True
# Check for success
if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ:
print("GATT Data created")
gatt_app_obj_check = True
if not gatt_app_reg_check and GATT_APP_REGISTERED:
print("GATT Application registered")
gatt_app_reg_check = True
if not adv_data_check and lib_gap.ADV_OBJ:
print("Advertising data created")
adv_data_check = True
if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE:
print("Advertisement registered")
adv_reg_check = True
if not read_req_check and lib_gatt.CHAR_READ:
read_req_check = True
if not write_req_check and lib_gatt.CHAR_WRITE:
write_req_check = True
if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE:
subscribe_req_check = True
# Increment retry count
blecent_retry_check_cnt += 1
if read_req_check and write_req_check and subscribe_req_check:
# all checks passed
# Blecent Test passed
TEST_CHECKS_PASS = True
lib_gatt.alert_status_char_obj.StopNotify()
event_loop.quit()
if (blecent_retry_check_cnt == 10 or TEST_CHECKS_PASS):
if event_loop.is_running():
event_loop.quit()
return False # polling for checks will stop
# Increment retry count
blecent_retry_check_cnt += 1
# Default return True - polling for checks will continue
return True
@ -709,29 +740,29 @@ class BLE_Bluez_Client:
# Blecent Test failed
ADV_STOP = False
event_loop.quit()
return False # polling for checks will stop
else:
# Check for success
if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
print("GATT Data removed")
gatt_app_obj_check = True
if not gatt_app_reg_check and not GATT_APP_REGISTERED:
print("GATT Application unregistered")
gatt_app_reg_check = True
if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ):
print("Advertising data removed")
print("Advertisement unregistered")
adv_data_check = True
adv_reg_check = True
# all checks passed
ADV_STOP = True
# Check for success
if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
print("GATT Data removed")
gatt_app_obj_check = True
if not gatt_app_reg_check and not GATT_APP_REGISTERED:
print("GATT Application unregistered")
gatt_app_reg_check = True
if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ):
print("Advertising data removed")
print("Advertisement unregistered")
adv_data_check = True
adv_reg_check = True
if (blecent_retry_check_cnt == 10 or ADV_STOP):
if event_loop.is_running():
event_loop.quit()
return False # polling for checks will stop
# Increment retry count
blecent_retry_check_cnt += 1
if adv_reg_check:
# all checks passed
ADV_STOP = True
event_loop.quit()
return False # polling for checks will stop
# Default return True - polling for checks will continue
return True
@ -747,11 +778,14 @@ class BLE_Bluez_Client:
Adapter is powered off
'''
try:
global blecent_retry_check_cnt, DISCOVERY_START
global blecent_retry_check_cnt, DISCOVERY_START, verify_signal_check, SIGNAL_CAUGHT
blecent_retry_check_cnt = 0
verify_signal_check = 0
print("\nexiting from test...")
self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
if ADV_REGISTERED:
# Unregister Advertisement
le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path())
@ -766,7 +800,7 @@ class BLE_Bluez_Client:
# Remove GATT data
dbus.service.Object.remove_from_connection(gatt_app_obj)
GLib.timeout_add_seconds(2, self.verify_blecent_disconnect)
GLib.timeout_add_seconds(5, self.verify_blecent_disconnect)
event_loop.run()
if ADV_STOP:
@ -784,9 +818,10 @@ class BLE_Bluez_Client:
if DISCOVERY_START:
self.adapter.StopDiscovery()
DISCOVERY_START = False
time.sleep(15)
# Power off Adapter
self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(0))
SIGNAL_CAUGHT = False
GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
if not DEVICE_CONNECTED:
@ -794,12 +829,6 @@ class BLE_Bluez_Client:
else:
print("Warning: device could not be disconnected")
print("powering off adapter...")
if not ADAPTER_ON:
print("bluetooth adapter powered off")
else:
print("\nWarning: Bluetooth adapter not powered off")
except Exception as e:
print(e)
except Exception:
print(traceback.format_exc())
return False