Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added condition when downloading record for notification with stream_status finished #102

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion pytapo/media_stream/downloader.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from pytapo.media_stream.convert import Convert
from pytapo import Tapo
from datetime import datetime
from json import JSONDecodeError

import json
import os
import hashlib


class Downloader:
FRESH_RECORDING_TIME_SECONDS = 60

Expand Down Expand Up @@ -172,6 +172,31 @@ async def download(self, retry=False):
convert.save(fileName, segmentLength)
downloading = False
break
# in case a finished stream notification is caught, save the chunks as is
elif resp.mimetype == "application/json":
try:
json_data = json.loads(resp.plaintext.decode())

if ("type" in json_data
and json_data["type"] == "notification"
and "params" in json_data
and "event_type" in json_data["params"]
and json_data["params"]["event_type"] == "stream_status"
and "status" in json_data["params"]
and json_data["params"]["status"] == "finished"):
downloadedFull = True
currentAction = "Converting"
yield {
"currentAction": currentAction,
"fileName": fileName,
"progress": 0,
"total": 0,
}
convert.save(fileName, convert.getLength())
downloading = False
break
except JSONDecodeError:
self.tapo.debugLog("Unable to parse JSON sent from device")
if downloading:
# Handle case where camera randomly stopped respoding
if not downloadedFull and not retry:
Expand Down
52 changes: 19 additions & 33 deletions pytapo/media_stream/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,9 @@ async def _device_response_handler_loop(self):

logger.debug("Handling new server response")

# print("got response")

# Read and parse headers
headers_block = await self._reader.readuntil(b"\r\n\r\n")
headers = parse_http_headers(headers_block)
# print(headers)

mimetype = headers["Content-Type"]
length = int(headers["Content-Length"])
encrypted = bool(int(headers["X-If-Encrypt"]))
Expand All @@ -252,26 +248,14 @@ async def _device_response_handler_loop(self):
if "X-Data-Sequence" in headers:
seq = int(headers["X-Data-Sequence"])

# print(headers)

# Now we know the content length, let's read it and decrypt it
json_data = None
# print("TEST0")
data = await self._reader.readexactly(length)
if encrypted:
# print("encrypted")
ciphertext = data
# print("TEST1")
try:
# print("lolo")
# print(ciphertext)
plaintext = self._aes.decrypt(ciphertext)
# if length == 384:
# print(plaintext)
# print("lala")
# print(plaintext)
except ValueError as e:
# print(e)
if "padding is incorrect" in e.args[0].lower():
e = ValueError(
e.args[0]
Expand All @@ -282,26 +266,36 @@ async def _device_response_handler_loop(self):
except Exception as e:
plaintext = e
else:
# print("plaintext")
ciphertext = None
plaintext = data
# print(plaintext)

queue: Optional[Queue] = None

# JSON responses sometimes have the above info in the payload,
# not the headers. Let's parse it.
if mimetype == "application/json":
try:
json_data = json.loads(plaintext.decode())
if "seq" in json_data:
# print("Setting seq")
seq = json_data["seq"]
if "params" in json_data and "session_id" in json_data["params"]:
session = int(json_data["params"]["session_id"])
# print("Setting session")
elif ("type" in json_data
and json_data["type"] == "notification"
and "params" in json_data
and "event_type" in json_data["params"]
and json_data["params"]["event_type"] == "stream_status"
and "status" in json_data["params"]
and json_data["params"]["status"] == "finished"
and len(self._sessions) > 0):
# use next queue item to inject this info, since no id session can be inferred
queue = next(iter(self._sessions.values()))
except JSONDecodeError:
logger.warning("Unable to parse JSON sent from device")

if (
(session is None)
(queue is None)
and (session is None)
and (seq is None)
or (
(session is not None)
Expand All @@ -316,15 +310,10 @@ async def _device_response_handler_loop(self):
)
continue

# # Update our own sequence numbers to avoid collisions
# if (seq is not None) and (seq > self._seq_counter):
# self._seq_counter = seq + 1

queue: Optional[Queue] = None

# Move queue to use sessions from now on
if (
(session is not None)
(queue is None)
and (session is not None)
and (seq is not None)
and (session not in self._sessions)
and (seq in self._sequence_numbers)
Expand All @@ -350,7 +339,6 @@ async def _device_response_handler_loop(self):
)

if seq is not None and seq % self.window_size == 0: # never ack live stream
# print("sending ack")
data = {
"type": "notification",
"params": {"event_type": "stream_sequence"},
Expand All @@ -367,7 +355,6 @@ async def _device_response_handler_loop(self):
await self._send_http_request(b"--" + self.client_boundary, headers)
chunk_size = 4096
for i in range(0, len(data), chunk_size):
# print(data[i : i + chunk_size])
self._writer.write(data[i : i + chunk_size])
await self._writer.drain()

Expand Down Expand Up @@ -455,9 +442,8 @@ async def transceive(
await self._send_http_request(b"--" + self.client_boundary, headers)

chunk_size = 4096
# print("Sending:")

for i in range(0, len(data), chunk_size):
# print(data[i : i + chunk_size])
self._writer.write(data[i : i + chunk_size])
await self._writer.drain()

Expand Down Expand Up @@ -508,7 +494,7 @@ async def transceive(
session = resp.session
if resp.encrypted and isinstance(resp.plaintext, Exception):
raise resp.plaintext
# print(resp.plaintext)

tsReader.setBuffer(list(resp.plaintext))
pkt = tsReader.getPacket()
if pkt:
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ deps =
requests
urllib3
pycryptodome
rtp
commands =
pytest --ignore=pytapo/media_stream --cov=pytapo --cov-report html --cov-report term
coverage report --fail-under=100