
Arrays #471

Closed
wants to merge 25 commits into from

Conversation


@mrocklin mrocklin commented Dec 6, 2023

This implements a skeleton version of dask.array. It currently includes the following:

  • from_array
  • blockwise
  • slicing
  • rechunking
  • reductions
  • random

But there are still many gaps. Some notable omissions:

  • We don't yet combine slicing and from_array, so getting a single row isn't as fast as it could be
  • blockwise/elemwise operations need to be filled out. This is easy; currently things like add are built but sub is not.
  • Same for reductions; all I've done is sum.
  • There are various cut corners. For example, in slicing I pulled out slicing by a Dask array, and in reductions I haven't handled reductions that require chunks or meta to be rewritten
  • blockwise fusion isn't implemented
  • reductions could probably use another layer in the class hierarchy and some lowering (we immediately generate some implementation-specific stuff)
  • We'll maybe need to think about combine-similar for rechunking, similar to what we did for dataframes
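Filling out the remaining operators is mostly mechanical once a generic elemwise helper exists: each dunder method just routes through it with the right operator. A toy sketch of that pattern (not the dask-expr API):

```python
import operator

# Toy sketch (not the dask-expr API): with one generic _elemwise
# helper, "sub" is the same one-liner as "add".
class Array:
    def __init__(self, value):
        self.value = value

    def _elemwise(self, op, other):
        other = other.value if isinstance(other, Array) else other
        return Array(op(self.value, other))

    def __add__(self, other):
        return self._elemwise(operator.add, other)

    def __sub__(self, other):
        return self._elemwise(operator.sub, other)

    def __mul__(self, other):
        return self._elemwise(operator.mul, other)

x = Array(10)
print((x + 2).value)  # 12
print((x - 2).value)  # 8
```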

In general though I think that the majority (60%?) of the very hard work is done.

Examples

import dask_expr.array as da
x = da.random.random((10000, 10000))
y = (2 * x + x.T)[::10]
y.pprint()
Slice: index=(slice(None, None, 10), slice(None, None, None))
  Elemwise: op=<built-in function add>
    Elemwise: op=<built-in function mul>2
      Random: rng=<dask_expr.array.random.RandomState object at 0x143fc2c90> distribution='random_sample' size=(10000, 10000) chunks='auto' args=() kwargs={}
    Transpose: axes=(1, 0)
      Random: rng=<dask_expr.array.random.RandomState object at 0x143fc2c90> distribution='random_sample' size=(10000, 10000) chunks='auto' args=() kwargs={}
y.pprint()
Elemwise: op=<built-in function add>
  Elemwise: op=<built-in function mul>2
    Slice: index=(slice(None, None, 10), slice(None, None, None))
      Random: rng=<dask_expr.array.random.RandomState object at 0x143fc2c90> distribution='random_sample' size=(10000, 10000) chunks='auto' args=() kwargs={}
  Transpose: axes=(1, 0)
    Slice: index=(slice(None, None, None), slice(None, None, 10))
      Random: rng=<dask_expr.array.random.RandomState object at 0x143fc2c90> distribution='random_sample' size=(10000, 10000) chunks='auto' args=() kwargs={}
y.sum().compute()
15004283.422871998
y = (x + 1).T.rechunk((10000, 10))
y.pprint()
Rechunk: _chunks=(10000, 10) balance=False
  Transpose: axes=(1, 0)
    Elemwise: op=<built-in function add>1
      Random: rng=<dask_expr.array.random.RandomState object at 0x143fc2c90> distribution='random_sample' size=(10000, 10000) chunks='auto' args=() kwargs={}
y.optimize().pprint()
Transpose: axes=(1, 0)
  Elemwise: op=<built-in function add>1
    Random: rng=<dask_expr.array.random.RandomState object at 0x143fc2c90> distribution='random_sample' size=(10000, 10000) chunks=(10, 10000) args=() kwargs={}
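The second pprint above shows the slice being pushed beneath the elemwise add, and, with its index axes swapped, beneath the transpose, so upstream nodes produce less data. A toy sketch of that rewrite on a minimal expression tree (not the dask-expr implementation):

```python
# Toy sketch of slice pushdown (not the dask-expr implementation):
# a Slice over an Elemwise becomes an Elemwise of sliced operands,
# and a Slice crossing a 2-d Transpose swaps its index.

class Leaf:
    def __init__(self, name):
        self.name = name
    def __repr__(self):
        return self.name

class Elemwise:
    def __init__(self, *args):
        self.args = list(args)
    def __repr__(self):
        return f"Elemwise({', '.join(map(repr, self.args))})"

class Transpose:
    def __init__(self, arg):
        self.arg = arg
    def __repr__(self):
        return f"Transpose({self.arg!r})"

class Slice:
    def __init__(self, arg, index):
        self.arg, self.index = arg, index
    def __repr__(self):
        return f"Slice({self.arg!r}, {self.index})"

def pushdown(expr):
    if isinstance(expr, Slice):
        child = expr.arg
        if isinstance(child, Elemwise):
            # slice each operand instead of the elemwise result
            return Elemwise(*[pushdown(Slice(a, expr.index)) for a in child.args])
        if isinstance(child, Transpose):
            i, j = expr.index  # 2-d transpose: swap the index axes
            return Transpose(pushdown(Slice(child.arg, (j, i))))
    return expr

x = Leaf("x")
expr = Slice(Elemwise(x, Transpose(x)), ("::10", ":"))
print(pushdown(expr))
# Elemwise(Slice(x, ('::10', ':')), Transpose(Slice(x, (':', '::10'))))
```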


mrocklin commented Dec 6, 2023

Depends on dask/dask#10676

Also, no rush on this PR. I think that this is likely to just be a working branch for a while. I would like to get #470 in though if we can before that goes stale (which is likely to happen quickly I suspect).


mrocklin commented Dec 8, 2023

OK, I went to add blockwise, but needed unify_chunks, which needed rechunk. I've added rechunk (including our first optimization!)


mrocklin commented Dec 8, 2023

OK, this does trivial things now:

In [1]: import numpy as np, dask_expr.array as da

In [2]: x = da.from_array(np.random.random((1000, 1000)))

In [3]: y = da.from_array(np.random.random((1000)))

In [4]: z = (x + y).rechunk((500, 200))

In [5]: z.pprint()
Rechunk: _chunks=(500, 200) balance=False
  Elemwise: func=<built-in function add> out_ind=(1, 0) token='add' dtype=dtype('float64') new_axes={} kwargs={}(1, 0)(0,)
    FromArray: array='<array>' chunks='auto'
    FromArray: array='<array>' chunks='auto'

In [6]: z.optimize().pprint()
Elemwise: func=<built-in function add> out_ind=(1, 0) token='add' dtype=dtype('float64') new_axes={} kwargs={}(1, 0)(0,)
  FromArray: array='<array>' chunks=(500, 200)
  FromArray: array='<array>' chunks=(200,)
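The optimize() output above folds the Rechunk into the FromArray leaves, since from_array can produce any chunking directly; the 1-d operand picks up only the trailing dimension of the chunk spec because broadcasting aligns on the right. A toy sketch of that rule (not the dask-expr code):

```python
# Toy sketch (not the dask-expr code) of the rechunk pushdown shown
# above: Rechunk(FromArray(arr), chunks) collapses to
# FromArray(arr, chunks). A lower-dimensional operand adopts only the
# trailing dims of the chunk spec (numpy-style right alignment).

def push_rechunk_into_from_array(ndim, chunks):
    """Chunk spec an ndim-dimensional FromArray leaf should adopt."""
    return chunks[len(chunks) - ndim:]

chunks = (500, 200)
print(push_rechunk_into_from_array(2, chunks))  # (500, 200)
print(push_rechunk_into_from_array(1, chunks))  # (200,)
```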

In [7]: x.T.T.pprint()
Transpose: axes=(1, 0)
  Transpose: axes=(1, 0)
    FromArray: array='<array>' chunks='auto'

In [8]: x.T.T.optimize().pprint()
FromArray: array='<array>' chunks='auto'
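The x.T.T example shows optimize() cancelling the pair of transposes. The general rewrite behind that (sketched here as a toy, not the dask-expr code) is that two stacked Transpose nodes compose into one, and a composed permutation that is the identity can be dropped entirely, leaving just the FromArray leaf:

```python
# Toy sketch (not the dask-expr code): Transpose(Transpose(x, inner),
# outer) == Transpose(x, combined), and an identity permutation means
# the node can be removed.

def compose_axes(outer, inner):
    return tuple(inner[a] for a in outer)

def is_identity(axes):
    return axes == tuple(range(len(axes)))

axes = (1, 0)
combined = compose_axes(axes, axes)
print(combined, is_identity(combined))  # (0, 1) True -> drop the node
```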

cc @dcherian @TomNicholas @jhamman

Still very broken, but hopefully enough of a prop for conversation at AGU


x = (xr.DataArray(b, dims=["x", "y"]) + 1).chunk(x=2)

assert x.data.optimize()._name == (da.from_array(a, chunks={0: 2}) + 1)._name
@mrocklin

FYI @dcherian the xarray thing we were playing with works now


Woot!

@mrocklin

OK, I've implemented rudimentary versions of blockwise, slicing, rechunking, reductions, random, and from_array. The opening comment has been updated. This should suffice for POCs. I hope to be able to nerd snipe someone to flesh out this skeleton.

@mrocklin

@fjetter @phofl we'll need to figure out how/if we should review this and eventually merge.

Priority is certainly dataframes, and I don't want to upset the momentum there. I'd also like to avoid this sitting stale for a long time. Or maybe that's best: better for things to go stale in a PR than to go stale in main.

@dcherian

Mind adding an automatic .simplify() when calling .compute()? That will help some experiments with Xarray

@mrocklin

I've added support for general numpy ufuncs (not gufuncs) and filled out the reductions and operators a little.
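The standard hook for intercepting arbitrary numpy ufuncs on a non-ndarray type is numpy's `__array_ufunc__` protocol; a minimal sketch of that mechanism (an assumption about the approach, not the dask-expr code):

```python
import numpy as np

# Minimal sketch of numpy's __array_ufunc__ protocol, which lets a
# custom array type receive any ufunc call (np.sqrt, np.add, ...).
# This is not the dask-expr code, just the dispatch mechanism.
class Tracked:
    def __init__(self, data):
        self.data = np.asarray(data)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        if method != "__call__":
            return NotImplemented  # no reduce/accumulate in this sketch
        args = [x.data if isinstance(x, Tracked) else x for x in inputs]
        return Tracked(ufunc(*args, **kwargs))

x = Tracked([1.0, 4.0, 9.0])
y = np.sqrt(x)       # numpy dispatches back to Tracked
print(y.data)        # [1. 2. 3.]
```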

@fjetter fjetter mentioned this pull request Jun 24, 2024

phofl commented Jun 25, 2024

We merged the rebase.

@phofl phofl closed this Jun 25, 2024

mrocklin commented Jun 25, 2024 via email

@dcherian

Exciting!

@mrocklin

@dcherian just to be clear, this implementation is broken and would produce wrong results in many cases. Please do not use today. This is still very much a work in progress.

@dcherian

👍🏾 Excited to see movement, that's all.

@mrocklin mrocklin deleted the arrays branch June 25, 2024 21:24