Skip to content

Commit

Permalink
add context deadline support
Browse files Browse the repository at this point in the history
  • Loading branch information
eschmidbauer committed Nov 24, 2017
1 parent 54ed41e commit 36763b2
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 88 deletions.
7 changes: 3 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
language: go

go:
- 1.3
- 1.4
- 1.5
- 1.8
- 1.9
- tip

install:
- go get -v "github.com/smartystreets/goconvey"
- go get -v "github.com/op/go-logging"
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (c *Client) EstablishConnection() error {

c.SocketConnection = SocketConnection{
Conn: conn,
err: make(chan error),
m: make(chan *Message),

receive: make(chan message),
}

return nil
Expand Down Expand Up @@ -102,7 +102,7 @@ func NewClient(host string, port uint, passwd string, timeout int) (*Client, err

err = client.Authenticate()
if err != nil {
client.Close()
_ = client.Close()
return nil, err
}

Expand Down
137 changes: 93 additions & 44 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package goesl
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net"
Expand All @@ -18,11 +19,17 @@ import (
"time"
)

type message struct {
err error
m *Message
}

// Main connection against ESL - Gotta add more description here
type SocketConnection struct {
net.Conn
err chan error
m chan *Message

receive chan message

mtx sync.Mutex
}

Expand All @@ -32,7 +39,7 @@ func (c *SocketConnection) Dial(network string, addr string, timeout time.Durati
}

// Send - Will send raw message to open net connection
func (c *SocketConnection) Send(cmd string) error {
func (c *SocketConnection) Send(ctx context.Context, cmd string) error {

if strings.Contains(cmd, "\r\n") {
return fmt.Errorf(EInvalidCommandProvided, cmd)
Expand All @@ -42,6 +49,12 @@ func (c *SocketConnection) Send(cmd string) error {
c.mtx.Lock()
defer c.mtx.Unlock()

deadline, ok := ctx.Deadline()
if ok {
_ = c.SetWriteDeadline(deadline)
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
}

_, err := io.WriteString(c, cmd)
if err != nil {
return err
Expand All @@ -56,10 +69,10 @@ func (c *SocketConnection) Send(cmd string) error {
}

// SendMany - Will loop against passed commands and return 1st error if error happens
func (c *SocketConnection) SendMany(cmds []string) error {
func (c *SocketConnection) SendMany(ctx context.Context, cmds []string) error {

for _, cmd := range cmds {
if err := c.Send(cmd); err != nil {
if err := c.Send(ctx, cmd); err != nil {
return err
}
}
Expand All @@ -68,7 +81,7 @@ func (c *SocketConnection) SendMany(cmds []string) error {
}

// SendEvent - Will loop against passed event headers
func (c *SocketConnection) SendEvent(eventHeaders []string) error {
func (c *SocketConnection) SendEvent(ctx context.Context, eventHeaders []string) error {
if len(eventHeaders) <= 0 {
return fmt.Errorf(ECouldNotSendEvent, len(eventHeaders))
}
Expand All @@ -77,18 +90,20 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error {
c.mtx.Lock()
defer c.mtx.Unlock()

deadline, ok := ctx.Deadline()
if ok {
_ = c.SetWriteDeadline(deadline)
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
}

_, err := io.WriteString(c, "sendevent ")
if err != nil {
return err
}

for _, eventHeader := range eventHeaders {
_, err := io.WriteString(c, eventHeader)
if err != nil {
return err
}

_, err = io.WriteString(c, "\r\n")
_, err := io.WriteString(c, eventHeader+"\r\n")
if err != nil {
return err
}
Expand All @@ -104,27 +119,29 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error {
}

// Execute - Helper fuck to execute commands with its args and sync/async mode
func (c *SocketConnection) Execute(command, args string, sync bool) (m *Message, err error) {
return c.SendMsg(map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, "", "")
func (c *SocketConnection) Execute(ctx context.Context, command, args string, sync bool) (m *Message, err error) {
return c.SendMsg(ctx,
map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, "", "")
}

// ExecuteUUID - Helper fuck to execute uuid specific commands with its args and sync/async mode
func (c *SocketConnection) ExecuteUUID(uuid string, command string, args string, sync bool) (m *Message, err error) {
return c.SendMsg(map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, uuid, "")
func (c *SocketConnection) ExecuteUUID(ctx context.Context, uuid string, command string, args string, sync bool) (m *Message, err error) {
return c.SendMsg(ctx,
map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, uuid, "")
}

// SendMsg - Basically this func will send message to the opened connection
func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m *Message, err error) {
func (c *SocketConnection) SendMsg(ctx context.Context, msg map[string]string, uuid, data string) (*Message, error) {

b := bytes.NewBufferString("sendmsg")

Expand Down Expand Up @@ -160,19 +177,26 @@ func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m

// lock mutex
c.mtx.Lock()
_, err = b.WriteTo(c)
defer c.mtx.Unlock()

deadline, ok := ctx.Deadline()
if ok {
_ = c.SetWriteDeadline(deadline)
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
}

_, err := b.WriteTo(c)
if err != nil {
c.mtx.Unlock()
return nil, err
}
c.mtx.Unlock()

select {
case err := <-c.err:
m, err := c.ReadMessage(ctx)
if err != nil {
return nil, err
case m := <-c.m:
return m, nil
}

return m, nil

}

// OriginatorAdd - Will return originator address known as net.RemoteAddr()
Expand All @@ -183,40 +207,65 @@ func (c *SocketConnection) OriginatorAddr() net.Addr {

// ReadMessage - Will read message from channels and return them back accordingy.
// If error is received, error will be returned. If not, message will be returned back!
func (c *SocketConnection) ReadMessage() (*Message, error) {
func (c *SocketConnection) ReadMessage(ctx context.Context) (*Message, error) {
Debug("Waiting for connection message to be received ...")

var m message
select {
case err := <-c.err:
return nil, err
case msg := <-c.m:
return msg, nil
case m = <-c.receive:
case <-ctx.Done():
return nil, fmt.Errorf("context deadline exceeded")
}

if m.m == nil {
return nil, fmt.Errorf("unable to read message, channel closed")
}

if m.err != nil {
return nil, m.err
}

return m.m, nil
}

const (
defaultHandleTimeout = time.Second
)

// Handle - Will handle new messages and close connection when there are no messages left to process
func (c *SocketConnection) Handle() {

done := make(chan bool)
done := make(chan struct{})

go func() {
for {
msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true)

msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true)
if err != nil {
c.err <- err
done <- true

select {
case c.receive <- message{err: err}:
case <-time.After(defaultHandleTimeout):
}

close(done)
break
}

c.m <- msg
select {
case c.receive <- message{m: msg}:
case <-time.After(defaultHandleTimeout):
// if messages are getting dropped, receive syncronization will be messed up and unreliable
}
}
}()

<-done

close(c.receive)

// Closing the connection now as there's nothing left to do ...
c.Close()
_ = c.Close()
}

// Close - Will close down net connection and return error if error happen
Expand Down
12 changes: 8 additions & 4 deletions examples/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package examples

import (
"context"
"flag"
"fmt"
. "github.com/0x19/goesl"
"runtime"
"strings"

. "github.com/0x19/goesl"
)

var (
Expand All @@ -24,6 +26,8 @@ var (
// Small client that will first make sure all events are returned as JSON and second, will originate
func main() {

ctx := context.Background()

// Boost it as much as it can go ...
runtime.GOMAXPROCS(runtime.NumCPU())

Expand All @@ -41,12 +45,12 @@ func main() {
// Remember that this is crutial part in handling incoming messages :)
go client.Handle()

client.Send("events json ALL")
client.Send(ctx, "events json ALL")

client.BgApi(fmt.Sprintf("originate %s %s", "sofia/internal/[email protected]", "&socket(192.168.1.2:8084 async full)"))
client.BgApi(ctx, fmt.Sprintf("originate %s %s", "sofia/internal/[email protected]", "&socket(192.168.1.2:8084 async full)"))

for {
msg, err := client.ReadMessage()
msg, err := client.ReadMessage(ctx)

if err != nil {

Expand Down
15 changes: 9 additions & 6 deletions examples/server_playback.go → examples/serverplayback.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package examples

import (
"context"
"fmt"
. "github.com/0x19/goesl"
"os"
"runtime"
"strings"

. "github.com/0x19/goesl"
)

var welcomeFile = "%s/media/welcome.wav"
Expand Down Expand Up @@ -47,6 +49,7 @@ func main() {

// handle - Running under goroutine here to explain how to handle playback ( play to the caller )
func handle(s *OutboundServer) {
ctx := context.Background()

for {

Expand All @@ -55,12 +58,12 @@ func handle(s *OutboundServer) {
case conn := <-s.Conns:
Notice("New incomming connection: %v", conn)

if err := conn.Connect(); err != nil {
if err := conn.Connect(ctx); err != nil {
Error("Got error while accepting connection: %s", err)
break
}

answer, err := conn.ExecuteAnswer("", false)
answer, err := conn.ExecuteAnswer(ctx, "", false)

if err != nil {
Error("Got error while executing answer: %s", err)
Expand All @@ -72,14 +75,14 @@ func handle(s *OutboundServer) {

cUUID := answer.GetCallUUID()

if sm, err := conn.Execute("playback", welcomeFile, true); err != nil {
if sm, err := conn.Execute(ctx, "playback", welcomeFile, true); err != nil {
Error("Got error while executing playback: %s", err)
break
} else {
Debug("Playback Message: %s", sm)
}

if hm, err := conn.ExecuteHangup(cUUID, "", false); err != nil {
if hm, err := conn.ExecuteHangup(ctx, cUUID, "", false); err != nil {
Error("Got error while executing hangup: %s", err)
break
} else {
Expand All @@ -88,7 +91,7 @@ func handle(s *OutboundServer) {

go func() {
for {
msg, err := conn.ReadMessage()
msg, err := conn.ReadMessage(ctx)

if err != nil {

Expand Down
Loading

0 comments on commit 36763b2

Please sign in to comment.