Skip to content

Commit

Permalink
Add --rfa and --rfro option
Browse files Browse the repository at this point in the history
- When users enable rfa(read_from_all) option, they can read from replica and primary.
- When users enable rfro(read_from_replicas only) option, they can read from replicas only.
- If users don't use any option related to replicas, they can read from primaries only.
- Add READONLY when enabling option for reading all nodes

Signed-off-by: bluayer <[email protected]>
  • Loading branch information
bluayer committed Dec 11, 2024
1 parent 66cd375 commit bd44040
Showing 1 changed file with 47 additions and 16 deletions.
63 changes: 47 additions & 16 deletions src/valkey-benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ static struct config {
int num_threads;
struct benchmarkThread **threads;
int cluster_mode;
int read_from_replicas;
int read_from_all;
int read_from_replicas_only;
int cluster_node_count;
struct clusterNode **cluster_nodes;
struct serverConfig *redis_config;
Expand Down Expand Up @@ -714,7 +715,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->prefix_pending++;
}

if (config.cluster_mode && config.read_from_replicas) {
if (config.cluster_mode && (config.read_from_replicas_only || config.read_from_all)) {
char *buf = NULL;
int len;
len = redisFormatCommand(&buf, "READONLY");
Expand Down Expand Up @@ -848,7 +849,14 @@ static void showLatencyReport(void) {
printf(" %d bytes payload\n", config.datasize);
printf(" keep alive: %d\n", config.keepalive);
if (config.cluster_mode) {
char *node_prefix = config.read_from_replicas ? "replicas" : "primaries";
const char *node_prefix = NULL;
if (config.read_from_all) {
node_prefix = "all";
} else if (config.read_from_replicas_only) {
node_prefix = "replica";
} else {
node_prefix = "primary";
}
printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix);
int m;
for (m = 0; m < config.cluster_node_count; m++) {
Expand Down Expand Up @@ -1061,12 +1069,14 @@ static int fetchClusterConfiguration(void) {
int success = 1;
redisContext *ctx = NULL;
redisReply *reply = NULL;
dict *nodes = NULL;
const char *errmsg = "Failed to fetch cluster configuration";
size_t i, j;
ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
if (ctx == NULL) {
exit(1);
}
assert(!(config.read_from_all && config.read_from_replicas_only) && "--rfa and --rfro cannot be enabled simultaneously");

reply = redisCommand(ctx, "CLUSTER SLOTS");
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
Expand All @@ -1075,7 +1085,7 @@ static int fetchClusterConfiguration(void) {
goto cleanup;
}
assert(reply->type == REDIS_REPLY_ARRAY);
dict *nodes = dictCreate(&dtype);
nodes = dictCreate(&dtype);
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
assert(r->type == REDIS_REPLY_ARRAY);
Expand All @@ -1084,7 +1094,8 @@ static int fetchClusterConfiguration(void) {
int to = r->element[1]->integer;
for (j = 2; j < r->elements; j++) {
int is_primary = (j == 2);
if (config.read_from_replicas == is_primary) continue;
int is_cluster_option_only = (!config.read_from_all && !config.read_from_replicas_only);
if ((config.read_from_replicas_only && is_primary) || (is_cluster_option_only && !is_primary)) continue;

redisReply *nr = r->element[j];
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
Expand All @@ -1093,6 +1104,8 @@ static int fetchClusterConfiguration(void) {
sds ip = sdsnew(nr->element[0]->str);
sds name = sdsnew(nr->element[2]->str);
int port = nr->element[1]->integer;
int slot_start = from;
int slot_end = to;

clusterNode *node = NULL;
dictEntry *entry = dictFind(nodes, name);
Expand All @@ -1106,11 +1119,11 @@ static int fetchClusterConfiguration(void) {
} else {
node = dictGetVal(entry);
}
if (from == to) {
node->slots[node->slots_count++] = from;
if (slot_start == slot_end) {
node->slots[node->slots_count++] = slot_start;
} else {
while (from <= to) {
int slot = from++;
while (slot_start <= slot_end) {
int slot = slot_start++;
node->slots[node->slots_count++] = slot;
}
}
Expand All @@ -1133,6 +1146,7 @@ static int fetchClusterConfiguration(void) {
if (config.cluster_nodes) freeClusterNodes();
}
if (reply) freeReplyObject(reply);
if (nodes) dictRelease(nodes);
return success;
}

Expand All @@ -1154,7 +1168,7 @@ static int fetchClusterSlotsConfiguration(client c) {
if (is_fetching_slots) return -1; // TODO: use other codes || errno ?
atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed);
fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n");
fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n");
fprintf(stderr, "If you are using the --rfa and --rfro option and sending write requests (set type commands),\nthe requests could not be processed properly.\n");

const char *errmsg = "Failed to update cluster slots configuration";

Expand Down Expand Up @@ -1194,7 +1208,10 @@ static int fetchClusterSlotsConfiguration(client c) {
from = r->element[0]->integer;
to = r->element[1]->integer;
size_t start, end;
if (config.read_from_replicas) {
if (config.read_from_all) {
start = 2;
end = r->elements;
} else if (config.read_from_replicas_only) {
start = 3;
end = r->elements;
} else {
Expand Down Expand Up @@ -1385,8 +1402,10 @@ int parseOptions(int argc, char **argv) {
config.num_threads = 0;
} else if (!strcmp(argv[i], "--cluster")) {
config.cluster_mode = 1;
} else if (!strcmp(argv[i], "--rfr")) {
config.read_from_replicas = 1;
} else if (!strcmp(argv[i], "--rfa")) {
config.read_from_all = 1;
} else if (!strcmp(argv[i], "--rfro")) {
config.read_from_replicas_only = 1;
} else if (!strcmp(argv[i], "--enable-tracking")) {
config.enable_tracking = 1;
} else if (!strcmp(argv[i], "--help")) {
Expand Down Expand Up @@ -1484,7 +1503,11 @@ int parseOptions(int argc, char **argv) {
" If the command is supplied on the command line in cluster\n"
" mode, the key must contain \"{tag}\". Otherwise, the\n"
" command will not be sent to the right cluster node.\n"
" --rfr Enable read from replicas in cluster mode.\n"
" --rfa Enable read from all nodes(primary and replica) in cluster mode.\n"
" This command must be used with the --cluster option.\n"
" When using this option, it is recommended to use only \n"
" the commands for read requests.\n"
" --rfro Enable read from replicas only in cluster mode.\n"
" This command must be used with the --cluster option.\n"
" When using this option, it is recommended to use only \n"
" the commands for read requests.\n"
Expand Down Expand Up @@ -1629,7 +1652,8 @@ int main(int argc, char **argv) {
config.num_threads = 0;
config.threads = NULL;
config.cluster_mode = 0;
config.read_from_replicas = 0;
config.read_from_all = 0;
config.read_from_replicas_only = 0;
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
config.redis_config = NULL;
Expand Down Expand Up @@ -1674,7 +1698,14 @@ int main(int argc, char **argv) {
fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count);
exit(1);
}
char *node_prefix = config.read_from_replicas ? "replica" : "primary";
const char *node_prefix = NULL;
if (config.read_from_all) {
node_prefix = "all";
} else if (config.read_from_replicas_only) {
node_prefix = "replica";
} else {
node_prefix = "primary";
}
printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix);
int i = 0;
for (; i < config.cluster_node_count; i++) {
Expand Down

0 comments on commit bd44040

Please sign in to comment.