diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 15362bca9aec..21ae001b2d12 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.18' + project.version = '2.45.19' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index e9cbd0a7aa2f..8d286f625d5f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.18 -sdk_version=2.45.18 +version=2.45.19 +sdk_version=2.45.19 javaVersion=1.8 diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java index 407789f1bceb..3f5c3307be9b 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSQLTransformTranslator.java @@ -58,10 +58,6 @@ class FlinkSQLTransformTranslator public void translateNode( PTransform, PCollection> transform, FlinkStreamingTranslationContext context) { - if (context.isStreaming()) { - throw new IllegalStateException( - "The current job is a streaming job. Flink SQL transform only support batch jobs."); - } MultiOutputSqlTransformWithInput sqlTransform = (MultiOutputSqlTransformWithInput) transform; StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment()); diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java index a0d6da8e0f8d..a3ea89c6570f 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransform.java @@ -39,10 +39,6 @@ * {@link PCollection PCollections}, either {@link SingleOutputSqlTransformWithInput} or {@link * MultiOutputSqlTransformWithInput} is the way to go. * - *

- * <p>NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- * <p>
- *
 * <p>
 *
 * Specify the input tables
 *
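With the batch-only check removed from FlinkSQLTransformTranslator above, these SQL transforms can now be translated in streaming mode as well; the same NOTE is dropped from the other three transform javadocs below. A minimal sketch of the newly allowed usage, modeled on the updated testStreamingMode test later in this diff (ORDERS_DDL, FlinkSqlTestUtils, and getPipelineOptions are the existing test fixtures, not new API):

    // Sketch only: mirrors the streaming-mode test added in SqlTransformTest.
    Pipeline pipeline = Pipeline.create();
    SingleOutputSqlTransform<FlinkSqlTestUtils.Order> transform =
        SqlTransform.of(FlinkSqlTestUtils.Order.class)
            .withDDL(ORDERS_DDL) // table definition taken from the existing batch test
            .withQuery("SELECT orderNumber, product, amount, price, buyer, orderTime FROM Orders");
    pipeline.apply(transform);

    FlinkPipelineOptions options = FlinkSqlTestUtils.getPipelineOptions();
    // Before this change, translation failed here with an IllegalStateException for streaming jobs.
    options.setStreaming(true);
    pipeline.run(options);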
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java index 3e8605451e5a..6ea9c863db55 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/MultiOutputSqlTransformWithInput.java @@ -44,10 +44,6 @@ * PCollection PCollections}. The {@link MultiOutputSqlTransformWithInput} differs from {@link * SingleOutputSqlTransformWithInput} that it supports multiple output {@link PCollection}s. * - *

- * <p>NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- * <p>
- *
 * <p>
 *
 * Specify the input tables
 *
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java index e87dce145684..507ec2532ed7 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransform.java @@ -37,10 +37,6 @@ * want to apply a SQL Transform to existing {@link PCollection PCollections}, either {@link * SingleOutputSqlTransformWithInput} or {@link MultiOutputSqlTransformWithInput} is the way to go. * - *

- * <p>NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- * <p>
- *
 * <p>
 *
 * Specify the input tables
 *
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java index 274b28bb8e9b..9d8b5dd980b8 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SingleOutputSqlTransformWithInput.java @@ -37,10 +37,6 @@ * PCollection PCollections}. The {@link SingleOutputSqlTransformWithInput} differs from the {@link * MultiOutputSqlTransformWithInput} that it only supports one output {@link PCollection}. * - *

- * <p>NOTE: This {@link PTransform} only works with Flink Runner in batch mode.
- *
- * <p>
- *
 * <p>
 *
 * Specify the input tables
 *
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java index f2cc87ed28af..55e715856274 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/SqlTransform.java @@ -114,6 +114,16 @@ public static SingleOutputSqlTransform of(Class outputClass) { return new SingleOutputSqlTransform<>(of(Integer.class, outputClass)); } + /** + * Create a {@link StatementOnlySqlTransform} which takes a full script of SQL statements and + * execute them. The statements must have at least one INSERT INTO statement. + * + * @return A {@link StatementOnlySqlTransform}. + */ + public static StatementOnlySqlTransform ofStatements() { + return new StatementOnlySqlTransform(); + } + // --------------------- setters ---------------------------- /** * Use DDL to define Tables. The DDL string can contain multiple {@code CREATE TABLE} / {@code diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java new file mode 100644 index 000000000000..b965657b00fe --- /dev/null +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.flink.transform.sql; + +import com.google.auto.service.AutoService; +import java.util.Map; +import java.util.StringJoiner; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.runners.flink.FlinkCustomTransformTranslatorRegistrar; +import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator; +import org.apache.beam.runners.flink.FlinkStreamingTranslationContext; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.flink.table.api.bridge.java.StreamStatementSet; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StatementOnlyFlinkSqlTransformTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator>{ + private static final Logger LOG = LoggerFactory.getLogger(StatementOnlyFlinkSqlTransformTranslator.class); + public static final String FLINK_STATEMENT_ONLY_SQL_URN = "beam:transform:flink:sql-statements-only:v1"; + private static final String INSERT_INTO = "INSERT INTO"; + + @Override + public void translateNode(PTransform transform, FlinkStreamingTranslationContext context) { + StatementOnlySqlTransform sqlTransform = (StatementOnlySqlTransform) transform; + + StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment()); + sqlTransform.getCatalogs().forEach(tEnv::registerCatalog); + StringJoiner combinedStatements = new StringJoiner("\n\n"); + StreamStatementSet ss = tEnv.createStatementSet(); + for (String statement : sqlTransform.getStatements()) { + combinedStatements.add(statement); + try { + if (isInsertIntoStatement(statement)) { + ss.addInsertSql(statement); + } else { + // Not an insert into statement. Treat it as a DDL. + tEnv.executeSql(statement); + } + } catch (Exception e) { + LOG.error("Encountered exception when executing statement: {}", statement); + throw new RuntimeException(e); + } + } + // Now attach everything to StreamExecutionEnv. + ss.attachAsDataStream(); + LOG.info("Executing SQL statements:\n {}", combinedStatements); + } + + /** Registers Flink SQL PTransform URN. */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + @SuppressWarnings("rawtypes") + public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put( + StatementOnlySqlTransform.class, + PTransformTranslation.TransformPayloadTranslator.NotSerializable.forUrn( + FLINK_STATEMENT_ONLY_SQL_URN)) + .build(); + } + } + + /** Registers Flink SQL PTransform to the Flink runner. 
+   */
+  @AutoService(FlinkCustomTransformTranslatorRegistrar.class)
+  public static class FlinkSqlTransformsRegistrar
+      implements FlinkCustomTransformTranslatorRegistrar {
+    @Override
+    public Map<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator<?>>
+        getTransformTranslators() {
+      return ImmutableMap
+          .<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator<?>>builder()
+          .put(FLINK_STATEMENT_ONLY_SQL_URN, new StatementOnlyFlinkSqlTransformTranslator())
+          .build();
+    }
+  }
+
+  // ------------------- private helper methods -----------------
+  private static boolean isInsertIntoStatement(String statement) {
+    // Trim and use startsWith directly so short or indented statements do not throw
+    // StringIndexOutOfBoundsException.
+    return statement.trim().toUpperCase().startsWith(INSERT_INTO);
+  }
+}
diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java
new file mode 100644
index 000000000000..6fbcf7b39ed2
--- /dev/null
+++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.transform.sql;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.util.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Beam {@link PTransform} that takes only complete SQL statements, including at least one
+ * INSERT INTO statement.
+ */
+public class StatementOnlySqlTransform extends PTransform<PBegin, PDone> {
+  private static final Logger LOG = LoggerFactory.getLogger(StatementOnlySqlTransform.class);
+
+  private final List<String> statements;
+  private final Map<String, SerializableCatalog> catalogs;
+
+  StatementOnlySqlTransform() {
+    this.statements = new ArrayList<>();
+    this.catalogs = new HashMap<>();
+  }
+
+  @Override
+  public PDone expand(PBegin input) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User statements:");
+      for (String statement : statements) {
+        LOG.debug("{}\n", statement);
+      }
+    }
+    return PDone.in(input.getPipeline());
+  }
+
+  @Override
+  public void validate(@Nullable PipelineOptions options) {
+    Preconditions.checkArgument(
+        !statements.isEmpty(), "No statement is provided for the StatementOnlySqlTransform.");
+  }
+
+  /**
+   * Add any Flink SQL statement to this transform. Note that there must be an INSERT INTO
+   * statement. Otherwise, an exception will be thrown.
+   *
+   * @param statement the statement to be added.
+   * @return this {@link StatementOnlySqlTransform}.
+ */ + public StatementOnlySqlTransform addStatement(String statement) { + statements.add(cleanUp(statement)); + return this; + } + + /** + * Define add a new {@link Catalog} to be used by the SQL query. + * + * @param name the name of the catalog. + * @param catalog the catalog to use. + * @return this {@link MultiOutputSqlTransformWithInput} itself. + */ + public StatementOnlySqlTransform withCatalog(String name, SerializableCatalog catalog) { + catalogs.put(name, catalog); + return this; + } + + // --------------------- package private getters ----------------- + List getStatements() { + return Collections.unmodifiableList(statements); + } + + Map getCatalogs() { + return Collections.unmodifiableMap(catalogs); + } + + // --------------------- private helpers ------------------------ + private static String cleanUp(String s) { + return s.trim().endsWith(";") ? s : s + ";"; + } +} diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java index 46b0aa2a35c8..9c60fc0903ec 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java @@ -28,6 +28,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.runners.flink.translation.types.TypeInformationCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.ValueProvider; @@ -75,6 +77,22 @@ public class FlinkSqlTestUtils { + ")", getFilePath("Orders")); + public static final String ORDERS_VERIFYING_SINK_2_DDL = + String.format("CREATE TABLE OrdersVerify2 (\n" + + " orderNumber BIGINT,\n" + + " product String,\n" + + " amount INT,\n" + + " price DECIMAL(8, 2),\n" + + " buyer STRING,\n" + + " orderTime TIMESTAMP(3)\n" + + ") WITH (\n" + + " 'connector' = '%s',\n" + + " '%s' = '%s'\n" + + ")", + VerifyingTableSinkFactory.IDENTIFIER, + VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), + getFilePath("Orders")); + public static final String PRODUCTS_DDL = String.format( "CREATE TABLE Products (\n" @@ -131,17 +149,7 @@ public boolean isAccessible() { public static CatalogTable getOrdersCatalogTable() { // Create schema - ResolvedSchema resolvedSchema = - new ResolvedSchema( - Arrays.asList( - Column.physical("orderNumber", DataTypes.BIGINT()), - Column.physical("product", DataTypes.STRING()), - Column.physical("amount", DataTypes.INT()), - Column.physical("price", DataTypes.DOUBLE()), - Column.physical("buyer", DataTypes.STRING()), - Column.physical("orderTime", DataTypes.TIMESTAMP(3))), - Collections.emptyList(), - UniqueConstraint.primaryKey("UniqueProductName", Collections.singletonList("name"))); + ResolvedSchema resolvedSchema = getOrdersSchema(); Map connectorOptions = new HashMap<>(); connectorOptions.put(FactoryUtil.CONNECTOR.key(), "filesystem"); @@ -152,13 +160,53 @@ public static CatalogTable getOrdersCatalogTable() { final CatalogTable origin = CatalogTable.of( Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), - "Products Catalog Table", + "Orders Catalog Table", Collections.emptyList(), connectorOptions); return new ResolvedCatalogTable(origin, resolvedSchema); } + public static CatalogTable getOrdersVerifyCatalogTable() { + 
// Create schema + ResolvedSchema resolvedSchema = getOrdersSchema(); + + Map connectorOptions = new HashMap<>(); + connectorOptions.put(FactoryUtil.CONNECTOR.key(), VerifyingTableSinkFactory.IDENTIFIER); + connectorOptions.put(VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), getFilePath("Orders")); + connectorOptions.put(VerifyingTableSinkFactory.HAS_HEADER_OPTION.key(), "true"); + + final CatalogTable origin = + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), + "Orders Catalog Verify Table", + Collections.emptyList(), + connectorOptions); + + return new ResolvedCatalogTable(origin, resolvedSchema); + } + + public static FlinkPipelineOptions getPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setRunner(FlinkRunner.class); + options.setUseDataStreamForBatch(true); + options.setParallelism(2); + return options; + } + + private static ResolvedSchema getOrdersSchema() { + return new ResolvedSchema( + Arrays.asList( + Column.physical("orderNumber", DataTypes.BIGINT()), + Column.physical("product", DataTypes.STRING()), + Column.physical("amount", DataTypes.INT()), + Column.physical("price", DataTypes.DOUBLE()), + Column.physical("buyer", DataTypes.STRING()), + Column.physical("orderTime", DataTypes.TIMESTAMP(3))), + Collections.emptyList(), + UniqueConstraint.primaryKey("UniqueProductName", Collections.singletonList("name"))); + } + // -------------------- public classes ---------------------- public static class ToUpperCaseAndReplaceString extends ScalarFunction { diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java index 3074bf673cc6..9c3ff695cb6e 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/SqlTransformTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.ORDER; import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.ORDERS_DDL; import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.PRODUCTS_DDL; +import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getPipelineOptions; import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getSingletonOrderPCollection; import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getSingletonPCollection; import static org.junit.Assert.fail; @@ -31,7 +32,6 @@ import java.util.Set; import java.util.function.Consumer; import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.testing.PAssert; @@ -278,8 +278,8 @@ public void testOnlySetAdditionalInputForMultiOutputSqlTransform() { pipeline.run(getPipelineOptions()); } - @Test(expected = IllegalStateException.class) - public void testApplySqlToStreamingJobThrowException() { + @Test + public void testStreamingMode() throws IOException { Pipeline pipeline = Pipeline.create(); SingleOutputSqlTransform transform = SqlTransform.of(FlinkSqlTestUtils.Order.class) @@ -287,6 +287,8 @@ public void testApplySqlToStreamingJobThrowException() { .withQuery("SELECT orderNumber, product, amount, price, buyer, 
orderTime FROM Orders"); pipeline.apply(transform); + PCollection outputs = pipeline.apply(transform); + verifyRecords(outputs, "Orders", FlinkSqlTestUtils.Order.class); FlinkPipelineOptions options = getPipelineOptions(); options.setStreaming(true); pipeline.run(options); @@ -381,14 +383,6 @@ private void testUserDefinedFunction( // ---------------- private helper methods ----------------------- - private static FlinkPipelineOptions getPipelineOptions() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); - options.setRunner(FlinkRunner.class); - options.setUseDataStreamForBatch(true); - options.setParallelism(2); - return options; - } - private static void verifyRecords(PCollection pCollection, String file, Class clazz) throws IOException { PAssert.that(pCollection).containsInAnyOrder(getExpectedRecords(file, clazz)); diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java new file mode 100644 index 000000000000..4710a0779c17 --- /dev/null +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink.transform.sql; + +import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.ORDERS_VERIFYING_SINK_2_DDL; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.junit.Test; + + +/** + * Unit tests for {@link StatementOnlySqlTransform}. 
+ */ +public class StatementOnlySqlTransformTest { + @Test + public void testBatch() { + testBasics(false); + } + + @Test + public void testStreaming() { + testBasics(true); + } + + @Test + public void testCreateCatalogViaDDL() { + Pipeline pipeline = Pipeline.create(); + StatementOnlySqlTransform transform = SqlTransform.ofStatements() + .addStatement( + String.format( + "CREATE CATALOG MyCatalog with ( 'type' = '%s' )", + TestingInMemCatalogFactory.IDENTIFIER)) + .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyCatalog.TestDatabase.Orders;"); + + pipeline.apply(transform); + pipeline.run(getPipelineOptions()); + } + + @Test + public void testDDLAndMultipleInsertStatements() { + SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog"); + + Pipeline pipeline = Pipeline.create(); + StatementOnlySqlTransform transform = SqlTransform.ofStatements() + .withCatalog("MyCatalog", catalog) + .addStatement(ORDERS_VERIFYING_SINK_2_DDL) + .addStatement("CREATE TEMPORARY VIEW MyView AS SELECT * FROM MyCatalog.TestDatabase.Orders;") + .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyView;") + .addStatement("INSERT INTO OrdersVerify2 SELECT * FROM MyView;"); + + pipeline.apply(transform); + pipeline.run(getPipelineOptions()); + } + + @Test (expected = IllegalArgumentException.class) + public void testEmptyStatements() { + StatementOnlySqlTransform transform = SqlTransform.ofStatements(); + Pipeline pipeline = Pipeline.create(); + pipeline.apply(transform); + pipeline.run(getPipelineOptions()); + } + + // ---------------- + private void testBasics(boolean isStreaming) { + SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog"); + + Pipeline pipeline = Pipeline.create(); + StatementOnlySqlTransform transform = SqlTransform.ofStatements() + .withCatalog("MyCatalog", catalog) + .addStatement("CREATE TEMPORARY VIEW MyView AS SELECT * FROM MyCatalog.TestDatabase.Orders;") + .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyView;"); + + pipeline.apply(transform); + FlinkPipelineOptions options = getPipelineOptions(); + options.setStreaming(isStreaming); + pipeline.run(options); + } + + private FlinkPipelineOptions getPipelineOptions() { + FlinkPipelineOptions options = FlinkSqlTestUtils.getPipelineOptions(); + options.setParallelism(1); + return options; + } +} diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java index fd6515cdda96..3d28b67677a0 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink.transform.sql; import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getOrdersCatalogTable; +import static org.apache.beam.runners.flink.transform.sql.FlinkSqlTestUtils.getOrdersVerifyCatalogTable; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; @@ -34,7 +35,14 @@ public class TestingInMemCatalogFactory implements CatalogFactory { public static TestingInMemCatalog getCatalog(String name) { TestingInMemCatalog catalog = new TestingInMemCatalog(name, DEFAULT_DATABASE_NAME); try { - 
catalog.createTable(new ObjectPath("TestDatabase", "Orders"), getOrdersCatalogTable(), true); + catalog.createTable( + new ObjectPath("TestDatabase", "Orders"), + getOrdersCatalogTable(), + true); + catalog.createTable( + new ObjectPath("TestDatabase", "OrdersVerify"), + getOrdersVerifyCatalogTable(), + true); } catch (TableAlreadyExistException | DatabaseNotExistException e) { throw new RuntimeException(e); } diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/VerifyingTableSinkFactory.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/VerifyingTableSinkFactory.java new file mode 100644 index 000000000000..f313174e5734 --- /dev/null +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/VerifyingTableSinkFactory.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink.transform.sql; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + + +/** + * A table factory class that verifies the output of {@link StatementOnlySqlTransform}. 
+ * It reads from a specified table file as the expected output, and then compare it + * with the records sent to this Sink. + */ +public class VerifyingTableSinkFactory implements DynamicTableSinkFactory, Serializable { + public static final String IDENTIFIER = "verifyingSink"; + public static final ConfigOption EXPECTED_RESULT_FILE_PATH_OPTION = + ConfigOptions + .key("expected.result.file.path") + .stringType() + .noDefaultValue(); + public static final ConfigOption HAS_HEADER_OPTION = + ConfigOptions + .key("has.header") + .booleanType() + .defaultValue(true); + + @Override + public DynamicTableSink createDynamicTableSink(Context tableSinkFactoryContext) { + // We need to make sure the sink is serializable. So we need to get rid of the non-serializable + // objects here and extract the information into serializable objects. + Configuration config = Configuration.fromMap(tableSinkFactoryContext.getCatalogTable().getOptions()); + ResolvedSchema schema = tableSinkFactoryContext.getCatalogTable().getResolvedSchema(); + List logicalTypes = schema + .getColumns() + .stream() + .map(c -> c.getDataType().getLogicalType()) + .collect(Collectors.toList()); + + final String expectedResultFilePath = config.get(EXPECTED_RESULT_FILE_PATH_OPTION); + final boolean hasHeader = config.get(HAS_HEADER_OPTION); + final List columnsToVerify = schema.getColumnNames(); + + return new SerializableDynamicTableSink() { + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.all(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider( + org.apache.flink.table.connector.sink.DynamicTableSink.Context tableSinkContext) { + return new SerializableSinkV2Provider() { + @Override + public Sink createSink() { + return new SerializableSink() { + @Override + public SinkWriter createWriter(InitContext sinkInitContext) throws IOException { + return new VerifyingSinkWriter(logicalTypes, expectedResultFilePath, hasHeader, columnsToVerify); + } + }; + } + }; + } + + @Override + public DynamicTableSink copy() { + return this; + } + + @Override + public String asSummaryString() { + return "VerifyingDynamicTableSink"; + } + }; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.singleton(EXPECTED_RESULT_FILE_PATH_OPTION); + } + + @Override + public Set> optionalOptions() { + return Collections.singleton(HAS_HEADER_OPTION); + } + + private static class VerifyingSinkWriter implements SinkWriter, Serializable { + + private final List logicalTypes; + private final ExpectedResultTable expectedRows; + private final int[] indexToVerify; + private int rowNumber; + private VerifyingSinkWriter( + List logicalTypes, + String expectedRowsFile, + boolean hasHeaderLine, + List columnsToVerify) throws FileNotFoundException { + this.logicalTypes = logicalTypes; + this.rowNumber = 0; + this.expectedRows = readRowsFromFile(expectedRowsFile, hasHeaderLine); + + indexToVerify = new int[columnsToVerify.size()]; + for (int i = 0; i < columnsToVerify.size(); i++) { + indexToVerify[i] = Preconditions.checkStateNotNull(expectedRows.headerNameToIndex.get(columnsToVerify.get(i))); + } + } + + @Override + public void write(RowData element, org.apache.flink.api.connector.sink2.SinkWriter.Context sinkContext) + throws IOException, InterruptedException { + List actual = Splitter.on(',').splitToList( + element.toString().replace("+I(", "").replace(")", "")); + for (int index : indexToVerify) { + 
String expected = expectedRows.rows.get(rowNumber).get(index); + if (!compareValue(logicalTypes.get(index), expected, element, index)) { + throw new RuntimeException(String.format( + "The following Rows are unequal:\n expected: %s\n actual: %s\n", expected, actual)); + } + } + rowNumber++; + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // No op. + } + + @Override + public void close() throws Exception { + if (rowNumber != expectedRows.rows.size()) { + throw new RuntimeException(String.format("Expect %d rows, but only received %d rows", + expectedRows.rows.size(), rowNumber)); + } + } + } + + private static boolean compareValue(LogicalType logicalType, String expected, RowData row, int index) { + if (logicalType instanceof TimestampType) { + LocalDateTime actual = row.getTimestamp(index, 3).toLocalDateTime(); + return LocalDateTime.parse(expected.replace(" ", "T")).equals(actual); + } else if (logicalType instanceof BigIntType) { + long actual = row.getLong(index); + return Long.parseLong(expected) == actual; + } else if (logicalType instanceof IntType) { + int actual = row.getInt(index); + return Integer.parseInt(expected) == actual; + } else if (logicalType instanceof DecimalType) { + BigDecimal expectedBigDecimal = BigDecimal.valueOf(Double.parseDouble(expected)); + return row.getDecimal(index, 8, 2).toBigDecimal().compareTo(expectedBigDecimal) == 0; + } else if (logicalType instanceof DoubleType) { + return row.getDouble(index) == Double.parseDouble(expected); + } else if (logicalType instanceof VarCharType) { + return row.getString(index).toString().equals(expected); + } else { + throw new RuntimeException("Unrecognized logical type." + logicalType); + } + } + + private static ExpectedResultTable readRowsFromFile(String path, boolean hasHeaderLine) throws FileNotFoundException { + ExpectedResultTable expected = new ExpectedResultTable(); + File file = new File(path); + try (Scanner scanner = new Scanner(file, Charsets.UTF_8.name())) { + boolean readHeaderLine = hasHeaderLine; + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + List columns = Splitter.on(',').splitToList(line); + if (readHeaderLine) { + for (int i = 0; i < columns.size(); i++) { + expected.headerNameToIndex.put(columns.get(i).replace("#", ""), i); + } + readHeaderLine = false; + } else { + expected.rows.add(columns); + } + } + return expected; + } + } + + interface SerializableDynamicTableSink extends DynamicTableSink, Serializable { + + } + + interface SerializableSinkV2Provider extends SinkV2Provider, Serializable { + + } + + interface SerializableSink extends Sink, Serializable { + + } + + private static class ExpectedResultTable { + Map headerNameToIndex = new HashMap<>(); + List> rows = new ArrayList<>(); + } +} diff --git a/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 1055b429e958..0a4103d5f6eb 100644 --- a/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/runners/flink/1.15/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -17,3 +17,4 @@ # org.apache.beam.runners.flink.transform.sql.TestingInMemCatalogFactory +org.apache.beam.runners.flink.transform.sql.VerifyingTableSinkFactory
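A minimal end-to-end sketch of the statement-only API introduced in this change, modeled on StatementOnlySqlTransformTest above (the catalog, database, and table names are the test fixtures, not production values):

    // Sketch only: run a script of SQL statements on the Flink runner via the new API.
    SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");

    Pipeline pipeline = Pipeline.create();
    StatementOnlySqlTransform transform =
        SqlTransform.ofStatements()
            .withCatalog("MyCatalog", catalog)
            // Non-INSERT statements are treated as DDL when the transform is translated.
            .addStatement(
                "CREATE TEMPORARY VIEW MyView AS SELECT * FROM MyCatalog.TestDatabase.Orders;")
            // At least one INSERT INTO statement is required per the ofStatements() contract.
            .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyView;");

    pipeline.apply(transform);
    pipeline.run(FlinkSqlTestUtils.getPipelineOptions());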