<h1>Diaspora Event Fabric: Resilience-enabling services for science from HPC to edge</h1>

- [Installation](#installation)
  * [Recommended Installation with Kafka Client Library](#recommended-installation-with-kafka-client-library)
  * [Installation Without Kafka Client Library](#installation-without-kafka-client-library)
- [Use Diaspora Event SDK](#use-diaspora-event-sdk)
  * [Use the SDK to communicate with Kafka (kafka-python Required)](#use-the-sdk-to-communicate-with-kafka--kafka-python-required-)
    + [Register Topic (create topic ACLs)](#register-topic--create-topic-acls-)
    + [Block Until Ready](#block-until-ready)
    + [Start Producer](#start-producer)
    + [Start Consumer](#start-consumer)
    + [Unregister Topic (remove topic ACLs)](#unregister-topic--remove-topic-acls-)
  * [Use Your Preferred Kafka Client Library](#use-your-preferred-kafka-client-library)
    + [Register and Unregister Topic](#register-and-unregister-topic)
    + [Cluster Connection Details](#cluster-connection-details)
  * [Advanced Usage](#advanced-usage)
    + [Key Migration](#key-migration)
    + [Password Refresh](#password-refresh)
- [Common Issues](#common-issues)
  * [ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk'](#importerror--cannot-import-name--kafkaproducer--from--diaspora-event-sdk-)
  * [kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError](#kafkaerrorsnobrokersavailable-and-kafkaerrorsnodenotreadyerror)
  * [kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.](#kafkaerrorskafkatimeouterror--kafkatimeouterror--failed-to-update-metadata-after-600-secs)
  * [ssl.SSLCertVerificationError](#sslsslcertverificationerror)

## Installation

### Recommended Installation with Kafka Client Library

The `KafkaProducer` and `KafkaConsumer` classes within the SDK are designed for seamless integration with Diaspora Event Fabric using pre-configured settings. To use these two classes, the `kafka-python` library is required.

To install the Diaspora Event SDK along with `kafka-python`, run:
```bash
pip install "diaspora-event-sdk[kafka-python]"
```

### Installation Without Kafka Client Library

If you prefer using a different client library for Kafka communication, you can install the SDK without the `kafka-python` dependency. The SDK still provides topic-level access control (authorization) and login credential management (authentication).

To install the SDK without client libraries, simply run:
```bash
pip install diaspora-event-sdk
```

Note that this option does not install the dependencies required by the `KafkaProducer` and `KafkaConsumer` classes below.
|
||
## Use Diaspora Event SDK | ||
### Use the SDK to communicate with Kafka (kafka-python Required) | ||
|
||
#### Register Topic (create topic ACLs) | ||
|
||
Before you can create, describe, and delete topics we need to set the appropriate ACLs in ZooKeeper. Here we use the Client to register ACLs for the desired topic name. | ||
|
||

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]

registration_status = c.register_topic(topic)
print(registration_status)
assert registration_status["status"] in ["no-op", "success"]

registered_topics = c.list_topics()
print(registered_topics)
assert topic in registered_topics["topics"]
```

Registering a topic also creates it if the topic does not already exist.

#### Block Until Ready

`KafkaProducer` and `KafkaConsumer` internally call `create_key` if the connection credential is not found locally (e.g., when you first authenticate with Globus). Behind the scenes, the middleware service contacts AWS to initialize the asynchronous process of creating and associating the secret. The method below blocks until the credential is ready to be used by the producer and consumer. When the method finishes, it returns `True`, and the producer and consumer code below should work without further waiting. By default, the method retries in a loop for five minutes before giving up and returning `False`. Use the `max_minutes` parameter to change the maximum wait time.

```python
from diaspora_event_sdk import block_until_ready

# Blocks for up to five minutes by default; pass max_minutes to wait longer.
assert block_until_ready()
```

#### Start Producer

Once the topic is created, we can publish to it. The `KafkaProducer` wraps the [Python KafkaProducer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html). Event publication can be either synchronous or asynchronous; the snippet below demonstrates the synchronous approach, followed by an asynchronous sketch.

```python
from diaspora_event_sdk import KafkaProducer

producer = KafkaProducer()
# send() returns a future; get() blocks until the broker acknowledges the record.
future = producer.send(
    topic, {'message': 'Synchronous message from Diaspora SDK'})
print(future.get(timeout=10))
```
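
For asynchronous publication, attach callbacks to the returned future instead of blocking on `get()`. This is a minimal sketch that assumes the SDK's `KafkaProducer` preserves kafka-python's future API (`add_callback`/`add_errback`) and that `topic` is defined as above:

```python
from diaspora_event_sdk import KafkaProducer

producer = KafkaProducer()

def on_success(metadata):
    # RecordMetadata carries the topic, partition, and offset of the stored record.
    print(metadata.topic, metadata.partition, metadata.offset)

def on_error(exc):
    print("send failed:", exc)

future = producer.send(topic, {'message': 'Asynchronous message from Diaspora SDK'})
future.add_callback(on_success)
future.add_errback(on_error)
producer.flush()  # block until buffered records are actually delivered
```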

#### Start Consumer

A consumer can be configured to monitor the topic and act on events as they are published. The `KafkaConsumer` wraps the [Python KafkaConsumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html). Here we use `auto_offset_reset='earliest'` to consume from the first event published to the topic; removing this field makes the consumer act only on new events.

```python
from diaspora_event_sdk import KafkaConsumer

consumer = KafkaConsumer(topic, auto_offset_reset='earliest')
# Iterating over the consumer blocks until new records arrive.
for msg in consumer:
    print(msg)
```

#### Unregister Topic (remove topic ACLs)

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]
print(c.unregister_topic(topic))
print(c.list_topics())
```

### Use Your Preferred Kafka Client Library

#### Register and Unregister Topic

The steps are the same as above: use the `register_topic`, `unregister_topic`, and `list_topics` methods from the `Client` class, as in the combined sketch below.
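
For instance, the full register/list/unregister cycle requires only the `Client`, with no Kafka client library installed:

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]  # or any topic name you are authorized for

print(c.register_topic(topic))    # creates ACLs (and the topic, if it does not exist)
print(c.list_topics())
print(c.unregister_topic(topic))  # removes the ACLs when you are finished
```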

#### Cluster Connection Details

| Configuration     | Value                                                               |
| ----------------- | ------------------------------------------------------------------- |
| Bootstrap Servers | [`MSK_SCRAM_ENDPOINT`](/diaspora_event_sdk/sdk/_environments.py#L6) |
| Security Protocol | `SASL_SSL`                                                          |
| SASL Mechanism    | `SCRAM-SHA-512`                                                     |
| API Version       | `3.5.1`                                                             |
| Username          | (see instructions below)                                            |
| Password          | (see instructions below)                                            |

Execute the code snippet below to obtain your unique username and password for the Kafka cluster:

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
print(c.retrieve_key())
```
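
Putting the table together, a vanilla kafka-python consumer can be configured directly. This is a sketch under two assumptions: that `MSK_SCRAM_ENDPOINT` is importable from the module linked in the table, and that the mapping returned by `retrieve_key` exposes `username` and `password` fields (check the printed output above for the actual key names):

```python
from kafka import KafkaConsumer  # vanilla kafka-python, not the SDK wrapper
from diaspora_event_sdk import Client as GlobusClient
from diaspora_event_sdk.sdk._environments import MSK_SCRAM_ENDPOINT

creds = GlobusClient().retrieve_key()

consumer = KafkaConsumer(
    "your-topic-name",                    # a topic you have registered
    bootstrap_servers=MSK_SCRAM_ENDPOINT,
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=creds["username"],
    sasl_plain_password=creds["password"],
)
```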

### Advanced Usage

#### Key Migration

Suppose you want to use the same credential (OpenID, secret key) on a second machine. A call to `create_key()` invalidates all previous keys, so you cannot rely on that API to generate a key for the new machine while keeping the old machine's key valid. Starting with version 0.0.19, you can run the following code to retrieve the active secret key on the first machine and store it on the second, so that both machines hold valid keys.

On the first machine, call:

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
print(c.get_secret_key())  # note down the secret key
```

On the second machine, call:

```python
from diaspora_event_sdk import Client as GlobusClient
from diaspora_event_sdk import block_until_ready

c = GlobusClient()
c.put_secret_key("<secret-key-from-first-machine>")
assert block_until_ready()  # should unblock immediately
```

Note that a call to `create_key()` on either machine invalidates the current key on both machines.

#### Password Refresh

If you need to invalidate all previously issued passwords and generate a new one, call the `create_key` method from the `Client` class:

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
print(c.create_key())
```

Subsequent calls to `retrieve_key` will return the new password from the cache. The cache is reset by a logout or another `create_key` call.
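
For example, rotating the credential and immediately reading it back:

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
print(c.create_key())    # invalidates all previously issued passwords
print(c.retrieve_key())  # returns the new password from the cache
```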

## Common Issues

### ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk'

You likely ran `pip install diaspora-event-sdk`, which installs the Diaspora Event SDK without `kafka-python`. Run `pip install kafka-python` to install the dependency required by our `KafkaProducer` and `KafkaConsumer` classes.

### kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError

These errors can occur when `create_key` is called shortly before instantiating a Kafka client, because there is a delay before AWS Secrets Manager associates the newly generated credential with MSK. Note that `create_key` is called internally by `kafka_client.py` the first time you create one of these clients. Please wait a while (around one minute) and retry.
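
Rather than sleeping and retrying by hand, you can use the SDK's own readiness check, which polls until the new credential is usable:

```python
from diaspora_event_sdk import block_until_ready

# Retries for up to five minutes by default before giving up and returning False.
assert block_until_ready()
```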

### kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

**Step 1: Verify Topic Creation and Access**

Before interacting with the producer/consumer, ensure that the topic has been successfully created and that you have been granted access. Execute the following:

```python
from diaspora_event_sdk import Client as GlobusClient

c = GlobusClient()
# topic = <the topic you want to use>
print(c.register_topic(topic))
```

This should return a `status: no-op` message, indicating that the topic is already registered and accessible.

**Step 2: Wait for Automatic Key Creation in KafkaProducer and KafkaConsumer**

`KafkaProducer` and `KafkaConsumer` internally call `create_key` if the keys are not found locally (e.g., when you first authenticate with Globus). Behind the scenes, the middleware service contacts AWS to initialize the asynchronous process of creating and associating the secret. Please wait a while (around one minute) and retry.

### ssl.SSLCertVerificationError

This is common on macOS; see [this StackOverflow answer](https://stackoverflow.com/a/53310545).
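
For python.org installations of Python on macOS, the fix suggested in the linked answer is to run the certificate installer bundled with Python (the version directory below is only an example; adjust it to match your installation):

```bash
# Installs the certifi CA bundle for the python.org distribution of Python.
/Applications/Python\ 3.12/Install\ Certificates.command
```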