Skip to content

Commit

Permalink
python formatter
Browse files Browse the repository at this point in the history
fix some flake8 warnings and typos
add new pytests to cover changed lines

Signed-off-by: Rosi2143 <[email protected]>
  • Loading branch information
Rosi2143 committed Nov 19, 2023
1 parent daea3cf commit 2d8219e
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 64 deletions.
4 changes: 2 additions & 2 deletions blinkpy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ async def request_camera_sensors(blink, network, camera_id):
:param blink: Blink instance.
:param network: Sync module network id.
:param camera_id: Camera ID of camera to request sesnor info from.
:param camera_id: Camera ID of camera to request sensor info from.
"""
url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/signals"
return await http_get(blink, url)
Expand Down Expand Up @@ -476,7 +476,7 @@ async def http_get(
async def http_post(blink, url, is_retry=False, data=None, json=True, timeout=TIMEOUT):
"""Perform an http post request.
:param url: URL to perfom post request.
:param url: URL to perform post request.
:param is_retry: Is this part of a re-auth attempt?
:param data: str body for post request
:param json: Return json response? TRUE/False
Expand Down
6 changes: 3 additions & 3 deletions blinkpy/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, login_data=None, no_prompt=False, session=None):
- username
- password
:param no_prompt: Should any user input prompts
be supressed? True/FALSE
be suppressed? True/FALSE
"""
if login_data is None:
login_data = {}
Expand Down Expand Up @@ -152,8 +152,8 @@ async def query(
is_retry=False,
timeout=TIMEOUT,
):
"""Perform server requests."""
"""
"""Perform server requests.
:param url: URL to perform request
:param data: Data to send
:param headers: Headers to send
Expand Down
13 changes: 8 additions & 5 deletions blinkpy/blinkpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
Useful for preventing motion_detected property
from de-asserting too quickly.
:param no_owls: Disable searching for owl entries (blink mini cameras \
only known entity). Prevents an uneccessary API call \
only known entity). Prevents an unnecessary API call \
if you don't have these in your network.
"""
self.auth = Auth(session=session)
Expand Down Expand Up @@ -101,7 +101,7 @@ async def refresh(self, force=False, force_cache=False):
# Prevents rapid clearing of motion detect property
self.last_refresh = int(time.time())
last_refresh = datetime.datetime.fromtimestamp(self.last_refresh)
_LOGGER.debug(f"last_refresh={last_refresh}")
_LOGGER.debug("last_refresh = %s", last_refresh)

return True
return False
Expand All @@ -128,8 +128,9 @@ async def start(self):
# Initialize last_refresh to be just before the refresh delay period.
self.last_refresh = int(time.time() - self.refresh_rate * 1.05)
_LOGGER.debug(
f"Initialized last_refresh to {self.last_refresh} == "
f"{datetime.datetime.fromtimestamp(self.last_refresh)}"
"Initialized last_refresh to %s == %s",
self.last_refresh,
datetime.datetime.fromtimestamp(self.last_refresh),
)

return await self.setup_post_verify()
Expand Down Expand Up @@ -167,12 +168,13 @@ async def setup_sync_module(self, name, network_id, cameras):
await self.sync[name].start()

async def get_homescreen(self):
"""Get homecreen information."""
"""Get homescreen information."""
if self.no_owls:
_LOGGER.debug("Skipping owl extraction.")
self.homescreen = {}
return
self.homescreen = await api.request_homescreen(self)
_LOGGER.debug("homescreen = %s", util.json_dumps(self.homescreen))

async def setup_owls(self):
"""Check for mini cameras."""
Expand Down Expand Up @@ -234,6 +236,7 @@ async def setup_camera_list(self):
response = await api.request_camera_usage(self)
try:
for network in response["networks"]:
_LOGGER.info("network = %s", util.json_dumps(network))
camera_network = str(network["network_id"])
if camera_network not in all_cameras:
all_cameras[camera_network] = []
Expand Down
69 changes: 42 additions & 27 deletions blinkpy/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class BlinkCamera:
"""Class to initialize individual camera."""

def __init__(self, sync):
"""Initiailize BlinkCamera."""
"""Initialize BlinkCamera."""
self.sync = sync
self.name = None
self.camera_id = None
Expand Down Expand Up @@ -76,7 +76,7 @@ def battery(self):

@property
def temperature_c(self):
"""Return temperature in celcius."""
"""Return temperature in celsius."""
try:
return round((self.temperature - 32) / 9.0 * 5.0, 1)
except TypeError:
Expand Down Expand Up @@ -170,7 +170,7 @@ async def get_thumbnail(self, url=None):
if not url:
url = self.thumbnail
if not url:
_LOGGER.warning(f"Thumbnail URL not available: self.thumbnail={url}")
_LOGGER.warning("Thumbnail URL not available: self.thumbnail=%s", url)
return None
return await api.http_get(
self.sync.blink,
Expand All @@ -185,7 +185,7 @@ async def get_video_clip(self, url=None):
if not url:
url = self.clip
if not url:
_LOGGER.warning(f"Video clip URL not available: self.clip={url}")
_LOGGER.warning("Video clip URL not available: self.clip=%s", url)
return None
return await api.http_get(
self.sync.blink,
Expand Down Expand Up @@ -240,15 +240,22 @@ def extract_config_info(self, config):
self.product_type = config.get("type", None)

async def get_sensor_info(self):
"""Retrieve calibrated temperatue from special endpoint."""
"""Retrieve calibrated temperature from special endpoint."""
resp = await api.request_camera_sensors(
self.sync.blink, self.network_id, self.camera_id
)
try:
self.temperature_calibrated = resp["temp"]
except (TypeError, KeyError):
self.temperature_calibrated = self.temperature
_LOGGER.warning("Could not retrieve calibrated temperature.")
_LOGGER.warning(
"Could not retrieve calibrated temperature response %s.", resp
)
_LOGGER.warning(
"for network_id (%s) and camera_id (%s)",
self.network_id,
self.camera_id,
)

async def update_images(self, config, force_cache=False, expire_clips=True):
"""Update images for camera."""
Expand Down Expand Up @@ -278,7 +285,7 @@ async def update_images(self, config, force_cache=False, expire_clips=True):
new_thumbnail = urljoin(self.sync.urls.base_url, thumb_string)

else:
_LOGGER.warning("Could not find thumbnail for camera %s", self.name)
_LOGGER.warning("Could not find thumbnail for camera %s.", self.name)

try:
self.motion_detected = self.sync.motion[self.name]
Expand All @@ -288,7 +295,7 @@ async def update_images(self, config, force_cache=False, expire_clips=True):
clip_addr = None
try:

def timest(record):
def timesort(record):
rec_time = record["time"]
iso_time = datetime.datetime.fromisoformat(rec_time)
stamp = int(iso_time.timestamp())
Expand All @@ -298,7 +305,7 @@ def timest(record):
len(self.sync.last_records) > 0
and len(self.sync.last_records[self.name]) > 0
):
last_records = sorted(self.sync.last_records[self.name], key=timest)
last_records = sorted(self.sync.last_records[self.name], key=timesort)
for rec in last_records:
clip_addr = rec["clip"]
self.clip = f"{self.sync.urls.base_url}{clip_addr}"
Expand All @@ -310,17 +317,21 @@ def timest(record):
self.recent_clips.append(recent)
if len(self.recent_clips) > 0:
_LOGGER.debug(
f"Found {len(self.recent_clips)} recent clips for {self.name}"
"Found %s recent clips for %s",
len(self.recent_clips),
self.name,
)
_LOGGER.debug(
f"Most recent clip for {self.name} was created at "
f"{self.last_record}: {self.clip}"
"Most recent clip for %s was created at %s : %s",
self.name,
self.last_record,
self.clip,
)
except (KeyError, IndexError):
ex = traceback.format_exc()
trace = "".join(traceback.format_stack())
_LOGGER.error(f"Error getting last records for '{self.name}': {ex}")
_LOGGER.debug(f"\n{trace}")
_LOGGER.error("Error getting last records for '%s': %s", self.name, ex)
_LOGGER.debug("\n%s", trace)

# If the thumbnail or clip have changed, update the cache
update_cached_image = False
Expand Down Expand Up @@ -356,20 +367,21 @@ async def expire_recent_clips(self, delta=datetime.timedelta(hours=1)):
to_keep.append(clip)
num_expired = len(self.recent_clips) - len(to_keep)
if num_expired > 0:
_LOGGER.info(f"Expired {num_expired} clips from '{self.name}'")
_LOGGER.info("Expired %s clips from '%s'", num_expired, self.name)
self.recent_clips = copy.deepcopy(to_keep)
if len(self.recent_clips) > 0:
_LOGGER.info(
f"'{self.name}' has {len(self.recent_clips)} "
"clips available for download"
"'%s' has %s clips available for download",
self.name,
len(self.recent_clips),
)
for clip in self.recent_clips:
url = clip["clip"]
if "local_storage" in url:
await api.http_post(self.sync.blink, url)

async def get_liveview(self):
"""Get livewview rtsps link."""
"""Get liveview rtsps link."""
response = await api.request_camera_liveview(
self.sync.blink, self.sync.network_id, self.camera_id
)
Expand All @@ -384,8 +396,8 @@ async def image_to_file(self, path):
_LOGGER.debug("Writing image from %s to %s", self.name, path)
response = await self.get_media()
if response and response.status == 200:
async with open(path, "wb") as imgfile:
await imgfile.write(await response.read())
async with open(path, "wb") as imagefile:
await imagefile.write(await response.read())
else:
_LOGGER.error("Cannot write image to file, response %s", response.status)

Expand Down Expand Up @@ -425,7 +437,7 @@ async def save_recent_clips(
created=created_at, name=to_alphanumeric(self.name)
)
path = os.path.join(output_dir, file_name)
_LOGGER.debug(f"Saving {clip_addr} to {path}")
_LOGGER.debug("Saving %s to %s", clip_addr, path)
media = await self.get_video_clip(clip_addr)
if media and media.status == 200:
async with open(path, "wb") as clip_file:
Expand All @@ -434,19 +446,22 @@ async def save_recent_clips(
try:
# Remove recent clip from the list once the download has finished.
self.recent_clips.remove(clip)
_LOGGER.debug(f"Removed {clip} from recent clips")
_LOGGER.debug("Removed %s from recent clips", clip)
except ValueError:
ex = traceback.format_exc()
_LOGGER.error(f"Error removing clip from list: {ex}")
_LOGGER.error("Error removing clip from list: %s", ex)
trace = "".join(traceback.format_stack())
_LOGGER.debug(f"\n{trace}")
_LOGGER.debug("\n%s", trace)

if len(recent) == 0:
_LOGGER.info(f"No recent clips to save for '{self.name}'.")
_LOGGER.info("No recent clips to save for '%s'.", self.name)
else:
_LOGGER.info(
f"Saved {num_saved} of {len(recent)} recent clips from "
f"'{self.name}' to directory {output_dir}"
"Saved %s of %s recent clips from '%s' to directory %s",
num_saved,
len(recent),
self.name,
output_dir,
)


Expand Down
7 changes: 6 additions & 1 deletion blinkpy/helpers/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ async def json_save(data, file_name):
await json_file.write(json.dumps(data, indent=4))


def json_dumps(json_in, indent=2):
"""Return a well formated json string."""
return json.dumps(json_in, indent=indent)


def gen_uid(size, uid_format=False):
"""Create a random sring."""
"""Create a random string."""
if uid_format:
token = (
f"BlinkCamera_{secrets.token_hex(4)}-"
Expand Down
Loading

0 comments on commit 2d8219e

Please sign in to comment.