From 6d33e489fa8f711f183842b343077c92238bb7f4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 16:17:17 -0400 Subject: [PATCH 1/4] Fix bugs in cluster-manager start/stop --- .../safe-message-handlers/src/activities.ts | 5 +++ .../src/cluster-manager.ts | 32 +++++++++++-------- .../src/run-simulation.ts | 6 ++-- .../src/test/workflows.test.ts | 5 +-- .../safe-message-handlers/src/types.ts | 8 +++-- .../safe-message-handlers/src/workflows.ts | 12 ++++--- 6 files changed, 45 insertions(+), 23 deletions(-) diff --git a/message-passing/safe-message-handlers/src/activities.ts b/message-passing/safe-message-handlers/src/activities.ts index 1e2399f0..35efc237 100644 --- a/message-passing/safe-message-handlers/src/activities.ts +++ b/message-passing/safe-message-handlers/src/activities.ts @@ -15,6 +15,11 @@ export async function startCluster(): Promise { await activities.sleep(100); // Simulate RPC } +export async function shutdownCluster(): Promise { + activities.log.info('Shutting down cluster'); + await activities.sleep(100); // Simulate RPC +} + export async function assignNodesToJob(input: AssignNodesToJobInput): Promise { activities.log.info(`Assigning nodes ${input.nodes} to job ${input.jobName}`); await activities.sleep(100); // Simulate RPC diff --git a/message-passing/safe-message-handlers/src/cluster-manager.ts b/message-passing/safe-message-handlers/src/cluster-manager.ts index 3af5616f..e4f07e71 100644 --- a/message-passing/safe-message-handlers/src/cluster-manager.ts +++ b/message-passing/safe-message-handlers/src/cluster-manager.ts @@ -4,11 +4,12 @@ import { Mutex } from 'async-mutex'; import { AssignNodesToJobUpdateInput, ClusterManagerState, + ClusterState, ClusterManagerStateSummary, DeleteJobUpdateInput, } from './types'; -const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActivities({ +const { assignNodesToJob, unassignNodesForJob, startCluster, shutdownCluster } = wf.proxyActivities({ startToCloseTimeout: '1 minute', }); @@ -27,8 +28,7 @@ export class ClusterManager { constructor(state?: ClusterManagerState) { this.state = state ?? { - clusterStarted: false, - clusterShutdown: false, + clusterState: ClusterState.DOWN, nodes: new Map(), maxAssignedNodes: 0, }; @@ -37,23 +37,30 @@ export class ClusterManager { } async startCluster(): Promise { + if (this.state.clusterState === ClusterState.UP) { + return; + } await startCluster(); - this.state.clusterStarted = true; + this.state.clusterState = ClusterState.UP; for (let i = 0; i < 25; i++) { this.state.nodes.set(i.toString(), null); } wf.log.info('Cluster started'); } - async shutDownCluster(): Promise { - await wf.condition(() => this.state.clusterStarted); - this.state.clusterShutdown = true; + async shutDownCluster(): Promise { + if (this.state.clusterState === ClusterState.DOWN) { + throw new wf.ApplicationFailure('Cluster is already down'); + } + await shutdownCluster(); + this.state.clusterState = ClusterState.DOWN; wf.log.info('Cluster shutdown'); + return true; } async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise { - await wf.condition(() => this.state.clusterStarted); - if (this.state.clusterShutdown) { + await wf.condition(() => this.state.clusterState === ClusterState.UP); + if (this.state.clusterState === ClusterState.DOWN) { // If you want the client to receive a failure, either add an update validator and throw the // exception from there, or raise an ApplicationError. Other exceptions in the handler will // cause the workflow to keep retrying and get it stuck. @@ -83,8 +90,8 @@ export class ClusterManager { } async deleteJob(input: DeleteJobUpdateInput) { - await wf.condition(() => this.state.clusterStarted); - if (this.state.clusterShutdown) { + await wf.condition(() => this.state.clusterState === ClusterState.UP); + if (this.state.clusterState === ClusterState.DOWN) { // If you want the client to receive a failure, either add an update validator and throw the // exception from there, or raise an ApplicationError. Other exceptions in the handler will // cause the workflow to keep retrying and get it stuck. @@ -105,8 +112,7 @@ export class ClusterManager { getState(): ClusterManagerState { return { - clusterStarted: this.state.clusterStarted, - clusterShutdown: this.state.clusterShutdown, + clusterState: this.state.clusterState, nodes: this.state.nodes, maxAssignedNodes: this.state.maxAssignedNodes, }; diff --git a/message-passing/safe-message-handlers/src/run-simulation.ts b/message-passing/safe-message-handlers/src/run-simulation.ts index 9096c888..b4623b4a 100644 --- a/message-passing/safe-message-handlers/src/run-simulation.ts +++ b/message-passing/safe-message-handlers/src/run-simulation.ts @@ -1,6 +1,6 @@ import { WorkflowHandle } from '@temporalio/client'; -import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows'; +import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterUpdate } from './workflows'; import { startClusterManager } from './client'; import { setTimeout } from 'timers/promises'; @@ -23,7 +23,9 @@ async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise } await Promise.all(deletionUpdates); - await wf.signal(shutdownClusterSignal); + if (!(await wf.executeUpdate(shutdownClusterUpdate))) { + throw new Error('Failed to shutdown cluster'); + } } async function main() { diff --git a/message-passing/safe-message-handlers/src/test/workflows.test.ts b/message-passing/safe-message-handlers/src/test/workflows.test.ts index 383328a1..02e38483 100644 --- a/message-passing/safe-message-handlers/src/test/workflows.test.ts +++ b/message-passing/safe-message-handlers/src/test/workflows.test.ts @@ -2,11 +2,12 @@ import { TestWorkflowEnvironment } from '@temporalio/testing'; import { before, describe, it } from 'mocha'; import { bundleWorkflowCode, WorkflowBundleWithSourceMap, DefaultLogger, Runtime, Worker } from '@temporalio/worker'; import * as activities from '../activities'; +import * as client from '@temporalio/client'; import { clusterManagerWorkflow, assignNodesToJobUpdate, startClusterSignal, - shutdownClusterSignal, + shutdownClusterUpdate, deleteJobUpdate, getClusterStatusQuery, } from '../workflows'; @@ -81,7 +82,7 @@ describe('cluster manager', function () { ); assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes); // Terminate the workflow and check that workflow returns same value as obtained from last query. - await workflow.signal(shutdownClusterSignal); + assert.equal(await workflow.executeUpdate(shutdownClusterUpdate), true); const wfResult = await workflow.result(); assert.deepEqual(wfResult, queryResult); }); diff --git a/message-passing/safe-message-handlers/src/types.ts b/message-passing/safe-message-handlers/src/types.ts index 6454944b..86278616 100644 --- a/message-passing/safe-message-handlers/src/types.ts +++ b/message-passing/safe-message-handlers/src/types.ts @@ -1,6 +1,5 @@ export interface ClusterManagerState { - clusterStarted: boolean; - clusterShutdown: boolean; + clusterState: ClusterState; nodes: Map; maxAssignedNodes: number; } @@ -28,3 +27,8 @@ export interface ClusterManagerWorkflowResult { numCurrentlyAssignedNodes: number; numBadNodes: number; } + +export enum ClusterState { + UP = 'UP', + DOWN = 'DOWN', +} diff --git a/message-passing/safe-message-handlers/src/workflows.ts b/message-passing/safe-message-handlers/src/workflows.ts index 5605f3ae..bac804d5 100644 --- a/message-passing/safe-message-handlers/src/workflows.ts +++ b/message-passing/safe-message-handlers/src/workflows.ts @@ -4,11 +4,12 @@ import { AssignNodesToJobUpdateInput, ClusterManagerInput, ClusterManagerStateSummary, + ClusterState, DeleteJobUpdateInput, } from './types'; export const startClusterSignal = wf.defineSignal('startCluster'); -export const shutdownClusterSignal = wf.defineSignal('shutdownCluster'); +export const shutdownClusterUpdate = wf.defineUpdate('shutdownCluster'); export const assignNodesToJobUpdate = wf.defineUpdate( 'allocateNodesToJob' ); @@ -21,7 +22,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P // Message-handling API // wf.setHandler(startClusterSignal, () => manager.startCluster()); - wf.setHandler(shutdownClusterSignal, () => manager.shutDownCluster()); + wf.setHandler(shutdownClusterUpdate, () => manager.shutDownCluster()); // This is an update as opposed to a signal because the client may want to wait for nodes to be // allocated before sending work to those nodes. Returns the array of node names that were @@ -49,8 +50,11 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P // lies in the message-processing handlers implented in the ClusterManager class. The main // workflow itself simply waits until the cluster is shutdown, or the workflow needs to // continue-as-new. - await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested); - if (!manager.state.clusterShutdown) { + await wf.condition(() => manager.state.clusterState === ClusterState.UP); + await wf.condition( + () => manager.state.clusterState === ClusterState.DOWN || wf.workflowInfo().continueAsNewSuggested + ); + if (manager.state.clusterState !== ClusterState.DOWN) { // You should typically wait for all async handlers to finish before // completing a workflow or continuing as new. If the main workflow method // is scheduling activities or child workflows, then you should typically From 57b829b740e78897d21bf114c752889bca1fa616 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 18:37:14 -0400 Subject: [PATCH 2/4] Initialize cluster with lock held --- .../safe-message-handlers/src/cluster-manager.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/message-passing/safe-message-handlers/src/cluster-manager.ts b/message-passing/safe-message-handlers/src/cluster-manager.ts index e4f07e71..fe1cafb1 100644 --- a/message-passing/safe-message-handlers/src/cluster-manager.ts +++ b/message-passing/safe-message-handlers/src/cluster-manager.ts @@ -40,11 +40,13 @@ export class ClusterManager { if (this.state.clusterState === ClusterState.UP) { return; } - await startCluster(); - this.state.clusterState = ClusterState.UP; - for (let i = 0; i < 25; i++) { - this.state.nodes.set(i.toString(), null); - } + await this.nodesMutex.runExclusive(async () => { + await startCluster(); + this.state.clusterState = ClusterState.UP; + for (let i = 0; i < 25; i++) { + this.state.nodes.set(i.toString(), null); + } + }); wf.log.info('Cluster started'); } From fd1cbd8102ab5d10b8f5ecb9cdc600656da3ff20 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 7 Oct 2024 17:14:56 -0400 Subject: [PATCH 3/4] Fix inconsistent capitalization --- message-passing/safe-message-handlers/src/cluster-manager.ts | 2 +- message-passing/safe-message-handlers/src/workflows.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/message-passing/safe-message-handlers/src/cluster-manager.ts b/message-passing/safe-message-handlers/src/cluster-manager.ts index fe1cafb1..4942cd6a 100644 --- a/message-passing/safe-message-handlers/src/cluster-manager.ts +++ b/message-passing/safe-message-handlers/src/cluster-manager.ts @@ -50,7 +50,7 @@ export class ClusterManager { wf.log.info('Cluster started'); } - async shutDownCluster(): Promise { + async shutdownCluster(): Promise { if (this.state.clusterState === ClusterState.DOWN) { throw new wf.ApplicationFailure('Cluster is already down'); } diff --git a/message-passing/safe-message-handlers/src/workflows.ts b/message-passing/safe-message-handlers/src/workflows.ts index bac804d5..938a7a24 100644 --- a/message-passing/safe-message-handlers/src/workflows.ts +++ b/message-passing/safe-message-handlers/src/workflows.ts @@ -22,7 +22,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P // Message-handling API // wf.setHandler(startClusterSignal, () => manager.startCluster()); - wf.setHandler(shutdownClusterUpdate, () => manager.shutDownCluster()); + wf.setHandler(shutdownClusterUpdate, () => manager.shutdownCluster()); // This is an update as opposed to a signal because the client may want to wait for nodes to be // allocated before sending work to those nodes. Returns the array of node names that were From c150943f29fa6f482504a707779bd676b455abdc Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 9 Oct 2024 06:10:46 -0400 Subject: [PATCH 4/4] Switch to tri-state state space --- .../src/cluster-manager.ts | 25 +++++++++++-------- .../safe-message-handlers/src/types.ts | 5 ++-- .../safe-message-handlers/src/workflows.ts | 6 ++--- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/message-passing/safe-message-handlers/src/cluster-manager.ts b/message-passing/safe-message-handlers/src/cluster-manager.ts index 4942cd6a..7f85b2be 100644 --- a/message-passing/safe-message-handlers/src/cluster-manager.ts +++ b/message-passing/safe-message-handlers/src/cluster-manager.ts @@ -28,7 +28,7 @@ export class ClusterManager { constructor(state?: ClusterManagerState) { this.state = state ?? { - clusterState: ClusterState.DOWN, + clusterState: ClusterState.NOT_STARTED, nodes: new Map(), maxAssignedNodes: 0, }; @@ -37,12 +37,14 @@ export class ClusterManager { } async startCluster(): Promise { - if (this.state.clusterState === ClusterState.UP) { + if (this.state.clusterState !== ClusterState.NOT_STARTED) { + // This is used as a Signal handler so we log a warning but cannot return an error. + wf.log.warn(`Cannot start cluster in state ${this.state.clusterState}`); return; } await this.nodesMutex.runExclusive(async () => { await startCluster(); - this.state.clusterState = ClusterState.UP; + this.state.clusterState = ClusterState.STARTED; for (let i = 0; i < 25; i++) { this.state.nodes.set(i.toString(), null); } @@ -51,22 +53,23 @@ export class ClusterManager { } async shutdownCluster(): Promise { - if (this.state.clusterState === ClusterState.DOWN) { - throw new wf.ApplicationFailure('Cluster is already down'); + if (this.state.clusterState !== ClusterState.STARTED) { + // This is used as an Update handler we return an error to the caller. + throw new wf.ApplicationFailure(`Cannot shutdown cluster in state ${this.state.clusterState}`); } await shutdownCluster(); - this.state.clusterState = ClusterState.DOWN; + this.state.clusterState = ClusterState.SHUTTING_DOWN; wf.log.info('Cluster shutdown'); return true; } async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise { - await wf.condition(() => this.state.clusterState === ClusterState.UP); - if (this.state.clusterState === ClusterState.DOWN) { + await wf.condition(() => this.state.clusterState === ClusterState.STARTED); + if (this.state.clusterState === ClusterState.SHUTTING_DOWN) { // If you want the client to receive a failure, either add an update validator and throw the // exception from there, or raise an ApplicationError. Other exceptions in the handler will // cause the workflow to keep retrying and get it stuck. - throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down'); + throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is shutting down'); } return await this.nodesMutex.runExclusive(async (): Promise => { // Idempotency guard: do nothing if the job already has nodes assigned. @@ -92,8 +95,8 @@ export class ClusterManager { } async deleteJob(input: DeleteJobUpdateInput) { - await wf.condition(() => this.state.clusterState === ClusterState.UP); - if (this.state.clusterState === ClusterState.DOWN) { + await wf.condition(() => this.state.clusterState === ClusterState.STARTED); + if (this.state.clusterState === ClusterState.SHUTTING_DOWN) { // If you want the client to receive a failure, either add an update validator and throw the // exception from there, or raise an ApplicationError. Other exceptions in the handler will // cause the workflow to keep retrying and get it stuck. diff --git a/message-passing/safe-message-handlers/src/types.ts b/message-passing/safe-message-handlers/src/types.ts index 86278616..50fecb66 100644 --- a/message-passing/safe-message-handlers/src/types.ts +++ b/message-passing/safe-message-handlers/src/types.ts @@ -29,6 +29,7 @@ export interface ClusterManagerWorkflowResult { } export enum ClusterState { - UP = 'UP', - DOWN = 'DOWN', + NOT_STARTED = 'NOT_STARTED', + STARTED = 'STARTED', + SHUTTING_DOWN = 'SHUTTING_DOWN', } diff --git a/message-passing/safe-message-handlers/src/workflows.ts b/message-passing/safe-message-handlers/src/workflows.ts index 938a7a24..84c99e9b 100644 --- a/message-passing/safe-message-handlers/src/workflows.ts +++ b/message-passing/safe-message-handlers/src/workflows.ts @@ -50,11 +50,11 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P // lies in the message-processing handlers implented in the ClusterManager class. The main // workflow itself simply waits until the cluster is shutdown, or the workflow needs to // continue-as-new. - await wf.condition(() => manager.state.clusterState === ClusterState.UP); + await wf.condition(() => manager.state.clusterState === ClusterState.STARTED); await wf.condition( - () => manager.state.clusterState === ClusterState.DOWN || wf.workflowInfo().continueAsNewSuggested + () => manager.state.clusterState === ClusterState.SHUTTING_DOWN || wf.workflowInfo().continueAsNewSuggested ); - if (manager.state.clusterState !== ClusterState.DOWN) { + if (manager.state.clusterState !== ClusterState.SHUTTING_DOWN) { // You should typically wait for all async handlers to finish before // completing a workflow or continuing as new. If the main workflow method // is scheduling activities or child workflows, then you should typically