-
Notifications
You must be signed in to change notification settings - Fork 906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor run methods more into abstract method #4353
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Merel Theisen <[email protected]>
…l runner as well Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
) -> ThreadPoolExecutor | ProcessPoolExecutor: | ||
"""Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor).""" | ||
pass | ||
|
||
@abstractmethod # pragma: no cover |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still an abstractmethod?
@@ -46,11 +46,16 @@ def __init__( | |||
is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns | |||
) | |||
|
|||
def _get_executor(self, max_workers: int) -> ThreadPoolExecutor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary to create a thread for SeqeuntialRunner?
@@ -443,3 +546,27 @@ def run_node( | |||
) | |||
node = task.execute() | |||
return node | |||
|
|||
|
|||
def validate_max_workers(max_workers: int | None) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why _validate_catalog
and _validate_nodes
are private methods and this one is public?
) | ||
|
||
self._release_datasets(node, catalog, load_counts, pipeline) | ||
super()._run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it still needed if it's just using the base method?
from collections import Counter, deque | ||
from concurrent.futures import ( | ||
FIRST_COMPLETED, | ||
ProcessPoolExecutor, | ||
ThreadPoolExecutor, | ||
wait, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this import multiprocessing
as a dependencies? I recalled in the past we have issues with ShelveStore
because even importing the library cause issues on restricted environment like AWS Lambda.
Description
Resolves #4290
Development notes
_run
method implementations into the abstract_run
method._get_executor
method, implemented by each runner.ParallelRunner
andThreadRunner
into sharedvalidate_max_workers
method.hook_manager
argument in runners to allow it to beNone
, which is needed for theParallelRunner
, because hook manager can't be serialised.TestSuggestResumeScenario
forSequentialRunner
, because it's now using aThreadPoolExecutor
, the suggestions can vary per run. I've manually created a project with the same pipelines and verified that the suggestions (even if they vary) do always work.TestSuggestResumeScenario
tests forThreadRunner
.Developer Certificate of Origin
We need all contributions to comply with the Developer Certificate of Origin (DCO). All commits must be signed off by including a
Signed-off-by
line in the commit message. See our wiki for guidance.If your PR is blocked due to unsigned commits, then you must follow the instructions under "Rebase the branch" on the GitHub Checks page for your PR. This will retroactively add the sign-off to all unsigned commits and allow the DCO check to pass.
Checklist
RELEASE.md
file