From bd440407cc0bb23bb1dd73942756b9e1ed1b9a4d Mon Sep 17 00:00:00 2001 From: bluayer Date: Thu, 12 Dec 2024 00:50:05 +0900 Subject: [PATCH] Add --rfa and --rfro option - When users enable rfa(read_from_all) option, they can read from replica and primary. - When users enable rfro(read_from_replicas only) option, they can read from replicas only. - If users don't use any option related to replicas, they can read from primaries only. - Add READONLY when enabling option for reading all nodes Signed-off-by: bluayer --- src/valkey-benchmark.c | 63 +++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index ab6ef54e62..7258b3f8c4 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -112,7 +112,8 @@ static struct config { int num_threads; struct benchmarkThread **threads; int cluster_mode; - int read_from_replicas; + int read_from_all; + int read_from_replicas_only; int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; @@ -714,7 +715,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } - if (config.cluster_mode && config.read_from_replicas) { + if (config.cluster_mode && (config.read_from_replicas_only || config.read_from_all)) { char *buf = NULL; int len; len = redisFormatCommand(&buf, "READONLY"); @@ -848,7 +849,14 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - char *node_prefix = config.read_from_replicas ? "replicas" : "primaries"; + const char *node_prefix = NULL; + if (config.read_from_all) { + node_prefix = "all"; + } else if (config.read_from_replicas_only) { + node_prefix = "replica"; + } else { + node_prefix = "primary"; + } printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix); int m; for (m = 0; m < config.cluster_node_count; m++) { @@ -1061,12 +1069,14 @@ static int fetchClusterConfiguration(void) { int success = 1; redisContext *ctx = NULL; redisReply *reply = NULL; + dict *nodes = NULL; const char *errmsg = "Failed to fetch cluster configuration"; size_t i, j; ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket); if (ctx == NULL) { exit(1); } + assert(!(config.read_from_all && config.read_from_replicas_only) && "--rfa and --rfro cannot be enabled simultaneously"); reply = redisCommand(ctx, "CLUSTER SLOTS"); if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { @@ -1075,7 +1085,7 @@ static int fetchClusterConfiguration(void) { goto cleanup; } assert(reply->type == REDIS_REPLY_ARRAY); - dict *nodes = dictCreate(&dtype); + nodes = dictCreate(&dtype); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; assert(r->type == REDIS_REPLY_ARRAY); @@ -1084,7 +1094,8 @@ static int fetchClusterConfiguration(void) { int to = r->element[1]->integer; for (j = 2; j < r->elements; j++) { int is_primary = (j == 2); - if (config.read_from_replicas == is_primary) continue; + int is_cluster_option_only = (!config.read_from_all && !config.read_from_replicas_only); + if ((config.read_from_replicas_only && is_primary) || (is_cluster_option_only && !is_primary)) continue; redisReply *nr = r->element[j]; assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); @@ -1093,6 +1104,8 @@ static int fetchClusterConfiguration(void) { sds ip = sdsnew(nr->element[0]->str); sds name = sdsnew(nr->element[2]->str); int port = nr->element[1]->integer; + int slot_start = from; + int slot_end = to; clusterNode *node = NULL; dictEntry *entry = dictFind(nodes, name); @@ -1106,11 +1119,11 @@ static int fetchClusterConfiguration(void) { } else { node = dictGetVal(entry); } - if (from == to) { - node->slots[node->slots_count++] = from; + if (slot_start == slot_end) { + node->slots[node->slots_count++] = slot_start; } else { - while (from <= to) { - int slot = from++; + while (slot_start <= slot_end) { + int slot = slot_start++; node->slots[node->slots_count++] = slot; } } @@ -1133,6 +1146,7 @@ static int fetchClusterConfiguration(void) { if (config.cluster_nodes) freeClusterNodes(); } if (reply) freeReplyObject(reply); + if (nodes) dictRelease(nodes); return success; } @@ -1154,7 +1168,7 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); - fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); + fprintf(stderr, "If you are using the --rfa and --rfro option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); const char *errmsg = "Failed to update cluster slots configuration"; @@ -1194,7 +1208,10 @@ static int fetchClusterSlotsConfiguration(client c) { from = r->element[0]->integer; to = r->element[1]->integer; size_t start, end; - if (config.read_from_replicas) { + if (config.read_from_all) { + start = 2; + end = r->elements; + } else if (config.read_from_replicas_only) { start = 3; end = r->elements; } else { @@ -1385,8 +1402,10 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; - } else if (!strcmp(argv[i], "--rfr")) { - config.read_from_replicas = 1; + } else if (!strcmp(argv[i], "--rfa")) { + config.read_from_all = 1; + } else if (!strcmp(argv[i], "--rfro")) { + config.read_from_replicas_only = 1; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; } else if (!strcmp(argv[i], "--help")) { @@ -1484,7 +1503,11 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" - " --rfr Enable read from replicas in cluster mode.\n" + " --rfa Enable read from all nodes(primary and replica) in cluster mode.\n" + " This command must be used with the --cluster option.\n" + " When using this option, it is recommended to use only \n" + " the commands for read requests.\n" + " --rfro Enable read from replicas only in cluster mode.\n" " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" " the commands for read requests.\n" @@ -1629,7 +1652,8 @@ int main(int argc, char **argv) { config.num_threads = 0; config.threads = NULL; config.cluster_mode = 0; - config.read_from_replicas = 0; + config.read_from_all = 0; + config.read_from_replicas_only = 0; config.cluster_node_count = 0; config.cluster_nodes = NULL; config.redis_config = NULL; @@ -1674,7 +1698,14 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - char *node_prefix = config.read_from_replicas ? "replica" : "primary"; + const char *node_prefix = NULL; + if (config.read_from_all) { + node_prefix = "all"; + } else if (config.read_from_replicas_only) { + node_prefix = "replica"; + } else { + node_prefix = "primary"; + } printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix); int i = 0; for (; i < config.cluster_node_count; i++) {