From 252410c6c08b1599a5c9f45f669d9980ac69908d Mon Sep 17 00:00:00 2001 From: Aaron Peterson Date: Tue, 1 Oct 2024 16:24:48 -0700 Subject: [PATCH] Keep task results if available and update logs/counter (#1559) --- turbinia/state_manager.py | 30 +++++++++++++------------- turbinia/task_manager.py | 44 ++++++++++++++++++++++++++------------- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/turbinia/state_manager.py b/turbinia/state_manager.py index 9dfc4bae5..af067941b 100644 --- a/turbinia/state_manager.py +++ b/turbinia/state_manager.py @@ -174,7 +174,7 @@ def get_task(self, task_id: str) -> dict: task_id (str): The ID of the stored task. Returns: - task_dict (dict): Dict containing task attributes. + task_dict (dict): Dict containing task attributes. """ task_key = ':'.join(('TurbiniaTask', task_id)) task_dict = {} @@ -265,7 +265,7 @@ def get_task_data( def update_request_task(self, task) -> None: """Adds a Turbinia task to the corresponding request list. - + Args: task (TurbiniaTask): Turbinia task object. """ @@ -315,7 +315,9 @@ def write_new_task(self, task) -> Optional[str]: Returns: task_key Optional[str]: The key corresponding for the task. """ - log.info(f'Writing metadata for new task {task.name:s} with id {task.id:s}') + log.info( + f'Writing metadata for new task {task.name:s} with id {task.id:s} ' + f'and request ID {task.request_id}') try: task_key = self.redis_client.build_key_name('task', task.id) except ValueError as exception: @@ -359,7 +361,7 @@ def update_task_helper(self, task) -> Dict[str, Any]: def update_task(self, task) -> Optional[str]: """Updates a Turbinia task key. - + Args: task: A TurbiniaTask object. @@ -400,7 +402,7 @@ def write_evidence(self, evidence_dict: dict[str]) -> str: Returns: evidence_key (str): The key corresponding to the evidence in Redis - + Raises: TurbiniaException: If the attribute deserialization fails. """ @@ -443,7 +445,7 @@ def get_evidence_data(self, evidence_id: str) -> dict: evidence_id (str): The ID of the stored evidence. Returns: - evidence_dict (dict): Dict containing evidence attributes. + evidence_dict (dict): Dict containing evidence attributes. """ evidence_key = ':'.join(('TurbiniaEvidence', evidence_id)) evidence_dict = {} @@ -462,7 +464,7 @@ def get_evidence_summary( output (str): Output of the function (keys | content | count). Returns: - summary (dict | list | int): Object containing evidences. + summary (dict | list | int): Object containing evidences. """ if output == 'count' and not group: return sum(1 for _ in self.redis_client.iterate_keys('Evidence')) @@ -494,7 +496,7 @@ def query_evidence( output (str): Output of the function (keys | content | count). Returns: - query_result (list | int): Result of the query. + query_result (list | int): Result of the query. """ keys = [] for evidence_key in self.redis_client.iterate_keys('Evidence'): @@ -517,7 +519,7 @@ def get_evidence_key_by_hash(self, file_hash: str) -> str | None: file_hash (str): The hash of the stored evidence. Returns: - key (str | None): Key of the stored evidence. + key (str | None): Key of the stored evidence. """ try: if file_hash: @@ -533,7 +535,7 @@ def get_evidence_by_hash(self, file_hash: str) -> dict: file_hash (str): The hash of the stored evidence. Returns: - evidence_dict (dict): Dict containing evidence attributes. + evidence_dict (dict): Dict containing evidence attributes. """ evidence_id = self.get_evidence_key_by_hash(file_hash).split(':')[1] return self.get_evidence_data(evidence_id) @@ -544,12 +546,12 @@ def write_request(self, request_dict: dict, overwrite=False): Args: request_dict (dict[str]): A dictionary containing the serialized request attributes that will be saved. - overwrite (bool): Allows overwriting previous key and blocks writing new + overwrite (bool): Allows overwriting previous key and blocks writing new ones. Returns: request_key (str): The key corresponding to the evidence in Redis - + Raises: TurbiniaException: If the attribute deserialization fails or tried to overwrite an existing key without overwrite=True @@ -589,7 +591,7 @@ def get_request_data(self, request_id: str) -> dict: request_id (str): The ID of the stored request. Returns: - request_dict (dict): Dict containing request attributes. + request_dict (dict): Dict containing request attributes. """ request_key = self.redis_client.build_key_name('request', request_id) request_dict = {} @@ -637,7 +639,7 @@ def query_requests( output (str): Output of the function (keys | content | count). Returns: - query_result (list | int): Result of the query. + query_result (list | int): Result of the query. """ keys = [] for request_key in self.redis_client.iterate_keys('request'): diff --git a/turbinia/task_manager.py b/turbinia/task_manager.py index 6f55098b5..e11865f4f 100644 --- a/turbinia/task_manager.py +++ b/turbinia/task_manager.py @@ -78,6 +78,9 @@ turbinia_server_task_timeout_total = Counter( 'turbinia_server_task_timeout_total', 'Total number of Tasks that have timed out on the Server.') +turbinia_server_task_fail_total = Counter( + 'turbinia_server_task_fail_total', + 'Total number of Tasks that have failed according to the Server.') turbinia_result_success_invalid = Counter( 'turbinia_result_success_invalid', 'The result returned from the Task had an invalid success status of None') @@ -623,18 +626,30 @@ def close_failed_task(self, task): Returns: TurbiniaTask: The updated Task. """ - result = workers.TurbiniaTaskResult( - request_id=task.request_id, no_output_manager=True, - no_state_manager=True) - result.setup(task) - if task.stub.traceback: - result.status = ( - f'Task {task.id} failed with exception: {task.stub.traceback}') - else: - result.status = f'Task {task.id} failed.' - result.successful = False - result.closed = True - task.result = result + if not task.result: + result = workers.TurbiniaTaskResult( + request_id=task.request_id, no_output_manager=True, + no_state_manager=True) + result.setup(task) + task.result = result + + if not task.result.status: + if task.stub.traceback: + task.result.status = ( + f'Task {task.id} failed with exception: {task.stub.traceback}') + else: + task.result.status = f'Task {task.id} failed.' + # Record the traceback in the status if we have one otherwise it won't + # be visible for the user. + elif task.result.status and task.stub.traceback: + new_status = ( + f'{task.result.status} (Task failure exception: ' + f'{task.stub.traceback})') + task.result.status = new_status + + task.result.successful = False + task.result.closed = True + turbinia_server_task_fail_total.inc() return task def timeout_task(self, task, timeout): @@ -813,8 +828,9 @@ def get_evidence(self): def enqueue_task(self, task, evidence_, timeout): log.info( - f'Adding Celery task {task.name:s} with evidence {evidence_.name:s}' - f' to queue with base task timeout {timeout}') + f'Adding Celery task {task.name:s} for request id {task.request_id} ' + f'with evidence {evidence_.name:s} to queue with base task ' + f'timeout {timeout}') # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit # Hard limit in seconds, the worker processing the task will be killed and # replaced with a new one when this is exceeded.