Skip to content

Commit

Permalink
Add mockito.verify in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vsinghal85 committed Dec 12, 2024
1 parent fdf09f9 commit a1288bc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
// Only mark the job as failed in case of non transient exceptions
if(!DagProcessingEngine.isTransientException(e)){
if(!DagProcessingEngine.isTransientException(e)) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.ExecutionException;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
Expand All @@ -24,21 +23,16 @@
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

@RunWith(PowerMockRunner.class)
@PrepareForTest(EventSubmitter.class)
public class DagProcUtilsTest {

DagManagementStateStore dagManagementStateStore;
SpecExecutor mockSpecExecutor;

@BeforeClass
@BeforeTest
public void setUp() {
dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
Expand All @@ -56,38 +50,45 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Mockito.verify(dagManagementStateStore, Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
}

@Test
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException {
Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Mockito.verify(dagManagementStateStore, Mockito.times(2)).getDagManagerMetrics();
Mockito.verify(dagManagementStateStore, Mockito.times(1)).updateDagNode(dagNode);
Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode);
Mockito.verify(metrics, Mockito.times(1)).incrementJobsSentToExecutor(dagNode);
}

@Test(expectedExceptions = RuntimeException.class)
@Test(expectedExceptions = RuntimeException.class, dependsOnMethods = "testWhenSubmitToExecutorSuccess")
public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{
Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Mockito.verify(mockedSpecProducer, Mockito.times(1)).addSpec(Mockito.any(JobSpec.class));
Mockito.verify(dagManagementStateStore, Mockito.times(1)).getDagManagerMetrics();
Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode);
}

private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
Expand Down

0 comments on commit a1288bc

Please sign in to comment.