diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/CanFailConnector.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/CanFailConnector.java index a41ea6e33d8..d2d6dc7096f 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/CanFailConnector.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/CanFailConnector.java @@ -27,8 +27,11 @@ @EnableBinding(CanFailConnectorChannels.class) public class CanFailConnector { - private boolean shouldSendError = true; + private boolean shouldSendError = false; + private boolean shouldSendResult = true; private AtomicBoolean integrationErrorSent = new AtomicBoolean(false); + private AtomicBoolean resultSent = new AtomicBoolean(false); + private AtomicBoolean resultNotSent = new AtomicBoolean(false); private IntegrationRequest latestReceivedIntegrationRequest; private final IntegrationResultSender integrationResultSender; @@ -42,6 +45,7 @@ public CanFailConnector(IntegrationResultSender integrationResultSender, public void setShouldSendError(boolean shouldSendError) { this.shouldSendError = shouldSendError; + this.integrationErrorSent.set(false); } @StreamListener(value = CanFailConnectorChannels.CAN_FAIL_CONNECTOR) @@ -49,13 +53,25 @@ public void canFailConnector(IntegrationRequest integrationRequest) { latestReceivedIntegrationRequest = integrationRequest; integrationErrorSent.set(false); if (shouldSendError) { - integrationErrorSent.set(true); integrationErrorSender.send(integrationRequest, new RuntimeException("task failed")); - } else { + integrationErrorSent.set(true); + } else if (shouldSendResult){ integrationResultSender.send(integrationRequest, integrationRequest.getIntegrationContext()); + resultSent.set(true); + } else { + resultNotSent.set(true); } + + } + + public AtomicBoolean resultSent() { + return resultSent; + } + + public AtomicBoolean resultNotSent() { + return resultNotSent; } public AtomicBoolean errorSent() { @@ -65,4 +81,15 @@ public AtomicBoolean errorSent() { public IntegrationRequest getLatestReceivedIntegrationRequest() { return latestReceivedIntegrationRequest; } + + public void setShouldSendResult(boolean shouldSendResult) { + this.shouldSendResult = shouldSendResult; + this.resultSent.set(false); + this.resultNotSent.set(false); + } + + public void reset() { + setShouldSendResult(true); + setShouldSendError(false); + } } diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/MQServiceTaskIT.java b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/MQServiceTaskIT.java index 29892f361e8..63266a8c783 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/MQServiceTaskIT.java +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/java/org/activiti/cloud/starter/tests/runtime/MQServiceTaskIT.java @@ -16,6 +16,7 @@ package org.activiti.cloud.starter.tests.runtime; import org.activiti.api.process.model.IntegrationContext; +import org.activiti.api.task.model.builders.CompleteTaskPayloadBuilder; import org.activiti.cloud.services.rest.api.ReplayServiceTaskRequest; import org.activiti.engine.runtime.ProcessInstance; import org.activiti.engine.task.Task; @@ -24,17 +25,17 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.cloud.stream.config.BindingProperties; - -import java.util.AbstractMap; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import static org.activiti.cloud.starter.tests.helper.ProcessInstanceRestTemplate.CONTENT_TYPE_HEADER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; @@ -73,7 +74,10 @@ public void shouldConfigureDefaultConnectorBindingProperties() { @Test public void shouldRecoverFromFailure() { + canFailConnector.reset(); + //given + canFailConnector.setShouldSendError(true); Map variables = new HashMap<>(); variables.put("firstName", "John"); ProcessInstance procInst = runtimeService.startProcessInstanceByKey("MQServiceTaskErrorRecoverProcess", @@ -92,6 +96,9 @@ public void shouldRecoverFromFailure() { canFailConnector.setShouldSendError(false); replayServiceTask(integrationContext); + await("the service task should send the result the second try") + .untilTrue(canFailConnector.resultSent()); + //then await("the execution should arrive in the human tasks which follows the service task") .untilAsserted(() -> { @@ -102,6 +109,55 @@ public void shouldRecoverFromFailure() { ); } + @Test + public void shouldReplayRunningServiceTask() { + canFailConnector.reset(); + + //given + canFailConnector.setShouldSendResult(false); + Map variables = new HashMap<>(); + variables.put("firstName", "John"); + ProcessInstance procInst = runtimeService.startProcessInstanceByKey("MQServiceTaskErrorRecoverProcess", + "businessKey", + variables); + assertThat(procInst).isNotNull(); + await("the service task should not send the result on the first try") + .untilTrue(canFailConnector.resultNotSent()); + + assertThat(taskService.createTaskQuery() + .processInstanceId(procInst.getProcessInstanceId()) + .list()).isEmpty(); + //when + IntegrationContext integrationContext = canFailConnector.getLatestReceivedIntegrationRequest() + .getIntegrationContext(); + canFailConnector.setShouldSendResult(true); + replayServiceTask(integrationContext); + + await("the service task should send the result the second try") + .untilTrue(canFailConnector.resultSent()); + + //then + await("the execution should arrive in the human tasks which follows the service task") + .untilAsserted(() -> { + List tasks = taskService.createTaskQuery().processInstanceId(procInst.getProcessInstanceId()).list(); + assertThat(tasks).isNotNull(); + assertThat(tasks).extracting(Task::getName).containsExactly("Schedule meeting after service"); + } + ); + + // and given + Task task = taskService.createTaskQuery() + .processInstanceId(procInst.getProcessInstanceId()) + .singleResult(); + // when + complete(task); + + // then + assertThat(runtimeService.createProcessInstanceQuery() + .processInstanceId(procInst.getProcessInstanceId()) + .list()).isEmpty(); + } + private void replayServiceTask(IntegrationContext integrationContext) { identityTokenProducer.withTestUser("testadmin"); final ResponseEntity responseEntity = testRestTemplate.exchange("/admin/v1/executions/{executionId}/replay/service-task", @@ -113,4 +169,15 @@ private void replayServiceTask(IntegrationContext integrationContext) { assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK); } + private void complete(Task task) { + identityTokenProducer.withTestUser(keycloakTestUser); + final ResponseEntity responseEntity = testRestTemplate.exchange("/v1/tasks/{taskId}/complete", + HttpMethod.POST, + new HttpEntity<>(new CompleteTaskPayloadBuilder() + .withTaskId(task.getId()) + .build(), CONTENT_TYPE_HEADER), + new ParameterizedTypeReference<>() { + }, task.getId()); + assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } } diff --git a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/MQServiceTaskErrorRecoverProcess.bpmn20.xml b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/MQServiceTaskErrorRecoverProcess.bpmn20.xml index 75a8423bdeb..d824ba99810 100644 --- a/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/MQServiceTaskErrorRecoverProcess.bpmn20.xml +++ b/activiti-cloud-runtime-bundle-service/activiti-cloud-starter-runtime-bundle/src/test/resources/processes/MQServiceTaskErrorRecoverProcess.bpmn20.xml @@ -1,5 +1,5 @@ - + @@ -10,7 +10,7 @@ - +