From 09ccadbc2bde22e9c5d93be5cf94cfde872af091 Mon Sep 17 00:00:00 2001 From: Kerkesni Date: Thu, 19 Dec 2024 15:04:22 +0100 Subject: [PATCH] fix triggering replication when multiple destinations are set Issue: S3UTILS-184 --- crrExistingObjects.js | 84 ++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/crrExistingObjects.js b/crrExistingObjects.js index 06312574..91d2d8f1 100644 --- a/crrExistingObjects.js +++ b/crrExistingObjects.js @@ -45,6 +45,10 @@ if (!STORAGE_TYPE) { if (!TARGET_REPLICATION_STATUS) { TARGET_REPLICATION_STATUS = 'NEW'; } +if (!SITE_NAME) { + log.fatal('missing SITE_NAME environment variable'); + process.exit(1); +} const replicationStatusToProcess = TARGET_REPLICATION_STATUS.split(','); replicationStatusToProcess.forEach(state => { @@ -83,11 +87,9 @@ const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS); function _objectShouldBeUpdated(objMD) { return replicationStatusToProcess.some(filter => { if (filter === 'NEW') { - return (!objMD.getReplicationInfo() - || objMD.getReplicationInfo().status === ''); + return !objMD.getReplicationSiteStatus(SITE_NAME); } - return (objMD.getReplicationInfo() - && objMD.getReplicationInfo().status === filter); + return objMD.getReplicationSiteStatus(SITE_NAME) === filter; }); } @@ -95,7 +97,6 @@ function _markObjectPending( bucket, key, versionId, - storageClass, repConfig, cb, ) { @@ -158,32 +159,45 @@ function _markObjectPending( return next(); } - // Initialize replication info, if missing - if (!objMD.getReplicationInfo() - || !objMD.getReplicationSiteStatus(storageClass)) { - const { Rules, Role } = repConfig; - const destination = Rules[0].Destination.Bucket; - // set replication properties - const ops = objMD.getContentLength() === 0 ? ['METADATA'] - : ['METADATA', 'DATA']; - const backends = [{ - site: storageClass, + // Initialize replication info if missing + if (objMD.getReplicationSiteStatus(SITE_NAME) === undefined) { + let replicationInfo = objMD.getReplicationInfo(); + // Objects on buckets with no replication configuration usually + // still have replicationInfo set with empty values. In this case + // we still need to initialize all the fields. + if (!replicationInfo || replicationInfo.backends.length === 0) { + const { Rules, Role } = repConfig; + const destination = Rules[0].Destination.Bucket; + // set replication properties + const ops = objMD.getContentLength() === 0 ? ['METADATA'] + : ['METADATA', 'DATA']; + replicationInfo = { + status: 'PENDING', + content: ops, + backends: [], + destination, + storageClass: SITE_NAME, + role: Role, + storageType: STORAGE_TYPE, + }; + } else { + // Apending the storageClass and storageType to the existing + // list of storage classes and types. This can happen when + // multiple destinations. + replicationInfo.storageClass += `,${SITE_NAME}`; + if (STORAGE_TYPE) { + replicationInfo.storageType += `,${STORAGE_TYPE}`; + } + } + replicationInfo.backends.push({ + site: SITE_NAME, status: 'PENDING', dataStoreVersionId: '', - }]; - const replicationInfo = { - status: 'PENDING', - backends, - content: ops, - destination, - storageClass, - role: Role, - storageType: STORAGE_TYPE, - }; + }); objMD.setReplicationInfo(replicationInfo); } - objMD.setReplicationSiteStatus(storageClass, 'PENDING'); + objMD.setReplicationSiteStatus(SITE_NAME, 'PENDING'); objMD.setReplicationStatus('PENDING'); objMD.updateMicroVersionId(); const md = objMD.getValue(); @@ -232,20 +246,10 @@ function _markPending(bucket, versions, cb) { } return next(null, res.ReplicationConfiguration); }), - (repConfig, next) => { - const { Rules } = repConfig; - const storageClass = Rules[0].Destination.StorageClass || SITE_NAME; - if (!storageClass) { - const errMsg = 'missing SITE_NAME environment variable, must be set to' - + ' the value of "site" property in the CRR configuration'; - log.error(errMsg); - return next(new Error(errMsg)); - } - return eachLimit(versions, WORKERS, (i, apply) => { - const { Key, VersionId } = i; - _markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply); - }, next); - }, + (repConfig, next) => eachLimit(versions, WORKERS, (i, apply) => { + const { Key, VersionId } = i; + _markObjectPending(bucket, Key, VersionId, repConfig, apply); + }, next), ], cb); }