From 41e6e5f57981688386a094957c8600dcb464fa02 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Wed, 27 Mar 2024 19:14:53 -0400 Subject: [PATCH] check if manifests are avaialbe before storing pinned blocks --- blockchain/bl_manifest.go | 56 +++++++++++++++++++++++++++++++++++++++ blox/blox.go | 15 ++++++++--- 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/blockchain/bl_manifest.go b/blockchain/bl_manifest.go index 021ff56..9d578ac 100644 --- a/blockchain/bl_manifest.go +++ b/blockchain/bl_manifest.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "strconv" + "time" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" @@ -84,6 +85,61 @@ func (bl *FxBlockchain) ManifestStore(ctx context.Context, to peer.ID, r Manifes } } +func (bl *FxBlockchain) HandleManifestAvailableBatch(ctx context.Context, poolIDString string, account string, links []ipld.Link) ([]ipld.Link, error) { + var availableLinks []ipld.Link + poolID, err := strconv.Atoi(poolIDString) + if err != nil { + return nil, fmt.Errorf("invalid pool ID: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(bl.timeout)) + defer cancel() + + var cids []string + for _, link := range links { + cids = append(cids, link.String()) + } + + reqBody := ReplicateRequest{ + Cids: cids, + Account: account, + PoolID: poolID, + } + log.Debugw("HandleManifestAvailableBatch", "reqBody", reqBody) + req, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to serialize request: %w", err) + } + + response, statusCode, err := bl.callBlockchain(ctx, "POST", actionManifestAvailableBatch, req) + if err != nil { + return nil, fmt.Errorf("blockchain call failed: %w", err) + } + if statusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", statusCode) + } + + var resp ReplicateResponse + if err := json.Unmarshal(response, &resp); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + log.Debugw("HandleManifestAvailableBatch", "resp", resp) + + // Filter for available manifests. + for _, manifest := range resp.Manifests { + if manifest.ReplicationAvailable > 0 { + c, err := cid.Decode(manifest.Cid) + if err != nil { + // Log or handle the error based on your application's logging strategy. + continue // Skipping invalid CIDs. + } + availableLinks = append(availableLinks, cidlink.Link{Cid: c}) + } + } + + return availableLinks, nil +} + func (bl *FxBlockchain) HandleManifestBatchStore(ctx context.Context, poolIDString string, links []ipld.Link) ([]string, error) { var linksString []string for _, link := range links { diff --git a/blox/blox.go b/blox/blox.go index 19ae714..19874eb 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -727,16 +727,23 @@ func (p *Blox) Start(ctx context.Context) error { // Call HandleManifestBatchStore method if len(storedLinks) > 0 { - _, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks) + // Check if the manifests are available to be stored or not + availableLinks, err := p.bl.HandleManifestAvailableBatch(shortCtx, p.topicName, nodeAccount, storedLinks) + if err != nil { + log.Errorw("Error checking available manifests", "err", err) + continue // Or handle the error appropriately + } + // If available then submit to store + _, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, availableLinks) if strings.Contains(err.Error(), "AccountAlreadyStorer") { // Log the occurrence of the specific error but do not continue - log.Warnw("Attempt to store with an account that is already a storer", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks) + log.Warnw("Attempt to store with an account that is already a storer", "err", err, "p.topicName", p.topicName, "availableLinks", availableLinks) } else if strings.Contains(err.Error(), "Transaction is outdated") { continue } else { // For any other error, log and continue - log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks) - p.UpdateFailedCids(storedLinks) + log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "availableLinks", availableLinks) + p.UpdateFailedCids(availableLinks) //continue } }