diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java
index a0d6da8e0f8d..a3ea89c6570f 100644
--- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java
@@ -39,10 +39,6 @@
* {@link PCollection PCollections}, either {@link SingleOutputSqlTransformWithInput} or {@link
* MultiOutputSqlTransformWithInput} is the way to go.
*
- * NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- *
- *
*
*
* Specify the input tables
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java
index 3e8605451e5a..6ea9c863db55 100644
--- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java
@@ -44,10 +44,6 @@
* PCollection PCollections}. The {@link MultiOutputSqlTransformWithInput} differs from {@link
* SingleOutputSqlTransformWithInput} that it supports multiple output {@link PCollection}s.
*
- * NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- *
- *
*
*
* Specify the input tables
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java
index e87dce145684..507ec2532ed7 100644
--- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java
@@ -37,10 +37,6 @@
* want to apply a SQL Transform to existing {@link PCollection PCollections}, either {@link
* SingleOutputSqlTransformWithInput} or {@link MultiOutputSqlTransformWithInput} is the way to go.
*
- * NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- *
- *
*
*
* Specify the input tables
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java
index 274b28bb8e9b..9d8b5dd980b8 100644
--- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java
@@ -37,10 +37,6 @@
* PCollection PCollections}. The {@link SingleOutputSqlTransformWithInput} differs from the {@link
* MultiOutputSqlTransformWithInput} that it only supports one output {@link PCollection}.
*
- * NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- *
- *
*
*
* Specify the input tables
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java
index f2cc87ed28af..55e715856274 100644
--- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java
@@ -114,6 +114,16 @@ public static SingleOutputSqlTransform of(Class outputClass) {
return new SingleOutputSqlTransform<>(of(Integer.class, outputClass));
}
+ /**
+ * Creates a {@link StatementOnlySqlTransform} which takes a full script of SQL statements and
+ * executes them. The statements must include at least one INSERT INTO
+ * statement.
+ *
+ * @return A {@link StatementOnlySqlTransform}.
+ */
+ public static StatementOnlySqlTransform ofStatements() {
+ return new StatementOnlySqlTransform();
+ }
+
// --------------------- setters ----------------------------
/**
* Use DDL to define Tables. The DDL string can contain multiple {@code CREATE TABLE} / {@code
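For reference, a minimal usage sketch of the new ofStatements() entry point (mirroring the tests added further below; the catalog, database, table, and view names are illustrative):

    SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");
    Pipeline pipeline = Pipeline.create();
    StatementOnlySqlTransform transform =
        SqlTransform.ofStatements()
            .withCatalog("MyCatalog", catalog)
            .addStatement("CREATE TEMPORARY VIEW MyView AS SELECT * FROM MyCatalog.TestDatabase.Orders;")
            .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyView;");
    pipeline.apply(transform);
    pipeline.run(options);  // options: FlinkPipelineOptions configured with the FlinkRunner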
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java
new file mode 100644
index 000000000000..b965657b00fe
--- /dev/null
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.transform.sql;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import java.util.StringJoiner;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.runners.flink.FlinkCustomTransformTranslatorRegistrar;
+import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
+import org.apache.beam.runners.flink.FlinkStreamingTranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.flink.table.api.bridge.java.StreamStatementSet;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
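+/**
+ * Translates a {@link StatementOnlySqlTransform} by running its DDL statements against a Flink
+ * {@link StreamTableEnvironment} and attaching its INSERT INTO statements to the stream execution
+ * environment via a {@link StreamStatementSet}.
+ */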
+public class StatementOnlyFlinkSqlTransformTranslator
+    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PBegin, PDone>> {
+ private static final Logger LOG = LoggerFactory.getLogger(StatementOnlyFlinkSqlTransformTranslator.class);
+ public static final String FLINK_STATEMENT_ONLY_SQL_URN = "beam:transform:flink:sql-statements-only:v1";
+ private static final String INSERT_INTO = "INSERT INTO";
+
+ @Override
+ public void translateNode(PTransform<PBegin, PDone> transform, FlinkStreamingTranslationContext context) {
+ StatementOnlySqlTransform sqlTransform = (StatementOnlySqlTransform) transform;
+
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment());
+ sqlTransform.getCatalogs().forEach(tEnv::registerCatalog);
+ StringJoiner combinedStatements = new StringJoiner("\n\n");
+ StreamStatementSet ss = tEnv.createStatementSet();
+ for (String statement : sqlTransform.getStatements()) {
+ combinedStatements.add(statement);
+ try {
+ if (isInsertIntoStatement(statement)) {
+ ss.addInsertSql(statement);
+ } else {
+ // Not an insert into statement. Treat it as a DDL.
+ tEnv.executeSql(statement);
+ }
+ } catch (Exception e) {
+ LOG.error("Encountered exception when executing statement: {}", statement);
+ throw new RuntimeException(e);
+ }
+ }
+ // Now attach everything to StreamExecutionEnv.
+ ss.attachAsDataStream();
+ LOG.info("Executing SQL statements:\n {}", combinedStatements);
+ }
+
+ /** Registers Flink SQL PTransform URN. */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ @SuppressWarnings("rawtypes")
+ public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(
+ StatementOnlySqlTransform.class,
+ PTransformTranslation.TransformPayloadTranslator.NotSerializable.forUrn(
+ FLINK_STATEMENT_ONLY_SQL_URN))
+ .build();
+ }
+ }
+
+ /** Registers Flink SQL PTransform to the Flink runner. */
+ @AutoService(FlinkCustomTransformTranslatorRegistrar.class)
+ public static class FlinkSqlTransformsRegistrar
+ implements FlinkCustomTransformTranslatorRegistrar {
+ @Override
+ public Map<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator<?>>
+ getTransformTranslators() {
+ return ImmutableMap
+ .<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator<?>>builder()
+ .put(FLINK_STATEMENT_ONLY_SQL_URN, new StatementOnlyFlinkSqlTransformTranslator())
+ .build();
+ }
+ }
+
+ // ------------------- private helper methods -----------------
+ private static boolean isInsertIntoStatement(String statement) {
+   return statement.trim().toUpperCase().startsWith(INSERT_INTO);
+ }
+}
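For context, the translator above effectively reduces the statement list to the following Flink Table API pattern (a simplified sketch, not the translator code itself; tEnv is the StreamTableEnvironment created from the pipeline's execution environment, and the table and view names are illustrative):

    StreamStatementSet set = tEnv.createStatementSet();
    // Non-INSERT statements (DDL such as CREATE TABLE / CREATE VIEW / CREATE CATALOG) run eagerly.
    tEnv.executeSql("CREATE TEMPORARY VIEW MyView AS SELECT * FROM Orders;");
    // INSERT INTO statements are collected and deferred.
    set.addInsertSql("INSERT INTO OrdersSink SELECT * FROM MyView;");
    // Finally, the collected inserts are attached to the underlying StreamExecutionEnvironment.
    set.attachAsDataStream();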
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java
new file mode 100644
index 000000000000..6fbcf7b39ed2
--- /dev/null
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.transform.sql;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.util.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Beam {@link PTransform} that only takes complete SQL statements, at least one of which must be an INSERT INTO statement.
+ */
+public class StatementOnlySqlTransform extends PTransform<PBegin, PDone> {
+ private static final Logger LOG = LoggerFactory.getLogger(StatementOnlySqlTransform.class);
+
+ private final List<String> statements;
+ private final Map<String, SerializableCatalog> catalogs;
+
+ StatementOnlySqlTransform() {
+ this.statements = new ArrayList<>();
+ this.catalogs = new HashMap<>();
+ }
+
+ @Override
+ public PDone expand(PBegin input) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User statements:");
+ for (String statement : statements) {
+ LOG.debug("{}\n", statement);
+ }
+ }
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ public void validate(@Nullable PipelineOptions options) {
+ Preconditions.checkArgument(!statements.isEmpty(), "No statement is provided for the StatementOnlySqlTransform.");
+ }
+
+ /**
+ * Adds a Flink SQL statement to this transform. Note that at least one statement must be an
+ * INSERT INTO statement. Otherwise, an exception will be thrown.
+ *
+ * @param statement the statement to be added.
+ * @return this {@link StatementOnlySqlTransform}.
+ */
+ public StatementOnlySqlTransform addStatement(String statement) {
+ statements.add(cleanUp(statement));
+ return this;
+ }
+
+ /**
+ * Adds a new {@link Catalog} to be used by the SQL statements.
+ *
+ * @param name the name of the catalog.
+ * @param catalog the catalog to use.
+ * @return this {@link StatementOnlySqlTransform} itself.
+ */
+ public StatementOnlySqlTransform withCatalog(String name, SerializableCatalog catalog) {
+ catalogs.put(name, catalog);
+ return this;
+ }
+
+ // --------------------- package private getters -----------------
+ List<String> getStatements() {
+ return Collections.unmodifiableList(statements);
+ }
+
+ Map<String, SerializableCatalog> getCatalogs() {
+ return Collections.unmodifiableMap(catalogs);
+ }
+
+ // --------------------- private helpers ------------------------
+ private static String cleanUp(String s) {
+   String trimmed = s.trim();
+   return trimmed.endsWith(";") ? trimmed : trimmed + ";";
+ }
+}
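As exercised by the tests added below, a catalog can also be supplied purely through DDL instead of withCatalog() (a sketch; the factory identifier here comes from the test-only TestingInMemCatalogFactory, and the catalog and table names are illustrative):

    SqlTransform.ofStatements()
        .addStatement(
            String.format(
                "CREATE CATALOG MyCatalog WITH ( 'type' = '%s' )",
                TestingInMemCatalogFactory.IDENTIFIER))
        .addStatement(
            "INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyCatalog.TestDatabase.Orders;");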
diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java
index 46b0aa2a35c8..9c60fc0903ec 100644
--- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java
+++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java
@@ -28,6 +28,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.translation.types.TypeInformationCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ValueProvider;
@@ -75,6 +77,22 @@ public class FlinkSqlTestUtils {
+ ")",
getFilePath("Orders"));
+ public static final String ORDERS_VERIFYING_SINK_2_DDL =
+ String.format("CREATE TABLE OrdersVerify2 (\n"
+ + " orderNumber BIGINT,\n"
+ + " product String,\n"
+ + " amount INT,\n"
+ + " price DECIMAL(8, 2),\n"
+ + " buyer STRING,\n"
+ + " orderTime TIMESTAMP(3)\n"
+ + ") WITH (\n"
+ + " 'connector' = '%s',\n"
+ + " '%s' = '%s'\n"
+ + ")",
+ VerifyingTableSinkFactory.IDENTIFIER,
+ VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(),
+ getFilePath("Orders"));
+
public static final String PRODUCTS_DDL =
String.format(
"CREATE TABLE Products (\n"
@@ -131,17 +149,7 @@ public boolean isAccessible() {
public static CatalogTable getOrdersCatalogTable() {
// Create schema
- ResolvedSchema resolvedSchema =
- new ResolvedSchema(
- Arrays.asList(
- Column.physical("orderNumber", DataTypes.BIGINT()),
- Column.physical("product", DataTypes.STRING()),
- Column.physical("amount", DataTypes.INT()),
- Column.physical("price", DataTypes.DOUBLE()),
- Column.physical("buyer", DataTypes.STRING()),
- Column.physical("orderTime", DataTypes.TIMESTAMP(3))),
- Collections.emptyList(),
- UniqueConstraint.primaryKey("UniqueProductName", Collections.singletonList("name")));
+ ResolvedSchema resolvedSchema = getOrdersSchema();
Map<String, String> connectorOptions = new HashMap<>();
connectorOptions.put(FactoryUtil.CONNECTOR.key(), "filesystem");
@@ -152,13 +160,53 @@ public static CatalogTable getOrdersCatalogTable() {
final CatalogTable origin =
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "Products Catalog Table",
+ "Orders Catalog Table",
Collections.emptyList(),
connectorOptions);
return new ResolvedCatalogTable(origin, resolvedSchema);
}
+ public static CatalogTable getOrdersVerifyCatalogTable() {
+ // Create schema
+ ResolvedSchema resolvedSchema = getOrdersSchema();
+
+ Map<String, String> connectorOptions = new HashMap<>();
+ connectorOptions.put(FactoryUtil.CONNECTOR.key(), VerifyingTableSinkFactory.IDENTIFIER);
+ connectorOptions.put(VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), getFilePath("Orders"));
+ connectorOptions.put(VerifyingTableSinkFactory.HAS_HEADER_OPTION.key(), "true");
+
+ final CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "Orders Catalog Verify Table",
+ Collections.emptyList(),
+ connectorOptions);
+
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ public static FlinkPipelineOptions getPipelineOptions() {
+ FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
+ options.setRunner(FlinkRunner.class);
+ options.setUseDataStreamForBatch(true);
+ options.setParallelism(2);
+ return options;
+ }
+
+ private static ResolvedSchema getOrdersSchema() {
+ return new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("orderNumber", DataTypes.BIGINT()),
+ Column.physical("product", DataTypes.STRING()),
+ Column.physical("amount", DataTypes.INT()),
+ Column.physical("price", DataTypes.DOUBLE()),
+ Column.physical("buyer", DataTypes.STRING()),
+ Column.physical("orderTime", DataTypes.TIMESTAMP(3))),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("UniqueProductName", Collections.singletonList("name")));
+ }
+
// -------------------- public classes ----------------------
public static class ToUpperCaseAndReplaceString extends ScalarFunction {
diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java
index 32b00a72cc71..9c3ff695cb6e 100644
--- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java
+++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java
@@ -21,6 +21,7 @@
import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.ORDER;
import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.ORDERS_DDL;
import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.PRODUCTS_DDL;
+import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getPipelineOptions;
import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getSingletonOrderPCollection;
import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getSingletonPCollection;
import static org.junit.Assert.fail;
@@ -31,7 +32,6 @@
import java.util.Set;
import java.util.function.Consumer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.testing.PAssert;
@@ -383,14 +383,6 @@ private void testUserDefinedFunction(
// ---------------- private helper methods -----------------------
- private static FlinkPipelineOptions getPipelineOptions() {
- FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
- options.setRunner(FlinkRunner.class);
- options.setUseDataStreamForBatch(true);
- options.setParallelism(2);
- return options;
- }
-
private static <T> void verifyRecords(PCollection<T> pCollection, String file, Class<T> clazz)
throws IOException {
PAssert.that(pCollection).containsInAnyOrder(getExpectedRecords(file, clazz));
diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java
new file mode 100644
index 000000000000..4710a0779c17
--- /dev/null
+++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.transform.sql;
+
+import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.ORDERS_VERIFYING_SINK_2_DDL;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link StatementOnlySqlTransform}.
+ */
+public class StatementOnlySqlTransformTest {
+ @Test
+ public void testBatch() {
+ testBasics(false);
+ }
+
+ @Test
+ public void testStreaming() {
+ testBasics(true);
+ }
+
+ @Test
+ public void testCreateCatalogViaDDL() {
+ Pipeline pipeline = Pipeline.create();
+ StatementOnlySqlTransform transform = SqlTransform.ofStatements()
+ .addStatement(
+ String.format(
+ "CREATE CATALOG MyCatalog with ( 'type' = '%s' )",
+ TestingInMemCatalogFactory.IDENTIFIER))
+ .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyCatalog.TestDatabase.Orders;");
+
+ pipeline.apply(transform);
+ pipeline.run(getPipelineOptions());
+ }
+
+ @Test
+ public void testDDLAndMultipleInsertStatements() {
+ SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");
+
+ Pipeline pipeline = Pipeline.create();
+ StatementOnlySqlTransform transform = SqlTransform.ofStatements()
+ .withCatalog("MyCatalog", catalog)
+ .addStatement(ORDERS_VERIFYING_SINK_2_DDL)
+ .addStatement("CREATE TEMPORARY VIEW MyView AS SELECT * FROM MyCatalog.TestDatabase.Orders;")
+ .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyView;")
+ .addStatement("INSERT INTO OrdersVerify2 SELECT * FROM MyView;");
+
+ pipeline.apply(transform);
+ pipeline.run(getPipelineOptions());
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void testEmptyStatements() {
+ StatementOnlySqlTransform transform = SqlTransform.ofStatements();
+ Pipeline pipeline = Pipeline.create();
+ pipeline.apply(transform);
+ pipeline.run(getPipelineOptions());
+ }
+
+ // ----------------
+ private void testBasics(boolean isStreaming) {
+ SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");
+
+ Pipeline pipeline = Pipeline.create();
+ StatementOnlySqlTransform transform = SqlTransform.ofStatements()
+ .withCatalog("MyCatalog", catalog)
+ .addStatement("CREATE TEMPORARY VIEW MyView AS SELECT * FROM MyCatalog.TestDatabase.Orders;")
+ .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyView;");
+
+ pipeline.apply(transform);
+ FlinkPipelineOptions options = getPipelineOptions();
+ options.setStreaming(isStreaming);
+ pipeline.run(options);
+ }
+
+ private FlinkPipelineOptions getPipelineOptions() {
+ FlinkPipelineOptions options = FlinkSqlTestUtils.getPipelineOptions();
+ options.setParallelism(1);
+ return options;
+ }
+}
diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java
index fd6515cdda96..3d28b67677a0 100644
--- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java
+++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink.transform.sql;
import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getOrdersCatalogTable;
+import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getOrdersVerifyCatalogTable;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -34,7 +35,14 @@ public class TestingInMemCatalogFactory implements CatalogFactory {
public static TestingInMemCatalog getCatalog(String name) {
TestingInMemCatalog catalog = new TestingInMemCatalog(name, DEFAULT_DATABASE_NAME);
try {
- catalog.createTable(new ObjectPath("TestDatabase", "Orders"), getOrdersCatalogTable(), true);
+ catalog.createTable(
+ new ObjectPath("TestDatabase", "Orders"),
+ getOrdersCatalogTable(),
+ true);
+ catalog.createTable(
+ new ObjectPath("TestDatabase", "OrdersVerify"),
+ getOrdersVerifyCatalogTable(),
+ true);
} catch (TableAlreadyExistException | DatabaseNotExistException e) {
throw new RuntimeException(e);
}
diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/VerifyingTableSinkFactory.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/VerifyingTableSinkFactory.java
new file mode 100644
index 000000000000..f313174e5734
--- /dev/null
+++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/VerifyingTableSinkFactory.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.transform.sql;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+
+/**
+ * A table factory class that verifies the output of {@link StatementOnlySqlTransform}.
+ * It reads the expected output from a specified table file, and then compares it
+ * with the records sent to this sink.
+ */
+public class VerifyingTableSinkFactory implements DynamicTableSinkFactory, Serializable {
+ public static final String IDENTIFIER = "verifyingSink";
+ public static final ConfigOption<String> EXPECTED_RESULT_FILE_PATH_OPTION =
+ ConfigOptions
+ .key("expected.result.file.path")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<Boolean> HAS_HEADER_OPTION =
+ ConfigOptions
+ .key("has.header")
+ .booleanType()
+ .defaultValue(true);
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context tableSinkFactoryContext) {
+ // The sink must be serializable, so extract the required information from the non-serializable
+ // context objects into serializable values here.
+ Configuration config = Configuration.fromMap(tableSinkFactoryContext.getCatalogTable().getOptions());
+ ResolvedSchema schema = tableSinkFactoryContext.getCatalogTable().getResolvedSchema();
+ List<LogicalType> logicalTypes = schema
+ .getColumns()
+ .stream()
+ .map(c -> c.getDataType().getLogicalType())
+ .collect(Collectors.toList());
+
+ final String expectedResultFilePath = config.get(EXPECTED_RESULT_FILE_PATH_OPTION);
+ final boolean hasHeader = config.get(HAS_HEADER_OPTION);
+ final List<String> columnsToVerify = schema.getColumnNames();
+
+ return new SerializableDynamicTableSink() {
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.all();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(
+ org.apache.flink.table.connector.sink.DynamicTableSink.Context tableSinkContext) {
+ return new SerializableSinkV2Provider() {
+ @Override
+ public Sink<RowData> createSink() {
+ return new SerializableSink() {
+ @Override
+ public SinkWriter<RowData> createWriter(InitContext sinkInitContext) throws IOException {
+ return new VerifyingSinkWriter(logicalTypes, expectedResultFilePath, hasHeader, columnsToVerify);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return this;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "VerifyingDynamicTableSink";
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.singleton(EXPECTED_RESULT_FILE_PATH_OPTION);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.singleton(HAS_HEADER_OPTION);
+ }
+
+ private static class VerifyingSinkWriter implements SinkWriter<RowData>, Serializable {
+
+ private final List<LogicalType> logicalTypes;
+ private final ExpectedResultTable expectedRows;
+ private final int[] indexToVerify;
+ private int rowNumber;
+ private VerifyingSinkWriter(
+ List<LogicalType> logicalTypes,
+ String expectedRowsFile,
+ boolean hasHeaderLine,
+ List<String> columnsToVerify) throws FileNotFoundException {
+ this.logicalTypes = logicalTypes;
+ this.rowNumber = 0;
+ this.expectedRows = readRowsFromFile(expectedRowsFile, hasHeaderLine);
+
+ indexToVerify = new int[columnsToVerify.size()];
+ for (int i = 0; i < columnsToVerify.size(); i++) {
+ indexToVerify[i] = Preconditions.checkStateNotNull(expectedRows.headerNameToIndex.get(columnsToVerify.get(i)));
+ }
+ }
+
+ @Override
+ public void write(RowData element, org.apache.flink.api.connector.sink2.SinkWriter.Context sinkContext)
+ throws IOException, InterruptedException {
+ List<String> actual = Splitter.on(',').splitToList(
+ element.toString().replace("+I(", "").replace(")", ""));
+ for (int index : indexToVerify) {
+ String expected = expectedRows.rows.get(rowNumber).get(index);
+ if (!compareValue(logicalTypes.get(index), expected, element, index)) {
+ throw new RuntimeException(String.format(
+ "The following Rows are unequal:\n expected: %s\n actual: %s\n", expected, actual));
+ }
+ }
+ rowNumber++;
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException, InterruptedException {
+ // No op.
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (rowNumber != expectedRows.rows.size()) {
+ throw new RuntimeException(String.format("Expect %d rows, but only received %d rows",
+ expectedRows.rows.size(), rowNumber));
+ }
+ }
+ }
+
+ private static boolean compareValue(LogicalType logicalType, String expected, RowData row, int index) {
+ if (logicalType instanceof TimestampType) {
+ LocalDateTime actual = row.getTimestamp(index, 3).toLocalDateTime();
+ return LocalDateTime.parse(expected.replace(" ", "T")).equals(actual);
+ } else if (logicalType instanceof BigIntType) {
+ long actual = row.getLong(index);
+ return Long.parseLong(expected) == actual;
+ } else if (logicalType instanceof IntType) {
+ int actual = row.getInt(index);
+ return Integer.parseInt(expected) == actual;
+ } else if (logicalType instanceof DecimalType) {
+ BigDecimal expectedBigDecimal = BigDecimal.valueOf(Double.parseDouble(expected));
+ return row.getDecimal(index, 8, 2).toBigDecimal().compareTo(expectedBigDecimal) == 0;
+ } else if (logicalType instanceof DoubleType) {
+ return row.getDouble(index) == Double.parseDouble(expected);
+ } else if (logicalType instanceof VarCharType) {
+ return row.getString(index).toString().equals(expected);
+ } else {
+ throw new RuntimeException("Unrecognized logical type." + logicalType);
+ }
+ }
+
+ private static ExpectedResultTable readRowsFromFile(String path, boolean hasHeaderLine) throws FileNotFoundException {
+ ExpectedResultTable expected = new ExpectedResultTable();
+ File file = new File(path);
+ try (Scanner scanner = new Scanner(file, Charsets.UTF_8.name())) {
+ boolean readHeaderLine = hasHeaderLine;
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ List<String> columns = Splitter.on(',').splitToList(line);
+ if (readHeaderLine) {
+ for (int i = 0; i < columns.size(); i++) {
+ expected.headerNameToIndex.put(columns.get(i).replace("#", ""), i);
+ }
+ readHeaderLine = false;
+ } else {
+ expected.rows.add(columns);
+ }
+ }
+ return expected;
+ }
+ }
+
+ interface SerializableDynamicTableSink extends DynamicTableSink, Serializable {
+
+ }
+
+ interface SerializableSinkV2Provider extends SinkV2Provider, Serializable {
+
+ }
+
+ interface SerializableSink extends Sink<RowData>, Serializable {
+
+ }
+
+ private static class ExpectedResultTable {
+ Map<String, Integer> headerNameToIndex = new HashMap<>();
+ List<List<String>> rows = new ArrayList<>();
+ }
+}
diff --git a/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 1055b429e958..0a4103d5f6eb 100644
--- a/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -17,3 +17,4 @@
#
org.apache.beam.runners.flink.transform.sql.TestingInMemCatalogFactory
+org.apache.beam.runners.flink.transform.sql.VerifyingTableSinkFactory