Skip to content

Commit

Permalink
Merge branch 'release/v0.7.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
dancannon committed Apr 19, 2015
2 parents b1a2248 + 8a08998 commit 0fcd578
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 25 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## v0.7.1 - 2015-04-19
### Changed
- Improved logging of connection errors.

### Fixed
- Fixed bug causing empty times to be inserted into the DB even when the omitempty tag was set.
- Fixed node status refresh loop leaking goroutines.

## v0.7.0 - 2015-03-30

This release includes support for RethinkDB 2.0 and connecting to clusters. To connect to a cluster you should use the new `Addresses` field in `ConnectOpts`, for example:
Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
[Go](http://golang.org/) driver for [RethinkDB](http://www.rethinkdb.com/)


Current version: v0.7.0 (RethinkDB v2.0)
Current version: v0.7.1 (RethinkDB v2.0)

Please note that this version of the driver only supports versions of RethinkDB using the v0.4 protocol (any versions of the driver older than RethinkDB 2.0 will not work).

[![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/dancannon/gorethink?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)

Expand All @@ -30,7 +32,9 @@ import (

var session *r.Session

session, err := Connect(address)
session, err := r.Connect(r.ConnectOpts{
Address: "localhost:28015",
})
if err != nil {
log.Fatalln(err.Error())
}
Expand Down Expand Up @@ -62,12 +66,12 @@ session.SetMaxOpenConns(5)

### Connect to a cluster

To connect to a RethinkDB cluster which has multiple nodes you can use the following syntax.
To connect to a RethinkDB cluster which has multiple nodes you can use the following syntax. When connecting to a cluster with multiple nodes queries will be distributed between these nodes.

```go
var session *r.Session

session, err := r.Conenct(r.ConnectOpts{
session, err := r.Connect(r.ConnectOpts{
Addresses: []string{"localhost:28015", "localhost:28016"},
Database: "test",
AuthKey: "14daak1cad13dj",
Expand Down Expand Up @@ -117,7 +121,6 @@ r.Db("database").Table("table").Between(1, 10, r.BetweenOpts{
}).Run(session)
```


### Optional Arguments

As shown above in the Between example optional arguments are passed to the function as a struct. Each function that has optional arguments as a related struct. This structs are named in the format FunctionNameOpts, for example BetweenOpts is the related struct for Between.
Expand Down
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (c *Cluster) connectNodes(hosts []Host) {
for _, host := range hosts {
conn, err := NewConnection(host.String(), c.opts)
if err != nil {
log.Warnf("Error creating connection %s", err.Error())
continue
}
defer conn.Close()
Expand All @@ -201,6 +202,7 @@ func (c *Cluster) connectNodes(hosts []Host) {
c.opts,
))
if err != nil {
log.Warnf("Error fetching cluster status %s", err)
continue
}

Expand Down
11 changes: 9 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func (c *Connection) sendQuery(q Query) error {

// Set timeout
if c.opts.Timeout == 0 {
c.conn.SetDeadline(time.Time{})
c.conn.SetWriteDeadline(time.Time{})
} else {
c.conn.SetDeadline(time.Now().Add(c.opts.Timeout))
c.conn.SetWriteDeadline(time.Now().Add(c.opts.Timeout))
}

// Send the JSON encoding of the query itself.
Expand All @@ -165,6 +165,13 @@ func (c *Connection) nextToken() int64 {
}

func (c *Connection) readResponse() (*Response, error) {
// Set timeout
if c.opts.Timeout == 0 {
c.conn.SetReadDeadline(time.Time{})
} else {
c.conn.SetReadDeadline(time.Now().Add(c.opts.Timeout))
}

// Read response header (token+length)
_, err := io.ReadFull(c.conn, c.headerBuf[:respHeaderLen])
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions connection_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"io"
"strings"

p "github.com/dancannon/gorethink/ql2"
)
Expand Down Expand Up @@ -60,6 +61,7 @@ func (c *Connection) readHandshakeSuccess() error {
// convert to string and remove trailing NUL byte
response := string(line[:len(line)-1])
if response != "SUCCESS" {
response = strings.TrimSpace(response)
// we failed authorization or something else terrible happened
return RqlDriverError{fmt.Sprintf("Server dropped connection with message: \"%s\"", response)}
}
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package gorethink implements a Go driver for RethinkDB
//
// Current version: v0.7.0 (RethinkDB v2.0)
// Current version: v0.7.1 (RethinkDB v2.0)
// For more in depth information on how to use RethinkDB check out the API docs
// at http://rethinkdb.com/api
package gorethink
10 changes: 8 additions & 2 deletions encoding/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"image"
"reflect"
"testing"
"time"
)

var encodeExpected = map[string]interface{}{
Expand Down Expand Up @@ -74,6 +75,9 @@ type Optionals struct {
Ir int `gorethink:"omitempty"` // actually named omitempty, not an option
Io int `gorethink:"io,omitempty"`

Tr time.Time `gorethink:"tr"`
To time.Time `gorethink:"to,omitempty"`

Slr []string `gorethink:"slr"`
Slo []string `gorethink:"slo,omitempty"`

Expand All @@ -84,22 +88,24 @@ type Optionals struct {
var optionalsExpected = map[string]interface{}{
"sr": "",
"omitempty": int64(0),
"tr": map[string]interface{}{"$reql_type$": "TIME", "epoch_time": 0, "timezone": "+00:00"},
"slr": []interface{}{},
"mr": map[string]interface{}{},
}

func TestOmitEmpty(t *testing.T) {
var o Optionals
o.Sw = "something"
o.Tr = time.Unix(0, 0)
o.Mr = map[string]interface{}{}
o.Mo = map[string]interface{}{}

got, err := Encode(&o)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got, optionalsExpected) {
t.Errorf(" got: %v\nwant: %v\n", got, optionalsExpected)
if !jsonEqual(got, optionalsExpected) {
t.Errorf("\ngot: %#v\nwant: %#v\n", got, optionalsExpected)
}
}

Expand Down
10 changes: 9 additions & 1 deletion encoding/encoder_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (se *structEncoder) encode(v reflect.Value) interface{} {

for i, f := range se.fields {
fv := fieldByIndex(v, f.index)
if !fv.IsValid() || f.omitEmpty && isEmptyValue(fv) {
if !fv.IsValid() || f.omitEmpty && se.isEmptyValue(fv) {
continue
}

Expand All @@ -143,6 +143,14 @@ func (se *structEncoder) encode(v reflect.Value) interface{} {
return m
}

func (se *structEncoder) isEmptyValue(v reflect.Value) bool {
if v.Type() == timeType {
return v.Interface().(time.Time) == time.Time{}
}

return isEmptyValue(v)
}

func newStructEncoder(t reflect.Type) encoderFunc {
fields := cachedTypeFields(t)
se := &structEncoder{
Expand Down
34 changes: 20 additions & 14 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ type Node struct {
Host Host
aliases []Host

cluster *Cluster
pool *Pool
refreshTicker *time.Ticker
cluster *Cluster
pool *Pool
refreshDoneChan chan struct{}

mu sync.RWMutex
closed bool
Expand All @@ -25,14 +25,14 @@ type Node struct {

func newNode(id string, aliases []Host, cluster *Cluster, pool *Pool) *Node {
node := &Node{
ID: id,
Host: aliases[0],
aliases: aliases,
cluster: cluster,
pool: pool,
health: 100,
ID: id,
Host: aliases[0],
aliases: aliases,
cluster: cluster,
pool: pool,
health: 100,
refreshDoneChan: make(chan struct{}),
}

// Start node refresh loop
refreshInterval := cluster.opts.NodeRefreshInterval
if refreshInterval <= 0 {
Expand All @@ -41,9 +41,15 @@ func newNode(id string, aliases []Host, cluster *Cluster, pool *Pool) *Node {
}

go func() {
node.refreshTicker = time.NewTicker(refreshInterval)
for _ = range node.refreshTicker.C {
node.Refresh()

refreshTicker := time.NewTicker(refreshInterval)
for {
select {
case <-refreshTicker.C:
node.Refresh()
case <-node.refreshDoneChan:
return
}
}
}()

Expand Down Expand Up @@ -73,7 +79,7 @@ func (n *Node) Close(optArgs ...CloseOpts) error {
}
}

n.refreshTicker.Stop()
n.refreshDoneChan <- struct{}{}
if n.pool != nil {
n.pool.Close()
}
Expand Down

0 comments on commit 0fcd578

Please sign in to comment.