Improve BQ typed support
RustedBones committed Nov 28, 2024
1 parent a1fce09 commit 98e3faf
Showing 12 changed files with 83 additions and 149 deletions.
build.sbt: 4 additions & 0 deletions
@@ -428,6 +428,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
   // added BQ Json object
   ProblemFilters.exclude[MissingTypesProblem](
     "com.spotify.scio.bigquery.types.package$Json$"
   ),
+  // dropped custom BigQueryAvroUtilsWrapper
+  ProblemFilters.exclude[MissingClassProblem](
+    "org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtilsWrapper"
+  )
 )

@@ -48,7 +48,8 @@ object TypedBigQueryIT {
     timestamp: Instant,
     date: LocalDate,
     time: LocalTime,
-    datetime: LocalDateTime,
+    // BQ DATETIME is problematic with avro: export as 'string(datetime)', load as '(long)local-timestamp-micros'
+    // datetime: LocalDateTime,
     geography: Geography,
     json: Json,
     bigNumeric: BigNumeric
@@ -116,20 +117,32 @@ object TypedBigQueryIT {
 class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
   import TypedBigQueryIT._
 
+  private val bq = BigQuery.defaultInstance()
+
   override protected def afterAll(): Unit = {
-    val bq = BigQuery.defaultInstance()
     // best effort cleanup
     Try(bq.tables.delete(typedTable.ref))
     Try(bq.tables.delete(tableRowTable.ref))
     Try(bq.tables.delete(avroTable.ref))
   }
 
+  def waitForTable(table: Table.Spec): Unit = {
+    var retries = 0
+    while (!bq.tables.exists(table.ref) && retries < 3) {
+      Thread.sleep(500)
+      retries += 1
+    }
+    if (retries >= 3) throw new RuntimeException(s"Table $table not found")
+  }
+
   "TypedBigQuery" should "handle records as TableRow" in {
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
     }.waitUntilFinish()
 
+    waitForTable(typedTable)
+
     runWithRealContext(options) { sc =>
       val data = sc.typedBigQuery[Record](typedTable)
       data should containInAnyOrder(records)
@@ -147,15 +160,17 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
       )
     }.waitUntilFinish()
 
+    waitForTable(tableRowTable)
+
     runWithRealContext(options) { sc =>
       val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow)
       data should containInAnyOrder(records)
     }
   }
 
-  // TODO fix if in beam 2.61
-  ignore should "handle records as avro format" in {
+  it should "handle records as avro format" in {
+    implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
 
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .map(Record.toAvro)
@@ -166,6 +181,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
       )
     }.waitUntilFinish()
 
+    waitForTable(avroTable)
+
     runWithRealContext(options) { sc =>
       val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
       data should containInAnyOrder(records)

This file was deleted: the custom org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtilsWrapper, per the MiMa exclusion added in build.sbt above.

@@ -385,7 +385,7 @@ object BigQueryTypedTable {
       beam.BigQueryIO.readTableRows(),
       beam.BigQueryIO.writeTableRows(),
       table,
-      BigQueryUtils.convertGenericRecordToTableRow(_, _)
+      (r, _) => BigQueryUtils.convertGenericRecordToTableRow(r)
     )(coders.tableRowCoder)
 
   private[this] def genericRecord(
@@ -423,8 +423,8 @@ object BigQueryTypedTable {
     val writer = beam.BigQueryIO
       .write[T]()
       .withFormatFunction(Functions.serializableFn(wFn))
-    val fn: (GenericRecord, TableSchema) => T = (gr, ts) =>
-      tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr, ts))
+    val fn: (GenericRecord, TableSchema) => T = (gr, _) =>
+      tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr))
 
     BigQueryTypedTable(reader, writer, table, fn)
   }
@@ -437,13 +437,15 @@ object BigQueryTypedTable {
   ): BigQueryTypedTable[T] = {
     val rFn = ClosureCleaner.clean(readerFn)
     val wFn = ClosureCleaner.clean(writerFn)
-    val reader = beam.BigQueryIO.read(rFn(_))
+    val reader = beam.BigQueryIO
+      .read(rFn(_))
+      .useAvroLogicalTypes()
     val writer = beam.BigQueryIO
       .write[T]()
       .useAvroLogicalTypes()
       .withAvroFormatFunction(input => wFn(input.getElement()))
       .withAvroSchemaFactory { ts =>
-        BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", ts.getFields())
+        BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true)
       }
 
     BigQueryTypedTable(reader, writer, table, fn)
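Note: enabling useAvroLogicalTypes() on both the read and write sides makes BigQuery represent date/time and decimal columns as Avro logical types instead of strings, which is what allows the custom BigQueryAvroUtilsWrapper to be replaced by the stock BigQueryUtils.toGenericAvroSchema. A rough sketch of the shapes involved, using the standard Avro API (illustrative only, not the exact output of toGenericAvroSchema):

  import org.apache.avro.{LogicalTypes, Schema}

  // A BigQuery DATE column becomes an Avro int carrying the `date`
  // logical type rather than a plain string.
  val dateSchema: Schema =
    LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))

  // A TIMESTAMP column becomes a long with `timestamp-micros`.
  val timestampSchema: Schema =
    LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))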
@@ -176,10 +176,13 @@ object Timestamp {
     case t: Long => new Instant(t / 1000)
     case _ => parse(timestamp.toString)
   }
+
+  def micros(timestamp: Instant): Long = timestamp.getMillis * 1000
 }
 
 /** Utility for BigQuery `DATE` type. */
 object Date {
+  private val EpochDate = new LocalDate(1970, 1, 1)
   // YYYY-[M]M-[D]D
   private[this] val Formatter =
     DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC()
@@ -195,6 +198,8 @@ object Date {
     case d: Int => new LocalDate(0, DateTimeZone.UTC).plusDays(d)
     case _ => parse(date.toString)
   }
+
+  def days(date: LocalDate): Int = Days.daysBetween(EpochDate, date).getDays
 }
 
 /** Utility for BigQuery `TIME` type. */
@@ -219,6 +224,8 @@ object Time {
     case t: Long => new LocalTime(t / 1000, DateTimeZone.UTC)
     case _ => parse(time.toString)
   }
+
+  def micros(time: LocalTime): Long = time.millisOfDay().get().toLong * 1000
 }
 
 /** Utility for BigQuery `DATETIME` type. */
@@ -324,4 +331,7 @@ object Numeric {
     case b: ByteBuffer => DecimalConverter.fromBytes(b, null, DecimalLogicalType)
     case _ => apply(value.toString)
   }
+
+  def bytes(value: BigDecimal): ByteBuffer =
+    DecimalConverter.toBytes(value.bigDecimal, null, DecimalLogicalType)
 }
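The new helpers return the raw Avro logical-type representations directly: timestamp-micros for TIMESTAMP, epoch days for DATE, micros-of-day for TIME, and unscaled decimal bytes for NUMERIC. A quick usage sketch (values are illustrative):

  import com.spotify.scio.bigquery.{Date, Numeric, Time, Timestamp}
  import org.joda.time.{Instant, LocalDate, LocalTime}

  val tsMicros: Long = Timestamp.micros(Instant.parse("2024-11-28T00:00:00Z"))
  val epochDays: Int = Date.days(new LocalDate(2024, 11, 28))
  val timeMicros: Long = Time.micros(new LocalTime(12, 34, 56))
  val unscaled: java.nio.ByteBuffer = Numeric.bytes(BigDecimal("1.23"))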
@@ -29,7 +29,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
-import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryAvroUtilsWrapper, BigQueryOptions}
+import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryOptions, BigQueryUtils}
 import org.apache.beam.sdk.io.gcp.{bigquery => bq}
 import org.apache.beam.sdk.options.{ExecutorOptions, PipelineOptionsFactory}
 import org.joda.time.Instant
@@ -64,11 +64,8 @@ final private[client] class TableOps(client: Client) {
     storageAvroRows(table, TableReadOptions.getDefaultInstance)
 
   def storageRows(table: STable, readOptions: TableReadOptions): Iterator[TableRow] =
-    withBigQueryService { bqServices =>
-      val tb = bqServices.getTable(table.ref, readOptions.getSelectedFieldsList)
-      storageAvroRows(table, readOptions).map { gr =>
-        BigQueryAvroUtilsWrapper.convertGenericRecordToTableRow(gr, tb.getSchema)
-      }
-    }
+    storageAvroRows(table, readOptions).map { gr =>
+      BigQueryUtils.convertGenericRecordToTableRow(gr)
+    }
 
   def storageAvroRows(table: STable, readOptions: TableReadOptions): Iterator[GenericRecord] = {
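The withBigQueryService block and the extra getTable round-trip are no longer needed because the single-argument BigQueryUtils.convertGenericRecordToTableRow shown above takes no TableSchema, presumably deriving field types from the record's own embedded Avro schema. A usage sketch (record stands for any GenericRecord obtained from a storage read):

  import com.google.api.services.bigquery.model.TableRow
  import org.apache.avro.generic.GenericRecord
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils

  def toRow(record: GenericRecord): TableRow =
    BigQueryUtils.convertGenericRecordToTableRow(record)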
@@ -180,17 +180,18 @@ private[types] object ConverterProvider {
       case t if t =:= typeOf[String] => tree
 
       case t if t =:= typeOf[BigDecimal] =>
-        q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
+        q"_root_.com.spotify.scio.bigquery.Numeric.bytes($tree)"
       case t if t =:= typeOf[ByteString] =>
         q"_root_.java.nio.ByteBuffer.wrap($tree.toByteArray)"
       case t if t =:= typeOf[Array[Byte]] =>
         q"_root_.java.nio.ByteBuffer.wrap($tree)"
 
-      case t if t =:= typeOf[Instant] => q"$tree.getMillis * 1000"
+      case t if t =:= typeOf[Instant] =>
+        q"_root_.com.spotify.scio.bigquery.Timestamp.micros($tree)"
       case t if t =:= typeOf[LocalDate] =>
-        q"_root_.com.spotify.scio.bigquery.Date($tree)"
+        q"_root_.com.spotify.scio.bigquery.Date.days($tree)"
       case t if t =:= typeOf[LocalTime] =>
-        q"_root_.com.spotify.scio.bigquery.Time($tree)"
+        q"_root_.com.spotify.scio.bigquery.Time.micros($tree)"
       case t if t =:= typeOf[LocalDateTime] =>
         q"_root_.com.spotify.scio.bigquery.DateTime($tree)"
 
@@ -200,7 +201,7 @@ private[types] object ConverterProvider {
       case t if t =:= typeOf[Json] =>
        q"$tree.wkt"
       case t if t =:= typeOf[BigNumeric] =>
-        q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString"
+        q"_root_.com.spotify.scio.bigquery.types.BigNumeric.bytes($tree)"
 
       // nested records
       case t if isCaseClass(c)(t) =>
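Hand-expanded, this means the generated converter now writes Avro-native values instead of formatted strings. For a hypothetical case class Event(ts: Instant, day: LocalDate, at: LocalTime), the quasiquotes above would produce conversions roughly equivalent to:

  import com.spotify.scio.bigquery.{Date, Time, Timestamp}
  import org.joda.time.{Instant, LocalDate, LocalTime}

  def toAvroFields(ts: Instant, day: LocalDate, at: LocalTime): (Long, Int, Long) =
    (
      Timestamp.micros(ts), // was: ts.getMillis * 1000, inlined
      Date.days(day),       // was: Date(day), the formatted-string form
      Time.micros(at)       // was: Time(at), the formatted-string form
    )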
@@ -37,7 +37,7 @@ private[types] object SchemaProvider {
   def avroSchemaOf[T: TypeTag]: Schema =
     AvroSchemaCache.get(
       typeTag[T].tpe.toString,
-      BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields)
+      BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true)
     )
 
   def schemaOf[T: TypeTag]: TableSchema =
@@ -110,5 +110,8 @@ package object types {
       case b: ByteBuffer => new BigNumeric(DecimalConverter.fromBytes(b, null, DecimalLogicalType))
       case _ => apply(value.toString)
     }
+
+    def bytes(value: BigNumeric): ByteBuffer =
+      DecimalConverter.toBytes(value.wkt.bigDecimal, null, DecimalLogicalType)
   }
 }
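Since the apply above already handles ByteBuffer input, bytes gives a round trip. A small sketch (value illustrative, assuming apply accepts arbitrary input as the wildcard case suggests; the decoded result is numerically equal, with scale fixed by the decimal logical type):

  import java.nio.ByteBuffer
  import com.spotify.scio.bigquery.types.BigNumeric

  val original: BigNumeric = BigNumeric(BigDecimal("123.456"))
  val encoded: ByteBuffer = BigNumeric.bytes(original)
  val decoded: BigNumeric = BigNumeric(encoded) // numerically equal to original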