Skip to content

Commit

Permalink
fix triggering replication when multpile destinations are set
Browse files Browse the repository at this point in the history
- Issue 1: In Artesca "SITE_NAME" is never passed, so we always
trigger objects that are replicated to the first storageClass in
the replication rule.

- Issue 2: We check the global replication status when verifying
wether or not an object should be retriggered. This doesn't necessarily
work all the time, especially when replicating to multiple destinations.
As if one destination fails the global status becomes failed, which will
make it impossible to trigger objects with a completed status for example.

- Issue 3: replication info is completely overwritten when it does not contain
info about a specific site. This will cause an issue when replicating to multiple
destinations as the script can only be launched for one site at a time, so when
having a object with non initialized replication info, we won't be able to set
the replication info propely for all destinations.

Issue: S3UTILS-184
  • Loading branch information
Kerkesni committed Dec 23, 2024
1 parent 2310928 commit 55c1b97
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 16 deletions.
56 changes: 40 additions & 16 deletions CRR/ReplicationStatusUpdater.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,19 @@ class ReplicationStatusUpdater {
* Determines if an object should be updated based on its replication metadata properties.
* @private
* @param {ObjectMD} objMD - The metadata of the object.
* @param {string} site - The destination site name.
* @returns {boolean} True if the object should be updated.
*/
_objectShouldBeUpdated(objMD) {
_objectShouldBeUpdated(objMD, site) {
return this.replicationStatusToProcess.some(filter => {
if (filter === 'NEW') {
// Either site specific replication info is missing
// or are initialized with empty fields.
return (!objMD.getReplicationInfo()
|| objMD.getReplicationInfo().status === '');
|| !objMD.getReplicationSiteStatus(site));
}
return (objMD.getReplicationInfo()
&& objMD.getReplicationInfo().status === filter);
&& objMD.getReplicationSiteStatus(site) === filter);
});
}

Expand Down Expand Up @@ -172,36 +175,54 @@ class ReplicationStatusUpdater {
// codebase easier to maintain and upgrade, as opposed to having multiple branches or versions of
// the code for different schema versions.
objMD = new ObjectMD(JSON.parse(mdRes.Body));
if (!this._objectShouldBeUpdated(objMD)) {
if (!this._objectShouldBeUpdated(objMD, storageClass)) {
skip = true;
return process.nextTick(next);
}
// Initialize replication info, if missing
// This is particularly important if the object was created before
// enabling replication on the bucket.
if (!objMD.getReplicationInfo()
|| !objMD.getReplicationSiteStatus(storageClass)) {
let replicationInfo = objMD.getReplicationInfo();
if (!replicationInfo || !replicationInfo.status) {
const { Rules, Role } = repConfig;
const destination = Rules[0].Destination.Bucket;
// set replication properties
const ops = objMD.getContentLength() === 0 ? ['METADATA']
: ['METADATA', 'DATA'];
const backends = [{
site: storageClass,
status: 'PENDING',
dataStoreVersionId: '',
}];
const replicationInfo = {
replicationInfo = {
status: 'PENDING',
backends,
content: ops,
backends: [],
destination,
storageClass,
storageClass: '',
role: Role,
storageType: this.storageType,
storageType: '',
};
objMD.setReplicationInfo(replicationInfo);
}
// Update replication info with site specific info
if (objMD.getReplicationSiteStatus(storageClass) === undefined) {
// When replicating to multiple destinations,
// the storageClass and storageType properties
// become comma-separated lists of the storage
// classes and types of the replication destinations.
const storageClasses = objMD.getReplicationStorageClass()
? `${objMD.getReplicationStorageClass()},${storageClass}` : storageClass;
objMD.setReplicationStorageClass(storageClasses);
if (this.storageType) {
const storageTypes = objMD.getReplicationStorageType()
? `${objMD.getReplicationStorageType()},${this.storageType}` : this.storageType;
objMD.setReplicationStorageType(storageTypes);
}
// Add site to the list of replication backends
const backends = objMD.getReplicationBackends();
backends.push({
site: storageClass,
status: 'PENDING',
dataStoreVersionId: '',
});
objMD.setReplicationBackends(backends);
}

objMD.setReplicationSiteStatus(storageClass, 'PENDING');
objMD.setReplicationStatus('PENDING');
Expand Down Expand Up @@ -273,13 +294,16 @@ class ReplicationStatusUpdater {
}),
(repConfig, next) => {
const { Rules } = repConfig;
const storageClass = Rules[0].Destination.StorageClass || this.siteName;
const storageClass = this.siteName || Rules[0].Destination.StorageClass;
if (!storageClass) {
const errMsg = 'missing SITE_NAME environment variable, must be set to'
+ ' the value of "site" property in the CRR configuration';
this.log.error(errMsg);
return next(new Error(errMsg));
}
if (!this.siteName) {
this.log.warn(`missing SITE_NAME environment variable, triggering replication to the ${storageClass} storage class`);
}
return eachLimit(versions, this.workers, (i, apply) => {
const { Key, VersionId } = i;
this._markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply);
Expand Down
258 changes: 258 additions & 0 deletions tests/unit/CRR/ReplicationStatusUpdater.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('ReplicationStatusUpdater', () => {
replicationStatusToProcess: ['NEW'],
targetPrefix: 'toto',
listingLimit: 10,
siteName: 'aws-location',
}, logger);
});

Expand Down Expand Up @@ -129,6 +130,263 @@ describe('ReplicationStatusUpdater', () => {
return done();
});
});

[
{
description: 'for an object with a null replication info',
replicationInfo: null,
replicationStatusToProcess: ['NEW'],
expectedReplicationInfo: {
status: 'PENDING',
backends: [
{
site: 'aws-location',
status: 'PENDING',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
}, {
description: 'for an object with empty replication info',
replicationInfo: {
status: '',
backends: [],
content: [],
destination: '',
storageClass: '',
role: '',
storageType: '',
dataStoreVersionId: '',
isNFS: null,
},
replicationStatusToProcess: ['NEW'],
expectedReplicationInfo: {
status: 'PENDING',
backends: [
{
site: 'aws-location',
status: 'PENDING',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
}, {
description: 'for an object with a failed replication',
replicationInfo: {
status: 'FAILED',
backends: [
{
site: 'aws-location',
status: 'FAILED',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
replicationStatusToProcess: ['FAILED'],
expectedReplicationInfo: {
status: 'PENDING',
backends: [
{
site: 'aws-location',
status: 'PENDING',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
}, {
description: 'for an object with a completed replication',
replicationInfo: {
status: 'COMPLETED',
backends: [
{
site: 'aws-location',
status: 'COMPLETED',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
replicationStatusToProcess: ['COMPLETED'],
expectedReplicationInfo: {
status: 'PENDING',
backends: [
{
site: 'aws-location',
status: 'PENDING',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
}, {
description: 'of a single site for an object with multiple replication destinations',
replicationInfo: {
status: 'FAILED',
backends: [
{
site: 'azure-location',
status: 'COMPLETED',
dataStoreVersionId: '',
},
{
site: 'aws-location',
status: 'FAILED',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'azure-location,aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'azure,aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
replicationStatusToProcess: ['FAILED'],
expectedReplicationInfo: {
status: 'PENDING',
backends: [
{
site: 'azure-location',
status: 'COMPLETED',
dataStoreVersionId: '',
}, {
site: 'aws-location',
status: 'PENDING',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'azure-location,aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'azure,aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
}, {
description: 'of a single non initialized site for an object with multiple replication destinations',
replicationInfo: {
status: 'FAILED',
backends: [
{
site: 'azure-location',
status: 'COMPLETED',
dataStoreVersionId: '',
},
{
site: 'azure-location-2',
status: 'FAILED',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'azure-location,azure-location-2',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'azure,azure',
dataStoreVersionId: '',
isNFS: null,
},
replicationStatusToProcess: ['NEW'],
expectedReplicationInfo: {
status: 'PENDING',
backends: [
{
site: 'azure-location',
status: 'COMPLETED',
dataStoreVersionId: '',
}, {
site: 'azure-location-2',
status: 'FAILED',
dataStoreVersionId: '',
}, {
site: 'aws-location',
status: 'PENDING',
dataStoreVersionId: '',
},
],
content: ['METADATA', 'DATA'],
destination: 'arn:aws:s3:::sourcebucket',
storageClass: 'azure-location,azure-location-2,aws-location',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'azure,azure,aws_s3',
dataStoreVersionId: '',
isNFS: null,
},
},
].forEach(params => {
it(`should trigger replication ${params.description}`, done => {
crr.bb.getMetadata = jest.fn((p, cb) => {
const objectMd = JSON.parse(getMetadataRes.Body);
objectMd.replicationInfo = params.replicationInfo;
cb(null, { Body: JSON.stringify(objectMd) });
});
crr.siteName = 'aws-location';
crr.storageType = 'aws_s3';
crr.replicationStatusToProcess = params.replicationStatusToProcess;
crr.run(err => {
assert.ifError(err);

expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1);
expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1);
expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1);
expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1);
expect(crr.bb.putMetadata).toHaveBeenCalledWith(
expect.objectContaining({
Body: expect.stringContaining(JSON.stringify(params.expectedReplicationInfo)),
}),
expect.any(Function),
);

assert.strictEqual(crr._nProcessed, 1);
assert.strictEqual(crr._nSkipped, 0);
assert.strictEqual(crr._nUpdated, 1);
assert.strictEqual(crr._nErrors, 0);
return done();
});
});
});
});

describe('ReplicationStatusUpdater with specifics', () => {
Expand Down

0 comments on commit 55c1b97

Please sign in to comment.