Logistic regression to check equality for resolve before cascading to LLM #234

Open · wants to merge 1 commit into main
88 changes: 83 additions & 5 deletions docetl/optimizers/join_optimizer.py
@@ -8,6 +8,9 @@
from rich.console import Console
from rich.prompt import Confirm
from rich.status import Status
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss, accuracy_score
from sklearn.model_selection import train_test_split

from docetl.operations.equijoin import EquijoinOperation
from docetl.operations.resolve import ResolveOperation
@@ -435,11 +438,47 @@ def should_optimize(self, input_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
            return dedup, explanation

        return False, ""

    def _generate_log_reg_training_data(
        self,
        comparison_results_train: List[Tuple[int, int, bool]],
        embeddings: np.ndarray,
        impute_data: bool = True,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
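        """Build logistic-regression training data from LLM-labeled pairs.

        Each training example is the concatenation of the two records' embeddings;
        the label is the LLM's equality verdict. When impute_data is set, synthetic
        positive pairs (a record paired with a noisy copy of itself) are added to
        offset the scarcity of true matches.

        Returns (X_train_fake, X_train_real, y_train_fake, y_train_real).
        """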
        y_train_real = np.array([])
        y_train_fake = np.array([])
        X_train_real = np.array([])
        X_train_fake = np.array([])
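        # Small Gaussian noise, scaled to 3% of the embedding standard deviation,
        # used below to perturb embeddings when synthesizing positive pairs.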
        embeddings_noise = np.random.normal(0, np.std(embeddings) * 0.03, embeddings.shape)

        for i, j, result in comparison_results_train:
            if result:
                y_train_real = np.append(y_train_real, 1)
            else:
                y_train_real = np.append(y_train_real, 0)
            embedding_pair = np.append(embeddings[i], embeddings[j])
            if X_train_real.shape == (0,):
                X_train_real = embedding_pair
            else:
                X_train_real = np.vstack((X_train_real, embedding_pair))
            # Impute a synthetic "equal" pair to address the equality class imbalance
            if impute_data:
                idx = np.random.choice([i, j])
                embeddings_fake_pair = np.append(embeddings[idx], embeddings[idx] + embeddings_noise[idx])
                if X_train_fake.shape == (0,):
                    X_train_fake = embeddings_fake_pair
                else:
                    X_train_fake = np.vstack((X_train_fake, embeddings_fake_pair))
                y_train_fake = np.append(y_train_fake, 1)
        return X_train_fake, X_train_real, y_train_fake, y_train_real

    def _generate_log_reg_test_data(self, sample_pairs: List[Tuple[int, int]], embeddings: np.ndarray) -> np.ndarray:
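        """Stack the concatenated embedding pairs for each sampled (i, j) index pair into a feature matrix."""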
        X_test = np.array([])
        for i, j in sample_pairs:
            embedding_pair = np.append(embeddings[i], embeddings[j])
            if X_test.shape == (0,):
                X_test = embedding_pair
            else:
                X_test = np.vstack((X_test, embedding_pair))
        return X_test

    def optimize_resolve(
        self, input_data: List[Dict[str, Any]]
    ) -> Tuple[Dict[str, Any], float]:
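        """Optimize the resolve operation, returning the optimized config and the optimization cost."""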

        # Check if the operation is marked as empty
        if self.op_config.get("empty", False):
            # Extract the map prompt from the intermediates
@@ -491,20 +530,59 @@ def optimize_resolve(
            # Pop off the empty flag
            self.op_config.pop("empty")


        embeddings, blocking_keys, embedding_cost = self._compute_embeddings(input_data)

        self.console.log(
            f"[bold]Cost of creating embeddings on the sample: ${embedding_cost:.4f}[/bold]"
        )

        # Get the labels for the training data using the LLM-based method.
        similarities = self._calculate_cosine_similarities(embeddings)

        sampled_pairs = self._sample_pairs(similarities)
-        comparison_results, comparison_cost = self._perform_comparisons_resolve(
-            input_data, sampled_pairs
-        )

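        # Hold out 20% of the sampled pairs: the LLM labels the training split,
        # while the classifier scores the held-out split, cascading to the LLM
        # only where it is uncertain.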
        sampled_pairs_train, sampled_pairs_test = train_test_split(
            sampled_pairs, test_size=0.2, random_state=42
        )
        comparison_results_train, comparison_cost = self._perform_comparisons_resolve(
            input_data, sampled_pairs_train
        )
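        # Count the LLM's verdicts per class to detect label imbalance before
        # deciding whether training a classifier is worthwhile.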
        num_true = 0
        num_false = 0
        for _, _, flag in comparison_results_train:
            if flag is True:
                num_true += 1
            elif flag is False:
                num_false += 1
        should_cascade = True
        if (
            num_true / len(comparison_results_train) > 0.9
            or num_false / len(comparison_results_train) > 0.9
        ):
            # Too much class imbalance: don't cascade unless the user is explicitly
            # OK with imputing data.
            # TODO: add an allow-imputation flag to the config so users can opt in
            # to cascading despite the imbalance.
            should_cascade = False
        if should_cascade:
            embeddings = np.array(embeddings)
            # Use the logistic regression classifier to predict the rest of the sampled pairs
            X_train_fake, X_train_real, y_train_fake, y_train_real = self._generate_log_reg_training_data(
                comparison_results_train, embeddings
            )
            X_train = np.vstack((X_train_real, X_train_fake))
            y_train = np.append(y_train_real, y_train_fake)
            clf = LogisticRegression().fit(X_train, y_train)
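            # Score each held-out pair; predict_proba(...)[:, 1] is the model's
            # probability that the pair refers to the same entity.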
            comparison_results_test = []
            X_test = self._generate_log_reg_test_data(sampled_pairs_test, embeddings)
            y_test = clf.predict_proba(X_test)[:, 1]
            cascaded_pairs = []
            classifier_threshold = 0.7
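            # Accept the classifier's verdict only outside the uncertainty band
            # (here 0.3-0.7); pairs inside the band cascade to the LLM.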
            for (i, j), y in zip(sampled_pairs_test, y_test):
                if y > classifier_threshold or y < 1 - classifier_threshold:
                    comparison_results_test.append((i, j, y > classifier_threshold))
                else:
                    cascaded_pairs.append((i, j))
            llm_cascade_results, llm_cascade_cost = self._perform_comparisons_resolve(
                input_data, cascaded_pairs
            )
            comparison_cost += llm_cascade_cost
        else:
            llm_cascade_results = []
            comparison_results_test, remainder_cost = self._perform_comparisons_resolve(
                input_data, sampled_pairs_test
            )
            comparison_cost += remainder_cost
        comparison_results = comparison_results_train + comparison_results_test + llm_cascade_results
        self._print_similarity_histogram(similarities, comparison_results)

        threshold, estimated_selectivity = self._find_optimal_threshold(
            comparison_results, similarities
        )