diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index bf09bf4d9e37..39e3203f2033 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -163,4 +163,12 @@ public interface BigQueryOptions Long getStorageWriteApiMaxRequestSize(); void setStorageWriteApiMaxRequestSize(Long value); + + @Description( + "If set, BigQueryIO.Read will rely on the Read API backends to surface the appropriate" + + " number of streams for read") + @Default.Boolean(false) + Boolean getEnableStorageReadApiV2(); + + void setEnableStorageReadApiV2(Boolean value); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index 9fc164014b67..1739d7e93952 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -133,13 +133,18 @@ public List> split( readSessionBuilder.setDataFormat(format); } + // Setting the requested max stream count to 0 implies that the Read API backend will select + // an appropriate number of streams for the Session to produce reasonable throughput. + // This is required when using the Read API Source V2. int streamCount = 0; - if (desiredBundleSizeBytes > 0) { - long tableSizeBytes = (targetTable != null) ? 
targetTable.getNumBytes() : 0; - streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, MAX_SPLIT_COUNT); - } + if (!bqOptions.getEnableStorageReadApiV2()) { + if (desiredBundleSizeBytes > 0) { + long tableSizeBytes = (targetTable != null) ? targetTable.getNumBytes() : 0; + streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, MAX_SPLIT_COUNT); + } - streamCount = Math.max(streamCount, MIN_SPLIT_COUNT); + streamCount = Math.max(streamCount, MIN_SPLIT_COUNT); + } CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()