Skip to content

Commit

Permalink
Add initial DLQ router (#27045)
Browse files Browse the repository at this point in the history
* Add initial DLQ router

* fix apache licenses
  • Loading branch information
johnjcasey authored Jun 20, 2023
1 parent 4c66866 commit 1c12f92
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 0 deletions.
41 changes: 41 additions & 0 deletions sdks/java/io/components/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.components',
)

description = "Apache Beam :: SDKs :: Java :: IO :: Components"
ext.summary = "Components for building fully featured IOs"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.protobuf_java
permitUnusedDeclared library.java.protobuf_java // BEAM-11761
implementation library.java.slf4j_api
implementation library.java.vendored_guava_26_0_jre
implementation library.java.vendored_grpc_1_54_0

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.guava_testlib
testImplementation library.java.junit
testImplementation library.java.hamcrest
testRuntimeOnly library.java.slf4j_jdk14
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.components.deadletterqueue;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.NonNull;

public class DLQRouter<T, K> extends PTransform<@NonNull PCollectionTuple, @NonNull PCollection<T>> {

private final TupleTag<T> goodMessages;

private final TupleTag<K> badMessages;

private final PTransform<@NonNull PCollection<K>,?> errorSink;

public DLQRouter (TupleTag<T> goodMessages, TupleTag<K> badMessages, PTransform<@NonNull PCollection<K>,?> errorSink){
this.goodMessages = goodMessages;
this.badMessages = badMessages;
this.errorSink = errorSink;
}
@Override
public PCollection<T> expand(@NonNull PCollectionTuple input) {
//validate no extra messages are dropped
Map<TupleTag<?>,PCollection<?>> pcollections = new HashMap<>(input.getAll());
pcollections.remove(goodMessages);
pcollections.remove(badMessages);
if (pcollections.size() != 0){
throw new IllegalArgumentException("DLQ Router only supports PCollectionTuples split between two message groupings");
}

input.get(badMessages).apply(errorSink);

return input.get(goodMessages);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.components.deadletterqueue.sinks;

import org.apache.beam.repackaged.core.org.apache.commons.lang3.ObjectUtils.Null;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.NonNull;

public class ThrowingSink<T> extends PTransform<@NonNull PCollection<T>, @NonNull PDone> {

@Override
public PDone expand(@NonNull PCollection<T> input) {
input.apply(ParDo.of(new ThrowingDoFn()));

return PDone.in(input.getPipeline());
}

public class ThrowingDoFn extends DoFn<T, Null> {

@ProcessElement
public void processElement(@Element @NonNull T element){
throw new RuntimeException(element.toString());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.components.deadletterqueue;

import org.apache.beam.sdk.io.components.deadletterqueue.sinks.ThrowingSink;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class DLQRouterTest {

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Rule public ExpectedException thrown = ExpectedException.none();


@Test
public void testExceptionWithInvalidConfiguration(){
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("DLQ Router only supports PCollectionTuples split between two message groupings");

TupleTag<String> tag1 = new TupleTag<>();
TupleTag<String> tag2 = new TupleTag<>();
TupleTag<String> tag3 = new TupleTag<>();
PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply(Create.<String>of("elem1")))
.and(tag2, p.apply(Create.<String>of("elem2")))
.and(tag3, p.apply(Create.<String>of("elem1")));
tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>()));

p.run();

}

@Test
public void testExpectCorrectRouting(){
thrown.expect(RuntimeException.class);
thrown.expectMessage("elem2");

TupleTag<String> tag1 = new TupleTag<>();
TupleTag<String> tag2 = new TupleTag<>();

PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply("create elem1", Create.<String>of("elem1")))
.and(tag2, p.apply("create elem2", Create.<String>of("elem2")));

PCollection<String> expectedElement = tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>()));

PAssert.thatSingleton(expectedElement).isEqualTo("elem1");

p.run();
}


}

0 comments on commit 1c12f92

Please sign in to comment.