Skip to content

Commit

Permalink
Make OOB logic safer
Browse files Browse the repository at this point in the history
Instead of rebuilding the whole metadata on updates, update only the
required fields, i.e. the tags.

This also fixes subtle issues where fields set by Zenko could be
overwritten when certain operations were performed on S3C.

Issue: BB-590
  • Loading branch information
francoisferrand committed Dec 26, 2024
1 parent 2f1035c commit 60d5798
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 105 deletions.
61 changes: 15 additions & 46 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class MongoQueueProcessor {

/**
* get dataStoreVersionId, if exists
* @param {Object} objMd - object md fetched from mongo
* @param {ObjectMDData} objMd - object md fetched from mongo
* @param {String} site - storage location name
* @return {String} dataStoreVersionId
*/
Expand Down Expand Up @@ -508,46 +508,6 @@ class MongoQueueProcessor {
return done(err);
}

// If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id.
// This should only happen for objects restored onto the OOB location, and the location
// should match in that case
if (scalVersionId) {
if (!zenkoObjMd) {
this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
location,
});
// This may happen if the object has been deleted from Zenko, but we processed
// the create/update event from oplog after the object was deleted. We can
// proceed normally and create the entry, as we will get a followup "delete"
// operation and will thus eventually be consistent.
} else if (zenkoObjMd.location?.length !== 1 ||
zenkoObjMd.location[0].dataStoreName !== location ||
zenkoObjMd.location[0].dataStoreVersionId !== sourceEntry.getVersionId()) {
this.logger.warn('mismatched source entry, skipping entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
location,
});

// If the versionId does not match, it mean the object metadata has been updated
// in Zenko already, and we are thus processing an outdated oplog entry: which
// should be ignored. Not much of an issue though, as we have retrieved Zenko
// object, so `getContentType()` will pickup tag changes only.
} else {
this.logger.info('restored oob object', {
bucket, key, scalVersionId, zenkoObjMd, sourceEntry
});

sourceEntry.setVersionId(scalVersionId);

// TODO: do we need to update the (mongo) metadata in that case???
// - This may happen if object is re-tagged while restored?
// - Need to cleanup scal version id: delete objVal['x-amz-meta-scal-version-id'];
// - Need to keep the archive & restore fields in the metadata
return done();
}
}

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
Expand All @@ -560,11 +520,20 @@ class MongoQueueProcessor {
return done();
}

// update necessary metadata fields before saving to Zenko MongoDB
this._updateOwnerMD(sourceEntry, bucketInfo);
this._updateObjectDataStoreName(sourceEntry, location);
this._updateLocations(sourceEntry, location);
this._updateAcl(sourceEntry);
if (zenkoObjMd) {
// Keep existing metadata fields, only need to update the tags
const tags = sourceEntry.getTags();
sourceEntry._data = { ...zenkoObjMd }; // eslint-disable-line no-param-reassign
sourceEntry.setTags(tags);
} else {
// Update necessary metadata fields before saving to Zenko MongoDB
this._updateOwnerMD(sourceEntry, bucketInfo);
this._updateObjectDataStoreName(sourceEntry, location);
this._updateLocations(sourceEntry, location);
this._updateAcl(sourceEntry);
}

// Try to update replication info, if applicable
this._updateReplicationInfo(sourceEntry, bucketInfo, content,
zenkoObjMd);

Expand Down
129 changes: 70 additions & 59 deletions tests/functional/ingestion/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const authdata = require('../../../conf/authdata.json');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const DeleteOpQueueEntry = require('../../../lib/models/DeleteOpQueueEntry');
const fakeLogger = require('../../utils/fakeLogger');
const { ObjectMDArchive } = require('arsenal/build/lib/models');

const kafkaConfig = config.kafka;
const mongoProcessorConfig = config.extensions.mongoProcessor;
Expand All @@ -31,6 +32,12 @@ const VERSION_ID = '98445230573829999999RG001 15.144.0';
// new version id > existing version id
const NEW_VERSION_ID = '98445235075994999999RG001 14.90.2';

const mockArchive = new ObjectMDArchive(
{ archiveId: '123456789' },
Date.now() - 3600 * 1000, 1,
Date.now(), Date.now() + 23 * 3600 * 1000,
);

const mockReplicationInfo = {
role: 'arn:aws:iam::root:role/s3-replication-role',
destination: `arn:aws:s3:::${BUCKET}`,
Expand Down Expand Up @@ -712,15 +719,29 @@ describe('MongoQueueProcessor', function mqp() {
});
});

it('should save to mongo a new version entry when scal-version-id does not match the data location', done => {
it('should not update restored entry', done => {
const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`;
const objmd = new ObjectMD()
.setAcl()
.setKey(KEY)
.setVersionId(NEW_VERSION_ID);
const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd)
.setVersionId(NEW_VERSION_ID)
.setUserMetadata({ 'x-amz-meta-scal-version-id': encode(VERSION_ID) });
const getObject = sinon.stub(mongoClient, 'getObject').callThrough();
const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
const getObject = sinon.stub(mongoClient, 'getObject').yields(null,
new ObjectMD()
.setKey(KEY)
.setVersionId(VERSION_ID)
.setDataStoreName(LOCATION)
.setAmzStorageClass('cold')
.setArchive(mockArchive)
.setLocation([{
key: KEY,
start: 0,
size: 50,
dataStoreName: LOCATION,
dataStoreVersionId: NEW_VERSION_ID,
}])._data
);
async.waterfall([
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
next),
Expand All @@ -732,73 +753,42 @@ describe('MongoQueueProcessor', function mqp() {
assert.ifError(err);

sinon.assert.calledOnce(getObject);
assert.strictEqual(getObject.getCall(0).args[0], BUCKET);
assert.strictEqual(getObject.getCall(0).args[1], KEY);
assert.strictEqual(getObject.getCall(0).args[2].versionId, VERSION_ID);

const added = mqp.getAdded();
assert.strictEqual(added.length, 1);
const objVal = added[0].objVal;
assert.strictEqual(added[0].key, versionKey);
// key shall now be always populated
assert.deepStrictEqual(objVal.key, KEY);
// acl should reset
assert.deepStrictEqual(objVal.acl, new ObjectMD().getAcl());
// owner md should update
assert.strictEqual(objVal['owner-display-name'],
authdata.accounts[0].name);
assert.strictEqual(objVal['owner-id'],
authdata.accounts[0].canonicalID);
// dataStoreName should update
assert.strictEqual(objVal.dataStoreName, LOCATION);
// locations should update, no data in object
assert.strictEqual(objVal.location.length, 1);
const loc = objVal.location[0];
assert.strictEqual(loc.key, KEY);
assert.strictEqual(loc.size, 0);
assert.strictEqual(loc.start, 0);
assert.strictEqual(loc.dataStoreName, LOCATION);
assert.strictEqual(loc.dataStoreType, 'aws_s3');
assert.strictEqual(decode(loc.dataStoreVersionId),
NEW_VERSION_ID);

// replication info should be empty
const repInfo = objVal.replicationInfo;
assert.strictEqual(repInfo.status, '');
assert.deepStrictEqual(repInfo.backends, []);
assert.deepStrictEqual(repInfo.content, []);
assert.strictEqual(repInfo.storageClass, '');
assert.strictEqual(repInfo.storageType, '');
assert.strictEqual(repInfo.dataStoreVersionId, '');
assert.strictEqual(added.length, 0);

done();
});
});

it('should skip restored entry scal-version-id', done => {
it('should update tags on restored entry', done => {
const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`;
const objmd = new ObjectMD()
const entry = new ObjectQueueEntry(BUCKET, versionKey, new ObjectMD()
.setAcl()
.setKey(KEY)
.setVersionId(NEW_VERSION_ID);
const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd)
.setUserMetadata({ 'x-amz-meta-scal-version-id': encode(VERSION_ID) });
const getObject = sinon.stub(mongoClient, 'getObject').yields(null,
new ObjectMD()
.setVersionId(VERSION_ID)
.setTags({ mytag: 'mytags-value' })
.setDataStoreName(LOCATION)
.setAmzStorageClass('cold')
.setLocation([{
key: KEY,
start: 0,
size: 50,
dataStoreName: LOCATION,
dataStoreVersionId: NEW_VERSION_ID,
}])._data
);
.setTags({ mytag: 'mytags-value' })
.setVersionId(NEW_VERSION_ID)
.setUserMetadata({ 'x-amz-meta-scal-version-id': encode(VERSION_ID) }));
const objmd = new ObjectMD()
.setKey(KEY)
.setVersionId(VERSION_ID)
.setDataStoreName(LOCATION)
.setAmzStorageClass('cold')
.setArchive(mockArchive)
.setLocation([{
key: KEY,
start: 0,
size: 50,
dataStoreName: LOCATION,
dataStoreVersionId: NEW_VERSION_ID,
}]);
const getObject = sinon.stub(mongoClient, 'getObject').yields(null, objmd.getValue());
async.waterfall([
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
next),
(bucketInfo, next) => next(null,
bucketInfo.setReplicationConfiguration(null)),
(bucketInfo, next) => mqp._processObjectQueueEntry(fakeLogger,
entry, LOCATION, bucketInfo, next),
], err => {
Expand All @@ -810,7 +800,28 @@ describe('MongoQueueProcessor', function mqp() {
assert.strictEqual(getObject.getCall(0).args[2].versionId, VERSION_ID);

const added = mqp.getAdded();
assert.strictEqual(added.length, 0);
assert.strictEqual(added.length, 1);

// Expect tags and replicationInfo to have been updated
const objVal = added[0].objVal;
assert.deepStrictEqual(objVal, objmd
.setTags({ mytag: 'mytags-value' })
.setReplicationInfo({
backends: [{
dataStoreVersionId: '',
site: 'test-site-2',
status: 'PENDING'
}],
content: ['METADATA', 'PUT_TAGGING'],
dataStoreVersionId: '',
destination: 'arn:aws:s3:::mqp-test-bucket',
isNFS: null,
role: 'arn:aws:iam::root:role/s3-replication-role',
status: 'PENDING',
storageClass: 'test-site-2',
storageType: 'aws_s3'
})
.getValue());

done();
});
Expand Down

0 comments on commit 60d5798

Please sign in to comment.