From d59ef194908387c7a2b87c7ceaf4c79faafcd2ed Mon Sep 17 00:00:00 2001 From: Maciej Szwaja Date: Mon, 1 Apr 2024 16:08:04 +0200 Subject: [PATCH] fix error in BigQueryUtils when retrieving non existent avro field (#30720) * dump the record schema in BigQueryUtils exception message for debugging purposes * include the right test * check if the field exists in the record before fetching it * add test, restore previous exception message * trigger V2 PostCommit as well --- .../beam_PostCommit_Java_DataflowV1.json | 2 +- .../beam_PostCommit_Java_DataflowV2.json | 3 +++ .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 8 ++++++-- .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 16 ++++++++++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Java_DataflowV2.json diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index c4edaa85a89d..a03c067d2c4e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -1,3 +1,3 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json new file mode 100644 index 000000000000..a03c067d2c4e --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -0,0 +1,3 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run" +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index e3ace73ee960..c0fa80ccfc61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -497,7 +497,10 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio .map( field -> { try { - return convertAvroFormat(field.getType(), record.get(field.getName()), options); + org.apache.avro.Schema.Field avroField = + record.getSchema().getField(field.getName()); + Object value = avroField != null ? record.get(avroField.pos()) : null; + return convertAvroFormat(field.getType(), value, options); } catch (Exception cause) { throw new IllegalArgumentException( "Error converting field " + field + ": " + cause.getMessage(), cause); @@ -709,7 +712,8 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso + jsonBQValue.getClass() + "' to '" + fieldType - + "' because the BigQuery type is a List, while the output type is not a collection."); + + "' because the BigQuery type is a List, while the output type is not a" + + " collection."); } boolean innerTypeIsMap = fieldType.getCollectionElementType().getTypeName().isMapType(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index deeee8db71b6..8a3ad16e190d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -1004,6 +1004,22 @@ public void testToBeamRow_avro_array_array_row() { assertEquals(expected, beamRow); } + @Test + public void testToBeamRow_projection() { + long testId = 123L; + // recordSchema is a projection of FLAT_TYPE schema + org.apache.avro.Schema recordSchema = + org.apache.avro.SchemaBuilder.record("__root__").fields().optionalLong("id").endRecord(); + GenericData.Record record = new GenericData.Record(recordSchema); + record.put("id", testId); + + Row expected = Row.withSchema(FLAT_TYPE).withFieldValue("id", testId).build(); + Row actual = + BigQueryUtils.toBeamRow( + record, FLAT_TYPE, BigQueryUtils.ConversionOptions.builder().build()); + assertEquals(expected, actual); + } + @Test public void testToTableSpec() { TableReference withProject =