Commit

Internal change
PiperOrigin-RevId: 406833837
achoum authored and copybara-github committed Nov 1, 2021
1 parent 3b85dbe commit c795a1e
Showing 22 changed files with 267 additions and 130 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
@@ -1,6 +1,6 @@
# Changelog

## 0.2.0 - ????
## 0.2.0 - 2021-10-29

### Features

@@ -11,8 +11,10 @@
- Add support for permutation variable importance in the GBT learner with the
`compute_permutation_variable_importance` parameter.
- Support for tf.int8 and tf.int16 values.
- Support for distributed gradient boosted trees learning using the
ParameterServerStrategy distribution strategy.
- Support for distributed gradient boosted trees learning. Currently, the TF
ParameterServerStrategy distribution strategy is only available in
monolithic TF-DF builds. The Yggdrasil Decision Forests GRPC distribute
strategy can be used instead.
- Support for training from datasets stored on disk in CSV and RecordIO format
(instead of creating a tensorflow dataset). This option is currently more
efficient for distributed training (until the ParameterServerStrategy
6 changes: 6 additions & 0 deletions WORKSPACE
@@ -8,6 +8,11 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
# absl used by tensorflow.
http_archive(
name = "org_tensorflow",

# sha256 = "4896b49c4088030f62b98264441475c09569ea6e49cfb270e2e1f3ef0f743a2f",
# strip_prefix = "tensorflow-2.7.0-rc1",
# urls = ["https://github.com/tensorflow/tensorflow/archive/refs/tags/v2.7.0-rc1.zip"],

sha256 = "40d3203ab5f246d83bae328288a24209a2b85794f1b3e2cd0329458d8e7c1985",
strip_prefix = "tensorflow-2.6.0",
urls = ["https://github.com/tensorflow/tensorflow/archive/refs/tags/v2.6.0.zip"],
@@ -58,6 +63,7 @@ ydf_load_deps(
"absl",
"protobuf",
"zlib",
"farmhash",
],
repo_name = "@ydf",
)
1 change: 1 addition & 0 deletions configure/MANIFEST.in
@@ -5,3 +5,4 @@ recursive-include * *.so
recursive-include * *.so.[0-9]
recursive-include * *.dylib
recursive-include * *.dll
recursive-include * grpc_worker_main
4 changes: 2 additions & 2 deletions configure/setup.py
@@ -20,15 +20,15 @@
from setuptools.command.install import install
from setuptools.dist import Distribution

_VERSION = "0.1.9"
_VERSION = "0.2.0"

with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()

REQUIRED_PACKAGES = [
"numpy",
"pandas",
"tensorflow~=2.6",
"tensorflow~=2.6", # "tensorflow >= 2.7.0rc0, < 2.8'",
"six",
"absl_py",
"wheel",
58 changes: 48 additions & 10 deletions documentation/distributed_training.md
@@ -16,15 +16,19 @@

Distributed training makes it possible to train models quickly on larger
datasets. Distributed training in TF-DF relies on the TensorFlow
ParameterServerV2 distribution strategy. Only some of the TF-DF models support
distributed training.
ParameterServerV2 distribution strategy or the Yggdrasil Decision Forests GRPC
distribute strategy. Only some of the TF-DF models support distributed training.
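
For example, `tfdf.keras.DistributedGradientBoostedTreesModel` (used in the
examples below) is one of the models that supports it; a minimal sketch:

```python
import tensorflow_decision_forests as tfdf

# One of the TF-DF models that supports distributed training.
model = tfdf.keras.DistributedGradientBoostedTreesModel()
```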

See the
[distributed training](https://github.com/google/yggdrasil-decision-forests/documentation/user_manual.md?#distributed-training)
section in the Yggdrasil Decision Forests user manual for details about the
available distributed training algorithms. When using distributed training in
TF-DF, Yggdrasil Decision Forests is effectively running the `TF_DIST distribute
implementation`.
available distributed training algorithms. When using distributed training with
TF Parameter Server in TF-DF, Yggdrasil Decision Forests is effectively running
the `TF_DIST` distribute implementation.

**Note:** Currently (Oct. 2021), the shared (i.e. non-monolithic) OSS build of
TF-DF does not support the TF ParameterServer distribution strategy. Please use
the Yggdrasil DF GRPC distribute strategy instead.

## Dataset

@@ -40,21 +44,20 @@ As of today (Oct 2021), the following solutions are available for TF-DF:
solution is the fastest and the one that gives the best results as it is
currently the only one that guarantees that each example is read only once.
The downside is that this solution does not support TensorFlow
pre-processing.
pre-processing. The "Yggdrasil DF GRPC distribute strategy" only supports
this option for dataset reading.

2. To use **ParameterServerV2 distributed dataset** with dataset file sharding
using the TF-DF worker index (as sketched below). This solution is the most
natural for TF users.

Currently, using a ParameterServerV2 distributed dataset with a context or with
tf.data.service is not compatible with TF-DF.

Note that in all cases, ParameterServerV2 is used to distribute the computation.
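
For illustration, a minimal sketch of option 2, assuming the training data is
split over several CSV files. The file pattern, column layout, and parsing
logic below are hypothetical and must be adapted to the actual dataset:

```python
import tensorflow as tf

def dataset_fn(input_context: tf.distribute.InputContext):
  """Per-worker dataset: each worker reads a disjoint subset of the files."""
  # Hypothetical glob of the training files.
  files = tf.data.Dataset.list_files("/path/to/train-*.csv", shuffle=False)
  # Shard the list of files using the TF-DF worker index.
  files = files.shard(
      num_shards=input_context.num_input_pipelines,
      index=input_context.input_pipeline_id)

  def parse_line(line):
    # Placeholder parsing: split the CSV line and use the last field as label.
    fields = tf.strings.split(line, ",")
    features = {"f1": fields[0], "f2": fields[1]}
    label = tf.strings.to_number(fields[-1], out_type=tf.int64)
    return features, label

  return (files.interleave(tf.data.TextLineDataset, cycle_length=4)
          .map(parse_line)
          .batch(64))

# The function is then wrapped in a DatasetCreator and passed to `model.fit`.
dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
```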

## Examples

Following are some examples of distributed training.

### Distribution with Yggdrasil distributed dataset reading
### Distribution with Yggdrasil distributed dataset reading and TF ParameterServerV2 strategy

```python
import tensorflow_decision_forests as tfdf
@@ -78,7 +81,7 @@ See Yggdrasil Decision Forests
[supported formats](https://github.com/google/yggdrasil-decision-forests/blob/main/documentation/user_manual.md#dataset-path-and-format)
for the possible values of `dataset_format`.

### Distribution with ParameterServerV2 distributed dataset
### Distribution with ParameterServerV2 distributed dataset and TF ParameterServerV2 strategy

```python
import tensorflow_decision_forests as tfdf
@@ -149,3 +152,38 @@ model.fit(
print("Trained model")
model.summary()
```

### Distribution with Yggdrasil distributed dataset reading and Yggdrasil DF GRPC distribute strategy

```python
import tensorflow_decision_forests as tfdf
import tensorflow as tf

deployment_config = tfdf.keras.core.YggdrasilDeploymentConfig()
deployment_config.try_resume_training = True
deployment_config.distribute.implementation_key = "GRPC"
socket_addresses = deployment_config.distribute.Extensions[
    tfdf.keras.core.grpc_pb2.grpc].socket_addresses

# Socket addresses of ":grpc_worker_main" running instances.
socket_addresses.addresses.add(ip="127.0.0.1", port=2001)
socket_addresses.addresses.add(ip="127.0.0.2", port=2001)
socket_addresses.addresses.add(ip="127.0.0.3", port=2001)
socket_addresses.addresses.add(ip="127.0.0.4", port=2001)

model = tfdf.keras.DistributedGradientBoostedTreesModel(
    advanced_arguments=tfdf.keras.AdvancedArguments(
        yggdrasil_deployment_config=deployment_config))

model.fit_on_dataset_path(
    train_path="/path/to/dataset@100000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()
```

See Yggdrasil Decision Forests
[supported formats](https://github.com/google/yggdrasil-decision-forests/blob/main/documentation/user_manual.md#dataset-path-and-format)
for the possible values of `dataset_format`.
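
The `grpc_worker_main` binary must already be running at each of the socket
addresses listed in the deployment configuration. As a rough local sketch, the
workers can be started as subprocesses; the path of the worker binary below is
an assumption and depends on how TF-DF was built or installed:

```python
import subprocess

# Hypothetical path of the GRPC worker binary.
WORKER_BINARY = "tensorflow_decision_forests/keras/grpc_worker_main"

# Start one worker per socket address used in the deployment configuration.
# When all workers run on the same machine, give each one a distinct port.
workers = [
    subprocess.Popen(
        [WORKER_BINARY, "--alsologtostderr", "--port", str(port)],
        stdout=subprocess.PIPE)
    for port in [2001, 2002, 2003, 2004]
]
```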
1 change: 1 addition & 0 deletions documentation/known_issues.md
@@ -47,6 +47,7 @@ The following table shows the compatibility between

tensorflow_decision_forests | tensorflow
--------------------------- | ----------
0.2.0 | 2.6
0.1.9 | 2.6
0.1.1 - 0.1.8 | 2.5
0.1.0 | 2.4
11 changes: 11 additions & 0 deletions tensorflow_decision_forests/BUILD
@@ -31,3 +31,14 @@ config_setting(
name = "stop_training_on_interrupt",
values = {"define": "stop_training_on_interrupt=1"},
)

# If "disable_tf_ps_distribution_strategy" is true, the TF Parameter Server
# distribution strategy is not available for distributed training.
#
# Distribution with TF PS is currently NOT supported for OSS TF-DF with a shared
# build (a monolithic build works, however) and TF<2.7. In this case, the GRPC
# Worker Server can be used instead.
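#
# For example (hypothetical invocation), the TF PS distribution strategy can be
# disabled with: bazel build ... --define tf_ps_distribution_strategy=0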
config_setting(
name = "disable_tf_ps_distribution_strategy",
values = {"define": "tf_ps_distribution_strategy=0"},
)
2 changes: 1 addition & 1 deletion tensorflow_decision_forests/__init__.py
@@ -45,7 +45,7 @@
"""

__version__ = "0.1.9"
__version__ = "0.2.0"
__author__ = "Mathieu Guillame-Bert"

from tensorflow_decision_forests import keras
20 changes: 16 additions & 4 deletions tensorflow_decision_forests/keras/BUILD
@@ -75,6 +75,7 @@ py_library(
"@ydf//yggdrasil_decision_forests/dataset:data_spec_py_proto",
"@ydf//yggdrasil_decision_forests/learner:abstract_learner_py_proto",
"@ydf//yggdrasil_decision_forests/model:abstract_model_py_proto",
"@ydf//yggdrasil_decision_forests/utils/distribute/implementations/grpc:grpc_py_proto",
],
)

@@ -112,13 +113,15 @@

# This test relies on TF-DF's support for the TF PS distribution strategy.
# Note: The TF PS distribution strategy and TF-DF are currently not compatible in non-monolithic builds of TensorFlow+TFDF (e.g. OSS TFDF).
#
# This test is expected to fail if TF PS distributed training is disabled (i.e.
# when the ":disable_tf_ps_distribution_strategy" setting is enabled).
py_test(
name = "keras_distributed_test",
size = "large",
srcs = ["keras_distributed_test.py"],
data = [
":synthetic_dataset",
":test_runner",
":grpc_worker_main",
"@ydf//yggdrasil_decision_forests/test_data",
],
python_version = "PY3",
@@ -132,10 +135,10 @@
# absl/testing:parameterized dep,
# numpy dep,
# pandas dep,
"//third_party/py/portpicker",
# portpicker dep,
"@org_tensorflow//tensorflow/python",
"@org_tensorflow//tensorflow/python/distribute:distribute_lib",
"//third_party/tensorflow_decision_forests",
"//tensorflow_decision_forests",
],
)

@@ -164,3 +167,12 @@
"@ydf//yggdrasil_decision_forests/cli/utils:synthetic_dataset_lib_with_main",
],
)

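# Standalone GRPC worker binary used for the Yggdrasil DF GRPC distribute
# strategy. It is started by "keras_distributed_test" and shipped in the pip
# package (see configure/MANIFEST.in).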
tf_cc_binary(
name = "grpc_worker_main",
deps = [
"@org_tensorflow//tensorflow/core:framework",
"@org_tensorflow//tensorflow/core:lib",
"@ydf//yggdrasil_decision_forests/utils/distribute/implementations/grpc:grpc_worker_lib_with_main",
],
)
1 change: 1 addition & 0 deletions tensorflow_decision_forests/keras/core.py
@@ -63,6 +63,7 @@
from yggdrasil_decision_forests.dataset import data_spec_pb2
from yggdrasil_decision_forests.learner import abstract_learner_pb2
from yggdrasil_decision_forests.model import abstract_model_pb2 # pylint: disable=unused-import
from yggdrasil_decision_forests.utils.distribute.implementations.grpc import grpc_pb2 # pylint: disable=unused-import

layers = tf.keras.layers
models = tf.keras.models
82 changes: 75 additions & 7 deletions tensorflow_decision_forests/keras/keras_distributed_test.py
@@ -17,7 +17,8 @@
from __future__ import print_function

import os
from typing import List
import subprocess
from typing import List, Tuple

from absl import flags
from absl import logging
@@ -44,7 +45,7 @@ def tmp_path() -> str:
return flags.FLAGS.test_tmpdir


def _create_in_process_cluster(num_workers, num_ps):
def _create_in_process_tf_ps_cluster(num_workers, num_ps):
"""Create a cluster of TF workers and returns their addresses.
Such a cluster simulates the behavior of multiple TF parameter servers.
@@ -85,6 +86,32 @@ def _create_in_process_cluster(num_workers, num_ps):
cluster_spec, rpc_layer="grpc")


def _create_in_process_grpc_worker_cluster(
    num_workers) -> List[Tuple[str, int]]:
  """Creates a cluster of GRPC workers and returns their addresses.

  Args:
    num_workers: Number of workers.

  Returns:
    List of socket addresses.
  """

  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  worker_ip = "localhost"
  worker_addresses = []

  for i in range(num_workers):
    worker_addresses.append((worker_ip, worker_ports[i]))
    # Start a GRPC worker process listening on the selected port.
    args = [
        "tensorflow_decision_forests/keras/grpc_worker_main",
        "--alsologtostderr", "--port",
        str(worker_ports[i])
    ]
    subprocess.Popen(args, stdout=subprocess.PIPE)
  return worker_addresses


class TFDFDistributedTest(parameterized.TestCase, tf.test.TestCase):

def test_distributed_training_synthetic(self):
@@ -124,7 +151,7 @@ def dataset_fn(context: distribute_lib.InputContext, seed: int):
return dataset

# Create the workers
cluster_resolver = _create_in_process_cluster(num_workers=4, num_ps=1)
cluster_resolver = _create_in_process_tf_ps_cluster(num_workers=4, num_ps=1)

# Configure the model and datasets
strategy = tf.distribute.experimental.ParameterServerStrategy(
@@ -191,7 +218,7 @@ def dataset_fn(input_context):

dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

cluster_resolver = _create_in_process_cluster(num_workers=2, num_ps=1)
cluster_resolver = _create_in_process_tf_ps_cluster(num_workers=2, num_ps=1)

strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver)
@@ -288,7 +315,7 @@ def extract_label(*columns):
return ds_dataset

# Create the workers
cluster_resolver = _create_in_process_cluster(num_workers=5, num_ps=1)
cluster_resolver = _create_in_process_tf_ps_cluster(num_workers=5, num_ps=1)

# Configure the model and datasets
strategy = tf.distribute.experimental.ParameterServerStrategy(
@@ -335,7 +362,7 @@ def extract_label(*columns):
# at different speeds, some examples can be repeated.
self.assertAlmostEqual(evaluation["accuracy"], 0.8603476, delta=0.02)

def test_distributed_training_adult_from_disk(self):
def test_distributed_training_adult_from_file(self):
# Path to dataset.
dataset_directory = os.path.join(test_data_path(), "dataset")
train_path = os.path.join(dataset_directory, "adult_train.csv")
@@ -344,7 +371,7 @@ def test_distributed_training_adult_from_disk(self):
label = "income"

# Create the workers
cluster_resolver = _create_in_process_cluster(num_workers=5, num_ps=1)
cluster_resolver = _create_in_process_tf_ps_cluster(num_workers=5, num_ps=1)

# Configure the model and datasets
strategy = tf.distribute.experimental.ParameterServerStrategy(
@@ -378,6 +405,47 @@ def test_distributed_training_adult_from_file(self):
"capital_gain", "capital_loss", "hours_per_week", "native_country"
])

def test_distributed_training_adult_from_file_with_grpc_worker(self):
  # Path to dataset.
  dataset_directory = os.path.join(test_data_path(), "dataset")
  train_path = os.path.join(dataset_directory, "adult_train.csv")
  test_path = os.path.join(dataset_directory, "adult_test.csv")

  label = "income"

  # Create GRPC Yggdrasil DF workers.
  worker_addresses = _create_in_process_grpc_worker_cluster(5)

  # Specify the socket addresses of the workers to the manager.
  deployment_config = tfdf.keras.core.YggdrasilDeploymentConfig()
  deployment_config.try_resume_training = True
  deployment_config.distribute.implementation_key = "GRPC"
  socket_addresses = deployment_config.distribute.Extensions[
      tfdf.keras.core.grpc_pb2.grpc].socket_addresses
  for worker_ip, worker_port in worker_addresses:
    socket_addresses.addresses.add(ip=worker_ip, port=worker_port)

  model = tfdf.keras.DistributedGradientBoostedTreesModel(
      advanced_arguments=tfdf.keras.AdvancedArguments(
          yggdrasil_deployment_config=deployment_config))
  model.compile(metrics=["accuracy"])

  training_history = model.fit_on_dataset_path(
      train_path=train_path,
      label_key=label,
      dataset_format="csv",
      valid_path=test_path)
  logging.info("Training history: %s", training_history.history)

  logging.info("Trained model:")
  model.summary()

  test_df = pd.read_csv(test_path)
  tf_test = tfdf.keras.pd_dataframe_to_tf_dataset(test_df, label)
  evaluation = model.evaluate(tf_test, return_dict=True)
  logging.info("Evaluation: %s", evaluation)
  self.assertAlmostEqual(evaluation["accuracy"], 0.8703476, delta=0.01)

def test_in_memory_not_supported(self):

dataframe = pd.DataFrame({