Не подтверждена Коммит 1c4fcc3f создал по автору Mariia Skripchenko's avatar Mariia Skripchenko Зафиксировано автором GitHub
Просмотр файлов

KTOR-6172 Add Server-sent events (SSE) plugin for server (#3727)

владелец c75fce9b
......@@ -53,7 +53,7 @@ internal suspend fun CloseableHttpAsyncClient.sendRequest(
val body: Any = if (requestData.isSseRequest()) {
DefaultClientSSESession(
requestData.body as SSEContent,
requestData.body as SSEClientContent,
consumer.responseChannel,
callContext,
status,
......
......@@ -56,7 +56,7 @@ internal suspend fun CloseableHttpAsyncClient.sendRequest(
val headers = HeadersImpl(rawHeaders)
val body: Any = if (requestData.isSseRequest()) {
DefaultClientSSESession(
requestData.body as SSEContent,
requestData.body as SSEClientContent,
bodyConsumer.responseChannel,
callContext,
status,
......
......@@ -194,7 +194,7 @@ internal suspend fun readResponse(
}
val responseBody: Any = if (request.isSseRequest()) {
DefaultClientSSESession(request.body as SSEContent, body, callContext, status, headers)
DefaultClientSSESession(request.body as SSEClientContent, body, callContext, status, headers)
} else {
body
}
......
......@@ -871,7 +871,7 @@ public abstract interface class io/ktor/client/plugins/sse/ClientSSESession : ko
}
public final class io/ktor/client/plugins/sse/DefaultClientSSESession : io/ktor/client/plugins/sse/ClientSSESession {
public fun <init> (Lio/ktor/client/plugins/sse/SSEContent;Lio/ktor/utils/io/ByteReadChannel;Lkotlin/coroutines/CoroutineContext;Lio/ktor/http/HttpStatusCode;Lio/ktor/http/Headers;)V
public fun <init> (Lio/ktor/client/plugins/sse/SSEClientContent;Lio/ktor/utils/io/ByteReadChannel;Lkotlin/coroutines/CoroutineContext;Lio/ktor/http/HttpStatusCode;Lio/ktor/http/Headers;)V
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun getIncoming ()Lkotlinx/coroutines/flow/Flow;
}
......@@ -881,15 +881,7 @@ public final class io/ktor/client/plugins/sse/SSECapability : io/ktor/client/eng
public fun toString ()Ljava/lang/String;
}
public final class io/ktor/client/plugins/sse/SSEConfig {
public fun <init> ()V
public final fun getReconnectionTime-UwyO8pc ()J
public final fun setReconnectionTime-LRDsOJo (J)V
public final fun showCommentEvents ()V
public final fun showRetryEvents ()V
}
public final class io/ktor/client/plugins/sse/SSEContent : io/ktor/http/content/OutgoingContent$NoContent {
public final class io/ktor/client/plugins/sse/SSEClientContent : io/ktor/http/content/OutgoingContent$NoContent {
public synthetic fun <init> (JZZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun getHeaders ()Lio/ktor/http/Headers;
public final fun getReconnectionTime-UwyO8pc ()J
......@@ -898,6 +890,14 @@ public final class io/ktor/client/plugins/sse/SSEContent : io/ktor/http/content/
public fun toString ()Ljava/lang/String;
}
public final class io/ktor/client/plugins/sse/SSEConfig {
public fun <init> ()V
public final fun getReconnectionTime-UwyO8pc ()J
public final fun setReconnectionTime-LRDsOJo (J)V
public final fun showCommentEvents ()V
public final fun showRetryEvents ()V
}
public final class io/ktor/client/plugins/sse/SSEKt {
public static final fun getSSE ()Lio/ktor/client/plugins/api/ClientPlugin;
}
......
......@@ -13,7 +13,7 @@ import kotlin.coroutines.*
@OptIn(InternalAPI::class)
public class DefaultClientSSESession(
content: SSEContent,
content: SSEClientContent,
private var input: ByteReadChannel,
override val coroutineContext: CoroutineContext,
status: HttpStatusCode,
......@@ -28,12 +28,8 @@ public class DefaultClientSSESession(
while (true) {
val event = input.parseEvent() ?: break
if (event.isCommentsEvent() && !showCommentEvents) {
continue
}
if (event.isRetryEvent() && !showRetryEvents) {
continue
}
if (event.isCommentsEvent() && !showCommentEvents) continue
if (event.isRetryEvent() && !showRetryEvents) continue
send(event)
}
......@@ -49,7 +45,7 @@ public class DefaultClientSSESession(
if (headers[HttpHeaders.ContentType] != ContentType.Text.EventStream.toString()) {
throw SSEException(
"Content type must be `text/event-stream` but it was: ${headers[HttpHeaders.ContentType]}"
"Content type must be `text/event-stream` but was: ${headers[HttpHeaders.ContentType]}"
)
}
}
......@@ -99,7 +95,7 @@ public class DefaultClientSSESession(
"event" -> eventType = value
"data" -> {
wasData = true
data.append(value).append(LF)
data.append(value).append(END_OF_LINE)
}
"retry" -> {
......@@ -120,10 +116,10 @@ public class DefaultClientSSESession(
}
private fun StringBuilder.appendComment(comment: String) {
append(comment.removePrefix(COLON).removePrefix(SPACE)).append(LF)
append(comment.removePrefix(COLON).removePrefix(SPACE)).append(END_OF_LINE)
}
private fun StringBuilder.toText() = toString().removeSuffix(LF)
private fun StringBuilder.toText() = toString().removeSuffix(END_OF_LINE)
private fun ServerSentEvent.isEmpty() =
data == null && id == null && event == null && retry == null && comments == null
......@@ -135,7 +131,4 @@ public class DefaultClientSSESession(
data == null && event == null && id == null && comments == null && retry != null
}
private const val COLON = ":"
private const val SPACE = " "
private const val LF = "\n"
private const val NULL = "\u0000"
......@@ -50,7 +50,7 @@ public val SSE: ClientPlugin<SSEConfig> = createClientPlugin(
val localShowCommentEvents = getAttributeValue(request, showCommentEventsAttr)
val localShowRetryEvents = getAttributeValue(request, showRetryEventsAttr)
SSEContent(
SSEClientContent(
localReconnectionTime ?: reconnectionTime,
localShowCommentEvents ?: showCommentEvents,
localShowRetryEvents ?: showRetryEvents
......
......@@ -10,7 +10,7 @@ import io.ktor.util.*
import kotlin.time.*
@InternalAPI
public class SSEContent(
public class SSEClientContent(
public val reconnectionTime: Duration,
public val showCommentEvents: Boolean,
public val showRetryEvents: Boolean,
......@@ -21,5 +21,5 @@ public class SSEContent(
append(HttpHeaders.CacheControl, "no-store")
}.build()
override fun toString(): String = "SSEContent"
override fun toString(): String = "SSEClientContent"
}
......@@ -308,5 +308,5 @@ public fun HttpRequestData.isUpgradeRequest(): Boolean {
@InternalAPI
@Suppress("KDocMissingDocumentation")
public fun HttpRequestData.isSseRequest(): Boolean {
return body is SSEContent
return body is SSEClientContent
}
......@@ -47,7 +47,7 @@ internal class JavaHttpResponseBodyHandler(
val headers = HeadersImpl(response.headers().map())
val body: Any = if (requestData.isSseRequest()) {
DefaultClientSSESession(requestData.body as SSEContent, responseChannel, callContext, status, headers)
DefaultClientSSESession(requestData.body as SSEClientContent, responseChannel, callContext, status, headers)
} else {
responseChannel
}
......
Нет предварительного просмотра для этого типа файлов
......@@ -44,7 +44,10 @@ class ServerSentEventsTest : ClientLoader() {
session.incoming.single().apply {
assertEquals("0", id)
assertEquals("hello 0", event)
assertEquals("hello\nfrom server", data)
val lines = data?.lines() ?: emptyList()
assertEquals(2, lines.size)
assertEquals("hello", lines[0])
assertEquals("from server", lines[1])
}
session.cancel()
}
......@@ -64,7 +67,10 @@ class ServerSentEventsTest : ClientLoader() {
session.incoming.collectIndexed { i, it ->
assertEquals("$i", it.id)
assertEquals("hello $i", it.event)
assertEquals("hello\nfrom server", it.data)
val lines = it.data?.lines() ?: emptyList()
assertEquals(2, lines.size)
assertEquals("hello", lines[0])
assertEquals("from server", lines[1])
size++
}
assertEquals(100, size)
......@@ -76,7 +82,10 @@ class ServerSentEventsTest : ClientLoader() {
session.incoming.collectIndexed { i, it ->
assertEquals("$i", it.id)
assertEquals("hello $i", it.event)
assertEquals("hello\nfrom server", it.data)
val lines = it.data?.lines() ?: emptyList()
assertEquals(2, lines.size)
assertEquals("hello", lines[0])
assertEquals("from server", lines[1])
size++
}
assertEquals(50, size)
......
public final class io/ktor/server/sse/RoutingKt {
public static final fun sse (Lio/ktor/server/routing/RoutingBuilder;Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V
public static final fun sse (Lio/ktor/server/routing/RoutingBuilder;Lkotlin/jvm/functions/Function2;)V
}
public final class io/ktor/server/sse/SSEKt {
public static final fun getSSE ()Lio/ktor/server/application/ApplicationPlugin;
}
public final class io/ktor/server/sse/SSEServerContent : io/ktor/http/content/OutgoingContent$WriteChannelContent {
public fun <init> (Lio/ktor/server/application/ApplicationCall;Lkotlin/jvm/functions/Function2;)V
public final fun getCall ()Lio/ktor/server/application/ApplicationCall;
public fun getContentType ()Lio/ktor/http/ContentType;
public final fun getHandle ()Lkotlin/jvm/functions/Function2;
public fun getHeaders ()Lio/ktor/http/Headers;
public fun toString ()Ljava/lang/String;
public fun writeTo (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
public abstract interface class io/ktor/server/sse/ServerSSESession : kotlinx/coroutines/CoroutineScope {
public abstract fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getCall ()Lio/ktor/server/application/ApplicationCall;
public abstract fun send (Lio/ktor/sse/ServerSentEvent;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun send (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
public final class io/ktor/server/sse/ServerSSESession$DefaultImpls {
public static fun send (Lio/ktor/server/sse/ServerSSESession;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun send$default (Lio/ktor/server/sse/ServerSSESession;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}
description = "Server-sent events (SSE) support"
kotlin.sourceSets {
jvmAndNixMain {
dependencies {
api(project(":ktor-shared:ktor-sse"))
}
}
}
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.server.sse
import io.ktor.server.application.*
import io.ktor.sse.*
import io.ktor.util.*
import io.ktor.utils.io.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
internal class DefaultServerSSESession(
private val output: ByteWriteChannel,
override val call: ApplicationCall,
override val coroutineContext: CoroutineContext
) : ServerSSESession {
private val mutex = Mutex()
override suspend fun send(event: ServerSentEvent) {
mutex.withLock {
output.writeSSE(event)
}
}
override suspend fun close() {
mutex.withLock {
output.close()
}
}
@OptIn(InternalAPI::class)
private suspend fun ByteWriteChannel.writeSSE(event: ServerSentEvent) {
writeStringUtf8(event.toString() + END_OF_LINE)
flush()
}
}
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.server.sse
import io.ktor.http.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
/**
* Adds a route to handle Server-Sent Events (SSE) at the specified [path] using the provided [handler].
* Requires [SSE] plugin to be installed.
*
* @param path URL path at which to handle SSE requests.
* @param handler function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE
* endpoint. Inside the handler, you can use the functions provided by [ServerSSESession]
* to send events to the connected clients.
*
* @see ServerSSESession
*/
public fun RoutingBuilder.sse(path: String, handler: suspend ServerSSESession.() -> Unit) {
plugin(SSE)
route(path, HttpMethod.Get) {
sse(handler)
}
}
/**
* Adds a route to handle Server-Sent Events (SSE) using the provided [handler].
* Requires [SSE] plugin to be installed.
*
* @param handler function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE
* endpoint. Inside the handler, you can use the functions provided by [ServerSSESession]
* to send events to the connected clients.
*
* @see ServerSSESession
*/
public fun RoutingBuilder.sse(handler: suspend ServerSSESession.() -> Unit) {
plugin(SSE)
handle {
call.respond(SSEServerContent(call, handler))
}
}
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.server.sse
import io.ktor.server.application.*
import io.ktor.util.logging.*
internal val LOGGER = KtorSimpleLogger("io.ktor.server.plugins.sse.SSE")
/**
* Server-Sent Events (SSE) support plugin. It is required to be installed first before binding any sse endpoints.
*
* To learn more, see [specification](https://html.spec.whatwg.org/multipage/server-sent-events.html).
*
* Example:
* ```kotlin
* install(SSE)
*
* install(Routing) {
* sse {
* send(ServerSentEvent("Hello"))
* }
* }
* ```
*/
public val SSE: ApplicationPlugin<Unit> = createApplicationPlugin("SSE") {}
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.server.sse
import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
/**
* An [OutgoingContent] response object that could be used to `respond()`.
* It will start Server-Sent events [SSE] session.
*
* Please note that you generally shouldn't use this object directly but use [SSE] plugin with routing builders
* [sse] instead.
*
* [handle] function is applied to a session.
*
* @param call that is starting SSE session.
* @param handle function that is started once SSE session created.
*/
public class SSEServerContent(
public val call: ApplicationCall,
public val handle: suspend ServerSSESession.() -> Unit
) : OutgoingContent.WriteChannelContent() {
override val contentType: ContentType = ContentType.Text.EventStream
override val headers: Headers = HeadersBuilder().apply {
append(HttpHeaders.CacheControl, "no-store")
append(HttpHeaders.Connection, "keep-alive")
}.build()
override suspend fun writeTo(channel: ByteWriteChannel) {
LOGGER.trace("Starting sse session for ${call.request.uri}")
var session: ServerSSESession? = null
try {
coroutineScope {
session = DefaultServerSSESession(channel, call, coroutineContext)
session?.handle()
}
} finally {
session?.close()
}
}
override fun toString(): String = "SSEServerContent"
}
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.server.sse
import io.ktor.server.application.*
import io.ktor.sse.*
import kotlinx.coroutines.*
/**
* Represents a server-side server-sent events session.
* An [ServerSSESession] allows the server to send [ServerSentEvent] to the client over a single HTTP connection.
*
* @see [SSE]
*/
public interface ServerSSESession : CoroutineScope {
/**
* Associated received [call] that originating this session.
*/
public val call: ApplicationCall
/**
* Sends a [ServerSentEvent] to the client.
*/
public suspend fun send(event: ServerSentEvent)
/**
* Creates and sends a [ServerSentEvent] to the client.
*
* @param data data field of the event.
* @param event string identifying the type of event.
* @param id event ID.
* @param retry reconnection time, in milliseconds to wait before reconnecting.
* @param comments comment lines starting with a ':' character.
*/
public suspend fun send(
data: String? = null,
event: String? = null,
id: String? = null,
retry: Long? = null,
comments: String? = null
) {
send(ServerSentEvent(data, event, id, retry, comments))
}
/**
* Closes the [ServerSSESession], terminating the connection with the client.
* Once this method is called, the SSE session is closed and no further events can be sent.
* You don't need to call this method as it is called automatically when all the send operations are completed.
*
* It's important to note that closing the session using this method does not send a termination event
* to the client. If you wish to send a specific event to signify the end of the SSE stream
* before closing the session, you can use the [send] function for it.
*/
public suspend fun close()
}
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.server.sse
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.server.testing.*
import io.ktor.sse.*
import kotlinx.coroutines.*
import kotlin.test.*
class ServerSentEventsTest {
@Test
fun testSingleEvents() = testApplication {
install(SSE)
routing {
sse("/hello") {
send(ServerSentEvent("world"))
}
}
client.get("/hello").apply {
assertEquals(HttpStatusCode.OK, status)
assertEquals(ContentType.Text.EventStream.toString(), headers[HttpHeaders.ContentType])
assertEquals("data: world", bodyAsText().trim())
}
}
@Test
fun testEvents() = testApplication {
install(SSE)
routing {
sse("/events") {
repeat(100) {
send(ServerSentEvent("event $it"))
}
}
}
client.get("/events").apply {
assertEquals(HttpStatusCode.OK, status)
assertEquals(ContentType.Text.EventStream.toString(), headers[HttpHeaders.ContentType])
val events = bodyAsText().lines()
assertEquals(201, events.size)
for (i in 0 until 100) {
assertEquals("data: event $i", events[i * 2])
assertTrue { events[i * 2 + 1].isEmpty() }
}
}
}
@Test
fun testChannelsOfEvents() = testApplication {
install(SSE)
routing {
sse("/events") {
launch {
repeat(100) {
send(ServerSentEvent("channel-1 $it"))
}
}
launch {
repeat(100) {
send(ServerSentEvent("channel-2 $it"))
}
}
}
}
client.get("/events").apply {
assertEquals(HttpStatusCode.OK, status)
assertEquals(ContentType.Text.EventStream.toString(), headers[HttpHeaders.ContentType])
val events = bodyAsText().lines()
assertEquals(401, events.size)
for (i in 0 until 100) {
assertContains(events, "data: channel-1 $i")
assertContains(events, "data: channel-2 $i")
}
}
}
@Test
fun testSeveralClients() = testApplication {
install(SSE)
routing {
sse("/events") {
repeat(100) {
send(ServerSentEvent("event $it"))
}
}
}
client.get("/events").apply {
val events = bodyAsText().lines()
assertEquals(201, events.size)
for (i in 0 until 100) {
assertEquals("data: event $i", events[i * 2])
assertTrue { events[i * 2 + 1].isEmpty() }
}
}
client.get("/events").apply {
val events = bodyAsText().lines()
assertEquals(201, events.size)
for (i in 0 until 100) {
assertEquals("data: event $i", events[i * 2])
assertTrue { events[i * 2 + 1].isEmpty() }
}
}
}
}
......@@ -15,3 +15,9 @@ public final class io/ktor/sse/ServerSentEvent {
public fun toString ()Ljava/lang/String;
}
public final class io/ktor/sse/ServerSentEventKt {
public static final field COLON Ljava/lang/String;
public static final field END_OF_LINE Ljava/lang/String;
public static final field SPACE Ljava/lang/String;
}
Поддерживает Markdown
0% или .
You are about to add 0 people to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Пожалуйста, зарегистрируйтесь или чтобы прокомментировать