Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cluster start/stop #386

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions message-passing/safe-message-handlers/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ export async function startCluster(): Promise<void> {
await activities.sleep(100); // Simulate RPC
}

export async function shutdownCluster(): Promise<void> {
activities.log.info('Shutting down cluster');
await activities.sleep(100); // Simulate RPC
}

export async function assignNodesToJob(input: AssignNodesToJobInput): Promise<void> {
activities.log.info(`Assigning nodes ${input.nodes} to job ${input.jobName}`);
await activities.sleep(100); // Simulate RPC
Expand Down
45 changes: 28 additions & 17 deletions message-passing/safe-message-handlers/src/cluster-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { Mutex } from 'async-mutex';
import {
AssignNodesToJobUpdateInput,
ClusterManagerState,
ClusterState,
ClusterManagerStateSummary,
DeleteJobUpdateInput,
} from './types';

const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActivities<typeof activities>({
const { assignNodesToJob, unassignNodesForJob, startCluster, shutdownCluster } = wf.proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});

Expand All @@ -27,8 +28,7 @@ export class ClusterManager {

constructor(state?: ClusterManagerState) {
this.state = state ?? {
clusterStarted: false,
clusterShutdown: false,
clusterState: ClusterState.NOT_STARTED,
nodes: new Map<string, string | null>(),
maxAssignedNodes: 0,
};
Expand All @@ -37,27 +37,39 @@ export class ClusterManager {
}

async startCluster(): Promise<void> {
await startCluster();
this.state.clusterStarted = true;
for (let i = 0; i < 25; i++) {
this.state.nodes.set(i.toString(), null);
if (this.state.clusterState !== ClusterState.NOT_STARTED) {
// This is used as a Signal handler so we log a warning but cannot return an error.
wf.log.warn(`Cannot start cluster in state ${this.state.clusterState}`);
return;
}
await this.nodesMutex.runExclusive(async () => {
await startCluster();
this.state.clusterState = ClusterState.STARTED;
for (let i = 0; i < 25; i++) {
this.state.nodes.set(i.toString(), null);
}
});
wf.log.info('Cluster started');
}

async shutDownCluster(): Promise<void> {
await wf.condition(() => this.state.clusterStarted);
this.state.clusterShutdown = true;
async shutdownCluster(): Promise<true> {
if (this.state.clusterState !== ClusterState.STARTED) {
// This is used as an Update handler we return an error to the caller.
throw new wf.ApplicationFailure(`Cannot shutdown cluster in state ${this.state.clusterState}`);
}
await shutdownCluster();
this.state.clusterState = ClusterState.SHUTTING_DOWN;
wf.log.info('Cluster shutdown');
return true;
}

async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise<ClusterManagerStateSummary> {
await wf.condition(() => this.state.clusterStarted);
if (this.state.clusterShutdown) {
await wf.condition(() => this.state.clusterState === ClusterState.STARTED);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dandavison I just noticed is there a deadlock here if the clusterState goes from STARTED to SHUTTING_DOWN without this handler running?

if (this.state.clusterState === ClusterState.SHUTTING_DOWN) {
// If you want the client to receive a failure, either add an update validator and throw the
// exception from there, or raise an ApplicationError. Other exceptions in the handler will
// cause the workflow to keep retrying and get it stuck.
throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down');
throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is shutting down');
}
return await this.nodesMutex.runExclusive(async (): Promise<ClusterManagerStateSummary> => {
// Idempotency guard: do nothing if the job already has nodes assigned.
Expand All @@ -83,8 +95,8 @@ export class ClusterManager {
}

async deleteJob(input: DeleteJobUpdateInput) {
await wf.condition(() => this.state.clusterStarted);
if (this.state.clusterShutdown) {
await wf.condition(() => this.state.clusterState === ClusterState.STARTED);
if (this.state.clusterState === ClusterState.SHUTTING_DOWN) {
// If you want the client to receive a failure, either add an update validator and throw the
// exception from there, or raise an ApplicationError. Other exceptions in the handler will
// cause the workflow to keep retrying and get it stuck.
Expand All @@ -105,8 +117,7 @@ export class ClusterManager {

getState(): ClusterManagerState {
return {
clusterStarted: this.state.clusterStarted,
clusterShutdown: this.state.clusterShutdown,
clusterState: this.state.clusterState,
nodes: this.state.nodes,
maxAssignedNodes: this.state.maxAssignedNodes,
};
Expand Down
6 changes: 4 additions & 2 deletions message-passing/safe-message-handlers/src/run-simulation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowHandle } from '@temporalio/client';

import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows';
import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterUpdate } from './workflows';
import { startClusterManager } from './client';
import { setTimeout } from 'timers/promises';

Expand All @@ -23,7 +23,9 @@ async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise
}
await Promise.all(deletionUpdates);

await wf.signal(shutdownClusterSignal);
if (!(await wf.executeUpdate(shutdownClusterUpdate))) {
throw new Error('Failed to shutdown cluster');
}
}

async function main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { TestWorkflowEnvironment } from '@temporalio/testing';
import { before, describe, it } from 'mocha';
import { bundleWorkflowCode, WorkflowBundleWithSourceMap, DefaultLogger, Runtime, Worker } from '@temporalio/worker';
import * as activities from '../activities';
import * as client from '@temporalio/client';
import {
clusterManagerWorkflow,
assignNodesToJobUpdate,
startClusterSignal,
shutdownClusterSignal,
shutdownClusterUpdate,
deleteJobUpdate,
getClusterStatusQuery,
} from '../workflows';
Expand Down Expand Up @@ -81,7 +82,7 @@ describe('cluster manager', function () {
);
assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes);
// Terminate the workflow and check that workflow returns same value as obtained from last query.
await workflow.signal(shutdownClusterSignal);
assert.equal(await workflow.executeUpdate(shutdownClusterUpdate), true);
const wfResult = await workflow.result();
assert.deepEqual(wfResult, queryResult);
});
Expand Down
9 changes: 7 additions & 2 deletions message-passing/safe-message-handlers/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export interface ClusterManagerState {
clusterStarted: boolean;
clusterShutdown: boolean;
clusterState: ClusterState;
nodes: Map<string, string | null>;
maxAssignedNodes: number;
}
Expand Down Expand Up @@ -28,3 +27,9 @@ export interface ClusterManagerWorkflowResult {
numCurrentlyAssignedNodes: number;
numBadNodes: number;
}

export enum ClusterState {
NOT_STARTED = 'NOT_STARTED',
STARTED = 'STARTED',
SHUTTING_DOWN = 'SHUTTING_DOWN',
}
12 changes: 8 additions & 4 deletions message-passing/safe-message-handlers/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import {
AssignNodesToJobUpdateInput,
ClusterManagerInput,
ClusterManagerStateSummary,
ClusterState,
DeleteJobUpdateInput,
} from './types';

export const startClusterSignal = wf.defineSignal('startCluster');
export const shutdownClusterSignal = wf.defineSignal('shutdownCluster');
export const shutdownClusterUpdate = wf.defineUpdate('shutdownCluster');
export const assignNodesToJobUpdate = wf.defineUpdate<ClusterManagerStateSummary, [AssignNodesToJobUpdateInput]>(
'allocateNodesToJob'
);
Expand All @@ -21,7 +22,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
// Message-handling API
//
wf.setHandler(startClusterSignal, () => manager.startCluster());
wf.setHandler(shutdownClusterSignal, () => manager.shutDownCluster());
wf.setHandler(shutdownClusterUpdate, () => manager.shutdownCluster());

// This is an update as opposed to a signal because the client may want to wait for nodes to be
// allocated before sending work to those nodes. Returns the array of node names that were
Expand Down Expand Up @@ -49,8 +50,11 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
// lies in the message-processing handlers implented in the ClusterManager class. The main
// workflow itself simply waits until the cluster is shutdown, or the workflow needs to
// continue-as-new.
await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested);
if (!manager.state.clusterShutdown) {
await wf.condition(() => manager.state.clusterState === ClusterState.STARTED);
await wf.condition(
() => manager.state.clusterState === ClusterState.SHUTTING_DOWN || wf.workflowInfo().continueAsNewSuggested
);
if (manager.state.clusterState !== ClusterState.SHUTTING_DOWN) {
// You should typically wait for all async handlers to finish before
// completing a workflow or continuing as new. If the main workflow method
// is scheduling activities or child workflows, then you should typically
Expand Down
Loading