From 06b94bd7c6e75a658bc8d3e57303da58b57aff64 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Mon, 16 Dec 2024 14:57:11 -0800 Subject: [PATCH 1/9] tmp tmp Metadata Tets Tets Rm logs Fix rm extra call only use queue part size Fix build Format bump sdk version Increase the retry strategy Change config Dedicated config for file transfer Format Update test Format --- .../load/command/DestinationConfiguration.kt | 1 + .../cdk/load/config/SyncBeanFactory.kt | 10 ++ .../cdk/load/task/DestinationTaskLauncher.kt | 29 +++--- .../load/task/implementor/ProcessBatchTask.kt | 3 +- .../load/task/implementor/ProcessFileTask.kt | 46 +++++---- .../load/task/internal/InputConsumerTask.kt | 16 +++- .../io/airbyte/cdk/load/write/StreamLoader.kt | 11 ++- .../task/implementor/ProcessFileTaskTest.kt | 49 ---------- .../ObjectStorageFormattingWriter.kt | 18 ++-- .../object_storage/FilePartAccumulator.kt | 72 ++++++++++++++ .../ObjectStorageStreamLoaderFactory.kt | 38 ++------ .../object_storage/FilePartAccumulatorTest.kt | 94 +++++++++++++++++++ .../ObjectStorageStreamLoaderTest.kt | 90 ------------------ .../bulk/toolkits/load-s3/build.gradle | 2 +- .../io/airbyte/cdk/load/file/s3/S3Client.kt | 1 + .../destination-s3-v2/metadata.yaml | 35 ++++--- .../destination/s3_v2/S3V2WriteTest.kt | 14 ++- 17 files changed, 303 insertions(+), 226 deletions(-) delete mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt delete mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt index 035d31159a6c..a36d0044c950 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt @@ -87,6 +87,7 @@ abstract class DestinationConfiguration : Configuration { open val numProcessRecordsWorkers: Int = 2 open val numProcessBatchWorkers: Int = 5 + open val numProcessBatchWorkersForFileTransfer: Int = 3 open val batchQueueDepth: Int = 10 companion object { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt index 380c7143baea..700fcc2c3bc2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.task.implementor.FileAggregateMessage +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Value @@ -79,4 +80,13 @@ class SyncBeanFactory { val channel = Channel>(config.batchQueueDepth) return MultiProducerChannel(config.numProcessRecordsWorkers.toLong(), channel, "batchQueue") } + + @Singleton + @Named("fileMessageQueue") + fun fileMessageQueue( + config: DestinationConfiguration, + ): MultiProducerChannel { + val channel = Channel(config.batchQueueDepth) + return MultiProducerChannel(1, channel, "fileMessageQueue") + } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 5e4fa1389bc7..0caef80100b9 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -10,9 +10,9 @@ import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.CheckpointMessageWrapped -import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationStreamEvent +import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.state.Reserved @@ -20,6 +20,7 @@ import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessFileTaskFactory @@ -37,6 +38,7 @@ import io.airbyte.cdk.load.util.setOnce import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Value +import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CancellationException @@ -50,8 +52,6 @@ interface DestinationTaskLauncher : TaskLauncher { suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>) suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) suspend fun handleTeardownComplete(success: Boolean = true) - suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile, index: Long) - suspend fun handleException(e: Exception) suspend fun handleFailStreamComplete(stream: DestinationStream.Descriptor, e: Exception) } @@ -129,6 +129,7 @@ class DefaultDestinationTaskLauncher( private val recordQueueSupplier: MessageQueueSupplier>, private val checkpointQueue: QueueWriter>, + @Named("fileMessageQueue") private val fileTransferQueue: MessageQueue ) : DestinationTaskLauncher { private val log = KotlinLogging.logger {} @@ -180,7 +181,8 @@ class DefaultDestinationTaskLauncher( inputFlow = inputFlow, recordQueueSupplier = recordQueueSupplier, checkpointQueue = checkpointQueue, - this, + fileTransferQueue = fileTransferQueue, + destinationTaskLauncher = this, ) enqueue(inputConsumerTask) @@ -209,6 +211,17 @@ class DefaultDestinationTaskLauncher( val task = processBatchTaskFactory.make(this) enqueue(task) } + } else { + repeat(config.numProcessRecordsWorkers) { + log.info { "Launching process file task $it" } + enqueue(processFileTaskFactory.make(this)) + } + + repeat(config.numProcessBatchWorkersForFileTransfer) { + log.info { "Launching process batch task $it" } + val task = processBatchTaskFactory.make(this) + enqueue(task) + } } // Start flush task @@ -284,14 +297,6 @@ class DefaultDestinationTaskLauncher( } } - override suspend fun handleFile( - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long - ) { - enqueue(processFileTaskFactory.make(this, stream, file, index)) - } - override suspend fun handleException(e: Exception) { catalog.streams .map { failStreamTaskFactory.make(this, e, it.descriptor) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt index 34b4d94a88a1..1d0e43d86242 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.KillableScope import io.airbyte.cdk.load.write.StreamLoader +import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton @@ -22,7 +23,7 @@ class DefaultProcessBatchTask( private val batchQueue: MultiProducerChannel>, private val taskLauncher: DestinationTaskLauncher ) : ProcessBatchTask { - + val log = KotlinLogging.logger {} override suspend fun execute() { batchQueue.consume().collect { batchEnvelope -> val streamLoader = syncManager.getOrAwaitStreamLoader(batchEnvelope.streamDescriptor) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt index 36801ce82156..c74a5412b235 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt @@ -4,44 +4,52 @@ package io.airbyte.cdk.load.task.implementor -import com.google.common.collect.Range import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MessageQueue +import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.util.use +import io.airbyte.cdk.load.write.BatchAccumulator import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named import jakarta.inject.Singleton +import java.util.concurrent.ConcurrentHashMap interface ProcessFileTask : ImplementorScope class DefaultProcessFileTask( - private val streamDescriptor: DestinationStream.Descriptor, - private val taskLauncher: DestinationTaskLauncher, private val syncManager: SyncManager, - private val file: DestinationFile, - private val index: Long, + private val taskLauncher: DestinationTaskLauncher, + private val inputQueue: MessageQueue, + private val outputQueue: MultiProducerChannel>, ) : ProcessFileTask { val log = KotlinLogging.logger {} + private val accumulators = ConcurrentHashMap() override suspend fun execute() { - val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) + outputQueue.use { + inputQueue.consume().collect { (streamDescriptor, file, index) -> + val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) - val batch = streamLoader.processFile(file) + val acc = + accumulators.getOrPut(streamDescriptor) { + streamLoader.createFileBatchAccumulator(outputQueue) + } - val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) - taskLauncher.handleNewBatch(streamDescriptor, wrapped) + acc.processFilePart(file, index) + } + } } } interface ProcessFileTaskFactory { fun make( taskLauncher: DestinationTaskLauncher, - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long, ): ProcessFileTask } @@ -49,13 +57,19 @@ interface ProcessFileTaskFactory { @Secondary class DefaultFileRecordsTaskFactory( private val syncManager: SyncManager, + @Named("fileMessageQueue") + private val fileTransferQueue: MessageQueue, + @Named("batchQueue") private val outputQueue: MultiProducerChannel>, ) : ProcessFileTaskFactory { override fun make( taskLauncher: DestinationTaskLauncher, - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long, ): ProcessFileTask { - return DefaultProcessFileTask(stream, taskLauncher, syncManager, file, index) + return DefaultProcessFileTask(syncManager, taskLauncher, fileTransferQueue, outputQueue) } } + +data class FileTransferQueueMessage( + val streamDescriptor: DestinationStream.Descriptor, + val file: DestinationFile, + val index: Long, +) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index 66da0212e0fe..e4526a3dc991 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -22,6 +22,7 @@ import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.GlobalCheckpoint import io.airbyte.cdk.load.message.GlobalCheckpointWrapped +import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.message.SimpleBatch @@ -34,9 +35,11 @@ import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.util.use import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named import jakarta.inject.Singleton interface InputConsumerTask : KillableScope @@ -61,6 +64,8 @@ class DefaultInputConsumerTask( private val checkpointQueue: QueueWriter>, private val syncManager: SyncManager, private val destinationTaskLauncher: DestinationTaskLauncher, + @Named("fileMessageQueue") + private val fileTransferQueue: MessageQueue, ) : InputConsumerTask { private val log = KotlinLogging.logger {} @@ -97,15 +102,17 @@ class DefaultInputConsumerTask( } is DestinationFile -> { val index = manager.countRecordIn() - destinationTaskLauncher.handleFile(stream, message, index) + // destinationTaskLauncher.handleFile(stream, message, index) + fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index)) } is DestinationFileStreamComplete -> { reserved.release() // safe because multiple calls conflate manager.markEndOfStream(true) + fileTransferQueue.close() val envelope = BatchEnvelope( SimpleBatch(Batch.State.COMPLETE), - streamDescriptor = message.stream + streamDescriptor = message.stream, ) destinationTaskLauncher.handleNewBatch(stream, envelope) } @@ -198,6 +205,7 @@ interface InputConsumerTaskFactory { MessageQueueSupplier>, checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, + fileTransferQueue: MessageQueue ): InputConsumerTask } @@ -212,6 +220,7 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : MessageQueueSupplier>, checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, + fileTransferQueue: MessageQueue, ): InputConsumerTask { return DefaultInputConsumerTask( catalog, @@ -219,7 +228,8 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : recordQueueSupplier, checkpointQueue, syncManager, - destinationTaskLauncher + destinationTaskLauncher, + fileTransferQueue, ) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt index 3fe495067434..515243d32e9f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt @@ -6,8 +6,10 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed @@ -48,8 +50,10 @@ interface StreamLoader : BatchAccumulator { suspend fun start() {} suspend fun createBatchAccumulator(): BatchAccumulator = this + suspend fun createFileBatchAccumulator( + outputQueue: MultiProducerChannel>, + ): BatchAccumulator = this - suspend fun processFile(file: DestinationFile): Batch suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE) suspend fun close(streamFailure: StreamProcessingFailed? = null) {} } @@ -63,4 +67,9 @@ interface BatchAccumulator { throw NotImplementedError( "processRecords must be implemented if createBatchAccumulator is overridden" ) + + suspend fun processFilePart(file: DestinationFile, index: Long): Unit = + throw NotImplementedError( + "processRecords must be implemented if createBatchAccumulator is overridden" + ) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt deleted file mode 100644 index 0c550b927c1e..000000000000 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.task.implementor - -import com.google.common.collect.Range -import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.BatchEnvelope -import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.SimpleBatch -import io.airbyte.cdk.load.state.SyncManager -import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.write.StreamLoader -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.mockk -import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Test - -class ProcessFileTaskTest { - private val stream: DestinationStream.Descriptor = - DestinationStream.Descriptor("namespace", "name") - private val taskLauncher: DestinationTaskLauncher = mockk(relaxed = true) - private val syncManager: SyncManager = mockk(relaxed = true) - private val file: DestinationFile = mockk(relaxed = true) - private val index = 1234L - - val defaultProcessFileTask = - DefaultProcessFileTask(stream, taskLauncher, syncManager, file, index) - - @Test - fun `the the file process task execution`() = runTest { - val mockedStreamLoader = mockk(relaxed = true) - coEvery { syncManager.getOrAwaitStreamLoader(stream) } returns mockedStreamLoader - val batch = SimpleBatch(Batch.State.COMPLETE) - coEvery { mockedStreamLoader.processFile(file) } returns batch - - defaultProcessFileTask.execute() - - coVerify { - taskLauncher.handleNewBatch( - stream, - BatchEnvelope(batch, Range.singleton(index), stream) - ) - } - } -} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt index c5417306a47b..0119b8bf0257 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt @@ -36,9 +36,9 @@ import java.io.Closeable import java.io.OutputStream import org.apache.avro.Schema -interface ObjectStorageFormattingWriter : Closeable { - fun accept(record: DestinationRecord) - fun flush() +abstract class ObjectStorageFormattingWriter : Closeable { + abstract fun accept(record: DestinationRecord) + abstract fun flush() } @Singleton @@ -51,6 +51,8 @@ class ObjectStorageFormattingWriterFactory( outputStream: OutputStream ): ObjectStorageFormattingWriter { val flatten = formatConfigProvider.objectStorageFormatConfiguration.rootLevelFlattening + // TODO: FileWriter + return when (formatConfigProvider.objectStorageFormatConfiguration) { is JsonFormatConfiguration -> JsonFormattingWriter(stream, outputStream, flatten) is AvroFormatConfiguration -> @@ -78,7 +80,7 @@ class JsonFormattingWriter( private val stream: DestinationStream, private val outputStream: OutputStream, private val rootLevelFlattening: Boolean, -) : ObjectStorageFormattingWriter { +) : ObjectStorageFormattingWriter() { override fun accept(record: DestinationRecord) { val data = @@ -100,7 +102,7 @@ class CSVFormattingWriter( private val stream: DestinationStream, outputStream: OutputStream, private val rootLevelFlattening: Boolean -) : ObjectStorageFormattingWriter { +) : ObjectStorageFormattingWriter() { private val finalSchema = stream.schema.withAirbyteMeta(rootLevelFlattening) private val printer = finalSchema.toCsvPrinterWithHeader(outputStream) @@ -124,7 +126,7 @@ class AvroFormattingWriter( outputStream: OutputStream, formatConfig: AvroFormatConfiguration, private val rootLevelFlattening: Boolean, -) : ObjectStorageFormattingWriter { +) : ObjectStorageFormattingWriter() { val log = KotlinLogging.logger {} private val pipeline = AvroMapperPipelineFactory().create(stream) @@ -157,7 +159,7 @@ class ParquetFormattingWriter( outputStream: OutputStream, formatConfig: ParquetFormatConfiguration, private val rootLevelFlattening: Boolean, -) : ObjectStorageFormattingWriter { +) : ObjectStorageFormattingWriter() { private val log = KotlinLogging.logger {} private val pipeline = ParquetMapperPipelineFactory().create(stream) @@ -206,7 +208,7 @@ class BufferedFormattingWriter( private val buffer: ByteArrayOutputStream, private val streamProcessor: StreamProcessor, private val wrappingBuffer: T -) : ObjectStorageFormattingWriter { +) : ObjectStorageFormattingWriter() { val bufferSize: Int get() = buffer.size() diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt new file mode 100644 index 000000000000..9fadf03c0c60 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write.object_storage + +import com.google.common.collect.Range +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.file.object_storage.PartFactory +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.object_storage.LoadablePart +import io.airbyte.cdk.load.write.BatchAccumulator +import java.io.File +import java.nio.file.Path + +class FilePartAccumulator( + private val pathFactory: ObjectStoragePathFactory, + private val stream: DestinationStream, + private val outputQueue: MultiProducerChannel>, +) : BatchAccumulator { + override suspend fun processFilePart(file: DestinationFile, index: Long) { + val key = + Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") + .toString() + + val part = + PartFactory( + key = key, + fileNumber = index, + ) + + val localFile = File(file.fileMessage.fileUrl) + val fileInputStream = localFile.inputStream() + + while (true) { + val bytePart = + ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + val read = fileInputStream.read(bytePart) + + if (read == -1) { + val filePart: ByteArray? = null + val batch = LoadablePart(part.nextPart(filePart, isFinal = true)) + handleFilePart(batch, stream.descriptor, index) + break + } else if (read < bytePart.size) { + val filePart: ByteArray = bytePart.copyOfRange(0, read) + val batch = LoadablePart(part.nextPart(filePart, isFinal = true)) + handleFilePart(batch, stream.descriptor, index) + break + } else { + val batch = LoadablePart(part.nextPart(bytePart, isFinal = false)) + handleFilePart(batch, stream.descriptor, index) + } + } + localFile.delete() + } + + private suspend fun handleFilePart( + batch: Batch, + streamDescriptor: DestinationStream.Descriptor, + index: Long, + ) { + + val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) + outputQueue.publish(wrapped) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 5f362fcffa49..0072cb57ce1b 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -15,7 +15,9 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.object_storage.* import io.airbyte.cdk.load.message.object_storage.LoadedObject import io.airbyte.cdk.load.message.object_storage.ObjectStorageBatch import io.airbyte.cdk.load.state.DestinationStateManager @@ -28,7 +30,6 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.io.File import java.io.OutputStream -import java.nio.file.Path import java.util.concurrent.atomic.AtomicLong @Singleton @@ -97,36 +98,9 @@ class ObjectStorageStreamLoader, U : OutputStream>( ) } - override suspend fun processFile(file: DestinationFile): Batch { - if (pathFactory.supportsStaging) { - throw IllegalStateException("Staging is not supported for files") - } - val fileUrl = file.fileMessage.fileUrl ?: "" - if (!File(fileUrl).exists()) { - log.error { "File does not exist: $fileUrl" } - throw IllegalStateException("File does not exist: $fileUrl") - } - val key = - Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") - .toString() - - val state = destinationStateManager.getState(stream) - state.addObject( - generationId = stream.generationId, - key = key, - partNumber = 0, - isStaging = false - ) - - val metadata = ObjectStorageDestinationState.metadataFor(stream) - val obj = - client.streamingUpload(key, metadata, streamProcessor = compressor) { outputStream -> - File(fileUrl).inputStream().use { it.copyTo(outputStream) } - } - val localFile = createFile(fileUrl) - localFile.delete() - return LoadedObject(remoteObject = obj, fileNumber = 0) - } + override suspend fun createFileBatchAccumulator( + outputQueue: MultiProducerChannel>, + ): BatchAccumulator = FilePartAccumulator(pathFactory, stream, outputQueue) @VisibleForTesting fun createFile(url: String) = File(url) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt new file mode 100644 index 000000000000..25d232bd3328 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write.object_storage + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MultiProducerChannel +import io.mockk.coVerify +import io.mockk.every +import io.mockk.mockk +import java.io.File +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class FilePartAccumulatorTest { + private val pathFactory: ObjectStoragePathFactory = mockk(relaxed = true) + private val stream: DestinationStream = mockk(relaxed = true) + private val outputQueue: MultiProducerChannel> = mockk(relaxed = true) + + private val filePartAccumulator = FilePartAccumulator(pathFactory, stream, outputQueue) + + private val fileRelativePath = "relativePath" + private val descriptor = DestinationStream.Descriptor("namespace", "name") + + @BeforeEach + fun init() { + every { stream.descriptor } returns descriptor + } + + @Test + fun testFilePartAccumulatorSmall() = runTest { + val finalDirectory = "finalDirectory" + every { pathFactory.getFinalDirectory(stream) } returns finalDirectory + val file = createFile(10) + val index = 21L + val fileMessage = createFileMessage(file) + + filePartAccumulator.processFilePart(fileMessage, index) + + coVerify(exactly = 1) { outputQueue.publish(any()) } + } + + @Test + fun testFilePartAccumulatorExactlyPartSize() = runTest { + val finalDirectory = "finalDirectory" + every { pathFactory.getFinalDirectory(stream) } returns finalDirectory + val file = createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + val index = 21L + val fileMessage = createFileMessage(file) + + filePartAccumulator.processFilePart(fileMessage, index) + + coVerify(exactly = 2) { outputQueue.publish(any()) } + } + + @Test + fun testFilePartAccumulatorBig() = runTest { + val finalDirectory = "finalDirectory" + every { pathFactory.getFinalDirectory(stream) } returns finalDirectory + val file = + createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt() + 1000) + val index = 21L + val fileMessage = createFileMessage(file) + + filePartAccumulator.processFilePart(fileMessage, index) + + coVerify(exactly = 2) { outputQueue.publish(any()) } + } + + private fun createFile(sizeInBytes: Int): File { + val file = File.createTempFile("test", ".txt") + val text = CharArray(sizeInBytes) { 'a' }.concatToString() + file.writeText(text) + return file + } + + private fun createFileMessage(file: File): DestinationFile { + return DestinationFile( + descriptor, + 0, + "", + DestinationFile.AirbyteRecordMessageFile( + fileUrl = file.absolutePath, + fileRelativePath = fileRelativePath, + ) + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt deleted file mode 100644 index 47631dd552c2..000000000000 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.write.object_storage - -import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.file.StreamProcessor -import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory -import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient -import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory -import io.airbyte.cdk.load.file.object_storage.RemoteObject -import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.object_storage.* -import io.airbyte.cdk.load.state.DestinationStateManager -import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk -import io.mockk.verify -import java.io.ByteArrayOutputStream -import java.io.File -import java.nio.file.Files -import java.nio.file.Path -import kotlin.test.assertEquals -import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Test - -class ObjectStorageStreamLoaderTest { - private val stream: DestinationStream = mockk(relaxed = true) - private val client: ObjectStorageClient> = mockk(relaxed = true) - private val compressor: StreamProcessor = mockk(relaxed = true) - private val pathFactory: ObjectStoragePathFactory = mockk(relaxed = true) - private val writerFactory: BufferedFormattingWriterFactory = - mockk(relaxed = true) - private val destinationStateManager: DestinationStateManager = - mockk(relaxed = true) - private val fileSize: Long = 2 - private val partSize: Long = 1 - - private val objectStorageStreamLoader = - spyk( - ObjectStorageStreamLoader( - stream, - client, - compressor, - pathFactory, - writerFactory, - destinationStateManager, - partSizeBytes = partSize, - fileSizeBytes = fileSize - ) - ) - - @Test - fun `test processFile`() = runTest { - val fileUrl = "/tmp/fileUrl" - Files.deleteIfExists(Path.of(fileUrl)) - Files.createFile(Path.of(fileUrl)) - val stagingDirectory = "stagingDirectory" - val generationId = 12L - val destinationFile = mockk() - every { destinationFile.fileMessage } returns - DestinationFile.AirbyteRecordMessageFile(fileUrl = fileUrl, fileRelativePath = fileUrl) - every { pathFactory.getFinalDirectory(any()) } returns stagingDirectory - every { stream.generationId } returns generationId - val mockedStateStorage: ObjectStorageDestinationState = mockk(relaxed = true) - coEvery { destinationStateManager.getState(stream) } returns mockedStateStorage - val mockedFile = mockk(relaxed = true) - every { objectStorageStreamLoader.createFile(any()) } returns mockedFile - - val expectedKey = Path.of(stagingDirectory, fileUrl).toString() - val metadata = - mapOf( - ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY to generationId.toString() - ) - val mockRemoteObject: RemoteObject = mockk(relaxed = true) - coEvery { client.streamingUpload(any(), any(), compressor, any()) } returns mockRemoteObject - - val result = objectStorageStreamLoader.processFile(destinationFile) - - coVerify { mockedStateStorage.addObject(generationId, expectedKey, 0, false) } - coVerify { client.streamingUpload(expectedKey, metadata, compressor, any()) } - assertEquals(mockRemoteObject, (result as LoadedObject<*>).remoteObject) - verify { mockedFile.delete() } - Files.deleteIfExists(Path.of(fileUrl)) - } -} diff --git a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle index 00336f267d49..e19bc337b342 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle @@ -5,5 +5,5 @@ dependencies { api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage') testFixturesApi(testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage"))) - implementation("aws.sdk.kotlin:s3:1.0.0") + implementation("aws.sdk.kotlin:s3:1.3.94") } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt index a2c8bb79d5dd..eeb1ed0f417c 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt @@ -241,6 +241,7 @@ class S3ClientFactory( region = bucketConfig.s3BucketConfiguration.s3BucketRegion.name credentialsProvider = credsProvider endpointUrl = bucketConfig.s3BucketConfiguration.s3Endpoint?.let { Url.parse(it) } + retryStrategy { maxAttempts = 5 } } return S3Client( diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 2dfbda00c691..68ff9306d770 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -1,27 +1,40 @@ data: connectorSubtype: file connectorType: destination - definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.4 - dockerRepository: airbyte/destination-s3-v2 - githubIssueLabel: destination-s3-v2 + definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 + dockerImageTag: 1.5.0 + dockerRepository: airbyte/destination-s3 + githubIssueLabel: destination-s3 icon: s3.svg license: ELv2 - name: S3 V2 Destination + name: S3 registryOverrides: cloud: - enabled: false + enabled: true oss: - enabled: false - releaseStage: alpha + enabled: true + releaseStage: generally_available + releases: + breakingChanges: + 1.0.0: + message: > + **This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.** + upgradeDeadline: "2024-10-08" + resourceRequirements: + jobSpecific: + - jobType: sync + resourceRequirements: + memory_limit: 2Gi + memory_request: 2Gi documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 tags: - language:java ab_internal: - sl: 100 - ql: 100 - supportLevel: community + sl: 300 + ql: 300 + supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index a4b3d56ab8b1..39ce3c3a4a80 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -48,7 +48,7 @@ abstract class S3V2WriteTest( super.testAppendSchemaEvolution() } - @Disabled("Temporarily disable because failing in CI") + @Disabled("For most test the file test is not needed since it doesn't apply compression") @Test override fun testBasicWriteFile() { super.testBasicWriteFile() @@ -72,6 +72,11 @@ class S3V2WriteTestJsonUncompressed : override fun testBasicTypes() { super.testBasicTypes() } + + @Test + override fun testBasicWriteFile() { + super.testBasicWriteFile() + } } class S3V2WriteTestJsonRootLevelFlattening : @@ -119,7 +124,12 @@ class S3V2WriteTestCsvUncompressed : promoteUnionToObject = false, preserveUndeclaredFields = true, allTypesBehavior = Untyped, - ) + ) { + @Test + override fun testBasicWriteFile() { + super.testBasicWriteFile() + } +} class S3V2WriteTestCsvRootLevelFlattening : S3V2WriteTest( From 12b444b3fc90e669e7d1442b7165d2c53dd3a403 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Wed, 18 Dec 2024 16:51:37 -0800 Subject: [PATCH 2/9] Build --- .../destination/dev_null/DevNullWriter.kt | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt index 1bf7d57284b7..5a96ded5c24b 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt @@ -90,12 +90,6 @@ class LoggingStreamLoader(override val stream: DestinationStream, loggingConfig: return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - log.info { "Ignoring file" } - - return SimpleBatch(state = Batch.State.COMPLETE) - } } class SilentStreamLoader(override val stream: DestinationStream) : StreamLoader { @@ -106,10 +100,6 @@ class SilentStreamLoader(override val stream: DestinationStream) : StreamLoader ): Batch { return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - return SimpleBatch(state = Batch.State.COMPLETE) - } } @SuppressFBWarnings( @@ -134,14 +124,6 @@ class ThrottledStreamLoader( return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - log.info { "Processing a file with delay of $millisPerRecord" } - delay(millisPerRecord) - log.info { "Completed file." } - - return SimpleBatch(state = Batch.State.COMPLETE) - } } class FailingStreamLoader(override val stream: DestinationStream, private val numMessages: Int) : @@ -173,20 +155,4 @@ class FailingStreamLoader(override val stream: DestinationStream, private val nu return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - log.info { "Processing a file with failure after $numMessages messages" } - - messageCount.getAndIncrement().let { messageCount -> - if (messageCount > numMessages) { - val message = - "Failing Destination(stream=$stream, numMessages=$numMessages: failing at $file" - log.info { message } - throw RuntimeException(message) - } - } - log.info { "Completed file." } - - return SimpleBatch(state = Batch.State.COMPLETE) - } } From 1a4fefae7fdabee145ec36610a38771e0869ca64 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Wed, 18 Dec 2024 16:55:00 -0800 Subject: [PATCH 3/9] Format --- .../airbyte/integrations/destination/dev_null/DevNullWriter.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt index 5a96ded5c24b..cb422042bf20 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt @@ -7,7 +7,6 @@ package io.airbyte.integrations.destination.dev_null import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.write.DestinationWriter From 040f799cdd02b7a0e0edd50e533d8dc1ae9bec80 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Thu, 19 Dec 2024 07:40:35 -0800 Subject: [PATCH 4/9] Fix test build --- .../MockDestinationWriter.kt | 4 -- .../load/task/DestinationTaskLauncherTest.kt | 4 +- .../load/task/DestinationTaskLauncherUTest.kt | 19 ++----- .../airbyte/cdk/load/task/MockTaskLauncher.kt | 8 --- .../task/internal/InputConsumerTaskTest.kt | 55 ++++++++++--------- 5 files changed, 37 insertions(+), 53 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt index 68d71570bb38..abe3e3897f45 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt @@ -79,10 +79,6 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { return LocalBatch(records.asSequence().toList()) } - override suspend fun processFile(file: DestinationFile): Batch { - return LocalFileBatch(file) - } - override suspend fun processBatch(batch: Batch): Batch { return when (batch) { is LocalBatch -> { diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 34081e2be221..d3fbaa0cbfe6 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -30,6 +30,7 @@ import io.airbyte.cdk.load.task.implementor.FailStreamTask import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTask import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTask import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory @@ -159,7 +160,8 @@ class DestinationTaskLauncherTest { MessageQueueSupplier< DestinationStream.Descriptor, Reserved>, checkpointQueue: QueueWriter>, - destinationTaskLauncher: DestinationTaskLauncher + destinationTaskLauncher: DestinationTaskLauncher, + fileTransferQueue: MessageQueue, ): InputConsumerTask { return object : InputConsumerTask { override suspend fun execute() { diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index b763d0613953..f0ac67df834b 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationStreamEvent +import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.state.Reserved @@ -18,6 +19,7 @@ import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailStreamTask import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessFileTask @@ -80,6 +82,7 @@ class DestinationTaskLauncherUTest { mockk(relaxed = true) private val checkpointQueue: QueueWriter> = mockk(relaxed = true) + private val fileTransferQueue: MessageQueue = mockk(relaxed = true) private fun getDefaultDestinationTaskLauncher( useFileTranfer: Boolean ): DefaultDestinationTaskLauncher { @@ -107,6 +110,7 @@ class DestinationTaskLauncherUTest { inputFlow, recordQueueSupplier, checkpointQueue, + fileTransferQueue, ) } @@ -144,21 +148,6 @@ class DestinationTaskLauncherUTest { coVerify { spillToDiskTaskFactory.make(any(), any()) } } - class MockedTaskWrapper(override val innerTask: ScopedTask) : WrappedTask { - override suspend fun execute() {} - } - - @Test - fun `test handle file`() = runTest { - val processFileTask = mockk(relaxed = true) - every { processFileTaskFactory.make(any(), any(), any(), any()) } returns processFileTask - - val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) - destinationTaskLauncher.handleFile(mockk(), mockk(), 1L) - - coVerify { taskScopeProvider.launch(match { it.innerTask is ProcessFileTask }) } - } - @Test fun `test handle exception`() = runTest { val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt index 913a7e425d6b..641546ddd582 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt @@ -40,14 +40,6 @@ class MockTaskLauncher : DestinationTaskLauncher { throw NotImplementedError() } - override suspend fun handleFile( - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long - ) { - throw NotImplementedError("This destination does not support file transfer.") - } - override suspend fun handleException(e: Exception) { TODO("Not yet implemented") } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt index 2623715bf598..6a48faa0ca36 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt @@ -111,11 +111,12 @@ class InputConsumerTaskTest { val task = taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = mockCatalogFactory.make(), + inputFlow = mockInputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) launch { task.execute() } @@ -188,11 +189,12 @@ class InputConsumerTaskTest { ) val task = taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = mockCatalogFactory.make(), + inputFlow = mockInputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) val job = launch { task.execute() } mockInputFlow.stop() @@ -246,11 +248,12 @@ class InputConsumerTaskTest { val task = taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = mockCatalogFactory.make(), + inputFlow = mockInputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) launch { task.execute() } batches.forEach { (stream, count, expectedCount) -> @@ -302,11 +305,12 @@ class InputConsumerTaskTest { val task = taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = mockCatalogFactory.make(), + inputFlow = mockInputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) launch { task.execute() } batches.forEach { event -> @@ -367,11 +371,12 @@ class InputConsumerTaskTest { ) val task = taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(relaxed = true), + catalog = mockCatalogFactory.make(), + inputFlow = mockInputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() } mockInputFlow.stop() From 1e17723675cdab65be9341cb475a677f517476cf Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Thu, 19 Dec 2024 08:02:19 -0800 Subject: [PATCH 5/9] format --- .../io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt | 1 - .../src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt | 1 - 2 files changed, 2 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index f0ac67df834b..2dd90f858e7c 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -22,7 +22,6 @@ import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory -import io.airbyte.cdk.load.task.implementor.ProcessFileTask import io.airbyte.cdk.load.task.implementor.ProcessFileTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessRecordsTaskFactory import io.airbyte.cdk.load.task.implementor.SetupTaskFactory diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt index 641546ddd582..da78d46d0e2a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.load.task import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope -import io.airbyte.cdk.load.message.DestinationFile import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Requires import jakarta.inject.Singleton From 2cd50acbffea923f78f27c91548ae6d4731ac829 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Thu, 19 Dec 2024 08:43:37 -0800 Subject: [PATCH 6/9] Fix some build --- .../cdk/load/write/object_storage/FilePartAccumulator.kt | 1 - .../destination/iceberg/v2/IcebergStreamLoader.kt | 4 ---- 2 files changed, 5 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt index 9fadf03c0c60..114528cbf99e 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -65,7 +65,6 @@ class FilePartAccumulator( streamDescriptor: DestinationStream.Descriptor, index: Long, ) { - val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) outputQueue.publish(wrapped) } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt index 18546300050b..fb4f9cf8dfc3 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt @@ -71,10 +71,6 @@ class IcebergStreamLoader( return SimpleBatch(Batch.State.COMPLETE) } - override suspend fun processFile(file: DestinationFile): Batch { - throw NotImplementedError("Destination Iceberg does not support universal file transfer.") - } - override suspend fun close(streamFailure: StreamProcessingFailed?) { if (streamFailure == null) { // Doing it first to make sure that data coming in the current batch is written to the From 5e43c0a8084fc37a863ed335ce5437b30501cd5a Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Thu, 19 Dec 2024 08:46:59 -0800 Subject: [PATCH 7/9] format --- .../integrations/destination/iceberg/v2/IcebergStreamLoader.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt index fb4f9cf8dfc3..17c1cfeb197a 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt @@ -8,7 +8,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed From 6df4b8d161bfc8348d70abbafaf98dfaded1020d Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Thu, 19 Dec 2024 08:55:04 -0800 Subject: [PATCH 8/9] spotbug --- .../cdk/load/write/object_storage/FilePartAccumulator.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt index 114528cbf99e..809a320e11e9 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.load.write.object_storage import com.google.common.collect.Range +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory @@ -18,6 +19,10 @@ import io.airbyte.cdk.load.write.BatchAccumulator import java.io.File import java.nio.file.Path +@SuppressFBWarnings( + "NP_NONNULL_PARAM_VIOLATION", + justification = "state is guaranteed to be non-null by Kotlin's type system" +) class FilePartAccumulator( private val pathFactory: ObjectStoragePathFactory, private val stream: DestinationStream, From 0ca95e236a5f562494c62eb7644c2f3e86d4fa98 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Thu, 19 Dec 2024 15:52:55 -0800 Subject: [PATCH 9/9] Bump devnull --- .../connectors/destination-dev-null/metadata.yaml | 2 +- docs/integrations/destinations/dev-null.md | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml index ca2b28ffac1d..698f7f924ead 100644 --- a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml +++ b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml @@ -10,7 +10,7 @@ data: - suite: integrationTests connectorType: destination definitionId: a7bcc9d8-13b3-4e49-b80d-d020b90045e3 - dockerImageTag: 0.7.13 + dockerImageTag: 0.7.14 dockerRepository: airbyte/destination-dev-null documentationUrl: https://docs.airbyte.com/integrations/destinations/dev-null githubIssueLabel: destination-dev-null diff --git a/docs/integrations/destinations/dev-null.md b/docs/integrations/destinations/dev-null.md index 901f6eb9f680..2166cdc28bbf 100644 --- a/docs/integrations/destinations/dev-null.md +++ b/docs/integrations/destinations/dev-null.md @@ -49,8 +49,9 @@ The OSS and Cloud variants have the same version number starting from version `0 | Version | Date | Pull Request | Subject | |:------------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------| -| 0.7.13 | 2024-12-18 | [49899](https://github.com/airbytehq/airbyte/pull/49899) | Use a base image: airbyte/java-connector-base:1.0.0 | -| 0.7.12 | 2024-12-04 | [48794](https://github.com/airbytehq/airbyte/pull/48794) | Promoting release candidate 0.7.12-rc.2 to a main version. | +| 0.7.13 | 2024-12-19 | [49899](https://github.com/airbytehq/airbyte/pull/49931) | Update CDK | +| 0.7.13 | 2024-12-18 | [49899](https://github.com/airbytehq/airbyte/pull/49899) | Use a base image: airbyte/java-connector-base:1.0.0 | +| 0.7.12 | 2024-12-04 | [48794](https://github.com/airbytehq/airbyte/pull/48794) | Promoting release candidate 0.7.12-rc.2 to a main version. | | 0.7.12-rc.2 | 2024-11-26 | [48693](https://github.com/airbytehq/airbyte/pull/48693) | Update for testing progressive rollout | | 0.7.12-rc.1 | 2024-11-25 | [48693](https://github.com/airbytehq/airbyte/pull/48693) | Update for testing progressive rollout | | 0.7.11 | 2024-11-18 | [48468](https://github.com/airbytehq/airbyte/pull/48468) | Implement File CDk |