Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Candle missing #11735

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 142 additions & 87 deletions datanode/candlesv2/candle_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ package candlesv2

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"code.vegaprotocol.io/vega/datanode/entities"
"code.vegaprotocol.io/vega/logging"
)

var ErrNewSubscriberNotReady = errors.New("new subscriber was not ready to receive the last candle data")

type candleSource interface {
GetCandleDataForTimeSpan(ctx context.Context, candleID string, from *time.Time, to *time.Time,
p entities.CursorPagination) ([]entities.Candle, entities.PageInfo, error)
Expand All @@ -45,23 +49,28 @@ func (m subscriptionMsg) String() string {
}

type CandleUpdates struct {
log *logging.Logger
candleSource candleSource
candleID string
subscriptionMsgChan chan subscriptionMsg
nextSubscriptionID atomic.Uint64
config CandleUpdatesConfig
log *logging.Logger
candleSource candleSource
candleID string
subscriptionMsgCh chan subscriptionMsg
nextSubscriptionID atomic.Uint64
config CandleUpdatesConfig
subs map[string]chan entities.Candle
mu *sync.RWMutex
lastCandle *entities.Candle
}

func NewCandleUpdates(ctx context.Context, log *logging.Logger, candleID string, candleSource candleSource,
config CandleUpdatesConfig,
) *CandleUpdates {
ces := &CandleUpdates{
log: log,
candleSource: candleSource,
candleID: candleID,
config: config,
subscriptionMsgChan: make(chan subscriptionMsg, config.CandleUpdatesStreamSubscriptionMsgBufferSize),
log: log,
candleSource: candleSource,
candleID: candleID,
config: config,
subscriptionMsgCh: make(chan subscriptionMsg, config.CandleUpdatesStreamSubscriptionMsgBufferSize),
subs: map[string]chan entities.Candle{},
mu: &sync.RWMutex{},
}

go ces.run(ctx)
Expand All @@ -70,130 +79,175 @@ func NewCandleUpdates(ctx context.Context, log *logging.Logger, candleID string,
}

func (s *CandleUpdates) run(ctx context.Context) {
subscriptions := map[string]chan entities.Candle{}
defer closeAllSubscriptions(subscriptions)
defer s.closeAllSubscriptions()

ticker := time.NewTicker(s.config.CandleUpdatesStreamInterval.Duration)
defer ticker.Stop()
var lastCandle *entities.Candle

errorGettingCandleUpdates := false
candleUpdatesFailed := false
updateCandles := func(now time.Time) *entities.Candle {
// no subscriptions, don't update candles and remove last candle.
if len(s.subs) == 0 {
return nil
}
candles, err := s.getCandleUpdates(ctx, now)
if err != nil {
if !candleUpdatesFailed {
s.log.Error("Failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err))
}
candleUpdatesFailed = true
return s.lastCandle // keep last candle we successfully obtained
}
if candleUpdatesFailed {
s.log.Info("Successfully got candles for candle id", logging.String("candle", s.candleID))
candleUpdatesFailed = false
}
if len(candles) == 0 {
return s.lastCandle // no new data, just keep the reference to the last candle we had
}
// send the new data to all subscribers.
_ = s.sendCandlesToSubscribers(candles, s.subs)
// find the most recent, non zero candle as the last candle we know exists
for i := len(candles) - 1; i >= 0; i-- {
last := candles[i]
if !last.High.IsZero() && !last.Low.IsZero() {
return &last
}
}
// if no last candle was found, the last candle remains whatever s.lastCandle was
return s.lastCandle
}
for {
select {
case <-ctx.Done():
return
case subscriptionMsg := <-s.subscriptionMsgChan:
subscriptions = s.handleSubscription(subscriptions, subscriptionMsg, lastCandle)
case subscriptionMsg := <-s.subscriptionMsgCh:
s.mu.Lock()
s.handleSubscription(subscriptionMsg)
s.mu.Unlock()
case now := <-ticker.C:
if len(subscriptions) == 0 {
lastCandle = nil
continue
}
candles, err := s.getCandleUpdates(ctx, lastCandle, now)
if err != nil {
if !errorGettingCandleUpdates {
s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err))
}
errorGettingCandleUpdates = true
continue
}
if errorGettingCandleUpdates {
s.log.Infof("Successfully got candles for candle", logging.String("candle", s.candleID))
errorGettingCandleUpdates = false
}
if len(candles) > 0 {
lastCandle = &candles[len(candles)-1]
}
subscriptions = s.sendCandlesToSubscribers(candles, subscriptions)
s.mu.RLock()
s.lastCandle = updateCandles(now)
s.mu.RUnlock()
}
}
}

func (s *CandleUpdates) handleSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle {
func (s *CandleUpdates) handleSubscription(subscription subscriptionMsg) {
if subscription.subscribe {
return s.addSubscription(subscriptions, subscription, lastCandle)
s.addSubscription(subscription)
return
}
return removeSubscription(subscriptions, subscription.id)
s.removeSubscription(subscription.id)
}

func (s *CandleUpdates) addSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle {
if lastCandle != nil {
if rm := s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out}); len(rm) == 0 {
// try to send the last candle data to the new subscription, if it fails, don't update the map
return subscriptions
}
func (s *CandleUpdates) addSubscription(subscription subscriptionMsg) {
if s.lastCandle == nil {
s.subs[subscription.id] = subscription.out
return
}
newSub := map[string]chan entities.Candle{
subscription.id: subscription.out,
}
if rm := s.sendCandlesToSubscribers([]entities.Candle{*s.lastCandle}, newSub); len(rm) == 0 {
s.subs[subscription.id] = subscription.out
}
subscriptions[subscription.id] = subscription.out
return subscriptions
}

func removeSubscription(subscriptions map[string]chan entities.Candle, subscriptionID string) map[string]chan entities.Candle {
if ch, ok := subscriptions[subscriptionID]; ok {
// first delete
delete(subscriptions, subscriptionID)
// then close
func (s *CandleUpdates) removeSubscription(id string) {
// no lock acquired, the map HAS to be locked when this function is called.
if ch, ok := s.subs[id]; ok {
close(ch)
delete(s.subs, id)
}
return subscriptions
}

func closeAllSubscriptions(subscribers map[string]chan entities.Candle) {
for _, subscriber := range subscribers {
func (s *CandleUpdates) closeAllSubscriptions() {
s.mu.Lock()
s.lastCandle = nil
for _, subscriber := range s.subs {
close(subscriber)
}
s.mu.Unlock()
}

// Subscribe returns a unique subscription id and channel on which updates will be sent.
func (s *CandleUpdates) Subscribe() (string, <-chan entities.Candle, error) {
out := make(chan entities.Candle, s.config.CandleUpdatesStreamBufferSize)

nextID := s.nextSubscriptionID.Add(1)
subscriptionID := fmt.Sprintf("%s-%d", s.candleID, nextID)
id := fmt.Sprintf("%s-%d", s.candleID, nextID)
var err error

if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 {
// immediately add, acquire the lock and add to the map.
s.mu.Lock()
defer s.mu.Unlock()
s.subs[id] = out
// we have some data to send, then try this immediately
if s.lastCandle != nil {
newSub := map[string]chan entities.Candle{
id: out,
}
// try to send the last candle to the new subscriber, this will remove the last sub
// and close the channel if the send fails.
if rm := s.sendCandlesToSubscribers([]entities.Candle{*s.lastCandle}, newSub); len(rm) != 0 {
// if rm is not empty, the new subscriber was removed, and the channel was closed.
return "", nil, ErrNewSubscriberNotReady
}
}
return id, out, nil
}
msg := subscriptionMsg{
subscribe: true,
id: subscriptionID,
id: id,
out: out,
}

err := s.sendSubscriptionMessage(msg)
err = s.sendSubscriptionMessage(msg)
if err != nil {
return "", nil, err
}

return subscriptionID, out, nil
return id, out, nil
}

func (s *CandleUpdates) Unsubscribe(subscriptionID string) error {
func (s *CandleUpdates) Unsubscribe(id string) error {
if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 {
// instantly unsubscribe, acquire the lock and remove from the map
s.mu.Lock()
if ch, ok := s.subs[id]; ok {
close(ch)
delete(s.subs, id)
}
s.mu.Unlock()
return nil
}
msg := subscriptionMsg{
subscribe: false,
id: subscriptionID,
id: id,
}

return s.sendSubscriptionMessage(msg)
}

func (s *CandleUpdates) sendSubscriptionMessage(msg subscriptionMsg) error {
if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 {
s.subscriptionMsgChan <- msg
} else {
select {
case s.subscriptionMsgChan <- msg:
default:
return fmt.Errorf("failed to send subscription message \"%s\", subscription message buffer is full, try again later", msg)
}
select {
case s.subscriptionMsgCh <- msg:
return nil
default:
return fmt.Errorf("failed to send subscription message \"%s\", subscription message buffer is full, try again later", msg)
}
return nil
}

func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle, now time.Time) ([]entities.Candle, error) {
func (s *CandleUpdates) getCandleUpdates(ctx context.Context, now time.Time) ([]entities.Candle, error) {
ctx, cancelFn := context.WithTimeout(ctx, s.config.CandlesFetchTimeout.Duration)
defer cancelFn()

var updates []entities.Candle
var err error
if lastCandle != nil {
start := lastCandle.PeriodStart
if s.lastCandle != nil {
start := s.lastCandle.PeriodStart
var candles []entities.Candle
candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, &now, entities.CursorPagination{})

Expand All @@ -202,36 +256,37 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
}

// allocate slice rather than doubling cap as we go.
updates = make([]entities.Candle, 0, len(candles))
updates = make([]entities.Candle, 0, len(candles)+1)
for _, candle := range candles {
// not before so either newer, or the same (last) candle should be returned.
if !candle.LastUpdateInPeriod.Before(lastCandle.LastUpdateInPeriod) || !candle.PeriodStart.Before(lastCandle.PeriodStart) {
// last candle or newer should be considered an update.
if !candle.PeriodStart.Before(s.lastCandle.PeriodStart) {
updates = append(updates, candle)
}
}
} else {
updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, entities.CursorPagination{})
return updates, nil
}
updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, entities.CursorPagination{})

if err != nil {
return nil, fmt.Errorf("getting candle updates:%w", err)
}
if err != nil {
return nil, fmt.Errorf("getting candle updates:%w", err)
}

return updates, nil
}

func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) map[string]chan entities.Candle {
ret := subscriptions
for subscriptionID, outCh := range subscriptions {
func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) []string {
rm := make([]string, 0, len(subscriptions))
for id, outCh := range subscriptions {
loop:
for _, candle := range candles {
select {
case outCh <- candle:
default:
ret = removeSubscription(ret, subscriptionID)
rm = append(rm, id)
s.removeSubscription(id)
break loop
}
}
}
return ret
return rm
}
Loading