Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feat/gherkinmigration
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Dec 27, 2024
2 parents 67a6850 + f50351a commit 7d5b35f
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 92 deletions.
24 changes: 12 additions & 12 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ api.set_provider(FlagdProvider(
The default options can be defined in the FlagdProvider constructor.

| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- |
|--------------------------|--------------------------------|----------------------------|-------------------------------|---------------------|
| resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | |
| host | FLAGD_HOST | str | localhost | rpc & in-process |
| port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| tls | FLAGD_TLS | bool | false | rpc & in-process |
| cert_path | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process |
| stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process |
Expand All @@ -64,8 +65,6 @@ The default options can be defined in the FlagdProvider constructor.
<!-- not implemented
| target_uri | FLAGD_TARGET_URI | alternative to host/port, supporting custom name resolution | string | null | rpc & in-process |
| socket_path | FLAGD_SOCKET_PATH | alternative to host port, unix socket | String | null | rpc & in-process |
| cert_path | FLAGD_SERVER_CERT_PATH | tls cert path | String | null | rpc & in-process |
| max_event_stream_retries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| context_enricher | - | sync-metadata to evaluation context mapping function | function | identity function | in-process |
| offline_pollIntervalMs | FLAGD_OFFLINE_POLL_MS | poll interval for reading offlineFlagSourcePath | int | 5000 | in-process |
-->
Expand Down Expand Up @@ -100,17 +99,18 @@ and the evaluation will default.

TLS is available in situations where flagd is running on another host.

<!--

You may optionally supply an X.509 certificate in PEM format. Otherwise, the default certificate store will be used.
```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.host("myflagdhost")
.tls(true) // use TLS
.certPath("etc/cert/ca.crt") // PEM cert
.build());

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider

api.set_provider(FlagdProvider(
tls=True, # use TLS
cert_path="etc/cert/ca.crt" # PEM cert
))
```
-->

## License

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class CacheType(Enum):
DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False
DEFAULT_TLS_CERT: typing.Optional[str] = None

ENV_VAR_CACHE_SIZE = "FLAGD_MAX_CACHE_SIZE"
ENV_VAR_CACHE_TYPE = "FLAGD_CACHE"
Expand All @@ -44,6 +45,7 @@ class CacheType(Enum):
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"
ENV_VAR_TLS_CERT = "FLAGD_SERVER_CERT_PATH"

T = typing.TypeVar("T")

Expand Down Expand Up @@ -87,6 +89,7 @@ def __init__( # noqa: PLR0913
keep_alive_time: typing.Optional[int] = None,
cache: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
cert_path: typing.Optional[str] = None,
):
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host

Expand Down Expand Up @@ -200,3 +203,9 @@ def __init__( # noqa: PLR0913
if max_cache_size is None
else max_cache_size
)

self.cert_path = (
env_or_default(ENV_VAR_TLS_CERT, DEFAULT_TLS_CERT)
if cert_path is None
else cert_path
)
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__( # noqa: PLR0913
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_period: typing.Optional[int] = None,
cert_path: typing.Optional[str] = None,
):
"""
Create an instance of the FlagdProvider
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__( # noqa: PLR0913
keep_alive_time=keep_alive_time,
cache=cache,
max_cache_size=max_cache_size,
cert_path=cert_path,
)

self.resolver = self.setup_resolver()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,50 @@ def __init__(
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.connected = False
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
self.channel = self._generate_channel(config)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()

def _generate_channel(self, config: Config) -> grpc.Channel:
target = f"{config.host}:{config.port}"
# Create the channel with the service config
options = [
("grpc.keepalive_time_ms", config.keep_alive_time),
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
("grpc.min_reconnect_backoff_ms", config.deadline_ms),
]
if config.tls:
channel_args = {
"options": options,
"credentials": grpc.ssl_channel_credentials(),
}
if config.cert_path:
with open(config.cert_path, "rb") as f:
channel_args["credentials"] = grpc.ssl_channel_credentials(f.read())

channel = grpc.secure_channel(target, **channel_args)

else:
channel = grpc.insecure_channel(
target,
options=options,
)

self.channel = channel_factory(
f"{config.host}:{config.port}",
options=options,
)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None
self.active = False

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()
return channel

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.connect()

def shutdown(self) -> None:
self.active = False
self.channel.close()
if self.cache:
self.cache.clear()

def connect(self) -> None:
self.active = True
Expand Down Expand Up @@ -166,16 +179,13 @@ def listen(self) -> None:
if self.streamline_deadline_seconds > 0
else {}
)
call_args["wait_for_ready"] = True
request = evaluation_pb2.EventStreamRequest()

# defining a never ending loop to recreate the stream
while self.active:
try:
logger.info("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(
request, wait_for_ready=True, **call_args
):
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
if message.type == "provider_ready":
self.connected = True
self.emit_provider_ready(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import typing
from pathlib import Path

import grpc
Expand All @@ -14,9 +15,12 @@
class FlagdContainer(DockerContainer):
def __init__(
self,
feature: typing.Optional[str] = None,
**kwargs,
) -> None:
image: str = "ghcr.io/open-feature/flagd-testbed"
if feature is not None:
image = f"{image}-{feature}"
path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt"
data = path.read_text().rstrip()
super().__init__(f"{image}:v{data}", **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# from tests.e2e.step.provider_steps import *

resolver = ResolverType.RPC
feature_list = ["~targetURI", "~customCert", "~unixsocket", "~sync"]
feature_list = ["~targetURI", "~unixsocket", "~sync"]


def pytest_collection_modifyitems(config, items):
Expand Down
122 changes: 86 additions & 36 deletions providers/openfeature-provider-flagd/tests/e2e/step/provider_steps.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import threading
import typing
from enum import Enum
from pathlib import Path

import pytest
from pytest_bdd import given, parsers, when
from testcontainers.core.container import DockerContainer
from tests.e2e.flagd_container import FlagdContainer
from tests.e2e.step._utils import wait_for

Expand All @@ -14,6 +16,14 @@
from openfeature.provider import ProviderStatus


class TestProviderType(Enum):
UNAVAILABLE = "unavailable"
STABLE = "stable"
UNSTABLE = "unstable"
SSL = "ssl"
SOCKET = "socket"


@given("a provider is registered", target_fixture="client")
def setup_provider_old(
container: FlagdContainer,
Expand All @@ -23,47 +33,83 @@ def setup_provider_old(
setup_provider(container, resolver_type, "stable", dict)


@given(parsers.cfparse("a {provider_type} flagd provider"), target_fixture="client")
def get_default_options_for_provider(
provider_type: str, resolver_type: ResolverType
) -> typing.Tuple[dict, bool]:
t = TestProviderType(provider_type)
options: dict = {
"resolver_type": resolver_type,
"deadline_ms": 500,
"stream_deadline_ms": 0,
"retry_backoff_ms": 1000,
"retry_grace_period": 2,
}
if t == TestProviderType.UNAVAILABLE:
return {}, False
elif t == TestProviderType.SSL:
path = (
Path(__file__).parents[3]
/ "openfeature/test-harness/ssl/custom-root-cert.crt"
)
options["cert_path"] = str(path.absolute())
options["tls"] = True
elif t == TestProviderType.SOCKET:
return options, True

return options, True


@given(
parsers.cfparse("a {provider_type} flagd provider"), target_fixture="provider_type"
)
def setup_provider(
container: FlagdContainer,
containers: dict,
resolver_type: ResolverType,
provider_type: str,
option_values: dict,
) -> OpenFeatureClient:
if provider_type == "unavailable":
api.set_provider(
FlagdProvider(
resolver_type=resolver_type,
**option_values,
),
"unavailable",
default_options, ready = get_default_options_for_provider(
provider_type, resolver_type
)

if ready:
container = (
containers.get(provider_type)
if provider_type in containers
else containers.get("default")
)
client = api.get_client("unavailable")
return client
try:
container.get_port(resolver_type)
except: # noqa: E722
container.start()

try:
container.get_port(resolver_type)
except: # noqa: E722
container.start()
default_options["port"] = container.get_port(resolver_type)

combined_options = {**default_options, **option_values}
api.set_provider(
FlagdProvider(
resolver_type=resolver_type,
port=container.get_port(resolver_type),
deadline_ms=500,
stream_deadline_ms=0,
retry_backoff_ms=1000,
**option_values,
),
FlagdProvider(**combined_options),
provider_type,
)
client = api.get_client(provider_type)

wait_for(lambda: client.get_provider_status() == ProviderStatus.READY)
return client
wait_for(
lambda: client.get_provider_status() == ProviderStatus.READY
) if ready else None
return provider_type


@pytest.fixture()
def client(ptype: str) -> OpenFeatureClient:
return api.get_client(ptype)


@when(parsers.cfparse("the connection is lost for {seconds}s"))
def flagd_restart(seconds, container: FlagdContainer):
def flagd_restart(seconds, containers: dict, provider_type: str):
container = (
containers.get(provider_type)
if provider_type in containers
else containers.get("default")
)
ipr_port = container.get_port(ResolverType.IN_PROCESS)
rpc_port = container.get_port(ResolverType.RPC)

Expand All @@ -78,19 +124,23 @@ def starting():


@pytest.fixture(autouse=True, scope="module")
def container(request):
container: DockerContainer = FlagdContainer()
def containers(request):
containers = {
"default": FlagdContainer(),
"ssl": FlagdContainer("ssl"),
}

# Setup code
container.start()
[containers[c].start() for c in containers]

def fin():
try:
container.stop()
except: # noqa: E722
logging.debug("container was not running anymore")
for name in containers.items():
container = containers[name]
try:
container.stop()
except: # noqa: E722
logging.debug(f"container '{name}' was not running anymore")

# Teardown code
request.addfinalizer(fin)

return container
return containers
Loading

0 comments on commit 7d5b35f

Please sign in to comment.