Skip to content

Commit

Permalink
Update blox.go
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Mar 23, 2024
1 parent 3083d5d commit c846d73
Showing 1 changed file with 56 additions and 54 deletions.
110 changes: 56 additions & 54 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,69 +658,71 @@ func (p *Blox) Start(ctx context.Context) error {
}
p.ctx, p.cancel = context.WithCancel(ctx)

// Starting a new goroutine for periodic task
log.Debug("Called wg.Add in blox.start")
p.wg.Add(1)
go func() {
log.Debug("Called wg.Done in blox.start")
defer p.wg.Done()
defer log.Debug("Start blox blox.start go routine is ending")
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// instead of FetchAvailableManifestsAndStore See what are the new manifests from ther last time we checked in blockstore
// A method that checks the last time we checked for stored blocks file
// then it checks the stored files under /uniondrive/ipfs_datastore/blocks/{folders that are not .temp}
// then for those folders that the change date/time is after the time we last checked it reads the files names that are created after the last time we checked
// thne it passes each filename to a method called GetCidv1FromBlockFilename(string filename) (string, error) and receives the cidv1 of each file and put all in an array []string
// Then it calls the method:_, err := p.bl.HandleManifestBatchStore(ctx, p.topicName, storedCids)
// If no error, it stores the time/date in a file under /internal/.last_time_ipfs_checked so that it can use it in the next run
// ListStoredBlocks, and GetCidv1FromBlockFilename
lastCheckedTime, err := p.GetLastCheckedTime()
if err != nil {
log.Errorf("Error retrieving last checked time: %v", err)
continue
}
p.topicName = p.getPoolName()
if p.topicName != "0" {
storedLinks, err := p.ListModifiedStoredBlocks(lastCheckedTime)
if !p.poolHostMode {
// Starting a new goroutine for periodic task
log.Debug("Called wg.Add in blox.start")
p.wg.Add(1)
go func() {
log.Debug("Called wg.Done in blox.start")
defer p.wg.Done()
defer log.Debug("Start blox blox.start go routine is ending")
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// instead of FetchAvailableManifestsAndStore See what are the new manifests from ther last time we checked in blockstore
// A method that checks the last time we checked for stored blocks file
// then it checks the stored files under /uniondrive/ipfs_datastore/blocks/{folders that are not .temp}
// then for those folders that the change date/time is after the time we last checked it reads the files names that are created after the last time we checked
// thne it passes each filename to a method called GetCidv1FromBlockFilename(string filename) (string, error) and receives the cidv1 of each file and put all in an array []string
// Then it calls the method:_, err := p.bl.HandleManifestBatchStore(ctx, p.topicName, storedCids)
// If no error, it stores the time/date in a file under /internal/.last_time_ipfs_checked so that it can use it in the next run
// ListStoredBlocks, and GetCidv1FromBlockFilename
lastCheckedTime, err := p.GetLastCheckedTime()
if err != nil {
log.Errorf("Error listing stored blocks: %v", err)
log.Errorf("Error retrieving last checked time: %v", err)
continue
}

// Call HandleManifestBatchStore method
if len(storedLinks) > 0 {
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
if strings.Contains(err.Error(), "AccountAlreadyStorer") {
// Log the occurrence of the specific error but do not continue
log.Warnw("Attempt to store with an account that is already a storer", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
} else {
// For any other error, log and continue
log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
p.topicName = p.getPoolName()
if p.topicName != "0" {
storedLinks, err := p.ListModifiedStoredBlocks(lastCheckedTime)
if err != nil {
log.Errorf("Error listing stored blocks: %v", err)
continue
}

// Call HandleManifestBatchStore method
if len(storedLinks) > 0 {
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
if strings.Contains(err.Error(), "AccountAlreadyStorer") {
// Log the occurrence of the specific error but do not continue
log.Warnw("Attempt to store with an account that is already a storer", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
} else {
// For any other error, log and continue
log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
continue
}
}
}
}

// Update the last checked time
err = p.UpdateLastCheckedTime()
if err != nil {
log.Errorf("Error updating last checked time: %v", err)
// Update the last checked time
err = p.UpdateLastCheckedTime()
if err != nil {
log.Errorf("Error updating last checked time: %v", err)
}
case <-ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic blox.start due to context cancellation")
return
case <-p.ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic blox.start due to context cancellation")
return
}
case <-ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic blox.start due to context cancellation")
return
case <-p.ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic blox.start due to context cancellation")
return
}
}
}()
}()
}

return nil
}
Expand Down

0 comments on commit c846d73

Please sign in to comment.