Skip to content

Commit

Permalink
revert: refactor: use structured encoder in cloud appender (#14882)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Dec 20, 2024
1 parent 7ba7fb6 commit b6fcb43
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ package io.airbyte.commons.logging.logback

import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase
import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.airbyte.commons.envvar.EnvVar
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.logging.LogEvents
import io.airbyte.commons.logging.StackTraceElementSerializer
import io.airbyte.commons.logging.toLogEvent
import io.airbyte.commons.storage.AzureStorageClient
import io.airbyte.commons.storage.AzureStorageConfig
import io.airbyte.commons.storage.DocumentType
Expand All @@ -29,6 +34,8 @@ import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

private val objectMapper = MoreMappers.initMapper()

/**
* Shared executor service used to reduce the number of threads created to handle
* uploading log data to remote storage.
Expand Down Expand Up @@ -81,23 +88,22 @@ class AirbyteCloudStorageAppender(
) : AppenderBase<ILoggingEvent>() {
private val buffer = LinkedBlockingQueue<ILoggingEvent>()
private var currentStorageId: String = createFileId(baseId = baseStorageId)
private val encoder = AirbyteLogEventEncoder()
private val uploadLock = Any()

override fun start() {
super.start()
executorService.scheduleAtFixedRate(this::upload, period, period, unit)
encoder.start()
val structuredLogEventModule = SimpleModule()
structuredLogEventModule.addSerializer(StackTraceElement::class.java, StackTraceElementSerializer())
objectMapper.registerModule(structuredLogEventModule)
}

override fun stop() {
try {
super.stop()
encoder.stop()
} finally {
// Do one final upload attempt to ensure that all logs are published
upload()
executorService.shutdownNow()
}
}

Expand All @@ -111,7 +117,7 @@ class AirbyteCloudStorageAppender(
buffer.drainTo(events)

if (events.isNotEmpty()) {
val document = encoder.bulkEncode(loggingEvents = events)
val document = objectMapper.writeValueAsString(LogEvents(events = events.map(ILoggingEvent::toLogEvent)))
storageClient.write(id = currentStorageId, document = document)

// Move to next file to avoid overwriting in log storage that doesn't support append mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.encoder.EncoderBase
import com.fasterxml.jackson.databind.module.SimpleModule
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.logging.LogEvents
import io.airbyte.commons.logging.StackTraceElementSerializer
import io.airbyte.commons.logging.toLogEvent

Expand All @@ -25,16 +24,6 @@ class AirbyteLogEventEncoder : EncoderBase<ILoggingEvent>() {

override fun footerBytes(): ByteArray = EMPTY_BYTES

/**
* Converts the list of [ILoggingEvent] events into a [io.airbyte.commons.logging.LogEvents] document.
*
* @param loggingEvents A list of [ILoggingEvent] events.
* @return A JSON string representation of a [io.airbyte.commons.logging.LogEvents] document containing
* a structured log event for each Logback logging event.
*/
fun bulkEncode(loggingEvents: List<ILoggingEvent>): String =
objectMapper.writeValueAsString(LogEvents(events = loggingEvents.map(ILoggingEvent::toLogEvent)))

override fun encode(loggingEvent: ILoggingEvent): ByteArray = objectMapper.writeValueAsBytes(loggingEvent.toLogEvent()) + NEW_LINE

override fun start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ internal class AirbyteLogEventEncoderTest {
@BeforeEach
fun setUp() {
encoder = AirbyteLogEventEncoder()
encoder.start()
}

@Test
Expand Down

0 comments on commit b6fcb43

Please sign in to comment.