Skip to content

Commit

Permalink
Prototype of a rate-limiter intended to favor workflows getting histo…
Browse files Browse the repository at this point in the history
…ry over polling for new workflows.

This is to address an explosion of GetWorkflowExecutionHistory requests in one of our internal domains.
"Explosion" to the tune of: normally a couple hundred per second, but during this issue we saw up to ~100,000/s.

A larger description will come after I get some more sleep, but the quick and dirty summary is:
- they had many "live" workflows
- they started to build up a decision-schedule queue
- slowing them down
- overloading caches, causing a lot of un-cached decisions
- ... leading to a lot of history iterators in new workflows looping, trying to load history, and getting ratelimited...
- ... causing more to loop and try to load history...
- ... slowing things down further and making it worse.

Decision tasks were regularly >10 minutes, just trying to load history.

So this is an attempt to prevent that from happening.
It's not yet complete, just contains the limiter I'm planning, and tests.
  • Loading branch information
Groxx committed Mar 1, 2023
1 parent e5063a1 commit cd3d6f4
Show file tree
Hide file tree
Showing 8 changed files with 673 additions and 35 deletions.
42 changes: 21 additions & 21 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,13 @@ func TestBlockingSelect(t *testing.T) {
Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")
history = append(history, "add-one-stopped")

})
Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
history = append(history, "add-two-stopped")
})

s := NewSelector(ctx)
Expand All @@ -298,21 +298,21 @@ func TestBlockingSelect(t *testing.T) {
s.Select(ctx)
history = append(history, "select2")
s.Select(ctx)
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone(), strings.Join(history, "\n"))

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-one-stopped",
"add-two",
"c1-one",
"select2",
"c2-two",
"done",
"add-two-done",
"stopped",
"add-two-stopped",
}
require.EqualValues(t, expected, history)
}
Expand All @@ -339,7 +339,7 @@ func TestBlockingSelectAsyncSend(t *testing.T) {
history = append(history, fmt.Sprintf("select-%v", ii))
s.Select(ctx)
}
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone(), strings.Join(history, "\n"))
Expand All @@ -354,7 +354,7 @@ func TestBlockingSelectAsyncSend(t *testing.T) {
"select-2",
"add-2",
"c1-2",
"done",
"stopped",
}
require.EqualValues(t, expected, history)
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestBlockingSelectAsyncSend2(t *testing.T) {
c1.SendAsync("s1")
history = append(history, "select-1")
s.Select(ctx)
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone(), strings.Join(history, "\n"))
Expand All @@ -441,7 +441,7 @@ func TestBlockingSelectAsyncSend2(t *testing.T) {
"send-s1",
"select-1",
"c1-s1",
"done",
"stopped",
}
require.EqualValues(t, expected, history)
}
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestSendSelect(t *testing.T) {
s.Select(ctx)
history = append(history, "select2")
s.Select(ctx)
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone())
Expand All @@ -481,7 +481,7 @@ func TestSendSelect(t *testing.T) {
"send2",
"select2",
"send1",
"done",
"stopped",
"c1-one",
}
require.EqualValues(t, expected, history)
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestSendSelectWithAsyncReceive(t *testing.T) {
s.Select(ctx)
history = append(history, "select2")
s.Select(ctx)
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone(), strings.Join(history, "\n"))
Expand All @@ -523,7 +523,7 @@ func TestSendSelectWithAsyncReceive(t *testing.T) {
"send2",
"select2",
"send1",
"done",
"stopped",
"c1-one",
}
require.EqualValues(t, expected, history)
Expand All @@ -533,7 +533,7 @@ func TestChannelClose(t *testing.T) {
var history []string
d, _ := newDispatcher(createRootTestContext(t), func(ctx Context) {
jobs := NewBufferedChannel(ctx, 5)
done := NewNamedChannel(ctx, "done")
done := NewNamedChannel(ctx, "stopped")

GoNamed(ctx, "receiver", func(ctx Context) {
for {
Expand All @@ -555,7 +555,7 @@ func TestChannelClose(t *testing.T) {
jobs.Close()
history = append(history, "sent all jobs")
done.Receive(ctx, nil)
history = append(history, "done")
history = append(history, "stopped")

})
require.EqualValues(t, 0, len(history))
Expand All @@ -571,7 +571,7 @@ func TestChannelClose(t *testing.T) {
"received job 2",
"received job 3",
"received all jobs",
"done",
"stopped",
}
require.EqualValues(t, expected, history)
}
Expand Down Expand Up @@ -987,7 +987,7 @@ func TestSelectFuture(t *testing.T) {
s.Select(ctx)
history = append(history, "select2")
s.Select(ctx)
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone())
Expand All @@ -999,7 +999,7 @@ func TestSelectFuture(t *testing.T) {
"c1-one",
"select2",
"c2-two",
"done",
"stopped",
}
require.EqualValues(t, expected, history)
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func TestSelectDecodeFuture(t *testing.T) {
s.Select(ctx)
history = append(history, "select2")
s.Select(ctx)
history = append(history, "done")
history = append(history, "stopped")
})
require.NoError(t, d.ExecuteUntilAllBlocked())
require.True(t, d.IsDone())
Expand All @@ -1048,7 +1048,7 @@ func TestSelectDecodeFuture(t *testing.T) {
"c1-one",
"select2",
"c2-two",
"done",
"stopped",
}
require.EqualValues(t, expected, history)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,13 +526,13 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_2() {
task = createQueryTask(testEvents[0:8], 8, "HelloWorld_Workflow", queryType)
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, t.registry)
response, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
t.verifyQueryResult(response, "done")
t.verifyQueryResult(response, "stopped")

// query after second decision task with extra events
task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", queryType)
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, t.registry)
response, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
t.verifyQueryResult(response, "done")
t.verifyQueryResult(response, "stopped")

task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", "invalid-query-type")
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, t.registry)
Expand Down
6 changes: 5 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"go.uber.org/cadence/internal/common/backoff"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/cadence/internal/common/util"
"go.uber.org/cadence/internal/pahlimiter"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -137,6 +138,8 @@ type (
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket

pollAndHistoryLimiter pahlimiter.PollAndHistoryLimiter
}

polledTask struct {
Expand Down Expand Up @@ -288,8 +291,9 @@ func (bw *baseWorker) pollTask() {
}
}

bw.retrier.Throttle()
bw.retrier.Throttle() // sleeps if retry policy determines it should sleep after failures
if bw.pollLimiter == nil || bw.pollLimiter.Wait(bw.limiterContext) == nil {
// TODO: block here on a shared semaphore with history-loading?
task, err = bw.options.taskWorker.PollTask()
if err != nil && enableVerboseLogging {
bw.logger.Debug("Failed to poll for task.", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
signalCh = "signal-chan"

startingQueryValue = ""
finishedQueryValue = "done"
finishedQueryValue = "stopped"
queryErr = "error handling query"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() {
if err := ExecuteActivity(ctx, activityFn).Get(ctx, nil); err != nil {
return err
}
queryResult = "done"
queryResult = "stopped"
return nil
}

Expand Down
18 changes: 9 additions & 9 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowUnknownName() {

func (s *WorkflowTestSuiteUnitTest) Test_QueryWorkflow() {
queryType := "state"
stateWaitSignal, stateWaitActivity, stateDone := "wait for signal", "wait for activity", "done"
stateWaitSignal, stateWaitActivity, stateDone := "wait for signal", "wait for activity", "stopped"
workflowFn := func(ctx Context) error {
var state string
err := SetQueryHandler(ctx, queryType, func(queryInput string) (string, error) {
Expand Down Expand Up @@ -2199,7 +2199,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_Channel() {
processedCount++
runningCount++
Go(ctx, func(ctx Context) {
doneCh.SendAsync("done")
doneCh.SendAsync("stopped")
runningCount--
})
}
Expand Down Expand Up @@ -2230,7 +2230,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ContextMisuse() {

Go(ctx, func(shouldUseThisCtx Context) {
Sleep(ctx, time.Hour)
ch.Send(ctx, "done")
ch.Send(ctx, "stopped")
})

var done string
Expand Down Expand Up @@ -2294,7 +2294,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetry() {
if info.Attempt < 2 {
return "", NewCustomError("bad-luck")
}
return "retry-done", nil
return "retry-stopped", nil
}

workflowFn := func(ctx Context) (string, error) {
Expand Down Expand Up @@ -2339,7 +2339,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetry() {
s.NoError(env.GetWorkflowError())
var result string
s.NoError(env.GetWorkflowResult(&result))
s.Equal("retry-done", result)
s.Equal("retry-stopped", result)
s.Equal(1, attempt1Count)
s.Equal(3, attempt2Count)
}
Expand Down Expand Up @@ -2416,7 +2416,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {
if info.Attempt < 2 {
return "", NewCustomError("bad-luck")
}
return "retry-done", nil
return "retry-stopped", nil
}

workflowFn := func(ctx Context) (string, error) {
Expand Down Expand Up @@ -2454,7 +2454,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {
s.NoError(env.GetWorkflowError())
var result string
s.NoError(env.GetWorkflowResult(&result))
s.Equal("retry-done", result)
s.Equal("retry-stopped", result)
s.Equal(1, nonretriableCount)
s.Equal(3, retriableCount)
}
Expand Down Expand Up @@ -2544,7 +2544,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowRetry() {
if info.Attempt < 2 {
return "", NewCustomError("bad-luck")
}
return "retry-done", nil
return "retry-stopped", nil
}

workflowFn := func(ctx Context) (string, error) {
Expand Down Expand Up @@ -2578,7 +2578,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowRetry() {
s.NoError(env.GetWorkflowError())
var result string
s.NoError(env.GetWorkflowResult(&result))
s.Equal("retry-done", result)
s.Equal("retry-stopped", result)
}

func (s *WorkflowTestSuiteUnitTest) Test_SignalChildWorkflowRetry() {
Expand Down
Loading

0 comments on commit cd3d6f4

Please sign in to comment.