From cb64856d06cf37af30f3f7d9102d8b72b818a87d Mon Sep 17 00:00:00 2001 From: Justin Tieri <37750742+jtieri@users.noreply.github.com> Date: Fri, 2 Feb 2024 17:00:47 -0600 Subject: [PATCH] refactor: avoid killing entire rly process Currently, the relayer will terminate the entire process if any of the ChainProcessors encounter critical errors due to underlying node issues. Instead, we should keep the rly process running and let individual ChainProcessors and PathProcessors be terminated. --- relayer/processor/event_processor.go | 39 ++++++++++++++++++---------- relayer/processor/path_processor.go | 6 ++++- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/relayer/processor/event_processor.go b/relayer/processor/event_processor.go index 37dba66fc..5bbaa5205 100644 --- a/relayer/processor/event_processor.go +++ b/relayer/processor/event_processor.go @@ -2,8 +2,7 @@ package processor import ( "context" - - "golang.org/x/sync/errgroup" + "sync" ) // EventProcessorBuilder is a configuration type with .With functions used for building an EventProcessor. @@ -91,25 +90,37 @@ func (ep EventProcessorBuilder) Build() EventProcessor { // It will return once all PathProcessors and ChainProcessors have stopped running due to context cancellation, // or if a critical error has occurred within one of the ChainProcessors. func (ep EventProcessor) Run(ctx context.Context) error { - var eg errgroup.Group + var wg sync.WaitGroup + runCtx, runCtxCancel := context.WithCancel(ctx) for _, pathProcessor := range ep.pathProcessors { pathProcessor := pathProcessor - eg.Go(func() error { + + wg.Add(1) + go func() { pathProcessor.Run(runCtx, runCtxCancel) - return nil - }) + wg.Done() + }() } + for _, chainProcessor := range ep.chainProcessors { chainProcessor := chainProcessor - eg.Go(func() error { - err := chainProcessor.Run(runCtx, ep.initialBlockHistory, ep.stuckPacket) - // Signal the other chain processors to exit. - runCtxCancel() - return err - }) + + wg.Add(1) + go func() { + err := func() error { + err := chainProcessor.Run(runCtx, ep.initialBlockHistory, ep.stuckPacket) + return err + }() + if err != nil { + // TODO: do we need to log errors here or have they already been logged by ChainProcessor? + } + wg.Done() + }() } - err := eg.Wait() + + wg.Wait() runCtxCancel() - return err + + return nil } diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index b8b9d4da2..10677b9e2 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -399,7 +399,11 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { return } - for len(pp.pathEnd1.incomingCacheData) > 0 || len(pp.pathEnd2.incomingCacheData) > 0 || len(pp.retryProcess) > 0 || len(pp.pathEnd1.finishedProcessing) > 0 || len(pp.pathEnd2.finishedProcessing) > 0 { + for len(pp.pathEnd1.incomingCacheData) > 0 || + len(pp.pathEnd2.incomingCacheData) > 0 || + len(pp.retryProcess) > 0 || + len(pp.pathEnd1.finishedProcessing) > 0 || + len(pp.pathEnd2.finishedProcessing) > 0 { // signals are available, so this will not need to block. if pp.processAvailableSignals(ctx, cancel) { return