Skip to content

Commit

Permalink
Feat: Add performance metrics and mb/s tracking (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Sep 2, 2024
1 parent 31b6eeb commit 7488afb
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
8 changes: 8 additions & 0 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from airbyte._executors.base import Executor
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.progress import ProgressTracker


MAX_LOG_LINES = 20
Expand Down Expand Up @@ -352,6 +353,8 @@ def _execute(
self,
args: list[str],
stdin: IO[str] | AirbyteMessageIterator | None = None,
*,
progress_tracker: ProgressTracker | None = None,
) -> Generator[AirbyteMessage, None, None]:
"""Execute the connector with the given arguments.
Expand All @@ -371,6 +374,11 @@ def _execute(
for line in self.executor.execute(args, stdin=stdin):
try:
message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
if progress_tracker and message.record:
progress_tracker.tally_bytes_read(
len(line),
stream_name=message.record.stream,
)
self._peek_airbyte_message(message)
yield message

Expand Down
105 changes: 103 additions & 2 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import datetime
import importlib
import json
import math
import os
import sys
Expand Down Expand Up @@ -207,6 +208,7 @@ def __init__(
self.stream_read_counts: dict[str, int] = defaultdict(int)
self.stream_read_start_times: dict[str, float] = {}
self.stream_read_end_times: dict[str, float] = {}
self.stream_bytes_read: dict[str, int] = defaultdict(int)

# Cache Writes
self.total_records_written = 0
Expand Down Expand Up @@ -238,6 +240,27 @@ def _print_info_message(
if self._file_logger:
self._file_logger.info(message)

@property
def bytes_tracking_enabled(self) -> bool:
    """Whether byte counts are being tallied for this job.

    True as soon as at least one stream has reported bytes read.
    """
    return len(self.stream_bytes_read) > 0

@property
def total_bytes_read(self) -> int:
    """Return the total number of bytes read across all streams.

    Returns 0 (not None) when bytes are not being tracked, since the
    per-stream byte tally mapping is simply empty in that case.
    """
    return sum(self.stream_bytes_read.values())

@property
def total_megabytes_read(self) -> float:
    """Return the total number of megabytes read (decimal: 1 MB = 1,000,000 bytes).

    Returns 0.0 (not None) when no bytes have been read — generally
    because bytes are not being tracked for this job.
    """
    return self.total_bytes_read / 1_000_000

def tally_records_read(
self,
messages: Iterable[AirbyteMessage],
Expand Down Expand Up @@ -351,6 +374,13 @@ def tally_confirmed_writes(

self._update_display(force_refresh=True)

def tally_bytes_read(self, bytes_read: int, stream_name: str) -> None:
    """Add `bytes_read` to the running byte total for `stream_name`.

    Unlike the other tally methods, this method does not yield messages.
    """
    current = self.stream_bytes_read.get(stream_name, 0)
    self.stream_bytes_read[stream_name] = current + bytes_read

# Logging methods

@property
Expand Down Expand Up @@ -393,6 +423,72 @@ def _log_stream_read_end(self, stream_name: str) -> None:
)
self.stream_read_end_times[stream_name] = time.time()

def _log_read_metrics(self) -> None:
"""Log read performance metrics."""
# Source performance metrics
if not self.total_records_read or not self._file_logger:
return

perf_metrics: dict[str, Any] = {
"job_description": {
"description": self.job_description,
}
}
if self._source:
perf_metrics["job_description"]["source"] = self._source.name
if self._cache:
perf_metrics["job_description"]["cache"] = type(self._cache).__name__
if self._destination:
perf_metrics["job_description"]["destination"] = self._destination.name

perf_metrics["records_read"] = self.total_records_read
perf_metrics["read_time_seconds"] = self.elapsed_read_seconds
perf_metrics["read_start_time"] = self.read_start_time
perf_metrics["read_end_time"] = self.read_end_time
if self.elapsed_read_seconds > 0:
perf_metrics["records_per_second"] = round(
self.total_records_read / self.elapsed_read_seconds, 4
)
if self.bytes_tracking_enabled:
mb_read = self.total_megabytes_read
perf_metrics["mb_read"] = mb_read
perf_metrics["mb_per_second"] = round(mb_read / self.elapsed_read_seconds, 4)

stream_metrics = {}
for stream_name, count in self.stream_read_counts.items():
stream_metrics[stream_name] = {
"records_read": count,
"read_start_time": self.stream_read_start_times.get(stream_name),
"read_end_time": self.stream_read_end_times.get(stream_name),
}
if (
stream_name in self.stream_read_end_times
and stream_name in self.stream_read_start_times
and count > 0
):
duration: float = (
self.stream_read_end_times[stream_name]
- self.stream_read_start_times[stream_name]
)
stream_metrics[stream_name]["read_time_seconds"] = duration
if duration > 0:
stream_metrics[stream_name]["records_per_second"] = round(
count
/ (
self.stream_read_end_times[stream_name]
- self.stream_read_start_times[stream_name]
),
4,
)
if self.bytes_tracking_enabled:
mb_read = self.stream_bytes_read[stream_name] / 1_000_000
stream_metrics[stream_name]["mb_read"] = mb_read
stream_metrics[stream_name]["mb_per_second"] = round(mb_read / duration, 4)

perf_metrics["stream_metrics"] = stream_metrics

self._file_logger.info(json.dumps({"read_performance_metrics": perf_metrics}))

@property
def _unclosed_stream_names(self) -> list[str]:
"""Return a list of streams that have not yet been fully read."""
Expand All @@ -416,6 +512,7 @@ def log_success(
self._print_info_message(
f"Completed `{self.job_description}` sync at `{pendulum.now().format('HH:mm:ss')}`."
)
self._log_read_metrics()
send_telemetry(
source=self._source,
cache=self._cache,
Expand Down Expand Up @@ -663,8 +760,12 @@ def _get_status_message(self) -> str:
# Format start time as a friendly string in local timezone:
start_time_str = _to_time_str(self.read_start_time)
records_per_second: float = 0.0
mb_per_second_str = ""
if self.elapsed_read_seconds > 0:
records_per_second = self.total_records_read / self.elapsed_read_seconds
if self.bytes_tracking_enabled:
mb_per_second = self.total_megabytes_read / self.elapsed_read_seconds
mb_per_second_str = f", {mb_per_second:,.2f} MB/s"

status_message = HORIZONTAL_LINE + f"\n### Sync Progress: `{self.job_description}`\n\n"

Expand All @@ -680,7 +781,7 @@ def join_streams_strings(streams_list: list[str]) -> str:
f"**Started reading from source at `{start_time_str}`:**\n\n"
f"- Read **{self.total_records_read:,}** records "
f"over **{self.elapsed_read_time_string}** "
f"({records_per_second:,.1f} records / second).\n\n"
f"({records_per_second:,.1f} records/s{mb_per_second_str}).\n\n"
)

if self.stream_read_counts:
Expand Down Expand Up @@ -747,7 +848,7 @@ def join_streams_strings(streams_list: list[str]) -> str:
status_message += (
f"- Sent **{self.total_destination_records_delivered:,} records** "
f"to destination over **{self.total_destination_write_time_str}** "
f"({self.destination_records_delivered_per_second:,.1f} records per second)."
f"({self.destination_records_delivered_per_second:,.1f} records/s)."
"\n\n"
)
status_message += (
Expand Down
1 change: 1 addition & 0 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ def _read_with_catalog(
"--state",
state_file,
],
progress_tracker=progress_tracker,
)
yield from progress_tracker.tally_records_read(message_generator)
progress_tracker.log_read_complete()
Expand Down

0 comments on commit 7488afb

Please sign in to comment.