Skip to content

Commit

Permalink
Refactor of ActiveDefrag to reduce latencies (#1242)
Browse files Browse the repository at this point in the history
Refer to:  #1141

This update refactors the defrag code to:
* Make the overall code more readable and maintainable
* Reduce latencies incurred during defrag processing

With this update, the defrag cycle time is reduced to 500us, with more
frequent cycles. This results in much more predictable latencies, with a
dramatic reduction in tail latencies.

(See #1141 for more complete
details.)

This update is focused mostly on the high-level processing, and does NOT
address lower level functions which aren't currently timebound (e.g.
`activeDefragSdsDict()`, and `moduleDefragGlobals()`). These are out of
scope for this update and left for a future update.

I fixed `kvstoreDictLUTDefrag` because it was using up to 7ms on a CME
single shard. See original github issue for performance details.

---------

Signed-off-by: Jim Brunner <[email protected]>
Signed-off-by: Madelyn Olson <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
  • Loading branch information
JimB123 and madolson authored Dec 3, 2024
1 parent 3df609e commit 397201c
Show file tree
Hide file tree
Showing 12 changed files with 731 additions and 471 deletions.
2 changes: 1 addition & 1 deletion src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->timeEventNextId = 1;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
Expand Down
5 changes: 3 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3278,10 +3278,11 @@ standardConfig static_configs[] = {
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-keepalive", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcpkeepalive, 300, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-migration-barrier", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_migration_barrier, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.replica_priority, 100, INTEGER_CONFIG, NULL, NULL),
Expand Down
1,078 changes: 666 additions & 412 deletions src/defrag.c

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
static void dictDefragBucket(dictEntry **bucketref, const dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand Down Expand Up @@ -1499,7 +1499,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
* where NULL means that no reallocation happened and the old memory is still
* valid. */
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) {
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata) {
int htidx0, htidx1;
const dictEntry *de, *next;
unsigned long m0, m1;
Expand Down
2 changes: 1 addition & 1 deletion src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

Expand Down
23 changes: 18 additions & 5 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d) return 0;
Expand All @@ -750,14 +750,27 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
* within dict, it only reallocates the memory used by the dict structure itself using
* the provided allocation function. This feature was added for the active defrag feature.
*
* The 'defragfn' callback is called with a reference to the dict
* that callback can reallocate. */
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
* With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
* to execute. A "cursor" is used to perform the operation iteratively. When first called, a
* cursor value of 0 should be provided. The return value is an updated cursor which should be
* provided on the next iteration. The operation is complete when 0 is returned.
*
* The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = cursor; didx < kvs->num_dicts; didx++) {
dict **d = kvstoreGetDictRef(kvs, didx), *newd;
if (!*d) continue;

listNode *rehashing_node = NULL;
if (listLength(kvs->rehashing) > 0) {
rehashing_node = ((kvstoreDictMetadata *)dictMetadata(*d))->rehashing_node;
}

if ((newd = defragfn(*d))) *d = newd;
if (rehashing_node) listNodeValue(rehashing_node) = *d;
return (didx + 1);
}
return 0;
}

uint64_t kvstoreGetHash(kvstore *kvs, const void *key) {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata);
typedef dict *(kvstoreDictLUTDefragFunction)(dict *d);
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn);
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn);
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing);
Expand Down
29 changes: 5 additions & 24 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1140,8 +1140,8 @@ void databasesCron(void) {
}
}

/* Defrag keys gradually. */
activeDefragCycle();
/* Start active defrag cycle or adjust defrag CPU if needed. */
monitorActiveDefrag();

/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
Expand Down Expand Up @@ -1611,24 +1611,7 @@ void whileBlockedCron(void) {
mstime_t latency;
latencyStartMonitor(latency);

/* In some cases we may be called with big intervals, so we may need to do
* extra work here. This is because some of the functions in serverCron rely
* on the fact that it is performed every 10 ms or so. For instance, if
* activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
* need to call it multiple times. */
long hz_ms = 1000 / server.hz;
while (server.blocked_last_cron < server.mstime) {
/* Defrag keys gradually. */
activeDefragCycle();

server.blocked_last_cron += hz_ms;

/* Increment cronloop so that run_with_period works. */
server.cronloops++;
}

/* Other cron jobs do not need to be done in a loop. No need to check
* server.blocked_last_cron since we have an early exit at the top. */
defragWhileBlocked();

/* Update memory stats during loading (excluding blocked scripts) */
if (server.loading) cronUpdateMemoryStats();
Expand Down Expand Up @@ -2120,7 +2103,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.active_defrag_running = 0;
server.active_defrag_cpu_percent = 0;
server.active_defrag_configuration_changed = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
Expand Down Expand Up @@ -2722,8 +2705,6 @@ void initServer(void) {
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later, (void (*)(void *))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
Expand Down Expand Up @@ -5704,7 +5685,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"mem_aof_buffer:%zu\r\n", mh->aof_buffer,
"mem_allocator:%s\r\n", ZMALLOC_LIB,
"mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing,
"active_defrag_running:%d\r\n", server.active_defrag_running,
"active_defrag_running:%d\r\n", server.active_defrag_cpu_percent,
"lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(),
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount()));
freeMemoryOverheadData(mh);
Expand Down
11 changes: 6 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,6 @@ typedef struct serverDb {
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} serverDb;

/* forward declaration for functions ctx */
Expand Down Expand Up @@ -1702,7 +1701,7 @@ struct valkeyServer {
int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
int active_defrag_cpu_percent; /* Current desired CPU percentage for active defrag */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
Expand Down Expand Up @@ -1899,8 +1898,9 @@ struct valkeyServer {
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cpu_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cpu_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cycle_us; /* standard duration of defrag cycle */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
Expand Down Expand Up @@ -3353,7 +3353,8 @@ void bytesToHuman(char *s, size_t size, unsigned long long n);
void enterExecutionUnit(int update_cached_time, long long us);
void exitExecutionUnit(void);
void resetServerStats(void);
void activeDefragCycle(void);
void monitorActiveDefrag(void);
void defragWhileBlocked(void);
unsigned int getLRUClock(void);
unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
Expand Down
25 changes: 16 additions & 9 deletions tests/unit/memefficiency.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ run_solo {defrag} {
proc test_active_defrag {type} {
if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} {
test "Active defrag main dictionary: $type" {
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
Expand Down Expand Up @@ -89,6 +88,8 @@ run_solo {defrag} {
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75

after 1000 ;# Give defrag time to work (might be multiple cycles)

# Wait for the active defrag to stop working.
wait_for_condition 2000 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -138,12 +139,13 @@ run_solo {defrag} {
r config resetstat
r config set key-load-delay -25 ;# sleep on average 1/25 usec
r debug loadaof
after 1000 ;# give defrag a chance to work before turning it off
r config set activedefrag no

# measure hits and misses right after aof loading
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]

after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
Expand Down Expand Up @@ -181,7 +183,6 @@ run_solo {defrag} {
r flushdb sync
r script flush sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
Expand All @@ -203,7 +204,7 @@ run_solo {defrag} {
$rd read ; # Discard script load replies
$rd read ; # Discard set replies
}
after 120 ;# serverCron only updates the info once in 100ms
after 1000 ;# give defrag some time to work
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -239,6 +240,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag time to work (might be multiple cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -266,7 +269,6 @@ run_solo {defrag} {
test "Active defrag big keys: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
Expand Down Expand Up @@ -361,6 +363,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -407,7 +411,6 @@ run_solo {defrag} {
test "Active defrag pubsub: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
Expand All @@ -430,7 +433,6 @@ run_solo {defrag} {
$rd read ; # Discard set replies
}

after 120 ;# serverCron only updates the info once in 100ms
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -466,6 +468,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand All @@ -475,6 +479,7 @@ run_solo {defrag} {
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
r config set activedefrag no ;# disable before we accidentally create more frag

# test the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
Expand Down Expand Up @@ -507,7 +512,6 @@ run_solo {defrag} {
test "Active defrag big list: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
Expand Down Expand Up @@ -561,6 +565,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -619,7 +625,6 @@ run_solo {defrag} {
start_server {tags {"defrag"} overrides {save ""}} {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
Expand Down Expand Up @@ -685,6 +690,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down
1 change: 0 additions & 1 deletion tests/unit/moduleapi/defrag.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ set testmodule [file normalize tests/modules/defragtest.so]

start_server {tags {"modules"} overrides {{save ""}}} {
r module load $testmodule 10000
r config set hz 100
r config set active-defrag-ignore-bytes 1
r config set active-defrag-threshold-lower 0
r config set active-defrag-cycle-min 99
Expand Down
Loading

0 comments on commit 397201c

Please sign in to comment.