From 953ec820fd7044da3668f2c395a5fe3519ce86a5 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Thu, 12 Oct 2023 14:42:10 +0200 Subject: [PATCH] Log bucket processor task `RequestId` The requestId of the task is now logged, to allow listing all requests associated with a specific task. In addition, the requestId is added to the bucket topic entries, in order to allow following the chain of events. Issue: BB-439 --- .../lifecycle/conductor/LifecycleConductor.js | 11 +++++++---- extensions/lifecycle/tasks/LifecycleTask.js | 13 ++++++++++++- extensions/lifecycle/tasks/LifecycleTaskV2.js | 3 +++ .../functional/lifecycle/LifecycleConductor.spec.js | 6 ++++++ tests/unit/lifecycle/LifecycleConductor.spec.js | 2 +- tests/utils/BackbeatTestConsumer.js | 7 +++++++ 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index 1f17c95ba..79b29ee77 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -263,10 +263,13 @@ class LifecycleConductor { }); } - _taskToMessage(task, taskVersion) { + _taskToMessage(task, taskVersion, log) { return { message: JSON.stringify({ action: 'processObjects', + contextInfo: { + reqId: log.getSerializedUids(), + }, target: { bucket: task.bucketName, owner: task.canonicalId, @@ -308,7 +311,7 @@ class LifecycleConductor { _createBucketTaskMessages(tasks, log, cb) { if (this.lcConfig.forceLegacyListing) { - return process.nextTick(cb, null, tasks.map(t => this._taskToMessage(t, lifecycleTaskVersions.v1))); + return process.nextTick(cb, null, tasks.map(t => this._taskToMessage(t, lifecycleTaskVersions.v1, log))); } return async.mapLimit(tasks, 10, (t, taskDone) => @@ -316,9 +319,9 @@ class LifecycleConductor { if (err) { // should not happen as indexes methods would // ignore the errors and fallback to v1 listing - return taskDone(null, this._taskToMessage(t, lifecycleTaskVersions.v1)); + return taskDone(null, this._taskToMessage(t, lifecycleTaskVersions.v1, log)); } - return taskDone(null, this._taskToMessage(t, taskVersion)); + return taskDone(null, this._taskToMessage(t, taskVersion, log)); }), cb); } diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index 7208d29c9..a0ad30714 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -213,6 +213,7 @@ class LifecycleTask extends BackbeatTask { } const entry = Object.assign({}, bucketData, { + contextInfo: { reqId: log.getSerializedUids() }, details: { marker }, }); this._sendBucketEntry(entry, err => { @@ -385,6 +386,9 @@ class LifecycleTask extends BackbeatTask { // Uses last version whether Version or DeleteMarker const last = allVersions[allVersions.length - 1]; const entry = Object.assign({}, bucketData, { + contextInfo: { + reqId: log.getSerializedUids(), + }, details: { keyMarker: data.NextKeyMarker, versionIdMarker: data.NextVersionIdMarker, @@ -468,6 +472,9 @@ class LifecycleTask extends BackbeatTask { // re-queue to kafka with `NextUploadIdMarker` & // `NextKeyMarker` only once. const entry = Object.assign({}, bucketData, { + contextInfo: { + reqId: log.getSerializedUids(), + }, details: { keyMarker: data.NextKeyMarker, uploadIdMarker: data.NextUploadIdMarker, @@ -1759,6 +1766,8 @@ class LifecycleTask extends BackbeatTask { log.debug('processing bucket entry', { bucket: bucketData.target.bucket, owner: bucketData.target.owner, + contextInfo: bucketData.contextInfo, + details: bucketData.details, }); // Initially, processing a Bucket entry should check mpu AND @@ -1826,10 +1835,12 @@ class LifecycleTask extends BackbeatTask { ], cb); }, ], err => { - this.log.info('finished processing task for bucket lifecycle', { + log.info('finished processing task for bucket lifecycle', { method: 'LifecycleTask.processBucketEntry', bucket: bucketData.target.bucket, owner: bucketData.target.owner, + contextInfo: bucketData.contextInfo, + details: bucketData.details, }); // An optimization is possible by only publishing when // finishing a complete bucket listing, let it aside for diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index 94a87319f..5eba2e86a 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -52,6 +52,7 @@ class LifecycleTaskV2 extends LifecycleTask { } = l; const entry = Object.assign({}, bucketData, { + contextInfo: { requestId: log.getSerializedUids() }, details: { beforeDate, prefix, listType, storageClass }, }); @@ -114,6 +115,7 @@ class LifecycleTaskV2 extends LifecycleTask { // re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { + contextInfo: { requestId: log.getSerializedUids() }, details: { beforeDate: params.BeforeDate, prefix: params.Prefix, @@ -197,6 +199,7 @@ class LifecycleTaskV2 extends LifecycleTask { // re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { + contextInfo: { requestId: log.getSerializedUids() }, details: { beforeDate: params.BeforeDate, prefix: params.Prefix, diff --git a/tests/functional/lifecycle/LifecycleConductor.spec.js b/tests/functional/lifecycle/LifecycleConductor.spec.js index d1b822be4..47ff158f0 100644 --- a/tests/functional/lifecycle/LifecycleConductor.spec.js +++ b/tests/functional/lifecycle/LifecycleConductor.spec.js @@ -41,6 +41,7 @@ const expected2Messages = [ { value: { action: 'processObjects', + contextInfo: { reqId: 'test-request-id' }, target: { bucket: 'bucket1', owner: 'owner1', taskVersion: 'v1' }, details: {}, }, @@ -48,6 +49,7 @@ const expected2Messages = [ { value: { action: 'processObjects', + contextInfo: { reqId: 'test-request-id' }, target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: 'v1' }, details: {}, }, @@ -58,6 +60,7 @@ const expected4Messages = [ { value: { action: 'processObjects', + contextInfo: { reqId: 'test-request-id' }, target: { bucket: 'bucket1', owner: 'owner1', taskVersion: 'v1' }, details: {}, }, @@ -65,6 +68,7 @@ const expected4Messages = [ { value: { action: 'processObjects', + contextInfo: { reqId: 'test-request-id' }, target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: 'v1' }, details: {}, }, @@ -72,6 +76,7 @@ const expected4Messages = [ { value: { action: 'processObjects', + contextInfo: { reqId: 'test-request-id' }, target: { bucket: 'bucket3', owner: 'owner3', taskVersion: 'v1' }, details: {}, }, @@ -79,6 +84,7 @@ const expected4Messages = [ { value: { action: 'processObjects', + contextInfo: { reqId: 'test-request-id' }, target: { bucket: 'bucket4', owner: 'owner4', taskVersion: 'v1' }, details: {}, }, diff --git a/tests/unit/lifecycle/LifecycleConductor.spec.js b/tests/unit/lifecycle/LifecycleConductor.spec.js index c8ccb408e..8f4b61c0e 100644 --- a/tests/unit/lifecycle/LifecycleConductor.spec.js +++ b/tests/unit/lifecycle/LifecycleConductor.spec.js @@ -50,7 +50,7 @@ describe('Lifecycle Conductor', () => { this.timeout(4000); // tests that `activeIndexingJobRetrieved` is not reset until the e it('should not reset `activeIndexingJobsRetrieved` while async operations are in progress', done => { - let order = []; + const order = []; conductor._mongodbClient = { getIndexingJobs: () => {} }; conductor._producer = { send: (msg, cb) => cb() }; diff --git a/tests/utils/BackbeatTestConsumer.js b/tests/utils/BackbeatTestConsumer.js index ea412237a..c5ec0869c 100644 --- a/tests/utils/BackbeatTestConsumer.js +++ b/tests/utils/BackbeatTestConsumer.js @@ -26,6 +26,13 @@ class BackbeatTestConsumer extends BackbeatConsumer { const parsedMsg = typeof expectedMsg.value === 'object' ? JSON.parse(message.value) : message.value.toString(); + if (typeof expectedMsg.value === 'object' && + expectedMsg.value.contextInfo?.reqId === 'test-request-id') { + // RequestId is generated randomly, we can't compare it: just check that it is + // present + assert(parsedMsg.contextInfo?.reqId, 'expected contextInfo.reqId field'); + parsedMsg.contextInfo.reqId = expectedMsg.value.contextInfo?.reqId; + } assert.deepStrictEqual( parsedMsg, expectedMsg.value, `unexpected message value ${parsedMsg}, ` +