Skip to content

Commit

Permalink
Merge pull request #478 from rethinkdb/develop
Browse files Browse the repository at this point in the history
6.1.0 release
  • Loading branch information
CMogilko authored Mar 9, 2020
2 parents ce1a82d + d5da520 commit bbcde53
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
language: go

go:
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- 1.13.x
- 1.14.x

cache: apt

Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## v6.1.0 - 2020-03-09

- Reworked and tested new connection pools with multiple queries per connection
- Socket Read- and WriteTimeout replaced with context timeout
- Mock assert fix
- Connection pool fixed initial size
- Changes added offsets

## v6.0.0 - 2019-12-22

- 2.4 RethinkDB support
Expand Down
16 changes: 9 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
test:
test -d ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6 && mv ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6 ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6.bak; true
cp -R . ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6
go test -coverprofile=cover.out -race gopkg.in/rethinkdb/rethinkdb-go.v6; true
go tool cover -html=cover.out -o cover.html; true
rm -f cover.out; true
rm -rf ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6
test -d ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6.bak && mv ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6.bak ${GOPATH}/src/gopkg.in/rethinkdb/rethinkdb-go.v6; true
go test -coverprofile=cover.out -race gopkg.in/rethinkdb/rethinkdb-go.v6 gopkg.in/rethinkdb/rethinkdb-go.v6/encoding gopkg.in/rethinkdb/rethinkdb-go.v6/types
go tool cover -html=cover.out -o cover.html
rm -f cover.out

integration:
go test -race gopkg.in/rethinkdb/rethinkdb-go.v6/internal/integration/...

benchpool:
go test -v -cpu 1,2,4,8,16,24,32,64,128,256 -bench=BenchmarkConnectionPool -run ^$ ./internal/integration/tests/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

![RethinkDB-go Logo](https://raw.github.com/wiki/rethinkdb/rethinkdb-go/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker")

Current version: v6.0.0 (RethinkDB v2.4)
Current version: v6.1.0 (RethinkDB v2.4)

Please note that this version of the driver only supports versions of RethinkDB using the v0.4 protocol (any versions of the driver older than RethinkDB 2.0 will not work).

Expand Down
11 changes: 2 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,6 @@ func (c *Connection) sendQuery(q Query) error {
binary.LittleEndian.PutUint64(b, uint64(q.Token))
binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-respHeaderLen))

// Set timeout
if c.opts.WriteTimeout != 0 {
c.Conn.SetWriteDeadline(time.Now().Add(c.opts.WriteTimeout))
}

// Send the JSON encoding of the query itself.
if err = c.writeData(b); err != nil {
c.setBad()
Expand All @@ -402,10 +397,8 @@ func (c *Connection) nextToken() int64 {
// readResponse attempts to read a Response from the server, if no response
// could be read then an error is returned.
func (c *Connection) readResponse() (*Response, error) {
// Set timeout
if c.opts.ReadTimeout != 0 {
c.Conn.SetReadDeadline(time.Now().Add(c.opts.ReadTimeout))
}
// due to this is pooled connection, it always reads from socket even if idle
// timeouts should be only on query-level with context

// Read response header (token+length)
headerBuf := [respHeaderLen]byte{}
Expand Down
10 changes: 7 additions & 3 deletions connection_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ func (c *Connection) read(buf []byte) (total int, err error) {
}

func (c *Connection) contextFromConnectionOpts() context.Context {
sum := c.opts.ReadTimeout + c.opts.WriteTimeout
if c.opts.ReadTimeout == 0 || c.opts.WriteTimeout == 0 {
// back compatibility
min := c.opts.ReadTimeout
if c.opts.WriteTimeout < min {
min = c.opts.WriteTimeout
}
if min == 0 {
return context.Background()
}
ctx, _ := context.WithTimeout(context.Background(), sum)
ctx, _ := context.WithTimeout(context.Background(), min)
return ctx
}
43 changes: 20 additions & 23 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func (s *ConnectionSuite) TestConnection_Query_Ok(c *test.C) {
header := respHeader(token, respData)

conn := &connMock{}
conn.On("Write", writeData).Return(len(writeData), nil)
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil)
conn.On("Read", len(respData)).Return(respData, len(respData), nil)
conn.On("Write", writeData).Return(len(writeData), nil, nil)
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil)
conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil)
conn.On("Close").Return(nil)

connection := newConnection(conn, "addr", &ConnectOpts{})
Expand Down Expand Up @@ -60,9 +60,9 @@ func (s *ConnectionSuite) TestConnection_Query_DefaultDBOk(c *test.C) {
header := respHeader(token, respData)

conn := &connMock{}
conn.On("Write", writeData).Return(len(writeData), nil)
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil)
conn.On("Read", len(respData)).Return(respData, len(respData), nil)
conn.On("Write", writeData).Return(len(writeData), nil, nil)
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil)
conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil)
conn.On("Close").Return(nil)

connection := newConnection(conn, "addr", &ConnectOpts{Database: "db"})
Expand Down Expand Up @@ -106,7 +106,7 @@ func (s *ConnectionSuite) TestConnection_Query_SendFail(c *test.C) {
writeData := serializeQuery(token, q)

conn := &connMock{}
conn.On("Write", writeData).Return(0, io.EOF)
conn.On("Write", writeData).Return(0, io.EOF, nil)

connection := newConnection(conn, "addr", &ConnectOpts{})
response, cursor, err := connection.Query(ctx, q)
Expand All @@ -126,9 +126,9 @@ func (s *ConnectionSuite) TestConnection_Query_NoReplyOk(c *test.C) {
header := respHeader(token, respData)

conn := &connMock{}
conn.On("Write", writeData).Return(len(writeData), nil)
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil)
conn.On("Read", len(respData)).Return(respData, len(respData), nil)
conn.On("Write", writeData).Return(len(writeData), nil, nil)
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil)
conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil)
conn.On("Close").Return(nil)

connection := newConnection(conn, "addr", &ConnectOpts{})
Expand All @@ -151,9 +151,8 @@ func (s *ConnectionSuite) TestConnection_Query_TimeoutWrite(c *test.C) {
stopData := serializeQuery(token, newStopQuery(token))

conn := &connMock{}
conn.On("Write", writeData).Return(len(writeData), nil)
conn.On("Write", stopData).Return(len(stopData), nil)
conn.On("SetWriteDeadline").Return(nil)
conn.On("Write", writeData).Return(len(writeData), nil, nil)
conn.On("Write", stopData).Return(len(stopData), nil, nil)

connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: time.Millisecond, WriteTimeout: time.Millisecond})
connection.readRequestsChan = make(chan tokenAndPromise, 0)
Expand All @@ -174,9 +173,8 @@ func (s *ConnectionSuite) TestConnection_Query_TimeoutRead(c *test.C) {
stopData := serializeQuery(token, newStopQuery(token))

conn := &connMock{}
conn.On("Write", writeData).Return(len(writeData), nil)
conn.On("Write", stopData).Return(len(stopData), nil)
conn.On("SetWriteDeadline").Return(nil)
conn.On("Write", writeData).Return(len(writeData), nil, 10*time.Millisecond)
conn.On("Write", stopData).Return(len(stopData), nil, nil)

connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: time.Millisecond, WriteTimeout: time.Millisecond})
response, cursor, err := connection.Query(ctx, q)
Expand All @@ -196,7 +194,7 @@ func (s *ConnectionSuite) TestConnection_Query_SendFailTracing(c *test.C) {
writeData := serializeQuery(token, q)

conn := &connMock{}
conn.On("Write", writeData).Return(0, io.EOF)
conn.On("Write", writeData).Return(0, io.EOF, nil)

connection := newConnection(conn, "addr", &ConnectOpts{UseOpentracing: true})
response, cursor, err := connection.Query(ctx, q)
Expand Down Expand Up @@ -306,8 +304,7 @@ func (s *ConnectionSuite) TestConnection_readResponse_TimeoutHeader(c *test.C) {
timeout := time.Second

conn := &connMock{}
conn.On("SetReadDeadline").Return(nil)
conn.On("Read", respHeaderLen).Return(nil, 0, io.EOF)
conn.On("Read", respHeaderLen).Return(nil, 0, io.EOF, nil)

connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: timeout})

Expand All @@ -325,8 +322,8 @@ func (s *ConnectionSuite) TestConnection_readResponse_BodySocketErr(c *test.C) {
header := respHeader(token, respData)

conn := &connMock{}
conn.On("Read", respHeaderLen).Return(header, len(header), nil)
conn.On("Read", len(respData)).Return(nil, 0, io.EOF)
conn.On("Read", respHeaderLen).Return(header, len(header), nil, nil)
conn.On("Read", len(respData)).Return(nil, 0, io.EOF, nil)

connection := newConnection(conn, "addr", &ConnectOpts{})

Expand All @@ -344,8 +341,8 @@ func (s *ConnectionSuite) TestConnection_readResponse_BodyUnmarshalErr(c *test.C
header := respHeader(token, respData)

conn := &connMock{}
conn.On("Read", respHeaderLen).Return(header, len(header), nil)
conn.On("Read", len(respData)).Return(make([]byte, len(respData)), len(respData), nil)
conn.On("Read", respHeaderLen).Return(header, len(header), nil, nil)
conn.On("Read", len(respData)).Return(make([]byte, len(respData)), len(respData), nil, nil)

connection := newConnection(conn, "addr", &ConnectOpts{})

Expand Down
5 changes: 4 additions & 1 deletion encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func valueDecoder(dv, sv reflect.Value, blank bool) decoderFunc {

if dv.IsValid() {
dv = indirect(dv, false)
if blank {
if sv.Kind() == reflect.Ptr {
sv = indirect(sv, false)
dv.Set(sv)
} else if blank {
dv.Set(reflect.Zero(dv.Type()))
}
}
Expand Down
31 changes: 19 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
module gopkg.in/rethinkdb/rethinkdb-go.v6

require (
github.com/cenkalti/backoff v2.0.0+incompatible
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.2.0
github.com/golang/protobuf v1.3.4
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
github.com/kr/pretty v0.1.0 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect
github.com/opentracing/opentracing-go v1.1.0
github.com/sirupsen/logrus v1.0.6
github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.2.2
golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
golang.org/x/sys v0.0.0-20180828065106-d99a578cf41b // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
gopkg.in/fatih/pool.v2 v2.0.0
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)

go 1.13
go 1.14
56 changes: 56 additions & 0 deletions internal/integration/tests/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,62 @@ import (
"time"
)

func BenchmarkConnectionPoolLightweightQuery_Single(b *testing.B) {
q := r.Random()

for i := 0; i < b.N; i++ {
var num float64
err := q.ReadOne(&num, session)
if err != nil {
b.Errorf("read random number failed: %v", err)
}
}
}

func BenchmarkConnectionPoolLightweightQuery_Parallel(b *testing.B) {
q := r.Random()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var num float64
err := q.ReadOne(&num, session)
if err != nil {
b.Errorf("read random number failed: %v", err)
}
}
})
}

func BenchmarkConnectionPoolLightweightQuery_Parallel3X(b *testing.B) {
q := r.Random()

b.SetParallelism(3)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var num float64
err := q.ReadOne(&num, session)
if err != nil {
b.Errorf("read random number failed: %v", err)
}
}
})
}

func BenchmarkConnectionPoolLightweightQuery_Parallel10X(b *testing.B) {
q := r.Random()

b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var num float64
err := q.ReadOne(&num, session)
if err != nil {
b.Errorf("read random number failed: %v", err)
}
}
})
}

func BenchmarkBatch200RandomWrites(b *testing.B) {

var term r.Term
Expand Down
46 changes: 46 additions & 0 deletions internal/integration/tests/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,49 @@ func (s *RethinkSuite) TestSessionConnectUsername(c *test.C) {
_, err = r.Expr("Hello World").Run(session)
c.Assert(err, test.IsNil)
}

func (s *RethinkSuite) TestSessionIdleConnectionRemainsUsableSmallTimeout(c *test.C) {
session, err := r.Connect(r.ConnectOpts{
Address: url,
NumRetries: 1,
InitialCap: 1,
ReadTimeout: 10 * time.Millisecond,
WriteTimeout: 10 * time.Millisecond,
})
c.Assert(err, test.IsNil)

time.Sleep(20 * time.Millisecond)

var num int
err = r.Expr(5).ReadOne(&num, session)
c.Assert(err, test.IsNil)
c.Assert(num, test.Equals, 5)

time.Sleep(20 * time.Millisecond)

err = r.Expr(6).ReadOne(&num, session)
c.Assert(err, test.IsNil)
c.Assert(num, test.Equals, 6)
}

func (s *RethinkSuite) TestSessionIdleConnectionRemainsUsableNoTimeout(c *test.C) {
session, err := r.Connect(r.ConnectOpts{
Address: url,
NumRetries: 1,
InitialCap: 1,
})
c.Assert(err, test.IsNil)

time.Sleep(10 * time.Millisecond)

var num int
err = r.Expr(5).ReadOne(&num, session)
c.Assert(err, test.IsNil)
c.Assert(num, test.Equals, 5)

time.Sleep(10 * time.Millisecond)

err = r.Expr(6).ReadOne(&num, session)
c.Assert(err, test.IsNil)
c.Assert(num, test.Equals, 6)
}
Loading

0 comments on commit bbcde53

Please sign in to comment.