
Merge pull request #118 from ucbepic/shreyashankar/server
v1 of the UI!
shreyashankar authored Oct 28, 2024
2 parents 50936a8 + 09d3b8b commit 518d551
Showing 119 changed files with 12,321 additions and 2,520 deletions.
37 changes: 35 additions & 2 deletions Makefile
@@ -1,5 +1,6 @@
- .PHONY: tests tests-basic lint install mypy update
+ .PHONY: tests tests-basic lint install mypy update install-ui run-ui-dev run-ui

# Existing commands
tests:
poetry run pytest

@@ -19,4 +20,36 @@ mypy:
poetry run mypy

update:
poetry update

# New UI-related commands
UI_DIR := ./website

install-ui:
cd $(UI_DIR) && npm install

run-ui-dev:
@echo "Starting server..."
@python server/app/main.py & \
echo "Starting UI development server..." && \
cd $(UI_DIR) && npm run dev

run-ui:
@echo "Starting server..."
@python server/app/main.py & \
echo "Building UI..." && \
cd $(UI_DIR) && npm run build && npm run start

# Help command
help:
@echo "Available commands:"
@echo " make tests : Run all tests"
@echo " make tests-basic : Run basic tests"
@echo " make lint : Run linter"
@echo " make install : Install dependencies using Poetry"
@echo " make mypy : Run mypy for type checking"
@echo " make update : Update dependencies"
@echo " make install-ui : Install UI dependencies"
@echo " make run-ui-dev : Run UI development server"
@echo " make run-ui-prod : Run UI production server"
@echo " make help : Show this help message"
2 changes: 1 addition & 1 deletion docetl/api.py
@@ -203,7 +203,7 @@ def optimize(
yaml_file_suffix=self.name,
max_threads=max_threads,
)
- optimized_config = runner.optimize(return_pipeline=False)
+ optimized_config, _ = runner.optimize(return_pipeline=False)

updated_pipeline = Pipeline(
name=self.name,
4 changes: 3 additions & 1 deletion docetl/builder.py
@@ -474,7 +474,7 @@ def _load_optimized_ops(self):
else:
self.console.log("[yellow]No optimized operations found[/yellow]")

- def optimize(self):
+ def optimize(self) -> float:
"""
Optimize the entire pipeline defined in the configuration.
@@ -602,6 +602,8 @@ def optimize(self):
f"[bold]Total cost: ${self.llm_client.total_cost + self.operations_cost:.2f}[/bold]"
)

+ return self.llm_client.total_cost + self.operations_cost

def _run_partial_step(
self,
step: Dict[str, Any],
14 changes: 11 additions & 3 deletions docetl/config_wrapper.py
@@ -1,12 +1,13 @@
import datetime
import os
+ from docetl.console import get_console
from docetl.utils import load_config
from typing import Any, Dict, List, Optional, Tuple, Union
from docetl.operations.utils import APIWrapper
- from rich.console import Console
import pyrate_limiter
from inspect import isawaitable
import math
+ from rich.console import Console


class BucketCollection(pyrate_limiter.BucketFactory):
@@ -52,15 +53,22 @@ def __init__(
base_name: Optional[str] = None,
yaml_file_suffix: Optional[str] = None,
max_threads: int = None,
- console: Console = Console(),
+ console: Optional[Console] = None,
):
self.config = config
self.base_name = base_name
self.yaml_file_suffix = yaml_file_suffix or datetime.datetime.now().strftime(
"%Y%m%d_%H%M%S"
)
self.default_model = self.config.get("default_model", "gpt-4o-mini")
- self.console = console
+ if console:
+     self.console = console
+ else:
+     # Reset the DOCETL_CONSOLE
+     global DOCETL_CONSOLE
+     DOCETL_CONSOLE = get_console()
+
+     self.console = DOCETL_CONSOLE
self.max_threads = max_threads or (os.cpu_count() or 1) * 4
self.status = None
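In effect, omitting `console` now routes output through the shared, frontend-aware `DOCETL_CONSOLE` instead of a throwaway `rich.console.Console`. An illustrative sketch (class name and import path assumed):

from rich.console import Console
from docetl.config_wrapper import ConfigWrapper  # import path assumed

shared = ConfigWrapper(config={})                                # falls back to DOCETL_CONSOLE
custom = ConfigWrapper(config={}, console=Console(stderr=True))  # explicit console wins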

53 changes: 53 additions & 0 deletions docetl/console.py
@@ -0,0 +1,53 @@
import os
from typing import Optional
from rich.console import Console
from io import StringIO
import threading
import queue


class ThreadSafeConsole(Console):
    def __init__(self, *args, **kwargs):
        self.buffer = StringIO()
        kwargs["file"] = self.buffer
        super().__init__(*args, **kwargs)
        self.input_event = threading.Event()
        self.input_value = None

    def print(self, *args, **kwargs):
        super().print(*args, **kwargs)

    def input(
        self, prompt="", *, markup: bool = True, emoji: bool = True, **kwargs
    ) -> str:
        if prompt:
            self.print(prompt, markup=markup, emoji=emoji, end="")

        # TODO: Handle password

        self.input_event.wait()
        self.input_event.clear()
        return self.input_value

    def post_input(self, value: str):
        if self.input_event.is_set():
            super().print("Warning: Input ignored as we're not waiting for user input.")
            return
        self.input_value = value
        self.input_event.set()


def get_console():
    # Check if we're running with a frontend
    if os.environ.get("USE_FRONTEND") == "true":
        return ThreadSafeConsole(
            force_terminal=True,
            width=80,
            soft_wrap=True,
            highlight=False,
        )
    else:
        return Console()


DOCETL_CONSOLE = get_console()
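The handshake here is: a pipeline thread blocks in `input()`, a second thread (for example a web request handler) supplies the value via `post_input()`, and all Rich output accumulates in `buffer` for the frontend to read. A small usage sketch:

import threading
from docetl.console import ThreadSafeConsole

console = ThreadSafeConsole(force_terminal=True, width=80)

def pipeline_step():
    # Blocks on input_event until post_input() fires.
    answer = console.input("Continue? [y/n] ")
    console.print(f"User said: {answer}")

worker = threading.Thread(target=pipeline_step)
worker.start()
console.post_input("y")           # e.g., called from a frontend handler
worker.join()
print(console.buffer.getvalue())  # rendered output to ship to the UI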
3 changes: 2 additions & 1 deletion docetl/operations/base.py
@@ -6,6 +6,7 @@
from typing import Dict, List, Optional, Tuple

from docetl.operations.utils import APIWrapper
+ from docetl.console import DOCETL_CONSOLE
from rich.console import Console
from rich.status import Status
import jsonschema
@@ -55,7 +56,7 @@ def __init__(
self.config = config
self.default_model = default_model
self.max_threads = max_threads
- self.console = console or Console()
+ self.console = console or DOCETL_CONSOLE
self.manually_fix_errors = self.config.get("manually_fix_errors", False)
self.status = status
self.num_retries_on_validate_failure = self.config.get(
1 change: 1 addition & 0 deletions docetl/operations/equijoin.py
@@ -252,6 +252,7 @@ def get_hashable_key(item: Dict) -> str:
f"[yellow]Warning: {dropped_pairs} pairs will be dropped due to the comparison limit. "
f"Proceeding with {limit_comparisons} randomly sampled pairs. "
f"Do you want to continue?[/yellow]",
+ self.console,
):
raise ValueError("Operation cancelled by user due to pair limit.")

9 changes: 2 additions & 7 deletions docetl/operations/gather.py
@@ -48,12 +48,7 @@ def syntax_check(self) -> None:
f"Missing required key '{key}' in GatherOperation configuration"
)

if "peripheral_chunks" not in self.config:
raise ValueError(
"Missing 'peripheral_chunks' configuration in GatherOperation"
)

peripheral_config = self.config["peripheral_chunks"]
peripheral_config = self.config.get("peripheral_chunks", {})
for direction in ["previous", "next"]:
if direction not in peripheral_config:
continue
@@ -87,7 +82,7 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]:
content_key = self.config["content_key"]
doc_id_key = self.config["doc_id_key"]
order_key = self.config["order_key"]
peripheral_config = self.config["peripheral_chunks"]
peripheral_config = self.config.get("peripheral_chunks", {})
main_chunk_start = self.config.get(
"main_chunk_start", "--- Begin Main Chunk ---"
)
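With `peripheral_chunks` now optional in both `syntax_check` and `execute`, a minimal gather configuration can omit it entirely. An illustrative config dict (key values are examples, not from the source):

gather_config = {
    "content_key": "chunk_text",
    "doc_id_key": "doc_id",
    "order_key": "chunk_order",
    # "peripheral_chunks" may be omitted; it now defaults to {}
}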
5 changes: 4 additions & 1 deletion docetl/operations/map.py
@@ -379,11 +379,14 @@ def process_prompt(item, prompt_config):
local_output_schema = {
key: output_schema[key] for key in prompt_config["output_keys"]
}
+ model = prompt_config.get("model", self.default_model)
+ if not model:
+     model = self.default_model

# If there are tools, we need to pass in the tools
response = self.runner.api.call_llm(
prompt_config.get("model", self.default_model),
model,
"parallel_map",
[{"role": "user", "content": prompt}],
local_output_schema,
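The two-step fallback guards against configs where `model` is present but explicitly null, which `dict.get`'s default alone would not catch:

prompt_config = {"output_keys": ["summary"], "model": None}  # e.g., `model:` left empty in YAML
prompt_config.get("model", "gpt-4o-mini")  # -> None, hence the extra `if not model` check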
3 changes: 3 additions & 0 deletions docetl/operations/reduce.py
@@ -384,6 +384,9 @@ def process_group(

total_cost += cost

+ # Add the counts of items in the group to the result
+ result[f"_counts_prereduce_{self.config['name']}"] = len(group_elems)

# Apply pass-through at the group level
if (
result is not None
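So a reduce operation named, say, `summarize` that folds five items into one output now also emits `result["_counts_prereduce_summarize"] == 5`, making group sizes auditable downstream.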
24 changes: 22 additions & 2 deletions docetl/operations/resolve.py
@@ -69,7 +69,7 @@ def compare_pair(
if all(
key in item1
and key in item2
- and item1[key].lower() == item2[key].lower()
+ and str(item1[key]).lower() == str(item2[key]).lower()
for key in blocking_keys
):
return True, 0
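The `str(...)` coercion keeps the exact-match shortcut from crashing on non-string blocking keys, where the old code raised `AttributeError`:

item1, item2 = {"zip": 94704}, {"zip": "94704"}
# before: item1["zip"].lower()  -> AttributeError: 'int' object has no attribute 'lower'
# after:  str(item1["zip"]).lower() == str(item2["zip"]).lower()  -> True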
@@ -241,6 +241,7 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]:
f"This may result in a large number of comparisons. "
f"We recommend specifying at least one blocking key or condition, or using the optimizer to automatically come up with these. "
f"Do you want to continue without blocking?[/yellow]",
+ console=self.runner.console,
):
raise ValueError("Operation cancelled by user.")

@@ -260,7 +261,9 @@ def is_match(item1: Dict[str, Any], item2: Dict[str, Any]) -> bool:
# Calculate embeddings if blocking_threshold is set
embeddings = None
if blocking_threshold is not None:
- embedding_model = self.config.get("embedding_model", self.default_model)
+ embedding_model = self.config.get(
+     "embedding_model", "text-embedding-3-small"
+ )

def get_embeddings_batch(
items: List[Dict[str, Any]]
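For context, when `blocking_threshold` is set, each item's blocking keys are embedded and only pairs whose similarity clears the threshold go on to the LLM comparator. A minimal sketch of the thresholding idea, not the exact implementation:

import numpy as np

def pairs_above_threshold(embeddings: np.ndarray, threshold: float) -> list[tuple[int, int]]:
    # Normalize rows so the Gram matrix holds cosine similarities.
    normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    sims = normed @ normed.T
    n = sims.shape[0]
    return [(i, j) for i in range(n) for j in range(i + 1, n) if sims[i, j] >= threshold]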
@@ -414,6 +417,9 @@ def merge_clusters(item1: int, item2: int) -> None:
f"[green]Comparisons saved by blocking: {comparisons_saved} "
f"({(comparisons_saved / total_possible_comparisons) * 100:.2f}%)[/green]"
)
+ self.console.log(
+     f"[blue]Number of pairs to compare: {len(blocked_pairs)}[/blue]"
+ )

# Compute an auto-batch size based on the number of comparisons
def auto_batch() -> int:
@@ -544,10 +550,21 @@ def process_cluster(cluster):
self.config["output"]["schema"],
manually_fix_errors=self.manually_fix_errors,
)[0]

# If the output is overwriting an existing key, we want to save the kv pairs
keys_in_output = [
k
for k in set(reduction_output.keys())
if k in cluster_items[0].keys()
]

return (
[
{
**item,
f"_kv_pairs_preresolve_{self.config['name']}": {
k: item[k] for k in keys_in_output
},
**{
k: reduction_output[k]
for k in self.config["output"]["schema"]
@@ -589,6 +606,9 @@ def process_cluster(cluster):

# Create the result dictionary using the key mapping
result = input_data[list(cluster)[0]].copy()
result[f"_kv_pairs_preresolve_{self.config['name']}"] = {
ok: result[ck] for ok, ck in key_mapping.items() if ck in result
}
for output_key, compare_key in key_mapping.items():
if compare_key in input_data[list(cluster)[0]]:
result[output_key] = input_data[list(cluster)[0]][compare_key]
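In both branches, when the resolve output overwrites keys that already exist on an item, the pre-resolve values are stashed under `_kv_pairs_preresolve_<name>` for provenance. Illustratively, for an operation named `dedupe`:

# before resolve: {"title": "the catcher in the rye "}
# after resolve:  {"title": "The Catcher in the Rye",
#                  "_kv_pairs_preresolve_dedupe": {"title": "the catcher in the rye "}}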
14 changes: 12 additions & 2 deletions docetl/operations/utils.py
@@ -22,6 +22,7 @@
from tqdm import tqdm
from pydantic import BaseModel

+ from docetl.console import DOCETL_CONSOLE
from docetl.utils import completion_cost, count_tokens
import time

@@ -80,7 +81,7 @@ def wrapped(*args, **kwargs):
return wrapped


- def flush_cache(console: Console = Console()):
+ def flush_cache(console: Console = DOCETL_CONSOLE):
"""
Flush the cache to disk.
"""
@@ -89,7 +90,7 @@ def flush_cache(console: Console = Console()):
console.log("[bold green]Cache flushed to disk.[/bold green]")


- def clear_cache(console: Console = Console()):
+ def clear_cache(console: Console = DOCETL_CONSOLE):
"""
Clear the LLM cache stored on disk.
@@ -916,6 +917,15 @@ def _parse_llm_response_helper(

outputs = []
for tool_call in tool_calls:
+ if response.choices[0].finish_reason == "content_filter":
+     raise InvalidOutputError(
+         "Content filter triggered in LLM response",
+         "",
+         schema,
+         response.choices,
+         tools,
+     )

try:
output_dict = json.loads(tool_call.function.arguments)
if "ollama" in response.model:
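Downstream callers can now see provider content-filter refusals as an `InvalidOutputError` instead of a silently empty parse. A hedged handling sketch (helper and handler names are illustrative, not the library's API):

from docetl.operations.utils import InvalidOutputError  # import path assumed

try:
    outputs = parse_llm_response(response, schema)  # stand-in for the helper above
except InvalidOutputError as err:
    # A refusal surfaces here; a caller might skip the item or retry
    # with a softened prompt.
    print(f"LLM output rejected: {err}")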
2 changes: 1 addition & 1 deletion docetl/optimizers/join_optimizer.py
@@ -691,7 +691,7 @@ def optimize_equijoin(
if self.status:
self.status.stop()
# Use Rich's Confirm for input
if Confirm.ask("Use this rule?"):
if Confirm.ask("Use this rule?", self.console):
selected_containment_rules.append(rule)
# Restart the status
if self.status:
3 changes: 2 additions & 1 deletion docetl/optimizers/reduce_optimizer.py
@@ -498,7 +498,8 @@ def _evaluate_decomposition(
# Ask user if they agree with the decomposition assessment
user_agrees = Confirm.ask(
f"Do you agree with the decomposition assessment? "
f"[bold]{'Recommended' if should_decompose['should_decompose'] else 'Not recommended'}[/bold]"
f"[bold]{'Recommended' if should_decompose['should_decompose'] else 'Not recommended'}[/bold]",
self.console,
)

# If user disagrees, invert the decomposition decision