Skip to content

Commit

Permalink
Ignore history events with worker_may_ignore: true.
Browse files Browse the repository at this point in the history
  • Loading branch information
chronos-tachyon committed Feb 29, 2024
1 parent 2b05f07 commit 58981e3
Show file tree
Hide file tree
Showing 5 changed files with 507 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import io.temporal.api.command.v1.*;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.common.v1.*;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.*;
Expand Down Expand Up @@ -430,7 +428,11 @@ private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
replaying = false;
}

Long initialCommandEventId = getInitialCommandEventId(event);
final long initialCommandEventId = getInitialCommandEventId(event);
if (initialCommandEventId < 0L) {
return;
}

EntityStateMachine c = stateMachines.get(initialCommandEventId);
if (c != null) {
c.handleEvent(event, hasNextEvent);
Expand Down Expand Up @@ -1267,11 +1269,13 @@ private long getInitialCommandEventId(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
return event.getEventId();
case UNRECOGNIZED:
case EVENT_TYPE_UNSPECIFIED:

default:
if (event.getWorkerMayIgnore()) {
return -1L;
}
throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
}
throw new IllegalStateException("unreachable");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.temporal.internal.replay;

import static io.temporal.testing.WorkflowHistoryLoader.readHistoryFromResource;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class UnknownHistoryEventReplayerTest {

public static final String TASK_QUEUE = "unknown-history-event";
public static final String RES_CLEAN = "testUnknownHistoryEventClean.json";
public static final String RES_MAY_IGNORE = "testUnknownHistoryEventMayIgnore.json";
public static final String RES_MAY_NOT_IGNORE = "testUnknownHistoryEventMayNotIgnore.json";

@Rule
public Timeout testTimeout = Timeout.seconds(10);

private TestWorkflowEnvironment testEnvironment;
private Worker worker;

@Before
public void setUp() {
testEnvironment = TestWorkflowEnvironment.newInstance();
worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class);
worker.registerActivitiesImplementations(new MyActivityImpl());
testEnvironment.start();
}

@After
public void tearDown() {
testEnvironment.close();
}

@Test
public void testRun() {
WorkflowClient client = testEnvironment.getWorkflowClient();
WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId("plain-run").build();
MyWorkflow stub = client.newWorkflowStub(MyWorkflow.class, options);
stub.execute();
WorkflowExecutionHistory history = client.fetchHistory("plain-run");
System.out.println(history.toJson(true));
}

@Test
public void testClean() throws Exception {
WorkflowExecutionHistory history = readHistoryFromResource(RES_CLEAN);
worker.replayWorkflowExecution(history);
}

@Test
public void testMayIgnore() throws Exception {
WorkflowExecutionHistory history = readHistoryFromResource(RES_MAY_IGNORE);
worker.replayWorkflowExecution(history);
}

@Test(expected = RuntimeException.class)
public void testMayNotIgnore() throws Exception {
WorkflowExecutionHistory history = readHistoryFromResource(RES_MAY_NOT_IGNORE);
worker.replayWorkflowExecution(history);
}

@WorkflowInterface
public interface MyWorkflow {

@WorkflowMethod
void execute();
}

@ActivityInterface
public interface MyActivity {

@ActivityMethod
void execute();
}

public static class MyWorkflowImpl implements MyWorkflow {

@Override
public void execute() {
MyActivity activity =
Workflow.newLocalActivityStub(
MyActivity.class,
LocalActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
.build());
activity.execute();
}
}

public static class MyActivityImpl implements MyActivity {

@Override
public void execute() {
}
}
}
123 changes: 123 additions & 0 deletions temporal-sdk/src/test/resources/testUnknownHistoryEventClean.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-02-29T20:38:37.858Z",
"eventType": "WorkflowExecutionStarted",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "MyWorkflow"
},
"taskQueue": {
"name": "unknown-history-event"
},
"input": {},
"workflowExecutionTimeout": "315360000s",
"workflowRunTimeout": "315360000s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c",
"identity": "2732293@aten",
"firstExecutionRunId": "447c6032-30a1-4dd7-8f7b-f965ba68715c",
"attempt": 1,
"header": {}
}
},
{
"eventId": "2",
"eventTime": "2024-02-29T20:38:37.858Z",
"eventType": "WorkflowTaskScheduled",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "unknown-history-event"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-02-29T20:38:37.867Z",
"eventType": "WorkflowTaskStarted",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "2732293@aten"
}
},
{
"eventId": "4",
"eventTime": "2024-02-29T20:38:38.001Z",
"eventType": "WorkflowTaskCompleted",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"identity": "2732293@aten",
"sdkMetadata": {
"langUsedFlags": [
1
]
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-02-29T20:38:38.001Z",
"eventType": "MarkerRecorded",
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": {
"activityId": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "IjdjMzMxNzJkLTFhMGUtM2UxMS1iYTM4LTliNjY4ZTFkNzg5NyI\u003d"
}
]
},
"input": {},
"meta": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "eyJmaXJzdFNrZCI6MTcwOTIzOTExNzkzMywiYXRwdCI6MSwiYmFja29mZiI6bnVsbH0\u003d"
}
]
},
"time": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "MTcwOTIzOTExNzg4NQ\u003d\u003d"
}
]
},
"type": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg\u003d\u003d"
},
"data": "IkV4ZWN1dGUi"
}
]
}
},
"workflowTaskCompletedEventId": "3"
}
},
{
"eventId": "6",
"eventTime": "2024-02-29T20:38:38.001Z",
"eventType": "WorkflowExecutionCompleted",
"workflowExecutionCompletedEventAttributes": {
"result": {},
"workflowTaskCompletedEventId": "3"
}
}
]
}
Loading

0 comments on commit 58981e3

Please sign in to comment.