Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
feat: finalise presence for pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Dec 26, 2024
1 parent ea9fef8 commit 6eb09ed
Show file tree
Hide file tree
Showing 15 changed files with 642 additions and 124 deletions.
36 changes: 20 additions & 16 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@ type Cacheable interface {
ToCacheEntry() ([]byte, error)
}

type PresenceInfo struct {
// Total number of present clients (uniq)
Total int
// Presence records
Records []interface{}
}

// We can extend the presence read functionality in the future
// (e.g., add pagination, filtering, etc.)
type PresenceInfoOptions struct {
ReturnRecords bool
ReturnRecords bool `json:"return_records,omitempty"`
}

func NewPresenceInfoOptions() *PresenceInfoOptions {
return &PresenceInfoOptions{ReturnRecords: true}
}

type PresenceInfoOption func(*PresenceInfoOptions)
Expand Down Expand Up @@ -83,14 +80,17 @@ type Broker interface {

// Adds a new presence record for the stream. Returns true if that's the first
// presence record for the presence ID (pid, a unique user presence identifier).
PresenceAdd(stream string, sid string, pid string, info interface{}) error
PresenceAdd(stream string, sid string, pid string, info interface{}) (*common.PresenceEvent, error)

// Removes a presence record for the stream. Returns true if that was the last
// record for the presence ID (pid).
PresenceRemove(stream string, sid string, pid string) error
PresenceRemove(stream string, sid string) (*common.PresenceEvent, error)

// Retrieves presence information for the stream (counts, records, etc. depending on the options)
PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error)
PresenceInfo(stream string, opts ...PresenceInfoOption) (*common.PresenceInfo, error)

// Marks presence record as finished (for cache expiration)
FinishPresence(sid string) error
}

// LocalBroker is a single-node broker that can used to store streams data locally
Expand Down Expand Up @@ -229,14 +229,18 @@ func (LegacyBroker) FinishSession(sid string) error {
return nil
}

func (LegacyBroker) PresenceAdd(stream string, sid string, pid string, info interface{}) error {
return errors.New("presence not supported")
func (LegacyBroker) PresenceAdd(stream string, sid string, pid string, info interface{}) (*common.PresenceEvent, error) {
return nil, errors.New("presence not supported")
}

func (LegacyBroker) PresenceRemove(stream string, sid string, pid string) error {
return errors.New("presence not supported")
func (LegacyBroker) PresenceRemove(stream string, sid string) (*common.PresenceEvent, error) {
return nil, errors.New("presence not supported")
}

func (LegacyBroker) PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error) {
func (LegacyBroker) PresenceInfo(stream string, opts ...PresenceInfoOption) (*common.PresenceInfo, error) {
return nil, errors.New("presence not supported")
}

func (LegacyBroker) FinishPresence(sid string) error {
return nil
}
131 changes: 88 additions & 43 deletions broker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"time"

"github.com/anycable/anycable-go/common"
"github.com/anycable/anycable-go/utils"
nanoid "github.com/matoous/go-nanoid"
)

Expand Down Expand Up @@ -178,6 +180,7 @@ type presenceSessionEntry struct {

type presenceEntry struct {
info interface{}
id string
sessions []string
}

Expand All @@ -200,6 +203,14 @@ func (pe *presenceEntry) remove(sid string) bool {
return len(pe.sessions) == 0
}

func (pe *presenceEntry) add(sid string, info interface{}) {
if !slices.Contains(pe.sessions, sid) {
pe.sessions = append(pe.sessions, sid)
}

pe.info = info
}

type presenceState struct {
streams map[string]map[string]*presenceEntry
sessions map[string]*presenceSessionEntry
Expand Down Expand Up @@ -431,6 +442,10 @@ func (b *Memory) FinishSession(sid string) error {
}
b.sessionsMu.Unlock()

return nil
}

func (b *Memory) FinishPresence(sid string) error {
b.presence.mu.Lock()

if sp, ok := b.presence.sessions[sid]; ok {
Expand All @@ -442,62 +457,82 @@ func (b *Memory) FinishSession(sid string) error {
return nil
}

func (b *Memory) PresenceAdd(stream string, sid string, pid string, info interface{}) error {
func (b *Memory) PresenceAdd(stream string, sid string, pid string, info interface{}) (*common.PresenceEvent, error) {
b.presence.mu.Lock()
defer b.presence.mu.Unlock()

if _, ok := b.presence.streams[stream]; !ok {
b.presence.streams[stream] = make(map[string]*presenceEntry)
}

if _, ok := b.presence.sessions[sid]; !ok {
b.presence.sessions[sid] = &presenceSessionEntry{
streams: make(map[string]string),
}
}

if oldPid, ok := b.presence.sessions[sid].streams[stream]; ok && oldPid != pid {
return nil, errors.New("presence ID mismatch")
}

b.presence.sessions[sid].streams[stream] = pid

streamPresence := b.presence.streams[stream]

newPresence := false

if _, ok := streamPresence[pid]; !ok {
newPresence = true
streamPresence[pid] = &presenceEntry{
info: info,
id: pid,
sessions: []string{},
}
}

streamSessionPresence := streamPresence[pid]

newPresence := len(streamSessionPresence.sessions) == 0

streamSessionPresence.sessions = append(
streamSessionPresence.sessions,
sid,
)

if _, ok := b.presence.sessions[sid]; !ok {
b.presence.sessions[sid] = &presenceSessionEntry{
streams: make(map[string]string),
}
}

b.presence.sessions[sid].streams[stream] = pid
streamSessionPresence.add(sid, info)

if newPresence {
b.broadcaster.Broadcast(&common.StreamMessage{
Stream: stream,
Data: common.PresenceJoinMessage(pid, info),
})
return &common.PresenceEvent{
Type: common.PresenceJoinType,
ID: pid,
Info: info,
}, nil
}

return nil
return nil, nil
}

func (b *Memory) PresenceRemove(stream string, sid string, pid string) error {
func (b *Memory) PresenceRemove(stream string, sid string) (*common.PresenceEvent, error) {
b.presence.mu.Lock()
defer b.presence.mu.Unlock()

if _, ok := b.presence.streams[stream]; !ok {
return nil
return nil, errors.New("stream not found")
}

var pid string

if ses, ok := b.presence.sessions[sid]; ok {
if id, ok := ses.streams[stream]; !ok {
return nil, errors.New("presence info not found")
} else {
pid = id
}

delete(ses.streams, stream)

if len(ses.streams) == 0 {
delete(b.presence.sessions, sid)
}
}

streamPresence := b.presence.streams[stream]

if _, ok := streamPresence[pid]; !ok {
return nil
return nil, errors.New("presence record not found")
}

streamSessionPresence := streamPresence[pid]
Expand All @@ -512,42 +547,43 @@ func (b *Memory) PresenceRemove(stream string, sid string, pid string) error {
delete(b.presence.streams, stream)
}

if _, ok := b.presence.sessions[sid]; ok {
delete(b.presence.sessions[sid].streams, stream)
}

if empty {
b.broadcaster.Broadcast(&common.StreamMessage{
Stream: stream,
Data: common.PresenceLeaveMessage(pid),
})
return &common.PresenceEvent{
Type: common.PresenceLeaveType,
ID: pid,
}, nil
}

return nil
return nil, nil
}

func (b *Memory) PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error) {
options := &PresenceInfoOptions{}
func (b *Memory) PresenceInfo(stream string, opts ...PresenceInfoOption) (*common.PresenceInfo, error) {
options := NewPresenceInfoOptions()
for _, opt := range opts {
opt(options)
}

b.presence.mu.RLock()
defer b.presence.mu.RUnlock()

info := common.NewPresenceInfo()

if _, ok := b.presence.streams[stream]; !ok {
return &PresenceInfo{Total: 0}, nil
return info, nil
}

streamPresence := b.presence.streams[stream]

info := &PresenceInfo{Total: len(streamPresence)}
info.Total = len(streamPresence)

if options.ReturnRecords {
info.Records = make([]interface{}, 0, len(streamPresence))
info.Records = make([]*common.PresenceEvent, 0, len(streamPresence))

for _, entry := range streamPresence {
info.Records = append(info.Records, entry.info)
info.Records = append(info.Records, &common.PresenceEvent{
Info: entry.info,
ID: entry.id,
})
}
}

Expand Down Expand Up @@ -635,7 +671,7 @@ func (b *Memory) expirePresence() {
toDelete := []string{}

for sid, sp := range b.presence.sessions {
if sp.deadline < now {
if sp.deadline > 0 && sp.deadline < now {
toDelete = append(toDelete, sid)
}
}
Expand All @@ -661,9 +697,15 @@ func (b *Memory) expirePresence() {
if empty {
delete(b.presence.streams[stream], pid)

msg := &common.PresenceEvent{Type: common.PresenceLeaveType, ID: pid}

leaveMessages = append(leaveMessages, common.StreamMessage{
Stream: stream,
Data: common.PresenceLeaveMessage(pid),
Data: string(utils.ToJSON(msg)),
Meta: &common.StreamMessageMetadata{
BroadcastType: common.PresenceType,
Transient: true,
},
})

if len(b.presence.streams[stream]) == 0 {
Expand All @@ -677,8 +719,11 @@ func (b *Memory) expirePresence() {

b.presence.mu.Unlock()

// TODO: batch broadcast?
for _, msg := range leaveMessages {
b.broadcaster.Broadcast(&msg)
if b.broadcaster != nil {
// TODO: batch broadcast?
// FIXME: move broadcasts out of broker
for _, msg := range leaveMessages {
b.broadcaster.Broadcast(&msg)
}
}
}
Loading

0 comments on commit 6eb09ed

Please sign in to comment.