diff --git a/pyproject.toml b/pyproject.toml index 89ddf1a43..ef611c2e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,7 @@ all = [ "h5netcdf", "h5py", "httpx >=0.20.0,!=0.23.1", + "canonicaljson", "jinja2", "jmespath", "jsonschema", @@ -144,7 +145,7 @@ dev = [ "flake8", "ldap3", "pre-commit", - "pytest", + "pytest <8", # TMP pin while plugins catch up "pytest-asyncio", "pytest-rerunfailures", "sphinx !=4.1.0, !=4.1.1, !=4.1.2, !=4.2.0", @@ -191,6 +192,7 @@ minimal-server = [ "dask", "fastapi", "httpx >=0.20.0,!=0.23.1", + "canonicaljson", "jinja2", "jmespath", "jsonschema", @@ -233,6 +235,7 @@ server = [ "h5netcdf", "h5py", "httpx >=0.20.0,!=0.23.1", + "canonicaljson", "jinja2", "jmespath", "jsonschema", diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 30fa83cab..e7864882f 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -16,6 +16,7 @@ from sqlalchemy.dialects.postgresql import JSONB, REGCONFIG from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.orm import selectinload from sqlalchemy.sql.expression import cast from tiled.queries import ( @@ -55,7 +56,7 @@ SPARSE_BLOCKS_PARQUET_MIMETYPE, ZARR_MIMETYPE, ) -from .utils import SCHEME_PATTERN, ensure_uri +from .utils import SCHEME_PATTERN, compute_structure_id, ensure_uri DEFAULT_ECHO = bool(int(os.getenv("TILED_ECHO_SQL", "0") or "0")) INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") @@ -352,8 +353,14 @@ async def lookup_adapter( # Search queries and access controls apply only at the top level. assert not first_level.conditions return await first_level.lookup_adapter(segments[1:]) - statement = select(orm.Node).filter( - orm.Node.ancestors == self.segments + ancestors + statement = ( + select(orm.Node) + .filter(orm.Node.ancestors == self.segments + ancestors) + .options( + selectinload(orm.Node.data_sources).selectinload( + orm.DataSource.structure + ) + ) ) for condition in self.conditions: statement = statement.filter(condition) @@ -577,13 +584,35 @@ async def create_node( init_storage, data_uri, data_source.structure ) data_source.assets.extend(assets) - data_source_orm = orm.DataSource( - structure=_prepare_structure( + if data_source.structure is None: + structure_id = None + else: + # Obtain and hash the canonical (RFC 8785) representation of + # the JSON structure. + structure = _prepare_structure( structure_family, data_source.structure - ), + ) + structure_id = compute_structure_id(structure) + # The only way to do "insert if it does not exist", i.e. ON CONFLICT, + # is to invoke a dialect-specific insert. + if self.context.engine.dialect.name == "sqlite": + from sqlalchemy.dialects.sqlite import insert + elif self.context.engine.dialect.name == "postgresql": + from sqlalchemy.dialects.postgresql import insert + else: + assert False # future-proofing + statement = ( + insert(orm.Structure).values( + id=structure_id, + structure=structure, + ) + ).on_conflict_do_nothing(index_elements=["id"]) + await db.execute(statement) + data_source_orm = orm.DataSource( mimetype=data_source.mimetype, management=data_source.management, parameters=data_source.parameters, + structure_id=structure_id, ) node.data_sources.append(data_source_orm) # await db.flush(data_source_orm) @@ -601,7 +630,21 @@ async def create_node( db.add(node) await db.commit() await db.refresh(node) - return key, type(self)(self.context, node, access_policy=self.access_policy) + # Load the node with its DataSources and each DataSource's Structure.
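+ # A plain db.refresh(node) is not enough here: with the async engine,
+ # related rows cannot be lazy-loaded later, so re-select the node and
+ # eagerly load its data_sources and each data_source's structure.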
+ refreshed_node = ( + await db.execute( + select(orm.Node) + .filter(orm.Node.id == node.id) + .options( + selectinload(orm.Node.data_sources).selectinload( + orm.DataSource.structure + ) + ) + ) + ).scalar() + return key, type(self)( + self.context, refreshed_node, access_policy=self.access_policy + ) # async def patch_node(datasources=None): # ... diff --git a/tiled/catalog/core.py b/tiled/catalog/core.py index e4d7fcb1b..f81c57959 100644 --- a/tiled/catalog/core.py +++ b/tiled/catalog/core.py @@ -5,6 +5,7 @@ # This is list of all valid revisions (from current to oldest). ALL_REVISIONS = [ + "2ca16566d692", "1cd99c02d0c7", "a66028395cab", "3db11ff95b6c", diff --git a/tiled/catalog/migrations/versions/2ca16566d692_separate_structure_table.py b/tiled/catalog/migrations/versions/2ca16566d692_separate_structure_table.py new file mode 100644 index 000000000..94342c8cf --- /dev/null +++ b/tiled/catalog/migrations/versions/2ca16566d692_separate_structure_table.py @@ -0,0 +1,199 @@ +"""Separate structure table + +Revision ID: 2ca16566d692 +Revises: 1cd99c02d0c7 +Create Date: 2024-01-22 20:44:23.132801 + +""" +from datetime import datetime + +import orjson +import sqlalchemy as sa +from alembic import op +from sqlalchemy.sql import func + +from tiled.catalog.orm import JSONVariant +from tiled.catalog.utils import compute_structure_id +from tiled.server.schemas import Management + +# revision identifiers, used by Alembic. +revision = "2ca16566d692" +down_revision = "1cd99c02d0c7" +branch_labels = None +depends_on = None + + +def upgrade(): + connection = op.get_bind() + + # Create new 'structures' table. + op.create_table( + "structures", + sa.Column("id", sa.Unicode(32), primary_key=True, unique=True), + sa.Column("structure", JSONVariant, nullable=False), + ) + # Get reference, to be used for copying data. + structures = sa.Table( + "structures", + sa.MetaData(), + sa.Column("id", sa.Unicode(32)), + sa.Column("structure", JSONVariant), + ) + if connection.engine.dialect.name == "sqlite": + # We use a copy-and-move strategy here because we cannot get exactly + # the result we want by adding a FOREIGN KEY to SQLite on an existing + # table. 
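+ # Concretely: create "new_data_sources" with the desired schema, copy
+ # every row across (computing structure_id for each), then drop the old
+ # table and rename the new one into its place.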
+ op.create_table( + "new_data_sources", + sa.Column( + "id", sa.Integer, primary_key=True, index=True, autoincrement=True + ), + sa.Column( + "node_id", + sa.Integer, + sa.ForeignKey("nodes.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "structure_id", + sa.Unicode(32), + sa.ForeignKey("structures.id", ondelete="CASCADE"), + nullable=True, + ), + sa.Column("mimetype", sa.Unicode(255), nullable=False), + sa.Column("parameters", JSONVariant, nullable=True), + sa.Column("management", sa.Enum(Management), nullable=False), + sa.Column( + "time_created", + sa.DateTime(timezone=False), + server_default=func.now(), + ), + sa.Column( + "time_updated", + sa.DateTime(timezone=False), + onupdate=func.now(), + server_default=func.now(), + ), + ) + new_data_sources = sa.Table( + "new_data_sources", + sa.MetaData(), + sa.Column("id", sa.Integer), + sa.Column( + "node_id", + sa.Integer, + sa.ForeignKey("nodes.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("structure_id", sa.Unicode(32)), + sa.Column("mimetype", sa.Unicode(255), nullable=False), + sa.Column("parameters", JSONVariant, nullable=True), + sa.Column("management", sa.Enum(Management), nullable=False), + sa.Column( + "time_created", + sa.DateTime(timezone=False), + server_default=func.now(), + ), + sa.Column( + "time_updated", + sa.DateTime(timezone=False), + onupdate=func.now(), + server_default=func.now(), + ), + ) + # Extract rows from data_sources and compute structure_id. + results = connection.execute( + sa.text( + "SELECT id, node_id, structure, mimetype, parameters, management, " + "time_created, time_updated FROM data_sources" + ) + ).fetchall() + + new_data_sources_rows = [] + unique_structures = {} # map unique structure_id -> structure + for row in results: + structure_id = compute_structure_id(row[2]) + new_row = { + "id": row[0], + "node_id": row[1], + "structure_id": structure_id, + "mimetype": row[3], + "parameters": orjson.loads(row[4]), + "management": row[5], + "time_created": datetime.fromisoformat(row[6]), + "time_updated": datetime.fromisoformat(row[7]), + } + new_data_sources_rows.append(new_row) + if structure_id not in unique_structures: + unique_structures[structure_id] = orjson.loads(row[2]) + structures_rows = [ + {"id": structure_id, "structure": structure} + for structure_id, structure in unique_structures.items() + ] + + # Copy data into new tables. + op.bulk_insert(structures, structures_rows) + op.bulk_insert(new_data_sources, new_data_sources_rows) + + # Drop old 'data_sources' and move 'new_data_sources' into its place. + op.drop_table("data_sources") + op.rename_table("new_data_sources", "data_sources") + # The above leaves many partially filled pages and, when run on example + # datasets, left the database slightly _larger_. Clean up. + with op.get_context().autocommit_block(): + connection.execute(sa.text("VACUUM")) + else: + # PostgreSQL + # Extract rows from data_sources and compute structure_id. + results = connection.execute( + sa.text("SELECT id, structure FROM data_sources") + ).fetchall() + unique_structures = {} # map unique structure_id -> structure + data_source_id_to_structure_id = {} + for data_source_id, structure in results: + structure_id = compute_structure_id(structure) + unique_structures[structure_id] = structure + data_source_id_to_structure_id[data_source_id] = structure_id + structures_rows = [ + {"id": structure_id, "structure": structure} + for structure_id, structure in unique_structures.items() + ] + # Copy data into 'structures' table.
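+ # unique_structures is keyed on structure_id, so a structure shared by
+ # many data sources is inserted only once.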
+ op.bulk_insert(structures, structures_rows) + op.add_column( + "data_sources", + sa.Column( + "structure_id", + sa.Unicode(32), + sa.ForeignKey("structures.id", ondelete="CASCADE"), + ), + ) + data_sources = sa.Table( + "data_sources", + sa.MetaData(), + sa.Column("id", sa.Integer), + sa.Column( + "node_id", + sa.Integer, + sa.ForeignKey("nodes.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("structure_id", sa.Unicode(32)), + sa.Column("structure", sa.Unicode(32)), + sa.Column("mimetype", sa.Unicode(255), nullable=False), + sa.Column("parameters", JSONVariant, nullable=True), + sa.Column("management", sa.Enum(Management), nullable=False), + ) + for data_source_id, structure_id in data_source_id_to_structure_id.items(): + connection.execute( + data_sources.update() + .values(structure_id=structure_id) + .where(data_sources.c.id == data_source_id) + ) + op.drop_column("data_sources", "structure") + + +def downgrade(): + # This _could_ be implemented but we will wait for a need since we are + # still in alpha releases. + raise NotImplementedError diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 4beacd0b2..3fc418a49 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -291,8 +291,9 @@ class DataSource(Timestamped, Base): node_id = Column( Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False ) - - structure = Column(JSONVariant, nullable=True) + structure_id = Column( + Unicode(32), ForeignKey("structures.id", ondelete="CASCADE"), nullable=True + ) mimetype = Column(Unicode(255), nullable=False) # max length given by RFC 4288 # These are additional parameters passed to the Adapter to guide # it to access and arrange the data in the file correctly. @@ -300,7 +301,14 @@ class DataSource(Timestamped, Base): # This relates to the mutability of the data. management = Column(Enum(Management), nullable=False) - # many-to-many relationship to DataSource, bypassing the `Association` class + # many-to-one relationship to Structure + structure: Mapped["Structure"] = relationship( + "Structure", + lazy="selectin", + passive_deletes=True, + ) + + # many-to-many relationship to Asset, bypassing the `Association` class assets: Mapped[List["Asset"]] = relationship( secondary="data_source_asset_association", back_populates="data_sources", @@ -316,6 +324,23 @@ class DataSource(Timestamped, Base): ) + +class Structure(Base): + """ + This describes the structure of a DataSource. + + The id is the HEX digest of the MD5 hash of the canonical representation + of the JSON structure, as specified by RFC 8785. + + https://datatracker.ietf.org/doc/html/rfc8785 + + """ + + __tablename__ = "structures" + + id: str = Column(Unicode(32), primary_key=True, unique=True) + structure = Column(JSONVariant, nullable=False) + + class Asset(Timestamped, Base): """ This tracks individual files/blobs. diff --git a/tiled/catalog/utils.py b/tiled/catalog/utils.py index 0be02d1d8..c50b6a0d1 100644 --- a/tiled/catalog/utils.py +++ b/tiled/catalog/utils.py @@ -1,7 +1,10 @@ +import hashlib import re from pathlib import Path from urllib.parse import urlparse, urlunparse +import canonicaljson + SCHEME_PATTERN = re.compile(r"^[a-z0-9+]+:\/\/.*$") @@ -22,3 +25,10 @@ def ensure_uri(uri_or_path): mutable[1] = "localhost" uri_str = urlunparse(mutable) return str(uri_str) + + +def compute_structure_id(structure): + "Compute HEX digest of MD5 hash of RFC 8785 canonical form of JSON."
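+ # canonicaljson serializes with sorted keys and a deterministic encoding,
+ # so equal structures always produce identical bytes and hash to the same id.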
+ canonical_structure = canonicaljson.encode_canonical_json(structure) + + return hashlib.md5(canonical_structure).hexdigest() diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index b40da5682..063674061 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -148,7 +148,7 @@ class DataSource(pydantic.BaseModel): def from_orm(cls, orm): return cls( id=orm.id, - structure=orm.structure, + structure=getattr(orm.structure, "structure", None), mimetype=orm.mimetype, parameters=orm.parameters, assets=[Asset.from_orm(assoc) for assoc in orm.asset_associations],