Skip to content

Commit

Permalink
Fix solana subscription (#1816)
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes authored Dec 3, 2024
1 parent 1b12d07 commit 100ba7e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 71 deletions.
151 changes: 90 additions & 61 deletions cookbook/specs/solana.json
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,91 @@
"stateful": 0
},
"extra_compute_units": 0
}
],
"headers": [],
"inheritance_apis": [],
"parse_directives": [
{
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}",
"function_tag": "GET_BLOCKNUM",
"result_parsing": {
"parser_arg": [
"0",
"context",
"slot"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getLatestBlockhash"
},
{
"function_tag": "GET_BLOCK_BY_NUM",
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getBlock\",\"params\":[%d,{\"transactionDetails\":\"none\",\"rewards\":false}],\"id\":1}",
"result_parsing": {
"parser_arg": [
"0",
"blockhash"
],
"parser_func": "PARSE_CANONICAL",
"encoding": "base64"
},
"api_name": "getBlock"
}
],
"verifications": [
{
"name": "version",
"parse_directive": {
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getVersion\",\"params\":[],\"id\":1}",
"function_tag": "VERIFICATION",
"result_parsing": {
"parser_arg": [
"0",
"solana-core"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getVersion"
},
"values": [
{
"expected_value": "*"
}
]
},
{
"name": "tokens-owner-indexed",
"parse_directive": {
"function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}",
"function_tag": "VERIFICATION",
"result_parsing": {
"parser_arg": [
"0",
"value"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getTokenAccountsByOwner"
},
"values": [
{
"expected_value": "*",
"severity": "Warning"
}
]
}
]
},
{
"enabled": true,
"collection_data": {
"api_interface": "jsonrpc",
"internal_path": "/ws",
"type": "POST",
"add_on": ""
},
"apis": [
{
"name": "accountSubscribe",
"block_parsing": {
Expand Down Expand Up @@ -1294,30 +1378,17 @@
"inheritance_apis": [],
"parse_directives": [
{
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}",
"function_tag": "GET_BLOCKNUM",
"result_parsing": {
"parser_arg": [
"0",
"context",
"slot"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getLatestBlockhash"
"parser_func": "DEFAULT"
}
},
{
"function_template": "%d",
"function_tag": "GET_BLOCK_BY_NUM",
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getBlock\",\"params\":[%d,{\"transactionDetails\":\"none\",\"rewards\":false}],\"id\":1}",
"result_parsing": {
"parser_arg": [
"0",
"blockhash"
],
"parser_func": "PARSE_CANONICAL",
"encoding": "base64"
},
"api_name": "getBlock"
"parser_func": "DEFAULT"
}
},
{
"function_tag": "SUBSCRIBE",
Expand Down Expand Up @@ -1401,49 +1472,7 @@
"api_name": "voteUnsubscribe"
}
],
"verifications": [
{
"name": "version",
"parse_directive": {
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getVersion\",\"params\":[],\"id\":1}",
"function_tag": "VERIFICATION",
"result_parsing": {
"parser_arg": [
"0",
"solana-core"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getVersion"
},
"values": [
{
"expected_value": "*"
}
]
},
{
"name": "tokens-owner-indexed",
"parse_directive": {
"function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}",
"function_tag": "VERIFICATION",
"result_parsing": {
"parser_arg": [
"0",
"value"
],
"parser_func": "PARSE_CANONICAL"
},
"api_name": "getTokenAccountsByOwner"
},
"values": [
{
"expected_value": "*",
"severity": "Warning"
}
]
}
]
"verifications": []
}
]
},
Expand Down
24 changes: 21 additions & 3 deletions protocol/chainlib/chainproxy/rpcclient/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,16 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool {
h.handleSubscriptionResultTendermint(msg)
return true
case msg.isEthereumNotification():
if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) {
h.handleSubscriptionResultEthereum(msg)
return true
} else if strings.HasSuffix(msg.Method, solanaNotificationMethodSuffix) {
h.handleSubscriptionResultSolana(msg)
return true
}
return false
case msg.isStarkNetPathfinderNotification():
if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) {
h.handleSubscriptionResultStarkNetPathfinder(msg)
return true
}
Expand All @@ -258,7 +261,7 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool {
}

func (h *handler) handleSubscriptionResultStarkNetPathfinder(msg *JsonrpcMessage) {
var result starkNetPathfinderSubscriptionResult
var result integerIdSubscriptionResult
if err := json.Unmarshal(msg.Result, &result); err != nil {
utils.LavaFormatTrace("Dropping invalid starknet pathfinder subscription message",
utils.LogAttr("err", err),
Expand Down Expand Up @@ -290,6 +293,21 @@ func (h *handler) handleSubscriptionResultEthereum(msg *JsonrpcMessage) {
}
}

func (h *handler) handleSubscriptionResultSolana(msg *JsonrpcMessage) {
var result integerIdSubscriptionResult
if err := json.Unmarshal(msg.Params, &result); err != nil {
utils.LavaFormatTrace("Dropping invalid solana subscription message",
utils.LogAttr("err", err),
utils.LogAttr("params", string(msg.Params)),
)
h.log.Debug("Dropping invalid subscription message")
return
}
if h.clientSubs[strconv.Itoa(result.ID)] != nil {
h.clientSubs[strconv.Itoa(result.ID)].deliver(msg)
}
}

func (h *handler) handleSubscriptionResultTendermint(msg *JsonrpcMessage) {
var result tendermintSubscriptionResult
if err := json.Unmarshal(msg.Result, &result); err != nil {
Expand Down
13 changes: 7 additions & 6 deletions protocol/chainlib/chainproxy/rpcclient/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import (
)

const (
Vsn = "2.0"
serviceMethodSeparator = "_"
subscribeMethodSuffix = "_subscribe"
unsubscribeMethodSuffix = "_unsubscribe"
notificationMethodSuffix = "_subscription"
Vsn = "2.0"
serviceMethodSeparator = "_"
subscribeMethodSuffix = "_subscribe"
unsubscribeMethodSuffix = "_unsubscribe"
ethereumNotificationMethodSuffix = "_subscription"
solanaNotificationMethodSuffix = "Notification"

defaultWriteTimeout = 10 * time.Second // used if context has no deadline
)
Expand All @@ -49,7 +50,7 @@ type ethereumSubscriptionResult struct {
Result json.RawMessage `json:"result,omitempty"`
}

type starkNetPathfinderSubscriptionResult struct {
type integerIdSubscriptionResult struct {
ID int `json:"subscription"`
Result json.RawMessage `json:"result,omitempty"`
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/chainproxy/rpcclient/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
ctx := context.Background()
return n.h.conn.writeJSON(ctx, &JsonrpcMessage{
Version: Vsn,
Method: n.namespace + notificationMethodSuffix,
Method: n.namespace + ethereumNotificationMethodSuffix,
Params: params,
})
}
Expand Down

0 comments on commit 100ba7e

Please sign in to comment.