Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting BigQuery endpoint #32153

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* The BigQuery endpoint can now be overridden via PipelineOptions, which enables the use of BigQuery emulators (Java) ([#28149](https://github.com/apache/beam/issues/28149)).
* Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)).
* Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)).

Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ dependencies {
implementation library.java.http_client
implementation library.java.hamcrest
implementation library.java.http_core
implementation library.java.jackson_annotations
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.jackson_datatype_joda
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;

Expand Down Expand Up @@ -213,4 +215,12 @@ public interface BigQueryOptions
Boolean getEnableStorageReadApiV2();

void setEnableStorageReadApiV2(Boolean value);

/**
 * BQ endpoint to use. If unspecified, uses the default endpoint.
 *
 * <p>Overriding the endpoint makes it possible to point the connector at a BigQuery emulator.
 *
 * <p>NOTE(review): no {@code @Default} is declared on this option, so the getter presumably
 * returns {@code null} when the option was never set - confirm before dereferencing the result.
 *
 * @return the URL for the BigQuery API as configured on these options
 */
@JsonIgnore
@Hidden
@Description("The URL for the BigQuery API.")
String getBigQueryEndpoint();

/**
 * Overrides the BigQuery endpoint.
 *
 * @param value the URL for the BigQuery API
 */
void setBigQueryEndpoint(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand All @@ -158,6 +159,7 @@
"keyfor"
})
public class BigQueryServicesImpl implements BigQueryServices {

private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);

// The maximum number of retries to execute a BigQuery RPC.
Expand Down Expand Up @@ -215,6 +217,7 @@ private static BackOff createDefaultBackoff() {

@VisibleForTesting
static class JobServiceImpl implements BigQueryServices.JobService {

private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
private final BigQueryIOMetadata bqIOMetadata;
Expand All @@ -226,12 +229,18 @@ static class JobServiceImpl implements BigQueryServices.JobService {
this.bqIOMetadata = BigQueryIOMetadata.create();
}

private JobServiceImpl(BigQueryOptions options) {
@VisibleForTesting
JobServiceImpl(BigQueryOptions options) {
this.errorExtractor = new ApiErrorExtractor();
this.client = newBigQueryClient(options).build();
this.bqIOMetadata = BigQueryIOMetadata.create();
}

/** Exposes the underlying {@link Bigquery} client so tests can inspect its configuration. */
@VisibleForTesting
Bigquery getClient() {
  return this.client;
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -558,6 +567,7 @@ public void close() throws Exception {}

@VisibleForTesting
public static class DatasetServiceImpl implements DatasetService {

// Backoff: 200ms * 1.5 ^ n, n=[1,5]
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
Expand Down Expand Up @@ -610,6 +620,11 @@ public DatasetServiceImpl(BigQueryOptions bqOptions) {
this.executor = null;
}

/** Exposes the underlying {@link Bigquery} client so tests can inspect its configuration. */
@VisibleForTesting
Bigquery getClient() {
  return this.client;
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -931,6 +946,7 @@ public void deleteDataset(String projectId, String datasetId)
}

static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {

private final TableReference ref;
private final Boolean skipInvalidRows;
private final Boolean ignoreUnkownValues;
Expand Down Expand Up @@ -1359,6 +1375,7 @@ public void close() throws Exception {

@VisibleForTesting
public static class WriteStreamServiceImpl implements WriteStreamService {

private final BigQueryWriteClient newWriteClient;
private final long storageWriteMaxInflightRequests;
private final long storageWriteMaxInflightBytes;
Expand All @@ -1383,6 +1400,11 @@ public WriteStreamServiceImpl(BigQueryOptions bqOptions) {
this.bqIOMetadata = BigQueryIOMetadata.create();
}

/** Exposes the underlying {@link BigQueryWriteClient} so tests can inspect its settings. */
@VisibleForTesting
BigQueryWriteClient getClient() {
  return this.newWriteClient;
}

@Override
public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException {
Expand Down Expand Up @@ -1599,10 +1621,16 @@ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
HttpRequestInitializer chainInitializer =
new ChainingHttpRequestInitializer(
Iterables.toArray(initBuilder.build(), HttpRequestInitializer.class));
return new Bigquery.Builder(
Transport.getTransport(), Transport.getJsonFactory(), chainInitializer)
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
Bigquery.Builder builder =
new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(), chainInitializer)
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());

@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
builder.setRootUrl(endpoint);
}
return builder;
}

private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions options) {
Expand All @@ -1615,8 +1643,13 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
.setChannelsPerCpu(2)
.build();

BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
builder.setEndpoint(trimSchemaIfNecessary(endpoint));
}
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
builder
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
.setTransportChannelProvider(transportChannelProvider)
.setBackgroundExecutorProvider(
Expand All @@ -1628,6 +1661,15 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
}
}

/**
 * Removes a leading {@code http://} or {@code https://} scheme from {@code endpoint}.
 *
 * <p>The scheme is matched case-insensitively, since URI schemes are case-insensitive: {@code
 * HTTPS://host:443} is trimmed the same way as {@code https://host:443}. Everything after the
 * scheme is returned unchanged; an endpoint without one of these schemes is returned as is.
 *
 * @param endpoint the configured endpoint; must not be {@code null}
 * @return {@code endpoint} without its leading HTTP(S) scheme, or {@code endpoint} itself when no
 *     such scheme is present
 */
private static String trimSchemaIfNecessary(String endpoint) {
  for (String scheme : new String[] {"https://", "http://"}) {
    // regionMatches returns false (rather than throwing) when endpoint is shorter than scheme.
    if (endpoint.regionMatches(true, 0, scheme, 0, scheme.length())) {
      return endpoint.substring(scheme.length());
    }
  }
  return endpoint;
}

public static CustomHttpErrors createBigQueryClientCustomErrors() {
CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
// 403 errors, to list tables, matching this URL:
Expand Down Expand Up @@ -1725,6 +1767,10 @@ public void onRetryAttempt(Status status, Metadata metadata) {
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build())
.setReadRowsRetryAttemptListener(listener);
@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
settingsBuilder.setEndpoint(trimSchemaIfNecessary(endpoint));
}

UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
Expand Down Expand Up @@ -1754,6 +1800,11 @@ public void onRetryAttempt(Status status, Metadata metadata) {
this.client = BigQueryReadClient.create(settingsBuilder.build());
}

/** Exposes the underlying {@link BigQueryReadClient} so tests can inspect its settings. */
@VisibleForTesting
BigQueryReadClient getClient() {
  return this.client;
}

@VisibleForTesting
RetryAttemptCounter getListener() {
return listener;
Expand Down Expand Up @@ -1840,6 +1891,7 @@ public void close() {
}

private static class BoundedExecutorService {

private final ListeningExecutorService taskExecutor;
private final ListeningExecutorService taskSubmitExecutor;
private final Semaphore semaphore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
/** Tests for {@link BigQueryServicesImpl}. */
@RunWith(JUnit4.class)
public class BigQueryServicesImplTest {

@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);
// A test can make mock responses through setupMockResponses
Expand Down Expand Up @@ -171,6 +172,7 @@ public LowLevelHttpResponse execute() throws IOException {

/**
 * Callback that configures a single mocked {@link LowLevelHttpResponse}.
 *
 * <p>Declared as its own functional interface because the callback is allowed to throw {@link
 * IOException}. NOTE(review): presumably consumed by this test class's mock-response setup
 * helpers - confirm against the callers.
 */
@FunctionalInterface
private interface MockSetupFunction {

/**
 * Applies the mock setup to the given response.
 *
 * @param t the response to configure
 * @throws IOException if configuring the response fails
 */
void apply(LowLevelHttpResponse t) throws IOException;
}

Expand Down Expand Up @@ -2092,4 +2094,29 @@ public RetryInfo parseBytes(byte[] serialized) {
impl.reportPendingMetrics();
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
}

/**
 * Verifies that the endpoint set through {@link BigQueryOptions#setBigQueryEndpoint} is applied
 * to every client created by {@link BigQueryServicesImpl}: the two {@code Bigquery} clients
 * report it as the prefix of their base URL, and the write/read clients report it in their
 * settings without the {@code http://} scheme.
 */
@Test
public void testEndpointOverrides() throws Exception {
  BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
  options.setBigQueryEndpoint("http://example.com:80");

  // try-with-resources releases every client (and whatever it holds open) even when an
  // assertion fails; close() is declared to throw Exception, hence the broader throws clause.
  try (BigQueryServicesImpl.JobServiceImpl jobService =
          new BigQueryServicesImpl.JobServiceImpl(options);
      BigQueryServicesImpl.DatasetServiceImpl datasetService =
          new BigQueryServicesImpl.DatasetServiceImpl(options);
      BigQueryServicesImpl.WriteStreamServiceImpl writeStreamService =
          new BigQueryServicesImpl.WriteStreamServiceImpl(options);
      BigQueryServicesImpl.StorageClientImpl storageClient =
          new BigQueryServicesImpl.StorageClientImpl(options)) {
    assertEquals("http://example.com:80/bigquery/v2/", jobService.getClient().getBaseUrl());
    assertEquals("http://example.com:80/bigquery/v2/", datasetService.getClient().getBaseUrl());
    assertEquals("example.com:80", writeStreamService.getClient().getSettings().getEndpoint());
    assertEquals("example.com:80", storageClient.getClient().getSettings().getEndpoint());
  }
}
}
Loading