Skip to content

Commit

Permalink
Critical client fix #114. Cleanup code
Browse files Browse the repository at this point in the history
Provides critical client fix where request IDs were incorrectly generated, and could lead to failed response when multiple clients called same procedure.

Code cleanup to obsolete the use of `wamp.OptionXXX` functions.
  • Loading branch information
gammazero authored Jun 24, 2018
1 parent 0f544f2 commit 8c1e19d
Show file tree
Hide file tree
Showing 22 changed files with 335 additions and 300 deletions.
12 changes: 6 additions & 6 deletions aat/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestJoinRealmWithCRAuth(t *testing.T) {
}

realmDetails := cli.RealmDetails()
if wamp.OptionString(realmDetails, "authrole") != "user" {
if s, _ := wamp.AsString(realmDetails["authrole"]); s != "user" {
t.Fatal("missing or incorrect authrole")
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestJoinRealmWithCRCookieAuth(t *testing.T) {
details := cli.RealmDetails()

// Client should not have be authenticated by cookie first time.
if wamp.OptionFlag(details, "authbycookie") {
if ok, _ := wamp.AsBool(details["authbycookie"]); ok {
t.Fatal("authbycookie set incorrectly to true")
}

Expand All @@ -93,11 +93,11 @@ func TestJoinRealmWithCRCookieAuth(t *testing.T) {

// If websocket, then should be authenticated by cookie this time.
if cfg.WsCfg.Jar != nil {
if !wamp.OptionFlag(details, "authbycookie") {
if ok, _ := wamp.AsBool(details["authbycookie"]); !ok {
t.Fatal("should have been authenticated by cookie")
}
} else {
if wamp.OptionFlag(details, "authbycookie") {
if ok, _ := wamp.AsBool(details["authbycookie"]); ok {
t.Fatal("authbycookie set incorrectly to true")
}
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestAuthz(t *testing.T) {
t.Fatal("Call error:", err)
}
dict, _ := wamp.AsDict(result.Arguments[0])
if wamp.OptionString(dict, "foobar") != "" {
if _, ok := dict["foobar"]; ok {
t.Fatal("Should not have special info in session")
}

Expand All @@ -183,7 +183,7 @@ func TestAuthz(t *testing.T) {
t.Fatal("Call error:", err)
}
dict, _ = wamp.AsDict(result.Arguments[0])
if wamp.OptionString(dict, "foobar") != "baz" {
if s, _ := wamp.AsString(dict["foobar"]); s != "baz" {
t.Fatal("Missing special info in session")
}

Expand Down
2 changes: 1 addition & 1 deletion aat/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestPubSubWildcard(t *testing.T) {
errChan <- errors.New("event missing or bad args")
return
}
origTopic := wamp.OptionURI(details, "topic")
origTopic, _ := wamp.AsURI(details["topic"])
if origTopic != testTopic {
errChan <- errors.New("wrong original topic")
return
Expand Down
9 changes: 4 additions & 5 deletions aat/registrationmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,13 @@ func TestMetaEventRegOnCreateRegOnRegister(t *testing.T) {
errChanC <- errors.New("argument 0 (session) was not wamp.ID")
return
}
onCreateID = wamp.OptionID(dict, "id")
if wamp.OptionURI(dict, "uri") != wamp.URI("some.proc") {
onCreateID, _ = wamp.AsID(dict["id"])
if u, _ := wamp.AsURI(dict["uri"]); u != wamp.URI("some.proc") {
errChanC <- fmt.Errorf(
"on_create had wrong procedure, got '%v' want 'some.proc'",
wamp.OptionURI(dict, "uri"))
"on_create had wrong procedure, got '%v' want 'some.proc'", u)
return
}
if wamp.OptionString(dict, "created") == "" {
if s, _ := wamp.AsString(dict["created"]); s == "" {
errChanC <- errors.New("on_create missing created time")
return
}
Expand Down
88 changes: 75 additions & 13 deletions aat/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aat

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -42,34 +43,95 @@ func TestRPCRegisterAndCall(t *testing.T) {
t.Fatal("Failed to connect client:", err)
}

// Test calling the procedure.
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ctx := context.Background()
result, err := caller.Call(ctx, procName, nil, callArgs, nil, "")
// Connect second caller session.
caller2, err := connectClient()
if err != nil {
t.Fatal("Failed to call procedure:", err)
t.Fatal("Failed to connect client:", err)
}
sum, ok := wamp.AsInt64(result.Arguments[0])
if !ok {
t.Fatal("Could not convert result to int64")

// Connect third caller session.
caller3, err := connectClient()
if err != nil {
t.Fatal("Failed to connect client:", err)
}
if sum != 55 {
t.Fatal("Wrong result:", sum)

// Test calling the procedure.
callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
var result1, result2, result3 *wamp.Result
var err1, err2, err3 error
var ready, allDone sync.WaitGroup
release := make(chan struct{})
ready.Add(3)
allDone.Add(3)
go func() {
defer allDone.Done()
ready.Done()
<-release
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result1, err1 = caller.Call(ctx, procName, nil, callArgs, nil, "")
}()
go func() {
defer allDone.Done()
// Call it with caller2.
ready.Done()
<-release
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result2, err2 = caller2.Call(ctx, procName, nil, callArgs, nil, "")
}()
go func() {
// Call it with caller3.
defer allDone.Done()
ready.Done()
<-release
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result3, err3 = caller3.Call(ctx, procName, nil, callArgs, nil, "")
}()

ready.Wait()
close(release)
allDone.Wait()

errs := []error{err1, err2, err3}
results := []*wamp.Result{result1, result2, result3}
for i := 0; i < 3; i++ {
if errs[i] != nil {
t.Error("Caller", i, "failed to call procedure:", errs[i])
} else {
sum, ok := wamp.AsInt64(results[i].Arguments[0])
if !ok {
t.Error("Could not convert result", i, "to int64")
} else if sum != 55 {
t.Errorf("Wrong result %d: %d", i, sum)
}
}
}

// Test unregister.
if err = callee.Unregister(procName); err != nil {
t.Fatal("Failed to unregister procedure:", err)
t.Error("Failed to unregister procedure:", err)
}

err = caller.Close()
if err != nil {
t.Fatal("Failed to disconnect client:", err)
t.Error("Failed to disconnect client:", err)
}

err = caller2.Close()
if err != nil {
t.Error("Failed to disconnect client:", err)
}

err = caller3.Close()
if err != nil {
t.Error("Failed to disconnect client:", err)
}

err = callee.Close()
if err != nil {
t.Fatal("Failed to disconnect client:", err)
t.Error("Failed to disconnect client:", err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions aat/sessionmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestMetaEventOnJoin(t *testing.T) {
errChan <- errors.New("argument was not wamp.Dict")
return
}
onJoinID = wamp.OptionID(details, "session")
onJoinID, _ = wamp.AsID(details["session"])
errChan <- nil
}

Expand Down Expand Up @@ -436,7 +436,7 @@ func TestMetaProcSessionGet(t *testing.T) {
if !ok {
t.Fatal("Could not convert result to wamp.Dict")
}
resultID := wamp.OptionID(dict, "session")
resultID, _ := wamp.AsID(dict["session"])
if resultID != sess.ID() {
t.Fatal("Wrong session ID in result")
}
Expand Down
9 changes: 4 additions & 5 deletions aat/subscriptionmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,13 @@ func TestMetaEventOnCreateOnSubscribe(t *testing.T) {
errChanC <- errors.New("argument 0 (session) was not wamp.ID")
return
}
onCreateID = wamp.OptionID(dict, "id")
if wamp.OptionURI(dict, "uri") != wamp.URI("some.topic") {
onCreateID, _ = wamp.AsID(dict["id"])
if u, _ := wamp.AsURI(dict["uri"]); u != wamp.URI("some.topic") {
errChanC <- fmt.Errorf(
"on_create had wrong topic, got '%v' want 'some.topic'",
wamp.OptionURI(dict, "uri"))
"on_create had wrong topic, got '%v' want 'some.topic'", u)
return
}
if wamp.OptionString(dict, "created") == "" {
if s, _ := wamp.AsString(dict["created"]); s == "" {
errChanC <- errors.New("on_create missing created time")
return
}
Expand Down
32 changes: 16 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ type Client struct {
progGate map[context.Context]wamp.ID

actionChan chan func()
idGen *wamp.SyncIDGen

stopping chan struct{}
activeInvHandlers sync.WaitGroup
Expand Down Expand Up @@ -191,7 +190,6 @@ func NewClient(p wamp.Peer, cfg Config) (*Client, error) {
progGate: map[context.Context]wamp.ID{},

actionChan: make(chan func()),
idGen: new(wamp.SyncIDGen),
stopping: make(chan struct{}),
done: make(chan struct{}),

Expand Down Expand Up @@ -252,7 +250,7 @@ func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) err
if options == nil {
options = wamp.Dict{}
}
id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Subscribe{
Request: id,
Expand Down Expand Up @@ -322,7 +320,7 @@ func (c *Client) Unsubscribe(topic string) error {
return err
}

id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Unsubscribe{
Request: id,
Expand Down Expand Up @@ -382,7 +380,7 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs
// Check if the client is asking for a PUBLISHED response.
pubAck, _ := options[wamp.OptAcknowledge].(bool)

id := c.idGen.Next()
id := wamp.GlobalID()
if pubAck {
c.expectReply(id)
}
Expand Down Expand Up @@ -443,7 +441,7 @@ type InvocationHandler func(context.Context, wamp.List, wamp.Dict, wamp.Dict) (r
//
// NOTE: Use consts defined in wamp/options.go instead of raw strings.
func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error {
id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Register{
Request: id,
Expand Down Expand Up @@ -517,7 +515,7 @@ func (c *Client) Unregister(procedure string) error {
return err
}

id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Unregister{
Request: id,
Expand Down Expand Up @@ -657,7 +655,7 @@ func (c *Client) CallProgress(ctx context.Context, procedure string, options wam
}()
}

id := c.idGen.Next()
id := wamp.GlobalID()
c.expectReply(id)
c.sess.Send(&wamp.Call{
Request: id,
Expand Down Expand Up @@ -848,7 +846,7 @@ func handleCRAuth(peer wamp.Peer, challenge *wamp.Challenge, authHandlers map[st
// If router sent back ABORT in response to client's authentication attempt
// return error.
if abort, ok := msg.(*wamp.Abort); ok {
authErr := wamp.OptionString(abort.Details, wamp.OptError)
authErr, _ := wamp.AsString(abort.Details[wamp.OptError])
if authErr == "" {
authErr = "authentication failed"
}
Expand Down Expand Up @@ -1002,10 +1000,11 @@ CollectResults:
}
// If this is a progressive result.
if progChan != nil {
result, ok := msg.(*wamp.Result)
if ok && wamp.OptionFlag(result.Details, wamp.OptProgress) {
progChan <- result
goto CollectResults
if result, ok := msg.(*wamp.Result); ok {
if ok, _ = wamp.AsBool(result.Details[wamp.OptProgress]); ok {
progChan <- result
goto CollectResults
}
}
}
c.actionChan <- func() {
Expand Down Expand Up @@ -1126,7 +1125,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {
// Create a kill switch so that invocation can be canceled.
var cancel context.CancelFunc
var ctx context.Context
timeout := wamp.OptionInt64(msg.Details, wamp.OptTimeout)
timeout, _ := wamp.AsInt64(msg.Details[wamp.OptTimeout])
if timeout > 0 {
// The caller specified a timeout, in milliseconds.
ctx, cancel = context.WithTimeout(context.Background(),
Expand All @@ -1139,7 +1138,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) {

// If caller is accepting progressive results, create map entry to
// allow progress to be sent.
if wamp.OptionFlag(msg.Details, wamp.OptReceiveProgress) {
if ok, _ = wamp.AsBool(msg.Details[wamp.OptReceiveProgress]); ok {
c.progGate[ctx] = msg.Request
}

Expand Down Expand Up @@ -1230,7 +1229,8 @@ func (c *Client) runHandleInterrupt(msg *wamp.Interrupt) {
// If the interrupt mode is "killnowait", then the router is not
// waiting for a response, so do not send one. This is indicated by
// deleting the cancel for the invocation early.
if wamp.OptionString(msg.Options, wamp.OptMode) == wamp.CancelModeKillNoWait {
mode, _ := wamp.AsString(msg.Options[wamp.OptMode])
if mode == wamp.CancelModeKillNoWait {
delete(c.invHandlerKill, msg.Request)
}
cancel()
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestSubscribe(t *testing.T) {
errChan <- errors.New("event missing or bad args")
return
}
origTopic := wamp.OptionURI(details, "topic")
origTopic, _ := wamp.AsURI(details["topic"])
if origTopic != wamp.URI(testTopic) {
errChan <- errors.New("wrong original topic")
return
Expand Down
4 changes: 2 additions & 2 deletions router/auth/anonymous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ func TestAnonAuth(t *testing.T) {
t.Fatal("expected WELCOME message, got: ", welcome.MessageType())
}

if wamp.OptionString(welcome.Details, "authmethod") != "anonymous" {
if s, _ := wamp.AsString(welcome.Details["authmethod"]); s != "anonymous" {
t.Fatal("invalid authmethod in welcome details")
}
if wamp.OptionString(welcome.Details, "authrole") != "anonymous" {
if s, _ := wamp.AsString(welcome.Details["authrole"]); s != "anonymous" {
t.Fatal("incorrect authrole in welcome details")
}
}
2 changes: 1 addition & 1 deletion router/auth/crauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewCRAuthenticator(keyStore KeyStore, timeout time.Duration) *CRAuthenticat
func (cr *CRAuthenticator) AuthMethod() string { return "wampcra" }

func (cr *CRAuthenticator) Authenticate(sid wamp.ID, details wamp.Dict, client wamp.Peer) (*wamp.Welcome, error) {
authid := wamp.OptionString(details, "authid")
authid, _ := wamp.AsString(details["authid"])
if authid == "" {
return nil, errors.New("missing authid")
}
Expand Down
Loading

0 comments on commit 8c1e19d

Please sign in to comment.