NOTE(review): this patch was recovered from a copy that had been flattened onto
a single line with angle-bracket spans ("<...>") stripped. Line breaks,
indentation and blank lines are reconstructed. Generic type arguments on the
added lines are restored as BadRecord, the only element type the added code
handles (see the @Element parameter of process()). Context and removed lines
are kept as recovered: the first hunk header declares 22 old lines but only 21
survive, so one removed Javadoc line (presumably markup around "CSVFormat") is
missing, and unchanged lines may likewise have lost generics. Regenerate
against the real file before applying.

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
index 22f06edc8322..7d421a8209bb 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
@@ -22,22 +22,27 @@
 import java.util.Map;
 import java.util.Optional;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.csv.CSVFormat;
 
 /** Stores parameters needed for CSV record parsing. */
 @AutoValue
 abstract class CsvIOParseConfiguration {
 
+  /** A Dead Letter Queue that returns potential errors with {@link BadRecord}. */
+  final PTransform<PCollection<BadRecord>, PCollection<BadRecord>> errorHandlerTransform =
+      new BadRecordOutput();
+
   static Builder builder() {
     return new AutoValue_CsvIOParseConfiguration.Builder();
   }
 
-  /**
-   * The expected CSVFormat
-   * of the parsed CSV record.
-   */
+  /** The expected {@link CSVFormat} of the parsed CSV record. */
   abstract CSVFormat getCsvFormat();
 
   /** The expected {@link Schema} of the target type. */
@@ -66,4 +71,20 @@ final CsvIOParseConfiguration build() {
       return autoBuild();
     }
   }
+
+  private static class BadRecordOutput
+      extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
+
+    @Override
+    public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
+      return input.apply(ParDo.of(new BadRecordTransformFn()));
+    }
+
+    private static class BadRecordTransformFn extends DoFn<BadRecord, BadRecord> {
+      @ProcessElement
+      public void process(@Element BadRecord input, OutputReceiver<BadRecord> receiver) {
+        receiver.output(input);
+      }
+    }
+  }
 }