From 7db9155822220ec627ef618b3108e05be1aee6ac Mon Sep 17 00:00:00 2001 From: Hao Date: Sat, 9 Nov 2024 21:54:40 -0800 Subject: [PATCH] New backup consolidated commit --- fdbbackup/FileConverter.actor.cpp | 5 +- fdbclient/BackupAgentBase.actor.cpp | 43 +- fdbclient/BackupContainerFileSystem.actor.cpp | 18 +- fdbclient/ClientKnobs.cpp | 1 + fdbclient/FileBackupAgent.actor.cpp | 812 +++++++++++++++++- fdbclient/KeyRangeMap.actor.cpp | 8 +- .../include/fdbclient/BackupAgent.actor.h | 5 +- fdbclient/include/fdbclient/ClientKnobs.h | 1 + .../fdbclient/PartitionedLogIterator.h | 26 + fdbserver/ApplyMetadataMutation.cpp | 1 + fdbserver/BackupWorker.actor.cpp | 1 + fdbserver/CommitProxyServer.actor.cpp | 3 +- 12 files changed, 908 insertions(+), 16 deletions(-) create mode 100644 fdbclient/include/fdbclient/PartitionedLogIterator.h diff --git a/fdbbackup/FileConverter.actor.cpp b/fdbbackup/FileConverter.actor.cpp index 84cded889c2..14c7ab68d19 100644 --- a/fdbbackup/FileConverter.actor.cpp +++ b/fdbbackup/FileConverter.actor.cpp @@ -291,7 +291,7 @@ struct MutationFilesReadProgress : public ReferenceCountedempty()) { self->fileProgress.erase(self->fileProgress.begin()); } else { - // Keep fileProgress sorted + // Keep fileProgress sorted because only the first one can be chagned,so this is enough for (int i = 1; i < self->fileProgress.size(); i++) { if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) { break; @@ -489,6 +489,9 @@ ACTOR Future convert(ConvertParams params) { arena = Arena(); } + // keep getting data until a new version is encounter, then flush all data buffered and start to buffer for a + // new version. 
+ ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion())); MutationRef m; rd >> m; diff --git a/fdbclient/BackupAgentBase.actor.cpp b/fdbclient/BackupAgentBase.actor.cpp index 201e33b3505..2ddc96db5a7 100644 --- a/fdbclient/BackupAgentBase.actor.cpp +++ b/fdbclient/BackupAgentBase.actor.cpp @@ -187,9 +187,12 @@ Standalone> getLogRanges(Version beginVersion, return ret; } +// given a begin and end version, get the prefix in the database for this range +// which is applyLogKeys.begin/backupUid/hash(uint8)/version(64bites)/part +// returns multiple key ranges, each should be of length APPLY_BLOCK_SIZE +// (64, 200) -> [(64, 128), (128, 192), (192, 200)] Standalone> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) { Standalone> ret; - Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin); //TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix); @@ -292,6 +295,7 @@ void _addResult(bool* tenantMapChanging, each mutation (if needed) and adding/removing prefixes from the mutations. 
The final mutations are then added to the "result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector) */ +// hfu5: value is each Param2 ACTOR static Future decodeBackupLogValue(Arena* arena, VectorRef* result, VectorRef>* encryptedResult, @@ -318,6 +322,7 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, throw incompatible_protocol_version(); } + // hfu5: this is the format for Param2 state uint32_t totalBytes = 0; memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t)); offset += sizeof(uint32_t); @@ -332,6 +337,7 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, while (consumed < totalBytes) { uint32_t type = 0; + // hfu5: format should be type|kLen|vLen|Key|Value memcpy(&type, value.begin() + offset, sizeof(uint32_t)); offset += sizeof(uint32_t); state uint32_t len1 = 0; @@ -448,6 +454,9 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, } else { Version ver = key_version->rangeContaining(logValue.param1).value(); //TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion); + // version is the version of this mutation decoded from log + // ver is the old version stored in keyVersionMap + // as a result, only add this mutation in log when the version is larger(to work with range file) if (version > ver && ver != invalidVersion) { if (removePrefix.size()) { logValue.param1 = logValue.param1.removePrefix(removePrefix); @@ -587,6 +596,7 @@ ACTOR Future readCommitted(Database cx, } } +// hfu5: read each version, potentially multiple part within the same version ACTOR Future readCommitted(Database cx, PromiseStream results, Future active, @@ -639,7 +649,12 @@ ACTOR Future readCommitted(Database cx, wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize())); releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize()); + 
// iterate on a version range. + // each version - partition is a key-value pair + // hfu5 question: when in the edge case, two partitions of same key goes to two different blocks, so they + // cannot be combined here, what happens? for (auto& s : rangevalue) { + // hfu5 : (version, part) uint64_t groupKey = groupBy(s.key).first; //TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size()); if (groupKey != skipGroup) { @@ -647,6 +662,10 @@ ACTOR Future readCommitted(Database cx, rcGroup.version = tr.getReadVersion().get(); rcGroup.groupKey = groupKey; } else if (rcGroup.groupKey != groupKey) { + // hfu5: if seeing a different version, then send result directly, and then create another + // rcGroup as a result, each rcgroup is for a single version, but a single version can span in + // different rcgroups + //TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size()); // state uint32_t len(0); // for (size_t j = 0; j < rcGroup.items.size(); ++j) { @@ -665,6 +684,7 @@ ACTOR Future readCommitted(Database cx, rcGroup.version = tr.getReadVersion().get(); rcGroup.groupKey = groupKey; } + // this is each item, so according to kvMutationLogToTransactions, each item should be a partition rcGroup.items.push_back_deep(rcGroup.items.arena(), s); } } @@ -706,6 +726,8 @@ Future readCommitted(Database cx, cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True); } +// restore transaction has to be first in the batch, or it is the only txn in batch to make sure it never conflicts with +// others. 
ACTOR Future sendCommitTransactionRequest(CommitTransactionRequest req, Key uid, Version newBeginVersion, @@ -759,6 +781,8 @@ ACTOR Future kvMutationLogToTransactions(Database cx, state Version lastVersion = invalidVersion; state bool endOfStream = false; state int totalBytes = 0; + // two layer of loops, outside loop for each file range, + // inside look for each transaction(version) loop { state CommitTransactionRequest req; state Version newBeginVersion = invalidVersion; @@ -774,10 +798,12 @@ ACTOR Future kvMutationLogToTransactions(Database cx, BinaryWriter bw(Unversioned()); for (int i = 0; i < group.items.size(); ++i) { + // hfu5 : each value should be a partition bw.serializeBytes(group.items[i].value); } // Parse a single transaction from the backup mutation log Standalone value = bw.toValue(); + // ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md wait(decodeBackupLogValue(&curReq.arena, &curReq.transaction.mutations, &curReq.transaction.encryptedMutations, @@ -882,6 +908,13 @@ ACTOR Future coalesceKeyVersionCache(Key uid, lastVersion = it.value(); } else { Version ver = it.value(); + // ver: version from keyVersion + // endVersion: after applying a batch of versions from log files, the largest version + // if ver < endVersion, that means this key in keyVersion is outdated + // in this case, runClearRange on the keyVersionMapRange prefix for this key, + // so that the alog key is the truth, otherwise, keyVersionMapRange should be the truth + // each key needs to be individually checked, because even though range file is for a range, log file does + // not if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion && lastVersion != invalidVersion) { Key removeKey = it.range().begin.withPrefix(mapPrefix); @@ -940,15 +973,22 @@ ACTOR Future applyMutations(Database cx, } int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes); + // this means newEndVersion can only be at most of size 
APPLY_BLOCK_SIZE state Version newEndVersion = std::min(*endVersion, ((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) * CLIENT_KNOBS->APPLY_BLOCK_SIZE); + + // ranges each represent a partition of version, e.g. [100, 200], [201, 300], [301, 400] + // (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming block size is 64 state Standalone> ranges = getApplyRanges(beginVersion, newEndVersion, uid); + // ranges have format: applyLogKeys.begin/uid/hash(uint8)/version(64bites)/part state size_t idx; state std::vector> results; state std::vector> rc; state std::vector> locks; + // each RCGroup is for a single version, each results[i] is for a single range + // one range might have multiple versions for (int i = 0; i < ranges.size(); ++i) { results.push_back(PromiseStream()); locks.push_back(makeReference( @@ -957,6 +997,7 @@ ACTOR Future applyMutations(Database cx, } maxBytes = std::max(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES); + for (idx = 0; idx < ranges.size(); ++idx) { int bytes = wait(kvMutationLogToTransactions(cx, diff --git a/fdbclient/BackupContainerFileSystem.actor.cpp b/fdbclient/BackupContainerFileSystem.actor.cpp index 62d35d82549..9ad22636e7d 100644 --- a/fdbclient/BackupContainerFileSystem.actor.cpp +++ b/fdbclient/BackupContainerFileSystem.actor.cpp @@ -904,7 +904,15 @@ class BackupContainerFileSystemImpl { // If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup. // It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for // restore times. - // + // hfu5:1. it first reads and parse snapshot file, each snapshot file can map to a list of range files + // including ranges/ and kvranges/, then it collects range files who has intersecting keys + // 2. not sure why restorable.targetVersion < maxKeyRangeVersion it would continue + // 3. then it has a minKeyRangeVersion representing min version of all range files + // 4. 
then it reads all log files with start smaller than targetVersion and end larger than minKeyRangeVersion + // 5. if the first log file start version is smaller than minKeyRangeVersion, then we do not know the value, + // give up. + // otherwise return both range and log files. + // 6. LogFile object is created in BackupContainerFileSystem::listLogFiles, and tagId is populated for plog // If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored, // because the log can contain mutations of the whole key space, unlike range files that each // is limited to a smaller key range. @@ -943,6 +951,7 @@ class BackupContainerFileSystemImpl { state Version minKeyRangeVersion = MAX_VERSION; state Version maxKeyRangeVersion = -1; + // iterate each listed file, why still return a vector std::pair, std::map> results = wait(bc->readKeyspaceSnapshot(snapshots[i])); @@ -955,6 +964,7 @@ class BackupContainerFileSystemImpl { maxKeyRangeVersion = snapshots[i].endVersion; } else { for (const auto& rangeFile : results.first) { + // each file is a version on a [begin, end] key range const auto& keyRange = results.second.at(rangeFile.fileName); if (keyRange.intersects(keyRangesFilter)) { restorable.ranges.push_back(rangeFile); @@ -971,6 +981,9 @@ class BackupContainerFileSystemImpl { // 'latestVersion' represents using the minimum restorable version in a snapshot. restorable.targetVersion = targetVersion == latestVersion ? maxKeyRangeVersion : targetVersion; // Any version < maxKeyRangeVersion is not restorable. + // hfu5 question: why? what if target version is 8500, and this snapshot has [8000, 8200, 8800] + // do we give up directly? why it is not restorable? 
+ // not give up, try to find the next smaller one if (restorable.targetVersion < maxKeyRangeVersion) continue; @@ -993,6 +1006,7 @@ class BackupContainerFileSystemImpl { store(plogs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, true))); if (plogs.size() > 0) { + // hfu5 : this is how files are decided logs.swap(plogs); // sort by tag ID so that filterDuplicates works. std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) { @@ -1005,6 +1019,8 @@ class BackupContainerFileSystemImpl { restorable.logs.swap(filtered); // sort by version order again for continuous analysis std::sort(restorable.logs.begin(), restorable.logs.end()); + // sort by version, but isPartitionedLogsContinuous will sort each tag separately + // need to refactor. if (isPartitionedLogsContinuous(restorable.logs, minKeyRangeVersion, restorable.targetVersion)) { restorable.continuousBeginVersion = minKeyRangeVersion; restorable.continuousEndVersion = restorable.targetVersion + 1; // not inclusive diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index b2b43012ce9..29d7736c14b 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -182,6 +182,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( BACKUP_DISPATCH_ADDTASK_SIZE, 50 ); init( RESTORE_DISPATCH_ADDTASK_SIZE, 150 ); init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 20; + init (RESTORE_PARTITIONED_BATCH_VERSION_SIZE, 1000000); init( RESTORE_WRITE_TX_SIZE, 256 * 1024 ); init( APPLY_MAX_LOCK_BYTES, 1e9 ); init( APPLY_MIN_LOCK_BYTES, 11e6 ); //Must be bigger than TRANSACTION_SIZE_LIMIT diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index f0f02877e0c..73faa6ebd8c 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -40,6 +40,7 @@ #include "fdbclient/KeyRangeMap.h" #include "fdbclient/Knobs.h" #include "fdbclient/ManagementAPI.actor.h" 
+#include "fdbclient/PartitionedLogIterator.h" #include "fdbclient/RestoreInterface.h" #include "fdbclient/Status.h" #include "fdbclient/SystemData.h" @@ -156,7 +157,6 @@ ACTOR Future> TagUidMap::getAll_impl(TagUidMap* tagsMa KeyBackedTag::KeyBackedTag(std::string tagName, StringRef tagMapPrefix) : KeyBackedProperty(TagUidMap(tagMapPrefix).getProperty(tagName)), tagName(tagName), tagMapPrefix(tagMapPrefix) {} - class RestoreConfig : public KeyBackedTaskConfig { public: RestoreConfig(UID uid = UID()) : KeyBackedTaskConfig(fileRestorePrefixRange.begin, uid) {} @@ -232,16 +232,20 @@ class RestoreConfig : public KeyBackedTaskConfig { // Describes a file to load blocks from during restore. Ordered by version and then fileName to enable // incrementally advancing through the map, saving the version and path of the next starting point. + // question: do we want to add tag here? struct RestoreFile { - Version version; + Version version; // this is beginVersion, not endVersion std::string fileName; bool isRange{ false }; // false for log file int64_t blockSize{ 0 }; int64_t fileSize{ 0 }; Version endVersion{ ::invalidVersion }; // not meaningful for range files + int64_t tagId = -1; // only meaningful to log files, Log router tag. Non-negative for new backup format. + int64_t totalTags = -1; // only meaningful to log files, Total number of log router tags. 
Tuple pack() const { - return Tuple::makeTuple(version, fileName, (int)isRange, fileSize, blockSize, endVersion); + return Tuple::makeTuple( + version, fileName, (int64_t)isRange, fileSize, blockSize, endVersion, tagId, totalTags); } static RestoreFile unpack(Tuple const& t) { RestoreFile r; @@ -252,6 +256,8 @@ class RestoreConfig : public KeyBackedTaskConfig { r.fileSize = t.getInt(i++); r.blockSize = t.getInt(i++); r.endVersion = t.getInt(i++); + r.tagId = t.getInt(i++); + r.totalTags = t.getInt(i++); return r; } }; @@ -488,6 +494,323 @@ ACTOR Future RestoreConfig::getFullStatus_impl(RestoreConfig restor return returnStr; } +// two buffers are alternatively serving data and reading data from file +// thus when one buffer is serving data through peek() +// the other buffer is reading data from file to provide pipelining. +class TwoBuffers : public ReferenceCounted, NonCopyable { +public: + class IteratorBuffer : public ReferenceCounted { + public: + std::shared_ptr data; + // has_value means there is data, otherwise it means there is no data being fetched or ready + // is_valid means data is being fetched, is_ready means data is ready + std::optional> fetchingData; + size_t size; + int capacity; + IteratorBuffer(int _capacity) { + data = std::shared_ptr(new char[_capacity]()); // size from the argument: the capacity member is still unassigned here + fetchingData.reset(); + size = 0; + capacity = _capacity; + } + + bool is_valid() { return fetchingData.has_value(); } + }; + TwoBuffers(int); + // ready need to be called first before calling peek + // because a shared_ptr cannot be wrapped by a Future + // this method ensures the current buffer has available data + Future ready(); + ACTOR static Future ready(Reference self); + // fill buffer[index] with the next block of file + // it has side effects to change currentFileIndex and currentFilePosition + ACTOR static Future readNextBlock(Reference self, int index); + // peek can only be called after ready is called + // it returns the pointer to the active buffer + std::shared_ptr peek(); 
+ bool hasNext(); + + // discard the current buffer and swap to the next one + void discardAndSwap(); + + // try to fill the buffer[index] + // but no-op if the buffer have valid data or it is actively being filled + void fillBufferIfAbsent(int index); + + size_t getBufferSize(); + +private: + Reference buffers[2]; // Two buffers for alternating + size_t bufferCapacity; // Size of each buffer in bytes + Reference bc; + std::vector files; + + int cur; // Index of the current active buffer (0 or 1) + size_t currentFileIndex; // Index of the current file being read + size_t currentFilePosition; // Current read position in the current file +}; + +TwoBuffers::TwoBuffers(int capacity) : currentFileIndex(0), currentFilePosition(0), cur(0) { + // bufferCapacity = BATCH_READ_BLOCK_COUNT * BLOCK_SIZE; + bufferCapacity = capacity; + buffers[0] = makeReference(capacity); + buffers[1] = makeReference(capacity); +} + +bool TwoBuffers::hasNext() { + return currentFileIndex < files.size() || + (buffers[0]->is_valid() || buffers[1]->is_valid()); // as long as not reset +} + +Future TwoBuffers::ready() { + return ready(Reference::addRef(this)); +} + +ACTOR Future TwoBuffers::ready(Reference self) { + // if cur is not ready, then wait + if (!self->hasNext()) { + return Void(); + } + // try to fill the current buffer, and wait before it is filled + self->fillBufferIfAbsent(self->cur); + wait(self->buffers[self->cur]->fetchingData.value()); + + // try to fill the next buffer, do not wait for the filling + self->fillBufferIfAbsent(1 - self->cur); + return Void(); +} + +std::shared_ptr TwoBuffers::peek() { + return buffers[cur]->data; +} + +void TwoBuffers::discardAndSwap() { + // invalid cur and change cur to next + buffers[cur]->fetchingData.reset(); + cur = 1 - cur; +} + +size_t TwoBuffers::getBufferSize() { + return buffers[cur]->size; +} + +// only one readNextBlock can be run at a single time, otherwie the same block might be loaded twice +ACTOR Future 
TwoBuffers::readNextBlock(Reference self, int index) { + if (self->currentFileIndex >= self->files.size()) { + self->buffers[index]->size = 0; + return Void(); + } + state Reference asyncFile = wait(self->bc->readFile(self->files[self->currentFileIndex].fileName)); + state size_t fileSize = self->files[self->currentFileIndex].fileSize; + size_t remaining = fileSize - self->currentFilePosition; + state size_t bytesToRead = std::min(self->bufferCapacity, remaining); + state int bytesRead = + wait(asyncFile->read((uint8_t*)self->buffers[index]->data.get(), bytesToRead, self->currentFilePosition)); + if (bytesRead != bytesToRead) + throw restore_bad_read(); + self->buffers[index]->size = bytesRead; // Set to actual bytes read + // self->bufferOffset[index] = 0; // Reset bufferOffset for the new data + self->currentFilePosition += bytesRead; + if (self->currentFilePosition >= fileSize) { + self->currentFileIndex++; + self->currentFilePosition = 0; + } + return Void(); +} + +void TwoBuffers::fillBufferIfAbsent(int index) { + auto self = Reference::addRef(this); + if (self->buffers[index]->is_valid()) { + // if this buffer is valid, then do not overwrite it + return; + } + self->buffers[index]->fetchingData = readNextBlock(self, index); + return; +} + +class PartitionedLogIteratorTwoBuffers : public PartitionedLogIterator { +private: + Reference twobuffer; + + // consume single version data upto the end of the current batch + // stop if seeing a different version from the parameter. + // it has side effects to update bufferOffset after reading the data + Future>> consumeData(Version firstVersion); + ACTOR static Future>> consumeData( + Reference self, + Version v); + + // each block has a format of {
[mutations]}, need to skip the header to read mutations + // this method checks if bufferOffset is at the boundary and advance it if necessary + void removeBlockHeader(); + +public: + // read up to a fixed number of block count + // noted that each version has to be contained within 2 blocks + const int BATCH_READ_BLOCK_COUNT = 10; + const int BLOCK_SIZE = CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE; + const int mutationHeaderBytes = sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); + Reference bc; + int tag; + std::vector files; + bool hasMoreData; // Flag indicating if more data is available + size_t bufferOffset; // Current read offset within each buffer + // empty means no data, future is valid but not ready means being fetched + // future is ready means it currently holds data + + PartitionedLogIteratorTwoBuffers(Reference _bc, + int _tag, + std::vector _files); + + // whether there are more contents for this tag in all files specified + bool hasNext() const; + + // find the next version without advancing the iterator + Future peekNextVersion(); + ACTOR static Future peekNextVersion(Reference iterator); + + // get all the mutations of next version and advance the iterator + // this might issue multiple consumeData() if the data of a version cross buffer boundary + Future>> getNext(); + ACTOR static Future>> getNext( + Reference iterator); +}; + +Future>> PartitionedLogIteratorTwoBuffers::consumeData(Version firstVersion) { + return consumeData(Reference::addRef(this), firstVersion); +} + +ACTOR Future>> PartitionedLogIteratorTwoBuffers::consumeData( + Reference self, + Version firstVersion) { + state Standalone> mutations = Standalone>(); + wait(self->twobuffer->ready()); + std::shared_ptr start = self->twobuffer->peek(); + int size = self->twobuffer->getBufferSize(); + bool foundNewVersion = false; + while (self->bufferOffset < size) { + while (self->bufferOffset < size && static_cast<uint8_t>(*(start.get() + self->bufferOffset)) != 0xFF) { + // for each mutation in the block; the byte is compared as uint8_t because plain char may be signed, in which case it never equals 0xFF + 
self->removeBlockHeader(); + + Version version; + std::memcpy(&version, start.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + if (version != firstVersion) { + foundNewVersion = true; + break; // Different version, stop here + } + + int32_t subsequence; + std::memcpy(&subsequence, start.get() + self->bufferOffset + sizeof(Version), sizeof(int32_t)); + subsequence = bigEndian32(subsequence); + + int32_t mutationSize; + std::memcpy( + &mutationSize, start.get() + self->bufferOffset + sizeof(Version) + sizeof(int32_t), sizeof(int32_t)); + mutationSize = bigEndian32(mutationSize); + + // assumption: the entire mutation is within the buffer + size_t mutationTotalSize = self->mutationHeaderBytes + mutationSize; + ASSERT(self->bufferOffset + mutationTotalSize <= size); + + Standalone mutationData = makeString(mutationSize); + std::memcpy( + mutateString(mutationData), start.get() + self->bufferOffset + self->mutationHeaderBytes, mutationSize); + VersionedMutation mutation; + mutation.version = version; + mutation.subsequence = subsequence; + mutation.mutation = mutationData; + mutations.push_back_deep(mutations.arena(), mutation); + // Move the bufferOffset to include this mutation + self->bufferOffset += mutationTotalSize; + } + if (self->bufferOffset < size && static_cast<uint8_t>(*(start.get() + self->bufferOffset)) == 0xFF) { + // there are paddings (0xFF bytes): skip to the next block boundary + int remain = self->BLOCK_SIZE - (self->bufferOffset % self->BLOCK_SIZE); + self->bufferOffset += remain; + } + if (foundNewVersion) { + break; + } + } + + return mutations; +} + +void PartitionedLogIteratorTwoBuffers::removeBlockHeader() { + // wait(logFile->append((uint8_t*)&PARTITIONED_MLOG_VERSION, sizeof(PARTITIONED_MLOG_VERSION))); + if (bufferOffset % BLOCK_SIZE == 0) { + bufferOffset += sizeof(uint32_t); + } +} + +PartitionedLogIteratorTwoBuffers::PartitionedLogIteratorTwoBuffers(Reference _bc, + int _tag, + std::vector _files) + : bc(_bc), tag(_tag), files(std::move(_files)), bufferOffset(0) {} + +bool 
PartitionedLogIteratorTwoBuffers::hasNext() const { + // if there are no more data, return false, else return true + // if currentFileIndex is not the end, then there are more data + // if it is in the process of loading the last block, fileIndex=files.size() - 1 + // because bufferDataSize and buffer are set before adding fileIndex + // if currentFileIndex >= files.size(), then bufferDataSize must has been set + // + return twobuffer->hasNext(); +} + +Future PartitionedLogIteratorTwoBuffers::peekNextVersion() { + return peekNextVersion(Reference::addRef(this)); +} +ACTOR Future PartitionedLogIteratorTwoBuffers::peekNextVersion( + Reference self) { + // Read the first mutation's version + if (!self->hasNext()) { + return Version(0); + } + wait(self->twobuffer->ready()); + std::shared_ptr start = self->twobuffer->peek(); + self->removeBlockHeader(); + Version version; + std::memcpy(&version, start.get() + self->bufferOffset, sizeof(Version)); + version = bigEndian64(version); + return version; +} + +ACTOR Future>> PartitionedLogIteratorTwoBuffers::getNext( + Reference self) { + state Standalone> mutations; + if (!self->hasNext()) { + TraceEvent(SevWarn, "IteratorExhausted").log(); + return mutations; + } + state Version firstVersion = wait(self->peekNextVersion()); + Standalone> firstBatch = wait(self->consumeData(firstVersion)); + mutations = firstBatch; + // If the current buffer is fully consumed, then we need to check the next buffer in case + // the version is sliced across this buffer boundary + while (self->bufferOffset >= self->twobuffer->getBufferSize()) { + self->twobuffer->discardAndSwap(); + // data for one version cannot exceed single buffer size + // if hitting the end of a batch, check the next batch in case version is + if (self->twobuffer->hasNext()) { + // now this is run for each block, but it is not necessary if it is the last block of a file + // cannot check hasMoreData here because other buffer might have the last piece + Standalone> batch = 
wait(self->consumeData(firstVersion)); + for (const VersionedMutation& vm : batch) { + mutations.push_back_deep(mutations.arena(), vm); + } + } + } + return mutations; +} + +Future>> PartitionedLogIteratorTwoBuffers::getNext() { + return getNext(Reference::addRef(this)); +} + FileBackupAgent::FileBackupAgent() : subspace(Subspace(fileBackupPrefixRange.begin)) // The other subspaces have logUID -> value @@ -1155,6 +1478,7 @@ ACTOR static Future decodeKVPairs(StringRefReader* reader, // If eof reached or first value len byte is 0xFF then a valid block end was reached. if (reader->eof() || *reader->rptr == 0xFF) { + // hfu5: last key is not included results->push_back(results->arena(), KeyValueRef(KeyRef(k, kLen), ValueRef())); break; } @@ -1368,6 +1692,8 @@ struct LogFileWriter { int64_t blockEnd; }; +// input: a string of [param1, param2], [param1, param2] ..., [param1, param2] +// output: a vector of [param1, param2] after removing the length info Standalone> decodeMutationLogFileBlock(const Standalone& buf) { Standalone> results({}, buf.arena()); StringRefReader reader(buf, restore_corrupted_data()); @@ -1640,6 +1966,7 @@ ACTOR static Future addBackupTask(StringRef name, state Reference task(new Task(name, version, doneKey, priority)); // Bind backup config to new task + // allow this new task to find the config(keyspace) of the parent task wait(config.toTask(tr, task, setValidation)); // Set task specific params @@ -3058,6 +3385,7 @@ struct BackupLogsDispatchTask : BackupTaskFuncBase { if (!partitionedLog.present() || !partitionedLog.get()) { // Add the initial log range task to read/copy the mutations and the next logs dispatch task which will // run after this batch is done + // read blog/ prefix and write those (param1, param2) into files wait(success(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, @@ -3065,6 +3393,7 @@ struct BackupLogsDispatchTask : BackupTaskFuncBase { beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture)))); + 
// issue the next key range wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, @@ -3771,6 +4100,7 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { state Reference inFile = wait(bc.get()->readFile(rangeFile.fileName)); state Standalone> blockData; try { + // data is each real KV, not encoded mutations Standalone> data = wait(decodeRangeFileBlock(inFile, readOffset, readLen, cx)); blockData = data; } catch (Error& e) { @@ -3858,6 +4188,8 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { // Clear the range we are about to set. // If start == 0 then use fileBegin for the start of the range, else data[start] // If iend == end then use fileEnd for the end of the range, else data[iend] + // it seems we are clear the raw key, without alog prefix, right? + // [80, 120], [100] state KeyRange trRange = KeyRangeRef( (start == 0) ? fileRange.begin : data[start].key.removePrefix(removePrefix.get()).withPrefix(addPrefix.get()), @@ -3945,8 +4277,10 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { // Update the KV range map if originalFileRange is set std::vector> updateMap; std::vector ranges = Params.getOriginalFileRanges(task); + // if want to restore((a, b), (e, f), (x, y)), then there are 3 ranges for (auto& range : ranges) { Value versionEncoded = BinaryWriter::toValue(Params.inputFile().get(task).version, Unversioned()); + // hfu5 : find how it is synced updateMap.push_back(krmSetRange(tr, restore.applyMutationsMapPrefix(), range, versionEncoded)); } @@ -4061,6 +4395,7 @@ std::vector decodeMutationLogValue(const StringRef& value) { } void AccumulatedMutations::addChunk(int chunkNumber, const KeyValueRef& kv) { + // hfu5[important] : here it validates that partition(chunk) number has to be continuous if (chunkNumber == lastChunkNumber + 1) { lastChunkNumber = chunkNumber; serializedMutations += kv.value.toString(); @@ -4091,6 +4426,7 @@ bool AccumulatedMutations::isComplete() const { // range in ranges. 
// It is undefined behavior to run this if isComplete() does not return true. bool AccumulatedMutations::matchesAnyRange(const RangeMapFilters& filters) const { + // decode param2, so that each actual mutations are in mutations variable std::vector mutations = decodeMutationLogValue(serializedMutations); for (auto& m : mutations) { if (m.type == MutationRef::Encrypted) { @@ -4143,17 +4479,22 @@ bool RangeMapFilters::match(const KeyRangeRef& range) const { std::vector filterLogMutationKVPairs(VectorRef data, const RangeMapFilters& filters) { std::unordered_map mutationBlocksByVersion; + // first group mutations by version for (auto& kv : data) { + // each kv is a [param1, param2] auto versionAndChunkNumber = decodeMutationLogKey(kv.key); mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv); } std::vector output; + // then add each version to the output, and now each K in output is also a KeyValueRef, + // but mutations of the same versions stay together for (auto& vb : mutationBlocksByVersion) { AccumulatedMutations& m = vb.second; // If the mutations are incomplete or match one of the ranges, include in results. + // hfu5: incomplete, why? 
if (!m.isComplete() || m.matchesAnyRange(filters)) { output.insert(output.end(), m.kvs.begin(), m.kvs.end()); } @@ -4192,7 +4533,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state Reference tr(new ReadYourWritesTransaction(cx)); state Reference bc; - state std::vector ranges; + state std::vector ranges; // this is the actual KV, not version loop { try { @@ -4241,7 +4582,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state int txBytes = 0; for (; i < end && txBytes < dataSizeLimit; ++i) { Key k = dataFiltered[i].key.withPrefix(mutationLogPrefix); - ValueRef v = dataFiltered[i].value; + ValueRef v = dataFiltered[i].value; // each KV is a [param1 with added prefix -> param2] tr->set(k, v); txBytes += k.expectedSize(); txBytes += v.expectedSize(); @@ -4313,6 +4654,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state Reference task(new Task(RestoreLogDataTaskFunc::name, RestoreLogDataTaskFunc::version, doneKey)); // Create a restore config from the current task and bind it to the new task. 
+ // RestoreConfig(parentTask) createsa prefix of : fileRestorePrefixRange.begin/uid->config/[uid] wait(RestoreConfig(parentTask).toTask(tr, task)); Params.inputFile().set(task, lf); Params.readOffset().set(task, offset); @@ -4342,6 +4684,435 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { StringRef RestoreLogDataTaskFunc::name = "restore_log_data"_sr; REGISTER_TASKFUNC(RestoreLogDataTaskFunc); +// this method takes a version and a list of list of mutations of this verison, +// each list is returned from a iterator sorted by sub +// it will first add all mutations in subsequence order +// then combine them in old-format (param1, parma2) and return +// this method assumes that iterator can return a list of mutations +/* + mutations are serialized in file as below format: + `` + `` + `` + `…` + ` + + for now, assume each iterator returns a vector > + noted that the mutation's arena has to be valid during the execution + + according to BackupWorker::addMutation, version has 64, sub has 32 and mutation length has 32 + So iterator will combine all mutations in the same version and return a vector + iterator should also return the subsequence together with each mutation + as here we will do another mergeSort for subsequence again to decide the order + and here we will decode the stringref + + + Version currentVersion; + uint32_t sub; + uint32_t mutationSize; + BinaryReader rd(str, Unversioned()); + rd >> currentVersion >> sub >> mutationSize; +*/ + +// type|kLen|vLen|Key|Value +// similar to addBackupMutations( +// MutationList::push_back_deep +Standalone transformMutationToOldFormat(MutationRef m) { + BinaryWriter bw(Unversioned()); + bw << m.type; + bw << m.param1.size(); + bw << m.param1; + bw << m.param2.size(); + bw << m.param2; + return bw.toValue(); +} + +Standalone> generateOldFormatMutations( + Version commitVersion, + std::vector>>& newFormatMutations) { + Standalone> results; + std::vector>> oldFormatMutations; + // mergeSort subversion here + // 
just do a global sort for everyone + int64_t totalBytes = 0; + std::map>> mutationsBySub; + for (auto& vec : newFormatMutations) { + for (auto& p : vec) { + uint32_t sub = p.subsequence; + BinaryReader reader(p.mutation, IncludeVersion()); + MutationRef mutation; + reader >> mutation; + // transform the mutation format and add to each subversion + // where is mutation written in new format + Standalone mutationOldFormat = transformMutationToOldFormat(mutation); + mutationsBySub[sub].push_back(mutationOldFormat); + totalBytes += mutationOldFormat.size(); + } + } + BinaryWriter param2Writer(Unversioned()); + param2Writer << totalBytes; + + for (auto& mutationsForSub : mutationsBySub) { + // concatenate them to param2Str + for (auto& m : mutationsForSub.second) { + param2Writer.serializeBytes(m); + } + } + Key param2Concat = param2Writer.toValue(); + + // deal with param1 + int32_t hashBase = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; + + BinaryWriter wrParam1(Unversioned()); // hash/commitVersion/part + wrParam1 << (uint8_t)hashlittle(&hashBase, sizeof(hashBase), 0); + wrParam1 << bigEndian64(commitVersion); + uint32_t* partBuffer = nullptr; + + // just generate a list of (param1, param2) + // are they mutations or are they key value + // param2 has format: length_of_the_mutation_group | encoded_mutation_1 | … | encoded_mutation_k + // each mutation has format type|kLen|vLen|Key|Value + for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < param2Concat.size(); part++) { + KeyValueRef backupKV; + // Assign the second parameter as the part + backupKV.value = param2Concat.substr(part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, + std::min(param2Concat.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, + CLIENT_KNOBS->MUTATION_BLOCK_SIZE)); + // Write the last part of the mutation to the serialization, if the buffer is not defined + if (!partBuffer) { + // part = 0 + wrParam1 << bigEndian32(part); + // Define the last buffer part + partBuffer = 
(uint32_t*)((char*)wrParam1.getData() + wrParam1.getLength() - sizeof(uint32_t)); + } else { + // part > 0 + *partBuffer = bigEndian32(part); + } + backupKV.key = wrParam1.toValue(); + results.push_back_deep(results.arena(), backupKV); + } + return results; +} + +// each task can be partitioned to smaller ranges because commit proxy would +// only start to commit alog/ prefix mutations to original prefix when +// the final version is set, but do it in a single task for now for simplicity +struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase { + static StringRef name; + static constexpr uint32_t version = 1; + StringRef getName() const override { return name; }; + + static struct { + static TaskParam beginVersion() { return __FUNCTION__sr; } + static TaskParam endVersion() { return __FUNCTION__sr; } + } Params; + + ACTOR static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + state RestoreConfig restore(task); + + state Version beginVersion = Params.beginVersion().get(task); + state Version endVersion = Params.endVersion().get(task); + Reference _bc = wait(restore.sourceContainer().getOrThrow(tr)); + state Reference bc = getBackupContainerWithProxy(_bc); + state Reference onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + + state Version restoreVersion; + + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && + checkTaskVersion(tr->getDatabase(), task, name, version)); + + state int nextEndVersion = + std::min(restoreVersion, endVersion + CLIENT_KNOBS->RESTORE_PARTITIONED_BATCH_VERSION_SIZE); + // update the apply mutations end version so the mutations from the + // previous batch can be applied. + // Only do this once beginVersion is > 0 (it will be 0 for the initial dispatch). 
+ if (beginVersion > 0) { + // hfu5 : unblock apply alog to normal key space + // if the last file is [80, 100] and the restoreVersion is 90, we should use 90 here + // this call an additional call after last file + restore.setApplyEndVersion(tr, std::min(beginVersion, restoreVersion + 1)); + } + + // The applyLag must be retrieved AFTER potentially updating the apply end version. + state int64_t applyLag = wait(restore.getApplyVersionLag(tr)); + + // this is to guarantee commit proxy is catching up doing apply alog -> normal key + // with this backupFile -> alog process + // If starting a new batch and the apply lag is too large then re-queue and wait + if (applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) { + // Wait a small amount of time and then re-add this same task. + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + wait(success(RestoreDispatchPartitionedTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "too_far_behind") + .detail("TaskInstance", THIS_ADDR); + + wait(taskBucket->finish(tr, task)); + return Void(); + } + + // Get a batch of files. We're targeting batchSize blocks(30k) being dispatched so query for batchSize(150) + // files (each of which is 0 or more blocks). + int fileLimit = 1000; + state RestoreConfig::FileSetT::RangeResultType files = + wait(restore.fileSet().getRange(tr, + Optional({ beginVersion, "" }), + Optional({ endVersion, "" }), + fileLimit)); + + state int64_t maxTagID = 0; + state std::vector logs; + state std::vector ranges; + for (auto f : files.results) { + if (f.isRange) { + ranges.push_back(f); + } else { + logs.push_back(f); + maxTagID = std::max(maxTagID, f.tagId); + } + } + // allPartsDone will be set once all block tasks in the current batch are finished. 
+ // create a new future for the new batch + state Reference allPartsDone = allPartsDone = futureBucket->future(tr); + restore.batchFuture().set(tr, allPartsDone->pack()); + + // if there are no files, if i am not the last batch, then on to the next batch + // if there are no files and i am the last batch, then just wait for applying to finish + // do we need this files.results.size() == 0 at all? + // if (files.results.size() == 0 && beginVersion >= restoreVersion) { + if (beginVersion >= restoreVersion) { + if (applyLag == 0) { + // i am the last batch + // If apply lag is 0 then we are done so create the completion task + wait(success(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "restore_complete") + .detail("TaskInstance", THIS_ADDR); + } else { + // i am the last batch, and applyLag is not zero, then I will create another dummy task to wait + // for apply log to be zero, then it will go into the branch above. + // Applying of mutations is not yet finished so wait a small amount of time and then re-add this + // same task. + // this is only to create a dummy one wait for it to finish + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + wait(success( + RestoreDispatchPartitionedTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "apply_still_behind") + .detail("TaskInstance", THIS_ADDR); + } + + // If adding to existing batch then task is joined with a batch future so set done future + // Note that this must be done after joining at least one task with the batch future in case all other + // blockers already finished. 
+ + wait(taskBucket->finish(tr, task)); + return Void(); + } + + // if we reach here, this batch is not empty(i.e. we have range and/or mutation files in this) + + // Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE + // blocks per Dispatch task and target batchSize total per batch but a batch must end on a complete version + // boundary so exceed the limit if necessary to reach the end of a version of files. + state std::vector> addTaskFutures; + state int versionRestored = 0; + state int i = 0; + // need to process all range files, because RestoreRangeTaskFunc takes a block offset, keep using ti here. + // this can be done first, because they are not overlap within a restore uid + // each task will read the file, restore those key to their original keys after clear that range + // also it will update the keyVersionMap[key -> versionFromRangeFile] + // by this time, corresponding mutation files within the same version range has not been applied yet + // because they are waiting for the singal of this RestoreDispatchPartitionedTaskFunc + // when log are being applied, they will compare version of key to the keyVersionMap updated by range file + // after each RestoreDispatchPartitionedTaskFunc, keyVersionMap will be clear if mutation version is larger. 
+ for (; i < ranges.size(); ++i) { + RestoreConfig::RestoreFile& f = ranges[i]; + // For each block of the file + for (int64_t j = 0; j < f.fileSize; j += f.blockSize) { + addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, + taskBucket, + task, + f, + j, + std::min(f.blockSize, f.fileSize - j), + TaskCompletionKey::joinWith(allPartsDone))); + } + } + // aggregate logs by tag id + std::vector> filesByTag(maxTagID + 1); + for (RestoreConfig::RestoreFile& f : logs) { + // find the tag, aggregate files by tags + if (f.tagId == -1) { + // inconsistent data + TraceEvent(SevError, "PartitionedLogFileNoTag") + .detail("FileName", f.fileName) + .detail("FileSize", f.fileSize) + .log(); + } else { + filesByTag[f.tagId].push_back(f); + } + } + + state std::vector> iterators(maxTagID + 1); + // for each tag, create an iterator + for (int k = 0; k < filesByTag.size(); k++) { + iterators[i] = makeReference(bc, k, filesByTag[k]); + } + + // mergeSort all iterator until all are exhausted + state int totalItereators = iterators.size(); + // it stores all mutations for the next min version, in new format + state std::vector>> mutationsSingleVersion; + state bool atLeastOneIteratorHasNext = true; + while (atLeastOneIteratorHasNext) { + atLeastOneIteratorHasNext = false; + state int64_t minVersion = std::numeric_limits::max(); + state int k = 0; + for (; k < totalItereators; k++) { + if (!iterators[i]->hasNext()) { + continue; + } + // TODO: maybe embed filtering key into iterator, + // as a result, backup agent should not worry about key range filtering + atLeastOneIteratorHasNext = true; + Version v = wait(iterators[i]->peekNextVersion()); + if (v < minVersion) { + minVersion = v; + mutationsSingleVersion.clear(); + Standalone> tmp = wait(iterators[i]->getNext()); + mutationsSingleVersion.push_back(tmp); + } else if (v == minVersion) { + Standalone> tmp = wait(iterators[i]->getNext()); + mutationsSingleVersion.push_back(tmp); + } + } + + if (atLeastOneIteratorHasNext) { + // 
transform from new format to old format(param1, param2) + // in the current implementation, each version will trigger a mutation + // if each version data is too small, we might want to combine multiple versions + // for a single mutation + state Standalone> oldFormatMutations = + generateOldFormatMutations(minVersion, mutationsSingleVersion); + state int mutationIndex = 0; + state int txBytes = 0; + state int totalMutation = oldFormatMutations.size(); + state int txBytesLimit = CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE; + state Key mutationLogPrefix = restore.mutationLogPrefix(); + + loop { + try { + if (mutationIndex == totalMutation) { + break; + } + txBytes = 0; + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + while (mutationIndex < totalMutation && txBytes < txBytesLimit) { + Key k = oldFormatMutations[mutationIndex].key.withPrefix(mutationLogPrefix); + ValueRef v = oldFormatMutations[mutationIndex] + .value; // each KV is a [param1 with added prefix -> param2] + tr->set(k, v); + txBytes += k.expectedSize(); + txBytes += v.expectedSize(); + } + wait(tr->commit()); + } catch (Error& e) { + if (e.code() == error_code_transaction_too_large) + txBytesLimit /= 2; + else + wait(tr->onError(e)); + } + } + ++versionRestored; + } + mutationsSingleVersion.clear(); + } + + // even if file exsists, but they are empty, in this case just start the next batch + if (versionRestored == 0) { + addTaskFutures.push_back( + RestoreDispatchPartitionedTaskFunc::addTask(tr, taskBucket, task, endVersion, nextEndVersion)); + return Void(); + } + + addTaskFutures.push_back(RestoreDispatchPartitionedTaskFunc::addTask( + tr, taskBucket, task, endVersion, nextEndVersion, TaskCompletionKey::noSignal(), allPartsDone)); + + wait(waitForAll(addTaskFutures)); + wait(taskBucket->finish(tr, task)); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) 
+ .detail("EndVersion", endVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "dispatch_batch_complete") + .detail("TaskInstance", THIS_ADDR); + + return Void(); + } + + ACTOR static Future addTask(Reference tr, + Reference taskBucket, + Reference parentTask, + Version beginVersion, + Version endVersion, + TaskCompletionKey completionKey = TaskCompletionKey::noSignal(), + Reference waitFor = Reference()) { + Key doneKey = wait(completionKey.get(tr, taskBucket)); + + // Use high priority for dispatch tasks that have to queue more blocks for the current batch + unsigned int priority = 0; + state Reference task(new Task( + RestoreDispatchPartitionedTaskFunc::name, RestoreDispatchPartitionedTaskFunc::version, doneKey, priority)); + + // Create a config from the parent task and bind it to the new task + wait(RestoreConfig(parentTask).toTask(tr, task)); + Params.beginVersion().set(task, beginVersion); + Params.endVersion().set(task, endVersion); + + if (!waitFor) { + return taskBucket->addTask(tr, task); + } + + wait(waitFor->onSetAddTask(tr, taskBucket, task)); + return "OnSetAddTask"_sr; + } + + Future execute(Database cx, + Reference tb, + Reference fb, + Reference task) override { + return Void(); + }; + Future finish(Reference tr, + Reference tb, + Reference fb, + Reference task) override { + return _finish(tr, tb, fb, task); + }; +}; +StringRef RestoreDispatchPartitionedTaskFunc::name = "restore_dispatch"_sr; +REGISTER_TASKFUNC(RestoreDispatchPartitionedTaskFunc); struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { static StringRef name; static constexpr uint32_t version = 1; @@ -4367,15 +5138,17 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { state int64_t remainingInBatch = Params.remainingInBatch().get(task); state bool addingToExistingBatch = remainingInBatch > 0; state Version restoreVersion; - state Future> onlyApplyMutationLogs = restore.onlyApplyMutationLogs().get(tr); - wait(store(restoreVersion, 
restore.restoreVersion().getOrThrow(tr)) && success(onlyApplyMutationLogs) && + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && checkTaskVersion(tr->getDatabase(), task, name, version)); // If not adding to an existing batch then update the apply mutations end version so the mutations from the // previous batch can be applied. Only do this once beginVersion is > 0 (it will be 0 for the initial // dispatch). if (!addingToExistingBatch && beginVersion > 0) { + // hfu5 : unblock apply alog to normal key space + // if the last file is [80, 100] and the restoreVersion is 90, we should use 90 here + // this call an additional call after last file restore.setApplyEndVersion(tr, std::min(beginVersion, restoreVersion + 1)); } @@ -4402,6 +5175,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { return Void(); } + // question why do we need beginFile at all -- this to handle stop in the middle of version case state std::string beginFile = Params.beginFile().getOrDefault(task); // Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files // (each of which is 0 or more blocks). @@ -4453,6 +5227,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { .detail("TaskInstance", THIS_ADDR); } else if (beginVersion < restoreVersion) { // If beginVersion is less than restoreVersion then do one more dispatch task to get there + // there are no more files between beginVersion and restoreVersion wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize))); TraceEvent("FileRestoreDispatch") @@ -4479,6 +5254,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { } else { // Applying of mutations is not yet finished so wait a small amount of time and then re-add this // same task. 
+ // this is only to create a dummy one wait for it to finish wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize))); @@ -4508,6 +5284,9 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { state int64_t beginBlock = Params.beginBlock().getOrDefault(task); state int i = 0; + // for each file + // not creating a new task at this level because restore files are read back together -- both range and log + // so i have to process range files anyway. for (; i < files.results.size(); ++i) { RestoreConfig::RestoreFile& f = files.results[i]; @@ -4517,6 +5296,14 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { if (f.version != endVersion && remainingInBatch <= 0) { // Next start will be at the first version after endVersion at the first file first block ++endVersion; + // beginFile set to empty to indicate we are not in the middle of a range + // by middle of a range, we mean that we have rangeFile v=80, and logFile v=[80, 100], + // then we have to include this log file too in this batch + + // if range comes first, say range=80, log=(81, 100), then its fine we stop before the log, + // what if log comes first: + // range=80, log=(60,90), and range should not be read, but + // what about the 80-90 part? we should not allow those to commit beginFile = ""; beginBlock = 0; break; @@ -4626,8 +5413,12 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { // If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we // cannot end the batch here because we do not know if we got all of the files and blocks from the last // version queued, so make sure remainingInBatch is at least 1. 
- if (!beginFile.empty()) + if (!beginFile.empty()) { + // this is to make sure if we stop in the middle of a version, we do not end this batch + // instead next RestoreDispatchTaskFunc should have addingToExistingBatch as true + // thus they are considered the same batch and alog will be committed only when all of them succeed remainingInBatch = std::max(1, remainingInBatch); + } // If more blocks need to be dispatched in this batch then add a follow-on task that is part of the // allPartsDone group which will won't wait to run and will add more block tasks. @@ -4906,6 +5697,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { for (auto const& r : ranges) { keyRangesFilter.push_back_deep(keyRangesFilter.arena(), KeyRangeRef(r)); } + // hfu5 : all files are read from here state Optional restorable = wait(bc->getRestoreSet(restoreVersion, keyRangesFilter, logsOnly, beginVersion)); if (!restorable.present()) @@ -4926,6 +5718,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { } else { for (int i = 0; i < restorable.get().ranges.size(); ++i) { const RangeFile& f = restorable.get().ranges[i]; + // hfu5: insert range files first files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); // In inconsistentSnapshotOnly mode, if all range files have the same version, then it is the // firstConsistentVersion, otherwise unknown (use -1). 
@@ -4942,6 +5735,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { } if (!inconsistentSnapshotOnly) { for (const LogFile& f : restorable.get().logs) { + // hfu5: log files are added to files here files.push_back({ f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion }); } } @@ -4978,8 +5772,10 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { state int nFileBlocks = 0; state int nFiles = 0; auto fileSet = restore.fileSet(); + // as a result, fileSet has everything, including [beginVersion, endVersion] for each tag for (; i != end && txBytes < 1e6; ++i) { txBytes += fileSet.insert(tr, *i); + // handle the remaining nFileBlocks += (i->fileSize + i->blockSize - 1) / i->blockSize; ++nFiles; } diff --git a/fdbclient/KeyRangeMap.actor.cpp b/fdbclient/KeyRangeMap.actor.cpp index a678c28e4a3..0d57cc280e8 100644 --- a/fdbclient/KeyRangeMap.actor.cpp +++ b/fdbclient/KeyRangeMap.actor.cpp @@ -199,10 +199,12 @@ ACTOR Future krmSetRange(Transaction* tr, Key mapPrefix, KeyRange range, V } ACTOR Future krmSetRange(Reference tr, Key mapPrefix, KeyRange range, Value value) { + // keyVersionMap, (a, b), v1 state KeyRange withPrefix = KeyRangeRef(mapPrefix.toString() + range.begin.toString(), mapPrefix.toString() + range.end.toString()); RangeResult old = wait(tr->getRange(lastLessOrEqual(withPrefix.end), firstGreaterThan(withPrefix.end), 1, Snapshot::True)); + // fetch [keyVersionMap/end, keyVersionMap/inc(end)] Value oldValue; bool hasResult = old.size() > 0 && old[0].key.startsWith(mapPrefix); @@ -213,8 +215,10 @@ ACTOR Future krmSetRange(Reference tr, Key mapP if (!conflictRange.empty()) tr->addReadConflictRange(conflictRange); - tr->clear(withPrefix); - tr->set(withPrefix.begin, value); + tr->clear(withPrefix); // clear [keyVersionMap/a, keyVersionMap/b) + tr->set(withPrefix.begin, value); // set [keyVersionMap/a, v1) + // set [keyVersionMap/b, preveiousVersion], because end is exclusive here, + // but starting from end it might be covered 
by another range file, so set it to old value tr->set(withPrefix.end, oldValue); return Void(); diff --git a/fdbclient/include/fdbclient/BackupAgent.actor.h b/fdbclient/include/fdbclient/BackupAgent.actor.h index c8931750aad..3a163193a55 100644 --- a/fdbclient/include/fdbclient/BackupAgent.actor.h +++ b/fdbclient/include/fdbclient/BackupAgent.actor.h @@ -522,8 +522,8 @@ using RangeResultWithVersion = std::pair; struct RCGroup { RangeResult items; - Version version; - uint64_t groupKey; + Version version; // this is read version for this group + uint64_t groupKey; // this is the original version for this group RCGroup() : version(-1), groupKey(ULLONG_MAX){}; @@ -676,6 +676,7 @@ class KeyBackedTaskConfig : public KeyBackedClass { Reference task, SetValidation setValidation = SetValidation::True) { // Set the uid task parameter + // task's uid is set to my uid TaskParams.uid().set(task, uid); if (!setValidation) { diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 6847d5c5d55..e8e1b707c35 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -183,6 +183,7 @@ class ClientKnobs : public KnobsImpl { int RESTORE_DISPATCH_ADDTASK_SIZE; int RESTORE_DISPATCH_BATCH_SIZE; int RESTORE_WRITE_TX_SIZE; + int RESTORE_PARTITIONED_BATCH_VERSION_SIZE; int APPLY_MAX_LOCK_BYTES; int APPLY_MIN_LOCK_BYTES; int APPLY_BLOCK_SIZE; diff --git a/fdbclient/include/fdbclient/PartitionedLogIterator.h b/fdbclient/include/fdbclient/PartitionedLogIterator.h new file mode 100644 index 00000000000..d7ef82e78b8 --- /dev/null +++ b/fdbclient/include/fdbclient/PartitionedLogIterator.h @@ -0,0 +1,26 @@ +#ifndef FDBCLIENT_PARTITIONED_LOG_ITERATOR_H +#define FDBCLIENT_PARTITIONED_LOG_ITERATOR_H + +#include "fdbclient/FDBTypes.h" + +// Structure to represent each mutation entity +struct VersionedMutation { + Version version; + int32_t subsequence; + StringRef mutation; + VersionedMutation(Arena& p, const 
VersionedMutation& toCopy) : mutation(p, toCopy.mutation) { + version = toCopy.version; + subsequence = toCopy.subsequence; + } + VersionedMutation() {} +}; + +class PartitionedLogIterator : public ReferenceCounted { +public: + virtual bool hasNext() const = 0; + + virtual Future peekNextVersion() = 0; + + virtual Future>> getNext() = 0; +}; +#endif \ No newline at end of file diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 4a5b247428f..d0d0079bc25 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -510,6 +510,7 @@ class ApplyMetadataMutationsImpl { } void checkSetApplyMutationsEndRange(MutationRef m) { + // only proceed when see mutation with applyMutationsEndRange if (!m.param1.startsWith(applyMutationsEndRange.begin)) { return; } diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index cbed292db6b..9199788e233 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -671,6 +671,7 @@ ACTOR Future addMutation(Reference logFile, StringRef mutation, int64_t* blockEnd, int blockSize) { + // version, subversion, messageSize, message state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + mutation.size(); // Convert to big Endianness for version.version, version.sub, and msgSize diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 894b403a76e..9dbb086c60f 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -496,6 +496,7 @@ bool isWhitelisted(const std::vector>& binPathVec, StringR return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end(); } +// hfu5 question is logRangeMutations Key version or actual key ACTOR Future addBackupMutations(ProxyCommitData* self, const std::map* logRangeMutations, LogPushData* toCommit, @@ -532,7 +533,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, Key val 
= valueWriter.toValue(); - BinaryWriter wr(Unversioned()); + BinaryWriter wr(Unversioned()); // backupName/hash/commitVersion/part, so wr is param1 // Serialize the log destination wr.serializeBytes(logRangeMutation->first);