From 41d0ad8b6a5b32272c75684cfcbabffb57e53470 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Thu, 12 Dec 2024 14:12:24 +0100 Subject: [PATCH 01/14] feat(flagd-rpc): adding grace attempts (#117) * feat(flagd-rpc): add caching with tests Signed-off-by: Simon Schrottner --- .../openfeature/test-harness | 2 +- .../contrib/provider/flagd/config.py | 74 ++++++++++++++----- .../contrib/provider/flagd/provider.py | 19 +++-- .../contrib/provider/flagd/resolvers/grpc.py | 46 +++++++++--- .../tests/e2e/conftest.py | 2 +- .../tests/e2e/steps.py | 2 +- .../tests/e2e/test_config.py | 16 ++-- .../tests/e2e/test_rpc_reconnect.py | 1 + .../tests/test_config.py | 42 +++++------ 9 files changed, 137 insertions(+), 67 deletions(-) diff --git a/providers/openfeature-provider-flagd/openfeature/test-harness b/providers/openfeature-provider-flagd/openfeature/test-harness index 536d4845..e132d258 160000 --- a/providers/openfeature-provider-flagd/openfeature/test-harness +++ b/providers/openfeature-provider-flagd/openfeature/test-harness @@ -1 +1 @@ -Subproject commit 536d4845c0fa4255e3e98b7ee382d0eb73f7b4c0 +Subproject commit e132d25822eaad367f81cf2a06b422edac32a76d diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index 1bb73ece..bcd4da85 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -1,3 +1,4 @@ +import dataclasses import os import typing from enum import Enum @@ -19,10 +20,13 @@ class CacheType(Enum): DEFAULT_HOST = "localhost" DEFAULT_KEEP_ALIVE = 0 DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None +DEFAULT_OFFLINE_POLL_MS = 5000 DEFAULT_PORT_IN_PROCESS = 8015 DEFAULT_PORT_RPC = 8013 DEFAULT_RESOLVER_TYPE = ResolverType.RPC DEFAULT_RETRY_BACKOFF = 1000 +DEFAULT_RETRY_BACKOFF_MAX = 120000 
+DEFAULT_RETRY_GRACE_ATTEMPTS = 5 DEFAULT_STREAM_DEADLINE = 600000 DEFAULT_TLS = False @@ -32,9 +36,12 @@ class CacheType(Enum): ENV_VAR_HOST = "FLAGD_HOST" ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS" ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" +ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS" ENV_VAR_PORT = "FLAGD_PORT" ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER" ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" +ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS" +ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" @@ -62,19 +69,23 @@ def env_or_default( return val if cast is None else cast(val) +@dataclasses.dataclass class Config: def __init__( # noqa: PLR0913 self, host: typing.Optional[str] = None, port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, - resolver_type: typing.Optional[ResolverType] = None, + resolver: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, + offline_poll_interval_ms: typing.Optional[int] = None, retry_backoff_ms: typing.Optional[int] = None, - deadline: typing.Optional[int] = None, + retry_backoff_max_ms: typing.Optional[int] = None, + retry_grace_attempts: typing.Optional[int] = None, + deadline_ms: typing.Optional[int] = None, stream_deadline_ms: typing.Optional[int] = None, - keep_alive: typing.Optional[int] = None, - cache_type: typing.Optional[CacheType] = None, + keep_alive_time: typing.Optional[int] = None, + cache: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, ): self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host @@ -94,18 +105,37 @@ def __init__( # noqa: PLR0913 if retry_backoff_ms is None else retry_backoff_ms ) + self.retry_backoff_max_ms: int = ( + int( + env_or_default( + ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int + ) + ) + if 
retry_backoff_max_ms is None + else retry_backoff_max_ms + ) - self.resolver_type = ( + self.retry_grace_attempts: int = ( + int( + env_or_default( + ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int + ) + ) + if retry_grace_attempts is None + else retry_grace_attempts + ) + + self.resolver = ( env_or_default( ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE, cast=convert_resolver_type ) - if resolver_type is None - else resolver_type + if resolver is None + else resolver ) default_port = ( DEFAULT_PORT_RPC - if self.resolver_type is ResolverType.RPC + if self.resolver is ResolverType.RPC else DEFAULT_PORT_IN_PROCESS ) @@ -123,10 +153,20 @@ def __init__( # noqa: PLR0913 else offline_flag_source_path ) - self.deadline: int = ( + self.offline_poll_interval_ms: int = ( + int( + env_or_default( + ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int + ) + ) + if offline_poll_interval_ms is None + else offline_poll_interval_ms + ) + + self.deadline_ms: int = ( int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int)) - if deadline is None - else deadline + if deadline_ms is None + else deadline_ms ) self.stream_deadline_ms: int = ( @@ -139,18 +179,18 @@ def __init__( # noqa: PLR0913 else stream_deadline_ms ) - self.keep_alive: int = ( + self.keep_alive_time: int = ( int( env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int) ) - if keep_alive is None - else keep_alive + if keep_alive_time is None + else keep_alive_time ) - self.cache_type = ( + self.cache = ( CacheType(env_or_default(ENV_VAR_CACHE_TYPE, DEFAULT_CACHE)) - if cache_type is None - else cache_type + if cache is None + else cache ) self.max_cache_size: int = ( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 35fe2059..07e148e1 100644 --- 
a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -52,6 +52,8 @@ def __init__( # noqa: PLR0913 keep_alive_time: typing.Optional[int] = None, cache_type: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, + retry_backoff_max_ms: typing.Optional[int] = None, + retry_grace_attempts: typing.Optional[int] = None, ): """ Create an instance of the FlagdProvider @@ -79,31 +81,34 @@ def __init__( # noqa: PLR0913 host=host, port=port, tls=tls, - deadline=deadline, + deadline_ms=deadline, retry_backoff_ms=retry_backoff_ms, - resolver_type=resolver_type, + retry_backoff_max_ms=retry_backoff_max_ms, + retry_grace_attempts=retry_grace_attempts, + resolver=resolver_type, offline_flag_source_path=offline_flag_source_path, stream_deadline_ms=stream_deadline_ms, - keep_alive=keep_alive_time, - cache_type=cache_type, + keep_alive_time=keep_alive_time, + cache=cache_type, max_cache_size=max_cache_size, ) self.resolver = self.setup_resolver() def setup_resolver(self) -> AbstractResolver: - if self.config.resolver_type == ResolverType.RPC: + if self.config.resolver == ResolverType.RPC: return GrpcResolver( self.config, self.emit_provider_ready, self.emit_provider_error, + self.emit_provider_stale, self.emit_provider_configuration_changed, ) - elif self.config.resolver_type == ResolverType.IN_PROCESS: + elif self.config.resolver == ResolverType.IN_PROCESS: return InProcessResolver(self.config, self) else: raise ValueError( - f"`resolver_type` parameter invalid: {self.config.resolver_type}" + f"`resolver_type` parameter invalid: {self.config.resolver}" ) def initialize(self, evaluation_context: EvaluationContext) -> None: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py 
index 17f721a4..8dfda518 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -37,13 +37,12 @@ class GrpcResolver: - MAX_BACK_OFF = 120 - def __init__( self, config: Config, emit_provider_ready: typing.Callable[[ProviderEventDetails], None], emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_stale: typing.Callable[[ProviderEventDetails], None], emit_provider_configuration_changed: typing.Callable[ [ProviderEventDetails], None ], @@ -51,16 +50,19 @@ def __init__( self.config = config self.emit_provider_ready = emit_provider_ready self.emit_provider_error = emit_provider_error + self.emit_provider_stale = emit_provider_stale self.emit_provider_configuration_changed = emit_provider_configuration_changed self.cache: typing.Optional[BaseCacheImpl] = ( LRUCache(maxsize=self.config.max_cache_size) - if self.config.cache_type == CacheType.LRU + if self.config.cache == CacheType.LRU else None ) self.stub, self.channel = self._create_stub() self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 + self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001 + self.retry_grace_attempts = config.retry_grace_attempts self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 - self.deadline = config.deadline * 0.001 + self.deadline = config.deadline_ms * 0.001 self.connected = False def _create_stub( @@ -70,13 +72,10 @@ def _create_stub( channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel channel = channel_factory( f"{config.host}:{config.port}", - options=(("grpc.keepalive_time_ms", config.keep_alive),), + options=(("grpc.keepalive_time_ms", config.keep_alive_time),), ) stub = evaluation_pb2_grpc.ServiceStub(channel) - if self.cache: - self.cache.clear() - return stub, channel def initialize(self, evaluation_context: EvaluationContext) -> 
None: @@ -113,8 +112,10 @@ def listen(self) -> None: if self.streamline_deadline_seconds > 0 else {} ) + retry_counter = 0 while self.active: request = evaluation_pb2.EventStreamRequest() + try: logger.debug("Setting up gRPC sync flags connection") for message in self.stub.EventStream(request, **call_args): @@ -126,6 +127,7 @@ def listen(self) -> None: ) ) self.connected = True + retry_counter = 0 # reset retry delay after successsful read retry_delay = self.retry_backoff_seconds @@ -146,15 +148,37 @@ def listen(self) -> None: ) self.connected = False + self.on_connection_error(retry_counter, retry_delay) + + retry_delay = self.handle_retry(retry_counter, retry_delay) + + retry_counter = retry_counter + 1 + + def handle_retry(self, retry_counter: int, retry_delay: float) -> float: + if retry_counter == 0: + logger.info("gRPC sync disconnected, reconnecting immediately") + else: + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds) + return retry_delay + + def on_connection_error(self, retry_counter: int, retry_delay: float) -> None: + if retry_counter == self.retry_grace_attempts: + if self.cache: + self.cache.clear() self.emit_provider_error( ProviderEventDetails( message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", error_code=ErrorCode.GENERAL, ) ) - logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") - time.sleep(retry_delay) - retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF) + elif retry_counter == 1: + self.emit_provider_stale( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + ) + ) def handle_changed_flags(self, data: typing.Any) -> None: changed_flags = list(data["flags"].keys()) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 350f6f8d..077e6926 100644 --- 
a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -13,7 +13,7 @@ # running all gherkin tests, except the ones, not implemented def pytest_collection_modifyitems(config): - marker = "not customCert and not unixsocket and not sync" + marker = "not customCert and not unixsocket and not sync and not targetURI" # this seems to not work with python 3.8 if hasattr(config.option, "markexpr") and config.option.markexpr == "": diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index 6e09fb57..c81c8871 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -591,7 +591,7 @@ def assert_handlers( ) ) def assert_handler_run(event_type: ProviderEvent, event_handles): - assert_handlers(event_handles, event_type, max_wait=6) + assert_handlers(event_handles, event_type, max_wait=30) @then( diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_config.py b/providers/openfeature-provider-flagd/tests/e2e/test_config.py index 25f0e03f..dc413aee 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_config.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_config.py @@ -4,7 +4,7 @@ import pytest from asserts import assert_equal -from pytest_bdd import parsers, scenarios, then, when +from pytest_bdd import given, parsers, scenarios, then, when from tests.e2e.conftest import TEST_HARNESS_PATH from openfeature.contrib.provider.flagd.config import CacheType, Config, ResolverType @@ -47,9 +47,9 @@ def option_values() -> dict: return {} -@when( +@given( parsers.cfparse( - 'we have an option "{option}" of type "{type_info}" with value "{value}"', + 'an option "{option}" of type "{type_info}" with value "{value}"', ), ) def option_with_value(option: str, value: str, type_info: str, option_values: dict): @@ -57,9 +57,9 @@ def 
option_with_value(option: str, value: str, type_info: str, option_values: di option_values[camel_to_snake(option)] = value -@when( +@given( parsers.cfparse( - 'we have an environment variable "{env}" with value "{value}"', + 'an environment variable "{env}" with value "{value}"', ), ) def env_with_value(monkeypatch, env: str, value: str): @@ -68,7 +68,7 @@ def env_with_value(monkeypatch, env: str, value: str): @when( parsers.cfparse( - "we initialize a config", + "a config was initialized", ), target_fixture="config", ) @@ -78,12 +78,12 @@ def initialize_config(option_values): @when( parsers.cfparse( - 'we initialize a config for "{resolver_type}"', + 'a config was initialized for "{resolver_type}"', ), target_fixture="config", ) def initialize_config_for(resolver_type: str, option_values: dict): - return Config(resolver_type=ResolverType(resolver_type), **option_values) + return Config(resolver=ResolverType(resolver_type), **option_values) @then( diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py index b99df2be..f56e82b7 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py @@ -27,4 +27,5 @@ def image(): scenarios( f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature", + f"{TEST_HARNESS_PATH}/gherkin/events.feature", ) diff --git a/providers/openfeature-provider-flagd/tests/test_config.py b/providers/openfeature-provider-flagd/tests/test_config.py index c923fb68..cc53a029 100644 --- a/providers/openfeature-provider-flagd/tests/test_config.py +++ b/providers/openfeature-provider-flagd/tests/test_config.py @@ -32,29 +32,29 @@ def test_return_default_values_rpc(): config = Config() - assert config.cache_type == DEFAULT_CACHE + assert config.cache == DEFAULT_CACHE assert config.max_cache_size == DEFAULT_CACHE_SIZE - assert config.deadline == DEFAULT_DEADLINE + 
assert config.deadline_ms == DEFAULT_DEADLINE assert config.host == DEFAULT_HOST - assert config.keep_alive == DEFAULT_KEEP_ALIVE + assert config.keep_alive_time == DEFAULT_KEEP_ALIVE assert config.offline_flag_source_path == DEFAULT_OFFLINE_SOURCE_PATH assert config.port == DEFAULT_PORT_RPC - assert config.resolver_type == DEFAULT_RESOLVER_TYPE + assert config.resolver == DEFAULT_RESOLVER_TYPE assert config.retry_backoff_ms == DEFAULT_RETRY_BACKOFF assert config.stream_deadline_ms == DEFAULT_STREAM_DEADLINE assert config.tls is DEFAULT_TLS def test_return_default_values_in_process(): - config = Config(resolver_type=ResolverType.IN_PROCESS) - assert config.cache_type == DEFAULT_CACHE + config = Config(resolver=ResolverType.IN_PROCESS) + assert config.cache == DEFAULT_CACHE assert config.max_cache_size == DEFAULT_CACHE_SIZE - assert config.deadline == DEFAULT_DEADLINE + assert config.deadline_ms == DEFAULT_DEADLINE assert config.host == DEFAULT_HOST - assert config.keep_alive == DEFAULT_KEEP_ALIVE + assert config.keep_alive_time == DEFAULT_KEEP_ALIVE assert config.offline_flag_source_path == DEFAULT_OFFLINE_SOURCE_PATH assert config.port == DEFAULT_PORT_IN_PROCESS - assert config.resolver_type == ResolverType.IN_PROCESS + assert config.resolver == ResolverType.IN_PROCESS assert config.retry_backoff_ms == DEFAULT_RETRY_BACKOFF assert config.stream_deadline_ms == DEFAULT_STREAM_DEADLINE assert config.tls is DEFAULT_TLS @@ -90,14 +90,14 @@ def test_overrides_defaults_with_environment(monkeypatch, resolver_type): # noq monkeypatch.setenv(ENV_VAR_TLS, str(tls)) config = Config() - assert config.cache_type == cache + assert config.cache == cache assert config.max_cache_size == cache_size - assert config.deadline == deadline + assert config.deadline_ms == deadline assert config.host == host - assert config.keep_alive == keep_alive + assert config.keep_alive_time == keep_alive assert config.offline_flag_source_path == offline_path assert config.port == port - assert 
config.resolver_type == resolver_type + assert config.resolver == resolver_type assert config.retry_backoff_ms == retry_backoff assert config.stream_deadline_ms == stream_deadline assert config.tls is tls @@ -128,26 +128,26 @@ def test_uses_arguments_over_environments_and_defaults(monkeypatch, resolver_typ monkeypatch.setenv(ENV_VAR_TLS, str(tls) + "value") config = Config( - cache_type=cache, + cache=cache, max_cache_size=cache_size, - deadline=deadline, + deadline_ms=deadline, host=host, port=port, - resolver_type=resolver_type, + resolver=resolver_type, retry_backoff_ms=retry_backoff, stream_deadline_ms=stream_deadline, tls=tls, - keep_alive=keep_alive, + keep_alive_time=keep_alive, offline_flag_source_path=offline_path, ) - assert config.cache_type == cache + assert config.cache == cache assert config.max_cache_size == cache_size - assert config.deadline == deadline + assert config.deadline_ms == deadline assert config.host == host - assert config.keep_alive == keep_alive + assert config.keep_alive_time == keep_alive assert config.offline_flag_source_path == offline_path assert config.port == port - assert config.resolver_type == resolver_type + assert config.resolver == resolver_type assert config.retry_backoff_ms == retry_backoff assert config.stream_deadline_ms == stream_deadline assert config.tls is tls From 3c3e9c86e7111fc165eebd650453069a0e8f4dae Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Wed, 18 Dec 2024 07:38:46 +0100 Subject: [PATCH 02/14] feat(flagd): use test-harness version number for integration tests (#120) * feat(flagd): use test-harness version number for integration tests Signed-off-by: Simon Schrottner * fixup: migrating to pathlib Signed-off-by: Simon Schrottner --------- Signed-off-by: Simon Schrottner --- .gitmodules | 1 + .../openfeature-provider-flagd/openfeature/test-harness | 2 +- .../tests/e2e/flagd_container.py | 7 +++++-- providers/openfeature-provider-flagd/tests/e2e/test_rpc.py | 2 +- .../tests/e2e/test_rpc_reconnect.py 
| 2 +- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/.gitmodules b/.gitmodules index 466f2596..8b2e2518 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,6 +4,7 @@ [submodule "providers/openfeature-provider-flagd/test-harness"] path = providers/openfeature-provider-flagd/openfeature/test-harness url = git@github.com:open-feature/flagd-testbed.git + branch = v0.5.18 [submodule "providers/openfeature-provider-flagd/spec"] path = providers/openfeature-provider-flagd/openfeature/spec url = https://github.com/open-feature/spec diff --git a/providers/openfeature-provider-flagd/openfeature/test-harness b/providers/openfeature-provider-flagd/openfeature/test-harness index e132d258..e908fb7d 160000 --- a/providers/openfeature-provider-flagd/openfeature/test-harness +++ b/providers/openfeature-provider-flagd/openfeature/test-harness @@ -1 +1 @@ -Subproject commit e132d25822eaad367f81cf2a06b422edac32a76d +Subproject commit e908fb7d19def6a4768ca90b02665075bbc1afbb diff --git a/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py b/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py index e80fb0f7..31045759 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py +++ b/providers/openfeature-provider-flagd/tests/e2e/flagd_container.py @@ -1,4 +1,5 @@ import time +from pathlib import Path import grpc from grpc_health.v1 import health_pb2, health_pb2_grpc @@ -11,11 +12,13 @@ class FlagdContainer(DockerContainer): def __init__( self, - image: str = "ghcr.io/open-feature/flagd-testbed:v0.5.15", + image: str = "ghcr.io/open-feature/flagd-testbed", port: int = 8013, **kwargs, ) -> None: - super().__init__(image, **kwargs) + path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt" + data = path.read_text().rstrip() + super().__init__(f"{image}:v{data}", **kwargs) self.port = port self.with_exposed_ports(self.port, HEALTH_CHECK) diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py 
b/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py index 3fefb300..e3508cf1 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py @@ -22,7 +22,7 @@ def port(): @pytest.fixture(autouse=True, scope="module") def image(): - return "ghcr.io/open-feature/flagd-testbed:v0.5.13" + return "ghcr.io/open-feature/flagd-testbed" scenarios( diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py index f56e82b7..36f526e2 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py @@ -22,7 +22,7 @@ def port(): @pytest.fixture(autouse=True, scope="module") def image(): - return "ghcr.io/open-feature/flagd-testbed-unstable:v0.5.13" + return "ghcr.io/open-feature/flagd-testbed-unstable" scenarios( From 0b749b5dc8356c7fd12f1cd2884c9ae21f4fe92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Gr=C3=BCbel?= Date: Sun, 22 Dec 2024 17:24:17 +0100 Subject: [PATCH 03/14] ci: add renovate config (#123) add renovate config Signed-off-by: gruebel --- .pre-commit-config.yaml | 2 +- renovate.json | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 renovate.json diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9b08fdba..dd627d73 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ default_stages: [commit] repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.6.3 + rev: v0.8.4 hooks: - id: ruff args: [--fix] diff --git a/renovate.json b/renovate.json new file mode 100644 index 00000000..f29e38c2 --- /dev/null +++ b/renovate.json @@ -0,0 +1,30 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:base" + ], + "semanticCommits": "enabled", + "pep621": { + 
"enabled": true + }, + "pre-commit": { + "enabled": true + }, + "packageRules": [ + { + "description": "Automerge non-major updates", + "matchUpdateTypes": [ + "minor", + "patch" + ], + "matchCurrentVersion": "!/^0/", + "automerge": true + }, + { + "matchManagers": [ + "github-actions" + ], + "automerge": true + } + ] +} From 8ac7ab784ee636a046e6581aaa0b775c0aeb871e Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 22 Dec 2024 17:29:49 +0100 Subject: [PATCH 04/14] chore(deps): update codecov/codecov-action action to v4.6.0 (#124) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6b153cb0..059c0bfb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -60,7 +60,7 @@ jobs: - if: matrix.python-version == '3.11' name: Upload coverage to Codecov - uses: codecov/codecov-action@v4.5.0 + uses: codecov/codecov-action@v4.6.0 with: name: Code Coverage for ${{ matrix.package }} on Python ${{ matrix.python-version }} directory: ${{ matrix.package }} From 4e75a366468ab0f588031587a7224d16ae6cd0c6 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 22 Dec 2024 20:25:09 +0000 Subject: [PATCH 05/14] chore(deps): update dependency grpcio-health-checking to v1.68.1 (#125) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- providers/openfeature-provider-flagd/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/openfeature-provider-flagd/pyproject.toml b/providers/openfeature-provider-flagd/pyproject.toml index 738ba02a..bca7f949 100644 --- a/providers/openfeature-provider-flagd/pyproject.toml +++ b/providers/openfeature-provider-flagd/pyproject.toml @@ -40,7 +40,7 @@ dependencies = [ 
"pytest-bdd", "testcontainers", "asserts", - "grpcio-health-checking==1.60.0", + "grpcio-health-checking==1.68.1", ] pre-install-commands = [ "hatch build", From d92e8c6635667d806bca6442181766d9846e5839 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 22 Dec 2024 22:30:07 +0000 Subject: [PATCH 06/14] chore(deps): update python docker tag to v3.13 (#127) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 497ddb1d..9f45b47b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -46,7 +46,7 @@ jobs: # IMPORTANT: this permission is mandatory for trusted publishing to pypi id-token: write container: - image: "python:3.12" + image: "python:3.13" steps: - uses: actions/checkout@v4 From 4e7b0e56d2732f81c53bc5fabd1bb84bb52ea526 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 22 Dec 2024 23:47:27 +0100 Subject: [PATCH 07/14] chore(deps): update pre-commit hook pre-commit/pre-commit-hooks to v5 (#129) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dd627d73..16eb005e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: ruff-format - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 + rev: v5.0.0 hooks: - id: check-toml - id: check-yaml From f0118f0cdd78724a505fa56bb7d31db3145a3582 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 23 Dec 2024 00:33:14 +0000 Subject: [PATCH 08/14] chore(deps): update codecov/codecov-action action to v5 (#128) 
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 059c0bfb..0755a76a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -60,7 +60,7 @@ jobs: - if: matrix.python-version == '3.11' name: Upload coverage to Codecov - uses: codecov/codecov-action@v4.6.0 + uses: codecov/codecov-action@v5.1.2 with: name: Code Coverage for ${{ matrix.package }} on Python ${{ matrix.python-version }} directory: ${{ matrix.package }} From f156ea5d6288c1ab51e898acb928d13fade69aa6 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Thu, 26 Dec 2024 21:48:21 +0100 Subject: [PATCH 09/14] chore(config): migrate renovate config (#130) chore(config): migrate config renovate.json Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- renovate.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/renovate.json b/renovate.json index f29e38c2..6c501126 100644 --- a/renovate.json +++ b/renovate.json @@ -1,7 +1,7 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", "extends": [ - "config:base" + "config:recommended" ], "semanticCommits": "enabled", "pep621": { From 8e23a700244a85291671b41083b1be82670cf79d Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Fri, 27 Dec 2024 13:03:30 +0100 Subject: [PATCH 10/14] feat: attempts with connection improvements (#118) * feat(flagd-rpc): add caching with tests Signed-off-by: Simon Schrottner * fixup: using new test-harness Signed-off-by: Simon Schrottner * fixup(flagd): remove merge conflict error as stated by warber Signed-off-by: Simon Schrottner * feat(flagd): add graceful attempts Signed-off-by: Simon Schrottner * feat(flagd): add graceful attempts Signed-off-by: Simon Schrottner * feat: better reconnect gherkins Signed-off-by: 
Simon Schrottner * fixup: unblock Signed-off-by: Simon Schrottner * fixup: incuberating feedback from pr review Signed-off-by: Simon Schrottner * fixup: incuberating feedback from pr review Signed-off-by: Simon Schrottner --------- Signed-off-by: Simon Schrottner --- .../openfeature/test-harness | 2 +- .../contrib/provider/flagd/config.py | 16 +- .../contrib/provider/flagd/provider.py | 4 +- .../contrib/provider/flagd/resolvers/grpc.py | 149 ++++++++++-------- .../flagd/resolvers/process/file_watcher.py | 4 +- .../tests/e2e/conftest.py | 21 --- .../tests/e2e/steps.py | 65 +++++++- .../tests/e2e/test_config.py | 9 +- .../tests/e2e/test_in_process_file.py | 5 + 9 files changed, 166 insertions(+), 109 deletions(-) diff --git a/providers/openfeature-provider-flagd/openfeature/test-harness b/providers/openfeature-provider-flagd/openfeature/test-harness index e908fb7d..706a7e95 160000 --- a/providers/openfeature-provider-flagd/openfeature/test-harness +++ b/providers/openfeature-provider-flagd/openfeature/test-harness @@ -1 +1 @@ -Subproject commit e908fb7d19def6a4768ca90b02665075bbc1afbb +Subproject commit 706a7e951bb72a145523b38fe83060becc34c4d7 diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index bcd4da85..1eb37463 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -26,7 +26,7 @@ class CacheType(Enum): DEFAULT_RESOLVER_TYPE = ResolverType.RPC DEFAULT_RETRY_BACKOFF = 1000 DEFAULT_RETRY_BACKOFF_MAX = 120000 -DEFAULT_RETRY_GRACE_ATTEMPTS = 5 +DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5 DEFAULT_STREAM_DEADLINE = 600000 DEFAULT_TLS = False @@ -41,7 +41,7 @@ class CacheType(Enum): ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER" ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" ENV_VAR_RETRY_BACKOFF_MAX_MS 
= "FLAGD_RETRY_BACKOFF_MAX_MS" -ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS" +ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" @@ -81,7 +81,7 @@ def __init__( # noqa: PLR0913 offline_poll_interval_ms: typing.Optional[int] = None, retry_backoff_ms: typing.Optional[int] = None, retry_backoff_max_ms: typing.Optional[int] = None, - retry_grace_attempts: typing.Optional[int] = None, + retry_grace_period: typing.Optional[int] = None, deadline_ms: typing.Optional[int] = None, stream_deadline_ms: typing.Optional[int] = None, keep_alive_time: typing.Optional[int] = None, @@ -115,14 +115,16 @@ def __init__( # noqa: PLR0913 else retry_backoff_max_ms ) - self.retry_grace_attempts: int = ( + self.retry_grace_period: int = ( int( env_or_default( - ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int + ENV_VAR_RETRY_GRACE_PERIOD_SECONDS, + DEFAULT_RETRY_GRACE_PERIOD_SECONDS, + cast=int, ) ) - if retry_grace_attempts is None - else retry_grace_attempts + if retry_grace_period is None + else retry_grace_period ) self.resolver = ( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 07e148e1..dd8beeab 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -53,7 +53,7 @@ def __init__( # noqa: PLR0913 cache_type: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, retry_backoff_max_ms: typing.Optional[int] = None, - retry_grace_attempts: typing.Optional[int] = None, + retry_grace_period: typing.Optional[int] = None, ): """ Create an instance of the FlagdProvider @@ -84,7 +84,7 @@ def __init__( # noqa: PLR0913 deadline_ms=deadline, 
retry_backoff_ms=retry_backoff_ms, retry_backoff_max_ms=retry_backoff_max_ms, - retry_grace_attempts=retry_grace_attempts, + retry_grace_period=retry_grace_period, resolver=resolver_type, offline_flag_source_path=offline_flag_source_path, stream_deadline_ms=stream_deadline_ms, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 8dfda518..5841912c 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -7,6 +7,7 @@ from cachebox import BaseCacheImpl, LRUCache from google.protobuf.json_format import MessageToDict from google.protobuf.struct_pb2 import Struct +from grpc import ChannelConnectivity from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails @@ -47,6 +48,7 @@ def __init__( [ProviderEventDetails], None ], ): + self.active = False self.config = config self.emit_provider_ready = emit_provider_ready self.emit_provider_error = emit_provider_error @@ -57,26 +59,30 @@ def __init__( if self.config.cache == CacheType.LRU else None ) - self.stub, self.channel = self._create_stub() - self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 - self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001 - self.retry_grace_attempts = config.retry_grace_attempts + + self.retry_grace_period = config.retry_grace_period self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 self.deadline = config.deadline_ms * 0.001 self.connected = False - - def _create_stub( - self, - ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]: - config = self.config channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel - channel = channel_factory( + + # Create the channel 
with the service config + options = [ + ("grpc.keepalive_time_ms", config.keep_alive_time), + ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms), + ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), + ("grpc.min_reconnect_backoff_ms", config.deadline_ms), + ] + self.channel = channel_factory( f"{config.host}:{config.port}", - options=(("grpc.keepalive_time_ms", config.keep_alive_time),), + options=options, ) - stub = evaluation_pb2_grpc.ServiceStub(channel) + self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) - return stub, channel + self.thread: typing.Optional[threading.Thread] = None + self.timer: typing.Optional[threading.Timer] = None + + self.start_time = time.time() def initialize(self, evaluation_context: EvaluationContext) -> None: self.connect() @@ -89,11 +95,12 @@ def shutdown(self) -> None: def connect(self) -> None: self.active = True - self.thread = threading.Thread( - target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread" - ) - self.thread.start() + # Run monitoring in a separate thread + self.monitor_thread = threading.Thread( + target=self.monitor, daemon=True, name="FlagdGrpcServiceMonitorThread" + ) + self.monitor_thread.start() ## block until ready or deadline reached timeout = self.deadline + time.time() while not self.connected and time.time() < timeout: @@ -105,32 +112,72 @@ def connect(self) -> None: "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." 
) + def monitor(self) -> None: + self.channel.subscribe(self._state_change_callback, try_to_connect=True) + + def _state_change_callback(self, new_state: ChannelConnectivity) -> None: + logger.debug(f"gRPC state change: {new_state}") + if new_state == ChannelConnectivity.READY: + if not self.thread or not self.thread.is_alive(): + self.thread = threading.Thread( + target=self.listen, + daemon=True, + name="FlagdGrpcServiceWorkerThread", + ) + self.thread.start() + + if self.timer and self.timer.is_alive(): + logger.debug("gRPC error timer expired") + self.timer.cancel() + + elif new_state == ChannelConnectivity.TRANSIENT_FAILURE: + # this is the failed reconnect attempt so we are going into stale + self.emit_provider_stale( + ProviderEventDetails( + message="gRPC sync disconnected, reconnecting", + ) + ) + self.start_time = time.time() + # adding a timer, so we can emit the error event after time + self.timer = threading.Timer(self.retry_grace_period, self.emit_error) + + logger.debug("gRPC error timer started") + self.timer.start() + self.connected = False + + def emit_error(self) -> None: + logger.debug("gRPC error emitted") + if self.cache: + self.cache.clear() + self.emit_provider_error( + ProviderEventDetails( + message="gRPC sync disconnected, reconnecting", + error_code=ErrorCode.GENERAL, + ) + ) + def listen(self) -> None: - retry_delay = self.retry_backoff_seconds + logger.debug("gRPC starting listener thread") call_args = ( {"timeout": self.streamline_deadline_seconds} if self.streamline_deadline_seconds > 0 else {} ) - retry_counter = 0 - while self.active: - request = evaluation_pb2.EventStreamRequest() + call_args["wait_for_ready"] = True + request = evaluation_pb2.EventStreamRequest() + # defining a never ending loop to recreate the stream + while self.active: try: logger.debug("Setting up gRPC sync flags connection") for message in self.stub.EventStream(request, **call_args): if message.type == "provider_ready": - if not self.connected: - 
self.emit_provider_ready( - ProviderEventDetails( - message="gRPC sync connection established" - ) + self.connected = True + self.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" ) - self.connected = True - retry_counter = 0 - # reset retry delay after successsful read - retry_delay = self.retry_backoff_seconds - + ) elif message.type == "configuration_change": data = MessageToDict(message)["data"] self.handle_changed_flags(data) @@ -138,48 +185,14 @@ def listen(self) -> None: if not self.active: logger.info("Terminating gRPC sync thread") return - except grpc.RpcError as e: - logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") - # re-create the stub if there's a connection issue - otherwise reconnect does not work as expected - self.stub, self.channel = self._create_stub() + except grpc.RpcError as e: # noqa: PERF203 + # although it seems like this error log is not interesting, without it, the retry is not working as expected + logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}") except ParseError: logger.exception( f"Could not parse flag data using flagd syntax: {message=}" ) - self.connected = False - self.on_connection_error(retry_counter, retry_delay) - - retry_delay = self.handle_retry(retry_counter, retry_delay) - - retry_counter = retry_counter + 1 - - def handle_retry(self, retry_counter: int, retry_delay: float) -> float: - if retry_counter == 0: - logger.info("gRPC sync disconnected, reconnecting immediately") - else: - logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") - time.sleep(retry_delay) - retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds) - return retry_delay - - def on_connection_error(self, retry_counter: int, retry_delay: float) -> None: - if retry_counter == self.retry_grace_attempts: - if self.cache: - self.cache.clear() - self.emit_provider_error( - ProviderEventDetails( - message=f"gRPC sync disconnected, reconnecting in 
{retry_delay}s", - error_code=ErrorCode.GENERAL, - ) - ) - elif retry_counter == 1: - self.emit_provider_stale( - ProviderEventDetails( - message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", - ) - ) - def handle_changed_flags(self, data: typing.Any) -> None: changed_flags = list(data["flags"].keys()) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py index 0918981f..74835f94 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py @@ -31,17 +31,19 @@ def __init__( self.last_modified = 0.0 self.flag_data: typing.Mapping[str, Flag] = {} self.load_data() + self.active = True self.thread = threading.Thread(target=self.refresh_file, daemon=True) self.thread.start() def shutdown(self) -> None: + self.active = False pass def get_flag(self, key: str) -> typing.Optional[Flag]: return self.flag_data.get(key) def refresh_file(self) -> None: - while True: + while self.active: time.sleep(self.poll_interval_seconds) logger.debug("checking for new flag store contents from file") last_modified = os.path.getmtime(self.file_path) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 077e6926..e80eb15b 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -1,8 +1,5 @@ import typing -import pytest -from testcontainers.core.container import DockerContainer -from tests.e2e.flagd_container import FlagdContainer from tests.e2e.steps import * # noqa: F403 JsonPrimitive = typing.Union[str, bool, float, int] @@ -18,21 +15,3 @@ def 
pytest_collection_modifyitems(config): # this seems to not work with python 3.8 if hasattr(config.option, "markexpr") and config.option.markexpr == "": config.option.markexpr = marker - - -@pytest.fixture(autouse=True, scope="module") -def setup(request, port, image): - container: DockerContainer = FlagdContainer( - image=image, - port=port, - ) - # Setup code - c = container.start() - - def fin(): - c.stop() - - # Teardown code - request.addfinalizer(fin) - - return c.get_exposed_port(port) diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index c81c8871..b1325aff 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -1,10 +1,13 @@ import logging +import threading import time import typing import pytest from asserts import assert_equal, assert_in, assert_not_equal, assert_true from pytest_bdd import given, parsers, then, when +from testcontainers.core.container import DockerContainer +from tests.e2e.flagd_container import FlagdContainer from tests.e2e.parsers import to_bool, to_list from openfeature import api @@ -26,9 +29,21 @@ def evaluation_context() -> EvaluationContext: @given("a flagd provider is set", target_fixture="client") @given("a provider is registered", target_fixture="client") -def setup_provider(setup, resolver_type, client_name) -> OpenFeatureClient: +def setup_provider( + container: FlagdContainer, resolver_type, client_name, port +) -> OpenFeatureClient: + try: + container.get_exposed_port(port) + except: # noqa: E722 + container.start() + api.set_provider( - FlagdProvider(resolver_type=resolver_type, port=setup, timeout=1), + FlagdProvider( + resolver_type=resolver_type, + port=int(container.get_exposed_port(port)), + timeout=1, + retry_grace_period=3, + ), client_name, ) client = api.get_client(client_name) @@ -517,6 +532,12 @@ def error_handles() -> list: return [] +@given( + 
parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), +) @when( parsers.cfparse( "a {event_type:ProviderEvent} handler is added", @@ -631,7 +652,10 @@ def assert_disconnect_error( def assert_flag_changed(event_handles, key): handle = None for h in event_handles: - if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + if ( + h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED + and key in h["event"].flags_changed + ): handle = h break @@ -650,10 +674,7 @@ def wait_for(pred, poll_sec=2, timeout_sec=10): @given("flagd is unavailable", target_fixture="client") def flagd_unavailable(resolver_type): api.set_provider( - FlagdProvider( - resolver_type=resolver_type, - port=99999, - ), + FlagdProvider(resolver_type=resolver_type, port=99999, retry_grace_period=2), "unavailable", ) return api.get_client("unavailable") @@ -668,3 +689,33 @@ def flagd_init(client: OpenFeatureClient, event_handles, error_handles): @then("an error should be indicated within the configured deadline") def flagd_error(error_handles): assert_handlers(error_handles, ProviderEvent.PROVIDER_ERROR) + + +@when(parsers.cfparse("the connection is lost for {seconds}s")) +def flagd_restart(seconds, container): + def starting(): + container.start() + + container.stop() + threading.Timer(int(seconds), starting).start() + + +@pytest.fixture(autouse=True, scope="module") +def container(request, port, image): + container: DockerContainer = FlagdContainer( + image=image, + port=port, + ) + # Setup code + container = container.start() + + def fin(): + try: + container.stop() + except: # noqa: E722 + logging.debug("container was not running anymore") + + # Teardown code + request.addfinalizer(fin) + + return container diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_config.py b/providers/openfeature-provider-flagd/tests/e2e/test_config.py index dc413aee..238112d2 100644 --- 
a/providers/openfeature-provider-flagd/tests/e2e/test_config.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_config.py @@ -37,8 +37,13 @@ def convert_resolver_type(val: typing.Union[str, ResolverType]) -> ResolverType: } -@pytest.fixture(autouse=True, scope="module") -def setup(request): +@pytest.fixture(autouse=True) +def container(): + pass + + +@pytest.fixture(autouse=True) +def setup_provider(request): pass diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py b/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py index f73dc990..278bd1ea 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_in_process_file.py @@ -62,6 +62,11 @@ def resolver_type() -> ResolverType: return ResolverType.IN_PROCESS +@pytest.fixture(autouse=True) +def container(): + pass + + @pytest.fixture(autouse=True, scope="module") def setup(request, client_name, file_name, resolver_type): """nothing to boot""" From f6431e6a956f9165861a84f692a6a4b61b248ddf Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Fri, 27 Dec 2024 14:11:14 +0100 Subject: [PATCH 11/14] build(renovate): Utilize default OpenFeature Renovate configuration (#132) We do have a default OpenFeature Renovate configuration within our community-tooling repository. (https://github.com/open-feature/community-tooling/blob/main/renovate.json) To reduce maintenance efforts, we should stick to the general one as a basis. 
Signed-off-by: Simon Schrottner --- renovate.json | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/renovate.json b/renovate.json index 6c501126..28b2e68b 100644 --- a/renovate.json +++ b/renovate.json @@ -1,30 +1,10 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", - "extends": [ - "config:recommended" - ], - "semanticCommits": "enabled", + "extends": ["github>open-feature/community-tooling"], "pep621": { "enabled": true }, "pre-commit": { "enabled": true - }, - "packageRules": [ - { - "description": "Automerge non-major updates", - "matchUpdateTypes": [ - "minor", - "patch" - ], - "matchCurrentVersion": "!/^0/", - "automerge": true - }, - { - "matchManagers": [ - "github-actions" - ], - "automerge": true - } - ] + } } From f50351a0435064111fb98753a49139fafa8307e6 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Fri, 27 Dec 2024 14:48:04 +0100 Subject: [PATCH 12/14] feat(flagd): add custom cert path (#131) feat(flagd): add ssl cert path option Signed-off-by: Simon Schrottner --- .../openfeature-provider-flagd/README.md | 24 +++---- .../contrib/provider/flagd/config.py | 9 +++ .../contrib/provider/flagd/provider.py | 2 + .../contrib/provider/flagd/resolvers/grpc.py | 36 +++++++--- .../tests/e2e/test_rpc_ssl.py | 68 +++++++++++++++++++ 5 files changed, 117 insertions(+), 22 deletions(-) create mode 100644 providers/openfeature-provider-flagd/tests/e2e/test_rpc_ssl.py diff --git a/providers/openfeature-provider-flagd/README.md b/providers/openfeature-provider-flagd/README.md index 6833da0c..8463700a 100644 --- a/providers/openfeature-provider-flagd/README.md +++ b/providers/openfeature-provider-flagd/README.md @@ -47,11 +47,12 @@ api.set_provider(FlagdProvider( The default options can be defined in the FlagdProvider constructor. 
| Option name | Environment variable name | Type & Values | Default | Compatible resolver | -| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- | +|--------------------------|--------------------------------|----------------------------|-------------------------------|---------------------| | resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | | | host | FLAGD_HOST | str | localhost | rpc & in-process | | port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process | | tls | FLAGD_TLS | bool | false | rpc & in-process | +| cert_path | FLAGD_SERVER_CERT_PATH | str | null | rpc & in-process | | deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process | | stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process | | keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process | @@ -64,8 +65,6 @@ The default options can be defined in the FlagdProvider constructor. @@ -100,17 +99,18 @@ and the evaluation will default. TLS is available in situations where flagd is running on another host. 
- ## License diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index 1eb37463..68efb579 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -29,6 +29,7 @@ class CacheType(Enum): DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5 DEFAULT_STREAM_DEADLINE = 600000 DEFAULT_TLS = False +DEFAULT_TLS_CERT: typing.Optional[str] = None ENV_VAR_CACHE_SIZE = "FLAGD_MAX_CACHE_SIZE" ENV_VAR_CACHE_TYPE = "FLAGD_CACHE" @@ -44,6 +45,7 @@ class CacheType(Enum): ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" +ENV_VAR_TLS_CERT = "FLAGD_SERVER_CERT_PATH" T = typing.TypeVar("T") @@ -87,6 +89,7 @@ def __init__( # noqa: PLR0913 keep_alive_time: typing.Optional[int] = None, cache: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, + cert_path: typing.Optional[str] = None, ): self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host @@ -200,3 +203,9 @@ def __init__( # noqa: PLR0913 if max_cache_size is None else max_cache_size ) + + self.cert_path = ( + env_or_default(ENV_VAR_TLS_CERT, DEFAULT_TLS_CERT) + if cert_path is None + else cert_path + ) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index dd8beeab..588e9c27 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -54,6 +54,7 @@ def __init__( # noqa: PLR0913 max_cache_size: typing.Optional[int] = None, retry_backoff_max_ms: 
typing.Optional[int] = None, retry_grace_period: typing.Optional[int] = None, + cert_path: typing.Optional[str] = None, ): """ Create an instance of the FlagdProvider @@ -91,6 +92,7 @@ def __init__( # noqa: PLR0913 keep_alive_time=keep_alive_time, cache=cache_type, max_cache_size=max_cache_size, + cert_path=cert_path, ) self.resolver = self.setup_resolver() diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 5841912c..cb345776 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -64,8 +64,16 @@ def __init__( self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 self.deadline = config.deadline_ms * 0.001 self.connected = False - channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel + self.channel = self._generate_channel(config) + self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) + + self.thread: typing.Optional[threading.Thread] = None + self.timer: typing.Optional[threading.Timer] = None + self.start_time = time.time() + + def _generate_channel(self, config: Config) -> grpc.Channel: + target = f"{config.host}:{config.port}" # Create the channel with the service config options = [ ("grpc.keepalive_time_ms", config.keep_alive_time), @@ -73,16 +81,24 @@ def __init__( ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), ("grpc.min_reconnect_backoff_ms", config.deadline_ms), ] - self.channel = channel_factory( - f"{config.host}:{config.port}", - options=options, - ) - self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) - - self.thread: typing.Optional[threading.Thread] = None - self.timer: typing.Optional[threading.Timer] = None + if config.tls: + channel_args = { + "options": options, + 
"credentials": grpc.ssl_channel_credentials(), + } + if config.cert_path: + with open(config.cert_path, "rb") as f: + channel_args["credentials"] = grpc.ssl_channel_credentials(f.read()) + + channel = grpc.secure_channel(target, **channel_args) + + else: + channel = grpc.insecure_channel( + target, + options=options, + ) - self.start_time = time.time() + return channel def initialize(self, evaluation_context: EvaluationContext) -> None: self.connect() diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_ssl.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_ssl.py new file mode 100644 index 00000000..3a3214b3 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_ssl.py @@ -0,0 +1,68 @@ +from pathlib import Path + +import pytest +from pytest_bdd import given, scenarios +from tests.e2e.conftest import SPEC_PATH +from tests.e2e.flagd_container import FlagdContainer +from tests.e2e.steps import wait_for + +from openfeature import api +from openfeature.client import OpenFeatureClient +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType +from openfeature.provider import ProviderStatus + + +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "rpc" + + +@pytest.fixture(autouse=True, scope="module") +def resolver_type() -> ResolverType: + return ResolverType.RPC + + +@pytest.fixture(autouse=True, scope="module") +def port(): + return 8013 + + +@pytest.fixture(autouse=True, scope="module") +def image(): + return "ghcr.io/open-feature/flagd-testbed-ssl" + + +@given("a flagd provider is set", target_fixture="client") +@given("a provider is registered", target_fixture="client") +def setup_provider( + container: FlagdContainer, resolver_type, client_name, port +) -> OpenFeatureClient: + try: + container.get_exposed_port(port) + except: # noqa: E722 + container.start() + + path = ( + Path(__file__).parents[2] / 
"openfeature/test-harness/ssl/custom-root-cert.crt" + ) + + api.set_provider( + FlagdProvider( + resolver_type=resolver_type, + port=int(container.get_exposed_port(port)), + timeout=1, + retry_grace_period=3, + tls=True, + cert_path=str(path.absolute()), + ), + client_name, + ) + client = api.get_client(client_name) + wait_for(lambda: client.get_provider_status() == ProviderStatus.READY) + return client + + +scenarios( + f"{SPEC_PATH}/specification/assets/gherkin/evaluation.feature", +) From a2a0ba0d9a59c763829fa630fdc2f28b93b2f037 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Fri, 27 Dec 2024 16:13:20 +0100 Subject: [PATCH 13/14] chore(deps): update providers/openfeature-provider-flagd/openfeature/schemas digest to b81a56e (#134) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- providers/openfeature-provider-flagd/openfeature/schemas | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/openfeature-provider-flagd/openfeature/schemas b/providers/openfeature-provider-flagd/openfeature/schemas index 76d611fd..b81a56ee 160000 --- a/providers/openfeature-provider-flagd/openfeature/schemas +++ b/providers/openfeature-provider-flagd/openfeature/schemas @@ -1 +1 @@ -Subproject commit 76d611fd94689d906af316105ac12670d40f7648 +Subproject commit b81a56eea3b2c4c543a50d4f7f79a8f32592a0af From 9dcb6a5a73f53e69172a506ff426a2482c44ae92 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Fri, 27 Dec 2024 16:17:24 +0100 Subject: [PATCH 14/14] chore(deps): pin dependencies (#133) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/build.yml | 18 +++++++++--------- .github/workflows/lint-pr.yml | 6 +++--- .github/workflows/release.yml | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 
0755a76a..3079e444 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,12 +32,12 @@ jobs: - "providers/openfeature-provider-ofrep" steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 with: submodules: recursive - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5 with: python-version: ${{ matrix.python-version }} cache: "pip" @@ -60,7 +60,7 @@ jobs: - if: matrix.python-version == '3.11' name: Upload coverage to Codecov - uses: codecov/codecov-action@v5.1.2 + uses: codecov/codecov-action@1e68e06f1dbfde0e4cefc87efeba9e4643565303 # v5.1.2 with: name: Code Coverage for ${{ matrix.package }} on Python ${{ matrix.python-version }} directory: ${{ matrix.package }} @@ -72,14 +72,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 + - uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5 with: python-version: "3.11" cache: "pip" - name: Run pre-commit - uses: pre-commit/action@v3.0.1 + uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1 sast: runs-on: ubuntu-latest @@ -88,13 +88,13 @@ jobs: contents: read security-events: write steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 - name: Initialize CodeQL - uses: github/codeql-action/init@v3 + uses: github/codeql-action/init@48ab28a6f5dbc2a99bf1e0131198dd8f1df78169 # v3 with: languages: python config-file: ./.github/codeql-config.yml - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v3 + uses: github/codeql-action/analyze@48ab28a6f5dbc2a99bf1e0131198dd8f1df78169 # v3 diff --git a/.github/workflows/lint-pr.yml b/.github/workflows/lint-pr.yml index c2f9a9aa..304758cc 100644 --- 
a/.github/workflows/lint-pr.yml +++ b/.github/workflows/lint-pr.yml @@ -20,12 +20,12 @@ jobs: name: Validate PR title runs-on: ubuntu-latest steps: - - uses: amannn/action-semantic-pull-request@v5 + - uses: amannn/action-semantic-pull-request@0723387faaf9b38adef4775cd42cfd5155ed6017 # v5 id: lint_pr_title env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - uses: marocchino/sticky-pull-request-comment@v2 + - uses: marocchino/sticky-pull-request-comment@331f8f5b4215f0445d3c07b4967662a32a2d3e31 # v2 # When the previous steps fails, the workflow would stop. By adding this # condition you can continue the execution with the populated error message. if: always() && (steps.lint_pr_title.outputs.error_message != null) @@ -44,7 +44,7 @@ jobs: # Delete a previous comment when the issue has been resolved - if: ${{ steps.lint_pr_title.outputs.error_message == null }} - uses: marocchino/sticky-pull-request-comment@v2 + uses: marocchino/sticky-pull-request-comment@331f8f5b4215f0445d3c07b4967662a32a2d3e31 # v2 with: header: pr-title-lint-error delete: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9f45b47b..8c009f96 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,7 +19,7 @@ jobs: pull-requests: write # for googleapis/release-please-action to create release PR # Release-please creates a PR that tracks all changes steps: - - uses: googleapis/release-please-action@v4 + - uses: googleapis/release-please-action@7987652d64b4581673a76e33ad5e98e3dd56832f # v4 id: release with: token: ${{secrets.GITHUB_TOKEN}} @@ -49,7 +49,7 @@ jobs: image: "python:3.13" steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 with: submodules: recursive