Skip to content

Commit

Permalink
context feature added
Browse files Browse the repository at this point in the history
  • Loading branch information
tphung3 committed Dec 14, 2024
1 parent 1f7ca16 commit 7160085
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
15 changes: 15 additions & 0 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
argument_file = None
result_file = None
map_file = None
function_context_file = None

# Get path to files that will contain the pickled function,
# arguments, result, and map of input and output files
Expand All @@ -438,6 +439,19 @@ def submit(self, func, resource_specification, *args, **kwargs):
result_file = self._path_in_task(executor_task_id, "result")
map_file = self._path_in_task(executor_task_id, "map")

if exec_mode == 'serverless':
if 'function_context' in resource_specification:
if 'function_context_file' not in self._map_func_names_to_serialized_func_file[func.__name__]:
function_context = resource_specification.get('function_context')
function_context_args = resource_specification.get('function_context_args', [])
function_context_kwargs = resource_specification.get('function_context_kwargs', {})
function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context')
self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs])
self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] = function_context_file
else:
function_context_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file']


logger.debug("Creating executor task {} with function at: {}, argument at: {}, \
and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file))

Expand Down Expand Up @@ -475,6 +489,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
function_file=function_file,
argument_file=argument_file,
result_file=result_file,
function_context_file=function_context_file,
cores=cores,
memory=memory,
disk=disk,
Expand Down
9 changes: 8 additions & 1 deletion parsl/executors/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ def _taskvine_submit_wait(ready_task_queue=None,
# This cost is paid only once per function/app.
func = _deserialize_object_from_file(task.function_file)

# Deserialize the function context to add it to the library if available
# This cost is paid only once per function/app.
function_context_list = None
if task.function_context_file:
function_context_list = _deserialize_object_from_file(task.function_context_file)

# Don't automatically add environment so manager can declare and cache the vine file associated with the environment file
add_env = False
lib_name = f'{task.func_name}-lib'
Expand All @@ -278,7 +284,8 @@ def _taskvine_submit_wait(ready_task_queue=None,
poncho_env=poncho_env_path,
init_command=manager_config.init_command,
exec_mode='direct',
add_env=add_env)
add_env=add_env,
library_context_info=function_context_list)

# Configure the library if provided
if manager_config.library_config:
Expand Down
7 changes: 7 additions & 0 deletions parsl/executors/taskvine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self,
function_file: Optional[str], # pickled file containing the function information
argument_file: Optional[str], # pickled file containing the arguments to the function call
result_file: Optional[str], # path to the pickled result object of the function execution
function_context_file: Optional[list], # path to the pickled list of function context details for serverless functions
cores: Optional[float], # number of cores to allocate
memory: Optional[int], # amount of memory in MBs to allocate
disk: Optional[int], # amount of disk in MBs to allocate
Expand All @@ -33,6 +34,7 @@ def __init__(self,
self.result_file = result_file
self.input_files = input_files
self.output_files = output_files
self.function_context_file = function_context_file
self.cores = cores
self.memory = memory
self.disk = disk
Expand Down Expand Up @@ -85,3 +87,8 @@ def run_parsl_function(map_file, function_file, argument_file, result_file):
"""
from parsl.executors.taskvine.exec_parsl_function import run
run(map_file, function_file, argument_file, result_file)


def load_variable_in_serverless(var_name):
from ndcctools.taskvine.utils import load_variable_from_library
return load_variable_from_library(var_name)

0 comments on commit 7160085

Please sign in to comment.