Skip to content

Commit

Permalink
Fix/local client modifies msg (#237)
Browse files Browse the repository at this point in the history
Fix problem with local clients modifying contents of message

If local clients modified contents of a message, this could affect the message contents received by another client. Local clients must never receive the same message instance or messages with shared contents.
  • Loading branch information
gammazero authored Nov 30, 2020
1 parent 74fa74a commit 3df26ec
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 112 deletions.
26 changes: 11 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type InvokeResult struct {
// the invocation was canceled.
var InvocationCanceled = InvokeResult{Err: wamp.ErrCanceled}

var emptyDict = wamp.Dict{}

// NewClient takes a connected Peer, joins the realm specified in cfg, and if
// successful, returns a new client.
//
Expand Down Expand Up @@ -166,7 +164,7 @@ func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) err
}

if options == nil {
options = emptyDict
options = wamp.Dict{}
}
id := c.idGen.Next()
c.expectReply(id)
Expand Down Expand Up @@ -304,7 +302,7 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs

var pubAck bool
if options == nil {
options = emptyDict
options = wamp.Dict{}
} else {
// Check if the client is asking for a PUBLISHED response.
pubAck, _ = options[wamp.OptAcknowledge].(bool)
Expand Down Expand Up @@ -378,7 +376,7 @@ func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.D
id := c.idGen.Next()
c.expectReply(id)
if options == nil {
options = emptyDict
options = wamp.Dict{}
}
c.sess.Send(&wamp.Register{
Request: id,
Expand Down Expand Up @@ -534,17 +532,17 @@ func (c *Client) Call(ctx context.Context, procedure string, options wamp.Dict,
return nil, ErrNotConn
}

if options == nil {
options = wamp.Dict{}
}

// If caller is willing to receive progressive results, create a channel to
// receive these on. Then, start a goroutine to receive progressive
// results and call the callback for each.
var progChan chan *wamp.Result
var progDone chan struct{}
if progcb != nil {
if options == nil {
options = wamp.Dict{}
}
options[wamp.OptReceiveProgress] = true

progChan = make(chan *wamp.Result)
progDone = make(chan struct{})
go func() {
Expand All @@ -553,8 +551,6 @@ func (c *Client) Call(ctx context.Context, procedure string, options wamp.Dict,
}
close(progDone)
}()
} else if options == nil {
options = emptyDict
}

id := c.idGen.Next()
Expand Down Expand Up @@ -661,7 +657,7 @@ func (c *Client) Close() error {

var stopped bool
if c.sess.SendCtx(sendCtx, &wamp.Goodbye{
Details: emptyDict,
Details: wamp.Dict{},
Reason: wamp.CloseRealm,
}) == nil {
// The router should respond with a GOODBYE message, which causes
Expand Down Expand Up @@ -1136,7 +1132,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
c.sess.Send(&wamp.Error{
Type: wamp.INVOCATION,
Request: reqID,
Details: emptyDict,
Details: wamp.Dict{},
Error: wamp.ErrInvalidArgument,
Arguments: wamp.List{errMsg},
})
Expand Down Expand Up @@ -1219,7 +1215,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
c.sess.SendCtx(c.ctx, &wamp.Error{
Type: wamp.INVOCATION,
Request: reqID,
Details: emptyDict,
Details: wamp.Dict{},
Arguments: result.Args,
ArgumentsKw: result.Kwargs,
Error: result.Err,
Expand All @@ -1228,7 +1224,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
}
c.sess.SendCtx(c.ctx, &wamp.Yield{
Request: reqID,
Options: emptyDict,
Options: wamp.Dict{},
Arguments: result.Args,
ArgumentsKw: result.Kwargs,
})
Expand Down
72 changes: 72 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,9 @@ func (*testEmptyDictLeakAuthorizer) Authorize(sess *wamp.Session, message wamp.M
subMsg *wamp.Subscribe
ok bool
)
if _, ok = message.(*wamp.Goodbye); ok {
return true, nil
}
if subMsg, ok = message.(*wamp.Subscribe); !ok {
panic(fmt.Sprintf("I can only handle %T, saw %T", subMsg, message))
}
Expand Down Expand Up @@ -1111,3 +1114,72 @@ func TestEmptyDictLeak(t *testing.T) {
router1.Close()
router2.Close()
}

func TestEventContentSafety(t *testing.T) {
defer leaktest.Check(t)()

// Connect two subscribers and one publisher to router
sub1, sub2, r, err := connectedTestClients()
if err != nil {
t.Fatal("failed to connect subscribers clients:", err)
}
defer sub1.Close()
defer sub2.Close()
defer r.Close()
pub, err := newTestClient(r)
if err != nil {
t.Fatal("failed to connect published client:", err)
}
defer pub.Close()

errChan := make(chan error)
gate := make(chan struct{}, 1)
eventHandler := func(event *wamp.Event) {
gate <- struct{}{}
_, ok := event.Details["oops"]
if ok {
errChan <- errors.New("should not have seen oops")
<-gate
return
}
arg, ok := event.Arguments[0].(string)
if !ok {
errChan <- errors.New("arg was not strings")
<-gate
return
}
if arg != "Hello" {
errChan <- fmt.Errorf("expected \"Hello\", got %q", arg)
<-gate
return
}

event.Details["oops"] = true
event.Arguments[0] = "oops"
errChan <- nil
<-gate
}

// Expect invalid URI error if not setting match option.
if err = sub1.Subscribe(testTopic, eventHandler, nil); err != nil {
t.Fatal(err)
}
if err = sub2.Subscribe(testTopic, eventHandler, nil); err != nil {
t.Fatal(err)
}

if err = pub.Publish(testTopic, nil, wamp.List{"Hello"}, nil); err != nil {
t.Fatal("Failed to publish:", err)
}

for i := 0; i < 2; i++ {
select {
case err = <-errChan:
if err != nil {
t.Fatal(err)
}
case <-time.After(3 * time.Second):
t.Fatal("did not get published event")
}
}
}
Loading

0 comments on commit 3df26ec

Please sign in to comment.