Skip to content

Commit

Permalink
Merge branch 'master_perf'
Browse files Browse the repository at this point in the history
  • Loading branch information
gptankit committed Jul 13, 2020
2 parents 734d6b8 + 1741d91 commit f6fc88d
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 83 deletions.
2 changes: 1 addition & 1 deletion config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
SQP_K_KEEP_ALIVE_TIMEOUT = "KEEP_ALIVE_TIMEOUT"

SQ_WD = "/usr/local/serviceq"
SQ_VER = "serviceq/0.3"
SQ_VER = "serviceq/0.4"
)

// getPropertyFilePath returns path to sq.properties.
Expand Down
2 changes: 1 addition & 1 deletion errorlog/service_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var logger *log.Logger
// init opens the log file and creates a logger object.
func init() {

logFileLocation := "/opt/serviceq/logs/serviceq_error.log"
logFileLocation := "/usr/local/serviceq/logs/serviceq_error.log"
file, err := os.OpenFile(logFileLocation, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
if err == nil {
logger = log.New(file, "ServiceQ: ", log.Ldate | log.Ltime)
Expand Down
19 changes: 5 additions & 14 deletions model/http_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package model
import (
"bufio"
"errors"
"io/ioutil"
"net"
"net/http"
"strings"
Expand Down Expand Up @@ -38,21 +37,13 @@ func (httpConn *HTTPConnection) ReadFrom() (*http.Request, error) {
}

// WriteTo writes http response to writer (in http format).
func (httpConn *HTTPConnection) WriteTo(res *http.Response, customHeaders []string) error {
func (httpConn *HTTPConnection) WriteTo(res ResponseParam, customHeaders []string) error {

var responseBody []byte
if res.Body != nil {
responseBody, _ = ioutil.ReadAll(res.Body)
res.Body.Close()
}

responseProtocol := res.Proto
responseHeaders := ""
responseStatus := res.Status

// add original response headers
if res.Header != nil {
for k, v := range res.Header {
if res.Headers != nil {
for k, v := range res.Headers {
responseHeaders += k + ": " + strings.Join(v, ",") + "\n"
}
}
Expand All @@ -66,10 +57,10 @@ func (httpConn *HTTPConnection) WriteTo(res *http.Response, customHeaders []stri

if responseHeaders != "" {
responseHeaders = responseHeaders[:len(responseHeaders)-1]
responseStatus = responseStatus + "\n"
res.Status = res.Status + "\n"
}

clientResStr := responseProtocol + " " + responseStatus + responseHeaders + "\n\n" + string(responseBody)
clientResStr := res.Protocol + " " + res.Status + responseHeaders + "\n\n" + string(res.BodyBuff)

clientRes := []byte(clientResStr)

Expand Down
8 changes: 8 additions & 0 deletions model/response_param.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package model

type ResponseParam struct {
Protocol string
Status string
Headers map[string][]string
BodyBuff []byte
}
4 changes: 2 additions & 2 deletions protocol/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
func setTCPDeadline(conn *net.Conn, keepAliveTimeout int32) {

if keepAliveTimeout >= 0 {
(*conn).SetDeadline(time.Now().Add(time.Millisecond * time.Duration(keepAliveTimeout)))
(*conn).SetDeadline(time.Now().Add(time.Second * time.Duration(keepAliveTimeout)))
}
}

// isTCPAlive is a ping service to determine tcp connection state.
func isTCPAlive(service string) bool {

dialTO := 5000
conn, err := net.DialTimeout("tcp", service, time.Duration(dialTO) * time.Millisecond)
conn, err := net.DialTimeout("tcp", service, time.Duration(dialTO)*time.Millisecond)
if err == nil {
conn.Close()
return true
Expand Down
131 changes: 73 additions & 58 deletions protocol/http_handler.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@
package protocol

import (
"github.com/gptankit/serviceq/algorithm"
"bytes"
"github.com/gptankit/serviceq/errorlog"
"errors"
"fmt"
"io"
"io/ioutil"
"github.com/gptankit/serviceq/algorithm"
"github.com/gptankit/serviceq/errorlog"
"github.com/gptankit/serviceq/model"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
)

var client *http.Client
var once sync.Once

const (
func init() {

client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 200,
IdleConnTimeout: 30 * time.Second},
}
}

SERVICEQ_NO_ERR = 600
const (
SERVICEQ_NO_ERR = 600
SERVICEQ_FLOODED_ERR = 601
UPSTREAM_NO_ERR = 700
UPSTREAM_TCP_ERR = 701
UPSTREAM_HTTP_ERR = 702
DOWNSTREAM_NO_ERR = 700
DOWNSTREAM_TCP_ERR = 701
DOWNSTREAM_HTTP_ERR = 702

RESPONSE_FLOODED = "SERVICEQ_FLOODED"
RESPONSE_TIMED_OUT = "UPSTREAM_TIMED_OUT"
RESPONSE_SERVICE_DOWN = "UPSTREAM_DOWN"
RESPONSE_NO_RESPONSE = "UPSTREAM_NO_RESPONSE"
RESPONSE_TIMED_OUT = "DOWNSTREAM_TIMED_OUT"
RESPONSE_SERVICE_DOWN = "DOWNSTREAM_DOWN"
RESPONSE_NO_RESPONSE = "DOWNSTREAM_NO_RESPONSE"
)

// HandleHttpConnection reads from incoming http connection and attempts to forward it to downstream nodes by calling
// dialAndSend(). It temporarily saves the request before forwarding, if needed for subsequent retries. This saved
// dialAndSend(). It temporarily saves the request before forwarding, if needed for subsequent retries. This saved
// request can be buffered if dialAndSend() is unable to forward to any downstream nodes.
func HandleHttpConnection(conn *net.Conn, creq chan interface{}, cwork chan int, sqp *model.ServiceQProperties) {

Expand All @@ -44,7 +53,7 @@ func HandleHttpConnection(conn *net.Conn, creq chan interface{}, cwork chan int,

for {

var res *http.Response
var resParam model.ResponseParam
var reqParam model.RequestParam
var toBuffer bool

Expand All @@ -56,10 +65,9 @@ func HandleHttpConnection(conn *net.Conn, creq chan interface{}, cwork chan int,
cwork <- 1

reqParam = saveReqParam(req)
res, toBuffer, err = dialAndSend(reqParam, sqp)

resParam, toBuffer, err = dialAndSend(reqParam, sqp)
if err == nil {
err = httpConn.WriteTo(res, (*sqp).CustomResponseHeaders)
err = httpConn.WriteTo(resParam, (*sqp).CustomResponseHeaders)
if err != nil {
fmt.Fprintf(os.Stderr, "Error on writing to client conn\n")
}
Expand Down Expand Up @@ -90,16 +98,16 @@ func HandleHttpConnection(conn *net.Conn, creq chan interface{}, cwork chan int,
// DiscardHttpConnection sets error response and discards upstream http connection.
func DiscardHttpConnection(conn *net.Conn, sqp *model.ServiceQProperties) {

var res *http.Response
var resParam model.ResponseParam
httpConn := model.HTTPConnection{}
httpConn.Enclose(conn)
req, err := httpConn.ReadFrom()

if err == nil {
res = getCustomResponse(req.Proto, http.StatusTooManyRequests, "Request Discarded")
resParam = getCustomResponse(req.Proto, http.StatusTooManyRequests, "Request Discarded")
clientErr := errors.New(RESPONSE_FLOODED)
errorlog.IncrementErrorCount(sqp, "SQ_PROXY", SERVICEQ_FLOODED_ERR, clientErr.Error())
err = httpConn.WriteTo(res, (*sqp).CustomResponseHeaders)
err = httpConn.WriteTo(resParam, (*sqp).CustomResponseHeaders)
if err != nil {
fmt.Fprintf(os.Stderr, "Error on writing to client conn\n")
}
Expand Down Expand Up @@ -157,41 +165,40 @@ func saveReqParam(req *http.Request) model.RequestParam {
return reqParam
}

// dialAndSend forwards request to downstream node selected by algorithm.ChooseServiceIndex() and in case of
// dialAndSend forwards request to downstream node selected by algorithm.ChooseServiceIndex() and in case of
// error, increments the error count, and retries for a maximum (*sqp).MaxRetries times. If the request succeedes,
// the coresponding node error count is reset. If the request fails on all nodes, it can be set to buffer.
func dialAndSend(reqParam model.RequestParam, sqp *model.ServiceQProperties) (*http.Response, bool, error) {
func dialAndSend(reqParam model.RequestParam, sqp *model.ServiceQProperties) (model.ResponseParam, bool, error) {

choice := -1
var clientErr error

for retry := 0; retry < (*sqp).MaxRetries; retry++ {

choice = algorithm.ChooseServiceIndex(sqp, choice, retry)
upstrService := (*sqp).ServiceList[choice]
downstrService := (*sqp).ServiceList[choice]

//fmt.Printf("%s] Connecting to %s\n", time.Now().UTC().Format("2006-01-02 15:04:05"), upstrService.Host)
// ping ip -- response/error flow below will take care of tcp connect
// fmt.Printf("%s] Connecting to %s\n", time.Now().UTC().Format("2006-01-02 15:04:05"), downstrService.Host)

//if !isTCPAlive(upstrService.Host) {
// clientErr = errors.New(RESPONSE_SERVICE_DOWN)
// errorlog.IncrementErrorCount(sqp, upstrService.QualifiedUrl, UPSTREAM_TCP_ERR, clientErr.Error())
// time.Sleep(time.Duration((*sqp).RetryGap) * time.Millisecond) // wait on error
// continue
//}
// ping ip -- response/error flow below will take care of tcp connect
/*
if !isTCPAlive(downstrService.Host) {
clientErr = errors.New(RESPONSE_SERVICE_DOWN)
errorlog.IncrementErrorCount(sqp, downstrService.QualifiedUrl, UPSTREAM_TCP_ERR, clientErr.Error())
time.Sleep(time.Duration((*sqp).RetryGap) * time.Second) // wait on error
continue
}*/

//fmt.Printf("->Forwarding to %s\n", upstrService.QualifiedUrl)
//fmt.Printf("->Forwarding to %s\n", downstrService.QualifiedUrl)

body := ioutil.NopCloser(bytes.NewReader(reqParam.BodyBuff))
upstrReq, _ := http.NewRequest(reqParam.Method, upstrService.QualifiedUrl+reqParam.RequestURI, body)
upstrReq.Header = reqParam.Headers
downstrReq, _ := http.NewRequest(reqParam.Method, downstrService.QualifiedUrl+reqParam.RequestURI, body)
downstrReq.Header = reqParam.Headers

// do http call
if client == nil {
client = &http.Client{Timeout: time.Duration((*sqp).OutRequestTimeout) * time.Millisecond}
}

resp, err := client.Do(upstrReq)
once.Do(func() {
client.Timeout = time.Duration((*sqp).OutRequestTimeout) * time.Second
})
resp, err := client.Do(downstrReq)

// handle response
if resp == nil || err != nil {
Expand All @@ -205,13 +212,22 @@ func dialAndSend(reqParam model.RequestParam, sqp *model.ServiceQProperties) (*h
} else {
clientErr = errors.New(RESPONSE_NO_RESPONSE)
}
go errorlog.IncrementErrorCount(sqp, upstrService.QualifiedUrl, UPSTREAM_HTTP_ERR, clientErr.Error())
time.Sleep(time.Duration((*sqp).RetryGap) * time.Millisecond) // wait on error
go errorlog.IncrementErrorCount(sqp, downstrService.QualifiedUrl, DOWNSTREAM_HTTP_ERR, clientErr.Error())
time.Sleep(time.Duration((*sqp).RetryGap) * time.Second) // wait on error
continue
} else {
go errorlog.ResetErrorCount(sqp, upstrService.QualifiedUrl)
go errorlog.ResetErrorCount(sqp, downstrService.QualifiedUrl)
clientErr = nil
return resp, false, nil

responseParam := model.ResponseParam{}
responseParam.Protocol = resp.Proto
responseParam.Status = resp.Status
responseParam.Headers = resp.Header
if resp.Body != nil {
responseParam.BodyBuff, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
}
return responseParam, false, nil
}
}

Expand All @@ -220,11 +236,11 @@ func dialAndSend(reqParam model.RequestParam, sqp *model.ServiceQProperties) (*h
return checkErrorAndRespond(clientErr, reqParam, sqp)
}

return nil, true, errors.New("send-fail")
return model.ResponseParam{}, true, errors.New("send-fail")
}

// checkErrorAndRespond sets error and buffer flag based on buffer config and type of error from downstream node.
func checkErrorAndRespond(clientErr error, reqParam model.RequestParam, sqp *model.ServiceQProperties) (*http.Response, bool, error) {
func checkErrorAndRespond(clientErr error, reqParam model.RequestParam, sqp *model.ServiceQProperties) (model.ResponseParam, bool, error) {

if clientErr.Error() == RESPONSE_NO_RESPONSE || clientErr.Error() == RESPONSE_TIMED_OUT {
if canBeBuffered(reqParam, sqp) {
Expand All @@ -240,21 +256,21 @@ func checkErrorAndRespond(clientErr error, reqParam model.RequestParam, sqp *mod
}

// getCustomResponse creates a new http response with appropriates status code and response message.
func getCustomResponse(protocol string, statusCode int, resMsg string) *http.Response {
func getCustomResponse(protocol string, statusCode int, resMsg string) model.ResponseParam {

var body io.ReadCloser
var body []byte
var json string
if resMsg != "" {
json = `{"sq_msg":"` + resMsg +`"}`
body = ioutil.NopCloser(bytes.NewReader([]byte(json)))
json = `{"sq_msg":"` + resMsg + `"}`
body = []byte(json)
}
jsonLen := strconv.Itoa(len(json))

return &http.Response{
Proto: protocol,
Status: strconv.Itoa(statusCode) + " " + http.StatusText(statusCode),
StatusCode: statusCode, Header: http.Header{"Content-Type": []string{"application/json"}, "Content-Length": []string{jsonLen},},
Body : body,
return model.ResponseParam{
Protocol: protocol,
Status: strconv.Itoa(statusCode) + " " + http.StatusText(statusCode),
Headers: http.Header{"Content-Type": []string{"application/json"}, "Content-Length": []string{jsonLen}},
BodyBuff: body,
}
}

Expand Down Expand Up @@ -299,12 +315,12 @@ func optCloseConn(conn *net.Conn, reqParam model.RequestParam, keepAliveServe bo
if reqParam.Protocol == "HTTP/1.0" || reqParam.Protocol == "HTTP/1.1" { // Connection and keep-alive are ignored for http/2
if v, ok := reqParam.Headers["Connection"]; ok {
if v[0] == "keep-alive" && keepAliveServe {
return false// do not close conn
return false // do not close conn
} else if v[0] == "close" || !keepAliveServe { // close conn if Connection: close or keep-alive is not part of response
return forceCloseConn(conn)
}
} else if reqParam.Protocol == "HTTP/1.1" && keepAliveServe {
return false// do not close conn if Connection header not found for protocol http/1.1 -- follow default behaviour
return false // do not close conn if Connection header not found for protocol http/1.1 -- follow default behaviour
} else {
return forceCloseConn(conn)
}
Expand All @@ -317,6 +333,5 @@ func optCloseConn(conn *net.Conn, reqParam model.RequestParam, keepAliveServe bo
func forceCloseConn(conn *net.Conn) bool {

(*conn).Close()

return true
}
2 changes: 1 addition & 1 deletion serviceq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestWorkAssigment(t *testing.T) {
RetryGap: 0, // ms
IdleGap: 500, // ms
RequestErrorLog: make(map[string]uint64, 2),
OutRequestTimeout: 300000,
OutRequestTimeout: 1,
}

cw := make(chan int, sqp.MaxConcurrency)
Expand Down
12 changes: 6 additions & 6 deletions sq.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ LISTENER_PORT=5252
PROTO=http

#Endpoints seperated by comma (,) -- no spaces allowed, can be a combination of http/https
ENDPOINTS=http://127.0.0.1:8001,http://127.0.0.1:8000
ENDPOINTS=http://my.server1.com:8080,http://my.server2.com:8080,http://my.server3.com:8080

#Concurrency peak defines how many max concurrent connections are allowed to the cluster of endpoints defined above
CONCURRENCY_PEAK=2048

#Timeout (ms) is added to each outgoing request to endpoints, the existing timeouts are overriden, value of -1 means no timeout
OUTGOING_REQUEST_TIMEOUT=300000
#Timeout (s) is added to each outgoing request to endpoints, the existing timeouts are overriden, value of -1 means no timeout
OUTGOING_REQUEST_TIMEOUT=5

#Interval (ms) between two retries -- recommended 0 for best performance
#Interval (s) between two retries -- recommended 0 for best performance
RETRY_GAP=0


Expand All @@ -45,8 +45,8 @@ DEFERRED_Q_REQUEST_FORMATS=POST,PUT,PATCH,DELETE
#CUSTOM_RESPONSE_HEADERS=Connection: keep-alive|Server
CUSTOM_RESPONSE_HEADERS=Server

#Keep Alive Timeout (ms), value of -1 means no timeout
KEEP_ALIVE_TIMEOUT=120000
#Keep Alive Timeout (s), value of -1 means no timeout
KEEP_ALIVE_TIMEOUT=120


#--------------#
Expand Down

0 comments on commit f6fc88d

Please sign in to comment.