New backup consolidated commit #11760

Draft · wants to merge 1 commit into base: release-7.3
5 changes: 4 additions & 1 deletion fdbbackup/FileConverter.actor.cpp
@@ -291,7 +291,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProgress> {
if (fp->empty()) {
self->fileProgress.erase(self->fileProgress.begin());
} else {
// Keep fileProgress sorted
// Keep fileProgress sorted; only the first entry can have changed, so this single pass is enough
for (int i = 1; i < self->fileProgress.size(); i++) {
if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) {
break;
@@ -489,6 +489,9 @@ ACTOR Future<Void> convert(ConvertParams params) {
arena = Arena();
}

// Keep reading data until a new version is encountered, then flush all buffered data and
// start buffering for the new version.

ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
43 changes: 42 additions & 1 deletion fdbclient/BackupAgentBase.actor.cpp
@@ -187,9 +187,12 @@ Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,
return ret;
}

// Given a begin and end version, get the key prefix in the database for this range,
// which is applyLogKeys.begin/backupUid/hash(uint8)/version(64 bits)/part.
// Returns multiple key ranges, each covering at most APPLY_BLOCK_SIZE versions, e.g.
// (64, 200) -> [(64, 128), (128, 192), (192, 200)]
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
Standalone<VectorRef<KeyRangeRef>> ret;

Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);

//TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);
@@ -292,6 +295,7 @@ void _addResult(bool* tenantMapChanging,
each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the
"result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector)
*/
// hfu5: `value` here is one Param2 value from the backup mutation log
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
@@ -318,6 +322,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
throw incompatible_protocol_version();
}

// hfu5: this is the format for Param2
state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
@@ -332,6 +337,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,

while (consumed < totalBytes) {
uint32_t type = 0;
// hfu5: format should be type|kLen|vLen|Key|Value
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len1 = 0;
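
As a companion to the hfu5 notes above, a hedged standalone sketch of the type|kLen|vLen|key|value layout (per the backup-dataFormat.md design doc referenced later in this diff); decodeParam2 and DecodedMutation are illustrative names, and the buffer is assumed to start with a 64-bit protocol version followed by a 32-bit totalBytes:

#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

struct DecodedMutation { uint32_t type; std::string key, value; };

// Parse mutations out of one Param2 value. Layout assumed here:
// [64-bit protocol version][32-bit totalBytes][(type|kLen|vLen|key|value)...]
std::vector<DecodedMutation> decodeParam2(const uint8_t* buf) {
    size_t offset = sizeof(uint64_t); // skip the protocol version
    uint32_t totalBytes = 0;
    std::memcpy(&totalBytes, buf + offset, sizeof(uint32_t));
    offset += sizeof(uint32_t);

    std::vector<DecodedMutation> out;
    uint32_t consumed = 0;
    while (consumed < totalBytes) {
        uint32_t type = 0, kLen = 0, vLen = 0;
        std::memcpy(&type, buf + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        std::memcpy(&kLen, buf + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        std::memcpy(&vLen, buf + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        std::string key(reinterpret_cast<const char*>(buf + offset), kLen);
        offset += kLen;
        std::string value(reinterpret_cast<const char*>(buf + offset), vLen);
        offset += vLen;
        out.push_back({ type, std::move(key), std::move(value) });
        consumed += 3 * sizeof(uint32_t) + kLen + vLen;
    }
    return out;
}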
@@ -448,6 +454,9 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
} else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
// `version` is the version of this mutation as decoded from the log;
// `ver` is the older version stored in the keyVersionMap.
// As a result, only apply this log mutation when `version` is larger, so that
// log mutations compose correctly with data restored from range files.
if (version > ver && ver != invalidVersion) {
if (removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
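
A minimal model of this check, with FDB types reduced to integers (shouldApplyLogMutation is a made-up helper for illustration, not part of the codebase):

#include <cstdint>

// keyVersionMap records, per key range, the version already covered by range
// files; a log mutation is applied only if it is strictly newer than that.
bool shouldApplyLogMutation(int64_t mutationVersion, int64_t keyVersionMapVersion) {
    const int64_t invalidVersion = -1; // assumed sentinel, matching FDB's invalidVersion
    return mutationVersion > keyVersionMapVersion && keyVersionMapVersion != invalidVersion;
}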
@@ -587,6 +596,7 @@ ACTOR Future<Void> readCommitted(Database cx,
}
}

// hfu5: read each version, potentially multiple parts within the same version
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RCGroup> results,
Future<Void> active,
@@ -639,14 +649,23 @@ ACTOR Future<Void> readCommitted(Database cx,
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());

// Iterate over a version range; each (version, partition) is one key-value pair.
// hfu5 question: in the edge case where two partitions of the same key go to two
// different blocks and cannot be combined here, what happens?
for (auto& s : rangevalue) {
// hfu5 : (version, part)
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
if (groupKey != skipGroup) {
if (rcGroup.version == -1) {
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
} else if (rcGroup.groupKey != groupKey) {
// hfu5: on seeing a different version, send the current result directly and then create
// another rcGroup. As a result, each rcGroup holds a single version, but a single
// version can span multiple rcGroups.

//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
// state uint32_t len(0);
// for (size_t j = 0; j < rcGroup.items.size(); ++j) {
Expand All @@ -665,6 +684,7 @@ ACTOR Future<Void> readCommitted(Database cx,
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
// each item appended here should, per kvMutationLogToTransactions, be one partition of a version's value
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
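
The grouping described in these notes can be sketched standalone as below; Row, Group, and groupByVersion are illustrative stand-ins for the decoded (version, part) rows and RCGroup, and details such as skipGroup and read-version bookkeeping are omitted:

#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Row { uint64_t version; uint32_t part; std::string value; };
struct Group { uint64_t version; std::vector<std::string> parts; };

// rows are assumed sorted by (version, part), as they are in the backup log keyspace.
void groupByVersion(const std::vector<Row>& rows,
                    const std::function<void(Group&&)>& send) {
    Group cur{ UINT64_MAX, {} };
    for (const Row& r : rows) {
        if (cur.version != UINT64_MAX && cur.version != r.version) {
            send(std::move(cur)); // flush: a new version was encountered
            cur = Group{ UINT64_MAX, {} };
        }
        cur.version = r.version;
        cur.parts.push_back(r.value); // one entry per (version, part) row
    }
    if (cur.version != UINT64_MAX)
        send(std::move(cur)); // flush the trailing group
}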
@@ -706,6 +726,8 @@ Future<Void> readCommitted(Database cx,
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
}

// The restore transaction has to be first in the batch, or be the only txn in the batch,
// to make sure it never conflicts with others.
ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
Key uid,
Version newBeginVersion,
@@ -759,6 +781,8 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
// Two layers of loops: the outer loop iterates over file ranges, the inner loop over
// transactions (one per version).
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
@@ -774,10 +798,12 @@

BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
// hfu5: each value should be one partition
bw.serializeBytes(group.items[i].value);
}
// Parse a single transaction from the backup mutation log
Standalone<StringRef> value = bw.toValue();
// ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md
wait(decodeBackupLogValue(&curReq.arena,
&curReq.transaction.mutations,
&curReq.transaction.encryptedMutations,
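
A hedged sketch of what the serializeBytes loop above accomplishes: the parts of one version are concatenated in part order into a single contiguous value before decodeBackupLogValue parses it (joinParts is an illustrative name):

#include <string>
#include <vector>

std::string joinParts(const std::vector<std::string>& parts) {
    std::string whole;
    for (const std::string& p : parts)
        whole += p; // plays the role of BinaryWriter::serializeBytes here
    return whole;
}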
@@ -882,6 +908,13 @@ ACTOR Future<Void> coalesceKeyVersionCache(Key uid,
lastVersion = it.value();
} else {
Version ver = it.value();
// ver: the version recorded in keyVersionMap for this key.
// endVersion: the largest version after applying a batch of versions from the log files.
// If ver < endVersion, this key's entry in keyVersionMap is outdated; in that case run
// clearRange on the keyVersionMapRange prefix for this key so that the applyLog key
// becomes the source of truth; otherwise keyVersionMapRange remains the source of truth.
// Each key must be checked individually: a range file covers a whole key range, but a
// log file does not.
if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion &&
lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
@@ -940,15 +973,22 @@ ACTOR Future<Void> applyMutations(Database cx,
}

int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
// this bounds newEndVersion so that each pass covers at most rangeCount blocks of
// APPLY_BLOCK_SIZE versions
state Version newEndVersion = std::min(*endVersion,
((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) *
CLIENT_KNOBS->APPLY_BLOCK_SIZE);

// Each range represents a partition of the version space, e.g. [100, 200], [201, 300], [301, 400];
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming a block size of 64.
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
// ranges have the format: applyLogKeys.begin/uid/hash(uint8)/version(64 bits)/part
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;

// Each RCGroup holds a single version, and each results[i] serves a single range;
// one range may contain multiple versions.
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
@@ -957,6 +997,7 @@
}

maxBytes = std::max<int>(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);

for (idx = 0; idx < ranges.size(); ++idx) {
int bytes =
wait(kvMutationLogToTransactions(cx,
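
To make the clamp above concrete, a worked example with illustrative numbers (assuming APPLY_BLOCK_SIZE = 64 and rangeCount = 3): beginVersion 64 and *endVersion 500 give min(500, (64/64 + 3) * 64) = min(500, 256) = 256, i.e. at most rangeCount blocks of versions are applied per pass:

#include <algorithm>
#include <cstdint>

// At most rangeCount blocks of blockSize versions are applied per pass.
int64_t newEndVersion(int64_t beginVersion, int64_t endVersion, int64_t blockSize, int64_t rangeCount) {
    return std::min(endVersion, (beginVersion / blockSize + rangeCount) * blockSize);
}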
18 changes: 17 additions & 1 deletion fdbclient/BackupContainerFileSystem.actor.cpp
@@ -904,7 +904,15 @@ class BackupContainerFileSystemImpl {
// If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup.
// It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for
// restore times.
//
// hfu5: 1. It first reads and parses the snapshot files; each snapshot file maps to a list of
// range files (including ranges/ and kvranges/), and the range files with intersecting
// keys are collected.
// 2. Not sure why it continues when restorable.targetVersion < maxKeyRangeVersion.
// 3. It then has minKeyRangeVersion, the minimum version across all selected range files.
// 4. It then reads all log files whose start is smaller than targetVersion and whose end is
// larger than minKeyRangeVersion.
// 5. If the first log file's start version is larger than minKeyRangeVersion, the data in
// between is unknown, so give up; otherwise return both range and log files.
// 6. The LogFile object is created in BackupContainerFileSystem::listLogFiles, and tagId is
// populated for partitioned logs (plog).
// If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored,
// because the log can contain mutations of the whole key space, unlike range files that each
// is limited to a smaller key range.
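
A hedged, standalone sketch of steps 1 and 3 from the hfu5 notes above, with keys simplified to integer intervals (RangeFile, Picked, and pickRangeFiles are illustrative names, not the FDB types):

#include <algorithm>
#include <cstdint>
#include <vector>

struct RangeFile { int64_t version, beginKey, endKey; };

struct Picked {
    std::vector<RangeFile> files;
    int64_t minKeyRangeVersion = INT64_MAX;
    int64_t maxKeyRangeVersion = -1;
};

// Collect range files whose [beginKey, endKey) intersects the filter and track
// the min/max versions among them; the log files chosen afterwards must then
// cover [minKeyRangeVersion, targetVersion].
Picked pickRangeFiles(const std::vector<RangeFile>& all, int64_t filterBegin, int64_t filterEnd) {
    Picked out;
    for (const RangeFile& f : all) {
        if (f.beginKey < filterEnd && filterBegin < f.endKey) { // half-open intersection
            out.files.push_back(f);
            out.minKeyRangeVersion = std::min(out.minKeyRangeVersion, f.version);
            out.maxKeyRangeVersion = std::max(out.maxKeyRangeVersion, f.version);
        }
    }
    return out;
}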
@@ -943,6 +951,7 @@ class BackupContainerFileSystemImpl {
state Version minKeyRangeVersion = MAX_VERSION;
state Version maxKeyRangeVersion = -1;

// iterate over each listed file; unclear why this still returns a vector
std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>> results =
wait(bc->readKeyspaceSnapshot(snapshots[i]));

@@ -955,6 +964,7 @@
maxKeyRangeVersion = snapshots[i].endVersion;
} else {
for (const auto& rangeFile : results.first) {
// each file covers one version of a [begin, end] key range
const auto& keyRange = results.second.at(rangeFile.fileName);
if (keyRange.intersects(keyRangesFilter)) {
restorable.ranges.push_back(rangeFile);
@@ -971,6 +981,9 @@
// 'latestVersion' represents using the minimum restorable version in a snapshot.
restorable.targetVersion = targetVersion == latestVersion ? maxKeyRangeVersion : targetVersion;
// Any version < maxKeyRangeVersion is not restorable.
// hfu5 question: why? What if the target version is 8500 and this snapshot has [8000, 8200, 8800]?
// Do we give up directly? Why is it not restorable?
// Answer: we do not give up here; we try the next smaller snapshot.
if (restorable.targetVersion < maxKeyRangeVersion)
continue;

Expand All @@ -993,6 +1006,7 @@ class BackupContainerFileSystemImpl {
store(plogs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, true)));

if (plogs.size() > 0) {
// hfu5: this is where the set of log files to use is decided
logs.swap(plogs);
// sort by tag ID so that filterDuplicates works.
std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) {
@@ -1005,6 +1019,8 @@
restorable.logs.swap(filtered);
// sort by version order again for continuous analysis
std::sort(restorable.logs.begin(), restorable.logs.end());
// Sorted by version here, but isPartitionedLogsContinuous will sort each tag separately;
// this needs refactoring.
if (isPartitionedLogsContinuous(restorable.logs, minKeyRangeVersion, restorable.targetVersion)) {
restorable.continuousBeginVersion = minKeyRangeVersion;
restorable.continuousEndVersion = restorable.targetVersion + 1; // not inclusive
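
A minimal model of the two sorts described here, with LogFile reduced to the fields that matter and the duplicate test simplified to (tagId, beginVersion); dedupAndOrder is an illustrative name, not the FDB filterDuplicates:

#include <algorithm>
#include <cstdint>
#include <tuple>
#include <vector>

struct LogFile {
    int64_t beginVersion, endVersion;
    int tagId;
    bool operator<(const LogFile& o) const { return beginVersion < o.beginVersion; }
};

std::vector<LogFile> dedupAndOrder(std::vector<LogFile> logs) {
    // Sort by (tagId, beginVersion) so same-tag duplicates become adjacent.
    std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) {
        return std::tie(a.tagId, a.beginVersion) < std::tie(b.tagId, b.beginVersion);
    });
    std::vector<LogFile> filtered;
    for (const LogFile& f : logs) // drop duplicates within a tag
        if (filtered.empty() || filtered.back().tagId != f.tagId ||
            filtered.back().beginVersion != f.beginVersion)
            filtered.push_back(f);
    // Back to version order for the continuity analysis.
    std::sort(filtered.begin(), filtered.end());
    return filtered;
}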
1 change: 1 addition & 0 deletions fdbclient/ClientKnobs.cpp
@@ -182,6 +182,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BACKUP_DISPATCH_ADDTASK_SIZE, 50 );
init( RESTORE_DISPATCH_ADDTASK_SIZE, 150 );
init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 20;
init( RESTORE_PARTITIONED_BATCH_VERSION_SIZE, 1000000 );
init( RESTORE_WRITE_TX_SIZE, 256 * 1024 );
init( APPLY_MAX_LOCK_BYTES, 1e9 );
init( APPLY_MIN_LOCK_BYTES, 11e6 ); //Must be bigger than TRANSACTION_SIZE_LIMIT