Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start ceph nvmeof discovery service #249

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .github/workflows/build-container.yml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ jobs:

discovery:
needs: build
strategy:
fail-fast: false
matrix:
integration: ["container", "embedded"]
runs-on: ubuntu-latest
env:
HUGEPAGES: 768 # 3 spdk instances
Expand All @@ -285,10 +289,12 @@ jobs:
docker load < bdevperf.tar

- name: Start discovery controller
if: matrix.integration == 'container'
run: |
docker-compose up --detach discovery

- name: Wait for discovery controller to be listening
if: matrix.integration == 'container'
timeout-minutes: 3
run: |
. .env
Expand Down Expand Up @@ -392,7 +398,14 @@ jobs:
echo "ℹ️ bdevperf start up logs"
make logs SVC=bdevperf
eval $(make run SVC=bdevperf OPTS="--entrypoint=env" | grep BDEVPERF_SOCKET | tr -d '\n\r' )
ip=$(container_ip $DISC1)

if [ "${{ matrix.integration }}" == "embedded" ]; then
ip=$(container_ip $GW1)
echo "ℹ️ Using discovery service in gateway $GW1 ip $ip"
else
ip=$(container_ip $DISC1)
echo "ℹ️ Using standalone discovery container $DISC1 ip $ip"
fi
rpc="/usr/libexec/spdk/scripts/rpc.py"
echo "ℹ️ bdevperf bdev_nvme_set_options"
make exec SVC=bdevperf OPTS=-T CMD="$rpc -v -s $BDEVPERF_SOCKET bdev_nvme_set_options -r -1"
Expand Down
2 changes: 1 addition & 1 deletion ceph-nvmeof.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ state_update_notify = True
state_update_interval_sec = 5
#min_controller_id = 1
#max_controller_id = 65519
enable_discovery_controller = false
enable_spdk_discovery_controller = false

[discovery]
addr = 0.0.0.0
Expand Down
64 changes: 51 additions & 13 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import json
import logging
import signal
import traceback
from concurrent import futures
from google.protobuf import json_format

Expand All @@ -28,6 +27,7 @@
from .proto import gateway_pb2_grpc as pb2_grpc
from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler
from .grpc import GatewayService
from .discovery import DiscoveryService

def sigchld_handler(signum, frame):
"""Handle SIGCHLD, runs when a spdk process terminates."""
Expand All @@ -43,7 +43,7 @@ def sigchld_handler(signum, frame):
exit_code = os.waitstatus_to_exitcode(wait_status)

# GW process should exit now
raise SystemExit(f"spdk subprocess terminated {pid=} {exit_code=}")
raise SystemExit(f"Gateway subprocess terminated {pid=} {exit_code=}")

class GatewayServer:
"""Runs SPDK and receives client requests for the gateway service.
Expand All @@ -56,6 +56,7 @@ class GatewayServer:
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_ping_client: Ping client of SPDK RPC server
spdk_process: Subprocess running SPDK NVMEoF target application
discovery_pid: Subprocess running Ceph nvmeof discovery service
"""

def __init__(self, config):
Expand All @@ -64,6 +65,7 @@ def __init__(self, config):
self.spdk_process = None
self.gateway_rpc = None
self.server = None
self.discovery_pid = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand All @@ -79,22 +81,32 @@ def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
self.logger.exception("GatewayServer exception occurred:")

signal.signal(signal.SIGCHLD, signal.SIG_IGN)
if self.spdk_process is not None:
self._stop_spdk()

if self.server is not None:
self.logger.info("Stopping the server...")
self.server.stop(None)

if self.discovery_pid:
self._stop_discovery()

self.logger.info("Exiting the gateway process.")

def serve(self):
"""Starts gateway server."""
self.logger.debug("Starting serve")

# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# Start SPDK
self._start_spdk()

# Start discovery service
self._start_discovery_service()

# Register service implementation with server
omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()
Expand All @@ -113,13 +125,28 @@ def serve(self):

# Start server
self.server.start()
enable_discovery_controller = self.config.getboolean_with_default("gateway", "enable_discovery_controller", False)
if not enable_discovery_controller:
try:
rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery")
except Exception as ex:
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise


def _start_discovery_service(self):
"""Runs either SPDK on CEPH NVMEOF Discovery Service."""
enable_spdk_discovery_controller = self.config.getboolean_with_default("gateway", "enable_spdk_discpovery_controller", False)
if enable_spdk_discovery_controller:
self.logger.info("Using SPDK discovery service")
return

try:
rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery")
except Exception as ex:
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise

# run ceph nvmeof discovery service in sub-process
assert self.discovery_pid is None
self.discovery_pid = os.fork()
if self.discovery_pid == 0:
self.logger.info("Starting ceph nvmeof discovery service")
DiscoveryService(self.config).start_service()
os._exit(0)

def _add_server_listener(self):
"""Adds listener port to server."""
Expand Down Expand Up @@ -170,9 +197,6 @@ def _start_spdk(self):
cmd += shlex.split(spdk_tgt_cmd_extra_args)
self.logger.info(f"Starting {' '.join(cmd)}")
try:
# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# start spdk process
self.spdk_process = subprocess.Popen(cmd)
except Exception as ex:
Expand Down Expand Up @@ -221,7 +245,6 @@ def _stop_spdk(self):
rpc_socket = self.config.get("spdk", "rpc_socket")

# Terminate spdk process
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
if return_code is not None:
self.logger.error(f"SPDK({self.name}) pid {self.spdk_process.pid} "
f"already terminated, exit code: {return_code}")
Expand All @@ -245,6 +268,21 @@ def _stop_spdk(self):
self.logger.exception(f"An error occurred while removing "
f"rpc socket {rpc_socket}:")

def _stop_discovery(self):
"""Stops Discovery service process."""
assert self.discovery_pid is not None # should be verified by the caller

self.logger.info("Terminating discovery service...")
# discovery service selector loop should exit due to KeyboardInterrupt exception
try:
os.kill(self.discovery_pid, signal.SIGINT)
os.waitpid(self.discovery_pid, 0)
except ChildProcessError:
pass # ignore
self.logger.info("Discovery service terminated")

self.discovery_pid = None

def _create_transport(self, trtype):
"""Initializes a transport type."""
args = {'trtype': trtype}
Expand Down
1 change: 1 addition & 0 deletions tests/test_multi_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def conn(config):
configA.config["gateway"]["state_update_notify"] = str(update_notify)
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "true"
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portA = configA.getint("gateway", "port")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def _config(self, config):
self.config = config

def validate_exception(self, e):
pattern = r'spdk subprocess terminated pid=(\d+) exit_code=(\d+)'
pattern = r'Gateway subprocess terminated pid=(\d+) exit_code=(\d+)'
m = re.match(pattern, e.code)
assert(m)
pid = int(m.group(1))
Expand Down