Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

defrag: eliminate persistent kvstore pointer and edge case fixes #1430

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdat
// Private data for main dictionary keys
typedef struct {
kvstoreIterState kvstate;
serverDb *db;
int dbid;
} defragKeysCtx;
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

Expand Down Expand Up @@ -735,7 +735,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = ctx->db;
serverDb *db = &server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
Expand Down Expand Up @@ -919,7 +919,7 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) {
robj *ob = found;

long long key_defragged = server.stat_active_defrag_hits;
bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->db->id) == 1);
bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->dbid) == 1);
if (key_defragged != server.stat_active_defrag_hits) {
server.stat_active_defrag_key_hits++;
} else {
Expand Down Expand Up @@ -962,7 +962,10 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
state.cursor = 0;
return DEFRAG_NOT_DONE;
}
serverAssert(kvs == state.kvs); // Shouldn't change during the stage
if (kvs != state.kvs) {
// There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage.
return DEFRAG_DONE;
}

unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
Expand Down Expand Up @@ -1012,26 +1015,30 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
}


// Note: target is a DB, (not a KVS like most stages)
// Target is a DBID
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
serverDb *db = (serverDb *)target;
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];

static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
ctx.db = db;
ctx.dbid = dbid;
// Don't return yet. Call the helper with endtime==0 below.
}
serverAssert(ctx.db == db);
serverAssert(ctx.dbid == dbid);

return defragStageKvstoreHelper(endtime, db->keys,
dbKeysScanCallback, defragLaterStep, &ctx);
}


// Target is a DBID
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
return defragStageKvstoreHelper(endtime, (kvstore *)target,
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
return defragStageKvstoreHelper(endtime, db->expires,
scanHashtableCallbackCountScanned, NULL, NULL);
}

Expand Down Expand Up @@ -1222,29 +1229,38 @@ static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long i
}

monotime starttime = getMonotonicUs();
monotime endtime = starttime + computeDefragCycleUs();
int dutyCycleUs = computeDefragCycleUs();
monotime endtime = starttime + dutyCycleUs;
bool haveMoreWork = true;
Comment on lines +1232 to +1234
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we were avoiding camel-cased variable names inside functions, but the clang-format check doesn't seem to be complaining about it.


mstime_t latency;
latencyStartMonitor(latency);

if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}
do {
if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}

doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}
doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}

haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0);
/* If we've completed a stage early, and still have a standard time allotment remaining,
* we'll start another stage. This can happen when defrag is running infrequently, and
* starvation protection has increased the duty-cycle. */
} while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us);

latencyEndMonitor(latency);
latencyAddSampleIfNeeded("active-defrag-cycle", latency);

if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) {
if (haveMoreWork) {
return computeDelayMs(endtime);
} else {
endDefragCycle(true);
Expand Down Expand Up @@ -1283,9 +1299,8 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();

for (int dbid = 0; dbid < server.dbnum; dbid++) {
serverDb *db = &server.db[dbid];
addDefragStage(defragStageDbKeys, db, NULL);
addDefragStage(defragStageExpiresKvstore, db->expires, NULL);
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
}

static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels};
Expand Down
6 changes: 6 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,12 @@ void whileBlockedCron(void) {
* latency monitor if this function is called too often. */
if (server.blocked_last_cron >= server.mstime) return;

/* Increment server.cronloops so that run_with_period works. */
long hz_ms = 1000 / server.hz;
int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; // rounding up
server.blocked_last_cron += cronloops * hz_ms;
server.cronloops += cronloops;

mstime_t latency;
latencyStartMonitor(latency);

Expand Down
3 changes: 1 addition & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1900,8 +1900,7 @@ struct valkeyServer {
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
int jemalloc_bg_thread; /* Enable jemalloc background thread */
int active_defrag_configuration_changed; /* defrag configuration has been changed and need to reconsider
* active_defrag_running in computeDefragCycles. */
int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/memefficiency.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ run_solo {defrag} {
# make sure the defragger did enough work to keep the fragmentation low during loading.
# we cannot check that it went all the way down, since we don't wait for full defrag cycle to complete.
assert {$frag < 1.4}
# since the AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands),
# it'll still not block the loading for long periods of time.
# The AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands).
# Even so, defrag can get starved for periods exceeding 100ms. Using 200ms for test stability, and
# a 75% CPU requirement (as set above), we should allow up to 600ms latency
# (as total time = 200 non duty + 600 duty = 800ms, and 75% of 800ms is 600ms).
if {!$::no_latency} {
assert {$max_latency <= 40}
assert {$max_latency <= 600}
}
}
} ;# Active defrag - AOF loading
Expand Down
Loading