diff --git a/README.md b/README.md index a0e78400f..22daede99 100644 --- a/README.md +++ b/README.md @@ -64,9 +64,10 @@ pip install git+https://github.com/explodinggradients/ragas ### Evaluate your RAG with Ragas metrics -This is 4 main lines: +This is 5 main lines: ```python +from ragas import evaluate from ragas.metrics import LLMContextRecall, Faithfulness, FactualCorrectness from langchain_openai.chat_models import ChatOpenAI from ragas.llms import LangchainLLMWrapper diff --git a/docs/howtos/customizations/customize_models.md b/docs/howtos/customizations/customize_models.md index 42c74fa29..2ee6b7dfe 100644 --- a/docs/howtos/customizations/customize_models.md +++ b/docs/howtos/customizations/customize_models.md @@ -70,11 +70,12 @@ import google.auth from langchain_google_vertexai import ChatVertexAI, VertexAIEmbeddings from ragas.llms import LangchainLLMWrapper from ragas.embeddings import LangchainEmbeddingsWrapper +from langchain_core.outputs import LLMResult, ChatGeneration config = { "project_id": "", - "chat_model_id": "gemini-1.0-pro-002", - "embedding_model_id": "textembedding-gecko", + "chat_model_id": "gemini-1.5-pro-002", + "embedding_model_id": "text-embedding-005", } # authenticate to GCP @@ -89,7 +90,41 @@ vertextai_embeddings = VertexAIEmbeddings( credentials=creds, model_name=config["embedding_model_id"] ) -vertextai_llm = LangchainLLMWrapper(vertextai_llm) +# Create a custom is_finished_parser to capture Gemini generation completion signals +def gemini_is_finished_parser(response: LLMResult) -> bool: + is_finished_list = [] + for g in response.flatten(): + resp = g.generations[0][0] + + # Check generation_info first + if resp.generation_info is not None: + finish_reason = resp.generation_info.get("finish_reason") + if finish_reason is not None: + is_finished_list.append( + finish_reason in ["STOP", "MAX_TOKENS"] + ) + continue + + # Check response_metadata as fallback + if isinstance(resp, ChatGeneration) and resp.message is not None: + metadata = resp.message.response_metadata + if metadata.get("finish_reason"): + is_finished_list.append( + metadata["finish_reason"] in ["STOP", "MAX_TOKENS"] + ) + elif metadata.get("stop_reason"): + is_finished_list.append( + metadata["stop_reason"] in ["STOP", "MAX_TOKENS"] + ) + + # If no finish reason found, default to True + if not is_finished_list: + is_finished_list.append(True) + + return all(is_finished_list) + + +vertextai_llm = LangchainLLMWrapper(vertextai_llm, is_finished_parser=gemini_is_finished_parser) vertextai_embeddings = LangchainEmbeddingsWrapper(vertextai_embeddings) ``` Yay! Now are you ready to use ragas with Google VertexAI endpoints diff --git a/src/ragas/callbacks.py b/src/ragas/callbacks.py index 7b16059f5..ae5ea8b2e 100644 --- a/src/ragas/callbacks.py +++ b/src/ragas/callbacks.py @@ -133,12 +133,15 @@ def __str__(self): def parse_run_traces( traces: t.Dict[str, ChainRun], + parent_run_id: t.Optional[str] = None, ) -> t.List[t.Dict[str, t.Any]]: + root_traces = [ chain_trace for chain_trace in traces.values() - if chain_trace.parent_run_id is None + if chain_trace.parent_run_id == parent_run_id ] + if len(root_traces) > 1: raise ValueError( "Multiple root traces found! This is a bug on our end, please file an issue and we will fix it ASAP :)" @@ -159,7 +162,7 @@ def parse_run_traces( prompt_traces = {} for i, prompt_uuid in enumerate(metric_trace.children): prompt_trace = traces[prompt_uuid] - prompt_traces[f"{i}_{prompt_trace.name}"] = { + prompt_traces[f"{prompt_trace.name}"] = { "input": prompt_trace.inputs.get("data", {}), "output": prompt_trace.outputs.get("output", {}), } diff --git a/src/ragas/config.py b/src/ragas/config.py index c3dde696a..b12e9b2a2 100644 --- a/src/ragas/config.py +++ b/src/ragas/config.py @@ -5,7 +5,7 @@ from ragas.embeddings import BaseRagasEmbeddings from ragas.llms import BaseRagasLLM from ragas.losses import Loss -from ragas.optimizers import Optimizer +from ragas.optimizers import GeneticOptimizer, Optimizer DEFAULT_OPTIMIZER_CONFIG = {"max_steps": 100} @@ -20,7 +20,7 @@ class DemonstrationConfig(BaseModel): class InstructionConfig(BaseModel): enabled: bool = True loss: t.Optional[Loss] = None - optimizer: Optimizer + optimizer: Optimizer = GeneticOptimizer() optimizer_config: t.Dict[str, t.Any] = Field( default_factory=lambda: DEFAULT_OPTIMIZER_CONFIG ) diff --git a/src/ragas/dataset_schema.py b/src/ragas/dataset_schema.py index c8ae52872..704144ee9 100644 --- a/src/ragas/dataset_schema.py +++ b/src/ragas/dataset_schema.py @@ -6,6 +6,7 @@ from abc import ABC, abstractmethod from collections import defaultdict from dataclasses import dataclass, field +from uuid import UUID import numpy as np from datasets import Dataset as HFDataset @@ -43,6 +44,13 @@ def get_features(self) -> t.List[str]: """ return list(self.to_dict().keys()) + def to_string(self) -> str: + """ + Get the string representation of the sample. + """ + sample_dict = self.to_dict() + return "".join(f"\n{key}:\n\t{val}\n" for key, val in sample_dict.items()) + class SingleTurnSample(BaseSample): """ @@ -378,6 +386,7 @@ class EvaluationResult: cost_cb: t.Optional[CostCallbackHandler] = None traces: t.List[t.Dict[str, t.Any]] = field(default_factory=list) ragas_traces: t.Dict[str, ChainRun] = field(default_factory=dict, repr=False) + run_id: t.Optional[UUID] = None def __post_init__(self): # transform scores from list of dicts to dict of lists @@ -395,7 +404,8 @@ def __post_init__(self): values.append(value + 1e-10) # parse the traces - self.traces = parse_run_traces(self.ragas_traces) + run_id = str(self.run_id) if self.run_id is not None else None + self.traces = parse_run_traces(self.ragas_traces, run_id) def __repr__(self) -> str: score_strs = [f"'{k}': {v:0.4f}" for k, v in self._repr_dict.items()] diff --git a/src/ragas/evaluation.py b/src/ragas/evaluation.py index a1878c43f..9fa1e0116 100644 --- a/src/ragas/evaluation.py +++ b/src/ragas/evaluation.py @@ -1,11 +1,13 @@ from __future__ import annotations import typing as t +from uuid import UUID from datasets import Dataset from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager from langchain_core.embeddings import Embeddings as LangchainEmbeddings from langchain_core.language_models import BaseLanguageModel as LangchainLLM +from tqdm.auto import tqdm from ragas._analytics import track_was_completed from ragas.callbacks import ChainType, RagasTracer, new_group @@ -59,12 +61,14 @@ def evaluate( embeddings: t.Optional[BaseRagasEmbeddings | LangchainEmbeddings] = None, callbacks: Callbacks = None, in_ci: bool = False, - run_config: RunConfig = RunConfig(), + run_config: t.Optional[RunConfig] = None, token_usage_parser: t.Optional[TokenUsageParser] = None, raise_exceptions: bool = False, column_map: t.Optional[t.Dict[str, str]] = None, show_progress: bool = True, batch_size: t.Optional[int] = None, + _run_id: t.Optional[UUID] = None, + _pbar: t.Optional[tqdm] = None, ) -> EvaluationResult: """ Run the evaluation on the dataset with different metrics @@ -146,6 +150,7 @@ def evaluate( """ column_map = column_map or {} callbacks = callbacks or [] + run_config = run_config or RunConfig() if helicone_config.is_enabled: import uuid @@ -226,6 +231,7 @@ def evaluate( run_config=run_config, show_progress=show_progress, batch_size=batch_size, + pbar=_pbar, ) # Ragas Callbacks @@ -333,6 +339,7 @@ def evaluate( cost_cb, ), ragas_traces=tracer.traces, + run_id=_run_id, ) if not evaluation_group_cm.ended: evaluation_rm.on_chain_end({"scores": result.scores}) diff --git a/src/ragas/executor.py b/src/ragas/executor.py index cd672b5c0..a0e209694 100644 --- a/src/ragas/executor.py +++ b/src/ragas/executor.py @@ -84,6 +84,7 @@ class Executor: batch_size: t.Optional[int] = None run_config: t.Optional[RunConfig] = field(default=None, repr=False) _nest_asyncio_applied: bool = field(default=False, repr=False) + pbar: t.Optional[tqdm] = None def wrap_callable_with_index( self, callable: t.Callable, counter: int @@ -130,21 +131,22 @@ async def _process_jobs(self) -> t.List[t.Any]: results = [] if not self.batch_size: - with tqdm( - total=len(self.jobs), - desc=self.desc, - disable=not self.show_progress, - ) as pbar: - # Create coroutines - coroutines = [ - afunc(*args, **kwargs) for afunc, args, kwargs, _ in self.jobs - ] - for future in await as_completed(coroutines, max_workers): - result = await future - results.append(result) - pbar.update(1) + # Use external progress bar if provided, otherwise create one + if self.pbar is None: + with tqdm( + total=len(self.jobs), + desc=self.desc, + disable=not self.show_progress, + ) as internal_pbar: + await self._process_coroutines( + self.jobs, internal_pbar, results, max_workers + ) + else: + await self._process_coroutines( + self.jobs, self.pbar, results, max_workers + ) - return results + return results # With batching, show nested progress bars batches = batched(self.jobs, self.batch_size) # generator of job tuples @@ -182,6 +184,14 @@ async def _process_jobs(self) -> t.List[t.Any]: return results + async def _process_coroutines(self, jobs, pbar, results, max_workers): + """Helper function to process coroutines and update the progress bar.""" + coroutines = [afunc(*args, **kwargs) for afunc, args, kwargs, _ in jobs] + for future in await as_completed(coroutines, max_workers): + result = await future + results.append(result) + pbar.update(1) + def results(self) -> t.List[t.Any]: """ Execute all submitted jobs and return their results. The results are returned in the order of job submission. diff --git a/src/ragas/llms/base.py b/src/ragas/llms/base.py index 9594d4344..b08a00e48 100644 --- a/src/ragas/llms/base.py +++ b/src/ragas/llms/base.py @@ -134,7 +134,7 @@ def __init__( def is_finished(self, response: LLMResult) -> bool: """ Parse the response to check if the LLM finished by checking the finish_reason - or stop_reason. + or stop_reason. Supports OpenAI and Vertex AI models. """ if self.is_finished_parser is not None: return self.is_finished_parser(response) @@ -145,30 +145,34 @@ def is_finished(self, response: LLMResult) -> bool: resp = g.generations[0][0] if resp.generation_info is not None: # generation_info is provided - so we parse that - - # OpenAI uses "stop" to indicate that the generation is finished - # and is stored in 'finish_reason' key in generation_info - if resp.generation_info.get("finish_reason") is not None: + finish_reason = resp.generation_info.get("finish_reason") + if finish_reason is not None: + # OpenAI uses "stop" + # Vertex AI uses "STOP" or "MAX_TOKENS" is_finished_list.append( - resp.generation_info.get("finish_reason") == "stop" + finish_reason in ["stop", "STOP", "MAX_TOKENS"] ) + # provied more conditions here # https://github.com/explodinggradients/ragas/issues/1548 # if generation_info is empty, we parse the response_metadata # this is less reliable + elif ( isinstance(resp, ChatGeneration) and t.cast(ChatGeneration, resp).message is not None ): resp_message: BaseMessage = t.cast(ChatGeneration, resp).message if resp_message.response_metadata.get("finish_reason") is not None: + finish_reason = resp_message.response_metadata.get("finish_reason") is_finished_list.append( - resp_message.response_metadata.get("finish_reason") == "stop" + finish_reason in ["stop", "STOP", "MAX_TOKENS"] ) elif resp_message.response_metadata.get("stop_reason") is not None: + stop_reason = resp_message.response_metadata.get("stop_reason") is_finished_list.append( - resp_message.response_metadata.get("stop_reason") == "end_turn" + stop_reason in ["end_turn", "STOP", "MAX_TOKENS"] ) # default to True else: diff --git a/src/ragas/losses.py b/src/ragas/losses.py index 52661bd87..0efc15eaa 100644 --- a/src/ragas/losses.py +++ b/src/ragas/losses.py @@ -1,6 +1,9 @@ import typing as t from abc import ABC, abstractmethod +from pydantic import GetCoreSchemaHandler +from pydantic_core import CoreSchema, core_schema + class Loss(ABC): """ @@ -11,6 +14,17 @@ class Loss(ABC): def __call__(self, predicted: t.List, actual: t.List) -> float: raise NotImplementedError + @classmethod + def __get_pydantic_core_schema__( + cls, source_type: t.Any, handler: GetCoreSchemaHandler + ) -> CoreSchema: + """ + Define how Pydantic generates a schema for BaseRagasEmbeddings. + """ + return core_schema.no_info_after_validator_function( + cls, core_schema.is_instance_schema(cls) # The validator function + ) + class MSELoss(Loss): """ diff --git a/src/ragas/metrics/base.py b/src/ragas/metrics/base.py index 12e23bbbf..52838b0b2 100644 --- a/src/ragas/metrics/base.py +++ b/src/ragas/metrics/base.py @@ -12,8 +12,9 @@ from ragas._analytics import EvaluationEvent, _analytics_batcher from ragas.callbacks import ChainType, new_group -from ragas.dataset_schema import MultiTurnSample, SingleTurnSample +from ragas.dataset_schema import MetricAnnotation, MultiTurnSample, SingleTurnSample from ragas.executor import is_event_loop_running +from ragas.losses import BinaryMetricLoss, MSELoss from ragas.prompt import PromptMixin from ragas.run_config import RunConfig from ragas.utils import ( @@ -232,12 +233,77 @@ def init(self, run_config: RunConfig): def train( self, path: str, - demonstration_config: DemonstrationConfig, - instruction_config: InstructionConfig, - callbacks: Callbacks, + demonstration_config: t.Optional[DemonstrationConfig] = None, + instruction_config: t.Optional[InstructionConfig] = None, + callbacks: t.Optional[Callbacks] = None, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + with_debugging_logs=False, + raise_exceptions: bool = True, ) -> None: - raise NotImplementedError("Training is not implemented for this metric.") + if not path.endswith(".json"): + raise ValueError("Train data must be in json format") + + if instruction_config is None: + from ragas.config import InstructionConfig + + instruction_config = InstructionConfig() + + if demonstration_config is None: + from ragas.config import DemonstrationConfig + + demonstration_config = DemonstrationConfig() + + dataset = MetricAnnotation.from_json(path, metric_name=self.name) + + optimizer = instruction_config.optimizer + llm = instruction_config.llm or self.llm + if llm is None: + raise ValueError( + f"Metric '{self.name}' has no valid LLM provided (self.llm is None). Please initantiate a the metric with an LLM to run." # noqa + ) + if optimizer.llm is None: + optimizer.llm = llm + + if instruction_config.loss is None: + if self.output_type is None: + raise ValueError( + f"Output type for metric '{self.name}' is not defined. Please set the output type in the metric or in the instruction config." + ) + + if self.output_type.name == MetricOutputType.BINARY.name: + loss_fun = BinaryMetricLoss() + elif ( + self.output_type.name == MetricOutputType.CONTINUOUS.name + or self.output_type.name == MetricOutputType.DISCRETE.name + ): + loss_fun = MSELoss() + else: + raise NotImplementedError( + f"Output type '{self.output_type.name}' not implemented" + ) + else: + loss_fun = instruction_config.loss + + optimizer.metric = self + + optimizer_config = instruction_config.optimizer_config or {} + optimized_prompts = optimizer.optimize( + dataset[self.name], + loss_fun, + optimizer_config, + callbacks=callbacks, + run_config=run_config, + batch_size=batch_size, + with_debugging_logs=with_debugging_logs, + raise_exceptions=raise_exceptions, + ) + prompts = self.get_prompts() + for key, val in optimized_prompts.items(): + prompts[key].instruction = val + self.set_prompts(**prompts) + return @dataclass diff --git a/src/ragas/optimizers/__init__.py b/src/ragas/optimizers/__init__.py index 93b17a651..2780da08f 100644 --- a/src/ragas/optimizers/__init__.py +++ b/src/ragas/optimizers/__init__.py @@ -1,3 +1,7 @@ -from .base import Optimizer +from ragas.optimizers.base import Optimizer +from ragas.optimizers.genetic import GeneticOptimizer -__all__ = ["Optimizer"] +__all__ = [ + "Optimizer", + "GeneticOptimizer", +] diff --git a/src/ragas/optimizers/base.py b/src/ragas/optimizers/base.py index 31a1c9c61..2ba582a52 100644 --- a/src/ragas/optimizers/base.py +++ b/src/ragas/optimizers/base.py @@ -49,4 +49,4 @@ def optimize( Dict[str, str] The optimized prompts for given chain. """ - pass + raise NotImplementedError("The method `optimize` must be implemented.") diff --git a/src/ragas/optimizers/genetic.py b/src/ragas/optimizers/genetic.py new file mode 100644 index 000000000..9dd7cf538 --- /dev/null +++ b/src/ragas/optimizers/genetic.py @@ -0,0 +1,752 @@ +import logging +import typing as t +from uuid import UUID + +import numpy as np +from langchain_core.callbacks import Callbacks +from pydantic import BaseModel +from tqdm.auto import tqdm + +from ragas.callbacks import new_group +from ragas.dataset_schema import ( + EvaluationDataset, + EvaluationResult, + SampleAnnotation, + SingleMetricAnnotation, +) +from ragas.evaluation import evaluate +from ragas.executor import Executor +from ragas.losses import Loss +from ragas.optimizers.base import Optimizer +from ragas.optimizers.utils import hamming_distance +from ragas.prompt import PydanticPrompt +from ragas.run_config import RunConfig + +logger = logging.getLogger(__name__) + +RAGAS_OPTIMIZATION_GROUP = "ragas_optimization" + +example_type = t.TypeVar( + "example_type", bound=t.Dict[t.Dict[str, t.Any], t.Dict[str, t.Any]] +) + + +class FormattedExamples(BaseModel): + examples: t.List[t.Tuple[str, t.Any]] + + @classmethod + def from_examples(cls, examples: t.List[example_type]) -> "FormattedExamples": + + formated_examples = [] + for example in examples: + input_, output = example.values() + input_ = "".join(f"\n{key}:\n\t{val}\n" for key, val in input_.items()) + formated_examples.append((input_, output)) + + return cls(examples=formated_examples) + + +class OutputInstruction(BaseModel): + instruction: str + + +class ReverseEngineerPrompt(PydanticPrompt[FormattedExamples, OutputInstruction]): + name: str = "reverse_engineer" + instruction: str = ( + "Given a set of (input containing (user_input, response, reference, etc), expected output) pairs that were manually annotated, guess and generate the instruction given to the annotator." + ) + input_model = FormattedExamples + output_model = OutputInstruction + + +class ParentPrompts(BaseModel): + parent_1: str + parent_2: str + + +class CrossOverPrompt(PydanticPrompt[ParentPrompts, OutputInstruction]): + name: str = "crossover" + instruction: str = ( + "You are a mutator who is familiar with the concept of cross-over in genetic algorithm, namely " + "combining the genetic information of two parents to generate new offspring. Given two parent " + "prompts, you will perform a cross-over to generate an offspring prompt that covers the same " + "semantic meaning as both parents." + ) + input_model = ParentPrompts + output_model = OutputInstruction + examples = [ + ( + ParentPrompts( + parent_1="Now you are a categorizer, your mission is to ascertain the sentiment of the provided text, either favorable or unfavorable.", + parent_2="Assign a sentiment label to the given sentence from [’negative’, ’positive’] and return only the label without any other text.", + ), + OutputInstruction( + instruction="Your mission is to ascertain the sentiment of the provided text and assign a sentiment label from [’negative’, ’positive’].", + ), + ) + ] + + +class FeedbackExample(BaseModel): + input: str + output: t.Dict[str, t.Any] + expected_output: t.Dict[str, t.Any] + + +class FeedbackMutationInput(BaseModel): + instruction: str + examples: t.List[FeedbackExample] + + +class FeedbackMutationOutput(BaseModel): + feedbacks: t.List[str] + + +class FeedbackMutationPrompt( + PydanticPrompt[FeedbackMutationInput, FeedbackMutationOutput] +): + name: str = "feedback_mutation" + instruction: str = ( + "You're an expert reviewer. Given an instruction and a set of (input containing (user_input, response, reference, etc), output, expected_output) examples, give maximum 3 feedbacks on how the instruction can be improved to correct the mistakes in incorrect outputs and reach expected output." + "Do not provide the feedback to add examples with the instruction." + ) + input_model = FeedbackMutationInput + output_model = FeedbackMutationOutput + + +class FeedbackMutationPromptInput(BaseModel): + instruction: str + feedbacks: t.List[str] + + +class FeedbackMutationPromptGeneration( + PydanticPrompt[FeedbackMutationPromptInput, OutputInstruction] +): + name: str = "feedback_mutation_generation" + instruction: str = ( + "You are a mutator. Given an instruction and a set of feedbacks on how the instruction can be improved generate a new instruction that incorporates the feedback." + ) + input_model = FeedbackMutationPromptInput + output_model = OutputInstruction + + +class GeneticOptimizer(Optimizer): + """ + A genetic algorithm optimizer that balances exploration and exploitation. + """ + + reverse_engineer_prompt = ReverseEngineerPrompt() + cross_over_prompt = CrossOverPrompt() + feedback_generation_prompt = FeedbackMutationPrompt() + feedback_mutation_prompt = FeedbackMutationPromptGeneration() + + def optimize( + self, + dataset: SingleMetricAnnotation, + loss: Loss, + config: t.Dict[t.Any, t.Any], + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + with_debugging_logs=False, + raise_exceptions: bool = True, + ) -> t.Dict[str, str]: + + callbacks = callbacks or [] + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + population_size = config.get("population_size", 3) + num_demonstrations = config.get("num_demonstrations", 3) + sample_size = config.get("sample_size", 10) + + # new group for optimization + optimization_generation_rm, optimization_generation_grp = new_group( + name=RAGAS_OPTIMIZATION_GROUP, + inputs={"metric": self.metric.name}, + callbacks=callbacks, + ) + + stages = [ + {"name": "Initializing Population", "steps": population_size - 1}, + { + "name": "Feedback Mutation", + "steps": population_size * sample_size + population_size, + }, + { + "name": "Cross-over Mutation", + "steps": population_size * len(dataset) + population_size, + }, + {"name": "Fitness Evaluation", "steps": population_size * len(dataset)}, + ] + total_steps = sum([stage["steps"] for stage in stages]) + with tqdm( + total=total_steps, desc="Overall Progress", dynamic_ncols=True + ) as parent_pbar: + + parent_pbar.set_description(f"{stages[0]['name']} Step 1/{len(stages)}") + initial_population = self.initialize_population( + dataset=dataset, + population_size=population_size - 1, + num_demonstrations=num_demonstrations, + run_config=run_config, + batch_size=batch_size, + callbacks=optimization_generation_grp, + raise_exceptions=raise_exceptions, + parent_pbar=parent_pbar, + ) + + # get the default prompt used in the metric as seed prompt + seed_prompts = { + key: val.instruction + for key, val in self.metric.get_prompts().items() + if key in initial_population[0].keys() + } + initial_population.append(seed_prompts) + + parent_pbar.set_description(f"{stages[1]['name']} Step 2/{len(stages)}") + improved_prompts = self.feedback_mutation( + initial_population, + dataset, + sample_size=sample_size, + run_config=run_config, + batch_size=batch_size, + callbacks=optimization_generation_grp, + raise_exceptions=raise_exceptions, + parent_pbar=parent_pbar, + ) + + parent_pbar.set_description(f"{stages[2]['name']} Step 3/{len(stages)}") + improved_prompts = self.cross_over_mutation( + candidates=improved_prompts, + dataset=dataset, + run_config=run_config, + batch_size=batch_size, + callbacks=optimization_generation_grp, + raise_exceptions=raise_exceptions, + parent_pbar=parent_pbar, + ) + + parent_pbar.set_description(f"{stages[3]['name']} Step 4/{len(stages)}") + fitness_scores = self.evaluate_fitness( + candidates=improved_prompts, + dataset=dataset, + loss_fn=loss, + run_config=run_config, + batch_size=batch_size, + callbacks=optimization_generation_grp, + raise_exceptions=raise_exceptions, + parent_pbar=parent_pbar, + ) + best_candidate = improved_prompts[np.argmax(fitness_scores)] + + optimization_generation_rm.on_chain_end( + outputs={"best_candidate": best_candidate} + ) + + return best_candidate + + def initialize_population( + self, + *, + dataset: SingleMetricAnnotation, + population_size: int, + num_demonstrations: int = 3, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + raise_exceptions: bool = True, + parent_pbar: t.Optional[tqdm] = None, + ) -> t.List[t.Dict[str, str]]: + + initialize_population_rm, initialize_population_grp = new_group( + name="Initializing Population", + inputs={"population_size": population_size}, + callbacks=callbacks, + ) + + exec = Executor( + desc="Initializing Population", + raise_exceptions=raise_exceptions, + run_config=run_config, + keep_progress_bar=False, + batch_size=batch_size, + pbar=parent_pbar, + ) + + candidates = [] + dataset = dataset.filter(lambda x: x["is_accepted"]) + batches = dataset.stratified_batches( + batch_size=num_demonstrations, + stratify_key="metric_output", + replace=False, + drop_last_batch=False, + ) + for batch in batches[:population_size]: + exec.submit( + self._reverse_engineer_instruction, + batch=batch, + callbacks=initialize_population_grp, + ) + + try: + candidates = exec.results() + except Exception as e: + initialize_population_rm.on_chain_error(e) + raise e + else: + initialize_population_rm.on_chain_end( + outputs={"initial_population": candidates} + ) + + return candidates + + async def _reverse_engineer_instruction( + self, batch: t.List[SampleAnnotation], callbacks: Callbacks = None + ) -> t.Dict[str, str]: + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + prompt_annotations = {key: [] for key in batch[0]["prompts"].keys()} + candidates = {} + for sample in batch: + input_ouputs = sample["prompts"] + for name, example in input_ouputs.items(): + input_ = { + key: val + for key, val in example["prompt_input"].items() + if val is not None + } + output = ( + example["edited_output"] + if example["edited_output"] + else example["prompt_output"] + ) + prompt_annotations[name].append({"input": input_, "output": output}) + + for prompt_name, examples in prompt_annotations.items(): + formatted_examples = FormattedExamples.from_examples(examples) + instruction = await self.reverse_engineer_prompt.generate( + data=formatted_examples, llm=self.llm, callbacks=callbacks + ) + candidates[prompt_name] = instruction.instruction + + return candidates + + async def _cross_over_prompts( + self, parent_1: str, parent_2: str, callbacks: Callbacks = None + ) -> str: + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + parents = ParentPrompts(parent_1=parent_1, parent_2=parent_2) + offspring = await self.cross_over_prompt.generate( + data=parents, llm=self.llm, callbacks=callbacks + ) + return offspring.instruction + + def _set_instructions(self, candidates: t.Dict[str, str]): + if self.metric is None: + raise ValueError("No metric provided for optimization.") + prompts = self.metric.get_prompts() + for key, val in candidates.items(): + prompts[key].instruction = val + self.metric.set_prompts(**prompts) + + def feedback_mutation( + self, + candidates: t.List[t.Dict[str, str]], + dataset: SingleMetricAnnotation, + sample_size: int, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + raise_exceptions: bool = True, + parent_pbar: t.Optional[tqdm] = None, + ) -> t.List[t.Dict[str, str]]: + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + feedback_rm, feedback_grp = new_group( + name="Feedback mutation", + inputs={"candidates": candidates}, + callbacks=callbacks, + ) + improved_candidates = [] + dataset = dataset.filter(lambda x: x["is_accepted"]) + + exec = Executor( + desc="Feedback Mutation", + raise_exceptions=raise_exceptions, + run_config=run_config, + keep_progress_bar=False, + batch_size=batch_size, + pbar=parent_pbar, + ) + + for candidate in candidates: + dataset_sample = dataset.sample(sample_size, stratify_key="metric_output") + exec.submit( + self._feedback_mutation, + candidate=candidate, + dataset=dataset_sample, + callbacks=feedback_grp, + raise_exceptions=raise_exceptions, + batch_size=batch_size, + run_config=run_config, + parent_pbar=parent_pbar, + ) + + try: + improved_candidates = exec.results() + except Exception as e: + feedback_rm.on_chain_error(e) + raise e + else: + feedback_rm.on_chain_end( + outputs={"improved_candidate": improved_candidates} + ) + feedback_rm.on_chain_end(outputs={"improved candidates": improved_candidates}) + + return improved_candidates + + async def _feedback_mutation( + self, + candidate: t.Dict[str, str], + dataset: SingleMetricAnnotation, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + raise_exceptions: bool = True, + parent_pbar: t.Optional[tqdm] = None, + ) -> t.Dict[str, str]: + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + candidate_rm, candidate_grp = new_group( + name="Candidate feedback mutation", + inputs={"candidate": candidate}, + callbacks=callbacks, + ) + batch, target = self._get_evaluation_dataset(dataset) + results = self.evaluate_candidate( + candidate=candidate, + eval_dataset=batch, + run_config=run_config, + batch_size=batch_size, + callbacks=candidate_grp, + raise_exceptions=raise_exceptions, + run_id=candidate_rm.run_id, + parent_pbar=parent_pbar, + ) + + feedback_candidate = await self._get_feedbacks( + candidate, dataset, results, target, candidate_grp + ) + improved_candidate = await self._implement_feedbacks( + candidate, feedback_candidate, candidate_grp + ) + + candidate_rm.on_chain_end(outputs={"improved_candidate": improved_candidate}) + return improved_candidate + + async def _implement_feedbacks( + self, + candidate: t.Dict[str, str], + feedbacks: t.Dict[str, t.List[str]], + callbacks: Callbacks = None, + ) -> t.Dict[str, str]: + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + improved_candidate = {} + for key in candidate.keys(): + feedback = feedbacks[key] + if feedback: + feedback_input = FeedbackMutationPromptInput( + instruction=candidate[key], feedbacks=feedback + ) + output = await self.feedback_mutation_prompt.generate( + data=feedback_input, llm=self.llm, callbacks=callbacks + ) + improved_candidate[key] = output.instruction + else: + improved_candidate[key] = candidate[key] + logger.warning( + f"No feedbacks found for the prompt {key}. Returning the original prompt." + ) + + return improved_candidate + + async def _get_feedbacks( + self, + candidate: t.Dict[str, str], + dataset: SingleMetricAnnotation, + results: EvaluationResult, + target: t.List[float], + callbacks: Callbacks = None, + ) -> t.Dict[str, t.List[str]]: + + def dict_to_str(dict: t.Dict[str, t.Any]) -> str: + return "".join(f"\n{key}:\n\t{val}\n" for key, val in dict.items()) + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + prediction = results.to_pandas()[self.metric.name].values.tolist() + indices = [idx for idx in range(len(target)) if target[idx] != prediction[idx]] + traces = [trace[self.metric.name] for trace in results.traces] + if indices: + feedback_candidates = {} + for prompt_name in candidate.keys(): + feedback_data = [ + FeedbackExample( + input=dict_to_str( + traces[idx][prompt_name]["input"].model_dump( + exclude_none=True + ) + ), + output=traces[idx][prompt_name]["output"][0].model_dump( + exclude_none=True + ), + expected_output=dataset[idx]["prompts"][prompt_name][ + "prompt_output" + ], + ) + for idx in indices + ] + prompt_input = FeedbackMutationInput( + instruction=candidate[prompt_name], examples=feedback_data + ) + feedbacks = await self.feedback_generation_prompt.generate( + data=prompt_input, llm=self.llm, callbacks=callbacks + ) + feedback_candidates[prompt_name] = feedbacks.feedbacks + else: + logger.warning("No samples found for the feedback generation.") + feedback_candidates = {prompt_name: [] for prompt_name in candidate.keys()} + + return feedback_candidates + + def _get_evaluation_dataset( + self, dataset: SingleMetricAnnotation + ) -> t.Tuple[EvaluationDataset, t.List[float]]: + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + if self.metric.output_type is None: + raise ValueError("No output type provided for the metric.") + + training_ids = [] + y_true = [] + for idx, sample in enumerate(dataset): + if sample["is_accepted"]: + training_ids.append(idx) + y_true.append(sample.metric_output) + elif not sample["is_accepted"] and self.metric.output_type.name == "BINARY": + training_ids.append(idx) + y_true.append(int(not sample.metric_output)) + + dataset = dataset.select(training_ids) + eval_dataset = dataset.to_evaluation_dataset() + return eval_dataset, y_true + + def evaluate_candidate( + self, + *, + candidate: t.Dict[str, str], + eval_dataset: EvaluationDataset, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + raise_exceptions: bool = True, + run_id: t.Optional[UUID] = None, + parent_pbar: t.Optional[tqdm] = None, + ) -> EvaluationResult: + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + self._set_instructions(candidate) + results = evaluate( + eval_dataset, + metrics=[self.metric], + llm=self.llm, + run_config=run_config, + batch_size=batch_size, + callbacks=callbacks, + raise_exceptions=raise_exceptions, + _run_id=run_id, + _pbar=parent_pbar, + ) + # remap the traces to the original prompt names + remap_traces = {val.name: key for key, val in self.metric.get_prompts().items()} + for trace in results.traces: + for key in remap_traces: + if key in trace[self.metric.name]: + trace[self.metric.name][remap_traces[key]] = trace[ + self.metric.name + ].pop(key) + return results + + def evaluate_fitness( + self, + *, + candidates: t.List[t.Dict[str, str]], + dataset: SingleMetricAnnotation, + loss_fn: Loss, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + raise_exceptions: bool = True, + parent_pbar: t.Optional[tqdm] = None, + ) -> t.List[float]: + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + losses = [] + + eval_dataset, y_true = self._get_evaluation_dataset(dataset) + + initialize_population_rm, initialize_population_grp = new_group( + name="Evaluating candidate fitness", + inputs={"candidates": candidates}, + callbacks=callbacks, + ) + run_id = initialize_population_rm.run_id + for candidate in candidates: + + results = self.evaluate_candidate( + candidate=candidate, + eval_dataset=eval_dataset, + run_config=run_config, + batch_size=batch_size, + callbacks=initialize_population_grp, + raise_exceptions=raise_exceptions, + run_id=run_id, + parent_pbar=parent_pbar, + ) + y_pred = results.to_pandas()[self.metric.name].values.tolist() + loss = loss_fn(y_true, y_pred) + losses.append(loss) + + initialize_population_rm.on_chain_end(outputs={"losses": losses}) + + return losses + + async def _cross_over_chain( + self, + parent_x: t.Dict[str, str], + parent_y: t.Dict[str, str], + callbacks: Callbacks, + ): + + if parent_x.keys() != parent_y.keys(): + raise ValueError("The parents must have the same prompt names.") + + chain_offsprings = {} + for key in parent_x.keys(): + offspring = await self._cross_over_prompts( + parent_x[key], parent_y[key], callbacks + ) + chain_offsprings[key] = offspring + + return chain_offsprings + + def cross_over_mutation( + self, + *, + candidates: t.List[t.Dict[str, str]], + dataset: SingleMetricAnnotation, + run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, + callbacks: t.Optional[Callbacks] = None, + raise_exceptions: bool = True, + parent_pbar: t.Optional[tqdm] = None, + ): + + if self.metric is None: + raise ValueError("No metric provided for optimization.") + + if self.llm is None: + raise ValueError("No llm provided for optimization.") + + eval_dataset, y_true = self._get_evaluation_dataset(dataset) + + cross_over_rm, cross_over_grp = new_group( + name="Cross-over mutation", + inputs={"candidates": candidates}, + callbacks=callbacks, + ) + run_id = cross_over_rm.run_id + prediction_vectors = [] + for candidate in candidates: + + results = self.evaluate_candidate( + candidate=candidate, + eval_dataset=eval_dataset, + run_config=run_config, + batch_size=batch_size, + callbacks=cross_over_grp, + raise_exceptions=raise_exceptions, + run_id=run_id, + parent_pbar=parent_pbar, + ) + y_pred = results.to_pandas()[self.metric.name].values.tolist() + prediction = [int(pred == true) for pred, true in zip(y_pred, y_true)] + prediction_vectors.append(prediction) + + prediction_vectors = np.array(prediction_vectors) + distance_matrix = hamming_distance(prediction_vectors) + + exec = Executor( + desc="Mutating candidates", + raise_exceptions=raise_exceptions, + run_config=run_config, + keep_progress_bar=False, + batch_size=batch_size, + pbar=parent_pbar, + ) + + offspring_candidates = [] + for idx, candidate in enumerate(candidates): + parent_x = candidates[idx] + parent_y = candidates[np.argmin(distance_matrix[idx])] + exec.submit( + self._cross_over_chain, + parent_x=parent_x, + parent_y=parent_y, + callbacks=cross_over_grp, + ) + + try: + offspring_candidates = exec.results() + except Exception as e: + cross_over_rm.on_chain_error(e) + raise e + else: + cross_over_rm.on_chain_end( + outputs={"offspring_candidates": offspring_candidates} + ) + + return offspring_candidates diff --git a/src/ragas/optimizers/utils.py b/src/ragas/optimizers/utils.py new file mode 100644 index 000000000..a061fb554 --- /dev/null +++ b/src/ragas/optimizers/utils.py @@ -0,0 +1,28 @@ +import numpy as np +from numpy.typing import NDArray + + +def hamming_distance(vectors: NDArray[np.int_]) -> NDArray[np.int_]: + """ + Calculate the Hamming distance between pairs of vectors in a list of lists. + + Args: + vectors (list of lists): A list where each inner list is a vector. + + Returns: + list of tuples: A list of tuples containing the pair indices and their Hamming distance. + """ + + # Validate that all vectors have the same dimension + length = len(vectors[0]) + if any(len(v) != length for v in vectors): + raise ValueError("All vectors must have the same dimensions.") + + # Calculate Hamming distances for all pairs + distances = np.zeros((len(vectors), len(vectors)), dtype=int) + for i in range(len(vectors)): + for j in range(i + 1, len(vectors)): + distance = np.sum(vectors[i] != vectors[j]) + distances[i][j] = distance + + return distances