From cccac3a7300e4fc75452001be006cfc7182310fc Mon Sep 17 00:00:00 2001 From: Becket Qin Date: Thu, 8 Feb 2024 10:47:36 -0800 Subject: [PATCH] Allow SqlPtransform to run in streaming mode. --- .../flink/transform/sql/FlinkSQLTransformTranslator.java | 4 ---- .../beam/runners/flink/transform/sql/SqlTransformTest.java | 6 ++++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java index 407789f1bceb..3f5c3307be9b 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java @@ -58,10 +58,6 @@ class FlinkSQLTransformTranslator public void translateNode( PTransform, PCollection> transform, FlinkStreamingTranslationContext context) { - if (context.isStreaming()) { - throw new IllegalStateException( - "The current job is a streaming job. Flink SQL transform only support batch jobs."); - } MultiOutputSqlTransformWithInput sqlTransform = (MultiOutputSqlTransformWithInput) transform; StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment()); diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java index 3074bf673cc6..32b00a72cc71 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java @@ -278,8 +278,8 @@ public void testOnlySetAdditionalInputForMultiOutputSqlTransform() { pipeline.run(getPipelineOptions()); } - @Test(expected = IllegalStateException.class) - public void testApplySqlToStreamingJobThrowException() { + @Test + public void testStreamingMode() throws IOException { Pipeline pipeline = Pipeline.create(); SingleOutputSqlTransform transform = SqlTransform.of(FlinkSqlTestUtils.Order.class) @@ -287,6 +287,8 @@ public void testApplySqlToStreamingJobThrowException() { .withQuery("SELECT orderNumber, product, amount, price, buyer, orderTime FROM Orders"); pipeline.apply(transform); + PCollection outputs = pipeline.apply(transform); + verifyRecords(outputs, "Orders", FlinkSqlTestUtils.Order.class); FlinkPipelineOptions options = getPipelineOptions(); options.setStreaming(true); pipeline.run(options);