Skip to content

Commit

Permalink
feat: resend events with exponential backoff and wait for ACK (#225)
Browse files Browse the repository at this point in the history
Signed-off-by: Chetan Banavikalmutt <[email protected]>
  • Loading branch information
chetan-rns authored Nov 29, 2024
1 parent 206a7a3 commit 2486f0c
Show file tree
Hide file tree
Showing 8 changed files with 663 additions and 104 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Agent struct {
// At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback.
watchLock sync.RWMutex
version *version.Version

eventWriter *event.EventWriter
}

const defaultQueueName = "default"
Expand Down
51 changes: 33 additions & 18 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,8 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
return nil
}

logCtx.Tracef("Sending an item to the event stream")

pev, err := format.ToProto(ev)
if err != nil {
logCtx.Warnf("Could not wire event: %v", err)
return nil
}

err = stream.Send(&eventstreamapi.Event{Event: pev})
if err != nil {
if grpcutil.NeedReconnectOnError(err) {
return err
} else {
logCtx.Errorf("Error while sending: %v", err)
return nil
}
}
logCtx.WithField("resource_id", event.ResourceID(ev)).WithField("event_id", event.EventID(ev)).Trace("Adding an event to the event writer")
a.eventWriter.Add(ev)

return nil
}
Expand All @@ -122,10 +107,36 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
logCtx.Errorf("Could not unwrap event: %v", err)
return nil
}

logCtx = logCtx.WithFields(logrus.Fields{
"resource_id": ev.ResourceID(),
"event_id": ev.EventID(),
})

logCtx.Debugf("Received a new event from stream")

if ev.Target() == event.TargetEventAck {
logCtx.Trace("Received an ACK for an event")
rawEvent, err := format.FromProto(rcvd.Event)
if err != nil {
return err
}
a.eventWriter.Remove(rawEvent)
logCtx.Trace("Removed an event from the event writer")
return nil
}

err = a.processIncomingEvent(ev)
if err != nil {
if err != nil && !event.IsEventDiscarded(err) && !event.IsEventNotAllowed(err) {
logCtx.WithError(err).Errorf("Unable to process incoming event")
} else {
// Send an ACK if the event is processed successfully.
sendQ := a.queues.SendQ(a.remote.ClientID())
if sendQ == nil {
return fmt.Errorf("no send queue found for the remote principal")
}
sendQ.Add(a.emitter.ProcessedEvent(event.EventProcessed, ev))
logCtx.Trace("Sent an ACK for an event")
}
return nil
}
Expand All @@ -137,6 +148,10 @@ func (a *Agent) handleStreamEvents() error {
if err != nil {
return err
}

a.eventWriter = event.NewEventWriter(stream)
go a.eventWriter.SendWaitingEvents(a.context)

logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"server_addr": grpcutil.AddressFromContext(stream.Context()),
Expand Down
29 changes: 23 additions & 6 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

/*
Expand Down Expand Up @@ -122,7 +123,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// that are incoming.
if a.mode != types.AgentModeManaged {
logCtx.Trace("Discarding this event, because agent is not in managed mode")
return nil, fmt.Errorf("cannot create application: agent is not in managed mode")
return nil, event.NewEventDiscardedErr("cannot create application: agent is not in managed mode")
}

// If we receive a new app event for an app we already manage, it usually
Expand All @@ -131,7 +132,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
// TODO(jannfis): Handle this situation properly instead of throwing an error.
if a.appManager.IsManaged(incoming.QualifiedName()) {
logCtx.Trace("Discarding this event, because application is already managed on this agent")
return nil, fmt.Errorf("application %s is already managed", incoming.QualifiedName())
return nil, event.NewEventDiscardedErr("application %s is already managed", incoming.QualifiedName())
}

logCtx.Infof("Creating a new application on behalf of an incoming event")
Expand All @@ -143,6 +144,11 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
}

created, err := a.appManager.Create(a.context, incoming)
if apierrors.IsAlreadyExists(err) {
logCtx.Debug("application already exists")
return created, nil
}

return created, err
}

Expand All @@ -156,7 +162,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App
})
if a.appManager.IsChangeIgnored(incoming.QualifiedName(), incoming.ResourceVersion) {
logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion)
return nil, fmt.Errorf("the version %s has already been seen by this agent", incoming.ResourceVersion)
return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion)
} else {
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
}
Expand Down Expand Up @@ -197,6 +203,10 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
deletionPropagation := backend.DeletePropagationBackground
err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("application is not found, perhaps it is already deleted")
return nil
}
return err
}
err = a.appManager.Unmanage(app.QualifiedName())
Expand All @@ -219,14 +229,14 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
// that are incoming.
if a.mode != types.AgentModeManaged {
logCtx.Trace("Discarding this event, because agent is not in managed mode")
return nil, fmt.Errorf("cannot create appproject: agent is not in managed mode")
return nil, event.NewEventDiscardedErr("cannot create appproject: agent is not in managed mode")
}

// If we receive a new AppProject event for an AppProject we already manage, it usually
// means that we're out-of-sync from the control plane.
if a.appManager.IsManaged(incoming.Name) {
logCtx.Trace("Discarding this event, because AppProject is already managed on this agent")
return nil, fmt.Errorf("appproject %s is already managed", incoming.Name)
return nil, event.NewEventDiscardedErr("appproject %s is already managed", incoming.Name)
}

logCtx.Infof("Creating a new AppProject on behalf of an incoming event")
Expand All @@ -238,6 +248,9 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
}

created, err := a.projectManager.Create(a.context, incoming)
if apierrors.IsAlreadyExists(err) {
logCtx.Debug("appProject already exists")
}
return created, err
}

Expand All @@ -251,7 +264,7 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr
})
if a.appManager.IsChangeIgnored(incoming.Name, incoming.ResourceVersion) {
logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion)
return nil, fmt.Errorf("the version %s has already been seen by this agent", incoming.ResourceVersion)
return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion)
} else {
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
}
Expand Down Expand Up @@ -283,6 +296,10 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {
deletionPropagation := backend.DeletePropagationBackground
err := a.projectManager.Delete(a.context, a.namespace, project, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("appProject not found, perhaps it is already deleted")
return nil
}
return err
}
err = a.projectManager.Unmanage(project.Name)
Expand Down
Loading

0 comments on commit 2486f0c

Please sign in to comment.