Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RRIO] Stub the RequestResponseIO transform #28950

Merged
merged 3 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions sdks/java/io/rrio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ description = "Apache Beam :: SDKS :: Java :: IO :: RequestResponseIO (RRIO)"
ext.summary = "Support to read from and write to Web APIs"

dependencies {
// TODO(damondouglas): revert to implementation after project is more fully developed
permitUnusedDeclared project(path: ":sdks:java:core", configuration: "shadow")
permitUnusedDeclared library.java.joda_time
permitUnusedDeclared library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.joda_time
implementation library.java.vendored_guava_32_1_2_jre

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
import org.joda.time.Instant;

/** {@link ApiIOError} is a data class for storing details about an error. */
@SchemaCaseFormat(CaseFormat.LOWER_UNDERSCORE)
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class ApiIOError {

static Builder builder() {
return new AutoValue_ApiIOError.Builder();
}

/** The encoded UTF-8 string representation of the related processed element. */
public abstract String getEncodedElementAsUtfString();

/** The observed timestamp of the error. */
public abstract Instant getObservedTimestamp();

/** The {@link Exception} message. */
public abstract String getMessage();

/** The {@link Exception} stack trace. */
public abstract String getStackTrace();

@AutoValue.Builder
abstract static class Builder {

public abstract Builder setEncodedElementAsUtfString(String value);

public abstract Builder setObservedTimestamp(Instant value);

public abstract Builder setMessage(String value);

public abstract Builder setStackTrace(String value);

abstract ApiIOError build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponseio;

import com.google.auto.value.AutoValue;
import java.util.Map;
import org.apache.beam.io.requestresponseio.RequestResponseIO.Result;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/**
* {@link PTransform} for reading from and writing to Web APIs.
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>{@link RequestResponseIO} is recommended for interacting with external systems that offer RPCs
* that execute relatively quickly and do not offer advance features to make RPC execution
* efficient.
*
* <p>For systems that offer features for more efficient reading, for example, tracking progress of
* RPCs, support for splitting RPCs (deduct two or more RPCs which when combined return the same
* result), consider using the Apache Beam's `Splittable DoFn` interface instead.
*
* <h2>Basic Usage</h2>
*
* {@link RequestResponseIO} minimally requires implementing the {@link Caller} interface:
*
* <pre>{@code class MyCaller implements Caller<SomeRequest, SomeResponse> {
* public SomeResponse call(SomeRequest request) throws UserCodeExecutionException {
* // calls the API submitting SomeRequest payload and returning SomeResponse
* }
* }}</pre>
*
* <p>Then provide {@link RequestResponseIO}'s {@link #create} method your {@link Caller}
* implementation.
*
* <pre>{@code PCollection<SomeRequest> requests = ...
* Result result = requests.apply(RequestResponseIO.create(new MyCaller()));
* result.getResponses().apply( ... );
* result.getFailures().apply( ... );
* }</pre>
*/
public class RequestResponseIO<RequestT, ResponseT>
extends PTransform<PCollection<RequestT>, Result<ResponseT>> {

private static final TupleTag<ApiIOError> FAILURE_TAG = new TupleTag<ApiIOError>() {};

// TODO(damondouglas): remove when utilized.
@SuppressWarnings({"unused"})
private final Configuration<RequestT, ResponseT> configuration;

private RequestResponseIO(Configuration<RequestT, ResponseT> configuration) {
this.configuration = configuration;
}

public static <RequestT, ResponseT> RequestResponseIO<RequestT, ResponseT> of(
Caller<RequestT, ResponseT> caller) {
return new RequestResponseIO<>(
Configuration.<RequestT, ResponseT>builder().setCaller(caller).build());
}

/** Configuration details for {@link RequestResponseIO}. */
@AutoValue
abstract static class Configuration<RequestT, ResponseT> {

static <RequestT, ResponseT> Builder<RequestT, ResponseT> builder() {
return new AutoValue_RequestResponseIO_Configuration.Builder<>();
}

/**
* The {@link Caller} that interfaces user custom code to process a {@link RequestT} into a
* {@link ResponseT}.
*/
abstract Caller<RequestT, ResponseT> getCaller();

abstract Builder<RequestT, ResponseT> toBuilder();

@AutoValue.Builder
abstract static class Builder<RequestT, ResponseT> {

abstract Builder<RequestT, ResponseT> setCaller(Caller<RequestT, ResponseT> value);

abstract Configuration<RequestT, ResponseT> build();
}
}

@Override
public Result<ResponseT> expand(PCollection<RequestT> input) {
// TODO(damondouglas; https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+%5BRRIO%5D):
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
// expand pipeline as more dependencies develop.
return Result.of(new TupleTag<ResponseT>() {}, PCollectionTuple.empty(input.getPipeline()));
}

/**
* The {@link Result} of processing request {@link PCollection} into response {@link PCollection}
* using custom {@link Caller} code.
*/
public static class Result<ResponseT> implements POutput {

static <ResponseT> Result<ResponseT> of(TupleTag<ResponseT> responseTag, PCollectionTuple pct) {
return new Result<>(responseTag, pct);
}

private final Pipeline pipeline;
private final TupleTag<ResponseT> responseTag;
private final PCollection<ResponseT> responses;
private final PCollection<ApiIOError> failures;

private Result(TupleTag<ResponseT> responseTag, PCollectionTuple pct) {
this.pipeline = pct.getPipeline();
this.responseTag = responseTag;
this.responses = pct.get(responseTag);
this.failures = pct.get(FAILURE_TAG);
}

public PCollection<ResponseT> getResponses() {
return responses;
}

public PCollection<ApiIOError> getFailures() {
return failures;
}

@Override
public Pipeline getPipeline() {
return this.pipeline;
}

@Override
public Map<TupleTag<?>, PValue> expand() {
return ImmutableMap.of(
responseTag, responses,
FAILURE_TAG, failures);
}

@Override
public void finishSpecifyingOutput(
String transformName, PInput input, PTransform<?, ?> transform) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.rrio;
package org.apache.beam.io.requestresponseio;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import org.apache.beam.io.requestresponseio.Caller;
import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.rrio;
package org.apache.beam.io.requestresponseio;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import org.apache.beam.io.requestresponseio.SetupTeardown;
import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand Down
Loading