diff --git a/.travis.yml b/.travis.yml index ea1e5c56..1488f831 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,11 @@ language: go go: - - 1.9.x - 1.10.x - 1.11.x - 1.12.x - 1.13.x + - 1.14.x cache: apt diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d1ee7ca..29c74d15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## v6.1.0 - 2020-03-09 + +- Reworked and tested new connection pools with multiple queries per connection +- Socket Read- and WriteTimeout replaced with context timeout +- Mock assert fix +- Connection pool fixed initial size +- Changes added offsets + ## v6.0.0 - 2019-12-22 - 2.4 RethinkDB support diff --git a/Makefile b/Makefile index 2bbaa8a8..379f5679 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,10 @@ test: - test -d ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6 && mv ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6 ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6.bak; true - cp -R . ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6 - go test -coverprofile=cover.out -race gopkg.in/rethinkdb/rethinkdb-go.v6; true - go tool cover -html=cover.out -o cover.html; true - rm -f cover.out; true - rm -rf ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6 - test -d ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6.bak && mv ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6.bak ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6; true + go test -coverprofile=cover.out -race gopkg.in/rethinkdb/rethinkdb-go.v6 gopkg.in/rethinkdb/rethinkdb-go.v6/encoding gopkg.in/rethinkdb/rethinkdb-go.v6/types + go tool cover -html=cover.out -o cover.html + rm -f cover.out + +integration: + go test -race gopkg.in/rethinkdb/rethinkdb-go.v6/internal/integration/... + +benchpool: + go test -v -cpu 1,2,4,8,16,24,32,64,128,256 -bench=BenchmarkConnectionPool -run ^$ ./internal/integration/tests/ diff --git a/README.md b/README.md index 6f9c9456..3b07fca3 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ![RethinkDB-go Logo](https://raw.github.com/wiki/rethinkdb/rethinkdb-go/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker") -Current version: v6.0.0 (RethinkDB v2.4) +Current version: v6.1.0 (RethinkDB v2.4) Please note that this version of the driver only supports versions of RethinkDB using the v0.4 protocol (any versions of the driver older than RethinkDB 2.0 will not work). diff --git a/connection.go b/connection.go index f9adddc7..f5b760f8 100644 --- a/connection.go +++ b/connection.go @@ -378,11 +378,6 @@ func (c *Connection) sendQuery(q Query) error { binary.LittleEndian.PutUint64(b, uint64(q.Token)) binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-respHeaderLen)) - // Set timeout - if c.opts.WriteTimeout != 0 { - c.Conn.SetWriteDeadline(time.Now().Add(c.opts.WriteTimeout)) - } - // Send the JSON encoding of the query itself. if err = c.writeData(b); err != nil { c.setBad() @@ -402,10 +397,8 @@ func (c *Connection) nextToken() int64 { // readResponse attempts to read a Response from the server, if no response // could be read then an error is returned. func (c *Connection) readResponse() (*Response, error) { - // Set timeout - if c.opts.ReadTimeout != 0 { - c.Conn.SetReadDeadline(time.Now().Add(c.opts.ReadTimeout)) - } + // due to this is pooled connection, it always reads from socket even if idle + // timeouts should be only on query-level with context // Read response header (token+length) headerBuf := [respHeaderLen]byte{} diff --git a/connection_helper.go b/connection_helper.go index a08cb291..974bb830 100644 --- a/connection_helper.go +++ b/connection_helper.go @@ -17,10 +17,14 @@ func (c *Connection) read(buf []byte) (total int, err error) { } func (c *Connection) contextFromConnectionOpts() context.Context { - sum := c.opts.ReadTimeout + c.opts.WriteTimeout - if c.opts.ReadTimeout == 0 || c.opts.WriteTimeout == 0 { + // back compatibility + min := c.opts.ReadTimeout + if c.opts.WriteTimeout < min { + min = c.opts.WriteTimeout + } + if min == 0 { return context.Background() } - ctx, _ := context.WithTimeout(context.Background(), sum) + ctx, _ := context.WithTimeout(context.Background(), min) return ctx } diff --git a/connection_test.go b/connection_test.go index cf5ab387..15a5845f 100644 --- a/connection_test.go +++ b/connection_test.go @@ -25,9 +25,9 @@ func (s *ConnectionSuite) TestConnection_Query_Ok(c *test.C) { header := respHeader(token, respData) conn := &connMock{} - conn.On("Write", writeData).Return(len(writeData), nil) - conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil) - conn.On("Read", len(respData)).Return(respData, len(respData), nil) + conn.On("Write", writeData).Return(len(writeData), nil, nil) + conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil) + conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil) conn.On("Close").Return(nil) connection := newConnection(conn, "addr", &ConnectOpts{}) @@ -60,9 +60,9 @@ func (s *ConnectionSuite) TestConnection_Query_DefaultDBOk(c *test.C) { header := respHeader(token, respData) conn := &connMock{} - conn.On("Write", writeData).Return(len(writeData), nil) - conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil) - conn.On("Read", len(respData)).Return(respData, len(respData), nil) + conn.On("Write", writeData).Return(len(writeData), nil, nil) + conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil) + conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil) conn.On("Close").Return(nil) connection := newConnection(conn, "addr", &ConnectOpts{Database: "db"}) @@ -106,7 +106,7 @@ func (s *ConnectionSuite) TestConnection_Query_SendFail(c *test.C) { writeData := serializeQuery(token, q) conn := &connMock{} - conn.On("Write", writeData).Return(0, io.EOF) + conn.On("Write", writeData).Return(0, io.EOF, nil) connection := newConnection(conn, "addr", &ConnectOpts{}) response, cursor, err := connection.Query(ctx, q) @@ -126,9 +126,9 @@ func (s *ConnectionSuite) TestConnection_Query_NoReplyOk(c *test.C) { header := respHeader(token, respData) conn := &connMock{} - conn.On("Write", writeData).Return(len(writeData), nil) - conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil) - conn.On("Read", len(respData)).Return(respData, len(respData), nil) + conn.On("Write", writeData).Return(len(writeData), nil, nil) + conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil) + conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil) conn.On("Close").Return(nil) connection := newConnection(conn, "addr", &ConnectOpts{}) @@ -151,9 +151,8 @@ func (s *ConnectionSuite) TestConnection_Query_TimeoutWrite(c *test.C) { stopData := serializeQuery(token, newStopQuery(token)) conn := &connMock{} - conn.On("Write", writeData).Return(len(writeData), nil) - conn.On("Write", stopData).Return(len(stopData), nil) - conn.On("SetWriteDeadline").Return(nil) + conn.On("Write", writeData).Return(len(writeData), nil, nil) + conn.On("Write", stopData).Return(len(stopData), nil, nil) connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: time.Millisecond, WriteTimeout: time.Millisecond}) connection.readRequestsChan = make(chan tokenAndPromise, 0) @@ -174,9 +173,8 @@ func (s *ConnectionSuite) TestConnection_Query_TimeoutRead(c *test.C) { stopData := serializeQuery(token, newStopQuery(token)) conn := &connMock{} - conn.On("Write", writeData).Return(len(writeData), nil) - conn.On("Write", stopData).Return(len(stopData), nil) - conn.On("SetWriteDeadline").Return(nil) + conn.On("Write", writeData).Return(len(writeData), nil, 10*time.Millisecond) + conn.On("Write", stopData).Return(len(stopData), nil, nil) connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: time.Millisecond, WriteTimeout: time.Millisecond}) response, cursor, err := connection.Query(ctx, q) @@ -196,7 +194,7 @@ func (s *ConnectionSuite) TestConnection_Query_SendFailTracing(c *test.C) { writeData := serializeQuery(token, q) conn := &connMock{} - conn.On("Write", writeData).Return(0, io.EOF) + conn.On("Write", writeData).Return(0, io.EOF, nil) connection := newConnection(conn, "addr", &ConnectOpts{UseOpentracing: true}) response, cursor, err := connection.Query(ctx, q) @@ -306,8 +304,7 @@ func (s *ConnectionSuite) TestConnection_readResponse_TimeoutHeader(c *test.C) { timeout := time.Second conn := &connMock{} - conn.On("SetReadDeadline").Return(nil) - conn.On("Read", respHeaderLen).Return(nil, 0, io.EOF) + conn.On("Read", respHeaderLen).Return(nil, 0, io.EOF, nil) connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: timeout}) @@ -325,8 +322,8 @@ func (s *ConnectionSuite) TestConnection_readResponse_BodySocketErr(c *test.C) { header := respHeader(token, respData) conn := &connMock{} - conn.On("Read", respHeaderLen).Return(header, len(header), nil) - conn.On("Read", len(respData)).Return(nil, 0, io.EOF) + conn.On("Read", respHeaderLen).Return(header, len(header), nil, nil) + conn.On("Read", len(respData)).Return(nil, 0, io.EOF, nil) connection := newConnection(conn, "addr", &ConnectOpts{}) @@ -344,8 +341,8 @@ func (s *ConnectionSuite) TestConnection_readResponse_BodyUnmarshalErr(c *test.C header := respHeader(token, respData) conn := &connMock{} - conn.On("Read", respHeaderLen).Return(header, len(header), nil) - conn.On("Read", len(respData)).Return(make([]byte, len(respData)), len(respData), nil) + conn.On("Read", respHeaderLen).Return(header, len(header), nil, nil) + conn.On("Read", len(respData)).Return(make([]byte, len(respData)), len(respData), nil, nil) connection := newConnection(conn, "addr", &ConnectOpts{}) diff --git a/encoding/decoder.go b/encoding/decoder.go index b1416909..7d287fbb 100644 --- a/encoding/decoder.go +++ b/encoding/decoder.go @@ -78,7 +78,10 @@ func valueDecoder(dv, sv reflect.Value, blank bool) decoderFunc { if dv.IsValid() { dv = indirect(dv, false) - if blank { + if sv.Kind() == reflect.Ptr { + sv = indirect(sv, false) + dv.Set(sv) + } else if blank { dv.Set(reflect.Zero(dv.Type())) } } diff --git a/go.mod b/go.mod index 7e1514b1..6616e673 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,28 @@ module gopkg.in/rethinkdb/rethinkdb-go.v6 require ( - github.com/cenkalti/backoff v2.0.0+incompatible + github.com/bitly/go-hostpool v0.1.0 // indirect + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/protobuf v1.2.0 + github.com/golang/protobuf v1.3.4 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/kr/pretty v0.1.0 // indirect - github.com/opentracing/opentracing-go v1.0.2 - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/onsi/ginkgo v1.12.0 // indirect + github.com/onsi/gomega v1.9.0 // indirect + github.com/opentracing/opentracing-go v1.1.0 github.com/sirupsen/logrus v1.0.6 - github.com/stretchr/objx v0.1.1 // indirect - github.com/stretchr/testify v1.2.2 - golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac - golang.org/x/net v0.0.0-20180826012351-8a410e7b638d - golang.org/x/sys v0.0.0-20180828065106-d99a578cf41b // indirect - gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 - gopkg.in/fatih/pool.v2 v2.0.0 + github.com/stretchr/objx v0.2.0 // indirect + github.com/stretchr/testify v1.5.1 + golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 + golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect + gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect + gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f + gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect + gopkg.in/yaml.v2 v2.2.8 // indirect ) -go 1.13 +go 1.14 diff --git a/internal/integration/tests/benchmarks_test.go b/internal/integration/tests/benchmarks_test.go index f2d9300d..532f6c67 100644 --- a/internal/integration/tests/benchmarks_test.go +++ b/internal/integration/tests/benchmarks_test.go @@ -9,6 +9,62 @@ import ( "time" ) +func BenchmarkConnectionPoolLightweightQuery_Single(b *testing.B) { + q := r.Random() + + for i := 0; i < b.N; i++ { + var num float64 + err := q.ReadOne(&num, session) + if err != nil { + b.Errorf("read random number failed: %v", err) + } + } +} + +func BenchmarkConnectionPoolLightweightQuery_Parallel(b *testing.B) { + q := r.Random() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var num float64 + err := q.ReadOne(&num, session) + if err != nil { + b.Errorf("read random number failed: %v", err) + } + } + }) +} + +func BenchmarkConnectionPoolLightweightQuery_Parallel3X(b *testing.B) { + q := r.Random() + + b.SetParallelism(3) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var num float64 + err := q.ReadOne(&num, session) + if err != nil { + b.Errorf("read random number failed: %v", err) + } + } + }) +} + +func BenchmarkConnectionPoolLightweightQuery_Parallel10X(b *testing.B) { + q := r.Random() + + b.SetParallelism(10) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var num float64 + err := q.ReadOne(&num, session) + if err != nil { + b.Errorf("read random number failed: %v", err) + } + } + }) +} + func BenchmarkBatch200RandomWrites(b *testing.B) { var term r.Term diff --git a/internal/integration/tests/session_test.go b/internal/integration/tests/session_test.go index f250df70..1f870c95 100644 --- a/internal/integration/tests/session_test.go +++ b/internal/integration/tests/session_test.go @@ -158,3 +158,49 @@ func (s *RethinkSuite) TestSessionConnectUsername(c *test.C) { _, err = r.Expr("Hello World").Run(session) c.Assert(err, test.IsNil) } + +func (s *RethinkSuite) TestSessionIdleConnectionRemainsUsableSmallTimeout(c *test.C) { + session, err := r.Connect(r.ConnectOpts{ + Address: url, + NumRetries: 1, + InitialCap: 1, + ReadTimeout: 10 * time.Millisecond, + WriteTimeout: 10 * time.Millisecond, + }) + c.Assert(err, test.IsNil) + + time.Sleep(20 * time.Millisecond) + + var num int + err = r.Expr(5).ReadOne(&num, session) + c.Assert(err, test.IsNil) + c.Assert(num, test.Equals, 5) + + time.Sleep(20 * time.Millisecond) + + err = r.Expr(6).ReadOne(&num, session) + c.Assert(err, test.IsNil) + c.Assert(num, test.Equals, 6) +} + +func (s *RethinkSuite) TestSessionIdleConnectionRemainsUsableNoTimeout(c *test.C) { + session, err := r.Connect(r.ConnectOpts{ + Address: url, + NumRetries: 1, + InitialCap: 1, + }) + c.Assert(err, test.IsNil) + + time.Sleep(10 * time.Millisecond) + + var num int + err = r.Expr(5).ReadOne(&num, session) + c.Assert(err, test.IsNil) + c.Assert(num, test.Equals, 5) + + time.Sleep(10 * time.Millisecond) + + err = r.Expr(6).ReadOne(&num, session) + c.Assert(err, test.IsNil) + c.Assert(num, test.Equals, 6) +} diff --git a/mock_test.go b/mock_test.go index 041dda21..c16dd4a8 100644 --- a/mock_test.go +++ b/mock_test.go @@ -2,7 +2,6 @@ package rethinkdb import ( "fmt" - "testing" test "gopkg.in/check.v1" @@ -371,6 +370,54 @@ func (s *MockSuite) TestMockRethinkStructsRunWrite(c *test.C) { mock.AssertExpectations(c) } +func (s *MockSuite) TestMockMapSliceResultOk(c *test.C) { + type Some struct { + Id string + } + + result := []map[string]interface{}{ + {"Id": "test1"}, + {"Id": "test2"}, + } + + mock := NewMock() + q := DB("test").Table("test").GetAll() + mock.On(q).Return(result, nil) + res, err := q.Run(mock) + c.Assert(err, test.IsNil) + + var casted []*Some + err = res.All(&casted) + c.Assert(err, test.IsNil) + + c.Assert(casted[0].Id, test.Equals, "test1") + c.Assert(casted[1].Id, test.Equals, "test2") +} + +func (s *MockSuite) TestMockPointerSliceResultOk(c *test.C) { + type Some struct { + Id string + } + + result := []*Some{ + {Id: "test1"}, + {Id: "test2"}, + } + + mock := NewMock() + q := DB("test").Table("test").GetAll() + mock.On(q).Return(result, nil) + res, err := q.Run(mock) + c.Assert(err, test.IsNil) + + var casted []*Some + err = res.All(&casted) + c.Assert(err, test.IsNil) + + c.Assert(casted[0].Id, test.Equals, "test1") + c.Assert(casted[1].Id, test.Equals, "test2") +} + type simpleTestingT struct { failed bool } diff --git a/mocks_test.go b/mocks_test.go index 937c658f..a86da3bd 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -16,11 +16,19 @@ func (m *connMock) Read(b []byte) (n int, err error) { if ok { copy(b, rbuf) } + timeout := args.Get(3) + if timeout != nil { + time.Sleep(timeout.(time.Duration)) + } return args.Int(1), args.Error(2) } func (m *connMock) Write(b []byte) (n int, err error) { args := m.Called(b) + timeout := args.Get(2) + if timeout != nil { + time.Sleep(timeout.(time.Duration)) + } return args.Int(0), args.Error(1) } diff --git a/pool.go b/pool.go index e84ac5e0..3bfb5149 100644 --- a/pool.go +++ b/pool.go @@ -6,13 +6,17 @@ import ( "sync/atomic" "golang.org/x/net/context" - "gopkg.in/fatih/pool.v2" ) var ( errPoolClosed = errors.New("rethinkdb: pool is closed") ) +const ( + poolIsNotClosed int32 = 0 + poolIsClosed int32 = 1 +) + // A Pool is used to store a pool of connections to a single RethinkDB server type Pool struct { host Host @@ -20,9 +24,9 @@ type Pool struct { conns []*Connection pointer int32 + closed int32 - mu sync.RWMutex // protects following fields - closed bool + mu sync.Mutex // protects lazy creating connections } // NewPool creates a new connection pool for the given host @@ -41,7 +45,7 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) { conns := make([]*Connection, maxOpen) var err error - for i := range conns { + for i := 0; i < opts.InitialCap; i++ { conns[i], err = NewConnection(host.String(), opts) if err != nil { return nil, err @@ -53,13 +57,14 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) { pointer: -1, host: host, opts: opts, + closed: poolIsNotClosed, }, nil } // Ping verifies a connection to the database is still alive, // establishing a connection if necessary. func (p *Pool) Ping() error { - _, _, err := p.conn() + _, err := p.conn() return err } @@ -68,11 +73,17 @@ func (p *Pool) Ping() error { // It is rare to Close a Pool, as the Pool handle is meant to be // long-lived and shared between many goroutines. func (p *Pool) Close() error { - p.mu.RLock() - defer p.mu.RUnlock() - if p.closed { + if atomic.LoadInt32(&p.closed) == poolIsClosed { + return nil + } + + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed == poolIsClosed { return nil } + p.closed = poolIsClosed for _, c := range p.conns { err := c.Close() @@ -84,14 +95,10 @@ func (p *Pool) Close() error { return nil } -func (p *Pool) conn() (*Connection, *pool.PoolConn, error) { - p.mu.RLock() - - if p.closed { - p.mu.RUnlock() - return nil, nil, errPoolClosed +func (p *Pool) conn() (*Connection, error) { + if atomic.LoadInt32(&p.closed) == poolIsClosed { + return nil, errPoolClosed } - p.mu.RUnlock() pos := atomic.AddInt32(&p.pointer, 1) if pos == int32(len(p.conns)) { @@ -99,7 +106,20 @@ func (p *Pool) conn() (*Connection, *pool.PoolConn, error) { } pos = pos % int32(len(p.conns)) - return p.conns[pos], nil, nil + if p.conns[pos] == nil { + p.mu.Lock() + defer p.mu.Unlock() + + if p.conns[pos] == nil { + var err error + p.conns[pos], err = NewConnection(p.host.String(), p.opts) + if err != nil { + return nil, err + } + } + } + + return p.conns[pos], nil } // SetInitialPoolCap sets the initial capacity of the connection pool. @@ -128,7 +148,7 @@ func (p *Pool) SetMaxOpenConns(n int) { // Exec executes a query without waiting for any response. func (p *Pool) Exec(ctx context.Context, q Query) error { - c, _, err := p.conn() + c, err := p.conn() if err != nil { return err } @@ -139,7 +159,7 @@ func (p *Pool) Exec(ctx context.Context, q Query) error { // Query executes a query and waits for the response func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) { - c, _, err := p.conn() + c, err := p.conn() if err != nil { return nil, err } @@ -152,7 +172,7 @@ func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) { func (p *Pool) Server() (ServerResponse, error) { var response ServerResponse - c, _, err := p.conn() + c, err := p.conn() if err != nil { return response, err } diff --git a/query.go b/query.go index eb385d3a..10c4dc12 100644 --- a/query.go +++ b/query.go @@ -289,11 +289,13 @@ type WriteResponse struct { // ChangeResponse is a helper type used when dealing with changefeeds. The type // contains both the value before the query and the new value. type ChangeResponse struct { - NewValue interface{} `rethinkdb:"new_val,omitempty"` - OldValue interface{} `rethinkdb:"old_val,omitempty"` - State string `rethinkdb:"state,omitempty"` - Error string `rethinkdb:"error,omitempty"` - Type string `rethinkdb:"type,omitempty"` + NewValue interface{} `rethinkdb:"new_val,omitempty"` + OldValue interface{} `rethinkdb:"old_val,omitempty"` + State string `rethinkdb:"state,omitempty"` + Error string `rethinkdb:"error,omitempty"` + Type string `rethinkdb:"type,omitempty"` + OldOffset int `rethinkdb:"old_offset,omitempty"` + NewOffset int `rethinkdb:"new_offset,omitempty"` } // RunOpts contains the optional arguments for the Run function. diff --git a/rethinkdb.go b/rethinkdb.go index 0a22bf72..eaf02a5a 100644 --- a/rethinkdb.go +++ b/rethinkdb.go @@ -10,6 +10,8 @@ import ( ) var ( + // Log is logger for debug purpuses. + // deprecated Log *logrus.Logger ) diff --git a/session.go b/session.go index eefcfd03..0e5c731a 100644 --- a/session.go +++ b/session.go @@ -46,9 +46,11 @@ type ConnectOpts struct { Timeout time.Duration `rethinkdb:"timeout,omitempty" json:"timeout,omitempty"` // WriteTimeout is the amount of time the driver will wait when sending the // query to the server + // Deprecated: use RunOpts.Context instead WriteTimeout time.Duration `rethinkdb:"write_timeout,omitempty" json:"write_timeout,omitempty"` // ReadTimeout is the amount of time the driver will wait for a response from // the server when executing queries. + // Deprecated: use RunOpts.Context instead ReadTimeout time.Duration `rethinkdb:"read_timeout,omitempty" json:"read_timeout,omitempty"` // KeepAlivePeriod is the keep alive period used by the connection, by default // this is 30s. It is not possible to disable keep alive messages @@ -76,10 +78,8 @@ type ConnectOpts struct { // the first query is executed. InitialCap int `rethinkdb:"initial_cap,omitempty" json:"initial_cap,omitempty"` // MaxOpen is used by the internal connection pool and is used to configure - // the maximum number of connections held in the pool. If all available - // connections are being used then the driver will open new connections as - // needed however they will not be returned to the pool. By default the - // maximum number of connections is 2 + // the maximum number of connections held in the pool. By default the + // maximum number of connections is 1 MaxOpen int `rethinkdb:"max_open,omitempty" json:"max_open,omitempty"` // Below options are for cluster discovery, please note there is a high