diff --git a/agent/connection.go b/agent/connection.go index 34b73a9..0655392 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -86,7 +86,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error return nil } - logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev)) + logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev)) a.eventWriter.Add(ev) return nil @@ -117,13 +117,13 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro logCtx.Debugf("Received a new event from stream") if ev.Target() == event.TargetEventAck { - logCtx.Trace("Received an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID()) + logCtx.Tracef("Received an ACK for an event: resourceID %s eventID %s", ev.ResourceID(), ev.EventID()) rawEvent, err := format.FromProto(rcvd.Event) if err != nil { return err } a.eventWriter.Remove(rawEvent) - logCtx.Trace("Removed an event from the EventWriter", "resourceID", ev.ResourceID(), "eventID", ev.EventID()) + logCtx.Tracef("Removed an event from the EventWriter: resourceID %s eventID %s", ev.ResourceID(), ev.EventID()) return nil } @@ -137,7 +137,7 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro return fmt.Errorf("no send queue found for the remote principal") } sendQ.Add(a.emitter.ProcessedEvent(event.EventProcessed, ev)) - logCtx.Info("Sent an ACK for an event", "resourceID", ev.ResourceID(), "eventID", ev.EventID()) + logCtx.Tracef("Sent an ACK for an event: resourceID %s eventID %s", ev.ResourceID(), ev.EventID()) } return nil } diff --git a/internal/event/event.go b/internal/event/event.go index c10bef7..85a7d44 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -307,6 +307,7 @@ func (ew *EventWriter) Add(ev *cloudevents.Event) { event: ev, backoff: &defaultBackoff, } + ew.log.Tracef("added a new event resourceID %s eventID %s", resID, EventID(ev)) return } @@ -316,6 +317,7 @@ func (ew *EventWriter) Add(ev *cloudevents.Event) { eventMsg.backoff = &defaultBackoff eventMsg.retryAfter = nil eventMsg.mu.Unlock() + ew.log.Tracef("updated an existing event: resourceID %s eventID %s", resID, EventID(ev)) } func (ew *EventWriter) Get(resID string) *eventMessage { @@ -367,7 +369,7 @@ func (ew *EventWriter) sendEvent(resID string) { // Check if the event is already ACK'd. eventMsg := ew.Get(resID) if eventMsg == nil { - ew.log.Trace("event is not found, perhaps it is already ACK'd", "resourceID", resID) + ew.log.Tracef("event is not found, perhaps it is already ACK'd: resourceID %s", resID) return } @@ -375,8 +377,11 @@ func (ew *EventWriter) sendEvent(resID string) { defer eventMsg.mu.Unlock() // Check if it is time to resend the event. - if eventMsg.retryAfter != nil && eventMsg.retryAfter.After(time.Now()) { - return + if eventMsg.retryAfter != nil { + if eventMsg.retryAfter.After(time.Now()) { + return + } + ew.log.Tracef("resending an event: resourceID %s eventID %s", resID, EventID(eventMsg.event)) } defer func() { @@ -398,11 +403,11 @@ func (ew *EventWriter) sendEvent(resID string) { return } - ew.log.Trace("event sent to target", "resourceID", resID, "type", eventMsg.event.Type()) + ew.log.Tracef("event sent to target: resourceID %s eventID %s", resID, EventID(eventMsg.event)) // We don't have to wait for an ACK if the current event is ACK. So, remove it from the EventWriter. if Target(eventMsg.event) == TargetEventAck { ew.Remove(eventMsg.event) - ew.log.Trace("ACK is removed from the EventWriter", "resourceID", resID, "eventID", EventID(eventMsg.event)) + ew.log.Tracef("ACK is removed from the EventWriter: resourceID: %s eventID: %s", resID, EventID(eventMsg.event)) } } diff --git a/principal/apis/eventstream/eventstream.go b/principal/apis/eventstream/eventstream.go index 1ffae1f..e79c27d 100644 --- a/principal/apis/eventstream/eventstream.go +++ b/principal/apis/eventstream/eventstream.go @@ -193,7 +193,7 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe } if event.Target(incomingEvent) == event.TargetEventAck { - logCtx.Trace("Received an ACK event", "resourceID", event.ResourceID(incomingEvent), "eventID", event.EventID(incomingEvent)) + logCtx.Tracef("Received an ACK: resourceID %s eventID %s", event.ResourceID(incomingEvent), event.EventID(incomingEvent)) s.eventWriter.Remove(incomingEvent) logCtx.Trace("Removed the ACK from the EventWriter") return nil @@ -235,7 +235,7 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe return fmt.Errorf("panic: invalid data in sendqueue: want: %T, have %T", cloudevents.Event{}, item) } - logCtx.Trace("Adding an item to the event writer", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev)) + logCtx.Tracef("Adding an item to the event writer: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev)) s.eventWriter.Add(ev) q.Done(item) diff --git a/principal/event.go b/principal/event.go index 5a56432..9944a2c 100644 --- a/principal/event.go +++ b/principal/event.go @@ -286,7 +286,7 @@ func (s *Server) eventProcessor(ctx context.Context) error { logCtx.Debugf("Queue disappeared -- client probably has disconnected") return } - logCtx.Trace("sending an ACK", "resourceID", event.ResourceID(ev), "eventID", event.EventID(ev)) + logCtx.Tracef("sending an ACK: resourceID %s eventID %s", event.ResourceID(ev), event.EventID(ev)) sendQ.Add(s.events.ProcessedEvent(event.EventProcessed, event.New(ev, event.TargetEventAck))) }(queueName, q) }