Skip to content

Commit

Permalink
Preserve the ablility to use read API backend feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Feb 26, 2024
1 parent 973be6c commit 015541f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,12 @@ public interface BigQueryOptions
Long getStorageWriteApiMaxRequestSize();

void setStorageWriteApiMaxRequestSize(Long value);

@Description(
"If set, BigQueryIO.Read will rely on the Read API backends to surface the appropriate"
+ " number of streams for read")
@Default.Boolean(false)
Boolean getEnableStorageReadApiV2();

void setEnableStorageReadApiV2(Boolean value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,18 @@ public List<BigQueryStorageStreamSource<T>> split(
readSessionBuilder.setDataFormat(format);
}

// Setting the requested max stream count to 0, implies that the Read API backend will select
// an appropriate number of streams for the Session to produce reasonable throughput.
// This is required when using the Read API Source V2.
int streamCount = 0;
if (desiredBundleSizeBytes > 0) {
long tableSizeBytes = (targetTable != null) ? targetTable.getNumBytes() : 0;
streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, MAX_SPLIT_COUNT);
}
if (!bqOptions.getEnableStorageReadApiV2()) {
if (desiredBundleSizeBytes > 0) {
long tableSizeBytes = (targetTable != null) ? targetTable.getNumBytes() : 0;
streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, MAX_SPLIT_COUNT);
}

streamCount = Math.max(streamCount, MIN_SPLIT_COUNT);
streamCount = Math.max(streamCount, MIN_SPLIT_COUNT);
}

CreateReadSessionRequest createReadSessionRequest =
CreateReadSessionRequest.newBuilder()
Expand Down

0 comments on commit 015541f

Please sign in to comment.