Skip to content

Commit

Permalink
check if manifests are avaialbe before storing pinned blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Mar 27, 2024
1 parent cf6f554 commit 41e6e5f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
56 changes: 56 additions & 0 deletions blockchain/bl_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"strconv"
"time"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -84,6 +85,61 @@ func (bl *FxBlockchain) ManifestStore(ctx context.Context, to peer.ID, r Manifes
}
}

func (bl *FxBlockchain) HandleManifestAvailableBatch(ctx context.Context, poolIDString string, account string, links []ipld.Link) ([]ipld.Link, error) {
var availableLinks []ipld.Link
poolID, err := strconv.Atoi(poolIDString)
if err != nil {
return nil, fmt.Errorf("invalid pool ID: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(bl.timeout))
defer cancel()

var cids []string
for _, link := range links {
cids = append(cids, link.String())
}

reqBody := ReplicateRequest{
Cids: cids,
Account: account,
PoolID: poolID,
}
log.Debugw("HandleManifestAvailableBatch", "reqBody", reqBody)
req, err := json.Marshal(reqBody)
if err != nil {
return nil, fmt.Errorf("failed to serialize request: %w", err)
}

response, statusCode, err := bl.callBlockchain(ctx, "POST", actionManifestAvailableBatch, req)
if err != nil {
return nil, fmt.Errorf("blockchain call failed: %w", err)
}
if statusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", statusCode)
}

var resp ReplicateResponse
if err := json.Unmarshal(response, &resp); err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
log.Debugw("HandleManifestAvailableBatch", "resp", resp)

// Filter for available manifests.
for _, manifest := range resp.Manifests {
if manifest.ReplicationAvailable > 0 {
c, err := cid.Decode(manifest.Cid)
if err != nil {
// Log or handle the error based on your application's logging strategy.
continue // Skipping invalid CIDs.
}
availableLinks = append(availableLinks, cidlink.Link{Cid: c})
}
}

return availableLinks, nil
}

func (bl *FxBlockchain) HandleManifestBatchStore(ctx context.Context, poolIDString string, links []ipld.Link) ([]string, error) {
var linksString []string
for _, link := range links {
Expand Down
15 changes: 11 additions & 4 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,16 +727,23 @@ func (p *Blox) Start(ctx context.Context) error {

// Call HandleManifestBatchStore method
if len(storedLinks) > 0 {
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
// Check if the manifests are available to be stored or not
availableLinks, err := p.bl.HandleManifestAvailableBatch(shortCtx, p.topicName, nodeAccount, storedLinks)
if err != nil {
log.Errorw("Error checking available manifests", "err", err)
continue // Or handle the error appropriately
}
// If available then submit to store
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, availableLinks)
if strings.Contains(err.Error(), "AccountAlreadyStorer") {
// Log the occurrence of the specific error but do not continue
log.Warnw("Attempt to store with an account that is already a storer", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
log.Warnw("Attempt to store with an account that is already a storer", "err", err, "p.topicName", p.topicName, "availableLinks", availableLinks)
} else if strings.Contains(err.Error(), "Transaction is outdated") {
continue
} else {
// For any other error, log and continue
log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
p.UpdateFailedCids(storedLinks)
log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "availableLinks", availableLinks)
p.UpdateFailedCids(availableLinks)
//continue
}
}
Expand Down

0 comments on commit 41e6e5f

Please sign in to comment.