Skip to content

Commit

Permalink
Send metric when kube fails (#10190)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Dec 6, 2023
1 parent 049d598 commit 85fd659
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"Skip the line because of its size"),
TOO_LONG_LINES_DISTRIBUTION(MetricEmittingApps.WORKER,
"too_long_lines_distribution",
"Too long line distribution");
"Too long line distribution"),
WORKLOAD_LAUNCHER_KUBE_ERROR(MetricEmittingApps.WORKLOAD_LAUNCHER,
"workload_kube_error",
"Number of kube error in the workload launcher");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import io.airbyte.featureflag.ANONYMOUS
import io.airbyte.featureflag.Connection
import io.airbyte.featureflag.FeatureFlagClient
import io.airbyte.featureflag.UseCustomK8sScheduler
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.OssMetricsRegistry
import io.airbyte.workers.process.KubePodInfo
import io.airbyte.workers.process.KubePodProcess
import io.airbyte.workers.process.KubePodResourceHelper
Expand Down Expand Up @@ -34,7 +37,6 @@ import java.time.Duration
import java.util.Objects
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
import kotlin.collections.ArrayList

private val logger = KotlinLogging.logger {}

Expand All @@ -55,6 +57,7 @@ class OrchestratorPodLauncher(
@Named("orchestratorEnvVars") private val envVars: List<EnvVar>,
@Named("orchestratorContainerPorts") private val containerPorts: List<ContainerPort>,
@Named("orchestratorAnnotations") private val annotations: Map<String, String>,
private val metricClient: MetricClient,
) {
fun create(
allLabels: Map<String, String>,
Expand Down Expand Up @@ -191,36 +194,51 @@ class OrchestratorPodLauncher(
.build()

// should only create after the kubernetes API creates the pod
return kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace)
.resource(podToCreate)
.serverSideApply()
return runKubeCommand(
{
kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace)
.resource(podToCreate)
.serverSideApply()
},
"pod_create",
)
}

fun waitForPodInit(
labels: Map<String, String>,
waitDuration: Duration,
) {
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.waitUntilCondition(
{ p: Pod ->
(
p.status.initContainerStatuses.isNotEmpty() &&
p.status.initContainerStatuses[0].state.waiting == null
runKubeCommand(
{
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.waitUntilCondition(
{ p: Pod ->
(
p.status.initContainerStatuses.isNotEmpty() &&
p.status.initContainerStatuses[0].state.waiting == null
)
},
waitDuration.toMinutes(),
TimeUnit.MINUTES,
)
},
waitDuration.toMinutes(),
TimeUnit.MINUTES,
)
},
"wait",
)

val pods =
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.list()
.items
runKubeCommand(
{
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.list()
.items
},
"list",
)

if (pods.isEmpty()) {
throw RuntimeException("No pods found for labels: $labels. Nothing to wait for.")
Expand All @@ -244,17 +262,22 @@ class OrchestratorPodLauncher(
labels: Map<String, String>,
waitDuration: Duration,
) {
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.waitUntilCondition(
{ p: Pod? ->
Objects.nonNull(p) &&
(Readiness.getInstance().isReady(p) || KubePodResourceHelper.isTerminal(p))
},
waitDuration.toMinutes(),
TimeUnit.MINUTES,
)
runKubeCommand(
{
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.waitUntilCondition(
{ p: Pod? ->
Objects.nonNull(p) &&
(Readiness.getInstance().isReady(p) || KubePodResourceHelper.isTerminal(p))
},
waitDuration.toMinutes(),
TimeUnit.MINUTES,
)
},
"wait",
)
}

fun copyFilesToKubeConfigVolumeMain(
Expand Down Expand Up @@ -284,7 +307,13 @@ class OrchestratorPodLauncher(
containerPath,
KubePodProcess.INIT_CONTAINER_NAME,
)
proc = Runtime.getRuntime().exec(command)
proc =
runKubeCommand(
{
Runtime.getRuntime().exec(command)
},
"kubectl_cp",
)
val exitCode = proc.waitFor()
if (exitCode != 0) {
throw IOException("kubectl cp failed with exit code $exitCode")
Expand All @@ -302,39 +331,64 @@ class OrchestratorPodLauncher(

fun podsExist(labels: Map<String, String>): Boolean {
try {
return kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.list()
.items
.stream()
.filter(
Predicate<Pod> { kubePod: Pod? ->
!KubePodResourceHelper.isTerminal(
kubePod,
return runKubeCommand(
{
kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.list()
.items
.stream()
.filter(
Predicate<Pod> { kubePod: Pod? ->
!KubePodResourceHelper.isTerminal(
kubePod,
)
},
)
},
)
.findAny()
.isPresent
.findAny()
.isPresent
},
"list",
)
} catch (e: Exception) {
logger.warn(e) { "Could not find pods running for $labels, presuming no pods are running" }
return false
}
}

fun deletePods(labels: Map<String, String>): List<StatusDetails> {
return kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(labels)
.list()
.items
.flatMap { p ->
return runKubeCommand(
{
kubernetesClient.pods()
.inNamespace(namespace)
.resource(p)
.withPropagationPolicy(DeletionPropagation.FOREGROUND)
.delete()
}
.withLabels(labels)
.list()
.items
.flatMap { p ->
kubernetesClient.pods()
.inNamespace(namespace)
.resource(p)
.withPropagationPolicy(DeletionPropagation.FOREGROUND)
.delete()
}
},
"delete",
)
}

/**
 * Executes [kubeCommand] and hands back whatever it produces.
 *
 * When the command throws an [Exception], one WORKLOAD_LAUNCHER_KUBE_ERROR count is sent to the
 * metric client, tagged with an "operation" attribute whose value is [commandName], and the very
 * same exception is then rethrown to the caller.
 *
 * @param kubeCommand the call to execute.
 * @param commandName value of the "operation" attribute attached to the error metric.
 * @return the value returned by [kubeCommand].
 */
private fun <T> runKubeCommand(
  kubeCommand: () -> T,
  commandName: String,
): T =
  try {
    kubeCommand()
  } catch (failure: Exception) {
    // Record the failure under its operation name; the exception itself is not handled here.
    val operationTag = MetricAttribute("operation", commandName)
    metricClient.count(OssMetricsRegistry.WORKLOAD_LAUNCHER_KUBE_ERROR, 1, operationTag)
    throw failure
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package pods

import io.airbyte.config.ResourceRequirements
import io.airbyte.featureflag.FeatureFlagClient
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.OssMetricsRegistry
import io.airbyte.workers.process.KubeContainerInfo
import io.airbyte.workers.process.KubePodInfo
import io.airbyte.workload.launcher.pods.OrchestratorPodLauncher
import io.fabric8.kubernetes.client.KubernetesClient
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.verify
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.extension.ExtendWith
import java.lang.IllegalStateException
import java.time.Duration

/**
 * Verifies that [OrchestratorPodLauncher] reports a kube error metric when a Kubernetes call fails.
 *
 * The mocked Kubernetes client is stubbed to throw [IllegalStateException] from `pods()`, so each
 * launcher operation exercised below fails as soon as it reaches that call.
 */
@ExtendWith(MockKExtension::class)
class OrchestratorPodLauncherTest {
  @MockK
  private lateinit var kubernetesClient: KubernetesClient

  @MockK
  private lateinit var featureFlagClient: FeatureFlagClient

  @MockK
  private lateinit var metricClient: MetricClient

  private lateinit var orchestratorPodLauncher: OrchestratorPodLauncher

  /** Stubs the mocks and builds a launcher with empty configuration values around them. */
  @BeforeEach
  fun setup() {
    every { featureFlagClient.stringVariation(any(), any()) } returns ""
    // Every pod operation goes through pods(), so this makes each kube command fail.
    every { kubernetesClient.pods() } throws IllegalStateException()
    every { metricClient.count(any(), any(), any()) } returns Unit

    orchestratorPodLauncher =
      OrchestratorPodLauncher(
        kubernetesClient,
        featureFlagClient,
        "",
        "",
        "",
        "",
        "",
        "",
        "",
        emptyList(),
        emptyList(),
        emptyMap(),
        metricClient,
      )
  }

  @Test
  fun `test fail to create pod`() {
    assertThrows<IllegalStateException> {
      val podInfo = KubePodInfo("", "", KubeContainerInfo("", ""))
      orchestratorPodLauncher.create(emptyMap(), ResourceRequirements(), emptyMap(), podInfo)
    }

    checkMetricSend("pod_create")
  }

  @Test
  fun `test fail to wait for pod init`() {
    assertThrows<IllegalStateException> {
      orchestratorPodLauncher.waitForPodInit(emptyMap(), Duration.ZERO)
    }

    checkMetricSend("wait")
  }

  @Test
  fun `test fail to wait for pod ready or terminal`() {
    assertThrows<IllegalStateException> {
      orchestratorPodLauncher.waitForPodReadyOrTerminal(emptyMap(), Duration.ZERO)
    }

    checkMetricSend("wait")
  }

  @Test
  fun `test fail to check if pod exist`() {
    // podsExist does not propagate the failure; the test expects false plus the metric.
    val anyPodFound = orchestratorPodLauncher.podsExist(emptyMap())
    assertFalse(anyPodFound)

    checkMetricSend("list")
  }

  @Test
  fun `test fail to delete pod`() {
    assertThrows<IllegalStateException> {
      orchestratorPodLauncher.deletePods(emptyMap())
    }

    checkMetricSend("delete")
  }

  /** Asserts that the kube error metric was counted once-valued with the "operation" attribute set to [tag]. */
  private fun checkMetricSend(tag: String) {
    val expectedAttribute = MetricAttribute("operation", tag)
    verify {
      metricClient.count(OssMetricsRegistry.WORKLOAD_LAUNCHER_KUBE_ERROR, 1, expectedAttribute)
    }
  }
}

0 comments on commit 85fd659

Please sign in to comment.