From 8b4be440f5b5d98dddad6ab31d920356ec8639de Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 18 Dec 2023 14:20:09 -0800 Subject: [PATCH] cherry pick: 1fdb0f55bf --- baseapp/baseapp.go | 6 ++++++ baseapp/chain_stream.go | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 baseapp/chain_stream.go diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 8de14c5f058b..a3198f8482f3 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -184,6 +184,9 @@ type BaseApp struct { // including the goroutine handling.This is experimental and must be enabled // by developers. optimisticExec *oe.OptimisticExecution + + // StreamEvents + StreamEvents chan StreamEvents } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -203,6 +206,7 @@ func NewBaseApp( txDecoder: txDecoder, fauxMerkleMode: false, queryGasLimit: math.MaxUint64, + StreamEvents: make(chan StreamEvents), } for _, option := range options { @@ -719,6 +723,7 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock, ) } + app.AddStreamEvents(app.finalizeBlockState.ctx.BlockHeight(), resp.Events, true) resp.Events = sdk.MarkEventsToIndex(resp.Events, app.indexEvents) } @@ -781,6 +786,7 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) { ) } + app.AddStreamEvents(app.finalizeBlockState.ctx.BlockHeight(), eb.Events, true) eb.Events = sdk.MarkEventsToIndex(eb.Events, app.indexEvents) endblock = eb } diff --git a/baseapp/chain_stream.go b/baseapp/chain_stream.go new file mode 100644 index 000000000000..aae030b8d7ea --- /dev/null +++ b/baseapp/chain_stream.go @@ -0,0 +1,19 @@ +package baseapp + +import "github.com/cometbft/cometbft/abci/types" + +type StreamEvents struct { + Events []types.Event + Height uint64 + Flush bool +} + +func (app *BaseApp) AddStreamEvents(height int64, events []types.Event, flush bool) { + go func() { + app.StreamEvents <- StreamEvents{ + Events: events, + Height: uint64(height), + Flush: flush, + } + }() +}