Skip to content

Commit

Permalink
Improve observability of the workload launcher/monitor. (#10398)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Dec 15, 2023
1 parent 62e1da6 commit 782f461
Show file tree
Hide file tree
Showing 17 changed files with 412 additions and 135 deletions.
49 changes: 38 additions & 11 deletions airbyte-cron/src/main/java/io/airbyte/cron/jobs/WorkloadMonitor.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.airbyte.cron.jobs

import datadog.trace.api.Trace
import io.airbyte.metrics.annotations.Instrument
import io.airbyte.metrics.annotations.Tag
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.MetricTags
Expand Down Expand Up @@ -33,7 +35,7 @@ private val logger = KotlinLogging.logger { }
property = "airbyte.workload.monitor.enabled",
value = "true",
)
class WorkloadMonitor(
open class WorkloadMonitor(
private val workloadApi: WorkloadApi,
@Property(name = "airbyte.workload.monitor.claim-timeout") private val claimTimeout: Duration,
@Property(name = "airbyte.workload.monitor.heartbeat-timeout") private val heartbeatTimeout: Duration,
Expand All @@ -52,9 +54,14 @@ class WorkloadMonitor(
}

@Trace
@Instrument(
start = "WORKLOAD_MONITOR_RUN",
end = "WORKLOAD_MONITOR_DONE",
duration = "WORKLOAD_MONITOR_DURATION",
tags = [Tag(key = MetricTags.CRON_TYPE, value = CHECK_START)],
)
@Scheduled(fixedRate = "\${airbyte.workload.monitor.not-started-check-rate}")
fun cancelNotStartedWorkloads() {
metricClient.count(OssMetricsRegistry.WORKLOAD_MONITOR_RUN, 1, MetricAttribute(MetricTags.CRON_TYPE, CHECK_START))
open fun cancelNotStartedWorkloads() {
logger.info { "Checking for not started workloads." }
val oldestStartedTime = timeProvider(ZoneOffset.UTC).minusSeconds(nonStartedTimeout.seconds)
val notStartedWorkloads =
Expand All @@ -69,9 +76,14 @@ class WorkloadMonitor(
}

@Trace
@Instrument(
start = "WORKLOAD_MONITOR_RUN",
end = "WORKLOAD_MONITOR_DONE",
duration = "WORKLOAD_MONITOR_DURATION",
tags = [Tag(key = MetricTags.CRON_TYPE, value = CHECK_CLAIMS)],
)
@Scheduled(fixedRate = "\${airbyte.workload.monitor.claim-check-rate}")
fun cancelNotClaimedWorkloads() {
metricClient.count(OssMetricsRegistry.WORKLOAD_MONITOR_RUN, 1, MetricAttribute(MetricTags.CRON_TYPE, CHECK_CLAIMS))
open fun cancelNotClaimedWorkloads() {
logger.info { "Checking for not claimed workloads." }
val oldestClaimTime = timeProvider(ZoneOffset.UTC).minusSeconds(claimTimeout.seconds)
val notClaimedWorkloads =
Expand All @@ -86,9 +98,14 @@ class WorkloadMonitor(
}

@Trace
@Instrument(
start = "WORKLOAD_MONITOR_RUN",
end = "WORKLOAD_MONITOR_DONE",
duration = "WORKLOAD_MONITOR_DURATION",
tags = [Tag(key = MetricTags.CRON_TYPE, value = CHECK_HEARTBEAT)],
)
@Scheduled(fixedRate = "\${airbyte.workload.monitor.heartbeat-check-rate}")
fun cancelNotHeartbeatingWorkloads() {
metricClient.count(OssMetricsRegistry.WORKLOAD_MONITOR_RUN, 1, MetricAttribute(MetricTags.CRON_TYPE, CHECK_HEARTBEAT))
open fun cancelNotHeartbeatingWorkloads() {
logger.info { "Checking for non heartbeating workloads." }
val oldestHeartbeatTime = timeProvider(ZoneOffset.UTC).minusSeconds(heartbeatTimeout.seconds)
val nonHeartbeatingWorkloads =
Expand All @@ -103,9 +120,14 @@ class WorkloadMonitor(
}

@Trace
@Instrument(
start = "WORKLOAD_MONITOR_RUN",
end = "WORKLOAD_MONITOR_DONE",
duration = "WORKLOAD_MONITOR_DURATION",
tags = [Tag(key = MetricTags.CRON_TYPE, value = CHECK_NON_SYNC_TIMEOUT)],
)
@Scheduled(fixedRate = "\${airbyte.workload.monitor.non-sync-age-check-rate}")
fun cancelRunningForTooLongNonSyncWorkloads() {
metricClient.count(OssMetricsRegistry.WORKLOAD_MONITOR_RUN, 1, MetricAttribute(MetricTags.CRON_TYPE, CHECK_NON_SYNC_TIMEOUT))
open fun cancelRunningForTooLongNonSyncWorkloads() {
logger.info { "Checking for workloads running for too long with timeout value $nonSyncWorkloadTimeout" }
val nonHeartbeatingWorkloads =
workloadApi.workloadListOldNonSync(
Expand All @@ -118,9 +140,14 @@ class WorkloadMonitor(
}

@Trace
@Instrument(
start = "WORKLOAD_MONITOR_RUN",
end = "WORKLOAD_MONITOR_DONE",
duration = "WORKLOAD_MONITOR_DURATION",
tags = [Tag(key = MetricTags.CRON_TYPE, value = CHECK_SYNC_TIMEOUT)],
)
@Scheduled(fixedRate = "\${airbyte.workload.monitor.sync-age-check-rate}")
fun cancelRunningForTooLongSyncWorkloads() {
metricClient.count(OssMetricsRegistry.WORKLOAD_MONITOR_RUN, 1, MetricAttribute(MetricTags.CRON_TYPE, CHECK_SYNC_TIMEOUT))
open fun cancelRunningForTooLongSyncWorkloads() {
logger.info { "Checking for sync workloads running for too long with timeout value $syncWorkloadTimeout" }
val nonHeartbeatingWorkloads =
workloadApi.workloadListOldSync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ class WorkloadMonitorTest {
)
}
verify(exactly = 1) {
metricClient.count(
OssMetricsRegistry.WORKLOAD_MONITOR_RUN,
1,
MetricAttribute(MetricTags.CRON_TYPE, "workload-monitor-start"),
)
metricClient.count(
OssMetricsRegistry.WORKLOADS_CANCEL,
1,
Expand Down Expand Up @@ -123,11 +118,6 @@ class WorkloadMonitorTest {
)
}
verify(exactly = 1) {
metricClient.count(
OssMetricsRegistry.WORKLOAD_MONITOR_RUN,
1,
MetricAttribute(MetricTags.CRON_TYPE, "workload-monitor-claim"),
)
metricClient.count(
OssMetricsRegistry.WORKLOADS_CANCEL,
1,
Expand Down Expand Up @@ -165,11 +155,6 @@ class WorkloadMonitorTest {
)
}
verify(exactly = 1) {
metricClient.count(
OssMetricsRegistry.WORKLOAD_MONITOR_RUN,
1,
MetricAttribute(MetricTags.CRON_TYPE, "workload-monitor-heartbeat"),
)
metricClient.count(
OssMetricsRegistry.WORKLOADS_CANCEL,
1,
Expand Down Expand Up @@ -207,11 +192,6 @@ class WorkloadMonitorTest {
)
}
verify(exactly = 1) {
metricClient.count(
OssMetricsRegistry.WORKLOAD_MONITOR_RUN,
1,
MetricAttribute(MetricTags.CRON_TYPE, "workload-monitor-non-sync-timeout"),
)
metricClient.count(
OssMetricsRegistry.WORKLOADS_CANCEL,
1,
Expand Down Expand Up @@ -249,11 +229,6 @@ class WorkloadMonitorTest {
)
}
verify(exactly = 1) {
metricClient.count(
OssMetricsRegistry.WORKLOAD_MONITOR_RUN,
1,
MetricAttribute(MetricTags.CRON_TYPE, "workload-monitor-sync-timeout"),
)
metricClient.count(
OssMetricsRegistry.WORKLOADS_CANCEL,
1,
Expand Down
45 changes: 25 additions & 20 deletions airbyte-metrics/metrics-lib/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,39 +1,44 @@
plugins {
id("io.airbyte.gradle.jvm.lib")
id("io.airbyte.gradle.publish")
kotlin("jvm")
kotlin("kapt")
}

dependencies {
implementation( project(":airbyte-commons"))
implementation( project(":airbyte-config:config-models"))
implementation( project(":airbyte-db:jooq"))
implementation( project(":airbyte-db:db-lib"))
kapt(libs.bundles.micronaut.annotation.processor)

implementation( libs.guava)
implementation( libs.google.cloud.storage)
implementation(project(":airbyte-commons"))
implementation(project(":airbyte-config:config-models"))
implementation(project(":airbyte-db:jooq"))
implementation(project(":airbyte-db:db-lib"))

implementation(libs.guava)
implementation(libs.google.cloud.storage)
compileOnly(libs.lombok)
annotationProcessor( libs.lombok)
annotationProcessor(libs.lombok)

implementation( libs.otel.semconv)
implementation( libs.otel.sdk)
implementation( libs.otel.sdk.testing)
implementation( libs.micrometer.statsd)
implementation( platform(libs.otel.bom))
implementation(libs.otel.semconv)
implementation(libs.otel.sdk)
implementation(libs.otel.sdk.testing)
implementation(libs.micrometer.statsd)
implementation(platform(libs.otel.bom))
implementation(("io.opentelemetry:opentelemetry-api"))
implementation(("io.opentelemetry:opentelemetry-sdk"))
implementation(("io.opentelemetry:opentelemetry-exporter-otlp"))

implementation( libs.java.dogstatsd.client)
implementation( libs.bundles.datadog)
implementation(libs.java.dogstatsd.client)
implementation(libs.bundles.datadog)

testImplementation( project(":airbyte-config:config-persistence"))
testImplementation( project(":airbyte-test-utils"))
testImplementation( libs.platform.testcontainers.postgresql)
testImplementation(project(":airbyte-config:config-persistence"))
testImplementation(project(":airbyte-test-utils"))
testImplementation(libs.platform.testcontainers.postgresql)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation( libs.bundles.junit)
testImplementation( libs.assertj.core)
testImplementation(libs.bundles.junit)
testImplementation(libs.assertj.core)
testImplementation(libs.mockk)
testImplementation((variantOf(libs.opentracing.util.test) { classifier("tests") }))

testImplementation( libs.junit.pioneer)
testImplementation(libs.junit.pioneer)

}
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ public enum OssMetricsRegistry implements MetricsRegistry {
WORKLOAD_MONITOR_RUN(MetricEmittingApps.CRON,
"workload_monitor_run",
"number of cron run for the workload_monitor"),
WORKLOAD_MONITOR_DONE(MetricEmittingApps.CRON,
"workload_monitor_done",
"number of cron completed run for the workload_monitor"),
WORKLOAD_MONITOR_DURATION(MetricEmittingApps.CRON,
"workload_monitor_duration",
"duration of a run of the workload_monitor"),
WORKLOADS_CANCEL(MetricEmittingApps.CRON,
"workload_cancel",
"number of workloads canceled"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.airbyte.metrics.annotations

import io.micronaut.aop.Around

annotation class Tag(val key: String, val value: String)

/**
* Set this annotation to a function to instrument metric emissions.
*
* As we are using [io.airbyte.metrics.lib.MetricsRegistry] to define the metrics, values provided for [start], [end] and [duration] need to be
* valid values from a [io.airbyte.metrics.lib.MetricsRegistry].
*
* For the [end] and [duration] metric, a `status` tag with values `ok` or `error` will be automatically added. Status being `error` if an
* exception is thrown, `ok` otherwise.
*
* @property start if not empty defines the name of metric to emit on start of the method.
* @property end if not empty defines the name metric to emit at the end of the method.
* @property duration if not empty defines the name metric to the duration of the method.
* @property tags defines a list of custom tags to be added to each metrics.
*/
@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION)
@Around
annotation class Instrument(
val start: String = "",
val end: String = "",
val duration: String = "",
val tags: Array<Tag> = [],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.airbyte.metrics.interceptors

import io.airbyte.metrics.annotations.Instrument
import io.airbyte.metrics.annotations.Tag
import io.airbyte.metrics.lib.MetricAttribute
import io.micronaut.aop.MethodInterceptor
import io.micronaut.aop.MethodInvocationContext
import io.micronaut.core.annotation.AnnotationValue
import kotlin.time.Duration
import kotlin.time.TimeSource

abstract class InstrumentInterceptorBase : MethodInterceptor<Any, Any> {
companion object {
const val START = "start"
const val END = "end"
const val DURATION = "duration"
const val TAGS = "tags"

const val SUCCESS_STATUS = "ok"
const val FAILURE_STATUS = "error"
}

abstract fun emitStartMetric(
startMetricName: String,
tags: Array<MetricAttribute>,
)

abstract fun emitEndMetric(
endMetricName: String,
success: Boolean,
tags: Array<MetricAttribute>,
)

abstract fun emitDurationMetric(
durationMetricName: String,
duration: Duration,
success: Boolean,
tags: Array<MetricAttribute>,
)

override fun intercept(context: MethodInvocationContext<Any, Any>): Any? {
val annotationValue = context.getAnnotation(Instrument::class.java)
return if (annotationValue != null) {
doIntercept(annotationValue, context)
} else {
context.proceed()
}
}

private fun doIntercept(
annotationValue: AnnotationValue<Instrument>,
context: MethodInvocationContext<Any, Any>,
): Any? {
val tags = readTags(annotationValue)

annotationValue.stringValue(START).ifPresent { emitStartMetric(it, tags) }

var success = true
val startTime = TimeSource.Monotonic.markNow()
try {
return context.proceed()
} catch (e: Throwable) {
success = false
throw e
} finally {
annotationValue.stringValue(END).ifPresent { emitEndMetric(it, success, tags) }
annotationValue.stringValue(DURATION).ifPresent { emitDurationMetric(it, startTime.elapsedNow(), success, tags) }
}
}

private fun readTags(annotationValue: AnnotationValue<Instrument>): Array<MetricAttribute> {
return annotationValue.getAnnotations<Tag>(TAGS)
.map { MetricAttribute(it.stringValue("key").orElse(""), it.stringValue("value").orElse("")) }
.toTypedArray()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.airbyte.metrics.interceptors

import io.airbyte.metrics.annotations.Instrument
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.MetricTags
import io.airbyte.metrics.lib.OssMetricsRegistry
import io.micronaut.aop.InterceptorBean
import jakarta.inject.Singleton
import kotlin.time.Duration
import kotlin.time.DurationUnit

@Singleton
@InterceptorBean(Instrument::class)
class MetricClientInstrumentInterceptor(private val metricClient: MetricClient) : InstrumentInterceptorBase() {
override fun emitStartMetric(
startMetricName: String,
tags: Array<MetricAttribute>,
) {
metricClient.count(OssMetricsRegistry.valueOf(startMetricName), 1, *tags)
}

override fun emitEndMetric(
endMetricName: String,
success: Boolean,
tags: Array<MetricAttribute>,
) {
metricClient.count(
OssMetricsRegistry.valueOf(endMetricName),
1,
MetricAttribute(MetricTags.STATUS, if (success) SUCCESS_STATUS else FAILURE_STATUS),
*tags,
)
}

override fun emitDurationMetric(
durationMetricName: String,
duration: Duration,
success: Boolean,
tags: Array<MetricAttribute>,
) {
metricClient.distribution(
OssMetricsRegistry.valueOf(durationMetricName),
duration.toDouble(DurationUnit.MILLISECONDS),
MetricAttribute(MetricTags.STATUS, if (success) SUCCESS_STATUS else FAILURE_STATUS),
*tags,
)
}
}
Loading

0 comments on commit 782f461

Please sign in to comment.