From 626c864a119560284f3c60d437be02b81bab5bcc Mon Sep 17 00:00:00 2001 From: Maciej Szwaja Date: Fri, 22 Mar 2024 19:23:51 +0000 Subject: [PATCH 1/5] dump the record schema in BigQueryUtils exception message for debugging purposes --- .github/trigger_files/beam_PostCommit_Java_DataflowV1.json | 2 +- runners/google-cloud-dataflow-java/build.gradle | 4 ++-- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index c4edaa85a89d..a03c067d2c4e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -1,3 +1,3 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run" } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 09ab59b3f4bc..1212d7b0114c 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -560,8 +560,8 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG "--firestoreDb=${firestoreDb}", ]) - include '**/*IT.class' - exclude '**/BigQueryIOReadIT.class' +// include '**/*IT.class' + include '**/BigQueryIOReadIT.class' exclude '**/BigQueryIOStorageReadTableRowIT.class' exclude '**/PubsubReadIT.class' exclude '**/FhirIOReadIT.class' diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index e3ace73ee960..a0d5baf31884 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -500,7 +500,7 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio return convertAvroFormat(field.getType(), record.get(field.getName()), options); } catch (Exception cause) { throw new IllegalArgumentException( - "Error converting field " + field + ": " + cause.getMessage(), cause); + "Error converting field " + field + ", record schema " + record.getSchema() + ": " + cause.getMessage(), cause); } }) .collect(toList()); From 040ba1c28d1a4b53a3c413a96f836210c52392ce Mon Sep 17 00:00:00 2001 From: Maciej Szwaja Date: Fri, 22 Mar 2024 20:14:15 +0000 Subject: [PATCH 2/5] include the right test --- runners/google-cloud-dataflow-java/build.gradle | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 1212d7b0114c..20b893ab9113 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -561,7 +561,8 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG ]) // include '**/*IT.class' - include '**/BigQueryIOReadIT.class' + include '**/BigQueryIOStorageReadIT.class' + exclude '**/BigQueryIOReadIT.class' exclude '**/BigQueryIOStorageReadTableRowIT.class' exclude '**/PubsubReadIT.class' exclude '**/FhirIOReadIT.class' From 9c4052fa195f1928ea770e239a692f9b62ac89bf Mon Sep 17 00:00:00 2001 From: Maciej Szwaja Date: Fri, 22 Mar 2024 21:17:50 +0000 Subject: [PATCH 3/5] check if the field exists in the record before fetching it --- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index a0d5baf31884..4378c7c48a79 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -497,10 +497,18 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio .map( field -> { try { - return convertAvroFormat(field.getType(), record.get(field.getName()), options); + Object value = + record.hasField(field.getName()) ? record.get(field.getName()) : null; + return convertAvroFormat(field.getType(), value, options); } catch (Exception cause) { throw new IllegalArgumentException( - "Error converting field " + field + ", record schema " + record.getSchema() + ": " + cause.getMessage(), cause); + "Error converting field " + + field + + ", record schema " + + record.getSchema() + + ": " + + cause.getMessage(), + cause); } }) .collect(toList()); @@ -709,7 +717,8 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso + jsonBQValue.getClass() + "' to '" + fieldType - + "' because the BigQuery type is a List, while the output type is not a collection."); + + "' because the BigQuery type is a List, while the output type is not a" + + " collection."); } boolean innerTypeIsMap = fieldType.getCollectionElementType().getTypeName().isMapType(); From 3cd2318d2c9cfa2b08c4ed967e3f7a07c125b1d7 Mon Sep 17 00:00:00 2001 From: Maciej Szwaja Date: Mon, 25 Mar 2024 12:43:08 +0000 Subject: [PATCH 4/5] add test, restore previous exception message --- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 13 ++++--------- .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 4378c7c48a79..c0fa80ccfc61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -497,18 +497,13 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio .map( field -> { try { - Object value = - record.hasField(field.getName()) ? record.get(field.getName()) : null; + org.apache.avro.Schema.Field avroField = + record.getSchema().getField(field.getName()); + Object value = avroField != null ? record.get(avroField.pos()) : null; return convertAvroFormat(field.getType(), value, options); } catch (Exception cause) { throw new IllegalArgumentException( - "Error converting field " - + field - + ", record schema " - + record.getSchema() - + ": " - + cause.getMessage(), - cause); + "Error converting field " + field + ": " + cause.getMessage(), cause); } }) .collect(toList()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index deeee8db71b6..8a3ad16e190d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -1004,6 +1004,22 @@ public void testToBeamRow_avro_array_array_row() { assertEquals(expected, beamRow); } + @Test + public void testToBeamRow_projection() { + long testId = 123L; + // recordSchema is a projection of FLAT_TYPE schema + org.apache.avro.Schema recordSchema = + org.apache.avro.SchemaBuilder.record("__root__").fields().optionalLong("id").endRecord(); + GenericData.Record record = new GenericData.Record(recordSchema); + record.put("id", testId); + + Row expected = Row.withSchema(FLAT_TYPE).withFieldValue("id", testId).build(); + Row actual = + BigQueryUtils.toBeamRow( + record, FLAT_TYPE, BigQueryUtils.ConversionOptions.builder().build()); + assertEquals(expected, actual); + } + @Test public void testToTableSpec() { TableReference withProject = From c8a9d8b5cd06a4d1196626bd7651d14c75ab899b Mon Sep 17 00:00:00 2001 From: Maciej Szwaja Date: Mon, 25 Mar 2024 14:07:06 +0000 Subject: [PATCH 5/5] trigger V2 PostCommit as well --- .github/trigger_files/beam_PostCommit_Java_DataflowV2.json | 3 +++ runners/google-cloud-dataflow-java/build.gradle | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Java_DataflowV2.json diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json new file mode 100644 index 000000000000..a03c067d2c4e --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -0,0 +1,3 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run" +} diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 20b893ab9113..09ab59b3f4bc 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -560,8 +560,7 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG "--firestoreDb=${firestoreDb}", ]) -// include '**/*IT.class' - include '**/BigQueryIOStorageReadIT.class' + include '**/*IT.class' exclude '**/BigQueryIOReadIT.class' exclude '**/BigQueryIOStorageReadTableRowIT.class' exclude '**/PubsubReadIT.class'