Allow custom timestamp with Spark timezone property (#621)
* feat: allow custom timestamp with spark timezone

* docs: updated README

* fix: ability to run without setting spark.sql.session.timeZone

* feat: break parseXmlTimestamp method into built-in formats and custom format processing

* fix: removed unused code

* fix: timestampFormat with offset should not use spark.sql.session.timezone

* fix: ISO_INSTANT

* feat: added isParseableAsZonedDateTime

* refactor: isParseableAsZonedDateTime and Spark timeZone

* fix: removed sys.exit

* feat: use Instant.from() instead of converting a ZonedDateTime to an Instant

* fix: parameters with timezone

* fix: apply Spark timeZone only if no temporal information

* fix: hasTemporalInformation

* fix: spark config

* docs: commented for java 8 and 11
JorisTruong authored Jan 3, 2023
1 parent 72957d5 commit d376877
Showing 7 changed files with 158 additions and 13 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -65,7 +65,7 @@ Defaults to `false`. New in 0.11.0.
* `timestampFormat`: Specifies an additional timestamp format that will be tried when parsing values as `TimestampType`
columns. The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to trying several formats, including [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT),
including variations with offset timezones or no timezone (defaults to UTC). New in 0.12.0.
including variations with offset timezones or no timezone (defaults to UTC). New in 0.12.0. As of 0.16.0, if a custom format pattern is used without a timezone, the default Spark timezone specified by `spark.sql.session.timeZone` will be used. See the read example below.
* `dateFormat`: Specifies an additional date format that will be tried when parsing values as `DateType`
columns. The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to [ISO_DATE](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_DATE). New in 0.12.0.
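
A short read example for the `timestampFormat` option above (a sketch; the path points at this commit's `time.xml` test resource):

```scala
// "+1000" in the data carries the offset, so spark.sql.session.timeZone
// is not consulted for this column.
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss xx")
  .load("src/test/resources/time.xml")
```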
@@ -83,7 +83,7 @@ When writing files the API accepts several options:
* `compression`: compression codec to use when saving to file. Should be the fully qualified name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec` or one of the case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). Defaults to no compression when a codec is not specified.
* `timestampFormat`: Controls the format used to write `TimestampType` columns.
The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT). New in 0.12.0.
Defaults to [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT). New in 0.12.0. As of 0.16.0, if a custom format pattern is used without a timezone, the default Spark timezone specified by `spark.sql.session.timeZone` will be used. See the write example below.
* `dateFormat`: Controls the format used to write `DateType` columns.
The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
Defaults to [ISO_DATE](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_DATE). New in 0.12.0.
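
And the corresponding write side (a sketch; the output path is hypothetical). With no zone letters in the pattern, values are rendered in `spark.sql.session.timeZone` per the note above:

```scala
df.write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .save("newbooks.xml")
```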
8 changes: 7 additions & 1 deletion src/main/scala/com/databricks/spark/xml/DefaultSource.scala
@@ -67,10 +67,16 @@ class DefaultSource
(options.charset, options.rowTag)
}

val paramsWithTZ =
sqlContext.sparkContext.getConf.getOption("spark.sql.session.timeZone") match {
case Some(tz) => parameters.updated("timezone", tz)
case None => parameters
}

XmlRelation(
() => XmlFile.withCharset(sqlContext.sparkContext, path, charset, rowTag),
Some(path),
parameters,
paramsWithTZ,
schema)(sqlContext)
}
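
The hunk above copies the context-level `spark.sql.session.timeZone` setting into the relation parameters under the `timezone` key, which `XmlOptions` picks up below. A minimal sketch of the end-to-end effect, with a hypothetical local session; because the value is read via `sparkContext.getConf`, it is supplied here when the session is built:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: put the timezone in the SparkConf at startup so
// DefaultSource can see it through sparkContext.getConf.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()

// A pattern with no zone letters now resolves in the configured timezone.
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("src/test/resources/time.xml")
```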

1 change: 1 addition & 0 deletions src/main/scala/com/databricks/spark/xml/XmlOptions.scala
@@ -64,6 +64,7 @@ private[xml] class XmlOptions(
parameters.getOrElse("wildcardColName", XmlOptions.DEFAULT_WILDCARD_COL_NAME)
val ignoreNamespace = parameters.get("ignoreNamespace").map(_.toBoolean).getOrElse(false)
val timestampFormat = parameters.get("timestampFormat")
val timezone = parameters.get("timezone")
val dateFormat = parameters.get("dateFormat")
}

34 changes: 26 additions & 8 deletions src/main/scala/com/databricks/spark/xml/util/TypeCast.scala
@@ -17,17 +17,17 @@ package com.databricks.spark.xml.util

import java.math.BigDecimal
import java.sql.{Date, Timestamp}
import java.text.NumberFormat
import java.time.{LocalDate, ZoneId, ZonedDateTime}
import java.text.{NumberFormat, ParsePosition}
import java.time.{Instant, LocalDate, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.util.Locale

import scala.util.Try
import scala.util.control.Exception._

import org.apache.spark.sql.types._
import com.databricks.spark.xml.XmlOptions

import java.time.temporal.TemporalQueries

/**
* Utility functions for type casting
*/
@@ -116,11 +116,29 @@ private[xml] object TypeCast {
)

private def parseXmlTimestamp(value: String, options: XmlOptions): Timestamp = {
val formatters = options.timestampFormat.map(DateTimeFormatter.ofPattern).
map(supportedXmlTimestampFormatters :+ _).getOrElse(supportedXmlTimestampFormatters)
formatters.foreach { format =>
supportedXmlTimestampFormatters.foreach { format =>
try {
return Timestamp.from(Instant.from(format.parse(value)))
} catch {
case _: Exception => // continue
}
}
options.timestampFormat.foreach { formatString =>
// Check if the pattern carries an offset or timezone, and apply the Spark timeZone only if it does not.
// This keeps Java 8 and Java 11+ behavior consistent, as they prioritize zone and offset differently
val hasTemporalInformation = formatString.indexOf("V") +
formatString.indexOf("z") +
formatString.indexOf("O") +
formatString.indexOf("X") +
formatString.indexOf("x") +
formatString.indexOf("Z") != (-6)
val format = if (hasTemporalInformation) {
DateTimeFormatter.ofPattern(formatString)
} else {
DateTimeFormatter.ofPattern(formatString).withZone(options.timezone.map(ZoneId.of).orNull)
}
try {
return Timestamp.from(ZonedDateTime.parse(value, format).toInstant)
return Timestamp.from(Instant.from(format.parse(value)))
} catch {
case _: Exception => // continue
}
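
The `hasTemporalInformation` test above exploits `indexOf` returning -1 for every absent pattern letter: the six calls sum to exactly -6 only when none of the zone/offset letters (`V`, `z`, `O`, `X`, `x`, `Z`) appear. A standalone sketch of the same check (helper name hypothetical); note it inspects the raw pattern string, so a letter inside a quoted literal such as `'Z'` would also count:

```scala
// Each absent letter contributes -1; any present letter contributes its
// index (>= 0), so the sum can only reach -6 when all six are absent.
def hasZoneOrOffset(pattern: String): Boolean =
  Seq("V", "z", "O", "X", "x", "Z").map(l => pattern.indexOf(l)).sum != -6

assert(!hasZoneOrOffset("yyyy/MM/dd HH:mm:ss"))    // plain: Spark timeZone applies
assert(hasZoneOrOffset("yyyy/MM/dd HH:mm:ss xx"))  // offset: pattern supplies the zone
assert(hasZoneOrOffset("MM-dd-yyyy HH:mm:ss z"))   // zone name: likewise
```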
2 changes: 2 additions & 0 deletions src/test/resources/time.xml
@@ -2,4 +2,6 @@
<author>John Smith</author>
<time>2011-12-03T10:15:30Z</time>
<time2>12-03-2011 10:15:30 PST</time2>
<time3>2011/12/03 06:15:30</time3>
<time4>2011/12/03 16:15:30 +1000</time4>
</book>
53 changes: 51 additions & 2 deletions src/test/scala/com/databricks/spark/xml/XmlSuite.scala
@@ -1370,7 +1370,13 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
.option("rowTag", "book")
.xml(resDir + "time.xml")
val expectedSchema =
buildSchema(field("author"), field("time", TimestampType), field("time2", StringType))
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", StringType),
field("time3", StringType),
field("time4", StringType)
)
assert(df.schema === expectedSchema)
assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
}
@@ -1392,11 +1398,54 @@
.option("timestampFormat", "MM-dd-yyyy HH:mm:ss z")
.xml(resDir + "time.xml")
val expectedSchema =
buildSchema(field("author"), field("time", TimestampType), field("time2", TimestampType))
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", TimestampType),
field("time3", StringType),
field("time4", StringType),
)
assert(df.schema === expectedSchema)
assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
assert(df.collect().head.getAs[Timestamp](2).getTime === 1322936130000L)
}

test("Test custom timestampFormat without timezone") {
val df = spark.read
.option("rowTag", "book")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.xml(resDir + "time.xml")
val expectedSchema =
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", StringType),
field("time3", TimestampType),
field("time4", StringType)
)
assert(df.schema === expectedSchema)
assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
assert(df.collect().head.getAs[Timestamp](3).getTime === 1322892930000L)
}

test("Test custom timestampFormat with offset") {
val df = spark.read
.option("rowTag", "book")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss xx")
.xml(resDir + "time.xml")
val expectedSchema =
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", StringType),
field("time3", StringType),
field("time4", TimestampType)
)
assert(df.schema === expectedSchema)
assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
assert(df.collect().head.getAs[Timestamp](4).getTime === 1322892930000L)
}
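
Both custom-format tests resolve to the same instant: `time3` (`06:15:30`, read in the suite's session timezone, UTC here) and `time4` (`16:15:30 +1000`) each denote 2011-12-03T06:15:30Z, i.e. 1322892930000 ms since the epoch. A quick `java.time` sanity check:

```scala
import java.time.{OffsetDateTime, ZoneOffset}

val t4 = OffsetDateTime.of(2011, 12, 3, 16, 15, 30, 0, ZoneOffset.ofHours(10))
assert(t4.toInstant.toEpochMilli == 1322892930000L)
// Shifting to UTC reproduces time3's wall-clock reading of 06:15:30.
assert(t4.withOffsetSameInstant(ZoneOffset.UTC).getHour == 6)
```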

test("Test null number type is null not 0.0") {
val schema = buildSchema(
struct("Header",
69 changes: 69 additions & 0 deletions src/test/scala/com/databricks/spark/xml/util/TypeCastSuite.scala
@@ -162,4 +162,73 @@ final class TypeCastSuite extends AnyFunSuite {
Locale.setDefault(defaultLocale)
}
}

test("Parsing built-in timestamp formatters") {
val options = XmlOptions(Map())
val expectedResult = Timestamp.from(
ZonedDateTime.of(2002, 5, 30, 21, 46, 54, 0, ZoneId.of("UTC"))
.toInstant
)
assert(
TypeCast.castTo("2002-05-30 21:46:54", TimestampType, options) === expectedResult
)
assert(
TypeCast.castTo("2002-05-30T21:46:54", TimestampType, options) === expectedResult
)
assert(
TypeCast.castTo("2002-05-30T21:46:54+00:00", TimestampType, options) === expectedResult
)
assert(
TypeCast.castTo("2002-05-30T21:46:54.0000Z", TimestampType, options) === expectedResult
)
}

test("Custom timestamp format is used to parse correctly") {
var options = XmlOptions(Map("timestampFormat" -> "MM-dd-yyyy HH:mm:ss", "timezone" -> "UTC"))
assert(
TypeCast.castTo("12-03-2011 10:15:30", TimestampType, options) ===
Timestamp.from(
ZonedDateTime.of(2011, 12, 3, 10, 15, 30, 0, ZoneId.of("UTC"))
.toInstant
)
)

options = XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss", "timezone" -> "UTC"))
assert(
TypeCast.castTo("2011/12/03 10:15:30", TimestampType, options) ===
Timestamp.from(
ZonedDateTime.of(2011, 12, 3, 10, 15, 30, 0, ZoneId.of("UTC"))
.toInstant
)
)

options = XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss",
"timezone" -> "Asia/Shanghai"))
assert(
TypeCast.castTo("2011/12/03 10:15:30", TimestampType, options) !==
Timestamp.from(
ZonedDateTime.of(2011, 12, 3, 10, 15, 30, 0, ZoneId.of("UTC"))
.toInstant
)
)

options = XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss",
"timezone" -> "Asia/Shanghai"))
assert(
TypeCast.castTo("2011/12/03 10:15:30", TimestampType, options) ===
Timestamp.from(
ZonedDateTime.of(2011, 12, 3, 10, 15, 30, 0, ZoneId.of("Asia/Shanghai"))
.toInstant
)
)

options = XmlOptions(Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss"))
intercept[IllegalArgumentException](
TypeCast.castTo("2011/12/03 10:15:30", TimestampType, options) ===
Timestamp.from(
ZonedDateTime.of(2011, 12, 3, 10, 15, 30, 0, ZoneId.of("UTC"))
.toInstant
)
)
}
}
