diff --git a/config.yaml b/config.yaml index e7a5a92..2cff105 100644 --- a/config.yaml +++ b/config.yaml @@ -5,7 +5,7 @@ url: http://192.168.178.252:8123/local_eufyp2pstream arch: [amd64, aarch64, i386, armv7] # https://developers.home-assistant.io/docs/add-ons/configuration -version: 0.2.1-beta +version: 0.3.0-beta slug: eufyp2pstream init: false startup: application @@ -14,6 +14,116 @@ map: [config, media] host_network: false options: eufy_security_ws_port: 3000 + debug_log: false schema: eufy_security_ws_port: port -ports: { "63336/tcp": 63336, "63337/tcp": 63337, "63338/tcp": 63338 } + debug_log: bool + camera_1_serial_number: "str?" + camera_2_serial_number: "str?" + camera_3_serial_number: "str?" + camera_4_serial_number: "str?" + camera_5_serial_number: "str?" + camera_6_serial_number: "str?" + camera_7_serial_number: "str?" + camera_8_serial_number: "str?" + camera_9_serial_number: "str?" + camera_10_serial_number: "str?" + camera_11_serial_number: "str?" + camera_12_serial_number: "str?" + camera_13_serial_number: "str?" + camera_14_serial_number: "str?" + camera_15_serial_number: "str?" +ports: { + "63336/tcp": 63336, + "63337/tcp": 63337, + "63338/tcp": 63338, + "63339/tcp": 63339, + "63340/tcp": 63340, + "63341/tcp": 63341, + "63342/tcp": 63342, + "63343/tcp": 63343, + "63344/tcp": 63344, + "63345/tcp": 63345, + "63346/tcp": 63346, + "63347/tcp": 63347, + "63348/tcp": 63348, + "63349/tcp": 63349, + "63350/tcp": 63350, + "63351/tcp": 63351, + "63352/tcp": 63352, + "63353/tcp": 63353, + "63354/tcp": 63354, + "63355/tcp": 63355, + "63356/tcp": 63356, + "63357/tcp": 63357, + "63358/tcp": 63358, + "63359/tcp": 63359, + "63360/tcp": 63360, + "63361/tcp": 63361, + "63362/tcp": 63362, + "63363/tcp": 63363, + "63364/tcp": 63364, + "63365/tcp": 63365, + "63366/tcp": 63366, + "63367/tcp": 63367, + "63368/tcp": 63368, + "63369/tcp": 63369, + "63370/tcp": 63370, + "63371/tcp": 63371, + "63372/tcp": 63372, + "63373/tcp": 63373, + "63374/tcp": 63374, + "63375/tcp": 63375, + "63376/tcp": 63376, + "63377/tcp": 63377, + "63378/tcp": 63378, + "63379/tcp": 63379, + "63380/tcp": 63380, +} +ports_description: { + "63336/tcp": "Camera-1 Video Stream", + "63337/tcp": "Camera-1 Audio Stream", + "63338/tcp": "Camera-1 Backchannel", + "63339/tcp": "Camera-2 Video Stream", + "63340/tcp": "Camera-2 Audio Stream", + "63341/tcp": "Camera-2 Backchannel", + "63342/tcp": "Camera-3 Video Stream", + "63343/tcp": "Camera-3 Audio Stream", + "63344/tcp": "Camera-3 Backchannel", + "63345/tcp": "Camera-4 Video Stream", + "63346/tcp": "Camera-4 Audio Stream", + "63347/tcp": "Camera-4 Backchannel", + "63348/tcp": "Camera-5 Video Stream", + "63349/tcp": "Camera-5 Audio Stream", + "63350/tcp": "Camera-5 Backchannel", + "63351/tcp": "Camera-6 Video Stream", + "63352/tcp": "Camera-6 Audio Stream", + "63353/tcp": "Camera-6 Backchannel", + "63354/tcp": "Camera-7 Video Stream", + "63355/tcp": "Camera-7 Audio Stream", + "63356/tcp": "Camera-7 Backchannel", + "63357/tcp": "Camera-8 Video Stream", + "63358/tcp": "Camera-8 Audio Stream", + "63359/tcp": "Camera-8 Backchannel", + "63360/tcp": "Camera-9 Video Stream", + "63361/tcp": "Camera-9 Audio Stream", + "63362/tcp": "Camera-9 Backchannel", + "63363/tcp": "Camera-10 Video Stream", + "63364/tcp": "Camera-10 Audio Stream", + "63365/tcp": "Camera-10 Backchannel", + "63366/tcp": "Camera-11 Video Stream", + "63367/tcp": "Camera-11 Audio Stream", + "63368/tcp": "Camera-11 Backchannel", + "63369/tcp": "Camera-12 Video Stream", + "63370/tcp": "Camera-12 Audio Stream", + "63371/tcp": "Camera-12 Backchannel", + "63372/tcp": "Camera-13 Video Stream", + "63373/tcp": "Camera-13 Audio Stream", + "63374/tcp": "Camera-13 Backchannel", + "63375/tcp": "Camera-14 Video Stream", + "63376/tcp": "Camera-14 Audio Stream", + "63377/tcp": "Camera-14 Backchannel", + "63378/tcp": "Camera-15 Video Stream", + "63379/tcp": "Camera-15 Audio Stream", + "63380/tcp": "Camera-15 Backchannel", +} diff --git a/files/eufyp2pstream.py b/files/eufyp2pstream.py index 1282030..10dec45 100644 --- a/files/eufyp2pstream.py +++ b/files/eufyp2pstream.py @@ -1,24 +1,25 @@ - -from websocket import EufySecurityWebSocket -import aiohttp +import os +import signal +import sys +import threading +from aiohttp import ClientSession import asyncio -import json -import socket import select -import threading -import time -import sys -import signal +import socket from http.server import BaseHTTPRequestHandler, HTTPServer -import os from queue import Queue +from threading import Thread +import argparse +from websocket import EufySecurityWebSocket +import json -RECV_CHUNK_SIZE = 4096 - -video_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -audio_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -backchannel_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# Variables +camera_handlers = {} +run_event = threading.Event() +debug = False +# Constants +RECV_CHUNK_SIZE = 4096 EVENT_CONFIGURATION: dict = { "livestream video data": { @@ -55,7 +56,7 @@ "messageId": "talkback_audio_data", "command": "device.talkback_audio_data", "serialNumber": None, - "buffer": None + "buffer": None, } STOP_TALKBACK = { @@ -75,21 +76,36 @@ START_LISTENING_MESSAGE = {"messageId": "start_listening", "command": "start_listening"} -TALKBACK_RESULT_MESSAGE = {"messageId": "talkback_audio_data", "errorCode": "device_talkback_not_running"} +TALKBACK_RESULT_MESSAGE = { + "messageId": "talkback_audio_data", + "errorCode": "device_talkback_not_running", +} DRIVER_CONNECT_MESSAGE = {"messageId": "driver_connect", "command": "driver.connect"} -run_event = threading.Event() def exit_handler(signum, frame): - print(f'Signal handler called with signal {signum}') + """Signal handler to stop the script.""" + logMessage(f"Signal handler called with signal {signum}") run_event.set() + # Install signal handler signal.signal(signal.SIGINT, exit_handler) + +def logMessage(message, force=False): + """Log a message to the console.""" + if debug or force: + print(message) + sys.stdout.flush() + + class ClientAcceptThread(threading.Thread): - def __init__(self,socket,run_event,name,ws,serialno): + """Thread to accept incoming connections from clients.""" + + def __init__(self, socket, run_event, name, ws, serialno): + """Initialize the thread.""" threading.Thread.__init__(self) self.socket = socket self.queues = [] @@ -100,6 +116,7 @@ def __init__(self,socket,run_event,name,ws,serialno): self.my_threads = [] def update_threads(self): + """Update the list of active threads.""" my_threads_before = len(self.my_threads) for thread in self.my_threads: if not thread.is_alive(): @@ -107,37 +124,36 @@ def update_threads(self): self.my_threads = [t for t in self.my_threads if t.is_alive()] if self.ws and my_threads_before > 0 and len(self.my_threads) == 0: if self.name == "BackChannel": - print("All clients died (BackChannel): ", self.name) - sys.stdout.flush() + logMessage(f"All clients died (BackChannel): {self.name}") else: - print("All clients died. Stopping Stream: ", self.name) - sys.stdout.flush() - + logMessage(f"All clients died. Stopping Stream: {self.name}") msg = STOP_P2P_LIVESTREAM_MESSAGE.copy() msg["serialNumber"] = self.serialno asyncio.run(self.ws.send_message(json.dumps(msg))) def run(self): - print("Accepting connection for ", self.name) + """Run the thread to accept incoming connections.""" + logMessage(f"Accepting {self.name} connection for {self.serialno}") msg = STOP_TALKBACK.copy() msg["serialNumber"] = self.serialno asyncio.run(self.ws.send_message(json.dumps(msg))) + logMessage(f"stop talkback sent for {self.serialno}") while not self.run_event.is_set(): self.update_threads() sys.stdout.flush() try: client_sock, client_addr = self.socket.accept() - print ("New connection added: ", client_addr, " for ", self.name) - sys.stdout.flush() - if self.name == "BackChannel": client_sock.setblocking(True) - print("Starting BackChannel") - thread = ClientRecvThread(client_sock, run_event, self.name, self.ws, self.serialno) + thread = ClientRecvThread( + client_sock, run_event, self.name, self.ws, self.serialno + ) thread.start() else: client_sock.setblocking(False) - thread = ClientSendThread(client_sock, run_event, self.name, self.ws, self.serialno) + thread = ClientSendThread( + client_sock, run_event, self.name, self.ws, self.serialno + ) self.queues.append(thread.queue) if self.ws: msg = START_P2P_LIVESTREAM_MESSAGE.copy() @@ -147,9 +163,14 @@ def run(self): thread.start() except socket.timeout: pass + logMessage(f"ClientAcceptThread {self.name} ended for {self.serialno}") + class ClientSendThread(threading.Thread): - def __init__(self,client_sock,run_event,name,ws,serialno): + """Thread to send data to clients.""" + + def __init__(self, client_sock, run_event, name, ws, serialno): + """Initialize the thread.""" threading.Thread.__init__(self) self.client_sock = client_sock self.queue = Queue(100) @@ -159,41 +180,42 @@ def __init__(self,client_sock,run_event,name,ws,serialno): self.serialno = serialno def run(self): - print ("Thread running: ", self.name) - sys.stdout.flush() - + """Run the thread to send data to clients.""" + logMessage(f"Thread {self.name} running for {self.serialno}") try: while not self.run_event.is_set(): - ready_to_read, ready_to_write, in_error = \ - select.select([], [self.client_sock], [self.client_sock], 2) + ready_to_read, ready_to_write, in_error = select.select( + [], [self.client_sock], [self.client_sock], 2 + ) if len(in_error): - print("Exception in socket", self.name) - sys.stdout.flush() + logMessage(f"Exception in socket {self.name}") + break if not len(ready_to_write): - print("Socket not ready to write ", self.name) - sys.stdout.flush() + logMessage(f"Socket not ready to write {self.name}") + break if not self.queue.empty(): - self.client_sock.sendall( - bytearray(self.queue.get(True)["data"]) - ) + self.client_sock.sendall(bytearray(self.queue.get(True)["data"])) except socket.error as e: - print("Connection lost", self.name, e) + logMessage(f"Connection lost {self.name}: {e}") pass except socket.timeout: - print("Timeout on socket for ", self.name) + logMessage(f"Timeout on socket for {self.name}") pass try: self.client_sock.shutdown(socket.SHUT_RDWR) except OSError: - print ("Error shutdown socket: ", self.name) + logMessage(f"Error shutdown socket: {self.name}") self.client_sock.close() - print ("Thread stopping: ", self.name) - sys.stdout.flush() + logMessage(f"Thread {self.name} stopping for {self.serialno}") + class ClientRecvThread(threading.Thread): - def __init__(self,client_sock,run_event,name,ws,serialno): + """Thread to receive data from clients.""" + + def __init__(self, client_sock, run_event, name, ws, serialno): + """Initialize the thread.""" threading.Thread.__init__(self) self.client_sock = client_sock self.run_event = run_event @@ -202,203 +224,319 @@ def __init__(self,client_sock,run_event,name,ws,serialno): self.serialno = serialno def run(self): + """Run the thread to receive data from clients.""" msg = START_TALKBACK.copy() msg["serialNumber"] = self.serialno asyncio.run(self.ws.send_message(json.dumps(msg))) try: - curr_packet = bytearray() + curr_packet = bytearray() no_data = 0 while not self.run_event.is_set(): try: - ready_to_read, ready_to_write, in_error = \ - select.select([self.client_sock,], [], [self.client_sock], 2) + ready_to_read, ready_to_write, in_error = select.select( + [ + self.client_sock, + ], + [], + [self.client_sock], + 2, + ) if len(in_error): - print("Exception in socket", self.name) - sys.stdout.flush() + logMessage(f"Exception in socket {self.name}") + break if len(ready_to_read): data = self.client_sock.recv(RECV_CHUNK_SIZE) curr_packet += bytearray(data) - if len(data) > 0: # and len(data) <= RECV_CHUNK_SIZE: + if len(data) > 0: # and len(data) <= RECV_CHUNK_SIZE: msg = SEND_TALKBACK_AUDIO_DATA.copy() msg["serialNumber"] = self.serialno - msg["buffer"] = list(bytes(curr_packet)) + msg["buffer"] = list(bytes(curr_packet)) asyncio.run(self.ws.send_message(json.dumps(msg))) - curr_packet = bytearray() + curr_packet = bytearray() no_data = 0 else: no_data += 1 else: no_data += 1 - if (no_data >= 15): - print("15x in a row no data in socket ", self.name) - sys.stdout.flush() + if no_data >= 15: + logMessage(f"15x in a row no data in socket {self.name}") break except BlockingIOError: # Resource temporarily unavailable (errno EWOULDBLOCK) pass except socket.error as e: - print("Connection lost", self.name, e) + logMessage(f"Connection lost {self.name}: {e}") pass except socket.timeout: - print("Timeout on socket for ", self.name) + logMessage(f"Timeout on socket for {self.name}") pass except select.error: - print("Select error on socket ", self.name) + logMessage(f"Select error on socket {self.name}") pass - sys.stdout.flush() + try: self.client_sock.shutdown(socket.SHUT_RDWR) except OSError: - print ("Error shutdown socket: ", self.name) - sys.stdout.flush() + logMessage(f"Error shutdown socket: {self.name}") + self.client_sock.close() msg = STOP_TALKBACK.copy() msg["serialNumber"] = self.serialno asyncio.run(self.ws.send_message(json.dumps(msg))) + logMessage(f"Thread {self.name} stopping for {self.serialno}") -class Connector: - def __init__( - self, - run_event, - ): - video_sock.bind(("0.0.0.0", 63336)) - video_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - video_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - video_sock.settimeout(1) # timeout for listening - video_sock.listen() - audio_sock.bind(("0.0.0.0", 63337)) - audio_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - audio_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - audio_sock.settimeout(1) # timeout for listening - audio_sock.listen() - backchannel_sock.bind(("0.0.0.0", 63338)) - backchannel_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - backchannel_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - backchannel_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - backchannel_sock.settimeout(1) # timeout for listening - backchannel_sock.listen() - self.ws = None + +class CameraStreamHandler: + """Handler for camera streams.""" + + def __init__(self, serial_number, start_port, run_event): + """Initialize the handler.""" + logMessage( + f" - CameraStreamHandler - __init__ - serial_number: {serial_number} - video_port: {start_port} - audio_port: {start_port + 1} - backchannel_port: {start_port + 2}" + ) + self.serial_number = serial_number + self.start_port = start_port self.run_event = run_event - self.serialno = "" + self.ws = None + self.video_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.audio_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.backchannel_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.video_sock.bind(("0.0.0.0", self.start_port)) + self.video_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.video_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.video_sock.settimeout(1) # timeout for listening + self.audio_sock.bind(("0.0.0.0", self.start_port + 1)) + self.audio_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.audio_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.audio_sock.settimeout(1) # timeout for listening + self.backchannel_sock.bind(("0.0.0.0", self.start_port + 2)) + self.backchannel_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.backchannel_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.backchannel_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.backchannel_sock.settimeout(1) # timeout for listening + self.video_sock.listen() + self.audio_sock.listen() + self.backchannel_sock.listen() + + def start_stream(self): + """Start the stream.""" + logMessage(f"Starting stream for camera {self.serial_number}.") + self.video_thread = ClientAcceptThread( + self.video_sock, self.run_event, "Video", self.ws, self.serial_number + ) + self.audio_thread = ClientAcceptThread( + self.audio_sock, self.run_event, "Audio", self.ws, self.serial_number + ) + self.backchannel_thread = ClientAcceptThread( + self.backchannel_sock, + self.run_event, + "BackChannel", + self.ws, + self.serial_number, + ) + self.audio_thread.start() + self.video_thread.start() + self.backchannel_thread.start() + + def setWs(self, ws: EufySecurityWebSocket): + """Set the websocket for the camera handler.""" + self.ws = ws def stop(self): + """Stop the stream.""" try: self.video_sock.shutdown(socket.SHUT_RDWR) except OSError: - print ("Error shutdown socket") + logMessage("Error shutdown socket", True) self.video_sock.close() try: self.audio_sock.shutdown(socket.SHUT_RDWR) except OSError: - print ("Error shutdown socket") + logMessage("Error shutdown socket", True) self.audio_sock.close() try: self.backchannel_sock.shutdown(socket.SHUT_RDWR) except OSError: - print ("Error shutdown socket") + logMessage("Error shutdown socket", True) self.backchannel_sock.close() - def setWs(self, ws : EufySecurityWebSocket): - self.ws = ws + def start_talkback(self): + """Start the talkback.""" + logMessage(f"Starting talkback for camera {self.serial_number}.") + msg = START_TALKBACK.copy() + msg["serialNumber"] = self.serial_number + asyncio.run(self.ws.send_message(json.dumps(msg))) - async def on_open(self): - print(f" on_open - executed") - async def on_close(self): - print(f" on_close - executed") - self.run_event.set() - self.ws = None - stop() - os._exit(-1) - - async def on_error(self, message): - print(f" on_error - executed - {message}") - - async def on_message(self, message): - payload = message.json() - message_type: str = payload["type"] - if message_type == "result": - message_id = payload["messageId"] - if message_id != SEND_TALKBACK_AUDIO_DATA["messageId"]: - # Avoid spamming of TALKBACK_AUDIO_DATA logs - print(f"on_message result: {payload}") - sys.stdout.flush() - if message_id == START_LISTENING_MESSAGE["messageId"]: - message_result = payload[message_type] - states = message_result["state"] - for state in states["devices"]: - self.serialno = state["serialNumber"] - self.video_thread = ClientAcceptThread(video_sock, run_event, "Video", self.ws, self.serialno) - self.audio_thread = ClientAcceptThread(audio_sock, run_event, "Audio", self.ws, self.serialno) - self.backchannel_thread = ClientAcceptThread(backchannel_sock, run_event, "BackChannel", self.ws, self.serialno) - self.audio_thread.start() - self.video_thread.start() - self.backchannel_thread.start() - if message_id == TALKBACK_RESULT_MESSAGE["messageId"] and "errorCode" in payload: - error_code = payload["errorCode"] - if error_code == "device_talkback_not_running": - msg = START_TALKBACK.copy() - msg["serialNumber"] = self.serialno - await self.ws.send_message(json.dumps(msg)) - - if message_type == "event": - message = payload[message_type] - event_type = message["event"] - sys.stdout.flush() - if message["event"] == "livestream audio data": - #print(f"on_audio - {payload}") - event_value = message[EVENT_CONFIGURATION[event_type]["value"]] - event_data_type = EVENT_CONFIGURATION[event_type]["type"] - if event_data_type == "event": - for queue in self.audio_thread.queues: +async def on_open(): + """Callback when the websocket is opened.""" + logMessage(f" on_open - executed") + + +async def on_close(): + """Callback when the websocket is closed.""" + logMessage(f" on_close - executed") + run_event.set() + # Close all camera handlers. + for handler in camera_handlers.values(): + handler.stop() + os._exit(-1) + + +async def on_error(message): + """Callback when an error occurs.""" + logMessage(f" on_error - executed - {message}") + + +async def on_message(message): + """Callback when a message is received.""" + payload = message.json() + message_type: str = payload["type"] + sys.stdout.flush() + if message_type == "result": + message_id = payload["messageId"] + if message_id != SEND_TALKBACK_AUDIO_DATA["messageId"]: + # Avoid spamming of TALKBACK_AUDIO_DATA logs + logMessage(f"on_message result: {payload}") + + if message_id == START_LISTENING_MESSAGE["messageId"]: + message_result = payload[message_type] + states = message_result["state"] + for state in states["devices"]: + serialno = state["serialNumber"] + if serialno in camera_handlers: + camera_handlers[serialno].start_stream() + logMessage(f"Started stream for camera {serialno}.", True) + else: + logMessage( + f"Found unknown Eufy camera with serial number {serialno}.", + True, + ) + elif ( + message_id == TALKBACK_RESULT_MESSAGE["messageId"] + and "errorCode" in payload + ): + # TODO: Handle error codes with muliple cameras. This one is tricky since + # the error code is not specific to a camera. Alternatives: 1) Send the + # START_TALKBACK message again for all cameras. 2) Somehow determine which + # camera caused the error and send the START_TALKBACK message for that camera. + # Don't know if 2) is possible. + # 1) is not ideal. It seems to break streaming completely. + # error_code = payload["errorCode"] + # if error_code == "device_talkback_not_running": + # for handler in camera_handlers.values(): + # handler.start_talkback() + pass + elif message_type == "event": + message = payload[message_type] + event_type = message["event"] + sys.stdout.flush() + if message["event"] == "livestream audio data": + event_value = message[EVENT_CONFIGURATION[event_type]["value"]] + event_data_type = EVENT_CONFIGURATION[event_type]["type"] + if event_data_type == "event": + serialno = message["serialNumber"] + if serialno in camera_handlers: + for queue in camera_handlers[serialno].audio_thread.queues: if queue.full(): - print("Audio queue full.") + logMessage(f"Audio queue full.") queue.get(False) queue.put(event_value) - if message["event"] == "livestream video data": - #print(f"on_video - {payload}") - event_value = message[EVENT_CONFIGURATION[event_type]["value"]] - event_data_type = EVENT_CONFIGURATION[event_type]["type"] - if event_data_type == "event": - for queue in self.video_thread.queues: + elif message["event"] == "livestream video data": + event_value = message[EVENT_CONFIGURATION[event_type]["value"]] + event_data_type = EVENT_CONFIGURATION[event_type]["type"] + if event_data_type == "event": + serialno = message["serialNumber"] + if serialno in camera_handlers: + for queue in camera_handlers[serialno].video_thread.queues: if queue.full(): - print("Video queue full.") + logMessage(f"Video queue full.") queue.get(False) queue.put(event_value) - if message["event"] == "livestream error": - print("Livestream Error!") - if self.ws and len(self.video_thread.queues) > 0: - msg = START_P2P_LIVESTREAM_MESSAGE.copy() - msg["serialNumber"] = self.serialno - await self.ws.send_message(json.dumps(msg)) - -# Websocket connector -c = Connector(run_event) - -async def init_websocket(): - ws: EufySecurityWebSocket = EufySecurityWebSocket( + elif message["event"] == "livestream error": + logMessage(f"Livestream Error! - {message}") + # TODO: Handle error codes with muliple cameras. + # if self.ws and len(self.video_thread.queues) > 0: + # msg = START_P2P_LIVESTREAM_MESSAGE.copy() + # msg["serialNumber"] = self.serialno + # await self.ws.send_message(json.dumps(msg)) + else: + logMessage(f"Unknown event type: {message['event']}") + logMessage(f"{message}") + else: + logMessage(f"Unknown message type: {message_type}") + logMessage(f"{message}") + + +async def init_websocket(ws_security_port): + """Initialize the websocket.""" + websocket = EufySecurityWebSocket( "402f1039-eufy-security-ws", - sys.argv[1], - aiohttp.ClientSession(), - c.on_open, - c.on_message, - c.on_close, - c.on_error, + ws_security_port, + ClientSession(), + on_open, + on_message, + on_close, + on_error, ) - c.setWs(ws) + # Set the websocket for all camera handlers. + for handler in camera_handlers.values(): + handler.setWs(websocket) + try: - await ws.connect() - await ws.send_message(json.dumps(START_LISTENING_MESSAGE)) - await ws.send_message(json.dumps(SET_API_SCHEMA)) - await ws.send_message(json.dumps(DRIVER_CONNECT_MESSAGE)) + await websocket.connect() + await websocket.send_message(json.dumps(START_LISTENING_MESSAGE)) + await websocket.send_message(json.dumps(SET_API_SCHEMA)) + await websocket.send_message(json.dumps(DRIVER_CONNECT_MESSAGE)) while not run_event.is_set(): await asyncio.sleep(1000) except Exception as ex: - print(ex) - print("init_websocket failed. Exiting.") + logMessage(ex) + logMessage(f"init_websocket failed. Exiting.") os._exit(-1) -loop = asyncio.get_event_loop() -loop.run_until_complete(init_websocket()) +if __name__ == "__main__": + """Main entry point.""" + # Parse command-line arguments + parser = argparse.ArgumentParser( + description="Stream video and audio from multiple Eufy cameras." + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug mode (default: disabled).", + ) + parser.add_argument( + "--camera_serials", + nargs="+", + required=True, + help="List of camera serial numbers (e.g., --camera_serials CAM1_SERIAL CAM2_SERIAL).", + ) + parser.add_argument( + "--ws_security_port", + type=int, + default=3000, + help="Base port number for streaming (default: 3000).", + ) + args = parser.parse_args() + debug = args.debug + print(f"Debug: {debug}") + sys.stdout.flush() + logMessage(f"WS Security Port: {args.ws_security_port}") + + # Define constants. + BASE_PORT = 63336 + # Create one Camera Stream Handler per camera. + for i, serial in enumerate(args.camera_serials): + if serial != "null": + logMessage(f"Creating CameraStreamHandler for camera: {serial}") + handler = CameraStreamHandler(serial, BASE_PORT + i * 3, run_event) + # handler.setup_sockets() + camera_handlers[serial] = handler + + logMessage(f"Starting websocket.") + # Loop forever. + loop = asyncio.get_event_loop() + loop.run_until_complete(init_websocket(args.ws_security_port)) diff --git a/files/run.sh b/files/run.sh index 83cd891..4c5e6ae 100755 --- a/files/run.sh +++ b/files/run.sh @@ -1,9 +1,33 @@ -#!/bin/bash +#!/usr/bin/with-contenv bashio +export PYTHONUNBUFFERED=1 set +u CONFIG_PATH=/data/options.json +echo "Starting EufyP2PStream" + EUFY_WS_PORT=$(jq --raw-output ".eufy_security_ws_port" $CONFIG_PATH) +CAM1_SN=$(jq --raw-output ".camera_1_serial_number" $CONFIG_PATH) +CAM2_SN=$(jq --raw-output ".camera_2_serial_number" $CONFIG_PATH) +CAM3_SN=$(jq --raw-output ".camera_3_serial_number" $CONFIG_PATH) +CAM4_SN=$(jq --raw-output ".camera_4_serial_number" $CONFIG_PATH) +CAM5_SN=$(jq --raw-output ".camera_5_serial_number" $CONFIG_PATH) +CAM6_SN=$(jq --raw-output ".camera_6_serial_number" $CONFIG_PATH) +CAM7_SN=$(jq --raw-output ".camera_7_serial_number" $CONFIG_PATH) +CAM8_SN=$(jq --raw-output ".camera_8_serial_number" $CONFIG_PATH) +CAM9_SN=$(jq --raw-output ".camera_9_serial_number" $CONFIG_PATH) +CAM10_SN=$(jq --raw-output ".camera_10_serial_number" $CONFIG_PATH) +CAM11_SN=$(jq --raw-output ".camera_11_serial_number" $CONFIG_PATH) +CAM12_SN=$(jq --raw-output ".camera_12_serial_number" $CONFIG_PATH) +CAM13_SN=$(jq --raw-output ".camera_13_serial_number" $CONFIG_PATH) +CAM14_SN=$(jq --raw-output ".camera_14_serial_number" $CONFIG_PATH) +CAM15_SN=$(jq --raw-output ".camera_15_serial_number" $CONFIG_PATH) +DEBUG_LOG=$(jq --raw-output ".debug_log" $CONFIG_PATH) +if [ "$DEBUG_LOG" == "true" ]; then + DEBUG_LOG="--debug" +else + DEBUG_LOG="" +fi echo "Starting EufyP2PStream. eufy_security_ws_port is $EUFY_WS_PORT" -python3 -u /eufyp2pstream.py $EUFY_WS_PORT +python3 -u /eufyp2pstream.py $DEBUG_LOG --ws_security_port $EUFY_WS_PORT --camera_serials $CAM1_SN $CAM2_SN $CAM3_SN $CAM4_SN $CAM5_SN $CAM6_SN $CAM7_SN $CAM8_SN $CAM9_SN $CAM10_SN $CAM11_SN $CAM12_SN $CAM13_SN $CAM14_SN $CAM15_SN echo "Exited with code $?" \ No newline at end of file diff --git a/files/websocket.py b/files/websocket.py index 9341a4e..1dc2790 100644 --- a/files/websocket.py +++ b/files/websocket.py @@ -73,13 +73,11 @@ def on_error(self, error: Text = "Unspecified") -> None: def on_close(self, future="") -> None: print(f" - WebSocket Connection Closed. %s", future) - print( - f" - WebSocket Connection Closed. %s", self.close_callback - ) + print(f" - WebSocket Connection Closed. %s", self.close_callback) if self.close_callback is not None: self.ws = None asyncio.run_coroutine_threadsafe(self.close_callback(), self.loop) async def send_message(self, message): - #print(f" - WebSocket message sent. %s", message) + # print(f" - WebSocket message sent. %s", message) await self.ws.send_str(message)