From 9eb0bbb90f354a017cdbb48803b0372832b07cf8 Mon Sep 17 00:00:00 2001 From: Gil Bregman Date: Thu, 30 May 2024 11:59:34 +0300 Subject: [PATCH] Make sure to unlock OMAP file when we're done writing to it. Fixes #689 Signed-off-by: Gil Bregman --- control/grpc.py | 96 +++++++++++++++++++++++++++++------------------- control/state.py | 21 +++++------ 2 files changed, 68 insertions(+), 49 deletions(-) diff --git a/control/grpc.py b/control/grpc.py index f94c0c22..855ca2eb 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -227,7 +227,7 @@ def _put_cluster(self, name: str) -> None: self.logger.info(f"put_cluster {name=} number bdevs: {self.clusters[anagrp][name]}") return - assert False # we should find the cluster in our state + assert False, f"Cluster {name} is not found" # we should find the cluster in our state def _alloc_cluster_name(self, anagrp: int) -> str: """Allocates a new cluster name for ana group""" @@ -253,7 +253,10 @@ def _alloc_cluster(self, anagrp: int) -> str: def _grpc_function_with_lock(self, func, request, context): with self.rpc_lock: - return func(request, context) + rc = func(request, context) + if not self.omap_lock.omap_file_disable_unlock: + assert not self.omap_lock.locked(), f"OMAP is still locked when we're out of function {func}" + return rc def execute_grpc_function(self, func, request, context): """This functions handles RPC lock by wrapping 'func' with @@ -263,7 +266,7 @@ def execute_grpc_function(self, func, request, context): """ return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context) - def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, peer_msg = ""): + def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, context, peer_msg = ""): """Creates a bdev from an RBD image.""" if create_image: @@ -273,7 +276,7 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl self.logger.info(f"Received request to create bdev {name} from" f" {rbd_pool_name}/{rbd_image_name} (size {rbd_image_size} bytes)" - f" with block size {block_size}, {cr_img_msg}{peer_msg}") + f" with block size {block_size}, {cr_img_msg}, context={context}{peer_msg}") if block_size == 0: return BdevStatus(status=errno.EINVAL, @@ -410,7 +413,7 @@ def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""): if not self.rpc_lock.locked(): self.logger.error(f"A call to delete_bdev() without holding the RPC lock") - assert self.rpc_lock.locked() + assert self.rpc_lock.locked(), "RPC is unlocked when calling delete_bdev()" self.logger.info(f"Received request to delete bdev {bdev_name}{peer_msg}") try: @@ -540,7 +543,8 @@ def create_subsystem_safe(self, request, context): self.logger.info(f"No serial number specified for {request.subsystem_nqn}, will use {request.serial_number}") ret = False - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: errmsg = "" try: subsys_using_serial = None @@ -653,7 +657,8 @@ def delete_subsystem_safe(self, request, context): delete_subsystem_error_prefix = f"Failure deleting subsystem {request.subsystem_nqn}" ret = False - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: ret = rpc_nvmf.nvmf_delete_subsystem( self.spdk_rpc_client, @@ -744,7 +749,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte """Adds a namespace to a subsystem.""" if context: - assert self.omap_lock.locked() + assert self.omap_lock.locked(), "OMAP is unlocked when calling create_namespace()" nsid_msg = "" if nsid and uuid: nsid_msg = f" using NSID {nsid} and UUID {uuid}" @@ -756,7 +761,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte add_namespace_error_prefix = f"Failure adding namespace{nsid_msg}to {subsystem_nqn}" peer_msg = self.get_peer_message(context) - self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}{peer_msg}") + self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}, context: {context}{peer_msg}") if anagrpid > self.subsys_max_ns[subsystem_nqn]: errmsg = f"{add_namespace_error_prefix}: Group ID {anagrpid} is bigger than configured maximum {self.subsys_max_ns[subsystem_nqn]}" @@ -908,14 +913,24 @@ def choose_anagrpid_for_namespace(self, nsid) ->int: def namespace_add_safe(self, request, context): """Adds a namespace to a subsystem.""" + grps_list = [] + anagrp = 0 peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}") + self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}") if not request.uuid: request.uuid = str(uuid.uuid4()) - with self.omap_lock(context=context): + if context: + if request.anagrpid != 0: + grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + else: + anagrp = self.choose_anagrpid_for_namespace(request.nsid) + assert anagrp != 0, "Chosen ANA group is 0" + + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: if context: errmsg, ns_nqn = self.check_if_image_used(request.rbd_pool_name, request.rbd_image_name) if errmsg and ns_nqn: @@ -934,24 +949,17 @@ def namespace_add_safe(self, request, context): else: # new namespace # If an explicit load balancing group was passed, make sure it exists if request.anagrpid != 0: - grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) if request.anagrpid not in grps_list: self.logger.debug(f"ANA groups: {grps_list}") errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist" self.logger.error(errmsg) return pb2.req_status(status=errno.ENODEV, error_message=errmsg) else: - anagrp = self.choose_anagrpid_for_namespace(request.nsid) - assert anagrp != 0 - # if anagrp == 0: - # errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}" - # self.logger.error(errmsg) - # return pb2.req_status(status=errno.EINVAL, error_message=errmsg) request.anagrpid = anagrp anagrp = request.anagrpid ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name, - request.rbd_image_name, request.block_size, create_image, request.size, peer_msg) + request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg) if ret_bdev.status != 0: errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}" self.logger.error(errmsg) @@ -1015,12 +1023,14 @@ def namespace_add(self, request, context=None): def namespace_change_load_balancing_group_safe(self, request, context): """Changes a namespace load balancing group.""" + grps_list = [] peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}") - with self.omap_lock(context=context): - grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: if request.anagrpid not in grps_list: self.logger.debug(f"ANA groups: {grps_list}") errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist" @@ -1139,7 +1149,7 @@ def remove_namespace_from_state(self, nqn, nsid, context): return pb2.req_status(status=0, error_message=os.strerror(0)) # If we got here context is not None, so we must hold the OMAP lock - assert self.omap_lock.locked() + assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_namespace_from_state()" # Update gateway state try: @@ -1160,7 +1170,7 @@ def remove_namespace(self, subsystem_nqn, nsid, context): """Removes a namespace from a subsystem.""" if context: - assert self.omap_lock.locked() + assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_namespace()" peer_msg = self.get_peer_message(context) namespace_failure_prefix = f"Failure removing namespace {nsid} from {subsystem_nqn}" self.logger.info(f"Received request to remove namespace {nsid} from {subsystem_nqn}{peer_msg}") @@ -1469,7 +1479,8 @@ def namespace_set_qos_limits_safe(self, request, context): limits_to_set = self.get_qos_limits_string(request) self.logger.debug(f"After merging current QOS limits with previous ones for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}") - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: ret = rpc_bdev.bdev_set_qos_limit( self.spdk_rpc_client, @@ -1520,7 +1531,7 @@ def find_namespace_and_bdev_name(self, nqn, nsid, uuid, needs_lock, err_prefix): else: if not self.rpc_lock.locked(): self.logger.error(f"A call to find_namespace_and_bdev_name() with 'needs_lock' set to False and without holding the RPC lock") - assert self.rpc_lock.locked() + assert self.rpc_lock.locked(), "RPC is unlocked when calling find_namespace_and_bdev_name()" lock_to_use = contextlib.suppress() with lock_to_use: @@ -1607,7 +1618,7 @@ def namespace_recycle_safe(self, ana_id, peer_msg = "") ->int: self.logger.info(f"nsid {ns_key} for nqn {subsys} to recycle:") nsid = ns_key bdev_name = self.subsystem_nsid_bdev[subsys][nsid] - assert bdev_name + assert bdev_name, f"Can't find bdev for subsystem {subsys}, namespace {nsid}" ns_params = {'nsid':nsid, 'bdev_name':bdev_name, 'subsys':subsys} list_ns_params.append(ns_params) self.logger.info(f"nsid :{nsid}, pool_name: {self.bdev_params[bdev_name]['pool_name']}, rbd_name: {self.bdev_params[bdev_name]['image_name']}, block_size: {self.bdev_params[bdev_name]['block_size']}, uuid:{self.bdev_params[bdev_name]['uuid']}, anagrpid:{ana_id}") @@ -1627,7 +1638,7 @@ def namespace_recycle_safe(self, ana_id, peer_msg = "") ->int: self.logger.info(f"ns params: {ns_params} ") ret_bdev = self.create_bdev( ana_id, bdev_name, self.bdev_params[bdev_name]['uuid'], self.bdev_params[bdev_name]['pool_name'], self.bdev_params[bdev_name]['image_name'], self.bdev_params[bdev_name]['block_size'], False, - self.bdev_params[bdev_name]['image_size'], peer_msg) + self.bdev_params[bdev_name]['image_size'], None, peer_msg) self.logger.info(f"bdev_rbd_create: {bdev_name}") if ret_bdev.status != 0: errmsg = f"Failure adding bdev {bdev_name} " @@ -1655,7 +1666,8 @@ def namespace_delete_safe(self, request, context): nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}{peer_msg}") - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False, "Failure deleting namespace") if not find_ret[0]: @@ -1733,7 +1745,8 @@ def add_host_safe(self, request, context): self.logger.error(f"{errmsg}") return pb2.req_status(status=errno.EINVAL, error_message=errmsg) - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn) if host_already_exist: @@ -1812,7 +1825,7 @@ def remove_host_from_state(self, subsystem_nqn, host_nqn, context): return pb2.req_status(status=0, error_message=os.strerror(0)) if context: - assert self.omap_lock.locked() + assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_host_from_state()" # Update gateway state try: self.gateway_state.remove_host(subsystem_nqn, host_nqn) @@ -1846,7 +1859,8 @@ def remove_host_safe(self, request, context): self.logger.error(f"{errmsg}") return pb2.req_status(status=errno.EINVAL, error_message=errmsg) - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: if request.host_nqn == "*": # Disable allow any host access self.logger.info( @@ -2105,7 +2119,8 @@ def create_listener_safe(self, request, context): self.logger.error(f"{errmsg}") return pb2.req_status(status=errno.EINVAL, error_message=errmsg) - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: if request.host_name == self.host_name: if (adrfam, request.traddr, request.trsvcid) in self.subsystem_listeners[request.nqn]: @@ -2217,7 +2232,7 @@ def remove_listener_from_state(self, nqn, host_name, traddr, port, context): return pb2.req_status(status=0, error_message=os.strerror(0)) if context: - assert self.omap_lock.locked() + assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_listener_from_state()" host_name = host_name.strip() listener_hosts = [] @@ -2309,7 +2324,8 @@ def delete_listener_safe(self, request, context): self.logger.error(errmsg) return pb2.req_status(status=errno.ENOTEMPTY, error_message=errmsg) - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: if request.host_name == self.host_name or request.force: ret = rpc_nvmf.nvmf_subsystem_remove_listener( @@ -2361,7 +2377,8 @@ def list_listeners_safe(self, request, context): self.logger.info(f"Received request to list listeners for {request.subsystem}, context: {context}{peer_msg}") listeners = [] - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: state = self.gateway_state.local.get_state() listener_prefix = GatewayState.build_partial_listener_key(request.subsystem) for key, val in state.items(): @@ -2487,7 +2504,8 @@ def get_spdk_nvmf_log_flags_and_level_safe(self, request, context): peer_msg = self.get_peer_message(context) self.logger.info(f"Received request to get SPDK nvmf log flags and level{peer_msg}") log_flags = [] - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: nvmf_log_flags = {key: value for key, value in rpc_log.log_get_flags( self.spdk_rpc_client).items() if key.startswith('nvmf')} @@ -2544,7 +2562,8 @@ def set_spdk_nvmf_logs_safe(self, request, context): self.logger.info(f"Received request to set SPDK nvmf logs: log_level: {log_level}, print_level: {print_level}{peer_msg}") - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: nvmf_log_flags = [key for key in rpc_log.log_get_flags(self.spdk_rpc_client).keys() \ if key.startswith('nvmf')] @@ -2592,7 +2611,8 @@ def disable_spdk_nvmf_logs_safe(self, request, context): peer_msg = self.get_peer_message(context) self.logger.info(f"Received request to disable SPDK nvmf logs{peer_msg}") - with self.omap_lock(context=context): + omap_lock = self.omap_lock.get_omap_lock_to_use(context) + with omap_lock: try: nvmf_log_flags = [key for key in rpc_log.log_get_flags(self.spdk_rpc_client).keys() \ if key.startswith('nvmf')] diff --git a/control/state.py b/control/state.py index e6b28c20..c5e67262 100644 --- a/control/state.py +++ b/control/state.py @@ -11,6 +11,7 @@ import threading import rados import errno +import contextlib from typing import Dict from collections import defaultdict from abc import ABC, abstractmethod @@ -216,12 +217,6 @@ def __init__(self, omap_state, gateway_state, rpc_lock: threading.Lock) -> None: self.omap_file_disable_unlock = self.omap_state.config.getboolean_with_default("gateway", "omap_file_disable_unlock", False) if self.omap_file_disable_unlock: self.logger.warning(f"Will not unlock OMAP file for testing purposes") - self.enter_args = {} - - def __call__(self, **kwargs): - self.enter_args.clear() - self.enter_args.update(kwargs) - return self # # We pass the context from the different functions here. It should point to a real object in case we come from a real @@ -231,17 +226,19 @@ def __call__(self, **kwargs): # are done in such a case. # def __enter__(self): - context = self.enter_args.get("context") - if context and self.omap_file_lock_duration > 0: + if self.omap_file_lock_duration > 0: self.lock_omap() return self def __exit__(self, typ, value, traceback): - context = self.enter_args.get("context") - self.enter_args.clear() - if context and self.omap_file_lock_duration > 0: + if self.omap_file_lock_duration > 0: self.unlock_omap() + def get_omap_lock_to_use(self, context): + if context: + return self + return contextlib.suppress() + # # This function accepts a function in which there is Omap locking. It will execute this function # and in case the Omap is not current, will reload it and try again @@ -318,6 +315,7 @@ def unlock_omap(self): self.is_locked = False except rados.ObjectNotFound as ex: self.logger.warning(f"No such lock, the lock duration might have passed") + self.is_locked = False except Exception: self.logger.exception(f"Unable to unlock OMAP file") pass @@ -649,6 +647,7 @@ def update(self) -> bool: local_version = self.omap.get_local_version() if local_version < omap_version: + self.logger.debug(f"Start update from {local_version} to {omap_version}.") local_state_dict = self.local.get_state() local_state_keys = local_state_dict.keys() omap_state_keys = omap_state_dict.keys()