Коммит d845e36a создал по автору Bruce Hamilton's avatar Bruce Hamilton Зафиксировано автором Bruce Hamilton
Просмотр файлов

KTOR-8172 Fix issue in JVM byte channel read function

владелец 2e5b4fbb
...@@ -96,9 +96,8 @@ public suspend fun ByteReadChannel.copyTo(channel: WritableByteChannel, limit: L ...@@ -96,9 +96,8 @@ public suspend fun ByteReadChannel.copyTo(channel: WritableByteChannel, limit: L
} }
} }
while (copied < limit) { while (copied < limit && !isClosedForRead) {
read(min = 0, consumer = copy) read(min = 0, consumer = copy)
if (isClosedForRead) break
} }
closedCause?.let { throw it } closedCause?.let { throw it }
...@@ -184,19 +183,17 @@ public fun ByteReadChannel.readAvailable(block: (ByteBuffer) -> Int): Int { ...@@ -184,19 +183,17 @@ public fun ByteReadChannel.readAvailable(block: (ByteBuffer) -> Int): Int {
* *
* @param min amount of bytes available for read, should be positive or zero * @param min amount of bytes available for read, should be positive or zero
* @param consumer to be invoked when at least [min] bytes available for read * @param consumer to be invoked when at least [min] bytes available for read
* @throws EOFException when there are less than [min] bytes available after the channel is closed
*/ */
@OptIn(InternalAPI::class) @OptIn(InternalAPI::class)
public suspend inline fun ByteReadChannel.read(min: Int = 1, noinline consumer: (ByteBuffer) -> Unit) { public suspend inline fun ByteReadChannel.read(min: Int = 1, noinline consumer: (ByteBuffer) -> Unit) {
require(min >= 0) { "min should be positive or zero" } require(min >= 0) { "min should be positive or zero" }
if (availableForRead > 0 && availableForRead >= min) { if (min > 0) {
if (!awaitContent(min)) {
throw EOFException("Not enough bytes available: required $min but $availableForRead available")
}
readBuffer.read(consumer)
} else if (awaitContent()) {
readBuffer.read(consumer) readBuffer.read(consumer)
return
}
awaitContent()
if (isClosedForRead && min > 0) {
throw EOFException("Not enough bytes available: required $min but $availableForRead available")
} }
if (availableForRead > 0) readBuffer.read(consumer)
} }
...@@ -4,7 +4,11 @@ ...@@ -4,7 +4,11 @@
import io.ktor.utils.io.* import io.ktor.utils.io.*
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import kotlinx.io.EOFException
import org.junit.jupiter.api.assertThrows
import kotlin.test.* import kotlin.test.*
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
import kotlin.time.measureTime import kotlin.time.measureTime
...@@ -100,4 +104,16 @@ class ByteReadChannelOperationsJvmTest { ...@@ -100,4 +104,16 @@ class ByteReadChannelOperationsJvmTest {
assertTrue(time < 5.seconds, "Expected I/O to be complete in a reasonable time, but it took $time") assertTrue(time < 5.seconds, "Expected I/O to be complete in a reasonable time, but it took $time")
assertEquals(2_088_890, out.length) assertEquals(2_088_890, out.length)
} }
@Test
fun readWithGreaterMinThrows() = runTest {
val channel = ByteChannel()
channel.writeByte(1)
channel.close()
assertThrows<EOFException> {
channel.read(2) {
fail("There is only one byte in the channel")
}
}
}
} }
Поддерживает Markdown
0% или .
You are about to add 0 people to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Пожалуйста, зарегистрируйтесь или чтобы прокомментировать