From 316c226a0d3dda6a5d852c9f4a60bdbbd9dcba35 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 28 May 2024 15:46:06 -0700 Subject: [PATCH 01/24] Add files via upload --- samples/iceberg_cortex/spark_env_variables.txt | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 samples/iceberg_cortex/spark_env_variables.txt diff --git a/samples/iceberg_cortex/spark_env_variables.txt b/samples/iceberg_cortex/spark_env_variables.txt new file mode 100644 index 0000000..1509048 --- /dev/null +++ b/samples/iceberg_cortex/spark_env_variables.txt @@ -0,0 +1,9 @@ +export SPARK_HOME=/Users//anaconda3/envs/iceberg-lab/lib/python3.11/site-packages/pyspark +export SNOWFLAKE_CATALOG_URI=jdbc:snowflake://.snowflakecomputing.com +export SNOWFLAKE_ROLE=ACCOUNTADMIN +export SNOWFLAKE_USERNAME= +export PRIVATE_KEY_FILE=~//rsa_key.p8 +export PACKAGES=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160 +export AWS_REGION= +export AWS_ACCESS_KEY_ID= +export AWS_SECRET_ACCESS_KEY= \ No newline at end of file From ae5ddd37682cd50da2d1d2ec0deb08cc9660f44e Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 28 May 2024 15:47:15 -0700 Subject: [PATCH 02/24] Update spark.ipynb --- samples/iceberg_cortex/spark.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb index 92024b2..89f4be2 100644 --- a/samples/iceberg_cortex/spark.ipynb +++ b/samples/iceberg_cortex/spark.ipynb @@ -22,7 +22,7 @@ "# 3. Make sure you're using the 3.11.6 iceberg-demo Python kernel\n", "# 4. Export environment variables from spark_env_variables.txt\n", "# 5. Activate the environment and open jupyter notebooks by running this from your command line\n", - "# conda activate iceberg-lab\n", + "# conda activate iceberg-demo\n", "# jupyter notebook" ] }, From 9ae6530f956b3d8f8510048ae606cbf21db9c652 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 28 May 2024 15:48:22 -0700 Subject: [PATCH 03/24] Update environment.yml --- samples/iceberg_cortex/environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/iceberg_cortex/environment.yml b/samples/iceberg_cortex/environment.yml index afada1f..da7fb3c 100644 --- a/samples/iceberg_cortex/environment.yml +++ b/samples/iceberg_cortex/environment.yml @@ -7,4 +7,5 @@ dependencies: - pyspark=3.5.0 - openjdk=11.0.13 - python=3.11.6 + - python-dotenv - snowflake-snowpark-python From bc924a5f9b33bf46f2c8fb87e372d1c11f97eae3 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:21:18 -0700 Subject: [PATCH 04/24] Update spark.ipynb --- samples/iceberg_cortex/spark.ipynb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb index a756016..2824ebf 100644 --- a/samples/iceberg_cortex/spark.ipynb +++ b/samples/iceberg_cortex/spark.ipynb @@ -19,10 +19,10 @@ "# 1. Install Conda\n", "# 2. Create an environtment by running this from your command line\n", "# conda env create -f environment.yml\n", - "# 3. Make sure you're using the 3.11.6 iceberg-demo Python kernel\n", + "# 3. Make sure you're using the 3.11.6 iceberg-cortex-demo Python kernel\n", "# 4. Export environment variables from spark_env_variables.txt\n", "# 5. 
Activate the environment and open jupyter notebooks by running this from your command line\n", - "# conda activate iceberg-lab\n", + "# conda activate iceberg-cortex-demo\n", "# jupyter notebook" ] }, @@ -255,7 +255,7 @@ "source": [ "# When complete, you can deactivate the environment and remove it by running this from command line\n", "# conda deactivate\n", - "# conda remove -n iceberg-demo --all" + "# conda remove -n iceberg-cortex-demo --all" ] } ], From 10b22ccb1d7c99dd8936e39dddf864cf52f6df29 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:22:30 -0700 Subject: [PATCH 05/24] Update conda.sh --- samples/iceberg_cortex/conda.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/iceberg_cortex/conda.sh b/samples/iceberg_cortex/conda.sh index cce4a6a..57e7a8f 100644 --- a/samples/iceberg_cortex/conda.sh +++ b/samples/iceberg_cortex/conda.sh @@ -1,7 +1,7 @@ -conda activate iceberg-demo +conda activate iceberg-cortex-demo jupyter notebook conda deactivate -conda remove -n iceberg-demo --all \ No newline at end of file +conda remove -n iceberg-cortex-demo --all From 9077839253fc1a58b021629fe75884f57a4c6779 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:22:51 -0700 Subject: [PATCH 06/24] Update environment.yml --- samples/iceberg_cortex/environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/iceberg_cortex/environment.yml b/samples/iceberg_cortex/environment.yml index afada1f..3cf5093 100644 --- a/samples/iceberg_cortex/environment.yml +++ b/samples/iceberg_cortex/environment.yml @@ -1,4 +1,4 @@ -name: iceberg-demo +name: iceberg-cortex-demo channels: - conda-forge dependencies: From e32b5084655b1494e28a764566fd8e9e672f67da Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:23:43 -0700 Subject: [PATCH 07/24] Update and rename rename_this_file_to_dot_env to .env --- samples/iceberg_cortex/{rename_this_file_to_dot_env => .env} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename samples/iceberg_cortex/{rename_this_file_to_dot_env => .env} (86%) diff --git a/samples/iceberg_cortex/rename_this_file_to_dot_env b/samples/iceberg_cortex/.env similarity index 86% rename from samples/iceberg_cortex/rename_this_file_to_dot_env rename to samples/iceberg_cortex/.env index 7381202..a6392db 100644 --- a/samples/iceberg_cortex/rename_this_file_to_dot_env +++ b/samples/iceberg_cortex/.env @@ -8,8 +8,8 @@ PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake: AWS_REGION="us-west-2" -# Get these from AWS SSO in your Okta +# If using Okta, get these from AWS SSO in your Okta AWS_ACCESS_KEY_ID="" AWS_SECRET_ACCESS_KEY="" -AWS_SESSION_TOKEN="" \ No newline at end of file +AWS_SESSION_TOKEN="" From 0437dc409b0310e9dd4e1cdfc1b9e9dcac2dccb7 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:24:24 -0700 Subject: [PATCH 08/24] Update spark.ipynb --- samples/iceberg_cortex/spark.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb index 2824ebf..3be8e04 100644 --- a/samples/iceberg_cortex/spark.ipynb +++ b/samples/iceberg_cortex/spark.ipynb @@ -83,7 +83,7 @@ "outputs": [], "source": [ "# Create SparkSession, for AWS\n", - "spark = SparkSession.builder.appName('iceberg_lab')\\\n", + "spark = SparkSession.builder.appName('iceberg_cortex_demo')\\\n", " .config('spark.jars.packages', os.environ['PACKAGES'])\\\n", " .config('spark.sql.extensions', 
'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')\\\n", " .config(\"spark.driver.host\",\"127.0.0.1\")\\\n", From 841951df4aded1b778d84b054bd59ededb18cc0a Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:25:01 -0700 Subject: [PATCH 09/24] Update spark.ipynb --- samples/iceberg_cortex/spark.ipynb | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb index 3be8e04..81e8ecb 100644 --- a/samples/iceberg_cortex/spark.ipynb +++ b/samples/iceberg_cortex/spark.ipynb @@ -68,7 +68,6 @@ "outputs": [], "source": [ "# Load the environment variables from the .env file\n", - "# Before running this cell, rename the file \"rename_this_file_to_dot_env\" in this folder to \".env\"\n", "\n", "load_dotenv()\n", "print(f'AWS REGION: {os.getenv('AWS_REGION')}')\n", From f88781d304ce1d1a44bcc4e4dd48f62eaa9324ac Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:52:12 -0700 Subject: [PATCH 10/24] Update demo.sql --- samples/iceberg_cortex/demo.sql | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/samples/iceberg_cortex/demo.sql b/samples/iceberg_cortex/demo.sql index 552e321..eb22025 100644 --- a/samples/iceberg_cortex/demo.sql +++ b/samples/iceberg_cortex/demo.sql @@ -50,15 +50,10 @@ CREATE OR REPLACE TASK cortex_sentiment_score SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles' USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' AS -UPDATE demo.public.product_reviews pr -SET sentiment = stream_sentiment -FROM ( - SELECT - id, - snowflake.cortex.sentiment(review) AS stream_sentiment - FROM demo.public.product_reviews_stream -) s -WHERE pr.id = s.id; +UPDATE demo.public.product_reviews AS pr + SET sentiment = snowflake.cortex.sentiment(prs.review) + FROM demo.public.product_reviews_stream AS prs + WHERE prs.id = pr.id; -- Use UI to create a reader account -- Use UI to create a share with reader account and add both secure views to share From abb08e4b460449edf2ee4486a2a364826e2f7545 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:53:02 -0700 Subject: [PATCH 11/24] Update demo.sql --- samples/iceberg_cortex/demo.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/iceberg_cortex/demo.sql b/samples/iceberg_cortex/demo.sql index eb22025..253a935 100644 --- a/samples/iceberg_cortex/demo.sql +++ b/samples/iceberg_cortex/demo.sql @@ -46,7 +46,7 @@ CREATE OR REPLACE STAGE demo.public.files CREATE STREAM product_reviews_stream ON TABLE product_reviews; -- Create task to process new records with Cortex sentiment LLM function -CREATE OR REPLACE TASK cortex_sentiment_score +CREATE OR REPLACE TASK demo.public.cortex_sentiment_score SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles' USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' AS From ba2e9110d8d329efec1fb7758d0e4217c26c76ce Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 13:54:11 -0700 Subject: [PATCH 12/24] Update environment.yml --- samples/iceberg_cortex/environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/iceberg_cortex/environment.yml b/samples/iceberg_cortex/environment.yml index 3cf5093..8e31afd 100644 --- a/samples/iceberg_cortex/environment.yml +++ b/samples/iceberg_cortex/environment.yml @@ -7,4 +7,5 @@ dependencies: - pyspark=3.5.0 - openjdk=11.0.13 - python=3.11.6 + - python-dotenv - snowflake-snowpark-python From aacd4c0cf37f6d81bdb1071caf9eb356c8bb6224 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 
15:34:15 -0700 Subject: [PATCH 13/24] Add files via upload --- samples/iceberg_cortex/snowflake.ipynb | 250 +++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 samples/iceberg_cortex/snowflake.ipynb diff --git a/samples/iceberg_cortex/snowflake.ipynb b/samples/iceberg_cortex/snowflake.ipynb new file mode 100644 index 0000000..9d6bc59 --- /dev/null +++ b/samples/iceberg_cortex/snowflake.ipynb @@ -0,0 +1,250 @@ +{ + "metadata": { + "kernelspec": { + "display_name": "Streamlit Notebook", + "name": "streamlit" + } + }, + "nbformat_minor": 5, + "nbformat": 4, + "cells": [ + { + "cell_type": "markdown", + "id": "67b56ef8-8cd2-440e-930d-15190fe5da6e", + "metadata": { + "name": "md_setup_ev", + "collapsed": false + }, + "source": "# Setup Snowflake\n## Create an External Volume\nTo create an external volume, complete the instructions for your cloud storage service:\n- [Accessing Amazon S3 using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-s3)\n- [Accessing Microsoft Azure Storage using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-azure)\n\nRemember from the Overview section, your Snowflake account must be in the same region as your external volume location. And to use the Sentiment LLM function, [supported regions](https://docs.snowflake.com/en/user-guide/snowflake-cortex/llm-functions#availability) currently include:\n- AWS US West 2 (Oregon)\n- AWS US East 1 (N. Virginia)\n- AWS Europe Central 1 (Frankfurt)\n- Azure East US 2 (Virginia)\n- Azure West Europe (Netherlands)" + }, + { + "cell_type": "code", + "id": "405c7e6d-3416-4c91-a474-8ebec71448e9", + "metadata": { + "language": "sql", + "name": "sql_create_ev", + "codeCollapsed": false + }, + "outputs": [], + "source": "-- Use accountadmin role to create an external volume\nUSE ROLE accountadmin;\n\n-- Create an external volume\nCREATE OR REPLACE EXTERNAL VOLUME iceberg_cortex_vol\n STORAGE_LOCATIONS =\n (\n (\n NAME = ''\n STORAGE_PROVIDER = ''\n STORAGE_BASE_URL = ''\n \n STORAGE_AWS_ROLE_ARN = ''\n STORAGE_AWS_EXTERNAL_ID = ''\n\n AZURE_TENANT_ID = ''\n )\n );", + "execution_count": null + }, + { + "cell_type": "markdown", + "id": "9f76cdb8-b8a9-4034-88ef-683cf1d54068", + "metadata": { + "name": "md_iceberg_catalogs", + "collapsed": false + }, + "source": "## Create an Iceberg Table\nIceberg Tables can currently use Snowflake, AWS Glue, or object storage as the catalog. In public preview soon, Snowflake can use catalog integration with an Iceberg REST endpoint. In this quickstart, use Snowflake as the catalog to allow read and write operations to the table. More information about integrating catalogs can be found [here](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration).\n\nCreate an Iceberg Table referencing the external volume you just created. You can specify `BASE_LOCATION` to instruct Snowflake where to write table data and metadata, or leave empty to write data and metadata to the location specified in the external volume definition." 
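For reference, a filled-in S3 version of the CREATE EXTERNAL VOLUME template above might look like the sketch below; the bucket, role ARN, and location name are purely hypothetical, so substitute your own values and keep only the parameters that apply to your cloud provider.

-- Hypothetical S3 example of the external volume template above (illustrative values only)
CREATE OR REPLACE EXTERNAL VOLUME iceberg_cortex_vol
  STORAGE_LOCATIONS =
  (
    (
      NAME = 'my-s3-us-west-2'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://my-iceberg-cortex-bucket/demo/'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/my-snowflake-iceberg-role'
    )
  );

-- Optionally confirm Snowflake can read from and write to the storage location
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('iceberg_cortex_vol');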
+ }, + { + "cell_type": "code", + "id": "8d50cbf4-0c8d-4950-86cb-114990437ac9", + "metadata": { + "language": "sql", + "name": "sql_create_iceberg_table", + "codeCollapsed": false + }, + "source": "CREATE OR REPLACE ICEBERG TABLE demo.public.product_reviews (\n id STRING,\n product_name STRING,\n product_id STRING,\n reviewer_name STRING,\n review_date DATE,\n review STRING,\n sentiment FLOAT\n)\n CATALOG = 'SNOWFLAKE'\n EXTERNAL_VOLUME = 'iceberg_cortex_vol'\n BASE_LOCATION = 'demo/product_reviews/'\n;", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "9b89570d-c063-44f2-9c34-2d8e229dbb9c", + "metadata": { + "name": "md_load", + "collapsed": false + }, + "source": "# Load CSV files into Iceberg via Snowpark Python\nThere are multiple ways to load new data into Snowflake-managed Iceberg Tables including INSERT, [COPY INTO](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table), and [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto).\n\nFor this quickstart, we will use Snowpark to write CSV files from dataframes into the Iceberg Table. Snowflake will write Parquet files and Iceberg metadata to your external volume.\n\nFirst, create an external stage and file format." + }, + { + "cell_type": "code", + "id": "d5639abc-f09b-4816-a9ed-d3d150970ea6", + "metadata": { + "language": "sql", + "name": "sql_file_setup", + "codeCollapsed": false + }, + "outputs": [], + "source": "-- Create a file format\nCREATE OR REPLACE FILE FORMAT demo.public.csv_ff\n TYPE = 'CSV'\n FIELD_OPTIONALLY_ENCLOSED_BY = '\"'\n SKIP_HEADER = 1;\n\n-- Create an external stage to read CSV files from an S3 bucket in-place\nCREATE OR REPLACE STAGE demo.public.files\n URL = 's3://sfquickstarts/iceberg_cortex/'\n FILE_FORMAT = demo.public.csv_ff\n DIRECTORY = (ENABLE = TRUE);", + "execution_count": null + }, + { + "cell_type": "code", + "id": "c695373e-ac74-4b62-a1f1-08206cbd5c81", + "metadata": { + "language": "python", + "name": "imports", + "codeCollapsed": false + }, + "source": "# Import necessary modules and create a session\nimport json|\nfrom snowflake.snowpark import Session\nimport snowflake.snowpark.types as T\n\nsession = get_active_session()", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "e97bfc44-4b44-4b55-8773-def2557abec9", + "metadata": { + "language": "python", + "name": "py_create_schema" + }, + "outputs": [], + "source": "# Create a schema Snowpark dataframe matching the CSV files\nreviews_schema = T.StructType([T.StructField(\"ID\", T.StringType()),\n T.StructField(\"PRODUCT_NAME\", T.StringType()),\n T.StructField(\"PRODUCT_ID\", T.StringType()),\n T.StructField(\"REVIEWER_NAME\", T.StringType()),\n T.StructField(\"REVIEW_DATE\", T.DateType()),\n T.StructField(\"REVIEW\", T.StringType()),\n T.StructField(\"SENTIMENT\", T.FloatType())])", + "execution_count": null + }, + { + "cell_type": "code", + "id": "f3ffe28e-db1e-4a91-af82-7d2f36350b66", + "metadata": { + "language": "python", + "name": "py_jan_df", + "codeCollapsed": false + }, + "outputs": [], + "source": "# Read the January product reviews into a dataframe\njan_df = session.read \\\n .schema(reviews_schema) \\\n .option(\"skip_header\", 1) \\\n .option(\"field_optionally_enclosed_by\", '\"') \\\n .csv(\"@demo.public.files/product_reviews_jan_24.csv\")\n\n# View the dataframe\njan_df.show()", + "execution_count": null + }, + { + "cell_type": "code", + "id": "c3d25722-6560-4716-b3e6-d558bd1415ed", + "metadata": { + "language": "python", + 
"name": "py_jan_df_write", + "codeCollapsed": false + }, + "outputs": [], + "source": "# Write the dataframe to the Iceberg Table\njan_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")", + "execution_count": null + }, + { + "cell_type": "markdown", + "id": "d22fa91f-31c3-48ba-bbf8-a889ab8ccbc0", + "metadata": { + "name": "md_load_complete", + "collapsed": false + }, + "source": "You now see metadata files and Parquet data files in your object storage, whether you’re using Amazon S3 or Azure storage.\n\n![iceberg_files](https://github.com/Snowflake-Labs/sfquickstarts/blob/master/site/sfguides/src/cortex_ai_sentiment_iceberg/assets/iceberg_files.png)" + }, + { + "cell_type": "markdown", + "id": "cfe9fb1f-5f0c-4547-9dca-66c5c0f6d1a1", + "metadata": { + "name": "md_cortex", + "collapsed": false + }, + "source": "# Snowflake Cortex LLM Functions\nNow you can query the Iceberg Table using LLM functions from Snowflake Cortex AI. Run the query below to calculate sentiment scores for product reviews." + }, + { + "cell_type": "code", + "id": "300f04d3-98c4-4697-976a-267b7fb7914c", + "metadata": { + "language": "sql", + "name": "sql_reviews_select", + "codeCollapsed": false + }, + "outputs": [], + "source": "SELECT\n product_name,\n review_date,\n snowflake.cortex.sentiment(review) as review_sentiment\nFROM demo.public.product_reviews", + "execution_count": null + }, + { + "cell_type": "code", + "id": "1fec430b-3171-45e4-9059-570f21b4f06f", + "metadata": { + "language": "sql", + "name": "sql_update", + "codeCollapsed": false + }, + "outputs": [], + "source": "-- Write the sentiment scores back to the Iceberg Table.\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = jan.review_sentiment\n FROM {{sql_reviews_jan}} AS jan\n WHERE jan.id = pr.id;", + "execution_count": null + }, + { + "cell_type": "markdown", + "id": "e131af80-db4c-4de4-8fef-3ba4f9ee163c", + "metadata": { + "name": "md_create_pipeline", + "collapsed": false + }, + "source": "# Create a CDC Pipeline\nSuppose new product reviews continue to be generated, stored as new CSV files, and you'd like to use Snowflake to automatically compute sentiment scores on new product reviews.\n\n[Streams on Directory Tables](https://docs.snowflake.com/en/user-guide/data-load-dirtables-pipeline) can detect new files in stages, perform computation, and store results. LLM functions from Snowflake Cortex can be called in these pipelines, writing results to Iceberg Tables.\n\nTo simulate this, create a Stream on the Iceberg Table to detect new product reviews loaded to the table. On a schedule, a Serverless Task will call the SENTIMENT function on to incrementally process new records." 
+ }, + { + "cell_type": "code", + "id": "3090e499-5e0e-4bbf-99db-3103314d5582", + "metadata": { + "language": "sql", + "name": "sql_create_pipeline", + "codeCollapsed": false + }, + "outputs": [], + "source": "-- Create a Stream to detect new product review records in the Iceberg Table\nCREATE STREAM demo.public.product_reviews_stream ON TABLE demo.public.product_reviews;\n\n-- Create a Serverless Task to add sentiment for new records from the Stream\nCREATE OR REPLACE TASK demo.public.cortex_sentiment_score\n SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'\n USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'\nAS\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = snowflake.cortex.sentiment(prs.review)\n FROM demo.public.product_reviews_stream AS prs\n WHERE prs.id = pr.id;", + "execution_count": null + }, + { + "cell_type": "markdown", + "id": "8080fa4d-3b6c-469e-8e0e-f502e707f053", + "metadata": { + "name": "md_feb_df", + "collapsed": false + }, + "source": "Now see the incremental processing pipeline in action. Create a dataframe for February product reviews and write it to the Iceberg Table." + }, + { + "cell_type": "code", + "id": "d0eb3b62-6823-4292-a4b8-d84309c98bc5", + "metadata": { + "language": "python", + "name": "py_feb_df", + "codeCollapsed": false + }, + "outputs": [], + "source": "feb_df = session.read \\\n .schema(reviews_schema) \\\n .option(\"skip_header\", 1) \\\n .option(\"field_optionally_enclosed_by\", '\"') \\\n .csv(\"@demo.public.files/product_reviews_feb_24.csv\")\n\nfeb_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")", + "execution_count": null + }, + { + "cell_type": "markdown", + "id": "04f10f72-e47c-4662-abbb-a880b37d3bf4", + "metadata": { + "name": "md_reviews", + "collapsed": false + }, + "source": "The Task will execute on the specified schedule. Manually trigger the task to calculate sentiment scores for February product reviews, writing the results back to the Iceberg Table. Now, you should see the February product reviews and sentiment scores.\n\nFor example, for each product, what was the change in sentiment from January to February? Run the query below." 
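Before or after triggering the task in the next cell, a few optional checks can confirm the pipeline is wired up; this sketch assumes the stream and task names created above, and note that tasks are created suspended, so resuming is only needed if you want the CRON schedule to take effect.

-- Does the stream currently hold unprocessed rows?
SELECT SYSTEM$STREAM_HAS_DATA('DEMO.PUBLIC.PRODUCT_REVIEWS_STREAM');

-- Optional: resume the task so it also runs on its CRON schedule
ALTER TASK demo.public.cortex_sentiment_score RESUME;

-- Review recent task runs
SELECT name, state, scheduled_time, completed_time
FROM TABLE(demo.information_schema.task_history())
ORDER BY scheduled_time DESC
LIMIT 10;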
+ }, + { + "cell_type": "code", + "id": "113cd2e4-335e-4d77-9667-89fc2ca452f1", + "metadata": { + "language": "sql", + "name": "run_task" + }, + "outputs": [], + "source": "-- Manually trigger Task\nEXECUTE task demo.public.cortex_sentiment_score;", + "execution_count": null + }, + { + "cell_type": "code", + "id": "cd58a206-4d87-47c6-a460-4c07235126c5", + "metadata": { + "language": "sql", + "name": "sql_reviews", + "codeCollapsed": false + }, + "outputs": [], + "source": "-- Sentiment change from January to February\nWITH jan AS (\n SELECT\n product_name,\n AVG(sentiment) AS avg_sentiment\n FROM demo.public.product_reviews\n WHERE MONTHNAME(review_date) = 'Jan'\n GROUP BY 1\n)\n, feb AS (\n SELECT\n product_name,\n AVG(sentiment) AS avg_sentiment\n FROM demo.public.product_reviews\n WHERE MONTHNAME(review_date) = 'Feb'\n GROUP BY 1\n)\nSELECT\n COALESCE(j.product_name, f.product_name) AS product_name,\n j.avg_sentiment AS jan_sentiment,\n f.avg_sentiment AS feb_sentiment,\n feb_sentiment - jan_sentiment AS sentiment_diff\nFROM jan j\nFULL OUTER JOIN feb f\n ON j.product_name = f.product_name\nORDER BY sentiment_diff DESC;", + "execution_count": null + }, + { + "cell_type": "code", + "id": "70e846d4-6377-4a67-a004-912086e0d8d3", + "metadata": { + "language": "python", + "name": "py_chart", + "codeCollapsed": false + }, + "outputs": [], + "source": "import streamlit as st\n\nst.bar_chart(sql_reviews.to_df(), x='SENTIMENT_DIFF', y='PRODUCT_NAME')", + "execution_count": null + } + ] +} \ No newline at end of file From cba3ded15e026d07f948ce9f16f162809ee65b6b Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 15:39:01 -0700 Subject: [PATCH 14/24] Update .env --- samples/iceberg_cortex/.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/iceberg_cortex/.env b/samples/iceberg_cortex/.env index a6392db..b830c3e 100644 --- a/samples/iceberg_cortex/.env +++ b/samples/iceberg_cortex/.env @@ -6,7 +6,7 @@ SNOWFLAKE_CATALOG_URI="jdbc:snowflake://.snowflakecomput PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160" -AWS_REGION="us-west-2" +AWS_REGION="" # If using Okta, get these from AWS SSO in your Okta From 88e35f21ec84079ecc3fc6bf49159407857c2271 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Mon, 10 Jun 2024 15:39:42 -0700 Subject: [PATCH 15/24] Update spark_env_variables.txt --- samples/iceberg_cortex/spark_env_variables.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/samples/iceberg_cortex/spark_env_variables.txt b/samples/iceberg_cortex/spark_env_variables.txt index 1509048..a415ef7 100644 --- a/samples/iceberg_cortex/spark_env_variables.txt +++ b/samples/iceberg_cortex/spark_env_variables.txt @@ -2,8 +2,7 @@ export SPARK_HOME=/Users//anaconda3/envs/iceberg-lab/lib/python3.11/si export SNOWFLAKE_CATALOG_URI=jdbc:snowflake://.snowflakecomputing.com export SNOWFLAKE_ROLE=ACCOUNTADMIN export SNOWFLAKE_USERNAME= -export PRIVATE_KEY_FILE=~//rsa_key.p8 export PACKAGES=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160 export AWS_REGION= export AWS_ACCESS_KEY_ID= -export AWS_SECRET_ACCESS_KEY= \ No newline at end of file +export AWS_SECRET_ACCESS_KEY= From dc505acd7f4cde7be36f68eae773a2db1fb6cf81 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 11 Jun 2024 
11:26:27 -0700 Subject: [PATCH 16/24] Update .env --- samples/iceberg_cortex/.env | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/samples/iceberg_cortex/.env b/samples/iceberg_cortex/.env index b830c3e..307639e 100644 --- a/samples/iceberg_cortex/.env +++ b/samples/iceberg_cortex/.env @@ -6,10 +6,9 @@ SNOWFLAKE_CATALOG_URI="jdbc:snowflake://.snowflakecomput PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160" +# If using Amazon S3 for storage, then include these variables: AWS_REGION="" - -# If using Okta, get these from AWS SSO in your Okta - AWS_ACCESS_KEY_ID="" AWS_SECRET_ACCESS_KEY="" +# If using MFA or SSO, provide the session token, otherwise delete this variable. AWS_SESSION_TOKEN="" From c48e45d6b6f395db0e4ee5f4bb6e72946943bdf9 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 11 Jun 2024 11:36:08 -0700 Subject: [PATCH 17/24] Update .env --- samples/iceberg_cortex/.env | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/samples/iceberg_cortex/.env b/samples/iceberg_cortex/.env index 307639e..2f2ba29 100644 --- a/samples/iceberg_cortex/.env +++ b/samples/iceberg_cortex/.env @@ -4,11 +4,14 @@ SNOWFLAKE_USERNAME="" SNOWFLAKE_PASSWORD="" SNOWFLAKE_CATALOG_URI="jdbc:snowflake://.snowflakecomputing.com" -PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160" - # If using Amazon S3 for storage, then include these variables: +PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160" AWS_REGION="" AWS_ACCESS_KEY_ID="" AWS_SECRET_ACCESS_KEY="" # If using MFA or SSO, provide the session token, otherwise delete this variable. AWS_SESSION_TOKEN="" + +# If using Azure for storage, then include these variables: +PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,com.microsoft.azure:azure-storage:8.6.6,org.apache.hadoop:hadoop-azure:3.3.6" +AZURE_ACCESS_KEY="" From dd9d021541c25387b5054c5a076f6a6fcfc47341 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 11 Jun 2024 11:37:00 -0700 Subject: [PATCH 18/24] Delete samples/iceberg_cortex/spark.ipynb --- samples/iceberg_cortex/spark.ipynb | 282 ----------------------------- 1 file changed, 282 deletions(-) delete mode 100644 samples/iceberg_cortex/spark.ipynb diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb deleted file mode 100644 index 81e8ecb..0000000 --- a/samples/iceberg_cortex/spark.ipynb +++ /dev/null @@ -1,282 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "a1680d4b-49a6-4c8e-9a63-dba8c0d920a2", - "metadata": {}, - "source": [ - "# Locate Spark in Virtual Environment" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "37df6598-a793-4ec1-b5bb-8bd1fc7aaec7", - "metadata": {}, - "outputs": [], - "source": [ - "# Prior to executing this code below:\n", - "# 1. Install Conda\n", - "# 2. Create an environtment by running this from your command line\n", - "# conda env create -f environment.yml\n", - "# 3. Make sure you're using the 3.11.6 iceberg-cortex-demo Python kernel\n", - "# 4. Export environment variables from spark_env_variables.txt\n", - "# 5. 
Activate the environment and open jupyter notebooks by running this from your command line\n", - "# conda activate iceberg-cortex-demo\n", - "# jupyter notebook" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "fa2dd9f6-be83-47d2-836c-a077787a3aff", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "from dotenv import load_dotenv\n", - "import findspark\n", - "findspark.init()\n", - "findspark.find()" - ] - }, - { - "cell_type": "markdown", - "id": "7ce62921-cc0d-41aa-b3d5-04f640f88f0a", - "metadata": {}, - "source": [ - "# Run Spark " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "fc30a594-eaec-47e5-9b37-1420936ee943", - "metadata": {}, - "outputs": [], - "source": [ - "import pyspark\n", - "from pyspark.sql import SparkSession\n", - "from pyspark.sql import functions as F" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bd604d50", - "metadata": {}, - "outputs": [], - "source": [ - "# Load the environment variables from the .env file\n", - "\n", - "load_dotenv()\n", - "print(f'AWS REGION: {os.getenv('AWS_REGION')}')\n", - "print(f'PACKAGES: {os.getenv('PACKAGES')}')\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "20a99ce0-e4a4-4e52-89f3-cb6e3e4665f3", - "metadata": {}, - "outputs": [], - "source": [ - "# Create SparkSession, for AWS\n", - "spark = SparkSession.builder.appName('iceberg_cortex_demo')\\\n", - " .config('spark.jars.packages', os.environ['PACKAGES'])\\\n", - " .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')\\\n", - " .config(\"spark.driver.host\",\"127.0.0.1\")\\\n", - " .config(\"spark.driver.bindAddress\",\"127.0.0.1\")\\\n", - " .getOrCreate()" - ] - }, - { - "cell_type": "markdown", - "id": "8c160774-3d56-4e2f-9102-7b745da4a12f", - "metadata": {}, - "source": [ - "### Spark configurations\n", - "Set the following configurations for Spark" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1ad480e1-8a79-4d81-a5ae-9f6401d5545d", - "metadata": {}, - "outputs": [], - "source": [ - "spark.conf.set(\"spark.sql.defaultCatalog\", \"snowflake_catalog\")\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog\", \"org.apache.iceberg.spark.SparkCatalog\")\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.catalog-impl\", \"org.apache.iceberg.snowflake.SnowflakeCatalog\")\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.uri\", os.environ['SNOWFLAKE_CATALOG_URI'])\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.jdbc.role\", os.environ['SNOWFLAKE_ROLE'])\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.jdbc.user\", os.environ['SNOWFLAKE_USERNAME'])\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.jdbc.password\", os.environ['SNOWFLAKE_PASSWORD'])\n", - "spark.conf.set(\"spark.sql.iceberg.vectorization.enabled\", \"false\")\n", - "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\")\n", - "spark.conf.set(\"spark.hadoop.fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", - "spark.conf.set(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider\")\n", - "spark.conf.set(\"spark.hadoop.fs.s3a.access.key\", os.environ['AWS_ACCESS_KEY_ID'])\n", - "spark.conf.set(\"spark.hadoop.fs.s3a.secret.key\", os.environ['AWS_SECRET_ACCESS_KEY'])\n", - "spark.conf.set(\"spark.hadoop.fs.s3a.endpoint\", \"s3.amazonaws.com\")\n", - 
"spark.conf.set(\"spark.hadoop.fs.s3a.endpoint.region\", os.environ['AWS_REGION'])" - ] - }, - { - "cell_type": "markdown", - "id": "c5304667-f991-4004-8385-0a948f1bc007", - "metadata": {}, - "source": [ - "# Read Snowflake-managed Iceberg Tables" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9f02e0a2-eacc-4627-8016-46b45613c3fd", - "metadata": {}, - "outputs": [], - "source": [ - "spark.sql(\"USE DEMO.PUBLIC\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "adf6b944-ca4f-4c08-9d0c-56931d951875", - "metadata": {}, - "outputs": [], - "source": [ - "df_product_reviews = spark.table(\"demo.public.product_reviews\")\n", - "df_product_reviews.show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "2cd1bc70", - "metadata": {}, - "outputs": [], - "source": [ - "df_reviews_per_day = df_product_reviews.groupBy(\"review_date\") \\\n", - " .agg(F.countDistinct(\"id\") \\\n", - " .alias(\"num_reviews\"))\n", - "df_reviews_per_day.orderBy(\"review_date\", ascenting=False).show(truncate=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "193fb82b", - "metadata": {}, - "outputs": [], - "source": [ - "df_product_sentiment = df_product_reviews.groupBy(\"product_name\") \\\n", - " .agg(F.avg(\"sentiment\") \\\n", - " .alias(\"avg_sentiment\"))\n", - "df_product_sentiment.orderBy(\"avg_sentiment\", ascending=False).show(truncate=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "10901aca", - "metadata": {}, - "outputs": [], - "source": [ - "df_product_reviews = spark.table(\"demo.public.product_reviews\")\n", - "df_product_reviews.createOrReplaceTempView(\"product_reviews\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "772ceaeb", - "metadata": {}, - "outputs": [], - "source": [ - "jan_df = spark.sql(\"\"\"\n", - " SELECT\n", - " product_name,\n", - " avg(sentiment) as avg_sentiment\n", - " FROM product_reviews\n", - " WHERE MONTH(review_date) = 1\n", - " GROUP BY product_name\n", - " ORDER BY avg_sentiment DESC\n", - "\"\"\")\n", - "jan_df.show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "14bdbf1c", - "metadata": {}, - "outputs": [], - "source": [ - "feb_df = spark.sql(\"\"\"\n", - " SELECT\n", - " product_name,\n", - " avg(sentiment) as avg_sentiment\n", - " FROM product_reviews\n", - " WHERE MONTH(review_date) = 2\n", - " GROUP BY product_name\n", - " ORDER BY avg_sentiment DESC\n", - "\"\"\")\n", - "feb_df.show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d64f0650", - "metadata": {}, - "outputs": [], - "source": [ - "result_df = jan_df.alias(\"jan\").join(feb_df.alias(\"feb\"), jan_df.product_name == feb_df.product_name, how=\"full_outer\") \\\n", - " .select(\n", - " F.coalesce(F.col(\"jan.product_name\"), F.col(\"feb.product_name\")).alias(\"product_name\"),\n", - " jan_df.avg_sentiment.alias(\"jan_sentiment\"),\n", - " feb_df.avg_sentiment.alias(\"feb_sentiment\")\n", - " ) \\\n", - " .withColumn(\"sentiment_diff\", F.col(\"feb_sentiment\") - F.col(\"jan_sentiment\")) \\\n", - " .orderBy(\"sentiment_diff\", ascending=False)\n", - "result_df.show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4ee3f3f0", - "metadata": {}, - "outputs": [], - "source": [ - "# When complete, you can deactivate the environment and remove it by running this from command line\n", - "# conda deactivate\n", - "# conda remove -n iceberg-cortex-demo --all" - ] - } - ], - "metadata": { - 
"kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.6" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} From e6a098e37fb696b057c67cd216086f0704ca4136 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 11 Jun 2024 11:37:26 -0700 Subject: [PATCH 19/24] Add files via upload --- samples/iceberg_cortex/spark.ipynb | 335 +++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 samples/iceberg_cortex/spark.ipynb diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb new file mode 100644 index 0000000..ad73a55 --- /dev/null +++ b/samples/iceberg_cortex/spark.ipynb @@ -0,0 +1,335 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a1680d4b-49a6-4c8e-9a63-dba8c0d920a2", + "metadata": {}, + "source": [ + "# Locate Spark in Virtual Environment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "37df6598-a793-4ec1-b5bb-8bd1fc7aaec7", + "metadata": {}, + "outputs": [], + "source": [ + "# Prior to executing this code below:\n", + "# 1. Install Conda\n", + "# 2. Create an environtment by running this from your command line\n", + "# conda env create -f environment.yml\n", + "# 3. Make sure you're using the 3.11.6 iceberg-cortex-demo Python kernel\n", + "# 4. Activate the environment and open jupyter notebooks by running this from your command line\n", + "# conda activate iceberg-cortex-demo\n", + "# jupyter notebook" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fa2dd9f6-be83-47d2-836c-a077787a3aff", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from dotenv import load_dotenv\n", + "import findspark\n", + "findspark.init()\n", + "findspark.find()" + ] + }, + { + "cell_type": "markdown", + "id": "7ce62921-cc0d-41aa-b3d5-04f640f88f0a", + "metadata": {}, + "source": [ + "# Run Spark " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc30a594-eaec-47e5-9b37-1420936ee943", + "metadata": {}, + "outputs": [], + "source": [ + "import pyspark\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql import functions as F" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bd604d50", + "metadata": {}, + "outputs": [], + "source": [ + "# Load the environment variables from the .env file\n", + "\n", + "load_dotenv()\n", + "print(f\"AWS REGION: {os.getenv('AWS_REGION')}\")\n", + "print(f\"PACKAGES: {os.getenv('PACKAGES')}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20a99ce0-e4a4-4e52-89f3-cb6e3e4665f3", + "metadata": {}, + "outputs": [], + "source": [ + "# Create SparkSession, for AWS\n", + "spark = SparkSession.builder.appName('iceberg_cortex_demo')\\\n", + " .config('spark.jars.packages', os.environ['PACKAGES'])\\\n", + " .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')\\\n", + " .config(\"spark.driver.host\",\"127.0.0.1\")\\\n", + " .config(\"spark.driver.bindAddress\",\"127.0.0.1\")\\\n", + " .getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "id": "8c160774-3d56-4e2f-9102-7b745da4a12f", + "metadata": {}, + "source": [ + "### Spark configurations\n", + "Set the following configurations for Spark regardless of which cloud your Snowflake and 
storage accounts are." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1ad480e1-8a79-4d81-a5ae-9f6401d5545d", + "metadata": {}, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.sql.defaultCatalog\", \"snowflake_catalog\")\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog\", \"org.apache.iceberg.spark.SparkCatalog\")\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.catalog-impl\", \"org.apache.iceberg.snowflake.SnowflakeCatalog\")\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.uri\", os.environ['SNOWFLAKE_CATALOG_URI'])\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.jdbc.role\", os.environ['SNOWFLAKE_ROLE'])\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.jdbc.user\", os.environ['SNOWFLAKE_USERNAME'])\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.jdbc.password\", os.environ['SNOWFLAKE_PASSWORD'])\n", + "spark.conf.set(\"spark.sql.iceberg.vectorization.enabled\", \"false\")" + ] + }, + { + "cell_type": "markdown", + "id": "8da5bdba", + "metadata": {}, + "source": [ + "#### AWS configurations\n", + "If using AWS for this demo, apply these configurations." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f25e479f", + "metadata": {}, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\")\n", + "spark.conf.set(\"spark.hadoop.fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + "spark.conf.set(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider\")\n", + "# If you are using session credentials instead of simple name/secret credentials, use the credentials provider below instead of above.\n", + "#spark.conf.set(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + "spark.conf.set(\"spark.hadoop.fs.s3a.access.key\", os.environ['AWS_ACCESS_KEY_ID'])\n", + "spark.conf.set(\"spark.hadoop.fs.s3a.secret.key\", os.environ['AWS_SECRET_ACCESS_KEY'])\n", + "# If you are using session credentials instead of simple name/secret credentials, also set the configuration below\n", + "#spark.conf.set(\"spark.hadoop.fs.s3a.session.token\", os.environ['AWS_SESSION_TOKEN'])\n", + "spark.conf.set(\"spark.hadoop.fs.s3a.endpoint\", \"s3.amazonaws.com\")\n", + "spark.conf.set(\"spark.hadoop.fs.s3a.endpoint.region\", os.environ['AWS_REGION'])" + ] + }, + { + "cell_type": "markdown", + "id": "94b8f2f2", + "metadata": {}, + "source": [ + "#### Azure configurations\n", + "If using Microsoft Azure for this demo, apply these configurations." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "69652a2c", + "metadata": {}, + "outputs": [], + "source": [ + "#This is using a storage account and container with anonymous access enabled.\n", + "spark.conf.set(\"spark.sql.catalog.snowflake_catalog.io-impl\", \"org.apache.iceberg.hadoop.HadoopFileIO\")\n", + "spark.conf.set(\"spark.hadoop.fs.azure.account.key.snowflakeiceberg.blob.core.windows.net\", os.environ['AZURE_ACCESS_KEY'])\n", + "spark.conf.set(\"spark.hadoop.fs.azure.account.auth.type.snowflakeiceberg.blob.core.windows.net\", \"SharedKey\")" + ] + }, + { + "cell_type": "markdown", + "id": "c5304667-f991-4004-8385-0a948f1bc007", + "metadata": {}, + "source": [ + "# Read Snowflake-managed Iceberg Tables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f02e0a2-eacc-4627-8016-46b45613c3fd", + "metadata": {}, + "outputs": [], + "source": [ + "spark.sql(\"USE DEMO.PUBLIC\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3df2262d", + "metadata": {}, + "outputs": [], + "source": [ + "spark.sql(\"SHOW TABLES\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "adf6b944-ca4f-4c08-9d0c-56931d951875", + "metadata": {}, + "outputs": [], + "source": [ + "df_product_reviews = spark.table(\"demo.public.product_reviews\")\n", + "df_product_reviews.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2cd1bc70", + "metadata": {}, + "outputs": [], + "source": [ + "df_reviews_per_day = df_product_reviews.groupBy(\"review_date\") \\\n", + " .agg(F.countDistinct(\"id\") \\\n", + " .alias(\"num_reviews\"))\n", + "df_reviews_per_day.orderBy(\"review_date\", ascenting=False).show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "193fb82b", + "metadata": {}, + "outputs": [], + "source": [ + "df_product_sentiment = df_product_reviews.groupBy(\"product_name\") \\\n", + " .agg(F.avg(\"sentiment\") \\\n", + " .alias(\"avg_sentiment\"))\n", + "df_product_sentiment.orderBy(\"avg_sentiment\", ascending=False).show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10901aca", + "metadata": {}, + "outputs": [], + "source": [ + "df_product_reviews = spark.table(\"demo.public.product_reviews\")\n", + "df_product_reviews.createOrReplaceTempView(\"product_reviews\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "772ceaeb", + "metadata": {}, + "outputs": [], + "source": [ + "jan_df = spark.sql(\"\"\"\n", + " SELECT\n", + " product_name,\n", + " avg(sentiment) as avg_sentiment\n", + " FROM product_reviews\n", + " WHERE MONTH(review_date) = 1\n", + " GROUP BY product_name\n", + " ORDER BY avg_sentiment DESC\n", + "\"\"\")\n", + "jan_df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14bdbf1c", + "metadata": {}, + "outputs": [], + "source": [ + "feb_df = spark.sql(\"\"\"\n", + " SELECT\n", + " product_name,\n", + " avg(sentiment) as avg_sentiment\n", + " FROM product_reviews\n", + " WHERE MONTH(review_date) = 2\n", + " GROUP BY product_name\n", + " ORDER BY avg_sentiment DESC\n", + "\"\"\")\n", + "feb_df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d64f0650", + "metadata": {}, + "outputs": [], + "source": [ + "result_df = jan_df.alias(\"jan\").join(feb_df.alias(\"feb\"), jan_df.product_name == feb_df.product_name, how=\"full_outer\") \\\n", + " .select(\n", + " F.coalesce(F.col(\"jan.product_name\"), 
F.col(\"feb.product_name\")).alias(\"product_name\"),\n", + " jan_df.avg_sentiment.alias(\"jan_sentiment\"),\n", + " feb_df.avg_sentiment.alias(\"feb_sentiment\")\n", + " ) \\\n", + " .withColumn(\"sentiment_diff\", F.col(\"feb_sentiment\") - F.col(\"jan_sentiment\")) \\\n", + " .orderBy(\"sentiment_diff\", ascending=False)\n", + "result_df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ee3f3f0", + "metadata": {}, + "outputs": [], + "source": [ + "# When complete, you can deactivate the environment and remove it by running this from command line\n", + "# conda deactivate\n", + "# conda remove -n iceberg-cortex-demo --all" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 8992c6f046b2d73d7b98853a31de695def07e9c0 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 11 Jun 2024 11:38:49 -0700 Subject: [PATCH 20/24] Update spark.ipynb --- samples/iceberg_cortex/spark.ipynb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb index ad73a55..4505a85 100644 --- a/samples/iceberg_cortex/spark.ipynb +++ b/samples/iceberg_cortex/spark.ipynb @@ -67,10 +67,7 @@ "outputs": [], "source": [ "# Load the environment variables from the .env file\n", - "\n", - "load_dotenv()\n", - "print(f\"AWS REGION: {os.getenv('AWS_REGION')}\")\n", - "print(f\"PACKAGES: {os.getenv('PACKAGES')}\")\n" + "load_dotenv()\n" ] }, { From e4b4cdcfa75433f37c408b9f77fb16eca251c7a6 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Tue, 11 Jun 2024 11:52:59 -0700 Subject: [PATCH 21/24] Delete samples/iceberg_cortex/spark_env_variables.txt --- samples/iceberg_cortex/spark_env_variables.txt | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 samples/iceberg_cortex/spark_env_variables.txt diff --git a/samples/iceberg_cortex/spark_env_variables.txt b/samples/iceberg_cortex/spark_env_variables.txt deleted file mode 100644 index a415ef7..0000000 --- a/samples/iceberg_cortex/spark_env_variables.txt +++ /dev/null @@ -1,8 +0,0 @@ -export SPARK_HOME=/Users//anaconda3/envs/iceberg-lab/lib/python3.11/site-packages/pyspark -export SNOWFLAKE_CATALOG_URI=jdbc:snowflake://.snowflakecomputing.com -export SNOWFLAKE_ROLE=ACCOUNTADMIN -export SNOWFLAKE_USERNAME= -export PACKAGES=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160 -export AWS_REGION= -export AWS_ACCESS_KEY_ID= -export AWS_SECRET_ACCESS_KEY= From c7b1def439ecabdae55f223bff21cfe2da606e74 Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Wed, 10 Jul 2024 13:06:10 -0700 Subject: [PATCH 22/24] Update snowflake.ipynb --- samples/iceberg_cortex/snowflake.ipynb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/iceberg_cortex/snowflake.ipynb b/samples/iceberg_cortex/snowflake.ipynb index 9d6bc59..3a872bb 100644 --- a/samples/iceberg_cortex/snowflake.ipynb +++ b/samples/iceberg_cortex/snowflake.ipynb @@ -243,8 +243,8 @@ "codeCollapsed": false }, "outputs": [], - "source": "import streamlit as 
st\n\nst.bar_chart(sql_reviews.to_df(), x='SENTIMENT_DIFF', y='PRODUCT_NAME')", + "source": "import streamlit as st\n\nst.bar_chart(sql_reviews.to_df(), x='PRODUCT_NAME', y='SENTIMENT_DIFF')", "execution_count": null } ] -} \ No newline at end of file +} From ebef3ad687a7ac6117334509915484ffbca286bb Mon Sep 17 00:00:00 2001 From: Scott Teal Date: Wed, 10 Jul 2024 13:07:14 -0700 Subject: [PATCH 23/24] Update snowflake.ipynb --- samples/iceberg_cortex/snowflake.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/iceberg_cortex/snowflake.ipynb b/samples/iceberg_cortex/snowflake.ipynb index 3a872bb..5bc89e3 100644 --- a/samples/iceberg_cortex/snowflake.ipynb +++ b/samples/iceberg_cortex/snowflake.ipynb @@ -145,7 +145,7 @@ "codeCollapsed": false }, "outputs": [], - "source": "SELECT\n product_name,\n review_date,\n snowflake.cortex.sentiment(review) as review_sentiment\nFROM demo.public.product_reviews", + "source": "SELECT\n id,\n product_name,\n review_date,\n snowflake.cortex.sentiment(review) as review_sentiment\nFROM demo.public.product_reviews", "execution_count": null }, { From 59ab18ef432855097c7618c95ed778cab8eeb818 Mon Sep 17 00:00:00 2001 From: scottteal Date: Tue, 19 Nov 2024 11:55:36 -0800 Subject: [PATCH 24/24] updates for solution center audit --- samples/iceberg_cortex/.env | 2 +- samples/iceberg_cortex/auth.json | 5 - samples/iceberg_cortex/demo.sql | 96 --------- samples/iceberg_cortex/snowflake.ipynb | 103 +++++----- samples/iceberg_cortex/snowpark.ipynb | 189 ------------------ samples/iceberg_cortex/spark.ipynb | 2 +- .../iceberg_cortex/spark_env_variables.txt | 8 - 7 files changed, 58 insertions(+), 347 deletions(-) delete mode 100644 samples/iceberg_cortex/auth.json delete mode 100644 samples/iceberg_cortex/demo.sql delete mode 100644 samples/iceberg_cortex/snowpark.ipynb delete mode 100644 samples/iceberg_cortex/spark_env_variables.txt diff --git a/samples/iceberg_cortex/.env b/samples/iceberg_cortex/.env index 2678eb3..9095920 100644 --- a/samples/iceberg_cortex/.env +++ b/samples/iceberg_cortex/.env @@ -1,8 +1,8 @@ -SNOWFLAKE_ACCOUNT="" SNOWFLAKE_ROLE="ACCOUNTADMIN" SNOWFLAKE_USERNAME="" SNOWFLAKE_PASSWORD="" SNOWFLAKE_CATALOG_URI="jdbc:snowflake://.snowflakecomputing.com" +# If using MFA for your Snowflake user, add these parameters to the URI ?user=&passcode=" # If using Amazon S3 for storage, then include these variables: PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160" diff --git a/samples/iceberg_cortex/auth.json b/samples/iceberg_cortex/auth.json deleted file mode 100644 index 86e89ef..0000000 --- a/samples/iceberg_cortex/auth.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "username": "", - "password": "", - "account": "" -} \ No newline at end of file diff --git a/samples/iceberg_cortex/demo.sql b/samples/iceberg_cortex/demo.sql deleted file mode 100644 index 253a935..0000000 --- a/samples/iceberg_cortex/demo.sql +++ /dev/null @@ -1,96 +0,0 @@ --- Create a separate database and warehouse for demo -CREATE OR REPLACE DATABASE demo; -CREATE OR REPLACE WAREHOUSE demo_wh; -USE DATABASE demo; -USE WAREHOUSE demo_wh; - --- External Volume named s3_vol created in advance -CREATE OR REPLACE EXTERNAL VOLUME s3_vol - STORAGE_LOCATIONS = - ( - ( - NAME = '' - STORAGE_PROVIDER = 'S3' - STORAGE_BASE_URL = 's3:///' - STORAGE_AWS_ROLE_ARN = 'arn:aws:iam:::role/' - ) - ); - --- Create a Snowflake-managed Iceberg 
table that writes to Amazon S3 -CREATE OR REPLACE ICEBERG TABLE demo.public.product_reviews ( - id STRING, - product_name STRING, - product_id STRING, - reviewer_name STRING, - review_date DATE, - review STRING -) - CATALOG = 'SNOWFLAKE' - EXTERNAL_VOLUME = 's3_vol' - BASE_LOCATION = 'demo/product_reviews/' -; -SELECT * FROM demo.public.product_reviews LIMIT 10; - --- Create a file format to specify how CSVs should be parsed -CREATE OR REPLACE FILE FORMAT csv_ff - TYPE = 'CSV' - FIELD_OPTIONALLY_ENCLOSED_BY = '"' - SKIP_HEADER = 1; - --- Create a stage to store the CSV files -CREATE OR REPLACE STAGE demo.public.files - FILE_FORMAT = csv_ff - DIRECTORY = (ENABLE = TRUE); - --- Create a stream to capture new records in the Iceberg table -CREATE STREAM product_reviews_stream ON TABLE product_reviews; - --- Create task to process new records with Cortex sentiment LLM function -CREATE OR REPLACE TASK demo.public.cortex_sentiment_score - SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles' - USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' -AS -UPDATE demo.public.product_reviews AS pr - SET sentiment = snowflake.cortex.sentiment(prs.review) - FROM demo.public.product_reviews_stream AS prs - WHERE prs.id = pr.id; - --- Use UI to create a reader account --- Use UI to create a share with reader account and add both secure views to share - --- Query --- For each product, what was the change in sentiment from January to February? -WITH jan AS ( - SELECT - product_name, - AVG(sentiment) AS avg_sentiment - FROM demo.public.product_reviews - WHERE MONTHNAME(review_date) = 'Jan' - GROUP BY 1 -) -, feb AS ( - SELECT - product_name, - AVG(sentiment) AS avg_sentiment - FROM demo.public.product_reviews - WHERE MONTHNAME(review_date) = 'Feb' - GROUP BY 1 -) -SELECT - COALESCE(j.product_name, f.product_name) AS product_name, - j.avg_sentiment AS jan_sentiment, - f.avg_sentiment AS feb_sentiment, - feb_sentiment - jan_sentiment AS sentiment_diff -FROM jan j -FULL OUTER JOIN feb f - ON j.product_name = f.product_name -ORDER BY sentiment_diff DESC; - --- Reload -DELETE FROM demo.public.product_reviews - WHERE REVIEW_DATE >= DATE('2024-02-01'); - --- Cleanup -DROP DATABASE demo; -DROP WAREHOUSE demo_wh; -DROP EXTERNAL VOLUME s3_vol; diff --git a/samples/iceberg_cortex/snowflake.ipynb b/samples/iceberg_cortex/snowflake.ipynb index 5bc89e3..9026299 100644 --- a/samples/iceberg_cortex/snowflake.ipynb +++ b/samples/iceberg_cortex/snowflake.ipynb @@ -13,7 +13,8 @@ "id": "67b56ef8-8cd2-440e-930d-15190fe5da6e", "metadata": { "name": "md_setup_ev", - "collapsed": false + "collapsed": false, + "resultHeight": 496 }, "source": "# Setup Snowflake\n## Create an External Volume\nTo create an external volume, complete the instructions for your cloud storage service:\n- [Accessing Amazon S3 using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-s3)\n- [Accessing Microsoft Azure Storage using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-azure)\n\nRemember from the Overview section, your Snowflake account must be in the same region as your external volume location. And to use the Sentiment LLM function, [supported regions](https://docs.snowflake.com/en/user-guide/snowflake-cortex/llm-functions#availability) currently include:\n- AWS US West 2 (Oregon)\n- AWS US East 1 (N. 
Virginia)\n- AWS Europe Central 1 (Frankfurt)\n- Azure East US 2 (Virginia)\n- Azure West Europe (Netherlands)" }, @@ -23,39 +24,20 @@ "metadata": { "language": "sql", "name": "sql_create_ev", - "codeCollapsed": false + "codeCollapsed": false, + "resultHeight": 0 }, "outputs": [], "source": "-- Use accountadmin role to create an external volume\nUSE ROLE accountadmin;\n\n-- Create an external volume\nCREATE OR REPLACE EXTERNAL VOLUME iceberg_cortex_vol\n STORAGE_LOCATIONS =\n (\n (\n NAME = ''\n STORAGE_PROVIDER = ''\n STORAGE_BASE_URL = ''\n \n STORAGE_AWS_ROLE_ARN = ''\n STORAGE_AWS_EXTERNAL_ID = ''\n\n AZURE_TENANT_ID = ''\n )\n );", "execution_count": null }, - { - "cell_type": "markdown", - "id": "9f76cdb8-b8a9-4034-88ef-683cf1d54068", - "metadata": { - "name": "md_iceberg_catalogs", - "collapsed": false - }, - "source": "## Create an Iceberg Table\nIceberg Tables can currently use Snowflake, AWS Glue, or object storage as the catalog. In public preview soon, Snowflake can use catalog integration with an Iceberg REST endpoint. In this quickstart, use Snowflake as the catalog to allow read and write operations to the table. More information about integrating catalogs can be found [here](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration).\n\nCreate an Iceberg Table referencing the external volume you just created. You can specify `BASE_LOCATION` to instruct Snowflake where to write table data and metadata, or leave empty to write data and metadata to the location specified in the external volume definition." - }, - { - "cell_type": "code", - "id": "8d50cbf4-0c8d-4950-86cb-114990437ac9", - "metadata": { - "language": "sql", - "name": "sql_create_iceberg_table", - "codeCollapsed": false - }, - "source": "CREATE OR REPLACE ICEBERG TABLE demo.public.product_reviews (\n id STRING,\n product_name STRING,\n product_id STRING,\n reviewer_name STRING,\n review_date DATE,\n review STRING,\n sentiment FLOAT\n)\n CATALOG = 'SNOWFLAKE'\n EXTERNAL_VOLUME = 'iceberg_cortex_vol'\n BASE_LOCATION = 'demo/product_reviews/'\n;", - "execution_count": null, - "outputs": [] - }, { "cell_type": "markdown", "id": "9b89570d-c063-44f2-9c34-2d8e229dbb9c", "metadata": { "name": "md_load", - "collapsed": false + "collapsed": false, + "resultHeight": 288 }, "source": "# Load CSV files into Iceberg via Snowpark Python\nThere are multiple ways to load new data into Snowflake-managed Iceberg Tables including INSERT, [COPY INTO](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table), and [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto).\n\nFor this quickstart, we will use Snowpark to write CSV files from dataframes into the Iceberg Table. Snowflake will write Parquet files and Iceberg metadata to your external volume.\n\nFirst, create an external stage and file format." 
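Once the stage below is created, a quick listing can confirm the demo CSV files are visible; an optional check, assuming the stage name used in this quickstart.

-- Optional: confirm the stage can see the demo CSV files
LIST @demo.public.files;

-- Or query the directory table, since DIRECTORY is enabled on the stage
SELECT relative_path, size, last_modified
FROM DIRECTORY(@demo.public.files);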
 },
@@ -65,7 +47,8 @@
 "metadata": {
 "language": "sql",
 "name": "sql_file_setup",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 0
 },
 "outputs": [],
 "source": "-- Create a file format\nCREATE OR REPLACE FILE FORMAT demo.public.csv_ff\n TYPE = 'CSV'\n FIELD_OPTIONALLY_ENCLOSED_BY = '\"'\n SKIP_HEADER = 1;\n\n-- Create an external stage to read CSV files from an S3 bucket in-place\nCREATE OR REPLACE STAGE demo.public.files\n URL = 's3://sfquickstarts/iceberg_cortex/'\n FILE_FORMAT = demo.public.csv_ff\n DIRECTORY = (ENABLE = TRUE);",
@@ -77,9 +60,10 @@
 "metadata": {
 "language": "python",
 "name": "imports",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 0
 },
- "source": "# Import necessary modules and create a session\nimport json|\nfrom snowflake.snowpark import Session\nimport snowflake.snowpark.types as T\n\nsession = get_active_session()",
+ "source": "# Import necessary modules and create a session\nimport json\nfrom snowflake.snowpark import Session\nimport snowflake.snowpark.types as T\n\nsession = get_active_session()",
 "execution_count": null,
 "outputs": []
 },
@@ -88,7 +72,8 @@
 "id": "e97bfc44-4b44-4b55-8773-def2557abec9",
 "metadata": {
 "language": "python",
- "name": "py_create_schema"
+ "name": "py_create_schema",
+ "resultHeight": 0
 },
 "outputs": [],
 "source": "# Create a schema Snowpark dataframe matching the CSV files\nreviews_schema = T.StructType([T.StructField(\"ID\", T.StringType()),\n T.StructField(\"PRODUCT_NAME\", T.StringType()),\n T.StructField(\"PRODUCT_ID\", T.StringType()),\n T.StructField(\"REVIEWER_NAME\", T.StringType()),\n T.StructField(\"REVIEW_DATE\", T.DateType()),\n T.StructField(\"REVIEW\", T.StringType()),\n T.StructField(\"SENTIMENT\", T.FloatType())])",
@@ -100,22 +85,34 @@
 "metadata": {
 "language": "python",
 "name": "py_jan_df",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 351
 },
 "outputs": [],
 "source": "# Read the January product reviews into a dataframe\njan_df = session.read \\\n .schema(reviews_schema) \\\n .option(\"skip_header\", 1) \\\n .option(\"field_optionally_enclosed_by\", '\"') \\\n .csv(\"@demo.public.files/product_reviews_jan_24.csv\")\n\n# View the dataframe\njan_df.show()",
 "execution_count": null
 },
+ {
+ "cell_type": "markdown",
+ "id": "23fa7d27-7ef7-4090-ab0d-36fd33247971",
+ "metadata": {
+ "name": "md_iceberg_catalogs",
+ "collapsed": false,
+ "resultHeight": 186
+ },
+ "source": "Iceberg Tables can currently use Snowflake, Iceberg REST catalogs, AWS Glue, or object storage as the catalog. In this quickstart, use Snowflake as the catalog to allow read and write operations to the table. More information about integrating catalogs can be found [here](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration).\n\nCreate an Iceberg Table referencing the external volume you just created. You can specify `BASE_LOCATION` to instruct Snowflake where to write table data and metadata, or leave empty to write data and metadata to the location specified in the external volume definition."
+ },
 {
 "cell_type": "code",
 "id": "c3d25722-6560-4716-b3e6-d558bd1415ed",
 "metadata": {
 "language": "python",
 "name": "py_jan_df_write",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 0
 },
 "outputs": [],
- "source": "# Write the dataframe to the Iceberg Table\njan_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")",
+ "source": "# Write the dataframe to the Iceberg Table\njan_df.write \\\n .mode(\"overwrite\") \\\n .save_as_table(\"demo.public.product_reviews\",\n iceberg_config={\n \"external_volume\":\"iceberg_cortex_vol\",\n \"catalog\":\"snowflake\",\n \"base_location\":\"demo/product_reviews/\"\n }\n )",
 "execution_count": null
 },
@@ -123,16 +120,18 @@
 "id": "d22fa91f-31c3-48ba-bbf8-a889ab8ccbc0",
 "metadata": {
 "name": "md_load_complete",
- "collapsed": false
+ "collapsed": false,
+ "resultHeight": 67
 },
- "source": "You now see metadata files and Parquet data files in your object storage, whether you’re using Amazon S3 or Azure storage.\n\n![iceberg_files](https://github.com/Snowflake-Labs/sfquickstarts/blob/master/site/sfguides/src/cortex_ai_sentiment_iceberg/assets/iceberg_files.png)"
+ "source": "You now see metadata files and Parquet data files in your object storage, whether you’re using Amazon S3 or Azure storage."
 },
 {
 "cell_type": "markdown",
 "id": "cfe9fb1f-5f0c-4547-9dca-66c5c0f6d1a1",
 "metadata": {
 "name": "md_cortex",
- "collapsed": false
+ "collapsed": false,
+ "resultHeight": 141
 },
 "source": "# Snowflake Cortex LLM Functions\nNow you can query the Iceberg Table using LLM functions from Snowflake Cortex AI. Run the query below to calculate sentiment scores for product reviews."
 },
@@ -141,8 +140,9 @@
 "id": "300f04d3-98c4-4697-976a-267b7fb7914c",
 "metadata": {
 "language": "sql",
- "name": "sql_reviews_select",
- "codeCollapsed": false
+ "name": "sql_reviews_jan",
+ "codeCollapsed": false,
+ "resultHeight": 439
 },
 "outputs": [],
 "source": "SELECT\n id,\n product_name,\n review_date,\n snowflake.cortex.sentiment(review) as review_sentiment\nFROM demo.public.product_reviews",
@@ -154,7 +154,8 @@
 "metadata": {
 "language": "sql",
 "name": "sql_update",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 112
 },
 "outputs": [],
 "source": "-- Write the sentiment scores back to the Iceberg Table.\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = jan.review_sentiment\n FROM {{sql_reviews_jan}} AS jan\n WHERE jan.id = pr.id;",
@@ -165,7 +166,8 @@
 "id": "e131af80-db4c-4de4-8fef-3ba4f9ee163c",
 "metadata": {
 "name": "md_create_pipeline",
- "collapsed": false
+ "collapsed": false,
+ "resultHeight": 327
 },
 "source": "# Create a CDC Pipeline\nSuppose new product reviews continue to be generated, stored as new CSV files, and you'd like to use Snowflake to automatically compute sentiment scores on new product reviews.\n\n[Streams on Directory Tables](https://docs.snowflake.com/en/user-guide/data-load-dirtables-pipeline) can detect new files in stages, perform computation, and store results. LLM functions from Snowflake Cortex can be called in these pipelines, writing results to Iceberg Tables.\n\nTo simulate this, create a Stream on the Iceberg Table to detect new product reviews loaded to the table. On a schedule, a Serverless Task will call the SENTIMENT function on to incrementally process new records."
 },
@@ -175,10 +177,11 @@
 "metadata": {
 "language": "sql",
 "name": "sql_create_pipeline",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 112
 },
 "outputs": [],
- "source": "-- Create a Stream to detect new product review records in the Iceberg Table\nCREATE STREAM demo.public.product_reviews_stream ON TABLE demo.public.product_reviews;\n\n-- Create a Serverless Task to add sentiment for new records from the Stream\nCREATE OR REPLACE TASK demo.public.cortex_sentiment_score\n SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'\n USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'\nAS\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = snowflake.cortex.sentiment(prs.review)\n FROM demo.public.product_reviews_stream AS prs\n WHERE prs.id = pr.id;",
+ "source": "-- Create a Stream to detect new product review records in the Iceberg Table\nCREATE OR REPLACE STREAM demo.public.product_reviews_stream ON TABLE demo.public.product_reviews;\n\n-- Create a Serverless Task to add sentiment for new records from the Stream\nCREATE OR REPLACE TASK demo.public.cortex_sentiment_score\n SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'\n USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'\nAS\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = snowflake.cortex.sentiment(prs.review)\n FROM demo.public.product_reviews_stream AS prs\n WHERE prs.id = pr.id;",
 "execution_count": null
 },
@@ -186,7 +189,8 @@
 "id": "8080fa4d-3b6c-469e-8e0e-f502e707f053",
 "metadata": {
 "name": "md_feb_df",
- "collapsed": false
+ "collapsed": false,
+ "resultHeight": 67
 },
 "source": "Now see the incremental processing pipeline in action. Create a dataframe for February product reviews and write it to the Iceberg Table."
 },
@@ -196,7 +200,8 @@
 "metadata": {
 "language": "python",
 "name": "py_feb_df",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 0
 },
 "outputs": [],
 "source": "feb_df = session.read \\\n .schema(reviews_schema) \\\n .option(\"skip_header\", 1) \\\n .option(\"field_optionally_enclosed_by\", '\"') \\\n .csv(\"@demo.public.files/product_reviews_feb_24.csv\")\n\nfeb_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")",
@@ -207,7 +212,8 @@
 "id": "04f10f72-e47c-4662-abbb-a880b37d3bf4",
 "metadata": {
 "name": "md_reviews",
- "collapsed": false
+ "collapsed": false,
+ "resultHeight": 159
 },
 "source": "The Task will execute on the specified schedule. Manually trigger the task to calculate sentiment scores for February product reviews, writing the results back to the Iceberg Table. Now, you should see the February product reviews and sentiment scores.\n\nFor example, for each product, what was the change in sentiment from January to February? Run the query below."
 },
@@ -216,7 +222,8 @@
 "id": "113cd2e4-335e-4d77-9667-89fc2ca452f1",
 "metadata": {
 "language": "sql",
- "name": "run_task"
+ "name": "run_task",
+ "resultHeight": 112
 },
 "outputs": [],
 "source": "-- Manually trigger Task\nEXECUTE task demo.public.cortex_sentiment_score;",
@@ -228,7 +235,8 @@
 "metadata": {
 "language": "sql",
 "name": "sql_reviews",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 322
 },
 "outputs": [],
 "source": "-- Sentiment change from January to February\nWITH jan AS (\n SELECT\n product_name,\n AVG(sentiment) AS avg_sentiment\n FROM demo.public.product_reviews\n WHERE MONTHNAME(review_date) = 'Jan'\n GROUP BY 1\n)\n, feb AS (\n SELECT\n product_name,\n AVG(sentiment) AS avg_sentiment\n FROM demo.public.product_reviews\n WHERE MONTHNAME(review_date) = 'Feb'\n GROUP BY 1\n)\nSELECT\n COALESCE(j.product_name, f.product_name) AS product_name,\n j.avg_sentiment AS jan_sentiment,\n f.avg_sentiment AS feb_sentiment,\n feb_sentiment - jan_sentiment AS sentiment_diff\nFROM jan j\nFULL OUTER JOIN feb f\n ON j.product_name = f.product_name\nORDER BY sentiment_diff DESC;",
@@ -240,11 +248,12 @@
 "metadata": {
 "language": "python",
 "name": "py_chart",
- "codeCollapsed": false
+ "codeCollapsed": false,
+ "resultHeight": 373
 },
 "outputs": [],
 "source": "import streamlit as st\n\nst.bar_chart(sql_reviews.to_df(), x='PRODUCT_NAME', y='SENTIMENT_DIFF')",
 "execution_count": null
 }
 ]
-}
+}
\ No newline at end of file
diff --git a/samples/iceberg_cortex/snowpark.ipynb b/samples/iceberg_cortex/snowpark.ipynb
deleted file mode 100644
index 37a4c58..0000000
--- a/samples/iceberg_cortex/snowpark.ipynb
+++ /dev/null
@@ -1,189 +0,0 @@
-{
- "cells": [
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Import Packages\n",
- "Just like the Python packages we are importing, we will import the Snowpark modules that we need."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import json\n",
- "from snowflake.snowpark import Session\n",
- "import snowflake.snowpark.types as T"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Connect to Snowflake\n",
- "Use these parameters and our Snowflake account credentials to connect to Snowflake and create a Snowpark session."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Get account credentials from a json file\n",
- "with open(\"auth.json\") as f:\n",
- " data = json.load(f)\n",
- " username = data[\"username\"]\n",
- " password = data[\"password\"]\n",
- " account = data[\"account\"]\n",
- " f.close()\n",
- "\n",
- "# Specify connection parameters\n",
- "connection_parameters = {\n",
- " \"account\": account,\n",
- " \"user\": username,\n",
- " \"password\": password,\n",
- " \"role\": \"accountadmin\",\n",
- " \"warehouse\": \"demo_wh\",\n",
- " \"database\": \"demo\",\n",
- " \"schema\": \"public\",\n",
- "}\n",
- "\n",
- "# Create Snowpark session\n",
- "session = Session.builder.configs(connection_parameters).create()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Upload CSVs\n",
- "Upload product review CSVs to Snowflake internal stage."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "try:\n",
- " put_results = session.file.put(\n",
- " \"product_reviews_*.csv\",\n",
- " \"@demo.public.files\",\n",
- " overwrite=False,\n",
- " auto_compress=False)\n",
- " for r in put_results:\n",
- " str_output = (\"File {src}: {status}\").format(src = r.source, status=r.status)\n",
- " print(str_output)\n",
- "except Exception as e:\n",
- " print(e)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Create Snowpark DataFrame and write to Iceberg table"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Define schema for CSV file\n",
- "reviews_schema = T.StructType([T.StructField(\"ID\", T.StringType()),\n",
- " T.StructField(\"PRODUCT_NAME\", T.StringType()),\n",
- " T.StructField(\"PRODUCT_ID\", T.StringType()),\n",
- " T.StructField(\"REVIEWER_NAME\", T.StringType()),\n",
- " T.StructField(\"REVIEW_DATE\", T.DateType()),\n",
- " T.StructField(\"REVIEW\", T.StringType()),\n",
- " T.StructField(\"SENTIMENT\", T.FloatType())])"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Read January reviews CSV into DataFrame \n",
- "jan_df = session.read \\\n",
- " .schema(reviews_schema) \\\n",
- " .option(\"skip_header\", 1) \\\n",
- " .option(\"field_optionally_enclosed_by\", '\"') \\\n",
- " .csv(\"@demo.public.files/product_reviews_jan_24.csv\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Preview January DataFrame\n",
- "jan_df.show()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Write January product reviews to Snowflake-managed Iceberg Table\n",
- "jan_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Read February reviews CSV into DataFrame \n",
- "feb_df = session.read \\\n",
- " .schema(reviews_schema) \\\n",
- " .option(\"skip_header\", 1) \\\n",
- " .option(\"field_optionally_enclosed_by\", '\"') \\\n",
- " .csv(\"@demo.public.files/product_reviews_feb_24.csv\")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "# Write February product reviews to Snowflake-managed Iceberg Table\n",
- "feb_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")"
- ]
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "py38_env_forrester",
- "language": "python",
- "name": "python3"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.8.18"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb
index 4505a85..366d600 100644
--- a/samples/iceberg_cortex/spark.ipynb
+++ b/samples/iceberg_cortex/spark.ipynb
@@ -67,7 +67,7 @@
 "outputs": [],
 "source": [
 "# Load the environment variables from the .env file\n",
- "load_dotenv()\n"
+ "load_dotenv('env.txt')"
 ]
 },
 {
diff --git a/samples/iceberg_cortex/spark_env_variables.txt b/samples/iceberg_cortex/spark_env_variables.txt
deleted file mode 100644
index a415ef7..0000000
--- a/samples/iceberg_cortex/spark_env_variables.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-export SPARK_HOME=/Users//anaconda3/envs/iceberg-lab/lib/python3.11/site-packages/pyspark
-export SNOWFLAKE_CATALOG_URI=jdbc:snowflake://.snowflakecomputing.com
-export SNOWFLAKE_ROLE=ACCOUNTADMIN
-export SNOWFLAKE_USERNAME=
-export PACKAGES=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160
-export AWS_REGION=
-export AWS_ACCESS_KEY_ID=
-export AWS_SECRET_ACCESS_KEY=
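
Note on the last two hunks: the Spark notebook now reads its configuration with python-dotenv via load_dotenv('env.txt') instead of the shell-exported variables in the deleted spark_env_variables.txt. The following is a minimal sketch, not part of the patch series, of how that environment could be sanity-checked before starting Spark; it assumes an env.txt file next to the notebook containing the same KEY=value names as the deleted file.

import os

from dotenv import load_dotenv

# Load KEY=value pairs from env.txt into os.environ (python-dotenv does not
# overwrite variables that are already set in the shell by default).
load_dotenv("env.txt")

# Variable names taken from the deleted spark_env_variables.txt.
required = [
    "SPARK_HOME",
    "SNOWFLAKE_CATALOG_URI",
    "SNOWFLAKE_ROLE",
    "SNOWFLAKE_USERNAME",
    "PACKAGES",
    "AWS_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise SystemExit("env.txt is missing values for: " + ", ".join(missing))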