Skip to content

Commit

Permalink
[RRIO] Build RequestResponseIO and related dependency changes (#29710)
Browse files Browse the repository at this point in the history
* Move Call.Result as upper level class

* Patch documentation

* Replace RRIO's Result

* Implement RequestResponseIO expand methods

* Begin tests for transform construction

* spotlessApply

* Complete RequestResponseIO transform composite construction tests

* Implement windowed CallShouldBackoff

* Start completion of Call tasks

* Complete Call features

* Expose Call features configuration

* Add coverage of custom Call configuration

* Final cleanup

* Patch arrangement of Builder methods

* Clean up Call.Configuration

* Patch typo

* Patch unused annotations

* Remove support of Redis based throttling

* WIP: Refactor per PR comments

* Refactor RequestResponseIOTest

* Improve readability

* Add sleepIfNeeded
  • Loading branch information
damondouglas authored Jan 9, 2024
1 parent bb0a203 commit 8fb06da
Show file tree
Hide file tree
Showing 31 changed files with 2,358 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.io.requestresponse;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
Expand All @@ -26,7 +28,6 @@
import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.joda.time.Instant;

/** {@link ApiIOError} is a data class for storing details about an error. */
Expand All @@ -41,10 +42,10 @@ public abstract class ApiIOError {
* Instantiate an {@link ApiIOError} from an {@link ErrorT} {@link T} element. The {@link T}
* element is converted to a JSON string.
*/
static <T, ErrorT extends Exception> ApiIOError of(@NonNull ErrorT e, @NonNull T element)
static <T, ErrorT extends Exception> ApiIOError of(ErrorT e, T element)
throws JsonProcessingException {

String json = OBJECT_MAPPER.writeValueAsString(element);
String json = OBJECT_MAPPER.writeValueAsString(checkStateNotNull(element));

return ApiIOError.builder()
.setRequestAsJsonString(json)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -32,7 +39,73 @@
import org.joda.time.Duration;

/** Transforms for reading and writing request/response associations to a cache. */
final class Cache {
// TODO(damondouglas): Add metrics per https://github.com/apache/beam/issues/29888.
public final class Cache {

/**
* Builds a {@link Pair} using a <a href="https://redis.io">Redis</a> cache to read and write
* {@link RequestT} and {@link ResponseT} pairs. The purpose of the cache is to offload {@link
* RequestT}s from the API and instead return the {@link ResponseT} if the association is known.
* Since the {@link RequestT}s and {@link ResponseT}s need encoding and decoding, checks are made
* whether the requestTCoder and responseTCoders are {@link Coder#verifyDeterministic}.
* <strong>This feature is only appropriate for API reads such as HTTP list, get, etc.</strong>
*
* <pre>Below describes the parameters in more detail and their usage.</pre>
*
* <ul>
* <li>{@code URI uri} - the {@link URI} of the Redis instance.
* <li>{@code Coder<RequestT> requestTCoder} - the {@link RequestT} {@link Coder} to encode and
* decode {@link RequestT}s during cache read and writes.
* <li>{@code Duration expiry} - the duration to hold {@link RequestT} and {@link ResponseT}
* pairs in the cache.
* </ul>
*/
public static <RequestT, ResponseT> Pair<RequestT, ResponseT> usingRedis(
URI uri, Coder<RequestT> requestTCoder, Coder<ResponseT> responseTCoder, Duration expiry)
throws NonDeterministicException {
PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> read =
Cache.<RequestT, @Nullable ResponseT>readUsingRedis(
new RedisClient(uri), requestTCoder, new CacheResponseCoder<>(responseTCoder));

PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write =
// Type arguments needed to resolve "error: [assignment] incompatible types in assignment."
Cache.<RequestT, ResponseT>writeUsingRedis(
expiry, new RedisClient(uri), requestTCoder, new CacheResponseCoder<>(responseTCoder));

return Pair.<RequestT, ResponseT>of(read, write);
}

/**
* A simple POJO that holds both cache read and write {@link PTransform}s. Functionally, these go
* together and must at times be instantiated using the same inputs.
*/
public static class Pair<RequestT, ResponseT> {
private final PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> read;
private final PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>>
write;

public static <RequestT, ResponseT> Pair<RequestT, ResponseT> of(
PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> read,
PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write) {
return new Pair<>(read, write);
}

private Pair(
PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> read,
PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write) {
this.read = read;
this.write = write;
}

public PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> getRead() {
return read;
}

public PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>>
getWrite() {
return write;
}
}

/**
* Instantiates a {@link Call} {@link PTransform} that reads {@link RequestT} {@link ResponseT}
Expand All @@ -45,7 +118,7 @@ final class Cache {
@Nullable ResponseT,
CallerSetupTeardownT extends
Caller<RequestT, KV<RequestT, @Nullable ResponseT>> & SetupTeardown>
PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable ResponseT>>> read(
PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> read(
CallerSetupTeardownT implementsCallerSetupTeardown,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder) {
Expand All @@ -66,12 +139,11 @@ final class Cache {
* considerations when using this method to achieve cache reads.
*/
static <RequestT, @Nullable ResponseT>
PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable ResponseT>>>
readUsingRedis(
RedisClient client,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder)
throws NonDeterministicException {
PTransform<PCollection<RequestT>, Result<KV<RequestT, @Nullable ResponseT>>> readUsingRedis(
RedisClient client,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder)
throws NonDeterministicException {
return read(
new UsingRedis<>(requestTCoder, responseTCoder, client).read(),
requestTCoder,
Expand All @@ -88,7 +160,7 @@ final class Cache {
ResponseT,
CallerSetupTeardownT extends
Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>> & SetupTeardown>
PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, ResponseT>>> write(
PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write(
CallerSetupTeardownT implementsCallerSetupTeardown,
KvCoder<RequestT, ResponseT> kvCoder) {
return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown, kvCoder);
Expand All @@ -107,7 +179,7 @@ PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, Respon
* considerations when using this method to achieve cache writes.
*/
static <RequestT, ResponseT>
PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, ResponseT>>>
PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>>
writeUsingRedis(
Duration expiry,
RedisClient client,
Expand Down Expand Up @@ -136,16 +208,16 @@ private UsingRedis(
this.responseTCoder = responseTCoder;
}

private Read<RequestT, @Nullable ResponseT> read() {
Read<RequestT, @Nullable ResponseT> read() {
return new Read<>(requestTCoder, responseTCoder, client);
}

private Write<RequestT, ResponseT> write(Duration expiry) {
Write<RequestT, ResponseT> write(Duration expiry) {
return new Write<>(expiry, requestTCoder, responseTCoder, client);
}

/** Reads associated {@link RequestT} {@link ResponseT} using a {@link RedisClient}. */
private static class Read<RequestT, @Nullable ResponseT>
static class Read<RequestT, @Nullable ResponseT>
implements Caller<RequestT, KV<RequestT, @Nullable ResponseT>>, SetupTeardown {

private final Coder<RequestT> requestTCoder;
Expand Down Expand Up @@ -191,49 +263,79 @@ public void teardown() throws UserCodeExecutionException {
client.teardown();
}
}

static class Write<RequestT, ResponseT>
implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>, SetupTeardown {
private final Duration expiry;
private final Coder<RequestT> requestTCoder;
private final Coder<@Nullable ResponseT> responseTCoder;
private final RedisClient client;

private Write(
Duration expiry,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder,
RedisClient client) {
this.expiry = expiry;
this.requestTCoder = requestTCoder;
this.responseTCoder = responseTCoder;
this.client = client;
}

@Override
public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> request)
throws UserCodeExecutionException {
ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
try {
requestTCoder.encode(request.getKey(), keyStream);
responseTCoder.encode(request.getValue(), valueStream);
} catch (IOException e) {
throw new UserCodeExecutionException(e);
}
client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry);
return request;
}

@Override
public void setup() throws UserCodeExecutionException {
client.setup();
}

@Override
public void teardown() throws UserCodeExecutionException {
client.teardown();
}
}
}

private static class Write<RequestT, ResponseT>
implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>, SetupTeardown {
private final Duration expiry;
private final Coder<RequestT> requestTCoder;
private final Coder<@Nullable ResponseT> responseTCoder;
private final RedisClient client;
/** Resolves checker error: incompatible argument for parameter ResponseT Coder. */
private static class CacheResponseCoder<ResponseT> extends CustomCoder<@Nullable ResponseT> {
private final NullableCoder<ResponseT> basis;

private Write(
Duration expiry,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder,
RedisClient client) {
this.expiry = expiry;
this.requestTCoder = requestTCoder;
this.responseTCoder = responseTCoder;
this.client = client;
private CacheResponseCoder(Coder<ResponseT> basis) {
this.basis = NullableCoder.of(basis);
}

@Override
public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> request)
throws UserCodeExecutionException {
ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
try {
requestTCoder.encode(request.getKey(), keyStream);
responseTCoder.encode(request.getValue(), valueStream);
} catch (IOException e) {
throw new UserCodeExecutionException(e);
}
client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry);
return request;
public void encode(@Nullable ResponseT value, OutputStream outStream)
throws CoderException, IOException {
basis.encode(value, outStream);
}

@Override
public @Nullable ResponseT decode(InputStream inStream) throws CoderException, IOException {
return basis.decode(inStream);
}

@Override
public void setup() throws UserCodeExecutionException {
client.setup();
public List<? extends Coder<?>> getCoderArguments() {
return basis.getCoderArguments();
}

@Override
public void teardown() throws UserCodeExecutionException {
client.teardown();
public void verifyDeterministic() throws NonDeterministicException {
basis.verifyDeterministic();
}
}
}
Loading

0 comments on commit 8fb06da

Please sign in to comment.