Managed BigQueryIO #31486

Merged: 30 commits, Nov 12, 2024

Changes from 22 commits

Commits
ce14b96  managed bigqueryio (ahmedabu98, Jun 3, 2024)
550c1b4  spotless (ahmedabu98, Jun 4, 2024)
c94de3c  move managed dependency to test only (ahmedabu98, Jun 4, 2024)
912dc08  Merge branch 'master' of https://github.com/ahmedabu98/beam into mana… (ahmedabu98, Jun 5, 2024)
f436e62  cleanup after merging snake_case PR (ahmedabu98, Jun 5, 2024)
fe60904  choose write method based on boundedness and pipeline options (ahmedabu98, Jul 9, 2024)
7d405cf  Merge branch 'master' of https://github.com/ahmedabu98/beam into mana… (ahmedabu98, Jul 9, 2024)
d45159f  rename bigquery write config class (ahmedabu98, Jul 9, 2024)
989ad0f  spotless (ahmedabu98, Jul 9, 2024)
b9b49e7  change read output tag to 'output' (ahmedabu98, Jul 9, 2024)
a119bbc  spotless (ahmedabu98, Jul 9, 2024)
74bc178  revert logic that depends on DataflowServiceOptions. switching BQ met… (ahmedabu98, Jul 16, 2024)
528b504  spotless (ahmedabu98, Jul 16, 2024)
dcc398a  fix typo (ahmedabu98, Jul 29, 2024)
36edc38  separate BQ write config to a new class (ahmedabu98, Aug 6, 2024)
f9be86c  fix doc (ahmedabu98, Aug 6, 2024)
bd1e534  Merge branch 'master' of https://github.com/ahmedabu98/beam into mana… (ahmedabu98, Oct 25, 2024)
a26765e  resolve after syncing to HEAD (ahmedabu98, Oct 25, 2024)
725f7bd  spotless (ahmedabu98, Oct 26, 2024)
2631104  fork on batch/streaming (ahmedabu98, Nov 5, 2024)
770cf50  cleanup (ahmedabu98, Nov 5, 2024)
0a70466  spotless (ahmedabu98, Nov 5, 2024)
01a01f7  move forking logic to BQ schematransform side (ahmedabu98, Nov 6, 2024)
697c0b8  add file loads translation and tests; add test checks that the correc… (ahmedabu98, Nov 7, 2024)
105474b  set top-level wrapper to be the underlying managed BQ transform urn; … (ahmedabu98, Nov 8, 2024)
d6b9e69  move unit tests to respective schematransform test classes (ahmedabu98, Nov 8, 2024)
c0767d7  Merge branch 'master' of https://github.com/ahmedabu98/beam into mana… (ahmedabu98, Nov 8, 2024)
a600f62  Merge branch 'master' of https://github.com/ahmedabu98/beam into mana… (ahmedabu98, Nov 8, 2024)
ad4dcd9  expose to Python SDK as well (ahmedabu98, Nov 11, 2024)
6f325ce  Merge branch 'master' of https://github.com/ahmedabu98/beam into mana… (ahmedabu98, Nov 11, 2024)
3 changes: 2 additions & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json

@@ -1,3 +1,4 @@
 {
-  "comment": "Modify this file in a trivial way to cause this test suite to run"
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
 }
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 1
 }
@@ -70,6 +70,12 @@ message ManagedTransforms {
         "beam:schematransform:org.apache.beam:kafka_read:v1"];
     KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
         "beam:schematransform:org.apache.beam:kafka_write:v1"];
+    BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
+    BIGQUERY_STORAGE_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"];
+    BIGQUERY_FILE_LOADS = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:bigquery_fileloads:v1"];
   }
 }

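These enum entries map to URN strings through the beam_urn option; the Java SDK resolves them with BeamUrns.getUrn, which is exactly what the updated provider's identifier() does further down. A minimal sketch of that resolution (the wrapper class here is illustrative, not part of this PR):

import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.util.construction.BeamUrns;

public class UrnResolutionExample {
  public static void main(String[] args) {
    // Prints "beam:schematransform:org.apache.beam:bigquery_fileloads:v1",
    // the same value the file-loads provider's identifier() returns below.
    System.out.println(
        BeamUrns.getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_FILE_LOADS));
  }
}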
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -159,6 +159,7 @@ dependencies {
   testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
   testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
   testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+  testImplementation project(":sdks:java:managed")
   testImplementation project(path: ":sdks:java:io:common")
   testImplementation project(path: ":sdks:java:testing:test-utils")
   testImplementation library.java.commons_math3
@@ -36,6 +36,9 @@ dependencies {
   permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761
   implementation project(":sdks:java:extensions:schemaio-expansion-service")
   permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761
+  implementation project(":sdks:java:managed")
+  permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
Review comment (Member):
Notes, no action required on this PR:

  • This is a link to Jira, so probably there's a github issue it is migrated to
  • This should be equivalent to runtimeOnly because it is "implementation" but no static references to it. I would guess this works the same, or else the uberjar plugin might not treat it right.
  • Putting these deps into a docker container without making an uber jar would honestly be better in the case where it does end up in a container, so we keep the original jar metadata.


   runtimeOnly library.java.slf4j_jdk14
 }


This file was deleted.

@@ -17,34 +17,28 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
- * using {@link BigQueryFileLoadsWriteSchemaTransformConfiguration}.
+ * using {@link BigQueryWriteConfiguration}.
  *
  * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
  * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
@@ -56,201 +50,82 @@
 @Internal
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
-
-  private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
 
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
+  static final String INPUT_TAG = "input";
 
   /** Returns the expected {@link SchemaTransform} of the configuration. */
   @Override
-  protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
   }
 
   /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
   @Override
   public String identifier() {
-    return IDENTIFIER;
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_FILE_LOADS);
   }
 
   /**
    * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
    * single is expected, this returns a list with a single name.
    */
   @Override
   public List<String> inputCollectionNames() {
     return Collections.singletonList(INPUT_TAG);
   }
 
   /**
    * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
    * no output is expected, this returns an empty list.
    */
   @Override
   public List<String> outputCollectionNames() {
     return Collections.emptyList();
   }
 
-  /**
-   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
-   * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
-   */
   protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
-    /** An instance of {@link BigQueryServices} used for testing. */
-    private BigQueryServices testBigQueryServices = null;
-
-    private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
+    private final BigQueryWriteConfiguration configuration;
 
-    BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+    BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+      configuration.validate();
       this.configuration = configuration;
     }

     @Override
-    public void validate(PipelineOptions options) {
-      if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
-        return;
-      }
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> rowPCollection = input.getSinglePCollection();
+      BigQueryIO.Write<Row> write = toWrite();
+      rowPCollection.apply(write);
 
-      BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
 
-      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
-      if (testBigQueryServices != null) {
-        bigQueryServices = testBigQueryServices;
-      }
+    BigQueryIO.Write<Row> toWrite() {
+      BigQueryIO.Write<Row> write =
+          BigQueryIO.<Row>write()
+              .to(configuration.getTable())
+              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+              .withFormatFunction(BigQueryUtils.toTableRow())
+              .useBeamSchema();
 
+      if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+        CreateDisposition createDisposition =
+            CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
Review thread:

Contributor: As a larger point, I think we should do any transform overriding in job submission (BQ modes for batch/streaming etc.) so that we can just upgrade in the backend (at least in the first version).

Contributor (author): Do you mean making this switch in the SDK (i.e. at construction time)? I assumed we had settled on making it a runner-side decision. Some decisions are actually dependent on the runner (e.g. at least one streaming mode in Dataflow).

Contributor: Yeah. Added some comments to the relevant doc.
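The forking this thread refers to lands in the commits "fork on batch/streaming" and "move forking logic to BQ schematransform side". As a hypothetical sketch of a construction-time decision of that kind (this helper is illustrative and not part of this PR's diff):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Illustrative only: pick the BigQuery write method from the input's
// boundedness, in the spirit of the "fork on batch/streaming" commit.
class WriteMethodChooser {
  static BigQueryIO.Write.Method choose(PCollection<Row> input) {
    // Unbounded (streaming) inputs use the Storage Write API; bounded
    // (batch) inputs can use batch file loads instead.
    return input.isBounded() == PCollection.IsBounded.UNBOUNDED
        ? BigQueryIO.Write.Method.STORAGE_WRITE_API
        : BigQueryIO.Write.Method.FILE_LOADS;
  }
}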

+        write = write.withCreateDisposition(createDisposition);
+      }
 
-      DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
-      TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
-      try {
-        Table table = datasetService.getTable(tableReference);
-        if (table == null) {
-          throw new NullPointerException();
-        }
-
-        if (table.getSchema() == null) {
-          throw new InvalidConfigurationException(
-              String.format("could not fetch schema for table: %s", configuration.getTableSpec()));
-        }
-
-      } catch (NullPointerException | InterruptedException | IOException ex) {
-        throw new InvalidConfigurationException(
-            String.format(
-                "could not fetch table %s, error: %s",
-                configuration.getTableSpec(), ex.getMessage()));
-      }
-    }
+      if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
+        WriteDisposition writeDisposition =
+            WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
+        write = write.withWriteDisposition(writeDisposition);
+      }
+      if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+        write = write.withKmsKey(configuration.getKmsKey());
+      }

-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      validate(input);
-      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
-      Schema schema = rowPCollection.getSchema();
-      BigQueryIO.Write<TableRow> write = toWrite(schema);
-      if (testBigQueryServices != null) {
-        write = write.withTestServices(testBigQueryServices);
-      }
-
-      PCollection<TableRow> tableRowPCollection =
-          rowPCollection.apply(
-              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
-      tableRowPCollection.apply(write);
-      return PCollectionRowTuple.empty(input.getPipeline());
-    }
-
-    /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link Schema}. */
-    BigQueryIO.Write<TableRow> toWrite(Schema schema) {
-      TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
-      CreateDisposition createDisposition =
-          CreateDisposition.valueOf(configuration.getCreateDisposition());
-      WriteDisposition writeDisposition =
-          WriteDisposition.valueOf(configuration.getWriteDisposition());
-
-      return BigQueryIO.writeTableRows()
-          .to(configuration.getTableSpec())
-          .withCreateDisposition(createDisposition)
-          .withWriteDisposition(writeDisposition)
-          .withSchema(tableSchema);
+      return write;
     }
 
-    /** Setter for testing using {@link BigQueryServices}. */
-    @VisibleForTesting
-    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
-      this.testBigQueryServices = testBigQueryServices;
-    }
-
-    /** Validate a {@link PCollectionRowTuple} input. */
-    void validate(PCollectionRowTuple input) {
-      if (!input.has(INPUT_TAG)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "%s %s is missing expected tag: %s",
-                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
-      }
-
-      PCollection<Row> rowInput = input.get(INPUT_TAG);
-      Schema sourceSchema = rowInput.getSchema();
-
-      if (sourceSchema == null) {
-        throw new IllegalArgumentException(
-            String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
-      }
-
-      if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
-        return;
-      }
-
-      BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
-      if (testBigQueryServices != null) {
-        bigQueryServices = testBigQueryServices;
-      }
-
-      DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
-      TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
-      try {
-        Table table = datasetService.getTable(tableReference);
-        if (table == null) {
-          throw new NullPointerException();
-        }
-
-        TableSchema tableSchema = table.getSchema();
-        if (tableSchema == null) {
-          throw new NullPointerException();
-        }
-
-        Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema);
-        if (destinationSchema == null) {
-          throw new NullPointerException();
-        }
-
-        validateMatching(sourceSchema, destinationSchema);
-
-      } catch (NullPointerException | InterruptedException | IOException e) {
-        throw new InvalidConfigurationException(
-            String.format(
-                "could not validate input for create disposition: %s and table: %s, error: %s",
-                configuration.getCreateDisposition(),
-                configuration.getTableSpec(),
-                e.getMessage()));
-      }
-    }
-
-    void validateMatching(Schema sourceSchema, Schema destinationSchema) {
-      if (!sourceSchema.equals(destinationSchema)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "source and destination schema mismatch for table: %s",
-                configuration.getTableSpec()));
-      }
-    }
   }
 }
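Taken together, these changes make the file-loads provider reachable through Beam's Managed API. A minimal end-to-end sketch, assuming the Managed.BIGQUERY entry point this PR wires up (constant name assumed) and a placeholder table spec:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class ManagedBigQueryWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    Schema schema = Schema.builder().addStringField("name").addInt64Field("number").build();
    Row row = Row.withSchema(schema).addValues("a", 1L).build();

    // "input" matches the provider's INPUT_TAG; "table" is the
    // BigQueryWriteConfiguration key (create_disposition, write_disposition,
    // and kms_key are also accepted). The table spec is a placeholder.
    PCollectionRowTuple.of("input", pipeline.apply(Create.of(row).withRowSchema(schema)))
        .apply(
            Managed.write(Managed.BIGQUERY)
                .withConfig(
                    ImmutableMap.<String, Object>of("table", "my-project:my_dataset.my_table")));

    // pipeline.run() would require GCP credentials and an existing dataset.
  }
}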