From bfd1eeefa283a00cd52190666008ed0b00b7d66d Mon Sep 17 00:00:00 2001 From: Shivani Tipnis Date: Fri, 5 Jul 2019 08:14:04 +0800 Subject: [PATCH] Add NimBLE bleprph,blecent,blehr example tests --- examples/bluetooth/nimble/blecent/README.md | 69 +- .../bluetooth/nimble/blecent/blecent_test.py | 129 +++ examples/bluetooth/nimble/blehr/README.md | 54 ++ examples/bluetooth/nimble/blehr/blehr_test.py | 149 ++++ examples/bluetooth/nimble/bleprph/README.md | 75 ++ .../bluetooth/nimble/bleprph/bleprph_test.py | 169 ++++ tools/ble/lib_ble_client.py | 805 ++++++++++++++++++ tools/ble/lib_gap.py | 87 ++ tools/ble/lib_gatt.py | 412 +++++++++ tools/ble/requirements.txt | 3 + 10 files changed, 1950 insertions(+), 2 deletions(-) create mode 100644 examples/bluetooth/nimble/blecent/blecent_test.py create mode 100644 examples/bluetooth/nimble/blehr/blehr_test.py create mode 100644 examples/bluetooth/nimble/bleprph/bleprph_test.py create mode 100644 tools/ble/lib_ble_client.py create mode 100644 tools/ble/lib_gap.py create mode 100644 tools/ble/lib_gatt.py create mode 100644 tools/ble/requirements.txt diff --git a/examples/bluetooth/nimble/blecent/README.md b/examples/bluetooth/nimble/blecent/README.md index bfa7b68d4..4b531f978 100644 --- a/examples/bluetooth/nimble/blecent/README.md +++ b/examples/bluetooth/nimble/blecent/README.md @@ -20,6 +20,12 @@ This example aims at understanding BLE service discovery, connection and charact To test this demo, use any BLE GATT server app that advertises support for the Alert Notification service (0x1811) and includes it in the GATT database. +A Python based utility `blecent_test.py` is also provided (which will run as a BLE GATT server) and can be used to test this example. + +Note : + +* Make sure to run `python -m pip install --user -r $IDF_PATH/requirements.txt -r $IDF_PATH/tools/ble/requirements.txt` to install the dependency packages needed. +* Currently this Python utility is only supported on Linux (BLE communication is via BLuez + DBus). ## How to use example @@ -45,7 +51,8 @@ See the Getting Started Guide for full steps to configure and use ESP-IDF to bui ## Example Output -There is this console output on successful connection: +This is the console output on successful connection: + ``` I (202) BTDM_INIT: BT controller compile version [0b60040] I (202) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE @@ -71,7 +78,8 @@ Write complete; status=0 conn_handle=0 attr_handle=47 Subscribe complete; status=0 conn_handle=0 attr_handle=43 ``` -There is this console output on failure (or peripheral does not support New Alert Service category): +This is the console output on failure (or peripheral does not support New Alert Service category): + ``` I (180) BTDM_INIT: BT controller compile version [8e87ec7] I (180) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE @@ -92,3 +100,60 @@ Error: Peer doesn't support the Supported New Alert Category characteristic GAP procedure initiated: terminate connection; conn_handle=0 hci_reason=19 disconnect; reason=534 ``` + +## Running Python Utility + +``` +python blecent_test.py +``` + +## Python Utility Output + +This is this output seen on the python side on successful connection: + +``` +discovering adapter... +bluetooth adapter discovered +powering on adapter... +bluetooth adapter powered on +Advertising started +GATT Data created +GATT Application registered +Advertising data created +Advertisement registered +Read Request received + SupportedNewAlertCategoryCharacteristic + Value: [dbus.Byte(2)] +Write Request received + AlertNotificationControlPointCharacteristic + Current value: [dbus.Byte(0)] + New value: [dbus.Byte(99), dbus.Byte(100)] + +Notify Started +New value on write: [dbus.Byte(1), dbus.Byte(0)] + Value on read: [dbus.Byte(1), dbus.Byte(0)] + +Notify Stopped + +exiting from test... +GATT Data removed +GATT Application unregistered +Advertising data removed +Advertisement unregistered +Stop Advertising status: True +disconnecting device... +device disconnected +powering off adapter... +bluetooth adapter powered off +Service discovery passed + Service Discovery Status: 0 +Read passed + SupportedNewAlertCategoryCharacteristic + Read Status: 0 +Write passed + AlertNotificationControlPointCharacteristic + Write Status: 0 +Subscribe passed + ClientCharacteristicConfigurationDescriptor + Subscribe Status: 0 +``` diff --git a/examples/bluetooth/nimble/blecent/blecent_test.py b/examples/bluetooth/nimble/blecent/blecent_test.py new file mode 100644 index 000000000..d1d79298c --- /dev/null +++ b/examples/bluetooth/nimble/blecent/blecent_test.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# +# Copyright 2019 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import os +import sys +import re +import uuid +import subprocess + +try: + # This environment variable is expected on the host machine + test_fw_path = os.getenv("TEST_FW_PATH") + if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + import IDF +except ImportError as e: + print(e) + print("\nCheck your IDF_PATH\nOR") + print("Try `export TEST_FW_PATH=$IDF_PATH/tools/tiny-test-fw` for resolving the issue\nOR") + print("Try `pip install -r $IDF_PATH/tools/tiny-test-fw/requirements.txt` for resolving the issue") + import IDF + +try: + import lib_ble_client +except ImportError: + lib_ble_client_path = os.getenv("IDF_PATH") + "/tools/ble" + if lib_ble_client_path and lib_ble_client_path not in sys.path: + sys.path.insert(0, lib_ble_client_path) + import lib_ble_client + + +import Utility + +# When running on local machine execute the following before running this script +# > make app bootloader +# > make print_flash_cmd | tail -n 1 > build/download.config +# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw + + +@IDF.idf_example_test(env_tag="Example_WIFI_BT") +def test_example_app_ble_central(env, extra_data): + """ + Steps: + 1. Discover Bluetooth Adapter and Power On + """ + + interface = 'hci0' + adv_host_name = "BleCentTestApp" + adv_iface_index = 0 + adv_type = 'peripheral' + adv_uuid = '1811' + + # Acquire DUT + dut = env.get_dut("blecent", "examples/bluetooth/nimble/blecent") + + # Get binary file + binary_file = os.path.join(dut.app.binary_path, "blecent.bin") + bin_size = os.path.getsize(binary_file) + IDF.log_performance("blecent_bin_size", "{}KB".format(bin_size // 1024)) + + # Upload binary and start testing + Utility.console_log("Starting blecent example test app") + dut.start_app() + + subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) + device_addr = ':'.join(re.findall('..', '%012x' % uuid.getnode())) + + # Get BLE client module + ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface) + if not ble_client_obj: + raise RuntimeError("Get DBus-Bluez object failed !!") + + # Discover Bluetooth Adapter and power on + is_adapter_set = ble_client_obj.set_adapter() + if not is_adapter_set: + raise RuntimeError("Adapter Power On failed !!") + + # Write device address to dut + dut.expect("BLE Host Task Started", timeout=60) + dut.write(device_addr + "\n") + + ''' + Blecent application run: + Create GATT data + Register GATT Application + Create Advertising data + Register advertisement + Start advertising + ''' + ble_client_obj.start_advertising(adv_host_name, adv_iface_index, adv_type, adv_uuid) + + # Call disconnect to perform cleanup operations before exiting application + ble_client_obj.disconnect() + + # Check dut responses + dut.expect("Connection established", timeout=30) + + dut.expect("Service discovery complete; status=0", timeout=30) + print("Service discovery passed\n\tService Discovery Status: 0") + + dut.expect("GATT procedure initiated: read;", timeout=30) + dut.expect("Read complete; status=0", timeout=30) + print("Read passed\n\tSupportedNewAlertCategoryCharacteristic\n\tRead Status: 0") + + dut.expect("GATT procedure initiated: write;", timeout=30) + dut.expect("Write complete; status=0", timeout=30) + print("Write passed\n\tAlertNotificationControlPointCharacteristic\n\tWrite Status: 0") + + dut.expect("GATT procedure initiated: write;", timeout=30) + dut.expect("Subscribe complete; status=0", timeout=30) + print("Subscribe passed\n\tClientCharacteristicConfigurationDescriptor\n\tSubscribe Status: 0") + + +if __name__ == '__main__': + test_example_app_ble_central() diff --git a/examples/bluetooth/nimble/blehr/README.md b/examples/bluetooth/nimble/blehr/README.md index 29a773d96..af4b0581f 100644 --- a/examples/bluetooth/nimble/blehr/README.md +++ b/examples/bluetooth/nimble/blehr/README.md @@ -10,6 +10,13 @@ This example aims at understanding notification subscriptions and sending notifi To test this demo, any BLE scanner app can be used. +A Python based utility `blehr_test.py` is also provided (which will run as a BLE GATT Client) and can be used to test this example. + +Note : + +* Make sure to run `python -m pip install --user -r $IDF_PATH/requirements.txt -r $IDF_PATH/tools/ble/requirements.txt` to install the dependency packages needed. +* Currently this Python utility is only supported on Linux (BLE communication is via BLuez + DBus). + ## How to use example @@ -59,3 +66,50 @@ GATT procedure initiated: notify; att_handle=3 ``` +## Running Python Utility + +``` +python blehr_test.py +``` + +## Python Utility Output + +This is this output seen on the python side on successful connection: + +``` +discovering adapter... +bluetooth adapter discovered +powering on adapter... +bluetooth adapter powered on + +Started Discovery + +Connecting to device... + +Connected to device + +Services + +[dbus.String(u'00001801-0000-1000-8000-00805f9b34fb', variant_level=1), dbus.String(u'0000180d-0000-1000-8000-00805f9b34fb', variant_level=1), dbus.String(u'0000180a-0000-1000-8000-00805f9b34fb', variant_level=1)] + +Subscribe to notifications: On +dbus.Array([dbus.Byte(6), dbus.Byte(90)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(91)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(92)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(93)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(94)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(95)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(96)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(97)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(98)], signature=dbus.Signature('y'), variant_level=1) +dbus.Array([dbus.Byte(6), dbus.Byte(99)], signature=dbus.Signature('y'), variant_level=1) + +Subscribe to notifications: Off +Success: blehr example test passed + +exiting from test... +disconnecting device... +device disconnected +powering off adapter... +bluetooth adapter powered off +``` diff --git a/examples/bluetooth/nimble/blehr/blehr_test.py b/examples/bluetooth/nimble/blehr/blehr_test.py new file mode 100644 index 000000000..81f21ee92 --- /dev/null +++ b/examples/bluetooth/nimble/blehr/blehr_test.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +# +# Copyright 2019 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import os +import sys +import re +from threading import Thread +import subprocess + +try: + # This environment variable is expected on the host machine + test_fw_path = os.getenv("TEST_FW_PATH") + if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + import IDF +except ImportError as e: + print(e) + print("\nCheck your IDF_PATH\nOR") + print("Try `export TEST_FW_PATH=$IDF_PATH/tools/tiny-test-fw` for resolving the issue\nOR") + print("Try `pip install -r $IDF_PATH/tools/tiny-test-fw/requirements.txt` for resolving the issue\n") + import IDF + +try: + import lib_ble_client +except ImportError: + lib_ble_client_path = os.getenv("IDF_PATH") + "/tools/ble" + if lib_ble_client_path and lib_ble_client_path not in sys.path: + sys.path.insert(0, lib_ble_client_path) + import lib_ble_client + + +import Utility + +# When running on local machine execute the following before running this script +# > make app bootloader +# > make print_flash_cmd | tail -n 1 > build/download.config +# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw + + +def blehr_client_task(dut_addr, dut): + interface = 'hci0' + ble_devname = 'blehr_sensor_1.0' + hr_srv_uuid = '180d' + hr_char_uuid = '2a37' + + # Get BLE client module + ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr) + if not ble_client_obj: + raise RuntimeError("Failed to get DBus-Bluez object") + + # Discover Bluetooth Adapter and power on + is_adapter_set = ble_client_obj.set_adapter() + if not is_adapter_set: + raise RuntimeError("Adapter Power On failed !!") + + # Connect BLE Device + is_connected = ble_client_obj.connect() + if not is_connected: + Utility.console_log("Connection to device ", ble_devname, "failed !!") + # Call disconnect to perform cleanup operations before exiting application + ble_client_obj.disconnect() + return + + # Read Services + services_ret = ble_client_obj.get_services() + if services_ret: + print("\nServices\n") + print(services_ret) + else: + print("Failure: Read Services failed") + ble_client_obj.disconnect() + return + + ''' + Blehr application run: + Start Notifications + Retrieve updated value + Stop Notifications + ''' + blehr_ret = ble_client_obj.hr_update_simulation(hr_srv_uuid, hr_char_uuid) + if blehr_ret: + print("Success: blehr example test passed") + else: + print("Failure: blehr example test failed") + + # Call disconnect to perform cleanup operations before exiting application + ble_client_obj.disconnect() + + +@IDF.idf_example_test(env_tag="Example_WIFI_BT") +def test_example_app_ble_hr(env, extra_data): + """ + Steps: + 1. Discover Bluetooth Adapter and Power On + 2. Connect BLE Device + 3. Start Notifications + 4. Updated value is retrieved + 5. Stop Notifications + """ + try: + # Acquire DUT + dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr") + + # Get binary file + binary_file = os.path.join(dut.app.binary_path, "blehr.bin") + bin_size = os.path.getsize(binary_file) + IDF.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024)) + IDF.check_performance("blehr_bin_size", bin_size // 1024) + + # Upload binary and start testing + Utility.console_log("Starting blehr simple example test app") + dut.start_app() + + subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) + + # Get device address from dut + dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0] + + # Starting a py-client in a separate thread + thread1 = Thread(target=blehr_client_task, args=(dut_addr,dut,)) + thread1.start() + thread1.join() + + # Check dut responses + dut.expect("subscribe event; cur_notify=1", timeout=30) + dut.expect("GATT procedure initiated: notify;", timeout=30) + dut.expect("subscribe event; cur_notify=0", timeout=30) + dut.expect("disconnect;", timeout=30) + + except Exception as e: + sys.exit(e) + + +if __name__ == '__main__': + test_example_app_ble_hr() diff --git a/examples/bluetooth/nimble/bleprph/README.md b/examples/bluetooth/nimble/bleprph/README.md index 5630bd0ef..6f0d96e8a 100644 --- a/examples/bluetooth/nimble/bleprph/README.md +++ b/examples/bluetooth/nimble/bleprph/README.md @@ -12,6 +12,12 @@ It also demonstrates security features of NimBLE stack. SMP parameters like I/O To test this demo, any BLE scanner app can be used. +A Python based utility `bleprph_test.py` is also provided (which will run as a BLE GATT Client) and can be used to test this example. + +Note : + +* Make sure to run `python -m pip install --user -r $IDF_PATH/requirements.txt -r $IDF_PATH/tools/ble/requirements.txt` to install the dependency packages needed. +* Currently this Python utility is only supported on Linux (BLE communication is via BLuez + DBus). ## How to use example @@ -73,3 +79,72 @@ peer_ota_addr_type=1 peer_ota_addr=xx:xx:xx:xx:xx:xx peer_id_addr_type=1 peer_id ``` +## Running Python Utility + +``` +python bleprph_test.py +``` + +## Python Utility Output + +This is this output seen on the python side on successful connection: + +``` +discovering adapter... +bluetooth adapter discovered +powering on adapter... +bluetooth adapter powered on + +Started Discovery + +Connecting to device... + +Connected to device + +Services + +[dbus.String(u'00001801-0000-1000-8000-00805f9b34fb', variant_level=1), dbus.String(u'59462f12-9543-9999-12c8-58b459a2712d', variant_level=1)] + +Characteristics retrieved + + Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000b + Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df6 + Value: dbus.Array([dbus.Byte(45), dbus.Byte(244), dbus.Byte(81), dbus.Byte(88)], signature=dbus.Signature('y')) + Properties: : dbus.Array([dbus.String(u'read')], signature=dbus.Signature('s'), variant_level=1) + + Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000d + Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df7 + Value: dbus.Array([dbus.Byte(0)], signature=dbus.Signature('y')) + Properties: : dbus.Array([dbus.String(u'read'), dbus.String(u'write')], signature=dbus.Signature('s'), variant_level=1) + + Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service0006/char0007 + Characteristic UUID: 00002a05-0000-1000-8000-00805f9b34fb + Value: None + Properties: : dbus.Array([dbus.String(u'indicate')], signature=dbus.Signature('s'), variant_level=1) + +Characteristics after write operation + + Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000b + Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df6 + Value: dbus.Array([dbus.Byte(45), dbus.Byte(244), dbus.Byte(81), dbus.Byte(88)], signature=dbus.Signature('y')) + Properties: : dbus.Array([dbus.String(u'read')], signature=dbus.Signature('s'), variant_level=1) + + Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000d + Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df7 + Value: dbus.Array([dbus.Byte(65)], signature=dbus.Signature('y')) + Properties: : dbus.Array([dbus.String(u'read'), dbus.String(u'write')], signature=dbus.Signature('s'), variant_level=1) + + Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service0006/char0007 + Characteristic UUID: 00002a05-0000-1000-8000-00805f9b34fb + Value: None + Properties: : dbus.Array([dbus.String(u'indicate')], signature=dbus.Signature('s'), variant_level=1) + +exiting from test... +disconnecting device... +device disconnected +powering off adapter... +bluetooth adapter powered off +``` + +## Note +* NVS support is not yet integrated to bonding. So, for now, bonding is not persistent across reboot. diff --git a/examples/bluetooth/nimble/bleprph/bleprph_test.py b/examples/bluetooth/nimble/bleprph/bleprph_test.py new file mode 100644 index 000000000..3e29f668c --- /dev/null +++ b/examples/bluetooth/nimble/bleprph/bleprph_test.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# +# Copyright 2019 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import os +import sys +import re +from threading import Thread +import subprocess + +try: + # This environment variable is expected on the host machine + test_fw_path = os.getenv("TEST_FW_PATH") + if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + import IDF +except ImportError as e: + print(e) + print("Try `export TEST_FW_PATH=$IDF_PATH/tools/tiny-test-fw` for resolving the issue") + print("Try `pip install -r $IDF_PATH/tools/tiny-test-fw/requirements.txt` for resolving the issue") + import IDF + +try: + import lib_ble_client +except ImportError: + lib_ble_client_path = os.getenv("IDF_PATH") + "/tools/ble" + if lib_ble_client_path and lib_ble_client_path not in sys.path: + sys.path.insert(0, lib_ble_client_path) + import lib_ble_client + + +import Utility + +# When running on local machine execute the following before running this script +# > make app bootloader +# > make print_flash_cmd | tail -n 1 > build/download.config +# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw + + +def bleprph_client_task(dut_addr, dut): + interface = 'hci0' + ble_devname = 'nimble-bleprph' + srv_uuid = '2f12' + + # Get BLE client module + ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr) + if not ble_client_obj: + raise RuntimeError("Failed to get DBus-Bluez object") + + # Discover Bluetooth Adapter and power on + is_adapter_set = ble_client_obj.set_adapter() + if not is_adapter_set: + raise RuntimeError("Adapter Power On failed !!") + + # Connect BLE Device + is_connected = ble_client_obj.connect() + if not is_connected: + Utility.console_log("Connection to device ", ble_devname, "failed !!") + # Call disconnect to perform cleanup operations before exiting application + ble_client_obj.disconnect() + return + + # Check dut responses + dut.expect("GAP procedure initiated: advertise;", timeout=30) + + # Read Services + services_ret = ble_client_obj.get_services(srv_uuid) + if services_ret: + print("\nServices\n") + print(services_ret) + else: + print("Failure: Read Services failed") + ble_client_obj.disconnect() + return + + # Read Characteristics + chars_ret = {} + chars_ret = ble_client_obj.read_chars() + if chars_ret: + Utility.console_log("\nCharacteristics retrieved") + for path, props in chars_ret.items(): + print("\n\tCharacteristic: ", path) + print("\tCharacteristic UUID: ", props[2]) + print("\tValue: ", props[0]) + print("\tProperties: : ", props[1]) + else: + print("Failure: Read Characteristics failed") + ble_client_obj.disconnect() + return + + ''' + Write Characteristics + - write 'A' to characteristic with write permission + ''' + chars_ret_on_write = {} + chars_ret_on_write = ble_client_obj.write_chars('A') + if chars_ret_on_write: + Utility.console_log("\nCharacteristics after write operation") + for path, props in chars_ret_on_write.items(): + print("\n\tCharacteristic:", path) + print("\tCharacteristic UUID: ", props[2]) + print("\tValue:", props[0]) + print("\tProperties: : ", props[1]) + else: + print("Failure: Write Characteristics failed") + ble_client_obj.disconnect() + return + + # Call disconnect to perform cleanup operations before exiting application + ble_client_obj.disconnect() + + +@IDF.idf_example_test(env_tag="Example_WIFI_BT") +def test_example_app_ble_peripheral(env, extra_data): + """ + Steps: + 1. Discover Bluetooth Adapter and Power On + 2. Connect BLE Device + 3. Read Services + 4. Read Characteristics + 5. Write Characteristics + """ + try: + + # Acquire DUT + dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph") + + # Get binary file + binary_file = os.path.join(dut.app.binary_path, "bleprph.bin") + bin_size = os.path.getsize(binary_file) + IDF.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024)) + IDF.check_performance("bleprph_bin_size", bin_size // 1024) + + # Upload binary and start testing + Utility.console_log("Starting bleprph simple example test app") + dut.start_app() + + subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) + + # Get device address from dut + dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0] + + # Starting a py-client in a separate thread + thread1 = Thread(target=bleprph_client_task, args=(dut_addr,dut,)) + thread1.start() + thread1.join() + + # Check dut responses + dut.expect("connection established; status=0", timeout=30) + dut.expect("disconnect;", timeout=30) + except Exception as e: + sys.exit(e) + + +if __name__ == '__main__': + test_example_app_ble_peripheral() diff --git a/tools/ble/lib_ble_client.py b/tools/ble/lib_ble_client.py new file mode 100644 index 000000000..3cbec0370 --- /dev/null +++ b/tools/ble/lib_ble_client.py @@ -0,0 +1,805 @@ +#!/usr/bin/env python +# +# Copyright 2019 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DBus-Bluez BLE library + +from __future__ import print_function +import sys +import time + +try: + from future.moves.itertools import zip_longest + import dbus + import dbus.mainloop.glib + from gi.repository import GLib +except ImportError as e: + if 'linux' not in sys.platform: + sys.exit("Error: Only supported on Linux platform") + print(e) + print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") + print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") + raise + +import lib_gatt +import lib_gap + +srv_added_old_cnt = 0 +srv_added_new_cnt = 0 +blecent_retry_check_cnt = 0 +verify_service_cnt = 0 +verify_readchars_cnt = 0 +blecent_adv_uuid = '1811' +iface_added = False +gatt_app_obj_check = False +gatt_app_reg_check = False +adv_data_check = False +adv_reg_check = False +read_req_check = False +write_req_check = False +subscribe_req_check = False +ble_hr_chrc = False + +DISCOVERY_START = False + +TEST_CHECKS_PASS = False +ADV_STOP = False + +SERVICES_RESOLVED = False +SERVICE_UUID_FOUND = False + +BLUEZ_SERVICE_NAME = 'org.bluez' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +ADAPTER_IFACE = 'org.bluez.Adapter1' +DEVICE_IFACE = 'org.bluez.Device1' + +GATT_MANAGER_IFACE = 'org.bluez.GattManager1' +LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' + +GATT_SERVICE_IFACE = 'org.bluez.GattService1' +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' + +ADAPTER_ON = False +DEVICE_CONNECTED = False +GATT_APP_REGISTERED = False +ADV_REGISTERED = False +ADV_ACTIVE_INSTANCE = False + +CHRC_VALUE_CNT = False + +# Set up the main loop. +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +dbus.mainloop.glib.threads_init() +# Set up the event main loop. +event_loop = GLib.MainLoop() + + +def set_props_status(props): + """ + Set Adapter status if it is powered on or off + """ + global ADAPTER_ON, SERVICES_RESOLVED, GATT_OBJ_REMOVED, GATT_APP_REGISTERED, \ + ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT + is_service_uuid = False + # Signal caught for change in Adapter Powered property + if 'Powered' in props: + if props['Powered'] == 1: + ADAPTER_ON = True + else: + ADAPTER_ON = False + event_loop.quit() + elif 'ServicesResolved' in props: + if props['ServicesResolved'] == 1: + SERVICES_RESOLVED = True + else: + SERVICES_RESOLVED = False + elif 'UUIDs' in props: + # Signal caught for add/remove GATT data having service uuid + for uuid in props['UUIDs']: + if blecent_adv_uuid in uuid: + is_service_uuid = True + if not is_service_uuid: + # Signal caught for removing GATT data having service uuid + # and for unregistering GATT application + GATT_APP_REGISTERED = False + lib_gatt.GATT_APP_OBJ = False + elif 'ActiveInstances' in props: + # Signal caught for Advertising - add/remove Instances property + if props['ActiveInstances'] == 1: + ADV_ACTIVE_INSTANCE = True + elif props['ActiveInstances'] == 0: + ADV_ACTIVE_INSTANCE = False + ADV_REGISTERED = False + lib_gap.ADV_OBJ = False + elif 'Connected' in props: + # Signal caught for device connect/disconnect + if props['Connected'] is True: + DEVICE_CONNECTED = True + event_loop.quit() + else: + DEVICE_CONNECTED = False + elif 'Value' in props: + # Signal caught for change in chars value + if ble_hr_chrc: + CHRC_VALUE_CNT += 1 + print(props['Value']) + if CHRC_VALUE_CNT == 10: + event_loop.quit() + + +def props_change_handler(iface, changed_props, invalidated): + """ + PropertiesChanged Signal handler. + Catch and print information about PropertiesChanged signal. + """ + + if iface == ADAPTER_IFACE: + set_props_status(changed_props) + if iface == LE_ADVERTISING_MANAGER_IFACE: + set_props_status(changed_props) + if iface == DEVICE_IFACE: + set_props_status(changed_props) + if iface == GATT_CHRC_IFACE: + set_props_status(changed_props) + + +class BLE_Bluez_Client: + def __init__(self, iface, devname=None, devaddr=None): + self.bus = None + self.device = None + self.devname = devname + self.devaddr = devaddr + self.iface = iface + self.ble_objs = None + self.props_iface_obj = None + self.adapter_path = [] + self.adapter = None + self.services = [] + self.srv_uuid = [] + self.chars = {} + self.char_uuid = [] + + try: + self.bus = dbus.SystemBus() + om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) + self.ble_objs = om_iface_obj.GetManagedObjects() + + except Exception as e: + print(e) + + def __del__(self): + try: + print("Test Exit") + except Exception as e: + print(e) + sys.exit(1) + + def set_adapter(self): + ''' + Discover Bluetooth Adapter + Power On Bluetooth Adapter + ''' + try: + print("discovering adapter...") + for path, interfaces in self.ble_objs.items(): + adapter = interfaces.get(ADAPTER_IFACE) + if adapter is not None: + if path.endswith(self.iface): + self.adapter = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE) + # Create Properties Interface object only after adapter is found + self.props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) + self.adapter_path = [path, interfaces] + # Check adapter status - power on/off + set_props_status(interfaces[ADAPTER_IFACE]) + break + + if self.adapter is None: + raise RuntimeError("\nError: bluetooth adapter not found") + + if self.props_iface_obj is None: + raise RuntimeError("\nError: properties interface not found") + + print("bluetooth adapter discovered") + + self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) + + # Check if adapter is already powered on + if ADAPTER_ON: + print("bluetooth adapter is already on") + return True + + # Power On Adapter + print("powering on adapter...") + self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1)) + + event_loop.run() + + if ADAPTER_ON: + print("bluetooth adapter powered on") + return True + else: + print("Failure: bluetooth adapter not powered on") + return False + + except Exception as e: + print(e) + sys.exit(1) + + def connect(self): + ''' + Connect to the device discovered + Retry 10 times to discover and connect to device + ''' + global DISCOVERY_START + + device_found = False + try: + self.adapter.StartDiscovery() + print("\nStarted Discovery") + + DISCOVERY_START = True + + for retry_cnt in range(10,0,-1): + try: + if self.device is None: + print("\nConnecting to device...") + # Wait for device to be discovered + time.sleep(5) + device_found = self.get_device() + if device_found: + self.device.Connect(dbus_interface=DEVICE_IFACE) + event_loop.quit() + print("\nConnected to device") + return True + except Exception as e: + print(e) + print("\nRetries left", retry_cnt - 1) + continue + + # Device not found + return False + + except Exception as e: + print(e) + self.device = None + return False + + def get_device(self): + ''' + Discover device based on device name + and device address and connect + ''' + dev_path = None + + om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) + self.ble_objs = om_iface_obj.GetManagedObjects() + for path, interfaces in self.ble_objs.items(): + if DEVICE_IFACE not in interfaces.keys(): + continue + device_addr_iface = (path.replace('_', ':')).lower() + dev_addr = self.devaddr.lower() + if dev_addr in device_addr_iface and \ + interfaces[DEVICE_IFACE].get("Name") == self.devname: + dev_path = path + break + + if dev_path is None: + print("\nBLE device not found") + return False + + device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE) + device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) + + self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path) + return True + + def srvc_iface_added_handler(self, path, interfaces): + ''' + Add services found as lib_ble_client obj + ''' + if self.device and path.startswith(self.device.object_path): + if GATT_SERVICE_IFACE in interfaces.keys(): + service = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + uuid = service.Get(GATT_SERVICE_IFACE, 'UUID', dbus_interface=DBUS_PROP_IFACE) + if uuid not in self.srv_uuid: + self.srv_uuid.append(uuid) + if path not in self.services: + self.services.append(path) + + def verify_get_services(self): + global SERVICE_SCAN_FAIL, verify_service_cnt + verify_service_cnt += 1 + if iface_added and self.services and SERVICES_RESOLVED: + event_loop.quit() + + if verify_service_cnt == 10: + event_loop.quit() + + def verify_service_uuid_found(self): + ''' + Verify service uuid found + ''' + global SERVICE_UUID_FOUND + + srv_uuid_found = [uuid for uuid in self.srv_uuid if service_uuid in uuid] + if srv_uuid_found: + SERVICE_UUID_FOUND = True + + def get_services(self, srv_uuid=None): + ''' + Retrieve Services found in the device connected + ''' + global service_uuid, iface_added, SERVICE_UUID_FOUND + service_uuid = srv_uuid + iface_added = False + SERVICE_UUID_FOUND = False + try: + om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) + self.ble_objs = om_iface_obj.GetManagedObjects() + + for path, interfaces in self.ble_objs.items(): + self.srvc_iface_added_handler(path, interfaces) + # If services not found, then they may not have been added yet on dbus + if not self.services: + iface_added = True + GLib.timeout_add_seconds(2, self.verify_get_services) + om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler) + event_loop.run() + if service_uuid: + self.verify_service_uuid_found() + if not SERVICE_UUID_FOUND: + raise Exception("Service with uuid: %s not found !!!" % service_uuid) + return self.srv_uuid + except Exception as e: + print("Error: ", e) + return False + + def chrc_iface_added_handler(self, path, interfaces): + ''' + Add services found as lib_ble_client obj + ''' + global chrc, chrc_discovered + chrc_val = None + + if self.device and path.startswith(self.device.object_path): + if GATT_CHRC_IFACE in interfaces.keys(): + chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + chrc_props = chrc.GetAll(GATT_CHRC_IFACE, + dbus_interface=DBUS_PROP_IFACE) + chrc_flags = chrc_props['Flags'] + if 'read' in chrc_flags: + chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) + uuid = chrc_props['UUID'] + self.chars[path] = chrc_val, chrc_flags, uuid + + def verify_get_chars(self): + global verify_readchars_cnt + verify_readchars_cnt += 1 + if iface_added and self.chars: + event_loop.quit() + if verify_readchars_cnt == 10: + event_loop.quit() + + def read_chars(self): + ''' + Read characteristics found in the device connected + ''' + global iface_added, chrc_discovered + chrc_discovered = False + iface_added = False + + try: + om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) + self.ble_objs = om_iface_obj.GetManagedObjects() + for path, interfaces in self.ble_objs.items(): + self.chrc_iface_added_handler(path, interfaces) + + # If chars not found, then they have not been added yet to interface + if not self.chars: + iface_added = True + GLib.timeout_add_seconds(2, self.verify_get_chars) + om_iface_obj.connect_to_signal('InterfacesAdded', self.chars_iface_added_handler) + event_loop.run() + return self.chars + except Exception as e: + print("Error: ", e) + return False + + def write_chars(self, write_val): + ''' + Write characteristics to the device connected + ''' + chrc = None + chrc_val = None + char_write_props = False + + try: + for path, props in self.chars.items(): + if 'write' in props[1]: # check permission + char_write_props = True + chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + chrc.WriteValue(write_val,{},dbus_interface=GATT_CHRC_IFACE) + if 'read' in props[1]: + chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) + else: + print("Warning: Cannot read value. Characteristic does not have read permission.") + if not (ord(write_val) == int(chrc_val[0])): + print("\nWrite Failed") + return False + self.chars[path] = chrc_val, props[1], props[2] # update value + if not char_write_props: + print("Failure: Cannot perform write operation. Characteristic does not have write permission.") + return False + + return self.chars + except Exception as e: + print(e) + return False + + def hr_update_simulation(self, hr_srv_uuid, hr_char_uuid): + ''' + Start Notifications + Retrieve updated value + Stop Notifications + ''' + global ble_hr_chrc + + srv_path = None + chrc = None + uuid = None + chrc_path = None + chars_ret = None + ble_hr_chrc = True + + try: + # Get HR Measurement characteristic + services = list(zip_longest(self.srv_uuid, self.services)) + for uuid, path in services: + if hr_srv_uuid in uuid: + srv_path = path + break + + if srv_path is None: + print("Failure: HR UUID:", hr_srv_uuid, "not found") + return False + + chars_ret = self.read_chars() + + for path, props in chars_ret.items(): + if path.startswith(srv_path): + chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + chrc_path = path + if hr_char_uuid in props[2]: # uuid + break + if chrc is None: + print("Failure: Characteristics for service: ", srv_path, "not found") + return False + # Subscribe to notifications + print("\nSubscribe to notifications: On") + chrc.StartNotify(dbus_interface=GATT_CHRC_IFACE) + + chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE) + chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) + + event_loop.run() + chrc.StopNotify(dbus_interface=GATT_CHRC_IFACE) + time.sleep(2) + print("\nSubscribe to notifications: Off") + + ble_hr_chrc = False + return True + + except Exception as e: + print(e) + return False + + def create_gatt_app(self): + ''' + Create GATT data + Register GATT Application + ''' + global gatt_app_obj, gatt_manager_iface_obj + + gatt_app_obj = None + gatt_manager_iface_obj = None + + try: + gatt_app_obj = lib_gatt.Application(self.bus, self.adapter_path[0]) + gatt_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME,self.adapter_path[0]), GATT_MANAGER_IFACE) + + gatt_manager_iface_obj.RegisterApplication(gatt_app_obj, {}, + reply_handler=self.gatt_app_handler, + error_handler=self.gatt_app_error_handler) + return True + except Exception as e: + print(e) + return False + + def gatt_app_handler(self): + ''' + GATT Application Register success handler + ''' + global GATT_APP_REGISTERED + GATT_APP_REGISTERED = True + + def gatt_app_error_handler(self, error): + ''' + GATT Application Register error handler + ''' + global GATT_APP_REGISTERED + GATT_APP_REGISTERED = False + + def start_advertising(self, adv_host_name, adv_iface_index, adv_type, adv_uuid): + ''' + Create Advertising data + Register Advertisement + Start Advertising + ''' + global le_adv_obj, le_adv_manager_iface_obj + le_adv_obj = None + le_adv_manager_iface_obj = None + le_adv_iface_path = None + + try: + print("Advertising started") + gatt_app_ret = self.create_gatt_app() + + if not gatt_app_ret: + return False + + for path,interface in self.ble_objs.items(): + if LE_ADVERTISING_MANAGER_IFACE in interface: + le_adv_iface_path = path + + if le_adv_iface_path is None: + print('\n Cannot start advertising. LEAdvertisingManager1 Interface not found') + return False + + le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name) + le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE) + + le_adv_manager_iface_obj.RegisterAdvertisement(le_adv_obj.get_path(), {}, + reply_handler=self.adv_handler, + error_handler=self.adv_error_handler) + + GLib.timeout_add_seconds(2, self.verify_blecent) + event_loop.run() + + if TEST_CHECKS_PASS: + return True + else: + return False + + except Exception as e: + print("in Exception") + print(e) + return False + + def adv_handler(self): + ''' + Advertisement Register success handler + ''' + global ADV_REGISTERED + ADV_REGISTERED = True + + def adv_error_handler(self, error): + ''' + Advertisement Register error handler + ''' + global ADV_REGISTERED + ADV_REGISTERED = False + + def verify_blecent(self): + """ + Verify blecent test commands are successful + """ + global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\ + adv_data_check, adv_reg_check, read_req_check, write_req_check,\ + subscribe_req_check, TEST_CHECKS_PASS + + # Get device when connected + if not self.device: + om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) + self.ble_objs = om_iface_obj.GetManagedObjects() + + for path, interfaces in self.ble_objs.items(): + if DEVICE_IFACE not in interfaces.keys(): + continue + device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) + device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) + self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + + # Check for failures after 10 retries + if blecent_retry_check_cnt == 10: + # check for failures + if not gatt_app_obj_check: + print("Failure: GATT Data could not be created") + if not gatt_app_reg_check: + print("Failure: GATT Application could not be registered") + if not adv_data_check: + print("Failure: Advertising data could not be created") + if not adv_reg_check: + print("Failure: Advertisement could not be registered") + if not read_req_check: + print("Failure: Read Request not received") + if not write_req_check: + print("Failure: Write Request not received") + if not subscribe_req_check: + print("Failure: Subscribe Request not received") + + # Blecent Test failed + TEST_CHECKS_PASS = False + if subscribe_req_check: + lib_gatt.alert_status_char_obj.StopNotify() + event_loop.quit() + return False # polling for checks will stop + + # Check for success + if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ: + print("GATT Data created") + gatt_app_obj_check = True + if not gatt_app_reg_check and GATT_APP_REGISTERED: + print("GATT Application registered") + gatt_app_reg_check = True + if not adv_data_check and lib_gap.ADV_OBJ: + print("Advertising data created") + adv_data_check = True + if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE: + print("Advertisement registered") + adv_reg_check = True + if not read_req_check and lib_gatt.CHAR_READ: + read_req_check = True + if not write_req_check and lib_gatt.CHAR_WRITE: + write_req_check = True + if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE: + subscribe_req_check = True + + # Increment retry count + blecent_retry_check_cnt += 1 + if read_req_check and write_req_check and subscribe_req_check: + # all checks passed + # Blecent Test passed + TEST_CHECKS_PASS = True + lib_gatt.alert_status_char_obj.StopNotify() + event_loop.quit() + return False # polling for checks will stop + + # Default return True - polling for checks will continue + return True + + def verify_blecent_disconnect(self): + """ + Verify cleanup is successful + """ + global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\ + adv_data_check, adv_reg_check, ADV_STOP + + if blecent_retry_check_cnt == 0: + gatt_app_obj_check = False + gatt_app_reg_check = False + adv_data_check = False + adv_reg_check = False + + # Check for failures after 10 retries + if blecent_retry_check_cnt == 10: + # check for failures + if not gatt_app_obj_check: + print("Warning: GATT Data could not be removed") + if not gatt_app_reg_check: + print("Warning: GATT Application could not be unregistered") + if not adv_data_check: + print("Warning: Advertising data could not be removed") + if not adv_reg_check: + print("Warning: Advertisement could not be unregistered") + + # Blecent Test failed + ADV_STOP = False + event_loop.quit() + return False # polling for checks will stop + + # Check for success + if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ: + print("GATT Data removed") + gatt_app_obj_check = True + if not gatt_app_reg_check and not GATT_APP_REGISTERED: + print("GATT Application unregistered") + gatt_app_reg_check = True + if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ): + print("Advertising data removed") + print("Advertisement unregistered") + adv_data_check = True + adv_reg_check = True + + # Increment retry count + blecent_retry_check_cnt += 1 + if adv_reg_check: + # all checks passed + ADV_STOP = True + event_loop.quit() + return False # polling for checks will stop + + # Default return True - polling for checks will continue + return True + + def disconnect(self): + ''' + Before application exits: + Advertisement is unregistered + Advertisement object created is removed from dbus + GATT Application is unregistered + GATT Application object created is removed from dbus + Disconnect device if connected + Adapter is powered off + ''' + try: + global blecent_retry_check_cnt, DISCOVERY_START + blecent_retry_check_cnt = 0 + + print("\nexiting from test...") + + if ADV_REGISTERED: + # Unregister Advertisement + le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path()) + + # Remove Advertising data + dbus.service.Object.remove_from_connection(le_adv_obj) + + if GATT_APP_REGISTERED: + # Unregister GATT Application + gatt_manager_iface_obj.UnregisterApplication(gatt_app_obj.get_path()) + + # Remove GATT data + dbus.service.Object.remove_from_connection(gatt_app_obj) + + GLib.timeout_add_seconds(2, self.verify_blecent_disconnect) + event_loop.run() + + if ADV_STOP: + print("Stop Advertising status: ", ADV_STOP) + else: + print("Warning: Stop Advertising status: ", ADV_STOP) + + # Disconnect device + if self.device: + print("disconnecting device...") + self.device.Disconnect(dbus_interface=DEVICE_IFACE) + if self.adapter: + self.adapter.RemoveDevice(self.device) + self.device = None + if DISCOVERY_START: + self.adapter.StopDiscovery() + DISCOVERY_START = False + + # Power off Adapter + self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(0)) + event_loop.run() + + if not DEVICE_CONNECTED: + print("device disconnected") + else: + print("Warning: device could not be disconnected") + + print("powering off adapter...") + if not ADAPTER_ON: + print("bluetooth adapter powered off") + else: + print("\nWarning: Bluetooth adapter not powered off") + + except Exception as e: + print(e) + return False diff --git a/tools/ble/lib_gap.py b/tools/ble/lib_gap.py new file mode 100644 index 000000000..2033dc4ab --- /dev/null +++ b/tools/ble/lib_gap.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python +# +# Copyright 2019 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Register Advertisement + +from __future__ import print_function +import sys + +try: + import dbus + import dbus.service +except ImportError as e: + if 'linux' not in sys.platform: + sys.exit("Error: Only supported on Linux platform") + print(e) + print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") + print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") + raise + +ADV_OBJ = False + +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' +LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' + + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + + +class Advertisement(dbus.service.Object): + PATH_BASE = '/org/bluez/hci0/advertisement' + + def __init__(self, bus, index, advertising_type, uuid, name): + self.path = self.PATH_BASE + str(index) + self.bus = bus + self.ad_type = advertising_type + self.service_uuids = [uuid] + self.local_name = dbus.String(name) + dbus.service.Object.__init__(self, bus, self.path) + + def __del__(self): + pass + + def get_properties(self): + properties = dict() + properties['Type'] = self.ad_type + + if self.service_uuids is not None: + properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, + signature='s') + if self.local_name is not None: + properties['LocalName'] = dbus.String(self.local_name) + + return {LE_ADVERTISEMENT_IFACE: properties} + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + global ADV_OBJ + if interface != LE_ADVERTISEMENT_IFACE: + raise InvalidArgsException() + ADV_OBJ = True + return self.get_properties()[LE_ADVERTISEMENT_IFACE] + + @dbus.service.method(LE_ADVERTISEMENT_IFACE, + in_signature='', + out_signature='') + def Release(self): + pass diff --git a/tools/ble/lib_gatt.py b/tools/ble/lib_gatt.py new file mode 100644 index 000000000..c666d8e7e --- /dev/null +++ b/tools/ble/lib_gatt.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python +# +# Copyright 2019 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Creating GATT Application which then becomes available to remote devices. + +from __future__ import print_function +import sys + +try: + import dbus + import dbus.service +except ImportError as e: + if 'linux' not in sys.platform: + sys.exit("Error: Only supported on Linux platform") + print(e) + print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") + print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") + raise + +alert_status_char_obj = None + +GATT_APP_OBJ = False +CHAR_READ = False +CHAR_WRITE = False +CHAR_SUBSCRIBE = False + +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' +GATT_MANAGER_IFACE = 'org.bluez.GattManager1' +GATT_SERVICE_IFACE = 'org.bluez.GattService1' +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' +GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' + + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotSupported' + + +class Application(dbus.service.Object): + """ + org.bluez.GattApplication1 interface implementation + """ + + def __init__(self, bus, path): + self.path = path + self.services = [] + srv_obj = AlertNotificationService(bus, '0001') + self.add_service(srv_obj) + dbus.service.Object.__init__(self, bus, self.path) + + def __del__(self): + pass + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_service(self, service): + self.services.append(service) + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + def GetManagedObjects(self): + global GATT_APP_OBJ + response = {} + + for service in self.services: + response[service.get_path()] = service.get_properties() + chrcs = service.get_characteristics() + for chrc in chrcs: + response[chrc.get_path()] = chrc.get_properties() + descs = chrc.get_descriptors() + for desc in descs: + response[desc.get_path()] = desc.get_properties() + + GATT_APP_OBJ = True + return response + + def Release(self): + pass + + +class Service(dbus.service.Object): + """ + org.bluez.GattService1 interface implementation + """ + PATH_BASE = '/org/bluez/hci0/service' + + def __init__(self, bus, index, uuid, primary=False): + self.path = self.PATH_BASE + str(index) + self.bus = bus + self.uuid = uuid + self.primary = primary + self.characteristics = [] + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_SERVICE_IFACE: { + 'UUID': self.uuid, + 'Primary': self.primary, + 'Characteristics': dbus.Array( + self.get_characteristic_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_characteristic(self, characteristic): + self.characteristics.append(characteristic) + + def get_characteristic_paths(self): + result = [] + for chrc in self.characteristics: + result.append(chrc.get_path()) + return result + + def get_characteristics(self): + return self.characteristics + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_SERVICE_IFACE: + raise InvalidArgsException() + return self.get_properties()[GATT_SERVICE_IFACE] + + +class Characteristic(dbus.service.Object): + """ + org.bluez.GattCharacteristic1 interface implementation + """ + def __init__(self, bus, index, uuid, flags, service): + self.path = service.path + '/char' + str(index) + self.bus = bus + self.uuid = uuid + self.service = service + self.flags = flags + self.value = [0] + self.descriptors = [] + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_CHRC_IFACE: { + 'Service': self.service.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + 'Value': self.value, + 'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o') + + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_descriptor(self, descriptor): + self.descriptors.append(descriptor) + + def get_descriptor_paths(self): + result = [] + for desc in self.descriptors: + result.append(desc.get_path()) + return result + + def get_descriptors(self): + return self.descriptors + + @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_CHRC_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay') + def ReadValue(self, options): + print('\nDefault ReadValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}') + def WriteValue(self, value, options): + print('\nDefault WriteValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StartNotify(self): + print('Default StartNotify called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StopNotify(self): + print('Default StopNotify called, returning error') + raise NotSupportedException() + + @dbus.service.signal(DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + print("\nProperties Changed") + + +class Descriptor(dbus.service.Object): + """ + org.bluez.GattDescriptor1 interface implementation + """ + def __init__(self, bus, index, uuid, flags, characteristic): + self.path = characteristic.path + '/desc' + str(index) + self.bus = bus + self.uuid = uuid + self.flags = flags + self.chrc = characteristic + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_DESC_IFACE: { + 'Characteristic': self.chrc.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_DESC_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_DESC_IFACE] + + @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay') + def ReadValue(self, options): + print('Default ReadValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}') + def WriteValue(self, value, options): + print('Default WriteValue called, returning error') + raise NotSupportedException() + + +class AlertNotificationService(Service): + TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index): + global alert_status_char_obj + Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True) + self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self)) + self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self)) + alert_status_char_obj = UnreadAlertStatusCharacteristic(bus, '0003', self) + self.add_characteristic(alert_status_char_obj) + + +class SupportedNewAlertCategoryCharacteristic(Characteristic): + SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.SUPPORT_NEW_ALERT_UUID, + ['read'], + service) + + self.value = [dbus.Byte(2)] + + def ReadValue(self, options): + global CHAR_READ + CHAR_READ = True + val_list = [] + for val in self.value: + val_list.append(dbus.Byte(val)) + print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic") + print("\tValue:", "\t", val_list) + return val_list + + +class AlertNotificationControlPointCharacteristic(Characteristic): + ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.ALERT_NOTIF_UUID, + ['read','write'], + service) + + self.value = [dbus.Byte(0)] + + def ReadValue(self, options): + val_list = [] + for val in self.value: + val_list.append(dbus.Byte(val)) + print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic") + print("\tValue:", "\t", val_list) + return val_list + + def WriteValue(self, value, options): + global CHAR_WRITE + CHAR_WRITE = True + print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic") + print("\tCurrent value:", "\t", self.value) + val_list = [] + for val in value: + val_list.append(val) + self.value = val_list + # Check if new value is written + print("\tNew value:", "\t", self.value) + if not self.value == value: + print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) + + +class UnreadAlertStatusCharacteristic(Characteristic): + UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.UNREAD_ALERT_STATUS_UUID, + ['read', 'write', 'notify'], + service) + self.value = [dbus.Byte(0)] + self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self) + self.add_descriptor(self.cccd_obj) + self.notifying = False + + def StartNotify(self): + global CHAR_SUBSCRIBE + CHAR_SUBSCRIBE = True + + if self.notifying: + print('\nAlready notifying, nothing to do') + return + self.notifying = True + print("\nNotify Started") + self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)]) + self.cccd_obj.ReadValue() + + def StopNotify(self): + if not self.notifying: + print('\nNot notifying, nothing to do') + return + self.notifying = False + print("\nNotify Stopped") + + def ReadValue(self, options): + print("Read Request received\n", "\tUnreadAlertStatusCharacteristic") + val_list = [] + for val in self.value: + val_list.append(dbus.Byte(val)) + print("\tValue:", "\t", val_list) + return val_list + + def WriteValue(self, value, options): + print("Write Request received\n", "\tUnreadAlertStatusCharacteristic") + val_list = [] + for val in value: + val_list.append(val) + self.value = val_list + # Check if new value is written + print("\tNew value:", "\t", self.value) + if not self.value == value: + print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) + + +class ClientCharacteristicConfigurationDescriptor(Descriptor): + CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, characteristic): + self.value = [dbus.Byte(0)] + Descriptor.__init__( + self, bus, index, + self.CCCD_UUID, + ['read', 'write'], + characteristic) + + def ReadValue(self): + print("\tValue on read:", "\t", self.value) + return self.value + + def WriteValue(self, value): + val_list = [] + for val in value: + val_list.append(val) + self.value = val_list + # Check if new value is written + print("New value on write:", "\t", self.value) + if not self.value == value: + print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) diff --git a/tools/ble/requirements.txt b/tools/ble/requirements.txt new file mode 100644 index 000000000..8049fd2f9 --- /dev/null +++ b/tools/ble/requirements.txt @@ -0,0 +1,3 @@ +future +dbus-python +pygobject