Skip to content

Commit

Permalink
OS-693: do not detect mpu bucket deletes and consistency changes
Browse files Browse the repository at this point in the history
  • Loading branch information
williamlardier committed Oct 13, 2023
1 parent a009457 commit 9ce6f16
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
37 changes: 22 additions & 15 deletions CountItemsV2/CountItems.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CountItems {

this.maxConcurrentBucketProcessing = config.maxConcurrentOperations || 10;
this.mongoDBSupportsPreImages = config.mongoDBSupportsPreImages || false;
this.lastModifiedLagSeconds = config.lastModifiedLagSeconds || 1;
this.lastModifiedLagSeconds = config.lastModifiedLagSeconds || 5;
this.refreshFrequencySeconds = config.refreshFrequencySeconds || 86400;
this.sleepDurationSecondsBetweenRounds = config.sleepDurationSecondsBetweenRounds || 2;

Expand Down Expand Up @@ -203,7 +203,8 @@ class CountItems {
await this.setCheckPoints();
// then compute all metrics and save them
await this.aggregateResults();
this.log.info(`Round completed in ${process.hrtime(startTime)[0]}s. Restarting in 2 seconds...`);
// TODO save the current pool progress in Redis with a TTL equal to this.refreshFrequencySeconds
this.log.info(`Round completed in ${process.hrtime(startTime)[0]}s. Restarting in ${this.sleepDurationSecondsBetweenRounds * 1000} seconds...`);
// Sleep between two rounds to avoid overloading the cluster
await new Promise(r => setTimeout(r, this.sleepDurationSecondsBetweenRounds * 1000));
// Periodically flush all data according to the configuration
Expand Down Expand Up @@ -338,16 +339,17 @@ class CountItems {
* @returns {Promise} - resolves to the checkpoint value
*/
getCheckpoint(bucketName) {
this.log.info(`Getting checkpoint for bucket ${bucketName}.`);
const bucketNameForCheckpoint = bucketName.split('_')[0];
this.log.info(`Getting checkpoint for bucket ${bucketNameForCheckpoint}.`);
const collection = this.db.getCollection(METASTORE_COLLECTION);
// find the document whose _id matches the bucket name
// and get the property `metrics_checkpoint` as a date string
return new Promise((resolve, reject) => collection.findOne({ _id: bucketName }, (err, doc) => {
return new Promise((resolve, reject) => collection.findOne({ _id: bucketNameForCheckpoint }, (err, doc) => {
if (err) {
// by default, we restart from scratch, in case of error
this.log.error('Error while retrieving checkpoint', {
error: err,
bucketName,
bucketName: bucketNameForCheckpoint,
});
return resolve(0);
}
Expand Down Expand Up @@ -426,6 +428,7 @@ class CountItems {
let lastSyncedTimestamp = new Date();
lastSyncedTimestamp.setSeconds(lastSyncedTimestamp.getSeconds() - this.lastModifiedLagSeconds);
lastSyncedTimestamp = lastSyncedTimestamp.toISOString();
const bucketNameForCheckpoint = bucketName.split('_')[0];

const result = await new Promise((resolve, reject) => {
// Step 1: Setup collection and checkpoint
Expand All @@ -437,7 +440,7 @@ class CountItems {
});
return reject(new Error('Bucket not found in pool'));
}
const collection = this.db.getCollection(bucketName.split('_')[0]);
const collection = this.db.getCollection(bucketNameForCheckpoint);

// Step 2: Set the aggregation filter
let filter = {
Expand Down Expand Up @@ -535,7 +538,7 @@ class CountItems {
_id: {
$ifNull: [
{ $ifNull: ['$coldLocation', '$locationsNames'] },
`${INTERNAL_NULL_LOCATION}_${'$dataStoreName'}`,
{ $concat: [INTERNAL_NULL_LOCATION, '_', '$dataStoreName'] },
],
},
masterData: { $sum: { $multiply: ['$isMaster', '$contentLength'] } },
Expand Down Expand Up @@ -567,7 +570,7 @@ class CountItems {
metrics[_metricsForLocation._id].deleteMarkerCount = _metricsForLocation.deleteMarkerCount || 0;
});

this.bulkedCheckpoints[bucketName] = lastSyncedTimestamp;
this.bulkedCheckpoints[bucketNameForCheckpoint] = lastSyncedTimestamp;

return new Promise(resolve => resolve({
accountName,
Expand Down Expand Up @@ -690,7 +693,13 @@ class CountItems {
if (!Object.hasOwn(result.metrics, location)) {
continue;
}
const objectId = locationConfig[location] ? locationConfig[location].objectId : null;
let realLocation = location;
// This code is not (yet) useful but helps detecting delete markers, if custom
// logic is needed here
if (location.startsWith(INTERNAL_NULL_LOCATION)) {
realLocation = location.replace(`${INTERNAL_NULL_LOCATION}_`, '');
}
const objectId = locationConfig[realLocation] ? locationConfig[realLocation].objectId : null;
// No location config must be ignored
if (!objectId && !location.startsWith(INTERNAL_NULL_LOCATION)) {
this.log.warn('No location config found for location', {
Expand All @@ -699,12 +708,6 @@ class CountItems {
});
continue;
}
let realLocation = location;
// This code is not (yet) useful but helps detecting delete markers, if custom
// logic is needed here
if (location.startsWith(INTERNAL_NULL_LOCATION)) {
realLocation = location.replace(`${INTERNAL_NULL_LOCATION}_`, '');
}
// Initialize metrics object for objectId if it doesn't exist
if (!this.pool[bucketName].metrics[objectId]) {
this.pool[bucketName].metrics[objectId] = {};
Expand Down Expand Up @@ -858,6 +861,10 @@ class CountItems {
this.log.debug('Change stream event', {
change,
});
if (change.ns.coll && change.ns.coll.startsWith('mpuShadowBucket')) {
// ignore multipart uploads
return;
}
// find the pool entry that is starting with change.ns.coll_
const bucketNameForMetrics = Object.keys(this.pool).find(bucketName => bucketName.startsWith(`${change.ns.coll}_`));
// ignore unknown buckets: they are yet to be processed
Expand Down
2 changes: 1 addition & 1 deletion CountItemsV2/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const config = {
maxRetries: MAX_CONNECT_RETRIES,
maxConcurrentOperations: MAX_CONCURRENT_OPERATIONS,
mongoDBSupportsPreImages: process.env.MONGODB_SUPPORTS_PREIMAGES === 'true',
lastModifiedLagSeconds: Number.parseInt(process.env.LAST_MODIFIED_LAG_SECONDS, 10) || 0,
lastModifiedLagSeconds: Number.parseInt(process.env.LAST_MODIFIED_LAG_SECONDS, 10) || 5,
refreshFrequencySeconds: Number.parseInt(process.env.REFRESH_FREQUENCY_SECONDS, 10) || 86400,
sleepDurationSecondsBetweenRounds: Number.parseInt(process.env.SLEEP_DURATION_SECONDS_BETWEEN_ROUNDS, 10) || 2,
};
Expand Down

0 comments on commit 9ce6f16

Please sign in to comment.