Generating a recipe's hash fails when using `process_chunk` and `process_input` functions #427
Looks like this issue was documented by @cisaacstern in pangeo-forge/staged-recipes#183 (comment)
@andersy005: I'm happy to debug with you sometime this week. I have some time tomorrow and Friday mornings PST -- when are you most available?
Looking a bit at the sources, my leading hypothesis is that the `inspect` call cannot find dataclass fields that are Callables, like `process_input` and `process_chunk`, which are defined as arbitrary callables.

My first suggestion is to try excluding the `process_input` and `process_chunk` fields from the hash. If that doesn't work -- is there another Callable in the target subclass that is trying to be serialized in the dataclass?

To refine my question above: do we want to serialize callables like `process_input` and `process_chunk` at all? If these are essential for the hash function -- I am happy to look into how we can get Beam's serializer to include these sources. Happy to do some pickle engineering with you 🥒.
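A minimal sketch of the failure mode this hypothesis points at (illustrative only, not taken from the recipe code): `inspect.getsource` needs the function's source file to be reachable, so a callable whose source isn't available where the hash is computed raises `OSError`. A dynamically defined function stands in here for a callable shipped to a remote worker without its source file:

```python
# Rough illustration: inspect.getsource() needs the defining source file on disk.
# A dynamically defined function (a stand-in for a callable whose source isn't
# available on a remote worker) makes it raise OSError.
import inspect

def locally_defined(ds):
    return ds

# Works when this module is a real file on disk:
print(inspect.getsource(locally_defined))

namespace = {}
exec("def dynamically_defined(ds):\n    return ds\n", namespace)

try:
    inspect.getsource(namespace["dynamically_defined"])
except OSError as err:
    # This is the class of error a source-based hash would hit if the function's
    # source file isn't present where the hash is computed.
    print(f"getsource failed: {err}")
```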
Thank you for looking into this, @alxmrs!

That would be great. I'm available tomorrow (Wed 10/27) anytime between 9am - 11am PT and 12:30pm - 3:30pm PT.
Let's meet at 9:30 PST! I'll send you a calendar invite in a bit. :)
Let me provide some background on the hash function. The goal here is to be able to determine if two invocations of a recipe are "the same": same inputs, same options, same processing steps, etc. In order to do that, Charles and I spent quite a bit of time developing these hashing capabilities and ensuring they are deterministic in #349 and #359. Our approach to hashing preprocessing functions, described here - #359 (comment), indeed involves using `inspect.getsource`.

Alternative solutions might investigate hashing the bytecode, or whatever else is available within the Beam runtime. As a more extreme solution, we could simply not compute the hash for any recipe with processing functions. I think that moving those blocked recipes forward is much more important than maintaining this somewhat fragile approach to hashing recipes.

I'm not very optimistic that hashing can survive the Beam refactor anyway--can we deterministically hash a Beam Pipeline? I doubt it. So let's not spend too much time maintaining this feature.
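As a rough sketch of the bytecode idea (illustrative only, not the library's implementation; and, as the commit message below notes, anything derived from `__code__` is not stable across Python versions or implementations, so it is only reproducible within a pinned environment):

```python
# Illustrative sketch of hashing a callable via its compiled code object instead
# of its source text. The digest is NOT stable across Python versions.
import hashlib

def hash_callable_bytecode(func) -> bytes:
    code = func.__code__
    h = hashlib.sha256()
    h.update(code.co_code)                   # raw bytecode
    h.update(repr(code.co_consts).encode())  # constants embedded in the function
    h.update(repr(code.co_names).encode())   # global names the function references
    return h.digest()

def process_input(ds, fname):
    return ds

print(hash_callable_bytecode(process_input).hex())
```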
This is guaranteed to be *not* stable across Python versions (and probably other implementation details), but without this, stuff just crashes. Fixes pangeo-forge#427
Yeah, I fell down this rabbit hole a few months (or weeks?) ago, and I just pushed my local branch up to this PR: #428. It might temporarily unblock us, but I agree that we should probably drop the hashing instead maybe?
pangeo-forge/pangeo-forge-runner#28 was my fix for pangeo-forge-runner, which allowed at least some functions (not all) to go through |
Note that pangeo-forge/staged-recipes#183 (comment) is not just about the hash, but more complex and relates (partially) to how Beam runs code with the FnAPI. While #428 will unblock some, it won't unblock all. @alxmrs can you add me to the invite too? :)
Yep, the issue I had with the ASCAT recipes is separate and related to subclassing …
I'd looked into this a bit last week when trying to debug the CCMP recipe issue. I've copied some of the comment text here along with a few other thoughts:

Looking at the staged/feedstock recipes that failed last week (EOOffshore CCMP, AGDC, LMR...) vs the recipe that ran successfully (eNATL60), the latter is the only one that doesn't define either a `process_input` or `process_chunk` function.

I had a look at reproducing the scenario with a combination of code snippets similar to what's used in `pangeo-forge-runner`:

```python
In [43]: from fsspec.implementations.local import LocalFileSystem
...: import inspect
...: from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget, StorageConfig, temporary_storage_config
...: from pangeo_forge_runner import Feedstock
...: from pathlib import Path
...:
...: feedstock = Feedstock(Path(".../staged-recipes/recipes/eooffshore_ics_ccmp_v02_1_nrt_wind"))
...:
...: recipes = feedstock.parse_recipes()
...:
...: recipes = {k: r.copy_pruned() for k, r in recipes.items()}
...:
...: recipe = recipes['eooffshore_ics_ccmp_v02_1_nrt_wind']
...:
...: storage_config = temporary_storage_config()
...: storage_config.target = FSSpecTarget(LocalFileSystem(), f'./ccmp.zarr')
...: storage_config.cache = CacheFSSpecTarget(LocalFileSystem(), './input-cache')
...: recipe.storage_config = storage_config
...:
In [44]: recipe
Out[44]: XarrayZarrRecipe(file_pattern=<FilePattern {'time': 2}>, storage_config=StorageConfig(target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fbf1b443370>, root_path='./ccmp.zarr'), cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fbf1b443370>, root_path='./input-cache'), metadata=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fbf1b443370>, root_path='/tmp/tmps62mr0gl/urkdQjLc')), inputs_per_chunk=2000, target_chunks={'time': 8000, 'latitude': -1, 'longitude': -1}, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, consolidate_dimension_coordinates=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=<function ics_wind_speed_direction at 0x7fbf19f7df70>, process_chunk=None, lock_timeout=None, subset_inputs={}, open_input_with_kerchunk=False)
In [45]: recipe.sha256()
Out[45]: b'\xe6\x85\xd6\xfa\xb0\xe7\x8a\x80\xf7ST\xa1M\xed\xae\x8e\x9e\xbe\xe8!d\xb2\xc6)\n\x8f3}}\x85\x7f\xf5'
In [46]: dataclass_sha256(recipe, ignore_keys=recipe._hash_exclude_)
Out[46]: b'\xe6\x85\xd6\xfa\xb0\xe7\x8a\x80\xf7ST\xa1M\xed\xae\x8e\x9e\xbe\xe8!d\xb2\xc6)\n\x8f3}}\x85\x7f\xf5'
In [47]: either_encode_or_hash(recipe.process_input)
Out[47]: 'def ics_wind_speed_direction(ds, fname):\n """\n Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and\n direction for the u and v components in the specified product. Dask arrays are\n created for delayed execution.\n """\n import dask\n import dask.array as da\n from datetime import datetime\n from metpy.calc import wind_direction, wind_speed\n import xarray as xr\n\n @dask.delayed\n def delayed_metpy_fn(fn, u, v):\n return fn(u, v).values\n\n # ICS grid\n geospatial_lat_min = 45.75\n geospatial_lat_max = 58.25\n geospatial_lon_min = 333.85\n geospatial_lon_max = 355.35\n icds = ds.sel(\n latitude=slice(geospatial_lat_min, geospatial_lat_max),\n longitude=slice(geospatial_lon_min, geospatial_lon_max),\n )\n\n # Remove subset of original attrs as they\'re no longer relevant\n for attr in ["base_date", "date_created", "history"]:\n del icds.attrs[attr]\n\n # Update the grid attributes\n icds.attrs.update(\n {\n "geospatial_lat_min": geospatial_lat_min,\n "geospatial_lat_max": geospatial_lat_max,\n "geospatial_lon_min": geospatial_lon_min,\n "geospatial_lon_max": geospatial_lon_max,\n }\n )\n u = icds.uwnd\n v = icds.vwnd\n # Original wind speed \'units\': \'m s-1\' attribute not accepted by MetPy,\n # use the unit contained in ERA5 data\n ccmp_wind_speed_units = u.units\n era5_wind_speed_units = "m s**-1"\n u.attrs["units"] = era5_wind_speed_units\n v.attrs["units"] = era5_wind_speed_units\n\n variables = [\n {\n "name": "wind_speed",\n "metpy_fn": wind_speed,\n "attrs": {"long_name": "Wind speed", "units": ccmp_wind_speed_units},\n },\n {\n "name": "wind_direction",\n "metpy_fn": wind_direction,\n "attrs": {"long_name": "Wind direction", "units": "degree"},\n },\n ]\n\n # CCMP provides u/v at a single height, 10m\n for variable in variables:\n icds[variable["name"]] = (\n xr.DataArray(\n da.from_delayed(\n delayed_metpy_fn(variable["metpy_fn"], u, v), u.shape, dtype=u.dtype\n ),\n coords=u.coords,\n dims=u.dims,\n )\n .assign_coords(height=10)\n .expand_dims(["height"])\n )\n icds[variable["name"]].attrs.update(variable["attrs"])\n\n icds.height.attrs.update(\n {\n "long_name": "Height above the surface",\n "standard_name": "height",\n "units": "m",\n }\n )\n # Restore units\n for variable in ["uwnd", "vwnd"]:\n icds[variable].attrs["units"] = ccmp_wind_speed_units\n\n icds.attrs["eooffshore_zarr_creation_time"] = datetime.strftime(\n datetime.now(), "%Y-%m-%dT%H:%M:%SZ"\n )\n icds.attrs[\n "eooffshore_zarr_details"\n ] = "EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com."\n return icds\n'
In [48]: inspect.isfunction(recipe.process_input)
Out[48]: True
In [49]: inspect.getsource(recipe.process_input)
Out[49]: 'def ics_wind_speed_direction(ds, fname):\n """\n Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and\n direction for the u and v components in the specified product. Dask arrays are\n created for delayed execution.\n """\n import dask\n import dask.array as da\n from datetime import datetime\n from metpy.calc import wind_direction, wind_speed\n import xarray as xr\n\n @dask.delayed\n def delayed_metpy_fn(fn, u, v):\n return fn(u, v).values\n\n # ICS grid\n geospatial_lat_min = 45.75\n geospatial_lat_max = 58.25\n geospatial_lon_min = 333.85\n geospatial_lon_max = 355.35\n icds = ds.sel(\n latitude=slice(geospatial_lat_min, geospatial_lat_max),\n longitude=slice(geospatial_lon_min, geospatial_lon_max),\n )\n\n # Remove subset of original attrs as they\'re no longer relevant\n for attr in ["base_date", "date_created", "history"]:\n del icds.attrs[attr]\n\n # Update the grid attributes\n icds.attrs.update(\n {\n "geospatial_lat_min": geospatial_lat_min,\n "geospatial_lat_max": geospatial_lat_max,\n "geospatial_lon_min": geospatial_lon_min,\n "geospatial_lon_max": geospatial_lon_max,\n }\n )\n u = icds.uwnd\n v = icds.vwnd\n # Original wind speed \'units\': \'m s-1\' attribute not accepted by MetPy,\n # use the unit contained in ERA5 data\n ccmp_wind_speed_units = u.units\n era5_wind_speed_units = "m s**-1"\n u.attrs["units"] = era5_wind_speed_units\n v.attrs["units"] = era5_wind_speed_units\n\n variables = [\n {\n "name": "wind_speed",\n "metpy_fn": wind_speed,\n "attrs": {"long_name": "Wind speed", "units": ccmp_wind_speed_units},\n },\n {\n "name": "wind_direction",\n "metpy_fn": wind_direction,\n "attrs": {"long_name": "Wind direction", "units": "degree"},\n },\n ]\n\n # CCMP provides u/v at a single height, 10m\n for variable in variables:\n icds[variable["name"]] = (\n xr.DataArray(\n da.from_delayed(\n delayed_metpy_fn(variable["metpy_fn"], u, v), u.shape, dtype=u.dtype\n ),\n coords=u.coords,\n dims=u.dims,\n )\n .assign_coords(height=10)\n .expand_dims(["height"])\n )\n icds[variable["name"]].attrs.update(variable["attrs"])\n\n icds.height.attrs.update(\n {\n "long_name": "Height above the surface",\n "standard_name": "height",\n "units": "m",\n }\n )\n # Restore units\n for variable in ["uwnd", "vwnd"]:\n icds[variable].attrs["units"] = ccmp_wind_speed_units\n\n icds.attrs["eooffshore_zarr_creation_time"] = datetime.strftime(\n datetime.now(), "%Y-%m-%dT%H:%M:%SZ"\n )\n icds.attrs[\n "eooffshore_zarr_details"\n ] = "EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com."\n return icds\n'
In [50]: dataclass_sha256(recipe, ignore_keys=recipe._hash_exclude_)
Out[50]: b'\xe6\x85\xd6\xfa\xb0\xe7\x8a\x80\xf7ST\xa1M\xed\xae\x8e\x9e\xbe\xe8!d\xb2\xc6)\n\x8f3}}\x85\x7f\xf5'
```

As we're excluding `process_input` as well here, the hash changes:

```python
In [51]: dataclass_sha256(recipe, ignore_keys=recipe._hash_exclude_ + ['process_input'])
Out[51]: b'\x14w\xd5\xcbH\x15At:\x1bY;\xbb/P\x90%t\xf3\xd9T\\P\x9a\xf4\xf5\xa18\x0b\xa7M\xa8'
```

I don't think …
E.g.:

```python
In [27]: from dataclasses import asdict
In [28]: from pangeo_forge_recipes.serialization import dict_drop_empty
In [29]: asdict(recipe, dict_factory=dict_drop_empty)
Out[29]:
{'file_pattern': <FilePattern {'time': 2}>,
'storage_config': {'target': {'fs': <fsspec.implementations.local.LocalFileSystem at 0x7f79d31b3fd0>,
'root_path': '/tmp/tmpz2tbd4o2/VTmsSMTh'},
'cache': {'fs': <fsspec.implementations.local.LocalFileSystem at 0x7f79d31b3fd0>,
'root_path': '/tmp/tmpz2tbd4o2/HC5bJHu4'},
'metadata': {'fs': <fsspec.implementations.local.LocalFileSystem at 0x7f79d31b3fd0>,
'root_path': '/tmp/tmpz2tbd4o2/80Soxcza'}},
'inputs_per_chunk': 1,
'target_chunks': {'time': 8000, 'latitude': -1, 'longitude': -1},
'cache_inputs': True,
'copy_input_to_local_file': False,
'consolidate_zarr': True,
'consolidate_dimension_coordinates': True,
'delete_input_encoding': True,
'process_input': <function ics_wind_speed_direction(ds: xarray.core.dataset.Dataset, fname: str) -> xarray.core.dataset.Dataset>,
'open_input_with_kerchunk': False,
'concat_dim': 'time',
'concat_dim_chunks': 8000,
 'cache_metadata': True}
```

Suggested changes:

```python
_hash_exclude_ = ["process_chunk", "process_input", "storage_config"]
```

and:

```python
if k in d:
    del d[k]
```

Afaict, apart from tests, … Although a recipe … which will be called by … Any …
I'd still be curious why this previously worked for feedstocks (e.g. AGDC); it might be worth looking into replicating the environment used for the previous successful feedstock runs.
We could exclude them, but this would defeat the point of hashing, because two recipes with very different processing code (and therefore very different results) would end up with the same hash. At that point, the hash is useless, and we should just drop this feature.

The logic for excluding storage config is very different. That is about where the data are going, but not the contents of the dataset itself.
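A toy sketch of that point, using plain dataclasses rather than the real recipe classes (all names here are made up for illustration): once the callable field is excluded from the digest, two recipes with different processing code hash identically.

```python
# Toy illustration: excluding the callable field makes recipes with different
# processing code indistinguishable by their hash.
import hashlib
import json
from dataclasses import dataclass, asdict
from typing import Callable, Optional

@dataclass
class ToyRecipe:
    url: str
    process_input: Optional[Callable] = None

def toy_hash(recipe: ToyRecipe, exclude=("process_input",)) -> str:
    d = {k: v for k, v in asdict(recipe).items() if k not in exclude}
    return hashlib.sha256(json.dumps(d, sort_keys=True).encode()).hexdigest()

a = ToyRecipe("https://example.com/data", process_input=lambda ds: ds)
b = ToyRecipe("https://example.com/data", process_input=lambda ds: ds * 2)
assert toy_hash(a) == toy_hash(b)  # different processing, identical hash
```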
Thanks, that makes sense. I'd still be curious about what changed between the previous successful feedstock recipe bakes and the current failures. E.g. AGDC seems to have worked in the staged environment, but not in production. If the hash issue is resolved here, e.g. with #429, it might be the case that other issues appear. Having said that, if I understand correctly, the CCMP recipe will have called …
Recently, I've encountered a Beam issue that has not been easy to reproduce. This seems to affect recipes that use the `process_input` and `process_chunk` functions.

Everything works perfectly fine when using Apache Beam locally. However, for reasons I don't understand, pangeo-forge-recipes is unable to hash these two inputs when using Dataflow, resulting in the error below.

There are plenty of recipes/feedstocks that are blocked by this issue, and any help diagnosing the issue would be greatly appreciated. Cc @alxmrs, @rabernat, @derekocallaghan, @yuvipanda

Apologies for the lack of a minimal reproducible example. I haven't been able to come up with one :(