添加web socket服务支持

This commit is contained in:
tursom 2020-05-19 22:51:56 +08:00
parent 2ade0aa6d1
commit 810a116e19
5 changed files with 97 additions and 5 deletions

View File

@ -2,6 +2,7 @@ package cn.tursom.web.netty
import cn.tursom.web.HttpHandler
import cn.tursom.web.HttpServer
import cn.tursom.web.WebSocketHandler
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
@ -23,7 +24,7 @@ class NettyHttpServer(
handler: HttpHandler<NettyHttpContent, NettyExceptionContent>,
bodySize: Int = 512 * 1024,
autoRun: Boolean = false,
webSocketPath: String? = null,
webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContext>>> = listOf(),
readTimeout: Int? = null,
writeTimeout: Int? = null
) : HttpServer {
@ -31,14 +32,16 @@ class NettyHttpServer(
port: Int,
bodySize: Int = 512 * 1024,
autoRun: Boolean = false,
webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContext>>> = listOf(),
readTimeout: Int? = null,
writeTimeout: Int? = null,
handler: (content: NettyHttpContent) -> Unit
) : this(
port,
object : HttpHandler<NettyHttpContent, NettyExceptionContent> {
override fun handle(content: NettyHttpContent) = handler(content)
},
bodySize,
autoRun
bodySize, autoRun, webSocketPath, readTimeout, writeTimeout
)
val httpHandler = NettyHttpHandler(handler)
@ -58,8 +61,9 @@ class NettyHttpServer(
.addLast("aggregator", HttpObjectAggregator(bodySize))
.addLast("http-chunked", ChunkedWriteHandler())
.apply {
if (webSocketPath != null) {
addLast("ws", WebSocketServerProtocolHandler(webSocketPath))
webSocketPath.forEach { (webSocketPath, handler) ->
addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath))
addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler))
}
}
.addLast("handle", httpHandler)

View File

@ -0,0 +1,36 @@
package cn.tursom.web.netty
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import cn.tursom.web.WebSocketContext
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
class NettyWebSocketContext(
private val channel: Channel
) : WebSocketContext {
override fun writeText(buffer: ByteBuffer) {
if (buffer is NettyByteBuffer) {
channel.writeAndFlush(TextWebSocketFrame(buffer.byteBuf))
} else {
buffer.read {
channel.writeAndFlush(TextWebSocketFrame(Unpooled.wrappedBuffer(it)))
}
buffer.close()
}
}
override fun writeBinary(buffer: ByteBuffer) {
if (buffer is NettyByteBuffer) {
channel.writeAndFlush(BinaryWebSocketFrame(buffer.byteBuf))
} else {
buffer.read {
channel.writeAndFlush(BinaryWebSocketFrame(Unpooled.wrappedBuffer(it)))
}
buffer.close()
}
}
}

View File

@ -0,0 +1,28 @@
package cn.tursom.web.netty
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import cn.tursom.web.WebSocketHandler
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketFrame
class NettyWebSocketHandler(
channel: Channel,
private val handler: WebSocketHandler<NettyWebSocketContext>
) : SimpleChannelInboundHandler<WebSocketFrame>() {
private val webSocketContext = NettyWebSocketContext(channel)
override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) {
when (msg) {
is TextWebSocketFrame -> {
handler.recvText(NettyByteBuffer(msg.content()), webSocketContext)
}
is BinaryWebSocketFrame -> {
handler.recvBinary(NettyByteBuffer(msg.content()), webSocketContext)
}
}
}
}

View File

@ -0,0 +1,13 @@
package cn.tursom.web
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import java.nio.charset.Charset
interface WebSocketContext {
fun writeText(buffer: ByteBuffer)
fun writeText(bytes: ByteArray) = writeText(HeapByteBuffer(bytes))
fun writeText(str: String, charset: Charset = Charsets.UTF_8) = writeText(str.toByteArray(charset))
fun writeBinary(buffer: ByteBuffer)
fun writeBinary(bytes: ByteArray) = writeBinary(HeapByteBuffer(bytes))
}

View File

@ -0,0 +1,11 @@
package cn.tursom.web
import cn.tursom.core.buffer.ByteBuffer
interface WebSocketHandler<in T : WebSocketContext> {
fun recvText(str: String, context: T)
fun recvText(byteBuffer: ByteBuffer, context: T) = recvText(byteBuffer.getString(), context)
fun recvBinary(bytes: ByteArray, context: T)
fun recvBinary(byteBuffer: ByteBuffer, context: T) = recvBinary(byteBuffer.getBytes(), context)
}