diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc196b84817..44605d34a68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,12 @@
 - `nullifzero`
 - `snowflake_cortex_sentiment`
 - Added `Catalog` class to manage snowflake objects. It can be accessed via `Session.catalog`.
+- Added new methods in class `DataFrame`:
+  - `col_regex`: Select columns that match the provided regex.
+  - `map` and its alias `foreach`: Apply a user function to each row, with one-to-one mapping.
+  - `flat_map`: Apply a user function to each row, with one-to-many mapping.
+  - `toJSON` and its alias `to_json`: Convert each row of the DataFrame into a JSON string.
+  - `transform`: Chain multiple transformations on a DataFrame.
 
 #### Improvements
 
@@ -63,8 +69,6 @@
 #### New Features
 
 - Added support for property `version` and class method `get_active_session` for `Session` class.
-- Added new methods in class `DataFrame`:
-  - `col_regex`: Select columns that match with provided regex.
 - Added support for property `version` and class method `get_active_session` for `Session` class.
 - Added new methods and variables to enhance data type handling and JSON serialization/deserialization:
   - To `DataType`, its derived classes, and `StructField`:
@@ -84,7 +88,6 @@
 - `collect_list` an alias of `array_agg`.
 - `substring` makes `len` argument optional.
 - Added parameter `ast_enabled` to session for internal usage (default: `False`).
-- Added support for `Dataframe.toJSON`
 
 #### Improvements
 
diff --git a/docs/source/snowpark/dataframe.rst b/docs/source/snowpark/dataframe.rst
index 1596b7b072d..af82dbef9e9 100644
--- a/docs/source/snowpark/dataframe.rst
+++ b/docs/source/snowpark/dataframe.rst
@@ -50,6 +50,8 @@ DataFrame
     DataFrame.fillna
     DataFrame.filter
     DataFrame.first
+    DataFrame.flat_map
+    DataFrame.flatMap
     DataFrame.flatten
     DataFrame.groupBy
     DataFrame.group_by
diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py
index a528c9b4c58..40b1ac8581d 100644
--- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py
+++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -325,7 +325,7 @@ def analyze(
         if isinstance(expr, WindowSpecDefinition):
             return window_spec_expression(
                 [
-                    self.analyze(
+                    self.to_sql_try_avoid_cast(
                         x, df_aliased_col_name_to_real_col_name, parse_local_name
                     )
                     for x in expr.partition_spec
@@ -456,7 +456,7 @@ def analyze(
             return table_function_partition_spec(
                 expr.over,
                 [
-                    self.analyze(
+                    self.to_sql_try_avoid_cast(
                         x, df_aliased_col_name_to_real_col_name, parse_local_name
                     )
                     for x in expr.partition_spec
@@ -623,7 +623,9 @@ def table_function_expression_extractor(
                 "NamedArgumentsTableFunction, GeneratorTableFunction, or FlattenFunction."
             )
         partition_spec_sql = (
-            self.analyze(expr.partition_spec, df_aliased_col_name_to_real_col_name)
+            self.to_sql_try_avoid_cast(
+                expr.partition_spec, df_aliased_col_name_to_real_col_name
+            )
             if expr.partition_spec
             else ""
         )
diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py
index 63c6f59f5db..2d0149e66c3 100644
--- a/src/snowflake/snowpark/dataframe.py
+++ b/src/snowflake/snowpark/dataframe.py
@@ -20,6 +20,7 @@
     List,
     Optional,
     Tuple,
+    Type,
     Union,
     overload,
 )
@@ -199,6 +200,7 @@
     StructType,
     _NumericType,
 )
+from snowflake.snowpark.udtf import UserDefinedTableFunction
 
 # Python 3.8 needs to use typing.Iterable because collections.abc.Iterable is not subscriptable
 # Python 3.9 can use both
@@ -5827,6 +5829,137 @@ def print_schema(self, level: Optional[int] = None) -> None:
         """
         print(self._format_schema(level))  # noqa: T201: we need to print here.
 
+    def _parse_and_register_udtf_for_map(
+        self,
+        func: Callable,
+        output_types: List[StructType],
+        output_column_names: Optional[List[str]],
+        imports: Optional[List[Union[str, Tuple[str, str]]]],
+        packages: Optional[List[Union[str, ModuleType]]],
+        immutable: bool,
+        vectorized: bool,
+        max_batch_size: Optional[int],
+        df_columns: List[str],
+        is_flat_map: bool,
+    ) -> Tuple[UserDefinedTableFunction, List[Column]]:
+        if len(output_types) == 0:
+            raise ValueError("output_types cannot be empty.")
+
+        if output_column_names is None:
+            output_column_names = [f"C_{i+1}" for i in range(len(output_types))]
+        elif len(output_column_names) != len(output_types):
+            raise ValueError(
+                "'output_column_names' and 'output_types' must be of the same size."
+            )
+        output_columns = [
+            col(f"${i + len(df_columns) + 1}").alias(
+                col_name
+            )  # this is done to avoid collision with original table columns
+            for i, col_name in enumerate(output_column_names)
+        ]
+
+        packages = packages or list(self._session.get_packages().values())
+        packages = add_package_to_existing_packages(
+            packages, "snowflake-snowpark-python"
+        )
+
+        input_types = [field.datatype for field in self.schema.fields]
+        udtf_output_cols = [f"c{i}" for i in range(len(output_types))]
+        output_schema = StructType(
+            [
+                StructField(col, type_)
+                for col, type_ in zip(udtf_output_cols, output_types)
+            ]
+        )
+
+        if vectorized:
+            # If the map is vectorized, we need to add pandas to packages if not
+            # already added. Also update the input_types and output_schema to
+            # be PandasDataFrameType.
+            packages = add_package_to_existing_packages(packages, pandas)
+            input_types = [PandasDataFrameType(input_types)]
+            output_schema = PandasDataFrameType(output_types, udtf_output_cols)
+
+        if is_flat_map:
+            _MapFunc = self._get_udtf_class_for_flat_map(func, vectorized, df_columns)
+        else:
+            _MapFunc = self._get_udtf_class_for_map(func, vectorized, df_columns)
+
+        map_udtf = self._session.udtf.register(
+            _MapFunc,
+            output_schema=output_schema,
+            input_types=input_types,
+            input_names=df_columns,
+            imports=imports,
+            packages=packages,
+            immutable=immutable,
+            max_batch_size=max_batch_size,
+        )
+        return map_udtf, output_columns
+
+    def _get_udtf_class_for_map(
+        self, func: Callable, vectorized: bool, df_columns: List[str]
+    ) -> Type:
+        if vectorized:
+
+            def wrap_result(result):  # pragma: no cover
+                if isinstance(result, pandas.DataFrame) or isinstance(result, tuple):
+                    return result
+                return (result,)
+
+            class _MapFunc:
+                def process(self, pdf):  # pragma: no cover
+                    return wrap_result(func(pdf))
+
+        else:
+
+            def wrap_result(result):  # pragma: no cover
+                if isinstance(result, Row):
+                    return tuple(result)
+                elif isinstance(result, tuple):
+                    return result
+                else:
+                    return (result,)
+
+            class _MapFunc:
+                def process(self, *argv):  # pragma: no cover
+                    input_args_to_row = Row(*df_columns)
+                    yield wrap_result(func(input_args_to_row(*argv)))
+
+        return _MapFunc
+
+    def _get_udtf_class_for_flat_map(
+        self, func: Callable, vectorized: bool, df_columns: List[str]
+    ) -> Type:
+        if vectorized:
+
+            def wrap_result(result):  # pragma: no cover
+                if isinstance(result, pandas.DataFrame) or isinstance(result, tuple):
+                    return result
+                return (result,)
+
+            class _FlatMapFunc:
+                def end_partition(self, pdf):  # pragma: no cover
+                    return wrap_result(func(pdf))
+
+        else:
+
+            def wrap_result(result):  # pragma: no cover
+                if isinstance(result, Row):
+                    return tuple(result)
+                elif isinstance(result, tuple):
+                    return result
+                else:
+                    return (result,)
+
+            class _FlatMapFunc:
+                def process(self, *argv):  # pragma: no cover
+                    input_args_to_row = Row(*df_columns)
+                    for row in func(input_args_to_row(*argv)):
+                        yield wrap_result(row)
+
+        return _FlatMapFunc
+
     def map(
         self,
         func: Callable,
@@ -5840,8 +5973,10 @@ def map(
         vectorized: bool = False,
         max_batch_size: Optional[int] = None,
     ):
-        """Returns a new DataFrame with the result of applying `func` to each of the
-        rows of the specified DataFrame.
+        """Returns a new DataFrame with the result of applying ``func`` to each of the
+        rows of the specified DataFrame. The function must return either a scalar value
+        or a tuple containing the same number of elements as specified in the
+        ``output_types`` argument. This method is used for one-to-one row transformations.
 
         This function registers a temporary `UDTF
         `_ and
@@ -5956,91 +6091,205 @@ def map(
 
         Note:
 
-            1. The result of the `func` function must be either a scalar value or
+            1. The result of the ``func`` function must be either a scalar value or
             a tuple containing the same number of elements as specified in the
-            `output_types` argument.
+            ``output_types`` argument.
 
-            2. When using the `vectorized` option, the `func` function must accept
+            2. When using the ``vectorized`` option, the ``func`` function must accept
             a pandas DataFrame as input and return either a pandas DataFrame, or a
             tuple of pandas Series/arrays.
""" - if len(output_types) == 0: - raise ValueError("output_types cannot be empty.") - - if output_column_names is None: - output_column_names = [f"c_{i+1}" for i in range(len(output_types))] - elif len(output_column_names) != len(output_types): - raise ValueError( - "'output_column_names' and 'output_types' must be of the same size." - ) - df_columns = self.columns - packages = packages or list(self._session.get_packages().values()) - packages = add_package_to_existing_packages( - packages, "snowflake-snowpark-python" + map_udtf, output_columns = self._parse_and_register_udtf_for_map( + func=func, + output_types=output_types, + output_column_names=output_column_names, + imports=imports, + packages=packages, + immutable=immutable, + vectorized=vectorized, + max_batch_size=max_batch_size, + df_columns=df_columns, + is_flat_map=False, ) - input_types = [field.datatype for field in self.schema.fields] - udtf_output_cols = [f"c{i}" for i in range(len(output_types))] - output_schema = StructType( - [ - StructField(col, type_) - for col, type_ in zip(udtf_output_cols, output_types) - ] - ) + return self.join_table_function( + map_udtf(*df_columns).over(partition_by=partition_by) + ).select(*output_columns) - if vectorized: - # If the map is vectorized, we need to add pandas to packages if not - # already added. Also update the input_types and output_schema to - # be PandasDataFrameType. - packages = add_package_to_existing_packages(packages, pandas) - input_types = [PandasDataFrameType(input_types)] - output_schema = PandasDataFrameType(output_types, udtf_output_cols) + def flat_map( + self, + func: Callable, + output_types: List[StructType], + *, + output_column_names: Optional[List[str]] = None, + imports: Optional[List[Union[str, Tuple[str, str]]]] = None, + packages: Optional[List[Union[str, ModuleType]]] = None, + immutable: bool = False, + partition_by: Optional[Union[ColumnOrName, List[ColumnOrName]]] = None, + vectorized: bool = False, + max_batch_size: Optional[int] = None, + ): + """Returns a new DataFrame with the result of applying ``func`` to each of the + rows of the specified DataFrame. This function is similar to :meth:`Dataframe.map`, + but it expects the `func` function to return an iterable of values for each row and + is used for one to many transformations. - output_columns = [ - col(f"${i + len(df_columns) + 1}").alias( - col_name - ) # this is done to avoid collision with original table columns - for i, col_name in enumerate(output_column_names) - ] + This function registers a temporary `UDTF + `_ and + returns a new DataFrame with the result of applying the `func` function to each row of the + given DataFrame. - if vectorized: + Args: + dataframe: The DataFrame instance. + func: A function to be applied to every row of the DataFrame. + output_types: A list of types for values generated by the ``func`` + output_column_names: A list of names to be assigned to the resulting columns. + imports: A list of imports that are required to run the function. This argument is passed + on when registering the UDTF. + packages: A list of packages that are required to run the function. This argument is passed + on when registering the UDTF. + immutable: A flag to specify if the result of the func is deterministic for the same input. + partition_by: Specify the partitioning column(s) for the UDTF. Default partition by is 1 when + vectorized is ``True`` and all data is processed in single partition. 
+            vectorized: When ``True``, the UDTF is registered using a vectorized
+                ``end_partition``, see `vectorized UDTFs
+                `_.
+                Otherwise, data is processed row by row, using a UDTF registered with the
+                ``process`` method.
+            max_batch_size: The maximum number of rows per input pandas DataFrame when using
+                the vectorized option.
 
-            def wrap_result(result):
-                if isinstance(result, pandas.DataFrame) or isinstance(result, tuple):
-                    return result
-                return (result,)
+        Example 1::
 
-            class _MapFunc:
-                def process(self, pdf):
-                    return wrap_result(func(pdf))
+            >>> from snowflake.snowpark.types import IntegerType, StringType
+            >>> import pandas as pd
+            >>> df = session.create_dataframe([[10, "a", 22], [20, "b", 22]], schema=["col1", "col2", "col3"])
+            >>> new_df = df.flat_map(lambda row: (row[0] * row[0], row[0] + row[0]), output_types=[IntegerType()])
+            >>> new_df.order_by("c_1").show()
+            ---------
+            |"C_1"  |
+            ---------
+            |20     |
+            |40     |
+            |100    |
+            |400    |
+            ---------
+
-        else:
+        Example 2::
 
-            def wrap_result(result):
-                if isinstance(result, Row):
-                    return tuple(result)
-                elif isinstance(result, tuple):
-                    return result
-                else:
-                    return (result,)
+            >>> new_df = df.flat_map(
+            ...     lambda row: [(row[1], row[0]), (row[1], row[0] * 2), (row[1], row[0] * 3)],
+            ...     output_types=[StringType(), IntegerType()]
+            ... )
+            >>> new_df.order_by("c_1", "c_2").show()
+            -----------------
+            |"C_1"  |"C_2"  |
+            -----------------
+            |a      |10     |
+            |a      |20     |
+            |a      |30     |
+            |b      |20     |
+            |b      |40     |
+            |b      |60     |
+            -----------------
+
-            class _MapFunc:
-                def process(self, *argv):
-                    input_args_to_row = Row(*df_columns)
-                    yield wrap_result(func(input_args_to_row(*argv)))
+        Example 3::
 
-        map_udtf = self._session.udtf.register(
-            _MapFunc,
-            output_schema=output_schema,
-            input_types=input_types,
-            input_names=df_columns,
+            >>> new_df = df.flat_map(
+            ...     lambda row: [(row[1], row[0]), (row[1], row[0] * 2), (row[1], row[0] * 3)],
+            ...     output_types=[StringType(), IntegerType()],
+            ...     output_column_names=["col1", "col2"]
+            ... )
+            >>> new_df.order_by("col1", "col2").show()
+            -------------------
+            |"COL1"  |"COL2"  |
+            -------------------
+            |a       |10      |
+            |a       |20      |
+            |a       |30      |
+            |b       |20      |
+            |b       |40      |
+            |b       |60      |
+            -------------------
+
+        Example 4::
+
+            >>> data = [
+            ...     [['A', 'B', 'C'], 'Guard', 7],
+            ...     [['D', 'E'], 'Forward', 14],
+            ...     [['F', 'G', 'H', 'I', 'J'], 'Center', 19],
+            ... ]
+            >>> df = session.create_dataframe(data, schema=["team", "position", "points"])
+            >>> df.flat_map(
+            ...     lambda pdf: pdf.explode('TEAM'),
+            ...     output_types=[StringType(), StringType(), IntegerType()],
+            ...     output_column_names=["team", "position", "points"],
+            ...     vectorized=True,
+            ...     packages=["pandas"]
+            ... ).order_by("team").show()
+            ----------------------------------
+            |"TEAM"  |"POSITION"  |"POINTS"  |
+            ----------------------------------
+            |A       |Guard       |7         |
+            |B       |Guard       |7         |
+            |C       |Guard       |7         |
+            |D       |Forward     |14        |
+            |E       |Forward     |14        |
+            |F       |Center      |19        |
+            |G       |Center      |19        |
+            |H       |Center      |19        |
+            |I       |Center      |19        |
+            |J       |Center      |19        |
+            ----------------------------------
+
+        Example 5::
+
+            >>> df.flat_map(
+            ...     lambda pdf: ((len(pdf.iloc[0]["TEAM"]),), (pdf.iloc[0]["POSITION"],)),
+            ...     output_types=[IntegerType(), StringType()],
+            ...     output_column_names=["size", "position"],
+            ...     vectorized=True,
+            ...     partition_by=["position"],
+            ...     packages=["pandas"]
+            ... ).order_by("size").show()
+            -----------------------
+            |"SIZE"  |"POSITION"  |
+            -----------------------
+            |2       |Forward     |
+            |3       |Guard       |
+            |5       |Center      |
+            -----------------------
+
+        Note:
+            1. The result of the ``func`` function must be an iterable. Each element of the
+            iterable must either be a scalar value, or a tuple containing the same number of
+            elements and types as specified in the ``output_types`` argument.
+
+            2. When using the ``vectorized`` option, the ``func`` function must accept a pandas
+            DataFrame as input and return either a pandas DataFrame, or a tuple of pandas
+            Series/arrays.
+
+            3. Specifying ``partition_by`` is highly recommended when the ``vectorized`` option
+            is set, for optimal load balancing.
+        """
+        df_columns = self.columns
+        map_udtf, output_columns = self._parse_and_register_udtf_for_map(
+            func=func,
+            output_types=output_types,
+            output_column_names=output_column_names,
             imports=imports,
             packages=packages,
             immutable=immutable,
+            vectorized=vectorized,
             max_batch_size=max_batch_size,
+            df_columns=df_columns,
+            is_flat_map=True,
         )
+        partition_by = lit(1) if (partition_by is None and vectorized) else partition_by
+
         return self.join_table_function(
             map_udtf(*df_columns).over(partition_by=partition_by)
         ).select(*output_columns)
@@ -6079,6 +6328,7 @@ def process(self, *argv):
     printSchema = print_schema
     foreach = map
     toJSON = to_json
+    flatMap = flat_map
 
     # These methods are not needed for code migration. So no aliases for them.
     # groupByGrouping_sets = group_by_grouping_sets
diff --git a/tests/integ/test_dataframe.py b/tests/integ/test_dataframe.py
index f4b7636911c..014e26df98c 100644
--- a/tests/integ/test_dataframe.py
+++ b/tests/integ/test_dataframe.py
@@ -4361,6 +4361,64 @@
     Utils.check_answer(df.map(func, output_types), expected)
 
 
+@pytest.mark.skipif(
+    "config.getoption('local_testing_mode', default=False)",
+    reason="Table function is not supported in Local Testing",
+)
+@pytest.mark.udf
+@pytest.mark.parametrize("overlapping_columns", [True, False])
+@pytest.mark.parametrize(
+    "func,output_types,output_column_names,expected",
+    [
+        (
+            lambda row: (row[1] + 1, row[1] + 2),
+            [IntegerType()],
+            ["A"],
+            [(i + 1,) for i in range(5)] + [(i + 2,) for i in range(5)],
+        ),
+        (
+            lambda row: [(row.B * 2, row.C)],
+            [IntegerType(), StringType()],
+            ["B", "C"],
+            [
+                (
+                    i * 2,
+                    f"w{i}",
+                )
+                for i in range(5)
+            ],
+        ),
+        (
+            lambda row: [
+                Row(row.B * row.B, f"-{row.C}"),
+                Row(row.B + row.B, f"+{row.C}"),
+            ],
+            [IntegerType(), StringType()],
+            ["B", "C"],
+            [(i * i, f"-w{i}") for i in range(5)]
+            + [(i + i, f"+w{i}") for i in range(5)],
+        ),
+    ],
+)
+def test_flat_map_basic(
+    session, func, output_types, output_column_names, expected, overlapping_columns
+):
+    df = session.create_dataframe(
+        [(True, i, f"w{i}") for i in range(5)], schema=["a", "b", "c"]
+    )
+    if overlapping_columns:
+        row = Row(*output_column_names)
+        expected = [row(*e) for e in expected]
+        Utils.check_answer(
+            df.flat_map(func, output_types, output_column_names=output_column_names),
+            expected,
+        )
+    else:
+        row = Row(*[f"C_{i+1}" for i in range(len(output_types))])
+        expected = [row(*e) for e in expected]
+        Utils.check_answer(df.flat_map(func, output_types), expected)
+
+
 @pytest.mark.skipif(
     "config.getoption('local_testing_mode', default=False)",
     reason="Table function is not supported in Local Testing",
@@ -4402,6 +4460,74 @@
 )
 
 
+@pytest.mark.skipif(
+    "config.getoption('local_testing_mode', default=False)",
+    reason="Table function is not supported in Local Testing",
+)
+@pytest.mark.skipif(not is_pandas_available, reason="pandas is required for this test")
+@pytest.mark.udf
+@pytest.mark.parametrize(
+    "func,data,output_types,partition_by,expected_lambda",
+    [
+        (
+            lambda df: df,
+            [["a"], ["b"], ["c"]],
+            [StringType()],
+            None,
+            lambda row: [row],
+        ),
+        (
+            lambda df: df.explode("_1"),
+            [
+                [["A", "B", "C"], "Guard", 7],
+                [["D"], "Forward", 15],
+                [["E", "F"], "Center", 20],
+            ],
+            [StringType(), StringType(), IntegerType()],
+            None,
+            lambda row: [(row[0][i], row[1], row[2]) for i in range(len(row[0]))],
+        ),
+        (
+            lambda df: df.explode("_1"),
+            [
+                [["A", "B", "C"], "Guard", 7],
+                [["D"], "Forward", 15],
+                [["E", "F"], "Center", 20],
+            ],
+            [StringType(), StringType(), IntegerType()],
+            "_2",
+            lambda row: [(row[0][i], row[1], row[2]) for i in range(len(row[0]))],
+        ),
+        (
+            lambda df: ((len(df.iloc[0]["_1"]),), (df.iloc[0]["_2"],)),
+            [
+                [["A", "B", "C"], "Guard", 7],
+                [["D"], "Forward", 15],
+                [["E", "F"], "Center", 20],
+            ],
+            [
+                IntegerType(),
+                StringType(),
+            ],
+            "_2",
+            lambda row: [(len(row[0]), row[1])],
+        ),
+    ],
+)
+def test_flat_map_vectorized(
+    session, func, data, output_types, partition_by, expected_lambda
+):
+    df = session.create_dataframe(data=data)
+    expected = []
+    for row in data:
+        expected.extend(expected_lambda(row))
+    Utils.check_answer(
+        df.flat_map(
+            func,
+            output_types,
+            vectorized=True,
+            partition_by=partition_by,
+            packages=["pandas"],
+        ),
+        expected,
+    )
+
+
 @pytest.mark.skipif(
     "config.getoption('local_testing_mode', default=False)",
     reason="Table function is not supported in Local Testing",
@@ -4437,20 +4563,65 @@ def test_map_chained(session):
     Utils.check_answer(new_df, expected)
 
 
-def test_map_negative(session):
+@pytest.mark.skipif(
+    "config.getoption('local_testing_mode', default=False)",
+    reason="Table function is not supported in Local Testing",
+)
+@pytest.mark.skipif(not is_pandas_available, reason="pandas is required for this test")
+@pytest.mark.udf
+def test_flat_map_chained(session):
+    # explode, partition by POSITION, and apply len
+    data = [
+        [["A", "B", "C"], "Guard", 7],
+        [["D"], "Forward", 15],
+        [["E", "F"], "Center", 20],
+    ]
+    schema = ["Team", "Position", "Points"]
+    df = session.create_dataframe(data, schema=schema)
+    new_df = df.flat_map(
+        lambda pdf: pdf.explode("TEAM"),
+        output_types=[StringType(), StringType(), IntegerType()],
+        output_column_names=schema,
+        vectorized=True,
+        packages=["pandas"],
+    ).flat_map(
+        lambda pdf: (
+            (len(pdf),),
+            (pdf.iloc[0]["POSITION"],),
+        ),
+        output_types=[IntegerType(), StringType()],
+        output_column_names=["SIZE", "POSITION"],
+        vectorized=True,
+        partition_by="POSITION",
+    )
+    Utils.check_answer(new_df, [(3, "Guard"), (1, "Forward"), (2, "Center")])
+
+
+@pytest.mark.parametrize("test_flat_map", [True, False])
+def test_map_negative(session, test_flat_map):
     df1 = session.create_dataframe(
         [[True, i, f"w{i}"] for i in range(5)], schema=["A", "B", "C"]
     )
     with pytest.raises(ValueError, match="output_types cannot be empty."):
-        df1.map(lambda row: [row.B, row.C], output_types=[])
+        if test_flat_map:
+            df1.flat_map(lambda row: [row.B, row.C], output_types=[])
+        else:
+            df1.map(lambda row: [row.B, row.C], output_types=[])
     with pytest.raises(
         ValueError,
         match="'output_column_names' and 'output_types' must be of the same size.",
     ):
-        df1.map(
-            lambda row: [row.B, row.C],
-            output_types=[IntegerType(), StringType()],
-            output_column_names=["a", "b", "c"],
-        )
+        if test_flat_map:
+            df1.flat_map(
+                lambda row: [row.B, row.C],
+                output_types=[IntegerType(), StringType()],
+                output_column_names=["a", "b", "c"],
+            )
+        else:
+            df1.map(
+                lambda row: [row.B, row.C],
+                output_types=[IntegerType(), StringType()],
+                output_column_names=["a", "b", "c"],
+            )