Skip to content

Commit

Permalink
Merge pull request #272 from nyaruka/engine_config
Browse files Browse the repository at this point in the history
Configurable limit on webhook response bodies
  • Loading branch information
rowanseymour authored Apr 25, 2018
2 parents ac2cd74 + cb772d4 commit 14f5007
Show file tree
Hide file tree
Showing 18 changed files with 306 additions and 113 deletions.
4 changes: 2 additions & 2 deletions cmd/flowrunner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func main() {
la, _ := time.LoadLocation("America/Los_Angeles")
env := utils.NewEnvironment(utils.DateFormatYearMonthDay, utils.TimeFormatHourMinute, la, utils.LanguageList{})

session := engine.NewSession(assetCache, assets.NewMockAssetServer(), httpClient)
session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), httpClient)

contactJSON, err := ioutil.ReadFile(*contactFile)
if err != nil {
Expand Down Expand Up @@ -226,7 +226,7 @@ func main() {
callerEvents = append(callerEvents, []flows.Event{event})

// rebuild our session
session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), httpClient, outJSON)
session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), httpClient, outJSON)
if err != nil {
log.Fatalf("Error unmarshalling output: %s", err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/flowrunner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func runFlow(assetsFilename string, triggerEnvelope *utils.TypedEnvelope, caller
return runResult{}, fmt.Errorf("Error reading test assets '%s': %s", assetsFilename, err)
}

session := engine.NewSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient)
session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient)

trigger, err := triggers.ReadTrigger(session, triggerEnvelope)
if err != nil {
Expand All @@ -119,7 +119,7 @@ func runFlow(assetsFilename string, triggerEnvelope *utils.TypedEnvelope, caller
}
outputs = append(outputs, &Output{outJSON, marshalEventLog(session.Events())})

session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, outJSON)
session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, outJSON)
if err != nil {
return runResult{}, fmt.Errorf("Error marshalling output: %s", err)
}
Expand Down Expand Up @@ -221,10 +221,10 @@ func TestFlows(t *testing.T) {
actualOutput := runResult.outputs[i]
expectedOutput := expectedOutputs[i]

actualSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, actualOutput.Session)
actualSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, actualOutput.Session)
require.NoError(t, err, "Error unmarshalling session running flow '%s': %s\n", tc.assets, err)

expectedSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, expectedOutput.Session)
expectedSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, expectedOutput.Session)
require.NoError(t, err, "Error unmarshalling expected session running flow '%s': %s\n", tc.assets, err)

// number of runs should be the same
Expand Down
1 change: 0 additions & 1 deletion cmd/flowrunner/testdata/flows/all_actions_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@
"status": "completed",
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"webhook": {
"body": "{ \"ok\": \"true\" }",
"request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n",
"response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }",
"status": "success",
Expand Down
1 change: 0 additions & 1 deletion cmd/flowrunner/testdata/flows/two_questions_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,6 @@
"status": "completed",
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"webhook": {
"body": "{ \"ok\": \"true\" }",
"request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }",
"response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }",
"status": "success",
Expand Down
2 changes: 0 additions & 2 deletions cmd/flowrunner/testdata/flows/webhook_persists_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@
"status": "waiting",
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"webhook": {
"body": "{ \"ok\": \"true\" }",
"request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n",
"response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }",
"status": "success",
Expand Down Expand Up @@ -400,7 +399,6 @@
"status": "completed",
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"webhook": {
"body": "{ \"ok\": \"true\" }",
"request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n",
"response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }",
"status": "success",
Expand Down
37 changes: 23 additions & 14 deletions cmd/flowserver/config.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
package main

import "github.com/nyaruka/ezconf"
import (
"github.com/nyaruka/ezconf"
"github.com/nyaruka/goflow/flows"
)

// Config is our top level config for our flowserver
type Config struct {
Port int `help:"the port we will run on"`
LogLevel string `help:"the logging level to use"`
Static string `help:""`
AssetCacheSize int64 `help:"the maximum size of our asset cache"`
AssetCachePrune int `help:"the number of assets to prune when we reach our max size"`
AssetServerToken string `help:"the token to use when authentication to the asset server"`
Version string `help:"the version to use in request and response headers"`
Port int `help:"the port we will run on"`
LogLevel string `help:"the logging level to use"`
Static string `help:""`
AssetCacheSize int64 `help:"the maximum size of our asset cache"`
AssetCachePrune int `help:"the number of assets to prune when we reach our max size"`
AssetServerToken string `help:"the token to use when authentication to the asset server"`
EngineMaxWebhookResponseBytes int `help:"the maximum allowed byte size of webhook responses"`
Version string `help:"the version to use in request and response headers"`
}

func (c *Config) MaxWebhookResponseBytes() int { return c.EngineMaxWebhookResponseBytes }

// NewDefaultConfig returns our default configuration
func NewDefaultConfig() *Config {
return &Config{
Port: 8800,
LogLevel: "info",
AssetCacheSize: 1000,
AssetCachePrune: 100,
AssetServerToken: "missing_temba_token",
Version: "Dev",
Port: 8800,
LogLevel: "info",
AssetCacheSize: 1000,
AssetCachePrune: 100,
AssetServerToken: "missing_temba_token",
EngineMaxWebhookResponseBytes: 10000,
Version: "Dev",
}
}

Expand All @@ -36,3 +43,5 @@ func NewConfigWithPath(path string) *Config {
loader.MustLoad()
return config
}

var _ flows.EngineConfig = (*Config)(nil)
4 changes: 2 additions & 2 deletions cmd/flowserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (s *FlowServer) handleStart(w http.ResponseWriter, r *http.Request) (interf
}

// build our session
session := engine.NewSession(s.assetCache, assetServer, s.httpClient)
session := engine.NewSession(s.assetCache, assetServer, s.config, s.httpClient)

// read our trigger
trigger, err := triggers.ReadTrigger(session, start.Trigger)
Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *FlowServer) handleResume(w http.ResponseWriter, r *http.Request) (inter
}

// read our session
session, err := engine.ReadSession(s.assetCache, assetServer, s.httpClient, resume.Session)
session, err := engine.ReadSession(s.assetCache, assetServer, s.config, s.httpClient, resume.Session)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/flowserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (ts *ServerTestSuite) parseSessionResponse(assetCache *assets.AssetCache, b
err := json.Unmarshal(body, &envelope)
ts.Require().NoError(err)

session, err := engine.ReadSession(assetCache, ts.assetServer, test.TestHTTPClient, envelope.Session)
session, err := engine.ReadSession(assetCache, ts.assetServer, engine.NewDefaultConfig(), test.TestHTTPClient, envelope.Session)
ts.Require().NoError(err)

return session, envelope.Log
Expand Down
6 changes: 4 additions & 2 deletions flows/actions/call_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ func (a *CallWebhookAction) Execute(run flows.FlowRun, step flows.Step, log flow
}

webhook, err := flows.MakeWebhookCall(run.Session(), req)

if err != nil {
log.Add(events.NewErrorEvent(err))
} else {
log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response()))
}
run.SetWebhook(webhook)

log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response()))
run.SetWebhook(webhook)
return nil
}
2 changes: 1 addition & 1 deletion flows/definition/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestFlowValidation(t *testing.T) {
err = assetCache.Include(assetsJSON)
assert.NoError(t, err)

session := engine.NewSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient)
session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient)
flow, err := session.Assets().GetFlow("76f0a02f-3b75-4b86-9064-e9195e1b3a02")
assert.NoError(t, err)

Expand Down
17 changes: 17 additions & 0 deletions flows/engine/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package engine

import (
"github.com/nyaruka/goflow/flows"
)

// the configuration options for the flow engine
type config struct {
maxWebhookResponseBytes int
}

// NewDefaultConfig returns the default engine configuration
func NewDefaultConfig() flows.EngineConfig {
return &config{maxWebhookResponseBytes: 10000}
}

func (c *config) MaxWebhookResponseBytes() int { return c.maxWebhookResponseBytes }
28 changes: 16 additions & 12 deletions flows/engine/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ type session struct {
pushedFlow *pushedFlow
flowStack *flowStack
newEvents []flows.Event
httpClient *utils.HTTPClient

engineConfig flows.EngineConfig
httpClient *utils.HTTPClient
}

// NewSession creates a new session
func NewSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, httpClient *utils.HTTPClient) flows.Session {
func NewSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, engineConfig flows.EngineConfig, httpClient *utils.HTTPClient) flows.Session {
return &session{
env: utils.NewDefaultEnvironment(),
assets: assets.NewSessionAssets(assetCache, assetServer),
status: flows.SessionStatusActive,
newEvents: []flows.Event{},
runsByUUID: make(map[flows.RunUUID]flows.FlowRun),
flowStack: newFlowStack(),
httpClient: httpClient,
env: utils.NewDefaultEnvironment(),
assets: assets.NewSessionAssets(assetCache, assetServer),
status: flows.SessionStatusActive,
newEvents: []flows.Event{},
runsByUUID: make(map[flows.RunUUID]flows.FlowRun),
flowStack: newFlowStack(),
engineConfig: engineConfig,
httpClient: httpClient,
}
}

Expand Down Expand Up @@ -118,7 +121,8 @@ func (s *session) LogEvent(event flows.Event) {
}
func (s *session) Events() []flows.Event { return s.newEvents }

func (s *session) HTTPClient() *utils.HTTPClient { return s.httpClient }
func (s *session) EngineConfig() flows.EngineConfig { return s.engineConfig }
func (s *session) HTTPClient() *utils.HTTPClient { return s.httpClient }

//------------------------------------------------------------------------------------------
// Flow execution
Expand Down Expand Up @@ -483,15 +487,15 @@ type sessionEnvelope struct {
}

// ReadSession decodes a session from the passed in JSON
func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, httpClient *utils.HTTPClient, data json.RawMessage) (flows.Session, error) {
func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, engineConfig flows.EngineConfig, httpClient *utils.HTTPClient, data json.RawMessage) (flows.Session, error) {
var envelope sessionEnvelope
var err error

if err = utils.UnmarshalAndValidate(data, &envelope, "session"); err != nil {
return nil, err
}

s := NewSession(assetCache, assetServer, httpClient).(*session)
s := NewSession(assetCache, assetServer, engineConfig, httpClient).(*session)
s.status = envelope.Status

// read our environment
Expand Down
8 changes: 7 additions & 1 deletion flows/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,13 @@ type Step interface {
Leave(ExitUUID)
}

type EngineConfig interface {
MaxWebhookResponseBytes() int
}

// Session represents the session of a flow run which may contain many runs
type Session interface {
Assets() SessionAssets
HTTPClient() *utils.HTTPClient

Environment() utils.Environment
SetEnvironment(utils.Environment)
Expand All @@ -366,6 +369,9 @@ type Session interface {

Events() []Event
LogEvent(Event)

EngineConfig() EngineConfig
HTTPClient() *utils.HTTPClient
}

// RunSummary represents the minimum information available about all runs (current or related) and is the
Expand Down
Loading

0 comments on commit 14f5007

Please sign in to comment.