python - Connecting to Bluetooth Low Energy devices with bleak

Published 2023-10-16 16:18:24 Author: wstong

1. Capturing the Bluetooth log

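On the phone, dial ##5959## to turn on Bluetooth debugging.
Then operate the Bluetooth product through its companion phone app, so that the traffic between the app and the device ends up in the log.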

2. Analyzing the Bluetooth log

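Copy the Bluetooth log file (btsnoop_hci.log) from the phone to a computer and open it in Wireshark. The main things to look at are the data the app sends to the device and the data it receives back. The device analyzed here is a body weight scale.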
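If Wireshark is not at hand, the log can also be walked with a few lines of Python. The following is a minimal sketch, assuming the file follows the usual btsnoop layout (a 16-byte file header followed by records with a 24-byte big-endian header each); `iter_btsnoop_records` and `Record` are names made up for this illustration, and the sketch only splits the file into raw packets without decoding the HCI/ATT layers.

```python
import struct
from typing import Iterator, NamedTuple

BTSNOOP_MAGIC = b"btsnoop\x00"


class Record(NamedTuple):
    received: bool      # True: controller -> host, False: host -> controller
    timestamp_us: int   # timestamp in microseconds, as stored in the file
    payload: bytes      # raw packet bytes


def iter_btsnoop_records(raw: bytes) -> Iterator[Record]:
    """Yield the packet records stored in a btsnoop capture."""
    if raw[:8] != BTSNOOP_MAGIC:
        raise ValueError("not a btsnoop file")
    # File header: 8-byte magic, 4-byte version, 4-byte datalink type
    offset = 16
    # Record header: original length, included length, flags,
    # cumulative drops (4 bytes each) and a 64-bit timestamp, big-endian
    header = struct.Struct(">IIIIq")
    while offset + header.size <= len(raw):
        _orig_len, incl_len, flags, _drops, ts = header.unpack_from(raw, offset)
        offset += header.size
        yield Record(bool(flags & 1), ts, raw[offset:offset + incl_len])
        offset += incl_len
```

To use it, read btsnoop_hci.log in binary mode and pass the bytes to `iter_btsnoop_records`; the `received` flag separates what the phone sent from what the device returned.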

3. Finding the device

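import asyncio
from bleak import BleakScanner


async def main():
    # bleak.discover() and BLEDevice.rssi are deprecated in newer bleak releases.
    # BleakScanner.discover(return_adv=True) returns the advertisement data
    # alongside each device, and the advertisement data carries the RSSI.
    devices = await BleakScanner.discover(timeout=5.0, return_adv=True)
    for device, adv in devices.values():
        print(device)
        print(f"RSSI: {adv.rssi}")


if __name__ == "__main__":
    asyncio.run(main())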

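A scan usually lists many nearby devices, so it helps to narrow the result down. The sketch below assumes the scan output has been reduced to plain (name, address, rssi) tuples; `pick_device` is a hypothetical helper, and the RSSI values and the second entry in the example are invented for illustration. Only the name QN-Scale and the address D8:0B:CB:50:1A:C7 come from this article.

```python
from typing import Iterable, Optional, Tuple

# (name, address, rssi); the advertised name may be missing
ScanEntry = Tuple[Optional[str], str, int]


def pick_device(entries: Iterable[ScanEntry], name_prefix: str) -> Optional[ScanEntry]:
    """Return the entry whose name matches and whose signal is strongest."""
    matches = [e for e in entries if e[0] and e[0].startswith(name_prefix)]
    # RSSI is in negative dBm; values closer to zero mean a stronger signal
    return max(matches, key=lambda e: e[2], default=None)


# Illustrative scan result (RSSI values and the second entry are made up)
scan = [
    ("QN-Scale", "D8:0B:CB:50:1A:C7", -62),
    (None, "11:22:33:44:55:66", -40),
]
print(pick_device(scan, "QN-"))
```

Filtering on the name first and only then comparing RSSI avoids picking a strong but unrelated device.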

4. Inspecting the UUIDs

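Use the address found by the scan, here D8:0B:CB:50:1A:C7, to list the device's services and characteristic UUIDs. The script below is the Service Explorer example by hbldh; pass the address with --address.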

"""
Service Explorer
----------------

An example showing how to access and print out the services, characteristics and
descriptors of a connected GATT server.

Created on 2019-03-25 by hbldh <henrik.blidh@nedomkull.com>

"""

import argparse
import asyncio
import logging

from bleak import BleakClient, BleakScanner

logger = logging.getLogger(__name__)


async def main(args: argparse.Namespace):
    logger.info("starting scan...")

    if args.address:
        device = await BleakScanner.find_device_by_address(
            args.address, cb=dict(use_bdaddr=args.macos_use_bdaddr)
        )
        if device is None:
            logger.error("could not find device with address '%s'", args.address)
            return
    else:
        device = await BleakScanner.find_device_by_name(
            args.name, cb=dict(use_bdaddr=args.macos_use_bdaddr)
        )
        if device is None:
            logger.error("could not find device with name '%s'", args.name)
            return

    logger.info("connecting to device...")

    async with BleakClient(
        device,
        services=args.services,
    ) as client:
        logger.info("connected")

        for service in client.services:
            logger.info("[Service] %s", service)

            for char in service.characteristics:
                if "read" in char.properties:
                    try:
                        value = await client.read_gatt_char(char.uuid)
                        logger.info(
                            "  [Characteristic] %s (%s), Value: %r",
                            char,
                            ",".join(char.properties),
                            value,
                        )
                    except Exception as e:
                        logger.error(
                            "  [Characteristic] %s (%s), Error: %s",
                            char,
                            ",".join(char.properties),
                            e,
                        )

                else:
                    logger.info(
                        "  [Characteristic] %s (%s)", char, ",".join(char.properties)
                    )

                for descriptor in char.descriptors:
                    try:
                        value = await client.read_gatt_descriptor(descriptor.handle)
                        logger.info("    [Descriptor] %s, Value: %r", descriptor, value)
                    except Exception as e:
                        logger.error("    [Descriptor] %s, Error: %s", descriptor, e)

        logger.info("disconnecting...")

    logger.info("disconnected")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    device_group = parser.add_mutually_exclusive_group(required=True)

    device_group.add_argument(
        "--name",
        metavar="<name>",
        help="the name of the bluetooth device to connect to",
    )
    device_group.add_argument(
        "--address",
        metavar="<address>",
        help="the address of the bluetooth device to connect to",
    )

    parser.add_argument(
        "--macos-use-bdaddr",
        action="store_true",
        help="when true use Bluetooth address instead of UUID on macOS",
    )

    parser.add_argument(
        "--services",
        nargs="+",
        metavar="<uuid>",
        help="if provided, only enumerate matching service(s)",
    )

    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="sets the log level to debug",
    )

    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
    )

    asyncio.run(main(args))

The output is shown below:

2023-10-16 16:05:03,386 __main__ INFO: starting scan...
2023-10-16 16:05:04,092 __main__ INFO: connecting to device...
2023-10-16 16:05:06,654 __main__ INFO: connected
2023-10-16 16:05:06,654 __main__ INFO: [Service] 00001800-0000-1000-8000-00805f9b34fb (Handle: 1): Generic Access Profile
2023-10-16 16:05:06,747 __main__ INFO:   [Characteristic] 00002a00-0000-1000-8000-00805f9b34fb (Handle: 2): Device Name (read), Value: bytearray(b'QN-Scale\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
2023-10-16 16:05:06,747 __main__ INFO: [Service] 0000180f-0000-1000-8000-00805f9b34fb (Handle: 4): Battery Service
2023-10-16 16:05:06,857 __main__ INFO:   [Characteristic] 00002a19-0000-1000-8000-00805f9b34fb (Handle: 5): Battery Level (read), Value: bytearray(b'?')
2023-10-16 16:05:06,857 __main__ INFO: [Service] 0000180a-0000-1000-8000-00805f9b34fb (Handle: 7): Device Information
2023-10-16 16:05:06,950 __main__ INFO:   [Characteristic] 00002a29-0000-1000-8000-00805f9b34fb (Handle: 8): Manufacturer Name String (read), Value: bytearray(b'Qing Niu Technology\x00')
2023-10-16 16:05:07,044 __main__ INFO:   [Characteristic] 00002a26-0000-1000-8000-00805f9b34fb (Handle: 10): Firmware Revision String (read), Value: bytearray(b'V07.0\x00')
2023-10-16 16:05:07,154 __main__ INFO:   [Characteristic] 00002a23-0000-1000-8000-00805f9b34fb (Handle: 12): System ID (read), Value: bytearray(b'\xc7\x1aP\xcb\x0b\xd8')
2023-10-16 16:05:07,247 __main__ INFO:   [Characteristic] 00002a25-0000-1000-8000-00805f9b34fb (Handle: 14): Serial Number String (read), Value: bytearray(b'NONE\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff')
2023-10-16 16:05:07,247 __main__ INFO: [Service] 00010203-0405-0607-0809-0a0b0c0d1912 (Handle: 16): Unknown
2023-10-16 16:05:07,357 __main__ INFO:   [Characteristic] 00010203-0405-0607-0809-0a0b0c0d2b12 (Handle: 17): OTA (read,write-without-response), Value: bytearray(b'\x00')
2023-10-16 16:05:07,497 __main__ INFO:     [Descriptor] 00002901-0000-1000-8000-00805f9b34fb (Handle: 19): Characteristic User Description, Value: bytearray(b'OTA')
2023-10-16 16:05:07,497 __main__ INFO: [Service] 0000fff0-0000-1000-8000-00805f9b34fb (Handle: 20): Vendor specific
2023-10-16 16:05:07,497 __main__ INFO:   [Characteristic] 0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific (notify)
2023-10-16 16:05:07,607 __main__ INFO:     [Descriptor] 00002902-0000-1000-8000-00805f9b34fb (Handle: 23): Client Characteristic Configuration, Value: bytearray(b'\x00\x00')
2023-10-16 16:05:07,607 __main__ INFO:   [Characteristic] 0000fff2-0000-1000-8000-00805f9b34fb (Handle: 24): Vendor specific (write-without-response,write)
2023-10-16 16:05:07,607 __main__ INFO: disconnecting...
2023-10-16 16:05:10,812 __main__ INFO: disconnected
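The values in this output are raw byte arrays. As a rough sketch, the readable ones can be decoded as follows. The helper names are hypothetical. The padding handling and the reversed byte order of the System ID are observations from this particular device's output, not general rules (the standard System ID characteristic is normally 8 bytes long).

```python
def decode_string(value: bytes) -> str:
    """Decode a string characteristic, dropping trailing NUL / 0xFF padding."""
    return value.rstrip(b"\x00\xff").decode("utf-8", errors="replace")


def decode_battery_level(value: bytes) -> int:
    """Battery Level (0x2A19) is a single byte holding a percentage."""
    return value[0]


def system_id_to_address(value: bytes) -> str:
    """Format the bytes in reverse order as a colon-separated address."""
    return ":".join(f"{b:02X}" for b in reversed(value))


# Values copied from the output above
print(decode_string(b"QN-Scale\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"))
print(decode_battery_level(b"?"))
print(system_id_to_address(b"\xc7\x1aP\xcb\x0b\xd8"))
```

For this scale the System ID bytes, read backwards, match the device address used in section 4.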

5. Main code

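In general, the UUIDs that matter are the one with the notify property and the matching one used for writing, read together with what the Bluetooth log shows. This scale, for example, only starts sending data after it has received a command.

config.py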

device_address = "d8:0b:cb:50:1a:c7"
device_name = "QN-Scale"
notify_uuid = "0000fff1-0000-1000-8000-00805f9b34fb"
write_uuid = "0000fff2-0000-1000-8000-00805f9b34fb"
open_data1 = [0x13, 0x09, 0x21, 0x01, 0x10, 0xAF, 0x1D, 0x01, 0x1B]
get_data1 = [0x14, 0x0B, 0x21, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x41]
open_data2 = [0x20, 0x08, 0x21, 0xD9, 0x04, 0x8E, 0x2C, 0xE0]
open_data3 = [0x22, 0x04, 0x21, 0x47]
close_data = [0x1F, 0x05, 0x21, 0x10, 0x55]
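Looking at the command frames above, the second byte equals the total frame length and the last byte equals the sum of all preceding bytes modulo 256. This is only an observation from the bytes listed in this article, not a documented protocol, and the meaning of the remaining bytes (such as the recurring 0x21) is unknown. Under that assumption, a frame can be checked or rebuilt with a sketch like the following; `checksum`, `is_valid_frame` and `build_frame` are hypothetical helpers.

```python
def checksum(body: bytes) -> int:
    """Sum of all bytes, truncated to 8 bits."""
    return sum(body) & 0xFF


def is_valid_frame(frame: bytes) -> bool:
    """True if the last byte is the checksum of everything before it."""
    return len(frame) >= 2 and checksum(frame[:-1]) == frame[-1]


def build_frame(opcode: int, payload: bytes) -> bytes:
    """Assemble opcode, length byte, payload and trailing checksum."""
    length = len(payload) + 3  # opcode + length byte + payload + checksum
    body = bytes([opcode, length]) + payload
    return body + bytes([checksum(body)])


# Frames copied from config.py
open_data1 = bytes([0x13, 0x09, 0x21, 0x01, 0x10, 0xAF, 0x1D, 0x01, 0x1B])
close_data = bytes([0x1F, 0x05, 0x21, 0x10, 0x55])

print(is_valid_frame(open_data1))
print(build_frame(0x1F, bytes([0x21, 0x10])) == close_data)
```

A check like this is a quick way to catch typos when copying command bytes out of Wireshark.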
The main script, which imports config.py:

# -*- coding: utf-8 -*-
import asyncio
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import config

end_flag = False
weight = 0


def print_hex(data):
    # Format every byte as hex, e.g. b"\x10\x0b" -> "0x10 0xb"
    return " ".join(hex(b) for b in data)


async def main():
    print("starting scan...")

    # Look up the device by its advertised name
    device = await BleakScanner.find_device_by_name(config.device_name)
    if device is None:
        print(f"could not find device with name '{config.device_name}'")
        return

    print("connecting to device...")

    async with BleakClient(device) as client:
        print("connected")

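        def notification_handler(sender: BleakGATTCharacteristic, data: bytearray):
            global end_flag
            global weight
            print(f"{sender}: {print_hex(data)}")
            # A 12-byte frame with data[5] == 2 is treated as the final reading;
            # data[3] and data[4] hold the weight as a big-endian 16-bit value
            if len(data) == 12 and data[5] == 2:
                weight = (data[3] << 8) | data[4]
                end_flag = True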

        await client.start_notify(config.notify_uuid, notification_handler)
        await client.write_gatt_char(config.write_uuid, bytes(config.open_data1))
        await client.write_gatt_char(config.write_uuid, bytes(config.open_data2))
        await client.write_gatt_char(config.write_uuid, bytes(config.open_data3))

        while not end_flag:
            await asyncio.sleep(1.0)  # check once per second for the final reading

        await client.write_gatt_char(config.write_uuid, bytes(config.close_data))
        await client.stop_notify(config.notify_uuid)
        print(f"Received weight: {weight/100}")


if __name__ == "__main__":
    asyncio.run(main())
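The main script polls a global flag once per second and would wait forever if the final frame never arrived. One possible alternative, sketched here rather than taken from the original code, is to let the callback set an asyncio.Event and to wait on it with a timeout. `WeightWaiter` and `demo` are hypothetical names, and the demo feeds in a frame by hand instead of talking to a real device; the sketch also assumes bleak invokes the callback on the event loop thread.

```python
import asyncio
from typing import Optional


class WeightWaiter:
    """Collects the final reading from a notification callback."""

    def __init__(self) -> None:
        self.weight: Optional[int] = None
        self._done = asyncio.Event()

    def handle(self, sender, data) -> None:
        # Same condition and decoding as the handler in the main script
        if len(data) == 12 and data[5] == 2:
            self.weight = (data[3] << 8) | data[4]
            self._done.set()

    async def wait(self, timeout: float) -> int:
        # Raises asyncio.TimeoutError if no final frame arrives in time
        await asyncio.wait_for(self._done.wait(), timeout)
        return self.weight


async def demo() -> int:
    waiter = WeightWaiter()
    final = bytes([0x10, 0x0B, 0x21, 0x16, 0x0D, 0x02, 0, 0, 0, 0, 0, 0x61])
    # Stand-in for the BLE stack delivering a notification shortly afterwards
    asyncio.get_running_loop().call_later(0.01, waiter.handle, None, final)
    return await waiter.wait(timeout=1.0)


if __name__ == "__main__":
    print(asyncio.run(demo()) / 100)
```

With a real device, `waiter.handle` would be passed to `client.start_notify` in place of `notification_handler`, and `await waiter.wait(...)` would replace the `while not end_flag` loop.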

Running the main script produces the following output:

starting scan...
connecting to device...
connected
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x12 0x11 0x21 0xc7 0x1a 0x50 0xcb 0xb 0xd8 0xc 0x33 0x7 0x40 0x3 0x2 0x7 0xb5
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x14 0xfa 0x0 0x0 0x0 0x0 0x0 0x0 0x4b
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x23 0x13 0x21 0x3 0x1 0xd9 0x4 0x8e 0x2c 0x3 0x11 0x0 0x0 0x0 0x0 0x0 0x0 0x0 0x6
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x23 0x13 0x21 0x3 0x1 0xd9 0x4 0x8e 0x2c 0x2 0xfd 0x0 0x0 0x0 0x0 0x0 0x0 0x0 0xf1
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x23 0x13 0x21 0x3 0x2 0xd9 0x4 0x8e 0x2c 0x4 0x29 0x0 0x0 0x0 0x0 0x0 0x0 0x0 0x20
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x23 0x13 0x21 0x3 0x2 0xd9 0x4 0x8e 0x2c 0x4 0x29 0x0 0x0 0x0 0x0 0x0 0x0 0x0 0x20
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x1c 0x0 0x0 0x0 0x0 0x0 0x0 0x6f
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x17 0x0 0x0 0x0 0x0 0x0 0x0 0x6a
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x17 0x0 0x0 0x0 0x0 0x0 0x0 0x6a
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x12 0x0 0x0 0x0 0x0 0x0 0x0 0x65
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x12 0x0 0x0 0x0 0x0 0x0 0x0 0x65
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x12 0x0 0x0 0x0 0x0 0x0 0x0 0x65
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0x12 0x0 0x0 0x0 0x0 0x0 0x0 0x65
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xc 0x21 0x16 0xd 0x0 0x0 0x0 0x0 0x0 0x0 0x60
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x0 0x0 0x0 0x0 0x70
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x11 0x0 0x87 0x0 0x0 0x0 0xf7
0000fff1-0000-1000-8000-00805f9b34fb (Handle: 21): Vendor specific: 0x10 0xb 0x21 0x16 0xd 0x2 0x0 0x0 0x0 0x0 0x0 0x61
Received weight: 56.45
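To make the arithmetic behind the result explicit, the sketch below decodes a few of the 12-byte frames from the output above. It assumes that the intermediate frames carry the weight in the same two bytes as the final one; the main script only ever decodes the final frame. The extra check on the first byte (0x10) and the name `decode_weight_frame` are additions for this illustration.

```python
from typing import Optional, Tuple


def decode_weight_frame(frame: bytes) -> Optional[Tuple[int, bool]]:
    """Return (raw_weight, is_final) for a 12-byte 0x10 frame, else None."""
    if len(frame) != 12 or frame[0] != 0x10:
        return None
    # Big-endian 16-bit value, same as data[3] * 16 * 16 + data[4]
    raw_weight = int.from_bytes(frame[3:5], "big")
    return raw_weight, frame[5] == 2


# Three frames copied from the run output above
frames = [
    bytes([0x10, 0x0C, 0x21, 0x14, 0xFA, 0, 0, 0, 0, 0, 0, 0x4B]),
    bytes([0x10, 0x0B, 0x21, 0x16, 0x0D, 0x11, 0, 0, 0, 0, 0, 0x70]),
    bytes([0x10, 0x0B, 0x21, 0x16, 0x0D, 0x02, 0, 0, 0, 0, 0, 0x61]),
]
for frame in frames:
    raw, is_final = decode_weight_frame(frame)
    print(f"{raw / 100:.2f} final={is_final}")
```

The last frame gives 0x160D = 5645, which divided by 100 is the 56.45 printed by the script.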