Skip to content

Commit

Permalink
*: Round-robin choose channel by source ip
Browse files Browse the repository at this point in the history
Derived from: tikv#53

Signed-off-by: lucasliang <[email protected]>
  • Loading branch information
LykxSassinator committed Mar 6, 2024
1 parent 38a9cd9 commit 2c7a492
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
45 changes: 27 additions & 18 deletions src/core/lib/iomgr/tcp_server_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
s->fd_handler = nullptr;
s->memory_quota =
grpc_core::ResourceQuotaFromChannelArgs(args)->memory_quota();
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
gpr_atm_no_barrier_store(&s->next_pollset_to_assign_ids[""], 0);
*server = s;
return GRPC_ERROR_NONE;
}
Expand Down Expand Up @@ -253,15 +253,8 @@ static void on_read(void* arg, grpc_error_handle err) {
addr_str.c_str());
}

std::string name = absl::StrCat("tcp-server-connection:", addr_str);
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);

read_notifier_pollset = (*(sp->server->pollsets))
[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1)) %
sp->server->pollsets->size()];

grpc_pollset_add_fd(read_notifier_pollset, fdobj);
// Create and bind fd randomly with the given addr.
grpc_fd* fdobj = randomly_bind_tcp_server(fd, addr_str, sp);

// Create acceptor.
grpc_tcp_server_acceptor* acceptor =
Expand All @@ -276,7 +269,7 @@ static void on_read(void* arg, grpc_error_handle err) {
read_notifier_pollset, acceptor);
}

GPR_UNREACHABLE_CODE(return );
GPR_UNREACHABLE_CODE(return);

error:
gpr_mu_lock(&sp->server->mu);
Expand Down Expand Up @@ -600,13 +593,8 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
addr_str.c_str());
}
std::string name = absl::StrCat("tcp-server-connection:", addr_str);
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
read_notifier_pollset =
(*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s_->next_pollset_to_assign, 1)) %
s_->pollsets->size()];
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
// Create and bind fd randomly with the given addr.
grpc_fd* fdobj = randomly_bind_tcp_server(fd, addr_str, s_);
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
acceptor->from_server = s_;
Expand All @@ -631,6 +619,27 @@ static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
return s->fd_handler;
}

grpc_fd* randomly_bind_tcp_server(int fd, const std::string& addr_str,
grpc_tcp_server* s) {
// addr_str format: ipv4/ipv6:ipv6:port
std::size_t start = addr_str.find_first_of(":") + 1;
std::size_t end = addr_str.find(":", start);
std::string ip = addr_str.substr(start, end - start);

std::string name = absl::StrCat("tcp-server-connection:", addr_str);
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);

// Randomly choose one channel idx for this fd.
std::size_t cq_idx = static_cast<size_t>(rand()) % s->pollsets->size();
if (!gpr_atm_no_barrier_cas(&s->next_pollset_to_assign_ids[ip], 0, cq_idx)) {
cq_idx = static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s->next_pollset_to_assign_ids[ip], 1)) %
s->pollsets->size();
}
grpc_pollset_add_fd((*(s->pollsets))[cq_idx], fdobj);
return fdobj;
}

grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_create, tcp_server_start,
tcp_server_add_port, tcp_server_create_fd_handler,
Expand Down
7 changes: 5 additions & 2 deletions src/core/lib/iomgr/tcp_server_utils_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <grpc/support/port_platform.h>

#include <unordered_map>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
Expand Down Expand Up @@ -87,8 +89,9 @@ struct grpc_tcp_server {
* owned by this struct */
const std::vector<grpc_pollset*>* pollsets = nullptr;

/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign = 0;
/* next pollset to assign a channel to, it is a map from pollset name to ip
* address */
std::unordered_map<std::string, gpr_atm> next_pollset_to_assign_ids;

/* channel args for this server */
grpc_channel_args* channel_args = nullptr;
Expand Down

0 comments on commit 2c7a492

Please sign in to comment.