From 1f9447c2532073db61cab04378c00753e364eac6 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Mon, 25 Nov 2024 16:17:32 +0530 Subject: [PATCH] Use a thread-safe map to manage event writers for agents Signed-off-by: Chetan Banavikalmutt --- internal/event/event.go | 6 ++- principal/apis/eventstream/eventstream.go | 60 ++++++++++++++++++++--- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/internal/event/event.go b/internal/event/event.go index d98d3f5..f7c7202 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -341,7 +341,11 @@ func (ew *EventWriter) Remove(ev *cloudevents.Event) { return } - if EventID(latestEvent.event) == EventID(ev) { + latestEvent.mu.RLock() + latestEventID := EventID(latestEvent.event) + latestEvent.mu.RUnlock() + + if latestEventID == EventID(ev) { delete(ew.latestEvents, resourceID) } } diff --git a/principal/apis/eventstream/eventstream.go b/principal/apis/eventstream/eventstream.go index 0e0565a..ccc47cb 100644 --- a/principal/apis/eventstream/eventstream.go +++ b/principal/apis/eventstream/eventstream.go @@ -48,11 +48,51 @@ type Server struct { options *ServerOptions queues queue.QueuePair + eventWriters *eventWritersMap +} + +// eventWritersMap provides a thread-safe way to manage event writers. +type eventWritersMap struct { + mu sync.RWMutex + // key: AgentName // value: EventWriter for that agent + // - acquire 'lock' before accessing eventWriters map[string]*event.EventWriter } +func newEventWritersMap() *eventWritersMap { + return &eventWritersMap{ + eventWriters: make(map[string]*event.EventWriter), + } +} + +func (ewm *eventWritersMap) Get(agentName string) *event.EventWriter { + ewm.mu.RLock() + defer ewm.mu.RUnlock() + + eventWriter, exists := ewm.eventWriters[agentName] + if exists { + return eventWriter + } + + return nil +} + +func (ewm *eventWritersMap) Add(agentName string, eventWriter *event.EventWriter) { + ewm.mu.Lock() + defer ewm.mu.Unlock() + + ewm.eventWriters[agentName] = eventWriter +} + +func (ewm *eventWritersMap) Remove(agentName string) { + ewm.mu.Lock() + defer ewm.mu.Unlock() + + delete(ewm.eventWriters, agentName) +} + type ServerOptions struct { MaxStreamDuration time.Duration } @@ -86,7 +126,7 @@ func NewServer(queues queue.QueuePair, opts ...ServerOption) *Server { return &Server{ queues: queues, options: options, - eventWriters: make(map[string]*event.EventWriter), + eventWriters: newEventWritersMap(), } } @@ -202,8 +242,8 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe if event.Target(incomingEvent) == event.TargetEventAck { logCtx.Trace("Received an ACK") - eventWriter, exists := s.eventWriters[c.agentName] - if !exists { + eventWriter := s.eventWriters.Get(c.agentName) + if eventWriter == nil { return fmt.Errorf("panic: event writer not found for agent %s", c.agentName) } eventWriter.Remove(incomingEvent) @@ -242,8 +282,8 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe return fmt.Errorf("panic: nil item in queue") } - eventWriter, exists := s.eventWriters[c.agentName] - if !exists { + eventWriter := s.eventWriters.Get(c.agentName) + if eventWriter == nil { return fmt.Errorf("panic: event writer not found for agent %s", c.agentName) } @@ -268,8 +308,13 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro return err } - s.eventWriters[c.agentName] = event.NewEventWriter(subs) - go s.eventWriters[c.agentName].SendWaitingEvents(c.ctx) + s.eventWriters.Add(c.agentName, event.NewEventWriter(subs)) + eventWriter := s.eventWriters.Get(c.agentName) + if eventWriter == nil { + return fmt.Errorf("panic: event writer not found for agent %s", c.agentName) + } + + go eventWriter.SendWaitingEvents(c.ctx) // We receive events in a dedicated go routine c.wg.Add(1) @@ -314,6 +359,7 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro c.wg.Wait() c.logCtx.Info("Closing EventStream") + s.eventWriters.Remove(c.agentName) return nil }