featurizer app + refactoring
robertnishihara committed Feb 21, 2016
1 parent 00d2710 commit c130de3
Showing 8 changed files with 168 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/main/java/libs/JavaNDArray.java
@@ -10,7 +10,7 @@ public class JavaNDArray implements java.io.Serializable {
private final int[] strides;

public JavaNDArray(float[] data, int dim, int[] shape, int offset, int[] strides) {
assert(data.length == JavaNDUtils.arrayProduct(shape));
// TODO(rkn): check that all of the arguments are consistent with each other
assert(shape.length == strides.length);
this.data = data;
this.dim = dim;
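
For reference, a Scala sketch (not part of this commit) of the invariants the TODO above could check, assuming a dense array with C-order (row-major) strides; the helper name is hypothetical:

// Hypothetical helper: the data buffer must cover the shape starting at offset,
// strides must have one entry per dimension, and for a dense C-order array each
// stride equals the product of the trailing dimensions.
def isConsistent(data: Array[Float], shape: Array[Int], offset: Int, strides: Array[Int]): Boolean = {
  val expectedStrides = shape.scanRight(1)(_ * _).tail // C-order strides
  shape.length == strides.length &&
    offset + shape.product <= data.length &&
    strides.sameElements(expectedStrides)
}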
12 changes: 5 additions & 7 deletions src/main/scala/apps/CifarApp.scala
@@ -70,15 +70,13 @@ object CifarApp {
val numTestData = testDF.count()
logger.log("numTestData = " + numTestData.toString)

val trainPartitionSizes = trainDF.mapPartitions(iter => Array(iter.size).iterator).persist()
val testPartitionSizes = testDF.mapPartitions(iter => Array(iter.size).iterator).persist()
trainPartitionSizes.foreach(size => workerStore.put("trainPartitionSize", size))
testPartitionSizes.foreach(size => workerStore.put("testPartitionSize", size))
logger.log("trainPartitionSizes = " + trainPartitionSizes.collect().deep.toString)
logger.log("testPartitionSizes = " + testPartitionSizes.collect().deep.toString)

val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)

trainDF.foreachPartition(iter => workerStore.put("trainPartitionSize", iter.size))
testDF.foreachPartition(iter => workerStore.put("testPartitionSize", iter.size))
logger.log("trainPartitionSizes = " + workers.map(_ => workerStore.get[Int]("trainPartitionSize")).collect().deep.toString)
logger.log("testPartitionSizes = " + workers.map(_ => workerStore.get[Int]("testPartitionSize")).collect().deep.toString)

// initialize nets on workers
workers.foreach(_ => {
val netParam = new NetParameter()
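
The rewrite above relies on each executor JVM holding its own WorkerStore, so the size recorded by a foreachPartition task can be read back by a later task on the same executor; this assumes one partition per worker. The WorkerStore class itself is not part of this diff; a minimal sketch of the kind of JVM-local store the pattern assumes:

// Minimal sketch (assumed, not shown in this diff): a per-JVM key-value store.
// Values put by one task are visible only to later tasks in the same executor JVM.
class WorkerStore() {
  val store = scala.collection.mutable.Map[String, Any]()
  def get[T](key: String): T = store(key).asInstanceOf[T]
  def put(key: String, value: Any): Unit = store += (key -> value)
}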
102 changes: 102 additions & 0 deletions src/main/scala/apps/FeaturizerApp.scala
@@ -0,0 +1,102 @@
package apps

import java.io._

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
import org.bytedeco.javacpp.caffe._

import scala.collection.mutable.Map

import libs._
import loaders._
import preprocessing._

// For this app to work, $SPARKNET_HOME should be the SparkNet root directory
// and you need to run $SPARKNET_HOME/data/cifar10/get_cifar10.sh. This app
// shows how to use an already trained network to featurize some images.
object FeaturizerApp {
val batchSize = 100

val workerStore = new WorkerStore()

def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("Featurizer")
.set("spark.driver.maxResultSize", "5G")
.set("spark.task.maxFailures", "1")
// Fetch generic options: they must precede program specific options
var startIx = 0
for (arg <- args if arg.startsWith("--")) {
if (arg.startsWith("--master=")) {
conf.setMaster(arg.substring("--master=".length))
startIx += 1
} else {
System.err.println(s"Unknown generic option [$arg]")
}
}
val numWorkers = args(startIx).toInt

val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sparkNetHome = sys.env("SPARKNET_HOME")
val logger = new Logger(sparkNetHome + "/training_log_" + System.currentTimeMillis().toString + ".txt")

val loader = new CifarLoader(sparkNetHome + "/data/cifar10/")
logger.log("loading data")
var trainRDD = sc.parallelize(loader.trainImages.zip(loader.trainLabels))

// convert to dataframes
val schema = StructType(StructField("data", ArrayType(FloatType), false) :: StructField("label", IntegerType, false) :: Nil)
var trainDF = sqlContext.createDataFrame(trainRDD.map{ case (a, b) => Row(a, b)}, schema)

logger.log("repartition data")
trainDF = trainDF.repartition(numWorkers).cache()

val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)

trainDF.foreachPartition(iter => workerStore.put("trainPartitionSize", iter.size))

// initialize nets on workers
workers.foreach(_ => {
val netParam = new NetParameter()
ReadProtoFromTextFileOrDie(sparkNetHome + "/models/cifar10/cifar10_quick_train_test.prototxt", netParam)
val net = CaffeNet(netParam, schema, new DefaultPreprocessor(schema))

// Caffe.set_mode(Caffe.GPU)
workerStore.put("netParam", netParam) // prevent netParam from being garbage collected
workerStore.put("net", net) // prevent net from being garbage collected
})

// initialize weights on master
var netWeights = workers.map(_ => workerStore.get[CaffeNet]("net").getWeights()).collect()(0) // alternatively, load weights from a .caffemodel file
logger.log("broadcasting weights")
val broadcastWeights = sc.broadcast(netWeights)
logger.log("setting weights on workers")
workers.foreach(_ => workerStore.get[CaffeNet]("net").setWeights(broadcastWeights.value))

// featurize the images
val featurizedDF = trainDF.mapPartitions( it => {
val trainPartitionSize = workerStore.get[Int]("trainPartitionSize")
val numTrainBatches = trainPartitionSize / batchSize
val featurizedData = new Array[Array[Float]](trainPartitionSize)
val input = new Array[Row](batchSize)
var i = 0
var out = None: Option[Map[String, NDArray]]
while (i < trainPartitionSize) {
if (i % batchSize == 0) {
it.copyToArray(input, 0, batchSize)
out = Some(workerStore.get[CaffeNet]("net").forward(input.iterator, List("ip1")))
}
featurizedData(i) = out.get("ip1").slice(0, i % batchSize).toFlat()
i += 1
}
featurizedData.iterator
})

logger.log("featurized " + featurizedDF.count().toString + " images")
}
}
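
The app only logs a count of the featurized rows; a sketch (not part of this commit) of how the features could be persisted, assuming Spark 1.x where DataFrame.mapPartitions returns an RDD, with a hypothetical column name and output path:

// Hypothetical follow-up: wrap the featurized arrays in a DataFrame and save them.
val featureSchema = StructType(StructField("features", ArrayType(FloatType), false) :: Nil)
val featureDF = sqlContext.createDataFrame(featurizedDF.map(arr => Row(arr)), featureSchema)
featureDF.write.parquet(sparkNetHome + "/features.parquet") // output path is an assumption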
12 changes: 5 additions & 7 deletions src/main/scala/apps/ImageNetApp.scala
@@ -71,15 +71,13 @@ object ImageNetApp {
trainDF = trainDF.coalesce(numWorkers)
testDF = testDF.coalesce(numWorkers)

val trainPartitionSizes = trainDF.mapPartitions(iter => Array(iter.size).iterator).persist()
val testPartitionSizes = testDF.mapPartitions(iter => Array(iter.size).iterator).persist()
trainPartitionSizes.foreach(size => workerStore.put("trainPartitionSize", size))
testPartitionSizes.foreach(size => workerStore.put("testPartitionSize", size))
logger.log("trainPartitionSizes = " + trainPartitionSizes.collect().deep.toString)
logger.log("testPartitionSizes = " + testPartitionSizes.collect().deep.toString)

val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)

trainDF.foreachPartition(iter => workerStore.put("trainPartitionSize", iter.size))
testDF.foreachPartition(iter => workerStore.put("testPartitionSize", iter.size))
logger.log("trainPartitionSizes = " + workers.map(_ => workerStore.get[Int]("trainPartitionSize")).collect().deep.toString)
logger.log("testPartitionSizes = " + workers.map(_ => workerStore.get[Int]("testPartitionSize")).collect().deep.toString)

// initialize nets on workers
workers.foreach(_ => {
val netParam = new NetParameter()
37 changes: 13 additions & 24 deletions src/main/scala/libs/CaffeNet.scala
@@ -41,7 +41,7 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc

for (i <- 0 to inputSize - 1) {
val name = netParam.input(i).getString
transformations(i) = preprocessor.convert(name, getInputShape(i))
transformations(i) = preprocessor.convert(name, JavaCPPUtils.getInputShape(netParam, i).drop(1)) // drop first index to ignore batchSize
inputIndices(i) = columnNames.indexOf(name)
}

@@ -60,11 +60,11 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
val inputBuffer = new Array[Array[Float]](inputSize)
val inputBufferSize = new Array[Int](inputSize)
for (i <- 0 to inputSize - 1) {
inputBufferSize(i) = getInputShape(i).product
inputBufferSize(i) = JavaCPPUtils.getInputShape(netParam, i).drop(1).product // drop 1 to ignore batchSize
inputBuffer(i) = new Array[Float](inputBufferSize(i) * batchSize)
}

def transformInto(iterator: Iterator[Row], data: FloatBlobVector): Unit = {
def transformInto(iterator: Iterator[Row], data: FloatBlobVector) = {
var batchIndex = 0
while (iterator.hasNext && batchIndex != batchSize) {
val row = iterator.next
@@ -82,7 +82,7 @@ }
}
}

def forward(rowIt: Iterator[Row]): Map[String, NDArray] = {
def forward(rowIt: Iterator[Row], dataBlobNames: List[String] = List[String]()): Map[String, NDArray] = {
// Caffe.set_mode(Caffe.GPU)
transformInto(rowIt, inputs)
val tops = caffeNet.Forward(inputs)
@@ -95,6 +95,13 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
top.cpu_data().get(output, 0, shape.product)
outputs += (outputName -> NDArray(output, shape))
}
for (name <- dataBlobNames) {
val floatBlob = caffeNet.blob_by_name(name)
if (floatBlob == null) {
throw new IllegalArgumentException("The net does not have a layer named " + name + ".\n")
}
outputs += (name -> JavaCPPUtils.floatBlobToNDArray(floatBlob))
}
return outputs
}

@@ -116,7 +123,7 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
val weightList = MutableList[NDArray]()
for (j <- 0 to numLayerBlobs(i) - 1) {
val blob = caffeNet.layers().get(i).blobs().get(j)
val shape = getShape(blob)
val shape = JavaCPPUtils.getFloatBlobShape(blob)
val data = new Array[Float](shape.product)
blob.cpu_data.get(data, 0, data.length)
weightList += NDArray(data, shape)
@@ -131,32 +138,14 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
for (i <- 0 to numLayers - 1) {
for (j <- 0 to numLayerBlobs(i) - 1) {
val blob = caffeNet.layers().get(i).blobs().get(j)
val shape = getShape(blob)
val shape = JavaCPPUtils.getFloatBlobShape(blob)
assert(shape.deep == weights.allWeights(layerNames(i))(j).shape.deep) // check that weights are the correct shape
val flatWeights = weights.allWeights(layerNames(i))(j).toFlat() // this allocation can be avoided
blob.cpu_data.put(flatWeights, 0, flatWeights.length)
}
}
}

private def getShape(blob: FloatBlob): Array[Int] = {
val numAxes = blob.num_axes()
val shape = new Array[Int](numAxes)
for (k <- 0 to numAxes - 1) {
shape(k) = blob.shape(k)
}
return shape
}

private def getInputShape(i: Int): Array[Int] = {
val numAxes = netParam.input_shape(i).dim_size - 1
val shape = new Array[Int](numAxes)
for (j <- 0 to numAxes - 1) {
shape(j) = netParam.input_shape(i).dim(j + 1).toInt
}
return shape
}

def outputSchema(): StructType = {
val fields = Array.range(0, numOutputs).map(i => {
val output = caffeNet.blob_names().get(caffeNet.output_blob_indices().get(i)).getString
38 changes: 38 additions & 0 deletions src/main/scala/libs/JavaCPPUtils.scala
@@ -0,0 +1,38 @@
package libs

import org.bytedeco.javacpp._
import org.bytedeco.javacpp.caffe._

object JavaCPPUtils {
def floatBlobToNDArray(floatBlob: FloatBlob): NDArray = {
val shape = getFloatBlobShape(floatBlob)
val data = new Array[Float](shape.product)
val pointer = floatBlob.cpu_data
var i = 0
while (i < shape.product) {
data(i) = pointer.get(i)
i += 1
}
NDArray(data, shape)
}

def getFloatBlobShape(floatBlob: FloatBlob): Array[Int] = {
val numAxes = floatBlob.num_axes()
val shape = new Array[Int](numAxes)
for (k <- 0 to numAxes - 1) {
shape(k) = floatBlob.shape.get(k)
}
shape
}

def getInputShape(netParam: NetParameter, i: Int): Array[Int] = {
val numAxes = netParam.input_shape(i).dim_size
val shape = new Array[Int](numAxes)
for (j <- 0 to numAxes - 1) {
shape(j) = netParam.input_shape(i).dim(j).toInt
}
shape
}


}
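
A small usage sketch for the new helpers (not part of the commit); the prototxt path matches the CIFAR-10 apps above, sparkNetHome is assumed to be in scope, and the concrete shapes in the comments are assumptions:

import org.bytedeco.javacpp.caffe._
import libs._

val netParam = new NetParameter()
ReadProtoFromTextFileOrDie(sparkNetHome + "/models/cifar10/cifar10_quick_train_test.prototxt", netParam)
val fullShape = JavaCPPUtils.getInputShape(netParam, 0) // includes the batch dimension, e.g. Array(100, 3, 32, 32)
val imageShape = fullShape.drop(1)                      // per-image shape, e.g. Array(3, 32, 32)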
3 changes: 3 additions & 0 deletions src/main/scala/libs/NDArray.scala
@@ -51,6 +51,9 @@ class NDArray private(val javaArray: JavaNDArray) extends java.io.Serializable {

object NDArray {
def apply(data: Array[Float], shape: Array[Int]) = {
if (data.length != shape.product) {
throw new IllegalArgumentException("The data and shape arguments are not compatible, data.length = " + data.length.toString + " and shape = " + shape.deep + ".\n")
}
new NDArray(new JavaNDArray(data, shape:_*))
}

2 changes: 1 addition & 1 deletion src/test/scala/libs/PreprocessorSpec.scala
@@ -61,7 +61,7 @@ class PreprocessorSpec extends FlatSpec with BeforeAndAfterAll {
case (t, v) => {
val schema = StructType(StructField("x", t, false) :: Nil)
val preprocessor = new DefaultPreprocessor(schema)
val convert = preprocessor.convert("x", Array[Int](1, 3))
val convert = preprocessor.convert("x", Array[Int](256, 256))
var x = Row(v)
val df = sqlContext.createDataFrame(sc.parallelize(Array(x)), schema)
val startTime = System.currentTimeMillis()
