From eb59788cf4b46360c2469f3ec4b0c388255c3f40 Mon Sep 17 00:00:00 2001 From: lahariguduru <108150650+lahariguduru@users.noreply.github.com> Date: Fri, 12 Jul 2024 00:01:30 +0000 Subject: [PATCH] [CsvIO] Changed CsvIOParseConfiguration to include DLQ (#31852) * Create CsvIOParseConfiguration class * Altered CsvIOParseConfiguration class to include Dead Letter Queue messages * Altered CsvIOParseConfiguration class to allow for Dead Letter Queue processing * removed System.out.println(input) --------- Co-authored-by: Lahari Guduru --- .../sdk/io/csv/CsvIOParseConfiguration.java | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java index 22f06edc8322..7d421a8209bb 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java @@ -22,22 +22,27 @@ import java.util.Map; import java.util.Optional; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVFormat; /** Stores parameters needed for CSV record parsing. */ @AutoValue abstract class CsvIOParseConfiguration { + /** A Dead Letter Queue that returns potential errors with {@link BadRecord}. */ + final PTransform, PCollection> errorHandlerTransform = + new BadRecordOutput(); + static Builder builder() { return new AutoValue_CsvIOParseConfiguration.Builder(); } - /** - * The expected CSVFormat - * of the parsed CSV record. - */ + /** The expected {@link CSVFormat} of the parsed CSV record. */ abstract CSVFormat getCsvFormat(); /** The expected {@link Schema} of the target type. */ @@ -66,4 +71,20 @@ final CsvIOParseConfiguration build() { return autoBuild(); } } + + private static class BadRecordOutput + extends PTransform, PCollection> { + + @Override + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new BadRecordTransformFn())); + } + + private static class BadRecordTransformFn extends DoFn { + @ProcessElement + public void process(@Element BadRecord input, OutputReceiver receiver) { + receiver.output(input); + } + } + } }