Skip to content

Commit

Permalink
Use a separate event writer for each agent
Browse files Browse the repository at this point in the history
Signed-off-by: Chetan Banavikalmutt <[email protected]>
  • Loading branch information
chetan-rns committed Nov 20, 2024
1 parent 0249682 commit 2525ac2
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions principal/apis/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ type Server struct {
options *ServerOptions
queues queue.QueuePair

eventWriter *event.EventWriter
// key: AgentName
// value: EventWriter for that agent
eventWriters map[string]*event.EventWriter
}

type ServerOptions struct {
Expand Down Expand Up @@ -83,8 +85,9 @@ func NewServer(queues queue.QueuePair, opts ...ServerOption) *Server {
o(options)
}
return &Server{
queues: queues,
options: options,
queues: queues,
options: options,
eventWriters: make(map[string]*event.EventWriter),
}
}

Expand Down Expand Up @@ -194,7 +197,11 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe

if event.Target(incomingEvent) == event.TargetEventAck {
logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent))
s.eventWriter.Remove(incomingEvent)
eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
}
eventWriter.Remove(incomingEvent)
logCtx.Trace("Removed the ACK from the EventWriter")
return nil
}
Expand Down Expand Up @@ -235,8 +242,13 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
return fmt.Errorf("panic: invalid data in sendqueue: want: %T, have %T", cloudevents.Event{}, item)
}

eventWriter, exists := s.eventWriters[c.agentName]
if !exists {
return fmt.Errorf("panic: event writer not found for agent %s", c.agentName)
}

logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev))
s.eventWriter.Add(ev)
eventWriter.Add(ev)

q.Done(item)

Expand All @@ -256,8 +268,8 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro
return err
}

s.eventWriter = event.NewEventWriter(subs)
go s.eventWriter.SendWaitingEvents(c.ctx)
s.eventWriters[c.agentName] = event.NewEventWriter(subs)
go s.eventWriters[c.agentName].SendWaitingEvents(c.ctx)

// We receive events in a dedicated go routine
c.wg.Add(1)
Expand Down

0 comments on commit 2525ac2

Please sign in to comment.