Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved chaincode event view #506

Merged
merged 7 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions integration/fabric/events/chaincode/views/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package views

import (
context2 "context"
"encoding/json"
"sync"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/chaincode"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"go.uber.org/zap/zapcore"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("view-events")
Expand All @@ -36,33 +38,56 @@ func (c *EventsView) Call(context view.Context) (interface{}, error) {
wg := sync.WaitGroup{}
wg.Add(1)
var eventReceived *chaincode.Event
var eventError error

// Register for events
callBack := func(event *chaincode.Event) (bool, error) {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Chaincode Event Received in callback %s", event.EventName)
logger.Debugf("Chaincode Event Received in callback %s", event.EventName)
if event.Err != nil {
eventError = event.Err
wg.Done()
return true, nil
}

if event.EventName == c.EventName {
eventReceived = event
defer wg.Done()
wg.Done()
return true, nil
}

return false, nil
}

_, err := context.RunView(chaincode.NewListenToEventsView("events", callBack))
// Test timeout
ctx, cancelFunc := context2.WithTimeout(context.Context(), 10*time.Second)
defer cancelFunc()
_, err := context.RunView(chaincode.NewListenToEventsViewWithContext(ctx, "events", callBack))
assert.NoError(err, "failed to listen to events")
wg.Wait()
assert.Error(eventError, "expected error to have happened")
assert.Equal(errors.Cause(eventError), context2.DeadlineExceeded, "expected deadline exceeded error")
cancelFunc()

// Now invoke the chaincode
// Invoke the chaincode
wg.Add(1)
eventReceived = nil
eventError = nil
ctx1, cancelFunc1 := context2.WithTimeout(context.Context(), 10*time.Minute)
defer cancelFunc1()
_, err = context.RunView(chaincode.NewListenToEventsViewWithContext(ctx1, "events", callBack))
assert.NoError(err, "failed to listen to events")
_, err = context.RunView(
chaincode.NewInvokeView(
"events",
c.Function,
),
)
assert.NoError(err, "Failed Running Invoke View ")

// wait for the event to arriver
assert.NoError(err, "Failed Running Invoke View")
wg.Wait()
assert.NoError(eventError, "expected error to not have happened")
assert.NotNil(eventReceived, "expected to have received an event")

return &EventReceived{
Event: eventReceived,
}, nil
Expand Down
1 change: 1 addition & 0 deletions platform/fabric/core/generic/committer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ChaincodeEvent struct {
ChaincodeID string
EventName string
Payload []byte
Err error
}

func (chaincodeEvent *ChaincodeEvent) Message() interface{} {
Expand Down
75 changes: 59 additions & 16 deletions platform/fabric/services/chaincode/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package chaincode

import (
"context"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
Expand All @@ -29,6 +31,7 @@ type info struct {
}

type RegisterChaincodeCall struct {
Context context.Context
InvokerIdentity view.Identity
Network string
Channel string
Expand All @@ -38,32 +41,42 @@ type RegisterChaincodeCall struct {
CallBack EventCallback
}

type listenToEventsView struct {
type ListenToEventsView struct {
*RegisterChaincodeCall
}

// NewListenToEventsView register a listener for the events generated by the passed chaincode
// and call the passed callback when an event is caught.
func NewListenToEventsView(chaincode string, callBack EventCallback) *listenToEventsView {
return &listenToEventsView{
func NewListenToEventsView(chaincode string, callBack EventCallback) *ListenToEventsView {
return &ListenToEventsView{
RegisterChaincodeCall: &RegisterChaincodeCall{
ChaincodeName: chaincode,
CallBack: callBack,
},
}
}

func NewListenToEventsViewWithContext(context context.Context, chaincode string, callBack EventCallback) *ListenToEventsView {
return &ListenToEventsView{
RegisterChaincodeCall: &RegisterChaincodeCall{
ChaincodeName: chaincode,
CallBack: callBack,
Context: context,
},
}
}

func (r *listenToEventsView) Call(context view.Context) (interface{}, error) {
func (r *ListenToEventsView) Call(context view.Context) (interface{}, error) {
err := r.RegisterChaincodeEvents(context)
if err != nil {
return nil, err
}
return nil, nil
}

func (r *listenToEventsView) RegisterChaincodeEvents(context view.Context) error {
func (r *ListenToEventsView) RegisterChaincodeEvents(viewContext view.Context) error {
// TODO: endorse and then send to ordering
chaincode, err := getChaincode(context, &info{
chaincode, err := getChaincode(viewContext, &info{
chaincodeName: r.ChaincodeName,
network: r.Network,
channel: r.Channel,
Expand All @@ -72,27 +85,57 @@ func (r *listenToEventsView) RegisterChaincodeEvents(context view.Context) error
if err != nil {
return errors.Wrapf(err, "failed to get chaincode [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName)
}
logger.Debugf("getting chaincode events stream for [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName)
events, err := chaincode.EventListener.ChaincodeEvents()
if err != nil {
return errors.Wrapf(err, "failed to get chaincode event channel [%s:%s:%s]", r.Network, r.Channel, r.ChaincodeName)
}

go func() {
for event := range events {
stop, err := r.CallBack(event)
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
break
defer func() {
if err := chaincode.EventListener.CloseChaincodeEvents(); err != nil {
logger.Errorf("Failed to close event channel [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
}()

ctx := r.Context
if ctx == nil {
ctx = context.Background()
}
stop := false
for {
select {
case event := <-events:
logger.Debugf("got chaincode event for [%s:%s:%s], event name [%s]", r.Network, r.Channel, r.ChaincodeName, event.EventName)
stop, err = r.CallBack(event)
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
case <-ctx.Done():
originErr := ctx.Err()
logger.Debugf("context done with err [%s]", originErr)
_, err = r.CallBack(&committer.ChaincodeEvent{
Err: errors.Wrapf(originErr, "context done"),
})
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
stop = true
case <-viewContext.Context().Done():
originErr := viewContext.Context().Err()
logger.Debugf("view context done with err [%s]", originErr)
_, err = r.CallBack(&committer.ChaincodeEvent{
Err: errors.Wrapf(originErr, "view context done"),
})
if err != nil {
logger.Errorf("callback failed [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
stop = true
}
if stop {
break
}
}

err := chaincode.EventListener.CloseChaincodeEvents()
if err != nil {
logger.Errorf("Failed to close event channel [%s:%s:%s]: [%s]", r.Network, r.Channel, r.ChaincodeName, err)
}
}()

return nil
Expand Down
22 changes: 14 additions & 8 deletions platform/view/services/comm/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)

func newHost(ListenAddress string, keyDispenser PrivateKeyDispenser, metrics *Metrics) (*P2PNode, error) {
Expand Down Expand Up @@ -97,9 +98,9 @@ func NewBootstrapNode(ListenAddress string, keyDispenser PrivateKeyDispenser, me
}

node.host.Peerstore().AddAddrs(node.host.ID(), node.host.Addrs(), time.Hour)

node.start()

if err := node.start(false); err != nil {
return nil, err
}
return node, nil
}

Expand All @@ -123,9 +124,9 @@ func NewNode(ListenAddress, BootstrapNode string, keyDispenser PrivateKeyDispens
if err != nil {
return nil, err
}

node.start()

if err := node.start(false); err != nil {
return nil, err
}
return node, nil
}

Expand Down Expand Up @@ -161,14 +162,19 @@ func (p *P2PNode) startFinder() {
}
}

func (p *P2PNode) start() {
func (p *P2PNode) start(failAdv bool) error {
_, err := p.finder.Advertise(context.Background(), rendezVousString)
if err != nil {
logger.Errorf("error while announcing: %s", err)
if failAdv {
return errors.Wrap(err, "error while announcing")
}
logger.Errorf("error while announcing [%s]", err)
}

p.host.SetStreamHandler(protocol.ID(viewProtocol), p.handleStream())

p.finderWg.Add(1)
go p.startFinder()

return nil
}
20 changes: 11 additions & 9 deletions platform/view/services/events/simple/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,41 @@ type eventHandler struct {
receiver events.Listener
}

type eventBus struct {
type EventBus struct {
handlers map[string][]*eventHandler
lock sync.RWMutex
}

func NewEventBus() *eventBus {
return &eventBus{
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]*eventHandler),
lock: sync.RWMutex{},
}
}

func (e *eventBus) Publish(event events.Event) {
func (e *EventBus) Publish(event events.Event) {
if event == nil {
return
}

e.lock.RLock()
defer e.lock.RUnlock()

subs, ok := e.handlers[event.Topic()]
if !ok {
e.lock.RUnlock()
// no subscriber ok
return
}
cloned := make([]*eventHandler, len(subs))
copy(cloned, subs)
e.lock.RUnlock()

// call all receivers
for _, sub := range subs {
for _, sub := range cloned {
sub.receiver.OnReceive(event)
}
}

func (e *eventBus) Subscribe(topic string, receiver events.Listener) {
func (e *EventBus) Subscribe(topic string, receiver events.Listener) {
if receiver == nil {
return
}
Expand All @@ -60,7 +62,7 @@ func (e *eventBus) Subscribe(topic string, receiver events.Listener) {
e.handlers[topic] = append(handlers, &eventHandler{receiver: receiver})
}

func (e *eventBus) Unsubscribe(topic string, receiver events.Listener) {
func (e *EventBus) Unsubscribe(topic string, receiver events.Listener) {
if receiver == nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion platform/view/services/events/simple/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ = Describe("Event system", func() {
})

When("publish and subscribe", func() {
var notifier *eventBus
var notifier *EventBus

var alice events.Subscriber
var bob events.Publisher
Expand Down