From babef09f428df6823c2e125565c30873a15a3c1c Mon Sep 17 00:00:00 2001 From: James Byrne Date: Fri, 24 May 2024 23:30:08 +0100 Subject: [PATCH] Dev #252: adding further amendments to stabilise MPI based training with horovod --- icenet/model/cli.py | 4 ++++ icenet/model/handlers/wandb.py | 10 ++++++++++ icenet/model/networks/base.py | 12 +++++++----- icenet/model/networks/tensorflow.py | 9 ++++++--- icenet/model/train.py | 6 +++++- 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/icenet/model/cli.py b/icenet/model/cli.py index 508c70d..b1c8edb 100644 --- a/icenet/model/cli.py +++ b/icenet/model/cli.py @@ -71,6 +71,10 @@ def add_tensorflow(self): return self def add_horovod(self): + self.add_argument("--no-horovod", + dest="horovod", + default=True, + action="store_false") self.add_argument("--device-type", default=None, help="Choose a device type to detect, if using") diff --git a/icenet/model/handlers/wandb.py b/icenet/model/handlers/wandb.py index 91648ee..e8dac92 100644 --- a/icenet/model/handlers/wandb.py +++ b/icenet/model/handlers/wandb.py @@ -11,6 +11,16 @@ def init_wandb(cli_args): if wandb_available: + if cli_args.horovod: + try: + import horovod.tensorflow.keras as hvd + except ModuleNotFoundError: + raise RuntimeError("We're running horovod jobs without the module, eh?") + + if hvd.rank() > 0: + logging.info("Not initialising wandb for rank {}".format(hvd.rank())) + return + logging.warning("Initialising WANDB for this run at user request") run = wandb.init( diff --git a/icenet/model/networks/base.py b/icenet/model/networks/base.py index 1be54b4..e1d8a28 100644 --- a/icenet/model/networks/base.py +++ b/icenet/model/networks/base.py @@ -9,8 +9,8 @@ class BaseNetwork: def __init__(self, - dataset: object, run_name: object, + dataset: object, callbacks_additional: list = None, callbacks_default: list = None, network_folder: object = None, @@ -25,10 +25,10 @@ def __init__(self, self._model_path = os.path.join( self._network_folder, 
"{}.model_{}.{}".format(run_name, - dataset.identifier, - seed)) + dataset.identifier, + seed)) - self._callbacks = list() if callbacks_default is None else callbacks_default + self._callbacks = self.get_default_callbacks() if callbacks_default is None else callbacks_default self._callbacks += callbacks_additional if callbacks_additional is not None else [] self._dataset = dataset self._run_name = run_name @@ -50,9 +50,11 @@ def add_callback(self, callback): logging.debug("Adding callback {}".format(callback)) self._callbacks.append(callback) + def get_default_callbacks(self): + return list() + @abstractmethod def train(self, - dataset: object, epochs: int, model_creator: callable, train_dataset: object, diff --git a/icenet/model/networks/tensorflow.py b/icenet/model/networks/tensorflow.py index a759079..7366391 100644 --- a/icenet/model/networks/tensorflow.py +++ b/icenet/model/networks/tensorflow.py @@ -103,10 +103,11 @@ def train(self, with open(history_path, 'w') as fh: pd.DataFrame(model_history.history).to_json(fh) - def get_callbacks(self): + def get_default_callbacks(self): callbacks_list = list() if self._checkpoint_monitor is not None: + logging.info("Adding ModelCheckpoint callback") callbacks_list.append( ModelCheckpoint(filepath=self._weights_path, monitor=self._checkpoint_monitor, @@ -115,6 +116,7 @@ def get_callbacks(self): if self._early_stopping_patience > 0: + logging.info("Adding EarlyStopping callback") callbacks_list.append( EarlyStopping(monitor=self._checkpoint_monitor, mode=self._checkpoint_mode, @@ -123,6 +125,7 @@ def get_callbacks(self): if self._lr_decay[0] > 0: + logging.info("Adding LearningRateScheduler callback") lr_decay = -0.1 * np.log(self._lr_decay[0]) callbacks_list.append( @@ -180,11 +183,11 @@ def train(self, logging.debug("Calling training loop") model_history = network.fit( - train_dataset, + train_dataset.repeat(), epochs=epochs, verbose=1 if hvd.rank() == 0 and self._verbose else 0,
callbacks=self.callbacks, - validation_data=validation_dataset, + validation_data=validation_dataset.repeat(), max_queue_size=self._data_queue_size, steps_per_epoch=self.dataset.counts["train"] // (self.dataset.batch_size * hvd.size()), ) diff --git a/icenet/model/train.py b/icenet/model/train.py index f916742..9f7c87f 100644 --- a/icenet/model/train.py +++ b/icenet/model/train.py @@ -3,7 +3,11 @@ import time import tensorflow as tf -import horovod.tensorflow.keras as hvd + +try: + import horovod.tensorflow.keras as hvd +except ModuleNotFoundError: + pass from icenet.data.dataset import IceNetDataSet, MergedIceNetDataSet from icenet.model.cli import TrainingArgParser