Skip to content

Commit

Permalink
Add write amp based rate limiter
Browse files Browse the repository at this point in the history
Signed-off-by: v01dstar <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar committed Sep 20, 2024
1 parent 49a8021 commit ca2cf28
Show file tree
Hide file tree
Showing 12 changed files with 906 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ set(SOURCES
utilities/persistent_cache/block_cache_tier_metadata.cc
utilities/persistent_cache/persistent_cache_tier.cc
utilities/persistent_cache/volatile_tier_impl.cc
utilities/rate_limiters/write_amp_based_rate_limiter.cc
utilities/simulator_cache/cache_simulator.cc
utilities/simulator_cache/sim_cache.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc
Expand Down Expand Up @@ -1450,6 +1451,7 @@ if(WITH_TESTS)
utilities/options/options_util_test.cc
utilities/persistent_cache/hash_table_test.cc
utilities/persistent_cache/persistent_cache_test.cc
utilities/rate_limiters/write_amp_based_rate_limiter_test.cc
utilities/simulator_cache/cache_simulator_test.cc
utilities/simulator_cache/sim_cache_test.cc
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,9 @@ wide_column_serialization_test: $(OBJ_DIR)/db/wide/wide_column_serialization_tes
wide_columns_helper_test: $(OBJ_DIR)/db/wide/wide_columns_helper_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

write_amp_based_rate_limiter_test: $(OBJ_DIR)/utilities/rate_limiters/write_amp_based_rate_limiter_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

#-------------------------------------------------
# make install related stuff
PREFIX ?= /usr/local
Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"utilities/persistent_cache/block_cache_tier_metadata.cc",
"utilities/persistent_cache/persistent_cache_tier.cc",
"utilities/persistent_cache/volatile_tier_impl.cc",
"utilities/rate_limiters/write_amp_based_rate_limiter.cc",
"utilities/simulator_cache/cache_simulator.cc",
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
Expand Down Expand Up @@ -5574,6 +5575,12 @@ cpp_unittest_wrapper(name="work_queue_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="write_amp_based_rate_limiter_test",
srcs=["utilities/rate_limiters/write_amp_based_rate_limiter_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="write_batch_test",
srcs=["db/write_batch_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
10 changes: 10 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "utilities/merge_operators.h"
#include "utilities/rate_limiters/write_amp_based_rate_limiter.h"

using ROCKSDB_NAMESPACE::BackupEngine;
using ROCKSDB_NAMESPACE::BackupEngineOptions;
Expand Down Expand Up @@ -95,6 +96,7 @@ using ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory;
using ROCKSDB_NAMESPACE::NewGenericRateLimiter;
using ROCKSDB_NAMESPACE::NewLRUCache;
using ROCKSDB_NAMESPACE::NewRibbonFilterPolicy;
using ROCKSDB_NAMESPACE::NewWriteAmpBasedRateLimiter;
using ROCKSDB_NAMESPACE::OptimisticTransactionDB;
using ROCKSDB_NAMESPACE::OptimisticTransactionOptions;
using ROCKSDB_NAMESPACE::Options;
Expand Down Expand Up @@ -3997,6 +3999,14 @@ rocksdb_ratelimiter_t* rocksdb_ratelimiter_create_auto_tuned(
return rate_limiter;
}

// C API constructor for the write-amp based rate limiter. Only the rate,
// refill period and fairness are forwarded; every other argument of
// NewWriteAmpBasedRateLimiter() is left at its declared default.
// The returned handle must be released with rocksdb_ratelimiter_destroy().
rocksdb_ratelimiter_t* rocksdb_writeampbasedratelimiter_create(
    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
  // Allocate the C handle first, then store the freshly built limiter in it.
  auto* handle = new rocksdb_ratelimiter_t;
  handle->rep.reset(NewWriteAmpBasedRateLimiter(rate_bytes_per_sec,
                                                refill_period_us, fairness));
  return handle;
}

// Releases a rate limiter handle that was allocated with `new` by one of the
// C API create functions. `delete` on a null pointer is a no-op, so passing
// a null handle is harmless.
void rocksdb_ratelimiter_destroy(rocksdb_ratelimiter_t* limiter) {
delete limiter;
}
Expand Down
22 changes: 18 additions & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,8 @@ ColumnFamilyData::ColumnFamilyData(
}
}

RecalculateWriteStallConditions(mutable_cf_options_);
RecalculateWriteStallConditions(mutable_cf_options_,
ioptions_.rate_limiter.get());

if (cf_options.table_factory->IsInstanceOf(
TableFactory::kBlockBasedTableName()) &&
Expand Down Expand Up @@ -935,7 +936,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, RateLimiter* rate_limiter) {
auto write_stall_condition = WriteStallCondition::kNormal;
if (current_ != nullptr) {
auto* vstorage = current_->storage_info();
Expand Down Expand Up @@ -1064,6 +1065,9 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
// compaction.
write_controller_token_ =
write_controller->GetCompactionPressureToken();
if (rate_limiter) {
rate_limiter->PaceUp(false /*critical*/);
}
} else if (vstorage->estimated_compaction_needed_bytes() >=
GetPendingCompactionBytesForCompactionSpeedup(
mutable_cf_options, vstorage)) {
Expand Down Expand Up @@ -1093,6 +1097,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
4);
}
}
if (rate_limiter) {
  // Pace up the limiter when a write stall is active or close: L0 has reached
  // 80% of the slowdown trigger, or pending compaction bytes have reached 50%
  // of the soft limit.
  //
  // A disabled threshold must not count as "close to stall". Without the
  // guards, a zero soft limit makes `bytes >= 0.5 * 0` always true and a
  // negative L0 trigger makes `count >= negative` always true, so the limiter
  // would be paced up on every recalculation.
  // NOTE(review): assumes a negative L0 trigger / zero soft limit mean
  // "disabled", as in GetWriteStallConditionAndCause() -- confirm.
  const bool l0_near_slowdown =
      mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
      vstorage->l0_delay_trigger_count() >=
          0.8 * mutable_cf_options.level0_slowdown_writes_trigger;
  const bool pending_bytes_near_soft_limit =
      mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
      vstorage->estimated_compaction_needed_bytes() >=
          0.5 * mutable_cf_options.soft_pending_compaction_bytes_limit;
  if (write_stall_condition != WriteStallCondition::kNormal ||
      l0_near_slowdown || pending_bytes_near_soft_limit) {
    rate_limiter->PaceUp(true /*critical*/);
  }
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;
}
return write_stall_condition;
Expand Down Expand Up @@ -1320,8 +1334,8 @@ void ColumnFamilyData::InstallSuperVersion(
// Should not recalculate slow down condition if nothing has changed, since
// currently RecalculateWriteStallConditions() treats it as further slowing
// down is needed.
super_version_->write_stall_condition =
RecalculateWriteStallConditions(mutable_cf_options);
super_version_->write_stall_condition = RecalculateWriteStallConditions(
mutable_cf_options, ioptions_.rate_limiter.get());
} else {
super_version_->write_stall_condition =
old_superversion->write_stall_condition;
Expand Down
3 changes: 2 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ class ColumnFamilyData {
// Recalculate some stall conditions, which are changed only during
// compaction, adding new memtable and/or recalculation of compaction score.
// If `rate_limiter` is non-null, its PaceUp() hook is invoked when a
// compaction pressure token is taken (non-critical) and when a write stall is
// active or close (critical). The default nullptr skips that signalling.
// Returns the recalculated write stall condition.
WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options,
RateLimiter* rate_limiter = nullptr);

void set_initialized() { initialized_.store(true); }

Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_ratelimiter_create_auto_tuned(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness);
/* Creates a rate limiter backed by NewWriteAmpBasedRateLimiter(), forwarding
   the three arguments below. Release the returned handle with
   rocksdb_ratelimiter_destroy(). */
extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_writeampbasedratelimiter_create(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness);
extern ROCKSDB_LIBRARY_API void rocksdb_ratelimiter_destroy(
rocksdb_ratelimiter_t*);

Expand Down
13 changes: 13 additions & 0 deletions include/rocksdb/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class RateLimiter {
virtual Status SetSingleBurstBytes(int64_t /* single_burst_bytes */) {
return Status::NotSupported();
}
// Dynamically change rate limiter's auto_tuned mode.
// The base-class implementation is a no-op, so limiters without an
// auto-tuned mode do not need to override it.
virtual void SetAutoTuned(bool /*auto_tuned*/) {}

// Deprecated. New RateLimiter derived classes should override
// Request(const int64_t, const Env::IOPriority, Statistics*) or
Expand Down Expand Up @@ -120,6 +122,8 @@ class RateLimiter {

virtual int64_t GetBytesPerSecond() const = 0;

// Counterpart to SetAutoTuned(). The base-class default always returns
// false; it does not track any state set through SetAutoTuned().
virtual bool GetAutoTuned() const { return false; }

virtual bool IsRateLimited(OpType op_type) {
if ((mode_ == RateLimiter::Mode::kWritesOnly &&
op_type == RateLimiter::OpType::kRead) ||
Expand All @@ -130,6 +134,8 @@ class RateLimiter {
return true;
}

// Hint from the DB that the limiter should speed up. `critical` is true when
// the caller is in, or close to, a write stall, and false when it only
// detected compaction pressure. The base-class implementation is a no-op.
virtual void PaceUp(bool /*critical*/) {}

protected:
Mode GetMode() { return mode_; }

Expand Down Expand Up @@ -165,4 +171,11 @@ extern RateLimiter* NewGenericRateLimiter(
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
bool auto_tuned = false);

// Factory for the write-amp based rate limiter. The leading parameters
// (rate, refill period, fairness, mode, auto_tuned) mirror those of
// NewGenericRateLimiter() above.
// NOTE(review): `tune_per_sec`, `smooth_window_size` and `recent_window_size`
// presumably control how often the limiter retunes and the sample windows it
// uses -- confirm against write_amp_based_rate_limiter.h.
// Returns a raw pointer; presumably the caller takes ownership (the C API
// stores it via `rep.reset()`) -- confirm.
extern RateLimiter* NewWriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
bool auto_tuned = false, int tune_per_sec = 1,
size_t smooth_window_size = 300, size_t recent_window_size = 30);

} // namespace ROCKSDB_NAMESPACE
2 changes: 2 additions & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ LIB_SOURCES = \
utilities/persistent_cache/block_cache_tier_metadata.cc \
utilities/persistent_cache/persistent_cache_tier.cc \
utilities/persistent_cache/volatile_tier_impl.cc \
utilities/rate_limiters/write_amp_based_rate_limiter.cc \
utilities/simulator_cache/cache_simulator.cc \
utilities/simulator_cache/sim_cache.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \
Expand Down Expand Up @@ -619,6 +620,7 @@ TEST_MAIN_SOURCES = \
utilities/options/options_util_test.cc \
utilities/persistent_cache/hash_table_test.cc \
utilities/persistent_cache/persistent_cache_test.cc \
utilities/rate_limiters/write_amp_based_rate_limiter_test.cc \
utilities/simulator_cache/cache_simulator_test.cc \
utilities/simulator_cache/sim_cache_test.cc \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
Expand Down
Loading

0 comments on commit ca2cf28

Please sign in to comment.