From 11cfcf7a1c7b8e3e6a54988756358758e8c061b9 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Tue, 5 Mar 2024 14:41:52 -0500 Subject: [PATCH] Added ListRecentCidsAsStringWithChildren --- blockchain/blockchain.go | 5 ++- mobile/client.go | 63 +++++++++++++++++++++++++++++++++++ mobile/example_test.go | 72 ++++++++++++++++++++++++++++++++++++++-- mobile/store.go | 29 ++++++++++++++++ 4 files changed, 166 insertions(+), 3 deletions(-) diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 45c53b6..a4c6743 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -639,13 +639,16 @@ func (bl *FxBlockchain) handleReplicateInPool(method string, action string, from pCtx, pCancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout)) defer pCancel() + pinOptions := ipfsClusterClientApi.PinOptions{ + Mode: 0, + } for i := 0; i < len(res.Manifests); i++ { c, err := cid.Decode(res.Manifests[i].Cid) if err != nil { log.Errorw("Error decoding CID:", "err", err) continue // Or handle the error appropriately } - replicationRes, err := bl.ipfsClusterApi.Pin(pCtx, ipfsClusterClientApi.NewCid(c), ipfsClusterClientApi.PinOptions{}) + replicationRes, err := bl.ipfsClusterApi.Pin(pCtx, ipfsClusterClientApi.NewCid(c), pinOptions) if err != nil { log.Errorw("Error pinning CID:", "err", err) continue diff --git a/mobile/client.go b/mobile/client.go index b2b79db..c7a7727 100644 --- a/mobile/client.go +++ b/mobile/client.go @@ -4,19 +4,25 @@ import ( "bytes" "context" "errors" + "fmt" "strings" "github.com/functionland/go-fula/blockchain" "github.com/functionland/go-fula/exchange" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipld/go-ipld-prime" _ "github.com/ipld/go-ipld-prime/codec/dagcbor" _ "github.com/ipld/go-ipld-prime/codec/dagjson" _ "github.com/ipld/go-ipld-prime/codec/raw" + "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" ipldmc "github.com/ipld/go-ipld-prime/multicodec" basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/mr-tron/base58" @@ -35,6 +41,7 @@ import ( // * Any struct type, all of whose exported methods have supported function types and all of whose exported fields have supported types. var rootDatastoreKey = datastore.NewKey("/") +var exploreAllRecursivelySelector selector.Selector type Client struct { h host.Host @@ -243,6 +250,62 @@ func (c *Client) ListRecentCidsAsString() (*StringIterator, error) { return &StringIterator{links: links}, nil } +func (c *Client) ListRecentCidsAsStringWithChildren() (*StringIterator, error) { + ctx := context.TODO() + recentLinks, err := c.listRecentCids(ctx) + if err != nil { + return nil, err + } + + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + ss := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion( + ssb.Matcher(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + )) + exploreAllRecursivelySelector, err := ss.Selector() + if err != nil { + return nil, fmt.Errorf("failed to parse IPLD built-in selector: %v", err) + } + + cidSet := make(map[string]struct{}) // Use a map to track unique CIDs + for _, link := range recentLinks { + node, err := c.ls.Load(ipld.LinkContext{Ctx: ctx}, link, basicnode.Prototype.Any) + if err != nil { + return nil, err + } + + progress := traversal.Progress{ + Cfg: &traversal.Config{ + Ctx: ctx, + LinkSystem: c.ls, + LinkTargetNodePrototypeChooser: bsfetcher.DefaultPrototypeChooser, + LinkVisitOnlyOnce: true, + }, + } + + err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, visitedNode datamodel.Node) error { + link, err := c.ls.ComputeLink(link.Prototype(), visitedNode) + if err != nil { + return err + } + cidStr := link.String() + cidSet[cidStr] = struct{}{} // Add CID string to the set + return nil + }) + if err != nil { + return nil, err + } + } + + // Convert the map keys to a slice + var uniqueCidStrings []string + for cidStr := range cidSet { + uniqueCidStrings = append(uniqueCidStrings, cidStr) + } + + return &StringIterator{links: uniqueCidStrings}, nil // Return StringIterator with unique CIDs +} + func (c *Client) ClearCidsFromRecent(cidsBytes []byte) error { ctx := context.TODO() diff --git a/mobile/example_test.go b/mobile/example_test.go index 24e796d..025d75a 100644 --- a/mobile/example_test.go +++ b/mobile/example_test.go @@ -396,9 +396,9 @@ func Example_poolExchangeDagBetweenClientBlox() { fmt.Printf("Stored raw data link: %s\n", c.String()) log.Infof("Stored raw data link: %s", c.String()) - recentCids, err := c1.ListRecentCidsAsString() + recentCids, err := c1.ListRecentCidsAsStringWithChildren() if err != nil { - log.Errorw("Error happened in ListRecentCidsAsString", "err", err) + log.Errorw("Error happened in ListRecentCidsAsStringWithChildre", "err", err) panic(err) } for recentCids.HasNext() { @@ -471,6 +471,74 @@ func Example_poolExchangeDagBetweenClientBlox() { // Fetched Val is: some raw data } +func Example_listRecentCidsWithChildren() { + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "debug"); err != nil { + log.Error("Error happened in logging.SetLogLevel") + panic(err) + } + + mcfg := fulamobile.NewConfig() + mcfg.AllowTransientConnection = true + mcfg.BloxAddr = "" + mcfg.PoolName = "1" + mcfg.Exchange = "noop" + mcfg.BlockchainEndpoint = "" + + c1, err := fulamobile.NewClient(mcfg) + if err != nil { + log.Errorw("Error happened in fulamobile.NewClient", "err", err) + panic(err) + } + // Authorize exchange between the two nodes + mobilePeerIDString := c1.ID() + log.Infof("first client created with ID: %s", mobilePeerIDString) + mpid, err := peer.Decode(mobilePeerIDString) + if err != nil { + log.Errorw("Error happened in peer.Decode", "err", err) + panic(err) + } + log.Infof("mpid is %s", mpid) + + rawData := []byte("some raw data") + fmt.Printf("Original Val is: %s\n", string(rawData)) + rawCodec := int64(0x55) + linkBytes, err := c1.Put(rawData, rawCodec) + if err != nil { + fmt.Printf("Error storing the raw data: %v", err) + return + } + c, err := cid.Cast(linkBytes) + if err != nil { + fmt.Printf("Error casting bytes to CID: %v", err) + return + } + fmt.Printf("Stored raw data link: %s\n", c.String()) + log.Infof("Stored raw data link: %s", c.String()) + + recentCids, err := c1.ListRecentCidsAsStringWithChildren() + if err != nil { + log.Errorw("Error happened in ListRecentCidsAsStringWithChildren", "err", err) + panic(err) + } + for recentCids.HasNext() { + cid, err := recentCids.Next() + if err != nil { + fmt.Printf("Error retrieving next CID: %v", err) + log.Errorf("Error retrieving next CID: %v", err) + // Decide if you want to break or continue based on your error handling strategy + break + } + fmt.Printf("recentCid link: %s\n", cid) // Print each CID + log.Infof("recentCid link: %s", cid) + } + + // Output: + // Original Val is: some raw data + // Stored raw data link: bafkr4ifmwbmdrkxep3mci37ionvgturlylvganap4ch7ouia2ui5tmr4iy + // recentCid link: bafkr4ifmwbmdrkxep3mci37ionvgturlylvganap4ch7ouia2ui5tmr4iy +} + func Example_poolExchangeLargeDagBetweenClientBlox() { server := startMockServer("127.0.0.1:4004") defer func() { diff --git a/mobile/store.go b/mobile/store.go index 6d07de7..fb3cbc4 100644 --- a/mobile/store.go +++ b/mobile/store.go @@ -116,3 +116,32 @@ func (c *Client) listRecentCidsAsString(ctx context.Context) ([]string, error) { } return links, nil } + +func (c *Client) listRecentCids(ctx context.Context) ([]ipld.Link, error) { + q := query.Query{ + KeysOnly: true, + Prefix: recentCidKeyPrefix.String(), + } + results, err := c.ds.Query(ctx, q) + if err != nil { + return nil, err + } + defer results.Close() + + var links []ipld.Link + for r := range results.Next() { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if r.Error != nil { + return nil, r.Error + } + key := datastore.RawKey(r.Key) + c, err := cid.Decode(key.BaseNamespace()) + if err != nil { + return nil, err + } + links = append(links, cidlink.Link{Cid: c}) + } + return links, nil +}