Skip to content

Commit

Permalink
apply auto-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Nov 10, 2024
1 parent 9e5bdb1 commit fb99f92
Show file tree
Hide file tree
Showing 454 changed files with 5,509 additions and 5,833 deletions.
13 changes: 7 additions & 6 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
)

import time
from collections.abc import MutableMapping
from copy import copy
from typing import Any, List, MutableMapping
from typing import Any

from orjson import orjson

from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
Expand All @@ -18,7 +21,6 @@
OrchestratorType,
Type,
)
from orjson import orjson


class ObservedDict(dict): # type: ignore # disallow_any_generics is set to True, and dict is equivalent to dict[Any]
Expand All @@ -37,7 +39,7 @@ def __init__(
non_observed_mapping[item] = ObservedDict(value, observer)

# Observe nested list of dicts
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, observer)
Expand All @@ -51,7 +53,7 @@ def __setitem__(self, item: Any, value: Any) -> None:
previous_value = self.get(item)
if isinstance(value, MutableMapping):
value = ObservedDict(value, self.observer)
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, self.observer)
Expand Down Expand Up @@ -86,8 +88,7 @@ def observe_connector_config(


def emit_configuration_as_airbyte_control_message(config: MutableMapping[str, Any]) -> None:
"""
WARNING: deprecated - emit_configuration_as_airbyte_control_message is being deprecated in favor of the MessageRepository mechanism.
"""WARNING: deprecated - emit_configuration_as_airbyte_control_message is being deprecated in favor of the MessageRepository mechanism.
See the airbyte_cdk.sources.message package
"""
airbyte_message = create_connector_config_control_message(config)
Expand Down
28 changes: 12 additions & 16 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations

import json
import logging
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
from collections.abc import Mapping
from typing import Any, Generic, Protocol, TypeVar

import yaml

from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
def load_optional_package_file(package: str, filename: str) -> bytes | None:
"""Gets a resource from a package, returning None if it does not exist"""
try:
return pkgutil.get_data(package, filename)
Expand All @@ -35,23 +37,20 @@ class BaseConnector(ABC, Generic[TConfig]):

@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
"""
Persist config in temporary directory to run the Source job
"""
"""Persist config in temporary directory to run the Source job"""

@staticmethod
def read_config(config_path: str) -> Mapping[str, Any]:
config = BaseConnector._read_json_file(config_path)
if isinstance(config, Mapping):
return config
else:
raise ValueError(
f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
)
raise ValueError(
f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
)

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
with open(file_path) as file:
contents = file.read()

try:
Expand All @@ -67,11 +66,9 @@ def write_config(config: TConfig, config_path: str) -> None:
fh.write(json.dumps(config))

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
"""Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
"""

package = self.__class__.__module__.split(".")[0]

yaml_spec = load_optional_package_file(package, "spec.yaml")
Expand All @@ -98,8 +95,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:

@abstractmethod
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
"""Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
to the Stripe API.
"""

Expand Down
13 changes: 8 additions & 5 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

import dataclasses
from collections.abc import Mapping
from datetime import datetime
from typing import Any, List, Mapping
from typing import Any

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
Type,
)
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
Expand All @@ -23,6 +25,7 @@
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
Expand Down Expand Up @@ -68,7 +71,7 @@ def read_stream(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
state: list[AirbyteStateMessage],
limits: TestReadLimits,
) -> AirbyteMessage:
try:
Expand All @@ -89,7 +92,7 @@ def read_stream(
error = AirbyteTracedException.from_exception(
exc,
message=filter_secrets(
f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
f"Error reading stream with config={config} and catalog={configured_catalog}: {exc!s}"
),
)
return error.as_airbyte_message()
Expand All @@ -107,7 +110,7 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {str(exc)}"
exc, message=f"Error resolving manifest: {exc!s}"
)
return error.as_airbyte_message()

Expand Down
25 changes: 13 additions & 12 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations

import sys
from typing import Any, List, Mapping, Optional, Tuple
from collections.abc import Mapping
from typing import Any

from orjson import orjson

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import (
Expand All @@ -25,12 +28,11 @@
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from orjson import orjson


def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
args: list[str],
) -> tuple[str, Mapping[str, Any], ConfiguredAirbyteCatalog | None, Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
Expand Down Expand Up @@ -69,22 +71,21 @@ def handle_connector_builder_request(
source: ManifestDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
catalog: ConfiguredAirbyteCatalog | None,
state: list[AirbyteStateMessage],
limits: TestReadLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
if command == "test_read":
assert (
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
else:
raise ValueError(f"Unrecognized command {command}.")
raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> str:
def handle_request(args: list[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
Expand All @@ -100,7 +101,7 @@ def handle_request(args: List[str]) -> str:
print(handle_request(sys.argv[1:]))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
exc, message=f"Error handling request: {exc!s}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
Loading

0 comments on commit fb99f92

Please sign in to comment.