Commit

Merge pull request #39 from Snowflake-Labs/release-8312024

Update
sfc-gh-jsummer authored Oct 31, 2024
2 parents 8b5d49c + a2627f8 commit 39a579e
Showing 10 changed files with 232 additions and 127 deletions.
9 changes: 6 additions & 3 deletions framework-evalanche/README.md
@@ -47,20 +47,23 @@ Once you've selected Metric(s) and configured your Data Source for the Metric(s)
# Setup
Evalanche is deployed to Streamlit in Snowflake, which can be done using multiple methods. First, obtain the source code for Evalanche by either downloading this repo or cloning the repository locally. A few deployment options are listed below; use the one that matches your preferred tool.

## Snowsight Worksheet
Copy and paste the contents of `setup/git_setup.sql` into a Snowsight SQL Worksheet. Ensure your context role is appropriate, as it will be the owning role of the Streamlit app. Execute the entire SQL Worksheet.

## Snowflake CLI
See [Snowflake CLI installation documentation](https://docs.snowflake.com/developer-guide/snowflake-cli/index) for instructions. Once installed, configure your connection parameters or pass them via command flags. Run the command below in a terminal from the project root to deploy the application.
```bash
snow sql -f setup.sql
snow sql -f setup/cli_setup.sql
```
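If multiple connections are configured, you can also target one by name with the `-c` flag, e.g. `snow sql -f setup/cli_setup.sql -c my_connection` (the connection name here is illustrative).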

## VSCode with Snowflake Extension
See [Snowflake Extension for Visual Studio Code installation documentation](https://docs.snowflake.com/en/user-guide/vscode-ext) for instructions. Once installed, sign into Snowflake in the extension. Execute all of `setup.sql` from VSCode.
See [Snowflake Extension for Visual Studio Code installation documentation](https://docs.snowflake.com/en/user-guide/vscode-ext) for instructions. Once installed, sign into Snowflake in the extension. Execute all of `setup/git_setup.sql` from VSCode.

# Running
Once Evalanche is deployed to Streamlit in Snowflake, the app is ready for use. Log in to Snowsight and open the app named Evalanche: GenAI Evaluation Application. If desired, this can be done directly from the terminal with the Snowflake CLI command below.

```bash
snow streamlit get-url EVALUATION_APP --open --database CORTEX_ANALYST_UTILITIES --schema EVALUATION
snow streamlit get-url EVALUATION_APP --open --database GENAI_UTILITIES --schema EVALUATION
```

# Advanced
37 changes: 28 additions & 9 deletions framework-evalanche/pages/data.py
@@ -2,7 +2,7 @@
from collections import OrderedDict

# Python 3.8 type hints
from typing import List, Union
from typing import List, Union, Dict, Tuple, Any

import streamlit as st
from snowflake.snowpark import DataFrame
@@ -73,7 +73,7 @@ def run_sql(sql: str) -> Union[None, DataFrame]:
st.warning("Please enter a SQL query.")
else:
try:
return st.session_state["session"].sql(sql)
return st.session_state["session"].sql(sql.replace(';', ''))
except Exception as e:
st.error(f"Error: {e}")

@@ -201,24 +201,34 @@ def data_spec(key_name: str, instructions: str, height=200, join_key=True) -> None:
)


def sproc_runner(session: Session, sproc_name: str, inputs: Dict[str, Any]) -> Tuple[Union[int, float], Any]:
start_time = time.time()
record_result = session.sql(f"""CALL {sproc_name}({inputs})""").collect_nowait().result()[0][0]
elapsed_time = time.time() - start_time
return (elapsed_time, record_result)
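
A quick usage sketch of `sproc_runner` (the fully-qualified procedure name and input keys below are hypothetical): it returns the elapsed wall-clock seconds first and the stored procedure's result second.

```python
# Hypothetical call; the procedure name and row dict are illustrative only.
elapsed, result = sproc_runner(
    session, "MY_DB.MY_SCHEMA.MY_PIPELINE_SPROC", {"QUESTION": "What is RAG?"}
)
print(f"Completed in {elapsed:.2f}s -> {result}")
```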

def pipeline_runner(
session: Session,
sproc: str,
input_tablename: str,
output_tablename: str,
columns: List[str],
) -> None:
"""Runs stored procedures asynchronously over input from Snowflake table.
Stored procedures may not be asynchronous but calling of them is done asynchronously in the app.
Stored procedures must have one input that is a string and return a single value.
Results are written to a table in Snowflake.
Write mode is set to append so that multiple evaluations can be saved to the same table.
Note that all columns in table will be kept but only those passed in columns will be
passed to stored procedure to mitigate errors from other columns.
Args:
session (Session): Snowpark session
sproc (string): Fully-qualified name of stored procedure.
input_tablename (string): Fully-qualified name of table with input values.
output_tablename (string): Fully-qualified name of table to write results to.
columns (list): List of columns to pass to stored procedure.
"""

@@ -229,15 +239,16 @@ def pipeline_runner(
from src.snowflake_utils import add_row_id, save_eval_to_table

df = add_row_id(session.table(input_tablename))
columns = columns + ["ROW_ID"]

for pandas_df in df.to_pandas_batches():
for pandas_df in df.select(*columns).to_pandas_batches():
results = Parallel(n_jobs=multiprocessing.cpu_count(), backend="threading")(
delayed(
lambda row: {
"ROW_ID": row["ROW_ID"], # Capture ROW_ID
"RESPONSE": session.sql(f"""CALL {sproc}({row.to_dict()})""")
.collect_nowait()
.result()[0][0],
"RESPONSE": (response := sproc_runner(session, sproc, row.to_dict()))[0],
"ELAPSED_TIME": response[1],
}
)(row)
for _, row in pandas_df.iterrows()
@@ -259,6 +270,7 @@ def pipeline_runner_dialog() -> None:
Before you start, your LLM pipeline must be encapsulated in a stored procedure that takes a VARIANT input and returns a single value.
Every row of the reference table will be passed through the stored procedure as a dictionary.
Only the columns you select will be passed to the stored procedure; other columns in the reference table are ignored.
Please see [Snowflake Stored Procedure documentation](https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-overview)
for details on stored procedures and these [specific instructions](https://github.com/sfc-gh-jsummer/evalanche#crafting-a-llm-pipeline-stored-procedure) on crafting these stored procedures.""")
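
As a reference for the contract described above, here is a minimal sketch of registering such a stored procedure with Snowpark Python. The handler name `my_pipeline`, the procedure name `MY_PIPELINE_SPROC`, the stage `@MY_STAGE`, and the prompt logic are all hypothetical, not part of this PR:

```python
from snowflake.snowpark import Session
from snowflake.snowpark.types import StringType, VariantType

def my_pipeline(session: Session, row: dict) -> str:
    # VARIANT inputs arrive as Python dicts; return exactly one value.
    prompt = f"Summarize: {row.get('QUESTION', '')}"
    return session.sql(
        "SELECT SNOWFLAKE.CORTEX.COMPLETE('llama3.1-8b', ?)", params=[prompt]
    ).collect()[0][0]

session.sproc.register(
    func=my_pipeline,
    name="MY_PIPELINE_SPROC",        # hypothetical name
    input_types=[VariantType()],     # single VARIANT input, per the dialog text
    return_type=StringType(),        # single value returned
    packages=["snowflake-snowpark-python"],
    is_permanent=True,
    stage_location="@MY_STAGE",      # hypothetical stage
    replace=True,
)
```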

@@ -278,10 +290,16 @@ def pipeline_runner_dialog() -> None:
st.divider()

st.write("Select the reference data.")
table_spec = table_data_selector("runner_output", new_table=False)
name = "runner_output"
table_spec = table_data_selector(name, new_table=False)
data_table = (
f'{table_spec["database"]}.{table_spec["schema"]}.{table_spec["table"]}'
)
available_columns = fetch_columns(table_spec["database"], table_spec["schema"], table_spec["table"])
selected_columns = st.multiselect(
"Select Columns", available_columns, default=None, key=f"columns_{name}",
help="Select the columns to pass explicitly to the stored procedure.",
)

if st.button("Run"):
with st.spinner("Running pipeline..."):
@@ -290,10 +308,11 @@ def pipeline_runner_dialog() -> None:
sproc_name.split("(")[0],
data_table,
new_tablename,
selected_columns
)
st.success(f"Results written to {new_tablename}.")
time.sleep(1.5)
st.rerun()
time.sleep(2)
st.rerun()


@st.experimental_dialog("Configure Metrics", width="large")
95 changes: 46 additions & 49 deletions framework-evalanche/pages/results.py
@@ -9,7 +9,7 @@
from snowflake.snowpark import DataFrame
from streamlit_extras.row import row

from src.app_utils import render_sidebar, select_schema_context
from src.app_utils import render_sidebar, select_model
from src.metric_utils import AUTO_EVAL_TABLE, SAVED_EVAL_TABLE
from src.snowflake_utils import save_eval_to_table

@@ -19,11 +19,12 @@ def get_result_title() -> str:
The title includes the evaluation name if it is available in session state.
"""

if st.session_state.get("eval_name", None) is not None:
return f"Evaluation Results: {st.session_state.get('eval_name', '')}"
else:

if st.session_state.get("eval_funnel", "") == "new":
return "Evaluation Results"
else:
if st.session_state.get("eval_name", None) is not None:
return f"Evaluation Results: {st.session_state.get('eval_name', '')}"


TITLE = get_result_title()
@@ -139,15 +140,9 @@ def save_eval() -> None:

st.write("""Source data and metric configuration will be captured as a Snowflake Stored Procedure.
Select the evaluation from Homepage's **Saved Evaluations** section to run.""")
name = "save_eval"
schema_context = select_schema_context(name, on_change=get_stages, args=(name,))
if f"{name}_stages" not in st.session_state:
st.session_state[f"{name}_stages"] = []
stage_name = st.selectbox(
"Select Stage",
st.session_state[f"{name}_stages"],
index=None,
)
# App logic and saved evaluations must reside in the same location, so we hard-code these values.
schema_context = {"database": "GENAI_UTILITIES", "schema": "EVALUATION"}
stage_name = "STREAMLIT_STAGE"
eval_name, eval_description = get_eval_name_desc()

if st.button("Save"):
@@ -178,9 +173,8 @@ def save_eval() -> None:
st.success(
"Evaluation registered complete. See On Demand Evaluations to run."
)
except Exception as e:
st.error(f"Error: {e}")
try:
# Inserting metadata into table does not have its own try/except block as we
# only want to add to the table if everything above is successful
with st.spinner("Adding to On Demand Evaluations."):
msg = insert_to_eval_table(
session=st.session_state["session"],
@@ -212,21 +206,16 @@ def automate_eval() -> None:
st.write("""Source data will be tracked and metric(s) calculated for new records.
Results will be captured in a table.
Select the evaluation from Homepage's **Automated Evaluations** section to view results.""")
name = "auto_eval"
schema_context = select_schema_context(name, on_change=get_stages, args=(name,))
if f"{name}_stages" not in st.session_state:
st.session_state[f"{name}_stages"] = []
stage_name = st.selectbox(
"Select Stage",
st.session_state[f"{name}_stages"],
index=None,
)
# App logic and saved evaluations must reside in the same location, so we hard-code these values.
schema_context = {"database": "GENAI_UTILITIES", "schema": "EVALUATION"}
stage_name = "STREAMLIT_STAGE"

warehouse = st.selectbox(
"Select Warehouse",
st.session_state["warehouses"], # Set prior to launching dialog box
index=None,
help="Select the warehouse to power the automation task.",
)
warehouse = "WH_XS"
eval_name, eval_description = get_eval_name_desc()

if st.button("Save"):
@@ -258,9 +247,8 @@ def automate_eval() -> None:
"""Evaluation automation complete. Results from current records may be delayed.
Select Automated Evaluations from the Homepage to view."""
)
except Exception as e:
st.error(f"Error: {e}")
try:
# Inserting metadata into table does not have its own try/except block as we
# only want to add to the table if everything above is successful
with st.spinner("Adding to Automated Evaluations."):
msg = insert_to_eval_table(
session=st.session_state["session"],
@@ -285,6 +273,17 @@ def give_recommendation_instruction() -> None:
)


def get_metric_cols(current_df: DataFrame) -> list:
"""Returns list of columns in dataframe that contain metric values.
Some metric names have spaces and Snowpark keeps them in lower case with double quotes.
Metric names without spaces are uppercased when added to a Snowflake table/dataframe."""

metric_names = [metric.name for metric in st.session_state["selected_metrics"]]
df_columns = current_df.columns
return [c_name for c_name in df_columns if c_name.upper() in (m_name.upper() for m_name in metric_names)]
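
A small illustration of the casing behavior this helper accounts for (assumed Snowpark behavior; the column names are made up):

```python
# Unquoted identifiers are stored uppercase by Snowflake, while names with
# spaces survive only as quoted, case-preserving identifiers.
df = session.sql('SELECT 0.9 AS ANSWER_RELEVANCY, 1 AS "Exact Match"')
print(df.columns)  # expected: ['ANSWER_RELEVANCY', '"Exact Match"']
```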


def show_metric() -> None:
"""Renders metric KPIs based on selected metrics."""

@@ -362,20 +361,20 @@ def show_recommendation(selection: Union[int, None], pandas_df: pd.DataFrame) -> None:
session = st.session_state["session"]

selected_row = pandas_df.iloc[selection].to_dict()
metric_cols = [
metric.name.upper() for metric in st.session_state["selected_metrics"]
]
metric_cols = get_metric_cols(pandas_df)

selected_metric_name = st.selectbox(
"Select Metric", metric_cols, index=None, key="metric_selector"
)
if selected_metric_name is not None:
# Get metric object that matches metric name
score = selected_row[selected_metric_name]

# Get the actual metric object
matching_metric = next(
(
metric
for metric in metrics
if metric.name.upper() == selected_metric_name
if metric.name.upper() == selected_metric_name.upper()
),
None,
)
@@ -390,11 +389,14 @@ def show_recommendation(selection: Union[int, None], pandas_df: pd.DataFrame) -> None:
}
original_prompt = matching_metric.get_prompt(**original_prompt_fstrings)
recommender_prompt = Recommendation_prompt.format(
prompt=original_prompt, score=selected_row[selected_metric_name]
prompt=original_prompt, score=score
)

with st.spinner("Thinking..."):
response = run_complete(session, "llama3.1-8b", recommender_prompt)
rec_model = select_model("rec_model", default="llama3.2-3b")
if st.button("Analyze", disabled=selected_metric_name is None):
with st.spinner("Crunching the numbers..."):
response = run_complete(session, rec_model, recommender_prompt)
if response is not None:
st.write(response)

@@ -411,9 +413,7 @@ def trend_avg_metrics() -> None:
st.session_state.get("metric_result_data", None) is not None
and st.session_state.get("selected_metrics", None) is not None
):
metric_cols = [
metric.name.upper() for metric in st.session_state["selected_metrics"]
]
metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))

# We cast to variant in case the metric is a boolean
# METRIC_DATETIME is batched for every run so there should be many rows per metric calculation set
@@ -439,9 +439,7 @@ def trend_count_metrics() -> None:
st.session_state.get("metric_result_data", None) is not None
and st.session_state.get("selected_metrics", None) is not None
):
metric_cols = [
metric.name.upper() for metric in st.session_state["selected_metrics"]
]
metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))

df = st.session_state["metric_result_data"]
st.bar_chart(
@@ -456,14 +454,13 @@ def bar_chart_metrics() -> None:
This is the default chart if no trendable column is found.
"""

# TO DO - Add preview metric vs. selected metric so user can see
# results for previously selected metrics until they select new one.
if (
st.session_state.get("metric_result_data", None) is not None
and st.session_state.get("selected_metrics", None) is not None
and len(st.session_state.get("selected_metrics", [])) > 0
):
metric_cols = [
metric.name.upper() for metric in st.session_state["selected_metrics"]
]
metric_cols = get_metric_cols(st.session_state.get("metric_result_data", None))

df = st.session_state["metric_result_data"]
chart_df = (
58 changes: 0 additions & 58 deletions framework-evalanche/setup.sql

This file was deleted.
