Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT-improve-timeout-handling #1249

Merged
merged 63 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
86379cd
refactored the rpcconsumer send, WIP
omerlavanet Feb 22, 2024
c0bcf0e
added TODO
omerlavanet Feb 22, 2024
130eeeb
progress, WIP
omerlavanet Feb 26, 2024
348e5fe
wip
omerlavanet Feb 26, 2024
2016497
wip
omerlavanet Feb 26, 2024
0342af0
relay_processor done
omerlavanet Feb 28, 2024
516c327
change timeouts across the protocol
omerlavanet Feb 28, 2024
a66beef
handle non deterministic apis in the quorum
omerlavanet Feb 28, 2024
353ca05
lint
omerlavanet Feb 28, 2024
5d15525
fix tests
omerlavanet Feb 28, 2024
154e441
fix deadlock
omerlavanet Feb 28, 2024
6398a5f
fix nil getSessions
omerlavanet Feb 28, 2024
fcd51d6
increase hanging api time
omerlavanet Feb 28, 2024
9c2b59c
add unitest for usedProviders
omerlavanet Feb 29, 2024
1cd0136
added tests + refactor for simplicity
omerlavanet Feb 29, 2024
b38652b
added more tests
omerlavanet Feb 29, 2024
628ab5c
added more testing
omerlavanet Mar 1, 2024
f393b29
Merge remote-tracking branch 'origin/main' into PRT-improve-timout-ha…
omerlavanet Mar 1, 2024
05a9be4
lint
omerlavanet Mar 1, 2024
fbc2242
add unitests
omerlavanet Mar 3, 2024
c243a39
on init do not fail init relays on pairing
omerlavanet Mar 3, 2024
60652ab
change csm get provider error to warning
omerlavanet Mar 3, 2024
617ea4a
added errors when failing relays
omerlavanet Mar 3, 2024
47fb46b
remove data reliability errors on latest block requests
omerlavanet Mar 3, 2024
60abc95
disabled data reliability latest also on node errors
omerlavanet Mar 3, 2024
488da3a
add more information on error during quorum
omerlavanet Mar 4, 2024
2927739
allow empty results for queries without an error
omerlavanet Mar 4, 2024
75b74da
add missing consumer consistency
omerlavanet Mar 4, 2024
24d06f3
empty response quorum
omerlavanet Mar 4, 2024
01b1658
Merge remote-tracking branch 'origin/main' into PRT-improve-timout-ha…
omerlavanet Mar 4, 2024
e6fbeed
add more info on provider error
omerlavanet Mar 4, 2024
f84dcff
Merge remote-tracking branch 'origin/main' into PRT-improve-timout-ha…
omerlavanet Mar 6, 2024
cec200f
added basic consumer unitest
omerlavanet Mar 6, 2024
bf735fe
added a check for consumer to be up during unitest
omerlavanet Mar 6, 2024
4d41b08
refactor code for reuse in unitest
omerlavanet Mar 6, 2024
a50600a
added provider and consumer unitests
omerlavanet Mar 7, 2024
6313c30
added rpcproviderServer creation to unitest
omerlavanet Mar 7, 2024
69b9b3e
added provider dialing
omerlavanet Mar 7, 2024
a551a32
fixed bug in rpcconsumer, finished unitest
omerlavanet Mar 7, 2024
1ad183d
fix panic in relayErrors
omerlavanet Mar 11, 2024
650ab23
sanitize unique errors get
omerlavanet Mar 11, 2024
8d0ff06
prevent port conflict
omerlavanet Mar 11, 2024
511d2be
rename
ranlavanet Mar 11, 2024
0e225a3
fix addUsed with len(0) resetting the wait
omerlavanet Mar 11, 2024
888db6a
Merge branch 'PRT-improve-timout-handling' of github.com:lavanet/lava…
omerlavanet Mar 11, 2024
b0f001a
simplify port selection in unitests
omerlavanet Mar 11, 2024
44be705
add timeouts and fails to relays in unitest
omerlavanet Mar 11, 2024
379f79b
fix tests
omerlavanet Mar 13, 2024
a529191
added timeout to the scenarios
omerlavanet Mar 13, 2024
0ebfac9
lint
omerlavanet Mar 13, 2024
5abeea6
add a new test for tx sending
omerlavanet Mar 16, 2024
ad0ab82
Merge branch 'main' into PRT-improve-timout-handling
ranlavanet Mar 19, 2024
1b83426
Merge branch 'PRT-improve-timout-handling' of github.com:lavanet/lava…
ranlavanet Mar 19, 2024
e881ef5
sort logs better :)
ranlavanet Mar 19, 2024
599c92f
fix Caching error spam when missing cache
ranlavanet Mar 20, 2024
f1b3216
adding nil protection for used Providers
ranlavanet Mar 21, 2024
011ff99
set default 0
ranlavanet Mar 21, 2024
0298067
adding error logs and nil protection.
ranlavanet Mar 21, 2024
582a1dd
adding some checks to avoid nil deref
ranlavanet Mar 21, 2024
6fb97af
allow error for connection reset by peer when in emergency mode.. as …
ranlavanet Mar 21, 2024
16c3752
adding debugging information for next time ts fails.
ranlavanet Mar 21, 2024
19e2082
es lint..
ranlavanet Mar 21, 2024
d4f0801
eslint fix
ranlavanet Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions ecosystem/cache/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,17 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin
}()
// wait for all reads to complete before moving forward
waitGroup.Wait()

// validate that the response seen block is larger or equal to our expectations.
if cacheReply.SeenBlock < slices.Min([]int64{relayCacheGet.SeenBlock, relayCacheGet.RequestedBlock}) { // TODO unitest this.
// Error, our reply seen block is not larger than our expectations, meaning we got an old response
// this can happen only in the case relayCacheGet.SeenBlock < relayCacheGet.RequestedBlock
// by setting the err variable we will get a cache miss, and the relay will continue to the node.
err = utils.LavaFormatDebug("reply seen block is smaller than our expectations",
utils.LogAttr("cacheReply.SeenBlock", cacheReply.SeenBlock),
utils.LogAttr("seenBlock", relayCacheGet.SeenBlock),
)
if err == nil { // in case we got a hit validate seen block of the reply.
// validate that the response seen block is larger or equal to our expectations.
if cacheReply.SeenBlock < slices.Min([]int64{relayCacheGet.SeenBlock, relayCacheGet.RequestedBlock}) { // TODO unitest this.
// Error, our reply seen block is not larger than our expectations, meaning we got an old response
// this can happen only in the case relayCacheGet.SeenBlock < relayCacheGet.RequestedBlock
// by setting the err variable we will get a cache miss, and the relay will continue to the node.
err = utils.LavaFormatDebug("reply seen block is smaller than our expectations",
utils.LogAttr("cacheReply.SeenBlock", cacheReply.SeenBlock),
utils.LogAttr("seenBlock", relayCacheGet.SeenBlock),
)
}
}
// set seen block.
if relayCacheGet.SeenBlock > cacheReply.SeenBlock {
Expand Down
12 changes: 6 additions & 6 deletions protocol/chainlib/chain_message_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@ package chainlib
import "github.com/lavanet/lava/protocol/common"

func ShouldSendToAllProviders(chainMessage ChainMessage) bool {
return chainMessage.GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALLPROVIDERS
return chainMessage.GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS
}

func GetAddon(chainMessage ChainMessage) string {
func GetAddon(chainMessage ChainMessageForSend) string {
return chainMessage.GetApiCollection().CollectionData.AddOn
}

func IsSubscription(chainMessage ChainMessage) bool {
func IsSubscription(chainMessage ChainMessageForSend) bool {
return chainMessage.GetApi().Category.Subscription
}

func IsHangingApi(chainMessage ChainMessage) bool {
func IsHangingApi(chainMessage ChainMessageForSend) bool {
return chainMessage.GetApi().Category.HangingApi
}

func GetComputeUnits(chainMessage ChainMessage) uint64 {
func GetComputeUnits(chainMessage ChainMessageForSend) uint64 {
return chainMessage.GetApi().ComputeUnits
}

func GetStateful(chainMessage ChainMessage) uint32 {
func GetStateful(chainMessage ChainMessageForSend) uint32 {
return chainMessage.GetApi().Category.Stateful
}
1 change: 1 addition & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ChainMessage interface {
}

type ChainMessageForSend interface {
TimeoutOverride(...time.Duration) time.Duration
GetApi() *spectypes.Api
GetRPCMessage() rpcInterfaceMessages.GenericMessage
GetApiCollection() *spectypes.ApiCollection
Expand Down
22 changes: 19 additions & 3 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chainlib

import (
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -86,6 +87,13 @@ func (bcp *BaseChainProxy) GetChainProxyInformation() (common.NodeUrl, string) {
return bcp.NodeUrl, bcp.ChainID
}

func (bcp *BaseChainProxy) CapTimeoutForSend(ctx context.Context, chainMessage ChainMessageForSend) (context.Context, context.CancelFunc) {
relayTimeout := GetRelayTimeout(chainMessage, bcp.averageBlockTime)
processingTimeout := common.GetTimeoutForProcessing(relayTimeout, GetTimeoutInfo(chainMessage))
connectCtx, cancel := bcp.NodeUrl.LowerContextTimeout(ctx, processingTimeout)
return connectCtx, cancel
}

func extractDappIDFromFiberContext(c *fiber.Ctx) (dappID string) {
// Read the dappID from the headers
dappID = c.Get("dapp-id")
Expand Down Expand Up @@ -299,21 +307,21 @@ func CompareRequestedBlockInBatch(firstRequestedBlock int64, second int64) (late
return returnBigger(firstRequestedBlock, second)
}

func GetRelayTimeout(chainMessage ChainMessage, chainParser ChainParser, timeouts int) time.Duration {
func GetRelayTimeout(chainMessage ChainMessageForSend, averageBlockTime time.Duration) time.Duration {
if chainMessage.TimeoutOverride() != 0 {
return chainMessage.TimeoutOverride()
}
// Calculate extra RelayTimeout
extraRelayTimeout := time.Duration(0)
if IsHangingApi(chainMessage) {
_, extraRelayTimeout, _, _ = chainParser.ChainBlockStats()
extraRelayTimeout = averageBlockTime * 2
}
relayTimeAddition := common.GetTimePerCu(GetComputeUnits(chainMessage))
if chainMessage.GetApi().TimeoutMs > 0 {
relayTimeAddition = time.Millisecond * time.Duration(chainMessage.GetApi().TimeoutMs)
}
// Set relay timout, increase it every time we fail a relay on timeout
return extraRelayTimeout + time.Duration(timeouts+1)*relayTimeAddition + common.AverageWorldLatency
return extraRelayTimeout + relayTimeAddition + common.AverageWorldLatency
}

// setup a common preflight and cors configuration allowing wild cards and preflight caching.
Expand Down Expand Up @@ -416,3 +424,11 @@ func (rd *RefererData) SendReferer(refererMatchString string, chainId string, ms
rd.ReferrerClient.AppendReferrer(metrics.NewReferrerRequest(refererMatchString, chainId, msg, referer, origin, userAgent))
return nil
}

func GetTimeoutInfo(chainMessage ChainMessageForSend) common.TimeoutInfo {
return common.TimeoutInfo{
CU: chainMessage.GetApi().ComputeUnits,
Hanging: IsHangingApi(chainMessage),
Stateful: GetStateful(chainMessage),
}
}
18 changes: 9 additions & 9 deletions protocol/chainlib/common_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@ func generateCombinations(arr []string) [][]string {

// generates a chain parser, a chain fetcher messages based on it
// apiInterface can either be an ApiInterface string as in spectypes.ApiInterfaceXXX or a number for an index in the apiCollections
func CreateChainLibMocks(ctx context.Context, specIndex string, apiInterface string, serverCallback http.HandlerFunc, getToTopMostPath string, services []string) (cpar ChainParser, crout ChainRouter, cfetc chaintracker.ChainFetcher, closeServer func(), errRet error) {
func CreateChainLibMocks(ctx context.Context, specIndex string, apiInterface string, serverCallback http.HandlerFunc, getToTopMostPath string, services []string) (cpar ChainParser, crout ChainRouter, cfetc chaintracker.ChainFetcher, closeServer func(), endpointRet *lavasession.RPCProviderEndpoint, errRet error) {
closeServer = nil
spec, err := keepertest.GetASpec(specIndex, getToTopMostPath, nil, nil)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
index, err := strconv.Atoi(apiInterface)
if err == nil && index < len(spec.ApiCollections) {
apiInterface = spec.ApiCollections[index].CollectionData.ApiInterface
}
chainParser, err := NewChainParser(apiInterface)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
var chainRouter ChainRouter
chainParser.SetSpec(spec)
Expand All @@ -111,15 +111,15 @@ func CreateChainLibMocks(ctx context.Context, specIndex string, apiInterface str
}
addons, extensions, err := chainParser.SeparateAddonsExtensions(services)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

if apiInterface == spectypes.APIInterfaceGrpc {
// Start a new gRPC server using the buffered connection
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0")
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, nil, nil, closeServer, err
return nil, nil, nil, closeServer, nil, err
}
endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: addons})
allCombinations := generateCombinations(extensions)
Expand All @@ -138,19 +138,19 @@ func CreateChainLibMocks(ctx context.Context, specIndex string, apiInterface str
time.Sleep(10 * time.Millisecond)
chainRouter, err = GetChainRouter(ctx, 1, endpoint, chainParser)
if err != nil {
return nil, nil, nil, closeServer, err
return nil, nil, nil, closeServer, nil, err
}
} else {
mockServer := httptest.NewServer(serverCallback)
closeServer = mockServer.Close
endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: mockServer.URL, Addons: addons})
chainRouter, err = GetChainRouter(ctx, 1, endpoint, chainParser)
if err != nil {
return nil, nil, nil, closeServer, err
return nil, nil, nil, closeServer, nil, err
}
}
chainFetcher := NewChainFetcher(ctx, &ChainFetcherOptions{chainRouter, chainParser, endpoint, nil})
return chainParser, chainRouter, chainFetcher, closeServer, err
return chainParser, chainRouter, chainFetcher, closeServer, endpoint, err
}

type TestStruct struct {
Expand Down
6 changes: 3 additions & 3 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum
grpcHeaders := convertToMetadataMapOfSlices(metadataValues)
utils.LavaFormatDebug("in <<< GRPC Relay ",
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", method),
utils.LogAttr("_method", method),
utils.LogAttr("headers", grpcHeaders),
)
metricsData := metrics.NewRelayAnalytics(dappID, apil.endpoint.ChainID, apiInterface)
Expand Down Expand Up @@ -520,14 +520,14 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
}
if debug {
utils.LavaFormatDebug("provider sending node message",
utils.Attribute{Key: "method", Value: nodeMessage.Path},
utils.Attribute{Key: "_method", Value: nodeMessage.Path},
utils.Attribute{Key: "headers", Value: metadataMap},
utils.Attribute{Key: "apiInterface", Value: "grpc"},
)
}
var respHeaders metadata.MD
response := msgFactory.NewMessage(methodDescriptor.GetOutputType())
connectCtx, cancel := cp.NodeUrl.LowerContextTimeout(ctx, chainMessage, cp.averageBlockTime)
connectCtx, cancel := cp.CapTimeoutForSend(ctx, chainMessage)
defer cancel()
err = conn.Invoke(connectCtx, "/"+nodeMessage.Path, msg, response, grpc.Header(&respHeaders))
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions protocol/chainlib/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestGrpcChainProxy(t *testing.T) {
// Handle the incoming request and provide the desired response
wasCalled = true
})
chainParser, chainProxy, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceGrpc, serverHandle, "../../", nil)
chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceGrpc, serverHandle, "../../", nil)
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainProxy)
Expand All @@ -169,7 +169,7 @@ func TestParsingRequestedBlocksHeadersGrpc(t *testing.T) {
w.WriteHeader(244591)
}
})
chainParser, chainRouter, _, closeServer, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceGrpc, serverHandler, "../../", nil)
chainParser, chainRouter, _, closeServer, _, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceGrpc, serverHandler, "../../", nil)
require.NoError(t, err)
defer func() {
if closeServer != nil {
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestSettingBlocksHeadersGrpc(t *testing.T) {
w.WriteHeader(244591)
}
})
chainParser, chainRouter, _, closeServer, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceGrpc, serverHandler, "../../", nil)
chainParser, chainRouter, _, closeServer, _, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceGrpc, serverHandler, "../../", nil)
require.NoError(t, err)
defer func() {
if closeServer != nil {
Expand Down
6 changes: 3 additions & 3 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
utils.LavaFormatDebug("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("seed", msgSeed),
utils.LogAttr("msg", logFormattedMsg),
utils.LogAttr("_msg", logFormattedMsg),
utils.LogAttr("dappID", dappID),
utils.LogAttr("headers", headers),
)
Expand Down Expand Up @@ -596,7 +596,7 @@ func (cp *JrpcChainProxy) sendBatchMessage(ctx context.Context, nodeMessage *rpc
}
}
// set context with timeout
connectCtx, cancel := cp.NodeUrl.LowerContextTimeout(ctx, chainMessage, cp.averageBlockTime)
connectCtx, cancel := cp.CapTimeoutForSend(ctx, chainMessage)
defer cancel()

cp.NodeUrl.SetIpForwardingIfNecessary(ctx, rpc.SetHeader)
Expand Down Expand Up @@ -668,7 +668,7 @@ func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
} else {
// we use the minimum timeout between the two, spec or context. to prevent the provider from hanging
// we don't use the context alone so the provider won't be hanging forever by an attack
connectCtx, cancel := cp.NodeUrl.LowerContextTimeout(ctx, chainMessage, cp.averageBlockTime)
connectCtx, cancel := cp.CapTimeoutForSend(ctx, chainMessage)
defer cancel()

cp.NodeUrl.SetIpForwardingIfNecessary(ctx, rpc.SetHeader)
Expand Down
10 changes: 5 additions & 5 deletions protocol/chainlib/jsonRPC_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestJsonRpcChainProxy(t *testing.T) {
fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`)
})

chainParser, chainProxy, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainProxy)
Expand All @@ -164,7 +164,7 @@ func TestAddonAndVerifications(t *testing.T) {
fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0xf9ccdff90234a064"}`)
})

chainParser, chainRouter, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", []string{"debug"})
chainParser, chainRouter, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", []string{"debug"})
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainRouter)
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestExtensions(t *testing.T) {
})

specname := "ETH1"
chainParser, chainRouter, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, specname, spectypes.APIInterfaceJsonRPC, serverHandle, "../../", []string{"archive"})
chainParser, chainRouter, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, specname, spectypes.APIInterfaceJsonRPC, serverHandle, "../../", []string{"archive"})
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainRouter)
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestJsonRpcBatchCall(t *testing.T) {
fmt.Fprint(w, response)
})

chainParser, chainProxy, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainProxy)
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestJsonRpcBatchCallSameID(t *testing.T) {
fmt.Fprint(w, response)
})

chainParser, chainProxy, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "ETH1", spectypes.APIInterfaceJsonRPC, serverHandle, "../../", nil)
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainProxy)
Expand Down
8 changes: 4 additions & 4 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
utils.LavaFormatDebug("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("path", path),
utils.LogAttr("_path", path),
utils.LogAttr("dappID", dappID),
utils.LogAttr("msgSeed", msgSeed),
utils.LogAttr("headers", restHeaders),
Expand Down Expand Up @@ -366,7 +366,7 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum
defer cancel() // incase there's a problem make sure to cancel the connection
utils.LavaFormatDebug("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("path", path),
utils.LogAttr("_path", path),
utils.LogAttr("dappID", dappID),
utils.LogAttr("msgSeed", msgSeed),
utils.LogAttr("headers", restHeaders),
Expand Down Expand Up @@ -473,7 +473,7 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
urlPath := rcp.NodeUrl.Url + nodeMessage.Path

// set context with timeout
connectCtx, cancel := rcp.NodeUrl.LowerContextTimeout(ctx, chainMessage, rcp.averageBlockTime)
connectCtx, cancel := rcp.CapTimeoutForSend(ctx, chainMessage)
defer cancel()

req, err := http.NewRequestWithContext(connectCtx, connectionTypeSlected, rcp.NodeUrl.AuthConfig.AddAuthPath(urlPath), msgBuffer)
Expand All @@ -496,7 +496,7 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},

if debug {
utils.LavaFormatDebug("provider sending node message",
utils.Attribute{Key: "method", Value: nodeMessage.Path},
utils.Attribute{Key: "_method", Value: nodeMessage.Path},
utils.Attribute{Key: "headers", Value: req.Header},
utils.Attribute{Key: "apiInterface", Value: "rest"},
)
Expand Down
6 changes: 3 additions & 3 deletions protocol/chainlib/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestRestChainProxy(t *testing.T) {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"block": { "header": {"height": "244591"}}}`)
})
chainParser, chainProxy, chainFetcher, closeServer, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, serverHandler, "../../", nil)
chainParser, chainProxy, chainFetcher, closeServer, _, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, serverHandler, "../../", nil)
require.NoError(t, err)
require.NotNil(t, chainParser)
require.NotNil(t, chainProxy)
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestParsingRequestedBlocksHeadersRest(t *testing.T) {
fmt.Fprint(w, `{"block": { "header": {"height": "244591"}}}`)
}
})
chainParser, chainRouter, _, closeServer, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, serverHandler, "../../", nil)
chainParser, chainRouter, _, closeServer, _, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, serverHandler, "../../", nil)
require.NoError(t, err)
defer func() {
if closeServer != nil {
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestSettingRequestedBlocksHeadersRest(t *testing.T) {
}
fmt.Fprint(w, `{"block": { "header": {"height": "244591"}}}`)
})
chainParser, chainRouter, _, closeServer, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, serverHandler, "../../", nil)
chainParser, chainRouter, _, closeServer, _, err := CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, serverHandler, "../../", nil)
require.NoError(t, err)
defer func() {
if closeServer != nil {
Expand Down
Loading
Loading