diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index 247f2ddfdbf..11cbf7abdd4 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -359,7 +359,10 @@ public enum OssMetricsRegistry implements MetricsRegistry { "Skip the line because of its size"), TOO_LONG_LINES_DISTRIBUTION(MetricEmittingApps.WORKER, "too_long_lines_distribution", - "Too long line distribution"); + "Too long line distribution"), + WORKLOAD_LAUNCHER_KUBE_ERROR(MetricEmittingApps.WORKLOAD_LAUNCHER, + "workload_kube_error", + "Number of kube error in the workload launcher"); private final MetricEmittingApp application; private final String metricName; diff --git a/airbyte-workload-launcher/src/main/kotlin/pods/OrchestratorPodLauncher.kt b/airbyte-workload-launcher/src/main/kotlin/pods/OrchestratorPodLauncher.kt index b0b1f48f835..c9c07283013 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pods/OrchestratorPodLauncher.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pods/OrchestratorPodLauncher.kt @@ -6,6 +6,9 @@ import io.airbyte.featureflag.ANONYMOUS import io.airbyte.featureflag.Connection import io.airbyte.featureflag.FeatureFlagClient import io.airbyte.featureflag.UseCustomK8sScheduler +import io.airbyte.metrics.lib.MetricAttribute +import io.airbyte.metrics.lib.MetricClient +import io.airbyte.metrics.lib.OssMetricsRegistry import io.airbyte.workers.process.KubePodInfo import io.airbyte.workers.process.KubePodProcess import io.airbyte.workers.process.KubePodResourceHelper @@ -34,7 +37,6 @@ import java.time.Duration import java.util.Objects import java.util.concurrent.TimeUnit import java.util.function.Predicate -import kotlin.collections.ArrayList private val logger = KotlinLogging.logger {} @@ -55,6 +57,7 @@ class OrchestratorPodLauncher( @Named("orchestratorEnvVars") private val envVars: List, @Named("orchestratorContainerPorts") private val containerPorts: List, @Named("orchestratorAnnotations") private val annotations: Map, + private val metricClient: MetricClient, ) { fun create( allLabels: Map, @@ -191,36 +194,51 @@ class OrchestratorPodLauncher( .build() // should only create after the kubernetes API creates the pod - return kubernetesClient.pods() - .inNamespace(kubePodInfo.namespace) - .resource(podToCreate) - .serverSideApply() + return runKubeCommand( + { + kubernetesClient.pods() + .inNamespace(kubePodInfo.namespace) + .resource(podToCreate) + .serverSideApply() + }, + "pod_create", + ) } fun waitForPodInit( labels: Map, waitDuration: Duration, ) { - kubernetesClient.pods() - .inNamespace(namespace) - .withLabels(labels) - .waitUntilCondition( - { p: Pod -> - ( - p.status.initContainerStatuses.isNotEmpty() && - p.status.initContainerStatuses[0].state.waiting == null + runKubeCommand( + { + kubernetesClient.pods() + .inNamespace(namespace) + .withLabels(labels) + .waitUntilCondition( + { p: Pod -> + ( + p.status.initContainerStatuses.isNotEmpty() && + p.status.initContainerStatuses[0].state.waiting == null + ) + }, + waitDuration.toMinutes(), + TimeUnit.MINUTES, ) - }, - waitDuration.toMinutes(), - TimeUnit.MINUTES, - ) + }, + "wait", + ) val pods = - kubernetesClient.pods() - .inNamespace(namespace) - .withLabels(labels) - .list() - .items + runKubeCommand( + { + kubernetesClient.pods() + .inNamespace(namespace) + .withLabels(labels) + .list() + .items + }, + "list", + ) if (pods.isEmpty()) { throw RuntimeException("No pods found for labels: $labels. Nothing to wait for.") @@ -244,17 +262,22 @@ class OrchestratorPodLauncher( labels: Map, waitDuration: Duration, ) { - kubernetesClient.pods() - .inNamespace(namespace) - .withLabels(labels) - .waitUntilCondition( - { p: Pod? -> - Objects.nonNull(p) && - (Readiness.getInstance().isReady(p) || KubePodResourceHelper.isTerminal(p)) - }, - waitDuration.toMinutes(), - TimeUnit.MINUTES, - ) + runKubeCommand( + { + kubernetesClient.pods() + .inNamespace(namespace) + .withLabels(labels) + .waitUntilCondition( + { p: Pod? -> + Objects.nonNull(p) && + (Readiness.getInstance().isReady(p) || KubePodResourceHelper.isTerminal(p)) + }, + waitDuration.toMinutes(), + TimeUnit.MINUTES, + ) + }, + "wait", + ) } fun copyFilesToKubeConfigVolumeMain( @@ -284,7 +307,13 @@ class OrchestratorPodLauncher( containerPath, KubePodProcess.INIT_CONTAINER_NAME, ) - proc = Runtime.getRuntime().exec(command) + proc = + runKubeCommand( + { + Runtime.getRuntime().exec(command) + }, + "kubectl_cp", + ) val exitCode = proc.waitFor() if (exitCode != 0) { throw IOException("kubectl cp failed with exit code $exitCode") @@ -302,21 +331,26 @@ class OrchestratorPodLauncher( fun podsExist(labels: Map): Boolean { try { - return kubernetesClient.pods() - .inNamespace(namespace) - .withLabels(labels) - .list() - .items - .stream() - .filter( - Predicate { kubePod: Pod? -> - !KubePodResourceHelper.isTerminal( - kubePod, + return runKubeCommand( + { + kubernetesClient.pods() + .inNamespace(namespace) + .withLabels(labels) + .list() + .items + .stream() + .filter( + Predicate { kubePod: Pod? -> + !KubePodResourceHelper.isTerminal( + kubePod, + ) + }, ) - }, - ) - .findAny() - .isPresent + .findAny() + .isPresent + }, + "list", + ) } catch (e: Exception) { logger.warn(e) { "Could not find pods running for $labels, presuming no pods are running" } return false @@ -324,17 +358,37 @@ class OrchestratorPodLauncher( } fun deletePods(labels: Map): List { - return kubernetesClient.pods() - .inNamespace(namespace) - .withLabels(labels) - .list() - .items - .flatMap { p -> + return runKubeCommand( + { kubernetesClient.pods() .inNamespace(namespace) - .resource(p) - .withPropagationPolicy(DeletionPropagation.FOREGROUND) - .delete() - } + .withLabels(labels) + .list() + .items + .flatMap { p -> + kubernetesClient.pods() + .inNamespace(namespace) + .resource(p) + .withPropagationPolicy(DeletionPropagation.FOREGROUND) + .delete() + } + }, + "delete", + ) + } + + private fun runKubeCommand( + kubeCommand: () -> T, + commandName: String, + ): T { + try { + return kubeCommand() + } catch (e: Exception) { + val attributes: List = listOf(MetricAttribute("operation", commandName)) + val attributesArray = attributes.toTypedArray() + metricClient.count(OssMetricsRegistry.WORKLOAD_LAUNCHER_KUBE_ERROR, 1, *attributesArray) + + throw e + } } } diff --git a/airbyte-workload-launcher/src/test/kotlin/pods/OrchestratorPodLauncherTest.kt b/airbyte-workload-launcher/src/test/kotlin/pods/OrchestratorPodLauncherTest.kt new file mode 100644 index 00000000000..465e60eaf6d --- /dev/null +++ b/airbyte-workload-launcher/src/test/kotlin/pods/OrchestratorPodLauncherTest.kt @@ -0,0 +1,124 @@ +package pods + +import io.airbyte.config.ResourceRequirements +import io.airbyte.featureflag.FeatureFlagClient +import io.airbyte.metrics.lib.MetricAttribute +import io.airbyte.metrics.lib.MetricClient +import io.airbyte.metrics.lib.OssMetricsRegistry +import io.airbyte.workers.process.KubeContainerInfo +import io.airbyte.workers.process.KubePodInfo +import io.airbyte.workload.launcher.pods.OrchestratorPodLauncher +import io.fabric8.kubernetes.client.KubernetesClient +import io.mockk.every +import io.mockk.impl.annotations.MockK +import io.mockk.junit5.MockKExtension +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.extension.ExtendWith +import java.lang.IllegalStateException +import java.time.Duration + +@ExtendWith(MockKExtension::class) +class OrchestratorPodLauncherTest { + @MockK + private lateinit var kubernetesClient: KubernetesClient + + @MockK + private lateinit var featureFlagClient: FeatureFlagClient + + @MockK + private lateinit var metricClient: MetricClient + + private lateinit var orchestratorPodLauncher: OrchestratorPodLauncher + + @BeforeEach + fun setup() { + orchestratorPodLauncher = + OrchestratorPodLauncher( + kubernetesClient, + featureFlagClient, + "", + "", + "", + "", + "", + "", + "", + listOf(), + listOf(), + mapOf(), + metricClient, + ) + + every { featureFlagClient.stringVariation(any(), any()) } returns "" + every { kubernetesClient.pods() } throws IllegalStateException() + every { metricClient.count(any(), any(), any()) } returns Unit + } + + @Test + fun `test fail to create pod`() { + assertThrows { + orchestratorPodLauncher.create( + mapOf(), + ResourceRequirements(), + mapOf(), + KubePodInfo("", "", KubeContainerInfo("", "")), + ) + } + + checkMetricSend("pod_create") + } + + @Test + fun `test fail to wait for pod init`() { + assertThrows { + orchestratorPodLauncher.waitForPodInit( + mapOf(), + Duration.ZERO, + ) + } + + checkMetricSend("wait") + } + + @Test + fun `test fail to wait for pod ready or terminal`() { + assertThrows { + orchestratorPodLauncher.waitForPodReadyOrTerminal( + mapOf(), + Duration.ZERO, + ) + } + + checkMetricSend("wait") + } + + @Test + fun `test fail to check if pod exist`() { + assertFalse(orchestratorPodLauncher.podsExist(mapOf())) + + checkMetricSend("list") + } + + @Test + fun `test fail to delete pod`() { + assertThrows { + orchestratorPodLauncher.deletePods( + mapOf(), + ) + } + + checkMetricSend("delete") + } + + private fun checkMetricSend(tag: String) { + val attributes: List = listOf(MetricAttribute("operation", tag)) + val attributesArray = attributes.toTypedArray() + verify { + metricClient.count(OssMetricsRegistry.WORKLOAD_LAUNCHER_KUBE_ERROR, 1, *attributesArray) + } + } +}