Remove support for multiple locations from postgis index #1658

Merged: 10 commits, merged Nov 21, 2024
datacube/drivers/postgis/_api.py (179 changes: 26 additions & 153 deletions)
```diff
@@ -45,8 +45,7 @@
 from . import _core
 from ._fields import parse_fields, Expression, PgField, PgExpression, DateRangeDocField  # noqa: F401
 from ._fields import NativeField, DateDocField, SimpleDocField, UnindexableValue
-from ._schema import MetadataType, Product, \
-    Dataset, DatasetLineage, DatasetLocation, SelectedDatasetLocation, \
+from ._schema import MetadataType, Product, Dataset, DatasetLineage, \
     search_field_index_map, search_field_indexes, DatasetHome
 from ._spatial import geom_alchemy, generate_dataset_spatial_values, extract_geometry_from_eo3_projection
 from .sql import escape_pg_identifier
```
```diff
@@ -67,24 +66,6 @@ def _base_known_fields():
         'Archived date',
         Dataset.archived
     )
-    fields["uris"] = NativeField(
-        "uris",
-        "all uris",
-        func.array(
-            select(
-                SelectedDatasetLocation.uri
-            ).where(
-                and_(
-                    SelectedDatasetLocation.dataset_ref == Dataset.id,
-                    SelectedDatasetLocation.archived == None
-                )
-            ).order_by(
-                SelectedDatasetLocation.added.desc(),
-                SelectedDatasetLocation.id.desc()
-            ).label('uris')
-        ),
-        alchemy_table=Dataset.__table__  # type: ignore[attr-defined]
-    )
     return fields
```


```diff
@@ -102,44 +83,15 @@ def _dataset_fields() -> tuple:
         'Archived date',
         Dataset.archived
     ),
-        NativeField("uris",
-                    "all uris",
-                    func.array(
-                        select(
-                            SelectedDatasetLocation.uri
-                        ).where(
-                            and_(
-                                SelectedDatasetLocation.dataset_ref == Dataset.id,
-                                SelectedDatasetLocation.archived == None
-                            )
-                        ).order_by(
-                            SelectedDatasetLocation.added.desc(),
-                            SelectedDatasetLocation.id.desc()
-                        ).label('uris')
-                    ),
-                    alchemy_table=Dataset.__table__  # type: ignore[attr-defined]
-        )
+        native_flds["uri"]
     )
 
 
 def _dataset_bulk_select_fields() -> tuple:
     return (
         Dataset.product_ref,
         Dataset.metadata_doc,
-        # All active URIs, from newest to oldest
-        func.array(
-            select(
-                SelectedDatasetLocation.uri
-            ).where(
-                and_(
-                    SelectedDatasetLocation.dataset_ref == Dataset.id,
-                    SelectedDatasetLocation.archived == None
-                )
-            ).order_by(
-                SelectedDatasetLocation.added.desc(),
-                SelectedDatasetLocation.id.desc()
-            ).label('uris')
-        ).label('uris')
+        Dataset.uri
     )
```


```diff
@@ -188,17 +140,11 @@ def get_native_fields() -> dict[str, NativeField]:
             'Full metadata document',
             Dataset.metadata_doc
         ),
-        # Fields that can affect row selection
-
-        # Note that this field is a single uri: selecting it will result in one-result per uri.
-        # (ie. duplicate datasets if multiple uris, no dataset if no uris)
         'uri': NativeField(
             'uri',
             "Dataset URI",
-            DatasetLocation.uri_body,
-            alchemy_expression=DatasetLocation.uri,
-            join_clause=(DatasetLocation.dataset_ref == Dataset.id),
-            affects_row_selection=True
+            Dataset.uri_body,
+            alchemy_expression=Dataset.uri,
         ),
     }
     return fields
```
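With the `uri` field now backed by columns on `Dataset` itself, selecting it no longer joins through a location table, so one dataset yields exactly one result row. A hedged usage sketch from the index API side (the environment name and product are invented, and a postgis-backed index is assumed):

```python
from datacube import Datacube

dc = Datacube(env="datacube")  # assumed: an environment configured for the postgis driver
# One row per dataset; row.uri is None when the dataset has no location recorded.
for row in dc.index.datasets.search_returning(("id", "uri"), product="ga_ls8c_ard_3"):
    print(row.id, row.uri)
```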
```diff
@@ -396,22 +342,16 @@ def insert_dataset_location(self, dataset_id, uri):
         scheme, body = split_uri(uri)
 
         r = self._connection.execute(
-            insert(DatasetLocation).on_conflict_do_nothing(
-                index_elements=['uri_scheme', 'uri_body', 'dataset_ref']
+            update(Dataset).returning(Dataset.uri).where(
+                Dataset.id == dataset_id
             ).values(
-                dataset_ref=dataset_id,
                 uri_scheme=scheme,
-                uri_body=body,
+                uri_body=body
             )
         )
 
         return r.rowcount > 0
 
-    def insert_dataset_location_bulk(self, values):
-        requested = len(values)
-        res = self._connection.execute(insert(DatasetLocation), values)
-        return res.rowcount, requested - res.rowcount
-
     def insert_dataset_search(self, search_table, dataset_id, key, value):
         """
         Add/update a search field index entry for a dataset
```
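Because a dataset now carries at most one location, `insert_dataset_location` has become an UPDATE that overwrites any existing URI rather than an INSERT that accumulates rows. A sketch of the resulting behaviour (assumes `api` is a connected `PostgisDbAPI` and `ds_id` an indexed dataset id; the paths are invented):

```python
api.insert_dataset_location(ds_id, "file:///data/a/odc-metadata.yaml")
api.insert_dataset_location(ds_id, "file:///data/b/odc-metadata.yaml")
# The second call replaces the first URI instead of adding a second location.
assert api.get_location(ds_id)[0] == "file:///data/b/odc-metadata.yaml"
```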
Expand Down Expand Up @@ -526,19 +466,17 @@ def get_datasets_for_location(self, uri, mode=None):
mode = 'exact' if body.count('#') > 0 else 'prefix'

if mode == 'exact':
body_query = DatasetLocation.uri_body == body
body_query = Dataset.uri_body == body
elif mode == 'prefix':
body_query = DatasetLocation.uri_body.startswith(body)
body_query = Dataset.uri_body.startswith(body)
else:
raise ValueError('Unsupported query mode {}'.format(mode))

return self._connection.execute(
select(
*_dataset_select_fields()
).join(
Dataset.locations
).where(
and_(DatasetLocation.uri_scheme == scheme, body_query)
and_(Dataset.uri_scheme == scheme, body_query)
)
).fetchall()

```diff
@@ -577,11 +515,6 @@ def restore_dataset(self, dataset_id):
         return r.rowcount > 0
 
     def delete_dataset(self, dataset_id):
-        self._connection.execute(
-            delete(DatasetLocation).where(
-                DatasetLocation.dataset_ref == dataset_id
-            )
-        )
         for table in search_field_indexes.values():
             self._connection.execute(
                 delete(table).where(table.dataset_ref == dataset_id)
```
```diff
@@ -1258,90 +1191,30 @@ def get_all_metadata_type_defs(self):
         for r in self._connection.execute(select(MetadataType.definition).order_by(MetadataType.name.asc())):
             yield r[0]
 
-    def get_locations(self, dataset_id):
-        return [
-            record[0]
-            for record in self._connection.execute(
-                select(
-                    DatasetLocation.uri
-                ).where(
-                    DatasetLocation.dataset_ref == dataset_id
-                ).where(
-                    DatasetLocation.archived == None
-                ).order_by(
-                    DatasetLocation.added.desc(),
-                    DatasetLocation.id.desc()
-                )
-            ).fetchall()
-        ]
-
-    def get_archived_locations(self, dataset_id):
-        """
-        Return a list of uris and archived_times for a dataset
-        """
-        return [
-            (location_uri, archived_time)
-            for location_uri, archived_time in self._connection.execute(
-                select(
-                    DatasetLocation.uri, DatasetLocation.archived
-                ).where(
-                    DatasetLocation.dataset_ref == dataset_id
-                ).where(
-                    DatasetLocation.archived != None
-                ).order_by(
-                    DatasetLocation.added.desc()
-                )
-            ).fetchall()
-        ]
+    def get_location(self, dataset_id):
+        return self._connection.execute(
+            select(Dataset.uri).where(
+                Dataset.id == dataset_id
+            )
+        ).first()
 
     def remove_location(self, dataset_id, uri):
         """
-        Remove the given location for a dataset
+        Remove a dataset's location
 
         :returns bool: Was the location deleted?
         """
         scheme, body = split_uri(uri)
         res = self._connection.execute(
-            delete(DatasetLocation).where(
-                DatasetLocation.dataset_ref == dataset_id
-            ).where(
-                DatasetLocation.uri_scheme == scheme
-            ).where(
-                DatasetLocation.uri_body == body
-            )
-        )
-        return res.rowcount > 0
-
-    def archive_location(self, dataset_id, uri):
-        scheme, body = split_uri(uri)
-        res = self._connection.execute(
-            update(DatasetLocation).where(
-                DatasetLocation.dataset_ref == dataset_id
-            ).where(
-                DatasetLocation.uri_scheme == scheme
-            ).where(
-                DatasetLocation.uri_body == body
-            ).where(
-                DatasetLocation.archived == None
-            ).values(
-                archived=func.now()
-            )
-        )
-        return res.rowcount > 0
-
-    def restore_location(self, dataset_id, uri):
-        scheme, body = split_uri(uri)
-        res = self._connection.execute(
-            update(DatasetLocation).where(
-                DatasetLocation.dataset_ref == dataset_id
-            ).where(
-                DatasetLocation.uri_scheme == scheme
-            ).where(
-                DatasetLocation.uri_body == body
-            ).where(
-                DatasetLocation.archived != None
+            update(Dataset).where(
+                and_(
+                    Dataset.id == dataset_id,
+                    Dataset.uri_scheme == scheme,
+                    Dataset.uri_body == body
+                )
             ).values(
-                archived=None
+                uri_scheme=None,
+                uri_body=None
            )
         )
         return res.rowcount > 0
```
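`remove_location` now simply nulls the URI columns on the dataset row, and per-location archive/restore is gone along with the location table. A behavioural sketch under the same assumptions as the earlier examples:

```python
removed = api.remove_location(ds_id, "file:///data/b/odc-metadata.yaml")
assert removed  # scheme and body matched, so both columns were set to NULL
# Dataset.uri is computed from uri_scheme and uri_body, so it is now None as well.
assert api.get_location(ds_id)[0] is None
```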
datacube/drivers/postgis/_core.py (1 change: 0 additions & 1 deletion)
```diff
@@ -130,7 +130,6 @@ def ensure_db(engine, with_permissions=True):
         grant select on all tables in schema {SCHEMA_NAME} to odc_user;
 
         grant insert on {SCHEMA_NAME}.dataset,
-                        {SCHEMA_NAME}.location,
                         {SCHEMA_NAME}.dataset_lineage to odc_manage;
         grant usage, select on all sequences in schema {SCHEMA_NAME} to odc_manage;
```

datacube/drivers/postgis/_schema.py (52 changes: 8 additions & 44 deletions)
```diff
@@ -10,10 +10,10 @@
 from typing import Type
 
 from sqlalchemy.dialects.postgresql import NUMRANGE, TSTZRANGE
-from sqlalchemy.orm import aliased, registry, relationship, column_property
-from sqlalchemy import ForeignKey, UniqueConstraint, PrimaryKeyConstraint, CheckConstraint, SmallInteger, Text, Index, \
+from sqlalchemy.orm import registry, relationship, column_property
+from sqlalchemy import ForeignKey, PrimaryKeyConstraint, CheckConstraint, SmallInteger, Text, Index, \
     literal
-from sqlalchemy import Column, Integer, String, DateTime
+from sqlalchemy import Column, String, DateTime
 from sqlalchemy.dialects import postgresql as postgres
 from sqlalchemy.sql import func
```

```diff
@@ -105,40 +105,8 @@ class Dataset:
     updated = Column(DateTime(timezone=True), server_default=func.now(), nullable=False,
                      index=True, comment="when last updated")
 
-    locations = relationship("DatasetLocation", viewonly=True)
-    active_locations = relationship("DatasetLocation",
-                                    primaryjoin="and_(Dataset.id==DatasetLocation.dataset_ref, "
-                                                "DatasetLocation.archived==None)",
-                                    viewonly=True,
-                                    order_by="desc(DatasetLocation.added)")
-    archived_locations = relationship("DatasetLocation",
-                                      viewonly=True,
-                                      primaryjoin="and_(Dataset.id==DatasetLocation.dataset_ref, "
-                                                  "DatasetLocation.archived!=None)"
-                                      )
-
-
-Index("ix_ds_prod_active", Dataset.product_ref, postgresql_where=(Dataset.archived == None))
-Index("ix_ds_mdt_active", Dataset.metadata_type_ref, postgresql_where=(Dataset.archived == None))
-
-
-@orm_registry.mapped
-class DatasetLocation:
-    __tablename__ = "location"
-    __table_args__ = (
-        _core.METADATA,
-        UniqueConstraint('uri_scheme', 'uri_body', 'dataset_ref'),
-        Index("ix_loc_ds_added", "dataset_ref", "added"),
-        {
-            "schema": sql.SCHEMA_NAME,
-            "comment": "Where data for the dataset can be found (uri)."
-        }
-    )
-    id = Column(Integer, primary_key=True, autoincrement=True)
-    dataset_ref = Column(postgres.UUID(as_uuid=True), ForeignKey(Dataset.id), nullable=False,
-                         comment="The product this dataset belongs to")
-    uri_scheme = Column(String, nullable=False, comment="The scheme of the uri.")
-    uri_body = Column(String, nullable=False, comment="""The body of the uri.
+    uri_scheme = Column(String, comment="The scheme of the uri.")
+    uri_body = Column(String, comment="""The body of the uri.
 
 The uri scheme and body make up the base URI to find the dataset.
 
@@ -147,15 +115,11 @@ class DatasetLocation:
 
 eg 'file:///g/data/datasets/LS8_NBAR/odc-metadata.yaml' or 'ftp://eo.something.com/dataset'
 'file' is a scheme, '///g/data/datasets/LS8_NBAR/odc-metadata.yaml' is a body.""")
-    added = Column(DateTime(timezone=True), server_default=func.now(), nullable=False, comment="when added")
-    added_by = Column(Text, server_default=func.current_user(), nullable=False, comment="added by whom")
-    archived = Column(DateTime(timezone=True), default=None, nullable=True, index=True,
-                      comment="when archived, null for the active location")
     uri = column_property(uri_scheme + literal(':') + uri_body)
-    dataset = relationship("Dataset")
-
-
-SelectedDatasetLocation = aliased(DatasetLocation, name="sel_loc")
+
+
+Index("ix_ds_prod_active", Dataset.product_ref, postgresql_where=(Dataset.archived == None))
+Index("ix_ds_mdt_active", Dataset.metadata_type_ref, postgresql_where=(Dataset.archived == None))
 
 
 @orm_registry.mapped
```
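The `uri` attribute kept on `Dataset` is a SQL-computed `column_property` concatenating scheme and body at query time. A self-contained sketch of the same pattern (the table and columns here are illustrative, not the odc schema):

```python
from sqlalchemy import Column, String, literal
from sqlalchemy.orm import DeclarativeBase, column_property

class Base(DeclarativeBase):
    pass

class Doc(Base):
    __tablename__ = "doc"
    id = Column(String, primary_key=True)
    uri_scheme = Column(String)
    uri_body = Column(String)
    # Rendered in SQL as uri_scheme || ':' || uri_body;
    # NULL (None in Python) when either component is NULL.
    uri = column_property(uri_scheme + literal(':') + uri_body)
```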
Expand Down Expand Up @@ -329,7 +293,7 @@ class DatasetSearchDateTime:

ALL_STATIC_TABLES = [
MetadataType.__table__, Product.__table__, # type: ignore[attr-defined]
Dataset.__table__, DatasetLocation.__table__, # type: ignore[attr-defined]
Dataset.__table__, # type: ignore[attr-defined]
DatasetLineage.__table__, DatasetHome.__table__, # type: ignore[attr-defined]
SpatialIndexRecord.__table__, # type: ignore[attr-defined]
DatasetSearchString.__table__, DatasetSearchNumeric.__table__, # type: ignore[attr-defined]