Add framework and unit tests for DagActionStoreChangeMonitor (#3817)
* Add framework and unit tests for DagActionStoreChangeMonitor

* Add more test cases and validation

* Add header for new file

* Move FlowSpec static function to Utils class

* Remove unused import

* Fix compile error

* Fix unit tests

---------

Co-authored-by: Urmi Mustafi <[email protected]>
umustafi and Urmi Mustafi authored Nov 8, 2023
1 parent 50b6ca9 commit 5339332
Showing 4 changed files with 254 additions and 16 deletions.
DagActionStoreChangeMonitorTest.java (new file)
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionValue;
import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
import org.apache.gobblin.service.monitoring.OperationType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


/**
* Tests the main functionality of {@link DagActionStoreChangeMonitor} to process {@link DagActionStoreChangeEvent} type
* events stored in a {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
* processMessage(DecodeableKafkaRecord message) function should gracefully process a variety of message
* types, even those with unexpected formats, without throwing exceptions.
*/
@Slf4j
public class DagActionStoreChangeMonitorTest {
public static final String TOPIC = DagActionStoreChangeEvent.class.getSimpleName();
private final int PARTITION = 1;
private final int OFFSET = 1;
private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
private final String FLOW_EXECUTION_ID = "123";
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
private int txidCounter = 0;

/**
* Note: The monitor's methods are package-protected and cannot be accessed by this test class, so they are wrapped
* in test-specific methods below.
*/
class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {

public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
boolean isMultiActiveSchedulerEnabled) {
super(topic, config, mock(DagActionStore.class), mock(DagManager.class), numThreads, mock(FlowCatalog.class),
mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
}

protected void processMessageForTest(DecodeableKafkaRecord record) {
super.processMessage(record);
}

protected void startUpForTest() {
super.startUp();
}
}

MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() {
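// Placeholder broker, value-deserializer, state-store dir, and zookeeper settings: presumably these only need to
// satisfy config parsing so the monitor's underlying Kafka consumer can be constructed in a unit test; no real
// cluster is expected at these addresses.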
Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true);
}

// Called at the start of every test so that each mock is recreated and its invocation count is reset to 0
public void setup() {
mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
mockDagActionStoreChangeMonitor.startUpForTest();
}

/**
* Ensure no NPE results from passing a HEARTBEAT type message with a null {@link DagActionValue}, and that the
* message is filtered out (no handler methods are invoked) since it is a heartbeat.
*/
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
// Note: submitFlowToDagManagerHelper is not on a mocked object so it cannot be verified directly; the FlowCatalog call below covers it indirectly
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
* Ensure a HEARTBEAT type message with non-empty flow information is still filtered out (no handler methods are
* invoked) since it is a heartbeat.
*/
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
* Tests processMessage with an INSERT type message for a `launch` action.
*/
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo")
public void testProcessMessageWithInsertLaunchType() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(1)).getSpecs(any(URI.class));
}

/**
* Tests processMessage with an INSERT type message for a `resume` action. It re-uses the same flow information, but
* since a new transaction id (tid) is generated for every event it is treated as unique and a resume request is submitted.
*/
@Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType")
public void testProcessMessageWithInsertResumeType() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
* Tests processMessage with an INSERT type message for a `kill` action. Similar to `testProcessMessageWithInsertResumeType`.
*/
@Test (dependsOnMethods = "testProcessMessageWithInsertResumeType")
public void testProcessMessageWithInsertKillType() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
* Tests processMessage with an UPDATE type message for the `launch` action above. Although processMessage does not
* expect this message type, it should handle it gracefully.
*/
@Test (dependsOnMethods = "testProcessMessageWithInsertKillType")
public void testProcessMessageWithUpdate() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
* Tests processMessage with a DELETE type message, which should be ignored regardless of the flow information.
*/
@Test (dependsOnMethods = "testProcessMessageWithUpdate")
public void testProcessMessageWithDelete() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
* Util to create a generic {@link DagActionStoreChangeEvent}
*/
private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType,
String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) {
String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType);
txidCounter++;
return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, flowExecutionId, dagAction);
}

/**
* Form a key for events using the flow identifiers
* @return a key formed by adding an '_' delimiter between the flow identifiers
*/
private String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) {
return flowGroup + "_" + flowName + "_" + flowExecutionId;
}

/**
* Util to create a Kafka consumer record wrapping a {@link DagActionStoreChangeEvent}
*/
private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, String flowName,
String flowExecutionId, DagActionValue dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
createDagActionStoreChangeEvent(operationType, flowGroup, flowName, flowExecutionId, dagAction);
} catch (Exception e) {
log.error("Exception while creating event ", e);
}
// TODO: handle partition and offset values better
ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, OFFSET,
getKeyForFlow(flowGroup, flowName, flowExecutionId), eventToProcess);
return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord);
}
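
/**
 * Sketch of a possible follow-up case (not part of this commit, hence disabled): replaying the exact same consumer
 * record should be ignored the second time because the monitor remembers change events it has already processed in
 * its dagActionsSeenCache. This assumes the cache is keyed on the event's transaction id, which the helpers above
 * generate only once per created event.
 */
@Test (enabled = false)
public void testProcessDuplicateMessageOnlyHandledOnce() {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
// Processing the identical record again is expected to be a no-op, so the kill handler stays at one invocation
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong());
}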
}
FlowSpec.java
@@ -518,15 +518,15 @@ public static int maxFlowSpecUriLength() {
       return URI_SCHEME.length() + ":".length() // URI separator
           + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
     }
-  }
 
-  /**
-   * Create a new FlowSpec object with the added property defined by path and value parameters
-   * @param path key for new property
-   * @param value
-   */
-  public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) {
-    Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value));
-    return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
+    /**
+     * Create a new FlowSpec object with the added property defined by path and value parameters
+     * @param path key for new property
+     * @param value
+     */
+    public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) {
+      Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value));
+      return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
+    }
   }
 }
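
A minimal usage sketch of the relocated helper (illustrative only; `originalSpec` stands in for any existing FlowSpec and the execution id value is a placeholder):

  // Builds a new FlowSpec from the original URI plus the overridden property; the input spec's Config is not mutated.
  FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(
      originalSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1699999999999");

This is the same call shape used by FlowSpecTest and DagActionStoreChangeMonitor below.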
FlowSpecTest.java
@@ -26,8 +26,6 @@
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.gobblin.runtime.api.FlowSpec.*;
-
 
 public class FlowSpecTest {
 
@@ -51,7 +49,7 @@ public void testAddProperty() throws URISyntaxException {
     properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true");
 
     FlowSpec originalFlowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
-    FlowSpec updatedFlowSpec = createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+    FlowSpec updatedFlowSpec = FlowSpec.Utils.createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
 
     Properties updatedProperties = updatedFlowSpec.getConfigAsProperties();
     Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
DagActionStoreChangeMonitor.java
@@ -17,6 +17,7 @@

 package org.apache.gobblin.service.monitoring;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -27,6 +28,7 @@
 import java.net.URISyntaxException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
@@ -42,8 +44,6 @@
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
-import static org.apache.gobblin.runtime.api.FlowSpec.*;
-
 
 /**
  * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received
@@ -79,10 +79,13 @@ public String load(String key) throws Exception {
   dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader);
 
   protected DagActionStore dagActionStore;
-
+  @Getter
+  @VisibleForTesting
   protected DagManager dagManager;
   protected Orchestrator orchestrator;
   protected boolean isMultiActiveSchedulerEnabled;
+  @Getter
+  @VisibleForTesting
   protected FlowCatalog flowCatalog;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
@@ -200,7 +203,7 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
       spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
       // Adds flowExecutionId to config to ensure they are consistent across hosts
-      FlowSpec updatedSpec = createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+      FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
       this.orchestrator.submitFlowToDagManager(updatedSpec);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());