Skip to content
This repository has been archived by the owner on Jun 13, 2023. It is now read-only.

Commit

Permalink
feat(lambda): support trace creation before timeout (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
ophiryael authored Aug 18, 2021
1 parent f0866ed commit 2e1bc6f
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 29 deletions.
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,15 +465,15 @@ import (

Advanced options can be configured either as fields of the `Config` struct passed to `WrapLambdaHandler` or as environment variables.

|Parameter |Environment Variable |Type |Default |Description |
|----------------------|------------------------------|-------|-------------|-----------------------------------------------------------------------------------|
|Token |EPSAGON_TOKEN |String |- |Epsagon account token |
|ApplicationName |- |String |- |Application name that will be set for traces |
|MetadataOnly |EPSAGON_METADATA |Boolean|`true` |Whether to send only the metadata (`True`) or also the payloads (`False`) |
|CollectorURL |EPSAGON_COLLECTOR_URL |String |- |The address of the trace collector to send trace to |
|Debug |EPSAGON_DEBUG |Boolean|`False` |Enable debug prints for troubleshooting |
|SendTimeout |EPSAGON_SEND_TIMEOUT_SEC |String |`1s` |The timeout duration to send the traces to the trace collector |

|Parameter |Environment Variable |Type |Default |Description |
|----------------------|------------------------------------|-------|-------------|-----------------------------------------------------------------------------------|
|Token |EPSAGON_TOKEN |String |- |Epsagon account token |
|ApplicationName |- |String |- |Application name that will be set for traces |
|MetadataOnly |EPSAGON_METADATA |Boolean|`true` |Whether to send only the metadata (`True`) or also the payloads (`False`) |
|CollectorURL |EPSAGON_COLLECTOR_URL |String |- |The address of the trace collector to send trace to |
|Debug |EPSAGON_DEBUG |Boolean|`False` |Enable debug prints for troubleshooting |
|SendTimeout |EPSAGON_SEND_TIMEOUT_SEC |String |`1s` |The timeout duration to send the traces to the trace collector |
|- |EPSAGON_LAMBDA_TIMEOUT_THRESHOLD_MS |Integer|`200` |The threshold in milliseconds to send the trace before a Lambda timeout occurs |


## Getting Help
Expand Down
77 changes: 58 additions & 19 deletions epsagon/lambda_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"runtime/debug"
"strconv"
"strings"
"time"

"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/epsagon/epsagon-go/protocol"
Expand All @@ -18,6 +19,8 @@ var (
coldStart = true
)

// TimeoutErrorCode is the error code set on the "runner" event that
// trackTimeout reports when the handler is still running once the timeout
// threshold before the invocation deadline has been reached.
const TimeoutErrorCode protocol.ErrorCode = 3

type genericLambdaHandler func(context.Context, json.RawMessage) (interface{}, error)

// epsagonLambdaWrapper is a generic lambda function type
Expand All @@ -27,6 +30,7 @@ type epsagonLambdaWrapper struct {
tracer tracer.Tracer
invoked bool
invoking bool
timeout bool
}

type preInvokeData struct {
Expand All @@ -51,6 +55,24 @@ func getAWSAccount(lc *lambdacontext.LambdaContext) string {
return ""
}

// createLambdaEvent builds the "runner" event that describes the current
// Lambda invocation.
//
// The event is identified by the AWS request ID held in preInvokeInfo, starts
// at preInvokeInfo.StartTime and lasts until the moment this function is
// called. ErrorCode and Exception are left at their zero values so that the
// caller can fill them in according to how the invocation ended.
func createLambdaEvent(preInvokeInfo *preInvokeData) *protocol.Event {
	// Sample the clock first so the duration covers everything up to this call.
	elapsed := tracer.GetTimestamp() - preInvokeInfo.StartTime

	runner := new(protocol.Event)
	runner.Id = preInvokeInfo.LambdaContext.AwsRequestID
	runner.StartTime = preInvokeInfo.StartTime
	runner.Resource = &protocol.Resource{
		Name:      lambdacontext.FunctionName,
		Type:      "lambda",
		Operation: "invoke",
		Metadata:  preInvokeInfo.InvocationMetadata,
	}
	runner.Origin = "runner"
	runner.Duration = elapsed
	return runner
}

func (wrapper *epsagonLambdaWrapper) preInvokeOps(
ctx context.Context, payload json.RawMessage) (info *preInvokeData) {
startTime := tracer.GetTimestamp()
Expand Down Expand Up @@ -100,23 +122,9 @@ func (wrapper *epsagonLambdaWrapper) postInvokeOps(
}
}()

endTime := tracer.GetTimestamp()
duration := endTime - preInvokeInfo.StartTime

lambdaEvent := &protocol.Event{
Id: preInvokeInfo.LambdaContext.AwsRequestID,
StartTime: preInvokeInfo.StartTime,
Resource: &protocol.Resource{
Name: lambdacontext.FunctionName,
Type: "lambda",
Operation: "invoke",
Metadata: preInvokeInfo.InvocationMetadata,
},
Origin: "runner",
Duration: duration,
ErrorCode: invokeInfo.errorStatus,
Exception: invokeInfo.ExceptionInfo,
}
lambdaEvent := createLambdaEvent(preInvokeInfo)
lambdaEvent.ErrorCode = invokeInfo.errorStatus
lambdaEvent.Exception = invokeInfo.ExceptionInfo

if !wrapper.config.MetadataOnly {
result, err := json.Marshal(invokeInfo.result)
Expand Down Expand Up @@ -151,12 +159,36 @@ func (wrapper *epsagonLambdaWrapper) Invoke(ctx context.Context, payload json.Ra
}()

preInvokeInfo := wrapper.preInvokeOps(ctx, payload)
go wrapper.trackTimeout(ctx, preInvokeInfo)
wrapper.InvokeClientLambda(ctx, payload, invokeInfo)
wrapper.postInvokeOps(preInvokeInfo, invokeInfo)
if !wrapper.timeout {
wrapper.postInvokeOps(preInvokeInfo, invokeInfo)
}

return invokeInfo.result, invokeInfo.err
}

// trackTimeout reports a timeout trace shortly before the invocation deadline
// if the client handler is still running at that point.
//
// It is meant to run in its own goroutine for the duration of one invocation.
// When ctx carries no deadline there is nothing to track and it returns
// immediately. Otherwise it waits until the deadline minus the configured
// threshold (tracer.GetLambdaTimeoutThresholdMs) and then, if the wrapper is
// still invoking the handler, marks the wrapper as timed out, adds a runner
// event carrying TimeoutErrorCode and stops the tracer so the trace is sent
// before the Lambda is terminated.
//
// The goroutine also returns as soon as ctx is done, so it does not outlive
// an invocation whose context is cancelled on return.
//
// NOTE(review): wrapper.invoking and wrapper.timeout are plain fields shared
// with the goroutine running Invoke; no synchronization is visible here —
// confirm whether they should be guarded (e.g. with sync/atomic).
func (wrapper *epsagonLambdaWrapper) trackTimeout(ctx context.Context, preInvokeInfo *preInvokeData) {
	deadline, isDeadlineSet := ctx.Deadline()
	if !isDeadlineSet {
		return
	}

	threshold := time.Duration(tracer.GetLambdaTimeoutThresholdMs()) * time.Millisecond
	timer := time.NewTimer(time.Until(deadline.Add(-threshold)))
	defer timer.Stop()

	// A timer channel delivers a single value and is never closed, so it must
	// be received from exactly once; ranging over it would block forever.
	select {
	case <-ctx.Done():
		// The invocation finished (or was cancelled) before the threshold.
		return
	case <-timer.C:
	}

	if !wrapper.invoking {
		return
	}

	// Flag the timeout first so Invoke skips its regular post-invoke reporting.
	wrapper.timeout = true

	lambdaEvent := createLambdaEvent(preInvokeInfo)
	lambdaEvent.ErrorCode = TimeoutErrorCode

	wrapper.tracer.AddEvent(lambdaEvent)
	wrapper.tracer.Stop()
}

func (wrapper *epsagonLambdaWrapper) InvokeClientLambda(
ctx context.Context, payload json.RawMessage, invokeInfo *invocationData) {
defer func() {
Expand Down Expand Up @@ -196,12 +228,19 @@ func WrapLambdaHandler(config *Config, handler interface{}) interface{} {
return func(ctx context.Context, payload json.RawMessage) (interface{}, error) {
wrapperTracer := tracer.CreateGlobalTracer(&config.Config)
wrapperTracer.Start()
defer wrapperTracer.Stop()

wrapper := &epsagonLambdaWrapper{
config: config,
handler: makeGenericHandler(handler),
tracer: wrapperTracer,
}

defer func() {
if !wrapper.timeout {
wrapperTracer.Stop()
}
}()

return wrapper.Invoke(ctx, payload)
}
}
124 changes: 123 additions & 1 deletion epsagon/lambda_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package epsagon
import (
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"time"

"github.com/epsagon/epsagon-go/protocol"
"github.com/epsagon/epsagon-go/tracer"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"reflect"
)

var _ = Describe("lambda_wrapper", func() {
Expand Down Expand Up @@ -79,7 +83,125 @@ var _ = Describe("lambda_wrapper", func() {
Expect(exceptions).To(BeEmpty())
Expect(events).To(HaveLen(2))
})

Context("Lambda timeout handling", func() {
// The context deadline is five minutes away while the handler returns
// immediately, so the invocation is expected to be reported as a regular
// success: both the trigger and the runner events carry ErrorCode_OK.
It("Marks event as success when timeout defined but not reached", func() {
const lambdaTimeout = 5 * time.Minute

called := false
wrapper := &epsagonLambdaWrapper{
config: &Config{},
handler: makeGenericHandler(func() { called = true }),
tracer: tracer.GlobalTracer,
}

// Emulate a Lambda invocation context that carries a deadline.
lambdaDeadline := time.Now().Add(lambdaTimeout)
ctx, cancel := context.WithDeadline(context.Background(), lambdaDeadline)
defer cancel()

payload := json.RawMessage("{}")
wrapper.Invoke(ctx, payload)

Expect(called).To(Equal(true))
Expect(exceptions).To(BeEmpty())
Expect(events).To(HaveLen(2))

// Pick the two recorded events apart by their origin.
var trigger, runner *protocol.Event
for _, event := range events {
if event.Origin == "trigger" {
trigger = event
} else if event.Origin == "runner" {
runner = event
}
}
Expect(trigger).NotTo(BeNil())
Expect(runner).NotTo(BeNil())
Expect(trigger.ErrorCode).To(Equal(protocol.ErrorCode_OK))
Expect(runner.ErrorCode).To(Equal(protocol.ErrorCode_OK))
})

// The handler sleeps for the whole Lambda timeout, which is 100ms longer
// than the default threshold, so the timeout tracker is expected to fire
// while the handler is still running and to report the runner event with
// TimeoutErrorCode.
It("Marks event as timeout when the default lambda timeout threshold reached", func() {
	const lambdaTimeout = (tracer.DefaultLambdaTimeoutThresholdMs + 100) * time.Millisecond

	called := false
	wrapper := &epsagonLambdaWrapper{
		config: &Config{},
		handler: makeGenericHandler(func() {
			called = true
			time.Sleep(lambdaTimeout)
		}),
		tracer: tracer.GlobalTracer,
	}

	// Emulate a Lambda invocation context that carries a deadline.
	lambdaDeadline := time.Now().Add(lambdaTimeout)
	ctx, cancel := context.WithDeadline(context.Background(), lambdaDeadline)
	defer cancel()

	payload := json.RawMessage("{}")
	wrapper.Invoke(ctx, payload)

	Expect(called).To(Equal(true))
	Expect(exceptions).To(BeEmpty())
	Expect(events).To(HaveLen(2))

	// Pick the two recorded events apart by their origin.
	var trigger, runner *protocol.Event
	for _, event := range events {
		if event.Origin == "trigger" {
			trigger = event
		} else if event.Origin == "runner" {
			runner = event
		}
	}
	Expect(trigger).NotTo(BeNil())
	Expect(runner).NotTo(BeNil())
	Expect(trigger.ErrorCode).To(Equal(protocol.ErrorCode_OK))
	Expect(runner.ErrorCode).To(Equal(TimeoutErrorCode))
})

// Same scenario as the default-threshold spec, but the threshold is taken
// from EPSAGON_LAMBDA_TIMEOUT_THRESHOLD_MS. The handler sleeps for the whole
// Lambda timeout (threshold + 100ms), so the tracker is expected to fire
// first and report the runner event with TimeoutErrorCode.
It("Marks event as timeout when a user defined lambda timeout threshold reached", func() {
	const userDefinedTimeoutThresholdMs = 50
	// Fail the spec right away if the override cannot be installed; otherwise
	// it would silently exercise the default threshold instead.
	Expect(os.Setenv("EPSAGON_LAMBDA_TIMEOUT_THRESHOLD_MS", fmt.Sprint(userDefinedTimeoutThresholdMs))).To(Succeed())
	defer os.Unsetenv("EPSAGON_LAMBDA_TIMEOUT_THRESHOLD_MS")

	const lambdaTimeout = (userDefinedTimeoutThresholdMs + 100) * time.Millisecond

	called := false
	wrapper := &epsagonLambdaWrapper{
		config: &Config{},
		handler: makeGenericHandler(func() {
			called = true
			time.Sleep(lambdaTimeout)
		}),
		tracer: tracer.GlobalTracer,
	}

	// Emulate a Lambda invocation context that carries a deadline.
	lambdaDeadline := time.Now().Add(lambdaTimeout)
	ctx, cancel := context.WithDeadline(context.Background(), lambdaDeadline)
	defer cancel()

	payload := json.RawMessage("{}")
	wrapper.Invoke(ctx, payload)

	Expect(called).To(Equal(true))
	Expect(exceptions).To(BeEmpty())
	Expect(events).To(HaveLen(2))

	// Pick the two recorded events apart by their origin.
	var trigger, runner *protocol.Event
	for _, event := range events {
		if event.Origin == "trigger" {
			trigger = event
		} else if event.Origin == "runner" {
			runner = event
		}
	}
	Expect(trigger).NotTo(BeNil())
	Expect(runner).NotTo(BeNil())
	Expect(trigger.ErrorCode).To(Equal(protocol.ErrorCode_OK))
	Expect(runner.ErrorCode).To(Equal(TimeoutErrorCode))
})
})
})

Describe("Error Flows", func() {
var (
events []*protocol.Event
Expand Down
14 changes: 14 additions & 0 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ var strongKeys = map[string]bool{
"item_hash": true,
}

// DefaultLambdaTimeoutThresholdMs is the default threshold, in milliseconds,
// before a Lambda timeout at which the trace is sent.
const DefaultLambdaTimeoutThresholdMs = 200

// GetLambdaTimeoutThresholdMs returns the threshold, in milliseconds, before a
// Lambda timeout at which the trace should be sent.
//
// The value is read from the EPSAGON_LAMBDA_TIMEOUT_THRESHOLD_MS environment
// variable. DefaultLambdaTimeoutThresholdMs is returned when the variable is
// unset, is not a valid integer, or is negative (a negative threshold would
// place the send time after the Lambda deadline).
func GetLambdaTimeoutThresholdMs() int {
	raw, ok := os.LookupEnv("EPSAGON_LAMBDA_TIMEOUT_THRESHOLD_MS")
	if !ok {
		return DefaultLambdaTimeoutThresholdMs
	}

	thresholdMs, err := strconv.Atoi(raw)
	if err != nil || thresholdMs < 0 {
		// Best effort: an unusable override falls back to the default.
		return DefaultLambdaTimeoutThresholdMs
	}
	return thresholdMs
}

// Tracer is what a general program tracer has to provide
type Tracer interface {
AddEvent(*protocol.Event)
Expand Down

0 comments on commit 2e1bc6f

Please sign in to comment.