Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
Start a generic CSV codec for Sparkle (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvcnt committed May 1, 2018
1 parent 8662434 commit 1eea1d8
Show file tree
Hide file tree
Showing 31 changed files with 347 additions and 525 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ script:
--action_env MYSQL_USER \
--experimental_repository_cache="$HOME/.bazel_repository_cache" \
--local_resources=400,1,1.0 \
//accio/...
//accio/...
branches:
except:
- experimental
4 changes: 4 additions & 0 deletions 3rdparty/dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ dependencies:
breeze:
version: "0.12"
lang: scala
com.univocity:
univocity-parsers:
version: 2.6.3
lang: java
org.ocpsoft.prettytime:
prettytime:
version: "4.0.1.Final"
Expand Down
12 changes: 12 additions & 0 deletions 3rdparty/jvm/com/univocity/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_scala//scala:scala_import.bzl", "scala_import")
java_library(
name = "univocity_parsers",
exports = [
"//external:jar/com/univocity/univocity_parsers"
],
visibility = [
"//visibility:public"
]
)


1 change: 1 addition & 0 deletions 3rdparty/jvm/workspace.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def list_dependencies():
{"artifact": "com.twitter:util-slf4j-jul-bridge_2.11:18.4.0", "lang": "scala", "sha1": "c6b35122cde95f20d9f78049aec9467c6cafc332", "repository": "http://central.maven.org/maven2/", "name": "com_twitter_util_slf4j_jul_bridge_2_11", "actual": "@com_twitter_util_slf4j_jul_bridge_2_11//jar:file", "bind": "jar/com/twitter/util_slf4j_jul_bridge_2_11"},
{"artifact": "com.twitter:util-stats_2.11:18.4.0", "lang": "scala", "sha1": "00063cdf341cc686f42ec9926185ef06e51b2e78", "repository": "http://central.maven.org/maven2/", "name": "com_twitter_util_stats_2_11", "actual": "@com_twitter_util_stats_2_11//jar:file", "bind": "jar/com/twitter/util_stats_2_11"},
{"artifact": "com.twitter:util-tunable_2.11:18.4.0", "lang": "java", "sha1": "37f61d1bbc66136dc282e0705dc79c5fe3e95b7e", "repository": "http://central.maven.org/maven2/", "name": "com_twitter_util_tunable_2_11", "actual": "@com_twitter_util_tunable_2_11//jar", "bind": "jar/com/twitter/util_tunable_2_11"},
{"artifact": "com.univocity:univocity-parsers:2.6.3", "lang": "java", "sha1": "da951415f863956ffdbbf1a29ad3e79b4fc22aa2", "repository": "http://central.maven.org/maven2/", "name": "com_univocity_univocity_parsers", "actual": "@com_univocity_univocity_parsers//jar", "bind": "jar/com/univocity/univocity_parsers"},
{"artifact": "commons-cli:commons-cli:1.3.1", "lang": "java", "sha1": "1303efbc4b181e5a58bf2e967dc156a3132b97c0", "repository": "http://central.maven.org/maven2/", "name": "commons_cli_commons_cli", "actual": "@commons_cli_commons_cli//jar", "bind": "jar/commons_cli/commons_cli"},
{"artifact": "commons-codec:commons-codec:1.9", "lang": "java", "sha1": "9ce04e34240f674bc72680f8b843b1457383161a", "repository": "http://central.maven.org/maven2/", "name": "commons_codec_commons_codec", "actual": "@commons_codec_commons_codec//jar", "bind": "jar/commons_codec/commons_codec"},
{"artifact": "commons-fileupload:commons-fileupload:1.3.1", "lang": "java", "sha1": "c621b54583719ac0310404463d6d99db27e1052c", "repository": "http://central.maven.org/maven2/", "name": "commons_fileupload_commons_fileupload", "actual": "@commons_fileupload_commons_fileupload//jar", "bind": "jar/commons_fileupload/commons_fileupload"},
Expand Down
1 change: 0 additions & 1 deletion accio/java/fr/cnrs/liris/locapriv/install/OpsModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ object OpsModule extends TwitterModule {
ops.addBinding.toInstance(OpMeta.apply[SpatioTemporalDistortionOp])
ops.addBinding.toInstance(OpMeta.apply[TemporalGapSplittingOp])
ops.addBinding.toInstance(OpMeta.apply[TemporalSamplingOp])
ops.addBinding.toInstance(OpMeta.apply[TransmissionDelayOp])
ops.addBinding.toInstance(OpMeta.apply[UniformSamplingOp])
ops.addBinding.toInstance(OpMeta.apply[Wait4MeOp])
}
Expand Down
14 changes: 8 additions & 6 deletions accio/java/fr/cnrs/liris/locapriv/io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
# along with Accio. If not, see <http://www.gnu.org/licenses/>.

scala_library(
name = 'io',
name = "io",
srcs = glob(["*.scala"]),
deps = [
'//accio/java/fr/cnrs/liris/locapriv/model',
'//accio/java/fr/cnrs/liris/locapriv/sparkle',
'//accio/java/fr/cnrs/liris/util',
'//accio/java/fr/cnrs/liris/util/geo',
'//3rdparty/jvm/com/twitter:inject_core',
"//accio/java/fr/cnrs/liris/locapriv/model",
"//accio/java/fr/cnrs/liris/locapriv/sparkle",
"//accio/java/fr/cnrs/liris/util",
"//accio/java/fr/cnrs/liris/util/geo",
"//accio/java/fr/cnrs/liris/util/reflect",
"//3rdparty/jvm/com/univocity:univocity_parsers",
"//3rdparty/jvm/com/twitter:inject_core",
],
visibility = ["//visibility:public"],
)
184 changes: 132 additions & 52 deletions accio/java/fr/cnrs/liris/locapriv/io/Codec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,66 +18,146 @@

package fr.cnrs.liris.locapriv.io

import fr.cnrs.liris.util.ByteUtils
import fr.cnrs.liris.util.geo.{Distance, LatLng, Location}
import fr.cnrs.liris.util.reflect.CaseClass
import org.joda.time.{Duration, Instant}

import scala.reflect.ClassTag
import scala.util.Try

/**
* A codec is the association of an encoder and a decoder of the same type.
*
* @tparam T Type of elements being read and written.
*/
trait Codec[T] extends Encoder[T] with Decoder[T]
trait Codec[T] {
def columnNames(fieldName: String): Seq[String] = Seq(fieldName)

/**
* An encoder converts a plain object into bytes.
*
* @tparam T Type of elements being written.
*/
trait Encoder[T] {
/**
* Return the class tag of the element being written.
*/
def elementClassTag: ClassTag[T]

/**
* Encode an object into a sequence of bytes.
*
* @param key Key associated with the file being written.
* @param elements Elements to encode.
*/
def encode(key: String, elements: Seq[T]): Array[Byte]
def encode(value: T): Seq[String]

def decode(values: Seq[String]): Option[T]
}

/**
* A decoder converts a binary (record) into a plain object.
*
* @tparam T Type of elements being read.
*/
trait Decoder[T] {
/**
* Return the class tag of the element being read.
*/
def elementClassTag: ClassTag[T]

/**
* Decode a binary record into one or several objects.
*
* @param key Key associated with the file being read.
* @param bytes Binary content.
*/
def decode(key: String, bytes: Array[Byte]): Seq[T]
object IntCodec extends Codec[Int] {
override def encode(value: Int): Seq[String] = Seq(value.toString)

override def decode(values: Seq[String]): Option[Int] = Try(values.head.toInt).toOption
}

import scala.reflect.{ClassTag, classTag}
object LongCodec extends Codec[Long] {
override def encode(value: Long): Seq[String] = Seq(value.toString)

/**
* Codec doing nothing (reading and returning bytes).
*/
object IdentityCodec extends Codec[Array[Byte]] {
override def elementClassTag: ClassTag[Array[Byte]] = classTag[Array[Byte]]
override def decode(values: Seq[String]): Option[Long] = Try(values.head.toLong).toOption
}

object FloatCodec extends Codec[Float] {
override def encode(value: Float): Seq[String] = Seq(value.toString)

override def decode(values: Seq[String]): Option[Float] = Try(values.head.toFloat).toOption
}

object DoubleCodec extends Codec[Double] {
override def encode(value: Double): Seq[String] = Seq(value.toString)

override def decode(values: Seq[String]): Option[Double] = Try(values.head.toDouble).toOption
}

object BooleanCodec extends Codec[Boolean] {
override def encode(value: Boolean): Seq[String] = Seq(if (value) "t" else "f")

override def decode(values: Seq[String]): Option[Boolean] =
values.head match {
case "yes" | "y" | "true" | "t" | "1" => Some(true)
case "no" | "n" | "false" | "f" | "0" => Some(false)
case _ => None
}
}

object StringCodec extends Codec[String] {
override def encode(value: String): Seq[String] = Seq(value)

override def decode(values: Seq[String]): Option[String] = Some(values.head)
}

object TimestampCodec extends Codec[Instant] {
override def encode(value: Instant): Seq[String] = Seq(value.getMillis.toString)

override def decode(values: Seq[String]): Option[Instant] =
LongCodec.decode(values).map(v => new Instant(v))
}

object DurationCodec extends Codec[Duration] {
override def encode(value: Duration): Seq[String] = Seq(value.getMillis.toString)

override def decode(values: Seq[String]): Option[Duration] =
LongCodec.decode(values).map(v => new Duration(v))
}

object LocationCodec extends Codec[Location] {
override def columnNames(fieldName: String): Seq[String] =
Seq(s"${fieldName}_lat", s"${fieldName}_lng")

override def encode(value: Location): Seq[String] = {
val latLng = value.toLatLng
Seq(latLng.lat.degrees.toString, latLng.lng.degrees.toString)
}

override def decode(values: Seq[String]): Option[Location] = {
DoubleCodec.decode(Seq(values.head))
.zip(DoubleCodec.decode(values.tail))
.map { case (lat, lng) => LatLng.degrees(lat, lng) }
.headOption
}
}

object DistanceCodec extends Codec[Distance] {
override def encode(value: Distance): Seq[String] = Seq(value.meters.toString)

override def decode(values: Seq[String]): Option[Distance] =
DoubleCodec.decode(values).map(Distance.meters)
}

final class StructCodec[T <: Product](reflect: CaseClass) extends Codec[T] {
private[this] val codecs = {
reflect
.fields
.map { field =>
field.scalaType match {
case t if t.isA[Int] => IntCodec
case t if t.isA[Long] => LongCodec
case t if t.isA[Float] => FloatCodec
case t if t.isA[Double] => DoubleCodec
case t if t.isA[Boolean] => BooleanCodec
case t if t.isA[String] => StringCodec
case t if t.isA[Distance] => DistanceCodec
case t if t.isLike[Instant] => TimestampCodec
case t if t.isLike[Duration] => DurationCodec
case t if t.isLike[Location] => LocationCodec
case t => throw new IllegalArgumentException(s"Unsupported type: $t")
}
}
.map(_.asInstanceOf[Codec[Any]])
}
private[this] val columns: Seq[Seq[Int]] = {
reflect.fields.map { field =>
codecs(field.index).columnNames(field.name).indices.map(idx => field.index + idx).toList
}
}

override def columnNames(fieldName: String): Seq[String] = {
reflect.fields.flatMap { field =>
val names = codecs(field.index).columnNames(field.name)
if (fieldName.isEmpty) names else names.map(name => s"$fieldName.$name")
}
}

override def encode(key: String, elements: Seq[Array[Byte]]): Array[Byte] = ByteUtils.foldLines(elements)
override def encode(value: T): Seq[String] = {
reflect.fields.flatMap(field => codecs(field.index).encode(value.productElement(field.index)))
}

override def decode(key: String, bytes: Array[Byte]): Seq[Array[Byte]] = Seq(bytes)
override def decode(values: Seq[String]): Option[T] = {
val args = reflect.fields.flatMap { field =>
val codec = codecs(field.index)
codec.decode(columns(field.index).map(values.apply))
}
if (args.size == reflect.fields.size) {
Some(reflect.newInstance(args).asInstanceOf[T])
} else {
None
}
}
}
71 changes: 0 additions & 71 deletions accio/java/fr/cnrs/liris/locapriv/io/CsvEventCodec.scala

This file was deleted.

Loading

0 comments on commit 1eea1d8

Please sign in to comment.