Skip to content

Commit

Permalink
chore: set line length 100 (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Nov 10, 2024
1 parent 29ae4fc commit f62c91e
Show file tree
Hide file tree
Showing 374 changed files with 18,598 additions and 4,712 deletions.
15 changes: 1 addition & 14 deletions .github/workflows/semantic_pr_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
# Configure which types are allowed (newline-delimited).
# These are intentionally case-insensitive, allowing title casing or all lowercase.
# See: https://github.com/commitizen/conventional-commit-types/blob/master/index.json
types: |
fix
Expand All @@ -38,17 +39,3 @@ jobs:
Build
test
Test
# # We don't use scopes as of now
# scopes: |
# core
# ui
# JIRA-\d+

# Require capitalization for the first letter of the subject.
subjectPattern: ^[A-Z].*$
# The variables `subject` and `title` can be used within the message.
subjectPatternError: |
The subject "{subject}" found in the pull request title "{title}"
didn't match the configured pattern. Please check the title against
the naming rules. You can also use the [WIP] prefix to bypass this check.
13 changes: 10 additions & 3 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

class ObservedDict(dict): # type: ignore # disallow_any_generics is set to True, and dict is equivalent to dict[Any]
def __init__(
self, non_observed_mapping: MutableMapping[Any, Any], observer: ConfigObserver, update_on_unchanged_value: bool = True
self,
non_observed_mapping: MutableMapping[Any, Any],
observer: ConfigObserver,
update_on_unchanged_value: bool = True,
) -> None:
non_observed_mapping = copy(non_observed_mapping)
self.observer = observer
Expand Down Expand Up @@ -69,11 +72,15 @@ def update(self) -> None:
emit_configuration_as_airbyte_control_message(self.config)


def observe_connector_config(non_observed_connector_config: MutableMapping[str, Any]) -> ObservedDict:
def observe_connector_config(
non_observed_connector_config: MutableMapping[str, Any],
) -> ObservedDict:
if isinstance(non_observed_connector_config, ObservedDict):
raise ValueError("This connector configuration is already observed")
connector_config_observer = ConfigObserver()
observed_connector_config = ObservedDict(non_observed_connector_config, connector_config_observer)
observed_connector_config = ObservedDict(
non_observed_connector_config, connector_config_observer
)
connector_config_observer.set_config(observed_connector_config)
return observed_connector_config

Expand Down
22 changes: 17 additions & 5 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar

import yaml
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification, ConnectorSpecificationSerializer
from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
Expand Down Expand Up @@ -53,7 +57,9 @@ def _read_json_file(file_path: str) -> Any:
try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON.")
raise ValueError(
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)

@staticmethod
def write_config(config: TConfig, config_path: str) -> None:
Expand All @@ -72,15 +78,19 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
json_spec = load_optional_package_file(package, "spec.json")

if yaml_spec and json_spec:
raise RuntimeError("Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided.")
raise RuntimeError(
"Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided."
)

if yaml_spec:
spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader)
elif json_spec:
try:
spec_obj = json.loads(json_spec)
except json.JSONDecodeError as error:
raise ValueError(f"Could not read json spec file: {error}. Please ensure that it is a valid JSON.")
raise ValueError(
f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
)
else:
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

Expand All @@ -101,7 +111,9 @@ def write_config(config: Mapping[str, Any], config_path: str) -> None: ...

class DefaultConnectorMixin:
# can be overridden to change an input config
def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
def configure(
self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
) -> Mapping[str, Any]:
config_path = os.path.join(temp_dir, "config.json")
self.write_config(config, config_path)
return config
Expand Down
36 changes: 28 additions & 8 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@
from typing import Any, List, Mapping

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
)
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

Expand All @@ -34,7 +41,9 @@ class TestReadLimits:

def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
command_config = config.get("__test_read_config", {})
max_pages_per_slice = command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
max_pages_per_slice = (
command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
)
max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
return TestReadLimits(max_records, max_pages_per_slice, max_slices)
Expand Down Expand Up @@ -64,15 +73,24 @@ def read_stream(
) -> AirbyteMessage:
try:
handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
stream_name = configured_catalog.streams[0].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(source, config, configured_catalog, state, limits.max_records)
stream_name = configured_catalog.streams[
0
].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(
source, config, configured_catalog, state, limits.max_records
)
return AirbyteMessage(
type=MessageType.RECORD,
record=AirbyteRecordMessage(data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()),
record=AirbyteRecordMessage(
data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=filter_secrets(f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}")
exc,
message=filter_secrets(
f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
),
)
return error.as_airbyte_message()

Expand All @@ -88,7 +106,9 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message=f"Error resolving manifest: {str(exc)}")
error = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {str(exc)}"
)
return error.as_airbyte_message()


Expand Down
30 changes: 24 additions & 6 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import TestReadLimits, create_source, get_limits, read_stream, resolve_manifest
from airbyte_cdk.connector_builder.connector_builder_handler import (
TestReadLimits,
create_source,
get_limits,
read_stream,
resolve_manifest,
)
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
AirbyteMessage,
Expand All @@ -22,11 +28,17 @@
from orjson import orjson


def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
config_path, catalog_path, state_path = parsed_args.config, parsed_args.catalog, parsed_args.state
config_path, catalog_path, state_path = (
parsed_args.config,
parsed_args.catalog,
parsed_args.state,
)
if parsed_args.command != "read":
raise ValueError("Only read commands are allowed for Connector Builder requests.")

Expand Down Expand Up @@ -64,7 +76,9 @@ def handle_connector_builder_request(
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
assert catalog is not None, "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
assert (
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
else:
raise ValueError(f"Unrecognized command {command}.")
Expand All @@ -75,14 +89,18 @@ def handle_request(args: List[str]) -> str:
limits = get_limits(config)
source = create_source(config, limits)
return orjson.dumps(
AirbyteMessageSerializer.dump(handle_connector_builder_request(source, command, config, catalog, state, limits))
AirbyteMessageSerializer.dump(
handle_connector_builder_request(source, command, config, catalog, state, limits)
)
).decode() # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage


if __name__ == "__main__":
try:
print(handle_request(sys.argv[1:]))
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message=f"Error handling request: {str(exc)}")
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
Loading

0 comments on commit f62c91e

Please sign in to comment.