From 0f33b05ae8d0910afa5da6756a7852c2a4a0bcaa Mon Sep 17 00:00:00 2001 From: Frankie Bromage Date: Tue, 26 Nov 2024 10:55:53 -0800 Subject: [PATCH 1/6] add option to not embed field names for vector db destination --- airbyte_cdk/destinations/vector_db_based/config.py | 6 ++++++ .../destinations/vector_db_based/document_processor.py | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/destinations/vector_db_based/config.py b/airbyte_cdk/destinations/vector_db_based/config.py index 904f40d3..71085bb4 100644 --- a/airbyte_cdk/destinations/vector_db_based/config.py +++ b/airbyte_cdk/destinations/vector_db_based/config.py @@ -108,6 +108,12 @@ class ProcessingConfigModel(BaseModel): always_show=True, examples=["text", "user.name", "users.*.name"], ) + omit_field_names_from_embeddings: bool = Field( + default=False, + title="Omit field names from embeddings", + description="Do not include the field names in the text that gets embedded. By default field names are embedded e.g 'user: name, user.email: email@email.com'. If set to true, only the values are embedded e.g. 'name, email@email.com'.", + always_show=True, + ) metadata_fields: Optional[List[str]] = Field( default=[], title="Fields to store as metadata", diff --git a/airbyte_cdk/destinations/vector_db_based/document_processor.py b/airbyte_cdk/destinations/vector_db_based/document_processor.py index 6e1723cb..6f983716 100644 --- a/airbyte_cdk/destinations/vector_db_based/document_processor.py +++ b/airbyte_cdk/destinations/vector_db_based/document_processor.py @@ -125,6 +125,7 @@ def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCata self.text_fields = config.text_fields self.metadata_fields = config.metadata_fields self.field_name_mappings = config.field_name_mappings + self.omit_field_names_from_embeddings = config.omit_field_names_from_embeddings self.logger = logging.getLogger("airbyte.document_processor") def process(self, record: AirbyteRecordMessage) -> Tuple[List[Chunk], Optional[str]]: @@ -162,7 +163,12 @@ def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document] relevant_fields = self._extract_relevant_fields(record, self.text_fields) if len(relevant_fields) == 0: return None - text = stringify_dict(relevant_fields) + if self.omit_field_names_from_embeddings == False: + text = stringify_dict(relevant_fields) + else: + text = "" + for key, value in relevant_fields.items(): + text += f"{value}\n" metadata = self._extract_metadata(record) return Document(page_content=text, metadata=metadata) From 9313f6df990c2bce9409b16277ec2ff545fd0987 Mon Sep 17 00:00:00 2001 From: Frankie Bromage Date: Tue, 26 Nov 2024 11:25:35 -0800 Subject: [PATCH 2/6] add unit test --- .../vector_db_based/document_processor.py | 12 +++-- .../vector_db_based/config_test.py | 7 +++ .../document_processor_test.py | 49 +++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/destinations/vector_db_based/document_processor.py b/airbyte_cdk/destinations/vector_db_based/document_processor.py index 6f983716..ce2bcf61 100644 --- a/airbyte_cdk/destinations/vector_db_based/document_processor.py +++ b/airbyte_cdk/destinations/vector_db_based/document_processor.py @@ -166,12 +166,18 @@ def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document] if self.omit_field_names_from_embeddings == False: text = stringify_dict(relevant_fields) else: - text = "" - for key, value in relevant_fields.items(): - text += f"{value}\n" + text = self._extract_values_from_dict(relevant_fields) metadata = self._extract_metadata(record) return Document(page_content=text, metadata=metadata) + def _extract_values_from_dict(self, data): + if isinstance(data, dict): + return "\n".join(self._extract_values_from_dict(value) for value in data.values()) + elif isinstance(data, list): + return "\n".join(self._extract_values_from_dict(item) for item in data) + else: + return str(data) + def _extract_relevant_fields( self, record: AirbyteRecordMessage, fields: Optional[List[str]] ) -> Dict[str, Any]: diff --git a/unit_tests/destinations/vector_db_based/config_test.py b/unit_tests/destinations/vector_db_based/config_test.py index 0eeae37b..28de50e9 100644 --- a/unit_tests/destinations/vector_db_based/config_test.py +++ b/unit_tests/destinations/vector_db_based/config_test.py @@ -242,6 +242,13 @@ def test_json_schema_generation(): "type": "array", "items": {"type": "string"}, }, + "omit_field_names_from_embeddings": { + "title": "Omit field names from embeddings", + "description": "Do not include the field names in the text that gets embedded. By default field names are embedded e.g 'user: name, user.email: email@email.com'. If set to true, only the values are embedded e.g. 'name, email@email.com'.", + "default": False, + "always_show": True, + "type": "boolean", + }, "metadata_fields": { "title": "Fields to store as metadata", "description": "List of fields in the record that should be stored as metadata. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered metadata fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array. When specifying nested paths, all matching values are flattened into an array set to a field named by the path.", diff --git a/unit_tests/destinations/vector_db_based/document_processor_test.py b/unit_tests/destinations/vector_db_based/document_processor_test.py index f427f42d..76beb3da 100644 --- a/unit_tests/destinations/vector_db_based/document_processor_test.py +++ b/unit_tests/destinations/vector_db_based/document_processor_test.py @@ -193,6 +193,7 @@ def test_complex_text_fields(): "non.*.existing", ] processor.metadata_fields = ["non_text", "non_text_2", "id"] + processor.omit_field_names_from_embeddings = False chunks, _ = processor.process(record) @@ -212,6 +213,54 @@ def test_complex_text_fields(): "_ab_stream": "namespace1_stream1", } +def test_complex_text_fields_omit_field_names(): + processor = initialize_processor() + + record = AirbyteRecordMessage( + stream="stream1", + namespace="namespace1", + data={ + "id": 1, + "nested": { + "texts": [ + {"text": "This is the text"}, + {"text": "And another"}, + ] + }, + "non_text": "a", + "non_text_2": 1, + "text": "This is the regular text", + "other_nested": {"non_text": {"a": "xyz", "b": "abc"}}, + }, + emitted_at=1234, + ) + + processor.text_fields = [ + "nested.texts.*.text", + "text", + "other_nested.non_text", + "non.*.existing", + ] + processor.metadata_fields = ["non_text", "non_text_2", "id"] + processor.omit_field_names_from_embeddings = True + + chunks, _ = processor.process(record) + + assert len(chunks) == 1 + assert ( + chunks[0].page_content + == """This is the text +And another +This is the regular text +xyz +abc""" + ) + assert chunks[0].metadata == { + "id": 1, + "non_text": "a", + "non_text_2": 1, + "_ab_stream": "namespace1_stream1", + } def test_no_text_fields(): processor = initialize_processor() From 42be95f401d8493382a5dd3adfdc94c9050ac00f Mon Sep 17 00:00:00 2001 From: Frankie Bromage Date: Tue, 26 Nov 2024 16:13:07 -0800 Subject: [PATCH 3/6] reformat according to standards --- .../destinations/vector_db_based/document_processor.py | 6 +++--- .../destinations/vector_db_based/document_processor_test.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/destinations/vector_db_based/document_processor.py b/airbyte_cdk/destinations/vector_db_based/document_processor.py index ce2bcf61..b3b4afc7 100644 --- a/airbyte_cdk/destinations/vector_db_based/document_processor.py +++ b/airbyte_cdk/destinations/vector_db_based/document_processor.py @@ -5,7 +5,7 @@ import json import logging from dataclasses import dataclass -from typing import Any, Dict, List, Mapping, Optional, Tuple +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union import dpath from airbyte_cdk.destinations.vector_db_based.config import ( @@ -163,14 +163,14 @@ def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document] relevant_fields = self._extract_relevant_fields(record, self.text_fields) if len(relevant_fields) == 0: return None - if self.omit_field_names_from_embeddings == False: + if not self.omit_field_names_from_embeddings: text = stringify_dict(relevant_fields) else: text = self._extract_values_from_dict(relevant_fields) metadata = self._extract_metadata(record) return Document(page_content=text, metadata=metadata) - def _extract_values_from_dict(self, data): + def _extract_values_from_dict(self, data: Union[dict, list, Any]) -> str: if isinstance(data, dict): return "\n".join(self._extract_values_from_dict(value) for value in data.values()) elif isinstance(data, list): diff --git a/unit_tests/destinations/vector_db_based/document_processor_test.py b/unit_tests/destinations/vector_db_based/document_processor_test.py index 76beb3da..c43ab30b 100644 --- a/unit_tests/destinations/vector_db_based/document_processor_test.py +++ b/unit_tests/destinations/vector_db_based/document_processor_test.py @@ -213,6 +213,7 @@ def test_complex_text_fields(): "_ab_stream": "namespace1_stream1", } + def test_complex_text_fields_omit_field_names(): processor = initialize_processor() @@ -262,6 +263,7 @@ def test_complex_text_fields_omit_field_names(): "_ab_stream": "namespace1_stream1", } + def test_no_text_fields(): processor = initialize_processor() From 5f8b8a3e6fe1ad6308c7883da1767cef615011d3 Mon Sep 17 00:00:00 2001 From: Frankie Bromage Date: Tue, 26 Nov 2024 16:45:08 -0800 Subject: [PATCH 4/6] increase test coverage and add typing --- .../destinations/vector_db_based/config.py | 2 +- .../vector_db_based/document_processor.py | 21 ++++++++++++------- .../vector_db_based/config_test.py | 2 +- .../document_processor_test.py | 9 +++++++- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/destinations/vector_db_based/config.py b/airbyte_cdk/destinations/vector_db_based/config.py index 71085bb4..0633012f 100644 --- a/airbyte_cdk/destinations/vector_db_based/config.py +++ b/airbyte_cdk/destinations/vector_db_based/config.py @@ -111,7 +111,7 @@ class ProcessingConfigModel(BaseModel): omit_field_names_from_embeddings: bool = Field( default=False, title="Omit field names from embeddings", - description="Do not include the field names in the text that gets embedded. By default field names are embedded e.g 'user: name, user.email: email@email.com'. If set to true, only the values are embedded e.g. 'name, email@email.com'.", + description="Do not include the field names in the text that gets embedded. By default field names are embedded (e.g., 'user.name: John Doe \n user.email: john@example.com'). If set to true, only the values are embedded (e.g., 'John Doe \n john@example.com').", always_show=True, ) metadata_fields: Optional[List[str]] = Field( diff --git a/airbyte_cdk/destinations/vector_db_based/document_processor.py b/airbyte_cdk/destinations/vector_db_based/document_processor.py index b3b4afc7..d2040c4d 100644 --- a/airbyte_cdk/destinations/vector_db_based/document_processor.py +++ b/airbyte_cdk/destinations/vector_db_based/document_processor.py @@ -163,18 +163,23 @@ def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document] relevant_fields = self._extract_relevant_fields(record, self.text_fields) if len(relevant_fields) == 0: return None - if not self.omit_field_names_from_embeddings: - text = stringify_dict(relevant_fields) - else: - text = self._extract_values_from_dict(relevant_fields) + text = self._generate_text_from_fields(relevant_fields) metadata = self._extract_metadata(record) return Document(page_content=text, metadata=metadata) + + def _generate_text_from_fields(self, fields: Dict[str, Any]) -> str: + if self.omit_field_names_from_embeddings: + return self._extract_values_from_dict(fields) + else: + return stringify_dict(fields) - def _extract_values_from_dict(self, data: Union[dict, list, Any]) -> str: - if isinstance(data, dict): - return "\n".join(self._extract_values_from_dict(value) for value in data.values()) + def _extract_values_from_dict(self, data: Union[Dict[Any, Any], List[Any], Any], join_char: str = '\n') -> str: + if data is None: + return "" + elif isinstance(data, dict): + return join_char.join(self._extract_values_from_dict(value) for value in data.values()) elif isinstance(data, list): - return "\n".join(self._extract_values_from_dict(item) for item in data) + return join_char.join(self._extract_values_from_dict(item) for item in data) else: return str(data) diff --git a/unit_tests/destinations/vector_db_based/config_test.py b/unit_tests/destinations/vector_db_based/config_test.py index 28de50e9..7899a516 100644 --- a/unit_tests/destinations/vector_db_based/config_test.py +++ b/unit_tests/destinations/vector_db_based/config_test.py @@ -244,7 +244,7 @@ def test_json_schema_generation(): }, "omit_field_names_from_embeddings": { "title": "Omit field names from embeddings", - "description": "Do not include the field names in the text that gets embedded. By default field names are embedded e.g 'user: name, user.email: email@email.com'. If set to true, only the values are embedded e.g. 'name, email@email.com'.", + "description": "Do not include the field names in the text that gets embedded. By default field names are embedded (e.g., 'user.name: John Doe \n user.email: john@example.com'). If set to true, only the values are embedded (e.g., 'John Doe \n john@example.com').", "default": False, "always_show": True, "type": "boolean", diff --git a/unit_tests/destinations/vector_db_based/document_processor_test.py b/unit_tests/destinations/vector_db_based/document_processor_test.py index c43ab30b..21e52c21 100644 --- a/unit_tests/destinations/vector_db_based/document_processor_test.py +++ b/unit_tests/destinations/vector_db_based/document_processor_test.py @@ -232,6 +232,9 @@ def test_complex_text_fields_omit_field_names(): "non_text_2": 1, "text": "This is the regular text", "other_nested": {"non_text": {"a": "xyz", "b": "abc"}}, + "empty_list": [], + "empty_dict": {}, + "large_nested": {"a": {"b": {"c": {"d": {"e": {"f": {"g": "h"}}}}}}}, }, emitted_at=1234, ) @@ -241,6 +244,9 @@ def test_complex_text_fields_omit_field_names(): "text", "other_nested.non_text", "non.*.existing", + "large_nested", + "empty_list", + "empty_dict", ] processor.metadata_fields = ["non_text", "non_text_2", "id"] processor.omit_field_names_from_embeddings = True @@ -254,7 +260,8 @@ def test_complex_text_fields_omit_field_names(): And another This is the regular text xyz -abc""" +abc +h""" ) assert chunks[0].metadata == { "id": 1, From 6f82849933db4195cbb3be7e66ae8f5e1808a450 Mon Sep 17 00:00:00 2001 From: Frankie Bromage Date: Tue, 26 Nov 2024 19:52:00 -0800 Subject: [PATCH 5/6] reformat according to standards --- .../destinations/vector_db_based/document_processor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/destinations/vector_db_based/document_processor.py b/airbyte_cdk/destinations/vector_db_based/document_processor.py index d2040c4d..41623113 100644 --- a/airbyte_cdk/destinations/vector_db_based/document_processor.py +++ b/airbyte_cdk/destinations/vector_db_based/document_processor.py @@ -166,14 +166,16 @@ def _generate_document(self, record: AirbyteRecordMessage) -> Optional[Document] text = self._generate_text_from_fields(relevant_fields) metadata = self._extract_metadata(record) return Document(page_content=text, metadata=metadata) - + def _generate_text_from_fields(self, fields: Dict[str, Any]) -> str: if self.omit_field_names_from_embeddings: return self._extract_values_from_dict(fields) else: return stringify_dict(fields) - def _extract_values_from_dict(self, data: Union[Dict[Any, Any], List[Any], Any], join_char: str = '\n') -> str: + def _extract_values_from_dict( + self, data: Union[Dict[Any, Any], List[Any], Any], join_char: str = "\n" + ) -> str: if data is None: return "" elif isinstance(data, dict): From d55f599a921861c17db455f34fa0822f01748e76 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 4 Dec 2024 10:47:24 -0800 Subject: [PATCH 6/6] replace pinecone connector test with pgvector test --- .github/workflows/connector-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index 0a082a38..bcc944b1 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -76,7 +76,7 @@ jobs: cdk_extra: n/a - connector: source-s3 cdk_extra: file-based - - connector: destination-pinecone + - connector: destination-pgvector cdk_extra: vector-db-based - connector: destination-motherduck cdk_extra: sql