From b6fcb43225dda6fa96145ab67c0c87f8f288bdbd Mon Sep 17 00:00:00 2001 From: Benoit Moriceau <benoit@airbyte.io> Date: Thu, 19 Dec 2024 16:18:37 -0800 Subject: [PATCH] revert: refactor: use structured encoder in cloud appender (#14882) --- .../logback/AirbyteCloudStorageAppender.kt | 16 +++++++++++----- .../logging/logback/AirbyteLogEventEncoder.kt | 11 ----------- .../logback/AirbyteLogEventEncoderTest.kt | 1 - 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteCloudStorageAppender.kt b/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteCloudStorageAppender.kt index 10daa79306..6c0c2a5033 100644 --- a/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteCloudStorageAppender.kt +++ b/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteCloudStorageAppender.kt @@ -6,8 +6,13 @@ package io.airbyte.commons.logging.logback import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.AppenderBase +import com.fasterxml.jackson.databind.module.SimpleModule import com.google.common.util.concurrent.ThreadFactoryBuilder import io.airbyte.commons.envvar.EnvVar +import io.airbyte.commons.jackson.MoreMappers +import io.airbyte.commons.logging.LogEvents +import io.airbyte.commons.logging.StackTraceElementSerializer +import io.airbyte.commons.logging.toLogEvent import io.airbyte.commons.storage.AzureStorageClient import io.airbyte.commons.storage.AzureStorageConfig import io.airbyte.commons.storage.DocumentType @@ -29,6 +34,8 @@ import java.util.concurrent.Executors import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit +private val objectMapper = MoreMappers.initMapper() + /** * Shared executor service used to reduce the number of threads created to handle * uploading log data to remote storage. @@ -81,23 +88,22 @@ class AirbyteCloudStorageAppender( ) : AppenderBase<ILoggingEvent>() { private val buffer = LinkedBlockingQueue<ILoggingEvent>() private var currentStorageId: String = createFileId(baseId = baseStorageId) - private val encoder = AirbyteLogEventEncoder() private val uploadLock = Any() override fun start() { super.start() executorService.scheduleAtFixedRate(this::upload, period, period, unit) - encoder.start() + val structuredLogEventModule = SimpleModule() + structuredLogEventModule.addSerializer(StackTraceElement::class.java, StackTraceElementSerializer()) + objectMapper.registerModule(structuredLogEventModule) } override fun stop() { try { super.stop() - encoder.stop() } finally { // Do one final upload attempt to ensure that all logs are published upload() - executorService.shutdownNow() } } @@ -111,7 +117,7 @@ class AirbyteCloudStorageAppender( buffer.drainTo(events) if (events.isNotEmpty()) { - val document = encoder.bulkEncode(loggingEvents = events) + val document = objectMapper.writeValueAsString(LogEvents(events = events.map(ILoggingEvent::toLogEvent))) storageClient.write(id = currentStorageId, document = document) // Move to next file to avoid overwriting in log storage that doesn't support append mode diff --git a/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoder.kt b/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoder.kt index 584c501498..0947fdfce7 100644 --- a/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoder.kt +++ b/airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoder.kt @@ -8,7 +8,6 @@ import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.encoder.EncoderBase import com.fasterxml.jackson.databind.module.SimpleModule import io.airbyte.commons.jackson.MoreMappers -import io.airbyte.commons.logging.LogEvents import io.airbyte.commons.logging.StackTraceElementSerializer import io.airbyte.commons.logging.toLogEvent @@ -25,16 +24,6 @@ class AirbyteLogEventEncoder : EncoderBase<ILoggingEvent>() { override fun footerBytes(): ByteArray = EMPTY_BYTES - /** - * Converts the list of [ILoggingEvent] events into a [io.airbyte.commons.logging.LogEvents] document. - * - * @param loggingEvents A list of [ILoggingEvent] events. - * @return A JSON string representation of a [io.airbyte.commons.logging.LogEvents] document containing - * a structured log event for each Logback logging event. - */ - fun bulkEncode(loggingEvents: List<ILoggingEvent>): String = - objectMapper.writeValueAsString(LogEvents(events = loggingEvents.map(ILoggingEvent::toLogEvent))) - override fun encode(loggingEvent: ILoggingEvent): ByteArray = objectMapper.writeValueAsBytes(loggingEvent.toLogEvent()) + NEW_LINE override fun start() { diff --git a/airbyte-commons-storage/src/test/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoderTest.kt b/airbyte-commons-storage/src/test/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoderTest.kt index 7386fab495..00835d19c3 100644 --- a/airbyte-commons-storage/src/test/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoderTest.kt +++ b/airbyte-commons-storage/src/test/kotlin/io/airbyte/commons/logging/logback/AirbyteLogEventEncoderTest.kt @@ -22,7 +22,6 @@ internal class AirbyteLogEventEncoderTest { @BeforeEach fun setUp() { encoder = AirbyteLogEventEncoder() - encoder.start() } @Test