Skip to content

Commit

Permalink
Support instance merge (#294)
Browse files Browse the repository at this point in the history
Add support to merge multiple DBs that have no overlapping data (tombstone included).

Memtables are frozen and then referenced by the target DB. Table files are hard linked
with new file numbers into the target DB. After merge, the sequence numbers of memtables
and L0 files will appear out-of-order compared to a single DB. But for any given user
key, the ordering still holds because there will only be one unique source DB that
contains the key and the source DB's ordering is inherited by the target DB.

If the source and target instances share the same block cache, the target instance will
be able to reuse the cached blocks. This is done by cloning the table readers of the source
instances into the target instance. Because the cache key is stored in the table reader, reads
after the merge can still retrieve the source instances' blocks via the old cache keys.

In a release build, it takes 8ms to merge a 25GB DB (500 files) into another.

Signed-off-by: tabokie <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
tabokie authored and v01dstar committed Oct 2, 2024
1 parent 81fbe5e commit 4c52cc0
Show file tree
Hide file tree
Showing 17 changed files with 1,237 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ set(SOURCES
db/db_impl/db_impl_experimental.cc
db/db_impl/db_impl_readonly.cc
db/db_impl/db_impl_secondary.cc
db/db_impl/db_impl_merge.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
Expand Down Expand Up @@ -1327,6 +1328,7 @@ if(WITH_TESTS)
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_merge_operand_test.cc
db/db_merge_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,9 @@ db_merge_operator_test: $(OBJ_DIR)/db/db_merge_operator_test.o $(TEST_LIBRARY) $
db_merge_operand_test: $(OBJ_DIR)/db/db_merge_operand_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_merge_test: $(OBJ_DIR)/db/db_merge_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_options_test: $(OBJ_DIR)/db/db_options_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_debug.cc",
"db/db_impl/db_impl_experimental.cc",
"db/db_impl/db_impl_files.cc",
"db/db_impl/db_impl_merge.cc",
"db/db_impl/db_impl_open.cc",
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
Expand Down Expand Up @@ -4862,6 +4863,12 @@ cpp_unittest_wrapper(name="db_merge_operator_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_merge_test",
srcs=["db/db_merge_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_options_test",
srcs=["db/db_options_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
99 changes: 99 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,105 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
return status;
}

// Computes the user-key range covered by the active memtable and every
// immutable memtable of this column family. Both point entries and range
// tombstones contribute to the range.
//
// `*found` is an in/out flag:
//   - on entry, true means `*smallest`/`*largest` already hold a range that
//     should only be widened; false means they are ignored and overwritten by
//     the first key encountered;
//   - on exit, it is set to true if any entry or range tombstone was seen
//     (and is otherwise left untouched).
//
// Always returns an OK status.
Status ColumnFamilyData::GetMemtablesUserKeyRange(PinnableSlice* smallest,
                                                  PinnableSlice* largest,
                                                  bool* found) {
  assert(smallest && largest && found);
  auto* ucmp = user_comparator();
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;

  // Point entries: merge the active and immutable memtables into a single
  // ordered view so the first/last positions give the overall bounds.
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(mem_->NewIterator(read_opts, &arena));
  imm_.current()->AddIterators(read_opts, &merge_iter_builder, false);
  ScopedArenaIterator mem_iter(merge_iter_builder.Finish());
  mem_iter->SeekToFirst();
  if (mem_iter->Valid()) {
    auto ukey = mem_iter->user_key();
    if (!(*found) || ucmp->Compare(ukey, *smallest) < 0) {
      smallest->PinSelf(ukey);
    }
    mem_iter->SeekToLast();
    assert(mem_iter->Valid());
    ukey = mem_iter->user_key();
    if (!(*found) || ucmp->Compare(*largest, ukey) < 0) {
      largest->PinSelf(ukey);
    }
    *found = true;
  }

  // Range tombstones: each memtable is inspected separately.
  autovector<MemTable*> memtables{mem_};
  imm_.ExportMemtables(&memtables);
  for (auto* mem : memtables) {
    auto* iter =
        mem->NewRangeTombstoneIterator(read_opts, kMaxSequenceNumber, false);
    if (iter == nullptr) {
      // No tombstone iterator for this memtable; nothing to account for.
      continue;
    }
    iter->SeekToFirst();
    if (iter->Valid()) {
      // It's already a user key.
      auto ukey = iter->start_key();
      if (!(*found) || ucmp->Compare(ukey, *smallest) < 0) {
        smallest->PinSelf(ukey);
      }
      iter->SeekToLast();
      assert(iter->Valid());
      // Get the end_key of all tombstones.
      ukey = iter->end_key();
      if (!(*found) || ucmp->Compare(*largest, ukey) < 0) {
        largest->PinSelf(ukey);
      }
      *found = true;
    }
    // The iterator is handed out as a raw pointer that the caller owns; the
    // keys were copied by PinSelf above, so it is safe to release it here.
    // Without this the iterator leaks once per memtable with tombstones.
    delete iter;
  }

  return Status::OK();
}

// Computes the user-key range spanned by all data in this column family:
// the memtables first (via GetMemtablesUserKeyRange), then the table files of
// the current version.
//
// `*found` is an in/out flag: when true on entry, `*smallest`/`*largest` are
// treated as an existing range to widen; it is set to true as soon as any
// data is seen.
//
// Returns NotSupported unless the column family uses level compaction;
// otherwise propagates the status of the memtable scan.
Status ColumnFamilyData::GetUserKeyRange(PinnableSlice* smallest,
                                         PinnableSlice* largest, bool* found) {
  assert(smallest && largest && found);
  if (ioptions_.compaction_style != CompactionStyle::kCompactionStyleLevel) {
    return Status::NotSupported("Unexpected compaction style");
  }
  Status status = GetMemtablesUserKeyRange(smallest, largest, found);
  if (!status.ok()) {
    return status;
  }

  VersionStorageInfo& storage = *current()->storage_info();
  auto* ucmp = user_comparator();

  // Widens [*smallest, *largest] so that it also covers [lo, hi].
  const auto widen = [&](const Slice& lo, const Slice& hi) {
    const bool have_range = *found;
    if (!have_range || ucmp->Compare(lo, *smallest) < 0) {
      smallest->PinSelf(lo);
    }
    if (!have_range || ucmp->Compare(*largest, hi) < 0) {
      largest->PinSelf(hi);
    }
    *found = true;
  };

  // Level 0: every file is examined individually.
  for (const auto& file : storage.LevelFiles(0)) {
    widen(file->smallest.user_key(), file->largest.user_key());
  }

  // Levels >= 1: only the first and last file of each level are consulted
  // (presumably because files in these levels are sorted and disjoint).
  for (int level = 1; level < storage.num_levels(); ++level) {
    const auto& files = storage.LevelFiles(level);
    if (files.empty()) {
      continue;
    }
    widen(files.front()->smallest.user_key(),
          files.back()->largest.user_key());
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Expand Down
8 changes: 8 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ class ColumnFamilyData {
SuperVersion* super_version,
bool allow_data_in_errors, bool* overlap);

// Get user key range of memtables. Tombstones are counted.
// `*found` is both an input and an output: if it is true on entry, the
// existing `*smallest`/`*largest` are only widened; if it is false they are
// overwritten by the first key seen. It is set to true when any entry or
// range tombstone is found. None of the pointers may be null.
Status GetMemtablesUserKeyRange(PinnableSlice* smallest,
PinnableSlice* largest, bool* found);

// Get user key range of all data. Tombstones are counted.
// Covers the memtables and the table files of every level in the current
// version. `*found` has the same in/out meaning as in
// GetMemtablesUserKeyRange. Returns NotSupported if the compaction style is
// not level compaction. None of the pointers may be null.
Status GetUserKeyRange(PinnableSlice* smallest, PinnableSlice* largest,
bool* found);

// A flag to tell a manual compaction is to compact all levels together
// instead of a specific level.
static const int kCompactAllLevels;
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class WriteCallback;
struct JobContext;
struct ExternalSstFileInfo;
struct MemTableInfo;
class WriteBlocker;

// Class to maintain directories for all database paths other than main one.
class Directories {
Expand Down Expand Up @@ -1060,6 +1061,13 @@ class DBImpl : public DB {
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn);

// Validate `rhs` can be merged into this DB with given merge options.
Status ValidateForMerge(const MergeInstanceOptions& merge_options,
DBImpl* rhs);

// Merge `instances` into this DB.
// NOTE(review): per the change description, the DBs involved are expected to
// have no overlapping data (tombstones included) — confirm the exact
// preconditions against the implementation and ValidateForMerge.
Status MergeDisjointInstances(const MergeInstanceOptions& merge_options,
const std::vector<DB*>& instances) override;

static IOStatus CreateAndNewDirectory(
FileSystem* fs, const std::string& dirname,
std::unique_ptr<FSDirectory>* directory);
Expand Down Expand Up @@ -1615,6 +1623,7 @@ class DBImpl : public DB {
friend class WriteBatchWithIndex;
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTxn;
friend class WriteBlocker;

friend class ForwardIterator;
friend struct SuperVersion;
Expand Down
Loading

0 comments on commit 4c52cc0

Please sign in to comment.