Skip to content

Commit

Permalink
chore: fix shutdown sequence in Dragonfly server (#4168)
Browse files Browse the repository at this point in the history
1. Better logging in regtests
2. Release resources in dfly_main in a more controlled manner.
3. Switch to ignoring signals when unregistering signal handlers during shutdown.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Nov 24, 2024
1 parent cfca3e7 commit 91caa94
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 42 deletions.
2 changes: 1 addition & 1 deletion helio
46 changes: 22 additions & 24 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -762,44 +762,42 @@ Usage: dragonfly [FLAGS]

fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);

unique_ptr<util::ProactorPool> pool;
{
unique_ptr<util::ProactorPool> pool;

#ifdef __linux__
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);
base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);

CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;
CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major;

bool use_epoll = ShouldUseEpollAPI(kver);
bool use_epoll = ShouldUseEpollAPI(kver);

if (use_epoll) {
pool.reset(fb2::Pool::Epoll(max_available_threads));
} else {
pool.reset(fb2::Pool::IOUring(1024, max_available_threads)); // 1024 - iouring queue size.
}
if (use_epoll) {
pool.reset(fb2::Pool::Epoll(max_available_threads));
} else {
pool.reset(fb2::Pool::IOUring(1024, max_available_threads)); // 1024 - iouring queue size.
}
#else
pool.reset(fb2::Pool::Epoll(max_available_threads));
pool.reset(fb2::Pool::Epoll(max_available_threads));
#endif

pool->Run();
pool->Run();

SetupAllocationTracker(pool.get());
SetupAllocationTracker(pool.get());

AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true);
acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));
AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true);
acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));

dfly::RunEngine(pool.get(), &acceptor);
dfly::RunEngine(pool.get(), &acceptor);

pool->Stop();
pool->Stop();

if (!pidfile_path.empty()) {
unlink(pidfile_path.c_str());
if (!pidfile_path.empty()) {
unlink(pidfile_path.c_str());
}
}

// Returns memory to OS.
// This is a workaround for a bug in mi_malloc that may cause a crash on exit.
mi_collect(true);

return 0;
}
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ Service::Service(ProactorPool* pp)

Service::~Service() {
#ifdef PRINT_STACKTRACES_ON_SIGNAL
ProactorBase::ClearSignal({SIGUSR1});
ProactorBase::ClearSignal({SIGUSR1}, true);
#endif

delete shard_set;
Expand Down
8 changes: 8 additions & 0 deletions tests/dragonfly/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
path=path,
cwd=tmp_dir,
gdb=request.config.getoption("--gdb"),
direct_output=request.config.getoption("--direct-out"),
buffered_out=request.config.getoption("--buffered-output"),
args=parse_args(request.config.getoption("--df")),
existing_port=int(existing) if existing else None,
Expand Down Expand Up @@ -259,6 +260,13 @@ def pytest_addoption(parser):
default=None,
help="Provide a port to the existing memcached process for the test",
)
parser.addoption(
"--direct-out",
action="store_true",
default=False,
help="If true, does not post process dragonfly output",
)

parser.addoption("--repeat", action="store", help="Number of times to repeat each test")


Expand Down
38 changes: 22 additions & 16 deletions tests/dragonfly/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DflyParams:
path: str
cwd: str
gdb: bool
direct_output: bool
buffered_out: bool
args: Dict[str, Union[str, None]]
existing_port: int
Expand Down Expand Up @@ -186,7 +187,7 @@ def _wait_for_server(self):
try:
self.get_port_from_psutil()
logging.debug(
f"Process started after {time.time() - s:.2f} seconds. port={self.port}"
f"Process {self.proc.pid} started after {time.time() - s:.2f} seconds. port={self.port}"
)
break
except RuntimeError:
Expand All @@ -202,18 +203,19 @@ def _wait_for_server(self):
sed_cmd = ["sed", "-u", "-e", sed_format]
if self.params.buffered_out:
sed_cmd.remove("-u")
self.sed_proc = subprocess.Popen(
sed_cmd,
stdin=self.proc.stdout,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)
self.stacktrace = []
self.sed_thread = threading.Thread(
target=read_sedout, args=(self.sed_proc.stdout, self.stacktrace), daemon=True
)
self.sed_thread.start()
if not self.params.direct_output:
self.sed_proc = subprocess.Popen(
sed_cmd,
stdin=self.proc.stdout,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)
self.stacktrace = []
self.sed_thread = threading.Thread(
target=read_sedout, args=(self.sed_proc.stdout, self.stacktrace), daemon=True
)
self.sed_thread.start()

def set_proc_to_none(self):
self.proc = None
Expand All @@ -235,7 +237,8 @@ def stop(self, kill=False):
# if the return code is positive it means abnormal exit
if proc.returncode != 0:
raise Exception(
f"Dragonfly did not terminate gracefully, exit code {proc.returncode}"
f"Dragonfly did not terminate gracefully, exit code {proc.returncode}, "
f"pid: {proc.pid}"
)

except subprocess.TimeoutExpired:
Expand Down Expand Up @@ -268,15 +271,18 @@ def _start(self):

all_args = self.format_args(self.args)
real_path = os.path.realpath(self.params.path)
logging.debug(f"Starting instance with arguments {' '.join(all_args)} from {real_path}")

run_cmd = [self.params.path, *all_args]
if self.params.gdb:
run_cmd = ["gdb", "--ex", "r", "--args"] + run_cmd

self.proc = subprocess.Popen(
run_cmd, cwd=self.params.cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
run_cmd,
cwd=self.params.cwd,
stdout=None if self.params.direct_output else subprocess.PIPE,
stderr=subprocess.STDOUT,
)
logging.debug(f"Starting {real_path} {' '.join(all_args)}, pid {self.proc.pid}")

def _check_status(self):
if not self.params.existing_port:
Expand Down

0 comments on commit 91caa94

Please sign in to comment.