This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

cleanup of comments/ exports #114

Open
wants to merge 2 commits into base: main
28 changes: 15 additions & 13 deletions caboose.go
@@ -63,8 +63,8 @@ type Config struct {
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
MaxRetrievalAttempts int

-// MaxFetchFailuresBeforeCoolDown is the maximum number of retrieval failures across the pool for a key we will tolerate before we
-// add the key to the cool down cache.
+// MaxFetchFailuresBeforeCoolDown is the maximum number of retrieval failures across the pool for a url before we auto-reject subsequent
+// fetches of that url.
MaxFetchFailuresBeforeCoolDown int

// FetchKeyCoolDownDuration is duration of time a key will stay in the cool down cache
@@ -83,8 +83,11 @@ const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second
const DefaultSaturnBlockRequestTimeout = 19 * time.Second
const DefaultSaturnCarRequestTimeout = 30 * time.Minute

-const DefaultMaxRetries = 3
-const DefaultMirrorFraction = 0.1
+// default retries before failure unless overridden by MaxRetrievalAttempts
+const defaultMaxRetries = 3
+
+// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
+const defaultMirrorFraction = 0.01

const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
@@ -94,16 +97,15 @@ const DefaultPoolRefreshInterval = 5 * time.Minute
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you dont break end user flow
// described in https://github.com/ipni/storetheindex/pull/1344
-const DefaultMaxFetchFailures = 3 * DefaultMaxRetries // this has to fail more than DefaultMaxRetries done for a single gateway request
-const DefaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane person wait and stare at blank screen with "retry later" error before hitting F5?
+const defaultMaxFetchFailures = 3 * defaultMaxRetries // this has to fail more than DefaultMaxRetries done for a single gateway request
+const defaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane person wait and stare at blank screen with "retry later" error before hitting F5?

// we cool off sending requests to a Saturn node if it returns transient errors rather than immediately downvoting it;
// however, only upto a certain max number of cool-offs.
-const DefaultSaturnNodeCoolOff = 5 * time.Minute
+const defaultSaturnNodeCoolOff = 5 * time.Minute

var ErrNotImplemented error = errors.New("not implemented")
var ErrNoBackend error = errors.New("no available saturn backend")
-var ErrBackendFailed error = errors.New("saturn backend failed")
var ErrContentProviderNotFound error = errors.New("saturn failed to find content providers")
var ErrSaturnTimeout error = errors.New("saturn backend timed out")

@@ -181,17 +183,17 @@ type DataCallback func(resource string, reader io.Reader) error
// Every request will result in a remote network request.
func NewCaboose(config *Config) (*Caboose, error) {
if config.FetchKeyCoolDownDuration == 0 {
-config.FetchKeyCoolDownDuration = DefaultFetchKeyCoolDownDuration
+config.FetchKeyCoolDownDuration = defaultFetchKeyCoolDownDuration
}
if config.MaxFetchFailuresBeforeCoolDown == 0 {
-config.MaxFetchFailuresBeforeCoolDown = DefaultMaxFetchFailures
+config.MaxFetchFailuresBeforeCoolDown = defaultMaxFetchFailures
}

if config.SaturnNodeCoolOff == 0 {
-config.SaturnNodeCoolOff = DefaultSaturnNodeCoolOff
+config.SaturnNodeCoolOff = defaultSaturnNodeCoolOff
}
if config.MirrorFraction == 0 {
-config.MirrorFraction = DefaultMirrorFraction
+config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
@@ -222,7 +224,7 @@ func NewCaboose(config *Config) (*Caboose, error) {
}

if c.config.MaxRetrievalAttempts == 0 {
-c.config.MaxRetrievalAttempts = DefaultMaxRetries
+c.config.MaxRetrievalAttempts = defaultMaxRetries
}

// start the pool
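A note on the cool-down constants above: the comments describe a fetch-key cool-down, where a key that has already failed across the pool more than MaxFetchFailuresBeforeCoolDown times is rejected outright for FetchKeyCoolDownDuration before fetches are attempted again. The sketch below illustrates that idea with a plain mutex-guarded map; it is not the caboose implementation (which keeps this state in TTL caches, as the pool.go diff further down shows), and the names coolDowns, allowed and recordFailure are invented for the example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// coolDowns is an illustrative tracker: once a key has failed maxFailures
// times, further fetches of that key are rejected until coolDownDuration
// has elapsed.
type coolDowns struct {
	mu           sync.Mutex
	failures     map[string]int
	coolDownTill map[string]time.Time

	maxFailures      int           // e.g. defaultMaxFetchFailures (3 * defaultMaxRetries)
	coolDownDuration time.Duration // e.g. defaultFetchKeyCoolDownDuration (1 minute)
}

func newCoolDowns(maxFailures int, d time.Duration) *coolDowns {
	return &coolDowns{
		failures:         make(map[string]int),
		coolDownTill:     make(map[string]time.Time),
		maxFailures:      maxFailures,
		coolDownDuration: d,
	}
}

// allowed reports whether a fetch for key should be attempted right now.
func (c *coolDowns) allowed(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	until, ok := c.coolDownTill[key]
	if !ok {
		return true
	}
	if time.Now().After(until) {
		// cool-down expired; forget the old state and let the fetch through
		delete(c.coolDownTill, key)
		delete(c.failures, key)
		return true
	}
	return false
}

// recordFailure notes one more failure for key and starts a cool-down once
// the failure count crosses the threshold.
func (c *coolDowns) recordFailure(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.failures[key]++
	if c.failures[key] >= c.maxFailures {
		c.coolDownTill[key] = time.Now().Add(c.coolDownDuration)
	}
}

func main() {
	cd := newCoolDowns(9, time.Minute) // mirrors 3 * 3 retries and 1 minute from the defaults above
	key := "/ipfs/bafyexamplecid"
	for i := 0; i < 10; i++ {
		cd.recordFailure(key)
	}
	fmt.Println("allowed after repeated failures:", cd.allowed(key)) // false
}
```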
5 changes: 1 addition & 4 deletions fetcher.go
@@ -237,7 +237,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,

reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
-req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
+req, err := http.NewRequestWithContext(httpstat.WithHTTPStat(reqCtx, &result), http.MethodGet, reqUrl, nil)
if err != nil {
if recordIfContextErr(resourceType, reqCtx, "build-http-request") {
return rm, reqCtx.Err()
@@ -258,9 +258,6 @@
agent := req.Header.Get("User-Agent")
req.Header.Set("User-Agent", os.Getenv(SaturnEnvKey)+"/"+agent)

-//trace
-req = req.WithContext(httpstat.WithHTTPStat(req.Context(), &result))
-
var resp *http.Response
saturnCallsTotalMetric.WithLabelValues(resourceType).Add(1)
startReq := time.Now()
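The fetcher.go change above moves the httpstat trace into the context that is handed to http.NewRequestWithContext, rather than re-wrapping the request after the headers have been set, so the client trace is installed for the request's whole lifetime and the extra WithContext copy goes away. Below is a minimal, standalone sketch of that pattern, assuming httpstat refers to the tcnksm/go-httpstat package; the target URL, timeout and error handling are placeholders, not taken from caboose.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/tcnksm/go-httpstat"
)

func main() {
	// result accumulates connection timings for one request. Wrapping the
	// context *before* building the request installs the trace for the
	// request's entire lifetime.
	var result httpstat.Result

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(httpstat.WithHTTPStat(ctx, &result), http.MethodGet, "https://example.com/", nil)
	if err != nil {
		panic(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		panic(err)
	}
	resp.Body.Close()
	result.End(time.Now()) // finalize the content-transfer timing

	fmt.Println("dns lookup:       ", result.DNSLookup)
	fmt.Println("tcp connection:   ", result.TCPConnection)
	fmt.Println("tls handshake:    ", result.TLSHandshake)
	fmt.Println("server processing:", result.ServerProcessing)
}
```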
11 changes: 6 additions & 5 deletions pool.go
@@ -36,6 +36,7 @@ func (p *pool) loadPool() ([]string, error) {
if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}

resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String())
if err != nil {
goLogger.Warnw("failed to get backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String())
@@ -52,7 +53,7 @@
return responses, nil
}

-type poolRequest struct {
+type mirroredPoolRequest struct {
node string
path string
// the key for node affinity for the request
@@ -66,7 +67,7 @@ type pool struct {
started chan struct{} // started signals that we've already initialized the pool once with Saturn endpoints.
refresh chan struct{} // refresh is used to signal the need for doing a refresh of the Saturn endpoints pool.
done chan struct{} // done is used to signal that we're shutting down the Saturn endpoints pool and don't need to refresh it anymore.
-mirrorSamples chan poolRequest
+mirrorSamples chan mirroredPoolRequest

fetchKeyLk sync.RWMutex
fetchKeyFailureCache *cache.Cache // guarded by fetchKeyLk
@@ -91,7 +92,7 @@ func newPool(c *Config) *pool {
started: make(chan struct{}),
refresh: make(chan struct{}, 1),
done: make(chan struct{}, 1),
-mirrorSamples: make(chan poolRequest, 10),
+mirrorSamples: make(chan mirroredPoolRequest, 10),

fetchKeyCoolDownCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute),
fetchKeyFailureCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute),
@@ -331,7 +332,7 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk
// mirror successful request
if p.config.MirrorFraction > rand.Float64() {
select {
-case p.mirrorSamples <- poolRequest{node: nodes[i], path: fmt.Sprintf("/ipfs/%s?format=car&car-scope=block", c), key: aff}:
+case p.mirrorSamples <- mirroredPoolRequest{node: nodes[i], path: fmt.Sprintf("/ipfs/%s?format=car&car-scope=block", c), key: aff}:
default:
}
}
@@ -428,7 +429,7 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba
// sample request for mirroring
if p.config.MirrorFraction > rand.Float64() {
select {
-case p.mirrorSamples <- poolRequest{node: nodes[i], path: pq[0], key: aff}:
+case p.mirrorSamples <- mirroredPoolRequest{node: nodes[i], path: pq[0], key: aff}:
default:
}
}
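The pool.go changes above are a rename (poolRequest to mirroredPoolRequest), but the surrounding lines show the sampling pattern the type is used for: with probability MirrorFraction a successful request is also pushed onto the mirrorSamples channel via a non-blocking send, so mirroring never stalls the main fetch path. The sketch below reproduces that pattern in isolation; maybeMirror and mirrorSample are illustrative names, not identifiers from the repository.

```go
package main

import (
	"fmt"
	"math/rand"
)

// mirrorSample stands in for the fields carried by mirroredPoolRequest in the diff.
type mirrorSample struct {
	node string
	path string
	key  string
}

// maybeMirror queues a sample with probability mirrorFraction, using a
// non-blocking send so the hot fetch path never waits on the mirror worker.
// It reports whether the sample was actually queued.
func maybeMirror(samples chan<- mirrorSample, mirrorFraction float64, s mirrorSample) bool {
	if mirrorFraction <= rand.Float64() {
		return false // not sampled this time
	}
	select {
	case samples <- s:
		return true
	default:
		return false // buffer full: drop the sample rather than block
	}
}

func main() {
	samples := make(chan mirrorSample, 10) // same buffer size as the pool's mirrorSamples channel
	queued := 0
	for i := 0; i < 1000; i++ {
		s := mirrorSample{node: "node.example", path: "/ipfs/bafyexamplecid?format=car", key: "affinity-key"}
		if maybeMirror(samples, 0.01, s) {
			queued++
		}
	}
	fmt.Printf("queued %d of 1000 requests for mirroring\n", queued)
}
```

Dropping samples when the buffer is full keeps mirroring strictly best-effort, which is what the bare default case in the diff's select statements does as well.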