From 68a06205daab79ab99b9fce51e3aa3f77853e9bc Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Thu, 12 Oct 2023 11:00:57 -0400 Subject: [PATCH 1/3] [yaml] Normalize JdbcIO Signed-off-by: Jeffrey Kinard --- .../jdbc/JdbcReadSchemaTransformProvider.java | 12 ++++++ .../JdbcWriteSchemaTransformProvider.java | 41 +++++++++++++++---- sdks/python/apache_beam/yaml/standard_io.yaml | 35 ++++++++++++++++ sdks/python/apache_beam/yaml/yaml_mapping.py | 2 + 4 files changed, 82 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index dbf12f35024a..c0510e178626 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -38,6 +38,9 @@ * An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for * reading from JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) @AutoService(SchemaTransformProvider.class) public class JdbcReadSchemaTransformProvider extends TypedSchemaTransformProvider< @@ -80,6 +83,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { dsConfig = dsConfig.withConnectionInitSqls(initialSql); } + String driverJars = config.getDriverJars(); + if (driverJars != null) { + dsConfig = dsConfig.withDriverJars(config.getDriverJars()); + } + return dsConfig; } @@ -152,6 +160,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser @Nullable public abstract Boolean getOutputParallelization(); + @Nullable + public abstract String getDriverJars(); + public void validate() throws IllegalArgumentException { if (Strings.isNullOrEmpty(getDriverClassName())) { throw new IllegalArgumentException("JDBC Driver class name cannot be blank."); @@ -198,6 +209,7 @@ public abstract static class Builder { public abstract Builder setFetchSize(Short value); public abstract Builder setOutputParallelization(Boolean value); + public abstract Builder setDriverJars(String value); public abstract JdbcReadSchemaTransformConfiguration build(); } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index cb9d79631ca8..8dcfa28b0866 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -19,16 +19,15 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -36,10 +35,18 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import javax.annotation.Nullable; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + /** * An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for * writing to a JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) @AutoService(SchemaTransformProvider.class) public class JdbcWriteSchemaTransformProvider extends TypedSchemaTransformProvider< @@ -82,6 +89,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { dsConfig = dsConfig.withConnectionInitSqls(initialSql); } + String driverJars = config.getDriverJars(); + if (driverJars != null) { + dsConfig = dsConfig.withDriverJars(config.getDriverJars()); + } + return dsConfig; } @@ -101,19 +113,27 @@ protected String writeStatement(Schema schema) { } } + private static class NoOutputDoFn extends DoFn { + @ProcessElement + public void process(ProcessContext c) {} + } + @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - JdbcIO.Write writeRows = + JdbcIO.WriteVoid writeRows = JdbcIO.write() .withDataSourceConfiguration(dataSourceConfiguration()) .withStatement(writeStatement(input.get("input").getSchema())) - .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter()); + .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter()) + .withResults(); Boolean autosharding = config.getAutosharding(); if (autosharding != null && autosharding) { writeRows = writeRows.withAutoSharding(); } - input.get("input").apply(writeRows); - return PCollectionRowTuple.empty(input.getPipeline()); + PCollection postWrite = input.get("input").apply(writeRows) + .apply("post-write", ParDo.of(new NoOutputDoFn<>())) + .setRowSchema(Schema.of()); + return PCollectionRowTuple.of("post_write", postWrite); } } @@ -164,6 +184,9 @@ public abstract static class JdbcWriteSchemaTransformConfiguration implements Se @Nullable public abstract Boolean getAutosharding(); + @Nullable + public abstract String getDriverJars(); + public void validate() throws IllegalArgumentException { if (Strings.isNullOrEmpty(getDriverClassName())) { throw new IllegalArgumentException("JDBC Driver class name cannot be blank."); @@ -211,6 +234,8 @@ public abstract Builder setConnectionInitSql( public abstract Builder setAutosharding(Boolean value); + public abstract Builder setDriverJars(String value); + public abstract JdbcWriteSchemaTransformConfiguration build(); } } diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index c4748483b04b..5045ecd2b988 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -112,3 +112,38 @@ 'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1' config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' + +- type: renaming + transforms: + 'ReadFromJdbc': 'ReadFromJdbc' + 'WriteToJdbc': 'WriteToJdbc' + config: + mappings: + 'ReadFromJdbc': + driver_class_name: 'driverClassName' + jdbc_url: 'jdbcUrl' + username: 'username' + password: 'password' + table_name: 'location' + read_query: 'readQuery' + num_rows: 'fetchSize' + driver_jars: 'driverJars' + connection_properties: 'connectionProperties' + connection_init_sql: 'connectionInitSql' + 'WriteToJdbc': + driver_class_name: 'driverClassName' + jdbc_url: 'jdbcUrl' + username: 'username' + password: 'password' + table_name: 'location' + write_statement: 'writeStatement' + driver_jars: 'driverJars' + connection_properties: 'connectionProperties' + connection_init_sql: 'connectionInitSql' + underlying_provider: + type: beamJar + transforms: + 'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1' + 'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1' + config: + gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 889f7f1ee309..2c7272443c16 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -114,6 +114,8 @@ def _expand_python_mapping_func( for name in original_fields if name in expression ] + [' return (' + expression + ')']) + return python_callable.PythonCallableWithSource(source) + else: source = callable From 6b21c4cba95f1181faf84c065f8e229909438669 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 20 Oct 2023 12:51:52 -0400 Subject: [PATCH 2/3] address initial comments Signed-off-by: Jeffrey Kinard --- .../jdbc/JdbcReadSchemaTransformProvider.java | 3 ++- .../JdbcWriteSchemaTransformProvider.java | 20 ++++++++++--------- sdks/python/apache_beam/yaml/standard_io.yaml | 12 +++++------ sdks/python/apache_beam/yaml/yaml_mapping.py | 2 -- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index c0510e178626..3b504b1a90d4 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -39,7 +39,7 @@ * reading from JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) @AutoService(SchemaTransformProvider.class) public class JdbcReadSchemaTransformProvider @@ -209,6 +209,7 @@ public abstract static class Builder { public abstract Builder setFetchSize(Short value); public abstract Builder setOutputParallelization(Boolean value); + public abstract Builder setDriverJars(String value); public abstract JdbcReadSchemaTransformConfiguration build(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 8dcfa28b0866..cfe7264b4d7d 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -19,6 +19,10 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -35,17 +39,12 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; -import javax.annotation.Nullable; -import java.io.Serializable; -import java.util.Collections; -import java.util.List; - /** * An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for * writing to a JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) @AutoService(SchemaTransformProvider.class) public class JdbcWriteSchemaTransformProvider @@ -130,9 +129,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { if (autosharding != null && autosharding) { writeRows = writeRows.withAutoSharding(); } - PCollection postWrite = input.get("input").apply(writeRows) - .apply("post-write", ParDo.of(new NoOutputDoFn<>())) - .setRowSchema(Schema.of()); + PCollection postWrite = + input + .get("input") + .apply(writeRows) + .apply("post-write", ParDo.of(new NoOutputDoFn<>())) + .setRowSchema(Schema.of()); return PCollectionRowTuple.of("post_write", postWrite); } } diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 5045ecd2b988..d796ecc28bc7 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -121,22 +121,20 @@ mappings: 'ReadFromJdbc': driver_class_name: 'driverClassName' - jdbc_url: 'jdbcUrl' + url: 'jdbcUrl' username: 'username' password: 'password' - table_name: 'location' - read_query: 'readQuery' - num_rows: 'fetchSize' + table: 'location' + query: 'readQuery' driver_jars: 'driverJars' connection_properties: 'connectionProperties' connection_init_sql: 'connectionInitSql' 'WriteToJdbc': driver_class_name: 'driverClassName' - jdbc_url: 'jdbcUrl' + url: 'jdbcUrl' username: 'username' password: 'password' - table_name: 'location' - write_statement: 'writeStatement' + table: 'location' driver_jars: 'driverJars' connection_properties: 'connectionProperties' connection_init_sql: 'connectionInitSql' diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 2c7272443c16..889f7f1ee309 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -114,8 +114,6 @@ def _expand_python_mapping_func( for name in original_fields if name in expression ] + [' return (' + expression + ')']) - return python_callable.PythonCallableWithSource(source) - else: source = callable From d5ac56c4eabec328182b6be29780404568479f2a Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 25 Oct 2023 13:54:52 -0400 Subject: [PATCH 3/3] add fieldnames to jdbc insert Signed-off-by: Jeffrey Kinard --- .../beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index cfe7264b4d7d..e9f67969626e 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -103,7 +103,9 @@ protected String writeStatement(Schema schema) { } else { StringBuilder statement = new StringBuilder("INSERT INTO "); statement.append(config.getLocation()); - statement.append(" VALUES("); + statement.append(" ("); + statement.append(String.join(", ", schema.getFieldNames())); + statement.append(") VALUES("); for (int i = 0; i < schema.getFieldCount() - 1; i++) { statement.append("?, "); }