Skip to content

Commit

Permalink
Replace dict with hashtable: SET datatype
Browse files Browse the repository at this point in the history
Signed-off-by: Rain Valentine <[email protected]>
  • Loading branch information
SoftlyRaining committed Dec 11, 2024
1 parent 3eb8314 commit 1138fac
Show file tree
Hide file tree
Showing 17 changed files with 324 additions and 374 deletions.
73 changes: 48 additions & 25 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ void keysScanCallback(void *privdata, void *entry) {

/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void scanCallback(void *privdata, const dictEntry *de) {
void dictScanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
Expand All @@ -998,9 +998,7 @@ void scanCallback(void *privdata, const dictEntry *de) {
}
}

if (o->type == OBJ_SET) {
key = keysds;
} else if (o->type == OBJ_HASH) {
if (o->type == OBJ_HASH) {
key = keysds;
if (!data->only_keys) {
val = dictGetVal(de);
Expand All @@ -1013,13 +1011,36 @@ void scanCallback(void *privdata, const dictEntry *de) {
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in SCAN callback.");
serverPanic("Type not handled in dict SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
robj *o = data->o;
list *keys = data->keys;
data->sampled++;

/* currently only implemented for SET scan */
serverAssert(o && o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE);
sds key = (sds)entry; /* Specific for OBJ_SET */

/* o and typename can not have values at the same time. */
serverAssert(!((data->type != LLONG_MAX) && o));

/* Filter element if it does not match the pattern. */
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), key, sdslen(key), 0)) {
return;
}
}

listAddNodeTail(keys, key);
}

/* Try to parse a SCAN cursor stored at object 'o':
* if the cursor is valid, store it as unsigned integer into *cursor and
* returns C_OK. Otherwise return C_ERR and send an error to the
Expand Down Expand Up @@ -1083,7 +1104,6 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
sds typename = NULL;
long long type = LLONG_MAX;
int patlen = 0, use_pattern = 0, only_keys = 0;
dict *ht;

/* Object must be NULL (to iterate keys names), or the type of the object
* must be Set, Sorted Set, or Hash. */
Expand Down Expand Up @@ -1152,34 +1172,35 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* just return everything inside the object in a single call, setting the
* cursor to zero to signal the end of the iteration. */

/* Handle the case of a hash table. */
ht = NULL;
/* Handle the case of kvstore, dict or hashtable. */
dict *dict_table = NULL;
hashtable *hashtable_table = NULL;
int shallow_copied_list_items = 0;
if (o == NULL) {
ht = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable_table = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
dict_table = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
ht = zs->dict;
dict_table = zs->dict;
/* scanning ZSET allocates temporary strings even though it's a dict */
shallow_copied_list_items = 0;
}

list *keys = listCreate();
/* Set a free callback for the contents of the collected keys list.
* For the main keyspace dict, and when we scan a key that's dict encoded
* (we have 'ht'), we don't need to define free method because the strings
* in the list are just a shallow copy from the pointer in the dictEntry.
* When scanning a key with other encodings (e.g. listpack), we need to
* free the temporary strings we add to that list.
* The exception to the above is ZSET, where we do allocate temporary
* strings even when scanning a dict. */
if (o && (!ht || o->type == OBJ_ZSET)) {
/* Set a free callback for the contents of the collected keys list if they
* are deep copied temporary strings. We must not free them if they are just
* a shallow copy - a pointer to the actual data in the data structure */
if (!shallow_copied_list_items) {
listSetFreeMethod(keys, (void (*)(void *))sdsfree);
}

/* For main dictionary scan or data structure using hashtable. */
if (!o || ht) {
/* For main dictionary scan or scannable data structure. */
if (!o || dict_table || hashtable_table) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
Expand Down Expand Up @@ -1219,8 +1240,10 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, keysScanCallback, NULL, &data);
} else if (dict_table) {
cursor = dictScan(dict_table, cursor, dictScanCallback, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
cursor = hashtableScan(hashtable_table, cursor, hashtableScanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
Expand Down
29 changes: 17 additions & 12 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -916,30 +916,35 @@ void debugCommand(client *c) {
addReplyVerbatim(c, stats, sdslen(stats), "txt");
sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
robj *o;
dict *ht = NULL;
int full = 0;

if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;

if ((o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr)) == NULL) return;
robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr);
if (o == NULL) return;

/* Get the hash table reference from the object, if possible. */
/* Get the dict reference from the object, if possible. */
dict *d = NULL;
hashtable *hs = NULL;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
ht = zs->dict;
d = zs->dict;
} break;
case OBJ_ENCODING_HT: ht = o->ptr; break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: hs = o->ptr; break;
}

if (ht == NULL) {
addReplyError(c, "The value stored at the specified key is not "
"represented using an hash table");
} else {
if (d != NULL) {
char buf[4096];
dictGetStats(buf, sizeof(buf), ht, full);
dictGetStats(buf, sizeof(buf), d, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else if (hs != NULL) {
char buf[4096];
hashtableGetStats(buf, sizeof(buf), hs, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else {
addReplyError(c, "The value stored at the specified key is not "
"represented using an hash table");
}
} else if (!strcasecmp(c->argv[1]->ptr, "change-repl-id") && c->argc == 2) {
serverLog(LL_NOTICE, "Changing replication IDs after receiving DEBUG change-repl-id");
Expand Down
42 changes: 28 additions & 14 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/

#include "server.h"
#include "hashtable.h"
#include <stddef.h>

#ifdef HAVE_DEFRAG
Expand Down Expand Up @@ -378,6 +379,20 @@ static void activeDefragSdsDict(dict *d, int val_type) {
} while (cursor != 0);
}

void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) {
UNUSED(privdata);
sds *sds_ref = (sds *)entry_ref;
sds new_sds = activeDefragSds(*sds_ref);
if (new_sds != NULL) *sds_ref = new_sds;
}

void activeDefragSdsHashtable(hashtable *hs) {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(hs, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
Expand Down Expand Up @@ -496,11 +511,9 @@ static void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
}

static void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *hs = ob->ptr;
*cursor = hashtableScanDefrag(hs, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void scanLaterHash(robj *ob, unsigned long *cursor) {
Expand Down Expand Up @@ -559,15 +572,16 @@ static void defragHash(robj *ob) {
}

static void defragSet(robj *ob) {
dict *d, *newd;
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
} else {
activeDefragSdsHashtable(ht);
}
/* defrag the hashtable struct and members */
hashtable *newHashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (newHashtable) ob->ptr = newHashtable;
}

/* Defrag callback for radix tree iterator, called for each node,
Expand Down Expand Up @@ -765,7 +779,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverPanic("Unknown list encoding");
}
} else if (ob->type == OBJ_SET) {
if (ob->encoding == OBJ_ENCODING_HT) {
if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
defragSet(ob);
} else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) {
void *newptr, *ptr = ob->ptr;
Expand Down
10 changes: 9 additions & 1 deletion src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ void *hashtableMetadata(hashtable *ht) {
}

/* Returns the number of entries stored. */
size_t hashtableSize(hashtable *ht) {
size_t hashtableSize(const hashtable *ht) {
return ht->used[0] + ht->used[1];
}

Expand Down Expand Up @@ -1180,6 +1180,14 @@ hashtable *hashtableDefragTables(hashtable *ht, void *(*defragfn)(void *)) {
return ht1;
}

/* Used for releasing memory to OS to avoid unnecessary CoW. Called when we've
* forked and memory won't be used again. See zmadvise_dontneed() */
void dismissHashtable(hashtable *ht) {
for (int i = 0; i < 2; i++) {
zmadvise_dontneed(ht->tables[i], numBuckets(ht->bucket_exp[i]) * sizeof(bucket *));
}
}

/* Returns 1 if an entry was found matching the key. Also points *found to it,
* if found is provided. Returns 0 if no matching entry was found. */
int hashtableFind(hashtable *ht, const void *key, void **found) {
Expand Down
3 changes: 2 additions & 1 deletion src/hashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void hashtableRelease(hashtable *ht);
void hashtableEmpty(hashtable *ht, void(callback)(hashtable *));
hashtableType *hashtableGetType(hashtable *ht);
void *hashtableMetadata(hashtable *ht);
size_t hashtableSize(hashtable *ht);
size_t hashtableSize(const hashtable *ht);
size_t hashtableBuckets(hashtable *ht);
size_t hashtableChainedBuckets(hashtable *ht, int table);
size_t hashtableMemUsage(hashtable *ht);
Expand All @@ -123,6 +123,7 @@ int hashtableTryExpand(hashtable *ht, size_t size);
int hashtableExpandIfNeeded(hashtable *ht);
int hashtableShrinkIfNeeded(hashtable *ht);
hashtable *hashtableDefragTables(hashtable *ht, void *(*defragfn)(void *));
void dismissHashtable(hashtable *ht);

/* Entries */
int hashtableFind(hashtable *ht, const void *key, void **found);
Expand Down
6 changes: 3 additions & 3 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
if (obj->type == OBJ_LIST && obj->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = obj->ptr;
return ql->len;
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable *ht = obj->ptr;
return hashtableSize(ht);
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = obj->ptr;
return zs->zsl->length;
Expand Down
39 changes: 29 additions & 10 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -11017,27 +11017,38 @@ typedef struct {
ValkeyModuleScanKeyCB fn;
} ScanKeyCBData;

static void moduleScanKeyCallback(void *privdata, const dictEntry *de) {
static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) {
ScanKeyCBData *data = privdata;
sds key = dictGetKey(de);
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;
if (o->type == OBJ_SET) {
value = NULL;
} else if (o->type == OBJ_HASH) {
if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else if (o->type == OBJ_ZSET) {
double *val = (double *)dictGetVal(de);
value = createStringObjectFromLongDouble(*val, 0);
} else {
serverPanic("unexpected object type");
}

data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}

static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
ScanKeyCBData *data = privdata;
robj *o = data->key->value;
serverAssert(o->type == OBJ_SET);
sds key = entry;
robj *field = createStringObject(key, sdslen(key));

data->fn(data->key, field, NULL, data->user_data);
decrRefCount(field);
}

/* Scan api that allows a module to scan the elements in a hash, set or sorted set key
*
* Callback for scan implementation.
Expand Down Expand Up @@ -11091,14 +11102,15 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
errno = EINVAL;
return 0;
}
dict *ht = NULL;
dict *d = NULL;
hashtable *ht = NULL;
robj *o = key->value;
if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HT) ht = o->ptr;
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT) ht = o->ptr;
if (o->encoding == OBJ_ENCODING_HT) d = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST) ht = ((zset *)o->ptr)->dict;
if (o->encoding == OBJ_ENCODING_SKIPLIST) d = ((zset *)o->ptr)->dict;
} else {
errno = EINVAL;
return 0;
Expand All @@ -11108,9 +11120,16 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
return 0;
}
int ret = 1;
if (ht) {
if (d) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = dictScan(d, cursor->cursor, moduleScanKeyDictCallback, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
} else if (ht) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, &data);
cursor->cursor = hashtableScan(ht, cursor->cursor, moduleScanKeyHashtableCallback, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
Expand Down
Loading

0 comments on commit 1138fac

Please sign in to comment.