change float ms to int ns for timestamps
daniel-sanche committed Feb 16, 2024
1 parent e1fe3ab commit 6d2e9f4
Showing 2 changed files with 119 additions and 150 deletions.
108 changes: 44 additions & 64 deletions google/cloud/bigtable/data/_metrics/data_model.py
@@ -48,21 +48,6 @@

INVALID_STATE_ERROR = "Invalid state for {}: {}"


@dataclass(frozen=True)
class TimeTuple:
"""
Tuple that holds both the utc timestamp to record with the metrics, and a
monotonic timestamp for calculating durations. The monotonic timestamp is
preferred for calculations because it is resilient to clock changes, eg DST
"""

utc: datetime.datetime = field(
default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
)
monotonic: float = field(default_factory=time.monotonic)
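# --- Illustrative sketch (editor's addition, not part of this diff) ---
# The removed TimeTuple paired a wall-clock datetime with a float monotonic
# timestamp; this commit replaces it with a single integer from
# time.monotonic_ns(), which is unaffected by wall-clock adjustments and has
# no float rounding at nanosecond resolution. A minimal example of the pattern:
import time

_sketch_start_ns = time.monotonic_ns()             # int nanoseconds, monotonic clock
_sketch_duration_ns = time.monotonic_ns() - _sketch_start_ns
_sketch_duration_ms = _sketch_duration_ns / 10**6  # convert only at export time
# --- end sketch ---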


class OperationType(Enum):
"""Enum for the type of operation being performed."""

@@ -93,14 +78,13 @@ class CompletedAttemptMetric:
corresponding CompletedOperationMetric or ActiveOperationMetric object.
"""

start_time: datetime.datetime
duration_ms: float
duration_ns: int
end_status: StatusCode
first_response_latency_ms: float | None = None
gfe_latency_ms: float | None = None
application_blocking_time_ms: float = 0.0
backoff_before_attempt_ms: float = 0.0
grpc_throttling_time_ms: float = 0.0
first_response_latency_ns: int | None = None
gfe_latency_ns: int | None = None
application_blocking_time_ns: int = 0
backoff_before_attempt_ns: int = 0
grpc_throttling_time_ns: int = 0


@dataclass(frozen=True)
@@ -114,14 +98,13 @@ class CompletedOperationMetric:
"""

op_type: OperationType
start_time: datetime.datetime
duration_ms: float
duration_ns: int
completed_attempts: list[CompletedAttemptMetric]
final_status: StatusCode
cluster_id: str
zone: str
is_streaming: bool
flow_throttling_time_ms: float = 0.0
flow_throttling_time_ns: int = 0


@dataclass
@@ -131,21 +114,21 @@ class ActiveAttemptMetric:
currently in progress. Fields are mutable and may be optional.
"""

# keep both clock time and monotonic timestamps for active attempts
start_time: TimeTuple = field(default_factory=TimeTuple)
# the time it takes to receive the first response from the server, in milliseconds
# keep monotonic timestamps for active attempts
start_time_ns: int = field(default_factory=time.monotonic_ns)
# the time it takes to receive the first response from the server, in nanoseconds
# currently only tracked for ReadRows
first_response_latency_ms: float | None = None
# the time taken by the backend, in milliseconds. Taken from response header
gfe_latency_ms: float | None = None
# time waiting on user to process the response, in milliseconds
first_response_latency_ns: int | None = None
# the time taken by the backend, in nanoseconds. Taken from response header
gfe_latency_ns: int | None = None
# time waiting on user to process the response, in nanoseconds
# currently only relevant for ReadRows
application_blocking_time_ms: float = 0.0
# backoff time is added to application_blocking_time_ms
backoff_before_attempt_ms: float = 0.0
# time waiting on grpc channel, in milliseconds
application_blocking_time_ns: int = 0
# backoff time is added to application_blocking_time_ns
backoff_before_attempt_ns: int = 0
# time waiting on grpc channel, in nanoseconds
# TODO: capture grpc_throttling_time
grpc_throttling_time_ms: float = 0.0
grpc_throttling_time_ns: int = 0
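# --- Illustrative sketch (editor's addition, not part of this diff) ---
# One way a caller could accumulate application_blocking_time_ns around user
# processing of streamed rows; the generator and names here are hypothetical:
import time

def _sketch_yield_rows(rows, attempt):
    for row in rows:
        _blocked_since_ns = time.monotonic_ns()
        yield row  # control returns to user code here
        attempt.application_blocking_time_ns += (
            time.monotonic_ns() - _blocked_since_ns
        )
# --- end sketch ---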


@dataclass
@@ -157,17 +140,17 @@ class ActiveOperationMetric:

op_type: OperationType
backoff_generator: Generator[float, int, None] | None = None
# keep both clock time and monotonic timestamps for active operations
start_time: TimeTuple = field(default_factory=TimeTuple)
# keep monotonic timestamps for active operations
start_time_ns: int = field(default_factory=time.monotonic_ns)
active_attempt: ActiveAttemptMetric | None = None
cluster_id: str | None = None
zone: str | None = None
completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
is_streaming: bool = False # only True for read_rows operations
was_completed: bool = False
handlers: list[MetricsHandler] = field(default_factory=list)
# time waiting on flow control, in milliseconds
flow_throttling_time_ms: float = 0.0
# time waiting on flow control, in nanoseconds
flow_throttling_time_ns: int = 0

@property
def state(self) -> OperationState:
@@ -190,7 +173,7 @@ def start(self) -> None:
"""
if self.state != OperationState.CREATED:
return self._handle_error(INVALID_STATE_ERROR.format("start", self.state))
self.start_time = TimeTuple()
self.start_time_ns = time.monotonic_ns()

def start_attempt(self) -> None:
"""
@@ -209,14 +192,14 @@ def start_attempt(self) -> None:
# find backoff value
if self.backoff_generator and len(self.completed_attempts) > 0:
# find the attempt's backoff by sending attempt number to generator
# generator will return the backoff time in seconds, so convert to ms
backoff_ms = (
self.backoff_generator.send(len(self.completed_attempts) - 1) * 1000
# generator will return the backoff time in seconds, so convert to nanoseconds
backoff_ns = int(
self.backoff_generator.send(len(self.completed_attempts) - 1) * 1e9
)
else:
backoff_ms = 0
backoff_ns = 0

self.active_attempt = ActiveAttemptMetric(backoff_before_attempt_ms=backoff_ms)
self.active_attempt = ActiveAttemptMetric(backoff_before_attempt_ns=backoff_ns)
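# --- Illustrative sketch (editor's addition, not part of this diff) ---
# backoff_generator yields float seconds and receives the completed-attempt
# index via send(); start_attempt() converts the result to integer
# nanoseconds. A hypothetical generator and the same conversion:
import random

def _sketch_exponential_backoff(base=0.01, multiplier=2.0):
    attempt = yield 0.0  # primed with next() before first use
    while True:
        attempt = yield base * (multiplier ** attempt) * random.random()

_sketch_gen = _sketch_exponential_backoff()
next(_sketch_gen)                                  # prime the generator
_sketch_backoff_s = _sketch_gen.send(0)            # index of first completed attempt
_sketch_backoff_ns = int(_sketch_backoff_s * 1e9)  # seconds -> nanoseconds
# --- end sketch ---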

def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
"""
@@ -253,7 +236,8 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
if timing_header:
timing_data = SERVER_TIMING_REGEX.match(timing_header)
if timing_data and self.active_attempt:
self.active_attempt.gfe_latency_ms = float(timing_data.group(1))
gfe_latency_ms = float(timing_data.group(1))
self.active_attempt.gfe_latency_ns = int(gfe_latency_ms * 1e6)
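# --- Illustrative sketch (editor's addition, not part of this diff) ---
# The server-timing header reports GFE latency in milliseconds; the code above
# converts it to integer nanoseconds with a factor of 10**6. The header value
# and regex below are hypothetical stand-ins for SERVER_TIMING_REGEX:
import re

_sketch_header = "gfe; dur=123.4"  # hypothetical server-timing value
_sketch_match = re.search(r"dur=(\d+(?:\.\d+)?)", _sketch_header)
if _sketch_match:
    _sketch_latency_ms = float(_sketch_match.group(1))   # 123.4 ms
    _sketch_latency_ns = int(_sketch_latency_ms * 1e6)   # 123_400_000 ns
# --- end sketch ---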

@staticmethod
@lru_cache(maxsize=32)
@@ -286,12 +270,11 @@ def attempt_first_response(self) -> None:
return self._handle_error(
INVALID_STATE_ERROR.format("attempt_first_response", self.state)
)
if self.active_attempt.first_response_latency_ms is not None:
if self.active_attempt.first_response_latency_ns is not None:
return self._handle_error("Attempt already received first response")
# convert duration to milliseconds
self.active_attempt.first_response_latency_ms = (
time.monotonic() - self.active_attempt.start_time.monotonic
) * 1000
self.active_attempt.first_response_latency_ns = (
time.monotonic_ns() - self.active_attempt.start_time_ns
)

def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
"""
@@ -312,16 +295,14 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
)
if isinstance(status, Exception):
status = self._exc_to_status(status)
duration_seconds = time.monotonic() - self.active_attempt.start_time.monotonic
new_attempt = CompletedAttemptMetric(
start_time=self.active_attempt.start_time.utc,
first_response_latency_ms=self.active_attempt.first_response_latency_ms,
duration_ms=duration_seconds * 1000,
first_response_latency_ns=self.active_attempt.first_response_latency_ns,
duration_ns=time.monotonic_ns() - self.active_attempt.start_time_ns,
end_status=status,
gfe_latency_ms=self.active_attempt.gfe_latency_ms,
application_blocking_time_ms=self.active_attempt.application_blocking_time_ms,
backoff_before_attempt_ms=self.active_attempt.backoff_before_attempt_ms,
grpc_throttling_time_ms=self.active_attempt.grpc_throttling_time_ms,
gfe_latency_ns=self.active_attempt.gfe_latency_ns,
application_blocking_time_ns=self.active_attempt.application_blocking_time_ns,
backoff_before_attempt_ns=self.active_attempt.backoff_before_attempt_ns,
grpc_throttling_time_ns=self.active_attempt.grpc_throttling_time_ns,
)
self.completed_attempts.append(new_attempt)
self.active_attempt = None
@@ -352,14 +333,13 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
self.was_completed = True
finalized = CompletedOperationMetric(
op_type=self.op_type,
start_time=self.start_time.utc,
completed_attempts=self.completed_attempts,
duration_ms=(time.monotonic() - self.start_time.monotonic) * 1000,
duration_ns=time.monotonic_ns() - self.start_time_ns,
final_status=final_status,
cluster_id=self.cluster_id or DEFAULT_CLUSTER_ID,
zone=self.zone or DEFAULT_ZONE,
is_streaming=self.is_streaming,
flow_throttling_time_ms=self.flow_throttling_time_ms,
flow_throttling_time_ns=self.flow_throttling_time_ns,
)
for handler in self.handlers:
handler.on_operation_complete(finalized)
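# --- Illustrative sketch (editor's addition, not part of this diff) ---
# With durations stored as integer nanoseconds, a handler that exports metrics
# in milliseconds converts at the boundary. The handler below is a
# hypothetical example, not an API defined in this diff:
class _SketchPrintHandler:
    def on_operation_complete(self, op) -> None:
        duration_ms = op.duration_ns / 10**6
        print(f"{op.op_type.name}: {duration_ms:.3f} ms, status={op.final_status.name}")
# --- end sketch ---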
