feat: add grpc sync flag store #84

Open
wants to merge 19 commits into
base: main
Changes from 14 commits
22 changes: 22 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ jobs:
- "hooks/openfeature-hooks-opentelemetry"
- "providers/openfeature-provider-flagd"

services:
beeme1mr marked this conversation as resolved.
  # flagd-testbed for flagd RPC provider e2e tests
  flagd:
    image: ghcr.io/open-feature/flagd-testbed:v0.5.4
    ports:
      - 8013:8013
  # flagd-testbed for flagd RPC provider reconnect e2e tests
  flagd-unstable:
    image: ghcr.io/open-feature/flagd-testbed-unstable:v0.5.4
    ports:
      - 8014:8013
  # sync-testbed for flagd in-process provider e2e tests
  sync:
    image: ghcr.io/open-feature/sync-testbed:v0.5.4
    ports:
      - 9090:9090
  # sync-testbed for flagd in-process provider reconnect e2e tests
  sync-unstable:
    image: ghcr.io/open-feature/sync-testbed-unstable:v0.5.4
    ports:
      - 9091:9090

steps:
- uses: actions/checkout@v4
with:
Expand Down
12 changes: 10 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ We use `pytest` for our unit testing, making use of `parametrized` to inject cas

### Integration tests

These are planned once the SDK has been stabilized and a Flagd provider implemented. At that point, we will utilize the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance.
The Flagd provider utilizes the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance.

To run the integration tests:

```bash
cd providers/openfeature-provider-flagd
docker-compose up -d # this runs the flagd sidecars
hatch run test
```

## Pull Request

Expand Down Expand Up @@ -62,7 +70,7 @@ To start working on a new feature or bugfix, create a new branch and start worki
```bash
git checkout -b feat/NAME_OF_FEATURE
# Make your changes
git commit
git commit -s -m "feat: my feature"
git push fork feat/NAME_OF_FEATURE
```

Expand Down
30 changes: 24 additions & 6 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ from openfeature.contrib.provider.flagd import FlagdProvider
api.set_provider(FlagdProvider())
```


To use in-process evaluation with flagd gRPC sync service:

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType

api.set_provider(FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
))
```

To use in-process evaluation in offline mode with a file as source:

```python
Expand All @@ -36,12 +49,17 @@ api.set_provider(FlagdProvider(

The default options can be defined in the FlagdProvider constructor.

| Option name | Type & Values | Default |
|----------------|---------------|-----------|
| host | str | localhost |
| port | int | 8013 |
| schema | str | http |
| timeout | int | 2 |
| Option name | Environment Variable Name | Type & Values | Default |
|-------------------------------|-------------------------------------|----------------|-----------|
| resolver_type | FLAGD_RESOLVER_TYPE | enum | grpc |
| host | FLAGD_HOST | str | localhost |
| port | FLAGD_PORT | int | 8013 |
| tls | FLAGD_TLS | bool | false |
| timeout | | int | 5 |
| retry_backoff_seconds | FLAGD_RETRY_BACKOFF_SECONDS | float | 2.0 |
| selector | FLAGD_SELECTOR | str | None |
| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | None |
| offline_poll_interval_seconds | FLAGD_OFFLINE_POLL_INTERVAL_SECONDS | float | 1.0 |

## License

Expand Down
25 changes: 25 additions & 0 deletions providers/openfeature-provider-flagd/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
services:
  flagd:
    build:
      context: test-harness
      dockerfile: flagd/Dockerfile
    ports:
      - 8013:8013
  flagd-unstable:
    build:
      context: test-harness
      dockerfile: flagd/Dockerfile.unstable
    ports:
      - 8014:8013
  flagd-sync:
    build:
      context: test-harness
      dockerfile: sync/Dockerfile
    ports:
      - 9090:9090
  flagd-sync-unstable:
    build:
      context: test-harness
      dockerfile: sync/Dockerfile.unstable
    ports:
      - 9091:9090
Member

Let's make sure this file is excluded from the published package. We can do so in the pyproject.toml.

Contributor Author

Good point. Checking this now, the package also includes tests, test-harness, etc. Should we simplify it to include only src?

[tool.hatch.build.targets.sdist]
include = [
  "src",
]

Member

I'd have to check the generated build to confirm. If you can, please make this change in a separate PR.
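
One way to check the generated build, as a minimal sketch: list what sits directly under the root folder of a built sdist archive. The helper name `sdist_top_level_entries` is hypothetical and not part of this repository; it relies only on the standard-library `tarfile` module and assumes the usual sdist layout with a single `<name>-<version>/` root directory.

```python
import tarfile
import typing


def sdist_top_level_entries(sdist_path: str) -> typing.Set[str]:
    """Return the names found directly under the sdist's root folder."""
    entries: typing.Set[str] = set()
    with tarfile.open(sdist_path, "r:gz") as archive:
        for member_name in archive.getnames():
            parts = member_name.split("/")
            # parts[0] is assumed to be the "<name>-<version>" root folder
            if len(parts) > 1 and parts[1]:
                entries.add(parts[1])
    return entries
```

Running this against the archive produced under `dist/` would show whether `docker-compose.yaml`, `tests`, or `test-harness` end up in the published package.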

1 change: 1 addition & 0 deletions providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cov = [
exclude = [
".gitignore",
"schemas",
"docker-compose.yaml",
]

[tool.hatch.build.targets.wheel]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def __init__( # noqa: PLR0913
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
timeout: typing.Optional[int] = None,
retry_backoff_seconds: typing.Optional[float] = None,
selector: typing.Optional[str] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
Expand All @@ -42,6 +44,14 @@ def __init__( # noqa: PLR0913
env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls
)
self.timeout = 5 if timeout is None else timeout
self.retry_backoff_seconds: float = (
float(env_or_default("FLAGD_RETRY_BACKOFF_SECONDS", 2.0))
Member

Should we do cast=float here for consistency?

Contributor Author

Yes, but then it gives me this:

[Screenshot 2024-06-20 at 21 09 12]

After fighting with mypy a bit, I realised that pulling the cast outside of the function is the cleanest way to get clean typing. I would instead suggest refactoring to remove this helper function everywhere and just use the Python standard library directly. This abstraction doesn't add much, if you ask me:

self.retry_backoff_seconds: float = (
    env_or_default("FLAGD_RETRY_BACKOFF_SECONDS", 2.0, cast=float)
    if retry_backoff_seconds is None
    else retry_backoff_seconds
)

vs

self.retry_backoff_seconds: float = (
    float(os.environ.get("FLAGD_RETRY_BACKOFF_SECONDS", 2.0))
    if retry_backoff_seconds is None
    else retry_backoff_seconds
)
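
A third option, sketched here under assumptions: keep a helper but make it generic, so that mypy can infer the return type from the default and the cast. The names `env_or_default_typed` and `resolve_option` are hypothetical and are not the repository's `env_or_default`; the precedence shown (explicit argument, then environment variable, then default) mirrors the pattern in the diff above.

```python
import os
import typing

T = typing.TypeVar("T")


def env_or_default_typed(
    name: str, default: T, cast: typing.Callable[[str], T]
) -> T:
    """Read an environment variable and cast it, or fall back to the default."""
    raw = os.environ.get(name)
    # The cast is applied only to the raw string, so the return type is always T
    return default if raw is None else cast(raw)


def resolve_option(
    explicit: typing.Optional[T],
    name: str,
    default: T,
    cast: typing.Callable[[str], T],
) -> T:
    """An explicit constructor argument wins over the environment variable."""
    if explicit is not None:
        return explicit
    return env_or_default_typed(name, default, cast)
```

With this shape, `resolve_option(retry_backoff_seconds, "FLAGD_RETRY_BACKOFF_SECONDS", 2.0, float)` would be typed as `float` without an outer `float(...)` call.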

if retry_backoff_seconds is None
else retry_backoff_seconds
)
self.selector = (
env_or_default("FLAGD_SELECTOR", None) if selector is None else selector
)
self.resolver_type = (
ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc"))
if resolver_type is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
Member

Are these changes automated after grpc code generation?

Contributor Author

Nope, I actually didn't notice that script. When I try to run it, I get this file-not-found error. Does it need to be fixed and maybe documented in the contributing guide?

Failure: could not read file schemas/protobuf/buf.gen.python.yaml: open schemas/protobuf/buf.gen.python.yaml: no such file or directory



class ServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from flagd.sync.v1 import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2
from . import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2


class FlagSyncServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from sync.v1 import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2
from . import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2


class FlagSyncServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__( # noqa: PLR0913
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
timeout: typing.Optional[int] = None,
retry_backoff_seconds: typing.Optional[float] = None,
selector: typing.Optional[str] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
Expand All @@ -60,6 +62,8 @@ def __init__( # noqa: PLR0913
port=port,
tls=tls,
timeout=timeout,
retry_backoff_seconds=retry_backoff_seconds,
selector=selector,
resolver_type=resolver_type,
offline_flag_source_path=offline_flag_source_path,
offline_poll_interval_seconds=offline_poll_interval_seconds,
Expand All @@ -77,6 +81,10 @@ def setup_resolver(self) -> AbstractResolver:
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
if hasattr(self.resolver, "initialize"):
self.resolver.initialize(evaluation_context)
colebaileygit marked this conversation as resolved.

def shutdown(self) -> None:
if self.resolver:
self.resolver.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
from openfeature.provider.provider import AbstractProvider

from ..config import Config
from .process.connector import FlagStateConnector
from .process.connector.file_watcher import FileWatcher
from .process.connector.grpc_watcher import GrpcWatcher
from .process.custom_ops import ends_with, fractional, sem_ver, starts_with
from .process.file_watcher import FileWatcherFlagStore
from .process.flags import FlagStore

T = typing.TypeVar("T")

Expand All @@ -27,18 +30,23 @@
def __init__(self, config: Config, provider: AbstractProvider):
self.config = config
self.provider = provider
if not self.config.offline_flag_source_path:
raise ValueError(
"offline_flag_source_path must be provided when using in-process resolver"
self.flag_store = FlagStore(provider)
self.connector: FlagStateConnector = (
FileWatcher(
self.config.offline_flag_source_path,
self.provider,
self.flag_store,
self.config.offline_poll_interval_seconds,
)
self.flag_store = FileWatcherFlagStore(
self.config.offline_flag_source_path,
self.provider,
self.config.offline_poll_interval_seconds,
if self.config.offline_flag_source_path
else GrpcWatcher(self.config, self.provider, self.flag_store)
)
colebaileygit marked this conversation as resolved.

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.connector.initialize(evaluation_context)

def shutdown(self) -> None:
self.flag_store.shutdown()
self.connector.shutdown()

def resolve_boolean_details(
self,
Expand All @@ -62,15 +70,21 @@
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]:
return self._resolve(key, default_value, evaluation_context)
result = self._resolve(key, default_value, evaluation_context)
if not isinstance(result.value, float):
result.value = float(result.value)
return result

def resolve_integer_details(
self,
key: str,
default_value: int,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]:
return self._resolve(key, default_value, evaluation_context)
result = self._resolve(key, default_value, evaluation_context)
if not isinstance(result.value, int):
result.value = int(result.value)

Codecov (codecov/patch) warning: added line #L86 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py was not covered by tests.
return result

def resolve_object_details(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import typing

from openfeature.evaluation_context import EvaluationContext


class FlagStateConnector(typing.Protocol):
    def initialize(
        self, evaluation_context: EvaluationContext
    ) -> None: ...  # pragma: no cover

    def shutdown(self) -> None: ...  # pragma: no cover
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import json
import logging
import os
import threading
import time
import typing

import yaml

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import ParseError, ProviderNotReadyError
from openfeature.provider.provider import AbstractProvider

from ..connector import FlagStateConnector
from ..flags import FlagStore

logger = logging.getLogger("openfeature.contrib")


class FileWatcher(FlagStateConnector):
    def __init__(
        self,
        file_path: str,
        provider: AbstractProvider,
        flag_store: FlagStore,
        poll_interval_seconds: float = 1.0,
    ):
        self.file_path = file_path
        self.provider = provider
        self.poll_interval_seconds = poll_interval_seconds

        self.last_modified = 0.0
        self.flag_store = flag_store
        self.should_emit_ready_on_success = False

    def initialize(self, evaluation_context: EvaluationContext) -> None:
        self.active = True
        self.thread = threading.Thread(target=self.refresh_file, daemon=True)
Member

Why is daemon set to true in this thread?

Member

Let's add a name to all child threads for easier debugging. This one could be FlagdFileWatcherWorkerThread.

Contributor Author

Basically this:

By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

https://stackoverflow.com/questions/190010/daemon-threads-explanation
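
Both suggestions in this thread can be combined in one `threading.Thread` call. The sketch below is illustrative only: `start_named_daemon` is a hypothetical helper, not code from this PR, and the thread name is the one proposed above.

```python
import threading
import typing


def start_named_daemon(
    target: typing.Callable[[], object], name: str
) -> threading.Thread:
    """Start a background thread that is both named and a daemon."""
    # daemon=True: the interpreter does not wait for this thread at exit.
    # name=...: appears in threading.enumerate() and in log records
    # that use the %(threadName)s format field.
    thread = threading.Thread(target=target, name=name, daemon=True)
    thread.start()
    return thread
```

A call such as `start_named_daemon(self.refresh_file, "FlagdFileWatcherWorkerThread")` would then make the worker easy to spot in a thread dump.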

        self.thread.start()

colebaileygit marked this conversation as resolved.
        # Let this throw exceptions so that provider status is set correctly
        try:
            self._load_data()
            self.should_emit_ready_on_success = True
        except Exception as err:
            raise ProviderNotReadyError from err

    def shutdown(self) -> None:
        self.active = False
Member

Shutting down and then initializing again can leave more than one thread running here, because active might not be checked until after the sleep expires.

Member

We have the same issue in both connector implementations. The current thread-handling logic is a bit fragile. Would it make sense to build a cancellable scheduled task manager and concentrate our efforts on making its implementation robust? I'm happy to discuss how to approach it.

Contributor Author (colebaileygit, Jun 12, 2024)

Sounds sensible. Would this be a simple refactor/enhancement we can add directly to this PR? It might be easier for me to review and test a first reference implementation if you can help provide one. I'm definitely not an expert in Python thread handling.
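
As a possible starting point for that discussion, here is a minimal sketch of a cancellable polling worker. It is not the implementation adopted in this PR; the class name `CancellablePoller` and its parameters are hypothetical. The idea is to replace `time.sleep` plus a boolean flag with a `threading.Event` per worker, so that stopping wakes the thread immediately and a restart cannot leave a second thread behind.

```python
import threading
import typing


class CancellablePoller:
    """Runs `task` every `interval_seconds` until stopped (illustrative only)."""

    def __init__(
        self, task: typing.Callable[[], None], interval_seconds: float, name: str
    ) -> None:
        self._task = task
        self._interval = interval_seconds
        self._name = name
        self._stop_event: typing.Optional[threading.Event] = None
        self._thread: typing.Optional[threading.Thread] = None

    def start(self) -> None:
        self.stop()  # make sure a previous worker has fully exited first
        stop_event = threading.Event()  # fresh event, owned by this worker only

        def run() -> None:
            # wait() returns True as soon as the event is set, so a stop
            # request does not have to sit out the rest of the interval
            while not stop_event.wait(self._interval):
                self._task()

        self._stop_event = stop_event
        self._thread = threading.Thread(target=run, name=self._name, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        if self._stop_event is not None:
            self._stop_event.set()
        thread = self._thread
        # Do not join from inside the worker itself (e.g. task calls stop())
        if thread is not None and thread is not threading.current_thread():
            thread.join()
        self._stop_event = None
        self._thread = None
```

One trade-off to note: `stop()` blocks until the current `task` call returns, so a task that can hang would need its own timeout.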


    def refresh_file(self) -> None:
        while self.active:
            time.sleep(self.poll_interval_seconds)
            logger.debug("checking for new flag store contents from file")
            self.safe_load_data()

    def safe_load_data(self) -> None:
        try:
            last_modified = os.path.getmtime(self.file_path)
            if last_modified > self.last_modified:
                self._load_data(last_modified)
        except FileNotFoundError:
            self.handle_error("Provided file path not valid")
        except json.JSONDecodeError:
            self.handle_error("Could not parse JSON flag data from file")
        except yaml.error.YAMLError:
            self.handle_error("Could not parse YAML flag data from file")

Codecov (codecov/patch) warning: added line #L68 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py was not covered by tests.
        except ParseError:
            self.handle_error("Could not parse flag data using flagd syntax")
        except Exception:
            self.handle_error("Could not read flags from file")

Codecov (codecov/patch) warning: added lines #L71 - L72 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py were not covered by tests.

    def _load_data(self, modified_time: typing.Optional[float] = None) -> None:
        with open(self.file_path) as file:
            if self.file_path.endswith(".yaml"):
                data = yaml.safe_load(file)
            else:
                data = json.load(file)

            self.flag_store.update(data)

            if self.should_emit_ready_on_success:
                self.provider.emit_provider_ready(
                    ProviderEventDetails(
                        message="Reloading file contents recovered from error state"
                    )
                )
                self.should_emit_ready_on_success = False

            self.last_modified = modified_time or os.path.getmtime(self.file_path)

    def handle_error(self, error_message: str) -> None:
        logger.exception(error_message)
        self.should_emit_ready_on_success = True
        self.provider.emit_provider_error(ProviderEventDetails(message=error_message))