Skip to content

Commit

Permalink
Log UI work
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 27, 2024
1 parent 3b39613 commit ab46309
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 104 deletions.
5 changes: 3 additions & 2 deletions cmd/cloudwatch-ingestion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net/http"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
Expand Down Expand Up @@ -104,8 +103,10 @@ func fetchLogEvents(svc *cloudwatchlogs.Client, logGroupName, logStreamName stri
}

for _, event := range result.Events {
seconds := float64(*event.Timestamp / 1000)
microseconds := float64(*event.Timestamp%1000) * 1000
messages = append(messages, map[string]any{
"date": float64(*event.Timestamp * int64(time.Millisecond)),
"date": seconds + (microseconds / 1e6),
"log": *event.Message,
"log-group": logGroupName,
"log-stream": logStreamWithoutRandom,
Expand Down
4 changes: 2 additions & 2 deletions pkg/observability/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error {
defer o.ActiveBufferWriters.Done()
o.WriteLock.Lock()
defer o.WriteLock.Unlock()
logging.DebugLog(fmt.Errorf("writing buffer to file. Buffer has: %d bytes", n))
// copy first to temporary buffer (storage might have latency)
tempBuf := bytes.NewBuffer(make([]byte, 0, n))
_, err := io.CopyN(tempBuf, o.Buffer, n)
Expand All @@ -28,7 +29,6 @@ func (o *Observability) WriteBufferToStorage(n int64) error {
o.LastFlushed = time.Now()

for _, bufferPosAndPrefix := range mergeBufferPosAndPrefix(prefix) {

now := time.Now()
filename := bufferPosAndPrefix.prefix + "/data-" + strconv.FormatInt(now.Unix(), 10) + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)
err = ensurePath(o.Storage, filename)
Expand Down Expand Up @@ -79,7 +79,7 @@ func (o *Observability) Ingest(data io.ReadCloser) error {
if len(msgs) == 0 {
return nil // no messages to ingest
}
_, err = o.Buffer.Write(encodeMessage(msgs), floatToDate(msgs[0].Date).Format(DATE_PREFIX))
_, err = o.Buffer.Write(encodeMessage(msgs), FloatToDate(msgs[0].Date).Format(DATE_PREFIX))
if err != nil {
return fmt.Errorf("write error: %s", err)
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/observability/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -70,8 +71,16 @@ func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) {
pos = i
}
}
search := r.FormValue("search")
out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, search)
displayTags := strings.Split(r.FormValue("display-tags"), ",")
filterTagsSplit := strings.Split(r.FormValue("filter-tags"), ",")
filterTags := []KeyValue{}
for _, tag := range filterTagsSplit {
kv := strings.Split(tag, "=")
if len(kv) == 2 {
filterTags = append(filterTags, KeyValue{Key: kv[0], Value: kv[1]})
}
}
out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, r.FormValue("search"), displayTags, filterTags)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("get logs error: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/observability/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (o *Observability) returnError(w http.ResponseWriter, err error, statusCode
w.Write([]byte(`{"error": "` + strings.Replace(err.Error(), `"`, `\"`, -1) + `"}`))
}

func floatToDate(datetime float64) time.Time {
func FloatToDate(datetime float64) time.Time {
datetimeInt := int64(datetime)
decimals := datetime - float64(datetimeInt)
nsecs := int64(math.Round(decimals * 1_000_000)) // precision to match golang's time.Time
Expand Down
2 changes: 1 addition & 1 deletion pkg/observability/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
func TestFloatToDate2Way(t *testing.T) {
now := time.Now()
float := DateToFloat(now)
date := floatToDate(float)
date := FloatToDate(float)
if date.Format(TIMESTAMP_FORMAT) != now.Format(TIMESTAMP_FORMAT) {
t.Fatalf("got: %s, expected: %s", date.Format(TIMESTAMP_FORMAT), now.Format(TIMESTAMP_FORMAT))
}
Expand Down
50 changes: 35 additions & 15 deletions pkg/observability/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"time"
)

func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string) (LogEntryResponse, error) {
func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string, displayTags []string, filterTags []KeyValue) (LogEntryResponse, error) {
logEntryResponse := LogEntryResponse{
Enabled: true,
Environments: []string{"dev", "qa", "prod"},
LogEntries: []LogEntry{},
Keys: KeyValueInt{},
Enabled: true,
LogEntries: []LogEntry{},
Tags: KeyValueInt{},
}

keys := make(map[KeyValue]int)
Expand Down Expand Up @@ -58,16 +57,37 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLi
logMessage := decodeMessage(scanner.Bytes())
logline, ok := logMessage.Data["log"]
if ok {
timestamp := floatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute)
timestamp := FloatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute)
if search == "" || strings.Contains(logline, search) {
logEntry := LogEntry{
Timestamp: timestamp.Format(TIMESTAMP_FORMAT),
Data: logline,
tags := []KeyValue{}
for _, tag := range displayTags {
if tagValue, ok := logMessage.Data[tag]; ok {
tags = append(tags, KeyValue{Key: tag, Value: tagValue})
}
}
filterMessage := true
if len(filterTags) == 0 {
filterMessage = false
} else {
for _, filter := range filterTags {
if tagValue, ok := logMessage.Data[filter.Key]; ok {
if tagValue == filter.Value {
filterMessage = false
}
}
}
}
logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry)
for k, v := range logMessage.Data {
if k != "log" {
keys[KeyValue{Key: k, Value: v}] += 1
if !filterMessage {
logEntry := LogEntry{
Timestamp: timestamp.Format(TIMESTAMP_FORMAT),
Data: logline,
Tags: tags,
}
logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry)
for k, v := range logMessage.Data {
if k != "log" {
keys[KeyValue{Key: k, Value: v}] += 1
}
}
}
}
Expand All @@ -84,13 +104,13 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLi
}

for k, v := range keys {
logEntryResponse.Keys = append(logEntryResponse.Keys, KeyValueTotal{
logEntryResponse.Tags = append(logEntryResponse.Tags, KeyValueTotal{
Key: k.Key,
Value: k.Value,
Total: v,
})
}
sort.Sort(logEntryResponse.Keys)
sort.Sort(logEntryResponse.Tags)

return logEntryResponse, nil
}
8 changes: 4 additions & 4 deletions pkg/observability/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ func TestGetLogs(t *testing.T) {
if len(logEntryResponse.LogEntries) != totalMessagesToGenerate {
t.Fatalf("didn't get the same log entries as messaged we generated: got: %d, expected: %d", len(logEntryResponse.LogEntries), totalMessagesToGenerate)
}
if logEntryResponse.LogEntries[0].Timestamp != floatToDate(timestamp).Format(TIMESTAMP_FORMAT) {
t.Fatalf("unexpected timestamp: %s vs %s", logEntryResponse.LogEntries[0].Timestamp, floatToDate(timestamp).Format(TIMESTAMP_FORMAT))
if logEntryResponse.LogEntries[0].Timestamp != FloatToDate(timestamp).Format(TIMESTAMP_FORMAT) {
t.Fatalf("unexpected timestamp: %s vs %s", logEntryResponse.LogEntries[0].Timestamp, FloatToDate(timestamp).Format(TIMESTAMP_FORMAT))
}
}

func TestFloatToDate(t *testing.T) {
for i := 0; i < 10; i++ {
now := time.Now()
floatDate := float64(now.Unix()) + float64(now.Nanosecond())/1e9
floatToDate := floatToDate(floatDate)
floatToDate := FloatToDate(floatDate)
if now.Unix() != floatToDate.Unix() {
t.Fatalf("times are not equal. Got: %v, expected: %v", floatToDate, now)
}
Expand All @@ -82,7 +82,7 @@ func TestFloatToDate(t *testing.T) {

func TestKeyValue(t *testing.T) {
logEntryResponse := LogEntryResponse{
Keys: KeyValueInt{
Tags: KeyValueInt{
{Key: "k", Value: "v", Total: 4},
},
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/observability/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ type BufferPosAndPrefix struct {
}

type LogEntryResponse struct {
Enabled bool `json:"enabled"`
LogEntries []LogEntry `json:"logEntries"`
Environments []string `json:"environments"`
Keys KeyValueInt `json:"keys"`
NextPos int64 `json:"nextPos"`
Enabled bool `json:"enabled"`
LogEntries []LogEntry `json:"logEntries"`
Tags KeyValueInt `json:"tags"`
NextPos int64 `json:"nextPos"`
}

type LogEntry struct {
Timestamp string `json:"timestamp"`
Data string `json:"data"`
Timestamp string `json:"timestamp"`
Data string `json:"data"`
Tags []KeyValue `json:"tags"`
}

type KeyValueInt []KeyValueTotal
Expand All @@ -61,8 +61,8 @@ type KeyValueTotal struct {
Total int
}
type KeyValue struct {
Key string
Value string
Key string `json:"key"`
Value string `json:"value"`
}

func (kv KeyValueInt) MarshalJSON() ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/rest/login/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ func TestAuthenticateMFAWithToken(t *testing.T) {
t.Fatalf("authentication error: %s", err)
}
if !loginResp.Authenticated {
t.Fatalf("expected not to be authenticated")
t.Fatalf("expected to be authenticated")
}
}
Loading

0 comments on commit ab46309

Please sign in to comment.