Skip to content

Commit

Permalink
Add ingestion and enrichment examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
claudevdm committed Dec 8, 2024
1 parent 769cdc4 commit 55cacf2
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
77 changes: 77 additions & 0 deletions sdks/python/apache_beam/ml/rag/examples/enrichment_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import apache_beam as beam

import tempfile

from langchain.text_splitter import RecursiveCharacterTextSplitter
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings
from transformers import AutoTokenizer
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.ml.rag.types import Chunk, Content
from apache_beam.ml.rag.enrichment.bigquery_vector_search import BigQueryVectorSearchEnrichmentHandler, BigQueryVectorSearchParameters
from apache_beam.transforms.enrichment import Enrichment

PROJECT = "<PROJECT>"
BIGQUERY_TABLE = "<BIGQUERY_TABLE>"

huggingface_embedder = HuggingfaceTextEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2")
tokenizer = AutoTokenizer.from_pretrained(
"sentence-transformers/all-MiniLM-L6-v2")
splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
tokenizer,
chunk_size=512,
chunk_overlap=52,
)


def run_pipeline():
with beam.Pipeline() as p:

# Enrichment
_ = (
p
| beam.Create([
Chunk(
id="simple_query",
content=Content(text="This is a simple test document."),
metadata={"language": "en"}),
Chunk(
id="medical_query",
content=Content(text="When did the patient arrive?"),
metadata={"language": "en"}),
])
| MLTransform(write_artifact_location=tempfile.mkdtemp()).
with_transform(huggingface_embedder)
| Enrichment(
BigQueryVectorSearchEnrichmentHandler(
project=PROJECT,
vector_search_parameters=BigQueryVectorSearchParameters(
table_name=BIGQUERY_TABLE,
embedding_column='embedding',
columns=['metadata', 'content'],
neighbor_count=3,
metadata_restriction_template=(
"check_metadata(metadata, 'language','{language}')"))))
| beam.Map(print))


if __name__ == '__main__':
run_pipeline()
71 changes: 71 additions & 0 deletions sdks/python/apache_beam/ml/rag/examples/ingestion_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import apache_beam as beam

import tempfile

from typing import Any, Dict

from langchain.text_splitter import RecursiveCharacterTextSplitter
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.rag.chunking.langchain import LangChainChunkingProvider
from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings
from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform
from apache_beam.ml.rag.ingestion.bigquery import BigQueryVectorWriterConfig
from transformers import AutoTokenizer
from apache_beam.options.pipeline_options import PipelineOptions

TEMP_LOCATION = "<TEMP_LOCATION>"
BIGQUERY_TABLE = "<BIGQUERY_TABLE>"

huggingface_embedder = HuggingfaceTextEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2")
tokenizer = AutoTokenizer.from_pretrained(
"sentence-transformers/all-MiniLM-L6-v2")
splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
tokenizer,
chunk_size=512,
chunk_overlap=52,
)


def run_pipeline():
with beam.Pipeline(
options=PipelineOptions(['--runner=DirectRunner',
f'--temp_location={TEMP_LOCATION}',
'--expansion_service_port=8888'])) as p:

# Ingestion
_ = (
p
| beam.Create([{
'content': 'This is a simple test document. It has multiple sentences. '
'We will use it to test basic splitting. ' * 20,
'source': 'simple.txt',
'language': 'en'
},
{
'content': (
'The patient arrived at 2 p.m. yesterday. '
'Initial assessment was completed. '
'Lab results showed normal ranges. '
'Follow-up scheduled for next week.' * 10),
'source': 'medical.txt',
'language': 'en'
}])
|
MLTransform(write_artifact_location=tempfile.mkdtemp()).with_transform(
LangChainChunkingProvider(
text_splitter=splitter,
document_field="content",
metadata_fields=["source", "language"
])).with_transform(huggingface_embedder)
| VectorDatabaseWriteTransform(
BigQueryVectorWriterConfig(
write_config={
"table": BIGQUERY_TABLE,
"create_disposition": "CREATE_IF_NEEDED",
"write_disposition": "WRITE_TRUNCATE",
})))


if __name__ == '__main__':
run_pipeline()

0 comments on commit 55cacf2

Please sign in to comment.