From 94289bec30a4181428323581d44ab5ec77b7cdc8 Mon Sep 17 00:00:00 2001 From: Aleks Obukhov Date: Thu, 17 Oct 2024 23:44:34 +0200 Subject: [PATCH] Implement parallel handling of batch query #218 --- http.go | 80 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/http.go b/http.go index c2d312c..786a3cc 100644 --- a/http.go +++ b/http.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "strings" + "sync" "github.com/nautilus/graphql" ) @@ -27,6 +28,8 @@ type HTTPOperation struct { } `json:"extensions"` } +type setResultFunc func(r map[string]interface{}) + func formatErrors(err error) map[string]interface{} { return formatErrorsWithCode(nil, err, "UNKNOWN_ERROR") } @@ -70,12 +73,14 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) { /// Handle the operations regardless of the request method // we have to respond to each operation in the right order - results := []map[string]interface{}{} + results := make([]map[string]interface{}, len(operations)) + opWg := new(sync.WaitGroup) + opMutex := new(sync.Mutex) // the status code to report statusCode := http.StatusOK - for _, operation := range operations { + for opNum, operation := range operations { // there might be a query plan cache key embedded in the operation cacheKey := "" if operation.Extensions.QueryPlanCache != nil { @@ -85,10 +90,8 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) { // if there is no query or cache key if operation.Query == "" && cacheKey == "" { statusCode = http.StatusUnprocessableEntity - results = append( - results, - formatErrorsWithCode(nil, errors.New("could not find query body"), "BAD_USER_INPUT"), - ) + results[opNum] = formatErrorsWithCode(nil, errors.New("could not find query body"), "BAD_USER_INPUT") + continue } @@ -116,32 +119,12 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) { return } - // fire the query with the request context passed through to execution - result, err := g.Execute(requestContext, plan) - if err != nil { - results = append(results, formatErrorsWithCode(result, err, "INTERNAL_SERVER_ERROR")) - - continue - } - - // the result for this operation - payload := map[string]interface{}{"data": result} - - // if there was a cache key associated with this query - if requestContext.CacheKey != "" { - // embed the cache key in the response - payload["extensions"] = map[string]interface{}{ - "persistedQuery": map[string]interface{}{ - "sha265Hash": requestContext.CacheKey, - "version": "1", - }, - } - } - - // add this result to the list - results = append(results, payload) + opWg.Add(1) + go g.executeRequest(requestContext, plan, opWg, g.setResultFunc(opNum, results, opMutex)) } + opWg.Wait() + // the final result depends on whether we are executing in batch mode or not var finalResponse interface{} if batchMode { @@ -165,6 +148,43 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) { emitResponse(w, statusCode, string(response)) } +func (g *Gateway) setResultFunc(opNum int, results []map[string]interface{}, opMutex *sync.Mutex) setResultFunc { + return func(r map[string]interface{}) { + opMutex.Lock() + defer opMutex.Unlock() + results[opNum] = r + } +} + +func (g *Gateway) executeRequest(requestContext *RequestContext, plan QueryPlanList, opWg *sync.WaitGroup, setResult setResultFunc) { + defer opWg.Done() + + // fire the query with the request context passed through to execution + result, err := g.Execute(requestContext, plan) + if err != nil { + setResult(formatErrorsWithCode(result, err, "INTERNAL_SERVER_ERROR")) + + return + } + + // the result for this operation + payload := map[string]interface{}{"data": result} + + // if there was a cache key associated with this query + if requestContext.CacheKey != "" { + // embed the cache key in the response + payload["extensions"] = map[string]interface{}{ + "persistedQuery": map[string]interface{}{ + "sha265Hash": requestContext.CacheKey, + "version": "1", + }, + } + } + + // add this result to the list + setResult(payload) +} + // Parses request to operations (single or batch mode). // Returns an error and an error status code if the request is invalid. func parseRequest(r *http.Request) (operations []*HTTPOperation, batchMode bool, errStatusCode int, payloadErr error) {