From 81648a890d04bb7b4bb6876750f5a20f028ec573 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Wed, 10 Apr 2024 17:36:15 -0700 Subject: [PATCH] Remove WorkloadId from metric tags (#12061) --- airbyte-workload-launcher/src/main/kotlin/ClaimedProcessor.kt | 2 -- .../src/main/kotlin/pipeline/LaunchPipeline.kt | 2 -- .../src/main/kotlin/pipeline/handlers/FailureHandler.kt | 1 - .../src/main/kotlin/pipeline/handlers/SuccessHandler.kt | 1 - .../src/main/kotlin/pipeline/stages/CheckStatusStage.kt | 2 -- .../src/main/kotlin/pipeline/stages/ClaimStage.kt | 3 --- .../src/main/kotlin/pipeline/stages/EnforceMutexStage.kt | 2 -- .../src/test/kotlin/pipeline/handlers/SuccessHandlerTest.kt | 2 -- 8 files changed, 15 deletions(-) diff --git a/airbyte-workload-launcher/src/main/kotlin/ClaimedProcessor.kt b/airbyte-workload-launcher/src/main/kotlin/ClaimedProcessor.kt index ba1e4d154df..c7d255c91df 100644 --- a/airbyte-workload-launcher/src/main/kotlin/ClaimedProcessor.kt +++ b/airbyte-workload-launcher/src/main/kotlin/ClaimedProcessor.kt @@ -16,7 +16,6 @@ import io.airbyte.workload.launcher.metrics.CustomMetricPublisher import io.airbyte.workload.launcher.metrics.MeterFilterFactory import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.DATA_PLANE_ID_TAG import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.RESUME_CLAIMED_OPERATION_NAME -import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_ID_TAG import io.airbyte.workload.launcher.metrics.WorkloadLauncherMetricMetadata import io.airbyte.workload.launcher.model.toLauncherInput import io.airbyte.workload.launcher.pipeline.LaunchPipeline @@ -72,7 +71,6 @@ class ClaimedProcessor( private fun runOnClaimedScheduler(msg: LauncherInput): Mono { metricPublisher.count( WorkloadLauncherMetricMetadata.WORKLOAD_CLAIM_RESUMED, - MetricAttribute(WORKLOAD_ID_TAG, msg.workloadId), MetricAttribute(MeterFilterFactory.WORKLOAD_TYPE_TAG, msg.workloadType.toString()), ) return pipe.buildPipeline(msg) diff --git a/airbyte-workload-launcher/src/main/kotlin/pipeline/LaunchPipeline.kt b/airbyte-workload-launcher/src/main/kotlin/pipeline/LaunchPipeline.kt index 19674597d16..f54c1d9b9aa 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pipeline/LaunchPipeline.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pipeline/LaunchPipeline.kt @@ -45,7 +45,6 @@ class LaunchPipeline( val startTime = TimeSource.Monotonic.markNow() metricPublisher.count( WorkloadLauncherMetricMetadata.WORKLOAD_RECEIVED, - MetricAttribute(WORKLOAD_ID_TAG, msg.workloadId), MetricAttribute(MeterFilterFactory.WORKLOAD_TYPE_TAG, msg.workloadType.toString()), ) val disposable = @@ -55,7 +54,6 @@ class LaunchPipeline( metricPublisher.timer( WorkloadLauncherMetricMetadata.WORKLOAD_LAUNCH_DURATION, startTime.elapsedNow().toJavaDuration(), - MetricAttribute(MeterFilterFactory.WORKLOAD_TYPE_TAG, msg.workloadType.toString()), ) disposable.dispose() } diff --git a/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/FailureHandler.kt b/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/FailureHandler.kt index d6440a96c0d..f62667363a9 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/FailureHandler.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/FailureHandler.kt @@ -51,7 +51,6 @@ class FailureHandler( add(MetricAttribute(MeterFilterFactory.KUBE_POD_TYPE_TAG, clientEx.podType.toString())) } } - add(MetricAttribute(MeterFilterFactory.WORKLOAD_ID_TAG, io.msg.workloadId)) add(MetricAttribute(MeterFilterFactory.WORKLOAD_TYPE_TAG, io.msg.workloadType.toString())) add(MetricAttribute(MeterFilterFactory.STATUS_TAG, MeterFilterFactory.FAILURE_STATUS)) } diff --git a/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/SuccessHandler.kt b/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/SuccessHandler.kt index a64960af01d..19207bd3962 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/SuccessHandler.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pipeline/handlers/SuccessHandler.kt @@ -30,7 +30,6 @@ class SuccessHandler( withLoggingContext(io.logCtx) { metricPublisher.count( WorkloadLauncherMetricMetadata.WORKLOAD_PROCESSED, - MetricAttribute(MeterFilterFactory.WORKLOAD_ID_TAG, io.msg.workloadId), MetricAttribute(MeterFilterFactory.WORKLOAD_TYPE_TAG, io.msg.workloadType.toString()), MetricAttribute(MeterFilterFactory.STATUS_TAG, MeterFilterFactory.SUCCESS_STATUS), ) diff --git a/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/CheckStatusStage.kt b/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/CheckStatusStage.kt index 066b0f1b667..05ca9caadee 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/CheckStatusStage.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/CheckStatusStage.kt @@ -6,7 +6,6 @@ import io.airbyte.metrics.annotations.Tag import io.airbyte.metrics.lib.MetricAttribute import io.airbyte.workload.launcher.metrics.CustomMetricPublisher import io.airbyte.workload.launcher.metrics.MeterFilterFactory -import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_ID_TAG import io.airbyte.workload.launcher.metrics.WorkloadLauncherMetricMetadata import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStage import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStageIO @@ -49,7 +48,6 @@ open class CheckStatusStage( } customMetricPublisher.count( WorkloadLauncherMetricMetadata.WORKLOAD_ALREADY_RUNNING, - MetricAttribute(WORKLOAD_ID_TAG, input.msg.workloadId), MetricAttribute(MeterFilterFactory.WORKLOAD_TYPE_TAG, input.msg.workloadType.toString()), ) diff --git a/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/ClaimStage.kt b/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/ClaimStage.kt index 4f56ca9fa78..5274dc9d23f 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/ClaimStage.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/ClaimStage.kt @@ -11,7 +11,6 @@ import io.airbyte.metrics.lib.MetricAttribute import io.airbyte.workload.launcher.client.WorkloadApiClient import io.airbyte.workload.launcher.metrics.CustomMetricPublisher import io.airbyte.workload.launcher.metrics.MeterFilterFactory -import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_ID_TAG import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_TYPE_TAG import io.airbyte.workload.launcher.metrics.WorkloadLauncherMetricMetadata import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStage @@ -52,7 +51,6 @@ open class ClaimStage( if (!claimed) { metricPublisher.count( WorkloadLauncherMetricMetadata.WORKLOAD_NOT_CLAIMED, - MetricAttribute(WORKLOAD_ID_TAG, input.msg.workloadId), MetricAttribute(WORKLOAD_TYPE_TAG, input.msg.workloadType.toString()), ) logger.info { "Workload not claimed. Setting SKIP flag to true." } @@ -63,7 +61,6 @@ open class ClaimStage( metricPublisher.count( WorkloadLauncherMetricMetadata.WORKLOAD_CLAIMED, - MetricAttribute(WORKLOAD_ID_TAG, input.msg.workloadId), MetricAttribute(WORKLOAD_TYPE_TAG, input.msg.workloadType.toString()), ) return input diff --git a/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/EnforceMutexStage.kt b/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/EnforceMutexStage.kt index c5852151734..6f15b90994f 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/EnforceMutexStage.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pipeline/stages/EnforceMutexStage.kt @@ -7,7 +7,6 @@ import io.airbyte.metrics.lib.MetricAttribute import io.airbyte.workload.launcher.metrics.CustomMetricPublisher import io.airbyte.workload.launcher.metrics.MeterFilterFactory import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.MUTEX_KEY_TAG -import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_ID_TAG import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_TYPE_TAG import io.airbyte.workload.launcher.metrics.WorkloadLauncherMetricMetadata import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStage @@ -59,7 +58,6 @@ open class EnforceMutexStage( logger.info { "Existing pods for mutex key: $key deleted." } metricPublisher.count( WorkloadLauncherMetricMetadata.PODS_DELETED_FOR_MUTEX_KEY, - MetricAttribute(WORKLOAD_ID_TAG, input.msg.workloadId), MetricAttribute(WORKLOAD_TYPE_TAG, input.msg.workloadType.toString()), MetricAttribute(MUTEX_KEY_TAG, key), ) diff --git a/airbyte-workload-launcher/src/test/kotlin/pipeline/handlers/SuccessHandlerTest.kt b/airbyte-workload-launcher/src/test/kotlin/pipeline/handlers/SuccessHandlerTest.kt index c94e6c35870..ad92d2dca89 100644 --- a/airbyte-workload-launcher/src/test/kotlin/pipeline/handlers/SuccessHandlerTest.kt +++ b/airbyte-workload-launcher/src/test/kotlin/pipeline/handlers/SuccessHandlerTest.kt @@ -34,7 +34,6 @@ class SuccessHandlerTest { any(), any(), any(), - any(), ) } returns Unit @@ -69,7 +68,6 @@ class SuccessHandlerTest { any(), any(), any(), - any(), ) } returns Unit