diff --git a/docker/docker_client.go b/docker/docker_client.go
index 0182c056b1..69478a0fc4 100644
--- a/docker/docker_client.go
+++ b/docker/docker_client.go
@@ -32,6 +32,7 @@ import (
 	digest "github.com/opencontainers/go-digest"
 	imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/semaphore"
 )
 
 const (
@@ -84,8 +85,19 @@ type extensionSignatureList struct {
 	Signatures []extensionSignature `json:"signatures"`
 }
 
-// bearerToken records a cached token we can use to authenticate.
+// bearerToken records a cached token we can use to authenticate, or a pending process to obtain one.
+//
+// The goroutine obtaining the token holds lock to block concurrent token requests, and fills the structure (err and possibly the other fields)
+// before releasing the lock.
+// Other goroutines obtain lock to block on the token request, if any; and then inspect err to see if the token is usable.
+// If it is not, they try to get a new one.
 type bearerToken struct {
+	// lock is held while obtaining the token. Potentially nested inside dockerClient.tokenCacheLock.
+	// This is a counting semaphore only because we need a cancellable lock operation.
+	lock *semaphore.Weighted
+
+	// The following fields can only be accessed with lock held.
+	err            error // nil if the token was successfully obtained (but may be expired); an error if the next lock holder _must_ obtain a new token.
 	token          string
 	expirationTime time.Time
 }
@@ -115,7 +127,7 @@ type dockerClient struct {
 	supportsSignatures bool
 
 	// Private state for setupRequestAuth (key: string, value: bearerToken)
-	tokenCacheLock sync.Mutex // Protects tokenCache
+	tokenCacheLock sync.Mutex // Protects tokenCache.
 	tokenCache     map[string]*bearerToken
 	// Private state for detectProperties:
 	detectPropertiesOnce sync.Once // detectPropertiesOnce is used to execute detectProperties() at most once.
@@ -674,31 +686,74 @@ func (c *dockerClient) obtainBearerToken(ctx context.Context, challenge challeng
 		scopes = append(scopes, *extraScope)
 	}
 
-	var token *bearerToken
-	var inCache bool
-	func() { // A scope for defer
+	logrus.Debugf("REMOVE: Checking token cache for key %q", cacheKey)
+	token, newEntry, err := func() (*bearerToken, bool, error) { // A scope for defer
 		c.tokenCacheLock.Lock()
 		defer c.tokenCacheLock.Unlock()
-		token, inCache = c.tokenCache[cacheKey]
-	}()
-	if !inCache || time.Now().After(token.expirationTime) {
-		token = &bearerToken{}
-
-		var err error
-		if c.auth.IdentityToken != "" {
-			err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes)
+		token, ok := c.tokenCache[cacheKey]
+		if ok {
+			return token, false, nil
 		} else {
-			err = c.getBearerToken(ctx, token, challenge, scopes)
+			logrus.Debugf("REMOVE: No token cache for key %q, allocating one…", cacheKey)
+			token = &bearerToken{
+				lock: semaphore.NewWeighted(1),
+			}
+			// If this is a new *bearerToken, lock the entry before adding it to the cache, so that any other goroutine that finds
+			// this entry blocks until we obtain the token for the first time, and does not see an empty object
+			// (and does not try to obtain the token itself when we are going to do so).
+			if err := token.lock.Acquire(ctx, 1); err != nil {
+				// We do not block on this Acquire, so we don’t really expect to fail here — but if ctx is canceled,
+				// there is no point in trying to continue anyway.
+				return nil, false, err
+			}
+			c.tokenCache[cacheKey] = token
+			return token, true, nil
 		}
-		if err != nil {
+	}()
+	if err != nil {
+		return "", err
+	}
+	if !newEntry {
+		// If this is an existing *bearerToken, obtain the lock only after releasing c.tokenCacheLock,
+		// so that users of other cacheKey values are not blocked for the whole duration of our HTTP roundtrip.
+		logrus.Debugf("REMOVE: Found existing token cache for key %q, getting lock", cacheKey)
+		if err := token.lock.Acquire(ctx, 1); err != nil {
 			return "", err
 		}
+		logrus.Debugf("REMOVE: Locked existing token cache for key %q", cacheKey)
+	}
 
-		func() { // A scope for defer
-			c.tokenCacheLock.Lock()
-			defer c.tokenCacheLock.Unlock()
-			c.tokenCache[cacheKey] = token
-		}()
+	defer token.lock.Release(1)
+
+	// Determine if the bearerToken is usable: if it is not, log the cause and fall through, otherwise return early.
+	switch {
+	case newEntry:
+		logrus.Debugf("REMOVE: New token cache entry for key %q, getting first token", cacheKey)
+	case token.err != nil:
+		// If obtaining a token fails for any reason, the request that triggered that will fail;
+		// other requests will see token.err and try obtaining their own token, one goroutine at a time.
+		// (Consider that a request can fail because a very short timeout was provided to _that one operation_ using a context.Context;
+		// that clearly shouldn’t prevent other operations from trying with a longer timeout.)
+		//
+		// If we got here while holding token.lock, we are the goroutine responsible for trying again; others are blocked
+		// on token.lock.
+		logrus.Debugf("REMOVE: Token cache for key %q records failure %v, getting new token", cacheKey, token.err)
+	case time.Now().After(token.expirationTime):
+		logrus.Debugf("REMOVE: Token cache for key %q is expired, getting new token", cacheKey)
+
+	default:
+		return token.token, nil
+	}
+
+	if c.auth.IdentityToken != "" {
+		err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes)
+	} else {
+		err = c.getBearerToken(ctx, token, challenge, scopes)
+	}
+	logrus.Debugf("REMOVE: Obtaining a token for key %q, error %v", cacheKey, err)
+	token.err = err
+	if token.err != nil {
+		return "", token.err
	}
 	return token.token, nil
 }
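
Note on the locking primitive: sync.Mutex has no context-aware Lock, which is why bearerToken.lock is a semaphore.Weighted of capacity 1 ("a counting semaphore only because we need a cancellable lock operation"). The following standalone sketch is not code from this patch; all names in it are illustrative. It only demonstrates the golang.org/x/sync/semaphore API used above: an Acquire that can be aborted by a context.

// A minimal sketch (not from this patch) of a weighted semaphore of
// capacity 1 used as a mutex whose lock operation honors a context.Context.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	lock := semaphore.NewWeighted(1) // capacity 1: at most one holder, i.e. a mutex

	// First acquisition succeeds immediately.
	if err := lock.Acquire(context.Background(), 1); err != nil {
		panic(err)
	}

	// A second acquisition would block forever on a sync.Mutex; here it
	// returns an error once the context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := lock.Acquire(ctx, 1); err != nil {
		fmt.Println("second Acquire aborted:", err) // context deadline exceeded
	}

	lock.Release(1) // the equivalent of Unlock
}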
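For reviewers, here is a simplified, hypothetical model of the two-level locking scheme the obtainBearerToken hunk implements; cache, entry, get, and fetch are invented names, and the real code's err/expiration handling is omitted. The essential moves are the same: the outer mutex guards only the map lookup/insert, a brand-new entry is inserted into the map already locked so that followers block on the entry rather than re-fetching, and an existing entry's lock is acquired only after the map lock is released, so callers using other keys never wait out one key's HTTP roundtrip.

// A simplified, hypothetical model of the patch's two-level locking
// (invented names; not the real dockerClient code).
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

type entry struct {
	lock  *semaphore.Weighted // held while value is being (re)fetched
	value string              // access only with lock held
}

type cache struct {
	mu      sync.Mutex // protects entries — the map itself, nothing more
	entries map[string]*entry
}

func (c *cache) get(ctx context.Context, key string, fetch func() string) (string, error) {
	c.mu.Lock()
	e, ok := c.entries[key]
	if !ok {
		e = &entry{lock: semaphore.NewWeighted(1)}
		// Insert the entry already locked: any follower that finds it in the
		// map blocks on e.lock until the first fetch completes, instead of
		// seeing an empty entry and fetching the value a second time.
		if err := e.lock.Acquire(ctx, 1); err != nil {
			c.mu.Unlock()
			return "", err // cannot block here, but a canceled ctx fails fast
		}
		c.entries[key] = e
		c.mu.Unlock()
	} else {
		// For an existing entry, take its lock only after releasing the map
		// lock, mirroring the !newEntry path in the patch.
		c.mu.Unlock()
		if err := e.lock.Acquire(ctx, 1); err != nil {
			return "", err
		}
	}
	defer e.lock.Release(1)
	if e.value == "" { // new entry: nothing fetched yet
		e.value = fetch()
	}
	return e.value, nil
}

func main() {
	c := &cache{entries: map[string]*entry{}}
	got, err := c.get(context.Background(), "registry.example.com/repo", func() string { return "token-123" })
	fmt.Println(got, err)
}

The design choice this models: failure and expiry are per-entry state inspected under the entry lock, so exactly one goroutine at a time retries a bad token while the global map stays responsive.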