Skip to content

Commit

Permalink
Merge pull request #4 from sfc-gh-dyaroshenko/dev
Browse files Browse the repository at this point in the history
test
  • Loading branch information
sfc-gh-dyaroshenko authored Dec 10, 2024
2 parents ec8e7e6 + 963c571 commit 32dc67a
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"id": "4132a4ef-a90f-4aa4-b334-d7b1aeb2e91e",
"metadata": {
"name": "md_overview",
"collapsed": false
"collapsed": false,
"resultHeight": 190
},
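"source": "# 07 Load Daily City Metrics\n\n* Author: Jeremiah Hansen\n* Last Updated: 6/11/2024\n\nThis notebook loads data into the `DAILY_CITY_METRICS` table with support for incremental processing: the table is created on the first run, and on later runs the freshly aggregated rows are merged into it (matching rows are updated, new rows are inserted)."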
},
Expand All @@ -22,7 +23,8 @@
"id": "1c47f41d-b110-4662-a907-fb9d0566fe94",
"metadata": {
"language": "sql",
"name": "sql_get_context"
"name": "sql_get_context",
"resultHeight": 111
},
"outputs": [],
"source": "-- This won't be needed when we can pass variables to Notebooks!\nSELECT current_database() AS DATABASE_NAME, current_schema() AS SCHEMA_NAME",
Expand All @@ -34,7 +36,8 @@
"metadata": {
"language": "python",
"name": "py_imports",
"collapsed": false
"collapsed": false,
"resultHeight": 0
},
"outputs": [],
"source": "# Import python packages\nimport logging\nfrom snowflake.core import Root\n\nlogger = logging.getLogger(\"demo_logger\")\n\n# Get the target database and schema using the results from the SQL cell above\n# This won't be needed when we can pass variables to Notebooks!\ncurrent_context_df = cells.sql_get_context.to_pandas()\ndatabase_name = current_context_df.iloc[0,0]\nschema_name = current_context_df.iloc[0,1]\n\n# We can also use Snowpark for our analyses!\nfrom snowflake.snowpark.context import get_active_session\nsession = get_active_session()\n#session.use_schema(f\"{database_name}.{schema_name}\")\n\nlogger.info(\"07_load_daily_city_metrics start\")",
Expand All @@ -45,7 +48,8 @@
"id": "2dd608eb-bc1f-45a9-81bb-35da23528eed",
"metadata": {
"name": "md_function",
"collapsed": false
"collapsed": false,
"resultHeight": 102
},
"source": "## Create a function to check if a table exists\n\nThis function uses the [Snowflake Python Management API](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview)."
},
Expand All @@ -54,7 +58,8 @@
"id": "f9b7500f-5c4f-4c87-a14f-542427705e07",
"metadata": {
"language": "python",
"name": "py_table_exists"
"name": "py_table_exists",
"resultHeight": 0
},
"outputs": [],
"source": "def table_exists(session, database_name='', schema_name='', table_name=''):\n root = Root(session)\n tables = root.databases[database_name].schemas[schema_name].tables.iter(like=table_name)\n for table_obj in tables:\n if table_obj.name == table_name:\n return True\n\n return False\n\n# Not used, SQL alternative to Python version above\ndef table_exists2(session, database_name='', schema_name='', table_name=''):\n exists = session.sql(\"SELECT EXISTS (SELECT * FROM {}.INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS\".format(database_name, schema_name, table_name)).collect()[0]['TABLE_EXISTS']\n return exists",
Expand All @@ -65,7 +70,8 @@
"id": "37822d24-6c8f-4afe-b010-ac1e7f4a9fdf",
"metadata": {
"name": "md_pipeline",
"collapsed": false
"collapsed": false,
"resultHeight": 60
},
"source": "## Pipeline to update `DAILY_CITY_METRICS`"
},
Expand All @@ -76,7 +82,8 @@
"language": "python",
"name": "py_process_dcm",
"collapsed": false,
"codeCollapsed": false
"codeCollapsed": false,
"resultHeight": 0
},
"outputs": [],
"source": "import snowflake.snowpark.functions as F\n\ntable_name = \"DAILY_CITY_METRICS\"\n\n# Define the tables\norder_detail = session.table(\"ORDER_DETAIL\")\nhistory_day = session.table(\"FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY\")\nlocation = session.table(\"LOCATION\")\n\n# Join the tables\norder_detail = order_detail.join(location, order_detail['LOCATION_ID'] == location['LOCATION_ID'])\norder_detail = order_detail.join(history_day, (F.builtin(\"DATE\")(order_detail['ORDER_TS']) == history_day['DATE_VALID_STD']) & (location['ISO_COUNTRY_CODE'] == history_day['COUNTRY']) & (location['CITY'] == history_day['CITY_NAME']))\n\n# Aggregate the data\nfinal_agg = order_detail.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('ISO_COUNTRY_CODE')) \\\n .agg( \\\n F.sum('PRICE').alias('DAILY_SALES_SUM'), \\\n F.avg('AVG_TEMPERATURE_AIR_2M_F').alias(\"AVG_TEMPERATURE_F\"), \\\n F.avg(\"TOT_PRECIPITATION_IN\").alias(\"AVG_PRECIPITATION_IN\"), \\\n ) \\\n .select(F.col(\"DATE_VALID_STD\").alias(\"DATE\"), F.col(\"CITY_NAME\"), F.col(\"ISO_COUNTRY_CODE\").alias(\"COUNTRY_DESC\"), \\\n F.builtin(\"ZEROIFNULL\")(F.col(\"DAILY_SALES_SUM\")).alias(\"DAILY_SALES\"), \\\n F.round(F.col(\"AVG_TEMPERATURE_F\"), 2).alias(\"AVG_TEMPERATURE_FAHRENHEIT\"), \\\n F.round(F.col(\"AVG_PRECIPITATION_IN\"), 2).alias(\"AVG_PRECIPITATION_INCHES\"), \\\n )\n\n# If the table doesn't exist then create it\nif not table_exists(session, database_name=database_name, schema_name=schema_name, table_name=table_name):\n final_agg.write.mode(\"overwrite\").save_as_table(table_name)\n\n logger.info(f\"Successfully created {table_name}\")\n# Otherwise update it\nelse:\n cols_to_update = {c: final_agg[c] for c in final_agg.schema.names}\n\n dcm = session.table(table_name)\n dcm.merge(final_agg, (dcm['DATE'] == final_agg['DATE']) & (dcm['CITY_NAME'] == final_agg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == final_agg['COUNTRY_DESC']), \\\n [F.when_matched().update(cols_to_update), 
F.when_not_matched().insert(cols_to_update)])\n\n logger.info(f\"Successfully updated {table_name}\")\n",
Expand All @@ -87,7 +94,8 @@
"id": "35b06e41-3330-43db-8026-02dfc8d8ecac",
"metadata": {
"name": "md_debugging",
"collapsed": false
"collapsed": false,
"resultHeight": 60
},
"source": "## Debugging"
},
Expand All @@ -97,10 +105,11 @@
"metadata": {
"language": "sql",
"name": "sql_debugging",
"collapsed": false
"collapsed": false,
"resultHeight": 146
},
"outputs": [],
"source": "-- SELECT * FROM DAILY_CITY_METRICS LIMIT 10;",
"source": " SELECT * FROM DAILY_CITY_METRICS LIMIT 10;",
"execution_count": null
}
]
Expand Down

0 comments on commit 32dc67a

Please sign in to comment.