Skip to content

Commit

Permalink
[PLAT-118505] fix receive write metrics (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnyi authored Sep 17, 2024
2 parents 00fc9e6 + 8864c3c commit 1a3b559
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
12 changes: 6 additions & 6 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,11 +711,11 @@ type remoteWriteParams struct {
alreadyReplicated bool
}

func (h *Handler) gatherWriteStats(localWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
func (h *Handler) gatherWriteStats(remoteWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
var stats tenantRequestStats = make(tenantRequestStats)

for er := range localWrites {
for tenant, series := range localWrites[er] {
for er := range remoteWrites {
for tenant, series := range remoteWrites[er] {
samples := 0

for _, ts := range series.timeSeries {
Expand Down Expand Up @@ -743,7 +743,6 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
ctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), ctx), h.options.ForwardTimeout)

var writeErrors writeErrors
var stats tenantRequestStats = make(tenantRequestStats)

defer func() {
if writeErrors.ErrOrNil() != nil {
Expand All @@ -763,10 +762,11 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries)
if err != nil {
level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err)
return stats, err
return tenantRequestStats{}, err
}

stats = h.gatherWriteStats(localWrites)
// Specific to Databricks setup, we only measure remote writes
stats := h.gatherWriteStats(remoteWrites)

// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
// asynchronously and with this capacity we will never block on writing to the channel.
Expand Down
29 changes: 22 additions & 7 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ type dnsResolver struct {
logger log.Logger
target resolver.Target
cc resolver.ClientConn
addrStore map[string][]string
addrStore *dnsResolverBuilder
}

func (r *dnsResolver) start() {
addrStrs := r.addrStore[r.target.Endpoint()]
addrStrs := r.addrStore.get(r.target.Endpoint())
addrs := make([]resolver.Address, len(addrStrs))
for i, s := range addrStrs {
addrs[i] = resolver.Address{Addr: s}
Expand All @@ -88,14 +88,28 @@ func (*dnsResolver) Close() {}
// dnsResolverBuilder is a test resolver builder that hands out dnsResolver
// instances backed by an in-memory address table.
type dnsResolverBuilder struct {
logger log.Logger
// addrStore maps a target endpoint to the list of backend addresses that
// resolvers should report for it. Access goes through get/updateStore.
addrStore map[string][]string

// Embedded mutex; get and updateStore hold it while touching addrStore.
sync.Mutex
}

// get looks up the addresses currently stored for addr while holding the
// builder's lock. It returns nil when no entry exists for addr.
func (b *dnsResolverBuilder) get(addr string) []string {
	b.Lock()
	ips := b.addrStore[addr]
	b.Unlock()

	return ips
}

// updateStore replaces the addresses stored for addr with ips while holding
// the builder's lock.
//
// The backing map is allocated on first use, so a builder created without an
// explicit addrStore does not panic on its first update.
func (b *dnsResolverBuilder) updateStore(addr string, ips []string) {
	b.Lock()
	defer b.Unlock()

	// Assigning into a nil map panics; initialize it lazily under the lock.
	if b.addrStore == nil {
		b.addrStore = make(map[string][]string)
	}
	b.addrStore[addr] = ips
}

func (b *dnsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &dnsResolver{
logger: b.logger,
target: target,
cc: cc,
addrStore: b.addrStore,
addrStore: b,
}
r.start()
return r, nil
Expand Down Expand Up @@ -1841,8 +1855,6 @@ func TestHandlerFlippingHashrings(t *testing.T) {
}

func TestIngestorRestart(t *testing.T) {
// TODO: fix this test. It has a data race.
t.Skip("Skipping this test case temporarily due to a data race")
var err error
logger := log.NewLogfmtLogger(os.Stderr)
addr1, addr2, addr3 := "localhost:14090", "localhost:14091", "localhost:14092"
Expand Down Expand Up @@ -1884,15 +1896,18 @@ func TestIngestorRestart(t *testing.T) {
},
}

_, err = client.handleRequest(ctx, 0, "test", data)
stats, err := client.handleRequest(ctx, 0, "test", data)
require.NoError(t, err)
require.Equal(t, tenantRequestStats{
"test": requestStats{timeseries: 2, totalSamples: 2},
}, stats)

// close srv2 to simulate ingestor down
ing2.Shutdown(err)
ing3 := startIngestor(logger, addr3, 2*time.Second)
defer ing3.Shutdown(err)
// bind the new backend to the same DNS
dnsBuilder.addrStore[clientAddr] = []string{addr3}
dnsBuilder.updateStore(clientAddr, []string{addr3})

iter, errs := 10, 0
for i := 0; i < iter; i++ {
Expand Down

0 comments on commit 1a3b559

Please sign in to comment.