Skip to content

Commit

Permalink
Add support for dynamic table in looker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jsummer committed Sep 3, 2024
1 parent d1baa0f commit ec81679
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 53 deletions.
153 changes: 105 additions & 48 deletions admin_apps/partner/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
check_valid_session_state_values,
format_snowflake_context,
get_available_databases,
get_available_warehouses,
get_available_schemas,
get_snowflake_connection,
input_sample_value_num,
input_semantic_file_name,
run_generate_model_str_from_snowflake,
set_sit_query_tag,
get_sit_query_tag
)
from semantic_model_generator.data_processing.proto_utils import proto_to_dict

Expand Down Expand Up @@ -95,6 +97,8 @@ def set_looker_semantic() -> None:
)
with st.spinner("Loading databases..."):
available_databases = get_available_databases()
with st.spinner("Loading warehouses..."):
available_warehouses = get_available_warehouses()
col1, col2 = st.columns(2)

with col1:
Expand Down Expand Up @@ -140,11 +144,19 @@ def set_looker_semantic() -> None:
)

st.divider()
st.write(
"""
Please pick a Snowflake destination for the table.
"""
)
col1, col2 = st.columns(2, vertical_alignment="center")
with col1:
st.write(
"""
Please pick a Snowflake destination for the table.
"""
)
with col2:
dynamic_table = st.checkbox(
"Dynamic Table",
key='dynamic',
value=False,
)
col1, col2 = st.columns(2)
with col1:
st.selectbox(
Expand All @@ -171,7 +183,35 @@ def set_looker_semantic() -> None:
help="Specify the name of the Snowflake table to materialize.",
)
with col2:
target_lag: int = st.selectbox( # type: ignore
"Dynamic Table Target Lag",
list(range(1, 41)),
index=0,
key='target_lag',
disabled=not dynamic_table,
help="Specifies the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables.",
)
target_lag_unit: str = st.selectbox( # type: ignore
"Target Lag Unit",
['seconds','minutes','hours','days'],
index=1,
key='target_lag_unit',
disabled=not dynamic_table,
help="Specifies the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables.",
)
dynamic_warehouse: str = st.selectbox( # type: ignore
"Warehouse",
available_warehouses,
index=None,
key='dynamic_warehouse',
disabled=not dynamic_table,
help="Specifies the name of the warehouse that provides the compute resources for refreshing the dynamic table.",
)
st.divider()
col1, col2 = st.columns(2)
with col1:
model_name = input_semantic_file_name()
with col2:
sample_values = input_sample_value_num()

if st.button("Continue", type="primary"):
Expand All @@ -194,13 +234,17 @@ def set_looker_semantic() -> None:
full_tablename = f"{st.session_state['looker_target_schema']}.{st.session_state['looker_target_table_name']}"

looker_columns = render_looker_explore_as_table(
get_snowflake_connection(),
st.session_state["looker_model_name"].lower(),
st.session_state["looker_explore_name"].lower(),
st.session_state["looker_target_schema"],
st.session_state["looker_target_table_name"].upper(),
st.session_state["looker_connection_db"],
None, # TO DO - Add support for field selection
conn = get_snowflake_connection(),
model_name = st.session_state["looker_model_name"].lower(),
explore_name = st.session_state["looker_explore_name"].lower(),
snowflake_context = st.session_state["looker_target_schema"],
table_name = st.session_state["looker_target_table_name"].upper(),
optional_db = st.session_state["looker_connection_db"],
fields = None, # TO DO - Add support for field selection
dynamic = dynamic_table,
target_lag = target_lag,
target_lag_unit = target_lag_unit,
warehouse = dynamic_warehouse,
)
st.session_state["looker_field_metadata"] = looker_columns
if st.session_state[
Expand Down Expand Up @@ -328,7 +372,14 @@ def create_query_id(


def create_explore_ctas(
query_string: str, snowflake_context: str, table_name: str
query_string: str,
snowflake_context: str,
table_name: str,
column_list: list[str],
dynamic: Optional[bool] = False,
target_lag: Optional[int] = 20,
target_lag_unit: Optional[str] = "minutes",
warehouse: Optional[str] = None,
) -> str:
"""
Augments Looker Explore SQL with CTAS prefix to create a materialized view.
Expand All @@ -346,10 +397,29 @@ def create_explore_ctas(
line for line in lines if not line.strip().startswith(("LIMIT", "FETCH"))
]
filtered_query_string = "\n".join(filtered_lines)
columns = ", ".join(column_list)
comment = f"COMMENT = '{get_sit_query_tag(vendor = 'looker', action = 'materialize')}'"

if dynamic:
if not [x for x in (target_lag,
target_lag_unit,
warehouse) if x is None]:
ctas_clause = f"""
CREATE OR REPLACE DYNAMIC TABLE {snowflake_context}.{table_name}({columns})
TARGET_LAG = '{target_lag} {target_lag_unit}'
WAREHOUSE = {warehouse}
"""
else:
st.warning("""
Dynamic materialization requires target_lag, target_lag_unit, and warehouse.
Standard materialization will be used.
""")
ctas_clause = f"CREATE OR REPLACE TABLE {snowflake_context}.{table_name}({columns}) "
else:
ctas_clause = f"CREATE OR REPLACE TABLE {snowflake_context}.{table_name}({columns}) "

# Add CTAS clause
ctas_clause = f"CREATE OR REPLACE TABLE {snowflake_context}.{table_name} AS\n"
return ctas_clause + filtered_query_string
ctas_clause += comment + " AS\n" + filtered_query_string
return ctas_clause


def get_explore_sql(
Expand All @@ -370,47 +440,22 @@ def get_explore_sql(
return response


def fetch_columns_in_table(conn: SnowflakeConnection, table_name: str) -> list[str]:
def prep_column_names(columns: list[str]) -> list[str]:
"""
Fetches all columns in a Snowflake table table
Prepares column names for Snowflake table creation
Args:
conn: SnowflakeConnection to run the query
table_name: The fully-qualified name of the table.
Returns: a list of qualified column names (db.schema.column)
"""

query = f"show columns in table {table_name};"
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
return [result[2] for result in results]


def clean_table_columns(
conn: SnowflakeConnection, snowflake_context: str, table_name: str
) -> list[str]:
"""
Renames table columns to remove alias prefixes and double quotes.
Args:
conn: SnowflakeConnection to run the query
snowflake_context (str): Database_name.Schema_name for Snowflake
table_name: The fully-qualified name of the table.
columns: List of column names
Returns: a list of column names
"""

columns = fetch_columns_in_table(conn, f"{snowflake_context}.{table_name}")
new_cols = []

for col in columns:
if "." in col:
new_col = col.split(".")[-1].upper()
else:
new_col = col
new_cols.append(new_col)
query = f'ALTER TABLE {snowflake_context}.{table_name} RENAME COLUMN "{col}" TO {new_col};'
conn.cursor().execute(query)
return new_cols


Expand All @@ -422,6 +467,10 @@ def render_looker_explore_as_table(
table_name: str,
optional_db: Optional[str] = None,
fields: Optional[str] = None,
dynamic: Optional[bool] = False,
target_lag: Optional[int] = 20,
target_lag_unit: Optional[str] = "minutes",
warehouse: Optional[str] = None,
) -> None | dict[str, dict[str, str]]:
"""
Creates materialized table corresponding to Looker Explore.
Expand All @@ -444,6 +493,7 @@ def render_looker_explore_as_table(
field_metadata = get_explore_fields(sdk, model_name, explore_name, fields)
if field_metadata:
metadata_fields = list(field_metadata.keys())
clean_columns = prep_column_names(metadata_fields)
else:
metadata_fields = None
except Exception as e:
Expand All @@ -457,7 +507,16 @@ def render_looker_explore_as_table(
except Exception as e:
st.error(f"Error fetching Looker Explore SQL: {e}")
return None
ctas = create_explore_ctas(explore_sql, snowflake_context, table_name)
ctas = create_explore_ctas(
explore_sql,
snowflake_context,
table_name,
clean_columns,
dynamic,
target_lag,
target_lag_unit,
warehouse,
)

# Create materialized equivalent of Explore
# Looker sources don't require explicit database qualification but instead use connection database implicitly.
Expand All @@ -472,11 +531,9 @@ def render_looker_explore_as_table(
conn.cursor().execute(f"USE DATABASE {current_db};")
else:
conn.cursor().execute(ctas)
# Looker creates lower case double-quoted column names with alias prefixes
new_columns = clean_table_columns(conn, snowflake_context, table_name)

# Associate new column names with looker field metadata
column_metadata = dict(zip(new_columns, list(field_metadata.values())))
column_metadata = dict(zip(clean_columns, list(field_metadata.values())))
return column_metadata


Expand Down
32 changes: 27 additions & 5 deletions admin_apps/shared_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from semantic_model_generator.snowflake_utils.snowflake_connector import (
SnowflakeConnector,
fetch_databases,
fetch_warehouses,
fetch_schemas_in_database,
fetch_tables_views_in_schema,
set_database,
Expand Down Expand Up @@ -98,6 +99,16 @@ def get_available_databases() -> list[str]:

return fetch_databases(get_snowflake_connection())

@st.cache_resource(show_spinner=False)
def get_available_warehouses() -> list[str]:
"""
Simple wrapper around fetch_warehouses to cache the results.
Returns: list of warehouse names
"""

return fetch_warehouses(get_snowflake_connection())


class GeneratorAppScreen(str, Enum):
"""
Expand Down Expand Up @@ -864,12 +875,12 @@ def download_yaml(file_name: str, conn: SnowflakeConnection) -> str:
# Read the raw contents from {temp_dir}/{file_name} and return it as a string.
yaml_str = temp_file.read()
return yaml_str



def set_sit_query_tag(conn: SnowflakeConnection, vendor: str, action: str) -> None:
def get_sit_query_tag(vendor: str = None, action: str = None) -> str:
"""
Sets query tag on a single zero-compute ping for tracking.
Returns: None
Returns SIT query tag.
Returns: str
"""

query_tag = {
Expand All @@ -878,8 +889,19 @@ def set_sit_query_tag(conn: SnowflakeConnection, vendor: str, action: str) -> No
"version": {"major": 1, "minor": 0},
"attributes": {"vendor": vendor, "action": action},
}
return json.dumps(query_tag)



def set_sit_query_tag(conn: SnowflakeConnection, vendor: str = None, action: str = None) -> None:
"""
Sets query tag on a single zero-compute ping for tracking.
Returns: None
"""

query_tag = get_sit_query_tag(vendor, action)

conn.cursor().execute(f"alter session set query_tag='{json.dumps(query_tag)}'")
conn.cursor().execute(f"alter session set query_tag='{query_tag}'")
conn.cursor().execute("SELECT 'SKIMANTICS';")
conn.cursor().execute("alter session set query_tag=''")

Expand Down
16 changes: 16 additions & 0 deletions semantic_model_generator/snowflake_utils/snowflake_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,22 @@ def fetch_databases(conn: SnowflakeConnection) -> List[str]:
return [result[1] for result in results]


def fetch_warehouses(conn: SnowflakeConnection) -> List[str]:
"""
Fetches all warehouses that the current user has access to
Args:
conn: SnowflakeConnection to run the query
Returns: a list of warehouses names
"""
query = "show warehouses;"
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
return [result[0] for result in results]


def fetch_schemas_in_database(conn: SnowflakeConnection, db_name: str) -> List[str]:
"""
Fetches all schemas that the current user has access to in the current database
Expand Down

0 comments on commit ec81679

Please sign in to comment.