-
I'm sure I'm just being stupid, but I've taken the library example and want to modify it to allow the main Python program to ... I have the scanner part working: I build a list from the matching devices, and I can open a connection, send a command, and receive a response via the `asyncio.gather(ble.send_loop(), hello_ble())` line. However, the connection doesn't terminate or close if I remove the `while True:` part of the `hello_send` loop. What am I missing? Thanks in advance.

I know the `testserial()` loop is incorrect, but this is one of many tries just to see if I understand what is going on. Thanks! And sorry, I haven't figured out how to get code to paste correctly into this. See code below:

```python
# This is a sample Python script.
import socket
import serial
import time
from pynmeagps import NMEAReader
from signal import SIGINT
import tkinter as tk
from ble_serial.scan import main as scanner
import asyncio, logging
from ble_serial.bluetooth.ble_interface import BLE_interface

def testxls():
    import pylightxl as xl
    db = xl.Database()
    db.add_ws(ws="OSCR_ST")
    mydata = [10, 20, 30, 40]
    # loop to add our data to the worksheet
    for row_id, data in enumerate(mydata, start=1):
        db.ws(ws="OSCR_ST").update_index(row=row_id, col=1, val=data)
    # write out the db
    xl.writexl(db=db, fn="output.xlsx")

async def scanble(oscrname):
    ### general scan
    ADAPTER = "hci0"
    SCAN_TIME = 5 #seconds
    SERVICE_UUID = None # optional filtering
    devices = await scanner.scan(ADAPTER, SCAN_TIME, SERVICE_UUID)
    oscr = []
    for i in devices:
        #print(i.name)
        if str(i).__contains__(oscrname):
            oscr += [i]
            print(i.name, i.address)
    return oscr

def receiveble(value: bytes):
    print("Received:", value)

async def sendble(ble: BLE_interface, cmd):
    #while True:
    await asyncio.sleep(3.0)
    print("Sending...", cmd)
    ble.queue_send(cmd)
    await asyncio.sleep(3.0)

async def disconnectble(ble: BLE_interface):
    await asyncio.sleep(10)
    await ble.disconnect()

async def testserial(device):
    # None uses default/autodetection, insert values if needed
    ADAPTER = "hci0"
    SERVICE_UUID = None
    WRITE_UUID = None
    READ_UUID = None
    cmd = b'$C$'
    ble = BLE_interface(ADAPTER, SERVICE_UUID)
    ble.set_receiver(receiveble)
    try:
        await ble.connect(device, "public", 10.0)
        await ble.setup_chars(WRITE_UUID, READ_UUID, "rw")
        await asyncio.gather(ble.send_loop() , sendble(ble, b'$C$'), sendble(ble, b'$B$'))
        await asyncio.gather(ble.send_loop(), disconnectble(ble))
    finally:
        # await ble.disconnect()
        print("disconnect")
    return

# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    # testxls()
    oscr = asyncio.run(scanble('OSCR'))
    logging.basicConfig(level=logging.INFO)
    for i in oscr:
        asyncio.run(testserial(i.address))
```
Replies: 1 comment 1 reply
-
First of all I edited your post for code formatting, it needs triple backticks for code blocks.

Most important thing to know is that `send_loop()` will never return without external signals and `asyncio.gather` will wait for all coroutines to complete. So it does not matter that your 2 `sendble()` calls finish after about 6 sec - the second gather is never reached. To get out of it you can either call `stop_loop()` or directly `disconnect()`.

Also gather runs everything in parallel, I am not sure if you really intend to send both commands practically at the same time. Probably you want something like this instead:

```python
rx_buffer = b''
rx_available = asyncio.Event()

def receive_callback(value: bytes):
    print("Received:", value)
    global rx_buffer
    rx_buffer = value
    rx_available.set()

async def sendble(ble: BLE_interface, cmd):
    #while True:
    await asyncio.sleep(3.0)
    print("Sending...", cmd)
    rx_available.clear()
    ble.queue_send(cmd)
    await rx_available.wait()
    print('got response', rx_buffer)
    await asyncio.sleep(3.0)

async def commander(ble: BLE_interface):
    await sendble(ble, b'$C$')
    await sendble(ble, b'$B$')
    await ble.disconnect()

async def testserial(device):
    # None uses default/autodetection, insert values if needed
    ADAPTER = "hci0"
    SERVICE_UUID = None
    WRITE_UUID = None
    READ_UUID = None
    cmd = b'$C$'
    ble = BLE_interface(ADAPTER, SERVICE_UUID)
    ble.set_receiver(receive_callback)
    await ble.connect(device, "public", 10.0)
    await ble.setup_chars(WRITE_UUID, READ_UUID, "rw")
    await asyncio.gather(ble.send_loop(), commander(ble))
```

I also added some syncing with `Event()` to check the response before sending the next command.

To launch it you should never call `asyncio.run()` more than once; put everything into a single async `main()` instead:

```python
async def main():
    # testxls()
    oscr = await scanble('OSCR')
    for i in oscr:
        await testserial(i.address)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
```

(Instead of doing one device after another in the `testserial()` loop, an optimization could be to put multiple `testserial()` calls into `gather()` etc. to run them in parallel. BLE allows up to 7 connections at the same time, but this is probably only required if you have many devices.)
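The point about `send_loop()` and `asyncio.gather` can be reproduced without any hardware. Below is a minimal sketch using a hypothetical `FakeLink` class as a stand-in for the BLE connection; it is not part of ble-serial, and the `None` sentinel used to stop the loop is only an assumption made for this illustration. It shows that a loop which never returns keeps `gather()` waiting, and that the session ends once one of the gathered coroutines explicitly stops the loop.

```python
import asyncio


class FakeLink:
    """Hypothetical stand-in for a BLE connection (not the ble-serial API)."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self.sent = []

    def queue_send(self, data: bytes):
        self._queue.put_nowait(data)

    async def send_loop(self):
        # Never returns on its own: it only exits when the None sentinel arrives.
        while True:
            data = await self._queue.get()
            if data is None:
                return
            self.sent.append(data)

    def stop_loop(self):
        # Assumed stop mechanism for this sketch: enqueue a sentinel.
        self._queue.put_nowait(None)


async def commander(link: FakeLink, commands):
    # Send commands one after another, then stop the loop explicitly.
    for cmd in commands:
        link.queue_send(cmd)
        await asyncio.sleep(0.01)
    link.stop_loop()  # without this, the gather() below would wait forever


async def run_session(commands):
    link = FakeLink()
    # gather() returns only when BOTH coroutines have completed.
    await asyncio.gather(link.send_loop(), commander(link, commands))
    return link.sent


async def loop_hangs_without_stop():
    link = FakeLink()
    try:
        # Nobody calls stop_loop(), so send_loop() is still running at timeout.
        await asyncio.wait_for(link.send_loop(), timeout=0.05)
    except asyncio.TimeoutError:
        return True
    return False


if __name__ == "__main__":
    print(asyncio.run(run_session([b"$C$", b"$B$"])))
    print(asyncio.run(loop_hangs_without_stop()))
```

The design mirrors the `commander()` idea: the coroutine that knows when the work is done is also the one that ends the loop, so a single `gather()` is enough.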