diff --git a/cmd/flowrunner/main.go b/cmd/flowrunner/main.go index 6296d31e2..32afaef65 100644 --- a/cmd/flowrunner/main.go +++ b/cmd/flowrunner/main.go @@ -170,7 +170,7 @@ func main() { la, _ := time.LoadLocation("America/Los_Angeles") env := utils.NewEnvironment(utils.DateFormatYearMonthDay, utils.TimeFormatHourMinute, la, utils.LanguageList{}) - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), httpClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), httpClient) contactJSON, err := ioutil.ReadFile(*contactFile) if err != nil { @@ -226,7 +226,7 @@ func main() { callerEvents = append(callerEvents, []flows.Event{event}) // rebuild our session - session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), httpClient, outJSON) + session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), httpClient, outJSON) if err != nil { log.Fatalf("Error unmarshalling output: %s", err) } diff --git a/cmd/flowrunner/runner_test.go b/cmd/flowrunner/runner_test.go index 4218e7301..2dbb3f4bb 100644 --- a/cmd/flowrunner/runner_test.go +++ b/cmd/flowrunner/runner_test.go @@ -96,7 +96,7 @@ func runFlow(assetsFilename string, triggerEnvelope *utils.TypedEnvelope, caller return runResult{}, fmt.Errorf("Error reading test assets '%s': %s", assetsFilename, err) } - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient) trigger, err := triggers.ReadTrigger(session, triggerEnvelope) if err != nil { @@ -119,7 +119,7 @@ func runFlow(assetsFilename string, triggerEnvelope *utils.TypedEnvelope, caller } outputs = append(outputs, &Output{outJSON, marshalEventLog(session.Events())}) - session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, outJSON) + session, err = engine.ReadSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, outJSON) if err != nil { return runResult{}, fmt.Errorf("Error marshalling output: %s", err) } @@ -221,10 +221,10 @@ func TestFlows(t *testing.T) { actualOutput := runResult.outputs[i] expectedOutput := expectedOutputs[i] - actualSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, actualOutput.Session) + actualSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, actualOutput.Session) require.NoError(t, err, "Error unmarshalling session running flow '%s': %s\n", tc.assets, err) - expectedSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), test.TestHTTPClient, expectedOutput.Session) + expectedSession, err := engine.ReadSession(runResult.assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient, expectedOutput.Session) require.NoError(t, err, "Error unmarshalling expected session running flow '%s': %s\n", tc.assets, err) // number of runs should be the same diff --git a/cmd/flowrunner/testdata/flows/all_actions_test.json b/cmd/flowrunner/testdata/flows/all_actions_test.json index eb4570905..b9b0f43cd 100644 --- a/cmd/flowrunner/testdata/flows/all_actions_test.json +++ b/cmd/flowrunner/testdata/flows/all_actions_test.json @@ -676,7 +676,6 @@ "status": "completed", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", diff --git a/cmd/flowrunner/testdata/flows/two_questions_test.json b/cmd/flowrunner/testdata/flows/two_questions_test.json index 37c038c6a..89d7eed48 100644 --- a/cmd/flowrunner/testdata/flows/two_questions_test.json +++ b/cmd/flowrunner/testdata/flows/two_questions_test.json @@ -639,7 +639,6 @@ "status": "completed", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nContent-Length: 69\r\nAccept-Encoding: gzip\r\n\r\n{ \"contact\": \"ba96bf7f-bc2a-4873-a7c7-254d1927c4e3\", \"soda\": \"Coke\" }", "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", diff --git a/cmd/flowrunner/testdata/flows/webhook_persists_test.json b/cmd/flowrunner/testdata/flows/webhook_persists_test.json index ba16f0bcd..26c44e2ca 100644 --- a/cmd/flowrunner/testdata/flows/webhook_persists_test.json +++ b/cmd/flowrunner/testdata/flows/webhook_persists_test.json @@ -173,7 +173,6 @@ "status": "waiting", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", @@ -400,7 +399,6 @@ "status": "completed", "uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5", "webhook": { - "body": "{ \"ok\": \"true\" }", "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", "status": "success", diff --git a/cmd/flowserver/config.go b/cmd/flowserver/config.go index 6ea4b4e73..6b5476a18 100644 --- a/cmd/flowserver/config.go +++ b/cmd/flowserver/config.go @@ -1,27 +1,34 @@ package main -import "github.com/nyaruka/ezconf" +import ( + "github.com/nyaruka/ezconf" + "github.com/nyaruka/goflow/flows" +) // Config is our top level config for our flowserver type Config struct { - Port int `help:"the port we will run on"` - LogLevel string `help:"the logging level to use"` - Static string `help:""` - AssetCacheSize int64 `help:"the maximum size of our asset cache"` - AssetCachePrune int `help:"the number of assets to prune when we reach our max size"` - AssetServerToken string `help:"the token to use when authentication to the asset server"` - Version string `help:"the version to use in request and response headers"` + Port int `help:"the port we will run on"` + LogLevel string `help:"the logging level to use"` + Static string `help:""` + AssetCacheSize int64 `help:"the maximum size of our asset cache"` + AssetCachePrune int `help:"the number of assets to prune when we reach our max size"` + AssetServerToken string `help:"the token to use when authentication to the asset server"` + EngineMaxWebhookResponseBytes int `help:"the maximum allowed byte size of webhook responses"` + Version string `help:"the version to use in request and response headers"` } +func (c *Config) MaxWebhookResponseBytes() int { return c.EngineMaxWebhookResponseBytes } + // NewDefaultConfig returns our default configuration func NewDefaultConfig() *Config { return &Config{ - Port: 8800, - LogLevel: "info", - AssetCacheSize: 1000, - AssetCachePrune: 100, - AssetServerToken: "missing_temba_token", - Version: "Dev", + Port: 8800, + LogLevel: "info", + AssetCacheSize: 1000, + AssetCachePrune: 100, + AssetServerToken: "missing_temba_token", + EngineMaxWebhookResponseBytes: 10000, + Version: "Dev", } } @@ -36,3 +43,5 @@ func NewConfigWithPath(path string) *Config { loader.MustLoad() return config } + +var _ flows.EngineConfig = (*Config)(nil) diff --git a/cmd/flowserver/server.go b/cmd/flowserver/server.go index 82a72253c..af30329fd 100644 --- a/cmd/flowserver/server.go +++ b/cmd/flowserver/server.go @@ -173,7 +173,7 @@ func (s *FlowServer) handleStart(w http.ResponseWriter, r *http.Request) (interf } // build our session - session := engine.NewSession(s.assetCache, assetServer, s.httpClient) + session := engine.NewSession(s.assetCache, assetServer, s.config, s.httpClient) // read our trigger trigger, err := triggers.ReadTrigger(session, start.Trigger) @@ -236,7 +236,7 @@ func (s *FlowServer) handleResume(w http.ResponseWriter, r *http.Request) (inter } // read our session - session, err := engine.ReadSession(s.assetCache, assetServer, s.httpClient, resume.Session) + session, err := engine.ReadSession(s.assetCache, assetServer, s.config, s.httpClient, resume.Session) if err != nil { return nil, err } diff --git a/cmd/flowserver/server_test.go b/cmd/flowserver/server_test.go index 93a6d4bc4..e0b956601 100644 --- a/cmd/flowserver/server_test.go +++ b/cmd/flowserver/server_test.go @@ -238,7 +238,7 @@ func (ts *ServerTestSuite) parseSessionResponse(assetCache *assets.AssetCache, b err := json.Unmarshal(body, &envelope) ts.Require().NoError(err) - session, err := engine.ReadSession(assetCache, ts.assetServer, test.TestHTTPClient, envelope.Session) + session, err := engine.ReadSession(assetCache, ts.assetServer, engine.NewDefaultConfig(), test.TestHTTPClient, envelope.Session) ts.Require().NoError(err) return session, envelope.Log diff --git a/flows/actions/call_webhook.go b/flows/actions/call_webhook.go index 8acc77828..f9fbd56fa 100644 --- a/flows/actions/call_webhook.go +++ b/flows/actions/call_webhook.go @@ -85,11 +85,13 @@ func (a *CallWebhookAction) Execute(run flows.FlowRun, step flows.Step, log flow } webhook, err := flows.MakeWebhookCall(run.Session(), req) + if err != nil { log.Add(events.NewErrorEvent(err)) + } else { + log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response())) } - run.SetWebhook(webhook) - log.Add(events.NewWebhookCalledEvent(webhook.URL(), webhook.Status(), webhook.StatusCode(), webhook.Request(), webhook.Response())) + run.SetWebhook(webhook) return nil } diff --git a/flows/definition/flow_test.go b/flows/definition/flow_test.go index b32cccbd3..c9a82c0a7 100644 --- a/flows/definition/flow_test.go +++ b/flows/definition/flow_test.go @@ -53,7 +53,7 @@ func TestFlowValidation(t *testing.T) { err = assetCache.Include(assetsJSON) assert.NoError(t, err) - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), test.TestHTTPClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), test.TestHTTPClient) flow, err := session.Assets().GetFlow("76f0a02f-3b75-4b86-9064-e9195e1b3a02") assert.NoError(t, err) diff --git a/flows/engine/config.go b/flows/engine/config.go new file mode 100644 index 000000000..04d910c58 --- /dev/null +++ b/flows/engine/config.go @@ -0,0 +1,17 @@ +package engine + +import ( + "github.com/nyaruka/goflow/flows" +) + +// the configuration options for the flow engine +type config struct { + maxWebhookResponseBytes int +} + +// NewDefaultConfig returns the default engine configuration +func NewDefaultConfig() flows.EngineConfig { + return &config{maxWebhookResponseBytes: 10000} +} + +func (c *config) MaxWebhookResponseBytes() int { return c.maxWebhookResponseBytes } diff --git a/flows/engine/session.go b/flows/engine/session.go index 48ca24c34..1a492a17c 100644 --- a/flows/engine/session.go +++ b/flows/engine/session.go @@ -38,19 +38,22 @@ type session struct { pushedFlow *pushedFlow flowStack *flowStack newEvents []flows.Event - httpClient *utils.HTTPClient + + engineConfig flows.EngineConfig + httpClient *utils.HTTPClient } // NewSession creates a new session -func NewSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, httpClient *utils.HTTPClient) flows.Session { +func NewSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, engineConfig flows.EngineConfig, httpClient *utils.HTTPClient) flows.Session { return &session{ - env: utils.NewDefaultEnvironment(), - assets: assets.NewSessionAssets(assetCache, assetServer), - status: flows.SessionStatusActive, - newEvents: []flows.Event{}, - runsByUUID: make(map[flows.RunUUID]flows.FlowRun), - flowStack: newFlowStack(), - httpClient: httpClient, + env: utils.NewDefaultEnvironment(), + assets: assets.NewSessionAssets(assetCache, assetServer), + status: flows.SessionStatusActive, + newEvents: []flows.Event{}, + runsByUUID: make(map[flows.RunUUID]flows.FlowRun), + flowStack: newFlowStack(), + engineConfig: engineConfig, + httpClient: httpClient, } } @@ -118,7 +121,8 @@ func (s *session) LogEvent(event flows.Event) { } func (s *session) Events() []flows.Event { return s.newEvents } -func (s *session) HTTPClient() *utils.HTTPClient { return s.httpClient } +func (s *session) EngineConfig() flows.EngineConfig { return s.engineConfig } +func (s *session) HTTPClient() *utils.HTTPClient { return s.httpClient } //------------------------------------------------------------------------------------------ // Flow execution @@ -483,7 +487,7 @@ type sessionEnvelope struct { } // ReadSession decodes a session from the passed in JSON -func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, httpClient *utils.HTTPClient, data json.RawMessage) (flows.Session, error) { +func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, engineConfig flows.EngineConfig, httpClient *utils.HTTPClient, data json.RawMessage) (flows.Session, error) { var envelope sessionEnvelope var err error @@ -491,7 +495,7 @@ func ReadSession(assetCache *assets.AssetCache, assetServer assets.AssetServer, return nil, err } - s := NewSession(assetCache, assetServer, httpClient).(*session) + s := NewSession(assetCache, assetServer, engineConfig, httpClient).(*session) s.status = envelope.Status // read our environment diff --git a/flows/interfaces.go b/flows/interfaces.go index 9ccd66ffe..b6d5d2713 100644 --- a/flows/interfaces.go +++ b/flows/interfaces.go @@ -340,10 +340,13 @@ type Step interface { Leave(ExitUUID) } +type EngineConfig interface { + MaxWebhookResponseBytes() int +} + // Session represents the session of a flow run which may contain many runs type Session interface { Assets() SessionAssets - HTTPClient() *utils.HTTPClient Environment() utils.Environment SetEnvironment(utils.Environment) @@ -366,6 +369,9 @@ type Session interface { Events() []Event LogEvent(Event) + + EngineConfig() EngineConfig + HTTPClient() *utils.HTTPClient } // RunSummary represents the minimum information available about all runs (current or related) and is the diff --git a/flows/webhook.go b/flows/webhook.go index 070a7d04b..da5df5827 100644 --- a/flows/webhook.go +++ b/flows/webhook.go @@ -3,7 +3,9 @@ package flows import ( "encoding/json" "fmt" + "io" "io/ioutil" + "mime" "net/http" "net/http/httputil" "strings" @@ -11,6 +13,16 @@ import ( "github.com/nyaruka/goflow/excellent/types" ) +// response content-types that we'll save as @run.webhook.body +var saveResponseContentTypes = map[string]bool{ + "application/json": true, + "application/javascript": true, + "application/xml": true, + "text/html": true, + "text/plain": true, + "text/xml": true, +} + // WebhookStatus represents the status of a WebhookRequest type WebhookStatus string @@ -47,12 +59,10 @@ func (r WebhookStatus) String() string { // @context webhook type WebhookCall struct { url string - method string status WebhookStatus statusCode int request string response string - body string } // MakeWebhookCall fires the passed in http request, returning any errors encountered. RequestResponse is always set @@ -60,17 +70,18 @@ type WebhookCall struct { func MakeWebhookCall(session Session, request *http.Request) (*WebhookCall, error) { response, requestDump, err := session.HTTPClient().DoWithDump(request) if err != nil { - w, _ := newWebhookCallFromError(request, requestDump, err) - return w, err + return newWebhookCallFromError(request, requestDump, err), err } - defer response.Body.Close() - return newWebhookCallFromResponse(requestDump, response) + return newWebhookCallFromResponse(requestDump, response, session.EngineConfig().MaxWebhookResponseBytes()) } // URL returns the full URL func (w *WebhookCall) URL() string { return w.url } +// Method returns the full HTTP method +func (w *WebhookCall) Method() string { return w.request[:strings.IndexRune(w.request, ' ')] } + // Status returns the response status message func (w *WebhookCall) Status() WebhookStatus { return w.status } @@ -84,18 +95,20 @@ func (w *WebhookCall) Request() string { return w.request } func (w *WebhookCall) Response() string { return w.response } // Body returns the response body -func (w *WebhookCall) Body() string { return w.body } +func (w *WebhookCall) Body() string { + parts := strings.SplitN(w.response, "\r\n\r\n", 2) + if len(parts) == 2 { + return parts[1] + } + return "" +} // JSON returns the response as a JSON fragment -func (w *WebhookCall) JSON() types.XValue { return types.JSONToXValue([]byte(w.body)) } +func (w *WebhookCall) JSON() types.XValue { return types.JSONToXValue([]byte(w.Body())) } // Resolve resolves the given key when this webhook is referenced in an expression func (w *WebhookCall) Resolve(key string) types.XValue { switch key { - case "body": - return types.NewXText(w.Body()) - case "json": - return w.JSON() case "url": return types.NewXText(w.URL()) case "request": @@ -106,6 +119,8 @@ func (w *WebhookCall) Resolve(key string) types.XValue { return types.NewXText(string(w.Status())) case "status_code": return types.NewXNumberFromInt(w.StatusCode()) + case "json": + return w.JSON() } return types.NewXResolveError(w, key) @@ -113,7 +128,7 @@ func (w *WebhookCall) Resolve(key string) types.XValue { // Reduce reduces this to a string of method and URL, e.g. "GET http://example.com/hook.php" func (w *WebhookCall) Reduce() types.XPrimitive { - return types.NewXText(fmt.Sprintf("%s %s", w.method, w.url)) + return types.NewXText(fmt.Sprintf("%s %s", w.Method(), w.URL())) } // ToXJSON is called when this type is passed to @(json(...)) @@ -125,21 +140,23 @@ var _ types.XValue = (*WebhookCall)(nil) var _ types.XResolvable = (*WebhookCall)(nil) // newWebhookCallFromError creates a new webhook call based on the passed in http request and error (when we received no response) -func newWebhookCallFromError(r *http.Request, requestTrace string, requestError error) (*WebhookCall, error) { +func newWebhookCallFromError(request *http.Request, requestTrace string, requestError error) *WebhookCall { return &WebhookCall{ - url: r.URL.String(), - request: requestTrace, - status: WebhookStatusConnectionError, - body: requestError.Error(), - }, nil + url: request.URL.String(), + status: WebhookStatusConnectionError, + statusCode: 0, + request: requestTrace, + response: requestError.Error(), + } } // newWebhookCallFromResponse creates a new RequestResponse based on the passed in http Response -func newWebhookCallFromResponse(requestTrace string, r *http.Response) (*WebhookCall, error) { - var err error +func newWebhookCallFromResponse(requestTrace string, response *http.Response, maxBodyBytes int) (*WebhookCall, error) { + defer response.Body.Close() + w := &WebhookCall{ - url: r.Request.URL.String(), - statusCode: r.StatusCode, + url: response.Request.URL.String(), + statusCode: response.StatusCode, request: requestTrace, } @@ -150,34 +167,36 @@ func newWebhookCallFromResponse(requestTrace string, r *http.Response) (*Webhook w.status = WebhookStatusResponseError } - // figure out if our Response is something that looks like text from our headers - isText := false - contentType := r.Header.Get("Content-Type") - if strings.Contains(contentType, "text") || - strings.Contains(contentType, "json") || - strings.Contains(contentType, "utf") || - strings.Contains(contentType, "javascript") || - strings.Contains(contentType, "xml") { - - isText = true - } - - // only dump the whole body if this looks like text - response, err := httputil.DumpResponse(r, isText) + // save response dump without body which will be parsed separately + responseDump, err := httputil.DumpResponse(response, false) if err != nil { - return w, err + return nil, err } - w.response = string(response) + w.response = string(responseDump) - if isText { - bodyBytes, err := ioutil.ReadAll(r.Body) + // only save response body's if we have a supported content-type + contentType := response.Header.Get("Content-Type") + mediaType, _, _ := mime.ParseMediaType(contentType) + saveBody := saveResponseContentTypes[mediaType] + + if saveBody { + // only read up to our max body bytes limit + bodyReader := io.LimitReader(response.Body, int64(maxBodyBytes)+1) + + bodyBytes, err := ioutil.ReadAll(bodyReader) if err != nil { - return w, err + return nil, err } - w.body = strings.TrimSpace(string(bodyBytes)) + + // if we have no remaining bytes, error because the body was too big + if bodyReader.(*io.LimitedReader).N <= 0 { + return nil, fmt.Errorf("webhook response body exceeds %d bytes limit", maxBodyBytes) + } + + w.response += string(bodyBytes) } else { // no body for non-text responses but add it to our Response log so users know why - w.response = w.response + "\nNon-text body, ignoring" + w.response += "Non-text body, ignoring" } return w, nil @@ -191,7 +210,6 @@ type webhookCallEnvelope struct { URL string `json:"url"` Status WebhookStatus `json:"status"` StatusCode int `json:"status_code"` - Body string `json:"body"` Request string `json:"request"` Response string `json:"response"` } @@ -211,7 +229,6 @@ func (w *WebhookCall) UnmarshalJSON(data []byte) error { w.statusCode = envelope.StatusCode w.request = envelope.Request w.response = envelope.Response - w.body = envelope.Body return nil } @@ -223,6 +240,5 @@ func (r *WebhookCall) MarshalJSON() ([]byte, error) { StatusCode: r.statusCode, Request: r.request, Response: r.response, - Body: r.body, }) } diff --git a/flows/webhook_test.go b/flows/webhook_test.go new file mode 100644 index 000000000..a6365f24a --- /dev/null +++ b/flows/webhook_test.go @@ -0,0 +1,114 @@ +package flows_test + +import ( + "github.com/nyaruka/goflow/flows" + "net/http" + "strings" + "testing" + + "github.com/nyaruka/goflow/test" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testServerPort = 49994 + +type call struct { + method string + url string + body string +} + +func (c *call) String() string { return c.method + " " + c.url } + +type webhook struct { + request string + response string + body string +} + +func TestWebhookParsing(t *testing.T) { + server, err := test.NewTestHTTPServer(testServerPort) + require.NoError(t, err) + defer server.Close() + + session, err := test.CreateTestSession(testServerPort, nil) + require.NoError(t, err) + + testCases := []struct { + call call + webhook webhook + isError bool + }{ + { + // successful GET + call: call{"GET", "http://127.0.0.1:49994/?cmd=success", ""}, + webhook: webhook{ + request: "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + body: "{ \"ok\": \"true\" }", + }, + }, { + // successful POST without body + call: call{"POST", "http://127.0.0.1:49994/?cmd=success", ""}, + webhook: webhook{ + request: "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nContent-Length: 0\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + body: "{ \"ok\": \"true\" }", + }, + }, { + // successful POST with body + call: call{"POST", "http://127.0.0.1:49994/?cmd=success", `{"contact": "Bob"}`}, + webhook: webhook{ + request: "POST /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nContent-Length: 18\r\nAccept-Encoding: gzip\r\n\r\n{\"contact\": \"Bob\"}", + response: "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"ok\": \"true\" }", + body: "{ \"ok\": \"true\" }", + }, + }, { + // POST returning 503 + call: call{"POST", "http://127.0.0.1:49994/?cmd=unavailable", ""}, + webhook: webhook{ + request: "POST /?cmd=unavailable HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nContent-Length: 0\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 37\r\nContent-Type: text/plain; charset=utf-8\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\n{ \"errors\": [\"service unavailable\"] }", + body: "{ \"errors\": [\"service unavailable\"] }", + }, + }, { + // GET returning non-text content type + call: call{"GET", "http://127.0.0.1:49994/?cmd=binary", ""}, + webhook: webhook{ + request: "GET /?cmd=binary HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 10\r\nContent-Type: application/octet-stream\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\nNon-text body, ignoring", + body: "Non-text body, ignoring", + }, + }, { + // GET returning binary body larger than allowed (we ignore binary body so no biggie) + call: call{"GET", "http://127.0.0.1:49994/?cmd=binary&size=11000", ""}, + webhook: webhook{ + request: "GET /?cmd=binary&size=11000 HTTP/1.1\r\nHost: 127.0.0.1:49994\r\nUser-Agent: goflow-testing\r\nAccept-Encoding: gzip\r\n\r\n", + response: "HTTP/1.1 200 OK\r\nContent-Length: 11000\r\nContent-Type: application/octet-stream\r\nDate: Wed, 11 Apr 2018 18:24:30 GMT\r\n\r\nNon-text body, ignoring", + body: "Non-text body, ignoring", + }, + }, { + // GET returning text body larger than allowed + call: call{"GET", "http://127.0.0.1:49994/?cmd=binary&size=11000&type=text%2Fplain", ""}, + isError: true, + }, + } + + for _, tc := range testCases { + request, err := http.NewRequest(tc.call.method, tc.call.url, strings.NewReader(tc.call.body)) + require.NoError(t, err) + + webhook, err := flows.MakeWebhookCall(session, request) + if tc.isError { + assert.Error(t, err) + } else { + assert.Equal(t, tc.call.url, webhook.URL(), "URL mismatch for call %s", tc.call) + assert.Equal(t, tc.call.method, webhook.Method(), "method mismatch for call %s", tc.call) + assert.Equal(t, tc.webhook.request, webhook.Request(), "request trace mismatch for call %s", tc.call) + assert.Equal(t, tc.webhook.response, webhook.Response(), "response mismatch for call %s", tc.call) + assert.Equal(t, tc.webhook.body, webhook.Body(), "body mismatch for call %s", tc.call) + } + } +} diff --git a/flowserver.toml b/flowserver.toml index 2f5862dca..b8527c08f 100644 --- a/flowserver.toml +++ b/flowserver.toml @@ -21,4 +21,7 @@ static = "" # asset cache settings - how many items it can hold and how many to prune when it reaches that size asset_cache_size = 1000 asset_cache_prune = 100 -asset_server_token = "missing_auth_token" \ No newline at end of file +asset_server_token = "missing_auth_token" + +# engine settings +engine_max_webhook_response_bytes = 10000 diff --git a/test/http.go b/test/http.go index 7c39bca46..61573deb0 100644 --- a/test/http.go +++ b/test/http.go @@ -5,6 +5,7 @@ import ( "net" "net/http" "net/http/httptest" + "strconv" "github.com/nyaruka/goflow/utils" ) @@ -14,27 +15,8 @@ var TestHTTPClient = utils.NewHTTPClient("goflow-testing") // NewTestHTTPServer sets up a mock server for webhook actions func NewTestHTTPServer(port int) (*httptest.Server, error) { - server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cmd := r.URL.Query().Get("cmd") - defer r.Body.Close() - - w.Header().Set("Date", "Wed, 11 Apr 2018 18:24:30 GMT") - - switch cmd { - case "success": - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{ "ok": "true" }`)) - case "echo": - w.WriteHeader(http.StatusOK) - w.Write([]byte(r.URL.Query().Get("content"))) - case "unavailable": - w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte(`{ "errors": ["service unavailable"] }`)) - default: - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(`{ "errors": ["bad_request"] }`)) - } - })) + server := httptest.NewUnstartedServer(http.HandlerFunc(testHTTPHandler)) + // manually create a listener for our test server so that our output is predictable l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) if err != nil { @@ -44,3 +26,47 @@ func NewTestHTTPServer(port int) (*httptest.Server, error) { server.Start() return server, nil } + +func testHTTPHandler(w http.ResponseWriter, r *http.Request) { + cmd := r.URL.Query().Get("cmd") + defer r.Body.Close() + + w.Header().Set("Date", "Wed, 11 Apr 2018 18:24:30 GMT") + + switch cmd { + case "success": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ "ok": "true" }`)) + case "echo": + w.WriteHeader(http.StatusOK) + w.Write([]byte(r.URL.Query().Get("content"))) + case "binary": + typeParam := r.URL.Query().Get("type") + if typeParam == "" { + typeParam = "application/octet-stream" + } + + sizeParam := r.URL.Query().Get("size") + if sizeParam == "" { + sizeParam = "10" + } + size, _ := strconv.Atoi(sizeParam) + data := make([]byte, size) + for i := 0; i < size; i++ { + data[i] = byte(40 + i%10) + } + + w.Header().Set("Content-Type", typeParam) + w.Header().Set("Content-Length", sizeParam) + + w.WriteHeader(http.StatusOK) + w.Write(data) + + case "unavailable": + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{ "errors": ["service unavailable"] }`)) + default: + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{ "errors": ["bad_request"] }`)) + } +} diff --git a/test/session.go b/test/session.go index 2773835b7..951def1a2 100644 --- a/test/session.go +++ b/test/session.go @@ -325,7 +325,7 @@ func CreateTestSession(testServerPort int, actionToAdd flows.Action) (flows.Sess } // create our engine session - session := engine.NewSession(assetCache, assets.NewMockAssetServer(), TestHTTPClient) + session := engine.NewSession(assetCache, assets.NewMockAssetServer(), engine.NewDefaultConfig(), TestHTTPClient) // override the session environment session.SetEnvironment(newTestEnvironment())