Skip to content

Commit

Permalink
Added store cid using ipfs get
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Feb 21, 2024
1 parent e1f8b69 commit 2591235
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
28 changes: 25 additions & 3 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Mess
return p.an.ValidateAnnouncement(ctx, id, msg, status, exists)
}

func (p *Blox) storeCidIPFS(ctx context.Context, c path.Path) error {
getCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err := p.rpc.Block().Get(getCtx, c)
if err != nil {
log.Errorw("It seems that the link is not found", "c", c, "err", err)
return err
}
pinCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = p.rpc.Pin().Add(pinCtx, c)
if err != nil {
log.Errorw("It seems that the link is found but not pinned", "c", c, "err", err)
return err
}
return nil
}

func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
exists, err := p.Has(ctx, l)
if err != nil {
Expand All @@ -130,6 +148,8 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
log.Errorw("And Error happened in StoreCid", "err", err)
return err
}
cidLink := l.(cidlink.Link).Cid
cidPath := path.FromCid(cidLink)
// Iterate over the providers and ping
for _, provider := range providers {
if provider.ID != p.h.ID() {
Expand All @@ -151,17 +171,19 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
err = p.ex.PullBlock(ctx, provider.ID, l)
if err != nil {
log.Errorw("Error happened in pulling from provider", "l", l, "err", err)
continue
err := p.storeCidIPFS(ctx, cidPath)
if err != nil {
continue
}
}
cidLink := l.(cidlink.Link).Cid
cidPath := path.FromCid(cidLink)
statCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel() // Ensures resources are cleaned up after the Stat call
stat, err := p.rpc.Block().Stat(statCtx, cidPath)
if err != nil {
log.Errorw("It seems that the link is not stored", "l", l, "err", err)
continue
}
p.ex.IpniNotifyLink(l)
log.Debugw("link might be successfully stored", "l", l, "from", provider.ID, "size", stat.Size())
return nil
} else {
Expand Down
14 changes: 7 additions & 7 deletions exchange/hub_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,35 +67,35 @@ func (p *hubPublisher) Start(_ context.Context) error {
maybePublish := func() {
remaining := len(unpublished)
if remaining == 0 {
log.Debug("No remaining entries to publish")
log.Debug("hubPublisher: No remaining entries to publish")
return
}
if publishing.Load() {
log.Debugw("IPNI publishing in progress", "remaining", remaining)
log.Debugw("hubPublisher: IPNI publishing in progress", "remaining", remaining)
return
}
log.Debugw("Attempting to publish links to IPNI", "count", remaining)
log.Debugw("hubPublisher: Attempting to publish links to IPNI", "count", remaining)
mhs := make([]multihash.Multihash, 0, remaining)
for c := range unpublished {
mhs = append(mhs, c.Hash())
delete(unpublished, c)
}
publishing.Store(true)
go func(entries []multihash.Multihash) {
log.Debug("IPNI publish attempt in progress...")
log.Debug("hubPublisher: IPNI publish attempt in progress...")
defer func() {
publishing.Store(false)
log.Debug("Finished attempt to publish to IPNI.")
log.Debug("hubPublisher: Finished attempt to publish to IPNI.")
}()
if err := p.publish(entries); err != nil {
log.Errorw("Failed to publish to IPNI", "entriesCount", len(mhs), "err", err)
log.Errorw("hubPublisher: Failed to publish to IPNI", "entriesCount", len(mhs), "err", err)
}
}(mhs)
}
for {
select {
case <-p.ctx.Done():
log.Infow("IPNI publisher stopped", "remainingLinks", len(unpublished))
log.Infow("hubPublisher: IPNI publisher stopped", "remainingLinks", len(unpublished))
return
case <-p.ipniPublishTicker.C:
maybePublish()
Expand Down

0 comments on commit 2591235

Please sign in to comment.