From 4058defda6f89946179fd1216d1b1c1b057758b7 Mon Sep 17 00:00:00 2001
From: riteshghorse
Date: Tue, 31 Oct 2023 14:03:05 -0400
Subject: [PATCH 1/7] add device param

---
 .../ml/inference/huggingface_inference.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index 3ec063808ae3..c7b483c19cb0 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -573,6 +573,7 @@ def __init__(
       task: Union[str, PipelineTask] = "",
       model: str = "",
       *,
+      device: str = 'GPU',
       inference_fn: PipelineInferenceFn = _default_pipeline_inference_fn,
       load_pipeline_args: Optional[Dict[str, Any]] = None,
       inference_args: Optional[Dict[str, Any]] = None,
@@ -605,7 +606,11 @@ def __init__(
         model_handler = HuggingFacePipelineModelHandler(
           task="text-generation", model="meta-llama/Llama-2-7b-hf",
           load_pipeline_args={'model_kwargs':{'quantization_map':config}})
-
+      device (str): the device on which you wish to run the pipeline. Defaults
+        to GPU. If GPU is not available then it falls back to CPU. You can also
+        use advanced option like `device_map` with key-value pair as you would
+        do in the usual Hugging Face pipeline using `load_pipeline_args`.
+        Ex: load_pipeline_args={'device_map':auto}).
       inference_fn: the inference function to use during RunInference.
         Default is _default_pipeline_inference_fn.
       load_pipeline_args (Dict[str, Any]): keyword arguments to provide load
@@ -627,6 +632,9 @@ def __init__(
     """
     self._task = task
     self._model = model
+    self._device = 'cuda:1'
+    if device != 'GPU' or not is_gpu_available_torch():
+      self._device = 'cpu'
     self._inference_fn = inference_fn
     self._load_pipeline_args = load_pipeline_args if load_pipeline_args else {}
     self._inference_args = inference_args if inference_args else {}
@@ -638,6 +646,12 @@ def __init__(
     if max_batch_size is not None:
       self._batching_kwargs['max_batch_size'] = max_batch_size
     self._large_model = large_model
+    if 'device' not in self._load_pipeline_args:
+      self._load_pipeline_args['device'] = self._device
+    else:
+      _LOGGER.warning(
+          '`device` specified in `load_pipeline_args`. `device` '
+          'parameter for HuggingFacePipelineModelHandler will be ignored.')
     _validate_constructor_args_hf_pipeline(self._task, self._model)

   def load_model(self):

From 02db2cae510aa30ebdacd09ea288542eeebf4a2d Mon Sep 17 00:00:00 2001
From: riteshghorse
Date: Tue, 31 Oct 2023 15:09:37 -0400
Subject: [PATCH 2/7] add spare line

---
 sdks/python/apache_beam/ml/inference/huggingface_inference.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index c7b483c19cb0..ba39999a21f6 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -606,6 +606,7 @@ def __init__(
         model_handler = HuggingFacePipelineModelHandler(
           task="text-generation", model="meta-llama/Llama-2-7b-hf",
           load_pipeline_args={'model_kwargs':{'quantization_map':config}})
+
       device (str): the device on which you wish to run the pipeline. Defaults
         to GPU. If GPU is not available then it falls back to CPU. You can also
         use advanced option like `device_map` with key-value pair as you would

From 93832ecc3541fbc064086f0ab89c764b1fd4fa2d Mon Sep 17 00:00:00 2001
From: riteshghorse
Date: Tue, 31 Oct 2023 19:07:25 -0400
Subject: [PATCH 3/7] change validations

---
 .../ml/inference/huggingface_inference.py | 29 ++++++++++++++-----
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index ba39999a21f6..5f8a02dd3ab6 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -633,9 +633,6 @@ def __init__(
     """
     self._task = task
     self._model = model
-    self._device = 'cuda:1'
-    if device != 'GPU' or not is_gpu_available_torch():
-      self._device = 'cpu'
     self._inference_fn = inference_fn
     self._load_pipeline_args = load_pipeline_args if load_pipeline_args else {}
     self._inference_args = inference_args if inference_args else {}
@@ -647,13 +644,29 @@ def __init__(
     if max_batch_size is not None:
       self._batching_kwargs['max_batch_size'] = max_batch_size
     self._large_model = large_model
+
+    # Check if the device is specified twice. If true then the device parameter
+    # of model handler is overridden.
+    self._deduplicate_device_value(device)
+    _validate_constructor_args_hf_pipeline(self._task, self._model)
+
+  def _deduplicate_device_value(self, device: str):
     if 'device' not in self._load_pipeline_args:
-      self._load_pipeline_args['device'] = self._device
+      if device == 'CPU':
+        self._load_pipeline_args['device'] = 'cpu'
+      else:
+        if is_gpu_available_torch():
+          self._load_pipeline_args['device'] = 'cuda:1'
+        else:
+          _LOGGER.warning(
+              "HuggingFaceModelHandler specified a 'GPU' device, "
+              "but GPUs are not available. Switching to CPU.")
+          self._load_pipeline_args['device'] = 'cpu'
     else:
-      _LOGGER.warning(
-          '`device` specified in `load_pipeline_args`. `device` '
-          'parameter for HuggingFacePipelineModelHandler will be ignored.')
-    _validate_constructor_args_hf_pipeline(self._task, self._model)
+      if device:
+        _LOGGER.warning(
+            '`device` specified in `load_pipeline_args`. `device` '
+            'parameter for HuggingFacePipelineModelHandler will be ignored.')

   def load_model(self):
     """Loads and initializes the pipeline for processing."""

From 668dbc3af939d2d39f39934b18a20d1d0b80347e Mon Sep 17 00:00:00 2001
From: riteshghorse
Date: Tue, 31 Oct 2023 21:56:51 -0400
Subject: [PATCH 4/7] rm note

---
 sdks/python/apache_beam/ml/inference/huggingface_inference.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index 5f8a02dd3ab6..e9719f70c5e7 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -584,10 +584,6 @@ def __init__(
     """
     Implementation of the ModelHandler interface for Hugging Face Pipelines.

-    **Note:** To specify which device to use (CPU/GPU),
-    use the load_pipeline_args with key-value as you would do in the usual
-    Hugging Face pipeline. Ex: load_pipeline_args={'device':0})
-
     Example Usage model::
       pcoll | RunInference(HuggingFacePipelineModelHandler(
         task="fill-mask"))

From 2c0d4c57f936cd80e92a4bd15e6cda9dd6d32864 Mon Sep 17 00:00:00 2001
From: riteshghorse
Date: Wed, 1 Nov 2023 11:06:22 -0400
Subject: [PATCH 5/7] valueError, fix default

---
 .../ml/inference/huggingface_inference.py | 23 ++++++++++++-------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index e9719f70c5e7..1995fd6d6f5f 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -573,7 +573,7 @@ def __init__(
       task: Union[str, PipelineTask] = "",
       model: str = "",
       *,
-      device: str = 'GPU',
+      device: Optional[str] = None,
       inference_fn: PipelineInferenceFn = _default_pipeline_inference_fn,
       load_pipeline_args: Optional[Dict[str, Any]] = None,
       inference_args: Optional[Dict[str, Any]] = None,
@@ -603,11 +603,11 @@ def __init__(
           task="text-generation", model="meta-llama/Llama-2-7b-hf",
           load_pipeline_args={'model_kwargs':{'quantization_map':config}})

-      device (str): the device on which you wish to run the pipeline. Defaults
-        to GPU. If GPU is not available then it falls back to CPU. You can also
-        use advanced option like `device_map` with key-value pair as you would
-        do in the usual Hugging Face pipeline using `load_pipeline_args`.
-        Ex: load_pipeline_args={'device_map':auto}).
+      device (str): the device (`"CPU"` or `"GPU"`) on which you wish to run
+        the pipeline. Defaults to GPU. If GPU is not available then it falls
+        back to CPU. You can also use advanced option like `device_map` with
+        key-value pair as you would do in the usual Hugging Face pipeline using
+        `load_pipeline_args`. Ex: load_pipeline_args={'device_map':auto}).
       inference_fn: the inference function to use during RunInference.
         Default is _default_pipeline_inference_fn.
       load_pipeline_args (Dict[str, Any]): keyword arguments to provide load
@@ -647,8 +647,15 @@ def __init__(
     _validate_constructor_args_hf_pipeline(self._task, self._model)

   def _deduplicate_device_value(self, device: str):
+    current_device = device.upper() if device else None
     if 'device' not in self._load_pipeline_args:
-      if device == 'CPU':
+      if (not current_device and current_device != 'CPU' and
+          current_device != 'GPU'):
+        raise ValueError(
+            f"Invalid device value: {device}. Please specify "
+            "either CPU or GPU. Defaults to GPU if no value "
+            "is provided.")
+      elif current_device == 'CPU':
         self._load_pipeline_args['device'] = 'cpu'
       else:
         if is_gpu_available_torch():
@@ -659,7 +666,7 @@ def _deduplicate_device_value(self, device: str):
               "but GPUs are not available. Switching to CPU.")
           self._load_pipeline_args['device'] = 'cpu'
     else:
-      if device:
+      if current_device:
         _LOGGER.warning(
             '`device` specified in `load_pipeline_args`. `device` '
             'parameter for HuggingFacePipelineModelHandler will be ignored.')

From 1d089faf10c68e2e1fc481791b842f589c7fbf92 Mon Sep 17 00:00:00 2001
From: riteshghorse
Date: Wed, 1 Nov 2023 12:12:22 -0400
Subject: [PATCH 6/7] valueError

---
 .../ml/inference/huggingface_inference.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index 1995fd6d6f5f..08e37e05ca26 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -648,14 +648,13 @@ def __init__(

   def _deduplicate_device_value(self, device: str):
     current_device = device.upper() if device else None
+    if (current_device and current_device != 'CPU' and current_device != 'GPU'):
+      raise ValueError(
+          f"Invalid device value: {device}. Please specify "
+          "either CPU or GPU. Defaults to GPU if no value "
+          "is provided.")
     if 'device' not in self._load_pipeline_args:
-      if (not current_device and current_device != 'CPU' and
-          current_device != 'GPU'):
-        raise ValueError(
-            f"Invalid device value: {device}. Please specify "
-            "either CPU or GPU. Defaults to GPU if no value "
-            "is provided.")
-      elif current_device == 'CPU':
+      if current_device == 'CPU':
         self._load_pipeline_args['device'] = 'cpu'
       else:
         if is_gpu_available_torch():
@@ -667,7 +666,7 @@ def _deduplicate_device_value(self, device: str):
           self._load_pipeline_args['device'] = 'cpu'
     else:
       if current_device:
-        _LOGGER.warning(
+        raise ValueError(
             '`device` specified in `load_pipeline_args`. `device` '
             'parameter for HuggingFacePipelineModelHandler will be ignored.')

From 1d3417cca7206e906d6a72574b13901a3cc259bb Mon Sep 17 00:00:00 2001
From: Danny McCormick
Date: Wed, 1 Nov 2023 12:50:15 -0400
Subject: [PATCH 7/7] Make param optional to make linter happy

---
 sdks/python/apache_beam/ml/inference/huggingface_inference.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index 08e37e05ca26..878d7bfc9cf2 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -646,7 +646,7 @@ def __init__(
     self._deduplicate_device_value(device)
     _validate_constructor_args_hf_pipeline(self._task, self._model)

-  def _deduplicate_device_value(self, device: str):
+  def _deduplicate_device_value(self, device: Optional[str]):
     current_device = device.upper() if device else None
     if (current_device and current_device != 'CPU' and current_device != 'GPU'):
       raise ValueError(
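
Reviewer sketch (not one of the patches above; illustrative only): a minimal pipeline exercising the `device` parameter as it stands after PATCH 7/7. The `fill-mask` task is taken from the handler's own docstring; the input sentence, mask token, and print-based sink are placeholder assumptions.

  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.huggingface_inference import (
      HuggingFacePipelineModelHandler)

  # Force CPU execution. Omitting `device` keeps the default behavior added in
  # this PR: prefer GPU, and log a warning and fall back to CPU if no GPU is
  # found. Any value other than 'CPU'/'GPU' raises ValueError (PATCH 6/7).
  model_handler = HuggingFacePipelineModelHandler(
      task="fill-mask", device="CPU")

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(["Paris is the <mask> of France."])  # placeholder input
        | RunInference(model_handler)
        | beam.Map(print))

Passing `device` while also setting 'device' inside `load_pipeline_args` is rejected after PATCH 6/7; finer-grained placement such as `device_map` still goes through `load_pipeline_args`, as the docstring describes.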