diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java index 0fbe949e1556..5273b43bfe0c 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java @@ -23,6 +23,7 @@ import java.math.BigDecimal; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -35,7 +36,7 @@ final class CsvIOParseHelpers { * "Reading CSV Files" section of the {@link CsvIO} documentation for information regarding which * {@link CSVFormat} parameters are checked during validation. */ - static void validate(CSVFormat format) { + static void validateCsvFormat(CSVFormat format) { String[] header = checkArgumentNotNull(format.getHeader(), "Illegal %s: header is required", CSVFormat.class); @@ -66,8 +67,20 @@ static void validate(CSVFormat format) { * Validate the {@link CSVFormat} in relation to the {@link Schema} for CSV record parsing * requirements. */ - // TODO(https://github.com/apache/beam/issues/31716): implement method. - static void validate(CSVFormat format, Schema schema) {} + static void validateCsvFormatWithSchema(CSVFormat format, Schema schema) { + List header = Arrays.asList(format.getHeader()); + for (Schema.Field field : schema.getFields()) { + String fieldName = field.getName(); + if (!field.getType().getNullable()) { + checkArgument( + header.contains(fieldName), + "Illegal %s: required %s field '%s' not found in header", + CSVFormat.class, + Schema.class.getTypeName(), + fieldName); + } + } + } /** * Build a {@link List} of {@link Schema.Field}s corresponding to the expected position of each diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java index 820464005eaa..f4ba855dfc26 100644 --- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java @@ -34,18 +34,19 @@ @RunWith(JUnit4.class) public class CsvIOParseHelpersTest { - /** Tests for {@link CsvIOParseHelpers#validate(CSVFormat)}. */ + /** Tests for {@link CsvIOParseHelpers#validateCsvFormat(CSVFormat)}. */ @Test public void givenCSVFormatWithHeader_validates() { CSVFormat format = csvFormatWithHeader(); - CsvIOParseHelpers.validate(format); + CsvIOParseHelpers.validateCsvFormat(format); } @Test public void givenCSVFormatWithNullHeader_throwsException() { CSVFormat format = csvFormat(); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals("Illegal class org.apache.commons.csv.CSVFormat: header is required", gotMessage); } @@ -54,7 +55,8 @@ public void givenCSVFormatWithNullHeader_throwsException() { public void givenCSVFormatWithEmptyHeader_throwsException() { CSVFormat format = csvFormat().withHeader(); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals( "Illegal class org.apache.commons.csv.CSVFormat: header cannot be empty", gotMessage); @@ -64,7 +66,8 @@ public void givenCSVFormatWithEmptyHeader_throwsException() { public void givenCSVFormatWithHeaderContainingEmptyString_throwsException() { CSVFormat format = csvFormat().withHeader("", "bar"); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals( "Illegal class org.apache.commons.csv.CSVFormat: column name is required", gotMessage); @@ -74,7 +77,8 @@ public void givenCSVFormatWithHeaderContainingEmptyString_throwsException() { public void givenCSVFormatWithHeaderContainingNull_throwsException() { CSVFormat format = csvFormat().withHeader(null, "bar"); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals( "Illegal class org.apache.commons.csv.CSVFormat: column name is required", gotMessage); @@ -84,7 +88,8 @@ public void givenCSVFormatWithHeaderContainingNull_throwsException() { public void givenCSVFormatThatAllowsMissingColumnNames_throwsException() { CSVFormat format = csvFormatWithHeader().withAllowMissingColumnNames(true); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals( "Illegal class org.apache.commons.csv.CSVFormat: cannot allow missing column names", @@ -95,7 +100,8 @@ public void givenCSVFormatThatAllowsMissingColumnNames_throwsException() { public void givenCSVFormatThatIgnoresHeaderCase_throwsException() { CSVFormat format = csvFormatWithHeader().withIgnoreHeaderCase(true); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals( "Illegal class org.apache.commons.csv.CSVFormat: cannot ignore header case", gotMessage); @@ -105,14 +111,48 @@ public void givenCSVFormatThatIgnoresHeaderCase_throwsException() { public void givenCSVFormatThatAllowsDuplicateHeaderNames_throwsException() { CSVFormat format = csvFormatWithHeader().withAllowDuplicateHeaderNames(true); String gotMessage = - assertThrows(IllegalArgumentException.class, () -> CsvIOParseHelpers.validate(format)) + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) .getMessage(); assertEquals( "Illegal class org.apache.commons.csv.CSVFormat: cannot allow duplicate header names", gotMessage); } - /** End of tests for {@link CsvIOParseHelpers#validate(CSVFormat)}. */ + /** End of tests for {@link CsvIOParseHelpers#validateCsvFormat(CSVFormat)}. */ + ////////////////////////////////////////////////////////////////////////////////////////////// + + /** Tests for {@link CsvIOParseHelpers#validateCsvFormatWithSchema(CSVFormat, Schema)}. */ + @Test + public void givenNullableSchemaFieldNotPresentInHeader_validates() { + CSVFormat format = csvFormat().withHeader("foo", "bar"); + Schema schema = + Schema.of( + Schema.Field.of("foo", Schema.FieldType.STRING), + Schema.Field.of("bar", Schema.FieldType.STRING), + Schema.Field.nullable("baz", Schema.FieldType.STRING)); + CsvIOParseHelpers.validateCsvFormatWithSchema(format, schema); + } + + @Test + public void givenRequiredSchemaFieldNotPresentInHeader_throwsException() { + CSVFormat format = csvFormat().withHeader("foo", "bar"); + Schema schema = + Schema.of( + Schema.Field.of("foo", Schema.FieldType.STRING), + Schema.Field.of("bar", Schema.FieldType.STRING), + Schema.Field.of("baz", Schema.FieldType.STRING)); + String gotMessage = + assertThrows( + IllegalArgumentException.class, + () -> CsvIOParseHelpers.validateCsvFormatWithSchema(format, schema)) + .getMessage(); + assertEquals( + "Illegal class org.apache.commons.csv.CSVFormat: required org.apache.beam.sdk.schemas.Schema field 'baz' not found in header", + gotMessage); + } + + /** End of tests for {@link CsvIOParseHelpers#validateCsvFormatWithSchema(CSVFormat, Schema)}. */ ////////////////////////////////////////////////////////////////////////////////////////////// /** Tests for {@link CsvIOParseHelpers#parseCell(String, Schema.Field)}. */