diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index a03c067d2c4e..1efc8e9e4405 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,3 +1,4 @@
{
- "comment": "Modify this file in a trivial way to cause this test suite to run"
+ "comment": "Modify this file in a trivial way to cause this test suite to run",
+ "modification": 1
}
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index b26833333238..e3d6056a5de9 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 2
+ "modification": 1
}
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index b03350966d6c..f102e82bafa6 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -70,6 +70,10 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:kafka_read:v1"];
KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_write:v1"];
+ BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
+ BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:bigquery_write:v1"];
}
}
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 3e322d976c1a..2acce3e94cc2 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -159,6 +159,7 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+ testImplementation project(":sdks:java:managed")
testImplementation project(path: ":sdks:java:io:common")
testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation library.java.commons_math3
diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
index 1288d91964e1..f6c6f07d0cdf 100644
--- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
+++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
@@ -36,6 +36,9 @@ dependencies {
permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761
implementation project(":sdks:java:extensions:schemaio-expansion-service")
permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761
+ implementation project(":sdks:java:managed")
+ permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
+
runtimeOnly library.java.slf4j_jdk14
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java
deleted file mode 100644
index f634b5ec6f60..000000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-
-/**
- * Configuration for writing to BigQuery.
- *
- * <p>This class is meant to be used with {@link BigQueryFileLoadsWriteSchemaTransformProvider}.
- *
- * <p>Internal only: This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@DefaultSchema(AutoValueSchema.class)
-@AutoValue
-public abstract class BigQueryFileLoadsWriteSchemaTransformConfiguration {
-
- /** Instantiates a {@link BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder}. */
- public static Builder builder() {
- return new AutoValue_BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder();
- }
-
- /**
- * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
- * expected format.
- */
- public abstract String getTableSpec();
-
- /** Specifies whether the table should be created if it does not exist. */
- public abstract String getCreateDisposition();
-
- /** Specifies what to do with existing data in the table, in case the table already exists. */
- public abstract String getWriteDisposition();
-
- @AutoValue.Builder
- public abstract static class Builder {
-
- /**
- * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
- * expected format.
- */
- public abstract Builder setTableSpec(String value);
-
- /** Specifies whether the table should be created if it does not exist. */
- public abstract Builder setCreateDisposition(String value);
-
- /** Specifies what to do with existing data in the table, in case the table already exists. */
- public abstract Builder setWriteDisposition(String value);
-
- /** Builds the {@link BigQueryFileLoadsWriteSchemaTransformConfiguration} configuration. */
- public abstract BigQueryFileLoadsWriteSchemaTransformConfiguration build();
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
deleted file mode 100644
index 3212e2a30348..000000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-
-/**
- * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
- * using {@link BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- *
- * <p>Internal only: This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
-@AutoService(SchemaTransformProvider.class)
-public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
-
- private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
-
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
-
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
- protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
- return new BigQueryWriteSchemaTransform(configuration);
- }
-
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
- * single is expected, this returns a list with a single name.
- */
- @Override
- public List<String> inputCollectionNames() {
- return Collections.singletonList(INPUT_TAG);
- }
-
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
- * no output is expected, this returns an empty list.
- */
- @Override
- public List<String> outputCollectionNames() {
- return Collections.emptyList();
- }
-
- /**
- * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
- * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- */
- protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
- /** An instance of {@link BigQueryServices} used for testing. */
- private BigQueryServices testBigQueryServices = null;
-
- private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
-
- BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
- return;
- }
-
- BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
-
- BigQueryServices bigQueryServices = new BigQueryServicesImpl();
- if (testBigQueryServices != null) {
- bigQueryServices = testBigQueryServices;
- }
-
- DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
- TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
- try {
- Table table = datasetService.getTable(tableReference);
- if (table == null) {
- throw new NullPointerException();
- }
-
- if (table.getSchema() == null) {
- throw new InvalidConfigurationException(
- String.format("could not fetch schema for table: %s", configuration.getTableSpec()));
- }
-
- } catch (NullPointerException | InterruptedException | IOException ex) {
- throw new InvalidConfigurationException(
- String.format(
- "could not fetch table %s, error: %s",
- configuration.getTableSpec(), ex.getMessage()));
- }
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- validate(input);
- PCollection<Row> rowPCollection = input.get(INPUT_TAG);
- Schema schema = rowPCollection.getSchema();
- BigQueryIO.Write<TableRow> write = toWrite(schema);
- if (testBigQueryServices != null) {
- write = write.withTestServices(testBigQueryServices);
- }
-
- PCollection<TableRow> tableRowPCollection =
- rowPCollection.apply(
- MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
- tableRowPCollection.apply(write);
- return PCollectionRowTuple.empty(input.getPipeline());
- }
-
- /** Instantiates a {@link BigQueryIO.Write} from a {@link Schema}. */
- BigQueryIO.Write<TableRow> toWrite(Schema schema) {
- TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
- CreateDisposition createDisposition =
- CreateDisposition.valueOf(configuration.getCreateDisposition());
- WriteDisposition writeDisposition =
- WriteDisposition.valueOf(configuration.getWriteDisposition());
-
- return BigQueryIO.writeTableRows()
- .to(configuration.getTableSpec())
- .withCreateDisposition(createDisposition)
- .withWriteDisposition(writeDisposition)
- .withSchema(tableSchema);
- }
-
- /** Setter for testing using {@link BigQueryServices}. */
- @VisibleForTesting
- void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
- this.testBigQueryServices = testBigQueryServices;
- }
-
- /** Validate a {@link PCollectionRowTuple} input. */
- void validate(PCollectionRowTuple input) {
- if (!input.has(INPUT_TAG)) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s is missing expected tag: %s",
- getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
- }
-
- PCollection<Row> rowInput = input.get(INPUT_TAG);
- Schema sourceSchema = rowInput.getSchema();
-
- if (sourceSchema == null) {
- throw new IllegalArgumentException(
- String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
- }
-
- if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
- return;
- }
-
- BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
- BigQueryServices bigQueryServices = new BigQueryServicesImpl();
- if (testBigQueryServices != null) {
- bigQueryServices = testBigQueryServices;
- }
-
- DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
- TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
- try {
- Table table = datasetService.getTable(tableReference);
- if (table == null) {
- throw new NullPointerException();
- }
-
- TableSchema tableSchema = table.getSchema();
- if (tableSchema == null) {
- throw new NullPointerException();
- }
-
- Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema);
- if (destinationSchema == null) {
- throw new NullPointerException();
- }
-
- validateMatching(sourceSchema, destinationSchema);
-
- } catch (NullPointerException | InterruptedException | IOException e) {
- throw new InvalidConfigurationException(
- String.format(
- "could not validate input for create disposition: %s and table: %s, error: %s",
- configuration.getCreateDisposition(),
- configuration.getTableSpec(),
- e.getMessage()));
- }
- }
-
- void validateMatching(Schema sourceSchema, Schema destinationSchema) {
- if (!sourceSchema.equals(destinationSchema)) {
- throw new IllegalArgumentException(
- String.format(
- "source and destination schema mismatch for table: %s",
- configuration.getTableSpec()));
- }
- }
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
index 8b8e8179ce7d..15b1b01d7f6c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery.providers;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -26,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
@@ -33,7 +35,9 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration;
import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -62,7 +66,7 @@
public class BigQueryDirectReadSchemaTransformProvider
extends TypedSchemaTransformProvider<BigQueryDirectReadSchemaTransformConfiguration> {
- private static final String OUTPUT_TAG = "OUTPUT_ROWS";
+ public static final String OUTPUT_TAG = "output";
@Override
protected Class<BigQueryDirectReadSchemaTransformConfiguration> configurationClass() {
@@ -76,7 +80,7 @@ protected SchemaTransform from(BigQueryDirectReadSchemaTransformConfiguration co
@Override
public String identifier() {
- return "beam:schematransform:org.apache.beam:bigquery_storage_read:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ);
}
@Override
@@ -139,6 +143,10 @@ public static Builder builder() {
@Nullable
public abstract List<String> getSelectedFields();
+ @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data")
+ @Nullable
+ public abstract String getKmsKey();
+
@Nullable
/** Builder for the {@link BigQueryDirectReadSchemaTransformConfiguration}. */
@AutoValue.Builder
@@ -151,6 +159,8 @@ public abstract static class Builder {
public abstract Builder setSelectedFields(List<String> selectedFields);
+ public abstract Builder setKmsKey(String kmsKey);
+
/** Builds a {@link BigQueryDirectReadSchemaTransformConfiguration} instance. */
public abstract BigQueryDirectReadSchemaTransformConfiguration build();
}
@@ -161,7 +171,7 @@ public abstract static class Builder {
* BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link
* BigQueryDirectReadSchemaTransformProvider}.
*/
- protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform {
+ public static class BigQueryDirectReadSchemaTransform extends SchemaTransform {
private BigQueryServices testBigQueryServices = null;
private final BigQueryDirectReadSchemaTransformConfiguration configuration;
@@ -172,6 +182,20 @@ protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform
this.configuration = configuration;
}
+ public Row getConfigurationRow() {
+ try {
+ // To stay consistent with our SchemaTransform configuration naming conventions,
+ // we sort lexicographically
+ return SchemaRegistry.createDefault()
+ .getToRowFunction(BigQueryDirectReadSchemaTransformConfiguration.class)
+ .apply(configuration)
+ .sorted()
+ .toSnakeCase();
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@VisibleForTesting
public void setBigQueryServices(BigQueryServices testBigQueryServices) {
this.testBigQueryServices = testBigQueryServices;
@@ -211,6 +235,9 @@ BigQueryIO.TypedRead<Row> createDirectReadTransform() {
} else {
read = read.fromQuery(configuration.getQuery());
}
+ if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+ read = read.withKmsKey(configuration.getKmsKey());
+ }
if (this.testBigQueryServices != null) {
read = read.withTestServices(testBigQueryServices);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
new file mode 100644
index 000000000000..092cf42a29a4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration}.
+ *
+ * <p>Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Internal
+@AutoService(SchemaTransformProvider.class)
+public class BigQueryFileLoadsSchemaTransformProvider
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
+
+ static final String INPUT_TAG = "input";
+
+ @Override
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
+ return new BigQueryFileLoadsSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform {
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ private final BigQueryWriteConfiguration configuration;
+
+ BigQueryFileLoadsSchemaTransform(BigQueryWriteConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> rowPCollection = input.getSinglePCollection();
+ BigQueryIO.Write<Row> write = toWrite(input.getPipeline().getOptions());
+ rowPCollection.apply(write);
+
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+
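+ /** Builds the underlying {@link BigQueryIO.Write}, using the FILE_LOADS method and the input's Beam Row schema. */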
+ BigQueryIO.Write<Row> toWrite(PipelineOptions options) {
+ BigQueryIO.Write<Row> write =
+ BigQueryIO.<Row>write()
+ .to(configuration.getTable())
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withFormatFunction(BigQueryUtils.toTableRow())
+ // TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
+ // createTempFilePrefixView() doesn't pick up the pipeline option
+ .withCustomGcsTempLocation(
+ ValueProvider.StaticValueProvider.of(options.getTempLocation()))
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+ .useBeamSchema();
+
+ if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+ CreateDisposition createDisposition =
+ CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
+ write = write.withCreateDisposition(createDisposition);
+ }
+ if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
+ WriteDisposition writeDisposition =
+ WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
+ write = write.withWriteDisposition(writeDisposition);
+ }
+ if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+ write = write.withKmsKey(configuration.getKmsKey());
+ }
+ if (testBigQueryServices != null) {
+ write = write.withTestServices(testBigQueryServices);
+ }
+
+ return write;
+ }
+
+ /** Setter for testing using {@link BigQueryServices}. */
+ @VisibleForTesting
+ void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+ this.testBigQueryServices = testBigQueryServices;
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslation.java
new file mode 100644
index 000000000000..555df0d0a2b8
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
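+/** Payload translators for the BigQuery read and write SchemaTransforms, exposing each transform's configuration {@link Row}. */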
+public class BigQuerySchemaTransformTranslation {
+ public static class BigQueryStorageReadSchemaTransformTranslator
+ extends SchemaTransformTranslation.SchemaTransformPayloadTranslator<
+ BigQueryDirectReadSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new BigQueryDirectReadSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(BigQueryDirectReadSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+
+ public static class BigQueryWriteSchemaTransformTranslator
+ extends SchemaTransformTranslation.SchemaTransformPayloadTranslator<
+ BigQueryWriteSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new BigQueryWriteSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(BigQueryWriteSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class ReadWriteRegistrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes",
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(
+ BigQueryDirectReadSchemaTransform.class,
+ new BigQueryStorageReadSchemaTransformTranslator())
+ .put(BigQueryWriteSchemaTransform.class, new BigQueryWriteSchemaTransformTranslator())
+ .build();
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index c1c06fc592f4..c45433aaf0e7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -17,20 +17,16 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery.providers;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
@@ -42,15 +38,11 @@
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
-import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -65,12 +57,11 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
/**
* An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs
- * configured via {@link BigQueryStorageWriteApiSchemaTransformConfiguration}.
+ * configured via {@link BigQueryWriteConfiguration}.
*
* <p>Internal only: This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
@@ -81,7 +72,7 @@
})
@AutoService(SchemaTransformProvider.class)
public class BigQueryStorageWriteApiSchemaTransformProvider
- extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
private static final Duration DEFAULT_TRIGGERING_FREQUENCY =
Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS);
@@ -89,7 +80,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
private static final String FAILED_ROWS_TAG = "FailedRows";
private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
// magic string that tells us to write to dynamic destinations
- protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
@@ -100,14 +90,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
.build();
@Override
- protected SchemaTransform from(
- BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryStorageWriteApiSchemaTransform(configuration);
}
@Override
public String identifier() {
- return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2");
+ return "beam:schematransform:org.apache.beam:bigquery_storage_write:v2";
}
@Override
@@ -130,201 +119,17 @@ public List<String> outputCollectionNames() {
return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG, "errors");
}
- /** Configuration for writing to BigQuery with Storage Write API. */
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration {
-
- static final Map<String, CreateDisposition> CREATE_DISPOSITIONS =
- ImmutableMap.<String, CreateDisposition>builder()
- .put(CreateDisposition.CREATE_IF_NEEDED.name(), CreateDisposition.CREATE_IF_NEEDED)
- .put(CreateDisposition.CREATE_NEVER.name(), CreateDisposition.CREATE_NEVER)
- .build();
-
- static final Map<String, WriteDisposition> WRITE_DISPOSITIONS =
- ImmutableMap.<String, WriteDisposition>builder()
- .put(WriteDisposition.WRITE_TRUNCATE.name(), WriteDisposition.WRITE_TRUNCATE)
- .put(WriteDisposition.WRITE_EMPTY.name(), WriteDisposition.WRITE_EMPTY)
- .put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND)
- .build();
-
- @AutoValue
- public abstract static class ErrorHandling {
- @SchemaFieldDescription("The name of the output PCollection containing failed writes.")
- public abstract String getOutput();
-
- public static Builder builder() {
- return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling
- .Builder();
- }
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setOutput(String output);
-
- public abstract ErrorHandling build();
- }
- }
-
- public void validate() {
- String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";
-
- // validate output table spec
- checkArgument(
- !Strings.isNullOrEmpty(this.getTable()),
- invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");
-
- // if we have an input table spec, validate it
- if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
- checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
- }
-
- // validate create and write dispositions
- if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
- checkNotNull(
- CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()),
- invalidConfigMessage
- + "Invalid create disposition (%s) was specified. Available dispositions are: %s",
- this.getCreateDisposition(),
- CREATE_DISPOSITIONS.keySet());
- }
- if (!Strings.isNullOrEmpty(this.getWriteDisposition())) {
- checkNotNull(
- WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()),
- invalidConfigMessage
- + "Invalid write disposition (%s) was specified. Available dispositions are: %s",
- this.getWriteDisposition(),
- WRITE_DISPOSITIONS.keySet());
- }
-
- if (this.getErrorHandling() != null) {
- checkArgument(
- !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
- invalidConfigMessage + "Output must not be empty if error handling specified.");
- }
-
- if (this.getAutoSharding() != null
- && this.getAutoSharding()
- && this.getNumStreams() != null) {
- checkArgument(
- this.getNumStreams() == 0,
- invalidConfigMessage
- + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
- }
- }
-
- /**
- * Instantiates a {@link BigQueryStorageWriteApiSchemaTransformConfiguration.Builder} instance.
- */
- public static Builder builder() {
- return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration
- .Builder();
- }
-
- @SchemaFieldDescription(
- "The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}")
- public abstract String getTable();
-
- @SchemaFieldDescription(
- "Optional field that specifies whether the job is allowed to create new tables. "
- + "The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER ("
- + "the job must fail if the table does not exist already).")
- @Nullable
- public abstract String getCreateDisposition();
-
- @SchemaFieldDescription(
- "Specifies the action that occurs if the destination table already exists. "
- + "The following values are supported: "
- + "WRITE_TRUNCATE (overwrites the table data), "
- + "WRITE_APPEND (append the data to the table), "
- + "WRITE_EMPTY (job must fail if the table is not empty).")
- @Nullable
- public abstract String getWriteDisposition();
-
- @SchemaFieldDescription(
- "Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.")
- @Nullable
- public abstract Long getTriggeringFrequencySeconds();
-
- @SchemaFieldDescription(
- "This option enables lower latency for insertions to BigQuery but may ocassionally "
- + "duplicate data elements.")
- @Nullable
- public abstract Boolean getUseAtLeastOnceSemantics();
-
- @SchemaFieldDescription(
- "This option enables using a dynamically determined number of Storage Write API streams to write to "
- + "BigQuery. Only applicable to unbounded data.")
- @Nullable
- public abstract Boolean getAutoSharding();
-
- @SchemaFieldDescription(
- "Specifies the number of write streams that the Storage API sink will use. "
- + "This parameter is only applicable when writing unbounded data.")
- @Nullable
- public abstract Integer getNumStreams();
-
- @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
- @Nullable
- public abstract ErrorHandling getErrorHandling();
-
- @SchemaFieldDescription(
- "This option enables the use of BigQuery CDC functionality. The expected PCollection"
- + " should contain Beam Rows with a schema wrapping the record to be inserted and"
- + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
- + "change_sequence_number:\"...\"}, record: {...}}")
- @Nullable
- public abstract Boolean getUseCdcWrites();
-
- @SchemaFieldDescription(
- "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
- + " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
- @Nullable
- public abstract List<String> getPrimaryKey();
-
- /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
- @AutoValue.Builder
- public abstract static class Builder {
-
- public abstract Builder setTable(String table);
-
- public abstract Builder setCreateDisposition(String createDisposition);
-
- public abstract Builder setWriteDisposition(String writeDisposition);
-
- public abstract Builder setTriggeringFrequencySeconds(Long seconds);
-
- public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
-
- public abstract Builder setAutoSharding(Boolean autoSharding);
-
- public abstract Builder setNumStreams(Integer numStreams);
-
- public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
- public abstract Builder setUseCdcWrites(Boolean cdcWrites);
-
- public abstract Builder setPrimaryKey(List<String> pkColumns);
-
- /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
- public abstract BigQueryStorageWriteApiSchemaTransformProvider
- .BigQueryStorageWriteApiSchemaTransformConfiguration
- build();
- }
- }
-
/**
* A {@link SchemaTransform} for BigQuery Storage Write API, configured with {@link
- * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link
+ * BigQueryWriteConfiguration} and instantiated by {@link
* BigQueryStorageWriteApiSchemaTransformProvider}.
*/
- protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
+ public static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
private BigQueryServices testBigQueryServices = null;
- private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;
+ private final BigQueryWriteConfiguration configuration;
- BigQueryStorageWriteApiSchemaTransform(
- BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ BigQueryStorageWriteApiSchemaTransform(BigQueryWriteConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}
@@ -420,8 +225,7 @@ public TableConstraints getTableConstraints(String destination) {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Check that the input exists
- checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG);
- PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
+ PCollection<Row> inputRows = input.getSinglePCollection();
BigQueryIO.Write<Row> write = createStorageWriteApiTransform(inputRows.getSchema());
@@ -540,18 +344,18 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
- BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(
- configuration.getCreateDisposition().toUpperCase());
+ CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
write = write.withCreateDisposition(createDisposition);
}
if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
WriteDisposition writeDisposition =
- BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
- configuration.getWriteDisposition().toUpperCase());
+ WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
write = write.withWriteDisposition(writeDisposition);
}
-
+ if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+ write = write.withKmsKey(configuration.getKmsKey());
+ }
if (this.testBigQueryServices != null) {
write = write.withTestServices(testBigQueryServices);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
new file mode 100644
index 000000000000..4296da7e0cd5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link
+ * BigQueryStorageWriteApiSchemaTransformProvider} and {@link
+ * BigQueryFileLoadsSchemaTransformProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQueryWriteConfiguration {
+ protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
+
+ @AutoValue
+ public abstract static class ErrorHandling {
+ @SchemaFieldDescription("The name of the output PCollection containing failed writes.")
+ public abstract String getOutput();
+
+ public static Builder builder() {
+ return new AutoValue_BigQueryWriteConfiguration_ErrorHandling.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setOutput(String output);
+
+ public abstract ErrorHandling build();
+ }
+ }
+
+ public void validate() {
+ String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";
+
+ // validate output table spec
+ checkArgument(
+ !Strings.isNullOrEmpty(this.getTable()),
+ invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");
+
+ // if we have an input table spec, validate it
+ if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
+ checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
+ }
+
+ // validate create and write dispositions
+ String createDisposition = getCreateDisposition();
+ if (createDisposition != null && !createDisposition.isEmpty()) {
+ List<String> createDispositions =
+ Arrays.stream(BigQueryIO.Write.CreateDisposition.values())
+ .map(c -> c.name())
+ .collect(Collectors.toList());
+ Preconditions.checkArgument(
+ createDispositions.contains(createDisposition.toUpperCase()),
+ "Invalid create disposition (%s) was specified. Available dispositions are: %s",
+ createDisposition,
+ createDispositions);
+ }
+ String writeDisposition = getWriteDisposition();
+ if (writeDisposition != null && !writeDisposition.isEmpty()) {
+ List<String> writeDispostions =
+ Arrays.stream(BigQueryIO.Write.WriteDisposition.values())
+ .map(w -> w.name())
+ .collect(Collectors.toList());
+ Preconditions.checkArgument(
+ writeDispostions.contains(writeDisposition.toUpperCase()),
+ "Invalid write disposition (%s) was specified. Available dispositions are: %s",
+ writeDisposition,
+ writeDispostions);
+ }
+
+ ErrorHandling errorHandling = getErrorHandling();
+ if (errorHandling != null) {
+ checkArgument(
+ !Strings.isNullOrEmpty(errorHandling.getOutput()),
+ invalidConfigMessage + "Output must not be empty if error handling specified.");
+ }
+
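+ // Auto-sharding and a fixed number of streams are mutually exclusive options.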
+ Boolean autoSharding = getAutoSharding();
+ Integer numStreams = getNumStreams();
+ if (autoSharding != null && autoSharding && numStreams != null) {
+ checkArgument(
+ numStreams == 0,
+ invalidConfigMessage
+ + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
+ }
+ }
+
+ /** Instantiates a {@link BigQueryWriteConfiguration.Builder} instance. */
+ public static Builder builder() {
+ return new AutoValue_BigQueryWriteConfiguration.Builder();
+ }
+
+ @SchemaFieldDescription(
+ "The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}")
+ public abstract String getTable();
+
+ @SchemaFieldDescription(
+ "Optional field that specifies whether the job is allowed to create new tables. "
+ + "The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER ("
+ + "the job must fail if the table does not exist already).")
+ @Nullable
+ public abstract String getCreateDisposition();
+
+ @SchemaFieldDescription(
+ "Specifies the action that occurs if the destination table already exists. "
+ + "The following values are supported: "
+ + "WRITE_TRUNCATE (overwrites the table data), "
+ + "WRITE_APPEND (append the data to the table), "
+ + "WRITE_EMPTY (job must fail if the table is not empty).")
+ @Nullable
+ public abstract String getWriteDisposition();
+
+ @SchemaFieldDescription(
+ "Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.")
+ @Nullable
+ public abstract Long getTriggeringFrequencySeconds();
+
+ @SchemaFieldDescription(
+ "This option enables lower latency for insertions to BigQuery but may ocassionally "
+ + "duplicate data elements.")
+ @Nullable
+ public abstract Boolean getUseAtLeastOnceSemantics();
+
+ @SchemaFieldDescription(
+ "This option enables using a dynamically determined number of Storage Write API streams to write to "
+ + "BigQuery. Only applicable to unbounded data.")
+ @Nullable
+ public abstract Boolean getAutoSharding();
+
+ @SchemaFieldDescription(
+ "Specifies the number of write streams that the Storage API sink will use. "
+ + "This parameter is only applicable when writing unbounded data.")
+ @Nullable
+ public abstract Integer getNumStreams();
+
+ @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data")
+ @Nullable
+ public abstract String getKmsKey();
+
+ @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
+ @Nullable
+ public abstract ErrorHandling getErrorHandling();
+
+ @SchemaFieldDescription(
+ "This option enables the use of BigQuery CDC functionality. The expected PCollection"
+ + " should contain Beam Rows with a schema wrapping the record to be inserted and"
+ + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
+ + "change_sequence_number:\"...\"}, record: {...}}")
+ @Nullable
+ public abstract Boolean getUseCdcWrites();
+
+ @SchemaFieldDescription(
+ "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
+ + " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
+ @Nullable
+ public abstract List<String> getPrimaryKey();
+
+ /** Builder for {@link BigQueryWriteConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setTable(String table);
+
+ public abstract Builder setCreateDisposition(String createDisposition);
+
+ public abstract Builder setWriteDisposition(String writeDisposition);
+
+ public abstract Builder setTriggeringFrequencySeconds(Long seconds);
+
+ public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
+
+ public abstract Builder setAutoSharding(Boolean autoSharding);
+
+ public abstract Builder setNumStreams(Integer numStreams);
+
+ public abstract Builder setKmsKey(String kmsKey);
+
+ public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
+ public abstract Builder setUseCdcWrites(Boolean cdcWrites);
+
+ public abstract Builder setPrimaryKey(List<String> pkColumns);
+
+ /** Builds a {@link BigQueryWriteConfiguration} instance. */
+ public abstract BigQueryWriteConfiguration build();
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
new file mode 100644
index 000000000000..abab169d6932
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A BigQuery Write SchemaTransformProvider that routes to either {@link
+ * BigQueryFileLoadsSchemaTransformProvider} or {@link
+ * BigQueryStorageWriteApiSchemaTransformProvider}.
+ *
+ * <p>Internal only. Used by the Managed Transform layer.
+ */
+@Internal
+@AutoService(SchemaTransformProvider.class)
+public class BigQueryWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
+ @Override
+ public String identifier() {
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE);
+ }
+
+ @Override
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
+ return new BigQueryWriteSchemaTransform(configuration);
+ }
+
+ public static class BigQueryWriteSchemaTransform extends SchemaTransform {
+ private final BigQueryWriteConfiguration configuration;
+
+ BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
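+ // Route on the boundedness of the input: bounded collections are written with
+ // BigQuery batch file loads, while unbounded collections use the Storage Write API.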
+ if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+ return input.apply(new BigQueryFileLoadsSchemaTransformProvider().from(configuration));
+ } else { // UNBOUNDED
+ return input.apply(
+ new BigQueryStorageWriteApiSchemaTransformProvider().from(configuration));
+ }
+ }
+
+ public Row getConfigurationRow() {
+ try {
+ // To stay consistent with our SchemaTransform configuration naming conventions,
+ // we sort fields lexicographically and convert names to snake_case
+ return SchemaRegistry.createDefault()
+ .getToRowFunction(BigQueryWriteConfiguration.class)
+ .apply(configuration)
+ .sorted()
+ .toSnakeCase();
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
deleted file mode 100644
index dd8bb9fc8664..000000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.INPUT_TAG;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
-import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
-import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
-import org.apache.beam.sdk.transforms.display.DisplayData.Item;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Test for {@link BigQueryFileLoadsWriteSchemaTransformProvider}. */
-@RunWith(JUnit4.class)
-public class BigQueryFileLoadsWriteSchemaTransformProviderTest {
-
- private static final String PROJECT = "fakeproject";
- private static final String DATASET = "fakedataset";
- private static final String TABLE_ID = "faketable";
-
- private static final TableReference TABLE_REFERENCE =
- new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
-
- private static final Schema SCHEMA =
- Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
-
- private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA);
-
- private static final List<Row> ROWS =
- Arrays.asList(
- Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build());
-
- private static final BigQueryOptions OPTIONS =
- TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
- private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
- private final FakeJobService fakeJobService = new FakeJobService();
- private final TemporaryFolder temporaryFolder = new TemporaryFolder();
- private final FakeBigQueryServices fakeBigQueryServices =
- new FakeBigQueryServices()
- .withJobService(fakeJobService)
- .withDatasetService(fakeDatasetService);
-
- @Before
- public void setUp() throws IOException, InterruptedException {
- FakeDatasetService.setUp();
- fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null);
- temporaryFolder.create();
- OPTIONS.setProject(PROJECT);
- OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
- }
-
- @After
- public void tearDown() {
- temporaryFolder.delete();
- }
-
- @Rule public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
-
- @Test
- public void testLoad() throws IOException, InterruptedException {
- BigQueryFileLoadsWriteSchemaTransformProvider provider =
- new BigQueryFileLoadsWriteSchemaTransformProvider();
- BigQueryFileLoadsWriteSchemaTransformConfiguration configuration =
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name())
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .build();
- BigQueryWriteSchemaTransform schemaTransform =
- (BigQueryWriteSchemaTransform) provider.from(configuration);
- schemaTransform.setTestBigQueryServices(fakeBigQueryServices);
- String tag = provider.inputCollectionNames().get(0);
- PCollectionRowTuple input =
- PCollectionRowTuple.of(tag, p.apply(Create.of(ROWS).withRowSchema(SCHEMA)));
- input.apply(schemaTransform);
-
- p.run();
-
- assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE));
- assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size());
- }
-
- @Test
- public void testValidatePipelineOptions() {
- List<
- Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder,
- Class<? extends Exception>>>
- cases =
- Arrays.asList(
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec("project.doesnot.exist")
- .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
- .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
- InvalidConfigurationException.class),
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(String.format("%s.%s.%s", PROJECT, DATASET, "doesnotexist"))
- .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
- .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
- InvalidConfigurationException.class),
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec("project.doesnot.exist")
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
- null));
- for (Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, Class<? extends Exception>>
- caze : cases) {
- BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build());
- if (caze.getRight() != null) {
- assertThrows(caze.getRight(), () -> transform.validate(p.getOptions()));
- } else {
- transform.validate(p.getOptions());
- }
- }
- }
-
- @Test
- public void testToWrite() {
- List<
- Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder,
- BigQueryIO.Write<TableRow>>>
- cases =
- Arrays.asList(
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
- .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
- BigQueryIO.writeTableRows()
- .to(TABLE_REFERENCE)
- .withCreateDisposition(CreateDisposition.CREATE_NEVER)
- .withWriteDisposition(WriteDisposition.WRITE_EMPTY)
- .withSchema(TABLE_SCHEMA)),
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()),
- BigQueryIO.writeTableRows()
- .to(TABLE_REFERENCE)
- .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
- .withSchema(TABLE_SCHEMA)));
- for (Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, BigQueryIO.Write<TableRow>>
- caze : cases) {
- BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build());
- Map<Identifier, Item> gotDisplayData = DisplayData.from(transform.toWrite(SCHEMA)).asMap();
- Map<Identifier, Item> wantDisplayData = DisplayData.from(caze.getRight()).asMap();
- Set<Identifier> keys = new HashSet<>();
- keys.addAll(gotDisplayData.keySet());
- keys.addAll(wantDisplayData.keySet());
- for (Identifier key : keys) {
- Item got = null;
- Item want = null;
- if (gotDisplayData.containsKey(key)) {
- got = gotDisplayData.get(key);
- }
- if (wantDisplayData.containsKey(key)) {
- want = wantDisplayData.get(key);
- }
- assertEquals(want, got);
- }
- }
- }
-
- @Test
- public void validatePCollectionRowTupleInput() {
- PCollectionRowTuple empty = PCollectionRowTuple.empty(p);
- PCollectionRowTuple valid =
- PCollectionRowTuple.of(
- INPUT_TAG, p.apply("CreateRowsWithValidSchema", Create.of(ROWS)).setRowSchema(SCHEMA));
-
- PCollectionRowTuple invalid =
- PCollectionRowTuple.of(
- INPUT_TAG,
- p.apply(
- "CreateRowsWithInvalidSchema",
- Create.of(
- Row.nullRow(
- Schema.builder().addNullableField("name", FieldType.STRING).build()))));
-
- BigQueryWriteSchemaTransform transform =
- transformFrom(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
- .build());
-
- assertThrows(IllegalArgumentException.class, () -> transform.validate(empty));
-
- assertThrows(IllegalStateException.class, () -> transform.validate(invalid));
-
- transform.validate(valid);
-
- p.run();
- }
-
- private BigQueryWriteSchemaTransform transformFrom(
- BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
- BigQueryFileLoadsWriteSchemaTransformProvider provider =
- new BigQueryFileLoadsWriteSchemaTransformProvider();
- BigQueryWriteSchemaTransform transform =
- (BigQueryWriteSchemaTransform) provider.from(configuration);
-
- transform.setTestBigQueryServices(fakeBigQueryServices);
-
- return transform;
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
new file mode 100644
index 000000000000..897d95da3b13
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQueryFileLoadsSchemaTransformProvider}. */
+@RunWith(JUnit4.class)
+public class BigQueryFileLoadsSchemaTransformProviderTest {
+
+ private static final String PROJECT = "fakeproject";
+ private static final String DATASET = "fakedataset";
+ private static final String TABLE_ID = "faketable";
+
+ private static final TableReference TABLE_REFERENCE =
+ new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
+
+ private static final Schema SCHEMA =
+ Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
+
+ private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(),
+ Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(),
+ Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build());
+
+ private static final BigQueryOptions OPTIONS =
+ TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ private final FakeJobService fakeJobService = new FakeJobService();
+ private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private final FakeBigQueryServices fakeBigQueryServices =
+ new FakeBigQueryServices()
+ .withJobService(fakeJobService)
+ .withDatasetService(fakeDatasetService);
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ FakeDatasetService.setUp();
+ fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null);
+ temporaryFolder.create();
+ OPTIONS.setProject(PROJECT);
+ OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() {
+ temporaryFolder.delete();
+ }
+
+ @Rule public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+ @Test
+ public void testLoad() throws IOException, InterruptedException {
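+ // Runs the file loads transform against fake BigQuery services, then checks that
+ // the destination table exists and holds the expected number of rows.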
+ BigQueryFileLoadsSchemaTransformProvider provider =
+ new BigQueryFileLoadsSchemaTransformProvider();
+ BigQueryWriteConfiguration configuration =
+ BigQueryWriteConfiguration.builder()
+ .setTable(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+ .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+ .build();
+ BigQueryFileLoadsSchemaTransform schemaTransform =
+ (BigQueryFileLoadsSchemaTransform) provider.from(configuration);
+ schemaTransform.setTestBigQueryServices(fakeBigQueryServices);
+ String tag = provider.inputCollectionNames().get(0);
+ PCollectionRowTuple input =
+ PCollectionRowTuple.of(tag, p.apply(Create.of(ROWS).withRowSchema(SCHEMA)));
+ input.apply(schemaTransform);
+
+ p.run();
+
+ assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE));
+ assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size());
+ }
+
+ @Test
+ public void testManagedChoosesFileLoadsForBoundedWrites() {
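+ // A bounded input written through Managed should expand to the file loads transform;
+ // verify this by inspecting the translated pipeline proto.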
+ PCollection<Row> batchInput = p.apply(Create.of(ROWS)).setRowSchema(SCHEMA);
+ batchInput.apply(
+ Managed.write(Managed.BIGQUERY)
+ .withConfig(ImmutableMap.of("table", "project.dataset.table")));
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr ->
+ tr.getUniqueName()
+ .contains(BigQueryFileLoadsSchemaTransform.class.getSimpleName()))
+ .collect(Collectors.toList());
+ assertThat(writeTransformProto.size(), greaterThan(0));
+ p.enableAbandonedNodeEnforcement(false);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
new file mode 100644
index 000000000000..63727107a651
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** This class tests the execution of {@link Managed} BigQueryIO. */
+@RunWith(JUnit4.class)
+public class BigQueryManagedIT {
+ @Rule public TestName testName = new TestName();
+ @Rule public transient TestPipeline writePipeline = TestPipeline.create();
+ @Rule public transient TestPipeline readPipeline = TestPipeline.create();
+
+ private static final Schema SCHEMA =
+ Schema.of(
+ Schema.Field.of("str", Schema.FieldType.STRING),
+ Schema.Field.of("number", Schema.FieldType.INT64));
+
+ private static final List<Row> ROWS =
+ LongStream.range(0, 20)
+ .mapToObj(
+ i ->
+ Row.withSchema(SCHEMA)
+ .withFieldValue("str", Long.toString(i))
+ .withFieldValue("number", i)
+ .build())
+ .collect(Collectors.toList());
+
+ private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryManagedIT");
+
+ private static final String PROJECT =
+ TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+ private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + System.nanoTime();
+
+ @BeforeClass
+ public static void setUpTestEnvironment() throws IOException, InterruptedException {
+ // Create one BQ dataset for all test cases.
+ BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+ }
+
+ @Test
+ public void testBatchFileLoadsWriteRead() {
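+ // Batch round trip: write ROWS with Managed (file loads path), then read them back
+ // with Managed and assert the contents match.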
+ String table =
+ String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());
+ Map<String, Object> config = ImmutableMap.of("table", table);
+
+ // file loads requires a GCS temp location
+ String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
+ writePipeline.getOptions().setTempLocation(tempLocation);
+
+ // batch write
+ PCollectionRowTuple.of("input", getInput(writePipeline, false))
+ .apply(Managed.write(Managed.BIGQUERY).withConfig(config));
+ writePipeline.run().waitUntilFinish();
+
+ // read and validate
+ PCollection<Row> outputRows =
+ readPipeline
+ .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
+ .getSinglePCollection();
+ PAssert.that(outputRows).containsInAnyOrder(ROWS);
+ readPipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testStreamingStorageWriteRead() {
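+ // Streaming round trip: an unbounded input exercises the Storage Write API path,
+ // then the rows are read back and validated.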
+ String table =
+ String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());
+ Map<String, Object> config = ImmutableMap.of("table", table);
+
+ // streaming write
+ PCollectionRowTuple.of("input", getInput(writePipeline, true))
+ .apply(Managed.write(Managed.BIGQUERY).withConfig(config));
+ writePipeline.run().waitUntilFinish();
+
+ // read and validate
+ PCollection<Row> outputRows =
+ readPipeline
+ .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
+ .getSinglePCollection();
+ PAssert.that(outputRows).containsInAnyOrder(ROWS);
+ readPipeline.run().waitUntilFinish();
+ }
+
+ public PCollection<Row> getInput(Pipeline p, boolean isStreaming) {
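+ // Streaming inputs emit one element per millisecond via PeriodicImpulse and map each
+ // instant to a Row; batch inputs simply create the static ROWS list.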
+ if (isStreaming) {
+ return p.apply(
+ PeriodicImpulse.create()
+ .startAt(new Instant(0))
+ .stopAt(new Instant(19))
+ .withInterval(Duration.millis(1)))
+ .apply(
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ i ->
+ Row.withSchema(SCHEMA)
+ .withFieldValue("str", Long.toString(i.getMillis()))
+ .withFieldValue("number", i.getMillis())
+ .build()))
+ .setRowSchema(SCHEMA);
+ }
+ return p.apply(Create.of(ROWS)).setRowSchema(SCHEMA);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslationTest.java
new file mode 100644
index 000000000000..822c607aa3c9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQuerySchemaTransformTranslationTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQuerySchemaTransformTranslation.BigQueryStorageReadSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQuerySchemaTransformTranslation.BigQueryWriteSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformTranslationTest {
+ static final BigQueryWriteSchemaTransformProvider WRITE_PROVIDER =
+ new BigQueryWriteSchemaTransformProvider();
+ static final BigQueryDirectReadSchemaTransformProvider READ_PROVIDER =
+ new BigQueryDirectReadSchemaTransformProvider();
+ static final Row WRITE_CONFIG_ROW =
+ Row.withSchema(WRITE_PROVIDER.configurationSchema())
+ .withFieldValue("table", "project:dataset.table")
+ .withFieldValue("create_disposition", "create_never")
+ .withFieldValue("write_disposition", "write_append")
+ .withFieldValue("triggering_frequency_seconds", 5L)
+ .withFieldValue("use_at_least_once_semantics", false)
+ .withFieldValue("auto_sharding", false)
+ .withFieldValue("num_streams", 5)
+ .withFieldValue("error_handling", null)
+ .build();
+ static final Row READ_CONFIG_ROW =
+ Row.withSchema(READ_PROVIDER.configurationSchema())
+ .withFieldValue("query", null)
+ .withFieldValue("table_spec", "apache-beam-testing.samples.weather_stations")
+ .withFieldValue("row_restriction", "col < 5")
+ .withFieldValue("selected_fields", Arrays.asList("col1", "col2", "col3"))
+ .build();
+
+ @Test
+ public void testRecreateWriteTransformFromRow() {
+ BigQueryWriteSchemaTransform writeTransform =
+ (BigQueryWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW);
+
+ BigQueryWriteSchemaTransformTranslator translator =
+ new BigQueryWriteSchemaTransformTranslator();
+ Row translatedRow = translator.toConfigRow(writeTransform);
+
+ BigQueryWriteSchemaTransform writeTransformFromRow =
+ translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG_ROW, writeTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testWriteTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+ Schema inputSchema = Schema.builder().addByteArrayField("b").build();
+ PCollection<Row> input =
+ p.apply(
+ Create.of(
+ Collections.singletonList(
+ Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build())))
+ .setRowSchema(inputSchema);
+
+ BigQueryWriteSchemaTransform writeTransform =
+ (BigQueryWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW);
+ PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+ // Then translate the pipeline to a proto and extract the BigQueryWriteSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(WRITE_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, writeTransformProto.size());
+ RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+ assertEquals(WRITE_CONFIG_ROW, rowFromSpec);
+
+ // Use the information in the proto to recreate the BigQueryWriteSchemaTransform
+ BigQueryWriteSchemaTransformTranslator translator =
+ new BigQueryWriteSchemaTransformTranslator();
+ BigQueryWriteSchemaTransform writeTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG_ROW, writeTransformFromSpec.getConfigurationRow());
+ }
+
+ @Test
+ public void testReCreateReadTransformFromRow() {
+ BigQueryDirectReadSchemaTransform readTransform =
+ (BigQueryDirectReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG_ROW);
+
+ BigQueryStorageReadSchemaTransformTranslator translator =
+ new BigQueryStorageReadSchemaTransformTranslator();
+ Row row = translator.toConfigRow(readTransform);
+
+ BigQueryDirectReadSchemaTransform readTransformFromRow =
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG_ROW, readTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testReadTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+
+ BigQueryDirectReadSchemaTransform readTransform =
+ (BigQueryDirectReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG_ROW);
+
+ PCollectionRowTuple.empty(p).apply(readTransform);
+
+ // Then translate the pipeline to a proto and extract the BigQueryDirectReadSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> readTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(READ_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, readTransformProto.size());
+ RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+ assertEquals(READ_CONFIG_ROW, rowFromSpec);
+
+ // Use the information in the proto to recreate the BigQueryDirectReadSchemaTransform
+ BigQueryStorageReadSchemaTransformTranslator translator =
+ new BigQueryStorageReadSchemaTransformTranslator();
+ BigQueryDirectReadSchemaTransform readTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG_ROW, readTransformFromSpec.getConfigurationRow());
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 87ba2961461a..7b59552bbbe4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery.providers;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
@@ -32,13 +34,14 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
-import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
@@ -50,13 +53,16 @@
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -108,15 +114,14 @@ public void setUp() throws Exception {
@Test
public void testInvalidConfig() {
- List<BigQueryStorageWriteApiSchemaTransformConfiguration.Builder> invalidConfigs =
+ List<BigQueryWriteConfiguration.Builder> invalidConfigs =
Arrays.asList(
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
- .setTable("not_a_valid_table_spec"),
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration.builder().setTable("not_a_valid_table_spec"),
+ BigQueryWriteConfiguration.builder()
.setTable("project:dataset.table")
.setCreateDisposition("INVALID_DISPOSITION"));
- for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : invalidConfigs) {
+ for (BigQueryWriteConfiguration.Builder config : invalidConfigs) {
assertThrows(
Exception.class,
() -> {
@@ -125,13 +130,11 @@ public void testInvalidConfig() {
}
}
- public PCollectionRowTuple runWithConfig(
- BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+ public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config) {
return runWithConfig(config, ROWS);
}
- public PCollectionRowTuple runWithConfig(
- BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
+ public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config, List<Row> inputRows) {
BigQueryStorageWriteApiSchemaTransformProvider provider =
new BigQueryStorageWriteApiSchemaTransformProvider();
@@ -176,8 +179,8 @@ public boolean rowEquals(Row expectedRow, TableRow actualRow) {
@Test
public void testSimpleWrite() throws Exception {
String tableSpec = "project:dataset.simple_write";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder().setTable(tableSpec).build();
runWithConfig(config, ROWS);
p.run().waitUntilFinish();
@@ -189,9 +192,9 @@ public void testSimpleWrite() throws Exception {
@Test
public void testWriteToDynamicDestinations() throws Exception {
- String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(dynamic).build();
+ String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder().setTable(dynamic).build();
String baseTableSpec = "project:dataset.dynamic_write_";
@@ -273,8 +276,8 @@ public void testCDCWrites() throws Exception {
String tableSpec = "project:dataset.cdc_write";
List<String> primaryKeyColumns = ImmutableList.of("name");
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setUseAtLeastOnceSemantics(true)
.setTable(tableSpec)
.setUseCdcWrites(true)
@@ -304,9 +307,9 @@ public void testCDCWrites() throws Exception {
@Test
public void testCDCWriteToDynamicDestinations() throws Exception {
List<String> primaryKeyColumns = ImmutableList.of("name");
- String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setUseAtLeastOnceSemantics(true)
.setTable(dynamic)
.setUseCdcWrites(true)
@@ -338,8 +341,8 @@ public void testCDCWriteToDynamicDestinations() throws Exception {
@Test
public void testInputElementCount() throws Exception {
String tableSpec = "project:dataset.input_count";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder().setTable(tableSpec).build();
runWithConfig(config);
PipelineResult result = p.run();
@@ -368,13 +371,11 @@ public void testInputElementCount() throws Exception {
@Test
public void testFailedRows() throws Exception {
String tableSpec = "project:dataset.write_with_fail";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setTable(tableSpec)
.setErrorHandling(
- BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
- .setOutput("FailedRows")
- .build())
+ BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build())
.build();
String failValue = "fail_me";
@@ -420,13 +421,11 @@ public void testFailedRows() throws Exception {
@Test
public void testErrorCount() throws Exception {
String tableSpec = "project:dataset.error_count";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setTable(tableSpec)
.setErrorHandling(
- BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
- .setOutput("FailedRows")
- .build())
+ BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build())
.build();
Function<TableRow, Boolean> shouldFailRow =
@@ -456,4 +455,24 @@ public void testErrorCount() throws Exception {
assertEquals(expectedCount, count.getAttempted());
}
}
+
+ @Test
+ public void testManagedChoosesStorageApiForUnboundedWrites() {
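+ // Counterpart to the bounded-write test: an unbounded TestStream input should make
+ // Managed expand the write into the Storage Write API transform.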
+ PCollection<Row> batchInput =
+ p.apply(TestStream.create(SCHEMA).addElements(ROWS.get(0)).advanceWatermarkToInfinity());
+ batchInput.apply(
+ Managed.write(Managed.BIGQUERY)
+ .withConfig(ImmutableMap.of("table", "project.dataset.table")));
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr ->
+ tr.getUniqueName()
+ .contains(BigQueryStorageWriteApiSchemaTransform.class.getSimpleName()))
+ .collect(Collectors.toList());
+ assertThat(writeTransformProto.size(), greaterThan(0));
+ p.enableAbandonedNodeEnforcement(false);
+ }
}
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 8477726686ee..8e7e0862eff4 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -86,17 +86,20 @@ public class Managed {
// TODO: Dynamically generate a list of supported transforms
public static final String ICEBERG = "iceberg";
public static final String KAFKA = "kafka";
+ public static final String BIGQUERY = "bigquery";
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
+ .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
.build();
public static final Map<String, String> WRITE_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
+ .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
.build();
/**
@@ -104,7 +107,9 @@ public class Managed {
* supported managed sources are:
*
* <ul>
- * <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+ * <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables
+ * <li>{@link Managed#KAFKA} : Read from Apache Kafka topics
+ * <li>{@link Managed#BIGQUERY} : Read from GCP BigQuery tables
* </ul>
*/
public static ManagedTransform read(String source) {
@@ -124,7 +129,9 @@ public static ManagedTransform read(String source) {
* managed sinks are:
*
* <ul>
- * <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+ * <li>{@link Managed#ICEBERG} : Write to Apache Iceberg tables
+ * <li>{@link Managed#KAFKA} : Write to Apache Kafka topics
+ * <li>{@link Managed#BIGQUERY} : Write to GCP BigQuery tables
* </ul>
*/
public static ManagedTransform write(String sink) {
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 6f97983d3260..b705306b9478 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -117,7 +117,7 @@ protected void validate() {
"Please specify a config or a config URL, but not both.");
}
- public @Nullable String resolveUnderlyingConfig() {
+ private Map<String, Object> resolveUnderlyingConfig() {
String yamlTransformConfig = getConfig();
// If YAML string is empty, then attempt to read from YAML file
if (Strings.isNullOrEmpty(yamlTransformConfig)) {
@@ -131,7 +131,8 @@ protected void validate() {
throw new RuntimeException(e);
}
}
- return yamlTransformConfig;
+
+ return YamlUtils.yamlStringToMap(yamlTransformConfig);
}
}
@@ -152,34 +153,34 @@ protected SchemaTransform from(ManagedConfig managedConfig) {
static class ManagedSchemaTransform extends SchemaTransform {
private final ManagedConfig managedConfig;
- private final Row underlyingTransformConfig;
+ private final Row underlyingRowConfig;
private final SchemaTransformProvider underlyingTransformProvider;
ManagedSchemaTransform(
ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
// parse config before expansion to check if it matches underlying transform's config schema
Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
- Row underlyingTransformConfig;
+ Row underlyingRowConfig;
try {
- underlyingTransformConfig = getRowConfig(managedConfig, transformConfigSchema);
+ underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
} catch (Exception e) {
throw new IllegalArgumentException(
"Encountered an error when retrieving a Row configuration", e);
}
- this.managedConfig = managedConfig;
- this.underlyingTransformConfig = underlyingTransformConfig;
+ this.underlyingRowConfig = underlyingRowConfig;
this.underlyingTransformProvider = underlyingTransformProvider;
+ this.managedConfig = managedConfig;
}
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
LOG.debug(
- "Building transform \"{}\" with Row configuration: {}",
+ "Building transform \"{}\" with configuration: {}",
underlyingTransformProvider.identifier(),
- underlyingTransformConfig);
+ underlyingRowConfig);
- return input.apply(underlyingTransformProvider.from(underlyingTransformConfig));
+ return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
}
public ManagedConfig getManagedConfig() {
@@ -201,16 +202,14 @@ Row getConfigurationRow() {
}
}
+ // May return an empty row (perhaps the underlying transform doesn't have any required
+ // parameters)
@VisibleForTesting
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
- // May return an empty row (perhaps the underlying transform doesn't have any required
- // parameters)
- String yamlConfig = config.resolveUnderlyingConfig();
- Map<String, Object> configMap = YamlUtils.yamlStringToMap(yamlConfig);
-
- // The config Row object will be used to build the underlying SchemaTransform.
- // If a mapping for the SchemaTransform exists, we use it to update parameter names and align
- // with the underlying config schema
+ Map<String, Object> configMap = config.resolveUnderlyingConfig();
+ // Build a config Row that will be used to build the underlying SchemaTransform.
+ // If a mapping for the SchemaTransform exists, we use it to update parameter names to align
+ // with the underlying SchemaTransform config schema
Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
if (mapping != null && configMap != null) {
Map<String, Object> remappedConfig = new HashMap<>();
@@ -227,7 +226,7 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
return YamlUtils.toBeamRow(configMap, transformSchema, false);
}
- // We load providers seperately, after construction, to prevent the
+ // We load providers separately, after construction, to prevent the
// 'ManagedSchemaTransformProvider' from being initialized in a recursive loop
// when being loaded using 'AutoValue'.
synchronized Map getAllProviders() {
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 4cf752747be5..30476a30d373 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -50,9 +50,27 @@ public class ManagedTransformConstants {
private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
ImmutableMap.<String, String>builder().put("data_format", "format").build();
+ private static final Map<String, String> BIGQUERY_READ_MAPPINGS =
+ ImmutableMap.<String, String>builder()
+ .put("table", "table_spec")
+ .put("fields", "selected_fields")
+ .build();
+
+ private static final Map<String, String> BIGQUERY_WRITE_MAPPINGS =
+ ImmutableMap.<String, String>builder()
+ .put("at_least_once", "use_at_least_once_semantics")
+ .put("triggering_frequency", "triggering_frequency_seconds")
+ .build();
+
public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
+ .put(
+ getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ),
+ BIGQUERY_READ_MAPPINGS)
+ .put(
+ getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE),
+ BIGQUERY_WRITE_MAPPINGS)
.build();
}
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index e9edf8751e34..a287ec6260ce 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -88,8 +88,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
.withFieldValue("extra_integer", 123)
.build();
Row configRow =
- ManagedSchemaTransformProvider.getRowConfig(
- config, new TestSchemaTransformProvider().configurationSchema());
+ ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
assertEquals(expectedRow, configRow);
}
diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
index 22ee15b1de1c..cbcb6de56ed7 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -77,12 +77,16 @@
ICEBERG = "iceberg"
KAFKA = "kafka"
+BIGQUERY = "bigquery"
_MANAGED_IDENTIFIER = "beam:transform:managed:v1"
_EXPANSION_SERVICE_JAR_TARGETS = {
"sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG],
+ "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [
+ BIGQUERY
+ ]
}
-__all__ = ["ICEBERG", "KAFKA", "Read", "Write"]
+__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
class Read(PTransform):
@@ -90,6 +94,7 @@ class Read(PTransform):
_READ_TRANSFORMS = {
ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
+ BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn
}
def __init__(
@@ -130,6 +135,7 @@ class Write(PTransform):
_WRITE_TRANSFORMS = {
ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
+ BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn
}
def __init__(