Skip to content

Commit

Permalink
Add mkdocs
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyashankar committed Sep 13, 2024
1 parent 1583f8a commit 9db82e9
Show file tree
Hide file tree
Showing 40 changed files with 2,149 additions and 156 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ Example of a reduce operation with value sampling:
enabled: true
method: cluster
sample_size: 50
embedding_model: text-embedding-ada-002
embedding_model: text-embedding-3-small
embedding_keys:
- name
- price
Expand Down Expand Up @@ -609,7 +609,7 @@ Example:
blocking_keys:
- record
blocking_threshold: 0.8
embedding_model: text-embedding-ada-002
embedding_model: text-embedding-3-small
resolution_model: gpt-4o-mini
comparison_model: gpt-4o-mini
```
Expand Down
1 change: 1 addition & 0 deletions docetl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Package version string; the `version` CLI command prints this value.
__version__ = "0.1.0"
2 changes: 1 addition & 1 deletion docetl/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def _optimize_step(
)

if (
not op_object.get("optimize", True)
not op_object.get("optimize", False) # Default don't optimize
or op_object.get("type") not in SUPPORTED_OPS
):
# If optimize is False or operation type is not supported, just use the operation without optimization
Expand Down
10 changes: 10 additions & 0 deletions docetl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,15 @@ def clear_cache():
cc()


@app.command()
def version():
    """
    Print the version of the DocETL package to the terminal.

    The value is read from ``docetl.__version__`` at call time.
    """
    # The package is imported inside the command body rather than at module
    # level, mirroring how this command was originally written.
    import docetl

    message = f"DocETL version: {docetl.__version__}"
    typer.echo(message)


if __name__ == "__main__":
app()
17 changes: 9 additions & 8 deletions docetl/operations/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from typing import Any, Dict, List, Tuple
import random

import numpy as np

import jinja2
from jinja2 import Template
from docetl.utils import completion_cost
Expand Down Expand Up @@ -291,17 +293,16 @@ def meets_blocking_conditions(pair):
else float("inf")
)
if remaining_comparisons > 0 and blocking_threshold is not None:
# Compute cosine similarity for all pairs at once
all_embeddings = np.array([embeddings[i] for i in range(len(input_data))])
similarity_matrix = cosine_similarity(all_embeddings)

cosine_pairs = []
for i, j in all_pairs:
if (i, j) not in blocked_pairs and find_cluster(i) != find_cluster(j):
try:
similarity = cosine_similarity(
[embeddings[i]], [embeddings[j]]
)[0][0]
if similarity >= blocking_threshold:
cosine_pairs.append((i, j, similarity))
except Exception as e:
self.console.log(f"Error comparing pair {i} and {j}: {e}")
similarity = similarity_matrix[i, j]
if similarity >= blocking_threshold:
cosine_pairs.append((i, j, similarity))

if remaining_comparisons != float("inf"):
cosine_pairs.sort(key=lambda x: x[2], reverse=True)
Expand Down
162 changes: 113 additions & 49 deletions docetl/operations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
from concurrent.futures import as_completed
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from openai import OpenAI

from dotenv import load_dotenv
from frozendict import frozendict
Expand All @@ -17,6 +18,7 @@
from diskcache import Cache
import tiktoken
from rich import print as rprint
from pydantic import BaseModel, create_model

from docetl.utils import count_tokens

Expand All @@ -28,6 +30,8 @@
LLM_CACHE_DIR = os.path.join(DOCETL_HOME_DIR, "llm_cache")
cache = Cache(LLM_CACHE_DIR)

client = OpenAI()


def freezeargs(func):
"""
Expand Down Expand Up @@ -150,6 +154,49 @@ def clear_cache(console: Console = Console()):
console.log(f"[bold red]Error clearing cache: {str(e)}[/bold red]")


def create_dynamic_model(schema: Dict[str, Any], model_name: str = "DynamicModel"):
    """
    Build a pydantic model class from a schema of type strings.

    Nested dictionaries in ``schema`` are flattened: the leaf at
    ``{"a": {"b": "str"}}`` becomes a single field named ``a__b``.
    Leaf values are type strings, matched case-insensitively after stripping
    surrounding whitespace:

    - ``str`` / ``text`` / ``string`` / ``varchar`` -> ``str``
    - ``int`` / ``integer`` -> ``int``
    - ``float`` / ``decimal`` / ``number`` -> ``float``
    - ``bool`` / ``boolean`` -> ``bool``
    - ``list[<type>]`` -> ``List[<type>]`` (the inner type is parsed recursively)
    - ``list`` -> ``List[Any]``
    - ``{key: <type>, ...}`` -> a nested model with one field per key
    - anything else -> ``Any``

    Every generated field is required.

    Args:
        schema: Mapping of field name to a type string or a nested mapping.
        model_name: Name given to the generated model; also used as the
            prefix for the names of nested models.

    Returns:
        The model class produced by ``create_model``.

    Raises:
        ValueError: If an entry of an inline ``{...}`` object has no
            ``key: type`` separator.
    """
    fields = {}

    primitive_types = {
        "str": str,
        "text": str,
        "string": str,
        "varchar": str,
        "int": int,
        "integer": int,
        "float": float,
        "decimal": float,
        "number": float,
        "bool": bool,
        "boolean": bool,
    }

    def split_top_level(text: str) -> List[str]:
        """Split ``text`` on commas that are not nested inside [] or {}."""
        parts = []
        depth = 0
        start = 0
        for pos, char in enumerate(text):
            if char in "[{":
                depth += 1
            elif char in "]}":
                # Clamp at zero so a stray closing bracket cannot hide
                # every later comma.
                depth = max(depth - 1, 0)
            elif char == "," and depth == 0:
                parts.append(text[start:pos])
                start = pos + 1
        parts.append(text[start:])
        return parts

    def process_schema(s: Dict[str, Any], prefix: str = "") -> None:
        """Walk ``s`` and register one flattened field per leaf value."""
        for key, value in s.items():
            field_name = f"{prefix}__{key}" if prefix else key
            if isinstance(value, dict):
                process_schema(value, field_name)
            else:
                fields[field_name] = parse_type(value, field_name)

    def parse_type(type_str: str, field_name: str) -> tuple:
        """Translate one type string into a ``(type, ...)`` field definition."""
        # The whole string is lowercased, so keys of inline objects are
        # lowercased too.
        type_str = type_str.strip().lower()

        if type_str in primitive_types:
            return (primitive_types[type_str], ...)

        if type_str.startswith("list["):
            # Drop the leading "list[" and the final character (the "]").
            inner_type = type_str[5:-1].strip()
            item_type = parse_type(inner_type, f"{field_name}_item")[0]
            return (List[item_type], ...)

        if type_str == "list":
            return (List[Any], ...)

        if type_str.startswith("{") and type_str.endswith("}"):
            subfields = {}
            # Split only on top-level commas so nested types such as
            # "list[{a: str, b: int}]" stay in one piece.
            for item in split_top_level(type_str[1:-1]):
                item = item.strip()
                if not item:
                    # Tolerate "{}" and trailing commas.
                    continue
                # Partition on the first ":" only; the value may itself be
                # an inline object that contains further colons.
                sub_key, separator, sub_type = item.partition(":")
                if not separator:
                    raise ValueError(
                        f"Invalid entry {item!r} in object type for field "
                        f"{field_name!r}: expected 'key: type'"
                    )
                sub_key = sub_key.strip()
                subfields[sub_key] = parse_type(sub_type, f"{field_name}_{sub_key}")
            sub_model = create_model(f"{model_name}_{field_name}", **subfields)
            return (sub_model, ...)

        # Unknown type strings are accepted without validation.
        return (Any, ...)

    process_schema(schema)
    return create_model(model_name, **fields)


def convert_val(value: Any) -> Dict[str, Any]:
"""
Convert a string representation of a type to a dictionary representation.
Expand Down Expand Up @@ -419,47 +466,62 @@ def call_llm_with_cache(
parameters["required"] = list(props.keys())
parameters["additionalProperties"] = False

tools = [
{
"type": "function",
"function": {
"name": "write_output",
"description": "Write processing output to a database",
"strict": True,
"parameters": parameters,
"additionalProperties": False,
},
}
]
tool_choice = {"type": "function", "function": {"name": "write_output"}}
response_format = {
"type": "json_schema",
"json_schema": {
"name": "write_output",
"description": "Write task output to a database",
"strict": True,
"schema": parameters,
# "additionalProperties": False,
},
}

tools = []
# tool_choice = {"type": "function", "function": {"name": "write_output"}}
tool_choice = "auto"

else:
tools = json.loads(tools)
tool_choice = (
"required" if any(tool.get("required", False) for tool in tools) else "auto"
)
tools = [{"type": "function", "function": tool["function"]} for tool in tools]
response_format = None

system_prompt = f"You are a helpful assistant, intelligently processing data. This is a {op_type} operation."
system_prompt = f"You are a helpful assistant, intelligently processing data. This is a {op_type} operation. You will perform the task on the user-provided data and write the output to a database."
if scratchpad:
system_prompt += f"\n\nYou are incrementally processing data across multiple batches. Your task is to {op_type} the data. Consider what intermediate state you need to maintain between batches to accomplish this task effectively.\n\nYour current scratchpad contains: {scratchpad}\n\nAs you process each batch, update your scratchpad with information crucial for processing subsequent batches. This may include partial results, counters, or any other relevant data that doesn't fit into {output_schema.keys()}. For example, if you're counting occurrences, track items that have appeared once.\n\nKeep your scratchpad concise (~500 chars) and use a format you can easily parse in future batches. You may use bullet points, key-value pairs, or any other clear structure."
messages = json.loads(messages)

# Truncate messages if they exceed the model's context length
messages = truncate_messages(messages, model)

response = completion(
model=model,
messages=[
{
"role": "system",
"content": system_prompt,
},
]
+ messages,
tools=tools,
tool_choice=tool_choice,
)
if response_format is None:
response = completion(
model=model,
messages=[
{
"role": "system",
"content": system_prompt,
},
]
+ messages,
tools=tools,
tool_choice=tool_choice,
)
else:
response = completion(
model=model,
messages=[
{
"role": "system",
"content": system_prompt,
},
]
+ messages,
response_format=response_format,
)

return response

Expand Down Expand Up @@ -612,22 +674,20 @@ def call_llm_with_gleaning(
messages.append({"role": "user", "content": improvement_prompt})

# Call LLM for improvement
# TODO: support gleaning and tools
response = completion(
model=model,
messages=truncate_messages(messages, model),
tools=[
{
"type": "function",
"function": {
"name": "write_output",
"description": "Write processing output to a database",
"strict": True,
"parameters": parameters,
"additionalProperties": False,
},
}
],
tool_choice={"type": "function", "function": {"name": "write_output"}},
response_format={
"type": "json_schema",
"json_schema": {
"name": "write_output",
"description": "Write processing output to a database",
"strict": True,
"schema": parameters,
# "additionalProperties": False,
},
},
)

# Update messages with the new response
Expand Down Expand Up @@ -682,16 +742,20 @@ def parse_llm_response(
results.append(function_args)
return results
else:
# Default behavior for write_output function
tool_calls = response.choices[0].message.tool_calls
outputs = []
for tool_call in tool_calls:
if tool_call.function.name == "write_output":
try:
outputs.append(json.loads(tool_call.function.arguments))
except json.JSONDecodeError:
return [{}]
return outputs
if "tool_calls" in response.choices[0].message:
# Default behavior for write_output function
tool_calls = response.choices[0].message.tool_calls
outputs = []
for tool_call in tool_calls:
if tool_call.function.name == "write_output":
try:
outputs.append(json.loads(tool_call.function.arguments))
except json.JSONDecodeError:
return [{}]
return outputs

else:
return [json.loads(response.choices[0].message.content)]

# message = response.choices[0].message
# return [json.loads(message.content)]
Expand Down
Empty file.
Empty file.
Empty file.
Empty file added docs/api-reference/docetl.md
Empty file.
Empty file.
Empty file.
Empty file added docs/community.md
Empty file.
Empty file added docs/concepts/interface.md
Empty file.
Loading

0 comments on commit 9db82e9

Please sign in to comment.