diff --git a/internal/pool/subscriber.go b/internal/pool/subscriber.go index 920c511..9b8cd5a 100644 --- a/internal/pool/subscriber.go +++ b/internal/pool/subscriber.go @@ -111,41 +111,35 @@ func NewTransferOperationSubscriber(pool *Pool, tendermint *http.HTTP, log *loga } func (o *OperationSubscriber) Run(ctx context.Context) { - go func() { - for { - select { - case <-ctx.Done(): - o.log.Info("Context finished") - return - default: - o.log.Infof("[Pool] Subscribing to the pool. Query: %s", o.query) - o.runner(ctx) - } - } - }() -} + o.log.Infof("[Pool] Subscribing to the pool. Query: %s", o.query) -func (o *OperationSubscriber) runner(ctx context.Context) { out, err := o.client.Subscribe(ctx, OpServiceName, o.query, OpPoolSize) if err != nil { panic(err) } - for { - c, ok := <-out - if !ok { - o.log.Info("[Pool] WS unsubscribed. Resubscribing...") - if err := o.client.Unsubscribe(ctx, OpServiceName, o.query); err != nil { - o.log.WithError(err).Error("[Pool] Failed to unsubscribe from new operations") - } - break - } + go func() { + for { + select { + case <-ctx.Done(): + if err := o.client.Unsubscribe(ctx, OpServiceName, o.query); err != nil { + o.log.WithError(err).Error("[Pool] Failed to unsubscribe from new operations") + } - for _, index := range c.Events[fmt.Sprintf("%s.%s", rarimo.EventTypeOperationApproved, rarimo.AttributeKeyOperationId)] { - o.log.Infof("[Pool] New operation found index=%s", index) - if err := o.pool.Add(index); err != nil { - o.log.WithError(err).Error("error adding operation to the pool") + o.log.Info("Context finished") + return + case c, ok := <-out: + if !ok { + o.log.WithError(err).Fatal("[Pool] chanel closed") + } + + for _, index := range c.Events[fmt.Sprintf("%s.%s", rarimo.EventTypeOperationApproved, rarimo.AttributeKeyOperationId)] { + o.log.Infof("[Pool] New operation found index=%s", index) + if err := o.pool.Add(index); err != nil { + o.log.WithError(err).Error("error adding operation to the pool") + } + } } } - } + }() } diff --git a/internal/timer/subscriber.go b/internal/timer/subscriber.go index f5cc50e..b943f54 100644 --- a/internal/timer/subscriber.go +++ b/internal/timer/subscriber.go @@ -32,41 +32,33 @@ func NewBlockSubscriber(timer *Timer, tendermint *http.HTTP, log *logan.Entry) * } func (b *BlockSubscriber) Run(ctx context.Context) { + out, err := b.client.Subscribe(ctx, BlockServiceName, BlockQuery, ChanelCap) + if err != nil { + panic(err) + } + go func() { for { select { case <-ctx.Done(): + if err := b.client.Unsubscribe(ctx, BlockServiceName, BlockQuery); err != nil { + b.log.WithError(err).Error("[Block] failed to unsubscribe from new blocks") + } + b.log.Info("Context finished") return - default: - b.runner(ctx) - b.log.Info("[Block] Resubscribing to the blocks...") - } - } - }() -} + case c, ok := <-out: + if !ok { + b.log.WithError(err).Fatal("[Block] chanel closed") + } -func (b *BlockSubscriber) runner(ctx context.Context) { - out, err := b.client.Subscribe(ctx, BlockServiceName, BlockQuery, ChanelCap) - if err != nil { - panic(err) - } - - for { - c, ok := <-out - if !ok { - if err := b.client.Unsubscribe(ctx, BlockServiceName, BlockQuery); err != nil { - b.log.WithError(err).Error("[Block] failed to unsubscribe from new blocks") + switch data := c.Data.(type) { + case types.EventDataNewBlock: + b.log.Infof("[Block] Received New Block %s height: %d", data.Block.Hash().String(), data.Block.Height) + b.timer.newBlock(uint64(data.Block.Height)) + } } - break } + }() - switch data := c.Data.(type) { - case types.EventDataNewBlock: - b.log.Infof("[Block] Received New Block %s height: %d", data.Block.Hash().String(), data.Block.Height) - b.timer.newBlock(uint64(data.Block.Height)) - default: - - } - } }