From 8fb06da4ce83f0246a3c9c4c42575a7ca2eb3331 Mon Sep 17 00:00:00 2001 From: Damon Date: Tue, 9 Jan 2024 10:19:25 -0800 Subject: [PATCH] [RRIO] Build RequestResponseIO and related dependency changes (#29710) * Move Call.Result as upper level class * Patch documentation * Replace RRIO's Result * Implement RequestResponseIO expand methods * Begin tests for transform construction * spotlessApply * Complete RequestResponseIO transform composite construction tests * Implement windowed CallShouldBackoff * Start completion of Call tasks * Complete Call features * Expose Call features configuration * Add coverage of custom Call configuration * Final cleanup * Patch arrangement of Builder methods * Clean up Call.Configuration * Patch typo * Patch unused annotations * Remove support of Redis based throttling * WIP: Refactor per PR comments * Refactor RequestResponseIOTest * Improve readability * Add sleepIfNeeded --- .../beam/io/requestresponse/ApiIOError.java | 7 +- .../apache/beam/io/requestresponse/Cache.java | 190 ++++-- .../apache/beam/io/requestresponse/Call.java | 377 +++++++++--- .../io/requestresponse/CallShouldBackoff.java | 2 +- ...uldBackoffBasedOnRejectionProbability.java | 2 +- .../DefaultSerializableBackoffSupplier.java | 35 ++ .../beam/io/requestresponse/Monitoring.java | 362 ++++++++++++ .../apache/beam/io/requestresponse/Quota.java | 7 +- .../beam/io/requestresponse/RedisClient.java | 4 +- .../beam/io/requestresponse/Repeater.java | 85 ++- .../io/requestresponse/RequestResponseIO.java | 552 ++++++++++++++++-- .../beam/io/requestresponse/Result.java | 90 +++ .../requestresponse/SerializableSupplier.java | 28 + .../ThrottleWithExternalResource.java | 12 +- .../UserCodeExecutionException.java | 8 + .../UserCodeQuotaException.java | 6 + .../UserCodeRemoteSystemException.java | 9 + .../UserCodeTimeoutException.java | 9 + .../WindowedCallShouldBackoff.java | 76 +++ .../beam/io/requestresponse/CacheIT.java | 16 +- ...ackoffBasedOnRejectionProbabilityTest.java | 2 +- .../beam/io/requestresponse/CallTest.java | 1 - .../EchoGRPCCallerWithSetupTeardownIT.java | 2 +- .../io/requestresponse/EchoRequestCoder.java | 1 + .../io/requestresponse/EchoResponseCoder.java | 43 ++ .../io/requestresponse/RedisClientIT.java | 2 +- .../beam/io/requestresponse/RepeaterTest.java | 2 +- .../requestresponse/RequestResponseIOIT.java | 95 +++ .../RequestResponseIOTest.java | 508 ++++++++++++++++ .../ThrottleWithExternalResourceIT.java | 6 +- .../WindowedCallShouldBackoffTest.java | 56 ++ 31 files changed, 2358 insertions(+), 237 deletions(-) create mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java create mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Monitoring.java create mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Result.java create mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java create mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java create mode 100644 sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoResponseCoder.java create mode 100644 sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOIT.java create mode 100644 sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java create mode 100644 sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java index cfff3bd89414..abb25bd33ba8 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ApiIOError.java @@ -17,6 +17,8 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.value.AutoValue; @@ -26,7 +28,6 @@ import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; -import org.checkerframework.checker.nullness.qual.NonNull; import org.joda.time.Instant; /** {@link ApiIOError} is a data class for storing details about an error. */ @@ -41,10 +42,10 @@ public abstract class ApiIOError { * Instantiate an {@link ApiIOError} from an {@link ErrorT} {@link T} element. The {@link T} * element is converted to a JSON string. */ - static ApiIOError of(@NonNull ErrorT e, @NonNull T element) + static ApiIOError of(ErrorT e, T element) throws JsonProcessingException { - String json = OBJECT_MAPPER.writeValueAsString(element); + String json = OBJECT_MAPPER.writeValueAsString(checkStateNotNull(element)); return ApiIOError.builder() .setRequestAsJsonString(json) diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java index b8e526c4829b..6bbbb41de99a 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java @@ -21,9 +21,16 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -32,7 +39,73 @@ import org.joda.time.Duration; /** Transforms for reading and writing request/response associations to a cache. */ -final class Cache { +// TODO(damondouglas): Add metrics per https://github.com/apache/beam/issues/29888. +public final class Cache { + + /** + * Builds a {@link Pair} using a Redis cache to read and write + * {@link RequestT} and {@link ResponseT} pairs. The purpose of the cache is to offload {@link + * RequestT}s from the API and instead return the {@link ResponseT} if the association is known. + * Since the {@link RequestT}s and {@link ResponseT}s need encoding and decoding, checks are made + * whether the requestTCoder and responseTCoders are {@link Coder#verifyDeterministic}. + * This feature is only appropriate for API reads such as HTTP list, get, etc. + * + *
Below describes the parameters in more detail and their usage.
+ * + *
    + *
  • {@code URI uri} - the {@link URI} of the Redis instance. + *
  • {@code Coder requestTCoder} - the {@link RequestT} {@link Coder} to encode and + * decode {@link RequestT}s during cache read and writes. + *
  • {@code Duration expiry} - the duration to hold {@link RequestT} and {@link ResponseT} + * pairs in the cache. + *
+ */ + public static Pair usingRedis( + URI uri, Coder requestTCoder, Coder responseTCoder, Duration expiry) + throws NonDeterministicException { + PTransform, Result>> read = + Cache.readUsingRedis( + new RedisClient(uri), requestTCoder, new CacheResponseCoder<>(responseTCoder)); + + PTransform>, Result>> write = + // Type arguments needed to resolve "error: [assignment] incompatible types in assignment." + Cache.writeUsingRedis( + expiry, new RedisClient(uri), requestTCoder, new CacheResponseCoder<>(responseTCoder)); + + return Pair.of(read, write); + } + + /** + * A simple POJO that holds both cache read and write {@link PTransform}s. Functionally, these go + * together and must at times be instantiated using the same inputs. + */ + public static class Pair { + private final PTransform, Result>> read; + private final PTransform>, Result>> + write; + + public static Pair of( + PTransform, Result>> read, + PTransform>, Result>> write) { + return new Pair<>(read, write); + } + + private Pair( + PTransform, Result>> read, + PTransform>, Result>> write) { + this.read = read; + this.write = write; + } + + public PTransform, Result>> getRead() { + return read; + } + + public PTransform>, Result>> + getWrite() { + return write; + } + } /** * Instantiates a {@link Call} {@link PTransform} that reads {@link RequestT} {@link ResponseT} @@ -45,7 +118,7 @@ final class Cache { @Nullable ResponseT, CallerSetupTeardownT extends Caller> & SetupTeardown> - PTransform, Call.Result>> read( + PTransform, Result>> read( CallerSetupTeardownT implementsCallerSetupTeardown, Coder requestTCoder, Coder<@Nullable ResponseT> responseTCoder) { @@ -66,12 +139,11 @@ final class Cache { * considerations when using this method to achieve cache reads. */ static - PTransform, Call.Result>> - readUsingRedis( - RedisClient client, - Coder requestTCoder, - Coder<@Nullable ResponseT> responseTCoder) - throws NonDeterministicException { + PTransform, Result>> readUsingRedis( + RedisClient client, + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder) + throws NonDeterministicException { return read( new UsingRedis<>(requestTCoder, responseTCoder, client).read(), requestTCoder, @@ -88,7 +160,7 @@ final class Cache { ResponseT, CallerSetupTeardownT extends Caller, KV> & SetupTeardown> - PTransform>, Call.Result>> write( + PTransform>, Result>> write( CallerSetupTeardownT implementsCallerSetupTeardown, KvCoder kvCoder) { return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown, kvCoder); @@ -107,7 +179,7 @@ PTransform>, Call.Result - PTransform>, Call.Result>> + PTransform>, Result>> writeUsingRedis( Duration expiry, RedisClient client, @@ -136,16 +208,16 @@ private UsingRedis( this.responseTCoder = responseTCoder; } - private Read read() { + Read read() { return new Read<>(requestTCoder, responseTCoder, client); } - private Write write(Duration expiry) { + Write write(Duration expiry) { return new Write<>(expiry, requestTCoder, responseTCoder, client); } /** Reads associated {@link RequestT} {@link ResponseT} using a {@link RedisClient}. */ - private static class Read + static class Read implements Caller>, SetupTeardown { private final Coder requestTCoder; @@ -191,49 +263,79 @@ public void teardown() throws UserCodeExecutionException { client.teardown(); } } + + static class Write + implements Caller, KV>, SetupTeardown { + private final Duration expiry; + private final Coder requestTCoder; + private final Coder<@Nullable ResponseT> responseTCoder; + private final RedisClient client; + + private Write( + Duration expiry, + Coder requestTCoder, + Coder<@Nullable ResponseT> responseTCoder, + RedisClient client) { + this.expiry = expiry; + this.requestTCoder = requestTCoder; + this.responseTCoder = responseTCoder; + this.client = client; + } + + @Override + public KV call(KV request) + throws UserCodeExecutionException { + ByteArrayOutputStream keyStream = new ByteArrayOutputStream(); + ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); + try { + requestTCoder.encode(request.getKey(), keyStream); + responseTCoder.encode(request.getValue(), valueStream); + } catch (IOException e) { + throw new UserCodeExecutionException(e); + } + client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry); + return request; + } + + @Override + public void setup() throws UserCodeExecutionException { + client.setup(); + } + + @Override + public void teardown() throws UserCodeExecutionException { + client.teardown(); + } + } } - private static class Write - implements Caller, KV>, SetupTeardown { - private final Duration expiry; - private final Coder requestTCoder; - private final Coder<@Nullable ResponseT> responseTCoder; - private final RedisClient client; + /** Resolves checker error: incompatible argument for parameter ResponseT Coder. */ + private static class CacheResponseCoder extends CustomCoder<@Nullable ResponseT> { + private final NullableCoder basis; - private Write( - Duration expiry, - Coder requestTCoder, - Coder<@Nullable ResponseT> responseTCoder, - RedisClient client) { - this.expiry = expiry; - this.requestTCoder = requestTCoder; - this.responseTCoder = responseTCoder; - this.client = client; + private CacheResponseCoder(Coder basis) { + this.basis = NullableCoder.of(basis); } @Override - public KV call(KV request) - throws UserCodeExecutionException { - ByteArrayOutputStream keyStream = new ByteArrayOutputStream(); - ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); - try { - requestTCoder.encode(request.getKey(), keyStream); - responseTCoder.encode(request.getValue(), valueStream); - } catch (IOException e) { - throw new UserCodeExecutionException(e); - } - client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry); - return request; + public void encode(@Nullable ResponseT value, OutputStream outStream) + throws CoderException, IOException { + basis.encode(value, outStream); + } + + @Override + public @Nullable ResponseT decode(InputStream inStream) throws CoderException, IOException { + return basis.decode(inStream); } @Override - public void setup() throws UserCodeExecutionException { - client.setup(); + public List> getCoderArguments() { + return basis.getCoderArguments(); } @Override - public void teardown() throws UserCodeExecutionException { - client.teardown(); + public void verifyDeterministic() throws NonDeterministicException { + basis.verifyDeterministic(); } } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java index d52ca971ca47..65038a8ffa3d 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java @@ -17,12 +17,13 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.auto.value.AutoValue; +import java.io.IOException; import java.io.Serializable; -import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -31,38 +32,29 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.beam.io.requestresponse.Call.Result; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import org.checkerframework.checker.nullness.qual.NonNull; import org.joda.time.Duration; /** * {@link Call} transforms a {@link RequestT} {@link PCollection} into a {@link ResponseT} {@link * PCollection} and {@link ApiIOError} {@link PCollection}, both wrapped in a {@link Result}. */ -class Call - extends PTransform<@NonNull PCollection, @NonNull Result> { - - /** - * The default {@link Duration} to wait until completion of user code. A {@link - * UserCodeTimeoutException} is thrown when {@link Caller#call}, {@link SetupTeardown#setup}, or - * {@link SetupTeardown#teardown} exceed this timeout. - */ - static final Duration DEFAULT_TIMEOUT = Duration.standardSeconds(30L); +class Call extends PTransform, Result> { /** * Instantiates a {@link Call} {@link PTransform} with the required {@link Caller} and {@link @@ -100,6 +92,12 @@ Call ofCallerAndSetupTeardown( .build()); } + /** Instantiates a {@link Call} using the {@link Configuration}. */ + static Call of( + Configuration configuration) { + return new Call<>(configuration); + } + // TupleTags need to be instantiated for each Call instance. We cannot use a shared // static instance that is shared for multiple PCollectionTuples when Call is // instantiated multiple times as it is reused throughout code in this library. @@ -109,7 +107,7 @@ Call ofCallerAndSetupTeardown( private final Configuration configuration; private Call(Configuration configuration) { - this.configuration = configuration; + this.configuration = SerializableUtils.ensureSerializable(configuration); } /** @@ -122,16 +120,16 @@ Call withSetupTeardown(SetupTeardown setupTeardown) { } /** - * Overrides the default {@link #DEFAULT_TIMEOUT}. A {@link UserCodeTimeoutException} is thrown - * when {@link Caller#call}, {@link SetupTeardown#setup}, or {@link SetupTeardown#teardown} exceed - * the timeout. + * Overrides the default {@link RequestResponseIO#DEFAULT_TIMEOUT}. A {@link + * UserCodeTimeoutException} is thrown when {@link Caller#call}, {@link SetupTeardown#setup}, or + * {@link SetupTeardown#teardown} exceed the timeout. */ Call withTimeout(Duration timeout) { return new Call<>(configuration.toBuilder().setTimeout(timeout).build()); } @Override - public @NonNull Result expand(PCollection input) { + public Result expand(PCollection input) { PCollectionTuple pct = input.apply( @@ -147,6 +145,16 @@ private static class CallFn extends DoFn failureTag; private final CallerWithTimeout caller; private final SetupTeardownWithTimeout setupTeardown; + private final Configuration configuration; + private @MonotonicNonNull Counter requestsCounter = null; + private @MonotonicNonNull Counter responsesCounter = null; + private @MonotonicNonNull Counter failuresCounter = null; + private @MonotonicNonNull Counter callCounter = null; + private @MonotonicNonNull Counter setupCounter = null; + private @MonotonicNonNull Counter teardownCounter = null; + private @MonotonicNonNull Counter backoffCounter = null; + private @MonotonicNonNull Counter sleeperCounter = null; + private @MonotonicNonNull Counter shouldBackoffCounter = null; private transient @MonotonicNonNull ExecutorService executor; @@ -160,6 +168,57 @@ private CallFn( this.setupTeardown = new SetupTeardownWithTimeout( configuration.getTimeout(), configuration.getSetupTeardown()); + this.configuration = configuration; + } + + private void setupMetrics() { + Monitoring monitoring = configuration.getMonitoringConfiguration(); + if (monitoring.getCountRequests()) { + requestsCounter = Metrics.counter(Call.class, Monitoring.REQUESTS_COUNTER_NAME); + } + if (monitoring.getCountResponses()) { + responsesCounter = Metrics.counter(Call.class, Monitoring.RESPONSES_COUNTER_NAME); + } + if (monitoring.getCountFailures()) { + failuresCounter = Metrics.counter(Call.class, Monitoring.FAILURES_COUNTER_NAME); + } + if (monitoring.getCountCalls()) { + callCounter = + Metrics.counter(Call.class, Monitoring.callCounterNameOf(configuration.getCaller())); + } + if (monitoring.getCountSetup()) { + setupCounter = + Metrics.counter( + Call.class, Monitoring.setupCounterNameOf(configuration.getSetupTeardown())); + } + if (monitoring.getCountTeardown()) { + teardownCounter = + Metrics.counter( + Call.class, Monitoring.teardownCounterNameOf(configuration.getSetupTeardown())); + } + if (monitoring.getCountBackoffs()) { + backoffCounter = + Metrics.counter( + Call.class, + Monitoring.backoffCounterNameOf(configuration.getBackOffSupplier().get())); + } + if (monitoring.getCountSleeps()) { + sleeperCounter = + Metrics.counter( + Call.class, + Monitoring.sleeperCounterNameOf(configuration.getSleeperSupplier().get())); + } + if (monitoring.getCountShouldBackoff()) { + shouldBackoffCounter = + Metrics.counter( + Call.class, + Monitoring.shouldBackoffCounterName(configuration.getCallShouldBackoff())); + } + } + + private void setupWithoutRepeat() throws UserCodeExecutionException { + incIfPresent(setupCounter); + this.setupTeardown.setup(); } /** @@ -168,13 +227,38 @@ private CallFn( */ @Setup public void setup() throws UserCodeExecutionException { + + setupMetrics(); + this.executor = Executors.newSingleThreadExecutor(); - this.caller.setExecutor(executor); - this.setupTeardown.setExecutor(executor); + caller.setExecutor(executor); + setupTeardown.setExecutor(executor); - // TODO(damondouglas): Incorporate repeater when https://github.com/apache/beam/issues/28926 - // resolves. - this.setupTeardown.setup(); + if (!this.configuration.getShouldRepeat()) { + setupWithoutRepeat(); + return; + } + + BackOff backOff = this.configuration.getBackOffSupplier().get(); + Sleeper sleeper = this.configuration.getSleeperSupplier().get(); + + backoffIfNeeded(backOff, sleeper); + + Repeater repeater = + Repeater.builder() + .setBackOff(backOff) + .setSleeper(sleeper) + .setThrowableFunction( + ignored -> { + incIfPresent(setupCounter); + this.setupTeardown.setup(); + return null; + }) + .build() + .withBackoffCounter(backoffCounter) + .withSleeperCounter(sleeperCounter); + + repeater.apply(null); } /** @@ -183,9 +267,33 @@ public void setup() throws UserCodeExecutionException { */ @Teardown public void teardown() throws UserCodeExecutionException { - // TODO(damondouglas): Incorporate repeater when https://github.com/apache/beam/issues/28926 - // resolves. - this.setupTeardown.teardown(); + BackOff backOff = configuration.getBackOffSupplier().get(); + Sleeper sleeper = configuration.getSleeperSupplier().get(); + + backoffIfNeeded(backOff, sleeper); + + if (!configuration.getShouldRepeat()) { + incIfPresent(teardownCounter); + setupTeardown.teardown(); + return; + } + + Repeater repeater = + Repeater.builder() + .setBackOff(backOff) + .setSleeper(sleeper) + .setThrowableFunction( + ignored -> { + incIfPresent(teardownCounter); + setupTeardown.teardown(); + return null; + }) + .build() + .withBackoffCounter(backoffCounter) + .withSleeperCounter(sleeperCounter); + + repeater.apply(null); + checkStateNotNull(executor).shutdown(); try { boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS); @@ -194,16 +302,63 @@ public void teardown() throws UserCodeExecutionException { } @ProcessElement - public void process(@Element @NonNull RequestT request, MultiOutputReceiver receiver) + public void process(@Element RequestT request, MultiOutputReceiver receiver) throws JsonProcessingException { + + BackOff backOff = configuration.getBackOffSupplier().get(); + Sleeper sleeper = configuration.getSleeperSupplier().get(); + + incIfPresent(requestsCounter); + backoffIfNeeded(backOff, sleeper); + + if (!configuration.getShouldRepeat()) { + incIfPresent(callCounter); + try { + // TODO(damondouglas): https://github.com/apache/beam/issues/29248 + ResponseT response = caller.call(request); + receiver.get(responseTag).output(response); + incIfPresent(responsesCounter); + } catch (UserCodeExecutionException e) { + incIfPresent(failuresCounter); + receiver.get(failureTag).output(ApiIOError.of(e, request)); + } + + return; + } + + Repeater repeater = + Repeater.builder() + .setSleeper(sleeper) + .setBackOff(backOff) + .setThrowableFunction(caller::call) + .build() + .withSleeperCounter(sleeperCounter) + .withBackoffCounter(backoffCounter) + .withCallCounter(callCounter); + try { - // TODO(damondouglas): https://github.com/apache/beam/issues/29248 - ResponseT response = this.caller.call(request); + ResponseT response = repeater.apply(request); receiver.get(responseTag).output(response); + incIfPresent(responsesCounter); } catch (UserCodeExecutionException e) { + incIfPresent(failuresCounter); receiver.get(failureTag).output(ApiIOError.of(e, request)); } } + + private void backoffIfNeeded(BackOff backOff, Sleeper sleeper) { + if (configuration.getCallShouldBackoff().isTrue()) { + incIfPresent(shouldBackoffCounter); + incIfPresent(backoffCounter); + try { + incIfPresent(sleeperCounter); + sleeper.sleep(backOff.nextBackOffMillis()); + } catch (InterruptedException ignored) { + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } } /** Configuration details for {@link Call}. */ @@ -211,7 +366,7 @@ public void process(@Element @NonNull RequestT request, MultiOutputReceiver rece abstract static class Configuration implements Serializable { static Builder builder() { - return new AutoValue_Call_Configuration.Builder<>(); + return new AutoValue_Call_Configuration.Builder(); } /** The user custom code that converts a {@link RequestT} into a {@link ResponseT}. */ @@ -234,25 +389,85 @@ static Builder builder() { */ abstract Coder getResponseCoder(); + /** + * Configures whether the {@link DoFn} should repeat {@link SetupTeardown} and {@link Caller} + * invocations, using the {@link Repeater}, in the setting of {@link + * RequestResponseIO#REPEATABLE_ERROR_TYPES}. Defaults to false. + */ + abstract Boolean getShouldRepeat(); + + /** + * The {@link CallShouldBackoff} that determines whether the {@link DoFn} should hold {@link + * RequestT}s. Defaults to a private no-op implementation; no {@link RequestT}s are held during + * {@link ProcessElement}. + */ + abstract CallShouldBackoff getCallShouldBackoff(); + + /** + * The {@link SerializableSupplier} of a {@link Sleeper} that pauses code execution of when user + * custom code throws a {@link RequestResponseIO#REPEATABLE_ERROR_TYPES} {@link + * UserCodeExecutionException}. Supplies with {@link Sleeper#DEFAULT} by default. The need for a + * {@link SerializableSupplier} instead of setting this directly is that some implementations of + * {@link Sleeper} may not be {@link Serializable}. + */ + abstract SerializableSupplier getSleeperSupplier(); + + /** + * The {@link SerializableSupplier} of a {@link BackOff} that reports to a {@link Sleeper} how + * long to pause execution. It reports a {@link BackOff#STOP} to stop repeating invocation + * attempts. Supplies with {@link FluentBackoff#DEFAULT} by default. The need for a {@link + * SerializableSupplier} instead of setting this directly is that some {@link BackOff} + * implementations, such as {@link FluentBackoff} are not {@link Serializable}. + */ + abstract SerializableSupplier getBackOffSupplier(); + + abstract Monitoring getMonitoringConfiguration(); + abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { - /** See {@link #getCaller()}. */ + /** See {@link Configuration#getCaller}. */ abstract Builder setCaller(Caller value); - /** See {@link #getSetupTeardown()}. */ + /** See {@link Configuration#getResponseCoder}. */ + abstract Builder setResponseCoder(Coder value); + + /** See {@link Configuration#getSetupTeardown}. */ abstract Builder setSetupTeardown(SetupTeardown value); abstract Optional getSetupTeardown(); - /** See {@link #getTimeout()}. */ + /** See {@link Configuration#getTimeout}. */ abstract Builder setTimeout(Duration value); abstract Optional getTimeout(); - abstract Builder setResponseCoder(Coder value); + /** See {@link Configuration#getShouldRepeat}. */ + abstract Builder setShouldRepeat(Boolean value); + + abstract Optional getShouldRepeat(); + + /** See {@link Configuration#getCallShouldBackoff}. */ + abstract Builder setCallShouldBackoff( + CallShouldBackoff value); + + abstract Optional> getCallShouldBackoff(); + + /** See {@link Configuration#getSleeperSupplier}. */ + abstract Builder setSleeperSupplier(SerializableSupplier value); + + abstract Optional> getSleeperSupplier(); + + /** See {@link Configuration#getBackOffSupplier}. */ + abstract Builder setBackOffSupplier(SerializableSupplier value); + + abstract Optional> getBackOffSupplier(); + + abstract Builder setMonitoringConfiguration(Monitoring value); + + abstract Optional getMonitoringConfiguration(); abstract Configuration autoBuild(); @@ -261,84 +476,64 @@ final Configuration build() { setSetupTeardown(new NoopSetupTeardown()); } - if (!getTimeout().isPresent()) { - setTimeout(DEFAULT_TIMEOUT); + if (!getShouldRepeat().isPresent()) { + setShouldRepeat(false); } - return autoBuild(); - } - } - } + if (!getTimeout().isPresent()) { + setTimeout(RequestResponseIO.DEFAULT_TIMEOUT); + } - /** - * The {@link Result} of processing request {@link PCollection} into response {@link PCollection}. - */ - static class Result implements POutput { + if (!getCallShouldBackoff().isPresent()) { + setCallShouldBackoff(new NoopCallShouldBackoff<>()); + } - static Result of( - Coder responseTCoder, - TupleTag responseTag, - TupleTag failureTag, - PCollectionTuple pct) { - return new Result<>(responseTCoder, responseTag, pct, failureTag); - } + if (!getSleeperSupplier().isPresent()) { + setSleeperSupplier((SerializableSupplier) () -> Sleeper.DEFAULT); + } - private final Pipeline pipeline; - private final TupleTag responseTag; - private final TupleTag failureTag; - private final PCollection responses; - private final PCollection failures; + if (!getBackOffSupplier().isPresent()) { + setBackOffSupplier(new DefaultSerializableBackoffSupplier()); + } - private Result( - Coder responseTCoder, - TupleTag responseTag, - PCollectionTuple pct, - TupleTag failureTag) { - this.pipeline = pct.getPipeline(); - this.responseTag = responseTag; - this.failureTag = failureTag; - this.responses = pct.get(responseTag).setCoder(responseTCoder); - this.failures = pct.get(this.failureTag); - } + if (!getMonitoringConfiguration().isPresent()) { + setMonitoringConfiguration(Monitoring.builder().build()); + } - public PCollection getResponses() { - return responses; + return autoBuild(); + } } + } - public PCollection getFailures() { - return failures; - } + static class NoopSetupTeardown implements SetupTeardown { @Override - public @NonNull Pipeline getPipeline() { - return this.pipeline; + public void setup() throws UserCodeExecutionException { + // Noop } @Override - public @NonNull Map, PValue> expand() { - return ImmutableMap.of( - responseTag, responses, - failureTag, failures); + public void teardown() throws UserCodeExecutionException { + // Noop } - - @Override - public void finishSpecifyingOutput( - @NonNull String transformName, - @NonNull PInput input, - @NonNull PTransform transform) {} } - private static class NoopSetupTeardown implements SetupTeardown { + private static class NoopCallShouldBackoff implements CallShouldBackoff { @Override - public void setup() throws UserCodeExecutionException { + public void update(UserCodeExecutionException exception) { // Noop } @Override - public void teardown() throws UserCodeExecutionException { + public void update(ResponseT response) { // Noop } + + @Override + public boolean isTrue() { + return false; + } } private static class CallerWithTimeout diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoff.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoff.java index 1d093f2efb12..43341052c46a 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoff.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoff.java @@ -29,5 +29,5 @@ public interface CallShouldBackoff extends Serializable { void update(ResponseT response); /** Report whether to backoff. */ - boolean value(); + boolean isTrue(); } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbability.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbability.java index 62a7990d21ee..d517023eb15d 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbability.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbability.java @@ -97,7 +97,7 @@ public void update(ResponseT response) { /** Report whether to backoff. */ @Override - public boolean value() { + public boolean isTrue() { return getRejectionProbability() > getThreshold(); } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java new file mode 100644 index 000000000000..89e9400854d7 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.SerializableUtils; + +/** + * A default {@link SerializableSupplier} of a {@link BackOff} that relies on {@link + * FluentBackoff#DEFAULT}. Embedding {@link FluentBackoff#DEFAULT} in a lambda failed {@link + * SerializableUtils#ensureSerializable}. + */ +class DefaultSerializableBackoffSupplier implements SerializableSupplier { + + @Override + public BackOff get() { + return FluentBackoff.DEFAULT.backoff(); + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Monitoring.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Monitoring.java new file mode 100644 index 000000000000..4b870b431d8a --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Monitoring.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Optional; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metric; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Configures {@link Metric}s throughout various features of {@link RequestResponseIO}. By default, + * all monitoring is turned off. + * + *
+ * Cache metrics are not yet supported. See https://github.com/apache/beam/issues/29888
+ * 
+ */ +@AutoValue +public abstract class Monitoring implements Serializable { + + private static final String SEPARATOR = "_"; + + /** Counter name for the number of requests received by the {@link Call} transform. */ + static final String REQUESTS_COUNTER_NAME = "requests"; + + /** Counter name for the number of responses emitted by the {@link Call} transform. */ + static final String RESPONSES_COUNTER_NAME = "responses"; + + /** Counter name for the number of failures emitted by the {@link Call} transform. */ + static final String FAILURES_COUNTER_NAME = "failures"; + + /** Counter name for the number of cache read attempts. */ + static final String CACHE_READ_REQUESTS_COUNTER_NAME = metricNameOf("cache", "read", "requests"); + + /** + * Counter name for the number of null requests and response associations received during cache + * reads. + */ + static final String CACHE_READ_NULL_COUNTER_NAME = metricNameOf("cache", "read", "nulls"); + + /** + * Counter name for the number of non-null requests and response associations received during + * cache reads. + */ + static final String CACHE_READ_NON_NULL_COUNTER_NAME = + metricNameOf("cache", "read", "non", "nulls"); + + /** Counter name for the number of failures encountered during cache reads. */ + static final String CACHE_READ_FAILURES_COUNTER_NAME = metricNameOf("cache", "read", "failures"); + + /** Counter name for the number of cache write attempts. */ + static final String CACHE_WRITE_REQUESTS_COUNTER_NAME = + metricNameOf("cache", "write", "requests"); + + /** Counter name for the number of successful cache writes. */ + static final String CACHE_WRITE_SUCCESSES_COUNTER_NAME = + metricNameOf("cache", "write", "successes"); + + /** Counter name for the number of failures encountered during cache writes. */ + static final String CACHE_WRITE_FAILURES_COUNTER_NAME = + metricNameOf("cache", "write", "failures"); + + private static final String CALL_COUNTER_NAME = metricNameOf("call", "invocations"); + private static final String SETUP_COUNTER_NAME = metricNameOf("setup", "invocations"); + private static final String TEARDOWN_COUNTER_NAME = metricNameOf("teardowns", "invocations"); + private static final String BACKOFF_COUNTER_NAME = "backoffs"; + private static final String SLEEPER_COUNTER_NAME = "sleeps"; + private static final String SHOULD_BACKOFF_COUNTER_NAME = + metricNameOf("should", "backoff", "count"); + + /** Derives a {@link Counter} name for a {@link Caller#call} invocation count. */ + static String callCounterNameOf(Caller instance) { + return metricNameOf(instance.getClass(), CALL_COUNTER_NAME); + } + + /** Derives a {@link Counter} name for a {@link SetupTeardown#setup} invocation count. */ + static String setupCounterNameOf(SetupTeardown instance) { + return metricNameOf(instance.getClass(), SETUP_COUNTER_NAME); + } + + /** Derives a {@link Counter} name for a {@link SetupTeardown#teardown} invocation count. */ + static String teardownCounterNameOf(SetupTeardown instance) { + return metricNameOf(instance.getClass(), TEARDOWN_COUNTER_NAME); + } + + /** Derives a {@link Counter} name for a {@link BackOff#nextBackOffMillis} invocation count. */ + static String backoffCounterNameOf(BackOff instance) { + return metricNameOf(instance.getClass(), BACKOFF_COUNTER_NAME); + } + + /** Derives a {@link Counter} name for a {@link Sleeper#sleep} invocation count. */ + static String sleeperCounterNameOf(Sleeper instance) { + return metricNameOf(instance.getClass(), SLEEPER_COUNTER_NAME); + } + + /** + * Derives a {@link Counter} name for counts of when {@link CallShouldBackoff#isTrue} is found + * true. + */ + static String shouldBackoffCounterName(CallShouldBackoff instance) { + return metricNameOf(instance.getClass(), SHOULD_BACKOFF_COUNTER_NAME); + } + + private static String metricNameOf(String... segments) { + return String.join(SEPARATOR, Arrays.asList(segments)); + } + + private static String metricNameOf(Class clazz, String suffix) { + String simpleName = + CaseFormat.UPPER_CAMEL + .converterTo(CaseFormat.LOWER_UNDERSCORE) + .convert(clazz.getSimpleName()); + return simpleName + SEPARATOR + suffix; + } + + public static Builder builder() { + return new AutoValue_Monitoring.Builder(); + } + + /** Count incoming request elements processed by {@link Call}'s {@link DoFn}. */ + public abstract Boolean getCountRequests(); + + /** + * Count outgoing responses resulting from {@link Call}'s successful {@link Caller} invocation. + */ + public abstract Boolean getCountResponses(); + + /** Count invocations of {@link Caller#call}. */ + public abstract Boolean getCountCalls(); + + /** Count failures resulting from {@link Call}'s successful {@link Caller} invocation. */ + public abstract Boolean getCountFailures(); + + /** Count invocations of {@link SetupTeardown#setup}. */ + public abstract Boolean getCountSetup(); + + /** Count invocations of {@link SetupTeardown#teardown}. */ + public abstract Boolean getCountTeardown(); + + /** Count invocations of {@link BackOff#nextBackOffMillis}. */ + public abstract Boolean getCountBackoffs(); + + /** Count invocations of {@link Sleeper#sleep}. */ + public abstract Boolean getCountSleeps(); + + /** Count when {@link CallShouldBackoff#isTrue} is found true. */ + public abstract Boolean getCountShouldBackoff(); + + /** Count number of attempts to read from the {@link Cache}. */ + public abstract Boolean getCountCacheReadRequests(); + + /** Count associated null values resulting from {@link Cache} reads. */ + public abstract Boolean getCountCacheReadNulls(); + + /** Count associated non-null values resulting from {@link Cache} reads. */ + public abstract Boolean getCountCacheReadNonNulls(); + + /** Count {@link Cache} read failures. */ + public abstract Boolean getCountCacheReadFailures(); + + /** Count number of attempts to write to the {@link Cache}. */ + public abstract Boolean getCountCacheWriteRequests(); + + /** Count {@link Cache} write successes. */ + public abstract Boolean getCountCacheWriteSuccesses(); + + /** Count {@link Cache} write failures. */ + public abstract Boolean getCountCacheWriteFailures(); + + /** + * Turns on all monitoring. The purpose of this method is, when used with {@link #toBuilder} and + * other setters, to turn everything on except for a few select counters. + */ + public Monitoring withEverythingCounted() { + return toBuilder() + .setCountRequests(true) + .setCountResponses(true) + .setCountCalls(true) + .setCountFailures(true) + .setCountSetup(true) + .setCountTeardown(true) + .setCountBackoffs(true) + .setCountSleeps(true) + .setCountShouldBackoff(true) + .build() + .withCountCaching(true); + } + + /** Turns on all monitoring except for cache related metrics. */ + public Monitoring withEverythingCountedExceptedCaching() { + return withEverythingCounted().withCountCaching(false); + } + + private Monitoring withCountCaching(boolean value) { + return toBuilder() + .setCountCacheReadRequests(value) + .setCountCacheReadNulls(value) + .setCountCacheReadNonNulls(value) + .setCountCacheReadFailures(value) + .setCountCacheWriteRequests(value) + .setCountCacheWriteSuccesses(value) + .setCountCacheWriteFailures(value) + .build(); + } + + public abstract Builder toBuilder(); + + /** + * Convenience wrapper to minimize the number of if statements in the code to check for nullness + * prior to invoking {@link Counter#inc}. + */ + static void incIfPresent(@Nullable Counter counter) { + if (counter != null) { + counter.inc(); + } + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setCountRequests(Boolean value); + + public abstract Builder setCountResponses(Boolean value); + + public abstract Builder setCountCalls(Boolean value); + + public abstract Builder setCountFailures(Boolean value); + + public abstract Builder setCountSetup(Boolean value); + + public abstract Builder setCountTeardown(Boolean value); + + public abstract Builder setCountBackoffs(Boolean value); + + public abstract Builder setCountSleeps(Boolean value); + + public abstract Builder setCountShouldBackoff(Boolean value); + + public abstract Builder setCountCacheReadRequests(Boolean value); + + public abstract Builder setCountCacheReadNulls(Boolean value); + + public abstract Builder setCountCacheReadNonNulls(Boolean value); + + public abstract Builder setCountCacheReadFailures(Boolean value); + + public abstract Builder setCountCacheWriteRequests(Boolean value); + + public abstract Builder setCountCacheWriteSuccesses(Boolean value); + + public abstract Builder setCountCacheWriteFailures(Boolean value); + + abstract Optional getCountRequests(); + + abstract Optional getCountResponses(); + + abstract Optional getCountCalls(); + + abstract Optional getCountFailures(); + + abstract Optional getCountSetup(); + + abstract Optional getCountTeardown(); + + abstract Optional getCountBackoffs(); + + abstract Optional getCountSleeps(); + + abstract Optional getCountShouldBackoff(); + + abstract Optional getCountCacheReadRequests(); + + abstract Optional getCountCacheReadNulls(); + + abstract Optional getCountCacheReadNonNulls(); + + abstract Optional getCountCacheReadFailures(); + + abstract Optional getCountCacheWriteRequests(); + + abstract Optional getCountCacheWriteSuccesses(); + + abstract Optional getCountCacheWriteFailures(); + + abstract Monitoring autoBuild(); + + final Monitoring build() { + if (!getCountRequests().isPresent()) { + setCountRequests(false); + } + if (!getCountResponses().isPresent()) { + setCountResponses(false); + } + if (!getCountCalls().isPresent()) { + setCountCalls(false); + } + if (!getCountFailures().isPresent()) { + setCountFailures(false); + } + if (!getCountSetup().isPresent()) { + setCountSetup(false); + } + if (!getCountTeardown().isPresent()) { + setCountTeardown(false); + } + if (!getCountBackoffs().isPresent()) { + setCountBackoffs(false); + } + if (!getCountSleeps().isPresent()) { + setCountSleeps(false); + } + if (!getCountShouldBackoff().isPresent()) { + setCountShouldBackoff(false); + } + if (!getCountCacheReadRequests().isPresent()) { + setCountCacheReadRequests(false); + } + if (!getCountCacheReadNulls().isPresent()) { + setCountCacheReadNulls(false); + } + if (!getCountCacheReadNonNulls().isPresent()) { + setCountCacheReadNonNulls(false); + } + if (!getCountCacheReadFailures().isPresent()) { + setCountCacheReadFailures(false); + } + if (!getCountCacheWriteRequests().isPresent()) { + setCountCacheWriteRequests(false); + } + if (!getCountCacheWriteSuccesses().isPresent()) { + setCountCacheWriteSuccesses(false); + } + if (!getCountCacheWriteFailures().isPresent()) { + setCountCacheWriteFailures(false); + } + + return autoBuild(); + } + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java index d2e538cf7cf3..1adc46b836a2 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java @@ -19,7 +19,6 @@ import java.io.Serializable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -29,9 +28,9 @@ */ public class Quota implements Serializable { private final long numRequests; - private final @NonNull Duration interval; + private final Duration interval; - public Quota(long numRequests, @NonNull Duration interval) { + public Quota(long numRequests, Duration interval) { this.numRequests = numRequests; this.interval = interval; } @@ -42,7 +41,7 @@ public long getNumRequests() { } /** The duration context within which to allow requests. */ - public @NonNull Duration getInterval() { + public Duration getInterval() { return interval; } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java index a347a1852413..0da8f0fbc57c 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java @@ -151,7 +151,7 @@ byte[] lpop(String key) throws UserCodeExecutionException { * Sets the key/value for a Duration expiry. Naming of this method preserves that of the * underlying {@link JedisPooled} client and performs a null check prior to execution. */ - void setex(byte[] key, byte[] value, @NonNull Duration expiry) throws UserCodeExecutionException { + void setex(byte[] key, byte[] value, Duration expiry) throws UserCodeExecutionException { try { getSafeClient().setex(key, expiry.getStandardSeconds(), value); } catch (JedisException e) { @@ -163,7 +163,7 @@ void setex(byte[] key, byte[] value, @NonNull Duration expiry) throws UserCodeEx * Sets the key/value for a Duration expiry. Naming of this method preserves that of the * underlying {@link JedisPooled} client and performs a null check prior to execution. */ - void setex(String key, Long value, @NonNull Duration expiry) throws UserCodeExecutionException { + void setex(String key, Long value, Duration expiry) throws UserCodeExecutionException { try { getSafeClient().setex(key, expiry.getStandardSeconds(), String.valueOf(value)); } catch (JedisException e) { diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java index e298511494a1..69364d85887e 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java @@ -17,14 +17,17 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import com.google.auto.value.AutoValue; import java.io.IOException; import java.util.Optional; -import java.util.Set; +import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Repeats a method invocation when it encounters an error, pausing invocations using {@link @@ -33,28 +36,21 @@ @AutoValue abstract class Repeater { - /** {@link Set} of {@link UserCodeExecutionException}s that warrant repeating. */ - static final Set> REPEATABLE_ERROR_TYPES = - ImmutableSet.of( - UserCodeRemoteSystemException.class, - UserCodeTimeoutException.class, - UserCodeQuotaException.class); - static Builder builder() { return new AutoValue_Repeater.Builder<>(); } /** * The {@link ThrowableFunction} to invoke repeatedly until it succeeds, throws a {@link - * UserCodeExecutionException} that is not {@link #REPEATABLE_ERROR_TYPES}, or {@link - * BackOff#STOP}. + * UserCodeExecutionException} that is not {@link RequestResponseIO#REPEATABLE_ERROR_TYPES}, or + * {@link BackOff#STOP}. */ abstract ThrowableFunction getThrowableFunction(); /** * The {@link Sleeper} that pauses execution of the {@link #getThrowableFunction} when it throws a - * {@link #REPEATABLE_ERROR_TYPES} {@link UserCodeExecutionException}. Uses {@link - * Sleeper#DEFAULT} by default. + * {@link RequestResponseIO#REPEATABLE_ERROR_TYPES} {@link UserCodeExecutionException}. Uses + * {@link Sleeper#DEFAULT} by default. */ abstract Sleeper getSleeper(); @@ -65,28 +61,66 @@ static Builder builder() { */ abstract BackOff getBackOff(); + /** Counts invocations of {@link Caller#call}. */ + abstract @Nullable Counter getCallCounter(); + + /** Convenience setter for {@link Builder#setCallCounter}. */ + Repeater withCallCounter(@Nullable Counter value) { + if (value == null) { + return this; + } + return toBuilder().setCallCounter(checkStateNotNull(value)).build(); + } + + /** Counts invocations of {@link BackOff#nextBackOffMillis}. */ + abstract @Nullable Counter getBackoffCounter(); + + /** Convenience setter for {@link Builder#setBackoffCounter}. */ + Repeater withBackoffCounter(@Nullable Counter value) { + if (value == null) { + return this; + } + return toBuilder().setBackoffCounter(checkStateNotNull(value)).build(); + } + + /** Counts invocations of {@link Sleeper#sleep}. */ + abstract @Nullable Counter getSleeperCounter(); + + /** Convenience setter for {@link Builder#setSleeperCounter}. */ + Repeater withSleeperCounter(@Nullable Counter value) { + if (value == null) { + return this; + } + return toBuilder().setSleeperCounter(checkStateNotNull(value)).build(); + } + + abstract Builder toBuilder(); + /** * Applies the {@link InputT} to the {@link ThrowableFunction}, returning the {@link OutputT} if - * successful. If the function throws an exception that {@link #REPEATABLE_ERROR_TYPES} contains, - * repeats the invocation after {@link Sleeper#sleep} for the amount of time reported by {@link - * BackOff#nextBackOffMillis}. Throws the latest encountered {@link UserCodeExecutionException} - * when {@link BackOff} reports a {@link BackOff#STOP}. + * successful. If the function throws an exception that {@link + * RequestResponseIO#REPEATABLE_ERROR_TYPES} contains, repeats the invocation after {@link + * Sleeper#sleep} for the amount of time reported by {@link BackOff#nextBackOffMillis}. Throws the + * latest encountered {@link UserCodeExecutionException} when {@link BackOff} reports a {@link + * BackOff#STOP}. */ OutputT apply(InputT input) throws UserCodeExecutionException { Optional latestError = Optional.empty(); long waitFor = 0L; while (waitFor != BackOff.STOP) { try { - getSleeper().sleep(waitFor); + sleepIfNeeded(waitFor); + incIfPresent(getCallCounter()); return getThrowableFunction().apply(input); } catch (UserCodeExecutionException e) { - if (!REPEATABLE_ERROR_TYPES.contains(e.getClass())) { + if (!e.shouldRepeat()) { throw e; } latestError = Optional.of(e); } catch (InterruptedException ignored) { } try { + incIfPresent(getBackoffCounter()); waitFor = getBackOff().nextBackOffMillis(); } catch (IOException e) { throw new UserCodeExecutionException(e); @@ -96,6 +130,13 @@ OutputT apply(InputT input) throws UserCodeExecutionException { new UserCodeExecutionException("failed to process for input: " + input)); } + private void sleepIfNeeded(long waitFor) throws InterruptedException { + if (waitFor > 0L) { + incIfPresent(getSleeperCounter()); + getSleeper().sleep(waitFor); + } + } + /** * A {@link FunctionalInterface} for executing a {@link UserCodeExecutionException} throwable * function. @@ -123,6 +164,12 @@ abstract Builder setThrowableFunction( abstract Optional getBackOff(); + abstract Builder setCallCounter(Counter value); + + abstract Builder setBackoffCounter(Counter value); + + abstract Builder setSleeperCounter(Counter value); + abstract Repeater autoBuild(); final Repeater build() { diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java index de7d26aab4bd..b7338cb64ff7 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java @@ -17,18 +17,35 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import com.google.auto.value.AutoValue; -import java.util.Map; -import org.apache.beam.io.requestresponse.RequestResponseIO.Result; -import org.apache.beam.sdk.Pipeline; +import java.io.Serializable; +import java.util.Set; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Triple; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Partition; +import org.apache.beam.sdk.transforms.Partition.PartitionFn; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; /** * {@link PTransform} for reading from and writing to Web APIs. @@ -51,35 +68,221 @@ * } * }} * - *

Then provide {@link RequestResponseIO}'s {@link #create} method your {@link Caller} + *

Then provide {@link RequestResponseIO}'s {@link #of} method your {@link Caller} * implementation. * - *

{@code  PCollection requests = ...
- *  Result result = requests.apply(RequestResponseIO.create(new MyCaller()));
- *  result.getResponses().apply( ... );
- *  result.getFailures().apply( ... );
+ * 
{@code
+ * Coder responseCoder = ...
+ * PCollection requests = ...
+ * Result result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));
+ * result.getResponses().apply( ... );
+ * result.getFailures().apply( ... );
  * }
*/ public class RequestResponseIO extends PTransform, Result> { - private static final TupleTag FAILURE_TAG = new TupleTag() {}; + /** + * The default {@link Duration} to wait until completion of user code. A {@link + * UserCodeTimeoutException} is thrown when {@link Caller#call}, {@link SetupTeardown#setup}, or + * {@link SetupTeardown#teardown} exceed this timeout. + */ + public static final Duration DEFAULT_TIMEOUT = Duration.standardSeconds(30L); + + /** + * {@link Set} of {@link UserCodeExecutionException}s that warrant repeating. Not all errors + * should be repeat execution such as bad or unauthenticated requests. However, certain errors + * such as timeouts, remote system or quota exceed errors may not be related to the code but due + * to the remote system and thus warrant repeating. + */ + public static final Set> REPEATABLE_ERROR_TYPES = + ImmutableSet.of( + UserCodeRemoteSystemException.class, + UserCodeTimeoutException.class, + UserCodeQuotaException.class); + + private static final String CALL_NAME = Call.class.getSimpleName(); + + private static final String CACHE_READ_NAME = "CacheRead"; + + private static final String CACHE_WRITE_NAME = "CacheWrite"; - // TODO(damondouglas): remove when utilized. - @SuppressWarnings({"unused"}) - private final Configuration configuration; + private static final String THROTTLE_NAME = "Throttle"; - private RequestResponseIO(Configuration configuration) { - this.configuration = configuration; + private final TupleTag responseTag = new TupleTag() {}; + private final TupleTag failureTag = new TupleTag() {}; + + private final Configuration rrioConfiguration; + private final Call.Configuration callConfiguration; + + private RequestResponseIO( + Configuration rrioConfiguration, + Call.Configuration callConfiguration) { + this.rrioConfiguration = rrioConfiguration; + this.callConfiguration = callConfiguration; } + /** + * Instantiates a {@link RequestResponseIO} with a {@link Caller} and a {@link ResponseT} {@link + * Coder}. Checks for the {@link Caller}'s {@link SerializableUtils#ensureSerializable} + * serializable errors. + */ public static RequestResponseIO of( - Caller caller) { + Caller caller, Coder responseTCoder) { + + caller = SerializableUtils.ensureSerializable(caller); + + return new RequestResponseIO<>( + Configuration.builder().setResponseTCoder(responseTCoder).build(), + Call.Configuration.builder() + .setCaller(caller) + .setResponseCoder(responseTCoder) + .build()) + .withDefaults(); + } + + /** + * Instantiates a {@link RequestResponseIO} with a {@link ResponseT} {@link Coder} and an + * implementation of both the {@link Caller} and {@link SetupTeardown} interfaces. Checks {@link + * SerializableUtils#ensureSerializable} serializable errors. + */ + public static < + RequestT, + ResponseT, + CallerSetupTeardownT extends Caller & SetupTeardown> + RequestResponseIO ofCallerAndSetupTeardown( + CallerSetupTeardownT implementsCallerAndSetupTeardown, Coder responseTCoder) { + + implementsCallerAndSetupTeardown = + SerializableUtils.ensureSerializable(implementsCallerAndSetupTeardown); + + return new RequestResponseIO<>( + Configuration.builder().setResponseTCoder(responseTCoder).build(), + Call.Configuration.builder() + .setCaller(implementsCallerAndSetupTeardown) + .setSetupTeardown(implementsCallerAndSetupTeardown) + .setResponseCoder(responseTCoder) + .build()) + .withDefaults(); + } + + private RequestResponseIO withDefaults() { + return withTimeout(DEFAULT_TIMEOUT) + .shouldRepeat(true) + .withBackOffSupplier(new DefaultSerializableBackoffSupplier()) + .withSleeperSupplier((SerializableSupplier) () -> Sleeper.DEFAULT) + .withCallShouldBackoff(new CallShouldBackoffBasedOnRejectionProbability<>()); + } + + /** + * Overrides the {@link #DEFAULT_TIMEOUT} expected timeout of all user custom code. If user custom + * code exceeds this timeout, then a {@link UserCodeTimeoutException} is thrown. User custom code + * may throw this exception prior to the configured timeout value on their own. + */ + public RequestResponseIO withTimeout(Duration value) { + return new RequestResponseIO<>( + rrioConfiguration, callConfiguration.toBuilder().setTimeout(value).build()); + } + + /** + * Turns off repeat invocations (default is on) of {@link SetupTeardown} and {@link Caller}, using + * the {@link Repeater}, in the setting of {@link RequestResponseIO#REPEATABLE_ERROR_TYPES}. + */ + public RequestResponseIO withoutRepeater() { + return shouldRepeat(false); + } + + private RequestResponseIO shouldRepeat(boolean value) { return new RequestResponseIO<>( - Configuration.builder().setCaller(caller).build()); + rrioConfiguration, callConfiguration.toBuilder().setShouldRepeat(value).build()); + } + + /** + * Overrides the private no-op implementation of {@link CallShouldBackoff} that determines whether + * the {@link DoFn} should hold {@link RequestT}s. Without this configuration, {@link RequestT}s + * are never held; no-op implemented {@link CallShouldBackoff#isTrue} always returns {@code + * false}. + */ + public RequestResponseIO withCallShouldBackoff( + CallShouldBackoff value) { + return new RequestResponseIO<>( + rrioConfiguration, callConfiguration.toBuilder().setCallShouldBackoff(value).build()); + } + + /** + * Overrides the default {@link SerializableSupplier} of a {@link Sleeper} that pauses code + * execution when user custom code throws a {@link RequestResponseIO#REPEATABLE_ERROR_TYPES} + * {@link UserCodeExecutionException}. The default supplies with {@link Sleeper#DEFAULT}. The need + * for a {@link SerializableSupplier} instead of setting this directly is that some + * implementations of {@link Sleeper} may not be {@link Serializable}. + */ + RequestResponseIO withSleeperSupplier(SerializableSupplier value) { + return new RequestResponseIO<>( + rrioConfiguration, callConfiguration.toBuilder().setSleeperSupplier(value).build()); + } + + /** + * Overrides the default {@link SerializableSupplier} of a {@link BackOff} that reports to a + * {@link Sleeper} how long to pause execution. It reports a {@link BackOff#STOP} to stop + * repeating invocation attempts. The default supplies with {@link FluentBackoff#DEFAULT}. The + * need for a {@link SerializableSupplier} instead of setting this directly is that some {@link + * BackOff} implementations, such as {@link FluentBackoff} are not {@link Serializable}. + */ + RequestResponseIO withBackOffSupplier(SerializableSupplier value) { + return new RequestResponseIO<>( + rrioConfiguration, callConfiguration.toBuilder().setBackOffSupplier(value).build()); + } + + /** + * Configures {@link RequestResponseIO} for reading and writing {@link RequestT} and {@link + * ResponseT} pairs using a cache. {@link RequestResponseIO}, by default, does not interact with a + * cache. + * + *
When reading, the transform {@link Flatten}s the {@link ResponseT} {@link PCollection}
+   * of successful pairs with that resulting from API calls of {@link RequestT}s of unsuccessful pairs.
+   * 
+ * + *
When writing, the transform {@link Flatten}s the {@link ResponseT} {@link PCollection} of
+   *    successful pairs with that resulting from API calls of {@link RequestT}s.
+   * 
+ */ + public RequestResponseIO withCache(Cache.Pair pair) { + return new RequestResponseIO<>( + rrioConfiguration + .toBuilder() + .setCacheRead(pair.getRead()) + .setCacheWrite(pair.getWrite()) + .build(), + callConfiguration); + } + + public RequestResponseIO withMonitoringConfiguration(Monitoring value) { + return new RequestResponseIO<>( + rrioConfiguration, callConfiguration.toBuilder().setMonitoringConfiguration(value).build()); + } + + /** + * Configures {@link RequestResponseIO} with a {@link PTransform} that holds back {@link + * RequestT}s to prevent quota errors such as HTTP 429 or gRPC RESOURCE_EXHAUSTION errors. + */ + // TODO(damondouglas): Until https://github.com/apache/beam/issues/28930 there is no provided + // solution for this, however this method allows users to provide their own at this time. + public RequestResponseIO withPreventiveThrottle( + PTransform, Result> throttle) { + return new RequestResponseIO<>( + rrioConfiguration.toBuilder().setThrottle(throttle).build(), callConfiguration); + } + + /** Exposes the transform's {@link Call.Configuration} for testing. */ + @VisibleForTesting + Call.Configuration getCallConfiguration() { + return callConfiguration; } - /** Configuration details for {@link RequestResponseIO}. */ + /** + * Configuration details for {@link RequestResponseIO}. Package-private as minimally required by + * {@link AutoValue}. + */ @AutoValue abstract static class Configuration { @@ -88,17 +291,45 @@ static Builder builder() { } /** - * The {@link Caller} that interfaces user custom code to process a {@link RequestT} into a - * {@link ResponseT}. + * Required by {@link Call} and applied to the resulting {@link ResponseT} {@link PCollection}. + */ + abstract Coder getResponseTCoder(); + + /** + * Reads a {@link RequestT} {@link PCollection} from an optional cache and returns a {@link KV} + * {@link PCollection} of the original {@link RequestT}s and associated {@link ResponseT}, null + * if no association persists in the cache. */ - abstract Caller getCaller(); + abstract @Nullable PTransform, Result>> + getCacheRead(); + + /** Writes {@link RequestT} and {@link ResponseT} associations to a cache. */ + abstract @Nullable PTransform< + PCollection>, Result>> + getCacheWrite(); + + /** Throttles a {@link RequestT} {@link PCollection}. */ + abstract @Nullable PTransform, Result> getThrottle(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { - abstract Builder setCaller(Caller value); + /** See {@link #getResponseTCoder}. */ + abstract Builder setResponseTCoder(Coder value); + + /** See {@link #getCacheRead}. */ + abstract Builder setCacheRead( + PTransform, Result>> value); + + /** See {@link #getCacheWrite}. */ + abstract Builder setCacheWrite( + PTransform>, Result>> value); + + /** See {@link #getThrottle}. */ + abstract Builder setThrottle( + PTransform, Result> value); abstract Configuration build(); } @@ -106,55 +337,268 @@ abstract static class Builder { @Override public Result expand(PCollection input) { - // TODO(damondouglas; https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+%5BRRIO%5D): - // expand pipeline as more dependencies develop. - return Result.of(new TupleTag() {}, PCollectionTuple.empty(input.getPipeline())); + + // Initialize empty ResponseT and APIIOError PCollectionLists. + PCollectionList responseList = PCollectionList.empty(input.getPipeline()); + PCollectionList failureList = PCollectionList.empty(input.getPipeline()); + + // Apply cache reads, if available, updating the input, responses, and failures. + Triple, PCollectionList, PCollectionList> + cacheRead = expandCacheRead(input, responseList, failureList); + input = cacheRead.getLeft(); + responseList = cacheRead.getMiddle(); + failureList = cacheRead.getRight(); + + // Throttle the RequestT input PCollection. + Pair, PCollectionList> throttle = + expandThrottle(input, failureList); + input = throttle.getLeft(); + failureList = throttle.getRight(); + + // Invoke the Caller for each RequestT input and write associated RequestT and ResponseT to + // the cache, if available. + Pair, PCollectionList> call = + expandCallWithOptionalCacheWrites(input, responseList, failureList); + responseList = call.getLeft(); + failureList = call.getRight(); + + // Flatten the responses and failures. + PCollection responses = + responseList.apply("FlattenResponses", Flatten.pCollections()); + PCollection failures = failureList.apply("FlattenErrors", Flatten.pCollections()); + + // Prepare and return final result. + PCollectionTuple pct = PCollectionTuple.of(responseTag, responses).and(failureTag, failures); + return Result.of(responses.getCoder(), responseTag, failureTag, pct); } /** - * The {@link Result} of processing request {@link PCollection} into response {@link PCollection} - * using custom {@link Caller} code. + * Expands with {@link Configuration#getCacheRead} if available. Otherwise, returns a {@link + * Triple} of original arguments. + * + *
Algorithm is as follows.
+   * 
    + *
  1. Attempts reads of associated {@link RequestT}/{@link ResponseT} pairs in a cache.
  2. + *
  3. {@link Partition}s {@link RequestT} elements based on whether an associated {@link ResponseT} exists.
  4. + *
  5. Returns a new {@link RequestT} {@link PCollection} for elements not associated with a {@link ResponseT} in the cache.
  6. + *
  7. Returns a new {@link ResponseT} {@link PCollectionList} appending elements found associated with a {@link RequestT}.
  8. + *
  9. Returns a new {@link ApiIOError} {@link PCollectionList} with any errors encountered reading from the cache.
  10. + *
*/ - public static class Result implements POutput { + Triple, PCollectionList, PCollectionList> + expandCacheRead( + PCollection input, + PCollectionList responseList, + PCollectionList failureList) { + if (rrioConfiguration.getCacheRead() == null) { + return Triple.of(input, responseList, failureList); + } + Result> cacheReadResult = + input.apply(CACHE_READ_NAME, checkStateNotNull(rrioConfiguration.getCacheRead())); - static Result of(TupleTag responseTag, PCollectionTuple pct) { - return new Result<>(responseTag, pct); + // Partition KV PCollection into elements that have null or not null response values. + PCollectionList> cacheReadList = + cacheReadResult + .getResponses() + .apply( + "PartitionCacheReads", + Partition.of(PartitionCacheReadsFn.NUM_PARTITIONS, new PartitionCacheReadsFn<>())); + + // Requests of null response values will be sent for Call downstream. + input = + cacheReadList + .get(PartitionCacheReadsFn.NULL_PARTITION) + .apply("UncachedRequests", Keys.create()); + + // Responses of non-null values will be returned to the final RequestResponseIO output. + responseList = + responseList.and( + cacheReadList + .get(PartitionCacheReadsFn.NON_NULL_PARTITION) + .apply("CachedResponses", Values.create())); + + // Append any failures to the final output. + failureList = failureList.and(cacheReadResult.getFailures()); + + return Triple.of(input, responseList, failureList); + } + + /** + * Expands with {@link Configuration#getThrottle}, if available. Otherwise, returns a {@link Pair} + * of original arguments. + * + *
Algorithm is as follows:
+   * 
    + *
  1. Applies throttle transform to {@link RequestT} {@link PCollection} input.
  2. + *
  3. Returns throttled {@link PCollection} of {@link RequestT} elements.
  4. + *
  5. Returns appended {@link PCollection} of {@link ApiIOError}s to the failureList.
  6. + *
+ */ + // TODO(damondouglas): See https://github.com/apache/beam/issues/28930; currently there is no + // provided solution for this, though users could provide their own via withThrottle. + Pair, PCollectionList> expandThrottle( + PCollection input, PCollectionList failureList) { + + if (rrioConfiguration.getThrottle() == null) { + return Pair.of(input, failureList); } - private final Pipeline pipeline; - private final TupleTag responseTag; - private final PCollection responses; - private final PCollection failures; + Result throttleResult = + input.apply(THROTTLE_NAME, checkStateNotNull(rrioConfiguration.getThrottle())); + + return Pair.of(throttleResult.getResponses(), failureList.and(throttleResult.getFailures())); + } + + /** + * Expands with a {@link Call} and {@link Configuration#getCacheWrite}, if available. + * + *
Algorithm is as follows:
+   * 
    + *
  1. If {@link Configuration#getCacheWrite} not available, instantiates and applies a + * {@link Call} using original {@link Caller} and optionally + * {@link SetupTeardown}.
  2. + *
  3. Otherwise, wraps the original {@link Caller} using + * {@link WrappedAssociatingRequestResponseCaller} that preserves the association between the + * {@link RequestT} and its {@link ResponseT}.
  4. + *
  5. Applies the resulting {@link KV} of the {@link RequestT} and its {@link ResponseT} + * to the {@link Configuration#getCacheWrite}, returning appended failures to the failureList + * and appended responses to the responseList.
  6. + *
+ */ + Pair, PCollectionList> expandCallWithOptionalCacheWrites( + PCollection input, + PCollectionList responseList, + PCollectionList failureList) { - private Result(TupleTag responseTag, PCollectionTuple pct) { - this.pipeline = pct.getPipeline(); - this.responseTag = responseTag; - this.responses = pct.get(responseTag); - this.failures = pct.get(FAILURE_TAG); + if (rrioConfiguration.getCacheWrite() == null) { + Call call = Call.of(callConfiguration); + Result result = input.apply(CALL_NAME, call); + return Pair.of( + responseList.and(result.getResponses()), failureList.and(result.getFailures())); } - public PCollection getResponses() { - return responses; + // Wrap caller to associate RequestT with ResponseT as a KV. + Caller> caller = + new WrappedAssociatingRequestResponseCaller<>(callConfiguration.getCaller()); + + Coder> coder = + KvCoder.of(input.getCoder(), rrioConfiguration.getResponseTCoder()); + + // Could not re-use original configuration because of different type parameters. + Call.Configuration> configuration = + Call.Configuration.>builder() + .setResponseCoder(coder) + .setCaller(caller) + .setSetupTeardown(callConfiguration.getSetupTeardown()) + .setBackOffSupplier(callConfiguration.getBackOffSupplier()) + .setCallShouldBackoff( + new WrappedAssociatingRequestResponseCallShouldBackoff<>( + callConfiguration.getCallShouldBackoff())) + .setShouldRepeat(callConfiguration.getShouldRepeat()) + .setSleeperSupplier(callConfiguration.getSleeperSupplier()) + .setTimeout(callConfiguration.getTimeout()) + .build(); + + Call> call = Call.of(configuration); + + // Extract ResponseT from KV; append failures. + Result> result = input.apply(CALL_NAME, call); + PCollection responses = + result.getResponses().apply(CALL_NAME + "Responses", Values.create()); + responseList = responseList.and(responses); + failureList = failureList.and(result.getFailures()); + + // Write RequestT and ResponseT pairs to the cache; append failures. + Result> cacheWriteResult = + result + .getResponses() + .apply(CACHE_WRITE_NAME, checkStateNotNull(rrioConfiguration.getCacheWrite())); + failureList = failureList.and(cacheWriteResult.getFailures()); + + return Pair.of(responseList, failureList); + } + + /** + * The {@link PartitionFn} used by {@link #expandCacheRead} to separate non-null {@link ResponseT} + * value {@link KV}s from null ones. This is so that {@link RequestT}s associated with non-null + * {@link ResponseT}s are not forwarded to the {@link Caller} and {@link PCollection} of the + * associated {@link ResponseT}s flatten and merge with the final result output. + */ + private static class PartitionCacheReadsFn + implements PartitionFn> { + + /** Used as input into {@link Partition#of}. */ + private static final int NUM_PARTITIONS = 2; + + /** + * Prevents supplying the wrong index when calling {@link PCollectionList#get} for the + * associated request and non-null response KVs. + */ + private static final int NON_NULL_PARTITION = 0; + + /** + * Prevents supplying the wrong index when calling {@link PCollectionList#get} for the + * associated request and null response KVs. + */ + private static final int NULL_PARTITION = 1; + + @Override + public int partitionFor(KV elem, int numPartitions) { + if (checkStateNotNull(elem).getValue() != null) { + return NON_NULL_PARTITION; + } + return NULL_PARTITION; + } + } + + /** + * Used by {@link #expandCallWithOptionalCacheWrites}, in the setting of a non-null {@link + * Configuration#getCacheWrite}, to wrap the original {@link Caller} so that it returns a {@link + * KV} of {@link RequestT}s and their associated {@link ResponseT}s for use as input into the + * {@link Configuration#getCacheWrite} {@link PTransform}. + */ + private static class WrappedAssociatingRequestResponseCaller + implements Caller> { + + private final Caller caller; + + private WrappedAssociatingRequestResponseCaller(Caller caller) { + this.caller = caller; } - public PCollection getFailures() { - return failures; + @Override + public KV call(RequestT request) throws UserCodeExecutionException { + ResponseT response = caller.call(request); + return KV.of(request, response); + } + } + + /** + * Required by {@link #expandCallWithOptionalCacheWrites}, in the setting of a non-null {@link + * Configuration#getCacheWrite}, to match the signature of {@link CallShouldBackoff} with the + * corresponding {@link WrappedAssociatingRequestResponseCaller}. + */ + private static class WrappedAssociatingRequestResponseCallShouldBackoff + implements CallShouldBackoff> { + private final CallShouldBackoff basis; + + private WrappedAssociatingRequestResponseCallShouldBackoff(CallShouldBackoff basis) { + this.basis = basis; } @Override - public Pipeline getPipeline() { - return this.pipeline; + public void update(UserCodeExecutionException exception) { + basis.update(exception); } @Override - public Map, PValue> expand() { - return ImmutableMap.of( - responseTag, responses, - FAILURE_TAG, failures); + public void update(KV response) { + basis.update(response.getValue()); } @Override - public void finishSpecifyingOutput( - String transformName, PInput input, PTransform transform) {} + public boolean isTrue() { + return basis.isTrue(); + } } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Result.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Result.java new file mode 100644 index 000000000000..48b679a80b3e --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Result.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * The {@link Result} of processing request {@link PCollection} into response {@link PCollection}. + */ +public class Result implements POutput { + + /** + * Instantiates a {@link Result}. Package private as the design goals of {@link Result} are to be + * a convenience read-only wrapper around a {@link PCollectionTuple}. + */ + static Result of( + Coder responseTCoder, + TupleTag responseTag, + TupleTag failureTag, + PCollectionTuple pct) { + return new Result<>(responseTCoder, responseTag, pct, failureTag); + } + + private final Pipeline pipeline; + private final TupleTag responseTag; + private final TupleTag failureTag; + private final PCollection responses; + private final PCollection failures; + + private Result( + Coder responseTCoder, + TupleTag responseTag, + PCollectionTuple pct, + TupleTag failureTag) { + this.pipeline = pct.getPipeline(); + this.responseTag = responseTag; + this.failureTag = failureTag; + this.responses = pct.get(responseTag).setCoder(responseTCoder); + this.failures = pct.get(this.failureTag); + } + + public PCollection getResponses() { + return responses; + } + + public PCollection getFailures() { + return failures; + } + + @Override + public Pipeline getPipeline() { + return this.pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + responseTag, responses, + failureTag, failures); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java new file mode 100644 index 000000000000..f9ebaf815605 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * A union of a {@link Supplier} and {@link Serializable}, enabling configuration with {@link T} + * types that are not {@link Serializable}. + */ +@FunctionalInterface +public interface SerializableSupplier extends Supplier, Serializable {} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java index dffc034770aa..808f0b5d9639 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java @@ -76,7 +76,7 @@ * *

{@link ThrottleWithExternalResource} flattens errors emitted from {@link EnqueuerT}, {@link * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError} {@link PCollection} that - * is encapsulated, with a {@link T} {@link PCollection} output into a {@link Call.Result}. + * is encapsulated, with a {@link T} {@link PCollection} output into a {@link Result}. */ class ThrottleWithExternalResource< T, @@ -85,7 +85,7 @@ class ThrottleWithExternalResource< DequeuerT extends Caller & SetupTeardown, DecrementerT extends Caller & SetupTeardown, RefresherT extends Caller & SetupTeardown> - extends PTransform, Call.Result> { + extends PTransform, Result> { /** * Instantiate a {@link ThrottleWithExternalResource} using a {@link RedisClient}. @@ -151,17 +151,17 @@ class ThrottleWithExternalResource< } @Override - public Call.Result expand(PCollection input) { + public Result expand(PCollection input) { Pipeline pipeline = input.getPipeline(); // Refresh known quota to control the throttle rate. - Call.Result refreshResult = + Result refreshResult = pipeline .apply("quota impulse", PeriodicImpulse.create().withInterval(quota.getInterval())) .apply("quota refresh", getRefresher()); // Enqueue T elements. - Call.Result enqueuResult = input.apply("enqueue", getEnqueuer()); + Result enqueuResult = input.apply("enqueue", getEnqueuer()); TupleTag outputTag = new TupleTag() {}; TupleTag failureTag = new TupleTag() {}; @@ -191,7 +191,7 @@ public Call.Result expand(PCollection input) { TupleTag resultOutputTag = new TupleTag() {}; TupleTag resultFailureTag = new TupleTag() {}; - return Call.Result.of( + return Result.of( coder, resultOutputTag, resultFailureTag, diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeExecutionException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeExecutionException.java index be545b6da66a..a4e0e155a878 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeExecutionException.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeExecutionException.java @@ -35,4 +35,12 @@ public UserCodeExecutionException( String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } + + /** + * Reports whether when thrown warrants repeat execution. Some errors are not necessarily due to + * user code such as quota or remote service errors and may warrant repeating. + */ + public boolean shouldRepeat() { + return false; + } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeQuotaException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeQuotaException.java index c513a5371da7..390fb2c3147f 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeQuotaException.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeQuotaException.java @@ -39,4 +39,10 @@ public UserCodeQuotaException( String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } + + /** Reports that quota errors should be repeated. */ + @Override + public boolean shouldRepeat() { + return true; + } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java index dac16344bec7..fc0ea4c53389 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.java @@ -38,4 +38,13 @@ public UserCodeRemoteSystemException( String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } + + /** + * Reports that remote system errors should be repeated. These may be transient errors of a remote + * API service that resolve in time. Thus requests should be repeated.. + */ + @Override + public boolean shouldRepeat() { + return true; + } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeTimeoutException.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeTimeoutException.java index 869b8a51b73f..5fa5f0cceef6 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeTimeoutException.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/UserCodeTimeoutException.java @@ -36,4 +36,13 @@ public UserCodeTimeoutException( String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } + + /** + * Reports that timeouts should be repeated. A remote API service may be intermittently + * unavailable, thus warranting this type of error to be repeated. + */ + @Override + public boolean shouldRepeat() { + return true; + } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java new file mode 100644 index 000000000000..fbbafeb906f7 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Performs {@link CallShouldBackoff} computations but within a windowed {@link Duration}. + * Reinstantiates {@link CallShouldBackoff} using a {@link SerializableSupplier} at {@link #update} + * after a check for whether a windowed {@link Duration} elapsed. + */ +class WindowedCallShouldBackoff implements CallShouldBackoff { + + private final Duration window; + private final SerializableSupplier> callShouldBackoffSupplier; + private @MonotonicNonNull CallShouldBackoff basis; + private Instant nextReset; + + /** + * Instantiates a {@link WindowedCallShouldBackoff} with a {@link Duration} window and a {@link + * SerializableSupplier}. Within the constructor, sets the clock to {@link Instant#now()} and + * instantiates {@link CallShouldBackoff} using the {@link SerializableSupplier}. + */ + WindowedCallShouldBackoff( + Duration window, + SerializableSupplier> callShouldBackoffSupplier) { + this.window = window; + this.callShouldBackoffSupplier = callShouldBackoffSupplier; + this.basis = callShouldBackoffSupplier.get(); + this.nextReset = Instant.now().plus(window); + } + + private void resetIfNeeded() { + if (nextReset.isBeforeNow()) { + basis = callShouldBackoffSupplier.get(); + nextReset = nextReset.plus(window); + } + } + + @Override + public void update(UserCodeExecutionException exception) { + resetIfNeeded(); + checkStateNotNull(basis).update(exception); + } + + @Override + public void update(ResponseT response) { + resetIfNeeded(); + checkStateNotNull(basis).update(response); + } + + @Override + public boolean isTrue() { + resetIfNeeded(); + return checkStateNotNull(basis).isTrue(); + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java index 95497e6013af..732eb15dca17 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java @@ -17,6 +17,8 @@ */ package org.apache.beam.io.requestresponse; +import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; + import java.net.URI; import java.util.List; import java.util.stream.Collectors; @@ -30,6 +32,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,12 +40,12 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.DockerImageName; -/** Integration tests for {@link Cache}. */ @RunWith(JUnit4.class) public class CacheIT { - @Rule public TestPipeline writePipeline = TestPipeline.create(); + private final EchoITOptions options = readIOTestPipelineOptions(EchoITOptions.class); + @Rule public TestPipeline writePipeline = TestPipeline.fromOptions(options); - @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.fromOptions(options); private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine"; private static final Integer PORT = 6379; @@ -60,6 +63,11 @@ public class CacheIT { String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort())); }); + @BeforeClass + public static void removeIntegrationTestsProperty() { + System.clearProperty("integrationTestPipelineOptions"); + } + @Test public void givenRequestResponsesCached_writeThenReadYieldsMatches() throws NonDeterministicException { @@ -104,7 +112,7 @@ private void writeThenReadThenPAssert( PCollection requests = readPipeline.apply(Create.of(toRead)).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER); - Call.Result> gotKVsResult = + Result> gotKVsResult = requests.apply( Cache.readUsingRedis( externalClients.getActualClient(), diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbabilityTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbabilityTest.java index 40aaa48c2692..894b0ceaf6d9 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbabilityTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallShouldBackoffBasedOnRejectionProbabilityTest.java @@ -42,7 +42,7 @@ public void testValue() { } } assertEquals(caze.toString(), caze.wantPReject, shouldBackoff.getRejectionProbability(), 0.1); - assertEquals(caze.toString(), caze.wantValue, shouldBackoff.value()); + assertEquals(caze.toString(), caze.wantValue, shouldBackoff.isTrue()); } } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java index 1566d1725295..b369a62ae78d 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java @@ -26,7 +26,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import org.apache.beam.io.requestresponse.Call.Result; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java index c10b7ee1609e..8f763b96719a 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java @@ -58,7 +58,7 @@ public static void setUp() throws UserCodeExecutionException { + GRPC_ENDPOINT_ADDRESS_NAME + " is missing. See " + EchoITOptions.class - + "for details."); + + " for details."); } client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress())); checkStateNotNull(client).setup(); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java index 75cd49904cff..f654f8225e47 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java @@ -25,6 +25,7 @@ import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest; import org.checkerframework.checker.nullness.qual.NonNull; +/** A {@link CustomCoder} for {@link EchoRequest}s. */ class EchoRequestCoder extends CustomCoder<@NonNull EchoRequest> { @Override diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoResponseCoder.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoResponseCoder.java new file mode 100644 index 000000000000..5b379b7dcf0f --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoResponseCoder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse; + +/** A {@link CustomCoder} for {@link EchoResponse}es. */ +public class EchoResponseCoder extends CustomCoder { + + @Override + public void encode(EchoResponse value, OutputStream outStream) + throws CoderException, IOException { + value.writeTo(outStream); + } + + @Override + public EchoResponse decode(InputStream inStream) throws CoderException, IOException { + return EchoResponse.parseFrom(inStream); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java index 939515836bff..9cb2d90dfe68 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java @@ -121,7 +121,7 @@ public void setex_expiresDataWhenExpected() externalClients.getActualClient().setex(keyBytes, keyBytes, expiry); assertTrue(externalClients.getValidatingClient().exists(keyBytes)); assertTrue(externalClients.getValidatingClient().ttl(keyBytes) > 0L); - Thread.sleep(expiry.getMillis()); + Thread.sleep(expiry.plus(Duration.millis(100L)).getMillis()); assertFalse(externalClients.getValidatingClient().exists(keyBytes)); } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java index 14bb25ce4ad4..f1c5db9c72a6 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RepeaterTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.io.requestresponse; -import static org.apache.beam.io.requestresponse.Repeater.REPEATABLE_ERROR_TYPES; +import static org.apache.beam.io.requestresponse.RequestResponseIO.REPEATABLE_ERROR_TYPES; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.instanceOf; diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOIT.java new file mode 100644 index 000000000000..e68db671de7d --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOIT.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME; +import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.protobuf.ByteString; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest; +import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Integration tests for {@link RequestResponseIO}. See {@link EchoITOptions} for details on the + * required parameters and how to provide these for running integration tests. + */ +public class RequestResponseIOIT { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private static @MonotonicNonNull EchoITOptions options; + private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client; + private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload"); + + @BeforeClass + public static void setUp() throws UserCodeExecutionException { + options = readIOTestPipelineOptions(EchoITOptions.class); + if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) { + throw new RuntimeException( + "--" + + GRPC_ENDPOINT_ADDRESS_NAME + + " is missing. See " + + EchoITOptions.class + + " for details."); + } + client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress())); + } + + @Test + public void givenMinimalConfiguration_executesRequests() { + Result response = + createShouldNeverExceedQuotaRequestPCollection(10L) + .apply( + "echo", + RequestResponseIO.ofCallerAndSetupTeardown(client, new EchoResponseCoder())); + + PAssert.that(response.getFailures()).empty(); + PAssert.thatSingleton(response.getResponses().apply("count", Count.globally())).isEqualTo(10L); + + pipeline.run(); + } + + private PCollection createShouldNeverExceedQuotaRequestPCollection(long size) { + List requests = new ArrayList<>(); + for (long i = 0; i < size; i++) { + requests.add(createShouldNeverExceedQuotaRequest()); + } + return pipeline.apply("generate", Create.of(requests)); + } + + private static @NonNull EchoRequest createShouldNeverExceedQuotaRequest() { + return EchoRequest.newBuilder() + .setPayload(PAYLOAD) + .setId(checkStateNotNull(options).getNeverExceedQuotaId()) + .build(); + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java new file mode 100644 index 000000000000..f54d3e595b03 --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaProvider; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RequestResponseIO}. */ +@RunWith(JUnit4.class) +public class RequestResponseIOTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static final TypeDescriptor RESPONSE_TYPE = TypeDescriptor.of(Response.class); + private static final SchemaProvider SCHEMA_PROVIDER = new AutoValueSchema(); + + private static final Coder RESPONSE_CODER = + SchemaCoder.of( + checkStateNotNull(SCHEMA_PROVIDER.schemaFor(RESPONSE_TYPE)), + RESPONSE_TYPE, + checkStateNotNull(SCHEMA_PROVIDER.toRowFunction(RESPONSE_TYPE)), + checkStateNotNull(SCHEMA_PROVIDER.fromRowFunction(RESPONSE_TYPE))); + + @Test + public void givenCallerOnly_thenProcessesRequestsWithDefaultFeatures() { + Caller caller = new CallerImpl(); + + RequestResponseIO transform = + RequestResponseIO.of(caller, RESPONSE_CODER) + .withMonitoringConfiguration( + Monitoring.builder().build().withEverythingCountedExceptedCaching()); + + Result result = requests().apply("rrio", transform); + + PAssert.that(result.getFailures()).empty(); + PAssert.that(result.getResponses()).containsInAnyOrder(responses()); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + + assertThat( + getCounterResult(metrics, Call.class, Monitoring.REQUESTS_COUNTER_NAME), greaterThan(0L)); + assertThat( + getCounterResult(metrics, Call.class, Monitoring.RESPONSES_COUNTER_NAME), greaterThan(0L)); + + assertThat( + getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(caller)), + greaterThan(0L)); + + assertThat( + getCounterResult( + metrics, Call.class, Monitoring.setupCounterNameOf(new Call.NoopSetupTeardown())), + greaterThan(0L)); + + // We expect remaining metrics to be 0. + assertThat( + getCounterResult(metrics, Call.class, Monitoring.FAILURES_COUNTER_NAME), equalTo(0L)); + + assertThat( + getCounterResult( + metrics, + Call.class, + Monitoring.shouldBackoffCounterName( + transform.getCallConfiguration().getCallShouldBackoff())), + equalTo(0L)); + + assertThat( + getCounterResult( + metrics, + Call.class, + Monitoring.sleeperCounterNameOf( + transform.getCallConfiguration().getSleeperSupplier().get())), + equalTo(0L)); + + assertThat( + getCounterResult( + metrics, + Call.class, + Monitoring.backoffCounterNameOf( + transform.getCallConfiguration().getBackOffSupplier().get())), + equalTo(0L)); + } + + @Test + public void givenCallerAndSetupTeardown_thenCallerInvokesSetupTeardown() { + Result result = + requests() + .apply( + "rrio", + RequestResponseIO.ofCallerAndSetupTeardown( + new CallerSetupTeardownImpl(), RESPONSE_CODER) + .withMonitoringConfiguration( + Monitoring.builder().setCountCalls(true).setCountSetup(true).build())); + + PAssert.that(result.getFailures()).empty(); + PAssert.that(result.getResponses()).containsInAnyOrder(responses()); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metricResults = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + + assertThat( + getCounterResult( + metricResults, Call.class, Monitoring.callCounterNameOf(new CallerSetupTeardownImpl())), + greaterThan(0L)); + + assertThat( + getCounterResult( + metricResults, + Call.class, + Monitoring.setupCounterNameOf(new CallerSetupTeardownImpl())), + greaterThan(0L)); + } + + @Test + public void givenDefaultConfiguration_shouldRepeatFailedRequests() { + Result result = + requests() + .apply( + "rrio", + RequestResponseIO.of(new CallerImpl(1), RESPONSE_CODER) + .withMonitoringConfiguration(Monitoring.builder().setCountCalls(true).build())); + + PAssert.that(result.getFailures()).empty(); + PAssert.that(result.getResponses()).containsInAnyOrder(responses()); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + assertThat( + getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(new CallerImpl())), + equalTo(2L)); + } + + @Test + public void givenDefaultConfiguration_usesDefaultBackoffSupplier() { + Caller caller = new CallerImpl(1); + + requests() + .apply( + "rrio", + RequestResponseIO.of(caller, RESPONSE_CODER) + .withMonitoringConfiguration(Monitoring.builder().setCountBackoffs(true).build())); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + assertThat( + getCounterResult( + metrics, + Call.class, + Monitoring.backoffCounterNameOf(new DefaultSerializableBackoffSupplier().get())), + greaterThan(0L)); + } + + @Test + public void givenDefaultConfiguration_usesDefaultSleeper() { + Caller caller = new CallerImpl(1); + + requests() + .apply( + "rrio", + RequestResponseIO.of(caller, RESPONSE_CODER) + .withMonitoringConfiguration(Monitoring.builder().setCountSleeps(true).build())); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + assertThat( + getCounterResult(metrics, Call.class, Monitoring.sleeperCounterNameOf(Sleeper.DEFAULT)), + greaterThan(0L)); + } + + @Test + public void givenDefaultConfiguration_usesDefaultCallShouldBackoff() { + Caller caller = new CallerImpl(); + + RequestResponseIO transform = RequestResponseIO.of(caller, RESPONSE_CODER); + // We prime the default implementation so that we guarantee value() == true during the test. + CallShouldBackoffBasedOnRejectionProbability shouldBackoffImpl = + (CallShouldBackoffBasedOnRejectionProbability) + transform.getCallConfiguration().getCallShouldBackoff(); + shouldBackoffImpl.setThreshold(0); + shouldBackoffImpl.update(new UserCodeExecutionException("")); + + requests() + .apply( + "rrio", + transform + .withCallShouldBackoff(shouldBackoffImpl) + .withMonitoringConfiguration( + Monitoring.builder().setCountShouldBackoff(true).build())); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + assertThat( + getCounterResult( + metrics, Call.class, Monitoring.shouldBackoffCounterName(shouldBackoffImpl)), + greaterThan(0L)); + } + + @Test + public void givenWithoutRepeater_shouldNotRepeatRequests() { + Result result = + requests() + .apply( + "rrio", + RequestResponseIO.of(new CallerImpl(1), RESPONSE_CODER) + .withoutRepeater() + .withMonitoringConfiguration( + Monitoring.builder().setCountCalls(true).setCountFailures(true).build())); + + PAssert.that(result.getResponses()).empty(); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + + assertThat( + getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(new CallerImpl())), + greaterThan(0L)); + + assertThat( + getCounterResult(metrics, Call.class, Monitoring.FAILURES_COUNTER_NAME), greaterThan(0L)); + } + + @Test + public void givenCustomCallShouldBackoff_thenComputeUsingCustom() { + CustomCallShouldBackoff customCallShouldBackoff = new CustomCallShouldBackoff<>(); + requests() + .apply( + "rrio", + RequestResponseIO.of(new CallerImpl(), RESPONSE_CODER) + .withCallShouldBackoff(customCallShouldBackoff)); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + + assertThat( + getCounterResult( + metrics, + customCallShouldBackoff.getClass(), + customCallShouldBackoff.getCounterName().getName()), + greaterThan(0L)); + } + + @Test + public void givenCustomSleeper_thenSleepBehaviorCustom() { + CustomSleeperSupplier customSleeperSupplier = new CustomSleeperSupplier(); + requests() + .apply( + "rrio", + RequestResponseIO.of(new CallerImpl(100), RESPONSE_CODER) + .withSleeperSupplier(customSleeperSupplier)); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + assertThat( + getCounterResult( + metrics, + customSleeperSupplier.getClass(), + customSleeperSupplier.getCounterName().getName()), + greaterThan(0L)); + } + + @Test + public void givenCustomBackoff_thenBackoffBehaviorCustom() { + CustomBackOffSupplier customBackOffSupplier = new CustomBackOffSupplier(); + requests() + .apply( + "rrio", + RequestResponseIO.of(new CallerImpl(100), RESPONSE_CODER) + .withBackOffSupplier(customBackOffSupplier)); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + assertThat( + getCounterResult( + metrics, + customBackOffSupplier.getClass(), + customBackOffSupplier.getCounterName().getName()), + greaterThan(0L)); + } + + // TODO(damondouglas): Count metrics of caching after https://github.com/apache/beam/issues/29888 + // resolves. + @Ignore + @Test + public void givenWithCache_thenRequestsResponsesCachedUsingCustom() {} + + private PCollection requests() { + return pipeline.apply( + "create requests", Create.of(Request.builder().setALong(1L).setAString("a").build())); + } + + private List responses() { + return ImmutableList.of(Response.builder().setAString("a").setALong(1L).build()); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + abstract static class Request { + + static Builder builder() { + return new AutoValue_RequestResponseIOTest_Request.Builder(); + } + + abstract String getAString(); + + abstract Long getALong(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setAString(String value); + + abstract Builder setALong(Long value); + + abstract Request build(); + } + } + + @AutoValue + abstract static class Response { + static Builder builder() { + return new AutoValue_RequestResponseIOTest_Response.Builder(); + } + + abstract String getAString(); + + abstract Long getALong(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setAString(String value); + + abstract Builder setALong(Long value); + + abstract Response build(); + } + } + + private static class CallerSetupTeardownImpl implements Caller, SetupTeardown { + private final CallerImpl caller = new CallerImpl(); + + @Override + public Response call(Request request) throws UserCodeExecutionException { + return caller.call(request); + } + + @Override + public void setup() throws UserCodeExecutionException {} + + @Override + public void teardown() throws UserCodeExecutionException {} + } + + private static class CallerImpl implements Caller { + private int numErrors = 0; + + private CallerImpl() {} + + private CallerImpl(int numErrors) { + this.numErrors = numErrors; + } + + @Override + public Response call(Request request) throws UserCodeExecutionException { + if (numErrors > 0) { + numErrors--; + throw new UserCodeQuotaException(""); + } + return Response.builder() + .setAString(request.getAString()) + .setALong(request.getALong()) + .build(); + } + } + + private static class CustomCallShouldBackoff implements CallShouldBackoff { + private final Counter counter = + Metrics.counter(CustomCallShouldBackoff.class, "custom_counter"); + + @Override + public void update(UserCodeExecutionException exception) {} + + @Override + public void update(ResponseT response) {} + + @Override + public boolean isTrue() { + counter.inc(); + return false; + } + + MetricName getCounterName() { + return counter.getName(); + } + } + + private static class CustomSleeperSupplier implements SerializableSupplier { + private final Counter counter = Metrics.counter(CustomSleeperSupplier.class, "custom_counter"); + + @Override + public Sleeper get() { + return millis -> counter.inc(); + } + + MetricName getCounterName() { + return counter.getName(); + } + } + + private static class CustomBackOffSupplier implements SerializableSupplier { + + private final Counter counter = Metrics.counter(CustomBackOffSupplier.class, "custom_counter"); + + @Override + public BackOff get() { + return new BackOff() { + @Override + public void reset() throws IOException {} + + @Override + public long nextBackOffMillis() throws IOException { + counter.inc(); + return 0; + } + }; + } + + MetricName getCounterName() { + return counter.getName(); + } + } + + private static Long getCounterResult(MetricResults metrics, Class namespace, String name) { + MetricQueryResults metricQueryResults = + metrics.queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + return getCounterResult(metricQueryResults.getCounters(), namespace, name); + } + + private static Long getCounterResult( + Iterable> counters, Class namespace, String name) { + Long result = 0L; + for (MetricResult counter : counters) { + MetricName metricName = counter.getName(); + if (metricName.getNamespace().equals(namespace.getName()) + && metricName.getName().equals(name)) { + result = counter.getCommitted(); + } + } + return result; + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java index 24db38f926ee..00cee0a49357 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java @@ -85,7 +85,7 @@ public static void setUp() throws UserCodeExecutionException { + GRPC_ENDPOINT_ADDRESS_NAME + " is missing. See " + EchoITOptions.class - + "for details."); + + " for details."); } client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress())); checkStateNotNull(client).setup(); @@ -115,7 +115,7 @@ public void givenThrottleUsingRedis_preventsQuotaErrors() throws NonDeterministi URI.create(String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort())); pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false); - Call.Result throttleResult = + Result throttleResult = createRequestStream() .apply( "throttle", @@ -148,7 +148,7 @@ public void givenThrottleUsingRedis_preventsQuotaErrors() throws NonDeterministi .containsInAnyOrder(QUOTA_ID); // Call the Echo service with throttled requests. - Call.Result echoResult = + Result echoResult = throttleResult .getResponses() .apply("call", Call.ofCallerAndSetupTeardown(client, RESPONSE_CODER)); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java new file mode 100644 index 000000000000..5316f251200a --- /dev/null +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.sdk.testing.SerializableMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.joda.time.Duration; +import org.junit.Test; + +/** Tests for {@link WindowedCallShouldBackoff}. */ +public class WindowedCallShouldBackoffTest { + + @Test + public void resetsComputationPerWindow() throws InterruptedException { + WindowedCallShouldBackoff instance = instantiate(Duration.standardSeconds(1L)); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + assertThat(instance.isTrue(), equalTo(true)); + + Thread.sleep(1001L); + assertThat(instance.isTrue(), equalTo(false)); + + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + instance.update(new UserCodeExecutionException("")); + assertThat(instance.isTrue(), equalTo(true)); + } + + private static WindowedCallShouldBackoff instantiate(Duration window) { + return new WindowedCallShouldBackoff<>( + window, + (SerializableSupplier>) + () -> new CallShouldBackoffBasedOnRejectionProbability().setThreshold(0.5)); + } +}