-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Redis driver fork update v9.6.1 #47049
Changes from 5 commits
576b62a
3a2226d
e35fe83
b954eb9
66a112a
0f93704
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,6 +92,8 @@ const ( | |
const ( | ||
// aclWhoami is a subcommand of "acl" that requires special handling. | ||
aclWhoami = "whoami" | ||
// protocolV2 defines the RESP protocol v2 that Teleport uses. | ||
protocolV2 = 2 | ||
) | ||
|
||
// clusterClient is a wrapper around redis.ClusterClient | ||
|
@@ -101,30 +103,26 @@ type clusterClient struct { | |
|
||
// newClient creates a new Redis client based on given ConnectionMode. If connection mode is not supported | ||
// an error is returned. | ||
func newClient(ctx context.Context, connectionOptions *connection.Options, tlsConfig *tls.Config, onConnect onClientConnectFunc) (redis.UniversalClient, error) { | ||
func newClient(ctx context.Context, connectionOptions *connection.Options, tlsConfig *tls.Config, credentialsProvider fetchCredentialsFunc) (redis.UniversalClient, error) { | ||
connectionAddr := net.JoinHostPort(connectionOptions.Address, connectionOptions.Port) | ||
// TODO(jakub): Investigate Redis Sentinel. | ||
switch connectionOptions.Mode { | ||
case connection.Standalone: | ||
return redis.NewClient(&redis.Options{ | ||
Addr: connectionAddr, | ||
TLSConfig: tlsConfig, | ||
OnConnect: onConnect, | ||
|
||
// Auth should be done by the `OnConnect` callback here. So disable | ||
// "automatic" auth by the client. | ||
DisableAuthOnConnect: true, | ||
Addr: connectionAddr, | ||
TLSConfig: tlsConfig, | ||
CredentialsProviderContext: credentialsProvider, | ||
Protocol: protocolV2, | ||
DisableIndentity: true, | ||
}), nil | ||
case connection.Cluster: | ||
client := &clusterClient{ | ||
ClusterClient: *redis.NewClusterClient(&redis.ClusterOptions{ | ||
Addrs: []string{connectionAddr}, | ||
TLSConfig: tlsConfig, | ||
OnConnect: onConnect, | ||
NewClient: func(opt *redis.Options) *redis.Client { | ||
opt.DisableAuthOnConnect = true | ||
return redis.NewClient(opt) | ||
}, | ||
Addrs: []string{connectionAddr}, | ||
TLSConfig: tlsConfig, | ||
CredentialsProviderContext: credentialsProvider, | ||
Protocol: protocolV2, | ||
DisableIndentity: true, | ||
}), | ||
} | ||
// Load cluster information. | ||
|
@@ -137,33 +135,25 @@ func newClient(ctx context.Context, connectionOptions *connection.Options, tlsCo | |
} | ||
} | ||
|
||
// onClientConnectFunc is a callback function that performs setups after Redis | ||
// client makes a new connection. | ||
type onClientConnectFunc func(context.Context, *redis.Conn) error | ||
|
||
// fetchCredentialsFunc fetches credentials for a new connection. | ||
type fetchCredentialsFunc func(ctx context.Context) (username, password string, err error) | ||
|
||
func noopOnConnect(context.Context, *redis.Conn) error { | ||
return nil | ||
} | ||
|
||
// authWithPasswordOnConnect returns an onClientConnectFunc that sends "auth" | ||
// authWithPasswordOnConnect returns an fetchCredentialsFunc that sends "auth" | ||
// with provided username and password. | ||
func authWithPasswordOnConnect(username, password string) onClientConnectFunc { | ||
return func(ctx context.Context, conn *redis.Conn) error { | ||
return authConnection(ctx, conn, username, password) | ||
func authWithPasswordOnConnect(username, password string) fetchCredentialsFunc { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simple refactor to change the onConnect functions to credentials provider. |
||
return func(ctx context.Context) (string, string, error) { | ||
return username, password, nil | ||
} | ||
} | ||
|
||
// fetchCredentialsOnConnect returns an onClientConnectFunc that does an | ||
// fetchCredentialsOnConnect returns an fetchCredentialsFunc that does an | ||
// authorization check, calls a provided credential fetcher callback func, | ||
// then logs an AUTH query to the audit log once and and uses the credentials to | ||
// auth a new connection. | ||
func fetchCredentialsOnConnect(closeCtx context.Context, sessionCtx *common.Session, audit common.Audit, fetchCreds fetchCredentialsFunc) onClientConnectFunc { | ||
func fetchCredentialsOnConnect(closeCtx context.Context, sessionCtx *common.Session, audit common.Audit, fetchCreds fetchCredentialsFunc) fetchCredentialsFunc { | ||
// audit log one time, to avoid excessive audit logs from reconnects. | ||
var auditOnce sync.Once | ||
return func(ctx context.Context, conn *redis.Conn) error { | ||
return func(ctx context.Context) (string, string, error) { | ||
err := sessionCtx.Checker.CheckAccess(sessionCtx.Database, | ||
services.AccessState{MFAVerified: true}, | ||
role.GetDatabaseRoleMatchers(role.RoleMatchersConfig{ | ||
|
@@ -172,11 +162,11 @@ func fetchCredentialsOnConnect(closeCtx context.Context, sessionCtx *common.Sess | |
DatabaseName: sessionCtx.DatabaseName, | ||
})...) | ||
if err != nil { | ||
return trace.Wrap(err) | ||
return "", "", trace.Wrap(err) | ||
} | ||
username, password, err := fetchCreds(ctx) | ||
if err != nil { | ||
return trace.Wrap(err) | ||
return "", "", trace.Wrap(err) | ||
} | ||
auditOnce.Do(func() { | ||
var query string | ||
|
@@ -187,7 +177,7 @@ func fetchCredentialsOnConnect(closeCtx context.Context, sessionCtx *common.Sess | |
} | ||
audit.OnQuery(closeCtx, sessionCtx, common.Query{Query: query}) | ||
}) | ||
return authConnection(ctx, conn, username, password) | ||
return username, password, nil | ||
} | ||
} | ||
|
||
|
@@ -256,23 +246,6 @@ func awsIAMTokenFetchFunc(sessionCtx *common.Session, auth common.Auth) (fetchCr | |
} | ||
} | ||
|
||
// authConnection is a helper function that sends "auth" command to provided | ||
// Redis connection with provided username and password. | ||
func authConnection(ctx context.Context, conn *redis.Conn, username, password string) error { | ||
// Copied from redis.baseClient.initConn. | ||
_, err := conn.Pipelined(ctx, func(pipe redis.Pipeliner) error { | ||
if password != "" { | ||
if username != "" { | ||
pipe.AuthACL(ctx, username, password) | ||
} else { | ||
pipe.Auth(ctx, password) | ||
} | ||
} | ||
return nil | ||
}) | ||
return trace.Wrap(err) | ||
} | ||
|
||
// Process add supports for additional cluster commands. Our Redis implementation passes most commands to | ||
// go-redis `Process()` function which doesn't handel all Cluster commands like for ex. DBSIZE, FLUSHDB, etc. | ||
// This function provides additional processing for those commands enabling more Redis commands in Cluster mode. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import ( | |
"net" | ||
"slices" | ||
"strings" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/elasticache" | ||
|
@@ -75,6 +76,8 @@ type Engine struct { | |
redisClient redis.UniversalClient | ||
// awsIAMAuthSupported is the saved result of isAWSIAMAuthSupported. | ||
awsIAMAuthSupported *bool | ||
// clientMessageRead indicates processing client messages has started. | ||
clientMessageRead bool | ||
} | ||
|
||
// InitializeConnection initializes the database connection. | ||
|
@@ -145,12 +148,52 @@ func (e *Engine) SendError(redisErr error) { | |
return | ||
} | ||
|
||
// If the first message is a HELLO test, do not return authentication | ||
// errors to the HELLO command as it can be swallowed by the client as part | ||
// of its fallback mechanism. First return the unknown command error then | ||
// send the authentication errors to the next incoming command (usually | ||
// AUTH). | ||
// | ||
// Background: The HELLO test is used for establishing the RESP3 protocol | ||
// but Teleport currently only supports RESP2. The client generally | ||
// fallbacks to RESP2 when they receive an unknown command error for the | ||
// HELLO message. | ||
e.maybeHandleFirstHello() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is caught and verified by |
||
|
||
if err := e.sendToClient(redisErr); err != nil { | ||
e.Log.ErrorContext(e.Context, "Failed to send message to the client.", "error", err) | ||
return | ||
} | ||
} | ||
|
||
// maybeHandleFirstHello replies an unknown command error to the client if the | ||
// first message is a HELLO test. | ||
func (e *Engine) maybeHandleFirstHello() { | ||
// Return if not the first message. | ||
if e.clientMessageRead { | ||
return | ||
} | ||
|
||
// Let's not wait forever for the HELLO message. | ||
ctx, cancel := context.WithTimeout(e.Context, 10*time.Second) | ||
defer cancel() | ||
|
||
cmd, err := e.readClientCmd(ctx) | ||
if err != nil { | ||
e.Log.ErrorContext(e.Context, "Failed to read first client message.", "error", err) | ||
return | ||
} | ||
|
||
// Return if not a HELLO. | ||
if strings.ToLower(cmd.Name()) != helloCmd { | ||
return | ||
} | ||
response := protocol.MakeUnknownCommandErrorForCmd(cmd) | ||
if err := e.sendToClient(response); err != nil { | ||
e.Log.ErrorContext(e.Context, "Failed to send message to the client.", "error", err) | ||
} | ||
} | ||
|
||
// sendToClient sends a command to connected Redis client. | ||
func (e *Engine) sendToClient(vals interface{}) error { | ||
if vals == nil { | ||
|
@@ -249,12 +292,12 @@ func (e *Engine) getNewClientFn(ctx context.Context, sessionCtx *common.Session) | |
} | ||
|
||
return func(username, password string) (redis.UniversalClient, error) { | ||
onConnect, err := e.createOnClientConnectFunc(ctx, sessionCtx, username, password) | ||
credenialsProvider, err := e.createCredentialsProvider(ctx, sessionCtx, username, password) | ||
if err != nil { | ||
return nil, trace.Wrap(err) | ||
} | ||
|
||
redisClient, err := newClient(ctx, connectionOptions, tlsConfig, onConnect) | ||
redisClient, err := newClient(ctx, connectionOptions, tlsConfig, credenialsProvider) | ||
if err != nil { | ||
return nil, trace.Wrap(err) | ||
} | ||
|
@@ -263,9 +306,10 @@ func (e *Engine) getNewClientFn(ctx context.Context, sessionCtx *common.Session) | |
}, nil | ||
} | ||
|
||
// createOnClientConnectFunc creates a callback function that is called after a | ||
// successful client connection with the Redis server. | ||
func (e *Engine) createOnClientConnectFunc(ctx context.Context, sessionCtx *common.Session, username, password string) (onClientConnectFunc, error) { | ||
// createCredentialsProvider creates a callback function that provides username | ||
// and password. | ||
// This function may return nil, nil as nil credenialsProvider is valid. | ||
func (e *Engine) createCredentialsProvider(ctx context.Context, sessionCtx *common.Session, username, password string) (fetchCredentialsFunc, error) { | ||
switch { | ||
// If password is provided by client. | ||
case password != "": | ||
|
@@ -299,7 +343,7 @@ func (e *Engine) createOnClientConnectFunc(ctx context.Context, sessionCtx *comm | |
return fetchCredentialsOnConnect(e.Context, sessionCtx, e.Audit, credFetchFn), nil | ||
|
||
default: | ||
return noopOnConnect, nil | ||
return nil, nil | ||
} | ||
} | ||
|
||
|
@@ -454,6 +498,8 @@ func (e *Engine) process(ctx context.Context, sessionCtx *common.Session) error | |
|
||
// readClientCmd reads commands from connected Redis client. | ||
func (e *Engine) readClientCmd(ctx context.Context) (*redis.Cmd, error) { | ||
e.clientMessageRead = true | ||
|
||
cmd := &redis.Cmd{} | ||
if err := cmd.ReadReply(e.clientReader); err != nil { | ||
return nil, trace.Wrap(err) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the previous driver, we disabled auth on connect and performed auth during
OnConnect
callback.We are switching to use
CredentialsProviderContext
now for auth and we setProtocol
to v2 so the driver won't do RESP3 HELLOThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking at drive code it will always send HELLO 2 now instead of not doing HELLO at all (on connection), right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, and the auth will be done using
HELLO 2 AUTH <user> <password>
instead of just AUTH. In my opinion, it's a win for newer servers. I haven't tested older version (<6.2) tho. The driver suppose to fallback to regular AUTH. We can test that during release testing.