Skip to content

Commit

Permalink
Quantify Missed Work Completed by Reminders
Browse files Browse the repository at this point in the history
   Also fix bug to filter out heartbeat events before extracting field
  • Loading branch information
Urmi Mustafi committed Oct 25, 2023
1 parent 9a516d3 commit 9f582aa
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public class ServiceMetricNames {
public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";

// Flow Trigger Handler
public static final String FLOW_TRIGGER_HANDLER_PREFIX = "flowTriggerHandler";
public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX + "." + "flowTriggerHandler";
public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
public static final String FLOW_TRIGGER_HANDLER_NUM_REMINDERS_DOING_MISSED_WORK = FLOW_TRIGGER_HANDLER_PREFIX + ".numRemindersDoingMissedWork";

// DagManager Related Metrics
public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class FlowTriggerHandler {
private ContextAwareCounter noLongerLeasingStatusCount;
private ContextAwareCounter jobDoesNotExistInSchedulerCount;
private ContextAwareCounter failedToSetEventReminderCount;
private ContextAwareMeter numRemindersDoingMissedWork;

@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
Expand All @@ -96,6 +97,7 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT);
this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT);
this.numRemindersDoingMissedWork = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NUM_REMINDERS_DOING_MISSED_WORK);
}

/**
Expand All @@ -116,6 +118,9 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo
// id. From this point onwards, always use the newer version of the flow action to easily track the action through
// orchestration and execution.
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
if (isReminderEvent) {
this.numRemindersDoingMissedWork.mark();
}
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus;
this.leaseObtainedCount.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ protected void processMessage(DecodeableKafkaRecord message) {
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();

if (value.getDagAction() == null) {
log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: "
+ "{}", flowGroup, flowName, flowExecutionId, tid, operation);
this.malformedMessagesSkippedMeter.mark();
return;
}
DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());

produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}",
flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeDelayValue);
Expand All @@ -143,6 +135,15 @@ protected void processMessage(DecodeableKafkaRecord message) {
return;
}

// This check must come after heartbeat events are filtered out, because heartbeats legitimately have a null dagActionType
if (value.getDagAction() == null) {
log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: "
+ "{}", flowGroup, flowName, flowExecutionId, tid, operation);
this.malformedMessagesSkippedMeter.mark();
return;
}
DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());

// Used to easily log information to identify the dag action
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
dagActionType);
Expand Down

0 comments on commit 9f582aa

Please sign in to comment.