Docs: Improve docs around 0.10.x features (#190)
aaronsteers authored Apr 12, 2024
1 parent ff38fbf commit 77497c5
Showing 12 changed files with 171 additions and 79 deletions.
48 changes: 0 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,54 +68,6 @@ The naming convention for secrets is as `{CONNECTOR_NAME}_{PROPERTY_NAME}`, for

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: `AIRBYTE_CLOUD_API_URL`, `AIRBYTE_CLOUD_API_KEY`, etc.
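As a hedged illustration of the `{CONNECTOR_NAME}_{PROPERTY_NAME}` convention (this helper is not part of the PyAirbyte API), such a secret name could be derived like this:

```python
def secret_env_var_name(connector_name: str, property_name: str) -> str:
    """Derive the environment variable name for a connector secret.

    Hypothetical helper illustrating the `{CONNECTOR_NAME}_{PROPERTY_NAME}`
    naming convention; not part of PyAirbyte itself.
    """
    return f"{connector_name}_{property_name}".upper().replace("-", "_")


print(secret_env_var_name("source-github", "personal_access_token"))
# SOURCE_GITHUB_PERSONAL_ACCESS_TOKEN
```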

## Connector compatibility

To make a connector compatible with PyAirbyte, the following requirements must be met:

- The connector must be a Python package, with a `pyproject.toml` or a `setup.py` file.
- In the package, there must be a `run.py` file that contains a `run` method. This method should read arguments from the command line, and run the connector with them, outputting messages to stdout.
- The `pyproject.toml` or `setup.py` file must specify a command line entry point for the `run` method called `source-<connector name>`. This is usually done by adding a `[tool.poetry.scripts]` section to the `pyproject.toml` file, or an `entry_points`/`console_scripts` section to the `setup.py` file. For example:

```toml
[tool.poetry.scripts]
source-my-connector = "my_connector.run:run"
```

```python
setup(
...
entry_points={
'console_scripts': [
'source-my-connector = my_connector.run:run',
],
},
...
)
```
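For illustration only, a minimal `run.py` matching the shape described above might look like the following. A real connector would use the Airbyte CDK to dispatch on `spec`/`check`/`discover`/`read`; the names and behavior here are assumptions:

```python
# my_connector/run.py -- hedged sketch of the entry-point shape, not a real connector.
import sys


def run() -> None:
    """Read arguments from the command line and emit messages to stdout.

    A real connector would dispatch to the Airbyte CDK here; this sketch
    only echoes the arguments it received.
    """
    args = sys.argv[1:]
    print(f"Received args: {args}")


if __name__ == "__main__":
    run()
```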

To publish a connector to PyPI, specify the `pypi` section in the `metadata.yaml` file. For example:

```yaml
data:
# ...
remoteRegistries:
pypi:
enabled: true
packageName: "airbyte-source-my-connector"
```
## Validating source connectors
To validate a source connector for compliance, use the `airbyte-lib-validate-source` script:

```bash
airbyte-lib-validate-source --connector-dir . --sample-config secrets/config.json
```

The script installs the Python package from the provided directory and runs the connector against the provided config. The config must be a valid JSON file with the same structure as the one that would be provided to the connector in Airbyte. The script exits with a non-zero code if the connector fails to run.

For a more lightweight check, use the `--validate-install-only` flag. This only checks that the connector can be installed and returns a spec; no sample config is required.

## Contributing

To learn how you can contribute to PyAirbyte, please see our [PyAirbyte Contributors Guide](./CONTRIBUTING.md).
Expand Down
2 changes: 1 addition & 1 deletion airbyte/_util/temp_files.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""Internal helper functions for working with temporary files."""

from __future__ import annotations

Expand Down
10 changes: 7 additions & 3 deletions airbyte/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.
```
# Assuming we've already created a `connection` object
```python
# Assuming we've already created a `connection` object...
# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
Expand All @@ -56,8 +56,9 @@

from __future__ import annotations

from airbyte.cloud import connections, sync_results, workspaces
from airbyte.cloud import connections, constants, sync_results, workspaces
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace

Expand All @@ -66,9 +67,12 @@
# Submodules
"workspaces",
"connections",
"constants",
"sync_results",
# Classes
"CloudWorkspace",
"CloudConnection",
"SyncResult",
# Enums
"JobStatusEnum",
]
9 changes: 8 additions & 1 deletion airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@


class CloudConnection:
"""A connection is an extract-load (EL) pairing of a source and destination."""
"""A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
"""

def __init__(
self,
Expand All @@ -25,6 +28,10 @@ def __init__(
source: str | None = None,
destination: str | None = None,
) -> None:
"""It is not recommended to create a `CloudConnection` object directly.
Instead, use `CloudWorkspace.get_connection()` to create a connection object.
"""
self.connection_id = connection_id
"""The ID of the connection."""

Expand Down
26 changes: 26 additions & 0 deletions airbyte/cloud/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Useful constants for working with Airbyte Cloud features in PyAirbyte."""

from __future__ import annotations

from airbyte._util.api_imports import JobStatusEnum


FINAL_STATUSES: set[JobStatusEnum] = {
JobStatusEnum.SUCCEEDED,
JobStatusEnum.FAILED,
JobStatusEnum.CANCELLED,
}
"""The set of `.JobStatusEnum` strings that indicate a sync job has completed."""

FAILED_STATUSES: set[JobStatusEnum] = {
JobStatusEnum.FAILED,
JobStatusEnum.CANCELLED,
}
"""The set of `.JobStatusEnum` strings that indicate a sync job has failed."""

READABLE_DESTINATION_TYPES: set[str] = {
"bigquery",
"snowflake",
}
"""The set of Airbyte Cloud destination types that PyAirbyte is able to read from."""
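To illustrate how these status sets can be used, here is a self-contained sketch that classifies a job status. Plain lowercase strings stand in for `JobStatusEnum` values; that is an assumption for illustration, not the library's API:

```python
# Stand-ins for airbyte.cloud.constants (string values assumed, for illustration only).
FINAL_STATUSES = {"succeeded", "failed", "cancelled"}
FAILED_STATUSES = {"failed", "cancelled"}


def classify(status: str) -> str:
    """Classify a job status as 'running', 'succeeded', or 'failed'."""
    if status not in FINAL_STATUSES:
        return "running"
    return "failed" if status in FAILED_STATUSES else "succeeded"


print(classify("pending"))    # running
print(classify("succeeded"))  # succeeded
print(classify("cancelled"))  # failed
```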
143 changes: 123 additions & 20 deletions airbyte/cloud/sync_results.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,102 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces."""
"""Sync results for Airbyte Cloud workspaces.
## Examples
### Run a sync job and wait for completion
To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.
```python
from airbyte import cloud
# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
workspace_id="123",
api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)
# Get a connection object
connection = workspace.get_connection(connection_id="456")
```
Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.
```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
### Run a sync job and return immediately
By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.
```python
# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)
while not sync_result.is_job_complete():
print("Job is still running...")
time.sleep(5)
print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
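The polling loop above can be generalized with a timeout guard. This standalone sketch stubs out the completion check rather than calling the real `is_job_complete()` API; the helper and its parameters are illustrative assumptions:

```python
import time


def wait_for_completion(is_complete, timeout: float = 1800, interval: float = 5) -> None:
    """Poll is_complete() until it returns True or the timeout elapses.

    Illustrative sketch; not part of PyAirbyte. Raises TimeoutError if the
    deadline passes before completion.
    """
    deadline = time.monotonic() + timeout
    while not is_complete():
        if time.monotonic() >= deadline:
            raise TimeoutError("Sync job did not complete in time")
        time.sleep(interval)


# Stub: reports complete on the third poll.
calls = iter([False, False, True])
wait_for_completion(lambda: next(calls), timeout=10, interval=0)
print("done")
```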
### Examining the sync result
You can examine the sync result to get more information about the job:
```python
sync_result: SyncResult = connection.run_sync()
# Print the job details
print(
f'''
Job ID: {sync_result.job_id}
Job URL: {sync_result.job_url}
Start Time: {sync_result.start_time}
Records Synced: {sync_result.records_synced}
Bytes Synced: {sync_result.bytes_synced}
Job Status: {sync_result.get_job_status()}
List of Stream Names: {', '.join(sync_result.stream_names)}
'''
)
```
### Reading data from Airbyte Cloud sync result
**This feature is currently only available for specific SQL-based destinations,** such as Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
If your destination is supported, you can read records directly from the SyncResult object.
```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()
# Print a list of available stream names
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
print(record)
```
------
"""

from __future__ import annotations

Expand All @@ -10,37 +107,31 @@
from typing import TYPE_CHECKING, Any, final

from airbyte._util import api_util
from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
from airbyte.cloud._destination_util import create_cache_from_destination_config
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
import sqlalchemy

from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
from airbyte.caches.base import CacheBase
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace


FINAL_STATUSES = {
JobStatusEnum.SUCCEEDED,
JobStatusEnum.FAILED,
JobStatusEnum.CANCELLED,
}
FAILED_STATUSES = {
JobStatusEnum.FAILED,
JobStatusEnum.CANCELLED,
}


@dataclass
class SyncResult:
"""The result of a sync operation."""
"""The result of a sync operation.
**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
"""

workspace: CloudWorkspace
connection: CloudConnection
Expand Down Expand Up @@ -200,7 +291,10 @@ def get_sql_table(
self.get_sql_cache().processor.get_sql_table(stream_name)

def get_dataset(self, stream_name: str) -> CachedDataset:
"""Return cached dataset."""
"""Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
"""
return CachedDataset(self.get_sql_cache(), stream_name=stream_name)

def get_sql_database_name(self) -> str:
Expand All @@ -222,11 +316,15 @@ def stream_names(self) -> list[str]:
@property
def streams(
self,
) -> SyncResultStreams:
"""Return a temporary table name."""
return self.SyncResultStreams(self)
) -> _SyncResultStreams:
"""Return a mapping of stream names to `airbyte.CachedDataset` objects.
This is a convenience wrapper around the `stream_names`
property and `get_dataset()` method.
"""
return self._SyncResultStreams(self)

class SyncResultStreams(Mapping[str, CachedDataset]):
class _SyncResultStreams(Mapping[str, CachedDataset]):
"""A mapping of stream names to cached datasets."""

def __init__(
Expand All @@ -245,3 +343,8 @@ def __iter__(self) -> Iterator[str]:

def __len__(self) -> int:
return len(self.parent.stream_names)


__all__ = [
"SyncResult",
]
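The `Mapping` pattern used by `_SyncResultStreams` can be sketched in isolation. `StreamMap` and its loader below are hypothetical stand-ins, not PyAirbyte classes:

```python
from collections.abc import Iterator, Mapping


class StreamMap(Mapping[str, list]):
    """Read-only mapping view built from a name list and a per-name loader.

    Illustrative stand-in for the `_SyncResultStreams` pattern; implementing
    __getitem__, __iter__, and __len__ gives us keys(), items(), and `in`
    for free via the Mapping ABC.
    """

    def __init__(self, names: list[str], load) -> None:
        self._names = names
        self._load = load

    def __getitem__(self, key: str) -> list:
        if key not in self._names:
            raise KeyError(key)
        return self._load(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._names)

    def __len__(self) -> int:
        return len(self._names)


streams = StreamMap(["users", "orders"], lambda name: [f"{name}-record"])
print(len(streams), list(streams))  # 2 ['users', 'orders']
print(streams["users"])             # ['users-record']
```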
2 changes: 1 addition & 1 deletion airbyte/secrets/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""Base classes and methods for working with secrets in PyAirbyte."""

from __future__ import annotations

Expand Down
2 changes: 1 addition & 1 deletion airbyte/secrets/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""This module provides customization of how PyAirbyte locates secrets."""

from __future__ import annotations

Expand Down
2 changes: 1 addition & 1 deletion airbyte/secrets/custom.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""Custom secret manager that retrieves secrets from a custom source."""

from __future__ import annotations

Expand Down
2 changes: 1 addition & 1 deletion airbyte/secrets/env_vars.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""Secret manager that retrieves secrets from environment variables and `.env` files."""

from __future__ import annotations

Expand Down
2 changes: 1 addition & 1 deletion airbyte/secrets/prompt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""Secret manager that prompts the user to enter a secret."""

from __future__ import annotations

Expand Down
2 changes: 1 addition & 1 deletion airbyte/secrets/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""___"""
"""Helper functions for working with secrets."""

from __future__ import annotations

Expand Down
