diff --git a/framework-evalanche/README.md b/framework-evalanche/README.md
index 13678d2..6d20c37 100644
--- a/framework-evalanche/README.md
+++ b/framework-evalanche/README.md
@@ -21,7 +21,7 @@ Please see TAGGING.md for details on object comments.
 * [Running](#release)
 * [Extras](#extras)
   + [Custom Metrics](#custom-metrics)
-  + [Crafting a LLM Pipeline Stored Procedure](#crafting-a-llm-pipeline-stored-procedure)
+  + [Generating Results to Evaluate](#generating-results-to-evaluate)
 
 # Overview
 Evalanche is a Streamlit in Snowflake (SiS) application that provides a single location to evaluate and compare generative AI use case outputs in a streamlined, on demand, and automated fashion. Regardless of whether your goal is to measure the quality of RAG-based LLM solutions or the accuracy of SQL generation, Evalanche provides a scalable, customizable, and trackable way to do it.
@@ -95,7 +95,10 @@ CALL GENAI_UTILITIES.EVALUATION.DELETE_METRIC('Rudeness');
 
 Lastly, please be aware that Streamlit in Snowflake now supports multiple Python versions. Custom metrics may only be available with consistent Python versions. For example, if you create a custom metric while running the app with Python version 3.11, the custom metric will only be available in subsequent sessions when running Python 3.11.
 
-## Crafting a LLM Pipeline Stored Procedure
+## Generating Results to Evaluate
+Evalanche assumes you've already saved your LLM outputs to table(s) in Snowflake for evaluation. If you haven't, Evalanche supports two ways to generate outputs: a custom LLM pipeline or the Cortex Analyst runner. Both options are available from the data page (under "Need to Generate Results?") once you've selected your desired Metric(s).
+
+### Crafting a Stored Procedure for your Custom LLM Pipeline
 To run a reference dataset through your desired LLM pipelines on the data page, we must first encapsulate the pipeline logic in a Stored Procedure. To take advantage of this feature, the stored procedure must have a single VARIANT input type and return a single value. When we execute the stored procedure, a single row from the reference dataset will be passed in the form of a Python dictionary. In other words, a row in the reference dataset that looks like:
 ```markdown
 | TASK | PERSONA |
@@ -109,7 +112,7 @@ will be passed to the stored procedure as:
 ```
   "PERSONA": "Pirate"
 }
 ```
-A appropriately crafted stored procedure could look like the below.
+An appropriately crafted stored procedure could look like the example below.
 ```sql
 CREATE OR REPLACE PROCEDURE MY_PIPELINE(INPUT VARIANT)
 RETURNS STRING
@@ -131,3 +134,9 @@ def run(session, INPUT):
                       prompt = prompt)
 $$;
 ```
+
+### Using the Cortex Analyst Runner
+To run a gold or reference set of questions through Cortex Analyst, select the target semantic model and the table containing the reference questions. The generated SQL statements will be written to a table for further evaluation with the Cortex Analyst-suggested metric.
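+
+Under the hood, the runner passes each reference question and the selected semantic model path to the `GENAI_UTILITIES.EVALUATION.CORTEX_ANALYST_SQL` stored procedure created during setup. A single call is roughly equivalent to the below; the question and stage path are illustrative.
+```sql
+CALL GENAI_UTILITIES.EVALUATION.CORTEX_ANALYST_SQL(
+  'Which region had the highest sales last month?',
+  'MY_DB.MY_SCHEMA.MY_STAGE/revenue_model.yaml');
+```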
+
+# Feedback
+Please add issues to GitHub or email Jason Summer (jason.summer@snowflake.com).
\ No newline at end of file
diff --git a/framework-evalanche/pages/data.py b/framework-evalanche/pages/data.py
index 2609a58..19cd2ec 100644
--- a/framework-evalanche/pages/data.py
+++ b/framework-evalanche/pages/data.py
@@ -27,16 +27,11 @@
 )
 
 TITLE = "Data Selection"
-if (
-    st.session_state.get("selected_metrics", None) is not None
-    and st.session_state.get("eval_funnel", None) == "new"
-):
-    INSTRUCTIONS = """
-    Select your evaluation data below.
-    The evaluation data should contain all metric inputs and any additional columns to retain through evaluation.
-    You can specify a single dataset or separate datasets for expected and actual results, if applicable."""
-else:
-    INSTRUCTIONS = "Please first select a metric from home."
+
+INSTRUCTIONS = """
+Select your evaluation data below.
+The evaluation data should contain all metric inputs and any additional columns to retain through evaluation.
+You can specify a single dataset or separate datasets for expected and actual results, if applicable."""
 
 st.set_page_config(
     page_title=TITLE,
@@ -57,6 +52,16 @@
 FROM
 """
 
+BESPOKE_INSTRUCTIONS = """Before you start, your LLM pipeline must be encapsulated in a stored procedure that takes a VARIANT input and returns a single value.
+    Every row of the reference table will be passed through the stored procedure as a dictionary.
+    Only the columns you select below will be passed to the stored procedure.
+    Please see the [Snowflake Stored Procedure documentation](https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-overview)
+    for details on stored procedures and these [specific instructions](https://github.com/sfc-gh-jsummer/evalanche#crafting-a-stored-procedure-for-your-custom-llm-pipeline) for crafting them."""
+
+CORTEX_ANALYST_INSTRUCTIONS = """Have reference questions to run through Cortex Analyst?
+    Select the semantic model in stage, the table containing the reference questions, and a destination table.
+    We will do the rest. Take note of the table name as it will be used in the next step to evaluate the results."""
+
 
 def check_models(models: List[str]) -> None:
     """Check if models are available in the Snowflake account region."""
@@ -211,13 +216,22 @@ def sproc_runner(session: Session, sproc_name: str, inputs: Dict[str, Any]) -> T
     elapsed_time = time.time() - start_time
     return (record_result, elapsed_time)
 
+def cortex_analyst_sproc_runner(session: Session, sproc_name: str, question: str, semantic_model_path: str) -> Tuple[Union[int, float], Any]:
+    start_time = time.time()
+    question = question.replace("'", "''")  # Escape single quotes so the question is a valid SQL string literal
+    record_result = session.sql(f"""CALL {sproc_name}('{question}', '{semantic_model_path}')""").collect_nowait().result()[0][0]
+    # record_result = session.call(sproc_name, question, semantic_model_path) # Once Snowpark supports thread-safe calls without parameter change
+    elapsed_time = time.time() - start_time
+    return (record_result, elapsed_time)
+
 def pipeline_runner(
     session: Session,
     sproc: str,
     input_tablename: str,
     output_tablename: str,
     columns: List[str],
-) -> None:
+    cortex_analyst: bool = False,
+    semantic_model: str = None,
+) -> str:
     """Runs stored procedures asynchronously over input from Snowflake table.
 
     Stored procedures may not be asynchronous but calling of them is done asynchronously in the app.
@@ -233,93 +247,178 @@ def pipeline_runner(
         input_tablename (string): Fully-qualified name of table with input values.
         output_tablename (string): Fully-qualified name of table to write results to.
         columns (list): List of columns to pass to stored procedure.
+        cortex_analyst (bool): Whether to run Cortex Analyst SQL generation.
+        semantic_model (string): Fully-qualified path to semantic model for Cortex Analyst.
 
+    Returns:
+        string: Fully-qualified name of table where results are written.
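+
+    Example (illustrative; the stored procedure and table names are hypothetical):
+        pipeline_runner(session, "DB.SCH.MY_PIPELINE", "DB.SCH.QUESTIONS",
+                        "DB.SCH.RESULTS", columns=["TASK", "PERSONA"])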
""" import multiprocessing - from joblib import Parallel, delayed + from snowflake.snowpark.functions import lit + from src.snowflake_utils import add_row_id, save_eval_to_table df = add_row_id(session.table(input_tablename)) + first_column = columns[0] # We will use this to pass to Cortex Analyst sproc as the semantic file path columns = columns + ["ROW_ID"] - for pandas_df in df.select(*columns).to_pandas_batches(): - # for pandas_df in df.to_pandas_batches(): - results = Parallel(n_jobs=multiprocessing.cpu_count(), backend="threading")( - delayed( - lambda row: { - "ROW_ID": row["ROW_ID"], # Capture ROW_ID - "RESPONSE": (response := sproc_runner(session, sproc, row.to_dict()))[0], - "ELAPSED_TIME": response[1], - } - )(row) - for _, row in pandas_df.iterrows() - ) + # Add semantic model as additional column for tracking purposes + if cortex_analyst: + if semantic_model is not None: + df = df.withColumn("MODEL", lit(semantic_model)) + columns = columns + ["MODEL"] + + for pandas_df in df.select(*columns).to_pandas_batches(): + results = [] + for _, row in pandas_df.iterrows(): + result = { + "ROW_ID": row["ROW_ID"], # Capture ROW_ID + "CORTEX_ANALYST_SQL": (response := cortex_analyst_sproc_runner( + session, + sproc, + row.to_dict()[first_column], + semantic_model))[0], + "ELAPSED_TIME": response[1], + } + results.append(result) + time.sleep(3) # Add a 3-second delay between processing each record to avoid overloading the system + + else: + for pandas_df in df.select(*columns).to_pandas_batches(): + # for pandas_df in df.to_pandas_batches(): + results = Parallel(n_jobs=multiprocessing.cpu_count(), backend="threading")( + delayed( + lambda row: { + "ROW_ID": row["ROW_ID"], # Capture ROW_ID + "RESPONSE": (response := sproc_runner(session, sproc, row.to_dict()))[0], + "ELAPSED_TIME": response[1], + } + )(row) + for _, row in pandas_df.iterrows() + ) result = session.create_dataframe(results).join(df, on="ROW_ID", how="left") save_eval_to_table(result, output_tablename) + return output_tablename @st.experimental_dialog("Run your LLM Pipeline", width="large") def pipeline_runner_dialog() -> None: """Dialog to run reference data through LLM pipeline and record results for evaluation.""" - from src.app_utils import get_sprocs, select_schema_context - - st.write(""" - Have reference questions or inputs but still need to run them through your LLM pipeline? - Use this dialog to run your reference set through your LLM pipeline and record the results to evaluate here. - - Before you start, your LLM pipeline must be encapsulated in a stored procedure that takes a VARIANT input and returns a single value. - Every row of the reference table will be passed through the stored procedure as a dictionary. - Every column in the reference table will be passed to the stored procedure but only those columns selected will be passed to the stored procedure. 
- Please see [Snowflake Stored Procedure documentation](https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-overview) - for details on stored procedures and these [specific instructions](https://github.com/sfc-gh-jsummer/evalanche#crafting-a-llm-pipeline-stored-procedure) on crafting these stored procedures.""") - - name = "runner" - st.write("Select the stored procedure that encapsulates your LLM pipeline.") - schema_context = select_schema_context(name, on_change=get_sprocs, args=(name,)) - if f"{name}_sprocs" not in st.session_state: - st.session_state[f"{name}_sprocs"] = [] - sproc_name = st.selectbox( - "Select Stored Procedure", - st.session_state[f"{name}_sprocs"], - index=None, - ) - sproc_name = f"{schema_context['database']}.{schema_context['schema']}.{sproc_name}" - table = st.text_input("Enter Name for Generated Table", key=f"new_table_{name}") - new_tablename = f"{schema_context['database']}.{schema_context['schema']}.{table}" - st.divider() - - st.write("Select the reference data.") - name = "runner_output" - table_spec = table_data_selector(name, new_table=False) - data_table = ( - f'{table_spec["database"]}.{table_spec["schema"]}.{table_spec["table"]}' - ) - available_columns = fetch_columns(table_spec["database"],table_spec["schema"],table_spec["table"]) - selected_columns = st.multiselect( - "Select Columns", available_columns, default=None, key=f"columns_{name}", - help = "Select the columns to explicitly passed to the stored procedure." - ) + from src.app_utils import get_sprocs, select_schema_context, get_stages, get_semantic_models - if st.button("Run"): - with st.spinner("Running pipeline..."): - pipeline_runner( - st.session_state["session"], - sproc_name.split("(")[0], - data_table, - new_tablename, - selected_columns + + st.write("""Have reference questions or inputs but still need to run them through your LLM pipeline? 
+ Use this dialog to run a reference set through your LLM pipeline and record the results.""") + + pipeline_selection = st.selectbox("Do you want to run **Cortex Analyst SQL Generation** or a **custom LLM Pipeline**?", + options=["Cortex Analyst", "Custom"], index=None) + + if pipeline_selection is not None: + if pipeline_selection == "Custom": + st.write(f'**Instructions:** {BESPOKE_INSTRUCTIONS}') + else: + st.write(f'**Instructions:** {CORTEX_ANALYST_INSTRUCTIONS}') + + name = "runner" + if pipeline_selection == "Custom": + st.write("Select the stored procedure that encapsulates your LLM pipeline.") + schema_context = select_schema_context(name, on_change=get_sprocs, args=(name,)) + + if f"{name}_sprocs" not in st.session_state: + st.session_state[f"{name}_sprocs"] = [] + sproc_name = st.selectbox( + "Select Stored Procedure", + st.session_state[f"{name}_sprocs"], + index=None, ) - # Set result_data to None so first rendering on results - # page will create it as pandas dataframe from Snowpark result dataframe - set_session_var_to_none('result_data') - st.success(f"Results written to {new_tablename}.") - time.sleep(2) - st.rerun() + sproc_name = f"{schema_context['database']}.{schema_context['schema']}.{sproc_name}" + + else: + st.write("Select the stage that contains your semantic model for Cortex Analyst.") + schema_context = select_schema_context(name, on_change=get_stages, args=(name,)) + + if f"{name}_stages" not in st.session_state: + st.session_state[f"{name}_stages"] = [] + if f"{name}_models" not in st.session_state: + st.session_state[f"{name}_models"] = [] + stage_name = st.selectbox( + "Select Snowflake Stage", + st.session_state[f"{name}_stages"], + index=None, + key=f"{name}_stage", + on_change=get_semantic_models, + args=(name,) + ) + semantic_model = st.selectbox( + "Select Semantic Model", + st.session_state[f"{name}_models"], + index=None, + key=f"{name}_model", + ) + qualified_semantic_model = f"{schema_context['database']}.{schema_context['schema']}.{stage_name}/{semantic_model}" + + + table = st.text_input("Enter Name for Generated Table", key=f"new_table_{name}", help = "Fully qualify if you would like to save in a different database/schema than above.") + if '.' not in table: + new_tablename = f"{schema_context['database']}.{schema_context['schema']}.{table}" + else: + new_tablename = table + st.divider() + + st.write("Select the reference question set.") + name = "runner_output" + table_spec = table_data_selector(name, new_table=False) + data_table = ( + f'{table_spec["database"]}.{table_spec["schema"]}.{table_spec["table"]}' + ) + available_columns = fetch_columns(table_spec["database"],table_spec["schema"],table_spec["table"]) + + if pipeline_selection == "Custom": + selected_columns = st.multiselect( + "Select Columns", available_columns, default=None, key=f"columns_{name}", + help = "Select the columns to be explicitly passed to the stored procedure." + ) + else: + selected_columns = st.selectbox( + "Select Column containing Reference Question", available_columns, index = None, key=f"columns_{name}", + help = "Select the column that contains the reference questions for Cortex Analyst." 
+ ) + if st.button("Run"): + try: + if pipeline_selection == "Custom": + with st.spinner("Running pipeline..."): + st.session_state['returned_tablename'] = pipeline_runner( + st.session_state["session"], + sproc_name.split("(")[0], + data_table, + new_tablename, + selected_columns + ) + else: + with st.spinner("Running Cortex Analyst..."): + st.session_state['returned_tablename'] = pipeline_runner( + session = st.session_state["session"], + sproc = "GENAI_UTILITIES.EVALUATION.CORTEX_ANALYST_SQL", + input_tablename = data_table, + output_tablename = new_tablename, + columns = [selected_columns], + cortex_analyst = True, + semantic_model = qualified_semantic_model, + ) + # Set result_data to None so first rendering on results + # page will create it as pandas dataframe from Snowpark result dataframe + set_session_var_to_none('result_data') + st.success(f"Results written to {new_tablename}.") + except Exception as e: + st.error(f"Error: {e}") + st.stop() + time.sleep(2) + st.rerun() @st.experimental_dialog("Configure Metrics", width="large") @@ -424,56 +523,58 @@ def run_eval() -> None: def pick_data() -> None: """Main rendering function for page.""" - if ( - st.session_state.get("selected_metrics", None) is not None - and st.session_state.get("eval_funnel", None) == "new" - ): - data_split, runner_col, _ = st.columns([1, 1, 2]) - with data_split: - data_toggle = st.toggle( - "Separate Expected & Actual", - help="""Turn on to specify expected and actual datasets separately. - A join key will be necessary to compare the two datasets.""", - value=False, + data_split, runner_col, _ = st.columns([1, 1, 2]) + # Show table if results were written to a table in stored procedure runner + if 'returned_tablename' in st.session_state: + st.info(f"Recent results written to {st.session_state['returned_tablename']}.") + with data_split: + data_toggle = st.toggle( + "Separate Expected & Actual", + help="""Turn on to specify expected and actual datasets separately. + A join key will be necessary to compare the two datasets.""", + value=False, + ) + with runner_col: + runner_button = st.button( + "Need to generate results?", + use_container_width=True, + help="""Have reference questions or inputs but still need to run them through your LLM pipeline? + Use this dialog to run a reference set through your LLM pipeline and record the results to evaluate.""", + ) + if runner_button: + pipeline_runner_dialog() + if not data_toggle: + single_col, _ = st.columns(2) + with single_col: + data_spec( + key_name="single_source", + instructions="Select your evaluation dataset.", + join_key=False, ) - with runner_col: - runner_button = st.button( - "Need to generate results?", - use_container_width=True, - help="""Have reference questions or inputs but still need to run them through your LLM pipeline? - Use this dialog to run your reference set through your LLM pipeline and record the results to evaluate here.""", + else: + inf_col, ground_col = st.columns(2) + with inf_col: + data_spec( + key_name="ground", instructions="Select your expected results." ) - if runner_button: - pipeline_runner_dialog() - if not data_toggle: - single_col, _ = st.columns(2) - with single_col: - data_spec( - key_name="single_source", - instructions="Select your evaluation dataset.", - join_key=False, - ) - else: - inf_col, ground_col = st.columns(2) - with inf_col: - data_spec( - key_name="ground", instructions="Select your expected results." - ) - with ground_col: - data_spec( - key_name="inference", instructions="Select your actual results." 
-            )
-        button_container = row(10, vertical_align="center")
-        preview_button = button_container.button(":mag_right: Preview", use_container_width=True)
-        configure_button = button_container.button(
-            "▶️ Configure", use_container_width=True,
-            type="primary",
-        )
+        with ground_col:
+            data_spec(
+                key_name="inference", instructions="Select your actual results."
+            )
+    button_container = row(10, vertical_align="center")
+    preview_button = button_container.button(":mag_right: Preview", use_container_width=True)
+    configure_button = button_container.button(
+        "▶️ Configure",
+        use_container_width=True,
+        help = "Select metrics and data to configure your evaluation.",
+        type="primary",
+        disabled = len(st.session_state.get("selected_metrics") or []) == 0
+    )
 
-        if preview_button:
-            preview_merge_data()
-        if configure_button:
-            configure_metrics()
+    if preview_button:
+        preview_merge_data()
+    if configure_button:
+        configure_metrics()
 
 
 pick_data()
diff --git a/framework-evalanche/pages/results.py b/framework-evalanche/pages/results.py
index cf577b7..a434c22 100644
--- a/framework-evalanche/pages/results.py
+++ b/framework-evalanche/pages/results.py
@@ -21,8 +21,9 @@
     SAVED_EVAL_TABLE,
     STAGE_NAME,
     add_row_id,
+    run_async_sql_to_dataframe,
 )
-from src.metrics import Metric
+from src.metrics import Metric, SQLResultsAccuracy
 
 
 def get_result_title() -> str:
@@ -268,7 +269,7 @@ def give_recommendation_instruction() -> None:
     )
 
 
-def get_metric_cols(current_df: DataFrame) -> list:
+def get_metric_cols(current_df: Union[DataFrame, pd.DataFrame]) -> list:
     """Returns list of columns in dataframe that contain metric values.
 
     Some metric names have spaces and Snowpark keeps them in lower case with double quotes.
@@ -279,6 +280,7 @@
     return [c_name for c_name in df_columns if c_name.upper() in (m_name.upper() for m_name in metric_names)]
 
 
+
 def show_metric() -> None:
     """Renders metric KPIs based on selected metrics."""
 
@@ -297,12 +299,13 @@ def show_metric() -> None:
     Please create a new evaluation or select an existing one from the homepage.""")
         st.stop()
 
-    if st.session_state.get("metric_result_data", None) is not None:
-        df = st.session_state["metric_result_data"]
+    if st.session_state.get("result_data", None) is not None:
+        df = st.session_state["result_data"]
         metric_names = [metric.get_column() for metric in st.session_state["metrics_in_results"]]
         kpi_row = row(6, vertical_align="top")
         # Placing entire dataframe in memory seems to be more stable than iterating over columns and averaging in snowpark
-        metric_values = df.select(*metric_names).to_pandas()
+        # metric_values = df.select(*metric_names).to_pandas()
+        metric_values = df[metric_names]
         for metric_name, metric_value in metric_values.mean().to_dict().items():
             kpi_row.metric(label=metric_name, value=round(metric_value, 2))
 
@@ -390,13 +393,37 @@ def update_record(table_update_inputs: Dict[str, str], selected_metric_name: str
     st.session_state["result_data"] = df
 
 
-# metrics = fetch_metrics(st.session_state["session"], STAGE_NAME)
+def show_cortex_analyst_sql_results(metric: Metric, prompt_inputs: Dict[str, str]) -> None:
+    """Displays data retrieved from SQL used in Cortex Analyst metrics.
+
+    Shows results for generated_sql and expected_sql in the prompt_inputs dictionary.
+    Results are shown only when the metric is a SQLResultsAccuracy metric.
+
+    Args:
+        metric (Metric): Metric instance; SQL results are shown only for SQLResultsAccuracy.
+        prompt_inputs (dict[str, str]): Dictionary of prompt inputs for the metric.
+    """
+
+    if isinstance(metric, SQLResultsAccuracy):
+        with st.expander("Retrieved Data", expanded=False):
+            st.caption("Results limited to 100 rows.")
+            for key in ["generated_sql", "expected_sql"]:
+                st.write(f"{key.upper()} Result")
+                if key in prompt_inputs:
+                    try:
+                        inference_data = run_async_sql_to_dataframe(metric.session, prompt_inputs[key])
+                        st.dataframe(inference_data,
+                                    hide_index = True,)
+                    except Exception as e:
+                        st.write(f"Error: {e}")
+                else:
+                    st.write("No data returned")
 
 
 @st.experimental_dialog("Review Record", width="large")
 def review_record() -> None:
     """Render dialog box to review a metric result record."""
-
+    st.write("Analyze and explore the selected record. The selected model will be used for analysis and for rerunning the metric. Updates can be saved to the viewed results.")
     if st.session_state["selected_dict"] is None or len(st.session_state["selected_dict"]) == 0:
         st.write("Please select a record to review.")
     elif len(st.session_state["selected_dict"]) > 1:
@@ -404,7 +431,6 @@ def review_record() -> None:
     else:
         # Only first record is selected for analysis
         selected_record = st.session_state["selected_dict"][0]
-        # metrics = fetch_metrics(st.session_state["session"], STAGE_NAME)
         metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))
 
         metric_col, model_col = st.columns(2)
@@ -435,7 +461,10 @@ def review_record() -> None:
         for key, value in st.session_state["param_selection"][
             matching_metric.name
         ].items():
-            entered_value = st.text_area(value, selected_record[value])
+            entered_value = st.text_area(value,
+                                        selected_record[value],
+                                        key = value)
+
             prompt_inputs[key] = entered_value
             table_update_inputs[value] = entered_value
         metric_col, comment_col = st.columns((1, 4))
@@ -452,16 +481,29 @@ def review_record() -> None:
                         on_click = rerun_metric,
                         args = (prompt_inputs, matching_metric),
                         use_container_width=True,)
         save = bottom_selection.button("Save",
                         disabled = selected_metric_name is None,
-                        use_container_width=True,)
+                        use_container_width=True,
+                        help = "Save changes to record in current view.")
+
+        # Unsaved changes in the dialog may linger if the user navigates away and returns.
+        # Here we provide a reset button to clear out any unsaved changes.
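+        # Clicking "Reset" triggers a rerun; the dialog is rebuilt from the stored
+        # record, so any unsaved edits in the widgets above are discarded.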
+        reset = bottom_selection.button("Reset",
+                        disabled = selected_metric_name is None,
+                        use_container_width=True,
+                        help = "Reset all unsaved changes to the selected record.")
 
         if st.session_state.get('analysis', None) is not None:
             st.write(f"**Analysis:** {st.session_state['analysis']}")
-
+
+        # If evaluating SQL, show SQL results of current inputs
+        show_cortex_analyst_sql_results(matching_metric, prompt_inputs)
+
         if save:
             update_record(table_update_inputs, selected_metric_name, selected_record['ROW_ID'])
             st.rerun()
 
+        if reset:
+            st.rerun()
+
 
 def show_dataframe_results() -> Optional[pd.DataFrame]:
     """
@@ -477,15 +519,8 @@ def show_dataframe_results() -> Optional[pd.DataFrame]:
         pandas Dataframe
     """
 
-    if st.session_state.get("metric_result_data", None) is not None:
-        if st.session_state.get('result_data', None) is None:
-            st.session_state["result_data"] = add_row_id(st.session_state["metric_result_data"])\
-                .withColumn("REVIEW", F.lit(False))\
-                .withColumn("COMMENT", F.lit(None)).to_pandas()
-
-        # Store available metrics in session state
-        st.session_state["metrics"] = fetch_metrics(st.session_state["session"], STAGE_NAME)
-
+
+    if st.session_state.get('result_data', None) is not None:
         df_selection = st.data_editor(
             st.session_state["result_data"],
             hide_index=True,
@@ -498,7 +533,6 @@ def show_dataframe_results() -> Optional[pd.DataFrame]:
         return df_selection
 
     else:
-        st.session_state["result_data"] = None
         return None
 
 
@@ -509,18 +543,14 @@
     """
 
     if (
-        st.session_state.get("metric_result_data", None) is not None
+        st.session_state.get("result_data", None) is not None
        and st.session_state.get("metrics_in_results", None) is not None
    ):
-        metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))
+        metric_cols = get_metric_cols(st.session_state.get("result_data", None))
 
-        # We cast to variant in case the metric is a boolean
+        df = st.session_state["result_data"].groupby('METRIC_DATETIME')[metric_cols].mean()
+        # METRIC_DATETIME is set once per evaluation run, so many rows share each timestamp
 
-        df = (
-            st.session_state["metric_result_data"]
-            .group_by("METRIC_DATETIME")
-            .agg(*[F.avg(F.to_variant(col)).alias(col) for col in metric_cols])
-        )
         st.write("Average Metric Scores over Time")
         st.line_chart(
             df,
@@ -536,12 +566,12 @@ def trend_count_metrics() -> None:
     """
 
     if (
-        st.session_state.get("metric_result_data", None) is not None
+        st.session_state.get("result_data", None) is not None
         and st.session_state.get("metrics_in_results", None) is not None
     ):
-        metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))
+        metric_cols = get_metric_cols(st.session_state.get("result_data", None))
 
-        df = st.session_state["metric_result_data"]
+        df = st.session_state["result_data"]
         st.write("Metric Scores over Time")
         st.bar_chart(
             df,
@@ -557,20 +587,16 @@ def bar_chart_metrics() -> None:
     """
 
     if (
-        st.session_state.get("metric_result_data", None) is not None
+        st.session_state.get("result_data", None) is not None
         and len(st.session_state.get("metrics_in_results", []))>0
     ):
-        metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))
+        metric_cols = get_metric_cols(st.session_state.get("result_data", None))
 
-        df = st.session_state["metric_result_data"]
-        chart_df = (
-            df.select(metric_cols)
-            .unpivot("SCORE", "METRIC", metric_cols)
-            .group_by("METRIC", "SCORE")
-            .count()
-        )
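+        # pd.melt reshapes the wide metric columns into long (METRIC, SCORE) pairs, and
+        # groupby().size() counts rows per (METRIC, SCORE) combination. For example, a
+        # hypothetical "Correctness" column of [1, 1, 0] yields the rows
+        # (Correctness, 1, COUNT=2) and (Correctness, 0, COUNT=1).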
+        df = pd.melt(st.session_state["result_data"],
+                    value_vars=metric_cols, var_name = 'METRIC', value_name = 'SCORE')\
+            .groupby(['METRIC', 'SCORE']).size().reset_index(name='COUNT')
         st.write("Score Counts by Metric")
-        st.bar_chart(chart_df, x="SCORE", y="COUNT", color="METRIC")
+        st.bar_chart(df, x="SCORE", y="COUNT", color="METRIC")
 
 
 def get_trendable_column() -> Union[None, str]:
@@ -617,6 +643,15 @@ def show_results():
 
     from src.app_utils import fetch_warehouses
 
+    if st.session_state.get("metric_result_data", None) is not None:
+        if st.session_state.get('result_data', None) is None:
+            st.session_state["result_data"] = add_row_id(st.session_state["metric_result_data"])\
+                .withColumn("REVIEW", F.lit(False))\
+                .withColumn("COMMENT", F.lit(None)).to_pandas()
+
+        # Store available metrics in session state
+        st.session_state["metrics"] = fetch_metrics(st.session_state["session"], STAGE_NAME)
+
     show_metric()
     if st.session_state["eval_funnel"] is not None:
         top_row = row(5, vertical_align="top")
diff --git a/framework-evalanche/setup/cli_setup.sql b/framework-evalanche/setup/cli_setup.sql
index 9013399..79c219c 100644
--- a/framework-evalanche/setup/cli_setup.sql
+++ b/framework-evalanche/setup/cli_setup.sql
@@ -1,5 +1,5 @@
 SET major = 2;
-SET minor = 0;
+SET minor = 1;
 
 SET COMMENT = concat('{"origin": "sf_sit", "name": "evalanche", "version": {"major": ',$major,', "minor": ',$minor,'}}');
 
@@ -77,6 +77,55 @@ def run(session, metric_name):
         return f"An error occurred: {e}"
 $$;
 
+-- Cortex Analyst runner
+CREATE OR REPLACE PROCEDURE GENAI_UTILITIES.EVALUATION.CORTEX_ANALYST_SQL(prompt STRING, semantic_file_path STRING)
+RETURNS STRING
+LANGUAGE PYTHON
+PACKAGES = ('snowflake-snowpark-python')
+RUNTIME_VERSION = '3.9'
+HANDLER = 'process_message'
+as
+$$
+import _snowflake
+import json
+def send_message(messages, semantic_file_path):
+    """Calls the REST API and returns the response."""
+
+    request_body = {
+        "messages": messages,
+        "semantic_model_file": f"@{semantic_file_path}",
+    }
+    resp = _snowflake.send_snow_api_request(
+        "POST",
+        "/api/v2/cortex/analyst/message",
+        {},
+        {},
+        request_body,
+        {},
+        30000,
+    )
+    if resp["status"] < 400:
+        response_content = json.loads(resp["content"])
+        return response_content
+    else:
+        raise Exception(
+            f"Failed request with status {resp['status']}: {resp}"
+        )
+
+def process_message(session, prompt, semantic_file_path):
+    """Sends the prompt to Cortex Analyst and returns the generated SQL statement, if any."""
+    messages = []
+    messages.append(
+        {"role": "user", "content": [{"type": "text", "text": prompt}]}
+    )
+    response = send_message(messages, semantic_file_path)
+    # Return the first SQL block in the response; other content items (e.g. text) are skipped
+    for item in response["message"]["content"]:
+        if item["type"] == "sql":
+            return item.get("statement", None)
+    return None
+$$;
+
 -- Create Streamlit
 CREATE OR REPLACE STREAMLIT GENAI_UTILITIES.EVALUATION.EVALUATION_APP
 ROOT_LOCATION = '@GENAI_UTILITIES.EVALUATION.STREAMLIT_STAGE'
diff --git a/framework-evalanche/setup/git_setup.sql b/framework-evalanche/setup/git_setup.sql
index db46bd5..1933275 100644
--- a/framework-evalanche/setup/git_setup.sql
+++ b/framework-evalanche/setup/git_setup.sql
@@ -1,5 +1,5 @@
 SET major = 2;
-SET minor = 0;
+SET minor = 1;
 
 SET COMMENT = concat('{"origin": "sf_sit", "name": "evalanche", "version": {"major": ',$major,', "minor": ',$minor,'}}');
 
@@ -107,6 +107,55 @@ def run(session, metric_name):
         return f"An error occurred: {e}"
 $$;
 
+-- Cortex Analyst runner
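+-- Illustrative invocation (question and stage path are hypothetical):
+--   CALL GENAI_UTILITIES.EVALUATION.CORTEX_ANALYST_SQL(
+--     'Which region had the highest sales last month?',
+--     'MY_DB.MY_SCHEMA.MY_STAGE/revenue_model.yaml');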
+CREATE OR REPLACE PROCEDURE GENAI_UTILITIES.EVALUATION.CORTEX_ANALYST_SQL(prompt STRING, semantic_file_path STRING)
+RETURNS STRING
+LANGUAGE PYTHON
+PACKAGES = ('snowflake-snowpark-python')
+RUNTIME_VERSION = '3.9'
+HANDLER = 'process_message'
+as
+$$
+import _snowflake
+import json
+def send_message(messages, semantic_file_path):
+    """Calls the REST API and returns the response."""
+
+    request_body = {
+        "messages": messages,
+        "semantic_model_file": f"@{semantic_file_path}",
+    }
+    resp = _snowflake.send_snow_api_request(
+        "POST",
+        "/api/v2/cortex/analyst/message",
+        {},
+        {},
+        request_body,
+        {},
+        30000,
+    )
+    if resp["status"] < 400:
+        response_content = json.loads(resp["content"])
+        return response_content
+    else:
+        raise Exception(
+            f"Failed request with status {resp['status']}: {resp}"
+        )
+
+def process_message(session, prompt, semantic_file_path):
+    """Sends the prompt to Cortex Analyst and returns the generated SQL statement, if any."""
+    messages = []
+    messages.append(
+        {"role": "user", "content": [{"type": "text", "text": prompt}]}
+    )
+    response = send_message(messages, semantic_file_path)
+    # Return the first SQL block in the response; other content items (e.g. text) are skipped
+    for item in response["message"]["content"]:
+        if item["type"] == "sql":
+            return item.get("statement", None)
+    return None
+$$;
+
 -- Create Streamlit
 CREATE OR REPLACE STREAMLIT GENAI_UTILITIES.EVALUATION.EVALUATION_APP
 ROOT_LOCATION = '@GENAI_UTILITIES.EVALUATION.STREAMLIT_STAGE'
diff --git a/framework-evalanche/src.zip b/framework-evalanche/src.zip
index 67bbb86..6df8180 100644
Binary files a/framework-evalanche/src.zip and b/framework-evalanche/src.zip differ
diff --git a/framework-evalanche/src/app_utils.py b/framework-evalanche/src/app_utils.py
index f386de8..617ccf9 100644
--- a/framework-evalanche/src/app_utils.py
+++ b/framework-evalanche/src/app_utils.py
@@ -308,6 +308,27 @@ def get_stages(name: str):
     else:
         st.session_state[f"{name}_stages"] = []
 
+def get_semantic_models(name: str):
+    """Callback function to populate the semantic model selector from the selected stage."""
+
+    if (
+        st.session_state[f"{name}_database"] is not None
+        and st.session_state[f"{name}_schema"] is not None
+        and st.session_state[f"{name}_stage"] is not None
+    ):
+        if "session" not in st.session_state:
+            session = get_connection()
+        else:
+            session = st.session_state["session"]
+        stage = f'{st.session_state[f"{name}_database"]}.{st.session_state[f"{name}_schema"]}.{st.session_state[f"{name}_stage"]}'
+        query = f"ls @{stage} pattern='.*\\yaml'"
+        result = session.sql(query)
+        files = [file[0].split("/")[-1] for file in result.collect()]
+        if len(files) > 0:
+            st.session_state[f"{name}_models"] = files
+        else:
+            st.session_state[f"{name}_models"] = []
+
 def get_sprocs(name: str):
     """Call back function to associate database and schema selector with corresponding stored procedures."""
diff --git a/framework-evalanche/src/metrics.py b/framework-evalanche/src/metrics.py
index bcc4eb0..ca31ce6 100644
--- a/framework-evalanche/src/metrics.py
+++ b/framework-evalanche/src/metrics.py
@@ -40,13 +40,15 @@ def evaluate(
         import re
 
         model_to_use = model if model else self.model
-
-        prompt = self.get_prompt(**kwargs)
+        try:
+            prompt = self.get_prompt(**kwargs)
 
-        response = run_async_sql_complete(self.session, model_to_use, prompt)
-        rating = re.search(r'\d+', response)
-        if rating:
-            return int(rating.group())
+            response = run_async_sql_complete(self.session, model_to_use, prompt)
+            rating = re.search(r'\d+', response)
+            if rating:
+                return int(rating.group())
+        except Exception:
+            return None
         else:
             return None
 
@@ -69,7 +71,7 @@ def __init__(
             prompt=SQLAccuracy_prompt,
             required={
                 "question": "User question",
-                "inference_sql": "LLM-generated SQL statement",
"inference_sql": "LLM-generated SQL statement", + "generated_sql": "LLM-generated SQL statement", "expected_sql": "Ground truth SQL statement", }, ) @@ -81,11 +83,11 @@ def get_prompt( if self.prompt is not None: from src.snowflake_utils import return_sql_result - if "inference_sql" in kwargs: - inference_data = return_sql_result(self.session, kwargs["inference_sql"]) + if "generated_sql" in kwargs: + inference_data = return_sql_result(self.session, kwargs["generated_sql"]) else: inference_data = "No data returned" - if "inference_sql" in kwargs: + if "expected_sql" in kwargs: expected_data = return_sql_result(self.session, kwargs["expected_sql"]) else: expected_data = "No data returned" @@ -110,13 +112,15 @@ def evaluate( ): model_to_use = model if model else self.model + try: + prompt = self.get_prompt(**kwargs) - prompt = self.get_prompt(**kwargs) - - response = run_async_sql_complete(self.session, model_to_use, prompt) - if "true" in response.lower(): - return True - else: + response = run_async_sql_complete(self.session, model_to_use, prompt) + if "true" in response.lower(): + return True + else: + return False + except Exception: return False diff --git a/framework-evalanche/src/prompts.py b/framework-evalanche/src/prompts.py index 5c938fe..8690afd 100644 --- a/framework-evalanche/src/prompts.py +++ b/framework-evalanche/src/prompts.py @@ -2,9 +2,10 @@ The JSON data is the output of a SQL query generated to answer a user question. You are to determine if the provided JSON data matches the ground truth JSON data and answers the user question. +The Inference JSON does not have to match the Ground Truth JSON perfectly but should contain the correct answer as denoted by the Ground Truth JSON. Your answer should be either "True" or "False". -Answer "True" if you believe the JSON data matches the ground truth JSON data response. -Answer "False" if you do not believe the JSON data matches the ground truth JSON data. +Answer "True" if you believe the Inference JSON data reflects the Ground Truth JSON data given the user question. +Otherwise, answer "False". [User Question] {question} diff --git a/framework-evalanche/src/snowflake_utils.py b/framework-evalanche/src/snowflake_utils.py index ed0306e..84bdf53 100644 --- a/framework-evalanche/src/snowflake_utils.py +++ b/framework-evalanche/src/snowflake_utils.py @@ -143,13 +143,12 @@ def return_sql_result(session: Session, sql: str) -> Union[str, None]: """ from snowflake.snowpark import functions as F - - result = ( - session.sql(sql.replace(";", "")) - .limit(100) - .select(F.to_varchar(F.array_agg(F.object_construct("*")))) - ) try: + result = ( + session.sql(sql.replace(";", "")) + .limit(100) + .select(F.to_varchar(F.array_agg(F.object_construct("*")))) + ) return result.collect_nowait().result()[0][0] except Exception as e: st.error(f"Error: {e}") @@ -280,3 +279,12 @@ def call_sproc(session: Session, name: str) -> Any: def call_async_sproc(session: Session, sproc: str, input_value: Dict[str, Any]) -> Any: return session.sql(f"CALL {sproc}({input_value})").collect_nowait().result()[0][0] + +def run_async_sql_to_dataframe(session: Session, query: str) -> DataFrame: + """Runs a SQL query and returns the result as a Snowpark DataFrame.""" + query_id = session.sql(query.replace(';','')).collect_nowait().query_id + async_job = session.create_async_job(query_id) + + return async_job.to_df() + +