
feat: Only return parquet metadata if intending to write #549

Merged
merged 8 commits into dask-contrib:main from pq-metadata
Nov 20, 2024

Conversation

martindurant
Collaborator

No description provided.

@codecov-commenter

codecov-commenter commented Oct 22, 2024


Codecov Report

Attention: Patch coverage is 75.00000% with 3 lines in your changes missing coverage. Please review.

Project coverage is 92.93%. Comparing base (8cb8994) to head (64b3649).
Report is 148 commits behind head on main.

Files with missing lines Patch % Lines
src/dask_awkward/lib/io/parquet.py 75.00% 3 Missing ⚠️


Additional details and impacted files
@@            Coverage Diff             @@
##             main     #549      +/-   ##
==========================================
- Coverage   93.06%   92.93%   -0.14%     
==========================================
  Files          23       22       -1     
  Lines        3290     3395     +105     
==========================================
+ Hits         3062     3155      +93     
- Misses        228      240      +12     


@martindurant
Collaborator Author

@pfackeldey : added a fire_and_forget flag to to_parquet. Give it a try? Your dask client must already be instantiated.
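For anyone trying this, a minimal sketch (the array construction and output path are placeholders; `fire_and_forget` is the experimental keyword added in this PR):

```python
import dask
import dask_awkward as dak
from distributed import Client

client = Client()  # the client must exist before the graph is built

arr = dak.from_lists([[{"x": 1}, {"x": 2}], [{"x": 3}]])  # placeholder data
writes = dak.to_parquet(arr, "output/", fire_and_forget=True)
dask.compute(writes)  # returns once the writes have been handed off
```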

@pfackeldey
Collaborator

Thank you for adding this option @martindurant so quickly!
I'm redirecting this test to @ikrommyd as he has an analysis setup that can test this 👍

@lgray
Collaborator

lgray commented Nov 8, 2024

@martindurant could we also get an option for the tree reduce?

@martindurant
Collaborator Author

tree reduce?

Reducing N*None -> None in each reduction? I suppose so. I'll get back to you in about an hour. Naturally, all these approaches are mutually exclusive, and purely experimental for now.

@martindurant
Collaborator Author

@lgray : tree= option seems to work
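Roughly, the only change from the fire-and-forget sketch above is the keyword (`tree=` being the experimental option discussed here; it later became the only implementation, see below):

```python
writes = dak.to_parquet(arr, "output/", tree=True)  # tree-reduce the per-partition writes
dask.compute(writes)
```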

@ikrommyd
Contributor

ikrommyd commented Nov 8, 2024

I will try those out. It takes some time because I'm using a large enough sample to see those issues, and also because I'm monitoring the dashboard as it runs.

@ikrommyd
Contributor

ikrommyd commented Nov 8, 2024

@pfackeldey : added a fire_and_forget flag to to_parquet. Give it a try? Your dask client must already be instantiated.

So we need the client to already be up when we call dak.to_parquet (i.e., when building the graph)?

@martindurant
Collaborator Author

martindurant commented Nov 8, 2024 via email

@ikrommyd
Contributor

ikrommyd commented Nov 12, 2024

So first report from trying out the new fire_and_forget and tree options:

  1. The tree option seems to work very well. The workflow succeeded on the first try without errors. The workers still had unmanaged memory (old) in the GB scale, but the to-parquet tasks weren't accumulating on them; they were being forgotten rather than staying in memory. When the merge into the final write-parquet task after the tree reduction was about to happen, I saw no memory spike on 1-2 workers, which would have happened if they had to accumulate all the to-parquet tasks. Even better, because one worker had died for unrelated reasons, not many tasks had to be redone with tree reduction: the computation reached the end, couldn't gather the results from the dead worker, went back and re-did only ~100 tasks (out of many thousands), and then finished on the first try.

  2. The fire_and_forget option seems to work (I see files being written to disk), but there is a problem: dask.compute(to_compute) doesn't hold the interpreter. The script proceeds, reaches its end, and when the Python interpreter exits it kills the client and the scheduler. I don't know if there is a way to prevent this or if I'm doing something wrong. However, until the client got killed, I could see tasks being computed in the dashboard and files being written to disk. There is some sense of tracking in the dashboard as well: it shows 0/X to-parquet tasks completed, since this option doesn't track them, but X keeps shrinking as tasks are done, so you do have a sense of how many writing tasks remain.

@lgray
Collaborator

lgray commented Nov 12, 2024

Cool - this is useful information re: tree reduction. It would seem we should try to use it in as many remaining places as possible where we otherwise have N:1 input-to-output partitions (like collecting finalized arrays or similar things).

Histograms are already a tree reduction, but those face different issues. However, using it in a few more places here could bring us the robustness we appear to be missing?

This also brings up the issue - why the heck are distributed tasks taking up so much memory!? There's an additional class that represents a task in distributed which is surely eating up some space if tasks are hanging around.

I guess we should think carefully about lifecycles.

@martindurant
Collaborator Author

why the heck are distributed tasks taking up so much memory

Quite. I suggested that perhaps a worker plugin could figure out what's being allocated as tasks go through their lifecycles, perhaps on single-threaded workers. The usual tools, like object-growth and reference-cycle finders, would be the first line of attack. I'm not certain that the worker plugin system (the transition method) has enough granularity, but it's an easy place to start. https://distributed.dask.org/en/stable/plugins.html#worker-plugins
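A sketch of what such a probe might look like (the plugin class and event topic are hypothetical; WorkerPlugin.transition and Worker.log_event are the distributed hooks referenced above):

```python
import psutil
from distributed import WorkerPlugin


class TaskMemoryProbe(WorkerPlugin):
    """Hypothetical probe: record worker RSS at every task state transition.

    Most informative on single-threaded workers, where any growth can be
    attributed to the task that just changed state.
    """

    def setup(self, worker):
        self.worker = worker
        self.proc = psutil.Process()

    def transition(self, key, start, finish, **kwargs):
        self.worker.log_event(
            "task-memory",
            {"key": key, "start": start, "finish": finish,
             "rss": self.proc.memory_info().rss},
        )


# register on an existing client, e.g.:
# client.register_worker_plugin(TaskMemoryProbe())
```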

@lgray
Collaborator

lgray commented Nov 12, 2024

We need to be careful with fire_and_forget since it depends on whatever is executing tasks being interface-similar to distributed. We already have options that are not, some logic to check what's being used to execute the graph and error out if it isn't distributed is probably useful.

@martindurant
Collaborator Author

it depends on whatever is executing tasks being interface-similar to distributed.

At least we would fail early at get_client, but your point is valid. As implemented in this PR, it is only for trialing and getting the kind of information @ikrommyd supplied, of course.
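The early failure would look roughly like this; the guard function is hypothetical, but distributed.get_client raising ValueError outside a distributed context is the behaviour being relied on:

```python
from distributed import get_client


def _require_distributed_client():
    # fire_and_forget hands futures to the distributed scheduler, so a plain
    # threaded or synchronous scheduler cannot honour it.
    try:
        return get_client()
    except ValueError as err:
        raise RuntimeError(
            "fire_and_forget=True requires an active distributed Client"
        ) from err
```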

@ikrommyd
Contributor

ikrommyd commented Nov 14, 2024

I've just tried the fire_and_forget option as well. I stopped the interpreter from going past the dask.compute() call by just adding an input("Press enter when the computation finishes") and monitored the dashboard. All went fine, just like in the tree-reduction case. By the end, two workers had died, and the tasks those workers had in memory were simply redone by other workers that spawned at the end to do just that. I got exactly the same number of parquet files with tree reduction and fire-and-forget, and no memory problems (the unmanaged memory (old) of the workers was still in the GB scale for fire-and-forget as well).
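For reference, the keep-alive workaround described above, in sketch form:

```python
to_compute = dak.to_parquet(arr, "output/", fire_and_forget=True)
dask.compute(to_compute)  # returns before the background writes finish
input("Press enter when the computation finishes")  # keep the client (and scheduler) alive
```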

@martindurant
Collaborator Author

In the weekly awkward meeting, we decided that tree reduction should become the only implementation for write-parquet (it amounts to the same layout in the case of few partitions). The fire-and-forget route will be removed from this PR and can maybe be resurrected in a separate one for those who want it. Aside from being distributed-specific, it comes with the problem of not knowing when your process is finished.

@martindurant martindurant changed the title Only return parquet metadata if intending to write [feat]: Only return parquet metadata if intending to write Nov 18, 2024
@ikrommyd
Contributor

@martindurant
Collaborator Author

@ikrommyd - certainly, but that would be a separate PR of course. It does seem like the metadata in that case is simply discarded anyway.

@ikrommyd
Contributor

@ikrommyd - certainly, but that would be a separate PR of course. It does seem like the metadata in that case is simply discarded anyway.

Oh yeah, I just mentioned it here for documentation purposes since there was a discussion above.

@martindurant martindurant changed the title [feat]: Only return parquet metadata if intending to write feat: Only return parquet metadata if intending to write Nov 18, 2024
@martindurant martindurant marked this pull request as ready for review November 20, 2024 15:40
@martindurant
Collaborator Author

I don't suppose there's any testing we should be doing here, except that the existing parquet tests continue to work?

@lgray
Collaborator

lgray commented Nov 20, 2024

Yeah it's hard to achieve the scale in CI to test the actual performance impact of this PR.

@martindurant martindurant merged commit 1d4d4e9 into dask-contrib:main Nov 20, 2024
24 of 25 checks passed
@martindurant martindurant deleted the pq-metadata branch November 20, 2024 15:48