[SPARK-46858][PYTHON][PS][BUILD] Upgrade Pandas to 2.2.0 #44881

Closed · wants to merge 31 commits

Commits (31):

9ae857a  [SPARK-46858][PYTHON][PS][INFRA] Upgrade Pandas to 2.2.0 (itholic, Jan 25, 2024)
5caa678  Merge branch 'master' of https://github.com/apache/spark into pandas_… (itholic, Jan 29, 2024)
e9a6445  pin version (itholic, Jan 29, 2024)
edb3d9a  fix series default name issue (itholic, Jan 29, 2024)
5440381  upperbound for PyPy3 (itholic, Jan 30, 2024)
3e66505  Merge branch 'master' of https://github.com/apache/spark into pandas_… (itholic, Feb 13, 2024)
8643ebd  Fix melt (itholic, Feb 13, 2024)
a8237b4  Fix test util related changes (itholic, Feb 13, 2024)
836dcfe  Fix more test utils (itholic, Feb 13, 2024)
d3c5f57  Fix resample test (itholic, Feb 13, 2024)
66f69a2  Rule code mapping (itholic, Feb 14, 2024)
9d4e8a1  Fix booleanops tests (itholic, Feb 14, 2024)
37300e8  use proper rule code (itholic, Feb 14, 2024)
ea57fdb  Fix unsupported cases (itholic, Feb 14, 2024)
a3f3e91  Fix Categorical test (itholic, Feb 15, 2024)
8a24900  Fix SparkConnectFunctionsTests (itholic, Feb 15, 2024)
b727550  Fix linter (itholic, Feb 15, 2024)
e92082f  Fix NumOpsTests (itholic, Feb 16, 2024)
5f62fcc  Fix FrameReshapingTests (itholic, Feb 16, 2024)
f235780  ResampleSeriesTests (itholic, Feb 16, 2024)
ad67735  Fix ReverseTests (itholic, Feb 16, 2024)
4e6c77a  Merge branch 'master' of https://github.com/apache/spark into pandas_… (itholic, Feb 16, 2024)
26b7bd6  revert unrelated changes (itholic, Feb 16, 2024)
4c84b2a  Fix plotting (itholic, Feb 16, 2024)
fbbaf88  Fix BoxPlot (itholic, Feb 19, 2024)
0ca4aa6  Fix concat bug in Pandas (itholic, Feb 19, 2024)
7536263  Fix DataFrame hist plot (itholic, Feb 19, 2024)
b07e608  Merge branch 'master' of https://github.com/apache/spark into pandas_… (itholic, Feb 20, 2024)
acd7b7f  Add release note (itholic, Feb 20, 2024)
d560825  has -> have (itholic, Feb 20, 2024)
6de7931  Make resample work in old pandas as well (itholic, Feb 20, 2024)

4 changes: 2 additions & 2 deletions dev/infra/Dockerfile
@@ -91,10 +91,10 @@ RUN mkdir -p /usr/local/pypy/pypy3.8 && \
ln -sf /usr/local/pypy/pypy3.8/bin/pypy /usr/local/bin/pypy3.8 && \
ln -sf /usr/local/pypy/pypy3.8/bin/pypy /usr/local/bin/pypy3
RUN curl -sS https://bootstrap.pypa.io/get-pip.py | pypy3
RUN pypy3 -m pip install numpy 'six==1.16.0' 'pandas<=2.1.4' scipy coverage matplotlib lxml
RUN pypy3 -m pip install numpy 'six==1.16.0' 'pandas<=2.2.0' scipy coverage matplotlib lxml


ARG BASIC_PIP_PKGS="numpy pyarrow>=15.0.0 six==1.16.0 pandas<=2.1.4 scipy plotly>=4.8 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
ARG BASIC_PIP_PKGS="numpy pyarrow>=15.0.0 six==1.16.0 pandas<=2.2.0 scipy plotly>=4.8 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
# Python deps for Spark Connect
ARG CONNECT_PIP_PKGS="grpcio==1.59.3 grpcio-status==1.59.3 protobuf==4.25.1 googleapis-common-protos==1.56.4"

1 change: 1 addition & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -69,6 +69,7 @@ Upgrading from PySpark 3.5 to 4.0
* In Spark 4.0, ``Series.dt.week`` and ``Series.dt.weekofyear`` have been removed from Pandas API on Spark, use ``Series.dt.isocalendar().week`` instead.
* In Spark 4.0, when applying ``astype`` to a decimal type object, the existing missing value is changed to ``True`` instead of ``False`` from Pandas API on Spark.
* In Spark 4.0, ``pyspark.testing.assertPandasOnSparkEqual`` has been removed from Pandas API on Spark, use ``pyspark.pandas.testing.assert_frame_equal`` instead.
* In Spark 4.0, the aliases ``Y``, ``M``, ``H``, ``T``, ``S`` have been deprecated in Pandas API on Spark, use ``YE``, ``ME``, ``h``, ``min`` and ``s`` instead, respectively.
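As a quick illustration of the alias change (a minimal sketch with made-up data, assuming a running SparkSession):

    import pandas as pd
    import pyspark.pandas as ps

    psser = ps.Series(
        [1.0, 2.0, 3.0],
        index=pd.date_range("2024-01-01", periods=3, freq="min"),
    )

    # Before Spark 4.0 (deprecated aliases):
    #   psser.resample("3T").sum()   # minutes
    #   psser.resample("M").sum()    # month-end
    # Spark 4.0:
    psser.resample("3min").sum()     # minutes
    psser.resample("ME").sum()       # month-end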



6 changes: 4 additions & 2 deletions python/pyspark/pandas/frame.py
@@ -10609,8 +10609,10 @@ def melt(
name_like_string(name) if name is not None else "variable_{}".format(i)
for i, name in enumerate(self._internal.column_label_names)
]
elif isinstance(var_name, str):
var_name = [var_name]
elif is_list_like(var_name):
raise ValueError(f"{var_name=} must be a scalar.")
else:
var_name = [var_name] # type: ignore[list-item]

pairs = F.explode(
F.array(
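A sketch of the resulting melt behavior (hypothetical frame, assuming a SparkSession): a scalar var_name is wrapped into a single-element list, while a list-like var_name is now rejected explicitly:

    import pyspark.pandas as ps

    psdf = ps.DataFrame({"A": [1, 2], "B": [3, 4]})

    psdf.melt(id_vars="A", var_name="my_var")       # OK: wrapped as ["my_var"]
    psdf.melt(id_vars="A", var_name=["v1", "v2"])   # ValueError: ... must be a scalar.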
5 changes: 4 additions & 1 deletion python/pyspark/pandas/namespace.py
@@ -2554,7 +2554,10 @@ def resolve_func(psdf, this_column_labels, that_column_labels):
if isinstance(obj, Series):
num_series += 1
series_names.add(obj.name)
new_objs.append(obj.to_frame(DEFAULT_SERIES_NAME))
if not ignore_index and not should_return_series:
new_objs.append(obj.to_frame())
else:
new_objs.append(obj.to_frame(DEFAULT_SERIES_NAME))
else:
assert isinstance(obj, DataFrame)
new_objs.append(obj)
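A rough sketch of the kind of case this branch protects (illustrative data, assuming a SparkSession): when the concatenated result stays a DataFrame and the index is kept, the original Series names should survive as column labels instead of being replaced by the internal default column name:

    import pyspark.pandas as ps

    s1 = ps.Series([1, 2], name="x")
    s2 = ps.Series([3, 4], name="y")

    # Expected columns: "x" and "y", not the internal default name.
    ps.concat([s1, s2], axis=1)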
99 changes: 81 additions & 18 deletions python/pyspark/pandas/plot/matplotlib.py
@@ -15,11 +15,14 @@
# limitations under the License.
#

from typing import final

from pyspark.loose_version import LooseVersion

import matplotlib as mat
import numpy as np
from matplotlib.axes._base import _process_plot_format # type: ignore[attr-defined]
from matplotlib.figure import Figure
from pandas.core.dtypes.inference import is_list_like
from pandas.io.formats.printing import pprint_thing
from pandas.plotting._matplotlib import ( # type: ignore[attr-defined]
@@ -44,10 +47,29 @@
unsupported_function,
KdePlotBase,
)
from pyspark.pandas.series import Series, first_series

_all_kinds = PlotAccessor._all_kinds # type: ignore[attr-defined]


def _set_ticklabels(ax, labels, is_vertical, **kwargs) -> None:
"""Set the tick labels of a given axis.

Due to https://github.com/matplotlib/matplotlib/pull/17266, we need to handle the
case of repeated ticks (due to `FixedLocator`) and thus we duplicate the number of
labels.
"""
ticks = ax.get_xticks() if is_vertical else ax.get_yticks()
if len(ticks) != len(labels):
i, remainder = divmod(len(ticks), len(labels))
assert remainder == 0, remainder
labels *= i
if is_vertical:
ax.set_xticklabels(labels, **kwargs)
else:
ax.set_yticklabels(labels, **kwargs)
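A worked sketch of the duplication above, with illustrative values only:

    ticks = [0, 1, 2, 3, 4, 5]   # e.g. six fixed tick positions from the locator
    labels = ["a", "b", "c"]

    i, remainder = divmod(len(ticks), len(labels))   # i = 2, remainder = 0
    assert remainder == 0
    labels *= i   # -> ["a", "b", "c", "a", "b", "c"], one label per tick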


class PandasOnSparkBarPlot(PandasBarPlot, TopNPlotBase):
_kind = "bar"

@@ -231,10 +253,23 @@ def _plot(self, ax, bxpstats, column_num=None, return_type="axes", **kwds):
else:
return ax, bp

@final
def _ensure_frame(self, data):
if isinstance(data, Series):
label = self.label
if label is None and data.name is None:
label = ""
if label is None:
data = data.to_frame()
else:
data = data.to_frame(name=label)
return data

def _compute_plot_data(self):
colname = self.data.name
spark_column_name = self.data._internal.spark_column_name_for(self.data._column_label)
data = self.data
data = first_series(data) if not isinstance(data, Series) else data
colname = data.name
spark_column_name = data._internal.spark_column_name_for(data._column_label)

# Updates all props with the rc defaults from matplotlib
self.kwds.update(PandasOnSparkBoxPlot.rc_defaults(**self.kwds))
@@ -277,7 +312,7 @@ def _compute_plot_data(self):

self.data = {labels[0]: stats}

def _make_plot(self):
def _make_plot(self, fig: Figure):
bxpstats = list(self.data.values())[0]
ax = self._get_ax(0)
kwds = self.kwds.copy()
@@ -303,7 +338,7 @@ def _make_plot(self):
labels = [pprint_thing(lbl) for lbl in labels]
if not self.use_index:
labels = [pprint_thing(key) for key in range(len(labels))]
self._set_ticklabels(ax, labels)
_set_ticklabels(ax, labels, self.orientation == "vertical")

@staticmethod
def rc_defaults(
@@ -363,10 +398,32 @@ def _args_adjust(self):
if is_list_like(self.bottom):
self.bottom = np.array(self.bottom)

@final
def _ensure_frame(self, data):
if isinstance(data, Series):
label = self.label
if label is None and data.name is None:
label = ""
if label is None:
data = data.to_frame()
else:
data = data.to_frame(name=label)
return data

def _calculate_bins(self, data, bins):
return bins
Comment on lines +413 to +414 (itholic, Contributor Author): Pandas recently pushed a couple of commits refactoring its internal plotting structure, such as pandas-dev/pandas#55850 and pandas-dev/pandas#55872, so we should also override a couple of internal methods to follow the latest Pandas behavior.
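A schematic of the override pattern the comment describes (hook and base-class names as they appear in this diff; these are pandas internals and may shift between releases):

    from pandas.plotting._matplotlib import HistPlot as PandasHistPlot

    class MyHistPlot(PandasHistPlot):
        # pandas >= 2.2 calls this hook to compute bins; Pandas API on Spark
        # has already computed them on the Spark side, so pass them through.
        def _calculate_bins(self, data, bins):
            return bins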


def _compute_plot_data(self):
self.data, self.bins = HistogramPlotBase.prepare_hist_data(self.data, self.bins)

def _make_plot(self):
def _make_plot_keywords(self, kwds, y):
"""merge BoxPlot/KdePlot properties to passed kwds"""
# y is required for KdePlot
kwds["bottom"] = self.bottom
kwds["bins"] = self.bins
return kwds

def _make_plot(self, fig: Figure):
# TODO: this logic is similar to KdePlot. Might have to deduplicate it.
# 'num_colors' requires to calculate `shape` which has to count all.
# Use 1 for now to save the computation.
@@ -423,9 +480,9 @@ class PandasOnSparkPiePlot(PandasPiePlot, TopNPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_top_n(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkAreaPlot(PandasAreaPlot, SampledPlotBase):
@@ -434,9 +491,9 @@ class PandasOnSparkAreaPlot(PandasAreaPlot, SampledPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_sampled(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkLinePlot(PandasLinePlot, SampledPlotBase):
@@ -445,9 +502,9 @@ class PandasOnSparkLinePlot(PandasLinePlot, SampledPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_sampled(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkBarhPlot(PandasBarhPlot, TopNPlotBase):
@@ -456,9 +513,9 @@ class PandasOnSparkBarhPlot(PandasBarhPlot, TopNPlotBase):
def __init__(self, data, **kwargs):
super().__init__(self.get_top_n(data), **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkScatterPlot(PandasScatterPlot, TopNPlotBase):
@@ -467,9 +524,9 @@ class PandasOnSparkScatterPlot(PandasScatterPlot, TopNPlotBase):
def __init__(self, data, x, y, **kwargs):
super().__init__(self.get_top_n(data), x, y, **kwargs)

def _make_plot(self):
def _make_plot(self, fig: Figure):
self.set_result_text(self._get_ax(0))
super()._make_plot()
super()._make_plot(fig)


class PandasOnSparkKdePlot(PandasKdePlot, KdePlotBase):
@@ -478,7 +535,12 @@ class PandasOnSparkKdePlot(PandasKdePlot, KdePlotBase):
def _compute_plot_data(self):
self.data = KdePlotBase.prepare_kde_data(self.data)

def _make_plot(self):
def _make_plot_keywords(self, kwds, y):
kwds["bw_method"] = self.bw_method
kwds["ind"] = type(self)._get_ind(y, ind=self.ind)
return kwds

def _make_plot(self, fig: Figure):
# 'num_colors' requires to calculate `shape` which has to count all.
# Use 1 for now to save the computation.
colors = self._get_colors(num_colors=1)
@@ -515,8 +577,9 @@ def _make_plot(self):
self, "_append_legend_handles_labels"
) else self._add_legend_handle(artists[0], label, index=i)

def _get_ind(self, y):
return KdePlotBase.get_ind(y, self.ind)
@staticmethod
def _get_ind(y, ind):
return KdePlotBase.get_ind(y, ind)

@classmethod
def _plot(
24 changes: 16 additions & 8 deletions python/pyspark/pandas/resample.py
@@ -91,20 +91,21 @@ def __init__(
self._resamplekey = resamplekey

self._offset = to_offset(rule)
if self._offset.rule_code not in ["A-DEC", "M", "D", "H", "T", "S"]:

if self._offset.rule_code not in ["A-DEC", "M", "ME", "D", "H", "h", "T", "min", "S", "s"]:
raise ValueError("rule code {} is not supported".format(self._offset.rule_code))
if not getattr(self._offset, "n") > 0:
raise ValueError("rule offset must be positive")

if closed is None:
self._closed = "right" if self._offset.rule_code in ["A-DEC", "M"] else "left"
self._closed = "right" if self._offset.rule_code in ["A-DEC", "M", "ME"] else "left"
elif closed in ["left", "right"]:
self._closed = closed
else:
raise ValueError("invalid closed: '{}'".format(closed))

if label is None:
self._label = "right" if self._offset.rule_code in ["A-DEC", "M"] else "left"
self._label = "right" if self._offset.rule_code in ["A-DEC", "M", "ME"] else "left"
elif label in ["left", "right"]:
self._label = label
else:
@@ -184,7 +185,7 @@ def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
)
)

elif rule_code == "M":
elif rule_code in ["ME", "M"]:
assert (
origin.is_month_end
and origin.hour == 0
@@ -264,8 +265,15 @@ def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:

ret = F.when(edge_cond, edge_label).otherwise(non_edge_label)

elif rule_code in ["H", "T", "S"]:
unit_mapping = {"H": "HOUR", "T": "MINUTE", "S": "SECOND"}
elif rule_code in ["h", "min", "s", "H", "T", "S"]:
unit_mapping = {
"h": "HOUR",
"min": "MINUTE",
"s": "SECOND",
"H": "HOUR",
"T": "MINUTE",
"S": "SECOND",
}
unit_str = unit_mapping[rule_code]

truncated_ts_scol = F.date_trunc(unit_str, ts_scol)
@@ -274,10 +282,10 @@ def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
diff = timestampdiff(unit_str, origin_scol, truncated_ts_scol)
mod = F.lit(0) if n == 1 else (diff % F.lit(n))

if rule_code == "H":
if rule_code in ["h", "H"]:
assert origin.minute == 0 and origin.second == 0
edge_cond = (mod == 0) & (F.minute(ts_scol) == 0) & (F.second(ts_scol) == 0)
elif rule_code == "T":
elif rule_code in ["min", "T"]:
assert origin.second == 0
edge_cond = (mod == 0) & (F.second(ts_scol) == 0)
else:
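A minimal sketch of the effect (illustrative data, assuming a SparkSession): the legacy and the pandas >= 2.2 offset aliases resolve to the same Spark-side truncation, so either spelling works with whichever pandas version is installed:

    import pandas as pd
    import pyspark.pandas as ps

    psser = ps.Series(
        range(6),
        index=pd.date_range("2024-01-01", periods=6, freq="h"),
    )

    psser.resample("2h").sum()   # pandas >= 2.2 alias
    psser.resample("2H").sum()   # legacy alias, still accepted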
8 changes: 4 additions & 4 deletions python/pyspark/pandas/series.py
@@ -7092,15 +7092,15 @@ def resample(
----------
rule : str
The offset string or object representing target conversion.
Currently, supported units are {'Y', 'A', 'M', 'D', 'H',
'T', 'MIN', 'S'}.
Currently, supported units are {'YE', 'A', 'ME', 'D', 'h',
'min', 'MIN', 's'}.
closed : {{'right', 'left'}}, default None
Which side of bin interval is closed. The default is 'left'
for all frequency offsets except for 'A', 'Y' and 'M' which all
for all frequency offsets except for 'A', 'YE' and 'ME' which all
have a default of 'right'.
label : {{'right', 'left'}}, default None
Which bin edge label to label bucket with. The default is 'left'
for all frequency offsets except for 'A', 'Y' and 'M' which all
for all frequency offsets except for 'A', 'YE' and 'ME' which all
have a default of 'right'.
on : Series, optional
For a DataFrame, column to use instead of index for resampling.
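The documented closed/label defaults in a short sketch (illustrative data, assuming a SparkSession): year-end and month-end rules close and label bins on the right edge, everything else on the left:

    import pandas as pd
    import pyspark.pandas as ps

    psser = ps.Series(
        range(4),
        index=pd.date_range("2024-01-30", periods=4, freq="D"),
    )

    psser.resample("ME").sum()   # closed="right", label="right" by default
    psser.resample("D").sum()    # closed="left",  label="left" by default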
2 changes: 1 addition & 1 deletion python/pyspark/pandas/supported_api_gen.py
@@ -38,7 +38,7 @@
MAX_MISSING_PARAMS_SIZE = 5
COMMON_PARAMETER_SET = {"kwargs", "args", "cls"}
MODULE_GROUP_MATCH = [(pd, ps), (pdw, psw), (pdg, psg)]
PANDAS_LATEST_VERSION = "2.1.4"
PANDAS_LATEST_VERSION = "2.2.0"

RST_HEADER = """
=====================
13 changes: 3 additions & 10 deletions python/pyspark/pandas/tests/computation/test_melt.py
@@ -100,23 +100,16 @@ def test_melt(self):
.sort_values(["variable_0", "variable_1", "value"])
.rename(columns=name_like_string),
)
self.assert_eq(
psdf.melt(
self.assertRaises(
ValueError,
lambda: psdf.melt(
id_vars=[(TEN, "A")],
value_vars=[(TEN, "B")],
var_name=["myV1", "myV2"],
value_name="myValname",
)
.sort_values(["myV1", "myV2", "myValname"])
.reset_index(drop=True),
pdf.melt(
id_vars=[(TEN, "A")],
value_vars=[(TEN, "B")],
var_name=["myV1", "myV2"],
value_name="myValname",
)
.sort_values(["myV1", "myV2", "myValname"])
.rename(columns=name_like_string),
)

columns.names = ["v0", "v1"]