-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Windowing Support for the DaskRunner #27618
Changes from all commits
bc6b525
665ee61
d1095c7
fd13d40
e3d0c8a
69a660f
ab58334
5391cd6
5deb598
1768e47
e6c7106
1b58d4c
5fe9372
b98330e
e48780a
0dc9e23
326d3a3
b4cc408
40a6ebe
3c4204d
41623ec
676d752
09365f6
c6ba4ba
a325356
ea13125
f855ffc
3fd966e
173d79b
dd2d15c
8756618
8380d7b
b908dc3
f444b1e
174d6fd
60b063a
90ee474
c62050e
79d4603
248ec70
42452ca
1da2ddd
14885a3
fca2420
6dd1ada
6675687
88ed36b
2e3a126
6467b0e
793ba86
e535792
8e32668
318afc2
b01855f
2c2eb8d
e64e9eb
69b118b
9ffc8d8
8a2afb7
93f02f1
afdcf1b
41b5267
e3ac3f8
3fddc81
9eeb9ea
b4d0999
0319ffd
0b13bb0
1e7052b
292e023
31c1e2b
f4ecf2f
4d24ed9
ee62a4a
506c719
cd0ba8b
d0a7c63
775bd07
efba1c9
741d961
f66458a
5dcf969
ec5f613
04b1f1a
712944b
567b72b
f53c0a4
fb280ad
80ddfec
40c4e35
a70c5f3
9fb52e5
91115e0
26c6016
aec19bf
0673235
de03a32
7e0a2c7
39b1e1c
6db49fa
ed00139
2d8f5d6
d35e9d6
973d9f9
0e5d498
3feeeac
036561c
191580d
365fc87
085447e
1a60a5e
c1037f8
9e79ffd
3e2cc0f
4edc970
fd8e361
d88e8a1
36bea9a
df315c1
ef0d2b6
6c2cc4e
0fae761
119666c
2e46d88
2ed8b14
2f193d5
577f30a
b3a70f6
7acd8d5
7e90e2b
6e33ce2
f54f14c
8dd2cdb
518a8f0
801b131
393c508
f89f609
d6486fe
6f05963
4343e13
8b40a34
cae8561
c82a241
ad43ebe
71c10d9
ff5f47c
3dc2121
7e343e3
a83e85a
dea3e11
32ee6e1
305699b
b1226fa
ccbccc9
0a7f59a
9557e3a
899fe08
c398a21
bfbbf1c
f287a46
83fbfc5
a106dec
9067a54
d6c35eb
8c5d8d4
3c0fbc6
8dc8971
327ba0d
21573a3
9bf08a7
b8c1aaf
7932802
07e4274
4067c9a
7d04b16
0ed78d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,12 +31,22 @@ | |
from apache_beam.pipeline import PipelineVisitor | ||
from apache_beam.runners.dask.overrides import dask_overrides | ||
from apache_beam.runners.dask.transform_evaluator import TRANSLATIONS | ||
from apache_beam.runners.dask.transform_evaluator import DaskBagWindowedIterator | ||
from apache_beam.runners.dask.transform_evaluator import Flatten | ||
from apache_beam.runners.dask.transform_evaluator import NoOp | ||
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner | ||
from apache_beam.runners.runner import PipelineResult | ||
from apache_beam.runners.runner import PipelineState | ||
from apache_beam.transforms.sideinputs import SideInputMap | ||
from apache_beam.utils.interactive_utils import is_in_notebook | ||
|
||
try: | ||
# Added to try to prevent threading related issues, see | ||
# https://github.com/pytest-dev/pytest/issues/3216#issuecomment-1502451456 | ||
import dask.distributed as ddist | ||
except ImportError: | ||
distributed = {} | ||
|
||
|
||
class DaskOptions(PipelineOptions): | ||
@staticmethod | ||
|
@@ -86,10 +96,9 @@ def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None: | |
|
||
@dataclasses.dataclass | ||
class DaskRunnerResult(PipelineResult): | ||
from dask import distributed | ||
|
||
client: distributed.Client | ||
futures: t.Sequence[distributed.Future] | ||
client: ddist.Client | ||
futures: t.Sequence[ddist.Future] | ||
|
||
def __post_init__(self): | ||
super().__init__(PipelineState.RUNNING) | ||
|
@@ -99,8 +108,16 @@ def wait_until_finish(self, duration=None) -> str: | |
if duration is not None: | ||
# Convert milliseconds to seconds | ||
duration /= 1000 | ||
self.client.wait_for_workers(timeout=duration) | ||
self.client.gather(self.futures, errors='raise') | ||
for _ in ddist.as_completed(self.futures, | ||
timeout=duration, | ||
with_results=True): | ||
# without gathering results, worker errors are not raised on the client: | ||
# https://distributed.dask.org/en/stable/resilience.html#user-code-failures | ||
# so we want to gather results to raise errors client-side, but we do | ||
# not actually need to use the results here, so we just pass. to gather, | ||
# we use the iterative `as_completed(..., with_results=True)`, instead | ||
# of aggregate `client.gather`, to minimize memory footprint of results. | ||
pass | ||
self._state = PipelineState.DONE | ||
except: # pylint: disable=broad-except | ||
self._state = PipelineState.FAILED | ||
|
@@ -133,6 +150,7 @@ def visit_transform(self, transform_node: AppliedPTransform) -> None: | |
op_class = TRANSLATIONS.get(transform_node.transform.__class__, NoOp) | ||
op = op_class(transform_node) | ||
|
||
op_kws = {"input_bag": None, "side_inputs": None} | ||
inputs = list(transform_node.inputs) | ||
if inputs: | ||
bag_inputs = [] | ||
|
@@ -144,13 +162,28 @@ def visit_transform(self, transform_node: AppliedPTransform) -> None: | |
if prev_op in self.bags: | ||
bag_inputs.append(self.bags[prev_op]) | ||
|
||
if len(bag_inputs) == 1: | ||
self.bags[transform_node] = op.apply(bag_inputs[0]) | ||
# Input to `Flatten` could be of length 1, e.g. a single-element | ||
# tuple: `(pcoll, ) | beam.Flatten()`. If so, we still pass it as | ||
# an iterable, because `Flatten.apply` always takes an iterable. | ||
if len(bag_inputs) == 1 and not isinstance(op, Flatten): | ||
op_kws["input_bag"] = bag_inputs[0] | ||
else: | ||
self.bags[transform_node] = op.apply(bag_inputs) | ||
op_kws["input_bag"] = bag_inputs | ||
|
||
side_inputs = list(transform_node.side_inputs) | ||
if side_inputs: | ||
bag_side_inputs = [] | ||
for si in side_inputs: | ||
si_asbag = self.bags.get(si.pvalue.producer) | ||
bag_side_inputs.append( | ||
SideInputMap( | ||
type(si), | ||
si._view_options(), | ||
DaskBagWindowedIterator(si_asbag, si._window_mapping_fn))) | ||
|
||
op_kws["side_inputs"] = bag_side_inputs | ||
|
||
else: | ||
self.bags[transform_node] = op.apply(None) | ||
self.bags[transform_node] = op.apply(**op_kws) | ||
|
||
return DaskBagVisitor() | ||
|
||
|
@@ -159,6 +192,8 @@ def is_fnapi_compatible(): | |
return False | ||
|
||
def run_pipeline(self, pipeline, options): | ||
import dask | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to nest the import here when you're already trying to import There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The try..except is a recent addition (to deal with a test-time-only threading issue). Nested imports are what I've had for a long time. That's the main reason for why things are the way they are. Nested imports are there so that users don't have to depend on dask to load the runner. |
||
|
||
# TODO(alxr): Create interactive notebook support. | ||
if is_in_notebook(): | ||
raise NotImplementedError('interactive support will come later!') | ||
|
@@ -177,6 +212,6 @@ def run_pipeline(self, pipeline, options): | |
|
||
dask_visitor = self.to_dask_bag_visitor() | ||
pipeline.visit(dask_visitor) | ||
|
||
futures = client.compute(list(dask_visitor.bags.values())) | ||
opt_graph = dask.optimize(*list(dask_visitor.bags.values())) | ||
futures = client.compute(opt_graph) | ||
return DaskRunnerResult(client, futures) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could use help getting this to run on windows.