Skip to content

Commit

Permalink
Sample for message passing docs (#382)
Browse files Browse the repository at this point in the history
* Introduction to message passing sample

* Rename updates-and-signals => message-passing
  • Loading branch information
dandavison authored Sep 16, 2024
1 parent e126c56 commit c3ce8fa
Show file tree
Hide file tree
Showing 49 changed files with 512 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .scripts/list-of-samples.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"hello-world-js",
"hello-world-mtls",
"interceptors-opentelemetry",
"message-passing",
"monorepo-folders",
"mutex",
"nestjs-exchange-rates",
Expand All @@ -37,7 +38,6 @@
"state",
"timer-examples",
"timer-progress",
"updates-and-signals",
"vscode-debugger",
"worker-specific-task-queues",
"worker-versioning"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
10 changes: 10 additions & 0 deletions message-passing/introduction/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### Introduction

An introduction to Queries, Signals, and Updates.

### Running this sample

1. `temporal server start-dev --dynamic-config-value frontend.enableUpdateWorkflowExecution=true` to start [Temporal Server](https://github.com/temporalio/cli/#installation) with Workflow Updates enabled.
1. `npm install` to install dependencies.
1. `npm run start.watch` to start the Worker.
1. In another shell, `npm run workflow` to run the Workflow Client.
48 changes: 48 additions & 0 deletions message-passing/introduction/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"name": "temporal-update",
"version": "0.1.0",
"private": true,
"scripts": {
"build": "tsc --build",
"build.watch": "tsc --build --watch",
"lint": "eslint .",
"start": "ts-node src/worker.ts",
"start.watch": "nodemon src/worker.ts",
"workflow": "ts-node src/client.ts",
"format": "prettier --config .prettierrc 'src/**/*.ts' --write",
"test": "mocha --exit --require ts-node/register src/test/*.test.ts"
},
"nodemonConfig": {
"execMap": {
"ts": "ts-node"
},
"ext": "ts",
"watch": [
"src"
]
},
"dependencies": {
"@temporalio/activity": "^1.11.1",
"@temporalio/client": "^1.11.1",
"@temporalio/worker": "^1.11.1",
"@temporalio/workflow": "^1.11.1",
"async-mutex": "^0.5.0",
"nanoid": "3.x"
},
"devDependencies": {
"@temporalio/testing": "^1.11.1",
"@tsconfig/node16": "^1.0.0",
"@types/mocha": "8.x",
"@types/node": "^16.11.43",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^7.32.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-deprecation": "^1.2.1",
"mocha": "8.x",
"nodemon": "^2.0.12",
"prettier": "^2.8.8",
"ts-node": "^10.8.1",
"typescript": "^4.4.2"
}
}
53 changes: 53 additions & 0 deletions message-passing/introduction/src/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// @@@SNIPSTART typescript-message-passing-introduction
import * as cl from '@temporalio/client';
import * as greetingWorkflow from './workflows';
import { Language } from './workflows';
import { nanoid } from 'nanoid';

export async function run() {
const connection = await cl.Connection.connect({ address: 'localhost:7233' });
const client = new cl.Client({ connection });
const handle = await client.workflow.start(greetingWorkflow.greetingWorkflow, {
taskQueue: 'my-task-queue',
args: [],
workflowId: `messages-introduction-${nanoid()}`,
});

const supportedLanguages = await handle.query(greetingWorkflow.getLanguages, { includeUnsupported: false });
console.log(`supported languages: ${supportedLanguages}`);

// Use executeUpdate to change the language
let previousLanguage = await handle.executeUpdate(greetingWorkflow.setLanguage, {
args: [Language.CHINESE],
});

// Send a query
let currentLanguage = await handle.query(greetingWorkflow.getLanguage);
console.log(`language changed: ${previousLanguage} -> ${currentLanguage}`);

// Use startUpdate followed by handle.result() to change the language
const updateHandle = await handle.startUpdate(greetingWorkflow.setLanguage, {
args: [Language.ENGLISH],
waitForStage: cl.WorkflowUpdateStage.ACCEPTED,
});
previousLanguage = await updateHandle.result();
currentLanguage = await handle.query(greetingWorkflow.getLanguage);
console.log(`language changed: ${previousLanguage} -> ${currentLanguage}`);

// Use an async update handler that calls an activity to change the language
previousLanguage = await handle.executeUpdate(greetingWorkflow.setLanguageUsingActivity, {
args: [Language.ARABIC],
});
currentLanguage = await handle.query(greetingWorkflow.getLanguage);
console.log(`language changed: ${previousLanguage} -> ${currentLanguage}`);

// Send a signal
await handle.signal(greetingWorkflow.approve, { name: '' });
console.log(await handle.result());
}

run().catch((err) => {
console.error(err);
process.exit(1);
});
// @@@SNIPEND
154 changes: 154 additions & 0 deletions message-passing/introduction/src/test/workflows.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { before, describe, it } from 'mocha';
import * as wo from '@temporalio/worker';
import * as greetingWorkflow from '../workflows';
import { Language } from '../workflows';
import assert from 'assert';
import { nanoid } from 'nanoid';
import { WorkflowUpdateFailedError } from '@temporalio/client';

const taskQueue = 'message-passing-introduction';

describe('greeting workflow', function () {
this.timeout(10000);
let worker: wo.Worker;
let env: TestWorkflowEnvironment;
let workflowBundle: wo.WorkflowBundleWithSourceMap;

before(async function () {
wo.Runtime.install({ logger: new wo.DefaultLogger('WARN') });
env = await TestWorkflowEnvironment.createLocal();

workflowBundle = await wo.bundleWorkflowCode({
workflowsPath: require.resolve('../workflows'),
logger: new wo.DefaultLogger('WARN'),
});
});

beforeEach(async function () {
worker = await wo.Worker.create({
connection: env.nativeConnection,
workflowBundle,
taskQueue,
activities: greetingWorkflow.activities,
});
});

after(async function () {
await env.teardown();
});

it('can be queried for supported languages', async function () {
const wfResult = await worker.runUntil(async () => {
const wfHandle = await env.client.workflow.start(greetingWorkflow.greetingWorkflow, {
taskQueue,
workflowId: nanoid(),
});
const supportedLanguages = await wfHandle.query(greetingWorkflow.getLanguages, {
includeUnsupported: false,
});
assert.deepEqual(supportedLanguages, [Language.CHINESE, Language.ENGLISH]);
await wfHandle.signal(greetingWorkflow.approve, { name: 'test-approver' });
return await wfHandle.result();
});
assert.equal(wfResult, 'Hello, world');
});

it('can be queried for unsupported language', async function () {
const wfResult = await worker.runUntil(async () => {
const wfHandle = await env.client.workflow.start(greetingWorkflow.greetingWorkflow, {
taskQueue,
workflowId: nanoid(),
});
const allLanguages = await wfHandle.query(greetingWorkflow.getLanguages, {
includeUnsupported: true,
});
assert.deepEqual(allLanguages, [
Language.ARABIC,
Language.CHINESE,
Language.ENGLISH,
Language.FRENCH,
Language.HINDI,
Language.PORTUGUESE,
Language.SPANISH,
]);
await wfHandle.signal(greetingWorkflow.approve, { name: 'test-approver' });
return await wfHandle.result();
});
assert.equal(wfResult, 'Hello, world');
});

it('can be updated to change the language', async function () {
const wfResult = await worker.runUntil(async () => {
const wfHandle = await env.client.workflow.start(greetingWorkflow.greetingWorkflow, {
taskQueue,
workflowId: nanoid(),
});

const currentLanguage = await wfHandle.query(greetingWorkflow.getLanguage);
assert.equal(currentLanguage, Language.ENGLISH);

await wfHandle.executeUpdate(greetingWorkflow.setLanguage, {
args: [Language.CHINESE],
});

const updatedLanguage = await wfHandle.query(greetingWorkflow.getLanguage);
assert.equal(updatedLanguage, Language.CHINESE);

await wfHandle.signal(greetingWorkflow.approve, { name: 'test-approver' });
return await wfHandle.result();
});
assert.equal(wfResult, '你好,世界');
});

it('rejects an invalid language', async function () {
const wfResult = await worker.runUntil(async () => {
const wfHandle = await env.client.workflow.start(greetingWorkflow.greetingWorkflow, {
taskQueue,
workflowId: nanoid(),
});

const currentLanguage = await wfHandle.query(greetingWorkflow.getLanguage);
assert.equal(currentLanguage, Language.ENGLISH);

try {
await wfHandle.executeUpdate(greetingWorkflow.setLanguage, {
args: [Language.PORTUGUESE],
});
assert.fail('Expected an error to be thrown');
} catch (err) {
assert(err instanceof WorkflowUpdateFailedError, 'Expected WorkflowUpdateFailedError');
}

const updatedLanguage = await wfHandle.query(greetingWorkflow.getLanguage);
assert.equal(updatedLanguage, Language.ENGLISH);

await wfHandle.signal(greetingWorkflow.approve, { name: 'test-approver' });
return await wfHandle.result();
});
assert.equal(wfResult, 'Hello, world');
});

it('can be updated to change the language using an activity', async function () {
const wfResult = await worker.runUntil(async () => {
const wfHandle = await env.client.workflow.start(greetingWorkflow.greetingWorkflow, {
taskQueue,
workflowId: nanoid(),
});

const currentLanguage = await wfHandle.query(greetingWorkflow.getLanguage);
assert.equal(currentLanguage, Language.ENGLISH);

await wfHandle.executeUpdate(greetingWorkflow.setLanguageUsingActivity, {
args: [Language.PORTUGUESE],
});

const updatedLanguage = await wfHandle.query(greetingWorkflow.getLanguage);
assert.equal(updatedLanguage, Language.PORTUGUESE);

await wfHandle.signal(greetingWorkflow.approve, { name: 'test-approver' });
return await wfHandle.result();
});
assert.equal(wfResult, 'Olá mundo');
});
});
23 changes: 23 additions & 0 deletions message-passing/introduction/src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// @@@SNIPSTART typescript-message-passing-introduction
import * as wo from '@temporalio/worker';
import { activities } from './workflows';

async function run() {
const connection = await wo.NativeConnection.connect({
address: 'localhost:7233',
});
const worker = await wo.Worker.create({
connection,
namespace: 'default',
taskQueue: 'my-task-queue',
workflowsPath: require.resolve('./workflows'),
activities,
});
await worker.run();
}

run().catch((err) => {
console.error(err);
process.exit(1);
});
// @@@SNIPEND
Loading

0 comments on commit c3ce8fa

Please sign in to comment.