From 1d19b772e993fb03bd0ed7ce17784addae0c522f Mon Sep 17 00:00:00 2001 From: Bruce Hamilton <150327496+bjhham@users.noreply.github.com> Date: Wed, 27 Nov 2024 19:29:26 +0100 Subject: [PATCH] KTOR-7845 Fix for threading issue in flushAndClose for reader job channels (#4503) --- ktor-io/api/ktor-io.api | 4 ++++ ktor-io/api/ktor-io.klib.api | 1 + .../utils/io/ByteReadChannelOperations.kt | 3 +-- .../utils/io/CloseHookByteWriteChannel.kt | 24 +++++++++++++++++++ .../io/ktor/tests/utils/FileChannelTest.kt | 21 ++++++++++++++++ 5 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 ktor-io/common/src/io/ktor/utils/io/CloseHookByteWriteChannel.kt diff --git a/ktor-io/api/ktor-io.api b/ktor-io/api/ktor-io.api index 68a8877a475..9adc665a32b 100644 --- a/ktor-io/api/ktor-io.api +++ b/ktor-io/api/ktor-io.api @@ -173,6 +173,10 @@ public abstract interface class io/ktor/utils/io/ChannelJob { public abstract fun getJob ()Lkotlinx/coroutines/Job; } +public final class io/ktor/utils/io/CloseHookByteWriteChannelKt { + public static final fun onClose (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/jvm/functions/Function1;)Lio/ktor/utils/io/ByteWriteChannel; +} + public final class io/ktor/utils/io/ConcurrentIOException : java/lang/IllegalStateException { public fun (Ljava/lang/String;Ljava/lang/Throwable;)V public synthetic fun (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V diff --git a/ktor-io/api/ktor-io.klib.api b/ktor-io/api/ktor-io.klib.api index 053abc706a0..a26bd58db5c 100644 --- a/ktor-io/api/ktor-io.klib.api +++ b/ktor-io/api/ktor-io.klib.api @@ -364,6 +364,7 @@ final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/cancel() // io.kt final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/close() // io.ktor.utils.io/close|close@io.ktor.utils.io.ByteWriteChannel(){}[0] final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/close(kotlin/Throwable?) // io.ktor.utils.io/close|close@io.ktor.utils.io.ByteWriteChannel(kotlin.Throwable?){}[0] final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/counted(): io.ktor.utils.io/CountedByteWriteChannel // io.ktor.utils.io/counted|counted@io.ktor.utils.io.ByteWriteChannel(){}[0] +final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/onClose(kotlin.coroutines/SuspendFunction0): io.ktor.utils.io/ByteWriteChannel // io.ktor.utils.io/onClose|onClose@io.ktor.utils.io.ByteWriteChannel(kotlin.coroutines.SuspendFunction0){}[0] final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/rethrowCloseCauseIfNeeded() // io.ktor.utils.io/rethrowCloseCauseIfNeeded|rethrowCloseCauseIfNeeded@io.ktor.utils.io.ByteWriteChannel(){}[0] final fun (io.ktor.utils.io/ChannelJob).io.ktor.utils.io/cancel() // io.ktor.utils.io/cancel|cancel@io.ktor.utils.io.ChannelJob(){}[0] final fun (io.ktor.utils.io/ChannelJob).io.ktor.utils.io/getCancellationException(): kotlin.coroutines.cancellation/CancellationException // io.ktor.utils.io/getCancellationException|getCancellationException@io.ktor.utils.io.ChannelJob(){}[0] diff --git a/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt b/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt index eee738d5b1b..d686a372393 100644 --- a/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt +++ b/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt @@ -14,7 +14,6 @@ import kotlinx.io.Buffer import kotlinx.io.bytestring.* import kotlinx.io.unsafe.* import kotlin.coroutines.* -import kotlin.jvm.* import kotlin.math.* @OptIn(InternalAPI::class) @@ -309,7 +308,7 @@ public fun CoroutineScope.reader( } } - return ReaderJob(channel, job) + return ReaderJob(channel.onClose { job.join() }, job) } /** diff --git a/ktor-io/common/src/io/ktor/utils/io/CloseHookByteWriteChannel.kt b/ktor-io/common/src/io/ktor/utils/io/CloseHookByteWriteChannel.kt new file mode 100644 index 00000000000..d5925c7b2db --- /dev/null +++ b/ktor-io/common/src/io/ktor/utils/io/CloseHookByteWriteChannel.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package io.ktor.utils.io + +/** + * Wraps this channel to execute the provided action when closed using `flushAndClose()`. + * + * @param onClose The action to execute when the channel is closed. + * @return A new `ByteWriteChannel` that executes the given action upon closure. + */ +public fun ByteWriteChannel.onClose(onClose: suspend () -> Unit): ByteWriteChannel = + CloseHookByteWriteChannel(this, onClose) + +internal class CloseHookByteWriteChannel( + private val delegate: ByteWriteChannel, + private val onClose: suspend () -> Unit +) : ByteWriteChannel by delegate { + override suspend fun flushAndClose() { + delegate.flushAndClose() + onClose() + } +} diff --git a/ktor-utils/jvm/test/io/ktor/tests/utils/FileChannelTest.kt b/ktor-utils/jvm/test/io/ktor/tests/utils/FileChannelTest.kt index 3a32b8dc2d9..e6af6335ae6 100644 --- a/ktor-utils/jvm/test/io/ktor/tests/utils/FileChannelTest.kt +++ b/ktor-utils/jvm/test/io/ktor/tests/utils/FileChannelTest.kt @@ -9,9 +9,11 @@ import io.ktor.util.cio.* import io.ktor.utils.io.* import io.ktor.utils.io.jvm.javaio.* import kotlinx.coroutines.* +import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.* import org.junit.jupiter.api.extension.* import java.io.* +import java.nio.file.Files import kotlin.test.* import kotlin.test.Test @@ -117,4 +119,23 @@ class FileChannelTest { // Assert (we cannot delete if there is a file handle open on it) assertTrue(temp.delete()) } + + @Test + fun `writeChannel finishes on close`() = runTest { + val file = Files.createTempFile("file", "txt").toFile() + val ch = file.writeChannel() + ch.writeStringUtf8("Hello") + ch.flushAndClose() + assertEquals(5, file.length()) + assertEquals("Hello", file.readText()) + } + + @Test + fun `writeChannel writes to file on flush`() = runTest { + val file = Files.createTempFile("file", "txt").toFile() + val ch = file.writeChannel() + ch.writeStringUtf8("Hello") + ch.flush() + assertEquals("Hello", file.readText()) + } }