diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index dbf12f35024a..3b504b1a90d4 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -38,6 +38,9 @@
  * An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
  * reading from JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
  */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
 @AutoService(SchemaTransformProvider.class)
 public class JdbcReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<
@@ -80,6 +83,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
         dsConfig = dsConfig.withConnectionInitSqls(initialSql);
       }
 
+      String driverJars = config.getDriverJars();
+      if (driverJars != null) {
+        dsConfig = dsConfig.withDriverJars(config.getDriverJars());
+      }
+
       return dsConfig;
     }
 
@@ -152,6 +160,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
     @Nullable
     public abstract Boolean getOutputParallelization();
 
+    @Nullable
+    public abstract String getDriverJars();
+
     public void validate() throws IllegalArgumentException {
       if (Strings.isNullOrEmpty(getDriverClassName())) {
         throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
@@ -199,6 +210,8 @@ public abstract static class Builder {
 
       public abstract Builder setOutputParallelization(Boolean value);
 
+      public abstract Builder setDriverJars(String value);
+
       public abstract JdbcReadSchemaTransformConfiguration build();
     }
   }
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index cb9d79631ca8..e9f67969626e 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -29,6 +29,9 @@
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -40,6 +43,9 @@
  * An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
  * writing to a JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
  */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
 @AutoService(SchemaTransformProvider.class)
 public class JdbcWriteSchemaTransformProvider
     extends TypedSchemaTransformProvider<
@@ -82,6 +88,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
         dsConfig = dsConfig.withConnectionInitSqls(initialSql);
       }
 
+      String driverJars = config.getDriverJars();
+      if (driverJars != null) {
+        dsConfig = dsConfig.withDriverJars(config.getDriverJars());
+      }
+
       return dsConfig;
     }
 
@@ -92,7 +103,9 @@ protected String writeStatement(Schema schema) {
       } else {
         StringBuilder statement = new StringBuilder("INSERT INTO ");
         statement.append(config.getLocation());
-        statement.append(" VALUES(");
+        statement.append(" (");
+        statement.append(String.join(", ", schema.getFieldNames()));
+        statement.append(") VALUES(");
         for (int i = 0; i < schema.getFieldCount() - 1; i++) {
           statement.append("?, ");
         }
@@ -101,19 +114,30 @@ protected String writeStatement(Schema schema) {
       }
     }
 
+    private static class NoOutputDoFn<T> extends DoFn<T, Row> {
+      @ProcessElement
+      public void process(ProcessContext c) {}
+    }
+
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      JdbcIO.Write<Row> writeRows =
+      JdbcIO.WriteVoid<Row> writeRows =
           JdbcIO.<Row>write()
              .withDataSourceConfiguration(dataSourceConfiguration())
              .withStatement(writeStatement(input.get("input").getSchema()))
-             .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter());
+             .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter())
+             .withResults();
       Boolean autosharding = config.getAutosharding();
       if (autosharding != null && autosharding) {
         writeRows = writeRows.withAutoSharding();
       }
-      input.get("input").apply(writeRows);
-      return PCollectionRowTuple.empty(input.getPipeline());
+      PCollection<Row> postWrite =
+          input
+              .get("input")
+              .apply(writeRows)
+              .apply("post-write", ParDo.of(new NoOutputDoFn<>()))
+              .setRowSchema(Schema.of());
+      return PCollectionRowTuple.of("post_write", postWrite);
     }
   }
 
@@ -164,6 +188,9 @@ public abstract static class JdbcWriteSchemaTransformConfiguration implements Se
     @Nullable
     public abstract Boolean getAutosharding();
 
+    @Nullable
+    public abstract String getDriverJars();
+
     public void validate() throws IllegalArgumentException {
       if (Strings.isNullOrEmpty(getDriverClassName())) {
         throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
@@ -211,6 +238,8 @@ public abstract Builder setConnectionInitSql(
 
       public abstract Builder setAutosharding(Boolean value);
 
+      public abstract Builder setDriverJars(String value);
+
       public abstract JdbcWriteSchemaTransformConfiguration build();
     }
   }
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index c4748483b04b..d796ecc28bc7 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -112,3 +112,36 @@
     'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1'
   config:
     gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
+
+- type: renaming
+  transforms:
+    'ReadFromJdbc': 'ReadFromJdbc'
+    'WriteToJdbc': 'WriteToJdbc'
+  config:
+    mappings:
+      'ReadFromJdbc':
+        driver_class_name: 'driverClassName'
+        url: 'jdbcUrl'
+        username: 'username'
+        password: 'password'
+        table: 'location'
+        query: 'readQuery'
+        driver_jars: 'driverJars'
+        connection_properties: 'connectionProperties'
+        connection_init_sql: 'connectionInitSql'
+      'WriteToJdbc':
+        driver_class_name: 'driverClassName'
+        url: 'jdbcUrl'
+        username: 'username'
+        password: 'password'
+        table: 'location'
+        driver_jars: 'driverJars'
+        connection_properties: 'connectionProperties'
+        connection_init_sql: 'connectionInitSql'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
+        'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
+      config:
+        gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
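
Not part of the patch itself, but as a usage sketch: with the mappings added to standard_io.yaml above, ReadFromJdbc and WriteToJdbc become addressable from Beam YAML, including the new `driver_jars` option. The snippet below shows one way a pipeline might invoke them; the driver class, URL, credentials, table, query, and jar path are placeholder values, not defaults shipped with Beam.

```yaml
pipeline:
  type: chain
  transforms:
    # Read rows over JDBC; driver_jars points at a driver jar that is not
    # already on the expansion service's classpath (placeholder path).
    - type: ReadFromJdbc
      config:
        driver_class_name: 'org.postgresql.Driver'
        url: 'jdbc:postgresql://localhost:5432/exampledb'
        username: 'beam'
        password: 'secret'
        query: 'SELECT * FROM source_table'
        driver_jars: '/opt/jdbc/postgresql.jar'
    # Write the rows back out; with this patch the generated INSERT statement
    # names the columns from the row schema instead of relying on column order.
    - type: WriteToJdbc
      config:
        driver_class_name: 'org.postgresql.Driver'
        url: 'jdbc:postgresql://localhost:5432/exampledb'
        username: 'beam'
        password: 'secret'
        table: 'dest_table'
        driver_jars: '/opt/jdbc/postgresql.jar'
```

Note also that the write transform now returns a `post_write` output collection (empty rows) rather than an empty tuple.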