Skip to content

Commit

Permalink
feat: add timeout for running actions (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamsajj authored Mar 14, 2024
1 parent e895b0d commit 2060af4
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 66 deletions.
51 changes: 51 additions & 0 deletions act/act_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package act

import (
"time"

sdkAct "github.com/gatewayd-io/gatewayd-plugin-sdk/act"
)

func createWaitActEntities(async bool) (
string,
map[string]*sdkAct.Action,
map[string]*sdkAct.Signal,
map[string]*sdkAct.Policy,
) {
name := "waitSync"
if async {
name = "waitAsync"
}
actions := map[string]*sdkAct.Action{
name: {
Name: name,
Metadata: nil,
Sync: !async,
Terminal: false,
Run: func(_ map[string]any, _ ...sdkAct.Parameter) (any, error) {
time.Sleep(1 * time.Second)
return true, nil
},
},
}
signals := map[string]*sdkAct.Signal{
name: {
Name: name,
Metadata: map[string]any{
"log": true,
"level": "info",
"message": "test",
"async": async,
},
},
}
policy := map[string]*sdkAct.Policy{
name: sdkAct.MustNewPolicy(
name,
`true`,
map[string]any{"log": "enabled"},
),
}

return name, actions, signals, policy
}
94 changes: 62 additions & 32 deletions act/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Registry struct {
logger zerolog.Logger
// Timeout for policy evaluation.
policyTimeout time.Duration
// Default timeout for running actions
defaultActionTimeout time.Duration

Signals map[string]*sdkAct.Signal
Policies map[string]*sdkAct.Policy
Expand All @@ -41,6 +43,7 @@ func NewActRegistry(
builtinActions map[string]*sdkAct.Action,
defaultPolicy string,
policyTimeout time.Duration,
defaultActionTimeout time.Duration,
logger zerolog.Logger,
) *Registry {
if builtinSignals == nil || builtinsPolicies == nil || builtinActions == nil {
Expand Down Expand Up @@ -82,13 +85,14 @@ func NewActRegistry(
logger.Debug().Str("name", defaultPolicy).Msg("Using default policy")

return &Registry{
logger: logger,
policyTimeout: policyTimeout,
Signals: builtinSignals,
Policies: builtinsPolicies,
Actions: builtinActions,
DefaultPolicy: builtinsPolicies[defaultPolicy],
DefaultSignal: builtinSignals[defaultPolicy],
logger: logger,
policyTimeout: policyTimeout,
defaultActionTimeout: defaultActionTimeout,
Signals: builtinSignals,
Policies: builtinsPolicies,
Actions: builtinActions,
DefaultPolicy: builtinsPolicies[defaultPolicy],
DefaultSignal: builtinSignals[defaultPolicy],
}
}

Expand Down Expand Up @@ -229,41 +233,67 @@ func (r *Registry) Run(
// Prepend the logger to the parameters.
params = append([]sdkAct.Parameter{WithLogger(r.logger)}, params...)

timeout := r.defaultActionTimeout
if action.Timeout > 0 {
timeout = time.Duration(action.Timeout) * time.Second
}
var ctx context.Context
var cancel context.CancelFunc
// if timeout is zero, then the context should not have timeout
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
// If the action is synchronous, run it and return the result immediately.
if action.Sync {
r.logger.Debug().Fields(map[string]interface{}{
"executionMode": "sync",
"action": action.Name,
}).Msgf("Running action")

output, err := action.Run(output.Metadata, params...)
if err != nil {
r.logger.Error().Err(err).Str("action", action.Name).Msg("Error running action")
return nil, gerr.ErrRunningAction.Wrap(err)
}
return output, nil
defer cancel()
return runActionWithTimeout(ctx, action, output, params, r.logger)
}

r.logger.Debug().Fields(map[string]interface{}{
"executionMode": "async",
// Run the action asynchronously.
go func() {
defer cancel()
_, _ = runActionWithTimeout(ctx, action, output, params, r.logger)
}()
return nil, gerr.ErrAsyncAction
}

func runActionWithTimeout(
ctx context.Context,
action *sdkAct.Action,
output *sdkAct.Output,
params []sdkAct.Parameter,
logger zerolog.Logger,
) (any, *gerr.GatewayDError) {
execMode := "sync"
if !action.Sync {
execMode = "async"
}
logger.Debug().Fields(map[string]interface{}{
"executionMode": execMode,
"action": action.Name,
}).Msgf("Running action")
outputChan := make(chan any)
errChan := make(chan *gerr.GatewayDError)

// Run the action asynchronously.
// TODO: Add a way to cancel the action.
go func(
action *sdkAct.Action,
output *sdkAct.Output,
params []sdkAct.Parameter,
logger zerolog.Logger,
) {
_, err := action.Run(output.Metadata, params...)
go func() {
actionOutput, err := action.Run(output.Metadata, params...)
if err != nil {
logger.Error().Err(err).Str("action", action.Name).Msg("Error running action")
errChan <- gerr.ErrRunningAction.Wrap(err)
}
}(action, output, params, r.logger)

return nil, gerr.ErrAsyncAction
outputChan <- actionOutput
}()
select {
case <-ctx.Done():
logger.Error().Str("action", action.Name).Msg("Action timed out")
return nil, gerr.ErrRunningActionTimeout
case actionOutput := <-outputChan:
return actionOutput, nil
case err := <-errChan:
return nil, err
}
}

// WithLogger returns a parameter with the logger to be used by the action.
Expand Down
Loading

0 comments on commit 2060af4

Please sign in to comment.