From 36763b2ccad74312b06308affaf1b02549521702 Mon Sep 17 00:00:00 2001 From: Emmanuel Schmidbauer Date: Fri, 24 Nov 2017 11:20:56 -0500 Subject: [PATCH] add context deadline support --- .travis.yml | 7 +- client.go | 6 +- connection.go | 137 ++++++++++++------ examples/client.go | 12 +- .../{server_playback.go => serverplayback.go} | 15 +- examples/{server_tts.go => servertts.go} | 19 ++- helpers.go | 32 ++-- server.go | 7 +- 8 files changed, 147 insertions(+), 88 deletions(-) rename examples/{server_playback.go => serverplayback.go} (87%) rename examples/{server_tts.go => servertts.go} (83%) diff --git a/.travis.yml b/.travis.yml index 1f333fc..d674bd7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,10 @@ language: go go: - - 1.3 - - 1.4 - - 1.5 + - 1.8 + - 1.9 - tip - + install: - go get -v "github.com/smartystreets/goconvey" - go get -v "github.com/op/go-logging" diff --git a/client.go b/client.go index f17614d..bcda5e0 100644 --- a/client.go +++ b/client.go @@ -35,8 +35,8 @@ func (c *Client) EstablishConnection() error { c.SocketConnection = SocketConnection{ Conn: conn, - err: make(chan error), - m: make(chan *Message), + + receive: make(chan message), } return nil @@ -102,7 +102,7 @@ func NewClient(host string, port uint, passwd string, timeout int) (*Client, err err = client.Authenticate() if err != nil { - client.Close() + _ = client.Close() return nil, err } diff --git a/connection.go b/connection.go index 396b936..b0a8b8b 100644 --- a/connection.go +++ b/connection.go @@ -9,6 +9,7 @@ package goesl import ( "bufio" "bytes" + "context" "fmt" "io" "net" @@ -18,11 +19,17 @@ import ( "time" ) +type message struct { + err error + m *Message +} + // Main connection against ESL - Gotta add more description here type SocketConnection struct { net.Conn - err chan error - m chan *Message + + receive chan message + mtx sync.Mutex } @@ -32,7 +39,7 @@ func (c *SocketConnection) Dial(network string, addr string, timeout time.Durati } // Send - Will send raw message to open net connection -func (c *SocketConnection) Send(cmd string) error { +func (c *SocketConnection) Send(ctx context.Context, cmd string) error { if strings.Contains(cmd, "\r\n") { return fmt.Errorf(EInvalidCommandProvided, cmd) @@ -42,6 +49,12 @@ func (c *SocketConnection) Send(cmd string) error { c.mtx.Lock() defer c.mtx.Unlock() + deadline, ok := ctx.Deadline() + if ok { + _ = c.SetWriteDeadline(deadline) + defer func() { _ = c.SetWriteDeadline(time.Time{}) }() + } + _, err := io.WriteString(c, cmd) if err != nil { return err @@ -56,10 +69,10 @@ func (c *SocketConnection) Send(cmd string) error { } // SendMany - Will loop against passed commands and return 1st error if error happens -func (c *SocketConnection) SendMany(cmds []string) error { +func (c *SocketConnection) SendMany(ctx context.Context, cmds []string) error { for _, cmd := range cmds { - if err := c.Send(cmd); err != nil { + if err := c.Send(ctx, cmd); err != nil { return err } } @@ -68,7 +81,7 @@ func (c *SocketConnection) SendMany(cmds []string) error { } // SendEvent - Will loop against passed event headers -func (c *SocketConnection) SendEvent(eventHeaders []string) error { +func (c *SocketConnection) SendEvent(ctx context.Context, eventHeaders []string) error { if len(eventHeaders) <= 0 { return fmt.Errorf(ECouldNotSendEvent, len(eventHeaders)) } @@ -77,18 +90,20 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error { c.mtx.Lock() defer c.mtx.Unlock() + deadline, ok := ctx.Deadline() + if ok { + _ = c.SetWriteDeadline(deadline) + defer func() { _ = c.SetWriteDeadline(time.Time{}) }() + } + _, err := io.WriteString(c, "sendevent ") if err != nil { return err } for _, eventHeader := range eventHeaders { - _, err := io.WriteString(c, eventHeader) - if err != nil { - return err - } - _, err = io.WriteString(c, "\r\n") + _, err := io.WriteString(c, eventHeader+"\r\n") if err != nil { return err } @@ -104,27 +119,29 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error { } // Execute - Helper fuck to execute commands with its args and sync/async mode -func (c *SocketConnection) Execute(command, args string, sync bool) (m *Message, err error) { - return c.SendMsg(map[string]string{ - "call-command": "execute", - "execute-app-name": command, - "execute-app-arg": args, - "event-lock": strconv.FormatBool(sync), - }, "", "") +func (c *SocketConnection) Execute(ctx context.Context, command, args string, sync bool) (m *Message, err error) { + return c.SendMsg(ctx, + map[string]string{ + "call-command": "execute", + "execute-app-name": command, + "execute-app-arg": args, + "event-lock": strconv.FormatBool(sync), + }, "", "") } // ExecuteUUID - Helper fuck to execute uuid specific commands with its args and sync/async mode -func (c *SocketConnection) ExecuteUUID(uuid string, command string, args string, sync bool) (m *Message, err error) { - return c.SendMsg(map[string]string{ - "call-command": "execute", - "execute-app-name": command, - "execute-app-arg": args, - "event-lock": strconv.FormatBool(sync), - }, uuid, "") +func (c *SocketConnection) ExecuteUUID(ctx context.Context, uuid string, command string, args string, sync bool) (m *Message, err error) { + return c.SendMsg(ctx, + map[string]string{ + "call-command": "execute", + "execute-app-name": command, + "execute-app-arg": args, + "event-lock": strconv.FormatBool(sync), + }, uuid, "") } // SendMsg - Basically this func will send message to the opened connection -func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m *Message, err error) { +func (c *SocketConnection) SendMsg(ctx context.Context, msg map[string]string, uuid, data string) (*Message, error) { b := bytes.NewBufferString("sendmsg") @@ -160,19 +177,26 @@ func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m // lock mutex c.mtx.Lock() - _, err = b.WriteTo(c) + defer c.mtx.Unlock() + + deadline, ok := ctx.Deadline() + if ok { + _ = c.SetWriteDeadline(deadline) + defer func() { _ = c.SetWriteDeadline(time.Time{}) }() + } + + _, err := b.WriteTo(c) if err != nil { - c.mtx.Unlock() return nil, err } - c.mtx.Unlock() - select { - case err := <-c.err: + m, err := c.ReadMessage(ctx) + if err != nil { return nil, err - case m := <-c.m: - return m, nil } + + return m, nil + } // OriginatorAdd - Will return originator address known as net.RemoteAddr() @@ -183,40 +207,65 @@ func (c *SocketConnection) OriginatorAddr() net.Addr { // ReadMessage - Will read message from channels and return them back accordingy. // If error is received, error will be returned. If not, message will be returned back! -func (c *SocketConnection) ReadMessage() (*Message, error) { +func (c *SocketConnection) ReadMessage(ctx context.Context) (*Message, error) { Debug("Waiting for connection message to be received ...") + var m message select { - case err := <-c.err: - return nil, err - case msg := <-c.m: - return msg, nil + case m = <-c.receive: + case <-ctx.Done(): + return nil, fmt.Errorf("context deadline exceeded") + } + + if m.m == nil { + return nil, fmt.Errorf("unable to read message, channel closed") + } + + if m.err != nil { + return nil, m.err } + + return m.m, nil } +const ( + defaultHandleTimeout = time.Second +) + // Handle - Will handle new messages and close connection when there are no messages left to process func (c *SocketConnection) Handle() { - done := make(chan bool) + done := make(chan struct{}) go func() { for { - msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true) + msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true) if err != nil { - c.err <- err - done <- true + + select { + case c.receive <- message{err: err}: + case <-time.After(defaultHandleTimeout): + } + + close(done) break } - c.m <- msg + select { + case c.receive <- message{m: msg}: + case <-time.After(defaultHandleTimeout): + // if messages are getting dropped, receive syncronization will be messed up and unreliable + } } }() <-done + close(c.receive) + // Closing the connection now as there's nothing left to do ... - c.Close() + _ = c.Close() } // Close - Will close down net connection and return error if error happen diff --git a/examples/client.go b/examples/client.go index f299f0b..8cde490 100644 --- a/examples/client.go +++ b/examples/client.go @@ -7,11 +7,13 @@ package examples import ( + "context" "flag" "fmt" - . "github.com/0x19/goesl" "runtime" "strings" + + . "github.com/0x19/goesl" ) var ( @@ -24,6 +26,8 @@ var ( // Small client that will first make sure all events are returned as JSON and second, will originate func main() { + ctx := context.Background() + // Boost it as much as it can go ... runtime.GOMAXPROCS(runtime.NumCPU()) @@ -41,12 +45,12 @@ func main() { // Remember that this is crutial part in handling incoming messages :) go client.Handle() - client.Send("events json ALL") + client.Send(ctx, "events json ALL") - client.BgApi(fmt.Sprintf("originate %s %s", "sofia/internal/1001@127.0.0.1", "&socket(192.168.1.2:8084 async full)")) + client.BgApi(ctx, fmt.Sprintf("originate %s %s", "sofia/internal/1001@127.0.0.1", "&socket(192.168.1.2:8084 async full)")) for { - msg, err := client.ReadMessage() + msg, err := client.ReadMessage(ctx) if err != nil { diff --git a/examples/server_playback.go b/examples/serverplayback.go similarity index 87% rename from examples/server_playback.go rename to examples/serverplayback.go index 1d7bb21..a69bc44 100644 --- a/examples/server_playback.go +++ b/examples/serverplayback.go @@ -7,11 +7,13 @@ package examples import ( + "context" "fmt" - . "github.com/0x19/goesl" "os" "runtime" "strings" + + . "github.com/0x19/goesl" ) var welcomeFile = "%s/media/welcome.wav" @@ -47,6 +49,7 @@ func main() { // handle - Running under goroutine here to explain how to handle playback ( play to the caller ) func handle(s *OutboundServer) { + ctx := context.Background() for { @@ -55,12 +58,12 @@ func handle(s *OutboundServer) { case conn := <-s.Conns: Notice("New incomming connection: %v", conn) - if err := conn.Connect(); err != nil { + if err := conn.Connect(ctx); err != nil { Error("Got error while accepting connection: %s", err) break } - answer, err := conn.ExecuteAnswer("", false) + answer, err := conn.ExecuteAnswer(ctx, "", false) if err != nil { Error("Got error while executing answer: %s", err) @@ -72,14 +75,14 @@ func handle(s *OutboundServer) { cUUID := answer.GetCallUUID() - if sm, err := conn.Execute("playback", welcomeFile, true); err != nil { + if sm, err := conn.Execute(ctx, "playback", welcomeFile, true); err != nil { Error("Got error while executing playback: %s", err) break } else { Debug("Playback Message: %s", sm) } - if hm, err := conn.ExecuteHangup(cUUID, "", false); err != nil { + if hm, err := conn.ExecuteHangup(ctx, cUUID, "", false); err != nil { Error("Got error while executing hangup: %s", err) break } else { @@ -88,7 +91,7 @@ func handle(s *OutboundServer) { go func() { for { - msg, err := conn.ReadMessage() + msg, err := conn.ReadMessage(ctx) if err != nil { diff --git a/examples/server_tts.go b/examples/servertts.go similarity index 83% rename from examples/server_tts.go rename to examples/servertts.go index b60e4fc..9a55319 100644 --- a/examples/server_tts.go +++ b/examples/servertts.go @@ -7,9 +7,11 @@ package examples import ( - . "github.com/0x19/goesl" + "context" "runtime" "strings" + + . "github.com/0x19/goesl" ) var ( @@ -38,6 +40,7 @@ func main() { // handle - Running under goroutine here to explain how to run tts outbound server func handle(s *OutboundServer) { + ctx := context.Background() for { @@ -46,12 +49,12 @@ func handle(s *OutboundServer) { case conn := <-s.Conns: Notice("New incomming connection: %v", conn) - if err := conn.Connect(); err != nil { + if err := conn.Connect(ctx); err != nil { Error("Got error while accepting connection: %s", err) break } - answer, err := conn.ExecuteAnswer("", false) + answer, err := conn.ExecuteAnswer(ctx, "", false) if err != nil { Error("Got error while executing answer: %s", err) @@ -63,26 +66,26 @@ func handle(s *OutboundServer) { cUUID := answer.GetCallUUID() - if te, err := conn.ExecuteSet("tts_engine", "flite", false); err != nil { + if te, err := conn.ExecuteSet(ctx, "tts_engine", "flite", false); err != nil { Error("Got error while attempting to set tts_engine: %s", err) } else { Debug("TTS Engine Msg: %s", te) } - if tv, err := conn.ExecuteSet("tts_voice", "slt", false); err != nil { + if tv, err := conn.ExecuteSet(ctx, "tts_voice", "slt", false); err != nil { Error("Got error while attempting to set tts_voice: %s", err) } else { Debug("TTS Voice Msg: %s", tv) } - if sm, err := conn.Execute("speak", goeslMessage, true); err != nil { + if sm, err := conn.Execute(ctx, "speak", goeslMessage, true); err != nil { Error("Got error while executing speak: %s", err) break } else { Debug("Speak Message: %s", sm) } - if hm, err := conn.ExecuteHangup(cUUID, "", false); err != nil { + if hm, err := conn.ExecuteHangup(ctx, cUUID, "", false); err != nil { Error("Got error while executing hangup: %s", err) break } else { @@ -91,7 +94,7 @@ func handle(s *OutboundServer) { go func() { for { - msg, err := conn.ReadMessage() + msg, err := conn.ReadMessage(ctx) if err != nil { diff --git a/helpers.go b/helpers.go index 1ecf33a..ea1d644 100644 --- a/helpers.go +++ b/helpers.go @@ -6,42 +6,44 @@ package goesl +import "context" + // Set - Helper that you can use to execute SET application against active ESL session -func (sc *SocketConnection) ExecuteSet(key string, value string, sync bool) (m *Message, err error) { - return sc.Execute("set", key+"="+value, sync) +func (sc *SocketConnection) ExecuteSet(ctx context.Context, key string, value string, sync bool) (m *Message, err error) { + return sc.Execute(ctx, "set", key+"="+value, sync) } // ExecuteHangup - Helper desgned to help with executing Answer against active ESL session -func (sc *SocketConnection) ExecuteAnswer(args string, sync bool) (m *Message, err error) { - return sc.Execute("answer", args, sync) +func (sc *SocketConnection) ExecuteAnswer(ctx context.Context, args string, sync bool) (m *Message, err error) { + return sc.Execute(ctx, "answer", args, sync) } // ExecuteHangup - Helper desgned to help with executing Hangup against active ESL session -func (sc *SocketConnection) ExecuteHangup(uuid string, args string, sync bool) (m *Message, err error) { +func (sc *SocketConnection) ExecuteHangup(ctx context.Context, uuid string, args string, sync bool) (m *Message, err error) { if uuid != "" { - return sc.ExecuteUUID(uuid, "hangup", args, sync) + return sc.ExecuteUUID(ctx, uuid, "hangup", args, sync) } - return sc.Execute("hangup", args, sync) + return sc.Execute(ctx, "hangup", args, sync) } // BgApi - Helper designed to attach api in front of the command so that you do not need to write it -func (sc *SocketConnection) Api(command string) error { - return sc.Send("api " + command) +func (sc *SocketConnection) Api(ctx context.Context, command string) error { + return sc.Send(ctx, "api "+command) } // BgApi - Helper designed to attach bgapi in front of the command so that you do not need to write it -func (sc *SocketConnection) BgApi(command string) error { - return sc.Send("bgapi " + command) +func (sc *SocketConnection) BgApi(ctx context.Context, command string) error { + return sc.Send(ctx, "bgapi "+command) } // Connect - Helper designed to help you handle connection. Each outbound server when handling needs to connect e.g. accept // connection in order for you to do answer, hangup or do whatever else you wish to do -func (sc *SocketConnection) Connect() error { - return sc.Send("connect") +func (sc *SocketConnection) Connect(ctx context.Context) error { + return sc.Send(ctx, "connect") } // Exit - Used to send exit signal to ESL. It will basically hangup call and close connection -func (sc *SocketConnection) Exit() error { - return sc.Send("exit") +func (sc *SocketConnection) Exit(ctx context.Context) error { + return sc.Send(ctx, "exit") } diff --git a/server.go b/server.go index 1e296ec..a07e017 100644 --- a/server.go +++ b/server.go @@ -52,9 +52,8 @@ func (s *OutboundServer) Start() error { } conn := SocketConnection{ - Conn: c, - err: make(chan error), - m: make(chan *Message), + Conn: c, + receive: make(chan message), } Notice("Got new connection from: %s", conn.OriginatorAddr()) @@ -76,7 +75,7 @@ func (s *OutboundServer) Start() error { // Stop - Will close server connection once SIGTERM/Interrupt is received func (s *OutboundServer) Stop() { Warning("Stopping Outbound Server ...") - s.Close() + _ = s.Close() } // NewOutboundServer - Will instanciate new outbound server