Unbreak task packing #1887

Closed · wants to merge 2 commits
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
@@ -209,9 +209,10 @@ jobs:
 if: env.BUILD_AND_RUN_ALL
 id: covalent_start
 run: |
+  export COVALENT_ENABLE_TASK_PACKING=1
   covalent db migrate
   if [ "${{ matrix.backend }}" = 'dask' ] ; then
-    COVALENT_ENABLE_TASK_PACKING=1 covalent start -d
+    covalent start -d
   elif [ "${{ matrix.backend }}" = 'local' ] ; then
     covalent start --no-cluster -d
   else
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

 - Reduced number of assets to upload when submitting a dispatch.
+- Fixed inaccuracies in task packing exposed by no longer uploading null attributes upon dispatch.

### Operations

15 changes: 13 additions & 2 deletions covalent_dispatcher/_core/dispatcher.py
@@ -183,6 +183,7 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro
app_log.debug("8A: Update node success (run_planned_workflow).")

else:
+# Nodes whose values have already been resolved
known_nodes = []

# Skip the group if all task outputs can be reused from a
@@ -196,6 +197,8 @@
# Gather inputs for each task and send the task spec sequence to the runner
task_specs = []

+sorted_nodes_set = set(sorted_nodes)

for node_id in sorted_nodes:
app_log.debug(f"Gathering inputs for task {node_id} (run_planned_workflow).")

@@ -214,8 +217,16 @@
"args_ids": abs_task_input["args"],
"kwargs_ids": abs_task_input["kwargs"],
}
-known_nodes += abs_task_input["args"]
-known_nodes += list(abs_task_input["kwargs"].values())
+# Task inputs that don't belong to the task group have already been resolved
+external_task_args = filter(
+    lambda x: x not in sorted_nodes_set, abs_task_input["args"]
+)
+known_nodes.extend(external_task_args)
+external_task_kwargs = filter(
+    lambda x: x not in sorted_nodes_set, abs_task_input["kwargs"].values()
+)
+known_nodes.extend(external_task_kwargs)

task_specs.append(task_spec)

app_log.debug(
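The dispatcher change above can be sketched in isolation: inputs coming from nodes outside the task group are already resolved and belong in `known_nodes`, while inputs produced inside the group must not be marked as known. A minimal sketch, assuming illustrative node ids and a hypothetical helper name (`collect_known_nodes` is not part of the Covalent codebase):

```python
def collect_known_nodes(task_group, abs_task_inputs):
    """Return input node ids already resolved outside the task group."""
    group = set(task_group)  # membership checks in O(1), as in the PR
    known_nodes = []
    for inputs in abs_task_inputs:
        # Positional and keyword inputs are kept only if they come
        # from nodes outside the group being submitted.
        known_nodes.extend(n for n in inputs["args"] if n not in group)
        known_nodes.extend(n for n in inputs["kwargs"].values() if n not in group)
    return known_nodes


# Nodes 1 and 2 form the group; node 0 was resolved in an earlier group.
print(collect_known_nodes([1, 2], [{"args": [0, 1], "kwargs": {"x": 2}}]))
# -> [0]
```

Before this fix, nodes 1 and 2 would also have been reported as known even though their values are produced inside the group itself.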
4 changes: 3 additions & 1 deletion tests/functional_tests/file_transfer_test.py
@@ -180,7 +180,9 @@ def workflow():
rm._delete_result(dispatch_id)

assert len(workflow_result.result) == 2
-assert workflow_result.result == (MOCK_CONTENTS, MOCK_CONTENTS)
+assert workflow_result.result[0] == MOCK_CONTENTS
+assert workflow_result.result[1] == MOCK_CONTENTS

for f in [source_file_1, dest_file_1, source_file_2, dest_file_2]:
f.unlink()
