Rollback Bigtable throttling counter #32442
@@ -21,16 +21,13 @@
 import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.DeadlineExceededException;
 import com.google.api.gax.rpc.InvalidArgumentException;
 import com.google.api.gax.rpc.NotFoundException;
-import com.google.api.gax.rpc.ResourceExhaustedException;
 import com.google.auto.value.AutoValue;
 import com.google.bigtable.v2.MutateRowResponse;
 import com.google.bigtable.v2.Mutation;
@@ -41,7 +38,6 @@
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.KeyOffset;
 import com.google.protobuf.ByteString;
-import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -78,8 +74,6 @@
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -1121,52 +1115,27 @@ public Write withMaxOutstandingBytes(long bytes) {
      * always enabled on batch writes and limits the number of outstanding requests to the Bigtable
      * server.
      *
-     * <p>When enabled, will also set default {@link #withThrottlingReportTargetMs} to 1 minute.
-     * This enables runner react with increased latency in flush call due to flow control.
-     *
      * <p>Does not modify this object.
      */
     public Write withFlowControl(boolean enableFlowControl) {
       BigtableWriteOptions options = getBigtableWriteOptions();
-      BigtableWriteOptions.Builder builder = options.toBuilder().setFlowControl(enableFlowControl);
-      if (enableFlowControl) {
-        builder = builder.setThrottlingReportTargetMs(60_000);
-      }
-      return toBuilder().setBigtableWriteOptions(builder.build()).build();
+      return toBuilder()
+          .setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build())
+          .build();
     }
 
-    /**
-     * Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled.
-     *
-     * <p>Will also set {@link #withThrottlingReportTargetMs} to the same value.
-     */
+    /** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */
+    @Deprecated
     public Write withThrottlingTargetMs(int throttlingTargetMs) {
-      BigtableWriteOptions options = getBigtableWriteOptions();
-      return toBuilder()
-          .setBigtableWriteOptions(
-              options
-                  .toBuilder()
-                  .setThrottlingTargetMs(throttlingTargetMs)
-                  .setThrottlingReportTargetMs(throttlingTargetMs)
-                  .build())
-          .build();
+      LOG.warn("withThrottlingTargetMs has been removed and does not have effect.");
+      return this;
     }
 
-    /**
-     * Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write
-     * request latency exceeded the set value, the amount greater than the target will be considered
-     * as throttling time and report back to runner.
-     *
-     * <p>If not set, defaults to 3 min for completed batch request. Client side flowing control
-     * configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust
-     * the default value accordingly. Set to 0 to disable throttling time reporting.
-     */
+    /** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */
+    @Deprecated
     public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
Review comment: Same as above.
Review comment: Please don't throw an exception; log a warning instead. We don't want to be in a position where a user started using this flag in 2.59 and then their jobs start failing when they upgrade to 2.60. In other words, throwing an UnsupportedOperationException is effectively the same as breaking binary compatibility.
-      BigtableWriteOptions options = getBigtableWriteOptions();
-      return toBuilder()
-          .setBigtableWriteOptions(
-              options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build())
-          .build();
+      LOG.warn("withThrottlingReportTargetMs has been removed and does not have an effect.");
+      return this;
     }
 
     public Write withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
@@ -1333,16 +1302,8 @@ private static class BigtableWriterFn
     private final BigtableServiceFactory.ConfigId id;
     private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
     private final BadRecordRouter badRecordRouter;
-
-    private final Counter throttlingMsecs =
-        Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);
-
-    private final int throttleReportThresMsecs;
-
     private transient ConcurrentLinkedQueue<KV<BigtableWriteException, BoundedWindow>> badRecords =
         null;
-    // Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
-    private transient long pendingThrottlingMsecs;
     private transient boolean reportedLineage;
 
     // Assign serviceEntry in startBundle and clear it in tearDown.
@@ -1363,8 +1324,6 @@ private static class BigtableWriterFn
       this.badRecordRouter = badRecordRouter;
       this.failures = new ConcurrentLinkedQueue<>();
       this.id = factory.newId();
-      // a request completed more than this time will be considered throttled. Disabled if set to 0
-      throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
       LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
     }
 
@@ -1393,18 +1352,13 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
       drainCompletedElementFutures();
       checkForFailures();
       KV<ByteString, Iterable<Mutation>> record = c.element();
-      Instant writeStart = Instant.now();
-      pendingThrottlingMsecs = 0;
       CompletableFuture<Void> f =
           bigtableWriter
               .writeRecord(record)
               // transform the next CompletionStage to have its own status
               // this allows us to capture any unexpected errors in the handler
-              .handle(handleMutationException(record, window, writeStart));
+              .handle(handleMutationException(record, window));
       outstandingWrites.add(f);
-      if (pendingThrottlingMsecs > 0) {
-        throttlingMsecs.inc(pendingThrottlingMsecs);
-      }
       ++recordsWritten;
       seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
     }
@@ -1420,39 +1374,14 @@ private void drainCompletedElementFutures() throws ExecutionException, Interrupt
     }
 
     private BiFunction<MutateRowResponse, Throwable, Void> handleMutationException(
-        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, Instant writeStart) {
+        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
       return (MutateRowResponse result, Throwable exception) -> {
         if (exception != null) {
           if (isDataException(exception)) {
             retryIndividualRecord(record, window);
           } else {
-            // Exception due to resource unavailable or rate limited,
-            // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
-            boolean isResourceException = false;
-            if (exception instanceof StatusRuntimeException) {
-              StatusRuntimeException se = (StatusRuntimeException) exception;
-              if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
-                  || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) {
-                isResourceException = true;
-              }
-            } else if (exception instanceof DeadlineExceededException
-                || exception instanceof ResourceExhaustedException) {
-              isResourceException = true;
-            }
-            if (isResourceException) {
-              pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis();
-            }
             failures.add(new BigtableWriteException(record, exception));
           }
-        } else {
-          // add the excessive amount to throttling metrics if elapsed time > target latency
-          if (throttleReportThresMsecs > 0) {
-            long excessTime =
-                new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
-            if (excessTime > 0) {
-              pendingThrottlingMsecs = excessTime;
-            }
-          }
         }
         return null;
       };
@@ -1489,7 +1418,6 @@ private static boolean isDataException(Throwable e) {
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
       if (bigtableWriter != null) {
-        Instant closeStart = Instant.now();
         try {
           bigtableWriter.close();
         } catch (IOException e) {
@@ -1498,7 +1426,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           // to the error queue. Bigtable will successfully write other failures in the batch,
           // so this exception should be ignored
           if (!(e.getCause() instanceof BatchingException)) {
-            throttlingMsecs.inc(new Duration(closeStart, Instant.now()).getMillis());
             throw e;
           }
         }
@@ -1514,14 +1441,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
               e);
         }
 
-        // add the excessive amount to throttling metrics if elapsed time > target latency
-        if (throttleReportThresMsecs > 0) {
-          long excessTime =
-              new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
-          if (excessTime > 0) {
-            throttlingMsecs.inc(excessTime);
-          }
-        }
         if (!reportedLineage) {
           bigtableWriter.reportLineage();
           reportedLineage = true;
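For readers following what the rolled-back code reported on the success path, the "excess over threshold" arithmetic can be sketched on its own. This is an illustrative reduction, not the code from the diff: `ThrottlingSketch` and `excessMillis` are hypothetical names, and `java.time` is used here only to keep the sketch dependency-free (the diff builds a `Duration` from two `Instant` values instead). The failure path, which counted the whole elapsed time for deadline-exceeded and resource-exhausted errors, is not covered.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper illustrating the removed threshold arithmetic. */
final class ThrottlingSketch {
  private ThrottlingSketch() {}

  /**
   * Returns the portion of the elapsed time that exceeds the threshold. Returns 0 when the call
   * finished within the threshold, or when reporting is disabled (threshold of 0 or less).
   */
  static long excessMillis(Instant start, Instant end, long thresholdMillis) {
    if (thresholdMillis <= 0) {
      return 0L; // a threshold of 0 disables throttling-time reporting
    }
    long elapsedMillis = Duration.between(start, end).toMillis();
    return Math.max(0L, elapsedMillis - thresholdMillis);
  }
}
```

With the 180_000 ms default seen in the removed constructor line, a write that took 200 seconds would have contributed 20 seconds of throttling time, and a write that took 100 seconds would have contributed nothing.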
Review comment: Can you actually remove a public method? I suspect you need to make it a no-op that logs a warning.
Author reply: I think I can, and we should.
Only users who actively call this method in a 2.59.0 pipeline will be affected. Users who do not call it explicitly, and users upgrading from 2.58.0 or earlier, are unaffected. Those who do use this method in 2.59.0 have a good reason to do so. We are removing a tested and functioning feature on request; if we make it a no-op or only log a warning, users won't be aware that the feature is gone in the next release.
In the past Beam had an "Experimental" annotation, but the community voted to remove it (#26490). Now "new code is changeable/evolving by default" (see the discussion linked in that PR), which is especially true for a new API that has existed in a single version and is not enabled by default.
Author reply: If conceptual "binary compatibility" is the concern, I've added the method back, but it throws rather than acting as a no-op or ignoring the call.
Review comment: Yes, I was concerned about breaking binary compatibility.
I don't see it throwing an exception, though, nor do I think it should throw one. Please make it a no-op and log a warning instead.
Also, we didn't expose ThrottlingTargetMs or latency-based throttling for a reason: it has some really sharp edge cases that are very hard to debug. Please make this a no-op, as it was prior to your change.
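The approach requested in this thread (keep the public method, mark it deprecated, ignore the argument, and log a warning) can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the code from the PR: `WriteSketch`, its constructor, and `flowControlEnabled` are hypothetical stand-ins for `BigtableIO.Write`, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for an immutable, builder-style transform. */
final class WriteSketch {
  private static final Logger LOG = Logger.getLogger(WriteSketch.class.getName());

  private final boolean flowControl;

  WriteSketch(boolean flowControl) {
    this.flowControl = flowControl;
  }

  boolean flowControlEnabled() {
    return flowControl;
  }

  /** Supported option: returns a modified copy and leaves this object unchanged. */
  WriteSketch withFlowControl(boolean enableFlowControl) {
    return new WriteSketch(enableFlowControl);
  }

  /**
   * @deprecated Kept only so that callers compiled against the previous release still link.
   *     The argument is ignored.
   */
  @Deprecated
  WriteSketch withThrottlingReportTargetMs(int ignoredTargetMs) {
    // No-op: warn instead of throwing, so existing pipelines keep running after an upgrade.
    LOG.warning("withThrottlingReportTargetMs has been removed and does not have an effect.");
    return this;
  }
}
```

Because the method keeps its name and signature, a pipeline compiled against the earlier release still links and runs, and only sees a warning. Deleting the method would instead surface as a `NoSuchMethodError` at run time for such precompiled callers, while throwing from it would fail the job at pipeline construction.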