Skip to content

Commit

Permalink
Merge pull request #1863 from c9s/narumi/sentinel
Browse files Browse the repository at this point in the history
STRATEGY: Detect abnormal volume increases using isolation forest
  • Loading branch information
narumiruna authored Dec 25, 2024
2 parents 40bc0ce + c8d78a7 commit b07c26a
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 0 deletions.
19 changes: 19 additions & 0 deletions config/sentinel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
sessions:
max:
exchange: &exchange max
envVarPrefix: max

persistence:
json:
directory: var/data
redis:
host: 127.0.0.1
port: 6379
db: 0

exchangeStrategies:
- on: *exchange
sentinel:
symbol: BTCUSDT
interval: 1m
scoreThreshold: 0.6
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/narumiruna/go-iforest v0.2.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ github.com/muesli/kmeans v0.3.0/go.mod h1:eNyybq0tX9/iBEP6EMU4Y7dpmGK0uEhODdZpnG
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/narumiruna/go-iforest v0.2.2 h1:48GGRVLSlgtV3vGr+eedXODn5RT3WvYroqpMNEoQvkk=
github.com/narumiruna/go-iforest v0.2.2/go.mod h1:2pumoiqKf0Lr+KvLECMC8uNrbRkxtSvUwMJC/6AW7DM=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/strategy/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/c9s/bbgo/pkg/strategy/rsmaker"
_ "github.com/c9s/bbgo/pkg/strategy/schedule"
_ "github.com/c9s/bbgo/pkg/strategy/scmaker"
_ "github.com/c9s/bbgo/pkg/strategy/sentinel"
_ "github.com/c9s/bbgo/pkg/strategy/supertrend"
_ "github.com/c9s/bbgo/pkg/strategy/support"
_ "github.com/c9s/bbgo/pkg/strategy/swing"
Expand Down
18 changes: 18 additions & 0 deletions pkg/datatype/floats/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ func (s Slice) Mean() (mean float64) {
return s.Sum() / float64(length)
}

func (s Slice) Var() float64 {
length := len(s)
if length == 0 {
panic("zero length slice")
}

mean := s.Mean()
variance := 0.0
for _, v := range s {
variance += (v - mean) * (v - mean)
}
return variance / float64(length)
}

func (s Slice) Std() float64 {
return math.Sqrt(s.Var())
}

func (s Slice) Tail(size int) Slice {
length := len(s)
if length <= size {
Expand Down
161 changes: 161 additions & 0 deletions pkg/strategy/sentinel/strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package sentinel

import (
"context"
"time"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
"github.com/narumiruna/go-iforest/pkg/iforest"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

const ID = "sentinel"

func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}

type Strategy struct {
Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
ScoreThreshold float64 `json:"scoreThreshold"`
KLineLimit int `json:"klineLimit"`
Window int `json:"window"`

IsolationForest *iforest.IsolationForest `json:"isolationForest"`
NotificationInterval time.Duration `json:"notificationInterval"`
RetrainingInterval time.Duration `json:"retrainingInterval"`

notificationRateLimiter *rate.Limiter
retrainingRateLimiter *rate.Limiter
}

func (s *Strategy) ID() string {
return ID
}

func (s *Strategy) Defaults() error {
if s.ScoreThreshold == 0 {
s.ScoreThreshold = 0.6
}

if s.KLineLimit == 0 {
s.KLineLimit = 1440
}

if s.Window == 0 {
s.Window = 60
}

if s.NotificationInterval == 0 {
s.NotificationInterval = 10 * time.Minute
}

if s.RetrainingInterval == 0 {
s.RetrainingInterval = 1 * time.Hour
}

s.notificationRateLimiter = rate.NewLimiter(rate.Every(s.NotificationInterval), 1)
s.retrainingRateLimiter = rate.NewLimiter(rate.Every(s.RetrainingInterval), 1)
return nil
}

func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}

func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
if !s.isMarketAvailable(session, s.Symbol) {
return
}

klines, err := s.queryKLines(ctx, session)
if err != nil {
log.Errorf("Unable to query klines: %v", err)
return
}

volumes := s.extractVolumes(klines)
samples := s.generateSamples(volumes)

if s.shouldSkipIsolationForest(volumes, samples) {
s.logSkipIsolationForest(samples, volumes, kline)
return
}

s.fitIsolationForest(samples)
scores := s.IsolationForest.Score(samples)
s.notifyOnIsolationForestScore(scores, kline)
}))
return nil
}

func (s *Strategy) isMarketAvailable(session *bbgo.ExchangeSession, symbol string) bool {
_, ok := session.Market(symbol)
return ok
}

func (s *Strategy) queryKLines(ctx context.Context, session *bbgo.ExchangeSession) ([]types.KLine, error) {
endTime := time.Now()
options := types.KLineQueryOptions{
Limit: s.KLineLimit,
EndTime: &endTime,
}
return session.Exchange.QueryKLines(ctx, s.Symbol, s.Interval, options)
}

func (s *Strategy) extractVolumes(klines []types.KLine) floats.Slice {
volumes := floats.Slice{}
for _, kline := range klines {
volumes.Push(kline.Volume.Float64())
}
return volumes
}

func (s *Strategy) generateSamples(volumes floats.Slice) [][]float64 {
samples := make([][]float64, 0, len(volumes))
for i := range volumes {
if i < s.Window {
continue
}

subset := volumes.Tail(s.Window)

mean := subset.Mean()
std := subset.Std()
samples = append(samples, []float64{mean, std})
}
return samples
}

func (s *Strategy) shouldSkipIsolationForest(volumes floats.Slice, samples [][]float64) bool {
volumeMean := volumes.Mean()
lastMovingMean := samples[len(samples)-1][0]
return lastMovingMean < volumeMean
}

func (s *Strategy) logSkipIsolationForest(samples [][]float64, volumes floats.Slice, kline types.KLine) {
log.Infof("Skipping isolation forest calculation for symbol: %s, last moving mean: %f, average volume: %f, kline: %s", s.Symbol, samples[len(samples)-1][0], volumes.Mean(), kline.String())
}

func (s *Strategy) fitIsolationForest(samples [][]float64) {
if s.retrainingRateLimiter.Allow() {
s.IsolationForest = iforest.New()
s.IsolationForest.Fit(samples)
log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees)
}
}

func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) {
lastScore := scores[len(scores)-1]
log.Warnf("Symbol: %s, isolation forest score: %f, threshold: %f, kline: %s", s.Symbol, lastScore, s.ScoreThreshold, kline.String())
if lastScore > s.ScoreThreshold {
if s.notificationRateLimiter.Allow() {
bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore)
}
}
}

0 comments on commit b07c26a

Please sign in to comment.