Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove urlpath dependency and update relative path logic #602

Merged
merged 8 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 20 additions & 31 deletions apps/dc_tools/odc/apps/dc_tools/_stac.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from eodatasets3.serialise import from_doc
from eodatasets3.stac import to_stac_item
from toolz import get_in
from urlpath import URL
from urllib.parse import urlparse

from ._docs import odc_uuid

Expand Down Expand Up @@ -179,10 +179,23 @@ def _find_self_href(item: Document) -> str:
return self_uri[0]


def _get_relative_path(asset_href, self_link):
if self_link is None:
return asset_href

self_path = urlparse(self_link).path
href_path = urlparse(asset_href).path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unlikely, but for completeness, I think we should be explicitly checking that both self_link and asset_href have the same scheme and netloc (and keeping as an absolute link if they differ).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could refrain from checking scheme, but check for some degree of similarity between the netloc values?


try:
return str(Path(href_path).relative_to(Path(self_path).parent))
except ValueError:
# if it's not relative, keep as an absolute link
return asset_href


def _get_stac_bands(
item: Document,
default_grid: str,
relative: bool = False,
proj_shape: Optional[str] = None,
proj_transform: Optional[str] = None,
) -> Tuple[Document, Document, Document]:
Expand All @@ -195,35 +208,16 @@ def _get_stac_bands(

assets = item.get("assets", {})

def _get_path(asset, force_relative=False):
path = URL(asset["href"])
if relative:
try:
if self_link is None:
raise ValueError
path = path.relative_to(URL(self_link).parent)
# Value error is raised if the path is not relative to the parent
# or if the self link cannot be found.
except ValueError:
# If the path is not relative to the parent force_relative
# is still used for data assets, due to a historical assumption.
# TODO: Implement rewrite_assets (like in stac_to_dc) in all
# tools so that this is no longer necessary.
if force_relative:
path = path.name
else:
pass

return str(path)

for asset_name, asset in assets.items():
image_types = ["jp2", "geotiff"]
# If something's not in image_types, make it an accessory
# include thumbnails in accessories
if not any(
t in asset.get("type", []) for t in image_types
) or "thumbnail" in asset.get("roles", []):
accessories[asset_name] = {"path": _get_path(asset)}
accessories[asset_name] = {
"path": _get_relative_path(asset["href"], self_link)
}
continue

# If transform specified here in the asset it should override
Expand All @@ -240,7 +234,7 @@ def _get_path(asset, force_relative=False):
"transform": transform,
}

path = _get_path(asset, force_relative=True)
path = _get_relative_path(asset["href"], self_link)
band_index = asset.get("band", None)

band_info = {"path": path}
Expand Down Expand Up @@ -275,10 +269,6 @@ def round_coords(c1, c2):
return None


def stac_transform_absolute(input_stac):
return stac_transform(input_stac, relative=False)


def _convert_value_to_eo3_type(key: str, value):
"""
Convert return type as per EO3 specification.
Expand Down Expand Up @@ -332,7 +322,7 @@ def _check_valid_uuid(uuid_string: str) -> bool:
return False


def stac_transform(input_stac: Document, relative: bool = True) -> Document:
def stac_transform(input_stac: Document) -> Document:
"""Takes in a raw STAC 1.0 dictionary and returns an ODC dictionary"""
# pylint: disable=too-many-locals

Expand Down Expand Up @@ -371,7 +361,6 @@ def stac_transform(input_stac: Document, relative: bool = True) -> Document:
bands, grids, accessories = _get_stac_bands(
input_stac,
default_grid,
relative=relative,
proj_shape=proj_shape,
proj_transform=proj_transform,
)
Expand Down
20 changes: 7 additions & 13 deletions apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datacube import Datacube
from datacube.index.hl import Doc2Dataset
from odc.apps.dc_tools._docs import parse_doc_stream
from odc.apps.dc_tools._stac import stac_transform, stac_transform_absolute
from odc.apps.dc_tools._stac import stac_transform
from odc.apps.dc_tools.utils import (
IndexingException,
SkippedException,
Expand All @@ -26,7 +26,6 @@
statsd_gauge_reporting,
statsd_setting,
transform_stac,
transform_stac_absolute,
update_flag,
update_if_exists_flag,
verify_lineage,
Expand Down Expand Up @@ -59,12 +58,15 @@ def dump_to_odc(
uris_docs = parse_doc_stream(
((doc.url, doc.data) for doc in document_stream),
on_error=doc_error,
transform=transform,
)

found_docs = False
for uri, metadata in uris_docs:
found_docs = True
stac_doc = None
if transform:
stac_doc = metadata
metadata = stac_transform(metadata)
try:
index_update_dataset(
metadata,
Expand All @@ -76,6 +78,7 @@ def dump_to_odc(
allow_unsafe=allow_unsafe,
archive_less_mature=archive_less_mature,
publish_action=publish_action,
stac_doc=stac_doc,
)
ds_added += 1
except IndexingException:
Expand Down Expand Up @@ -103,7 +106,6 @@ def dump_to_odc(
@fail_on_missing_lineage
@verify_lineage
@transform_stac
@transform_stac_absolute
@update_flag
@update_if_exists_flag
@allow_unsafe
Expand All @@ -121,7 +123,6 @@ def cli(
fail_on_missing_lineage,
verify_lineage,
stac,
absolute,
update,
update_if_exists,
allow_unsafe,
Expand Down Expand Up @@ -151,13 +152,6 @@ def cli(
datefmt="%m/%d/%Y %I:%M:%S",
)

transform = None
if stac:
if absolute:
transform = stac_transform_absolute
else:
transform = stac_transform

opts = {}
if request_payer:
opts["RequestPayer"] = "requester"
Expand Down Expand Up @@ -215,7 +209,7 @@ def cli(
skip_lineage=skip_lineage,
fail_on_missing_lineage=fail_on_missing_lineage,
verify_lineage=verify_lineage,
transform=transform,
transform=stac,
update=update,
update_if_exists=update_if_exists,
allow_unsafe=allow_unsafe,
Expand Down
28 changes: 7 additions & 21 deletions apps/dc_tools/odc/apps/dc_tools/sqs_to_dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@
statsd_setting,
statsd_gauge_reporting,
transform_stac,
transform_stac_absolute,
archive_less_mature,
update_flag,
update_if_exists_flag,
verify_lineage,
publish_action,
)
from ._stac import stac_transform, stac_transform_absolute, ds_to_stac
from ._stac import stac_transform, ds_to_stac

# Added log handler
logging.basicConfig(level=logging.WARNING, handlers=[logging.StreamHandler()])
Expand Down Expand Up @@ -72,7 +71,7 @@ def extract_action_from_message(message):
return None


def handle_json_message(metadata, transform, odc_metadata_link):
def handle_json_message(metadata, odc_metadata_link):
odc_yaml_uri = None
uri = None

Expand Down Expand Up @@ -100,9 +99,6 @@ def handle_json_message(metadata, transform, odc_metadata_link):
# if no odc_metadata_link provided, it will look for metadata dict "href" value with "rel==self"
uri = get_uri(metadata, "self")

if transform:
metadata = transform(metadata)

return metadata, uri


Expand Down Expand Up @@ -250,13 +246,12 @@ def queue_to_odc(
do_archiving(metadata, dc, publish_action)
else:
if not record_path:
if transform:
stac_doc = metadata
# Extract metadata and URI from a STAC or similar
# json structure for indexing
metadata, uri = handle_json_message(
metadata, transform, odc_metadata_link
)
metadata, uri = handle_json_message(metadata, odc_metadata_link)
if transform:
stac_doc = metadata
metadata = stac_transform(metadata)
else:
# Extract metadata from an S3 bucket notification
# or similar for indexing
Expand Down Expand Up @@ -315,7 +310,6 @@ def queue_to_odc(
@fail_on_missing_lineage
@verify_lineage
@transform_stac
@transform_stac_absolute
@update_flag
@update_if_exists_flag
@allow_unsafe
Expand Down Expand Up @@ -352,7 +346,6 @@ def cli(
fail_on_missing_lineage,
verify_lineage,
stac,
absolute,
update,
update_if_exists,
allow_unsafe,
Expand All @@ -370,13 +363,6 @@ def cli(
):
"""Iterate through messages on an SQS queue and add them to datacube"""

transform = None
if stac:
if absolute:
transform = stac_transform_absolute
else:
transform = stac_transform

candidate_products = product.split()

sqs = boto3.resource("sqs")
Expand All @@ -391,7 +377,7 @@ def cli(
skip_lineage=skip_lineage,
fail_on_missing_lineage=fail_on_missing_lineage,
verify_lineage=verify_lineage,
transform=transform,
transform=stac,
limit=limit,
update=update,
no_sign_request=no_sign_request,
Expand Down
Loading
Loading