Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CreateDisposition, withMethod, and schema for FirestoreToBigQuery template #1245

Merged
2 commits merged into from
Jan 4, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiBatchOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
Expand Down Expand Up @@ -63,7 +66,8 @@ public interface FirestoreToBigQueryOptions
extends PipelineOptions,
FirestoreReadOptions,
JavascriptTextTransformerOptions,
BigQueryStorageApiBatchOptions {
BigQueryStorageApiBatchOptions,
BigQueryCommonOptions.WriteOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery output table",
Expand All @@ -83,6 +87,42 @@ public interface FirestoreToBigQueryOptions
String getBigQueryLoadingTemporaryDirectory();

void setBigQueryLoadingTemporaryDirectory(String directory);

@TemplateParameter.GcsReadFile(
order = 3,
optional = true,
description = "Cloud Storage path to BigQuery JSON schema",
helpText =
"The Cloud Storage path for the BigQuery JSON schema. If `createDisposition` is not set, or set to CREATE_IF_NEEDED, this parameter must be specified.",
example = "gs://your-bucket/your-schema.json")
String getBigQuerySchemaPath();

void setBigQuerySchemaPath(String path);
}

private static BigQueryIO.Write<TableRow> writeToBigQuery(FirestoreToBigQueryOptions options) {
BigQueryIO.Write<TableRow> write =
BigQueryIO.writeTableRows()
.withoutValidation()
.to(options.getOutputTableSpec())
.withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
.withCustomGcsTempLocation(
StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()));

if (CreateDisposition.valueOf(options.getCreateDisposition())
== CreateDisposition.CREATE_NEVER) {
write = write.withCreateDisposition(CreateDisposition.CREATE_NEVER);
} else {
write =
write
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
}
if (options.getUseStorageWriteApi()) {
write = write.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
}

return write;
}

/**
Expand Down Expand Up @@ -118,16 +158,7 @@ public TableRow apply(String json) {
return BigQueryConverters.convertJsonToTableRow(json);
}
}))
.apply(
"WriteBigQuery",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.to(options.getOutputTableSpec())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCustomGcsTempLocation(
StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));
.apply("WriteBigQuery", writeToBigQuery(options));

pipeline.run();
}
Expand Down
Loading