Skip to content

Commit

Permalink
Properly close Storage API batch connections (apache#31710)
Browse files Browse the repository at this point in the history
* properly close connections; add active connection counter

* only invalidate stream at teardown for PENDING type

* cleanup
  • Loading branch information
ahmedabu98 authored and acrites committed Jul 17, 2024
1 parent 9e36426 commit 710d4a8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

/**
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
Expand All @@ -38,6 +40,9 @@
*/
@AutoValue
abstract class AppendClientInfo {
private final Counter activeConnections =
Metrics.counter(AppendClientInfo.class, "activeConnections");

abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

abstract TableSchema getTableSchema();
Expand Down Expand Up @@ -114,19 +119,21 @@ public AppendClientInfo withAppendClient(
return this;
} else {
String streamName = getStreamName.get();
return toBuilder()
.setStreamName(streamName)
.setStreamAppendClient(
writeStreamService.getStreamAppendClient(
streamName, getDescriptor(), useConnectionPool, missingValueInterpretation))
.build();
BigQueryServices.StreamAppendClient client =
writeStreamService.getStreamAppendClient(
streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);

activeConnections.inc();

return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
}
}

public void close() {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
activeConnections.dec();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ void teardown() {
if (client != null) {
runAsyncIgnoreFailure(closeWriterExecutor, client::unpin);
}
// if this is a PENDING stream, we won't be using it again after cleaning up this
// destination state, so clear it from the cache
if (!useDefaultStream) {
APPEND_CLIENTS.invalidate(streamName);
}
appendClientInfo = null;
}
}
Expand Down

0 comments on commit 710d4a8

Please sign in to comment.