From 75442b3269284f5e9b5bf8199f739ba23171f90f Mon Sep 17 00:00:00 2001
From: sushruth2003
Date: Sat, 7 Dec 2024 17:47:23 -0800
Subject: [PATCH] Add support for logistic regression classifier to cascade to
 LLM on low confidence in join_optimizer.py

---
 docetl/optimizers/join_optimizer.py | 88 +++++++++++++++++++++++++++--
 1 file changed, 83 insertions(+), 5 deletions(-)

diff --git a/docetl/optimizers/join_optimizer.py b/docetl/optimizers/join_optimizer.py
index 42d51871..871ba9b1 100644
--- a/docetl/optimizers/join_optimizer.py
+++ b/docetl/optimizers/join_optimizer.py
@@ -8,6 +8,9 @@
 from rich.console import Console
 from rich.prompt import Confirm
 from rich.status import Status
+from sklearn.linear_model import LogisticRegression
+from sklearn.metrics import log_loss, accuracy_score
+from sklearn.model_selection import train_test_split
 
 from docetl.operations.equijoin import EquijoinOperation
 from docetl.operations.resolve import ResolveOperation
@@ -435,11 +438,47 @@ def should_optimize(self, input_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
                 return dedup, explanation
 
         return False, ""
+
+    def _generate_log_reg_training_data(self, comparison_results_train: List[Tuple[int, int, bool]], embeddings: np.ndarray, impute_data: bool = True) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
+        y_train_real = np.array([])
+        y_train_fake = np.array([])
+        X_train_real = np.array([])
+        X_train_fake = np.array([])
+        embeddings_noise = np.random.normal(0, np.std(embeddings) * 0.03, embeddings.shape)
+
+        for (i, j, result) in comparison_results_train:
+            if result:
+                y_train_real = np.append(y_train_real, 1)
+            else:
+                y_train_real = np.append(y_train_real, 0)
+            embedding_pair = np.append(embeddings[i], embeddings[j])
+            if X_train_real.shape == (0,):
+                X_train_real = embedding_pair
+            else:
+                X_train_real = np.vstack((X_train_real, embedding_pair))
+            # Impute synthetic positive pairs (a record paired with a noisy copy of itself) to address the equality class imbalance
+            if impute_data:
+                idx = np.random.choice([i, j])
+                embeddings_fake_pair = np.append(embeddings[idx], (embeddings[idx] + embeddings_noise[idx]))
+                if X_train_fake.shape == (0,):
+                    X_train_fake = embeddings_fake_pair
+                else:
+                    X_train_fake = np.vstack((X_train_fake, embeddings_fake_pair))
+                y_train_fake = np.append(y_train_fake, 1)
+        return X_train_fake, X_train_real, y_train_fake, y_train_real
+    def _generate_log_reg_test_data(self, sample_pairs: List[Tuple[int, int]], embeddings: np.ndarray) -> np.ndarray:
+        X_test = np.array([])
+        for (i, j) in sample_pairs:
+            embedding_pair = np.append(embeddings[i], embeddings[j])
+            if X_test.shape == (0,):
+                X_test = embedding_pair
+            else:
+                X_test = np.vstack((X_test, embedding_pair))
+        return X_test
 
     def optimize_resolve(
         self, input_data: List[Dict[str, Any]]
     ) -> Tuple[Dict[str, Any], float]:
-        # Check if the operation is marked as empty
         if self.op_config.get("empty", False):
 
             # Extract the map prompt from the intermediates
@@ -491,20 +530,59 @@ def optimize_resolve(
         # Pop off the empty flag
         self.op_config.pop("empty")
 
+
         embeddings, blocking_keys, embedding_cost = self._compute_embeddings(input_data)
+
         self.console.log(
             f"[bold]Cost of creating embeddings on the sample: ${embedding_cost:.4f}[/bold]"
         )
 
+
+        # Get the labels for the training data using the LLM-based method.
         similarities = self._calculate_cosine_similarities(embeddings)
 
         sampled_pairs = self._sample_pairs(similarities)
-        comparison_results, comparison_cost = self._perform_comparisons_resolve(
-            input_data, sampled_pairs
-        )
+        sampled_pairs_train, sampled_pairs_test = train_test_split(sampled_pairs, test_size=0.2, random_state=42)
+        comparison_results_train, comparison_cost = self._perform_comparisons_resolve(
+            input_data, sampled_pairs_train
+        )
+        num_true = 0
+        num_false = 0
+        for _, _, flag in comparison_results_train:
+            if flag is True:
+                num_true += 1
+            elif flag is False:
+                num_false += 1
+        should_cascade = True
+        if num_true / len(comparison_results_train) > 0.9 or num_false / len(comparison_results_train) > 0.9:
+            should_cascade = False  # Too much class imbalance; don't cascade unless the user is explicitly OK with imputing data.
+            # TODO: add an "allow imputation" flag to the config so users can opt in to cascading despite the imbalance.
+        if should_cascade:
+            embeddings = np.array(embeddings)
+            # Use the logistic regression classifier to predict the rest of the sampled pairs
+            X_train_fake, X_train_real, y_train_fake, y_train_real = self._generate_log_reg_training_data(comparison_results_train, embeddings)
+            X_train = np.vstack((X_train_real, X_train_fake))
+            y_train = np.append(y_train_real, y_train_fake)
+            clf = LogisticRegression().fit(X_train, y_train)
+            comparison_results_test = []
+            X_test = self._generate_log_reg_test_data(sampled_pairs_test, embeddings)
+            y_proba = clf.predict_proba(X_test)[:, 1]
+            cascaded_pairs = []
+            classifier_threshold = 0.7
+            for (i, j), y in zip(sampled_pairs_test, y_proba):
+                if y > classifier_threshold or y < 1 - classifier_threshold:
+                    comparison_results_test.append((i, j, y > classifier_threshold))
+                else:
+                    cascaded_pairs.append((i, j))
+            llm_cascade_results, llm_cascade_cost = self._perform_comparisons_resolve(input_data, cascaded_pairs)
+            comparison_cost += llm_cascade_cost
+        else:
+            llm_cascade_results = []
+            comparison_results_test, remainder_cost = self._perform_comparisons_resolve(input_data, sampled_pairs_test)
+            comparison_cost += remainder_cost
+        comparison_results = comparison_results_train + comparison_results_test + llm_cascade_results
 
         self._print_similarity_histogram(similarities, comparison_results)
-
         threshold, estimated_selectivity = self._find_optimal_threshold(
             comparison_results, similarities
        )
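
Reviewer note (not part of the patch): `_generate_log_reg_training_data` featurizes a candidate pair by concatenating the two record embeddings and, to counter the scarcity of true matches, imputes synthetic positives by pairing a record with a lightly perturbed copy of itself. The sketch below is only an illustration of that idea on synthetic data; the concatenation and the ~3% noise scale follow the patch, while the array sizes and names are made up here.

```python
# Illustration of the pair featurization + synthetic-positive imputation used in
# _generate_log_reg_training_data. The embeddings below are random stand-ins;
# only the concatenation and the ~3% noise scale mirror the patch.
import numpy as np

rng = np.random.default_rng(0)
embeddings = rng.normal(size=(20, 8))                    # one embedding per record
noise = rng.normal(0, np.std(embeddings) * 0.03, embeddings.shape)

# Feature for a real candidate pair (i, j): concatenation of the two embeddings.
i, j = 3, 7
x_real = np.concatenate([embeddings[i], embeddings[j]])  # shape (16,)

# Imputed positive: a record paired with a noisy copy of itself, labeled 1.
idx = rng.choice([i, j])
x_fake = np.concatenate([embeddings[idx], embeddings[idx] + noise[idx]])
y_fake = 1

print(x_real.shape, x_fake.shape, y_fake)                # (16,) (16,) 1
```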
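Reviewer note (not part of the patch): the core of the change is a confidence cascade — label a subset of sampled pairs with the LLM, fit a logistic regression on those labels, accept its confident predictions, and send only uncertain pairs back to the LLM. Below is a minimal, self-contained sketch of that loop under stated assumptions: `llm_compare` is a hypothetical stand-in for docetl's `_perform_comparisons_resolve`, the embeddings and pair counts are synthetic, and the 0.7 threshold mirrors the hard-coded `classifier_threshold` in the patch.

```python
# Minimal sketch of the confidence-based cascade this patch implements.
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(42)
embeddings = rng.normal(size=(50, 8))               # one embedding per record (synthetic)
pairs = [(i, j) for i in range(50) for j in range(i + 1, 50)][:200]

def llm_compare(i: int, j: int) -> bool:
    # Hypothetical oracle; in the patch this is an LLM call that costs money.
    return bool(np.dot(embeddings[i], embeddings[j]) > 0)

def pair_features(pair_list, emb):
    # Same featurization as the patch: concatenate the two embeddings of a pair.
    return np.array([np.concatenate([emb[i], emb[j]]) for i, j in pair_list])

train_pairs, test_pairs = train_test_split(pairs, test_size=0.2, random_state=42)

# 1. Label the training pairs with the (expensive) LLM.
X_train = pair_features(train_pairs, embeddings)
y_train = np.array([llm_compare(i, j) for i, j in train_pairs], dtype=int)

# 2. Fit the cheap classifier on the LLM's labels.
clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)

# 3. Cascade: keep confident predictions, send uncertain pairs back to the LLM.
threshold = 0.7                                     # mirrors classifier_threshold
proba = clf.predict_proba(pair_features(test_pairs, embeddings))[:, 1]
results, cascaded = [], []
for (i, j), p in zip(test_pairs, proba):
    if p > threshold or p < 1 - threshold:
        results.append((i, j, p > threshold))       # classifier is confident enough
    else:
        cascaded.append((i, j))                     # low confidence: fall back to LLM

results += [(i, j, llm_compare(i, j)) for i, j in cascaded]
print(f"classifier handled {len(test_pairs) - len(cascaded)}/{len(test_pairs)} "
      f"test pairs; {len(cascaded)} cascaded to the LLM")
```

Widening the confidence band (moving the threshold toward 1.0) sends more pairs to the LLM, trading cost for accuracy; the patch currently hard-codes 0.7.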