Skip to content

Commit

Permalink
unifies ResourceHints typed dict
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Sep 22, 2024
1 parent 875bf29 commit 897e8b4
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 24 deletions.
2 changes: 1 addition & 1 deletion dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No
# convert to table meta if created table variant so item is assigned to this table
if meta.create_table_variant:
# name in hints meta must be a string, otherwise merge_hints would fail
meta = TableNameMeta(meta.hints["name"]) # type: ignore[arg-type]
meta = TableNameMeta(meta.hints["table_name"]) # type: ignore[arg-type]
self._reset_contracts_cache()

if table_name := self._get_static_table_name(resource, meta):
Expand Down
72 changes: 51 additions & 21 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TypedDict, cast, Any, Optional, Dict
from typing import Sequence, TypedDict, Union, cast, Any, Optional, Dict
from typing_extensions import Self

from dlt.common import logger
Expand Down Expand Up @@ -42,23 +42,23 @@


class TResourceHintsBase(TypedDict, total=False):
table_name: Optional[TTableHintTemplate[str]]
write_disposition: Optional[TTableHintTemplate[TWriteDispositionConfig]]
parent: Optional[TTableHintTemplate[str]]
primary_key: Optional[TTableHintTemplate[TColumnNames]]
columns: Optional[TTableHintTemplate[TAnySchemaColumns]]
schema_contract: Optional[TTableHintTemplate[TSchemaContract]]
table_format: Optional[TTableHintTemplate[TTableFormat]]
file_format: TTableHintTemplate[TFileFormat]
merge_key: Optional[TTableHintTemplate[TColumnNames]]
nested_hints: Optional[Dict[str, "TResourceHintsBase"]]


class TResourceHints(TResourceHintsBase, total=False):
name: TTableHintTemplate[str]
# description: TTableHintTemplate[str]
# table_sealed: Optional[bool]
columns: TTableHintTemplate[TTableSchemaColumns]
incremental: Incremental[Any]
file_format: TTableHintTemplate[TFileFormat]
incremental: Optional[Incremental[Any]]
validator: ValidateItem
original_columns: TTableHintTemplate[TAnySchemaColumns]
original_columns: Optional[TTableHintTemplate[TAnySchemaColumns]]


class HintsMeta:
Expand Down Expand Up @@ -89,6 +89,7 @@ def make_hints(
"""
validator, schema_contract = create_item_validator(columns, schema_contract)
# create a table schema template where hints can be functions taking TDataItem
# TODO: do not use new_table here and get rid if typing ignores
new_template: TResourceHints = new_table(
table_name, # type: ignore
parent_table_name, # type: ignore
Expand All @@ -97,8 +98,9 @@ def make_hints(
table_format=table_format, # type: ignore
file_format=file_format, # type: ignore
)
new_template["table_name"] = new_template.pop("name") # type: ignore
if not table_name:
new_template.pop("name")
del new_template["table_name"]
if not write_disposition and "write_disposition" in new_template:
new_template.pop("write_disposition")
# remember original columns and set template columns
Expand All @@ -117,12 +119,34 @@ def make_hints(
return new_template


class DltResourceHintsDict(Dict[str, "DltResourceHints"]):
# def __init__(self, initial_value: TResourceHintsBase)

def __getitem__(self, key: Union[str, Sequence[str]]) -> "DltResourceHints":
"""Get item at `key` is string or recursively if sequence"""
if isinstance(key, str):
return super().__getitem__(key)
else:
item = super().__getitem__(key[0])
for k_ in key[1:]:
item = item.nested_hints[k_]
return item

def __setitem__(self, key: str, value: Union["DltResourceHints", TResourceHintsBase]) -> None:
"""Sets resource hints at given `key` or create new instance from table template"""
if isinstance(value, DltResourceHints):
return super().__setitem__(key, value)
else:
return super().__setitem__(key, DltResourceHints(value)) # type: ignore


class DltResourceHints:
def __init__(self, table_schema_template: TResourceHints = None):
self.__qualname__ = self.__name__ = self.name
self._table_name_hint_fun: TFunHintTemplate[str] = None
self._table_has_other_dynamic_hints: bool = False
self._hints: TResourceHints = None
self._nested_hints: DltResourceHintsDict = None
"""Hints for the resource"""
self._hints_variants: Dict[str, TResourceHints] = {}
"""Hints for tables emitted from resources"""
Expand All @@ -139,7 +163,7 @@ def table_name(self) -> TTableHintTemplate[str]:
if self._table_name_hint_fun:
return self._table_name_hint_fun
# get table name or default name
return self._hints.get("name") or self.name if self._hints else self.name
return self._hints.get("table_name") or self.name if self._hints else self.name

@table_name.setter
def table_name(self, value: TTableHintTemplate[str]) -> None:
Expand All @@ -158,7 +182,11 @@ def write_disposition(self, value: TTableHintTemplate[TWriteDispositionConfig])
@property
def columns(self) -> TTableHintTemplate[TTableSchemaColumns]:
"""Gets columns' schema that can be modified in place"""
return None if self._hints is None else self._hints.get("columns")
return None if self._hints is None else self._hints.get("columns") # type: ignore[return-value]

@property
def nested_hints(self) -> DltResourceHintsDict:
pass

@property
def schema_contract(self) -> TTableHintTemplate[TSchemaContract]:
Expand All @@ -179,24 +207,24 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab
"""
if isinstance(meta, TableNameMeta):
# look for variant
table_template = self._hints_variants.get(meta.table_name, self._hints)
root_table_template = self._hints_variants.get(meta.table_name, self._hints)
else:
table_template = self._hints
if not table_template:
root_table_template = self._hints
if not root_table_template:
return new_table(self.name, resource=self.name)

# resolve a copy of a held template
table_template = self._clone_hints(table_template)
if "name" not in table_template:
table_template["name"] = self.name
root_table_template = self._clone_hints(root_table_template)
if "table_name" not in root_table_template:
root_table_template["table_name"] = self.name

# if table template present and has dynamic hints, the data item must be provided.
if self._table_name_hint_fun and item is None:
raise DataItemRequiredForDynamicTableHints(self.name)
# resolve
resolved_template: TResourceHints = {
k: self._resolve_hint(item, v)
for k, v in table_template.items()
for k, v in root_table_template.items()
if k not in NATURAL_CALLABLES
} # type: ignore
table_schema = self._create_table_schema(resolved_template, self.name)
Expand Down Expand Up @@ -276,9 +304,9 @@ def apply_hints(
t = self._clone_hints(t)
if table_name is not None:
if table_name:
t["name"] = table_name
t["table_name"] = table_name
else:
t.pop("name", None)
t.pop("table_name", None)
if parent_table_name is not None:
if parent_table_name:
t["parent"] = parent_table_name
Expand All @@ -296,6 +324,7 @@ def apply_hints(
# normalize columns
columns = ensure_table_schema_columns(columns)
# this updates all columns with defaults
assert isinstance(t["columns"], dict)
t["columns"] = merge_columns(t["columns"], columns, merge_columns=True)
else:
# set to empty columns
Expand Down Expand Up @@ -354,7 +383,8 @@ def _set_hints(
DltResourceHints.validate_dynamic_hints(hints_template)
DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
if create_table_variant:
table_name: str = hints_template["name"] # type: ignore[assignment]
# for table variants, table name must be a str
table_name: str = hints_template["table_name"] # type: ignore[assignment]
# incremental cannot be specified in variant
if hints_template.get("incremental"):
raise InconsistentTableTemplate(
Expand Down Expand Up @@ -388,7 +418,7 @@ def merge_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
) -> None:
self.apply_hints(
table_name=hints_template.get("name"),
table_name=hints_template.get("table_name"),
parent_table_name=hints_template.get("parent"),
write_disposition=hints_template.get("write_disposition"),
columns=hints_template.get("original_columns"),
Expand Down
2 changes: 0 additions & 2 deletions dlt/sources/rest_api/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,7 @@ class ProcessingSteps(TypedDict):
class ResourceBase(TResourceHintsBase, total=False):
"""Defines hints that may be passed to `dlt.resource` decorator"""

table_name: Optional[TTableHintTemplate[str]]
max_table_nesting: Optional[int]
columns: Optional[TTableHintTemplate[TAnySchemaColumns]]
selected: Optional[bool]
parallelized: Optional[bool]
processing_steps: Optional[List[ProcessingSteps]]
Expand Down

0 comments on commit 897e8b4

Please sign in to comment.