diff --git a/README.md b/README.md index 4c558fc..4895f81 100644 --- a/README.md +++ b/README.md @@ -1,185 +1,25 @@

Diaspora Event Fabric: Resilience-enabling services for science from HPC to edge

-- [Installation](#installation) - * [Recommended Installation with Kafka Client Library](#recommended-installation-with-kafka-client-library) - * [Installation Without Kafka Client Library](#installation-without-kafka-client-library) -- [Use Diaspora Event SDK](#use-diaspora-event-sdk) - * [Use the SDK to communicate with Kafka (kafka-python Required)](#use-the-sdk-to-communicate-with-kafka--kafka-python-required-) - + [Register Topic (create topic ACLs)](#register-topic--create-topic-acls-) - + [Block Until Ready](#block-until-ready) - + [Start Producer](#start-producer) - + [Start Consumer](#start-consumer) - + [Unregister Topic (remove topic ACLs)](#unregister-topic--remove-topic-acls-) - * [Use Your Preferred Kafka Client Library](#use-your-preferred-kafka-client-library) - + [Register and Unregister Topic](#register-and-unregister-topic) - + [Cluster Connection Details](#cluster-connection-details) - * [Advanced Usage](#advanced-usage) - + [Password Refresh](#password-refresh) -- [Common Issues](#common-issues) - * [ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk'](#importerror--cannot-import-name--kafkaproducer--from--diaspora-event-sdk-) - * [kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError](#kafkaerrorsnobrokersavailable-and-kafkaerrorsnodenotreadyerror) - * [kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.](#kafkaerrorskafkatimeouterror--kafkatimeouterror--failed-to-update-metadata-after-600-secs) - * [ssl.SSLCertVerificationError](#sslsslcertverificationerror) - - ## Installation ### Recommended Installation with Kafka Client Library -`KafkaProducer` and `KafkaConsumer` classes in the SDK enable using Diaspora Event Fabric with ready-made configurations. To use these two classes,`kafka-python` library installation is required. +The `KafkaProducer` and `KafkaConsumer` classes within the SDK are designed for seamless integration with Diaspora Event Fabric using pre-configured settings. To use these classes, the `kafka-python` library is required. -To install Diaspora Event SDK with `kafka-python`, run: +To install the Diaspora Event SDK along with `kafka-python`, execute: ```bash pip install "diaspora-event-sdk[kafka-python]" ``` ### Installation Without Kafka Client Library -If you plan to use other client libraries to communicate with Kafka, you can install the SDK without `kafka-python` dependency. +If you prefer using different client libraries for Kafka communication, you can install the SDK without the `kafka-python` dependency. The SDK still provides topic-level access control (authorization) and login credential management (authentication). To install the SDK without client libraries, simply run: ```bash pip install diaspora-event-sdk ``` -Note that this option does not install the necessary dependency for `KafkaProducer` and `KafkaConsumer` below to work. - -## Use Diaspora Event SDK -### Use the SDK to communicate with Kafka (kafka-python Required) - -#### Register Topic (create topic ACLs) - -Before you can create, describe, and delete topics we need to set the appropriate ACLs in ZooKeeper. Here we use the Client to register ACLs for the desired topic name. - -```python -from diaspora_event_sdk import Client as GlobusClient -c = GlobusClient() -topic = "topic-" + c.subject_openid[-12:] -print(c.register_topic(topic)) -print(c.list_topics()) +Note: This method omits the dependencies needed by the `KafkaProducer` and `KafkaConsumer` classes.
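+ +If you are unsure which variant you installed, here is a quick, illustrative check (the `ImportError` it catches is the one described in the Troubleshooting Guide): +```python +try: + from diaspora_event_sdk import KafkaProducer # present only with the kafka-python extra + print("kafka-python extra installed") +except ImportError: + print("kafka-python missing; topic and key management still available") +```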
-# registration_status = c.register_topic(topic) -# print(registration_status) -# assert registration_status["status"] in ["no-op", "success"] - -# registered_topics = c.list_topics() -# print(registered_topics) -# assert topic in registered_topics["topics"] -``` -Register a topic also creates it, if the topic previously does not exist. - -#### Block Until Ready -`KafkaProducer` and `KafkaConsumer` would internally call `create_key` if the the connection credential is not found locally (e.g., when you first authenticated with Globus). Behind the sence, the middle service contacts AWS to initialize the asynchronous process of creating and associating the secret. The method below blocks until the credential is ready to be used by producer and consumer. When the method finishes, it returns True and the producer and consumer code below should work without further waiting. By default, the method retries in loop for five minutes before giving up and return False. Use parameter `max_minutes` to change the number of minutes of max waiting. - -```python -from diaspora_event_sdk import block_until_ready -assert block_until_ready() -``` - -#### Start Producer - -Once the topic is created we can publish to it. The KafkaProducer wraps the [Python KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html) Event publication can be either synchronous or asynchronous. Below demonstrates the synchronous approach. - -```python -from diaspora_event_sdk import KafkaProducer -producer = KafkaProducer() -future = producer.send( - topic, {'message': 'Synchronous message from Diaspora SDK'}) -print(future.get(timeout=10)) -``` - -#### Start Consumer - -A consumer can be configured to monitor the topic and act on events as they are published. The KafkaConsumer wraps the [Python KafkaConsumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html). Here we use the `auto_offset_reset` to consume from the first event published to the topic. Removing this field will have the consumer act only on new events. - -```python -from diaspora_event_sdk import KafkaConsumer -consumer = KafkaConsumer(topic, auto_offset_reset='earliest') -for msg in consumer: - print(msg) -``` - -#### Unregister Topic (remove topic ACLs) -```python -from diaspora_event_sdk import Client as GlobusClient -c = GlobusClient() -topic = "topic-" + c.subject_openid[-12:] -print(c.unregister_topic(topic)) -print(c.list_topics()) -``` - -### Use Your Preferred Kafka Client Library - -#### Register and Unregister Topic -The steps are the same as above by using the `register_topic`, `unregister_topic`, and `list_topics` methods from the `Client` class. 
- -#### Cluster Connection Details -| Configuration | Value | -| ----------------- | ------------------------------------------------------------------- | -| Bootstrap Servers | [`MSK_SCRAM_ENDPOINT`](/diaspora_event_sdk/sdk/_environments.py#L6) | -| Security Protocol | `SASL_SSL` | -| Sasl Mechanism | `SCRAM-SHA-512` | -| Api Version | `3.5.1` | -| Username | (See instructions below) | -| Password | (See instructions below) | - -Execute the code snippet below to obtain your unique username and password for the Kafka cluster: -```python -from diaspora_event_sdk import Client as GlobusClient -c = GlobusClient() -print(c.retrieve_key()) -``` - -### Advanced Usage -#### Key Migration -In case you want to use the same credential (OpenID, secret key) on the second macine, but a call to create_key() invalidates all previous keys so you cannot rely on this API for generating a new key for the new machine, while keeping the old key on the old machine valid. Starting at 0.0.19, you can call the following code to retrieve the active secret key on the first machine and store it to the second machine, so that both machines hold valid keys. - -On the first machine, call: -```python -from diaspora_event_sdk import Client as GlobusClient -c = GlobusClient() -print(c.get_secret_key()) # note down the secret key -``` - -On the second machine, call: -```python -from diaspora_event_sdk import Client as GlobusClient -from diaspora_event_sdk import block_until_ready - -c = GlobusClient() -c.put_secret_key("") -assert block_until_ready() # should unblock immediately -``` -Note that a call to create_key() on any machine would invalidate the current key, on both machines. - -#### Password Refresh -In case that you need to invalidate all previously issued passwords and generate a new one, call the `create_key` method from the `Client` class -```python -from diaspora_event_sdk import Client as GlobusClient -c = GlobusClient() -print(c.create_key()) -``` -Subsequent calls to `retrieve_key` will return the new password from the cache. This cache is reset with a logout or a new `create_key` call. - -## Common Issues - -### ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk' - -It seems that you ran `pip install diaspora-event-sdk` to install the Diaspora Event SDK without `kafka-python`. Run `pip install kafka-python` to install the necessary dependency for our `KafkaProducer` and `KafkaConsumer` classes. - -### kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError -These messages might pop up if `create_key` is called shortly before instanciating a Kafka client. This is because there's a delay for AWS Secret Manager to associate the newly generated credential with MSK. Note that `create_key` is called internally by `kafka_client.py` the first time you create one of these clients. Please wait a while (around 1 minute) and retry. - -### kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. -**Step 1: Verify Topic Creation and Access:** -Before interacting with the producer/consumer, ensure that the topic has been successfully created and access is granted to you. Execute the following command: - -```python -from diaspora_event_sdk import Client as GlobusClient -c = GlobusClient() -# topic = -print(c.register_topic(topic)) -``` -This should return a `status: no-op` message, indicating that the topic is already registered and accessible. 
+## Use Diaspora Event Fabric SDK -**Step 2: Wait Automatic Key Creation in KafkaProducer and KafkaConsumer** -`KafkaProducer` and `KafkaConsumer` would internally call `create_key` if the keys are not found locally (e.g., when you first authenticated with Globus). Behind the sence, the middle service contacts AWS to initialize the asynchronous process of creating and associating the secret. Please wait a while (around 1 minute) and retry. +Please refer to our [QuickStart Guide](docs/quickstart.md) for recommended use with the `kafka-python` library, as well as steps to use your own Kafka client. -### ssl.SSLCertVerificationError -This is commmon on MacOS system, see [this StackOverflow answer](https://stackoverflow.com/a/53310545). \ No newline at end of file +Please refer to our [Troubleshooting Guide](docs/troubleshooting.md) for debugging common problems and effective key management strategies. \ No newline at end of file diff --git a/docs/README_old.md b/docs/README_old.md new file mode 100644 index 0000000..4c558fc --- /dev/null +++ b/docs/README_old.md @@ -0,0 +1,185 @@ +

Diaspora Event Fabric: Resilience-enabling services for science from HPC to edge

+ +- [Installation](#installation) + * [Recommended Installation with Kafka Client Library](#recommended-installation-with-kafka-client-library) + * [Installation Without Kafka Client Library](#installation-without-kafka-client-library) +- [Use Diaspora Event SDK](#use-diaspora-event-sdk) + * [Use the SDK to communicate with Kafka (kafka-python Required)](#use-the-sdk-to-communicate-with-kafka--kafka-python-required-) + + [Register Topic (create topic ACLs)](#register-topic--create-topic-acls-) + + [Block Until Ready](#block-until-ready) + + [Start Producer](#start-producer) + + [Start Consumer](#start-consumer) + + [Unregister Topic (remove topic ACLs)](#unregister-topic--remove-topic-acls-) + * [Use Your Preferred Kafka Client Library](#use-your-preferred-kafka-client-library) + + [Register and Unregister Topic](#register-and-unregister-topic) + + [Cluster Connection Details](#cluster-connection-details) + * [Advanced Usage](#advanced-usage) + + [Password Refresh](#password-refresh) +- [Common Issues](#common-issues) + * [ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk'](#importerror--cannot-import-name--kafkaproducer--from--diaspora-event-sdk-) + * [kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError](#kafkaerrorsnobrokersavailable-and-kafkaerrorsnodenotreadyerror) + * [kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.](#kafkaerrorskafkatimeouterror--kafkatimeouterror--failed-to-update-metadata-after-600-secs) + * [ssl.SSLCertVerificationError](#sslsslcertverificationerror) + + +## Installation +### Recommended Installation with Kafka Client Library +The `KafkaProducer` and `KafkaConsumer` classes in the SDK enable using Diaspora Event Fabric with ready-made configurations. To use these two classes, the `kafka-python` library is required. + +To install Diaspora Event SDK with `kafka-python`, run: +```bash +pip install "diaspora-event-sdk[kafka-python]" +``` + +### Installation Without Kafka Client Library +If you plan to use other client libraries to communicate with Kafka, you can install the SDK without the `kafka-python` dependency. + +To install the SDK without client libraries, simply run: +```bash +pip install diaspora-event-sdk +``` +Note that this option does not install the dependency needed for the `KafkaProducer` and `KafkaConsumer` classes below to work. + +## Use Diaspora Event SDK +### Use the SDK to communicate with Kafka (kafka-python Required) + +#### Register Topic (create topic ACLs) + +Before you can create, describe, and delete topics, we need to set the appropriate ACLs in ZooKeeper. Here we use the Client to register ACLs for the desired topic name. + +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +topic = "topic-" + c.subject_openid[-12:] +print(c.register_topic(topic)) +print(c.list_topics()) + +# registration_status = c.register_topic(topic) +# print(registration_status) +# assert registration_status["status"] in ["no-op", "success"] + +# registered_topics = c.list_topics() +# print(registered_topics) +# assert topic in registered_topics["topics"] +``` +Registering a topic also creates it if it does not already exist. + +#### Block Until Ready +`KafkaProducer` and `KafkaConsumer` will internally call `create_key` if the connection credential is not found locally (e.g., when you first authenticated with Globus). Behind the scenes, the middle service contacts AWS to initialize the asynchronous process of creating and associating the secret.
The method below blocks until the credential is ready to be used by the producer and consumer. When the method finishes, it returns True, and the producer and consumer code below should work without further waiting. By default, the method retries in a loop for five minutes before giving up and returning False. Use the `max_minutes` parameter to change the maximum wait time (in minutes). + +```python +from diaspora_event_sdk import block_until_ready +assert block_until_ready() +``` + +#### Start Producer + +Once the topic is created, we can publish to it. The KafkaProducer wraps the [Python KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html). Event publication can be either synchronous or asynchronous. The example below demonstrates the synchronous approach. + +```python +from diaspora_event_sdk import KafkaProducer +producer = KafkaProducer() +future = producer.send( + topic, {'message': 'Synchronous message from Diaspora SDK'}) +print(future.get(timeout=10)) +``` + +#### Start Consumer + +A consumer can be configured to monitor the topic and act on events as they are published. The KafkaConsumer wraps the [Python KafkaConsumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html). Here we use `auto_offset_reset` to consume from the first event published to the topic. Removing this field will have the consumer act only on new events. + +```python +from diaspora_event_sdk import KafkaConsumer +consumer = KafkaConsumer(topic, auto_offset_reset='earliest') +for msg in consumer: + print(msg) +``` + +#### Unregister Topic (remove topic ACLs) +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +topic = "topic-" + c.subject_openid[-12:] +print(c.unregister_topic(topic)) +print(c.list_topics()) +``` + +### Use Your Preferred Kafka Client Library + +#### Register and Unregister Topic +The steps are the same as above, using the `register_topic`, `unregister_topic`, and `list_topics` methods from the `Client` class. + +#### Cluster Connection Details +| Configuration | Value | +| ----------------- | ------------------------------------------------------------------- | +| Bootstrap Servers | [`MSK_SCRAM_ENDPOINT`](/diaspora_event_sdk/sdk/_environments.py#L6) | +| Security Protocol | `SASL_SSL` | +| Sasl Mechanism | `SCRAM-SHA-512` | +| Api Version | `3.5.1` | +| Username | (See instructions below) | +| Password | (See instructions below) | + +Execute the code snippet below to obtain your unique username and password for the Kafka cluster: +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +print(c.retrieve_key()) +``` + +### Advanced Usage +#### Key Migration +Suppose you want to use the same credential (OpenID, secret key) on a second machine. A call to create_key() invalidates all previous keys, so you cannot rely on this API to generate a new key for the new machine while keeping the old key on the old machine valid. Starting with 0.0.19, you can call the following code to retrieve the active secret key on the first machine and store it on the second machine, so that both machines hold valid keys.
+ +On the first machine, call: +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +print(c.get_secret_key()) # note down the secret key +``` + +On the second machine, call: +```python +from diaspora_event_sdk import Client as GlobusClient +from diaspora_event_sdk import block_until_ready + +c = GlobusClient() +c.put_secret_key("") +assert block_until_ready() # should unblock immediately +``` +Note that a call to create_key() on any machine invalidates the current key on both machines. + +#### Password Refresh +If you need to invalidate all previously issued passwords and generate a new one, call the `create_key` method from the `Client` class: +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +print(c.create_key()) +``` +Subsequent calls to `retrieve_key` will return the new password from the cache. This cache is reset with a logout or a new `create_key` call. + +## Common Issues + +### ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk' + +It seems that you ran `pip install diaspora-event-sdk` to install the Diaspora Event SDK without `kafka-python`. Run `pip install kafka-python` to install the necessary dependency for our `KafkaProducer` and `KafkaConsumer` classes. + +### kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError +These messages might pop up if `create_key` is called shortly before instantiating a Kafka client. This is because there is a delay before AWS Secrets Manager associates the newly generated credential with MSK. Note that `create_key` is called internally by `kafka_client.py` the first time you create one of these clients. Please wait a while (around 1 minute) and retry. + +### kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. +**Step 1: Verify Topic Creation and Access:** +Before interacting with the producer/consumer, ensure that the topic has been successfully created and access is granted to you. Execute the following command: + +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +# topic = +print(c.register_topic(topic)) +``` +This should return a `status: no-op` message, indicating that the topic is already registered and accessible. + +**Step 2: Wait for Automatic Key Creation in KafkaProducer and KafkaConsumer** +`KafkaProducer` and `KafkaConsumer` will internally call `create_key` if the keys are not found locally (e.g., when you first authenticated with Globus). Behind the scenes, the middle service contacts AWS to initialize the asynchronous process of creating and associating the secret. Please wait a while (around 1 minute) and retry. + +### ssl.SSLCertVerificationError +This is common on macOS systems; see [this StackOverflow answer](https://stackoverflow.com/a/53310545). \ No newline at end of file diff --git a/docs/quickstart.md b/docs/quickstart.md index 02fed9e..8ec029d 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -61,4 +61,24 @@ print(c.unregister_topic(topic)) print(c.list_topics()) ``` -Once a topic is successfully unregistered, you'll lose access to it. However, it becomes available for registration and access by other users. \ No newline at end of file +Once a topic is successfully unregistered, you'll lose access to it. However, it becomes available for registration and access by other users.
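+ +To programmatically confirm that the topic is no longer registered to you, a small sketch (assuming `list_topics()` returns a dict with a `topics` list, as the commented assertions in the archived README suggest): +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +topic = "topic-" + c.subject_openid[-12:] +assert topic not in c.list_topics()["topics"] # access removed after unregistration +```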
+ + +## Integrating Your Own Kafka Client with Diaspora Event Fabric +If you opt to use a custom Kafka client library, here are the necessary cluster connection details: + +| Configuration | Value | +| ----------------- | ------------------------------------------------------------------- | +| Bootstrap Servers | [`MSK_SCRAM_ENDPOINT`](/diaspora_event_sdk/sdk/_environments.py#L6) | +| Security Protocol | `SASL_SSL` | +| Sasl Mechanism | `SCRAM-SHA-512` | +| Api Version | `3.5.1` | +| Username | (See instructions below) | +| Password | (See instructions below) | + +Your Kafka cluster username is your OpenID, and the password is the active AWS secret key. Please refer to our [Troubleshooting Guide](troubleshooting.md) for secret key management. Execute the snippet below to obtain both: +```python +from diaspora_event_sdk import Client as GlobusClient +c = GlobusClient() +print(c.retrieve_key()) +``` \ No newline at end of file diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index b2994fc..c32e6a1 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -46,7 +46,7 @@ New keys may take a while to become active due to AWS processing time. To preven ## Key Migration To avoid invalidating existing keys (if they're in use elsewhere), follow the steps below. -#### Exporting Key from Original Machine +### Exporting Key from Original Machine ```python from diaspora_event_sdk import Client as GlobusClient @@ -54,7 +54,7 @@ c = GlobusClient() print(c.get_secret_key()) # Note down this key ``` -#### Importing Key on New Machine: +### Importing Key on New Machine ```python from diaspora_event_sdk import Client as GlobusClient from diaspora_event_sdk import block_until_ready @@ -67,15 +67,15 @@ c = GlobusClient() c.put_secret_key("") print(c.retrieve_key()) assert block_until_ready() # Should unblock in 1-10 seconds ``` -## Key Management After Re-login and Across Multiple Machines. +## Key Management After Re-login and Across Multiple Machines -#### Key Management After Logout and Login +### Key Management After Logout and Login If you log out and then log in again, any subsequent call to `block_until_ready()` or an attempt to create a producer or consumer will internally trigger the `create_key()` function because no secret key is found in `storage.db`. This API call will invalidate all previously issued keys and retrieve a new one. To avoid accidentally invalidating the secret key, it's recommended to use `put_secret_key()` (see above section) before calling `block_until_ready()` or creating a producer or consumer after re-login. This method allows you to manually set the secret key, ensuring that the existing key is not unintentionally invalidated. -#### Managing Keys Across Multiple Machines +### Managing Keys Across Multiple Machines If machine A is logged in with Globus Auth credentials and has the AWS secret key stored in `storage.db`, logging into machine B with the same Globus Auth credential and calling `block_until_ready()` will invalidate the key on machine A. To ensure both machines have valid secret keys, follow the section above.
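+ +To make the quickstart's connection table above concrete, here is a minimal sketch of wiring those settings into a plain `kafka-python` producer. The import path for `MSK_SCRAM_ENDPOINT` follows the link in that table, and the password placeholder must be replaced by hand; neither is a documented contract of the SDK: +```python +from diaspora_event_sdk import Client as GlobusClient +from diaspora_event_sdk.sdk._environments import MSK_SCRAM_ENDPOINT # assumed import path +from kafka import KafkaProducer # third-party client, installed separately + +c = GlobusClient() +print(c.retrieve_key()) # inspect the output for your active secret key + +producer = KafkaProducer( + bootstrap_servers=MSK_SCRAM_ENDPOINT, + security_protocol="SASL_SSL", + sasl_mechanism="SCRAM-SHA-512", + sasl_plain_username=c.subject_openid, # username is your OpenID + sasl_plain_password="<active-secret-key>", # placeholder: paste the key printed above +) +```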