Skip to content

Commit

Permalink
Added ListRecentCidsAsStringWithChildren
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Mar 5, 2024
1 parent 5da5ddb commit 11cfcf7
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 3 deletions.
5 changes: 4 additions & 1 deletion blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,16 @@ func (bl *FxBlockchain) handleReplicateInPool(method string, action string, from

pCtx, pCancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout))
defer pCancel()
pinOptions := ipfsClusterClientApi.PinOptions{
Mode: 0,
}
for i := 0; i < len(res.Manifests); i++ {
c, err := cid.Decode(res.Manifests[i].Cid)
if err != nil {
log.Errorw("Error decoding CID:", "err", err)
continue // Or handle the error appropriately
}
replicationRes, err := bl.ipfsClusterApi.Pin(pCtx, ipfsClusterClientApi.NewCid(c), ipfsClusterClientApi.PinOptions{})
replicationRes, err := bl.ipfsClusterApi.Pin(pCtx, ipfsClusterClientApi.NewCid(c), pinOptions)
if err != nil {
log.Errorw("Error pinning CID:", "err", err)
continue
Expand Down
63 changes: 63 additions & 0 deletions mobile/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@ import (
"bytes"
"context"
"errors"
"fmt"
"strings"

"github.com/functionland/go-fula/blockchain"
"github.com/functionland/go-fula/exchange"
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
_ "github.com/ipld/go-ipld-prime/codec/raw"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
ipldmc "github.com/ipld/go-ipld-prime/multicodec"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mr-tron/base58"
Expand All @@ -35,6 +41,7 @@ import (
// * Any struct type, all of whose exported methods have supported function types and all of whose exported fields have supported types.

var rootDatastoreKey = datastore.NewKey("/")
var exploreAllRecursivelySelector selector.Selector

Check failure on line 44 in mobile/client.go

View workflow job for this annotation

GitHub Actions / All

var exploreAllRecursivelySelector is unused (U1000)

type Client struct {
h host.Host
Expand Down Expand Up @@ -243,6 +250,62 @@ func (c *Client) ListRecentCidsAsString() (*StringIterator, error) {
return &StringIterator{links: links}, nil
}

func (c *Client) ListRecentCidsAsStringWithChildren() (*StringIterator, error) {
ctx := context.TODO()
recentLinks, err := c.listRecentCids(ctx)
if err != nil {
return nil, err
}

ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
ss := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
))
exploreAllRecursivelySelector, err := ss.Selector()
if err != nil {
return nil, fmt.Errorf("failed to parse IPLD built-in selector: %v", err)
}

cidSet := make(map[string]struct{}) // Use a map to track unique CIDs
for _, link := range recentLinks {
node, err := c.ls.Load(ipld.LinkContext{Ctx: ctx}, link, basicnode.Prototype.Any)
if err != nil {
return nil, err
}

progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: c.ls,
LinkTargetNodePrototypeChooser: bsfetcher.DefaultPrototypeChooser,
LinkVisitOnlyOnce: true,
},
}

err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, visitedNode datamodel.Node) error {
link, err := c.ls.ComputeLink(link.Prototype(), visitedNode)
if err != nil {
return err
}
cidStr := link.String()
cidSet[cidStr] = struct{}{} // Add CID string to the set
return nil
})
if err != nil {
return nil, err
}
}

// Convert the map keys to a slice
var uniqueCidStrings []string
for cidStr := range cidSet {
uniqueCidStrings = append(uniqueCidStrings, cidStr)
}

return &StringIterator{links: uniqueCidStrings}, nil // Return StringIterator with unique CIDs
}

func (c *Client) ClearCidsFromRecent(cidsBytes []byte) error {
ctx := context.TODO()

Expand Down
72 changes: 70 additions & 2 deletions mobile/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ func Example_poolExchangeDagBetweenClientBlox() {
fmt.Printf("Stored raw data link: %s\n", c.String())
log.Infof("Stored raw data link: %s", c.String())

recentCids, err := c1.ListRecentCidsAsString()
recentCids, err := c1.ListRecentCidsAsStringWithChildren()
if err != nil {
log.Errorw("Error happened in ListRecentCidsAsString", "err", err)
log.Errorw("Error happened in ListRecentCidsAsStringWithChildre", "err", err)
panic(err)
}
for recentCids.HasNext() {
Expand Down Expand Up @@ -471,6 +471,74 @@ func Example_poolExchangeDagBetweenClientBlox() {
// Fetched Val is: some raw data
}

func Example_listRecentCidsWithChildren() {
// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "debug"); err != nil {
log.Error("Error happened in logging.SetLogLevel")
panic(err)
}

mcfg := fulamobile.NewConfig()
mcfg.AllowTransientConnection = true
mcfg.BloxAddr = ""
mcfg.PoolName = "1"
mcfg.Exchange = "noop"
mcfg.BlockchainEndpoint = ""

c1, err := fulamobile.NewClient(mcfg)
if err != nil {
log.Errorw("Error happened in fulamobile.NewClient", "err", err)
panic(err)
}
// Authorize exchange between the two nodes
mobilePeerIDString := c1.ID()
log.Infof("first client created with ID: %s", mobilePeerIDString)
mpid, err := peer.Decode(mobilePeerIDString)
if err != nil {
log.Errorw("Error happened in peer.Decode", "err", err)
panic(err)
}
log.Infof("mpid is %s", mpid)

rawData := []byte("some raw data")
fmt.Printf("Original Val is: %s\n", string(rawData))
rawCodec := int64(0x55)
linkBytes, err := c1.Put(rawData, rawCodec)
if err != nil {
fmt.Printf("Error storing the raw data: %v", err)
return
}
c, err := cid.Cast(linkBytes)
if err != nil {
fmt.Printf("Error casting bytes to CID: %v", err)
return
}
fmt.Printf("Stored raw data link: %s\n", c.String())
log.Infof("Stored raw data link: %s", c.String())

recentCids, err := c1.ListRecentCidsAsStringWithChildren()
if err != nil {
log.Errorw("Error happened in ListRecentCidsAsStringWithChildren", "err", err)
panic(err)
}
for recentCids.HasNext() {
cid, err := recentCids.Next()
if err != nil {
fmt.Printf("Error retrieving next CID: %v", err)
log.Errorf("Error retrieving next CID: %v", err)
// Decide if you want to break or continue based on your error handling strategy
break
}
fmt.Printf("recentCid link: %s\n", cid) // Print each CID
log.Infof("recentCid link: %s", cid)
}

// Output:
// Original Val is: some raw data
// Stored raw data link: bafkr4ifmwbmdrkxep3mci37ionvgturlylvganap4ch7ouia2ui5tmr4iy
// recentCid link: bafkr4ifmwbmdrkxep3mci37ionvgturlylvganap4ch7ouia2ui5tmr4iy
}

func Example_poolExchangeLargeDagBetweenClientBlox() {
server := startMockServer("127.0.0.1:4004")
defer func() {
Expand Down
29 changes: 29 additions & 0 deletions mobile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,32 @@ func (c *Client) listRecentCidsAsString(ctx context.Context) ([]string, error) {
}
return links, nil
}

func (c *Client) listRecentCids(ctx context.Context) ([]ipld.Link, error) {
q := query.Query{
KeysOnly: true,
Prefix: recentCidKeyPrefix.String(),
}
results, err := c.ds.Query(ctx, q)
if err != nil {
return nil, err
}
defer results.Close()

var links []ipld.Link
for r := range results.Next() {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if r.Error != nil {
return nil, r.Error
}
key := datastore.RawKey(r.Key)
c, err := cid.Decode(key.BaseNamespace())
if err != nil {
return nil, err
}
links = append(links, cidlink.Link{Cid: c})
}
return links, nil
}

0 comments on commit 11cfcf7

Please sign in to comment.