Skip to content

Commit

Permalink
Feat/graceful shutdown (#7449)
Browse files Browse the repository at this point in the history
* skip delay time between tasks if stop method is executed

* skip sleep if stop method is executed

* add stop method to services

* add graceful shutdown to workers

* remove test code

* skip delay time if stop method is executed before the task finishes

* refactor test

* make sure to unlock after stop

* log warning message

* set stop timeout to 10 seconds

* fix test case

* add namespace to logs on worker

---------

Co-authored-by: Joan Gallego Girona <[email protected]>
  • Loading branch information
Joao-vi and daneryl authored Nov 22, 2024
1 parent 6f44268 commit 372d653
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 47 deletions.
14 changes: 12 additions & 2 deletions app/api/log.v2/infrastructure/StandardLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,15 @@ class StandardLogger implements Logger {
}

/** Creates a StandardLogger that writes through `writer` (UwaziJSONWriter by default) for the tenant returned by getTenant() at call time. */
const DefaultLogger = (writer = UwaziJSONWriter) => new StandardLogger(writer, getTenant());

export { StandardLogger, DefaultLogger };
/**
 * Creates a StandardLogger that is built with a fixed placeholder tenant
 * rather than one obtained from getTenant().
 *
 * @param writer - log writer handed to StandardLogger; defaults to UwaziJSONWriter.
 * @returns a new StandardLogger instance on every call.
 */
const SystemLogger = (writer = UwaziJSONWriter) => {
  // Every tenant field other than the name carries the same placeholder value.
  const notApplicable = 'N/a';
  const systemTenant = {
    name: 'System Logger',
    dbName: notApplicable,
    activityLogs: notApplicable,
    attachments: notApplicable,
    customUploads: notApplicable,
    indexName: notApplicable,
    uploadedDocuments: notApplicable,
  };

  return new StandardLogger(writer, systemTenant);
};

export { StandardLogger, DefaultLogger, SystemLogger };
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class InformationExtraction {
this.taskManager.subscribeToResults();
}

/** Stops this service by awaiting the stop of its task manager. */
async stop() {
await this.taskManager.stop();
}

requestResults = async (message: InternalIXResultsMessage) => {
const response = await request.get(message.data_url);

Expand Down
4 changes: 4 additions & 0 deletions app/api/services/pdfsegmentation/PDFSegmentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class PDFSegmentation {
this.segmentationTaskManager.subscribeToResults();
}

/** Stops this service by awaiting the stop of its segmentation task manager. */
async stop() {
await this.segmentationTaskManager.stop();
}

segmentOnePdf = async (
file: { filename: string; _id: ObjectIdSchema },
serviceUrl: string,
Expand Down
18 changes: 13 additions & 5 deletions app/api/services/tasksmanager/DistributedLoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export class DistributedLoop {

private host: string;

// Assigned by waitBetweenTasks() while a delay is pending; calling it resolves that delay early and clears its timer.
private stopDelayTimeBetweenTasks?: () => void;

constructor(
lockName: string,
task: () => Promise<void>,
Expand Down Expand Up @@ -63,7 +65,11 @@ export class DistributedLoop {

async waitBetweenTasks(delay = this.delayTimeBetweenTasks) {
await new Promise(resolve => {
setTimeout(resolve, delay);
const timeout = setTimeout(resolve, delay);
this.stopDelayTimeBetweenTasks = () => {
resolve(undefined);
clearTimeout(timeout);
};
});
}

Expand All @@ -74,10 +80,13 @@ export class DistributedLoop {
handleError(error, { useContext: false });
}

await this.waitBetweenTasks();
if (!this.stopTask) {
await this.waitBetweenTasks();
}
}

async stop() {
if (this.stopDelayTimeBetweenTasks) this.stopDelayTimeBetweenTasks();
await new Promise(resolve => {
this.stopTask = resolve;
});
Expand All @@ -94,16 +103,15 @@ export class DistributedLoop {
);

if (this.stopTask) {
await lock.unlock();
this.stopTask();
return;
}

await this.runTask();
await lock.unlock();
} catch (error) {
if (error instanceof Error && error.name !== 'LockError') {
throw error;
}
if (error instanceof Error && error.name !== 'LockError') throw error;
}

// eslint-disable-next-line no-void
Expand Down
70 changes: 70 additions & 0 deletions app/api/services/tasksmanager/specs/distributedLoop.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as errorHelper from 'api/utils/handleError';
import Redis from 'redis';
import waitForExpect from 'wait-for-expect';
import { DistributedLoop } from '../DistributedLoop';

Expand Down Expand Up @@ -162,4 +163,73 @@ describe('DistributedLoopLock', () => {
finishTask();
await nodeTwo.stop();
});

// Scenario: the task has completed and the loop is already waiting between tasks when stop() is called.
it('when stop method is executed after task finish, it should skip delay time between tasks', async () => {
const sut = new DistributedLoop('skip_delay_time_2', task, {
// 100s delay: the test can only finish promptly if stop() cuts the wait short.
delayTimeBetweenTasks: 100_000,
});

const waitBetweenTasksSpy = jest.spyOn(sut, 'waitBetweenTasks');

sut.start();
await waitForExpect(() => expect(task).toHaveBeenCalledTimes(1));

// finishTask/sleepTime are defined outside this hunk; presumably they complete the pending task
// and give the loop a moment to enter its delay before stop() is requested - confirm against the spec header.
finishTask();
await sleepTime(25);
const stopPromise = sut.stop();

// The delay was entered (spy called) and stop() still resolves.
expect(waitBetweenTasksSpy).toHaveBeenCalled();
await expect(stopPromise).resolves.toBeUndefined();
});

// Scenario: stop() is requested while the task is still running, so the loop must not enter its delay at all.
test('when stop method is executed before a task finish, it should skip delay time between tasks', async () => {
const sut = new DistributedLoop('skip_delay_time_2', task, {
// 100s delay: the test can only finish promptly if the wait is skipped.
delayTimeBetweenTasks: 100_000,
});

const waitBetweenTasksSpy = jest.spyOn(sut, 'waitBetweenTasks');

sut.start();

await waitForExpect(() => expect(task).toHaveBeenCalledTimes(1));

// Order matters: stop() is called first, then the task is allowed to finish
// (finishTask is defined outside this hunk; presumably it completes the pending task).
const stopPromise = sut.stop();
finishTask();
await sleepTime(25);

// The delay was never started and stop() resolves.
expect(waitBetweenTasksSpy).not.toHaveBeenCalled();
await expect(stopPromise).resolves.toBeUndefined();
});

// Verifies that stopping the loop leaves no `locks:<lockName>` key behind in Redis.
test('when stop method is executed, it should unlock the Distributed Loop', async () => {
  const connectionConfig = { port: 6379, host: 'localhost' };
  const connection = Redis.createClient(
    `redis://${connectionConfig.host}:${connectionConfig.port}`
  );

  // Promise wrapper around the callback-style GET of the redis client.
  const get = key =>
    new Promise((resolve, reject) => {
      connection.get(key, (error, data) => {
        if (error) {
          reject(error);
          return;
        }
        resolve(data);
      });
    });

  try {
    const lockName = 'skip_delay_time_3';
    const sut = new DistributedLoop(lockName, task, {
      delayTimeBetweenTasks: 100_000,
    });

    sut.start();
    await waitForExpect(() => expect(task).toHaveBeenCalledTimes(1));

    // Request the stop while the task is still running, then let the task finish.
    const stopPromise = sut.stop();

    finishTask();
    await sleepTime(25);
    await expect(stopPromise).resolves.toBeUndefined();

    const result = await get(`locks:${lockName}`);
    expect(result).toBeFalsy();
  } finally {
    // Always close the Redis connection, even when an assertion above fails,
    // so a failing test does not leave an open handle behind.
    connection.quit();
  }
});
});
4 changes: 4 additions & 0 deletions app/api/services/twitterintegration/TwitterIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class TwitterIntegration {
this.twitterTaskManager.subscribeToResults();
}

/** Stops this service by awaiting the stop of its twitter task manager. */
async stop() {
await this.twitterTaskManager.stop();
}

getTwitterIntegrationSettings = async (): Promise<TwitterIntegrationSettingsType> => {
const settingsValues = await settings.get({}, 'features');
if (!settingsValues.features || !settingsValues.features.twitterIntegration) {
Expand Down
24 changes: 18 additions & 6 deletions app/api/utils/Repeater.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
const timeout = async interval =>
new Promise(resolve => {
setTimeout(resolve, interval);
});

export class Repeater {
stopSleep = undefined;

constructor(cb, interval) {
this.cb = cb;
this.interval = interval;
this.stopped = null;
}

async sleep() {
await new Promise(resolve => {
const timeout = setTimeout(resolve, this.interval);

this.stopSleep = () => {
resolve(undefined);
clearTimeout(timeout);
};
});
}

async start() {
while (!this.stopped) {
// eslint-disable-next-line no-await-in-loop
await this.cb();
// eslint-disable-next-line no-await-in-loop
await timeout(this.interval);
await this.sleep();
}

this.stopped();
}

async stop() {
if (this.stopSleep) {
this.stopSleep();
}

return new Promise(resolve => {
this.stopped = resolve;
});
Expand Down
17 changes: 17 additions & 0 deletions app/api/utils/specs/Repeater.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,21 @@ describe('Repeater', () => {

await expect(repeaterOne.stop()).resolves.toBeUndefined();
});

// The repeater interval (10s) is longer than the timeout passed to this test (5s),
// so the test can only pass if stop() cuts the sleep between executions short.
it('should skip interval between executions if stop method is executed', async () => {
let promise;
let resolvePromise;
// Callback whose completion is controlled from the test through resolvePromise.
const sut = new Repeater(() => {
promise = new Promise(resolve => {
resolvePromise = resolve;
});

return promise;
}, 10_000);

// start() invokes the callback before its first await, so resolvePromise is already assigned here.
sut.start();
resolvePromise();
await expect(promise).resolves.toBeUndefined();
await expect(sut.stop()).resolves.toBeUndefined();
}, 5_000);
});
85 changes: 51 additions & 34 deletions app/worker.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-statements */
import { DB } from 'api/odm';
import { config } from 'api/config';
import { tenants } from 'api/tenants';
Expand All @@ -12,8 +13,12 @@ import { syncWorker } from 'api/sync/syncWorker';
import { InformationExtraction } from 'api/services/informationextraction/InformationExtraction';
import { setupWorkerSockets } from 'api/socketio/setupSockets';
import { ConvertToPdfWorker } from 'api/services/convertToPDF/ConvertToPdfWorker';
import { handleError } from './api/utils/handleError.js';
import { ATServiceListener } from 'api/externalIntegrations.v2/automaticTranslation/adapters/driving/ATServiceListener';
import { SystemLogger } from 'api/log.v2/infrastructure/StandardLogger';
import { sleep } from 'shared/tsUtils';
import { handleError } from './api/utils/handleError.js';

const systemLogger = SystemLogger();

let dbAuth = {};

Expand Down Expand Up @@ -41,55 +46,67 @@ DB.connect(config.DBHOST, dbAuth)
await tenants.run(async () => {
permissionsContext.setCommandContext();

console.info('==> 📡 starting external services...');
ocrManager.start();
new ATServiceListener().start();
new InformationExtraction().start();
systemLogger.info('[Worker] - ==> 📡 starting external services...');

new ConvertToPdfWorker().start();
const services: any[] = [
ocrManager,
new ATServiceListener(),
new InformationExtraction(),
new ConvertToPdfWorker(),
new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}),
new DistributedLoop('toc_service', async () => tocService.processAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}),
new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}),
];

const segmentationConnector = new PDFSegmentation();
segmentationConnector.start();

const segmentationRepeater = new DistributedLoop(
'segmentation_repeat',
segmentationConnector.segmentPdfs,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 }
);

// eslint-disable-next-line no-void
void segmentationRepeater.start();
services.push(segmentationConnector, segmentationRepeater);

const twitterIntegration = new TwitterIntegration();
twitterIntegration.start();
const twitterRepeater = new DistributedLoop(
'twitter_repeat',
twitterIntegration.addTweetsRequestsToQueue,
{ port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 }
);
services.push(twitterIntegration, twitterRepeater);

services.forEach(service => service.start());

process.on('SIGINT', async () => {
systemLogger.info(
'[Worker Graceful shutdown] - Received SIGINT, waiting for graceful stop...'
);

const stopPromises = Promise.all(services.map(async service => service.stop()));
const firstToFinish = await Promise.race([stopPromises, sleep(10_000)]);

if (Array.isArray(firstToFinish)) {
systemLogger.info('[Worker Graceful shutdown] - Services stopped successfully!');
} else {
systemLogger.info(
'[Worker Graceful shutdown] - Some services did not stop in time, initiating forceful shutdown...'
);
}

// eslint-disable-next-line no-void
void twitterRepeater.start();

// eslint-disable-next-line no-void
void new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}).start();

// eslint-disable-next-line no-void
void new DistributedLoop('toc_service', async () => tocService.processAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 30000,
}).start();

// eslint-disable-next-line no-void
void new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), {
port: config.redis.port,
host: config.redis.host,
delayTimeBetweenTasks: 1000,
}).start();
process.exit(0);
});
});
})
.catch(error => {
Expand Down

0 comments on commit 372d653

Please sign in to comment.