Skip to content

Commit

Permalink
refactor(core): Centralize CronJob management (n8n-io#10033)
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored Jul 16, 2024
1 parent 36b314d commit 09f2cf9
Show file tree
Hide file tree
Showing 18 changed files with 725 additions and 424 deletions.
3 changes: 1 addition & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"devDependencies": {
"@types/aws4": "^1.5.1",
"@types/concat-stream": "^2.0.0",
"@types/cron": "~1.7.1",
"@types/express": "^4.17.21",
"@types/lodash": "^4.14.195",
"@types/mime-types": "^2.1.0",
Expand All @@ -40,7 +39,7 @@
"aws4": "1.11.0",
"axios": "1.6.7",
"concat-stream": "2.0.0",
"cron": "1.7.2",
"cron": "3.1.7",
"fast-glob": "3.2.12",
"file-type": "16.5.4",
"form-data": "4.0.0",
Expand Down
65 changes: 19 additions & 46 deletions packages/core/src/ActiveWorkflows.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';

import type {
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
TriggerTime,
Expand All @@ -23,10 +21,13 @@ import {
WorkflowDeactivationError,
} from 'n8n-workflow';

import { ScheduledTaskManager } from './ScheduledTaskManager';
import type { IWorkflowData } from './Interfaces';

@Service()
export class ActiveWorkflows {
constructor(private readonly scheduledTaskManager: ScheduledTaskManager) {}

private activeWorkflows: { [workflowId: string]: IWorkflowData } = {};

/**
Expand Down Expand Up @@ -102,20 +103,15 @@ export class ActiveWorkflows {

if (pollingNodes.length === 0) return;

this.activeWorkflows[workflowId].pollResponses = [];

for (const pollNode of pollingNodes) {
try {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.activeWorkflows[workflowId].pollResponses!.push(
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
),
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
Expand All @@ -138,7 +134,7 @@ export class ActiveWorkflows {
getPollFunctions: IGetExecutePollFunctions,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): Promise<IPollResponse> {
): Promise<void> {
const pollFunctions = getPollFunctions(workflow, node, additionalData, mode, activation);

const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
Expand All @@ -161,7 +157,7 @@ export class ActiveWorkflows {
pollFunctions.__emit(pollResponse);
}
} catch (error) {
// If the poll function failes in the first activation
// If the poll function fails in the first activation
// throw the error back so we let the user know there is
// an issue with the trigger.
if (testingTrigger) {
Expand All @@ -174,11 +170,6 @@ export class ActiveWorkflows {
// Execute the trigger directly to be able to know if it works
await executeTrigger(true);

const timezone = pollFunctions.getTimezone();

// Start the cron-jobs
const cronJobs: CronJob[] = [];

for (const cronTime of cronTimes) {
const cronTimeParts = cronTime.split(' ');
if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
Expand All @@ -187,19 +178,8 @@ export class ActiveWorkflows {
);
}

cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone));
}

// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
this.scheduledTaskManager.registerCron(workflow, cronTime, executeTrigger);
}

return {
closeFunction,
};
}

/**
Expand All @@ -211,14 +191,11 @@ export class ActiveWorkflows {
return false;
}

const w = this.activeWorkflows[workflowId];
this.scheduledTaskManager.deregisterCrons(workflowId);

const w = this.activeWorkflows[workflowId];
for (const r of w.triggerResponses ?? []) {
await this.close(r, workflowId, 'trigger');
}

for (const r of w.pollResponses ?? []) {
await this.close(r, workflowId, 'poller');
await this.closeTrigger(r, workflowId);
}

delete this.activeWorkflows[workflowId];
Expand All @@ -232,11 +209,7 @@ export class ActiveWorkflows {
}
}

private async close(
response: ITriggerResponse | IPollResponse,
workflowId: string,
target: 'trigger' | 'poller',
) {
private async closeTrigger(response: ITriggerResponse, workflowId: string) {
if (!response.closeFunction) return;

try {
Expand All @@ -246,14 +219,14 @@ export class ActiveWorkflows {
Logger.error(
`There was a problem calling "closeFunction" on "${e.node.name}" in workflow "${workflowId}"`,
);
ErrorReporter.error(e, { extra: { target, workflowId } });
ErrorReporter.error(e, { extra: { workflowId } });
return;
}

const error = e instanceof Error ? e : new Error(`${e}`);

throw new WorkflowDeactivationError(
`Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`,
`Failed to deactivate trigger of workflow ID "${workflowId}": "${error.message}"`,
{ cause: error, workflowId },
);
}
Expand Down
2 changes: 0 additions & 2 deletions packages/core/src/Interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type {
IPollResponse,
ITriggerResponse,
IWorkflowSettings as IWorkflowSettingsWorkflow,
ValidationResult,
Expand All @@ -18,7 +17,6 @@ export interface IWorkflowSettings extends IWorkflowSettingsWorkflow {
}

export interface IWorkflowData {
pollResponses?: IPollResponse[];
triggerResponses?: ITriggerResponse[];
}

Expand Down
22 changes: 13 additions & 9 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import type {
INodeParameters,
EnsureTypeOptions,
SSHTunnelFunctions,
SchedulingFunctions,
} from 'n8n-workflow';
import {
ExpressionError,
Expand All @@ -114,7 +115,6 @@ import {
createDeferredPromise,
deepCopy,
fileTypeFromMimeType,
getGlobalState,
isObjectEmpty,
isResourceMapperValue,
validateFieldType,
Expand Down Expand Up @@ -157,6 +157,7 @@ import Container from 'typedi';
import type { BinaryData } from './BinaryData/types';
import merge from 'lodash/merge';
import { InstanceSettings } from './InstanceSettings';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { SSHClientsManager } from './SSHClientsManager';
import { binaryToBuffer } from './BinaryData/utils';

Expand Down Expand Up @@ -2585,13 +2586,6 @@ export function getNodeWebhookUrl(
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id, node, path.toString(), isFullPath);
}

/**
* Returns the timezone for the workflow
*/
export function getTimezone(workflow: Workflow): string {
return workflow.settings.timezone ?? getGlobalState().defaultTimezone;
}

/**
* Returns the full webhook description of the webhook with the given name
*
Expand Down Expand Up @@ -2957,7 +2951,7 @@ const getCommonWorkflowFunctions = (
getRestApiUrl: () => additionalData.restApiUrl,
getInstanceBaseUrl: () => additionalData.instanceBaseUrl,
getInstanceId: () => Container.get(InstanceSettings).instanceId,
getTimezone: () => getTimezone(workflow),
getTimezone: () => workflow.timezone,
getCredentialsProperties: (type: string) =>
additionalData.credentialsHelper.getCredentialsProperties(type),
prepareOutputData: async (outputData) => [outputData],
Expand Down Expand Up @@ -3286,6 +3280,14 @@ const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
await Container.get(SSHClientsManager).getClient(credentials),
});

const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cronExpression, onTick) =>
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
};
};

const getAllowedPaths = () => {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
if (!restrictFileAccessTo) {
Expand Down Expand Up @@ -3489,6 +3491,7 @@ export function getExecutePollFunctions(
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
Expand Down Expand Up @@ -3553,6 +3556,7 @@ export function getExecuteTriggerFunctions(
...getSSHTunnelFunctions(),
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
Expand Down
31 changes: 31 additions & 0 deletions packages/core/src/ScheduledTaskManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';
import type { CronExpression, Workflow } from 'n8n-workflow';

@Service()
export class ScheduledTaskManager {
readonly cronJobs = new Map<string, CronJob[]>();

registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
const cronJob = new CronJob(cronExpression, onTick, undefined, true, workflow.timezone);
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
if (cronJobsForWorkflow) {
cronJobsForWorkflow.push(cronJob);
} else {
this.cronJobs.set(workflow.id, [cronJob]);
}
}

deregisterCrons(workflowId: string) {
const cronJobs = this.cronJobs.get(workflowId) ?? [];
for (const cronJob of cronJobs) {
cronJob.stop();
}
}

deregisterAllCrons() {
for (const workflowId of Object.keys(this.cronJobs)) {
this.deregisterCrons(workflowId);
}
}
}
54 changes: 54 additions & 0 deletions packages/core/test/ScheduledTaskManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Workflow } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';

import { ScheduledTaskManager } from '@/ScheduledTaskManager';

describe('ScheduledTaskManager', () => {
const workflow = mock<Workflow>({ timezone: 'GMT' });
const everyMinute = '0 * * * * *';
const onTick = jest.fn();

let scheduledTaskManager: ScheduledTaskManager;

beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
scheduledTaskManager = new ScheduledTaskManager();
});

it('should throw when workflow timezone is invalid', () => {
expect(() =>
scheduledTaskManager.registerCron(
mock<Workflow>({ timezone: 'somewhere' }),
everyMinute,
onTick,
),
).toThrow('Invalid timezone.');
});

it('should throw when cron expression is invalid', () => {
expect(() =>
//@ts-expect-error invalid cron expression is a type-error
scheduledTaskManager.registerCron(workflow, 'invalid-cron-expression', onTick),
).toThrow();
});

it('should register valid CronJobs', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);

expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).toHaveBeenCalledTimes(10);
});

it('should deregister CronJobs for a workflow', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.deregisterCrons(workflow.id);

expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).not.toHaveBeenCalled();
});
});
24 changes: 3 additions & 21 deletions packages/nodes-base/nodes/Cron/Cron.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import type {
} from 'n8n-workflow';
import { NodeHelpers, toCronExpression } from 'n8n-workflow';

import { CronJob } from 'cron';

export class Cron implements INodeType {
description: INodeTypeDescription = {
displayName: 'Cron',
Expand Down Expand Up @@ -66,27 +64,11 @@ export class Cron implements INodeType {
this.emit([this.helpers.returnJsonArray([{}])]);
};

const timezone = this.getTimezone();

// Start the cron-jobs
const cronJobs = cronTimes.map(
(cronTime) => new CronJob(cronTime, executeTrigger, undefined, true, timezone),
);

// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
}

async function manualTriggerFunction() {
executeTrigger();
}
// Register the cron-jobs
cronTimes.forEach((cronTime) => this.helpers.registerCron(cronTime, executeTrigger));

return {
closeFunction,
manualTriggerFunction,
manualTriggerFunction: async () => executeTrigger(),
};
}
}
Loading

0 comments on commit 09f2cf9

Please sign in to comment.