Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Optimize TaskVineExecutor #3724

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
79 changes: 73 additions & 6 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import uuid
from concurrent.futures import Future
from datetime import datetime
from typing import List, Literal, Optional, Union
from typing import Dict, List, Literal, Optional, Union

# Import other libraries
import typeguard
Expand Down Expand Up @@ -81,8 +81,12 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin):
pre-warmed forked python process.
Default is 'regular'.

use_tmp_dir_for_staging: bool
Whether to use tmp dir for staging functions, arguments, and results.
Default is True.

manager_config: TaskVineManagerConfig
Configuration for the TaskVine manager. Default
Configuration for the TaskVine manager.

factory_config: TaskVineFactoryConfig
Configuration for the TaskVine factory.
Expand All @@ -104,6 +108,7 @@ def __init__(self,
label: str = "TaskVineExecutor",
worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
use_tmp_dir_for_staging: bool = True,
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
Expand All @@ -129,6 +134,7 @@ def __init__(self,
self.label = label
self.worker_launch_method = worker_launch_method
self.function_exec_mode = function_exec_mode
self.use_tmp_dir_for_staging = use_tmp_dir_for_staging
self.manager_config = manager_config
self.factory_config = factory_config
self.storage_access = storage_access
Expand Down Expand Up @@ -172,6 +178,13 @@ def __init__(self,
# Path to directory that holds all tasks' data and results.
self._function_data_dir = ""

# Mapping of function names to function details.
# Currently the values include function objects, path to serialized functions,
# path to serialized function contexts, and whether functions are serialized.
# Helpful to detect inconsistencies in serverless functions.
# Helpful to deduplicate the same function.
self._map_func_names_to_func_details: Dict[str, Dict] = {}

# Helper scripts to prepare package tarballs for Parsl apps
self._package_analyze_script = shutil.which("poncho_package_analyze")
self._package_create_script = shutil.which("poncho_package_create")
Expand Down Expand Up @@ -218,8 +231,12 @@ def __create_data_and_logging_dirs(self):
# Create directories for data and results
log_dir = os.path.join(run_dir, self.label)
os.makedirs(log_dir)
tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-'
self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix)

if self.use_tmp_dir_for_staging:
tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-'
self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix)
else:
self._function_data_dir = os.path.join(log_dir, 'function')

# put TaskVine logs outside of a Parsl run as TaskVine caches between runs while
# Parsl does not.
Expand Down Expand Up @@ -318,11 +335,35 @@ def submit(self, func, resource_specification, *args, **kwargs):
Keyword arguments to the Parsl app
"""

# a Parsl function must have a name
if func.__name__ is None:
raise ValueError('A Parsl function must have a name')

logger.debug(f'Got resource specification: {resource_specification}')

# If `_parsl_monitoring_task_id` is in kwargs, Parsl monitoring code is enabled.
is_monitoring_enabled = '_parsl_monitoring_task_id' in kwargs

# Default execution mode of apps is regular
exec_mode = resource_specification.get('exec_mode', self.function_exec_mode)

# Fall back to regular execution if a function is Parsl-monitored as a monitored function is invocation-specific.
# Note that it is possible to get the wrapped function by calling the `__wrapped__` attribute when monitoring is enabled.
# It will disable the monitoring wrapper code however.
if exec_mode == 'serverless' and is_monitoring_enabled:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit suspicious to me -- what is it about monitoring that is special here?

is it that there is a new function definition for every submission? If so, there are other parsl modes like some file staging scenarios, where that can happen too.

Although I think that is also what the block below, with function equality checking and fallback to regular is trying to do too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that, for the serverless execution model to work, a function func must be a generic function rather than an invocation-specific one. The monitoring code replaces the original function with an invocation-specific version carrying the task_id and so on. For file staging, I have been poking around and found that as long as the storage_access attribute is None — which it is for TaskVineExecutor — the DFK won't replace the original function with a file-staging wrapper (I might be wrong here); file staging is instead delegated to TaskVine's specialized file-staging API.

The code block with function id checking is as you said a detection mechanism to fall back to the regular execution method if functions do change.

Thanks for reviewing it early! I was gonna fill this PR with more descriptions, tests, and documentation before pinging you.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

people don't use file staging much, but it might not be None — for example, I know one user who uses Work Queue plus zip file staging in Parsl, and that should translate directly to TaskVine + zip file staging.

does that monitoring test detect anything that the line below it doesn't already detect? or is it for getting a nicer error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is for getting a nicer message so users may know what's wrong specifically with each case. The monitoring detection can be removed without affecting functionality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the first monitoring check handles the case where the first invocation of a function runs serverlessly and subsequent invocations run regularly.

logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.")
exec_mode = 'regular'

if exec_mode == 'serverless':
if func.__name__ not in self._map_func_names_to_func_details:
self._map_func_names_to_func_details[func.__name__] = {'func_obj': func}
else:
if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']):
logger.warning('Inconsistency in a serverless function call detected.\
A function name cannot point to two different function objects.\
Falling back to executing it as a regular task.')
exec_mode = 'regular'

# Detect resources and features of a submitted Parsl app
cores = None
memory = None
Expand Down Expand Up @@ -386,19 +427,43 @@ def submit(self, func, resource_specification, *args, **kwargs):
argument_file = None
result_file = None
map_file = None
function_context_file = None

# Get path to files that will contain the pickled function,
# arguments, result, and map of input and output files
function_file = self._path_in_task(executor_task_id, "function")
if exec_mode == 'serverless':
if 'function_file' not in self._map_func_names_to_func_details[func.__name__]:
function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function')
os.makedirs(os.path.join(self._function_data_dir.name, func.__name__))
self._map_func_names_to_func_details[func.__name__].update({'function_file': function_file, 'is_serialized': False})
else:
function_file = self._map_func_names_to_func_details[func.__name__]['function_file']
else:
function_file = self._path_in_task(executor_task_id, "function")
argument_file = self._path_in_task(executor_task_id, "argument")
result_file = self._path_in_task(executor_task_id, "result")
map_file = self._path_in_task(executor_task_id, "map")

if exec_mode == 'serverless':
if 'function_context' in resource_specification:
if 'function_context_file' not in self._map_func_names_to_func_details[func.__name__]:
function_context = resource_specification.get('function_context')
function_context_args = resource_specification.get('function_context_args', [])
function_context_kwargs = resource_specification.get('function_context_kwargs', {})
function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context')
self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs])
self._map_func_names_to_func_details[func.__name__].update({'function_context_file': function_context_file})
else:
function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file']

logger.debug("Creating executor task {} with function at: {}, argument at: {}, \
and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file))

# Serialize function object and arguments, separately
self._serialize_object_to_file(function_file, func)
if exec_mode == 'regular' or not self._map_func_names_to_func_details[func.__name__]['is_serialized']:
self._serialize_object_to_file(function_file, func)
if exec_mode == 'serverless':
self._map_func_names_to_func_details[func.__name__]['is_serialized'] = True
args_dict = {'args': args, 'kwargs': kwargs}
self._serialize_object_to_file(argument_file, args_dict)

Expand All @@ -419,6 +484,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
category = func.__name__ if self.manager_config.autocategory else 'parsl-default'

task_info = ParslTaskToVine(executor_id=executor_task_id,
func_name=func.__name__,
exec_mode=exec_mode,
category=category,
input_files=input_files,
Expand All @@ -427,6 +493,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
function_file=function_file,
argument_file=argument_file,
result_file=result_file,
function_context_file=function_context_file,
cores=cores,
memory=memory,
disk=disk,
Expand Down
Loading