Skip to content

Commit

Permalink
added a wire format for Thrift. fixes #341
Browse files Browse the repository at this point in the history
  • Loading branch information
etorreborre committed Sep 1, 2014
1 parent c5635b0 commit bf71185
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 1 deletion.
7 changes: 6 additions & 1 deletion project/dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ object dependencies {
lazy val dependencies = libraryDependencies ++=
scoobi(scalaVersion.value) ++
hadoop(version.value) ++
scalaz() ++
thrift ++
scalaz() ++
specs2()

// Libraries
Expand Down Expand Up @@ -54,6 +55,10 @@ object dependencies {
"org.scalaz" %% "scalaz-typelevel" % scalazVersion intransitive(),
"org.scalaz" %% "scalaz-xml" % scalazVersion intransitive())

val thrift = Seq(
"org.apache.thrift" % "libthrift" % "0.9.1"
)

def specs2(specs2Version: String = "2.4") = Seq(
"org.specs2" %% "specs2-core" % specs2Version % "optional") ++ Seq(
"org.specs2" %% "specs2-mock" % specs2Version ,
Expand Down
47 changes: 47 additions & 0 deletions src/main/scala/com/nicta/scoobi/io/thrift/ThriftSchema.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.nicta.scoobi.io.thrift

import java.io.{DataInput, DataOutput}

import com.nicta.scoobi.Scoobi._
import org.apache.hadoop.io.BytesWritable

/**
* Schema for creating Thrift WireFormat and SeqSchema instances.
*/
object ThriftSchema {

/* WARNING THIS MUST BE A DEF OR OR IT CAN TRIGGER CONCURRENCY ISSUES WITH SHARED THRIFT SERIALIZERS */
def mkThriftFmt[A](implicit m: Manifest[A], ev: A <:< ThriftLike): WireFormat[A] = new WireFormat[A] {
// Call once when the implicit is created to avoid further reflection
val empty = m.runtimeClass.newInstance().asInstanceOf[A]

def toWire(x: A, out: DataOutput) = {
val bytes = ThriftSerialiser().toBytes(x)
out.writeInt(bytes.length)
out.write(bytes)
}

def fromWire(in: DataInput): A = {
val size = in.readInt()
val bytes = new Array[Byte](size)
in.readFully(bytes)
ThriftSerialiser().fromBytes(empty, bytes)
}

override def toString = "ThriftObject"
}

/* WARNING THIS MUST BE A DEF OR OR IT CAN TRIGGER CONCURRENCY ISSUES WITH SHARED THRIFT SERIALIZERS*/
def mkThriftSchema[A](implicit m: Manifest[A], ev: A <:< ThriftLike) = new SeqSchema[A] {
type SeqType = BytesWritable

// Call once when the implicit is created to avoid further reflection
val empty = m.runtimeClass.newInstance().asInstanceOf[A]

def toWritable(x: A) = new BytesWritable(ThriftSerialiser().toBytes(x))

def fromWritable(x: BytesWritable): A = ThriftSerialiser().fromBytes(empty, x.getBytes)

val mf: Manifest[SeqType] = implicitly
}
}
27 changes: 27 additions & 0 deletions src/main/scala/com/nicta/scoobi/io/thrift/ThriftSerialiser.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.nicta.scoobi.io.thrift

import org.apache.thrift.{TDeserializer, TSerializer}
import org.apache.thrift.protocol.TCompactProtocol

/**
* Util for converting a `ThriftLike` object to and from bytes.
*
* WARNING: This class is _not_ threadsafe and should be used with extreme caution!
*
* https://issues.apache.org/jira/browse/THRIFT-2218
*/
case class ThriftSerialiser() {

val serialiser = new TSerializer(new TCompactProtocol.Factory)
val deserialiser = new TDeserializer(new TCompactProtocol.Factory)

def toBytes[A](a: A)(implicit ev: A <:< ThriftLike): Array[Byte] =
serialiser.serialize(ev(a))

def fromBytes[A](empty: A, bytes: Array[Byte])(implicit ev: A <:< ThriftLike): A = {
val e = ev(empty).deepCopy
e.clear()
deserialiser.deserialize(e, bytes)
e.asInstanceOf[A]
}
}
12 changes: 12 additions & 0 deletions src/main/scala/com/nicta/scoobi/io/thrift/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.nicta.scoobi.io

import com.nicta.scoobi.Scoobi._

package object thrift {

type ThriftLike = org.apache.thrift.TBase[_ <: org.apache.thrift.TBase[_, _], _ <: org.apache.thrift.TFieldIdEnum]

implicit def ThriftWireFormat[A](implicit m: Manifest[A], ev: A <:< ThriftLike): WireFormat[A] =ThriftSchema.mkThriftFmt[A]

implicit def ThriftSeqSchema[A](implicit m: Manifest[A], ev: A <:< ThriftLike): SeqSchema[A] = ThriftSchema.mkThriftSchema[A]
}
Loading

0 comments on commit bf71185

Please sign in to comment.