
Commit

Add tooling to easily download chunks from ingesters and parse it (#6523)

* Add tooling to easily download chunks from ingesters and parse it

Signed-off-by: Marco Pracucci <[email protected]>

* Added README

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed linter

Signed-off-by: Marco Pracucci <[email protected]>

* Added missing license

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
pracucci authored Nov 3, 2023
1 parent ce59772 commit 99c0159
Showing 5 changed files with 249 additions and 0 deletions.
9 changes: 9 additions & 0 deletions tools/grpcurl-query-ingesters/README.md
@@ -0,0 +1,9 @@
# What is this?

A simple, hacky shell script and Go tool to download chunks from ingesters and dump their content.

# How to use it

1. Edit `download-chunks-from-ingesters-query.json` with the label matchers and time range to query.
2. Edit `download-chunks-from-ingesters.sh`, filling in the Kubernetes context and namespace, and the Mimir tenant ID to query.
3. Once you've got the dump (one file per ingester), run the Go tool in this directory to print the content of one or more dump files (see the example below).
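
For example, assuming the commands are run from the root of the Mimir repository (the pod name below is hypothetical), the whole workflow looks roughly like this:

    # Download one dump file per ingester into chunks-dump/.
    ./tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh

    # Print the content of one or more dumps.
    go run ./tools/grpcurl-query-ingesters chunks-dump/ingester-zone-a-0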
11 changes: 11 additions & 0 deletions tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json
@@ -0,0 +1,11 @@
{
  "start_timestamp_ms": 1698746364317,
  "end_timestamp_ms": 1698748164317,
  "matchers": [
    {
      "type": 0,
      "name": "__name__",
      "value": "test_series"
    }
  ]
}
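
The `type` field selects the matcher type. My assumption is that it maps to the cortex MatchType enum used by the ingester protobuf API (0 = equals, 1 = not equals, 2 = regex match, 3 = regex non-match); double-check pkg/ingester/client/ingester.proto and the files it imports before relying on it. A sketch of writing a query with a regex matcher, assuming the repository root as working directory:

# Hypothetical query: select all series whose job label matches a regex
# over the same time range as above ("type": 2 assumed to mean regex match).
cat > tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json <<'EOF'
{
  "start_timestamp_ms": 1698746364317,
  "end_timestamp_ms": 1698748164317,
  "matchers": [
    { "type": 2, "name": "job", "value": "prometheus.*" }
  ]
}
EOF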
36 changes: 36 additions & 0 deletions tools/grpcurl-query-ingesters/download-chunks-from-ingesters.sh
@@ -0,0 +1,36 @@
#!/usr/bin/env bash
# SPDX-License-Identifier: AGPL-3.0-only

# Begin of configuration.
K8S_CONTEXT=""
K8S_NAMESPACE=""
MIMIR_TENANT_ID=""
# End of configuration.

mkdir -p chunks-dump/

# Get the list of ingester pods.
PODS=$(kubectl --context "$K8S_CONTEXT" -n "$K8S_NAMESPACE" get pods --no-headers | grep ingester | awk '{print $1}')

for POD in $PODS; do
  echo "Querying $POD"

  # Open port-forward to the ingester.
  kubectl port-forward --context "$K8S_CONTEXT" -n "$K8S_NAMESPACE" "$POD" 9095:9095 &
  KUBECTL_PID=$!

  # Wait some time for the port-forward to be established.
  sleep 5

  cat tools/grpcurl-query-ingesters/download-chunks-from-ingesters-query.json | grpcurl \
    -d @ \
    -H "X-Scope-OrgID: $MIMIR_TENANT_ID" \
    -proto pkg/ingester/client/ingester.proto \
    -import-path . \
    -import-path ./vendor \
    -plaintext \
    localhost:9095 "cortex.Ingester/QueryStream" > "chunks-dump/$POD"

  kill $KUBECTL_PID
  wait $KUBECTL_PID
done
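
If the loop fails, a quick sanity check is to list the ingester's gRPC methods by hand. This is a hedged sketch: it assumes the repository root as working directory and that a port-forward to a single ingester is already open on localhost:9095.

# List the methods of the cortex.Ingester service; QueryStream should be among them.
grpcurl \
  -proto pkg/ingester/client/ingester.proto \
  -import-path . \
  -import-path ./vendor \
  -plaintext \
  localhost:9095 list cortex.Ingester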
88 changes: 88 additions & 0 deletions tools/grpcurl-query-ingesters/main.go
@@ -0,0 +1,88 @@
// SPDX-License-Identifier: AGPL-3.0-only

package main

import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"time"

	"github.com/grafana/dskit/flagext"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	args, err := flagext.ParseFlagsAndArguments(flag.CommandLine)
	if err != nil {
		fmt.Println("Failed to parse CLI arguments:", err.Error())
		os.Exit(1)
	}

	for _, arg := range args {
		res, err := parseFile(arg)
		if err != nil {
			fmt.Println("Failed to parse file:", err.Error())
			os.Exit(1)
		}

		dumpResponse(res)
	}
}

func parseFile(file string) (QueryStreamResponse, error) {
	res := QueryStreamResponse{}

	fileData, err := os.ReadFile(file)
	if err != nil {
		return res, err
	}

	// Decode file.
	decoder := json.NewDecoder(bytes.NewReader(fileData))
	if err := decoder.Decode(&res); err != nil {
		return res, err
	}

	return res, nil
}

func dumpResponse(res QueryStreamResponse) {
	for _, series := range res.Chunkseries {
		fmt.Println(series.LabelSet().String())

		for _, chunk := range series.Chunks {
			fmt.Printf(
				"- Chunk: %s - %s\n",
				chunk.StartTime().Format(time.TimeOnly),
				chunk.EndTime().Format(time.TimeOnly))

			chunkIterator := chunk.EncodedChunk().NewIterator(nil)
			for {
				sampleType := chunkIterator.Scan()
				if sampleType == chunkenc.ValNone {
					break
				}

				switch sampleType {
				case chunkenc.ValFloat:
					fmt.Println(" - Sample:", sampleType.String(), "ts:", chunkIterator.Timestamp(), "value:", chunkIterator.Value().Value)
				case chunkenc.ValHistogram:
					ts, value := chunkIterator.AtHistogram()
					fmt.Println(" - Sample:", sampleType.String(), "ts:", ts, "value:", value)
				case chunkenc.ValFloatHistogram:
					ts, value := chunkIterator.AtFloatHistogram()
					fmt.Println(" - Sample:", sampleType.String(), "ts:", ts, "value:", value)
				default:
					panic(fmt.Errorf("unknown sample type %s", sampleType.String()))
				}
			}

			if chunkIterator.Err() != nil {
				panic(chunkIterator.Err())
			}
		}
	}
}
105 changes: 105 additions & 0 deletions tools/grpcurl-query-ingesters/response.go
@@ -0,0 +1,105 @@
// SPDX-License-Identifier: AGPL-3.0-only

package main

import (
	"encoding/base64"
	"strconv"
	"time"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/mimir/pkg/storage/chunk"
)

type QueryStreamResponse struct {
	Chunkseries []QueryStreamChunkseries `json:"chunkseries"`
}

type QueryStreamChunkseries struct {
	Labels []Label `json:"labels"`
	Chunks []Chunk `json:"chunks"`
}

func (c QueryStreamChunkseries) LabelSet() labels.Labels {
	builder := labels.NewScratchBuilder(len(c.Labels))
	for _, label := range c.Labels {
		builder.Add(label.Name(), label.Value())
	}
	return builder.Labels()
}

type Label struct {
	EncodedName  string `json:"name"`
	EncodedValue string `json:"value"`
}

func (l Label) Name() string {
	name, err := base64.StdEncoding.DecodeString(l.EncodedName)
	if err != nil {
		panic(err)
	}

	return string(name)
}

func (l Label) Value() string {
	value, err := base64.StdEncoding.DecodeString(l.EncodedValue)
	if err != nil {
		panic(err)
	}

	return string(value)
}

type Chunk struct {
	StartTimestampMs string `json:"startTimestampMs"`
	EndTimestampMs   string `json:"endTimestampMs"`
	Encoding         int    `json:"encoding"`
	EncodedData      string `json:"data"`
}

func (c Chunk) StartTimestamp() int64 {
	value, err := strconv.ParseInt(c.StartTimestampMs, 10, 64)
	if err != nil {
		panic(err)
	}

	return value
}

func (c Chunk) EndTimestamp() int64 {
	value, err := strconv.ParseInt(c.EndTimestampMs, 10, 64)
	if err != nil {
		panic(err)
	}

	return value
}

func (c Chunk) StartTime() time.Time {
	return time.UnixMilli(c.StartTimestamp()).UTC()
}

func (c Chunk) EndTime() time.Time {
	return time.UnixMilli(c.EndTimestamp()).UTC()
}

func (c Chunk) EncodedChunk() chunk.EncodedChunk {
	data, err := base64.StdEncoding.DecodeString(c.EncodedData)
	if err != nil {
		panic(err)
	}

	dataChunk, err := chunk.NewForEncoding(chunk.Encoding(c.Encoding))
	if err != nil {
		panic(err)
	}

	err = dataChunk.UnmarshalFromBuf(data)
	if err != nil {
		panic(err)
	}

	return dataChunk
}
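
The decoding above mirrors how grpcurl renders the protobuf response as JSON: bytes fields (the label names and values) come out base64-encoded and 64-bit integer fields (the chunk timestamps) come out as JSON strings, which is why the parser base64-decodes the former and strconv-parses the latter. As a quick manual check, a label name copied out of a dump file can be decoded on the command line (illustrative only; the string below is simply the base64 encoding of __name__):

# Decode a base64-encoded label name taken from a dump file.
echo 'X19uYW1lX18=' | base64 --decode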
