Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RRIO] [Call] Implement PTransform without adaptive throttling #29144

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/java/io/rrio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.joda_time
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation "redis.clients:jedis:$jedisVersion"

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
*/
package org.apache.beam.io.requestresponse;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.util.Optional;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.joda.time.Instant;

/** {@link ApiIOError} is a data class for storing details about an error. */
Expand All @@ -30,12 +35,31 @@
@AutoValue
public abstract class ApiIOError {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Instantiate an {@link ApiIOError} from an {@link ErrorT} {@link T} element. The {@link T}
* element is converted to a JSON string.
*/
static <T, ErrorT extends Exception> ApiIOError of(@NonNull ErrorT e, @NonNull T element)
throws JsonProcessingException {

String json = OBJECT_MAPPER.writeValueAsString(element);

return ApiIOError.builder()
.setRequestAsJsonString(json)
.setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
.setObservedTimestamp(Instant.now())
.setStackTrace(Throwables.getStackTraceAsString(e))
.build();
}

static Builder builder() {
return new AutoValue_ApiIOError.Builder();
}

/** The encoded UTF-8 string representation of the related processed element. */
public abstract String getEncodedElementAsUtfString();
/** The JSON string representation of the request associated with the error. */
public abstract String getRequestAsJsonString();

/** The observed timestamp of the error. */
public abstract Instant getObservedTimestamp();
Expand All @@ -49,13 +73,13 @@ static Builder builder() {
@AutoValue.Builder
abstract static class Builder {

public abstract Builder setEncodedElementAsUtfString(String value);
abstract Builder setRequestAsJsonString(String value);

public abstract Builder setObservedTimestamp(Instant value);
abstract Builder setObservedTimestamp(Instant value);

public abstract Builder setMessage(String value);
abstract Builder setMessage(String value);

public abstract Builder setStackTrace(String value);
abstract Builder setStackTrace(String value);

abstract ApiIOError build();
}
Expand Down
Loading
Loading