fix(eviction): Tune eviction threshold in cache mode #4142

Open · wants to merge 6 commits into main
4 changes: 4 additions & 0 deletions src/server/common.cc
@@ -122,6 +122,10 @@ size_t max_memory_limit = 0;
size_t serialization_max_chunk_size = 0;
Namespaces* namespaces = nullptr;

size_t FetchRssMemory(io::StatusData sdata) {
return sdata.vm_rss + sdata.hugetlb_pages;
}

const char* GlobalStateName(GlobalState s) {
switch (s) {
case GlobalState::ACTIVE:
3 changes: 3 additions & 0 deletions src/server/common.h
@@ -18,6 +18,7 @@
#include "core/compact_object.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "helio/io/proc_reader.h"
#include "util/fibers/fibers.h"
#include "util/fibers/synchronization.h"

@@ -133,6 +134,8 @@ extern std::atomic_uint64_t rss_mem_peak;

extern size_t max_memory_limit;

size_t FetchRssMemory(io::StatusData sdata);

extern Namespaces* namespaces;

// version 5.11 maps to 511 etc.
13 changes: 2 additions & 11 deletions src/server/dragonfly_test.cc
@@ -26,7 +26,6 @@ ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold);
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);

namespace dfly {
@@ -456,12 +455,7 @@ TEST_F(DflyEngineTest, OOM) {
/// Reproduces the case where items with expiry data were evicted,
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
max_memory_limit = 300000;

absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
ResetService();

max_memory_limit = 1200000; // 1.2mb = 300000 * 4
shard_set->TEST_EnableCacheMode();

ssize_t i = 0;
@@ -489,10 +483,7 @@ }
}

TEST_F(DflyEngineTest, StickyEviction) {
max_memory_limit = 300000;
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
ResetService();
max_memory_limit = 600000; // 0.6mb
shard_set->TEST_EnableCacheMode();

string tmp_val(100, '.');
65 changes: 59 additions & 6 deletions src/server/engine_shard.cc
@@ -198,6 +198,35 @@ optional<uint32_t> GetPeriodicCycleMs() {
return clock_cycle_ms;
}

/* Eviction begins when the shard's free memory falls below
(max_memory_limit * kEvictionMemoryThresholdFactor) / <number of shards>.
The same logic applies to RSS memory. */
constexpr double kEvictionMemoryThresholdFactor = 0.1;

ssize_t CalculateMemoryBudget(size_t max_memory, size_t used_memory) {
if (max_memory >= used_memory) {
return max_memory - used_memory;
}
// negative value indicates memory overuse
return -1 * ssize_t(used_memory - max_memory);
}

ssize_t CalculateFreeMemoryOnShard() {
// Calculate how much memory is used by all shards
const size_t current_global_used_memory = used_mem_current.load(memory_order_relaxed);
return CalculateMemoryBudget(max_memory_limit, current_global_used_memory) / shard_set->size();
}

ssize_t CalculateFreeRssMemoryOnShard(size_t global_rss_memory_limit) {
// Calculate how much rss memory is used by all shards
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
size_t current_global_rss_memory =
sdata_res ? FetchRssMemory(sdata_res.value()) : rss_mem_current.load(memory_order_relaxed);
// Calculate how much free rss memory we have
return CalculateMemoryBudget(global_rss_memory_limit, current_global_rss_memory) /
shard_set->size();
}

} // namespace

__thread EngineShard* EngineShard::shard_ = nullptr;
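For intuition, these helpers boil down to simple per-shard arithmetic: divide the global budget (limit minus usage) by the shard count and compare it against 10% of the per-shard share of the limit. A minimal standalone sketch; the limit, usage, and shard count below are hypothetical and not taken from this PR:

```cpp
// Standalone sketch of the per-shard budget math above.
// The limit, usage, and shard count are assumptions for illustration.
#include <cstdio>

long long CalculateMemoryBudget(unsigned long long max_memory, unsigned long long used_memory) {
  if (max_memory >= used_memory)
    return (long long)(max_memory - used_memory);
  return -(long long)(used_memory - max_memory);  // negative => memory overuse
}

int main() {
  const unsigned long long kMiB = 1024 * 1024;
  const unsigned long long max_memory_limit = 256 * kMiB;  // assumed maxmemory
  const unsigned long long used = 240 * kMiB;              // assumed global usage
  const long long shards = 4;                              // assumed shard count

  // Free memory attributed to one shard, as in CalculateFreeMemoryOnShard.
  long long per_shard_budget = CalculateMemoryBudget(max_memory_limit, used) / shards;

  // Threshold: 10% of max memory (kEvictionMemoryThresholdFactor), split across shards.
  long long threshold = (long long)(max_memory_limit * 0.1) / shards;

  // 16 MiB free / 4 shards = 4 MiB budget; threshold ~= 6.4 MiB => eviction kicks in.
  std::printf("budget=%lld threshold=%lld evict=%d\n",
              per_shard_budget, threshold, per_shard_budget < threshold);
}
```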
@@ -702,7 +731,6 @@ void EngineShard::RetireExpiredAndEvict() {
// that is serializing a big value.
{ std::unique_lock lk(db_slice.GetSerializationMutex()); }
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;

uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
@@ -716,7 +744,25 @@
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}

ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
const size_t shards_count = shard_set->size();
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;

// We start eviction when we have less than 10% of free memory
const ssize_t shard_memory_budget_threshold =
ssize_t(max_memory_limit * kEvictionMemoryThresholdFactor) / shards_count;

size_t max_rss_memory;
ssize_t shard_rss_memory_budget_threshold; // Threshold for free rss memory
if (rss_oom_deny_ratio > 0.0) {
max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
// We start eviction when we have less than 10% of free rss memory
shard_rss_memory_budget_threshold =
ssize_t(max_rss_memory * kEvictionMemoryThresholdFactor) / shards_count;
} else {
max_rss_memory = std::numeric_limits<size_t>::max();
// We should never evict based on rss memory
shard_rss_memory_budget_threshold = std::numeric_limits<ssize_t>::min();
}

DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
@@ -734,11 +780,18 @@
counter_[TTL_DELETE].IncBy(stats.deleted);
}

// if our budget is below the limit
if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
// Memory budget for this shard
const ssize_t memory_budget = CalculateFreeMemoryOnShard();
const ssize_t rss_memory_budget = CalculateFreeRssMemoryOnShard(max_rss_memory);

// If our budget is below the limit we need to evict
if ((memory_budget < shard_memory_budget_threshold ||
rss_memory_budget < shard_rss_memory_budget_threshold) &&
GetFlag(FLAGS_enable_heartbeat_eviction)) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
eviction_redline - db_slice.memory_budget());
const size_t goal_bytes = std::max(shard_memory_budget_threshold - memory_budget,
shard_rss_memory_budget_threshold - rss_memory_budget);
db_slice.FreeMemWithEvictionStep(i, starting_segment_id, goal_bytes);
}
}

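The eviction goal above is worth spelling out: when either budget falls below its threshold, the shard evicts enough to cover the larger of the two deficits (used memory vs. RSS). A small self-contained sketch; all values are assumed, not taken from the PR:

```cpp
// Sketch of the goal_bytes selection in RetireExpiredAndEvict: evict enough
// to cover the larger deficit (used-memory vs. RSS). Values are assumptions.
#include <algorithm>
#include <cstdio>

int main() {
  long long memory_budget = 5'000'000;      // free used-memory for this shard
  long long rss_memory_budget = 1'000'000;  // free RSS for this shard
  long long memory_threshold = 6'000'000;   // 10% per-shard share of maxmemory
  long long rss_threshold = 4'000'000;      // 10% per-shard share of the RSS limit

  bool should_evict =
      memory_budget < memory_threshold || rss_memory_budget < rss_threshold;

  if (should_evict) {
    // Deficits: 1 MB on used memory, 3 MB on RSS -> evict ~3 MB.
    long long goal_bytes = std::max(memory_threshold - memory_budget,
                                    rss_threshold - rss_memory_budget);
    std::printf("goal_bytes=%lld\n", goal_bytes);
  }
  return 0;
}
```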
18 changes: 5 additions & 13 deletions src/server/main_service.cc
@@ -100,9 +100,10 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");

ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");
ABSL_RETIRED_FLAG(
double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");

ABSL_FLAG(double, rss_oom_deny_ratio, 1.25,
"When the ratio between maxmemory and RSS memory exceeds this value, commands marked as "
@@ -722,11 +723,6 @@ string FailedCommandToString(std::string_view command, facade::CmdArgList args,
return result;
}

void SetOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
}

void SetRssOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
@@ -793,9 +789,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");

config_registry.RegisterSetter<double>("oom_deny_ratio",
[](double val) { SetOomDenyRatioOnAllThreads(val); });

config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); });

@@ -872,7 +865,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
});
Transaction::Init(shard_num);

SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));

// Requires that shard_set will be initialized before because server_family_.Init might
@@ -1000,7 +992,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);

if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) ||
if (memory_stats.used_mem > max_memory_limit ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
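With oom_deny_ratio retired, the deny-on-OOM predicate compares used memory directly against maxmemory, while RSS keeps its configurable ratio. A hedged sketch of the resulting check; the function name and numbers are illustrative, not the server's API:

```cpp
// Sketch of the revised deny-on-OOM predicate. A ratio <= 0 disables the
// RSS check, mirroring the rss_oom_deny_ratio handling above. Illustrative only.
#include <cstdint>
#include <cstdio>

bool ShouldDeny(uint64_t used_mem, uint64_t rss_mem,
                uint64_t max_memory_limit, double rss_oom_deny_ratio) {
  return used_mem > max_memory_limit ||
         (rss_oom_deny_ratio > 0 &&
          rss_mem > uint64_t(max_memory_limit * rss_oom_deny_ratio));
}

int main() {
  const uint64_t kMiB = 1024 * 1024;
  // 200 MiB used (under the 256 MiB limit), but 340 MiB RSS exceeds
  // 256 MiB * 1.25 = 320 MiB, so the command is denied.
  std::printf("%d\n", ShouldDeny(200 * kMiB, 340 * kMiB, 256 * kMiB, 1.25));
}
```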
5 changes: 2 additions & 3 deletions src/server/server_family.cc
@@ -134,7 +134,6 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
@@ -1023,7 +1022,7 @@ void ServerFamily::UpdateMemoryGlobalStats() {

io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
size_t total_rss = FetchRssMemory(sdata_res.value());
rss_mem_current.store(total_rss, memory_order_relaxed);
if (rss_mem_peak.load(memory_order_relaxed) < total_rss) {
rss_mem_peak.store(total_rss, memory_order_relaxed);
@@ -1348,7 +1347,7 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
&resp->body());
}
if (sdata_res.has_value()) {
size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
size_t rss = FetchRssMemory(sdata_res.value());
AppendMetricWithoutLabels("used_memory_rss_bytes", "", rss, MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("swap_memory_bytes", "", sdata_res->vm_swap, MetricType::GAUGE,
&resp->body());
2 changes: 1 addition & 1 deletion src/server/server_state.h
@@ -178,6 +178,7 @@ class ServerState { // public struct - to allow initialization.
uint64_t used_mem = 0;
uint64_t rss_mem = 0;
};

MemoryUsageStats GetMemoryUsage(uint64_t now_ns);

bool AllowInlineScheduling() const;
@@ -294,7 +295,6 @@

// Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count;
double oom_deny_ratio;
double rss_oom_deny_ratio;

private:
6 changes: 2 additions & 4 deletions tests/dragonfly/generic_test.py
@@ -148,14 +148,12 @@ async def test_reply_guard_oom(df_factory, df_seeder_factory):

@pytest.mark.asyncio
async def test_denyoom_commands(df_factory):
df_server = df_factory.create(
proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", oom_deny_ratio=0.7
)
df_server = df_factory.create(proactor_threads=1, maxmemory="256mb", oom_deny_commands="get")
df_server.start()
client = df_server.client()
await client.execute_command("DEBUG POPULATE 7000 size 44000")

min_deny = 250 * 1024 * 1024 # 250mb
min_deny = 256 * 1024 * 1024 # 256mb
info = await client.info("memory")
print(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
assert info["used_memory"] > min_deny, "Weak testcase: too little used memory"
71 changes: 71 additions & 0 deletions tests/dragonfly/memory_test.py
@@ -114,3 +114,74 @@ async def test_rss_oom_ratio(df_factory: DflyInstanceFactory, admin_port):
# new client creation should not fail after memory usage decreases
client = df_server.client()
await client.execute_command("set x y")


@pytest.mark.asyncio
@dfly_args(
{
"proactor_threads": 1,
"cache_mode": "true",
"maxmemory": "256mb",
"rss_oom_deny_ratio": 0.5,
}
)
async def test_cache_eviction_with_rss_deny_oom(
async_client: aioredis.Redis,
df_server: DflyInstance,
):
"""
Verify that cache eviction is triggered even when used memory is small but RSS memory exceeds its limit.
"""

max_memory = 256 * 1024 * 1024 # 256 MB
data_fill_size = int(0.25 * max_memory) # 25% of max memory
rss_increase_size = int(0.3 * max_memory) # 30% of max memory

key_size = 1024 # 1 KB per key
num_keys = data_fill_size // key_size

# Fill data to 25% of max memory
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", key_size)

await asyncio.sleep(1) # Wait for RSS heartbeat update

# First test that eviction is not triggered without connection creation
stats_info = await async_client.info("stats")
Collaborator: Let's also add a check here that used memory is below maxmemory * 0.9 and current RSS memory is below maxmemory * rss_oom_deny_ratio * 0.9.

assert stats_info["evicted_keys"] == 0, "No eviction should start yet."

# Get RSS memory before creating new connections
memory_info = await async_client.info("memory")
rss_before_connections = memory_info["used_memory_rss"]

# Increase RSS memory by 30% of max memory
# We can simulate RSS increase by creating new connections
# Estimate memory per connection
estimated_connection_memory = 15 * 1024 # 15 KB per connection
Collaborator: To make this test super stable and more controlled, I would run CONFIG SET enable_heartbeat_eviction false before creating the connections. After that, check the RSS to make sure it is above the threshold, then run CONFIG SET enable_heartbeat_eviction true, and finally check that RSS memory dropped below the threshold and check the stats for evicted keys as you did below.

num_connections = rss_increase_size // estimated_connection_memory
connections = []
for _ in range(num_connections):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)

await asyncio.sleep(1) # Wait for RSS heartbeat update

# Get RSS memory after creating new connections
memory_info = await async_client.info("memory")

assert (
memory_info["used_memory"] < data_fill_size
), "Used memory should be less than initial fill size due to eviction."

assert (
memory_info["used_memory_rss"] > rss_before_connections
), "RSS memory should have increased."

# Check that eviction has occurred
stats_info = await async_client.info("stats")
assert (
stats_info["evicted_keys"] > 0
), "Eviction should have occurred due to rss memory pressure."

for conn in connections:
await conn.close()