Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rag opensearch usecase with Beam's MLTransform #32018

Merged
merged 27 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1980ff8
Adding insertion and enrichment pipeline
itsayushpandey Jun 20, 2024
f511706
Enhanced Data Schema
itsayushpandey Jun 27, 2024
d96c7b3
Added Apache Licensed to the notebook
itsayushpandey Jun 27, 2024
89a651a
Adding Chunking Strategy
itsayushpandey Jul 6, 2024
acceb6f
removed unused imports
itsayushpandey Jul 6, 2024
d766f48
Modified insertion logic in redis for incorporating chunking strategy
itsayushpandey Jul 6, 2024
e3d9ca7
refacted redis code
itsayushpandey Jul 10, 2024
2158fab
code review changes
itsayushpandey Jul 15, 2024
d4eaf1b
Added chunking code in notebook
itsayushpandey Jul 15, 2024
12a4297
Added code review changes
itsayushpandey Jul 23, 2024
0167f81
Code review changes: using chunking strategy as enum
itsayushpandey Jul 25, 2024
b37b5a8
Added Code Review Changes
itsayushpandey Jul 30, 2024
471580c
Code review changes
itsayushpandey Jul 31, 2024
5175714
Added code review changes
itsayushpandey Jul 31, 2024
8466e47
Added Code Review Changes
itsayushpandey Aug 9, 2024
6054399
Code review changes
itsayushpandey Aug 9, 2024
3affaad
Ingestion and Enrichment pipeline for OpenSearch Vector DB
itsayushpandey Jul 30, 2024
2aaf074
Added logic for reading password from .env file
itsayushpandey Aug 10, 2024
26d1243
Added opensearch vector notebook
itsayushpandey Aug 17, 2024
fb644e0
Update credentials.env
itsayushpandey Aug 18, 2024
845acb6
Added code review changes
itsayushpandey Aug 20, 2024
5db3fd0
Merge branch 'apache:master' into rag_opensearch
itsayushpandey Sep 12, 2024
c1e8d6d
Merge branch 'apache:master' into rag_opensearch
itsayushpandey Sep 13, 2024
f972959
Added Description in opensearch notebook
itsayushpandey Sep 21, 2024
c4c27bd
Added description in opensearch notebook
itsayushpandey Sep 21, 2024
8ddf529
Merge branch 'apache:master' into rag_opensearch
itsayushpandey Oct 3, 2024
3968fc6
Code review changes
itsayushpandey Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,795 changes: 1,795 additions & 0 deletions examples/notebooks/beam-ml/rag_usecase/beam_rag_notebook.ipynb

Large diffs are not rendered by default.

129 changes: 129 additions & 0 deletions examples/notebooks/beam-ml/rag_usecase/chunks_generation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import apache_beam as beam
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain_text_splitters import SentenceTransformersTokenTextSplitter

from apache_beam.transforms import DoFn
from apache_beam.transforms import PTransform
from enum import Enum


__all__ = [
'ChunksGeneration',
'ChunkingStrategy'
]

class ChunkingStrategy(Enum):
SPLIT_BY_CHARACTER = 0
RECURSIVE_SPLIT_BY_CHARACTER = 1
SPLIT_BY_TOKENS = 2


class ChunksGeneration(PTransform):
"""ChunkingStrategy is a ``PTransform`` that takes a ``PCollection`` of
key, value tuple or 2-element array and generates different chunks for documents.
"""

def __init__(
self,
chunk_size: int,
chunk_overlap: int,
chunking_strategy: ChunkingStrategy
):
"""

Args:
chunk_size : Chunk size is the maximum number of characters that a chunk can contain
chunk_overlap : the number of characters that should overlap between two adjacent chunks
chunking_strategy : Defines the way to split text

Returns:
:class:`~apache_beam.transforms.ptransform.PTransform`

"""

self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.chunking_strategy = chunking_strategy

def expand(self, pcoll):
return pcoll \
| "Generate text chunks" >> beam.ParDo(_GenerateChunksFn(self.chunk_size,
self.chunk_overlap,
self.chunking_strategy))


class _GenerateChunksFn(DoFn):
"""Abstract class that takes in ptransform
and generate chunks.
"""

def __init__(
self,
chunk_size: int,
chunk_overlap: int,
chunking_strategy: ChunkingStrategy
):

self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.chunking_strategy = chunking_strategy

def process(self, element, *args, **kwargs):

# For recursive split by character
if self.chunking_strategy == ChunkingStrategy.RECURSIVE_SPLIT_BY_CHARACTER:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
length_function=len,
is_separator_regex=False,
)

# For split by character
elif self.chunking_strategy == ChunkingStrategy.SPLIT_BY_CHARACTER:
text_splitter = CharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
length_function=len,
is_separator_regex=False,
)

# For split by tokens
elif self.chunking_strategy == ChunkingStrategy.SPLIT_BY_TOKENS:
text_splitter = SentenceTransformersTokenTextSplitter(
chunk_overlap=self.chunk_overlap,
model_name='all-MiniLM-L6-v2'
)

else:
raise ValueError(f"Invalid chunking strategy: {self.chunking_strategy}")

texts = text_splitter.split_text(element['text'])[:]

element_copy = element.copy()
del element_copy['text']
for i, section in enumerate(texts):
element_copy['text'] = section
element_copy['section_id'] = i + 1
yield element_copy


20 changes: 20 additions & 0 deletions examples/notebooks/beam-ml/rag_usecase/credentials.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


OPENSEARCH_USERNAME="<ADD_YOUR>"
OPENSEARCH_PASSWORD="<ADD_YOUR>"
itsayushpandey marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading