
Merge pull request #527 from OpenFn/kafka-triggers
Documenting Kafka triggers for Users
aleksa-krolls authored Aug 7, 2024
2 parents 78a1dac + c92708e commit 2befb80
Showing 3 changed files with 101 additions and 4 deletions.
61 changes: 59 additions & 2 deletions docs/build/triggers.md
@@ -2,8 +2,8 @@
title: Triggers
---

Triggers allow you to start the execution of workflows automatically. They come
in two types: Cron triggers and Webhook Event triggers.
Triggers allow you to start the execution of Workflows automatically. They come
in three types: Cron, Webhook Event, and Kafka triggers.

## Webhook Event Triggers

@@ -93,3 +93,60 @@
fn(state => {
return state;
});
```

## Kafka Triggers

The Kafka trigger allows OpenFn users to build Workflows triggered by messages
published by a Kafka cluster. The trigger uses Kafka consumer groups, set up
on demand, to receive messages from a defined cluster and then converts them
into `Input` dataclips that are used to initialize a Work Order.

![Configuring Kafka Trigger](/img/configuring-kafka.png)

:::info What is Kafka?

Apache Kafka® is an event streaming platform designed to handle high volumes
of data. Check out [Kafka Docs](https://kafka.apache.org/documentation/#gettingStarted)
to learn more.

:::

### Configuring a Kafka trigger for your workflow

1. Create a new Workflow or open an existing Workflow in your Project.
2. Click the Workflow's trigger and change the trigger type to `Kafka Consumer`
   in the `Trigger type` dropdown.
3. Fill out the required connection details:

- **Hosts**: Provide the URL of the host(s) your trigger should listen to for
  messages.
- **Topics**: Enter the topics your Kafka consumers should subscribe to. You
need at least one topic for a successful connection.
- **SSL**: Some Kafka clusters require an SSL connection. If you are connecting
  to an environment that requires SSL, select `Enable SSL`.
- **SSL Authentication**: Select the type of Authentication required for the Kafka
cluster.
- **Initial offset policy**: The initial offset dictates where the consumer
  starts reading messages from a topic when it subscribes for the first time.
There are three possible options: `earliest` messages available, `latest`
messages available, or messages with a specific `timestamp` (e.g., `1721889238000`).
- **Connect timeout**: The connect timeout specified in seconds (e.g., `30`) represents how
long the consumer should wait before timing out when attempting to connect
with a Kafka cluster.

4. If you have not finished designing your Workflow or you're not ready to start
receiving messages from the Kafka cluster, please check the box to **disable
the trigger** until you're ready for message ingestion to begin.
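If you choose the `timestamp` initial offset policy, the value is a Unix epoch
timestamp in milliseconds. As a quick sketch in plain JavaScript (not part of
the trigger UI), you can compute one from a UTC datetime:

```js
// Convert a UTC datetime string to the epoch-millisecond format expected by
// the `timestamp` initial offset policy (e.g., 1721889238000).
const toKafkaTimestamp = isoString => new Date(isoString).getTime();

toKafkaTimestamp('2024-07-25T06:33:58Z'); // → 1721889238000
```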

:::warning Disable the trigger during workflow design

Once the required connection information is provided via the modal, the trigger
will *immediately* start attempting to connect to the Kafka cluster. We advise
keeping the trigger disabled until your Workflow is ready to receive data from
the cluster for processing. **To stop the trigger from receiving and processing
messages, check the `Disable this trigger` checkbox at the bottom of the
trigger configuration modal.**

:::
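Putting the fields together, a complete trigger configuration might look like
the sketch below. These are illustrative values only, and the field names are
hypothetical: in OpenFn the configuration is entered in the trigger modal
rather than written as a file.

```js
// Hypothetical summary of a Kafka trigger configuration (illustrative only).
{
  hosts: ["kafka-host1.example.com:9092", "kafka-host2.example.com:9092"],
  topics: ["fhir-data-pipes"],        // at least one topic is required
  ssl: true,                          // `Enable SSL` checked
  sslAuthentication: "SASL/PLAIN",    // hypothetical example value
  initialOffsetPolicy: "earliest",    // or "latest", or an epoch-ms timestamp
  connectTimeout: 30                  // seconds
}
```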

Learn how the initial `state` (and `Input`) for Kafka-triggered Workflows is
built [here](../jobs/state#kafka-triggered-runs).
44 changes: 42 additions & 2 deletions docs/jobs/state.md
@@ -57,7 +57,7 @@
operation. Remember that job expressions are a series of operations: they each
take state and return state, after creating any number of side effects. You can
control what is returned at the end of all of these operations.

### Webhook triggered Runs
### Webhook triggered runs

On the platform, when a Run is triggered by a webhook event, the input state
contains important parts of the inbound **http request**.
@@ -79,7 +79,47 @@
The input state will look something like this:
}
```

### Cron triggered Runs
### Kafka triggered runs

When a Kafka message is received by the trigger, the input state contains
important information for auditing and for recovering from any loss of
connection or failure of the Work Order.

The input state looks like this:

```js
{
  data: { // the body of the Kafka message
body: {
formId: "patient_enrollment",
name: "John Doe"
},
headers: {
"X-Forwarded-For": "41.90.69.203",
"X-Forwarded-Host": "ec2-13-244-108-204.af-south-1.compute.amazonaws.com:5001",
"X-OpenHIM-ClientID": "test",
"X-OpenHIM-TransactionID": "66a1f1d4c81081a1e6809484",
"accept": "application/fhir+json",
"content-length": "20592",
"content-type": "application/fhir+json",
"user-agent": "k6/0.49.0 (https://k6.io/)"
},
method: "POST",
path: "/kafka",
pattern: "^/kafka$"
},
request: {
"headers": [],
"key": "", //optional and nullable
"offset": 168321,
"partition": 1,
"topic": "fhir-data-pipes",
"ts": 1721889238000
}
}
```
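The `request` metadata above is what makes auditing and recovery possible: the
`topic`, `partition`, and `offset` uniquely identify the message in the
cluster. As an illustration (a plain JavaScript sketch assuming the state shape
shown above, with a hypothetical helper name), a job could build an audit
reference from it:

```js
// Build a human-readable audit reference from the Kafka metadata on the
// input state. The state shape follows the example above; this helper is
// illustrative only, not part of the OpenFn API.
const kafkaAuditRef = state => {
  const { topic, partition, offset, ts } = state.request;
  return `${topic}[${partition}]@${offset} (${new Date(ts).toISOString()})`;
};
```

Logging such a reference at the start of a job makes it easier to trace a
failed Work Order back to the exact message in the cluster.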

### Cron triggered runs

On the platform, when a Run is triggered by a cron job, the input state will be
the final output state of the **last successful run** of this workflow. This allows
Binary file added static/img/configuring-kafka.png
