# Accelerate hash table iterator with prefetching (valkey-io#1501)

This PR introduces improvements to the hashtable iterator, implementing
the prefetching technique described in the blog post [Unlock One Million
RPS - Part 2](https://valkey.io/blog/unlock-one-million-rps-part2/). The
changes lay the groundwork for further enhancements in use cases
involving iterators. Future PRs will build upon this foundation to
improve performance and functionality in various iterator-dependent
operations.

In the pursuit of maximizing iterator performance, I conducted a
comprehensive series of experiments. My tests encompassed a wide range
of approaches, including processing multiple bucket indices in parallel,
prefetching the next bucket upon completion of the current one, and
several other variations in prefetch timing and quantity. Surprisingly,
after rigorous testing and performance analysis, the simple
implementation presented in this PR consistently outperformed all of
the more complex strategies.

## Implementation

Each time we start iterating over a bucket, we prefetch data for future
iterations:

- We prefetch the entries of the next bucket (if it exists).
- We prefetch the structure (but not the entries) of the bucket after
  the next.

This prefetching is done when we pick up a new bucket, increasing the
chance that the data will be in cache by the time we need it.
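
For context, the prefetch itself is only a hint to the CPU, issued through the `valkey_prefetch` macro used in the diff below. A minimal sketch of what such a wrapper can look like, assuming a GCC/Clang toolchain (the actual definition lives elsewhere in the Valkey tree and may differ):

```c
/* Hypothetical sketch of a prefetch wrapper in the spirit of valkey_prefetch.
 * __builtin_prefetch(addr, rw, locality) is a GCC/Clang builtin: rw = 0 hints
 * a read, locality = 3 asks to keep the line in all cache levels (toward L1).
 * The builtin never faults, even on an invalid address, so it is safe to call
 * on any pointer; on other compilers we fall back to a no-op. */
#if defined(__GNUC__) || defined(__clang__)
#define valkey_prefetch(addr) __builtin_prefetch((addr), 0, 3)
#else
#define valkey_prefetch(addr) ((void)(addr))
#endif
```

Issuing the hint costs only a few cycles and the worst case is a wasted instruction; the win is that the cache-miss latency of the next bucket overlaps with the scan of the current one, which is where the gains below come from.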

## Performance

The data below was collected by running the KEYS command on a 64-core
Graviton 3 Amazon EC2 instance holding 50 million keys of 100 bytes
each. The duration of the `keys *` command was taken from the output of
the `info all` command.

```
+--------------------+------------------+-----------------------------+
| Prefetching        | Time (seconds)   | Keys Processed per Second   |
+--------------------+------------------+-----------------------------+
| No                 | 11.112279        | 4,499,529                   |
| Yes                | 3.141916         | 15,913,862                  |
+--------------------+------------------+-----------------------------+
Improvement:
Comparing the iterator without prefetching to the one with prefetching, 
we can see a speed improvement of 11.112279 / 3.141916 ≈ 3.54 times faster.
```
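
For clarity, the keys-per-second column is derived directly from the table: 50,000,000 keys / 3.141916 s ≈ 15,913,862 keys per second with prefetching, versus 50,000,000 / 11.112279 ≈ 4,499,529 without.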


### SAVE command improvement

#### Setup:
- 64-core Graviton 3 Amazon EC2 instance.
- 50 million keys of 100 bytes each.
- Valkey server running on a RAM file system.
- CRC checksum and compression turned off.

#### Results

```
+--------------------+------------------+-----------------------------+
| Prefetching        | Time (seconds)   | Keys Processed per Second   |
+--------------------+------------------+-----------------------------+
| No                 | 28               | 1,785,700                   |
| Yes                | 19.6             | 2,550,000                   |
+--------------------+------------------+-----------------------------+
Improvement:
- Reduced SAVE time by 30% (8.4 seconds faster)
- Increased key processing rate by 42.8% (764,300 more keys/second)
```

Signed-off-by: NadavGigi <[email protected]>
NadavGigi authored Jan 8, 2025
1 parent 418f1d0 commit 9f4815a
Showing 1 changed file with 70 additions and 22 deletions.
src/hashtable.c: 70 additions & 22 deletions

```diff
@@ -498,7 +498,7 @@ size_t nextCursor(size_t v, size_t mask) {
 }
 
 /* Returns the next bucket in a bucket chain, or NULL if there's no next. */
-static bucket *bucketNext(bucket *b) {
+static bucket *getChildBucket(bucket *b) {
     return b->chained ? b->entries[ENTRIES_PER_BUCKET - 1] : NULL;
 }
 
@@ -548,12 +548,12 @@ static void rehashStep(hashtable *ht) {
     rehashBucket(ht, b);
     if (b->chained) {
         /* Rehash and free child buckets. */
-        bucket *next = bucketNext(b);
+        bucket *next = getChildBucket(b);
         b->chained = 0;
         b = next;
         while (b != NULL) {
             rehashBucket(ht, b);
-            next = bucketNext(b);
+            next = getChildBucket(b);
             zfree(b);
             if (ht->type->trackMemUsage) ht->type->trackMemUsage(ht, -sizeof(bucket));
             ht->child_buckets[0]--;
@@ -708,7 +708,7 @@ static bucket *findBucket(hashtable *ht, uint64_t hash, const void *key, int *po
                     }
                 }
             }
-            b = bucketNext(b);
+            b = getChildBucket(b);
         } while (b != NULL);
     }
     return NULL;
@@ -753,7 +753,7 @@ static void bucketConvertToUnchained(bucket *b) {
  * This function needs the penultimate 'before_last' bucket in the chain, to be
  * able to update it when the last bucket is freed. */
 static void pruneLastBucket(hashtable *ht, bucket *before_last, bucket *last, int table_index) {
-    assert(before_last->chained && bucketNext(before_last) == last);
+    assert(before_last->chained && getChildBucket(before_last) == last);
     assert(!last->chained);
     assert(last->presence == 0 || __builtin_popcount(last->presence) == 1);
     bucketConvertToUnchained(before_last);
@@ -775,10 +775,10 @@ static void fillBucketHole(hashtable *ht, bucket *b, int pos_in_bucket, int tabl
     assert(b->chained && !isPositionFilled(b, pos_in_bucket));
     /* Find the last bucket */
     bucket *before_last = b;
-    bucket *last = bucketNext(b);
+    bucket *last = getChildBucket(b);
     while (last->chained) {
         before_last = last;
-        last = bucketNext(last);
+        last = getChildBucket(last);
     }
     /* Unless the last bucket is empty, find an entry in the last bucket and
      * move it to the hole in b. */
@@ -800,10 +800,10 @@
 static void compactBucketChain(hashtable *ht, size_t bucket_index, int table_index) {
     bucket *b = &ht->tables[table_index][bucket_index];
     while (b->chained) {
-        bucket *next = bucketNext(b);
+        bucket *next = getChildBucket(b);
         if (next->chained && next->presence == 0) {
             /* Empty bucket in the middle of the chain. Remove it from the chain. */
-            bucket *next_next = bucketNext(next);
+            bucket *next_next = getChildBucket(next);
             b->entries[ENTRIES_PER_BUCKET - 1] = next_next;
             zfree(next);
             if (ht->type->trackMemUsage) ht->type->trackMemUsage(ht, -sizeof(bucket));
@@ -846,7 +846,7 @@ static bucket *findBucketForInsert(hashtable *ht, uint64_t hash, int *pos_in_buc
             bucketConvertToChained(ht, b);
             ht->child_buckets[table]++;
         }
-        b = bucketNext(b);
+        b = getChildBucket(b);
     }
     /* Find a free slot in the bucket. There must be at least one. */
     int pos;
@@ -934,6 +934,51 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin
     return (incrementalFind *)(void *)state;
 }
 
+/* Prefetches all filled entries in the given bucket to optimize future memory access. */
+static void prefetchBucketEntries(bucket *b) {
+    for (int pos = 0; pos < numBucketPositions(b); pos++) {
+        if (isPositionFilled(b, pos)) {
+            valkey_prefetch(b->entries[pos]);
+        }
+    }
+}
+
+/* Returns the child bucket if chained, otherwise the next bucket in the table. Returns NULL if neither exists. */
+static bucket *getNextBucket(bucket *current_bucket, size_t bucket_index, hashtable *ht, int table_index) {
+    bucket *next_bucket = NULL;
+    if (current_bucket->chained) {
+        next_bucket = getChildBucket(current_bucket);
+    } else {
+        size_t table_size = numBuckets(ht->bucket_exp[table_index]);
+        size_t next_index = bucket_index + 1;
+        if (next_index < table_size) {
+            next_bucket = &ht->tables[table_index][next_index];
+        }
+    }
+    return next_bucket;
+}
+
+/* This function prefetches data that will be needed in subsequent iterations:
+ * - The entries of the next bucket
+ * - The structure of the bucket after the next
+ * It attempts to bring this data closer to the L1 cache to reduce future memory access latency.
+ *
+ * Cache state before this function is called (due to the previous call to this function):
+ * 1. The current bucket and its entries are likely already in cache.
+ * 2. The next bucket is in cache.
+ */
+static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) {
+    size_t next_index = iter->index + 1;
+    bucket *next_bucket = getNextBucket(current_bucket, next_index, iter->hashtable, iter->table);
+    if (next_bucket) {
+        prefetchBucketEntries(next_bucket);
+        bucket *next_next_bucket = getNextBucket(next_bucket, next_index + 1, iter->hashtable, iter->table);
+        if (next_next_bucket) {
+            valkey_prefetch(next_next_bucket);
+        }
+    }
+}
+
 /* --- API functions --- */
 
 /* Allocates and initializes a new hashtable specified by the given type. */
@@ -979,7 +1024,7 @@ void hashtableEmpty(hashtable *ht, void(callback)(hashtable *)) {
                     }
                 }
             }
-            bucket *next = bucketNext(b);
+            bucket *next = getChildBucket(b);
 
             /* Free allocated bucket. */
             if (b != &ht->tables[table_index][idx]) {
@@ -1375,7 +1420,7 @@ int hashtableReplaceReallocatedEntry(hashtable *ht, const void *old_entry, void
                     return 1;
                 }
             }
-            b = bucketNext(b);
+            b = getChildBucket(b);
         } while (b != NULL);
     }
     return 0;
@@ -1538,8 +1583,8 @@ int hashtableIncrementalFindStep(hashtableIncrementalFindState *state) {
                 bucket_idx = data->hash & mask;
             }
             data->bucket = &ht->tables[data->table][bucket_idx];
-        } else if (bucketNext(data->bucket) != NULL) {
-            data->bucket = bucketNext(data->bucket);
+        } else if (getChildBucket(data->bucket) != NULL) {
+            data->bucket = getChildBucket(data->bucket);
         } else if (data->table == 0 && ht->rehash_idx >= 0) {
             data->table = 1;
             size_t mask = expToMask(ht->bucket_exp[1]);
@@ -1656,7 +1701,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
                 }
             }
         }
-        bucket *next = bucketNext(b);
+        bucket *next = getChildBucket(b);
         if (next != NULL && defragfn != NULL) {
             next = bucketDefrag(b, next, defragfn);
         }
@@ -1693,7 +1738,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
                 }
             }
         }
-        bucket *next = bucketNext(b);
+        bucket *next = getChildBucket(b);
         if (next != NULL && defragfn != NULL) {
             next = bucketDefrag(b, next, defragfn);
         }
@@ -1723,7 +1768,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
                 }
             }
         }
-        bucket *next = bucketNext(b);
+        bucket *next = getChildBucket(b);
         if (next != NULL && defragfn != NULL) {
             next = bucketDefrag(b, next, defragfn);
         }
@@ -1859,7 +1904,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
         iter->pos_in_bucket++;
         if (iter->bucket->chained && iter->pos_in_bucket >= ENTRIES_PER_BUCKET - 1) {
             iter->pos_in_bucket = 0;
-            iter->bucket = bucketNext(iter->bucket);
+            iter->bucket = getChildBucket(iter->bucket);
         } else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
             /* Bucket index done. */
             if (iter->safe) {
@@ -1890,6 +1935,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
             }
         }
         bucket *b = iter->bucket;
+        if (iter->pos_in_bucket == 0) {
+            prefetchNextBucketEntries(iter, b);
+        }
         if (!isPositionFilled(b, iter->pos_in_bucket)) {
             /* No entry here. */
             continue;
@@ -1988,7 +2036,7 @@ hashtableStats *hashtableGetStatsHt(hashtable *ht, int table_index, int full) {
             unsigned long chainlen = 0;
             while (b->chained) {
                 chainlen++;
-                b = bucketNext(b);
+                b = getChildBucket(b);
             }
             if (chainlen > stats->max_chain_len) {
                 stats->max_chain_len = chainlen;
@@ -2083,7 +2131,7 @@ void hashtableDump(hashtable *ht) {
                     printf("(empty)\n");
                 }
             }
-            b = bucketNext(b);
+            b = getChildBucket(b);
             level++;
         } while (b != NULL);
     }
@@ -2117,7 +2165,7 @@ void hashtableHistogram(hashtable *ht) {
                 continue;
             }
             printf("%X", __builtin_popcount(b->presence));
-            buckets[idx] = bucketNext(b);
+            buckets[idx] = getChildBucket(b);
             if (buckets[idx] == NULL) chains_left--;
         }
         printf("\n");
printf("\n");
Expand All @@ -2138,7 +2186,7 @@ int hashtableLongestBucketChain(hashtable *ht) {
if (++chainlen > maxlen) {
maxlen = chainlen;
}
b = bucketNext(b);
b = getChildBucket(b);
}
}
}
Expand Down
