diff --git a/.gitignore b/.gitignore index 15c2be2..41d5ee4 100644 --- a/.gitignore +++ b/.gitignore @@ -140,142 +140,3 @@ test2.py test3.py test4.ipynb secrets.sh - - -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# Diaspora Web Service and Action Provider -secrets.sh -nohup.out -*.pyc -*venv* -.DS_Store - diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml deleted file mode 100644 index b658b86..0000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,40 +0,0 @@ -ci: - autofix_prs: false -repos: - - repo: 'https://github.com/pre-commit/pre-commit-hooks' - rev: v4.6.0 - hooks: - - id: mixed-line-ending - - id: trailing-whitespace - - id: end-of-file-fixer - - id: check-added-large-files - - id: check-docstring-first - - id: check-json - - id: check-yaml - - id: check-toml - - id: check-merge-conflict - - id: name-tests-test - - repo: 'https://github.com/codespell-project/codespell' - rev: v2.3.0 - hooks: - - id: codespell - - repo: https://github.com/nbQA-dev/nbQA - rev: 1.8.5 - hooks: - - id: nbqa-ruff - - repo: https://github.com/kynan/nbstripout - rev: 0.7.1 - hooks: - - id: nbstripout - - repo: 'https://github.com/charliermarsh/ruff-pre-commit' - rev: v0.4.5 - hooks: - - id: ruff - args: - - '--fix' - - id: ruff-format - - repo: 'https://github.com/pre-commit/mirrors-mypy' - rev: v1.10.0 - hooks: - - id: mypy - additional_dependencies: [] diff --git a/DiasporaDemo.ipynb b/DiasporaDemo.ipynb index 03033dc..6891a65 100644 --- a/DiasporaDemo.ipynb +++ b/DiasporaDemo.ipynb @@ -1,1030 +1,1029 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "id": "Y0g-XCxrVc8m" - }, - "source": [ - "# Diaspora Event SDK - Demo\n", - "\n", 
- "[GitHub Repository](https://github.com/globus-labs/diaspora-event-sdk/tree/main)\n", - "\n", - "[QuickStart Guide](https://github.com/globus-labs/diaspora-event-sdk/blob/main/docs/quickstart.md)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Available Methods to Web Service APIs\n", - "\n", - "| Function and Arguments | Description |\n", - "|------------------------|-------------|\n", - "| **MSK Credential Management** | |\n", - "| `create_key()` | Revokes existing keys, generates a new key, and updates the token storage with the newly created key and the Diaspora endpoint. |\n", - "| **MSK Topic Management** | |\n", - "| `list_topics()` | Returns a list of topics currently registered under the user's account. |\n", - "| `register_topic(topic)` | Registers a new topic the user's account with permissions to read, write, and describe the topic. |\n", - "| `unregister_topic(topic)` | Unregisters a topic from a user's account, but all existing events within the topic are unaffected. |\n", - "| `get_topic_configs(topic)` | Retrieves the current configurations for a registered topic. |\n", - "| `update_topic_configs(topic, configs)` | Updates the configurations for a registered topic. |\n", - "| `update_topic_partitions(topic, new_partitions)` | Increases the number of partitions for a given topic to the specified new partition count. |\n", - "| `reset_topic(topic)` | Deletes and recreates the topic, removing all messages and restoring the topic to the default configurations. |\n", - "| `register_topic_for_user(topic, user)` | Authorizes another user to access a registered topic under the invoker's account. |\n", - "| `unregister_topic_for_user(topic, user)` | Removes access permissions for another user from a registered topic under the invoker's account. |\n", - "| `list_topic_users(topic)` | Returns a list of users that have access to the topic. |\n", - "| **Lambda Function Management** | |\n", - "| `list_triggers()` | Retrieves a list of triggers associated created under the user's account, showing each trigger's configurations and UUID. |\n", - "| `create_trigger(topic, trigger, trigger_configs, invoke_configs)` | Creates a new trigger under the user's account with specific function and invocation configurations. |\n", - "| `delete_trigger(topic, trigger)` | Deletes a trigger and related AWS resources, while the associated topic remains unaffected.|\n", - "| `update_trigger(trigger_uuid, invoke_configs)` | Updates invocation configurations of an existing trigger, identified by its unique trigger UUID. 
|\n", - "| `list_log_streams(trigger)` | List log streams of a trigger under the user's account.|\n", - "| `get_log_events(trigger, stream)` | Get events in a particular log stream of a trigger under the user's account.|" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "vwoqvNy79GOY" - }, - "source": [ - "## Install the SDK and dependencies " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "6dEQb3pMj4T2" - }, - "outputs": [], - "source": [ - "%pip install -e '.[kafka-python]'" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import base64\n", - "import json\n", - "import os\n", - "import random\n", - "import time\n", - "from pprint import pprint\n", - "\n", - "from diaspora_event_sdk import Client as GlobusClient\n", - "from diaspora_event_sdk import KafkaConsumer, KafkaProducer, block_until_ready" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "DFrhINF_oA_s" - }, - "source": [ - "## Perform client login and print the user's OpenID" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "Y0g-XCxrVc8m" + }, + "source": [ + "# Diaspora Event SDK - Demo\n", + "\n", + "[GitHub Repository](https://github.com/globus-labs/diaspora-event-sdk/tree/main)\n", + "\n", + "[QuickStart Guide](https://github.com/globus-labs/diaspora-event-sdk/blob/main/docs/quickstart.md)" + ] }, - "id": "affKKJetnsel", - "outputId": "d41cadb1-64f4-42ea-d275-7df30227ff13" - }, - "outputs": [], - "source": [ - "c = GlobusClient()\n", - "print(\"User's OpenID:\", c.subject_openid)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "RIZ72NHUvzMy" - }, - "source": [ - "## 1 Create a cluster authentication credential and verify cluster connection" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "KC-Gdkc_xvNy" - }, - "source": [ - "### 1.1 Create and retrieve a cluster authentication credential" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Available Methods to Web Service APIs (version 0.3.0)\n", + "\n", + "| Function and Arguments | Description |\n", + "|------------------------|-------------|\n", + "| **MSK Credential Management** | |\n", + "| `create_key()` | Revokes existing keys, generates a new key, and updates the token storage with the newly created key and the Diaspora endpoint. |\n", + "| **MSK Topic Management** | |\n", + "| `list_topics()` | Returns a list of topics currently registered under the user's account. |\n", + "| `register_topic(topic)` | Registers a new topic the user's account with permissions to read, write, and describe the topic. |\n", + "| `unregister_topic(topic)` | Unregisters a topic from a user's account, but all existing events within the topic are unaffected. |\n", + "| `get_topic_configs(topic)` | Retrieves the current configurations for a registered topic. |\n", + "| `update_topic_configs(topic, configs)` | Updates the configurations for a registered topic. |\n", + "| `update_topic_partitions(topic, new_partitions)` | Increases the number of partitions for a given topic to the specified new partition count. 
|\n", + "| `reset_topic(topic)` | Deletes and recreates the topic, removing all messages and restoring the topic to the default configurations. |\n", + "| `register_topic_for_user(topic, user)` | Authorizes another user to access a registered topic under the invoker's account. |\n", + "| `unregister_topic_for_user(topic, user)` | Removes access permissions for another user from a registered topic under the invoker's account. |\n", + "| `list_topic_users(topic)` | Returns a list of users that have access to the topic. |\n", + "| **Lambda Function Management** | |\n", + "| `list_triggers()` | Retrieves a list of triggers associated created under the user's account, showing each trigger's configurations and UUID. |\n", + "| `create_trigger(topic, trigger, trigger_configs, invoke_configs)` | Creates a new trigger under the user's account with specific function and invocation configurations. |\n", + "| `delete_trigger(topic, trigger)` | Deletes a trigger and related AWS resources, while the associated topic remains unaffected.|\n", + "| `update_trigger(trigger_uuid, invoke_configs)` | Updates invocation configurations of an existing trigger, identified by its unique trigger UUID. |\n", + "| `list_log_streams(trigger)` | List log streams of a trigger under the user's account.|\n", + "| `get_log_events(trigger, stream)` | Get events in a particular log stream of a trigger under the user's account.|" + ] }, - "id": "AuEP_SyqomC4", - "outputId": "c0ea107a-ca1e-4670-c66a-0f88994ec9fd" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " call the Web Service API and store credentials and endpoint address in a local token storage.\n", - " Note: call `create_key` at another machine invalidate the retrieved credential at this machine\n", - " expected return: {'access_key': ..., 'secret_key': ..., 'endpoint': ...}\n", - "\"\"\"\n", - "print(c.create_key())" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "-eu8G9wzxz7E" - }, - "source": [ - "### 1.2 Verify cluster connection" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" + { + "cell_type": "markdown", + "metadata": { + "id": "vwoqvNy79GOY" + }, + "source": [ + "## Install the SDK and dependencies " + ] }, - "id": "OqeSQpkrpDgJ", - "outputId": "054fc968-6338-4177-9bc1-0efd1541cdbe" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Internally, this method creates a producer and consumer using the retrieved credential.\n", - " It block until it produced a message and consumed it subsequently.\n", - " Note: it is normal to see a few error messages after calling create_key() because the key has not been ready.\n", - " expected return: None (the method blocks until the connection credential is ready)\n", - "\"\"\"\n", - "\n", - "assert block_until_ready()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "cJN-FkWDwRdf" - }, - "source": [ - "## 2 Demonstrate topic management APIs, SDK producing, and SDK consuming" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "qeogkI0PywGE" - }, - "source": [ - "### 2.1 Register a topic and list all topics registered." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "6dEQb3pMj4T2" + }, + "outputs": [], + "source": [ + "%pip install -e '.[kafka-python]'" + ] }, - "id": "scRz12X8vdG-", - "outputId": "51559fd6-9229-4116-bb2a-1c317047efbd" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Register a topic -- the user get read, write, and describe access to it.\n", - " expected return (first time): {\"status\": \"success\", \"message\": ...}\n", - " expected return (subsequent): {\"status\": \"no-op\", \"message\": ...}\n", - "\"\"\"\n", - "topic = \"topic\" + c.subject_openid[-12:]\n", - "print(c.register_topic(topic))\n", - "\n", - "\"\"\"\n", - " List all topics that the user has access to.\n", - " expected return: {\"status\": \"success\", \"topics\": [...]}\n", - "\"\"\"\n", - "print(c.list_topics())" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "ezoAx8BBy5rG" - }, - "source": [ - "### 2.2 Get configurations of a registered topic" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "aIPqq2IxpcYY" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " For a registered topic, get its configurations.\n", - " If the topic is not registered (or registered by others), return an error message.\n", - " Explanations of these configurations: https://kafka.apache.org/documentation/#topicconfigs\n", - " expected return: { \"status\": \"success\", \"configs\": {...}}\n", - "\"\"\"\n", - "print(c.get_topic_configs(topic))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "kgWZMEQMzGZs" - }, - "source": [ - "### 2.3 Update topic configurations" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "CHV5324-vRV7" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Update one or more topic configurations, if the topic has been registered.\n", - " If the topic is not registered (or registered by others), return an error message.\n", - " expected return: { \"status\": \"success\", \"before\": {...}, \"after\": {...}}\n", - "\"\"\"\n", - "configs = {'min.insync.replicas': 1}\n", - "print(c.update_topic_configs(topic, configs))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Adjust more than one configuration in a single update_topic_configs request.\n", - " expected return: { \"status\": \"success\", \"before\": {...}, \"after\": {...}}\n", - "\"\"\"\n", - "configs = {\n", - " 'delete.retention.ms': 43200000,\n", - " 'retention.ms': 43200000\n", - "}\n", - "print(c.update_topic_configs(topic, configs))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "SmRvFNCZzOys" - }, - "source": [ - "### 2.4 Update topic partitions (default=1)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "tQiDPK0XvTgg" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Increase the number of partitions for a registered topic.\n", - " If the topic is not registered, return an error. 
\n", - " If the new_partitions argument is no larger than the current number of partitions, return an error.\n", - " expected return: { \"status\": \"success\" }\n", - "\"\"\"\n", - "print(c.update_topic_partitions(topic, 2))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 2.4.2 Restore topic configs" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Deletes and recreates the topic, removing all messages and restoring the topic to the default configurations while user access is not affected.\n", - " If the topic is not registered, return an error. \n", - " Note: under repeated invocations, the topic may marked for deletion but not get deleted for a while. Wait and then call this method again.\n", - " expected return: { \"status\": \"success\", \"message\": \"topic deleted and re-created with default configs\" }\n", - "\"\"\"\n", - "print(c.reset_topic(topic))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "MJlE24gBzYwh" - }, - "source": [ - "### 2.5 Grant/List/Revoke access of another user" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "kC4xHbSAwi4u" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Authorize another user to access the topic, if the topic has been register by the invoking user.\n", - " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", - " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", - "\n", - "\"\"\"\n", - "print(c.grant_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")) # a virtual user" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Returns a list of users that have access to the topic.\n", - " expected return (subsequent): { \"status\": \"success\", \"users\": [...]}\n", - "\"\"\"\n", - "print(c.list_topic_users(topic)) " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Revokes another user to access the topic, if the topic has been register by the invoking user.\n", - " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", - " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", - "\"\"\"\n", - "print(c.revoke_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")) # a virtual user" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "producer = KafkaProducer() \n", - "print(topic)\n", - "\n", - "try:\n", - " future = producer.send(\n", - " topic, {'message': 'Synchronous message 1 from Diaspora SDK'})\n", - " print(future.get(timeout=10))\n", - " future = producer.send(\n", - " topic, {'message': 'Synchronous message 2 from Diaspora SDK'})\n", - " print(future.get(timeout=10))\n", - "except Exception as e:\n", - " print(f\"Failed to send message: {e}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "aWFxBOo30PnY" - }, - "source": [ - "### 2.6 SDK Producing" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "NcqBY9j0QAN-" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Synchronously produce messages to a registered topic.\n", - " expected return: \n", - " multiple RecordMetadata(...)\n", - "\"\"\"\n", - "\n", - "producer = KafkaProducer()\n", - "future = producer.send(\n", - " topic, 
{'message': 'Synchronous message 1 from Diaspora SDK'})\n", - "print(future.get(timeout=10))\n", - "future = producer.send(\n", - " topic, {'message': 'Synchronous message 2 from Diaspora SDK'})\n", - "print(future.get(timeout=10))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "4Z8gkRRO4w2v" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Asynchronously produce batched messages to a registered topic.\n", - " See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html\n", - " for more producer settings and usage.\n", - " expected return: None\n", - "\"\"\"\n", - "producer = KafkaProducer()\n", - "producer.send(topic, {'message': 'Asynchronous message 3 from Diaspora SDK'})\n", - "producer.send(topic, {'message': 'Asynchronous message 4 from Diaspora SDK'})\n", - "producer.flush()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "ZUQtQyY_5HFh" - }, - "source": [ - "### 2.7 SDK Consuming" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "NhcHgb3TQDP6" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Consume produced messages from the beginning of the topic.\n", - " The consumer exits in three seconds.\n", - " If the topic has more than one partitions, messages may arrive out of order.\n", - " See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html \n", - " for more consumer settings and usage.\n", - " expected return:\n", - " multiple {'message': ...}\n", - "\"\"\"\n", - "consumer = KafkaConsumer(topic, auto_offset_reset='earliest')\n", - "start_time = time.time()\n", - "try:\n", - " while True:\n", - " messages = consumer.poll(timeout_ms=100)\n", - " for tp, msgs in messages.items():\n", - " for message in msgs:\n", - " print(json.loads(message.value.decode(\"utf-8\")))\n", - "\n", - " if time.time() - start_time > 3:\n", - " # print(\"3 seconds have passed. 
Exiting...\")\n", - " break\n", - "finally:\n", - " consumer.close()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "AUTlFxC671Pi" - }, - "source": [ - "### 2.8 Unregister topic, list all topics" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "2TlCH8S0QDjY" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Unregister a topic (i.e., remove user access), leave all existing events in the topic unaffected.\n", - " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", - " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", - "\"\"\"\n", - "print(c.unregister_topic(topic))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " List all topics that the user has access to.\n", - " expected return: {\"status\": \"success\", \"topics\": [...]}\n", - "\"\"\"\n", - "print(c.list_topics())" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "7jCk-6QqxFUz" - }, - "source": [ - "## 3 Demonstrate trigger management APIs" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "sVK55OqpSkD6" - }, - "source": [ - "### 3.0 Create a deployment package" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" + { + "cell_type": "markdown", + "metadata": { + "id": "DFrhINF_oA_s" + }, + "source": [ + "## Perform client login and print the user's OpenID" + ] }, - "id": "CTKG5IU6SH9Q", - "outputId": "575dbbd5-02c9-4eb1-f2af-229629b9fa22" - }, - "outputs": [], - "source": [ - "trigger_package = f\"{os.getcwd()}/my_deployment_package\" # require abs path here\n", - "trigger_file = \"lambda_function.py\"\n", - "trigger_name_in_def=\"lambda_handler\"\n", - "os.system(f\"mkdir {trigger_package}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "OspfInFw3xeC" - }, - "source": [ - "### 3.1 Save code to `trigger_package/trigger_file`" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "xRaGaOJTR_9e" - }, - "outputs": [], - "source": [ - "trigger_code = f\"\"\"import base64\n", - "\n", - "def {trigger_name_in_def}(event, context):\n", - " try:\n", - " print('EVENT:')\n", - " print(event)\n", - "\n", - " for partition, records in event['records'].items():\n", - " for record in records:\n", - " print(\"topic:\", record['topic'],\n", - " \"partition:\", record['partition'],\n", - " \"offset:\", record['offset'],\n", - " \"key:\", record.get(\"key\", \"NOT-SET\"),\n", - " \"value:\", base64.b64decode(record['value']))\n", - " except Exception as e:\n", - " print(\"ERROR:\", e)\n", - "\"\"\"\n", - "\n", - "with open(os.path.join(trigger_package, trigger_file), \"w\") as f:\n", - " f.write(trigger_code)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "s_A6K1NV3_mv" - }, - "source": [ - "### 3.2 Zip the code at `trigger_file`" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "NRVkJZEaRfWJ" - }, - "outputs": [], - "source": [ - "def get_zipped_code(lambda_function_package):\n", - " print(f\"Zipping {lambda_function_package}\")\n", - " os.system(f\"cd {lambda_function_package} && zip -r {lambda_function_package}.zip .\")\n", - " with open(f\"{lambda_function_package}.zip\", \"rb\") as f:\n", - " return base64.b64encode(f.read()).decode('utf-8')\n", - "zipped_code = get_zipped_code(trigger_package)" - ] - }, - { - "cell_type": "markdown", - 
"metadata": { - "id": "kvNu2kKf4Xpj" - }, - "source": [ - "### 3.3 Trigger info\n", - "\n", - "Note: one topic can be associated with multiple functions\n", - "\n", - "`topic_name`: which topic to consume from\n", - "\n", - "`function_name`: along with topic_name, used to identify and delete the function\n", - "\n", - "`function_runtime`: a function runtime like `python3.11` and `python3.12`\n", - "\n", - "`function_handler`: py-file-name.function-name\n", - "\n", - "`function_code_zipped`: serialized function code\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "affKKJetnsel", + "outputId": "d41cadb1-64f4-42ea-d275-7df30227ff13" + }, + "outputs": [], + "source": [ + "from diaspora_event_sdk import Client as GlobusClient\n", + "c = GlobusClient()\n", + "print(\"User's OpenID:\", c.subject_openid)" + ] }, - "id": "tM_U2RCY0eJH", - "outputId": "d71a9e92-62ec-4194-cba6-23c0460f047a" - }, - "outputs": [], - "source": [ - "\n", - "topic_name = \"topic\" + c.subject_openid[-12:]\n", - "trigger_name = f\"lambda{random.randint(100, 999)}\"\n", - "trigger_runtime = \"python3.11\"\n", - "trigger_handler = f\"{trigger_file.split('.')[0]}.{trigger_name_in_def}\"\n", - "print(c.register_topic(topic_name))\n", - "print()\n", - "print(\"topic name:\\t\\t\", topic_name)\n", - "print(\"trigger name:\\t\\t\", trigger_name)\n", - "print(\"trigger runtime:\\t\", trigger_runtime)\n", - "print(\"trigger handler:\\t\", trigger_handler)\n", - "print(\"zipped trigger code:\\t\", zipped_code)\n", - "print(\"length of the code:\\t\", len(zipped_code))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "XmWp7cs55Kq9" - }, - "source": [ - "### 3.4 Trigger creation call\n", - "\n", - "Note: the call blocks for a few seconds to wait for creation results or error message.\n", - "\n", - "Default values are listed in the table below, note that if the runtime is `python3.11` or `python3.12`, a layer with Globus SDK and Diaspora SDK will be attached.\n", - "\n", - "[Trigger parameter syntax (`Code`, etc.)](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html)\n", - "\n", - "| Trigger Parameter | Default Value |\n", - "|--------------------|------------------------------------|\n", - "| Runtime | python3.11 |\n", - "| Handler | lambda_function.lambda_handler |\n", - "| Code | {} |\n", - "| Timeout | 30 |\n", - "| MemorySize | 128 |\n", - "| Environment | {} |\n", - "| EphemeralStorage | {'Size': 512} |\n", - "| Layers | [] |\n", - "\n", - "\n", - "\n", - "[Invocation parameter syntax (`FilterCriteria`, etc.)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax)\n", - "\n", - "| Invocation Parameter | Default Value |\n", - "|--------------------------------|---------------|\n", - "| Enabled | True |\n", - "| BatchSize | 1 |\n", - "| FilterCriteria | {} |\n", - "| MaximumBatchingWindowInSeconds | 500ms |\n", - "| StartingPosition | LATEST |\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "_Z8kqJ5K1Rdi" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Create a new trigger that response to events in a registered topic.\n", - " Note: the creation call takes around 10 seconds to return.\n", - " Note: for Python 3.12 runtime, use \n", - " 
arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer312:1\n", - " to enable the globus SDK in the trigger.\n", - " expected return (first time): {\"status\": \"success\", \"message\": \"Trigger creation started.\"}\n", - " expected return (subsequent): {\"status\": \"error\", \"message\": ...}\n", - "\"\"\"\n", - "\n", - "trigger_configs = {\n", - " \"Runtime\": trigger_runtime,\n", - " \"Handler\": trigger_handler,\n", - " \"Code\": {'ZipFile': zipped_code},\n", - " \"Timeout\": 3,\n", - " \"MemorySize\": 128,\n", - " \"Environment\": {},\n", - " \"EphemeralStorage\": {'Size': 512},\n", - " \"Layers\": [\"arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1\"]\n", - "}\n", - "invoke_configs = {\n", - " \"Enabled\": True,\n", - " \"BatchSize\": 1,\n", - " \"StartingPosition\": \"LATEST\"\n", - "}\n", - "print(c.create_trigger(topic_name, trigger_name, trigger_configs, invoke_configs))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "FPNHrJk75aaA" - }, - "source": [ - "### 3.5 List created functions\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "N_pwg-fg1gEc" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " List all triggered created by the user.\n", - " Note: the print function below highlights the trigger name, \n", - " handler name, uuid, and topic it taps on.\n", - " expected return:\n", - " trigger name: ... trigger handler name: ... trigger uuid: ... trigger topic: ...\n", - "\"\"\"\n", - "\n", - "for function in c.list_triggers()['triggers']:\n", - " print(\"trigger name:\", function['function_name'], \"\\n\",\n", - " \"trigger handler name:\", function['function_detail']['Configuration']['Handler'], \"\\n\",\n", - " \"trigger uuid:\", function['triggers'][0]['UUID'], \"\\n\",\n", - " \"trigger topic:\", function['triggers'][0]['Topics'][0], \"\\n\",)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "tuJ3ozabuEyB" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " List all triggered created by the user.\n", - " Note: the print function below highlights the trigger most recently created\n", - " expected return: {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n", - "\"\"\"\n", - "\n", - "for function in c.list_triggers()['triggers']:\n", - " if function['function_name'] == trigger_name:\n", - " pprint(function, sort_dicts=False)\n", - " trigger_uuid = function['triggers'][0]['UUID']" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "M30sgRswvBT_" - }, - "source": [ - "### 3.6 Update trigger configurations" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "8160R8BnvRtq" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Update the invocation configuration -- add event filters\n", - " Note: see the table above for other tunable configurations.\n", - " expected return: {\"status\": \"success\", \"before\": {}, \"after\": {}}\n", - "\"\"\"\n", - "\n", - "\n", - "pattern1 = {\"value\": {\"event_type\": [\"created\"]}}\n", - "config1 = {\n", - " \"Enabled\": True,\n", - " \"BatchSize\": 123,\n", - " \"FilterCriteria\": {\"Filters\": [{'Pattern': json.dumps(pattern1)}]},\n", - " \"MaximumBatchingWindowInSeconds\": 42\n", - "}\n", - "print(c.update_trigger(trigger_uuid, config1))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Update the invocation configuration -- remove event filters and change the 
batch size back.\n", - " Note: see the table above for other tunable configurations.\n", - " expected return: {\"status\": \"success\", \"before\": {}, \"after\": {}}\n", - "\"\"\"\n", - "config2 = {\n", - " \"BatchSize\": 1,\n", - " \"FilterCriteria\": {},\n", - " \"MaximumBatchingWindowInSeconds\": 1\n", - "}\n", - "print(c.update_trigger(trigger_uuid, config2))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 3.6 Produce events to invoke the trigger and verify invocations through inspecting the latest log stream" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Synchronously produce messages to a registered topic to invoke triggers\n", - " expected return: \n", - " multiple RecordMetadata(...)\n", - "\"\"\"\n", - "\n", - "producer = KafkaProducer()\n", - "future = producer.send(\n", - " topic_name, {'message': 'Synchronous message 3 from Diaspora SDK'})\n", - "print(future.get(timeout=10))\n", - "future = producer.send(\n", - " topic_name, {'message': 'Synchronous message 4 from Diaspora SDK'})\n", - "print(future.get(timeout=10))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Get the list of log streams belong to the trigger.\n", - " Note: recent_log_stream_name may not contain logs of all trigger invocations,\n", - " as some logs may exist in other streams.\n", - " expected return: {\"status\": \"success\", \"streams\": [...]}\n", - "\"\"\"\n", - "\n", - "streams_response = c.list_log_streams(trigger_name)\n", - "print(streams_response)\n", - "recent_log_stream_name = streams_response['streams'][0]['logStreamName']" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"\n", - " Retrieve the events in a particular log stream.\n", - " Note: this log stream may not contain logs of all trigger invocations,\n", - " as some logs may exist in other streams.\n", - " expected return: {\"status\": \"success\", \"events\": [...]}\n", - "\"\"\"\n", - "print(c.get_log_events(trigger_name, recent_log_stream_name))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "KdDdbAg55ilM" - }, - "source": [ - "### 3.7 Trigger deletion call" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "xtxx1PiU1iS7" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Delete trigger by (topic_name, trigger_name)\n", - " expected return: {\"status\": \"success\", \"message\": ...}\n", - "\"\"\"\n", - "print(c.delete_trigger(topic_name, trigger_name))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "CPk0DY6f58Xv" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " List all triggered created by the user.\n", - " expected return (if all triggers are deleted): None \n", - " expected return (otherwise): {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n", - "\"\"\"\n", - "for function in c.list_triggers()['triggers']:\n", - " print(\"trigger name:\", function['function_name'], \"\\n\",\n", - " \"trigger handler name:\", function['function_detail']['Configuration']['Handler'], \"\\n\",\n", - " \"trigger uuid:\", function['triggers'][0]['UUID'], \"\\n\",\n", - " \"trigger topic:\", function['triggers'][0]['Topics'][0], \"\\n\",)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "QDRr1hZ2qsw8" - }, - "source": [ - "### 3.7 Unregister topic" - ] - 
}, - { - "cell_type": "code", - "execution_count": null, - "metadata": { + { + "cell_type": "markdown", + "metadata": { + "id": "RIZ72NHUvzMy" + }, + "source": [ + "## 1 Create a cluster authentication credential and verify cluster connection" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "KC-Gdkc_xvNy" + }, + "source": [ + "### 1.1 Create and retrieve a cluster authentication credential" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "AuEP_SyqomC4", + "outputId": "c0ea107a-ca1e-4670-c66a-0f88994ec9fd" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " call the Web Service API and store credentials and endpoint address in a local token storage.\n", + " Note: call `create_key` at another machine invalidate the retrieved credential at this machine\n", + " expected return: {'access_key': ..., 'secret_key': ..., 'endpoint': ...}\n", + "\"\"\"\n", + "print(c.create_key())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "-eu8G9wzxz7E" + }, + "source": [ + "### 1.2 Verify cluster connection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "OqeSQpkrpDgJ", + "outputId": "054fc968-6338-4177-9bc1-0efd1541cdbe" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Internally, this method creates a producer and consumer using the retrieved credential.\n", + " It block until it produced a message and consumed it subsequently.\n", + " Note: it is normal to see a few error messages after calling create_key() because the key has not been ready.\n", + " expected return: None (the method blocks until the connection credential is ready)\n", + "\"\"\"\n", + "from diaspora_event_sdk import block_until_ready\n", + "assert block_until_ready()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "cJN-FkWDwRdf" + }, + "source": [ + "## 2 Demonstrate topic management APIs, SDK producing, and SDK consuming" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "qeogkI0PywGE" + }, + "source": [ + "### 2.1 Register a topic and list all topics registered." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "scRz12X8vdG-", + "outputId": "51559fd6-9229-4116-bb2a-1c317047efbd" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Register a topic -- the user gets read, write, and describe access to it.\n", + " expected return (first time): {\"status\": \"success\", \"message\": ...}\n", + " expected return (subsequent): {\"status\": \"no-op\", \"message\": ...}\n", + "\"\"\"\n", + "topic = \"topic\" + c.subject_openid[-12:]\n", + "print(c.register_topic(topic))\n", + "\n", + "\"\"\"\n", + " List all topics that the user has access to.\n", + " expected return: {\"status\": \"success\", \"topics\": [...]}\n", + "\"\"\"\n", + "print(c.list_topics())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ezoAx8BBy5rG" + }, + "source": [ + "### 2.2 Get configurations of a registered topic" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "aIPqq2IxpcYY" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " For a registered topic, get its configurations.\n", + " If the topic is not registered (or registered by others), return an error message.\n", + " Explanations of these configurations: https://kafka.apache.org/documentation/#topicconfigs\n", + " expected return: { \"status\": \"success\", \"configs\": {...}}\n", + "\"\"\"\n", + "print(c.get_topic_configs(topic))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "kgWZMEQMzGZs" + }, + "source": [ + "### 2.3 Update topic configurations" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "CHV5324-vRV7" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Update one or more topic configurations, if the topic has been registered.\n", + " If the topic is not registered (or registered by others), return an error message.\n", + " expected return: { \"status\": \"success\", \"before\": {...}, \"after\": {...}}\n", + "\"\"\"\n", + "configs = {'min.insync.replicas': 1}\n", + "print(c.update_topic_configs(topic, configs))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Adjust more than one configuration in a single update_topic_configs request.\n", + " expected return: { \"status\": \"success\", \"before\": {...}, \"after\": {...}}\n", + "\"\"\"\n", + "configs = {\n", + " 'delete.retention.ms': 43200000,\n", + " 'retention.ms': 43200000\n", + "}\n", + "print(c.update_topic_configs(topic, configs))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "SmRvFNCZzOys" + }, + "source": [ + "### 2.4 Update topic partitions (default=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "tQiDPK0XvTgg" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Increase the number of partitions for a registered topic.\n", + " If the topic is not registered, return an error. 
\n", + " If the new_partitions argument is no larger than the current number of partitions, return an error.\n", + " expected return: { \"status\": \"success\" }\n", + "\"\"\"\n", + "print(c.update_topic_partitions(topic, 2))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2.4.2 Restore topic configs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Deletes and recreates the topic, removing all messages and restoring the topic to the default configurations while user access is not affected.\n", + " If the topic is not registered, return an error. \n", + " Note: under repeated invocations, the topic may marked for deletion but not get deleted for a while. Wait and then call this method again.\n", + " expected return: { \"status\": \"success\", \"message\": \"topic deleted and re-created with default configs\" }\n", + "\"\"\"\n", + "print(c.reset_topic(topic))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "MJlE24gBzYwh" + }, + "source": [ + "### 2.5 Grant/List/Revoke access of another user" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "kC4xHbSAwi4u" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Authorize another user to access the topic, if the topic has been register by the invoking user.\n", + " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", + " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", + "\n", + "\"\"\"\n", + "print(c.grant_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")) # a virtual user" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Returns a list of users that have access to the topic.\n", + " expected return (subsequent): { \"status\": \"success\", \"users\": [...]}\n", + "\"\"\"\n", + "print(c.list_topic_users(topic)) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Revokes another user to access the topic, if the topic has been register by the invoking user.\n", + " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", + " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", + "\"\"\"\n", + "print(c.revoke_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")) # a virtual user" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from diaspora_event_sdk import KafkaProducer\n", + "\n", + "producer = KafkaProducer() \n", + "print(topic)\n", + "\n", + "try:\n", + " future = producer.send(\n", + " topic, {'message': 'Synchronous message 1 from Diaspora SDK'})\n", + " print(future.get(timeout=10))\n", + " future = producer.send(\n", + " topic, {'message': 'Synchronous message 2 from Diaspora SDK'})\n", + " print(future.get(timeout=10))\n", + "except Exception as e:\n", + " print(f\"Failed to send message: {e}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "aWFxBOo30PnY" + }, + "source": [ + "### 2.6 SDK Producing" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NcqBY9j0QAN-" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Synchronously produce messages to a registered topic.\n", + " expected return: \n", + " multiple RecordMetadata(...)\n", + "\"\"\"\n", + "from diaspora_event_sdk 
import KafkaProducer\n", + "\n", + "producer = KafkaProducer()\n", + "future = producer.send(\n", + " topic, {'message': 'Synchronous message 1 from Diaspora SDK'})\n", + "print(future.get(timeout=10))\n", + "future = producer.send(\n", + " topic, {'message': 'Synchronous message 2 from Diaspora SDK'})\n", + "print(future.get(timeout=10))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "4Z8gkRRO4w2v" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Asynchronously produce batched messages to a registered topic.\n", + " See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html\n", + " for more producer settings and usage.\n", + " expected return: None\n", + "\"\"\"\n", + "from diaspora_event_sdk import KafkaProducer\n", + "\n", + "producer = KafkaProducer()\n", + "producer.send(topic, {'message': 'Asynchronous message 3 from Diaspora SDK'})\n", + "producer.send(topic, {'message': 'Asynchronous message 4 from Diaspora SDK'})\n", + "producer.flush()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ZUQtQyY_5HFh" + }, + "source": [ + "### 2.7 SDK Consuming" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NhcHgb3TQDP6" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Consume produced messages from the beginning of the topic.\n", + " The consumer exits in three seconds.\n", + " If the topic has more than one partition, messages may arrive out of order.\n", + " See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html \n", + " for more consumer settings and usage.\n", + " expected return:\n", + " multiple {'message': ...}\n", + "\"\"\"\n", + "import json\n", + "import time\n", + "\n", + "from diaspora_event_sdk import KafkaConsumer\n", + "\n", + "consumer = KafkaConsumer(topic, auto_offset_reset='earliest')\n", + "start_time = time.time()\n", + "try:\n", + " while True:\n", + " messages = consumer.poll(timeout_ms=100)\n", + " for tp, msgs in messages.items():\n", + " for message in msgs:\n", + " print(json.loads(message.value.decode(\"utf-8\")))\n", + "\n", + " if time.time() - start_time > 3:\n", + " # print(\"3 seconds have passed. 
Exiting...\")\n", + " break\n", + "finally:\n", + " consumer.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "AUTlFxC671Pi" + }, + "source": [ + "### 2.8 Unregister topic, list all topics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "2TlCH8S0QDjY" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Unregister a topic (i.e., remove user access), leave all existing events in the topic unaffected.\n", + " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", + " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", + "\"\"\"\n", + "print(c.unregister_topic(topic))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " List all topics that the user has access to.\n", + " expected return: {\"status\": \"success\", \"topics\": [...]}\n", + "\"\"\"\n", + "print(c.list_topics())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "7jCk-6QqxFUz" + }, + "source": [ + "## 3 Demonstrate trigger management APIs" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "sVK55OqpSkD6" + }, + "source": [ + "### 3.0 Create a deployment package" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "CTKG5IU6SH9Q", + "outputId": "575dbbd5-02c9-4eb1-f2af-229629b9fa22" + }, + "outputs": [], + "source": [ + "import os\n", + "trigger_package = f\"{os.getcwd()}/my_deployment_package\" # require abs path here\n", + "trigger_file = \"lambda_function.py\"\n", + "trigger_name_in_def=\"lambda_handler\"\n", + "os.system(f\"mkdir {trigger_package}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "OspfInFw3xeC" + }, + "source": [ + "### 3.1 Save code to `trigger_package/trigger_file`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xRaGaOJTR_9e" + }, + "outputs": [], + "source": [ + "trigger_code = f\"\"\"import base64\n", + "\n", + "def {trigger_name_in_def}(event, context):\n", + " try:\n", + " print('EVENT:')\n", + " print(event)\n", + "\n", + " for partition, records in event['records'].items():\n", + " for record in records:\n", + " print(\"topic:\", record['topic'],\n", + " \"partition:\", record['partition'],\n", + " \"offset:\", record['offset'],\n", + " \"key:\", record.get(\"key\", \"NOT-SET\"),\n", + " \"value:\", base64.b64decode(record['value']))\n", + " except Exception as e:\n", + " print(\"ERROR:\", e)\n", + "\"\"\"\n", + "\n", + "with open(os.path.join(trigger_package, trigger_file), \"w\") as f:\n", + " f.write(trigger_code)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "s_A6K1NV3_mv" + }, + "source": [ + "### 3.2 Zip the code at `trigger_file`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NRVkJZEaRfWJ" + }, + "outputs": [], + "source": [ + "import base64\n", + "def get_zipped_code(lambda_function_package):\n", + " print(f\"Zipping {lambda_function_package}\")\n", + " os.system(f\"cd {lambda_function_package} && zip -r {lambda_function_package}.zip .\")\n", + " with open(f\"{lambda_function_package}.zip\", \"rb\") as f:\n", + " return base64.b64encode(f.read()).decode('utf-8')\n", + "zipped_code = get_zipped_code(trigger_package)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "kvNu2kKf4Xpj" + }, + "source": [ + "### 3.3 Trigger info\n", + "\n", + "Note: one topic can 
be associated with multiple functions\n", + "\n", + "`topic_name`: which topic to consume from\n", + "\n", + "`function_name`: along with topic_name, used to identify and delete the function\n", + "\n", + "`function_runtime`: a function runtime like `python3.11` or `python3.12`\n", + "\n", + "`function_handler`: py-file-name.function-name\n", + "\n", + "`function_code_zipped`: serialized function code\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "tM_U2RCY0eJH", + "outputId": "d71a9e92-62ec-4194-cba6-23c0460f047a" + }, + "outputs": [], + "source": [ + "import random\n", + "\n", + "topic_name = \"topic\" + c.subject_openid[-12:]\n", + "trigger_name = f\"lambda{random.randint(100, 999)}\"\n", + "trigger_runtime = \"python3.11\"\n", + "trigger_handler = f\"{trigger_file.split('.')[0]}.{trigger_name_in_def}\"\n", + "print(c.register_topic(topic_name))\n", + "print()\n", + "print(\"topic name:\\t\\t\", topic_name)\n", + "print(\"trigger name:\\t\\t\", trigger_name)\n", + "print(\"trigger runtime:\\t\", trigger_runtime)\n", + "print(\"trigger handler:\\t\", trigger_handler)\n", + "print(\"zipped trigger code:\\t\", zipped_code)\n", + "print(\"length of the code:\\t\", len(zipped_code))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "XmWp7cs55Kq9" + }, + "source": [ + "### 3.4 Trigger creation call\n", + "\n", + "Note: the call blocks for a few seconds to wait for the creation result or an error message.\n", + "\n", + "Default values are listed in the table below; note that if the runtime is `python3.11` or `python3.12`, a layer with the Globus SDK and Diaspora SDK will be attached.\n", + "\n", + "[Trigger parameter syntax (`Code`, etc.)](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html)\n", + "\n", + "| Trigger Parameter | Default Value |\n", + "|--------------------|------------------------------------|\n", + "| Runtime | python3.11 |\n", + "| Handler | lambda_function.lambda_handler |\n", + "| Code | {} |\n", + "| Timeout | 30 |\n", + "| MemorySize | 128 |\n", + "| Environment | {} |\n", + "| EphemeralStorage | {'Size': 512} |\n", + "| Layers | [] |\n", + "\n", + "\n", + "\n", + "[Invocation parameter syntax (`FilterCriteria`, etc.)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax)\n", + "\n", + "| Invocation Parameter | Default Value |\n", + "|--------------------------------|---------------|\n", + "| Enabled | True |\n", + "| BatchSize | 1 |\n", + "| FilterCriteria | {} |\n", + "| MaximumBatchingWindowInSeconds | 500ms |\n", + "| StartingPosition | LATEST |\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "_Z8kqJ5K1Rdi" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Create a new trigger that responds to events in a registered topic.\n", + " Note: the creation call takes around 10 seconds to return.\n", + " Note: for the Python 3.12 runtime, use \n", + " arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer312:1\n", + " to enable the Globus SDK in the trigger.\n", + " expected return (first time): {\"status\": \"success\", \"message\": \"Trigger creation started.\"}\n", + " expected return (subsequent): {\"status\": \"error\", \"message\": ...}\n", + "\"\"\"\n", + "\n", + "trigger_configs = {\n", + " \"Runtime\": trigger_runtime,\n", + " \"Handler\": trigger_handler,\n", + " \"Code\": {'ZipFile': zipped_code},\n", + " 
\"Timeout\": 3,\n", + " \"MemorySize\": 128,\n", + " \"Environment\": {},\n", + " \"EphemeralStorage\": {'Size': 512},\n", + " \"Layers\": [\"arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1\"]\n", + "}\n", + "invoke_configs = {\n", + " \"Enabled\": True,\n", + " \"BatchSize\": 1,\n", + " \"StartingPosition\": \"LATEST\"\n", + "}\n", + "print(c.create_trigger(topic_name, trigger_name, trigger_configs, invoke_configs))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "FPNHrJk75aaA" + }, + "source": [ + "### 3.5 List created functions\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "N_pwg-fg1gEc" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " List all triggered created by the user.\n", + " Note: the print function below highlights the trigger name, \n", + " handler name, uuid, and topic it taps on.\n", + " expected return:\n", + " trigger name: ... trigger handler name: ... trigger uuid: ... trigger topic: ...\n", + "\"\"\"\n", + "\n", + "for function in c.list_triggers()['triggers']:\n", + " print(\"trigger name:\", function['function_name'], \"\\n\",\n", + " \"trigger handler name:\", function['function_detail']['Configuration']['Handler'], \"\\n\",\n", + " \"trigger uuid:\", function['triggers'][0]['UUID'], \"\\n\",\n", + " \"trigger topic:\", function['triggers'][0]['Topics'][0], \"\\n\",)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "tuJ3ozabuEyB" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " List all triggered created by the user.\n", + " Note: the print function below highlights the trigger most recently created\n", + " expected return: {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n", + "\"\"\"\n", + "from pprint import pprint\n", + "\n", + "for function in c.list_triggers()['triggers']:\n", + " if function['function_name'] == trigger_name:\n", + " pprint(function, sort_dicts=False)\n", + " trigger_uuid = function['triggers'][0]['UUID']" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "M30sgRswvBT_" + }, + "source": [ + "### 3.6 Update trigger configurations" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "8160R8BnvRtq" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Update the invocation configuraiton -- add event filters\n", + " Note: see the table above for other tunable configurations.\n", + " expected return: {\"status\": \"success\", \"before\": {}, \"after\": {}}\n", + "\"\"\"\n", + "import json\n", + "\n", + "pattern1 = {\"value\": {\"event_type\": [\"created\"]}}\n", + "config1 = {\n", + " \"Enabled\": True,\n", + " \"BatchSize\": 123,\n", + " \"FilterCriteria\": {\"Filters\": [{'Pattern': json.dumps(pattern1)}]},\n", + " \"MaximumBatchingWindowInSeconds\": 42\n", + "}\n", + "print(c.update_trigger(trigger_uuid, config1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Update the invocation configuraiton -- remove event filters and change the batch size back.\n", + " Note: see the table above for other tunable configurations.\n", + " expected return: {\"status\": \"success\", \"before\": {}, \"after\": {}}\n", + "\"\"\"\n", + "config2 = {\n", + " \"BatchSize\": 1,\n", + " \"FilterCriteria\": {},\n", + " \"MaximumBatchingWindowInSeconds\": 1\n", + "}\n", + "print(c.update_trigger(trigger_uuid, config2))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + 
"### 3.6 Produce events to invoke the trigger and verify invocations through inspecting the latest log stream" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Synchronously produce messages to a registered topic to invoke triggers\n", + " expected return: \n", + " multiple RecordMetadata(...)\n", + "\"\"\"\n", + "from diaspora_event_sdk import KafkaProducer\n", + "\n", + "producer = KafkaProducer()\n", + "future = producer.send(\n", + " topic_name, {'message': 'Synchronous message 3 from Diaspora SDK'})\n", + "print(future.get(timeout=10))\n", + "future = producer.send(\n", + " topic_name, {'message': 'Synchronous message 4 from Diaspora SDK'})\n", + "print(future.get(timeout=10))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Get the list of log streams belong to the trigger.\n", + " Note: recent_log_stream_name may not contain logs of all trigger invocations,\n", + " as some logs may exist in other streams.\n", + " expected return: {\"status\": \"success\", \"streams\": [...]}\n", + "\"\"\"\n", + "\n", + "streams_response = c.list_log_streams(trigger_name)\n", + "print(streams_response)\n", + "recent_log_stream_name = streams_response['streams'][0]['logStreamName']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + " Retrieve the events in a particular log stream.\n", + " Note: this log stream may not contain logs of all trigger invocations,\n", + " as some logs may exist in other streams.\n", + " expected return: {\"status\": \"success\", \"events\": [...]}\n", + "\"\"\"\n", + "print(c.get_log_events(trigger_name, recent_log_stream_name))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "KdDdbAg55ilM" + }, + "source": [ + "### 3.7 Trigger deletion call" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xtxx1PiU1iS7" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Delete trigger by (topic_name, trigger_name)\n", + " expected return: {\"status\": \"success\", \"message\": ...}\n", + "\"\"\"\n", + "print(c.delete_trigger(topic_name, trigger_name))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "CPk0DY6f58Xv" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " List all triggered created by the user.\n", + " expected return (if all triggers are deleted): None \n", + " expected return (otherwise): {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n", + "\"\"\"\n", + "for function in c.list_triggers()['triggers']:\n", + " print(\"trigger name:\", function['function_name'], \"\\n\",\n", + " \"trigger handler name:\", function['function_detail']['Configuration']['Handler'], \"\\n\",\n", + " \"trigger uuid:\", function['triggers'][0]['UUID'], \"\\n\",\n", + " \"trigger topic:\", function['triggers'][0]['Topics'][0], \"\\n\",)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "QDRr1hZ2qsw8" + }, + "source": [ + "### 3.7 Unregister topic" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "Mmr5Fuk-qxiN", + "outputId": "8b681091-9aae-4dcf-b43b-907c4e2c36f9" + }, + "outputs": [], + "source": [ + "\"\"\"\n", + " Unregister a topic (i.e., remove user access), leave all existing events in the topic unaffected.\n", + " expected return (first 
time): { \"status\": \"success\", \"message\": ...}\n", + " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", + "\"\"\"\n", + "print(c.unregister_topic(topic_name))" + ] + } + ], + "metadata": { "colab": { - "base_uri": "https://localhost:8080/" + "provenance": [], + "toc_visible": true }, - "id": "Mmr5Fuk-qxiN", - "outputId": "8b681091-9aae-4dcf-b43b-907c4e2c36f9" - }, - "outputs": [], - "source": [ - "\"\"\"\n", - " Unregister a topic (i.e., remove user access), leave all existing events in the topic unaffected.\n", - " expected return (first time): { \"status\": \"success\", \"message\": ...}\n", - " expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n", - "\"\"\"\n", - "print(c.unregister_topic(topic_name))" - ] - } - ], - "metadata": { - "colab": { - "provenance": [], - "toc_visible": true - }, - "kernelspec": { - "display_name": "Python 3", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.6" - } - }, - "nbformat": 4, - "nbformat_minor": 0 + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 } diff --git a/LICENSE b/LICENSE index d645695..7a4a3ea 100644 --- a/LICENSE +++ b/LICENSE @@ -199,4 +199,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. + limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md index 2030888..8f8ce6e 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ pip install "diaspora-event-sdk[kafka-python]" ``` ### Alternative Installation: Without Kafka Client Library -To use alternative Kafka client libraries (e.g., `confluent-kafka-python`, `aiokafka`, and libraries for other programming languages), you can install the SDK without the `kafka-python` dependency. This option still provides topic-level access control (authorization) and login credential management features. +To use alternative Kafka client libraries (e.g., `confluent-kafka-python`, `aiokafka`, and libraries for other programming laguages), you can install the SDK without the `kafka-python` dependency. This option still provides topic-level access control (authorization) and login credential management features. To install the SDK without `kafka-python`, use: ```bash diff --git a/diaspora_event_sdk/__init__.py b/diaspora_event_sdk/__init__.py index 5d1db89..7ee6976 100644 --- a/diaspora_event_sdk/__init__.py +++ b/diaspora_event_sdk/__init__.py @@ -1,5 +1,6 @@ -"""Diaspora Event Fabric: Resilience-enabling services for science from HPC to edge.""" +""" Diaspora Event Fabric: Resilience-enabling services for science from HPC to edge. 
+""" from diaspora_event_sdk.version import __version__ as _version __author__ = "The Diaspora Event Team" diff --git a/diaspora_event_sdk/sdk/_environments.py b/diaspora_event_sdk/sdk/_environments.py index bda19e0..c00906a 100644 --- a/diaspora_event_sdk/sdk/_environments.py +++ b/diaspora_event_sdk/sdk/_environments.py @@ -9,14 +9,13 @@ def _get_envname(): return os.getenv("DIASPORA_SDK_ENVIRONMENT", "production") - def get_web_service_url(envname: Union[str, None] = None) -> str: env = envname or _get_envname() urls = { "production": "https://diaspora-web-service.ml22sevubfnks.us-east-1.cs.amazonlightsail.com", "dev": "https://diaspora-web-service-dev.ml22sevubfnks.us-east-1.cs.amazonlightsail.com", "local": "http://localhost:8000", - "legacy": "http://3.220.110.101/", + "legacy": "http://3.220.110.101/", } return urls.get(env, urls["production"]) diff --git a/diaspora_event_sdk/sdk/aws_iam_msk.py b/diaspora_event_sdk/sdk/aws_iam_msk.py index d05d21b..9a21f7b 100644 --- a/diaspora_event_sdk/sdk/aws_iam_msk.py +++ b/diaspora_event_sdk/sdk/aws_iam_msk.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 import base64 - # import logging from datetime import datetime, timezone from urllib.parse import parse_qs, urlparse @@ -12,7 +11,6 @@ # import pkg_resources from .botocore.auth import SigV4QueryAuth from .botocore.awsrequest import AWSRequest - # from botocore.config import Config from .botocore.credentials import Credentials @@ -27,7 +25,7 @@ def __get_user_agent__(): - return f"{LIB_NAME}/1.0.1" + return (f"{LIB_NAME}/1.0.1") def __get_expiration_time_ms(request): @@ -39,17 +37,15 @@ def __get_expiration_time_ms(request): # Parse the signed request parsed_url = urlparse(request.url) parsed_ul_params = parse_qs(parsed_url.query) - parsed_signing_time = datetime.strptime( - parsed_ul_params["X-Amz-Date"][0], "%Y%m%dT%H%M%SZ" - ) + parsed_signing_time = datetime.strptime(parsed_ul_params['X-Amz-Date'][0], + "%Y%m%dT%H%M%SZ") # Make the datetime object timezone-aware signing_time = parsed_signing_time.replace(tzinfo=timezone.utc) # Convert the Unix timestamp to milliseconds - expiration_timestamp_seconds = ( - int(signing_time.timestamp()) + DEFAULT_TOKEN_EXPIRY_SECONDS - ) + expiration_timestamp_seconds = int( + signing_time.timestamp()) + DEFAULT_TOKEN_EXPIRY_SECONDS # Get lifetime of token expiration_timestamp_ms = expiration_timestamp_seconds * 1000 @@ -78,7 +74,8 @@ def __construct_auth_token(region, aws_credentials): # Create SigV4 instance sig_v4 = SigV4QueryAuth( - aws_credentials, SIGNING_NAME, region, expires=DEFAULT_TOKEN_EXPIRY_SECONDS + aws_credentials, SIGNING_NAME, region, + expires=DEFAULT_TOKEN_EXPIRY_SECONDS ) # Create request with url and parameters @@ -113,12 +110,10 @@ def generate_auth_token(region, aws_debug_creds=False): # Load credentials import os - assert os.environ["AWS_ACCESS_KEY_ID"] assert os.environ["AWS_SECRET_ACCESS_KEY"] - aws_credentials = Credentials( - os.environ["AWS_ACCESS_KEY_ID"], os.environ["AWS_SECRET_ACCESS_KEY"] - ) + aws_credentials = Credentials(os.environ["AWS_ACCESS_KEY_ID"], + os.environ["AWS_SECRET_ACCESS_KEY"]) return __construct_auth_token(region, aws_credentials) diff --git a/diaspora_event_sdk/sdk/botocore/auth.py b/diaspora_event_sdk/sdk/botocore/auth.py index e6ec3fb..91a78f5 100644 --- a/diaspora_event_sdk/sdk/botocore/auth.py +++ b/diaspora_event_sdk/sdk/botocore/auth.py @@ -19,7 +19,7 @@ import logging from collections.abc import Mapping from email.utils import formatdate -from hashlib import sha256 +from hashlib import sha1, 
sha256 from .compat import ( HTTPHeaders, @@ -40,20 +40,22 @@ logger = logging.getLogger(__name__) -EMPTY_SHA256_HASH = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" +EMPTY_SHA256_HASH = ( + 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' + ) # This is the buffer size used when calculating sha256 checksums. # Experimenting with various buffer sizes showed that this value generally # gave the best result (in terms of performance). PAYLOAD_BUFFER = 1024 * 1024 -ISO8601 = "%Y-%m-%dT%H:%M:%SZ" -SIGV4_TIMESTAMP = "%Y%m%dT%H%M%SZ" +ISO8601 = '%Y-%m-%dT%H:%M:%SZ' +SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ' SIGNED_HEADERS_BLACKLIST = [ - "expect", - "user-agent", - "x-amzn-trace-id", + 'expect', + 'user-agent', + 'x-amzn-trace-id', ] -UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD" -STREAMING_UNSIGNED_PAYLOAD_TRAILER = "STREAMING-UNSIGNED-PAYLOAD-TRAILER" +UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD' +STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER' def _host_from_url(url): @@ -64,14 +66,14 @@ def _host_from_url(url): url_parts = urlsplit(url) host = url_parts.hostname # urlsplit's hostname is always lowercase if is_valid_ipv6_endpoint_url(url): - host = f"[{host}]" + host = f'[{host}]' default_ports = { - "http": 80, - "https": 443, + 'http': 80, + 'https': 443, } if url_parts.port is not None: if url_parts.port != default_ports.get(url_parts.scheme): - host = "%s:%d" % (host, url_parts.port) + host = '%s:%d' % (host, url_parts.port) return host @@ -82,7 +84,7 @@ def _get_body_as_dict(request): # dict. data = request.data if isinstance(data, bytes): - data = json.loads(data.decode("utf-8")) + data = json.loads(data.decode('utf-8')) elif isinstance(data, str): data = json.loads(data) return data @@ -106,16 +108,16 @@ class SigV4Auth(BaseSigner): def __init__(self, credentials, service_name, region_name): self.credentials = credentials # We initialize these values here so the unit tests can have - # valid values. But these will get overridden in ``add_auth`` + # valid values. But these will get overridden in ``add_auth`` # later for real requests. self._region_name = region_name self._service_name = service_name def _sign(self, key, msg, hex=False): if hex: - sig = hmac.new(key, msg.encode("utf-8"), sha256).hexdigest() + sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest() else: - sig = hmac.new(key, msg.encode("utf-8"), sha256).digest() + sig = hmac.new(key, msg.encode('utf-8'), sha256).digest() return sig def headers_to_sign(self, request): @@ -128,10 +130,10 @@ def headers_to_sign(self, request): lname = name.lower() if lname not in SIGNED_HEADERS_BLACKLIST: header_map[lname] = value - if "host" not in header_map: + if 'host' not in header_map: # TODO: We should set the host ourselves, instead of relying on our # HTTP client to set it for us. - header_map["host"] = _host_from_url(request.url) + header_map['host'] = _host_from_url(request.url) return header_map def canonical_query_string(self, request): @@ -151,30 +153,30 @@ def _canonical_query_string_params(self, params): params = params.items() for key, value in params: key_val_pairs.append( - (quote(key, safe="-_.~"), quote(str(value), safe="-_.~")) + (quote(key, safe='-_.~'), quote(str(value), safe='-_.~')) ) sorted_key_vals = [] # Sort by the URI-encoded key names, and in the case of # repeated keys, then sort by the value. 
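# sorted() on the (key, value) tuples yields exactly that ordering.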
for key, value in sorted(key_val_pairs): - sorted_key_vals.append(f"{key}={value}") - canonical_query_string = "&".join(sorted_key_vals) + sorted_key_vals.append(f'{key}={value}') + canonical_query_string = '&'.join(sorted_key_vals) return canonical_query_string def _canonical_query_string_url(self, parts): - canonical_query_string = "" + canonical_query_string = '' if parts.query: # [(key, value), (key2, value2)] key_val_pairs = [] - for pair in parts.query.split("&"): - key, _, value = pair.partition("=") + for pair in parts.query.split('&'): + key, _, value = pair.partition('=') key_val_pairs.append((key, value)) sorted_key_vals = [] # Sort by the URI-encoded key names, and in the case of # repeated keys, then sort by the value. for key, value in sorted(key_val_pairs): - sorted_key_vals.append(f"{key}={value}") - canonical_query_string = "&".join(sorted_key_vals) + sorted_key_vals.append(f'{key}={value}') + canonical_query_string = '&'.join(sorted_key_vals) return canonical_query_string def canonical_headers(self, headers_to_sign): @@ -187,11 +189,11 @@ def canonical_headers(self, headers_to_sign): headers = [] sorted_header_names = sorted(set(headers_to_sign)) for key in sorted_header_names: - value = ",".join( + value = ','.join( self._header_value(v) for v in headers_to_sign.get_all(key) ) - headers.append(f"{key}:{ensure_unicode(value)}") - return "\n".join(headers) + headers.append(f'{key}:{ensure_unicode(value)}') + return '\n'.join(headers) def _header_value(self, value): # From the sigv4 docs: @@ -199,16 +201,16 @@ def _header_value(self, value): # # The Trimall function removes excess white space before and after # values, and converts sequential spaces to a single space. - return " ".join(value.split()) + return ' '.join(value.split()) def signed_headers(self, headers_to_sign): headers = sorted(n.lower().strip() for n in set(headers_to_sign)) - return ";".join(headers) + return ';'.join(headers) def _is_streaming_checksum_payload(self, request): - checksum_context = request.context.get("checksum", {}) - algorithm = checksum_context.get("request_algorithm") - return isinstance(algorithm, dict) and algorithm.get("in") == "trailer" + checksum_context = request.context.get('checksum', {}) + algorithm = checksum_context.get('request_algorithm') + return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer' def payload(self, request): if self._is_streaming_checksum_payload(request): @@ -218,11 +220,13 @@ def payload(self, request): # place of the payload checksum. return UNSIGNED_PAYLOAD request_body = request.body - if request_body and hasattr(request_body, "seek"): + if request_body and hasattr(request_body, 'seek'): position = request_body.tell() - read_chunksize = functools.partial(request_body.read, PAYLOAD_BUFFER) + read_chunksize = functools.partial( + request_body.read, PAYLOAD_BUFFER + ) checksum = sha256() - for chunk in iter(read_chunksize, b""): + for chunk in iter(read_chunksize, b''): checksum.update(chunk) hex_checksum = checksum.hexdigest() request_body.seek(position) @@ -236,13 +240,13 @@ def payload(self, request): def _should_sha256_sign_payload(self, request): # Payloads will always be signed over insecure connections. - if not request.url.startswith("https"): + if not request.url.startswith('https'): return True # Certain operations may have payload signing disabled by default. # Since we don't have access to the operation model, we pass in this # bit of metadata through the request context. 
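# When the flag is absent from the context, the payload is signed by default.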
- return request.context.get("payload_signing_enabled", True) + return request.context.get('payload_signing_enabled', True) def canonical_request(self, request): cr = [request.method.upper()] @@ -250,34 +254,34 @@ def canonical_request(self, request): cr.append(path) cr.append(self.canonical_query_string(request)) headers_to_sign = self.headers_to_sign(request) - cr.append(self.canonical_headers(headers_to_sign) + "\n") + cr.append(self.canonical_headers(headers_to_sign) + '\n') cr.append(self.signed_headers(headers_to_sign)) - if "X-Amz-Content-SHA256" in request.headers: - body_checksum = request.headers["X-Amz-Content-SHA256"] + if 'X-Amz-Content-SHA256' in request.headers: + body_checksum = request.headers['X-Amz-Content-SHA256'] else: body_checksum = self.payload(request) cr.append(body_checksum) - return "\n".join(cr) + return '\n'.join(cr) def _normalize_url_path(self, path): - normalized_path = quote(normalize_url_path(path), safe="/~") + normalized_path = quote(normalize_url_path(path), safe='/~') return normalized_path def scope(self, request): scope = [self.credentials.access_key] - scope.append(request.context["timestamp"][0:8]) + scope.append(request.context['timestamp'][0:8]) scope.append(self._region_name) scope.append(self._service_name) - scope.append("aws4_request") - return "/".join(scope) + scope.append('aws4_request') + return '/'.join(scope) def credential_scope(self, request): scope = [] - scope.append(request.context["timestamp"][0:8]) + scope.append(request.context['timestamp'][0:8]) scope.append(self._region_name) scope.append(self._service_name) - scope.append("aws4_request") - return "/".join(scope) + scope.append('aws4_request') + return '/'.join(scope) def string_to_sign(self, request, canonical_request): """ @@ -285,94 +289,102 @@ def string_to_sign(self, request, canonical_request): containing the original version of all headers that were included in the StringToSign. """ - sts = ["AWS4-HMAC-SHA256"] - sts.append(request.context["timestamp"]) + sts = ['AWS4-HMAC-SHA256'] + sts.append(request.context['timestamp']) sts.append(self.credential_scope(request)) - sts.append(sha256(canonical_request.encode("utf-8")).hexdigest()) - return "\n".join(sts) + sts.append(sha256(canonical_request.encode('utf-8')).hexdigest()) + return '\n'.join(sts) def signature(self, string_to_sign, request): key = self.credentials.secret_key - k_date = self._sign((f"AWS4{key}").encode(), request.context["timestamp"][0:8]) + k_date = self._sign( + (f"AWS4{key}").encode(), request.context["timestamp"][0:8] + ) k_region = self._sign(k_date, self._region_name) k_service = self._sign(k_region, self._service_name) - k_signing = self._sign(k_service, "aws4_request") + k_signing = self._sign(k_service, 'aws4_request') return self._sign(k_signing, string_to_sign, hex=True) def add_auth(self, request): if self.credentials is None: raise NoCredentialsError() datetime_now = datetime.datetime.utcnow() - request.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP) + request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) # This could be a retry. Make sure the previous # authorization header is removed first. 
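# A stale Authorization header is not in SIGNED_HEADERS_BLACKLIST, so it would otherwise end up signed as well.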
self._modify_request_before_signing(request) canonical_request = self.canonical_request(request) logger.debug("Calculating signature using v4 auth.") - logger.debug("CanonicalRequest:\n%s", canonical_request) + logger.debug('CanonicalRequest:\n%s', canonical_request) string_to_sign = self.string_to_sign(request, canonical_request) - logger.debug("StringToSign:\n%s", string_to_sign) + logger.debug('StringToSign:\n%s', string_to_sign) signature = self.signature(string_to_sign, request) - logger.debug("Signature:\n%s", signature) + logger.debug('Signature:\n%s', signature) self._inject_signature_to_request(request, signature) def _inject_signature_to_request(self, request, signature): - auth_str = ["AWS4-HMAC-SHA256 Credential=%s" % self.scope(request)] + auth_str = ['AWS4-HMAC-SHA256 Credential=%s' % self.scope(request)] headers_to_sign = self.headers_to_sign(request) - auth_str.append(f"SignedHeaders={self.signed_headers(headers_to_sign)}") - auth_str.append("Signature=%s" % signature) - request.headers["Authorization"] = ", ".join(auth_str) + auth_str.append( + f"SignedHeaders={self.signed_headers(headers_to_sign)}" + ) + auth_str.append('Signature=%s' % signature) + request.headers['Authorization'] = ', '.join(auth_str) return request def _modify_request_before_signing(self, request): - if "Authorization" in request.headers: - del request.headers["Authorization"] + if 'Authorization' in request.headers: + del request.headers['Authorization'] self._set_necessary_date_headers(request) if self.credentials.token: - if "X-Amz-Security-Token" in request.headers: - del request.headers["X-Amz-Security-Token"] - request.headers["X-Amz-Security-Token"] = self.credentials.token + if 'X-Amz-Security-Token' in request.headers: + del request.headers['X-Amz-Security-Token'] + request.headers['X-Amz-Security-Token'] = self.credentials.token - if not request.context.get("payload_signing_enabled", True): - if "X-Amz-Content-SHA256" in request.headers: - del request.headers["X-Amz-Content-SHA256"] - request.headers["X-Amz-Content-SHA256"] = UNSIGNED_PAYLOAD + if not request.context.get('payload_signing_enabled', True): + if 'X-Amz-Content-SHA256' in request.headers: + del request.headers['X-Amz-Content-SHA256'] + request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD def _set_necessary_date_headers(self, request): # The spec allows for either the Date _or_ the X-Amz-Date value to be # used so we check both. If there's a Date header, we use the date # header. Otherwise we use the X-Amz-Date header. 
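# Either branch deletes the unused header, so exactly one fresh date header is signed.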
- if "Date" in request.headers: - del request.headers["Date"] + if 'Date' in request.headers: + del request.headers['Date'] datetime_timestamp = datetime.datetime.strptime( - request.context["timestamp"], SIGV4_TIMESTAMP + request.context['timestamp'], SIGV4_TIMESTAMP ) - request.headers["Date"] = formatdate( + request.headers['Date'] = formatdate( int(calendar.timegm(datetime_timestamp.timetuple())) ) - if "X-Amz-Date" in request.headers: - del request.headers["X-Amz-Date"] + if 'X-Amz-Date' in request.headers: + del request.headers['X-Amz-Date'] else: - if "X-Amz-Date" in request.headers: - del request.headers["X-Amz-Date"] - request.headers["X-Amz-Date"] = request.context["timestamp"] + if 'X-Amz-Date' in request.headers: + del request.headers['X-Amz-Date'] + request.headers['X-Amz-Date'] = request.context['timestamp'] class SigV4QueryAuth(SigV4Auth): DEFAULT_EXPIRES = 3600 - def __init__(self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES): + def __init__( + self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES + ): super().__init__(credentials, service_name, region_name) self._expires = expires def _modify_request_before_signing(self, request): # We automatically set this header, so if it's the auto-set value we # want to get rid of it since it doesn't make sense for presigned urls. - content_type = request.headers.get("content-type") - blacklisted_content_type = "application/x-www-form-urlencoded; charset=utf-8" + content_type = request.headers.get('content-type') + blacklisted_content_type = ( + 'application/x-www-form-urlencoded; charset=utf-8' + ) if content_type == blacklisted_content_type: - del request.headers["content-type"] + del request.headers['content-type'] # Note that we're not including X-Amz-Signature. # From the docs: "The Canonical Query String must include all the query @@ -380,14 +392,14 @@ def _modify_request_before_signing(self, request): signed_headers = self.signed_headers(self.headers_to_sign(request)) auth_params = { - "X-Amz-Algorithm": "AWS4-HMAC-SHA256", - "X-Amz-Credential": self.scope(request), - "X-Amz-Date": request.context["timestamp"], - "X-Amz-Expires": self._expires, - "X-Amz-SignedHeaders": signed_headers, + 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', + 'X-Amz-Credential': self.scope(request), + 'X-Amz-Date': request.context['timestamp'], + 'X-Amz-Expires': self._expires, + 'X-Amz-SignedHeaders': signed_headers, } if self.credentials.token is not None: - auth_params["X-Amz-Security-Token"] = self.credentials.token + auth_params['X-Amz-Security-Token'] = self.credentials.token # Now parse the original query string to a dict, inject our new query # params, and serialize back to a query string. url_parts = urlsplit(request.url) @@ -406,15 +418,17 @@ def _modify_request_before_signing(self, request): # new_query_params.update(op_params) # new_query_params.update(auth_params) # percent_encode_sequence(new_query_params) - operation_params = "" + operation_params = '' if request.data: # We also need to move the body params into the query string. To # do this, we first have to convert it to a dict. 
query_dict.update(_get_body_as_dict(request)) - request.data = "" + request.data = '' if query_dict: - operation_params = percent_encode_sequence(query_dict) + "&" - new_query_string = f"{operation_params}{percent_encode_sequence(auth_params)}" + operation_params = percent_encode_sequence(query_dict) + '&' + new_query_string = ( + f"{operation_params}{percent_encode_sequence(auth_params)}" + ) # url_parts is a tuple (and therefore immutable) so we need to create # a new url_parts with the new query string. # - @@ -431,7 +445,7 @@ def _inject_signature_to_request(self, request, signature): # Rather than calculating an "Authorization" header, for the query # param auth, we just append an 'X-Amz-Signature' param to the end # of the query string. - request.url += "&X-Amz-Signature=%s" % signature + request.url += '&X-Amz-Signature=%s' % signature class S3SigV4QueryAuth(SigV4QueryAuth): diff --git a/diaspora_event_sdk/sdk/botocore/awsrequest.py b/diaspora_event_sdk/sdk/botocore/awsrequest.py index 7ba0816..ca18281 100644 --- a/diaspora_event_sdk/sdk/botocore/awsrequest.py +++ b/diaspora_event_sdk/sdk/botocore/awsrequest.py @@ -38,7 +38,7 @@ class AWSRequestPreparer: Botocore either performs these validations elsewhere or otherwise consistently provides well formatted URLs. - This class does not heavily prepare the body. Body preparation is + This class does not heavily prepare the body. Body preparation is simple and supports only the cases that we document: bytes and file-like objects to determine the content-length. This will also additionally prepare a body that is a dict to be url encoded params @@ -61,7 +61,7 @@ def _prepare_url(self, original): url = original.url if original.params: url_parts = urlparse(url) - delim = "&" if url_parts.query else "?" + delim = '&' if url_parts.query else '?' 
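# The delimiter appends the extra params to an existing query string rather than starting a new one.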
if isinstance(original.params, Mapping): params_to_encode = list(original.params.items()) else: @@ -74,35 +74,35 @@ def _prepare_headers(self, original, prepared_body=None): headers = HeadersDict(original.headers.items()) # If the transfer encoding or content length is already set, use that - if "Transfer-Encoding" in headers or "Content-Length" in headers: + if 'Transfer-Encoding' in headers or 'Content-Length' in headers: return headers # Ensure we set the content length when it is expected - if original.method not in ("GET", "HEAD", "OPTIONS"): + if original.method not in ('GET', 'HEAD', 'OPTIONS'): length = self._determine_content_length(prepared_body) if length is not None: - headers["Content-Length"] = str(length) + headers['Content-Length'] = str(length) else: # Failed to determine content length, using chunked # NOTE: This shouldn't ever happen in practice body_type = type(prepared_body) - logger.debug("Failed to determine length of %s", body_type) - headers["Transfer-Encoding"] = "chunked" + logger.debug('Failed to determine length of %s', body_type) + headers['Transfer-Encoding'] = 'chunked' return headers def _to_utf8(self, item): key, value = item if isinstance(key, str): - key = key.encode("utf-8") + key = key.encode('utf-8') if isinstance(value, str): - value = value.encode("utf-8") + value = value.encode('utf-8') return key, value def _prepare_body(self, original): """Prepares the given HTTP body data.""" body = original.data - if body == b"": + if body == b'': body = None if isinstance(body, dict): @@ -169,7 +169,7 @@ def prepare(self): def body(self): body = self.prepare().body if isinstance(body, str): - body = body.encode("utf-8") + body = body.encode('utf-8') return body @@ -194,7 +194,10 @@ def __init__(self, method, url, headers, body, stream_output): self.stream_output = stream_output def __repr__(self): - fmt = "<AWSPreparedRequest stream_output=%s, method=%s, url=%s, headers=%s>" + fmt = ( + '<AWSPreparedRequest stream_output=%s, method=%s, url=%s, headers=%s>' + ) return fmt % (self.stream_output, self.method, self.url, self.headers) def reset_stream(self): diff --git a/diaspora_event_sdk/sdk/botocore/compat.py b/diaspora_event_sdk/sdk/botocore/compat.py index 313d74f..9f7648e 100644 --- a/diaspora_event_sdk/sdk/botocore/compat.py +++ b/diaspora_event_sdk/sdk/botocore/compat.py @@ -12,18 +12,36 @@ # language governing permissions and limitations under the License. from urllib.parse import ( + quote, + urlencode, + unquote, unquote_plus, + urlparse, + urlsplit, + urlunsplit, + urljoin, + parse_qsl, + parse_qs, ) +import json +from itertools import zip_longest +from email.utils import formatdate +from base64 import encodebytes from io import IOBase as _IOBase +from http.client import HTTPResponse import copy +import datetime import sys import inspect +import warnings import hashlib from http.client import HTTPMessage import logging import shlex import re import os +from collections import OrderedDict +from collections.abc import MutableMapping from math import floor # from dateutil.tz import tzlocal @@ -66,7 +84,7 @@ def ensure_unicode(s, encoding=None, errors=None): return s -def ensure_bytes(s, encoding="utf-8", errors="strict"): +def ensure_bytes(s, encoding='utf-8', errors='strict'): if isinstance(s, str): return s.encode(encoding, errors) if isinstance(s, bytes): @@ -194,7 +212,7 @@ def _windows_shell_split(s): is_quoted = False num_backslashes = 0 for character in s: - if character == "\\": # We can't simply append backslashes because we don't know if # they are being used as escape characters or not. 
Instead we # keep track of how many we've encountered and handle them when @@ -204,7 +222,7 @@ def _windows_shell_split(s): if num_backslashes > 0: # The backslashes are in a chain leading up to a double # quote, so they are escaping each other. - buff.append("\\" * int(floor(num_backslashes / 2))) + buff.append('\\' * int(floor(num_backslashes / 2))) remainder = num_backslashes % 2 num_backslashes = 0 if remainder == 1: @@ -223,24 +241,24 @@ def _windows_shell_split(s): # sure it sticks around if there's nothing else between quotes. # If there is other stuff between quotes, the empty string will # disappear during the joining process. - buff.append("") - elif character in [" ", "\t"] and not is_quoted: + buff.append('') + elif character in [' ', '\t'] and not is_quoted: # Since the backslashes aren't leading up to a quote, we put in # the exact number of backslashes. if num_backslashes > 0: - buff.append("\\" * num_backslashes) + buff.append('\\' * num_backslashes) num_backslashes = 0 # Excess whitespace is ignored, so only add the components list # if there is anything in the buffer. if buff: - components.append("".join(buff)) + components.append(''.join(buff)) buff = [] else: # Since the backslashes aren't leading up to a quote, we put in # the exact number of backslashes. if num_backslashes > 0: - buff.append("\\" * num_backslashes) + buff.append('\\' * num_backslashes) num_backslashes = 0 buff.append(character) @@ -251,11 +269,11 @@ def _windows_shell_split(s): # There may be some leftover backslashes, so we need to add them in. # There's no quote so we add the exact number. if num_backslashes > 0: - buff.append("\\" * num_backslashes) + buff.append('\\' * num_backslashes) # Add the final component in if there is anything in the buffer. if buff: - components.append("".join(buff)) + components.append(''.join(buff)) return components @@ -274,11 +292,11 @@ def _windows_shell_split(s): # Detect if CRT is available for use try: - import awscrt.auth # type: ignore[import] # noqa: F401 + import awscrt.auth # type: ignore[import] # Allow user opt-out if needed - disabled = os.environ.get("BOTO_DISABLE_CRT", "false") - HAS_CRT = not disabled.lower() == "true" + disabled = os.environ.get('BOTO_DISABLE_CRT', "false") + HAS_CRT = not disabled.lower() == 'true' except ImportError: HAS_CRT = False @@ -315,19 +333,20 @@ def _windows_shell_split(s): "(?:(?:%(hex)s:){0,6}%(hex)s)?::", ] -UNRESERVED_PAT = r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._!\-~" +UNRESERVED_PAT = ( + r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._!\-~" +) IPV6_PAT = "(?:" + "|".join([x % _subs for x in _variations]) + ")" ZONE_ID_PAT = "(?:%25|%)(?:[" + UNRESERVED_PAT + "]|%[a-fA-F0-9]{2})+" IPV6_ADDRZ_PAT = r"\[" + IPV6_PAT + r"(?:" + ZONE_ID_PAT + r")?\]" IPV6_ADDRZ_RE = re.compile("^" + IPV6_ADDRZ_PAT + "$") # These are the characters that are stripped by post-bpo-43882 urlparse(). 
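# is_valid_ipv6_endpoint_url (in utils.py) treats URLs containing any of these as invalid.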
-UNSAFE_URL_CHARS = frozenset("\t\r\n") +UNSAFE_URL_CHARS = frozenset('\t\r\n') # Detect if gzip is available for use try: - import gzip # noqa: F401 - + import gzip HAS_GZIP = True except ImportError: HAS_GZIP = False diff --git a/diaspora_event_sdk/sdk/botocore/credentials.py b/diaspora_event_sdk/sdk/botocore/credentials.py index 1bbf144..5fac149 100644 --- a/diaspora_event_sdk/sdk/botocore/credentials.py +++ b/diaspora_event_sdk/sdk/botocore/credentials.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) ReadOnlyCredentials = namedtuple( - "ReadOnlyCredentials", ["access_key", "secret_key", "token"] + 'ReadOnlyCredentials', ['access_key', 'secret_key', 'token'] ) _DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60 # 10 min @@ -43,7 +43,7 @@ def __init__(self, access_key, secret_key, token=None, method=None): self.token = token if method is None: - method = "explicit" + method = 'explicit' self.method = method self._normalize() @@ -61,4 +61,6 @@ def _normalize(self): self.secret_key = ensure_unicode(self.secret_key) def get_frozen_credentials(self): - return ReadOnlyCredentials(self.access_key, self.secret_key, self.token) + return ReadOnlyCredentials( + self.access_key, self.secret_key, self.token + ) diff --git a/diaspora_event_sdk/sdk/botocore/exceptions.py b/diaspora_event_sdk/sdk/botocore/exceptions.py index 55b2fe7..5d56380 100644 --- a/diaspora_event_sdk/sdk/botocore/exceptions.py +++ b/diaspora_event_sdk/sdk/botocore/exceptions.py @@ -12,7 +12,6 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. - def _exception_from_packed_args(exception_cls, args=None, kwargs=None): # This is helpful for reducing Exceptions that only accept kwargs as # only positional arguments can be provided for __reduce__ @@ -32,7 +31,7 @@ class BotoCoreError(Exception): :ivar msg: The descriptive message associated with the error. """ - fmt = "An unspecified error occurred" + fmt = 'An unspecified error occurred' def __init__(self, **kwargs): msg = self.fmt.format(**kwargs) @@ -48,13 +47,16 @@ class NoCredentialsError(BotoCoreError): No credentials could be found. """ - fmt = "Unable to locate credentials" + fmt = 'Unable to locate credentials' class UnseekableStreamError(BotoCoreError): """Need to seek a stream, but stream does not support seeking.""" - fmt = "Need to rewind the stream {stream_object}, but stream " "is not seekable." + fmt = ( + 'Need to rewind the stream {stream_object}, but stream ' + 'is not seekable.' + ) class MD5UnavailableError(BotoCoreError): diff --git a/diaspora_event_sdk/sdk/botocore/utils.py b/diaspora_event_sdk/sdk/botocore/utils.py index 68e9490..8409363 100644 --- a/diaspora_event_sdk/sdk/botocore/utils.py +++ b/diaspora_event_sdk/sdk/botocore/utils.py @@ -23,25 +23,25 @@ logger = logging.getLogger(__name__) DEFAULT_METADATA_SERVICE_TIMEOUT = 1 -METADATA_BASE_URL = "http://169.254.169.254/" -METADATA_BASE_URL_IPv6 = "http://[fd00:ec2::254]/" -METADATA_ENDPOINT_MODES = ("ipv4", "ipv6") +METADATA_BASE_URL = 'http://169.254.169.254/' +METADATA_BASE_URL_IPv6 = 'http://[fd00:ec2::254]/' +METADATA_ENDPOINT_MODES = ('ipv4', 'ipv6') # These are chars that do not need to be urlencoded. 
# Based on rfc2986, section 2.3 -SAFE_CHARS = "-._~" -LABEL_RE = re.compile(r"[a-z0-9][a-z0-9\-]*[a-z0-9]") +SAFE_CHARS = '-._~' +LABEL_RE = re.compile(r'[a-z0-9][a-z0-9\-]*[a-z0-9]') # RETRYABLE_HTTP_ERRORS = ( # ReadTimeoutError, # EndpointConnectionError, # ConnectionClosedError, # ConnectTimeoutError, # ) -S3_ACCELERATE_WHITELIST = ["dualstack"] +S3_ACCELERATE_WHITELIST = ['dualstack'] # This pattern can be used to detect if a header is a flexible checksum header CHECKSUM_HEADER_PATTERN = re.compile( - r"^X-Amz-Checksum-([a-z0-9]*)$", + r'^X-Amz-Checksum-([a-z0-9]*)$', flags=re.IGNORECASE, ) @@ -58,7 +58,7 @@ def determine_content_length(body): pass # Try getting the length from a seekable stream - if hasattr(body, "seek") and hasattr(body, "tell"): + if hasattr(body, 'seek') and hasattr(body, 'tell'): try: orig_pos = body.tell() body.seek(0, 2) @@ -78,13 +78,13 @@ def determine_content_length(body): def is_valid_ipv6_endpoint_url(endpoint_url): if UNSAFE_URL_CHARS.intersection(endpoint_url): return False - hostname = f"[{urlparse(endpoint_url).hostname}]" + hostname = f'[{urlparse(endpoint_url).hostname}]' return IPV6_ADDRZ_RE.match(hostname) is not None def normalize_url_path(path): if not path: - return "/" + return '/' return remove_dot_segments(path) @@ -93,26 +93,26 @@ def remove_dot_segments(url): # Also, AWS services require consecutive slashes to be removed, # so that's done here as well if not url: - return "" - input_url = url.split("/") + return '' + input_url = url.split('/') output_list = [] for x in input_url: - if x and x != ".": - if x == "..": + if x and x != '.': + if x == '..': if output_list: output_list.pop() else: output_list.append(x) - if url[0] == "/": - first = "/" + if url[0] == '/': + first = '/' else: - first = "" - if url[-1] == "/" and output_list: - last = "/" + first = '' + if url[-1] == '/' and output_list: + last = '/' else: - last = "" - return first + "/".join(output_list) + last + last = '' + return first + '/'.join(output_list) + last def percent_encode_sequence(mapping, safe=SAFE_CHARS): @@ -125,7 +125,7 @@ def percent_encode_sequence(mapping, safe=SAFE_CHARS): to be encoded, which matches what AWS services expect. If any value in the input ``mapping`` is a list type, - then each list element will be serialized. This is the equivalent + then each list element will be serialized. This is the equivalent to ``urlencode``'s ``doseq=True`` argument. This function should be preferred over the stdlib @@ -136,17 +136,21 @@ def percent_encode_sequence(mapping, safe=SAFE_CHARS): encoded_pairs = [] - if hasattr(mapping, "items"): + if hasattr(mapping, 'items'): pairs = mapping.items() else: pairs = mapping for key, value in pairs: if isinstance(value, list): for element in value: - encoded_pairs.append(f"{percent_encode(key)}={percent_encode(element)}") + encoded_pairs.append( + f'{percent_encode(key)}={percent_encode(element)}' + ) else: - encoded_pairs.append(f"{percent_encode(key)}={percent_encode(value)}") - return "&".join(encoded_pairs) + encoded_pairs.append( + f'{percent_encode(key)}={percent_encode(value)}' + ) + return '&'.join(encoded_pairs) def percent_encode(input_str, safe=SAFE_CHARS): @@ -166,5 +170,5 @@ def percent_encode(input_str, safe=SAFE_CHARS): input_str = str(input_str) # If it's not bytes, make it bytes by UTF-8 encoding it. 
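# quote() below then percent-encodes every byte outside the safe set.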
if not isinstance(input_str, bytes): - input_str = input_str.encode("utf-8") + input_str = input_str.encode('utf-8') return quote(input_str, safe=safe) diff --git a/diaspora_event_sdk/sdk/client.py b/diaspora_event_sdk/sdk/client.py index 741e611..c6a12b3 100644 --- a/diaspora_event_sdk/sdk/client.py +++ b/diaspora_event_sdk/sdk/client.py @@ -28,8 +28,7 @@ def __init__( self.login_manager.ensure_logged_in() self.web_client = self.login_manager.get_web_client( - base_url=self.web_service_address - ) + base_url=self.web_service_address) self.auth_client = self.login_manager.get_auth_client() self.subject_openid = self.auth_client.oauth2_userinfo()["sub"] @@ -154,9 +153,7 @@ def update_topic_partitions(self, topic, new_partitions): """ Increases the number of partitions for a given topic to the specified new partition count. """ - return self.web_client.update_topic_partitions( - self.subject_openid, topic, new_partitions - ) + return self.web_client.update_topic_partitions(self.subject_openid, topic, new_partitions) @requires_login def reset_topic(self, topic): @@ -170,18 +167,14 @@ def grant_user_access(self, topic, user): """ Authorizes another user to access a registered topic under the invoker's account. """ - return self.web_client.grant_user_access( - self.subject_openid, topic, user, "grant" - ) + return self.web_client.grant_user_access(self.subject_openid, topic, user, "grant") @requires_login def revoke_user_access(self, topic, user): """ Removes access permissions for another user from a registered topic under the invoker's account. """ - return self.web_client.grant_user_access( - self.subject_openid, topic, user, "revoke" - ) + return self.web_client.grant_user_access(self.subject_openid, topic, user, "revoke") @requires_login def list_topic_users(self, topic): @@ -200,16 +193,10 @@ def list_triggers(self): @requires_login def create_trigger(self, topic, function, function_configs, trigger_configs): """ - Creates a new trigger under the user's account with specific function and invocation configurations. + Creates a new trigger under the user's account with specific function and invocation configurations. """ return self.web_client.create_trigger( - self.subject_openid, - topic, - function, - "create", - function_configs, - trigger_configs, - ) + self.subject_openid, topic, function, "create", function_configs, trigger_configs) @requires_login def delete_trigger(self, topic, function): @@ -217,28 +204,27 @@ def delete_trigger(self, topic, function): Deletes a trigger and related AWS resources, while the associated topic remains unaffected. """ return self.web_client.create_trigger( - self.subject_openid, topic, function, "delete", {}, {} - ) + self.subject_openid, topic, function, "delete", {}, {}) @requires_login def update_trigger(self, trigger_uuid, trigger_configs): """ Updates invocation configurations of an existing trigger, identified by its unique trigger UUID. 
""" - return self.web_client.update_trigger( - self.subject_openid, trigger_uuid, trigger_configs - ) + return self.web_client.update_trigger(self.subject_openid, trigger_uuid, trigger_configs) @requires_login def list_log_streams(self, trigger): """ List log streams of a trigger under the user's account """ - return self.web_client.list_log_streams(self.subject_openid, trigger) + return self.web_client.list_log_streams( + self.subject_openid, trigger) @requires_login def get_log_events(self, trigger, stream): """ Get events in a particular log stream of a trigger under the user's account """ - return self.web_client.get_log_events(self.subject_openid, trigger, stream) + return self.web_client.get_log_events( + self.subject_openid, trigger, stream) diff --git a/diaspora_event_sdk/sdk/kafka_client.py b/diaspora_event_sdk/sdk/kafka_client.py index 2a67cc2..9fbbdc5 100644 --- a/diaspora_event_sdk/sdk/kafka_client.py +++ b/diaspora_event_sdk/sdk/kafka_client.py @@ -17,7 +17,7 @@ class MSKTokenProvider: def token(self): token, _ = generate_auth_token("us-east-1") return token -except Exception: +except Exception as e: kafka_available = False @@ -90,7 +90,7 @@ def producer_connection_test(result): value={"message": "Synchronous message from Diaspora SDK"}, ) result["producer_connection_test"] = future.get(timeout=10) - except Exception: + except Exception as e: pass def consumer_connection_test(result): @@ -103,7 +103,7 @@ def consumer_connection_test(result): for msg in consumer: result["consumer_connection_test"] = msg break - except Exception: + except Exception as e: pass result, retry_count = {}, 0 diff --git a/diaspora_event_sdk/sdk/login_manager/client_login.py b/diaspora_event_sdk/sdk/login_manager/client_login.py index 2a0eeaf..1b33837 100644 --- a/diaspora_event_sdk/sdk/login_manager/client_login.py +++ b/diaspora_event_sdk/sdk/login_manager/client_login.py @@ -4,7 +4,6 @@ The design is based on the Globus CLI client login: https://github.com/globus/globus-cli/blob/main/src/globus_cli/login_manager/client_login.py """ - from __future__ import annotations import logging diff --git a/diaspora_event_sdk/sdk/login_manager/login_flow.py b/diaspora_event_sdk/sdk/login_manager/login_flow.py index 82ee61b..e7a2db0 100644 --- a/diaspora_event_sdk/sdk/login_manager/login_flow.py +++ b/diaspora_event_sdk/sdk/login_manager/login_flow.py @@ -32,4 +32,4 @@ def do_link_auth_flow(scopes: List[str]): auth_code = input("Enter the resulting Authorization Code here: ").strip() # finish auth flow - return auth_client.oauth2_exchange_code_for_tokens(auth_code) + return auth_client.oauth2_exchange_code_for_tokens(auth_code) \ No newline at end of file diff --git a/diaspora_event_sdk/sdk/login_manager/protocol.py b/diaspora_event_sdk/sdk/login_manager/protocol.py index ccfd41c..e00c721 100644 --- a/diaspora_event_sdk/sdk/login_manager/protocol.py +++ b/diaspora_event_sdk/sdk/login_manager/protocol.py @@ -16,10 +16,14 @@ @runtime_checkable class LoginManagerProtocol(Protocol): - def ensure_logged_in(self) -> None: ... + def ensure_logged_in(self) -> None: + ... - def logout(self) -> bool: ... + def logout(self) -> bool: + ... - def get_auth_client(self) -> globus_sdk.AuthClient: ... + def get_auth_client(self) -> globus_sdk.AuthClient: + ... - def get_web_client(self, *, base_url: str | None = None) -> WebClient: ... + def get_web_client(self, *, base_url: str | None = None) -> WebClient: + ... 
diff --git a/diaspora_event_sdk/sdk/login_manager/tokenstore.py b/diaspora_event_sdk/sdk/login_manager/tokenstore.py index c99edd5..1e9857d 100644 --- a/diaspora_event_sdk/sdk/login_manager/tokenstore.py +++ b/diaspora_event_sdk/sdk/login_manager/tokenstore.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import os import pathlib @@ -7,6 +8,7 @@ from .._environments import _get_envname from .client_login import get_client_login, is_client_login +from .globus_auth import internal_auth_client def _home() -> pathlib.Path: diff --git a/diaspora_event_sdk/sdk/web_client.py b/diaspora_event_sdk/sdk/web_client.py index 8d1de24..a4a9371 100644 --- a/diaspora_event_sdk/sdk/web_client.py +++ b/diaspora_event_sdk/sdk/web_client.py @@ -36,14 +36,15 @@ def register_topic( ) -> globus_sdk.GlobusHTTPResponse: return self.put( f"/api/v2/topic/{topic}", - headers={"Subject": str(subject), "Action": action}, + headers={"Subject": str(subject), "Action": action} ) def get_topic_configs( self, subject: UUID_LIKE_T, topic: str ) -> globus_sdk.GlobusHTTPResponse: return self.get( - f"/api/v2/topic/{topic}", headers={"Subject": str(subject), "Topic": topic} + f"/api/v2/topic/{topic}", + headers={"Subject": str(subject), "Topic": topic} ) def update_topic_configs( @@ -51,12 +52,9 @@ def update_topic_configs( ) -> globus_sdk.GlobusHTTPResponse: return self.post( f"/api/v2/topic/{topic}", - headers={ - "Subject": str(subject), - "Topic": topic, - "Content-Type": "text/plain", - }, - data=json.dumps(configs).encode("utf-8"), + headers={"Subject": str(subject), "Topic": topic, + "Content-Type": "text/plain"}, + data=json.dumps(configs).encode("utf-8") ) def update_topic_partitions( @@ -64,11 +62,8 @@ def update_topic_partitions( ) -> globus_sdk.GlobusHTTPResponse: return self.post( f"/api/v2/topic/{topic}/partitions", - headers={ - "Subject": str(subject), - "Topic": topic, - "NewPartitions": str(new_partitions), - }, + headers={"Subject": str(subject), "Topic": topic, + "NewPartitions": str(new_partitions)} ) def reset_topic( @@ -76,7 +71,8 @@ def reset_topic( ) -> globus_sdk.GlobusHTTPResponse: return self.post( f"/api/v2/topic/{topic}/reset", - headers={"Subject": str(subject), "Topic": topic}, + headers={"Subject": str(subject), + "Topic": topic} ) def grant_user_access( @@ -84,12 +80,8 @@ def grant_user_access( ) -> globus_sdk.GlobusHTTPResponse: return self.post( f"/api/v2/topic/{topic}/user", - headers={ - "Subject": str(subject), - "Action": action, - "Topic": topic, - "User": str(user), - }, + headers={"Subject": str(subject), "Action": action, + "Topic": topic, "User": str(user)} ) def list_topic_users( @@ -97,33 +89,24 @@ def list_topic_users( ) -> globus_sdk.GlobusHTTPResponse: return self.get( f"/api/v2/topic/{topic}/users", - headers={"Subject": str(subject), "Topic": topic}, + headers={"Subject": str(subject), + "Topic": topic} ) def list_triggers(self, subject: UUID_LIKE_T) -> globus_sdk.GlobusHTTPResponse: return self.get("/api/v2/triggers", headers={"Subject": str(subject)}) def create_trigger( - self, - subject: UUID_LIKE_T, - topic: str, - function: str, - action: str, - function_configs: dict, - trigger_configs: dict, + self, subject: UUID_LIKE_T, topic: str, function: str, action: str, + function_configs: dict, trigger_configs: dict ) -> globus_sdk.GlobusHTTPResponse: return self.put( "/api/v2/trigger", - headers={ - "Subject": str(subject), - "Topic": topic, - "Trigger": function, - "Action": action, - "Content-Type": "text/plain", - }, - data=json.dumps( - {"function": 
function_configs, "trigger": trigger_configs} - ).encode("utf-8"), + headers={"Subject": str(subject), "Topic": topic, + "Trigger": function, "Action": action, + "Content-Type": "text/plain"}, + data=json.dumps({"function": function_configs, + "trigger": trigger_configs}).encode("utf-8") ) def update_trigger( @@ -131,25 +114,24 @@ def update_trigger( ) -> globus_sdk.GlobusHTTPResponse: return self.post( f"/api/v2/triggers/{trigger_uuid}", - headers={ - "Subject": str(subject), - "Trigger_id": str(trigger_uuid), - "Content-Type": "text/plain", - }, - data=json.dumps(trigger_configs).encode("utf-8"), + headers={"Subject": str(subject), "Trigger_id": str(trigger_uuid), + "Content-Type": "text/plain"}, + data=json.dumps(trigger_configs).encode("utf-8") ) def list_log_streams( self, subject: UUID_LIKE_T, trigger: str ) -> globus_sdk.GlobusHTTPResponse: return self.get( - "/api/v2/logs", headers={"Subject": str(subject), "Trigger": trigger} + f"/api/v2/logs", + headers={"Subject": str(subject), "Trigger": trigger} ) def get_log_events( self, subject: UUID_LIKE_T, trigger: str, stream: str ) -> globus_sdk.GlobusHTTPResponse: return self.get( - "/api/v2/log", - headers={"Subject": str(subject), "Trigger": trigger, "Stream": stream}, + f"/api/v2/log", + headers={"Subject": str(subject), "Trigger": trigger, + "Stream": stream} ) diff --git a/docs/quickstart.md b/docs/quickstart.md index 0c78827..674c3a0 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -17,10 +17,10 @@ Aside from topic authorization, authentication requires a username (user's OpenI To avoid errors during this delay, use `block_until_ready()`: -```python +```python from diaspora_event_sdk import block_until_ready assert block_until_ready() -# now the secret key is ready to use +# now the secret key is ready to use ``` This function waits until the key activates, usually within a few seconds. Subsequent `block_until_ready()` calls should return even faster. Still, include this in test/setup scripts, but not on the critical (happy) path. @@ -28,7 +28,7 @@ This function waits until the key activates, usually within a few seconds. Subse ## Producing or Consuming -Once the topic is registered and the access key and secret key are ready (through `c = GlobusClient()`), we can publish messages to it. The `KafkaProducer` wraps the [Python KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html), and event publication can be either synchronous or asynchronous. Below demonstrates the synchronous approach. +Once the topic is registered and the access key and secret key are ready (through `c = GlobusClient()`), we can publish messages to it. The `KafkaProducer` wraps the [Python KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html), and event publication can be either synchronous or asynchronous. Below demonstrates the synchronous approach. ```python from diaspora_event_sdk import KafkaProducer @@ -70,7 +70,7 @@ If you're opting to use a custom Kafka client library, here are the necessary cl | Bootstrap Servers | `c.retrieve_key()['endpoint']` | | Security Protocol | `SASL_SSL` | | Sasl Mechanism | `OAUTHBEARER` | -| Username | `c.retrieve_key()['access_key']` | +| Username | `c.retrieve_key()['access_key']` | | Password | `c.retrieve_key()['secret_key']` | The bootstrap server address, OAuth access key, and OAuth secret key can be retrieved through `retrieve_key()` and invalidated through `create_key()`. 
To use Diaspora Event Fabric with `confluent-kafka-python`, please refer to [this guide](https://github.com/aws/aws-msk-iam-sasl-signer-python?tab=readme-ov-file). For other programming languages, please refer to [this post](https://aws.amazon.com/blogs/big-data/amazon-msk-iam-authentication-now-supports-all-programming-languages/). @@ -78,4 +78,4 @@ The bootstrap server address, OAuth access key, and OAuth secret key can be retr from diaspora_event_sdk import Client as GlobusClient c = GlobusClient() print(c.retrieve_key()) -``` +``` \ No newline at end of file diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index b8278a1..bceb65f 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -62,7 +62,7 @@ from diaspora_event_sdk import block_until_ready c = GlobusClient() c.put_secret_key("", "", "") -print(c.retrieve_key()) +print(c.retrieve_key()) assert block_until_ready() # Should unblock in 1-10 seconds ``` @@ -71,13 +71,13 @@ assert block_until_ready() # Should unblock in 1-10 seconds ### Key Management After Logout and Login -If you log out and then log in again, any subsequent call to `block_until_ready()` or an attempt to create a producer or consumer will internally trigger the `create_key()` function because no secret key is found in `storage.db`. This API call will invalidate all previously issued keys and retrieve a new one. +If you log out and then log in again, any subsequent call to `block_until_ready()` or an attempt to create a producer or consumer will internally trigger the `create_key()` function because no secret key is found in `storage.db`. This API call will invalidate all previously issued keys and retrieve a new one. To avoid accidentally invalidating the secret key, it's recommended to use `put_secret_key()` (see above section) before calling `block_until_ready()` or creating a producer or consumer after re-login. This method allows you to manually set the secret key, ensuring that the existing key is not unintentionally invalidated. ### Managing Keys Across Multiple Machines -If machine A is logged in with Globus Auth credentials and has the AWS secret key stored in `storage.db`, logging into machine B with the same Globus Auth credential and calling `block_until_ready()` will invalidate the key on machine A. To ensure both machines have valid secret keys, follow the section above. +If machine A is logged in with Globus Auth credentials and has the AWS secret key stored in `storage.db`, logging into machine B with the same Globus Auth credential and calling `block_until_ready()` will invalidate the key on machine A. To ensure both machines have valid secret keys, follow the section above. If both machines have a valid secret key in `storage.db`, calling `create_key()` on one machine will not update the key on the other. This desynchronization can cause `block_until_ready()` to time out on the machine with the outdated key. 
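As a minimal sketch of the recommended workflow, assuming `put_secret_key()` accepts the access key, secret key, and endpoint reported by `retrieve_key()` in that order (verify against the SDK before relying on it):

```python
from diaspora_event_sdk import Client as GlobusClient, block_until_ready

# On machine A, which holds a working key: read the credentials out.
c = GlobusClient()
key = c.retrieve_key()

# On machine B: store the same credentials manually instead of letting
# create_key() run implicitly, so machine A's key is not invalidated.
# NOTE: the argument order below is an assumption; check put_secret_key().
c = GlobusClient()
c.put_secret_key(key["access_key"], key["secret_key"], key["endpoint"])
assert block_until_ready()
```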
diff --git a/mypy.ini b/mypy.ini deleted file mode 100644 index 388f3dd..0000000 --- a/mypy.ini +++ /dev/null @@ -1,4 +0,0 @@ -[mypy] - -[mypy-diaspora_event_sdk.sdk.botocore.*] -ignore_errors = True \ No newline at end of file diff --git a/setup.py b/setup.py index a729ee3..47d1805 100644 --- a/setup.py +++ b/setup.py @@ -4,15 +4,7 @@ from setuptools import setup, find_packages -TEST_REQUIRES = [ - "pytest", - "pytest-cov", - "coverage", - "mypy", - "tox", - "check-manifest", - "pre-commit", -] +TEST_REQUIRES = ["pytest", "pytest-cov", "coverage", "mypy", "tox", "check-manifest"] def parse_version(): diff --git a/tests/unit/apis_test.py b/tests/unit/test_apis.py similarity index 98% rename from tests/unit/apis_test.py rename to tests/unit/test_apis.py index e9abd17..eb761bb 100644 --- a/tests/unit/apis_test.py +++ b/tests/unit/test_apis.py @@ -4,6 +4,7 @@ from globus_sdk import ConfidentialAppAuthClient from diaspora_event_sdk import Client from diaspora_event_sdk.sdk.login_manager import tokenstore +from diaspora_event_sdk.sdk._environments import get_web_service_url # Configure module-level logger logging.basicConfig( diff --git a/tests/unit/client_test.py b/tests/unit/test_client.py similarity index 100% rename from tests/unit/client_test.py rename to tests/unit/test_client.py diff --git a/tox.ini b/tox.ini index 7581bc2..7ee5903 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,6 @@ commands = coverage report [testenv:mypy] -deps = +deps = mypy -commands = mypy -p diaspora_event_sdk {posargs} +commands = mypy -p diaspora_event_sdk {posargs} \ No newline at end of file