diff --git a/pytapo/media_stream/downloader.py b/pytapo/media_stream/downloader.py index 8ee4608..7d50c8f 100644 --- a/pytapo/media_stream/downloader.py +++ b/pytapo/media_stream/downloader.py @@ -1,12 +1,12 @@ from pytapo.media_stream.convert import Convert from pytapo import Tapo from datetime import datetime +from json import JSONDecodeError import json import os import hashlib - class Downloader: FRESH_RECORDING_TIME_SECONDS = 60 @@ -172,6 +172,31 @@ async def download(self, retry=False): convert.save(fileName, segmentLength) downloading = False break + # in case a finished stream notification is caught, save the chunks as is + elif resp.mimetype == "application/json": + try: + json_data = json.loads(resp.plaintext.decode()) + + if ("type" in json_data + and json_data["type"] == "notification" + and "params" in json_data + and "event_type" in json_data["params"] + and json_data["params"]["event_type"] == "stream_status" + and "status" in json_data["params"] + and json_data["params"]["status"] == "finished"): + downloadedFull = True + currentAction = "Converting" + yield { + "currentAction": currentAction, + "fileName": fileName, + "progress": 0, + "total": 0, + } + convert.save(fileName, convert.getLength()) + downloading = False + break + except JSONDecodeError: + self.tapo.debugLog("Unable to parse JSON sent from device") if downloading: # Handle case where camera randomly stopped respoding if not downloadedFull and not retry: diff --git a/pytapo/media_stream/session.py b/pytapo/media_stream/session.py index 84091d1..8f13153 100644 --- a/pytapo/media_stream/session.py +++ b/pytapo/media_stream/session.py @@ -236,13 +236,9 @@ async def _device_response_handler_loop(self): logger.debug("Handling new server response") - # print("got response") - # Read and parse headers headers_block = await self._reader.readuntil(b"\r\n\r\n") headers = parse_http_headers(headers_block) - # print(headers) - mimetype = headers["Content-Type"] length = int(headers["Content-Length"]) encrypted = bool(int(headers["X-If-Encrypt"])) @@ -252,26 +248,14 @@ async def _device_response_handler_loop(self): if "X-Data-Sequence" in headers: seq = int(headers["X-Data-Sequence"]) - # print(headers) - # Now we know the content length, let's read it and decrypt it json_data = None - # print("TEST0") data = await self._reader.readexactly(length) if encrypted: - # print("encrypted") ciphertext = data - # print("TEST1") try: - # print("lolo") - # print(ciphertext) plaintext = self._aes.decrypt(ciphertext) - # if length == 384: - # print(plaintext) - # print("lala") - # print(plaintext) except ValueError as e: - # print(e) if "padding is incorrect" in e.args[0].lower(): e = ValueError( e.args[0] @@ -282,26 +266,36 @@ async def _device_response_handler_loop(self): except Exception as e: plaintext = e else: - # print("plaintext") ciphertext = None plaintext = data - # print(plaintext) + + queue: Optional[Queue] = None + # JSON responses sometimes have the above info in the payload, # not the headers. Let's parse it. if mimetype == "application/json": try: json_data = json.loads(plaintext.decode()) if "seq" in json_data: - # print("Setting seq") seq = json_data["seq"] if "params" in json_data and "session_id" in json_data["params"]: session = int(json_data["params"]["session_id"]) - # print("Setting session") + elif ("type" in json_data + and json_data["type"] == "notification" + and "params" in json_data + and "event_type" in json_data["params"] + and json_data["params"]["event_type"] == "stream_status" + and "status" in json_data["params"] + and json_data["params"]["status"] == "finished" + and len(self._sessions) > 0): + # use next queue item to inject this info, since no id session can be inferred + queue = next(iter(self._sessions.values())) except JSONDecodeError: logger.warning("Unable to parse JSON sent from device") if ( - (session is None) + (queue is None) + and (session is None) and (seq is None) or ( (session is not None) @@ -316,15 +310,10 @@ async def _device_response_handler_loop(self): ) continue - # # Update our own sequence numbers to avoid collisions - # if (seq is not None) and (seq > self._seq_counter): - # self._seq_counter = seq + 1 - - queue: Optional[Queue] = None - # Move queue to use sessions from now on if ( - (session is not None) + (queue is None) + and (session is not None) and (seq is not None) and (session not in self._sessions) and (seq in self._sequence_numbers) @@ -350,7 +339,6 @@ async def _device_response_handler_loop(self): ) if seq is not None and seq % self.window_size == 0: # never ack live stream - # print("sending ack") data = { "type": "notification", "params": {"event_type": "stream_sequence"}, @@ -367,7 +355,6 @@ async def _device_response_handler_loop(self): await self._send_http_request(b"--" + self.client_boundary, headers) chunk_size = 4096 for i in range(0, len(data), chunk_size): - # print(data[i : i + chunk_size]) self._writer.write(data[i : i + chunk_size]) await self._writer.drain() @@ -455,9 +442,8 @@ async def transceive( await self._send_http_request(b"--" + self.client_boundary, headers) chunk_size = 4096 - # print("Sending:") + for i in range(0, len(data), chunk_size): - # print(data[i : i + chunk_size]) self._writer.write(data[i : i + chunk_size]) await self._writer.drain() @@ -508,7 +494,7 @@ async def transceive( session = resp.session if resp.encrypted and isinstance(resp.plaintext, Exception): raise resp.plaintext - # print(resp.plaintext) + tsReader.setBuffer(list(resp.plaintext)) pkt = tsReader.getPacket() if pkt: diff --git a/tox.ini b/tox.ini index 63623fb..a0e5a4b 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,7 @@ deps = requests urllib3 pycryptodome + rtp commands = pytest --ignore=pytapo/media_stream --cov=pytapo --cov-report html --cov-report term coverage report --fail-under=100 \ No newline at end of file