Skip to content

Commit

Permalink
Log bucket processor task RequestId
Browse files Browse the repository at this point in the history
The requestId of the task is now logged, to allow listing all requests
associated with a specific task.

In addition, the requestId is added to the bucket topic entries, in
order to allow following the chain of events.

Issue: BB-439
  • Loading branch information
francoisferrand committed Oct 13, 2023
1 parent b7a55bb commit 953ec82
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 6 deletions.
11 changes: 7 additions & 4 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,13 @@ class LifecycleConductor {
});
}

_taskToMessage(task, taskVersion) {
_taskToMessage(task, taskVersion, log) {
return {
message: JSON.stringify({
action: 'processObjects',
contextInfo: {
reqId: log.getSerializedUids(),
},
target: {
bucket: task.bucketName,
owner: task.canonicalId,
Expand Down Expand Up @@ -308,17 +311,17 @@ class LifecycleConductor {

_createBucketTaskMessages(tasks, log, cb) {
if (this.lcConfig.forceLegacyListing) {
return process.nextTick(cb, null, tasks.map(t => this._taskToMessage(t, lifecycleTaskVersions.v1)));
return process.nextTick(cb, null, tasks.map(t => this._taskToMessage(t, lifecycleTaskVersions.v1, log)));
}

return async.mapLimit(tasks, 10, (t, taskDone) =>
this._indexesGetOrCreate(t, log, (err, taskVersion) => {
if (err) {
// should not happen as indexes methods would
// ignore the errors and fallback to v1 listing
return taskDone(null, this._taskToMessage(t, lifecycleTaskVersions.v1));
return taskDone(null, this._taskToMessage(t, lifecycleTaskVersions.v1, log));
}
return taskDone(null, this._taskToMessage(t, taskVersion));
return taskDone(null, this._taskToMessage(t, taskVersion, log));
}), cb);
}

Expand Down
13 changes: 12 additions & 1 deletion extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class LifecycleTask extends BackbeatTask {
}

const entry = Object.assign({}, bucketData, {
contextInfo: { reqId: log.getSerializedUids() },
details: { marker },
});
this._sendBucketEntry(entry, err => {
Expand Down Expand Up @@ -385,6 +386,9 @@ class LifecycleTask extends BackbeatTask {
// Uses last version whether Version or DeleteMarker
const last = allVersions[allVersions.length - 1];
const entry = Object.assign({}, bucketData, {
contextInfo: {
reqId: log.getSerializedUids(),
},
details: {
keyMarker: data.NextKeyMarker,
versionIdMarker: data.NextVersionIdMarker,
Expand Down Expand Up @@ -468,6 +472,9 @@ class LifecycleTask extends BackbeatTask {
// re-queue to kafka with `NextUploadIdMarker` &
// `NextKeyMarker` only once.
const entry = Object.assign({}, bucketData, {
contextInfo: {
reqId: log.getSerializedUids(),
},
details: {
keyMarker: data.NextKeyMarker,
uploadIdMarker: data.NextUploadIdMarker,
Expand Down Expand Up @@ -1759,6 +1766,8 @@ class LifecycleTask extends BackbeatTask {
log.debug('processing bucket entry', {
bucket: bucketData.target.bucket,
owner: bucketData.target.owner,
contextInfo: bucketData.contextInfo,
details: bucketData.details,
});

// Initially, processing a Bucket entry should check mpu AND
Expand Down Expand Up @@ -1826,10 +1835,12 @@ class LifecycleTask extends BackbeatTask {
], cb);
},
], err => {
this.log.info('finished processing task for bucket lifecycle', {
log.info('finished processing task for bucket lifecycle', {
method: 'LifecycleTask.processBucketEntry',
bucket: bucketData.target.bucket,
owner: bucketData.target.owner,
contextInfo: bucketData.contextInfo,
details: bucketData.details,
});
// An optimization is possible by only publishing when
// finishing a complete bucket listing, let it aside for
Expand Down
3 changes: 3 additions & 0 deletions extensions/lifecycle/tasks/LifecycleTaskV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class LifecycleTaskV2 extends LifecycleTask {
} = l;

const entry = Object.assign({}, bucketData, {
contextInfo: { requestId: log.getSerializedUids() },
details: { beforeDate, prefix, listType, storageClass },
});

Expand Down Expand Up @@ -114,6 +115,7 @@ class LifecycleTaskV2 extends LifecycleTask {
// re-queue truncated listing only once.
if (isTruncated && nbRetries === 0) {
const entry = Object.assign({}, bucketData, {
contextInfo: { requestId: log.getSerializedUids() },
details: {
beforeDate: params.BeforeDate,
prefix: params.Prefix,
Expand Down Expand Up @@ -197,6 +199,7 @@ class LifecycleTaskV2 extends LifecycleTask {
// re-queue truncated listing only once.
if (isTruncated && nbRetries === 0) {
const entry = Object.assign({}, bucketData, {
contextInfo: { requestId: log.getSerializedUids() },
details: {
beforeDate: params.BeforeDate,
prefix: params.Prefix,
Expand Down
6 changes: 6 additions & 0 deletions tests/functional/lifecycle/LifecycleConductor.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ const expected2Messages = [
{
value: {
action: 'processObjects',
contextInfo: { reqId: 'test-request-id' },
target: { bucket: 'bucket1', owner: 'owner1', taskVersion: 'v1' },
details: {},
},
},
{
value: {
action: 'processObjects',
contextInfo: { reqId: 'test-request-id' },
target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: 'v1' },
details: {},
},
Expand All @@ -58,27 +60,31 @@ const expected4Messages = [
{
value: {
action: 'processObjects',
contextInfo: { reqId: 'test-request-id' },
target: { bucket: 'bucket1', owner: 'owner1', taskVersion: 'v1' },
details: {},
},
},
{
value: {
action: 'processObjects',
contextInfo: { reqId: 'test-request-id' },
target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: 'v1' },
details: {},
},
},
{
value: {
action: 'processObjects',
contextInfo: { reqId: 'test-request-id' },
target: { bucket: 'bucket3', owner: 'owner3', taskVersion: 'v1' },
details: {},
},
},
{
value: {
action: 'processObjects',
contextInfo: { reqId: 'test-request-id' },
target: { bucket: 'bucket4', owner: 'owner4', taskVersion: 'v1' },
details: {},
},
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/lifecycle/LifecycleConductor.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ describe('Lifecycle Conductor', () => {
this.timeout(4000);
// tests that `activeIndexingJobRetrieved` is not reset until the e
it('should not reset `activeIndexingJobsRetrieved` while async operations are in progress', done => {
let order = [];
const order = [];

conductor._mongodbClient = { getIndexingJobs: () => {} };
conductor._producer = { send: (msg, cb) => cb() };
Expand Down
7 changes: 7 additions & 0 deletions tests/utils/BackbeatTestConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ class BackbeatTestConsumer extends BackbeatConsumer {
const parsedMsg = typeof expectedMsg.value === 'object' ?
JSON.parse(message.value) :
message.value.toString();
if (typeof expectedMsg.value === 'object' &&
expectedMsg.value.contextInfo?.reqId === 'test-request-id') {
// RequestId is generated randomly, we can't compare it: just check that it is
// present
assert(parsedMsg.contextInfo?.reqId, 'expected contextInfo.reqId field');
parsedMsg.contextInfo.reqId = expectedMsg.value.contextInfo?.reqId;
}
assert.deepStrictEqual(
parsedMsg, expectedMsg.value,
`unexpected message value ${parsedMsg}, ` +
Expand Down

0 comments on commit 953ec82

Please sign in to comment.