From 5f953ceb079bb90d5c7377be6ee62338b99b133a Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 6 Jun 2024 13:33:06 -0400 Subject: [PATCH 1/3] Implement Hugging Face Image Embedding MLTransform --- .../ml/transforms/embeddings/huggingface.py | 40 +++++++++++++ .../transforms/embeddings/huggingface_test.py | 56 +++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py index 7fcaa9c9a5df..9c9d7cf29178 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py @@ -32,6 +32,7 @@ from apache_beam.ml.inference.base import ModelHandler from apache_beam.ml.inference.base import RunInference from apache_beam.ml.transforms.base import EmbeddingsManager +from apache_beam.ml.transforms.base import _ImageEmbeddingHandler from apache_beam.ml.transforms.base import _TextEmbeddingHandler try: @@ -153,6 +154,45 @@ def get_ptransform_for_processing(self, **kwargs) -> beam.PTransform: )) +class SentenceTransformerImageEmbeddings(EmbeddingsManager): + def __init__(self, model_name: str, columns: List[str], **kwargs): + """ + Embedding config for sentence-transformers. This config can be used with + MLTransform to embed image data. Models are loaded using the RunInference + PTransform with the help of ModelHandler. + + Args: + model_name: Name of the model to use. The model should be hosted on + HuggingFace Hub or compatible with sentence_transformers. See + https://www.sbert.net/docs/sentence_transformer/pretrained_models.html#image-text-models # pylint: disable=line-too-long + for a list of sentence_transformers models. + columns: List of columns to be embedded. + min_batch_size: The minimum batch size to be used for inference. + max_batch_size: The maximum batch size to be used for inference. + large_model: Whether to share the model across processes. + """ + super().__init__(columns, **kwargs) + self.model_name = model_name + + def get_model_handler(self): + return _SentenceTransformerModelHandler( + model_class=SentenceTransformer, + model_name=self.model_name, + load_model_args=self.load_model_args, + min_batch_size=self.min_batch_size, + max_batch_size=self.max_batch_size, + large_model=self.large_model) + + def get_ptransform_for_processing(self, **kwargs) -> beam.PTransform: + # wrap the model handler in a _TextEmbeddingHandler since + # the SentenceTransformerEmbeddings works on text input data. 
+ return ( + RunInference( + model_handler=_ImageEmbeddingHandler(self), + inference_args=self.inference_args, + )) + + class _InferenceAPIHandler(ModelHandler): def __init__(self, config: 'InferenceAPIEmbeddings'): super().__init__() diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py index f94e747c5edd..a0fa072724c2 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py @@ -34,7 +34,9 @@ # pylint: disable=ungrouped-imports try: from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerEmbeddings + from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerImageEmbeddings from apache_beam.ml.transforms.embeddings.huggingface import InferenceAPIEmbeddings + from PIL import Image import torch except ImportError: SentenceTransformerEmbeddings = None # type: ignore @@ -278,6 +280,60 @@ def test_mltransform_to_ptransform_with_sentence_transformer(self): ptransform_list[i]._model_handler._underlying.model_name, model_name) +@pytest.mark.no_xdist +@unittest.skipIf( + SentenceTransformerEmbeddings is None, + 'sentence-transformers is not installed.') +class SentenceTransformerImageEmbeddingsTest(unittest.TestCase): + def setUp(self) -> None: + self.artifact_location = tempfile.mkdtemp(prefix='sentence_transformers_') + # this bucket has TTL and will be deleted periodically + self.gcs_artifact_location = os.path.join( + 'gs://temp-storage-for-perf-tests/sentence_transformers', + uuid.uuid4().hex) + self.model_name = "clip-ViT-B-32" + + def tearDown(self) -> None: + shutil.rmtree(self.artifact_location) + + def generateRandomImage(self, size: int) -> Image.Image: + imarray = np.random.rand(size, size, 3) * 255 + return Image.fromarray(imarray.astype('uint8')).convert('RGBA') + + def test_sentence_transformer_image_embeddings(self): + embedding_config = SentenceTransformerImageEmbeddings( + model_name=self.model_name, columns=[test_query_column]) + img = self.generateRandomImage(256) + with beam.Pipeline() as pipeline: + result_pcoll = ( + pipeline + | "CreateData" >> beam.Create([{ + test_query_column: img + }]) + | "MLTransform" >> MLTransform( + write_artifact_location=self.artifact_location).with_transform( + embedding_config)) + + def assert_element(element): + assert len(element[test_query_column]) == 512 + + _ = (result_pcoll | beam.Map(assert_element)) + + def test_sentence_transformer_images_with_str_data_types(self): + embedding_config = SentenceTransformerImageEmbeddings( + model_name=self.model_name, columns=[test_query_column]) + with self.assertRaises(TypeError): + with beam.Pipeline() as pipeline: + _ = ( + pipeline + | "CreateData" >> beam.Create([{ + test_query_column: "image.jpg" + }]) + | "MLTransform" >> MLTransform( + write_artifact_location=self.artifact_location).with_transform( + embedding_config)) + + @unittest.skipIf(_HF_TOKEN is None, 'HF_TOKEN environment variable not set.') class HuggingfaceInferenceAPITest(unittest.TestCase): def setUp(self): From e6d67194b9cea49d3a6f122cc0f9206c145745e3 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 6 Jun 2024 13:57:14 -0400 Subject: [PATCH 2/3] correct imports --- .../ml/transforms/embeddings/huggingface_test.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py 
b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py index a0fa072724c2..78b00333721f 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py @@ -48,6 +48,12 @@ except ImportError: tft = None +# pylint: disable=ungrouped-imports +try: + from PIL import Image +except ImportError: + Image = None + _HF_TOKEN = os.environ.get('HF_INFERENCE_TOKEN') test_query = "This is a test" test_query_column = "feature_1" @@ -284,6 +290,7 @@ def test_mltransform_to_ptransform_with_sentence_transformer(self): @unittest.skipIf( SentenceTransformerEmbeddings is None, 'sentence-transformers is not installed.') +@unittest.skipIf(Image is None, 'Pillow is not installed.') class SentenceTransformerImageEmbeddingsTest(unittest.TestCase): def setUp(self) -> None: self.artifact_location = tempfile.mkdtemp(prefix='sentence_transformers_') @@ -296,7 +303,7 @@ def setUp(self) -> None: def tearDown(self) -> None: shutil.rmtree(self.artifact_location) - def generateRandomImage(self, size: int) -> Image.Image: + def generateRandomImage(self, size: int): imarray = np.random.rand(size, size, 3) * 255 return Image.fromarray(imarray.astype('uint8')).convert('RGBA') From 40fa9dfda5b55c8d0af75def888080b9d1feeddb Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 7 Jun 2024 09:58:09 -0400 Subject: [PATCH 3/3] Simplify to original sentence transformer class --- .../ml/transforms/embeddings/huggingface.py | 57 +++++-------------- .../transforms/embeddings/huggingface_test.py | 35 ++++-------- 2 files changed, 26 insertions(+), 66 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py index 9c9d7cf29178..46b4ef9cf7d6 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface.py @@ -115,6 +115,7 @@ def __init__( model_name: str, columns: List[str], max_seq_length: Optional[int] = None, + image_model: bool = False, **kwargs): """ Embedding config for sentence-transformers. This config can be used with @@ -123,9 +124,13 @@ def __init__( Args: model_name: Name of the model to use. The model should be hosted on - HuggingFace Hub or compatible with sentence_transformers. + HuggingFace Hub or compatible with sentence_transformers. For image + embedding models, see + https://www.sbert.net/docs/sentence_transformer/pretrained_models.html#image-text-models # pylint: disable=line-too-long + for a list of available sentence_transformers models. columns: List of columns to be embedded. max_seq_length: Max sequence length to use for the model if applicable. + image_model: Whether the model is generating image embeddings. min_batch_size: The minimum batch size to be used for inference. max_batch_size: The maximum batch size to be used for inference. large_model: Whether to share the model across processes. @@ -133,6 +138,7 @@ def __init__( super().__init__(columns, **kwargs) self.model_name = model_name self.max_seq_length = max_seq_length + self.image_model = image_model def get_model_handler(self): return _SentenceTransformerModelHandler( @@ -145,8 +151,14 @@ def get_model_handler(self): large_model=self.large_model) def get_ptransform_for_processing(self, **kwargs) -> beam.PTransform: - # wrap the model handler in a _TextEmbeddingHandler since - # the SentenceTransformerEmbeddings works on text input data. 
+ # wrap the model handler in an appropriate embedding handler to provide + # some type checking. + if self.image_model: + return ( + RunInference( + model_handler=_ImageEmbeddingHandler(self), + inference_args=self.inference_args, + )) return ( RunInference( model_handler=_TextEmbeddingHandler(self), @@ -154,45 +166,6 @@ def get_ptransform_for_processing(self, **kwargs) -> beam.PTransform: )) -class SentenceTransformerImageEmbeddings(EmbeddingsManager): - def __init__(self, model_name: str, columns: List[str], **kwargs): - """ - Embedding config for sentence-transformers. This config can be used with - MLTransform to embed image data. Models are loaded using the RunInference - PTransform with the help of ModelHandler. - - Args: - model_name: Name of the model to use. The model should be hosted on - HuggingFace Hub or compatible with sentence_transformers. See - https://www.sbert.net/docs/sentence_transformer/pretrained_models.html#image-text-models # pylint: disable=line-too-long - for a list of sentence_transformers models. - columns: List of columns to be embedded. - min_batch_size: The minimum batch size to be used for inference. - max_batch_size: The maximum batch size to be used for inference. - large_model: Whether to share the model across processes. - """ - super().__init__(columns, **kwargs) - self.model_name = model_name - - def get_model_handler(self): - return _SentenceTransformerModelHandler( - model_class=SentenceTransformer, - model_name=self.model_name, - load_model_args=self.load_model_args, - min_batch_size=self.min_batch_size, - max_batch_size=self.max_batch_size, - large_model=self.large_model) - - def get_ptransform_for_processing(self, **kwargs) -> beam.PTransform: - # wrap the model handler in a _TextEmbeddingHandler since - # the SentenceTransformerEmbeddings works on text input data. 
- return ( - RunInference( - model_handler=_ImageEmbeddingHandler(self), - inference_args=self.inference_args, - )) - - class _InferenceAPIHandler(ModelHandler): def __init__(self, config: 'InferenceAPIEmbeddings'): super().__init__() diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py index 78b00333721f..d09a573b6766 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/huggingface_test.py @@ -34,7 +34,6 @@ # pylint: disable=ungrouped-imports try: from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerEmbeddings - from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerImageEmbeddings from apache_beam.ml.transforms.embeddings.huggingface import InferenceAPIEmbeddings from PIL import Image import torch @@ -58,6 +57,7 @@ test_query = "This is a test" test_query_column = "feature_1" DEFAULT_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2" +IMAGE_MODEL_NAME = "clip-ViT-B-32" _parameterized_inputs = [ ([{ test_query_column: 'That is a happy person' @@ -93,7 +93,7 @@ @unittest.skipIf( SentenceTransformerEmbeddings is None, 'sentence-transformers is not installed.') -class SentenceTrasformerEmbeddingsTest(unittest.TestCase): +class SentenceTransformerEmbeddingsTest(unittest.TestCase): def setUp(self) -> None: self.artifact_location = tempfile.mkdtemp(prefix='sentence_transformers_') # this bucket has TTL and will be deleted periodically @@ -285,31 +285,16 @@ def test_mltransform_to_ptransform_with_sentence_transformer(self): self.assertEqual( ptransform_list[i]._model_handler._underlying.model_name, model_name) - -@pytest.mark.no_xdist -@unittest.skipIf( - SentenceTransformerEmbeddings is None, - 'sentence-transformers is not installed.') -@unittest.skipIf(Image is None, 'Pillow is not installed.') -class SentenceTransformerImageEmbeddingsTest(unittest.TestCase): - def setUp(self) -> None: - self.artifact_location = tempfile.mkdtemp(prefix='sentence_transformers_') - # this bucket has TTL and will be deleted periodically - self.gcs_artifact_location = os.path.join( - 'gs://temp-storage-for-perf-tests/sentence_transformers', - uuid.uuid4().hex) - self.model_name = "clip-ViT-B-32" - - def tearDown(self) -> None: - shutil.rmtree(self.artifact_location) - def generateRandomImage(self, size: int): imarray = np.random.rand(size, size, 3) * 255 return Image.fromarray(imarray.astype('uint8')).convert('RGBA') + @unittest.skipIf(Image is None, 'Pillow is not installed.') def test_sentence_transformer_image_embeddings(self): - embedding_config = SentenceTransformerImageEmbeddings( - model_name=self.model_name, columns=[test_query_column]) + embedding_config = SentenceTransformerEmbeddings( + model_name=IMAGE_MODEL_NAME, + columns=[test_query_column], + image_model=True) img = self.generateRandomImage(256) with beam.Pipeline() as pipeline: result_pcoll = ( @@ -327,8 +312,10 @@ def assert_element(element): _ = (result_pcoll | beam.Map(assert_element)) def test_sentence_transformer_images_with_str_data_types(self): - embedding_config = SentenceTransformerImageEmbeddings( - model_name=self.model_name, columns=[test_query_column]) + embedding_config = SentenceTransformerEmbeddings( + model_name=IMAGE_MODEL_NAME, + columns=[test_query_column], + image_model=True) with self.assertRaises(TypeError): with beam.Pipeline() as pipeline: _ = (