Skip to content

Commit

Permalink
Fix race condition while running cache service on provider process. (#…
Browse files Browse the repository at this point in the history
…1060)

* Fix race condition while running cache service on provider process.

* fix lint

* add version upgrade
  • Loading branch information
ranlavanet authored Dec 20, 2023
1 parent 2748a34 commit a3e2937
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 39 deletions.
13 changes: 3 additions & 10 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/lavanet/lava/protocol/parser"
"github.com/lavanet/lava/protocol/performance"
"github.com/lavanet/lava/utils"
"github.com/lavanet/lava/utils/protocopy"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
spectypes "github.com/lavanet/lava/x/spec/types"
)
Expand Down Expand Up @@ -84,18 +83,12 @@ func (cf *ChainFetcher) Validate(ctx context.Context) error {
}

func (cf *ChainFetcher) populateCache(relayData *pairingtypes.RelayPrivateData, reply *pairingtypes.RelayReply, requestedBlockHash []byte, finalized bool) {
if requestedBlockHash != nil || finalized {
copyPrivateData := &pairingtypes.RelayPrivateData{}
copyErr := protocopy.DeepCopyProtoObject(relayData, copyPrivateData)
if copyErr != nil {
utils.LavaFormatError("Failed Copying relay private data on populateCache", copyErr)
return
}
if cf.cache.CacheActive() && (requestedBlockHash != nil || finalized) {
new_ctx := context.Background()
new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease)
defer cancel()
err := cf.cache.SetEntry(new_ctx, copyPrivateData, requestedBlockHash, cf.endpoint.ChainID, reply, finalized, "", nil)
if err != nil && !performance.NotInitialisedError.Is(err) {
err := cf.cache.SetEntry(new_ctx, relayData, requestedBlockHash, cf.endpoint.ChainID, reply, finalized, "", nil)
if err != nil {
utils.LavaFormatWarning("chain fetcher error updating cache with new entry", err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions protocol/performance/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (cache *Cache) GetEntry(ctx context.Context, request *pairingtypes.RelayPri
return cache.client.GetRelay(ctx, &pairingtypes.RelayCacheGet{Request: request, BlockHash: blockHash, ChainID: chainID, Finalized: finalized, Provider: provider})
}

// CacheActive reports whether a cache backend is configured and usable.
// A nil receiver means no cache service was wired up, so callers must skip
// cache reads/writes (SetEntry itself no-ops on a nil receiver).
func (cache *Cache) CacheActive() bool {
	// BUG FIX: the previous implementation returned `cache == nil`, which
	// inverted the predicate — a missing cache was reported as "active"
	// while a live one was reported inactive, disabling every
	// `if cache.CacheActive()` guard at the call sites.
	return cache != nil
}

func (cache *Cache) SetEntry(ctx context.Context, request *pairingtypes.RelayPrivateData, blockHash []byte, chainID string, reply *pairingtypes.RelayReply, finalized bool, provider string, optionalMetadata []pairingtypes.Metadata) error {
if cache == nil {
// TODO: try to connect again once in a while
Expand Down
49 changes: 27 additions & 22 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,28 +507,33 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
)
}
errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(chainMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(relayTimeout), expectedBH, numOfProviders, pairingAddressesLen, chainMessage.GetApi().Category.HangingApi) // session done successfully
// copy private data so if it changes it doesn't panic mid async send
copyPrivateData := &pairingtypes.RelayPrivateData{}
copyErr := protocopy.DeepCopyProtoObject(localRelayResult.Request.RelayData, copyPrivateData)
// set cache in a nonblocking call
go func() {
// deal with copying error.
if copyErr != nil {
utils.LavaFormatError("Failed copying relay private data sendRelayToProvider", copyErr)
return
}
requestedBlock, _ := chainMessage.RequestedBlock()
if requestedBlock == spectypes.NOT_APPLICABLE {
return
}
new_ctx := context.Background()
new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease)
defer cancel()
err2 := rpccs.cache.SetEntry(new_ctx, copyPrivateData, nil, chainID, localRelayResult.Reply, localRelayResult.Finalized, localRelayResult.Request.RelaySession.Provider, nil) // caching in the portal doesn't care about hashes
if err2 != nil && !performance.NotInitialisedError.Is(err2) {
utils.LavaFormatWarning("error updating cache with new entry", err2)
}
}()

if rpccs.cache.CacheActive() {
// copy private data so if it changes it doesn't panic mid async send
copyPrivateData := &pairingtypes.RelayPrivateData{}
copyRequestErr := protocopy.DeepCopyProtoObject(localRelayResult.Request.RelayData, copyPrivateData)
copyReply := &pairingtypes.RelayReply{}
copyReplyErr := protocopy.DeepCopyProtoObject(localRelayResult.Reply, copyReply)
// set cache in a non blocking call
go func() {
// deal with copying error.
if copyRequestErr != nil || copyReplyErr != nil {
utils.LavaFormatError("Failed copying relay private data sendRelayToProvider", nil, utils.LogAttr("copyReplyErr", copyReplyErr), utils.LogAttr("copyRequestErr", copyRequestErr))
return
}
requestedBlock, _ := chainMessage.RequestedBlock()
if requestedBlock == spectypes.NOT_APPLICABLE {
return
}
new_ctx := context.Background()
new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease)
defer cancel()
err2 := rpccs.cache.SetEntry(new_ctx, copyPrivateData, nil, chainID, copyReply, localRelayResult.Finalized, localRelayResult.Request.RelaySession.Provider, nil) // caching in the portal doesn't care about hashes
if err2 != nil {
utils.LavaFormatWarning("error updating cache with new entry", err2)
}
}()
}
}(providerPublicAddress, sessionInfo)
}

Expand Down
15 changes: 9 additions & 6 deletions protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,19 +684,22 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty
}
reply.Metadata, _, ignoredMetadata = rpcps.chainParser.HandleHeaders(reply.Metadata, chainMsg.GetApiCollection(), spectypes.Header_pass_reply)
// TODO: use overwriteReqBlock on the reply metadata to set the correct latest block
if requestedBlockHash != nil || finalized {
if cache.CacheActive() && (requestedBlockHash != nil || finalized) {
// copy request and reply as they change later on and we call SetEntry in a routine.
copyPrivateData := &pairingtypes.RelayPrivateData{}
copyErr := protocopy.DeepCopyProtoObject(request.RelayData, copyPrivateData)
copyRequestErr := protocopy.DeepCopyProtoObject(request.RelayData, copyPrivateData)
copyReply := &pairingtypes.RelayReply{}
copyReplyErr := protocopy.DeepCopyProtoObject(reply, copyReply)
go func() {
if copyErr != nil {
utils.LavaFormatError("Failed copying relay private data on TryRelay", copyErr)
if copyRequestErr != nil || copyReplyErr != nil {
utils.LavaFormatError("Failed copying relay private data on TryRelay", nil, utils.LogAttr("copyReplyErr", copyReplyErr), utils.LogAttr("copyRequestErr", copyRequestErr))
return
}
new_ctx := context.Background()
new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease)
defer cancel()
err := cache.SetEntry(new_ctx, copyPrivateData, requestedBlockHash, rpcps.rpcProviderEndpoint.ChainID, reply, finalized, rpcps.providerAddress.String(), ignoredMetadata)
if err != nil && !performance.NotInitialisedError.Is(err) && request.RelaySession.Epoch != spectypes.NOT_APPLICABLE {
err := cache.SetEntry(new_ctx, copyPrivateData, requestedBlockHash, rpcps.rpcProviderEndpoint.ChainID, copyReply, finalized, rpcps.providerAddress.String(), ignoredMetadata)
if err != nil && request.RelaySession.Epoch != spectypes.NOT_APPLICABLE {
utils.LavaFormatWarning("error updating cache with new entry", err, utils.Attribute{Key: "GUID", Value: ctx})
}
}()
Expand Down
2 changes: 1 addition & 1 deletion x/protocol/types/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var _ paramtypes.ParamSet = (*Params)(nil)

const (
TARGET_VERSION = "0.31.3"
TARGET_VERSION = "0.31.4"
MIN_VERSION = "0.30.1"
)

Expand Down

0 comments on commit a3e2937

Please sign in to comment.