diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
index 76440b1ebf1a..0bcf6e0c4f75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
@@ -76,40 +76,34 @@ public String description() {
+ "\n"
+ "Example configuration for performing a read using a SQL query: ::\n"
+ "\n"
- + " pipeline:\n"
- + " transforms:\n"
- + " - type: ReadFromSpanner\n"
- + " config:\n"
- + " instance_id: 'my-instance-id'\n"
- + " database_id: 'my-database'\n"
- + " query: 'SELECT * FROM table'\n"
+ + " - type: ReadFromSpanner\n"
+ + " config:\n"
+ + " instance_id: 'my-instance-id'\n"
+ + " database_id: 'my-database'\n"
+ + " query: 'SELECT * FROM table'\n"
+ "\n"
+ "It is also possible to read a table by specifying a table name and a list of columns. For "
+ "example, the following configuration will perform a read on an entire table: ::\n"
+ "\n"
- + " pipeline:\n"
- + " transforms:\n"
- + " - type: ReadFromSpanner\n"
- + " config:\n"
- + " instance_id: 'my-instance-id'\n"
- + " database_id: 'my-database'\n"
- + " table: 'my-table'\n"
- + " columns: ['col1', 'col2']\n"
+ + " - type: ReadFromSpanner\n"
+ + " config:\n"
+ + " instance_id: 'my-instance-id'\n"
+ + " database_id: 'my-database'\n"
+ + " table: 'my-table'\n"
+ + " columns: ['col1', 'col2']\n"
+ "\n"
+ "Additionally, to read using a "
+ "Secondary Index, specify the index name: ::"
+ "\n"
- + " pipeline:\n"
- + " transforms:\n"
- + " - type: ReadFromSpanner\n"
- + " config:\n"
- + " instance_id: 'my-instance-id'\n"
- + " database_id: 'my-database'\n"
- + " table: 'my-table'\n"
- + " index: 'my-index'\n"
- + " columns: ['col1', 'col2']\n"
+ + " - type: ReadFromSpanner\n"
+ + " config:\n"
+ + " instance_id: 'my-instance-id'\n"
+ + " database_id: 'my-database'\n"
+ + " table: 'my-table'\n"
+ + " index: 'my-index'\n"
+ + " columns: ['col1', 'col2']\n"
+ "\n"
- + "### Advanced Usage\n"
+ + "#### Advanced Usage\n"
+ "\n"
+ "Reads by default use the "
+ "PartitionQuery API which enforces some limitations on the type of queries that can be used so that "
@@ -118,12 +112,10 @@ public String description() {
+ "\n"
+ "For example: ::"
+ "\n"
- + " pipeline:\n"
- + " transforms:\n"
- + " - type: ReadFromSpanner\n"
- + " config:\n"
- + " batching: false\n"
- + " ...\n"
+ + " - type: ReadFromSpanner\n"
+ + " config:\n"
+ + " batching: false\n"
+ + " ...\n"
+ "\n"
+ "Note: See "
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
index 8601da09ea09..61955f448c3f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
@@ -84,14 +84,12 @@ public String description() {
+ "\n"
+ "Example configuration for performing a write to a single table: ::\n"
+ "\n"
- + " pipeline:\n"
- + " transforms:\n"
- + " - type: ReadFromSpanner\n"
- + " config:\n"
- + " project_id: 'my-project-id'\n"
- + " instance_id: 'my-instance-id'\n"
- + " database_id: 'my-database'\n"
- + " table: 'my-table'\n"
+ + " - type: ReadFromSpanner\n"
+ + " config:\n"
+ + " project_id: 'my-project-id'\n"
+ + " instance_id: 'my-instance-id'\n"
+ + " database_id: 'my-database'\n"
+ + " table: 'my-table'\n"
+ "\n"
+ "Note: See "
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 435bfc138b5b..4056b2518ac1 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -28,6 +28,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -49,33 +50,174 @@ public class JdbcReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:jdbc_read:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("JDBC")
+ + "\n"
+ + "This transform can be used to read from a JDBC source using either a given JDBC driver jar "
+ + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n"
+ + "\n"
+ + "#### Using a default driver\n"
+ + "\n"
+ + "This transform comes packaged with drivers for several popular JDBC distributions. The following "
+ + "distributions can be declared as the `jdbc_type`: "
+ + JDBC_DRIVER_MAP.keySet().toString().replaceAll("[\\[\\]]", "")
+ + ".\n"
+ + "\n"
+ + "For example, reading a MySQL source using a SQL query: ::"
+ + "\n"
+ + " - type: ReadFromJdbc\n"
+ + " config:\n"
+ + " jdbc_type: mysql\n"
+ + " url: \"jdbc:mysql://my-host:3306/database\"\n"
+ + " query: \"SELECT * FROM table\"\n"
+ + "\n"
+ + "\n"
+ + "**Note**: See the following transforms which are built on top of this transform and simplify "
+ + "this logic for several popular JDBC distributions:\n\n"
+ + " - ReadFromMySql\n"
+ + " - ReadFromPostgres\n"
+ + " - ReadFromOracle\n"
+ + " - ReadFromSqlServer\n"
+ + "\n"
+ + "#### Declaring custom JDBC drivers\n"
+ + "\n"
+ + "If reading from a JDBC source not listed above, or if it is necessary to use a custom driver not "
+ + "packaged with Beam, one must define a JDBC driver and class name.\n"
+ + "\n"
+ + "For example, reading a MySQL source table: ::"
+ + "\n"
+ + " - type: ReadFromJdbc\n"
+ + " config:\n"
+ + " driver_jars: \"path/to/some/jdbc.jar\"\n"
+ + " driver_class_name: \"com.mysql.jdbc.Driver\"\n"
+ + " url: \"jdbc:mysql://my-host:3306/database\"\n"
+ + " table: \"my-table\"\n"
+ + "\n"
+ + "#### Connection Properties\n"
+ + "\n"
+ + "Connection properties are properties sent to the Driver used to connect to the JDBC source. For example, "
+ + "to set the character encoding to UTF-8, one could write: ::\n"
+ + "\n"
+ + " - type: ReadFromJdbc\n"
+ + " config:\n"
+ + " connectionProperties: \"characterEncoding=UTF-8;\"\n"
+ + " ...\n"
+ + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n";
+ }
+
+ protected String baseDescription(String jdbcType) {
+ return String.format(
+ "Read from a %s source using a SQL query or by directly accessing " + "a single table.\n",
+ jdbcType);
+ }
+
+ protected String inheritedDescription(
+ String prettyName, String transformName, String prefix, int port) {
+ return String.format(
+ "\n"
+ + "This is a special case of ReadFromJdbc that includes the "
+ + "necessary %s Driver and classes.\n"
+ + "\n"
+ + "An example of using %s with SQL query: ::\n"
+ + "\n"
+ + " - type: %s\n"
+ + " config:\n"
+ + " url: \"jdbc:%s://my-host:%d/database\"\n"
+ + " query: \"SELECT * FROM table\"\n"
+ + "\n"
+ + "It is also possible to read a table by specifying a table name. For example, the "
+ + "following configuration will perform a read on an entire table: ::\n"
+ + "\n"
+ + " - type: %s\n"
+ + " config:\n"
+ + " url: \"jdbc:%s://my-host:%d/database\"\n"
+ + " table: \"my-table\"\n"
+ + "\n"
+ + "#### Advanced Usage\n"
+ + "\n"
+ + "It might be necessary to use a custom JDBC driver that is not packaged with this "
+ + "transform. If that is the case, see ReadFromJdbc which "
+ + "allows for more custom configuration.",
+ prettyName, transformName, transformName, prefix, port, transformName, prefix, port);
+ }
+
@Override
protected @UnknownKeyFor @NonNull @Initialized Class
configurationClass() {
return JdbcReadSchemaTransformConfiguration.class;
}
+ protected static void validateConfig(JdbcReadSchemaTransformConfiguration config, String jdbcType)
+ throws IllegalArgumentException {
+ if (Strings.isNullOrEmpty(config.getJdbcUrl())) {
+ throw new IllegalArgumentException("JDBC URL cannot be blank");
+ }
+
+ boolean driverClassNamePresent = !Strings.isNullOrEmpty(config.getDriverClassName());
+ boolean driverJarsPresent = !Strings.isNullOrEmpty(config.getDriverJars());
+ boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType);
+ if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified.");
+ }
+ if (!driverClassNamePresent && !jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "One of JDBC Driver class name or JDBC type must be specified.");
+ }
+ if (jdbcTypePresent
+ && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) {
+ throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet());
+ }
+
+ boolean readQueryPresent = (config.getReadQuery() != null && !"".equals(config.getReadQuery()));
+ boolean locationPresent = (config.getLocation() != null && !"".equals(config.getLocation()));
+
+ if (readQueryPresent && locationPresent) {
+ throw new IllegalArgumentException("Query and Table are mutually exclusive configurations");
+ }
+ if (!readQueryPresent && !locationPresent) {
+ throw new IllegalArgumentException("Either Query or Table must be specified.");
+ }
+ }
+
+ protected static void validateConfig(JdbcReadSchemaTransformConfiguration config)
+ throws IllegalArgumentException {
+ validateConfig(config, config.getJdbcType());
+ }
+
@Override
protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
JdbcReadSchemaTransformConfiguration configuration) {
- configuration.validate();
+ validateConfig(configuration);
return new JdbcReadSchemaTransform(configuration);
}
- static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable {
+ protected static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable {
JdbcReadSchemaTransformConfiguration config;
+ private String jdbcType;
public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
this.config = config;
+ this.jdbcType = config.getJdbcType();
+ }
+
+ public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config, String jdbcType) {
+ this.config = config;
+ this.jdbcType = jdbcType;
}
protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
String driverClassName = config.getDriverClassName();
if (Strings.isNullOrEmpty(driverClassName)) {
- driverClassName =
- JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase());
+ driverClassName = JDBC_DRIVER_MAP.get(Objects.requireNonNull(jdbcType).toLowerCase());
}
JdbcIO.DataSourceConfiguration dsConfig =
@@ -109,7 +251,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
JdbcIO.ReadRows readRows =
JdbcIO.readRows().withDataSourceConfiguration(dataSourceConfiguration()).withQuery(query);
- Short fetchSize = config.getFetchSize();
+ Integer fetchSize = config.getFetchSize();
if (fetchSize != null && fetchSize > 0) {
readRows = readRows.withFetchSize(fetchSize);
}
@@ -125,11 +267,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}
- @Override
- public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:jdbc_read:v1";
- }
-
@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
@@ -145,76 +282,66 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class JdbcReadSchemaTransformConfiguration implements Serializable {
- @Nullable
- public abstract String getDriverClassName();
-
- @Nullable
- public abstract String getJdbcType();
+ @SchemaFieldDescription("Connection URL for the JDBC source.")
public abstract String getJdbcUrl();
+ @SchemaFieldDescription(
+ "Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.")
@Nullable
- public abstract String getUsername();
-
- @Nullable
- public abstract String getPassword();
+ public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String>
+ getConnectionInitSql();
+ @SchemaFieldDescription(
+ "Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be \"key1=value1;key2=value2;\".")
@Nullable
public abstract String getConnectionProperties();
+ @SchemaFieldDescription(
+ "Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false while Postgres requires this to be set to true.")
@Nullable
- public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String>
- getConnectionInitSql();
+ public abstract Boolean getDisableAutoCommit();
+ @SchemaFieldDescription(
+ "Name of a Java Driver class to use to connect to the JDBC source. For example, \"com.mysql.jdbc.Driver\".")
@Nullable
- public abstract String getReadQuery();
+ public abstract String getDriverClassName();
+ @SchemaFieldDescription(
+ "Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.")
@Nullable
- public abstract String getLocation();
+ public abstract String getDriverJars();
+ @SchemaFieldDescription(
+ "This method is used to override the size of the data that is going to be fetched and loaded in memory per every database call. It should ONLY be used if the default value throws memory errors.")
@Nullable
- public abstract Short getFetchSize();
+ public abstract Integer getFetchSize();
+ @SchemaFieldDescription(
+ "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.")
@Nullable
- public abstract Boolean getOutputParallelization();
+ public abstract String getJdbcType();
+ @SchemaFieldDescription("Name of the table to read from.")
@Nullable
- public abstract Boolean getDisableAutoCommit();
+ public abstract String getLocation();
+ @SchemaFieldDescription(
+ "Whether to reshuffle the resulting PCollection so results are distributed to all workers.")
@Nullable
- public abstract String getDriverJars();
-
- public void validate() throws IllegalArgumentException {
- if (Strings.isNullOrEmpty(getJdbcUrl())) {
- throw new IllegalArgumentException("JDBC URL cannot be blank");
- }
+ public abstract Boolean getOutputParallelization();
- boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName());
- boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType());
- if (driverClassNamePresent && jdbcTypePresent) {
- throw new IllegalArgumentException(
- "JDBC Driver class name and JDBC type are mutually exclusive configurations.");
- }
- if (!driverClassNamePresent && !jdbcTypePresent) {
- throw new IllegalArgumentException(
- "One of JDBC Driver class name or JDBC type must be specified.");
- }
- if (jdbcTypePresent
- && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase())) {
- throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet());
- }
+ @SchemaFieldDescription("Password for the JDBC source.")
+ @Nullable
+ public abstract String getPassword();
- boolean readQueryPresent = (getReadQuery() != null && !"".equals(getReadQuery()));
- boolean locationPresent = (getLocation() != null && !"".equals(getLocation()));
+ @SchemaFieldDescription("SQL query used to query the JDBC source.")
+ @Nullable
+ public abstract String getReadQuery();
- if (readQueryPresent && locationPresent) {
- throw new IllegalArgumentException(
- "ReadQuery and Location are mutually exclusive configurations");
- }
- if (!readQueryPresent && !locationPresent) {
- throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
- }
- }
+ @SchemaFieldDescription("Username for the JDBC source.")
+ @Nullable
+ public abstract String getUsername();
public static Builder builder() {
return new AutoValue_JdbcReadSchemaTransformProvider_JdbcReadSchemaTransformConfiguration
@@ -241,7 +368,7 @@ public abstract static class Builder {
public abstract Builder setConnectionInitSql(List value);
- public abstract Builder setFetchSize(Short value);
+ public abstract Builder setFetchSize(Integer value);
public abstract Builder setOutputParallelization(Boolean value);
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
index c0f7d68899b3..503b64e4a446 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
@@ -83,20 +83,25 @@
import org.slf4j.LoggerFactory;
/** Provides utility functions for working with {@link JdbcIO}. */
-class JdbcUtil {
+public class JdbcUtil {
private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
+ public static final String MYSQL = "mysql";
+ public static final String POSTGRES = "postgres";
+ public static final String ORACLE = "oracle";
+ public static final String MSSQL = "mssql";
+
static final Map JDBC_DRIVER_MAP =
new HashMap<>(
ImmutableMap.of(
- "mysql",
+ MYSQL,
"com.mysql.cj.jdbc.Driver",
- "postgres",
+ POSTGRES,
"org.postgresql.Driver",
- "oracle",
+ ORACLE,
"oracle.jdbc.driver.OracleDriver",
- "mssql",
+ MSSQL,
"com.microsoft.sqlserver.jdbc.SQLServerDriver"));
@VisibleForTesting
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index a409b604b11f..c0ac9c02ad5d 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -29,6 +29,7 @@
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -54,33 +55,177 @@ public class JdbcWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> {
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:jdbc_write:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("JDBC")
+ + "\n"
+ + "This transform can be used to write to a JDBC sink using either a given JDBC driver jar "
+ + "and class name, or by using one of the default packaged drivers given a `jdbc_type`.\n"
+ + "\n"
+ + "#### Using a default driver\n"
+ + "\n"
+ + "This transform comes packaged with drivers for several popular JDBC distributions. The following "
+ + "distributions can be declared as the `jdbc_type`: "
+ + JDBC_DRIVER_MAP.keySet().toString().replaceAll("[\\[\\]]", "")
+ + ".\n"
+ + "\n"
+ + "For example, writing to a MySQL sink using a SQL query: ::"
+ + "\n"
+ + " - type: WriteToJdbc\n"
+ + " config:\n"
+ + " jdbc_type: mysql\n"
+ + " url: \"jdbc:mysql://my-host:3306/database\"\n"
+ + " query: \"INSERT INTO table VALUES(?, ?)\"\n"
+ + "\n"
+ + "\n"
+ + "**Note**: See the following transforms which are built on top of this transform and simplify "
+ + "this logic for several popular JDBC distributions:\n\n"
+ + " - WriteToMySql\n"
+ + " - WriteToPostgres\n"
+ + " - WriteToOracle\n"
+ + " - WriteToSqlServer\n"
+ + "\n"
+ + "#### Declaring custom JDBC drivers\n"
+ + "\n"
+ + "If writing to a JDBC sink not listed above, or if it is necessary to use a custom driver not "
+ + "packaged with Beam, one must define a JDBC driver and class name.\n"
+ + "\n"
+ + "For example, writing to a MySQL table: ::"
+ + "\n"
+ + " - type: WriteToJdbc\n"
+ + " config:\n"
+ + " driver_jars: \"path/to/some/jdbc.jar\"\n"
+ + " driver_class_name: \"com.mysql.jdbc.Driver\"\n"
+ + " url: \"jdbc:mysql://my-host:3306/database\"\n"
+ + " table: \"my-table\"\n"
+ + "\n"
+ + "#### Connection Properties\n"
+ + "\n"
+ + "Connection properties are properties sent to the Driver used to connect to the JDBC source. For example, "
+ + "to set the character encoding to UTF-8, one could write: ::\n"
+ + "\n"
+ + " - type: WriteToJdbc\n"
+ + " config:\n"
+ + " connectionProperties: \"characterEncoding=UTF-8;\"\n"
+ + " ...\n"
+ + "All properties should be semi-colon-delimited (e.g. \"key1=value1;key2=value2;\")\n";
+ }
+
+ protected String baseDescription(String jdbcType) {
+ return String.format(
+ "Write to a %s sink using a SQL query or by directly accessing " + "a single table.\n",
+ jdbcType);
+ }
+
+ protected String inheritedDescription(
+ String prettyName, String transformName, String prefix, int port) {
+ return String.format(
+ "\n"
+ + "This is a special case of WriteToJdbc that includes the "
+ + "necessary %s Driver and classes.\n"
+ + "\n"
+ + "An example of using %s with SQL query: ::\n"
+ + "\n"
+ + " - type: %s\n"
+ + " config:\n"
+ + " url: \"jdbc:%s://my-host:%d/database\"\n"
+ + " query: \"INSERT INTO table VALUES(?, ?)\"\n"
+ + "\n"
+ + "It is also possible to read a table by specifying a table name. For example, the "
+ + "following configuration will perform a read on an entire table: ::\n"
+ + "\n"
+ + " - type: %s\n"
+ + " config:\n"
+ + " url: \"jdbc:%s://my-host:%d/database\"\n"
+ + " table: \"my-table\"\n"
+ + "\n"
+ + "#### Advanced Usage\n"
+ + "\n"
+ + "It might be necessary to use a custom JDBC driver that is not packaged with this "
+ + "transform. If that is the case, see WriteToJdbc which "
+ + "allows for more custom configuration.",
+ prettyName, transformName, transformName, prefix, port, transformName, prefix, port);
+ }
+
@Override
protected @UnknownKeyFor @NonNull @Initialized Class
configurationClass() {
return JdbcWriteSchemaTransformConfiguration.class;
}
+ protected static void validateConfig(
+ JdbcWriteSchemaTransformConfiguration config, String jdbcType)
+ throws IllegalArgumentException {
+ if (Strings.isNullOrEmpty(config.getJdbcUrl())) {
+ throw new IllegalArgumentException("JDBC URL cannot be blank");
+ }
+
+ boolean driverClassNamePresent = !Strings.isNullOrEmpty(config.getDriverClassName());
+ boolean driverJarsPresent = !Strings.isNullOrEmpty(config.getDriverJars());
+ boolean jdbcTypePresent = !Strings.isNullOrEmpty(jdbcType);
+ if (!driverClassNamePresent && !driverJarsPresent && !jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "If JDBC type is not specified, then Driver Class Name and Driver Jars must be specified.");
+ }
+ if (!driverClassNamePresent && !jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "One of JDBC Driver class name or JDBC type must be specified.");
+ }
+ if (jdbcTypePresent
+ && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(jdbcType).toLowerCase())) {
+ throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet());
+ }
+
+ boolean writeStatementPresent =
+ (config.getWriteStatement() != null && !"".equals(config.getWriteStatement()));
+ boolean locationPresent = (config.getLocation() != null && !"".equals(config.getLocation()));
+
+ if (writeStatementPresent && locationPresent) {
+ throw new IllegalArgumentException(
+ "Write Statement and Table are mutually exclusive configurations");
+ }
+ if (!writeStatementPresent && !locationPresent) {
+ throw new IllegalArgumentException("Either Write Statement or Table must be set.");
+ }
+ }
+
+ protected static void validateConfig(JdbcWriteSchemaTransformConfiguration config)
+ throws IllegalArgumentException {
+ validateConfig(config, config.getJdbcType());
+ }
+
@Override
protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
JdbcWriteSchemaTransformConfiguration configuration) {
- configuration.validate();
+ validateConfig(configuration);
return new JdbcWriteSchemaTransform(configuration);
}
- static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable {
+ protected static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable {
JdbcWriteSchemaTransformConfiguration config;
+ private String jdbcType;
public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) {
this.config = config;
+ this.jdbcType = config.getJdbcType();
+ }
+
+ public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config, String jdbcType) {
+ this.config = config;
+ this.jdbcType = jdbcType;
}
protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
String driverClassName = config.getDriverClassName();
if (Strings.isNullOrEmpty(driverClassName)) {
- driverClassName =
- JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase());
+ driverClassName = JDBC_DRIVER_MAP.get(Objects.requireNonNull(jdbcType).toLowerCase());
}
JdbcIO.DataSourceConfiguration dsConfig =
@@ -151,11 +296,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}
- @Override
- public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:jdbc_write:v1";
- }
-
@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
@@ -172,71 +312,55 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
@DefaultSchema(AutoValueSchema.class)
public abstract static class JdbcWriteSchemaTransformConfiguration implements Serializable {
- @Nullable
- public abstract String getDriverClassName();
-
- @Nullable
- public abstract String getJdbcType();
-
+ @SchemaFieldDescription("Connection URL for the JDBC sink.")
public abstract String getJdbcUrl();
+ @SchemaFieldDescription(
+ "If true, enables using a dynamically determined number of shards to write.")
@Nullable
- public abstract String getUsername();
+ public abstract Boolean getAutosharding();
+ @SchemaFieldDescription(
+ "Sets the connection init sql statements used by the Driver. Only MySQL and MariaDB support this.")
@Nullable
- public abstract String getPassword();
+ public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String>
+ getConnectionInitSql();
+ @SchemaFieldDescription(
+ "Used to set connection properties passed to the JDBC driver not already defined as standalone parameter (e.g. username and password can be set using parameters above accordingly). Format of the string must be \"key1=value1;key2=value2;\".")
@Nullable
public abstract String getConnectionProperties();
+ @SchemaFieldDescription(
+ "Name of a Java Driver class to use to connect to the JDBC source. For example, \"com.mysql.jdbc.Driver\".")
@Nullable
- public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String>
- getConnectionInitSql();
+ public abstract String getDriverClassName();
+ @SchemaFieldDescription(
+ "Comma separated path(s) for the JDBC driver jar(s). This can be a local path or GCS (gs://) path.")
@Nullable
- public abstract String getLocation();
+ public abstract String getDriverJars();
+ @SchemaFieldDescription(
+ "Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql.")
@Nullable
- public abstract String getWriteStatement();
+ public abstract String getJdbcType();
+ @SchemaFieldDescription("Name of the table to write to.")
@Nullable
- public abstract Boolean getAutosharding();
+ public abstract String getLocation();
+ @SchemaFieldDescription("Password for the JDBC source.")
@Nullable
- public abstract String getDriverJars();
-
- public void validate() throws IllegalArgumentException {
- if (Strings.isNullOrEmpty(getJdbcUrl())) {
- throw new IllegalArgumentException("JDBC URL cannot be blank");
- }
-
- boolean driverClassNamePresent = !Strings.isNullOrEmpty(getDriverClassName());
- boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType());
- if (driverClassNamePresent && jdbcTypePresent) {
- throw new IllegalArgumentException(
- "JDBC Driver class name and JDBC type are mutually exclusive configurations.");
- }
- if (!driverClassNamePresent && !jdbcTypePresent) {
- throw new IllegalArgumentException(
- "One of JDBC Driver class name or JDBC type must be specified.");
- }
- if (jdbcTypePresent
- && !JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase())) {
- throw new IllegalArgumentException("JDBC type must be one of " + JDBC_DRIVER_MAP.keySet());
- }
+ public abstract String getPassword();
- boolean writeStatementPresent =
- (getWriteStatement() != null && !"".equals(getWriteStatement()));
- boolean locationPresent = (getLocation() != null && !"".equals(getLocation()));
+ @SchemaFieldDescription("Username for the JDBC source.")
+ @Nullable
+ public abstract String getUsername();
- if (writeStatementPresent && locationPresent) {
- throw new IllegalArgumentException(
- "ReadQuery and Location are mutually exclusive configurations");
- }
- if (!writeStatementPresent && !locationPresent) {
- throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
- }
- }
+ @SchemaFieldDescription("SQL query used to insert records into the JDBC sink.")
+ @Nullable
+ public abstract String getWriteStatement();
public static Builder builder() {
return new AutoValue_JdbcWriteSchemaTransformProvider_JdbcWriteSchemaTransformConfiguration
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
new file mode 100644
index 000000000000..a90929e08278
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:mysql_read:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("MySQL") + inheritedDescription("MySQL", "ReadFromMySql", "mysql", 3306);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, MYSQL);
+ return new JdbcReadSchemaTransform(configuration, MYSQL);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java
new file mode 100644
index 000000000000..008dba41ae04
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromOracleSchemaTransformProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.ORACLE;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class ReadFromOracleSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:oracle_read:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("Oracle")
+ + inheritedDescription("Oracle", "ReadFromOracle", "oracle", 1521);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, ORACLE);
+ return new JdbcReadSchemaTransform(configuration, ORACLE);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
new file mode 100644
index 000000000000..773386db40c3
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:postgres_read:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("PostgreSQL")
+ + inheritedDescription("Postgres", "ReadFromPostgres", "postgresql", 5432);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, POSTGRES);
+ return new JdbcReadSchemaTransform(configuration, POSTGRES);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
new file mode 100644
index 000000000000..a0afbf28c3d0
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class ReadFromSqlServerSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:sql_server_read:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("Sql Server (Microsoft SQL)")
+ + inheritedDescription("SQL Server", "ReadFromSqlServer", "sqlserver", 1433);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, MSSQL);
+ return new JdbcReadSchemaTransform(configuration, MSSQL);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
new file mode 100644
index 000000000000..ceab4461d8fa
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:mysql_write:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("MySQL") + inheritedDescription("MySQL", "WriteToMySql", "mysql", 3306);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, MYSQL);
+ return new JdbcWriteSchemaTransform(configuration, MYSQL);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java
new file mode 100644
index 000000000000..55e4a2d8b3de
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToOracleSchemaTransformProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.ORACLE;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class WriteToOracleSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:oracle_write:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("Oracle")
+ + inheritedDescription("Oracle", "WriteToOracle", "oracle", 1521);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, ORACLE);
+ return new JdbcWriteSchemaTransform(configuration, ORACLE);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
new file mode 100644
index 000000000000..6ac4f6f43efd
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:postgres_write:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("PostgreSQL")
+ + inheritedDescription("Postgres", "WriteToPostgres", "postgresql", 5432);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, POSTGRES);
+ return new JdbcWriteSchemaTransform(configuration, POSTGRES);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
new file mode 100644
index 000000000000..c9166c1ea549
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class WriteToSqlServerSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:sql_server_write:v1";
+ }
+
+ @Override
+ public String description() {
+ return baseDescription("Sql Server (Microsoft SQL)")
+ + inheritedDescription("SQL Server", "WriteToSqlServer", "sqlserver", 1433);
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ validateConfig(configuration, MSSQL);
+ return new JdbcWriteSchemaTransform(configuration, MSSQL);
+ }
+}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
index 7cbdd48d1587..931c7f248bbf 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -77,73 +77,73 @@ public void testInvalidReadSchemaOptions() {
assertThrows(
IllegalArgumentException.class,
() -> {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setDriverClassName("")
- .setJdbcUrl("")
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setDriverClassName("")
+ .setJdbcUrl("")
+ .build());
});
assertThrows(
IllegalArgumentException.class,
() -> {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setDriverClassName("ClassName")
- .setJdbcUrl("JdbcUrl")
- .setLocation("Location")
- .setReadQuery("Query")
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setDriverClassName("ClassName")
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setReadQuery("Query")
+ .build());
});
assertThrows(
IllegalArgumentException.class,
() -> {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setDriverClassName("ClassName")
- .setJdbcUrl("JdbcUrl")
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setDriverClassName("ClassName")
+ .setJdbcUrl("JdbcUrl")
+ .build());
});
assertThrows(
IllegalArgumentException.class,
() -> {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setJdbcUrl("JdbcUrl")
- .setLocation("Location")
- .setJdbcType("invalidType")
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setJdbcType("invalidType")
+ .build());
});
assertThrows(
IllegalArgumentException.class,
() -> {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setJdbcUrl("JdbcUrl")
- .setLocation("Location")
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .build());
});
assertThrows(
IllegalArgumentException.class,
() -> {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setJdbcUrl("JdbcUrl")
- .setLocation("Location")
- .setDriverClassName("ClassName")
- .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setDriverClassName("ClassName")
+ .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
+ .build());
});
}
@Test
public void testValidReadSchemaOptions() {
for (String jdbcType : JDBC_DRIVER_MAP.keySet()) {
- JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
- .setJdbcUrl("JdbcUrl")
- .setLocation("Location")
- .setJdbcType(jdbcType)
- .build()
- .validate();
+ JdbcReadSchemaTransformProvider.validateConfig(
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setJdbcType(jdbcType)
+ .build());
}
}
diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py
index 2123c7a9f202..30c5199d9791 100644
--- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py
+++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py
@@ -196,12 +196,23 @@ def io_grouping_key(transform_name):
]
+def add_transform_links(description, providers):
+ for provider in providers:
+ description = re.sub(
+ rf"(?{provider}',
+ description or '')
+ return description
+
+
def transform_docs(transform_base, transforms, providers, extra_docs=''):
return '\n'.join([
f'## {transform_base}',
'',
longest(
- lambda t: longest(lambda p: p.description(t), providers[t]),
+ lambda t: longest(
+ lambda p: add_transform_links(p.description(t), providers.keys()),
+ providers[t]),
transforms).replace('::\n', '\n\n :::yaml\n'),
'',
extra_docs,
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 269c14e17baa..e45794595c9f 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -194,37 +194,42 @@
transforms:
'ReadFromJdbc': 'ReadFromJdbc'
'WriteToJdbc': 'WriteToJdbc'
- 'ReadFromMySql': 'ReadFromJdbc'
- 'WriteToMySql': 'WriteToJdbc'
- 'ReadFromPostgres': 'ReadFromJdbc'
- 'WriteToPostgres': 'WriteToJdbc'
- 'ReadFromOracle': 'ReadFromJdbc'
- 'WriteToOracle': 'WriteToJdbc'
- 'ReadFromSqlServer': 'ReadFromJdbc'
- 'WriteToSqlServer': 'WriteToJdbc'
+ 'ReadFromMySql': 'ReadFromMySql'
+ 'WriteToMySql': 'WriteToMySql'
+ 'ReadFromPostgres': 'ReadFromPostgres'
+ 'WriteToPostgres': 'WriteToPostgres'
+ 'ReadFromOracle': 'ReadFromOracle'
+ 'WriteToOracle': 'WriteToOracle'
+ 'ReadFromSqlServer': 'ReadFromSqlServer'
+ 'WriteToSqlServer': 'WriteToSqlServer'
config:
mappings:
'ReadFromJdbc':
- driver_class_name: 'driver_class_name'
- type: 'jdbc_type'
url: 'jdbc_url'
- username: 'username'
- password: 'password'
- table: 'location'
- query: 'read_query'
- driver_jars: 'driver_jars'
- connection_properties: 'connection_properties'
connection_init_sql: 'connection_init_sql'
- 'WriteToJdbc':
+ connection_properties: 'connection_properties'
+ disable_auto_commit: 'disable_auto_commit'
driver_class_name: 'driver_class_name'
+ driver_jars: 'driver_jars'
+ fetch_size: 'fetch_size'
+ output_parallelization: 'output_parallelization'
+ password: 'password'
+ query: 'read_query'
+ table: 'location'
type: 'jdbc_type'
- url: 'jdbc_url'
username: 'username'
+ 'WriteToJdbc':
+ url: 'jdbc_url'
+ auto_sharding: 'autosharding'
+ connection_init_sql: 'connection_init_sql'
+ connection_properties: 'connection_properties'
+ driver_class_name: 'driver_class_name'
+ driver_jars: 'driver_jars'
password: 'password'
table: 'location'
- driver_jars: 'driver_jars'
- connection_properties: 'connection_properties'
- connection_init_sql: 'connection_init_sql'
+ type: 'jdbc_type'
+ username: 'username'
+ query: 'write_statement'
'ReadFromMySql': 'ReadFromJdbc'
'WriteToMySql': 'WriteToJdbc'
'ReadFromPostgres': 'ReadFromJdbc'
@@ -235,26 +240,48 @@
'WriteToSqlServer': 'WriteToJdbc'
defaults:
'ReadFromMySql':
- jdbc_type: 'mysql'
+ driver_class_name: ''
+ driver_jars: ''
'WriteToMySql':
- jdbc_type: 'mysql'
+ driver_class_name: ''
+ driver_jars: ''
'ReadFromPostgres':
- jdbc_type: 'postgres'
+ connection_init_sql: ''
+ driver_class_name: ''
+ driver_jars: ''
'WriteToPostgres':
- jdbc_type: 'postgres'
+ connection_init_sql: ''
+ driver_class_name: ''
+ driver_jars: ''
'ReadFromOracle':
- jdbc_type: 'oracle'
+ connection_init_sql: ''
+ driver_class_name: ''
+ driver_jars: ''
'WriteToOracle':
- jdbc_type: 'oracle'
+ connection_init_sql: ''
+ driver_class_name: ''
+ driver_jars: ''
'ReadFromSqlServer':
- jdbc_type: 'mssql'
+ connection_init_sql: ''
+ driver_class_name: ''
+ driver_jars: ''
'WriteToSqlServer':
- jdbc_type: 'mssql'
+ connection_init_sql: ''
+ driver_class_name: ''
+ driver_jars: ''
underlying_provider:
type: beamJar
transforms:
'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
+ 'ReadFromMySql': 'beam:schematransform:org.apache.beam:mysql_read:v1'
+ 'ReadFromPostgres': 'beam:schematransform:org.apache.beam:postgres_read:v1'
+ 'ReadFromOracle': 'beam:schematransform:org.apache.beam:oracle_read:v1'
+ 'ReadFromSqlServer': 'beam:schematransform:org.apache.beam:sql_server_read:v1'
'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
+ 'WriteToMySql': 'beam:schematransform:org.apache.beam:mysql_write:v1'
+ 'WriteToPostgres': 'beam:schematransform:org.apache.beam:postgres_write:v1'
+ 'WriteToOracle': 'beam:schematransform:org.apache.beam:oracle_write:v1'
+ 'WriteToSqlServer': 'beam:schematransform:org.apache.beam:sql_server_write:v1'
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'