Skip to content

Commit

Permalink
fix the async work flag didn't take affect
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Apr 10, 2024
1 parent bd6169c commit 77b6a80
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func runReceive(
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,

AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
})

grpcProbe := prober.NewGRPC()
Expand Down
10 changes: 2 additions & 8 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
if workers == 0 {
workers = 1
}
level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers)

h := &Handler{
logger: logger,
Expand All @@ -163,7 +164,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
},
),
workers,
logger,
o.DialOpts...),
receiverMode: o.ReceiverMode,
Limiter: o.Limiter,
Expand Down Expand Up @@ -777,8 +777,6 @@ func (h *Handler) distributeTimeseriesToReplicas(
var writeDestination = remoteWrites
if endpoint == h.options.Endpoint {
writeDestination = localWrites
} else {
endpointReplica.replica = 0
}
writeableSeries, ok := writeDestination[endpointReplica]
if !ok {
Expand Down Expand Up @@ -1212,7 +1210,7 @@ type peerWorker struct {
forwardDelay prometheus.Histogram
}

func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, log log.Logger, dialOpts ...grpc.DialOption) peersContainer {
func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
return &peerGroup{
dialOpts: dialOpts,
connections: map[string]*peerWorker{},
Expand All @@ -1222,7 +1220,6 @@ func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, as
expBackoff: backoff,
forwardDelay: forwardDelay,
asyncForwardWorkersCount: asyncForwardWorkersCount,
log: log,
}
}

Expand Down Expand Up @@ -1266,7 +1263,6 @@ type peerGroup struct {
expBackoff backoff.Backoff
forwardDelay prometheus.Histogram
asyncForwardWorkersCount uint
log log.Logger

m sync.RWMutex

Expand Down Expand Up @@ -1322,8 +1318,6 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt
}

p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
log.With(p.log).Log("msg", "established connection to peer", "peer", addr,
"asyncForwardWorkersCount", p.asyncForwardWorkersCount, "poolSize", p.connections[addr].wp.Size())
return p.connections[addr], nil
}

Expand Down

0 comments on commit 77b6a80

Please sign in to comment.