Skip to content

Commit

Permalink
Merge pull request #50 from usingtechnology/event-stream-example
Browse files Browse the repository at this point in the history
Adjust subjects and message metadata.
  • Loading branch information
usingtechnology authored Jul 25, 2024
2 parents fef2453 + 6e8b1f1 commit bf3825a
Showing 1 changed file with 119 additions and 16 deletions.
135 changes: 119 additions & 16 deletions docs/Capabilities/Integrations/Event-Stream-Service.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
***
# CHEFS Event Stream Service

**NOTE (July 09, 2024) - this is in active development and not yet released. This documentation will be updated as development progresses.**
**NOTE (July 25, 2024) - this is in active development and not yet released. This documentation will be updated as development progresses.**

CHEFS is adding an Event Streaming Service which allows Form Owners to consume and process real-time data about the forms (ex. a new version of the form has been published) and submissions.

Expand Down Expand Up @@ -41,7 +41,7 @@ Under this stream will be two major subject prefixes:
2. `PRIVATE.forms`


Consumers specify which subjects they wish to process and often use wildcards. For example: a consumer can specify `PUBLIC.forms.>` and `PRIVATE.forms.*.submissions.>`. This would consume ALL public forms events and only private submission events.
Consumers specify which subjects they wish to process and often use wildcards. For example: a consumer can specify `PUBLIC.forms.>` and `PRIVATE.forms.submissions.>`. This would consume ALL public forms events and only private submission events.

`PUBLIC.forms` can be consumed by any client. These events will contain only metadata and no personal/private information.

Expand All @@ -56,35 +56,31 @@ As stated, the two major subjects are `PUBLIC.forms` and `PRIVATE.forms`; these
`PUBLIC|PRIVATE.<domain>.<form id>.<class>.<type>`

- domain = `forms`
- form id = the id of the form firing these events
- class = `schema` or `submission`.
- type = `created`, `deleted`, `modified` and more

- form id = the id of the form firing these events
-
Using wildcards a consumer could listen for events on a specific form id or a specific type (i.e. only listen to submission created events across any form).

- `PUBLIC.forms.<form id>.schema.created`
- `PUBLIC.forms.<form id>.schema.modified`
- `PUBLIC.forms.<form id>.schema.deleted`
- `PUBLIC.forms.schema.published.<form id>`
- `PUBLIC.forms.schema.unpublished.<form id>`

- `PUBLIC.forms.<form id>.submission.created`
- `PUBLIC.forms.<form id>.submission.modified`
- `PUBLIC.forms.<form id>.submission.deleted`
- `PUBLIC.forms.submission.created.<form id>`
- `PUBLIC.forms.submission.modified.<form id>`
- `PUBLIC.forms.submission.deleted.<form id>`

More events to come.

### Event Metadata

| Attribute | Notes |
| --- | --- |
| seqNo | A CHEFS system generated and tracked sequence number for ordering messages |
| timestamp | UTC Timestamp when event was added to stream. This is **not** the timestamp of the CHEFS form or submission record. |
| `source` | `chefs` - where did this event originate? |
| `domain` | `forms` - top level classification for event |
| `class` | `submission` or `schema` - secondary classification for event |
| `type` | `created`, `deleted`, `modified` - tertiary classification for event |
| `formId` | uuid - CHEFS form id . Form that originates the event |
| `formVersionId` | uuid - CHEFS form version id. Only if value exists at time of event. |
| `published` | boolean - For `schema` class events, the formVersion.published value.
| `submissionId` | uuid - CHEFS submission id. Only applies for `submission` class events. |
| `draft` | boolean - For `submission` class events, the submission.draft value |

Expand All @@ -93,8 +89,6 @@ An example to show the overall structure of an event message is:
```json
{
meta: {
seqNo: <numeric>,
timestamp: <iso utc timestamp>,
source: 'chefs',
domain: 'forms',
class: 'submission'
Expand All @@ -105,8 +99,117 @@ An example to show the overall structure of an event message is:
draft: false,
},
payload: {
submission: <encrypted>
data: <encrypted>
}

```

### NATS Message Metadata

nats messages contain very valuable metadata that consumers should leverage for optimal processing. Each message on the stream will have a [sequence number](https://github.com/nats-io/nats.docs/blob/803d660c33496c9b7ba42360945be58621bbba0b/nats-concepts/seq_num.md) and a timestamp. Consumers can schedule batch consumption based on the sequence or timestamp of their last processed event.

### Example Consumer
The following is a trivialized example of a [pull consumer](https://docs.nats.io/nats-concepts/jetstream/consumers). It is up to the external application that consumes/listens to the events to decide how to set up their consumer. This is one way in one language (JavaScript). Please review the documentation about [consumers](https://docs.nats.io/nats-concepts/jetstream/consumers) and review the approved [examples](https://natsbyexample.com) for more information.

The example will ask for a batch of messages every 5 seconds - illustrating some basic pull behaviour. We can see as it processes the messages that there is a sequence number (`m.seq`) and a timestamp (`m.info.timestampNanos`) which we could leverage for different [delivery policies](https://docs.nats.io/nats-concepts/jetstream/consumers#deliverpolicy) such as get all messages since X sequence number.

**Do not use this code for your client! Simplified example only!**

```js
const { AckPolicy, connect } = require("nats");

// connection info
const servers = ["localhost:4222", "localhost:4223", "localhost:4224"];

let nc = undefined; // nats connection
let js = undefined; // jet stream
let jsm = undefined; // jet stream manager
let consumer = undefined; // pull consumer (ordered, ephemeral)

// stream info
const STREAM_NAME = "CHEFS";
const FILTER_SUBJECTS = ["PUBLIC.forms.>", "PRIVATE.forms.>"];
const MAX_MESSAGES = 2;
const DURABLE_NAME = "pullConsumer";

const printMsg = (m) => {
// illustrate grabbing the sequence and timestamp from the nats message...
try {
const ts = new Date(m.info.timestampNanos / 1000000).toISOString();
console.log(
`msg seq: ${m.seq}, subject: ${m.subject}, timestamp: ${ts}, streamSequence: ${m.info.streamSequence}, deliverySequence: ${m.info.deliverySequence}`
);
// illustrate (one way of) grabbing message content as json
console.log(JSON.stringify(m.json(), null, 2));
} catch (e) {
console.error(`Error printing message: ${e.message}`);
}
};

const init = async () => {
if (nc && nc.info != undefined) {
// already connected.
return;
} else {
// open a connection...
try {
// no credentials provided.
// anonymous connections have read access to the stream
console.log(`connect to nats server(s) ${servers} as 'anonymous'...`);
nc = await connect({
servers: servers,
});

console.log("access jetstream...");
js = nc.jetstream();
console.log("get jetstream manager...");
jsm = await js.jetstreamManager();
await jsm.consumers.add(STREAM_NAME, {
ack_policy: AckPolicy.Explicit,
durable_name: DURABLE_NAME,
});
consumer = await js.consumers.get(STREAM_NAME, DURABLE_NAME);
} catch (e) {
console.error(e);
process.exit(0);
}
}
};

const pull = async () => {
console.log("fetch...");
let iter = await consumer.fetch({
filterSubjects: FILTER_SUBJECTS,
max_messages: MAX_MESSAGES,
});
for await (const m of iter) {
printMsg(m);
m.ack();
}
};

const main = async () => {
await init();
await pull();
setTimeout(main, 5000); // process a batch every 5 seconds
};

main();

const shutdown = async () => {
console.log("\nshutdown...");
console.log("drain connection...");
await nc.drain();
process.exit(0);
};

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
process.on("SIGUSR1", shutdown);
process.on("SIGUSR2", shutdown);
process.on("exit", () => {
console.log("exit.");
});

```

Expand Down

0 comments on commit bf3825a

Please sign in to comment.