Skip to content

Commit

Permalink
Documenting Kafka triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
christad92 committed Jul 30, 2024
1 parent a7c95e8 commit 7cef6b0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
37 changes: 37 additions & 0 deletions docs/build/triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,40 @@ fn(state => {
return state;
});
```

## Kafka Triggers

The Kafka trigger allows OpenFn users to build workflows triggered by messages
published by a Kafka cluster. The triggers make use of Kafka consumer groups that are
set up on-demand to receive messages from the cluster and pass them to an OpenFn pipleine
that transforms the message into dataclips that used to initialize a workorder.

![Configuring Kafka Trigger](/img/configuring-kafka.png)

### how to configure a Kafka trigger
1. Create a new workflow or Ooen an existing workflow
2. Click on the trigger step and change the trigger type to Kafka Consumer in the Trigger type dropdown
3. Fill out the required connection details:
- Hosts: Provide the URL of the host(s) your trigger should listen to for message.
- Topics: What specific topics should the Kafka consumers subscribe to. You need at
least one topic for a successful connection
- Some Kafka cluster require SSL connection. If you are connecting to an environment
that requires SSL connection, select `Enable SSL`
- Select the type of Authentication and provide the username and password for
connecting to the instance. (see tip below for more information on authenticating Kafka)
- Set the initial offset policy. The intial offset dictates where the consumer starts
reading messages from a topic when it subscribes for the first time. There are three
possible options here : `earliest`, `latest` or specific `timestamp` *for example: 1721889238000*.
- Set the Connect timeout. The connect timeout specified in secs is the duraiton of time
the consumer can wait before timing out when attempting to connect with a Kafka cluster.
4. If you have not completed budiling your workflow or you're not ready to start receiving messages
from the Kafka cluster, please disable the trigger until you're ready.

:::warning Disable the trigger during workflw design
Once the required connection information is provided via the modal, the triggest start attempting to connect to the
cluster. We advice that you disable the workflow until your workflow is ready to consumer data from cluster. To stop
the trigger from receiving and processing messages, disable the trigger.
:::

Learn how a Kafka trigger workflow's initial `state` gets built from a webhook trigger
[here](../jobs/state#kafka-triggered-runs).
43 changes: 41 additions & 2 deletions docs/jobs/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ operation. Remember that job expressions are a series of operations: they each
take state and return state, after creating any number of side effects. You can
control what is returned at the end of all of these operations.

### Webhook triggered Runs
### Webhook triggered runs

On the platform, when a Run is triggered by a webhook event, the input state
contains important parts of the inbound **http request**.
Expand All @@ -79,7 +79,46 @@ The input state will look something like this:
}
```

### Cron triggered Runs
### Kafka triggered runs

When a kafka message is received by the trigger, the input state contains important information for
auditing or recovering from any loss of connection or failure of the workorder.

The input state looks like this:

```js
{
data: { // the body of the kafka message request
body: {
formId: "patient_enrollment",
name: "John Doe"
},
headers: {
"X-Forwarded-For": "41.90.69.203",
"X-Forwarded-Host": "ec2-13-244-108-204.af-south-1.compute.amazonaws.com:5001",
"X-OpenHIM-ClientID": "test",
"X-OpenHIM-TransactionID": "66a1f1d4c81081a1e6809484",
"accept": "application/fhir+json",
"content-length": "20592",
"content-type": "application/fhir+json",
"user-agent": "k6/0.49.0 (https://k6.io/)"
},
method: "POST",
path: "/kafka",
pattern: "^/kafka$"
},
request: {
"headers": [],
"key": "",
"offset": 168321,
"partition": 1,
"topic": "fhir-data-pipes",
"ts": 1721889238000
}
}
```

### Cron triggered runs

On the platform, when a Run is triggered by a cron job, the input state will the
final output state of the **last succesful run** for this workflow. This allows
Expand Down
Binary file added static/img/configuring-kafka.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 7cef6b0

Please sign in to comment.