
Commit

chore: apply --add-noqa, review and annotate all notations
aaronsteers committed Nov 11, 2024
1 parent 55cb1ce commit 4b622b2
Showing 146 changed files with 1,058 additions and 755 deletions.
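Context for the diffs below: the suppression comments match what Ruff generates when run with its --add-noqa flag, which appends "# noqa: <rule>" to every line that violates an enabled rule; the parenthetical notes such as "(any-type)" or "(prefer pathlib)" appear to be the hand-written annotations the commit title refers to. Assuming Ruff is configured for this repository, the generating command would be something like:

    ruff check --add-noqa

The rule codes that recur below (ANN401, PTH123, TRY002, TRY004, ARG002, SLF001, and others) are documented at https://docs.astral.sh/ruff/rules/.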
5 changes: 3 additions & 2 deletions airbyte_cdk/config_observation.py
@@ -28,6 +28,7 @@ def __init__(
         self,
         non_observed_mapping: MutableMapping[Any, Any],
         observer: ConfigObserver,
+        *,
         update_on_unchanged_value: bool = True,
     ) -> None:
         non_observed_mapping = copy(non_observed_mapping)
@@ -45,7 +46,7 @@ def __init__(
                     value[i] = ObservedDict(sub_value, observer)
         super().__init__(non_observed_mapping)

-    def __setitem__(self, item: Any, value: Any) -> None:
+    def __setitem__(self, item: Any, value: Any) -> None:  # noqa: ANN401 (any-type)
         """Override dict.__setitem__ by:
         1. Observing the new value if it is a dict
         2. Call observer update if the new value is different from the previous one
@@ -78,7 +79,7 @@ def observe_connector_config(
     non_observed_connector_config: MutableMapping[str, Any],
 ) -> ObservedDict:
     if isinstance(non_observed_connector_config, ObservedDict):
-        raise ValueError("This connector configuration is already observed")
+        raise ValueError("This connector configuration is already observed")  # noqa: TRY004 (expected TypeError)
     connector_config_observer = ConfigObserver()
     observed_connector_config = ObservedDict(
         non_observed_connector_config, connector_config_observer
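The TRY004 suppression above marks a place where a ValueError is raised for what is really a type check; the rule prefers TypeError in that position. A minimal sketch of the compliant form, using a hypothetical helper rather than the file's own code (the commit deliberately keeps ValueError, presumably to avoid changing the exception type callers already catch):

    def require_int(value: object) -> int:
        if not isinstance(value, int):
            # TRY004: a failed isinstance() check should raise TypeError
            raise TypeError(f"Expected int, got {type(value).__name__}")
        return value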
13 changes: 7 additions & 6 deletions airbyte_cdk/connector.py
@@ -52,26 +52,27 @@ def read_config(config_path: str) -> Mapping[str, Any]:
         )

     @staticmethod
-    def _read_json_file(file_path: str) -> Any:
-        with open(file_path, encoding="utf-8") as file:
+    def _read_json_file(file_path: str) -> Any:  # noqa: ANN401 (any-type)
+        with open(file_path, encoding="utf-8") as file:  # noqa: PTH123, FURB101 (prefer pathlib)
             contents = file.read()

         try:
             return json.loads(contents)
         except json.JSONDecodeError as error:
             raise ValueError(
                 f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
-            )
+            ) from None

     @staticmethod
     def write_config(config: TConfig, config_path: str) -> None:
-        with open(config_path, "w", encoding="utf-8") as fh:
+        with open(config_path, "w", encoding="utf-8") as fh:  # noqa: PTH123, FURB103 (replace with pathlib)
             fh.write(json.dumps(config))

     def spec(self, logger: logging.Logger) -> ConnectorSpecification:
         """Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
         required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
         """
+        _ = logger  # unused
         package = self.__class__.__module__.split(".")[0]

         yaml_spec = load_optional_package_file(package, "spec.yaml")
@@ -90,7 +91,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
         except json.JSONDecodeError as error:
             raise ValueError(
                 f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
-            )
+            ) from None
         else:
             raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

@@ -113,7 +114,7 @@ class DefaultConnectorMixin:
     def configure(
         self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
     ) -> Mapping[str, Any]:
-        config_path = os.path.join(temp_dir, "config.json")
+        config_path = os.path.join(temp_dir, "config.json")  # noqa: PTH118 (should use pathlib)
         self.write_config(config, config_path)
         return config

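The PTH123, PTH118, FURB101, and FURB103 codes above all point at the same pathlib idiom. A hypothetical rewrite of the suppressed lines, to show what the rules ask for (not part of this commit, which keeps the open()/os.path style):

    import json
    from pathlib import Path

    def read_json_file(file_path: str) -> object:
        # PTH123/FURB101: Path.read_text() replaces open() + file.read()
        return json.loads(Path(file_path).read_text(encoding="utf-8"))

    def write_config(config: dict, temp_dir: str) -> None:
        # PTH118: the / operator replaces os.path.join()
        config_path = Path(temp_dir) / "config.json"
        # PTH123/FURB103: Path.write_text() replaces open() + fh.write()
        config_path.write_text(json.dumps(config), encoding="utf-8")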
2 changes: 1 addition & 1 deletion airbyte_cdk/connector_builder/message_grouper.py
@@ -155,7 +155,7 @@ def get_message_groups(
             schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
         except SchemaValidationException as exception:
             for validation_error in exception.validation_errors:
-                log_messages.append(LogMessage(validation_error, "ERROR"))  # noqa: PERF401  # Could be list comprehension
+                log_messages.append(LogMessage(validation_error, "ERROR"))  # noqa: PERF401 (consider list comprehension)
             schema = exception.schema

         return StreamRead(
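PERF401 suggests building the list without an explicit append loop. Because log_messages may already contain entries at this point, the comprehension-style equivalent would use extend with a generator (a sketch reusing the names from the hunk above):

    log_messages.extend(
        LogMessage(validation_error, "ERROR")
        for validation_error in exception.validation_errors
    )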
24 changes: 14 additions & 10 deletions airbyte_cdk/entrypoint.py
@@ -125,7 +125,7 @@ def parse_args(args: list[str]) -> argparse.Namespace:
     def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
         cmd = parsed_args.command
         if not cmd:
-            raise Exception("No command passed")
+            raise Exception("No command passed")  # noqa: TRY002 (vanilla exception)

         if hasattr(parsed_args, "debug") and parsed_args.debug:
             self.logger.setLevel(logging.DEBUG)
@@ -174,7 +174,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
                     self.read(source_spec, config, config_catalog, state),
                 )
             else:
-                raise Exception("Unexpected command " + cmd)
+                raise Exception("Unexpected command " + cmd)  # noqa: TRY002 (vanilla exception)
         finally:
             yield from [
                 self.airbyte_message_to_string(queued_message)
@@ -234,7 +234,11 @@ def discover(
         yield AirbyteMessage(type=Type.CATALOG, catalog=catalog)

     def read(
-        self, source_spec: ConnectorSpecification, config: TConfig, catalog: Any, state: list[Any]
+        self,
+        source_spec: ConnectorSpecification,
+        config: TConfig,
+        catalog: Any,  # noqa: ANN401 (any-type)
+        state: list[Any],
     ) -> Iterable[AirbyteMessage]:
         self.set_up_secret_filter(config, source_spec.connectionSpecification)
         if self.source.check_config_against_spec:
@@ -290,21 +294,21 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
         return orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode()  # type: ignore[no-any-return]  # orjson.dumps(message).decode() always returns string

     @classmethod
-    def extract_state(cls, args: list[str]) -> Any | None:
+    def extract_state(cls, args: list[str]) -> Any | None:  # noqa: ANN401 (any-type)
         parsed_args = cls.parse_args(args)
         if hasattr(parsed_args, "state"):
             return parsed_args.state
         return None

     @classmethod
-    def extract_catalog(cls, args: list[str]) -> Any | None:
+    def extract_catalog(cls, args: list[str]) -> Any | None:  # noqa: ANN401 (any-type)
         parsed_args = cls.parse_args(args)
         if hasattr(parsed_args, "catalog"):
             return parsed_args.catalog
         return None

     @classmethod
-    def extract_config(cls, args: list[str]) -> Any | None:
+    def extract_config(cls, args: list[str]) -> Any | None:  # noqa: ANN401 (any-type)
         parsed_args = cls.parse_args(args)
         if hasattr(parsed_args, "config"):
             return parsed_args.config
@@ -333,13 +337,13 @@ def _init_internal_request_filter() -> None:
     wrapped_fn = Session.send

     @wraps(wrapped_fn)
-    def filtered_send(self: Any, request: PreparedRequest, **kwargs: Any) -> Response:
+    def filtered_send(self: Any, request: PreparedRequest, **kwargs: Any) -> Response:  # noqa: ANN401 (any-type)
         parsed_url = urlparse(request.url)

         if parsed_url.scheme not in VALID_URL_SCHEMES:
             raise requests.exceptions.InvalidSchema(
                 "Invalid Protocol Scheme: The endpoint that data is being requested from is using an invalid or insecure "
-                + f"protocol {parsed_url.scheme!r}. Valid protocol schemes: {','.join(VALID_URL_SCHEMES)}"
+                f"protocol {parsed_url.scheme!r}. Valid protocol schemes: {','.join(VALID_URL_SCHEMES)}"
             )

         if not parsed_url.hostname:
@@ -359,7 +363,7 @@ def filtered_send(self: Any, request: PreparedRequest, **kwargs: Any) -> Response:
             # This is a special case where the developer specifies an IP address string that is not formatted correctly like trailing
             # whitespace which will fail the socket IP lookup. This only happens when using IP addresses and not text hostnames.
             # Knowing that this is a request using the requests library, we will mock the exception without calling the lib
-            raise requests.exceptions.InvalidURL(f"Invalid URL {parsed_url}: {exception}")
+            raise requests.exceptions.InvalidURL(f"Invalid URL {parsed_url}: {exception}") from None

         return wrapped_fn(self, request, **kwargs)

@@ -389,6 +393,6 @@ def main() -> None:
     source = impl()

     if not isinstance(source, Source):
-        raise Exception("Source implementation provided does not implement Source class!")
+        raise Exception("Source implementation provided does not implement Source class!")  # noqa: TRY002, TRY004 (should raise TypeError)

     launch(source, sys.argv[1:])
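Two recurring changes in this file are worth spelling out. First, TRY002 flags raising the bare Exception class; the preferred fix is a named subclass, but the commit suppresses the rule instead. Second, the added "from None" clauses disable implicit exception chaining, so the re-raised error is reported without the "During handling of the above exception, another exception occurred" traceback of the original. A self-contained sketch of both patterns, with hypothetical names:

    import json

    class CommandError(Exception):
        """TRY002-compliant: a named exception instead of bare Exception."""

    def parse_payload(raw: str) -> dict:
        try:
            return json.loads(raw)
        except json.JSONDecodeError as error:
            # "from None" hides the JSONDecodeError chain in the traceback
            raise CommandError(f"Could not parse payload: {error}") from None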
2 changes: 1 addition & 1 deletion airbyte_cdk/exception_handler.py
@@ -33,7 +33,7 @@ def hook_fn(
         exception_type: type[BaseException],
         exception_value: BaseException,
         traceback_: TracebackType | None,
-    ) -> Any:
+    ) -> Any:  # noqa: ANN401 (any-type)
         # For developer ergonomics, we want to see the stack trace in the logs when we do a ctrl-c
         if issubclass(exception_type, KeyboardInterrupt):
             sys.__excepthook__(exception_type, exception_value, traceback_)
4 changes: 2 additions & 2 deletions airbyte_cdk/logger.py
@@ -6,7 +6,7 @@
 import json
 import logging
 import logging.config
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, ClassVar

 from orjson import orjson
@@ -61,7 +61,7 @@ class AirbyteLogFormatter(logging.Formatter):
     """Output log records using AirbyteMessage"""

     # Transforming Python log levels to Airbyte protocol log levels
-    level_mapping = {
+    level_mapping: ClassVar[dict[int, Level]] = {
         logging.FATAL: Level.FATAL,
         logging.ERROR: Level.ERROR,
         logging.WARNING: Level.WARN,
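Unlike most hunks in this commit, the logger.py change fixes the finding instead of suppressing it: annotating level_mapping with typing.ClassVar documents that the dict is intentional class-level state shared by all instances, which is also what RUF012 (suppressed in abstract_source.py below) asks for. A minimal illustration:

    from typing import ClassVar

    class Formatter:
        # Intentionally shared across instances; ClassVar tells type checkers
        # (and Ruff's RUF012) this is class state, not an instance default.
        level_mapping: ClassVar[dict[int, str]] = {50: "FATAL", 40: "ERROR"}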
15 changes: 10 additions & 5 deletions airbyte_cdk/sources/abstract_source.py
@@ -75,10 +75,15 @@ def streams(self, config: Mapping[str, Any]) -> list[Stream]:
     """

     # Stream name to instance map for applying output object transformation
-    _stream_to_instance_map: dict[str, Stream] = {}
+    _stream_to_instance_map: dict[str, Stream] = {}  # noqa: RUF012 (mutable class member can leak across instances)
     _slice_logger: SliceLogger = DebugSliceLogger()

-    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
+    def discover(
+        self,
+        logger: logging.Logger,
+        config: Mapping[str, Any],
+    ) -> AirbyteCatalog:
+        _ = logger  # unused
         """Implements the Discover operation from the Airbyte Specification.
         See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
         """
@@ -94,7 +99,7 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
             return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
         return AirbyteConnectionStatus(status=Status.SUCCEEDED)

-    def read(
+    def read(  # noqa: PLR0915
         self,
         logger: logging.Logger,
         config: Mapping[str, Any],
@@ -104,7 +109,7 @@ def read(
         """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
         logger.info(f"Starting syncing {self.name}")
         config, internal_config = split_config(config)
-        # TODO assert all streams exist in the connector
+        # TODO assert all streams exist in the connector  # noqa: TD004
         # get the streams once in case the connector needs to make any queries to generate them
         stream_instances = {s.name: s for s in self.streams(config)}
         state_manager = ConnectorStateManager(state=state)
@@ -133,7 +138,7 @@ def read(
                 # Use configured_stream as stream_instance to support references in error handling.
                 stream_instance = configured_stream.stream

-                raise AirbyteTracedException(
+                raise AirbyteTracedException(  # noqa: TRY301
                     message="A stream listed in your configuration was not found in the source. Please check the logs for more "
                     "details.",
                     internal_message=error_message,
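TRY301 fires when a raise sits directly inside a try block, where the surrounding handlers may immediately catch it again; Ruff's suggested refactor is to move the raise into a helper called from the try. A hypothetical sketch of that shape (not the commit's code, which suppresses the rule instead):

    def _require_stream(streams: dict[str, object], name: str) -> object:
        # The raise lives in a helper, so the try block below only calls it.
        if name not in streams:
            raise LookupError(f"Stream {name!r} not found in source")
        return streams[name]

    def read_stream(streams: dict[str, object], name: str) -> object:
        try:
            return _require_stream(streams, name)
        except LookupError:
            # log / wrap here, as the real read() does with traced exceptions
            raise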
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/config.py
@@ -19,7 +19,7 @@ class BaseConfig(BaseModel):
     """

     @classmethod
-    def schema(cls, *args: Any, **kwargs: Any) -> dict[str, Any]:
+    def schema(cls, *args: Any, **kwargs: Any) -> dict[str, Any]:  # noqa: ANN401 (any-type)
         """We're overriding the schema classmethod to enable some post-processing"""
         schema = super().schema(*args, **kwargs)
         rename_key(schema, old_key="anyOf", new_key="oneOf")  # UI supports only oneOf
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/async_job/status.py
@@ -13,7 +13,7 @@ class AsyncJobStatus(Enum):
     FAILED = ("FAILED", _TERMINAL)
     TIMED_OUT = ("TIMED_OUT", _TERMINAL)

-    def __init__(self, value: str, is_terminal: bool) -> None:  # noqa: FBT001 (positional bool arg)
+    def __init__(self, value: str, is_terminal: bool) -> None:  # noqa: FBT001 (positional bool)
         self._value = value
         self._is_terminal = is_terminal

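FBT001 discourages positional boolean parameters because the flag's meaning is invisible where the value is supplied (here, in member tuples like ("FAILED", _TERMINAL)). The usual fix, applied elsewhere in this commit by inserting a bare "*," into signatures, is to make the flag keyword-only; Enum members pass their value tuples to __init__ positionally, so the rule has to be suppressed here. Sketched on a hypothetical free function:

    def create_job(name: str, *, is_terminal: bool = False) -> dict:
        # Callers must write create_job("sync", is_terminal=True),
        # keeping the boolean self-describing at the call site.
        return {"name": name, "is_terminal": is_terminal}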
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/auth/token.py
@@ -1,7 +1,7 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
-# ruff: noqa: A005 (shadows built-in 'token' module)
+# ruff: noqa: A005  # Shadows built-in 'token' module

 from __future__ import annotations

@@ -45,7 +45,7 @@ def get_concurrency_level(self) -> int:
         if isinstance(self._default_concurrency, InterpolatedString):
             evaluated_default_concurrency = self._default_concurrency.eval(config=self.config)
             if not isinstance(evaluated_default_concurrency, int):
-                raise ValueError("default_concurrency did not evaluate to an integer")
+                raise ValueError("default_concurrency did not evaluate to an integer")  # noqa: TRY004 (expected TypeError)
         return (
             min(evaluated_default_concurrency, self.max_concurrency)
             if self.max_concurrency
11 changes: 6 additions & 5 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -55,14 +55,15 @@ class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):

     def __init__(
         self,
-        catalog: ConfiguredAirbyteCatalog | None,
+        catalog: ConfiguredAirbyteCatalog | None,  # noqa: ARG002 (unused)
         config: Mapping[str, Any] | None,
         state: TState,
         source_config: ConnectionDefinition,
+        *,
         debug: bool = False,
         emit_connector_builder_messages: bool = False,
         component_factory: ModelToComponentFactory | None = None,
-        **kwargs: Any,
+        **kwargs: Any,  # noqa: ANN401, ARG002 (any-type, unused)
     ) -> None:
         super().__init__(
             source_config=source_config,
@@ -149,7 +150,7 @@ def read(

         yield from super().read(logger, config, filtered_catalog, state)

-    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
+    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:  # noqa: ARG002 (unused)
         concurrent_streams = self._concurrent_streams or []
         synchronous_streams = self._synchronous_streams or []
         return AirbyteCatalog(
@@ -277,13 +278,13 @@ def _stream_supports_concurrent_partition_processing(
             declarative_stream.retriever.requester, HttpRequester
         ):
             http_requester = declarative_stream.retriever.requester
-            if "stream_state" in http_requester._path.string:
+            if "stream_state" in http_requester._path.string:  # noqa: SLF001 (private member)
                 self.logger.warning(
                     f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
                 )
                 return False

-            request_options_provider = http_requester._request_options_provider
+            request_options_provider = http_requester._request_options_provider  # noqa: SLF001 (private member)
             if request_options_provider.request_options_contain_stream_state():
                 self.logger.warning(
                     f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/declarative/datetime/datetime_parser.py
@@ -17,7 +17,7 @@ class DatetimeParser:

     _UNIX_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

-    def parse(self, date: str | int, format: str) -> datetime.datetime:
+    def parse(self, date: str | int, format: str) -> datetime.datetime:  # noqa: A002 (shadowed built-in)
         # "%s" is a valid (but unreliable) directive for formatting, but not for parsing
         # It is defined as
         # The number of seconds since the Epoch, 1970-01-01 00:00:00+0000 (UTC). https://man7.org/linux/man-pages/man3/strptime.3.html
@@ -36,7 +36,7 @@ def parse(self, date: str | int, format: str) -> datetime.datetime:
             return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
         return parsed_datetime

-    def format(self, dt: datetime.datetime, format: str) -> str:
+    def format(self, dt: datetime.datetime, format: str) -> str:  # noqa: A002 (shadowed built-in)
         # strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
         # It's safer to use the timestamp() method than the %s directive
         # See https://stackoverflow.com/a/4974930
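A002 fires because the parameter name "format" shadows the built-in format() function within these methods. The non-suppressed fix is a rename, but that would break any caller passing format= by keyword, which is presumably why the commit suppresses the rule instead. A hypothetical renamed variant:

    import datetime

    def parse_timestamp(date_str: str, fmt: str) -> datetime.datetime:
        # "fmt" avoids shadowing the built-in format(), at the cost of
        # changing the public keyword name (parse_timestamp(..., fmt=...)).
        return datetime.datetime.strptime(date_str, fmt)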
18 changes: 10 additions & 8 deletions airbyte_cdk/sources/declarative/declarative_stream.py
@@ -111,7 +111,9 @@ def state(self, value: MutableMapping[str, Any]) -> None:
         self.retriever.state = state

     def get_updated_state(
-        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
+        self,
+        current_stream_state: MutableMapping[str, Any],  # noqa: ARG002 (unused)
+        latest_record: Mapping[str, Any],  # noqa: ARG002 (unused)
     ) -> MutableMapping[str, Any]:
         return self.state

@@ -131,10 +133,10 @@ def is_resumable(self) -> bool:

     def read_records(
         self,
-        sync_mode: SyncMode,
-        cursor_field: list[str] | None = None,
+        sync_mode: SyncMode,  # noqa: ARG002 (unused)
+        cursor_field: list[str] | None = None,  # noqa: ARG002 (unused)
         stream_slice: Mapping[str, Any] | None = None,
-        stream_state: Mapping[str, Any] | None = None,
+        stream_state: Mapping[str, Any] | None = None,  # noqa: ARG002 (unused)
     ) -> Iterable[Mapping[str, Any]]:
         """:param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state."""
         if stream_slice is None or stream_slice == {}:
@@ -145,7 +147,7 @@ def read_records(
             # empty slice which seems to make sense.
             stream_slice = StreamSlice(partition={}, cursor_slice={})
         if not isinstance(stream_slice, StreamSlice):
-            raise ValueError(
+            raise ValueError(  # noqa: TRY004 (expected TypeError)
                 f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
             )
         yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore  # records are of the correct type
@@ -161,9 +163,9 @@ def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
     def stream_slices(
         self,
         *,
-        sync_mode: SyncMode,
-        cursor_field: list[str] | None = None,
-        stream_state: Mapping[str, Any] | None = None,
+        sync_mode: SyncMode,  # noqa: ARG002 (unused)
+        cursor_field: list[str] | None = None,  # noqa: ARG002 (unused)
+        stream_state: Mapping[str, Any] | None = None,  # noqa: ARG002 (unused)
     ) -> Iterable[StreamSlice | None]:
         """Override to define the slices for this stream. See the stream slicing section of the docs for more information.

