Receive: refactor handler for improved readability and organization #6898

Merged
merged 47 commits into main from receive-write-refactor on Jan 17, 2024

Changes from 1 commit

Commits (47)
0edff03
[wip] First checkpoint
douglascamata Nov 14, 2023
5837f0f
[wip] Second checkpoint
douglascamata Nov 14, 2023
e74a01c
Small random refactors
douglascamata Nov 15, 2023
219b203
Add some useful trace tags
douglascamata Nov 15, 2023
023c776
Concurrent and traced local writes
douglascamata Nov 15, 2023
99f6afb
Improve variable names in remote writes
douglascamata Nov 15, 2023
1a969ee
Rename `newFanoutForward` function
douglascamata Nov 15, 2023
b8bb340
More refactors
douglascamata Nov 15, 2023
9af212f
Fix linting issue
douglascamata Nov 15, 2023
955d859
Add a quorum test with sloppy quorum
douglascamata Nov 16, 2023
4a462e6
[wip] Try to make retries work
douglascamata Nov 16, 2023
f622a97
[wip] Checkpoint: wait group still hanging
douglascamata Nov 17, 2023
a8e1175
Some refactors
douglascamata Nov 21, 2023
91ce0d6
Add some commented code so I don't lose it
douglascamata Nov 21, 2023
d49b8b1
Adapt tests
douglascamata Nov 21, 2023
85cf490
Remove sloppy quorum code
douglascamata Nov 21, 2023
3976c16
Move some code around
douglascamata Nov 21, 2023
3640a7e
Remove even more leftover of sloppy quorum
douglascamata Nov 22, 2023
ba5c92b
Extract a type to hold function params
douglascamata Nov 22, 2023
5c74447
Remove unused struct field
douglascamata Nov 22, 2023
10af70a
Remove useless variable
douglascamata Nov 22, 2023
b1934bd
Merge branch 'main' of github.com:thanos-io/thanos into receive-write…
douglascamata Nov 22, 2023
504dd58
Remove type that wasn't used enough
douglascamata Nov 22, 2023
7e131da
Delete function to tighten up max buffered responses
douglascamata Nov 22, 2023
43fe873
Add comments to some functions
douglascamata Nov 22, 2023
5b9f1ff
Fix peer up check
douglascamata Nov 22, 2023
3690492
Fix size of replication tracking slices
douglascamata Nov 22, 2023
a641dde
Rename context
douglascamata Nov 22, 2023
e3a0c01
Don't do local writes concurrently
douglascamata Nov 22, 2023
35ba1f9
Remove extra error logging
douglascamata Nov 22, 2023
5fcf772
Merge branch 'main' into receive-write-refactor
douglascamata Dec 13, 2023
faf449f
Fix syntax after merge
douglascamata Dec 13, 2023
7406300
Merge branch 'main' of github.com:thanos-io/thanos into receive-write…
douglascamata Jan 8, 2024
fab1c8e
Add missing methods to peersContainer
douglascamata Jan 8, 2024
0bdf584
Fix handler test
douglascamata Jan 9, 2024
0cab6ce
Merge branch 'main' of github.com:thanos-io/thanos into receive-write…
douglascamata Jan 9, 2024
ae9af0c
Reset peers state on hashring changes
douglascamata Jan 10, 2024
d389410
Handle PR comment regarding waitgroup
douglascamata Jan 10, 2024
2bd869b
Set span tags to help debug
douglascamata Jan 10, 2024
ead8120
Fix concurrency issue
douglascamata Jan 11, 2024
af890dc
Fix request ID middleware
douglascamata Jan 11, 2024
a3163ba
Fix `distributeTimeseriesToReplicas` comment
douglascamata Jan 16, 2024
b04b7e5
Extract var with 1-indexed replication index
douglascamata Jan 16, 2024
d1b88dd
Rename methods in peersContainer interface
douglascamata Jan 16, 2024
ffc3060
Make peerGroup `getConnection` check if peers are up
douglascamata Jan 16, 2024
f9d4c6e
Remove yet one more not useful log
douglascamata Jan 16, 2024
fd4405f
Remove logger from `h.sendWrites`
douglascamata Jan 16, 2024
[wip] First checkpoint
Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>
douglascamata committed Nov 14, 2023
commit 0edff03badca5ea362d25157cb9190c8be6ab3d0
226 changes: 213 additions & 13 deletions pkg/receive/handler.go
@@ -587,30 +587,231 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
}
}

wreqs := make(map[endpointReplica]trackedSeries)
for tsID, ts := range wreq.Timeseries {
wreqs, _, err := h.distributeTimeseriesToReplicas(tenant, h.hashring, replicas, 0, wreq.Timeseries)
if err != nil {
h.mtx.RUnlock()
return err
}
h.mtx.RUnlock()

return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), replicas, r.replicated)
}

type distributedSeries map[endpointReplica]trackedSeries

// distributeTimeseriesToReplicas distributes the given timeseries from the tenant to different endpoints in a manner
// that achieves the replication factor indicated by replicas. It is possible to offset the hashing algorithm by using
// the hashOffset parameter, which can be used to distribute the same series differently.
func (h *Handler) distributeTimeseriesToReplicas(tenant string, hashring Hashring, replicas []uint64, hashOffset uint64, timeseries []prompb.TimeSeries) (distributedSeries, distributedSeries, error) {
remoteWrites := make(distributedSeries)
localWrites := make(distributedSeries)
for tsIndex, ts := range timeseries {
for _, rn := range replicas {
endpoint, err := h.hashring.GetN(tenant, &ts, rn)
endpoint, err := hashring.GetN(tenant, &ts, rn+hashOffset)
if err != nil {
h.mtx.RUnlock()
return err
return nil, nil, err
}
key := endpointReplica{endpoint: endpoint, replica: rn}
writeTarget, ok := wreqs[key]
var writeDestination = remoteWrites
if endpoint == h.options.Endpoint {
writeDestination = localWrites
}
writeTarget, ok := writeDestination[key]
if !ok {
writeTarget = trackedSeries{
seriesIDs: make([]int, 0),
timeSeries: make([]prompb.TimeSeries, 0),
}
}
writeTarget.timeSeries = append(wreqs[key].timeSeries, ts)
writeTarget.seriesIDs = append(wreqs[key].seriesIDs, tsID)
wreqs[key] = writeTarget
writeTarget.timeSeries = append(remoteWrites[key].timeSeries, ts)
writeTarget.seriesIDs = append(remoteWrites[key].seriesIDs, tsIndex)
remoteWrites[key] = writeTarget
}
}
h.mtx.RUnlock()
return localWrites, remoteWrites, nil
}

func (h *Handler) newFanoutForward(pctx context.Context, tenant string, writeRequest *prompb.WriteRequest, replicas []uint64, alreadyReplicated bool) error {
var writeErrors writeErrors

// Create a new context for this function with a custom timeout.
ctx, cancel := context.WithTimeout(pctx, h.options.ForwardTimeout)

defer func() {
if writeErrors.ErrOrNil() != nil {
// NOTICE: The cancel function is not used on all paths intentionally,
// if there is no error when quorum is reached,
// let forward requests to optimistically run until timeout.
cancel()
}
}()

tLogger := log.With(h.logger, "tenant", tenant)
if id, ok := middleware.RequestIDFromContext(pctx); ok {
tLogger = log.With(tLogger, "request-id", id)
}

localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(tenant, h.hashring, replicas, 0, writeRequest.Timeseries)
if err != nil {
return err
}
maxBufferedResponses := len(remoteWrites) + len(localWrites)
finalResponses := make(chan writeResponse, maxBufferedResponses)

// Do the writes to the local node first. This should be easy and fast.
for endpointReplica := range localWrites {
var err error
tracing.DoInSpan(ctx, "receive_tsdb_write", func(_ context.Context) {
err = h.writer.Write(ctx, tenant, &prompb.WriteRequest{
Timeseries: localWrites[endpointReplica].timeSeries,
})
})
if err != nil {
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
finalResponses <- newWriteResponse(localWrites[endpointReplica].seriesIDs, err)
continue
}
finalResponses <- newWriteResponse(localWrites[endpointReplica].seriesIDs, nil)
}

return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated)
// Do the writes to remote nodes. Run them all in parallel.
var wg sync.WaitGroup
wg.Add(len(remoteWrites))
for er := range remoteWrites {
go func(endpointReplica endpointReplica, respChan chan<- writeResponse) {
defer wg.Done()
err := h.sendRemoteWrite(ctx, tenant, endpointReplica, remoteWrites[endpointReplica].timeSeries)
if err != nil {
finalResponses <- newWriteResponse(remoteWrites[endpointReplica].seriesIDs, err)

h.forwardRequests.WithLabelValues(labelError).Inc()
if !alreadyReplicated {
h.replications.WithLabelValues(labelError).Inc()
}
return
}
h.forwardRequests.WithLabelValues(labelSuccess).Inc()
if !alreadyReplicated {
h.replications.WithLabelValues(labelSuccess).Inc()
}
}(er, finalResponses)
}

go func() {
wg.Wait()
close(finalResponses)
}()

// At the end, make sure to exhaust the channel, letting remaining unnecessary requests finish asynchronously.
// This is needed if the context is canceled or if we reached the success or failure quorum faster.
defer func() {
go func() {
for resp := range finalResponses {
if resp.err != nil {
// TODO: log something
}
}
}()
}()

quorum := h.writeQuorum()
if alreadyReplicated {
quorum = 1
}
successes := make([]int, len(writeRequest.Timeseries))
seriesErrs := newReplicationErrors(quorum, len(writeRequest.Timeseries))
for {
select {
case <-ctx.Done():
return ctx.Err()
case resp, hasMore := <-finalResponses:
// At the end, aggregate all errors if there are any and return them.
if !hasMore {
for _, err := range seriesErrs {
writeErrors.Add(err)
}
return writeErrors.ErrOrNil()
}

// Track errors and successes on a per-series basis.
if resp.err != nil {
for _, seriesID := range resp.seriesIDs {
seriesErrs[seriesID].Add(resp.err)
}
continue
}
for _, seriesID := range resp.seriesIDs {
successes[seriesID]++
}

if quorumReached(successes, quorum) {
return nil
}
}
}
}

func (h *Handler) sendRemoteWrite(ctx context.Context, tenant string, endpointReplica endpointReplica, timeSeries []prompb.TimeSeries) error {
endpoint := endpointReplica.endpoint
cl, err := h.peers.get(ctx, endpoint)
if err != nil {
return err
}
if peerHealthy := h.isPeerHealthy(endpoint); !peerHealthy {
return errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)
}

span, spanCtx := tracing.StartSpan(ctx, "receive_forward")
// Actually make the request against the endpoint we determined should handle these time series.
_, err = cl.RemoteWrite(spanCtx, &storepb.WriteRequest{
Timeseries: timeSeries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(endpointReplica.replica + 1),
})
span.Finish()
if err != nil {
// Check if peer connection is unavailable, update the peer state to avoid spamming that peer.
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unavailable {
h.markUnhealthyPeer(endpoint)
}
}
return err
}
h.markHealthyPeer(endpoint)
return nil
}

func (h *Handler) markUnhealthyPeer(endpoint string) {
h.mtx.Lock()

state, ok := h.peerStates[endpoint]
if !ok {
state = &retryState{attempt: -1}
}
state.attempt++
state.nextAllowed = time.Now().Add(h.expBackoff.ForAttempt(state.attempt))
h.peerStates[endpoint] = state

h.mtx.Unlock()
}

func (h *Handler) markHealthyPeer(endpoint string) {
h.mtx.Lock()
delete(h.peerStates, endpoint)
h.mtx.Unlock()
}

func (h *Handler) isPeerHealthy(endpoint string) bool {
h.mtx.RLock()
defer h.mtx.RUnlock()
state, ok := h.peerStates[endpoint]
// No state means the peer wasn't reported as unhealthy yet.
if !ok {
return true
}
return time.Now().Before(state.nextAllowed)
}

// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
@@ -630,7 +831,7 @@ func quorumReached(successes []int, successThreshold int) bool {

// fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of
// requests succeeds or fails or if context is canceled.
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error {
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, replicas []uint64, seriesReplicated bool) error {
var errs writeErrors

fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
@@ -768,7 +969,6 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
}
werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint)
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr)
return
}
h.mtx.Lock()
delete(h.peerStates, writeTarget.endpoint)
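
For readers skimming the diff, here is a minimal, self-contained Go sketch of the two ideas this commit centers on: grouping series into (endpoint, replica) buckets (what `distributeTimeseriesToReplicas` does via the hashring) and counting per-series successes against a write quorum (what the response loop in `newFanoutForward` does). This is not the handler's actual code: the hashring is replaced by a toy modulo placement, and `distribute`, the simplified `trackedSeries` fields, and the example values are illustrative assumptions only.

```go
package main

import "fmt"

// endpointReplica and trackedSeries loosely mirror the shapes used in the handler diff above.
type endpointReplica struct {
	endpoint string
	replica  uint64
}

type trackedSeries struct {
	seriesIDs []int
	series    []string // stands in for []prompb.TimeSeries
}

// distribute groups every series, once per replica, into an (endpoint, replica)
// bucket. The real code asks the hashring (GetN) where each replica of each
// series belongs; here a toy modulo placement is used instead.
func distribute(endpoints []string, series []string, replicationFactor uint64) map[endpointReplica]trackedSeries {
	out := make(map[endpointReplica]trackedSeries)
	for i, s := range series {
		for rn := uint64(0); rn < replicationFactor; rn++ {
			ep := endpoints[(uint64(i)+rn)%uint64(len(endpoints))]
			key := endpointReplica{endpoint: ep, replica: rn}
			bucket := out[key]
			bucket.seriesIDs = append(bucket.seriesIDs, i)
			bucket.series = append(bucket.series, s)
			out[key] = bucket
		}
	}
	return out
}

// quorumReached reports whether every series has collected at least quorum
// successful writes, the same per-series check the fan-out loop performs.
func quorumReached(successes []int, quorum int) bool {
	for _, got := range successes {
		if got < quorum {
			return false
		}
	}
	return true
}

func main() {
	endpoints := []string{"receive-0", "receive-1", "receive-2"}
	series := []string{`up{instance="a"}`, `up{instance="b"}`}
	replicationFactor := uint64(3)
	quorum := int(replicationFactor)/2 + 1 // 3 replicas -> quorum of 2

	// Pretend every (endpoint, replica) write succeeded and count successes per series.
	successes := make([]int, len(series))
	for _, bucket := range distribute(endpoints, series, replicationFactor) {
		for _, id := range bucket.seriesIDs {
			successes[id]++
		}
	}
	fmt.Println("quorum reached:", quorumReached(successes, quorum))
}
```

The sketch prints "quorum reached: true" when every series reaches the quorum; the actual handler additionally cancels outstanding forwards once the quorum outcome is known and backs off peers it has marked unhealthy.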