Memory Leak Investigation #32

Closed
ranchodeluxe opened this issue Feb 5, 2024 · 5 comments

Comments

@ranchodeluxe
Contributor

ranchodeluxe commented Feb 5, 2024

Problem

Based on the memory distribution we see during job runs, a dependency of pangeo-forge-recipes might be leaking memory.

Next Steps (all can be done in parallel)

  1. Run the recipes on GCP Dataflow (on an over-provisioned instance) and check whether the same memory pattern appears.

  2. Profile the jobs locally on the DirectRunner using Scalene, or see what Apache Beam offers for profiling, as this issue discusses.

  3. It's a guess, but this might have to do with file openers and pickling. Some issues that might give us a bearing:

ecmwf/cfgrib#283
fsspec/filesystem_spec#825
apache/beam#28246 (comment)
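For step 2, a stdlib-only sketch can give a first number before reaching for Scalene or memray. The helper below (profile_memory and build_and_drop are hypothetical names, not part of pangeo-forge-recipes or Beam) runs one callable under Python's tracemalloc and reports the peak, what is still allocated afterwards, and the largest surviving allocation sites. tracemalloc only sees allocations made through Python's allocators, so memory allocated directly by C libraries may not show up; memray and Scalene can also report native allocations.

```python
import tracemalloc
from typing import Any, Callable, Dict


def profile_memory(fn: Callable[[], Any], top_n: int = 3) -> Dict[str, Any]:
    """Run fn once under tracemalloc; report peak/retained bytes and top sites."""
    tracemalloc.start()
    try:
        result = fn()
        # current: bytes still allocated now (including fn's return value)
        # peak: high-water mark since tracemalloc.start()
        current, peak = tracemalloc.get_traced_memory()
        snapshot = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Group allocations still alive after fn returned by source line, largest first
    top = snapshot.statistics("lineno")[:top_n]
    return {
        "result": result,
        "retained_bytes": current,
        "peak_bytes": peak,
        "top_sites": [str(stat) for stat in top],
    }


def build_and_drop() -> int:
    # Toy workload: roughly 10 MB of bytes objects that are freed on return
    data = [bytes(1024) for _ in range(10_000)]
    return len(data)


if __name__ == "__main__":
    stats = profile_memory(build_and_drop)
    print(stats["peak_bytes"] > stats["retained_bytes"])  # True: the list was freed
    for line in stats["top_sites"]:
        print(line)
```

A peak far above the retained figure means memory is released once the work finishes, which points at a high-water-mark problem rather than a leak.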

Distributions

~7k time steps of MURSST data (WriteCombinedReference workflow):

https://github.com/developmentseed/pangeo-forge-staging/tree/mursst-kerchunk/recipes/mursst

[Screenshot: memory distribution, 2024-02-03]

~5k time steps of GPM IMERG data (WriteCombinedReference workflow):

https://github.com/developmentseed/pangeo-forge-staging/tree/gpm_imerg/recipes/gpm_imerg

[Screenshot: memory distribution, 2024-02-04]

~20 yearly time steps of LEAP data (StoreToZarr workflow):

https://github.com/ranchodeluxe/leap-pgf-example/tree/main/feedstock

[Screenshot: memory distribution, 2024-02-05]
@ranchodeluxe
Contributor Author

If you look at the LEAP run above you'll see the difference. Yes, it uses a ton of memory during rechunking, but then it levels out. So it's only our kerchunk workflows that show the bad memory pattern, and we can start whittling it down further.

@ranchodeluxe
Contributor Author

ranchodeluxe commented Feb 7, 2024

I was able to get memray's --live mode reporting heap allocations and which parts of the workflow own them. To get useful information you need to run at least a three-year subset of the GPM IMERG data. At different points in the job, within CombineReferences, different parts of fsspec and kerchunk own > 50% of the allocations and memory on the heap, but those allocations seem to be cleaned up fine.
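The distinction drawn here, large allocations that are nonetheless released, can be checked mechanically: call the suspect step repeatedly and look at what is still allocated after each call plus a garbage-collection pass. This is a minimal tracemalloc-based sketch; retained_growth, leaky, and clean are hypothetical toy names, not code from the recipes.

```python
import gc
import tracemalloc
from typing import Callable, List


def retained_growth(fn: Callable[[], object], runs: int = 5) -> List[int]:
    """Bytes still traced after each call of fn (after a gc pass).

    A series that keeps climbing suggests a leak; a flat series means memory
    is released between calls, however high each call's peak was.
    """
    tracemalloc.start()
    try:
        samples: List[int] = []
        for _ in range(runs):
            fn()  # return value is dropped, so only leaked objects survive
            gc.collect()  # clear reference cycles before measuring
            current, _peak = tracemalloc.get_traced_memory()
            samples.append(current)
        return samples
    finally:
        tracemalloc.stop()


_cache: List[bytes] = []


def leaky() -> None:
    # Toy leak: a module-level list keeps every buffer alive
    _cache.append(bytes(100_000))


def clean() -> int:
    # Same allocation, but nothing outlives the call
    return len(bytes(100_000))


if __name__ == "__main__":
    print(retained_growth(leaky))  # climbs by roughly 100 kB per call
    print(retained_growth(clean))  # stays roughly flat
```

A flat series like the one for clean matches what memray suggests here: the memory is reclaimed, so the problem is how much is held at once rather than a classic leak.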

Job (configured to run as a single worker process on the DirectRunner):

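import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from pangeo_forge_recipes.transforms import CombineReferences, OpenWithKerchunk

# pattern, earthdata_protocol, fsspec_open_kwargs, target_fsspec_open_kwargs,
# CONCAT_DIMS and IDENTICAL_DIMS come from the recipe module (not shown here).


class PrintKeyValueFn(beam.DoFn):
    """Debug sink: print the combined output instead of writing it out."""

    def process(self, element):
        for mapper in element:
            print(f"{mapper}")
            #for key, value in mapper.items():
            #    print(f"Key: {key}, Value: {value}")


parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()

# For a single worker process on the DirectRunner, pass e.g.
# --runner=DirectRunner --direct_num_workers=1 --direct_running_mode=in_memory
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenWithKerchunk(
            remote_protocol=earthdata_protocol,
            file_type=pattern.file_type,
            # lat/lon are around 5k, so this is the best option for forcing kerchunk to inline them
            inline_threshold=6000,
            storage_options=fsspec_open_kwargs,
        )
        | CombineReferences(
            concat_dims=CONCAT_DIMS,
            identical_dims=IDENTICAL_DIMS,
            target_options=target_fsspec_open_kwargs,
            remote_options=target_fsspec_open_kwargs,
            remote_protocol="s3",
            mzz_kwargs={},
            precombine_inputs=False,
        )
        | "Print Key-Value Pairs" >> beam.ParDo(PrintKeyValueFn())
    )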

JH Memory Distribution

[Screenshot, 2024-02-07]

Sorted by Own (late in the job)

[Screenshot, 2024-02-07]

Sorted by Total (late in the job)

[Screenshot, 2024-02-07]

Notable Other Theories

@norlandrhagen is correct that the biggest contributor is the huge in-memory reduce we do over the whole PCollection. Breaking this up with parallelism across distributed machines will help, but it will still require provisioning larger instances than we expected.

https://beam.apache.org/documentation/transforms/java/aggregation/combine/

A user-defined CombineFn may be applied to combine all elements in a PCollection (global combine) or to combine all elements associated with each key

☝️ transforms.CombineReferences uses beam.CombineGlobally(CombineMultiZarrToZarr())

@norlandrhagen
Collaborator

CombineReferences seems like the culprit to me. As far as I understand it, it takes in references as they are generated, accumulates them with add_input, and then combines them with MultiZarrToZarr. Since this is run with a CombineGlobally, I think this transform wants to accumulate (and MultiZarrToZarr-ify) all the references before it passes the MultiZarrToZarr object to the WriteReferences transform.
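To illustrate why a global combine concentrates memory, here is a toy that mirrors the method names of Beam's CombineFn (create_accumulator, add_input, merge_accumulators, extract_output) without importing Beam. ListCombineFn and combine_globally are hypothetical stand-ins, not CombineMultiZarrToZarr, and the reference dicts are made up. However the inputs are split into bundles, the final merge brings every reference into one accumulator before extract_output runs.

```python
from functools import reduce
from typing import Dict, List, Tuple

Reference = Dict[str, str]  # hypothetical stand-in for one file's kerchunk references


class ListCombineFn:
    """Same method names as beam.CombineFn, but plain Python and no Beam import."""

    def create_accumulator(self) -> List[Reference]:
        return []

    def add_input(self, acc: List[Reference], ref: Reference) -> List[Reference]:
        acc.append(ref)  # every input is kept until the very end
        return acc

    def merge_accumulators(self, accs: List[List[Reference]]) -> List[Reference]:
        # The global merge flattens all partial accumulators into one list
        return [ref for acc in accs for ref in acc]

    def extract_output(self, acc: List[Reference]) -> Dict[str, int]:
        # Stand-in for "combine everything at once" (MultiZarrToZarr in the recipe)
        return {"n_refs": len(acc)}


def combine_globally(
    fn: ListCombineFn, bundles: List[List[Reference]]
) -> Tuple[List[Reference], Dict[str, int]]:
    """Toy driver: one accumulator per bundle, then a single global merge."""
    partials = [reduce(fn.add_input, bundle, fn.create_accumulator()) for bundle in bundles]
    merged = fn.merge_accumulators(partials)
    return merged, fn.extract_output(merged)


if __name__ == "__main__":
    bundles = [[{"path": f"file_{b}_{i}.nc"} for i in range(100)] for b in range(4)]
    merged, out = combine_globally(ListCombineFn(), bundles)
    print(len(merged), out)  # 400 {'n_refs': 400}
```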

From how I understand it, the StoreToZarr recipe builds an Xarray schema, writes an empty Zarr store, and then inserts dataset chunks into that store as they arrive, which isn't blocking.
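By contrast, the write pattern described for StoreToZarr can be sketched with a plain file: reserve the full extent up front, then write each chunk at its offset as it arrives, so only about one chunk is held in memory at a time. This is an analogy under stated assumptions, not how pangeo-forge-recipes writes Zarr; write_incrementally, chunk_stream, and the (offset, payload) chunk format are hypothetical.

```python
import os
import tempfile
from typing import Iterable, Iterator, Tuple

Chunk = Tuple[int, bytes]  # hypothetical (byte offset, payload) pair


def write_incrementally(path: str, total_size: int, chunks: Iterable[Chunk]) -> int:
    """Reserve the full extent of `path`, then write each chunk as it arrives."""
    written = 0
    with open(path, "wb") as f:
        f.truncate(total_size)  # analogue of writing the empty store up front
        for offset, payload in chunks:
            if offset < 0 or offset + len(payload) > total_size:
                raise ValueError(f"chunk at offset {offset} falls outside the store")
            f.seek(offset)
            f.write(payload)
            written += len(payload)
    return written


def chunk_stream(n_chunks: int, chunk_size: int) -> Iterator[Chunk]:
    # Generator: each chunk is created only when the writer asks for it,
    # and arrival order does not have to match position in the file
    for i in reversed(range(n_chunks)):
        yield i * chunk_size, bytes([i % 256]) * chunk_size


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "store.bin")
        n = write_incrementally(path, 4 * 1024, chunk_stream(4, 1024))
        with open(path, "rb") as f:
            data = f.read()
        print(n, len(data), data[0], data[-1])  # 4096 4096 0 3
```

The key property is that nothing waits for the rest of the stream: memory scales with the chunk size, not with the number of chunks.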

Maybe we can rewrite the Kerchunk pipeline to act in a similar way vis-a-vis appending?

@moradology

moradology commented Feb 7, 2024

I've been looking at this for a few minutes, trying to get a handle on the various pieces involved in the CombineReferences transform. I think @norlandrhagen is right that the global combine here is worth looking at. If the issue is excess shuffle (and the associated memory pressure), it may be worth breaking the combine into smaller pieces, which might be a smaller lift than rewriting the Zarr-writing function to pre-allocate disk and write out in windows.

Perhaps something like this for CombineReferences:

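    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        # Assumes `import random`; NUM_KEYS, BATCH_SIZE and combine_batch are hypothetical. GroupIntoBatches needs (key, value) input
        pcoll = (pcoll | 'AddKeys' >> beam.Map(lambda ref: (random.randrange(NUM_KEYS), ref))
                 | 'PreCombine' >> beam.GroupIntoBatches(BATCH_SIZE)
                 | 'CombineBatch' >> beam.MapTuple(lambda _key, batch: combine_batch(batch)))
        return pcoll | 'CombineGlobally' >> beam.CombineGlobally(CombineMultiZarrToZarr(...))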

I'm also seeing a precombine_inputs flag; it doesn't look to me like it quite does the above, but to be honest I'm not confident about how kerchunk's MultiZarrToZarr behaves.
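The batch-then-combine idea for CombineReferences can be simulated in plain Python. In this sketch (batched, merge_refs, two_stage_combine, and the s3:// paths are hypothetical; merge_refs is a dict merge standing in for MultiZarrToZarr) the inputs are grouped into fixed-size batches, each batch is combined on its own, and the global step only sees the partial results.

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")
Refs = Dict[str, str]  # hypothetical: variable key -> file URL


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items, roughly like GroupIntoBatches per key."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def merge_refs(refs: List[Refs]) -> Refs:
    # Toy combine standing in for MultiZarrToZarr: merge reference dicts
    merged: Refs = {}
    for ref in refs:
        merged.update(ref)
    return merged


def two_stage_combine(
    items: Iterable[Refs], batch_size: int, combine: Callable[[List[Refs]], Refs]
) -> Tuple[Refs, int]:
    """Combine each batch first, then combine only the partial results."""
    partials = [combine(batch) for batch in batched(items, batch_size)]
    return combine(partials), len(partials)


if __name__ == "__main__":
    refs = [{f"var/{i}": f"s3://bucket/file_{i}.nc"} for i in range(1000)]
    combined, n_partials = two_stage_combine(refs, 100, merge_refs)
    print(len(combined), n_partials)  # 1000 10
```

The final combined output is as large as before; what shrinks is the number of inputs the single global step has to accumulate, and in Beam the per-batch combines could run on different workers.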

@ranchodeluxe
Contributor Author

Good stuff in here to review later, but closing for now.
