From 60d5798e04b2bf8eb4bdf6a06a9390810084b914 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Thu, 26 Dec 2024 15:14:28 +0100 Subject: [PATCH] Make OOB logic safer Instead of rebuilding the whole metadata on updates, simply update the required fields, i.e. tags. This also fixes subtle issues, where some fields from Zenko may be overwritten when some operations are performed on s3c. Issue: BB-590 --- .../mongoProcessor/MongoQueueProcessor.js | 61 ++------- .../ingestion/MongoQueueProcessor.js | 129 ++++++++++-------- 2 files changed, 85 insertions(+), 105 deletions(-) diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 7adcd9135..593a6291a 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -233,7 +233,7 @@ class MongoQueueProcessor { /** * get dataStoreVersionId, if exists - * @param {Object} objMd - object md fetched from mongo + * @param {ObjectMDData} objMd - object md fetched from mongo * @param {String} site - storage location name * @return {String} dataStoreVersionId */ @@ -508,46 +508,6 @@ class MongoQueueProcessor { return done(err); } - // If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id. - // This should only happen for objects restored onto the OOB location, and the location - // should match in that case - if (scalVersionId) { - if (!zenkoObjMd) { - this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', { - method: 'MongoQueueProcessor._processObjectQueueEntry', - location, - }); - // This may happen if the object has been deleted from Zenko, but we processed - // the create/update event from oplog after the object was deleted. We can - // proceed normally and create the entry, as we will get a followup "delete" - // operation and will thus eventually be consistent. - } else if (zenkoObjMd.location?.length !== 1 || - zenkoObjMd.location[0].dataStoreName !== location || - zenkoObjMd.location[0].dataStoreVersionId !== sourceEntry.getVersionId()) { - this.logger.warn('mismatched source entry, skipping entry', { - method: 'MongoQueueProcessor._processObjectQueueEntry', - location, - }); - - // If the versionId does not match, it mean the object metadata has been updated - // in Zenko already, and we are thus processing an outdated oplog entry: which - // should be ignored. Not much of an issue though, as we have retrieved Zenko - // object, so `getContentType()` will pickup tag changes only. - } else { - this.logger.info('restored oob object', { - bucket, key, scalVersionId, zenkoObjMd, sourceEntry - }); - - sourceEntry.setVersionId(scalVersionId); - - // TODO: do we need to update the (mongo) metadata in that case??? - // - This may happen if object is re-tagged while restored? - // - Need to cleanup scal version id: delete objVal['x-amz-meta-scal-version-id']; - // - Need to keep the archive & restore fields in the metadata - return done(); - } - } - const content = getContentType(sourceEntry, zenkoObjMd); if (content.length === 0) { this._normalizePendingMetric(location); @@ -560,11 +520,20 @@ class MongoQueueProcessor { return done(); } - // update necessary metadata fields before saving to Zenko MongoDB - this._updateOwnerMD(sourceEntry, bucketInfo); - this._updateObjectDataStoreName(sourceEntry, location); - this._updateLocations(sourceEntry, location); - this._updateAcl(sourceEntry); + if (zenkoObjMd) { + // Keep existing metadata fields, only need to update the tags + const tags = sourceEntry.getTags(); + sourceEntry._data = { ...zenkoObjMd }; // eslint-disable-line no-param-reassign + sourceEntry.setTags(tags); + } else { + // Update necessary metadata fields before saving to Zenko MongoDB + this._updateOwnerMD(sourceEntry, bucketInfo); + this._updateObjectDataStoreName(sourceEntry, location); + this._updateLocations(sourceEntry, location); + this._updateAcl(sourceEntry); + } + + // Try to update replication info, if applicable this._updateReplicationInfo(sourceEntry, bucketInfo, content, zenkoObjMd); diff --git a/tests/functional/ingestion/MongoQueueProcessor.js b/tests/functional/ingestion/MongoQueueProcessor.js index e78858b4d..2c4ec48a3 100644 --- a/tests/functional/ingestion/MongoQueueProcessor.js +++ b/tests/functional/ingestion/MongoQueueProcessor.js @@ -16,6 +16,7 @@ const authdata = require('../../../conf/authdata.json'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const DeleteOpQueueEntry = require('../../../lib/models/DeleteOpQueueEntry'); const fakeLogger = require('../../utils/fakeLogger'); +const { ObjectMDArchive } = require('arsenal/build/lib/models'); const kafkaConfig = config.kafka; const mongoProcessorConfig = config.extensions.mongoProcessor; @@ -31,6 +32,12 @@ const VERSION_ID = '98445230573829999999RG001 15.144.0'; // new version id > existing version id const NEW_VERSION_ID = '98445235075994999999RG001 14.90.2'; +const mockArchive = new ObjectMDArchive( + { archiveId: '123456789' }, + Date.now() - 3600 * 1000, 1, + Date.now(), Date.now() + 23 * 3600 * 1000, +); + const mockReplicationInfo = { role: 'arn:aws:iam::root:role/s3-replication-role', destination: `arn:aws:s3:::${BUCKET}`, @@ -712,15 +719,29 @@ describe('MongoQueueProcessor', function mqp() { }); }); - it('should save to mongo a new version entry when scal-version-id does not match the data location', done => { + it('should not update restored entry', done => { const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`; const objmd = new ObjectMD() .setAcl() .setKey(KEY) - .setVersionId(NEW_VERSION_ID); - const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd) + .setVersionId(NEW_VERSION_ID) .setUserMetadata({ 'x-amz-meta-scal-version-id': encode(VERSION_ID) }); - const getObject = sinon.stub(mongoClient, 'getObject').callThrough(); + const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); + const getObject = sinon.stub(mongoClient, 'getObject').yields(null, + new ObjectMD() + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName(LOCATION) + .setAmzStorageClass('cold') + .setArchive(mockArchive) + .setLocation([{ + key: KEY, + start: 0, + size: 50, + dataStoreName: LOCATION, + dataStoreVersionId: NEW_VERSION_ID, + }])._data + ); async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), @@ -732,73 +753,42 @@ describe('MongoQueueProcessor', function mqp() { assert.ifError(err); sinon.assert.calledOnce(getObject); + assert.strictEqual(getObject.getCall(0).args[0], BUCKET); + assert.strictEqual(getObject.getCall(0).args[1], KEY); + assert.strictEqual(getObject.getCall(0).args[2].versionId, VERSION_ID); const added = mqp.getAdded(); - assert.strictEqual(added.length, 1); - const objVal = added[0].objVal; - assert.strictEqual(added[0].key, versionKey); - // key shall now be always populated - assert.deepStrictEqual(objVal.key, KEY); - // acl should reset - assert.deepStrictEqual(objVal.acl, new ObjectMD().getAcl()); - // owner md should update - assert.strictEqual(objVal['owner-display-name'], - authdata.accounts[0].name); - assert.strictEqual(objVal['owner-id'], - authdata.accounts[0].canonicalID); - // dataStoreName should update - assert.strictEqual(objVal.dataStoreName, LOCATION); - // locations should update, no data in object - assert.strictEqual(objVal.location.length, 1); - const loc = objVal.location[0]; - assert.strictEqual(loc.key, KEY); - assert.strictEqual(loc.size, 0); - assert.strictEqual(loc.start, 0); - assert.strictEqual(loc.dataStoreName, LOCATION); - assert.strictEqual(loc.dataStoreType, 'aws_s3'); - assert.strictEqual(decode(loc.dataStoreVersionId), - NEW_VERSION_ID); - - // replication info should be empty - const repInfo = objVal.replicationInfo; - assert.strictEqual(repInfo.status, ''); - assert.deepStrictEqual(repInfo.backends, []); - assert.deepStrictEqual(repInfo.content, []); - assert.strictEqual(repInfo.storageClass, ''); - assert.strictEqual(repInfo.storageType, ''); - assert.strictEqual(repInfo.dataStoreVersionId, ''); + assert.strictEqual(added.length, 0); done(); }); }); - it('should skip restored entry scal-version-id', done => { + it('should update tags on restored entry', done => { const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`; - const objmd = new ObjectMD() + const entry = new ObjectQueueEntry(BUCKET, versionKey, new ObjectMD() .setAcl() .setKey(KEY) - .setVersionId(NEW_VERSION_ID); - const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd) - .setUserMetadata({ 'x-amz-meta-scal-version-id': encode(VERSION_ID) }); - const getObject = sinon.stub(mongoClient, 'getObject').yields(null, - new ObjectMD() - .setVersionId(VERSION_ID) - .setTags({ mytag: 'mytags-value' }) - .setDataStoreName(LOCATION) - .setAmzStorageClass('cold') - .setLocation([{ - key: KEY, - start: 0, - size: 50, - dataStoreName: LOCATION, - dataStoreVersionId: NEW_VERSION_ID, - }])._data - ); + .setTags({ mytag: 'mytags-value' }) + .setVersionId(NEW_VERSION_ID) + .setUserMetadata({ 'x-amz-meta-scal-version-id': encode(VERSION_ID) })); + const objmd = new ObjectMD() + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName(LOCATION) + .setAmzStorageClass('cold') + .setArchive(mockArchive) + .setLocation([{ + key: KEY, + start: 0, + size: 50, + dataStoreName: LOCATION, + dataStoreVersionId: NEW_VERSION_ID, + }]); + const getObject = sinon.stub(mongoClient, 'getObject').yields(null, objmd.getValue()); async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), - (bucketInfo, next) => next(null, - bucketInfo.setReplicationConfiguration(null)), (bucketInfo, next) => mqp._processObjectQueueEntry(fakeLogger, entry, LOCATION, bucketInfo, next), ], err => { @@ -810,7 +800,28 @@ describe('MongoQueueProcessor', function mqp() { assert.strictEqual(getObject.getCall(0).args[2].versionId, VERSION_ID); const added = mqp.getAdded(); - assert.strictEqual(added.length, 0); + assert.strictEqual(added.length, 1); + + // Expect tags and replicationInfo to have been updated + const objVal = added[0].objVal; + assert.deepStrictEqual(objVal, objmd + .setTags({ mytag: 'mytags-value' }) + .setReplicationInfo({ + backends: [{ + dataStoreVersionId: '', + site: 'test-site-2', + status: 'PENDING' + }], + content: ['METADATA', 'PUT_TAGGING'], + dataStoreVersionId: '', + destination: 'arn:aws:s3:::mqp-test-bucket', + isNFS: null, + role: 'arn:aws:iam::root:role/s3-replication-role', + status: 'PENDING', + storageClass: 'test-site-2', + storageType: 'aws_s3' + }) + .getValue()); done(); });