Skip to content

Commit

Permalink
Start ceph nvmeof discovery service in sub-process
Browse files Browse the repository at this point in the history
Use SIGINT/KeyboardInterrupt for graceful discovery shutdown

Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Oct 7, 2023
1 parent 666cd04 commit 406dc72
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 16 deletions.
15 changes: 14 additions & 1 deletion .github/workflows/build-container.yml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ jobs:
discovery:
needs: build
strategy:
fail-fast: false
matrix:
integration: ["container", "embedded"]
runs-on: ubuntu-latest
env:
HUGEPAGES: 768 # 3 spdk instances
Expand All @@ -285,10 +289,12 @@ jobs:
docker load < bdevperf.tar
- name: Start discovery controller
if: matrix.integration == 'container'
run: |
docker-compose up --detach discovery
- name: Wait for discovery controller to be listening
if: matrix.integration == 'container'
timeout-minutes: 3
run: |
. .env
Expand Down Expand Up @@ -392,7 +398,14 @@ jobs:
echo "ℹ️ bdevperf start up logs"
make logs SVC=bdevperf
eval $(make run SVC=bdevperf OPTS="--entrypoint=env" | grep BDEVPERF_SOCKET | tr -d '\n\r' )
ip=$(container_ip $DISC1)
if [ "${{ matrix.integration }}" == "embedded" ]; then
ip=$(container_ip $GW1)
echo "ℹ️ Using discovery service in gateway $GW1 ip $ip"
else
ip=$(container_ip $DISC1)
echo "ℹ️ Using standalone discovery container $DISC1 ip $ip"
fi
rpc="/usr/libexec/spdk/scripts/rpc.py"
echo "ℹ️ bdevperf bdev_nvme_set_options"
make exec SVC=bdevperf OPTS=-T CMD="$rpc -v -s $BDEVPERF_SOCKET bdev_nvme_set_options -r -1"
Expand Down
2 changes: 1 addition & 1 deletion ceph-nvmeof.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ state_update_notify = True
state_update_interval_sec = 5
#min_controller_id = 1
#max_controller_id = 65519
enable_discovery_controller = false
enable_spdk_discovery_controller = false

[discovery]
addr = 0.0.0.0
Expand Down
2 changes: 2 additions & 0 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,8 @@ def start_service(self):
"""Enable listening on the server side."""

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Set the SO_REUSEADDR option
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((self.discovery_addr, int(self.discovery_port)))
sock.listen(MAX_CONNECTION)
sock.setblocking(False)
Expand Down
64 changes: 51 additions & 13 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import json
import logging
import signal
import traceback
from concurrent import futures
from google.protobuf import json_format

Expand All @@ -28,6 +27,7 @@
from .proto import gateway_pb2_grpc as pb2_grpc
from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler
from .grpc import GatewayService
from .discovery import DiscoveryService

def sigchld_handler(signum, frame):
"""Handle SIGCHLD, runs when a spdk process terminates."""
Expand All @@ -43,7 +43,7 @@ def sigchld_handler(signum, frame):
exit_code = os.waitstatus_to_exitcode(wait_status)

# GW process should exit now
raise SystemExit(f"spdk subprocess terminated {pid=} {exit_code=}")
raise SystemExit(f"Gateway subprocess terminated {pid=} {exit_code=}")

class GatewayServer:
"""Runs SPDK and receives client requests for the gateway service.
Expand All @@ -56,6 +56,7 @@ class GatewayServer:
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_ping_client: Ping client of SPDK RPC server
spdk_process: Subprocess running SPDK NVMEoF target application
discovery_pid: Subprocess running Ceph nvmeof discovery service
"""

def __init__(self, config):
Expand All @@ -64,6 +65,7 @@ def __init__(self, config):
self.spdk_process = None
self.gateway_rpc = None
self.server = None
self.discovery_pid = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand All @@ -79,22 +81,32 @@ def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
self.logger.exception("GatewayServer exception occurred:")

signal.signal(signal.SIGCHLD, signal.SIG_IGN)
if self.spdk_process is not None:
self._stop_spdk()

if self.server is not None:
self.logger.info("Stopping the server...")
self.server.stop(None)

if self.discovery_pid:
self._stop_discovery()

self.logger.info("Exiting the gateway process.")

def serve(self):
"""Starts gateway server."""
self.logger.debug("Starting serve")

# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# Start SPDK
self._start_spdk()

# Start discovery service
self._start_discovery_service()

# Register service implementation with server
omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()
Expand All @@ -113,13 +125,28 @@ def serve(self):

# Start server
self.server.start()
enable_discovery_controller = self.config.getboolean_with_default("gateway", "enable_discovery_controller", False)
if not enable_discovery_controller:
try:
rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery")
except Exception as ex:
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise


def _start_discovery_service(self):
"""Runs either SPDK on CEPH NVMEOF Discovery Service."""
enable_spdk_discovery_controller = self.config.getboolean_with_default("gateway", "enable_spdk_discpovery_controller", False)
if enable_spdk_discovery_controller:
self.logger.info("Using SPDK discovery service")
return

try:
rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery")
except Exception as ex:
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise

# run ceph nvmeof discovery service in sub-process
assert self.discovery_pid is None
self.discovery_pid = os.fork()
if self.discovery_pid == 0:
self.logger.info("Starting ceph nvmeof discovery service")
DiscoveryService(self.config).start_service()
os._exit(0)

def _add_server_listener(self):
"""Adds listener port to server."""
Expand Down Expand Up @@ -170,9 +197,6 @@ def _start_spdk(self):
cmd += shlex.split(spdk_tgt_cmd_extra_args)
self.logger.info(f"Starting {' '.join(cmd)}")
try:
# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# start spdk process
self.spdk_process = subprocess.Popen(cmd)
except Exception as ex:
Expand Down Expand Up @@ -221,7 +245,6 @@ def _stop_spdk(self):
rpc_socket = self.config.get("spdk", "rpc_socket")

# Terminate spdk process
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
if return_code is not None:
self.logger.error(f"SPDK({self.name}) pid {self.spdk_process.pid} "
f"already terminated, exit code: {return_code}")
Expand All @@ -245,6 +268,21 @@ def _stop_spdk(self):
self.logger.exception(f"An error occurred while removing "
f"rpc socket {rpc_socket}:")

def _stop_discovery(self):
"""Stops Discovery service process."""
assert self.discovery_pid is not None # should be verified by the caller

self.logger.info("Terminating discovery service...")
# discovery service selector loop should exit due to KeyboardInterrupt exception
try:
os.kill(self.discovery_pid, signal.SIGINT)
os.waitpid(self.discovery_pid, 0)
except ChildProcessError:
pass # ignore
self.logger.info("Discovery service terminated")

self.discovery_pid = None

def _create_transport(self, trtype):
"""Initializes a transport type."""
args = {'trtype': trtype}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def _config(self, config):
self.config = config

def validate_exception(self, e):
pattern = r'spdk subprocess terminated pid=(\d+) exit_code=(\d+)'
pattern = r'Gateway subprocess terminated pid=(\d+) exit_code=(\d+)'
m = re.match(pattern, e.code)
assert(m)
pid = int(m.group(1))
Expand Down

0 comments on commit 406dc72

Please sign in to comment.