Skip to content

Commit

Permalink
Convert SET from dict -> hashset (squashed)
Browse files Browse the repository at this point in the history
Signed-off-by: Rain Valentine <[email protected]>
  • Loading branch information
SoftlyRaining committed Oct 17, 2024
1 parent 8fe59b3 commit add04d0
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 323 deletions.
64 changes: 45 additions & 19 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ int objectTypeCompare(robj *o, long long target) {
}
/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void scanCallback(void *privdata, const dictEntry *de) {
void dictScanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
Expand All @@ -911,8 +911,6 @@ void scanCallback(void *privdata, const dictEntry *de) {

if (o == NULL) {
key = keysds;
} else if (o->type == OBJ_SET) {
key = keysds;
} else if (o->type == OBJ_HASH) {
key = keysds;
if (!data->only_keys) {
Expand All @@ -926,13 +924,37 @@ void scanCallback(void *privdata, const dictEntry *de) {
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in SCAN callback.");
serverPanic("Type not handled in dict SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

void hashsetScanCallback(void *privdata, void *voidElement) {
scanData *data = (scanData *)privdata;
sds key = (sds)voidElement;

list *keys = data->keys;
robj *o = data->o;
data->sampled++;

/* o and typename can not have values at the same time. */
serverAssert(!((data->type != LLONG_MAX) && o));

// currently only implemented for SET scan
serverAssert(o && o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHSET);

/* Filter element if it does not match the pattern. */
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), key, sdslen(key), 0)) {
return;
}
}

listAddNodeTail(keys, key);
}

/* Try to parse a SCAN cursor stored at object 'o':
* if the cursor is valid, store it as unsigned integer into *cursor and
* returns C_OK. Otherwise return C_ERR and send an error to the
Expand Down Expand Up @@ -996,7 +1018,6 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
sds typename = NULL;
long long type = LLONG_MAX;
int patlen = 0, use_pattern = 0, only_keys = 0;
dict *ht;

/* Object must be NULL (to iterate keys names), or the type of the object
* must be Set, Sorted Set, or Hash. */
Expand Down Expand Up @@ -1065,34 +1086,37 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* just return everything inside the object in a single call, setting the
* cursor to zero to signal the end of the iteration. */

/* Handle the case of a hash table. */
ht = NULL;
/* Handle the case of a dict or hashset. */
dict *dictTable = NULL;
hashset *hashsetTable = NULL;
if (o == NULL) {
ht = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
dictTable = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHSET) {
hashsetTable = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
dictTable = o->ptr;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
ht = zs->dict;
dictTable = zs->dict;
}

list *keys = listCreate();
/* Set a free callback for the contents of the collected keys list.
* For the main keyspace dict, and when we scan a key that's dict encoded
* (we have 'ht'), we don't need to define free method because the strings
* in the list are just a shallow copy from the pointer in the dictEntry.
* For the main keyspace dict, when we scan a key that's dict encoded
* (we have 'dictTable'), or when we scan a key that's hashset encoded
* (we have 'hashsetTable') we don't need to define free method because the
* strings in the list are just a shallow copy from the pointer in the
* dictEntry.
* When scanning a key with other encodings (e.g. listpack), we need to
* free the temporary strings we add to that list.
* The exception to the above is ZSET, where we do allocate temporary
* strings even when scanning a dict. */
if (o && (!ht || o->type == OBJ_ZSET)) {
if (o && ((!dictTable && !hashsetTable) || o->type == OBJ_ZSET)) {
listSetFreeMethod(keys, (void (*)(void *))sdsfree);
}

/* For main dictionary scan or data structure using hashtable. */
if (!o || ht) {
if (!o || dictTable || hashsetTable) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
Expand Down Expand Up @@ -1130,9 +1154,11 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
/* In cluster mode there is a separate dictionary for each slot.
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, NULL, &data);
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, dictScanCallback, NULL, &data);
} else if (dictTable) {
cursor = dictScan(dictTable, cursor, dictScanCallback, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
cursor = hashsetScan(hashsetTable, cursor, hashsetScanCallback, &data, 0);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
Expand Down
29 changes: 17 additions & 12 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,30 +915,35 @@ void debugCommand(client *c) {
addReplyVerbatim(c, stats, sdslen(stats), "txt");
sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
robj *o;
dict *ht = NULL;
int full = 0;

if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;

if ((o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr)) == NULL) return;
robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr);
if (o == NULL) return;

/* Get the hash table reference from the object, if possible. */
/* Get the dict reference from the object, if possible. */
dict *d = NULL;
hashset *hs = NULL;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
ht = zs->dict;
d = zs->dict;
} break;
case OBJ_ENCODING_HT: ht = o->ptr; break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHSET: hs = o->ptr; break;
}

if (ht == NULL) {
addReplyError(c, "The value stored at the specified key is not "
"represented using an hash table");
} else {
if (d != NULL) {
char buf[4096];
dictGetStats(buf, sizeof(buf), ht, full);
dictGetStats(buf, sizeof(buf), d, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else if (hs != NULL) {
char buf[4096];
hashsetGetStats(buf, sizeof(buf), hs, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else {
addReplyError(c, "The value stored at the specified key is not "
"represented using an hash table");
}
} else if (!strcasecmp(c->argv[1]->ptr, "change-repl-id") && c->argc == 2) {
serverLog(LL_NOTICE, "Changing replication IDs after receiving DEBUG change-repl-id");
Expand Down
42 changes: 28 additions & 14 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/

#include "server.h"
#include "hashset.h"
#include <stddef.h>

#ifdef HAVE_DEFRAG
Expand Down Expand Up @@ -309,6 +310,20 @@ void activeDefragSdsDict(dict *d, int val_type) {
} while (cursor != 0);
}

void activeDefragSdsHashsetCallback(void *privdata, void *element_ref) {
UNUSED(privdata);
sds *sds_ref = (sds *)element_ref;
sds new_sds = activeDefragSds(*sds_ref);
if (new_sds != NULL) *sds_ref = new_sds;
}

void activeDefragSdsHashset(hashset *hs) {
unsigned long cursor = 0;
do {
cursor = hashsetScan(hs, cursor, activeDefragSdsHashsetCallback, NULL, HASHSET_SCAN_EMIT_REF);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
void activeDefragList(list *l, int val_type) {
listNode *ln, *newln;
Expand Down Expand Up @@ -441,11 +456,9 @@ void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
}

void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHSET) return;
hashset *hs = ob->ptr;
*cursor = hashsetScan(hs, *cursor, activeDefragSdsHashsetCallback, NULL, HASHSET_SCAN_EMIT_REF);
}

void scanLaterHash(robj *ob, unsigned long *cursor) {
Expand Down Expand Up @@ -508,15 +521,16 @@ void defragHash(serverDb *db, dictEntry *kde) {

void defragSet(serverDb *db, dictEntry *kde) {
robj *ob = dictGetVal(kde);
dict *d, *newd;
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HASHSET);
hashset *hs = ob->ptr;
if (hashsetSize(hs) > server.active_defrag_max_scan_fields) {
defragLater(db, kde);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
} else {
activeDefragSdsHashset(hs);
}
/* defrag the hashset struct and members */
hashset *newHashset = hashsetDefragInternals(hs, activeDefragAlloc);
if (newHashset) ob->ptr = newHashset;
}

/* Defrag callback for radix tree iterator, called for each node,
Expand Down Expand Up @@ -704,7 +718,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
serverPanic("Unknown list encoding");
}
} else if (ob->type == OBJ_SET) {
if (ob->encoding == OBJ_ENCODING_HT) {
if (ob->encoding == OBJ_ENCODING_HASHSET) {
defragSet(db, de);
} else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) {
void *newptr, *ptr = ob->ptr;
Expand Down
37 changes: 24 additions & 13 deletions src/hashset.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* addressing scheme, including the use of linear probing by scan cursor
* increment, by Viktor Söderqvist. */
#include "hashset.h"
#include "server.h"
#include "serverassert.h"
#include "zmalloc.h"
#include "mt19937-64.h"
Expand Down Expand Up @@ -814,12 +815,12 @@ void *hashsetMetadata(hashset *t) {
}

/* Returns the number of elements stored. */
size_t hashsetSize(hashset *t) {
size_t hashsetSize(const hashset *t) {
return t->used[0] + t->used[1];
}

/* Returns the number of hash table buckets. */
size_t hashsetBuckets(hashset *t) {
size_t hashsetBuckets(const hashset *t) {
return numBuckets(t->bucketExp[0]) + numBuckets(t->bucketExp[1]);
}

Expand Down Expand Up @@ -962,6 +963,15 @@ hashset *hashsetDefragInternals(hashset *s, void *(*defragfn)(void *)) {
return s1;
}

/* Used to release memory to OS to avoid unnecessary CoW.
* Called when we've forked and memory won't be used again.
* See dismissObject() */
void dismissHashset(hashset *t) {
for (int i = 0; i < 2; i++) {
dismissMemory(t->tables[i], numBuckets(t->bucketExp[i]) * sizeof(bucket *));
}
}

/* Returns 1 if an element was found matching the key. Also points *found to it,
* if found is provided. Returns 0 if no matching element was found. */
int hashsetFind(hashset *t, const void *key, void **found) {
Expand Down Expand Up @@ -1311,16 +1321,14 @@ size_t hashsetScan(hashset *t, size_t cursor, hashsetScanFunction fn, void *priv

/* Emit elements in the smaller table, if this bucket hasn't already
* been rehashed. */
if (table0 == 0 && !cursorIsLessThan(cursor, t->rehashIdx)) {
bucket *b = &t->tables[table0][cursor & mask0];
for (int pos = 0; pos < ELEMENTS_PER_BUCKET; pos++) {
if (b->presence & (1 << pos)) {
void *emit = emit_ref ? &b->elements[pos] : b->elements[pos];
fn(privdata, emit);
}
bucket *b = &t->tables[table0][cursor & mask0];
for (int pos = 0; pos < ELEMENTS_PER_BUCKET; pos++) {
if (b->presence & (1 << pos)) {
void *emit = emit_ref ? &b->elements[pos] : b->elements[pos];
fn(privdata, emit);
}
in_probe_sequence |= b->everfull;
}
in_probe_sequence |= b->everfull;

/* Iterate over indices in larger table that are the expansion of
* the index pointed to by the cursor in the smaller table. */
Expand Down Expand Up @@ -1431,6 +1439,10 @@ int hashsetNext(hashsetIterator *iter, void **elemptr) {
} else {
iter->fingerprint = hashsetFingerprint(iter->hashset);
}
if (!iter->hashset->tables[0]) {
/* Empty table - we're done */
break;
}
iter->index = 0;
/* skip the rehashed slots in table[0] */
if (hashsetIsRehashing(iter->hashset)) {
Expand All @@ -1442,8 +1454,8 @@ int hashsetNext(hashsetIterator *iter, void **elemptr) {
iter->posInBucket++;
if (iter->posInBucket >= ELEMENTS_PER_BUCKET) {
iter->posInBucket = 0;
iter->index++;
if (iter->index >= (long)numBuckets(iter->hashset->bucketExp[iter->table])) {
iter->index = nextCursor(iter->index, expToMask(iter->hashset->bucketExp[iter->table]));
if (iter->index == 0) {
iter->index = 0;
if (hashsetIsRehashing(iter->hashset) && iter->table == 0) {
iter->table++;
Expand Down Expand Up @@ -1511,7 +1523,6 @@ unsigned hashsetSampleElements(hashset *t, void **dst, unsigned count) {
while (samples.count < count) {
cursor = hashsetScan(t, cursor, sampleElementsScanFn, &samples, HASHSET_SCAN_SINGLE_STEP);
}
rehashStepOnReadIfNeeded(t);
return count;
}

Expand Down
5 changes: 3 additions & 2 deletions src/hashset.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ void hashsetRelease(hashset *t);
void hashsetEmpty(hashset *t, void(callback)(hashset *));
hashsetType *hashsetGetType(hashset *t);
void *hashsetMetadata(hashset *t);
size_t hashsetSize(hashset *t);
size_t hashsetBuckets(hashset *t);
size_t hashsetSize(const hashset *t);
size_t hashsetBuckets(const hashset *t);
size_t hashsetProbeCounter(hashset *t, int table);
size_t hashsetMemUsage(hashset *t);
void hashsetPauseAutoShrink(hashset *t);
Expand All @@ -165,6 +165,7 @@ int hashsetTryExpand(hashset *t, size_t size);
int hashsetExpandIfNeeded(hashset *t);
int hashsetShrinkIfNeeded(hashset *t);
hashset *hashsetDefragInternals(hashset *t, void *(*defragfn)(void *));
void dismissHashset(hashset *set);

/* Elements */
int hashsetFind(hashset *t, const void *key, void **found);
Expand Down
Loading

0 comments on commit add04d0

Please sign in to comment.