From 77b6a80964c566b350085e6fb3fa9ef7b08c8695 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Tue, 9 Apr 2024 17:27:03 -0700 Subject: [PATCH] fix the async work flag didn't take affect Signed-off-by: Yi Jin --- cmd/thanos/receive.go | 2 ++ pkg/receive/handler.go | 10 ++-------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 031b82abc6..cd890ce71d 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -255,6 +255,8 @@ func runReceive( MaxBackoff: time.Duration(*conf.maxBackoff), TSDBStats: dbs, Limiter: limiter, + + AsyncForwardWorkerCount: conf.asyncForwardWorkerCount, }) grpcProbe := prober.NewGRPC() diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c1e76212fb..7125f9d478 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -142,6 +142,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { if workers == 0 { workers = 1 } + level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers) h := &Handler{ logger: logger, @@ -163,7 +164,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler { }, ), workers, - logger, o.DialOpts...), receiverMode: o.ReceiverMode, Limiter: o.Limiter, @@ -777,8 +777,6 @@ func (h *Handler) distributeTimeseriesToReplicas( var writeDestination = remoteWrites if endpoint == h.options.Endpoint { writeDestination = localWrites - } else { - endpointReplica.replica = 0 } writeableSeries, ok := writeDestination[endpointReplica] if !ok { @@ -1212,7 +1210,7 @@ type peerWorker struct { forwardDelay prometheus.Histogram } -func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, log log.Logger, dialOpts ...grpc.DialOption) peersContainer { +func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer { return &peerGroup{ dialOpts: dialOpts, connections: map[string]*peerWorker{}, @@ -1222,7 +1220,6 @@ func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, as expBackoff: backoff, forwardDelay: forwardDelay, asyncForwardWorkersCount: asyncForwardWorkersCount, - log: log, } } @@ -1266,7 +1263,6 @@ type peerGroup struct { expBackoff backoff.Backoff forwardDelay prometheus.Histogram asyncForwardWorkersCount uint - log log.Logger m sync.RWMutex @@ -1322,8 +1318,6 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt } p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount) - log.With(p.log).Log("msg", "established connection to peer", "peer", addr, - "asyncForwardWorkersCount", p.asyncForwardWorkersCount, "poolSize", p.connections[addr].wp.Size()) return p.connections[addr], nil }