
Commit

Merge branch 'main' into GovindHede/ensure-compatible-table-schema
GovindHede authored Aug 10, 2024
2 parents 5b3a971 + 39fe9a3 commit 4070e2d
Showing 14 changed files with 282 additions and 101 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/semantic_pr_check.yml
@@ -1,11 +1,12 @@
name: "Verify Semantic PR Title"

on:
pull_request_target:
pull_request:
types:
- opened
- edited
- synchronize
- ready_for_review

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}
@@ -19,6 +20,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: amannn/action-semantic-pull-request@v5
+        if: ${{ github.event.pull_request.draft == false }}
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         with:
17 changes: 9 additions & 8 deletions airbyte/_connector_base.py
@@ -25,13 +25,13 @@
 )
 
 from airbyte import exceptions as exc
-from airbyte._util import meta
 from airbyte._util.telemetry import (
     EventState,
     log_config_validation_result,
     log_connector_check_result,
 )
 from airbyte._util.temp_files import as_temp_files
+from airbyte.constants import AIRBYTE_LOGGING_ROOT
 
 
 if TYPE_CHECKING:
@@ -307,16 +307,22 @@ def _init_logger(self) -> logging.Logger:
         # Prevent logging to stderr by stopping propagation to the root logger
         logger.propagate = False
 
+        if AIRBYTE_LOGGING_ROOT is None:
+            # No temp directory available, so return a basic logger
+            return logger
+
+        # Else, configure the logger to write to a file
+
         # Remove any existing handlers
         for handler in logger.handlers:
             logger.removeHandler(handler)
 
-        folder = meta.get_logging_root() / self.name
+        folder = AIRBYTE_LOGGING_ROOT / self.name
         folder.mkdir(parents=True, exist_ok=True)
 
         # Create and configure file handler
         handler = logging.FileHandler(
-            filename=folder / f"{ulid.ULID()!s}-run-log.txt",
+            filename=folder / f"connector-log-{ulid.ULID()!s}.txt",
             encoding="utf-8",
         )
         handler.setFormatter(
@@ -329,11 +335,6 @@ def _init_logger(self) -> logging.Logger:
         logger.addHandler(handler)
         return logger
 
-    def _new_log_file(self, verb: str = "run") -> Path:
-        folder = meta.get_logging_root() / self.name
-        folder.mkdir(parents=True, exist_ok=True)
-        return folder / f"{ulid.ULID()!s}-{self.name}-{verb}-log.txt"
-
     def _peek_airbyte_message(
         self,
         message: AirbyteMessage,
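Aside: a minimal sketch of the new fallback behavior introduced above. The constant import matches the diff; the connector name is illustrative only.

    import logging

    from airbyte.constants import AIRBYTE_LOGGING_ROOT

    logger = logging.getLogger("airbyte.source-faker")
    if AIRBYTE_LOGGING_ROOT is None:
        # No writable temp directory: the logger still works, but nothing is written to disk.
        print("File logging is disabled for this session.")
    else:
        # Per-connector logs land in a ULID-named file under this folder.
        print(f"Connector logs folder: {AIRBYTE_LOGGING_ROOT / 'source-faker'}")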
19 changes: 18 additions & 1 deletion airbyte/_future_cdk/catalog_providers.py
@@ -44,7 +44,24 @@ def __init__(
         Since the catalog is passed by reference, the catalog manager may be updated with new
         streams as they are discovered.
         """
-        self._catalog: ConfiguredAirbyteCatalog = configured_catalog
+        self._catalog: ConfiguredAirbyteCatalog = self.validate_catalog(configured_catalog)
+
+    @staticmethod
+    def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> ConfiguredAirbyteCatalog:
+        """Validate the catalog to ensure it is valid.
+
+        This requires ensuring that `generationId` and `minGenerationId` are both set. If
+        not, both values will be set to `1`.
+        """
+        for stream in catalog.streams:
+            if stream.generation_id is None:
+                stream.generation_id = 1
+            if stream.minimum_generation_id is None:
+                stream.minimum_generation_id = 1
+            if stream.sync_id is None:
+                stream.sync_id = 1  # This should ideally increment monotonically with each sync.
+
+        return catalog
 
     @property
     def configured_catalog(self) -> ConfiguredAirbyteCatalog:
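Aside: a quick sketch of the new validation in use. Import paths reflect this commit's module layout; the stream definition itself is illustrative.

    from airbyte_protocol.models import (
        AirbyteStream,
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    from airbyte._future_cdk.catalog_providers import CatalogProvider

    # Build a catalog whose stream has no generation/sync IDs set.
    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name="users",
                    json_schema={"type": "object"},
                    supported_sync_modes=[SyncMode.full_refresh],
                ),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
        ]
    )

    # Missing IDs are backfilled with `1` rather than left as `None`.
    validated = CatalogProvider.validate_catalog(catalog)
    assert validated.streams[0].generation_id == 1
    assert validated.streams[0].minimum_generation_id == 1
    assert validated.streams[0].sync_id == 1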
8 changes: 6 additions & 2 deletions airbyte/_future_cdk/record_processor.py
@@ -252,8 +252,12 @@ def write_all_stream_data(
         write_strategy: WriteStrategy,
         progress_tracker: ProgressTracker,
     ) -> None:
-        """Finalize any pending writes."""
-        for stream_name in self.catalog_provider.stream_names:
+        """Finalize any pending writes.
+
+        Streams are processed in alphabetical order, so that processing order is
+        deterministic and predictable, without relying on catalog declaration order.
+        """
+        for stream_name in sorted(self.catalog_provider.stream_names):
             self.write_stream_data(
                 stream_name,
                 write_strategy=write_strategy,
12 changes: 6 additions & 6 deletions airbyte/_future_cdk/sql_processor.py
@@ -305,13 +305,13 @@ def _get_table_by_name(
     def _ensure_schema_exists(
         self,
     ) -> None:
-        """Return a new (unique) temporary table name."""
-        schema_name = self.sql_config.schema_name
-
-        if self._known_schemas_list and self.sql_config.schema_name in self._known_schemas_list:
+        schema_name = self.normalizer.normalize(self.sql_config.schema_name)
+        known_schemas_list = self.normalizer.normalize_list(self._known_schemas_list)
+        if known_schemas_list and schema_name in known_schemas_list:
             return  # Already exists
 
-        if schema_name in self._get_schemas_list():
+        schemas_list = self.normalizer.normalize_list(self._get_schemas_list())
+        if schema_name in schemas_list:
             return
 
         sql = f"CREATE SCHEMA IF NOT EXISTS {schema_name}"
@@ -324,7 +324,7 @@ def _ensure_schema_exists(
             raise
 
         if DEBUG_MODE:
-            found_schemas = self._get_schemas_list()
+            found_schemas = self.normalizer.normalize_list(self._get_schemas_list())
             assert (
                 schema_name in found_schemas
             ), f"Schema {schema_name} was not created. Found: {found_schemas}"
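Aside: why both sides are normalized before the membership test. Backends report schema names in varying case, so comparing raw strings can trigger a redundant CREATE SCHEMA. A sketch using PyAirbyte's LowerCaseNormalizer (import path and method names as used in the diff above; they may differ by version):

    from airbyte._util.name_normalizers import LowerCaseNormalizer

    normalizer = LowerCaseNormalizer()

    configured = normalizer.normalize("AIRBYTE_RAW")  # -> "airbyte_raw"
    known = normalizer.normalize_list(["Airbyte_Raw", "PUBLIC"])  # -> ["airbyte_raw", "public"]

    # Without normalization, "AIRBYTE_RAW" vs. "Airbyte_Raw" would not match;
    # with it, the existence check succeeds and no redundant DDL is issued.
    assert configured in known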
3 changes: 3 additions & 0 deletions airbyte/_message_iterators.py
@@ -18,6 +18,7 @@
 )
 
 from airbyte.constants import AB_EXTRACTED_AT_COLUMN
+from airbyte.progress import _new_stream_success_message
 
 
 if TYPE_CHECKING:
@@ -90,6 +91,8 @@ def generator() -> Generator[AirbyteMessage, None, None]:
                     state=state_provider.get_stream_state(stream_name),
                 )
 
+            yield _new_stream_success_message(stream_name)
+
         return cls(generator())
 
     @classmethod
16 changes: 0 additions & 16 deletions airbyte/_util/meta.py
@@ -8,7 +8,6 @@

 import os
 import sys
-import tempfile
 from contextlib import suppress
 from functools import lru_cache
 from pathlib import Path
@@ -21,21 +20,6 @@
"""URL to get the current Google Colab session information."""


@lru_cache
def get_logging_root() -> Path:
"""Return the root directory for logs.
This is the directory where logs are stored.
"""
if "AIRBYTE_LOGGING_ROOT" in os.environ:
log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
else:
log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"

log_root.mkdir(parents=True, exist_ok=True)
return log_root


def get_colab_release_version() -> str | None:
if "COLAB_RELEASE_TAG" in os.environ:
return os.environ["COLAB_RELEASE_TAG"]
4 changes: 2 additions & 2 deletions airbyte/caches/base.py
@@ -21,7 +21,7 @@
 from airbyte._future_cdk.state_writers import StdOutStateWriter
 from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
 from airbyte.caches._state_backend import SqlStateBackend
-from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
+from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
 from airbyte.datasets._sql import CachedDataset
 
 
@@ -49,7 +49,7 @@ class CacheBase(SqlConfig):
     cache_dir: Path = Field(default=Path(".cache"))
     """The directory to store the cache in."""
 
-    cleanup: bool = True
+    cleanup: bool = TEMP_FILE_CLEANUP
     """Whether to clean up the cache after use."""
 
     _deployed_api_root: Optional[str] = PrivateAttr(default=None)
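Aside: with this change, cleanup can be disabled globally via the environment. A sketch (the cache type is illustrative; note the variable must be set before `airbyte` is imported, since the pydantic default is captured at import time):

    import os

    os.environ["AIRBYTE_TEMP_FILE_CLEANUP"] = "false"

    from airbyte.caches.duckdb import DuckDBCache

    cache = DuckDBCache(db_path="./example.duckdb")
    print(cache.cleanup)  # False -- defaulted from TEMP_FILE_CLEANUP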
64 changes: 64 additions & 0 deletions airbyte/constants.py
@@ -3,6 +3,12 @@

 from __future__ import annotations
 
+import os
+import tempfile
+import warnings
+from functools import lru_cache
+from pathlib import Path
+
 
 DEBUG_MODE = False  # Set to True to enable additional debug logging.
 
@@ -41,3 +47,61 @@

 DEFAULT_ARROW_MAX_CHUNK_SIZE = 100_000
 """The default number of records to include in each batch of an Arrow dataset."""
+
+
+@lru_cache
+def _get_logging_root() -> Path | None:
+    """Return the root directory for logs.
+
+    This is the directory where logs are stored. Returns `None` if no valid
+    path can be found.
+    """
+    if "AIRBYTE_LOGGING_ROOT" in os.environ:
+        log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
+    else:
+        log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"
+
+    try:
+        # Attempt to create the log root directory if it does not exist.
+        log_root.mkdir(parents=True, exist_ok=True)
+    except OSError:
+        # Warn and return None if the directory cannot be created.
+        warnings.warn(
+            (
+                f"Failed to create PyAirbyte logging directory at `{log_root}`. "
+                "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` "
+                "environment variable."
+            ),
+            category=UserWarning,
+            stacklevel=0,
+        )
+        return None
+    else:
+        return log_root
+
+
+AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root()
+"""The root directory for Airbyte logs.
+
+This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.
+If not provided, PyAirbyte will use `/tmp/airbyte/logs/`, where `/tmp/` is the OS's default
+temporary directory. If the directory cannot be created, PyAirbyte will issue a warning and
+set this value to `None`.
+"""
+
+TEMP_FILE_CLEANUP = bool(
+    os.getenv(
+        key="AIRBYTE_TEMP_FILE_CLEANUP",
+        default="true",
+    )
+    .lower()
+    .replace("false", "")
+    .replace("0", "")
+)
+"""Whether to clean up temporary files after use.
+
+This value is read from the `AIRBYTE_TEMP_FILE_CLEANUP` environment variable. If the variable
+is not set, the default value is `True`.
+"""
26 changes: 16 additions & 10 deletions airbyte/exceptions.py
@@ -64,6 +64,7 @@ class PyAirbyteError(Exception):
     guidance: str | None = None
     help_url: str | None = None
     log_text: str | list[str] | None = None
+    log_file: Path | None = None
     context: dict[str, Any] | None = None
     message: str | None = None
 
@@ -81,7 +82,7 @@ def get_message(self) -> str:

     def __str__(self) -> str:
         """Return a string representation of the exception."""
-        special_properties = ["message", "guidance", "help_url", "log_text", "context"]
+        special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"]
         display_properties = {
             k: v
             for k, v in self.__dict__.items()
@@ -99,13 +100,16 @@ def __str__(self) -> str:
             if isinstance(self.log_text, list):
                 self.log_text = "\n".join(self.log_text)
 
-            exception_str += f"\nLog output: \n  {indent(self.log_text, '  ')}"
+            exception_str += f"\n  Log output: \n  {indent(self.log_text, '  ')}"
+
+        if self.log_file:
+            exception_str += f"\n  Log file: {self.log_file.absolute()!s}"
 
         if self.guidance:
-            exception_str += f"\nSuggestion: {self.guidance}"
+            exception_str += f"\n  Suggestion: {self.guidance}"
 
         if self.help_url:
-            exception_str += f"\nMore info: {self.help_url}"
+            exception_str += f"\n  More info: {self.help_url}"
 
         return exception_str

@@ -263,13 +267,13 @@ class AirbyteConnectorError(PyAirbyteError):
     connector_name: str | None = None
 
     def __post_init__(self) -> None:
-        """Log the error message when the exception is raised."""
+        """Set the log file path for the connector."""
+        self.log_file = self._get_log_file()
+
+    def _get_log_file(self) -> Path | None:
+        """Return the log file path for the connector."""
         if self.connector_name:
             logger = logging.getLogger(f"airbyte.{self.connector_name}")
-            if self.connector_name:
-                logger.error(str(self))
-            else:
-                logger.error(str(self))
 
             log_paths: list[Path] = [
                 Path(handler.baseFilename).absolute()
@@ -278,7 +282,9 @@ def __post_init__(self) -> None:
             ]
 
             if log_paths:
-                print(f"Connector logs: {', '.join(str(path) for path in log_paths)}")
+                return log_paths[0]
+
+        return None
 
 
 class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
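Aside: a sketch of the reworked error rendering. The connector name and message are illustrative; the exact output depends on which fields are set and whether a file handler is attached.

    from airbyte import exceptions as exc

    try:
        raise exc.AirbyteConnectorError(
            connector_name="source-faker",
            message="Connector failed.",
            guidance="Check the log file for details.",
        )
    except exc.AirbyteConnectorError as ex:
        # __post_init__ now resolves the connector's FileHandler path into
        # ex.log_file, and __str__ appends "Log file: ..." instead of print()ing.
        print(str(ex))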
