Consistent naming with AvroIO
RustedBones committed Dec 17, 2024
1 parent afab5e0 commit 1c52b2a
Showing 8 changed files with 44 additions and 44 deletions.
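In short, the commit renames BigQueryIO's Storage-API read and write entry points to mirror AvroIO:

    readAvro()                       -> readGenericRecords()
    readAvro(schema)                 -> readGenericRecords(schema)
    readAvro(recordClass)            -> readSpecificRecords(recordClass)
    readAvro(schema, readerFactory)  -> readRecords(schema, readerFactory)
    readAvro(parseFn)                -> parseGenericRecords(parseFn)
    readArrow()                      -> readArrowRows()
    readArrow(schema)                -> readArrowRows(schema)
    readArrow(parseFn)               -> parseArrowRows(parseFn)

writeGenericRecords() is reimplemented on top of withAvroWriter(...), a new writeSpecificRecords(Class) entry point is added, and call sites in the examples, snippets, and tests are updated accordingly.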
@@ -226,7 +226,7 @@ public static void modelBigQueryIO(
// [START BigQueryReadFunction]
PCollection<Double> maxTemperatures =
p.apply(
- BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
+ BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
.from(tableSpec)
.withCoder(DoubleCoder.of()));
// [END BigQueryReadFunction]
@@ -236,7 +236,7 @@ public static void modelBigQueryIO(
// [START BigQueryReadQuery]
PCollection<Double> maxTemperatures =
p.apply(
- BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
+ BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
.fromQuery(
"SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
.withCoder(DoubleCoder.of()));
@@ -247,7 +247,7 @@ public static void modelBigQueryIO(
// [START BigQueryReadQueryStdSQL]
PCollection<Double> maxTemperatures =
p.apply(
- BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
+ BigQueryIO.parseGenericRecords(record -> (Double) record.get("max_temperature"))
.fromQuery(
"SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
.usingStandardSql()
@@ -387,7 +387,7 @@ public WeatherData(long year, long month, long day, double maxTemp) {

PCollection<WeatherData> weatherData =
p.apply(
- BigQueryIO.readAvro(
+ BigQueryIO.parseGenericRecords(
record ->
new WeatherData(
(Long) record.get("year"),
@@ -121,7 +121,7 @@ object Snippets {
val tableSpec = "apache-beam-testing.samples.weather_stations"
// [START BigQueryReadFunction]
val maxTemperatures = pipeline.apply(
- BigQueryIO.readAvro { it["max_temperature"] as Double? }
+ BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
.from(tableSpec)
.withCoder(DoubleCoder.of()))
// [END BigQueryReadFunction]
@@ -130,7 +130,7 @@ object Snippets {
run {
// [START BigQueryReadQuery]
val maxTemperatures = pipeline.apply(
- BigQueryIO.readAvro { it["max_temperature"] as Double? }
+ BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
.fromQuery(
"SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
.withCoder(DoubleCoder.of()))
@@ -140,7 +140,7 @@ object Snippets {
run {
// [START BigQueryReadQueryStdSQL]
val maxTemperatures = pipeline.apply(
- BigQueryIO.readAvro { it["max_temperature"] as Double? }
+ BigQueryIO.parseGenericRecords { it["max_temperature"] as Double? }
.fromQuery(
"SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
.usingStandardSql()
@@ -249,7 +249,7 @@ object Snippets {
)
*/
val weatherData = pipeline.apply(
- BigQueryIO.readAvro {
+ BigQueryIO.parseGenericRecords {
WeatherData(
it.get("year") as Long,
it.get("month") as Long,
@@ -189,7 +189,7 @@ record ->
Pipeline p = Pipeline.create(options);
PCollection<Long> result =
p.apply(
- BigQueryIO.readAvro(parseQueryResultToByteArray)
+ BigQueryIO.parseGenericRecords(parseQueryResultToByteArray)
.fromQuery(query)
.usingStandardSql()
.withMethod(Method.DIRECT_READ)
@@ -614,22 +614,15 @@ public class BigQueryIO {
static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
SerializableFunctions.identity();

- /**
-  * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
-  * PCollection<GenericRecord>} directly to BigQueryIO.Write.
-  */
- static final SerializableFunction<AvroWriteRequest<GenericRecord>, GenericRecord>
-     GENERIC_RECORD_IDENTITY_FORMATTER = AvroWriteRequest::getElement;

static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();

private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema;

/**
- * @deprecated Use {@link #readAvro(SerializableFunction)}, {@link
- * #readArrow(SerializableFunction)} or {@link #readTableRows} instead. {@link
+ * @deprecated Use {@link #parseGenericRecords(SerializableFunction)}, {@link
+ * #parseArrowRows(SerializableFunction)} or {@link #readTableRows} instead. {@link
* #readTableRows()} does exactly the same as {@link #read}.
*/
@Deprecated
@@ -643,9 +636,9 @@ public static Read read() {
* {@link TableRow}.
*
* <p>This method is more convenient to use in some cases, but usually has significantly lower
- * performance than using {@link #readAvro(SerializableFunction)} or {@link
- * #readArrow(SerializableFunction)} directly to parse data into a domain-specific type, due to
- * the overhead of converting the rows to {@link TableRow}.
+ * performance than using {@link #parseGenericRecords(SerializableFunction)} or {@link
+ * #parseArrowRows(SerializableFunction)} directly to parse data into a domain-specific type, due
+ * to the overhead of converting the rows to {@link TableRow}.
*/
public static TypedRead<TableRow> readTableRows() {
return readTableRows(DataFormat.AVRO);
@@ -695,9 +688,9 @@ public static TypedRead<TableRow> readTableRowsWithSchema(DataFormat dataFormat)
* {@link Row}.
*
* <p>This method is more convenient to use in some cases, but usually has significantly lower
- * performance than using {@link #readAvro(SerializableFunction)} or {@link
- * #readArrow(SerializableFunction)} directly to parse data into a domain-specific type, due to
- * the overhead of converting the rows to {@link Row}.
+ * performance than using {@link #parseGenericRecords(SerializableFunction)} or {@link
+ * #parseArrowRows(SerializableFunction)} directly to parse data into a domain-specific type, due
+ * to the overhead of converting the rows to {@link Row}.
*/
public static TypedRead<Row> readRows() {
return readRows(DataFormat.AVRO);
@@ -708,9 +701,9 @@ public static TypedRead<Row> readRows() {
*/
public static TypedRead<Row> readRows(DataFormat dataFormat) {
if (dataFormat == DataFormat.AVRO) {
- return readAvro(new RowAvroParser());
+ return parseGenericRecords(new RowAvroParser());
} else if (dataFormat == DataFormat.ARROW) {
- return readArrow();
+ return readArrowRows();
} else {
throw new IllegalArgumentException("Unsupported data format: " + dataFormat);
}
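For example, readRows(DataFormat.ARROW) now dispatches to the renamed readArrowRows(); calling it directly looks like this (a sketch: `p` is a Pipeline and the table spec is illustrative):

    // Equivalent to BigQueryIO.readRows(DataFormat.ARROW) after this commit.
    PCollection<Row> rows =
        p.apply(BigQueryIO.readArrowRows().from("my-project:my_dataset.my_table"));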
@@ -736,7 +729,7 @@ public static TypedRead<Row> readRows(DataFormat dataFormat) {
* }).from("...");
* }</pre>
*
- * @deprecated Use {@link #readAvro(SerializableFunction)} instead.
+ * @deprecated Use {@link #parseGenericRecords(SerializableFunction)} instead.
*/
@Deprecated
public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> parseFn) {
@@ -769,7 +762,7 @@ public static <T> TypedRead<T> readWithDatumReader(
* each row of the table or query result as {@link GenericRecord}. Logical type in Extract jobs
* will be enabled.
*/
- public static TypedRead<GenericRecord> readAvro() {
+ public static TypedRead<GenericRecord> readGenericRecords() {
return readAvroImpl(
null,
true,
@@ -784,7 +777,7 @@ public static TypedRead<GenericRecord> readAvro() {
* each row of the table or query result as {@link GenericRecord} with the desired schema. Logical
* type in Extract jobs will be enabled.
*/
- public static TypedRead<GenericRecord> readAvro(org.apache.avro.Schema schema) {
+ public static TypedRead<GenericRecord> readGenericRecords(org.apache.avro.Schema schema) {
return readAvroImpl(
schema,
true,
@@ -799,7 +792,7 @@ public static TypedRead<GenericRecord> readAvro(org.apache.avro.Schema schema) {
* each row of the table or query result as input avro class. Logical type in Extract jobs will be
* enabled.
*/
- public static <T> TypedRead<T> readAvro(Class<T> recordClass) {
+ public static <T> TypedRead<T> readSpecificRecords(Class<T> recordClass) {
org.apache.avro.Schema schema = ReflectData.get().getSchema(recordClass);
AvroDatumFactory<T> factory;
if (GenericRecord.class.equals(recordClass)) {
@@ -820,7 +813,7 @@ public static <T> TypedRead<T> readAvro(Class<T> recordClass) {
* input class, based on the appropriate {@link org.apache.avro.io.DatumReader} and schema.
* Logical type in Extract jobs will be enabled.
*/
- public static <T> TypedRead<T> readAvro(
+ public static <T> TypedRead<T> readRecords(
org.apache.avro.Schema schema, AvroSource.DatumReaderFactory<T> readerFactory) {
TypeDescriptor<T> td = null;
Coder<T> coder = null;
@@ -836,7 +829,7 @@ public static <T> TypedRead<T> readAvro(
* each row of the table or query result, parsed from the BigQuery AVRO format using the specified
* function. Logical type in Extract jobs will be enabled.
*/
- public static <T> TypedRead<T> readAvro(
+ public static <T> TypedRead<T> parseGenericRecords(
SerializableFunction<GenericRecord, T> avroFormatFunction) {
return readAvroImpl(
null,
@@ -883,15 +876,15 @@ private static <AvroT, T> TypedRead<T> readAvroImpl(
* Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
* each row of the table or query result as {@link Row}.
*/
- public static TypedRead<Row> readArrow() {
+ public static TypedRead<Row> readArrowRows() {
return readArrowImpl(null, SchemaAndRow::getElement, null, TypeDescriptor.of(Row.class));
}

/**
* Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
* each row of the table or query result as {@link Row} with the desired schema.
*/
- public static TypedRead<Row> readArrow(Schema schema) {
+ public static TypedRead<Row> readArrowRows(Schema schema) {
return readArrowImpl(
schema, SchemaAndRow::getElement, RowCoder.of(schema), TypeDescriptor.of(Row.class));
}
@@ -901,7 +894,7 @@ public static TypedRead<Row> readArrow(Schema schema) {
* each row of the table or query result, parsed from the BigQuery ARROW format using the
* specified function.
*/
- public static <T> TypedRead<T> readArrow(SerializableFunction<Row, T> arrowFormatFunction) {
+ public static <T> TypedRead<T> parseArrowRows(SerializableFunction<Row, T> arrowFormatFunction) {
return readArrowImpl(
null,
input -> arrowFormatFunction.apply(input.getElement()),
@@ -2210,7 +2203,7 @@ public TypedRead<T> withMethod(TypedRead.Method method) {
/**
* See {@link DataFormat}.
*
- * @deprecated Use {@link #readAvro()} or {@link #readArrow()} instead
+ * @deprecated Use {@link #readGenericRecords()} or {@link #readArrowRows()} instead
*/
@Deprecated
public TypedRead<T> withFormat(DataFormat format) {
@@ -2438,8 +2431,15 @@ public static Write<RowMutation> applyRowMutations() {
* GenericRecords} to a BigQuery table.
*/
public static Write<GenericRecord> writeGenericRecords() {
- return BigQueryIO.<GenericRecord>write()
-     .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
+ return BigQueryIO.<GenericRecord>write().withAvroWriter(GENERIC_DATUM_WRITER_FACTORY);
}

+ /**
+  * A {@link PTransform} that writes a {@link PCollection} containing {@link SpecificRecord
+  * SpecificRecord} to a BigQuery table.
+  */
+ public static <T extends SpecificRecord> Write<T> writeSpecificRecords(Class<T> type) {
+   return BigQueryIO.<T>write().withAvroWriter(AvroDatumFactory.specific(type)::apply);
+ }

/**
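A usage sketch for the new write entry point (hypothetical: WeatherRecord stands in for any Avro-generated class implementing SpecificRecord, and the table spec is illustrative):

    // Writes Avro SpecificRecords to BigQuery via the Avro writer path.
    static WriteResult writeWeather(PCollection<WeatherRecord> records) {
      return records.apply(
          BigQueryIO.writeSpecificRecords(WeatherRecord.class)
              .to("my-project:my_dataset.weather"));
    }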
@@ -138,7 +138,7 @@ public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception {
PCollection<Long> count =
p.apply(
"Read",
- BigQueryIO.readAvro(FailingTableRowParser.INSTANCE)
+ BigQueryIO.parseGenericRecords(FailingTableRowParser.INSTANCE)
.fromQuery("SELECT * FROM `" + options.getInputTable() + "`")
.usingStandardSql()
.withMethod(Method.DIRECT_READ)
@@ -830,7 +830,7 @@ public TypedRead<KV<String, Long>> configureTypedRead(
when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));

- return BigQueryIO.readAvro(parseFn)
+ return BigQueryIO.parseGenericRecords(parseFn)
.fromQuery(encodedQuery)
.withMethod(Method.DIRECT_READ)
.withTestServices(
@@ -147,7 +147,7 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception {
PCollection<Long> count =
p.apply(
"Read",
- BigQueryIO.readAvro(FailingTableRowParser.INSTANCE)
+ BigQueryIO.parseGenericRecords(FailingTableRowParser.INSTANCE)
.from(options.getInputTable())
.withMethod(Method.DIRECT_READ)
.withErrorHandler(errorHandler))
@@ -237,7 +237,7 @@ public void testBigQueryStorageReadProjectionPushdown() throws Exception {
PCollection<Long> count =
p.apply(
"Read",
- BigQueryIO.readAvro(
+ BigQueryIO.parseGenericRecords(
record ->
BigQueryUtils.toBeamRow(
record, multiFieldSchema, ConversionOptions.builder().build()))
@@ -1508,7 +1508,7 @@ public void testReadFromBigQueryIO() throws Exception {

PCollection<KV<String, Long>> output =
p.apply(
- BigQueryIO.readAvro(new ParseAvroKeyValue())
+ BigQueryIO.parseGenericRecords(new ParseAvroKeyValue())
.from("foo.com:project:dataset.table")
.withMethod(Method.DIRECT_READ)
.withTestServices(
@@ -1716,7 +1716,7 @@ public void testReadFromBigQueryIOArrow() throws Exception {

PCollection<KV<String, Long>> output =
p.apply(
- BigQueryIO.readArrow(new ParseArrowKeyValue())
+ BigQueryIO.parseArrowRows(new ParseArrowKeyValue())
.from("foo.com:project:dataset.table")
.withMethod(Method.DIRECT_READ)
.withTestServices(