Commit

expose to Python SDK as well
ahmedabu98 committed Nov 11, 2024
1 parent a600f62 commit ad4dcd9
Showing 3 changed files with 17 additions and 4 deletions.
BigQueryFileLoadsSchemaTransformProvider.java
@@ -26,6 +26,8 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -87,18 +89,23 @@ public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform {
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       PCollection<Row> rowPCollection = input.getSinglePCollection();
-      BigQueryIO.Write<Row> write = toWrite();
+      BigQueryIO.Write<Row> write = toWrite(input.getPipeline().getOptions());
       rowPCollection.apply(write);

       return PCollectionRowTuple.empty(input.getPipeline());
     }

-    BigQueryIO.Write<Row> toWrite() {
+    BigQueryIO.Write<Row> toWrite(PipelineOptions options) {
       BigQueryIO.Write<Row> write =
           BigQueryIO.<Row>write()
               .to(configuration.getTable())
               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
               .withFormatFunction(BigQueryUtils.toTableRow())
+              // TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
+              // createTempFilePrefixView() doesn't pick up the pipeline option
+              .withCustomGcsTempLocation(
+                  ValueProvider.StaticValueProvider.of(options.getTempLocation()))
               .withWriteDisposition(WriteDisposition.WRITE_APPEND)
               .useBeamSchema();

       if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
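
For context: the TODO above notes that BatchLoads' createTempFilePrefixView() does not read temp_location from the pipeline options on this path, so the transform now forwards it explicitly through withCustomGcsTempLocation(). On the Python side nothing extra is required beyond setting the standard option. A minimal sketch (the bucket name is a placeholder, not part of this commit):

from apache_beam.options.pipeline_options import PipelineOptions

# temp_location is a standard Beam pipeline option; with the change above,
# the Java transform wraps it in ValueProvider.StaticValueProvider and hands
# it to BigQueryIO.withCustomGcsTempLocation(), so FILE_LOADS jobs stage
# their temporary files under it.
options = PipelineOptions(temp_location="gs://my-bucket/tmp")  # placeholder bucket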
BigQueryFileLoadsSchemaTransformProviderTest.java
@@ -57,7 +57,7 @@

 /** Test for {@link BigQueryFileLoadsSchemaTransformProvider}. */
 @RunWith(JUnit4.class)
-public class BigQueryFileLoadsWriteSchemaTransformProviderTest {
+public class BigQueryFileLoadsSchemaTransformProviderTest {

   private static final String PROJECT = "fakeproject";
   private static final String DATASET = "fakedataset";
sdks/python/apache_beam/transforms/managed.py (7 additions, 1 deletion)
@@ -77,19 +77,24 @@

 ICEBERG = "iceberg"
 KAFKA = "kafka"
+BIGQUERY = "bigquery"
 _MANAGED_IDENTIFIER = "beam:transform:managed:v1"
 _EXPANSION_SERVICE_JAR_TARGETS = {
     "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG],
+    "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [
+        BIGQUERY
+    ]
 }

-__all__ = ["ICEBERG", "KAFKA", "Read", "Write"]
+__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]


 class Read(PTransform):
   """Read using Managed Transforms"""
   _READ_TRANSFORMS = {
       ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
       KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn
   }

   def __init__(
@@ -130,6 +135,7 @@ class Write(PTransform):
   _WRITE_TRANSFORMS = {
       ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
       KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn
   }

   def __init__(
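
With BIGQUERY registered in _READ_TRANSFORMS and _WRITE_TRANSFORMS and exported through __all__, Python pipelines can now refer to the managed BigQuery transform by name. A rough usage sketch, assuming a "table" config key mirroring the Java provider's getTable() configuration (the exact keys are defined by the Java SchemaTransform, not by this diff):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import managed

# Table and temp_location values are placeholders; "table" as a config key
# is an assumption based on the Java-side configuration.
options = PipelineOptions(temp_location="gs://my-bucket/tmp")
with beam.Pipeline(options=options) as p:
    (p
     | beam.Create([beam.Row(name="a", value=1)])
     | managed.Write(managed.BIGQUERY,
                     config={"table": "fakeproject:fakedataset.faketable"}))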
