From a88a980569a28317e85106169f4d38970fbbdea8 Mon Sep 17 00:00:00 2001 From: Anders Date: Fri, 16 Sep 2022 17:05:08 +0000 Subject: [PATCH 1/2] refactored DAG --- dbt_project.yml | 11 ++--- models/fruit_join.sql | 12 ++++++ models/fruit_summary.sql | 13 ++++++ models/stage/scehma.yml | 43 +++++++++++++++++++ models/stage/stg_fruit_prices_fact.sql | 3 ++ .../stg_fruit_user_input.py} | 16 +------ 6 files changed, 77 insertions(+), 21 deletions(-) create mode 100644 models/fruit_join.sql create mode 100644 models/fruit_summary.sql create mode 100644 models/stage/scehma.yml create mode 100644 models/stage/stg_fruit_prices_fact.sql rename models/{fruit_join.py => stage/stg_fruit_user_input.py} (55%) diff --git a/dbt_project.yml b/dbt_project.yml index 51bea7a..9f8178b 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -34,13 +34,10 @@ clean-targets: # directories to be removed by `dbt clean` models: python_wrench: # Config indicated by + and applies to all files under models/example/ - example: - +materialized: view - -quoting: - database: false - schema: false - identifier: false + stage: + stg_fruit_user_input: + +materialized: table + seeds: # to allow for lowercase diff --git a/models/fruit_join.sql b/models/fruit_join.sql new file mode 100644 index 0000000..38e35a6 --- /dev/null +++ b/models/fruit_join.sql @@ -0,0 +1,12 @@ +WITH +stg_input AS (SELECT * FROM {{ ref('stg_fruit_user_input') }}), + +stg_fact AS (SELECT * FROM {{ ref('stg_fruit_prices_fact') }}) + +SELECT + stg_fact."fruit_name", + stg_input."user_name", + stg_input."quantity" * stg_fact."cost" AS "total" +FROM + stg_input LEFT JOIN stg_fact + ON stg_input."fruit_name" = stg_fact."fruit_name" diff --git a/models/fruit_summary.sql b/models/fruit_summary.sql new file mode 100644 index 0000000..be1e70f --- /dev/null +++ b/models/fruit_summary.sql @@ -0,0 +1,13 @@ +WITH +fruit_join AS ( + SELECT * FROM {{ ref('fruit_join') }} +) + +SELECT + "user_name", + SUM("total") AS "total_final" + +FROM fruit_join 
+WHERE "user_name" IS NOT NULL +GROUP BY "user_name" +ORDER BY SUM("total") DESC diff --git a/models/stage/scehma.yml b/models/stage/scehma.yml new file mode 100644 index 0000000..6dd61a2 --- /dev/null +++ b/models/stage/scehma.yml @@ -0,0 +1,43 @@ +version: 2 + +models: + - name: stg_fruit_user_input + description: prepare to fuzzymatch + columns: + - name: fruit_user_input + quote: true + description: what the user manually typed in the app + tests: + - not_null + - name: quantity + quote: true + description: how many user wants to buy + tests: + - not_null + - name: user_name + quote: true + description: the internal ID of the app user + tests: + - not_null + - name: fruit_name + quote: true + description: best possible fuzzy match b/w user input and fact table + tests: + - not_null: + config: + severity: warn + error_if: ">5" + warn_if: ">2" + - name: fruit_summary + description: total each customer definitely owes minus mismatches + columns: + - name: user_name + quote: true + description: what the user manually typed in the app + tests: + - not_null + - name: total_final + quote: true + description: total amount each user owes + tests: + - not_null \ No newline at end of file diff --git a/models/stage/stg_fruit_prices_fact.sql b/models/stage/stg_fruit_prices_fact.sql new file mode 100644 index 0000000..b023377 --- /dev/null +++ b/models/stage/stg_fruit_prices_fact.sql @@ -0,0 +1,3 @@ +SELECT +* +FROM {{ ref('fruit_prices_fact') }} \ No newline at end of file diff --git a/models/fruit_join.py b/models/stage/stg_fruit_user_input.py similarity index 55% rename from models/fruit_join.py rename to models/stage/stg_fruit_user_input.py index 3573bc8..55c1ed7 100644 --- a/models/fruit_join.py +++ b/models/stage/stg_fruit_user_input.py @@ -7,9 +7,7 @@ def model(dbt, session): packages=["fuzzywuzzy"] ) - df_input = dbt.ref("fruit_user_input").to_pandas() - - df_price = dbt.ref("fruit_prices_fact").to_pandas() + df_price = dbt.ref("stg_fruit_prices_fact").to_pandas() 
def custom_scorer(string): ''' @@ -24,17 +22,7 @@ def custom_scorer(string): else: return None - df_final = (df_input + return (dbt.ref("fruit_user_input").to_pandas() # make new col, `fruit_name`, with best match against actual table .assign(fruit_name=lambda df: df["fruit_user_input"].apply(custom_scorer)) - # join the actual fruit price table - .merge(df_price, on="fruit_name") - # calculate subtotal - .assign(total=lambda df: df.quantity * df.cost) - # find total for each user - .groupby("user_name")["total"].sum() - .reset_index() - .sort_values("total", ascending=False) ) - - return df_final From c96365475d886f6ced08af392d607539874ad4eb Mon Sep 17 00:00:00 2001 From: Anders Date: Fri, 16 Sep 2022 20:47:55 +0000 Subject: [PATCH 2/2] add back old version under disabled v1 folder --- README.md | 21 ++++++++++++++++++--- dbt_project.yml | 3 +++ models/v1/fruit_join.py | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 models/v1/fruit_join.py diff --git a/README.md b/README.md index 0e042c9..102bb2f 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,21 @@ It's without question that us dbters [stan](https://www.urbandictionary.com/defi This dbt project shows a trivial example fuzzy string matching in Snowflake using dbt-snowflake Python models in Snowpark. [thefuzz](https://github.com/seatgeek/thefuzz) is the defacto package. 
While Snowflake SQL has the `EDITDISTANCE()` ([docs](https://docs.snowflake.com/en/sql-reference/functions/editdistance.html)) function, what we're after is "give me the best match for this string, as long as it's 'close enough'" -This is easily accomplished with `process.extractOne()` ([source](https://github.com/seatgeek/thefuzz/blob/791c0bd18c77b4d9911f234c70808dbf24f74152/thefuzz/process.py#L200-L225)) +This is easily accomplished with `thefuzz.process.extractOne()` ([source](https://github.com/seatgeek/thefuzz/blob/791c0bd18c77b4d9911f234c70808dbf24f74152/thefuzz/process.py#L200-L225)) ## Imaginiary Scenario +### Video Walkthrough + +If you'd prefer to hear a rambling overview, check out the [video walkthrough]() + ### Shut up and show me the code! - [fuzzer.ipynb](fuzzer.ipynb): A notebook that shows you the code on your local machine -- [/models/fruit_join.py](/models/fruit_join.py): A notebook that shows you the code on your local machine +- [/models/v1/fruit_join.py](/models/v1/fruit_join.py): A Python model that effectively does the majority of the transformation +- [models/stage/stg_fruit_user_input.py](models/stage/stg_fruit_user_input.py): A Python staging model that only adds the fuzzy-matched `fruit_name` column + ### Background @@ -110,4 +116,13 @@ df_final = (df_input return df_final ``` -3. to run this DAG, simply call `dbt build`! \ No newline at end of file +3. to run this DAG, simply call `dbt build`! + + +#### Making the code more dbtonic + +All we're really doing is adding a new column to a raw dataset. This is also known as a staging model. So for v2, [models/stage/stg_fruit_user_input.py](models/stage/stg_fruit_user_input.py), the new column calculation is the only thing that's done to the staging model and it is done in Python. Everything else happens in SQL in downstream models as per usual. 
+ + +From [dbt's best practices](https://docs.getdbt.com/guides/legacy/best-practices) +> Source-centric transformations to transform data from different sources into a consistent structure, for example, re-aliasing and recasting columns, or unioning, joining or deduplicating source data to ensure your model has the correct grain. diff --git a/dbt_project.yml b/dbt_project.yml index 9f8178b..9d8a6aa 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -37,6 +37,9 @@ models: stage: stg_fruit_user_input: +materialized: table + v1: + fruit_join: + +enabled: false seeds: diff --git a/models/v1/fruit_join.py b/models/v1/fruit_join.py new file mode 100644 index 0000000..3573bc8 --- /dev/null +++ b/models/v1/fruit_join.py @@ -0,0 +1,40 @@ +from fuzzywuzzy import process + + +def model(dbt, session): + dbt.config( + materialized="table", + packages=["fuzzywuzzy"] + ) + + df_input = dbt.ref("fruit_user_input").to_pandas() + + df_price = dbt.ref("fruit_prices_fact").to_pandas() + + def custom_scorer(string): + ''' + for a given string + return the best match out of the `fruit_name` column in the df_to table + ''' + + x = process.extractOne(string, df_price["fruit_name"], score_cutoff=60) + + if x is not None: + return x[0] + else: + return None + + df_final = (df_input + # make new col, `fruit_name`, with best match against actual table + .assign(fruit_name=lambda df: df["fruit_user_input"].apply(custom_scorer)) + # join the actual fruit price table + .merge(df_price, on="fruit_name") + # calculate subtotal + .assign(total=lambda df: df.quantity * df.cost) + # find total for each user + .groupby("user_name")["total"].sum() + .reset_index() + .sort_values("total", ascending=False) + ) + + return df_final