Skip to content

Commit

Permalink
add redis pub/sub support
Browse files Browse the repository at this point in the history
  • Loading branch information
demdxx committed Oct 3, 2021
1 parent ffa97fd commit 8d576b6
Show file tree
Hide file tree
Showing 39 changed files with 859 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.15.x, 1.16.x]
go-version: [1.15.x, 1.16.x, 1.17.x]
platform: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ _testmain.go
*.exe
*.test
*.prof

.vscode
43 changes: 16 additions & 27 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ export PATH := $(GOBIN):$(PATH)
# https://golang.org/doc/go1.12#tls_1_3
export GODEBUG := tls13=0

GOLINT_VERSION := d0100b6bd8b389f0385611eb39152c4d7c3a7905
GOLINT := $(TMP_VERSIONS)/golint/$(GOLINT_VERSION)
$(GOLINT):
$(eval GOLINT_TMP := $(shell mktemp -d))
cd $(GOLINT_TMP); go get golang.org/x/lint/golint@$(GOLINT_VERSION)
@rm -rf $(GOLINT_TMP)
@rm -rf $(dir $(GOLINT))
@mkdir -p $(dir $(GOLINT))
@touch $(GOLINT)
GOLANGLINTCI_VERSION := latest
GOLANGLINTCI := $(TMP_VERSIONS)/golangci-lint/$(GOLANGLINTCI_VERSION)
$(GOLANGLINTCI):
$(eval GOLANGLINTCI_TMP := $(shell mktemp -d))
cd $(GOLANGLINTCI_TMP); go get github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGLINTCI_VERSION)
@rm -rf $(GOLANGLINTCI_TMP)
@rm -rf $(dir $(GOLANGLINTCI))
@mkdir -p $(dir $(GOLANGLINTCI))
@touch $(GOLANGLINTCI)

ERRCHECK_VERSION := v1.2.0
ERRCHECK := $(TMP_VERSIONS)/errcheck/$(ERRCHECK_VERSION)
Expand Down Expand Up @@ -84,31 +84,20 @@ $(GOMOCK):
@touch $(GOMOCK)

.PHONY: deps
deps: $(GOLINT) $(ERRCHECK) $(STATICCHECK) $(CERTSTRAP) $(GOMOCK)
deps: $(GOLANGLINTCI) $(ERRCHECK) $(STATICCHECK) $(CERTSTRAP) $(GOMOCK)

.PHONY: generate-code
generate-code: ## Generate mocks for the project
@echo "Generate mocks for the project"
@go generate ./...

.PHONY: golint
golint: $(GOLINT)
golint -set_exit_status ./...

.PHONY: vet
vet:
go vet ./...

.PHONY:
errcheck: $(ERRCHECK)
errcheck ./...

.PHONY: staticcheck
staticcheck: $(STATICCHECK)
staticcheck ./...

.PHONY: lint
lint: golint vet errcheck staticcheck
lint: golint

.PHONY: golint
golint: $(GOLANGLINTCI)
# golint -set_exit_status ./...
golangci-lint run -v ./...

.PHONY: test
test: ## Run package test
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ simplify writing pub/sub-services.
- [NATS](nats)
- [NATS Stream](natstream)
- [PostgreSQL](pg)
- [Redis](redis)
- [Golang Chanels implementation](gochan)
- [Golang time interval executor](interval)
- [TODO](#todo)
Expand Down Expand Up @@ -101,7 +102,7 @@ func main() {
## TODO

* [ ] Add support Amazon SQS queue
* [ ] Add support Redis queue
* [X] Add support Redis queue
* [ ] Add support RabbitMQ queue
* [ ] Add support MySQL notifications queue
* [X] Add support PostgreSQL notifications queue
Expand Down
19 changes: 19 additions & 0 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package decoder

import (
"encoding/json"
"encoding/xml"
)

// Decoder function type
type Decoder func(data []byte, msg interface{}) error

// JSON decoder implementation
func JSON(data []byte, msg interface{}) error {
return json.Unmarshal(data, msg)
}

// XML decoder implementation
func XML(data []byte, msg interface{}) error {
return xml.Unmarshal(data, msg)
}
40 changes: 40 additions & 0 deletions decoder/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package decoder

import (
"encoding/xml"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

type testItem struct {
XMLName xml.Name `json:"-" xml:"item"`
Value string `json:"value" xml:"value"`
}

func TestDecoding(t *testing.T) {
tests := []struct {
target interface{}
data string
dec Decoder
}{
{
target: &testItem{Value: "target"},
data: `{"value":"target"}`,
dec: JSON,
},
{
target: &testItem{XMLName: xml.Name{Local: "item"}, Value: "target"},
data: `<item><value>target</value></item>`,
dec: XML,
},
}

for _, test := range tests {
var it testItem
err := test.dec([]byte(test.data), &it)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(test.target, &it))
}
}
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
module github.com/geniusrabbit/notificationcenter

go 1.14
go 1.15

require (
github.com/Shopify/sarama v1.29.1
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/alicebob/miniredis/v2 v2.15.1
github.com/allegro/bigcache v1.2.1
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/demdxx/gocast v1.0.1
github.com/demdxx/rpool v0.0.0-20200317152850-d737c64d8aaf
github.com/elliotchance/redismock v1.5.3
github.com/go-redis/redis v6.15.9+incompatible
Expand All @@ -21,4 +22,5 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.uber.org/multierr v1.7.0
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alicebob/miniredis/v2 v2.15.1 h1:Fw+ixAJPmKhCLBqDwHlTDqxUxp0xjEwXczEpt1B6r7k=
github.com/alicebob/miniredis/v2 v2.15.1/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
Expand All @@ -23,6 +25,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/demdxx/gocast v1.0.1 h1:5dIlibBCaaGD4SCBWNHyKUt3DNlbcIbtoJjMH1CNgRY=
github.com/demdxx/gocast v1.0.1/go.mod h1:doeaWGKAe/UuiDhJ3tmbSK2RwN5qK+8Z18RRVzHLNsA=
github.com/demdxx/rpool v0.0.0-20200317152850-d737c64d8aaf h1:qTk+vPwzCp/on8DpTLDizwwvo08DNlj0ptDYwP+R4ng=
github.com/demdxx/rpool v0.0.0-20200317152850-d737c64d8aaf/go.mod h1:x9oXikUwI05ecNFxo0JUpQ6hABl2lNtKMXv2wlDduyk=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
Expand Down Expand Up @@ -182,10 +186,15 @@ github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
6 changes: 3 additions & 3 deletions gochan/go.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Publisher struct {
// Publish one or more messages to the pub-service
func (p Publisher) Publish(ctx context.Context, messages ...interface{}) error {
for _, msg := range messages {
if err := p.proxy.write(msg); err != nil {
if err := p.proxy.write(ctx, msg); err != nil {
return err
}
}
Expand Down Expand Up @@ -55,12 +55,12 @@ func (p *Proxy) Publisher() Publisher {
return Publisher{proxy: p}
}

func (p *Proxy) write(msg interface{}) error {
func (p *Proxy) write(ctx context.Context, msg interface{}) error {
buff := &bytes.Buffer{}
if err := p.encoder(msg, buff); err != nil {
return err
}
p.pool <- message{data: buff.Bytes()}
p.pool <- message{ctx: ctx, data: buff.Bytes()}
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions gochan/message.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package gochan

import "context"

type message struct {
ctx context.Context
data []byte
}

// Context of the message
func (m message) Context() context.Context {
return m.ctx
}

// Unical message ID (depends on transport)
func (m message) ID() string {
return ""
Expand Down
9 changes: 8 additions & 1 deletion interval/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package interval

import (
"context"
"encoding/json"
"errors"
)
Expand All @@ -13,7 +14,13 @@ type messageValue interface {
}

type message struct {
v interface{}
ctx context.Context
v interface{}
}

// Context of the message
func (m *message) Context() context.Context {
return m.ctx
}

// Unical message ID (depends on transport)
Expand Down
6 changes: 3 additions & 3 deletions interval/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ loop:
case <-ctx.Done():
break loop
case <-s.ticker.C:
if err := s.ProcessMessage(s.message()); err != nil {
if err := s.ProcessMessage(s.message(ctx)); err != nil {
return err
}
}
}
return nil
}

func (s *interval) message() *message {
return &message{s.msgFnk()}
func (s *interval) message(ctx context.Context) *message {
return &message{ctx: ctx, v: s.msgFnk()}
}

// Close nstream client
Expand Down
7 changes: 7 additions & 0 deletions kafka/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"errors"

"github.com/Shopify/sarama"
Expand All @@ -11,10 +12,16 @@ import (
var ErrMessageInvalidConsumer = errors.New(`[message] invalid consumer`)

type message struct {
ctx context.Context
msg *sarama.ConsumerMessage
consumer *cluster.Consumer
}

// Context of the message
func (m *message) Context() context.Context {
return m.ctx
}

// ID returns unical message ID (depends on transport)
func (m *message) ID() string {
return ""
Expand Down
1 change: 1 addition & 0 deletions kafka/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestMessage(t *testing.T) {
}
assert.Equal(t, []byte(`{"data": "test"}`), msg.Body())
assert.Equal(t, ``, msg.ID())
assert.Nil(t, msg.Context())
assert.Error(t, msg.Ack())
}

Expand Down
2 changes: 1 addition & 1 deletion kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ loop:
if !ok {
break loop
}
m := &message{msg: msg, consumer: s.consumer}
m := &message{ctx: ctx, msg: msg, consumer: s.consumer}
if err := s.ProcessMessage(m); err != nil {
s.logger.Error(err)
}
Expand Down
5 changes: 5 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package notificationcenter

import "context"

// Message describes the access methods to the message original object
type Message interface {
// Context of the message
Context() context.Context

// Unical message ID (depends on transport)
ID() string

Expand Down
12 changes: 10 additions & 2 deletions mocks/message.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package mocks

import "context"

// Message simple object for testing reasons
type Message struct {
ctx context.Context
id string
body []byte
err error
}

// NewMessage object constructor
func NewMessage(id string, body []byte, ackErr error) *Message {
return &Message{id: id, body: body, err: ackErr}
func NewMessage(ctx context.Context, id string, body []byte, ackErr error) *Message {
return &Message{ctx: ctx, id: id, body: body, err: ackErr}
}

// Context of the message
func (m *Message) Context() context.Context {
return m.ctx
}

// ID Unical message ID (depends on transport)
Expand Down
Loading

0 comments on commit 8d576b6

Please sign in to comment.