Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for apply_vectorcube udf Open-EO/openeo-geopyspark-driver… #314

47 changes: 38 additions & 9 deletions openeo_driver/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import inspect
import io
import logging
import re
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -770,9 +771,44 @@ def apply_dimension(
or (dimension in {self.DIM_BANDS, self.DIM_PROPERTIES}.intersection(self.get_dimension_names()))
and target_dimension is None
):

def check_udf_result(result: openeo.udf.UdfData, require_datacube=False):
    """Validate the object returned by a UDF run and unpack its payload.

    :param result: object returned by the UDF; must be an ``openeo.udf.UdfData``.
    :param require_datacube: when true, additionally require exactly one
        datacube in the result and return its array.
    :return: tuple ``(features, array)``: the ``data`` of the single feature
        collection, and the ``array`` of the single datacube
        (``None`` when ``require_datacube`` is false).
    :raises ValueError: if ``result`` is not ``UdfData``, or does not hold
        exactly one feature collection (and, if required, exactly one datacube).
    """
    if not isinstance(result, openeo.udf.UdfData):
        raise ValueError(f"UDF should return UdfData, but got {type(result)}")

    collections = result.get_feature_collection_list()
    # Rejects a missing/empty list as well as a list with more than one entry.
    if not collections or len(collections) != 1:
        raise ValueError(
            f"UDF should return single feature collection but got {collections and len(collections)}"
        )

    if not require_datacube:
        return collections[0].data, None

    cubes = result.get_datacube_list()
    if not cubes or len(cubes) != 1:
        raise ValueError(
            f"UDF should return single datacube but got {cubes and len(cubes)}"
        )
    return collections[0].data, cubes[0].array

log.warning(
f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
)
if len(re.findall(r"^def apply_vectorcube\(", single_run_udf.udf, re.MULTILINE)) != 0:
JeroenVerstraelen marked this conversation as resolved.
Show resolved Hide resolved
datacube = openeo.udf.XarrayDataCube(array=self._cube)
feature_collection = openeo.udf.FeatureCollection(id="_", data=self._geometries)
udf_data = openeo.udf.UdfData(
proj={"EPSG": self._geometries.crs.to_epsg()} if self._geometries.crs else None,
datacube_list=[datacube],
feature_collection_list=[feature_collection],
user_context=context,
)
log.info(
f"[run_udf] Running apply_vectorcube UDF {str_truncate(single_run_udf.udf, width = 256)!r} on {udf_data!r}"
)
result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data)
log.info(f"[run_udf] UDF resulted in {result_data!r}")
result_features, result_datacube = check_udf_result(result_data, require_datacube=True)
return DriverVectorCube(geometries=result_features, cube=result_datacube)

# TODO: data chunking (e.g. large feature collections)
gdf = self._as_geopandas_df()
feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf)
Expand All @@ -785,15 +821,8 @@ def apply_dimension(
log.info(f"[run_udf] Running UDF {str_truncate(single_run_udf.udf, width=256)!r} on {udf_data!r}")
result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data)
log.info(f"[run_udf] UDF resulted in {result_data!r}")

if not isinstance(result_data, openeo.udf.UdfData):
raise ValueError(f"UDF should return UdfData, but got {type(result_data)}")
result_features = result_data.get_feature_collection_list()
if not (result_features and len(result_features) == 1):
raise ValueError(
f"UDF should return single feature collection but got {result_features and len(result_features)}"
)
return DriverVectorCube.from_geodataframe(result_features[0].data)
result_features, _ = check_udf_result(result_data)
return DriverVectorCube.from_geodataframe(result_features)

raise FeatureUnsupportedException(
message=f"DriverVectorCube.apply_dimension with {dimension=} and {bool(single_run_udf)=}"
Expand Down