Skip to content

Commit

Permalink
Add support of websocket in the python provider (#1294)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspoignant authored Nov 24, 2023
1 parent 3234444 commit 0e73acc
Show file tree
Hide file tree
Showing 10 changed files with 838 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ class BaseModel(PydanticBaseModel):


class GoFeatureFlagOptions(BaseModel):
# endpoint is the endpoint of the relay proxy.
# example: http://localhost:1031
endpoint: AnyHttpUrl
urllib3_pool_manager: typing.Optional[urllib3.PoolManager] = None

# flagCacheSize (optional) is the maximum number of flag events we keep in memory to cache your flags.
# default: 10000
Expand All @@ -25,3 +26,22 @@ class GoFeatureFlagOptions(BaseModel):
# disableDataCollection set to true if you don't want to collect the usage of flags retrieved in the cache.
# default: false
disable_data_collection: typing.Optional[bool] = False

# reconnectInterval (optional) interval time (in seconds) we use to reconnect to the server if the \
# connection is stopped.
# default: 1 minute
reconnect_interval: typing.Optional[int] = 60

# ADVANCED OPTIONS --- be careful when changing these options

# debug (optional) if set to true, the provider will print debug logs
# default: false
debug: typing.Optional[bool] = False

# http_client (optional) is the http client used to call the relay proxy.
urllib3_pool_manager: typing.Optional[urllib3.PoolManager] = None

# disable_cache_invalidation (optional) set to true if you don't want to invalidate the cache when the remote
# config changes.
# default: false
disable_cache_invalidation: typing.Optional[bool] = False
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
from http import HTTPStatus
from threading import Thread
from typing import List, Optional, Type, Union
from urllib.parse import urljoin

import pylru
import urllib3
import websocket
from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import ErrorCode
from openfeature.exception import (
Expand Down Expand Up @@ -41,6 +43,8 @@ class GoFeatureFlagProvider(AbstractProvider, BaseModel):
_cache: pylru.lrucache = PrivateAttr()
_status: ProviderStatus = PrivateAttr(ProviderStatus.NOT_READY)
_data_collector_hook: Optional[DataCollectorHook] = PrivateAttr()
_ws: websocket.WebSocketApp = PrivateAttr()
_ws_thread: Thread = PrivateAttr()

def __init__(self, **data):
"""
Expand All @@ -62,16 +66,42 @@ def __init__(self, **data):
options=self.options,
http_client=self._http_client,
)
websocket.enableTrace(self.options.debug)
self._ws = websocket.WebSocketApp(
self._build_websocket_uri(),
on_message=self._websocket_message_handler,
on_open=self._websocket_open_handler,
on_close=self._websocket_close_handler,
on_error=self._websocket_error_handler,
)

def get_status(self):
def get_status(self) -> ProviderStatus:
"""
get_status returns the status of the provider
:return: the status of the provider
"""
return self._status

def initialize(self, evaluation_context: EvaluationContext):
def initialize(self, evaluation_context: EvaluationContext) -> None:
"""
initialize is called when the provider is initialized.
:param evaluation_context: the evaluation context
:return: None
"""
self._cache = pylru.lrucache(self.options.cache_size)
self._data_collector_hook.initialize()
self._status = ProviderStatus.READY
# start the websocket thread
if self.options.disable_cache_invalidation is False:
self._ws_thread = Thread(target=self.run_websocket)
self._ws_thread.start()
else:
self._status = ProviderStatus.READY

def shutdown(self):
if self.options.disable_cache_invalidation is False:
self._ws.close(status=websocket.STATUS_NORMAL)
self._ws_thread.join()

if self._cache is not None:
self._cache.clear()

Expand Down Expand Up @@ -208,7 +238,7 @@ def generic_go_feature_flag_resolver(

if original_type == int:
response_json = json.loads(response_body)
# in some cases pydantic auto convert float in int.
# in some cases, pydantic auto convert float in int.
if type(response_json.get("value")) != int:
raise TypeMismatchError(
"unexpected type for flag {}".format(flag_key)
Expand Down Expand Up @@ -247,3 +277,60 @@ def generic_go_feature_flag_resolver(
raise GeneralError(
"unexpected error while evaluating flag {}: {}".format(flag_key, exc)
)

def _build_websocket_uri(self):
"""
_build_websocket_uri is a helper to build the websocket uri to connect to the GO Feature Flag relay proxy.
:return: a string representing the websocket uri
"""
http_uri = urljoin(
str(self.options.endpoint),
"/ws/v1/flag/change",
)
http_uri = http_uri.replace("http", "ws")
http_uri = http_uri.replace("https", "wss")
return http_uri

def run_websocket(self) -> None:
"""
run_websocket is a helper to run the websocket connection to the GO Feature Flag server.
:return: None
"""
self._ws.run_forever(reconnect=self.options.reconnect_interval)

def _websocket_message_handler(self, wsapp, message) -> None:
"""
websocket_message_handler is the handler called when we receive a message from the GO Feature Flag server
:param wsapp: the websocket app
:param message: the message received
:return: None
"""
# when we receive a message from go-feature-flag server, we clear the cache.
self._cache.clear()

def _websocket_open_handler(self, ws_app) -> None:
"""
websocket_open_handler is the handler called when the websocket is open
:param ws app: the websocket app
:return: None
"""
self._status = ProviderStatus.READY

def _websocket_error_handler(self, ws_app, error) -> None:
"""
websocket_error_handler is the handler called when we receive an error from the GO Feature Flag server
:param ws_app: the websocket app
:param error: error received
:return: None
"""
self._status = ProviderStatus.ERROR

def _websocket_close_handler(self, ws_app, close_status_code, close_msg) -> None:
"""
websocket_close_handler is the handler called when the websocket is closed
:param wsapp: the websocket app
:param close_status_code: the status code of the close
:param close_msg: the message of the close
:return: None
"""
self._status = ProviderStatus.STALE
Loading

0 comments on commit 0e73acc

Please sign in to comment.