diff --git a/.travis.yml b/.travis.yml index 741e7cdc..dea28289 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,8 @@ language: go go: - 1.11.x - 1.12.x + - 1.13.x + - tip env: global: diff --git a/Makefile b/Makefile index c75ca14b..3e0d9953 100644 --- a/Makefile +++ b/Makefile @@ -4,12 +4,8 @@ SERVICE_DIR = nexusd all: vet test service -$(GOPATH)/bin/shadow: - go install golang.org/x/tools/go/analysis/passes/shadow/cmd/shadow - -vet: $(GOPATH)/bin/shadow +vet: go vet -all -composites=false ./... - go vet -vettool=$(GOPATH)/bin/shadow ./... test: go get github.com/fortytw2/leaktest diff --git a/go.mod b/go.mod index 97f7eb16..6ddd35ab 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,10 @@ require ( github.com/gorilla/websocket v1.4.1 github.com/ugorji/go v1.1.7 github.com/ugorji/go/codec v1.1.7 - golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 - golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 // indirect - golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0 // indirect + golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 + golang.org/x/net v0.0.0-20190912160710-24e19bdeb0f2 // indirect + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect + golang.org/x/sys v0.0.0-20190913121621-c3b328c6e5a7 // indirect golang.org/x/text v0.3.2 // indirect - golang.org/x/tools v0.0.0-20190830223141-573d9926052a // indirect + golang.org/x/tools v0.0.0-20190914235951-31e00f45c22e // indirect ) diff --git a/go.sum b/go.sum index 79f5a5ac..1f336b25 100644 --- a/go.sum +++ b/go.sum @@ -17,18 +17,26 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPl golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 h1:Gv7RPwsi3eZ2Fgewe3CBsuOebPwO27PoXzRpJPsvSSM= golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 h1:0hQKqeLdqlt5iIwVOBErRisrHJAN57yOiPRQItI20fU= +golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190912160710-24e19bdeb0f2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904005037-43c01164e931/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190913121621-c3b328c6e5a7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190520200954-7e7c6e521403 h1:LkLPH7H115t/Cjvl2cV2VNZYdWWKt22DucZfiEgZ83o= golang.org/x/tools v0.0.0-20190520200954-7e7c6e521403/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190830223141-573d9926052a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190903163617-be0da057c5e3/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190914235951-31e00f45c22e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/router/broker.go b/router/broker.go index 5197b0f3..07bab2af 100644 --- a/router/broker.go +++ b/router/broker.go @@ -41,10 +41,7 @@ type subscription struct { subscribers map[*wamp.Session]struct{} } -// FilterFactory is a function which creates a PublishFilter from a publication -type FilterFactory func(msg *wamp.Publish) PublishFilter - -type Broker struct { +type broker struct { // topic -> subscription topicSubscription map[wamp.URI]*subscription pfxTopicSubscription map[wamp.URI]*subscription @@ -69,15 +66,15 @@ type Broker struct { filterFactory FilterFactory } -// NewBroker returns a new default broker implementation instance. -func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool, publishFilter FilterFactory) *Broker { +// newBroker returns a new default broker implementation instance. +func newBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool, publishFilter FilterFactory) *broker { if logger == nil { panic("logger is nil") } if publishFilter == nil { publishFilter = NewSimplePublishFilter } - b := &Broker{ + b := &broker{ topicSubscription: map[wamp.URI]*subscription{}, pfxTopicSubscription: map[wamp.URI]*subscription{}, wcTopicSubscription: map[wamp.URI]*subscription{}, @@ -103,13 +100,13 @@ func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool, publi return b } -// Role returns the role information for the "broker" role. The data returned +// role returns the role information for the "broker" role. The data returned // is suitable for use as broker role info in a WELCOME message. -func (b *Broker) Role() wamp.Dict { +func (b *broker) role() wamp.Dict { return brokerRole } -// Publish finds all subscriptions for the topic being published to, including +// publish finds all subscriptions for the topic being published to, including // those matching the topic by pattern, and sends an event to the subscribers // of that topic. // @@ -118,7 +115,7 @@ func (b *Broker) Role() wamp.Dict { // // The Subscriber can detect the delivery of that same event on multiple // subscriptions via EVENT.PUBLISHED.Publication, which will be identical. -func (b *Broker) Publish(pub *wamp.Session, msg *wamp.Publish) { +func (b *broker) publish(pub *wamp.Session, msg *wamp.Publish) { if pub == nil || msg == nil { panic("broker.Publish with nil session or message") } @@ -177,7 +174,7 @@ func (b *Broker) Publish(pub *wamp.Session, msg *wamp.Publish) { filter := b.filterFactory(msg) b.actionChan <- func() { - b.publish(pub, msg, pubID, excludePub, disclose, filter) + b.syncPublish(pub, msg, pubID, excludePub, disclose, filter) } // Send PUBLISHED message if acknowledge is present and true. @@ -186,7 +183,7 @@ func (b *Broker) Publish(pub *wamp.Session, msg *wamp.Publish) { } } -// Subscribe subscribes the client to the given topic. +// subscribe subscribes the client to the given topic. // // In case of receiving a SUBSCRIBE message from the same Subscriber and to // already subscribed topic, Broker should answer with SUBSCRIBED message, @@ -196,7 +193,7 @@ func (b *Broker) Publish(pub *wamp.Session, msg *wamp.Publish) { // Subscriber might want to subscribe to topics based on a pattern. If the // Broker and the Subscriber support pattern-based subscriptions, this matching // can happen by prefix-matching policy or wildcard-matching policy. -func (b *Broker) Subscribe(sub *wamp.Session, msg *wamp.Subscribe) { +func (b *broker) subscribe(sub *wamp.Session, msg *wamp.Subscribe) { if sub == nil || msg == nil { panic("broker.Subscribe with nil session or message") } @@ -219,39 +216,39 @@ func (b *Broker) Subscribe(sub *wamp.Session, msg *wamp.Subscribe) { } b.actionChan <- func() { - b.subscribe(sub, msg, match) + b.syncSubscribe(sub, msg, match) } } -// Unsubscribe removes the requested subscription. -func (b *Broker) Unsubscribe(sub *wamp.Session, msg *wamp.Unsubscribe) { +// unsubscribe removes the requested subscription. +func (b *broker) unsubscribe(sub *wamp.Session, msg *wamp.Unsubscribe) { if sub == nil || msg == nil { panic("broker.Unsubscribe with nil session or message") } b.actionChan <- func() { - b.unsubscribe(sub, msg) + b.syncUnsubscribe(sub, msg) } } -// RemoveSession removes all subscriptions of the subscriber. This is called +// removeSession removes all subscriptions of the subscriber. This is called // when a client leaves the realm by sending a GOODBYE message or by // disconnecting from the router. If there are any subscriptions for this // session a wamp.subscription.on_delete meta event is published for each. -func (b *Broker) RemoveSession(sess *wamp.Session) { +func (b *broker) removeSession(sess *wamp.Session) { if sess == nil { return } b.actionChan <- func() { - b.removeSession(sess) + b.syncRemoveSession(sess) } } // Close stops the broker, letting already queued actions finish. -func (b *Broker) Close() { +func (b *broker) close() { close(b.actionChan) } -func (b *Broker) run() { +func (b *broker) run() { for action := range b.actionChan { action() } @@ -260,30 +257,30 @@ func (b *Broker) run() { } } -func (b *Broker) publish(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter PublishFilter) { +func (b *broker) syncPublish(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter PublishFilter) { // Publish to subscribers with exact match. if sub, ok := b.topicSubscription[msg.Topic]; ok { - b.pubEvent(pub, msg, pubID, sub, excludePub, false, disclose, filter) + b.syncPubEvent(pub, msg, pubID, sub, excludePub, false, disclose, filter) } // Publish to subscribers with prefix match. for pfxTopic, sub := range b.pfxTopicSubscription { if msg.Topic.PrefixMatch(pfxTopic) { - b.pubEvent(pub, msg, pubID, sub, excludePub, true, disclose, filter) + b.syncPubEvent(pub, msg, pubID, sub, excludePub, true, disclose, filter) } } // Publish to subscribers with wildcard match. for wcTopic, sub := range b.wcTopicSubscription { if msg.Topic.WildcardMatch(wcTopic) { - b.pubEvent(pub, msg, pubID, sub, excludePub, true, disclose, filter) + b.syncPubEvent(pub, msg, pubID, sub, excludePub, true, disclose, filter) } } } -func (b *Broker) newSubscription(subscriber *wamp.Session, topic wamp.URI, match string) *subscription { +func newSubscription(id wamp.ID, subscriber *wamp.Session, topic wamp.URI, match string) *subscription { return &subscription{ - id: b.idGen.Next(), + id: id, topic: topic, match: match, created: wamp.NowISO8601(), @@ -291,7 +288,7 @@ func (b *Broker) newSubscription(subscriber *wamp.Session, topic wamp.URI, match } } -func (b *Broker) subscribe(subscriber *wamp.Session, msg *wamp.Subscribe, match string) { +func (b *broker) syncSubscribe(subscriber *wamp.Session, msg *wamp.Subscribe, match string) { var sub *subscription var existingSub bool @@ -301,7 +298,7 @@ func (b *Broker) subscribe(subscriber *wamp.Session, msg *wamp.Subscribe, match sub, existingSub = b.pfxTopicSubscription[msg.Topic] if !existingSub { // Create a new prefix subscription. - sub = b.newSubscription(subscriber, msg.Topic, match) + sub = newSubscription(b.idGen.Next(), subscriber, msg.Topic, match) b.pfxTopicSubscription[msg.Topic] = sub } case wamp.MatchWildcard: @@ -309,7 +306,7 @@ func (b *Broker) subscribe(subscriber *wamp.Session, msg *wamp.Subscribe, match sub, existingSub = b.wcTopicSubscription[msg.Topic] if !existingSub { // Create a new wildcard subscription. - sub = b.newSubscription(subscriber, msg.Topic, match) + sub = newSubscription(b.idGen.Next(), subscriber, msg.Topic, match) b.wcTopicSubscription[msg.Topic] = sub } default: @@ -317,7 +314,7 @@ func (b *Broker) subscribe(subscriber *wamp.Session, msg *wamp.Subscribe, match sub, existingSub = b.topicSubscription[msg.Topic] if !existingSub { // Create a new subscription. - sub = b.newSubscription(subscriber, msg.Topic, match) + sub = newSubscription(b.idGen.Next(), subscriber, msg.Topic, match) b.topicSubscription[msg.Topic] = sub } } @@ -351,16 +348,16 @@ func (b *Broker) subscribe(subscriber *wamp.Session, msg *wamp.Subscribe, match b.trySend(subscriber, &wamp.Subscribed{Request: msg.Request, Subscription: sub.id}) if !existingSub { - b.pubSubCreateMeta(msg.Topic, subscriber.ID, sub) + b.syncPubSubCreateMeta(msg.Topic, subscriber.ID, sub) } // Publish WAMP on_subscribe meta event. - b.pubSubMeta(wamp.MetaEventSubOnSubscribe, subscriber.ID, sub.id) + b.syncPubSubMeta(wamp.MetaEventSubOnSubscribe, subscriber.ID, sub.id) } -// deleteSubscription removes the the ID->subscription mapping and removes the -// topic->subscription mapping. -func (b *Broker) delSubscription(sub *subscription) { +// syncDeleteSubscription removes the the ID->subscription mapping and removes +// the topic->subscription mapping. +func (b *broker) syncDelSubscription(sub *subscription) { // Remove ID -> subscription. delete(b.subscriptions, sub.id) @@ -375,8 +372,8 @@ func (b *Broker) delSubscription(sub *subscription) { } } -// unsibsubscribe removes the subscriber from the specified subscription. -func (b *Broker) unsubscribe(subscriber *wamp.Session, msg *wamp.Unsubscribe) { +// syncUnsibsubscribe removes the subscriber from the specified subscription. +func (b *broker) syncUnsubscribe(subscriber *wamp.Session, msg *wamp.Unsubscribe) { subID := msg.Subscription sub, ok := b.subscriptions[subID] if !ok { @@ -396,7 +393,7 @@ func (b *Broker) unsubscribe(subscriber *wamp.Session, msg *wamp.Unsubscribe) { // send on_delete meta event. var delLastSub bool if len(sub.subscribers) == 0 { - b.delSubscription(sub) + b.syncDelSubscription(sub) delLastSub = true } @@ -419,16 +416,16 @@ func (b *Broker) unsubscribe(subscriber *wamp.Session, msg *wamp.Unsubscribe) { b.trySend(subscriber, &wamp.Unsubscribed{Request: msg.Request}) // Publish WAMP unsubscribe meta event. - b.pubSubMeta(wamp.MetaEventSubOnUnsubscribe, subscriber.ID, subID) + b.syncPubSubMeta(wamp.MetaEventSubOnUnsubscribe, subscriber.ID, subID) if delLastSub { // Fired when a subscription is deleted after the last session attached // to it has been removed. - b.pubSubMeta(wamp.MetaEventSubOnDelete, subscriber.ID, subID) + b.syncPubSubMeta(wamp.MetaEventSubOnDelete, subscriber.ID, subID) } } -// removeSession removed all subscriptions for the session. -func (b *Broker) removeSession(subscriber *wamp.Session) { +// syncRemoveSession removed all subscriptions for the session. +func (b *broker) syncRemoveSession(subscriber *wamp.Session) { subIDSet, ok := b.sessionSubIDSet[subscriber] if !ok { return @@ -449,32 +446,17 @@ func (b *Broker) removeSession(subscriber *wamp.Session) { // If no more subscribers on this subscription. if len(sub.subscribers) == 0 { - b.delSubscription(sub) + b.syncDelSubscription(sub) // Fired when a subscription is deleted after the last // session attached to it has been removed. - b.pubSubMeta(wamp.MetaEventSubOnDelete, subscriber.ID, subID) + b.syncPubSubMeta(wamp.MetaEventSubOnDelete, subscriber.ID, subID) } } } -func allowPublish(sub *wamp.Session, filter PublishFilter) bool { - if filter == nil { - return true - } - // Create a safe session to prevent access to the session.Peer. - safeSession := wamp.Session{ - ID: sub.ID, - Details: sub.Details, - } - sub.Lock() - ok := filter.Allowed(&safeSession) - sub.Unlock() - return ok -} - -// pubEvent sends an event to all subscribers that are not excluded from +// syncPubEvent sends an event to all subscribers that are not excluded from // receiving the event. -func (b *Broker) pubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter PublishFilter) { +func (b *broker) syncPubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter PublishFilter) { for subscriber, _ := range sub.subscribers { // Do not send event to publisher. if subscriber == pub && excludePublisher { @@ -482,8 +464,18 @@ func (b *Broker) pubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID, s } // Check if receiver is restricted. - if !allowPublish(subscriber, filter) { - continue + if filter != nil { + // Create a safe session to prevent access to the session.Peer. + safeSession := wamp.Session{ + ID: subscriber.ID, + Details: subscriber.Details, + } + subscriber.Lock() + ok := filter.Allowed(&safeSession) + subscriber.Unlock() + if !ok { + continue + } } details := wamp.Dict{} @@ -511,9 +503,9 @@ func (b *Broker) pubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID, s } } -// pubMeta publishes the subscription meta event, using the supplied function, -// to the matching subscribers. -func (b *Broker) pubMeta(metaTopic wamp.URI, sendMeta func(metaSub *subscription, sendTopic bool)) { +// syncPubMeta publishes the subscription meta event, using the supplied +// function, to the matching subscribers. +func (b *broker) syncPubMeta(metaTopic wamp.URI, sendMeta func(metaSub *subscription, sendTopic bool)) { // Publish to subscribers with exact match. if metaSub, ok := b.topicSubscription[metaTopic]; ok { sendMeta(metaSub, false) @@ -532,11 +524,11 @@ func (b *Broker) pubMeta(metaTopic wamp.URI, sendMeta func(metaSub *subscription } } -// pubSubMeta publishes a subscription meta event when a subscription is added, -// removed, or deleted. -func (b *Broker) pubSubMeta(metaTopic wamp.URI, subSessID, subID wamp.ID) { +// syncPubSubMeta publishes a subscription meta event when a subscription is +// added, removed, or deleted. +func (b *broker) syncPubSubMeta(metaTopic wamp.URI, subSessID, subID wamp.ID) { pubID := wamp.GlobalID() // create here so that it is same for all events - sendMeta := func(metaSub *subscription, sendTopic bool) { + b.syncPubMeta(metaTopic, func(metaSub *subscription, sendTopic bool) { if len(metaSub.subscribers) == 0 { return } @@ -558,17 +550,16 @@ func (b *Broker) pubSubMeta(metaTopic wamp.URI, subSessID, subID wamp.ID) { Arguments: wamp.List{subSessID, subID}, }) } - } - b.pubMeta(metaTopic, sendMeta) + }) } -// pubSubCreateMeta publishes a meta event on subscription creation. +// syncPubSubCreateMeta publishes a meta event on subscription creation. // // Fired when a subscription is created through a subscription request for a // topic which was previously without subscribers. -func (b *Broker) pubSubCreateMeta(topic wamp.URI, subSessID wamp.ID, sub *subscription) { +func (b *broker) syncPubSubCreateMeta(topic wamp.URI, subSessID wamp.ID, sub *subscription) { pubID := wamp.GlobalID() // create here so that it is same for all events - sendMeta := func(metaSub *subscription, sendTopic bool) { + b.syncPubMeta(wamp.MetaEventSubOnCreate, func(metaSub *subscription, sendTopic bool) { if len(metaSub.subscribers) == 0 { return } @@ -597,11 +588,10 @@ func (b *Broker) pubSubCreateMeta(topic wamp.URI, subSessID wamp.ID, sub *subscr Arguments: wamp.List{subSessID, subDetails}, }) } - } - b.pubMeta(wamp.MetaEventSubOnCreate, sendMeta) + }) } -func (b *Broker) trySend(sess *wamp.Session, msg wamp.Message) bool { +func (b *broker) trySend(sess *wamp.Session, msg wamp.Message) bool { if err := sess.TrySend(msg); err != nil { b.log.Printf("!!! Dropped %s to session %s: %s", msg.MessageType(), sess, err) return false @@ -625,8 +615,8 @@ func disclosePublisher(pub *wamp.Session, details wamp.Dict) { // ----- Subscription Meta Procedure Handlers ----- -// SubList retrieves subscription IDs listed according to match policies. -func (b *Broker) SubList(msg *wamp.Invocation) wamp.Message { +// subList retrieves subscription IDs listed according to match policies. +func (b *broker) subList(msg *wamp.Invocation) wamp.Message { var exactSubs, pfxSubs, wcSubs []wamp.ID sync := make(chan struct{}) b.actionChan <- func() { @@ -654,9 +644,9 @@ func (b *Broker) SubList(msg *wamp.Invocation) wamp.Message { } } -// SubLookup obtains the subscription (if any) managing a topic, according +// subLookup obtains the subscription (if any) managing a topic, according // to some match policy. -func (b *Broker) SubLookup(msg *wamp.Invocation) wamp.Message { +func (b *broker) subLookup(msg *wamp.Invocation) wamp.Message { var subID wamp.ID if len(msg.Arguments) != 0 { if topic, ok := wamp.AsURI(msg.Arguments[0]); ok { @@ -692,9 +682,9 @@ func (b *Broker) SubLookup(msg *wamp.Invocation) wamp.Message { } } -// SubMatch retrieves a list of IDs of subscriptions matching a topic URI, +// subMatch retrieves a list of IDs of subscriptions matching a topic URI, // irrespective of match policy. -func (b *Broker) SubMatch(msg *wamp.Invocation) wamp.Message { +func (b *broker) subMatch(msg *wamp.Invocation) wamp.Message { var subIDs []wamp.ID if len(msg.Arguments) != 0 { if topic, ok := wamp.AsURI(msg.Arguments[0]); ok { @@ -730,8 +720,8 @@ func (b *Broker) SubMatch(msg *wamp.Invocation) wamp.Message { } } -// SubGet retrieves information on a particular subscription. -func (b *Broker) SubGet(msg *wamp.Invocation) wamp.Message { +// subGet retrieves information on a particular subscription. +func (b *broker) subGet(msg *wamp.Invocation) wamp.Message { var dict wamp.Dict if len(msg.Arguments) != 0 { if subID, ok := wamp.AsID(msg.Arguments[0]); ok { @@ -764,9 +754,9 @@ func (b *Broker) SubGet(msg *wamp.Invocation) wamp.Message { } } -// SubListSubscribers retrieves a list of session IDs for sessions currently +// subListSubscribers retrieves a list of session IDs for sessions currently // attached to the subscription. -func (b *Broker) SubListSubscribers(msg *wamp.Invocation) wamp.Message { +func (b *broker) subListSubscribers(msg *wamp.Invocation) wamp.Message { var subscriberIDs []wamp.ID if len(msg.Arguments) != 0 { if subID, ok := wamp.AsID(msg.Arguments[0]); ok { @@ -799,9 +789,9 @@ func (b *Broker) SubListSubscribers(msg *wamp.Invocation) wamp.Message { } } -// SubCountSubscribers obtains the number of sessions currently attached to the +// subCountSubscribers obtains the number of sessions currently attached to the // subscription. -func (b *Broker) SubCountSubscribers(msg *wamp.Invocation) wamp.Message { +func (b *broker) subCountSubscribers(msg *wamp.Invocation) wamp.Message { var count int var ok bool if len(msg.Arguments) != 0 { diff --git a/router/broker_test.go b/router/broker_test.go index f16d3dd0..0a0fd95a 100644 --- a/router/broker_test.go +++ b/router/broker_test.go @@ -36,11 +36,11 @@ func (p *testPeer) Close() { return } func TestBasicSubscribe(t *testing.T) { // Test subscribing to a topic. - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := wamp.NewSession(subscriber, 0, nil, nil) testTopic := wamp.URI("nexus.test.topic") - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) // Test that subscriber received SUBSCRIBED message rsp := <-sess.Recv() @@ -77,7 +77,7 @@ func TestBasicSubscribe(t *testing.T) { } // Test subscribing to same topic again. - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) // Test that subscriber received SUBSCRIBED message rsp = <-sess.Recv() subMsg, ok = rsp.(*wamp.Subscribed) @@ -107,7 +107,7 @@ func TestBasicSubscribe(t *testing.T) { // Test subscribing to different topic. testTopic2 := wamp.URI("nexus.test.topic2") - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic2}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic2}) // Test that subscriber received SUBSCRIBED message rsp = <-sess.Recv() subMsg, ok = rsp.(*wamp.Subscribed) @@ -142,20 +142,20 @@ func TestBasicSubscribe(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) testTopic := wamp.URI("nexus.test.topic") // Subscribe session1 to topic subscriber := newTestPeer() sess1 := wamp.NewSession(subscriber, 0, nil, nil) - broker.Subscribe(sess1, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess1, &wamp.Subscribe{Request: 123, Topic: testTopic}) rsp := <-sess1.Recv() subID := rsp.(*wamp.Subscribed).Subscription // Subscribe session2 to topic subscriber2 := newTestPeer() sess2 := wamp.NewSession(subscriber2, 0, nil, nil) - broker.Subscribe(sess2, &wamp.Subscribe{Request: 567, Topic: testTopic}) + broker.subscribe(sess2, &wamp.Subscribe{Request: 567, Topic: testTopic}) rsp = <-sess2.Recv() subID2 := rsp.(*wamp.Subscribed).Subscription @@ -186,7 +186,7 @@ func TestUnsubscribe(t *testing.T) { } // Test unsubscribing session1 from topic. - broker.Unsubscribe(sess1, &wamp.Unsubscribe{Request: 124, Subscription: subID}) + broker.unsubscribe(sess1, &wamp.Unsubscribe{Request: 124, Subscription: subID}) // Check that session received UNSUBSCRIBED message. rsp = <-sess1.Recv() unsub, ok := rsp.(*wamp.Unsubscribed) @@ -225,7 +225,7 @@ func TestUnsubscribe(t *testing.T) { } // Test unsubscribing session2 from topic. - broker.Unsubscribe(sess2, &wamp.Unsubscribe{Request: 124, Subscription: subID}) + broker.unsubscribe(sess2, &wamp.Unsubscribe{Request: 124, Subscription: subID}) // Check that session received UNSUBSCRIBED message. rsp = <-sess2.Recv() unsub, ok = rsp.(*wamp.Unsubscribed) @@ -250,25 +250,25 @@ func TestUnsubscribe(t *testing.T) { func TestRemove(t *testing.T) { // Subscribe to topic - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := wamp.NewSession(subscriber, 0, nil, nil) testTopic := wamp.URI("nexus.test.topic") - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) rsp := <-sess.Recv() subID := rsp.(*wamp.Subscribed).Subscription testTopic2 := wamp.URI("nexus.test.topic2") - broker.Subscribe(sess, &wamp.Subscribe{Request: 456, Topic: testTopic2}) + broker.subscribe(sess, &wamp.Subscribe{Request: 456, Topic: testTopic2}) rsp = <-sess.Recv() subID2 := rsp.(*wamp.Subscribed).Subscription - broker.RemoveSession(sess) + broker.removeSession(sess) // Wait for another subscriber as a way to wait for the RemoveSession to // complete. sess2 := wamp.NewSession(subscriber, 0, nil, nil) - broker.Subscribe(sess2, + broker.subscribe(sess2, &wamp.Subscribe{Request: 789, Topic: wamp.URI("nexus.test.sync")}) rsp = <-sess2.Recv() @@ -292,7 +292,7 @@ func TestRemove(t *testing.T) { } func TestBasicPubSub(t *testing.T) { - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := wamp.NewSession(subscriber, 0, nil, nil) testTopic := wamp.URI("nexus.test.topic") @@ -300,7 +300,7 @@ func TestBasicPubSub(t *testing.T) { Request: 123, Topic: testTopic, } - broker.Subscribe(sess, msg) + broker.subscribe(sess, msg) // Test that subscriber received SUBSCRIBED message rsp := <-sess.Recv() @@ -311,7 +311,7 @@ func TestBasicPubSub(t *testing.T) { publisher := newTestPeer() pubSess := wamp.NewSession(publisher, 0, nil, nil) - broker.Publish(pubSess, &wamp.Publish{Request: 124, Topic: testTopic, + broker.publish(pubSess, &wamp.Publish{Request: 124, Topic: testTopic, Arguments: wamp.List{"hello world"}}) rsp = <-sess.Recv() evt, ok := rsp.(*wamp.Event) @@ -331,7 +331,7 @@ func TestBasicPubSub(t *testing.T) { func TestPrefxPatternBasedSubscription(t *testing.T) { // Test match=prefix - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := wamp.NewSession(subscriber, 0, nil, nil) testTopic := wamp.URI("nexus.test.topic") @@ -341,7 +341,7 @@ func TestPrefxPatternBasedSubscription(t *testing.T) { Topic: testTopicPfx, Options: wamp.Dict{"match": "prefix"}, } - broker.Subscribe(sess, msg) + broker.subscribe(sess, msg) // Test that subscriber received SUBSCRIBED message rsp := <-sess.Recv() @@ -376,7 +376,7 @@ func TestPrefxPatternBasedSubscription(t *testing.T) { publisher := newTestPeer() pubSess := wamp.NewSession(publisher, 0, nil, nil) - broker.Publish(pubSess, &wamp.Publish{Request: 124, Topic: testTopic}) + broker.publish(pubSess, &wamp.Publish{Request: 124, Topic: testTopic}) rsp = <-sess.Recv() evt, ok := rsp.(*wamp.Event) if !ok { @@ -394,7 +394,7 @@ func TestPrefxPatternBasedSubscription(t *testing.T) { func TestWildcardPatternBasedSubscription(t *testing.T) { // Test match=prefix - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := wamp.NewSession(subscriber, 0, nil, nil) testTopic := wamp.URI("nexus.test.topic") @@ -404,7 +404,7 @@ func TestWildcardPatternBasedSubscription(t *testing.T) { Topic: testTopicWc, Options: wamp.Dict{"match": "wildcard"}, } - broker.Subscribe(sess, msg) + broker.subscribe(sess, msg) // Test that subscriber received SUBSCRIBED message rsp := <-sess.Recv() @@ -448,7 +448,7 @@ func TestWildcardPatternBasedSubscription(t *testing.T) { publisher := newTestPeer() pubSess := wamp.NewSession(publisher, 0, nil, nil) - broker.Publish(pubSess, &wamp.Publish{Request: 124, Topic: testTopic}) + broker.publish(pubSess, &wamp.Publish{Request: 124, Topic: testTopic}) rsp = <-sess.Recv() evt, ok := rsp.(*wamp.Event) if !ok { @@ -465,7 +465,7 @@ func TestWildcardPatternBasedSubscription(t *testing.T) { } func TestSubscriberBlackwhiteListing(t *testing.T) { - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() details := wamp.Dict{ "authid": "jdoe", @@ -474,7 +474,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { sess := wamp.NewSession(subscriber, wamp.GlobalID(), details, nil) testTopic := wamp.URI("nexus.test.topic") - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) // Test that subscriber received SUBSCRIBED message rsp := <-sess.Recv() @@ -497,7 +497,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { pubSess := wamp.NewSession(publisher, 0, details, nil) // Test whilelist - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 124, Topic: testTopic, Options: wamp.Dict{"eligible": wamp.List{sess.ID}}, @@ -507,7 +507,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { t.Fatal("not allowed by whitelist") } // Test whitelist authrole - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 125, Topic: testTopic, Options: wamp.Dict{"eligible_authrole": wamp.List{"admin"}}, @@ -517,7 +517,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { t.Fatal("not allowed by authrole whitelist") } // Test whitelist authid - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 126, Topic: testTopic, Options: wamp.Dict{"eligible_authid": wamp.List{"jdoe"}}, @@ -528,7 +528,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { } // Test blacklist. - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 127, Topic: testTopic, Options: wamp.Dict{"exclude": wamp.List{sess.ID}}, @@ -538,7 +538,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { t.Fatal("not excluded by blacklist") } // Test blacklist authrole - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 128, Topic: testTopic, Options: wamp.Dict{"exclude_authrole": wamp.List{"admin"}}, @@ -548,7 +548,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { t.Fatal("not excluded by authrole blacklist") } // Test blacklist authid - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 129, Topic: testTopic, Options: wamp.Dict{"exclude_authid": wamp.List{"jdoe"}}, @@ -559,7 +559,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { } // Test that blacklist takes precedence over whitelist. - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 126, Topic: testTopic, Options: wamp.Dict{"eligible_authid": []string{"jdoe"}, @@ -572,12 +572,12 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { } func TestPublisherExclusion(t *testing.T) { - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := wamp.NewSession(subscriber, 0, nil, nil) testTopic := wamp.URI("nexus.test.topic") - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) // Test that subscriber received SUBSCRIBED message rsp, err := wamp.RecvTimeout(sess, time.Second) @@ -603,7 +603,7 @@ func TestPublisherExclusion(t *testing.T) { pubSess := wamp.NewSession(publisher, 0, nil, details) // Subscribe the publish session also. - broker.Subscribe(pubSess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(pubSess, &wamp.Subscribe{Request: 123, Topic: testTopic}) // Test that pub session received SUBSCRIBED message rsp, err = wamp.RecvTimeout(pubSess, time.Second) if err != nil { @@ -615,7 +615,7 @@ func TestPublisherExclusion(t *testing.T) { } // Publish message with exclud_me = false. - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 124, Topic: testTopic, Options: wamp.Dict{"exclude_me": false}, @@ -630,7 +630,7 @@ func TestPublisherExclusion(t *testing.T) { } // Publish message with exclud_me = true. - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 124, Topic: testTopic, Options: wamp.Dict{"exclude_me": true}, @@ -646,7 +646,7 @@ func TestPublisherExclusion(t *testing.T) { } func TestPublisherIdentification(t *testing.T) { - broker := NewBroker(logger, false, true, debug, nil) + broker := newBroker(logger, false, true, debug, nil) subscriber := newTestPeer() details := wamp.Dict{ @@ -662,7 +662,7 @@ func TestPublisherIdentification(t *testing.T) { testTopic := wamp.URI("nexus.test.topic") - broker.Subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) + broker.subscribe(sess, &wamp.Subscribe{Request: 123, Topic: testTopic}) // Test that subscriber received SUBSCRIBED message rsp := <-sess.Recv() @@ -673,7 +673,7 @@ func TestPublisherIdentification(t *testing.T) { publisher := newTestPeer() pubSess := wamp.NewSession(publisher, wamp.GlobalID(), nil, nil) - broker.Publish(pubSess, &wamp.Publish{ + broker.publish(pubSess, &wamp.Publish{ Request: 124, Topic: testTopic, Options: wamp.Dict{"disclose_me": true}, diff --git a/router/dealer.go b/router/dealer.go index cfccb683..48eafdda 100644 --- a/router/dealer.go +++ b/router/dealer.go @@ -75,7 +75,7 @@ type requestID struct { request wamp.ID } -type Dealer struct { +type dealer struct { // procedure URI -> registration ID procRegMap map[wamp.URI]*registration pfxProcRegMap map[wamp.URI]*registration @@ -119,14 +119,14 @@ type Dealer struct { debug bool } -// NewDealer creates the default Dealer implementation. +// newDealer creates the default Dealer implementation. // // Messages are routed serially by the dealer's message handling goroutine. // This serialization is limited to the work of determining the message's // destination, and then the message is handed off to the next goroutine, // typically the receiving client's send handler. -func NewDealer(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Dealer { - d := &Dealer{ +func newDealer(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *dealer { + d := &dealer{ procRegMap: map[wamp.URI]*registration{}, pfxProcRegMap: map[wamp.URI]*registration{}, wcProcRegMap: map[wamp.URI]*registration{}, @@ -156,25 +156,25 @@ func NewDealer(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Deal return d } -// SetMetaPeer sets the client that the dealer uses to publish meta events. -func (d *Dealer) SetMetaPeer(metaPeer wamp.Peer) { +// setMetaPeer sets the client that the dealer uses to publish meta events. +func (d *dealer) setMetaPeer(metaPeer wamp.Peer) { d.actionChan <- func() { d.metaPeer = metaPeer } } -// Role returns the role information for the "dealer" role. The data returned +// role returns the role information for the "dealer" role. The data returned // is suitable for use as broker role info in a WELCOME message. -func (d *Dealer) Role() wamp.Dict { +func (d *dealer) role() wamp.Dict { return dealerRole } -// Register registers a callee to handle calls to a procedure. +// register registers a callee to handle calls to a procedure. // // If the shared_registration feature is supported, and if allowed by the // invocation policy, multiple callees may register to handle the same // procedure. -func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { +func (d *dealer) register(callee *wamp.Session, msg *wamp.Register) { if callee == nil || msg == nil { panic("dealer.Register with nil session or message") } @@ -235,7 +235,7 @@ func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { var metaPubs []*wamp.Publish done := make(chan struct{}) d.actionChan <- func() { - metaPubs = d.register(callee, msg, match, invoke, disclose, wampURI) + metaPubs = d.syncRegister(callee, msg, match, invoke, disclose, wampURI) close(done) } <-done @@ -244,15 +244,15 @@ func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { } } -// Unregister removes a remote procedure previously registered by the callee. -func (d *Dealer) Unregister(callee *wamp.Session, msg *wamp.Unregister) { +// unregister removes a remote procedure previously registered by the callee. +func (d *dealer) unregister(callee *wamp.Session, msg *wamp.Unregister) { if callee == nil || msg == nil { panic("dealer.Unregister with nil session or message") } var metaPubs []*wamp.Publish done := make(chan struct{}) d.actionChan <- func() { - metaPubs = d.unregister(callee, msg) + metaPubs = d.syncUnregister(callee, msg) close(done) } <-done @@ -262,17 +262,17 @@ func (d *Dealer) Unregister(callee *wamp.Session, msg *wamp.Unregister) { } -// Call invokes a registered remote procedure. -func (d *Dealer) Call(caller *wamp.Session, msg *wamp.Call) { +// call invokes a registered remote procedure. +func (d *dealer) call(caller *wamp.Session, msg *wamp.Call) { if caller == nil || msg == nil { panic("dealer.Call with nil session or message") } d.actionChan <- func() { - d.call(caller, msg) + d.syncCall(caller, msg) } } -// Cancel actively cancels a call that is in progress. +// cancel actively cancels a call that is in progress. // // Cancellation behaves differently depending on the mode: // @@ -290,7 +290,7 @@ func (d *Dealer) Call(caller *wamp.Session, msg *wamp.Call) { // invocation or interrupt from the callee is discarded when received. // // If the callee does not support call canceling, then behavior is "skip". -func (d *Dealer) Cancel(caller *wamp.Session, msg *wamp.Cancel) { +func (d *dealer) cancel(caller *wamp.Session, msg *wamp.Cancel) { if caller == nil || msg == nil { panic("dealer.Cancel with nil session or message") } @@ -310,24 +310,24 @@ func (d *Dealer) Cancel(caller *wamp.Session, msg *wamp.Cancel) { return } d.actionChan <- func() { - d.cancel(caller, msg, mode, wamp.ErrCanceled) + d.syncCancel(caller, msg, mode, wamp.ErrCanceled) } } -// Yield handles the result of successfully processing and finishing the +// yield handles the result of successfully processing and finishing the // execution of a call, send from callee to dealer. // // If the RESULT could not be sent to the caller because the caller was blocked // (send queue full), then retry sending until timeout. If timeout while // trying to send RESULT, then cancel call. -func (d *Dealer) Yield(callee *wamp.Session, msg *wamp.Yield) { +func (d *dealer) yield(callee *wamp.Session, msg *wamp.Yield) { if callee == nil || msg == nil { panic("dealer.Yield with nil session or message") } var again bool done := make(chan struct{}) d.actionChan <- func() { - again = d.yield(callee, msg, true) + again = d.syncYield(callee, msg, true) done <- struct{}{} } <-done @@ -348,7 +348,7 @@ func (d *Dealer) Yield(callee *wamp.Session, msg *wamp.Yield) { retry = false } d.actionChan <- func() { - again = d.yield(callee, msg, retry) + again = d.syncYield(callee, msg, retry) done <- struct{}{} } <-done @@ -360,21 +360,22 @@ func (d *Dealer) Yield(callee *wamp.Session, msg *wamp.Yield) { } } -// Error handles an invocation error returned by the callee. -func (d *Dealer) Error(msg *wamp.Error) { +// error handles an invocation error returned by the callee. +func (d *dealer) error(msg *wamp.Error) { if msg == nil { panic("dealer.Error with nil message") } d.actionChan <- func() { - d.error(msg) + d.syncError(msg) } } -// Remove a callee's registrations. This is called when a client leaves the -// realm by sending a GOODBYE message or by disconnecting from the router. If -// there are any registrations for this session wamp.registration.on_unregister -// and wamp.registration.on_delete meta events are published for each. -func (d *Dealer) RemoveSession(sess *wamp.Session) { +// removeSessiom removes a callee's registrations. This is called when a +// client leaves the realm by sending a GOODBYE message or by disconnecting +// from the router. If there are any registrations for this session +// wamp.registration.on_unregister and wamp.registration.on_delete meta events +// are published for each. +func (d *dealer) removeSession(sess *wamp.Session) { if sess == nil { // No session specified, no session removed. return @@ -386,7 +387,7 @@ func (d *Dealer) RemoveSession(sess *wamp.Session) { var metaPubs []*wamp.Publish done := make(chan struct{}) d.actionChan <- func() { - metaPubs = d.removeSession(sess) + metaPubs = d.syncRemoveSession(sess) close(done) } <-done @@ -395,12 +396,12 @@ func (d *Dealer) RemoveSession(sess *wamp.Session) { } } -// Close stops the dealer, letting already queued actions finish. -func (d *Dealer) Close() { +// close stops the dealer, letting already queued actions finish. +func (d *dealer) close() { close(d.actionChan) } -func (d *Dealer) run() { +func (d *dealer) run() { for action := range d.actionChan { action() } @@ -409,7 +410,7 @@ func (d *Dealer) run() { } } -func (d *Dealer) register(callee *wamp.Session, msg *wamp.Register, match, invokePolicy string, disclose, wampURI bool) []*wamp.Publish { +func (d *dealer) syncRegister(callee *wamp.Session, msg *wamp.Register, match, invokePolicy string, disclose, wampURI bool) []*wamp.Publish { var metaPubs []*wamp.Publish var reg *registration switch match { @@ -533,7 +534,7 @@ func (d *Dealer) register(callee *wamp.Session, msg *wamp.Register, match, invok return metaPubs } -func (d *Dealer) unregister(callee *wamp.Session, msg *wamp.Unregister) []*wamp.Publish { +func (d *dealer) syncUnregister(callee *wamp.Session, msg *wamp.Unregister) []*wamp.Publish { var metaPubs []*wamp.Publish // Delete the registration ID from the callee's set of registrations. if _, ok := d.calleeRegIDSet[callee]; ok { @@ -543,7 +544,7 @@ func (d *Dealer) unregister(callee *wamp.Session, msg *wamp.Unregister) []*wamp. } } - delReg, err := d.delCalleeReg(callee, msg.Registration) + delReg, err := d.syncDelCalleeReg(callee, msg.Registration) if err != nil { d.log.Println("Cannot unregister:", err) d.trySend(callee, &wamp.Error{ @@ -583,11 +584,12 @@ func (d *Dealer) unregister(callee *wamp.Session, msg *wamp.Unregister) []*wamp. return metaPubs } -// matchProcedure finds the best matching registration given a procedure URI. +// syncMatchProcedure finds the best matching registration given a procedure +// URI. // // If there are both matching prefix and wildcard registrations, then find the // one with the more specific match (longest matched pattern). -func (d *Dealer) matchProcedure(procedure wamp.URI) (*registration, bool) { +func (d *dealer) syncMatchProcedure(procedure wamp.URI) (*registration, bool) { // Find registered procedures with exact match. reg, ok := d.procRegMap[procedure] if !ok { @@ -624,8 +626,8 @@ func (d *Dealer) matchProcedure(procedure wamp.URI) (*registration, bool) { return reg, ok } -func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { - reg, ok := d.matchProcedure(msg.Procedure) +func (d *dealer) syncCall(caller *wamp.Session, msg *wamp.Call) { + reg, ok := d.syncMatchProcedure(msg.Procedure) if !ok || len(reg.callees) == 0 { // If no registered procedure, send error. d.trySend(caller, &wamp.Error{ @@ -721,7 +723,7 @@ func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { // If the Callee supports progressive calls, the Dealer will forward // the Caller's willingness to receive progressive results by setting. // - // The Callee must all call canceling, as this is necessary to stop + // The Callee must support call canceling, as this is necessary to stop // progressive results if the caller session is closed during // progressive result delivery. if callee.HasFeature(roleCallee, featureProgCallResults) && callee.HasFeature(roleCallee, featureCallCanceling) { @@ -756,7 +758,7 @@ func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { Arguments: msg.Arguments, ArgumentsKw: msg.ArgumentsKw, }) { - d.error(&wamp.Error{ + d.syncError(&wamp.Error{ Type: wamp.INVOCATION, Request: invocationID, Details: wamp.Dict{}, @@ -766,7 +768,7 @@ func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { } } -func (d *Dealer) cancel(caller *wamp.Session, msg *wamp.Cancel, mode string, reason wamp.URI) { +func (d *dealer) syncCancel(caller *wamp.Session, msg *wamp.Cancel, mode string, reason wamp.URI) { reqID := requestID{ session: caller.ID, request: msg.Request, @@ -849,7 +851,7 @@ func (d *Dealer) cancel(caller *wamp.Session, msg *wamp.Cancel, mode string, rea }) } -func (d *Dealer) yield(callee *wamp.Session, msg *wamp.Yield, canRetry bool) bool { +func (d *dealer) syncYield(callee *wamp.Session, msg *wamp.Yield, canRetry bool) bool { progress, _ := msg.Options[wamp.OptProgress].(bool) // Find and delete pending invocation. @@ -935,13 +937,13 @@ func (d *Dealer) yield(callee *wamp.Session, msg *wamp.Yield, canRetry bool) boo return true } d.log.Printf("!!! Dropped %s to caller %s: %s", res.MessageType(), caller, err) - d.cancel(caller, &wamp.Cancel{Request: callID.request}, + d.syncCancel(caller, &wamp.Cancel{Request: callID.request}, wamp.CancelModeKillNoWait, wamp.ErrCanceled) } return false } -func (d *Dealer) error(msg *wamp.Error) { +func (d *dealer) syncError(msg *wamp.Error) { // Find and delete pending invocation. invk, ok := d.invocations[msg.Request] if !ok { @@ -977,11 +979,11 @@ func (d *Dealer) error(msg *wamp.Error) { }) } -func (d *Dealer) removeSession(sess *wamp.Session) []*wamp.Publish { +func (d *dealer) syncRemoveSession(sess *wamp.Session) []*wamp.Publish { var metaPubs []*wamp.Publish // Remove any remaining registrations for the removed session. for regID := range d.calleeRegIDSet[sess] { - delReg, err := d.delCalleeReg(sess, regID) + delReg, err := d.syncDelCalleeReg(sess, regID) if err != nil { panic("!!! Callee had ID of nonexistent registration") } @@ -1030,13 +1032,13 @@ func (d *Dealer) removeSession(sess *wamp.Session) []*wamp.Publish { return metaPubs } -// delCalleeReg deletes the the callee from the specified registration and +// syncDelCalleeReg deletes the the callee from the specified registration and // deletes the registration from the set of registrations for the callee. // // If there are no more callees for the registration, then the registration is // removed and true is returned to indicate that the last registration was // deleted. -func (d *Dealer) delCalleeReg(callee *wamp.Session, regID wamp.ID) (bool, error) { +func (d *dealer) syncDelCalleeReg(callee *wamp.Session, regID wamp.ID) (bool, error) { reg, ok := d.registrations[regID] if !ok { // The registration doesn't exist @@ -1083,8 +1085,8 @@ func (d *Dealer) delCalleeReg(callee *wamp.Session, regID wamp.ID) (bool, error) // ----- Meta Procedure Handlers ----- -// RegList retrieves registration IDs listed according to match policies. -func (d *Dealer) RegList(msg *wamp.Invocation) wamp.Message { +// regList retrieves registration IDs listed according to match policies. +func (d *dealer) regList(msg *wamp.Invocation) wamp.Message { var exactRegs, pfxRegs, wcRegs []wamp.ID sync := make(chan struct{}) d.actionChan <- func() { @@ -1111,9 +1113,9 @@ func (d *Dealer) RegList(msg *wamp.Invocation) wamp.Message { } } -// RegLookup obtains the registration (if any) managing a procedure, according +// regLookup obtains the registration (if any) managing a procedure, according // to some match policy. -func (d *Dealer) RegLookup(msg *wamp.Invocation) wamp.Message { +func (d *dealer) regLookup(msg *wamp.Invocation) wamp.Message { var regID wamp.ID if len(msg.Arguments) != 0 { if procedure, ok := wamp.AsURI(msg.Arguments[0]); ok { @@ -1150,15 +1152,15 @@ func (d *Dealer) RegLookup(msg *wamp.Invocation) wamp.Message { } } -// RegMatch obtains the registration best matching a given procedure URI. -func (d *Dealer) RegMatch(msg *wamp.Invocation) wamp.Message { +// regMatch obtains the registration best matching a given procedure URI. +func (d *dealer) regMatch(msg *wamp.Invocation) wamp.Message { var regID wamp.ID if len(msg.Arguments) != 0 { if procedure, ok := wamp.AsURI(msg.Arguments[0]); ok { sync := make(chan wamp.ID) d.actionChan <- func() { var r wamp.ID - if reg, ok := d.matchProcedure(procedure); ok { + if reg, ok := d.syncMatchProcedure(procedure); ok { r = reg.id } sync <- r @@ -1172,8 +1174,8 @@ func (d *Dealer) RegMatch(msg *wamp.Invocation) wamp.Message { } } -// RegGet retrieves information on a particular registration. -func (d *Dealer) RegGet(msg *wamp.Invocation) wamp.Message { +// regGet retrieves information on a particular registration. +func (d *dealer) regGet(msg *wamp.Invocation) wamp.Message { var dict wamp.Dict if len(msg.Arguments) != 0 { if regID, ok := wamp.AsID(msg.Arguments[0]); ok { @@ -1207,9 +1209,9 @@ func (d *Dealer) RegGet(msg *wamp.Invocation) wamp.Message { } } -// RegListCallees retrieves a list of session IDs for sessions currently +// regListCallees retrieves a list of session IDs for sessions currently // attached to the registration. -func (d *Dealer) RegListCallees(msg *wamp.Invocation) wamp.Message { +func (d *dealer) regListCallees(msg *wamp.Invocation) wamp.Message { var calleeIDs []wamp.ID if len(msg.Arguments) != 0 { if regID, ok := wamp.AsID(msg.Arguments[0]); ok { @@ -1240,9 +1242,9 @@ func (d *Dealer) RegListCallees(msg *wamp.Invocation) wamp.Message { } } -// RegCountCallees obtains the number of sessions currently attached to the +// regCountCallees obtains the number of sessions currently attached to the // registration. -func (d *Dealer) RegCountCallees(msg *wamp.Invocation) wamp.Message { +func (d *dealer) regCountCallees(msg *wamp.Invocation) wamp.Message { var count int var ok bool if len(msg.Arguments) != 0 { @@ -1274,7 +1276,7 @@ func (d *Dealer) RegCountCallees(msg *wamp.Invocation) wamp.Message { } } -func (d *Dealer) trySend(sess *wamp.Session, msg wamp.Message) bool { +func (d *dealer) trySend(sess *wamp.Session, msg wamp.Message) bool { if err := sess.TrySend(msg); err != nil { d.log.Printf("!!! Dropped %s to session %s: %s", msg.MessageType(), sess, err) return false diff --git a/router/dealer_test.go b/router/dealer_test.go index 462783d4..a94c970d 100644 --- a/router/dealer_test.go +++ b/router/dealer_test.go @@ -10,10 +10,10 @@ import ( "github.com/gammazero/nexus/wamp" ) -func newTestDealer() (*Dealer, wamp.Peer) { - d := NewDealer(logger, false, true, debug) +func newTestDealer() (*dealer, wamp.Peer) { + d := newDealer(logger, false, true, debug) metaClient, rtr := transport.LinkedPeers() - d.SetMetaPeer(rtr) + d.setMetaPeer(rtr) return d, metaClient } @@ -46,7 +46,7 @@ func TestBasicRegister(t *testing.T) { // Register callee callee := newTestPeer() sess := wamp.NewSession(callee, 0, nil, nil) - dealer.Register(sess, &wamp.Register{Request: 123, Procedure: testProcedure}) + dealer.register(sess, &wamp.Register{Request: 123, Procedure: testProcedure}) rsp := <-callee.Recv() // Test that callee receives a registered message. @@ -84,7 +84,7 @@ func TestBasicRegister(t *testing.T) { } // Check the procedure cannot be registered more than once. - dealer.Register(sess, &wamp.Register{Request: 456, Procedure: testProcedure}) + dealer.register(sess, &wamp.Register{Request: 456, Procedure: testProcedure}) rsp = <-callee.Recv() errMsg := rsp.(*wamp.Error) if errMsg.Error != wamp.ErrProcedureAlreadyExists { @@ -101,7 +101,7 @@ func TestUnregister(t *testing.T) { // Register a procedure. callee := newTestPeer() sess := wamp.NewSession(callee, 0, nil, nil) - dealer.Register(sess, &wamp.Register{Request: 123, Procedure: testProcedure}) + dealer.register(sess, &wamp.Register{Request: 123, Procedure: testProcedure}) rsp := <-callee.Recv() regID := rsp.(*wamp.Registered).Registration @@ -113,7 +113,7 @@ func TestUnregister(t *testing.T) { } // Unregister the procedure. - dealer.Unregister(sess, &wamp.Unregister{Request: 124, Registration: regID}) + dealer.unregister(sess, &wamp.Unregister{Request: 124, Registration: regID}) // Check that callee received UNREGISTERED message. rsp = <-callee.Recv() @@ -151,7 +151,7 @@ func TestBasicCall(t *testing.T) { // Register a procedure. callee := newTestPeer() calleeSess := wamp.NewSession(callee, 0, nil, nil) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{Request: 123, Procedure: testProcedure}) var rsp wamp.Message select { @@ -174,7 +174,7 @@ func TestBasicCall(t *testing.T) { callerSession := wamp.NewSession(caller, 0, nil, nil) // Test calling invalid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 124, Procedure: wamp.URI("nexus.test.bad")}) rsp = <-callerSession.Recv() errMsg, ok := rsp.(*wamp.Error) @@ -189,7 +189,7 @@ func TestBasicCall(t *testing.T) { } // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -200,7 +200,7 @@ func TestBasicCall(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok := rsp.(*wamp.Result) @@ -215,14 +215,14 @@ func TestBasicCall(t *testing.T) { } // Test calling valid procedure, with callee responding with error. - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 126, Procedure: testProcedure}) // callee received an INVOCATION message. rsp = <-callee.Recv() inv = rsp.(*wamp.Invocation) // Callee responds with a ERROR message - dealer.Error(&wamp.Error{Request: inv.Request}) + dealer.error(&wamp.Error{Request: inv.Request}) // Check that caller received an ERROR message. rsp = <-caller.Recv() @@ -242,7 +242,7 @@ func TestRemovePeer(t *testing.T) { callee := newTestPeer() sess := wamp.NewSession(callee, 0, nil, nil) msg := &wamp.Register{Request: 123, Procedure: testProcedure} - dealer.Register(sess, msg) + dealer.register(sess, msg) rsp := <-callee.Recv() regID := rsp.(*wamp.Registered).Registration @@ -261,11 +261,11 @@ func TestRemovePeer(t *testing.T) { } // Test that removing the callee session removes the registration. - dealer.RemoveSession(sess) + dealer.removeSession(sess) // Register as a way to sync with dealer. sess2 := wamp.NewSession(callee, 0, nil, nil) - dealer.Register(sess2, + dealer.register(sess2, &wamp.Register{Request: 789, Procedure: wamp.URI("nexus.test.p2")}) rsp = <-callee.Recv() @@ -278,7 +278,7 @@ func TestRemovePeer(t *testing.T) { // Tests that registering the callee again succeeds. msg.Request = 124 - dealer.Register(sess, msg) + dealer.register(sess, msg) rsp = <-callee.Recv() if rsp.MessageType() != wamp.REGISTERED { t.Fatal("expected", wamp.REGISTERED, "got:", rsp.MessageType()) @@ -303,7 +303,7 @@ func TestCancelCallModeKill(t *testing.T) { // Register a procedure. callee := newTestPeer() calleeSess := wamp.NewSession(callee, 0, nil, calleeRoles) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{Request: 123, Procedure: testProcedure}) rsp := <-callee.Recv() _, ok := rsp.(*wamp.Registered) @@ -319,7 +319,7 @@ func TestCancelCallModeKill(t *testing.T) { callerSession := wamp.NewSession(caller, 0, nil, nil) // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -331,7 +331,7 @@ func TestCancelCallModeKill(t *testing.T) { // Test caller cancelling call. mode=kill opts := wamp.SetOption(nil, "mode", "kill") - dealer.Cancel(callerSession, &wamp.Cancel{Request: 125, Options: opts}) + dealer.cancel(callerSession, &wamp.Cancel{Request: 125, Options: opts}) // callee should receive an INTERRUPT request rsp = <-callee.Recv() @@ -344,7 +344,7 @@ func TestCancelCallModeKill(t *testing.T) { } // callee responds with ERROR message - dealer.Error(&wamp.Error{ + dealer.error(&wamp.Error{ Type: wamp.INVOCATION, Request: inv.Request, Error: wamp.ErrCanceled, @@ -384,7 +384,7 @@ func TestCancelCallModeKillNoWait(t *testing.T) { // Register a procedure. callee := newTestPeer() calleeSess := wamp.NewSession(callee, 0, nil, calleeRoles) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{Request: 123, Procedure: testProcedure}) rsp := <-callee.Recv() _, ok := rsp.(*wamp.Registered) @@ -400,7 +400,7 @@ func TestCancelCallModeKillNoWait(t *testing.T) { callerSession := wamp.NewSession(caller, 0, nil, nil) // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -412,7 +412,7 @@ func TestCancelCallModeKillNoWait(t *testing.T) { // Test caller cancelling call. mode=kill opts := wamp.SetOption(nil, "mode", "killnowait") - dealer.Cancel(callerSession, &wamp.Cancel{Request: 125, Options: opts}) + dealer.cancel(callerSession, &wamp.Cancel{Request: 125, Options: opts}) // callee should receive an INTERRUPT request rsp = <-callee.Recv() @@ -425,7 +425,7 @@ func TestCancelCallModeKillNoWait(t *testing.T) { } // callee responds with ERROR message - dealer.Error(&wamp.Error{ + dealer.error(&wamp.Error{ Type: wamp.INVOCATION, Request: inv.Request, Error: wamp.ErrCanceled, @@ -462,7 +462,7 @@ func TestCancelCallModeSkip(t *testing.T) { } calleeSess := wamp.NewSession(callee, 0, nil, calleeRoles) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{Request: 123, Procedure: testProcedure}) rsp := <-callee.Recv() _, ok := rsp.(*wamp.Registered) @@ -478,7 +478,7 @@ func TestCancelCallModeSkip(t *testing.T) { callerSession := wamp.NewSession(caller, 0, nil, nil) // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -490,7 +490,7 @@ func TestCancelCallModeSkip(t *testing.T) { // Test caller cancelling call. mode=kill opts := wamp.SetOption(nil, "mode", "skip") - dealer.Cancel(callerSession, &wamp.Cancel{Request: 125, Options: opts}) + dealer.cancel(callerSession, &wamp.Cancel{Request: 125, Options: opts}) // callee should NOT receive an INTERRUPT request select { @@ -526,7 +526,7 @@ func TestSharedRegistrationRoundRobin(t *testing.T) { // Register callee1 with roundrobin shared registration callee1 := newTestPeer() calleeSess1 := wamp.NewSession(callee1, 0, nil, calleeRoles) - dealer.Register(calleeSess1, &wamp.Register{ + dealer.register(calleeSess1, &wamp.Register{ Request: 123, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "roundrobin"), @@ -548,7 +548,7 @@ func TestSharedRegistrationRoundRobin(t *testing.T) { // Register callee2 with roundrobin shared registration callee2 := newTestPeer() calleeSess2 := wamp.NewSession(callee2, 0, nil, calleeRoles) - dealer.Register(calleeSess2, &wamp.Register{ + dealer.register(calleeSess2, &wamp.Register{ Request: 124, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "roundrobin"), @@ -570,7 +570,7 @@ func TestSharedRegistrationRoundRobin(t *testing.T) { // Test calling valid procedure caller := newTestPeer() callerSession := wamp.NewSession(caller, 0, nil, nil) - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee1 received an INVOCATION message. @@ -588,7 +588,7 @@ func TestSharedRegistrationRoundRobin(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess1, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess1, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok := rsp.(*wamp.Result) @@ -600,7 +600,7 @@ func TestSharedRegistrationRoundRobin(t *testing.T) { } // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 126, Procedure: testProcedure}) // Test that callee2 received an INVOCATION message. @@ -617,7 +617,7 @@ func TestSharedRegistrationRoundRobin(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess2, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess2, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok = rsp.(*wamp.Result) @@ -645,7 +645,7 @@ func TestSharedRegistrationFirst(t *testing.T) { // Register callee1 with first shared registration callee1 := newTestPeer() calleeSess1 := wamp.NewSession(callee1, 1111, nil, calleeRoles) - dealer.Register(calleeSess1, &wamp.Register{ + dealer.register(calleeSess1, &wamp.Register{ Request: 123, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "first"), @@ -667,7 +667,7 @@ func TestSharedRegistrationFirst(t *testing.T) { // Register callee2 with roundrobin shared registration callee2 := newTestPeer() calleeSess2 := wamp.NewSession(callee2, 2222, nil, calleeRoles) - dealer.Register(calleeSess2, &wamp.Register{ + dealer.register(calleeSess2, &wamp.Register{ Request: 1233, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "roundrobin"), @@ -679,7 +679,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Register callee2 with "first" shared registration - dealer.Register(calleeSess2, &wamp.Register{ + dealer.register(calleeSess2, &wamp.Register{ Request: 124, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "first"), @@ -705,7 +705,7 @@ func TestSharedRegistrationFirst(t *testing.T) { // Test calling valid procedure caller := newTestPeer() callerSession := wamp.NewSession(caller, 333, nil, nil) - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee1 received an INVOCATION message. @@ -723,7 +723,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Callee1 responds with a YIELD message - dealer.Yield(calleeSess1, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess1, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. select { @@ -740,7 +740,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 126, Procedure: testProcedure}) // Test that callee1 received an INVOCATION message. @@ -757,7 +757,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess1, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess1, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. select { @@ -774,7 +774,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Remove callee1 - dealer.RemoveSession(calleeSess1) + dealer.removeSession(calleeSess1) if err = checkMetaReg(metaClient, calleeSess1.ID); err != nil { t.Fatal("Registration meta event fail:", err) } @@ -783,7 +783,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 127, Procedure: testProcedure}) // Test that callee2 received an INVOCATION message. @@ -800,7 +800,7 @@ func TestSharedRegistrationFirst(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess2, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess2, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. select { @@ -834,7 +834,7 @@ func TestSharedRegistrationLast(t *testing.T) { // Register callee1 with last shared registration callee1 := newTestPeer() calleeSess1 := wamp.NewSession(callee1, 0, nil, calleeRoles) - dealer.Register(calleeSess1, &wamp.Register{ + dealer.register(calleeSess1, &wamp.Register{ Request: 123, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "last"), @@ -855,7 +855,7 @@ func TestSharedRegistrationLast(t *testing.T) { // Register callee2 with last shared registration callee2 := newTestPeer() calleeSess2 := wamp.NewSession(callee2, 0, nil, calleeRoles) - dealer.Register(calleeSess2, &wamp.Register{ + dealer.register(calleeSess2, &wamp.Register{ Request: 124, Procedure: testProcedure, Options: wamp.SetOption(nil, "invoke", "last"), @@ -871,7 +871,7 @@ func TestSharedRegistrationLast(t *testing.T) { // Test calling valid procedure caller := newTestPeer() callerSession := wamp.NewSession(caller, 0, nil, nil) - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee2 received an INVOCATION message. @@ -889,7 +889,7 @@ func TestSharedRegistrationLast(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess2, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess2, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok := rsp.(*wamp.Result) @@ -901,7 +901,7 @@ func TestSharedRegistrationLast(t *testing.T) { } // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 126, Procedure: testProcedure}) // Test that callee2 received an INVOCATION message. @@ -918,7 +918,7 @@ func TestSharedRegistrationLast(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess2, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess2, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok = rsp.(*wamp.Result) @@ -930,7 +930,7 @@ func TestSharedRegistrationLast(t *testing.T) { } // Remove callee2 - dealer.RemoveSession(calleeSess2) + dealer.removeSession(calleeSess2) if err = checkMetaReg(metaClient, calleeSess2.ID); err != nil { t.Fatal("Registration meta event fail:", err) } @@ -939,7 +939,7 @@ func TestSharedRegistrationLast(t *testing.T) { } // Test calling valid procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 127, Procedure: testProcedure}) // Test that callee1 received an INVOCATION message. @@ -956,7 +956,7 @@ func TestSharedRegistrationLast(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess1, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess1, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok = rsp.(*wamp.Result) @@ -984,7 +984,7 @@ func TestPatternBasedRegistration(t *testing.T) { // Register a procedure with wildcard match. callee := newTestPeer() calleeSess := wamp.NewSession(callee, 0, nil, calleeRoles) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{ Request: 123, Procedure: testProcedureWC, @@ -1008,7 +1008,7 @@ func TestPatternBasedRegistration(t *testing.T) { callerSession := wamp.NewSession(caller, 0, nil, nil) // Test calling valid procedure with full name. Widlcard should match. - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -1027,7 +1027,7 @@ func TestPatternBasedRegistration(t *testing.T) { } // Callee responds with a YIELD message - dealer.Yield(calleeSess, &wamp.Yield{Request: inv.Request}) + dealer.yield(calleeSess, &wamp.Yield{Request: inv.Request}) // Check that caller received a RESULT message. rsp = <-caller.Recv() rslt, ok := rsp.(*wamp.Result) @@ -1051,7 +1051,7 @@ func TestRPCBlockedUnresponsiveCallee(t *testing.T) { callee, rtr := transport.LinkedPeers() calleeSess := wamp.NewSession(rtr, 0, nil, nil) opts := wamp.Dict{wamp.OptTimeout: timeoutMs} - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{Request: 223, Procedure: testProcedure, Options: opts}) rsp := <-callee.Recv() _, ok := rsp.(*wamp.Registered) @@ -1073,7 +1073,7 @@ sendLoop: i++ fmt.Println("Calling", i) // Test calling valid procedure - dealer.Call(callerSession, &wamp.Call{ + dealer.call(callerSession, &wamp.Call{ Request: wamp.ID(i + 225), Procedure: testProcedure, Options: opts, @@ -1115,7 +1115,7 @@ func TestCallerIdentification(t *testing.T) { // Register a procedure, set option to request disclosing caller. callee := newTestPeer() calleeSess := wamp.NewSession(callee, 0, nil, calleeRoles) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{ Request: 123, Procedure: testProcedure, @@ -1138,7 +1138,7 @@ func TestCallerIdentification(t *testing.T) { callerSession := wamp.NewSession(caller, callerID, nil, nil) // Test calling valid procedure with full name. Widlcard should match. - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 125, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -1156,12 +1156,12 @@ func TestCallerIdentification(t *testing.T) { } func TestWrongYielder(t *testing.T) { - dealer := NewDealer(logger, false, true, debug) + dealer := newDealer(logger, false, true, debug) // Register a procedure. callee := newTestPeer() calleeSess := wamp.NewSession(callee, 7777, nil, nil) - dealer.Register(calleeSess, + dealer.register(calleeSess, &wamp.Register{Request: 4321, Procedure: testProcedure}) rsp := <-callee.Recv() _, ok := rsp.(*wamp.Registered) @@ -1178,7 +1178,7 @@ func TestWrongYielder(t *testing.T) { badCalleeSess := wamp.NewSession(badCallee, 1313, nil, nil) // Call the procedure - dealer.Call(callerSession, + dealer.call(callerSession, &wamp.Call{Request: 4322, Procedure: testProcedure}) // Test that callee received an INVOCATION message. @@ -1189,7 +1189,7 @@ func TestWrongYielder(t *testing.T) { } // Imposter callee responds with a YIELD message - dealer.Yield(badCalleeSess, &wamp.Yield{Request: inv.Request}) + dealer.yield(badCalleeSess, &wamp.Yield{Request: inv.Request}) // Check that caller did not received a RESULT message. select { diff --git a/router/publishfilter.go b/router/publishfilter.go index fbb64ef4..28d2fe1d 100644 --- a/router/publishfilter.go +++ b/router/publishfilter.go @@ -12,6 +12,9 @@ type PublishFilter interface { Allowed(sess *wamp.Session) bool } +// FilterFactory is a function which creates a PublishFilter from a publication +type FilterFactory func(msg *wamp.Publish) PublishFilter + type simplePublishFilter struct { blIDs []wamp.ID wlIDs []wamp.ID diff --git a/router/realm.go b/router/realm.go index b605d5da..df34a630 100644 --- a/router/realm.go +++ b/router/realm.go @@ -83,8 +83,8 @@ type testamentBucket struct { // authentication and authorization. WAMP messages are only routed within a // Realm. type realm struct { - broker *Broker - dealer *Dealer + broker *broker + dealer *dealer authorizer Authorizer @@ -133,7 +133,7 @@ var ( ) // newRealm creates a new realm with the given RealmConfig, broker and dealer. -func newRealm(config *RealmConfig, broker *Broker, dealer *Dealer, logger stdlog.StdLog, debug bool) (*realm, error) { +func newRealm(config *RealmConfig, broker *broker, dealer *dealer, logger stdlog.StdLog, debug bool) (*realm, error) { if !config.URI.ValidURI(config.StrictURI, "") { return nil, fmt.Errorf( "invalid realm URI %v (URI strict checking %v)", config.URI, config.StrictURI) @@ -262,8 +262,8 @@ func (r *realm) close() { // No new messages, so safe to close dealer and broker. Stop broker and // dealer so they can be GC'd, and then so can this realm. - r.dealer.Close() - r.broker.Close() + r.dealer.close() + r.broker.close() // Finally close realm's action channel. close(r.actionChan) @@ -289,20 +289,20 @@ func (r *realm) run() { r.registerMetaProcedure(wamp.MetaProcSessionModifyDetails, r.sessionModifyDetails) } // Register to handle registration meta procedures. - r.registerMetaProcedure(wamp.MetaProcRegList, r.dealer.RegList) - r.registerMetaProcedure(wamp.MetaProcRegLookup, r.dealer.RegLookup) - r.registerMetaProcedure(wamp.MetaProcRegMatch, r.dealer.RegMatch) - r.registerMetaProcedure(wamp.MetaProcRegGet, r.dealer.RegGet) - r.registerMetaProcedure(wamp.MetaProcRegListCallees, r.dealer.RegListCallees) - r.registerMetaProcedure(wamp.MetaProcRegCountCallees, r.dealer.RegCountCallees) + r.registerMetaProcedure(wamp.MetaProcRegList, r.dealer.regList) + r.registerMetaProcedure(wamp.MetaProcRegLookup, r.dealer.regLookup) + r.registerMetaProcedure(wamp.MetaProcRegMatch, r.dealer.regMatch) + r.registerMetaProcedure(wamp.MetaProcRegGet, r.dealer.regGet) + r.registerMetaProcedure(wamp.MetaProcRegListCallees, r.dealer.regListCallees) + r.registerMetaProcedure(wamp.MetaProcRegCountCallees, r.dealer.regCountCallees) // Register to handle subscription meta procedures. - r.registerMetaProcedure(wamp.MetaProcSubList, r.broker.SubList) - r.registerMetaProcedure(wamp.MetaProcSubLookup, r.broker.SubLookup) - r.registerMetaProcedure(wamp.MetaProcSubMatch, r.broker.SubMatch) - r.registerMetaProcedure(wamp.MetaProcSubGet, r.broker.SubGet) - r.registerMetaProcedure(wamp.MetaProcSubListSubscribers, r.broker.SubListSubscribers) - r.registerMetaProcedure(wamp.MetaProcSubCountSubscribers, r.broker.SubCountSubscribers) + r.registerMetaProcedure(wamp.MetaProcSubList, r.broker.subList) + r.registerMetaProcedure(wamp.MetaProcSubLookup, r.broker.subLookup) + r.registerMetaProcedure(wamp.MetaProcSubMatch, r.broker.subMatch) + r.registerMetaProcedure(wamp.MetaProcSubGet, r.broker.subGet) + r.registerMetaProcedure(wamp.MetaProcSubListSubscribers, r.broker.subListSubscribers) + r.registerMetaProcedure(wamp.MetaProcSubCountSubscribers, r.broker.subCountSubscribers) // Register to handle testament meta procedures. r.registerMetaProcedure(wamp.MetaProcSessionAddTestament, r.testamentAdd) @@ -325,7 +325,7 @@ func (r *realm) createMetaSession() { cli, rtr := transport.LinkedPeers() r.metaPeer = cli - r.dealer.SetMetaPeer(cli) + r.dealer.setMetaPeer(cli) // This session is the local leg of the router uplink. r.metaSess = wamp.NewSession(rtr, metaID, wamp.Dict{"authrole": "trusted"}, nil) @@ -394,8 +394,8 @@ func (r *realm) onLeave(sess *wamp.Session, shutdown, killAll bool) { // If realm is shutdown, do not bother to remove session from broker // and dealer. They will be closed after sessions are closed. if !shutdown { - r.dealer.RemoveSession(sess) - r.broker.RemoveSession(sess) + r.dealer.removeSession(sess) + r.broker.removeSession(sess) } close(sync) } @@ -521,22 +521,22 @@ func (r *realm) handleInboundMessages(sess *wamp.Session) (bool, bool, error) { switch msg := msg.(type) { case *wamp.Publish: - r.broker.Publish(sess, msg) + r.broker.publish(sess, msg) case *wamp.Subscribe: - r.broker.Subscribe(sess, msg) + r.broker.subscribe(sess, msg) case *wamp.Unsubscribe: - r.broker.Unsubscribe(sess, msg) + r.broker.unsubscribe(sess, msg) case *wamp.Register: - r.dealer.Register(sess, msg) + r.dealer.register(sess, msg) case *wamp.Unregister: - r.dealer.Unregister(sess, msg) + r.dealer.unregister(sess, msg) case *wamp.Call: - r.dealer.Call(sess, msg) + r.dealer.call(sess, msg) case *wamp.Yield: - r.dealer.Yield(sess, msg) + r.dealer.yield(sess, msg) case *wamp.Cancel: - r.dealer.Cancel(sess, msg) + r.dealer.cancel(sess, msg) case *wamp.Error: // An INVOCATION error is the only type of ERROR message the @@ -544,7 +544,7 @@ func (r *realm) handleInboundMessages(sess *wamp.Session) (bool, bool, error) { if msg.Type != wamp.INVOCATION { return false, false, fmt.Errorf("invalid ERROR received: %v", msg) } - r.dealer.Error(msg) + r.dealer.error(msg) case *wamp.Goodbye: // Handle client leaving realm. @@ -652,8 +652,8 @@ func (r *realm) authClient(sid wamp.ID, client wamp.Peer, details wamp.Dict) (*w "authmethod": "local", "authprovider": "static", "roles": wamp.Dict{ - "broker": r.broker.Role(), - "dealer": r.dealer.Role(), + "broker": r.broker.role(), + "dealer": r.dealer.role(), }, } return &wamp.Welcome{Details: details}, nil @@ -694,8 +694,8 @@ func (r *realm) authClient(sid wamp.ID, client wamp.Peer, details wamp.Dict) (*w } welcome.Details["authmethod"] = method welcome.Details["roles"] = wamp.Dict{ - "broker": r.broker.Role(), - "dealer": r.dealer.Role(), + "broker": r.broker.role(), + "dealer": r.dealer.role(), } return welcome, nil } diff --git a/router/router.go b/router/router.go index 33651f0e..d4feba3b 100644 --- a/router/router.go +++ b/router/router.go @@ -357,8 +357,8 @@ func (r *router) addRealm(config *RealmConfig) (*realm, error) { realm, err := newRealm( config, - NewBroker(r.log, config.StrictURI, config.AllowDisclose, r.debug, config.PublishFilterFactory), - NewDealer(r.log, config.StrictURI, config.AllowDisclose, r.debug), + newBroker(r.log, config.StrictURI, config.AllowDisclose, r.debug, config.PublishFilterFactory), + newDealer(r.log, config.StrictURI, config.AllowDisclose, r.debug), r.log, r.debug) if err != nil { return nil, err