diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6907ac6 --- /dev/null +++ b/Makefile @@ -0,0 +1,26 @@ +# Copyright 2023 Iguazio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +CONTROLLER_NAME = "genai-factory-controller" + +.PHONY: controller +controller: + # Build controller's image: + docker build -f controller/Dockerfile -t $(CONTROLLER_NAME):latest . + + # Run controller locally in a container: + docker run -d --net host --name $(CONTROLLER_NAME) $(CONTROLLER_NAME):latest + + # Announce the server is running: + @echo "GenAI Factory Controller is running in the background" diff --git a/README.md b/README.md new file mode 100644 index 0000000..ecac21e --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# GenAI Factory + +Demo an end to end LLM agent solution with modular architecture, persistent storage and front-end UI that can work with various LLM models and storage solutions. + +the configuration is specified in a YAML file, which indicate the model, embeddings, storage to use, and various parameters. +the user can point to the configuration file by setting the `AGENT_CONFIG_PATH` environment variable. + +environment variables and credentials can be loaded from a `.env` file in the root directory. or an alternate path set by the `AGENT_ENV_PATH` environment variable. +data can be stored in local files or remote SQL and Vector databases. the local file storage path can be set by the `AGENT_DATA_PATH` environment variable (defaults to `./data/`). + +# Getting it to work + +## Deploy the controller +This command will start the API controller server into a local docker container. +```shell +make controller +``` + +## Initialize the database: +The database is Initialized when building the controller. +In order to erase and start fresh, we can simply use the controller's command line interface. + +```shell +python -m controller.src.main initdb +``` + +## To start the application's API: + +```shell +uvicorn pipeline:app +``` + +## To start UI: +Future work will include a UI command to run the UI. +```shell +make ui +``` + +# CLI usage + +To ingest data into the vector database: +```shell +python -m controller.src.main ingest -l web https://milvus.io/docs/overview.md +``` + +To ask a question: +```shell +python -m controller.src.main query "What is a vector?" +``` + + +Full CLI: + +```shell +python -m controller.src.main + +Usage: python -m controller.src.main [OPTIONS] COMMAND [ARGS]... + +Options: + --help Show this message and exit. + +Commands: + config Print the config as a yaml file + ingest Ingest documents into the vector database + initdb Initialize the database (delete old tables) + list List the different objects in the database (by category) + query Run a chat query on the vector database collection +``` diff --git a/controller/Dockerfile b/controller/Dockerfile new file mode 100644 index 0000000..cd42172 --- /dev/null +++ b/controller/Dockerfile @@ -0,0 +1,45 @@ +# Copyright 2023 Iguazio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ARG PYTHON_VERSION=3.9.19 +FROM python:${PYTHON_VERSION} + +# Update OS packages: +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get -y upgrade && \ + rm -rf /var/lib/apt/lists/* + +RUN apt update -qqq \ + && apt -y upgrade \ + && apt install -y \ + build-essential \ + cmake \ + gcc \ + && rm -rf /var/lib/apt/lists/* \ + +WORKDIR /controller +COPY /controller/src /controller/src +COPY /controller/requirements.txt /controller/ + +# Make the data directory: +RUN mkdir -p ../data + +# Install requirements: +RUN pip install -r /controller/requirements.txt + +# Initiate database: +RUN python -m controller.src.main initdb + +# Run the controller's API server: +CMD ["uvicorn", "controller.src.api:app", "--port", "8001"] diff --git a/server/requirements.txt b/controller/requirements.txt similarity index 93% rename from server/requirements.txt rename to controller/requirements.txt index 7965a9e..4f3c6b1 100644 --- a/server/requirements.txt +++ b/controller/requirements.txt @@ -5,4 +5,6 @@ fastapi==0.85.1 SQLAlchemy~=2.0.23 uvicorn python-dotenv -pyyaml \ No newline at end of file +pyyaml +requests +tabulate \ No newline at end of file diff --git a/server/src/__init__.py b/controller/src/__init__.py similarity index 100% rename from server/src/__init__.py rename to controller/src/__init__.py diff --git a/server/src/api.py b/controller/src/api.py similarity index 64% rename from server/src/api.py rename to controller/src/api.py index e08e4de..333218d 100644 --- a/server/src/api.py +++ b/controller/src/api.py @@ -11,17 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json from typing import List, Optional, Tuple, Union +import requests from fastapi import (APIRouter, Depends, FastAPI, File, Header, Request, UploadFile) from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel -from .config import logger -from .model import ChatSession, DocCollection, OutputMode, QueryItem, User -from .sqlclient import client +from controller.src.config import config +from controller.src.model import ChatSession, DocCollection, OutputMode, User +from controller.src.sqlclient import client app = FastAPI() @@ -56,7 +57,7 @@ class AuthInfo(BaseModel): # placeholder for extracting the Auth info from the request -async def get_auth_user( +def get_auth_user( request: Request, x_username: Union[str, None] = Header(None) ) -> AuthInfo: """Get the user from the database""" @@ -64,36 +65,85 @@ async def get_auth_user( if x_username: return AuthInfo(username=x_username, token=token) else: - return AuthInfo(username="yhaviv@gmail.com", token=token) + return AuthInfo(username="guest@example.com", token=token) + + +def _send_to_application(path: str, method: str = "POST", request=None, auth=None, **kwargs): + """ + Send a request to the application's API. + + :param path: The API path to send the request to. + :param method: The HTTP method to use: GET, POST, PUT, DELETE, etc. + :param request: The FastAPI request object. If provided, the data will be taken from the body of the request. + :param auth: The authentication information to use. If provided, the username will be added to the headers. + :param kwargs: Additional keyword arguments to pass in the request function. For example, headers, params, etc. + + :return: The JSON response from the application. + """ + url = f"{config.application_url}/api/{path}" + + if isinstance(request, Request): + # If the request is a FastAPI request, get the data from the body + kwargs["data"] = request._body.decode("utf-8") + if auth is not None: + kwargs["headers"] = {"x_username": auth.username} + + response = requests.request( + method=method, + url=url, + **kwargs, + ) + + # Check the response + if response.status_code == 200: + # If the request was successful, return the JSON response + return response.json() + else: + # If the request failed, raise an exception + response.raise_for_status() @router.post("/tables") -async def create_tables(drop_old: bool = False, names: list[str] = None): +def create_tables(drop_old: bool = False, names: list[str] = None): return client.create_tables(drop_old=drop_old, names=names) @router.post("/pipeline/{name}/run") -async def run_pipeline( - request: Request, name: str, item: QueryItem, auth=Depends(get_auth_user) +def run_pipeline( + request: Request, name: str, auth=Depends(get_auth_user) ): """This is the query command""" - app_server = request.app.extra.get("app_server") - if not app_server: - raise ValueError("app_server not found in app") - event = { - "username": auth.username, - "session_id": item.session_id, - "query": item.question, - "collection_name": item.collection, + return _send_to_application( + path=f"pipeline/{name}/run", + method="POST", + request=request, + auth=auth, + ) + + +@router.post("/collections/{collection}/{loader}/ingest") +def ingest( + collection, path, loader, metadata, version, from_file, auth=Depends(get_auth_user) +): + """Ingest documents into the vector database""" + params = { + "path": path, + "from_file": from_file, + "version": version, } - logger.debug(f"running pipeline {name}: {event}") - resp = app_server.run_pipeline(name, event) - print(f"resp: {resp}") - return resp + if metadata is not None: + params["metadata"] = json.dumps(metadata) + + return _send_to_application( + path=f"collections/{collection}/{loader}/ingest", + method="POST", + params=params, + auth=auth, + ) @router.get("/collections") -async def list_collections( +def list_collections( owner: str = None, labels: Optional[List[Tuple[str, str]]] = None, mode: OutputMode = OutputMode.Details, @@ -105,12 +155,12 @@ async def list_collections( @router.get("/collection/{name}") -async def get_collection(name: str, session=Depends(get_db)): +def get_collection(name: str, session=Depends(get_db)): return client.get_collection(name, session=session) @router.post("/collection/{name}") -async def create_collection( +def create_collection( request: Request, name: str, collection: DocCollection, @@ -122,7 +172,7 @@ async def create_collection( @router.get("/users") -async def list_users( +def list_users( email: str = None, username: str = None, mode: OutputMode = OutputMode.Details, @@ -134,12 +184,12 @@ async def list_users( @router.get("/user/{username}") -async def get_user(username: str, session=Depends(get_db)): +def get_user(username: str, session=Depends(get_db)): return client.get_user(username, session=session) @router.post("/user/{username}") -async def create_user( +def create_user( user: User, username: str, session=Depends(get_db), @@ -149,13 +199,13 @@ async def create_user( @router.delete("/user/{username}") -async def delete_user(username: str, session=Depends(get_db)): +def delete_user(username: str, session=Depends(get_db)): return client.delete_user(username, session=session) # get last user sessions, specify user and last @router.get("/user/{username}/sessions") -async def list_user_sessions( +def list_user_sessions( username: str, last: int = 0, created: str = None, @@ -168,7 +218,7 @@ async def list_user_sessions( @router.put("/user/{username}") -async def update_user( +def update_user( user: User, username: str, session=Depends(get_db), @@ -178,7 +228,7 @@ async def update_user( # add routs for chat sessions, list_sessions, get_session @router.post("/session") -async def create_session(chat_session: ChatSession, session=Depends(get_db)): +def create_session(chat_session: ChatSession, session=Depends(get_db)): return client.create_session(chat_session, session=session) diff --git a/server/src/config.py b/controller/src/config.py similarity index 97% rename from server/src/config.py rename to controller/src/config.py index 48cf4ba..ea289c1 100644 --- a/server/src/config.py +++ b/controller/src/config.py @@ -36,6 +36,7 @@ class CtrlConfig(BaseModel): log_level: str = "DEBUG" # SQL Database sql_connection_str: str = default_db_path + application_url: str = "http://localhost:8000" def print(self): print(yaml.dump(self.dict())) diff --git a/controller/src/main.py b/controller/src/main.py new file mode 100644 index 0000000..02f18ef --- /dev/null +++ b/controller/src/main.py @@ -0,0 +1,295 @@ +# Copyright 2023 Iguazio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# main file with cli commands using python click library +# include two click commands: 1. data ingestion (using the data loader), 2. query (using the agent) +import json + +import click +import yaml +from tabulate import tabulate + +from controller.src.sqlclient import client +from controller.src.model import User, DocCollection, QueryItem +from controller.src.config import config +import controller.src.api as api + + +@click.group() +def cli(): + pass + + +# click command for initializing the database tables +@click.command() +def initdb(): + """Initialize the database (delete old tables)""" + click.echo("Running Init DB") + session = client.get_db_session() + client.create_tables(True) + # create a guest user, and the default document collection + client.create_user( + User( + name="guest", + email="guest@any.com", + full_name="Guest User", + ), + session=session, + ) + client.create_collection( + DocCollection( + name="default", + description="Default Collection", + owner_name="guest", + category="vector", + ), + session=session, + ) + session.close() + + +@click.command("config") +def print_config(): + """Print the config as a yaml file""" + click.echo("Running Config") + click.echo(yaml.dump(config.dict())) + + +@click.command() +@click.argument("path", type=str) +@click.option("-l", "--loader", type=str, help="Type of data loader") +@click.option( + "-m", "--metadata", type=(str, str), multiple=True, help="Metadata Key value pair" +) +@click.option("-v", "--version", type=str, help="document version") +@click.option("-c", "--collection", type=str, help="Vector DB collection name") +@click.option( + "-f", "--from-file", is_flag=True, help="Take the document paths from the file" +) +def ingest(path, loader, metadata, version, collection, from_file): + """Ingest documents into the vector database""" + params = { + "path": path, + "from_file": from_file, + "version": version, + } + if metadata: + print(metadata) + params["metadata"] = json.dumps({metadata[0]: metadata[1]}) + + collection = collection or "default" + click.echo(f"Running Data Ingestion from: {path} with loader: {loader}") + response = api._send_to_application( + path=f"collections/{collection}/{loader}/ingest", + method="POST", + params=params, + ) + if response["status"] == "ok": + click.echo("Ingestion completed successfully") + else: + click.echo("Ingestion failed") + + +@click.command() +@click.argument("question", type=str) +@click.option( + "-f", + "--filter", + type=(str, str), + multiple=True, + help="Search filter Key value pair", +) +@click.option("-c", "--collection", type=str, help="Vector DB collection name") +@click.option("-u", "--user", type=str, help="Username") +@click.option("-s", "--session", type=str, help="Session ID") +@click.option( + "-n", "--pipeline-name", type=str, default="default", help="Pipeline name" +) +def query(question, filter, collection, user, session, pipeline_name): + """Run a chat query on the vector database collection""" + click.echo(f"Running Query for: {question}") + search_args = [filter] if filter else None + query_item = QueryItem( + question=question, + session_id=session, + filter=search_args, + collection=collection, + ) + data = json.dumps(query_item.dict()) + + headers = {"x_username": user} if user else {} + response = api._send_to_application( + path=f"pipeline/{pipeline_name}/run", + method="POST", + data=data, + headers=headers, + ) + result = response["data"] + click.echo(result["answer"]) + click.echo(sources_to_text(result["sources"])) + + +@click.group() +def list(): + """List the different objects in the database (by category)""" + pass + + +@click.group() +def update(): + """Create or update an object in the database""" + pass + + +@click.command("users") +@click.option("-u", "--user", type=str, help="user name filter") +@click.option("-e", "--email", type=str, help="email filter") +def list_users(user, email): + """List users""" + click.echo("Running List Users") + + data = client.list_users(email, user, output_mode="short") + table = format_table_results(data) + click.echo(table) + + +# add a command to list document collections, similar to the list users command +@click.command("collections") +@click.option("-o", "--owner", type=str, help="owner filter") +@click.option( + "-m", "--metadata", type=(str, str), multiple=True, help="metadata filter" +) +def list_collections(owner, metadata): + """List document collections""" + click.echo("Running List Collections") + + data = client.list_collections(owner, metadata, output_mode="short") + table = format_table_results(data) + click.echo(table) + + +@click.command("collection") +@click.argument("name", type=str) +@click.option("-o", "--owner", type=str, help="owner name") +@click.option("-d", "--description", type=str, help="collection description") +@click.option("-c", "--category", type=str, help="collection category") +@click.option( + "-l", "--labels", multiple=True, default=[], help="metadata labels filter" +) +def update_collection(name, owner, description, category, labels): + """Create or update a document collection""" + click.echo("Running Create or Update Collection") + labels = fill_params(labels) + + session = client.get_db_session() + # check if the collection exists, if it does, update it, otherwise create it + collection_exists = client.get_collection(name, session=session).success + if collection_exists: + client.update_collection( + session=session, + collection=DocCollection( + name=name, description=description, category=category, labels=labels + ), + ).with_raise() + else: + client.create_collection( + session=session, + collection=DocCollection( + name=name, + description=description, + owner_name=owner, + category=category, + labels=labels, + ), + ).with_raise() + + +# add a command to list chat sessions, similar to the list_users command +@click.command("sessions") +@click.option("-u", "--user", type=str, help="user name filter") +@click.option("-l", "--last", type=int, default=0, help="last n sessions") +@click.option("-c", "--created", type=str, help="created after date") +def list_sessions(user, last, created): + """List chat sessions""" + click.echo("Running List Sessions") + + data = client.list_sessions(user, created, last, output_mode="short") + table = format_table_results(data["data"]) + click.echo(table) + + +def sources_to_text(sources): + """Convert a list of sources to a string.""" + if not sources: + return "" + return "\nSource documents:\n" + "\n".join( + f"- {get_title(source)} ({source['source']})" for source in sources + ) + + +def sources_to_md(sources): + """Convert a list of sources to a Markdown string.""" + if not sources: + return "" + sources = { + source.metadata["source"]: get_title(source.metadata) for source in sources + } + return "\n**Source documents:**\n" + "\n".join( + f"- [{title}]({url})" for url, title in sources.items() + ) + + +def get_title(metadata): + """Get title from metadata.""" + if "chunk" in metadata: + return f"{metadata.get('title', '')}-{metadata['chunk']}" + if "page" in metadata: + return f"{metadata.get('title', '')} - page {metadata['page']}" + return metadata.get("title", "") + + +def fill_params(params, params_dict=None): + params_dict = params_dict or {} + for param in params: + i = param.find("=") + if i == -1: + continue + key, value = param[:i].strip(), param[i + 1 :].strip() + if key is None: + raise ValueError(f"cannot find param key in line ({param})") + params_dict[key] = value + if not params_dict: + return None + return params_dict + + +def format_table_results(table_results): + return tabulate(table_results, headers="keys", tablefmt="fancy_grid") + + +cli.add_command(ingest) +cli.add_command(query) +cli.add_command(initdb) +cli.add_command(print_config) + +cli.add_command(list) +list.add_command(list_users) +list.add_command(list_collections) +list.add_command(list_sessions) + +cli.add_command(update) +update.add_command(update_collection) + +if __name__ == "__main__": + cli() diff --git a/server/src/model.py b/controller/src/model.py similarity index 100% rename from server/src/model.py rename to controller/src/model.py diff --git a/server/src/sqlclient.py b/controller/src/sqlclient.py similarity index 95% rename from server/src/sqlclient.py rename to controller/src/sqlclient.py index b6f99d2..2a386d9 100644 --- a/server/src/sqlclient.py +++ b/controller/src/sqlclient.py @@ -18,11 +18,11 @@ import sqlalchemy from sqlalchemy.orm import sessionmaker -from server.src import model -from server.src.model import ApiResponse +from controller.src import model +from controller.src.model import ApiResponse -from .config import config, logger -from .sqldb import Base, ChatSessionContext, DocumentCollection, User +from controller.src.config import config, logger +from controller.src.sqldb import Base, ChatSessionContext, DocumentCollection, User class SqlClient: @@ -142,13 +142,16 @@ def create_collection( self, collection: model.DocCollection, session: sqlalchemy.orm.Session = None ): logger.debug(f"Creating collection: {collection}") - collection = model.DocCollection.from_dict(collection) + if isinstance(collection, dict): + collection = model.DocCollection.from_dict(collection) return self._create(session, DocumentCollection, collection) def update_collection( self, collection: model.DocCollection, session: sqlalchemy.orm.Session = None ): logger.debug(f"Updating collection: {collection}") + if isinstance(collection, dict): + collection = model.DocCollection.from_dict(collection) return self._update( session, DocumentCollection, collection, name=collection.name ) diff --git a/server/src/sqldb.py b/controller/src/sqldb.py similarity index 100% rename from server/src/sqldb.py rename to controller/src/sqldb.py