From e6c0c9e9b08407da2541accb26e9ec023cd99abf Mon Sep 17 00:00:00 2001 From: Augustin Date: Fri, 6 Dec 2024 15:18:56 +0100 Subject: [PATCH 01/13] ci: update connector-tests.yml worfklow to use dev binary of airbyte-ci (#131) --- .github/workflows/connector-tests.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index 666b722e..f0d9a81c 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -88,10 +88,10 @@ jobs: # cdk_extra: n/a # TODO: These are manifest connectors and won't work as expected until we # add `--use-local-cdk` support for manifest connectors. - # - connector: source-the-guardian-api - # cdk_extra: n/a - # - connector: source-pokeapi - # cdk_extra: n/a + - connector: source-the-guardian-api + cdk_extra: n/a + - connector: source-pokeapi + cdk_extra: n/a name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs['src'] == 'false' || needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})" permissions: From d3cece77feb83784c2d11423e0803fc7b59aa49c Mon Sep 17 00:00:00 2001 From: Augustin Date: Fri, 6 Dec 2024 17:53:51 +0100 Subject: [PATCH 02/13] chore: update SDM image to be rootless (#143) --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 3b0f34a1..2f87497e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916 +FROM docker.io/airbyte/python-connector-base:3.0.0@sha256:1a0845ff2b30eafa793c6eee4e8f4283c2e52e1bbd44eed6cb9e9abd5d34d844 WORKDIR /airbyte/integration_code @@ -26,3 +26,4 @@ RUN rm -rf dist/ pyproject.toml poetry.lock README.md # Set the entrypoint ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] +USER airbyte From 59505ffdb235c8344a7a82d222700b17028119a8 Mon Sep 17 00:00:00 2001 From: Augustin Date: Fri, 6 Dec 2024 18:09:36 +0100 Subject: [PATCH 03/13] chore: revert SDM image change (#145) --- Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2f87497e..3b0f34a1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM docker.io/airbyte/python-connector-base:3.0.0@sha256:1a0845ff2b30eafa793c6eee4e8f4283c2e52e1bbd44eed6cb9e9abd5d34d844 +FROM docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916 WORKDIR /airbyte/integration_code @@ -26,4 +26,3 @@ RUN rm -rf dist/ pyproject.toml poetry.lock README.md # Set the entrypoint ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -USER airbyte From 0f40e22448cc78bf2dc210e59ee0f3d7ccb2025d Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 6 Dec 2024 12:02:46 -0800 Subject: [PATCH 04/13] ci: update publish workflows, removing old workflow, tweaking main workflow for usability, and updating release docs (#151) --- .github/workflows/publish_sdm_connector.yml | 178 -------------------- .github/workflows/pypi_publish.yml | 4 +- docs/RELEASES.md | 36 +--- 3 files changed, 11 insertions(+), 207 deletions(-) delete mode 100644 .github/workflows/publish_sdm_connector.yml diff --git a/.github/workflows/publish_sdm_connector.yml b/.github/workflows/publish_sdm_connector.yml deleted file mode 100644 index 
3dcb86de..00000000 --- a/.github/workflows/publish_sdm_connector.yml +++ /dev/null @@ -1,178 +0,0 @@ -# This flow publishes the Source-Declarative-Manifest (SDM) -# connector to DockerHub as a Docker image. -# TODO: Delete this workflow file once the unified publish flow is implemented and proven stable. - -name: Publish SDM Connector - -on: - workflow_dispatch: - inputs: - version: - description: The version to publish, ie 1.0.0 or 1.0.0-dev1. - If omitted, and if run from a release branch, the version will be - inferred from the git tag. - If omitted, and if run from a non-release branch, then only a SHA-based - Docker tag will be created. - required: false - dry_run: - description: If true, the workflow will not push to DockerHub. - type: boolean - required: false - default: false - -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Detect Release Tag Version - if: startsWith(github.ref, 'refs/tags/v') - run: | - DETECTED_VERSION=${{ github.ref_name }} - echo "Version ref set to '${DETECTED_VERSION}'" - # Remove the 'v' prefix if it exists - DETECTED_VERSION="${DETECTED_VERSION#v}" - echo "Setting version to '$DETECTED_VERSION'" - echo "DETECTED_VERSION=${DETECTED_VERSION}" >> $GITHUB_ENV - - - name: Validate and set VERSION from tag ('${{ github.ref_name }}') and input (${{ github.event.inputs.version || 'none' }}) - id: set_version - if: github.event_name == 'workflow_dispatch' - run: | - INPUT_VERSION=${{ github.event.inputs.version }} - echo "Version input set to '${INPUT_VERSION}'" - # Exit with success if both detected and input versions are empty - if [ -z "${DETECTED_VERSION:-}" ] && [ -z "${INPUT_VERSION:-}" ]; then - echo "No version detected or input. Will publish to SHA tag instead." - echo 'VERSION=' >> $GITHUB_ENV - exit 0 - fi - # Remove the 'v' prefix if it exists - INPUT_VERSION="${INPUT_VERSION#v}" - # Fail if detected version is non-empty and different from the input version - if [ -n "${DETECTED_VERSION:-}" ] && [ -n "${INPUT_VERSION:-}" ] && [ "${DETECTED_VERSION}" != "${INPUT_VERSION}" ]; then - echo "Error: Version input '${INPUT_VERSION}' does not match detected version '${DETECTED_VERSION}'." - exit 1 - fi - # Set the version to the input version if non-empty, otherwise the detected version - VERSION="${INPUT_VERSION:-$DETECTED_VERSION}" - # Fail if the version is still empty - if [ -z "$VERSION" ]; then - echo "Error: VERSION is not set. Ensure the tag follows the format 'refs/tags/vX.Y.Z'." 
- exit 1 - fi - echo "Setting version to '$VERSION'" - echo "VERSION=${VERSION}" >> $GITHUB_ENV - echo "VERSION=${VERSION}" >> $GITHUB_OUTPUT - # Check if version is a prerelease version (will not tag 'latest') - if [[ "${VERSION}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then - echo "IS_PRERELEASE=false" >> $GITHUB_ENV - echo "IS_PRERELEASE=false" >> $GITHUB_OUTPUT - else - echo "IS_PRERELEASE=true" >> $GITHUB_ENV - echo "IS_PRERELEASE=true" >> $GITHUB_OUTPUT - fi - - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - uses: hynek/build-and-inspect-python-package@v2 - name: Build package with version ref '${{ env.VERSION || '0.0.0dev0' }}' - env: - # Pass in the evaluated version from the previous step - # More info: https://github.com/mtkennerly/poetry-dynamic-versioning#user-content-environment-variables - POETRY_DYNAMIC_VERSIONING_BYPASS: ${{ env.VERSION || '0.0.0dev0'}} - - - uses: actions/upload-artifact@v4 - with: - name: Packages-${{ github.run_id }} - path: | - /tmp/baipp/dist/*.whl - /tmp/baipp/dist/*.tar.gz - outputs: - VERSION: ${{ steps.set_version.outputs.VERSION }} - IS_PRERELEASE: ${{ steps.set_version.outputs.IS_PRERELEASE }} - - publish_sdm: - name: Publish SDM to DockerHub - if: startsWith(github.ref, 'refs/tags/v') || github.event_name == 'workflow_dispatch' - runs-on: ubuntu-latest - needs: [build] - environment: - name: DockerHub - url: https://hub.docker.com/r/airbyte/source-declarative-manifest/tags - env: - VERSION: ${{ needs.build.outputs.VERSION }} - IS_PRERELEASE: ${{ needs.build.outputs.IS_PRERELEASE }} - - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - # We need to download the build artifact again because the previous job was on a different runner - - name: Download Build Artifact - uses: actions/download-artifact@v4 - with: - name: Packages-${{ github.run_id }} - path: dist - - - name: Set up QEMU for multi-platform builds - uses: docker/setup-qemu-action@v3 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Login to Docker Hub - uses: docker/login-action@v3 - with: - username: ${{ secrets.DOCKER_HUB_USERNAME }} - password: ${{ secrets.DOCKER_HUB_PASSWORD }} - - - name: "Check for existing tag (version: ${{ env.VERSION || 'none' }} )" - if: env.VERSION != '' - run: | - tag="airbyte/source-declarative-manifest:${{ env.VERSION }}" - if [ -z "$tag" ]; then - echo "Error: VERSION is not set. Ensure the tag follows the format 'refs/tags/vX.Y.Z'." - exit 1 - fi - echo "Checking if tag '$tag' exists on DockerHub..." - if DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect "$tag" > /dev/null 2>&1; then - echo "The tag '$tag' already exists on DockerHub. Skipping publish to prevent overwrite." - exit 1 - fi - echo "No existing tag '$tag' found. Proceeding with publish." - - - name: Build and push (sha tag) - # Only run if the version is not set - if: env.VERSION == '' && github.event.inputs.dry_run == 'false' - uses: docker/build-push-action@v5 - with: - context: . - platforms: linux/amd64,linux/arm64 - push: true - tags: | - airbyte/source-declarative-manifest:${{ github.sha }} - - - name: "Build and push (version tag: ${{ env.VERSION || 'none'}})" - # Only run if the version is set - if: env.VERSION != '' && github.event.inputs.dry_run == 'false' - uses: docker/build-push-action@v5 - with: - context: . 
- platforms: linux/amd64,linux/arm64 - push: true - tags: | - airbyte/source-declarative-manifest:${{ env.VERSION }} - - - name: Build and push ('latest' tag) - # Only run if version is set and IS_PRERELEASE is false - if: env.VERSION != '' && env.IS_PRERELEASE == 'false' && github.event.inputs.dry_run == 'false' - uses: docker/build-push-action@v5 - with: - context: . - platforms: linux/amd64,linux/arm64 - push: true - tags: | - airbyte/source-declarative-manifest:latest diff --git a/.github/workflows/pypi_publish.yml b/.github/workflows/pypi_publish.yml index 2bfd7cb2..8baf6765 100644 --- a/.github/workflows/pypi_publish.yml +++ b/.github/workflows/pypi_publish.yml @@ -14,7 +14,7 @@ on: workflow_dispatch: inputs: version: - description: "Version. The version to publish, ie 1.0.0 or 1.0.0-dev1. In most cases, you can leave this blank. If run from a release tag (recommended), the version number will be inferred from the git tag." + description: "Note that this workflow is intended for prereleases. For public-facing stable releases, please use the GitHub Releases workflow instead: https://github.com/airbytehq/airbyte-python-cdk/blob/main/docs/RELEASES.md. If running this workflow from main or from a dev branch, please enter the desired version number here, for instance 1.2.3dev0 or 1.2.3rc1." required: false publish_to_pypi: description: "Publish to PyPI. If true, the workflow will publish to PyPI." @@ -30,7 +30,7 @@ on: description: "Update Connector Builder. If true, the workflow will create a PR to bump the CDK version used by Connector Builder." type: boolean required: true - default: true + default: false jobs: build: diff --git a/docs/RELEASES.md b/docs/RELEASES.md index c51ebd6b..5a92e090 100644 --- a/docs/RELEASES.md +++ b/docs/RELEASES.md @@ -1,6 +1,6 @@ # Airbyte Python CDK - Release Management Guide -## Publishing stable releases of the CDK +## Publishing stable releases of the CDK and SDM A few seconds after any PR is merged to `main` , a release draft will be created or updated on the releases page here: https://github.com/airbytehq/airbyte-python-cdk/releases. Here are the steps to publish a CDK release: @@ -13,34 +13,16 @@ A few seconds after any PR is merged to `main` , a release draft will be created - *Only maintainers can see release drafts. Non-maintainers will only see published releases.* - If you create a tag on accident that you need to remove, contact a maintainer to delete the tag and the release. -- You can monitor the PyPi release process here in the GitHub Actions view: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/pypi_publish.yml +- You can monitor the PyPI release process here in the GitHub Actions view: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/pypi_publish.yml - **_[▶️ Loom Walkthrough](https://www.loom.com/share/ceddbbfc625141e382fd41c4f609dc51?sid=78e13ef7-16c8-478a-af47-4978b3ff3fad)_** -## Publishing Pre-Release Versions of the CDK +## Publishing Pre-Release Versions of the CDK and/or SDM (Internal) -Publishing a pre-release version is similar to publishing a stable version. However, instead of using the auto-generated release draft, you’ll create a new release draft. +This process is slightly different from the above, since we don't necessarily want public release notes to be published for internal testing releases. The same underlying workflow will be run, but we'll kick it off directly: -1. Navigate to the releases page: https://github.com/airbytehq/airbyte-python-cdk/releases -2. 
Click “Draft a new release”. -3. In the tag selector, type the version number of the prerelease you’d like to create and copy-past the same into the Release name box. - - The release should be like `vX.Y.Zsuffix` where `suffix` is something like `dev0`, `dev1` , `alpha0`, `beta1`, etc. - -## Publishing new versions of SDM (source-declarative-manifest) - -Prereqs: - -1. The SDM publish process assumes you have already published the CDK. If you have not already done so, you’ll want to first publish the CDK using the steps above. While this prereq is not technically *required*, it is highly recommended. - -Publish steps: - -1. Navigate to the GitHub action page here: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/publish_sdm_connector.yml -2. Click “Run workflow” to start the process of invoking a new manual workflow. -3. Click the drop-down for “Run workflow from” and then select the “tags” tab to browse already-created tags. Select the tag of the published CDK version you want to use for the SDM publish process. Notes: - 1. Optionally you can type part of the version number to filter down the list. - 2. You can ignore the version prompt box (aka leave blank) when publishing from a release tag. The version will be detected from the git tag. - 3. You can optionally click the box for “Dry run” if you want to observe the process before running the real thing. The dry run option will perform all steps *except* for the DockerHub publish step. -4. Without changing any other options, you can click “Run workflow” to run the workflow. -5. Watch the GitHub Action run. If successful, you should see it publish to DockerHub and a URL will appear on the “Summary” view once it has completed. - -- **_[▶️ Loom Walkthrough](https://www.loom.com/share/bc8ddffba9384fcfacaf535608360ee1)_** +1. Navigate to the "Packaging and Publishing" workflow in GitHub Actions. +2. Type the version number - including a valid pre-release suffix. Examples: `1.2.3dev0`, `1.2.3rc1`, `1.2.3b0`, etc. +3. Select `main` or your dev branch from the "Use workflow from" dropdown. +4. Select your options and click "Run workflow". +5. Monitor the workflow to ensure the process has succeeded. From 580f60ecc5b6720e2601b807614ebf8131bbc746 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 6 Dec 2024 12:44:24 -0800 Subject: [PATCH 05/13] ci: make it easier to find the connector test report url upon failure (#153) --- .github/workflows/connector-tests.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index f0d9a81c..e24761f4 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -163,9 +163,10 @@ jobs: fi echo -e "\n[Download Job Output](${{steps.upload_job_output.outputs.artifact-url}})" >> $GITHUB_STEP_SUMMARY if [ "${success}" != "true" ]; then - echo "::error::Test failed for connector '${{ matrix.connector }}' on step '${failed_step}'. Check the logs for more details." + echo "::error::Test failed for connector '${{ matrix.connector }}' on step '${failed_step}'. 
" exit 1 fi + echo "See the execution report for details: ${html_report_url}" echo "success=${success}" >> $GITHUB_OUTPUT echo "html_report_url=${html_report_url}" >> $GITHUB_OUTPUT From d516523fcecca1e314d55302b7867d2dd06616d3 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 6 Dec 2024 17:38:36 -0800 Subject: [PATCH 06/13] ci: add `poe install` definition to save some keystrokes (#154) --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index c3267ed7..fbc7ad7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,6 +124,9 @@ line-length = 100 select = ["I"] [tool.poe.tasks] +# Installation +install = { shell = "poetry install --all-extras" } + # Build tasks assemble = {cmd = "bin/generate-component-manifest-dagger.sh", help = "Generate component manifest files."} build-package = {cmd = "poetry build", help = "Build the python package: source and wheels archives."} From 6ab54c0def3c0b5e4732944b93fa418cf9c3cd66 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Mon, 9 Dec 2024 12:06:24 -0500 Subject: [PATCH 07/13] fix(concurrent): Reduce occurrences of database table is locked errors (#146) --- .../sources/streams/http/http_client.py | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 4f99bbeb..71548441 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -138,12 +138,22 @@ def _request_session(self) -> requests.Session: cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH) # Use in-memory cache if cache_dir is not set # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests - if cache_dir: - sqlite_path = str(Path(cache_dir) / self.cache_filename) - else: - sqlite_path = "file::memory:?cache=shared" + # Use in-memory cache if cache_dir is not set + # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests + sqlite_path = ( + str(Path(cache_dir) / self.cache_filename) + if cache_dir + else "file::memory:?cache=shared" + ) + # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors. + # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information. + # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are: + # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaR238 + # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-2e7f95b7d7be270ff1a8118f817ea3e6663cdad273592e536a116c24e6d23c18R164-R168 + # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted](https://www.sqlite.org/howtocorrupt.html#cfgerr) if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](https://www.sqlite.org/pragma.html#pragma_synchronous). 
+ backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True) return CachedLimiterSession( - sqlite_path, backend="sqlite", api_budget=self._api_budget, match_headers=True + sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True ) else: return LimiterSession(api_budget=self._api_budget) From e7ccf768b03837b07748fc64da66151b9693f760 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Mon, 9 Dec 2024 12:06:47 -0500 Subject: [PATCH 08/13] fix(concurrent): Ensure default concurrency level does not generate deadlock (#148) --- .../sources/declarative/concurrent_declarative_source.py | 9 +++++---- .../declarative/declarative_component_schema.yaml | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index a393bbeb..aa3cea70 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -56,8 +56,9 @@ class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): - # By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread. - SINGLE_THREADED_CONCURRENCY_LEVEL = 1 + # By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock + # because it has hit the limit of futures but not partition reader is consuming them. + _LOWEST_SAFE_CONCURRENCY_LEVEL = 2 def __init__( self, @@ -107,8 +108,8 @@ def __init__( concurrency_level // 2, 1 ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up else: - concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL - initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL + concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL + initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index dff7abab..48c8f8c0 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -327,7 +327,7 @@ definitions: additionalProperties: true ConcurrencyLevel: title: Concurrency Level - description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. + description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. Note that a value of 1 could create deadlock if a stream has a very high number of partitions. 
type: object required: - default_concurrency From 547a4a29a06ff79cc249b6a06d3f84d38123928a Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Mon, 9 Dec 2024 12:07:16 -0500 Subject: [PATCH 09/13] fix(concurrent): Fix memory issue following concurrency in source-jira (#150) --- .../declarative/interpolation/jinja.py | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index ec5e861c..3bb9b0c2 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -4,7 +4,7 @@ import ast from functools import cache -from typing import Any, Mapping, Optional, Tuple, Type +from typing import Any, Mapping, Optional, Set, Tuple, Type from jinja2 import meta from jinja2.environment import Template @@ -27,7 +27,35 @@ class StreamPartitionAccessEnvironment(SandboxedEnvironment): def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool: if attr in ["_partition"]: return True - return super().is_safe_attribute(obj, attr, value) + return super().is_safe_attribute(obj, attr, value) # type: ignore # for some reason, mypy says 'Returning Any from function declared to return "bool"' + + +# These aliases are used to deprecate existing keywords without breaking all existing connectors. +_ALIASES = { + "stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values + "stream_partition": "stream_slice", # Use stream_partition to access partition router's values +} + +# These extensions are not installed so they're not currently a problem, +# but we're still explicitly removing them from the jinja context. +# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks +_RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops + +# By default, these Python builtin functions are available in the Jinja context. +# We explicitly remove them because of the potential security risk. +# Please add a unit test to test_jinja.py when adding a restriction. +_RESTRICTED_BUILTIN_FUNCTIONS = [ + "range" +] # The range function can cause very expensive computations + +_ENVIRONMENT = StreamPartitionAccessEnvironment() +_ENVIRONMENT.filters.update(**filters) +_ENVIRONMENT.globals.update(**macros) + +for extension in _RESTRICTED_EXTENSIONS: + _ENVIRONMENT.extensions.pop(extension, None) +for builtin in _RESTRICTED_BUILTIN_FUNCTIONS: + _ENVIRONMENT.globals.pop(builtin, None) class JinjaInterpolation(Interpolation): @@ -48,34 +76,6 @@ class JinjaInterpolation(Interpolation): Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/# """ - # These aliases are used to deprecate existing keywords without breaking all existing connectors. - ALIASES = { - "stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values - "stream_partition": "stream_slice", # Use stream_partition to access partition router's values - } - - # These extensions are not installed so they're not currently a problem, - # but we're still explicitely removing them from the jinja context. 
- # At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks - RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops - - # By default, these Python builtin functions are available in the Jinja context. - # We explicitely remove them because of the potential security risk. - # Please add a unit test to test_jinja.py when adding a restriction. - RESTRICTED_BUILTIN_FUNCTIONS = [ - "range" - ] # The range function can cause very expensive computations - - def __init__(self) -> None: - self._environment = StreamPartitionAccessEnvironment() - self._environment.filters.update(**filters) - self._environment.globals.update(**macros) - - for extension in self.RESTRICTED_EXTENSIONS: - self._environment.extensions.pop(extension, None) - for builtin in self.RESTRICTED_BUILTIN_FUNCTIONS: - self._environment.globals.pop(builtin, None) - def eval( self, input_str: str, @@ -86,7 +86,7 @@ def eval( ) -> Any: context = {"config": config, **additional_parameters} - for alias, equivalent in self.ALIASES.items(): + for alias, equivalent in _ALIASES.items(): if alias in context: # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises raise ValueError( @@ -105,6 +105,7 @@ def eval( raise Exception(f"Expected a string, got {input_str}") except UndefinedError: pass + # If result is empty or resulted in an undefined error, evaluate and return the default string return self._literal_eval(self._eval(default, context), valid_types) @@ -132,16 +133,16 @@ def _eval(self, s: Optional[str], context: Mapping[str, Any]) -> Optional[str]: return s @cache - def _find_undeclared_variables(self, s: Optional[str]) -> set[str]: + def _find_undeclared_variables(self, s: Optional[str]) -> Set[str]: """ Find undeclared variables and cache them """ - ast = self._environment.parse(s) # type: ignore # parse is able to handle None + ast = _ENVIRONMENT.parse(s) # type: ignore # parse is able to handle None return meta.find_undeclared_variables(ast) @cache - def _compile(self, s: Optional[str]) -> Template: + def _compile(self, s: str) -> Template: """ We must cache the Jinja Template ourselves because we're using `from_string` instead of a template loader """ - return self._environment.from_string(s) # type: ignore [arg-type] # Expected `str | Template` but passed `str | None` + return _ENVIRONMENT.from_string(s) From a7124418b7eaf2fcef7e1fa658d88895a3fa148c Mon Sep 17 00:00:00 2001 From: Augustin Date: Mon, 9 Dec 2024 18:17:48 +0100 Subject: [PATCH 10/13] chore: make SDM image rootless (#147) Co-authored-by: Aaron ("AJ") Steers --- Dockerfile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 3b0f34a1..400fe4d4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916 +FROM docker.io/airbyte/python-connector-base:3.0.0@sha256:1a0845ff2b30eafa793c6eee4e8f4283c2e52e1bbd44eed6cb9e9abd5d34d844 WORKDIR /airbyte/integration_code @@ -23,6 +23,10 @@ RUN mkdir -p source_declarative_manifest \ # Remove unnecessary build files RUN rm -rf dist/ pyproject.toml poetry.lock README.md +# Set ownership of /airbyte to the non-root airbyte user and group (1000:1000) +RUN chown -R 1000:1000 /airbyte + # Set the entrypoint ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py" ENTRYPOINT ["python", 
"/airbyte/integration_code/main.py"] +USER airbyte From 99b2985fc3dd3e7d8e7c0a8ed7ffac8207fb78a7 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 9 Dec 2024 13:55:08 -0800 Subject: [PATCH 11/13] docs: add SDM release troubleshooting info (#157) Co-authored-by: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com> --- docs/RELEASES.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/docs/RELEASES.md b/docs/RELEASES.md index 5a92e090..cf74d423 100644 --- a/docs/RELEASES.md +++ b/docs/RELEASES.md @@ -26,3 +26,57 @@ This process is slightly different from the above, since we don't necessarily wa 3. Select `main` or your dev branch from the "Use workflow from" dropdown. 4. Select your options and click "Run workflow". 5. Monitor the workflow to ensure the process has succeeded. + +## Understanding and Debugging Builder and SDM Releases + +### How Connector Builder uses SDM/CDK + +The Connector Builder (written in Java) calls the CDK Python package directly, executing the CDK's Source Declarative Manfiest code via Python processes on the Builder container. (The Connector Builder does not directly invoke the SDM image, but there is an open project to change this in the future.) + +Our publish flow sends a PR to the Builder repo (`airbyte-platform-internal`) to bump the version used in Builder. The Marketplace Contributions team (aka Connector Builder maintainers) will review and merge the PR. + +### How the SDM Image is used in Platform + +The platform scans DockerHub at an [every 10 minutes cadence](https://github.com/airbytehq/airbyte-platform-internal/blob/d744174c0f3ca8fa70f3e05cca6728f067219752/oss/airbyte-cron/src/main/java/io/airbyte/cron/jobs/DeclarativeSourcesUpdater.java) as of 2024-12-09. Based on that DockerHub scan, the platform bumps the default SDM version that is stored in the `declarative_manifest_image_version` table in prod. + +Note: Currently we don't pre-test images in Platform so manual testing is needed. + +### How to confirm what SDM version is used on the Platform + +Currently there are two ways to do this. + +The first option is to look in the `declarative_manifest_image_version` database table in Prod. + +If that is not available as an option, you can run an Builder-created connector in Cloud and note the version number printed in the logs. Warning: this may not be indicative if that connector instance has been manually pinned to a specific version. + +TODO: Would be great to find a way to inspect directly without requiring direct prod DB access. + +### How to pretest changes to SDM images manually + +To manually test changes against a dev image of SDM before committing to a release, first use the Publishing & Packaging workflow to publish a pre-release version of the CDK/SDM. Be sure to uncheck the option to create a connector builder PR. + +#### Pretesting Manifest-Only connectors + +Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well). +Next, build the pre-release image locally using `airbyte-ci connectors —name= build`. +You can now run connector interfaces against the built image using the pattern
`docker run airbyte/<connector-name>:dev <spec/check/discover/read>`. The connector's README should include a list of these commands, which can be copy/pasted and run from the connector's directory for quick testing against a local config. You can also run `airbyte-ci connectors --name=<connector-name> test` to run the CI test suite against the dev image.

#### Pretesting Low-Code Python connectors

Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI. Update the lockfile and run connector interfaces via poetry:
`poetry run source-<connector-name> spec/check/discover/read`. You can also run `airbyte-ci connectors --name=<connector-name> test` to run the CI test suite against the dev image.

 + +#### Pretesting in Cloud + +It is possible to pretest a version of SDM in Airbyte Cloud using the following steps: + +1. Publish a pre-release version. +2. Open Cloud and create a custom source in the Builder (ie fork PokeAPI with no changes). Publish the source to your workspace. +3. Set up a connection using the forked custom source. +4. Connect to the production database with a tool like DBeaver. +5. Manually update the connector's `actor_definition_version`. + +Because this process requires accessing and updating the production database manually, it is NOT RECOMMENDED for most cases. Only do so if you understand the risks, are already confident navigating the database, and feel the potential risk of your changes breaking the CDK/SDM is high enough to warrant this process. From 42ee3b4c05821cbb9c2415a9c13f2f4e931be4e0 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 9 Dec 2024 19:18:36 -0800 Subject: [PATCH 12/13] ci: change dependabot github actions to monthly (#159) --- .github/dependabot.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 2d7ddad1..ce6e3b1a 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -21,7 +21,7 @@ updates: commit-message: prefix: "ci(deps): " schedule: - interval: daily + interval: monthly labels: - ci groups: From ed9a5e7aafb1ba4b9be04db70003e29d4c765de9 Mon Sep 17 00:00:00 2001 From: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:16:40 -0800 Subject: [PATCH 13/13] feat: add unit test fixtures for manifest-only connectors to CDK (#121) Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../test/utils/manifest_only_fixtures.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 airbyte_cdk/test/utils/manifest_only_fixtures.py diff --git a/airbyte_cdk/test/utils/manifest_only_fixtures.py b/airbyte_cdk/test/utils/manifest_only_fixtures.py new file mode 100644 index 00000000..47620e7c --- /dev/null +++ b/airbyte_cdk/test/utils/manifest_only_fixtures.py @@ -0,0 +1,60 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + + +import importlib.util +from pathlib import Path +from types import ModuleType +from typing import Optional + +import pytest + +# The following fixtures are used to load a manifest-only connector's components module and manifest file. +# They can be accessed from any test file in the connector's unit_tests directory by importing them as follows: + +# from airbyte_cdk.test.utils.manifest_only_fixtures import components_module, connector_dir, manifest_path + +# individual components can then be referenced as: components_module. + + +@pytest.fixture(scope="session") +def connector_dir(request: pytest.FixtureRequest) -> Path: + """Return the connector's root directory.""" + + current_dir = Path(request.config.invocation_params.dir) + + # If the tests are run locally from the connector's unit_tests directory, return the parent (connector) directory + if current_dir.name == "unit_tests": + return current_dir.parent + # In CI, the tests are run from the connector directory itself + return current_dir + + +@pytest.fixture(scope="session") +def components_module(connector_dir: Path) -> Optional[ModuleType]: + """Load and return the components module from the connector directory. + + This assumes the components module is located at /components.py. 
+ """ + components_path = connector_dir / "components.py" + if not components_path.exists(): + return None + + components_spec = importlib.util.spec_from_file_location("components", components_path) + if components_spec is None: + return None + + components_module = importlib.util.module_from_spec(components_spec) + if components_spec.loader is None: + return None + + components_spec.loader.exec_module(components_module) + return components_module + + +@pytest.fixture(scope="session") +def manifest_path(connector_dir: Path) -> Path: + """Return the path to the connector's manifest file.""" + path = connector_dir / "manifest.yaml" + if not path.exists(): + raise FileNotFoundError(f"Manifest file not found at {path}") + return path