Skip to content

Commit

Permalink
Reduce breaking changes
Browse files Browse the repository at this point in the history
Reduce breaking changes by configuring IO with simple objects
  • Loading branch information
RustedBones committed Dec 10, 2024
1 parent 0032817 commit adc31e2
Show file tree
Hide file tree
Showing 14 changed files with 420 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
Expand All @@ -60,7 +59,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.range.OffsetRange;
Expand Down Expand Up @@ -228,8 +226,7 @@ public static void modelBigQueryIO(
// [START BigQueryReadFunction]
PCollection<Double> maxTemperatures =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
.from(tableSpec)
.withCoder(DoubleCoder.of()));
// [END BigQueryReadFunction]
Expand All @@ -239,8 +236,7 @@ public static void modelBigQueryIO(
// [START BigQueryReadQuery]
PCollection<Double> maxTemperatures =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
.fromQuery(
"SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
.withCoder(DoubleCoder.of()));
Expand All @@ -251,8 +247,7 @@ public static void modelBigQueryIO(
// [START BigQueryReadQueryStdSQL]
PCollection<Double> maxTemperatures =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
BigQueryIO.readAvro(record -> (Double) record.get("max_temperature"))
.fromQuery(
"SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
.usingStandardSql()
Expand Down Expand Up @@ -392,15 +387,13 @@ public WeatherData(long year, long month, long day, double maxTemp) {

PCollection<WeatherData> weatherData =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> {
GenericRecord record = elem.getRecord();
return new WeatherData(
(Long) record.get("year"),
(Long) record.get("month"),
(Long) record.get("day"),
(Double) record.get("max_temperature"));
})
BigQueryIO.readAvro(
record ->
new WeatherData(
(Long) record.get("year"),
(Long) record.get("month"),
(Long) record.get("day"),
(Double) record.get("max_temperature")))
.fromQuery(
"SELECT year, month, day, max_temperature "
+ "FROM [apache-beam-testing.samples.weather_stations] "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object Snippets {
val tableSpec = "apache-beam-testing.samples.weather_stations"
// [START BigQueryReadFunction]
val maxTemperatures = pipeline.apply(
BigQueryIO.read { it.record["max_temperature"] as Double? }
BigQueryIO.readAvro { it["max_temperature"] as Double? }
.from(tableSpec)
.withCoder(DoubleCoder.of()))
// [END BigQueryReadFunction]
Expand All @@ -130,7 +130,7 @@ object Snippets {
run {
// [START BigQueryReadQuery]
val maxTemperatures = pipeline.apply(
BigQueryIO.read { it.record["max_temperature"] as Double? }
BigQueryIO.readAvro { it["max_temperature"] as Double? }
.fromQuery(
"SELECT max_temperature FROM [apache-beam-testing.samples.weather_stations]")
.withCoder(DoubleCoder.of()))
Expand All @@ -140,7 +140,7 @@ object Snippets {
run {
// [START BigQueryReadQueryStdSQL]
val maxTemperatures = pipeline.apply(
BigQueryIO.read { it.record["max_temperature"] as Double? }
BigQueryIO.readAvro { it["max_temperature"] as Double? }
.fromQuery(
"SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
.usingStandardSql()
Expand Down Expand Up @@ -249,13 +249,12 @@ object Snippets {
)
*/
val weatherData = pipeline.apply(
BigQueryIO.read {
val record = it.record
BigQueryIO.readAvro {
WeatherData(
record.get("year") as Long,
record.get("month") as Long,
record.get("day") as Long,
record.get("max_temperature") as Double)
it.get("year") as Long,
it.get("month") as Long,
it.get("day") as Long,
it.get("max_temperature") as Double)
}
.fromQuery("""
SELECT year, month, day, max_temperature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private String generateRowRestrictions(Schema schema, List<RexNode> supported) {

private TypedRead<Row> getBigQueryTypedRead(Schema schema) {
return BigQueryIO.read(
record -> BigQueryUtils.toBeamRow(record.getRecord(), schema, conversionOptions))
record -> BigQueryUtils.toBeamRow(record.getElement(), schema, conversionOptions))
.withMethod(method)
.from(bqLocation)
.withCoder(SchemaCoder.of(schema));
Expand Down
Loading

0 comments on commit adc31e2

Please sign in to comment.