diff --git a/garak/buffs/paraphrase.py b/garak/buffs/paraphrase.py
index 935782aac..42d1a8a62 100644
--- a/garak/buffs/paraphrase.py
+++ b/garak/buffs/paraphrase.py
@@ -7,36 +7,39 @@
 import garak.attempt
 from garak import _config
-from garak.generators.huggingface import HFCompatible
 from garak.buffs.base import Buff
+from garak.resources.api.huggingface import HFCompatible
 
 
-class PegasusT5(Buff):
+class PegasusT5(Buff, HFCompatible):
     """Paraphrasing buff using Pegasus model"""
 
+    DEFAULT_PARAMS = Buff.DEFAULT_PARAMS | {
+        "para_model_name": "garak-llm/pegasus_paraphrase",
+        "hf_args": {
+            "device": "cpu"
+        },  # torch_dtype doesn't have standard support in Pegasus
+        "max_length": 60,
+        "temperature": 1.5,
+    }
     bcp47 = "en"
     doc_uri = "https://huggingface.co/tuner007/pegasus_paraphrase"
 
     def __init__(self, config_root=_config) -> None:
-        self.para_model_name = "garak-llm/pegasus_paraphrase"  # https://huggingface.co/tuner007/pegasus_paraphrase
-        self.max_length = 60
-        self.temperature = 1.5
         self.num_return_sequences = 6
         self.num_beams = self.num_return_sequences
-        self.torch_device = None
         self.tokenizer = None
         self.para_model = None
         super().__init__(config_root=config_root)
 
     def _load_model(self):
-        import torch
         from transformers import PegasusForConditionalGeneration, PegasusTokenizer
 
-        self.torch_device = "cuda" if torch.cuda.is_available() else "cpu"
-        self.tokenizer = PegasusTokenizer.from_pretrained(self.para_model_name)
+        self.device = self._select_hf_device()
         self.para_model = PegasusForConditionalGeneration.from_pretrained(
             self.para_model_name
-        ).to(self.torch_device)
+        ).to(self.device)
+        self.tokenizer = PegasusTokenizer.from_pretrained(self.para_model_name)
 
     def _get_response(self, input_text):
         if self.para_model is None:
@@ -48,7 +51,7 @@ def _get_response(self, input_text):
             padding="longest",
             max_length=self.max_length,
             return_tensors="pt",
-        ).to(self.torch_device)
+        ).to(self.device)
         translated = self.para_model.generate(
             **batch,
             max_length=self.max_length,
@@ -89,7 +92,6 @@ def __init__(self, config_root=_config) -> None:
         self.no_repeat_ngram_size = 2
         # self.temperature = 0.7
         self.max_length = 128
-        self.device = None
         self.tokenizer = None
         self.para_model = None
         super().__init__(config_root=config_root)
diff --git a/garak/detectors/base.py b/garak/detectors/base.py
index 703db8264..82770ba35 100644
--- a/garak/detectors/base.py
+++ b/garak/detectors/base.py
@@ -12,9 +12,10 @@
 from garak import _config
 from garak.configurable import Configurable
-from garak.generators.huggingface import HFCompatible
 import garak.attempt
 
+from garak.resources.api.huggingface import HFCompatible
+
 
 class Detector(Configurable):
     """Base class for objects that define a way of detecting a probe hit / LLM failure"""
diff --git a/garak/generators/huggingface.py b/garak/generators/huggingface.py
index 702470a6f..81c7742b4 100644
--- a/garak/generators/huggingface.py
+++ b/garak/generators/huggingface.py
@@ -14,11 +14,9 @@
 https://huggingface.co/docs/api-inference/quicktour
 """
 
-import inspect
 import logging
-import os
 import re
-from typing import Callable, List, Union
+from typing import List, Union
 import warnings
 
 import backoff
@@ -28,7 +26,7 @@
 from garak import _config
 from garak.exception import ModelNameMissingError, GarakException
 from garak.generators.base import Generator
-
+from garak.resources.api.huggingface import HFCompatible
 
 
 models_to_deprefix = ["gpt2"]
@@ -45,107 +43,6 @@ class HFInternalServerError(GarakException):
     pass
 
 
-class HFCompatible:
-    def _set_hf_context_len(self, config):
-        if hasattr(config, "n_ctx"):
-            if isinstance(config.n_ctx, int):
-                self.context_len = config.n_ctx
-
-    def _gather_hf_params(self, hf_constructor: Callable):
-        """ "Identify arguments that impact huggingface transformers resources and behavior"""
-
-        # this may be a bit too naive as it will pass any parameter valid for the hf_constructor signature
-        # this falls over when passed some `from_pretrained` methods as the callable model params are not always explicit
-        params = (
-            self.hf_args
-            if hasattr(self, "hf_args") and isinstance(self.hf_args, dict)
-            else {}
-        )
-        if params is not None and not "device" in params and hasattr(self, "device"):
-            # consider setting self.device in all cases or if self.device is not found raise error `_select_hf_device` must be called
-            params["device"] = self.device
-
-        args = {}
-
-        params_to_process = inspect.signature(hf_constructor).parameters
-
-        if "model" in params_to_process:
-            args["model"] = self.name
-            # expand for
-            params_to_process = {"do_sample": True} | params_to_process
-        else:
-            # callable is for a Pretrained class also map standard `pipeline` params
-            from transformers import pipeline
-
-            params_to_process = (
-                {"low_cpu_mem_usage": True}
-                | params_to_process
-                | inspect.signature(pipeline).parameters
-            )
-
-        for k in params_to_process:
-            if k == "model":
-                continue  # special case `model` comes from `name` in the generator
-            if k in params:
-                val = params[k]
-                if k == "torch_dtype" and hasattr(torch, val):
-                    args[k] = getattr(
-                        torch, val
-                    )  # some model type specific classes do not yet support direct string representation
-                    continue
-                if (
-                    k == "device"
-                    and "device_map" in params_to_process
-                    and "device_map" in params
-                ):
-                    # per transformers convention hold `device_map` before `device`
-                    continue
-                args[k] = params[k]
-
-        if (
-            not "device_map" in args
-            and "device_map" in params_to_process
-            and "device" in params_to_process
-            and "device" in args
-        ):
-            del args["device"]
-            args["device_map"] = self.device
-
-        return args
-
-    def _select_hf_device(self):
-        """Determine the most efficient device for tensor load, hold any existing `device` already selected"""
-        import torch.cuda
-
-        selected_device = None
-        if self.hf_args.get("device", None) is not None:
-            if isinstance(self.hf_args["device"], int):
-                # this assumes that indexed only devices selections means `cuda`
-                if self.hf_args["device"] < 0:
-                    msg = f"device {self.hf_args['device']} requested but CUDA device numbering starts at zero. Use 'device: cpu' to request CPU."
-                    logging.critical(msg)
-                    raise ValueError(msg)
-                selected_device = torch.device("cuda:" + str(self.hf_args["device"]))
-            else:
-                selected_device = torch.device(self.hf_args["device"])
-
-        if selected_device is None:
-            selected_device = torch.device(
-                "cuda"
-                if torch.cuda.is_available()
-                else "mps" if torch.backends.mps.is_available() else "cpu"
-            )
-
-        if isinstance(selected_device, torch.device) and selected_device.type == "mps":
-            os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
-            logging.debug("Enabled MPS fallback environment variable")
-
-        logging.debug(
-            "Using %s, based on torch environment evaluation", selected_device
-        )
-        return selected_device
-
-
 class Pipeline(Generator, HFCompatible):
     """Get text generations from a locally-run Hugging Face pipeline"""
diff --git a/garak/resources/api/huggingface.py b/garak/resources/api/huggingface.py
new file mode 100644
index 000000000..6af14a834
--- /dev/null
+++ b/garak/resources/api/huggingface.py
@@ -0,0 +1,114 @@
+# SPDX-FileCopyrightText: Portions Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import inspect
+import logging
+import os
+
+from typing import Callable
+
+
+class HFCompatible:
+
+    """Mixin class providing private utility methods for using Huggingface
+    transformers within garak"""
+
+    def _set_hf_context_len(self, config):
+        if hasattr(config, "n_ctx"):
+            if isinstance(config.n_ctx, int):
+                self.context_len = config.n_ctx
+
+    def _gather_hf_params(self, hf_constructor: Callable):
+        """ "Identify arguments that impact huggingface transformers resources and behavior"""
+        import torch
+
+        # this may be a bit too naive as it will pass any parameter valid for the hf_constructor signature
+        # this falls over when passed some `from_pretrained` methods as the callable model params are not always explicit
+        params = (
+            self.hf_args
+            if hasattr(self, "hf_args") and isinstance(self.hf_args, dict)
+            else {}
+        )
+        if params is not None and not "device" in params and hasattr(self, "device"):
+            # consider setting self.device in all cases or if self.device is not found raise error `_select_hf_device` must be called
+            params["device"] = self.device
+
+        args = {}
+
+        params_to_process = inspect.signature(hf_constructor).parameters
+
+        if "model" in params_to_process:
+            args["model"] = self.name
+            # expand for
+            params_to_process = {"do_sample": True} | params_to_process
+        else:
+            # callable is for a Pretrained class also map standard `pipeline` params
+            from transformers import pipeline
+
+            params_to_process = (
+                {"low_cpu_mem_usage": True}
+                | params_to_process
+                | inspect.signature(pipeline).parameters
+            )
+
+        for k in params_to_process:
+            if k == "model":
+                continue  # special case `model` comes from `name` in the generator
+            if k in params:
+                val = params[k]
+                if k == "torch_dtype" and hasattr(torch, val):
+                    args[k] = getattr(
+                        torch, val
+                    )  # some model type specific classes do not yet support direct string representation
+                    continue
+                if (
+                    k == "device"
+                    and "device_map" in params_to_process
+                    and "device_map" in params
+                ):
+                    # per transformers convention hold `device_map` before `device`
+                    continue
+                args[k] = params[k]
+
+        if (
+            not "device_map" in args
+            and "device_map" in params_to_process
+            and "device" in params_to_process
+            and "device" in args
+        ):
+            del args["device"]
+            args["device_map"] = self.device
+
+        return args
+
+    def _select_hf_device(self):
+        """Determine the most efficient device for tensor load, hold any existing `device` already selected"""
+        import torch
+
+        selected_device = None
+        if self.hf_args.get("device", None) is not None:
+            if isinstance(self.hf_args["device"], int):
+                # this assumes that indexed only devices selections means `cuda`
+                if self.hf_args["device"] < 0:
+                    msg = f"device {self.hf_args['device']} requested but CUDA device numbering starts at zero. Use 'device: cpu' to request CPU."
+                    logging.critical(msg)
+                    raise ValueError(msg)
+                selected_device = torch.device("cuda:" + str(self.hf_args["device"]))
+            else:
+                selected_device = torch.device(self.hf_args["device"])
+
+        if selected_device is None:
+            selected_device = torch.device(
+                "cuda"
+                if torch.cuda.is_available()
+                else "mps" if torch.backends.mps.is_available() else "cpu"
+            )
+
+        if isinstance(selected_device, torch.device) and selected_device.type == "mps":
+            os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
+            logging.debug("Enabled MPS fallback environment variable")
+
+        logging.debug(
+            "Using %s, based on torch environment evaluation", selected_device
+        )
+        return selected_device
diff --git a/tests/buffs/test_buffs_paraphrase.py b/tests/buffs/test_buffs_paraphrase.py
new file mode 100644
index 000000000..8eb00e521
--- /dev/null
+++ b/tests/buffs/test_buffs_paraphrase.py
@@ -0,0 +1,24 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+
+from garak import _plugins
+
+BUFFS = [
+    classname
+    for (classname, active) in _plugins.enumerate_plugins("buffs")
+    if classname.startswith("buffs.paraphrase.")
+]
+
+
+@pytest.mark.parametrize("klassname", BUFFS)
+def test_buff_results(klassname):
+    b = _plugins.load_plugin(klassname)
+    b._load_model()
+    paraphrases = b._get_response("The rain in Spain falls mainly in the plains.")
+    assert len(paraphrases) > 0, "paraphrase buffs must return paraphrases"
+    assert len(paraphrases) == len(
+        set(paraphrases)
+    ), "Paraphrases should not have dupes"
+    assert not any([i == "" for i in paraphrases]), "No paraphrase may be empty"