From 1c12f92a9a56889473b23944d0e028d94a18f8aa Mon Sep 17 00:00:00 2001 From: johnjcasey <95318300+johnjcasey@users.noreply.github.com> Date: Tue, 20 Jun 2023 13:03:03 -0400 Subject: [PATCH] Add initial DLQ router (#27045) * Add initial DLQ router * fix apache licenses --- sdks/java/io/components/build.gradle | 41 ++++++++++ .../components/deadletterqueue/DLQRouter.java | 55 ++++++++++++++ .../deadletterqueue/sinks/ThrowingSink.java | 45 +++++++++++ .../deadletterqueue/DLQRouterTest.java | 75 +++++++++++++++++++ 4 files changed, 216 insertions(+) create mode 100644 sdks/java/io/components/build.gradle create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java diff --git a/sdks/java/io/components/build.gradle b/sdks/java/io/components/build.gradle new file mode 100644 index 000000000000..3bf987d82bcc --- /dev/null +++ b/sdks/java/io/components/build.gradle @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.components', +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Components" +ext.summary = "Components for building fully featured IOs" + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.protobuf_java + permitUnusedDeclared library.java.protobuf_java // BEAM-11761 + implementation library.java.slf4j_api + implementation library.java.vendored_guava_26_0_jre + implementation library.java.vendored_grpc_1_54_0 + + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.guava_testlib + testImplementation library.java.junit + testImplementation library.java.hamcrest + testRuntimeOnly library.java.slf4j_jdk14 + testImplementation project(path: ":runners:direct-java", configuration: "shadow") +} \ No newline at end of file diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java new file mode 100644 index 000000000000..7cff6cb3e791 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.deadletterqueue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.NonNull; + +public class DLQRouter extends PTransform<@NonNull PCollectionTuple, @NonNull PCollection> { + + private final TupleTag goodMessages; + + private final TupleTag badMessages; + + private final PTransform<@NonNull PCollection,?> errorSink; + + public DLQRouter (TupleTag goodMessages, TupleTag badMessages, PTransform<@NonNull PCollection,?> errorSink){ + this.goodMessages = goodMessages; + this.badMessages = badMessages; + this.errorSink = errorSink; + } + @Override + public PCollection expand(@NonNull PCollectionTuple input) { + //validate no extra messages are dropped + Map,PCollection> pcollections = new HashMap<>(input.getAll()); + pcollections.remove(goodMessages); + pcollections.remove(badMessages); + if (pcollections.size() != 0){ + throw new IllegalArgumentException("DLQ Router only supports PCollectionTuples split between two message groupings"); + } + + input.get(badMessages).apply(errorSink); + + return input.get(goodMessages); + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java new file mode 100644 index 000000000000..3e5994e41e3b --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.deadletterqueue.sinks; + +import org.apache.beam.repackaged.core.org.apache.commons.lang3.ObjectUtils.Null; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.NonNull; + +public class ThrowingSink extends PTransform<@NonNull PCollection, @NonNull PDone> { + + @Override + public PDone expand(@NonNull PCollection input) { + input.apply(ParDo.of(new ThrowingDoFn())); + + return PDone.in(input.getPipeline()); + } + + public class ThrowingDoFn extends DoFn { + + @ProcessElement + public void processElement(@Element @NonNull T element){ + throw new RuntimeException(element.toString()); + } + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java new file mode 100644 index 000000000000..14e076df1dcc --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.deadletterqueue; + +import org.apache.beam.sdk.io.components.deadletterqueue.sinks.ThrowingSink; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class DLQRouterTest { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + + @Test + public void testExceptionWithInvalidConfiguration(){ + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("DLQ Router only supports PCollectionTuples split between two message groupings"); + + TupleTag tag1 = new TupleTag<>(); + TupleTag tag2 = new TupleTag<>(); + TupleTag tag3 = new TupleTag<>(); + PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply(Create.of("elem1"))) + .and(tag2, p.apply(Create.of("elem2"))) + .and(tag3, p.apply(Create.of("elem1"))); + tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>())); + + p.run(); + + } + + @Test + public void testExpectCorrectRouting(){ + thrown.expect(RuntimeException.class); + thrown.expectMessage("elem2"); + + TupleTag tag1 = new TupleTag<>(); + TupleTag tag2 = new TupleTag<>(); + + PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply("create elem1", Create.of("elem1"))) + .and(tag2, p.apply("create elem2", Create.of("elem2"))); + + PCollection expectedElement = tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>())); + + PAssert.thatSingleton(expectedElement).isEqualTo("elem1"); + + p.run(); + } + + +}