Skip to content

Commit

Permalink
first step: add websocket.go and demo account channel use new writing…
Browse files Browse the repository at this point in the history
… style
  • Loading branch information
Alan.sung authored and Alan.sung committed Oct 2, 2023
1 parent 51ac5dd commit 19713e3
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 48 deletions.
14 changes: 4 additions & 10 deletions pkg/exchange/okex/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ func toGlobalBalance(account *okexapi.Account) types.BalanceMap {
return balanceMap
}

type WebsocketSubscription struct {
Channel string `json:"channel"`
InstrumentID string `json:"instId,omitempty"`
InstrumentType string `json:"instType,omitempty"`
}

var CandleChannels = []string{
"candle1Y",
"candle6M", "candle3M", "candle1M",
Expand All @@ -70,19 +64,19 @@ var CandleChannels = []string{
"candle30m", "candle15m", "candle5m", "candle3m", "candle1m",
}

func convertIntervalToCandle(interval types.Interval) string {
func convertIntervalToCandle(interval types.Interval) WebSocketChannelType {
s := interval.String()
switch s {

case "1h", "2h", "4h", "6h", "12h", "1d", "3d":
return "candle" + strings.ToUpper(s)
return WebSocketChannelType("candle" + strings.ToUpper(s))

case "1m", "5m", "15m", "30m":
return "candle" + s
return WebSocketChannelType("candle" + s)

}

return "candle" + s
return WebSocketChannelType("candle" + s)
}

func convertSubscription(s types.Subscription) (WebsocketSubscription, error) {
Expand Down
60 changes: 42 additions & 18 deletions pkg/exchange/okex/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,50 @@ import (
"github.com/c9s/bbgo/pkg/types"
)

func parseWebSocketEvent(str []byte) (interface{}, error) {
v, err := fastjson.ParseBytes(str)
func parseWebSocketEvent(in []byte) (interface{}, error) {
var e WsEvent

err := json.Unmarshal(in, &e)
if err != nil {
return nil, err
}
switch {
case e.IsOp():
return e.WebSocketOpEvent, nil
case e.IsPushDataEvent():
// need unmarshal again because arg in both WebSocketOpEvent and WebSocketPushDataEvent
var pushDataEvent WebSocketPushDataEvent
err := json.Unmarshal(in, &pushDataEvent)
if err != nil {
return nil, err
}
channel := pushDataEvent.Arg.Channel

if v.Exists("event") {
return parseEvent(v)
}
// keep old style first, just demo account channel, account channel use new writing style.
v, err := fastjson.ParseBytes(in)
if err != nil {
return nil, err
}

if v.Exists("data") {
return parseData(v)
switch channel {

case WsChannelTypeAccount:
return parseAccount(&pushDataEvent)
case WsChannelTypeOrders:
return parseData(v)
default:
if strings.HasPrefix(string(channel), "candle") {
return parseData(v)
}
if strings.HasPrefix(string(channel), "books") {
return parseData(v)
}
}
}

return nil, nil
return nil, fmt.Errorf("unhandled websocket event: %s", string(in))
}

// Will be replace with WebSocketOpEvent in the future
type WebSocketEvent struct {
Event string `json:"event"`
Code string `json:"code,omitempty"`
Expand Down Expand Up @@ -61,7 +88,7 @@ type BookEvent struct {
Asks []BookEntry
MillisecondTimestamp int64
Checksum int
channel string
channel WebSocketChannelType
}

func (data *BookEvent) BookTicker() types.BookTicker {
Expand Down Expand Up @@ -299,12 +326,11 @@ func parseCandle(channel string, v *fastjson.Value) (*Candle, error) {
}, nil
}

func parseAccount(v *fastjson.Value) (*okexapi.Account, error) {
data := v.Get("data").MarshalTo(nil)
func parseAccount(v *WebSocketPushDataEvent) (*okexapi.Account, error) {
data := v.Data

var accounts []okexapi.Account
err := json.Unmarshal(data, &accounts)
if err != nil {
if err := json.Unmarshal(data, &accounts); err != nil {
return nil, err
}

Expand Down Expand Up @@ -334,14 +360,12 @@ func parseData(v *fastjson.Value) (interface{}, error) {
switch channel {
case "books5":
data, err := parseBookData(v)
data.channel = channel
data.channel = WebSocketChannelType(channel)
return data, err
case "books":
data, err := parseBookData(v)
data.channel = channel
data.channel = WebSocketChannelType(channel)
return data, err
case "account":
return parseAccount(v)
case "orders":
return parseOrder(v)
default:
Expand Down
28 changes: 8 additions & 20 deletions pkg/exchange/okex/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,6 @@ import (
"github.com/c9s/bbgo/pkg/types"
)

type WebsocketOp struct {
Op string `json:"op"`
Args interface{} `json:"args"`
}

type WebsocketLogin struct {
Key string `json:"apiKey"`
Passphrase string `json:"passphrase"`
Timestamp string `json:"timestamp"`
Sign string `json:"sign"`
}

//go:generate callbackgen -type Stream -interface
type Stream struct {
types.StandardStream
Expand Down Expand Up @@ -80,7 +68,7 @@ func (s *Stream) handleConnect() {

log.Infof("subscribing channels: %+v", subs)
err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Op: WsOpTypeSubscribe,
Args: subs,
})

Expand All @@ -95,8 +83,8 @@ func (s *Stream) handleConnect() {
payload := msTimestamp + "GET" + "/users/self/verify"
sign := okexapi.Sign(payload, s.client.Secret)
op := WebsocketOp{
Op: "login",
Args: []WebsocketLogin{
Op: WsOpTypeLogin,
Args: []WebsocketSubscription{
{
Key: s.client.Key,
Passphrase: s.client.Passphrase,
Expand All @@ -116,17 +104,17 @@ func (s *Stream) handleConnect() {

func (s *Stream) handleEvent(event WebSocketEvent) {
switch event.Event {
case "login":
case string(WsOpTypeLogin):
if event.Code == "0" {
s.EmitAuth()
var subs = []WebsocketSubscription{
{Channel: "account"},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},
{Channel: WsChannelTypeAccount},
{Channel: WsChannelTypeOrders, InstrumentType: okexapi.InstrumentTypeSpot},
}

log.Infof("subscribing private channels: %+v", subs)
err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Op: WsOpTypeSubscribe,
Args: subs,
})

Expand Down Expand Up @@ -207,7 +195,7 @@ func (s *Stream) dispatchEvent(e interface{}) {

case *BookEvent:
// there's "books" for 400 depth and books5 for 5 depth
if et.channel != "books5" {
if et.channel != WsChannelTypeBooks5 {
s.EmitBookEvent(*et)
}
s.EmitBookTickerUpdate(et.BookTicker())
Expand Down
149 changes: 149 additions & 0 deletions pkg/exchange/okex/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package okex

import (
"encoding/json"

"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)

type WsEvent struct {
// "op" and "PushData" are exclusive.
*WebSocketOpEvent
*WebSocketPushDataEvent
}

func (w *WsEvent) IsOp() bool {
return w.WebSocketOpEvent != nil && w.WebSocketPushDataEvent == nil
}

func (w *WsEvent) IsPushDataEvent() bool {
return w.WebSocketOpEvent == nil && w.WebSocketPushDataEvent != nil
}

type WsOpType string

const (
WsOpTypeLogin WsOpType = "login"
// subscribe and unsubscribe could be public or private, ex. private for chennel: orders
WsOpTypeSubscribe WsOpType = "subscribe"
WsOpTypeUnsubscribe WsOpType = "unsubscribe"
WsOpTypeOrder WsOpType = "order"
WsOpTypeBatchOrders WsOpType = "batch-orders"
WsOpTypeCancelOrder WsOpType = "cancel-order"
WsOpTypeBatchCancelOrders WsOpType = "batch-cancel-orders"
WsOpTypeAmendOrder WsOpType = "amend-order"
WsOpTypeBatchAmendOrders WsOpType = "batch-amend-orders"
WsOpTypeMassCancel WsOpType = "mass-cancel"
// below type exist only in response
WsOpTypeError WsOpType = "error"
)

// Websocket Op
type WebsocketOp struct {
// id only applicable to private op, ex, order, batch-orders
Id string `json:"id,omitempty"`
Op WsOpType `json:"op"`
Args []WebsocketSubscription `json:"args"`
}

// Websocket Op event
type WebSocketOpEvent struct {
// id only applicable to private op, ex, order, batch-orders
Id string `json:"id,omitempty"`
Op WsOpType `json:"op"`
Args []WebsocketSubscription `json:"args,omitempty"`
// Below is Websocket Response field
Event WsOpType `json:"event,omitempty"`
Code string `json:"code,omitempty"`
Message string `json:"msg,omitempty"`
Arg []WebsocketSubscription `json:"arg,omitempty"`
}

// Websocket Response event for private channel
type WebSocketPrivateEvent struct {
Id string `json:"id"`
Op WsOpType `json:"op"`
Data json.RawMessage `json:"data"`
Code string `json:"code"`
Message string `json:"msg"`
InTime types.MillisecondTimestamp `json:"inTime,omitempty"`
OutTime types.MillisecondTimestamp `json:"outTime,omitempty"`
}

// Websocket Push data event
type WebSocketPushDataEvent struct {
Arg WebsocketSubscription `json:"arg"`
Data json.RawMessage `json:"data"`
// action: snapshot, update, only applicable to : channel (books)
Action *string `json:"action,omitempty"`
}

type WebSocketChannelType string

const (
// below channel need authenticated
WsChannelTypeAccount WebSocketChannelType = "account"
WsChannelTypePositions WebSocketChannelType = "positions"
WsChannelTypeBalanceAndPosition WebSocketChannelType = "balance_and_position"
WsChannelTypeLiquidationWarning WebSocketChannelType = "liquidation-warning"
WsChannelTypeAccountGreeks WebSocketChannelType = "account-greeks"
WsChannelTypeOrders WebSocketChannelType = "orders"
// below channel no need authenticated
WsChannelTypeTickers WebSocketChannelType = "tickers"
WsChannelTypeTrades WebSocketChannelType = "trades"
WsChannelTypeTradesAll WebSocketChannelType = "trades-all"
WsChannelTypeOptionTrades WebSocketChannelType = "option-trades"
WsChannelTypeBooks WebSocketChannelType = "books"
WsChannelTypeBooks5 WebSocketChannelType = "books5"
WsChannelTypeBooks50L2Tbt WebSocketChannelType = "books50-l2-tbt"
WsChannelTypeBooksL2Tbt WebSocketChannelType = "books-l2-tbt"
WsChannelTypeCandle3M WebSocketChannelType = "candle3M"
WsChannelTypeCandle1M WebSocketChannelType = "candle1M"
WsChannelTypeCandle1W WebSocketChannelType = "candle1W"
WsChannelTypeCandle1D WebSocketChannelType = "candle1D"
WsChannelTypeCandle2D WebSocketChannelType = "candle2D"
WsChannelTypeCandle3D WebSocketChannelType = "candle3D"
WsChannelTypeCandle5D WebSocketChannelType = "candle5D"
WsChannelTypeCandle12H WebSocketChannelType = "candle12H"
WsChannelTypeCandle6H WebSocketChannelType = "candle6H"
WsChannelTypeCandle4H WebSocketChannelType = "candle4H"
WsChannelTypeCandle2H WebSocketChannelType = "candle2H"
WsChannelTypeCandle1H WebSocketChannelType = "candle1H"
WsChannelTypeCandle30m WebSocketChannelType = "candle30m"
WsChannelTypeCandle15m WebSocketChannelType = "candle15m"
WsChannelTypeCandle5m WebSocketChannelType = "candle5m"
WsChannelTypeCandle3m WebSocketChannelType = "candle3m"
WsChannelTypeCandle1m WebSocketChannelType = "candle1m"
WsChannelTypeCandle1s WebSocketChannelType = "candle1s"
)

func (w *WebSocketOpEvent) IsAuthenticated() bool {
return w.Op == WsOpTypeLogin && w.Code == "0"
}

// login args
type WebsocketLogin struct {
Key string `json:"apiKey"`
Passphrase string `json:"passphrase"`
Timestamp string `json:"timestamp"`
Sign string `json:"sign"`
}

// op args
type WebsocketSubscription struct {
Channel WebSocketChannelType `json:"channel"`
InstrumentID string `json:"instId,omitempty"`
InstrumentType okexapi.InstrumentType `json:"instType,omitempty"`
InstrumentFamily string `json:"instFamily,omitempty"`
Side string `json:"side,omitempty"`
TdMode string `json:"tdMode,omitempty"`
OrderType string `json:"ordType,omitempty"`
Quantity fixedpoint.Value `json:"sz,omitempty"`
// below for op login
Key string `json:"apiKey,omitempty"`
Passphrase string `json:"passphrase,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
Sign string `json:"sign,omitempty"`
}

0 comments on commit 19713e3

Please sign in to comment.