Skip to content

Commit

Permalink
improved chaincode event view (#506)
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <[email protected]>
  • Loading branch information
adecaro authored Nov 2, 2023
1 parent b3425a2 commit 5fa8a42
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 42 deletions.
41 changes: 33 additions & 8 deletions integration/fabric/events/chaincode/views/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package views

import (
context2 "context"
"encoding/json"
"sync"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/chaincode"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"go.uber.org/zap/zapcore"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("view-events")
Expand All @@ -36,33 +38,56 @@ func (c *EventsView) Call(context view.Context) (interface{}, error) {
wg := sync.WaitGroup{}
wg.Add(1)
var eventReceived *chaincode.Event
var eventError error

// Register for events
callBack := func(event *chaincode.Event) (bool, error) {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Chaincode Event Received in callback %s", event.EventName)
logger.Debugf("Chaincode Event Received in callback %s", event.EventName)
if event.Err != nil {
eventError = event.Err
wg.Done()
return true, nil
}

if event.EventName == c.EventName {
eventReceived = event
defer wg.Done()
wg.Done()
return true, nil
}

return false, nil
}

_, err := context.RunView(chaincode.NewListenToEventsView("events", callBack))
// Test timeout
ctx, cancelFunc := context2.WithTimeout(context.Context(), 10*time.Second)
defer cancelFunc()
_, err := context.RunView(chaincode.NewListenToEventsViewWithContext(ctx, "events", callBack))
assert.NoError(err, "failed to listen to events")
wg.Wait()
assert.Error(eventError, "expected error to have happened")
assert.Equal(errors.Cause(eventError), context2.DeadlineExceeded, "expected deadline exceeded error")
cancelFunc()

// Now invoke the chaincode
// Invoke the chaincode
wg.Add(1)
eventReceived = nil
eventError = nil
ctx1, cancelFunc1 := context2.WithTimeout(context.Context(), 10*time.Minute)
defer cancelFunc1()
_, err = context.RunView(chaincode.NewListenToEventsViewWithContext(ctx1, "events", callBack))
assert.NoError(err, "failed to listen to events")
_, err = context.RunView(
chaincode.NewInvokeView(
"events",
c.Function,
),
)
assert.NoError(err, "Failed Running Invoke View ")

// wait for the event to arriver
assert.NoError(err, "Failed Running Invoke View")
wg.Wait()
assert.NoError(eventError, "expected error to not have happened")
assert.NotNil(eventReceived, "expected to have received an event")

return &EventReceived{
Event: eventReceived,
}, nil
Expand Down
1 change: 1 addition & 0 deletions platform/fabric/core/generic/committer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ChaincodeEvent struct {
ChaincodeID string
EventName string
Payload []byte
Err error
}

func (chaincodeEvent *ChaincodeEvent) Message() interface{} {
Expand Down
75 changes: 59 additions & 16 deletions platform/fabric/services/chaincode/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package chaincode

import (
"context"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
Expand All @@ -29,6 +31,7 @@ type info struct {
}

type RegisterChaincodeCall struct {
Context context.Context
InvokerIdentity view.Identity
Network string
Channel string
Expand All @@ -38,32 +41,42 @@ type RegisterChaincodeCall struct {
CallBack EventCallback
}

type listenToEventsView struct {
type ListenToEventsView struct {
*RegisterChaincodeCall
}

// NewListenToEventsView register a listener for the events generated by the passed chaincode
// and call the passed callback when an event is caught.
func NewListenToEventsView(chaincode string, callBack EventCallback) *listenToEventsView {
return &listenToEventsView{
func NewListenToEventsView(chaincode string, callBack EventCallback) *ListenToEventsView {
return &ListenToEventsView{
RegisterChaincodeCall: &RegisterChaincodeCall{
ChaincodeName: chaincode,
CallBack: callBack,
},
}
}

func NewListenToEventsViewWithContext(context context.Context, chaincode string, callBack EventCallback) *ListenToEventsView {
return &ListenToEventsView{
RegisterChaincodeCall: &RegisterChaincodeCall{
ChaincodeName: chaincode,
CallBack: callBack,
Context: context,
},
}
}

func (r *listenToEventsView) Call(context view.Context) (interface{}, error) {
func (r *ListenToEventsView) Call(context view.Context) (interface{}, error) {
err := r.RegisterChaincodeEvents(context)
if err != nil {
return nil, err
}
return nil, nil
}

func (r *listenToEventsView) RegisterChaincodeEvents(context view.Context) error {
func (r *ListenToEventsView) RegisterChaincodeEvents(viewContext view.Context) error {
// TODO: endorse and then send to ordering
chaincode, err := getChaincode(context, &info{
chaincode, err := getChaincode(viewContext, &info{
chaincodeName: r.ChaincodeName,
network: r.Network,
channel: r.Channel,
Expand All @@ -72,27 +85,57 @@ func (r *listenToEventsView) RegisterChaincodeEvents(context view.Context) error
if err != nil {
return errors.Wrapf(err, "failed to get chaincode [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName)
}
logger.Debugf("getting chaincode events stream for [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName)
events, err := chaincode.EventListener.ChaincodeEvents()
if err != nil {
return errors.Wrapf(err, "failed to get chaincode event channel [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName)
}

go func() {
for event := range events {
stop, err := r.CallBack(event)
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
break
defer func() {
if err := chaincode.EventListener.CloseChaincodeEvents(); err != nil {
logger.Errorf("Failed to close event channel [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
}()

ctx := r.Context
if ctx == nil {
ctx = context.Background()
}
stop := false
for {
select {
case event := <-events:
logger.Debugf("got chaincode event for [%s:%s:%s], event name [%s]", r.Network, r.Channel, r.ChaincodeName, event.EventName)
stop, err = r.CallBack(event)
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
case <-ctx.Done():
originErr := ctx.Err()
logger.Debugf("context done with err [%s]", originErr)
_, err = r.CallBack(&committer.ChaincodeEvent{
Err: errors.Wrapf(originErr, "context done"),
})
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
stop = true
case <-viewContext.Context().Done():
originErr := viewContext.Context().Err()
logger.Debugf("view context done with err [%s]", originErr)
_, err = r.CallBack(&committer.ChaincodeEvent{
Err: errors.Wrapf(originErr, "view context done"),
})
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
stop = true
}
if stop {
break
}
}

err := chaincode.EventListener.CloseChaincodeEvents()
if err != nil {
logger.Errorf("Failed to close event channel [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
}()

return nil
Expand Down
22 changes: 14 additions & 8 deletions platform/view/services/comm/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)

func newHost(ListenAddress string, keyDispenser PrivateKeyDispenser, metrics *Metrics) (*P2PNode, error) {
Expand Down Expand Up @@ -97,9 +98,9 @@ func NewBootstrapNode(ListenAddress string, keyDispenser PrivateKeyDispenser, me
}

node.host.Peerstore().AddAddrs(node.host.ID(), node.host.Addrs(), time.Hour)

node.start()

if err := node.start(false); err != nil {
return nil, err
}
return node, nil
}

Expand All @@ -123,9 +124,9 @@ func NewNode(ListenAddress, BootstrapNode string, keyDispenser PrivateKeyDispens
if err != nil {
return nil, err
}

node.start()

if err := node.start(false); err != nil {
return nil, err
}
return node, nil
}

Expand Down Expand Up @@ -161,14 +162,19 @@ func (p *P2PNode) startFinder() {
}
}

func (p *P2PNode) start() {
func (p *P2PNode) start(failAdv bool) error {
_, err := p.finder.Advertise(context.Background(), rendezVousString)
if err != nil {
logger.Errorf("error while announcing: %s", err)
if failAdv {
return errors.Wrap(err, "error while announcing")
}
logger.Errorf("error while announcing [%s]", err)
}

p.host.SetStreamHandler(protocol.ID(viewProtocol), p.handleStream())

p.finderWg.Add(1)
go p.startFinder()

return nil
}
20 changes: 11 additions & 9 deletions platform/view/services/events/simple/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,41 @@ type eventHandler struct {
receiver events.Listener
}

type eventBus struct {
type EventBus struct {
handlers map[string][]*eventHandler
lock sync.RWMutex
}

func NewEventBus() *eventBus {
return &eventBus{
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]*eventHandler),
lock: sync.RWMutex{},
}
}

func (e *eventBus) Publish(event events.Event) {
func (e *EventBus) Publish(event events.Event) {
if event == nil {
return
}

e.lock.RLock()
defer e.lock.RUnlock()

subs, ok := e.handlers[event.Topic()]
if !ok {
e.lock.RUnlock()
// no subscriber ok
return
}
cloned := make([]*eventHandler, len(subs))
copy(cloned, subs)
e.lock.RUnlock()

// call all receivers
for _, sub := range subs {
for _, sub := range cloned {
sub.receiver.OnReceive(event)
}
}

func (e *eventBus) Subscribe(topic string, receiver events.Listener) {
func (e *EventBus) Subscribe(topic string, receiver events.Listener) {
if receiver == nil {
return
}
Expand All @@ -60,7 +62,7 @@ func (e *eventBus) Subscribe(topic string, receiver events.Listener) {
e.handlers[topic] = append(handlers, &eventHandler{receiver: receiver})
}

func (e *eventBus) Unsubscribe(topic string, receiver events.Listener) {
func (e *EventBus) Unsubscribe(topic string, receiver events.Listener) {
if receiver == nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion platform/view/services/events/simple/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ = Describe("Event system", func() {
})

When("publish and subscribe", func() {
var notifier *eventBus
var notifier *EventBus

var alice events.Subscriber
var bob events.Publisher
Expand Down

0 comments on commit 5fa8a42

Please sign in to comment.