Skip to content

Commit

Permalink
Persistence - Remove Unowned Keys
Browse files Browse the repository at this point in the history
For cluster, after startup loading, remove keys
that shouldn't be served by this server based on slot
assignment of a cluster.

Also added stat fields in server to count the total
removed keys and skipped slots from last loading.

Signed-off-by: Liang Tang <[email protected]>
  • Loading branch information
singku committed Jul 30, 2024
1 parent fd58b73 commit 591c814
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 3 deletions.
28 changes: 28 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"

#include <ctype.h>

Expand Down Expand Up @@ -107,6 +108,33 @@ ConnectionType *connTypeOfCluster(void) {
return connectionTypeTcp();
}

void delUnOwnedKeys(void) {
if (!server.cluster_enabled) {
return;
}
unsigned int deleted_keys = 0;
unsigned int unowned_slots = 0;
clusterNode *master;
if (clusterNodeIsMaster(server.cluster->myself)) {
master = server.cluster->myself;
} else {
master = server.cluster->myself->slaveof;
}
for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (server.cluster->slots[i] != master) {
unsigned int deleted = delKeysInSlot(i);
deleted_keys += deleted;
if (deleted > 0) {
unowned_slots++;
serverLog(LL_NOTICE, "Deleted %u keys from unowned slot: %d after loading RDB", deleted, i);
}
}
}
serverLog(LL_NOTICE, "Deleted totoal: %d unowned keys from total: %d unowned slots after loading RDB",
deleted_keys, unowned_slots);
return;
}

/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
Expand Down
26 changes: 26 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6552,3 +6552,29 @@ void clusterReplicateOpenSlots(void) {

zfree(argv);
}

del_stat delUnOwnedKeys() {
del_stat result = {0, 0};
if (!server.cluster_enabled) {
return result;
}

clusterNode *master;
if (clusterNodeIsMaster(server.cluster->myself)) {
master = myself;
} else {
master = server.cluster->myself->slaveof;
}
for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (server.cluster->slots[i] != master) {
unsigned int deleted = delKeysInSlot(i);
result.deleted_keys += deleted;
if (deleted > 0) {
result.unowned_slots++;
serverLog(LL_NOTICE, "Deleted %u keys from unowned slot: %d after loading RDB", deleted, i);
}
}
}
serverLog(LL_NOTICE, "Deleted totoal: %d unowned keys from total: %d unowned slots after loading RDB", result.deleted_keys, result.unowned_slots);
return result;
}
2 changes: 1 addition & 1 deletion src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,5 +364,5 @@ struct clusterState {
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
};


unsigned int delKeysInSlot(unsigned int hashslot);
#endif // CLUSTER_LEGACY_H
11 changes: 9 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,8 @@ void initServer(void) {
server.rdb_save_time_start = -1;
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
server.stat_persistence_startup_load_keys_deleted = 0;
server.stat_persistence_startup_load_unowned_slots = 0;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
Expand Down Expand Up @@ -5582,7 +5584,9 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
aof_bio_fsync_status == C_OK) ? "ok" : "err",
"aof_last_cow_size:%zu\r\n", server.stat_aof_cow_bytes,
"module_fork_in_progress:%d\r\n", server.child_type == CHILD_TYPE_MODULE,
"module_fork_last_cow_size:%zu\r\n", server.stat_module_cow_bytes));
"module_fork_last_cow_size:%zu\r\n", server.stat_module_cow_bytes,
"total_startup_load_deleted_keys:%lld\r\n", server.stat_persistence_startup_load_keys_deleted,
"total_startup_load_unowned_slots:%lld\r\n", server.stat_persistence_startup_load_unowned_slots));
/* clang-format on */

if (server.aof_enabled) {
Expand Down Expand Up @@ -6941,12 +6945,15 @@ int main(int argc, char **argv) {
serverLog(LL_NOTICE, "Server initialized");
aofLoadManifestFromDisk();
loadDataFromDisk();
del_stat stat = delUnOwnedKeys();
server.stat_persistence_startup_load_keys_deleted = stat.deleted_keys;
server.stat_persistence_startup_load_unowned_slots = stat.unowned_slots;
aofOpenIfNeededOnServerStart();
aofDelHistoryFiles();
if (server.cluster_enabled) {
serverAssert(verifyClusterConfigWithData() == C_OK);
}

delUnOwnedKeys();
for (j = 0; j < CONN_TYPE_MAX; j++) {
connListener *listener = &server.listeners[j];
if (listener->ct == NULL) continue;
Expand Down
5 changes: 5 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1814,6 +1814,8 @@ struct valkeyServer {
invocation of the event loop. */
unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each
invocation of the event loop. */
long long stat_persistence_startup_load_keys_deleted;
long long stat_persistence_startup_load_unowned_slots;
/* AOF persistence */
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
Expand Down Expand Up @@ -3535,6 +3537,9 @@ unsigned long LFUDecrAndReturn(robj *o);
int performEvictions(void);
void startEvictionTimeProc(void);

/* delete unowned keys from database at server start up after loading persistence files. */
void delUnOwnedKeys(void);

/* Keys hashing / comparison functions for dict.c hash tables. */
uint64_t dictSdsHash(const void *key);
uint64_t dictSdsCaseHash(const void *key);
Expand Down

0 comments on commit 591c814

Please sign in to comment.