Skip to content

Commit

Permalink
Refactor worker dependencies on server (#919)
Browse files Browse the repository at this point in the history
* Initial refactor pass

* Merge CronAnalysisTask changes from HEAD

* rename deserialize -> task_deserialize

* remove task imports from client

* import psq as needed

* remove --run_local and refactor TASK_MAP

* Move monitoring setup to own method

* Fix bad merge

* Do not overwrite output_manager

* clean-up

* Manually merge update and address review comments
  • Loading branch information
aarontp authored Oct 16, 2021
1 parent ab059ba commit c400bef
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 296 deletions.
5 changes: 0 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ optional arguments:
Log file
-r REQUEST_ID, --request_id REQUEST_ID
Create new requests with this Request ID
-R, --run_local Run completely locally without any server or other
infrastructure. This can be used to run one-off Tasks to
process data locally.
-S, --server Run Turbinia Server indefinitely
-V, --version Show the version
-D, --dump_json Dump JSON output of Turbinia Request instead of sending
Expand All @@ -125,8 +122,6 @@ optional arguments:
-p POLL_INTERVAL, --poll_interval POLL_INTERVAL
Number of seconds to wait between polling for task state
info
-t TASK, --task TASK The name of a single Task to run locally (must be used
                        with --run_local).
-T, --debug_tasks Show debug output for all supported tasks
-w, --wait Wait to exit until all tasks for the given request have
completed
Expand Down
119 changes: 7 additions & 112 deletions turbinia/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,80 +37,16 @@
from turbinia.config import logger
from turbinia.config import DATETIME_FORMAT
from turbinia import task_manager
from turbinia import task_utils
from turbinia import TurbiniaException
from turbinia.lib import text_formatter as fmt
from turbinia.lib import docker_manager
from turbinia.jobs import manager as job_manager
from turbinia.workers import Priority
from turbinia.workers.artifact import FileArtifactExtractionTask
from turbinia.workers.analysis.wordpress_access import WordpressAccessLogAnalysisTask
from turbinia.workers.analysis.wordpress_creds import WordpressCredsAnalysisTask
from turbinia.workers.analysis.jenkins import JenkinsAnalysisTask
from turbinia.workers.analysis.jupyter import JupyterAnalysisTask
from turbinia.workers.analysis.linux_acct import LinuxAccountAnalysisTask
from turbinia.workers.analysis.loki import LokiAnalysisTask
from turbinia.workers.analysis.windows_acct import WindowsAccountAnalysisTask
from turbinia.workers.finalize_request import FinalizeRequestTask
from turbinia.workers.cron import CronAnalysisTask
from turbinia.workers.dfdewey import DfdeweyTask
from turbinia.workers.docker import DockerContainersEnumerationTask
from turbinia.workers.grep import GrepTask
from turbinia.workers.fsstat import FsstatTask
from turbinia.workers.hadoop import HadoopAnalysisTask
from turbinia.workers.hindsight import HindsightTask
from turbinia.workers.partitions import PartitionEnumerationTask
from turbinia.workers.plaso import PlasoTask
from turbinia.workers.psort import PsortTask
from turbinia.workers.redis import RedisAnalysisTask
from turbinia.workers.sshd import SSHDAnalysisTask
from turbinia.workers.strings import StringsAsciiTask
from turbinia.workers.strings import StringsUnicodeTask
from turbinia.workers.tomcat import TomcatAnalysisTask
from turbinia.workers.volatility import VolatilityTask
from turbinia.workers.worker_stat import StatTask
from turbinia.workers.binary_extractor import BinaryExtractorTask
from turbinia.workers.bulk_extractor import BulkExtractorTask
from turbinia.workers.photorec import PhotorecTask
from turbinia.workers.abort import AbortTask

MAX_RETRIES = 10
RETRY_SLEEP = 60

# TODO(aarontp): Remove this map after
# https://github.com/google/turbinia/issues/278 is fixed.
TASK_MAP = {
'fileartifactextractiontask': FileArtifactExtractionTask,
'wordpressaccessloganalysistask': WordpressAccessLogAnalysisTask,
'wordpresscredsanalysistask': WordpressCredsAnalysisTask,
'finalizerequesttask': FinalizeRequestTask,
'jenkinsanalysistask': JenkinsAnalysisTask,
'jupyteranalysistask': JupyterAnalysisTask,
'greptask': GrepTask,
'fsstattask': FsstatTask,
'hadoopanalysistask': HadoopAnalysisTask,
'hindsighttask': HindsightTask,
'linuxaccountanalysistask': LinuxAccountAnalysisTask,
'windowsaccountanalysistask': WindowsAccountAnalysisTask,
'lokianalysistask': LokiAnalysisTask,
'partitionenumerationtask': PartitionEnumerationTask,
'plasotask': PlasoTask,
'psorttask': PsortTask,
'redisanalysistask': RedisAnalysisTask,
'sshdanalysistask': SSHDAnalysisTask,
'stringsasciitask': StringsAsciiTask,
'stringsunicodetask': StringsUnicodeTask,
'tomcatanalysistask': TomcatAnalysisTask,
'volatilitytask': VolatilityTask,
'stattask': StatTask,
'binaryextractortask': BinaryExtractorTask,
'bulkextractortask': BulkExtractorTask,
'dockercontainersenumerationtask': DockerContainersEnumerationTask,
'photorectask': PhotorecTask,
'aborttask': AbortTask,
'cronanalysistask': CronAnalysisTask,
'dfdeweytask': DfdeweyTask
}

config.LoadConfig()
if config.TASK_MANAGER.lower() == 'psq':
from libcloudforensics.providers.gcp.internal import function as gcp_function
Expand All @@ -128,7 +64,7 @@ def setup(is_client=False):
logger.setup()


def get_turbinia_client(run_local=False):
def get_turbinia_client():
"""Return Turbinia client based on config.
Returns:
Expand All @@ -137,9 +73,9 @@ def get_turbinia_client(run_local=False):
# pylint: disable=no-else-return
setup(is_client=True)
if config.TASK_MANAGER.lower() == 'psq':
return BaseTurbiniaClient(run_local=run_local)
return BaseTurbiniaClient()
elif config.TASK_MANAGER.lower() == 'celery':
return TurbiniaCeleryClient(run_local=run_local)
return TurbiniaCeleryClient()
else:
msg = 'Task Manager type "{0:s}" not implemented'.format(
config.TASK_MANAGER)
Expand Down Expand Up @@ -225,31 +161,10 @@ class BaseTurbiniaClient:
task_manager (TaskManager): Turbinia task manager
"""

def __init__(self, run_local=False):
def __init__(self):
config.LoadConfig()
if run_local:
self.task_manager = None
else:
self.task_manager = task_manager.get_task_manager()
self.task_manager.setup(server=False)

def create_task(self, task_name):
"""Creates a Turbinia Task by name.
Args:
task_name(string): Name of the Task we are going to run.
Returns:
TurbiniaTask: An instantiated Task object.
Raises:
TurbiniaException: When no Task object matching task_name is found.
"""
task_obj = TASK_MAP.get(task_name.lower())
log.debug('Looking up Task {0:s} by name'.format(task_name))
if not task_obj:
raise TurbiniaException('No Task named {0:s} found'.format(task_name))
return task_obj()
self.task_manager = task_manager.get_task_manager()
self.task_manager.setup(server=False)

def list_jobs(self):
"""List the available jobs."""
Expand Down Expand Up @@ -958,26 +873,6 @@ def format_task_status(

return '\n'.join(report)

def run_local_task(self, task_name, request):
"""Runs a Turbinia Task locally.
Args:
task_name(string): Name of the Task we are going to run.
request (TurbiniaRequest): Object containing request and evidence info.
Returns:
TurbiniaTaskResult: The result returned by the Task Execution.
"""
task = self.create_task(task_name)
task.request_id = request.request_id
task.base_output_dir = config.OUTPUT_DIR
task.run_local = True
if not request.evidence:
raise TurbiniaException('TurbiniaRequest does not contain evidence.')
log.info('Running Task {0:s} locally'.format(task_name))
result = task.run_wrapper(request.evidence[0].serialize())
return result

def send_request(self, request):
"""Sends a TurbiniaRequest message.
Expand Down
8 changes: 4 additions & 4 deletions turbinia/lib/recipe_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from yaml import load
from turbinia.lib.file_helpers import file_to_str
from turbinia.lib.file_helpers import file_to_list
from turbinia.task_utils import TaskLoader

log = logging.getLogger('turbinia')

Expand Down Expand Up @@ -156,13 +157,12 @@ def validate_recipe(recipe_dict):
return (False, message)
proposed_task = recipe_item_contents['task']

# Doing a delayed import to avoid circular dependencies.
from turbinia.client import TASK_MAP
if proposed_task.lower() not in TASK_MAP:
task_loader = TaskLoader()
if not task_loader.check_task_name(proposed_task):
log.error(
'Task {0:s} defined for task recipe {1:s} does not exist.'.format(
proposed_task, recipe_item))
return (False, message)
tasks_with_recipe.append(recipe_item)

return (True, '')
return (True, '')
38 changes: 4 additions & 34 deletions turbinia/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from turbinia import evidence
from turbinia import config
from turbinia import state_manager
from turbinia import task_utils
from turbinia import TurbiniaException
from turbinia.jobs import manager as jobs_manager
from turbinia.lib import recipe_helpers
Expand Down Expand Up @@ -86,38 +87,6 @@ def get_task_manager():
raise turbinia.TurbiniaException(msg)


def task_runner(obj, *args, **kwargs):
"""Wrapper function to run specified TurbiniaTask object.
Args:
obj: An instantiated TurbiniaTask object.
*args: Any Args to pass to obj.
**kwargs: Any keyword args to pass to obj.
Returns:
Output from TurbiniaTask (should be TurbiniaTaskResult).
"""

# GKE Specific - do not queue more work if pod places this file
if os.path.exists(config.SCALEDOWN_WORKER_FILE):
raise psq.Retry()

# try to acquire lock, timeout and requeue task if the worker
# is already processing a task.
try:
lock = filelock.FileLock(config.LOCK_FILE)
with lock.acquire(timeout=0.001):
obj = workers.TurbiniaTask.deserialize(obj)
run = obj.run_wrapper(*args, **kwargs)
except filelock.Timeout:
raise psq.Retry()
finally:
# *always* make sure we release the lock
lock.release()

return run


class BaseTaskManager:
"""Class to manage Turbinia Tasks.
Expand Down Expand Up @@ -600,7 +569,8 @@ def _backend_setup(self, *args, **kwargs):
self.celery.setup()
self.kombu = turbinia_celery.TurbiniaKombu(config.KOMBU_CHANNEL)
self.kombu.setup()
self.celery_runner = self.celery.app.task(task_runner, name="task_runner")
self.celery_runner = self.celery.app.task(
task_utils.task_runner, name="task_runner")

def process_tasks(self):
"""Determine the current state of our tasks.
Expand Down Expand Up @@ -761,5 +731,5 @@ def enqueue_task(self, task, evidence_):
'Adding PSQ task {0:s} with evidence {1:s} to queue'.format(
task.name, evidence_.name))
task.stub = self.psq.enqueue(
task_runner, task.serialize(), evidence_.serialize())
task_utils.task_runner, task.serialize(), evidence_.serialize())
time.sleep(PSQ_QUEUE_WAIT_SECONDS)
Loading

0 comments on commit c400bef

Please sign in to comment.