Skip to content

Commit

Permalink
Revert "[SPARK-47777][PYTHON][SS][TESTS] Add spark connect test for p…
Browse files Browse the repository at this point in the history
…ython streaming data source"

This reverts commit 3d2b7fe.
  • Loading branch information
HyukjinKwon committed May 6, 2024
1 parent 8b22517 commit 4e69857
Showing 3 changed files with 5 additions and 41 deletions.
1 change: 0 additions & 1 deletion dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
@@ -1047,7 +1047,6 @@ def __hash__(self):
"pyspark.sql.tests.connect.test_parity_arrow_grouped_map",
"pyspark.sql.tests.connect.test_parity_arrow_cogrouped_map",
"pyspark.sql.tests.connect.test_parity_python_datasource",
"pyspark.sql.tests.connect.test_parity_python_streaming_datasource",
"pyspark.sql.tests.connect.test_utils",
"pyspark.sql.tests.connect.client.test_artifact",
"pyspark.sql.tests.connect.client.test_client",

This file was deleted.

6 changes: 5 additions & 1 deletion python/pyspark/sql/tests/test_python_streaming_datasource.py
Original file line number Diff line number Diff line change
@@ -141,11 +141,15 @@ def test_stream_reader(self):
self.spark.dataSource.register(self._get_test_data_source())
df = self.spark.readStream.format("TestDataSource").load()

current_batch_id = -1

def check_batch(df, batch_id):
nonlocal current_batch_id
current_batch_id = batch_id
assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 1)])

q = df.writeStream.foreachBatch(check_batch).start()
while len(q.recentProgress) < 10:
while current_batch_id < 10:
time.sleep(0.2)
q.stop()
q.awaitTermination

0 comments on commit 4e69857

Please sign in to comment.