Skip to content

Commit

Permalink
Websocket notifier implementation in the relay proxy (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspoignant authored Jun 27, 2023
1 parent 65a4062 commit f568822
Show file tree
Hide file tree
Showing 20 changed files with 1,025 additions and 47 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true

swagger-change:
name: Swagger Change
Expand Down
21 changes: 21 additions & 0 deletions cmd/relayproxy/api/middleware/websocket_authorizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package middleware

import (
"github.com/labstack/echo/v4"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
)

// WebsocketAuthorizer is a middleware that checks in the params if we have the needed parameter for authorization
func WebsocketAuthorizer(config *config.Config) echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
if len(config.APIKeys) > 0 {
apiKey := c.QueryParam("apiKey")
if !config.APIKeyExists(apiKey) {
return echo.ErrUnauthorized
}
}
return next(c)
}
}
}
65 changes: 65 additions & 0 deletions cmd/relayproxy/api/middleware/websocket_authorizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package middleware_test

import (
"fmt"
middleware2 "github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/api/middleware"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
"net/http"
"net/http/httptest"
"testing"

"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
)

func TestWebsocketAuthorizer(t *testing.T) {
type args struct {
confAPIKey string
urlAPIKey string
}
tests := []struct {
name string
args args
want int
wantErr assert.ErrorAssertionFunc
}{
{
name: "valid apiKey",
args: args{
confAPIKey: "valid-api-key",
urlAPIKey: "valid-api-key",
},
want: http.StatusOK,
wantErr: assert.NoError,
},
{
name: "invalid apiKey",
args: args{
confAPIKey: "valid-api-key",
urlAPIKey: "invalid-api-key",
},
want: http.StatusUnauthorized,
wantErr: assert.Error,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := echo.New()
req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/websocket?apiKey=%s", tt.args.urlAPIKey), nil)
req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
conf := &config.Config{
APIKeys: []string{tt.args.confAPIKey},
}
middleware := middleware2.WebsocketAuthorizer(conf)
handler := middleware(func(c echo.Context) error {
return c.String(http.StatusOK, "Authorized")
})

err := handler(c)
tt.wantErr(t, err)
})
}
}
94 changes: 53 additions & 41 deletions cmd/relayproxy/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,40 @@ package api

import (
"fmt"
custommiddleware "github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/api/middleware"
"strings"
"time"

"github.com/labstack/echo-contrib/prometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
echoSwagger "github.com/swaggo/echo-swagger"
ffclient "github.com/thomaspoignant/go-feature-flag"
custommiddleware "github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/api/middleware"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/controller"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/metric"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/service"
"go.uber.org/zap"
"strings"
"time"
)

// New is used to create a new instance of the API server
func New(config *config.Config,
monitoringService service.Monitoring,
goFF *ffclient.GoFeatureFlag,
services service.Services,
zapLog *zap.Logger,
) Server {
s := Server{
config: config,
monitoringService: monitoringService,
goFF: goFF,
zapLog: zapLog,
config: config,
services: services,
zapLog: zapLog,
}
s.init()
return s
}

// Server is the struct that represent the API server
type Server struct {
config *config.Config
echoInstance *echo.Echo
monitoringService service.Monitoring
goFF *ffclient.GoFeatureFlag
zapLog *zap.Logger
config *config.Config
echoInstance *echo.Echo
services service.Services
zapLog *zap.Logger
}

// init initialize the configuration of our API server (using echo)
Expand All @@ -50,40 +45,55 @@ func (s *Server) init() {
s.echoInstance.HidePort = true
s.echoInstance.Debug = s.config.Debug

// Prometheus
// Global Middlewares
metrics := metric.NewMetrics()
prometheus := prometheus.NewPrometheus("gofeatureflag", nil, metrics.MetricList())
prometheus.Use(s.echoInstance)
prom := prometheus.NewPrometheus("gofeatureflag", nil, metrics.MetricList())
prom.Use(s.echoInstance)
s.echoInstance.Use(metrics.AddCustomMetricsMiddleware)

// Middlewares
s.echoInstance.Use(custommiddleware.ZapLogger(s.zapLog, s.config))
s.echoInstance.Use(middleware.Recover())
s.echoInstance.Use(middleware.TimeoutWithConfig(
middleware.TimeoutConfig{Timeout: time.Duration(s.config.RestAPITimeout) * time.Millisecond}),
)
if len(s.config.APIKeys) > 0 {
s.echoInstance.Use(middleware.KeyAuthWithConfig(middleware.KeyAuthConfig{
middleware.TimeoutConfig{
Skipper: func(c echo.Context) bool {
_, ok := map[string]struct{}{
"/health": {},
"/info": {},
"/metrics": {},
}[c.Path()]
return ok || strings.HasPrefix(c.Path(), "/swagger/")
// ignore websocket in the timeout
return strings.HasPrefix(c.Request().URL.String(), "/ws")
},
Timeout: time.Duration(s.config.RestAPITimeout) * time.Millisecond,
}),
)

// endpoints configuration
s.initAPIEndpoints()
s.initPublicEndpoints()
s.initWebsocketsEndpoints()
}

// initAPIEndpoints initialize the API endpoints
func (s *Server) initAPIEndpoints() {
// Init controllers
cAllFlags := controller.NewAllFlags(s.services.GOFeatureFlagService)
cFlagEval := controller.NewFlagEval(s.services.GOFeatureFlagService)
cEvalDataCollector := controller.NewCollectEvalData(s.services.GOFeatureFlagService)

// Init routes
v1 := s.echoInstance.Group("/v1")
if len(s.config.APIKeys) > 0 {
v1.Use(middleware.KeyAuthWithConfig(middleware.KeyAuthConfig{
Validator: func(key string, c echo.Context) (bool, error) {
return s.config.APIKeyExists(key), nil
},
}))
}
v1.POST("/allflags", cAllFlags.Handler)
v1.POST("/feature/:flagKey/eval", cFlagEval.Handler)
v1.POST("/data/collector", cEvalDataCollector.Handler)
}

// initPublicEndpoints initialize the public endpoints to monitor the application
func (s *Server) initPublicEndpoints() {
// Init controllers
cHealth := controller.NewHealth(s.monitoringService)
cInfo := controller.NewInfo(s.monitoringService)
cAllFlags := controller.NewAllFlags(s.goFF)
cFlagEval := controller.NewFlagEval(s.goFF)
cEvalDataCollector := controller.NewCollectEvalData(s.goFF)
cHealth := controller.NewHealth(s.services.MonitoringService)
cInfo := controller.NewInfo(s.services.MonitoringService)

// health Routes
s.echoInstance.GET("/health", cHealth.Handler)
Expand All @@ -93,12 +103,14 @@ func (s *Server) init() {
if s.config.EnableSwagger {
s.echoInstance.GET("/swagger/*", echoSwagger.WrapHandler)
}
}

// GO feature flags routes
v1 := s.echoInstance.Group("/v1")
v1.POST("/allflags", cAllFlags.Handler)
v1.POST("/feature/:flagKey/eval", cFlagEval.Handler)
v1.POST("/data/collector", cEvalDataCollector.Handler)
// initWebsocketsEndpoints initialize the websocket endpoints
func (s *Server) initWebsocketsEndpoints() {
cFlagReload := controller.NewWsFlagChange(s.services.WebsocketService, s.zapLog)
v1 := s.echoInstance.Group("/ws/v1")
v1.Use(custommiddleware.WebsocketAuthorizer(s.config))
v1.GET("/flag/change", cFlagReload.Handler)
}

// Start launch the API server
Expand Down
2 changes: 1 addition & 1 deletion cmd/relayproxy/controller/all_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewAllFlags(goFF *ffclient.GoFeatureFlag) Controller {
// @Produce json
// @Accept json
// @Param data body model.AllFlagRequest true "Payload of the user we want to challenge against the flag."
// @Success 200 {object} modeldocs.AllFlags "Success"
// @Success 200 {object} modeldocs.AllFlags "Success"
// @Failure 400 {object} modeldocs.HTTPErrorDoc "Bad Request"
// @Failure 500 {object} modeldocs.HTTPErrorDoc "Internal server error"
// @Router /v1/allflags [post]
Expand Down
92 changes: 92 additions & 0 deletions cmd/relayproxy/controller/ws_flag_change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package controller

import (
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/service"
"go.uber.org/zap"
"net/http"
"time"
)

// NewWsFlagChange is the constructor to create a new controller to handle websocket
// request to be notified about flag changes.
func NewWsFlagChange(websocketService service.WebsocketService, logger *zap.Logger) Controller {
return &wsFlagChange{
websocketService: websocketService,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
logger: logger,
}
}

// wsFlagChange is the implementation of the controller
type wsFlagChange struct {
websocketService service.WebsocketService
upgrader websocket.Upgrader
logger *zap.Logger
}

// Handler is the entry point for the websocket endpoint to get notified when a flag has been edited
// @Summary Websocket endpoint to be notified about flag changes
// @Description This endpoint is a websocket endpoint to be notified about flag changes, every change
// @Description will send a request to the client with a model.DiffCache format.
// @Description
// @Produce json
// @Accept json
// @Param apiKey query string false "apiKey use authorize the connection to the relay proxy"
// @Success 200 {object} notifier.DiffCache "Success"
// @Failure 400 {object} modeldocs.HTTPErrorDoc "Bad Request"
// @Failure 500 {object} modeldocs.HTTPErrorDoc "Internal server error"
// @Router /ws/v1/flag/change [post]
func (f *wsFlagChange) Handler(c echo.Context) error {
conn, err := f.upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
f.websocketService.Register(conn)
defer f.websocketService.Deregister(conn)
f.logger.Debug("registering new websocket connection", zap.Any("connection", conn))

// Start the ping pong loop
go f.pingPongLoop(conn)
isOpen := true
for isOpen {
_, _, err := conn.ReadMessage()
if err != nil {
f.websocketService.Deregister(conn)
f.logger.Debug("closing websocket connection", zap.Error(err), zap.Any("connection", conn))
isOpen = false
}
}
return nil
}

// pingPongLoop is a keep-alive call to the client.
// It calls the client to ensure that the connection is still active.
// If the ping is not working we are closing the session.
func (f *wsFlagChange) pingPongLoop(conn *websocket.Conn) {
// Ping interval duration
pingInterval := 1 * time.Second
// Create a ticker to send pings at regular intervals
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()

// nolint: gosimple
for {
select {
case <-ticker.C:
// Send a ping message to the client
err := conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
f.websocketService.Deregister(conn)
f.logger.Debug("closing websocket connection", zap.Error(err), zap.Any("connection", conn))
return
}
}
}
}
Loading

0 comments on commit f568822

Please sign in to comment.