diff --git a/samples/iceberg_cortex/.env b/samples/iceberg_cortex/.env
index 2678eb3..9095920 100644
--- a/samples/iceberg_cortex/.env
+++ b/samples/iceberg_cortex/.env
@@ -1,8 +1,8 @@
-SNOWFLAKE_ACCOUNT=""
 SNOWFLAKE_ROLE="ACCOUNTADMIN"
 SNOWFLAKE_USERNAME=""
 SNOWFLAKE_PASSWORD=""
 SNOWFLAKE_CATALOG_URI="jdbc:snowflake://.snowflakecomputing.com"
+# If using MFA for your Snowflake user, append these parameters to the URI: ?user=&passcode=
 
 # If using Amazon S3 for storage, then include these variables:
 PACKAGES="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160"
diff --git a/samples/iceberg_cortex/auth.json b/samples/iceberg_cortex/auth.json
deleted file mode 100644
index 86e89ef..0000000
--- a/samples/iceberg_cortex/auth.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-    "username": "",
-    "password": "",
-    "account": ""
-}
\ No newline at end of file
diff --git a/samples/iceberg_cortex/demo.sql b/samples/iceberg_cortex/demo.sql
deleted file mode 100644
index 253a935..0000000
--- a/samples/iceberg_cortex/demo.sql
+++ /dev/null
@@ -1,96 +0,0 @@
--- Create a separate database and warehouse for demo
-CREATE OR REPLACE DATABASE demo;
-CREATE OR REPLACE WAREHOUSE demo_wh;
-USE DATABASE demo;
-USE WAREHOUSE demo_wh;
-
--- External Volume named s3_vol created in advance
-CREATE OR REPLACE EXTERNAL VOLUME s3_vol
-  STORAGE_LOCATIONS =
-    (
-      (
-        NAME = ''
-        STORAGE_PROVIDER = 'S3'
-        STORAGE_BASE_URL = 's3:///'
-        STORAGE_AWS_ROLE_ARN = 'arn:aws:iam:::role/'
-      )
-    );
-
--- Create a Snowflake-managed Iceberg table that writes to Amazon S3
-CREATE OR REPLACE ICEBERG TABLE demo.public.product_reviews (
-  id STRING,
-  product_name STRING,
-  product_id STRING,
-  reviewer_name STRING,
-  review_date DATE,
-  review STRING
-)
-  CATALOG = 'SNOWFLAKE'
-  EXTERNAL_VOLUME = 's3_vol'
-  BASE_LOCATION = 'demo/product_reviews/'
-;
-SELECT * FROM demo.public.product_reviews LIMIT 10;
-
--- Create a file format to specify how CSVs should be parsed
-CREATE OR REPLACE FILE FORMAT csv_ff
-  TYPE = 'CSV'
-  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
-  SKIP_HEADER = 1;
-
--- Create a stage to store the CSV files
-CREATE OR REPLACE STAGE demo.public.files
-  FILE_FORMAT = csv_ff
-  DIRECTORY = (ENABLE = TRUE);
-
--- Create a stream to capture new records in the Iceberg table
-CREATE STREAM product_reviews_stream ON TABLE product_reviews;
-
--- Create task to process new records with Cortex sentiment LLM function
-CREATE OR REPLACE TASK demo.public.cortex_sentiment_score
-  SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'
-  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
-AS
-UPDATE demo.public.product_reviews AS pr
-  SET sentiment = snowflake.cortex.sentiment(prs.review)
-  FROM demo.public.product_reviews_stream AS prs
-  WHERE prs.id = pr.id;
-
--- Use UI to create a reader account
--- Use UI to create a share with reader account and add both secure views to share
-
--- Query
--- For each product, what was the change in sentiment from January to February?
-WITH jan AS (
-  SELECT
-    product_name,
-    AVG(sentiment) AS avg_sentiment
-  FROM demo.public.product_reviews
-  WHERE MONTHNAME(review_date) = 'Jan'
-  GROUP BY 1
-)
-, feb AS (
-  SELECT
-    product_name,
-    AVG(sentiment) AS avg_sentiment
-  FROM demo.public.product_reviews
-  WHERE MONTHNAME(review_date) = 'Feb'
-  GROUP BY 1
-)
-SELECT
-  COALESCE(j.product_name, f.product_name) AS product_name,
-  j.avg_sentiment AS jan_sentiment,
-  f.avg_sentiment AS feb_sentiment,
-  feb_sentiment - jan_sentiment AS sentiment_diff
-FROM jan j
-FULL OUTER JOIN feb f
-  ON j.product_name = f.product_name
-ORDER BY sentiment_diff DESC;
-
--- Reload
-DELETE FROM demo.public.product_reviews
-  WHERE REVIEW_DATE >= DATE('2024-02-01');
-
--- Cleanup
-DROP DATABASE demo;
-DROP WAREHOUSE demo_wh;
-DROP EXTERNAL VOLUME s3_vol;
diff --git a/samples/iceberg_cortex/snowflake.ipynb b/samples/iceberg_cortex/snowflake.ipynb
index 5bc89e3..37a8066 100644
--- a/samples/iceberg_cortex/snowflake.ipynb
+++ b/samples/iceberg_cortex/snowflake.ipynb
@@ -13,7 +13,8 @@
    "id": "67b56ef8-8cd2-440e-930d-15190fe5da6e",
    "metadata": {
     "name": "md_setup_ev",
-    "collapsed": false
+    "collapsed": false,
+    "resultHeight": 496
    },
    "source": "# Setup Snowflake\n## Create an External Volume\nTo create an external volume, complete the instructions for your cloud storage service:\n- [Accessing Amazon S3 using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-s3)\n- [Accessing Microsoft Azure Storage using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-azure)\n\nRemember from the Overview section, your Snowflake account must be in the same region as your external volume location. And to use the Sentiment LLM function, [supported regions](https://docs.snowflake.com/en/user-guide/snowflake-cortex/llm-functions#availability) currently include:\n- AWS US West 2 (Oregon)\n- AWS US East 1 (N. Virginia)\n- AWS Europe Central 1 (Frankfurt)\n- Azure East US 2 (Virginia)\n- Azure West Europe (Netherlands)"
   },
@@ -23,39 +24,20 @@
    "metadata": {
     "language": "sql",
     "name": "sql_create_ev",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 0
    },
    "outputs": [],
    "source": "-- Use accountadmin role to create an external volume\nUSE ROLE accountadmin;\n\n-- Create an external volume\nCREATE OR REPLACE EXTERNAL VOLUME iceberg_cortex_vol\n STORAGE_LOCATIONS =\n (\n (\n NAME = ''\n STORAGE_PROVIDER = ''\n STORAGE_BASE_URL = ''\n \n STORAGE_AWS_ROLE_ARN = ''\n STORAGE_AWS_EXTERNAL_ID = ''\n\n AZURE_TENANT_ID = ''\n )\n );",
    "execution_count": null
   },
-  {
-   "cell_type": "markdown",
-   "id": "9f76cdb8-b8a9-4034-88ef-683cf1d54068",
-   "metadata": {
-    "name": "md_iceberg_catalogs",
-    "collapsed": false
-   },
-   "source": "## Create an Iceberg Table\nIceberg Tables can currently use Snowflake, AWS Glue, or object storage as the catalog. In public preview soon, Snowflake can use catalog integration with an Iceberg REST endpoint. In this quickstart, use Snowflake as the catalog to allow read and write operations to the table. More information about integrating catalogs can be found [here](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration).\n\nCreate an Iceberg Table referencing the external volume you just created. You can specify `BASE_LOCATION` to instruct Snowflake where to write table data and metadata, or leave empty to write data and metadata to the location specified in the external volume definition."
-  },
-  {
-   "cell_type": "code",
-   "id": "8d50cbf4-0c8d-4950-86cb-114990437ac9",
-   "metadata": {
-    "language": "sql",
-    "name": "sql_create_iceberg_table",
-    "codeCollapsed": false
-   },
-   "source": "CREATE OR REPLACE ICEBERG TABLE demo.public.product_reviews (\n id STRING,\n product_name STRING,\n product_id STRING,\n reviewer_name STRING,\n review_date DATE,\n review STRING,\n sentiment FLOAT\n)\n CATALOG = 'SNOWFLAKE'\n EXTERNAL_VOLUME = 'iceberg_cortex_vol'\n BASE_LOCATION = 'demo/product_reviews/'\n;",
-   "execution_count": null,
-   "outputs": []
-  },
   {
    "cell_type": "markdown",
    "id": "9b89570d-c063-44f2-9c34-2d8e229dbb9c",
    "metadata": {
     "name": "md_load",
-    "collapsed": false
+    "collapsed": false,
+    "resultHeight": 288
    },
    "source": "# Load CSV files into Iceberg via Snowpark Python\nThere are multiple ways to load new data into Snowflake-managed Iceberg Tables including INSERT, [COPY INTO](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table), and [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto).\n\nFor this quickstart, we will use Snowpark to write CSV files from dataframes into the Iceberg Table. Snowflake will write Parquet files and Iceberg metadata to your external volume.\n\nFirst, create an external stage and file format."
   },
@@ -65,7 +47,8 @@
    "metadata": {
     "language": "sql",
     "name": "sql_file_setup",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 0
    },
    "outputs": [],
    "source": "-- Create a file format\nCREATE OR REPLACE FILE FORMAT demo.public.csv_ff\n TYPE = 'CSV'\n FIELD_OPTIONALLY_ENCLOSED_BY = '\"'\n SKIP_HEADER = 1;\n\n-- Create an external stage to read CSV files from an S3 bucket in-place\nCREATE OR REPLACE STAGE demo.public.files\n URL = 's3://sfquickstarts/iceberg_cortex/'\n FILE_FORMAT = demo.public.csv_ff\n DIRECTORY = (ENABLE = TRUE);",
@@ -77,9 +60,10 @@
    "metadata": {
     "language": "python",
     "name": "imports",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 0
    },
-   "source": "# Import necessary modules and create a session\nimport json|\nfrom snowflake.snowpark import Session\nimport snowflake.snowpark.types as T\n\nsession = get_active_session()",
+   "source": "# Import necessary modules and create a session\nimport json\nfrom snowflake.snowpark import Session\nimport snowflake.snowpark.types as T\n\nsession = get_active_session()",
    "execution_count": null,
    "outputs": []
   },
@@ -88,7 +72,8 @@
    "id": "e97bfc44-4b44-4b55-8773-def2557abec9",
    "metadata": {
     "language": "python",
-    "name": "py_create_schema"
+    "name": "py_create_schema",
+    "resultHeight": 0
    },
    "outputs": [],
    "source": "# Create a schema Snowpark dataframe matching the CSV files\nreviews_schema = T.StructType([T.StructField(\"ID\", T.StringType()),\n T.StructField(\"PRODUCT_NAME\", T.StringType()),\n T.StructField(\"PRODUCT_ID\", T.StringType()),\n T.StructField(\"REVIEWER_NAME\", T.StringType()),\n T.StructField(\"REVIEW_DATE\", T.DateType()),\n T.StructField(\"REVIEW\", T.StringType()),\n T.StructField(\"SENTIMENT\", T.FloatType())])",
@@ -100,22 +85,34 @@
    "metadata": {
     "language": "python",
     "name": "py_jan_df",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 351
    },
    "outputs": [],
    "source": "# Read the January product reviews into a dataframe\njan_df = session.read \\\n .schema(reviews_schema) \\\n .option(\"skip_header\", 1) \\\n .option(\"field_optionally_enclosed_by\", '\"') \\\n .csv(\"@demo.public.files/product_reviews_jan_24.csv\")\n\n# View the dataframe\njan_df.show()",
    "execution_count": null
   },
+  {
"cell_type": "markdown", + "id": "23fa7d27-7ef7-4090-ab0d-36fd33247971", + "metadata": { + "name": "md_iceberg_catalogs", + "collapsed": false, + "resultHeight": 186 + }, + "source": "Iceberg Tables can currently use Snowflake, Iceberg REST catalogs, AWS Glue, or object storage as the catalog. In this quickstart, use Snowflake as the catalog to allow read and write operations to the table. More information about integrating catalogs can be found [here](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration).\n\nCreate an Iceberg Table referencing the external volume you just created. You can specify `BASE_LOCATION` to instruct Snowflake where to write table data and metadata, or leave empty to write data and metadata to the location specified in the external volume definition." + }, { "cell_type": "code", "id": "c3d25722-6560-4716-b3e6-d558bd1415ed", "metadata": { "language": "python", "name": "py_jan_df_write", - "codeCollapsed": false + "codeCollapsed": false, + "resultHeight": 0 }, "outputs": [], - "source": "# Write the dataframe to the Iceberg Table\njan_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")", + "source": "# Write the dataframe to the Iceberg Table\njan_df.write \\\n .mode(\"overwrite\") \\\n .save_as_table(\"demo.public.product_reviews\",\n iceberg_config={\n \"external_volume\":\"iceberg_cortex_vol\",\n \"catalog\":\"snowflake\",\n \"base_location\":\"demo/product_reviews/\"\n }\n )", "execution_count": null }, { @@ -123,16 +120,18 @@ "id": "d22fa91f-31c3-48ba-bbf8-a889ab8ccbc0", "metadata": { "name": "md_load_complete", - "collapsed": false + "collapsed": false, + "resultHeight": 67 }, - "source": "You now see metadata files and Parquet data files in your object storage, whether you’re using Amazon S3 or Azure storage.\n\n![iceberg_files](https://github.com/Snowflake-Labs/sfquickstarts/blob/master/site/sfguides/src/cortex_ai_sentiment_iceberg/assets/iceberg_files.png)" + "source": "You now see metadata files and Parquet data files in your object storage, whether you’re using Amazon S3 or Azure storage." }, { "cell_type": "markdown", "id": "cfe9fb1f-5f0c-4547-9dca-66c5c0f6d1a1", "metadata": { "name": "md_cortex", - "collapsed": false + "collapsed": false, + "resultHeight": 141 }, "source": "# Snowflake Cortex LLM Functions\nNow you can query the Iceberg Table using LLM functions from Snowflake Cortex AI. Run the query below to calculate sentiment scores for product reviews." 
   },
@@ -141,8 +140,9 @@
    "id": "300f04d3-98c4-4697-976a-267b7fb7914c",
    "metadata": {
     "language": "sql",
-    "name": "sql_reviews_select",
-    "codeCollapsed": false
+    "name": "sql_reviews_jan",
+    "codeCollapsed": false,
+    "resultHeight": 439
    },
    "outputs": [],
    "source": "SELECT\n id,\n product_name,\n review_date,\n snowflake.cortex.sentiment(review) as review_sentiment\nFROM demo.public.product_reviews",
@@ -154,7 +154,8 @@
    "metadata": {
     "language": "sql",
     "name": "sql_update",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 112
    },
    "outputs": [],
    "source": "-- Write the sentiment scores back to the Iceberg Table.\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = jan.review_sentiment\n FROM {{sql_reviews_jan}} AS jan\n WHERE jan.id = pr.id;",
    "execution_count": null
@@ -165,7 +166,8 @@
    "id": "e131af80-db4c-4de4-8fef-3ba4f9ee163c",
    "metadata": {
     "name": "md_create_pipeline",
-    "collapsed": false
+    "collapsed": false,
+    "resultHeight": 327
    },
    "source": "# Create a CDC Pipeline\nSuppose new product reviews continue to be generated, stored as new CSV files, and you'd like to use Snowflake to automatically compute sentiment scores on new product reviews.\n\n[Streams on Directory Tables](https://docs.snowflake.com/en/user-guide/data-load-dirtables-pipeline) can detect new files in stages, perform computation, and store results. LLM functions from Snowflake Cortex can be called in these pipelines, writing results to Iceberg Tables.\n\nTo simulate this, create a Stream on the Iceberg Table to detect new product reviews loaded to the table. On a schedule, a Serverless Task will call the SENTIMENT function to incrementally process new records."
   },
@@ -175,10 +177,11 @@
    "metadata": {
     "language": "sql",
     "name": "sql_create_pipeline",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 112
    },
    "outputs": [],
-   "source": "-- Create a Stream to detect new product review records in the Iceberg Table\nCREATE STREAM demo.public.product_reviews_stream ON TABLE demo.public.product_reviews;\n\n-- Create a Serverless Task to add sentiment for new records from the Stream\nCREATE OR REPLACE TASK demo.public.cortex_sentiment_score\n SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'\n USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'\nAS\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = snowflake.cortex.sentiment(prs.review)\n FROM demo.public.product_reviews_stream AS prs\n WHERE prs.id = pr.id;",
+   "source": "-- Create a Stream to detect new product review records in the Iceberg Table\nCREATE OR REPLACE STREAM demo.public.product_reviews_stream ON TABLE demo.public.product_reviews;\n\n-- Create a Serverless Task to add sentiment for new records from the Stream\nCREATE OR REPLACE TASK demo.public.cortex_sentiment_score\n SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'\n USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'\nAS\nUPDATE demo.public.product_reviews AS pr\n SET sentiment = snowflake.cortex.sentiment(prs.review)\n FROM demo.public.product_reviews_stream AS prs\n WHERE prs.id = pr.id;",
    "execution_count": null
   },
   {
@@ -186,7 +189,8 @@
    "id": "8080fa4d-3b6c-469e-8e0e-f502e707f053",
    "metadata": {
     "name": "md_feb_df",
-    "collapsed": false
+    "collapsed": false,
+    "resultHeight": 67
    },
    "source": "Now see the incremental processing pipeline in action. Create a dataframe for February product reviews and write it to the Iceberg Table."
   },
@@ -196,7 +200,8 @@
    "metadata": {
     "language": "python",
     "name": "py_feb_df",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 0
    },
    "outputs": [],
    "source": "feb_df = session.read \\\n .schema(reviews_schema) \\\n .option(\"skip_header\", 1) \\\n .option(\"field_optionally_enclosed_by\", '\"') \\\n .csv(\"@demo.public.files/product_reviews_feb_24.csv\")\n\nfeb_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")",
@@ -207,7 +212,8 @@
    "id": "04f10f72-e47c-4662-abbb-a880b37d3bf4",
    "metadata": {
     "name": "md_reviews",
-    "collapsed": false
+    "collapsed": false,
+    "resultHeight": 159
    },
    "source": "The Task will execute on the specified schedule. Manually trigger the task to calculate sentiment scores for February product reviews, writing the results back to the Iceberg Table. Now, you should see the February product reviews and sentiment scores.\n\nFor example, for each product, what was the change in sentiment from January to February? Run the query below."
   },
@@ -216,7 +222,8 @@
    "id": "113cd2e4-335e-4d77-9667-89fc2ca452f1",
    "metadata": {
     "language": "sql",
-    "name": "run_task"
+    "name": "run_task",
+    "resultHeight": 112
    },
    "outputs": [],
    "source": "-- Manually trigger Task\nEXECUTE task demo.public.cortex_sentiment_score;",
@@ -228,7 +235,8 @@
    "metadata": {
     "language": "sql",
     "name": "sql_reviews",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 322
    },
    "outputs": [],
    "source": "-- Sentiment change from January to February\nWITH jan AS (\n SELECT\n product_name,\n AVG(sentiment) AS avg_sentiment\n FROM demo.public.product_reviews\n WHERE MONTHNAME(review_date) = 'Jan'\n GROUP BY 1\n)\n, feb AS (\n SELECT\n product_name,\n AVG(sentiment) AS avg_sentiment\n FROM demo.public.product_reviews\n WHERE MONTHNAME(review_date) = 'Feb'\n GROUP BY 1\n)\nSELECT\n COALESCE(j.product_name, f.product_name) AS product_name,\n j.avg_sentiment AS jan_sentiment,\n f.avg_sentiment AS feb_sentiment,\n feb_sentiment - jan_sentiment AS sentiment_diff\nFROM jan j\nFULL OUTER JOIN feb f\n ON j.product_name = f.product_name\nORDER BY sentiment_diff DESC;",
@@ -240,7 +248,8 @@
    "metadata": {
     "language": "python",
     "name": "py_chart",
-    "codeCollapsed": false
+    "codeCollapsed": false,
+    "resultHeight": 373
    },
    "outputs": [],
    "source": "import streamlit as st\n\nst.bar_chart(sql_reviews.to_df(), x='PRODUCT_NAME', y='SENTIMENT_DIFF')",
diff --git a/samples/iceberg_cortex/snowpark.ipynb b/samples/iceberg_cortex/snowpark.ipynb
deleted file mode 100644
index 37a4c58..0000000
--- a/samples/iceberg_cortex/snowpark.ipynb
+++ /dev/null
@@ -1,189 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "### Import Packages\n",
-    "Just like the Python packages we are importing, we will import the Snowpark modules that we need."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import json\n",
-    "from snowflake.snowpark import Session\n",
-    "import snowflake.snowpark.types as T"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "### Connect to Snowflake\n",
-    "Use these parameters and our Snowflake account credentials to connect to Snowflake and create a Snowpark session."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Get account credentials from a json file\n",
-    "with open(\"auth.json\") as f:\n",
-    "    data = json.load(f)\n",
-    "    username = data[\"username\"]\n",
-    "    password = data[\"password\"]\n",
-    "    account = data[\"account\"]\n",
-    "    f.close()\n",
-    "\n",
-    "# Specify connection parameters\n",
-    "connection_parameters = {\n",
-    "    \"account\": account,\n",
-    "    \"user\": username,\n",
-    "    \"password\": password,\n",
-    "    \"role\": \"accountadmin\",\n",
-    "    \"warehouse\": \"demo_wh\",\n",
-    "    \"database\": \"demo\",\n",
-    "    \"schema\": \"public\",\n",
-    "}\n",
-    "\n",
-    "# Create Snowpark session\n",
-    "session = Session.builder.configs(connection_parameters).create()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "### Upload CSVs\n",
-    "Upload product review CSVs to Snowflake internal stage."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "try:\n",
-    "    put_results = session.file.put(\n",
-    "        \"product_reviews_*.csv\",\n",
-    "        \"@demo.public.files\",\n",
-    "        overwrite=False,\n",
-    "        auto_compress=False)\n",
-    "    for r in put_results:\n",
-    "        str_output = (\"File {src}: {status}\").format(src = r.source, status=r.status)\n",
-    "        print(str_output)\n",
-    "except Exception as e:\n",
-    "    print(e)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "### Create Snowpark DataFrame and write to Iceberg table"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Define schema for CSV file\n",
-    "reviews_schema = T.StructType([T.StructField(\"ID\", T.StringType()),\n",
-    "                               T.StructField(\"PRODUCT_NAME\", T.StringType()),\n",
-    "                               T.StructField(\"PRODUCT_ID\", T.StringType()),\n",
-    "                               T.StructField(\"REVIEWER_NAME\", T.StringType()),\n",
-    "                               T.StructField(\"REVIEW_DATE\", T.DateType()),\n",
-    "                               T.StructField(\"REVIEW\", T.StringType()),\n",
-    "                               T.StructField(\"SENTIMENT\", T.FloatType())])"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Read January reviews CSV into DataFrame \n",
-    "jan_df = session.read \\\n",
-    "    .schema(reviews_schema) \\\n",
-    "    .option(\"skip_header\", 1) \\\n",
-    "    .option(\"field_optionally_enclosed_by\", '\"') \\\n",
-    "    .csv(\"@demo.public.files/product_reviews_jan_24.csv\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Preview January DataFrame\n",
-    "jan_df.show()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Write January product reviews to Snowflake-managed Iceberg Table\n",
-    "jan_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Read February reviews CSV into DataFrame \n",
-    "feb_df = session.read \\\n",
-    "    .schema(reviews_schema) \\\n",
-    "    .option(\"skip_header\", 1) \\\n",
-    "    .option(\"field_optionally_enclosed_by\", '\"') \\\n",
-    "    .csv(\"@demo.public.files/product_reviews_feb_24.csv\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Write February product reviews to Snowflake-managed Iceberg Table\n",
"feb_df.write.mode(\"append\").save_as_table(\"demo.public.product_reviews\")" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "py38_env_forrester", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.18" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/samples/iceberg_cortex/spark.ipynb b/samples/iceberg_cortex/spark.ipynb index 4505a85..366d600 100644 --- a/samples/iceberg_cortex/spark.ipynb +++ b/samples/iceberg_cortex/spark.ipynb @@ -67,7 +67,7 @@ "outputs": [], "source": [ "# Load the environment variables from the .env file\n", - "load_dotenv()\n" + "load_dotenv('env.txt')" ] }, { diff --git a/samples/iceberg_cortex/spark_env_variables.txt b/samples/iceberg_cortex/spark_env_variables.txt deleted file mode 100644 index a415ef7..0000000 --- a/samples/iceberg_cortex/spark_env_variables.txt +++ /dev/null @@ -1,8 +0,0 @@ -export SPARK_HOME=/Users//anaconda3/envs/iceberg-lab/lib/python3.11/site-packages/pyspark -export SNOWFLAKE_CATALOG_URI=jdbc:snowflake://.snowflakecomputing.com -export SNOWFLAKE_ROLE=ACCOUNTADMIN -export SNOWFLAKE_USERNAME= -export PACKAGES=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,net.snowflake:snowflake-jdbc:3.14.2,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160 -export AWS_REGION= -export AWS_ACCESS_KEY_ID= -export AWS_SECRET_ACCESS_KEY=