From 1c52b2a31389b7dd535a1141362ad3397ba1f3c3 Mon Sep 17 00:00:00 2001
From: Michel Davit
Date: Mon, 16 Dec 2024 18:22:48 +0100
Subject: [PATCH] Consistent naming with AvroIO

---
 .../beam/examples/snippets/Snippets.java      |  8 +--
 .../beam/examples/kotlin/snippets/Snippets.kt |  8 +--
 .../BigQueryHllSketchCompatibilityIT.java     |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 58 +++++++++----------
 .../bigquery/BigQueryIOStorageQueryIT.java    |  2 +-
 .../bigquery/BigQueryIOStorageQueryTest.java  |  2 +-
 .../gcp/bigquery/BigQueryIOStorageReadIT.java |  4 +-
 .../bigquery/BigQueryIOStorageReadTest.java   |  4 +-
 8 files changed, 44 insertions(+), 44 deletions(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index afa736202cb9..49f6ba89b350 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -226,7 +226,7 @@ public static void modelBigQueryIO(
       // [START BigQueryReadFunction]
       PCollection<Double> maxTemperatures =
           p.apply(
-              BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
+              BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                   .from(tableSpec)
                   .withCoder(DoubleCoder.of()));
       // [END BigQueryReadFunction]
@@ -236,7 +236,7 @@ public static void modelBigQueryIO(
       // [START BigQueryReadQuery]
       PCollection<Double> maxTemperatures =
           p.apply(
-              BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
+              BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                   .fromQuery(
                       "SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
                   .withCoder(DoubleCoder.of()));
@@ -247,7 +247,7 @@ public static void modelBigQueryIO(
       // [START BigQueryReadQueryStdSQL]
       PCollection<Double> maxTemperatures =
           p.apply(
-              BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
+              BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                   .fromQuery(
                       "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                   .usingStandardSql()
@@ -387,7 +387,7 @@ public WeatherData(long year, long month, long day, double maxTemp) {
 
       PCollection<WeatherData> weatherData =
           p.apply(
-              BigQueryIO.readAvro(
+              BigQueryIO.parseGenericRecords(
                   record ->
                       new WeatherData(
                           (Long) record.get("year"),
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
index e84f70027d5a..a216f4e09748 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
@@ -121,7 +121,7 @@ object Snippets {
         val tableSpec = "apache-beam-testing.samples.weather_stations"
         // [START BigQueryReadFunction]
         val maxTemperatures = pipeline.apply(
-            BigQueryIO.readAvro { it["max_temperature"] as Double? }
+            BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
                 .from(tableSpec)
                 .withCoder(DoubleCoder.of()))
         // [END BigQueryReadFunction]
@@ -130,7 +130,7 @@ object Snippets {
         run {
             // [START BigQueryReadQuery]
             val maxTemperatures = pipeline.apply(
-                BigQueryIO.readAvro { it["max_temperature"] as Double? }
+                BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
                     .fromQuery(
                         "SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
                     .withCoder(DoubleCoder.of()))
@@ -140,7 +140,7 @@ object Snippets {
         run {
             // [START BigQueryReadQueryStdSQL]
             val maxTemperatures = pipeline.apply(
-                BigQueryIO.readAvro { it["max_temperature"] as Double? }
+                BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
                     .fromQuery(
                         "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                     .usingStandardSql()
@@ -249,7 +249,7 @@ object Snippets {
         )
         */
         val weatherData = pipeline.apply(
-            BigQueryIO.readAvro {
+            BigQueryIO.parseGenericRecords {
                 WeatherData(
                     it.get("year") as Long,
                     it.get("month") as Long,
diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
index 5b55ff5edb6a..4c46c2f41543 100644
--- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
+++ b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
@@ -189,7 +189,7 @@ record ->
     Pipeline p = Pipeline.create(options);
     PCollection<byte[]> result =
         p.apply(
-            BigQueryIO.readAvro(parseQueryResultToByteArray)
+            BigQueryIO.parseGenericRecords(parseQueryResultToByteArray)
                 .fromQuery(query)
                 .usingStandardSql()
                 .withMethod(Method.DIRECT_READ)
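For reviewers who want to try the rename locally, here is a minimal, self-contained sketch of a call site after migration. It is not part of the patch: only the entry-point name changes, while the parse function, from()/fromQuery() and coder wiring stay exactly as before. The table reference and pipeline boilerplate are illustrative.

  // Migration sketch, assuming this patch is applied and
  // beam-sdks-java-io-google-cloud-platform is on the classpath.
  // BigQueryIO.readAvro(...) simply becomes BigQueryIO.parseGenericRecords(...).
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.DoubleCoder;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.values.PCollection;

  public class ParseGenericRecordsExample {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      // The lambda receives an Avro GenericRecord, exactly as readAvro did before the rename.
      PCollection<Double> maxTemperatures =
          p.apply(
              BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
                  .from("apache-beam-testing.samples.weather_stations")
                  .withCoder(DoubleCoder.of()));
      p.run().waitUntilFinish();
    }
  }
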
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index c3330368bc3a..2ab58960c589 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -614,13 +614,6 @@ public class BigQueryIO {
   static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
       SerializableFunctions.identity();
 
-  /**
-   * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
-   * PCollection<GenericRecord>} directly to BigQueryIO.Write.
-   */
-  static final SerializableFunction<AvroWriteRequest<GenericRecord>, GenericRecord>
-      GENERIC_RECORD_IDENTITY_FORMATTER = AvroWriteRequest::getElement;
-
   static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
       GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
 
@@ -628,8 +621,8 @@ public class BigQueryIO {
       DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema;
 
   /**
-   * @deprecated Use {@link #readAvro(SerializableFunction)}, {@link
-   *     #readArrow(SerializableFunction)} or {@link #readTableRows} instead. {@link
+   * @deprecated Use {@link #parseGenericRecords(SerializableFunction)}, {@link
+   *     #parseArrowRows(SerializableFunction)} or {@link #readTableRows} instead. {@link
    *     #readTableRows()} does exactly the same as {@link #read}.
    */
   @Deprecated
@@ -643,9 +636,9 @@ public static Read read() {
    *     {@link TableRow}.
    *
    * <p>This method is more convenient to use in some cases, but usually has significantly lower
-   * performance than using {@link #readAvro(SerializableFunction)} or {@link
-   * #readArrow(SerializableFunction)} directly to parse data into a domain-specific type, due to
-   * the overhead of converting the rows to {@link TableRow}.
+   * performance than using {@link #parseGenericRecords(SerializableFunction)} or {@link
+   * #parseArrowRows(SerializableFunction)} directly to parse data into a domain-specific type, due
+   * to the overhead of converting the rows to {@link TableRow}.
    */
   public static TypedRead<TableRow> readTableRows() {
     return readTableRows(DataFormat.AVRO);
@@ -695,9 +688,9 @@ public static TypedRead<TableRow> readTableRowsWithSchema(DataFormat dataFormat)
    *     {@link Row}.
    *
    * <p>This method is more convenient to use in some cases, but usually has significantly lower
-   * performance than using {@link #readAvro(SerializableFunction)} or {@link
-   * #readArrow(SerializableFunction)} directly to parse data into a domain-specific type, due to
-   * the overhead of converting the rows to {@link Row}.
+   * performance than using {@link #parseGenericRecords(SerializableFunction)} or {@link
+   * #parseArrowRows(SerializableFunction)} directly to parse data into a domain-specific type, due
+   * to the overhead of converting the rows to {@link Row}.
    */
   public static TypedRead<Row> readRows() {
     return readRows(DataFormat.AVRO);
@@ -708,9 +701,9 @@ public static TypedRead<Row> readRows() {
    */
   public static TypedRead<Row> readRows(DataFormat dataFormat) {
     if (dataFormat == DataFormat.AVRO) {
-      return readAvro(new RowAvroParser());
+      return parseGenericRecords(new RowAvroParser());
     } else if (dataFormat == DataFormat.ARROW) {
-      return readArrow();
+      return readArrowRows();
     } else {
       throw new IllegalArgumentException("Unsupported data format: " + dataFormat);
     }
@@ -736,7 +729,7 @@ public static TypedRead<Row> readRows(DataFormat dataFormat) {
    *   }).from("...");
    * }</pre>
    *
-   * @deprecated Use {@link #readAvro(SerializableFunction)} instead.
+   * @deprecated Use {@link #parseGenericRecords(SerializableFunction)} instead.
    */
   @Deprecated
   public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> parseFn) {
@@ -769,7 +762,7 @@ public static <T> TypedRead<T> readWithDatumReader(
    * each row of the table or query result as {@link GenericRecord}. Logical type in Extract jobs
    * will be enabled.
    */
-  public static TypedRead<GenericRecord> readAvro() {
+  public static TypedRead<GenericRecord> readGenericRecords() {
     return readAvroImpl(
         null,
         true,
@@ -784,7 +777,7 @@
    * each row of the table or query result as {@link GenericRecord} with the desired schema. Logical
    * type in Extract jobs will be enabled.
    */
-  public static TypedRead<GenericRecord> readAvro(org.apache.avro.Schema schema) {
+  public static TypedRead<GenericRecord> readGenericRecords(org.apache.avro.Schema schema) {
     return readAvroImpl(
         schema,
         true,
@@ -799,7 +792,7 @@
    * each row of the table or query result as input avro class. Logical type in Extract jobs will be
    * enabled.
    */
-  public static <T> TypedRead<T> readAvro(Class<T> recordClass) {
+  public static <T> TypedRead<T> readSpecificRecords(Class<T> recordClass) {
     org.apache.avro.Schema schema = ReflectData.get().getSchema(recordClass);
     AvroDatumFactory<T> factory;
     if (GenericRecord.class.equals(recordClass)) {
@@ -820,7 +813,7 @@
    * input class, based on the appropriate {@link org.apache.avro.io.DatumReader} and schema.
    * Logical type in Extract jobs will be enabled.
    */
-  public static <T> TypedRead<T> readAvro(
+  public static <T> TypedRead<T> readRecords(
       org.apache.avro.Schema schema, AvroSource.DatumReaderFactory<T> readerFactory) {
     TypeDescriptor<T> td = null;
     Coder<T> coder = null;
@@ -836,7 +829,7 @@
    * each row of the table or query result, parsed from the BigQuery AVRO format using the specified
    * function. Logical type in Extract jobs will be enabled.
    */
-  public static <T> TypedRead<T> readAvro(
+  public static <T> TypedRead<T> parseGenericRecords(
       SerializableFunction<GenericRecord, T> avroFormatFunction) {
     return readAvroImpl(
         null,
@@ -883,7 +876,7 @@ private static <T> TypedRead<T> readAvroImpl(
    * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
    * each row of the table or query result as {@link Row}.
    */
-  public static TypedRead<Row> readArrow() {
+  public static TypedRead<Row> readArrowRows() {
     return readArrowImpl(null, SchemaAndRow::getElement, null, TypeDescriptor.of(Row.class));
   }
 
@@ -891,7 +884,7 @@
    * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
    * each row of the table or query result as {@link Row} with the desired schema.
    */
-  public static TypedRead<Row> readArrow(Schema schema) {
+  public static TypedRead<Row> readArrowRows(Schema schema) {
     return readArrowImpl(
         schema, SchemaAndRow::getElement, RowCoder.of(schema), TypeDescriptor.of(Row.class));
   }
@@ -901,7 +894,7 @@
    * each row of the table or query result, parsed from the BigQuery ARROW format using the
    * specified function.
    */
-  public static <T> TypedRead<T> readArrow(SerializableFunction<Row, T> arrowFormatFunction) {
+  public static <T> TypedRead<T> parseArrowRows(SerializableFunction<Row, T> arrowFormatFunction) {
     return readArrowImpl(
         null,
         input -> arrowFormatFunction.apply(input.getElement()),
@@ -2210,7 +2203,7 @@ public TypedRead<T> withMethod(TypedRead.Method method) {
     /**
      * See {@link DataFormat}.
      *
-     * @deprecated User {@link #readAvro()} or {@link #readArrow()} instead
+     * @deprecated Use {@link #readGenericRecords()} or {@link #readArrowRows()} instead
      */
     @Deprecated
     public TypedRead<T> withFormat(DataFormat format) {
@@ -2438,8 +2431,15 @@ public static Write<RowMutation> applyRowMutations() {
    *     GenericRecords} to a BigQuery table.
    */
   public static Write<GenericRecord> writeGenericRecords() {
-    return BigQueryIO.<GenericRecord>write()
-        .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
+    return BigQueryIO.<GenericRecord>write().withAvroWriter(GENERIC_DATUM_WRITER_FACTORY);
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing {@link SpecificRecord
+   * SpecificRecords} to a BigQuery table.
+   */
+  public static <T extends SpecificRecord> Write<T> writeSpecificRecords(Class<T> type) {
+    return BigQueryIO.<T>write().withAvroWriter(AvroDatumFactory.specific(type)::apply);
   }
 
   /**
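The write-side entry points above follow the same naming scheme, so here is a usage sketch of both, again not part of the patch. MyAvroRecord stands in for any hypothetical Avro-generated class (extending SpecificRecord); table names are illustrative, and schema/create-disposition configuration is omitted because it is unchanged by this PR.

  // Sketch of the renamed and newly added write entry points, assuming this patch is applied.
  import org.apache.avro.generic.GenericRecord;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.values.PCollection;

  class WriteRecordsExample {
    static void writeBoth(
        PCollection<GenericRecord> generics, PCollection<MyAvroRecord> specifics) {
      // Unchanged name, new implementation: now wires the generic DatumWriter factory
      // instead of the removed identity format function.
      generics.apply(
          BigQueryIO.writeGenericRecords().to("my-project:my_dataset.generic_table"));
      // New in this patch: the SpecificRecord counterpart of writeGenericRecords().
      // MyAvroRecord is hypothetical; any generated SpecificRecord class fits.
      specifics.apply(
          BigQueryIO.writeSpecificRecords(MyAvroRecord.class)
              .to("my-project:my_dataset.specific_table"));
    }
  }
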
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java
index 32ca9e61bed0..0f8df034da08 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java
@@ -138,7 +138,7 @@ public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception {
     PCollection<Long> count =
         p.apply(
             "Read",
-            BigQueryIO.readAvro(FailingTableRowParser.INSTANCE)
+            BigQueryIO.parseGenericRecords(FailingTableRowParser.INSTANCE)
                 .fromQuery("SELECT * FROM `" + options.getInputTable() + "`")
                 .usingStandardSql()
                 .withMethod(Method.DIRECT_READ)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 68a17190f125..c78f7c0af4a4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -830,7 +830,7 @@ public TypedRead<KV<String, Long>> configureTypedRead(
       when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
           .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
 
-    return BigQueryIO.readAvro(parseFn)
+    return BigQueryIO.parseGenericRecords(parseFn)
         .fromQuery(encodedQuery)
         .withMethod(Method.DIRECT_READ)
         .withTestServices(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
index bf999582dfd7..cd1a8629b617 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
@@ -147,7 +147,7 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception {
     PCollection<Long> count =
         p.apply(
             "Read",
-            BigQueryIO.readAvro(FailingTableRowParser.INSTANCE)
+            BigQueryIO.parseGenericRecords(FailingTableRowParser.INSTANCE)
                 .from(options.getInputTable())
                 .withMethod(Method.DIRECT_READ)
                 .withErrorHandler(errorHandler))
@@ -237,7 +237,7 @@ public void testBigQueryStorageReadProjectionPushdown() throws Exception {
     PCollection<Long> count =
         p.apply(
             "Read",
-            BigQueryIO.readAvro(
+            BigQueryIO.parseGenericRecords(
                 record ->
                     BigQueryUtils.toBeamRow(
                         record, multiFieldSchema, ConversionOptions.builder().build()))
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 435ef9e0742c..ecaad482d6a9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -1508,7 +1508,7 @@ public void testReadFromBigQueryIO() throws Exception {
     PCollection<KV<String, Long>> output =
         p.apply(
-            BigQueryIO.readAvro(new ParseAvroKeyValue())
+            BigQueryIO.parseGenericRecords(new ParseAvroKeyValue())
                 .from("foo.com:project:dataset.table")
                 .withMethod(Method.DIRECT_READ)
                 .withTestServices(
@@ -1716,7 +1716,7 @@ public void testReadFromBigQueryIOArrow() throws Exception {
     PCollection<KV<String, Long>> output =
         p.apply(
-            BigQueryIO.readArrow(new ParseArrowKeyValue())
+            BigQueryIO.parseArrowRows(new ParseArrowKeyValue())
                 .from("foo.com:project:dataset.table")
                 .withMethod(Method.DIRECT_READ)
                 .withTestServices(