From 2c7a492f3d46ac061da0a16a64d1bd6f7b3275b0 Mon Sep 17 00:00:00 2001 From: lucasliang Date: Wed, 6 Mar 2024 10:21:49 +0100 Subject: [PATCH] *: Round-robin choose channel by source ip Derived from: https://github.com/tikv/grpc/pull/53 Signed-off-by: lucasliang --- src/core/lib/iomgr/tcp_server_posix.cc | 45 ++++++++++++--------- src/core/lib/iomgr/tcp_server_utils_posix.h | 7 +++- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 580a1f2cff705..6746a97945cfb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -104,7 +104,7 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, s->fd_handler = nullptr; s->memory_quota = grpc_core::ResourceQuotaFromChannelArgs(args)->memory_quota(); - gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); + gpr_atm_no_barrier_store(&s->next_pollset_to_assign_ids[""], 0); *server = s; return GRPC_ERROR_NONE; } @@ -253,15 +253,8 @@ static void on_read(void* arg, grpc_error_handle err) { addr_str.c_str()); } - std::string name = absl::StrCat("tcp-server-connection:", addr_str); - grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true); - - read_notifier_pollset = (*(sp->server->pollsets)) - [static_cast(gpr_atm_no_barrier_fetch_add( - &sp->server->next_pollset_to_assign, 1)) % - sp->server->pollsets->size()]; - - grpc_pollset_add_fd(read_notifier_pollset, fdobj); + // Create and bind fd randomly with the given addr. + grpc_fd* fdobj = randomly_bind_tcp_server(fd, addr_str, sp); // Create acceptor. grpc_tcp_server_acceptor* acceptor = @@ -276,7 +269,7 @@ static void on_read(void* arg, grpc_error_handle err) { read_notifier_pollset, acceptor); } - GPR_UNREACHABLE_CODE(return ); + GPR_UNREACHABLE_CODE(return); error: gpr_mu_lock(&sp->server->mu); @@ -600,13 +593,8 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s", addr_str.c_str()); } - std::string name = absl::StrCat("tcp-server-connection:", addr_str); - grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true); - read_notifier_pollset = - (*(s_->pollsets))[static_cast(gpr_atm_no_barrier_fetch_add( - &s_->next_pollset_to_assign, 1)) % - s_->pollsets->size()]; - grpc_pollset_add_fd(read_notifier_pollset, fdobj); + // Create and bind fd randomly with the given addr. + grpc_fd* fdobj = randomly_bind_tcp_server(fd, addr_str, s_); grpc_tcp_server_acceptor* acceptor = static_cast(gpr_malloc(sizeof(*acceptor))); acceptor->from_server = s_; @@ -631,6 +619,27 @@ static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler( return s->fd_handler; } +grpc_fd* randomly_bind_tcp_server(int fd, const std::string& addr_str, + grpc_tcp_server* s) { + // addr_str format: ipv4/ipv6:ipv6:port + std::size_t start = addr_str.find_first_of(":") + 1; + std::size_t end = addr_str.find(":", start); + std::string ip = addr_str.substr(start, end - start); + + std::string name = absl::StrCat("tcp-server-connection:", addr_str); + grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true); + + // Randomly choose one channel idx for this fd. + std::size_t cq_idx = static_cast(rand()) % s->pollsets->size(); + if (!gpr_atm_no_barrier_cas(&s->next_pollset_to_assign_ids[ip], 0, cq_idx)) { + cq_idx = static_cast(gpr_atm_no_barrier_fetch_add( + &s->next_pollset_to_assign_ids[ip], 1)) % + s->pollsets->size(); + } + grpc_pollset_add_fd((*(s->pollsets))[cq_idx], fdobj); + return fdobj; +} + grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = { tcp_server_create, tcp_server_start, tcp_server_add_port, tcp_server_create_fd_handler, diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 79527c2461380..63c27a84674d6 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -21,6 +21,8 @@ #include +#include + #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -87,8 +89,9 @@ struct grpc_tcp_server { * owned by this struct */ const std::vector* pollsets = nullptr; - /* next pollset to assign a channel to */ - gpr_atm next_pollset_to_assign = 0; + /* next pollset to assign a channel to, it is a map from pollset name to ip + * address */ + std::unordered_map next_pollset_to_assign_ids; /* channel args for this server */ grpc_channel_args* channel_args = nullptr;