Backport PR pandas-dev#57843: DOC: Remove Dask and Modin sections in scale.rst in favor of linking to ecosystem docs. (pandas-dev#57861)

Co-authored-by: Yuki Kitayama <[email protected]>
mroeschke and yukikitayama authored Mar 15, 2024
1 parent b6488af commit 962e233
Showing 3 changed files with 9 additions and 161 deletions.
164 changes: 7 additions & 157 deletions doc/source/user_guide/scale.rst
@@ -156,7 +156,7 @@ fits in memory, you can work with datasets that are much larger than memory.

Chunking works well when the operation you're performing requires zero or minimal
coordination between chunks. For more complicated workflows, you're better off
:ref:`using another library <scale.other_libraries>`.
:ref:`using other libraries <scale.other_libraries>`.

Suppose we have an even larger "logical dataset" on disk that's a directory of parquet
files. Each file in the directory represents a different year of the entire dataset.
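For illustration, a chunked aggregation over such a directory might look like
the following sketch. It assumes the ``data/timeseries/ts*.parquet`` layout and
the ``name`` column used by the examples in this document:

.. code-block:: python

   import pathlib

   import pandas as pd

   # Read one year's file at a time, keeping only a small per-chunk
   # aggregate so the full dataset never has to fit in memory.
   counts = pd.Series(dtype="int64")
   for path in sorted(pathlib.Path("data/timeseries").glob("ts*.parquet")):
       df = pd.read_parquet(path)
       counts = counts.add(df["name"].value_counts(), fill_value=0)

   counts = counts.astype("int64")

Because each iteration only needs one file's worth of data plus the running
totals, peak memory stays near the size of the largest single file.
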
@@ -219,160 +219,10 @@ different library that implements these out-of-core algorithms for you.

.. _scale.other_libraries:

Use Dask
--------
Use Other Libraries
-------------------

pandas is just one library offering a DataFrame API. Because of its popularity,
pandas' API has become something of a standard that other libraries implement.
The pandas documentation maintains a list of libraries implementing a DataFrame API
in `the ecosystem page <https://pandas.pydata.org/community/ecosystem.html>`_.

For example, `Dask`_, a parallel computing library, has `dask.dataframe`_, a
pandas-like API for working with larger-than-memory datasets in parallel. Dask
can use multiple threads or processes on a single machine, or a cluster of
machines, to process data in parallel.


We'll import ``dask.dataframe`` and notice that the API feels similar to pandas.
We can use Dask's ``read_parquet`` function, but provide a globstring of files to read in.

.. ipython:: python
   :okwarning:

   import dask.dataframe as dd

   ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
   ddf

Inspecting the ``ddf`` object, we see a few things:

* There are familiar attributes like ``.columns`` and ``.dtypes``
* There are familiar methods like ``.groupby``, ``.sum``, etc.
* There are new attributes like ``.npartitions`` and ``.divisions``

The partitions and divisions are how Dask parallelizes computation. A **Dask**
DataFrame is made up of many pandas :class:`pandas.DataFrame` objects. A single
method call on a Dask DataFrame ends up making many pandas method calls, and
Dask knows how to coordinate everything to get the result.
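
Conceptually, you can picture that fan-out as plain Python over a list of
pandas DataFrames. The following is a hypothetical sketch of the idea, not
how Dask is actually implemented:

.. code-block:: python

   import numpy as np
   import pandas as pd

   # Stand-in for a partitioned dataset: a list of pandas DataFrames.
   partitions = [pd.DataFrame({"x": np.random.randn(1_000)}) for _ in range(4)]

   # One "logical" sum becomes one pandas call per partition plus a
   # combine step; this is what Dask coordinates across partitions.
   total = sum(part["x"].sum() for part in partitions)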

.. ipython:: python

   ddf.columns
   ddf.dtypes
   ddf.npartitions

One major difference: the ``dask.dataframe`` API is *lazy*. If you look at the
repr above, you'll notice that the values aren't actually printed out; just the
column names and dtypes. That's because Dask hasn't actually read the data yet.
Rather than executing immediately, operations build up a **task graph**.

.. ipython:: python
   :okwarning:

   ddf
   ddf["name"]
   ddf["name"].value_counts()

Each of these calls is instant because the result isn't being computed yet.
We're just building up a list of computations to do when someone needs the
result. Dask knows that the return type of :meth:`pandas.Series.value_counts`
is a pandas :class:`pandas.Series` with a certain dtype and a certain name. So
the Dask version returns a Dask Series with the same dtype and the same name.

To get the actual result you can call ``.compute()``.

.. ipython:: python
   :okwarning:

   %time ddf["name"].value_counts().compute()

At that point, you get back the same thing you'd get with pandas, in this case
a concrete pandas :class:`pandas.Series` with the count of each ``name``.

Calling ``.compute`` causes the full task graph to be executed. This includes
reading the data, selecting the columns, and doing the ``value_counts``. The
execution is done *in parallel* where possible, and Dask tries to keep the
overall memory footprint small. You can work with datasets that are much larger
than memory, as long as each partition (a regular pandas :class:`pandas.DataFrame`) fits in memory.

By default, ``dask.dataframe`` operations use a threadpool to do operations in
parallel. We can also connect to a cluster to distribute the work on many
machines. In this case we'll connect to a local "cluster" made up of several
processes on this single machine.

.. code-block:: python

   >>> from dask.distributed import Client, LocalCluster
   >>> cluster = LocalCluster()
   >>> client = Client(cluster)
   >>> client
   <Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>

Once this ``client`` is created, all of Dask's computation will take place on
the cluster (which is just processes in this case).

Dask implements the most used parts of the pandas API. For example, we can do
a familiar groupby aggregation.

.. ipython:: python
   :okwarning:

   %time ddf.groupby("name")[["x", "y"]].mean().compute().head()

The grouping and aggregation are done out-of-core and in parallel.

When Dask knows the ``divisions`` of a dataset, certain optimizations are
possible. When reading parquet datasets written by Dask, the divisions will be
known automatically. In this case, since we created the parquet files manually,
we need to supply the divisions manually.

.. ipython:: python
   :okwarning:

   N = 12
   starts = [f"20{i:>02d}-01-01" for i in range(N)]
   ends = [f"20{i:>02d}-12-13" for i in range(N)]

   divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
   ddf.divisions = divisions
   ddf

Now we can do things like fast random access with ``.loc``.

.. ipython:: python
   :okwarning:

   ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()

Dask knows to just look in the 3rd partition for selecting values in 2002. It
doesn't need to look at any other data.

Many workflows involve a large amount of data and process it in a way that
reduces the size to something that fits in memory. In this case, we'll resample
to daily frequency and take the mean. Once we've taken the mean, we know the
results will fit in memory, so we can safely call ``compute`` without running
out of memory. At that point it's just a regular pandas object.

.. ipython:: python
   :okwarning:

   @savefig dask_resample.png
   ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()

.. ipython:: python
   :suppress:

   import shutil

   shutil.rmtree("data/timeseries")

These Dask examples have all been done using multiple processes on a single
machine. Dask can be `deployed on a cluster
<https://docs.dask.org/en/latest/setup.html>`_ to scale up to even larger
datasets.

You can see more Dask examples at https://examples.dask.org.

.. _Dask: https://dask.org
.. _dask.dataframe: https://docs.dask.org/en/latest/dataframe.html
There are other libraries that provide pandas-like APIs, work nicely with
pandas DataFrames, and can give you the ability to scale your large dataset
processing and analytics via parallel runtimes, distributed memory, clustering,
etc. You can find more information in
`the ecosystem page <https://pandas.pydata.org/community/ecosystem.html#out-of-core>`_.
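
As a purely illustrative sketch of the drop-in style these libraries aim for
(using Modin as a hypothetical example here; consult each project's own
documentation for real installation and usage):

.. code-block:: python

   # Modin mirrors the pandas API, so existing code often only needs a
   # different import to run on a parallel backend.
   import modin.pandas as pd

   df = pd.read_parquet("data/timeseries")
   df.groupby("name")[["x", "y"]].mean()
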
3 changes: 1 addition & 2 deletions environment.yml
@@ -60,9 +60,8 @@ dependencies:
- zstandard>=0.19.0

# downstream packages
- dask-core<=2024.2.1
- dask-core
- seaborn-base
- dask-expr<=0.5.3

# local testing dependencies
- moto
3 changes: 1 addition & 2 deletions requirements-dev.txt
@@ -47,9 +47,8 @@ xarray>=2022.12.0
xlrd>=2.0.1
xlsxwriter>=3.0.5
zstandard>=0.19.0
dask<=2024.2.1
dask
seaborn
dask-expr<=0.5.3
moto
flask
asv>=0.6.1
