diff --git a/providers/openfeature-provider-flagd/openfeature/test-harness b/providers/openfeature-provider-flagd/openfeature/test-harness index 536d4845..e132d258 160000 --- a/providers/openfeature-provider-flagd/openfeature/test-harness +++ b/providers/openfeature-provider-flagd/openfeature/test-harness @@ -1 +1 @@ -Subproject commit 536d4845c0fa4255e3e98b7ee382d0eb73f7b4c0 +Subproject commit e132d25822eaad367f81cf2a06b422edac32a76d diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index 1bb73ece..bcd4da85 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -1,3 +1,4 @@ +import dataclasses import os import typing from enum import Enum @@ -19,10 +20,13 @@ class CacheType(Enum): DEFAULT_HOST = "localhost" DEFAULT_KEEP_ALIVE = 0 DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None +DEFAULT_OFFLINE_POLL_MS = 5000 DEFAULT_PORT_IN_PROCESS = 8015 DEFAULT_PORT_RPC = 8013 DEFAULT_RESOLVER_TYPE = ResolverType.RPC DEFAULT_RETRY_BACKOFF = 1000 +DEFAULT_RETRY_BACKOFF_MAX = 120000 +DEFAULT_RETRY_GRACE_ATTEMPTS = 5 DEFAULT_STREAM_DEADLINE = 600000 DEFAULT_TLS = False @@ -32,9 +36,12 @@ class CacheType(Enum): ENV_VAR_HOST = "FLAGD_HOST" ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS" ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" +ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS" ENV_VAR_PORT = "FLAGD_PORT" ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER" ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" +ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS" +ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" @@ -62,19 +69,23 @@ def env_or_default( return val if cast is None else cast(val) +@dataclasses.dataclass class Config: def __init__( # noqa: PLR0913 self, host: typing.Optional[str] = None, port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, - resolver_type: typing.Optional[ResolverType] = None, + resolver: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, + offline_poll_interval_ms: typing.Optional[int] = None, retry_backoff_ms: typing.Optional[int] = None, - deadline: typing.Optional[int] = None, + retry_backoff_max_ms: typing.Optional[int] = None, + retry_grace_attempts: typing.Optional[int] = None, + deadline_ms: typing.Optional[int] = None, stream_deadline_ms: typing.Optional[int] = None, - keep_alive: typing.Optional[int] = None, - cache_type: typing.Optional[CacheType] = None, + keep_alive_time: typing.Optional[int] = None, + cache: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, ): self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host @@ -94,18 +105,37 @@ def __init__( # noqa: PLR0913 if retry_backoff_ms is None else retry_backoff_ms ) + self.retry_backoff_max_ms: int = ( + int( + env_or_default( + ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int + ) + ) + if retry_backoff_max_ms is None + else retry_backoff_max_ms + ) - self.resolver_type = ( + self.retry_grace_attempts: int = ( + int( + env_or_default( + ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int + ) + ) + if retry_grace_attempts is None + else retry_grace_attempts + ) + + self.resolver = ( env_or_default( ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE, cast=convert_resolver_type ) - if resolver_type is None - else resolver_type + if resolver is None + else resolver ) default_port = ( DEFAULT_PORT_RPC - if self.resolver_type is ResolverType.RPC + if self.resolver is ResolverType.RPC else DEFAULT_PORT_IN_PROCESS ) @@ -123,10 +153,20 @@ def __init__( # noqa: PLR0913 else offline_flag_source_path ) - self.deadline: int = ( + self.offline_poll_interval_ms: int = ( + int( + env_or_default( + ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int + ) + ) + if offline_poll_interval_ms is None + else offline_poll_interval_ms + ) + + self.deadline_ms: int = ( int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int)) - if deadline is None - else deadline + if deadline_ms is None + else deadline_ms ) self.stream_deadline_ms: int = ( @@ -139,18 +179,18 @@ def __init__( # noqa: PLR0913 else stream_deadline_ms ) - self.keep_alive: int = ( + self.keep_alive_time: int = ( int( env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int) ) - if keep_alive is None - else keep_alive + if keep_alive_time is None + else keep_alive_time ) - self.cache_type = ( + self.cache = ( CacheType(env_or_default(ENV_VAR_CACHE_TYPE, DEFAULT_CACHE)) - if cache_type is None - else cache_type + if cache is None + else cache ) self.max_cache_size: int = ( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 35fe2059..07e148e1 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -52,6 +52,8 @@ def __init__( # noqa: PLR0913 keep_alive_time: typing.Optional[int] = None, cache_type: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, + retry_backoff_max_ms: typing.Optional[int] = None, + retry_grace_attempts: typing.Optional[int] = None, ): """ Create an instance of the FlagdProvider @@ -79,31 +81,34 @@ def __init__( # noqa: PLR0913 host=host, port=port, tls=tls, - deadline=deadline, + deadline_ms=deadline, retry_backoff_ms=retry_backoff_ms, - resolver_type=resolver_type, + retry_backoff_max_ms=retry_backoff_max_ms, + retry_grace_attempts=retry_grace_attempts, + resolver=resolver_type, offline_flag_source_path=offline_flag_source_path, stream_deadline_ms=stream_deadline_ms, - keep_alive=keep_alive_time, - cache_type=cache_type, + keep_alive_time=keep_alive_time, + cache=cache_type, max_cache_size=max_cache_size, ) self.resolver = self.setup_resolver() def setup_resolver(self) -> AbstractResolver: - if self.config.resolver_type == ResolverType.RPC: + if self.config.resolver == ResolverType.RPC: return GrpcResolver( self.config, self.emit_provider_ready, self.emit_provider_error, + self.emit_provider_stale, self.emit_provider_configuration_changed, ) - elif self.config.resolver_type == ResolverType.IN_PROCESS: + elif self.config.resolver == ResolverType.IN_PROCESS: return InProcessResolver(self.config, self) else: raise ValueError( - f"`resolver_type` parameter invalid: {self.config.resolver_type}" + f"`resolver_type` parameter invalid: {self.config.resolver}" ) def initialize(self, evaluation_context: EvaluationContext) -> None: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 17f721a4..3c260156 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -37,13 +37,12 @@ class GrpcResolver: - MAX_BACK_OFF = 120 - def __init__( self, config: Config, emit_provider_ready: typing.Callable[[ProviderEventDetails], None], emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_stale: typing.Callable[[ProviderEventDetails], None], emit_provider_configuration_changed: typing.Callable[ [ProviderEventDetails], None ], @@ -51,16 +50,19 @@ def __init__( self.config = config self.emit_provider_ready = emit_provider_ready self.emit_provider_error = emit_provider_error + self.emit_provider_stale = emit_provider_stale self.emit_provider_configuration_changed = emit_provider_configuration_changed self.cache: typing.Optional[BaseCacheImpl] = ( LRUCache(maxsize=self.config.max_cache_size) - if self.config.cache_type == CacheType.LRU + if self.config.cache == CacheType.LRU else None ) self.stub, self.channel = self._create_stub() self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 + self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001 + self.retry_grace_attempts = config.retry_grace_attempts self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 - self.deadline = config.deadline * 0.001 + self.deadline = config.deadline_ms * 0.001 self.connected = False def _create_stub( @@ -70,13 +72,10 @@ def _create_stub( channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel channel = channel_factory( f"{config.host}:{config.port}", - options=(("grpc.keepalive_time_ms", config.keep_alive),), + options=(("grpc.keepalive_time_ms", config.keep_alive_time),), ) stub = evaluation_pb2_grpc.ServiceStub(channel) - if self.cache: - self.cache.clear() - return stub, channel def initialize(self, evaluation_context: EvaluationContext) -> None: @@ -113,8 +112,10 @@ def listen(self) -> None: if self.streamline_deadline_seconds > 0 else {} ) + retry_counter = 0 while self.active: request = evaluation_pb2.EventStreamRequest() + try: logger.debug("Setting up gRPC sync flags connection") for message in self.stub.EventStream(request, **call_args): @@ -126,6 +127,7 @@ def listen(self) -> None: ) ) self.connected = True + retry_counter = 0 # reset retry delay after successsful read retry_delay = self.retry_backoff_seconds @@ -146,15 +148,37 @@ def listen(self) -> None: ) self.connected = False + self.handle_error(retry_counter, retry_delay) + + retry_delay = self.handle_retry(retry_counter, retry_delay) + + retry_counter = retry_counter + 1 + + def handle_retry(self, retry_counter: int, retry_delay: float) -> float: + if retry_counter == 0: + logger.info("gRPC sync disconnected, reconnecting immediately") + else: + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds) + return retry_delay + + def handle_error(self, retry_counter: int, retry_delay: float) -> None: + if retry_counter == self.retry_grace_attempts: + if self.cache: + self.cache.clear() self.emit_provider_error( ProviderEventDetails( message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", error_code=ErrorCode.GENERAL, ) ) - logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") - time.sleep(retry_delay) - retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF) + elif retry_counter == 1: + self.emit_provider_stale( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + ) + ) def handle_changed_flags(self, data: typing.Any) -> None: changed_flags = list(data["flags"].keys()) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 350f6f8d..077e6926 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -13,7 +13,7 @@ # running all gherkin tests, except the ones, not implemented def pytest_collection_modifyitems(config): - marker = "not customCert and not unixsocket and not sync" + marker = "not customCert and not unixsocket and not sync and not targetURI" # this seems to not work with python 3.8 if hasattr(config.option, "markexpr") and config.option.markexpr == "": diff --git a/providers/openfeature-provider-flagd/tests/e2e/events.feature b/providers/openfeature-provider-flagd/tests/e2e/events.feature deleted file mode 100644 index 946eece5..00000000 --- a/providers/openfeature-provider-flagd/tests/e2e/events.feature +++ /dev/null @@ -1,29 +0,0 @@ -Feature: flagd providers - - # This test suite contains scenarios to test flagd providers. - # It's associated with the flags configured in flags/changing-flag.json and flags/zero-flags.json. - # It should be used in conjunction with the suites supplied by the OpenFeature specification. - - Background: - Given a flagd provider is set - - # events - Scenario Outline: Provider events - When a handler is added - Then the handler must run - Examples: - | event | - | PROVIDER_ERROR | - | PROVIDER_STALE | - | PROVIDER_READY | - - # events - Scenario: Provider events chain ready -> stale -> error -> ready - When a PROVIDER_READY handler is added - Then the PROVIDER_READY handler must run - When a PROVIDER_STALE handler is added - Then the PROVIDER_STALE handler must run - When a PROVIDER_ERROR handler is added - Then the PROVIDER_ERROR handler must run - When a PROVIDER_READY handler is added - Then the PROVIDER_READY handler must run diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index 6e09fb57..c81c8871 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -591,7 +591,7 @@ def assert_handlers( ) ) def assert_handler_run(event_type: ProviderEvent, event_handles): - assert_handlers(event_handles, event_type, max_wait=6) + assert_handlers(event_handles, event_type, max_wait=30) @then( diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_config.py b/providers/openfeature-provider-flagd/tests/e2e/test_config.py index 25f0e03f..dc413aee 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_config.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_config.py @@ -4,7 +4,7 @@ import pytest from asserts import assert_equal -from pytest_bdd import parsers, scenarios, then, when +from pytest_bdd import given, parsers, scenarios, then, when from tests.e2e.conftest import TEST_HARNESS_PATH from openfeature.contrib.provider.flagd.config import CacheType, Config, ResolverType @@ -47,9 +47,9 @@ def option_values() -> dict: return {} -@when( +@given( parsers.cfparse( - 'we have an option "{option}" of type "{type_info}" with value "{value}"', + 'an option "{option}" of type "{type_info}" with value "{value}"', ), ) def option_with_value(option: str, value: str, type_info: str, option_values: dict): @@ -57,9 +57,9 @@ def option_with_value(option: str, value: str, type_info: str, option_values: di option_values[camel_to_snake(option)] = value -@when( +@given( parsers.cfparse( - 'we have an environment variable "{env}" with value "{value}"', + 'an environment variable "{env}" with value "{value}"', ), ) def env_with_value(monkeypatch, env: str, value: str): @@ -68,7 +68,7 @@ def env_with_value(monkeypatch, env: str, value: str): @when( parsers.cfparse( - "we initialize a config", + "a config was initialized", ), target_fixture="config", ) @@ -78,12 +78,12 @@ def initialize_config(option_values): @when( parsers.cfparse( - 'we initialize a config for "{resolver_type}"', + 'a config was initialized for "{resolver_type}"', ), target_fixture="config", ) def initialize_config_for(resolver_type: str, option_values: dict): - return Config(resolver_type=ResolverType(resolver_type), **option_values) + return Config(resolver=ResolverType(resolver_type), **option_values) @then( diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py index b99df2be..f56e82b7 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py @@ -27,4 +27,5 @@ def image(): scenarios( f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature", + f"{TEST_HARNESS_PATH}/gherkin/events.feature", ) diff --git a/providers/openfeature-provider-flagd/tests/test_config.py b/providers/openfeature-provider-flagd/tests/test_config.py index c923fb68..cc53a029 100644 --- a/providers/openfeature-provider-flagd/tests/test_config.py +++ b/providers/openfeature-provider-flagd/tests/test_config.py @@ -32,29 +32,29 @@ def test_return_default_values_rpc(): config = Config() - assert config.cache_type == DEFAULT_CACHE + assert config.cache == DEFAULT_CACHE assert config.max_cache_size == DEFAULT_CACHE_SIZE - assert config.deadline == DEFAULT_DEADLINE + assert config.deadline_ms == DEFAULT_DEADLINE assert config.host == DEFAULT_HOST - assert config.keep_alive == DEFAULT_KEEP_ALIVE + assert config.keep_alive_time == DEFAULT_KEEP_ALIVE assert config.offline_flag_source_path == DEFAULT_OFFLINE_SOURCE_PATH assert config.port == DEFAULT_PORT_RPC - assert config.resolver_type == DEFAULT_RESOLVER_TYPE + assert config.resolver == DEFAULT_RESOLVER_TYPE assert config.retry_backoff_ms == DEFAULT_RETRY_BACKOFF assert config.stream_deadline_ms == DEFAULT_STREAM_DEADLINE assert config.tls is DEFAULT_TLS def test_return_default_values_in_process(): - config = Config(resolver_type=ResolverType.IN_PROCESS) - assert config.cache_type == DEFAULT_CACHE + config = Config(resolver=ResolverType.IN_PROCESS) + assert config.cache == DEFAULT_CACHE assert config.max_cache_size == DEFAULT_CACHE_SIZE - assert config.deadline == DEFAULT_DEADLINE + assert config.deadline_ms == DEFAULT_DEADLINE assert config.host == DEFAULT_HOST - assert config.keep_alive == DEFAULT_KEEP_ALIVE + assert config.keep_alive_time == DEFAULT_KEEP_ALIVE assert config.offline_flag_source_path == DEFAULT_OFFLINE_SOURCE_PATH assert config.port == DEFAULT_PORT_IN_PROCESS - assert config.resolver_type == ResolverType.IN_PROCESS + assert config.resolver == ResolverType.IN_PROCESS assert config.retry_backoff_ms == DEFAULT_RETRY_BACKOFF assert config.stream_deadline_ms == DEFAULT_STREAM_DEADLINE assert config.tls is DEFAULT_TLS @@ -90,14 +90,14 @@ def test_overrides_defaults_with_environment(monkeypatch, resolver_type): # noq monkeypatch.setenv(ENV_VAR_TLS, str(tls)) config = Config() - assert config.cache_type == cache + assert config.cache == cache assert config.max_cache_size == cache_size - assert config.deadline == deadline + assert config.deadline_ms == deadline assert config.host == host - assert config.keep_alive == keep_alive + assert config.keep_alive_time == keep_alive assert config.offline_flag_source_path == offline_path assert config.port == port - assert config.resolver_type == resolver_type + assert config.resolver == resolver_type assert config.retry_backoff_ms == retry_backoff assert config.stream_deadline_ms == stream_deadline assert config.tls is tls @@ -128,26 +128,26 @@ def test_uses_arguments_over_environments_and_defaults(monkeypatch, resolver_typ monkeypatch.setenv(ENV_VAR_TLS, str(tls) + "value") config = Config( - cache_type=cache, + cache=cache, max_cache_size=cache_size, - deadline=deadline, + deadline_ms=deadline, host=host, port=port, - resolver_type=resolver_type, + resolver=resolver_type, retry_backoff_ms=retry_backoff, stream_deadline_ms=stream_deadline, tls=tls, - keep_alive=keep_alive, + keep_alive_time=keep_alive, offline_flag_source_path=offline_path, ) - assert config.cache_type == cache + assert config.cache == cache assert config.max_cache_size == cache_size - assert config.deadline == deadline + assert config.deadline_ms == deadline assert config.host == host - assert config.keep_alive == keep_alive + assert config.keep_alive_time == keep_alive assert config.offline_flag_source_path == offline_path assert config.port == port - assert config.resolver_type == resolver_type + assert config.resolver == resolver_type assert config.retry_backoff_ms == retry_backoff assert config.stream_deadline_ms == stream_deadline assert config.tls is tls