From d79c7514454f002b32d2512756d7b8c71222dce8 Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Wed, 13 Dec 2023 07:40:59 -0500 Subject: [PATCH 1/2] Unbreak task packing When submitting a task group, only attempt to upload task inputs corresponding to nodes external to the task group since only those will have been resolved. --- .github/workflows/tests.yml | 3 ++- CHANGELOG.md | 1 + covalent_dispatcher/_core/dispatcher.py | 15 +++++++++++++-- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 946e343f5..fce060c1d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -209,9 +209,10 @@ jobs: if: env.BUILD_AND_RUN_ALL id: covalent_start run: | + export COVALENT_ENABLE_TASK_PACKING=1 covalent db migrate if [ "${{ matrix.backend }}" = 'dask' ] ; then - COVALENT_ENABLE_TASK_PACKING=1 covalent start -d + covalent start -d elif [ "${{ matrix.backend }}" = 'local' ] ; then covalent start --no-cluster -d else diff --git a/CHANGELOG.md b/CHANGELOG.md index 594c1772e..3c35865f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Reduced number of assets to upload when submitting a dispatch. +- Fixed inaccuracies in task packing exposed by no longer uploading null attributes upon dispatch. 
### Operations diff --git a/covalent_dispatcher/_core/dispatcher.py b/covalent_dispatcher/_core/dispatcher.py index e17547969..b0ecd27b6 100644 --- a/covalent_dispatcher/_core/dispatcher.py +++ b/covalent_dispatcher/_core/dispatcher.py @@ -183,6 +183,7 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro app_log.debug("8A: Update node success (run_planned_workflow).") else: + # Nodes whose values have already been resolved known_nodes = [] # Skip the group if all task outputs can be reused from a @@ -196,6 +197,8 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro # Gather inputs for each task and send the task spec sequence to the runner task_specs = [] + sorted_nodes_set = set(sorted_nodes) + for node_id in sorted_nodes: app_log.debug(f"Gathering inputs for task {node_id} (run_planned_workflow).") @@ -214,8 +217,16 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro "args_ids": abs_task_input["args"], "kwargs_ids": abs_task_input["kwargs"], } - known_nodes += abs_task_input["args"] - known_nodes += list(abs_task_input["kwargs"].values()) + # Task inputs that don't belong to the task group have already been resolved + external_task_args = filter( + lambda x: x not in sorted_nodes_set, abs_task_input["args"] + ) + known_nodes.extend(external_task_args) + external_task_kwargs = filter( + lambda x: x not in sorted_nodes_set, abs_task_input["kwargs"].values() + ) + known_nodes.extend(external_task_kwargs) + task_specs.append(task_spec) app_log.debug( From d7849acc50a0728594979a40d386418f5877541f Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Wed, 13 Dec 2023 08:37:13 -0500 Subject: [PATCH 2/2] Fix file transfer test --- tests/functional_tests/file_transfer_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/functional_tests/file_transfer_test.py b/tests/functional_tests/file_transfer_test.py index ea5f4029d..f2196c266 100644 --- 
a/tests/functional_tests/file_transfer_test.py +++ b/tests/functional_tests/file_transfer_test.py @@ -180,7 +180,9 @@ def workflow(): rm._delete_result(dispatch_id) assert len(workflow_result.result) == 2 - assert workflow_result.result == (MOCK_CONTENTS, MOCK_CONTENTS) + assert workflow_result.result[0] == MOCK_CONTENTS + assert workflow_result.result[1] == MOCK_CONTENTS + for f in [source_file_1, dest_file_1, source_file_2, dest_file_2]: f.unlink()