From 19713e352f3a9c666ab59e5d4c0b041c7b217796 Mon Sep 17 00:00:00 2001 From: "Alan.sung" Date: Mon, 2 Oct 2023 14:26:18 +0800 Subject: [PATCH] first step: add websocket.go and demo account channel use new writing style --- pkg/exchange/okex/convert.go | 14 +--- pkg/exchange/okex/parse.go | 60 +++++++++---- pkg/exchange/okex/stream.go | 28 ++----- pkg/exchange/okex/websocket.go | 149 +++++++++++++++++++++++++++++++++ 4 files changed, 203 insertions(+), 48 deletions(-) create mode 100644 pkg/exchange/okex/websocket.go diff --git a/pkg/exchange/okex/convert.go b/pkg/exchange/okex/convert.go index 193ba233e1..896bfc6b4f 100644 --- a/pkg/exchange/okex/convert.go +++ b/pkg/exchange/okex/convert.go @@ -55,12 +55,6 @@ func toGlobalBalance(account *okexapi.Account) types.BalanceMap { return balanceMap } -type WebsocketSubscription struct { - Channel string `json:"channel"` - InstrumentID string `json:"instId,omitempty"` - InstrumentType string `json:"instType,omitempty"` -} - var CandleChannels = []string{ "candle1Y", "candle6M", "candle3M", "candle1M", @@ -70,19 +64,19 @@ var CandleChannels = []string{ "candle30m", "candle15m", "candle5m", "candle3m", "candle1m", } -func convertIntervalToCandle(interval types.Interval) string { +func convertIntervalToCandle(interval types.Interval) WebSocketChannelType { s := interval.String() switch s { case "1h", "2h", "4h", "6h", "12h", "1d", "3d": - return "candle" + strings.ToUpper(s) + return WebSocketChannelType("candle" + strings.ToUpper(s)) case "1m", "5m", "15m", "30m": - return "candle" + s + return WebSocketChannelType("candle" + s) } - return "candle" + s + return WebSocketChannelType("candle" + s) } func convertSubscription(s types.Subscription) (WebsocketSubscription, error) { diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 93a6b0c938..4550d158c2 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -15,23 +15,50 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -func parseWebSocketEvent(str []byte) (interface{}, error) { - v, err := fastjson.ParseBytes(str) +func parseWebSocketEvent(in []byte) (interface{}, error) { + var e WsEvent + + err := json.Unmarshal(in, &e) if err != nil { return nil, err } + switch { + case e.IsOp(): + return e.WebSocketOpEvent, nil + case e.IsPushDataEvent(): + // need unmarshal again because arg in both WebSocketOpEvent and WebSocketPushDataEvent + var pushDataEvent WebSocketPushDataEvent + err := json.Unmarshal(in, &pushDataEvent) + if err != nil { + return nil, err + } + channel := pushDataEvent.Arg.Channel - if v.Exists("event") { - return parseEvent(v) - } + // keep old style first, just demo account channel, account channel use new writing style. + v, err := fastjson.ParseBytes(in) + if err != nil { + return nil, err + } - if v.Exists("data") { - return parseData(v) + switch channel { + + case WsChannelTypeAccount: + return parseAccount(&pushDataEvent) + case WsChannelTypeOrders: + return parseData(v) + default: + if strings.HasPrefix(string(channel), "candle") { + return parseData(v) + } + if strings.HasPrefix(string(channel), "books") { + return parseData(v) + } + } } - - return nil, nil + return nil, fmt.Errorf("unhandled websocket event: %s", string(in)) } +// Will be replace with WebSocketOpEvent in the future type WebSocketEvent struct { Event string `json:"event"` Code string `json:"code,omitempty"` @@ -61,7 +88,7 @@ type BookEvent struct { Asks []BookEntry MillisecondTimestamp int64 Checksum int - channel string + channel WebSocketChannelType } func (data *BookEvent) BookTicker() types.BookTicker { @@ -299,12 +326,11 @@ func parseCandle(channel string, v *fastjson.Value) (*Candle, error) { }, nil } -func parseAccount(v *fastjson.Value) (*okexapi.Account, error) { - data := v.Get("data").MarshalTo(nil) +func parseAccount(v *WebSocketPushDataEvent) (*okexapi.Account, error) { + data := v.Data var accounts []okexapi.Account - err := json.Unmarshal(data, &accounts) - if err != nil { + if err := json.Unmarshal(data, &accounts); err != nil { return nil, err } @@ -334,14 +360,12 @@ func parseData(v *fastjson.Value) (interface{}, error) { switch channel { case "books5": data, err := parseBookData(v) - data.channel = channel + data.channel = WebSocketChannelType(channel) return data, err case "books": data, err := parseBookData(v) - data.channel = channel + data.channel = WebSocketChannelType(channel) return data, err - case "account": - return parseAccount(v) case "orders": return parseOrder(v) default: diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 7d7c7a77eb..20b86ac9e4 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -9,18 +9,6 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type WebsocketOp struct { - Op string `json:"op"` - Args interface{} `json:"args"` -} - -type WebsocketLogin struct { - Key string `json:"apiKey"` - Passphrase string `json:"passphrase"` - Timestamp string `json:"timestamp"` - Sign string `json:"sign"` -} - //go:generate callbackgen -type Stream -interface type Stream struct { types.StandardStream @@ -80,7 +68,7 @@ func (s *Stream) handleConnect() { log.Infof("subscribing channels: %+v", subs) err := s.Conn.WriteJSON(WebsocketOp{ - Op: "subscribe", + Op: WsOpTypeSubscribe, Args: subs, }) @@ -95,8 +83,8 @@ func (s *Stream) handleConnect() { payload := msTimestamp + "GET" + "/users/self/verify" sign := okexapi.Sign(payload, s.client.Secret) op := WebsocketOp{ - Op: "login", - Args: []WebsocketLogin{ + Op: WsOpTypeLogin, + Args: []WebsocketSubscription{ { Key: s.client.Key, Passphrase: s.client.Passphrase, @@ -116,17 +104,17 @@ func (s *Stream) handleConnect() { func (s *Stream) handleEvent(event WebSocketEvent) { switch event.Event { - case "login": + case string(WsOpTypeLogin): if event.Code == "0" { s.EmitAuth() var subs = []WebsocketSubscription{ - {Channel: "account"}, - {Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)}, + {Channel: WsChannelTypeAccount}, + {Channel: WsChannelTypeOrders, InstrumentType: okexapi.InstrumentTypeSpot}, } log.Infof("subscribing private channels: %+v", subs) err := s.Conn.WriteJSON(WebsocketOp{ - Op: "subscribe", + Op: WsOpTypeSubscribe, Args: subs, }) @@ -207,7 +195,7 @@ func (s *Stream) dispatchEvent(e interface{}) { case *BookEvent: // there's "books" for 400 depth and books5 for 5 depth - if et.channel != "books5" { + if et.channel != WsChannelTypeBooks5 { s.EmitBookEvent(*et) } s.EmitBookTickerUpdate(et.BookTicker()) diff --git a/pkg/exchange/okex/websocket.go b/pkg/exchange/okex/websocket.go new file mode 100644 index 0000000000..0f00c7f03e --- /dev/null +++ b/pkg/exchange/okex/websocket.go @@ -0,0 +1,149 @@ +package okex + +import ( + "encoding/json" + + "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +type WsEvent struct { + // "op" and "PushData" are exclusive. + *WebSocketOpEvent + *WebSocketPushDataEvent +} + +func (w *WsEvent) IsOp() bool { + return w.WebSocketOpEvent != nil && w.WebSocketPushDataEvent == nil +} + +func (w *WsEvent) IsPushDataEvent() bool { + return w.WebSocketOpEvent == nil && w.WebSocketPushDataEvent != nil +} + +type WsOpType string + +const ( + WsOpTypeLogin WsOpType = "login" + // subscribe and unsubscribe could be public or private, ex. private for chennel: orders + WsOpTypeSubscribe WsOpType = "subscribe" + WsOpTypeUnsubscribe WsOpType = "unsubscribe" + WsOpTypeOrder WsOpType = "order" + WsOpTypeBatchOrders WsOpType = "batch-orders" + WsOpTypeCancelOrder WsOpType = "cancel-order" + WsOpTypeBatchCancelOrders WsOpType = "batch-cancel-orders" + WsOpTypeAmendOrder WsOpType = "amend-order" + WsOpTypeBatchAmendOrders WsOpType = "batch-amend-orders" + WsOpTypeMassCancel WsOpType = "mass-cancel" + // below type exist only in response + WsOpTypeError WsOpType = "error" +) + +// Websocket Op +type WebsocketOp struct { + // id only applicable to private op, ex, order, batch-orders + Id string `json:"id,omitempty"` + Op WsOpType `json:"op"` + Args []WebsocketSubscription `json:"args"` +} + +// Websocket Op event +type WebSocketOpEvent struct { + // id only applicable to private op, ex, order, batch-orders + Id string `json:"id,omitempty"` + Op WsOpType `json:"op"` + Args []WebsocketSubscription `json:"args,omitempty"` + // Below is Websocket Response field + Event WsOpType `json:"event,omitempty"` + Code string `json:"code,omitempty"` + Message string `json:"msg,omitempty"` + Arg []WebsocketSubscription `json:"arg,omitempty"` +} + +// Websocket Response event for private channel +type WebSocketPrivateEvent struct { + Id string `json:"id"` + Op WsOpType `json:"op"` + Data json.RawMessage `json:"data"` + Code string `json:"code"` + Message string `json:"msg"` + InTime types.MillisecondTimestamp `json:"inTime,omitempty"` + OutTime types.MillisecondTimestamp `json:"outTime,omitempty"` +} + +// Websocket Push data event +type WebSocketPushDataEvent struct { + Arg WebsocketSubscription `json:"arg"` + Data json.RawMessage `json:"data"` + // action: snapshot, update, only applicable to : channel (books) + Action *string `json:"action,omitempty"` +} + +type WebSocketChannelType string + +const ( + // below channel need authenticated + WsChannelTypeAccount WebSocketChannelType = "account" + WsChannelTypePositions WebSocketChannelType = "positions" + WsChannelTypeBalanceAndPosition WebSocketChannelType = "balance_and_position" + WsChannelTypeLiquidationWarning WebSocketChannelType = "liquidation-warning" + WsChannelTypeAccountGreeks WebSocketChannelType = "account-greeks" + WsChannelTypeOrders WebSocketChannelType = "orders" + // below channel no need authenticated + WsChannelTypeTickers WebSocketChannelType = "tickers" + WsChannelTypeTrades WebSocketChannelType = "trades" + WsChannelTypeTradesAll WebSocketChannelType = "trades-all" + WsChannelTypeOptionTrades WebSocketChannelType = "option-trades" + WsChannelTypeBooks WebSocketChannelType = "books" + WsChannelTypeBooks5 WebSocketChannelType = "books5" + WsChannelTypeBooks50L2Tbt WebSocketChannelType = "books50-l2-tbt" + WsChannelTypeBooksL2Tbt WebSocketChannelType = "books-l2-tbt" + WsChannelTypeCandle3M WebSocketChannelType = "candle3M" + WsChannelTypeCandle1M WebSocketChannelType = "candle1M" + WsChannelTypeCandle1W WebSocketChannelType = "candle1W" + WsChannelTypeCandle1D WebSocketChannelType = "candle1D" + WsChannelTypeCandle2D WebSocketChannelType = "candle2D" + WsChannelTypeCandle3D WebSocketChannelType = "candle3D" + WsChannelTypeCandle5D WebSocketChannelType = "candle5D" + WsChannelTypeCandle12H WebSocketChannelType = "candle12H" + WsChannelTypeCandle6H WebSocketChannelType = "candle6H" + WsChannelTypeCandle4H WebSocketChannelType = "candle4H" + WsChannelTypeCandle2H WebSocketChannelType = "candle2H" + WsChannelTypeCandle1H WebSocketChannelType = "candle1H" + WsChannelTypeCandle30m WebSocketChannelType = "candle30m" + WsChannelTypeCandle15m WebSocketChannelType = "candle15m" + WsChannelTypeCandle5m WebSocketChannelType = "candle5m" + WsChannelTypeCandle3m WebSocketChannelType = "candle3m" + WsChannelTypeCandle1m WebSocketChannelType = "candle1m" + WsChannelTypeCandle1s WebSocketChannelType = "candle1s" +) + +func (w *WebSocketOpEvent) IsAuthenticated() bool { + return w.Op == WsOpTypeLogin && w.Code == "0" +} + +// login args +type WebsocketLogin struct { + Key string `json:"apiKey"` + Passphrase string `json:"passphrase"` + Timestamp string `json:"timestamp"` + Sign string `json:"sign"` +} + +// op args +type WebsocketSubscription struct { + Channel WebSocketChannelType `json:"channel"` + InstrumentID string `json:"instId,omitempty"` + InstrumentType okexapi.InstrumentType `json:"instType,omitempty"` + InstrumentFamily string `json:"instFamily,omitempty"` + Side string `json:"side,omitempty"` + TdMode string `json:"tdMode,omitempty"` + OrderType string `json:"ordType,omitempty"` + Quantity fixedpoint.Value `json:"sz,omitempty"` + // below for op login + Key string `json:"apiKey,omitempty"` + Passphrase string `json:"passphrase,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + Sign string `json:"sign,omitempty"` +}