Skip to content

Commit

Permalink
Fix Launcher Startup Error (#10158)
Browse files Browse the repository at this point in the history
Fix issue preventing start up of launcher service
  • Loading branch information
jdpgrailsdev committed Dec 5, 2023
1 parent 7000ea4 commit 36b3d98
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workload.launcher.pipeline.handlers

import io.airbyte.metrics.lib.ApmTraceUtils
Expand All @@ -10,16 +14,19 @@ import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStageIO
import io.airbyte.workload.launcher.pipeline.stages.model.StageError
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import jakarta.inject.Named
import jakarta.inject.Singleton
import reactor.core.publisher.Mono
import java.util.Optional
import java.util.function.Function

private val logger = KotlinLogging.logger {}

@Singleton
class FailureHandler(
private val apiClient: WorkloadApiClient,
private val metricPublisher: CustomMetricPublisher,
private val logMsgTemplate: (String) -> String = { id -> "Pipeline aborted after error for workload: $id." },
@Named("logMsgTemplate") private val logMsgTemplate: Optional<Function<String, String>>,
) {
fun apply(
e: Throwable,
Expand All @@ -37,7 +44,7 @@ class FailureHandler(
WorkloadLauncherMetricMetadata.WORKLOAD_PROCESSED_UNSUCCESSFULLY,
MetricAttribute(MeterFilterFactory.WORKLOAD_ID_TAG, io.msg.workloadId),
)
logger.info { logMsgTemplate(io.msg.workloadId) }
logger.info { logMsgTemplate.orElse { id -> "Pipeline aborted after error for workload: $id." }.apply(io.msg.workloadId) }
}

return Mono.empty()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workload.launcher.pipeline.handlers

import io.airbyte.metrics.lib.MetricAttribute
Expand All @@ -7,22 +11,25 @@ import io.airbyte.workload.launcher.metrics.WorkloadLauncherMetricMetadata
import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStageIO
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.util.Optional
import java.util.function.Function

private val logger = KotlinLogging.logger {}

@Singleton
class SuccessHandler(
private val metricPublisher: CustomMetricPublisher,
private val logMsgTemplate: (String) -> String = { id: String -> "Pipeline completed for workload: $id." },
@Named("logMsgTemplate") private val logMsgTemplate: Optional<Function<String, String>>,
) {
fun accept(io: LaunchStageIO) {
withLoggingContext(io.logCtx) {
metricPublisher.count(
WorkloadLauncherMetricMetadata.WORKLOAD_PROCESSED_SUCCESSFULLY,
MetricAttribute(MeterFilterFactory.WORKLOAD_ID_TAG, io.msg.workloadId),
)
logger.info { logMsgTemplate(io.msg.workloadId) }
logger.info { logMsgTemplate.orElse { id: String -> "Pipeline completed for workload: $id." }.apply(io.msg.workloadId) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.junit.jupiter.params.provider.MethodSource
import org.junit.jupiter.params.provider.ValueSource
import java.io.File
import java.nio.file.Files
import java.util.Optional
import java.util.function.Function
import java.util.stream.Stream
import kotlin.io.path.Path

Expand Down Expand Up @@ -114,8 +116,8 @@ class LogPathTest {
mockk {
every { reportFailure(any()) } returns Unit
}
private val successHandler = SuccessHandler(metricPublisher) { id -> "TEST: success. Id: $id." }
private val failureHandler = FailureHandler(mockApiClient, metricPublisher) { id -> "TEST: failure. Id: $id." }
private val successHandler = SuccessHandler(metricPublisher, Optional.of(Function { id -> "TEST: success. Id: $id." }))
private val failureHandler = FailureHandler(mockApiClient, metricPublisher, Optional.of(Function { id -> "TEST: failure. Id: $id." }))

private const val TEST_LOG_PREFIX = "TEST"

Expand Down

0 comments on commit 36b3d98

Please sign in to comment.