Skip to content

Commit

Permalink
Allow SqlPTransform to run in streaming mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
Becket Qin committed Feb 8, 2024
1 parent 0145270 commit cccac3a
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ class FlinkSQLTransformTranslator<InputT, OutputT>
public void translateNode(
PTransform<PCollection<InputT>, PCollection<OutputT>> transform,
FlinkStreamingTranslationContext context) {
if (context.isStreaming()) {
throw new IllegalStateException(
"The current job is a streaming job. Flink SQL transform only support batch jobs.");
}
MultiOutputSqlTransformWithInput<InputT, OutputT> sqlTransform =
(MultiOutputSqlTransformWithInput) transform;
StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,17 @@ public void testOnlySetAdditionalInputForMultiOutputSqlTransform() {
pipeline.run(getPipelineOptions());
}

@Test(expected = IllegalStateException.class)
public void testApplySqlToStreamingJobThrowException() {
@Test
public void testStreamingMode() throws IOException {
Pipeline pipeline = Pipeline.create();
SingleOutputSqlTransform<FlinkSqlTestUtils.Order> transform =
SqlTransform.of(FlinkSqlTestUtils.Order.class)
.withDDL(ORDERS_DDL)
.withQuery("SELECT orderNumber, product, amount, price, buyer, orderTime FROM Orders");
pipeline.apply(transform);

PCollection<FlinkSqlTestUtils.Order> outputs = pipeline.apply(transform);
verifyRecords(outputs, "Orders", FlinkSqlTestUtils.Order.class);
FlinkPipelineOptions options = getPipelineOptions();
options.setStreaming(true);
pipeline.run(options);
Expand Down

0 comments on commit cccac3a

Please sign in to comment.