Add DataChain.listings() method and use it in getting storages #331

Merged — 28 commits merged on Sep 5, 2024

Commits (28)
44552e1
first version of from_storage without deprecated listing
ilongin Aug 13, 2024
1e64b7f
first version of from_storage without deprecated listing
ilongin Aug 13, 2024
1f1b7b3
fixing tests and removing prints, refactoring
ilongin Aug 14, 2024
5fbcfc7
refactoring listing static methods
ilongin Aug 16, 2024
a922e0a
fixing non recursive queries
ilongin Aug 16, 2024
a09a2c8
using ctc in test session
ilongin Aug 17, 2024
1d2a8b0
fixing json
ilongin Aug 19, 2024
76e2cce
added DataChain.listings classmethod that returns list of ListingInfo…
ilongin Aug 20, 2024
ad98378
another test for listings
ilongin Aug 20, 2024
3587f8b
removed not needed filters
ilongin Aug 20, 2024
925c7cd
refactoring test
ilongin Aug 20, 2024
8714fcd
removed not needed catalog storage methods and their related codebase
ilongin Aug 22, 2024
e363c13
fixing windows tests
ilongin Aug 23, 2024
38503d1
returning to all tests
ilongin Aug 23, 2024
48931f6
removed unlist_source method and related codebase
ilongin Aug 23, 2024
1f3f7aa
fixing dataset dependencies
ilongin Aug 24, 2024
5a56062
added session on cloud test catalog and refactoring tests
ilongin Aug 26, 2024
40e83f6
using new listings method in from_storage
ilongin Aug 27, 2024
84ad5da
fixing test
ilongin Aug 27, 2024
ed4f4a1
fixing test
ilongin Aug 27, 2024
8537347
added dataset name dependencies test and fixes
ilongin Sep 2, 2024
18a0a01
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 2, 2024
14aadc0
small refactoring
ilongin Sep 2, 2024
d74f4bf
Merge branch 'ilongin/329-refactor-storages' of github.com:iterative/…
ilongin Sep 2, 2024
bf1e24e
Merge branch 'main' into ilongin/329-refactor-storages
ilongin Sep 3, 2024
15afd42
Merge branch 'main' into ilongin/329-refactor-storages
ilongin Sep 4, 2024
97297bf
refactor comments
ilongin Sep 4, 2024
2d7fb35
Merge branch 'main' into ilongin/329-refactor-storages
ilongin Sep 4, 2024
23 changes: 0 additions & 23 deletions src/datachain/catalog/catalog.py
@@ -1018,20 +1018,6 @@ def _row_to_node(d: dict[str, Any]) -> Node:

return node_groups

def unlist_source(self, uri: StorageURI) -> None:
self.metastore.clone(uri=uri).mark_storage_not_indexed(uri)

def storage_stats(self, uri: StorageURI) -> Optional[DatasetStats]:
"""
Returns tuple with storage stats: total number of rows and total dataset size.
"""
partial_path = self.metastore.get_last_partial_path(uri)
if partial_path is None:
return None
dataset = self.get_dataset(Storage.dataset_name(uri, partial_path))

return self.dataset_stats(dataset.name, dataset.latest_version)

def create_dataset(
self,
name: str,
@@ -1618,15 +1604,6 @@ def ls(
for source in data_sources: # type: ignore [union-attr]
yield source, source.ls(fields)

def ls_storage_uris(self) -> Iterator[str]:
yield from self.metastore.get_all_storage_uris()

def get_storage(self, uri: StorageURI) -> Storage:
return self.metastore.get_storage(uri)

def ls_storages(self) -> list[Storage]:
return self.metastore.list_storages()

def pull_dataset(
self,
dataset_uri: str,
18 changes: 4 additions & 14 deletions src/datachain/cli.py
@@ -14,6 +14,7 @@

from datachain import utils
from datachain.cli_utils import BooleanOptionalAction, CommaSeparatedArgs, KeyValueArgs
from datachain.lib.dc import DataChain
from datachain.utils import DataChainDir

if TYPE_CHECKING:
@@ -615,18 +616,6 @@ def _ls_urls_flat(
raise FileNotFoundError(f"No such file or directory: {source}")


def ls_indexed_storages(catalog: "Catalog", long: bool = False) -> Iterator[str]:
from datachain.node import long_line_str

storage_uris = catalog.ls_storage_uris()
if long:
for uri in storage_uris:
# TODO: add Storage.created so it can be used here
yield long_line_str(uri, None, "")
else:
yield from storage_uris


def ls_local(
sources,
long: bool = False,
@@ -657,8 +646,9 @@ def ls_local(
for entry in entries:
print(format_ls_entry(entry))
else:
for entry in ls_indexed_storages(catalog, long=long):
print(format_ls_entry(entry))
chain = DataChain.listings()
for ls in chain.collect("listing"):
print(format_ls_entry(f"{ls.uri}@v{ls.version}")) # type: ignore[union-attr]


def format_ls_entry(entry: str) -> str:
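For reference, a minimal standalone sketch of the listing-based path that now backs `ls` for indexed storages (imports and field names follow the diff above; the printed string mirrors what is handed to format_ls_entry):

```py
# Standalone sketch of the new "ls" behaviour: enumerate cached listings
# instead of querying the metastore storage tables.
from datachain.lib.dc import DataChain

chain = DataChain.listings()
for ls in chain.collect("listing"):
    # Each item is a ListingInfo; "uri@vN" is the string passed to format_ls_entry.
    print(f"{ls.uri}@v{ls.version}")
```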
29 changes: 1 addition & 28 deletions src/datachain/data_storage/metastore.py
@@ -167,32 +167,17 @@ def mark_storage_indexed(
This method should be called when index operation is finished.
"""

@abstractmethod
def mark_storage_not_indexed(self, uri: StorageURI) -> None:
"""
Mark storage as not indexed.
This method should be called when storage index is deleted.
"""

@abstractmethod
def update_last_inserted_at(self, uri: Optional[StorageURI] = None) -> None:
"""Updates last inserted datetime in bucket with current time."""

@abstractmethod
def get_all_storage_uris(self) -> Iterator[StorageURI]:
"""Returns all storage uris."""

@abstractmethod
def get_storage(self, uri: StorageURI) -> Storage:
"""
Gets storage representation from database.
E.g. if s3 is used as storage this would be s3 bucket data.
"""

@abstractmethod
def list_storages(self) -> list[Storage]:
"""Returns all storages."""

@abstractmethod
def mark_storage_pending(self, storage: Storage) -> Storage:
"""Marks storage as pending."""
@@ -324,7 +309,7 @@ def add_dependency(
self.add_dataset_dependency(
source_dataset_name,
source_dataset_version,
dependency.name,
dependency.dataset_name,
int(dependency.version),
)
else:
@@ -906,11 +891,6 @@ def update_last_inserted_at(self, uri: Optional[StorageURI] = None) -> None:
self._storages_update().where(s.c.uri == uri).values(**updates) # type: ignore [attr-defined]
)

def get_all_storage_uris(self) -> Iterator[StorageURI]:
"""Returns all storage uris."""
s = self._storages
yield from (r[0] for r in self.db.execute(self._storages_select(s.c.uri)))

def get_storage(self, uri: StorageURI, conn=None) -> Storage:
"""
Gets storage representation from database.
@@ -926,13 +906,6 @@ def get_storage(self, uri: StorageURI, conn=None) -> Storage:

return self.storage_class._make(result)

def list_storages(self) -> list[Storage]:
result = self.db.execute(self._storages_select())
if not result:
return []

return [self.storage_class._make(r) for r in result]

def mark_storage_pending(self, storage: Storage, conn=None) -> Storage:
# Update status to pending and dates
updates = {
11 changes: 0 additions & 11 deletions src/datachain/data_storage/sqlite.py
@@ -517,17 +517,6 @@ def _datasets_versions_insert(self) -> "Insert":
def _datasets_dependencies_insert(self) -> "Insert":
return sqlite.insert(self._datasets_dependencies)

#
# Storages
#

def mark_storage_not_indexed(self, uri: StorageURI) -> None:
"""
Mark storage as not indexed.
This method should be called when storage index is deleted.
"""
self.db.execute(self._storages_delete().where(self._storages.c.uri == uri))

#
# Dataset dependencies
#
69 changes: 38 additions & 31 deletions src/datachain/dataset.py
@@ -11,8 +11,6 @@
)
from urllib.parse import urlparse

from dateutil.parser import isoparse

from datachain.client import Client
from datachain.sql.types import NAME_TYPES_MAPPING, SQLType

@@ -73,11 +71,22 @@
class DatasetDependency:
id: int
type: str
name: str # when the type is STORAGE, this is actually StorageURI
version: str # string until we'll have proper bucket listing versions
name: str
version: str # TODO change to int
created_at: datetime
dependencies: list[Optional["DatasetDependency"]]

@property
def dataset_name(self) -> str:
"""Returns clean dependency dataset name"""
from datachain.lib.listing import parse_listing_uri

if self.type == DatasetDependencyType.DATASET:
return self.name

list_dataset_name, _, _ = parse_listing_uri(self.name.strip("/"), None, {})
return list_dataset_name

@classmethod
def parse(
cls: builtins.type[DD],
@@ -92,33 +101,31 @@
dataset_version_created_at: Optional[datetime],
bucket_uri: Optional["StorageURI"],
) -> Optional["DatasetDependency"]:
if dataset_id:
assert dataset_name is not None
return cls(
id,
DatasetDependencyType.DATASET,
dataset_name,
(
str(dataset_version) # type: ignore[arg-type]
if dataset_version
else None
),
dataset_version_created_at or dataset_created_at, # type: ignore[arg-type]
[],
)
if bucket_uri:
return cls(
id,
DatasetDependencyType.STORAGE,
bucket_uri,
bucket_version, # type: ignore[arg-type]
isoparse(bucket_version), # type: ignore[arg-type]
[],
)
# dependency has been removed
# TODO we should introduce flags for removed datasets, instead of
# removing them from tables so that we can still have references
return None
from datachain.lib.listing import is_listing_dataset, listing_uri_from_name

if not dataset_id:
return None

Codecov / codecov/patch — warning on src/datachain/dataset.py#L107: added line #L107 was not covered by tests.

assert dataset_name is not None
dependency_type = DatasetDependencyType.DATASET
dependency_name = dataset_name

if is_listing_dataset(dataset_name):
dependency_type = DatasetDependencyType.STORAGE # type: ignore[arg-type]
Review thread on this line:

Member: should we use LISTING as a type (since we use listing as a term in some other places)?

Contributor Author: I was thinking about this as well. Not sure, but both are correct IMO.

Member: > Not sure, but both is correct IMO

yes, but it just makes it simpler to maintain, search, and understand. Not critical, but I would try to go everywhere with the same term.

Contributor Author: I will consider that in upcoming PRs, just to not block this one.

dependency_name = listing_uri_from_name(dataset_name)

return cls(
id,
dependency_type,
dependency_name,
(
str(dataset_version) # type: ignore[arg-type]
if dataset_version
else None
),
dataset_version_created_at or dataset_created_at, # type: ignore[arg-type]
[],
)

@property
def is_dataset(self) -> bool:
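A hedged sketch of the dependency-name mapping introduced above, mirroring the dataset_name property; the bucket URI is illustrative only and not taken from this PR:

```py
# Mirrors DatasetDependency.dataset_name: map a STORAGE dependency's URI to the
# name of the listing dataset that backs it. The URI below is a hypothetical example.
from datachain.lib.listing import parse_listing_uri

storage_dep_name = "s3://my-bucket/images/"
list_dataset_name, _, _ = parse_listing_uri(storage_dep_name.strip("/"), None, {})
print(list_dataset_name)  # listing dataset name derived from the storage URI
```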
4 changes: 4 additions & 0 deletions src/datachain/lib/dataset_info.py
@@ -23,6 +23,8 @@ class DatasetInfo(DataModel):
size: Optional[int] = Field(default=None)
params: dict[str, str] = Field(default=dict)
metrics: dict[str, Any] = Field(default=dict)
error_message: str = Field(default="")
error_stack: str = Field(default="")

@staticmethod
def _validate_dict(
@@ -67,4 +69,6 @@ def from_models(
size=version.size,
params=job.params if job else {},
metrics=job.metrics if job else {},
error_message=version.error_message,
error_stack=version.error_stack,
)
47 changes: 39 additions & 8 deletions src/datachain/lib/dc.py
@@ -36,6 +36,7 @@
ls,
parse_listing_uri,
)
from datachain.lib.listing_info import ListingInfo
from datachain.lib.meta_formats import read_meta, read_schema
from datachain.lib.model_store import ModelStore
from datachain.lib.settings import Settings
@@ -349,10 +350,7 @@ def from_storage(
"""
file_type = get_file_type(type)

if anon:
client_config = {"anon": True}
else:
client_config = None
client_config = {"anon": True} if anon else None

session = Session.get(session, client_config=client_config, in_memory=in_memory)

@@ -361,12 +359,9 @@
)
need_listing = True

for ds in cls.datasets(
session=session, in_memory=in_memory, include_listing=True
).collect("dataset"):
for ds in cls.listings(session=session, in_memory=in_memory).collect("listing"):
if (
not is_listing_expired(ds.created_at) # type: ignore[union-attr]
and is_listing_dataset(ds.name) # type: ignore[union-attr]
and is_listing_subset(ds.name, list_dataset_name) # type: ignore[union-attr]
and not update
):
@@ -577,6 +572,42 @@ def datasets(
**{object_name: datasets}, # type: ignore[arg-type]
)

@classmethod
def listings(
cls,
session: Optional[Session] = None,
in_memory: bool = False,
object_name: str = "listing",
**kwargs,
) -> "DataChain":
"""Generate chain with list of cached listings.
Listing is a special kind of dataset which has directory listing data of
some underlying storage (e.g S3 bucket).

Example:
```py
from datachain import DataChain
DataChain.listings().show()
```
"""
session = Session.get(session, in_memory=in_memory)
catalog = kwargs.get("catalog") or session.catalog

listings = [
ListingInfo.from_models(d, v, j)
for d, v, j in catalog.list_datasets_versions(
include_listing=True, **kwargs
)
if is_listing_dataset(d.name)
]

return cls.from_values(
session=session,
in_memory=in_memory,
output={object_name: ListingInfo},
**{object_name: listings}, # type: ignore[arg-type]
)

def print_json_schema( # type: ignore[override]
self, jmespath: Optional[str] = None, model_name: Optional[str] = None
) -> "Self":
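A minimal usage sketch of the new DataChain.listings() classmethod, assuming the ListingInfo fields added in this PR (uri, storage_uri, is_expired):

```py
from datachain import DataChain

# Enumerate cached listings and show which ones from_storage could still reuse.
for info in DataChain.listings().collect("listing"):
    status = "expired" if info.is_expired else "valid"
    print(info.uri, info.storage_uri, status)
```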
7 changes: 7 additions & 0 deletions src/datachain/lib/listing.py
@@ -97,6 +97,13 @@ def is_listing_dataset(name: str) -> bool:
return name.startswith(LISTING_PREFIX)


def listing_uri_from_name(dataset_name: str) -> str:
"""Returns clean storage URI from listing dataset name"""
if not is_listing_dataset(dataset_name):
raise ValueError(f"Dataset {dataset_name} is not a listing")
return dataset_name.removeprefix(LISTING_PREFIX)


def is_listing_expired(created_at: datetime) -> bool:
"""Checks if listing has expired based on it's creation date"""
return datetime.now(timezone.utc) > created_at + timedelta(seconds=LISTING_TTL)
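A hedged example of the helpers above; the "lst__" prefix in the sample name is an assumption for illustration, since the value of LISTING_PREFIX is defined outside this hunk:

```py
from datachain.lib.listing import is_listing_dataset, listing_uri_from_name

name = "lst__s3://my-bucket/animals/"  # hypothetical listing dataset name; prefix value assumed
if is_listing_dataset(name):
    # strips the listing prefix and returns the clean storage URI
    print(listing_uri_from_name(name))
```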
32 changes: 32 additions & 0 deletions src/datachain/lib/listing_info.py
@@ -0,0 +1,32 @@
from datetime import datetime, timedelta, timezone
from typing import Optional

from datachain.client import Client
from datachain.lib.dataset_info import DatasetInfo
from datachain.lib.listing import LISTING_PREFIX, LISTING_TTL


class ListingInfo(DatasetInfo):
@property
def uri(self) -> str:
return self.name.removeprefix(LISTING_PREFIX)

@property
def storage_uri(self) -> str:
client, _ = Client.parse_url(self.uri, None) # type: ignore[arg-type]
return client.uri

@property
def expires(self) -> Optional[datetime]:
if not self.finished_at:
return None

Codecov / codecov/patch — warning on src/datachain/lib/listing_info.py#L22: added line #L22 was not covered by tests.
return self.finished_at + timedelta(seconds=LISTING_TTL)

@property
def is_expired(self) -> bool:
return datetime.now(timezone.utc) > self.expires if self.expires else False

@property
def last_inserted_at(self):
# TODO we need to add updated_at to dataset version or explicit last_inserted_at
raise NotImplementedError
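A small sketch of the expiry logic in ListingInfo.expires / is_expired; the TTL value below is an assumption, since LISTING_TTL is imported from datachain.lib.listing and its value is not part of this diff:

```py
from datetime import datetime, timedelta, timezone

LISTING_TTL = 4 * 60 * 60  # assumed TTL in seconds; the real constant lives in datachain.lib.listing

finished_at = datetime.now(timezone.utc) - timedelta(hours=5)  # hypothetical listing finish time
expires = finished_at + timedelta(seconds=LISTING_TTL)
is_expired = datetime.now(timezone.utc) > expires
print(is_expired)  # True here -> from_storage would re-list instead of reusing the cached listing
```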