Skip to content

Commit

Permalink
Stable behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Nov 28, 2024
1 parent 98e3faf commit 158c9bf
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
waitForTable(avroTable)

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
val data =
sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro)
data should containInAnyOrder(records)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ object BigQueryTypedTable {
/** Defines the format in which BigQuery can be read and written to. */
sealed abstract class Format[F]
object Format {
case object GenericRecord extends Format[GenericRecord]
sealed abstract private[bigquery] class AvroFormat(val useLogicalTypes: Boolean)
extends Format[GenericRecord]

case object GenericRecord extends AvroFormat(false)
case object GenericRecordWithLogicalTypes extends AvroFormat(true)
case object TableRow extends Format[TableRow]
}

Expand Down Expand Up @@ -389,25 +393,31 @@ object BigQueryTypedTable {
)(coders.tableRowCoder)

private[this] def genericRecord(
table: Table
table: Table,
useLogicalTypes: Boolean
)(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] =
BigQueryTypedTable(
_.getRecord(),
identity[GenericRecord],
(genericRecord: GenericRecord, _: TableSchema) => genericRecord,
table
beam.BigQueryIO
.read(_.getRecord)
.pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r),
beam.BigQueryIO
.write[GenericRecord]()
.withAvroFormatFunction(_.getElement)
.pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r),
table,
(genericRecord: GenericRecord, _: TableSchema) => genericRecord
)

/**
* Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]].
*
* NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery
* types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE
* and TIME. DATETIME is not yet supported. https://issuetracker.google.com/issues/140681683
* types DATE, TIME, DATEIME will be read as STRING. Use `Format.GenericRecordWithLogicalTypes`
* for avro `date`, `timestamp-micros` and `local-timestamp-micros` (avro 1.10+)
*/
def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] =
format match {
case Format.GenericRecord => genericRecord(table)
case f: Format.AvroFormat => genericRecord(table, f.useLogicalTypes)
case Format.TableRow => tableRow(table)
}

Expand Down Expand Up @@ -437,16 +447,11 @@ object BigQueryTypedTable {
): BigQueryTypedTable[T] = {
val rFn = ClosureCleaner.clean(readerFn)
val wFn = ClosureCleaner.clean(writerFn)
val reader = beam.BigQueryIO
.read(rFn(_))
.useAvroLogicalTypes()
val reader = beam.BigQueryIO.read(rFn(_))
val writer = beam.BigQueryIO
.write[T]()
.useAvroLogicalTypes()
.withAvroFormatFunction(input => wFn(input.getElement()))
.withAvroSchemaFactory { ts =>
BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true)
}

BigQueryTypedTable(reader, writer, table, fn)
}
Expand Down Expand Up @@ -740,12 +745,31 @@ object BigQueryTyped {
override type ReadP = Unit
override type WriteP = Table.WriteParam[T]

private val underlying = BigQueryTypedTable[T](
(i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord),
BigQueryType[T].toTableRow,
BigQueryType[T].fromTableRow,
table
)
private val underlying = {
val readFn = Functions.serializableFn[SchemaAndRecord, T] { x =>
BigQueryType[T].fromAvro(x.getRecord)
}
val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x =>
BigQueryType[T].toAvro(x.getElement)
}
val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ =>
BigQueryType[T].avroSchema
}
val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r)

BigQueryTypedTable[T](
beam.BigQueryIO
.read(readFn)
.useAvroLogicalTypes(),
beam.BigQueryIO
.write[T]()
.withAvroFormatFunction(writeFn)
.withAvroSchemaFactory(schemaFactory)
.useAvroLogicalTypes(),
table,
parseFn
)
}

override def testId: String = s"BigQueryIO(${table.spec})"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ object DateTime {
/** Convert BigQuery `DATETIME` string to `LocalDateTime`. */
def parse(datetime: String): LocalDateTime =
Parser.parseLocalDateTime(datetime)

// For BigQueryType macros only, do not use directly
// read
def format(datetime: LocalDateTime): String = apply(datetime)
// write
def micros(datetime: LocalDateTime): Long = datetime.getMillisOfDay * 1000
}

/** Scala wrapper for [[com.google.api.services.bigquery.model.TimePartitioning]]. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC
self
.covary[GenericRecord]
.write(
BigQueryTypedTable(table, Format.GenericRecord)(
BigQueryTypedTable(table, Format.GenericRecordWithLogicalTypes)(
self.coder.asInstanceOf[Coder[GenericRecord]]
)
)(param)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
* Reading records as GenericRecord **should** offer better performance over TableRow records.
*
* Note: When using `Format.GenericRecord` Bigquery types DATE, TIME and DATETIME are read as
* STRING.
* STRING. Use `Format.GenericRecordWithLogicalTypes` for avro `date`, `timestamp-micros` and
* `local-timestamp-micros` (avro 1.10+)
*/
def bigQueryTable[F: Coder](table: Table, format: Format[F]): SCollection[F] =
self.read(BigQueryTypedTable(table, format))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[LocalTime] =>
q"_root_.com.spotify.scio.bigquery.Time.micros($tree)"
case t if t =:= typeOf[LocalDateTime] =>
q"_root_.com.spotify.scio.bigquery.DateTime($tree)"
// LocalDateTime is read as avro string
// on write we should use `local-timestamp-micros`
q"_root_.com.spotify.scio.bigquery.DateTime.format($tree)"

// different than nested record match below, even though thore are case classes
case t if t =:= typeOf[Geography] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private[types] object TypeProvider {
q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})"
}
val defAvroSchema =
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)"
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)"
val defToPrettyString =
q"override def toPrettyString(indent: Int = 0): String = ${p(c, s"$SBQ.types.SchemaUtil")}.toPrettyString(this.schema, ${cName.toString}, indent)"

Expand Down

0 comments on commit 158c9bf

Please sign in to comment.