Skip to content

Commit

Permalink
fix(flusher): Retry if we flush on exit
Browse files Browse the repository at this point in the history
The flusher keeps a batch of events so retrying doesn't make sense when
we'll flush again with the next event.

But on exit there won't be a next event, so we need to retry there.

Because you can't set the retry behaviour after constructing an Axiom
client, this creates a second one that retries and adds a `RetryOpt` to
the `Flush` method.
  • Loading branch information
bahlo committed Oct 5, 2023
1 parent cdf1654 commit 0167347
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 21 deletions.
44 changes: 36 additions & 8 deletions flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (
"time"

"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/axiom-go/axiom/ingest"
"go.uber.org/zap"

"github.com/axiomhq/axiom-lambda-extension/version"
)

type RetryOpt int

const (
NoRetry RetryOpt = iota
Retry
)

// Axiom Config
var (
axiomToken = os.Getenv("AXIOM_TOKEN")
Expand All @@ -29,24 +37,37 @@ func init() {

type Axiom struct {
client *axiom.Client
retryClient *axiom.Client
events []axiom.Event
eventsLock sync.Mutex
lastFlushTime time.Time
}

func New() (*Axiom, error) {
client, err := axiom.NewClient(
// We create two almost identical clients, but one will retry and one will
// not. This is mostly because we are just waiting for the next flush with
// the next event most of the time, but want to retry on exit/shutdown.

opts := []axiom.Option{
axiom.SetAPITokenConfig(axiomToken),
axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())),
axiom.SetNoRetry(),
)
}

retryClient, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

opts = append(opts, axiom.SetNoRetry())
client, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

f := &Axiom{
client: client,
events: make([]axiom.Event, 0),
client: client,
retryClient: retryClient,
events: make([]axiom.Event, 0),
}

return f, nil
Expand All @@ -73,7 +94,7 @@ func (f *Axiom) QueueEvents(events []axiom.Event) {
f.events = append(f.events, events...)
}

func (f *Axiom) Flush() {
func (f *Axiom) Flush(opt RetryOpt) {
f.eventsLock.Lock()
var batch []axiom.Event
// create a copy of the batch, clear the original
Expand All @@ -85,9 +106,16 @@ func (f *Axiom) Flush() {
return
}

res, err := f.client.IngestEvents(context.Background(), axiomDataset, batch)
var res *ingest.Status
var err error
if opt == Retry {
res, _ = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch)
} else {
res, _ = f.client.IngestEvents(context.Background(), axiomDataset, batch)
}

if err != nil {
logger.Error("failed to ingest events", zap.Error(err))
logger.Error("Failed to ingest events", zap.Error(err))
// allow this batch to be retried again, put them back
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand Down
29 changes: 16 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Run() error {
if err != nil {
// We don't want to exit with error, so that the extensions doesn't crash and crash the main function with it.
// so we continue even if Axiom client is nil
logger.Error("error creating axiom client, no logs will send to Axiom.", zap.Error(err))
logger.Error("Failed to create Axiom client, no logs will be sent to Axiom", zap.Error(err))
// if users want to crash on error, they can set the PANIC_ON_API_ERROR env variable
if crashOnAPIErr == "true" {
return err
Expand Down Expand Up @@ -112,41 +112,44 @@ func Run() error {
return err
}

// Make sure we flush with retry on exit
defer func() {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush(flusher.Retry)
})
}()

for {
select {
case <-ctx.Done():
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
logger.Info("Context Done", zap.Any("ctx", ctx.Err()))
logger.Info("Context done", zap.Error(ctx.Err()))
return nil
default:
res, err := extensionClient.NextEvent(ctx, extensionName)
if err != nil {
logger.Error("Next event Failed:", zap.Error(err))
logger.Error("Next event failed:", zap.Error(err))
return err
}

// on every event received, check if we should flush
// On every event received, check if we should flush
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
if client.ShouldFlush() {
client.Flush()
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
}
})

// wait for the first invocation to finish (receive platform.runtimeDone log), then flush
// Wait for the first invocation to finish (receive platform.runtimeDone log), then flush
if isFirstInvocation {
<-runtimeDone
isFirstInvocation = false
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
})
}

if res.EventType == "SHUTDOWN" {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
_ = httpServer.Shutdown()
return nil
}
Expand Down

0 comments on commit 0167347

Please sign in to comment.