Add dateFormat, timestampFormat support (#524)
srowen authored Feb 20, 2021
1 parent 0a7289a commit ceed1b8
Showing 10 changed files with 114 additions and 25 deletions.
13 changes: 13 additions & 0 deletions README.md
@@ -88,6 +88,13 @@ In this case, to use local XSD `/foo/bar.xsd`, call `addFile("/foo/bar.xsd")` and
 for example, be treated as if both are just `<author>`. Note that, at the moment, namespaces cannot be ignored on the
 `rowTag` element, only its children. Note that XML parsing is in general not namespace-aware, even when this is `false`.
 Defaults to `false`. New in 0.11.0.
+* `timestampFormat`: Specifies an additional timestamp format that will be tried when parsing values as `TimestampType`
+  columns. The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+  By default several formats are tried, including [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT)
+  and variations with offset time zones or no time zone (assumed to be UTC). New in 0.12.0.
+* `dateFormat`: Specifies an additional date format that will be tried when parsing values as `DateType`
+  columns. The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+  Defaults to [ISO_DATE](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_DATE). New in 0.12.0.

When writing files the API accepts several options:

@@ -98,6 +105,12 @@ When writing files the API accepts several options:
 * `attributePrefix`: The prefix for attributes, so that we can differentiate attributes and elements. This will be the prefix for field names. Default is `_`.
 * `valueTag`: The tag used for the value when there are attributes in an element that has no child elements. Default is `_VALUE`.
 * `compression`: compression codec to use when saving to file. Should be the fully qualified name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec` or one of the case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). Defaults to no compression when a codec is not specified.
+* `timestampFormat`: Controls the format used to write `TimestampType` columns.
+  The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+  Defaults to [ISO_INSTANT](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT). New in 0.12.0.
+* `dateFormat`: Controls the format used to write `DateType` columns.
+  The format is specified as described in [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+  Defaults to [ISO_DATE](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_DATE). New in 0.12.0.

 Currently it supports the shortened name usage: you can use just `xml` instead of `com.databricks.spark.xml`.

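To illustrate the options documented above, here is a minimal usage sketch; the file paths and patterns are illustrative, not part of the commit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("example").getOrCreate()

// Read: custom patterns are tried *in addition to* the default ISO formats
// when parsing DateType/TimestampType values.
val df = spark.read
  .format("xml")
  .option("rowTag", "book")
  .option("dateFormat", "MM-dd-yyyy")
  .option("timestampFormat", "MM-dd-yyyy HH:mm:ss z")
  .load("books.xml")

// Write: a supplied pattern *replaces* the ISO_DATE/ISO_INSTANT default.
df.write
  .format("xml")
  .option("rowTag", "book")
  .option("rootTag", "books")
  .option("dateFormat", "MM/dd/yyyy")
  .save("books-out")
```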
4 changes: 3 additions & 1 deletion build.sbt
@@ -101,6 +101,8 @@ mimaBinaryIssueFilters ++= {
     exclude[DirectMissingMethodProblem](
       "com.databricks.spark.xml.util.TypeCast.supportedXmlTimestampFormatters"),
     exclude[DirectMissingMethodProblem](
-      "com.databricks.spark.xml.util.TypeCast.parseXmlTimestamp")
+      "com.databricks.spark.xml.util.TypeCast.parseXmlTimestamp"),
+    exclude[DirectMissingMethodProblem](
+      "com.databricks.spark.xml.util.TypeCast.isTimestamp")
   )
 }
2 changes: 2 additions & 0 deletions src/main/scala/com/databricks/spark/xml/XmlOptions.scala
@@ -56,6 +56,8 @@ private[xml] class XmlOptions(
   val wildcardColName =
     parameters.getOrElse("wildcardColName", XmlOptions.DEFAULT_WILDCARD_COL_NAME)
   val ignoreNamespace = parameters.get("ignoreNamespace").map(_.toBoolean).getOrElse(false)
+  val timestampFormat = parameters.get("timestampFormat")
+  val dateFormat = parameters.get("dateFormat")
 }

 private[xml] object XmlOptions {
20 changes: 13 additions & 7 deletions src/main/scala/com/databricks/spark/xml/parsers/StaxXmlGenerator.scala
@@ -15,6 +15,7 @@
  */
 package com.databricks.spark.xml.parsers

+import java.sql.{Date, Timestamp}
 import java.time.format.DateTimeFormatter

 import javax.xml.stream.XMLStreamWriter
@@ -46,10 +47,10 @@ private[xml] object StaxXmlGenerator {
       // elements when given values are `null`.
       case (_, _, _) if name == options.valueTag =>
         // If this is meant to be value but in no child, write only a value
-        writeElement(dt, v)
+        writeElement(dt, v, options)
       case (_, _, _) =>
         writer.writeStartElement(name)
-        writeElement(dt, v)
+        writeElement(dt, v, options)
         writer.writeEndElement()

@@ -75,11 +76,17 @@
       }
     }

-    def writeElement(dt: DataType, v: Any): Unit = (dt, v) match {
+    def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match {
       case (_, null) | (NullType, _) => writer.writeCharacters(options.nullValue)
       case (StringType, v: String) => writer.writeCharacters(v)
-      case (TimestampType, v: java.sql.Timestamp) =>
-        writer.writeCharacters(DateTimeFormatter.ISO_INSTANT.format(v.toInstant()))
+      case (TimestampType, v: Timestamp) =>
+        val formatter = options.timestampFormat.map(DateTimeFormatter.ofPattern).
+          getOrElse(DateTimeFormatter.ISO_INSTANT)
+        writer.writeCharacters(formatter.format(v.toInstant()))
+      case (DateType, v: Date) =>
+        val formatter = options.dateFormat.map(DateTimeFormatter.ofPattern).
+          getOrElse(DateTimeFormatter.ISO_DATE)
+        writer.writeCharacters(formatter.format(v.toLocalDate()))
       case (IntegerType, v: Int) => writer.writeCharacters(v.toString)
       case (ShortType, v: Short) => writer.writeCharacters(v.toString)
       case (FloatType, v: Float) => writer.writeCharacters(v.toString)
@@ -88,7 +95,6 @@
       case (DecimalType(), v: java.math.BigDecimal) => writer.writeCharacters(v.toString)
       case (ByteType, v: Byte) => writer.writeCharacters(v.toString)
       case (BooleanType, v: Boolean) => writer.writeCharacters(v.toString)
-      case (DateType, _) => writer.writeCharacters(v.toString)

       // For the case roundtrip in reading and writing XML files, [[ArrayType]] cannot have
       // [[ArrayType]] as element type. It always wraps the element with [[StructType]]. So,
@@ -142,7 +148,7 @@
         val (names, values) = elements.unzip
         val elementSchema = StructType(schema.filter(names.contains))
         val elementRow = Row.fromSeq(row.toSeq.filter(values.contains))
-        writeElement(elementSchema, elementRow)
+        writeElement(elementSchema, elementRow, options)
         writer.writeEndElement()
       }
     }
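
A subtlety worth noting in the write path above: `DateTimeFormatter.ISO_INSTANT` can format an `Instant` directly, but a formatter built with `ofPattern` carries no time zone and cannot format an `Instant` until one is attached. A standalone sketch of the difference (an assumed example, not code from this commit):

```scala
import java.sql.Timestamp
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

val ts = Timestamp.valueOf("2011-12-03 10:15:30") // interpreted in the JVM's default zone

// ISO_INSTANT formats an Instant as-is, e.g. "2011-12-03T10:15:30Z".
val iso = DateTimeFormatter.ISO_INSTANT.format(ts.toInstant)

// A pattern-based formatter needs an explicit zone; without .withZone(...),
// formatting an Instant throws UnsupportedTemporalTypeException.
val custom = DateTimeFormatter
  .ofPattern("MM-dd-yyyy HH:mm:ss")
  .withZone(ZoneOffset.UTC)
  .format(ts.toInstant)
```

Dates are simpler: `v.toLocalDate()` carries all the fields a date pattern needs, so `ofPattern` works on it without a zone.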
4 changes: 2 additions & 2 deletions src/main/scala/com/databricks/spark/xml/util/InferSchema.scala
@@ -118,8 +118,8 @@ private[xml] object InferSchema {
         case v if isInteger(v) => IntegerType
         case v if isDouble(v) => DoubleType
         case v if isBoolean(v) => BooleanType
-        case v if isTimestamp(v) => TimestampType
-        case v if isDate(v) => DateType
+        case v if isTimestamp(v, options) => TimestampType
+        case v if isDate(v, options) => DateType
         case _ => StringType
       }
     } else {
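
The order of the guards above matters: numeric and boolean checks run first, and `isTimestamp` is tried before `isDate`, so a value is inferred as a date or timestamp only when nothing earlier matched. A self-contained illustration of the same first-match idea (hypothetical code, not the library's):

```scala
import scala.util.Try

// First-match inference over a chain of predicates, mirroring the guard order.
def inferType(v: String): String = v match {
  case _ if Try(v.toInt).isSuccess                      => "IntegerType"
  case _ if Try(v.toDouble).isSuccess                   => "DoubleType"
  case _ if Try(v.toBoolean).isSuccess                  => "BooleanType"
  case _ if Try(java.time.Instant.parse(v)).isSuccess   => "TimestampType"
  case _ if Try(java.time.LocalDate.parse(v)).isSuccess => "DateType"
  case _                                                => "StringType"
}

assert(inferType("1322907330") == "IntegerType") // numeric wins, even though it could be an epoch
assert(inferType("2011-12-03T10:15:30Z") == "TimestampType")
assert(inferType("2021-02-01") == "DateType")
```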
34 changes: 24 additions & 10 deletions src/main/scala/com/databricks/spark/xml/util/TypeCast.scala
@@ -64,8 +64,8 @@ private[xml] object TypeCast {
         .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
       case _: BooleanType => parseXmlBoolean(datum)
       case _: DecimalType => new BigDecimal(datum.replaceAll(",", ""))
-      case _: TimestampType => parseXmlTimestamp(datum)
-      case _: DateType => parseXmlDate(datum)
+      case _: TimestampType => parseXmlTimestamp(datum, options)
+      case _: DateType => parseXmlDate(datum, options)
       case _: StringType => datum
       case _ => throw new IllegalArgumentException(s"Unsupported type: ${castType.typeName}")
     }
@@ -86,8 +86,10 @@
     DateTimeFormatter.ISO_DATE
   )

-  private def parseXmlDate(value: String): Date = {
-    supportedXmlDateFormatters.foreach { format =>
+  private def parseXmlDate(value: String, options: XmlOptions): Date = {
+    val formatters = options.dateFormat.map(DateTimeFormatter.ofPattern).
+      map(supportedXmlDateFormatters :+ _).getOrElse(supportedXmlDateFormatters)
+    formatters.foreach { format =>
       try {
         return Date.valueOf(LocalDate.parse(value, format))
       } catch {
@@ -114,8 +116,10 @@
     DateTimeFormatter.ISO_INSTANT
   )

-  private def parseXmlTimestamp(value: String): Timestamp = {
-    supportedXmlTimestampFormatters.foreach { format =>
+  private def parseXmlTimestamp(value: String, options: XmlOptions): Timestamp = {
+    val formatters = options.timestampFormat.map(DateTimeFormatter.ofPattern).
+      map(supportedXmlTimestampFormatters :+ _).getOrElse(supportedXmlTimestampFormatters)
+    formatters.foreach { format =>
       try {
         return Timestamp.from(ZonedDateTime.parse(value, format).toInstant)
       } catch {
@@ -191,12 +195,22 @@
     (allCatch opt signSafeValue.toLong).isDefined
   }

-  private[xml] def isTimestamp(value: String): Boolean = {
-    (allCatch opt Timestamp.valueOf(value)).isDefined
+  private[xml] def isTimestamp(value: String, options: XmlOptions): Boolean = {
+    try {
+      parseXmlTimestamp(value, options)
+      true
+    } catch {
+      case _: IllegalArgumentException => false
+    }
   }

-  private[xml] def isDate(value: String): Boolean = {
-    (allCatch opt Date.valueOf(value)).isDefined
+  private[xml] def isDate(value: String, options: XmlOptions): Boolean = {
+    try {
+      parseXmlDate(value, options)
+      true
+    } catch {
+      case _: IllegalArgumentException => false
+    }
   }

   private[xml] def signSafeToLong(value: String, options: XmlOptions): Long = {
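
The parsing strategy above appends the user-supplied pattern, when present, to the list of default formatters and returns the first successful parse. A condensed, self-contained sketch of that fallback loop (the names here are illustrative, not the library's API):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

val defaults = Seq(DateTimeFormatter.ISO_DATE)
val userPattern: Option[String] = Some("MM-dd-yyyy") // e.g. the dateFormat option

// Append the user-supplied formatter, if any, to the defaults.
val formatters = userPattern.map(DateTimeFormatter.ofPattern)
  .map(defaults :+ _).getOrElse(defaults)

// Try each formatter in order; the first one that parses wins.
def parseDate(value: String): Option[LocalDate] =
  formatters.view.flatMap(f => Try(LocalDate.parse(value, f)).toOption).headOption

assert(parseDate("2021-02-01").contains(LocalDate.of(2021, 2, 1))) // default ISO_DATE
assert(parseDate("02-01-2021").contains(LocalDate.of(2021, 2, 1))) // custom pattern
```

Because the custom pattern is appended rather than substituted on read, existing ISO-formatted data keeps parsing unchanged.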
3 changes: 2 additions & 1 deletion src/test/resources/date.xml
@@ -1,4 +1,5 @@
 <book>
     <author>John Smith</author>
-    <date>2021-01-01</date>
+    <date>2021-02-01</date>
+    <date2>02-01-2021</date2>
 </book>
5 changes: 5 additions & 0 deletions src/test/resources/time.xml
@@ -0,0 +1,5 @@
+<book>
+    <author>John Smith</author>
+    <time>2011-12-03T10:15:30Z</time>
+    <time2>12-03-2011 10:15:30 PST</time2>
+</book>
52 changes: 49 additions & 3 deletions src/test/scala/com/databricks/spark/xml/XmlSuite.scala
@@ -48,6 +48,7 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
       master("local[2]").
       appName("XmlSuite").
       config("spark.ui.enabled", false).
+      config("spark.sql.session.timeZone", "UTC").
       getOrCreate()
   }
   private var tempDir: Path = _
@@ -1298,19 +1299,64 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
   }

   test("Test date parsing") {
-    val schema = buildSchema(field("author"), field("date", DateType))
+    val schema = buildSchema(field("author"), field("date", DateType), field("date2", StringType))
     val df = spark.read
       .option("rowTag", "book")
       .schema(schema)
       .xml(resDir + "date.xml")
-    assert(df.collect().head.getAs[Date](1).toString === "2021-01-01")
+    assert(df.collect().head.getAs[Date](1).toString === "2021-02-01")
   }

   test("Test date type inference") {
     val df = spark.read
       .option("rowTag", "book")
       .xml(resDir + "date.xml")
-    assert(df.dtypes(1) === ("date", "DateType"))
+    val expectedSchema =
+      buildSchema(field("author"), field("date", DateType), field("date2", StringType))
+    assert(df.schema === expectedSchema)
+    assert(df.collect().head.getAs[Date](1).toString === "2021-02-01")
   }

+  test("Test timestamp parsing") {
+    val schema =
+      buildSchema(field("author"), field("time", TimestampType), field("time2", StringType))
+    val df = spark.read
+      .option("rowTag", "book")
+      .schema(schema)
+      .xml(resDir + "time.xml")
+    assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
+  }
+
+  test("Test timestamp type inference") {
+    val df = spark.read
+      .option("rowTag", "book")
+      .xml(resDir + "time.xml")
+    val expectedSchema =
+      buildSchema(field("author"), field("time", TimestampType), field("time2", StringType))
+    assert(df.schema === expectedSchema)
+    assert(df.collect().head.getAs[Timestamp](1).getTime === 1322907330000L)
+  }
+
+  test("Test dateFormat") {
+    val df = spark.read
+      .option("rowTag", "book")
+      .option("dateFormat", "MM-dd-yyyy")
+      .xml(resDir + "date.xml")
+    val expectedSchema =
+      buildSchema(field("author"), field("date", DateType), field("date2", DateType))
+    assert(df.schema === expectedSchema)
+    assert(df.collect().head.getAs[Date](2).toString === "2021-02-01")
+  }
+
+  test("Test timestampFormat") {
+    val df = spark.read
+      .option("rowTag", "book")
+      .option("timestampFormat", "MM-dd-yyyy HH:mm:ss z")
+      .xml(resDir + "time.xml")
+    val expectedSchema =
+      buildSchema(field("author"), field("time", TimestampType), field("time2", TimestampType))
+    assert(df.schema === expectedSchema)
+    assert(df.collect().head.getAs[Timestamp](2).getTime === 1322936130000L)
+  }

   private def getLines(path: Path): Seq[String] = {
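
As a sanity check on the expected values in the new timestamp tests: `2011-12-03T10:15:30Z` is epoch millisecond 1322907330000, and `12-03-2011 10:15:30 PST` is the same wall-clock time eight hours later in UTC (PST = UTC-8), hence 1322936130000. A quick verification:

```scala
import java.time.Instant

assert(Instant.parse("2011-12-03T10:15:30Z").toEpochMilli == 1322907330000L)
assert(Instant.parse("2011-12-03T18:15:30Z").toEpochMilli == 1322936130000L)
```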
2 changes: 1 addition & 1 deletion src/test/scala/com/databricks/spark/xml/util/TypeCastSuite.scala
@@ -155,7 +155,7 @@ final class TypeCastSuite extends AnyFunSuite {
     assert(TypeCast.isLong("10"))
     assert(TypeCast.isDouble("+10.1"))
     val timestamp = "2015-01-01 00:00:00"
-    assert(TypeCast.isTimestamp(timestamp))
+    assert(TypeCast.isTimestamp(timestamp, new XmlOptions()))
   }

   test("Float and Double Types are cast correctly with Locale") {
