diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index aff097bbdc4..004732fcc42 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -35,13 +35,14 @@ public class ServiceMetricNames { public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay"; // Flow Trigger Handler - public static final String FLOW_TRIGGER_HANDLER_PREFIX = "flowTriggerHandler"; - public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted"; - public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained"; - public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother"; - public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing"; - public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler"; - public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX + "." + "flowTriggerHandler"; + public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted"; + public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained"; + public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother"; + public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing"; + public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler"; + public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_NUM_REMINDERS_DOING_MISSED_WORK = FLOW_TRIGGER_HANDLER_PREFIX + ".numRemindersDoingMissedWork"; // DagManager Related Metrics public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index c5a5bb8e0ee..988b2299ad7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -79,6 +79,7 @@ public class FlowTriggerHandler { private ContextAwareCounter noLongerLeasingStatusCount; private ContextAwareCounter jobDoesNotExistInSchedulerCount; private ContextAwareCounter failedToSetEventReminderCount; + private ContextAwareMeter numRemindersDoingMissedWork; @Inject public FlowTriggerHandler(Config config, Optional leaseDeterminationStore, @@ -96,6 +97,7 @@ public FlowTriggerHandler(Config config, Optional lease this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT); this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT); this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT); + this.numRemindersDoingMissedWork = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NUM_REMINDERS_DOING_MISSED_WORK); } /** @@ -116,6 +118,9 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo // id. From this point onwards, always use the newer version of the flow action to easily track the action through // orchestration and execution. if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { + if (isReminderEvent) { + this.numRemindersDoingMissedWork.mark(); + } MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; this.leaseObtainedCount.inc(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index b05e7310521..a269fef1433 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -124,14 +124,6 @@ protected void processMessage(DecodeableKafkaRecord message) { String flowName = value.getFlowName(); String flowExecutionId = value.getFlowExecutionId(); - if (value.getDagAction() == null) { - log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: " - + "{}", flowGroup, flowName, flowExecutionId, tid, operation); - this.malformedMessagesSkippedMeter.mark(); - return; - } - DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); - produceToConsumeDelayValue = calcMillisSince(produceTimestamp); log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}", flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeDelayValue); @@ -143,6 +135,15 @@ protected void processMessage(DecodeableKafkaRecord message) { return; } + // Needs to be done after, filtering out heartbeat values that do have valid null dagActionTypes + if (value.getDagAction() == null) { + log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: " + + "{}", flowGroup, flowName, flowExecutionId, tid, operation); + this.malformedMessagesSkippedMeter.mark(); + return; + } + DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); + // Used to easily log information to identify the dag action DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, dagActionType);