Skip to content

Commit

Permalink
refactored: tendermint subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
olegfomenko committed Mar 4, 2024
1 parent 73443fd commit 78575c5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 55 deletions.
50 changes: 22 additions & 28 deletions internal/pool/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,41 +111,35 @@ func NewTransferOperationSubscriber(pool *Pool, tendermint *http.HTTP, log *loga
}

func (o *OperationSubscriber) Run(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
o.log.Info("Context finished")
return
default:
o.log.Infof("[Pool] Subscribing to the pool. Query: %s", o.query)
o.runner(ctx)
}
}
}()
}
o.log.Infof("[Pool] Subscribing to the pool. Query: %s", o.query)

func (o *OperationSubscriber) runner(ctx context.Context) {
out, err := o.client.Subscribe(ctx, OpServiceName, o.query, OpPoolSize)
if err != nil {
panic(err)
}

for {
c, ok := <-out
if !ok {
o.log.Info("[Pool] WS unsubscribed. Resubscribing...")
if err := o.client.Unsubscribe(ctx, OpServiceName, o.query); err != nil {
o.log.WithError(err).Error("[Pool] Failed to unsubscribe from new operations")
}
break
}
go func() {
for {
select {
case <-ctx.Done():
if err := o.client.Unsubscribe(ctx, OpServiceName, o.query); err != nil {
o.log.WithError(err).Error("[Pool] Failed to unsubscribe from new operations")
}

for _, index := range c.Events[fmt.Sprintf("%s.%s", rarimo.EventTypeOperationApproved, rarimo.AttributeKeyOperationId)] {
o.log.Infof("[Pool] New operation found index=%s", index)
if err := o.pool.Add(index); err != nil {
o.log.WithError(err).Error("error adding operation to the pool")
o.log.Info("Context finished")
return
case c, ok := <-out:
if !ok {
o.log.WithError(err).Fatal("[Pool] chanel closed")
}

for _, index := range c.Events[fmt.Sprintf("%s.%s", rarimo.EventTypeOperationApproved, rarimo.AttributeKeyOperationId)] {
o.log.Infof("[Pool] New operation found index=%s", index)
if err := o.pool.Add(index); err != nil {
o.log.WithError(err).Error("error adding operation to the pool")
}
}
}
}
}
}()
}
46 changes: 19 additions & 27 deletions internal/timer/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,33 @@ func NewBlockSubscriber(timer *Timer, tendermint *http.HTTP, log *logan.Entry) *
}

func (b *BlockSubscriber) Run(ctx context.Context) {
out, err := b.client.Subscribe(ctx, BlockServiceName, BlockQuery, ChanelCap)
if err != nil {
panic(err)
}

go func() {
for {
select {
case <-ctx.Done():
if err := b.client.Unsubscribe(ctx, BlockServiceName, BlockQuery); err != nil {
b.log.WithError(err).Error("[Block] failed to unsubscribe from new blocks")
}

b.log.Info("Context finished")
return
default:
b.runner(ctx)
b.log.Info("[Block] Resubscribing to the blocks...")
}
}
}()
}
case c, ok := <-out:
if !ok {
b.log.WithError(err).Fatal("[Block] chanel closed")
}

func (b *BlockSubscriber) runner(ctx context.Context) {
out, err := b.client.Subscribe(ctx, BlockServiceName, BlockQuery, ChanelCap)
if err != nil {
panic(err)
}

for {
c, ok := <-out
if !ok {
if err := b.client.Unsubscribe(ctx, BlockServiceName, BlockQuery); err != nil {
b.log.WithError(err).Error("[Block] failed to unsubscribe from new blocks")
switch data := c.Data.(type) {
case types.EventDataNewBlock:
b.log.Infof("[Block] Received New Block %s height: %d", data.Block.Hash().String(), data.Block.Height)
b.timer.newBlock(uint64(data.Block.Height))
}
}
break
}
}()

switch data := c.Data.(type) {
case types.EventDataNewBlock:
b.log.Infof("[Block] Received New Block %s height: %d", data.Block.Hash().String(), data.Block.Height)
b.timer.newBlock(uint64(data.Block.Height))
default:

}
}
}

0 comments on commit 78575c5

Please sign in to comment.