diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/RowToJSONString.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/RowToJSONString.java new file mode 100644 index 0000000000..7bbc235ffd --- /dev/null +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/RowToJSONString.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.auto.blocks; + +import com.google.cloud.teleport.metadata.auto.Consumes; +import com.google.cloud.teleport.metadata.auto.Outputs; +import com.google.cloud.teleport.metadata.auto.TemplateTransform; +import com.google.cloud.teleport.v2.auto.blocks.RowToJSONString.RowToJSONStringOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.ToJson; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +public class RowToJSONString implements TemplateTransform { + public interface RowToJSONStringOptions extends PipelineOptions {} + + @Consumes(Row.class) + @Outputs(String.class) + public PCollection transform(PCollection input, RowToJSONStringOptions options) { + return input.apply("Convert Row to String", ToJson.of()); + } + + @Override + public Class getOptionsClass() { + return RowToJSONStringOptions.class; + } +} diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/options/ReadFromJdbcOptions.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/options/ReadFromJdbcOptions.java index fe895d30cd..5d3207a045 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/options/ReadFromJdbcOptions.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/auto/options/ReadFromJdbcOptions.java @@ -103,7 +103,7 @@ public interface ReadFromJdbcOptions extends PipelineOptions { void setQuery(String query); @TemplateParameter.KmsEncryptionKey( - order = 9, + order = 8, optional = true, description = "Google Cloud KMS key", helpText = @@ -112,4 +112,60 @@ public interface ReadFromJdbcOptions extends PipelineOptions { String getKMSEncryptionKey(); void setKMSEncryptionKey(String keyName); + + @TemplateParameter.Text( + order = 9, + optional = true, + groupName = "Source Parameters", + description = "The name of a column of numeric type that will be used for partitioning.", + helpText = + "If this parameter is provided (along with `table`), JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery) using ranges. Currently, only Long partition columns are supported.") + String getPartitionColumn(); + + void setPartitionColumn(String partitionColumn); + + @TemplateParameter.Text( + order = 10, + optional = true, + groupName = "Source Parameters", + description = "Name of the table in the external database.", + helpText = + "Table to read from using partitions. This parameter also accepts a subquery in parentheses.", + example = "(select id, name from Person) as subq") + String getTable(); + + void setTable(String table); + + @TemplateParameter.Integer( + order = 11, + optional = true, + groupName = "Source Parameters", + description = "The number of partitions.", + helpText = + "The number of partitions. This, along with the lower and upper bound, form partitions strides for generated WHERE clause expressions used to split the partition column evenly. When the input is less than 1, the number is set to 1.") + Integer getNumPartitions(); + + void setNumPartitions(Integer numPartitions); + + @TemplateParameter.Long( + order = 12, + optional = true, + groupName = "Source Parameters", + description = "Lower bound of partition column.", + helpText = + "Lower bound used in the partition scheme. If not provided, it is automatically inferred by Beam (for the supported types)") + Long getLowerBound(); + + void setLowerBound(Long lowerBound); + + @TemplateParameter.Long( + order = 13, + optional = true, + groupName = "Source Parameters", + description = "Upper bound of partition column", + helpText = + "Upper bound used in partition scheme. If not provided, it is automatically inferred by Beam (for the supported types)") + Long getUpperBound(); + + void setUpperBound(Long lowerBound); } diff --git a/v2/jdbc-common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/ReadFromJdbc.java b/v2/jdbc-common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/ReadFromJdbc.java index af95c828d6..ac66207356 100644 --- a/v2/jdbc-common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/ReadFromJdbc.java +++ b/v2/jdbc-common/src/main/java/com/google/cloud/teleport/v2/auto/blocks/ReadFromJdbc.java @@ -20,71 +20,51 @@ import com.google.cloud.teleport.metadata.auto.Outputs; import com.google.cloud.teleport.metadata.auto.TemplateTransform; import com.google.cloud.teleport.v2.auto.options.ReadFromJdbcOptions; -import java.sql.Clob; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.io.jdbc.SchemaUtil; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.PCollection; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.beam.sdk.values.Row; public class ReadFromJdbc implements TemplateTransform { - /* Logger for class.*/ - private static final Logger LOG = LoggerFactory.getLogger(ReadFromJdbc.class); + @Outputs(Row.class) + public PCollection read(Pipeline pipeline, ReadFromJdbcOptions options) { + JdbcIO.DataSourceConfiguration dataSourceConfig = buildDataSourceConfig(options); + Schema schema = + JdbcIO.ReadRows.inferBeamSchema( + JdbcIO.DataSourceProviderFromDataSourceConfiguration.of(dataSourceConfig).apply(null), + options.getQuery()); + + if (options.getPartitionColumn() != null && options.getTable() != null) { + // Read with Partitions + // TODO(pranavbhandari): Support readWithPartitions for other data types. + JdbcIO.ReadWithPartitions readIO = + JdbcIO.readWithPartitions() + .withDataSourceConfiguration(dataSourceConfig) + .withTable(options.getTable()) + .withPartitionColumn(options.getPartitionColumn()) + .withRowMapper(SchemaUtil.BeamRowMapper.of(schema)); + if (options.getNumPartitions() != null) { + readIO = readIO.withNumPartitions(options.getNumPartitions()); + } + if (options.getLowerBound() != null && options.getUpperBound() != null) { + readIO = + readIO.withLowerBound(options.getLowerBound()).withUpperBound(options.getUpperBound()); + } + return pipeline.apply("Read from JDBC with Partitions", readIO); + } - @Outputs(String.class) - public PCollection read(Pipeline pipeline, ReadFromJdbcOptions options) { return pipeline.apply( "readFromJdbc", - JdbcIO.read() - .withDataSourceConfiguration(buildDataSourceConfig(options)) + JdbcIO.read() + .withDataSourceConfiguration(dataSourceConfig) .withQuery(options.getQuery()) - .withCoder(StringUtf8Coder.of()) - .withRowMapper(new ResultSetToJSONString())); - } - - /** - * {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs. - */ - public static class ResultSetToJSONString implements JdbcIO.RowMapper { - - @Override - public String mapRow(ResultSet resultSet) throws Exception { - ResultSetMetaData metaData = resultSet.getMetaData(); - JSONObject json = new JSONObject(); - - for (int i = 1; i <= metaData.getColumnCount(); i++) { - Object value = resultSet.getObject(i); - - // JSONObject.put() does not support null values. The exception is JSONObject.NULL - if (value == null) { - json.put(metaData.getColumnLabel(i), JSONObject.NULL); - continue; - } - - switch (metaData.getColumnTypeName(i).toLowerCase()) { - case "clob": - Clob clobObject = resultSet.getClob(i); - if (clobObject.length() > Integer.MAX_VALUE) { - LOG.warn( - "The Clob value size {} in column {} exceeds 2GB and will be truncated.", - clobObject.length(), - metaData.getColumnLabel(i)); - } - json.put( - metaData.getColumnLabel(i), clobObject.getSubString(1, (int) clobObject.length())); - break; - default: - json.put(metaData.getColumnLabel(i), value); - } - } - return json.toString(); - } + .withCoder(RowCoder.of(schema)) + .withRowMapper(SchemaUtil.BeamRowMapper.of(schema))); } static JdbcIO.DataSourceConfiguration buildDataSourceConfig(ReadFromJdbcOptions options) { diff --git a/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToPubSubAuto.java b/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToPubSubAuto.java index f06caf30e9..9e9840ebbe 100644 --- a/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToPubSubAuto.java +++ b/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToPubSubAuto.java @@ -20,6 +20,7 @@ import com.google.cloud.teleport.metadata.auto.AutoTemplate; import com.google.cloud.teleport.metadata.auto.Preprocessor; import com.google.cloud.teleport.v2.auto.blocks.ReadFromJdbc; +import com.google.cloud.teleport.v2.auto.blocks.RowToJSONString; import com.google.cloud.teleport.v2.auto.blocks.WriteToPubSub; import org.apache.beam.sdk.options.PipelineOptions; @@ -30,7 +31,7 @@ description = "The Java Database Connectivity (JDBC) to Pub/Sub template is a batch pipeline that ingests data from " + "JDBC source and writes the resulting records to a pre-existing Pub/Sub topic as a JSON string.", - blocks = {ReadFromJdbc.class, WriteToPubSub.class}, + blocks = {ReadFromJdbc.class, RowToJSONString.class, WriteToPubSub.class}, flexContainerName = "jdbc-to-pubsub-auto", contactInformation = "https://cloud.google.com/support", // TODO: replace the original template when we are ready to do it, and remove `hidden`.