From cbe6361670434f81a217f5d8edb89b758abfdbf4 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Tue, 30 Jul 2024 18:15:50 +0800 Subject: [PATCH 1/3] Rebase based on unstable branch. --------- Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo --- src/Makefile | 13 +++- src/config.c | 1 + src/io_uring.c | 88 +++++++++++++++++++++++ src/io_uring.h | 15 ++++ src/networking.c | 132 +++++++++++++++++++++++++++++++---- src/server.c | 8 +++ src/server.h | 4 +- tests/unit/introspection.tcl | 1 + valkey.conf | 6 ++ 9 files changed, 252 insertions(+), 16 deletions(-) create mode 100644 src/io_uring.c create mode 100644 src/io_uring.h diff --git a/src/Makefile b/src/Makefile index eaf0e4e387..5a754685ec 100644 --- a/src/Makefile +++ b/src/Makefile @@ -318,6 +318,17 @@ else LIBCRYPTO_LIBS=-lcrypto endif +# only Linux has IO_URING support +ifeq ($(uname_S),Linux) +HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include " > foo.c; \ + $(CC) -E foo.c > /dev/null 2>&1 && echo yes; \ + rm foo.c') +ifeq ($(HAS_LIBURING),yes) + FINAL_CFLAGS+= -DHAVE_LIBURING + FINAL_LIBS+= -luring +endif +endif + BUILD_NO:=0 BUILD_YES:=1 BUILD_MODULE:=2 @@ -423,7 +434,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index ae60dd3fd0..f0409f9892 100644 --- a/src/config.c +++ b/src/config.c @@ -3106,6 +3106,7 @@ standardConfig static_configs[] = { createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat), createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), + createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/io_uring.c b/src/io_uring.c new file mode 100644 index 0000000000..e4b918dab3 --- /dev/null +++ b/src/io_uring.c @@ -0,0 +1,88 @@ +#include "io_uring.h" + +#ifdef HAVE_LIBURING +#include +#include +#include "zmalloc.h" + +/* io_uring instance queue depth. */ +#define IO_URING_DEPTH 256 + +static struct io_uring *_io_uring; +static size_t io_uring_write_queue_len = 0; + +/* Initialize io_uring at server startup if io_uring enabled, + * setup io_uring submission and completion. */ +int initIOUring(void) { + struct io_uring_params params; + _io_uring = zmalloc(sizeof(struct io_uring)); + memset(¶ms, 0, sizeof(params)); + /* On success, io_uring_queue_init_params(3) returns 0 and _io_uring will + * point to the shared memory containing the io_uring queues. + * On failure -errno is returned. */ + if (io_uring_queue_init_params(IO_URING_DEPTH, _io_uring, ¶ms) < 0) return IO_URING_ERR; + return IO_URING_OK; +} + +/* Use io_uring to handle the client write request. */ +int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len) { + struct io_uring_sqe *sqe = io_uring_get_sqe(_io_uring); + if (sqe == NULL) return IO_URING_ERR; + io_uring_prep_send(sqe, fd, buf, len, MSG_DONTWAIT); + io_uring_sqe_set_data(sqe, data); + io_uring_write_queue_len++; + return IO_URING_OK; +} + +/* Submit requests to the submission queue and wait for completion. */ +int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) { + if (io_uring_submit(_io_uring) < 0) return IO_URING_ERR; + while (io_uring_write_queue_len) { + struct io_uring_cqe *cqe; + int ret = io_uring_wait_cqe(_io_uring, &cqe); + if (ret == 0) { + if (cqe_handler) { + void *data = io_uring_cqe_get_data(cqe); + cqe_handler(data, cqe->res); + } + io_uring_cqe_seen(_io_uring, cqe); + io_uring_write_queue_len--; + } else { + return IO_URING_ERR; + } + } + return IO_URING_OK; +} + +/* Free io_uring. */ +void freeIOUring(void) { + io_uring_queue_exit(_io_uring); + zfree(_io_uring); + _io_uring = NULL; +} +#else +#ifndef UNUSED +#define UNUSED(V) ((void)V) +#endif + +int initIOUring(void) { + return 0; +} + +int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len) { + UNUSED(data); + UNUSED(fd); + UNUSED(buf); + UNUSED(len); + return 0; +} + +int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) { + UNUSED(cqe_handler); + return 0; +} + +void freeIOUring(void) { +} + +#endif diff --git a/src/io_uring.h b/src/io_uring.h new file mode 100644 index 0000000000..1c73ce6900 --- /dev/null +++ b/src/io_uring.h @@ -0,0 +1,15 @@ +#ifndef IO_URING_H +#define IO_URING_H +#include + +#define IO_URING_OK 0 +#define IO_URING_ERR -1 + +typedef void (*io_uring_cqe_handler)(void *, int); + +int initIOUring(void); +int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len); +int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler); +void freeIOUring(void); + +#endif /* IO_URING_H */ diff --git a/src/networking.c b/src/networking.c index 13b76a0893..7598fdeede 100644 --- a/src/networking.c +++ b/src/networking.c @@ -35,6 +35,7 @@ #include "fmtargs.h" #include #include "io_threads.h" +#include "io_uring.h" #include #include #include @@ -2424,6 +2425,82 @@ int processIOThreadsWriteDone(void) { return processed; } +/* If client is suitable to use io_uring to handle the write request. */ +static inline int _canWriteUsingIOUring(client *c) { + if (server.io_uring_enabled && server.io_threads_num == 1) { + /* Currently, we only use io_uring to handle the static buffer write requests. */ + return getClientType(c) != CLIENT_TYPE_REPLICA && listLength(c->reply) == 0 && c->bufpos > 0; + } + return 0; +} + +/* Check the completed io_uring event and update the state. */ +static int _checkPendingIOUringWriteState(client *c) { + /* Note that where synchronous system calls will return -1 on + * failure and set errno to the actual error value, + * io_uring never uses errno. Instead it returns the negated + * errno directly in the CQE res field. */ + if (c->nwritten <= 0) { + if (c->nwritten != -EAGAIN) { + c->conn->last_errno = -(c->nwritten); + /* Don't overwrite the state of a connection that is not already + * connected, not to mess with handler callbacks. */ + if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED) c->conn->state = CONN_STATE_ERROR; + } + if (connGetState(c->conn) != CONN_STATE_CONNECTED) { + serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn)); + freeClientAsync(c); + } + return IO_URING_ERR; + } + + c->sentlen += c->nwritten; + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if ((int)c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + server.stat_net_output_bytes += c->nwritten; + c->net_output_bytes += c->nwritten; + + /* For clients representing masters we don't count sending data + * as an interaction, since we always send REPLCONF ACK commands + * that take some time to just fill the socket output buffer. + * We just rely on data / pings received for timeout detection. */ + if (!c->flag.primary) c->last_interaction = server.unixtime; + + return IO_URING_OK; +} + +static void _postIOUringWrite(void) { + listIter li; + listNode *ln; + listRewind(server.clients_pending_write, &li); + while ((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flag.pending_io_uring_write = 0; + listUnlinkNode(server.clients_pending_write, ln); + + if (_checkPendingIOUringWriteState(c) == IO_URING_ERR) continue; + if (!clientHasPendingReplies(c)) { + c->sentlen = 0; + /* Close connection after entire reply has been sent. */ + if (c->flag.close_after_reply) { + freeClientAsync(c); + continue; + } + } + /* Update client's memory usage after writing.*/ + updateClientMemUsageAndBucket(c); + } +} + +void setClientLastWritten(void *data, int res) { + client *c = data; + c->nwritten = res; +} + /* This function is called just before entering the event loop, in the hope * we can just write the replies to the client output buffer without any * need to use a syscall in order to install the writable event handler, @@ -2443,34 +2520,61 @@ int handleClientsWithPendingWrites(void) { while ((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flag.pending_write = 0; - listUnlinkNode(server.clients_pending_write, ln); /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ - if (c->flag.protected) continue; + if (c->flag.protected) { + listUnlinkNode(server.clients_pending_write, ln); + continue; + } /* Don't write to clients that are going to be closed anyway. */ - if (c->flag.close_asap) continue; + if (c->flag.close_asap) { + listUnlinkNode(server.clients_pending_write, ln); + continue; + } - if (!clientHasPendingReplies(c)) continue; + if (!clientHasPendingReplies(c)) { + listUnlinkNode(server.clients_pending_write, ln); + continue; + } /* If we can send the client to the I/O thread, let it handle the write. */ - if (trySendWriteToIOThreads(c) == C_OK) continue; + if (trySendWriteToIOThreads(c) == C_OK) { + listUnlinkNode(server.clients_pending_write, ln); + continue; + } /* We can't write to the client while IO operation is in progress. */ - if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) continue; + if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) { + listUnlinkNode(server.clients_pending_write, ln); + continue; + } processed++; - - /* Try to write buffers to the client socket. */ - if (writeToClient(c) == C_ERR) continue; - - /* If after the synchronous writes above we still have data to - * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c)) { - installClientWriteHandler(c); + if (_canWriteUsingIOUring(c)) { + c->flag.pending_io_uring_write = 1; + if (ioUringPrepWrite(c, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen) == IO_URING_ERR) { + listUnlinkNode(server.clients_pending_write, ln); + continue; + } + } else { + listUnlinkNode(server.clients_pending_write, ln); + /* Try to write buffers to the client socket. */ + if (writeToClient(c) == C_ERR) continue; + + /* If after the synchronous writes above we still have data to + * output to the client, we need to install the writable handler. */ + if (clientHasPendingReplies(c)) { + installClientWriteHandler(c); + } } } + + if (server.io_uring_enabled && server.io_threads_num == 1 && listLength(server.clients_pending_write) > 0) { + ioUringWaitWriteBarrier(setClientLastWritten); + _postIOUringWrite(); + } return processed; } diff --git a/src/server.c b/src/server.c index d332e6989c..d302996534 100644 --- a/src/server.c +++ b/src/server.c @@ -41,6 +41,7 @@ #include "threads_mngr.h" #include "fmtargs.h" #include "io_threads.h" +#include "io_uring.h" #include #include @@ -2821,6 +2822,12 @@ void initListeners(void) { void InitServerLast(void) { bioInit(); initIOThreads(); + if (server.io_uring_enabled) { + if (initIOUring() == IO_URING_ERR) { + serverLog(LL_WARNING, "Failed to initialize io_uring."); + exit(1); + } + } set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -6968,6 +6975,7 @@ int main(int argc, char **argv) { aeMain(server.el); aeDeleteEventLoop(server.el); + if (server.io_uring_enabled) freeIOUring(); return 0; } diff --git a/src/server.h b/src/server.h index ccdece20dd..53959b7f71 100644 --- a/src/server.h +++ b/src/server.h @@ -1229,7 +1229,8 @@ typedef struct ClientFlags { * By using this flag, we ensure that the RDB client remains intact until the replica * \ has successfully initiated PSYNC. */ uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ - uint64_t reserved : 7; /* Reserved for future use */ + uint64_t pending_io_uring_write : 1; /* Client has output to send using io_uring. */ + uint64_t reserved : 6; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -2219,6 +2220,7 @@ struct valkeyServer { sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */ /* Local environment */ char *locale_collate; + int io_uring_enabled; /* If io_uring is enabled */ }; #define MAX_KEYS_BUFFER 256 diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 5396cd2e56..59a5f53853 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -559,6 +559,7 @@ start_server {tags {"introspection"}} { req-res-logfile client-default-resp dual-channel-replication-enabled + io-uring-enabled } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index 1261432c22..01b4a0d9a7 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2360,3 +2360,9 @@ jemalloc-bg-thread yes # we may also use this when making decisions for replication. # # availability-zone "zone-name" + +# If Valkey is compiled with io_uring support and liburing is installed in the +# system, then io_uring can be enabled with this config. The io_uring kernel +# interface was adopted in Linux kernel version 5.1. +# +# io-uring-enabled no From f6c6dd76e9e056d23f7429653c3c916ee1f3639f Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Mon, 5 Aug 2024 09:46:56 +0000 Subject: [PATCH 2/3] code enhancement according to the feedback from secwall. Signed-off-by: Lipeng Zhu --- src/networking.c | 21 +++++++++++++-------- src/server.h | 3 +-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/networking.c b/src/networking.c index 30b1d7016f..86895b29c6 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2428,8 +2428,9 @@ int processIOThreadsWriteDone(void) { /* If client is suitable to use io_uring to handle the write request. */ static inline int _canWriteUsingIOUring(client *c) { if (server.io_uring_enabled && server.io_threads_num == 1) { - /* Currently, we only use io_uring to handle the static buffer write requests. */ - return getClientType(c) != CLIENT_TYPE_REPLICA && listLength(c->reply) == 0 && c->bufpos > 0; + /* Currently, we only use io_uring to handle the static buffer write requests. + * If io-threads or tls is enabled, skip the io_uring. */ + return connIsTLS(c->conn) == 0 && getClientType(c) != CLIENT_TYPE_REPLICA && listLength(c->reply) == 0 && c->bufpos > 0; } return 0; } @@ -2479,7 +2480,6 @@ static void _postIOUringWrite(void) { listRewind(server.clients_pending_write, &li); while ((ln = listNext(&li))) { client *c = listNodeValue(ln); - c->flag.pending_io_uring_write = 0; listUnlinkNode(server.clients_pending_write, ln); if (_checkPendingIOUringWriteState(c) == IO_URING_ERR) continue; @@ -2540,26 +2540,31 @@ int handleClientsWithPendingWrites(void) { } /* If we can send the client to the I/O thread, let it handle the write. */ - if (trySendWriteToIOThreads(c) == C_OK) { + if (server.io_threads_num > 1) { listUnlinkNode(server.clients_pending_write, ln); - continue; + if (trySendWriteToIOThreads(c) == C_OK) { + continue; + } } /* We can't write to the client while IO operation is in progress. */ if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) { - listUnlinkNode(server.clients_pending_write, ln); + if (server.io_threads_num == 1) { + listUnlinkNode(server.clients_pending_write, ln); + } continue; } processed++; if (_canWriteUsingIOUring(c)) { - c->flag.pending_io_uring_write = 1; if (ioUringPrepWrite(c, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen) == IO_URING_ERR) { listUnlinkNode(server.clients_pending_write, ln); continue; } } else { - listUnlinkNode(server.clients_pending_write, ln); + if (server.io_threads_num == 1) { + listUnlinkNode(server.clients_pending_write, ln); + } /* Try to write buffers to the client socket. */ if (writeToClient(c) == C_ERR) continue; diff --git a/src/server.h b/src/server.h index 53959b7f71..f2b348671a 100644 --- a/src/server.h +++ b/src/server.h @@ -1229,8 +1229,7 @@ typedef struct ClientFlags { * By using this flag, we ensure that the RDB client remains intact until the replica * \ has successfully initiated PSYNC. */ uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ - uint64_t pending_io_uring_write : 1; /* Client has output to send using io_uring. */ - uint64_t reserved : 6; /* Reserved for future use */ + uint64_t reserved : 7; /* Reserved for future use */ } ClientFlags; typedef struct client { From 2664c487d3daeee84b99a1986b76b65bec8fbd74 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Tue, 13 Aug 2024 07:25:16 +0000 Subject: [PATCH 3/3] clang-format Signed-off-by: Lipeng Zhu --- src/server.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.h b/src/server.h index 0827eb49c8..828f3b08ef 100644 --- a/src/server.h +++ b/src/server.h @@ -2219,7 +2219,7 @@ struct valkeyServer { sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */ /* Local environment */ char *locale_collate; - char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */ + char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */ int io_uring_enabled; /* If io_uring is enabled */ };