Skip to content

Commit

Permalink
Merge pull request #29 from axiomhq/use-aws-telemetry-api
Browse files Browse the repository at this point in the history
Switch to the new Telemetry API
  • Loading branch information
dasfmi authored Nov 4, 2024
2 parents 838efec + 226ff09 commit 7add6a4
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 150 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package: build
cd bin && zip -r extension.zip extensions

publish: package
aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co'
aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co'

arch:
echo ${GOARCH}
Expand Down
4 changes: 2 additions & 2 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ const (
extensionIdentifierHeader = "Lambda-Extension-Identifier"
)

func New(LogsAPI string) *Client {
func New(telemetryAPI string) *Client {
return &Client{
baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", LogsAPI),
baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", telemetryAPI),
httpClient: &http.Client{},
}
}
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

"github.com/axiomhq/axiom-lambda-extension/extension"
"github.com/axiomhq/axiom-lambda-extension/flusher"
"github.com/axiomhq/axiom-lambda-extension/logsapi"
"github.com/axiomhq/axiom-lambda-extension/server"
"github.com/axiomhq/axiom-lambda-extension/telemetryapi"
)

var (
Expand Down Expand Up @@ -92,22 +92,22 @@ func Run() error {
}

// LOGS API SUBSCRIPTION
logsClient := logsapi.New(runtimeAPI)
telemetryClient := telemetryapi.New(runtimeAPI)

destination := logsapi.Destination{
destination := telemetryapi.Destination{
Protocol: "HTTP",
URI: logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)),
URI: telemetryapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)),
HttpMethod: "POST",
Encoding: "JSON",
}

bufferingCfg := logsapi.BufferingCfg{
bufferingCfg := telemetryapi.BufferingCfg{
MaxItems: uint32(defaultMaxItems),
MaxBytes: uint32(defaultMaxBytes),
TimeoutMS: uint32(defaultTimeoutMS),
}

_, err = logsClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID)
_, err = telemetryClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID)
if err != nil {
return err
}
Expand Down
60 changes: 23 additions & 37 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"io"
"net/http"
"regexp"
"strings"

"os"
"strconv"
Expand Down Expand Up @@ -38,8 +36,6 @@ var (
axiomMetaInfo = map[string]string{}
)

var logLineRgx, _ = regexp.Compile(`^([0-9.:TZ-]{20,})\s+([0-9a-f-]{36})\s+(ERROR|INFO|WARN|DEBUG|TRACE)\s+(?s:(.*))`)

func init() {
logger, _ = zap.NewProduction()

Expand Down Expand Up @@ -82,16 +78,37 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
}

notifyRuntimeDone := false
requestID := ""

for _, e := range events {
e["message"] = ""
// if reocrd key exists, extract the requestId and message from it
if rec, ok := e["record"]; ok {
if record, ok := rec.(map[string]any); ok {
// capture requestId and set it if it exists
if reqID, ok := record["requestId"]; ok {
requestID = reqID.(string)
}
if e["type"] == "function" {
// set message
e["message"] = record["message"].(string)
}
}
}

// attach the lambda information to the event
e["lambda"] = lambdaMetaInfo
e["axiom"] = axiomMetaInfo
// replace the time field with axiom's _time
e["_time"], e["time"] = e["time"], nil

if e["type"] == "function" {
extractEventMessage(e)
// If we didn't find a message in record field, move the record to message
// and assign requestId
if e["type"] == "function" && e["message"] == "" {
e["message"] = e["record"]
e["record"] = map[string]string{
"requestId": requestID,
}
}

// decide if the handler should notify the extension that the runtime is done
Expand All @@ -115,34 +132,3 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
}
}
}

// extractEventMessage extracts the message from the record field and puts it in the message field
// it detects if the record is a json string or a text log line that confirms to AWS log line formatting.
func extractEventMessage(e map[string]any) {
e["message"] = e["record"]
if recordStr, ok := e["record"].(string); ok && len(recordStr) > 0 {
recordStr = strings.Trim(recordStr, "\n")
// parse the record
// first check if the record is a json object, if not parse it as a text log line
if recordStr[0] == '{' && recordStr[len(recordStr)-1] == '}' {
var record map[string]any
err := json.Unmarshal([]byte(recordStr), &record)
if err != nil {
logger.Error("Error unmarshalling record:", zap.Error(err))
// do not return, we want to continue processing the event
} else {
if level, ok := record["level"].(string); ok {
record["level"] = strings.ToLower(level)
}
e["level"] = record["level"]
e["record"] = record
}
} else {
matches := logLineRgx.FindStringSubmatch(recordStr)
if len(matches) == 5 {
e["level"] = strings.ToLower(matches[3])
e["record"] = map[string]any{"requestId": matches[2], "message": matches[4], "timestamp": matches[1], "level": e["level"]}
}
}
}
}
98 changes: 0 additions & 98 deletions server/server_test.go

This file was deleted.

12 changes: 6 additions & 6 deletions logsapi/logs.go → telemetryapi/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logsapi
package telemetryapi

import (
"bytes"
Expand Down Expand Up @@ -68,19 +68,19 @@ type SubscribeResponse struct {

const (
lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier"
SchemaVersion20210318 = "2021-03-18"
SchemaVersionLatest = SchemaVersion20210318
SchemaVersion20221213 = "2022-12-13"
SchemaVersionLatest = SchemaVersion20221213
)

func New(LogsAPI string) *Client {
func New(runtimeAPI string) *Client {
return &Client{
baseURL: fmt.Sprintf("http://%s/2020-08-15", LogsAPI),
baseURL: fmt.Sprintf("http://%s/2022-07-01", runtimeAPI),
httpClient: &http.Client{},
}
}

func (lc *Client) Subscribe(ctx context.Context, types []string, bufferingCfg BufferingCfg, destination Destination, extensionID string) (*SubscribeResponse, error) {
subscribeEndpoint := lc.baseURL + "/logs"
subscribeEndpoint := lc.baseURL + "/telemetry"

subReq, err := json.Marshal(
&SubscribeRequest{
Expand Down

0 comments on commit 7add6a4

Please sign in to comment.