diff --git a/.gitignore b/.gitignore index b35c2f08..5eabca97 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .tox/* __pycache__/* htmlcov/* +.coverage .coverage.* coverage.xml *.pyc @@ -12,3 +13,4 @@ dist/* build/* docs/_build *.log +venv diff --git a/CHANGES.rst b/CHANGES.rst index fada7c67..310b2337 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,15 +1,40 @@ +========= Changelog ------------ +========= A list of changes between each release +0.16.0 (2020-07-20) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Breaking Changes:** + +- Add arm property to camera, deprecate motion enable method (`#273 `__) +- Complete refactoring of auth logic (breaks all pre-0.16.0 setups!) (`#261 `__) + +**New Features:** + +- Add is_errored property to Auth class (`#275 `__) +- Add new endpoint to get user infor (`#280 `__) +- Add get_liveview command to camera module (`#289 `__) +- Add blink Mini Camera support (`#290 `__) +- Add option to skip homescreen check (`#305 `__) +- Add different timeout for video and image retrieval (`#323 `__) +- Modifiy session to use HTTPAdapter and handle retries (`#324 `__) +- Add retry option overrides (`#339 `__) + +**All changes:** + +Please see the change list in the (`Release Notes `__) + + 0.15.1 (2020-07-11) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Bugfix: remove "Host" from auth header (`#330 `__) 0.15.0 (2020-05-08) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Breaking Changes:** - Removed support for Python 3.5 (3.6 is now the minimum supported version) @@ -34,7 +59,7 @@ This can be used by instantiating the Blink class with the ``device_id`` paramet - Fix typos (`#244 `__) 0.14.3 (2020-04-22) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Add time check on recorded videos before determining motion - Fix motion detection variable suck to ``True`` - Add ability to load credentials from a json file @@ -42,18 +67,18 @@ This can be used by instantiating the Blink class with the ``device_id`` paramet - Log response message from server if not attempting a re-authorization 0.14.2 (2019-10-12) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Update dependencies - Dockerize `(@3ch01c __)` 0.14.1 (2019-06-20) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Fix timeout problems blocking blinkpy startup - Updated login urls using ``rest-region`` subdomain - Removed deprecated thumbanil recovery from homescreen 0.14.0 (2019-05-23) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Breaking Changes:** - ``BlinkCamera.battery`` no longer reports a percentage, instead it returns a string representing the state of the battery. @@ -75,11 +100,11 @@ This can be used by instantiating the Blink class with the ``device_id`` paramet 0.13.1 (2019-03-01) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Remove throttle decorator from network status request 0.13.0 (2019-03-01) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Breaking change:** Wifi status reported in dBm again, instead of bars (which is great). Also, the old ``get_camera_info`` method has changed and requires a ``camera_id`` parameter. @@ -104,11 +129,11 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the 0.12.1 (2019-01-31) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Remove logging improvements since they were incompatible with home-assistant logging 0.12.0 (2019-01-31) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Fix video api endpoint, re-enables motion detection - Add improved logging capability - Add download video method @@ -116,22 +141,22 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the 0.11.2 (2019-01-23) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Hotfix to prevent platform from stalling due to API change - Motion detection and video recovery broken until new API endpoint discovered 0.11.1 (2019-01-02) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Fixed incorrect backup login url - Added calibrated temperature property for cameras 0.11.0 (2018-11-23) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Added support for multiple sync modules 0.10.3 (2018-11-18) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Use networks endpoint rather than homecreen to retrieve arm/disarm status (`@md-reddevil `__) - Fix incorrect command status endpoint (`@md-reddevil `__) - Add extra debug logging @@ -139,18 +164,18 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the 0.10.2 (2018-10-30) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Set minimum required version of the requests library to 2.20.0 due to vulnerability in earlier releases. - When multiple networks detected, changed log level to ``warning`` from ``error`` 0.10.1 (2018-10-18) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Fix re-authorization bug (fixes `#101 `__) - Log an error if saving video that doesn't exist 0.10.0 (2018-10-16) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Moved all API calls to own module for easier maintainability - Added network ids to sync module and cameras to allow for multi-network use - Removed dependency on video existance prior to camera setup (fixes `#93 `__) @@ -162,7 +187,7 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the - Check if retrieved clip is "None" prior to storing in cache 0.9.0 (2018-09-27) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Complete code refactoring to enable future multi-sync module support - Add image and video caching to the cameras - Add internal throttling of system refresh @@ -173,7 +198,7 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the 0.8.1 (2018-09-24) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Update requirements_test.txt - Update linter versions - Fix pylint warnings @@ -189,18 +214,18 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the - Reset the value every system refresh 0.8.0 (2018-05-21) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Added support for battery voltage level (fixes `#64 `__) - Added motion detection per camera - Added fully accessible camera configuration dict - Added celcius property to camera (fixes `#60 `__) 0.7.1 (2018-05-09) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Fixed pip 10 import issue during setup (`@fronzbot `__) 0.7.0 (2018-02-08) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Fixed style errors for bumped pydocstring and pylint versions - Changed Blink.cameras dictionary to be case-insensitive (fixes `#35 `__) - Changed api endpoint for video extraction (fixes `#35 `__ and `#41 `__) @@ -214,7 +239,7 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the - Added ``attributes`` dictionary to camera object 0.6.0 (2017-05-12) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Removed redundent properties that only called hidden variables - Revised request wrapper function to be more intelligent - Added tests to ensure exceptions are caught and handled (100% coverage!) @@ -222,44 +247,44 @@ Wifi status reported in dBm again, instead of bars (which is great). Also, the - Added battery level string to reduce confusion with the way Blink reports battery level as integer from 0 to 3 0.5.2 (2017-03-12) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Fixed packaging mishap, same as 0.5.0 otherwise 0.5.0 (2017-03-12) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Fixed region handling problem - Added rest.piri subdomain as a backup if region can't be found - Improved the file writing function - Large test coverage increase 0.4.4 (2017-03-06) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Fixed bug where region id was not being set in the header 0.4.3 (2017-03-05) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Changed to bdist_wheel release 0.4.2 (2017-01-28) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Fixed inability to retrieve motion data due to Key Error 0.4.1 (2017-01-27) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Fixed refresh bug (0.3.1 did not actually fix the problem) - Image refresh routine added (per camera) - Dictionary of thumbnails per camera added - Improved test coverage 0.3.1 (2017-01-25) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Fixed refresh bug (Key Error) 0.3.0 (2017-01-25) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Added device id to camera lookup table - Added image to file method 0.2.0 (2017-01-21) -^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~ - Initial release of blinkpy diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index 6ac21bdc..00000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,89 +0,0 @@ -# Contributing to blinkpy - -Everyone is welcome to contribute to blinkpy! The process to get started is described below. - -## Fork the Repository - -You can do this right in github: just click the 'fork' button at the top right. - -## Start Developing - -1. Setup Local Repository - ```shell - $ git clone https://github.com//blinkpy.git - $ cd blinkpy - $ git remote add upstream https://github.com/fronzbot/blinkpy.git - ``` - -2. Create a Local Branch - - First, you will want to create a new branch to hold your changes: - ``git checkout -b `` - - -3. Make changes - - Now you can make changes to your code. It is worthwhile to test your code as you progress (see the **Testing** section) - -4. Commit Your Changes - - To commit changes to your branch, simply add the files you want and the commit them to the branch. After that, you can push to your fork on GitHub: - ```shell - $ git add . - $ git commit -m "Put your commit text here. Please be concise, but descriptive." - $ git push origin HEAD - ``` - -5. Submit your pull request on GitHub - - - On GitHub, navigate to the [blinkpy](https://github.com/fronzbot/blinkpy) repository. - - In the "Branch" menu, choose the branch that contains your commits (from your fork). - - To the right of the Branch menu, click New pull request. - - The base branch dropdown menu should read `dev`. Use the compare branch drop-down menu to choose the branch you made your changes in. - - Type a title and complete the provided description for your pull request. - - Click Create pull request. - - More detailed instructions can be found here: [Creating a Pull Request](https://help.github.com/articles/creating-a-pull-request/) - -6. Prior to merge approval - - Finally, the `blinkpy` repository uses continuous integration tools to run tests prior to merging. If there are any problems, you will see a red 'X' next to your pull request. To see what's wrong, you can find your pull request [here](https://travis- ci.org/fronzbot/blinkpy/pull_requests) and click on the failing test to see the logs. Those logs will indicate, as best as they can, what is causing that test to fail. - - -## Testing - -It is important to test the code to make sure your changes don't break anything major and that they pass PEP8 style conventions. -First, you need to locally install ``tox`` - -```shell -$ pip3 install tox -``` - -You can then run all of the tests with the following command: - -```shell -$ tox -``` - -### Tips - -If you only want to see if you can pass the local tests, you can run `tox -e py35` (or whatever python version you have installed. Only `py35`, `py36`, and `py37` will be accepted). If you just want to check for style violations, you can run `tox -e lint`. Regardless, when you submit a pull request, your code MUST pass both the unit tests, and the linters. - -If you need to change anything in `requirements.txt` for any reason, you'll want to regenerate the virtual envrionments used by `tox` by running with the `-r` flag: `tox -r` - -If you want to run a single test (perhaps you only changed a small thing in one file) you can run `tox -e py35 -- tests/.py -x`. This will run the test `.py` and stop testing upon the first failure, making it easier to figure out why a particular test might be failing. The test structure mimics the library structure, so if you changed something in `sync_module.py`, the associated test file would be in `test_sync_module.py` (ie. the filename is prepended with `test_`. - -# Catching Up With Reality - -If your code is taking a while to develop, you may be behind the ``dev`` branch, in which case you need to catch up before creating your pull-request. To do this you can run ``git rebase`` as follows (running this on your local branch): - -```shell -$ git fetch upstream dev -$ git rebase upstream/dev -``` - -If rebase detects conflicts, repeat the following process until all changes have been resolved: - -1. ``git status`` shows you the filw with a conflict. You will need to edit that file and resolve the lines between ``<<<< | >>>>`. -2. Add the modified file: ``git add `` or ``git add .``. -3. Continue rebase: ``git rebase --continue``. -4. Repeat until all conflicts resolved. diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst new file mode 100644 index 00000000..668c7c21 --- /dev/null +++ b/CONTRIBUTING.rst @@ -0,0 +1,109 @@ +======================== +Contributing to blinkpy +======================== + +Everyone is welcome to contribute to blinkpy! The process to get started is described below. + + +Fork the Repository +------------------- + +You can do this right in github: just click the 'fork' button at the top right. + +Start Developing +----------------- + +1. Setup Local Repository + .. code:: bash + + $ git clone https://github.com//blinkpy.git + $ cd blinkpy + $ git remote add upstream https://github.com/fronzbot/blinkpy.git + +2. Create virtualenv and install dependencies + + .. code:: bash + + $ python -m venv venv + $ source venv/bin/activate + $ pip install -r requirements.txt + $ pip install -r requirements_test.txt + $ pre-commit install + +3. Create a Local Branch + + First, you will want to create a new branch to hold your changes: + ``git checkout -b `` + + +4. Make changes + + Now you can make changes to your code. It is worthwhile to test your code as you progress (see the **Testing** section) + +5. Commit Your Changes + + To commit changes to your branch, simply add the files you want and the commit them to the branch. After that, you can push to your fork on GitHub: + + .. code:: bash + + $ git add . + $ git commit + $ git push origin HEAD + +6. Submit your pull request on GitHub + + - On GitHub, navigate to the `blinkpy `__ repository. + - In the "Branch" menu, choose the branch that contains your commits (from your fork). + - To the right of the Branch menu, click New pull request. + - The base branch dropdown menu should read ``dev``. Use the compare branch drop-down menu to choose the branch you made your changes in. + - Type a title and complete the provided description for your pull request. + - Click Create pull request. + - More detailed instructions can be found here: `Creating a Pull Request` `__ + +7. Prior to merge approval + + Finally, the ``blinkpy`` repository uses continuous integration tools to run tests prior to merging. If there are any problems, you will see a red 'X' next to your pull request. + + +Testing +------- + +It is important to test the code to make sure your changes don't break anything major and that they pass PEP8 style conventions. +First, you need to locally install ``tox`` + +.. code:: bash + + $ pip install tox + + +You can then run all of the tests with the following command: + +.. code:: bash + + $ tox + +**Tips** + +If you only want to see if you can pass the local tests, you can run ``tox -e py37`` (or whatever python version you have installed. Only ``py36``, ``py37``, and ``py38`` will be accepted). If you just want to check for style violations, you can run ``tox -e lint``. Regardless, when you submit a pull request, your code MUST pass both the unit tests, and the linters. + +If you need to change anything in ``requirements.txt`` for any reason, you'll want to regenerate the virtual envrionments used by ``tox`` by running with the ``-r`` flag: ``tox -r`` + +If you want to run a single test (perhaps you only changed a small thing in one file) you can run ``tox -e py37 -- tests/.py -x``. This will run the test ``.py`` and stop testing upon the first failure, making it easier to figure out why a particular test might be failing. The test structure mimics the library structure, so if you changed something in ``sync_module.py``, the associated test file would be in ``test_sync_module.py`` (ie. the filename is prepended with ``test_``. + + +Catching Up With Reality +------------------------- + +If your code is taking a while to develop, you may be behind the ``dev`` branch, in which case you need to catch up before creating your pull-request. To do this you can run ``git rebase`` as follows (running this on your local branch): + +.. code:: bash + + $ git fetch upstream dev + $ git rebase upstream/dev + +If rebase detects conflicts, repeat the following process until all changes have been resolved: + +1. ``git status`` shows you the filw with a conflict. You will need to edit that file and resolve the lines between ``<<<< | >>>>``. +2. Add the modified file: ``git add `` or ``git add .``. +3. Continue rebase: ``git rebase --continue``. +4. Repeat until all conflicts resolved. diff --git a/README.rst b/README.rst index f99e40e7..25e8221a 100644 --- a/README.rst +++ b/README.rst @@ -4,10 +4,9 @@ A Python library for the Blink Camera system (Python 3.6+) Like the library? Consider buying me a cup of coffee! -|Donate| +`Buy me a Coffee! `__ -Disclaimer: -~~~~~~~~~~~~~~~ +**Disclaimer:** Published under the MIT license - See LICENSE file for more details. "Blink Wire-Free HS Home Monitoring & Alert Systems" is a trademark owned by Immedia Inc., see www.blinkforhome.com for more information. @@ -18,11 +17,11 @@ Original protocol hacking by MattTW : https://github.com/MattTW/BlinkMonitorProt API calls faster than 60 seconds is not recommended as it can overwhelm Blink's servers. Please use this module responsibly. Installation -================ -``pip3 install blinkpy`` +------------- +``pip install blinkpy`` Installing Development Version -================================== +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ To install the current development version, perform the following steps. Note that the following will create a blinkpy directory in your home area: .. code:: bash @@ -35,69 +34,96 @@ To install the current development version, perform the following steps. Note t $ pip3 install --upgrade dist/*.whl -If you'd like to contribute to this library, please read the `contributing instructions `__. +If you'd like to contribute to this library, please read the `contributing instructions `__. For more information on how to use this library, please `read the docs `__. Purpose -=========== +------- This library was built with the intention of allowing easy communication with Blink camera systems, specifically to support the `Blink component `__ in `homeassistant `__. Quick Start ============= -The simplest way to use this package from a terminal is to call ``Blink.start()`` which will prompt for your Blink username and password and then log you in. Alternatively, you can instantiate the Blink class with a username and password, and call ``Blink.start()`` to login and setup without prompt, as shown below. In addition, http requests are throttled internally via use of the ``Blink.refresh_rate`` variable, which can be set at initialization and defaults to 30 seconds. +The simplest way to use this package from a terminal is to call ``Blink.start()`` which will prompt for your Blink username and password and then log you in. In addition, http requests are throttled internally via use of the ``Blink.refresh_rate`` variable, which can be set at initialization and defaults to 30 seconds. .. code:: python - from blinkpy import blinkpy - blink = blinkpy.Blink(username='YOUR USER NAME', password='YOUR PASSWORD', refresh_rate=30) + from blinkpy.blinkpy import Blink + + blink = Blink() blink.start() -At startup, you may be prompted for a verification key. Just enter this in the command-line prompt. If you just receive a verification email asking to validate access for your device, enter nothing at this prompt. To avoid any command-line interaction, call the ``Blink`` class with the ``no_prompt=True`` flag. Instead, once you receive the verification email, call the following functions: + +This flow will prompt you for your username and password. Once entered, if you likely will need to send a 2FA key to the blink servers (this pin is sent to your email address). When you receive this pin, enter at the prompt and the Blink library will proceed with setup. + +Starting blink without a prompt +------------------------------- +In some cases, having an interactive command-line session is not desired. In this case, you will need to set the ``Blink.auth.no_prompt`` value to ``True``. In addition, since you will not be prompted with a username and password, you must supply the login data to the blink authentication handler. This is best done by instantiating your own auth handler with a dictionary containing at least your username and password. + +.. code:: python + + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + + blink = Blink() + # Can set no_prompt when initializing auth handler + auth = Auth({"username": , "password": }, no_prompt=True) + blink.auth = auth + blink.start() + + +Since you will not be prompted for any 2FA pin, you must call the ``blink.auth.send_auth_key`` function. There are two required parameters: the ``blink`` object as well as the ``key`` you received from Blink for 2FA: .. code:: python - blink.login_handler.send_auth_key(blink, VERIFICATION_KEY) + auth.send_auth_key(blink, ) blink.setup_post_verify() -In addition, you can also save your credentials in a json file and initialize Blink with the credential file as follows: + +Supplying credentials from file +-------------------------------- +Other use cases may involved loading credentials from a file. This file must be ``json`` formatted and contain a minimum of ``username`` and ``password``. A built in function in the ``blinkpy.helpers.util`` module can aid in loading this file. Note, if ``no_prompt`` is desired, a similar flow can be followed as above. .. code:: python - from blinkpy import blinkpy - blink = blinkpy.Blink(cred_file="path/to/credentials.json") + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + from blinkpy.helpers.util import json_load + + blink = Blink() + auth = Auth(json_load("")) + blink.auth = auth blink.start() -The credential file must be json formatted with a ``username`` and ``password`` key like follows: -.. code:: json +Saving credentials +------------------- +This library also allows you to save your credentials to use in future sessions. Saved information includes authentication tokens as well as unique ids which should allow for a more streamlined experience and limits the frequency of login requests. This data can be saved as follows (it can then be loaded by following the instructions above for supplying credentials from a file): + +.. code:: python + + blink.save("") - { - "username": "YOUR USER NAME", - "password": "YOUR PASSWORD" - } +Getting cameras +---------------- Cameras are instantiated as individual ``BlinkCamera`` classes within a ``BlinkSyncModule`` instance. All of your sync modules are stored within the ``Blink.sync`` dictionary and can be accessed using the name of the sync module as the key (this is the name of your sync module in the Blink App). The below code will display cameras and their available attributes: .. code:: python - from blinkpy import blinkpy - - blink = blinkpy.Blink(username='YOUR USER NAME', password='YOUR PASSWORD') - blink.start() - for name, camera in blink.cameras.items(): print(name) # Name of the camera print(camera.attributes) # Print available attributes of camera -The most recent images and videos can be accessed as a bytes-object via internal variables. These can be updated with calls to ``Blink.refresh()`` but will only make a request if motion has been detected or other changes have been found. This can be overridden with the ``force_cache`` flag, but this should be used for debugging only since it overrides the internal request throttling. + +The most recent images and videos can be accessed as a bytes-object via internal variables. These can be updated with calls to ``Blink.refresh()`` but will only make a request if motion has been detected or other changes have been found. This can be overridden with the ``force`` flag, but this should be used for debugging only since it overrides the internal request throttling. .. code:: python camera = blink.cameras['SOME CAMERA NAME'] - blink.refresh(force_cache=True) # force a cache update USE WITH CAUTION + blink.refresh(force=True) # force a cache update USE WITH CAUTION camera.image_from_cache.raw # bytes-like image object (jpg) camera.video_from_cache.raw # bytes-like video object (mp4) @@ -110,15 +136,50 @@ The ``blinkpy`` api also allows for saving images and videos to a file and snapp blink.refresh() # Get new information from server camera.image_to_file('/local/path/for/image.jpg') camera.video_to_file('/local/path/for/video.mp4') - + + +Arming Blink +------------- +Methods exist to arm/disarm the sync module, as well as enable/disable motion detection for individual cameras. This is done as follows: + +.. code:: python + + # Arm a sync module + blink.sync["SYNC MODULE NAME"].arm = True + + # Disarm a sync module + blink.sync["SYNC MODULE NAME"].disarm = False + + # Print arm status of a sync module - a system refresh should be performed first + blink.refresh() + sync = blink.sync["SYNC MODULE NAME"] + print(f"{sync.name} status: {sync.arm}") + +Similar methods exist for individual cameras: + +.. code:: python + + camera = blink.cameras["SOME CAMERA NAME"] + + # Enable motion detection on a camera + camera.arm = True + + # Disable motion detection on a camera + camera.arm = False + + # Print arm status of a sync module - a system refresh should be performed first + blink.refresh() + print(f"{camera.name} status: {camera.arm}") + + +Download videos +---------------- You can also use this library to download all videos from the server. In order to do this, you must specify a ``path``. You may also specifiy a how far back in time to go to retrieve videos via the ``since=`` variable (a simple string such as ``"2017/09/21"`` is sufficient), as well as how many pages to traverse via the ``page=`` variable. Note that by default, the library will search the first ten pages which is sufficient in most use cases. Additionally, you can specidy one or more cameras via the ``camera=`` property. This can be a single string indicating the name of the camera, or a list of camera names. By default, it is set to the string ``'all'`` to grab videos from all cameras. Example usage, which downloads all videos recorded since July 4th, 2018 at 9:34am to the ``/home/blink`` directory: .. code:: python - blink = blinkpy.Blink(username="YOUR USER NAME", password="YOUR PASSWORD") - blink.start() blink.download_videos('/home/blink', since='2018/07/04 09:34') @@ -130,7 +191,5 @@ Example usage, which downloads all videos recorded since July 4th, 2018 at 9:34a :target: https://pypi.python.org/pypi/blinkpy .. |Docs| image:: https://readthedocs.org/projects/blinkpy/badge/?version=latest :target: http://blinkpy.readthedocs.io/en/latest/?badge=latest -.. |Donate| image:: https://www.paypalobjects.com/en_US/i/btn/btn_donateCC_LG.gif - :target: https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=UR6Z2B8GXYUCC .. |Codestyle| image:: https://img.shields.io/badge/code%20style-black-000000.svg :target: https://github.com/psf/black diff --git a/app/app.py b/app/app.py index 11517a13..081bcdfc 100644 --- a/app/app.py +++ b/app/app.py @@ -1,11 +1,12 @@ """Script to run blinkpy as an app.""" from os import environ from datetime import datetime, timedelta -from blinkpy import blinkpy +from blinkpy.blinkpy import Blink +from blinkpy.auth import Auth +from blinkpy.helpers.util import json_load -USERNAME = environ.get("USERNAME") -PASSWORD = environ.get("PASSWORD") +CREDFILE = environ.get("CREDFILE") TIMEDELTA = timedelta(environ.get("TIMEDELTA", 1)) @@ -21,11 +22,18 @@ def download_videos(blink, save_dir="/media"): def start(): """Startup blink app.""" - blink = blinkpy.Blink(username=USERNAME, password=PASSWORD) + blink = Blink() + blink.auth = Auth(json_load(CREDFILE)) blink.start() return blink +def main(): + """Run the app.""" + blink = start() + download_videos(blink) + blink.save(CREDFILE) + + if __name__ == "__main__": - BLINK = start() - download_videos(BLINK) + main() diff --git a/blinkpy/api.py b/blinkpy/api.py index c850a2b9..93d3ab3a 100644 --- a/blinkpy/api.py +++ b/blinkpy/api.py @@ -2,55 +2,40 @@ import logging from json import dumps -import blinkpy.helpers.errors as ERROR -from blinkpy.helpers.util import http_req, get_time, BlinkException, Throttle -from blinkpy.helpers.constants import DEFAULT_URL +from blinkpy.helpers.util import get_time, Throttle +from blinkpy.helpers.constants import DEFAULT_URL, TIMEOUT _LOGGER = logging.getLogger(__name__) -MIN_THROTTLE_TIME = 2 +MIN_THROTTLE_TIME = 5 def request_login( - blink, - url, - username, - password, - notification_key, - uid, - is_retry=False, - device_id="Blinkpy", + auth, url, login_data, is_retry=False, ): """ Login request. - :param blink: Blink instance. + :param auth: Auth instance. :param url: Login url. - :param username: Blink username. - :param password: Blink password. - :param notification_key: Randomly genereated key. - :param uid: Randomly generated unique id key. - :param is_retry: Is this part of a re-authorization attempt? - :param device_id: Name of application to send at login. + :login_data: Dictionary containing blink login data. """ headers = {"Host": DEFAULT_URL, "Content-Type": "application/json"} data = dumps( { - "email": username, - "password": password, - "notification_key": notification_key, - "unique_id": uid, + "email": login_data["username"], + "password": login_data["password"], + "notification_key": login_data["notification_key"], + "unique_id": login_data["uid"], "app_version": "6.0.7 (520300) #afb0be72a", + "device_identifier": login_data["device_id"], "client_name": "Computer", "client_type": "android", - "device_identifier": device_id, - "device_name": "Blinkpy", "os_version": "5.1.1", - "reauth": "true", + "reauth": "false", } ) - return http_req( - blink, + return auth.query( url=url, headers=headers, data=data, @@ -60,25 +45,35 @@ def request_login( ) -def request_verify(blink, verify_key): +def request_verify(auth, blink, verify_key): """Send verification key to blink servers.""" - url = "{}/api/v4/account/{}/client/{}/pin/verify".format( - blink.urls.base_url, blink.account_id, blink.client_id - ) + url = f"{blink.urls.base_url}/api/v4/account/{blink.account_id}/client/{blink.client_id}/pin/verify" data = dumps({"pin": verify_key}) - return http_req( - blink, - url=url, - headers=blink.auth_header, - data=data, - json_resp=False, - reqtype="post", + return auth.query( + url=url, headers=auth.header, data=data, json_resp=False, reqtype="post", ) def request_networks(blink): """Request all networks information.""" - url = "{}/networks".format(blink.urls.base_url) + url = f"{blink.urls.base_url}/networks" + return http_get(blink, url) + + +def request_network_update(blink, network): + """ + Request network update. + + :param blink: Blink instance. + :param network: Sync module network id. + """ + url = f"{blink.urls.base_url}/network/{network}/update" + return http_post(blink, url) + + +def request_user(blink): + """Get user information from blink servers.""" + url = f"{blink.urls.base_url}/user" return http_get(blink, url) @@ -89,7 +84,7 @@ def request_network_status(blink, network): :param blink: Blink instance. :param network: Sync module network id. """ - url = "{}/network/{}".format(blink.urls.base_url, network) + url = f"{blink.urls.base_url}/network/{network}" return http_get(blink, url) @@ -100,7 +95,7 @@ def request_syncmodule(blink, network): :param blink: Blink instance. :param network: Sync module network id. """ - url = "{}/network/{}/syncmodules".format(blink.urls.base_url, network) + url = f"{blink.urls.base_url}/network/{network}/syncmodules" return http_get(blink, url) @@ -112,7 +107,7 @@ def request_system_arm(blink, network): :param blink: Blink instance. :param network: Sync module network id. """ - url = "{}/network/{}/arm".format(blink.urls.base_url, network) + url = f"{blink.urls.base_url}/api/v1/accounts/{blink.account_id}/networks/{network}/state/arm" return http_post(blink, url) @@ -124,7 +119,7 @@ def request_system_disarm(blink, network): :param blink: Blink instance. :param network: Sync module network id. """ - url = "{}/network/{}/disarm".format(blink.urls.base_url, network) + url = f"{blink.urls.base_url}/api/v1/accounts/{blink.account_id}/networks/{network}/state/disarm" return http_post(blink, url) @@ -136,16 +131,14 @@ def request_command_status(blink, network, command_id): :param network: Sync module network id. :param command_id: Command id to check. """ - url = "{}/network/{}/command/{}".format(blink.urls.base_url, network, command_id) + url = f"{blink.urls.base_url}/network/{network}/command/{command_id}" return http_get(blink, url) @Throttle(seconds=MIN_THROTTLE_TIME) def request_homescreen(blink): """Request homescreen info.""" - url = "{}/api/v3/accounts/{}/homescreen".format( - blink.urls.base_url, blink.account_id - ) + url = f"{blink.urls.base_url}/api/v3/accounts/{blink.account_id}/homescreen" return http_get(blink, url) @@ -157,7 +150,7 @@ def request_sync_events(blink, network): :param blink: Blink instance. :param network: Sync module network id. """ - url = "{}/events/network/{}".format(blink.urls.base_url, network) + url = f"{blink.urls.base_url}/events/network/{network}" return http_get(blink, url) @@ -170,9 +163,7 @@ def request_new_image(blink, network, camera_id): :param network: Sync module network id. :param camera_id: Camera ID of camera to request new image from. """ - url = "{}/network/{}/camera/{}/thumbnail".format( - blink.urls.base_url, network, camera_id - ) + url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/thumbnail" return http_post(blink, url) @@ -185,14 +176,14 @@ def request_new_video(blink, network, camera_id): :param network: Sync module network id. :param camera_id: Camera ID of camera to request new video from. """ - url = "{}/network/{}/camera/{}/clip".format(blink.urls.base_url, network, camera_id) + url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/clip" return http_post(blink, url) @Throttle(seconds=MIN_THROTTLE_TIME) def request_video_count(blink): """Request total video count.""" - url = "{}/api/v2/videos/count".format(blink.urls.base_url) + url = f"{blink.urls.base_url}/api/v2/videos/count" return http_get(blink, url) @@ -205,9 +196,7 @@ def request_videos(blink, time=None, page=0): :param page: Page number to get videos from. """ timestamp = get_time(time) - url = "{}/api/v1/accounts/{}/media/changed?since={}&page={}".format( - blink.urls.base_url, blink.account_id, timestamp, page - ) + url = f"{blink.urls.base_url}/api/v1/accounts/{blink.account_id}/media/changed?since={timestamp}&page={page}" return http_get(blink, url) @@ -218,7 +207,7 @@ def request_cameras(blink, network): :param Blink: Blink instance. :param network: Sync module network id. """ - url = "{}/network/{}/cameras".format(blink.urls.base_url, network) + url = f"{blink.urls.base_url}/network/{network}/cameras" return http_get(blink, url) @@ -230,12 +219,34 @@ def request_camera_info(blink, network, camera_id): :param network: Sync module network id. :param camera_id: Camera ID of camera to request info from. """ - url = "{}/network/{}/camera/{}/config".format( - blink.urls.base_url, network, camera_id - ) + url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/config" + return http_get(blink, url) + + +def request_camera_usage(blink): + """ + Request camera status. + + :param blink: Blink instance. + """ + url = f"{blink.urls.base_url}/api/v1/camera/usage" return http_get(blink, url) +def request_camera_liveview(blink, network, camera_id): + """ + Request camera liveview. + + :param blink: Blink instance. + :param network: Sync module network id. + :param camera_id: Camera ID of camera to request liveview from. + """ + url = ( + f"{blink.urls.base_url}/api/v3/networks/{network}/cameras/{camera_id}/liveview" + ) + return http_post(blink, url) + + def request_camera_sensors(blink, network, camera_id): """ Request camera sensor info for one camera. @@ -244,9 +255,7 @@ def request_camera_sensors(blink, network, camera_id): :param network: Sync module network id. :param camera_id: Camera ID of camera to request sesnor info from. """ - url = "{}/network/{}/camera/{}/signals".format( - blink.urls.base_url, network, camera_id - ) + url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/signals" return http_get(blink, url) @@ -259,9 +268,7 @@ def request_motion_detection_enable(blink, network, camera_id): :param network: Sync module network id. :param camera_id: Camera ID of camera to enable. """ - url = "{}/network/{}/camera/{}/enable".format( - blink.urls.base_url, network, camera_id - ) + url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/enable" return http_post(blink, url) @@ -273,13 +280,11 @@ def request_motion_detection_disable(blink, network, camera_id): :param network: Sync module network id. :param camera_id: Camera ID of camera to disable. """ - url = "{}/network/{}/camera/{}/disable".format( - blink.urls.base_url, network, camera_id - ) + url = f"{blink.urls.base_url}/network/{network}/camera/{camera_id}/disable" return http_post(blink, url) -def http_get(blink, url, stream=False, json=True, is_retry=False): +def http_get(blink, url, stream=False, json=True, is_retry=False, timeout=TIMEOUT): """ Perform an http get request. @@ -288,13 +293,10 @@ def http_get(blink, url, stream=False, json=True, is_retry=False): :param json: Return json response? TRUE/False :param is_retry: Is this part of a re-auth attempt? """ - if blink.auth_header is None: - raise BlinkException(ERROR.AUTH_TOKEN) _LOGGER.debug("Making GET request to %s", url) - return http_req( - blink, + return blink.auth.query( url=url, - headers=blink.auth_header, + headers=blink.auth.header, reqtype="get", stream=stream, json_resp=json, @@ -302,16 +304,14 @@ def http_get(blink, url, stream=False, json=True, is_retry=False): ) -def http_post(blink, url, is_retry=False): +def http_post(blink, url, is_retry=False, timeout=TIMEOUT): """ Perform an http post request. :param url: URL to perfom post request. :param is_retry: Is this part of a re-auth attempt? """ - if blink.auth_header is None: - raise BlinkException(ERROR.AUTH_TOKEN) _LOGGER.debug("Making POST request to %s", url) - return http_req( - blink, url=url, headers=blink.auth_header, reqtype="post", is_retry=is_retry + return blink.auth.query( + url=url, headers=blink.auth.header, reqtype="post", is_retry=is_retry ) diff --git a/blinkpy/auth.py b/blinkpy/auth.py new file mode 100644 index 00000000..59950da4 --- /dev/null +++ b/blinkpy/auth.py @@ -0,0 +1,249 @@ +"""Login handler for blink.""" +import logging +from functools import partial +from requests import Request, Session, exceptions +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from blinkpy import api +from blinkpy.helpers import util +from blinkpy.helpers.constants import BLINK_URL, LOGIN_ENDPOINT, TIMEOUT + +_LOGGER = logging.getLogger(__name__) + + +class Auth: + """Class to handle login communication.""" + + def __init__(self, login_data=None, no_prompt=False): + """ + Initialize auth handler. + + :param login_data: dictionary for login data + must contain the following: + - username + - password + :param no_prompt: Should any user input prompts + be supressed? True/FALSE + """ + if login_data is None: + login_data = {} + self.data = login_data + self.token = login_data.get("token", None) + self.host = login_data.get("host", None) + self.region_id = login_data.get("region_id", None) + self.client_id = login_data.get("client_id", None) + self.account_id = login_data.get("account_id", None) + self.login_response = None + self.is_errored = False + self.no_prompt = no_prompt + self.session = self.create_session() + + @property + def login_attributes(self): + """Return a dictionary of login attributes.""" + self.data["token"] = self.token + self.data["host"] = self.host + self.data["region_id"] = self.region_id + self.data["client_id"] = self.client_id + self.data["account_id"] = self.account_id + return self.data + + @property + def header(self): + """Return authorization header.""" + if self.token is None: + return None + return {"TOKEN_AUTH": self.token} + + def create_session(self, opts=None): + """Create a session for blink communication.""" + if opts is None: + opts = {} + backoff = opts.get("backoff", 1) + retries = opts.get("retries", 3) + retry_list = opts.get("retry_list", [429, 500, 502, 503, 504]) + sess = Session() + assert_status_hook = [ + lambda response, *args, **kwargs: response.raise_for_status() + ] + sess.hooks["response"] = assert_status_hook + retry = Retry( + total=retries, backoff_factor=backoff, status_forcelist=retry_list + ) + adapter = HTTPAdapter(max_retries=retry) + sess.mount("https://", adapter) + sess.mount("http://", adapter) + sess.get = partial(sess.get, timeout=TIMEOUT) + return sess + + def prepare_request(self, url, headers, data, reqtype): + """Prepare a request.""" + req = Request(reqtype.upper(), url, headers=headers, data=data) + return req.prepare() + + def validate_login(self): + """Check login information and prompt if not available.""" + self.data["username"] = self.data.get("username", None) + self.data["password"] = self.data.get("password", None) + if not self.no_prompt: + self.data = util.prompt_login_data(self.data) + + self.data = util.validate_login_data(self.data) + + def login(self, login_url=LOGIN_ENDPOINT): + """Attempt login to blink servers.""" + self.validate_login() + _LOGGER.info("Attempting login with %s", login_url) + response = api.request_login(self, login_url, self.data, is_retry=False,) + try: + if response.status_code == 200: + return response.json() + raise LoginError + except AttributeError: + raise LoginError + + def refresh_token(self): + """Refresh auth token.""" + self.is_errored = True + try: + _LOGGER.info("Token expired, attempting automatic refresh.") + self.login_response = self.login() + self.region_id = self.login_response["region"]["tier"] + self.host = f"{self.region_id}.{BLINK_URL}" + self.token = self.login_response["authtoken"]["authtoken"] + self.client_id = self.login_response["client"]["id"] + self.account_id = self.login_response["account"]["id"] + self.is_errored = False + except LoginError: + _LOGGER.error("Login endpoint failed. Try again later.") + raise TokenRefreshFailed + except (TypeError, KeyError): + _LOGGER.error("Malformed login response: %s", self.login_response) + raise TokenRefreshFailed + return True + + def startup(self): + """Initialize tokens for communication.""" + self.validate_login() + if None in self.login_attributes.values(): + self.refresh_token() + + def validate_response(self, response, json_resp): + """Check for valid response.""" + if not json_resp: + self.is_errored = False + return response + self.is_errored = True + try: + if response.status_code in [101, 401]: + raise UnauthorizedError + if response.status_code == 404: + raise exceptions.ConnectionError + json_data = response.json() + except KeyError: + pass + except (AttributeError, ValueError): + raise BlinkBadResponse + + self.is_errored = False + return json_data + + def query( + self, + url=None, + data=None, + headers=None, + reqtype="get", + stream=False, + json_resp=True, + is_retry=False, + timeout=TIMEOUT, + ): + """ + Perform server requests. + + :param url: URL to perform request + :param data: Data to send + :param headers: Headers to send + :param reqtype: Can be 'get' or 'post' (default: 'get') + :param stream: Stream response? True/FALSE + :param json_resp: Return JSON response? TRUE/False + :param is_retry: Is this part of a re-auth attempt? True/FALSE + """ + req = self.prepare_request(url, headers, data, reqtype) + try: + response = self.session.send(req, stream=stream, timeout=timeout) + return self.validate_response(response, json_resp) + except (exceptions.ConnectionError, exceptions.Timeout): + _LOGGER.error( + "Connection error. Endpoint %s possibly down or throttled.", url, + ) + except BlinkBadResponse: + code = None + reason = None + try: + code = response.status_code + reason = response.reason + except AttributeError: + pass + _LOGGER.error( + "Expected json response from %s, but received: %s: %s", + url, + code, + reason, + ) + except UnauthorizedError: + try: + if not is_retry: + self.refresh_token() + return self.query( + url=url, + data=data, + headers=self.header, + reqtype=reqtype, + stream=stream, + json_resp=json_resp, + is_retry=True, + timeout=timeout, + ) + _LOGGER.error("Unable to access %s after token refresh.", url) + except TokenRefreshFailed: + _LOGGER.error("Unable to refresh token.") + return None + + def send_auth_key(self, blink, key): + """Send 2FA key to blink servers.""" + if key is not None: + response = api.request_verify(self, blink, key) + try: + json_resp = response.json() + blink.available = json_resp["valid"] + except (KeyError, TypeError): + _LOGGER.error("Did not receive valid response from server.") + return False + return True + + def check_key_required(self): + """Check if 2FA key is required.""" + try: + if self.login_response["client"]["verification_required"]: + return True + except (KeyError, TypeError): + pass + return False + + +class TokenRefreshFailed(Exception): + """Class to throw failed refresh exception.""" + + +class LoginError(Exception): + """Class to throw failed login exception.""" + + +class BlinkBadResponse(Exception): + """Class to throw bad json response exception.""" + + +class UnauthorizedError(Exception): + """Class to throw an unauthorized access error.""" diff --git a/blinkpy/blinkpy.py b/blinkpy/blinkpy.py index 21a711c1..913aa407 100644 --- a/blinkpy/blinkpy.py +++ b/blinkpy/blinkpy.py @@ -23,24 +23,16 @@ from slugify import slugify from blinkpy import api -from blinkpy.sync_module import BlinkSyncModule -from blinkpy.helpers.util import ( - create_session, - merge_dicts, - get_time, - BlinkURLHandler, - Throttle, -) +from blinkpy.sync_module import BlinkSyncModule, BlinkOwl +from blinkpy.helpers import util from blinkpy.helpers.constants import ( - BLINK_URL, DEFAULT_MOTION_INTERVAL, DEFAULT_REFRESH, MIN_THROTTLE_TIME, - LOGIN_URLS, + TIMEOUT_MEDIA, ) from blinkpy.helpers.constants import __version__ -from blinkpy.login_handler import LoginHandler - +from blinkpy.auth import Auth, TokenRefreshFailed, LoginError _LOGGER = logging.getLogger(__name__) @@ -50,201 +42,204 @@ class Blink: def __init__( self, - username=None, - password=None, - cred_file=None, refresh_rate=DEFAULT_REFRESH, motion_interval=DEFAULT_MOTION_INTERVAL, - legacy_subdomain=False, - no_prompt=False, - persist_key=None, - device_id="Blinkpy", + no_owls=False, ): """ Initialize Blink system. - :param username: Blink username (usually email address) - :param password: Blink password - :param cred_file: JSON formatted file to store credentials. - If username and password are given, file - is ignored. Otherwise, username and password - are loaded from file. :param refresh_rate: Refresh rate of blink information. Defaults to 15 (seconds) :param motion_interval: How far back to register motion in minutes. Defaults to last refresh time. Useful for preventing motion_detected property from de-asserting too quickly. - :param legacy_subdomain: Set to TRUE to use old 'rest.region' - endpoints (only use if you are having - api issues). - :param no_prompt: Set to TRUE if using an implementation that needs to - suppress command-line output. - :param persist_key: Location of persistant identifier. - :param device_id: Identifier for the application. Default is 'Blinkpy'. - This is used when logging in and should be changed to - fit the implementation (ie. "Home Assistant" in a - Home Assistant integration). + :param no_owls: Disable searching for owl entries (blink mini cameras only known entity). Prevents an uneccessary API call if you don't have these in your network. """ - self.login_handler = LoginHandler( - username=username, - password=password, - cred_file=cred_file, - persist_key=persist_key, - device_id=device_id, - ) - self._token = None - self._auth_header = None - self._host = None + self.auth = Auth() self.account_id = None self.client_id = None self.network_ids = [] self.urls = None self.sync = CaseInsensitiveDict({}) - self.region = None - self.region_id = None self.last_refresh = None self.refresh_rate = refresh_rate - self.session = create_session() self.networks = [] self.cameras = CaseInsensitiveDict({}) self.video_list = CaseInsensitiveDict({}) - self.login_url = LOGIN_URLS[0] - self.login_urls = [] self.motion_interval = motion_interval self.version = __version__ - self.legacy = legacy_subdomain - self.no_prompt = no_prompt self.available = False self.key_required = False - self.login_response = {} - - @property - def auth_header(self): - """Return the authentication header.""" - return self._auth_header + self.homescreen = {} + self.no_owls = no_owls - def start(self): + @util.Throttle(seconds=MIN_THROTTLE_TIME) + def refresh(self, force=False): """ - Perform full system setup. + Perform a system refresh. - Method logs in and sets auth token, urls, and ids for future requests. - Essentially this is just a wrapper function for ease of use. + :param force: Force an update of the camera data """ - if not self.available: - self.get_auth_token() - - if self.key_required and not self.no_prompt: - email = self.login_handler.data["username"] - key = input("Enter code sent to {}: ".format(email)) - result = self.login_handler.send_auth_key(self, key) - self.key_required = not result - self.setup_post_verify() - elif not self.key_required: - self.setup_post_verify() + if self.check_if_ok_to_update() or force: + if not self.available: + self.setup_post_verify() + + for sync_name, sync_module in self.sync.items(): + _LOGGER.debug("Attempting refresh of sync %s", sync_name) + sync_module.refresh(force_cache=force) + if not force: + # Prevents rapid clearing of motion detect property + self.last_refresh = int(time.time()) + return True + return False + + def start(self): + """Perform full system setup.""" + try: + self.auth.startup() + self.setup_login_ids() + self.setup_urls() + self.get_homescreen() + except (LoginError, TokenRefreshFailed, BlinkSetupError): + _LOGGER.error("Cannot setup Blink platform.") + self.available = False + return False + + self.key_required = self.auth.check_key_required() + if self.key_required: + if self.auth.no_prompt: + return True + self.setup_prompt_2fa() + return self.setup_post_verify() + + def setup_prompt_2fa(self): + """Prompt for 2FA.""" + email = self.auth.data["username"] + pin = input(f"Enter code sent to {email}: ") + result = self.auth.send_auth_key(self, pin) + self.key_required = not result def setup_post_verify(self): """Initialize blink system after verification.""" - camera_list = self.get_cameras() - networks = self.get_ids() - for network_name, network_id in networks.items(): - if network_id not in camera_list.keys(): - camera_list[network_id] = {} - _LOGGER.warning("No cameras found for %s", network_name) - sync_module = BlinkSyncModule( - self, network_name, network_id, camera_list[network_id] - ) - sync_module.start() - self.sync[network_name] = sync_module - self.cameras = self.merge_cameras() - self.available = self.refresh() - self.key_required = False - - def login(self): - """Perform server login. DEPRECATED.""" - _LOGGER.warning( - "Method is deprecated and will be removed in a future version. Please use the LoginHandler.login() method instead." - ) - return self.login_handler.login(self) - - def get_auth_token(self, is_retry=False): - """Retrieve the authentication token from Blink.""" - self.login_response = self.login_handler.login(self) - if not self.login_response: + try: + self.setup_networks() + networks = self.setup_network_ids() + cameras = self.setup_camera_list() + except BlinkSetupError: self.available = False return False - self.setup_params(self.login_response) - if self.login_handler.check_key_required(self): - self.key_required = True - return self._auth_header - - def setup_params(self, response): - """Retrieve blink parameters from login response.""" - self.login_url = self.login_handler.login_url - ((self.region_id, self.region),) = response["region"].items() - self._host = "{}.{}".format(self.region_id, BLINK_URL) - self._token = response["authtoken"]["authtoken"] - self._auth_header = {"TOKEN_AUTH": self._token} - self.urls = BlinkURLHandler(self.region_id, legacy=self.legacy) - self.networks = self.get_networks() - self.client_id = response["client"]["id"] - self.account_id = response["account"]["id"] - - def get_networks(self): - """Get network information.""" - response = api.request_networks(self) + + for name, network_id in networks.items(): + sync_cameras = cameras.get(network_id, {}) + self.setup_sync_module(name, network_id, sync_cameras) + + self.cameras = self.merge_cameras() + + self.available = True + self.key_required = False + return True + + def setup_sync_module(self, name, network_id, cameras): + """Initialize a sync module.""" + self.sync[name] = BlinkSyncModule(self, name, network_id, cameras) + self.sync[name].start() + + def get_homescreen(self): + """Get homecreen information.""" + if self.no_owls: + _LOGGER.debug("Skipping owl extraction.") + self.homescreen = {} + return + self.homescreen = api.request_homescreen(self) + + def setup_owls(self): + """Check for mini cameras.""" + network_list = [] + camera_list = [] try: - return response["summary"] + for owl in self.homescreen["owls"]: + name = owl["name"] + network_id = str(owl["network_id"]) + if network_id in self.network_ids: + camera_list.append( + {network_id: {"name": name, "id": network_id, "type": "mini"}} + ) + continue + if owl["onboarded"]: + network_list.append(str(network_id)) + self.sync[name] = BlinkOwl(self, name, network_id, owl) + self.sync[name].start() except KeyError: - return None - - def get_ids(self): - """Set the network ID and Account ID.""" - all_networks = [] - network_dict = {} - for network, status in self.networks.items(): - if status["onboarded"]: - all_networks.append("{}".format(network)) - network_dict[status["name"]] = network + # No sync-less devices found + pass - self.network_ids = all_networks - return network_dict + self.network_ids.extend(network_list) + return camera_list - def get_cameras(self): - """Retrieve a camera list for each onboarded network.""" - response = api.request_homescreen(self) + def setup_camera_list(self): + """Create camera list for onboarded networks.""" + all_cameras = {} + response = api.request_camera_usage(self) try: - all_cameras = {} - for camera in response["cameras"]: - camera_network = str(camera["network_id"]) - camera_name = camera["name"] - camera_id = camera["id"] - camera_info = {"name": camera_name, "id": camera_id} + for network in response["networks"]: + camera_network = str(network["network_id"]) if camera_network not in all_cameras: all_cameras[camera_network] = [] - - all_cameras[camera_network].append(camera_info) + for camera in network["cameras"]: + all_cameras[camera_network].append( + {"name": camera["name"], "id": camera["id"]} + ) + mini_cameras = self.setup_owls() + for camera in mini_cameras: + for network, camera_info in camera.items(): + all_cameras[network].append(camera_info) return all_cameras - except KeyError: - _LOGGER.error("Initialization failue. Could not retrieve cameras.") - return {} + except (KeyError, TypeError): + _LOGGER.error("Unable to retrieve cameras from response %s", response) + raise BlinkSetupError - @Throttle(seconds=MIN_THROTTLE_TIME) - def refresh(self, force_cache=False): - """ - Perform a system refresh. + def setup_login_ids(self): + """Retrieve login id numbers from login response.""" + self.client_id = self.auth.client_id + self.account_id = self.auth.account_id - :param force_cache: Force an update of the camera cache - """ - if self.check_if_ok_to_update() or force_cache: - for sync_name, sync_module in self.sync.items(): - _LOGGER.debug("Attempting refresh of sync %s", sync_name) - sync_module.refresh(force_cache=force_cache) - if not force_cache: - # Prevents rapid clearing of motion detect property - self.last_refresh = int(time.time()) - return True - return False + def setup_urls(self): + """Create urls for api.""" + try: + self.urls = util.BlinkURLHandler(self.auth.region_id) + except TypeError: + _LOGGER.error( + "Unable to extract region is from response %s", self.auth.login_response + ) + raise BlinkSetupError + + def setup_networks(self): + """Get network information.""" + response = api.request_networks(self) + try: + self.networks = response["summary"] + except (KeyError, TypeError): + raise BlinkSetupError + + def setup_network_ids(self): + """Create the network ids for onboarded networks.""" + all_networks = [] + network_dict = {} + try: + for network, status in self.networks.items(): + if status["onboarded"]: + all_networks.append(f"{network}") + network_dict[status["name"]] = network + except AttributeError: + _LOGGER.error( + "Unable to retrieve network information from %s", self.networks + ) + raise BlinkSetupError + + self.network_ids = all_networks + return network_dict def check_if_ok_to_update(self): """Check if it is ok to perform an http request.""" @@ -260,9 +255,13 @@ def merge_cameras(self): """Merge all sync camera dicts into one.""" combined = CaseInsensitiveDict({}) for sync in self.sync: - combined = merge_dicts(combined, self.sync[sync].cameras) + combined = util.merge_dicts(combined, self.sync[sync].cameras) return combined + def save(self, file_name): + """Save login data to file.""" + util.json_save(self.auth.login_attributes, file_name) + def download_videos(self, path, since=None, camera="all", stop=10, debug=False): """ Download all videos from server since specified time. @@ -270,7 +269,7 @@ def download_videos(self, path, since=None, camera="all", stop=10, debug=False): :param path: Path to write files. /path/_.mp4 :param since: Date and time to get videos from. Ex: "2018/07/28 12:33:00" to retrieve videos since - July 28th 2018 at 12:33:00 + July 28th 2018 at 12:33:00 :param camera: Camera name to retrieve. Defaults to "all". Use a list for multiple cameras. :param stop: Page to stop on (~25 items per page. Default page 10). @@ -283,7 +282,7 @@ def download_videos(self, path, since=None, camera="all", stop=10, debug=False): parsed_datetime = parse(since, fuzzy=True) since_epochs = parsed_datetime.timestamp() - formatted_date = get_time(time_to_convert=since_epochs) + formatted_date = util.get_time(time_to_convert=since_epochs) _LOGGER.info("Retrieving videos since %s", formatted_date) if not isinstance(camera, list): @@ -295,8 +294,8 @@ def download_videos(self, path, since=None, camera="all", stop=10, debug=False): try: result = response["media"] if not result: - raise IndexError - except (KeyError, IndexError): + raise KeyError + except (KeyError, TypeError): _LOGGER.info("No videos found on page %s. Exiting.", page) break @@ -322,9 +321,9 @@ def _parse_downloaded_items(self, result, camera, path, debug): _LOGGER.debug("%s: %s is marked as deleted.", camera_name, address) continue - clip_address = "{}{}".format(self.urls.base_url, address) - filename = "{}-{}".format(camera_name, created_at) - filename = "{}.mp4".format(slugify(filename)) + clip_address = f"{self.urls.base_url}{address}" + filename = f"{camera_name}-{created_at}" + filename = f"{slugify(filename)}.mp4" filename = os.path.join(path, filename) if not debug: @@ -332,14 +331,25 @@ def _parse_downloaded_items(self, result, camera, path, debug): _LOGGER.info("%s already exists, skipping...", filename) continue - response = api.http_get(self, url=clip_address, stream=True, json=False) + response = api.http_get( + self, + url=clip_address, + stream=True, + json=False, + timeout=TIMEOUT_MEDIA, + ) with open(filename, "wb") as vidfile: copyfileobj(response.raw, vidfile) _LOGGER.info("Downloaded video to %s", filename) else: print( - ("Camera: {}, Timestamp: {}, " "Address: {}, Filename: {}").format( - camera_name, created_at, address, filename + ( + f"Camera: {camera_name}, Timestamp: {created_at}, " + "Address: {address}, Filename: {filename}" ) ) + + +class BlinkSetupError(Exception): + """Class to handle setup errors.""" diff --git a/blinkpy/camera.py b/blinkpy/camera.py index a2f05bed..1715feee 100644 --- a/blinkpy/camera.py +++ b/blinkpy/camera.py @@ -3,6 +3,7 @@ from shutil import copyfileobj import logging from blinkpy import api +from blinkpy.helpers.constants import TIMEOUT_MEDIA _LOGGER = logging.getLogger(__name__) @@ -29,6 +30,7 @@ def __init__(self, sync): self.last_record = None self._cached_image = None self._cached_video = None + self.camera_type = "" @property def attributes(self): @@ -61,7 +63,10 @@ def battery(self): @property def temperature_c(self): """Return temperature in celcius.""" - return round((self.temperature - 32) / 9.0 * 5.0, 1) + try: + return round((self.temperature - 32) / 9.0 * 5.0, 1) + except TypeError: + return None @property def image_from_cache(self): @@ -77,12 +82,31 @@ def video_from_cache(self): return self._cached_video return None + @property + def arm(self): + """Return arm status of camera.""" + return self.motion_enabled + + @arm.setter + def arm(self, value): + """Set camera arm status.""" + if value: + return api.request_motion_detection_enable( + self.sync.blink, self.network_id, self.camera_id + ) + return api.request_motion_detection_disable( + self.sync.blink, self.network_id, self.camera_id + ) + def snap_picture(self): """Take a picture with camera to create a new thumbnail.""" return api.request_new_image(self.sync.blink, self.network_id, self.camera_id) def set_motion_detect(self, enable): """Set motion detection.""" + _LOGGER.warning( + "Method is deprecated as of v0.16.0 and will be removed in a future version. Please use the BlinkCamera.arm property instead." + ) if enable: return api.request_motion_detection_enable( self.sync.blink, self.network_id, self.camera_id @@ -93,41 +117,44 @@ def set_motion_detect(self, enable): def update(self, config, force_cache=False, **kwargs): """Update camera info.""" - # force = kwargs.pop('force', False) - self.name = config["name"] - self.camera_id = str(config["id"]) - self.network_id = str(config["network_id"]) - self.serial = config["serial"] - self.motion_enabled = config["enabled"] - self.battery_voltage = config["battery_voltage"] - self.battery_state = config["battery_state"] - self.temperature = config["temperature"] - self.wifi_strength = config["wifi_strength"] - - # Retrieve calibrated temperature from special endpoint + self.extract_config_info(config) + self.get_sensor_info() + self.update_images(config, force_cache=force_cache) + + def extract_config_info(self, config): + """Extract info from config.""" + self.name = config.get("name", "unknown") + self.camera_id = str(config.get("id", "unknown")) + self.network_id = str(config.get("network_id", "unknown")) + self.serial = config.get("serial", None) + self.motion_enabled = config.get("enabled", "unknown") + self.battery_voltage = config.get("battery_voltage", None) + self.battery_state = config.get("battery_state", None) + self.temperature = config.get("temperature", None) + self.wifi_strength = config.get("wifi_strength", None) + + def get_sensor_info(self): + """Retrieve calibrated temperatue from special endpoint.""" resp = api.request_camera_sensors( self.sync.blink, self.network_id, self.camera_id ) try: self.temperature_calibrated = resp["temp"] - except KeyError: + except (TypeError, KeyError): self.temperature_calibrated = self.temperature _LOGGER.warning("Could not retrieve calibrated temperature.") - # Check if thumbnail exists in config, if not try to - # get it from the homescreen info in the sync module - # otherwise set it to None and log an error + def update_images(self, config, force_cache=False): + """Update images for camera.""" new_thumbnail = None thumb_addr = None - if config["thumbnail"]: + if config.get("thumbnail", False): thumb_addr = config["thumbnail"] else: - _LOGGER.warning( - "Could not find thumbnail for camera %s", self.name, exc_info=True - ) + _LOGGER.warning("Could not find thumbnail for camera %s", self.name) if thumb_addr is not None: - new_thumbnail = "{}{}.jpg".format(self.sync.urls.base_url, thumb_addr) + new_thumbnail = f"{self.sync.urls.base_url}{thumb_addr}.jpg" try: self.motion_detected = self.sync.motion[self.name] @@ -135,10 +162,12 @@ def update(self, config, force_cache=False, **kwargs): self.motion_detected = False clip_addr = None - if self.name in self.sync.last_record: + try: clip_addr = self.sync.last_record[self.name]["clip"] self.last_record = self.sync.last_record[self.name]["time"] - self.clip = "{}{}".format(self.sync.urls.base_url, clip_addr) + self.clip = f"{self.sync.urls.base_url}{clip_addr}" + except KeyError: + pass # If the thumbnail or clip have changed, update the cache update_cached_image = False @@ -152,13 +181,28 @@ def update(self, config, force_cache=False, **kwargs): if new_thumbnail is not None and (update_cached_image or force_cache): self._cached_image = api.http_get( - self.sync.blink, url=self.thumbnail, stream=True, json=False + self.sync.blink, + url=self.thumbnail, + stream=True, + json=False, + timeout=TIMEOUT_MEDIA, ) if clip_addr is not None and (update_cached_video or force_cache): self._cached_video = api.http_get( - self.sync.blink, url=self.clip, stream=True, json=False + self.sync.blink, + url=self.clip, + stream=True, + json=False, + timeout=TIMEOUT_MEDIA, ) + def get_liveview(self): + """Get livewview rtsps link.""" + response = api.request_camera_liveview( + self.sync.blink, self.sync.network_id, self.camera_id + ) + return response["server"] + def image_to_file(self, path): """ Write image to file. @@ -176,7 +220,8 @@ def image_to_file(self, path): ) def video_to_file(self, path): - """Write video to file. + """ + Write video to file. :param path: Path to write file """ @@ -187,3 +232,42 @@ def video_to_file(self, path): return with open(path, "wb") as vidfile: copyfileobj(response.raw, vidfile) + + +class BlinkCameraMini(BlinkCamera): + """Define a class for a Blink Mini camera.""" + + def __init__(self, sync): + """Initialize a Blink Mini cameras.""" + super().__init__(sync) + self.camera_type = "mini" + + @property + def arm(self): + """Return camera arm status.""" + return self.sync.arm + + @arm.setter + def arm(self, value): + """Set camera arm status.""" + _LOGGER.warning( + "Individual camera motion detection enable/disable for Blink Mini cameras is unsupported at this time." + ) + + def snap_picture(self): + """Snap picture for a blink mini camera.""" + url = f"{self.sync.urls.base_url}/api/v1/accounts/{self.sync.blink.account_id}/networks/{self.network_id}/owls/{self.camera_id}/thumbnail" + return api.http_post(self.sync.blink, url) + + def get_sensor_info(self): + """Get sensor info for blink mini camera.""" + + def get_liveview(self): + """Get liveview link.""" + url = f"{self.sync.urls.base_url}/api/v1/accounts/{self.sync.blink.account_id}/networks/{self.network_id}/owls/{self.camera_id}/liveview" + response = api.http_post(self.sync.blink, url) + server = response["server"] + server_split = server.split(":") + server_split[0] = "rtsps" + link = "".join(server_split) + return link diff --git a/blinkpy/helpers/constants.py b/blinkpy/helpers/constants.py index 36ffad62..180e6598 100644 --- a/blinkpy/helpers/constants.py +++ b/blinkpy/helpers/constants.py @@ -3,18 +3,18 @@ import os MAJOR_VERSION = 0 -MINOR_VERSION = 15 -PATCH_VERSION = 1 +MINOR_VERSION = 16 +PATCH_VERSION = 0 -__version__ = "{}.{}.{}".format(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION) +__version__ = f"{MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}" -REQUIRED_PYTHON_VER = (3, 5, 3) +REQUIRED_PYTHON_VER = (3, 6, 0) PROJECT_NAME = "blinkpy" PROJECT_PACKAGE_NAME = "blinkpy" PROJECT_LICENSE = "MIT" PROJECT_AUTHOR = "Kevin Fronczak" -PROJECT_COPYRIGHT = " 2017, {}".format(PROJECT_AUTHOR) +PROJECT_COPYRIGHT = f" 2017, {PROJECT_AUTHOR}" PROJECT_URL = "https://github.com/fronzbot/blinkpy" PROJECT_EMAIL = "kfronczak@gmail.com" PROJECT_DESCRIPTION = "A Blink camera Python library " "running on Python 3." @@ -40,19 +40,15 @@ PROJECT_GITHUB_USERNAME = "fronzbot" PROJECT_GITHUB_REPOSITORY = "blinkpy" -PYPI_URL = "https://pypi.python.org/pypi/{}".format(PROJECT_PACKAGE_NAME) +PYPI_URL = f"https://pypi.python.org/pypi/{PROJECT_PACKAGE_NAME}" """ URLS """ BLINK_URL = "immedia-semi.com" -DEFAULT_URL = "{}.{}".format("rest-prod", BLINK_URL) -BASE_URL = "https://{}".format(DEFAULT_URL) -LOGIN_URLS = [ - "{}/api/v4/login".format(BASE_URL), - "{}/api/v3/login".format(BASE_URL), - "{}/api/v2/login".format(BASE_URL), -] +DEFAULT_URL = f"rest-prod.{BLINK_URL}" +BASE_URL = f"https://{DEFAULT_URL}" +LOGIN_ENDPOINT = f"{BASE_URL}/api/v4/account/login" """ Dictionaries @@ -62,9 +58,12 @@ """ OTHER """ +DEVICE_ID = "Blinkpy" TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S%z" DEFAULT_MOTION_INTERVAL = 1 DEFAULT_REFRESH = 30 MIN_THROTTLE_TIME = 2 SIZE_NOTIFICATION_KEY = 152 SIZE_UID = 16 +TIMEOUT = 10 +TIMEOUT_MEDIA = 90 diff --git a/blinkpy/helpers/errors.py b/blinkpy/helpers/errors.py index 3f39086e..e701f2c1 100644 --- a/blinkpy/helpers/errors.py +++ b/blinkpy/helpers/errors.py @@ -12,4 +12,4 @@ ) REQUEST = (4, "Cannot perform request (get/post type incorrect)") -BLINK_ERRORS = [101, 400, 404] +BLINK_ERRORS = [400, 404] diff --git a/blinkpy/helpers/util.py b/blinkpy/helpers/util.py index 9079e3d3..3bc7380f 100644 --- a/blinkpy/helpers/util.py +++ b/blinkpy/helpers/util.py @@ -1,19 +1,38 @@ """Useful functions for blinkpy.""" +import json import logging import time import secrets from calendar import timegm -from functools import partial, wraps -from requests import Request, Session, exceptions +from functools import wraps +from getpass import getpass import dateutil.parser -from blinkpy.helpers.constants import BLINK_URL, TIMESTAMP_FORMAT -import blinkpy.helpers.errors as ERROR +from blinkpy.helpers import constants as const _LOGGER = logging.getLogger(__name__) +def json_load(file_name): + """Load json credentials from file.""" + try: + with open(file_name, "r") as json_file: + data = json.load(json_file) + return data + except FileNotFoundError: + _LOGGER.error("Could not find %s", file_name) + except json.decoder.JSONDecodeError: + _LOGGER.error("File %s has improperly formatted json", file_name) + return None + + +def json_save(data, file_name): + """Save data to file location.""" + with open(file_name, "w") as json_file: + json.dump(data, json_file, indent=4) + + def gen_uid(size): """Create a random sring.""" full_token = secrets.token_hex(size) @@ -34,7 +53,7 @@ def get_time(time_to_convert=None): """Create blink-compatible timestamp.""" if time_to_convert is None: time_to_convert = time.time() - return time.strftime(TIMESTAMP_FORMAT, time.gmtime(time_to_convert)) + return time.strftime(const.TIMESTAMP_FORMAT, time.gmtime(time_to_convert)) def merge_dicts(dict_a, dict_b): @@ -48,103 +67,25 @@ def merge_dicts(dict_a, dict_b): return {**dict_a, **dict_b} -def create_session(): - """ - Create a session for blink communication. - - From @ericfrederich via - https://github.com/kennethreitz/requests/issues/2011 - """ - sess = Session() - sess.get = partial(sess.get, timeout=5) - return sess - - -def attempt_reauthorization(blink): - """Attempt to refresh auth token and links.""" - _LOGGER.info("Auth token expired, attempting reauthorization.") - headers = blink.get_auth_token(is_retry=True) - return headers - - -def http_req( - blink, - url="http://example.com", - data=None, - headers=None, - reqtype="get", - stream=False, - json_resp=True, - is_retry=False, -): - """ - Perform server requests and check if reauthorization neccessary. - - :param blink: Blink instance - :param url: URL to perform request - :param data: Data to send (default: None) - :param headers: Headers to send (default: None) - :param reqtype: Can be 'get' or 'post' (default: 'get') - :param stream: Stream response? True/FALSE - :param json_resp: Return JSON response? TRUE/False - :param is_retry: Is this a retry attempt? True/FALSE - """ - if reqtype == "post": - req = Request("POST", url, headers=headers, data=data) - elif reqtype == "get": - req = Request("GET", url, headers=headers) - else: - _LOGGER.error("Invalid request type: %s", reqtype) - raise BlinkException(ERROR.REQUEST) - - prepped = req.prepare() +def prompt_login_data(data): + """Prompt user for username and password.""" + if data["username"] is None: + data["username"] = input("Username:") + if data["password"] is None: + data["password"] = getpass("Password:") - try: - response = blink.session.send(prepped, stream=stream) - if json_resp and "code" in response.json(): - resp_dict = response.json() - code = resp_dict["code"] - message = resp_dict["message"] - if is_retry and code in ERROR.BLINK_ERRORS: - _LOGGER.error("Cannot obtain new token for server auth.") - return None - elif code in ERROR.BLINK_ERRORS: - headers = attempt_reauthorization(blink) - if not headers: - raise exceptions.ConnectionError - return http_req( - blink, - url=url, - data=data, - headers=headers, - reqtype=reqtype, - stream=stream, - json_resp=json_resp, - is_retry=True, - ) - _LOGGER.warning("Response from server: %s - %s", code, message) - - except (exceptions.ConnectionError, exceptions.Timeout): - _LOGGER.info("Cannot connect to server with url %s.", url) - if not is_retry: - headers = attempt_reauthorization(blink) - return http_req( - blink, - url=url, - data=data, - headers=headers, - reqtype=reqtype, - stream=stream, - json_resp=json_resp, - is_retry=True, - ) - _LOGGER.error("Endpoint %s failed. Possible issue with Blink servers.", url) - return None - - if json_resp: - return response.json() - - return response + return data + + +def validate_login_data(data): + """Check for missing keys.""" + data["uid"] = data.get("uid", gen_uid(const.SIZE_UID)) + data["notification_key"] = data.get( + "notification_key", gen_uid(const.SIZE_NOTIFICATION_KEY) + ) + data["device_id"] = data.get("device_id", const.DEVICE_ID) + + return data class BlinkException(Exception): @@ -164,17 +105,17 @@ class BlinkAuthenticationException(BlinkException): class BlinkURLHandler: """Class that handles Blink URLS.""" - def __init__(self, region_id, legacy=False): + def __init__(self, region_id): """Initialize the urls.""" - self.subdomain = "rest-{}".format(region_id) - if legacy: - self.subdomain = "rest.{}".format(region_id) - self.base_url = "https://{}.{}".format(self.subdomain, BLINK_URL) - self.home_url = "{}/homescreen".format(self.base_url) - self.event_url = "{}/events/network".format(self.base_url) - self.network_url = "{}/network".format(self.base_url) - self.networks_url = "{}/networks".format(self.base_url) - self.video_url = "{}/api/v2/videos".format(self.base_url) + if region_id is None: + raise TypeError + self.subdomain = f"rest-{region_id}" + self.base_url = f"https://{self.subdomain}.{const.BLINK_URL}" + self.home_url = f"{self.base_url}/homescreen" + self.event_url = f"{self.base_url}/events/network" + self.network_url = f"{self.base_url}/network" + self.networks_url = f"{self.base_url}/networks" + self.video_url = f"{self.base_url}/api/v2/videos" _LOGGER.debug("Setting base url to %s.", self.base_url) diff --git a/blinkpy/login_handler.py b/blinkpy/login_handler.py deleted file mode 100644 index 2e978f75..00000000 --- a/blinkpy/login_handler.py +++ /dev/null @@ -1,154 +0,0 @@ -"""Login handler for blink.""" -import json -import logging -from os.path import isfile -from getpass import getpass -from blinkpy import api -from blinkpy.helpers import util -from blinkpy.helpers import constants as const - -_LOGGER = logging.getLogger(__name__) - - -class LoginHandler: - """Class to handle login communication.""" - - def __init__( - self, - username=None, - password=None, - cred_file=None, - persist_key=None, - device_id="Blinkpy", - ): - """ - Initialize login handler. - - :param username: Blink username - :param password: Blink password - :param cred_file: JSON formatted credential file. - :param persist_key: File location of persistant key. - :param device_id: Name of application to send at login. - """ - self.login_url = None - self.login_urls = const.LOGIN_URLS - self.cred_file = cred_file - self.persist_key = persist_key - self.device_id = device_id - self.data = { - "username": username, - "password": password, - "uid": None, - "notification_key": None, - } - - self.check_keys() - - def check_keys(self): - """Check if uid exists, if not create.""" - uid = util.gen_uid(const.SIZE_UID) - notification_key = util.gen_uid(const.SIZE_NOTIFICATION_KEY) - data = {"uid": uid, "notification_key": notification_key} - if self.persist_key is None: - return data - if not isfile(self.persist_key): - with open(self.persist_key, "w") as json_file: - json.dump(data, json_file) - else: - with open(self.persist_key, "r") as json_file: - data = json.load(json_file) - return data - - def check_cred_file(self): - """Check if credential file supplied and use if so.""" - if isfile(self.cred_file): - try: - with open(self.cred_file, "r") as json_file: - creds = json.load(json_file) - self.data["username"] = creds["username"] - self.data["password"] = creds["password"] - - except ValueError: - _LOGGER.error( - "Improperly formatted json file %s.", self.cred_file, exc_info=True - ) - return False - - except KeyError: - _LOGGER.error("JSON file information incomplete %s.", exc_info=True) - return False - return True - return False - - def check_login(self): - """Check login information and prompt if not available.""" - if self.data["username"] is None: - self.data["username"] = input("Username:") - if self.data["password"] is None: - self.data["password"] = getpass("Password:") - - if self.data["username"] and self.data["password"]: - return True - return False - - def validate_response(self, url, response): - """Validate response from login endpoint.""" - try: - if response.status_code != 200: - return False - except AttributeError: - _LOGGER.error( - "Response for %s did not return a status code. Deprecated endpoint?", - url, - ) - return False - return True - - def login(self, blink): - """Attempt login to blink servers.""" - if self.cred_file is not None: - self.check_cred_file() - if not self.check_login(): - _LOGGER.error("Cannot login with username %s", self.data["username"]) - return False - - for url in self.login_urls: - _LOGGER.info("Attempting login with %s", url) - response = api.request_login( - blink, - url, - self.data["username"], - self.data["password"], - self.data["notification_key"], - self.data["uid"], - is_retry=False, - device_id=self.device_id, - ) - - if self.validate_response(url, response): - self.login_url = url - return response.json() - - _LOGGER.error("Failed to login to Blink servers. Last response: %s", response) - return False - - def send_auth_key(self, blink, key): - """Send 2FA key to blink servers.""" - if key is not None: - response = api.request_verify(blink, key) - try: - json_resp = response.json() - blink.available = json_resp["valid"] - except (KeyError, TypeError): - _LOGGER.error("Did not receive valid response from server.") - return False - return True - - def check_key_required(self, blink): - """Check if 2FA key is required.""" - try: - if blink.login_response["client"]["verification_required"]: - return True - except KeyError: - pass - return False diff --git a/blinkpy/sync_module.py b/blinkpy/sync_module.py index def98104..ad780a16 100644 --- a/blinkpy/sync_module.py +++ b/blinkpy/sync_module.py @@ -4,7 +4,7 @@ from requests.structures import CaseInsensitiveDict from blinkpy import api -from blinkpy.camera import BlinkCamera +from blinkpy.camera import BlinkCamera, BlinkCameraMini from blinkpy.helpers.util import time_to_seconds from blinkpy.helpers.constants import ONLINE @@ -21,13 +21,11 @@ def __init__(self, blink, network_name, network_id, camera_list): :param blink: Blink class instantiation """ self.blink = blink - self._auth_header = blink.auth_header self.network_id = network_id - self.region = blink.region - self.region_id = blink.region_id + self.region_id = blink.auth.region_id self.name = network_name self.serial = None - self.status = None + self.status = "offline" self.sync_id = None self.host = None self.summary = None @@ -49,7 +47,6 @@ def attributes(self): "network_id": self.network_id, "serial": self.serial, "status": self.status, - "region": self.region, "region_id": self.region_id, } return attr @@ -62,7 +59,12 @@ def urls(self): @property def online(self): """Return boolean system online status.""" - return ONLINE[self.status] + try: + return ONLINE[self.status] + except KeyError: + _LOGGER.error("Unknown sync module status %s", self.status) + self.available = False + return False @property def arm(self): @@ -70,28 +72,20 @@ def arm(self): try: return self.network_info["network"]["armed"] except (KeyError, TypeError): + self.available = False return None @arm.setter def arm(self, value): - """Arm or disarm system.""" + """Arm or disarm camera.""" if value: return api.request_system_arm(self.blink, self.network_id) - return api.request_system_disarm(self.blink, self.network_id) def start(self): """Initialize the system.""" - response = api.request_syncmodule(self.blink, self.network_id) - try: - self.summary = response["syncmodule"] - self.network_id = self.summary["network_id"] - except (TypeError, KeyError): - _LOGGER.error( - ("Could not retrieve sync module information " "with response: %s"), - response, - exc_info=True, - ) + response = self.sync_initialize() + if not response: return False try: @@ -99,30 +93,62 @@ def start(self): self.serial = self.summary["serial"] self.status = self.summary["status"] except KeyError: + _LOGGER.error("Could not extract some sync module info: %s", response) + + is_ok = self.get_network_info() + self.check_new_videos() + + if not is_ok or not self.update_cameras(): + return False + self.available = True + return True + + def sync_initialize(self): + """Initialize a sync module.""" + response = api.request_syncmodule(self.blink, self.network_id) + try: + self.summary = response["syncmodule"] + self.network_id = self.summary["network_id"] + except (TypeError, KeyError): _LOGGER.error( - "Could not extract some sync module info: %s", response, exc_info=True + "Could not retrieve sync module information with response: %s", response ) + return False + return response - self.get_network_info() - self.check_new_videos() + def update_cameras(self, camera_type=BlinkCamera): + """Update cameras from server.""" try: for camera_config in self.camera_list: if "name" not in camera_config: break + blink_camera_type = camera_config.get("type", "") name = camera_config["name"] - self.cameras[name] = BlinkCamera(self) self.motion[name] = False - camera_info = self.get_camera_info(camera_config["id"]) + owl_info = self.get_owl_info(name) + if blink_camera_type == "mini": + camera_type = BlinkCameraMini + self.cameras[name] = camera_type(self) + camera_info = self.get_camera_info( + camera_config["id"], owl_info=owl_info + ) self.cameras[name].update(camera_info, force_cache=True, force=True) + except KeyError: - _LOGGER.error( - "Could not create cameras instances for %s", self.name, exc_info=True - ) + _LOGGER.error("Could not create camera instances for %s", self.name) return False - - self.available = True return True + def get_owl_info(self, name): + """Extract owl information.""" + try: + for owl in self.blink.homescreen["owls"]: + if owl["name"] == name: + return owl + except KeyError: + pass + return None + def get_events(self, **kwargs): """Retrieve events from server.""" force = kwargs.pop("force", False) @@ -130,38 +156,44 @@ def get_events(self, **kwargs): try: return response["event"] except (TypeError, KeyError): - _LOGGER.error("Could not extract events: %s", response, exc_info=True) + _LOGGER.error("Could not extract events: %s", response) return False - def get_camera_info(self, camera_id): + def get_camera_info(self, camera_id, **kwargs): """Retrieve camera information.""" + owl = kwargs.get("owl_info", None) + if owl is not None: + return owl response = api.request_camera_info(self.blink, self.network_id, camera_id) try: return response["camera"][0] except (TypeError, KeyError): - _LOGGER.error("Could not extract camera info: %s", response, exc_info=True) - return [] + _LOGGER.error("Could not extract camera info: %s", response) + return {} def get_network_info(self): """Retrieve network status.""" - is_errored = False - self.network_info = api.request_network_status(self.blink, self.network_id) + self.network_info = api.request_network_update(self.blink, self.network_id) try: - is_errored = self.network_info["network"]["sync_module_error"] - except KeyError: - is_errored = True - - if is_errored: + if self.network_info["network"]["sync_module_error"]: + raise KeyError + except (TypeError, KeyError): self.available = False + return False + return True def refresh(self, force_cache=False): """Get all blink cameras and pulls their most recent status.""" - self.get_network_info() + if not self.get_network_info(): + return self.check_new_videos() for camera_name in self.cameras.keys(): camera_id = self.cameras[camera_name].camera_id - camera_info = self.get_camera_info(camera_id) + camera_info = self.get_camera_info( + camera_id, owl_info=self.get_owl_info(camera_name) + ) self.cameras[camera_name].update(camera_info, force_cache=force_cache) + self.available = True def check_new_videos(self): """Check if new videos since last refresh.""" @@ -199,3 +231,66 @@ def check_new_videos(self): def check_new_video_time(self, timestamp): """Check if video has timestamp since last refresh.""" return time_to_seconds(timestamp) > self.blink.last_refresh + + +class BlinkOwl(BlinkSyncModule): + """Representation of a sync-less device.""" + + def __init__(self, blink, name, network_id, response): + """Initialize a sync-less object.""" + cameras = [{"name": name, "id": response["id"]}] + super().__init__(blink, name, network_id, cameras) + self.sync_id = response["id"] + self.serial = response["serial"] + self.status = response["enabled"] + if not self.serial: + self.serial = f"{network_id}-{self.sync_id}" + + def sync_initialize(self): + """Initialize a sync-less module.""" + self.summary = { + "id": self.sync_id, + "name": self.name, + "serial": self.serial, + "status": self.status, + "onboarded": True, + "account_id": self.blink.account_id, + "network_id": self.network_id, + } + return self.summary + + def update_cameras(self, camera_type=BlinkCameraMini): + """Update sync-less cameras.""" + return super().update_cameras(camera_type=BlinkCameraMini) + + def get_camera_info(self, camera_id, **kwargs): + """Retrieve camera information.""" + try: + for owl in self.blink.homescreen["owls"]: + if owl["name"] == self.name: + self.status = owl["enabled"] + return owl + except KeyError: + pass + return None + + def get_network_info(self): + """Get network info for sync-less module.""" + return True + + @property + def network_info(self): + """Format owl response to resemble sync module.""" + return { + "network": { + "id": self.network_id, + "name": self.name, + "armed": self.status, + "sync_module_error": False, + "account_id": self.blink.account_id, + } + } + + @network_info.setter + def network_info(self, value): + """Set network_info property.""" diff --git a/codecov.yml b/codecov.yml index abe7a216..42f2739b 100644 --- a/codecov.yml +++ b/codecov.yml @@ -4,10 +4,12 @@ coverage: status: project: default: + target: 80% threshold: 2% + patch: + default: + target: 60% + threshold: 5% comment: true require_ci_to_pass: yes -range: 65..90 -round: down -precision: 1 diff --git a/docs/CHANGES.rst b/docs/CHANGES.rst new file mode 120000 index 00000000..9d60ba96 --- /dev/null +++ b/docs/CHANGES.rst @@ -0,0 +1 @@ +../CHANGES.rst \ No newline at end of file diff --git a/docs/CONTRIBUTING.rst b/docs/CONTRIBUTING.rst new file mode 120000 index 00000000..798f2aa2 --- /dev/null +++ b/docs/CONTRIBUTING.rst @@ -0,0 +1 @@ +../CONTRIBUTING.rst \ No newline at end of file diff --git a/docs/advanced.rst b/docs/advanced.rst new file mode 100644 index 00000000..95bf6037 --- /dev/null +++ b/docs/advanced.rst @@ -0,0 +1,104 @@ +======================= +Advanced Library Usage +======================= + +Usage of this library was designed with the `Home Assistant `__ project in mind. With that said, this library is flexible to be used in other scripts where advanced usage not covered in the Quick Start guide may be required. This usage guide will attempt to cover as many use cases as possible. + +Throttling +-------------- +In general, attempting too many requests to the Blink servers will result in your account being throttled. Where possible, adding a delay between calls is ideal. For use cases where this is not an acceptable solution, the ``blinkpy.helpers.util`` module contains a ``Throttle`` class that can be used as a decorator for calls. There are many examples of usage within the ``blinkpy.api`` module. A simple example of usage is covered below, where the decorated method is prevented from executing again until 10s has passed. Note that if the method call is throttled by the decorator, the method will return `None`. + +.. code:: python + + from blinkpy.helpers.util import Throttle + + @Throttle(seconds=10) + def my_method(*args): + """Some method to be throttled.""" + return True + +Custom Sessions +----------------- +By default, the ``blink.auth.Auth`` class creates its own websession via its ``create_session`` method. This is done when the class is initialized and is accessible via the ``Auth.session`` property. To override with a custom websession, the following code can accomplish that: + +.. code:: python + + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + + blink = Blink() + blink.auth = Auth() + blink.auth.session = YourCustomSession + + +Custom Retry Logic +-------------------- +The built-in auth session via the ``create_session`` method allows for customizable retry intervals and conditions. These parameters are: + +- retries +- backoff +- retry_list + +``retries`` is the total number of retry attempts that each http request can do before timing out. ``backoff`` is a parameter that allows for non-linear retry times such that the time between retries is backoff*(2^(retries) - 1). ``retry_list`` is simply a list of status codes to force a retry. By default ``retries=3``, ``backoff=1``, and ``retry_list=[429, 500, 502, 503, 504]``. To override them, you need to add you overrides to a dictionary and use that to create a new session with the ``opts`` variable in the ``create_session`` method. The following example can serve as a guide where only the number of retries and backoff factor are overridden: + +.. code:: python + + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + + blink = Blink() + blink.auth = Auth() + + opts = {"retries": 10, "backoff": 2} + blink.auth.session = blink.auth.create_session(opts=opts) + + +Custom HTTP requests +--------------------- +In addition to custom sessions, custom blink server requests can be performed. This give you the ability to bypass the built-in ``Auth.query`` method. It also allows flexibility by giving you the option to pass your own url, rather than be limited to what is currently implemented in the ``blinkpy.api`` module. + +**Send custom url** +This prepares a standard "GET" request. + +.. code:: python + + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + + blink = Blink() + blink.auth = Auth() + url = some_api_endpoint_string + request = blink.auth.prepare_request(url, blink.auth.header, None, "get") + response = blink.auth.session.send(request) + +**Overload query method** +Another option is to create your own ``Auth`` class with a custom ``query`` method to avoid the built-in response checking. This allows you to use the built in ``blinkpy.api`` endpoints, but also gives you flexibility to send your own urls. + +.. code:: python + + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + from blinkpy import api + + class CustomAuth(Auth): + def query( + self, + url=None, + data=None, + headers=self.header, + reqtype="get", + stream=False, + json_resp=True, + **kwargs + ): + req = self.prepare_request(url, headers, data, reqtype) + return self.session.send(req, stream=stream) + + blink = blink.Blink() + blink.auth = CustomAuth() + + # Send custom GET query + response = blink.auth.query(url=some_custom_url) + + # Call built-in networks api endpoint + response = api.request_networks(blink) diff --git a/docs/api/blinkpy.rst b/docs/api/blinkpy.rst deleted file mode 100644 index edd9efa5..00000000 --- a/docs/api/blinkpy.rst +++ /dev/null @@ -1,17 +0,0 @@ -.. _core_module: - -Blinkpy ----------------------- - -.. automodule:: blinkpy.blinkpy - :members: - -.. automodule:: blinkpy.sync_module - :members: - -.. automodule:: blinkpy.camera - :members: - -.. automodule:: blinkpy.helpers.util - :members: - diff --git a/docs/api/implemented.rst b/docs/api/implemented.rst deleted file mode 100644 index d0ed9b1f..00000000 --- a/docs/api/implemented.rst +++ /dev/null @@ -1,6 +0,0 @@ -API Reference ----------------------- - -.. automodule:: blinkpy.api - :members: - diff --git a/docs/index.rst b/docs/index.rst index d31aabed..50951612 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,12 +7,16 @@ Welcome to blinkpy's documentation! =================================== .. toctree:: - :maxdepth: 2 + :maxdepth: 1 :caption: Contents: :glob: README - api/* + advanced + CONTRIBUTING + modules/* + CHANGES + Indices and tables ================== diff --git a/docs/modules/blinkpy.rst b/docs/modules/blinkpy.rst new file mode 100644 index 00000000..1dc8d18b --- /dev/null +++ b/docs/modules/blinkpy.rst @@ -0,0 +1,41 @@ +.. _core_module: + +=========================== +Blinkpy Library Reference +=========================== + +blinkpy.py +----------- +.. automodule:: blinkpy.blinkpy + :members: + +auth.py +-------- + +.. automodule:: blinkpy.auth + :members: + +sync_module.py +---------------- + +.. automodule:: blinkpy.sync_module + :members: + +camera.py +----------- + +.. automodule:: blinkpy.camera + :members: + +api.py +--------- + +.. automodule:: blinkpy.api + :members: + +helpers/util.py +---------------- + +.. automodule:: blinkpy.helpers.util + :members: + diff --git a/requirements.txt b/requirements.txt index 476bd19f..9e0e0208 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ python-dateutil~=2.8.1 -requests~=2.23.0 -python-slugify~=4.0.0 +requests~=2.24.0 +python-slugify~=4.0.1 testtools==2.4.0 diff --git a/requirements_test.txt b/requirements_test.txt index 870520ce..378ac722 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,12 +1,13 @@ black==19.10b0 -coverage==5.1 -flake8==3.7.9 +coverage==5.2 +flake8==3.8.3 flake8-docstrings==1.5.0 -pylint==2.5.2 +pre-commit==2.6.0 +pylint==2.5.3 pydocstyle==5.0.2 -pytest==5.4.1 -pytest-cov==2.8.1 -pytest-sugar==0.9.3 -pytest-timeout==1.3.4 -restructuredtext-lint==1.3.0 -pygments==2.6.1 \ No newline at end of file +pytest==5.4.3 +pytest-cov==2.10.0 +pytest-sugar==0.9.4 +pytest-timeout==1.4.2 +restructuredtext-lint==1.3.1 +pygments==2.6.1 diff --git a/setup.py b/setup.py index 2d19da0f..fc0b76f9 100644 --- a/setup.py +++ b/setup.py @@ -3,23 +3,26 @@ from os.path import abspath, dirname from setuptools import setup, find_packages from blinkpy.helpers.constants import ( - __version__, PROJECT_PACKAGE_NAME, PROJECT_LICENSE, PROJECT_URL, - PROJECT_EMAIL, PROJECT_DESCRIPTION, PROJECT_CLASSIFIERS, PROJECT_AUTHOR, + __version__, + PROJECT_PACKAGE_NAME, + PROJECT_LICENSE, + PROJECT_URL, + PROJECT_EMAIL, + PROJECT_DESCRIPTION, + PROJECT_CLASSIFIERS, + PROJECT_AUTHOR, ) PROJECT_VERSION = __version__ THIS_DIR = abspath(dirname(__file__)) -REQUIRES = [ - "python-dateutil~=2.8.1", - "requests~=2.23.0", - "python-slugify~=4.0.0", -] +with open(f"{THIS_DIR}/requirements.txt") as req_file: + REQUIRES = [line.rstrip() for line in req_file] -PACKAGES = find_packages(exclude=['tests*', 'docs']) +PACKAGES = find_packages(exclude=["tests*", "docs"]) -with open('{}/README.rst'.format(THIS_DIR), encoding='utf-8') as readme_file: +with open("{}/README.rst".format(THIS_DIR), encoding="utf-8") as readme_file: LONG_DESCRIPTION = readme_file.read() setup( @@ -31,11 +34,11 @@ author_email=PROJECT_EMAIL, license=PROJECT_LICENSE, url=PROJECT_URL, - platforms='any', - py_modules=['blinkpy'], + platforms="any", + py_modules=["blinkpy"], packages=PACKAGES, include_package_data=True, install_requires=REQUIRES, - test_suite='tests', - classifiers=PROJECT_CLASSIFIERS + test_suite="tests", + classifiers=PROJECT_CLASSIFIERS, ) diff --git a/tests/mock_responses.py b/tests/mock_responses.py index ea837765..a789e803 100644 --- a/tests/mock_responses.py +++ b/tests/mock_responses.py @@ -1,16 +1,5 @@ """Simple mock responses definitions.""" -from blinkpy.helpers.util import BlinkURLHandler -import blinkpy.helpers.constants as const - -LOGIN_RESPONSE = { - "region": {"mock": "Test"}, - "networks": {"1234": {"name": "test", "onboarded": True}}, - "authtoken": {"authtoken": "foobar123", "message": "auth"}, - "client": {"id": "5678"}, - "account": {"id": "1337"}, -} - class MockResponse: """Class for mock request response.""" @@ -20,6 +9,7 @@ def __init__(self, json_data, status_code, raw_data=None): self.json_data = json_data self.status_code = status_code self.raw_data = raw_data + self.reason = "foobar" def json(self): """Return json data from get_request.""" @@ -29,41 +19,3 @@ def json(self): def raw(self): """Return raw data from get request.""" return self.raw_data - - -def mocked_session_send(*args, **kwargs): - """Mock session.""" - prepped = args[0] - url = prepped.url - header = prepped.headers - method = prepped.method - if method == "GET": - expected_token = LOGIN_RESPONSE["authtoken"]["authtoken"] - if header["TOKEN_AUTH"] != expected_token: - response = {"message": "Not Authorized", "code": 400} - status = 400 - elif url == "use_bad_response": - response = {"foo": "bar"} - status = 200 - elif url == "reauth": - response = {"message": "REAUTH", "code": 777} - status = 777 - else: - response = {"test": "foo"} - status = 200 - elif method == "POST": - if url in const.LOGIN_URLS: - response = LOGIN_RESPONSE - status = 200 - elif url == "http://wrong.url/" or url is None: - response = {"message": "Error", "code": 404} - status = 404 - else: - response = {"message": "foo", "code": 200} - status = 200 - - return MockResponse(response, status) - - -class MockURLHandler(BlinkURLHandler): - """Mocks URL Handler in blinkpy module.""" diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 00000000..1c85a6d7 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,299 @@ +"""Test login handler.""" + +import unittest +from unittest import mock +from requests import exceptions +from blinkpy.auth import ( + Auth, + LoginError, + TokenRefreshFailed, + BlinkBadResponse, + UnauthorizedError, +) +import blinkpy.helpers.constants as const +import tests.mock_responses as mresp + +USERNAME = "foobar" +PASSWORD = "deadbeef" + + +class TestAuth(unittest.TestCase): + """Test the Auth class in blinkpy.""" + + def setUp(self): + """Set up Login Handler.""" + self.auth = Auth() + + def tearDown(self): + """Clean up after test.""" + self.auth = None + + @mock.patch("blinkpy.helpers.util.gen_uid") + @mock.patch("blinkpy.auth.util.getpass") + def test_empty_init(self, getpwd, genuid): + """Test initialization with no params.""" + auth = Auth() + self.assertDictEqual(auth.data, {}) + getpwd.return_value = "bar" + genuid.return_value = 1234 + with mock.patch("builtins.input", return_value="foo"): + auth.validate_login() + expected_data = { + "username": "foo", + "password": "bar", + "uid": 1234, + "notification_key": 1234, + "device_id": const.DEVICE_ID, + } + self.assertDictEqual(auth.data, expected_data) + + @mock.patch("blinkpy.helpers.util.gen_uid") + @mock.patch("blinkpy.auth.util.getpass") + def test_barebones_init(self, getpwd, genuid): + """Test basebones initialization.""" + login_data = {"username": "foo", "password": "bar"} + auth = Auth(login_data) + self.assertDictEqual(auth.data, login_data) + getpwd.return_value = "bar" + genuid.return_value = 1234 + with mock.patch("builtins.input", return_value="foo"): + auth.validate_login() + expected_data = { + "username": "foo", + "password": "bar", + "uid": 1234, + "notification_key": 1234, + "device_id": const.DEVICE_ID, + } + self.assertDictEqual(auth.data, expected_data) + + def test_full_init(self): + """Test full initialization.""" + login_data = { + "username": "foo", + "password": "bar", + "token": "token", + "host": "host", + "region_id": "region_id", + "client_id": "client_id", + "account_id": "account_id", + "uid": 1234, + "notification_key": 4321, + "device_id": "device_id", + } + auth = Auth(login_data) + self.assertEqual(auth.token, "token") + self.assertEqual(auth.host, "host") + self.assertEqual(auth.region_id, "region_id") + self.assertEqual(auth.client_id, "client_id") + self.assertEqual(auth.account_id, "account_id") + auth.validate_login() + self.assertDictEqual(auth.login_attributes, login_data) + + def test_bad_response_code(self): + """Check bad response code from server.""" + self.auth.is_errored = False + fake_resp = mresp.MockResponse({"code": 404}, 404) + with self.assertRaises(exceptions.ConnectionError): + self.auth.validate_response(fake_resp, True) + self.assertTrue(self.auth.is_errored) + + self.auth.is_errored = False + fake_resp = mresp.MockResponse({"code": 101}, 401) + with self.assertRaises(UnauthorizedError): + self.auth.validate_response(fake_resp, True) + self.assertTrue(self.auth.is_errored) + + def test_good_response_code(self): + """Check good response code from server.""" + fake_resp = mresp.MockResponse({"foo": "bar"}, 200) + self.auth.is_errored = True + self.assertEqual(self.auth.validate_response(fake_resp, True), {"foo": "bar"}) + self.assertFalse(self.auth.is_errored) + + def test_response_not_json(self): + """Check response when not json.""" + fake_resp = "foobar" + self.auth.is_errored = True + self.assertEqual(self.auth.validate_response(fake_resp, False), "foobar") + self.assertFalse(self.auth.is_errored) + + def test_response_bad_json(self): + """Check response when not json but expecting json.""" + self.auth.is_errored = False + with self.assertRaises(BlinkBadResponse): + self.auth.validate_response(None, True) + self.assertTrue(self.auth.is_errored) + + def test_header(self): + """Test header data.""" + self.auth.token = "bar" + expected_header = {"TOKEN_AUTH": "bar"} + self.assertDictEqual(self.auth.header, expected_header) + + def test_header_no_token(self): + """Test header without token.""" + self.auth.token = None + self.assertEqual(self.auth.header, None) + + @mock.patch("blinkpy.auth.Auth.validate_login", return_value=None) + @mock.patch("blinkpy.auth.api.request_login") + def test_login(self, mock_req, mock_validate): + """Test login handling.""" + fake_resp = mresp.MockResponse({"foo": "bar"}, 200) + mock_req.return_value = fake_resp + self.assertEqual(self.auth.login(), {"foo": "bar"}) + + @mock.patch("blinkpy.auth.Auth.validate_login", return_value=None) + @mock.patch("blinkpy.auth.api.request_login") + def test_login_bad_response(self, mock_req, mock_validate): + """Test login handling when bad response.""" + fake_resp = mresp.MockResponse({"foo": "bar"}, 404) + mock_req.return_value = fake_resp + self.auth.is_errored = False + with self.assertRaises(LoginError): + self.auth.login() + with self.assertRaises(TokenRefreshFailed): + self.auth.refresh_token() + self.assertTrue(self.auth.is_errored) + + @mock.patch("blinkpy.auth.Auth.login") + def test_refresh_token(self, mock_login): + """Test refresh token method.""" + mock_login.return_value = { + "region": {"tier": "test"}, + "authtoken": {"authtoken": "foobar"}, + "client": {"id": 1234}, + "account": {"id": 5678}, + } + self.assertTrue(self.auth.refresh_token()) + self.assertEqual(self.auth.region_id, "test") + self.assertEqual(self.auth.token, "foobar") + self.assertEqual(self.auth.client_id, 1234) + self.assertEqual(self.auth.account_id, 5678) + + @mock.patch("blinkpy.auth.Auth.login") + def test_refresh_token_failed(self, mock_login): + """Test refresh token failed.""" + mock_login.return_value = {} + self.auth.is_errored = False + with self.assertRaises(TokenRefreshFailed): + self.auth.refresh_token() + self.assertTrue(self.auth.is_errored) + + def test_check_key_required(self): + """Check key required method.""" + self.auth.login_response = {} + self.assertFalse(self.auth.check_key_required()) + + self.auth.login_response = {"client": {"verification_required": False}} + self.assertFalse(self.auth.check_key_required()) + + self.auth.login_response = {"client": {"verification_required": True}} + self.assertTrue(self.auth.check_key_required()) + + @mock.patch("blinkpy.auth.api.request_verify") + def test_send_auth_key(self, mock_req): + """Check sending of auth key.""" + mock_blink = MockBlink(None) + mock_req.return_value = mresp.MockResponse({"valid": True}, 200) + self.assertTrue(self.auth.send_auth_key(mock_blink, 1234)) + self.assertTrue(mock_blink.available) + + mock_req.return_value = mresp.MockResponse(None, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + + mock_req.return_value = mresp.MockResponse({}, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + + self.assertTrue(self.auth.send_auth_key(mock_blink, None)) + + @mock.patch("blinkpy.auth.api.request_verify") + def test_send_auth_key_fail(self, mock_req): + """Check handling of auth key failure.""" + mock_blink = MockBlink(None) + mock_req.return_value = mresp.MockResponse(None, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + mock_req.return_value = mresp.MockResponse({}, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + + @mock.patch("blinkpy.auth.Auth.validate_response") + @mock.patch("blinkpy.auth.Auth.refresh_token") + def test_query_retry(self, mock_refresh, mock_validate): + """Check handling of request retry.""" + self.auth.session = MockSession() + mock_validate.side_effect = [UnauthorizedError, "foobar"] + mock_refresh.return_value = True + self.assertEqual(self.auth.query(url="http://example.com"), "foobar") + + @mock.patch("blinkpy.auth.Auth.validate_response") + @mock.patch("blinkpy.auth.Auth.refresh_token") + def test_query_retry_failed(self, mock_refresh, mock_validate): + """Check handling of failed retry request.""" + self.auth.session = MockSession() + mock_validate.side_effect = [UnauthorizedError, BlinkBadResponse] + mock_refresh.return_value = True + self.assertEqual(self.auth.query(url="http://example.com"), None) + + mock_validate.side_effect = [UnauthorizedError, TokenRefreshFailed] + self.assertEqual(self.auth.query(url="http://example.com"), None) + + def test_default_session(self): + """Test default session creation.""" + sess = self.auth.create_session() + adapter = sess.adapters["https://"] + self.assertEqual(adapter.max_retries.total, 3) + self.assertEqual(adapter.max_retries.backoff_factor, 1) + self.assertEqual( + adapter.max_retries.status_forcelist, [429, 500, 502, 503, 504] + ) + + def test_custom_session_full(self): + """Test full custom session creation.""" + opts = {"backoff": 2, "retries": 10, "retry_list": [404]} + sess = self.auth.create_session(opts=opts) + adapter = sess.adapters["https://"] + self.assertEqual(adapter.max_retries.total, 10) + self.assertEqual(adapter.max_retries.backoff_factor, 2) + self.assertEqual(adapter.max_retries.status_forcelist, [404]) + + def test_custom_session_partial(self): + """Test partial custom session creation.""" + opts1 = {"backoff": 2} + opts2 = {"retries": 5} + opts3 = {"retry_list": [101, 202]} + sess1 = self.auth.create_session(opts=opts1) + sess2 = self.auth.create_session(opts=opts2) + sess3 = self.auth.create_session(opts=opts3) + adapt1 = sess1.adapters["https://"] + adapt2 = sess2.adapters["https://"] + adapt3 = sess3.adapters["https://"] + + self.assertEqual(adapt1.max_retries.total, 3) + self.assertEqual(adapt1.max_retries.backoff_factor, 2) + self.assertEqual(adapt1.max_retries.status_forcelist, [429, 500, 502, 503, 504]) + + self.assertEqual(adapt2.max_retries.total, 5) + self.assertEqual(adapt2.max_retries.backoff_factor, 1) + self.assertEqual(adapt2.max_retries.status_forcelist, [429, 500, 502, 503, 504]) + + self.assertEqual(adapt3.max_retries.total, 3) + self.assertEqual(adapt3.max_retries.backoff_factor, 1) + self.assertEqual(adapt3.max_retries.status_forcelist, [101, 202]) + + +class MockSession: + """Object to mock a session.""" + + def send(self, *args, **kwargs): + """Mock send function.""" + return None + + +class MockBlink: + """Object to mock basic blink class.""" + + def __init__(self, login_response): + """Initialize mock blink class.""" + self.available = False + self.login_response = login_response diff --git a/tests/test_blink_functions.py b/tests/test_blink_functions.py index cd1c0562..89a45dc6 100644 --- a/tests/test_blink_functions.py +++ b/tests/test_blink_functions.py @@ -5,11 +5,7 @@ from blinkpy import blinkpy from blinkpy.sync_module import BlinkSyncModule -from blinkpy.helpers.util import create_session, get_time -import tests.mock_responses as mresp - -USERNAME = "foobar" -PASSWORD = "deadbeef" +from blinkpy.helpers.util import get_time, BlinkURLHandler class MockSyncModule(BlinkSyncModule): @@ -34,42 +30,19 @@ def http_post(self, url): return self.return_value -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) class TestBlinkFunctions(unittest.TestCase): """Test Blink and BlinkCamera functions in blinkpy.""" def setUp(self): """Set up Blink module.""" - self.blink = blinkpy.Blink(username=USERNAME, password=PASSWORD) - # pylint: disable=protected-access - self.blink._auth_header = {"Host": "test.url.tld", "TOKEN_AUTH": "foobar123"} - self.blink.urls = blinkpy.BlinkURLHandler("test") - self.blink.session = create_session() + self.blink = blinkpy.Blink() + self.blink.urls = BlinkURLHandler("test") def tearDown(self): """Clean up after test.""" self.blink = None - @mock.patch("blinkpy.login_handler.api.request_login") - def test_backup_url(self, req, mock_sess): - """Test backup login method.""" - json_resp = { - "authtoken": {"authtoken": "foobar123"}, - "networks": {"1234": {"name": "foobar", "onboarded": True}}, - } - bad_req = mresp.MockResponse({}, 404) - new_req = mresp.MockResponse(json_resp, 200) - req.side_effect = [bad_req, bad_req, new_req] - self.blink.login_handler.login_urls = ["test1", "test2", "test3"] - self.blink.login_handler.login(self.blink) - self.assertEqual(self.blink.login_handler.login_url, "test3") - - req.side_effect = [bad_req, new_req, bad_req] - self.blink.login_handler.login_urls = ["test1", "test2", "test3"] - self.blink.login_handler.login(self.blink) - self.assertEqual(self.blink.login_handler.login_url, "test2") - - def test_merge_cameras(self, mock_sess): + def test_merge_cameras(self): """Test merge camera functionality.""" first_dict = {"foo": "bar", "test": 123} next_dict = {"foobar": 456, "bar": "foo"} @@ -82,7 +55,7 @@ def test_merge_cameras(self, mock_sess): self.assertEqual(expected, result) @mock.patch("blinkpy.blinkpy.api.request_videos") - def test_download_video_exit(self, mock_req, mock_sess): + def test_download_video_exit(self, mock_req): """Test we exit method when provided bad response.""" blink = blinkpy.Blink() # pylint: disable=protected-access @@ -100,7 +73,7 @@ def test_download_video_exit(self, mock_req, mock_sess): self.assertEqual(dl_log.output, expected_log) @mock.patch("blinkpy.blinkpy.api.request_videos") - def test_parse_downloaded_items(self, mock_req, mock_sess): + def test_parse_downloaded_items(self, mock_req): """Test ability to parse downloaded items list.""" blink = blinkpy.Blink() # pylint: disable=protected-access @@ -125,7 +98,7 @@ def test_parse_downloaded_items(self, mock_req, mock_sess): self.assertEqual(dl_log.output, expected_log) @mock.patch("blinkpy.blinkpy.api.request_videos") - def test_parse_camera_not_in_list(self, mock_req, mock_sess): + def test_parse_camera_not_in_list(self, mock_req): """Test ability to parse downloaded items list.""" blink = blinkpy.Blink() # pylint: disable=protected-access diff --git a/tests/test_blinkpy.py b/tests/test_blinkpy.py index cfda5e74..821edf05 100644 --- a/tests/test_blinkpy.py +++ b/tests/test_blinkpy.py @@ -8,96 +8,58 @@ import unittest from unittest import mock -from blinkpy import api -from blinkpy.blinkpy import Blink -from blinkpy.sync_module import BlinkSyncModule -from blinkpy.login_handler import LoginHandler -from blinkpy.helpers.util import ( - http_req, - create_session, - BlinkException, - BlinkURLHandler, -) +from blinkpy.blinkpy import Blink, BlinkSetupError +from blinkpy.sync_module import BlinkOwl from blinkpy.helpers.constants import __version__ -import tests.mock_responses as mresp -USERNAME = "foobar" -PASSWORD = "deadbeef" - -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) class TestBlinkSetup(unittest.TestCase): """Test the Blink class in blinkpy.""" def setUp(self): - """Set up Blink module.""" - self.blink = Blink(username=USERNAME, password=PASSWORD) - self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", "1234", []) - self.blink.urls = BlinkURLHandler("test") - self.blink.session = create_session() + """Initialize blink test object.""" + self.blink = Blink() + self.blink.available = True def tearDown(self): - """Clean up after test.""" + """Cleanup blink test object.""" self.blink = None - def test_initialization(self, mock_sess): + def test_initialization(self): """Verify we can initialize blink.""" - self.assertEqual(self.blink.version, __version__) - self.assertEqual(self.blink.login_handler.data["username"], USERNAME) - self.assertEqual(self.blink.login_handler.data["password"], PASSWORD) - - def test_bad_request(self, mock_sess): - """Check that we raise an Exception with a bad request.""" - self.blink.session = create_session() - explog = "WARNING:blinkpy.helpers.util:" "Response from server: 200 - foo" - with self.assertRaises(BlinkException): - http_req(self.blink, reqtype="bad") - - with self.assertLogs() as logrecord: - http_req(self.blink, reqtype="post", is_retry=True) - self.assertEqual(logrecord.output, [explog]) - - def test_authentication(self, mock_sess): - """Check that we can authenticate Blink up properly.""" - authtoken = self.blink.get_auth_token()["TOKEN_AUTH"] - expected = mresp.LOGIN_RESPONSE["authtoken"]["authtoken"] - self.assertEqual(authtoken, expected) - - def test_reauthorization_attempt(self, mock_sess): - """Check that we can reauthorize after first unsuccessful attempt.""" - original_header = self.blink.get_auth_token() - # pylint: disable=protected-access - bad_header = {"Host": self.blink._host, "TOKEN_AUTH": "BADTOKEN"} - # pylint: disable=protected-access - self.blink._auth_header = bad_header - self.assertEqual(self.blink.auth_header, bad_header) - api.request_homescreen(self.blink) - self.assertEqual(self.blink.auth_header, original_header) - - def test_multiple_networks(self, mock_sess): + blink = Blink() + self.assertEqual(blink.version, __version__) + + def test_network_id_failure(self): + """Check that with bad network data a setup error is raised.""" + self.blink.networks = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_network_ids() + + def test_multiple_networks(self): """Check that we handle multiple networks appropriately.""" self.blink.networks = { "0000": {"onboarded": False, "name": "foo"}, "5678": {"onboarded": True, "name": "bar"}, "1234": {"onboarded": False, "name": "test"}, } - self.blink.get_ids() + self.blink.setup_network_ids() self.assertTrue("5678" in self.blink.network_ids) - def test_multiple_onboarded_networks(self, mock_sess): + def test_multiple_onboarded_networks(self): """Check that we handle multiple networks appropriately.""" self.blink.networks = { "0000": {"onboarded": False, "name": "foo"}, "5678": {"onboarded": True, "name": "bar"}, "1234": {"onboarded": True, "name": "test"}, } - self.blink.get_ids() + self.blink.setup_network_ids() self.assertTrue("0000" not in self.blink.network_ids) self.assertTrue("5678" in self.blink.network_ids) self.assertTrue("1234" in self.blink.network_ids) @mock.patch("blinkpy.blinkpy.time.time") - def test_throttle(self, mock_time, mock_sess): + def test_throttle(self, mock_time): """Check throttling functionality.""" now = self.blink.refresh_rate + 1 mock_time.return_value = now @@ -113,28 +75,31 @@ def test_throttle(self, mock_time, mock_sess): self.assertEqual(self.blink.check_if_ok_to_update(), False) self.assertEqual(self.blink.last_refresh, now) - def test_sync_case_insensitive_dict(self, mock_sess): + def test_sync_case_insensitive_dict(self): """Check that we can access sync modules ignoring case.""" - self.assertEqual(self.blink.sync["test"].name, "test") - self.assertEqual(self.blink.sync["TEST"].name, "test") - - @mock.patch("blinkpy.api.request_login") - def test_unexpected_login(self, mock_login, mock_sess): - """Check that we appropriately handle unexpected login info.""" - mock_login.return_value = None - self.assertFalse(self.blink.get_auth_token()) + self.blink.sync["test"] = 1234 + self.assertEqual(self.blink.sync["test"], 1234) + self.assertEqual(self.blink.sync["TEST"], 1234) + self.assertEqual(self.blink.sync["tEsT"], 1234) + @mock.patch("blinkpy.api.request_camera_usage") @mock.patch("blinkpy.api.request_homescreen") - def test_get_cameras(self, mock_home, mock_sess): + def test_setup_cameras(self, mock_home, mock_req): """Check retrieval of camera information.""" - mock_home.return_value = { - "cameras": [ - {"name": "foo", "network_id": 1234, "id": 5678}, - {"name": "bar", "network_id": 1234, "id": 5679}, - {"name": "test", "network_id": 4321, "id": 0000}, + mock_home.return_value = {} + mock_req.return_value = { + "networks": [ + { + "network_id": 1234, + "cameras": [ + {"id": 5678, "name": "foo"}, + {"id": 5679, "name": "bar"}, + ], + }, + {"network_id": 4321, "cameras": [{"id": 0000, "name": "test"}]}, ] } - result = self.blink.get_cameras() + result = self.blink.setup_camera_list() self.assertEqual( result, { @@ -143,29 +108,190 @@ def test_get_cameras(self, mock_home, mock_sess): }, ) - @mock.patch("blinkpy.api.request_homescreen") - def test_get_cameras_failure(self, mock_home, mock_sess): - """Check that on failure we initialize empty info and move on.""" + @mock.patch("blinkpy.api.request_camera_usage") + def test_setup_cameras_failure(self, mock_home): + """Check that on failure we raise a setup error.""" mock_home.return_value = {} - result = self.blink.get_cameras() - self.assertEqual(result, {}) - - @mock.patch.object(LoginHandler, "send_auth_key") - @mock.patch.object(Blink, "setup_post_verify") - def test_startup_prompt(self, mock_send_key, mock_verify, mock_sess): - """Test startup logic with command-line prompt.""" - mock_send_key.return_value = True - mock_verify.return_value = True - self.blink.no_prompt = False + with self.assertRaises(BlinkSetupError): + self.blink.setup_camera_list() + mock_home.return_value = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_camera_list() + + def test_setup_urls(self): + """Check setup of URLS.""" + self.blink.auth.region_id = "test" + self.blink.setup_urls() + self.assertEqual(self.blink.urls.subdomain, "rest-test") + + def test_setup_urls_failure(self): + """Check that on failure we raise a setup error.""" + self.blink.auth.region_id = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_urls() + + @mock.patch("blinkpy.api.request_networks") + def test_setup_networks(self, mock_networks): + """Check setup of networks.""" + mock_networks.return_value = {"summary": "foobar"} + self.blink.setup_networks() + self.assertEqual(self.blink.networks, "foobar") + + @mock.patch("blinkpy.api.request_networks") + def test_setup_networks_failure(self, mock_networks): + """Check that on failure we raise a setup error.""" + mock_networks.return_value = {} + with self.assertRaises(BlinkSetupError): + self.blink.setup_networks() + mock_networks.return_value = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_networks() + + @mock.patch("blinkpy.blinkpy.Auth.send_auth_key") + def test_setup_prompt_2fa(self, mock_key): + """Test setup with 2fa prompt.""" + self.blink.auth.data["username"] = "foobar" self.blink.key_required = True - self.blink.available = True - with mock.patch("builtins.input", return_value="1234"): - self.blink.start() + mock_key.return_value = True + with mock.patch("builtins.input", return_value="foo"): + self.blink.setup_prompt_2fa() self.assertFalse(self.blink.key_required) + mock_key.return_value = False + with mock.patch("builtins.input", return_value="foo"): + self.blink.setup_prompt_2fa() + self.assertTrue(self.blink.key_required) - def test_startup_no_prompt(self, mock_sess): - """Test startup with no_prompt flag set.""" + @mock.patch("blinkpy.blinkpy.Blink.setup_camera_list") + @mock.patch("blinkpy.api.request_networks") + @mock.patch("blinkpy.blinkpy.Blink.setup_owls") + def test_setup_post_verify(self, mock_owl, mock_networks, mock_camera): + """Test setup after verification.""" + self.blink.available = False self.blink.key_required = True - self.blink.no_prompt = True - self.blink.start() - self.assertTrue(self.blink.key_required) + mock_owl.return_value = True + mock_networks.return_value = { + "summary": {"foo": {"onboarded": False, "name": "bar"}} + } + mock_camera.return_value = [] + self.assertTrue(self.blink.setup_post_verify()) + self.assertTrue(self.blink.available) + self.assertFalse(self.blink.key_required) + + @mock.patch("blinkpy.api.request_networks") + def test_setup_post_verify_failure(self, mock_networks): + """Test failed setup after verification.""" + self.blink.available = False + mock_networks.return_value = {} + self.assertFalse(self.blink.setup_post_verify()) + self.assertFalse(self.blink.available) + + def test_merge_cameras(self): + """Test merging of cameras.""" + self.blink.sync = { + "foo": MockSync({"test": 123, "foo": "bar"}), + "bar": MockSync({"fizz": "buzz", "bar": "foo"}), + } + combined = self.blink.merge_cameras() + self.assertEqual(combined["test"], 123) + self.assertEqual(combined["foo"], "bar") + self.assertEqual(combined["fizz"], "buzz") + self.assertEqual(combined["bar"], "foo") + + @mock.patch("blinkpy.blinkpy.BlinkOwl.start") + def test_initialize_blink_minis(self, mock_start): + """Test blink mini initialization.""" + mock_start.return_value = True + self.blink.homescreen = { + "owls": [ + { + "enabled": False, + "id": 1, + "name": "foo", + "network_id": 2, + "onboarded": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "1234", + }, + { + "enabled": True, + "id": 3, + "name": "bar", + "network_id": 4, + "onboarded": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abcd", + }, + ] + } + self.blink.sync = {} + self.blink.setup_owls() + self.assertEqual(self.blink.sync["foo"].__class__, BlinkOwl) + self.assertEqual(self.blink.sync["bar"].__class__, BlinkOwl) + self.assertEqual(self.blink.sync["foo"].arm, False) + self.assertEqual(self.blink.sync["bar"].arm, True) + self.assertEqual(self.blink.sync["foo"].name, "foo") + self.assertEqual(self.blink.sync["bar"].name, "bar") + + def test_blink_mini_cameras_returned(self): + """Test that blink mini cameras are found if attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "owls": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + result = self.blink.setup_owls() + self.assertEqual(self.blink.network_ids, ["1234"]) + self.assertEqual( + result, [{"1234": {"name": "foo", "id": "1234", "type": "mini"}}] + ) + + self.blink.no_owls = True + self.blink.network_ids = [] + self.blink.get_homescreen() + result = self.blink.setup_owls() + self.assertEqual(self.blink.network_ids, []) + self.assertEqual(result, []) + + @mock.patch("blinkpy.api.request_camera_usage") + def test_blink_mini_attached_to_sync(self, mock_usage): + """Test that blink mini cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "owls": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = self.blink.setup_camera_list() + self.assertEqual( + result, {"1234": [{"name": "foo", "id": "1234", "type": "mini"}]} + ) + + +class MockSync: + """Mock sync module class.""" + + def __init__(self, cameras): + """Initialize fake class.""" + self.cameras = cameras diff --git a/tests/test_cameras.py b/tests/test_cameras.py index ca2fbbf2..25bdaf06 100644 --- a/tests/test_cameras.py +++ b/tests/test_cameras.py @@ -8,14 +8,11 @@ import unittest from unittest import mock -from blinkpy import blinkpy -from blinkpy.helpers.util import create_session, BlinkURLHandler +from blinkpy.blinkpy import Blink +from blinkpy.helpers.util import BlinkURLHandler from blinkpy.sync_module import BlinkSyncModule -from blinkpy.camera import BlinkCamera -import tests.mock_responses as mresp +from blinkpy.camera import BlinkCamera, BlinkCameraMini -USERNAME = "foobar" -PASSWORD = "deadbeef" CAMERA_CFG = { "camera": [ @@ -29,20 +26,13 @@ } -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) +@mock.patch("blinkpy.auth.Auth.query") class TestBlinkCameraSetup(unittest.TestCase): """Test the Blink class in blinkpy.""" def setUp(self): """Set up Blink module.""" - self.blink = blinkpy.Blink(username=USERNAME, password=PASSWORD) - header = { - "Host": "abc.zxc", - "TOKEN_AUTH": mresp.LOGIN_RESPONSE["authtoken"]["authtoken"], - } - # pylint: disable=protected-access - self.blink._auth_header = header - self.blink.session = create_session() + self.blink = Blink() self.blink.urls = BlinkURLHandler("test") self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", 1234, []) self.camera = BlinkCamera(self.blink.sync["test"]) @@ -52,8 +42,9 @@ def setUp(self): def tearDown(self): """Clean up after test.""" self.blink = None + self.camera = None - def test_camera_update(self, mock_sess): + def test_camera_update(self, mock_resp): """Test that we can properly update camera properties.""" config = { "name": "new", @@ -71,8 +62,8 @@ def test_camera_update(self, mock_sess): self.camera.sync.last_record = { "new": {"clip": "/test.mp4", "time": "1970-01-01T00:00:00"} } - mock_sess.side_effect = [ - mresp.MockResponse({"temp": 71}, 200), + mock_resp.side_effect = [ + {"temp": 71}, "test", "foobar", ] @@ -96,9 +87,9 @@ def test_camera_update(self, mock_sess): self.assertEqual(self.camera.image_from_cache, "test") self.assertEqual(self.camera.video_from_cache, "foobar") - def test_no_thumbnails(self, mock_sess): + def test_no_thumbnails(self, mock_resp): """Tests that thumbnail is 'None' if none found.""" - mock_sess.return_value = "foobar" + mock_resp.return_value = "foobar" self.camera.last_record = ["1"] config = { "name": "new", @@ -126,16 +117,13 @@ def test_no_thumbnails(self, mock_sess): "WARNING:blinkpy.camera:Could not retrieve calibrated " "temperature." ), - ( - "WARNING:blinkpy.camera:Could not find thumbnail for camera new" - "\nNoneType: None" - ), + ("WARNING:blinkpy.camera:Could not find thumbnail for camera new"), ], ) - def test_no_video_clips(self, mock_sess): + def test_no_video_clips(self, mock_resp): """Tests that we still proceed with camera setup with no videos.""" - mock_sess.return_value = "foobar" + mock_resp.return_value = "foobar" config = { "name": "new", "id": 1234, @@ -152,3 +140,39 @@ def test_no_video_clips(self, mock_sess): self.camera.update(config, force_cache=True) self.assertEqual(self.camera.clip, None) self.assertEqual(self.camera.video_from_cache, None) + + def test_camera_arm_status(self, mock_resp): + """Test arming and disarming camera.""" + self.camera.motion_enabled = None + self.assertFalse(self.camera.arm) + self.camera.motion_enabled = False + self.assertFalse(self.camera.arm) + self.camera.motion_enabled = True + self.assertTrue(self.camera.arm) + + @mock.patch("blinkpy.camera.api.request_motion_detection_enable") + @mock.patch("blinkpy.camera.api.request_motion_detection_disable") + def test_motion_detection_enable_disable(self, mock_dis, mock_en, mock_rep): + """Test setting motion detection enable properly.""" + mock_dis.return_value = "disable" + mock_en.return_value = "enable" + self.assertEqual(self.camera.set_motion_detect(True), "enable") + self.assertEqual(self.camera.set_motion_detect(False), "disable") + + def test_missing_attributes(self, mock_resp): + """Test that attributes return None if missing.""" + self.camera.temperature = None + self.camera.serial = None + attr = self.camera.attributes + self.assertEqual(attr["serial"], None) + self.assertEqual(attr["temperature"], None) + self.assertEqual(attr["temperature_c"], None) + + def test_mini_missing_attributes(self, mock_resp): + """Test that attributes return None if missing.""" + camera = BlinkCameraMini(self.blink.sync) + self.blink.sync.network_id = None + self.blink.sync.name = None + attr = camera.attributes + for key in attr: + self.assertEqual(attr[key], None) diff --git a/tests/test_login_handler.py b/tests/test_login_handler.py deleted file mode 100644 index 11bfc16b..00000000 --- a/tests/test_login_handler.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Test login handler.""" - -import unittest -from unittest import mock -from blinkpy.login_handler import LoginHandler -import tests.mock_responses as mresp - -USERNAME = "foobar" -PASSWORD = "deadbeef" - - -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) -class TestLoginHandler(unittest.TestCase): - """Test the LoginHandler class in blinkpy.""" - - def setUp(self): - """Set up Login Handler.""" - self.login_handler = LoginHandler() - - def tearDown(self): - """Clean up after test.""" - self.login_handler = None - - @mock.patch("blinkpy.login_handler.getpass") - def test_manual_login(self, getpwd, mock_sess): - """Check that we can manually use the login() function.""" - getpwd.return_value = PASSWORD - with mock.patch("builtins.input", return_value=USERNAME): - self.assertTrue(self.login_handler.check_login()) - self.assertEqual(self.login_handler.data["username"], USERNAME) - self.assertEqual(self.login_handler.data["password"], PASSWORD) - - def test_no_cred_file(self, mock_sess): - """Check that we return false when cred file doesn't exist.""" - self.login_handler.cred_file = "/tmp/fake.file" - self.assertFalse(self.login_handler.check_cred_file()) - - @mock.patch("blinkpy.login_handler.isfile") - def test_exit_on_missing_json(self, mockisfile, mock_sess): - """Test that we fail on missing json data.""" - self.login_handler.cred_file = "/tmp/fake.file" - mockisfile.return_value = True - with mock.patch("builtins.open", mock.mock_open(read_data="{}")): - self.assertFalse(self.login_handler.check_cred_file()) - - @mock.patch("blinkpy.login_handler.json.load") - @mock.patch("blinkpy.login_handler.isfile") - def test_cred_file(self, mockisfile, mockjson, mock_sess): - """Test that loading credential file works.""" - self.login_handler.cred_file = "/tmp/fake.file" - mockjson.return_value = {"username": "foo", "password": "bar"} - mockisfile.return_value = True - with mock.patch("builtins.open", mock.mock_open(read_data="")): - self.assertTrue(self.login_handler.check_cred_file()) - self.assertEqual(self.login_handler.data["username"], "foo") - self.assertEqual(self.login_handler.data["password"], "bar") - - def test_bad_response(self, mock_sess): - """Check bad response from server.""" - self.assertFalse(self.login_handler.validate_response(None, None)) - - def test_bad_response_code(self, mock_sess): - """Check bad response code from server.""" - fake_resp = mresp.MockResponse(None, 404) - self.assertFalse(self.login_handler.validate_response(None, fake_resp)) - - def test_good_response_code(self, mock_sess): - """Check good response code from server.""" - fake_resp = mresp.MockResponse(None, 200) - self.assertTrue(self.login_handler.validate_response(None, fake_resp)) - - @mock.patch("blinkpy.login_handler.util.gen_uid") - def test_check_keys_no_persist(self, mock_uid, mock_sess): - """Check key generation.""" - uid_value = "abc123" - mock_uid.return_value = "abc123" - self.login_handler.persist_key = None - data = self.login_handler.check_keys() - self.assertEqual(data["uid"], uid_value) - self.assertEqual(data["notification_key"], uid_value) - - @mock.patch("blinkpy.login_handler.util.gen_uid") - @mock.patch("blinkpy.login_handler.json.load") - @mock.patch("blinkpy.login_handler.isfile") - def test_check_keys_persist(self, mockisfile, mockjson, mock_uid, mock_sess): - """Check key load from file.""" - uid_value = "abc123" - mock_file = {"uid": "321cba", "notification_key": "foobar123"} - mock_uid.return_value = uid_value - mockjson.return_value = mock_file - mockisfile.return_value = True - self.login_handler.persist_key = True - data = self.login_handler.check_keys() - self.assertEqual(mock_file["uid"], data["uid"]) - self.assertEqual(mock_file["notification_key"], data["notification_key"]) - - def test_check_key_required(self, mock_sess): - """Check key required method.""" - response_true = {"client": {"verification_required": True}} - response_false = {"client": {"verification_required": False}} - response_nokey = {} - - mock_blink = MockBlink(response_nokey) - self.assertFalse(self.login_handler.check_key_required(mock_blink)) - - mock_blink = MockBlink(response_false) - self.assertFalse(self.login_handler.check_key_required(mock_blink)) - - mock_blink = MockBlink(response_true) - self.assertTrue(self.login_handler.check_key_required(mock_blink)) - - @mock.patch("blinkpy.login_handler.api.request_verify") - def test_send_auth_key(self, mock_req, mock_sess): - """Check sending of auth key.""" - mock_blink = MockBlink(None) - mock_req.return_value = mresp.MockResponse({"valid": True}, 200) - self.assertTrue(self.login_handler.send_auth_key(mock_blink, 1234)) - self.assertTrue(mock_blink.available) - - mock_req.return_value = mresp.MockResponse(None, 200) - self.assertFalse(self.login_handler.send_auth_key(mock_blink, 1234)) - - mock_req.return_value = mresp.MockResponse({}, 200) - self.assertFalse(self.login_handler.send_auth_key(mock_blink, 1234)) - - self.assertTrue(self.login_handler.send_auth_key(mock_blink, None)) - - -class MockBlink: - """Object to mock basic blink class.""" - - def __init__(self, login_response): - """Initialize mock blink class.""" - self.available = False - self.login_response = login_response diff --git a/tests/test_sync_module.py b/tests/test_sync_module.py index 5a8912f8..fc18a51e 100644 --- a/tests/test_sync_module.py +++ b/tests/test_sync_module.py @@ -2,27 +2,21 @@ import unittest from unittest import mock -from blinkpy import blinkpy -from blinkpy.sync_module import BlinkSyncModule -from blinkpy.camera import BlinkCamera +from blinkpy.blinkpy import Blink +from blinkpy.helpers.util import BlinkURLHandler +from blinkpy.sync_module import BlinkSyncModule, BlinkOwl +from blinkpy.camera import BlinkCamera, BlinkCameraMini -USERNAME = "foobar" -PASSWORD = "deadbeef" - -@mock.patch("blinkpy.api.http_req") +@mock.patch("blinkpy.auth.Auth.query") class TestBlinkSyncModule(unittest.TestCase): """Test BlinkSyncModule functions in blinkpy.""" def setUp(self): """Set up Blink module.""" - self.blink = blinkpy.Blink( - username=USERNAME, password=PASSWORD, motion_interval=0 - ) - # pylint: disable=protected-access - self.blink._auth_header = {"Host": "test.url.tld", "TOKEN_AUTH": "foobar123"} + self.blink = Blink(motion_interval=0) self.blink.last_refresh = 0 - self.blink.urls = blinkpy.BlinkURLHandler("test") + self.blink.urls = BlinkURLHandler("test") self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", "1234", []) self.camera = BlinkCamera(self.blink.sync) self.mock_start = [ @@ -48,16 +42,68 @@ def tearDown(self): self.camera = None self.mock_start = None + def test_bad_status(self, mock_resp): + """Check that we mark module unavaiable on bad status.""" + self.blink.sync["test"].status = None + self.blink.sync["test"].available = True + self.assertFalse(self.blink.sync["test"].online) + self.assertFalse(self.blink.sync["test"].available) + + def test_bad_arm(self, mock_resp): + """Check that we mark module unavaiable if bad arm status.""" + self.blink.sync["test"].network_info = None + self.blink.sync["test"].available = True + self.assertEqual(self.blink.sync["test"].arm, None) + self.assertFalse(self.blink.sync["test"].available) + self.blink.sync["test"].network_info = {} + self.blink.sync["test"].available = True + self.assertEqual(self.blink.sync["test"].arm, None) + self.assertFalse(self.blink.sync["test"].available) + def test_get_events(self, mock_resp): """Test get events function.""" mock_resp.return_value = {"event": True} self.assertEqual(self.blink.sync["test"].get_events(), True) + def test_get_events_fail(self, mock_resp): + """Test handling of failed get events function.""" + mock_resp.return_value = None + self.assertFalse(self.blink.sync["test"].get_events()) + mock_resp.return_value = {} + self.assertFalse(self.blink.sync["test"].get_events()) + def test_get_camera_info(self, mock_resp): """Test get camera info function.""" mock_resp.return_value = {"camera": ["foobar"]} self.assertEqual(self.blink.sync["test"].get_camera_info("1234"), "foobar") + def test_get_camera_info_fail(self, mock_resp): + """Test handling of failed get camera info function.""" + mock_resp.return_value = None + self.assertEqual(self.blink.sync["test"].get_camera_info("1"), {}) + mock_resp.return_value = {} + self.assertEqual(self.blink.sync["test"].get_camera_info("1"), {}) + mock_resp.return_value = {"camera": None} + self.assertEqual(self.blink.sync["test"].get_camera_info("1"), {}) + + def test_get_network_info(self, mock_resp): + """Test network retrieval.""" + mock_resp.return_value = {"network": {"sync_module_error": False}} + self.assertTrue(self.blink.sync["test"].get_network_info()) + mock_resp.return_value = {"network": {"sync_module_error": True}} + self.assertFalse(self.blink.sync["test"].get_network_info()) + + def test_get_network_info_failure(self, mock_resp): + """Test failed network retrieval.""" + mock_resp.return_value = {} + self.blink.sync["test"].available = True + self.assertFalse(self.blink.sync["test"].get_network_info()) + self.assertFalse(self.blink.sync["test"].available) + self.blink.sync["test"].available = True + mock_resp.return_value = None + self.assertFalse(self.blink.sync["test"].get_network_info()) + self.assertFalse(self.blink.sync["test"].available) + def test_check_new_videos_startup(self, mock_resp): """Test that check_new_videos does not block startup.""" sync_module = self.blink.sync["test"] @@ -224,3 +270,25 @@ def test_missing_camera_info(self, mock_resp): self.mock_start[5] = {} self.blink.sync["test"].start() self.assertEqual(self.blink.sync["test"].cameras, {"foo": None}) + + def test_sync_attributes(self, mock_resp): + """Test sync attributes.""" + self.assertEqual(self.blink.sync["test"].attributes["name"], "test") + self.assertEqual(self.blink.sync["test"].attributes["network_id"], "1234") + + def test_owl_start(self, mock_resp): + """Test owl camera instantiation.""" + response = { + "name": "foo", + "id": 2, + "serial": "foobar123", + "enabled": True, + "network_id": 1, + "thumbnail": "/foo/bar", + } + self.blink.last_refresh = None + self.blink.homescreen = {"owls": [response]} + owl = BlinkOwl(self.blink, "foo", 1234, response) + self.assertTrue(owl.start()) + self.assertTrue("foo" in owl.cameras) + self.assertEqual(owl.cameras["foo"].__class__, BlinkCameraMini) diff --git a/tests/test_util.py b/tests/test_util.py index f048c35b..ee157cd6 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -3,7 +3,7 @@ import unittest from unittest import mock import time -from blinkpy.helpers.util import Throttle, BlinkURLHandler, time_to_seconds +from blinkpy.helpers.util import json_load, Throttle, time_to_seconds class TestUtil(unittest.TestCase): @@ -67,6 +67,25 @@ def test(self): self.assertEqual(throttled(), True) self.assertEqual(throttled(), None) + def test_throttle_multiple_objects(self): + """Test that function is throttled even if called by multiple objects.""" + + @Throttle(seconds=5) + def test_throttle_method(): + return True + + class Tester: + """A tester class for throttling.""" + + def test(self): + """Test function for throttle.""" + return test_throttle_method() + + tester1 = Tester() + tester2 = Tester() + self.assertEqual(tester1.test(), True) + self.assertEqual(tester2.test(), None) + def test_throttle_on_two_methods(self): """Test that throttle works for multiple methods.""" @@ -101,16 +120,15 @@ def test2(self): self.assertEqual(tester.test1(), None) self.assertEqual(tester.test2(), True) - def test_legacy_subdomains(self): - """Test that subdomain can be set to legacy mode.""" - urls = BlinkURLHandler("test") - self.assertEqual(urls.subdomain, "rest-test") - urls = BlinkURLHandler("test", legacy=True) - self.assertEqual(urls.subdomain, "rest.test") - def test_time_to_seconds(self): """Test time to seconds conversion.""" correct_time = "1970-01-01T00:00:05+00:00" wrong_time = "1/1/1970 00:00:03" self.assertEqual(time_to_seconds(correct_time), 5) self.assertFalse(time_to_seconds(wrong_time)) + + def test_json_load_bad_data(self): + """Check that bad file is handled.""" + self.assertEqual(json_load("fake.file"), None) + with mock.patch("builtins.open", mock.mock_open(read_data="")): + self.assertEqual(json_load("fake.file"), None) diff --git a/tox.ini b/tox.ini index c9b8f427..e82717a9 100644 --- a/tox.ini +++ b/tox.ini @@ -41,8 +41,7 @@ commands = flake8 blinkpy tests app pydocstyle blinkpy tests app black --check --diff blinkpy tests app - rst-lint README.rst - rst-lint CHANGES.rst + rst-lint README.rst CHANGES.rst CONTRIBUTING.rst [testenv:build] recreate = True