diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle index 32fbd9d22e38..d65df370e0ca 100644 --- a/sdks/java/io/rrio/build.gradle +++ b/sdks/java/io/rrio/build.gradle @@ -25,10 +25,9 @@ description = "Apache Beam :: SDKS :: Java :: IO :: RequestResponseIO (RRIO)" ext.summary = "Support to read from and write to Web APIs" dependencies { - // TODO(damondouglas): revert to implementation after project is more fully developed - permitUnusedDeclared project(path: ":sdks:java:core", configuration: "shadow") - permitUnusedDeclared library.java.joda_time - permitUnusedDeclared library.java.vendored_guava_32_1_2_jre + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.joda_time + implementation library.java.vendored_guava_32_1_2_jre testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.junit diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/ApiIOError.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/ApiIOError.java new file mode 100644 index 000000000000..b7c5524e8237 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/ApiIOError.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.joda.time.Instant; + +/** {@link ApiIOError} is a data class for storing details about an error. */ +@SchemaCaseFormat(CaseFormat.LOWER_UNDERSCORE) +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class ApiIOError { + + static Builder builder() { + return new AutoValue_ApiIOError.Builder(); + } + + /** The encoded UTF-8 string representation of the related processed element. */ + public abstract String getEncodedElementAsUtfString(); + + /** The observed timestamp of the error. */ + public abstract Instant getObservedTimestamp(); + + /** The {@link Exception} message. */ + public abstract String getMessage(); + + /** The {@link Exception} stack trace. */ + public abstract String getStackTrace(); + + @AutoValue.Builder + abstract static class Builder { + + public abstract Builder setEncodedElementAsUtfString(String value); + + public abstract Builder setObservedTimestamp(Instant value); + + public abstract Builder setMessage(String value); + + public abstract Builder setStackTrace(String value); + + abstract ApiIOError build(); + } +} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/RequestResponseIO.java new file mode 100644 index 000000000000..2ff0d50f68d5 --- /dev/null +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/RequestResponseIO.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponseio; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import org.apache.beam.io.requestresponseio.RequestResponseIO.Result; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * {@link PTransform} for reading from and writing to Web APIs. + * + *

{@link RequestResponseIO} is recommended for interacting with external systems that offer RPCs + * that execute relatively quickly and do not offer advance features to make RPC execution + * efficient. + * + *

For systems that offer features for more efficient reading, for example, tracking progress of + * RPCs, support for splitting RPCs (deduct two or more RPCs which when combined return the same + * result), consider using the Apache Beam's `Splittable DoFn` interface instead. + * + *

Basic Usage

+ * + * {@link RequestResponseIO} minimally requires implementing the {@link Caller} interface: + * + *
{@code class MyCaller implements Caller {
+ *    public SomeResponse call(SomeRequest request) throws UserCodeExecutionException {
+ *      // calls the API submitting SomeRequest payload and returning SomeResponse
+ *    }
+ * }}
+ * + *

Then provide {@link RequestResponseIO}'s {@link #create} method your {@link Caller} + * implementation. + * + *

{@code  PCollection requests = ...
+ *  Result result = requests.apply(RequestResponseIO.create(new MyCaller()));
+ *  result.getResponses().apply( ... );
+ *  result.getFailures().apply( ... );
+ * }
+ */ +public class RequestResponseIO + extends PTransform, Result> { + + private static final TupleTag FAILURE_TAG = new TupleTag() {}; + + // TODO(damondouglas): remove when utilized. + @SuppressWarnings({"unused"}) + private final Configuration configuration; + + private RequestResponseIO(Configuration configuration) { + this.configuration = configuration; + } + + public static RequestResponseIO of( + Caller caller) { + return new RequestResponseIO<>( + Configuration.builder().setCaller(caller).build()); + } + + /** Configuration details for {@link RequestResponseIO}. */ + @AutoValue + abstract static class Configuration { + + static Builder builder() { + return new AutoValue_RequestResponseIO_Configuration.Builder<>(); + } + + /** + * The {@link Caller} that interfaces user custom code to process a {@link RequestT} into a + * {@link ResponseT}. + */ + abstract Caller getCaller(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setCaller(Caller value); + + abstract Configuration build(); + } + } + + @Override + public Result expand(PCollection input) { + // TODO(damondouglas; https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+%5BRRIO%5D): + // expand pipeline as more dependencies develop. + return Result.of(new TupleTag() {}, PCollectionTuple.empty(input.getPipeline())); + } + + /** + * The {@link Result} of processing request {@link PCollection} into response {@link PCollection} + * using custom {@link Caller} code. + */ + public static class Result implements POutput { + + static Result of(TupleTag responseTag, PCollectionTuple pct) { + return new Result<>(responseTag, pct); + } + + private final Pipeline pipeline; + private final TupleTag responseTag; + private final PCollection responses; + private final PCollection failures; + + private Result(TupleTag responseTag, PCollectionTuple pct) { + this.pipeline = pct.getPipeline(); + this.responseTag = responseTag; + this.responses = pct.get(responseTag); + this.failures = pct.get(FAILURE_TAG); + } + + public PCollection getResponses() { + return responses; + } + + public PCollection getFailures() { + return failures; + } + + @Override + public Pipeline getPipeline() { + return this.pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + responseTag, responses, + FAILURE_TAG, failures); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + } +} diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallerTest.java similarity index 93% rename from sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java rename to sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallerTest.java index 5258573f4283..0ba2d93c5411 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallerTest.java @@ -15,16 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.io.rrio; +package org.apache.beam.io.requestresponseio; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import org.apache.beam.io.requestresponseio.Caller; -import org.apache.beam.io.requestresponseio.UserCodeExecutionException; -import org.apache.beam.io.requestresponseio.UserCodeQuotaException; -import org.apache.beam.io.requestresponseio.UserCodeTimeoutException; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/SetupTeardownTest.java similarity index 93% rename from sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java rename to sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/SetupTeardownTest.java index a8c5c45ede5c..9ef2f88a29c5 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/SetupTeardownTest.java @@ -15,15 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.io.rrio; +package org.apache.beam.io.requestresponseio; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import org.apache.beam.io.requestresponseio.SetupTeardown; -import org.apache.beam.io.requestresponseio.UserCodeExecutionException; -import org.apache.beam.io.requestresponseio.UserCodeQuotaException; -import org.apache.beam.io.requestresponseio.UserCodeTimeoutException; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create;