Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feat/gherkinmigration
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Dec 27, 2024
2 parents 67a6850 + f50351a commit 35790da
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 99 deletions.
24 changes: 12 additions & 12 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ api.set_provider(FlagdProvider(
The default options can be defined in the FlagdProvider constructor.

| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- |
|--------------------------|--------------------------------|----------------------------|-------------------------------|---------------------|
| resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | |
| host | FLAGD_HOST | str | localhost | rpc & in-process |
| port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| tls | FLAGD_TLS | bool | false | rpc & in-process |
| cert_path | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process |
| stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process |
Expand All @@ -64,8 +65,6 @@ The default options can be defined in the FlagdProvider constructor.
<!-- not implemented
| target_uri | FLAGD_TARGET_URI | alternative to host/port, supporting custom name resolution | string | null | rpc & in-process |
| socket_path | FLAGD_SOCKET_PATH | alternative to host port, unix socket | String | null | rpc & in-process |
| cert_path | FLAGD_SERVER_CERT_PATH | tls cert path | String | null | rpc & in-process |
| max_event_stream_retries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| context_enricher | - | sync-metadata to evaluation context mapping function | function | identity function | in-process |
| offline_pollIntervalMs | FLAGD_OFFLINE_POLL_MS | poll interval for reading offlineFlagSourcePath | int | 5000 | in-process |
-->
Expand Down Expand Up @@ -100,17 +99,18 @@ and the evaluation will default.

TLS is available in situations where flagd is running on another host.

<!--

You may optionally supply an X.509 certificate in PEM format. Otherwise, the default certificate store will be used.
```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.host("myflagdhost")
.tls(true) // use TLS
.certPath("etc/cert/ca.crt") // PEM cert
.build());

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider

api.set_provider(FlagdProvider(
tls=True, # use TLS
cert_path="etc/cert/ca.crt" # PEM cert
))
```
-->

## License

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class CacheType(Enum):
DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False
DEFAULT_TLS_CERT: typing.Optional[str] = None

ENV_VAR_CACHE_SIZE = "FLAGD_MAX_CACHE_SIZE"
ENV_VAR_CACHE_TYPE = "FLAGD_CACHE"
Expand All @@ -44,6 +45,7 @@ class CacheType(Enum):
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"
ENV_VAR_TLS_CERT = "FLAGD_SERVER_CERT_PATH"

T = typing.TypeVar("T")

Expand Down Expand Up @@ -87,6 +89,7 @@ def __init__( # noqa: PLR0913
keep_alive_time: typing.Optional[int] = None,
cache: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
cert_path: typing.Optional[str] = None,
):
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host

Expand Down Expand Up @@ -200,3 +203,9 @@ def __init__( # noqa: PLR0913
if max_cache_size is None
else max_cache_size
)

self.cert_path = (
env_or_default(ENV_VAR_TLS_CERT, DEFAULT_TLS_CERT)
if cert_path is None
else cert_path
)
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__( # noqa: PLR0913
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_period: typing.Optional[int] = None,
cert_path: typing.Optional[str] = None,
):
"""
Create an instance of the FlagdProvider
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__( # noqa: PLR0913
keep_alive_time=keep_alive_time,
cache=cache,
max_cache_size=max_cache_size,
cert_path=cert_path,
)

self.resolver = self.setup_resolver()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,50 @@ def __init__(
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.connected = False
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
self.channel = self._generate_channel(config)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()

def _generate_channel(self, config: Config) -> grpc.Channel:
target = f"{config.host}:{config.port}"
# Create the channel with the service config
options = [
("grpc.keepalive_time_ms", config.keep_alive_time),
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
("grpc.min_reconnect_backoff_ms", config.deadline_ms),
]
if config.tls:
channel_args = {
"options": options,
"credentials": grpc.ssl_channel_credentials(),
}
if config.cert_path:
with open(config.cert_path, "rb") as f:
channel_args["credentials"] = grpc.ssl_channel_credentials(f.read())

channel = grpc.secure_channel(target, **channel_args)

else:
channel = grpc.insecure_channel(
target,
options=options,
)

self.channel = channel_factory(
f"{config.host}:{config.port}",
options=options,
)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None
self.active = False

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()
return channel

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.connect()

def shutdown(self) -> None:
self.active = False
self.channel.close()
if self.cache:
self.cache.clear()

def connect(self) -> None:
self.active = True
Expand Down Expand Up @@ -166,16 +179,13 @@ def listen(self) -> None:
if self.streamline_deadline_seconds > 0
else {}
)
call_args["wait_for_ready"] = True
request = evaluation_pb2.EventStreamRequest()

# defining a never ending loop to recreate the stream
while self.active:
try:
logger.info("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(
request, wait_for_ready=True, **call_args
):
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
if message.type == "provider_ready":
self.connected = True
self.emit_provider_ready(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import typing
from pathlib import Path

import grpc
Expand All @@ -14,9 +15,12 @@
class FlagdContainer(DockerContainer):
def __init__(
self,
feature: typing.Optional[str] = None,
**kwargs,
) -> None:
image: str = "ghcr.io/open-feature/flagd-testbed"
if feature is not None:
image = f"{image}-{feature}"
path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt"
data = path.read_text().rstrip()
super().__init__(f"{image}:v{data}", **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ def all_flags(request):
@pytest.fixture(params=["json", "yaml"], scope="module")
def file_name(request, all_flags):
extension = request.param
outfile = tempfile.NamedTemporaryFile("w", delete=False, suffix="." + extension)
write_test_file(outfile, all_flags)
yield outfile
return outfile
with tempfile.NamedTemporaryFile(
"w", delete=False, suffix="." + extension
) as outfile:
write_test_file(outfile, all_flags)
yield outfile
return outfile


def write_test_file(outfile, all_flags):
Expand Down Expand Up @@ -110,7 +112,7 @@ def changed_flag(


@pytest.fixture(autouse=True)
def container(request, file_name, all_flags, option_values):
def containers(request, file_name, all_flags, option_values):
api.set_provider(
FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from e2e.conftest import SPEC_PATH
from pytest_bdd import scenarios
from tests.e2e.conftest import TEST_HARNESS_PATH
from tests.e2e.conftest import SPEC_PATH, TEST_HARNESS_PATH

scenarios(f"{TEST_HARNESS_PATH}/gherkin", f"{SPEC_PATH}/specification/assets/gherkin")
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# from tests.e2e.step.provider_steps import *

resolver = ResolverType.RPC
feature_list = ["~targetURI", "~customCert", "~unixsocket", "~sync"]
feature_list = ["~targetURI", "~unixsocket", "~sync"]


def pytest_collection_modifyitems(config, items):
Expand Down
Loading

0 comments on commit 35790da

Please sign in to comment.