
Merge pull request #10 from orellabac/main
0.0.10
orellabac authored Dec 19, 2022
2 parents e332613 + 457d368 commit 55c5c35
Showing 8 changed files with 569 additions and 69 deletions.
16 changes: 15 additions & 1 deletion CHANGE_LOG.txt
@@ -42,4 +42,18 @@ Fixing issue with requirements.txt

Version 0.0.9
-------------
Fixing bug in create_map

Version 0.0.10
-------------
Adding test_cases
Adding support for:
* arrays_zip
* applyInPandas
* explode for arrays and dicts
* explode_outer for arrays and dicts
* array as an alias for array_construct
* F.array_sort
* F.array_distinct
* F.flatten for arrays
* F.asc, F.desc, F.asc_nulls_first, F.desc_nulls_first
115 changes: 81 additions & 34 deletions README.md
@@ -2,9 +2,6 @@

Snowpark is a powerful library on its own, but some utility functions can always help.

# Installation

We recommend installing from [PyPI](https://pypi.org/):
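```
pip install snowpark-extensions
```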
@@ -203,10 +200,17 @@ df.group_by("ID").applyInPandas(
| functions.from_unixtimestamp | can be used to convert UNIX time to Snowflake timestamp |
| functions.format_number | formats numbers using the specified number of decimal places |
| functions.reverse | returns a reversed string |
| functions.explode | returns a new row for each element in the given array |
| functions.explode_outer | returns a new row for each element in the given array or map. Unlike explode, if the array/map is null or empty then null is produced |
| functions.arrays_zip | returns a merged array of arrays |
| functions.array_sort | sorts the input array in ascending order. The elements of the input array must be orderable. Null elements are placed at the end of the returned array. |
| functions.array_distinct | removes duplicate values from the array |
| functions.date_add | returns the date that is n days after |
| functions.date_sub | returns the date that is n days before |
| functions.regexp_extract | extracts a specific group matched by a regex from the specified string column |
| functions.asc | returns a sort expression based on the ascending order of the given column name |
| functions.desc | returns a sort expression based on the descending order of the given column name |
| functions.flatten | creates a single array from an array of arrays |


### Examples:
@@ -225,39 +229,82 @@ df.select(F.array_sort(df.data)).show()
```

```
------------
|"SORTED"  |
------------
|[         |
|  1,      |
|  2,      |
|  3,      |
|  null    |
|]         |
|[         |
|  1       |
|]         |
|[]        |
------------
```

### explode and explode_outer

Snowflake's builtin [FLATTEN](https://docs.snowflake.com/en/sql-reference/functions/flatten.html) provides the same functionality, but the explode syntax can sometimes be easier. This helper provides that syntax.
> NOTE: explode can be used with arrays and maps/structs. In this helper, at least for now, you need to specify whether the value should be processed as an array or as a map. We provide explode and explode_outer, or you can just use explode with the outer=True flag.
```python
from snowflake.snowpark import Session
import snowpark_extensions
from snowflake.snowpark.functions import explode, explode_outer
from snowflake.snowpark.types import StructType, StructField, IntegerType, ArrayType, MapType
session = Session.builder.appName('snowpark_extensions_unittest').from_snowsql().getOrCreate()
schema = StructType([StructField("id", IntegerType()), StructField("an_array", ArrayType()), StructField("a_map", MapType())])
sf_df = session.createDataFrame([(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], schema)
```
```
# +---+----------+----------+
# | id| an_array| a_map|
# +---+----------+----------+
# | 1|[foo, bar]|{x -> 1.0}|
# | 2| []| {}|
# | 3| null| null|
# +---+----------+----------+
```
```python
sf_df.select("id", "an_array", explode("an_array")).show()
```
```
# +---+----------+---+
# | id| an_array|col|
# +---+----------+---+
# | 1|[foo, bar]|foo|
# | 1|[foo, bar]|bar|
# +---+----------+---+
```
```python
sf_df.select("id", "an_array", explode_outer("an_array")).show()
```
```
# +---+----------+----+
# | id| an_array| COL|
# +---+----------+----+
# | 1|[foo, bar]| foo|
# | 1|[foo, bar]| bar|
# | 2| []| |
# | 3| | |
# +---+----------+----+
```
For a map use:
```python
results = sf_df.select("id", "an_array", explode_outer("a_map", map=True))
results.show()
```
```
# +---+----------+----+------+
# | id|  an_array| KEY| VALUE|
# +---+----------+----+------+
# |  1|[foo, bar]|   x|     1|
# |  2|        []|    |      |
# |  3|          |    |      |
# +---+----------+----+------+
```
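The other new array helpers follow the same pattern. Below is a minimal sketch reusing the session from the explode example above; the data, column names, and the results in the comments are illustrative, not output from the library's tests:

```python
from snowflake.snowpark import functions as F
zip_df = session.createDataFrame([([1, 2], [3, 4], [[5], [6]])], schema=["a", "b", "c"])
# arrays_zip pairs elements by position into a generated ZIPPED column, e.g. [[1,3],[2,4]]
zip_df.select("a", "b", F.arrays_zip("a", "b")).show()
# flatten collapses an array of arrays into a single array, e.g. [5, 6]
zip_df.select("c", F.flatten("c")).show()
```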
### regexp_extract
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@
this_directory = Path(__file__).parent
long_description = (this_directory / "README.md").read_text()

VERSION = '0.0.10'

setup(name='snowpark_extensions',
version=VERSION,
160 changes: 150 additions & 10 deletions snowpark_extensions/dataframe_extensions.py
@@ -1,6 +1,9 @@
from snowflake.snowpark import DataFrame, Row, DataFrameNaFunctions
from snowflake.snowpark.functions import col, lit, udtf, regexp_replace
from snowflake.snowpark import functions as F
from snowflake.snowpark.dataframe import _generate_prefix
from snowflake.snowpark.functions import table_function
from snowflake.snowpark.column import _to_col_if_str, _to_col_if_lit
import pandas as pd
import numpy as np
from snowpark_extensions.utils import map_to_python_type, schema_str_to_schema
@@ -85,26 +88,163 @@ def extended_replace(
    else:
        return self.__oldreplace(to_replace, value, subset)

DataFrameNaFunctions.replace = extended_replace

def has_null(col):
    return F.array_contains(F.sql_expr("parse_json('null')"), col) | F.coalesce(F.array_contains(lit(None), col), lit(False))

# SPECIAL COLUMN HELPERS
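# Base class for pseudo-columns: the patched select()/withColumn() below detect
# these and expand them into joins/aggregations instead of plain projections.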
class SpecialColumn():
    def _is_special_column(self):
        return True
    def gen_unique_value_name(self, idx, base_name):
        return base_name if idx == 0 else f"{base_name}_{idx}"
    def add_columns(self, new_cols, idx):
        pass
    def expand(self, df):
        pass

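# array_sort emulation: flatten the array (outer=True keeps empty arrays), then
# re-aggregate with ARRAY_AGG ... WITHIN GROUP (ORDER BY value); a seq8() row id
# (__IDX) keeps the sorted array attached to its original row.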
class ArraySort(SpecialColumn):
    def __init__(self, array_col):
        self.array_col = array_col
    def add_columns(self, new_cols, idx):
        self.value_col_name = self.gen_unique_value_name(idx, "sorted")
        new_cols.append(self.value_col_name)
    def expand(self, df):
        array_col = _to_col_if_str(self.array_col, "array_sort")
        df = df.with_column("__IDX", F.seq8())
        flatten = table_function("flatten")
        df_array_sorted = df.join_table_function(flatten(input=array_col, outer=lit(True))).group_by("__IDX").agg(F.sql_expr("array_agg(value) within group(order by value)").alias("sorted"))
        df = df.join(df_array_sorted, on="__IDX").drop("__IDX")
        return df

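# Spark-style flatten for arrays of arrays: FLATTEN once, then re-assemble the
# inner arrays per row with ARRAY_UNION_AGG; optionally yield NULL instead when
# any inner array contains a null element.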
class ArrayFlatten(SpecialColumn):
    def __init__(self, flatten_col, remove_arrays_when_there_is_a_null):
        self.flatten_col = flatten_col
        self.remove_arrays_when_there_is_a_null = remove_arrays_when_there_is_a_null
    def add_columns(self, new_cols, idx):
        self.value_col_name = self.gen_unique_value_name(idx, "flatten")
        new_cols.append(self.value_col_name)
    def expand(self, df):
        array_col = _to_col_if_str(self.flatten_col, "flatten")
        flatten = table_function("flatten")
        df = df.join_table_function(flatten(array_col).alias("__SEQ_FLATTEN", "KEY", "PATH", "__INDEX_FLATTEN", "__FLATTEN_VALUE", "THIS"))
        df = df.drop("KEY", "PATH", "THIS")
        if self.remove_arrays_when_there_is_a_null:
            df_with_has_null = df.withColumn("__HAS_NULL", has_null(array_col))
            df_flattened = df_with_has_null.group_by(col("__SEQ_FLATTEN")).agg(F.call_builtin("BOOLOR_AGG", col("__HAS_NULL")).alias("__HAS_NULL"), F.call_builtin("ARRAY_UNION_AGG", col("__FLATTEN_VALUE")).alias("__FLATTEN_VALUE"))
            df_flattened = df_flattened.with_column("__FLATTEN_VALUE", F.iff("__HAS_NULL", lit(None), col("__FLATTEN_VALUE"))).drop("__HAS_NULL")
            df = df.drop("__FLATTEN_VALUE").where(col("__INDEX_FLATTEN") == 0).join(df_flattened, on="__SEQ_FLATTEN").drop("__SEQ_FLATTEN", "__INDEX_FLATTEN").rename("__FLATTEN_VALUE", self.value_col_name)
            return df
        else:
            df_flattened = df.group_by(col("__SEQ_FLATTEN")).agg(F.call_builtin("ARRAY_UNION_AGG", col("__FLATTEN_VALUE")).alias("__FLATTEN_VALUE"))
            df = df.drop("__FLATTEN_VALUE").where(col("__INDEX_FLATTEN") == 0).join(df_flattened, on="__SEQ_FLATTEN").drop("__SEQ_FLATTEN", "__INDEX_FLATTEN").rename("__FLATTEN_VALUE", self.value_col_name)
            return df

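# arrays_zip emulation: flatten every input array, number the elements with
# seq8() ordered by (SEQ, INDEX), left-join them element-wise on that index,
# and pack the aligned values with ARRAY_CONSTRUCT into a ZIPPED column.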
class ArrayZip(SpecialColumn):
    def __init__(self, left, *right):
        self.left_col = left
        self.right_cols = right
        #self.right_col = right
    def add_columns(self, new_cols, idx):
        self.value_col_name = self.gen_unique_value_name(idx, "zipped")
        new_cols.append(self.value_col_name)
    def expand(self, df):
        flatten = table_function("flatten")
        left = df.join_table_function(flatten(self.left_col)
                   .alias("SEQ", "KEY", "PATH", "INDEX", "__VALUE_0", "THIS")) \
                 .orderBy("SEQ", "INDEX") \
                 .with_column("__IDX", F.seq8()) \
                 .drop("SEQ", "KEY", "PATH", "INDEX", "THIS")
        vals = ["__VALUE_0"]
        for right_col in self.right_cols:
            prior = len(vals) - 1
            next = len(vals)
            left_col_name = f"__VALUE_{prior}"
            right_col_name = f"__VALUE_{next}"
            vals.append(right_col_name)
            right = df.select(right_col).join_table_function(flatten(right_col)
                       .alias("SEQ", "KEY", "PATH", "INDEX", right_col_name, "THIS")) \
                     .orderBy("SEQ", "INDEX") \
                     .select(F.seq8().alias("__IDX"), col(right_col_name))
            left = left.join(right, on="__IDX", how="left", lsuffix="___LEFT")
        zipped = left.with_column("ZIPPED", F.array_construct(*vals)) \
                     .drop(*vals, "__IDX")
        return zipped

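# explode/explode_outer emulation on top of Snowflake's FLATTEN table function;
# map=True also exposes the KEY column, outer=True keeps rows whose input is
# null or empty.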
class Explode(SpecialColumn):
    def __init__(self, expr, map=False, outer=False):
        """Right now it must be explicitly stated if the value is a map. By default it is assumed it is not."""
        self.expr = expr
        self.map = map
        self.outer = outer
    def add_columns(self, new_cols, idx):
        self.value_col_name = self.gen_unique_value_name(idx, "value" if self.map else "col")
        self.key_col_name = None
        if self.map:
            self.key_col_name = self.gen_unique_value_name(idx, "key")
            new_cols.append(self.key_col_name)
        new_cols.append(self.value_col_name)
    def expand(self, df):
        if self.key_col_name:
            df = df.join_table_function(flatten(input=self.expr, outer=lit(self.outer)).alias("SEQ", self.key_col_name, "PATH", "INDEX", self.value_col_name, "THIS")).drop(["SEQ", "PATH", "INDEX", "THIS"])
        else:
            df = df.join_table_function(flatten(input=self.expr, outer=lit(self.outer)).alias("SEQ", "KEY", "PATH", "INDEX", self.value_col_name, "THIS")).drop(["SEQ", "KEY", "PATH", "INDEX", "THIS"])
        return df

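# Spark-like entry points; they return lazy SpecialColumn markers that the
# patched select()/withColumn() expand later.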
def explode(expr, outer=False, map=False):
    return Explode(expr, map, outer)

def explode_outer(expr, map=False):
    return Explode(expr, map, True)

F.explode = explode
F.explode_outer = explode_outer

def _arrays_zip(left, *right):
    return ArrayZip(left, *right)

def _arrays_flatten(array_col, remove_arrays_when_there_is_a_null=True):
    return ArrayFlatten(array_col, remove_arrays_when_there_is_a_null)

def _array_sort(array_col):
    return ArraySort(array_col)

F.arrays_zip = _arrays_zip
F.flatten = _arrays_flatten
F.array_sort = _array_sort
flatten = table_function("flatten")
_oldwithColumn = DataFrame.withColumn
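# Patched withColumn: SpecialColumn expressions expand into the extra joins
# they need; anything else falls through to the original implementation.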
def withColumnExtended(self, colname, expr):
    if isinstance(expr, SpecialColumn):
        new_cols = []
        expr.add_columns(new_cols, 0)
        df = expr.expand(self)
        return self.withColumns(df, new_cols, [col(x) for x in new_cols])
    else:
        return _oldwithColumn(self, colname, expr)

DataFrame.withColumn = withColumnExtended

oldSelect = DataFrame.select

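# Patched select: expand each SpecialColumn argument into its generated
# columns, then project those names alongside the regular columns.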
def selectExtended(self, *cols):
    if any(isinstance(x, SpecialColumn) for x in cols):
        new_cols = []
        extended_cols = []
        for x in cols:
            if isinstance(x, SpecialColumn):
                x.add_columns(new_cols, len(extended_cols))
                extended_cols.append(x)
            else:
                new_cols.append(x)
        df = self
        for extended_col in extended_cols:
            df = extended_col.expand(df)
        return oldSelect(df, *[_to_col_if_str(x, "extended") for x in new_cols])
    else:
        return oldSelect(self, *cols)

DataFrame.select = selectExtended


import shortuuid
from snowflake.snowpark import Window, Column
