From 99c0159c7e31bf37ec5bad23cc4a2b3d9364b7fb Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 3 Nov 2023 08:30:14 +0100
Subject: [PATCH] Add tooling to easily download chunks from ingesters and parse it (#6523)

* Add tooling to easily download chunks from ingesters and parse it

Signed-off-by: Marco Pracucci

* Added README

Signed-off-by: Marco Pracucci

* Fixed linter

Signed-off-by: Marco Pracucci

* Added missing license

Signed-off-by: Marco Pracucci

---------

Signed-off-by: Marco Pracucci
---
 tools/grpcurl-query-ingesters/README.md       |   9 ++
 .../download-chunks-from-ingesters-query.json |  11 ++
 .../download-chunks-from-ingesters.sh         |  36 ++++++
 tools/grpcurl-query-ingesters/main.go         |  88 +++++++++++++++
 tools/grpcurl-query-ingesters/response.go     | 105 ++++++++++++++++++
 5 files changed, 249 insertions(+)
 create mode 100644 tools/grpcurl-query-ingesters/README.md
 create mode 100644 tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json
 create mode 100644 tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh
 create mode 100644 tools/grpcurl-query-ingesters/main.go
 create mode 100644 tools/grpcurl-query-ingesters/response.go

diff --git a/tools/grpcurl-query-ingesters/README.md b/tools/grpcurl-query-ingesters/README.md
new file mode 100644
index 0000000000..6936f29c76
--- /dev/null
+++ b/tools/grpcurl-query-ingesters/README.md
@@ -0,0 +1,9 @@
+# What is this?
+
+A simple, hacky shell script and Go tool to download chunks from ingesters and dump their content.
+
+# How to use it
+
+1. Edit `download-chunks-from-ingesters-query.json` with the label matchers and time range to query.
+2. Edit `download-chunks-from-ingesters.sh`, setting the Kubernetes context and namespace, and the Mimir tenant ID to query.
+3. Run `download-chunks-from-ingesters.sh` to download the chunks (one dump file per ingester under `chunks-dump/`), then run the Go tool in this directory, passing one or more dump files as arguments, to print their content.
diff --git a/tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json b/tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json
new file mode 100644
index 0000000000..c65791da6f
--- /dev/null
+++ b/tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json
@@ -0,0 +1,11 @@
+{
+  "start_timestamp_ms": 1698746364317,
+  "end_timestamp_ms": 1698748164317,
+  "matchers": [
+    {
+      "type": 0,
+      "name": "__name__",
+      "value": "test_series"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh b/tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh
new file mode 100644
index 0000000000..14bb1b32b8
--- /dev/null
+++ b/tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env bash
+# SPDX-License-Identifier: AGPL-3.0-only
+
+# Beginning of configuration.
+K8S_CONTEXT=""
+K8S_NAMESPACE=""
+MIMIR_TENANT_ID=""
+# End of configuration.
+
+mkdir -p chunks-dump/
+
+# Get list of pods.
+PODS=$(kubectl --context "$K8S_CONTEXT" -n "$K8S_NAMESPACE" get pods --no-headers | grep ingester | awk '{print $1}')
+
+for POD in $PODS; do
+  echo "Querying $POD"
+
+  # Open port-forward to the ingester in the background.
+  kubectl port-forward --context "$K8S_CONTEXT" -n "$K8S_NAMESPACE" "$POD" 9095:9095 &
+  KUBECTL_PID=$!
+
+  # Wait some time for the port-forward to be established.
+  sleep 5
+
+  cat "$(dirname "$0")/download-chunks-from-ingesters-query.json" | grpcurl \
+    -d @ \
+    -H "X-Scope-OrgID: $MIMIR_TENANT_ID" \
+    -proto pkg/ingester/client/ingester.proto \
+    -import-path . \
+    -import-path ./vendor \
+    -plaintext \
+    localhost:9095 "cortex.Ingester/QueryStream" > "chunks-dump/$POD"
+
+  kill $KUBECTL_PID
+  wait $KUBECTL_PID
+done
diff --git a/tools/grpcurl-query-ingesters/main.go b/tools/grpcurl-query-ingesters/main.go
new file mode 100644
index 0000000000..c658428a10
--- /dev/null
+++ b/tools/grpcurl-query-ingesters/main.go
@@ -0,0 +1,88 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/grafana/dskit/flagext"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+)
+
+func main() {
+	args, err := flagext.ParseFlagsAndArguments(flag.CommandLine)
+	if err != nil {
+		fmt.Println("Failed to parse CLI arguments:", err.Error())
+		os.Exit(1)
+	}
+
+	for _, arg := range args {
+		res, err := parseFile(arg)
+		if err != nil {
+			fmt.Println("Failed to parse file:", err.Error())
+			os.Exit(1)
+		}
+
+		dumpResponse(res)
+	}
+}
+
+func parseFile(file string) (QueryStreamResponse, error) {
+	res := QueryStreamResponse{}
+
+	fileData, err := os.ReadFile(file)
+	if err != nil {
+		return res, err
+	}
+
+	// Decode file.
+	decoder := json.NewDecoder(bytes.NewReader(fileData))
+	if err := decoder.Decode(&res); err != nil {
+		return res, err
+	}
+
+	return res, nil
+}
+
+func dumpResponse(res QueryStreamResponse) {
+	for _, series := range res.Chunkseries {
+		fmt.Println(series.LabelSet().String())
+
+		for _, chunk := range series.Chunks {
+			fmt.Printf(
+				"- Chunk: %s - %s\n",
+				chunk.StartTime().Format(time.TimeOnly),
+				chunk.EndTime().Format(time.TimeOnly))
+
+			chunkIterator := chunk.EncodedChunk().NewIterator(nil)
+			for {
+				sampleType := chunkIterator.Scan()
+				if sampleType == chunkenc.ValNone {
+					break
+				}
+
+				switch sampleType {
+				case chunkenc.ValFloat:
+					fmt.Println(" - Sample:", sampleType.String(), "ts:", chunkIterator.Timestamp(), "value:", chunkIterator.Value().Value)
+				case chunkenc.ValHistogram:
+					ts, value := chunkIterator.AtHistogram()
+					fmt.Println(" - Sample:", sampleType.String(), "ts:", ts, "value:", value)
+				case chunkenc.ValFloatHistogram:
+					ts, value := chunkIterator.AtFloatHistogram()
+					fmt.Println(" - Sample:", sampleType.String(), "ts:", ts, "value:", value)
+				default:
+					panic(fmt.Errorf("unknown sample type %s", sampleType.String()))
+				}
+			}
+
+			if chunkIterator.Err() != nil {
+				panic(chunkIterator.Err())
+			}
+		}
+	}
+}
diff --git a/tools/grpcurl-query-ingesters/response.go b/tools/grpcurl-query-ingesters/response.go
new file mode 100644
index 0000000000..78ccb325b4
--- /dev/null
+++ b/tools/grpcurl-query-ingesters/response.go
@@ -0,0 +1,105 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package main
+
+import (
+	"encoding/base64"
+	"strconv"
+	"time"
+
+	"github.com/prometheus/prometheus/model/labels"
+
+	"github.com/grafana/mimir/pkg/storage/chunk"
+)
+
+type QueryStreamResponse struct {
+	Chunkseries []QueryStreamChunkseries `json:"chunkseries"`
+}
+
+type QueryStreamChunkseries struct {
+	Labels []Label `json:"labels"`
+	Chunks []Chunk `json:"chunks"`
+}
+
+func (c QueryStreamChunkseries) LabelSet() labels.Labels {
+	builder := labels.NewScratchBuilder(len(c.Labels))
+	for _, label := range c.Labels {
+		builder.Add(label.Name(), label.Value())
+	}
+	return builder.Labels()
+}
+
+type Label struct {
+	EncodedName  string `json:"name"`
+	EncodedValue string `json:"value"`
+}
+
+func (l Label) Name() string {
+	name, err := base64.StdEncoding.DecodeString(l.EncodedName)
+	if err != nil {
+		panic(err)
+	}
+
+	return string(name)
+}
+
+func (l Label) Value() string {
+	value, err := base64.StdEncoding.DecodeString(l.EncodedValue)
+	if err != nil {
+		panic(err)
+	}
+
+	return string(value)
+}
+
+type Chunk struct {
+	StartTimestampMs string `json:"startTimestampMs"`
+	EndTimestampMs   string `json:"endTimestampMs"`
+	Encoding         int    `json:"encoding"`
+	EncodedData      string `json:"data"`
+}
+
+func (c Chunk) StartTimestamp() int64 {
+	value, err := strconv.ParseInt(c.StartTimestampMs, 10, 64)
+	if err != nil {
+		panic(err)
+	}
+
+	return value
+}
+
+func (c Chunk) EndTimestamp() int64 {
+	value, err := strconv.ParseInt(c.EndTimestampMs, 10, 64)
+	if err != nil {
+		panic(err)
+	}
+
+	return value
+}
+
+func (c Chunk) StartTime() time.Time {
+	return time.UnixMilli(c.StartTimestamp()).UTC()
+}
+
+func (c Chunk) EndTime() time.Time {
+	return time.UnixMilli(c.EndTimestamp()).UTC()
+}
+
+func (c Chunk) EncodedChunk() chunk.EncodedChunk {
+	data, err := base64.StdEncoding.DecodeString(c.EncodedData)
+	if err != nil {
+		panic(err)
+	}
+
+	dataChunk, err := chunk.NewForEncoding(chunk.Encoding(c.Encoding))
+	if err != nil {
+		panic(err)
+	}
+
+	err = dataChunk.UnmarshalFromBuf(data)
+	if err != nil {
+		panic(err)
+	}
+
+	return dataChunk
+}
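
A minimal end-to-end usage sketch of the tooling added above, assuming the commands are run from the Mimir repository root (so that the relative proto paths in the script resolve) and that kubectl, grpcurl and Go are installed; the pod/dump name `ingester-0` in the last command is a placeholder:

  # 1. Set the time range and label matchers to query.
  $EDITOR tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json

  # 2. Fill in K8S_CONTEXT, K8S_NAMESPACE and MIMIR_TENANT_ID.
  $EDITOR tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh

  # 3. Download one dump file per ingester into chunks-dump/.
  ./tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh

  # 4. Parse one or more dumps and print their content.
  go run ./tools/grpcurl-query-ingesters chunks-dump/ingester-0

The Go tool accepts any number of dump files as arguments and prints, for each series, its label set followed by each chunk's time range and decoded samples.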