Skip to content

Commit

Permalink
added tests for state machine, metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Nov 21, 2023
1 parent 5509c5e commit 7900f27
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 88 deletions.
88 changes: 50 additions & 38 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@

SERVER_TIMING_REGEX = re.compile(r"gfet4t7; dur=(\d+)")

OPERATION_COMPLETE_ERROR = "Operation already completed"
NO_ATTEMPT_ERROR = "No active attempt"
HAS_ATTEMPT_ERROR = "Attempt already in progress"
INVALID_STATE_ERROR = "Invalid state for {}: {}"


class OperationType(Enum):
Expand All @@ -57,6 +55,14 @@ class OperationType(Enum):
READ_MODIFY_WRITE = "Bigtable.ReadModifyWriteRow"


class OperationState(Enum):
"""Enum for the state of the active operation."""
CREATED = 0
ACTIVE_ATTEMPT = 1
BETWEEN_ATTEMPTS = 2
COMPLETED = 3


@dataclass(frozen=True)
class CompletedAttemptMetric:
"""
Expand Down Expand Up @@ -90,7 +96,10 @@ class CompletedOperationMetric:
@dataclass
class ActiveAttemptMetric:
start_time: float = field(default_factory=time.monotonic)
# the time it takes to recieve the first response from the server
# currently only tracked for ReadRows
first_response_latency: float | None = None
# the time taken by the backend. Taken from response header
gfe_latency: float | None = None


Expand All @@ -108,9 +117,21 @@ class ActiveOperationMetric:
cluster_id: str | None = None
zone: str | None = None
completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
was_completed: bool = False
handlers: list[MetricsHandler] = field(default_factory=list)
is_streaming: bool = False # only True for read_rows operations
was_completed: bool = False

@property
def state(self) -> OperationState:
if self.was_completed:
return OperationState.COMPLETED
elif self.active_attempt is None:
if self.completed_attempts:
return OperationState.BETWEEN_ATTEMPTS
else:
return OperationState.CREATED
else:
return OperationState.ACTIVE_ATTEMPT

def start(self) -> None:
"""
Expand All @@ -120,10 +141,8 @@ def start(self) -> None:
If the operation was completed or has active attempts, will raise an
exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.was_completed:
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.completed_attempts or self.active_attempt:
return self._handle_error(HAS_ATTEMPT_ERROR)
if self.state != OperationState.CREATED:
return self._handle_error(INVALID_STATE_ERROR.format("start", self.state))
self.start_time = time.monotonic()

def start_attempt(self) -> None:
Expand All @@ -133,10 +152,8 @@ def start_attempt(self) -> None:
If the operation was completed or there is already an active attempt,
will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.was_completed:
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is not None:
return self._handle_error(HAS_ATTEMPT_ERROR)
if self.state != OperationState.BETWEEN_ATTEMPTS and self.state != OperationState.CREATED:
return self._handle_error(INVALID_STATE_ERROR.format("start_attempt", self.state))

self.active_attempt = ActiveAttemptMetric()

Expand All @@ -152,25 +169,24 @@ def add_call_metadata(self, metadata: dict[str, bytes | str]) -> None:
Args:
- metadata: the metadata as extracted from the grpc call
"""
if self.was_completed:
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is None:
return self._handle_error(NO_ATTEMPT_ERROR)
if self.state != OperationState.ACTIVE_ATTEMPT:
return self._handle_error(INVALID_STATE_ERROR.format("add_call_metadata", self.state))

if self.cluster_id is None or self.zone is None:
bigtable_metadata = metadata.get(BIGTABLE_METADATA_KEY)
if bigtable_metadata and isinstance(bigtable_metadata, bytes):
decoded = "".join(
c if c.isprintable() else " "
for c in bigtable_metadata.decode("utf-8")
)
cluster_id, zone = decoded.split()
if cluster_id:
self.cluster_id = cluster_id
if zone:
self.zone = zone
if bigtable_metadata:
try:
decoded = "".join(
c if c.isprintable() else " "
for c in bigtable_metadata.decode("utf-8")
)
split_data = decoded.split()
self.cluster_id = split_data[0]
self.zone = split_data[1]
except (AttributeError, IndexError):
self._handle_error(f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {bigtable_metadata}")
timing_header = metadata.get(SERVER_TIMING_METADATA_KEY)
if timing_header and isinstance(timing_header, str):
if timing_header:
timing_data = SERVER_TIMING_REGEX.match(timing_header)
if timing_data:
# convert from milliseconds to seconds
Expand All @@ -185,10 +201,8 @@ def attempt_first_response(self) -> None:
active attempt already has a first response time, will raise an
exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.was_completed:
return self._handle_error(OPERATION_COMPLETE_ERROR)
elif self.active_attempt is None:
return self._handle_error(NO_ATTEMPT_ERROR)
if self.state != OperationState.ACTIVE_ATTEMPT:
return self._handle_error(INVALID_STATE_ERROR.format("attempt_first_response", self.state))
elif self.active_attempt.first_response_latency is not None:
return self._handle_error("Attempt already received first response")
self.active_attempt.first_response_latency = (
Expand All @@ -205,10 +219,8 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
Args:
- status: The status of the attempt.
"""
if self.was_completed:
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is None:
return self._handle_error(NO_ATTEMPT_ERROR)
if self.state != OperationState.ACTIVE_ATTEMPT:
return self._handle_error(INVALID_STATE_ERROR.format("end_attempt_with_status", self.state))

new_attempt = CompletedAttemptMetric(
start_time=self.active_attempt.start_time,
Expand All @@ -235,9 +247,9 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
Args:
- status: The status of the operation.
"""
if self.was_completed:
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is not None:
if self.state == OperationState.COMPLETED:
return self._handle_error(INVALID_STATE_ERROR.format("end_with_status", self.state))
elif self.state == OperationState.ACTIVE_ATTEMPT:
self.end_attempt_with_status(status)
self.was_completed = True
finalized = CompletedOperationMetric(
Expand Down
Loading

0 comments on commit 7900f27

Please sign in to comment.