-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding a to_spatial method on ParquetSource #11
Changes from 1 commit
b059667
f865e56
f029a36
6e8622a
8bbf503
23396ac
9b8d848
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,5 +106,32 @@ def _to_dask(self): | |
self._load_metadata() | ||
return self._df | ||
|
||
def to_spatial(self): | ||
""" | ||
Create a datashader spatial object from the parquet data | ||
""" | ||
try: | ||
from datashader.spatial.points import SpatialPointsFrame | ||
except ImportError: | ||
raise ImportError('SpatialPointsFrame not found in this ' | ||
'version of datashader. Get latest using ' | ||
'`conda install -c pyviz datashader`.') | ||
import json | ||
import fastparquet as fp | ||
|
||
frame = self._df or self.to_dask() | ||
urlpath = self._get_cache(self._urlpath)[0] | ||
pf = fp.ParquetFile(urlpath) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this probably doesn't work for remote. Is there a good way to get There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No there isn't. You need slightly different things to work with FP directly, but it can be done. |
||
# Check for spatial points metadata | ||
if 'SpatialPointsFrame' in pf.key_value_metadata: | ||
# Load metadata | ||
props = json.loads(pf.key_value_metadata['SpatialPointsFrame']) | ||
else: | ||
props = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if no? |
||
|
||
# Call DataFrame constructor with the internals of frame | ||
return SpatialPointsFrame(frame.dask, frame._name, frame._meta, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Presumably this can fail in various way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I should have made this clearer, I copied all of this straight from: https://github.com/pyviz/datashader/blob/master/datashader/spatial/points.py#L297-L312 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, so we just defer everything to whatever that interface does... I guess then the docstring should be caveated to say that whatever goes wrong, it's not Intake's fault! |
||
frame.divisions, props) | ||
|
||
def _close(self): | ||
self._df = None |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
""" | ||
Minimal spatial test copied from datashader | ||
""" | ||
import os | ||
import pytest | ||
import numpy as np | ||
import pandas as pd | ||
|
||
from intake_parquet.source import ParquetSource | ||
import intake | ||
|
||
pytest.importorskip('datashader') | ||
|
||
intake.registry['parquet'] = ParquetSource # because pytest defers import | ||
|
||
@pytest.fixture() | ||
def df(): | ||
N = 1000 | ||
np.random.seed(25) | ||
|
||
df = pd.DataFrame({ | ||
'x': np.random.rand(N), | ||
'y': np.random.rand(N) * 2, | ||
'a': np.random.randn(N) | ||
}) | ||
|
||
# Make sure we have x/y values of 0 and 1 represented so that | ||
# autocomputed ranges are predictable | ||
df.x.iloc[0] = 0.0 | ||
df.x.iloc[-1] = 1.0 | ||
df.y.iloc[0] = 0.0 | ||
df.y.iloc[-1] = 2.0 | ||
return df | ||
|
||
@pytest.fixture(params=[False, True]) | ||
def s_points_frame(request, tmp_path, df): | ||
import datashader.spatial.points as dsp | ||
|
||
# Work around https://bugs.python.org/issue33617 | ||
tmp_path = str(tmp_path) | ||
p = 5 | ||
path = os.path.join(tmp_path, 'spatial_points.parquet') | ||
|
||
dsp.to_parquet( | ||
df, path, 'x', 'y', p=p, npartitions=10) | ||
|
||
spf = ParquetSource(path).to_spatial() | ||
|
||
if request.param: | ||
spf = spf.persist() | ||
|
||
return spf | ||
|
||
|
||
def test_spatial_points_frame_properties(s_points_frame): | ||
assert s_points_frame.spatial.x == 'x' | ||
assert s_points_frame.spatial.y == 'y' | ||
assert s_points_frame.spatial.p == 5 | ||
assert s_points_frame.npartitions == 10 | ||
assert s_points_frame.spatial.x_range == (0, 1) | ||
assert s_points_frame.spatial.y_range == (0, 2) | ||
assert s_points_frame.spatial.nrows == 1000 | ||
|
||
# x_bin_edges | ||
np.testing.assert_array_equal( | ||
s_points_frame.spatial.x_bin_edges, | ||
np.linspace(0.0, 1.0, 2 ** 5 + 1)) | ||
|
||
# y_bin_edges | ||
np.testing.assert_array_equal( | ||
s_points_frame.spatial.y_bin_edges, | ||
np.linspace(0.0, 2.0, 2 ** 5 + 1)) | ||
|
||
# distance_divisions | ||
distance_divisions = s_points_frame.spatial.distance_divisions | ||
assert len(distance_divisions) == 10 + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs a lot more description, and certainly a link to wherever the general idea is described (e.g., how do you make these special parquet datasets?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.