
Updating ReadFromJdbc block to make it compatible. #1056

Merged · 1 commit · Nov 29, 2023
RowToJSONString.java (new file)
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.auto.blocks;

import com.google.cloud.teleport.metadata.auto.Consumes;
import com.google.cloud.teleport.metadata.auto.Outputs;
import com.google.cloud.teleport.metadata.auto.TemplateTransform;
import com.google.cloud.teleport.v2.auto.blocks.RowToJSONString.RowToJSONStringOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class RowToJSONString implements TemplateTransform<RowToJSONStringOptions> {

  public interface RowToJSONStringOptions extends PipelineOptions {}

  @Consumes(Row.class)
  @Outputs(String.class)
  public PCollection<String> transform(PCollection<Row> input, RowToJSONStringOptions options) {
    return input.apply("Convert Row to String", ToJson.of());
  }

  @Override
  public Class<RowToJSONStringOptions> getOptionsClass() {
    return RowToJSONStringOptions.class;
  }
}
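Note that the new block leans on Beam's built-in ToJson transform rather than a hand-rolled row mapper: ToJson.of() serializes each schema-aware Row into a JSON string. A minimal, self-contained sketch of that behavior (the schema, values, and class name below are illustrative, not part of this PR):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ToJsonSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build();

    PCollection<Row> rows =
        pipeline.apply(
            Create.of(Row.withSchema(schema).addValues(1L, "alice").build())
                .withRowSchema(schema));

    // Each Row comes out as a JSON string, e.g. {"id":1,"name":"alice"}.
    PCollection<String> json = rows.apply(ToJson.of());

    pipeline.run().waitUntilFinish();
  }
}

ToJson requires the input PCollection to have a schema attached, which holds here because the reworked ReadFromJdbc block (below) now emits schema-aware rows.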
ReadFromJdbcOptions.java
@@ -103,7 +103,7 @@ public interface ReadFromJdbcOptions extends PipelineOptions {
  void setQuery(String query);

  @TemplateParameter.KmsEncryptionKey(
-     order = 9,
+     order = 8,
      optional = true,
      description = "Google Cloud KMS key",
      helpText =
@@ -112,4 +112,60 @@ public interface ReadFromJdbcOptions extends PipelineOptions {
  String getKMSEncryptionKey();

  void setKMSEncryptionKey(String keyName);

  @TemplateParameter.Text(
      order = 9,
      optional = true,
      groupName = "Source Parameters",
      description = "The name of a column of numeric type that will be used for partitioning.",
      helpText =
          "If this parameter is provided (along with `table`), JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery) using ranges. Currently, only Long partition columns are supported.")
  String getPartitionColumn();

  void setPartitionColumn(String partitionColumn);

  @TemplateParameter.Text(
      order = 10,
      optional = true,
      groupName = "Source Parameters",
      description = "Name of the table in the external database.",
      helpText =
          "Table to read from using partitions. This parameter also accepts a subquery in parentheses.",
      example = "(select id, name from Person) as subq")
  String getTable();

  void setTable(String table);

  @TemplateParameter.Integer(
      order = 11,
      optional = true,
      groupName = "Source Parameters",
      description = "The number of partitions.",
      helpText =
          "The number of partitions. Along with the lower and upper bounds, this value determines the partition strides for the generated WHERE clause expressions used to split the partition column evenly. When the input is less than 1, the number is set to 1.")
  Integer getNumPartitions();

  void setNumPartitions(Integer numPartitions);

  @TemplateParameter.Long(
      order = 12,
      optional = true,
      groupName = "Source Parameters",
      description = "Lower bound of the partition column.",
      helpText =
          "Lower bound used in the partition scheme. If not provided, it is automatically inferred by Beam (for the supported types).")
  Long getLowerBound();

  void setLowerBound(Long lowerBound);

  @TemplateParameter.Long(
      order = 13,
      optional = true,
      groupName = "Source Parameters",
      description = "Upper bound of the partition column.",
      helpText =
          "Upper bound used in the partition scheme. If not provided, it is automatically inferred by Beam (for the supported types).")
  Long getUpperBound();

  void setUpperBound(Long upperBound);
}
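To make the bound/partition interaction concrete: readWithPartitions splits the range between lowerBound and upperBound into roughly numPartitions even strides and issues one ranged query per stride. A rough sketch of that arithmetic follows; this is a simplification for intuition, not Beam's exact implementation, and the bounds and table below are made up:

public class StrideSketch {
  public static void main(String[] args) {
    long lowerBound = 0L, upperBound = 100L; // e.g. inferred via MIN(id) / MAX(id)
    int numPartitions = 4;

    // Ceiling-style stride so the last range still covers upperBound.
    long stride = (upperBound - lowerBound) / numPartitions + 1;
    for (long start = lowerBound; start <= upperBound; start += stride) {
      long end = Math.min(start + stride, upperBound + 1);
      // Conceptually one query per range, e.g.
      //   SELECT * FROM (select id, name from Person) as subq
      //   WHERE id >= start AND id < end
      System.out.printf("partition covers [%d, %d)%n", start, end);
    }
  }
}

With these example inputs the sketch yields four ranges covering 26, 26, 26, and 23 values. When the bounds are omitted, Beam infers them with a MIN/MAX query, as the helpText above notes.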
ReadFromJdbc.java
@@ -20,71 +20,51 @@
import com.google.cloud.teleport.metadata.auto.Outputs;
import com.google.cloud.teleport.metadata.auto.TemplateTransform;
import com.google.cloud.teleport.v2.auto.options.ReadFromJdbcOptions;
-import java.sql.Clob;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.io.jdbc.SchemaUtil;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PCollection;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.values.Row;

public class ReadFromJdbc implements TemplateTransform<ReadFromJdbcOptions> {

-  /* Logger for class. */
-  private static final Logger LOG = LoggerFactory.getLogger(ReadFromJdbc.class);
-
-  @Outputs(String.class)
-  public PCollection<String> read(Pipeline pipeline, ReadFromJdbcOptions options) {
+  @Outputs(Row.class)
+  public PCollection<Row> read(Pipeline pipeline, ReadFromJdbcOptions options) {
+    JdbcIO.DataSourceConfiguration dataSourceConfig = buildDataSourceConfig(options);
+    Schema schema =
+        JdbcIO.ReadRows.inferBeamSchema(
+            JdbcIO.DataSourceProviderFromDataSourceConfiguration.of(dataSourceConfig).apply(null),
+            options.getQuery());
+
+    if (options.getPartitionColumn() != null && options.getTable() != null) {
+      // Read with partitions.
+      // TODO(pranavbhandari): Support readWithPartitions for other data types.
+      JdbcIO.ReadWithPartitions<Row, Long> readIO =
+          JdbcIO.<Row>readWithPartitions()
+              .withDataSourceConfiguration(dataSourceConfig)
+              .withTable(options.getTable())
+              .withPartitionColumn(options.getPartitionColumn())
+              .withRowMapper(SchemaUtil.BeamRowMapper.of(schema));
+      if (options.getNumPartitions() != null) {
+        readIO = readIO.withNumPartitions(options.getNumPartitions());
+      }
+      if (options.getLowerBound() != null && options.getUpperBound() != null) {
+        readIO =
+            readIO.withLowerBound(options.getLowerBound()).withUpperBound(options.getUpperBound());
+      }
+      return pipeline.apply("Read from JDBC with Partitions", readIO);
+    }
+
    return pipeline.apply(
        "readFromJdbc",
-        JdbcIO.<String>read()
-            .withDataSourceConfiguration(buildDataSourceConfig(options))
+        JdbcIO.<Row>read()
+            .withDataSourceConfiguration(dataSourceConfig)
            .withQuery(options.getQuery())
-            .withCoder(StringUtf8Coder.of())
-            .withRowMapper(new ResultSetToJSONString()));
-  }
-
-  /**
-   * {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs.
-   */
-  public static class ResultSetToJSONString implements JdbcIO.RowMapper<String> {
-
-    @Override
-    public String mapRow(ResultSet resultSet) throws Exception {
-      ResultSetMetaData metaData = resultSet.getMetaData();
-      JSONObject json = new JSONObject();
-
-      for (int i = 1; i <= metaData.getColumnCount(); i++) {
-        Object value = resultSet.getObject(i);
-
-        // JSONObject.put() does not support null values. The exception is JSONObject.NULL.
-        if (value == null) {
-          json.put(metaData.getColumnLabel(i), JSONObject.NULL);
-          continue;
-        }
-
-        switch (metaData.getColumnTypeName(i).toLowerCase()) {
-          case "clob":
-            Clob clobObject = resultSet.getClob(i);
-            if (clobObject.length() > Integer.MAX_VALUE) {
-              LOG.warn(
-                  "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
-                  clobObject.length(),
-                  metaData.getColumnLabel(i));
-            }
-            json.put(
-                metaData.getColumnLabel(i), clobObject.getSubString(1, (int) clobObject.length()));
-            break;
-          default:
-            json.put(metaData.getColumnLabel(i), value);
-        }
-      }
-      return json.toString();
-    }
-  }
+            .withCoder(RowCoder.of(schema))
+            .withRowMapper(SchemaUtil.BeamRowMapper.of(schema)));
+  }

  static JdbcIO.DataSourceConfiguration buildDataSourceConfig(ReadFromJdbcOptions options) {
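Two practical consequences of this rewrite seem worth flagging. First, inferBeamSchema runs while the read is being set up, so the machine constructing the job graph needs connectivity to the database, not just the Dataflow workers. Second, the block now emits PCollection<Row>, so callers that still need strings chain the new RowToJSONString block. A hedged sketch of that composition (the combined options interface and class name here are hypothetical; the real wiring is generated by the AutoTemplate framework):

import com.google.cloud.teleport.v2.auto.blocks.ReadFromJdbc;
import com.google.cloud.teleport.v2.auto.blocks.RowToJSONString;
import com.google.cloud.teleport.v2.auto.options.ReadFromJdbcOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Hypothetical combined options type, for illustration only.
interface DemoOptions extends ReadFromJdbcOptions, RowToJSONString.RowToJSONStringOptions {}

public class ChainSketch {
  public static void main(String[] args) {
    DemoOptions options = PipelineOptionsFactory.fromArgs(args).as(DemoOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollection<Row> rows = new ReadFromJdbc().read(pipeline, options); // schema-aware rows
    PCollection<String> json = new RowToJSONString().transform(rows, options); // JSON strings

    pipeline.run();
  }
}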
JDBC-to-Pub/Sub auto template definition
@@ -20,6 +20,7 @@
import com.google.cloud.teleport.metadata.auto.AutoTemplate;
import com.google.cloud.teleport.metadata.auto.Preprocessor;
import com.google.cloud.teleport.v2.auto.blocks.ReadFromJdbc;
+import com.google.cloud.teleport.v2.auto.blocks.RowToJSONString;
import com.google.cloud.teleport.v2.auto.blocks.WriteToPubSub;
import org.apache.beam.sdk.options.PipelineOptions;

@@ -30,7 +31,7 @@
    description =
        "The Java Database Connectivity (JDBC) to Pub/Sub template is a batch pipeline that ingests data from "
            + "a JDBC source and writes the resulting records to a pre-existing Pub/Sub topic as a JSON string.",
-   blocks = {ReadFromJdbc.class, WriteToPubSub.class},
+   blocks = {ReadFromJdbc.class, RowToJSONString.class, WriteToPubSub.class},
    flexContainerName = "jdbc-to-pubsub-auto",
    contactInformation = "https://cloud.google.com/support",
    // TODO: replace the original template when we are ready to do it, and remove `hidden`.
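The block-list change is the crux of the compatibility fix the PR title refers to. Judging from the @Consumes/@Outputs annotations above, the AutoTemplate framework chains blocks by matching each block's output type to the next block's input type: with ReadFromJdbc now emitting Row while WriteToPubSub still consumes String, inserting RowToJSONString (Row in, String out) between them keeps the chain type-correct end to end.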