From 28753577f79575bebae760af0ff52f170d8a1292 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Wed, 1 Mar 2023 13:55:57 -0800 Subject: [PATCH] Streaming executor fixes #4 (#32882) --- python/ray/data/tests/test_bulk_executor.py | 1 + python/ray/data/tests/test_dataset.py | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_bulk_executor.py b/python/ray/data/tests/test_bulk_executor.py index 4af3643185f4..c3d293978d52 100644 --- a/python/ray/data/tests/test_bulk_executor.py +++ b/python/ray/data/tests/test_bulk_executor.py @@ -125,6 +125,7 @@ def test_actor_strategy(ray_start_10_cpus_shared): def test_new_execution_backend_invocation(ray_start_10_cpus_shared): DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = False # Read-only: will use legacy executor for now. ds = ray.data.range(10) assert ds.take_all() == list(range(10)) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 14da782339e0..fb61d2f520aa 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -382,12 +382,13 @@ def test_zip_different_num_blocks_split_smallest( [{str(i): i for i in range(num_cols1, num_cols1 + num_cols2)}] * n, parallelism=num_blocks2, ) - ds = ds1.zip(ds2) + ds = ds1.zip(ds2).fully_executed() + num_blocks = ds._plan._snapshot_blocks.executed_num_blocks() assert ds.take() == [{str(i): i for i in range(num_cols1 + num_cols2)}] * n if should_invert: - assert ds.num_blocks() == num_blocks2 + assert num_blocks == num_blocks2 else: - assert ds.num_blocks() == num_blocks1 + assert num_blocks == num_blocks1 def test_zip_pandas(ray_start_regular_shared):