From ec81679981b3a549f3c3ad6c9d6cacfe9efa88fc Mon Sep 17 00:00:00 2001 From: sfc-gh-jsummer Date: Tue, 3 Sep 2024 15:54:51 -0500 Subject: [PATCH] Add support for dynamic table in looker implementation --- admin_apps/partner/looker.py | 153 ++++++++++++------ admin_apps/shared_utils.py | 32 +++- .../snowflake_utils/snowflake_connector.py | 16 ++ 3 files changed, 148 insertions(+), 53 deletions(-) diff --git a/admin_apps/partner/looker.py b/admin_apps/partner/looker.py index b2d1eb57..b078c6d8 100644 --- a/admin_apps/partner/looker.py +++ b/admin_apps/partner/looker.py @@ -19,12 +19,14 @@ check_valid_session_state_values, format_snowflake_context, get_available_databases, + get_available_warehouses, get_available_schemas, get_snowflake_connection, input_sample_value_num, input_semantic_file_name, run_generate_model_str_from_snowflake, set_sit_query_tag, + get_sit_query_tag ) from semantic_model_generator.data_processing.proto_utils import proto_to_dict @@ -95,6 +97,8 @@ def set_looker_semantic() -> None: ) with st.spinner("Loading databases..."): available_databases = get_available_databases() + with st.spinner("Loading warehouses..."): + available_warehouses = get_available_warehouses() col1, col2 = st.columns(2) with col1: @@ -140,11 +144,19 @@ def set_looker_semantic() -> None: ) st.divider() - st.write( - """ - Please pick a Snowflake destination for the table. - """ - ) + col1, col2 = st.columns(2, vertical_alignment="center") + with col1: + st.write( + """ + Please pick a Snowflake destination for the table. 
+ """ + ) + with col2: + dynamic_table = st.checkbox( + "Dynamic Table", + key='dynamic', + value=False, + ) col1, col2 = st.columns(2) with col1: st.selectbox( @@ -171,7 +183,35 @@ def set_looker_semantic() -> None: help="Specify the name of the Snowflake table to materialize.", ) with col2: + target_lag: int = st.selectbox( # type: ignore + "Dynamic Table Target Lag", + list(range(1, 41)), + index=0, + key='target_lag', + disabled=not dynamic_table, + help="Specifies the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables.", + ) + target_lag_unit: str = st.selectbox( # type: ignore + "Target Lag Unit", + ['seconds','minutes','hours','days'], + index=1, + key='target_lag_unit', + disabled=not dynamic_table, + help="Specifies the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables.", + ) + dynamic_warehouse: str = st.selectbox( # type: ignore + "Warehouse", + available_warehouses, + index=None, + key='dynamic_warehouse', + disabled=not dynamic_table, + help="Specifies the name of the warehouse that provides the compute resources for refreshing the dynamic table.", + ) + st.divider() + col1, col2 = st.columns(2) + with col1: model_name = input_semantic_file_name() + with col2: sample_values = input_sample_value_num() if st.button("Continue", type="primary"): @@ -194,13 +234,17 @@ def set_looker_semantic() -> None: full_tablename = f"{st.session_state['looker_target_schema']}.{st.session_state['looker_target_table_name']}" looker_columns = render_looker_explore_as_table( - get_snowflake_connection(), - st.session_state["looker_model_name"].lower(), - st.session_state["looker_explore_name"].lower(), - st.session_state["looker_target_schema"], - st.session_state["looker_target_table_name"].upper(), - st.session_state["looker_connection_db"], - None, # TO DO - Add support for field selection + conn = get_snowflake_connection(), + model_name = 
st.session_state["looker_model_name"].lower(), + explore_name = st.session_state["looker_explore_name"].lower(), + snowflake_context = st.session_state["looker_target_schema"], + table_name = st.session_state["looker_target_table_name"].upper(), + optional_db = st.session_state["looker_connection_db"], + fields = None, # TO DO - Add support for field selection + dynamic = dynamic_table, + target_lag = target_lag, + target_lag_unit = target_lag_unit, + warehouse = dynamic_warehouse, ) st.session_state["looker_field_metadata"] = looker_columns if st.session_state[ @@ -328,7 +372,14 @@ def create_query_id( def create_explore_ctas( - query_string: str, snowflake_context: str, table_name: str + query_string: str, + snowflake_context: str, + table_name: str, + column_list: list[str], + dynamic: Optional[bool] = False, + target_lag: Optional[int] = 20, + target_lag_unit: Optional[str] = "minutes", + warehouse: Optional[str] = None, ) -> str: """ Augments Looker Explore SQL with CTAS prefix to create a materialized view. @@ -346,10 +397,29 @@ def create_explore_ctas( line for line in lines if not line.strip().startswith(("LIMIT", "FETCH")) ] filtered_query_string = "\n".join(filtered_lines) + columns = ", ".join(column_list) + comment = f"COMMENT = '{get_sit_query_tag(vendor = 'looker', action = 'materialize')}'" + + if dynamic: + if not [x for x in (target_lag, + target_lag_unit, + warehouse) if x is None]: + ctas_clause = f""" + CREATE OR REPLACE DYNAMIC TABLE {snowflake_context}.{table_name}({columns}) + TARGET_LAG = '{target_lag} {target_lag_unit}' + WAREHOUSE = {warehouse} + """ + else: + st.warning(""" + Dynamic materialization requires target_lag, target_lag_unit, and warehouse. + Standard materialization will be used. 
def prep_column_names(columns: list[str]) -> list[str]:
    """
    Prepares column names for Snowflake table creation.

    A name containing a "." keeps only the text after its last "." and is
    upper-cased (e.g. "orders.id" -> "ID"). A name without a "." is returned
    unchanged, including its original casing.

    Args:
        columns: List of column names

    Returns: a new list of column names, in the same order as the input
    """
    # NOTE(review): names without a "." are not upper-cased, unlike dotted
    # names -- confirm whether that asymmetry is intended.
    # NOTE(review): distinct inputs can collapse to the same output
    # ("a.id" and "b.id" both become "ID"); presumably callers need unique
    # names -- verify against the caller.
    return [
        col.rsplit(".", 1)[-1].upper() if "." in col else col
        for col in columns
    ]
@st.cache_resource(show_spinner=False)
def get_available_warehouses() -> list[str]:
    """
    Return the warehouse names reported by fetch_warehouses.

    The lookup runs on the connection obtained from
    get_snowflake_connection(); the function is wrapped in Streamlit's
    cache_resource decorator with the spinner disabled.

    Returns: list of warehouse names
    """
    connection = get_snowflake_connection()
    warehouse_names = fetch_warehouses(connection)
    return warehouse_names
def fetch_warehouses(conn: SnowflakeConnection) -> List[str]:
    """
    Fetches all warehouses that the current user has access to
    Args:
        conn: SnowflakeConnection to run the query

    Returns: a list of warehouse names

    """
    query = "show warehouses;"
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        # The warehouse name is read from the first column of each row.
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Release the cursor even when execute/fetchall raises.
        cursor.close()