Skip to content

Commit

Permalink
Merge branch 'feat/grace_attempts' into feat/grpc-sync-addition
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Dec 6, 2024
2 parents c05b07d + 518572a commit 6b21f96
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import os
import typing
from enum import Enum
Expand All @@ -19,10 +20,13 @@ class CacheType(Enum):
DEFAULT_HOST = "localhost"
DEFAULT_KEEP_ALIVE = 0
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
DEFAULT_OFFLINE_POLL_MS = 5000
DEFAULT_PORT_IN_PROCESS = 8015
DEFAULT_PORT_RPC = 8013
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
DEFAULT_RETRY_BACKOFF = 1000
DEFAULT_RETRY_BACKOFF_MAX = 120000
DEFAULT_RETRY_GRACE_ATTEMPTS = 5
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False

Expand All @@ -32,9 +36,12 @@ class CacheType(Enum):
ENV_VAR_HOST = "FLAGD_HOST"
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS"
ENV_VAR_PORT = "FLAGD_PORT"
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
ENV_VAR_SELECTOR = "FLAGD_SELECTOR"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"
Expand Down Expand Up @@ -63,6 +70,7 @@ def env_or_default(
return val if cast is None else cast(val)


@dataclasses.dataclass
class Config:
def __init__( # noqa: PLR0913
self,
Expand All @@ -72,7 +80,10 @@ def __init__( # noqa: PLR0913
selector: typing.Optional[str] = None,
resolver: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_ms: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_attempts: typing.Optional[int] = None,
deadline_ms: typing.Optional[int] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
Expand All @@ -96,6 +107,25 @@ def __init__( # noqa: PLR0913
if retry_backoff_ms is None
else retry_backoff_ms
)
self.retry_backoff_max_ms: int = (
int(
env_or_default(
ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int
)
)
if retry_backoff_max_ms is None
else retry_backoff_max_ms
)

self.retry_grace_attempts: int = (
int(
env_or_default(
ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
)
)
if retry_grace_attempts is None
else retry_grace_attempts
)

self.resolver = (
env_or_default(
Expand Down Expand Up @@ -125,6 +155,16 @@ def __init__( # noqa: PLR0913
else offline_flag_source_path
)

self.offline_poll_interval_ms: int = (
int(
env_or_default(
ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int
)
)
if offline_poll_interval_ms is None
else offline_poll_interval_ms
)

self.deadline_ms: int = (
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
if deadline_ms is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__( # noqa: PLR0913
keep_alive_time: typing.Optional[int] = None,
cache_type: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_attempts: typing.Optional[int] = None,
):
"""
Create an instance of the FlagdProvider
Expand Down Expand Up @@ -83,6 +85,8 @@ def __init__( # noqa: PLR0913
tls=tls,
deadline_ms=deadline,
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_attempts=retry_grace_attempts,
selector=selector,
resolver=resolver_type,
offline_flag_source_path=offline_flag_source_path,
Expand All @@ -100,6 +104,7 @@ def setup_resolver(self) -> AbstractResolver:
self.config,
self.emit_provider_ready,
self.emit_provider_error,
self.emit_provider_stale,
self.emit_provider_configuration_changed,
)
elif self.config.resolver == ResolverType.IN_PROCESS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from ..config import CacheType, Config
from ..flag_type import FlagType
from .protocol import AbstractResolver

if typing.TYPE_CHECKING:
from google.protobuf.message import Message
Expand All @@ -37,23 +36,21 @@
logger = logging.getLogger("openfeature.contrib")


class GrpcResolver(AbstractResolver):
MAX_BACK_OFF = 120

MAX_BACK_OFF = 120

class GrpcResolver:
def __init__(
self,
config: Config,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
emit_provider_configuration_changed: typing.Callable[
[ProviderEventDetails], None
],
):
self.config = config
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
self.emit_provider_stale = emit_provider_stale
self.emit_provider_configuration_changed = emit_provider_configuration_changed
self.cache: typing.Optional[BaseCacheImpl] = (
LRUCache(maxsize=self.config.max_cache_size)
Expand All @@ -62,6 +59,8 @@ def __init__(
)
self.stub, self.channel = self._create_stub()
# Convert the millisecond retry settings from the config into seconds.
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
# The cap must be derived from retry_backoff_max_ms: reading retry_backoff_ms
# here would make the cap equal to the initial delay, so the backoff applied
# via min(1.1 * retry_delay, self.retry_backoff_max_seconds) could never grow.
self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
self.retry_grace_attempts = config.retry_grace_attempts
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.connected = False
Expand All @@ -77,9 +76,6 @@ def _create_stub(
)
stub = evaluation_pb2_grpc.ServiceStub(channel)

if self.cache:
self.cache.clear()

return stub, channel

def initialize(self, evaluation_context: EvaluationContext) -> None:
Expand Down Expand Up @@ -116,8 +112,10 @@ def listen(self) -> None:
if self.streamline_deadline_seconds > 0
else {}
)
retry_counter = 0
while self.active:
request = evaluation_pb2.EventStreamRequest()

try:
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
Expand All @@ -129,6 +127,7 @@ def listen(self) -> None:
)
)
self.connected = True
retry_counter = 0
# reset retry delay after successful read
retry_delay = self.retry_backoff_seconds

Expand All @@ -149,15 +148,37 @@ def listen(self) -> None:
)

self.connected = False
self.handle_error(retry_counter, retry_delay)

retry_delay = self.handle_retry(retry_counter, retry_delay)

retry_counter = retry_counter + 1

# Log the reconnect attempt, wait if needed, and return the next retry delay.
#
# retry_counter: value compared against 0 to pick the "reconnecting
#   immediately" log message over the "reconnecting in {retry_delay}s" one.
# retry_delay: delay in seconds passed to time.sleep and used as the base for
#   the next delay.
# Returns: the retry delay after the 1.1x growth step, capped at
#   self.retry_backoff_max_seconds.
#
# NOTE(review): indentation is lost in this view, so it cannot be confirmed
# whether time.sleep and the 1.1x growth run only in the else branch or on
# every call -- verify against the real file.
def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
if retry_counter == 0:
logger.info("gRPC sync disconnected, reconnecting immediately")
else:
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
time.sleep(retry_delay)
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
return retry_delay

# Emit provider events that reflect how long the stream has been disconnected.
#
# retry_counter: compared against self.retry_grace_attempts and against 1 to
#   decide which event, if any, is emitted.
# retry_delay: only interpolated into the event/log messages here.
#
# When retry_counter equals self.retry_grace_attempts, the cache is cleared (if
# one is set) and emit_provider_error is called with ErrorCode.GENERAL.
# When retry_counter equals 1, emit_provider_stale is called instead.
#
# NOTE(review): the logger.info / time.sleep / min(..., self.MAX_BACK_OFF)
# lines between the error branch and the elif look like removed-side lines of
# the diff this text was taken from; presumably they are not part of the new
# method body -- confirm against the real file.
# NOTE(review): indentation is lost in this view, so the nesting described
# above is inferred from statement order -- verify.
def handle_error(self, retry_counter: int, retry_delay: float) -> None:
if retry_counter == self.retry_grace_attempts:
if self.cache:
self.cache.clear()
self.emit_provider_error(
ProviderEventDetails(
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
error_code=ErrorCode.GENERAL,
)
)
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
time.sleep(retry_delay)
retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF)
elif retry_counter == 1:
self.emit_provider_stale(
ProviderEventDetails(
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
)
)

def handle_changed_flags(self, data: typing.Any) -> None:
changed_flags = list(data["flags"].keys())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError
from openfeature.schemas.protobuf.flagd.sync.v1 import ( # type:ignore[import-not-found]
from openfeature.schemas.protobuf.flagd.sync.v1 import (
sync_pb2,
sync_pb2_grpc,
)
Expand All @@ -22,8 +22,6 @@


class GrpcWatcher(FlagStateConnector):
MAX_BACK_OFF = 120

def __init__(
self,
config: Config,
Expand All @@ -36,6 +34,8 @@ def __init__(

self.stub, self.channel = self.create_stub()
# Convert the millisecond retry settings from the config into seconds.
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
# The cap must be derived from retry_backoff_max_ms: reading retry_backoff_ms
# here would make the cap equal to the initial delay, so the backoff applied
# via min(1.1 * retry_delay, self.retry_backoff_max_seconds) could never grow.
self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
self.retry_grace_attempts = config.retry_grace_attempts
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.selector = config.selector
Expand Down Expand Up @@ -85,9 +85,12 @@ def sync_flags(self) -> None:
if self.streamline_deadline_seconds > 0
else {}
)

while self.active:
try:
request = sync_pb2.SyncFlagsRequest(selector=self.selector)
request = sync_pb2.SyncFlagsRequest(
selector=(self.selector if self.selector else None)
)
logger.debug("Setting up gRPC sync flags connection")
for flag_rsp in self.stub.SyncFlags(request, **call_args):
flag_str = flag_rsp.flag_configuration
Expand Down Expand Up @@ -130,4 +133,4 @@ def sync_flags(self) -> None:
)
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
time.sleep(retry_delay)
retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF)
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
2 changes: 1 addition & 1 deletion providers/openfeature-provider-flagd/tests/e2e/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def assert_handlers(
)
)
def assert_handler_run(event_type: ProviderEvent, event_handles):
assert_handlers(event_handles, event_type, max_wait=6)
assert_handlers(event_handles, event_type, max_wait=30)


@then(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ def image():

scenarios(
f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature",
f"{TEST_HARNESS_PATH}/gherkin/events.feature",
)

0 comments on commit 6b21f96

Please sign in to comment.