From 4c293d85bb2668f3a065b96334c9fde8e90e5ab1 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 8 Aug 2024 19:11:41 -0700 Subject: [PATCH 1/7] Fix: Add "Destinations 1.0" handling to PyAirbyte: sync IDs, generation IDs, and stream success statuses (#330) --- airbyte/_future_cdk/catalog_providers.py | 19 +++++++- airbyte/_message_iterators.py | 3 ++ airbyte/progress.py | 46 ++++++++++++++++-- examples/run_bigquery_destination.py | 59 ++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 examples/run_bigquery_destination.py diff --git a/airbyte/_future_cdk/catalog_providers.py b/airbyte/_future_cdk/catalog_providers.py index 2b1c0a93..b8ea6137 100644 --- a/airbyte/_future_cdk/catalog_providers.py +++ b/airbyte/_future_cdk/catalog_providers.py @@ -44,7 +44,24 @@ def __init__( Since the catalog is passed by reference, the catalog manager may be updated with new streams as they are discovered. """ - self._catalog: ConfiguredAirbyteCatalog = configured_catalog + self._catalog: ConfiguredAirbyteCatalog = self.validate_catalog(configured_catalog) + + @staticmethod + def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None: + """Validate the catalog to ensure it is valid. + + This requires ensuring that `generationId` and `minGenerationId` are both set. If + not, both values will be set to `1`. + """ + for stream in catalog.streams: + if stream.generation_id is None: + stream.generation_id = 1 + if stream.minimum_generation_id is None: + stream.minimum_generation_id = 1 + if stream.sync_id is None: + stream.sync_id = 1 # This should ideally increment monotonically with each sync. + + return catalog @property def configured_catalog(self) -> ConfiguredAirbyteCatalog: diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index 82ee71e9..c9a53908 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -18,6 +18,7 @@ ) from airbyte.constants import AB_EXTRACTED_AT_COLUMN +from airbyte.progress import _new_stream_success_message if TYPE_CHECKING: @@ -90,6 +91,8 @@ def generator() -> Generator[AirbyteMessage, None, None]: state=state_provider.get_stream_state(stream_name), ) + yield _new_stream_success_message(stream_name) + return cls(generator()) @classmethod diff --git a/airbyte/progress.py b/airbyte/progress.py index 7660c328..1069af47 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -30,7 +30,15 @@ from rich.markdown import Markdown as RichMarkdown from typing_extensions import Literal -from airbyte_protocol.models import AirbyteStreamStatus, Type +from airbyte_protocol.models import ( + AirbyteMessage, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, + StreamDescriptor, + TraceType, + Type, +) from airbyte._util import meta from airbyte._util.telemetry import EventState, EventType, send_telemetry @@ -40,8 +48,6 @@ from collections.abc import Generator, Iterable from types import ModuleType - from airbyte_protocol.models import AirbyteMessage - from airbyte._message_iterators import AirbyteMessageIterator from airbyte.caches.base import CacheBase from airbyte.destinations.base import Destination @@ -68,6 +74,24 @@ IS_NOTEBOOK = False +def _new_stream_success_message(stream_name: str) -> AirbyteMessage: + """Return a new stream success message.""" + return AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + stream=stream_name, + emitted_at=pendulum.now().float_timestamp, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor( + name=stream_name, + ), + status=AirbyteStreamStatus.COMPLETE, + ), + ), + ) + + class ProgressStyle(Enum): """An enum of progress bar styles.""" @@ -201,6 +225,8 @@ def __init__( def tally_records_read( self, messages: Iterable[AirbyteMessage], + *, + auto_close_streams: bool = False, ) -> Generator[AirbyteMessage, Any, None]: """This method simply tallies the number of records processed and yields the messages.""" # Update the display before we start. @@ -247,6 +273,11 @@ def tally_records_read( # Update the display. self._update_display() + if auto_close_streams: + for stream_name in self._unclosed_stream_names: + yield _new_stream_success_message(stream_name) + self._log_stream_read_end(stream_name) + def tally_pending_writes( self, messages: IO[str] | AirbyteMessageIterator, @@ -342,6 +373,15 @@ def _log_stream_read_end(self, stream_name: str) -> None: ) self.stream_read_end_times[stream_name] = time.time() + @property + def _unclosed_stream_names(self) -> list[str]: + """Return a list of streams that have not yet been fully read.""" + return [ + stream_name + for stream_name in self.stream_read_counts + if stream_name not in self.stream_read_end_times + ] + def log_success( self, ) -> None: diff --git a/examples/run_bigquery_destination.py b/examples/run_bigquery_destination.py new file mode 100644 index 00000000..62f22682 --- /dev/null +++ b/examples/run_bigquery_destination.py @@ -0,0 +1,59 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +""" +Usage: + poetry install + poetry run python examples/run_bigquery_destination.py +""" + +from __future__ import annotations + +import tempfile +import warnings +from typing import cast + +import airbyte as ab +from airbyte.secrets.base import SecretString +from airbyte.secrets.google_gsm import GoogleGSMSecretManager + +warnings.filterwarnings("ignore", message="Cannot create BigQuery Storage client") + + +AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing" +SECRET_NAME = "SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS" + +bigquery_destination_secret: dict = cast( + SecretString, + GoogleGSMSecretManager( + project=AIRBYTE_INTERNAL_GCP_PROJECT, + credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), + ).get_secret(SECRET_NAME), +).parse_json() + + +def main() -> None: + source = ab.get_source( + "source-faker", + config={"count": 1000, "seed": 0, "parallelism": 1, "always_updated": False}, + install_if_missing=True, + ) + source.check() + source.select_all_streams() + + with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8") as temp: + # Write credentials to the temp file + temp.write(bigquery_destination_secret["credentials_json"]) + temp.flush() + temp.close() + + destination = ab.get_destination( + "destination-bigquery", + config={**bigquery_destination_secret, "dataset_id": "pyairbyte_tests"}, + ) + write_result = destination.write( + source, + # cache=False, # Toggle comment to test with/without caching + ) + + +if __name__ == "__main__": + main() From 383d89ce0aa027d4f4311e495565c4dcc1b9844d Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 9 Aug 2024 11:37:15 -0700 Subject: [PATCH 2/7] Fix: Spammy logging from `grpcio` (temporarily force-downgrade `grpcio`) (#334) --- poetry.lock | 104 ++++++++++++++++++++++++------------------------- pyproject.toml | 1 + 2 files changed, 53 insertions(+), 52 deletions(-) diff --git a/poetry.lock b/poetry.lock index f82f5d6c..b7987ac1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1105,61 +1105,61 @@ protobuf = ">=3.20.2,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4 [[package]] name = "grpcio" -version = "1.65.1" +version = "1.64.3" description = "HTTP/2-based RPC framework" optional = false python-versions = ">=3.8" files = [ - {file = "grpcio-1.65.1-cp310-cp310-linux_armv7l.whl", hash = "sha256:3dc5f928815b8972fb83b78d8db5039559f39e004ec93ebac316403fe031a062"}, - {file = "grpcio-1.65.1-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:8333ca46053c35484c9f2f7e8d8ec98c1383a8675a449163cea31a2076d93de8"}, - {file = "grpcio-1.65.1-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:7af64838b6e615fff0ec711960ed9b6ee83086edfa8c32670eafb736f169d719"}, - {file = "grpcio-1.65.1-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dbb64b4166362d9326f7efbf75b1c72106c1aa87f13a8c8b56a1224fac152f5c"}, - {file = "grpcio-1.65.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8422dc13ad93ec8caa2612b5032a2b9cd6421c13ed87f54db4a3a2c93afaf77"}, - {file = "grpcio-1.65.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:4effc0562b6c65d4add6a873ca132e46ba5e5a46f07c93502c37a9ae7f043857"}, - {file = "grpcio-1.65.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:a6c71575a2fedf259724981fd73a18906513d2f306169c46262a5bae956e6364"}, - {file = "grpcio-1.65.1-cp310-cp310-win32.whl", hash = "sha256:34966cf526ef0ea616e008d40d989463e3db157abb213b2f20c6ce0ae7928875"}, - {file = "grpcio-1.65.1-cp310-cp310-win_amd64.whl", hash = "sha256:ca931de5dd6d9eb94ff19a2c9434b23923bce6f767179fef04dfa991f282eaad"}, - {file = "grpcio-1.65.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:bbb46330cc643ecf10bd9bd4ca8e7419a14b6b9dedd05f671c90fb2c813c6037"}, - {file = "grpcio-1.65.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:d827a6fb9215b961eb73459ad7977edb9e748b23e3407d21c845d1d8ef6597e5"}, - {file = "grpcio-1.65.1-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:6e71aed8835f8d9fbcb84babc93a9da95955d1685021cceb7089f4f1e717d719"}, - {file = "grpcio-1.65.1-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9a1c84560b3b2d34695c9ba53ab0264e2802721c530678a8f0a227951f453462"}, - {file = "grpcio-1.65.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:27adee2338d697e71143ed147fe286c05810965d5d30ec14dd09c22479bfe48a"}, - {file = "grpcio-1.65.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:f62652ddcadc75d0e7aa629e96bb61658f85a993e748333715b4ab667192e4e8"}, - {file = "grpcio-1.65.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:71a05fd814700dd9cb7d9a507f2f6a1ef85866733ccaf557eedacec32d65e4c2"}, - {file = "grpcio-1.65.1-cp311-cp311-win32.whl", hash = "sha256:b590f1ad056294dfaeac0b7e1b71d3d5ace638d8dd1f1147ce4bd13458783ba8"}, - {file = "grpcio-1.65.1-cp311-cp311-win_amd64.whl", hash = "sha256:12e9bdf3b5fd48e5fbe5b3da382ad8f97c08b47969f3cca81dd9b36b86ed39e2"}, - {file = "grpcio-1.65.1-cp312-cp312-linux_armv7l.whl", hash = "sha256:54cb822e177374b318b233e54b6856c692c24cdbd5a3ba5335f18a47396bac8f"}, - {file = "grpcio-1.65.1-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:aaf3c54419a28d45bd1681372029f40e5bfb58e5265e3882eaf21e4a5f81a119"}, - {file = "grpcio-1.65.1-cp312-cp312-manylinux_2_17_aarch64.whl", hash = "sha256:557de35bdfbe8bafea0a003dbd0f4da6d89223ac6c4c7549d78e20f92ead95d9"}, - {file = "grpcio-1.65.1-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8bfd95ef3b097f0cc86ade54eafefa1c8ed623aa01a26fbbdcd1a3650494dd11"}, - {file = "grpcio-1.65.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9e6a8f3d6c41e6b642870afe6cafbaf7b61c57317f9ec66d0efdaf19db992b90"}, - {file = "grpcio-1.65.1-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:1faaf7355ceed07ceaef0b9dcefa4c98daf1dd8840ed75c2de128c3f4a4d859d"}, - {file = "grpcio-1.65.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:60f1f38eed830488ad2a1b11579ef0f345ff16fffdad1d24d9fbc97ba31804ff"}, - {file = "grpcio-1.65.1-cp312-cp312-win32.whl", hash = "sha256:e75acfa52daf5ea0712e8aa82f0003bba964de7ae22c26d208cbd7bc08500177"}, - {file = "grpcio-1.65.1-cp312-cp312-win_amd64.whl", hash = "sha256:ff5a84907e51924973aa05ed8759210d8cdae7ffcf9e44fd17646cf4a902df59"}, - {file = "grpcio-1.65.1-cp38-cp38-linux_armv7l.whl", hash = "sha256:1fbd6331f18c3acd7e09d17fd840c096f56eaf0ef830fbd50af45ae9dc8dfd83"}, - {file = "grpcio-1.65.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:de5b6be29116e094c5ef9d9e4252e7eb143e3d5f6bd6d50a78075553ab4930b0"}, - {file = "grpcio-1.65.1-cp38-cp38-manylinux_2_17_aarch64.whl", hash = "sha256:e4a3cdba62b2d6aeae6027ae65f350de6dc082b72e6215eccf82628e79efe9ba"}, - {file = "grpcio-1.65.1-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:941c4869aa229d88706b78187d60d66aca77fe5c32518b79e3c3e03fc26109a2"}, - {file = "grpcio-1.65.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f40cebe5edb518d78b8131e87cb83b3ee688984de38a232024b9b44e74ee53d3"}, - {file = "grpcio-1.65.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:2ca684ba331fb249d8a1ce88db5394e70dbcd96e58d8c4b7e0d7b141a453dce9"}, - {file = "grpcio-1.65.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:8558f0083ddaf5de64a59c790bffd7568e353914c0c551eae2955f54ee4b857f"}, - {file = "grpcio-1.65.1-cp38-cp38-win32.whl", hash = "sha256:8d8143a3e3966f85dce6c5cc45387ec36552174ba5712c5dc6fcc0898fb324c0"}, - {file = "grpcio-1.65.1-cp38-cp38-win_amd64.whl", hash = "sha256:76e81a86424d6ca1ce7c16b15bdd6a964a42b40544bf796a48da241fdaf61153"}, - {file = "grpcio-1.65.1-cp39-cp39-linux_armv7l.whl", hash = "sha256:cb5175f45c980ff418998723ea1b3869cce3766d2ab4e4916fbd3cedbc9d0ed3"}, - {file = "grpcio-1.65.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:b12c1aa7b95abe73b3e04e052c8b362655b41c7798da69f1eaf8d186c7d204df"}, - {file = "grpcio-1.65.1-cp39-cp39-manylinux_2_17_aarch64.whl", hash = "sha256:3019fb50128b21a5e018d89569ffaaaa361680e1346c2f261bb84a91082eb3d3"}, - {file = "grpcio-1.65.1-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7ae15275ed98ea267f64ee9ddedf8ecd5306a5b5bb87972a48bfe24af24153e8"}, - {file = "grpcio-1.65.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5f096ffb881f37e8d4f958b63c74bfc400c7cebd7a944b027357cd2fb8d91a57"}, - {file = "grpcio-1.65.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:2f56b5a68fdcf17a0a1d524bf177218c3c69b3947cb239ea222c6f1867c3ab68"}, - {file = "grpcio-1.65.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:941596d419b9736ab548aa0feb5bbba922f98872668847bf0720b42d1d227b9e"}, - {file = "grpcio-1.65.1-cp39-cp39-win32.whl", hash = "sha256:5fd7337a823b890215f07d429f4f193d24b80d62a5485cf88ee06648591a0c57"}, - {file = "grpcio-1.65.1-cp39-cp39-win_amd64.whl", hash = "sha256:1bceeec568372cbebf554eae1b436b06c2ff24cfaf04afade729fb9035408c6c"}, - {file = "grpcio-1.65.1.tar.gz", hash = "sha256:3c492301988cd720cd145d84e17318d45af342e29ef93141228f9cd73222368b"}, + {file = "grpcio-1.64.3-cp310-cp310-linux_armv7l.whl", hash = "sha256:32b6d78f378df38914cbb6340cec5e02ed78cb3c9cc9f7db3bb8c8132ccd1a9a"}, + {file = "grpcio-1.64.3-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:5900314a91ac4b4bad70b64c7ccd013605dcbad92a1e28f73b54dd7d1d32f09e"}, + {file = "grpcio-1.64.3-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:b6bb9d6180fc71a32a0608724f80f40d3c7e26910b65e9dd88e7c38d8400214f"}, + {file = "grpcio-1.64.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:efaadc7b76d8475aec84cbf79169457960274f018fb53e7da19eb752d0d6e924"}, + {file = "grpcio-1.64.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4f6c9b6b91dcfe68fe50b15ea89bc602feb597b5191631ac3b3353d5dddc5a0d"}, + {file = "grpcio-1.64.3-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:0d25eee06cfdfe2387d3394b313acee5a22148613a7d8dd3f994b369c019fc92"}, + {file = "grpcio-1.64.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:7d613f6f4bfb4c475201c35bcdfd596ddfa0ed41906441dd514d5a972c7f364a"}, + {file = "grpcio-1.64.3-cp310-cp310-win32.whl", hash = "sha256:577249998c8f6db7275413431f05717be0fdc258a1f427827967a9fe21f83ad4"}, + {file = "grpcio-1.64.3-cp310-cp310-win_amd64.whl", hash = "sha256:9b11173fae31abd5ce81315696bad87daed5bdb74160e3cacd4bec9c352870d7"}, + {file = "grpcio-1.64.3-cp311-cp311-linux_armv7l.whl", hash = "sha256:56d21c7392aaf7c193a4ba1341974400cf268941007203b05e9bee707d0f2d83"}, + {file = "grpcio-1.64.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:145069b1ee5ee8bb1060f32b00f0f462838064879788cbcbc43d599cdbf5ab9e"}, + {file = "grpcio-1.64.3-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:a9f8a8c4cfbd43a44e6415e42995d7dbb8b98cb3a9d88eff34291ef670e69121"}, + {file = "grpcio-1.64.3-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6902b9ebfc833a927aa0f41fe1ffa986a2666ae96c909c7d0cf265cabc78ce93"}, + {file = "grpcio-1.64.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ee2987148d689f14f53b8cdb3c2545d42826343e38d0d31c00ab9249ecbe579d"}, + {file = "grpcio-1.64.3-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:a87076b01979d7e0297f6cd79d6ad90f305bd0168a2d217c6ae9870023f76776"}, + {file = "grpcio-1.64.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:fe357d7b2114568b55dde795c324c272e0029a48fdcb7c2eb5ee06a311c19b91"}, + {file = "grpcio-1.64.3-cp311-cp311-win32.whl", hash = "sha256:48933a53b57941ed02a2c97a5a821872fec80d4240c936df7800a5af0c89263f"}, + {file = "grpcio-1.64.3-cp311-cp311-win_amd64.whl", hash = "sha256:5bbeea3aac7dc25fdbf39a42cd99e7b8cce9ad248ed99747c403de540fc1157d"}, + {file = "grpcio-1.64.3-cp312-cp312-linux_armv7l.whl", hash = "sha256:90dc5acc2059737b98b849b910fde8ff83467fe5d791042333d007136085c7e0"}, + {file = "grpcio-1.64.3-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:f5674f0c9ff5af675a85e1ff03546f938b1e9f28022535e18482835165016cc7"}, + {file = "grpcio-1.64.3-cp312-cp312-manylinux_2_17_aarch64.whl", hash = "sha256:9a6a1c0b5133fdcde4473112000eae10d04cafd8bcd6d0a1fe01b04535e24f49"}, + {file = "grpcio-1.64.3-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eb19ec6c0b3064a8f94e0842804be185ce2c7a872ea45327f4c7b626b67b663b"}, + {file = "grpcio-1.64.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:54fe53b503746f76981b96e7c6c8fde7d3cc1fdfd804e7aa399e7eb0d24d5b65"}, + {file = "grpcio-1.64.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:448cb624544caa17f90acce094b6cfab8f7f788c616be591114d679c580e8485"}, + {file = "grpcio-1.64.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:786f6c15d648b29ef25278de9026c9f2042903b9e875830b64f27816e3055b93"}, + {file = "grpcio-1.64.3-cp312-cp312-win32.whl", hash = "sha256:7599dc7d4ff6079612386fe93f45a98c3f2bea66e59bcf1c5de811d2c4da8084"}, + {file = "grpcio-1.64.3-cp312-cp312-win_amd64.whl", hash = "sha256:2b3154eb0cb1db36934c7fbb7686698650a607b9581bad103101ed86462e369e"}, + {file = "grpcio-1.64.3-cp38-cp38-linux_armv7l.whl", hash = "sha256:25532056a702a3fdb7a7306da87d49f5636e5b5495abdb9dfd9bb20149a300ca"}, + {file = "grpcio-1.64.3-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:896461893a76c933ae6b05595255a3f49e6161cc805d94fbc74359ad5f213681"}, + {file = "grpcio-1.64.3-cp38-cp38-manylinux_2_17_aarch64.whl", hash = "sha256:ae947a47a793da4df0ea3af6ebf9c3e3d8bcf266a86c9547778e6d750ce2f69d"}, + {file = "grpcio-1.64.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fae61bcbbbf9bcc48178bb03ea419b5faca48b420e53e663bbb57d65c3bd6733"}, + {file = "grpcio-1.64.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ce0a8bcb9ff28cb972353cf321759872cfdb27f927b2b9442a599198c53d4013"}, + {file = "grpcio-1.64.3-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:cc4ebad61b073ae08326b61c2a0707fc7aa1a53bd4db201408b7d118db51b94c"}, + {file = "grpcio-1.64.3-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:9c8ee1858eab59e6a6aed142d698a37e9474394956e399b69b8ec32977ce2d0d"}, + {file = "grpcio-1.64.3-cp38-cp38-win32.whl", hash = "sha256:be3e5d3b693e9998599c5a173af56c11de14ae37fa21bb071c24ed09ac80bee7"}, + {file = "grpcio-1.64.3-cp38-cp38-win_amd64.whl", hash = "sha256:48cabd2278e6f3dc20a5bf8a73b20e3663297d40dc7702b5e4513c50bbd5cbc3"}, + {file = "grpcio-1.64.3-cp39-cp39-linux_armv7l.whl", hash = "sha256:bcb1a5c09151b2454463143125b591b2909138e56b7dcbcb310cc98b21f56a14"}, + {file = "grpcio-1.64.3-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:d7a600d6eddaf579bdbda4e8a6ce03ee1c7a7ff66635ab1b19cf0e5b6618829e"}, + {file = "grpcio-1.64.3-cp39-cp39-manylinux_2_17_aarch64.whl", hash = "sha256:137be8c41cdb284fecd9f823c680522e602fca810aa38fa008e29d9e8b2bfe5a"}, + {file = "grpcio-1.64.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3e707bd119d2ddd521f45d0f0e6cb1c0714b900fa4644a6c16a5b68050065083"}, + {file = "grpcio-1.64.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:595cf5a0b4bcb1f583f7014defe4cf96cf8fc1225d1f221c1dbfc5773c619900"}, + {file = "grpcio-1.64.3-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:50553d7d48d9107bc1dcce745ca5d4f62f7a82c6780df0acb42e50ced766a2fc"}, + {file = "grpcio-1.64.3-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:1f937a3124bfcb8995d78c1f9a9600395d97c5674703c823018dc5764e16ae5e"}, + {file = "grpcio-1.64.3-cp39-cp39-win32.whl", hash = "sha256:067a1d9539e4822f03982d0736e2e6890b356a255d82452921555165154a3c1e"}, + {file = "grpcio-1.64.3-cp39-cp39-win_amd64.whl", hash = "sha256:ca272e96578a844a6cb4f3b461c419ae9c30c039cb7ff4e69624d68005ef9c38"}, + {file = "grpcio-1.64.3.tar.gz", hash = "sha256:f37a0297293918c695e625d7148f99f4e401298d1b6e2bea7a8e9130aa940419"}, ] [package.extras] -protobuf = ["grpcio-tools (>=1.65.1)"] +protobuf = ["grpcio-tools (>=1.64.3)"] [[package]] name = "grpcio-status" @@ -1334,8 +1334,8 @@ files = [ [package.dependencies] orjson = ">=3.9.14,<4.0.0" pydantic = [ - {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""}, {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""}, + {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""}, ] requests = ">=2,<3" @@ -1747,8 +1747,8 @@ files = [ [package.dependencies] numpy = [ - {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, {version = ">=1.23.2,<2", markers = "python_version == \"3.11\""}, + {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, {version = ">=1.22.4,<2", markers = "python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" @@ -2127,8 +2127,8 @@ files = [ annotated-types = ">=0.4.0" pydantic-core = "2.20.1" typing-extensions = [ - {version = ">=4.12.2", markers = "python_version >= \"3.13\""}, {version = ">=4.6.1", markers = "python_version < \"3.13\""}, + {version = ">=4.12.2", markers = "python_version >= \"3.13\""}, ] [package.extras] @@ -3490,4 +3490,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "ff078465575d9db425a09b760fc3768d496ea291cb050206afcac62820d55a56" +content-hash = "d6cab43d92e5f7b22cb1bac5ebee693bc5330dc6a816dc8f504286d36640f36b" diff --git a/pyproject.toml b/pyproject.toml index 29c9a556..6528c643 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ airbyte-api = "^0.49.2" google-cloud-bigquery-storage = "^2.25.0" pyiceberg = "^0.6.1" uuid7 = "^0.1.0" +grpcio = "<=1.65.0" [tool.poetry.group.dev.dependencies] docker = "^7.0.0" From 23da9202f64b13d76f552fec46c01cd9b70392c7 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 9 Aug 2024 14:48:01 -0700 Subject: [PATCH 3/7] Feat: Improve log file handling for connector failures (#333) Co-authored-by: octavia-squidington-iii --- airbyte/_connector_base.py | 17 ++++++------- airbyte/_util/meta.py | 16 ------------- airbyte/constants.py | 49 ++++++++++++++++++++++++++++++++++++++ airbyte/exceptions.py | 26 ++++++++++++-------- 4 files changed, 74 insertions(+), 34 deletions(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index f14a3924..36b88f21 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -25,13 +25,13 @@ ) from airbyte import exceptions as exc -from airbyte._util import meta from airbyte._util.telemetry import ( EventState, log_config_validation_result, log_connector_check_result, ) from airbyte._util.temp_files import as_temp_files +from airbyte.constants import AIRBYTE_LOGGING_ROOT if TYPE_CHECKING: @@ -307,16 +307,22 @@ def _init_logger(self) -> logging.Logger: # Prevent logging to stderr by stopping propagation to the root logger logger.propagate = False + if AIRBYTE_LOGGING_ROOT is None: + # No temp directory available, so return a basic logger + return logger + + # Else, configure the logger to write to a file + # Remove any existing handlers for handler in logger.handlers: logger.removeHandler(handler) - folder = meta.get_logging_root() / self.name + folder = AIRBYTE_LOGGING_ROOT / self.name folder.mkdir(parents=True, exist_ok=True) # Create and configure file handler handler = logging.FileHandler( - filename=folder / f"{ulid.ULID()!s}-run-log.txt", + filename=folder / f"connector-log-{ulid.ULID()!s}.txt", encoding="utf-8", ) handler.setFormatter( @@ -329,11 +335,6 @@ def _init_logger(self) -> logging.Logger: logger.addHandler(handler) return logger - def _new_log_file(self, verb: str = "run") -> Path: - folder = meta.get_logging_root() / self.name - folder.mkdir(parents=True, exist_ok=True) - return folder / f"{ulid.ULID()!s}-{self.name}-{verb}-log.txt" - def _peek_airbyte_message( self, message: AirbyteMessage, diff --git a/airbyte/_util/meta.py b/airbyte/_util/meta.py index bfffae0b..26dfacff 100644 --- a/airbyte/_util/meta.py +++ b/airbyte/_util/meta.py @@ -8,7 +8,6 @@ import os import sys -import tempfile from contextlib import suppress from functools import lru_cache from pathlib import Path @@ -21,21 +20,6 @@ """URL to get the current Google Colab session information.""" -@lru_cache -def get_logging_root() -> Path: - """Return the root directory for logs. - - This is the directory where logs are stored. - """ - if "AIRBYTE_LOGGING_ROOT" in os.environ: - log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) - else: - log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" - - log_root.mkdir(parents=True, exist_ok=True) - return log_root - - def get_colab_release_version() -> str | None: if "COLAB_RELEASE_TAG" in os.environ: return os.environ["COLAB_RELEASE_TAG"] diff --git a/airbyte/constants.py b/airbyte/constants.py index 0a0503d6..7ee71de0 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -3,6 +3,12 @@ from __future__ import annotations +import os +import tempfile +import warnings +from functools import lru_cache +from pathlib import Path + DEBUG_MODE = False # Set to True to enable additional debug logging. @@ -41,3 +47,46 @@ DEFAULT_ARROW_MAX_CHUNK_SIZE = 100_000 """The default number of records to include in each batch of an Arrow dataset.""" + + +@lru_cache +def _get_logging_root() -> Path | None: + """Return the root directory for logs. + + Returns `None` if no valid path can be found. + + This is the directory where logs are stored. + """ + if "AIRBYTE_LOGGING_ROOT" in os.environ: + log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) + else: + log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" + + try: + # Attempt to create the log root directory if it does not exist + log_root.mkdir(parents=True, exist_ok=True) + except OSError: + # Handle the error by returning None + warnings.warn( + ( + f"Failed to create PyAirbyte logging directory at `{log_root}`. " + "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` " + "environment variable." + ), + category=UserWarning, + stacklevel=0, + ) + return None + else: + return log_root + + +AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root() +"""The root directory for Airbyte logs. + +This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable. + +If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default +temporary directory. If the directory cannot be created, PyAirbyte will log a warning and +set this value to `None`. +""" diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 7208038d..a59ae471 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -64,6 +64,7 @@ class PyAirbyteError(Exception): guidance: str | None = None help_url: str | None = None log_text: str | list[str] | None = None + log_file: Path | None = None context: dict[str, Any] | None = None message: str | None = None @@ -81,7 +82,7 @@ def get_message(self) -> str: def __str__(self) -> str: """Return a string representation of the exception.""" - special_properties = ["message", "guidance", "help_url", "log_text", "context"] + special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"] display_properties = { k: v for k, v in self.__dict__.items() @@ -99,13 +100,16 @@ def __str__(self) -> str: if isinstance(self.log_text, list): self.log_text = "\n".join(self.log_text) - exception_str += f"\nLog output: \n {indent(self.log_text, ' ')}" + exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}" + + if self.log_file: + exception_str += f"\n Log file: {self.log_file.absolute()!s}" if self.guidance: - exception_str += f"\nSuggestion: {self.guidance}" + exception_str += f"\n Suggestion: {self.guidance}" if self.help_url: - exception_str += f"\nMore info: {self.help_url}" + exception_str += f"\n More info: {self.help_url}" return exception_str @@ -263,13 +267,13 @@ class AirbyteConnectorError(PyAirbyteError): connector_name: str | None = None def __post_init__(self) -> None: - """Log the error message when the exception is raised.""" + """Set the log file path for the connector.""" + self.log_file = self._get_log_file() + + def _get_log_file(self) -> Path | None: + """Return the log file path for the connector.""" if self.connector_name: logger = logging.getLogger(f"airbyte.{self.connector_name}") - if self.connector_name: - logger.error(str(self)) - else: - logger.error(str(self)) log_paths: list[Path] = [ Path(handler.baseFilename).absolute() @@ -278,7 +282,9 @@ def __post_init__(self) -> None: ] if log_paths: - print(f"Connector logs: {', '.join(str(path) for path in log_paths)}") + return log_paths[0] + + return None class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): From 9bb782e83607e0017cccc13d00d0a47535ebadc7 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 9 Aug 2024 17:08:10 -0700 Subject: [PATCH 4/7] Feat: Process streams using deterministic (alpha-sorted) ordering (#335) --- airbyte/_future_cdk/record_processor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte/_future_cdk/record_processor.py b/airbyte/_future_cdk/record_processor.py index 89c3f0d0..dbe21baa 100644 --- a/airbyte/_future_cdk/record_processor.py +++ b/airbyte/_future_cdk/record_processor.py @@ -252,8 +252,12 @@ def write_all_stream_data( write_strategy: WriteStrategy, progress_tracker: ProgressTracker, ) -> None: - """Finalize any pending writes.""" - for stream_name in self.catalog_provider.stream_names: + """Finalize any pending writes. + + Streams are processed in alphabetical order, so that order is deterministic and opaque, + without resorting to knowledge about catalog declaration order. + """ + for stream_name in sorted(self.catalog_provider.stream_names): self.write_stream_data( stream_name, write_strategy=write_strategy, From 94681b06f6a1b7512bcd0f232066e300f542a053 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 9 Aug 2024 17:16:18 -0700 Subject: [PATCH 5/7] Feat: Add new configurable env var `AIRBYTE_TEMP_FILE_CLEANUP` (#336) --- airbyte/caches/base.py | 4 ++-- airbyte/constants.py | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index 33f7cb2d..b26de1f9 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -21,7 +21,7 @@ from airbyte._future_cdk.state_writers import StdOutStateWriter from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend from airbyte.caches._state_backend import SqlStateBackend -from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE +from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP from airbyte.datasets._sql import CachedDataset @@ -49,7 +49,7 @@ class CacheBase(SqlConfig): cache_dir: Path = Field(default=Path(".cache")) """The directory to store the cache in.""" - cleanup: bool = True + cleanup: bool = TEMP_FILE_CLEANUP """Whether to clean up the cache after use.""" _deployed_api_root: Optional[str] = PrivateAttr(default=None) diff --git a/airbyte/constants.py b/airbyte/constants.py index 7ee71de0..8cf9794c 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -90,3 +90,18 @@ def _get_logging_root() -> Path | None: temporary directory. If the directory cannot be created, PyAirbyte will log a warning and set this value to `None`. """ + +TEMP_FILE_CLEANUP = bool( + os.getenv( + key="AIRBYTE_TEMP_FILE_CLEANUP", + default="true", + ) + .lower() + .replace("false", "") + .replace("0", "") +) +"""Whether to clean up temporary files after use. + +This value is read from the `AIRBYTE_TEMP_FILE_CLEANUP` environment variable. If the variable is +not set, the default value is `True`. +""" From 2aa93996c3ef34f2e737e482250e4cd1240deecd Mon Sep 17 00:00:00 2001 From: nakamichi Date: Sat, 10 Aug 2024 09:36:55 +0900 Subject: [PATCH 6/7] Fix: Normalize schema names for correct name comparison (#296) --- airbyte/_future_cdk/sql_processor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte/_future_cdk/sql_processor.py b/airbyte/_future_cdk/sql_processor.py index 925ab5e3..dd4df3d7 100644 --- a/airbyte/_future_cdk/sql_processor.py +++ b/airbyte/_future_cdk/sql_processor.py @@ -305,13 +305,13 @@ def _get_table_by_name( def _ensure_schema_exists( self, ) -> None: - """Return a new (unique) temporary table name.""" - schema_name = self.sql_config.schema_name - - if self._known_schemas_list and self.sql_config.schema_name in self._known_schemas_list: + schema_name = self.normalizer.normalize(self.sql_config.schema_name) + known_schemas_list = self.normalizer.normalize_list(self._known_schemas_list) + if known_schemas_list and schema_name in known_schemas_list: return # Already exists - if schema_name in self._get_schemas_list(): + schemas_list = self.normalizer.normalize_list(self._get_schemas_list()) + if schema_name in schemas_list: return sql = f"CREATE SCHEMA IF NOT EXISTS {schema_name}" @@ -324,7 +324,7 @@ def _ensure_schema_exists( raise if DEBUG_MODE: - found_schemas = self._get_schemas_list() + found_schemas = schemas_list assert ( schema_name in found_schemas ), f"Schema {schema_name} was not created. Found: {found_schemas}" From 39fe9a3e29190c0a0cb8873bc3d2197a39892855 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 9 Aug 2024 20:10:14 -0700 Subject: [PATCH 7/7] CI: Skip PR title checks on draft PRs (#337) --- .github/workflows/semantic_pr_check.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/semantic_pr_check.yml b/.github/workflows/semantic_pr_check.yml index bd01b877..1cdb7868 100644 --- a/.github/workflows/semantic_pr_check.yml +++ b/.github/workflows/semantic_pr_check.yml @@ -1,11 +1,12 @@ name: "Verify Semantic PR Title" on: - pull_request_target: + pull_request: types: - opened - edited - synchronize + - ready_for_review env: AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }} @@ -19,6 +20,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: amannn/action-semantic-pull-request@v5 + if: ${{ github.event.pull_request.draft == false }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: