Skip to content

Commit

Permalink
add test and remove recovered atmoic bool
Browse files Browse the repository at this point in the history
  • Loading branch information
kbearXD committed Oct 13, 2023
1 parent de1a884 commit c544937
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 38 deletions.
72 changes: 40 additions & 32 deletions pkg/strategy/grid2/active_order_recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@ package grid2
import (
"context"
"strconv"
"sync/atomic"
"time"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
)

type SyncActiveOrdersOpts struct {
logger *logrus.Entry
metricsLabels prometheus.Labels
activeOrderBook *bbgo.ActiveOrderBook
orderQueryService types.ExchangeOrderQueryService
exchange types.Exchange
}

func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
// every time we activeOrdersRecoverCh receive signal, do active orders recover
s.activeOrdersRecoverCh = make(chan struct{}, 1)
Expand All @@ -22,70 +33,66 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

opts := SyncActiveOrdersOpts{
logger: s.logger,
metricsLabels: s.newPrometheusLabels(),
activeOrderBook: s.orderExecutor.ActiveMakerOrders(),
orderQueryService: s.orderQueryService,
exchange: s.session.Exchange,
}

for {
select {

case <-ctx.Done():
return

case <-ticker.C:
if err := s.syncActiveOrders(ctx); err != nil {
if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
}

case <-s.activeOrdersRecoverCh:
if err := s.syncActiveOrders(ctx); err != nil {
if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
}

}
}
}

func (s *Strategy) syncActiveOrders(ctx context.Context) error {
s.logger.Infof("[ActiveOrderRecover] syncActiveOrders")
func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders")

notAddNonExistingOpenOrdersAfter := time.Now().Add(-5 * time.Minute)

recovered := atomic.LoadInt32(&s.recovered)
if recovered == 0 {
s.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready")
return nil
}

if s.getGrid() == nil {
return nil
}

openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, opts.exchange, opts.activeOrderBook.Symbol)
if err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
return err
opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time")
}

metricsNumOfOpenOrders.With(s.newPrometheusLabels()).Set(float64(len(openOrders)))
metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders)))

s.mu.Lock()
defer s.mu.Unlock()

activeOrderBook := s.orderExecutor.ActiveMakerOrders()
activeOrders := activeOrderBook.Orders()
activeOrders := opts.activeOrderBook.Orders()

openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders {
openOrdersMap[openOrder.OrderID] = openOrder
}

var errs error
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not.
delete(openOrdersMap, activeOrder.OrderID)
} else {
s.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
opts.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID)

if err := s.syncActiveOrder(ctx, activeOrderBook, activeOrder.OrderID); err != nil {
s.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
if err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID); err != nil {
opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
continue
}
}
Expand All @@ -98,15 +105,16 @@ func (s *Strategy) syncActiveOrders(ctx context.Context) error {
continue
}

activeOrderBook.Update(openOrder)
opts.activeOrderBook.Add(openOrder)
// opts.activeOrderBook.Update(openOrder)
}

return nil
return errs
}

func (s *Strategy) syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderID uint64) error {
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{
Symbol: s.Symbol,
func syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64) error {
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: activeOrderBook.Symbol,
OrderID: strconv.FormatUint(orderID, 10),
})

Expand Down
176 changes: 176 additions & 0 deletions pkg/strategy/grid2/active_order_recover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package grid2

import (
"context"
"strconv"
"testing"
"time"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)

func TestSyncActiveOrders(t *testing.T) {
assert := assert.New(t)

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

symbol := "ETHUSDT"
labels := prometheus.Labels{
"exchange": "default",
"symbol": symbol,
}
t.Run("all open orders are match with active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
}
order.Symbol = symbol

activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)

assert.NoError(syncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(1), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})

t.Run("there is order in active orderbook but not in open orders", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
updatedOrder := order
updatedOrder.Status = types.OrderStatusFilled

activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
}).Return(&updatedOrder, nil)

assert.NoError(syncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(0, len(activeOrders))
})

t.Run("there is order on open orders but not in active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
CreationTime: types.Time(time.Now()),
}

mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(1), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})

t.Run("there is order on open order but not in active orderbook also order in active orderbook but not on open orders", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order1 := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
updatedOrder1 := order1
updatedOrder1.Status = types.OrderStatusFilled
order2 := types.Order{
OrderID: 2,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}

activeOrderbook.Add(order1)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order1.OrderID, 10),
}).Return(&updatedOrder1, nil)

assert.NoError(syncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(2), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
}
6 changes: 0 additions & 6 deletions pkg/strategy/grid2/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -206,7 +205,6 @@ type Strategy struct {
tradingCtx, writeCtx context.Context
cancelWrite context.CancelFunc

recovered int32
activeOrdersRecoverCh chan struct{}

// this ensures that bbgo.Sync to lock the object
Expand Down Expand Up @@ -2028,10 +2026,6 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
}

func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error {
defer func() {
atomic.AddInt32(&s.recovered, 1)
}()

if s.RecoverGridByScanningTrades {
s.debugLog("recovering grid by scanning trades")
return s.recoverByScanningTrades(ctx, session)
Expand Down

0 comments on commit c544937

Please sign in to comment.