Normalize structure into its own table (#637)
* Normalize structure into a separate table.

* Add migration for structures table.

* Separate PG and SQLite paths.

* Add jcs to 'all' pip selector.

* Switch from jcs to canonicaljson.

* Be more DRY

* Run VACUUM at the end of migration.

* Migrate time_ columns as well.

* Decode JSON before copying.

* TMP: Pin back pytest while plugins catch up to pytest 8
danielballan authored Jan 31, 2024
1 parent fa00781 commit c76d1b3
Showing 7 changed files with 293 additions and 12 deletions.
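
The core mechanism of this change is content-addressing: each data source's structure JSON is hashed to a stable 32-character ID so that identical structures are stored only once in the new structures table. The diff imports compute_structure_id from tiled/catalog/utils.py but does not show its body; the following is only an illustrative sketch, based on the new Structure docstring (MD5 hex digest of the canonical JSON form) and the canonicaljson dependency added to pyproject.toml, not tiled's actual helper.

import hashlib

import canonicaljson


def compute_structure_id(structure):
    # Sketch only: hash the canonical JSON encoding so the ID depends
    # on content, not on key order or whitespace.
    return hashlib.md5(canonicaljson.encode_canonical_json(structure)).hexdigest()


# Key order does not matter once the JSON is canonicalized:
assert compute_structure_id({"a": 1, "b": 2}) == compute_structure_id({"b": 2, "a": 1})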
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -62,6 +62,7 @@ all = [
"h5netcdf",
"h5py",
"httpx >=0.20.0,!=0.23.1",
"canonicaljson",
"jinja2",
"jmespath",
"jsonschema",
@@ -144,7 +145,7 @@ dev = [
"flake8",
"ldap3",
"pre-commit",
"pytest",
"pytest <8", # TMP pin while plugins catch up
"pytest-asyncio",
"pytest-rerunfailures",
"sphinx !=4.1.0, !=4.1.1, !=4.1.2, !=4.2.0",
@@ -191,6 +192,7 @@ minimal-server = [
"dask",
"fastapi",
"httpx >=0.20.0,!=0.23.1",
"canonicaljson",
"jinja2",
"jmespath",
"jsonschema",
@@ -233,6 +235,7 @@ server = [
"h5netcdf",
"h5py",
"httpx >=0.20.0,!=0.23.1",
"canonicaljson",
"jinja2",
"jmespath",
"jsonschema",
57 changes: 50 additions & 7 deletions tiled/catalog/adapter.py
@@ -16,6 +16,7 @@
from sqlalchemy.dialects.postgresql import JSONB, REGCONFIG
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import selectinload
from sqlalchemy.sql.expression import cast

from tiled.queries import (
@@ -55,7 +56,7 @@
SPARSE_BLOCKS_PARQUET_MIMETYPE,
ZARR_MIMETYPE,
)
from .utils import SCHEME_PATTERN, ensure_uri
from .utils import SCHEME_PATTERN, compute_structure_id, ensure_uri

DEFAULT_ECHO = bool(int(os.getenv("TILED_ECHO_SQL", "0") or "0"))
INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$")
@@ -352,8 +353,14 @@ async def lookup_adapter(
# Search queries and access controls apply only at the top level.
assert not first_level.conditions
return await first_level.lookup_adapter(segments[1:])
statement = select(orm.Node).filter(
orm.Node.ancestors == self.segments + ancestors
statement = (
select(orm.Node)
.filter(orm.Node.ancestors == self.segments + ancestors)
.options(
selectinload(orm.Node.data_sources).selectinload(
orm.DataSource.structure
)
)
)
for condition in self.conditions:
statement = statement.filter(condition)
@@ -577,13 +584,35 @@ async def create_node(
init_storage, data_uri, data_source.structure
)
data_source.assets.extend(assets)
data_source_orm = orm.DataSource(
structure=_prepare_structure(
if data_source.structure is None:
structure_id = None
else:
# Obtain and hash the canonical (RFC 8785) representation of
# the JSON structure.
structure = _prepare_structure(
structure_family, data_source.structure
),
)
structure_id = compute_structure_id(structure)
# The only way to do "insert if not exists" (i.e. ON CONFLICT DO NOTHING)
# is to invoke the dialect-specific insert construct.
if self.context.engine.dialect.name == "sqlite":
from sqlalchemy.dialects.sqlite import insert
elif self.context.engine.dialect.name == "postgresql":
from sqlalchemy.dialects.postgresql import insert
else:
assert False # future-proofing
statement = (
insert(orm.Structure).values(
id=structure_id,
structure=structure,
)
).on_conflict_do_nothing(index_elements=["id"])
await db.execute(statement)
data_source_orm = orm.DataSource(
mimetype=data_source.mimetype,
management=data_source.management,
parameters=data_source.parameters,
structure_id=structure_id,
)
node.data_sources.append(data_source_orm)
# await db.flush(data_source_orm)
@@ -601,7 +630,21 @@ async def create_node(
db.add(node)
await db.commit()
await db.refresh(node)
return key, type(self)(self.context, node, access_policy=self.access_policy)
# Load the node along with its DataSources and each DataSource's Structure.
refreshed_node = (
await db.execute(
select(orm.Node)
.filter(orm.Node.id == node.id)
.options(
selectinload(orm.Node.data_sources).selectinload(
orm.DataSource.structure
)
)
)
).scalar()
return key, type(self)(
self.context, refreshed_node, access_policy=self.access_policy
)

# async def patch_node(datasources=None):
# ...
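
The dialect branch in create_node above exists because core SQLAlchemy offers no portable "insert if not exists"; ON CONFLICT support lives in the dialect-specific insert constructs. Below is a standalone sketch of that pattern, assuming SQLAlchemy 1.4+ and an async session; the helper names are illustrative and not part of tiled.

from tiled.catalog import orm  # provides the Structure model added in this commit


def get_insert(dialect_name):
    # Pick the dialect-specific insert construct that supports ON CONFLICT.
    if dialect_name == "sqlite":
        from sqlalchemy.dialects.sqlite import insert
    elif dialect_name == "postgresql":
        from sqlalchemy.dialects.postgresql import insert
    else:
        raise NotImplementedError(dialect_name)
    return insert


async def upsert_structure(db, dialect_name, structure_id, structure):
    # Insert the structure row only if no row with this ID exists yet.
    # Rows are keyed by a content hash, so any conflicting row is
    # guaranteed to hold identical content and "do nothing" is safe.
    statement = (
        get_insert(dialect_name)(orm.Structure)
        .values(id=structure_id, structure=structure)
        .on_conflict_do_nothing(index_elements=["id"])
    )
    await db.execute(statement)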
1 change: 1 addition & 0 deletions tiled/catalog/core.py
@@ -5,6 +5,7 @@

# This is a list of all valid revisions (from current to oldest).
ALL_REVISIONS = [
"2ca16566d692",
"1cd99c02d0c7",
"a66028395cab",
"3db11ff95b6c",
@@ -0,0 +1,199 @@
"""Separate structure table
Revision ID: 2ca16566d692
Revises: 1cd99c02d0c7
Create Date: 2024-01-22 20:44:23.132801
"""
from datetime import datetime

import orjson
import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql import func

from tiled.catalog.orm import JSONVariant
from tiled.catalog.utils import compute_structure_id
from tiled.server.schemas import Management

# revision identifiers, used by Alembic.
revision = "2ca16566d692"
down_revision = "1cd99c02d0c7"
branch_labels = None
depends_on = None


def upgrade():
connection = op.get_bind()

# Create new 'structures' table.
op.create_table(
"structures",
sa.Column("id", sa.Unicode(32), primary_key=True, unique=True),
sa.Column("structure", JSONVariant, nullable=False),
)
# Get reference, to be used for copying data.
structures = sa.Table(
"structures",
sa.MetaData(),
sa.Column("id", sa.Unicode(32)),
sa.Column("structure", JSONVariant),
)
if connection.engine.dialect.name == "sqlite":
# We use a copy-and-move strategy here because we cannot get exactly
# the result we want by adding a FOREIGN KEY to SQLite on an existing
# table.
op.create_table(
"new_data_sources",
sa.Column(
"id", sa.Integer, primary_key=True, index=True, autoincrement=True
),
sa.Column(
"node_id",
sa.Integer,
sa.ForeignKey("nodes.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"structure_id",
sa.Unicode(32),
sa.ForeignKey("structures.id", ondelete="CASCADE"),
nullable=True,
),
sa.Column("mimetype", sa.Unicode(255), nullable=False),
sa.Column("parameters", JSONVariant, nullable=True),
sa.Column("management", sa.Enum(Management), nullable=False),
sa.Column(
"time_created",
sa.DateTime(timezone=False),
server_default=func.now(),
),
sa.Column(
"time_updated",
sa.DateTime(timezone=False),
onupdate=func.now(),
server_default=func.now(),
),
)
new_data_sources = sa.Table(
"new_data_sources",
sa.MetaData(),
sa.Column("id", sa.Integer),
sa.Column(
"node_id",
sa.Integer,
sa.ForeignKey("nodes.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("structure_id", sa.Unicode(32)),
sa.Column("mimetype", sa.Unicode(255), nullable=False),
sa.Column("parameters", JSONVariant, nullable=True),
sa.Column("management", sa.Enum(Management), nullable=False),
sa.Column(
"time_created",
sa.DateTime(timezone=False),
server_default=func.now(),
),
sa.Column(
"time_updated",
sa.DateTime(timezone=False),
onupdate=func.now(),
server_default=func.now(),
),
)
# Extract rows from data_sources and compute structure_id.
results = connection.execute(
sa.text(
"SELECT id, node_id, structure, mimetype, parameters, management, "
"time_created, time_updated FROM data_sources"
)
).fetchall()

new_data_sources_rows = []
unique_structures = {} # map unique structure_id -> structure
for row in results:
structure_id = compute_structure_id(row[2])
new_row = {
"id": row[0],
"node_id": row[1],
"structure_id": structure_id,
"mimetype": row[3],
"parameters": orjson.loads(row[4]),
"management": row[5],
"time_created": datetime.fromisoformat(row[6]),
"time_udpated": datetime.fromisoformat(row[7]),
}
new_data_sources_rows.append(new_row)
if structure_id not in unique_structures:
unique_structures[structure_id] = orjson.loads(row[2])
structures_rows = [
{"id": structure_id, "structure": structure}
for structure_id, structure in unique_structures.items()
]

# Copy data into new tables.
op.bulk_insert(structures, structures_rows)
op.bulk_insert(new_data_sources, new_data_sources_rows)

# Drop old 'data_sources' and move 'new_data_sources' into its place.
op.drop_table("data_sources")
op.rename_table("new_data_sources", "data_sources")
# The above leaves many partially filled pages and, run on example
# datasets, left the database slightly _larger_. Clean up. (VACUUM must
# run outside a transaction, hence the autocommit block.)
with op.get_context().autocommit_block():
connection.execute(sa.text("VACUUM"))
else:
# PostgreSQL
# Extract rows from data_sources and compute structure_id.
results = connection.execute(
sa.text("SELECT id, structure FROM data_sources")
).fetchall()
unique_structures = {} # map unique structure_id -> structure
data_source_id_to_structure_id = {}
for data_source_id, structure in results:
structure_id = compute_structure_id(structure)
unique_structures[structure_id] = structure
data_source_id_to_structure_id[data_source_id] = structure_id
structures_rows = [
{"id": structure_id, "structure": structure}
for structure_id, structure in unique_structures.items()
]
# Copy data into 'structures' table.
op.bulk_insert(structures, structures_rows)
op.add_column(
"data_sources",
sa.Column(
"structure_id",
sa.Unicode(32),
sa.ForeignKey("structures.id", ondelete="CASCADE"),
),
)
data_sources = sa.Table(
"data_sources",
sa.MetaData(),
sa.Column("id", sa.Integer),
sa.Column(
"node_id",
sa.Integer,
sa.ForeignKey("nodes.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("structure_id", sa.Unicode(32)),
sa.Column("structure", sa.Unicode(32)),
sa.Column("mimetype", sa.Unicode(255), nullable=False),
sa.Column("parameters", JSONVariant, nullable=True),
sa.Column("management", sa.Enum(Management), nullable=False),
)
for data_source_id, structure_id in data_source_id_to_structure_id.items():
connection.execute(
data_sources.update()
.values(structure_id=structure_id)
.where(data_sources.c.id == data_source_id)
)
op.drop_column("data_sources", "structure")


def downgrade():
# This _could_ be implemented but we will wait for a need since we are
# still in alpha releases.
raise NotImplementedError
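
After upgrading a SQLite catalog, a quick consistency check can confirm that every migrated data_sources row points at an existing structures row. This check is not part of the commit, and the catalog.db path is an assumption; substitute the path to your catalog file.

import sqlite3

conn = sqlite3.connect("catalog.db")  # assumed path; use your catalog file
orphans = conn.execute(
    "SELECT COUNT(*) FROM data_sources ds "
    "LEFT JOIN structures s ON ds.structure_id = s.id "
    "WHERE ds.structure_id IS NOT NULL AND s.id IS NULL"
).fetchone()[0]
assert orphans == 0, f"{orphans} data_sources rows reference a missing structure"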
31 changes: 28 additions & 3 deletions tiled/catalog/orm.py
@@ -291,16 +291,24 @@ class DataSource(Timestamped, Base):
node_id = Column(
Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False
)

structure = Column(JSONVariant, nullable=True)
structure_id = Column(
Unicode(32), ForeignKey("structures.id", ondelete="CASCADE"), nullable=True
)
mimetype = Column(Unicode(255), nullable=False) # max length given by RFC 4288
# These are additional parameters passed to the Adapter to guide
# it to access and arrange the data in the file correctly.
parameters = Column(JSONVariant, nullable=True)
# This relates to the mutability of the data.
management = Column(Enum(Management), nullable=False)

# many-to-many relationship to DataSource, bypassing the `Association` class
# many-to-one relationship to Structure
structure: Mapped["Structure"] = relationship(
"Structure",
lazy="selectin",
passive_deletes=True,
)

# many-to-many relationship to Asset, bypassing the `Association` class
assets: Mapped[List["Asset"]] = relationship(
secondary="data_source_asset_association",
back_populates="data_sources",
@@ -316,6 +324,23 @@ class DataSource(Timestamped, Base):
)


class Structure(Base):
"""
This describes the structure of a DataSource.
The id is the HEX digest of the MD5 hash of the canonical representation
of the JSON structure, as specified by RFC 8785.
https://datatracker.ietf.org/doc/html/rfc8785
"""

__tablename__ = "structures"

id: str = Column(Unicode(32), primary_key=True, unique=True)
structure = Column(JSONVariant, nullable=False)


class Asset(Timestamped, Base):
"""
This tracks individual files/blobs.
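
To see the normalized layout end to end, here is a self-contained sketch using plain SQLAlchemy Core with simplified tables (not tiled's ORM; json.dumps with sorted keys stands in for the real canonicalization): two data_sources rows share a single structures row keyed by the content hash.

import hashlib
import json

import sqlalchemy as sa

metadata = sa.MetaData()
structures = sa.Table(
    "structures",
    metadata,
    sa.Column("id", sa.Unicode(32), primary_key=True),
    sa.Column("structure", sa.JSON, nullable=False),
)
data_sources = sa.Table(
    "data_sources",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("structure_id", sa.Unicode(32), sa.ForeignKey("structures.id")),
)

engine = sa.create_engine("sqlite://")
metadata.create_all(engine)

structure = {"shape": [3], "chunks": [[3]], "data_type": {"kind": "f"}}
structure_id = hashlib.md5(
    json.dumps(structure, sort_keys=True).encode()  # simplified canonical form
).hexdigest()

with engine.begin() as conn:
    conn.execute(structures.insert().values(id=structure_id, structure=structure))
    # Two data sources reference the same structure row.
    conn.execute(
        data_sources.insert(),
        [{"structure_id": structure_id}, {"structure_id": structure_id}],
    )
    rows = conn.execute(
        sa.select(structures.c.structure).join(
            data_sources, data_sources.c.structure_id == structures.c.id
        )
    ).fetchall()
    assert len(rows) == 2 and rows[0][0] == rows[1][0] == structure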