Skip to content

Commit

Permalink
Merge pull request lightninglabs#266 from halseth/active-traders-pred…
Browse files Browse the repository at this point in the history
…icate

Active traders predicate
  • Loading branch information
halseth authored Feb 23, 2021
2 parents ad29b5b + 3d26e45 commit 942bcd6
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 81 deletions.
6 changes: 5 additions & 1 deletion auctioneer.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ type AuctioneerConfig struct {
// automatically clear the above TraderRejected map.
TraderRejectResetInterval time.Duration

// TraderOnline is an order filter that is used to filter out offline
// traders before we start match making.
TraderOnline matching.OrderFilter

// RatingsAgency if non-nil, will be used as an extract matching
// predicate when doing match making.
RatingsAgency ratings.Agency
Expand Down Expand Up @@ -1554,7 +1558,7 @@ func (a *Auctioneer) stateStep(currentState AuctionState, // nolint:gocyclo
)
filterChain := []matching.OrderFilter{
matching.NewBatchFeeRateFilter(s.batchFeeRate),
accountFilter,
accountFilter, a.cfg.TraderOnline,
}

// We pass in our two conflict handlers that also act as match
Expand Down
11 changes: 8 additions & 3 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type rpcServer struct {
snapshotCache map[orderT.BatchID]*subastadb.BatchSnapshot
snapshotCacheMtx sync.Mutex

// activeTraders is a map where we'll add/remove traders as they come
// and go.
activeTraders *activeTradersMap

// connectedStreams is the list of all currently connected
// bi-directional update streams. Each trader has exactly one stream
// but can subscribe to updates for multiple accounts through the same
Expand All @@ -183,7 +187,7 @@ func newRPCServer(store subastadb.Store, signer lndclient.SignerClient,
ratingAgency ratings.Agency, ratingsDB ratings.NodeRatingsDatabase,
listener, restListener net.Listener, serverOpts []grpc.ServerOption,
restProxyCertOpt grpc.DialOption,
subscribeTimeout time.Duration) *rpcServer {
subscribeTimeout time.Duration, activeTraders *activeTradersMap) *rpcServer {

return &rpcServer{
grpcServer: grpc.NewServer(serverOpts...),
Expand All @@ -204,6 +208,7 @@ func newRPCServer(store subastadb.Store, signer lndclient.SignerClient,
subscribeTimeout: subscribeTimeout,
ratingAgency: ratingAgency,
ratingsDB: ratingsDB,
activeTraders: activeTraders,
}
}

Expand Down Expand Up @@ -1392,7 +1397,7 @@ func (s *rpcServer) addStreamSubscription(traderID lsat.TokenID,
// There's no subscription for that account yet, notify our batch
// executor that the trader for a certain account is now connected.
trader.Subscriptions[newSub.AccountKey] = newSub
err := s.batchExecutor.RegisterTrader(newSub)
err := s.activeTraders.RegisterTrader(newSub)
if err != nil {
return fmt.Errorf("error registering trader at venue: %v", err)
}
Expand Down Expand Up @@ -1421,7 +1426,7 @@ func (s *rpcServer) disconnectTrader(traderID lsat.TokenID) error {
for acctKey, trader := range subscriptions {
monitoring.ObserveFailedConnection(acctKey)

err := s.batchExecutor.UnregisterTrader(trader)
err := s.activeTraders.UnregisterTrader(trader)
if err != nil {
return fmt.Errorf("error unregistering"+
"trader at venue: %v", err)
Expand Down
6 changes: 5 additions & 1 deletion rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,17 @@ func TestRPCServerBatchAuctionStreamInitialTimeout(t *testing.T) {
}

func newServer(store subastadb.Store) *rpcServer {
activeTraders := &activeTradersMap{
activeTraders: make(map[matching.AccountID]*venue.ActiveTrader),
}
batchExecutor := venue.NewBatchExecutor(&venue.ExecutorConfig{
Store: &executorStore{
Store: store,
},
Signer: mockSigner,
BatchStorer: venue.NewExeBatchStorer(store),
TraderMsgTimeout: time.Second * 15,
ActiveTraders: activeTraders.GetTraders,
})

return newRPCServer(
Expand All @@ -465,7 +469,7 @@ func newServer(store subastadb.Store) *rpcServer {
OrderExecBaseFee: 1,
OrderExecFeeRate: 100,
}, nil, nil, bufconn.Listen(100), bufconn.Listen(100), nil, nil,
defaultTimeout,
defaultTimeout, activeTraders,
)
}

Expand Down
69 changes: 69 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,63 @@ func (e *executorStore) UpdateExecutionState(newState venue.ExecutionState) erro
return nil
}

type activeTradersMap struct {
activeTraders map[matching.AccountID]*venue.ActiveTrader
sync.RWMutex
}

// RegisterTrader registers a new trader as being active. An active traders is
// eligible to join execution of a batch that they're a part of.
func (a *activeTradersMap) RegisterTrader(t *venue.ActiveTrader) error {
a.Lock()
defer a.Unlock()

_, ok := a.activeTraders[t.AccountKey]
if ok {
return fmt.Errorf("trader %x already registered",
t.AccountKey)
}
a.activeTraders[t.AccountKey] = t

log.Infof("Registering new trader: %x", t.AccountKey[:])

return nil
}

// UnregisterTrader removes a registered trader from the batch.
func (a *activeTradersMap) UnregisterTrader(t *venue.ActiveTrader) error {
a.Lock()
defer a.Unlock()

delete(a.activeTraders, t.AccountKey)

log.Infof("Disconnecting trader: %x", t.AccountKey[:])
return nil
}

// IsActive returns true if the given key is among the active traders.
func (a *activeTradersMap) IsActive(acctKey [33]byte) bool {
a.RLock()
defer a.RUnlock()

_, ok := a.activeTraders[acctKey]
return ok
}

// GetTrades returns the current set of active traders.
func (a *activeTradersMap) GetTraders() map[matching.AccountID]*venue.ActiveTrader {
a.RLock()
defer a.RUnlock()

c := make(map[matching.AccountID]*venue.ActiveTrader, len(a.activeTraders))

for k, v := range a.activeTraders {
c[k] = v
}

return c
}

var _ venue.ExecutorStore = (*executorStore)(nil)

// Server is the main auction auctioneer server.
Expand All @@ -146,6 +203,11 @@ type Server struct {

batchExecutor *venue.BatchExecutor

// activeTraders is a map of all the current active traders. An active
// trader is one that's online and has a live communication channel
// with the BatchExecutor.
activeTraders *activeTradersMap

auctioneer *Auctioneer

channelEnforcer *chanenforcement.ChannelEnforcer
Expand Down Expand Up @@ -232,12 +294,16 @@ func NewServer(cfg *Config) (*Server, error) {
exeStore := &executorStore{
Store: store,
}
activeTraders := &activeTradersMap{
activeTraders: make(map[matching.AccountID]*venue.ActiveTrader),
}
batchExecutor := venue.NewBatchExecutor(&venue.ExecutorConfig{
Store: exeStore,
Signer: lnd.Signer,
BatchStorer: venue.NewExeBatchStorer(store),
AccountWatcher: accountManager,
TraderMsgTimeout: defaultMsgTimeout,
ActiveTraders: activeTraders.GetTraders,
})

durationBuckets := order.NewDurationBuckets()
Expand Down Expand Up @@ -297,6 +363,7 @@ func NewServer(cfg *Config) (*Server, error) {
accountManager: accountManager,
orderBook: orderBook,
batchExecutor: batchExecutor,
activeTraders: activeTraders,
auctioneer: NewAuctioneer(AuctioneerConfig{
DB: newAuctioneerStore(store),
ChainNotifier: lnd.ChainNotifier,
Expand Down Expand Up @@ -340,6 +407,7 @@ func NewServer(cfg *Config) (*Server, error) {
FundingConflictsResetInterval: cfg.FundingConflictResetInterval,
TraderRejected: traderRejected,
TraderRejectResetInterval: cfg.TraderRejectResetInterval,
TraderOnline: matching.NewTraderOnlineFilter(activeTraders.IsActive),
RatingsAgency: ratingsAgency,
}),
channelEnforcer: channelEnforcer,
Expand Down Expand Up @@ -425,6 +493,7 @@ func NewServer(cfg *Config) (*Server, error) {
server.orderBook, batchExecutor, server.auctioneer,
auctionTerms, ratingsAgency, ratingsDB, grpcListener,
restListener, serverOpts, clientCertOpt, cfg.SubscribeTimeout,
activeTraders,
)
server.rpcServer = auctioneerServer
cfg.Prometheus.PublicRPCServer = auctioneerServer.grpcServer
Expand Down
Loading

0 comments on commit 942bcd6

Please sign in to comment.