Skip to content

Commit

Permalink
Use a thread-safe map to manage event writers for agents
Browse files Browse the repository at this point in the history
Signed-off-by: Chetan Banavikalmutt <[email protected]>
  • Loading branch information
chetan-rns committed Nov 25, 2024
1 parent 0220027 commit 1f9447c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 8 deletions.
6 changes: 5 additions & 1 deletion internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,11 @@ func (ew *EventWriter) Remove(ev *cloudevents.Event) {
return
}

if EventID(latestEvent.event) == EventID(ev) {
latestEvent.mu.RLock()
latestEventID := EventID(latestEvent.event)
latestEvent.mu.RUnlock()

if latestEventID == EventID(ev) {
delete(ew.latestEvents, resourceID)
}
}
Expand Down
60 changes: 53 additions & 7 deletions principal/apis/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,51 @@ type Server struct {
options *ServerOptions
queues queue.QueuePair

eventWriters *eventWritersMap
}

// eventWritersMap provides a thread-safe way to manage event writers.
type eventWritersMap struct {
mu sync.RWMutex

// key: AgentName
// value: EventWriter for that agent
// - acquire 'lock' before accessing
eventWriters map[string]*event.EventWriter
}

func newEventWritersMap() *eventWritersMap {
return &eventWritersMap{
eventWriters: make(map[string]*event.EventWriter),
}
}

func (ewm *eventWritersMap) Get(agentName string) *event.EventWriter {
ewm.mu.RLock()
defer ewm.mu.RUnlock()

eventWriter, exists := ewm.eventWriters[agentName]
if exists {
return eventWriter
}

return nil
}

func (ewm *eventWritersMap) Add(agentName string, eventWriter *event.EventWriter) {
ewm.mu.Lock()
defer ewm.mu.Unlock()

ewm.eventWriters[agentName] = eventWriter
}

func (ewm *eventWritersMap) Remove(agentName string) {
ewm.mu.Lock()
defer ewm.mu.Unlock()

delete(ewm.eventWriters, agentName)
}

type ServerOptions struct {
MaxStreamDuration time.Duration
}
Expand Down Expand Up @@ -86,7 +126,7 @@ func NewServer(queues queue.QueuePair, opts ...ServerOption) *Server {
return &Server{
queues: queues,
options: options,
eventWriters: make(map[string]*event.EventWriter),
eventWriters: newEventWritersMap(),
}
}

Expand Down Expand Up @@ -202,8 +242,8 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe

if event.Target(incomingEvent) == event.TargetEventAck {
logCtx.Trace("Received an ACK")
eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
eventWriter := s.eventWriters.Get(c.agentName)
if eventWriter == nil {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
}
eventWriter.Remove(incomingEvent)
Expand Down Expand Up @@ -242,8 +282,8 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
return fmt.Errorf("panic: nil item in queue")
}

eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
eventWriter := s.eventWriters.Get(c.agentName)
if eventWriter == nil {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
}

Expand All @@ -268,8 +308,13 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro
return err
}

s.eventWriters[c.agentName] = event.NewEventWriter(subs)
go s.eventWriters[c.agentName].SendWaitingEvents(c.ctx)
s.eventWriters.Add(c.agentName, event.NewEventWriter(subs))
eventWriter := s.eventWriters.Get(c.agentName)
if eventWriter == nil {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
}

go eventWriter.SendWaitingEvents(c.ctx)

// We receive events in a dedicated go routine
c.wg.Add(1)
Expand Down Expand Up @@ -314,6 +359,7 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro

c.wg.Wait()
c.logCtx.Info("Closing EventStream")
s.eventWriters.Remove(c.agentName)
return nil
}

Expand Down

0 comments on commit 1f9447c

Please sign in to comment.