diff --git a/README.md b/README.md index 632455b..edbcba5 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Sniffle has a number of useful features, including: * Easy to extend host-side software written in Python * PCAP export compatible with the Ubertooth * Wireshark compatible plugin +* ZMQ Publishing server ## Prerequisites @@ -244,6 +245,9 @@ options: -d, --decode Decode advertising data -o OUTPUT, --output OUTPUT PCAP output file name + -z, --zmq Enable ZMQ server + --zmqport Set ZMQ server port (default:4222) + --zmqhost Set ZMQ server ip (default: 127.0.0.1) ``` The XDS110 debugger on the Launchpad boards creates two serial ports. On diff --git a/python_cli/advertiser.py b/python_cli/advertiser.py index 31622df..1a15ef6 100755 --- a/python_cli/advertiser.py +++ b/python_cli/advertiser.py @@ -14,10 +14,11 @@ def main(): aparse = argparse.ArgumentParser(description="Connection initiator test script for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() global hw - hw = SniffleHW(args.serport) + hw = SniffleHW(args.serport,baudrate=args.baudrate) # set the advertising channel (and return to ad-sniffing mode) hw.cmd_chan_aa_phy(37, BLE_ADV_AA, 0) diff --git a/python_cli/initiator.py b/python_cli/initiator.py index a49f258..95fb862 100755 --- a/python_cli/initiator.py +++ b/python_cli/initiator.py @@ -18,6 +18,7 @@ def main(): aparse = argparse.ArgumentParser(description="Connection initiator test script for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int, help="Advertising channel to listen on") aparse.add_argument("-m", "--mac", default=None, help="Specify target MAC address") @@ -31,7 +32,7 @@ def main(): args = aparse.parse_args() global hw - hw = SniffleHW(args.serport) + hw = SniffleHW(args.serport, baudrate=args.baudrate) targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string) if targ_specs < 1: diff --git a/python_cli/requirements.txt b/python_cli/requirements.txt new file mode 100644 index 0000000..c6f497a --- /dev/null +++ b/python_cli/requirements.txt @@ -0,0 +1,7 @@ +pyserial +zmq +numpy +scipy + + + diff --git a/python_cli/reset.py b/python_cli/reset.py index 329b604..8f78c88 100755 --- a/python_cli/reset.py +++ b/python_cli/reset.py @@ -11,9 +11,10 @@ def main(): aparse = argparse.ArgumentParser(description="Firmware reset utility for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() - hw = SniffleHW(args.serport) + hw = SniffleHW(args.serport, baudrate=args.baudrate) # 5 resets seems to work more reliably than fewer print("Sending reset commands...") diff --git a/python_cli/scanner.py b/python_cli/scanner.py index 6604fb9..d720ccb 100755 --- a/python_cli/scanner.py +++ b/python_cli/scanner.py @@ -48,6 +48,7 @@ def add_hit(self, rssi): def main(): aparse = argparse.ArgumentParser(description="Scanner utility for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int, help="Advertising channel to listen on") aparse.add_argument("-r", "--rssi", default=-128, type=int, @@ -60,7 +61,7 @@ def main(): args = aparse.parse_args() global hw - hw = make_sniffle_hw(args.serport) + hw = make_sniffle_hw(serport=args.serport,baudrate=args.baudrate) hw.setup_sniffer( mode=SnifferMode.ACTIVE_SCAN, diff --git a/python_cli/sniff_receiver.py b/python_cli/sniff_receiver.py index 0aeed42..a5f138b 100755 --- a/python_cli/sniff_receiver.py +++ b/python_cli/sniff_receiver.py @@ -1,17 +1,19 @@ #!/usr/bin/env python3 # Written by Sultan Qasim Khan +# OpenDroneID mods (c) by B. Kerler # Copyright (c) 2018-2024, NCC Group plc # Released as open source under GPLv3 import argparse, sys +import json +import time from binascii import unhexlify -from sniffle.constants import BLE_ADV_AA from sniffle.pcap import PcapBleWriter from sniffle.sniffle_hw import (make_sniffle_hw, PacketMessage, DebugMessage, StateMessage, MeasurementMessage, SnifferMode, PhyMode) from sniffle.packet_decoder import (AdvaMessage, AdvDirectIndMessage, AdvExtIndMessage, - ScanRspMessage, DataMessage, str_mac) + ScanRspMessage, DataMessage, str_mac, AdvIndMessage) from sniffle.errors import UsageError, SourceDone from sniffle.advdata.decoder import decode_adv_data @@ -24,6 +26,7 @@ def main(): aparse = argparse.ArgumentParser(description="Host-side receiver for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") aparse.add_argument("-c", "--advchan", default=40, choices=[37, 38, 39], type=int, help="Advertising channel to listen on") aparse.add_argument("-p", "--pause", action="store_true", @@ -55,8 +58,40 @@ def main(): aparse.add_argument("-d", "--decode", action="store_true", help="Decode advertising data") aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") + aparse.add_argument("-z", "--zmq", action="store_true", help="Enable zmq") + aparse.add_argument("--zmqsetting", default="127.0.0.1:4222", help="Define zmq server settings") args = aparse.parse_args() + if args.zmq: + import zmq + + url = f"tcp://{args.zmqsetting}" + + context = zmq.Context() + socket = context.socket(zmq.XPUB) + socket.setsockopt(zmq.XPUB_VERBOSE, True) + socket.bind(url) + + def zmq_thread(socket): + try: + while True: + event = socket.recv() + # Event is one byte 0=unsub or 1=sub, followed by topic + if event[0] == 1: + log("new subscriber for", event[1:]) + elif event[0] == 0: + log("unsubscribed", event[1:]) + except zmq.error.ContextTerminated: + pass + + def log(*msg): + s = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print("%s:" % s, *msg, end="\n", file=sys.stderr) + + from threading import Thread + zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq') + zthread.start() + # Sanity check argument combinations targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string) if args.hop and targ_specs < 1: @@ -70,7 +105,7 @@ def main(): raise UsageError("Don't specify an advertising channel if you want advertising channel hopping!") global hw - hw = make_sniffle_hw(args.serport) + hw = make_sniffle_hw(serport=args.serport, baudrate=args.baudrate) # if a channel was explicitly specified, don't hop hop3 = True if targ_specs else False @@ -137,10 +172,18 @@ def main(): while True: try: msg = hw.recv_and_decode() - print_message(msg, args.quiet, args.decode) + if args.zmq: + smsg = msg.to_dict() + smsg = json.dumps(smsg) + socket.send_string(smsg) + print_message(msg, args.quiet, args.decode) + else: + print_message(msg, args.quiet, args.decode) except SourceDone: break except KeyboardInterrupt: + if args.zmq: + socket.close() hw.cancel_recv() sys.stderr.write("\r") break diff --git a/python_cli/sniffle/packet_decoder.py b/python_cli/sniffle/packet_decoder.py index dc26673..36fb1eb 100644 --- a/python_cli/sniffle/packet_decoder.py +++ b/python_cli/sniffle/packet_decoder.py @@ -82,6 +82,7 @@ def __init__(self, raw_msg, dstate: SniffleDecoderState, crc_rev=None): self.crc_rev = -1 else: self.crc_rev = crc_ble_reverse(dstate.crc_init_rev, body) + self.pkt = self.to_dict() @staticmethod def from_body(body, is_data=False, slave_send=False, is_aux_adv=False): @@ -105,6 +106,10 @@ def __repr__(self): type(self).__name__, self.ts, self.aa, self.rssi, self.chan, self.phy, self.event, repr(self.body)) + def to_dict(self): + return {"ts": self.ts, "aa": self.aa, "rssi": self.rssi, "chan": self.chan, + "phy": self.phy, "event": self.event, "body": self.body.hex()} + def str_header(self): phy_names = ["1M", "2M", "Coded (S=8)", "Coded (S=2)"] if self.crc_err: @@ -139,6 +144,7 @@ def __init__(self, pkt: PacketMessage): self.crc_err = pkt.crc_err self.event = pkt.event self.crc_rev = pkt.crc_rev + self.pkt = pkt.to_dict() def _str_decode(self): raise NotImplementedError("Use a derived class") @@ -153,7 +159,7 @@ def __str__(self): return "\n".join([self.str_header(), self.str_decode(), self.hexdump()]) @staticmethod - def from_body(body, is_data=False, slave_send=False): + def from_body(body, is_data=False, slave_send=False, **kwargs): return DPacketMessage.decode(PacketMessage.from_body(body, is_data, slave_send)) @staticmethod @@ -183,6 +189,14 @@ def str_adtype(self): atstr += "Ad Length: %i" % self.ad_length return atstr + def dict_adtype(self): + res = {} + res["ChSel"]=self.ChSel + res["TxAdd"]=self.TxAdd + res["RxAdd"]=self.RxAdd + res["AdLength"]=self.ad_length + return res + def _str_decode(self): return self.str_adtype() @@ -245,12 +259,24 @@ def str_datatype(self): dtstr += "Data Length: %i" % self.data_length return dtstr + def dict_datatype(self): + return {"LLID": self.pdutype, + "Dir": ("S->M" if self.data_dir else "M->S"), + "NESN": self.NESN, + "SN": self.SN, + "MD": self.MD, + "Data Length": self.data_length} + + def str_header(self): return super().str_header() + " Event: %d" % self.event def _str_decode(self): return self.str_datatype() + def to_dict(self): + return self.dict_datatype() + @staticmethod def decode(pkt: PacketMessage, dstate=None): LLID = pkt.body[0] & 0x3 @@ -267,6 +293,34 @@ class LlDataMessage(DataMessage): class LlDataContMessage(DataMessage): pdutype = "LL DATA CONT" +control_opcodes = [ + "LL_CONNECTION_UPDATE_IND", + "LL_CHANNEL_MAP_IND", + "LL_TERMINATE_IND", + "LL_ENC_REQ", + "LL_ENC_RSP", + "LL_START_ENC_REQ", + "LL_START_ENC_RSP", + "LL_UNKNOWN_RSP", + "LL_FEATURE_REQ", + "LL_FEATURE_RSP", + "LL_PAUSE_ENC_REQ", + "LL_PAUSE_ENC_RSP", + "LL_VERSION_IND", + "LL_REJECT_IND", + "LL_SLAVE_FEATURE_REQ", + "LL_CONNECTION_PARAM_REQ", + "LL_CONNECTION_PARAM_RSP", + "LL_REJECT_EXT_IND", + "LL_PING_REQ", + "LL_PING_RSP", + "LL_LENGTH_REQ", + "LL_LENGTH_RSP", + "LL_PHY_REQ", + "LL_PHY_RSP", + "LL_PHY_UPDATE_IND", + "LL_MIN_USED_CHANNELS_IND" +] class LlControlMessage(DataMessage): pdutype = "LL CONTROL" @@ -275,44 +329,26 @@ def __init__(self, pkt: PacketMessage): self.opcode = self.body[2] def str_opcode(self): - control_opcodes = [ - "LL_CONNECTION_UPDATE_IND", - "LL_CHANNEL_MAP_IND", - "LL_TERMINATE_IND", - "LL_ENC_REQ", - "LL_ENC_RSP", - "LL_START_ENC_REQ", - "LL_START_ENC_RSP", - "LL_UNKNOWN_RSP", - "LL_FEATURE_REQ", - "LL_FEATURE_RSP", - "LL_PAUSE_ENC_REQ", - "LL_PAUSE_ENC_RSP", - "LL_VERSION_IND", - "LL_REJECT_IND", - "LL_SLAVE_FEATURE_REQ", - "LL_CONNECTION_PARAM_REQ", - "LL_CONNECTION_PARAM_RSP", - "LL_REJECT_EXT_IND", - "LL_PING_REQ", - "LL_PING_RSP", - "LL_LENGTH_REQ", - "LL_LENGTH_RSP", - "LL_PHY_REQ", - "LL_PHY_RSP", - "LL_PHY_UPDATE_IND", - "LL_MIN_USED_CHANNELS_IND" - ] if self.opcode < len(control_opcodes): return "Opcode: %s" % control_opcodes[self.opcode] else: return "Opcode: RFU (0x%02X)" % self.opcode + def dict_opcode(self): + if self.opcode < len(control_opcodes): + return {"Opcode": control_opcodes[self.opcode] } + else: + return {"Opcode RFU":self.opcode} + def _str_decode(self): return "\n".join([ self.str_datatype(), self.str_opcode()]) + def to_dict(self): + return {self.pdutype:self.pkt,"DataType":self.dict_datatype(), + "Opcode":self.dict_opcode()} + class AdvaMessage(AdvertMessage): def __init__(self, pkt: PacketMessage): super().__init__(pkt) @@ -327,6 +363,13 @@ def _str_decode(self): self.str_adtype(), self.str_adva()]) + def to_dict(self): + res={self.pdutype:self.pkt} + res["AdvA"]=str_mac2(self.AdvA, self.TxAdd) + if len(self.adv_data)>0: + res["AdvData"]=self.adv_data.hex() + return res + class AdvIndMessage(AdvaMessage): pdutype = "ADV_IND" @@ -351,11 +394,17 @@ def __init__(self, pkt: PacketMessage): def str_ata(self): return "AdvA: %s TargetA: %s" % (str_mac2(self.AdvA, self.TxAdd), str_mac2(self.TargetA, self.RxAdd)) + def dict_ata(self): + return {"AdvA": str_mac2(self.AdvA, self.TxAdd), "TargetA": str_mac2(self.TargetA, self.RxAdd), "AdvData":self.adv_data.hex()} + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_ata()]) + def to_dict(self): + return {self.pdutype:self.pkt,"adtype":self.dict_adtype(),"ata":self.dict_ata(),"AdvData":self.adv_data.hex()} + class ScanReqMessage(AdvertMessage): pdutype = "SCAN_REQ" @@ -367,11 +416,17 @@ def __init__(self, pkt: PacketMessage): def str_asa(self): return "ScanA: %s AdvA: %s" % (str_mac2(self.ScanA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd)) + def dict_asa(self): + return {"ScanA":str_mac2(self.ScanA, self.TxAdd),"AdvA": str_mac2(self.AdvA, self.RxAdd)} + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_asa()]) + def to_dict(self): + return {self.pdutype:self.pkt,"adtype":self.dict_adtype(),"asa":self.str_asa()} + class AuxScanReqMessage(ScanReqMessage): pdutype = "AUX_SCAN_REQ" @@ -395,11 +450,26 @@ def str_aia(self): return "InitA: %s AdvA: %s AA: 0x%08X CRCInit: 0x%06X" % ( str_mac2(self.InitA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd), self.aa_conn, self.CRCInit) + def dict_aia(self): + return {"InitA":str_mac2(self.InitA, self.TxAdd), + "AdvA":str_mac2(self.AdvA, self.RxAdd), + "aa": self.aa_conn, + "CRCInit": self.CRCInit} + def str_conn_params(self): return "WinSize: %d WinOffset: %d Interval: %d Latency: %d Timeout: %d Hop: %d SCA: %d" % ( self.WinSize, self.WinOffset, self.Interval, self.Latency, self.Timeout, self.Hop, self.SCA) + def dict_conn_params(self): + return {"WinSize": self.WinSize, + "WinOffset": self.WinOffset, + "Interval": self.Interval, + "Latency": self.Latency, + "Timeout": self.Timeout, + "Hop": self.Hop, + "SCA": self.SCA} + def str_chm(self): if self.ChM == b'\xFF\xFF\xFF\xFF\x1F': descstr = "all channels" @@ -413,6 +483,19 @@ def str_chm(self): chanstr = "%02X %02X %02X %02X %02X" % tuple(self.ChM) return "Channel Map: %s (%s)" % (chanstr, descstr) + def dict_chm(self): + if self.ChM == b'\xFF\xFF\xFF\xFF\x1F': + descstr = "all channels" + else: + has_chan = lambda chm, i: (chm[i // 8] & (1 << (i & 7))) != 0 + excludes = [] + for i in range(37): + if not has_chan(self.ChM, i): + excludes.append(i) + descstr = "excludes " + ", ".join([str(i) for i in excludes]) + chanstr = "%02X %02X %02X %02X %02X" % tuple(self.ChM) + return {"Channel Map": {"channel":chanstr,"desc":descstr}} + def _str_decode(self): return "\n".join([ self.str_adtype(), @@ -420,9 +503,16 @@ def _str_decode(self): self.str_conn_params(), self.str_chm()]) + def to_dict(self): + return {self.pdutype:self.pkt,"adtype":self.dict_adtype(),"aia":self.dict_aia(), + "conn_params":self.dict_conn_params(),"chm":self.dict_chm()} + class AuxConnectReqMessage(ConnectIndMessage): pdutype = "AUX_CONNECT_REQ" + +phy_names = ["1M", "2M", "Coded", "Invalid3", "Invalid4", + "Invalid5", "Invalid6", "Invalid7"] class AuxPtr: def __init__(self, ptr): self.chan = ptr[0] & 0x3F @@ -432,11 +522,13 @@ def __init__(self, ptr): self.offsetUsec = auxOffset * offsetMult def __str__(self): - phy_names = ["1M", "2M", "Coded", "Invalid3", "Invalid4", - "Invalid5", "Invalid6", "Invalid7"] return "AuxPtr Chan: %d PHY: %s Delay: %d us" % ( self.chan, phy_names[self.phy], self.offsetUsec) + def to_dict(self): + return {"chan":self.chan, "PHY":phy_names[self.phy], + "Delay_us":self.offsetUsec} + class AdvDataInfo: def __init__(self, adi): self.did = adi[0] + ((adi[1] & 0x0F) << 8) @@ -450,6 +542,9 @@ def __eq__(self, other): return self.did == other.did and self.sid == other.sid return False + def to_dict(self): + return {"did":self.did, "sid":self.sid} + class AdvExtIndMessage(AdvertMessage): pdutype = "ADV_EXT_IND" @@ -533,11 +628,39 @@ def str_aext(self): else: return dmsg + def dict_aext(self): + ret = {} + + amodes = ["Non-connectable, non-scannable", + "Connectable", "Scannable", "RFU"] + ret["AdvMode"]=amodes[self.AdvMode] + if self.AdvA: + ret["AdvA"]= str_mac2(self.AdvA, self.TxAdd) + if self.TargetA: + ret["TargetA"]= str_mac2(self.TargetA, self.RxAdd) + if self.CTEInfo: + ret["CTEInfo"]= self.CTEInfo + if self.AdvDataInfo: + ret["AdvDataInfo"]=self.AdvDataInfo.to_dict() + if self.SyncInfo: + # TODO decode this nicely + ret["SyncInfo"]=self.SyncInfo.hex() + if self.TxPower: + ret["TxPower"]=self.TxPower + if self.ACAD: + ret["ACAD"]=self.ACAD.hex() + if self.AuxPtr: + ret["AuxPtr"]=self.AuxPtr.to_dict() + return ret + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_aext()]) + def to_dict(self): + return {self.pdutype:self.pkt,"adtype":self.dict_adtype(), "aext": self.dict_aext(), "AdvData":self.adv_data.hex()} + def get_adi(pkt: PacketMessage): dpkt = AdvExtIndMessage(pkt) return dpkt.AdvDataInfo diff --git a/python_cli/sniffle/sniffle_hw.py b/python_cli/sniffle/sniffle_hw.py index 102b669..d3ab4f8 100644 --- a/python_cli/sniffle/sniffle_hw.py +++ b/python_cli/sniffle/sniffle_hw.py @@ -7,7 +7,7 @@ from struct import pack, unpack from base64 import b64encode, b64decode from binascii import Error as BAError -from time import time +from time import time, sleep from random import randint, randrange from serial.tools.list_ports import comports from traceback import format_exception @@ -80,7 +80,7 @@ def is_cp2102(serport): return True return False -def make_sniffle_hw(serport=None, logger=None, timeout=None): +def make_sniffle_hw(serport=None, baudrate=921600, logger=None, timeout=None): if serport is None: return SniffleHW(serport, logger, timeout) elif serport.startswith('rfnm'): @@ -96,14 +96,17 @@ def make_sniffle_hw(serport=None, logger=None, timeout=None): fname = serport[5:] return SniffleFileSDR(fname, logger=logger) else: - return SniffleHW(serport, logger, timeout) + return SniffleHW(serport, baudrate, logger, timeout) class SniffleHW: max_interval_preload_pairs = 4 api_level = 0 - def __init__(self, serport=None, logger=None, timeout=None): - baud = 2000000 + def __init__(self, serport=None, baudrate=None, logger=None, timeout=None): + if baudrate is None: + baud = 2000000 + else: + baud = int(baudrate) if serport is None: serport = find_xds110_serport() if serport is None: @@ -112,10 +115,11 @@ def __init__(self, serport=None, logger=None, timeout=None): serport = find_catsniffer_v3_serport() if serport is None: raise IOError("Sniffle device not found") - else: + elif baudrate is None: baud = 921600 elif is_cp2102(serport): - baud = 921600 + if baudrate is None: + baud = 921600 self.timeout = timeout self.decoder_state = SniffleDecoderState() @@ -420,10 +424,14 @@ def mark_and_flush(self): marker_data = pack(' 1000: + break def probe_fw_version(self): self.cmd_version() diff --git a/python_cli/sniffle/sniffle_sdr.py b/python_cli/sniffle/sniffle_sdr.py index 158bec3..63ba944 100644 --- a/python_cli/sniffle/sniffle_sdr.py +++ b/python_cli/sniffle/sniffle_sdr.py @@ -394,10 +394,16 @@ def __init__(self, driver='rfnm', mode='single', logger=None): multi_chan = False gain = 10 chan = 37 - + if driver == 'rfnm': - self.sdr = SoapyDevice({'driver': driver}) - + self.sdr = None + results = SoapyDevice.enumerate() + for device in results: + if device["driver"] == driver: + self.sdr = SoapyDevice(device) + break + if self.sdr is None: + assert False, "No SoapySDR found" rates = self.sdr.listSampleRates(SOAPY_SDR_RX, self.sdr_chan) antennas = self.sdr.listAntennas(SOAPY_SDR_RX, self.sdr_chan) self.sdr.setAntenna(SOAPY_SDR_RX, self.sdr_chan, antennas[1]) diff --git a/python_cli/sniffle_extcap.py b/python_cli/sniffle_extcap.py index 1b5ed93..74aabc2 100755 --- a/python_cli/sniffle_extcap.py +++ b/python_cli/sniffle_extcap.py @@ -38,7 +38,7 @@ import traceback from serial.tools.list_ports import comports from sniffle.constants import BLE_ADV_AA -from sniffle.sniffle_hw import make_sniffle_hw, PacketMessage, SnifferMode, PhyMode +from sniffle.sniffle_hw import make_sniffle_hw, PacketMessage, SnifferMode, PhyMode, SniffleHW from sniffle.packet_decoder import (DataMessage, AdvaMessage, AdvDirectIndMessage, ScanRspMessage, AdvExtIndMessage, str_mac) from sniffle.pcap import PcapBleWriter @@ -188,6 +188,7 @@ def loadArgs(self, args=None): help="Ignore encrypted PHY mode changes") argParser.add_argument("--crcerr", action="store_true", help="Capture packets with CRC errors") + argParser.add_argument("--baudrate", default=None, help="Sniffer serial port baudrate") self.args = argParser.parse_args(args=args) diff --git a/python_cli/uart_test.py b/python_cli/uart_test.py index 47ae826..3647f0a 100755 --- a/python_cli/uart_test.py +++ b/python_cli/uart_test.py @@ -10,9 +10,10 @@ def main(): aparse = argparse.ArgumentParser(description="UART echo test for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() - hw = SniffleHW(args.serport, timeout=0.1) + hw = SniffleHW(serport=args.serport, baudrate=args.baudrate, timeout=0.1) # listen in a way that will receive nothing hw.cmd_chan_aa_phy(0, 0xFFFFFFFF, 0) diff --git a/python_cli/version_check.py b/python_cli/version_check.py index 09bf5bb..e22125e 100755 --- a/python_cli/version_check.py +++ b/python_cli/version_check.py @@ -10,9 +10,10 @@ def main(): aparse = argparse.ArgumentParser(description="Sniffle firmware version check utility") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() - hw = SniffleHW(args.serport, timeout=0.1) + hw = SniffleHW(serport=args.serport, baudrate=args.baudrate, timeout=0.1) ver_msg = hw.probe_fw_version() if ver_msg: