diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index a396952717..896e9eb534 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -35,6 +35,15 @@ type InternalPath struct { Addon string } +type ErrorPattern struct { + TooNewPattern string + TooOldPattern string +} + +func (bep *ErrorPattern) IsEmpty() bool { + return bep.TooNewPattern == "" && bep.TooOldPattern == "" +} + type BaseChainParser struct { internalPaths map[string]InternalPath taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer @@ -47,26 +56,41 @@ type BaseChainParser struct { allowedAddons map[string]bool extensionParser extensionslib.ExtensionParser active bool - blockErrorPattern string + blockErrorPattern ErrorPattern } -func (bcp *BaseChainParser) IdentifyBlockNodeError(message string) (isBlockError bool, blockHeight int64) { +// allows an optional kind to specify the type of error to identify +// LATEST is identifying too new error +// EARLIEST is identifying too old error +func (bcp *BaseChainParser) IdentifyBlockNodeError(message string, kind ...DataKind) (isBlockError bool, blockHeight int64) { bcp.rwLock.RLock() defer bcp.rwLock.RUnlock() - if bcp.blockErrorPattern == "" { + if bcp.blockErrorPattern.IsEmpty() { return false, 0 } - _, err := fmt.Sscanf(message, bcp.blockErrorPattern, &blockHeight) - if err != nil { - return false, 0 + if len(kind) == 0 || kind[0] == LATEST { + _, err := fmt.Sscanf(message, bcp.blockErrorPattern.TooNewPattern, &blockHeight) + if err == nil { + return true, blockHeight + } } - return true, blockHeight + if len(kind) == 0 || kind[0] == EARLIEST { + _, err := fmt.Sscanf(message, bcp.blockErrorPattern.TooOldPattern, &blockHeight) + if err == nil { + return true, blockHeight + } + } + return false, 0 } -func (bcp *BaseChainParser) SetBlockErrorPattern(pattern string) { +func (bcp *BaseChainParser) SetBlockErrorPattern(pattern string, kind DataKind) { bcp.rwLock.Lock() defer bcp.rwLock.Unlock() - bcp.blockErrorPattern = pattern + if kind == EARLIEST { + bcp.blockErrorPattern.TooOldPattern = pattern + } else if kind == LATEST { + bcp.blockErrorPattern.TooNewPattern = pattern + } } func (bcp *BaseChainParser) Activate() { diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 77377c21c3..d68add5115 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -83,8 +83,8 @@ type ChainParser interface { ExtensionsParser() *extensionslib.ExtensionParser ExtractDataFromRequest(*http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) SetResponseFromRelayResult(*common.RelayResult) (*http.Response, error) - SetBlockErrorPattern(string) - IdentifyBlockNodeError(message string) (isBlockError bool, blockHeight int64) + SetBlockErrorPattern(string, DataKind) + IdentifyBlockNodeError(message string, kind ...DataKind) (isBlockError bool, blockHeight int64) } type ChainMessage interface { diff --git a/protocol/chainlib/common_test_utils.go b/protocol/chainlib/common_test_utils.go index 9b05bdd6c8..4fd15ae00c 100644 --- a/protocol/chainlib/common_test_utils.go +++ b/protocol/chainlib/common_test_utils.go @@ -171,7 +171,10 @@ func CreateChainLibMocks( endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: addons}) allCombinations := generateCombinations(extensions) for _, extensionsList := range allCombinations { - endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: append(addons, extensionsList...)}) + nodeUrl := common.NodeUrl{Url: lis.Addr().String(), Addons: append(addons, extensionsList...)} + // this is used to identify this header in the handler + nodeUrl.AuthConfig = common.AuthConfig{AuthHeaders: map[string]string{"Addon": strings.Join(extensionsList, ",")}} + endpoint.NodeUrls = append(endpoint.NodeUrls, nodeUrl) } go func() { service := myServiceImplementation{serverCallback: httpServerCallback} diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 56abee7175..cb0bb6e0ce 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -2189,3 +2189,137 @@ func TestArchiveProvidersRetryOnParsedHash(t *testing.T) { }) } } + +func TestUnconfiguredApiWithArchiveRequest(t *testing.T) { + playbook := []struct { + name string + apiInterface string + errorFormat string + managedToParseBlock bool + reqFormat string + }{ + { + name: "tendermint", + apiInterface: spectypes.APIInterfaceTendermintRPC, + errorFormat: `{"jsonrpc":"2.0","id":-1,"error":{"code":-32603,"message":"Internal error","data":"height %d must be less than or equal to the current blockchain height 1837105"}}`, + managedToParseBlock: true, + reqFormat: `{"jsonrpc":"2.0","method":"block_undefined","params":[123],"id":1}`, + }, + { + name: "tendermint", + apiInterface: spectypes.APIInterfaceJsonRPC, + errorFormat: `{"jsonrpc":"2.0","id":-1,"result":null}}`, + managedToParseBlock: false, + reqFormat: `{"jsonrpc":"2.0","method":"block_undefined","params":[123],"id":1}`, + }, + } + for _, play := range playbook { + t.Run("unconfiguredApiWithArchiveRequest", func(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := play.apiInterface + epoch := uint64(100) + lavaChainID := "lava" + numProviders := 2 + + consumerListenAddress := addressGen.GetAddress() + + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + server *rpcprovider.RPCProviderServer + replySetter *ReplySetter + mockChainFetcher *MockChainFetcher + mockReliabilityManager *MockReliabilityManager + } + providers := []providerData{} + + for i := 0; i < numProviders; i++ { + account := sigs.GenerateDeterministicFloatingKey(randomizer) + providerDataI := providerData{account: account} + providers = append(providers, providerDataI) + } + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + archiveCalled := make(chan bool, 10) + for i := 0; i < numProviders; i++ { + ctx := context.Background() + providerDataI := providers[i] + listenAddress := addressGen.GetAddress() + addons := []string(nil) + if i == 0 { + // one provider will have archive + addons = []string{"archive"} + } + + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: addons, + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) + providers[i].replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + if bytes.Equal(req, []byte(`{"jsonrpc":"2.0","method":"block","params":[9223372036854775807],"id":1}`)) { + return []byte(fmt.Sprintf(play.errorFormat, 9223372036854775807)), 500 + } + if bytes.Equal(req, []byte(`{"jsonrpc":"2.0","method":"block","params":[9223372036854775806],"id":1}`)) { + return []byte(fmt.Sprintf(play.errorFormat, 9223372036854775806)), 500 + } + for key, val := range header { + if key == "Addon" { + if val[0] == "archive" { + archiveCalled <- true + } + } + } + return []byte(`{"jsonrpc":"2.0","id":-1,"error":{"code":-32603,"message":"Internal error","data":"height 1 must be less than or equal to the current blockchain height 1837105"}}`), 500 + } + } + + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + for i := 0; i < numProviders; i++ { + extensions := map[string]struct{}{} + if i == 0 { + extensions = map[string]struct{}{"archive": {}} + } + pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: providers[i].account.Addr.String(), + + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: providers[i].endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + Extensions: extensions, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcConsumerOut := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcConsumerOut.rpcConsumerServer) + success := rpcConsumerOut.rpcConsumerServer.ExtractNodeData(ctx) + require.Equal(t, play.managedToParseBlock, success) + + }) + } +} diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index eb13b70abe..fca8c44e07 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -179,7 +179,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bo if success { rpccs.initialized.Store(true) } - go rpccs.ExtractNodeData(context.Background()) + go rpccs.tryExtractNodeData(context.Background()) return success, err } @@ -1670,11 +1670,13 @@ const ( RefreshInterval = 24 * time.Hour // Y time for refreshing on success ) -func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context) { +func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) { for { - success := rpccs.tryExtractNodeData(ctx) + // identify the pattern for node errors with node too new + successTooNew := rpccs.ExtractNodeData(ctx, chainlib.LATEST) + successTooOld := rpccs.ExtractNodeData(ctx, chainlib.EARLIEST) var timer *time.Timer - if success { + if successTooNew && successTooOld { timer = time.NewTimer(RefreshInterval) } else { timer = time.NewTimer(RetryInterval) @@ -1689,7 +1691,7 @@ func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context) { } } -func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) bool { +func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context, kind chainlib.DataKind) bool { endpoint := &lavasession.RPCProviderEndpoint{ChainID: rpccs.listenEndpoint.ChainID, ApiInterface: rpccs.listenEndpoint.ApiInterface, NodeUrls: []common.NodeUrl{{ Url: "Internal", }}} @@ -1700,29 +1702,36 @@ func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) bool { Cache: nil, }) // we want a block that will surely fail - _, responseErrorMessage, format, err := chainFetcher.FetchBlock(ctx, math.MaxInt64) + fetchBlock := int64(0) + if kind == chainlib.LATEST { + fetchBlock = math.MaxInt64 + } else if kind == chainlib.EARLIEST { + // TODO: make it earliest and make sure it fails + } + _, responseErrorMessage, format, err := chainFetcher.FetchBlock(ctx, fetchBlock) if err != nil { utils.LavaFormatError("[-] failed sending a fault block fetch to parse errors", err) return false } if responseErrorMessage != "" { blockError := "" - formatted := fmt.Sprintf(format, math.MaxInt64) + formatted := fmt.Sprintf(format, fetchBlock) re := regexp.MustCompile(formatted) blockError = re.ReplaceAllString(responseErrorMessage, format) if blockError == responseErrorMessage { // this shouldn't happen if the block exists in the response return false } - _, responseErrorMessage, _, err = chainFetcher.FetchBlock(ctx, math.MaxInt64-1) + fetchBlock = fetchBlock - 1 + _, responseErrorMessage, _, err = chainFetcher.FetchBlock(ctx, fetchBlock) if err != nil { utils.LavaFormatError("[-] failed sending a fault block fetch to parse errors maxInt-1", err) return false } - formatted = fmt.Sprintf(blockError, math.MaxInt64-1) + formatted = fmt.Sprintf(blockError, math.MaxInt64) if formatted == responseErrorMessage { utils.LavaFormatInfo("[+] identified pattern for node errors, setting in chain parser", utils.LogAttr("pattern", blockError)) - rpccs.chainParser.SetBlockErrorPattern(blockError) + rpccs.chainParser.SetBlockErrorPattern(blockError, chainlib.LATEST) return true } }