Skip to content

Commit

Permalink
Merge pull request #231 from vegaprotocol/230-update-datanode-api-v2
Browse files Browse the repository at this point in the history
Updated api use to v2
  • Loading branch information
peterbarrow authored Oct 21, 2022
2 parents 59e4320 + 9fd2eae commit f10a902
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 122 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
- [222](https://github.com/vegaprotocol/vegatools/issue/222) - Updated market proposal text with renamed fields
- [225](https://github.com/vegaprotocol/vegatools/issue/225) - Add option to dump total event counts
- [227](https://github.com/vegaprotocol/vegatools/issue/227) - Allow configuration of the LP shape
- [230](https://github.com/vegaprotocol/vegatools/issue/230) - Update datanode api use to v2

### 🐛 Fixes
- [78](https://github.com/vegaprotocol/vegatools/pull/78) - Fix build with missing dependency
Expand Down
18 changes: 11 additions & 7 deletions delegationviewer/delegationviewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

api "code.vegaprotocol.io/vega/protos/data-node/api/v1"
api "code.vegaprotocol.io/vega/protos/data-node/api/v2"
proto "code.vegaprotocol.io/vega/protos/vega"

"github.com/gdamore/tcell/v2"
Expand Down Expand Up @@ -51,12 +51,16 @@ func initialiseValidatorNames() {
}

func getDelegationDetails(dataclient api.TradingDataServiceClient) error {
req := &api.GetNodesRequest{}
nodeResp, err := dataclient.GetNodes(context.Background(), req)
req := &api.ListNodesRequest{}
nodeResp, err := dataclient.ListNodes(context.Background(), req)
if err != nil {
return fmt.Errorf("Failed to get node details: %v", err)
return fmt.Errorf("failed to get node details: %v", err)
}

nodes = make([]*proto.Node, 0, len(nodeResp.Nodes.Edges))
for _, edgeNode := range nodeResp.Nodes.GetEdges() {
nodes = append(nodes, edgeNode.Node)
}
nodes = nodeResp.Nodes
sort.Slice(nodes, func(i, j int) bool { return nodes[i].Id < nodes[j].Id })
return nil
}
Expand Down Expand Up @@ -197,15 +201,15 @@ func Run(gRPCAddress string, delay uint) error {
connection, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
// Something went wrong
return fmt.Errorf("Failed to connect to the vega gRPC port: %s", err)
return fmt.Errorf("failed to connect to the vega gRPC port: %s", err)
}
defer connection.Close()
dataclient := api.NewTradingDataServiceClient(connection)

// Check we can get delegation information
err = getDelegationDetails(dataclient)
if err != nil {
return fmt.Errorf("Failed to get delegation details: %v", err)
return fmt.Errorf("failed to get delegation details: %v", err)
}

initialiseValidatorNames()
Expand Down
38 changes: 21 additions & 17 deletions liquiditycommitment/liquiditycommitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strconv"
"strings"

api "code.vegaprotocol.io/vega/protos/data-node/api/v1"
api "code.vegaprotocol.io/vega/protos/data-node/api/v2"
proto "code.vegaprotocol.io/vega/protos/vega"
eventspb "code.vegaprotocol.io/vega/protos/vega/events/v1"

Expand All @@ -29,51 +29,55 @@ var (
)

func getLiquidityProvisions(dataclient api.TradingDataServiceClient, marketID string) []*proto.LiquidityProvision {
lpReq := &api.LiquidityProvisionsRequest{Market: marketID}
lpReq := &api.ListLiquidityProvisionsRequest{MarketId: &marketID}

response, err := dataclient.LiquidityProvisions(context.Background(), lpReq)
response, err := dataclient.ListLiquidityProvisions(context.Background(), lpReq)
if err != nil {
log.Println(err)
}
// Only return the orders currently ACTIVE
lps := make([]*proto.LiquidityProvision, 0)
for _, lp := range response.LiquidityProvisions {
if lp.Status != proto.LiquidityProvision_STATUS_CANCELLED {
lps = append(lps, lp)
// Only return the LPs currently ACTIVE
lps := make([]*proto.LiquidityProvision, 0, len(response.LiquidityProvisions.Edges))
for _, lp := range response.LiquidityProvisions.Edges {
if lp.Node.Status != proto.LiquidityProvision_STATUS_CANCELLED {
lps = append(lps, lp.Node)
}
}
return lps
}

func getMarketData(dataclient api.TradingDataServiceClient, marketID string) *proto.MarketData {
marketDataRequest := &api.MarketDataByIDRequest{
marketDataRequest := &api.GetLatestMarketDataRequest{
MarketId: marketID,
}

marketDataResponse, err := dataclient.MarketDataByID(context.Background(), marketDataRequest)
marketDataResponse, err := dataclient.GetLatestMarketData(context.Background(), marketDataRequest)
if err != nil {
fmt.Println("Failed to get market data")
os.Exit(0)
return nil
}

if marketDataResponse == nil {
return nil
}
return marketDataResponse.MarketData
}

func getMarketToDisplay(dataclient api.TradingDataServiceClient, marketID string) *proto.Market {
marketsRequest := &api.MarketsRequest{}
marketsRequest := &api.ListMarketsRequest{}

marketsResponse, err := dataclient.Markets(context.Background(), marketsRequest)
marketsResponse, err := dataclient.ListMarkets(context.Background(), marketsRequest)
if err != nil {
return nil
}

var validMarkets []*proto.Market
// Check each market to see if we have at least one LP
for _, market := range marketsResponse.Markets {
lps := getLiquidityProvisions(dataclient, market.Id)
for _, market := range marketsResponse.Markets.Edges {
lps := getLiquidityProvisions(dataclient, market.Node.Id)
if len(lps) > 0 {
validMarkets = append(validMarkets, market)
mapMarketToLPs[market.Id] = lps
validMarkets = append(validMarkets, market.Node)
mapMarketToLPs[market.Node.Id] = lps
}
}

Expand Down Expand Up @@ -120,7 +124,7 @@ func getMarketToDisplay(dataclient api.TradingDataServiceClient, marketID string
// Correct index as we start with 0
index--

if index < 0 || index > len(marketsResponse.Markets)-1 {
if index < 0 || index > len(marketsResponse.Markets.Edges)-1 {
fmt.Println("Invalid market selected")
os.Exit(0)
}
Expand Down
64 changes: 38 additions & 26 deletions liquidityviewer/liquidityviewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strconv"
"strings"

api "code.vegaprotocol.io/vega/protos/data-node/api/v1"
api "code.vegaprotocol.io/vega/protos/data-node/api/v2"
proto "code.vegaprotocol.io/vega/protos/vega"
eventspb "code.vegaprotocol.io/vega/protos/vega/events/v1"

Expand All @@ -36,30 +36,35 @@ var (
)

func getLiquidityProvisions(dataclient api.TradingDataServiceClient, marketID string) []*proto.LiquidityProvision {
lpReq := &api.LiquidityProvisionsRequest{Market: marketID}
lpReq := &api.ListLiquidityProvisionsRequest{MarketId: &marketID}

response, err := dataclient.LiquidityProvisions(context.Background(), lpReq)
response, err := dataclient.ListLiquidityProvisions(context.Background(), lpReq)
if err != nil {
log.Println(err)
}
return response.LiquidityProvisions
lps := make([]*proto.LiquidityProvision, 0, len(response.LiquidityProvisions.Edges))

for _, lp := range response.LiquidityProvisions.Edges {
lps = append(lps, lp.Node)
}
return lps
}

func getMarketToDisplay(dataclient api.TradingDataServiceClient, marketID string) *proto.Market {
marketsRequest := &api.MarketsRequest{}
marketsRequest := &api.ListMarketsRequest{}

marketsResponse, err := dataclient.Markets(context.Background(), marketsRequest)
marketsResponse, err := dataclient.ListMarkets(context.Background(), marketsRequest)
if err != nil {
return nil
}

var validMarkets []*proto.Market
// Check each market to see if we have at least one LP
for _, market := range marketsResponse.Markets {
lps := getLiquidityProvisions(dataclient, market.Id)
for _, market := range marketsResponse.Markets.Edges {
lps := getLiquidityProvisions(dataclient, market.Node.Id)
if len(lps) > 0 {
validMarkets = append(validMarkets, market)
mapMarketToLPs[market.Id] = lps
validMarkets = append(validMarkets, market.Node)
mapMarketToLPs[market.Node.Id] = lps
}
}

Expand Down Expand Up @@ -106,7 +111,7 @@ func getMarketToDisplay(dataclient api.TradingDataServiceClient, marketID string
// Correct index as we start with 0
index--

if index < 0 || index > len(marketsResponse.Markets)-1 {
if index < 0 || index > len(marketsResponse.Markets.Edges)-1 {
fmt.Println("Invalid market selected")
os.Exit(0)
}
Expand Down Expand Up @@ -167,36 +172,38 @@ func getPartyToDisplay(dataclient api.TradingDataServiceClient, marketID, partyI
}

func getAccountDetails(dataclient api.TradingDataServiceClient, partyID, assetID string) {
lpReq := &api.PartyAccountsRequest{
PartyId: partyID,
Asset: assetID,
lpReq := &api.ListAccountsRequest{
Filter: &api.AccountFilter{
PartyIds: strings.Fields(partyID),
AssetId: assetID,
},
}

response, err := dataclient.PartyAccounts(context.Background(), lpReq)
response, err := dataclient.ListAccounts(context.Background(), lpReq)
if err != nil {
log.Fatalln(err)
return
}

for _, acct := range response.Accounts {
for _, acct := range response.Accounts.Edges {
log.Println(acct)
switch acct.Type {
switch acct.Account.Type {
case proto.AccountType_ACCOUNT_TYPE_BOND:
acctBond = acct.Balance
acctBond = acct.Account.Balance
case proto.AccountType_ACCOUNT_TYPE_MARGIN:
acctMargin = acct.Balance
acctMargin = acct.Account.Balance
case proto.AccountType_ACCOUNT_TYPE_GENERAL:
acctGeneral = acct.Balance
acctGeneral = acct.Account.Balance
}
}
}

func subscribePositions(dataclient api.TradingDataServiceClient, marketID string, userKey string) {
req := &api.PositionsSubscribeRequest{
MarketId: marketID,
PartyId: userKey,
req := &api.ObservePositionsRequest{
MarketId: &marketID,
PartyId: &userKey,
}
stream, err := dataclient.PositionsSubscribe(context.Background(), req)
stream, err := dataclient.ObservePositions(context.Background(), req)
if err != nil {
log.Panicln("Failed to subscribe to positions: ", err)
}
Expand All @@ -205,7 +212,7 @@ func subscribePositions(dataclient api.TradingDataServiceClient, marketID string
go processPositions(stream)
}

func processPositions(stream api.TradingDataService_PositionsSubscribeClient) {
func processPositions(stream api.TradingDataService_ObservePositionsClient) {
for {
o, err := stream.Recv()
if err == io.EOF {
Expand All @@ -216,7 +223,12 @@ func processPositions(stream api.TradingDataService_PositionsSubscribeClient) {
log.Panicln("positions: stream closed err:", err)
break
}
position = o.GetPosition()
snapshots := o.GetSnapshot()
if snapshots != nil {
if len(snapshots.Positions) > 0 {
position = snapshots.Positions[len(snapshots.Positions)-1]
}
}
}
}

Expand Down
27 changes: 16 additions & 11 deletions marketdepthviewer/deltas.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"log"
"time"

api "code.vegaprotocol.io/vega/protos/data-node/api/v1"
api "code.vegaprotocol.io/vega/protos/data-node/api/v2"
proto "code.vegaprotocol.io/vega/protos/vega"
)

func (m *mdv) getMarketDepthSnapshot(dataclient api.TradingDataServiceClient) error {
req := &api.MarketDepthRequest{
req := &api.GetLatestMarketDepthRequest{
MarketId: m.market.Id,
}
resp, err := dataclient.MarketDepth(context.Background(), req)
resp, err := dataclient.GetLatestMarketDepth(context.Background(), req)
if err != nil {
return err
}
Expand All @@ -33,10 +33,10 @@ func (m *mdv) getMarketDepthSnapshot(dataclient api.TradingDataServiceClient) er
}

func (m *mdv) subscribeToMarketDepthUpdates(dataclient api.TradingDataServiceClient) error {
req := &api.MarketDepthUpdatesSubscribeRequest{
MarketId: m.market.Id,
req := &api.ObserveMarketsDepthUpdatesRequest{
MarketIds: []string{m.market.Id},
}
stream, err := dataclient.MarketDepthUpdatesSubscribe(context.Background(), req)
stream, err := dataclient.ObserveMarketsDepthUpdates(context.Background(), req)
if err != nil {
return fmt.Errorf("failed to subscribe to trades: %w", err)
}
Expand All @@ -50,7 +50,7 @@ func (m *mdv) subscribeToMarketDepthUpdates(dataclient api.TradingDataServiceCli
return nil
}

func (m *mdv) processMarketDepthUpdates(stream api.TradingDataService_MarketDepthUpdatesSubscribeClient) {
func (m *mdv) processMarketDepthUpdates(stream api.TradingDataService_ObserveMarketsDepthUpdatesClient) {
for {
resp, err := stream.Recv()
if err == io.EOF {
Expand All @@ -62,18 +62,23 @@ func (m *mdv) processMarketDepthUpdates(stream api.TradingDataService_MarketDept
break
}

if len(resp.Update.Buy) == 0 && len(resp.Update.Sell) == 0 {
if len(resp.Update) == 0 {
continue
}

if m.book.seqNum == 0 {
continue
}

if resp.Update.PreviousSequenceNumber != m.book.seqNum {
continue
for _, md := range resp.Update {
if md.PreviousSequenceNumber != m.book.seqNum {
continue
}
if len(md.Buy) == 0 && len(md.Sell) == 0 {
continue
}
m.updateMarketDepthUpdates(md)
}
m.updateMarketDepthUpdates(resp.Update)
}
}

Expand Down
Loading

0 comments on commit f10a902

Please sign in to comment.