From a3df5df1d5f24ee2e3104e4f5966e23e02d00643 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Dec 2024 14:51:57 -0800 Subject: [PATCH 1/2] Do not run replay on commands that are a part of a non-completed task --- internal/internal_task_handlers.go | 19 +++++++++++++++++++ internal/internal_worker_test.go | 23 +++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 831258776..da4ae6d22 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -534,6 +534,10 @@ func isPreloadMarkerEvent(event *historypb.HistoryEvent) bool { return event.GetEventType() == enumspb.EVENT_TYPE_MARKER_RECORDED } +func isTaskCompletedEvent(event *historypb.HistoryEvent) bool { + return event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED +} + func inferMessageFromAcceptedEvent(attrs *historypb.WorkflowExecutionUpdateAcceptedEventAttributes) *protocolpb.Message { return &protocolpb.Message{ Id: attrs.GetAcceptedRequestMessageId(), @@ -1022,6 +1026,8 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo _, wfPanicked := w.err.(*workflowPanicError) return !wfPanicked && isInReplayer } + completedTaskCommandIndex := -1 + completedTask := false metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName())) start := time.Now() @@ -1045,6 +1051,10 @@ ProcessEvents: binaryChecksum := nextTask.binaryChecksum nextTaskBuildId := nextTask.buildID admittedUpdates := nextTask.admittedMsgs + if len(reorderedEvents) > 0 && isTaskCompletedEvent(reorderedEvents[0]) { + completedTaskCommandIndex = len(replayCommands) + completedTask = true + } // Check if we are replaying so we know if we should use the messages in the WFT or the history isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) var msgs *eventMsgIndex @@ -1195,6 +1205,15 @@ ProcessEvents: metricsTimer = nil } + // We do not want to run non-determinism checks on a task start that + // doesn't have a corresponding completed task. + if completedTaskCommandIndex >= 0 { + replayCommands = replayCommands[:completedTaskCommandIndex] + } + if !completedTask { + replayCommands = nil + } + // Non-deterministic error could happen in 2 different places: // 1) the replay commands does not match to history events. This is usually due to non backwards compatible code // change to workflow logic. For example, change calling one activity to a different activity. diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 6a5a34862..79b1fbc5a 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -312,6 +312,29 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() { replayer.RegisterWorkflow(testReplayWorkflow) err = replayer.ReplayWorkflowHistory(logger, history) require.NoError(s.T(), err) + require.True(s.T(), false) +} + +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowTask() { + taskQueue := "taskQueue1" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()), + }), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(3), + } + + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{}) + require.NoError(s.T(), err) + replayer.RegisterWorkflow(testReplayWorkflow) + fmt.Println("[ReplayWorkflowHistory]") + err = replayer.ReplayWorkflowHistory(logger, history) + require.NoError(s.T(), err) } func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() { From 6241536d5d654dbc1716634eb32200322de9e239 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Dec 2024 15:03:18 -0800 Subject: [PATCH 2/2] Remove completedTask, don't need 2 variables to track --- internal/internal_task_handlers.go | 7 +------ internal/internal_worker_test.go | 1 - 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index da4ae6d22..9b972bca1 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1026,8 +1026,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo _, wfPanicked := w.err.(*workflowPanicError) return !wfPanicked && isInReplayer } - completedTaskCommandIndex := -1 - completedTask := false + completedTaskCommandIndex := 0 metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName())) start := time.Now() @@ -1053,7 +1052,6 @@ ProcessEvents: admittedUpdates := nextTask.admittedMsgs if len(reorderedEvents) > 0 && isTaskCompletedEvent(reorderedEvents[0]) { completedTaskCommandIndex = len(replayCommands) - completedTask = true } // Check if we are replaying so we know if we should use the messages in the WFT or the history isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) @@ -1210,9 +1208,6 @@ ProcessEvents: if completedTaskCommandIndex >= 0 { replayCommands = replayCommands[:completedTaskCommandIndex] } - if !completedTask { - replayCommands = nil - } // Non-deterministic error could happen in 2 different places: // 1) the replay commands does not match to history events. This is usually due to non backwards compatible code diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 79b1fbc5a..8893d85b7 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -312,7 +312,6 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() { replayer.RegisterWorkflow(testReplayWorkflow) err = replayer.ReplayWorkflowHistory(logger, history) require.NoError(s.T(), err) - require.True(s.T(), false) } func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowTask() {