diff --git a/src/main/java/libs/JavaNDArray.java b/src/main/java/libs/JavaNDArray.java
index d0f436e..951ae26 100644
--- a/src/main/java/libs/JavaNDArray.java
+++ b/src/main/java/libs/JavaNDArray.java
@@ -10,7 +10,7 @@ public class JavaNDArray implements java.io.Serializable {
   private final int[] strides;
 
   public JavaNDArray(float[] data, int dim, int[] shape, int offset, int[] strides) {
-    assert(data.length == JavaNDUtils.arrayProduct(shape));
+    // TODO(rkn): check that all of the arguments are consistent with each other
     assert(shape.length == strides.length);
     this.data = data;
     this.dim = dim;
diff --git a/src/main/scala/apps/CifarApp.scala b/src/main/scala/apps/CifarApp.scala
index 5e924a8..2932fdb 100644
--- a/src/main/scala/apps/CifarApp.scala
+++ b/src/main/scala/apps/CifarApp.scala
@@ -70,15 +70,13 @@ object CifarApp {
     val numTestData = testDF.count()
     logger.log("numTestData = " + numTestData.toString)
 
-    val trainPartitionSizes = trainDF.mapPartitions(iter => Array(iter.size).iterator).persist()
-    val testPartitionSizes = testDF.mapPartitions(iter => Array(iter.size).iterator).persist()
-    trainPartitionSizes.foreach(size => workerStore.put("trainPartitionSize", size))
-    testPartitionSizes.foreach(size => workerStore.put("testPartitionSize", size))
-    logger.log("trainPartitionSizes = " + trainPartitionSizes.collect().deep.toString)
-    logger.log("testPartitionSizes = " + testPartitionSizes.collect().deep.toString)
-
     val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)
 
+    trainDF.foreachPartition(iter => workerStore.put("trainPartitionSize", iter.size))
+    testDF.foreachPartition(iter => workerStore.put("testPartitionSize", iter.size))
+    logger.log("trainPartitionSizes = " + workers.map(_ => workerStore.get[Int]("trainPartitionSize")).collect().deep.toString)
+    logger.log("testPartitionSizes = " + workers.map(_ => workerStore.get[Int]("testPartitionSize")).collect().deep.toString)
+
     // initialize nets on workers
     workers.foreach(_ => {
       val netParam = new NetParameter()
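The CifarApp change above drops the persisted partition-size RDDs and instead records each partition's size in a worker-local store, reading the values back through the one-task-per-worker workers RDD. A minimal sketch of that store pattern follows; SimpleWorkerStore is a hypothetical stand-in, not SparkNet's actual WorkerStore and not part of this patch.

import scala.collection.mutable

// A per-JVM singleton: Scala objects are re-initialized on each executor, so every
// worker holds its own map rather than a serialized snapshot shipped from the driver.
object SimpleWorkerStore {
  private val store = mutable.Map[String, Any]()
  def put(key: String, value: Any): Unit = store.synchronized { store(key) = value }
  def get[T](key: String): T = store.synchronized { store(key).asInstanceOf[T] }
}

// Usage mirroring the new CifarApp code:
//   trainDF.foreachPartition(iter => SimpleWorkerStore.put("trainPartitionSize", iter.size))
//   val sizes = workers.map(_ => SimpleWorkerStore.get[Int]("trainPartitionSize")).collect()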
diff --git a/src/main/scala/apps/FeaturizerApp.scala b/src/main/scala/apps/FeaturizerApp.scala
new file mode 100644
index 0000000..7923282
--- /dev/null
+++ b/src/main/scala/apps/FeaturizerApp.scala
@@ -0,0 +1,102 @@
+package apps
+
+import java.io._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
+import org.bytedeco.javacpp.caffe._
+
+import scala.collection.mutable.Map
+
+import libs._
+import loaders._
+import preprocessing._
+
+// For this app to work, $SPARKNET_HOME should be the SparkNet root directory
+// and you need to run $SPARKNET_HOME/data/cifar10/get_cifar10.sh. This app
+// shows how to use an already trained network to featurize some images.
+object FeaturizerApp {
+  val batchSize = 100
+
+  val workerStore = new WorkerStore()
+
+  def main(args: Array[String]) {
+    val conf = new SparkConf()
+      .setAppName("Featurizer")
+      .set("spark.driver.maxResultSize", "5G")
+      .set("spark.task.maxFailures", "1")
+    // Fetch generic options: they must precede program specific options
+    var startIx = 0
+    for (arg <- args if arg.startsWith("--")) {
+      if (arg.startsWith("--master=")) {
+        conf.setMaster(args(0).substring("--master=".length))
+        startIx += 1
+      } else {
+        System.err.println(s"Unknown generic option [$arg]")
+      }
+    }
+    val numWorkers = args(startIx).toInt
+
+    val sc = new SparkContext(conf)
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val sparkNetHome = sys.env("SPARKNET_HOME")
+    val logger = new Logger(sparkNetHome + "/training_log_" + System.currentTimeMillis().toString + ".txt")
+
+    val loader = new CifarLoader(sparkNetHome + "/data/cifar10/")
+    logger.log("loading data")
+    var trainRDD = sc.parallelize(loader.trainImages.zip(loader.trainLabels))
+
+    // convert to dataframes
+    val schema = StructType(StructField("data", ArrayType(FloatType), false) :: StructField("label", IntegerType, false) :: Nil)
+    var trainDF = sqlContext.createDataFrame(trainRDD.map{ case (a, b) => Row(a, b)}, schema)
+
+    logger.log("repartition data")
+    trainDF = trainDF.repartition(numWorkers).cache()
+
+    val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)
+
+    trainDF.foreachPartition(iter => workerStore.put("trainPartitionSize", iter.size))
+
+    // initialize nets on workers
+    workers.foreach(_ => {
+      val netParam = new NetParameter()
+      ReadProtoFromTextFileOrDie(sparkNetHome + "/models/cifar10/cifar10_quick_train_test.prototxt", netParam)
+      val net = CaffeNet(netParam, schema, new DefaultPreprocessor(schema))
+
+      // Caffe.set_mode(Caffe.GPU)
+      workerStore.put("netParam", netParam) // prevent netParam from being garbage collected
+      workerStore.put("net", net) // prevent net from being garbage collected
+    })
+
+    // initialize weights on master
+    var netWeights = workers.map(_ => workerStore.get[CaffeNet]("net").getWeights()).collect()(0) // alternatively, load weights from a .caffemodel file
+    logger.log("broadcasting weights")
+    val broadcastWeights = sc.broadcast(netWeights)
+    logger.log("setting weights on workers")
+    workers.foreach(_ => workerStore.get[CaffeNet]("net").setWeights(broadcastWeights.value))
+
+    // featurize the images
+    val featurizedDF = trainDF.mapPartitions( it => {
+      val trainPartitionSize = workerStore.get[Int]("trainPartitionSize")
+      val numTrainBatches = trainPartitionSize / batchSize
+      val featurizedData = new Array[Array[Float]](trainPartitionSize)
+      val input = new Array[Row](batchSize)
+      var i = 0
+      var out = None: Option[Map[String, NDArray]]
+      while (i < trainPartitionSize) {
+        if (i % batchSize == 0) {
+          it.copyToArray(input, 0, batchSize)
+          out = Some(workerStore.get[CaffeNet]("net").forward(input.iterator, List("ip1")))
+        }
+        featurizedData(i) = out.get("ip1").slice(0, i % batchSize).toFlat()
+        i += 1
+      }
+      featurizedData.iterator
+    })
+
+    logger.log("featurized " + featurizedDF.count().toString + " images")
+  }
+}
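The mapPartitions body above processes each partition in fixed-size batches: it copies batchSize rows out of the iterator, runs one forward pass, and then indexes into that batch's output row by row. The following self-contained sketch shows the same batch-and-index pattern with the Caffe call abstracted into a function argument; the name featurizeInBatches and the Array[Float] element type are illustrative, not part of the patch.

def featurizeInBatches(it: Iterator[Array[Float]], partitionSize: Int, batchSize: Int)
                      (forward: Array[Array[Float]] => Array[Array[Float]]): Array[Array[Float]] = {
  val features = new Array[Array[Float]](partitionSize)
  val batch = new Array[Array[Float]](batchSize)
  var batchOutput = Array.empty[Array[Float]]
  var i = 0
  while (i < partitionSize) {
    if (i % batchSize == 0) {
      it.copyToArray(batch, 0, batchSize) // consume the next batchSize rows from the partition
      batchOutput = forward(batch)        // one forward pass per batch
    }
    features(i) = batchOutput(i % batchSize) // pick this row's entry from the batch output
    i += 1
  }
  features
}

As in FeaturizerApp, a partition whose size is not a multiple of batchSize runs its last forward pass on a partially refreshed batch array.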
diff --git a/src/main/scala/apps/ImageNetApp.scala b/src/main/scala/apps/ImageNetApp.scala
index 9e4436e..fd6a060 100644
--- a/src/main/scala/apps/ImageNetApp.scala
+++ b/src/main/scala/apps/ImageNetApp.scala
@@ -71,15 +71,13 @@ object ImageNetApp {
     trainDF = trainDF.coalesce(numWorkers)
     testDF = testDF.coalesce(numWorkers)
 
-    val trainPartitionSizes = trainDF.mapPartitions(iter => Array(iter.size).iterator).persist()
-    val testPartitionSizes = testDF.mapPartitions(iter => Array(iter.size).iterator).persist()
-    trainPartitionSizes.foreach(size => workerStore.put("trainPartitionSize", size))
-    testPartitionSizes.foreach(size => workerStore.put("testPartitionSize", size))
-    logger.log("trainPartitionSizes = " + trainPartitionSizes.collect().deep.toString)
-    logger.log("testPartitionSizes = " + testPartitionSizes.collect().deep.toString)
-
     val workers = sc.parallelize(Array.range(0, numWorkers), numWorkers)
 
+    trainDF.foreachPartition(iter => workerStore.put("trainPartitionSize", iter.size))
+    testDF.foreachPartition(iter => workerStore.put("testPartitionSize", iter.size))
+    logger.log("trainPartitionSizes = " + workers.map(_ => workerStore.get[Int]("trainPartitionSize")).collect().deep.toString)
+    logger.log("testPartitionSizes = " + workers.map(_ => workerStore.get[Int]("testPartitionSize")).collect().deep.toString)
+
     // initialize nets on workers
     workers.foreach(_ => {
       val netParam = new NetParameter()
diff --git a/src/main/scala/libs/CaffeNet.scala b/src/main/scala/libs/CaffeNet.scala
index 1e2e283..f3041a1 100644
--- a/src/main/scala/libs/CaffeNet.scala
+++ b/src/main/scala/libs/CaffeNet.scala
@@ -41,7 +41,7 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
 
   for (i <- 0 to inputSize - 1) {
     val name = netParam.input(i).getString
-    transformations(i) = preprocessor.convert(name, getInputShape(i))
+    transformations(i) = preprocessor.convert(name, JavaCPPUtils.getInputShape(netParam, i).drop(1)) // drop first index to ignore batchSize
     inputIndices(i) = columnNames.indexOf(name)
   }
 
@@ -60,11 +60,11 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
   val inputBuffer = new Array[Array[Float]](inputSize)
   val inputBufferSize = new Array[Int](inputSize)
   for (i <- 0 to inputSize - 1) {
-    inputBufferSize(i) = getInputShape(i).product
+    inputBufferSize(i) = JavaCPPUtils.getInputShape(netParam, i).drop(1).product // drop 1 to ignore batchSize
     inputBuffer(i) = new Array[Float](inputBufferSize(i) * batchSize)
   }
 
-  def transformInto(iterator: Iterator[Row], data: FloatBlobVector): Unit = {
+  def transformInto(iterator: Iterator[Row], data: FloatBlobVector) = {
     var batchIndex = 0
     while (iterator.hasNext && batchIndex != batchSize) {
       val row = iterator.next
@@ -82,7 +82,7 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
     }
   }
 
-  def forward(rowIt: Iterator[Row]): Map[String, NDArray] = {
+  def forward(rowIt: Iterator[Row], dataBlobNames: List[String] = List[String]()): Map[String, NDArray] = {
     // Caffe.set_mode(Caffe.GPU)
     transformInto(rowIt, inputs)
     val tops = caffeNet.Forward(inputs)
@@ -95,6 +95,13 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
       top.cpu_data().get(output, 0, shape.product)
      outputs += (outputName -> NDArray(output, shape))
     }
+    for (name <- dataBlobNames) {
+      val floatBlob = caffeNet.blob_by_name(name)
+      if (floatBlob == null) {
+        throw new IllegalArgumentException("The net does not have a layer named " + name + ".\n")
+      }
+      outputs += (name -> JavaCPPUtils.floatBlobToNDArray(floatBlob))
+    }
     return outputs
   }
 
@@ -116,7 +123,7 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
       val weightList = MutableList[NDArray]()
       for (j <- 0 to numLayerBlobs(i) - 1) {
         val blob = caffeNet.layers().get(i).blobs().get(j)
-        val shape = getShape(blob)
+        val shape = JavaCPPUtils.getFloatBlobShape(blob)
         val data = new Array[Float](shape.product)
         blob.cpu_data.get(data, 0, data.length)
         weightList += NDArray(data, shape)
@@ -131,7 +138,7 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
     for (i <- 0 to numLayers - 1) {
       for (j <- 0 to numLayerBlobs(i) - 1) {
         val blob = caffeNet.layers().get(i).blobs().get(j)
-        val shape = getShape(blob)
+        val shape = JavaCPPUtils.getFloatBlobShape(blob)
         assert(shape.deep == weights.allWeights(layerNames(i))(j).shape.deep) // check that weights are the correct shape
         val flatWeights = weights.allWeights(layerNames(i))(j).toFlat() // this allocation can be avoided
         blob.cpu_data.put(flatWeights, 0, flatWeights.length)
@@ -139,24 +146,6 @@ class CaffeNet(netParam: NetParameter, schema: StructType, preprocessor: Preproc
     }
   }
 
-  private def getShape(blob: FloatBlob): Array[Int] = {
-    val numAxes = blob.num_axes()
-    val shape = new Array[Int](numAxes)
-    for (k <- 0 to numAxes - 1) {
-      shape(k) = blob.shape(k)
-    }
-    return shape
-  }
-
-  private def getInputShape(i: Int): Array[Int] = {
-    val numAxes = netParam.input_shape(i).dim_size - 1
-    val shape = new Array[Int](numAxes)
-    for (j <- 0 to numAxes - 1) {
-      shape(j) = netParam.input_shape(i).dim(j + 1).toInt
-    }
-    return shape
-  }
-
   def outputSchema(): StructType = {
     val fields = Array.range(0, numOutputs).map(i => {
       val output = caffeNet.blob_names().get(caffeNet.output_blob_indices().get(i)).getString
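The forward() signature above now accepts an optional list of blob names whose contents are copied into the result map alongside the net's regular output blobs; an unknown name raises IllegalArgumentException instead of dereferencing a null blob. A hedged usage sketch follows, where extractFeatures is a hypothetical helper and "ip1" is the blob that FeaturizerApp requests from the CIFAR-10 quick model.

// Hypothetical helper, not part of the patch.
def extractFeatures(net: CaffeNet, batch: Array[Row]): NDArray = {
  // Request the intermediate "ip1" blob in addition to the net's output blobs.
  val outputs = net.forward(batch.iterator, List("ip1"))
  outputs("ip1") // forward() already threw if "ip1" did not name a blob in this net
}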
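NDArray.apply now verifies that the flat data array matches the requested shape, turning silent shape mismatches (for example from a mis-sized blob copy) into an immediate error. A small illustration with arbitrary values:

val ok = NDArray(new Array[Float](6), Array(2, 3)) // accepted: 6 == 2 * 3
// NDArray(new Array[Float](5), Array(2, 3))       // rejected: throws IllegalArgumentException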
diff --git a/src/test/scala/libs/PreprocessorSpec.scala b/src/test/scala/libs/PreprocessorSpec.scala
index 377b2df..af466b8 100644
--- a/src/test/scala/libs/PreprocessorSpec.scala
+++ b/src/test/scala/libs/PreprocessorSpec.scala
@@ -61,7 +61,7 @@ class PreprocessorSpec extends FlatSpec with BeforeAndAfterAll {
       case (t, v) => {
         val schema = StructType(StructField("x", t, false) :: Nil)
         val preprocessor = new DefaultPreprocessor(schema)
-        val convert = preprocessor.convert("x", Array[Int](1, 3))
+        val convert = preprocessor.convert("x", Array[Int](256, 256))
         var x = Row(v)
         val df = sqlContext.createDataFrame(sc.parallelize(Array(x)), schema)
         val startTime = System.currentTimeMillis()