Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[RFC][WIP] Add backend exec protocol #711

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,19 @@ def pprint(self):
def dask(self):
    """Return whatever ``self.__dask_graph__()`` produces."""
    graph = self.__dask_graph__()
    return graph

def exec(self, simplify=True):
    """Run this collection eagerly through the backend DataFrame library

    WARNING: Experimental feature. Use at your own risk.

    No task graph is built and dask is not used for execution. The
    ``Expr.__exec__`` protocol of the underlying expression is invoked
    directly, so the backend library carries out the logic itself.

    Parameters
    ----------
    simplify : bool, default True
        When true, ``self.simplify()`` is called first and the result is
        executed; otherwise ``self`` is executed as it is.

    Returns
    -------
    The value returned by ``__exec__`` of the selected expression.
    """
    if simplify:
        target = self.simplify()
    else:
        target = self
    return target.expr.__exec__()

def __dask_graph__(self):
out = self.expr
out = out.lower_completely()
Expand Down
5 changes: 5 additions & 0 deletions dask_expr/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def __new__(cls, *args, **kwargs):
Expr._instances[_name] = inst
return inst

def __exec__(self):
    """Fallback for the backend-exec protocol: always raises.

    Types that support direct backend execution define their own
    ``__exec__``; reaching this one means the type does not.

    Raises
    ------
    NotImplementedError
        Always; the message names the concrete type of ``self``.
    """
    message = f"Backend exec is not yet supported for {type(self)}."
    raise NotImplementedError(message)

def _tune_down(self):
    """Return ``None`` unconditionally."""
    # NOTE(review): presumably a hook that subclasses override - confirm.
    return

Expand Down
4 changes: 4 additions & 0 deletions dask_expr/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,10 @@ def _meta(self):
args = [op._meta if isinstance(op, Expr) else op for op in self._args]
return self.operation(*args, **self._kwargs)

def __exec__(self):
    """Apply ``self.operation`` to backend-executed operands.

    Each entry of ``self._args`` that is an ``Expr`` is replaced by the
    result of its own ``__exec__``; every other entry is passed through
    untouched. ``self._kwargs`` is forwarded as keyword arguments.
    """
    executed = []
    for operand in self._args:
        if isinstance(operand, Expr):
            operand = operand.__exec__()
        executed.append(operand)
    return self.operation(*executed, **self._kwargs)

@functools.cached_property
def _kwargs(self) -> dict:
if self._keyword_only:
Expand Down
9 changes: 9 additions & 0 deletions dask_expr/_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,15 @@ class GroupbyAggregationBase(GroupByApplyConcatApply, GroupByBase):
"_slice": None,
}

def __exec__(self):
    """Run the groupby aggregation directly on the backend frame.

    ``self.frame`` is executed first; the result is grouped by
    ``self.by`` and aggregated with ``self.arg``. ``sort`` is always
    passed to ``groupby``; the ``observed`` and ``dropna`` entries are
    whatever ``_as_dict`` yields for them.
    """
    backend_frame = self.frame.__exec__()

    groupby_kwargs = {"sort": self.sort}
    groupby_kwargs.update(_as_dict("observed", self.observed))
    groupby_kwargs.update(_as_dict("dropna", self.dropna))

    grouped = backend_frame.groupby(self.by, **groupby_kwargs)
    return grouped.aggregate(self.arg)

@functools.cached_property
def spec(self):
# Converts the `arg` operand into specific
Expand Down
5 changes: 5 additions & 0 deletions dask_expr/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ def _meta(self):
kwargs["how"] = "left"
return make_meta(left.merge(right, **kwargs))

def __exec__(self):
    """Merge the backend results of both sides.

    ``self.left`` and ``self.right`` are executed (in that order) and
    the left result's ``merge`` is called with the right result and
    ``self.kwargs``.
    """
    lhs = self.left.__exec__()
    rhs = self.right.__exec__()
    merged = lhs.merge(rhs, **self.kwargs)
    return merged

@functools.cached_property
def _npartitions(self):
if self.operand("_npartitions") is not None:
Expand Down
12 changes: 12 additions & 0 deletions dask_expr/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,14 @@ def _meta(self):
other = self._other
return self.frame._meta.set_index(other, drop=self.drop)

def __exec__(self):
    """Set the index directly on the backend frame.

    ``self.frame`` is executed first. When ``self._other`` is an
    ``Expr`` it is executed as well; otherwise it is used unchanged.
    The result is ``set_index`` on the backend frame with
    ``drop=self.drop``.
    """
    backend_frame = self.frame.__exec__()
    index_source = self._other
    if isinstance(index_source, Expr):
        index_source = index_source.__exec__()
    return backend_frame.set_index(index_source, drop=self.drop)

@property
def _divisions_column(self):
    """Expose ``self.other`` under the ``_divisions_column`` name."""
    column = self.other
    return column
Expand Down Expand Up @@ -996,6 +1004,10 @@ def sort_function_kwargs(self):
def _meta(self):
    """Reuse the ``_meta`` of ``self.frame`` unchanged."""
    source = self.frame
    return source._meta

def __exec__(self):
    """Sort the backend result of ``self.frame``.

    The executed frame is handed to ``self.sort_function`` together
    with ``self.sort_function_kwargs`` as keyword arguments.
    """
    executed = self.frame.__exec__()
    sorter = self.sort_function
    return sorter(executed, **self.sort_function_kwargs)

@functools.cached_property
def _meta_by_dtype(self):
dtype = self._meta.dtypes[self.by]
Expand Down
6 changes: 6 additions & 0 deletions dask_expr/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ def _meta(self):
return meta[self.columns[0]] if self._series else meta[self.columns]
return meta

def __exec__(self):
    """Return the wrapped backend data, projected onto ``self.columns``.

    The data is read from the ``_data`` attribute of the ``"frame"``
    operand. If ``self.columns`` is falsy the data is returned as is.
    Otherwise the first column alone is selected when ``self._series``
    is truthy, and the full column list is selected when it is not.
    """
    data = self.operand("frame")._data
    if not self.columns:
        return data
    if self._series:
        return data[self.columns[0]]
    return data[self.columns]

@functools.cached_property
def columns(self):
columns_operand = self.operand("columns")
Expand Down
8 changes: 8 additions & 0 deletions dask_expr/tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2665,3 +2665,11 @@ def test_empty_from_pandas_projection():
df["foo"] = from_pandas(foo, npartitions=1)
pdf["foo"] = foo
assert_eq(df["foo"], pdf["foo"])


def test_exec():
    """Backend ``exec`` of a small expression matches plain pandas."""
    pdf = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6], "b": 1, "c": 2})
    df = from_pandas(pdf.copy())

    # Build the same add -> sort -> column selection on both sides.
    expected = (pdf + 1).sort_values("a")["a"]
    lazy = (df + 1).sort_values("a")["a"]

    assert_eq(lazy.exec(), expected)
Loading