Skip to content

Commit

Permalink
Add serializers/desirializers for models
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Jun 3, 2017
1 parent 7f6875e commit 28078f7
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.Properties

import com.lightbend.kafka.ModelServingConfiguration
import com.lightbend.model.winerecord.WineRecord
import com.lightbend.modelServer.model.{PMMLModelSerializerKryo, TensorFlowModel, TensorFlowModelSerializerKryo}
import com.lightbend.modelServer.typeschema.ByteArraySchema
import org.apache.flink.api.scala._
import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
Expand Down Expand Up @@ -80,6 +81,10 @@ object ModelServingFlatJob {
def buildGraph(env : StreamExecutionEnvironment) : Unit = {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(5000)
// Add custom serializers
env.getConfig.addDefaultKryoSerializer(TensorFlowModel.getClass, classOf[TensorFlowModelSerializerKryo])
env.getConfig.addDefaultKryoSerializer(TensorFlowModel.getClass, classOf[PMMLModelSerializerKryo])


// configure Kafka consumer
// Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import java.util.Properties

import com.lightbend.kafka.ModelServingConfiguration
import com.lightbend.model.winerecord.WineRecord
import com.lightbend.modelServer.model.{PMMLModelSerializerKryo, TensorFlowModel, TensorFlowModelSerializerKryo}
import org.apache.flink.api.scala._
import com.lightbend.modelServer.typeschema.ByteArraySchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

import org.apache.flink.configuration.Configuration
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.configuration.QueryableStateOptions
Expand Down Expand Up @@ -84,6 +84,9 @@ object ModelServingKeyedJob {
def buildGraph(env : StreamExecutionEnvironment) : Unit = {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(5000)
// Add custom serializer
env.getConfig.addDefaultKryoSerializer(TensorFlowModel.getClass, classOf[TensorFlowModelSerializerKryo])
env.getConfig.addDefaultKryoSerializer(TensorFlowModel.getClass, classOf[PMMLModelSerializerKryo])

// configure Kafka consumer
// Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package com.lightbend.modelServer.model
import org.jpmml.evaluator.{FieldValue, ModelEvaluatorFactory, TargetField}
import org.jpmml.evaluator.visitors._
import org.jpmml.model.PMMLUtil
import java.io.{ByteArrayInputStream, InputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

import org.jpmml.evaluator.Computable
import com.lightbend.model.winerecord.WineRecord
Expand Down Expand Up @@ -67,6 +67,12 @@ class PMMLModel(inputStream: Array[Byte]) extends Model {
case _ => .0
}

def toBytes : Array[Byte] = {
var stream = new ByteArrayOutputStream()
PMMLUtil.marshal(pmml, stream)
stream.toByteArray
}

}

object PMMLModel{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lightbend.modelServer.model

/**
* Created by boris on 6/2/17.
*/
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}


class PMMLModelSerializerKryo extends Serializer[PMMLModel]{

super.setAcceptsNull(false)
super.setImmutable(true)

/** Reads bytes and returns a new object of the specified concrete type.
* <p>
* Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
* ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
* be reentrant.
* <p>
* This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
* serialier.
*
* @return May be null if { @link #getAcceptsNull()} is true. */

override def read(kryo: Kryo, input: Input, `type`: Class[PMMLModel]): PMMLModel = {
val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
PMMLModel(bytes).get
}

/** Writes the bytes for the object to the output.
* <p>
* This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
* serialier.
*
* @param value May be null if { @link #getAcceptsNull()} is true. */

override def write(kryo: Kryo, output: Output, value: PMMLModel): Unit = {
output.write(value.toBytes)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.lightbend.modelServer.model

import java.io.InputStream

import com.lightbend.model.winerecord.WineRecord
import org.tensorflow.{Graph, Session, Tensor}
Expand All @@ -9,11 +8,12 @@ import org.tensorflow.{Graph, Session, Tensor}
* Created by boris on 5/26/17.
* Implementation of tensorflow model
*/
class TensorFlowModel(inputStream: Array[Byte]) extends Model {

class TensorFlowModel(inputStream : Array[Byte]) extends Model {

val graph = new Graph
graph.importGraphDef(inputStream)
val session = new Session (graph)
val session = new Session(graph)

override def score(input: AnyVal): AnyVal = {

Expand Down Expand Up @@ -45,8 +45,16 @@ class TensorFlowModel(inputStream: Array[Byte]) extends Model {
}

override def cleanup(): Unit = {
session.close
graph.close
try{
session.close
}catch {
case t: Throwable => // Swallow
}
try{
graph.close
}catch {
case t: Throwable => // Swallow
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lightbend.modelServer.model

/**
* Created by boris on 6/2/17.
*/
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}


class TensorFlowModelSerializerKryo extends Serializer[TensorFlowModel]{

super.setAcceptsNull(false)
super.setImmutable(true)

/** Reads bytes and returns a new object of the specified concrete type.
* <p>
* Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
* ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
* be reentrant.
* <p>
* This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
* serialier.
*
* @return May be null if { @link #getAcceptsNull()} is true. */

override def read(kryo: Kryo, input: Input, `type`: Class[TensorFlowModel]): TensorFlowModel = {
val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
TensorFlowModel(bytes).get
}

/** Writes the bytes for the object to the output.
* <p>
* This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
* serialier.
*
* @param value May be null if { @link #getAcceptsNull()} is true. */

override def write(kryo: Kryo, output: Output, value: TensorFlowModel): Unit = {
output.write(value.graph.toGraphDef)
}
}

0 comments on commit 28078f7

Please sign in to comment.