Skip to content

Commit

Permalink
use avro file format
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Dec 16, 2024
1 parent 002d3fd commit 1e1fe85
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -104,13 +103,12 @@ BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
BigQueryIO.<Row>write()
.to(dynamicDestinations)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withFormatFunction(BigQueryUtils.toTableRow())
// TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
// createTempFilePrefixView() doesn't pick up the pipeline option
.withCustomGcsTempLocation(
ValueProvider.StaticValueProvider.of(options.getTempLocation()))
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));
.withAvroFormatFunction(dynamicDestinations.getAvroFilterFormatFunction(false));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
Expand Down Expand Up @@ -102,4 +105,16 @@ public SerializableFunction<Row, TableRow> getFilterFormatFunction(boolean fetch
return BigQueryUtils.toTableRow(filtered);
};
}

public SerializableFunction<AvroWriteRequest<Row>, GenericRecord> getAvroFilterFormatFunction(
boolean fetchNestedRecord) {
return request -> {
Row row = request.getElement();
if (fetchNestedRecord) {
row = checkStateNotNull(row.getRow(RECORD));
}
Row filtered = rowFilter.filter(row);
return AvroUtils.toGenericRecord(filtered);
};
}
}

0 comments on commit 1e1fe85

Please sign in to comment.