Decouple adapters from core #8666

Closed · wants to merge 5 commits
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230915-123733.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: 'Allow adapters to include package logs in dbt standard logging '
+time: 2023-09-15T12:37:33.862862-07:00
+custom:
+Author: colin-rogers-dbt
+Issue: "7859"
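This changelog entry covers letting adapter plugins route their package logging through dbt's standard event system. A minimal sketch of what that looks like from an adapter's side, assuming the `dbt.common.events` namespace this PR introduces (the `MyAdapter` name and `open_connection` helper are hypothetical):

```python
# Hypothetical adapter plugin code; AdapterLogger is importable from
# dbt.common.events after this PR (previously dbt.events).
from dbt.common.events import AdapterLogger

# One logger per adapter package, named so log lines are attributable.
logger = AdapterLogger("MyAdapter")


def open_connection(host: str) -> None:
    # Messages emitted here land in dbt's standard structured log stream,
    # so they respect the user's log level and log format settings.
    logger.debug(f"Opening connection to {host}")
```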
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230831-164435.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Added more type annotations.
+time: 2023-08-31T16:44:35.737954-04:00
+custom:
+Author: peterallenwebb
+Issue: "8537"
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,7 +1,7 @@
# Configuration for pre-commit hooks (see https://pre-commit.com/).
# Eventually the hooks described here will be run as tests before merging each PR.

-exclude: ^(core/dbt/docs/build/|core/dbt/events/types_pb2.py)
+exclude: ^(core/dbt/docs/build/|core/dbt/common/events/types_pb2.py)

# Force all unspecified python hooks to run python 3.8
default_language_version:
2 changes: 1 addition & 1 deletion Makefile
@@ -40,7 +40,7 @@ dev: dev_req ## Installs dbt-* packages in develop mode along with development d

.PHONY: proto_types
proto_types: ## generates google protobuf python file from types.proto
-	protoc -I=./core/dbt/events --python_out=./core/dbt/events ./core/dbt/events/types.proto
+	protoc -I=./core/dbt/common/events --python_out=./core/dbt/common/events ./core/dbt/common/events/types.proto

.PHONY: mypy
mypy: .env ## Runs mypy against staged changes for static type checking.
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/__init__.py
@@ -1,7 +1,7 @@
# these are all just exports, #noqa them so flake8 will be happy

# TODO: Should we still include this in the `adapters` namespace?
-from dbt.contracts.connection import Credentials # noqa: F401
+from dbt.adapters.contracts.connection import Credentials # noqa: F401
from dbt.adapters.base.meta import available # noqa: F401
from dbt.adapters.base.connections import BaseConnectionManager # noqa: F401
from dbt.adapters.base.relation import ( # noqa: F401
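For adapter maintainers, the import move above is the visible API change in this file: `Credentials` now lives under `dbt.adapters.contracts.connection`. A sketch of a plugin credentials class against the new path (the `MyAdapterCredentials` class and its fields are hypothetical; real plugins implement a few more members):

```python
from dataclasses import dataclass

# New location introduced by this PR; formerly dbt.contracts.connection.
from dbt.adapters.contracts.connection import Credentials


@dataclass
class MyAdapterCredentials(Credentials):
    # Hypothetical connection parameters for an example warehouse.
    host: str
    port: int = 5439

    @property
    def type(self) -> str:
        # The adapter name users put in their profiles.yml `type:` field.
        return "myadapter"

    def _connection_keys(self):
        # Fields that are safe to display in `dbt debug` output.
        return ("host", "port", "database", "schema")
```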
10 changes: 5 additions & 5 deletions core/dbt/adapters/base/connections.py
@@ -24,7 +24,7 @@
import agate

import dbt.exceptions
-from dbt.contracts.connection import (
+from dbt.adapters.contracts.connection import (
Connection,
Identifier,
ConnectionState,
@@ -36,9 +36,9 @@
from dbt.adapters.base.query_headers import (
MacroQueryStringSetter,
)
-from dbt.events import AdapterLogger
-from dbt.events.functions import fire_event
-from dbt.events.types import (
+from dbt.common.events import AdapterLogger
+from dbt.common.events.functions import fire_event
+from dbt.common.events.types import (
NewConnection,
ConnectionReused,
ConnectionLeftOpenInCleanup,
@@ -48,7 +48,7 @@
Rollback,
RollbackFailed,
)
-from dbt.events.contextvars import get_node_info
+from dbt.common.events.contextvars import get_node_info
from dbt import flags
from dbt.utils import cast_to_str

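The connection-manager changes are the same mechanical move: structured events and their helpers now import from `dbt.common.events`. A sketch of the firing pattern these imports support, under the assumption that the event classes keep their pre-move fields (the connection name and type shown are illustrative):

```python
from dbt.common.events.functions import fire_event
from dbt.common.events.types import ConnectionReused, NewConnection

# Structured events rather than free-form log strings: each event class
# carries typed fields that dbt's logging layer serializes uniformly.
fire_event(NewConnection(conn_name="model.jaffle_shop.orders", conn_type="postgres"))
fire_event(ConnectionReused(conn_name="model.jaffle_shop.orders"))
```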
192 changes: 27 additions & 165 deletions core/dbt/adapters/base/impl.py
@@ -17,11 +17,9 @@
Set,
Tuple,
Type,
-TypedDict,
Union,
)

-from dbt.adapters.capability import Capability, CapabilityDict
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint

import agate
@@ -46,17 +44,12 @@
)

from dbt.adapters.protocol import AdapterConfig
-from dbt.clients.agate_helper import (
-empty_table,
-get_column_value_uncased,
-merge_tables,
-table_from_rows,
-)
+from dbt.clients.agate_helper import empty_table, merge_tables, table_from_rows
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.nodes import ResultNode
-from dbt.events.functions import fire_event, warn_or_error
-from dbt.events.types import (
+from dbt.common.events.functions import fire_event, warn_or_error
+from dbt.common.events.types import (
CacheMiss,
ListRelations,
CodeExecution,
@@ -81,9 +74,7 @@
from dbt import deprecations

GET_CATALOG_MACRO_NAME = "get_catalog"
-GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
-GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"


class ConstraintSupport(str, Enum):
@@ -118,7 +109,7 @@ def test(row: agate.Row) -> bool:
return test


-def _utc(dt: Optional[datetime], source: Optional[BaseRelation], field_name: str) -> datetime:
+def _utc(dt: Optional[datetime], source: BaseRelation, field_name: str) -> datetime:
"""If dt has a timezone, return a new datetime that's in UTC. Otherwise,
assume the datetime is already for UTC and add the timezone.
"""
@@ -170,12 +161,6 @@ def submit(self, compiled_code: str) -> Any:
raise NotImplementedError("PythonJobHelper submit function is not implemented yet")


-class FreshnessResponse(TypedDict):
-max_loaded_at: datetime
-snapshotted_at: datetime
-age: float # age in seconds


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.

@@ -237,10 +222,6 @@ class BaseAdapter(metaclass=AdapterMeta):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

-# This static member variable can be overriden in concrete adapter
-# implementations to indicate adapter support for optional capabilities.
-_capabilities = CapabilityDict({})

def __init__(self, config) -> None:
self.config = config
self.cache = RelationsCache()
@@ -434,55 +415,23 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
lowercase strings.
"""
info_schema_name_map = SchemaSearchMap()
-relations = self._get_catalog_relations(manifest)
-for relation in relations:
+nodes: Iterator[ResultNode] = chain(
+[
+node
+for node in manifest.nodes.values()
+if (node.is_relational and not node.is_ephemeral_model)
+],
+manifest.sources.values(),
+)
+for node in nodes:
+relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

-def _get_catalog_relations_by_info_schema(
-self, relations
-) -> Dict[InformationSchema, List[BaseRelation]]:
-relations_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = dict()
-for relation in relations:
-info_schema = relation.information_schema_only()
-if info_schema not in relations_by_info_schema:
-relations_by_info_schema[info_schema] = []
-relations_by_info_schema[info_schema].append(relation)
-
-return relations_by_info_schema
-
-def _get_catalog_relations(
-self, manifest: Manifest, selected_nodes: Optional[Set] = None
-) -> List[BaseRelation]:
-nodes: Iterator[ResultNode]
-if selected_nodes:
-selected: List[ResultNode] = []
-for unique_id in selected_nodes:
-if unique_id in manifest.nodes:
-node = manifest.nodes[unique_id]
-if node.is_relational and not node.is_ephemeral_model:
-selected.append(node)
-elif unique_id in manifest.sources:
-source = manifest.sources[unique_id]
-selected.append(source)
-nodes = iter(selected)
-else:
-nodes = chain(
-[
-node
-for node in manifest.nodes.values()
-if (node.is_relational and not node.is_ephemeral_model)
-],
-manifest.sources.values(),
-)
-
-relations = [self.Relation.create_from(self.config, n) for n in nodes]
-return relations

def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Optional[Set[BaseRelation]] = None
) -> None:
@@ -1144,60 +1093,20 @@ def _get_one_catalog(
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

-def _get_one_catalog_by_relations(
-self,
-information_schema: InformationSchema,
-relations: List[BaseRelation],
-manifest: Manifest,
-) -> agate.Table:
-
-kwargs = {
-"information_schema": information_schema,
-"relations": relations,
-}
-table = self.execute_macro(
-GET_CATALOG_RELATIONS_MACRO_NAME,
-kwargs=kwargs,
-# pass in the full manifest, so we get any local project
-# overrides
-manifest=manifest,
-)
-
-results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
-return results

-def get_catalog(
-self, manifest: Manifest, selected_nodes: Optional[Set] = None
-) -> Tuple[agate.Table, List[Exception]]:
+def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
+schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
-catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
-relation_count = len(catalog_relations)
-if relation_count <= 100 and self.supports(Capability.SchemaMetadataByRelations):
-relations_by_schema = self._get_catalog_relations_by_info_schema(catalog_relations)
-for info_schema in relations_by_schema:
-name = ".".join([str(info_schema.database), "information_schema"])
-relations = relations_by_schema[info_schema]
-fut = tpe.submit_connected(
-self,
-name,
-self._get_one_catalog_by_relations,
-info_schema,
-relations,
-manifest,
-)
-futures.append(fut)
-else:
-schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest)
-for info, schemas in schema_map.items():
-if len(schemas) == 0:
-continue
-name = ".".join([str(info.database), "information_schema"])
-fut = tpe.submit_connected(
-self, name, self._get_one_catalog, info, schemas, manifest
-)
-futures.append(fut)
+for info, schemas in schema_map.items():
+if len(schemas) == 0:
+continue
+name = ".".join([str(info.database), "information_schema"])
+
+fut = tpe.submit_connected(
+self, name, self._get_one_catalog, info, schemas, manifest
+)
+futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)

@@ -1213,7 +1122,7 @@ def calculate_freshness(
loaded_at_field: str,
filter: Optional[str],
manifest: Optional[Manifest] = None,
-) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
+) -> Tuple[Optional[AdapterResponse], Dict[str, Any]]:
"""Calculate the freshness of sources in dbt, and return it"""
kwargs: Dict[str, Any] = {
"source": source,
Expand Down Expand Up @@ -1248,50 +1157,11 @@ def calculate_freshness(

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
-freshness: FreshnessResponse = {
-"max_loaded_at": max_loaded_at,
-"snapshotted_at": snapshotted_at,
-"age": age,
-}
-return adapter_response, freshness
-
-def calculate_freshness_from_metadata(
-self,
-source: BaseRelation,
-manifest: Optional[Manifest] = None,
-) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
-kwargs: Dict[str, Any] = {
-"information_schema": source.information_schema_only(),
-"relations": [source],
-}
-result = self.execute_macro(
-GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, manifest=manifest
-)
-adapter_response, table = result.response, result.table # type: ignore[attr-defined]
-
-try:
-row = table[0]
-last_modified_val = get_column_value_uncased("last_modified", row)
-snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
-except Exception:
-raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)
-
-if last_modified_val is None:
-# Interpret missing value as "infinitely long ago"
-max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
-else:
-max_loaded_at = _utc(last_modified_val, None, "last_modified")
-
-snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")
-
-age = (snapshotted_at - max_loaded_at).total_seconds()
-
+freshness = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
@@ -1567,14 +1437,6 @@ def render_model_constraint(cls, constraint: ModelLevelConstraint) -> Optional[s
else:
return None

-@classmethod
-def capabilities(cls) -> CapabilityDict:
-return cls._capabilities
-
-@classmethod
-def supports(cls, capability: Capability) -> bool:
-return bool(cls.capabilities()[capability])


COLUMNS_EQUAL_SQL = """
with diff_count as (
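The net effect in impl.py: the relation-batched catalog path and metadata-based freshness are gone from this branch, catalog queries always fan out one information_schema per database, and calculate_freshness now returns a plain dict rather than the FreshnessResponse TypedDict. A worked sketch of the freshness arithmetic the remaining code performs (timestamps are illustrative):

```python
from datetime import datetime, timezone

# Illustrative inputs: when the source table was last loaded, and when
# the collect_freshness macro sampled it.
max_loaded_at = datetime(2023, 9, 15, 12, 0, tzinfo=timezone.utc)
snapshotted_at = datetime(2023, 9, 15, 12, 30, tzinfo=timezone.utc)

# Age in seconds, exactly as calculate_freshness computes it.
age = (snapshotted_at - max_loaded_at).total_seconds()

# With FreshnessResponse removed, the result is a plain dict.
freshness = {
    "max_loaded_at": max_loaded_at,
    "snapshotted_at": snapshotted_at,
    "age": age,  # 1800.0 for these inputs
}
```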
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/query_headers.py
@@ -4,7 +4,7 @@
from dbt.clients.jinja import QueryStringGenerator

from dbt.context.manifest import generate_query_header_context
-from dbt.contracts.connection import AdapterRequiredConfig, QueryComment
+from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import DbtRuntimeError
4 changes: 2 additions & 2 deletions core/dbt/adapters/cache.py
@@ -14,8 +14,8 @@
ReferencedLinkNotCachedError,
TruncatedModelNameCausedCollisionError,
)
-from dbt.events.functions import fire_event, fire_event_if
-from dbt.events.types import CacheAction, CacheDumpGraph
+from dbt.common.events.functions import fire_event, fire_event_if
+from dbt.common.events.types import CacheAction, CacheDumpGraph
from dbt.flags import get_flags
from dbt.utils import lowercase

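cache.py gets the same import move. The fire_event_if helper imported here exists to defer building expensive event payloads; a sketch of that pattern, assuming the helper keeps its (condition, thunk) signature and CacheDumpGraph keeps its before_after/action/dump fields (the flag variable is an illustrative stand-in):

```python
from dbt.common.events.functions import fire_event_if
from dbt.common.events.types import CacheDumpGraph

log_cache_events = False  # illustrative stand-in for the real dbt flag

# The lambda is only evaluated when the condition is true, so the
# (potentially large) cache graph dump is never built on ordinary runs.
fire_event_if(
    log_cache_events,
    lambda: CacheDumpGraph(before_after="before", action="add", dump={}),
)
```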