Merge pull request #13 from globus-labs/pre-commit-ci-update-config
[pre-commit.ci] pre-commit autoupdate
haochenpan authored Sep 9, 2024
2 parents 0b228da + eead1c2 commit a454456
Showing 4 changed files with 90 additions and 62 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
@@ -19,22 +19,22 @@ repos:
     hooks:
       - id: codespell
   - repo: https://github.com/nbQA-dev/nbQA
-    rev: 1.8.5
+    rev: 1.8.7
    hooks:
       - id: nbqa-ruff
   - repo: https://github.com/kynan/nbstripout
     rev: 0.7.1
     hooks:
       - id: nbstripout
   - repo: 'https://github.com/charliermarsh/ruff-pre-commit'
-    rev: v0.4.7
+    rev: v0.6.3
     hooks:
       - id: ruff
         args:
           - '--fix'
       - id: ruff-format
   - repo: 'https://github.com/pre-commit/mirrors-mypy'
-    rev: v1.10.0
+    rev: v1.11.2
     hooks:
       - id: mypy
         additional_dependencies: []
126 changes: 72 additions & 54 deletions DiasporaDemo.ipynb
@@ -269,7 +269,7 @@
 "    If the topic is not registered (or registered by others), return an error message.\n",
 "    expected return: { \"status\": \"success\", \"before\": {...}, \"after\": {...}}\n",
 "\"\"\"\n",
-"configs = {'min.insync.replicas': 1}\n",
+"configs = {\"min.insync.replicas\": 1}\n",
 "print(c.update_topic_configs(topic, configs))"
 ]
 },
@@ -283,10 +283,7 @@
 "    Adjust more than one configuration in a single update_topic_configs request.\n",
 "    expected return: { \"status\": \"success\", \"before\": {...}, \"after\": {...}}\n",
 "\"\"\"\n",
-"configs = {\n",
-"    'delete.retention.ms': 43200000,\n",
-"    'retention.ms': 43200000\n",
-"}\n",
+"configs = {\"delete.retention.ms\": 43200000, \"retention.ms\": 43200000}\n",
 "print(c.update_topic_configs(topic, configs))"
 ]
 },
@@ -361,7 +358,9 @@
 "    expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n",
 "\n",
 "\"\"\"\n",
-"print(c.grant_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")) # a virtual user"
+"print(\n",
+"    c.grant_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")\n",
+")  # a virtual user"
 ]
 },
 {
@@ -374,7 +373,7 @@
 "    Returns a list of users that have access to the topic.\n",
 "    expected return (subsequent): { \"status\": \"success\", \"users\": [...]}\n",
 "\"\"\"\n",
-"print(c.list_topic_users(topic)) "
+"print(c.list_topic_users(topic))"
 ]
 },
 {
@@ -388,7 +387,9 @@
 "    expected return (first time): { \"status\": \"success\", \"message\": ...}\n",
 "    expected return (subsequent): { \"status\": \"no-op\", \"message\": ...}\n",
 "\"\"\"\n",
-"print(c.revoke_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")) # a virtual user"
+"print(\n",
+"    c.revoke_user_access(topic, \"e2a8169b-feef-4d56-8eba-ab12747bee04\")\n",
+")  # a virtual user"
 ]
 },
 {
@@ -397,15 +398,17 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"producer = KafkaProducer() \n",
+"producer = KafkaProducer()\n",
 "print(topic)\n",
 "\n",
 "try:\n",
 "    future = producer.send(\n",
-"        topic, {'message': 'Synchronous message 1 from Diaspora SDK'})\n",
+"        topic, {\"message\": \"Synchronous message 1 from Diaspora SDK\"}\n",
+"    )\n",
 "    print(future.get(timeout=10))\n",
 "    future = producer.send(\n",
-"        topic, {'message': 'Synchronous message 2 from Diaspora SDK'})\n",
+"        topic, {\"message\": \"Synchronous message 2 from Diaspora SDK\"}\n",
+"    )\n",
 "    print(future.get(timeout=10))\n",
 "except Exception as e:\n",
 "    print(f\"Failed to send message: {e}\")"
@@ -435,11 +438,9 @@
 "\"\"\"\n",
 "\n",
 "producer = KafkaProducer()\n",
-"future = producer.send(\n",
-"    topic, {'message': 'Synchronous message 1 from Diaspora SDK'})\n",
+"future = producer.send(topic, {\"message\": \"Synchronous message 1 from Diaspora SDK\"})\n",
 "print(future.get(timeout=10))\n",
-"future = producer.send(\n",
-"    topic, {'message': 'Synchronous message 2 from Diaspora SDK'})\n",
+"future = producer.send(topic, {\"message\": \"Synchronous message 2 from Diaspora SDK\"})\n",
 "print(future.get(timeout=10))"
 ]
 },
@@ -458,8 +459,8 @@
 "    expected return: None\n",
 "\"\"\"\n",
 "producer = KafkaProducer()\n",
-"producer.send(topic, {'message': 'Asynchronous message 3 from Diaspora SDK'})\n",
-"producer.send(topic, {'message': 'Asynchronous message 4 from Diaspora SDK'})\n",
+"producer.send(topic, {\"message\": \"Asynchronous message 3 from Diaspora SDK\"})\n",
+"producer.send(topic, {\"message\": \"Asynchronous message 4 from Diaspora SDK\"})\n",
 "producer.flush()"
 ]
 },
@@ -489,7 +490,7 @@
 "    expected return:\n",
 "        multiple {'message': ...}\n",
 "\"\"\"\n",
-"consumer = KafkaConsumer(topic, auto_offset_reset='earliest')\n",
+"consumer = KafkaConsumer(topic, auto_offset_reset=\"earliest\")\n",
 "start_time = time.time()\n",
 "try:\n",
 "    while True:\n",
@@ -573,9 +574,9 @@
 },
 "outputs": [],
 "source": [
-"trigger_package = f\"{os.getcwd()}/my_deployment_package\" # require abs path here\n",
+"trigger_package = f\"{os.getcwd()}/my_deployment_package\"  # require abs path here\n",
 "trigger_file = \"lambda_function.py\"\n",
-"trigger_name_in_def=\"lambda_handler\"\n",
+"trigger_name_in_def = \"lambda_handler\"\n",
 "os.system(f\"mkdir {trigger_package}\")"
 ]
 },
@@ -615,7 +616,7 @@
 "\"\"\"\n",
 "\n",
 "with open(os.path.join(trigger_package, trigger_file), \"w\") as f:\n",
-"    f.write(trigger_code)"
+"    f.write(trigger_code)"
 ]
 },
 {
@@ -639,7 +640,9 @@
 "    print(f\"Zipping {lambda_function_package}\")\n",
 "    os.system(f\"cd {lambda_function_package} && zip -r {lambda_function_package}.zip .\")\n",
 "    with open(f\"{lambda_function_package}.zip\", \"rb\") as f:\n",
-"        return base64.b64encode(f.read()).decode('utf-8')\n",
+"        return base64.b64encode(f.read()).decode(\"utf-8\")\n",
+"\n",
+"\n",
 "zipped_code = get_zipped_code(trigger_package)"
 ]
 },
@@ -676,7 +679,6 @@
 },
 "outputs": [],
 "source": [
-"\n",
 "topic_name = \"topic\" + c.subject_openid[-12:]\n",
 "trigger_name = f\"lambda{random.randint(100, 999)}\"\n",
 "trigger_runtime = \"python3.11\"\n",
@@ -750,18 +752,16 @@
 "trigger_configs = {\n",
 "    \"Runtime\": trigger_runtime,\n",
 "    \"Handler\": trigger_handler,\n",
-"    \"Code\": {'ZipFile': zipped_code},\n",
+"    \"Code\": {\"ZipFile\": zipped_code},\n",
 "    \"Timeout\": 3,\n",
 "    \"MemorySize\": 128,\n",
 "    \"Environment\": {},\n",
-"    \"EphemeralStorage\": {'Size': 512},\n",
-"    \"Layers\": [\"arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1\"]\n",
-"}\n",
-"invoke_configs = {\n",
-"    \"Enabled\": True,\n",
-"    \"BatchSize\": 1,\n",
-"    \"StartingPosition\": \"LATEST\"\n",
-"}\n",
+"    \"EphemeralStorage\": {\"Size\": 512},\n",
+"    \"Layers\": [\n",
+"        \"arn:aws:lambda:us-east-1:845889416464:layer:globus-diaspora-layer311:1\"\n",
+"    ],\n",
+"}\n",
+"invoke_configs = {\"Enabled\": True, \"BatchSize\": 1, \"StartingPosition\": \"LATEST\"}\n",
 "print(c.create_trigger(topic_name, trigger_name, trigger_configs, invoke_configs))"
 ]
 },
@@ -790,11 +790,21 @@
 "    trigger name: ... trigger handler name: ... trigger uuid: ... trigger topic: ...\n",
 "\"\"\"\n",
 "\n",
-"for function in c.list_triggers()['triggers']:\n",
-"    print(\"trigger name:\", function['function_name'], \"\\n\",\n",
-"          \"trigger handler name:\", function['function_detail']['Configuration']['Handler'], \"\\n\",\n",
-"          \"trigger uuid:\", function['triggers'][0]['UUID'], \"\\n\",\n",
-"          \"trigger topic:\", function['triggers'][0]['Topics'][0], \"\\n\",)"
+"for function in c.list_triggers()[\"triggers\"]:\n",
+"    print(\n",
+"        \"trigger name:\",\n",
+"        function[\"function_name\"],\n",
+"        \"\\n\",\n",
+"        \"trigger handler name:\",\n",
+"        function[\"function_detail\"][\"Configuration\"][\"Handler\"],\n",
+"        \"\\n\",\n",
+"        \"trigger uuid:\",\n",
+"        function[\"triggers\"][0][\"UUID\"],\n",
+"        \"\\n\",\n",
+"        \"trigger topic:\",\n",
+"        function[\"triggers\"][0][\"Topics\"][0],\n",
+"        \"\\n\",\n",
+"    )"
 ]
 },
 {
@@ -811,10 +821,10 @@
 "    expected return: {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n",
 "\"\"\"\n",
 "\n",
-"for function in c.list_triggers()['triggers']:\n",
-"    if function['function_name'] == trigger_name:\n",
+"for function in c.list_triggers()[\"triggers\"]:\n",
+"    if function[\"function_name\"] == trigger_name:\n",
 "        pprint(function, sort_dicts=False)\n",
-"        trigger_uuid = function['triggers'][0]['UUID']"
+"        trigger_uuid = function[\"triggers\"][0][\"UUID\"]"
 ]
 },
 {
@@ -845,8 +855,8 @@
 "config1 = {\n",
 "    \"Enabled\": True,\n",
 "    \"BatchSize\": 123,\n",
-"    \"FilterCriteria\": {\"Filters\": [{'Pattern': json.dumps(pattern1)}]},\n",
-"    \"MaximumBatchingWindowInSeconds\": 42\n",
+"    \"FilterCriteria\": {\"Filters\": [{\"Pattern\": json.dumps(pattern1)}]},\n",
+"    \"MaximumBatchingWindowInSeconds\": 42,\n",
 "}\n",
 "print(c.update_trigger(trigger_uuid, config1))"
 ]
@@ -862,11 +872,7 @@
 "    Note: see the table above for other tunable configurations.\n",
 "    expected return: {\"status\": \"success\", \"before\": {}, \"after\": {}}\n",
 "\"\"\"\n",
-"config2 = {\n",
-"    \"BatchSize\": 1,\n",
-"    \"FilterCriteria\": {},\n",
-"    \"MaximumBatchingWindowInSeconds\": 1\n",
-"}\n",
+"config2 = {\"BatchSize\": 1, \"FilterCriteria\": {}, \"MaximumBatchingWindowInSeconds\": 1}\n",
 "print(c.update_trigger(trigger_uuid, config2))"
 ]
 },
@@ -891,10 +897,12 @@
 "\n",
 "producer = KafkaProducer()\n",
 "future = producer.send(\n",
-"    topic_name, {'message': 'Synchronous message 3 from Diaspora SDK'})\n",
+"    topic_name, {\"message\": \"Synchronous message 3 from Diaspora SDK\"}\n",
+")\n",
 "print(future.get(timeout=10))\n",
 "future = producer.send(\n",
-"    topic_name, {'message': 'Synchronous message 4 from Diaspora SDK'})\n",
+"    topic_name, {\"message\": \"Synchronous message 4 from Diaspora SDK\"}\n",
+")\n",
 "print(future.get(timeout=10))"
 ]
 },
@@ -913,7 +921,7 @@
 "\n",
 "streams_response = c.list_log_streams(trigger_name)\n",
 "print(streams_response)\n",
-"recent_log_stream_name = streams_response['streams'][0]['logStreamName']"
+"recent_log_stream_name = streams_response[\"streams\"][0][\"logStreamName\"]"
 ]
 },
 {
@@ -968,11 +976,21 @@
 "    expected return (if all triggers are deleted): None\n",
 "    expected return (otherwise): {'function_name': ..., 'function_detail': ..., 'triggers': [...]}\n",
 "\"\"\"\n",
-"for function in c.list_triggers()['triggers']:\n",
-"    print(\"trigger name:\", function['function_name'], \"\\n\",\n",
-"          \"trigger handler name:\", function['function_detail']['Configuration']['Handler'], \"\\n\",\n",
-"          \"trigger uuid:\", function['triggers'][0]['UUID'], \"\\n\",\n",
-"          \"trigger topic:\", function['triggers'][0]['Topics'][0], \"\\n\",)"
+"for function in c.list_triggers()[\"triggers\"]:\n",
+"    print(\n",
+"        \"trigger name:\",\n",
+"        function[\"function_name\"],\n",
+"        \"\\n\",\n",
+"        \"trigger handler name:\",\n",
+"        function[\"function_detail\"][\"Configuration\"][\"Handler\"],\n",
+"        \"\\n\",\n",
+"        \"trigger uuid:\",\n",
+"        function[\"triggers\"][0][\"UUID\"],\n",
+"        \"\\n\",\n",
+"        \"trigger topic:\",\n",
+"        function[\"triggers\"][0][\"Topics\"][0],\n",
+"        \"\\n\",\n",
+"    )"
 ]
 },
 {
18 changes: 14 additions & 4 deletions diaspora_event_sdk/sdk/kafka_client.py
@@ -26,15 +26,25 @@ def get_diaspora_config(extra_configs: Dict[str, Any] = {}) -> Dict[str, Any]:
     Retrieve default Diaspora event fabric connection configurations for Kafka clients.
     Merges default configurations with custom ones provided.
     """
+
+    bootstrap_servers = None
     try:
-        keys = Client().retrieve_key()
-        os.environ["AWS_ACCESS_KEY_ID"] = keys["access_key"]
-        os.environ["AWS_SECRET_ACCESS_KEY"] = keys["secret_key"]
+        if (
+            "OCTOPUS_AWS_ACCESS_KEY_ID" not in os.environ
+            and "OCTOPUS_AWS_SECRET_ACCESS_KEY" not in os.environ
+            and "OCTOPUS_BOOTSTRAP_SERVERS" not in os.environ
+        ):
+            keys = Client().retrieve_key()
+            os.environ["OCTOPUS_AWS_ACCESS_KEY_ID"] = keys["access_key"]
+            os.environ["OCTOPUS_AWS_SECRET_ACCESS_KEY"] = keys["secret_key"]
+            bootstrap_servers = keys["endpoint"]
+        else:
+            bootstrap_servers = os.environ["OCTOPUS_BOOTSTRAP_SERVERS"]
     except Exception as e:
         raise RuntimeError("Failed to retrieve Kafka keys") from e
 
     conf = {
-        "bootstrap_servers": keys["endpoint"],
+        "bootstrap_servers": bootstrap_servers,
         "security_protocol": "SASL_SSL",
        "sasl_mechanism": "OAUTHBEARER",
         "api_version": (3, 5, 1),
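A note on the kafka_client.py change above: connection settings are now overridable through environment variables. If any of the three OCTOPUS_* variables is already set, get_diaspora_config() takes the else branch and reads the bootstrap address from OCTOPUS_BOOTSTRAP_SERVERS instead of calling Client().retrieve_key(). A minimal usage sketch, assuming the package is installed; the import path comes from the file path in this diff, and the credential values are placeholders:

import os

from diaspora_event_sdk.sdk.kafka_client import get_diaspora_config

# Placeholder values; a real deployment would use credentials issued for
# the Diaspora event fabric. With all three variables set, the patched
# code skips Client().retrieve_key() entirely.
os.environ["OCTOPUS_AWS_ACCESS_KEY_ID"] = "placeholder-key-id"
os.environ["OCTOPUS_AWS_SECRET_ACCESS_KEY"] = "placeholder-secret"
os.environ["OCTOPUS_BOOTSTRAP_SERVERS"] = "broker.example.org:9098"

conf = get_diaspora_config()
print(conf["bootstrap_servers"])  # prints broker.example.org:9098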
2 changes: 1 addition & 1 deletion diaspora_event_sdk/version.py
@@ -1 +1 @@
__version__ = "0.3.3"
__version__ = "0.3.4"
