From 9f3aa8c76ed723f20d75157e28148ac3b3d83980 Mon Sep 17 00:00:00 2001 From: Luigi Tagliamonte Date: Fri, 18 Oct 2019 12:56:25 -0700 Subject: [PATCH] update gocql dependency --- Gopkg.lock | 4 +- vendor/github.com/gocql/gocql/.travis.yml | 16 +- vendor/github.com/gocql/gocql/AUTHORS | 4 + vendor/github.com/gocql/gocql/README.md | 4 +- vendor/github.com/gocql/gocql/cluster.go | 33 +- vendor/github.com/gocql/gocql/conn.go | 202 +++++++--- .../github.com/gocql/gocql/connectionpool.go | 18 +- vendor/github.com/gocql/gocql/control.go | 26 +- vendor/github.com/gocql/gocql/frame.go | 3 + vendor/github.com/gocql/gocql/go.mod | 6 + vendor/github.com/gocql/gocql/go.modverify | 3 - vendor/github.com/gocql/gocql/go.sum | 22 ++ vendor/github.com/gocql/gocql/helpers.go | 8 + vendor/github.com/gocql/gocql/host_source.go | 45 ++- vendor/github.com/gocql/gocql/integration.sh | 7 +- vendor/github.com/gocql/gocql/marshal.go | 115 ++++-- vendor/github.com/gocql/gocql/metadata.go | 69 +++- vendor/github.com/gocql/gocql/policies.go | 324 ++++++++++------ .../github.com/gocql/gocql/query_executor.go | 53 ++- vendor/github.com/gocql/gocql/session.go | 348 ++++++++++-------- vendor/github.com/gocql/gocql/token.go | 25 +- vendor/github.com/gocql/gocql/topology.go | 138 +++++-- vendor/github.com/gocql/gocql/uuid.go | 51 ++- 23 files changed, 1037 insertions(+), 487 deletions(-) delete mode 100644 vendor/github.com/gocql/gocql/go.modverify create mode 100644 vendor/github.com/gocql/gocql/go.sum diff --git a/Gopkg.lock b/Gopkg.lock index 2a641ce6..4380df60 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -109,7 +109,7 @@ [[projects]] branch = "master" - digest = "1:f7a570d735ec7933137f695ff21baddc15b25578a0508947c9a805367487bf66" + digest = "1:2b3ea78fabc590b377699be537fbdffba08d03d41ea7ee69d39914a6fffa82b4" name = "github.com/gocql/gocql" packages = [ ".", @@ -118,7 +118,7 @@ "internal/streams", ] pruneopts = "T" - revision = "68ae1e384be4d7cd7df0f5b77f331759d806308e" + revision = "07ace3bab0f84bb88477bab5d79ba1f7e1da0169" [[projects]] digest = "1:a2ecb56e5053d942aafc86738915fb94c9131bac848c543b8b6764365fd69080" diff --git a/vendor/github.com/gocql/gocql/.travis.yml b/vendor/github.com/gocql/gocql/.travis.yml index f11cb137..778a1bee 100644 --- a/vendor/github.com/gocql/gocql/.travis.yml +++ b/vendor/github.com/gocql/gocql/.travis.yml @@ -13,24 +13,26 @@ matrix: branches: only: - - master + - master env: global: - GOMAXPROCS=2 matrix: - - CASS=2.2.13 + - CASS=2.1.21 AUTH=true - - CASS=2.2.13 + - CASS=2.2.14 + AUTH=true + - CASS=2.2.14 AUTH=false - - CASS=3.0.17 + - CASS=3.0.18 AUTH=false - - CASS=3.11.3 + - CASS=3.11.4 AUTH=false go: - - "1.10" - - "1.11" + - 1.12.x + - 1.13.x install: - ./install_test_deps.sh $TRAVIS_REPO_SLUG diff --git a/vendor/github.com/gocql/gocql/AUTHORS b/vendor/github.com/gocql/gocql/AUTHORS index f4528e2b..463be1ac 100644 --- a/vendor/github.com/gocql/gocql/AUTHORS +++ b/vendor/github.com/gocql/gocql/AUTHORS @@ -108,3 +108,7 @@ Luke Hines Jacob Greenleaf Alex Lourie ; Marco Cadetg +Karl Matthias +Thomas Meson +Martin Sucha ; +Pavel Buchinchik diff --git a/vendor/github.com/gocql/gocql/README.md b/vendor/github.com/gocql/gocql/README.md index 1b3fd03b..a35d9f3c 100644 --- a/vendor/github.com/gocql/gocql/README.md +++ b/vendor/github.com/gocql/gocql/README.md @@ -19,8 +19,8 @@ The following matrix shows the versions of Go and Cassandra that are tested with Go/Cassandra | 2.1.x | 2.2.x | 3.x.x -------------| -------| ------| --------- -1.10 | yes | yes | yes -1.11 | yes | yes | yes +1.12 | yes | 
yes | yes +1.13 | yes | yes | yes Gocql has been tested in production against many different versions of Cassandra. Due to limits in our CI setup we only test against the latest 3 major releases, which coincide with the official support from the Apache project. diff --git a/vendor/github.com/gocql/gocql/cluster.go b/vendor/github.com/gocql/gocql/cluster.go index 32b9e0a4..ab0ab8a0 100644 --- a/vendor/github.com/gocql/gocql/cluster.go +++ b/vendor/github.com/gocql/gocql/cluster.go @@ -45,22 +45,23 @@ type ClusterConfig struct { // highest supported protocol for the cluster. In clusters with nodes of different // versions the protocol selected is not defined (ie, it can be any of the supported in the cluster) ProtoVersion int - Timeout time.Duration // connection timeout (default: 600ms) - ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms) - Port int // port (default: 9042) - Keyspace string // initial keyspace (optional) - NumConns int // number of connections per host (default: 2) - Consistency Consistency // default consistency level (default: Quorum) - Compressor Compressor // compression algorithm (default: nil) - Authenticator Authenticator // authenticator (default: nil) - RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0) - ConvictionPolicy ConvictionPolicy // Decide whether to mark host as down based on the error and host info (default: SimpleConvictionPolicy) - ReconnectionPolicy ReconnectionPolicy // Default reconnection policy to use for reconnecting before trying to mark host as down (default: see below) - SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0) - MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000) - MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000) - PageSize int // Default page size to use for created sessions (default: 5000) - SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset) + Timeout time.Duration // connection timeout (default: 600ms) + ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms) + Port int // port (default: 9042) + Keyspace string // initial keyspace (optional) + NumConns int // number of connections per host (default: 2) + Consistency Consistency // default consistency level (default: Quorum) + Compressor Compressor // compression algorithm (default: nil) + Authenticator Authenticator // authenticator (default: nil) + AuthProvider func(h *HostInfo) (Authenticator, error) // an authenticator factory. 
Can be used to create alternative authenticators (default: nil) + RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0) + ConvictionPolicy ConvictionPolicy // Decide whether to mark host as down based on the error and host info (default: SimpleConvictionPolicy) + ReconnectionPolicy ReconnectionPolicy // Default reconnection policy to use for reconnecting before trying to mark host as down (default: see below) + SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0) + MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000) + MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000) + PageSize int // Default page size to use for created sessions (default: 5000) + SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset) SslOpts *SslOptions DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above) // PoolConfig configures the underlying connection pool, allowing the diff --git a/vendor/github.com/gocql/gocql/conn.go b/vendor/github.com/gocql/gocql/conn.go index d4f2786a..553292ab 100644 --- a/vendor/github.com/gocql/gocql/conn.go +++ b/vendor/github.com/gocql/gocql/conn.go @@ -28,6 +28,8 @@ var ( "org.apache.cassandra.auth.PasswordAuthenticator", "com.instaclustr.cassandra.auth.SharedSecretAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator", + "io.aiven.cassandra.auth.AivenAuthenticator", + "com.ericsson.bss.cassandra.ecaudit.auth.AuditPasswordAuthenticator", } ) @@ -98,8 +100,11 @@ type ConnConfig struct { ConnectTimeout time.Duration Compressor Compressor Authenticator Authenticator + AuthProvider func(h *HostInfo) (Authenticator, error) Keepalive time.Duration - tlsConfig *tls.Config + + tlsConfig *tls.Config + disableCoalesce bool } type ConnErrorHandler interface { @@ -135,7 +140,7 @@ type Conn struct { headerBuf [maxFrameHeaderSize]byte streams *streams.IDGenerator - mu sync.RWMutex + mu sync.Mutex calls map[int]*callReq errorHandler ConnErrorHandler @@ -155,13 +160,39 @@ type Conn struct { timeouts int64 } -// Connect establishes a connection to a Cassandra node. -func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { +// connect establishes a connection to a Cassandra node using session's connection config. +func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) { + return s.dial(host, s.connCfg, errorHandler) +} + +// dial establishes a connection to a Cassandra node and notifies the session's connectObserver. +func (s *Session) dial(host *HostInfo, connConfig *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { + var obs ObservedConnect + if s.connectObserver != nil { + obs.Host = host + obs.Start = time.Now() + } + + conn, err := s.dialWithoutObserver(host, connConfig, errorHandler) + + if s.connectObserver != nil { + obs.End = time.Now() + obs.Err = err + s.connectObserver.ObserveConnect(obs) + } + + return conn, err +} + +// dialWithoutObserver establishes connection to a Cassandra node. +// +// dialWithoutObserver does not notify the connection observer, so you most probably want to call dial() instead. 
+func (s *Session) dialWithoutObserver(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { ip := host.ConnectAddress() port := host.port // TODO(zariel): remove these - if len(ip) == 0 || ip.IsUnspecified() { + if !validIpAddr(ip) { panic(fmt.Sprintf("host missing connect ip address: %v", ip)) } else if port == 0 { panic(fmt.Sprintf("host missing port: %v", port)) @@ -179,15 +210,12 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa dialer.KeepAlive = cfg.Keepalive } - // TODO(zariel): handle ipv6 zone - addr := (&net.TCPAddr{IP: ip, Port: port}).String() - if cfg.tlsConfig != nil { // the TLS config is safe to be reused by connections but it must not // be modified after being used. - conn, err = tls.DialWithDialer(dialer, "tcp", addr, cfg.tlsConfig) + conn, err = tls.DialWithDialer(dialer, "tcp", host.HostnameAndPort(), cfg.tlsConfig) } else { - conn, err = dialer.Dial("tcp", addr) + conn, err = dialer.Dial("tcp", host.HostnameAndPort()) } if err != nil { @@ -203,7 +231,6 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa addr: conn.RemoteAddr().String(), errorHandler: errorHandler, compressor: cfg.Compressor, - auth: cfg.Authenticator, quit: make(chan struct{}), session: s, streams: streams.New(cfg.ProtoVersion), @@ -215,6 +242,15 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa }, } + if cfg.AuthProvider != nil { + c.auth, err = cfg.AuthProvider(host) + if err != nil { + return nil, err + } + } else { + c.auth = cfg.Authenticator + } + var ( ctx context.Context cancel func() @@ -240,11 +276,12 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa c.timeout = cfg.Timeout // dont coalesce startup frames - if s.cfg.WriteCoalesceWaitTime > 0 { - c.w = newWriteCoalescer(c.w, s.cfg.WriteCoalesceWaitTime, c.quit) + if s.cfg.WriteCoalesceWaitTime > 0 && !cfg.disableCoalesce { + c.w = newWriteCoalescer(conn, c.timeout, s.cfg.WriteCoalesceWaitTime, c.quit) } go c.serve() + go c.heartBeat() return c, nil } @@ -432,7 +469,7 @@ func (c *Conn) closeWithError(err error) { // we should attempt to deliver the error back to the caller if it // exists if err != nil { - c.mu.RLock() + c.mu.Lock() for _, req := range c.calls { // we need to send the error to all waiting queries, put the state // of this conn into not active so that it can not execute any queries. @@ -441,7 +478,7 @@ func (c *Conn) closeWithError(err error) { case <-req.timeout: } } - c.mu.RUnlock() + c.mu.Unlock() } // if error was nil then unblock the quit channel @@ -495,6 +532,53 @@ func (p *protocolError) Error() string { return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().stream, p.frame) } +func (c *Conn) heartBeat() { + sleepTime := 1 * time.Second + timer := time.NewTimer(sleepTime) + defer timer.Stop() + + var failures int + + for { + if failures > 5 { + c.closeWithError(fmt.Errorf("gocql: heartbeat failed")) + return + } + + timer.Reset(sleepTime) + + select { + case <-c.quit: + return + case <-timer.C: + } + + framer, err := c.exec(context.Background(), &writeOptionsFrame{}, nil) + if err != nil { + failures++ + continue + } + + resp, err := framer.parseFrame() + if err != nil { + // invalid frame + failures++ + continue + } + + switch resp.(type) { + case *supportedFrame: + // Everything ok + sleepTime = 5 * time.Second + failures = 0 + case error: + // TODO: should we do something here? 
+ default: + panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp)) + } + } +} + func (c *Conn) recv() error { // not safe for concurrent reads @@ -521,6 +605,7 @@ func (c *Conn) recv() error { Length: int32(head.length), Start: headStartTime, End: headEndTime, + Host: c.host, }) } @@ -552,12 +637,15 @@ func (c *Conn) recv() error { } } - c.mu.RLock() + c.mu.Lock() call, ok := c.calls[head.stream] - c.mu.RUnlock() + delete(c.calls, head.stream) + c.mu.Unlock() if call == nil || call.framer == nil || !ok { Logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head) return c.discardFrame(head) + } else if head.stream != call.streamID { + panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream)) } err = call.framer.readFrame(&head) @@ -574,30 +662,19 @@ func (c *Conn) recv() error { select { case call.resp <- err: case <-call.timeout: - c.releaseStream(head.stream) + c.releaseStream(call) case <-c.quit: } return nil } -func (c *Conn) releaseStream(stream int) { - c.mu.Lock() - call := c.calls[stream] - if call != nil && stream != call.streamID { - panic(fmt.Sprintf("attempt to release streamID with invalid stream: %d -> %+v\n", stream, call)) - } else if call == nil { - panic(fmt.Sprintf("releasing a stream not in use: %d", stream)) - } - delete(c.calls, stream) - c.mu.Unlock() - +func (c *Conn) releaseStream(call *callReq) { if call.timer != nil { call.timer.Stop() } - streamPool.Put(call) - c.streams.Clear(stream) + c.streams.Clear(call.streamID) } func (c *Conn) handleTimeout() { @@ -606,16 +683,6 @@ func (c *Conn) handleTimeout() { } } -var ( - streamPool = sync.Pool{ - New: func() interface{} { - return &callReq{ - resp: make(chan error), - } - }, - } -) - type callReq struct { // could use a waitgroup but this allows us to do timeouts on the read/send resp chan error @@ -641,19 +708,20 @@ func (c *deadlineWriter) Write(p []byte) (int, error) { return c.w.Write(p) } -func newWriteCoalescer(w io.Writer, d time.Duration, quit <-chan struct{}) *writeCoalescer { +func newWriteCoalescer(conn net.Conn, timeout time.Duration, d time.Duration, quit <-chan struct{}) *writeCoalescer { wc := &writeCoalescer{ writeCh: make(chan struct{}), // TODO: could this be sync? cond: sync.NewCond(&sync.Mutex{}), - w: w, + c: conn, quit: quit, + timeout: timeout, } go wc.writeFlusher(d) return wc } type writeCoalescer struct { - w io.Writer + c net.Conn quit <-chan struct{} writeCh chan struct{} @@ -662,6 +730,7 @@ type writeCoalescer struct { // cond waits for the buffer to be flushed cond *sync.Cond buffers net.Buffers + timeout time.Duration // result of the write err error @@ -673,10 +742,14 @@ func (w *writeCoalescer) flushLocked() { return } + if w.timeout > 0 { + w.c.SetWriteDeadline(time.Now().Add(w.timeout)) + } + // Given we are going to do a fanout n is useless and according to // the docs WriteTo should return 0 and err or bytes written and // no error. 
- _, w.err = w.buffers.WriteTo(w.w) + _, w.err = w.buffers.WriteTo(w.c) if w.err != nil { w.buffers = nil } @@ -766,10 +839,12 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame // resp is basically a waiting semaphore protecting the framer framer := newFramer(c, c, c.compressor, c.version) - call := streamPool.Get().(*callReq) - call.framer = framer - call.timeout = make(chan struct{}) - call.streamID = stream + call := &callReq{ + framer: framer, + timeout: make(chan struct{}), + streamID: stream, + resp: make(chan error), + } c.mu.Lock() existingCall := c.calls[stream] @@ -833,7 +908,7 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame // this is because the request is still outstanding and we have // been handed another error from another stream which caused the // connection to close. - c.releaseStream(stream) + c.releaseStream(call) } return nil, err } @@ -854,7 +929,7 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame // // Ensure that the stream is not released if there are potentially outstanding // requests on the stream to prevent nil pointer dereferences in recv(). - defer c.releaseStream(stream) + defer c.releaseStream(call) if v := framer.header.version.version(); v != c.version { return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version) @@ -1280,11 +1355,12 @@ func (c *Conn) query(ctx context.Context, statement string, values ...interface{ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { const ( - peerSchemas = "SELECT schema_version, peer FROM system.peers" + peerSchemas = "SELECT * FROM system.peers" localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" ) var versions map[string]struct{} + var schemaVersion string endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement) for time.Now().Before(endDeadline) { @@ -1292,16 +1368,22 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { versions = make(map[string]struct{}) - var schemaVersion string - var peer string - for iter.Scan(&schemaVersion, &peer) { - if schemaVersion == "" { - Logger.Printf("skipping peer entry with empty schema_version: peer=%q", peer) + rows, err := iter.SliceMap() + if err != nil { + goto cont + } + + for _, row := range rows { + host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port}) + if err != nil { + goto cont + } + if !isValidPeer(host) || host.schemaVersion == "" { + Logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host) continue } - versions[schemaVersion] = struct{}{} - schemaVersion = "" + versions[host.schemaVersion] = struct{}{} } if err = iter.Close(); err != nil { @@ -1352,7 +1434,7 @@ func (c *Conn) localHostInfo(ctx context.Context) (*HostInfo, error) { port := c.conn.RemoteAddr().(*net.TCPAddr).Port // TODO(zariel): avoid doing this here - host, err := c.session.hostInfoFromMap(row, port) + host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.connectAddress, port: port}) if err != nil { return nil, err } diff --git a/vendor/github.com/gocql/gocql/connectionpool.go b/vendor/github.com/gocql/gocql/connectionpool.go index 7bfb0875..bf2388e1 100644 --- a/vendor/github.com/gocql/gocql/connectionpool.go +++ b/vendor/github.com/gocql/gocql/connectionpool.go @@ -90,14 +90,16 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) { } return &ConnConfig{ - 
ProtoVersion: cfg.ProtoVersion, - CQLVersion: cfg.CQLVersion, - Timeout: cfg.Timeout, - ConnectTimeout: cfg.ConnectTimeout, - Compressor: cfg.Compressor, - Authenticator: cfg.Authenticator, - Keepalive: cfg.SocketKeepalive, - tlsConfig: tlsConfig, + ProtoVersion: cfg.ProtoVersion, + CQLVersion: cfg.CQLVersion, + Timeout: cfg.Timeout, + ConnectTimeout: cfg.ConnectTimeout, + Compressor: cfg.Compressor, + Authenticator: cfg.Authenticator, + AuthProvider: cfg.AuthProvider, + Keepalive: cfg.SocketKeepalive, + tlsConfig: tlsConfig, + disableCoalesce: tlsConfig != nil, // write coalescing doesn't work with framing on top of TCP like in TLS. }, nil } diff --git a/vendor/github.com/gocql/gocql/control.go b/vendor/github.com/gocql/gocql/control.go index 927e4c3a..21aa1151 100644 --- a/vendor/github.com/gocql/gocql/control.go +++ b/vendor/github.com/gocql/gocql/control.go @@ -116,7 +116,7 @@ func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) { // Check if host is a literal IP address if ip := net.ParseIP(host); ip != nil { - hosts = append(hosts, &HostInfo{connectAddress: ip, port: port}) + hosts = append(hosts, &HostInfo{hostname: host, connectAddress: ip, port: port}) return hosts, nil } @@ -142,21 +142,21 @@ func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) { } for _, ip := range ips { - hosts = append(hosts, &HostInfo{connectAddress: ip, port: port}) + hosts = append(hosts, &HostInfo{hostname: host, connectAddress: ip, port: port}) } return hosts, nil } func shuffleHosts(hosts []*HostInfo) []*HostInfo { - mutRandr.Lock() - perm := randr.Perm(len(hosts)) - mutRandr.Unlock() shuffled := make([]*HostInfo, len(hosts)) + copy(shuffled, hosts) - for i, host := range hosts { - shuffled[perm[i]] = host - } + mutRandr.Lock() + randr.Shuffle(len(hosts), func(i, j int) { + shuffled[i], shuffled[j] = shuffled[j], shuffled[i] + }) + mutRandr.Unlock() return shuffled } @@ -166,10 +166,13 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) { // node. shuffled := shuffleHosts(endpoints) + cfg := *c.session.connCfg + cfg.disableCoalesce = true + var err error for _, host := range shuffled { var conn *Conn - conn, err = c.session.connect(host, c) + conn, err = c.session.dial(host, &cfg, c) if err == nil { return conn, nil } @@ -386,7 +389,10 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) { } oldConn := c.getConn() - if oldConn.conn != conn { + + // If connection has long gone, and not been attempted for awhile, + // it's possible to have oldConn as nil here (#1297). + if oldConn != nil && oldConn.conn != conn { return } diff --git a/vendor/github.com/gocql/gocql/frame.go b/vendor/github.com/gocql/gocql/frame.go index d959b2c1..5fc94889 100644 --- a/vendor/github.com/gocql/gocql/frame.go +++ b/vendor/github.com/gocql/gocql/frame.go @@ -361,6 +361,9 @@ type ObservedFrameHeader struct { Start time.Time // EndHeader is the time we finished reading the frame header off the network connection. End time.Time + + // Host is Host of the connection the frame header was read from. 
+ Host *HostInfo } func (f ObservedFrameHeader) String() string { diff --git a/vendor/github.com/gocql/gocql/go.mod b/vendor/github.com/gocql/gocql/go.mod index a3c38054..70e98c86 100644 --- a/vendor/github.com/gocql/gocql/go.mod +++ b/vendor/github.com/gocql/gocql/go.mod @@ -1,7 +1,13 @@ module github.com/gocql/gocql require ( + github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/golang/snappy v0.0.0-20170215233205-553a64147049 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed + github.com/kr/pretty v0.1.0 // indirect + github.com/stretchr/testify v1.3.0 // indirect gopkg.in/inf.v0 v0.9.1 ) + +go 1.13 diff --git a/vendor/github.com/gocql/gocql/go.modverify b/vendor/github.com/gocql/gocql/go.modverify deleted file mode 100644 index fa9fc601..00000000 --- a/vendor/github.com/gocql/gocql/go.modverify +++ /dev/null @@ -1,3 +0,0 @@ -github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= -github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= -gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= diff --git a/vendor/github.com/gocql/gocql/go.sum b/vendor/github.com/gocql/gocql/go.sum new file mode 100644 index 00000000..a2bcaf78 --- /dev/null +++ b/vendor/github.com/gocql/gocql/go.sum @@ -0,0 +1,22 @@ +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= +github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod 
h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= diff --git a/vendor/github.com/gocql/gocql/helpers.go b/vendor/github.com/gocql/gocql/helpers.go index 7259c746..0eb30d07 100644 --- a/vendor/github.com/gocql/gocql/helpers.go +++ b/vendor/github.com/gocql/gocql/helpers.go @@ -26,6 +26,8 @@ func goType(t TypeInfo) reflect.Type { return reflect.TypeOf(*new(string)) case TypeBigInt, TypeCounter: return reflect.TypeOf(*new(int64)) + case TypeTime: + return reflect.TypeOf(*new(time.Duration)) case TypeTimestamp: return reflect.TypeOf(*new(time.Time)) case TypeBlob: @@ -91,6 +93,10 @@ func getCassandraBaseType(name string) Type { return TypeFloat case "int": return TypeInt + case "tinyint": + return TypeTinyInt + case "time": + return TypeTime case "timestamp": return TypeTimestamp case "uuid": @@ -229,6 +235,8 @@ func getApacheCassandraType(class string) Type { return TypeSmallInt case "ByteType": return TypeTinyInt + case "TimeType": + return TypeTime case "DateType", "TimestampType": return TypeTimestamp case "UUIDType", "LexicalUUIDType": diff --git a/vendor/github.com/gocql/gocql/host_source.go b/vendor/github.com/gocql/gocql/host_source.go index 8d95e5d6..f8ab3c10 100644 --- a/vendor/github.com/gocql/gocql/host_source.go +++ b/vendor/github.com/gocql/gocql/host_source.go @@ -89,6 +89,10 @@ func (c cassVersion) Before(major, minor, patch int) bool { return false } +func (c cassVersion) AtLeast(major, minor, patch int) bool { + return !c.Before(major, minor, patch) +} + func (c cassVersion) String() string { return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch) } @@ -106,6 +110,7 @@ type HostInfo struct { // TODO(zariel): reduce locking maybe, not all values will change, but to ensure // that we are thread safe use a mutex to access all fields. mu sync.RWMutex + hostname string peer net.IP broadcastAddress net.IP listenAddress net.IP @@ -123,6 +128,7 @@ type HostInfo struct { clusterName string version cassVersion state nodeState + schemaVersion string tokens []string } @@ -222,8 +228,9 @@ func (h *HostInfo) PreferredIP() net.IP { func (h *HostInfo) DataCenter() string { h.mu.RLock() - defer h.mu.RUnlock() - return h.dataCenter + dc := h.dataCenter + h.mu.RUnlock() + return dc } func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo { @@ -235,8 +242,9 @@ func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo { func (h *HostInfo) Rack() string { h.mu.RLock() - defer h.mu.RUnlock() - return h.rack + rack := h.rack + h.mu.RUnlock() + return rack } func (h *HostInfo) setRack(rack string) *HostInfo { @@ -407,15 +415,22 @@ func (h *HostInfo) IsUp() bool { return h != nil && h.State() == NodeUp } +func (h *HostInfo) HostnameAndPort() string { + if h.hostname == "" { + h.hostname = h.ConnectAddress().String() + } + return net.JoinHostPort(h.hostname, strconv.Itoa(h.port)) +} + func (h *HostInfo) String() string { h.mu.RLock() defer h.mu.RUnlock() connectAddr, source := h.connectAddressLocked() - return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+ + return fmt.Sprintf("[HostInfo hostname=%q connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+ "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+ "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", - h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP, + h.hostname, h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP, connectAddr, source, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, 
len(h.tokens)) } @@ -446,15 +461,11 @@ func checkSystemSchema(control *controlConn) (bool, error) { // Given a map that represents a row from either system.local or system.peers // return as much information as we can in *HostInfo -func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) { +func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) { const assertErrorMsg = "Assertion failed for %s" var ok bool // Default to our connected port if the cluster doesn't have port information - host := HostInfo{ - port: port, - } - for key, value := range row { switch key { case "data_center": @@ -539,6 +550,12 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostIn if !ok { return nil, fmt.Errorf(assertErrorMsg, "dse_version") } + case "schema_version": + schemaVersion, ok := value.(UUID) + if !ok { + return nil, fmt.Errorf(assertErrorMsg, "schema_version") + } + host.schemaVersion = schemaVersion.String() } // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete // Not sure what the port field will be called until the JIRA issue is complete @@ -548,7 +565,7 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostIn host.connectAddress = ip host.port = port - return &host, nil + return host, nil } // Ask the control node for host info on all it's known peers @@ -571,7 +588,7 @@ func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) { for _, row := range rows { // extract all available info about the peer - host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port) + host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port}) if err != nil { return nil, err } else if !isValidPeer(host) { @@ -633,7 +650,7 @@ func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) { } for _, row := range rows { - h, err := r.session.hostInfoFromMap(row, port) + h, err := r.session.hostInfoFromMap(row, &HostInfo{port: port}) if err != nil { return nil, err } diff --git a/vendor/github.com/gocql/gocql/integration.sh b/vendor/github.com/gocql/gocql/integration.sh index 6e49f844..1c6311b2 100755 --- a/vendor/github.com/gocql/gocql/integration.sh +++ b/vendor/github.com/gocql/gocql/integration.sh @@ -75,10 +75,15 @@ function run_tests() { else sleep 1s go test -tags "cassandra gocql_debug" -timeout=5m -race $args + + ccm clear + ccm start --wait-for-binary-proto + sleep 1s + go test -tags "integration gocql_debug" -timeout=5m -race $args ccm clear - ccm start + ccm start --wait-for-binary-proto sleep 1s go test -tags "ccm gocql_debug" -timeout=5m -race $args diff --git a/vendor/github.com/gocql/gocql/marshal.go b/vendor/github.com/gocql/gocql/marshal.go index 6c13a8b6..f8131e44 100644 --- a/vendor/github.com/gocql/gocql/marshal.go +++ b/vendor/github.com/gocql/gocql/marshal.go @@ -82,7 +82,9 @@ func Marshal(info TypeInfo, value interface{}) ([]byte, error) { return marshalDouble(info, value) case TypeDecimal: return marshalDecimal(info, value) - case TypeTimestamp, TypeTime: + case TypeTime: + return marshalTime(info, value) + case TypeTimestamp: return marshalTimestamp(info, value) case TypeList, TypeSet: return marshalList(info, value) @@ -146,7 +148,9 @@ func Unmarshal(info TypeInfo, data []byte, value interface{}) error { return unmarshalDouble(info, data, value) case TypeDecimal: return unmarshalDecimal(info, data, value) - case TypeTimestamp, TypeTime: + case TypeTime: + return unmarshalTime(info, data, value) + case 
TypeTimestamp: return unmarshalTimestamp(info, data, value) case TypeList, TypeSet: return unmarshalList(info, data, value) @@ -1090,6 +1094,30 @@ func encBigInt2C(n *big.Int) []byte { return nil } +func marshalTime(info TypeInfo, value interface{}) ([]byte, error) { + switch v := value.(type) { + case Marshaler: + return v.MarshalCQL(info) + case unsetColumn: + return nil, nil + case int64: + return encBigInt(v), nil + case time.Duration: + return encBigInt(v.Nanoseconds()), nil + } + + if value == nil { + return nil, nil + } + + rv := reflect.ValueOf(value) + switch rv.Type().Kind() { + case reflect.Int64: + return encBigInt(rv.Int()), nil + } + return nil, marshalErrorf("can not marshal %T into %s", value, info) +} + func marshalTimestamp(info TypeInfo, value interface{}) ([]byte, error) { switch v := value.(type) { case Marshaler: @@ -1104,8 +1132,6 @@ func marshalTimestamp(info TypeInfo, value interface{}) ([]byte, error) { } x := int64(v.UTC().Unix()*1e3) + int64(v.UTC().Nanosecond()/1e6) return encBigInt(x), nil - case time.Duration: - return encBigInt(v.Nanoseconds()), nil } if value == nil { @@ -1120,6 +1146,31 @@ func marshalTimestamp(info TypeInfo, value interface{}) ([]byte, error) { return nil, marshalErrorf("can not marshal %T into %s", value, info) } +func unmarshalTime(info TypeInfo, data []byte, value interface{}) error { + switch v := value.(type) { + case Unmarshaler: + return v.UnmarshalCQL(info, data) + case *int64: + *v = decBigInt(data) + return nil + case *time.Duration: + *v = time.Duration(decBigInt(data)) + return nil + } + + rv := reflect.ValueOf(value) + if rv.Kind() != reflect.Ptr { + return unmarshalErrorf("can not unmarshal into non-pointer %T", value) + } + rv = rv.Elem() + switch rv.Type().Kind() { + case reflect.Int64: + rv.SetInt(decBigInt(data)) + return nil + } + return unmarshalErrorf("can not unmarshal %s into %T", info, value) +} + func unmarshalTimestamp(info TypeInfo, data []byte, value interface{}) error { switch v := value.(type) { case Unmarshaler: @@ -1137,8 +1188,6 @@ func unmarshalTimestamp(info TypeInfo, data []byte, value interface{}) error { nsec := (x - sec*1000) * 1000000 *v = time.Unix(sec, nsec).In(time.UTC) return nil - case *time.Duration: - *v = time.Duration(decBigInt(data)) } rv := reflect.ValueOf(value) @@ -1212,6 +1261,16 @@ func unmarshalDate(info TypeInfo, data []byte, value interface{}) error { timestamp := (int64(current) - int64(origin)) * 86400000 *v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC) return nil + case *string: + if len(data) == 0 { + *v = "" + return nil + } + var origin uint32 = 1 << 31 + var current uint32 = binary.BigEndian.Uint32(data) + timestamp := (int64(current) - int64(origin)) * 86400000 + *v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC).Format("2006-01-02") + return nil } return unmarshalErrorf("can not unmarshal %s into %T", info, value) } @@ -1400,11 +1459,17 @@ func marshalList(info TypeInfo, value interface{}) ([]byte, error) { return nil, marshalErrorf("can not marshal %T into %s", value, info) } -func readCollectionSize(info CollectionType, data []byte) (size, read int) { +func readCollectionSize(info CollectionType, data []byte) (size, read int, err error) { if info.proto > protoVersion2 { + if len(data) < 4 { + return 0, 0, unmarshalErrorf("unmarshal list: unexpected eof") + } size = int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) read = 4 } else { + if len(data) < 2 { + return 0, 0, unmarshalErrorf("unmarshal list: unexpected eof") + } 
size = int(data[0])<<8 | int(data[1]) read = 2 } @@ -1437,10 +1502,10 @@ func unmarshalList(info TypeInfo, data []byte, value interface{}) error { rv.Set(reflect.Zero(t)) return nil } - if len(data) < 2 { - return unmarshalErrorf("unmarshal list: unexpected eof") + n, p, err := readCollectionSize(listInfo, data) + if err != nil { + return err } - n, p := readCollectionSize(listInfo, data) data = data[p:] if k == reflect.Array { if rv.Len() != n { @@ -1450,10 +1515,10 @@ func unmarshalList(info TypeInfo, data []byte, value interface{}) error { rv.Set(reflect.MakeSlice(t, n, n)) } for i := 0; i < n; i++ { - if len(data) < 2 { - return unmarshalErrorf("unmarshal list: unexpected eof") + m, p, err := readCollectionSize(listInfo, data) + if err != nil { + return err } - m, p := readCollectionSize(listInfo, data) data = data[p:] if err := Unmarshal(listInfo.Elem, data[:m], rv.Index(i).Addr().Interface()); err != nil { return err @@ -1478,15 +1543,16 @@ func marshalMap(info TypeInfo, value interface{}) ([]byte, error) { } rv := reflect.ValueOf(value) - if rv.IsNil() { - return nil, nil - } t := rv.Type() if t.Kind() != reflect.Map { return nil, marshalErrorf("can not marshal %T into %s", value, info) } + if rv.IsNil() { + return nil, nil + } + buf := &bytes.Buffer{} n := rv.Len() @@ -1537,16 +1603,16 @@ func unmarshalMap(info TypeInfo, data []byte, value interface{}) error { return nil } rv.Set(reflect.MakeMap(t)) - if len(data) < 2 { - return unmarshalErrorf("unmarshal map: unexpected eof") + n, p, err := readCollectionSize(mapInfo, data) + if err != nil { + return err } - n, p := readCollectionSize(mapInfo, data) data = data[p:] for i := 0; i < n; i++ { - if len(data) < 2 { - return unmarshalErrorf("unmarshal list: unexpected eof") + m, p, err := readCollectionSize(mapInfo, data) + if err != nil { + return err } - m, p := readCollectionSize(mapInfo, data) data = data[p:] key := reflect.New(t.Key()) if err := Unmarshal(mapInfo.Key, data[:m], key.Interface()); err != nil { @@ -1554,7 +1620,10 @@ func unmarshalMap(info TypeInfo, data []byte, value interface{}) error { } data = data[m:] - m, p = readCollectionSize(mapInfo, data) + m, p, err = readCollectionSize(mapInfo, data) + if err != nil { + return err + } data = data[p:] val := reflect.New(t.Elem()) if err := Unmarshal(mapInfo.Elem, data[:m], val.Interface()); err != nil { diff --git a/vendor/github.com/gocql/gocql/metadata.go b/vendor/github.com/gocql/gocql/metadata.go index 083c51df..5a17559b 100644 --- a/vendor/github.com/gocql/gocql/metadata.go +++ b/vendor/github.com/gocql/gocql/metadata.go @@ -22,6 +22,7 @@ type KeyspaceMetadata struct { Tables map[string]*TableMetadata Functions map[string]*FunctionMetadata Aggregates map[string]*AggregateMetadata + Views map[string]*ViewMetadata } // schema metadata for a table (a.k.a. column family) @@ -81,6 +82,14 @@ type AggregateMetadata struct { finalFunc string } +// ViewMetadata holds the metadata for views. 
+type ViewMetadata struct { + Keyspace string + Name string + FieldNames []string + FieldTypes []TypeInfo +} + // the ordering of the column with regard to its comparator type ColumnOrder bool @@ -233,9 +242,13 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error { if err != nil { return err } + views, err := getViewsMetadata(s.session, keyspaceName) + if err != nil { + return err + } // organize the schema data - compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates) + compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views) // update the cache s.cache[keyspaceName] = keyspace @@ -255,6 +268,7 @@ func compileMetadata( columns []ColumnMetadata, functions []FunctionMetadata, aggregates []AggregateMetadata, + views []ViewMetadata, ) { keyspace.Tables = make(map[string]*TableMetadata) for i := range tables { @@ -272,6 +286,10 @@ func compileMetadata( aggregate.StateFunc = *keyspace.Functions[aggregate.stateFunc] keyspace.Aggregates[aggregate.Name] = &aggregate } + keyspace.Views = make(map[string]*ViewMetadata, len(views)) + for i := range views { + keyspace.Views[views[i].Name] = &views[i] + } // add columns from the schema data for i := range columns { @@ -849,11 +867,56 @@ func getTypeInfo(t string) TypeInfo { return getCassandraType(t) } -func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) { +func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, error) { if session.cfg.ProtoVersion == protoVersion1 { return nil, nil } var tableName string + if session.useSystemSchema { + tableName = "system_schema.types" + } else { + tableName = "system.schema_usertypes" + } + stmt := fmt.Sprintf(` + SELECT + type_name, + field_names, + field_types + FROM %s + WHERE keyspace_name = ?`, tableName) + + var views []ViewMetadata + + rows := session.control.query(stmt, keyspaceName).Scanner() + for rows.Next() { + view := ViewMetadata{Keyspace: keyspaceName} + var argumentTypes []string + err := rows.Scan(&view.Name, + &view.FieldNames, + &argumentTypes, + ) + if err != nil { + return nil, err + } + view.FieldTypes = make([]TypeInfo, len(argumentTypes)) + for i, argumentType := range argumentTypes { + view.FieldTypes[i] = getTypeInfo(argumentType) + } + views = append(views, view) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return views, nil +} + +func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) { + if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { + return nil, nil + } + var tableName string if session.useSystemSchema { tableName = "system_schema.functions" } else { @@ -905,7 +968,7 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta } func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMetadata, error) { - if session.cfg.ProtoVersion == protoVersion1 { + if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { return nil, nil } var tableName string diff --git a/vendor/github.com/gocql/gocql/policies.go b/vendor/github.com/gocql/gocql/policies.go index b24beda2..feddc9ef 100644 --- a/vendor/github.com/gocql/gocql/policies.go +++ b/vendor/github.com/gocql/gocql/policies.go @@ -6,6 +6,8 @@ package gocql import ( "context" + crand "crypto/rand" + "encoding/binary" "errors" "fmt" "math" @@ -334,8 +336,6 @@ func RoundRobinHostPolicy() HostSelectionPolicy { type 
roundRobinHostPolicy struct { hosts cowHostList - pos uint32 - mu sync.RWMutex } func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool { return true } @@ -344,25 +344,16 @@ func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {} func (r *roundRobinHostPolicy) Init(*Session) {} func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { - // i is used to limit the number of attempts to find a host - // to the number of hosts known to this policy - var i int - return func() SelectedHost { - hosts := r.hosts.get() - if len(hosts) == 0 { - return nil - } + src := r.hosts.get() + hosts := make([]*HostInfo, len(src)) + copy(hosts, src) - // always increment pos to evenly distribute traffic in case of - // failures - pos := atomic.AddUint32(&r.pos, 1) - 1 - if i >= len(hosts) { - return nil - } - host := hosts[(pos)%uint32(len(hosts))] - i++ - return (*selectedHost)(host) - } + rand := rand.New(randSource()) + rand.Shuffle(len(hosts), func(i, j int) { + hosts[i], hosts[j] = hosts[j], hosts[i] + }) + + return roundRobbin(hosts) } func (r *roundRobinHostPolicy) AddHost(host *HostInfo) { @@ -387,6 +378,18 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) { } } +// NonLocalReplicasFallback enables fallback to replicas that are not considered local. +// +// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then +// falls back to other nodes in the local DC. Enabling NonLocalReplicasFallback causes TokenAwareHostPolicy +// to first select replicas by partition key in local DC, then replicas by partition key in remote DCs and fall back +// to other nodes in local DC. +func NonLocalReplicasFallback() func(policy *tokenAwareHostPolicy) { + return func(t *tokenAwareHostPolicy) { + t.nonLocalReplicasFallback = true + } +} + // TokenAwareHostPolicy is a token aware host selection policy, where hosts are // selected based on the partition key, so queries are sent to the host which // owns the partition. Fallback is used when routing information is not available. @@ -398,25 +401,35 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy, opts ...func(*tokenAware return p } -type keyspaceMeta struct { - replicas map[string]map[token][]*HostInfo +// clusterMeta holds metadata about cluster topology. +// It is used inside atomic.Value and shallow copies are used when replacing it, +// so fields should not be modified in-place. Instead, to modify a field a copy of the field should be made +// and the pointer in clusterMeta updated to point to the new value. +type clusterMeta struct { + // replicas is map[keyspace]map[token]hosts + replicas map[string]tokenRingReplicas + tokenRing *tokenRing } type tokenAwareHostPolicy struct { - hosts cowHostList - mu sync.RWMutex - partitioner string - fallback HostSelectionPolicy - session *Session + fallback HostSelectionPolicy + getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error) + getKeyspaceName func() string - tokenRing atomic.Value // *tokenRing - keyspaces atomic.Value // *keyspaceMeta + shuffleReplicas bool + nonLocalReplicasFallback bool - shuffleReplicas bool + // mu protects writes to hosts, partitioner, metadata. + // reads can be unlocked as long as they are not used for updating state later. 
+ mu sync.Mutex + hosts cowHostList + partitioner string + metadata atomic.Value // *clusterMeta } func (t *tokenAwareHostPolicy) Init(s *Session) { - t.session = s + t.getKeyspaceMetadata = s.KeyspaceMetadata + t.getKeyspaceName = func() string { return s.cfg.Keyspace } } func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool { @@ -424,34 +437,36 @@ func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool { } func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) { - meta, _ := t.keyspaces.Load().(*keyspaceMeta) - var size = 1 - if meta != nil { - size = len(meta.replicas) - } + t.mu.Lock() + defer t.mu.Unlock() + meta := t.getMetadataForUpdate() + t.updateReplicas(meta, update.Keyspace) + t.metadata.Store(meta) +} - newMeta := &keyspaceMeta{ - replicas: make(map[string]map[token][]*HostInfo, size), - } +// updateReplicas updates replicas in clusterMeta. +// It must be called with t.mu mutex locked. +// meta must not be nil and it's replicas field will be updated. +func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) { + newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas)) - ks, err := t.session.KeyspaceMetadata(update.Keyspace) + ks, err := t.getKeyspaceMetadata(keyspace) if err == nil { strat := getStrategy(ks) - tr := t.tokenRing.Load().(*tokenRing) - if tr != nil { - newMeta.replicas[update.Keyspace] = strat.replicaMap(t.hosts.get(), tr.tokens) + if strat != nil { + if meta != nil && meta.tokenRing != nil { + newReplicas[keyspace] = strat.replicaMap(meta.tokenRing) + } } } - if meta != nil { - for ks, replicas := range meta.replicas { - if ks != update.Keyspace { - newMeta.replicas[ks] = replicas - } + for ks, replicas := range meta.replicas { + if ks != keyspace { + newReplicas[ks] = replicas } } - t.keyspaces.Store(newMeta) + meta.replicas = newReplicas } func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { @@ -461,50 +476,96 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { if t.partitioner != partitioner { t.fallback.SetPartitioner(partitioner) t.partitioner = partitioner - - t.resetTokenRing(partitioner) + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) } } func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { - t.hosts.add(host) + t.mu.Lock() + if t.hosts.add(host) { + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) + } + t.mu.Unlock() + t.fallback.AddHost(host) +} - t.mu.RLock() - partitioner := t.partitioner - t.mu.RUnlock() - t.resetTokenRing(partitioner) +func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) { + t.mu.Lock() + + for _, host := range hosts { + t.hosts.add(host) + } + + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) + + t.mu.Unlock() + + for _, host := range hosts { + t.fallback.AddHost(host) + } } func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) { - t.hosts.remove(host.ConnectAddress()) - t.fallback.RemoveHost(host) + t.mu.Lock() + if t.hosts.remove(host.ConnectAddress()) { + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) + } + t.mu.Unlock() - t.mu.RLock() - partitioner := t.partitioner - t.mu.RUnlock() - 
t.resetTokenRing(partitioner) + t.fallback.RemoveHost(host) } func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) { - // TODO: need to avoid doing all the work on AddHost on hostup/down - // because it now expensive to calculate the replica map for each - // token - t.AddHost(host) + t.fallback.HostUp(host) } func (t *tokenAwareHostPolicy) HostDown(host *HostInfo) { - t.RemoveHost(host) + t.fallback.HostDown(host) +} + +// getMetadataReadOnly returns current cluster metadata. +// Metadata uses copy on write, so the returned value should be only used for reading. +// To obtain a copy that could be updated, use getMetadataForUpdate instead. +func (t *tokenAwareHostPolicy) getMetadataReadOnly() *clusterMeta { + meta, _ := t.metadata.Load().(*clusterMeta) + return meta } -func (t *tokenAwareHostPolicy) resetTokenRing(partitioner string) { +// getMetadataForUpdate returns clusterMeta suitable for updating. +// It is a SHALLOW copy of current metadata in case it was already set or new empty clusterMeta otherwise. +// This function should be called with t.mu mutex locked and the mutex should not be released before +// storing the new metadata. +func (t *tokenAwareHostPolicy) getMetadataForUpdate() *clusterMeta { + metaReadOnly := t.getMetadataReadOnly() + meta := new(clusterMeta) + if metaReadOnly != nil { + *meta = *metaReadOnly + } + return meta +} + +// resetTokenRing creates a new tokenRing. +// It must be called with t.mu locked. +func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo) { if partitioner == "" { // partitioner not yet set return } // create a new token ring - hosts := t.hosts.get() tokenRing, err := newTokenRing(partitioner, hosts) if err != nil { Logger.Printf("Unable to update the token ring due to error: %s", err) @@ -512,16 +573,7 @@ func (t *tokenAwareHostPolicy) resetTokenRing(partitioner string) { } // replace the token ring - t.tokenRing.Store(tokenRing) -} - -func (t *tokenAwareHostPolicy) getReplicas(keyspace string, token token) ([]*HostInfo, bool) { - meta, _ := t.keyspaces.Load().(*keyspaceMeta) - if meta == nil { - return nil, false - } - tokens, ok := meta.replicas[keyspace][token] - return tokens, ok + m.tokenRing = tokenRing } func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { @@ -536,28 +588,28 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { return t.fallback.Pick(qry) } - tr, _ := t.tokenRing.Load().(*tokenRing) - if tr == nil { + meta := t.getMetadataReadOnly() + if meta == nil || meta.tokenRing == nil { return t.fallback.Pick(qry) } - token := tr.partitioner.Hash(routingKey) - primaryEndpoint := tr.GetHostForToken(token) + token := meta.tokenRing.partitioner.Hash(routingKey) + ht := meta.replicas[qry.Keyspace()].replicasFor(token) - if primaryEndpoint == nil || token == nil { - return t.fallback.Pick(qry) - } - - replicas, ok := t.getReplicas(qry.Keyspace(), token) - if !ok { - replicas = []*HostInfo{primaryEndpoint} + var replicas []*HostInfo + if ht == nil { + host, _ := meta.tokenRing.GetHostForToken(token) + replicas = []*HostInfo{host} } else if t.shuffleReplicas { replicas = shuffleHosts(replicas) + } else { + replicas = ht.hosts } var ( fallbackIter NextHost - i int + i, j int + remote []*HostInfo ) used := make(map[*HostInfo]bool, len(replicas)) @@ -566,12 +618,29 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { h := replicas[i] i++ - if h.IsUp() && t.fallback.IsLocal(h) { + if !t.fallback.IsLocal(h) { + remote = append(remote, h) + continue + } + + if h.IsUp() { 
used[h] = true return (*selectedHost)(h) } } + if t.nonLocalReplicasFallback { + for j < len(remote) { + h := remote[j] + j++ + + if h.IsUp() { + used[h] = true + return (*selectedHost)(h) + } + } + } + if fallbackIter == nil { // fallback fallbackIter = t.fallback.Pick(qry) @@ -580,9 +649,11 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { // filter the token aware selected hosts from the fallback hosts for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() { if !used[fallbackHost.Info()] { + used[fallbackHost.Info()] = true return fallbackHost } } + return nil } } @@ -731,8 +802,6 @@ func (host selectedHostPoolHost) Mark(err error) { type dcAwareRR struct { local string - pos uint32 - mu sync.RWMutex localHosts cowHostList remoteHosts cowHostList } @@ -753,7 +822,7 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool { } func (d *dcAwareRR) AddHost(host *HostInfo) { - if host.DataCenter() == d.local { + if d.IsLocal(host) { d.localHosts.add(host) } else { d.remoteHosts.add(host) @@ -761,7 +830,7 @@ func (d *dcAwareRR) AddHost(host *HostInfo) { } func (d *dcAwareRR) RemoveHost(host *HostInfo) { - if host.DataCenter() == d.local { + if d.IsLocal(host) { d.localHosts.remove(host.ConnectAddress()) } else { d.remoteHosts.remove(host.ConnectAddress()) @@ -771,31 +840,54 @@ func (d *dcAwareRR) RemoveHost(host *HostInfo) { func (d *dcAwareRR) HostUp(host *HostInfo) { d.AddHost(host) } func (d *dcAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) } -func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { +var randSeed int64 + +func init() { + p := make([]byte, 8) + if _, err := crand.Read(p); err != nil { + panic(err) + } + randSeed = int64(binary.BigEndian.Uint64(p)) +} + +func randSource() rand.Source { + return rand.NewSource(atomic.AddInt64(&randSeed, 1)) +} + +func roundRobbin(hosts []*HostInfo) NextHost { var i int return func() SelectedHost { - var hosts []*HostInfo - localHosts := d.localHosts.get() - remoteHosts := d.remoteHosts.get() - if len(localHosts) != 0 { - hosts = localHosts - } else { - hosts = remoteHosts - } - if len(hosts) == 0 { - return nil - } + for i < len(hosts) { + h := hosts[i] + i++ - // always increment pos to evenly distribute traffic in case of - // failures - pos := atomic.AddUint32(&d.pos, 1) - 1 - if i >= len(localHosts)+len(remoteHosts) { - return nil + if h.IsUp() { + return (*selectedHost)(h) + } } - host := hosts[(pos)%uint32(len(hosts))] - i++ - return (*selectedHost)(host) + + return nil + } +} + +func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { + local := d.localHosts.get() + remote := d.remoteHosts.get() + + hosts := make([]*HostInfo, len(local)+len(remote)) + n := copy(hosts, local) + copy(hosts[n:], remote) + + // TODO: use random chose-2 but that will require plumbing information + // about connection/host load to here + r := rand.New(randSource()) + for _, l := range [][]*HostInfo{hosts[:len(local)], hosts[len(local):]} { + r.Shuffle(len(l), func(i, j int) { + l[i], l[j] = l[j], l[i] + }) } + + return roundRobbin(hosts) } // ConvictionPolicy interface is used by gocql to determine if a host should be diff --git a/vendor/github.com/gocql/gocql/query_executor.go b/vendor/github.com/gocql/gocql/query_executor.go index 04d3704f..6dd912db 100644 --- a/vendor/github.com/gocql/gocql/query_executor.go +++ b/vendor/github.com/gocql/gocql/query_executor.go @@ -34,12 +34,30 @@ func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, c return iter } +func (q *queryExecutor) 
+func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp SpeculativeExecutionPolicy, results chan *Iter) *Iter {
+	ticker := time.NewTicker(sp.Delay())
+	defer ticker.Stop()
+
+	for i := 0; i < sp.Attempts(); i++ {
+		select {
+		case <-ticker.C:
+			go q.run(ctx, qry, results)
+		case <-ctx.Done():
+			return &Iter{err: ctx.Err()}
+		case iter := <-results:
+			return iter
+		}
+	}
+
+	return nil
+}
+
 func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 	// check if the query is not marked as idempotent, if
 	// it is, we force the policy to NonSpeculative
 	sp := qry.speculativeExecutionPolicy()
-	if !qry.IsIdempotent() {
-		sp = NonSpeculativeExecution{}
+	if !qry.IsIdempotent() || sp.Attempts() == 0 {
+		return q.do(qry.Context(), qry), nil
 	}
 
 	ctx, cancel := context.WithCancel(qry.Context())
@@ -53,22 +71,9 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 	// The speculative executions are launched _in addition_ to the main
 	// execution, on a timer. So Speculation{2} would make 3 executions running
 	// in total.
-	go func() {
-		// setup a ticker
-		ticker := time.NewTicker(sp.Delay())
-		defer ticker.Stop()
-
-		for i := 0; i < sp.Attempts(); i++ {
-			select {
-			case <-ticker.C:
-				// Launch the additional execution
-				go q.run(ctx, qry, results)
-			case <-ctx.Done():
-				// not starting additional executions
-				return
-			}
-		}
-	}()
+	if iter := q.speculate(ctx, qry, sp, results); iter != nil {
+		return iter, nil
+	}
 
 	select {
 	case iter := <-results:
@@ -107,7 +112,15 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
 		iter = q.attemptQuery(ctx, qry, conn)
 		iter.host = selectedHost.Info()
 		// Update host
-		selectedHost.Mark(iter.err)
+		switch iter.err {
+		case context.Canceled, context.DeadlineExceeded, ErrNotFound:
+			// these errors represent logical errors; they should not count
+			// toward removing a node from the pool
+			selectedHost.Mark(nil)
+			return iter
+		default:
+			selectedHost.Mark(iter.err)
+		}
 
 		// Exit if the query was successful
 		// or no retry policy defined or retry attempts were reached
@@ -140,7 +153,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery) *Iter {
 	return &Iter{err: ErrNoConnections}
 }
 
-func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan *Iter) {
+func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, results chan<- *Iter) {
 	select {
 	case results <- q.do(ctx, qry):
 	case <-ctx.Done():
diff --git a/vendor/github.com/gocql/gocql/session.go b/vendor/github.com/gocql/gocql/session.go
index 101cb3ce..935f632f 100644
--- a/vendor/github.com/gocql/gocql/session.go
+++ b/vendor/github.com/gocql/gocql/session.go
@@ -62,8 +62,9 @@ type Session struct {
 	schemaEvents *eventDebouncer
 
 	// ring metadata
-	hosts           []HostInfo
-	useSystemSchema bool
+	hosts                     []HostInfo
+	useSystemSchema           bool
+	hasAggregatesAndFunctions bool
 
 	cfg ClusterConfig
 
@@ -107,6 +108,11 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		return nil, ErrNoHosts
 	}
 
+	// Check that either Authenticator is set or AuthProvider, but not both
+	if cfg.Authenticator != nil && cfg.AuthProvider != nil {
+		return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
+	}
+
 	s := &Session{
 		cons:     cfg.Consistency,
 		prefetch: 0.25,
@@ -215,9 +221,28 @@ func (s *Session) init() error {
 		hostMap[host.ConnectAddress().String()] = host
 	}
 
+	hosts = hosts[:0]
 	for _, host := range hostMap {
 		host = s.ring.addOrUpdate(host)
-		s.addNewNode(host)
+		if s.cfg.filterHost(host) {
+			continue
+		}
+
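+		// the host passed the host filter: mark it up, register it with the
+		// connection pool, and collect it so the host selection policy can be
+		// seeded with the filtered host list below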
+		host.setState(NodeUp)
+		s.pool.addHost(host)
+
+		hosts = append(hosts, host)
+	}
+
+	type bulkAddHosts interface {
+		AddHosts([]*HostInfo)
+	}
+	if v, ok := s.policy.(bulkAddHosts); ok {
+		v.AddHosts(hosts)
+	} else {
+		for _, host := range hosts {
+			s.policy.AddHost(host)
+		}
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we
@@ -235,14 +260,21 @@ func (s *Session) init() error {
 		newer, _ := checkSystemSchema(s.control)
 		s.useSystemSchema = newer
 	} else {
-		host := s.ring.rrHost()
-		s.useSystemSchema = host.Version().Major >= 3
+		version := s.ring.rrHost().Version()
+		s.useSystemSchema = version.AtLeast(3, 0, 0)
+		s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0)
 	}
 
 	if s.pool.Size() == 0 {
 		return ErrNoConnectionsStarted
 	}
 
+	// Invoke KeyspaceChanged to let the policy cache the session keyspace
+	// parameters. This is used by tokenAwareHostPolicy to discover replicas.
+	if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
+		s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace})
+	}
+
 	return nil
 }
 
@@ -643,21 +675,6 @@ func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{})
 	return applied, iter, iter.err
 }
 
-func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) {
-	if s.connectObserver != nil {
-		obs := ObservedConnect{
-			Host:  host,
-			Start: time.Now(),
-		}
-		conn, err := s.dial(host, s.connCfg, errorHandler)
-		obs.End = time.Now()
-		obs.Err = err
-		s.connectObserver.ObserveConnect(obs)
-		return conn, err
-	}
-	return s.dial(host, s.connCfg, errorHandler)
-}
-
 type hostMetrics struct {
 	Attempts     int
 	TotalLatency int64
@@ -666,6 +683,82 @@ type hostMetrics struct {
 }
 
 type queryMetrics struct {
 	l sync.RWMutex
 	m map[string]*hostMetrics
+	// totalAttempts is total number of attempts.
+	// Equal to sum of all hostMetrics' Attempts.
+	totalAttempts int
+}
+
+// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data.
+func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics {
+	qm := &queryMetrics{m: m}
+	for _, hm := range qm.m {
+		qm.totalAttempts += hm.Attempts
+	}
+	return qm
+}
+
+// hostMetrics gets or creates host metrics for given host, locking qm.l for the duration of the call.
+func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics {
+	qm.l.Lock()
+	metrics := qm.hostMetricsLocked(host)
+	qm.l.Unlock()
+	return metrics
+}
+
+// hostMetricsLocked gets or creates host metrics for given host.
+// It must be called only while holding qm.l lock.
+func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics {
+	metrics, exists := qm.m[host.ConnectAddress().String()]
+	if !exists {
+		// if the host is not in the map, it means it's been accessed for the first time
+		metrics = &hostMetrics{}
+		qm.m[host.ConnectAddress().String()] = metrics
+	}
+
+	return metrics
+}
+
+// attempts returns the number of times the query was executed.
+func (qm *queryMetrics) attempts() int {
+	qm.l.Lock()
+	attempts := qm.totalAttempts
+	qm.l.Unlock()
+	return attempts
+}
+
+// addAttempts adds given number of attempts and returns previous total attempts.
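+// The returned previous total can serve as a zero-based index of the attempt
+// being recorded (see Query.attempt and ObservedQuery.Attempt).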
+func (qm *queryMetrics) addAttempts(i int, host *HostInfo) int {
+	qm.l.Lock()
+	hostMetric := qm.hostMetricsLocked(host)
+	hostMetric.Attempts += i
+	attempts := qm.totalAttempts
+	qm.totalAttempts += i
+	qm.l.Unlock()
+	return attempts
+}
+
+func (qm *queryMetrics) latency() int64 {
+	qm.l.Lock()
+	var (
+		attempts int
+		latency  int64
+	)
+	for _, metric := range qm.m {
+		attempts += metric.Attempts
+		latency += metric.TotalLatency
+	}
+	qm.l.Unlock()
+	if attempts > 0 {
+		return latency / int64(attempts)
+	}
+	return 0
+}
+
+func (qm *queryMetrics) addLatency(l int64, host *HostInfo) {
+	qm.l.Lock()
+	hostMetric := qm.hostMetricsLocked(host)
+	hostMetric.TotalLatency += l
+	qm.l.Unlock()
 }
 
 // Query represents a CQL statement that can be executed.
@@ -675,7 +768,6 @@ type Query struct {
 	cons             Consistency
 	pageSize         int
 	routingKey       []byte
-	routingKeyBuffer []byte
 	pageState        []byte
 	prefetch         float64
 	trace            Tracer
@@ -694,6 +786,9 @@ type Query struct {
 	metrics *queryMetrics
 
 	disableAutoPage bool
+
+	// getKeyspace is a field so that it can be overridden in tests
+	getKeyspace func() string
 }
 
 func (q *Query) defaultsFromSession() {
@@ -715,19 +810,6 @@ func (q *Query) defaultsFromSession() {
 	s.mu.RUnlock()
 }
 
-func (q *Query) getHostMetrics(host *HostInfo) *hostMetrics {
-	q.metrics.l.Lock()
-	metrics, exists := q.metrics.m[host.ConnectAddress().String()]
-	if !exists {
-		// if the host is not in the map, it means it's been accessed for the first time
-		metrics = &hostMetrics{}
-		q.metrics.m[host.ConnectAddress().String()] = metrics
-	}
-	q.metrics.l.Unlock()
-
-	return metrics
-}
-
 // Statement returns the statement that was used to generate this query.
 func (q Query) Statement() string {
 	return q.stmt
@@ -740,43 +822,20 @@ func (q Query) String() string {
 
 //Attempts returns the number of times the query was executed.
 func (q *Query) Attempts() int {
-	q.metrics.l.Lock()
-	var attempts int
-	for _, metric := range q.metrics.m {
-		attempts += metric.Attempts
-	}
-	q.metrics.l.Unlock()
-	return attempts
+	return q.metrics.attempts()
 }
 
 func (q *Query) AddAttempts(i int, host *HostInfo) {
-	hostMetric := q.getHostMetrics(host)
-	q.metrics.l.Lock()
-	hostMetric.Attempts += i
-	q.metrics.l.Unlock()
+	q.metrics.addAttempts(i, host)
 }
 
 //Latency returns the average amount of nanoseconds per attempt of the query.
func (q *Query) Latency() int64 {
-	q.metrics.l.Lock()
-	var attempts int
-	var latency int64
-	for _, metric := range q.metrics.m {
-		attempts += metric.Attempts
-		latency += metric.TotalLatency
-	}
-	q.metrics.l.Unlock()
-	if attempts > 0 {
-		return latency / int64(attempts)
-	}
-	return 0
+	return q.metrics.latency()
 }
 
 func (q *Query) AddLatency(l int64, host *HostInfo) {
-	hostMetric := q.getHostMetrics(host)
-	q.metrics.l.Lock()
-	hostMetric.TotalLatency += l
-	q.metrics.l.Unlock()
+	q.metrics.addLatency(l, host)
 }
 
 // Consistency sets the consistency level for this query.
If no consistency @@ -891,7 +950,7 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { } func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { - q.AddAttempts(1, host) + attempt := q.metrics.addAttempts(1, host) q.AddLatency(end.Sub(start).Nanoseconds(), host) if q.observer != nil { @@ -902,8 +961,9 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host End: end, Rows: iter.numRows, Host: host, - Metrics: q.getHostMetrics(host), + Metrics: q.metrics.hostMetrics(host), Err: iter.err, + Attempt: attempt, }) } } @@ -914,6 +974,9 @@ func (q *Query) retryPolicy() RetryPolicy { // Keyspace returns the keyspace the query will be executed against. func (q *Query) Keyspace() string { + if q.getKeyspace != nil { + return q.getKeyspace() + } if q.session == nil { return "" } @@ -944,46 +1007,7 @@ func (q *Query) GetRoutingKey() ([]byte, error) { return nil, err } - if routingKeyInfo == nil { - return nil, nil - } - - if len(routingKeyInfo.indexes) == 1 { - // single column routing key - routingKey, err := Marshal( - routingKeyInfo.types[0], - q.values[routingKeyInfo.indexes[0]], - ) - if err != nil { - return nil, err - } - return routingKey, nil - } - - // We allocate that buffer only once, so that further re-bind/exec of the - // same query don't allocate more memory. - if q.routingKeyBuffer == nil { - q.routingKeyBuffer = make([]byte, 0, 256) - } - - // composite routing key - buf := bytes.NewBuffer(q.routingKeyBuffer) - for i := range routingKeyInfo.indexes { - encoded, err := Marshal( - routingKeyInfo.types[i], - q.values[routingKeyInfo.indexes[i]], - ) - if err != nil { - return nil, err - } - lenBuf := []byte{0x00, 0x00} - binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded))) - buf.Write(lenBuf) - buf.Write(encoded) - buf.WriteByte(0x00) - } - routingKey := buf.Bytes() - return routingKey, nil + return createRoutingKey(routingKeyInfo, q.values) } func (q *Query) shouldPrepare() bool { @@ -1048,6 +1072,7 @@ func (q *Query) Idempotent(value bool) *Query { // to an existing query instance. func (q *Query) Bind(v ...interface{}) *Query { q.values = v + q.pageState = nil return q } @@ -1473,10 +1498,13 @@ type Batch struct { Type BatchType Entries []BatchEntry Cons Consistency + routingKey []byte + routingKeyBuffer []byte CustomPayload map[string][]byte rt RetryPolicy spec SpeculativeExecutionPolicy observer BatchObserver + session *Session serialCons SerialConsistency defaultTimestamp bool defaultTimestampValue int64 @@ -1505,6 +1533,7 @@ func (s *Session) NewBatch(typ BatchType) *Batch { rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency, observer: s.batchObserver, + session: s, Cons: s.cons, defaultTimestamp: s.cfg.DefaultTimestamp, keyspace: s.cfg.Keyspace, @@ -1516,19 +1545,6 @@ func (s *Session) NewBatch(typ BatchType) *Batch { return batch } -func (b *Batch) getHostMetrics(host *HostInfo) *hostMetrics { - b.metrics.l.Lock() - metrics, exists := b.metrics.m[host.ConnectAddress().String()] - if !exists { - // if the host is not in the map, it means it's been accessed for the first time - metrics = &hostMetrics{} - b.metrics.m[host.ConnectAddress().String()] = metrics - } - b.metrics.l.Unlock() - - return metrics -} - // Observer enables batch-level observer on this batch. // The provided observer will be called every time this batched query is executed. 
func (b *Batch) Observer(observer BatchObserver) *Batch { @@ -1542,47 +1558,20 @@ func (b *Batch) Keyspace() string { // Attempts returns the number of attempts made to execute the batch. func (b *Batch) Attempts() int { - b.metrics.l.Lock() - defer b.metrics.l.Unlock() - - var attempts int - for _, metric := range b.metrics.m { - attempts += metric.Attempts - } - return attempts + return b.metrics.attempts() } func (b *Batch) AddAttempts(i int, host *HostInfo) { - hostMetric := b.getHostMetrics(host) - b.metrics.l.Lock() - hostMetric.Attempts += i - b.metrics.l.Unlock() + b.metrics.addAttempts(i, host) } //Latency returns the average number of nanoseconds to execute a single attempt of the batch. func (b *Batch) Latency() int64 { - b.metrics.l.Lock() - defer b.metrics.l.Unlock() - - var ( - attempts int - latency int64 - ) - for _, metric := range b.metrics.m { - attempts += metric.Attempts - latency += metric.TotalLatency - } - if attempts > 0 { - return latency / int64(attempts) - } - return 0 + return b.metrics.latency() } func (b *Batch) AddLatency(l int64, host *HostInfo) { - hostMetric := b.getHostMetrics(host) - b.metrics.l.Lock() - hostMetric.TotalLatency += l - b.metrics.l.Unlock() + b.metrics.addLatency(l, host) } // GetConsistency returns the currently configured consistency level for the batch @@ -1725,14 +1714,69 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host End: end, // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS Host: host, - Metrics: b.getHostMetrics(host), + Metrics: b.metrics.hostMetrics(host), Err: iter.err, }) } func (b *Batch) GetRoutingKey() ([]byte, error) { - // TODO: use the first statement in the batch as the routing key? - return nil, nil + if b.routingKey != nil { + return b.routingKey, nil + } + + if len(b.Entries) == 0 { + return nil, nil + } + + entry := b.Entries[0] + if entry.binding != nil { + // bindings do not have the values let's skip it like Query does. + return nil, nil + } + // try to determine the routing key + routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt) + if err != nil { + return nil, err + } + + return createRoutingKey(routingKeyInfo, entry.Args) +} + +func createRoutingKey(routingKeyInfo *routingKeyInfo, values []interface{}) ([]byte, error) { + if routingKeyInfo == nil { + return nil, nil + } + + if len(routingKeyInfo.indexes) == 1 { + // single column routing key + routingKey, err := Marshal( + routingKeyInfo.types[0], + values[routingKeyInfo.indexes[0]], + ) + if err != nil { + return nil, err + } + return routingKey, nil + } + + // composite routing key + buf := bytes.NewBuffer(make([]byte, 0, 256)) + for i := range routingKeyInfo.indexes { + encoded, err := Marshal( + routingKeyInfo.types[i], + values[routingKeyInfo.indexes[i]], + ) + if err != nil { + return nil, err + } + lenBuf := []byte{0x00, 0x00} + binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded))) + buf.Write(lenBuf) + buf.Write(encoded) + buf.WriteByte(0x00) + } + routingKey := buf.Bytes() + return routingKey, nil } type BatchType byte @@ -1885,6 +1929,10 @@ type ObservedQuery struct { // Err is the error in the query. // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error Err error + + // Attempt is the index of attempt at executing this query. + // The first attempt is number zero and any retries have non-zero attempt number. 
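+	// An observer can use this to distinguish retries from first tries, for
+	// example (illustrative only): count an observation as a retry whenever
+	// Attempt > 0.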
+ Attempt int } // QueryObserver is the interface implemented by query observers / stat collectors. diff --git a/vendor/github.com/gocql/gocql/token.go b/vendor/github.com/gocql/gocql/token.go index bdfcceb9..8ab7b858 100644 --- a/vendor/github.com/gocql/gocql/token.go +++ b/vendor/github.com/gocql/gocql/token.go @@ -131,10 +131,13 @@ func (ht hostToken) String() string { type tokenRing struct { partitioner partitioner tokens []hostToken + hosts []*HostInfo } func newTokenRing(partitioner string, hosts []*HostInfo) (*tokenRing, error) { - tokenRing := &tokenRing{} + tokenRing := &tokenRing{ + hosts: hosts, + } if strings.HasSuffix(partitioner, "Murmur3Partitioner") { tokenRing.partitioner = murmur3Partitioner{} @@ -192,29 +195,29 @@ func (t *tokenRing) String() string { return string(buf.Bytes()) } -func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) *HostInfo { +func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) (host *HostInfo, endToken token) { if t == nil { - return nil + return nil, nil } - token := t.partitioner.Hash(partitionKey) - return t.GetHostForToken(token) + return t.GetHostForToken(t.partitioner.Hash(partitionKey)) } -func (t *tokenRing) GetHostForToken(token token) *HostInfo { +func (t *tokenRing) GetHostForToken(token token) (host *HostInfo, endToken token) { if t == nil || len(t.tokens) == 0 { - return nil + return nil, nil } // find the primary replica - ringIndex := sort.Search(len(t.tokens), func(i int) bool { + p := sort.Search(len(t.tokens), func(i int) bool { return !t.tokens[i].token.Less(token) }) - if ringIndex == len(t.tokens) { + if p == len(t.tokens) { // wrap around to the first in the ring - ringIndex = 0 + p = 0 } - return t.tokens[ringIndex].host + v := t.tokens[p] + return v.host, v.token } diff --git a/vendor/github.com/gocql/gocql/topology.go b/vendor/github.com/gocql/gocql/topology.go index 735dc9da..008f4a7a 100644 --- a/vendor/github.com/gocql/gocql/topology.go +++ b/vendor/github.com/gocql/gocql/topology.go @@ -2,12 +2,51 @@ package gocql import ( "fmt" + "sort" "strconv" "strings" ) +type hostTokens struct { + token token + hosts []*HostInfo +} + +type tokenRingReplicas []hostTokens + +func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) } +func (h tokenRingReplicas) Len() int { return len(h) } +func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h tokenRingReplicas) replicasFor(t token) *hostTokens { + if len(h) == 0 { + return nil + } + + p := sort.Search(len(h), func(i int) bool { + return !h[i].token.Less(t) + }) + + // TODO: simplify this + if p < len(h) && h[p].token == t { + return &h[p] + } + + p-- + + if p >= len(h) { + // rollover + p = 0 + } else if p < 0 { + // rollunder + p = len(h) - 1 + } + + return &h[p] +} + type placementStrategy interface { - replicaMap(hosts []*HostInfo, tokens []hostToken) map[token][]*HostInfo + replicaMap(tokenRing *tokenRing) tokenRingReplicas replicationFactor(dc string) int } @@ -47,6 +86,8 @@ func getStrategy(ks *KeyspaceMetadata) placementStrategy { dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf) } return &networkTopology{dcs: dcs} + case strings.Contains(ks.StrategyClass, "LocalStrategy"): + return nil default: // TODO: handle unknown replicas and just return the primary host for a token panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass)) @@ -61,20 +102,28 @@ func (s *simpleStrategy) replicationFactor(dc string) int { return s.rf } -func (s *simpleStrategy) replicaMap(_ []*HostInfo, tokens 
[]hostToken) map[token][]*HostInfo { - tokenRing := make(map[token][]*HostInfo, len(tokens)) +func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas { + tokens := tokenRing.tokens + ring := make(tokenRingReplicas, len(tokens)) for i, th := range tokens { replicas := make([]*HostInfo, 0, s.rf) + seen := make(map[*HostInfo]bool) + for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ { - // TODO: need to ensure we dont add the same hosts twice h := tokens[(i+j)%len(tokens)] - replicas = append(replicas, h.host) + if !seen[h.host] { + replicas = append(replicas, h.host) + seen[h.host] = true + } } - tokenRing[th.token] = replicas + + ring[i] = hostTokens{th.token, replicas} } - return tokenRing + sort.Sort(ring) + + return ring } type networkTopology struct { @@ -99,10 +148,16 @@ func (n *networkTopology) haveRF(replicaCounts map[string]int) bool { return true } -func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[token][]*HostInfo { - dcRacks := make(map[string]map[string]struct{}) +func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas { + dcRacks := make(map[string]map[string]struct{}, len(n.dcs)) + // skipped hosts in a dc + skipped := make(map[string][]*HostInfo, len(n.dcs)) + // number of replicas per dc + replicasInDC := make(map[string]int, len(n.dcs)) + // dc -> racks + seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs)) - for _, h := range hosts { + for _, h := range tokenRing.hosts { dc := h.DataCenter() rack := h.Rack() @@ -114,26 +169,39 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ racks[rack] = struct{}{} } - tokenRing := make(map[token][]*HostInfo, len(tokens)) + for dc, racks := range dcRacks { + replicasInDC[dc] = 0 + seenDCRacks[dc] = make(map[string]struct{}, len(racks)) + } + + tokens := tokenRing.tokens + replicaRing := make(tokenRingReplicas, len(tokens)) var totalRF int for _, rf := range n.dcs { totalRF += rf } - for i, th := range tokens { - // number of replicas per dc - // TODO: recycle these - replicasInDC := make(map[string]int, len(n.dcs)) - // dc -> racks - seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs)) - // skipped hosts in a dc - skipped := make(map[string][]*HostInfo, len(n.dcs)) + for i, th := range tokenRing.tokens { + for k, v := range skipped { + skipped[k] = v[:0] + } + + for dc := range n.dcs { + replicasInDC[dc] = 0 + for rack := range seenDCRacks[dc] { + delete(seenDCRacks[dc], rack) + } + } replicas := make([]*HostInfo, 0, totalRF) - for j := 0; j < len(tokens) && !n.haveRF(replicasInDC); j++ { + for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ { // TODO: ensure we dont add the same host twice - h := tokens[(i+j)%len(tokens)].host + p := i + j + if p >= len(tokens) { + p -= len(tokens) + } + h := tokens[p].host dc := h.DataCenter() rack := h.Rack() @@ -152,13 +220,6 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ } else if _, ok := dcRacks[dc][rack]; !ok { // dont know about this rack continue - } else if len(replicas) >= totalRF { - if replicasInDC[dc] > rf { - panic(fmt.Sprintf("replica overflow. 
total rf=%d have=%d", totalRF, len(replicas))) - } - - // we now have enough replicas - break } racks := seenDCRacks[dc] @@ -175,7 +236,7 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ // new rack racks[rack] = struct{}{} replicas = append(replicas, h) - replicasInDC[dc]++ + r := replicasInDC[dc] + 1 if len(racks) == len(dcRacks[dc]) { // if we have been through all the racks, drain the rest of the skipped @@ -183,13 +244,14 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ // above skippedHosts := skipped[dc] var k int - for ; k < len(skippedHosts) && replicasInDC[dc] < rf; k++ { + for ; k < len(skippedHosts) && r+k < rf; k++ { sh := skippedHosts[k] replicas = append(replicas, sh) - replicasInDC[dc]++ } + r += k skipped[dc] = skippedHosts[k:] } + replicasInDC[dc] = r } else { // already seen this rack, keep hold of this host incase // we dont get enough for rf @@ -197,16 +259,18 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ } } - if len(replicas) == 0 || replicas[0] != th.host { - panic("first replica is not the primary replica for the token") + if len(replicas) == 0 { + panic(fmt.Sprintf("no replicas for token: %v", th.token)) + } else if !replicas[0].Equal(th.host) { + panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress())) } - tokenRing[th.token] = replicas + replicaRing[i] = hostTokens{th.token, replicas} } - if len(tokenRing) != len(tokens) { - panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(tokenRing), len(tokens))) + if len(replicaRing) != len(tokens) { + panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens))) } - return tokenRing + return replicaRing } diff --git a/vendor/github.com/gocql/gocql/uuid.go b/vendor/github.com/gocql/gocql/uuid.go index 7ca4c087..13ad3837 100644 --- a/vendor/github.com/gocql/gocql/uuid.go +++ b/vendor/github.com/gocql/gocql/uuid.go @@ -112,21 +112,64 @@ func RandomUUID() (UUID, error) { var timeBase = time.Date(1582, time.October, 15, 0, 0, 0, 0, time.UTC).Unix() +// getTimestamp converts time to UUID (version 1) timestamp. +// It must be an interval of 100-nanoseconds since timeBase. +func getTimestamp(t time.Time) int64 { + utcTime := t.In(time.UTC) + ts := int64(utcTime.Unix()-timeBase)*10000000 + int64(utcTime.Nanosecond()/100) + + return ts +} + // TimeUUID generates a new time based UUID (version 1) using the current // time as the timestamp. func TimeUUID() UUID { return UUIDFromTime(time.Now()) } +// The min and max clock values for a UUID. +// +// Cassandra's TimeUUIDType compares the lsb parts as signed byte arrays. +// Thus, the min value for each byte is -128 and the max is +127. +const ( + minClock = 0x8080 + maxClock = 0x7f7f +) + +// The min and max node values for a UUID. +// +// See explanation about Cassandra's TimeUUIDType comparison logic above. +var ( + minNode = []byte{0x80, 0x80, 0x80, 0x80, 0x80, 0x80} + maxNode = []byte{0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f} +) + +// MinTimeUUID generates a "fake" time based UUID (version 1) which will be +// the smallest possible UUID generated for the provided timestamp. +// +// UUIDs generated by this function are not unique and are mostly suitable only +// in queries to select a time range of a Cassandra's TimeUUID column. 
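+//
+// For example, a time-window scan might look like this (illustrative only;
+// the events table and its columns are assumptions, not part of gocql):
+//
+//	iter := session.Query(
+//		`SELECT id FROM events WHERE day = ? AND created > ? AND created < ?`,
+//		day, MinTimeUUID(start), MaxTimeUUID(end)).Iter()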
+func MinTimeUUID(t time.Time) UUID { + return TimeUUIDWith(getTimestamp(t), minClock, minNode) +} + +// MaxTimeUUID generates a "fake" time based UUID (version 1) which will be +// the biggest possible UUID generated for the provided timestamp. +// +// UUIDs generated by this function are not unique and are mostly suitable only +// in queries to select a time range of a Cassandra's TimeUUID column. +func MaxTimeUUID(t time.Time) UUID { + return TimeUUIDWith(getTimestamp(t), maxClock, maxNode) +} + // UUIDFromTime generates a new time based UUID (version 1) as described in // RFC 4122. This UUID contains the MAC address of the node that generated // the UUID, the given timestamp and a sequence number. -func UUIDFromTime(aTime time.Time) UUID { - utcTime := aTime.In(time.UTC) - t := int64(utcTime.Unix()-timeBase)*10000000 + int64(utcTime.Nanosecond()/100) +func UUIDFromTime(t time.Time) UUID { + ts := getTimestamp(t) clock := atomic.AddUint32(&clockSeq, 1) - return TimeUUIDWith(t, clock, hardwareAddr) + return TimeUUIDWith(ts, clock, hardwareAddr) } // TimeUUIDWith generates a new time based UUID (version 1) as described in