diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index aff097bbdc4..5550513c436 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -20,37 +20,39 @@ public class ServiceMetricNames { // These prefixes can be used to distinguish metrics reported by GobblinService from other metrics reported by Gobblin // This can be used in conjunction with MetricNameRegexFilter to filter out metrics in any MetricReporter public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService"; + public static final String GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER = GOBBLIN_SERVICE_PREFIX + "."; public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics"; // Flow Compilation Meters and Timer - public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful"; - public static final String FLOW_COMPILATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.failed"; - public static final String FLOW_COMPILATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.time"; - public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.dataAuthorization.time"; + public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful"; + public static final String FLOW_COMPILATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.failed"; + public static final String FLOW_COMPILATION_TIMER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.time"; + public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.dataAuthorization.time"; // Flow Orchestration Meters and Timer - public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.successful"; - public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.failed"; - public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time"; - public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay"; + public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.successful"; + public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.failed"; + public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.time"; + public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay"; // Flow Trigger Handler - public static final String FLOW_TRIGGER_HANDLER_PREFIX = "flowTriggerHandler"; - public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted"; - public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained"; - public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother"; - public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing"; - public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler"; - public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler"; + public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED = FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted"; + public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained"; + public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother"; + public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing"; + public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler"; + public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount"; + public static final String FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount"; // DagManager Related Metrics - public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagManager"; + public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager"; public static final String DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount"; public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount"; //Job status poll timer - public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time"; + public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobStatusPoll.time"; public static final String CREATE_FLOW_METER = "CreateFlow"; public static final String DELETE_FLOW_METER = "DeleteFlow"; @@ -59,9 +61,9 @@ public class ServiceMetricNames { public static final String START_SLA_EXCEEDED_FLOWS_METER = "StartSLAExceededFlows"; public static final String SLA_EXCEEDED_FLOWS_METER = "SlaExceededFlows"; public static final String FAILED_FLOW_METER = "FailedFlows"; - public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".ScheduledFlows"; - public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".NonScheduledFlows"; - public static final String SKIPPED_FLOWS = GOBBLIN_SERVICE_PREFIX + ".SkippedFlows"; + public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "ScheduledFlows"; + public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "NonScheduledFlows"; + public static final String SKIPPED_FLOWS = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "SkippedFlows"; public static final String RUNNING_FLOWS_COUNTER = "RunningFlows"; public static final String SERVICE_USERS = "ServiceUsers"; public static final String COMPILED = "Compiled"; @@ -70,9 +72,9 @@ public class ServiceMetricNames { public static final String HELIX_LEADER_STATE = "HelixLeaderState"; - public static final String FLOWGRAPH_UPDATE_FAILED_METER = GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed"; + public static final String FLOWGRAPH_UPDATE_FAILED_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FlowgraphUpdateFailed"; - public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT = GOBBLIN_SERVICE_PREFIX + ".MysqlDagStateStore" + ".totalDagCount"; + public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "MysqlDagStateStore" + ".totalDagCount"; - public static final String DAG_COUNT_FS_DAG_STATE_COUNT = GOBBLIN_SERVICE_PREFIX + ".FsDagStateStore" + ".totalDagCount"; + public static final String DAG_COUNT_FS_DAG_STATE_COUNT = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount"; } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index 86adb467127..de5cb2e99cc 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -37,47 +37,50 @@ public class RuntimeMetrics { public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = "gobblin.jobMonitor.slaevent.rejectedevents"; public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES = "gobblin.jobMonitor.kafka.messageParseFailures"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.successful.added.specs"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.failed.added.specs"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs"; - public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.unexpected.errors"; - public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed"; - public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = - ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay"; - public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched"; + public static final String SPEC_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "specStoreMonitor."; + public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = SPEC_STORE_MONITOR_PREFIX + "successful.added.specs"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = SPEC_STORE_MONITOR_PREFIX + "failed.added.specs"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = SPEC_STORE_MONITOR_PREFIX + "deleted.specs"; + public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = SPEC_STORE_MONITOR_PREFIX + "unexpected.errors"; + public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED = SPEC_STORE_MONITOR_PREFIX + "message.processed"; + public static final String GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES = SPEC_STORE_MONITOR_PREFIX + "duplicateMessages"; + public static final String GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES = SPEC_STORE_MONITOR_PREFIX + "heartbeatMessages"; + public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = SPEC_STORE_MONITOR_PREFIX + "produce.to.consume.delay"; + public static final String DAG_ACTION_STORE_MONITOR_PREFIX = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagActionStoreMonitor."; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "kills.invoked"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + "message.processed"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_DUPLICATE_MESSAGES = DAG_ACTION_STORE_MONITOR_PREFIX + "duplicateMessages"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES = DAG_ACTION_STORE_MONITOR_PREFIX + "heartbeatMessages"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES = DAG_ACTION_STORE_MONITOR_PREFIX + "nullDagActionTypeMessages"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "resumes.invoked"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched"; - public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors"; + public static final String GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = DAG_ACTION_STORE_MONITOR_PREFIX + "failedFlowLaunchSubmissions"; + public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.errors"; public static final String - GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay"; - public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors"; - public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded"; - public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota"; + GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + "produce.to.consume.delay"; + public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "gobblin.mysql.quota.manager.unexpected.errors"; + public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "gobblin.mysql.quota.manager.quotaRequests.exceeded"; + public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "gobblin.mysql.quota.manager.time.to.check.quota"; // The following metrics are used to identify the bottlenecks for initializing the job scheduler public static final String - GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecsDuringStartupPerSpecRateNanos"; - public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize"; + GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.getSpecsDuringStartupPerSpecRateNanos"; + public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.loadSpecBatchSize"; public static final String - GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerNanos"; + GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.timeToInitializeSchedulerNanos"; public static final String - GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToObtainSpecUrisNanos"; + GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.timeToObtainSpecUrisNanos"; public static final String - GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.individualGetSpecSpeedNanos"; + GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.individualGetSpecSpeedNanos"; public static final String - GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.eachCompleteAddSpecNanos"; + GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.eachCompleteAddSpecNanos"; public static final String - GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.eachSpecCompilationNanos"; - public static final String GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.eachScheduleJobNanos"; - public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.totalGetSpecTimeNanos"; - public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.totalAddSpecTimeNanos"; - public static final String GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.numJobsScheduledDuringStartup"; + GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.eachSpecCompilationNanos"; + public static final String GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.eachScheduleJobNanos"; + public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.totalGetSpecTimeNanos"; + public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.totalAddSpecTimeNanos"; + public static final String GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobScheduler.numJobsScheduledDuringStartup"; // Metadata keys public static final String TOPIC = "topic"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index b4ec9c0ceef..c1553da39ca 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -138,7 +138,7 @@ public class DagManager extends AbstractIdleService { private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes"; public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60; - public static final String DAG_MANAGER_HEARTBEAT = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagManager.heartbeat-%s"; + public static final String DAG_MANAGER_HEARTBEAT = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager.heartbeat-%s"; // Default job start SLA time if configured, measured in minutes. Default is 10 minutes private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME; private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index c5a5bb8e0ee..af65390ec50 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -79,6 +79,7 @@ public class FlowTriggerHandler { private ContextAwareCounter noLongerLeasingStatusCount; private ContextAwareCounter jobDoesNotExistInSchedulerCount; private ContextAwareCounter failedToSetEventReminderCount; + private ContextAwareMeter leasesObtainedDueToReminderCount; @Inject public FlowTriggerHandler(Config config, Optional leaseDeterminationStore, @@ -96,6 +97,7 @@ public FlowTriggerHandler(Config config, Optional lease this.noLongerLeasingStatusCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT); this.jobDoesNotExistInSchedulerCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT); this.failedToSetEventReminderCount = this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT); + this.leasesObtainedDueToReminderCount = this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT); } /** @@ -116,6 +118,9 @@ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flo // id. From this point onwards, always use the newer version of the flow action to easily track the action through // orchestration and execution. if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { + if (isReminderEvent) { + this.leasesObtainedDueToReminderCount.mark(); + } MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus; this.leaseObtainedCount.inc(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java index a2d68fbc0d1..9e121c8b778 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java @@ -20,6 +20,8 @@ import com.google.common.cache.LoadingCache; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.metrics.ContextAwareMeter; + @Slf4j public final class ChangeMonitorUtils { @@ -28,20 +30,24 @@ private ChangeMonitorUtils() { } /** - * Performs checks for duplicate messages and heartbeat operation prior to processing a message. Returns true if - * the pre-conditions above don't apply and we should proceed processing the change event + * Performs checks for duplicate messages, heartbeat message types, or null dag action types all of which cannot or + * should not be processed. Returns true if the pre-conditions above don't apply, and we should proceed processing + * the change event */ - public static boolean shouldProcessMessage(String changeIdentifier, LoadingCache cache, - String operation, String timestamp) { + public static boolean isValidAndUniqueMessage(String changeIdentifier, String operation, String timestamp, + LoadingCache cache, ContextAwareMeter duplicateMessagesMeter, + ContextAwareMeter heartbeatMessagesMeter) { // If we've already processed a message with this timestamp and key before then skip duplicate message if (cache.getIfPresent(changeIdentifier) != null) { - log.info("Duplicate change event with identifier {}", changeIdentifier); + log.debug("Duplicate change event with identifier {}", changeIdentifier); + duplicateMessagesMeter.mark(); return false; } // If event is a heartbeat type then log it and skip processing if (operation.equals("HEARTBEAT")) { log.debug("Received heartbeat message from time {}", timestamp); + heartbeatMessagesMeter.mark(); return false; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index b05e7310521..1190a1e460b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -61,8 +61,9 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter failedFlowLaunchSubmissions; private ContextAwareMeter unexpectedErrors; private ContextAwareMeter messageProcessedMeter; - private ContextAwareMeter messageFilteredOutMeter; - private ContextAwareMeter malformedMessagesSkippedMeter; + private ContextAwareMeter duplicateMessagesMeter; + private ContextAwareMeter heartbeatMessagesMeter; + private ContextAwareMeter nullDagActionTypeMessagesMeter; private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge private volatile Long produceToConsumeDelayValue = -1L; @@ -124,24 +125,23 @@ protected void processMessage(DecodeableKafkaRecord message) { String flowName = value.getFlowName(); String flowExecutionId = value.getFlowExecutionId(); - if (value.getDagAction() == null) { - log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: " - + "{}", flowGroup, flowName, flowExecutionId, tid, operation); - this.malformedMessagesSkippedMeter.mark(); - return; - } - DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); - produceToConsumeDelayValue = calcMillisSince(produceTimestamp); log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}", flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeDelayValue); String changeIdentifier = tid + key; - if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation, - produceTimestamp.toString())) { - this.messageFilteredOutMeter.mark(); + if (!ChangeMonitorUtils.isValidAndUniqueMessage(changeIdentifier, operation, produceTimestamp.toString(), + dagActionsSeenCache, duplicateMessagesMeter, heartbeatMessagesMeter)) { return; } + // check after filtering out heartbeat messages expected to have `dagActionValue == null` + if (value.getDagAction() == null) { + log.warn("Skipping null dag action type received for identifier {} ", changeIdentifier); + nullDagActionTypeMessagesMeter.mark(); + return; + } + + DagActionStore.FlowActionType dagActionType = DagActionStore.FlowActionType.valueOf(value.getDagAction().toString()); // Used to easily log information to identify the dag action DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, @@ -231,8 +231,9 @@ protected void createMetrics() { this.failedFlowLaunchSubmissions = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED); - this.messageFilteredOutMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT); - this.malformedMessagesSkippedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED); + this.duplicateMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_DUPLICATE_MESSAGES); + this.heartbeatMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES); + this.nullDagActionTypeMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES); this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); this.getMetricContext().register(this.produceToConsumeDelayMillis); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java index f029c08c6e4..8b197a352ef 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java @@ -59,6 +59,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer { private ContextAwareMeter failedAddedSpecs; private ContextAwareMeter deletedSpecs; private ContextAwareMeter unexpectedErrors; + private ContextAwareMeter duplicateMessagesMeter; + private ContextAwareMeter heartbeatMessagesMeter; private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from all partitions in one gauge private volatile Long produceToConsumeDelayValue = -1L; @@ -115,8 +117,8 @@ protected void processMessage(DecodeableKafkaRecord message) { produceToConsumeDelayValue); String changeIdentifier = tid + key; - if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, specChangesSeenCache, operation, - produceTimestamp.toString())) { + if (!ChangeMonitorUtils.isValidAndUniqueMessage(changeIdentifier, operation, produceTimestamp.toString(), + specChangesSeenCache, duplicateMessagesMeter, heartbeatMessagesMeter)) { return; } @@ -179,6 +181,8 @@ protected void createMetrics() { this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS); this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED); + this.duplicateMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES); + this.heartbeatMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES); this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); this.getMetricContext().register(this.produceToConsumeDelayMillis); }