Skip to content

Commit

Permalink
Add CheckInRange API
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar committed Oct 2, 2024
1 parent a2bd301 commit 8fd25f7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,8 @@ class DBImpl : public DB {
Status ValidateForMerge(const MergeInstanceOptions& merge_options,
DBImpl* rhs);

Status CheckInRange(const Slice* begin, const Slice* end) override;

Status MergeDisjointInstances(const MergeInstanceOptions& merge_options,
const std::vector<DB*>& instances) override;

Expand Down
33 changes: 30 additions & 3 deletions db/db_impl/db_impl_merge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ Status DBImpl::ValidateForMerge(const MergeInstanceOptions& mopts,
return Status::OK();
}

Status DBImpl::CheckInRange(const Slice* begin, const Slice* end) {
Status s;
if (begin == nullptr && end == nullptr) {
return s;
}
for (auto cfd : *versions_->GetColumnFamilySet()) {
assert(cfd != nullptr);
auto* comparator = cfd->user_comparator();
PinnableSlice smallest, largest;
bool found = false;
s = cfd->GetUserKeyRange(&smallest, &largest, &found);
if (!s.ok()) {
return s;
}
if (!found) {
continue;
}
if (begin != nullptr && comparator->Compare(smallest, *begin) < 0) {
return Status::InvalidArgument("Has data smaller than left boundary");
} else if (end != nullptr && comparator->Compare(largest, *end) >= 0) {
return Status::InvalidArgument("Has data larger than right boundary");
}
}
return s;
}

Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
const std::vector<DB*>& instances) {
Status s;
Expand Down Expand Up @@ -155,7 +181,8 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
//
// - Acquire snapshots of table files (`SuperVersion`).
//
// - Do memtable merge if needed. We do this together with acquiring snapshot
// - Do memtable merge if needed. We do this together with acquiring
// snapshot
// to avoid the case where a memtable is flushed shortly after being
// merged, and the resulting L0 data is merged again as a table file.
assert(s.ok());
Expand All @@ -166,8 +193,8 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
// source data. See [A].
uint64_t max_seq_number = 0;
// RocksDB's recovery is heavily dependent on the one-on-one mapping between
// memtable and WAL (even when WAL is empty). Each memtable keeps a record of
// `next_log_number` to mark its position within a series of WALs. This
// memtable and WAL (even when WAL is empty). Each memtable keeps a record
// of `next_log_number` to mark its position within a series of WALs. This
// counter must be monotonic. We work around this issue by setting the
// counters of all involved memtables to the same maximum value. See [B].
uint64_t max_log_number = 0;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

while (writer.ConsumeOne());
while (writer.ConsumeOne())
;
MultiBatchWriteCommit(writer.request);

WriteStatusCheck(writer.status);
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ class DB {
return Status::NotSupported("`MergeDisjointInstances` not implemented");
}

// Check all data written before this call is in the range [begin, end).
// Return InvalidArgument if not.
virtual Status CheckInRange(const Slice* /*begin*/, const Slice* /*end*/) {
return Status::NotSupported("`AssertInRange` not implemented");
}

virtual Status Resume() { return Status::NotSupported(); }

// Close the DB by releasing resources, closing files etc. This should be
Expand Down

0 comments on commit 8fd25f7

Please sign in to comment.