diff --git a/_version.py b/_version.py index 82a8d7a5..32b71806 100644 --- a/_version.py +++ b/_version.py @@ -20,7 +20,7 @@ from packaging import version -__version__ = "5.2.8" +__version__ = "5.2.10" def version_major() -> int: diff --git a/module_wrapper_global.py b/module_wrapper_global.py index c27d6a86..34de7e1c 100644 --- a/module_wrapper_global.py +++ b/module_wrapper_global.py @@ -28,6 +28,7 @@ from lmao.module_wrapper import STATUS_NOT_INITIALIZED, STATUS_IDLE, STATUS_BUSY, STATUS_FAILED from lmao.module_wrapper import MODULES as LMAO_MODULES +import psutil from google_ai_module import GoogleAIModule from ms_copilot_module import MSCopilotModule from ms_copilot_designer_module import MSCopilotDesignerModule @@ -51,6 +52,9 @@ # Maximum time (in seconds) to wait for LMAO module to close before killing it's process _LMAO_STOP_TIMEOUT = 10 +# How long to wait for module no become not busy +_WAIT_FOR_IDLE_TIMEOUT = 10 + class ModuleWrapperGlobal: def __init__( @@ -84,6 +88,9 @@ def __init__( self.module = None + # PID for non-LMAO modules for is_busy() function + self._pid_value = multiprocessing.Value(c_int32, -1) + ################ # LMAO modules # ################ @@ -178,6 +185,25 @@ def __init__( elif name == "ms_copilot_designer": self.module = MSCopilotDesignerModule(config, self.messages, self.users_handler) + def is_busy(self) -> bool: + """ + Returns: + bool: True if current module is busy, False if not + """ + # LMAO module is busy if it's status is not IDLE + if self.name.startswith("lmao_"): + with self._lmao_module_status.get_lock(): + module_status = self._lmao_module_status.value + return module_status != STATUS_IDLE + + # Other modules -> check for process_request() process + else: + with self._pid_value.get_lock(): + pid = self._pid_value.value + if pid >= -1 and psutil.pid_exists(pid): + return True + return False + def process_request(self, request_response: request_response_container.RequestResponseContainer) -> None: """Processes request This is called from separate queue process (non main) @@ -188,6 +214,11 @@ def process_request(self, request_response: request_response_container.RequestRe Raises: Exception: process state / status or any other error """ + # Set PID for is_busy() function + with self._pid_value.get_lock(): + self._pid_value.value = multiprocessing.current_process().pid + + # Extract user's language user_id = request_response.user_id lang_id = self.users_handler.get_key(user_id, "lang_id", "eng") @@ -382,6 +413,15 @@ def delete_conversation(self, user_id: int) -> None: Raises: Exception: process state / status or any other error """ + # Wait for module to become not busy or timeout + time_started = time.time() + while True: + if time.time() - time_started > _WAIT_FOR_IDLE_TIMEOUT: + raise Exception("Timeout waiting for module to become available. Please wait a bit and try again") + if not self.is_busy(): + break + time.sleep(0.1) + # Redirect to LMAO process and wait if self.name.startswith("lmao_"): # Check status diff --git a/queue_handler.py b/queue_handler.py index 7ffc9d63..53ac2bda 100644 --- a/queue_handler.py +++ b/queue_handler.py @@ -151,19 +151,9 @@ def _queue_processing_loop(self) -> None: ################################################# # Check if we're not processing this request yet if request_.processing_state == request_response_container.PROCESSING_STATE_IN_QUEUE: - # Check if requested module's process is busy (only 1 request to each module as a time) - module_is_busy = False - for request__ in queue_list: - if ( - request__.module_name == request_.module_name - and request__.pid != 0 - and psutil.pid_exists(request__.pid) - ): - module_is_busy = True - break - # Ignore until module is no longer busy - if module_is_busy: + module = self.modules.get(request_.module_name) + if module is not None and module.is_busy(): continue # Set initializing state