Skip to content

Commit

Permalink
Make it easy to plugin other queues (#2208)
Browse files Browse the repository at this point in the history
* Add unit tests for nats queue

* Move nats related code to own client

Renamed realtime.Realtime to realtime.Broker to make the import not stuttering

* Implement healthy test as table test to support different brokers in the future

Co-authored-by: Johann Böhler <[email protected]>
  • Loading branch information
hikhvar and bitionaire authored Oct 12, 2022
1 parent e6f0476 commit f3e6e8b
Show file tree
Hide file tree
Showing 16 changed files with 686 additions and 334 deletions.
8 changes: 5 additions & 3 deletions server/src/api/router.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package api

import (
"net/http"

"github.com/go-chi/cors"
"github.com/go-chi/jwtauth/v5"
"github.com/go-chi/render"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"net/http"

"scrumlr.io/server/auth"
"scrumlr.io/server/logger"
"scrumlr.io/server/realtime"
Expand All @@ -19,7 +21,7 @@ import (
type Server struct {
basePath string

realtime *realtime.Realtime
realtime *realtime.Broker
auth auth.Auth

boards services.Boards
Expand All @@ -39,7 +41,7 @@ type Server struct {

func New(
basePath string,
rt *realtime.Realtime,
rt *realtime.Broker,
auth auth.Auth,
boards services.Boards,
votings services.Votings,
Expand Down
6 changes: 4 additions & 2 deletions server/src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ func run(c *cli.Context) error {
return errors.Wrap(err, "unable to migrate database")
}

rt := realtime.New(c.String("nats"))

rt, err := realtime.NewNats(c.String("nats"))
if err != nil {
logger.Get().Fatalf("failed to connect to message queue: %v", err)
}
basePath := "/"
if c.IsSet("base-path") {
basePath = c.String("base-path")
Expand Down
21 changes: 15 additions & 6 deletions server/src/realtime/board_sessions_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package realtime

import (
"fmt"

"github.com/google/uuid"

"scrumlr.io/server/logger"
)

Expand All @@ -13,13 +15,20 @@ const (
RequestRejected = "SESSION_REJECTED"
)

func (r *Realtime) BroadcastUpdateOnBoardSessionRequest(board, user uuid.UUID, msg BoardSessionRequestEventType) error {
func (b *Broker) BroadcastUpdateOnBoardSessionRequest(board, user uuid.UUID, msg BoardSessionRequestEventType) error {
logger.Get().Debugw("broadcasting to board session request", "board", board, "user", user, "msg", msg)
return r.con.Publish(fmt.Sprintf("request.%s.%s", board, user), msg)
return b.con.Publish(requestSubject(board, user), msg)
}

func (b *Broker) GetBoardSessionRequestChannel(board, user uuid.UUID) chan *BoardSessionRequestEventType {
c, err := b.con.SubscribeToBoardSessionEvents(requestSubject(board, user))
if err != nil {
// TODO: Bubble up this error, so the caller can retry to establish this subscription
logger.Get().Errorw("failed to subscribe to BoardSessionRequestChannel", "err", err)
}
return c
}

func (r *Realtime) GetBoardSessionRequestChannel(board, user uuid.UUID) chan *BoardSessionRequestEventType {
receiverChan := make(chan *BoardSessionRequestEventType)
r.con.BindRecvChan(fmt.Sprintf("request.%s.%s", board, user), receiverChan)
return receiverChan
func requestSubject(board, user uuid.UUID) string {
return fmt.Sprintf("request.%s.%s", board, user)
}
60 changes: 60 additions & 0 deletions server/src/realtime/board_sessions_requests_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package realtime_test

import (
"context"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"scrumlr.io/server/realtime"
)

func TestRealtime_GetBoardSessionRequestChannel(t *testing.T) {

ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
rt, err := realtime.NewNats(SetupNatsContainer(t))
assert.Nil(t, err)
testBoard := uuid.New()
testUser := uuid.New()

testEvents := []realtime.BoardSessionRequestEventType{
realtime.RequestRejected,
realtime.RequestAccepted,
"some undefined event",
}

eventChannel := rt.GetBoardSessionRequestChannel(testBoard, testUser)
readEvents := []realtime.BoardSessionRequestEventType{}
wg := sync.WaitGroup{}
go func() {
for {
select {
case ev := <-eventChannel:
assert.NotNil(t, ev)
readEvents = append(readEvents, *ev)
wg.Done()
case <-ctx.Done():
return
}
}
}()

for _, ev := range testEvents {
err := rt.BroadcastUpdateOnBoardSessionRequest(testBoard, testUser, ev)
assert.Nil(t, err)
wg.Add(1)
}

go func() {
wg.Wait()
cancelFunc()
}()

<-ctx.Done()
assert.Equal(t, testEvents, readEvents)

}
20 changes: 14 additions & 6 deletions server/src/realtime/boards.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/google/uuid"

"scrumlr.io/server/logger"
)

Expand Down Expand Up @@ -33,13 +34,20 @@ type BoardEvent struct {
Data interface{} `json:"data,omitempty"`
}

func (r *Realtime) BroadcastToBoard(boardID uuid.UUID, msg BoardEvent) error {
func (b *Broker) BroadcastToBoard(boardID uuid.UUID, msg BoardEvent) error {
logger.Get().Debugw("broadcasting to board", "board", boardID, "msg", msg.Type)
return r.con.Publish(fmt.Sprintf("board.%s", boardID), msg)
return b.con.Publish(boardsSubject(boardID), msg)
}

func (b *Broker) GetBoardChannel(boardID uuid.UUID) chan *BoardEvent {
c, err := b.con.SubscribeToBoardEvents(boardsSubject(boardID))
if err != nil {
// TODO: Bubble up this error, so the caller can retry to establish this subscription
logger.Get().Errorw("failed to subscribe to BoardChannel", "err", err)
}
return c
}

func (r *Realtime) GetBoardChannel(boardID uuid.UUID) chan *BoardEvent {
receiverChan := make(chan *BoardEvent)
r.con.BindRecvChan(fmt.Sprintf("board.%s", boardID), receiverChan)
return receiverChan
func boardsSubject(boardID uuid.UUID) string {
return fmt.Sprintf("board.%s", boardID)
}
96 changes: 96 additions & 0 deletions server/src/realtime/boards_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package realtime_test

import (
"context"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"scrumlr.io/server/realtime"
)

func TestRealtime_GetBoardChannel(t *testing.T) {

ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
rt, err := realtime.NewNats(SetupNatsContainer(t))
assert.Nil(t, err)
testBoard := uuid.New()

testEvents := []realtime.BoardEvent{
{
Type: realtime.BoardEventInit,
Data: nil,
},
{
Type: realtime.BoardEventInit,
Data: "not nil string data",
},
{
Type: realtime.BoardEventNotesUpdated,
Data: struct {
More string
Complex float64
Data map[int]bool
}{
More: "foo",
Complex: 2.0,
Data: map[int]bool{1: false, 3: true},
},
},
}

// TODO: The current datastructures doesn't respect the types, because
// an empty interface can be anything.
expectedEvents := []realtime.BoardEvent{
testEvents[0],
testEvents[1],
{
Type: realtime.BoardEventNotesUpdated,
Data: map[string]interface{}{
"Complex": 2.0,
"More": "foo",
"Data": map[string]interface{}{
// Mapping int to string here, because JSON stuff.
"1": false,
"3": true,
},
},
},
}

eventChannel := rt.GetBoardChannel(testBoard)
readEvents := []realtime.BoardEvent{}
wg := sync.WaitGroup{}
go func() {
for {
select {
case ev := <-eventChannel:
assert.NotNil(t, ev)
readEvents = append(readEvents, *ev)
wg.Done()
case <-ctx.Done():
return
}
}
}()

for _, ev := range testEvents {
err := rt.BroadcastToBoard(testBoard, ev)
assert.Nil(t, err)
wg.Add(1)
}

go func() {
wg.Wait()
cancelFunc()
}()

<-ctx.Done()

assert.Equal(t, expectedEvents, readEvents)

}
21 changes: 21 additions & 0 deletions server/src/realtime/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package realtime

// Client can publish data to an external queue and receive events from
// that external queue
type Client interface {
// Publish an event to the queue
Publish(subject string, event interface{}) error

// SubscribeToBoardSessionEvents subscribes to the given topic and return a channel
// with the received BoardSessionRequestEventType
SubscribeToBoardSessionEvents(subject string) (chan *BoardSessionRequestEventType, error)

// SubscribeToBoardEvents subscribes to the given topic and return a channel
// // with the received BoardEvent
SubscribeToBoardEvents(subject string) (chan *BoardEvent, error)
}

// The Broker enables a user to broadcast and receive events
type Broker struct {
con Client
}
8 changes: 6 additions & 2 deletions server/src/realtime/health.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package realtime

func (r *Realtime) IsHealthy() bool {
err := r.con.Publish("health", "test")
// IsHealthy returns true if the Broker is in a healthy state
func (b *Broker) IsHealthy() bool {
if b == nil || b.con == nil {
return false
}
err := b.con.Publish("health", "test")
if err != nil {
return false
}
Expand Down
43 changes: 43 additions & 0 deletions server/src/realtime/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package realtime_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"scrumlr.io/server/realtime"
)

func TestBroker_IsHealthy(t *testing.T) {
type testCase struct {
name string
setupBroker func(t *testing.T) *realtime.Broker
expected bool
}
testcases := []testCase{
{
name: "nats client has has wrong url",
setupBroker: func(t *testing.T) *realtime.Broker {
rt, err := realtime.NewNats("foo")
require.NotNil(t, err)
return rt
},
expected: false,
},
{
name: "nats client is setup correctly",
setupBroker: func(t *testing.T) *realtime.Broker {
rt, err := realtime.NewNats(SetupNatsContainer(t))
require.Nil(t, err)
return rt
},
expected: true,
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.expected, tt.setupBroker(t).IsHealthy(), "healthy didn't return expected result")
})
}
}
Loading

0 comments on commit f3e6e8b

Please sign in to comment.