diff --git a/go.mod b/go.mod index f1cefa7..c31d920 100644 --- a/go.mod +++ b/go.mod @@ -4,15 +4,15 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.10.0 github.com/golang/protobuf v1.5.4 github.com/streadway/amqp v1.0.0 - github.com/stretchr/testify v1.9.0 - google.golang.org/grpc v1.66.2 - google.golang.org/protobuf v1.34.2 + github.com/stretchr/testify v1.10.0 + google.golang.org/grpc v1.68.1 + google.golang.org/protobuf v1.35.2 k8s.io/klog v1.0.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/fatih/color v1.17.0 // indirect + github.com/fatih/color v1.18.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -21,13 +21,13 @@ require ( github.com/rakyll/gotest v0.0.6 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/vektra/mockery v1.1.2 // indirect - golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.29.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect - golang.org/x/tools v0.25.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + golang.org/x/mod v0.22.0 // indirect + golang.org/x/net v0.32.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/tools v0.28.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 854c979..e627f6d 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/elastic/go-elasticsearch/v7 v7.10.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyEr github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= @@ -44,6 +46,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/vektra/mockery v1.1.2 h1:uc0Yn67rJpjt8U/mAZimdCKn9AeA97BOkjpmtBSlfP4= github.com/vektra/mockery v1.1.2/go.mod h1:VcfZjKaFOPO+MpN4ZvwPjs4c48lkq1o3Ym8yHZJu0jU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -52,6 +56,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= +golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -59,10 +65,14 @@ golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -73,15 +83,21 @@ golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -89,12 +105,18 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= +google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/mocks/WriteAdapter.go b/mocks/WriteAdapter.go index 5a5929e..7caea66 100644 --- a/mocks/WriteAdapter.go +++ b/mocks/WriteAdapter.go @@ -33,23 +33,8 @@ func (_m *WriteAdapter) GetTimeout() time.Duration { } // ProcessMessages provides a mock function with given fields: msgs -func (_m *WriteAdapter) ProcessMessages(msgs []messages.Message) []messages.Message { - ret := _m.Called(msgs) - - if len(ret) == 0 { - panic("no return value specified for ProcessMessages") - } - - var r0 []messages.Message - if rf, ok := ret.Get(0).(func([]messages.Message) []messages.Message); ok { - r0 = rf(msgs) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]messages.Message) - } - } - - return r0 +func (_m *WriteAdapter) ProcessMessages(msgs *[]messages.Message) { + _m.Called(msgs) } // ShouldProcess provides a mock function with given fields: msgs diff --git a/pkg/messages/message.go b/pkg/messages/message.go index 4e9c95e..f2ec421 100644 --- a/pkg/messages/message.go +++ b/pkg/messages/message.go @@ -1,43 +1,23 @@ package messages -import "errors" - type Message struct { Id interface{} Body []byte - acked bool + acked *bool } -func (m Message) Nack() (Message, error) { - err := m.setAsAcked() - if err != nil { - return Message{}, err - } - - return Message{ - Id: m.Id, - Body: []byte{0}, - }, nil +func (m *Message) IsAcked() bool { + return m.acked != nil && *m.acked } -func (m Message) Ack() (Message, error) { - err := m.setAsAcked() - if err != nil { - return Message{}, err - } - - return Message{ - Id: m.Id, - Body: []byte{1}, - }, nil +func (m *Message) IsNacked() bool { + return m.acked != nil && !*m.acked } -func (m *Message) setAsAcked() error { - if m.acked { - return errors.New("Message already acked") - } - - m.acked = true +func (m *Message) Nack() { + m.acked = func(b bool) *bool { return &b }(false) +} - return nil +func (m *Message) Ack() { + m.acked = func(b bool) *bool { return &b }(true) } diff --git a/pkg/readers/adapters/amqp.go b/pkg/readers/adapters/amqp.go index 669a72a..94033b2 100644 --- a/pkg/readers/adapters/amqp.go +++ b/pkg/readers/adapters/amqp.go @@ -54,7 +54,7 @@ func (a *Amqp) processMessages(writeChannel chan<- messages.Message) { func (a *Amqp) HandleAck(ackChannel <-chan messages.Message) { for ack := range ackChannel { - if ack.Body[0] == 1 { + if ack.IsAcked() { err := a.Ch.Ack(ack.Id.(uint64), false) if err != nil { klog.Error(err) diff --git a/pkg/readers/adapters/amqp_test.go b/pkg/readers/adapters/amqp_test.go index 05bfd45..7181c95 100644 --- a/pkg/readers/adapters/amqp_test.go +++ b/pkg/readers/adapters/amqp_test.go @@ -56,10 +56,13 @@ func TestHandleAck(t *testing.T) { Ch: channel, } - ackChannel <- messages.Message{ + msg := messages.Message{ Id: expectedId, - Body: []byte{1}, + Body: []byte("Hello!"), } + msg.Ack() + + ackChannel <- msg go obj.HandleAck(ackChannel) @@ -87,10 +90,13 @@ func TestHandleNack(t *testing.T) { Ch: channel, } - ackChannel <- messages.Message{ + msg := messages.Message{ Id: expectedId, - Body: []byte{0}, + Body: []byte("Hello!"), } + msg.Nack() + + ackChannel <- msg go obj.HandleAck(ackChannel) @@ -120,14 +126,20 @@ func TestHandleMixedAcks(t *testing.T) { Ch: channel, } - ackChannel <- messages.Message{ + msgAck := messages.Message{ Id: expectedAckId, - Body: []byte{1}, + Body: []byte("Hello!"), } - ackChannel <- messages.Message{ + msgAck.Ack() + + msgNack := messages.Message{ Id: expectedNackId, - Body: []byte{0}, + Body: []byte("Hello!"), } + msgNack.Nack() + + ackChannel <- msgAck + ackChannel <- msgNack go obj.HandleAck(ackChannel) diff --git a/pkg/readers/adapters/dummy_test.go b/pkg/readers/adapters/dummy_test.go index 05b0878..c455008 100644 --- a/pkg/readers/adapters/dummy_test.go +++ b/pkg/readers/adapters/dummy_test.go @@ -20,10 +20,14 @@ func TestProduceMessageQuantity(t *testing.T) { func TestAcksAreRead(t *testing.T) { ackChannel := make(chan messages.Message, 2) - ackChannel <- messages.Message{ + + msg := messages.Message{ Id: uint64(1), - Body: []byte{1}, + Body: []byte("Hello!"), } + msg.Ack() + + ackChannel <- msg obj := new(Dummy) go obj.HandleAck(ackChannel) diff --git a/pkg/writers/adapters/dummy.go b/pkg/writers/adapters/dummy.go index 2599e81..823dce4 100644 --- a/pkg/writers/adapters/dummy.go +++ b/pkg/writers/adapters/dummy.go @@ -8,15 +8,10 @@ import ( type Dummy struct{} -func (d *Dummy) ProcessMessages(msgs []messages.Message) []messages.Message { - var processedMsg []messages.Message - for i := 0; i < len(msgs); i++ { - processedMsg = append(processedMsg, messages.Message{ - Id: uint64(i), - Body: []byte{1}, - }) +func (d *Dummy) ProcessMessages(msgs *[]messages.Message) { + for i := range *msgs { + (*msgs)[i].Ack() } - return processedMsg } func (d *Dummy) ShouldProcess(msgs []messages.Message) bool { diff --git a/pkg/writers/adapters/elasticsearch.go b/pkg/writers/adapters/elasticsearch.go index b64f82d..96426e0 100644 --- a/pkg/writers/adapters/elasticsearch.go +++ b/pkg/writers/adapters/elasticsearch.go @@ -21,24 +21,20 @@ type Elasticsearch struct { Bulk esapi.Bulk } -func (es *Elasticsearch) ProcessMessages(msgs []messages.Message) []messages.Message { - acks := make([]messages.Message, len(msgs)) +func (es *Elasticsearch) ProcessMessages(msgs *[]messages.Message) { - if len(msgs) == 0 { - return acks + if len(*msgs) == 0 { + return } var buf bytes.Buffer - for i, msg := range msgs { + for i := range *msgs { + msg := &(*msgs)[i] body, err := es.decodeBody(msg.Body) if err != nil { klog.Errorf("Invalid Message: %s", string(msg.Body)) - nack, err := msg.Nack() - if err != nil { - klog.Error(err) - } - acks[i] = nack + msg.Nack() continue } err = es.writeToBuffer(&buf, body) @@ -48,26 +44,27 @@ func (es *Elasticsearch) ProcessMessages(msgs []messages.Message) []messages.Mes } if buf.Len() == 0 { - return acks + return } result, err := es.Bulk(bytes.NewReader(buf.Bytes())) if err != nil || result.IsError() { klog.Warningf("Error in bulk action, %v", err) - es.setAllNacks(msgs, acks) - return acks + es.setAllNacks(msgs) + return } response := es.getResponseFromResult(result) - es.setAcksFromResponse(response, msgs, acks) - return acks + es.setAcksFromResponse(response, msgs) } -func (es *Elasticsearch) setAcksFromResponse(response esAdapter.ElasticSearchBulkResponse, msgs []messages.Message, acks []messages.Message) { +func (es *Elasticsearch) setAcksFromResponse(response esAdapter.ElasticSearchBulkResponse, msgs *[]messages.Message) { maxValidStatus := 299 responseItemPos := 0 - for ackPos, ack := range acks { - if ack.Id != nil { + for i := range *msgs { + msg := &(*msgs)[i] + + if msg.IsNacked() { continue } @@ -79,15 +76,9 @@ func (es *Elasticsearch) setAcksFromResponse(response esAdapter.ElasticSearchBul if status > maxValidStatus { klog.Warningf("Item has invalid status: %v", data) - ack, err := msgs[ackPos].Nack() - if err == nil { - acks[ackPos] = ack - } + msg.Nack() } else { - ack, err := msgs[ackPos].Ack() - if err == nil { - acks[ackPos] = ack - } + msg.Ack() } } responseItemPos++ @@ -104,12 +95,9 @@ func (es *Elasticsearch) getResponseFromResult(result *esapi.Response) esAdapter return response } -func (es *Elasticsearch) setAllNacks(msgs []messages.Message, acks []messages.Message) { - for i, msg := range msgs { - nack, err := msg.Nack() - if err == nil { - acks[i] = nack - } +func (es *Elasticsearch) setAllNacks(msgs *[]messages.Message) { + for i := range *msgs { + (*msgs)[i].Nack() } } @@ -119,7 +107,7 @@ func (es *Elasticsearch) writeToBuffer(buf *bytes.Buffer, body esAdapter.Elastic return err } if bytes.Equal(meta, []byte("null")) { - return errors.New("Invalid body: meta should be present") + return errors.New("invalid body: meta should be present") } data, err := json.Marshal(body.Data) if err != nil { diff --git a/pkg/writers/adapters/elasticsearch_test.go b/pkg/writers/adapters/elasticsearch_test.go index 73d214c..096cf18 100644 --- a/pkg/writers/adapters/elasticsearch_test.go +++ b/pkg/writers/adapters/elasticsearch_test.go @@ -48,16 +48,18 @@ func TestAdapterReceiveInvalidMessage(t *testing.T) { Bulk: bulk.getBulkFunc(), } - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { - Id: 0, + Id: 1, Body: []byte("{ Invalid Json }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertNotCalled(t, "func1") - assert.Len(t, acks, 1) - assert.True(t, acks[0].Body[0] == 0) + assert.Len(t, msgs, 1) + assert.True(t, msgs[0].IsNacked()) } func TestBulkActionWithErrorsMustDiscardAllMessages(t *testing.T) { @@ -75,16 +77,18 @@ func TestBulkActionWithErrorsMustDiscardAllMessages(t *testing.T) { } bulk.On("func1", mock.Anything).Once().Return(&response, nil) - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { Id: 0, Body: []byte("{ \"meta\": \"valid-json\" }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertExpectations(t) - assert.Len(t, acks, 1) - assert.True(t, acks[0].Body[0] == 0) + assert.Len(t, msgs, 1) + assert.True(t, msgs[0].IsNacked()) } func TestBulkActionWithSingleItemSucessful(t *testing.T) { @@ -102,16 +106,18 @@ func TestBulkActionWithSingleItemSucessful(t *testing.T) { } bulk.On("func1", mock.Anything).Once().Return(&response, nil) - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { Id: 0, Body: []byte("{ \"meta\": \"valid-json\" }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertExpectations(t) - assert.Len(t, acks, 1) - assert.True(t, acks[0].Body[0] == 1) + assert.Len(t, msgs, 1) + assert.True(t, msgs[0].IsAcked()) } func TestBulkActionWithSingleItemUnsuccessful(t *testing.T) { @@ -129,16 +135,18 @@ func TestBulkActionWithSingleItemUnsuccessful(t *testing.T) { } bulk.On("func1", mock.Anything).Once().Return(&response, nil) - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { Id: 0, Body: []byte("{ \"meta\": \"valid-json\" }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertExpectations(t) - assert.Len(t, acks, 1) - assert.True(t, acks[0].Body[0] == 0) + assert.Len(t, msgs, 1) + assert.True(t, msgs[0].IsNacked()) } func TestBulkActionWithMixedItemStatus(t *testing.T) { @@ -156,7 +164,7 @@ func TestBulkActionWithMixedItemStatus(t *testing.T) { } bulk.On("func1", mock.Anything).Once().Return(&response, nil) - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { Id: 0, Body: []byte("{ \"meta\": \"valid-json\" }"), @@ -169,13 +177,15 @@ func TestBulkActionWithMixedItemStatus(t *testing.T) { Id: 2, Body: []byte("{ \"meta\": \"valid-json\" }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertExpectations(t) - assert.Len(t, acks, 3) - assert.True(t, acks[0].Body[0] == 0) - assert.True(t, acks[1].Body[0] == 1) - assert.True(t, acks[2].Body[0] == 0) + assert.Len(t, msgs, 3) + assert.True(t, msgs[0].IsNacked()) + assert.True(t, msgs[1].IsAcked()) + assert.True(t, msgs[2].IsNacked()) } func TestBulkActionWithOnlyMetadata(t *testing.T) { @@ -194,16 +204,18 @@ func TestBulkActionWithOnlyMetadata(t *testing.T) { expectedBody := "{\"delete\":{\"_id\":\"123\"}}\n" bulk.On("func1", expectedBody).Once().Return(&response, nil) - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { Id: 0, Body: []byte("{ \"meta\": {\"delete\": {\"_id\":\"123\"}} }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertExpectations(t) - assert.Len(t, acks, 1) - assert.True(t, acks[0].Body[0] == 1) + assert.Len(t, msgs, 1) + assert.True(t, msgs[0].IsAcked()) } func TestBulkActionWithNoMetadata(t *testing.T) { @@ -214,15 +226,17 @@ func TestBulkActionWithNoMetadata(t *testing.T) { Bulk: bulk.getBulkFunc(), } - acks := esAdapter.ProcessMessages([]messages.Message{ + msgs := []messages.Message{ { Id: 0, Body: []byte("{ \"foobar\": {\"delete\": {\"_id\":\"123\"}} }"), }, - }) + } + + esAdapter.ProcessMessages(&msgs) bulk.AssertNotCalled(t, "func1", mock.Anything) bulk.AssertExpectations(t) - assert.Len(t, acks, 1) - assert.Empty(t, acks[0].Body) + assert.Len(t, msgs, 1) + assert.Empty(t, msgs[0].IsNacked()) } diff --git a/pkg/writers/adapters/interface.go b/pkg/writers/adapters/interface.go index 9a312ed..a406289 100644 --- a/pkg/writers/adapters/interface.go +++ b/pkg/writers/adapters/interface.go @@ -7,7 +7,7 @@ import ( ) type WriteAdapter interface { - ProcessMessages(msgs []messages.Message) []messages.Message + ProcessMessages(msgs *[]messages.Message) ShouldProcess(msgs []messages.Message) bool GetTimeout() time.Duration } diff --git a/pkg/writers/adapters/nop.go b/pkg/writers/adapters/nop.go index 1b1e292..ce751d1 100644 --- a/pkg/writers/adapters/nop.go +++ b/pkg/writers/adapters/nop.go @@ -8,15 +8,10 @@ import ( type Nop struct{} -func (wa *Nop) ProcessMessages(msgs []messages.Message) []messages.Message { - acks := make([]messages.Message, 0) - for _, msg := range msgs { - ack, err := msg.Ack() - if err == nil { - acks = append(acks, ack) - } +func (wa *Nop) ProcessMessages(msgs *[]messages.Message) { + for i := range *msgs { + (*msgs)[i].Ack() } - return acks } func (wa *Nop) GetTimeout() time.Duration { diff --git a/pkg/writers/writer.go b/pkg/writers/writer.go index f766e27..d66293a 100644 --- a/pkg/writers/writer.go +++ b/pkg/writers/writer.go @@ -55,9 +55,9 @@ func (ew *Writer) timeout(ackChannel chan<- messages.Message) { func (ew *Writer) trigger(ackChannel chan<- messages.Message) { ew.mutex.Lock() - acks := ew.WriteAdapter.ProcessMessages(ew.msgs) + ew.WriteAdapter.ProcessMessages(&ew.msgs) + ew.sendAcks(ew.msgs, ackChannel) ew.msgs = make([]messages.Message, 0) - ew.sendAcks(acks, ackChannel) ew.mutex.Unlock() } diff --git a/pkg/writers/writer_test.go b/pkg/writers/writer_test.go index 1b958bc..1436436 100644 --- a/pkg/writers/writer_test.go +++ b/pkg/writers/writer_test.go @@ -142,18 +142,17 @@ func getAcks(n int) []messages.Message { } func mockProcessMessages(writeAdapter *mocks.WriteAdapter) { - writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs []messages.Message) bool { - return len(msgs) == 3 + writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs *[]messages.Message) bool { + return len(*msgs) == 3 })).Maybe().Return(getAcks(3)) - writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs []messages.Message) bool { - return len(msgs) == 1 + writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs *[]messages.Message) bool { + return len(*msgs) == 1 })).Maybe().Return(getAcks(1)) - writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs []messages.Message) bool { - return len(msgs) == 2 + writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs *[]messages.Message) bool { + return len(*msgs) == 2 })).Maybe().Return(getAcks(2)) - writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs []messages.Message) bool { - return len(msgs) == 0 + writeAdapter.On("ProcessMessages", mock.MatchedBy(func(msgs *[]messages.Message) bool { + return len(*msgs) == 0 })).Maybe().Return(getAcks(0)) - }