"""
Microservice for a context-aware ChatGPT assistant.
The following program reads in a collection of documents,
embeds each document using the OpenAI document embedding model,
then builds an index for fast retrieval of documents relevant to a question,
effectively replacing a vector database.
The program then starts a REST API endpoint serving queries about programming in Pathway.
Each query text is first turned into a vector using OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is built from the relevant documentations pages
and sent to the OpenAI GPT-4 chat service for processing.
Usage:
In the root of this repository run:
`poetry run ./run_examples.py unstructured`
or, if all dependencies are managed manually rather than using poetry
`python examples/pipelines/unstructured/app.py`
You can also run this example directly in the environment with llm_app installed.
In another terminal, navigate to `examples/pipelines/unstructured/ui` and run
`streamlit run server.py`. You can interact with the app at `localhost:8501`
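
Once the app is running you can also query the REST endpoint directly; for
example, assuming the default host and port:
`curl --data '{"user": "user", "query": "What is the document about?"}' http://localhost:8080/`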
"""
import os
import pathway as pw
from pathway.stdlib.ml.index import KNNIndex
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa
from pathway.xpacks.llm.parsers import ParseUnstructured
from pathway.xpacks.llm.splitters import TokenCountSplitter


class QueryInputSchema(pw.Schema):
    query: str
    user: str


def run(
    *,
    data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/finance/"),
    api_key: str = os.environ.get("OPENAI_API_KEY", ""),
    host: str = "0.0.0.0",
    port: int = 8080,
    embedder_locator: str = "text-embedding-ada-002",
    embedding_dimension: int = 1536,
    model_locator: str = "gpt-3.5-turbo",
    max_tokens: int = 300,
    temperature: float = 0.0,
    **kwargs,
):
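    # Embedder that turns each text chunk into a vector via the OpenAI API;
    # the retry and cache strategies make transient failures and reruns cheap.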
    embedder = OpenAIEmbedder(
        api_key=api_key,
        model=embedder_locator,
        retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
        cache_strategy=pw.asynchronous.DefaultCache(),
    )
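
    # Watch data_dir and stream every file in as raw bytes.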
    files = pw.io.fs.read(
        data_dir,
        mode="streaming",
        format="binary",
        autocommit_duration_ms=50,
    )
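
    # Parse the raw bytes into text with Unstructured; each parsed element is a
    # (text, metadata) pair, so flatten the list and keep the text part.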
    parser = ParseUnstructured()
    documents = files.select(texts=parser(pw.this.data))
    documents = documents.flatten(pw.this.texts)
    documents = documents.select(texts=pw.this.texts[0])
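
    # Split each document into token-bounded chunks suitable for embedding,
    # again keeping the text part of each chunk.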
    splitter = TokenCountSplitter()
    documents = documents.select(chunks=splitter(pw.this.texts))
    documents = documents.flatten(pw.this.chunks)
    documents = documents.select(chunk=pw.this.chunks[0])
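
    # Attach an embedding vector to every chunk.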
    enriched_documents = documents + documents.select(vector=embedder(pw.this.chunk))
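
    # In-memory nearest-neighbor index over the chunk embeddings,
    # standing in for an external vector database.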
    index = KNNIndex(
        enriched_documents.vector, enriched_documents, n_dimensions=embedding_dimension
    )
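
    # Expose a REST endpoint: each incoming request becomes a row in `query`,
    # and the matching row written via `response_writer` is sent back to the caller.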
    query, response_writer = pw.io.http.rest_connector(
        host=host,
        port=port,
        schema=QueryInputSchema,
        autocommit_duration_ms=50,
        delete_completed_queries=True,
    )
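
    # Embed the incoming query with the same embedder used for the documents.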
    query += query.select(
        vector=embedder(pw.this.query),
    )
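
    # Retrieve the 3 nearest chunks for each query and collect them into a list.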
    query_context = query + index.get_nearest_items(
        query.vector, k=3, collapse_rows=True
    ).select(documents_list=pw.this.chunk)
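
    # Build the LLM prompt from the retrieved context and the user's question.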
    @pw.udf
    def build_prompt(documents, query):
        docs_str = "\n".join(documents)
        prompt = f"Given the following documents:\n{docs_str}\nAnswer this query: {query}"
        return prompt

    prompt = query_context.select(
        prompt=build_prompt(pw.this.documents_list, pw.this.query)
    )
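
    # Chat model that answers the question; retries and caching as above.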
    model = OpenAIChat(
        api_key=api_key,
        model=model_locator,
        temperature=temperature,
        max_tokens=max_tokens,
        retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
        cache_strategy=pw.asynchronous.DefaultCache(),
    )
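
    # Ask the model, keeping the query id so the answer can be routed
    # back to the right HTTP request.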
    responses = prompt.select(
        query_id=pw.this.id, result=model(prompt_chat_single_qa(pw.this.prompt))
    )

    response_writer(responses)
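
    # Start the Pathway computation graph; this call blocks and keeps serving.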
    pw.run()


if __name__ == "__main__":
    run()