diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index aa42f0285..091cb29eb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -30,9 +30,7 @@ import com.google.common.base.Strings; import com.google.protobuf.Any; import io.temporal.api.command.v1.*; -import io.temporal.api.common.v1.Payloads; -import io.temporal.api.common.v1.SearchAttributes; -import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.common.v1.*; import io.temporal.api.enums.v1.EventType; import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.*; @@ -430,7 +428,11 @@ private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) { replaying = false; } - Long initialCommandEventId = getInitialCommandEventId(event); + final long initialCommandEventId = getInitialCommandEventId(event); + if (initialCommandEventId < 0L) { + return; + } + EntityStateMachine c = stateMachines.get(initialCommandEventId); if (c != null) { c.handleEvent(event, hasNextEvent); @@ -1267,11 +1269,13 @@ private long getInitialCommandEventId(HistoryEvent event) { case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED: case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED: return event.getEventId(); - case UNRECOGNIZED: - case EVENT_TYPE_UNSPECIFIED: + + default: + if (event.getWorkerMayIgnore()) { + return -1L; + } throw new IllegalArgumentException("Unexpected event type: " + event.getEventType()); } - throw new IllegalStateException("unreachable"); } /** diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/UnknownHistoryEventReplayerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/UnknownHistoryEventReplayerTest.java new file mode 100644 index 000000000..0758f1fd5 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/UnknownHistoryEventReplayerTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.temporal.internal.replay; + +import static io.temporal.testing.WorkflowHistoryLoader.readHistoryFromResource; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.LocalActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.worker.Worker; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class UnknownHistoryEventReplayerTest { + + public static final String TASK_QUEUE = "unknown-history-event"; + public static final String RES_CLEAN = "testUnknownHistoryEventClean.json"; + public static final String RES_MAY_IGNORE = "testUnknownHistoryEventMayIgnore.json"; + public static final String RES_MAY_NOT_IGNORE = "testUnknownHistoryEventMayNotIgnore.json"; + + @Rule + public Timeout testTimeout = Timeout.seconds(10); + + private TestWorkflowEnvironment testEnvironment; + private Worker worker; + + @Before + public void setUp() { + testEnvironment = TestWorkflowEnvironment.newInstance(); + worker = testEnvironment.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class); + worker.registerActivitiesImplementations(new MyActivityImpl()); + testEnvironment.start(); + } + + @After + public void tearDown() { + testEnvironment.close(); + } + + @Test + public void testRun() { + WorkflowClient client = testEnvironment.getWorkflowClient(); + WorkflowOptions options = + WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId("plain-run").build(); + MyWorkflow stub = client.newWorkflowStub(MyWorkflow.class, options); + stub.execute(); + WorkflowExecutionHistory history = client.fetchHistory("plain-run"); + System.out.println(history.toJson(true)); + } + + @Test + public void testClean() throws Exception { + WorkflowExecutionHistory history = readHistoryFromResource(RES_CLEAN); + worker.replayWorkflowExecution(history); + } + + @Test + public void testMayIgnore() throws Exception { + WorkflowExecutionHistory history = readHistoryFromResource(RES_MAY_IGNORE); + worker.replayWorkflowExecution(history); + } + + @Test(expected = RuntimeException.class) + public void testMayNotIgnore() throws Exception { + WorkflowExecutionHistory history = readHistoryFromResource(RES_MAY_NOT_IGNORE); + worker.replayWorkflowExecution(history); + } + + @WorkflowInterface + public interface MyWorkflow { + + @WorkflowMethod + void execute(); + } + + @ActivityInterface + public interface MyActivity { + + @ActivityMethod + void execute(); + } + + public static class MyWorkflowImpl implements MyWorkflow { + + @Override + public void execute() { + MyActivity activity = + Workflow.newLocalActivityStub( + MyActivity.class, + LocalActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(1)) + .build()); + activity.execute(); + } + } + + public static class MyActivityImpl implements MyActivity { + + @Override + public void execute() { + } + } +} diff --git a/temporal-sdk/src/test/resources/testUnknownHistoryEventClean.json b/temporal-sdk/src/test/resources/testUnknownHistoryEventClean.json new file mode 100644 index 000000000..1a137f802 --- /dev/null +++ b/temporal-sdk/src/test/resources/testUnknownHistoryEventClean.json @@ -0,0 +1,123 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-02-29T20:38:37.858Z", + "eventType": "WorkflowExecutionStarted", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "MyWorkflow" + }, + "taskQueue": { + "name": "unknown-history-event" + }, + "input": {}, + "workflowExecutionTimeout": "315360000s", + "workflowRunTimeout": "315360000s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c", + "identity": "2732293@aten", + "firstExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c", + "attempt": 1, + "header": {} + } + }, + { + "eventId": "2", + "eventTime": "2024-02-29T20:38:37.858Z", + "eventType": "WorkflowTaskScheduled", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "unknown-history-event" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-02-29T20:38:37.867Z", + "eventType": "WorkflowTaskStarted", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "2732293@aten" + } + }, + { + "eventId": "4", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "WorkflowTaskCompleted", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "identity": "2732293@aten", + "sdkMetadata": { + "langUsedFlags": [ + 1 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "MarkerRecorded", + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": { + "activityId": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IjdjMzMxNzJkLTFhMGUtM2UxMS1iYTM4LTliNjY4ZTFkNzg5NyI\u003d" + } + ] + }, + "input": {}, + "meta": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "eyJmaXJzdFNrZCI6MTcwOTIzOTExNzkzMywiYXRwdCI6MSwiYmFja29mZiI6bnVsbH0\u003d" + } + ] + }, + "time": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "MTcwOTIzOTExNzg4NQ\u003d\u003d" + } + ] + }, + "type": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IkV4ZWN1dGUi" + } + ] + } + }, + "workflowTaskCompletedEventId": "3" + } + }, + { + "eventId": "6", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "WorkflowExecutionCompleted", + "workflowExecutionCompletedEventAttributes": { + "result": {}, + "workflowTaskCompletedEventId": "3" + } + } + ] +} \ No newline at end of file diff --git a/temporal-sdk/src/test/resources/testUnknownHistoryEventMayIgnore.json b/temporal-sdk/src/test/resources/testUnknownHistoryEventMayIgnore.json new file mode 100644 index 000000000..632e54876 --- /dev/null +++ b/temporal-sdk/src/test/resources/testUnknownHistoryEventMayIgnore.json @@ -0,0 +1,120 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-02-29T20:38:37.858Z", + "eventType": "WorkflowExecutionStarted", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "MyWorkflow" + }, + "taskQueue": { + "name": "unknown-history-event" + }, + "input": {}, + "workflowExecutionTimeout": "315360000s", + "workflowRunTimeout": "315360000s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c", + "identity": "2732293@aten", + "firstExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c", + "attempt": 1, + "header": {} + } + }, + { + "eventId": "2", + "eventTime": "2024-02-29T20:38:37.858Z", + "eventType": "WorkflowTaskScheduled", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "unknown-history-event" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-02-29T20:38:37.867Z", + "eventType": "WorkflowTaskStarted", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "2732293@aten" + } + }, + { + "eventId": "4", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "WorkflowTaskCompleted", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "identity": "2732293@aten", + "sdkMetadata": { + "langUsedFlags": [ + 1 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "MarkerRecorded", + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": { + "activityId": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IjdjMzMxNzJkLTFhMGUtM2UxMS1iYTM4LTliNjY4ZTFkNzg5NyI\u003d" + } + ] + }, + "input": {}, + "meta": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "eyJmaXJzdFNrZCI6MTcwOTIzOTExNzkzMywiYXRwdCI6MSwiYmFja29mZiI6bnVsbH0\u003d" + } + ] + }, + "time": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "MTcwOTIzOTExNzg4NQ\u003d\u003d" + } + ] + }, + "type": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IkV4ZWN1dGUi" + } + ] + } + }, + "workflowTaskCompletedEventId": "3" + } + }, + { + "eventId": "6", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "Unknown", + "workerMayIgnore": true + } + ] +} \ No newline at end of file diff --git a/temporal-sdk/src/test/resources/testUnknownHistoryEventMayNotIgnore.json b/temporal-sdk/src/test/resources/testUnknownHistoryEventMayNotIgnore.json new file mode 100644 index 000000000..b13932e37 --- /dev/null +++ b/temporal-sdk/src/test/resources/testUnknownHistoryEventMayNotIgnore.json @@ -0,0 +1,119 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-02-29T20:38:37.858Z", + "eventType": "WorkflowExecutionStarted", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "MyWorkflow" + }, + "taskQueue": { + "name": "unknown-history-event" + }, + "input": {}, + "workflowExecutionTimeout": "315360000s", + "workflowRunTimeout": "315360000s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c", + "identity": "2732293@aten", + "firstExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c", + "attempt": 1, + "header": {} + } + }, + { + "eventId": "2", + "eventTime": "2024-02-29T20:38:37.858Z", + "eventType": "WorkflowTaskScheduled", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "unknown-history-event" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-02-29T20:38:37.867Z", + "eventType": "WorkflowTaskStarted", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "2732293@aten" + } + }, + { + "eventId": "4", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "WorkflowTaskCompleted", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "identity": "2732293@aten", + "sdkMetadata": { + "langUsedFlags": [ + 1 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "MarkerRecorded", + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": { + "activityId": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IjdjMzMxNzJkLTFhMGUtM2UxMS1iYTM4LTliNjY4ZTFkNzg5NyI\u003d" + } + ] + }, + "input": {}, + "meta": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "eyJmaXJzdFNrZCI6MTcwOTIzOTExNzkzMywiYXRwdCI6MSwiYmFja29mZiI6bnVsbH0\u003d" + } + ] + }, + "time": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "MTcwOTIzOTExNzg4NQ\u003d\u003d" + } + ] + }, + "type": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg\u003d\u003d" + }, + "data": "IkV4ZWN1dGUi" + } + ] + } + }, + "workflowTaskCompletedEventId": "3" + } + }, + { + "eventId": "6", + "eventTime": "2024-02-29T20:38:38.001Z", + "eventType": "Unknown" + } + ] +} \ No newline at end of file