From 1af0e2f1936742a9e9a769ee9e0ff26385f56d07 Mon Sep 17 00:00:00 2001
From: Allen Liu
Date: Mon, 13 Mar 2023 10:55:19 -0700
Subject: [PATCH] fix: smp will not be imported if not specified by user (#651)

* fix: smp will not be imported if not specified by user

* fix: add pipeline_parallel_degree for smp after v1.60

* remove tf related model parallel vars

* version bump
---
 smdebug/_version.py             |  2 +-
 smdebug/core/utils.py           | 18 ++++++++++++------
 smdebug/tensorflow/base_hook.py | 17 -----------------
 3 files changed, 13 insertions(+), 24 deletions(-)

diff --git a/smdebug/_version.py b/smdebug/_version.py
index 7b64613a0..976a991dc 100644
--- a/smdebug/_version.py
+++ b/smdebug/_version.py
@@ -1 +1 @@
-__version__ = "1.0.29"
+__version__ = "1.0.30"
diff --git a/smdebug/core/utils.py b/smdebug/core/utils.py
index 625b1c354..5b18c7e40 100644
--- a/smdebug/core/utils.py
+++ b/smdebug/core/utils.py
@@ -35,6 +35,7 @@
     SMDebugRuntimeError,
     SMDebugTypeError,
     SMDebugValueError,
+    SMDebugError
 )
 
 
@@ -49,18 +50,18 @@ class FRAMEWORK(Enum):
 _smddp_tf_imported = None
 _smddp_pt_imported = None
 _is_using_smmodelparallel = None
+_smp_imported = None
 
-try:
-    import smdistributed.modelparallel.tensorflow as smp
-
-    _smp_imported = smp
-except (ImportError, ModuleNotFoundError):
+if check_smmodelparallel_training():
     try:
         import smdistributed.modelparallel.torch as smp
 
         _smp_imported = smp
     except (ImportError, ModuleNotFoundError):
         _smp_imported = None
+    except Exception as e:
+        raise SMDebugError(e)
 
 
 try:
@@ -644,8 +645,13 @@ def check_smmodelparallel_training():
     else:
         try:
             smp_flag = json.loads(os.getenv("SM_HPS"))
-            if "mp_parameters" in smp_flag and "partitions" in smp_flag["mp_parameters"]:
-                _is_using_smmodelparallel = True
+            if "mp_parameters" in smp_flag:
+                if "pipeline_parallel_degree" in smp_flag["mp_parameters"]:
+                    _is_using_smmodelparallel = True
+                elif "partitions" in smp_flag["mp_parameters"]:
+                    _is_using_smmodelparallel = True
+                else:
+                    _is_using_smmodelparallel = False
             else:
                 _is_using_smmodelparallel = False
         except:
diff --git a/smdebug/tensorflow/base_hook.py b/smdebug/tensorflow/base_hook.py
index 72d6f31b7..ded8fd972 100644
--- a/smdebug/tensorflow/base_hook.py
+++ b/smdebug/tensorflow/base_hook.py
@@ -40,13 +40,6 @@
     load_tf_config_json,
 )
 
-try:
-    import smdistributed.modelparallel.tensorflow as smp  # noqa isort:skip
-
-    _smp_imported = smp
-except ImportError:
-    _smp_imported = None
-
 
 DEFAULT_INCLUDE_COLLECTIONS = [
     CollectionKeys.METRICS,
@@ -195,11 +188,6 @@ def _get_worker_name(self) -> str:
         """
         self._assert_distribution_strategy()
         if self.distribution_strategy == TFDistributionStrategy.HOROVOD:
-            if _smp_imported and _smp_imported.core.initialized:
-                # when model parallel is being used, there will be multiple processes
-                # with same hvd rank, hence use smp.rank
-                return f"worker_{smp.rank()}"
-
             import horovod.tensorflow as hvd
 
             return f"worker_{hvd.rank()}"
@@ -277,11 +265,6 @@ def _get_custom_and_default_collections(self) -> Tuple[Set["Collection"], Set["C
     def _get_num_workers(self):
         self._assert_distribution_strategy()
         if self.distribution_strategy == TFDistributionStrategy.HOROVOD:
-            if _smp_imported and smp.core.initialized:
-                # when model parallel is being used, there will be multiple hvd process groups,
-                # hence use smp.size
-                return smp.size()
-
             import horovod.tensorflow as hvd
 
             return hvd.size()
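
Reviewer note (not part of the patch): the detection logic in check_smmodelparallel_training()
reads the SM_HPS hyperparameter JSON that SageMaker injects into the training job. Per the
commit message, jobs using smdistributed.modelparallel after v1.60 carry
"pipeline_parallel_degree" under "mp_parameters", while older configurations carry
"partitions"; either key enables the smp import. The snippet below is a minimal,
self-contained sketch of that check for illustration only; the helper name is_smp_job and
the sample SM_HPS payloads are hypothetical and not part of smdebug.

    import json
    import os

    def is_smp_job() -> bool:
        """Sketch of the patched check: detect SageMaker model parallel from SM_HPS."""
        raw = os.getenv("SM_HPS")
        if raw is None:
            return False
        try:
            hps = json.loads(raw)
        except (TypeError, ValueError):
            return False
        mp = hps.get("mp_parameters", {})
        # Newer SMP configurations (after v1.60) use pipeline_parallel_degree;
        # older configurations use partitions.
        return "pipeline_parallel_degree" in mp or "partitions" in mp

    # Hypothetical SM_HPS payloads, for illustration only:
    os.environ["SM_HPS"] = json.dumps({"mp_parameters": {"pipeline_parallel_degree": 2}})
    assert is_smp_job()

    os.environ["SM_HPS"] = json.dumps({"mp_parameters": {"partitions": 2}})
    assert is_smp_job()

    os.environ["SM_HPS"] = json.dumps({"epochs": 10})  # no model parallelism requested
    assert not is_smp_job()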