Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Dec 15, 2024
1 parent 7f93a1d commit 0ce8ac3
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 22 deletions.
42 changes: 33 additions & 9 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ type InternalPath struct {
Addon string
}

type ErrorPattern struct {
TooNewPattern string
TooOldPattern string
}

func (bep *ErrorPattern) IsEmpty() bool {
return bep.TooNewPattern == "" && bep.TooOldPattern == ""
}

type BaseChainParser struct {
internalPaths map[string]InternalPath
taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer
Expand All @@ -47,26 +56,41 @@ type BaseChainParser struct {
allowedAddons map[string]bool
extensionParser extensionslib.ExtensionParser
active bool
blockErrorPattern string
blockErrorPattern ErrorPattern
}

func (bcp *BaseChainParser) IdentifyBlockNodeError(message string) (isBlockError bool, blockHeight int64) {
// allows an optional kind to specify the type of error to identify
// LATEST is identifying too new error
// EARLIEST is identifying too old error
func (bcp *BaseChainParser) IdentifyBlockNodeError(message string, kind ...DataKind) (isBlockError bool, blockHeight int64) {
bcp.rwLock.RLock()
defer bcp.rwLock.RUnlock()
if bcp.blockErrorPattern == "" {
if bcp.blockErrorPattern.IsEmpty() {
return false, 0
}
_, err := fmt.Sscanf(message, bcp.blockErrorPattern, &blockHeight)
if err != nil {
return false, 0
if len(kind) == 0 || kind[0] == LATEST {
_, err := fmt.Sscanf(message, bcp.blockErrorPattern.TooNewPattern, &blockHeight)
if err == nil {
return true, blockHeight
}
}
return true, blockHeight
if len(kind) == 0 || kind[0] == EARLIEST {
_, err := fmt.Sscanf(message, bcp.blockErrorPattern.TooOldPattern, &blockHeight)
if err == nil {
return true, blockHeight
}
}
return false, 0
}

func (bcp *BaseChainParser) SetBlockErrorPattern(pattern string) {
func (bcp *BaseChainParser) SetBlockErrorPattern(pattern string, kind DataKind) {
bcp.rwLock.Lock()
defer bcp.rwLock.Unlock()
bcp.blockErrorPattern = pattern
if kind == EARLIEST {
bcp.blockErrorPattern.TooOldPattern = pattern
} else if kind == LATEST {
bcp.blockErrorPattern.TooNewPattern = pattern
}
}

func (bcp *BaseChainParser) Activate() {
Expand Down
4 changes: 2 additions & 2 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type ChainParser interface {
ExtensionsParser() *extensionslib.ExtensionParser
ExtractDataFromRequest(*http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error)
SetResponseFromRelayResult(*common.RelayResult) (*http.Response, error)
SetBlockErrorPattern(string)
IdentifyBlockNodeError(message string) (isBlockError bool, blockHeight int64)
SetBlockErrorPattern(string, DataKind)
IdentifyBlockNodeError(message string, kind ...DataKind) (isBlockError bool, blockHeight int64)
}

type ChainMessage interface {
Expand Down
5 changes: 4 additions & 1 deletion protocol/chainlib/common_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ func CreateChainLibMocks(
endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: addons})
allCombinations := generateCombinations(extensions)
for _, extensionsList := range allCombinations {
endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: append(addons, extensionsList...)})
nodeUrl := common.NodeUrl{Url: lis.Addr().String(), Addons: append(addons, extensionsList...)}
// this is used to identify this header in the handler
nodeUrl.AuthConfig = common.AuthConfig{AuthHeaders: map[string]string{"Addon": strings.Join(extensionsList, ",")}}
endpoint.NodeUrls = append(endpoint.NodeUrls, nodeUrl)
}
go func() {
service := myServiceImplementation{serverCallback: httpServerCallback}
Expand Down
134 changes: 134 additions & 0 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2189,3 +2189,137 @@ func TestArchiveProvidersRetryOnParsedHash(t *testing.T) {
})
}
}

func TestUnconfiguredApiWithArchiveRequest(t *testing.T) {
playbook := []struct {
name string
apiInterface string
errorFormat string
managedToParseBlock bool
reqFormat string
}{
{
name: "tendermint",
apiInterface: spectypes.APIInterfaceTendermintRPC,
errorFormat: `{"jsonrpc":"2.0","id":-1,"error":{"code":-32603,"message":"Internal error","data":"height %d must be less than or equal to the current blockchain height 1837105"}}`,
managedToParseBlock: true,
reqFormat: `{"jsonrpc":"2.0","method":"block_undefined","params":[123],"id":1}`,
},
{
name: "tendermint",
apiInterface: spectypes.APIInterfaceJsonRPC,
errorFormat: `{"jsonrpc":"2.0","id":-1,"result":null}}`,
managedToParseBlock: false,
reqFormat: `{"jsonrpc":"2.0","method":"block_undefined","params":[123],"id":1}`,
},
}
for _, play := range playbook {
t.Run("unconfiguredApiWithArchiveRequest", func(t *testing.T) {
ctx := context.Background()
// can be any spec and api interface
specId := "LAV1"
apiInterface := play.apiInterface
epoch := uint64(100)
lavaChainID := "lava"
numProviders := 2

consumerListenAddress := addressGen.GetAddress()

type providerData struct {
account sigs.Account
endpoint *lavasession.RPCProviderEndpoint
server *rpcprovider.RPCProviderServer
replySetter *ReplySetter
mockChainFetcher *MockChainFetcher
mockReliabilityManager *MockReliabilityManager
}
providers := []providerData{}

for i := 0; i < numProviders; i++ {
account := sigs.GenerateDeterministicFloatingKey(randomizer)
providerDataI := providerData{account: account}
providers = append(providers, providerDataI)
}
consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer)
archiveCalled := make(chan bool, 10)
for i := 0; i < numProviders; i++ {
ctx := context.Background()
providerDataI := providers[i]
listenAddress := addressGen.GetAddress()
addons := []string(nil)
if i == 0 {
// one provider will have archive
addons = []string{"archive"}
}

rpcProviderOptions := rpcProviderOptions{
consumerAddress: consumerAccount.Addr.String(),
specId: specId,
apiInterface: apiInterface,
listenAddress: listenAddress,
account: providerDataI.account,
lavaChainID: lavaChainID,
addons: addons,
providerUniqueId: fmt.Sprintf("provider%d", i),
}
providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions)
providers[i].replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) {
if bytes.Equal(req, []byte(`{"jsonrpc":"2.0","method":"block","params":[9223372036854775807],"id":1}`)) {
return []byte(fmt.Sprintf(play.errorFormat, 9223372036854775807)), 500
}
if bytes.Equal(req, []byte(`{"jsonrpc":"2.0","method":"block","params":[9223372036854775806],"id":1}`)) {
return []byte(fmt.Sprintf(play.errorFormat, 9223372036854775806)), 500
}
for key, val := range header {
if key == "Addon" {
if val[0] == "archive" {
archiveCalled <- true
}
}
}
return []byte(`{"jsonrpc":"2.0","id":-1,"error":{"code":-32603,"message":"Internal error","data":"height 1 must be less than or equal to the current blockchain height 1837105"}}`), 500
}
}

pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{}
for i := 0; i < numProviders; i++ {
extensions := map[string]struct{}{}
if i == 0 {
extensions = map[string]struct{}{"archive": {}}
}
pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{
PublicLavaAddress: providers[i].account.Addr.String(),

Endpoints: []*lavasession.Endpoint{
{
NetworkAddress: providers[i].endpoint.NetworkAddress.Address,
Enabled: true,
Geolocation: 1,
Extensions: extensions,
},
},
Sessions: map[int64]*lavasession.SingleConsumerSession{},
MaxComputeUnits: 10000,
UsedComputeUnits: 0,
PairingEpoch: epoch,
}
}

rpcConsumerOptions := rpcConsumerOptions{
specId: specId,
apiInterface: apiInterface,
account: consumerAccount,
consumerListenAddress: consumerListenAddress,
epoch: epoch,
pairingList: pairingList,
requiredResponses: 1,
lavaChainID: lavaChainID,
}
rpcConsumerOut := createRpcConsumer(t, ctx, rpcConsumerOptions)
require.NotNil(t, rpcConsumerOut.rpcConsumerServer)
success := rpcConsumerOut.rpcConsumerServer.ExtractNodeData(ctx)

Check failure on line 2320 in protocol/integration/protocol_test.go

View workflow job for this annotation

GitHub Actions / test-protocol

not enough arguments in call to rpcConsumerOut.rpcConsumerServer.ExtractNodeData
require.Equal(t, play.managedToParseBlock, success)

})
}
}
29 changes: 19 additions & 10 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bo
if success {
rpccs.initialized.Store(true)
}
go rpccs.ExtractNodeData(context.Background())
go rpccs.tryExtractNodeData(context.Background())
return success, err
}

Expand Down Expand Up @@ -1670,11 +1670,13 @@ const (
RefreshInterval = 24 * time.Hour // Y time for refreshing on success
)

func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context) {
func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) {
for {
success := rpccs.tryExtractNodeData(ctx)
// identify the pattern for node errors with node too new
successTooNew := rpccs.ExtractNodeData(ctx, chainlib.LATEST)
successTooOld := rpccs.ExtractNodeData(ctx, chainlib.EARLIEST)
var timer *time.Timer
if success {
if successTooNew && successTooOld {
timer = time.NewTimer(RefreshInterval)
} else {
timer = time.NewTimer(RetryInterval)
Expand All @@ -1689,7 +1691,7 @@ func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context) {
}
}

func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) bool {
func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context, kind chainlib.DataKind) bool {
endpoint := &lavasession.RPCProviderEndpoint{ChainID: rpccs.listenEndpoint.ChainID, ApiInterface: rpccs.listenEndpoint.ApiInterface, NodeUrls: []common.NodeUrl{{
Url: "Internal",
}}}
Expand All @@ -1700,29 +1702,36 @@ func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) bool {
Cache: nil,
})
// we want a block that will surely fail
_, responseErrorMessage, format, err := chainFetcher.FetchBlock(ctx, math.MaxInt64)
fetchBlock := int64(0)
if kind == chainlib.LATEST {
fetchBlock = math.MaxInt64
} else if kind == chainlib.EARLIEST {
// TODO: make it earliest and make sure it fails
}
_, responseErrorMessage, format, err := chainFetcher.FetchBlock(ctx, fetchBlock)
if err != nil {
utils.LavaFormatError("[-] failed sending a fault block fetch to parse errors", err)
return false
}
if responseErrorMessage != "" {
blockError := ""
formatted := fmt.Sprintf(format, math.MaxInt64)
formatted := fmt.Sprintf(format, fetchBlock)
re := regexp.MustCompile(formatted)
blockError = re.ReplaceAllString(responseErrorMessage, format)
if blockError == responseErrorMessage {
// this shouldn't happen if the block exists in the response
return false
}
_, responseErrorMessage, _, err = chainFetcher.FetchBlock(ctx, math.MaxInt64-1)
fetchBlock = fetchBlock - 1
_, responseErrorMessage, _, err = chainFetcher.FetchBlock(ctx, fetchBlock)
if err != nil {
utils.LavaFormatError("[-] failed sending a fault block fetch to parse errors maxInt-1", err)
return false
}
formatted = fmt.Sprintf(blockError, math.MaxInt64-1)
formatted = fmt.Sprintf(blockError, math.MaxInt64)
if formatted == responseErrorMessage {
utils.LavaFormatInfo("[+] identified pattern for node errors, setting in chain parser", utils.LogAttr("pattern", blockError))
rpccs.chainParser.SetBlockErrorPattern(blockError)
rpccs.chainParser.SetBlockErrorPattern(blockError, chainlib.LATEST)
return true
}
}
Expand Down

0 comments on commit 0ce8ac3

Please sign in to comment.