Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Long poll completion buffer to prevent timeouts #4425

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
queryFirstDecisionTaskCheckInterval = 200 * time.Millisecond
replicationTimeout = 30 * time.Second
contextLockTimeout = 500 * time.Millisecond
longPollCompletionBuffer = 50 * time.Millisecond

// TerminateIfRunningReason reason for terminateIfRunning
TerminateIfRunningReason = "TerminateIfRunning Policy"
Expand Down Expand Up @@ -1005,7 +1006,26 @@ func (e *historyEngineImpl) getMutableStateOrPolling(
if err != nil {
return nil, err
}
timer := time.NewTimer(e.shard.GetConfig().LongPollExpirationInterval(domainName))

expirationInterval := e.shard.GetConfig().LongPollExpirationInterval(domainName)
if deadline, ok := ctx.Deadline(); ok {
remainingTime := deadline.Sub(e.shard.GetTimeSource().Now())
// Here we return a safeguard error, to ensure that older clients are not stuck in long poll loop until context fully expires.
// Otherwise it results in multiple additional requests being made that returns empty responses.
// Newer clients will not make request with too small timeout remaining.
if remainingTime < longPollCompletionBuffer {
return nil, context.DeadlineExceeded
}
// longPollCompletionBuffer is here to leave some room to finish current request without its timeout.
expirationInterval = common.MinDuration(
expirationInterval,
remainingTime-longPollCompletionBuffer,
)
}
if expirationInterval <= 0 {
return response, nil
}
timer := time.NewTimer(expirationInterval)
defer timer.Stop()
for {
select {
Expand All @@ -1026,8 +1046,6 @@ func (e *historyEngineImpl) getMutableStateOrPolling(
}
case <-timer.C:
return response, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
Expand Down