diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go index a4f6951182..c5ceed3baf 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go @@ -95,7 +95,7 @@ func IdFromRawMessage(rawID json.RawMessage) (jsonrpcId, error) { var idInterface interface{} err := json.Unmarshal(rawID, &idInterface) if err != nil { - return nil, utils.LavaFormatError("failed to unmarshal id from response", err, utils.Attribute{Key: "id", Value: rawID}) + return nil, utils.LavaFormatError("failed to unmarshal id from response", err, utils.Attribute{Key: "id", Value: string(rawID)}) } switch id := idInterface.(type) { @@ -106,7 +106,7 @@ func IdFromRawMessage(rawID json.RawMessage) (jsonrpcId, error) { return JSONRPCIntID(int(id)), nil default: typ := reflect.TypeOf(id) - return nil, utils.LavaFormatError("failed to unmarshal id not a string or float", err, []utils.Attribute{{Key: "id", Value: rawID}, {Key: "id type", Value: typ}}...) + return nil, utils.LavaFormatError("failed to unmarshal id not a string or float", err, []utils.Attribute{{Key: "id", Value: string(rawID)}, {Key: "id type", Value: typ}}...) } } diff --git a/protocol/chainlib/chainproxy/rpcclient/errors.go b/protocol/chainlib/chainproxy/rpcclient/errors.go index ff3b344f99..3046d4903b 100755 --- a/protocol/chainlib/chainproxy/rpcclient/errors.go +++ b/protocol/chainlib/chainproxy/rpcclient/errors.go @@ -16,7 +16,9 @@ package rpcclient -import "fmt" +import ( + "fmt" +) // HTTPError is returned by client operations when the HTTP status code of the // response is not a 2xx status. diff --git a/protocol/chainlib/chainproxy/rpcclient/http.go b/protocol/chainlib/chainproxy/rpcclient/http.go index f24cdb510b..2dadbc4d0a 100755 --- a/protocol/chainlib/chainproxy/rpcclient/http.go +++ b/protocol/chainlib/chainproxy/rpcclient/http.go @@ -29,6 +29,7 @@ import ( "sync" "time" + "github.com/lavanet/lava/protocol/common" "github.com/lavanet/lava/utils" ) @@ -223,8 +224,10 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}, isJsonRPC bo } func ValidateStatusCodes(statusCode int) error { - if statusCode == 504 || statusCode == 429 { - return utils.LavaFormatError("Received invalid status code", nil, utils.Attribute{Key: "Status Code", Value: statusCode}) + if statusCode == 504 { + return common.StatusCodeError504 + } else if statusCode == 429 { + return common.StatusCodeError429 } return nil } diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 6ac00709a9..d3c658af74 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -61,6 +61,7 @@ type BaseChainProxy struct { ErrorHandler averageBlockTime time.Duration NodeUrl common.NodeUrl + ChainID string } func extractDappIDFromFiberContext(c *fiber.Ctx) (dappID string) { diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 247c70316b..e2aeeeb95b 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -446,7 +446,7 @@ func NewJrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav _, averageBlockTime, _, _ := chainParser.ChainBlockStats() nodeUrl := rpcProviderEndpoint.NodeUrls[0] cp := &JrpcChainProxy{ - BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: nodeUrl, ErrorHandler: &JsonRPCErrorHandler{}}, + BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: nodeUrl, ErrorHandler: &JsonRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID}, conn: map[string]*chainproxy.Connector{}, } verifyRPCEndpoint(nodeUrl.Url) @@ -601,6 +601,9 @@ func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, defer cancel() rpcMessage, err = rpc.CallContext(connectCtx, nodeMessage.ID, nodeMessage.Method, nodeMessage.Params, true) if err != nil { + if common.StatusCodeError504.Is(err) || common.StatusCodeError429.Is(err) { + return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", err, utils.Attribute{Key: "chainID", Value: cp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name}) + } // Validate if the error is related to the provider connection to the node or it is a valid error // in case the error is valid (e.g. bad input parameters) the error will return in the form of a valid error reply if parsedError := cp.HandleNodeError(ctx, err); parsedError != nil { diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index aa64c8768c..5c7c06756e 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -369,7 +369,7 @@ func NewRestChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav nodeUrl := rpcProviderEndpoint.NodeUrls[0] nodeUrl.Url = strings.TrimSuffix(rpcProviderEndpoint.NodeUrls[0].Url, "/") rcp := &RestChainProxy{ - BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: rpcProviderEndpoint.NodeUrls[0], ErrorHandler: &RestErrorHandler{}}, + BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: rpcProviderEndpoint.NodeUrls[0], ErrorHandler: &RestErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID}, } return rcp, nil } @@ -446,7 +446,7 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, err = rcp.HandleStatusError(res.StatusCode) if err != nil { - return nil, "", nil, err + return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", nil, utils.Attribute{Key: "Status Code", Value: res.StatusCode}, utils.Attribute{Key: "chainID", Value: rcp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name}) } body, err := io.ReadAll(res.Body) diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index a5bfa3d90a..ed8c610377 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -518,7 +518,7 @@ func NewtendermintRpcChainProxy(ctx context.Context, nConns uint, rpcProviderEnd _, averageBlockTime, _, _ := chainParser.ChainBlockStats() websocketUrl, httpUrl := verifyTendermintEndpoint(rpcProviderEndpoint.NodeUrls) cp := &tendermintRpcChainProxy{ - JrpcChainProxy: JrpcChainProxy{BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: websocketUrl, ErrorHandler: &TendermintRPCErrorHandler{}}, conn: map[string]*chainproxy.Connector{}}, + JrpcChainProxy: JrpcChainProxy{BaseChainProxy: BaseChainProxy{averageBlockTime: averageBlockTime, NodeUrl: websocketUrl, ErrorHandler: &TendermintRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID}, conn: map[string]*chainproxy.Connector{}}, httpNodeUrl: httpUrl, httpConnector: nil, } @@ -618,7 +618,7 @@ func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpc err = cp.HandleStatusError(res.StatusCode) if err != nil { - return nil, "", nil, err + return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", nil, utils.Attribute{Key: "Status Code", Value: res.StatusCode}, utils.Attribute{Key: "chainID", Value: cp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name}) } // read the response body @@ -691,6 +691,9 @@ func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpc // perform the rpc call rpcMessage, err = rpc.CallContext(connectCtx, nodeMessage.ID, nodeMessage.Method, nodeMessage.Params, false) if err != nil { + if common.StatusCodeError504.Is(err) || common.StatusCodeError429.Is(err) { + return nil, "", nil, utils.LavaFormatWarning("Received invalid status code", err, utils.Attribute{Key: "chainID", Value: cp.BaseChainProxy.ChainID}, utils.Attribute{Key: "apiName", Value: chainMessage.GetApi().Name}) + } // Validate if the error is related to the provider connection to the node or it is a valid error // in case the error is valid (e.g. bad input parameters) the error will return in the form of a valid error reply if parsedError := cp.HandleNodeError(ctx, err); parsedError != nil { diff --git a/protocol/common/errors.go b/protocol/common/errors.go index b88f2d6cb4..23cf966957 100644 --- a/protocol/common/errors.go +++ b/protocol/common/errors.go @@ -2,4 +2,8 @@ package common import sdkerrors "cosmossdk.io/errors" -var ContextDeadlineExceededError = sdkerrors.New("ContextDeadlineExceeded Error", 300, "context deadline exceeded") +var ( + ContextDeadlineExceededError = sdkerrors.New("ContextDeadlineExceeded Error", 300, "context deadline exceeded") + StatusCodeError504 = sdkerrors.New("Disallowed StatusCode Error", 504, "Disallowed status code error") + StatusCodeError429 = sdkerrors.New("Disallowed StatusCode Error", 429, "Disallowed status code error") +) diff --git a/protocol/rpcprovider/rewardserver/reward_server.go b/protocol/rpcprovider/rewardserver/reward_server.go index bf96a5def4..a3041f0828 100644 --- a/protocol/rpcprovider/rewardserver/reward_server.go +++ b/protocol/rpcprovider/rewardserver/reward_server.go @@ -267,16 +267,15 @@ func (rws *RewardServer) gatherRewardsForClaim(ctx context.Context, currentEpoch } activeEpochThreshold := currentEpoch - blockDistanceForEpochValidity - + rws.lock.Lock() + defer rws.lock.Unlock() for epoch, epochRewards := range rws.rewards { if epoch < earliestSavedEpoch { + delete(rws.rewards, epoch) err := rws.rewardDB.DeleteEpochRewards(epoch) if err != nil { utils.LavaFormatWarning("gatherRewardsForClaim failed deleting epoch from rewardDB", err, utils.Attribute{Key: "epoch", Value: epoch}) } - - delete(rws.rewards, epoch) - // Epoch is too old, we can't claim the rewards anymore. continue } @@ -374,7 +373,7 @@ func NewRewardServer(rewardsTxSender RewardsTxSender, providerMetrics *metrics.P rws.rewardDB = rewardDB rws.rewardStoragePath = rewardStoragePath rws.rewardsSnapshotThreshold = uint64(rewardsSnapshotThreshold) - rws.rewardsSnapshotTimeoutDuration = time.Duration(rewardsSnapshotTimeoutSec * uint(time.Second)) + rws.rewardsSnapshotTimeoutDuration = time.Duration(rewardsSnapshotTimeoutSec) * time.Second rws.rewardsSnapshotTimer = timer.NewTimer(rws.rewardsSnapshotTimeoutDuration) rws.rewardsSnapshotThresholdCh = make(chan struct{}) @@ -386,16 +385,16 @@ func (rws *RewardServer) saveRewardsSnapshotToDBJob() { for { select { case <-rws.rewardsSnapshotTimer.C: - rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer() + rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB() case <-rws.rewardsSnapshotThresholdCh: - rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer() + rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB() } } } -func (rws *RewardServer) resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer() { +func (rws *RewardServer) resetSnapshotTimerAndSaveRewardsSnapshotToDB() { // We lock without defer because the DB is already locking itself - rws.lock.Lock() + rws.lock.RLock() rws.rewardsSnapshotTimer.Reset(rws.rewardsSnapshotTimeoutDuration) rewardEntities := []*RewardEntity{} @@ -413,7 +412,7 @@ func (rws *RewardServer) resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTim } } } - rws.lock.Unlock() + rws.lock.RUnlock() if len(rewardEntities) == 0 { return } diff --git a/protocol/rpcprovider/rewardserver/reward_server_test.go b/protocol/rpcprovider/rewardserver/reward_server_test.go index 7a5cef77cb..abe573771d 100644 --- a/protocol/rpcprovider/rewardserver/reward_server_test.go +++ b/protocol/rpcprovider/rewardserver/reward_server_test.go @@ -262,7 +262,7 @@ func TestUpdateEpoch(t *testing.T) { } // Make sure that the rewards are flushed to DB - rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer() + rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB() rws.UpdateEpoch(1) @@ -280,8 +280,8 @@ func TestUpdateEpoch(t *testing.T) { privKey, acc := sigs.GenerateFloatingKey() ctx := sdk.WrapSDKContext(sdk.NewContext(nil, tmproto.Header{}, false, nil)) + epoch := uint64(1) for _, sessionId := range []uint64{1, 2, 3, 4, 5} { - epoch := uint64(1) proof := common.BuildRelayRequestWithSession(ctx, "provider", []byte{}, sessionId, uint64(0), "spec", nil) proof.Epoch = int64(epoch) @@ -293,7 +293,7 @@ func TestUpdateEpoch(t *testing.T) { } // Make sure that the rewards are flushed to DB - rws.resetSnapshotTimerAndSaveRewardsSnapshotToDBAndResetTimer() + rws.resetSnapshotTimerAndSaveRewardsSnapshotToDB() stubRewardsTxSender.earliestBlockInMemory = 2 @@ -301,7 +301,6 @@ func TestUpdateEpoch(t *testing.T) { // ensure no payments have been sent require.Len(t, stubRewardsTxSender.sentPayments, 0) - rewards, err := db.FindAll() require.NoError(t, err) // ensure rewards have been deleted