Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yaml] Normalize JdbcIO #28971

Merged
merged 3 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
* An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
* reading from JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@AutoService(SchemaTransformProvider.class)
public class JdbcReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
Expand Down Expand Up @@ -80,6 +83,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
dsConfig = dsConfig.withConnectionInitSqls(initialSql);
}

String driverJars = config.getDriverJars();
if (driverJars != null) {
dsConfig = dsConfig.withDriverJars(config.getDriverJars());
}

return dsConfig;
}

Expand Down Expand Up @@ -152,6 +160,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
@Nullable
public abstract Boolean getOutputParallelization();

@Nullable
public abstract String getDriverJars();

public void validate() throws IllegalArgumentException {
if (Strings.isNullOrEmpty(getDriverClassName())) {
throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
Expand Down Expand Up @@ -199,6 +210,8 @@ public abstract static class Builder {

public abstract Builder setOutputParallelization(Boolean value);

public abstract Builder setDriverJars(String value);

public abstract JdbcReadSchemaTransformConfiguration build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
Expand All @@ -40,6 +43,9 @@
* An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
* writing to JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@AutoService(SchemaTransformProvider.class)
public class JdbcWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<
Expand Down Expand Up @@ -82,6 +88,11 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
dsConfig = dsConfig.withConnectionInitSqls(initialSql);
}

String driverJars = config.getDriverJars();
if (driverJars != null) {
dsConfig = dsConfig.withDriverJars(config.getDriverJars());
}

return dsConfig;
}

Expand All @@ -92,7 +103,9 @@ protected String writeStatement(Schema schema) {
} else {
StringBuilder statement = new StringBuilder("INSERT INTO ");
statement.append(config.getLocation());
statement.append(" VALUES(");
statement.append(" (");
statement.append(String.join(", ", schema.getFieldNames()));
statement.append(") VALUES(");
for (int i = 0; i < schema.getFieldCount() - 1; i++) {
statement.append("?, ");
}
Expand All @@ -101,19 +114,30 @@ protected String writeStatement(Schema schema) {
}
}

// DoFn with an empty process method: it consumes every input element and emits
// nothing. Applied after the JDBC write to produce an empty Row-typed
// PCollection ("post_write") that downstream transforms can sequence against.
private static class NoOutputDoFn<T> extends DoFn<T, Row> {
@ProcessElement
public void process(ProcessContext c) {}
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
JdbcIO.Write<Row> writeRows =
JdbcIO.WriteVoid<Row> writeRows =
JdbcIO.<Row>write()
.withDataSourceConfiguration(dataSourceConfiguration())
.withStatement(writeStatement(input.get("input").getSchema()))
.withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter());
.withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter())
.withResults();
Boolean autosharding = config.getAutosharding();
if (autosharding != null && autosharding) {
writeRows = writeRows.withAutoSharding();
}
input.get("input").apply(writeRows);
return PCollectionRowTuple.empty(input.getPipeline());
PCollection<Row> postWrite =
input
.get("input")
.apply(writeRows)
.apply("post-write", ParDo.of(new NoOutputDoFn<>()))
.setRowSchema(Schema.of());
return PCollectionRowTuple.of("post_write", postWrite);
}
}

Expand Down Expand Up @@ -164,6 +188,9 @@ public abstract static class JdbcWriteSchemaTransformConfiguration implements Se
@Nullable
public abstract Boolean getAutosharding();

@Nullable
public abstract String getDriverJars();

public void validate() throws IllegalArgumentException {
if (Strings.isNullOrEmpty(getDriverClassName())) {
throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
Expand Down Expand Up @@ -211,6 +238,8 @@ public abstract Builder setConnectionInitSql(

public abstract Builder setAutosharding(Boolean value);

public abstract Builder setDriverJars(String value);

public abstract JdbcWriteSchemaTransformConfiguration build();
}
}
Expand Down
33 changes: 33 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,36 @@
'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1'
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'

- type: renaming
transforms:
'ReadFromJdbc': 'ReadFromJdbc'
'WriteToJdbc': 'WriteToJdbc'
config:
mappings:
'ReadFromJdbc':
driver_class_name: 'driverClassName'
url: 'jdbcUrl'
username: 'username'
password: 'password'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should seriously think about whether there's a better way to store these than in plain text...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how the transform works today, but I can add a FR for KMS/Secret Manager support. Unless you think it's not worth offering without that support?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to a FR at least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also tie in with the ability to templatize things too.

table: 'location'
query: 'readQuery'
driver_jars: 'driverJars'
connection_properties: 'connectionProperties'
connection_init_sql: 'connectionInitSql'
'WriteToJdbc':
driver_class_name: 'driverClassName'
url: 'jdbcUrl'
username: 'username'
password: 'password'
table: 'location'
driver_jars: 'driverJars'
connection_properties: 'connectionProperties'
connection_init_sql: 'connectionInitSql'
underlying_provider:
type: beamJar
transforms:
'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
Loading