Scala Inference API
For users who have existing Spark pipelines written in Scala, we now provide a Scala API for inferencing from previously-trained TensorFlow SavedModels. This API leverages the TensorFlow Java API and follows the Spark ML Pipeline style, including support for loading TFRecords into a DataFrame and saving DataFrames as TFRecords.
Notes:
- Requires TensorFlow 1.4 or later.
- This API is for inferencing only. TensorFlow recommends using their Python API for model training.
- TensorFlow's Java API is currently marked as "experimental" and is not covered by API stability guarantees, which may impact this API over time.
- TensorFlow models must be exported as a SavedModel.
- Generally, the TensorFlow Java API version in the pom.xml should be equal to or greater than the TensorFlow Python version that generated the SavedModel.
- Due to Spark's lazy-evaluation model, this API currently only supports TensorFlow models which fit in the memory of your executors, i.e. each executor runs inferencing in a single, standalone instance of TensorFlow. In other words, this API is "data-parallel", but not "model-parallel". If you need "model-parallel" inferencing, you can use the TFoS Python API. (See the sketch just below this list for an illustration of the "data-parallel" pattern.)
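For illustration, here is a minimal sketch of that "data-parallel" pattern written against the plain TensorFlow Java API: each Spark partition loads its own standalone copy of the SavedModel and feeds it rows locally. This is a hypothetical sketch, not the actual Inference implementation; the column name, tensor names, and shapes are assumptions based on the MNIST example later on this page.

```scala
import org.apache.spark.sql.DataFrame
import org.tensorflow.{SavedModelBundle, Tensor}

// Hypothetical sketch of "data-parallel" inference: every partition loads its
// own standalone copy of the SavedModel (so the whole model must fit in
// executor memory) and feeds it rows locally.
def infer(df: DataFrame, exportDir: String): Array[Long] = {
  df.rdd.mapPartitions { rows =>
    val bundle = SavedModelBundle.load(exportDir, "serve")
    val session = bundle.session()
    rows.map { row =>
      val image = row.getSeq[Float](row.fieldIndex("image")).toArray
      val input = Tensor.create(Array(image))  // float tensor of shape [1, 784]
      val result = session.runner()
        .feed("inputs/x", input)               // input tensor name, per the MNIST example below
        .fetch("prediction")                   // output tensor name, per the MNIST example below
        .run().get(0)
      // assumes "prediction" is an int64 tensor of shape [1], e.g. an argmax;
      // adjust the dtype/shape to match your exported graph
      val pred = Array.ofDim[Long](1)
      result.copyTo(pred)
      input.close(); result.close()
      pred(0)
    }
  }.collect()
}
```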
First, clone the TensorFlow Ecosystem repository and follow the build instructions for the Hadoop MapReduce InputFormat/OutputFormat for TFRecords. We use this jar as a dependency, but since it is not currently available in public Maven repositories, you must mvn install the package into your local Maven repository so it is available as a dependency:
cd <path_to_ecosystem>
export ECOSYSTEM_HOME=$(pwd)
# build the TFRecord InputFormat/OutputFormat jar and install it into your local Maven repo
cd hadoop && mvn clean install
Next, clone the TFoS repository and build the Scala API:
git clone https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)
mvn package
# check for the packaged jar file
ls target/tensorflowonspark-*.jar
Follow the Spark Standalone instructions to train an MNIST model on a local Spark Standalone cluster. When done, you should have an mnist_model checkpoint on disk.
Next, convert the TensorFlow Checkpoint to a SavedModel. Generally, this would be done by the authors of the algorithm that produced the model checkpoint, but in cases where a SavedModel is not available, we provide a sample utility to do this conversion.
Note: you should have some knowledge of the available input and output tensors of the TensorFlow graph, but if not, you can view all operations of the graph as follows:
python ${TFoS_HOME}/examples/model_export.py --model_dir mnist_model --show
Then, export the SavedModel with a JSON-formatted signature definition:
python ${TFoS_HOME}/examples/model_export.py \
--model_dir mnist_model \
--export_dir mnist_export \
--signatures '{"serving_default": { "inputs": { "img": "inputs/x" }, "outputs": { "pred": "prediction" }, "method_name": "predict"}}'
If your signature definition is too unwieldy for the command line, you can also supply the JSON signature in a file:
cat >signatures.json <<EOF
{
  "serving_default": {
    "inputs": {
      "img": "inputs/x"
    },
    "outputs": {
      "pred": "prediction"
    },
    "method_name": "predict"
  }
}
EOF
python examples/model_export.py \
--model_dir mnist_model \
--export_dir mnist_export \
--signatures signatures.json
This model will have a single signature called "serving_default", with one input tensor "inputs/x" (under the alias "img") and one output tensor "prediction" (under the alias "pred").
You can verify the exported model using TensorFlow's saved_model_cli tool as follows:
saved_model_cli show --dir mnist_export --all
If you had shut down the cluster after training the MNIST model, start it up again:
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
If you haven't already done so, run the following utility to convert the MNIST binary data into TFRecord format.
cd ${TFoS_HOME}
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--jars ${ECOSYSTEM_HOME}/hadoop/target/tensorflow-hadoop-1.0-SNAPSHOT.jar \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output ${TFoS_HOME}/examples/mnist/tfr \
--format tfr
ls -lR examples/mnist/tfr
Now, you can run inferencing on the TFRecords on disk, using the Scala Inference API, as follows:
# rm -Rf ${TFoS_HOME}/predictions; \
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
--class com.yahoo.tensorflowonspark.Inference \
target/tensorflowonspark-1.0-SNAPSHOT.jar \
--export_dir ${TFoS_HOME}/mnist_export \
--input ${TFoS_HOME}/mnist/tfr/test \
--schema_hint 'struct<image:array<float>,label:array<float>>' \
--input_mapping '{"image": "inputs/x", "label": "inputs/y_"}' \
--output_mapping '{"prediction": "prediction", "layer/hidden_layer/Relu": "features"}' \
--output ${TFoS_HOME}/predictions \
--verbose
less ${TFoS_HOME}/predictions/part-*
Arguments:
- --export_dir: path to the exported SavedModel.
- --input: path to the input TFRecords.
- --schema_hint: uses Spark's DataFrame.schema.simpleString() format to aid type conversions from TFRecords to DataFrame columns.
- --input_mapping: a JSON string which maps input DataFrame columns to input Tensors.
- --output_mapping: a JSON string which maps output Tensors to output DataFrame columns.
- --output: path to save the output DataFrame as JSON.
- --verbose: invokes DataFrame.show() on the input DataFrame and prints its schema.
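Since --output saves the output DataFrame as JSON, you can load the predictions back into Spark for further processing. A minimal sketch (the expected column names in the comments follow from the --output_mapping above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("inspect-predictions").getOrCreate()

// Load the predictions written by the Inference job above.
val preds = spark.read.json(sys.env("TFoS_HOME") + "/predictions")
preds.printSchema()  // expect "prediction" and "features" columns, per --output_mapping
preds.show(5)

// Incidentally, DataFrame.schema.simpleString produces exactly the format
// that --schema_hint expects.
println(preds.schema.simpleString)
```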
When you are done, you can shut down the Spark Standalone cluster:
${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh
Finally, you should understand the difference between the various "names" that are used, since it can be confusing.
First, TFRecords have features, which in this example are the "image" and "label" data, consisting of arrays of numbers. When these features are loaded into a Spark DataFrame, the columns will be automatically named after these features. Next, the columns must be mapped to specific tensor names (typically tf.Placeholder ops) within the TensorFlow graph. And after prediction, the output tensor names must be mapped to the output DataFrame columns.
And, to add to the confusion, the TensorFlow SavedModel signatures typically define an alias (our terminology) for a tensor name. This is intended to provide an abstraction to allow the underlying models/operations to evolve over time. Note that the Scala Inference API actually bypasses these signature aliases and maps directly to the underlying tensor names. This has the advantage of allowing you to use and extract tensors that may not have been exposed in the original SavedModel signatures (e.g. the "Relu" output was not exposed in the signature), but it has the disadvantage that you lose the abstraction layer and are tied to the specific implementation of the model.
So, to summarize the mappings:
| TFRecord | DataFrame | Tensor | Signature |
|---|---|---|---|
| feature | column | name | alias |

For example, in the MNIST run above, the TFRecord feature "image" becomes the DataFrame column "image", which --input_mapping binds to the tensor "inputs/x", which the SavedModel signature exposes under the alias "img".