Skip to content

Commit

Permalink
WIP: Only obtain a bearer token once at a time
Browse files Browse the repository at this point in the history
Currently, on pushes, we can start several concurrent layer pushes;
each one will check for a bearer token in tokenCache, find none,
and ask the server for one, and then write it into the cache.

So, we can hammer the server with 6 basically-concurrent token requests.
That's unnecessary, slower than just asking once, and potentially might
impact rate limiting heuristics.

Instead, serialize writes to a bearerToken so that we only have one request in
flight at a time.

This does not apply to pulls, where the first request is for a manifest;
that obtains a token, so subsequent concurrent layer pulls will not request
a token again.

WIP: Clean up the debugging log entries.

Signed-off-by: Miloslav Trmač <[email protected]>
  • Loading branch information
mtrmac committed Jun 1, 2023
1 parent 025f73c commit e2c713b
Showing 1 changed file with 75 additions and 20 deletions.
95 changes: 75 additions & 20 deletions docker/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

const (
Expand Down Expand Up @@ -84,8 +85,19 @@ type extensionSignatureList struct {
Signatures []extensionSignature `json:"signatures"`
}

// bearerToken records a cached token we can use to authenticate.
// bearerToken records a cached token we can use to authenticate, or a pending process to obtain one.
//
// The goroutine obtaining the token holds lock to block concurrent token requests, and fills the structure (err and possibly the other fields)
// before releasing the lock.
// Other goroutines obtain lock to block on the token request, if any; and then inspect err to see if the token is usable.
// If it is not, they try to get a new one.
type bearerToken struct {
// lock is held while obtaining the token. Potentially nested inside dockerClient.tokenCacheLock.
// This is a counting semaphore only because we need a cancellable lock operation.
lock *semaphore.Weighted

// The following fields can only be accessed with lock held.
err error // nil if the token was successfully obtained (but may be expired); an error if the next lock holder _must_ obtain a new token.
token string
expirationTime time.Time
}
Expand Down Expand Up @@ -115,7 +127,7 @@ type dockerClient struct {
supportsSignatures bool

// Private state for setupRequestAuth (key: string, value: bearerToken)
tokenCacheLock sync.Mutex // Protects tokenCache
tokenCacheLock sync.Mutex // Protects tokenCache.
tokenCache map[string]*bearerToken
// Private state for detectProperties:
detectPropertiesOnce sync.Once // detectPropertiesOnce is used to execute detectProperties() at most once.
Expand Down Expand Up @@ -674,31 +686,74 @@ func (c *dockerClient) obtainBearerToken(ctx context.Context, challenge challeng
scopes = append(scopes, *extraScope)
}

var token *bearerToken
var inCache bool
func() { // A scope for defer
logrus.Debugf("REMOVE: Checking token cache for key %q", cacheKey)
token, newEntry, err := func() (*bearerToken, bool, error) { // A scope for defer
c.tokenCacheLock.Lock()
defer c.tokenCacheLock.Unlock()
token, inCache = c.tokenCache[cacheKey]
}()
if !inCache || time.Now().After(token.expirationTime) {
token = &bearerToken{}

var err error
if c.auth.IdentityToken != "" {
err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes)
token, ok := c.tokenCache[cacheKey]
if ok {
return token, false, nil
} else {
err = c.getBearerToken(ctx, token, challenge, scopes)
logrus.Debugf("REMOVE: No token cache for key %q, allocating one…", cacheKey)
token = &bearerToken{
lock: semaphore.NewWeighted(1),
}
// If this is a new *bearerToken, lock the entry before adding it to the cache, so that any other goroutine that finds
// this entry blocks until we obtain the token for the first time, and does not see an empty object
// (and does not try to obtain the token itself when we are going to do so).
if err := token.lock.Acquire(ctx, 1); err != nil {
// We do not block on this Acquire, so we don’t really expect to fail here — but if ctx is canceled,
// there is no point in trying to continue anyway.
return nil, false, err
}
c.tokenCache[cacheKey] = token
return token, true, nil
}
if err != nil {
}()
if err != nil {
return "", err
}
if !newEntry {
// If this is an existing *bearerToken, obtain the lock only after releasing c.tokenCacheLock,
// so that users of other cacheKey values are not blocked for the whole duration of our HTTP roundtrip.
logrus.Debugf("REMOVE: Found existing token cache for key %q, getting lock", cacheKey)
if err := token.lock.Acquire(ctx, 1); err != nil {
return "", err
}
logrus.Debugf("REMOVE: Locked existing token cache for key %q", cacheKey)
}

func() { // A scope for defer
c.tokenCacheLock.Lock()
defer c.tokenCacheLock.Unlock()
c.tokenCache[cacheKey] = token
}()
defer token.lock.Release(1)

// Determine if the bearerToken is usable: if it is not, log the cause and fall through, otherwise return early.
switch {
case newEntry:
logrus.Debugf("REMOVE: New token cache entry for key %q, getting first token", cacheKey)
case token.err != nil:
// If obtaining a token fails for any reason, the request that triggered that will fail;
// other requests will see token.err and try obtaining their own token, one goroutine at a time.
// (Consider that a request can fail because a very short timeout was provided to _that one operation_ using a context.Context;
// that clearly shouldn’t prevent other operations from trying with a longer timeout.)
//
// If we got here while holding token.lock, we are the goroutine responsible for trying again; others are blocked
// on token.lock.
logrus.Debugf("REMOVE: Token cache for key %q records failure %v, getting new token", cacheKey, token.err)
case time.Now().After(token.expirationTime):
logrus.Debugf("REMOVE: Token cache for key %q is expired, getting new token", cacheKey)

default:
return token.token, nil
}

if c.auth.IdentityToken != "" {
err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes)
} else {
err = c.getBearerToken(ctx, token, challenge, scopes)
}
logrus.Debugf("REMOVE: Obtaining a token for key %q, error %v", cacheKey, err)
token.err = err
if token.err != nil {
return "", token.err
}
return token.token, nil
}
Expand Down

0 comments on commit e2c713b

Please sign in to comment.