diff --git a/integration/fabric/events/chaincode/views/events.go b/integration/fabric/events/chaincode/views/events.go index 11d06ba96..a5f2d57c7 100644 --- a/integration/fabric/events/chaincode/views/events.go +++ b/integration/fabric/events/chaincode/views/events.go @@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0 package views import ( + context2 "context" "encoding/json" "sync" + "time" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/chaincode" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" - "go.uber.org/zap/zapcore" + "github.com/pkg/errors" ) var logger = flogging.MustGetLogger("view-events") @@ -36,33 +38,56 @@ func (c *EventsView) Call(context view.Context) (interface{}, error) { wg := sync.WaitGroup{} wg.Add(1) var eventReceived *chaincode.Event + var eventError error // Register for events callBack := func(event *chaincode.Event) (bool, error) { - if logger.IsEnabledFor(zapcore.DebugLevel) { - logger.Debugf("Chaincode Event Received in callback %s", event.EventName) + logger.Debugf("Chaincode Event Received in callback %s", event.EventName) + if event.Err != nil { + eventError = event.Err + wg.Done() + return true, nil } + if event.EventName == c.EventName { eventReceived = event - defer wg.Done() + wg.Done() return true, nil } + return false, nil } - _, err := context.RunView(chaincode.NewListenToEventsView("events", callBack)) + // Test timeout + ctx, cancelFunc := context2.WithTimeout(context.Context(), 10*time.Second) + defer cancelFunc() + _, err := context.RunView(chaincode.NewListenToEventsViewWithContext(ctx, "events", callBack)) assert.NoError(err, "failed to listen to events") + wg.Wait() + assert.Error(eventError, "expected error to have happened") + assert.Equal(errors.Cause(eventError), context2.DeadlineExceeded, "expected deadline exceeded error") + cancelFunc() + + // Now invoke the chaincode // Invoke the chaincode + wg.Add(1) + eventReceived = nil + eventError = nil + ctx1, cancelFunc1 := context2.WithTimeout(context.Context(), 10*time.Minute) + defer cancelFunc1() + _, err = context.RunView(chaincode.NewListenToEventsViewWithContext(ctx1, "events", callBack)) + assert.NoError(err, "failed to listen to events") _, err = context.RunView( chaincode.NewInvokeView( "events", c.Function, ), ) - assert.NoError(err, "Failed Running Invoke View ") - - // wait for the event to arriver + assert.NoError(err, "Failed Running Invoke View") wg.Wait() + assert.NoError(eventError, "expected error to not have happened") + assert.NotNil(eventReceived, "expected to have received an event") + return &EventReceived{ Event: eventReceived, }, nil diff --git a/platform/fabric/core/generic/committer/event.go b/platform/fabric/core/generic/committer/event.go index 41386a43e..157e79987 100644 --- a/platform/fabric/core/generic/committer/event.go +++ b/platform/fabric/core/generic/committer/event.go @@ -32,6 +32,7 @@ type ChaincodeEvent struct { ChaincodeID string EventName string Payload []byte + Err error } func (chaincodeEvent *ChaincodeEvent) Message() interface{} { diff --git a/platform/fabric/services/chaincode/events.go b/platform/fabric/services/chaincode/events.go index 66dd3afec..abd577150 100644 --- a/platform/fabric/services/chaincode/events.go +++ b/platform/fabric/services/chaincode/events.go @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0 package chaincode import ( + "context" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" @@ -29,6 +31,7 @@ type info struct { } type RegisterChaincodeCall struct { + Context context.Context InvokerIdentity view.Identity Network string Channel string @@ -38,22 +41,32 @@ type RegisterChaincodeCall struct { CallBack EventCallback } -type listenToEventsView struct { +type ListenToEventsView struct { *RegisterChaincodeCall } // NewListenToEventsView register a listener for the events generated by the passed chaincode // and call the passed callback when an event is caught. -func NewListenToEventsView(chaincode string, callBack EventCallback) *listenToEventsView { - return &listenToEventsView{ +func NewListenToEventsView(chaincode string, callBack EventCallback) *ListenToEventsView { + return &ListenToEventsView{ + RegisterChaincodeCall: &RegisterChaincodeCall{ + ChaincodeName: chaincode, + CallBack: callBack, + }, + } +} + +func NewListenToEventsViewWithContext(context context.Context, chaincode string, callBack EventCallback) *ListenToEventsView { + return &ListenToEventsView{ RegisterChaincodeCall: &RegisterChaincodeCall{ ChaincodeName: chaincode, CallBack: callBack, + Context: context, }, } } -func (r *listenToEventsView) Call(context view.Context) (interface{}, error) { +func (r *ListenToEventsView) Call(context view.Context) (interface{}, error) { err := r.RegisterChaincodeEvents(context) if err != nil { return nil, err @@ -61,9 +74,9 @@ func (r *listenToEventsView) Call(context view.Context) (interface{}, error) { return nil, nil } -func (r *listenToEventsView) RegisterChaincodeEvents(context view.Context) error { +func (r *ListenToEventsView) RegisterChaincodeEvents(viewContext view.Context) error { // TODO: endorse and then send to ordering - chaincode, err := getChaincode(context, &info{ + chaincode, err := getChaincode(viewContext, &info{ chaincodeName: r.ChaincodeName, network: r.Network, channel: r.Channel, @@ -72,27 +85,57 @@ func (r *listenToEventsView) RegisterChaincodeEvents(context view.Context) error if err != nil { return errors.Wrapf(err, "failed to get chaincode [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName) } + logger.Debugf("getting chaincode events stream for [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName) events, err := chaincode.EventListener.ChaincodeEvents() if err != nil { return errors.Wrapf(err, "failed to get chaincode event channel [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName) } go func() { - for event := range events { - stop, err := r.CallBack(event) - if err != nil { - logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err) - break + defer func() { + if err := chaincode.EventListener.CloseChaincodeEvents(); err != nil { + logger.Errorf("Failed to close event channel [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err) + } + }() + + ctx := r.Context + if ctx == nil { + ctx = context.Background() + } + stop := false + for { + select { + case event := <-events: + logger.Debugf("got chaincode event for [%s:%s:%s], event name [%s]", r.Network, r.Channel, r.ChaincodeName, event.EventName) + stop, err = r.CallBack(event) + if err != nil { + logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err) + } + case <-ctx.Done(): + originErr := ctx.Err() + logger.Debugf("context done with err [%s]", originErr) + _, err = r.CallBack(&committer.ChaincodeEvent{ + Err: errors.Wrapf(originErr, "context done"), + }) + if err != nil { + logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err) + } + stop = true + case <-viewContext.Context().Done(): + originErr := viewContext.Context().Err() + logger.Debugf("view context done with err [%s]", originErr) + _, err = r.CallBack(&committer.ChaincodeEvent{ + Err: errors.Wrapf(originErr, "view context done"), + }) + if err != nil { + logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err) + } + stop = true } if stop { break } } - - err := chaincode.EventListener.CloseChaincodeEvents() - if err != nil { - logger.Errorf("Failed to close event channel [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err) - } }() return nil diff --git a/platform/view/services/comm/builder.go b/platform/view/services/comm/builder.go index 28604a900..6431c6435 100644 --- a/platform/view/services/comm/builder.go +++ b/platform/view/services/comm/builder.go @@ -19,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/multiformats/go-multiaddr" + "github.com/pkg/errors" ) func newHost(ListenAddress string, keyDispenser PrivateKeyDispenser, metrics *Metrics) (*P2PNode, error) { @@ -97,9 +98,9 @@ func NewBootstrapNode(ListenAddress string, keyDispenser PrivateKeyDispenser, me } node.host.Peerstore().AddAddrs(node.host.ID(), node.host.Addrs(), time.Hour) - - node.start() - + if err := node.start(false); err != nil { + return nil, err + } return node, nil } @@ -123,9 +124,9 @@ func NewNode(ListenAddress, BootstrapNode string, keyDispenser PrivateKeyDispens if err != nil { return nil, err } - - node.start() - + if err := node.start(false); err != nil { + return nil, err + } return node, nil } @@ -161,14 +162,19 @@ func (p *P2PNode) startFinder() { } } -func (p *P2PNode) start() { +func (p *P2PNode) start(failAdv bool) error { _, err := p.finder.Advertise(context.Background(), rendezVousString) if err != nil { - logger.Errorf("error while announcing: %s", err) + if failAdv { + return errors.Wrap(err, "error while announcing") + } + logger.Errorf("error while announcing [%s]", err) } p.host.SetStreamHandler(protocol.ID(viewProtocol), p.handleStream()) p.finderWg.Add(1) go p.startFinder() + + return nil } diff --git a/platform/view/services/events/simple/eventbus.go b/platform/view/services/events/simple/eventbus.go index 92f62be02..794791913 100644 --- a/platform/view/services/events/simple/eventbus.go +++ b/platform/view/services/events/simple/eventbus.go @@ -16,39 +16,41 @@ type eventHandler struct { receiver events.Listener } -type eventBus struct { +type EventBus struct { handlers map[string][]*eventHandler lock sync.RWMutex } -func NewEventBus() *eventBus { - return &eventBus{ +func NewEventBus() *EventBus { + return &EventBus{ handlers: make(map[string][]*eventHandler), lock: sync.RWMutex{}, } } -func (e *eventBus) Publish(event events.Event) { +func (e *EventBus) Publish(event events.Event) { if event == nil { return } e.lock.RLock() - defer e.lock.RUnlock() - subs, ok := e.handlers[event.Topic()] if !ok { + e.lock.RUnlock() // no subscriber ok return } + cloned := make([]*eventHandler, len(subs)) + copy(cloned, subs) + e.lock.RUnlock() // call all receivers - for _, sub := range subs { + for _, sub := range cloned { sub.receiver.OnReceive(event) } } -func (e *eventBus) Subscribe(topic string, receiver events.Listener) { +func (e *EventBus) Subscribe(topic string, receiver events.Listener) { if receiver == nil { return } @@ -60,7 +62,7 @@ func (e *eventBus) Subscribe(topic string, receiver events.Listener) { e.handlers[topic] = append(handlers, &eventHandler{receiver: receiver}) } -func (e *eventBus) Unsubscribe(topic string, receiver events.Listener) { +func (e *EventBus) Unsubscribe(topic string, receiver events.Listener) { if receiver == nil { return } diff --git a/platform/view/services/events/simple/eventbus_test.go b/platform/view/services/events/simple/eventbus_test.go index 37d1e6008..88e3131cb 100644 --- a/platform/view/services/events/simple/eventbus_test.go +++ b/platform/view/services/events/simple/eventbus_test.go @@ -34,7 +34,7 @@ var _ = Describe("Event system", func() { }) When("publish and subscribe", func() { - var notifier *eventBus + var notifier *EventBus var alice events.Subscriber var bob events.Publisher