diff --git a/Makefile b/Makefile index e9de60b..e55ecfa 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ package: build cd bin && zip -r extension.zip extensions publish: package - aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co' + aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co' arch: echo ${GOARCH} diff --git a/extension/extension.go b/extension/extension.go index d9eb9d7..e392abc 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -41,9 +41,9 @@ const ( extensionIdentifierHeader = "Lambda-Extension-Identifier" ) -func New(LogsAPI string) *Client { +func New(telemetryAPI string) *Client { return &Client{ - baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", LogsAPI), + baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", telemetryAPI), httpClient: &http.Client{}, } } diff --git a/main.go b/main.go index b437f65..8e038dc 100644 --- a/main.go +++ b/main.go @@ -14,8 +14,8 @@ import ( "github.com/axiomhq/axiom-lambda-extension/extension" "github.com/axiomhq/axiom-lambda-extension/flusher" - "github.com/axiomhq/axiom-lambda-extension/logsapi" "github.com/axiomhq/axiom-lambda-extension/server" + "github.com/axiomhq/axiom-lambda-extension/telemetryapi" ) var ( @@ -92,22 +92,22 @@ func Run() error { } // LOGS API SUBSCRIPTION - logsClient := logsapi.New(runtimeAPI) + telemetryClient := telemetryapi.New(runtimeAPI) - destination := logsapi.Destination{ + destination := telemetryapi.Destination{ Protocol: "HTTP", - URI: logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)), + URI: telemetryapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)), HttpMethod: "POST", Encoding: "JSON", } - bufferingCfg := logsapi.BufferingCfg{ + bufferingCfg := telemetryapi.BufferingCfg{ MaxItems: uint32(defaultMaxItems), MaxBytes: uint32(defaultMaxBytes), TimeoutMS: uint32(defaultTimeoutMS), } - _, err = logsClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID) + _, err = telemetryClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID) if err != nil { return err } diff --git a/server/server.go b/server/server.go index f3a9ddf..499e4ca 100644 --- a/server/server.go +++ b/server/server.go @@ -5,8 +5,6 @@ import ( "fmt" "io" "net/http" - "regexp" - "strings" "os" "strconv" @@ -38,8 +36,6 @@ var ( axiomMetaInfo = map[string]string{} ) -var logLineRgx, _ = regexp.Compile(`^([0-9.:TZ-]{20,})\s+([0-9a-f-]{36})\s+(ERROR|INFO|WARN|DEBUG|TRACE)\s+(?s:(.*))`) - func init() { logger, _ = zap.NewProduction() @@ -82,16 +78,37 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc } notifyRuntimeDone := false + requestID := "" for _, e := range events { + e["message"] = "" + // if reocrd key exists, extract the requestId and message from it + if rec, ok := e["record"]; ok { + if record, ok := rec.(map[string]any); ok { + // capture requestId and set it if it exists + if reqID, ok := record["requestId"]; ok { + requestID = reqID.(string) + } + if e["type"] == "function" { + // set message + e["message"] = record["message"].(string) + } + } + } + // attach the lambda information to the event e["lambda"] = lambdaMetaInfo e["axiom"] = axiomMetaInfo // replace the time field with axiom's _time e["_time"], e["time"] = e["time"], nil - if e["type"] == "function" { - extractEventMessage(e) + // If we didn't find a message in record field, move the record to message + // and assign requestId + if e["type"] == "function" && e["message"] == "" { + e["message"] = e["record"] + e["record"] = map[string]string{ + "requestId": requestID, + } } // decide if the handler should notify the extension that the runtime is done @@ -115,34 +132,3 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc } } } - -// extractEventMessage extracts the message from the record field and puts it in the message field -// it detects if the record is a json string or a text log line that confirms to AWS log line formatting. -func extractEventMessage(e map[string]any) { - e["message"] = e["record"] - if recordStr, ok := e["record"].(string); ok && len(recordStr) > 0 { - recordStr = strings.Trim(recordStr, "\n") - // parse the record - // first check if the record is a json object, if not parse it as a text log line - if recordStr[0] == '{' && recordStr[len(recordStr)-1] == '}' { - var record map[string]any - err := json.Unmarshal([]byte(recordStr), &record) - if err != nil { - logger.Error("Error unmarshalling record:", zap.Error(err)) - // do not return, we want to continue processing the event - } else { - if level, ok := record["level"].(string); ok { - record["level"] = strings.ToLower(level) - } - e["level"] = record["level"] - e["record"] = record - } - } else { - matches := logLineRgx.FindStringSubmatch(recordStr) - if len(matches) == 5 { - e["level"] = strings.ToLower(matches[3]) - e["record"] = map[string]any{"requestId": matches[2], "message": matches[4], "timestamp": matches[1], "level": e["level"]} - } - } - } -} diff --git a/server/server_test.go b/server/server_test.go deleted file mode 100644 index 4211e5d..0000000 --- a/server/server_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package server - -import ( - "testing" -) - -func TestMessageExtraction(t *testing.T) { - testCases := []struct { - name string - input string - expected map[string]any - }{ - { - name: "error messages on multiple lines", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a1 ERROR testing sending an error\nand this is a new line inside the error \n and a new line \n bye", - expected: map[string]any{ - "level": "error", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a1", "message": "testing sending an error\nand this is a new line inside the error \n and a new line \n bye", "timestamp": "2024-01-16T08:53:51.919Z", "level": "error"}, - }, - }, - { - name: "info messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a2 INFO Hello, world!", - expected: map[string]any{ - "level": "info", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a2", "message": "Hello, world!", "timestamp": "2024-01-16T08:53:51.919Z", "level": "info"}, - }, - }, - { - name: "warn messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a3 WARN head my warning", - expected: map[string]any{ - "level": "warn", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a3", "message": "head my warning", "timestamp": "2024-01-16T08:53:51.919Z", "level": "warn"}, - }, - }, - { - name: "trace messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a4 TRACE this is a trace \n with information on a new line.", - expected: map[string]any{ - "level": "trace", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a4", "message": "this is a trace \n with information on a new line.", "timestamp": "2024-01-16T08:53:51.919Z", "level": "trace"}, - }, - }, - { - name: "debug messages", - input: "2024-01-16T08:53:51.919Z 4b995efa-75f8-4fdc-92af-0882c79f47a5 DEBUG Debugging is fun!", - expected: map[string]any{ - "level": "debug", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "4b995efa-75f8-4fdc-92af-0882c79f47a5", "message": "Debugging is fun!", "timestamp": "2024-01-16T08:53:51.919Z", "level": "debug"}, - }, - }, - { - name: "testing json messages", - input: `{"timestamp":"2024-01-08T16:48:45.316Z","level":"INFO","requestId":"de126cf0-6124-426c-818a-174983fbfc4b","message":"foo != bar"}`, - expected: map[string]any{ - "level": "info", - "message": "SAME_AS_INPUT_NO_NEED_TO_DUPLICATE_INPUT_HERE", - "record": map[string]any{"requestId": "de126cf0-6124-426c-818a-174983fbfc4b", "message": "foo != bar", "timestamp": "2024-01-08T16:48:45.316Z", "level": "info"}, - }, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - e := make(map[string]any) - e["record"] = testCase.input - extractEventMessage(e) - if e["level"] != testCase.expected["level"] { - t.Errorf("Expected level to be %s, got %s", testCase.expected["level"], e["level"]) - } - if e["message"] != testCase.input { // the message field should contain the original input - t.Errorf("Expected message to be %s, got %s", testCase.input, e["message"]) - } - - expectedRecord := testCase.expected["record"].(map[string]any) - outputRecord := e["record"].(map[string]any) - - if outputRecord["timestamp"] != expectedRecord["timestamp"] { - t.Errorf("Expected timestamp to be %s, got %s", testCase.expected["timestamp"], e["timestamp"]) - } - if outputRecord["level"] != expectedRecord["level"] { - t.Errorf("Expected record.level to be %s, got %s", expectedRecord["level"], outputRecord["level"]) - } - if outputRecord["requestId"] != expectedRecord["requestId"] { - t.Errorf("Expected record.requestId to be %s, got %s", expectedRecord["requestId"], outputRecord["requestId"]) - } - if outputRecord["message"] != expectedRecord["message"] { - t.Errorf("Expected record.message to be %s, got %s", expectedRecord["message"], outputRecord["message"]) - } - }) - } -} diff --git a/logsapi/logs.go b/telemetryapi/telemetry.go similarity index 90% rename from logsapi/logs.go rename to telemetryapi/telemetry.go index 8fb380b..0fa7bff 100644 --- a/logsapi/logs.go +++ b/telemetryapi/telemetry.go @@ -1,4 +1,4 @@ -package logsapi +package telemetryapi import ( "bytes" @@ -68,19 +68,19 @@ type SubscribeResponse struct { const ( lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier" - SchemaVersion20210318 = "2021-03-18" - SchemaVersionLatest = SchemaVersion20210318 + SchemaVersion20221213 = "2022-12-13" + SchemaVersionLatest = SchemaVersion20221213 ) -func New(LogsAPI string) *Client { +func New(runtimeAPI string) *Client { return &Client{ - baseURL: fmt.Sprintf("http://%s/2020-08-15", LogsAPI), + baseURL: fmt.Sprintf("http://%s/2022-07-01", runtimeAPI), httpClient: &http.Client{}, } } func (lc *Client) Subscribe(ctx context.Context, types []string, bufferingCfg BufferingCfg, destination Destination, extensionID string) (*SubscribeResponse, error) { - subscribeEndpoint := lc.baseURL + "/logs" + subscribeEndpoint := lc.baseURL + "/telemetry" subReq, err := json.Marshal( &SubscribeRequest{