From 7488afb5bcd14f0810ab583a670683d585e7942e Mon Sep 17 00:00:00 2001
From: "Aaron (\"AJ\") Steers"
Date: Mon, 2 Sep 2024 12:08:11 -0700
Subject: [PATCH] Feat: Add performance metrics and mb/s tracking (#356)

---
 airbyte/_connector_base.py |   8 +++
 airbyte/progress.py        | 105 ++++++++++++++++++++++++++++++++++++-
 airbyte/sources/base.py    |   1 +
 3 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py
index f1db8b21..6247a3e2 100644
--- a/airbyte/_connector_base.py
+++ b/airbyte/_connector_base.py
@@ -38,6 +38,7 @@
     from airbyte._executors.base import Executor
     from airbyte._message_iterators import AirbyteMessageIterator
+    from airbyte.progress import ProgressTracker

 MAX_LOG_LINES = 20
@@ -352,6 +353,8 @@ def _execute(
         self,
         args: list[str],
         stdin: IO[str] | AirbyteMessageIterator | None = None,
+        *,
+        progress_tracker: ProgressTracker | None = None,
     ) -> Generator[AirbyteMessage, None, None]:
         """Execute the connector with the given arguments.
@@ -371,6 +374,11 @@
         for line in self.executor.execute(args, stdin=stdin):
             try:
                 message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
+                if progress_tracker and message.record:
+                    progress_tracker.tally_bytes_read(
+                        len(line),
+                        stream_name=message.record.stream,
+                    )
                 self._peek_airbyte_message(message)
                 yield message
diff --git a/airbyte/progress.py b/airbyte/progress.py
index c867b739..def54028 100644
--- a/airbyte/progress.py
+++ b/airbyte/progress.py
@@ -18,6 +18,7 @@
 import datetime
 import importlib
+import json
 import math
 import os
 import sys
@@ -207,6 +208,7 @@ def __init__(
         self.stream_read_counts: dict[str, int] = defaultdict(int)
         self.stream_read_start_times: dict[str, float] = {}
         self.stream_read_end_times: dict[str, float] = {}
+        self.stream_bytes_read: dict[str, int] = defaultdict(int)

         # Cache Writes
         self.total_records_written = 0
@@ -238,6 +240,27 @@ def _print_info_message(
         if self._file_logger:
             self._file_logger.info(message)

+    @property
+    def bytes_tracking_enabled(self) -> bool:
+        """Return True if bytes are being tracked."""
+        return bool(self.stream_bytes_read)
+
+    @property
+    def total_bytes_read(self) -> int:
+        """Return the total number of bytes read.
+
+        Return 0 if bytes are not being tracked.
+        """
+        return sum(self.stream_bytes_read.values())
+
+    @property
+    def total_megabytes_read(self) -> float:
+        """Return the total number of megabytes read.
+
+        Return 0.0 if no bytes have been read, generally because bytes are not being tracked.
+        """
+        return self.total_bytes_read / 1_000_000
+
     def tally_records_read(
         self,
         messages: Iterable[AirbyteMessage],
@@ -351,6 +374,13 @@ def tally_confirmed_writes(

         self._update_display(force_refresh=True)

+    def tally_bytes_read(self, bytes_read: int, stream_name: str) -> None:
+        """Tally the number of bytes read.
+
+        Unlike the other tally methods, this method does not yield messages.
+        """
+        self.stream_bytes_read[stream_name] += bytes_read
+
     # Logging methods

     @property
@@ -393,6 +423,72 @@ def _log_stream_read_end(self, stream_name: str) -> None:
         )
         self.stream_read_end_times[stream_name] = time.time()

+    def _log_read_metrics(self) -> None:
+        """Log read performance metrics."""
+        # Source performance metrics
+        if not self.total_records_read or not self._file_logger:
+            return
+
+        perf_metrics: dict[str, Any] = {
+            "job_description": {
+                "description": self.job_description,
+            }
+        }
+        if self._source:
+            perf_metrics["job_description"]["source"] = self._source.name
+        if self._cache:
+            perf_metrics["job_description"]["cache"] = type(self._cache).__name__
+        if self._destination:
+            perf_metrics["job_description"]["destination"] = self._destination.name
+
+        perf_metrics["records_read"] = self.total_records_read
+        perf_metrics["read_time_seconds"] = self.elapsed_read_seconds
+        perf_metrics["read_start_time"] = self.read_start_time
+        perf_metrics["read_end_time"] = self.read_end_time
+        if self.elapsed_read_seconds > 0:
+            perf_metrics["records_per_second"] = round(
+                self.total_records_read / self.elapsed_read_seconds, 4
+            )
+            if self.bytes_tracking_enabled:
+                mb_read = self.total_megabytes_read
+                perf_metrics["mb_read"] = mb_read
+                perf_metrics["mb_per_second"] = round(mb_read / self.elapsed_read_seconds, 4)
+
+        stream_metrics = {}
+        for stream_name, count in self.stream_read_counts.items():
+            stream_metrics[stream_name] = {
+                "records_read": count,
+                "read_start_time": self.stream_read_start_times.get(stream_name),
+                "read_end_time": self.stream_read_end_times.get(stream_name),
+            }
+            if (
+                stream_name in self.stream_read_end_times
+                and stream_name in self.stream_read_start_times
+                and count > 0
+            ):
+                duration: float = (
+                    self.stream_read_end_times[stream_name]
+                    - self.stream_read_start_times[stream_name]
+                )
+                stream_metrics[stream_name]["read_time_seconds"] = duration
+                if duration > 0:
+                    stream_metrics[stream_name]["records_per_second"] = round(
+                        count / duration, 4
+                    )
+                if self.bytes_tracking_enabled:
+                    mb_read = self.stream_bytes_read[stream_name] / 1_000_000
+                    stream_metrics[stream_name]["mb_read"] = mb_read
+                    if duration > 0:
+                        stream_metrics[stream_name]["mb_per_second"] = round(
+                            mb_read / duration, 4
+                        )
+
+        perf_metrics["stream_metrics"] = stream_metrics
+
+        self._file_logger.info(json.dumps({"read_performance_metrics": perf_metrics}))
+
     @property
     def _unclosed_stream_names(self) -> list[str]:
         """Return a list of streams that have not yet been fully read."""
@@ -416,6 +512,7 @@ def log_success(
         self._print_info_message(
             f"Completed `{self.job_description}` sync at `{pendulum.now().format('HH:mm:ss')}`."
         )
+        self._log_read_metrics()
         send_telemetry(
             source=self._source,
             cache=self._cache,
@@ -663,8 +760,12 @@ def _get_status_message(self) -> str:
         # Format start time as a friendly string in local timezone:
         start_time_str = _to_time_str(self.read_start_time)
         records_per_second: float = 0.0
+        mb_per_second_str = ""
         if self.elapsed_read_seconds > 0:
             records_per_second = self.total_records_read / self.elapsed_read_seconds
+            if self.bytes_tracking_enabled:
+                mb_per_second = self.total_megabytes_read / self.elapsed_read_seconds
+                mb_per_second_str = f", {mb_per_second:,.2f} MB/s"

         status_message = HORIZONTAL_LINE + f"\n### Sync Progress: `{self.job_description}`\n\n"
@@ -680,7 +781,7 @@ def join_streams_strings(streams_list: list[str]) -> str:
             f"**Started reading from source at `{start_time_str}`:**\n\n"
             f"- Read **{self.total_records_read:,}** records "
             f"over **{self.elapsed_read_time_string}** "
-            f"({records_per_second:,.1f} records / second).\n\n"
+            f"({records_per_second:,.1f} records/s{mb_per_second_str}).\n\n"
         )

         if self.stream_read_counts:
@@ -747,7 +848,7 @@ def join_streams_strings(streams_list: list[str]) -> str:
         status_message += (
             f"- Sent **{self.total_destination_records_delivered:,} records** "
             f"to destination over **{self.total_destination_write_time_str}** "
-            f"({self.destination_records_delivered_per_second:,.1f} records per second)."
+            f"({self.destination_records_delivered_per_second:,.1f} records/s)."
             "\n\n"
         )
         status_message += (
diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py
index 48aaece8..fed5a973 100644
--- a/airbyte/sources/base.py
+++ b/airbyte/sources/base.py
@@ -552,6 +552,7 @@ def _read_with_catalog(
                 "--state",
                 state_file,
             ],
+            progress_tracker=progress_tracker,
         )
         yield from progress_tracker.tally_records_read(message_generator)
         progress_tracker.log_read_complete()
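
Editor's note (not part of the patch): below is a minimal, runnable sketch of how the byte tallying introduced here feeds the MB/s figures. `MiniTracker` is a hypothetical stand-in for the real `ProgressTracker`, kept only to the two pieces this patch adds: a per-stream byte tally and a decimal-megabyte total.

    # Toy stand-in for ProgressTracker's byte tallying -- illustration only.
    import time
    from collections import defaultdict


    class MiniTracker:
        def __init__(self) -> None:
            # Per-stream byte counts, as in the patch's stream_bytes_read.
            self.stream_bytes_read: dict[str, int] = defaultdict(int)
            self.read_start_time: float = time.time()

        def tally_bytes_read(self, bytes_read: int, stream_name: str) -> None:
            # Mirrors the patch: called once per serialized record line with len(line).
            self.stream_bytes_read[stream_name] += bytes_read

        @property
        def total_megabytes_read(self) -> float:
            # Decimal megabytes, matching the patch's 1_000_000 divisor.
            return sum(self.stream_bytes_read.values()) / 1_000_000


    tracker = MiniTracker()
    for line in [b'{"type": "RECORD", "record": {"stream": "users"}}'] * 10_000:
        tracker.tally_bytes_read(len(line), stream_name="users")

    elapsed = time.time() - tracker.read_start_time
    if elapsed > 0:  # rates are reported only for a positive elapsed time, as in the patch
        print(
            f"{tracker.total_megabytes_read:,.2f} MB read, "
            f"{tracker.total_megabytes_read / elapsed:,.2f} MB/s"
        )

Note that byte tracking is effectively opt-in: `bytes_tracking_enabled` is derived from whether any bytes were tallied, so callers that never pass a `progress_tracker` into `_execute()` simply see no MB/s figures rather than zeros.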