Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use io_uring to batch handle clients pending writes to reduce SYSCALL count. #112

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ else
LIBCRYPTO_LIBS=-lcrypto
endif

# io_uring is a Linux-only interface, so the liburing probe is only attempted
# when building on Linux.
ifeq ($(uname_S),Linux)
# Probe for the liburing development header: write a one-line C file that
# includes <liburing.h>, run it through the preprocessor only ($(CC) -E) and
# print "yes" when that succeeds. The temporary foo.c is removed afterwards.
# NUMBER_SIGN_CHAR is presumably defined elsewhere in this Makefile as a
# literal '#', which cannot be written directly here because make would treat
# it as the start of a comment -- TODO confirm.
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
# When the header is available, define HAVE_LIBURING (the macro src/io_uring.c
# tests to choose between the real implementation and the stubs) and link
# against liburing.
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_LIBURING
FINAL_LIBS+= -luring
endif
endif

BUILD_NO:=0
BUILD_YES:=1
BUILD_MODULE:=2
Expand Down Expand Up @@ -423,7 +434,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3107,6 +3107,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
88 changes: 88 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "io_uring.h"

#ifdef HAVE_LIBURING
#include <errno.h>
#include <liburing.h>
#include <string.h>
#include "zmalloc.h"

/* io_uring instance queue depth. */
#define IO_URING_DEPTH 256

static struct io_uring *_io_uring;
static size_t io_uring_write_queue_len = 0;

/* Initialize io_uring at server startup if io_uring enabled,
* setup io_uring submission and completion. */
int initIOUring(void) {
struct io_uring_params params;
_io_uring = zmalloc(sizeof(struct io_uring));
memset(&params, 0, sizeof(params));
/* On success, io_uring_queue_init_params(3) returns 0 and _io_uring will
* point to the shared memory containing the io_uring queues.
* On failure -errno is returned. */
if (io_uring_queue_init_params(IO_URING_DEPTH, _io_uring, &params) < 0) return IO_URING_ERR;
return IO_URING_OK;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle IO_URING_OK inside io_uring.c only; code outside of it should not have to deal with io_uring-specific return codes any more. Please convert IO_URING_OK to C_OK, and likewise IO_URING_ERR to C_ERR.

}

/* Queue a non-blocking send of `len` bytes from `buf` on socket `fd`.
 *
 * `data` is stored as the user data of the submission entry, so it can be
 * retrieved again from the matching completion entry. Nothing is submitted to
 * the kernel here; the request only occupies a slot in the submission queue
 * and is counted in io_uring_write_queue_len.
 *
 * Returns IO_URING_OK when the request was queued, or IO_URING_ERR when no
 * submission queue entry could be obtained. */
int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len) {
    struct io_uring_sqe *entry = io_uring_get_sqe(_io_uring);

    if (entry != NULL) {
        io_uring_prep_send(entry, fd, buf, len, MSG_DONTWAIT);
        io_uring_sqe_set_data(entry, data);
        io_uring_write_queue_len += 1;
        return IO_URING_OK;
    }
    return IO_URING_ERR;
}

/* Submit every queued write request and block until all of them completed.
 *
 * For each completion, `cqe_handler` (when not NULL) is called with the user
 * data attached by ioUringPrepWrite() and the completion result (cqe->res).
 * The completion is then marked as seen and the count of outstanding writes
 * is decremented.
 *
 * Returns IO_URING_OK once no queued write is outstanding, or IO_URING_ERR
 * when submitting or waiting fails. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) {
    if (io_uring_submit(_io_uring) < 0) return IO_URING_ERR;

    while (io_uring_write_queue_len) {
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(_io_uring, &cqe);

        /* A signal delivered while waiting makes the wait return -EINTR.
         * That is not a failure of the queued requests: bailing out here
         * would leave completions unconsumed and the outstanding counter
         * non-zero, so simply wait again. */
        if (ret == -EINTR) continue;
        if (ret != 0) return IO_URING_ERR;

        if (cqe_handler) {
            void *data = io_uring_cqe_get_data(cqe);
            cqe_handler(data, cqe->res);
        }
        io_uring_cqe_seen(_io_uring, cqe);
        io_uring_write_queue_len--;
    }
    return IO_URING_OK;
}

/* Tear down the io_uring instance and release the memory holding it.
 *
 * Safe to call when no instance exists (never initialized, or already
 * freed): the pointer is reset to NULL after the first release, so the guard
 * below turns any further call into a no-op instead of handing NULL to
 * io_uring_queue_exit(). */
void freeIOUring(void) {
    if (_io_uring == NULL) return;
    io_uring_queue_exit(_io_uring);
    zfree(_io_uring);
    _io_uring = NULL;
}
#else
#ifndef UNUSED
#define UNUSED(V) ((void)V)
#endif

int initIOUring(void) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is called only if server.io_uring_enabled is true, which means the user specified io-uring-enabled yes. So I think an error should be logged here and an error returned, instead of failing silently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got your point, maybe I should simply return C_ERR in the dummy initIOUring, error will be printed and exit.

void InitServerLast(void) {
    bioInit();
    initIOThreads();
    if (server.io_uring_enabled) {
        if (initIOUring() == IO_URING_ERR) {
            serverLog(LL_WARNING, "Failed to initialize io_uring.");
            exit(1);
        }
    }
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lipzhu
I prefer explicit log message in initIOUring like: "IO Uring unsupported on this platform". This will distinguish HAVE_LIBURING branch easily.

#ifdef HAVE_LIBURING
int initIOUring(void) {
...
if (io_uring_queue_init_params(IO_URING_DEPTH, _io_uring, &params) < 0) return IO_URING_ERR;
...
}

Once initIOUring returns error, user would get enough message to know HAVE_LIBURING is disabled or io-uring init fails.

return 0;
}

int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len) {
UNUSED(data);
UNUSED(fd);
UNUSED(buf);
UNUSED(len);
return 0;
}

/* Stub used when the server is built without liburing: there is nothing to
 * submit or wait for, so the handler is ignored and IO_URING_OK (0) is
 * returned immediately. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) {
    (void)cqe_handler;
    return IO_URING_OK;
}

/* Stub used when the server is built without liburing. */
void freeIOUring(void) {
    /* Intentionally empty: this build never creates an io_uring instance. */
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These dummy stubs are never called, right? They're defined just to make it compile for when we don't have liburing?

Should we mark them as dead code in some way? Assert that they're never called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert sounds like a great idea.

I also like @pizhenwei's suggestion of failing the dummy initIOUring, which should be sufficient to cover the user errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they are unused, just to make sure it can compile.

Should we mark them as dead code in some way as? Assert that they're never called?

How about adding the __attribute__((unused)) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @zuiderkwast
do you mean this style?

void freeIOUring(void) {
    assert(0); /* this should never happen */
}


#endif
15 changes: 15 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef IO_URING_H
#define IO_URING_H
#include <stddef.h>

/* Status codes returned by the functions declared below. */
#define IO_URING_OK 0
#define IO_URING_ERR -1

/* Completion callback used by ioUringWaitWriteBarrier(). It receives the user
 * data pointer that was passed to ioUringPrepWrite() and the result of the
 * completed request (the `res` field of the completion entry). */
typedef void (*io_uring_cqe_handler)(void *, int);

/* The descriptions below cover the implementation compiled when
 * HAVE_LIBURING is defined; without it io_uring.c provides stub versions of
 * the same functions so that callers still compile and link. */

/* Allocate and set up the io_uring instance. Returns IO_URING_OK on success
 * and IO_URING_ERR when the queue could not be initialized. */
int initIOUring(void);
/* Queue a non-blocking send of `len` bytes from `buf` on `fd`, tagging the
 * request with `data`. Returns IO_URING_ERR when no submission queue entry
 * is available, IO_URING_OK otherwise. */
int ioUringPrepWrite(void *data, int fd, const void *buf, size_t len);
/* Submit the queued requests and wait for all of them to complete, calling
 * `cqe_handler` (if not NULL) once per completion. Returns IO_URING_ERR when
 * submitting or waiting fails, IO_URING_OK otherwise. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler);
/* Shut down the io_uring instance and free the memory that holds it. */
void freeIOUring(void);

#endif /* IO_URING_H */
136 changes: 123 additions & 13 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "fpconv_dtoa.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "io_uring.h"
#include <strings.h>
#include <sys/socket.h>
#include <sys/uio.h>
Expand Down Expand Up @@ -2448,6 +2449,83 @@ int processIOThreadsWriteDone(void) {
return processed;
}

/* If client is suitable to use io_uring to handle the write request. */
static inline int _canWriteUsingIOUring(client *c) {
if (server.io_uring_enabled && server.io_threads_num == 1) {
/* Currently, we only use io_uring to handle the static buffer write requests.
* If io-threads or tls is enabled, skip the io_uring. */
return connIsTLS(c->conn) == 0 && getClientType(c) != CLIENT_TYPE_REPLICA && listLength(c->reply) == 0 &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These conditions don't cover RDMA. Does it work or should we exclude that too? What about other fake clients, like the fake client used from Lua?

Rather than defining a negated condition for skipping it, like "not TLS", it's usually better to have a positive condition for when it's known to work. In the future, we may add more connection types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
In fact, RDMA supports async API only, it uses send queue, receive queue and completion queue, but they are different from IO uring queue, so IO uring can't support RDMA.

Copy link
Member

@PingXie PingXie Jan 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think we can safely say RDMA and io-uring are "mutually exclusive" or "non-overlapping".

c->bufpos > 0;
}
return 0;
}

/* Apply the result of a completed io_uring send, previously stored in
 * c->nwritten, to the client's output state.
 *
 * io_uring does not use errno: a failed request carries the negated errno
 * value directly in the completion result, so c->nwritten is either the
 * number of bytes sent or a negative error code.
 *
 * Returns IO_URING_OK when bytes were sent and the client counters were
 * updated, IO_URING_ERR otherwise. On IO_URING_ERR the client is scheduled
 * to be freed if its connection is no longer in the connected state. */
static int _checkPendingIOUringWriteState(client *c) {
    if (c->nwritten > 0) {
        /* Account for the bytes that went out. */
        server.stat_net_output_bytes += c->nwritten;
        c->net_output_bytes += c->nwritten;

        c->sentlen += c->nwritten;
        /* Once the whole static buffer has been sent, rewind it so the
         * remainder of the reply can be handled next. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }

        /* Sending data to a client flagged as primary is not counted as an
         * interaction: REPLCONF ACK commands are sent to it all the time and
         * merely fill the socket output buffer. Timeout detection for such
         * clients relies on received data / pings only. */
        if (!c->flag.primary) c->last_interaction = server.unixtime;

        return IO_URING_OK;
    }

    /* Nothing was written. -EAGAIN is not recorded as an error at all. */
    if (c->nwritten != -EAGAIN) {
        c->conn->last_errno = -(c->nwritten);
        /* Only a connection that is currently connected is moved to the
         * error state (and never for -EINTR), so the state of a connection
         * in any other phase is left alone and its handler callbacks are
         * not disturbed. */
        if (c->conn->state == CONN_STATE_CONNECTED && c->nwritten != -EINTR) {
            c->conn->state = CONN_STATE_ERROR;
        }
    }

    if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn));
        freeClientAsync(c);
    }
    return IO_URING_ERR;
}

static void _postIOUringWrite(void) {
listIter li;
listNode *ln;
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
listUnlinkNode(server.clients_pending_write, ln);

if (_checkPendingIOUringWriteState(c) == IO_URING_ERR) continue;
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Close connection after entire reply has been sent. */
if (c->flag.close_after_reply) {
freeClientAsync(c);
continue;
}
}
/* Update client's memory usage after writing.*/
updateClientMemUsageAndBucket(c);
}
}

void setClientLastWritten(void *data, int res) {
client *c = data;
c->nwritten = res;
}

/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
Expand All @@ -2467,34 +2545,66 @@ int handleClientsWithPendingWrites(void) {
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flag.pending_write = 0;
listUnlinkNode(server.clients_pending_write, ln);

/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flag.protected) continue;
if (c->flag.protected) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* Don't write to clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
if (c->flag.close_asap) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

if (!clientHasPendingReplies(c)) continue;
if (!clientHasPendingReplies(c)) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) continue;
if (server.io_threads_num > 1) {
listUnlinkNode(server.clients_pending_write, ln);
if (trySendWriteToIOThreads(c) == C_OK) {
continue;
}
}

/* We can't write to the client while IO operation is in progress. */
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) continue;
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) {
if (server.io_threads_num == 1) {
listUnlinkNode(server.clients_pending_write, ln);
}
continue;
}

processed++;
if (_canWriteUsingIOUring(c)) {
if (ioUringPrepWrite(c, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen) == IO_URING_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
} else {
if (server.io_threads_num == 1) {
listUnlinkNode(server.clients_pending_write, ln);
}
/* Try to write buffers to the client socket. */
if (writeToClient(c) == C_ERR) continue;

/* Try to write buffers to the client socket. */
if (writeToClient(c) == C_ERR) continue;

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
}
}
}

if (server.io_uring_enabled && server.io_threads_num == 1 && listLength(server.clients_pending_write) > 0) {
ioUringWaitWriteBarrier(setClientLastWritten);
_postIOUringWrite();
}
return processed;
}

Expand Down
8 changes: 8 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "io_uring.h"
#include "sds.h"

#include <time.h>
Expand Down Expand Up @@ -2851,6 +2852,12 @@ void initListeners(void) {
/* Final initialization steps: start the background I/O subsystem and the I/O
 * threads, set up io_uring when it is enabled in the configuration, apply the
 * jemalloc background thread setting and record the memory used so far. */
void InitServerLast(void) {
    bioInit();
    initIOThreads();

    /* io_uring was explicitly requested: failing to set it up is fatal. */
    if (server.io_uring_enabled && initIOUring() == IO_URING_ERR) {
        serverLog(LL_WARNING, "Failed to initialize io_uring.");
        exit(1);
    }

    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -7051,6 +7058,7 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
if (server.io_uring_enabled) freeIOUring();
return 0;
}

Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,8 @@ struct valkeyServer {
sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */
/* Local environment */
char *locale_collate;
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */
int io_uring_enabled; /* If io_uring is enabled */
};

#define MAX_KEYS_BUFFER 256
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ start_server {tags {"introspection"}} {
req-res-logfile
client-default-resp
dual-channel-replication-enabled
io-uring-enabled
}

if {!$::tls} {
Expand Down
6 changes: 6 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2392,3 +2392,9 @@ jemalloc-bg-thread yes
# we may also use this when making decisions for replication.
#
# availability-zone "zone-name"

# If Valkey is compiled with io_uring support and liburing is installed in the
# system, then io_uring can be enabled with this config. The io_uring kernel
# interface was adopted in Linux kernel version 5.1.
#
# io-uring-enabled no
Loading