fix seen block bug.
ranlavanet committed Mar 17, 2024
1 parent 6355b80 commit d1863fc
Showing 5 changed files with 176 additions and 83 deletions.
139 changes: 98 additions & 41 deletions ecosystem/cache/handlers.go
@@ -1,3 +1,20 @@
// ============= GET =============
// if seen block >= requested block && requested_block > 0 -> no need to fetch seen block in advance of get.
// if requested_block < 0 -> parse block, replace requested block,
// * check if seen_block >= replaced_requested_block -> no need to fetch seen and get.

// else fetch seen

// if seen >= requested_block -> seen_for_hash = requested (for key calculation)
// if seen < requested_block, seen_for_hash = 0

// fetch with the new key, check if result.seen_block >= my_seen ==> return hit
// else, the result is bad. cache miss.

// ============ SET =========
// if seen >= requested_block -> seen_for_hash = requested (for key calculation)
// if seen < requested_block, seen_for_hash = 0

package cache

import (
@@ -84,77 +101,118 @@ func (s *RelayerCacheServer) getSeenBlockForSharedStateMode(chainId string, shar

func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) {
cacheReply := &pairingtypes.CacheRelayReply{}
var cacheReplyTmp *pairingtypes.CacheRelayReply
var err error
var seenBlock int64

waitGroup := sync.WaitGroup{}
waitGroup.Add(2) // currently we have two groups getRelayInner and getSeenBlock
requestedBlock := relayCacheGet.RequestedBlock // save requested block
originalRequestedBlock := relayCacheGet.RequestedBlock // save requested block prior to swap
if originalRequestedBlock < 0 { // we need to fetch stored latest block information.
getLatestBlock := s.getLatestBlock(latestBlockKey(relayCacheGet.ChainId, ""))
relayCacheGet.RequestedBlock = lavaprotocol.ReplaceRequestedBlock(originalRequestedBlock, getLatestBlock)
}

// fetch all reads at the same time.
go func() {
defer waitGroup.Done()
var cacheReplyTmp *pairingtypes.CacheRelayReply
utils.LavaFormatDebug("Got Cache Get", utils.Attribute{Key: "request_hash", Value: string(relayCacheGet.RequestHash)},
utils.Attribute{Key: "finalized", Value: relayCacheGet.Finalized},
utils.Attribute{Key: "requested_block", Value: originalRequestedBlock},
utils.Attribute{Key: "block_hash", Value: relayCacheGet.BlockHash},
utils.Attribute{Key: "requested_block_parsed", Value: relayCacheGet.RequestedBlock},
utils.Attribute{Key: "seen_block", Value: relayCacheGet.SeenBlock},
)

// check whether the seen block is already >= our requested block; if so, we don't need to fetch the seen block before the get, as it's already at least as new as the requested block
if relayCacheGet.SeenBlock >= relayCacheGet.RequestedBlock {
waitGroup := sync.WaitGroup{}
waitGroup.Add(2) // currently we have two groups getRelayInner and getSeenBlock
// fetch all reads at the same time.
go func() {
defer waitGroup.Done()
cacheReplyTmp, err = s.getRelayInner(relayCacheGet)
if cacheReplyTmp != nil {
cacheReply = cacheReplyTmp // set cache reply only if it's not nil, as we need to store the seen block in it.
}
}()
go func() {
defer waitGroup.Done()
// set seen block if required
seenBlock = s.getSeenBlockForSharedStateMode(relayCacheGet.ChainId, relayCacheGet.SharedStateId)
}()
// wait for all reads to complete before moving forward
waitGroup.Wait()
} else {
// our seen block might change our cache key value when shared state is enabled; we need to fetch it before moving forward
// fetch seen block prior to cache
seenBlock = s.getSeenBlockForSharedStateMode(relayCacheGet.ChainId, relayCacheGet.SharedStateId)
if seenBlock > relayCacheGet.SeenBlock {
relayCacheGet.SeenBlock = seenBlock // update state.
}
// now that we have our updated seen block state we can fetch the cache entry
cacheReplyTmp, err = s.getRelayInner(relayCacheGet)
if cacheReplyTmp != nil {
cacheReply = cacheReplyTmp // set cache reply only if it's not nil, as we need to store the seen block in it.
}
}()
go func() {
defer waitGroup.Done()
// set seen block if required
seenBlock = s.getSeenBlockForSharedStateMode(relayCacheGet.ChainId, relayCacheGet.SharedStateId)
}()
}

// ============= GET =============
// if seen block >= requested block && requested_block > 0 -> no need to fetch seen block in advance of get.
// if requested_block < 0 -> parse block, replace requested block,
// * check if seen_block >= replaced_requested_block -> no need to fetch seen and get.

// else fetch seen

// if seen >= requested_block -> seen_for_hash = requested (for key calculation)
// if seen < requested_block, seen_for_hash = 0

// fetch with the new key, check if result.seen_block >= my_seen ==> return hit
// else, the result is bad. cache miss.

// ============ SET =========
// if seen >= requested_block -> seen_for_hash = requested (for key calculation)
// if seen < requested_block, seen_for_hash = 0

// wait for all reads to complete before moving forward
waitGroup.Wait()
// set seen block.
if seenBlock > cacheReply.SeenBlock {
cacheReply.SeenBlock = seenBlock
}

// add prometheus metrics asynchronously
go func() {
cacheMetricsContext, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var hit bool
if err != nil {
s.cacheMiss(ctx, err)
s.cacheMiss(cacheMetricsContext, err)
} else {
hit = true
s.cacheHit(ctx)
s.cacheHit(cacheMetricsContext)
}
s.CacheServer.CacheMetrics.AddApiSpecific(requestedBlock, relayCacheGet.ChainId, hit)
s.CacheServer.CacheMetrics.AddApiSpecific(originalRequestedBlock, relayCacheGet.ChainId, hit)
}()
return cacheReply, err
}

// add the requested block to the hash key information so we hit the right block.
func (s *RelayerCacheServer) formatHashKey(hash []byte, latestBlock int64) []byte {
// Convert the int64 number to a byte array
numBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(numBytes, uint64(latestBlock))

// Append the int64 byte array to the hash byte array
hash = append(hash, numBytes...)
// formatHashKey formats the hash key by adding latestBlock and seenBlock information.
// If seenBlock is greater than or equal to latestBlock, seenBlock is set to latestBlock for key calculation,
// otherwise, it's set to 0.
func (s *RelayerCacheServer) formatHashKey(hash []byte, latestBlock int64, seenBlock int64) []byte {
// Handle seenBlock according to the specified rules
if seenBlock >= latestBlock {
seenBlock = latestBlock
} else {
seenBlock = 0
}
// Append the latestBlock and seenBlock directly to the hash using little-endian encoding
hash = binary.LittleEndian.AppendUint64(hash, uint64(latestBlock))
hash = binary.LittleEndian.AppendUint64(hash, uint64(seenBlock))
return hash
}

func (s *RelayerCacheServer) getRelayInner(relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) {
requestedBlock := relayCacheGet.RequestedBlock
// get the latest block per chain, on consumer side .Provider is empty,
// on the provider side it contains the provider public key to distinguish between different providers using the same cache service.
getLatestBlock := s.getLatestBlock(latestBlockKey(relayCacheGet.ChainId, ""))
relayCacheGet.RequestedBlock = lavaprotocol.ReplaceRequestedBlock(requestedBlock, getLatestBlock)
// cache key is compressed from:
// 1. Request hash including all the information inside RelayPrivateData (Salt can cause issues if not dealt with on consumer side.)
// 2. chain-id (same requests for different chains should get unique results)
// 3. provider address, different providers can return different results on non-deterministic responses. we want to have consistency.
cacheKey := s.formatHashKey(relayCacheGet.RequestHash, relayCacheGet.RequestedBlock)
utils.LavaFormatDebug("Got Cache Get", utils.Attribute{Key: "cacheKey", Value: string(cacheKey)},
utils.Attribute{Key: "finalized", Value: relayCacheGet.Finalized},
utils.Attribute{Key: "requestedBlock", Value: requestedBlock},
utils.Attribute{Key: "requestHash", Value: relayCacheGet.BlockHash},
utils.Attribute{Key: "getLatestBlock", Value: relayCacheGet.RequestedBlock},
)
// 3. seen block to distinguish between seen entries and unseen entries.
cacheKey := s.formatHashKey(relayCacheGet.RequestHash, relayCacheGet.RequestedBlock, relayCacheGet.SeenBlock)

cacheVal, cache_source, found := s.findInAllCaches(relayCacheGet.Finalized, cacheKey)
// TODO: use the information when a new block is finalized
if !found {
@@ -229,8 +287,7 @@ func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairin
if relayCacheSet.RequestedBlock < 0 {
return nil, utils.LavaFormatError("invalid relay cache set data, request block is negative", nil, utils.Attribute{Key: "requestBlock", Value: relayCacheSet.RequestedBlock})
}
// TODO: make this non-blocking
cacheKey := s.formatHashKey(relayCacheSet.RequestHash, relayCacheSet.RequestedBlock)
cacheKey := s.formatHashKey(relayCacheSet.RequestHash, relayCacheSet.RequestedBlock, relayCacheSet.SeenBlock)
cacheValue := formatCacheValue(relayCacheSet.Response, relayCacheSet.BlockHash, relayCacheSet.Finalized, relayCacheSet.OptionalMetadata)
utils.LavaFormatDebug("Got Cache Set", utils.Attribute{Key: "cacheKey", Value: string(cacheKey)},
utils.Attribute{Key: "finalized", Value: fmt.Sprintf("%t", relayCacheSet.Finalized)},
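
To make the key rule in the GET/SET notes above concrete, here is a minimal, self-contained sketch. It is not the repository's code: the request hash and block numbers are made up, and only the formatHashKey logic is mirrored from the diff. A seen block at or ahead of the requested block is folded into the key as the requested block itself, while a stale seen block is folded in as 0, so two gets only share an entry when their seen state is compatible.

// Standalone sketch of the seen-block-aware key rule (pairingtypes and the
// gRPC plumbing are omitted; values are hypothetical).
package main

import (
	"encoding/binary"
	"fmt"
)

// formatHashKey mirrors the rule from handlers.go: normalize the seen block,
// then append the requested (latest) block and the normalized seen block to
// the request hash using little-endian encoding.
func formatHashKey(hash []byte, latestBlock int64, seenBlock int64) []byte {
	if seenBlock >= latestBlock {
		seenBlock = latestBlock
	} else {
		seenBlock = 0
	}
	hash = binary.LittleEndian.AppendUint64(hash, uint64(latestBlock))
	hash = binary.LittleEndian.AppendUint64(hash, uint64(seenBlock))
	return hash
}

func main() {
	requestHash := []byte("req-hash") // hypothetical request hash
	const requestedBlock = 100

	keyA := formatHashKey(append([]byte{}, requestHash...), requestedBlock, 150) // seen ahead of requested
	keyB := formatHashKey(append([]byte{}, requestHash...), requestedBlock, 100) // seen exactly at requested
	keyC := formatHashKey(append([]byte{}, requestHash...), requestedBlock, 90)  // seen behind requested

	fmt.Println(string(keyA) == string(keyB)) // true: both normalize the seen block to 100
	fmt.Println(string(keyA) == string(keyC)) // false: the stale seen block normalizes to 0
}

Under these assumptions, keyA and keyB collide (a hit), while keyC maps to a different entry, matching the "seen_for_hash" rule described in the comments.
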
1 change: 1 addition & 0 deletions proto/lavanet/lava/pairing/relayCache.proto
@@ -36,6 +36,7 @@ message RelayCacheGet {
int64 requested_block = 4;
string shared_state_id = 5; // empty id for no shared state
string chain_id = 6; // used to set latest block per chain.
int64 seen_block = 7;
}

message RelayCacheSet {
3 changes: 1 addition & 2 deletions protocol/chainlib/chain_fetcher.go
@@ -489,8 +489,7 @@ func HashCacheRequestInner(relayData *pairingtypes.RelayPrivateData, chainId str
inputFormatter, outputFormatter := formatter.FormatterForRelayRequestAndResponse(relayData.ApiInterface)
relayData.Data = inputFormatter(relayData.Data) // remove id from request.
relayData.Salt = nil // remove salt
// TODO: Do we need to set this to 0? or in some cases the data is no longer relevant and we need to set it to a value
relayData.SeenBlock = 0 // remove seen block changes
relayData.SeenBlock = 0 // remove seen block
// we remove the discrepancy of requested block from the hash, and add it on the cache side instead
// this is due to the fact that we don't know the latest seen block at this moment, as on shared state
// only the cache has this information. we make sure the hashing at this stage does not include the requested block.
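
The comment in this hunk is the crux of the shared-state design: the request hash must be identical across consumers regardless of their individual seen-block state, so SeenBlock (like Salt) is stripped before hashing and the seen block is reintroduced only on the cache side through the key. A rough sketch with a hypothetical relayData struct (the real type is the generated pairingtypes.RelayPrivateData, and the real hashing is not JSON-based) illustrates the idea:

// Sketch only: shows that zeroing Salt and SeenBlock before hashing makes two
// consumers with different seen blocks produce the same request hash.
package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

type relayData struct {
	Data      []byte
	Salt      []byte
	SeenBlock int64
}

func hashRequest(d relayData) [32]byte {
	d.Salt = nil    // remove salt, it differs per request
	d.SeenBlock = 0 // remove seen block, it differs per consumer state
	raw, err := json.Marshal(d)
	if err != nil {
		panic(err)
	}
	return sha256.Sum256(raw)
}

func main() {
	a := relayData{Data: []byte(`{"method":"eth_getBalance"}`), Salt: []byte{1}, SeenBlock: 90}
	b := relayData{Data: []byte(`{"method":"eth_getBalance"}`), Salt: []byte{2}, SeenBlock: 150}
	fmt.Println(hashRequest(a) == hashRequest(b)) // true: same cache-relevant content
}

If SeenBlock were left in the hashed payload, the two requests above would hash differently and shared-state consumers could never hit each other's cache entries.
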
1 change: 1 addition & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
@@ -455,6 +455,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
BlockHash: nil,
Finalized: false,
SharedStateId: sharedStateId,
SeenBlock: relayRequestData.SeenBlock,
}) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet
cancel()
reply := cacheReply.GetReply()