Skip to content

Commit

Permalink
apiName and chainID added to StatusCode warnings (#839)
Browse files Browse the repository at this point in the history
* apiName and chainID added to StatusCode warnings

* api and chainID prints simplified

* InvalidStatusCodeError special error added

* fix error handling

* fixing race in RS

---------

Co-authored-by: Ran Mishael <[email protected]>
  • Loading branch information
candostyavuz and ranlavanet authored Oct 4, 2023
1 parent c1d1092 commit 0c6c09e
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func IdFromRawMessage(rawID json.RawMessage) (jsonrpcId, error) {
var idInterface interface{}
err := json.Unmarshal(rawID, &idInterface)
if err != nil {
return nil, utils.LavaFormatError("failed to unmarshal id from response", err, utils.Attribute{Key: "id", Value: rawID})
return nil, utils.LavaFormatError("failed to unmarshal id from response", err, utils.Attribute{Key: "id", Value: string(rawID)})
}

switch id := idInterface.(type) {
Expand All @@ -106,7 +106,7 @@ func IdFromRawMessage(rawID json.RawMessage) (jsonrpcId, error) {
return JSONRPCIntID(int(id)), nil
default:
typ := reflect.TypeOf(id)
return nil, utils.LavaFormatError("failed to unmarshal id not a string or float", err, []utils.Attribute{{Key: "id", Value: rawID}, {Key: "id type", Value: typ}}...)
return nil, utils.LavaFormatError("failed to unmarshal id not a string or float", err, []utils.Attribute{{Key: "id", Value: string(rawID)}, {Key: "id type", Value: typ}}...)
}
}

Expand Down
4 changes: 3 additions & 1 deletion protocol/chainlib/chainproxy/rpcclient/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package rpcclient

import "fmt"
import (
"fmt"
)

// HTTPError is returned by client operations when the HTTP status code of the
// response is not a 2xx status.
Expand Down
7 changes: 5 additions & 2 deletions protocol/chainlib/chainproxy/rpcclient/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/utils"
)

Expand Down Expand Up @@ -223,8 +224,10 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}, isJsonRPC bo
}

func ValidateStatusCodes(statusCode int) error {
if statusCode == 504 || statusCode == 429 {
return utils.LavaFormatError("Received invalid status code", nil, utils.Attribute{Key: "Status Code", Value: statusCode})
if statusCode == 504 {
return common.StatusCodeError504
} else if statusCode == 429 {
return common.StatusCodeError429
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type BaseChainProxy struct {
ErrorHandler
averageBlockTime time.Duration
NodeUrl common.NodeUrl
ChainID string
}

func extractDappIDFromFiberContext(c *fiber.Ctx) (dappID string) {
Expand Down
5 changes: 4 additions & 1 deletion protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func NewJrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
nodeUrl := rpcProviderEndpoint.NodeUrls[0]
cp := &JrpcChainProxy{
BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: nodeUrl, ErrorHandler: &JsonRPCErrorHandler{}},
BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: nodeUrl, ErrorHandler: &JsonRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID},
conn: map[string]*chainproxy.Connector{},
}
verifyRPCEndpoint(nodeUrl.Url)
Expand Down Expand Up @@ -601,6 +601,9 @@ func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
defer cancel()
rpcMessage, err = rpc.CallContext(connectCtx, nodeMessage.ID, nodeMessage.Method, nodeMessage.Params, true)
if err != nil {
if common.StatusCodeError504.Is(err) || common.StatusCodeError429.Is(err) {
return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", err, utils.Attribute{Key: "chainID", Value: cp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name})
}
// Validate if the error is related to the provider connection to the node or it is a valid error
// in case the error is valid (e.g. bad input parameters) the error will return in the form of a valid error reply
if parsedError := cp.HandleNodeError(ctx, err); parsedError != nil {
Expand Down
4 changes: 2 additions & 2 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func NewRestChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav
nodeUrl := rpcProviderEndpoint.NodeUrls[0]
nodeUrl.Url = strings.TrimSuffix(rpcProviderEndpoint.NodeUrls[0].Url, "/")
rcp := &RestChainProxy{
BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: rpcProviderEndpoint.NodeUrls[0], ErrorHandler: &RestErrorHandler{}},
BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: rpcProviderEndpoint.NodeUrls[0], ErrorHandler: &RestErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID},
}
return rcp, nil
}
Expand Down Expand Up @@ -446,7 +446,7 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},

err = rcp.HandleStatusError(res.StatusCode)
if err != nil {
return nil, "", nil, err
return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", nil, utils.Attribute{Key: "Status Code", Value: res.StatusCode}, utils.Attribute{Key: "chainID", Value: rcp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name})
}

body, err := io.ReadAll(res.Body)
Expand Down
7 changes: 5 additions & 2 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func NewtendermintRpcChainProxy(ctx context.Context, nConns uint, rpcProviderEnd
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
websocketUrl, httpUrl := verifyTendermintEndpoint(rpcProviderEndpoint.NodeUrls)
cp := &tendermintRpcChainProxy{
JrpcChainProxy: JrpcChainProxy{BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: websocketUrl, ErrorHandler: &TendermintRPCErrorHandler{}}, conn: map[string]*chainproxy.Connector{}},
JrpcChainProxy: JrpcChainProxy{BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: websocketUrl, ErrorHandler: &TendermintRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID}, conn: map[string]*chainproxy.Connector{}},
httpNodeUrl: httpUrl,
httpConnector: nil,
}
Expand Down Expand Up @@ -618,7 +618,7 @@ func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpc

err = cp.HandleStatusError(res.StatusCode)
if err != nil {
return nil, "", nil, err
return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", nil, utils.Attribute{Key: "Status Code", Value: res.StatusCode}, utils.Attribute{Key: "chainID", Value: cp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name})
}

// read the response body
Expand Down Expand Up @@ -691,6 +691,9 @@ func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpc
// perform the rpc call
rpcMessage, err = rpc.CallContext(connectCtx, nodeMessage.ID, nodeMessage.Method, nodeMessage.Params, false)
if err != nil {
if common.StatusCodeError504.Is(err) || common.StatusCodeError429.Is(err) {
return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", err, utils.Attribute{Key: "chainID", Value: cp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name})
}
// Validate if the error is related to the provider connection to the node or it is a valid error
// in case the error is valid (e.g. bad input parameters) the error will return in the form of a valid error reply
if parsedError := cp.HandleNodeError(ctx, err); parsedError != nil {
Expand Down
6 changes: 5 additions & 1 deletion protocol/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ package common

import sdkerrors "cosmossdk.io/errors"

var ContextDeadlineExceededError = sdkerrors.New("ContextDeadlineExceeded Error", 300, "context deadline exceeded")
var (
ContextDeadlineExceededError = sdkerrors.New("ContextDeadlineExceeded Error", 300, "context deadline exceeded")
StatusCodeError504 = sdkerrors.New("Disallowed StatusCode Error", 504, "Disallowed status code error")
StatusCodeError429 = sdkerrors.New("Disallowed StatusCode Error", 429, "Disallowed status code error")
)
19 changes: 9 additions & 10 deletions protocol/rpcprovider/rewardserver/reward_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,15 @@ func (rws *RewardServer) gatherRewardsForClaim(ctx context.Context, currentEpoch
}

activeEpochThreshold := currentEpoch - blockDistanceForEpochValidity

rws.lock.Lock()
defer rws.lock.Unlock()
for epoch, epochRewards := range rws.rewards {
if epoch < earliestSavedEpoch {
delete(rws.rewards, epoch)
err := rws.rewardDB.DeleteEpochRewards(epoch)
if err != nil {
utils.LavaFormatWarning("gatherRewardsForClaim failed deleting epoch from rewardDB", err, utils.Attribute{Key: "epoch", Value: epoch})
}

delete(rws.rewards, epoch)

// Epoch is too old, we can't claim the rewards anymore.
continue
}
Expand Down Expand Up @@ -374,7 +373,7 @@ func NewRewardServer(rewardsTxSender RewardsTxSender, providerMetrics *metrics.P
rws.rewardDB = rewardDB
rws.rewardStoragePath = rewardStoragePath
rws.rewardsSnapshotThreshold = uint64(rewardsSnapshotThreshold)
rws.rewardsSnapshotTimeoutDuration = time.Duration(rewardsSnapshotTimeoutSec * uint(time.Second))
rws.rewardsSnapshotTimeoutDuration = time.Duration(rewardsSnapshotTimeoutSec) * time.Second
rws.rewardsSnapshotTimer = timer.NewTimer(rws.rewardsSnapshotTimeoutDuration)
rws.rewardsSnapshotThresholdCh = make(chan struct{})

Expand All @@ -386,16 +385,16 @@ func (rws *RewardServer) saveRewardsSnapshotToDBJob() {
for {
select {
case <-rws.rewardsSnapshotTimer.C:
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer()
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB()
case <-rws.rewardsSnapshotThresholdCh:
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer()
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB()
}
}
}

func (rws *RewardServer) resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer() {
func (rws *RewardServer) resetSnapshotTimerAndSaveRewardsSnapshotToDB() {
// We lock without defer because the DB is already locking itself
rws.lock.Lock()
rws.lock.RLock()
rws.rewardsSnapshotTimer.Reset(rws.rewardsSnapshotTimeoutDuration)

rewardEntities := []*RewardEntity{}
Expand All @@ -413,7 +412,7 @@ func (rws *RewardServer) resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTim
}
}
}
rws.lock.Unlock()
rws.lock.RUnlock()
if len(rewardEntities) == 0 {
return
}
Expand Down
7 changes: 3 additions & 4 deletions protocol/rpcprovider/rewardserver/reward_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestUpdateEpoch(t *testing.T) {
}

// Make sure that the rewards are flushed to DB
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer()
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB()

rws.UpdateEpoch(1)

Expand All @@ -280,8 +280,8 @@ func TestUpdateEpoch(t *testing.T) {
privKey, acc := sigs.GenerateFloatingKey()

ctx := sdk.WrapSDKContext(sdk.NewContext(nil, tmproto.Header{}, false, nil))
epoch := uint64(1)
for _, sessionId := range []uint64{1, 2, 3, 4, 5} {
epoch := uint64(1)
proof := common.BuildRelayRequestWithSession(ctx, "provider", []byte{}, sessionId, uint64(0), "spec", nil)
proof.Epoch = int64(epoch)

Expand All @@ -293,15 +293,14 @@ func TestUpdateEpoch(t *testing.T) {
}

// Make sure that the rewards are flushed to DB
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer()
rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB()

stubRewardsTxSender.earliestBlockInMemory = 2

rws.UpdateEpoch(3)

// ensure no payments have been sent
require.Len(t, stubRewardsTxSender.sentPayments, 0)

rewards, err := db.FindAll()
require.NoError(t, err)
// ensure rewards have been deleted
Expand Down

0 comments on commit 0c6c09e

Please sign in to comment.