From 82f4eee91f02c643ad67610d885dcbdfd08a4402 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 24 Apr 2018 11:27:58 -0500 Subject: [PATCH 1/6] Webhook calls should limit bytes read from response bodies based on configurable number of bytes --- cmd/flowrunner/main.go | 4 +- cmd/flowrunner/runner_test.go | 8 +- .../testdata/flows/all_actions_test.json | 6 +- .../testdata/flows/two_questions_test.json | 6 +- .../testdata/flows/webhook_persists_test.json | 10 +-- cmd/flowserver/config.go | 38 ++++++---- cmd/flowserver/server.go | 16 ++-- cmd/flowserver/server_test.go | 2 +- docs/docs.md | 2 +- docs/index.html | 2 +- flows/actions/call_webhook.go | 6 +- flows/definition/flow_test.go | 2 +- flows/engine/config.go | 22 ++++++ flows/engine/session.go | 28 ++++--- flows/interfaces.go | 8 +- flows/webhook.go | 75 +++++++++++-------- flowserver.toml | 5 +- test/session.go | 2 +- 18 files changed, 152 insertions(+), 90 deletions(-) create mode 100644 flows/engine/config.go diff --git a/cmd/flowrunner/main.go b/cmd/flowrunner/main.go index 6296d31e2..32afaef65 100644 --- a/cmd/flowrunner/main.go +++ b/cmd/flowrunner/main.go @@ -170,7 +170,7 @@ func main() { la, _ := time.LoadLocation("America/Los_Angeles") env := utils.NewEnvironment(utils.DateFormatYearMonthDay, utils.TimeFormatHourMinute, la, utils.LanguageList{}) - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), httpClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), httpClient) contactJSON, err := ioutil.ReadFile(*contactFile) if err != nil { @@ -226,7 +226,7 @@ func main() { callerEvents = append(callerEvents, []flows.Event{event}) // rebuild our session - session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), httpClient, outJSON) + session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), httpClient, outJSON) if err != nil { log.Fatalf("Error unmarshalling output: %s", err) } diff --git a/cmd/flowrunner/runner_test.go b/cmd/flowrunner/runner_test.go index 4218e7301..2dbb3f4bb 100644 --- a/cmd/flowrunner/runner_test.go +++ b/cmd/flowrunner/runner_test.go @@ -96,7 +96,7 @@ func runFlow(assetsFilename string, triggerEnvelope *utils.TypedEnvelope, caller return runResult{}, fmt.Errorf("Error reading test assets '%s': %s", assetsFilename, err) } - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient) trigger, err := triggers.ReadTrigger(session, triggerEnvelope) if err != nil { @@ -119,7 +119,7 @@ func runFlow(assetsFilename string, triggerEnvelope *utils.TypedEnvelope, caller } outputs = append(outputs, &Output{outJSON, marshalEventLog(session.Events())}) - session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, outJSON) + session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, outJSON) if err != nil { return runResult{}, fmt.Errorf("Error marshalling output: %s", err) } @@ -221,10 +221,10 @@ func TestFlows(t *testing.T) { actualOutput := runResult.outputs[i] expectedOutput := expectedOutputs[i] - actualSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, actualOutput.Session) + actualSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, actualOutput.Session) require.NoError(t, err, "Error unmarshalling session running flow '%s': %s\n", tc.assets, err) - expectedSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, expectedOutput.Session) + expectedSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, expectedOutput.Session) require.NoError(t, err, "Error unmarshalling expected session running flow '%s': %s\n", tc.assets, err) // number of runs should be the same diff --git a/cmd/flowrunner/testdata/flows/all_actions_test.json b/cmd/flowrunner/testdata/flows/all_actions_test.json index eb4570905..89a89acf9 100644 --- a/cmd/flowrunner/testdata/flows/all_actions_test.json +++ b/cmd/flowrunner/testdata/flows/all_actions_test.json @@ -282,7 +282,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "692926ea-09d6-4942-bd38-d266ec8d3716", @@ -630,7 +630,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "692926ea-09d6-4942-bd38-d266ec8d3716", @@ -678,7 +678,7 @@ "webhook": { "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" diff --git a/cmd/flowrunner/testdata/flows/two_questions_test.json b/cmd/flowrunner/testdata/flows/two_questions_test.json index 37c038c6a..fe00f6676 100644 --- a/cmd/flowrunner/testdata/flows/two_questions_test.json +++ b/cmd/flowrunner/testdata/flows/two_questions_test.json @@ -407,7 +407,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "970b8069-50f5-4f6f-8f41-6b2d9f33d623", @@ -557,7 +557,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "970b8069-50f5-4f6f-8f41-6b2d9f33d623", @@ -641,7 +641,7 @@ "webhook": { "body": "{ \"ok\": \"true\" }", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" diff --git a/cmd/flowrunner/testdata/flows/webhook_persists_test.json b/cmd/flowrunner/testdata/flows/webhook_persists_test.json index ba16f0bcd..78acd2974 100644 --- a/cmd/flowrunner/testdata/flows/webhook_persists_test.json +++ b/cmd/flowrunner/testdata/flows/webhook_persists_test.json @@ -36,7 +36,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", @@ -109,7 +109,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", @@ -175,7 +175,7 @@ "webhook": { "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" @@ -288,7 +288,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "step_uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", @@ -402,7 +402,7 @@ "webhook": { "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" diff --git a/cmd/flowserver/config.go b/cmd/flowserver/config.go index 6ea4b4e73..4b2393e52 100644 --- a/cmd/flowserver/config.go +++ b/cmd/flowserver/config.go @@ -1,27 +1,37 @@ package main -import "github.com/nyaruka/ezconf" +import ( + "github.com/nyaruka/ezconf" + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/engine" +) // Config is our top level config for our flowserver type Config struct { - Port int `help:"the port we will run on"` - LogLevel string `help:"the logging level to use"` - Static string `help:""` - AssetCacheSize int64 `help:"the maximum size of our asset cache"` - AssetCachePrune int `help:"the number of assets to prune when we reach our max size"` - AssetServerToken string `help:"the token to use when authentication to the asset server"` - Version string `help:"the version to use in request and response headers"` + Port int `help:"the port we will run on"` + LogLevel string `help:"the logging level to use"` + Static string `help:""` + AssetCacheSize int64 `help:"the maximum size of our asset cache"` + AssetCachePrune int `help:"the number of assets to prune when we reach our max size"` + AssetServerToken string `help:"the token to use when authentication to the asset server"` + EngineMaxWebhookResponseBytes int `help:"the maximum allowed byte size of webhook responses"` + Version string `help:"the version to use in request and response headers"` +} + +func (c *Config) Engine() flows.EngineConfig { + return engine.NewConfig(c.EngineMaxWebhookResponseBytes) } // NewDefaultConfig returns our default configuration func NewDefaultConfig() *Config { return &Config{ - Port: 8800, - LogLevel: "info", - AssetCacheSize: 1000, - AssetCachePrune: 100, - AssetServerToken: "missing_temba_token", - Version: "Dev", + Port: 8800, + LogLevel: "info", + AssetCacheSize: 1000, + AssetCachePrune: 100, + AssetServerToken: "missing_temba_token", + EngineMaxWebhookResponseBytes: 10000, + Version: "Dev", } } diff --git a/cmd/flowserver/server.go b/cmd/flowserver/server.go index 82a72253c..e5d3ae82d 100644 --- a/cmd/flowserver/server.go +++ b/cmd/flowserver/server.go @@ -26,10 +26,11 @@ import ( ) type FlowServer struct { - config *Config - httpServer *http.Server - assetCache *assets.AssetCache - httpClient *utils.HTTPClient + config *Config + httpServer *http.Server + assetCache *assets.AssetCache + engineConfig flows.EngineConfig + httpClient *utils.HTTPClient } // NewFlowServer creates a new flow server instance @@ -59,7 +60,8 @@ func NewFlowServer(config *Config) *FlowServer { } s := &FlowServer{ - config: config, + config: config, + engineConfig: config.Engine(), httpServer: &http.Server{ Addr: fmt.Sprintf(":%d", config.Port), Handler: r, @@ -173,7 +175,7 @@ func (s *FlowServer) handleStart(w http.ResponseWriter, r *http.Request) (interf } // build our session - session := engine.NewSession(s.assetCache, assetServer, s.httpClient) + session := engine.NewSession(s.assetCache, assetServer, s.engineConfig, s.httpClient) // read our trigger trigger, err := triggers.ReadTrigger(session, start.Trigger) @@ -236,7 +238,7 @@ func (s *FlowServer) handleResume(w http.ResponseWriter, r *http.Request) (inter } // read our session - session, err := engine.ReadSession(s.assetCache, assetServer, s.httpClient, resume.Session) + session, err := engine.ReadSession(s.assetCache, assetServer, s.engineConfig, s.httpClient, resume.Session) if err != nil { return nil, err } diff --git a/cmd/flowserver/server_test.go b/cmd/flowserver/server_test.go index 93a6d4bc4..e0b956601 100644 --- a/cmd/flowserver/server_test.go +++ b/cmd/flowserver/server_test.go @@ -238,7 +238,7 @@ func (ts *ServerTestSuite) parseSessionResponse(assetCache *assets.AssetCache, b err := json.Unmarshal(body, &envelope) ts.Require().NoError(err) - session, err := engine.ReadSession(assetCache, ts.assetServer, test.TestHTTPClient, envelope.Session) + session, err := engine.ReadSession(assetCache, ts.assetServer, engine.NewDefaultConfig(), test.TestHTTPClient, envelope.Session) ts.Require().NoError(err) return session, envelope.Log diff --git a/docs/docs.md b/docs/docs.md index b126543e5..0eab03282 100644 --- a/docs/docs.md +++ b/docs/docs.md @@ -1888,7 +1888,7 @@ A `webhook_called` event will be created based on the results of the HTTP call. "status": "success", "status_code": 200, "request": "GET /?cmd=success HTTP/1.1\r\nHost: localhost:49998\r\nUser-Agent: goflow-testing\r\nAuthorization: Token AAFFZZHH\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }" + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n" } ``` diff --git a/docs/index.html b/docs/index.html index 6ac85d095..e2b5cd137 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1262,7 +1262,7 @@

"status": "success", "status_code": 200, "request": "GET /?cmd=success HTTP/1.1\r\nHost: localhost:49998\r\nUser-Agent: goflow-testing\r\nAuthorization: Token AAFFZZHH\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }" + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n" }

diff --git a/flows/actions/call_webhook.go b/flows/actions/call_webhook.go index 8acc77828..f9fbd56fa 100644 --- a/flows/actions/call_webhook.go +++ b/flows/actions/call_webhook.go @@ -85,11 +85,13 @@ func (a *CallWebhookAction) Execute(run flows.FlowRun, step flows.Step, log flow } webhook, err := flows.MakeWebhookCall(run.Session(), req) + if err != nil { log.Add(events.NewErrorEvent(err)) + } else { + log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response())) } - run.SetWebhook(webhook) - log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response())) + run.SetWebhook(webhook) return nil } diff --git a/flows/definition/flow_test.go b/flows/definition/flow_test.go index b32cccbd3..c9a82c0a7 100644 --- a/flows/definition/flow_test.go +++ b/flows/definition/flow_test.go @@ -53,7 +53,7 @@ func TestFlowValidation(t *testing.T) { err = assetCache.Include(assetsJSON) assert.NoError(t, err) - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient) flow, err := session.Assets().GetFlow("76f0a02f-3b75-4b86-9064-e9195e1b3a02") assert.NoError(t, err) diff --git a/flows/engine/config.go b/flows/engine/config.go new file mode 100644 index 000000000..1c10a2f0c --- /dev/null +++ b/flows/engine/config.go @@ -0,0 +1,22 @@ +package engine + +import ( + "github.com/nyaruka/goflow/flows" +) + +// the configuration options for the flow engine +type config struct { + maxWebhookResponseBytes int +} + +// NewConfig returns a new engine configuration +func NewConfig(maxWebhookResponseBytes int) flows.EngineConfig { + return &config{maxWebhookResponseBytes: maxWebhookResponseBytes} +} + +// NewDefaultConfig returns the default engine configuration +func NewDefaultConfig() flows.EngineConfig { + return &config{maxWebhookResponseBytes: 10000} +} + +func (c *config) MaxWebhookResponseBytes() int { return c.maxWebhookResponseBytes } diff --git a/flows/engine/session.go b/flows/engine/session.go index 48ca24c34..1a492a17c 100644 --- a/flows/engine/session.go +++ b/flows/engine/session.go @@ -38,19 +38,22 @@ type session struct { pushedFlow *pushedFlow flowStack *flowStack newEvents []flows.Event - httpClient *utils.HTTPClient + + engineConfig flows.EngineConfig + httpClient *utils.HTTPClient } // NewSession creates a new session -func NewSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, httpClient *utils.HTTPClient) flows.Session { +func NewSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, engineConfig flows.EngineConfig, httpClient *utils.HTTPClient) flows.Session { return &session{ - env: utils.NewDefaultEnvironment(), - assets: assets.NewSessionAssets(assetCache, assetServer), - status: flows.SessionStatusActive, - newEvents: []flows.Event{}, - runsByUUID: make(map[flows.RunUUID]flows.FlowRun), - flowStack: newFlowStack(), - httpClient: httpClient, + env: utils.NewDefaultEnvironment(), + assets: assets.NewSessionAssets(assetCache, assetServer), + status: flows.SessionStatusActive, + newEvents: []flows.Event{}, + runsByUUID: make(map[flows.RunUUID]flows.FlowRun), + flowStack: newFlowStack(), + engineConfig: engineConfig, + httpClient: httpClient, } } @@ -118,7 +121,8 @@ func (s *session) LogEvent(event flows.Event) { } func (s *session) Events() []flows.Event { return s.newEvents } -func (s *session) HTTPClient() *utils.HTTPClient { return s.httpClient } +func (s *session) EngineConfig() flows.EngineConfig { return s.engineConfig } +func (s *session) HTTPClient() *utils.HTTPClient { return s.httpClient } //------------------------------------------------------------------------------------------ // Flow execution @@ -483,7 +487,7 @@ type sessionEnvelope struct { } // ReadSession decodes a session from the passed in JSON -func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, httpClient *utils.HTTPClient, data json.RawMessage) (flows.Session, error) { +func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, engineConfig flows.EngineConfig, httpClient *utils.HTTPClient, data json.RawMessage) (flows.Session, error) { var envelope sessionEnvelope var err error @@ -491,7 +495,7 @@ func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, return nil, err } - s := NewSession(assetCache, assetServer, httpClient).(*session) + s := NewSession(assetCache, assetServer, engineConfig, httpClient).(*session) s.status = envelope.Status // read our environment diff --git a/flows/interfaces.go b/flows/interfaces.go index 9ccd66ffe..b6d5d2713 100644 --- a/flows/interfaces.go +++ b/flows/interfaces.go @@ -340,10 +340,13 @@ type Step interface { Leave(ExitUUID) } +type EngineConfig interface { + MaxWebhookResponseBytes() int +} + // Session represents the session of a flow run which may contain many runs type Session interface { Assets() SessionAssets - HTTPClient() *utils.HTTPClient Environment() utils.Environment SetEnvironment(utils.Environment) @@ -366,6 +369,9 @@ type Session interface { Events() []Event LogEvent(Event) + + EngineConfig() EngineConfig + HTTPClient() *utils.HTTPClient } // RunSummary represents the minimum information available about all runs (current or related) and is the diff --git a/flows/webhook.go b/flows/webhook.go index 070a7d04b..63f513293 100644 --- a/flows/webhook.go +++ b/flows/webhook.go @@ -3,14 +3,25 @@ package flows import ( "encoding/json" "fmt" + "io" "io/ioutil" + "mime" "net/http" "net/http/httputil" - "strings" "github.com/nyaruka/goflow/excellent/types" ) +// response content-types that we'll save as @run.webhook.body +var saveResponseContentTypes = map[string]bool{ + "application/json": true, + "application/javascript": true, + "application/xml": true, + "text/html": true, + "text/plain": true, + "text/xml": true, +} + // WebhookStatus represents the status of a WebhookRequest type WebhookStatus string @@ -60,12 +71,10 @@ type WebhookCall struct { func MakeWebhookCall(session Session, request *http.Request) (*WebhookCall, error) { response, requestDump, err := session.HTTPClient().DoWithDump(request) if err != nil { - w, _ := newWebhookCallFromError(request, requestDump, err) - return w, err + return newWebhookCallFromError(request, requestDump, err), err } - defer response.Body.Close() - return newWebhookCallFromResponse(requestDump, response) + return newWebhookCallFromResponse(requestDump, response, session.EngineConfig().MaxWebhookResponseBytes()) } // URL returns the full URL @@ -125,21 +134,23 @@ var _ types.XValue = (*WebhookCall)(nil) var _ types.XResolvable = (*WebhookCall)(nil) // newWebhookCallFromError creates a new webhook call based on the passed in http request and error (when we received no response) -func newWebhookCallFromError(r *http.Request, requestTrace string, requestError error) (*WebhookCall, error) { +func newWebhookCallFromError(r *http.Request, requestTrace string, requestError error) *WebhookCall { return &WebhookCall{ url: r.URL.String(), request: requestTrace, status: WebhookStatusConnectionError, body: requestError.Error(), - }, nil + } } // newWebhookCallFromResponse creates a new RequestResponse based on the passed in http Response -func newWebhookCallFromResponse(requestTrace string, r *http.Response) (*WebhookCall, error) { - var err error +func newWebhookCallFromResponse(requestTrace string, response *http.Response, maxBodyBytes int) (*WebhookCall, error) { + defer response.Body.Close() + w := &WebhookCall{ - url: r.Request.URL.String(), - statusCode: r.StatusCode, + method: response.Request.Method, + url: response.Request.URL.String(), + statusCode: response.StatusCode, request: requestTrace, } @@ -150,31 +161,33 @@ func newWebhookCallFromResponse(requestTrace string, r *http.Response) (*Webhook w.status = WebhookStatusResponseError } - // figure out if our Response is something that looks like text from our headers - isText := false - contentType := r.Header.Get("Content-Type") - if strings.Contains(contentType, "text") || - strings.Contains(contentType, "json") || - strings.Contains(contentType, "utf") || - strings.Contains(contentType, "javascript") || - strings.Contains(contentType, "xml") { - - isText = true - } - - // only dump the whole body if this looks like text - response, err := httputil.DumpResponse(r, isText) + // save response dump without body which will be saved separately + responseDump, err := httputil.DumpResponse(response, false) if err != nil { - return w, err + return nil, err } - w.response = string(response) + w.response = string(responseDump) + + // only save response body's if we have a supported content-type + contentType := response.Header.Get("Content-Type") + mediaType, _, _ := mime.ParseMediaType(contentType) + saveBody := saveResponseContentTypes[mediaType] + + if saveBody { + // only read up to our max body bytes limit + bodyReader := io.LimitReader(response.Body, int64(maxBodyBytes)+1) - if isText { - bodyBytes, err := ioutil.ReadAll(r.Body) + bodyBytes, err := ioutil.ReadAll(bodyReader) if err != nil { - return w, err + return nil, err } - w.body = strings.TrimSpace(string(bodyBytes)) + + // if we have no remaining bytes, error because they body was too big + if bodyReader.(*io.LimitedReader).N <= 0 { + return nil, fmt.Errorf("webhook response body exceeds %d bytes limit", maxBodyBytes) + } + + w.body = string(bodyBytes) } else { // no body for non-text responses but add it to our Response log so users know why w.response = w.response + "\nNon-text body, ignoring" diff --git a/flowserver.toml b/flowserver.toml index 2f5862dca..b8527c08f 100644 --- a/flowserver.toml +++ b/flowserver.toml @@ -21,4 +21,7 @@ static = "" # asset cache settings - how many items it can hold and how many to prune when it reaches that size asset_cache_size = 1000 asset_cache_prune = 100 -asset_server_token = "missing_auth_token" \ No newline at end of file +asset_server_token = "missing_auth_token" + +# engine settings +engine_max_webhook_response_bytes = 10000 diff --git a/test/session.go b/test/session.go index 2773835b7..951def1a2 100644 --- a/test/session.go +++ b/test/session.go @@ -325,7 +325,7 @@ func CreateTestSession(testServerPort int, actionToAdd flows.Action) (flows.Sess } // create our engine session - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), TestHTTPClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), TestHTTPClient) // override the session environment session.SetEnvironment(newTestEnvironment()) From 778e25cee86624e9b6a541a58d0e63a2393925b7 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 24 Apr 2018 12:08:34 -0500 Subject: [PATCH 2/6] Include body in webhook events --- cmd/flowrunner/testdata/flows/all_actions_test.json | 4 ++-- cmd/flowrunner/testdata/flows/two_questions_test.json | 4 ++-- cmd/flowrunner/testdata/flows/webhook_persists_test.json | 6 +++--- docs/docs.md | 2 +- docs/index.html | 2 +- flows/actions/call_webhook.go | 4 +++- flows/webhook.go | 4 ++-- 7 files changed, 14 insertions(+), 12 deletions(-) diff --git a/cmd/flowrunner/testdata/flows/all_actions_test.json b/cmd/flowrunner/testdata/flows/all_actions_test.json index 89a89acf9..cac3689c1 100644 --- a/cmd/flowrunner/testdata/flows/all_actions_test.json +++ b/cmd/flowrunner/testdata/flows/all_actions_test.json @@ -282,7 +282,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "692926ea-09d6-4942-bd38-d266ec8d3716", @@ -630,7 +630,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "692926ea-09d6-4942-bd38-d266ec8d3716", diff --git a/cmd/flowrunner/testdata/flows/two_questions_test.json b/cmd/flowrunner/testdata/flows/two_questions_test.json index fe00f6676..98f210210 100644 --- a/cmd/flowrunner/testdata/flows/two_questions_test.json +++ b/cmd/flowrunner/testdata/flows/two_questions_test.json @@ -407,7 +407,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "970b8069-50f5-4f6f-8f41-6b2d9f33d623", @@ -557,7 +557,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "970b8069-50f5-4f6f-8f41-6b2d9f33d623", diff --git a/cmd/flowrunner/testdata/flows/webhook_persists_test.json b/cmd/flowrunner/testdata/flows/webhook_persists_test.json index 78acd2974..40158170e 100644 --- a/cmd/flowrunner/testdata/flows/webhook_persists_test.json +++ b/cmd/flowrunner/testdata/flows/webhook_persists_test.json @@ -36,7 +36,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", @@ -109,7 +109,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", @@ -288,7 +288,7 @@ { "created_on": "2000-01-01T00:00:00.000000000-00:00", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "step_uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", diff --git a/docs/docs.md b/docs/docs.md index 0eab03282..b126543e5 100644 --- a/docs/docs.md +++ b/docs/docs.md @@ -1888,7 +1888,7 @@ A `webhook_called` event will be created based on the results of the HTTP call. "status": "success", "status_code": 200, "request": "GET /?cmd=success HTTP/1.1\r\nHost: localhost:49998\r\nUser-Agent: goflow-testing\r\nAuthorization: Token AAFFZZHH\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n" + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }" } ``` diff --git a/docs/index.html b/docs/index.html index e2b5cd137..6ac85d095 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1262,7 +1262,7 @@

"status": "success", "status_code": 200, "request": "GET /?cmd=success HTTP/1.1\r\nHost: localhost:49998\r\nUser-Agent: goflow-testing\r\nAuthorization: Token AAFFZZHH\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n" + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }" }

diff --git a/flows/actions/call_webhook.go b/flows/actions/call_webhook.go index f9fbd56fa..bfea2e027 100644 --- a/flows/actions/call_webhook.go +++ b/flows/actions/call_webhook.go @@ -89,7 +89,9 @@ func (a *CallWebhookAction) Execute(run flows.FlowRun, step flows.Step, log flow if err != nil { log.Add(events.NewErrorEvent(err)) } else { - log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response())) + fullResponse := webhook.Response() + webhook.Body() + + log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), fullResponse)) } run.SetWebhook(webhook) diff --git a/flows/webhook.go b/flows/webhook.go index 63f513293..b78fe365e 100644 --- a/flows/webhook.go +++ b/flows/webhook.go @@ -182,7 +182,7 @@ func newWebhookCallFromResponse(requestTrace string, response *http.Response, ma return nil, err } - // if we have no remaining bytes, error because they body was too big + // if we have no remaining bytes, error because the body was too big if bodyReader.(*io.LimitedReader).N <= 0 { return nil, fmt.Errorf("webhook response body exceeds %d bytes limit", maxBodyBytes) } @@ -190,7 +190,7 @@ func newWebhookCallFromResponse(requestTrace string, response *http.Response, ma w.body = string(bodyBytes) } else { // no body for non-text responses but add it to our Response log so users know why - w.response = w.response + "\nNon-text body, ignoring" + w.response = w.response + "Non-text body, ignoring" } return w, nil From 61d4e03498e141472cfc993ba2e0aefe9b29ce6a Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 24 Apr 2018 12:12:29 -0500 Subject: [PATCH 3/6] Make flowserver.Config implement flows.EngineConfig --- cmd/flowserver/config.go | 7 +++---- cmd/flowserver/server.go | 16 +++++++--------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/flowserver/config.go b/cmd/flowserver/config.go index 4b2393e52..6b5476a18 100644 --- a/cmd/flowserver/config.go +++ b/cmd/flowserver/config.go @@ -3,7 +3,6 @@ package main import ( "github.com/nyaruka/ezconf" "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/goflow/flows/engine" ) // Config is our top level config for our flowserver @@ -18,9 +17,7 @@ type Config struct { Version string `help:"the version to use in request and response headers"` } -func (c *Config) Engine() flows.EngineConfig { - return engine.NewConfig(c.EngineMaxWebhookResponseBytes) -} +func (c *Config) MaxWebhookResponseBytes() int { return c.EngineMaxWebhookResponseBytes } // NewDefaultConfig returns our default configuration func NewDefaultConfig() *Config { @@ -46,3 +43,5 @@ func NewConfigWithPath(path string) *Config { loader.MustLoad() return config } + +var _ flows.EngineConfig = (*Config)(nil) diff --git a/cmd/flowserver/server.go b/cmd/flowserver/server.go index e5d3ae82d..af30329fd 100644 --- a/cmd/flowserver/server.go +++ b/cmd/flowserver/server.go @@ -26,11 +26,10 @@ import ( ) type FlowServer struct { - config *Config - httpServer *http.Server - assetCache *assets.AssetCache - engineConfig flows.EngineConfig - httpClient *utils.HTTPClient + config *Config + httpServer *http.Server + assetCache *assets.AssetCache + httpClient *utils.HTTPClient } // NewFlowServer creates a new flow server instance @@ -60,8 +59,7 @@ func NewFlowServer(config *Config) *FlowServer { } s := &FlowServer{ - config: config, - engineConfig: config.Engine(), + config: config, httpServer: &http.Server{ Addr: fmt.Sprintf(":%d", config.Port), Handler: r, @@ -175,7 +173,7 @@ func (s *FlowServer) handleStart(w http.ResponseWriter, r *http.Request) (interf } // build our session - session := engine.NewSession(s.assetCache, assetServer, s.engineConfig, s.httpClient) + session := engine.NewSession(s.assetCache, assetServer, s.config, s.httpClient) // read our trigger trigger, err := triggers.ReadTrigger(session, start.Trigger) @@ -238,7 +236,7 @@ func (s *FlowServer) handleResume(w http.ResponseWriter, r *http.Request) (inter } // read our session - session, err := engine.ReadSession(s.assetCache, assetServer, s.engineConfig, s.httpClient, resume.Session) + session, err := engine.ReadSession(s.assetCache, assetServer, s.config, s.httpClient, resume.Session) if err != nil { return nil, err } From 66c6b1d3edac703d9a1447275af69d67332b5d3a Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 24 Apr 2018 12:16:59 -0500 Subject: [PATCH 4/6] Remove unused method --- flows/engine/config.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flows/engine/config.go b/flows/engine/config.go index 1c10a2f0c..04d910c58 100644 --- a/flows/engine/config.go +++ b/flows/engine/config.go @@ -9,11 +9,6 @@ type config struct { maxWebhookResponseBytes int } -// NewConfig returns a new engine configuration -func NewConfig(maxWebhookResponseBytes int) flows.EngineConfig { - return &config{maxWebhookResponseBytes: maxWebhookResponseBytes} -} - // NewDefaultConfig returns the default engine configuration func NewDefaultConfig() flows.EngineConfig { return &config{maxWebhookResponseBytes: 10000} From 55061f88238a0da2d7ec4a9d11b20cb643ee1875 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 24 Apr 2018 15:33:12 -0500 Subject: [PATCH 5/6] Parse body as needed from webhook response --- .../testdata/flows/all_actions_test.json | 3 +- .../testdata/flows/two_questions_test.json | 3 +- .../testdata/flows/webhook_persists_test.json | 6 +-- flows/actions/call_webhook.go | 4 +- flows/webhook.go | 45 ++++++++++--------- 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/cmd/flowrunner/testdata/flows/all_actions_test.json b/cmd/flowrunner/testdata/flows/all_actions_test.json index cac3689c1..b9b0f43cd 100644 --- a/cmd/flowrunner/testdata/flows/all_actions_test.json +++ b/cmd/flowrunner/testdata/flows/all_actions_test.json @@ -676,9 +676,8 @@ "status": "completed", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" diff --git a/cmd/flowrunner/testdata/flows/two_questions_test.json b/cmd/flowrunner/testdata/flows/two_questions_test.json index 98f210210..89d7eed48 100644 --- a/cmd/flowrunner/testdata/flows/two_questions_test.json +++ b/cmd/flowrunner/testdata/flows/two_questions_test.json @@ -639,9 +639,8 @@ "status": "completed", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" diff --git a/cmd/flowrunner/testdata/flows/webhook_persists_test.json b/cmd/flowrunner/testdata/flows/webhook_persists_test.json index 40158170e..26c44e2ca 100644 --- a/cmd/flowrunner/testdata/flows/webhook_persists_test.json +++ b/cmd/flowrunner/testdata/flows/webhook_persists_test.json @@ -173,9 +173,8 @@ "status": "waiting", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" @@ -400,9 +399,8 @@ "status": "completed", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n", + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", "status_code": 200, "url": "http://127.0.0.1:49999/?cmd=success" diff --git a/flows/actions/call_webhook.go b/flows/actions/call_webhook.go index bfea2e027..f9fbd56fa 100644 --- a/flows/actions/call_webhook.go +++ b/flows/actions/call_webhook.go @@ -89,9 +89,7 @@ func (a *CallWebhookAction) Execute(run flows.FlowRun, step flows.Step, log flow if err != nil { log.Add(events.NewErrorEvent(err)) } else { - fullResponse := webhook.Response() + webhook.Body() - - log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), fullResponse)) + log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response())) } run.SetWebhook(webhook) diff --git a/flows/webhook.go b/flows/webhook.go index b78fe365e..da5df5827 100644 --- a/flows/webhook.go +++ b/flows/webhook.go @@ -8,6 +8,7 @@ import ( "mime" "net/http" "net/http/httputil" + "strings" "github.com/nyaruka/goflow/excellent/types" ) @@ -58,12 +59,10 @@ func (r WebhookStatus) String() string { // @context webhook type WebhookCall struct { url string - method string status WebhookStatus statusCode int request string response string - body string } // MakeWebhookCall fires the passed in http request, returning any errors encountered. RequestResponse is always set @@ -80,6 +79,9 @@ func MakeWebhookCall(session Session, request *http.Request) (*WebhookCall, erro // URL returns the full URL func (w *WebhookCall) URL() string { return w.url } +// Method returns the full HTTP method +func (w *WebhookCall) Method() string { return w.request[:strings.IndexRune(w.request, ' ')] } + // Status returns the response status message func (w *WebhookCall) Status() WebhookStatus { return w.status } @@ -93,18 +95,20 @@ func (w *WebhookCall) Request() string { return w.request } func (w *WebhookCall) Response() string { return w.response } // Body returns the response body -func (w *WebhookCall) Body() string { return w.body } +func (w *WebhookCall) Body() string { + parts := strings.SplitN(w.response, "\r\n\r\n", 2) + if len(parts) == 2 { + return parts[1] + } + return "" +} // JSON returns the response as a JSON fragment -func (w *WebhookCall) JSON() types.XValue { return types.JSONToXValue([]byte(w.body)) } +func (w *WebhookCall) JSON() types.XValue { return types.JSONToXValue([]byte(w.Body())) } // Resolve resolves the given key when this webhook is referenced in an expression func (w *WebhookCall) Resolve(key string) types.XValue { switch key { - case "body": - return types.NewXText(w.Body()) - case "json": - return w.JSON() case "url": return types.NewXText(w.URL()) case "request": @@ -115,6 +119,8 @@ func (w *WebhookCall) Resolve(key string) types.XValue { return types.NewXText(string(w.Status())) case "status_code": return types.NewXNumberFromInt(w.StatusCode()) + case "json": + return w.JSON() } return types.NewXResolveError(w, key) @@ -122,7 +128,7 @@ func (w *WebhookCall) Resolve(key string) types.XValue { // Reduce reduces this to a string of method and URL, e.g. "GET http://example.com/hook.php" func (w *WebhookCall) Reduce() types.XPrimitive { - return types.NewXText(fmt.Sprintf("%s %s", w.method, w.url)) + return types.NewXText(fmt.Sprintf("%s %s", w.Method(), w.URL())) } // ToXJSON is called when this type is passed to @(json(...)) @@ -134,12 +140,13 @@ var _ types.XValue = (*WebhookCall)(nil) var _ types.XResolvable = (*WebhookCall)(nil) // newWebhookCallFromError creates a new webhook call based on the passed in http request and error (when we received no response) -func newWebhookCallFromError(r *http.Request, requestTrace string, requestError error) *WebhookCall { +func newWebhookCallFromError(request *http.Request, requestTrace string, requestError error) *WebhookCall { return &WebhookCall{ - url: r.URL.String(), - request: requestTrace, - status: WebhookStatusConnectionError, - body: requestError.Error(), + url: request.URL.String(), + status: WebhookStatusConnectionError, + statusCode: 0, + request: requestTrace, + response: requestError.Error(), } } @@ -148,7 +155,6 @@ func newWebhookCallFromResponse(requestTrace string, response *http.Response, ma defer response.Body.Close() w := &WebhookCall{ - method: response.Request.Method, url: response.Request.URL.String(), statusCode: response.StatusCode, request: requestTrace, @@ -161,7 +167,7 @@ func newWebhookCallFromResponse(requestTrace string, response *http.Response, ma w.status = WebhookStatusResponseError } - // save response dump without body which will be saved separately + // save response dump without body which will be parsed separately responseDump, err := httputil.DumpResponse(response, false) if err != nil { return nil, err @@ -187,10 +193,10 @@ func newWebhookCallFromResponse(requestTrace string, response *http.Response, ma return nil, fmt.Errorf("webhook response body exceeds %d bytes limit", maxBodyBytes) } - w.body = string(bodyBytes) + w.response += string(bodyBytes) } else { // no body for non-text responses but add it to our Response log so users know why - w.response = w.response + "Non-text body, ignoring" + w.response += "Non-text body, ignoring" } return w, nil @@ -204,7 +210,6 @@ type webhookCallEnvelope struct { URL string `json:"url"` Status WebhookStatus `json:"status"` StatusCode int `json:"status_code"` - Body string `json:"body"` Request string `json:"request"` Response string `json:"response"` } @@ -224,7 +229,6 @@ func (w *WebhookCall) UnmarshalJSON(data []byte) error { w.statusCode = envelope.StatusCode w.request = envelope.Request w.response = envelope.Response - w.body = envelope.Body return nil } @@ -236,6 +240,5 @@ func (r *WebhookCall) MarshalJSON() ([]byte, error) { StatusCode: r.statusCode, Request: r.request, Response: r.response, - Body: r.body, }) } From cb772d460da48a75c577f8ba5f1b52ed8278cac2 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 25 Apr 2018 10:37:31 -0500 Subject: [PATCH 6/6] Add unit tests for webhook calls --- flows/webhook_test.go | 114 ++++++++++++++++++++++++++++++++++++++++++ test/http.go | 68 +++++++++++++++++-------- 2 files changed, 161 insertions(+), 21 deletions(-) create mode 100644 flows/webhook_test.go diff --git a/flows/webhook_test.go b/flows/webhook_test.go new file mode 100644 index 000000000..a6365f24a --- /dev/null +++ b/flows/webhook_test.go @@ -0,0 +1,114 @@ +package flows_test + +import ( + "github.com/nyaruka/goflow/flows" + "net/http" + "strings" + "testing" + + "github.com/nyaruka/goflow/test" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testServerPort = 49994 + +type call struct { + method string + url string + body string +} + +func (c *call) String() string { return c.method + " " + c.url } + +type webhook struct { + request string + response string + body string +} + +func TestWebhookParsing(t *testing.T) { + server, err := test.NewTestHTTPServer(testServerPort) + require.NoError(t, err) + defer server.Close() + + session, err := test.CreateTestSession(testServerPort, nil) + require.NoError(t, err) + + testCases := []struct { + call call + webhook webhook + isError bool + }{ + { + // successful GET + call: call{"GET", "http://127.0.0.1:49994/?cmd=success", ""}, + webhook: webhook{ + request: "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + body: "{ \"ok\": \"true\" }", + }, + }, { + // successful POST without body + call: call{"POST", "http://127.0.0.1:49994/?cmd=success", ""}, + webhook: webhook{ + request: "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nContent-Length: 0\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + body: "{ \"ok\": \"true\" }", + }, + }, { + // successful POST with body + call: call{"POST", "http://127.0.0.1:49994/?cmd=success", `{"contact": "Bob"}`}, + webhook: webhook{ + request: "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nContent-Length: 18\r\nAccept-Encoding: gzip\r\n\r\n{\"contact\": \"Bob\"}", + response: "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + body: "{ \"ok\": \"true\" }", + }, + }, { + // POST returning 503 + call: call{"POST", "http://127.0.0.1:49994/?cmd=unavailable", ""}, + webhook: webhook{ + request: "POST /?cmd=unavailable HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nContent-Length: 0\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 37\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"errors\": [\"service unavailable\"] }", + body: "{ \"errors\": [\"service unavailable\"] }", + }, + }, { + // GET returning non-text content type + call: call{"GET", "http://127.0.0.1:49994/?cmd=binary", ""}, + webhook: webhook{ + request: "GET /?cmd=binary HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 10\r\nContent-Type: application/octet-stream\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\nNon-text body, ignoring", + body: "Non-text body, ignoring", + }, + }, { + // GET returning binary body larger than allowed (we ignore binary body so no biggie) + call: call{"GET", "http://127.0.0.1:49994/?cmd=binary&size=11000", ""}, + webhook: webhook{ + request: "GET /?cmd=binary&size=11000 HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 11000\r\nContent-Type: application/octet-stream\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\nNon-text body, ignoring", + body: "Non-text body, ignoring", + }, + }, { + // GET returning text body larger than allowed + call: call{"GET", "http://127.0.0.1:49994/?cmd=binary&size=11000&type=text%2Fplain", ""}, + isError: true, + }, + } + + for _, tc := range testCases { + request, err := http.NewRequest(tc.call.method, tc.call.url, strings.NewReader(tc.call.body)) + require.NoError(t, err) + + webhook, err := flows.MakeWebhookCall(session, request) + if tc.isError { + assert.Error(t, err) + } else { + assert.Equal(t, tc.call.url, webhook.URL(), "URL mismatch for call %s", tc.call) + assert.Equal(t, tc.call.method, webhook.Method(), "method mismatch for call %s", tc.call) + assert.Equal(t, tc.webhook.request, webhook.Request(), "request trace mismatch for call %s", tc.call) + assert.Equal(t, tc.webhook.response, webhook.Response(), "response mismatch for call %s", tc.call) + assert.Equal(t, tc.webhook.body, webhook.Body(), "body mismatch for call %s", tc.call) + } + } +} diff --git a/test/http.go b/test/http.go index 7c39bca46..61573deb0 100644 --- a/test/http.go +++ b/test/http.go @@ -5,6 +5,7 @@ import ( "net" "net/http" "net/http/httptest" + "strconv" "github.com/nyaruka/goflow/utils" ) @@ -14,27 +15,8 @@ var TestHTTPClient = utils.NewHTTPClient("goflow-testing") // NewTestHTTPServer sets up a mock server for webhook actions func NewTestHTTPServer(port int) (*httptest.Server, error) { - server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cmd := r.URL.Query().Get("cmd") - defer r.Body.Close() - - w.Header().Set("Date", "Wed, 11 Apr 2018 18:24:30 GMT") - - switch cmd { - case "success": - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{ "ok": "true" }`)) - case "echo": - w.WriteHeader(http.StatusOK) - w.Write([]byte(r.URL.Query().Get("content"))) - case "unavailable": - w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte(`{ "errors": ["service unavailable"] }`)) - default: - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(`{ "errors": ["bad_request"] }`)) - } - })) + server := httptest.NewUnstartedServer(http.HandlerFunc(testHTTPHandler)) + // manually create a listener for our test server so that our output is predictable l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) if err != nil { @@ -44,3 +26,47 @@ func NewTestHTTPServer(port int) (*httptest.Server, error) { server.Start() return server, nil } + +func testHTTPHandler(w http.ResponseWriter, r *http.Request) { + cmd := r.URL.Query().Get("cmd") + defer r.Body.Close() + + w.Header().Set("Date", "Wed, 11 Apr 2018 18:24:30 GMT") + + switch cmd { + case "success": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ "ok": "true" }`)) + case "echo": + w.WriteHeader(http.StatusOK) + w.Write([]byte(r.URL.Query().Get("content"))) + case "binary": + typeParam := r.URL.Query().Get("type") + if typeParam == "" { + typeParam = "application/octet-stream" + } + + sizeParam := r.URL.Query().Get("size") + if sizeParam == "" { + sizeParam = "10" + } + size, _ := strconv.Atoi(sizeParam) + data := make([]byte, size) + for i := 0; i < size; i++ { + data[i] = byte(40 + i%10) + } + + w.Header().Set("Content-Type", typeParam) + w.Header().Set("Content-Length", sizeParam) + + w.WriteHeader(http.StatusOK) + w.Write(data) + + case "unavailable": + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{ "errors": ["service unavailable"] }`)) + default: + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{ "errors": ["bad_request"] }`)) + } +}