Skip to content

Commit

Permalink
Fail namespace creation if passed load balancing group doesn't exist
Browse files Browse the repository at this point in the history
Fixes #627

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed May 7, 2024
1 parent 8fecf04 commit dd4bc7c
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 137 deletions.
2 changes: 2 additions & 0 deletions control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ def __init__(self, config):
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
self.logger.debug(f"Execute monitor command: {cmd}")
with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
rply = cluster.mon_command(cmd, b'')
self.logger.debug(f"Monitor reply: {rply}")
return rply

def get_number_created_gateways(self, pool, group):
Expand Down
21 changes: 18 additions & 3 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ def create_subsystem_safe(self, request, context):
peer_msg = self.get_peer_message(context)

self.logger.info(
f"Received request to create subsystem {request.subsystem_nqn}, enable_ha: {request.enable_ha}, context: {context}{peer_msg}")
f"Received request to create subsystem {request.subsystem_nqn}, enable_ha: {request.enable_ha}, max_namespaces: {request.max_namespaces}, context: {context}{peer_msg}")

if not request.enable_ha:
errmsg = f"{create_subsystem_error_prefix}: HA must be enabled for subsystems"
Expand Down Expand Up @@ -592,7 +592,7 @@ def create_subsystem_safe(self, request, context):
ana_reporting = enable_ha,
)
self.subsys_ha[request.subsystem_nqn] = enable_ha
self.subsys_max_ns[request.subsystem_nqn] = request.max_namespaces if request.max_namespaces is not None else 32
self.subsys_max_ns[request.subsystem_nqn] = request.max_namespaces if request.max_namespaces else 32
self.logger.debug(f"create_subsystem {request.subsystem_nqn}: {ret}")
except Exception as ex:
self.logger.exception(create_subsystem_error_prefix)
Expand Down Expand Up @@ -957,8 +957,17 @@ def namespace_add_safe(self, request, context):
if not context:
create_image = False
else: # new namespace
if request.anagrpid == 0:
# If an explicit load balancing group was passed, make sure it exists
if request.anagrpid != 0:
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
else:
anagrp = self.choose_anagrpid_for_namespace(request.nsid)
assert anagrp != 0
# if anagrp == 0:
# errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}"
# self.logger.error(errmsg)
Expand Down Expand Up @@ -1036,6 +1045,12 @@ def namespace_change_load_balancing_group_safe(self, request, context):
self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

with self.omap_lock(context=context):
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False,
f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}")
if not find_ret[0]:
Expand Down
280 changes: 166 additions & 114 deletions tests/test_cli.py

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_create_get_subsys(caplog, config):
with GatewayServer(config) as gateway:
gateway.set_group_id(0)
gateway.serve()
gateway.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gateway.name}", "pool": "{pool}", "group": ""' + "}")

for i in range(created_resource_count):
create_resource_by_index(i)
Expand All @@ -47,11 +48,6 @@ def test_create_get_subsys(caplog, config):
gateway.gateway_rpc.host_name, "--traddr", "127.0.0.1", "--trsvcid", "5001"])
assert f"Adding {subsystem_prefix}0 listener at 127.0.0.1:5001: Successful" in caplog.text

# Change ANA group id for the first namesapce
cli(["namespace", "change_load_balancing_group", "--subsystem", f"{subsystem_prefix}0", "--nsid", "1",
"--load-balancing-group", "4"])
assert f"Changing load balancing group of namespace 1 in {subsystem_prefix}0 to 4: Successful" in caplog.text

# Set QOS for the first namespace
cli(["namespace", "set_qos", "--subsystem", f"{subsystem_prefix}0", "--nsid", "1",
"--rw-ios-per-second", "2000"])
Expand All @@ -69,6 +65,7 @@ def test_create_get_subsys(caplog, config):
with GatewayServer(config) as gateway:
gateway.set_group_id(0)
gateway.serve()
gateway.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gateway.name}", "pool": "{pool}", "group": ""' + "}")

for i in range(subsys_list_count):
cli(["--format", "plain", "subsystem", "list"])
Expand All @@ -77,7 +74,7 @@ def test_create_get_subsys(caplog, config):
time.sleep(0.1)

time.sleep(20) # Make sure update() is over
assert f"{subsystem_prefix}0 with ANA group id 4" in caplog.text
assert f"{subsystem_prefix}0 with ANA group id 1" in caplog.text
assert f"Received request to set QOS limits for namespace using NSID 1 on {subsystem_prefix}0, R/W IOs per second: 2000 Read megabytes per second: 5" in caplog.text
caplog.clear()
cli(["--format", "plain", "subsystem", "list"])
Expand Down
2 changes: 2 additions & 0 deletions tests/test_log_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

config = "ceph-nvmeof.conf"
subsystem_prefix = "nqn.2016-06.io.spdk:cnode"
pool = "rbd"

def clear_log_files():
files = os.listdir("/var/log/ceph")
Expand Down Expand Up @@ -49,6 +50,7 @@ def gateway(config, request):
# Start gateway
gateway.set_group_id(0)
gateway.serve()
gateway.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gateway.name}", "pool": "{pool}", "group": ""' + "}")

# Bind the client and Gateway
channel = grpc.insecure_channel(f"{addr}:{port}")
Expand Down
8 changes: 5 additions & 3 deletions tests/test_multi_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
def conn(config):
"""Sets up and tears down Gateways A and B."""
# Setup GatewayA and GatewayB configs
pool = config.get("ceph", "pool")
configA = copy.deepcopy(config)
configA.config["gateway"]["name"] = "GatewayA"
configA.config["gateway"]["group"] = "Group1"
Expand All @@ -27,8 +28,7 @@ def conn(config):
portB = portA + 2
configB.config["gateway"]["name"] = "GatewayB"
configB.config["gateway"]["port"] = str(portB)
configB.config["gateway"]["state_update_interval_sec"] = str(
update_interval_sec)
configB.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec)
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

Expand All @@ -39,11 +39,13 @@ def conn(config):
):
gatewayA.set_group_id(0)
gatewayA.serve()
gatewayA.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayA.name}", "pool": "{pool}", "group": "Group1"' + "}")
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.set_group_id(1)
gatewayB.serve()
gatewayB.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayB.name}", "pool": "{pool}", "group": "Group1"' + "}")

# Bind the client and Gateways A & B
channelA = grpc.insecure_channel(f"{addr}:{portA}")
Expand Down Expand Up @@ -73,7 +75,7 @@ def test_multi_gateway_coordination(config, image, conn):
pool = config.get("ceph", "pool")

# Send requests to create a subsystem with one namespace to GatewayA
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn,
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn, max_namespaces=256,
serial_number=serial, enable_ha=True)
namespace_req = pb2.namespace_add_req(subsystem_nqn=nqn,
rbd_pool_name=pool,
Expand Down
2 changes: 2 additions & 0 deletions tests/test_namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ def conn(config):
):
gatewayA.set_group_id(0)
gatewayA.serve()
gatewayA.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayA.name}", "pool": "{pool}", "group": "Group1"' + "}")
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.set_group_id(1)
gatewayB.serve()
gatewayB.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayB.name}", "pool": "{pool}", "group": "Group1"' + "}")

# Bind the client and Gateways A & B
channelA = grpc.insecure_channel(f"{addr}:{portA}")
Expand Down
11 changes: 7 additions & 4 deletions tests/test_nsid.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ def setup_config(config, gw1_name, gw2_name, gw_group, update_notify, update_int

return configA, configB

def start_servers(gatewayA, gatewayB, addr, portA, portB):
def start_servers(gatewayA, gatewayB, gw_group, addr, portA, portB):
gatewayA.set_group_id(0)
gatewayA.serve()
gatewayA.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayA.name}", "pool": "{pool}", "group": "{gw_group}"' + "}")
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.set_group_id(1)
gatewayB.serve()
gatewayB.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayB.name}", "pool": "{pool}", "group": "{gw_group}"' + "}")
gatewayB.gateway_rpc.gateway_state.delete_state()

# Bind the client and Gateways A & B
Expand All @@ -70,18 +72,18 @@ def test_multi_gateway_namespace_ids(config, image, caplog):
GatewayServer(configA) as gatewayA,
GatewayServer(configB) as gatewayB,
):
stubA, stubB = start_servers(gatewayA, gatewayB, addr, portA, portB)
stubA, stubB = start_servers(gatewayA, gatewayB, "Group1", addr, portA, portB)

# Send requests to create a subsystem on GatewayA
caplog.clear()
subsystem = f"{subsystem_prefix}PPP"
subsystem_add_req = pb2.create_subsystem_req(subsystem_nqn=subsystem)
subsystem_add_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, max_namespaces=256)
ret_subsystem = stubA.create_subsystem(subsystem_add_req)
assert ret_subsystem.status != 0
assert "HA must be enabled for subsystems" in caplog.text
caplog.clear()
subsystem = f"{subsystem_prefix}WWW"
subsystem_add_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, enable_ha=True)
subsystem_add_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, max_namespaces=256, enable_ha=True)
ret_subsystem = stubA.create_subsystem(subsystem_add_req)
assert ret_subsystem.status == 0
assert f"create_subsystem {subsystem}: True" in caplog.text
Expand Down Expand Up @@ -152,6 +154,7 @@ def test_multi_gateway_namespace_ids(config, image, caplog):
gatewayB = GatewayServer(configB)
gatewayB.set_group_id(1)
gatewayB.serve()
gatewayB.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayB.name}", "pool": "{pool}", "group": "Group1"' + "}")
channelB = grpc.insecure_channel(f"{addr}:{portB}")
stubB = pb2_grpc.GatewayStub(channelB)
time.sleep(10)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_old_omap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import grpc
from control.proto import gateway_pb2_grpc as pb2_grpc

pool = "rbd"

def test_old_omap(caplog, config):
with GatewayServer(config) as gateway:
gateway.set_group_id(0)
gateway.serve()
gateway.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gateway.name}", "pool": "{pool}", "group": ""' + "}")
gateway.gateway_rpc.gateway_state.omap._add_key("bdev_dummy", "dummy")

caplog.clear()
Expand Down
16 changes: 9 additions & 7 deletions tests/test_omap_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ def setup_config(config, gw1_name, gw2_name, gw_group, update_notify ,update_int

return configA, configB

def start_servers(gatewayA, gatewayB, addr, portA, portB):
def start_servers(gatewayA, gatewayB, gw_group, addr, portA, portB):
gatewayA.set_group_id(0)
gatewayA.serve()
gatewayA.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayA.name}", "pool": "{pool}", "group": "{gw_group}"' + "}")
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.set_group_id(1)
gatewayB.serve()
gatewayB.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayB.name}", "pool": "{pool}", "group": "{gw_group}"' + "}")
gatewayB.gateway_rpc.gateway_state.delete_state()

# Bind the client and Gateways A & B
Expand Down Expand Up @@ -81,7 +83,7 @@ def conn_omap_reread(config, request):
GatewayServer(configA) as gatewayA,
GatewayServer(configB) as gatewayB,
):
stubA, stubB = start_servers(gatewayA, gatewayB, addr, portA, portB)
stubA, stubB = start_servers(gatewayA, gatewayB, "Group1", addr, portA, portB)
yield stubA, stubB, gatewayA.gateway_rpc, gatewayB.gateway_rpc
stop_servers(gatewayA, gatewayB)

Expand All @@ -100,7 +102,7 @@ def conn_lock_twice(config, request):
GatewayServer(configA) as gatewayA,
GatewayServer(configB) as gatewayB,
):
stubA, stubB = start_servers(gatewayA, gatewayB, addr, portA, portB)
stubA, stubB = start_servers(gatewayA, gatewayB, "Group2", addr, portA, portB)
yield stubA, stubB
stop_servers(gatewayA, gatewayB)

Expand All @@ -124,7 +126,7 @@ def conn_concurrent(config, request):
GatewayServer(configA) as gatewayA,
GatewayServer(configB) as gatewayB,
):
stubA, stubB = start_servers(gatewayA, gatewayB, addr, portA, portB)
stubA, stubB = start_servers(gatewayA, gatewayB, "Group3", addr, portA, portB)
yield gatewayA.gateway_rpc, gatewayB.gateway_rpc, stubA, stubB
stop_servers(gatewayA, gatewayB)

Expand All @@ -135,7 +137,7 @@ def build_host_nqn(i):

def create_resource_by_index(stub, i, caplog):
subsystem = f"{subsystem_prefix}{i}"
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, enable_ha=True)
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, max_namespaces=256, enable_ha=True)
ret_subsystem = stub.create_subsystem(subsystem_req)
assert ret_subsystem.status == 0
if caplog != None:
Expand Down Expand Up @@ -187,7 +189,7 @@ def test_multi_gateway_omap_reread(config, conn_omap_reread, caplog):
num_subsystems = 2

# Send requests to create a subsystem with one namespace to GatewayA
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn, serial_number=serial, enable_ha=True)
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn, serial_number=serial, max_namespaces=256, enable_ha=True)
namespace_req = pb2.namespace_add_req(subsystem_nqn=nqn, nsid=nsid,
rbd_pool_name=pool, rbd_image_name=image, block_size=4096,
create_image=True, size=16*1024*1024, force=True)
Expand Down Expand Up @@ -315,7 +317,7 @@ def test_multi_gateway_listener_update(config, image, conn_concurrent, caplog):

caplog.clear()
subsystem = f"{subsystem_prefix}QQQ"
subsystem_add_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, enable_ha=True)
subsystem_add_req = pb2.create_subsystem_req(subsystem_nqn=subsystem, max_namespaces=256, enable_ha=True)
ret_subsystem = stubA.create_subsystem(subsystem_add_req)
assert ret_subsystem.status == 0
assert f"create_subsystem {subsystem}: True" in caplog.text
Expand Down
6 changes: 6 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import unittest
from control.server import GatewayServer

pool = "rbd"

class TestServer(unittest.TestCase):
@pytest.fixture(autouse=True)
def _config(self, config):
Expand Down Expand Up @@ -36,13 +38,15 @@ def test_spdk_exception(self):
with GatewayServer(config_spdk_exception) as gateway:
gateway.set_group_id(0)
gateway.serve()
gateway.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gateway.name}", "pool": "{pool}", "group": ""' + "}")
self.validate_exception(cm.exception)

def test_spdk_abort(self):
"""Tests spdk sub process dumps core on during normal shutdown."""
with GatewayServer(copy.deepcopy(self.config)) as gateway:
gateway.set_group_id(0)
gateway.serve()
gateway.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gateway.name}", "pool": "{pool}", "group": ""' + "}")
time.sleep(10)
# exited context, spdk process should be aborted here by __exit__()
time.sleep(10) # let it dump
Expand All @@ -68,8 +72,10 @@ def test_spdk_multi_gateway_exception(self):
):
gatewayA.set_group_id(0)
gatewayA.serve()
gatewayA.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayA.name}", "pool": "{pool}", "group": ""' + "}")
gatewayB.set_group_id(1)
gatewayB.serve()
gatewayB.ceph_utils.execute_ceph_monitor_command("{" + f'"prefix":"nvme-gw create", "id": "{gatewayB.name}", "pool": "{pool}", "group": ""' + "}")
self.validate_exception(cm.exception)

if __name__ == '__main__':
Expand Down

0 comments on commit dd4bc7c

Please sign in to comment.