diff --git a/.travis.yml b/.travis.yml index ef246c2b..5c0bcc1e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,11 @@ language: go go: - - 1.7.x - - 1.8.x - 1.9.x - 1.10.x - 1.11.x - - master + - 1.12.x + - 1.13.x cache: apt diff --git a/README.md b/README.md index 0e6637e5..772f651f 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ![RethinkDB-go Logo](https://raw.github.com/wiki/rethinkdb/rethinkdb-go/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker") -Current version: v5.0.1 (RethinkDB v2.3) +Current version: v5.1.0 (RethinkDB v2.4) Please note that this version of the driver only supports versions of RethinkDB using the v0.4 protocol (any versions of the driver older than RethinkDB 2.0 will not work). @@ -456,6 +456,43 @@ func TestSomething(t *testing.T) { } ``` +If you want the cursor to block on some of the response values, you can pass in +a value of type `chan interface{}` and the cursor will block until a value is +available to read on the channel. Or you can pass in a function with signature +`func() interface{}`: the cursor will call the function (which may block). Here +is the example above adapted to use a channel. + +```go +func TestSomething(t *testing.T) { + mock := r.NewMock() + ch := make(chan []interface{}) + mock.On(r.Table("people")).Return(ch, nil) + go func() { + ch <- []interface{}{ + map[string]interface{}{"id": 1, "name": "John Smith"}, + map[string]interface{}{"id": 2, "name": "Jane Smith"}, + } + ch <- []interface{}{map[string]interface{}{"id": 3, "name": "Jack Smith"}} + close(ch) + }() + cursor, err := r.Table("people").Run(mock) + if err != nil { + t.Errorf("err is: %v", err) + } + + var rows []interface{} + err = cursor.All(&rows) + if err != nil { + t.Errorf("err is: %v", err) + } + + // Test result of rows + + mock.AssertExpectations(t) +} + +``` + The mocking implementation is based on amazing https://github.com/stretchr/testify library, thanks to @stretchr for their awesome work! ## Benchmarks @@ -464,17 +501,17 @@ Everyone wants their project's benchmarks to be speedy. And while we know that R Thanks to @jaredfolkins for the contribution. -| Type | Value | -| --- | --- | -| **Model Name** | MacBook Pro | -| **Model Identifier** | MacBookPro11,3 | -| **Processor Name** | Intel Core i7 | -| **Processor Speed** | 2.3 GHz | -| **Number of Processors** | 1 | -| **Total Number of Cores** | 4 | -| **L2 Cache (per Core)** | 256 KB | -| **L3 Cache** | 6 MB | -| **Memory** | 16 GB | +| Type | Value | +| ------------------------- | -------------- | +| **Model Name** | MacBook Pro | +| **Model Identifier** | MacBookPro11,3 | +| **Processor Name** | Intel Core i7 | +| **Processor Speed** | 2.3 GHz | +| **Number of Processors** | 1 | +| **Total Number of Cores** | 4 | +| **L2 Cache (per Core)** | 256 KB | +| **L3 Cache** | 6 MB | +| **Memory** | 16 GB | ```bash BenchmarkBatch200RandomWrites 20 557227775 ns/op diff --git a/connection.go b/connection.go index 4c64b2d9..82689aaa 100644 --- a/connection.go +++ b/connection.go @@ -464,7 +464,7 @@ func (c *Connection) processResponse(ctx context.Context, q Query, response *Res case p.Response_WAIT_COMPLETE: return c.processWaitResponse(response) default: - return nil, nil, RQLDriverError{rqlError("Unexpected response type: %v")} + return nil, nil, RQLDriverError{rqlError(fmt.Sprintf("Unexpected response type: %v", response.Type.String()))} } } diff --git a/cursor.go b/cursor.go index 0566464b..60463c8e 100644 --- a/cursor.go +++ b/cursor.go @@ -227,7 +227,6 @@ func (c *Cursor) nextLocked(dest interface{}, progressCursor bool) (bool, error) if progressCursor { c.buffer = c.buffer[1:] } - err := encoding.Decode(dest, data) if err != nil { return false, err @@ -494,7 +493,6 @@ func (c *Cursor) Listen(channel interface{}) { if !c.Next(elemp.Interface()) { break } - channelv.Send(elemp.Elem()) } diff --git a/cursor_test.go b/cursor_test.go new file mode 100644 index 00000000..1094babe --- /dev/null +++ b/cursor_test.go @@ -0,0 +1,34 @@ +package rethinkdb + +import ( + test "gopkg.in/check.v1" + "gopkg.in/rethinkdb/rethinkdb-go.v5/internal/integration/tests" +) + +type CursorSuite struct{} + +var _ = test.Suite(&CursorSuite{}) + +func (s *CursorSuite) TestCursor_One_Ok(c *test.C) { + data := map[string]interface{}{ + "A": 1, + "B": true, + } + + mock := NewMock() + ch := make(chan []interface{}) + mock.On(DB("test").Table("test")).Return(ch, nil) + go func() { + ch <- []interface{}{data} + close(ch) + }() + res, err := DB("test").Table("test").Run(mock) + c.Assert(err, test.IsNil) + + var response interface{} + err = res.One(&response) + + c.Assert(err, test.IsNil) + c.Assert(response, tests.JsonEquals, data) + mock.AssertExpectations(c) +} diff --git a/encoding/decoder_test.go b/encoding/decoder_test.go index d98e4137..9cbd7c35 100644 --- a/encoding/decoder_test.go +++ b/encoding/decoder_test.go @@ -433,7 +433,7 @@ func TestDecodeUnmarshalerPointer(t *testing.T) { t.Errorf("got error %v, expected nil", err) } if !jsonEqual(out, want) { - t.Errorf("got %q, want %q", out, want) + t.Errorf("got %+v, want %+v", out, want) } } @@ -587,7 +587,7 @@ func TestDecodeCustomTypeEncodingPointer(t *testing.T) { t.Errorf("got error %v, expected nil", err) } if !jsonEqual(out, want) { - t.Errorf("got %q, want %q", out, want) + t.Errorf("got %+v, want %+v", out, want) } } diff --git a/internal/integration/reql_tests/gorethink_test.go b/internal/integration/reql_tests/gorethink_test.go index 9811789c..3f08e2ec 100644 --- a/internal/integration/reql_tests/gorethink_test.go +++ b/internal/integration/reql_tests/gorethink_test.go @@ -5,6 +5,7 @@ package reql_tests import ( "flag" "os" + "runtime" r "gopkg.in/rethinkdb/rethinkdb-go.v5" ) @@ -12,7 +13,11 @@ import ( var url string func init() { - flag.Parse() + // Fixing test.testlogfile parsing error on Go 1.13+. + if runtime.Version() < "go1.13" { + flag.Parse() + } + r.SetVerbose(true) // If the test is being run by wercker look for the rethink url diff --git a/internal/integration/reql_tests/reql_bitwise_test.go b/internal/integration/reql_tests/reql_bitwise_test.go new file mode 100644 index 00000000..857e6e72 --- /dev/null +++ b/internal/integration/reql_tests/reql_bitwise_test.go @@ -0,0 +1,165 @@ +package reql_tests + +import ( + "github.com/stretchr/testify/suite" + r "gopkg.in/rethinkdb/rethinkdb-go.v5" + "testing" +) + +// Test edge cases of bitwise operations +func TestBitwiseSuite(t *testing.T) { + suite.Run(t, new(BitwiseSuite)) +} + +type BitwiseSuite struct { + suite.Suite + + session *r.Session +} + +func (suite *BitwiseSuite) SetupTest() { + suite.T().Log("Setting up BitwiseSuite") + + session, err := r.Connect(r.ConnectOpts{ + Address: url, + }) + suite.Require().NoError(err, "Error returned when connecting to server") + suite.session = session +} + +func (suite *BitwiseSuite) TearDownSuite() { + suite.T().Log("Tearing down BitwiseSuite") + + if suite.session != nil { + suite.session.Close() + } +} + +func (suite *BitwiseSuite) TestCases() { + suite.T().Log("Running BitwiseSuite: Test edge cases of bitwise operations") + + runOpts := r.RunOpts{ + GeometryFormat: "raw", + GroupFormat: "map", + } + + { + var q = r.BitAnd(3, 5) + var expected_ = 3 & 5 + + suite.T().Logf("About to run line #1: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #1") + } + + { + var q = r.Expr(3).BitAnd(5) + var expected_ = 3 & 5 + + suite.T().Logf("About to run line #2: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #2") + } + + { + var q = r.BitOr(3, 5) + var expected_ = 3 | 5 + + suite.T().Logf("About to run line #3: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #3") + } + + { + var q = r.Expr(3).BitOr(5) + var expected_ = 3 | 5 + + suite.T().Logf("About to run line #4: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #4") + } + + { + var q = r.BitXor(3, 5) + var expected_ = 3 ^ 5 + + suite.T().Logf("About to run line #5: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #5") + } + + { + var q = r.Expr(3).BitXor(5) + var expected_ = 3 ^ 5 + + suite.T().Logf("About to run line #6: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #6") + } + + { + var q = r.BitNot(3) + var expected_ = ^3 + + suite.T().Logf("About to run line #7: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #7") + } + + { + var q = r.Expr(3).BitNot() + var expected_ = ^3 + + suite.T().Logf("About to run line #8: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #8") + } + + { + var q = r.BitSal(3, 5) + var expected_ = 3 << 5 + + suite.T().Logf("About to run line #9: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #9") + } + + { + var q = r.Expr(3).BitSal(5) + var expected_ = 3 << 5 + + suite.T().Logf("About to run line #10: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #10") + } + + { + var q = r.BitSar(3, 5) + var expected_ = 3 >> 5 + + suite.T().Logf("About to run line #11: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #11") + } + + { + var q = r.Expr(3).BitSar(5) + var expected_ = 3 >> 5 + + suite.T().Logf("About to run line #12: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #12") + } +} diff --git a/internal/integration/reql_tests/reql_meta_table_test.go b/internal/integration/reql_tests/reql_meta_table_test.go index b57a8dd6..39e5da28 100644 --- a/internal/integration/reql_tests/reql_meta_table_test.go +++ b/internal/integration/reql_tests/reql_meta_table_test.go @@ -83,7 +83,7 @@ func (suite *MetaTableSuite) TestCases() { { // meta/table.yaml line #9 /* ({'type':'DB','name':'rethinkdb','id':null}) */ - var expected_ map[interface{}]interface{} = map[interface{}]interface{}{"type": "DB", "name": "rethinkdb", "id": nil} + var expected_ = compare.PartialMatch(map[interface{}]interface{}{"type": "DB", "name": "rethinkdb"}) /* r.db('rethinkdb').info() */ suite.T().Log("About to run line #9: r.DB('rethinkdb').Info()") @@ -100,7 +100,7 @@ func (suite *MetaTableSuite) TestCases() { /* partial({'db':{'type':'DB','name':'rethinkdb','id':null}, 'type':'TABLE','id':null,'name':'stats', 'indexes':[],'primary_key':'id'}) */ - var expected_ compare.Expected = compare.PartialMatch(map[interface{}]interface{}{"db": map[interface{}]interface{}{"type": "DB", "name": "rethinkdb", "id": nil}, "type": "TABLE", "id": nil, "name": "stats", "indexes": []interface{}{}, "primary_key": "id"}) + var expected_ compare.Expected = compare.PartialMatch(map[interface{}]interface{}{"db": map[interface{}]interface{}{"type": "DB", "name": "rethinkdb"}, "type": "TABLE", "name": "stats", "indexes": []interface{}{}, "primary_key": "id"}) /* r.db('rethinkdb').table('stats').info() */ suite.T().Log("About to run line #12: r.DB('rethinkdb').Table('stats').Info()") diff --git a/internal/integration/reql_tests/reql_selection_test.go b/internal/integration/reql_tests/reql_selection_test.go index a3a2b576..37aec602 100644 --- a/internal/integration/reql_tests/reql_selection_test.go +++ b/internal/integration/reql_tests/reql_selection_test.go @@ -250,7 +250,7 @@ func (suite *SelectionSuite) TestCases() { { // selection.yaml line #86 /* err("ReqlQueryLogicError", 'Database name `%` invalid (Use A-Za-z0-9_ only).', [0]) */ - var expected_ Err = err("ReqlQueryLogicError", "Database name `%` invalid (Use A-Za-z0-9_ only).") + var expected_ Err = err("ReqlQueryLogicError", "Database name `%` invalid (Use A-Z, a-z, 0-9, _ and - only).") /* r.db('%') */ suite.T().Log("About to run line #86: r.DB('%')") @@ -265,7 +265,7 @@ func (suite *SelectionSuite) TestCases() { { // selection.yaml line #89 /* err("ReqlQueryLogicError", 'Table name `%` invalid (Use A-Za-z0-9_ only).', [0]) */ - var expected_ Err = err("ReqlQueryLogicError", "Table name `%` invalid (Use A-Za-z0-9_ only).") + var expected_ Err = err("ReqlQueryLogicError", "Table name `%` invalid (Use A-Z, a-z, 0-9, _ and - only).") /* r.db('test').table('%') */ suite.T().Log("About to run line #89: r.DB('test').Table('%')") diff --git a/internal/integration/reql_tests/reql_sindex_api_test.go b/internal/integration/reql_tests/reql_sindex_api_test.go index 0731e00f..5f6b8869 100644 --- a/internal/integration/reql_tests/reql_sindex_api_test.go +++ b/internal/integration/reql_tests/reql_sindex_api_test.go @@ -1087,7 +1087,7 @@ func (suite *SindexApiSuite) TestCases() { { // sindex/api.yaml line #286 /* err('ReqlQueryLogicError', 'Indexed order_by can only be performed on a TABLE or TABLE_SLICE.', [0]) */ - var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE.") + var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE. Make sure order_by comes before any transformations (such as map) or filters.") /* table_test_sindex_api.get_all(1, index='bi').order_by(index='id').map(lambda x:x['id']) */ suite.T().Log("About to run line #286: table_test_sindex_api.GetAll(1).OptArgs(r.GetAllOpts{Index: 'bi', }).OrderBy().OptArgs(r.OrderByOpts{Index: 'id', }).Map(func(x r.Term) interface{} { return x.AtIndex('id')})") diff --git a/internal/integration/reql_tests/reql_timeout_test.go b/internal/integration/reql_tests/reql_timeout_test.go index bf21a254..14fdcbd8 100644 --- a/internal/integration/reql_tests/reql_timeout_test.go +++ b/internal/integration/reql_tests/reql_timeout_test.go @@ -59,6 +59,7 @@ func (suite *TimeoutSuite) TestCases() { suite.T().Log("Running TimeoutSuite: Tests timeouts.") { + // r.JS default timeout is 5 sec // timeout.yaml line #5 /* err("ReqlQueryLogicError", "JavaScript query `while(true) {}` timed out after 5.000 seconds.", [0]) */ var expected_ Err = err("ReqlQueryLogicError", "JavaScript query `while(true) {}` timed out after 5.000 seconds.") diff --git a/internal/integration/reql_tests/reql_transformation_test.go b/internal/integration/reql_tests/reql_transformation_test.go index 3775cf2f..9363601c 100644 --- a/internal/integration/reql_tests/reql_transformation_test.go +++ b/internal/integration/reql_tests/reql_transformation_test.go @@ -551,7 +551,7 @@ func (suite *TransformationSuite) TestCases() { { // transformation.yaml line #163 /* err('ReqlQueryLogicError', 'Indexed order_by can only be performed on a TABLE or TABLE_SLICE.', [0]) */ - var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE.") + var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE. Make sure order_by comes before any transformations (such as map) or filters.") /* table_test_transformation.order_by('id').order_by(index='id')[0] */ suite.T().Log("About to run line #163: table_test_transformation.OrderBy('id').OrderBy().OptArgs(r.OrderByOpts{Index: 'id', }).AtIndex(0)") @@ -566,7 +566,7 @@ func (suite *TransformationSuite) TestCases() { { // transformation.yaml line #168 /* err('ReqlQueryLogicError', 'Indexed order_by can only be performed on a TABLE or TABLE_SLICE.', [0]) */ - var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE.") + var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE. Make sure order_by comes before any transformations (such as map) or filters.") /* table_test_transformation.order_by('id').order_by(index='a')[0] */ suite.T().Log("About to run line #168: table_test_transformation.OrderBy('id').OrderBy().OptArgs(r.OrderByOpts{Index: 'a', }).AtIndex(0)") @@ -806,7 +806,7 @@ func (suite *TransformationSuite) TestCases() { { // transformation.yaml line #234 /* err('ReqlQueryLogicError', 'Indexed order_by can only be performed on a TABLE or TABLE_SLICE.', [0]) */ - var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE.") + var expected_ Err = err("ReqlQueryLogicError", "Indexed order_by can only be performed on a TABLE or TABLE_SLICE. Make sure order_by comes before any transformations (such as map) or filters.") /* table_test_transformation.order_by('missing').order_by(index='id').nth(0) */ suite.T().Log("About to run line #234: table_test_transformation.OrderBy('missing').OrderBy().OptArgs(r.OrderByOpts{Index: 'id', }).Nth(0)") diff --git a/internal/integration/reql_tests/reql_writehook_test.go b/internal/integration/reql_tests/reql_writehook_test.go new file mode 100644 index 00000000..3a77d8c4 --- /dev/null +++ b/internal/integration/reql_tests/reql_writehook_test.go @@ -0,0 +1,240 @@ +package reql_tests + +import ( + "github.com/stretchr/testify/suite" + r "gopkg.in/rethinkdb/rethinkdb-go.v5" + "gopkg.in/rethinkdb/rethinkdb-go.v5/internal/compare" + "testing" +) + +// Test edge cases of write hook +func TestWriteHookSuite(t *testing.T) { + suite.Run(t, new(WriteHook)) +} + +type WriteHook struct { + suite.Suite + + session *r.Session +} + +func (suite *WriteHook) SetupTest() { + suite.T().Log("Setting up WriteHook") + + session, err := r.Connect(r.ConnectOpts{ + Address: url, + }) + suite.Require().NoError(err, "Error returned when connecting to server") + suite.session = session + + r.DBDrop("db_hook").Exec(suite.session) + err = r.DBCreate("db_hook").Exec(suite.session) + suite.Require().NoError(err) + err = r.DB("db_hook").Wait().Exec(suite.session) + suite.Require().NoError(err) + + r.DB("db_hook").TableDrop("test").Exec(suite.session) + err = r.DB("db_hook").TableCreate("test").Exec(suite.session) + suite.Require().NoError(err) + err = r.DB("db_hook").Table("test").Wait().Exec(suite.session) + suite.Require().NoError(err) +} + +func (suite *WriteHook) TearDownSuite() { + suite.T().Log("Tearing down WriteHook") + + if suite.session != nil { + err := r.DBDrop("db_hook").Exec(suite.session) + suite.Require().NoError(err) + + suite.session.Close() + } +} + +func (suite *WriteHook) TestCases() { + suite.T().Log("Running WriteHook: Test edge cases of write hooks") + + runOpts := r.RunOpts{ + GeometryFormat: "raw", + GroupFormat: "map", + } + + table := r.DB("db_hook").Table("test") + wcField := "write_counter" + + { + var q = table.SetWriteHook(func(id r.Term, oldVal r.Term, newVal r.Term) r.Term { + return newVal + }) + var expected_ = compare.PartialMatch(map[string]interface{}{"created": 1}) + + suite.T().Logf("About to run line #1: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #1") + } + + { + var q = table.SetWriteHook(nil) + var expected_ = compare.PartialMatch(map[string]interface{}{"deleted": 1}) + + suite.T().Logf("About to run line #2: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #2") + } + + { + var q = table.SetWriteHook(func(id r.Term, oldVal r.Term, newVal r.Term) r.Term { + return newVal + }) + var expected_ = compare.PartialMatch(map[string]interface{}{"created": 1}) + + suite.T().Logf("About to run line #3: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #3") + } + + { + var q = table.SetWriteHook(func(id r.Term, oldVal r.Term, newVal r.Term) r.Term { + return r.Branch(oldVal.And(newVal), + newVal.Merge(map[string]r.Term{wcField: oldVal.Field(wcField).Add(1)}), + newVal, + newVal.Merge(r.Expr(map[string]int{wcField: 1})), + r.Error("no delete"), + ) + }) + var expected_ = compare.PartialMatch(map[string]interface{}{"replaced": 1}) + + suite.T().Logf("About to run line #4: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #4") + } + + { + var q = table.Insert(map[string]interface{}{"id": 1}, r.InsertOpts{ReturnChanges: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{ + "changes": []interface{}{ + map[string]interface{}{ + "new_val": map[string]interface{}{"id": 1, wcField: 1}, + "old_val": interface{}(nil), + }}, + "inserted": 1, + }) + + suite.T().Logf("About to run line #5: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #5") + } + + { + var q = table.Replace(map[string]interface{}{"id": 1, "text": "abc"}, r.ReplaceOpts{ReturnChanges: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{ + "changes": []interface{}{ + map[string]interface{}{ + "new_val": map[string]interface{}{"id": 1, wcField: 2, "text": "abc"}, + "old_val": map[string]interface{}{"id": 1, wcField: 1}, + }}, + "replaced": 1, + }) + + suite.T().Logf("About to run line #6: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #6") + } + + { + var q = table.Get(1).Update(map[string]interface{}{"text": "def"}, r.UpdateOpts{ReturnChanges: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{ + "changes": []interface{}{ + map[string]interface{}{ + "new_val": map[string]interface{}{"id": 1, wcField: 3, "text": "def"}, + "old_val": map[string]interface{}{"id": 1, wcField: 2, "text": "abc"}, + }}, + "replaced": 1, + }) + + suite.T().Logf("About to run line #7: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #7") + } + + { + var q = table.Get(1).Delete() + var expected_ = compare.PartialMatch(map[string]interface{}{ + "first_error": "Error in write hook: no delete", + "errors": 1, + }) + + suite.T().Logf("About to run line #8: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #8") + } + + { + var q = table.Insert(map[string]interface{}{"id": 2}, r.InsertOpts{ReturnChanges: true, IgnoreWriteHook: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{ + "changes": []interface{}{ + map[string]interface{}{ + "new_val": map[string]interface{}{"id": 2}, + "old_val": interface{}(nil), + }}, + "inserted": 1, + }) + + suite.T().Logf("About to run line #9: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #9") + } + + { + var q = table.Replace(map[string]interface{}{"id": 2, "text": "abc"}, r.ReplaceOpts{ReturnChanges: true, IgnoreWriteHook: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{ + "changes": []interface{}{ + map[string]interface{}{ + "new_val": map[string]interface{}{"id": 2, "text": "abc"}, + "old_val": map[string]interface{}{"id": 2}, + }}, + "replaced": 1, + }) + + suite.T().Logf("About to run line #10: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #10") + } + + { + var q = table.Get(2).Update(map[string]interface{}{"text": "def"}, r.UpdateOpts{ReturnChanges: true, IgnoreWriteHook: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{ + "changes": []interface{}{ + map[string]interface{}{ + "new_val": map[string]interface{}{"id": 2, "text": "def"}, + "old_val": map[string]interface{}{"id": 2, "text": "abc"}, + }}, + "replaced": 1, + }) + + suite.T().Logf("About to run line #11: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #11") + } + + { + var q = table.Get(2).Delete(r.DeleteOpts{IgnoreWriteHook: true}) + var expected_ = compare.PartialMatch(map[string]interface{}{"deleted": 1}) + + suite.T().Logf("About to run line #12: %v", q) + + runAndAssert(suite.Suite, expected_, q, suite.session, runOpts) + suite.T().Log("Finished running line #12") + } +} diff --git a/internal/integration/reql_tests/template.go.tpl b/internal/integration/reql_tests/template.go.tpl index fec3c3be..f649f234 100644 --- a/internal/integration/reql_tests/template.go.tpl +++ b/internal/integration/reql_tests/template.go.tpl @@ -5,8 +5,8 @@ import ( "time" "github.com/stretchr/testify/suite" - r "gopkg.in/rethinkdb/rethinkdb-go.v4" - "gopkg.in/rethinkdb/rethinkdb-go.v4/internal/compare" + r "gopkg.in/rethinkdb/rethinkdb-go.v5" + "gopkg.in/rethinkdb/rethinkdb-go.v5/internal/compare" ) // ${description} diff --git a/internal/integration/tests/example_query_write_test.go b/internal/integration/tests/example_query_write_test.go index 68f99df1..02d55bc7 100644 --- a/internal/integration/tests/example_query_write_test.go +++ b/internal/integration/tests/example_query_write_test.go @@ -227,3 +227,24 @@ func ExampleTerm_Delete_many() { // Output: // 4 rows deleted } + +func ExampleTerm_SetWriteHook() { + resp, err := r.DB("test").Table("test").SetWriteHook( + func(id r.Term, oldVal r.Term, newVal r.Term) r.Term { + return r.Branch(oldVal.And(newVal), + newVal.Merge(map[string]r.Term{"write_counter": oldVal.Field("write_counter").Add(1)}), + newVal, + newVal.Merge(r.Expr(map[string]int{"write_counter": 1})), + nil, + ) + }).RunWrite(session) + + if err != nil { + fmt.Print(err) + return + } + + fmt.Printf("%d hook created", resp.Created) + // Output: + // 1 hook created +} diff --git a/internal/integration/tests/rethinkdb_test.go b/internal/integration/tests/rethinkdb_test.go index 8f77ff05..70dd4eb0 100644 --- a/internal/integration/tests/rethinkdb_test.go +++ b/internal/integration/tests/rethinkdb_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "os" + "runtime" "testing" "time" @@ -18,7 +19,11 @@ var testdata = flag.Bool("rethinkdb.testdata", true, "create test data") var url, url1, url2, url3, db, authKey string func init() { - flag.Parse() + // Fixing test.testlogfile parsing error on Go 1.13+. + if runtime.Version() < "go1.13" { + flag.Parse() + } + r.SetVerbose(true) // If the test is being run by wercker look for the rethink url diff --git a/mock.go b/mock.go index 7613450e..5b0d26c3 100644 --- a/mock.go +++ b/mock.go @@ -1,8 +1,12 @@ package rethinkdb import ( + "encoding/binary" "encoding/json" "fmt" + "gopkg.in/check.v1" + "gopkg.in/rethinkdb/rethinkdb-go.v5/encoding" + "net" "reflect" "sync" "time" @@ -103,6 +107,19 @@ func (mq *MockQuery) unlock() { // Return specifies the return arguments for the expectation. // // mock.On(r.Table("test")).Return(nil, errors.New("failed")) +// +// values of `chan []interface{}` type will turn to delayed data that produce data +// when there is an elements available on the channel. These elements are chunk of responses. +// Values of `func() []interface{}` type will produce data by calling the function. E.g. +// Closing channel or returning nil from func means end of data. +// +// f := func() []interface{} { return []interface{}{1, 2} } +// mock.On(r.Table("test1")).Return(f) +// +// ch := make(chan []interface{}) +// mock.On(r.Table("test1")).Return(ch) +// +// Running the query above will block until a value is pushed onto ch. func (mq *MockQuery) Return(response interface{}, err error) *MockQuery { mq.lock() defer mq.unlock() @@ -328,19 +345,31 @@ func (m *Mock) Query(ctx context.Context, q Query) (*Cursor, error) { return nil, query.Error } + if ctx == nil { + ctx = context.Background() + } + + conn := newConnection(newMockConn(query.Response), "mock", &ConnectOpts{}) + + query.Query.Type = p.Query_CONTINUE + query.Query.Token = conn.nextToken() + // Build cursor and return - c := newCursor(ctx, nil, "", query.Query.Token, query.Query.Term, query.Query.Opts) + c := newCursor(ctx, conn, "", query.Query.Token, query.Query.Term, query.Query.Opts) c.finished = true c.fetching = false c.isAtom = true + c.finished = false + c.releaseConn = func() error { return conn.Close() } - responseVal := reflect.ValueOf(query.Response) - if responseVal.Kind() == reflect.Slice || responseVal.Kind() == reflect.Array { - for i := 0; i < responseVal.Len(); i++ { - c.buffer = append(c.buffer, responseVal.Index(i).Interface()) - } - } else { - c.buffer = append(c.buffer, query.Response) + conn.cursors[query.Query.Token] = c + conn.runConnection() + + c.mu.Lock() + err := c.fetchMore() + c.mu.Unlock() + if err != nil { + return nil, err } return c, nil @@ -393,3 +422,106 @@ func (m *Mock) queries() []MockQuery { defer m.mu.Unlock() return append([]MockQuery{}, m.Queries...) } + +type mockConn struct { + c *check.C + mu sync.Mutex + value []byte + tokens chan int64 + valueGetter func() []interface{} +} + +func newMockConn(response interface{}) *mockConn { + c := &mockConn{tokens: make(chan int64, 1)} + switch g := response.(type) { + case chan []interface{}: + c.valueGetter = func() []interface{} { return <-g } + case func() []interface{}: + c.valueGetter = g + default: + responseVal := reflect.ValueOf(response) + if responseVal.Kind() == reflect.Slice || responseVal.Kind() == reflect.Array { + responses := make([]interface{}, responseVal.Len()) + for i := 0; i < responseVal.Len(); i++ { + responses[i] = responseVal.Index(i).Interface() + } + c.valueGetter = funcGetter(responses) + } else { + c.valueGetter = funcGetter([]interface{}{response}) + } + } + return c +} + +func funcGetter(responses []interface{}) func() []interface{} { + done := false + return func() []interface{} { + if done { + return nil + } + done = true + return responses + } +} + +func (c *mockConn) Read(b []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.value == nil { + values := c.valueGetter() + + jresps := make([]json.RawMessage, len(values)) + for i := range values { + coded, err := encoding.Encode(values[i]) + if err != nil { + panic(fmt.Sprintf("failed to encode response: %v", err)) + } + jresps[i], err = json.Marshal(coded) + if err != nil { + panic(fmt.Sprintf("failed to encode response: %v", err)) + } + } + + token := <-c.tokens + resp := Response{ + Token: token, + Responses: jresps, + Type: p.Response_SUCCESS_PARTIAL, + } + if values == nil { + resp.Type = p.Response_SUCCESS_SEQUENCE + } + + c.value, err = json.Marshal(resp) + if err != nil { + panic(fmt.Sprintf("failed to encode response: %v", err)) + } + + if len(b) != respHeaderLen { + panic("wrong header len") + } + binary.LittleEndian.PutUint64(b[:8], uint64(token)) + binary.LittleEndian.PutUint32(b[8:], uint32(len(c.value))) + return len(b), nil + } else { + copy(b, c.value) + c.value = nil + return len(b), nil + } +} + +func (c *mockConn) Write(b []byte) (n int, err error) { + if len(b) < 8 { + panic("bad socket write") + } + token := int64(binary.LittleEndian.Uint64(b[:8])) + c.tokens <- token + return len(b), nil +} +func (c *mockConn) Close() error { return nil } +func (c *mockConn) LocalAddr() net.Addr { panic("not implemented") } +func (c *mockConn) RemoteAddr() net.Addr { panic("not implemented") } +func (c *mockConn) SetDeadline(t time.Time) error { panic("not implemented") } +func (c *mockConn) SetReadDeadline(t time.Time) error { panic("not implemented") } +func (c *mockConn) SetWriteDeadline(t time.Time) error { panic("not implemented") } diff --git a/mock_test.go b/mock_test.go index 1d4c0acf..e1baa13b 100644 --- a/mock_test.go +++ b/mock_test.go @@ -3,9 +3,10 @@ package rethinkdb import ( "fmt" + "testing" + test "gopkg.in/check.v1" "gopkg.in/rethinkdb/rethinkdb-go.v5/internal/integration/tests" - "testing" ) // Hook up gocheck into the gotest runner. @@ -56,8 +57,6 @@ func (s *MockSuite) TestMockRunSuccessSingleResult(c *test.C) { c.Assert(err, test.IsNil) c.Assert(response, tests.JsonEquals, map[string]interface{}{"id": "mocked"}) mock.AssertExpectations(c) - - res.Close() } func (s *MockSuite) TestMockRunSuccessMultipleResults(c *test.C) { @@ -75,8 +74,49 @@ func (s *MockSuite) TestMockRunSuccessMultipleResults(c *test.C) { c.Assert(err, test.IsNil) c.Assert(response, tests.JsonEquals, []interface{}{map[string]interface{}{"id": "mocked"}}) mock.AssertExpectations(c) +} - res.Close() +func (s *MockSuite) TestMockRunSuccessChannel(c *test.C) { + mock := NewMock() + ch := make(chan []interface{}) + mock.On(DB("test").Table("test")).Return(ch, nil) + go func() { + ch <- []interface{}{1, 2} + ch <- []interface{}{3} + ch <- []interface{}{4} + close(ch) + }() + res, err := DB("test").Table("test").Run(mock) + c.Assert(err, test.IsNil) + + var response []interface{} + err = res.All(&response) + + c.Assert(err, test.IsNil) + c.Assert(response, tests.JsonEquals, []interface{}{1, 2, 3, 4}) + mock.AssertExpectations(c) +} + +func (s *MockSuite) TestMockRunSuccessFunction(c *test.C) { + mock := NewMock() + n := 0 + f := func() []interface{} { + n++ + if n == 4 { + return nil + } + return []interface{}{n} + } + mock.On(DB("test").Table("test")).Return(f, nil) + res, err := DB("test").Table("test").Run(mock) + c.Assert(err, test.IsNil) + + var response []interface{} + err = res.All(&response) + + c.Assert(err, test.IsNil) + c.Assert(response, tests.JsonEquals, []interface{}{1, 2, 3}) + mock.AssertExpectations(c) } func (s *MockSuite) TestMockRunSuccessMultipleResults_type(c *test.C) { @@ -310,6 +350,27 @@ func (s *MockSuite) TestMockAnything(c *test.C) { mock.AssertExpectations(c) } +func (s *MockSuite) TestMockRethinkStructsRunWrite(c *test.C) { + mock := NewMock() + mock.On(DB("test").Table("test").Update(map[string]int{"val": 1})).Return(WriteResponse{ + Replaced: 1, + Changes: []ChangeResponse{ + {NewValue: map[string]interface{}{"val": 1}, OldValue: map[string]interface{}{"val": 0}}, + }, + }, nil) + + res, err := DB("test").Table("test").Update(map[string]int{"val": 1}).RunWrite(mock) + c.Assert(err, test.IsNil) + + c.Assert(res, tests.JsonEquals, WriteResponse{ + Replaced: 1, + Changes: []ChangeResponse{ + {NewValue: map[string]interface{}{"val": 1}, OldValue: map[string]interface{}{"val": 0}}, + }, + }) + mock.AssertExpectations(c) +} + type simpleTestingT struct { failed bool } diff --git a/pool.go b/pool.go index fe349696..e84ac5e0 100644 --- a/pool.go +++ b/pool.go @@ -2,9 +2,8 @@ package rethinkdb import ( "errors" - "fmt" - "net" "sync" + "sync/atomic" "golang.org/x/net/context" "gopkg.in/fatih/pool.v2" @@ -19,7 +18,8 @@ type Pool struct { host Host opts *ConnectOpts - pool pool.Pool + conns []*Connection + pointer int32 mu sync.RWMutex // protects following fields closed bool @@ -36,36 +36,31 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) { maxOpen := opts.MaxOpen if maxOpen <= 0 { - maxOpen = 2 + maxOpen = 1 } - p, err := pool.NewChannelPool(initialCap, maxOpen, func() (net.Conn, error) { - conn, err := NewConnection(host.String(), opts) + conns := make([]*Connection, maxOpen) + var err error + for i := range conns { + conns[i], err = NewConnection(host.String(), opts) if err != nil { return nil, err } - - return conn, err - }) - if err != nil { - return nil, err } return &Pool{ - pool: p, - host: host, - opts: opts, + conns: conns, + pointer: -1, + host: host, + opts: opts, }, nil } // Ping verifies a connection to the database is still alive, // establishing a connection if necessary. func (p *Pool) Ping() error { - _, pc, err := p.conn() - if err != nil { - return err - } - return pc.Close() + _, _, err := p.conn() + return err } // Close closes the database, releasing any open resources. @@ -79,37 +74,32 @@ func (p *Pool) Close() error { return nil } - p.pool.Close() + for _, c := range p.conns { + err := c.Close() + if err != nil { + return err + } + } return nil } func (p *Pool) conn() (*Connection, *pool.PoolConn, error) { p.mu.RLock() - defer p.mu.RUnlock() if p.closed { + p.mu.RUnlock() return nil, nil, errPoolClosed } + p.mu.RUnlock() - nc, err := p.pool.Get() - if err != nil { - return nil, nil, err - } - - pc, ok := nc.(*pool.PoolConn) - if !ok { - // This should never happen! - return nil, nil, fmt.Errorf("Invalid connection in pool") - } - - conn, ok := pc.Conn.(*Connection) - if !ok { - // This should never happen! - return nil, nil, fmt.Errorf("Invalid connection in pool") + pos := atomic.AddInt32(&p.pointer, 1) + if pos == int32(len(p.conns)) { + atomic.StoreInt32(&p.pointer, 0) } + pos = pos % int32(len(p.conns)) - return conn, pc, nil + return p.conns[pos], nil, nil } // SetInitialPoolCap sets the initial capacity of the connection pool. @@ -138,39 +128,23 @@ func (p *Pool) SetMaxOpenConns(n int) { // Exec executes a query without waiting for any response. func (p *Pool) Exec(ctx context.Context, q Query) error { - c, pc, err := p.conn() + c, _, err := p.conn() if err != nil { return err } - defer pc.Close() _, _, err = c.Query(ctx, q) - - if c.isBad() { - pc.MarkUnusable() - } - return err } // Query executes a query and waits for the response func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) { - c, pc, err := p.conn() + c, _, err := p.conn() if err != nil { return nil, err } _, cursor, err := c.Query(ctx, q) - - if err == nil { - cursor.releaseConn = releaseConn(c, pc) - } else { - if c.isBad() { - pc.MarkUnusable() - } - pc.Close() - } - return cursor, err } @@ -178,27 +152,11 @@ func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) { func (p *Pool) Server() (ServerResponse, error) { var response ServerResponse - c, pc, err := p.conn() + c, _, err := p.conn() if err != nil { return response, err } - defer pc.Close() response, err = c.Server() - - if c.isBad() { - pc.MarkUnusable() - } - return response, err } - -func releaseConn(c *Connection, pc *pool.PoolConn) func() error { - return func() error { - if c.isBad() { - pc.MarkUnusable() - } - - return pc.Close() - } -} diff --git a/query_bitwise.go b/query_bitwise.go new file mode 100644 index 00000000..71e9445c --- /dev/null +++ b/query_bitwise.go @@ -0,0 +1,87 @@ +package rethinkdb + +import ( + p "gopkg.in/rethinkdb/rethinkdb-go.v5/ql2" +) + +// Rethinkdb proposal: https://github.com/rethinkdb/rethinkdb/pull/6534 + +// Or performs a bitwise And. +func (t Term) BitAnd(args ...interface{}) Term { + return constructMethodTerm(t, "BitAnd", p.Term_BIT_AND, args, map[string]interface{}{}) +} + +// Or performs a bitwise And. +func BitAnd(args ...interface{}) Term { + return constructRootTerm("BitAnd", p.Term_BIT_AND, args, map[string]interface{}{}) +} + +// Or performs a bitwise Or. +func (t Term) BitOr(args ...interface{}) Term { + return constructMethodTerm(t, "BitOr", p.Term_BIT_OR, args, map[string]interface{}{}) +} + +// Or performs a bitwise Or. +func BitOr(args ...interface{}) Term { + return constructRootTerm("BitOr", p.Term_BIT_OR, args, map[string]interface{}{}) +} + +// Or performs a bitwise XOR. +func (t Term) BitXor(args ...interface{}) Term { + return constructMethodTerm(t, "BitXor", p.Term_BIT_XOR, args, map[string]interface{}{}) +} + +// Or performs a bitwise XOR. +func BitXor(args ...interface{}) Term { + return constructRootTerm("BitXor", p.Term_BIT_XOR, args, map[string]interface{}{}) +} + +// Or performs a bitwise complement. +func (t Term) BitNot() Term { + return constructMethodTerm(t, "BitNot", p.Term_BIT_NOT, []interface{}{}, map[string]interface{}{}) +} + +// Or performs a bitwise complement. +func BitNot(arg interface{}) Term { + return constructRootTerm("BitNot", p.Term_BIT_NOT, []interface{}{arg}, map[string]interface{}{}) +} + +// Or performs a bitwise shift arithmetic left. +func (t Term) BitSal(args ...interface{}) Term { + return constructMethodTerm(t, "BitSal", p.Term_BIT_SAL, args, map[string]interface{}{}) +} + +// Or performs a bitwise shift arithmetic left. +func BitSal(args ...interface{}) Term { + return constructRootTerm("BitSal", p.Term_BIT_SAL, args, map[string]interface{}{}) +} + +//// Or performs a bitwise left shift. +//func (t Term) BitShl(args ...interface{}) Term { +// return constructMethodTerm(t, "BitShl", p.Term_BIT_SAL, args, map[string]interface{}{}) +//} +// +//// Or performs a bitwise left shift. +//func BitShl(args ...interface{}) Term { +// return constructRootTerm("BitShl", p.Term_BIT_SAL, args, map[string]interface{}{}) +//} + +// Or performs a bitwise shift arithmetic right. +func (t Term) BitSar(args ...interface{}) Term { + return constructMethodTerm(t, "BitSar", p.Term_BIT_SAR, args, map[string]interface{}{}) +} + +// Or performs a bitwise shift arithmetic right. +func BitSar(args ...interface{}) Term { + return constructRootTerm("BitSar", p.Term_BIT_SAR, args, map[string]interface{}{}) +} + +//// Or performs a bitwise right shift. +//func (t Term) BitShr(args ...interface{}) Term { +// return constructMethodTerm(t, "BitShr", p.Term_BIT_SHR, args, map[string]interface{}{}) +//} +// +//// Or performs a bitwise right shift. +//func BitShr(args ...interface{}) Term { +// return constructRootTerm("BitShr", p.Term_BIT_SHR, args, map[string]interface{}{}) +//} diff --git a/query_write.go b/query_write.go index 7b1d4293..fd78f2a2 100644 --- a/query_write.go +++ b/query_write.go @@ -6,9 +6,10 @@ import ( // InsertOpts contains the optional arguments for the Insert term type InsertOpts struct { - Durability interface{} `rethinkdb:"durability,omitempty"` - ReturnChanges interface{} `rethinkdb:"return_changes,omitempty"` - Conflict interface{} `rethinkdb:"conflict,omitempty"` + Durability interface{} `gorethink:"durability,omitempty"` + ReturnChanges interface{} `gorethink:"return_changes,omitempty"` + Conflict interface{} `gorethink:"conflict,omitempty"` + IgnoreWriteHook interface{} `gorethink:"ignore_write_hook,omitempty"` } func (o InsertOpts) toMap() map[string]interface{} { @@ -27,10 +28,11 @@ func (t Term) Insert(arg interface{}, optArgs ...InsertOpts) Term { // UpdateOpts contains the optional arguments for the Update term type UpdateOpts struct { - Durability interface{} `rethinkdb:"durability,omitempty"` - ReturnChanges interface{} `rethinkdb:"return_changes,omitempty"` - NonAtomic interface{} `rethinkdb:"non_atomic,omitempty"` - Conflict interface{} `rethinkdb:"conflict,omitempty"` + Durability interface{} `gorethink:"durability,omitempty"` + ReturnChanges interface{} `gorethink:"return_changes,omitempty"` + NonAtomic interface{} `gorethink:"non_atomic,omitempty"` + Conflict interface{} `gorethink:"conflict,omitempty"` + IgnoreWriteHook interface{} `gorethink:"ignore_write_hook,omitempty"` } func (o UpdateOpts) toMap() map[string]interface{} { @@ -50,9 +52,10 @@ func (t Term) Update(arg interface{}, optArgs ...UpdateOpts) Term { // ReplaceOpts contains the optional arguments for the Replace term type ReplaceOpts struct { - Durability interface{} `rethinkdb:"durability,omitempty"` - ReturnChanges interface{} `rethinkdb:"return_changes,omitempty"` - NonAtomic interface{} `rethinkdb:"non_atomic,omitempty"` + Durability interface{} `gorethink:"durability,omitempty"` + ReturnChanges interface{} `gorethink:"return_changes,omitempty"` + NonAtomic interface{} `gorethink:"non_atomic,omitempty"` + IgnoreWriteHook interface{} `gorethink:"ignore_write_hook,omitempty"` } func (o ReplaceOpts) toMap() map[string]interface{} { @@ -72,8 +75,9 @@ func (t Term) Replace(arg interface{}, optArgs ...ReplaceOpts) Term { // DeleteOpts contains the optional arguments for the Delete term type DeleteOpts struct { - Durability interface{} `rethinkdb:"durability,omitempty"` - ReturnChanges interface{} `rethinkdb:"return_changes,omitempty"` + Durability interface{} `gorethink:"durability,omitempty"` + ReturnChanges interface{} `gorethink:"return_changes,omitempty"` + IgnoreWriteHook interface{} `gorethink:"ignore_write_hook,omitempty"` } func (o DeleteOpts) toMap() map[string]interface{} { diff --git a/query_write_hooks.go b/query_write_hooks.go new file mode 100644 index 00000000..697a7753 --- /dev/null +++ b/query_write_hooks.go @@ -0,0 +1,33 @@ +package rethinkdb + +import ( + p "gopkg.in/rethinkdb/rethinkdb-go.v5/ql2" +) + +// Rethinkdb proposal: https://github.com/rethinkdb/rethinkdb/issues/5813 + +// WriteHookFunc called by rethinkdb when document is changed. +// id, oldVal or newVal can be null (test with Branch). +type WriteHookFunc func(id Term, oldVal Term, newVal Term) Term + +// SetWriteHook sets function that will be called each time document in a table is being +// inserted, updated, replaced or deleted. +func (t Term) SetWriteHook(hookFunc WriteHookFunc) Term { + var f interface{} = nil + if hookFunc != nil { + f = funcWrap(hookFunc) + } + return constructMethodTerm(t, "SetWriteHook", p.Term_SET_WRITE_HOOK, []interface{}{f}, map[string]interface{}{}) +} + +// WriteHookInfo is a return type of GetWriteHook func. +type WriteHookInfo struct { + Function []byte `gorethink:"function,omitempty"` + Query string `gorethink:"query,omitempty"` +} + +// GetWriteHook reads write hook associated with table. +// Use WriteHookInfo and ReadOne to get results. +func (t Term) GetWriteHook() Term { + return constructMethodTerm(t, "GetWriteHook", p.Term_GET_WRITE_HOOK, []interface{}{}, map[string]interface{}{}) +} diff --git a/session.go b/session.go index 103a686f..a90c3059 100644 --- a/session.go +++ b/session.go @@ -24,63 +24,63 @@ type Session struct { type ConnectOpts struct { // Address holds the address of the server initially used when creating the // session. Only used if Addresses is empty - Address string `rethinkdb:"address,omitempty"` + Address string `rethinkdb:"address,omitempty" json:"address,omitempty"` // Addresses holds the addresses of the servers initially used when creating // the session. - Addresses []string `rethinkdb:"addresses,omitempty"` + Addresses []string `rethinkdb:"addresses,omitempty" json:"addresses,omitempty"` // Database is the default database name used when executing queries, this // value is only used if the query does not contain any DB term - Database string `rethinkdb:"database,omitempty"` + Database string `rethinkdb:"database,omitempty" json:"database,omitempty"` // Username holds the username used for authentication, if blank (and the v1 // handshake protocol is being used) then the admin user is used - Username string `rethinkdb:"username,omitempty"` + Username string `rethinkdb:"username,omitempty" json:"username,omitempty"` // Password holds the password used for authentication (only used when using // the v1 handshake protocol) - Password string `rethinkdb:"password,omitempty"` + Password string `rethinkdb:"password,omitempty" json:"password,omitempty"` // AuthKey is used for authentication when using the v0.4 handshake protocol // This field is no deprecated - AuthKey string `rethinkdb:"authkey,omitempty"` + AuthKey string `rethinkdb:"authkey,omitempty" json:"authkey,omitempty"` // Timeout is the time the driver waits when creating new connections, to // configure the timeout used when executing queries use WriteTimeout and // ReadTimeout - Timeout time.Duration `rethinkdb:"timeout,omitempty"` + Timeout time.Duration `rethinkdb:"timeout,omitempty" json:"timeout,omitempty"` // WriteTimeout is the amount of time the driver will wait when sending the // query to the server - WriteTimeout time.Duration `rethinkdb:"write_timeout,omitempty"` + WriteTimeout time.Duration `rethinkdb:"write_timeout,omitempty" json:"write_timeout,omitempty"` // ReadTimeout is the amount of time the driver will wait for a response from // the server when executing queries. - ReadTimeout time.Duration `rethinkdb:"read_timeout,omitempty"` + ReadTimeout time.Duration `rethinkdb:"read_timeout,omitempty" json:"read_timeout,omitempty"` // KeepAlivePeriod is the keep alive period used by the connection, by default // this is 30s. It is not possible to disable keep alive messages - KeepAlivePeriod time.Duration `rethinkdb:"keep_alive_timeout,omitempty"` + KeepAlivePeriod time.Duration `rethinkdb:"keep_alive_timeout,omitempty" json:"keep_alive_timeout,omitempty"` // TLSConfig holds the TLS configuration and can be used when connecting // to a RethinkDB server protected by SSL - TLSConfig *tls.Config `rethinkdb:"tlsconfig,omitempty"` + TLSConfig *tls.Config `rethinkdb:"tlsconfig,omitempty" json:"tlsconfig,omitempty"` // HandshakeVersion is used to specify which handshake version should be // used, this currently defaults to v1 which is used by RethinkDB 2.3 and // later. If you are using an older version then you can set the handshake // version to 0.4 - HandshakeVersion HandshakeVersion `rethinkdb:"handshake_version,omitempty"` + HandshakeVersion HandshakeVersion `rethinkdb:"handshake_version,omitempty" json:"handshake_version,omitempty"` // UseJSONNumber indicates whether the cursors running in this session should // use json.Number instead of float64 while unmarshaling documents with // interface{}. The default is `false`. - UseJSONNumber bool + UseJSONNumber bool `json:"use_json_number,omitempty"` // NumRetries is the number of times a query is retried if a connection // error is detected, queries are not retried if RethinkDB returns a // runtime error. - NumRetries int + NumRetries int `json:"num_retries,omitempty"` // InitialCap is used by the internal connection pool and is used to // configure how many connections are created for each host when the // session is created. If zero then no connections are created until // the first query is executed. - InitialCap int `rethinkdb:"initial_cap,omitempty"` + InitialCap int `rethinkdb:"initial_cap,omitempty" json:"initial_cap,omitempty"` // MaxOpen is used by the internal connection pool and is used to configure // the maximum number of connections held in the pool. If all available // connections are being used then the driver will open new connections as // needed however they will not be returned to the pool. By default the // maximum number of connections is 2 - MaxOpen int `rethinkdb:"max_open,omitempty"` + MaxOpen int `rethinkdb:"max_open,omitempty" json:"max_open,omitempty"` // Below options are for cluster discovery, please note there is a high // probability of these changing as the API is still being worked on. @@ -88,21 +88,21 @@ type ConnectOpts struct { // DiscoverHosts is used to enable host discovery, when true the driver // will attempt to discover any new nodes added to the cluster and then // start sending queries to these new nodes. - DiscoverHosts bool `rethinkdb:"discover_hosts,omitempty"` + DiscoverHosts bool `rethinkdb:"discover_hosts,omitempty" json:"discover_hosts,omitempty"` // HostDecayDuration is used by the go-hostpool package to calculate a weighted // score when selecting a host. By default a value of 5 minutes is used. - HostDecayDuration time.Duration + HostDecayDuration time.Duration `json:"host_decay_duration,omitempty"` // UseOpentracing is used to enable creating opentracing-go spans for queries. // Each span is created as child of span from the context in `RunOpts`. // This span lasts from point the query created to the point when cursor closed. - UseOpentracing bool + UseOpentracing bool `json:"use_opentracing,omitempty"` // Deprecated: This function is no longer used due to changes in the // way hosts are selected. - NodeRefreshInterval time.Duration `rethinkdb:"node_refresh_interval,omitempty"` + NodeRefreshInterval time.Duration `rethinkdb:"node_refresh_interval,omitempty" json:"node_refresh_interval,omitempty"` // Deprecated: Use InitialCap instead - MaxIdle int `rethinkdb:"max_idle,omitempty"` + MaxIdle int `rethinkdb:"max_idle,omitempty" json:"max_idle,omitempty"` } func (o ConnectOpts) toMap() map[string]interface{} {