diff --git a/.golangci.yaml b/.golangci.yaml index 87f9503d..6fa9bef9 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -70,6 +70,7 @@ linters-settings: - "gopkg.in/natefinch/lumberjack.v2" - "github.com/expr-lang/expr" - "github.com/jackc/pgx/v5/pgproto3" + - "github.com/golang-queue/queue" test: files: - $test @@ -88,6 +89,7 @@ linters-settings: - "github.com/knadh/koanf" - "github.com/spf13/cast" - "github.com/jackc/pgx/v5/pgproto3" + - "github.com/golang-queue/queue" tagalign: align: false sort: false diff --git a/act/registry.go b/act/registry.go index 1a45a85d..b46f4561 100644 --- a/act/registry.go +++ b/act/registry.go @@ -2,6 +2,7 @@ package act import ( "context" + "encoding/json" "errors" "slices" "time" @@ -9,6 +10,7 @@ import ( sdkAct "github.com/gatewayd-io/gatewayd-plugin-sdk/act" "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" + "github.com/golang-queue/queue" "github.com/rs/zerolog" ) @@ -32,6 +34,20 @@ type Registry struct { DefaultPolicyName string DefaultPolicy *sdkAct.Policy DefaultSignal *sdkAct.Signal + ActionQueue *queue.Queue +} + +type asyncActionMessage struct { + Output *sdkAct.Output + Params []sdkAct.Parameter +} + +func (j *asyncActionMessage) Bytes() []byte { + b, err := json.Marshal(j) + if err != nil { + panic(err) + } + return b } var _ IRegistry = (*Registry)(nil) @@ -79,6 +95,11 @@ func NewActRegistry( registry.Logger.Debug().Str("name", registry.DefaultPolicyName).Msg("Using default policy") + if registry.ActionQueue == nil { + registry.Logger.Warn().Msg("ActionQueue is nil, not creating registry") + return nil + } + return &Registry{ Logger: registry.Logger, PolicyTimeout: registry.PolicyTimeout, @@ -88,6 +109,7 @@ func NewActRegistry( Actions: registry.Actions, DefaultPolicy: registry.Policies[registry.DefaultPolicyName], DefaultSignal: registry.Signals[registry.DefaultPolicyName], + ActionQueue: registry.ActionQueue, } } @@ -225,32 +247,35 @@ func (r *Registry) Run( return nil, gerr.ErrActionNotExist } - // Prepend the logger to the parameters. - params = append([]sdkAct.Parameter{WithLogger(r.Logger)}, params...) - timeout := r.DefaultActionTimeout if action.Timeout > 0 { timeout = time.Duration(action.Timeout) * time.Second } - var ctx context.Context - var cancel context.CancelFunc - // if timeout is zero, then the context should not have timeout - if timeout > 0 { - ctx, cancel = context.WithTimeout(context.Background(), timeout) - } else { - ctx, cancel = context.WithCancel(context.Background()) - } + // If the action is synchronous, run it and return the result immediately. if action.Sync { + // Prepend the logger to the parameters. + params = append([]sdkAct.Parameter{WithLogger(r.Logger)}, params...) + + var ctx context.Context + var cancel context.CancelFunc + // if timeout is zero, then the context should not have timeout + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } defer cancel() return runActionWithTimeout(ctx, action, output, params, r.Logger) } - // Run the action asynchronously. - go func() { - defer cancel() - _, _ = runActionWithTimeout(ctx, action, output, params, r.Logger) - }() + if err := r.ActionQueue.Queue(&asyncActionMessage{ + Output: output, + Params: params, + }); err != nil { + return nil, gerr.ErrAsyncQueueFailed + } + return nil, gerr.ErrAsyncAction } @@ -261,6 +286,12 @@ func runActionWithTimeout( params []sdkAct.Parameter, logger zerolog.Logger, ) (any, *gerr.GatewayDError) { + defer func() { + // recover from panic if one occurred. Set err to nil otherwise. + if recover() != nil { + logger.Error().Str("action", action.Name).Msg("Action panicked") + } + }() execMode := "sync" if !action.Sync { execMode = "async" diff --git a/act/registry_test.go b/act/registry_test.go index c5cc3e7c..1991e626 100644 --- a/act/registry_test.go +++ b/act/registry_test.go @@ -8,6 +8,7 @@ import ( sdkAct "github.com/gatewayd-io/gatewayd-plugin-sdk/act" "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" + "github.com/golang-queue/queue" "github.com/rs/zerolog" "github.com/spf13/cast" "github.com/stretchr/testify/assert" @@ -27,6 +28,7 @@ func Test_NewRegistry(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) assert.NotNil(t, actRegistry.Signals) @@ -89,6 +91,7 @@ func Test_NewRegistry_NilPolicy(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.Nil(t, actRegistry) assert.Contains(t, buf.String(), "Policy is nil, not adding") @@ -110,6 +113,7 @@ func Test_NewRegistry_NilAction(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.Nil(t, actRegistry) assert.Contains(t, buf.String(), "Action is nil, not adding") @@ -126,6 +130,7 @@ func Test_Add(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: zerolog.Logger{}, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -151,6 +156,7 @@ func Test_Add_NilPolicy(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -173,6 +179,7 @@ func Test_Add_ExistentPolicy(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -193,6 +200,7 @@ func Test_Apply(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: zerolog.Logger{}, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -222,6 +230,7 @@ func Test_Apply_NoSignals(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -268,6 +277,7 @@ func Test_Apply_ContradictorySignals(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -313,6 +323,7 @@ func Test_Apply_ActionNotMatched(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -346,6 +357,7 @@ func Test_Apply_PolicyNotMatched(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -394,6 +406,7 @@ func Test_Apply_NonBoolPolicy(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -442,6 +455,7 @@ func Test_Apply_BadPolicy(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.Nil(t, actRegistry) } @@ -459,6 +473,7 @@ func Test_Run(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -484,6 +499,7 @@ func Test_Run_Terminate(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -508,6 +524,15 @@ func Test_Run_Terminate(t *testing.T) { func Test_Run_Async(t *testing.T) { out := bytes.Buffer{} logger := zerolog.New(&out) + worker := NewActWorker( + Worker{ + Logger: logger, + Actions: BuiltinActions(), + DefaultActionTimeout: config.DefaultActionTimeout, + }, + ) + + workerQueue := queue.NewPool(1, queue.WithFn(worker.RunFunc())) actRegistry := NewActRegistry( Registry{ Signals: BuiltinSignals(), @@ -517,6 +542,7 @@ func Test_Run_Async(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: workerQueue, }) assert.NotNil(t, actRegistry) @@ -542,7 +568,7 @@ func Test_Run_Async(t *testing.T) { assert.Equal(t, err, gerr.ErrAsyncAction, "expected async action sentinel error") assert.Nil(t, result, "expected nil result") - time.Sleep(time.Millisecond) // wait for async action to complete + time.Sleep(time.Millisecond * 2000) // wait for async action to complete // The following is the expected log output from running the async action. assert.Contains(t, out.String(), "{\"level\":\"debug\",\"action\":\"log\",\"executionMode\":\"async\",\"message\":\"Running action\"}") //nolint:lll @@ -563,6 +589,7 @@ func Test_Run_NilOutput(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -585,6 +612,7 @@ func Test_Run_ActionNotExist(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) assert.NotNil(t, actRegistry) @@ -635,6 +663,15 @@ func Test_Run_Timeout(t *testing.T) { name, actions, signals, policies := createWaitActEntities(isAsync) out := bytes.Buffer{} logger := zerolog.New(&out) + worker := NewActWorker( + Worker{ + Logger: logger, + Actions: actions, + DefaultActionTimeout: test.timeout, + }, + ) + + workerQueue := queue.NewPool(1, queue.WithFn(worker.RunFunc())) actRegistry := NewActRegistry( Registry{ Signals: signals, @@ -644,6 +681,7 @@ func Test_Run_Timeout(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: test.timeout, Logger: logger, + ActionQueue: workerQueue, }) assert.NotNil(t, actRegistry) @@ -673,7 +711,7 @@ func Test_Run_Timeout(t *testing.T) { assert.Equal(t, test.expectedResult, result) if isAsync { - time.Sleep(3 * time.Millisecond) + time.Sleep(2000 * time.Millisecond) } if test.expectedLog != "" { assert.Contains(t, out.String(), test.expectedLog) diff --git a/act/worker.go b/act/worker.go new file mode 100644 index 00000000..3385abed --- /dev/null +++ b/act/worker.go @@ -0,0 +1,115 @@ +package act + +import ( + "context" + "encoding/json" + "fmt" + "time" + + sdkAct "github.com/gatewayd-io/gatewayd-plugin-sdk/act" + gerr "github.com/gatewayd-io/gatewayd/errors" + "github.com/golang-queue/queue/core" + "github.com/rs/zerolog" +) + +type IWorker interface { + RunFunc() func(ctx context.Context, m core.QueuedMessage) error +} + +// Worker keeps track of all policies and actions. +type Worker struct { + Logger zerolog.Logger + DefaultActionTimeout time.Duration + + Actions map[string]*sdkAct.Action +} + +var _ IWorker = (*Worker)(nil) + +// NewActWorker creates a new act worker. +func NewActWorker( + worker Worker, +) *Worker { + if worker.Actions == nil { + worker.Logger.Warn().Msg("Builtin actions are nil") + return nil + } + + for _, action := range worker.Actions { + if action == nil { + worker.Logger.Warn().Msg("Action is nil, not adding") + return nil + } + worker.Logger.Debug().Str("name", action.Name).Msg("Registered builtin action") + } + + return &Worker{ + Logger: worker.Logger, + Actions: worker.Actions, + DefaultActionTimeout: worker.DefaultActionTimeout, + } +} + +func (w *Worker) RunFunc() func(ctx context.Context, m core.QueuedMessage) error { + return func(ctx context.Context, m core.QueuedMessage) error { + actionMessage, ok := m.(*asyncActionMessage) + if !ok { + if err := json.Unmarshal(m.Bytes(), &actionMessage); err != nil { + return fmt.Errorf("failed to unmarshal action message message: %w", err) + } + } + res, err := w.runAction(ctx, actionMessage.Output, actionMessage.Params...) + if err == nil { + w.Logger.Debug().Interface("result", res).Msg("Action ran successfully") + } + return err + } +} + +// Run runs the function associated with the output.MatchedPolicy and +// returns its result. +func (w *Worker) runAction( + ctx context.Context, + output *sdkAct.Output, params ...sdkAct.Parameter, +) (any, *gerr.GatewayDError) { + // In certain cases, the output may be nil, for example, if the policy + // evaluation fails. In this case, the run is aborted. + if output == nil { + // This should never happen, since the output is always set by the registry + // to be the default policy if no signals are provided. + w.Logger.Debug().Msg("Output is nil, run aborted") + return nil, gerr.ErrNilPointer + } + + action, ok := w.Actions[output.MatchedPolicy] + if !ok { + w.Logger.Warn().Str("matchedPolicy", output.MatchedPolicy).Msg( + "Action does not exist, run aborted") + return nil, gerr.ErrActionNotExist + } + + // Prepend the logger to the parameters. + params = append([]sdkAct.Parameter{WithLogger(w.Logger)}, params...) + + timeout := w.DefaultActionTimeout + if action.Timeout > 0 { + timeout = time.Duration(action.Timeout) * time.Second + } + + // If the action is asynchronous, run it and return the result immediately. + if action.Sync { + w.Logger.Warn().Str("action", action.Name).Msg("Action is synchronous, run aborted") + return nil, gerr.ErrSyncActionInQueue + } + + var ctxWithTimeout context.Context + var cancel context.CancelFunc + // if timeout is zero, then the context should not have timeout + if timeout > 0 { + ctxWithTimeout, cancel = context.WithTimeout(ctx, timeout) + } else { + ctxWithTimeout, cancel = context.WithCancel(ctx) + } + defer cancel() + return runActionWithTimeout(ctxWithTimeout, action, output, params, w.Logger) +} diff --git a/api/api_test.go b/api/api_test.go index 35b0f5dd..6c755c0b 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -13,6 +13,7 @@ import ( "github.com/gatewayd-io/gatewayd/network" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" + "github.com/golang-queue/queue" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -128,6 +129,7 @@ func TestGetPlugins(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: zerolog.Logger{}, + ActionQueue: &queue.Queue{}, }) pluginRegistry := plugin.NewRegistry( context.TODO(), @@ -182,6 +184,7 @@ func TestGetPluginsWithEmptyPluginRegistry(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: zerolog.Logger{}, + ActionQueue: &queue.Queue{}, }) pluginRegistry := plugin.NewRegistry( context.TODO(), @@ -303,6 +306,7 @@ func TestGetServers(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: zerolog.Logger{}, + ActionQueue: &queue.Queue{}, }) pluginRegistry := plugin.NewRegistry( diff --git a/cmd/run.go b/cmd/run.go index 686e8103..ecff9652 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -33,6 +33,7 @@ import ( usage "github.com/gatewayd-io/gatewayd/usagereport/v1" "github.com/getsentry/sentry-go" "github.com/go-co-op/gocron" + "github.com/golang-queue/queue" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" @@ -283,6 +284,20 @@ var runCmd = &cobra.Command{ logger.Warn().Msg( "Running GatewayD in development mode (not recommended for production)") } + worker := act.NewActWorker( + act.Worker{ + Logger: logger, + Actions: act.BuiltinActions(), + DefaultActionTimeout: conf.Plugin.ActionTimeout, + }, + ) + + workerQueue := queue.NewPool(1, queue.WithFn(worker.RunFunc())) + + if workerQueue == nil { + logger.Error().Msg("Failed to create worker queue") + os.Exit(gerr.FailedToCreateWorkerQueue) + } // Create a new act registry given the built-in signals, policies, and actions. actRegistry = act.NewActRegistry( @@ -294,6 +309,7 @@ var runCmd = &cobra.Command{ PolicyTimeout: conf.Plugin.PolicyTimeout, DefaultActionTimeout: conf.Plugin.ActionTimeout, Logger: logger, + ActionQueue: workerQueue, }) if actRegistry == nil { diff --git a/errors/errors.go b/errors/errors.go index 8078a6eb..161e3cb2 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -49,6 +49,7 @@ const ( ErrCodeEvalError ErrCodeMsgEncodeError ErrCodeConfigParseError + ErrCodeAsyncQueueFailed ) var ( @@ -178,6 +179,12 @@ var ( ErrCodeConfigParseError, "error parsing config", nil, } + ErrAsyncQueueFailed = &GatewayDError{ + ErrCodeAsyncQueueFailed, "async queue failed", nil, + } + ErrSyncActionInQueue = &GatewayDError{ + ErrCodeRunError, "sync action in async queue", nil, + } // Unwrapped errors. ErrLoggerRequired = errors.New("terminate action requires a logger parameter") ) @@ -188,4 +195,5 @@ const ( FailedToStartServer = 3 FailedToStartTracer = 4 FailedToCreateActRegistry = 5 + FailedToCreateWorkerQueue = 6 ) diff --git a/go.mod b/go.mod index 7b4b13af..ef61b96d 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/gatewayd-io/gatewayd-plugin-sdk v0.2.10 github.com/getsentry/sentry-go v0.27.0 github.com/go-co-op/gocron v1.37.0 + github.com/golang-queue/queue v0.2.0 github.com/google/go-github/v53 v53.2.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/hashicorp/go-hclog v1.6.3 @@ -61,6 +62,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index 4ce55244..47070c89 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/appleboy/com v0.1.7 h1:4lYTFNoMAAXGGIC8lDxVg/NY+1aXbYqfAWN05cZhd0M= +github.com/appleboy/com v0.1.7/go.mod h1:JUK+oH0SXCLRH57pDMJx6VWVsm8CPdajalmRSWwamBE= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= @@ -111,8 +113,12 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-queue/queue v0.2.0 h1:R0INU16rLCzYmc5h9wqHI/6owNxqcRVVMd5gyKVmnfU= +github.com/golang-queue/queue v0.2.0/go.mod h1:5nEkJTzw9Boc8ZCylQlrJK5f/Vd8Uo58yAssRli5ckg= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -215,6 +221,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= diff --git a/network/proxy_test.go b/network/proxy_test.go index 1e183e90..c463dc7d 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -10,6 +10,7 @@ import ( "github.com/gatewayd-io/gatewayd/logging" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" + "github.com/golang-queue/queue" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" ) @@ -54,6 +55,7 @@ func TestNewProxy(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) // Create a proxy with a fixed buffer newPool @@ -110,6 +112,7 @@ func BenchmarkNewProxy(b *testing.B) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) // Create a proxy with a fixed buffer newPool @@ -169,6 +172,7 @@ func BenchmarkProxyConnectDisconnect(b *testing.B) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) // Create a proxy with a fixed buffer newPool @@ -235,6 +239,7 @@ func BenchmarkProxyPassThrough(b *testing.B) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) // Create a proxy with a fixed buffer newPool @@ -306,6 +311,7 @@ func BenchmarkProxyIsHealthyAndIsExhausted(b *testing.B) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) // Create a proxy with a fixed buffer newPool @@ -375,6 +381,7 @@ func BenchmarkProxyAvailableAndBusyConnectionsString(b *testing.B) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) // Create a proxy with a fixed buffer newPool diff --git a/network/server_test.go b/network/server_test.go index b090e126..e46d2d3a 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -16,6 +16,7 @@ import ( "github.com/gatewayd-io/gatewayd/logging" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" + "github.com/golang-queue/queue" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -48,6 +49,7 @@ func TestRunServer(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) pluginRegistry := plugin.NewRegistry( context.Background(), diff --git a/plugin/plugin_registry_test.go b/plugin/plugin_registry_test.go index 4a33d910..ba5b4e1a 100644 --- a/plugin/plugin_registry_test.go +++ b/plugin/plugin_registry_test.go @@ -10,6 +10,7 @@ import ( "github.com/gatewayd-io/gatewayd/act" "github.com/gatewayd-io/gatewayd/config" "github.com/gatewayd-io/gatewayd/logging" + "github.com/golang-queue/queue" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -35,6 +36,7 @@ func NewPluginRegistry(t *testing.T) *Registry { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) reg := NewRegistry( context.Background(), @@ -152,6 +154,7 @@ func BenchmarkHookRun(b *testing.B) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) reg := NewRegistry( diff --git a/plugin/utils_test.go b/plugin/utils_test.go index 690c492e..ebd31ebc 100644 --- a/plugin/utils_test.go +++ b/plugin/utils_test.go @@ -7,6 +7,7 @@ import ( sdkAct "github.com/gatewayd-io/gatewayd-plugin-sdk/act" "github.com/gatewayd-io/gatewayd/act" "github.com/gatewayd-io/gatewayd/config" + "github.com/golang-queue/queue" "github.com/rs/zerolog" "github.com/spf13/cast" "github.com/stretchr/testify/assert" @@ -99,6 +100,7 @@ func Test_applyPolicies(t *testing.T) { PolicyTimeout: config.DefaultPolicyTimeout, DefaultActionTimeout: config.DefaultActionTimeout, Logger: logger, + ActionQueue: &queue.Queue{}, }) output := applyPolicies(