Scala Inference API

For users who have existing Spark pipelines written in Scala, we now provide a Scala API for inferencing from previously-trained TensorFlow SavedModels. This API leverages the TensorFlow Java API to provide a Spark ML Pipeline-style API, including support for loading TFRecords into a DataFrame and saving DataFrames as TFRecords.

Notes:

  • Requires TensorFlow 1.4 or later.
  • This API is for inferencing only. TensorFlow recommends using their Python API for model training.
  • TensorFlow's Java API is currently marked as "experimental" and is not covered by API stability guarantees, which may impact this API over time.
  • TensorFlow models must be exported as a SavedModel.
  • Due to Spark's lazy-evaluation model, this API currently only supports TensorFlow models which fit in the memory of your executors, i.e. each executor will run inferencing in a single, standalone instance of TensorFlow. In other words, this API is "data-parallel", but not "model-parallel". If you need "model-parallel" inferencing, you can use the TFoS Python API.
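
To make the execution model concrete, here is a minimal sketch of the "data-parallel" pattern using the TensorFlow Java API directly. This is not the actual TFoS implementation: dataRDD (assumed to be an RDD[Array[Float]] of flattened 28x28 images), the model path, and the tensor names and shapes are illustrative assumptions based on the MNIST example below.

import org.tensorflow.{SavedModelBundle, Tensor}

// Sketch only: each Spark task loads its own standalone copy of the model,
// so the entire model must fit in executor memory.
val predictions = dataRDD.mapPartitions { rows =>
  // the export path must be accessible from every executor
  val bundle = SavedModelBundle.load("mnist_export", "serve")
  val session = bundle.session()
  rows.map { pixels =>
    val input = Tensor.create(Array(pixels))    // float tensor of shape [1, 784]
    val result = session.runner()
      .feed("x", input)                         // input tensor name
      .fetch("prediction")                      // output tensor name
      .run().get(0)
    val out = Array.ofDim[Long](1)              // assumes an int64 argmax output of shape [1]
    result.copyTo(out)
    input.close(); result.close()
    out(0)
  }
}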

Build TensorFlow Ecosystem JAR

First, clone the TensorFlow Ecosystem repository and follow the build instructions for the Hadoop MapReduce InputFormat/OutputFormat for TFRecords. We use this jar as a dependency, but since it is not currently available in public Maven repositories, you must mvn install the package on your local machine so that it is available as a dependency.

Build TFoS Scala Inference API

Next, clone the TFoS repository and build the Scala API:

git clone https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)
mvn package

# check for the packaged jar file
ls target/tensorflowonspark-*.jar

Train MNIST Model

Follow the Spark Standalone instructions to train an MNIST model on a local Spark Standalone cluster. When done, you should have an mnist_model checkpoint on disk.

Export SavedModel

Next, convert the TensorFlow Checkpoint to a SavedModel. Generally, this would be done by the authors of the algorithm that produced the model checkpoint, but in cases where a SavedModel is not available, we provide a sample utility to do this conversion.

Note: you should know the names of the available input and output tensors of the TensorFlow graph. If you don't, you can view all operations of the graph as follows:

python ${TFoS_HOME}/examples/model_export.py --model_dir mnist_model --show

Then, export the SavedModel with a JSON-formatted signature definition:

python ${TFoS_HOME}/examples/model_export.py \
--model_dir mnist_model \
--export_dir mnist_export \
--signatures '{"serving_default": { "inputs": { "img": "x" }, "outputs": { "pred": "prediction" }, "method_name": "predict"}}'

If your signature definition is too unwieldy for the command line, you can also supply the JSON signature in a file:

cat >signatures.json <<EOF
{
  "serving_default": {
    "inputs": {
      "img": "x"
    },
    "outputs": {
      "pred": "prediction"
    },
    "method_name": "predict"
  }
}
EOF

python examples/model_export.py \
--model_dir mnist_model \
--export_dir mnist_export \
--signatures signatures.json

This model will have a single signature called "serving_default", with one input tensor "x" (as alias "img"), and one output tensor "prediction" (as alias "pred").

You can confirm the exported model with TensorFlow's saved_model_cli tool as follows:

saved_model_cli show --dir mnist_export --all

Launch Spark Standalone Cluster

If you shut down the cluster after training the MNIST model, start it up again for the following steps:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1 
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES})) 
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}

Convert MNIST data to TFRecords

If you haven't already done so, run the following utility to convert the MNIST binary data into TFRecord format.

cd ${TFoS_HOME}
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/tfr \
--format tfr

ls -lR examples/mnist/tfr

Inference via TFoS Scala API

Now you can run inferencing on the TFRecords on disk using the Scala Inference API:

# remove any previous output before re-running
rm -Rf ${TFoS_HOME}/predictions
time ${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
--class com.yahoo.tensorflowonspark.Inference \
target/tensorflowonspark-1.0-SNAPSHOT.jar \
--export_dir ${TFoS_HOME}/mnist_export \
--input ${TFoS_HOME}/examples/mnist/tfr/test \
--schema_hint 'struct<image:array<float>,label:array<float>>' \
--input_mapping '{"image": "x", "label": "y_"}' \
--output_mapping '{"prediction": "prediction", "Relu": "features"}' \
--output ${TFoS_HOME}/predictions \
--verbose

less ${TFoS_HOME}/predictions/*
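
If you prefer to inspect the results interactively, here is a small example for a spark-shell session (the job saves the output DataFrame as JSON, and spark is the shell's built-in SparkSession):

// load and inspect the JSON predictions
val df = spark.read.json(sys.env("TFoS_HOME") + "/predictions")
df.printSchema()
df.show(5, truncate = false)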

Arguments:

  • --export_dir: path to exported SavedModel.
  • --input: path to input TFRecords.
  • --schema_hint: uses Spark's DataFrame.schema.simpleString() format to aid type conversions from TFRecords to DataFrame columns (see the illustration after this list).
  • --input_mapping: a JSON string which maps input DataFrame columns to input Tensors.
  • --output_mapping: a JSON string which maps output Tensors to output DataFrame columns.
  • --output: path to save output DataFrame as JSON.
  • --verbose: invokes DataFrame.show() on the input DataFrame and prints the schema.
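
For reference, the --schema_hint value is just a Spark schema in DataFrame.schema.simpleString() form. Here is a small illustration of the schema used in this example (runnable in a spark-shell or any Scala program with Spark on the classpath):

import org.apache.spark.sql.types._

// the StructType corresponding to the --schema_hint above
val hint = StructType(Seq(
  StructField("image", ArrayType(FloatType)),
  StructField("label", ArrayType(FloatType))
))
assert(hint.simpleString == "struct<image:array<float>,label:array<float>>")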

Shutdown Standalone Cluster

${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh

Tensor Names, DataFrame Columns, and Signature Aliases

Finally, you should understand the difference between the various "names" that are used, since it can be confusing.

First, TFRecords have features, which in this example are the "image" and "label" data, consisting of arrays of numbers. When these features are loaded into a Spark DataFrame, the columns will be automatically named after these features. Next, the columns must be mapped to specific tensor names (typically tf.Placeholder ops) within the TensorFlow graph. And after prediction, the output tensor names must be mapped to the output DataFrame columns.

And, to add to the confusion, the TensorFlow SavedModel signatures typically define an alias (our terminology) for a tensor name. This is intended to provide an abstraction to allow the underlying models/operations to evolve over time. Note that the Scala Inference API actually bypasses these signature aliases and maps directly to the underlying tensor names. This has the advantage of allowing you to use and extract tensors that may not have been exposed in the original SavedModel signatures (e.g. the "Relu" output was not exposed in the signature), but it has the disadvantage that you lose the abstraction layer and are tied to the specific implementation of the model.
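
As an illustration of this direct-to-tensor binding, here is a fragment in the style of the earlier sketch (session and input are carried over from it; the tensor names come from the MNIST example graph):

// fetch both a signature-exposed tensor and an internal one
val results = session.runner()
  .feed("x", input)
  .fetch("prediction")   // exposed in the signature under alias "pred"
  .fetch("Relu")         // hidden-layer output, absent from the signature
  .run()
val predTensor = results.get(0)
val featTensor = results.get(1)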

So, to summarize the mappings:

TFRecord    DataFrame    Tensor    Signature
feature     column       name      alias