Skip to content

Commit

Permalink
Add resource specification fields to HTEX (#3638)
Browse files Browse the repository at this point in the history
Adds the parameter 'priority' as a valid entry in the resource spec dict. Necessary for changing the pending_task_queue to a different structure than queue.Queue. Also passes resource spec to the interchange.
  • Loading branch information
matthewc2003 authored Oct 17, 2024
1 parent af42414 commit 29f960f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
20 changes: 11 additions & 9 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,15 +346,17 @@ def worker_logdir(self):
return self.logdir

def validate_resource_spec(self, resource_specification: dict):
"""HTEX does not support *any* resource_specification options and
will raise InvalidResourceSpecification is any are passed to it"""
"""HTEX supports the following *Optional* resource specifications:
priority: lower value is higher priority"""
if resource_specification:
raise InvalidResourceSpecification(
set(resource_specification.keys()),
("HTEX does not support the supplied resource_specifications. "
"For MPI applications consider using the MPIExecutor. "
"For specifications for core count/memory/walltime, consider using WorkQueueExecutor.")
)
acceptable_fields = {'priority'}
keys = set(resource_specification.keys())
invalid_keys = keys - acceptable_fields
if invalid_keys:
message = "Task resource specification only accepts these types of resources: {}".format(
', '.join(acceptable_fields))
logger.error(message)
raise InvalidResourceSpecification(set(invalid_keys), message)
return

def initialize_scaling(self):
Expand Down Expand Up @@ -662,7 +664,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
except TypeError:
raise SerializationError(func.__name__)

msg = {"task_id": task_id, "buffer": fn_buf}
msg = {"task_id": task_id, "resource_spec": resource_specification, "buffer": fn_buf}

# Post task to the outgoing queue
self.outgoing_q.put(msg)
Expand Down
7 changes: 7 additions & 0 deletions parsl/tests/test_htex/test_resource_spec_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ def test_resource_spec_validation():
assert ret_val is None


@pytest.mark.local
def test_resource_spec_validation_one_key():
htex = HighThroughputExecutor()
ret_val = htex.validate_resource_spec({"priority": 2})
assert ret_val is None


@pytest.mark.local
def test_resource_spec_validation_bad_keys():
htex = HighThroughputExecutor()
Expand Down

0 comments on commit 29f960f

Please sign in to comment.