Skip to content

Commit

Permalink
fix triggering replication when multiple destinations are set
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerkesni committed Dec 19, 2024
1 parent 1be6653 commit 09ccadb
Showing 1 changed file with 44 additions and 40 deletions.
84 changes: 44 additions & 40 deletions crrExistingObjects.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ if (!STORAGE_TYPE) {
if (!TARGET_REPLICATION_STATUS) {
TARGET_REPLICATION_STATUS = 'NEW';
}
if (!SITE_NAME) {
log.fatal('missing SITE_NAME environment variable');
process.exit(1);
}

const replicationStatusToProcess = TARGET_REPLICATION_STATUS.split(',');
replicationStatusToProcess.forEach(state => {
Expand Down Expand Up @@ -83,19 +87,16 @@ const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS);
function _objectShouldBeUpdated(objMD) {
return replicationStatusToProcess.some(filter => {
if (filter === 'NEW') {
return (!objMD.getReplicationInfo()
|| objMD.getReplicationInfo().status === '');
return !objMD.getReplicationSiteStatus(SITE_NAME);
}
return (objMD.getReplicationInfo()
&& objMD.getReplicationInfo().status === filter);
return objMD.getReplicationSiteStatus(SITE_NAME) === filter;
});
}

function _markObjectPending(
bucket,
key,
versionId,
storageClass,
repConfig,
cb,
) {
Expand Down Expand Up @@ -158,32 +159,45 @@ function _markObjectPending(
return next();
}

// Initialize replication info, if missing
if (!objMD.getReplicationInfo()
|| !objMD.getReplicationSiteStatus(storageClass)) {
const { Rules, Role } = repConfig;
const destination = Rules[0].Destination.Bucket;
// set replication properties
const ops = objMD.getContentLength() === 0 ? ['METADATA']
: ['METADATA', 'DATA'];
const backends = [{
site: storageClass,
// Initialize replication info if missing
if (objMD.getReplicationSiteStatus(SITE_NAME) === undefined) {
let replicationInfo = objMD.getReplicationInfo();
// Objects on buckets with no replication configuration usually
// still have replicationInfo set with empty values. In this case
// we still need to initialize all the fields.
if (!replicationInfo || replicationInfo.backends.length === 0) {
const { Rules, Role } = repConfig;
const destination = Rules[0].Destination.Bucket;
// set replication properties
const ops = objMD.getContentLength() === 0 ? ['METADATA']
: ['METADATA', 'DATA'];
replicationInfo = {
status: 'PENDING',
content: ops,
backends: [],
destination,
storageClass: SITE_NAME,
role: Role,
storageType: STORAGE_TYPE,
};
} else {
// Apending the storageClass and storageType to the existing
// list of storage classes and types. This can happen when
// multiple destinations.
replicationInfo.storageClass += `,${SITE_NAME}`;
if (STORAGE_TYPE) {
replicationInfo.storageType += `,${STORAGE_TYPE}`;
}
}
replicationInfo.backends.push({
site: SITE_NAME,
status: 'PENDING',
dataStoreVersionId: '',
}];
const replicationInfo = {
status: 'PENDING',
backends,
content: ops,
destination,
storageClass,
role: Role,
storageType: STORAGE_TYPE,
};
});
objMD.setReplicationInfo(replicationInfo);
}

objMD.setReplicationSiteStatus(storageClass, 'PENDING');
objMD.setReplicationSiteStatus(SITE_NAME, 'PENDING');
objMD.setReplicationStatus('PENDING');
objMD.updateMicroVersionId();
const md = objMD.getValue();
Expand Down Expand Up @@ -232,20 +246,10 @@ function _markPending(bucket, versions, cb) {
}
return next(null, res.ReplicationConfiguration);
}),
(repConfig, next) => {
const { Rules } = repConfig;
const storageClass = Rules[0].Destination.StorageClass || SITE_NAME;
if (!storageClass) {
const errMsg = 'missing SITE_NAME environment variable, must be set to'
+ ' the value of "site" property in the CRR configuration';
log.error(errMsg);
return next(new Error(errMsg));
}
return eachLimit(versions, WORKERS, (i, apply) => {
const { Key, VersionId } = i;
_markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply);
}, next);
},
(repConfig, next) => eachLimit(versions, WORKERS, (i, apply) => {
const { Key, VersionId } = i;
_markObjectPending(bucket, Key, VersionId, repConfig, apply);
}, next),
], cb);
}

Expand Down

0 comments on commit 09ccadb

Please sign in to comment.