diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpContent.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpContent.kt index b595a03..18aeac6 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpContent.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpContent.kt @@ -10,20 +10,24 @@ import io.netty.buffer.Unpooled import io.netty.channel.ChannelFuture import io.netty.channel.ChannelHandlerContext import io.netty.handler.codec.http.* +import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder import io.netty.handler.stream.ChunkedFile import org.slf4j.LoggerFactory import java.io.File import java.io.RandomAccessFile +import java.util.concurrent.ConcurrentLinkedQueue import kotlin.collections.set @Suppress("MemberVisibilityCanBePrivate", "unused") open class NettyHttpContent( val ctx: ChannelHandlerContext, - val msg: FullHttpRequest + val request: HttpRequest ) : MutableHttpContent, NettyResponseHeaderAdapter() { + val decoder = HttpPostRequestDecoder(request) + override var requestSendFully: Boolean = false override var finished: Boolean = false override val uri: String by lazy { - var uri = msg.uri() + var uri = request.uri() while (uri.contains("//")) { uri = uri.replace("//", "/") } @@ -31,12 +35,15 @@ open class NettyHttpContent( } override val clientIp get() = ctx.channel().remoteAddress()!! override val realIp: String = super.realIp - val httpMethod: HttpMethod get() = msg.method() - val protocolVersion: HttpVersion get() = msg.protocolVersion() - val headers: HttpHeaders get() = msg.headers() - protected val paramMap by lazy { ParamParser.parse(msg) } + val httpMethod: HttpMethod get() = request.method() + val protocolVersion: HttpVersion get() = request.protocolVersion() + val headers: HttpHeaders get() = request.headers() + protected val paramMap by lazy { ParamParser.parse(request) } override val cookieMap by lazy { getHeader("Cookie")?.let { decodeCookie(it) } ?: mapOf() } - override val body = msg.content()?.let { NettyByteBuffer(it) } + + private var waitBodyHandler = ConcurrentLinkedQueue<(end: Boolean) -> Unit>() + override val body: ByteBuffer? get() = bodyList.poll()?.content()?.let { NettyByteBuffer(it) } + val bodyList = ConcurrentLinkedQueue<HttpContent>() //override val responseBody = ByteArrayOutputStream() var responseStatus: HttpResponseStatus = HttpResponseStatus.OK @@ -50,6 +57,40 @@ open class NettyHttpContent( val chunkedList = ArrayList<() -> ByteBuffer>() private var responseBodyBuf: CompositeByteBuf? = null + fun newResponseBody(httpContent: HttpContent) { + bodyList.add(httpContent) + val end = if (httpContent is LastHttpContent) { + requestSendFully = true + true + } else { + false + } + while (waitBodyHandler.isNotEmpty()) { + val handler = waitBodyHandler.poll() ?: continue + handler(end) + } + } + + override fun waitBody(action: (end: Boolean) -> Unit) { + if (!requestSendFully) { + waitBodyHandler.add(action) + } + } + + override fun addBodyParam() { + ParamParser.parse(request, bodyList.poll()!!, paramMap) + } + + override fun addBodyParam(body: ByteBuffer) { + val byteBuf = if (body is NettyByteBuffer) { + body.byteBuf + } else { + Unpooled.wrappedBuffer(body.readBuffer()) + } + ParamParser.parse(request, DefaultHttpContent(byteBuf), paramMap) + body.close() + } + fun getResponseBodyBuf(): CompositeByteBuf { if (responseBodyBuf == null) { responseBodyBuf = ctx.alloc().compositeBuffer()!! @@ -112,13 +153,15 @@ open class NettyHttpContent( override fun write(buffer: ByteBuffer) { //buffer.writeTo(responseBody) log?.trace("write {}", buffer) - getResponseBodyBuf().addComponent(if (buffer is NettyByteBuffer) { - buffer.byteBuf - } else { - val buf = Unpooled.wrappedBuffer(buffer.readBuffer()) - buffer.clear() - buf - }) + getResponseBodyBuf().addComponent( + if (buffer is NettyByteBuffer) { + buffer.byteBuf + } else { + val buf = Unpooled.wrappedBuffer(buffer.readBuffer()) + buffer.clear() + buf + } + ) } override fun reset() { diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpDecodeType.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpDecodeType.kt new file mode 100644 index 0000000..76bf6dd --- /dev/null +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpDecodeType.kt @@ -0,0 +1,5 @@ +package cn.tursom.web.netty + +enum class NettyHttpDecodeType { + FULL_HTTP, MULTI_PART +} \ No newline at end of file diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpHandler.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpHandler.kt index bbe1434..cdb24eb 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpHandler.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpHandler.kt @@ -12,6 +12,10 @@ import org.slf4j.LoggerFactory class NettyHttpHandler( private val handler: HttpHandler<NettyHttpContent, NettyExceptionContent> ) : SimpleChannelInboundHandler<FullHttpRequest>() { + override fun channelActive(ctx: ChannelHandlerContext) { + handler.newRequest() + super.channelActive(ctx) + } override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { val handlerContext = NettyHttpContent(ctx, msg) diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpObjectHandler.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpObjectHandler.kt new file mode 100644 index 0000000..cf8d592 --- /dev/null +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpObjectHandler.kt @@ -0,0 +1,35 @@ +package cn.tursom.web.netty + +import cn.tursom.web.HttpHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.codec.http.HttpContent +import io.netty.handler.codec.http.HttpObject +import io.netty.handler.codec.http.HttpRequest +import io.netty.handler.codec.http.LastHttpContent +import io.netty.util.AttributeKey + +class NettyHttpObjectHandler( + private val handler: HttpHandler<NettyHttpContent, NettyExceptionContent> +) : SimpleChannelInboundHandler<HttpObject>() { + companion object { + private val context = AttributeKey.newInstance<NettyHttpContent>("NettyHttpContent") + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { + when (msg) { + is HttpRequest -> { + val newHandlerContext = NettyHttpContent(ctx, msg) + ctx.channel().attr(context).set(newHandlerContext) + handler(newHandlerContext) + } + is HttpContent -> { + val content = ctx.channel().attr(context).get() + content.newResponseBody(msg) + } + else -> { + ctx.fireChannelRead(msg) + } + } + } +} \ No newline at end of file diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt index 225548f..1cd79d8 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt @@ -7,9 +7,11 @@ import io.netty.bootstrap.ServerBootstrap import io.netty.channel.ChannelFuture import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelOption +import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.handler.codec.http.HttpObject import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.HttpServerCodec import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler @@ -22,54 +24,71 @@ import org.slf4j.LoggerFactory class NettyHttpServer( override val port: Int, handler: HttpHandler<NettyHttpContent, NettyExceptionContent>, - bodySize: Int = 512 * 1024, + var bodySize: Int = 512 * 1024, autoRun: Boolean = false, - webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContext>>> = listOf(), - readTimeout: Int? = null, - writeTimeout: Int? = null + var webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContent>>> = listOf(), + var readTimeout: Int? = null, + var writeTimeout: Int? = null, + decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART, + backlog: Int = 1024 ) : HttpServer { constructor( port: Int, bodySize: Int = 512 * 1024, autoRun: Boolean = false, - webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContext>>> = listOf(), + webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContent>>> = listOf(), readTimeout: Int? = null, writeTimeout: Int? = null, + decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART, handler: (content: NettyHttpContent) -> Unit ) : this( port, object : HttpHandler<NettyHttpContent, NettyExceptionContent> { override fun handle(content: NettyHttpContent) = handler(content) }, - bodySize, autoRun, webSocketPath, readTimeout, writeTimeout + bodySize, autoRun, webSocketPath, readTimeout, writeTimeout, decodeType ) - val httpHandler = NettyHttpHandler(handler) + + var decodeType: NettyHttpDecodeType = decodeType + set(value) { + if (value != field) { + field = value + updateHandler() + } + } + var handler: HttpHandler<NettyHttpContent, NettyExceptionContent> = handler + set(value) { + field = value + updateHandler() + } + + private var httpHandler = updateHandler() private val group = NioEventLoopGroup() private val b = ServerBootstrap().group(group) .channel(NioServerSocketChannel::class.java) .childHandler(object : ChannelInitializer<SocketChannel>() { override fun initChannel(ch: SocketChannel) { - ch.pipeline() - .apply { - if (readTimeout != null) addLast(ReadTimeoutHandler(readTimeout)) - } - .apply { - if (writeTimeout != null) addLast(WriteTimeoutHandler(writeTimeout)) - } - .addLast("codec", HttpServerCodec()) - .addLast("aggregator", HttpObjectAggregator(bodySize)) - .addLast("http-chunked", ChunkedWriteHandler()) - .apply { - webSocketPath.forEach { (webSocketPath, handler) -> - addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath)) - addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler)) - } - } - .addLast("handle", httpHandler) + val pipeline = ch.pipeline() + readTimeout?.let { + pipeline.addLast(ReadTimeoutHandler(it)) + } + writeTimeout?.let { + pipeline.addLast(WriteTimeoutHandler(it)) + } + pipeline.addLast("codec", HttpServerCodec()) + if (this@NettyHttpServer.decodeType == NettyHttpDecodeType.FULL_HTTP) { + pipeline.addLast("aggregator", HttpObjectAggregator(bodySize)) + } + pipeline.addLast("http-chunked", ChunkedWriteHandler()) + this@NettyHttpServer.webSocketPath.forEach { (webSocketPath, handler) -> + pipeline.addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath)) + pipeline.addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler)) + } + pipeline.addLast("handle", httpHandler) } }) - .option(ChannelOption.SO_BACKLOG, 1024) // determining the number of connections queued + .option(ChannelOption.SO_BACKLOG, backlog) // determining the number of connections queued .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE) private val future: ChannelFuture = b.bind(port) @@ -90,6 +109,14 @@ class NettyHttpServer( future.channel().close() } + private fun updateHandler(): SimpleChannelInboundHandler<out HttpObject> { + httpHandler = when (decodeType) { + NettyHttpDecodeType.FULL_HTTP -> NettyHttpHandler(handler) + NettyHttpDecodeType.MULTI_PART -> NettyHttpObjectHandler(handler) + } + return httpHandler + } + companion object { private val log = try { LoggerFactory.getLogger(NettyHttpServer::class.java) diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketContext.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketContent.kt similarity index 91% rename from web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketContext.kt rename to web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketContent.kt index 9c568ac..e177f04 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketContext.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketContent.kt @@ -3,15 +3,15 @@ package cn.tursom.web.netty import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.read import cn.tursom.utils.bytebuffer.NettyByteBuffer -import cn.tursom.web.WebSocketContext +import cn.tursom.web.WebSocketContent import io.netty.buffer.Unpooled import io.netty.channel.Channel import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame import io.netty.handler.codec.http.websocketx.TextWebSocketFrame -class NettyWebSocketContext( +class NettyWebSocketContent( private val channel: Channel -) : WebSocketContext { +) : WebSocketContent { override fun writeText(buffer: ByteBuffer) { if (buffer is NettyByteBuffer) { channel.writeAndFlush(TextWebSocketFrame(buffer.byteBuf)) diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketHandler.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketHandler.kt index 08620c1..b44d3b2 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketHandler.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyWebSocketHandler.kt @@ -11,9 +11,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame class NettyWebSocketHandler( channel: Channel, - private val handler: WebSocketHandler<NettyWebSocketContext> + private val handler: WebSocketHandler<NettyWebSocketContent> ) : SimpleChannelInboundHandler<WebSocketFrame>() { - private val webSocketContext = NettyWebSocketContext(channel) + private val webSocketContext = NettyWebSocketContent(channel) override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) { when (msg) { diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/ParamParser.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/ParamParser.kt index f68c926..6deafaa 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/ParamParser.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/ParamParser.kt @@ -1,8 +1,6 @@ package cn.tursom.web.netty -import io.netty.handler.codec.http.FullHttpRequest -import io.netty.handler.codec.http.HttpMethod -import io.netty.handler.codec.http.QueryStringDecoder +import io.netty.handler.codec.http.* import io.netty.handler.codec.http.multipart.Attribute import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder import java.util.* @@ -13,38 +11,45 @@ import kotlin.collections.set * HTTP请求参数解析器, 支持GET, POST */ object ParamParser { - fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> { - val method = fullReq.method() - + fun parse(req: HttpRequest): MutableMap<String, List<String>> { val paramMap = HashMap<String, List<String>>() - - when (method) { + when (req.method()) { HttpMethod.GET -> try { // 是GET请求 - val decoder = QueryStringDecoder(fullReq.uri()) + val decoder = QueryStringDecoder(req.uri()) decoder.parameters().entries.forEach { entry -> paramMap[entry.key] = entry.value } } catch (e: Exception) { } - HttpMethod.POST -> try { + HttpMethod.POST -> if (req is HttpContent) { // 是POST请求 - val decoder = HttpPostRequestDecoder(fullReq) - decoder.offer(fullReq) - - val paramList = decoder.bodyHttpDatas - - for (param in paramList) { - val data = param as Attribute - if (!paramMap.containsKey(data.name)) { - paramMap[data.name] = ArrayList() - } - (paramMap[data.name] as ArrayList).add(data.value) - } - } catch (e: Exception) { + parse(req, req, paramMap) } } + return paramMap + } + fun parse( + req: HttpRequest, + body: HttpContent, + paramMap: MutableMap<String, List<String>> + ): MutableMap<String, List<String>> { + try { + val decoder = HttpPostRequestDecoder(req) + decoder.offer(body) + + val paramList = decoder.bodyHttpDatas + + for (param in paramList) { + val data = param as Attribute + if (!paramMap.containsKey(data.name)) { + paramMap[data.name] = ArrayList() + } + (paramMap[data.name] as ArrayList).add(data.value) + } + } catch (e: Exception) { + } return paramMap } } \ No newline at end of file diff --git a/web/src/main/kotlin/cn/tursom/web/HttpContent.kt b/web/src/main/kotlin/cn/tursom/web/HttpContent.kt index c1d7dff..a404b75 100644 --- a/web/src/main/kotlin/cn/tursom/web/HttpContent.kt +++ b/web/src/main/kotlin/cn/tursom/web/HttpContent.kt @@ -9,6 +9,7 @@ import java.io.RandomAccessFile import java.net.SocketAddress interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter { + val requestSendFully: Boolean val finished: Boolean val uri: String var responseCode: Int @@ -21,6 +22,12 @@ interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter { str.substring(1, str.indexOf(':').let { if (it < 1) str.length else it - 1 }) } + fun waitBody(action: (end: Boolean) -> Unit = { addBodyParam() }) + fun addBodyParam(body: ByteBuffer) + fun addBodyParam() { + addBodyParam(body ?: return) + } + fun getParam(param: String): String? = getParams(param)?.firstOrNull() fun getParams(): Map<String, List<String>> fun getParams(param: String): List<String>? diff --git a/web/src/main/kotlin/cn/tursom/web/HttpHandler.kt b/web/src/main/kotlin/cn/tursom/web/HttpHandler.kt index f29d744..1e4ef80 100644 --- a/web/src/main/kotlin/cn/tursom/web/HttpHandler.kt +++ b/web/src/main/kotlin/cn/tursom/web/HttpHandler.kt @@ -1,6 +1,7 @@ package cn.tursom.web interface HttpHandler<in T : HttpContent, in E : ExceptionContent> { + fun newRequest() {} fun handle(content: T) fun exceptionCause(e: E) { diff --git a/web/src/main/kotlin/cn/tursom/web/WebSocketContext.kt b/web/src/main/kotlin/cn/tursom/web/WebSocketContent.kt similarity index 94% rename from web/src/main/kotlin/cn/tursom/web/WebSocketContext.kt rename to web/src/main/kotlin/cn/tursom/web/WebSocketContent.kt index d8716aa..8f25adb 100644 --- a/web/src/main/kotlin/cn/tursom/web/WebSocketContext.kt +++ b/web/src/main/kotlin/cn/tursom/web/WebSocketContent.kt @@ -4,7 +4,7 @@ import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer import java.nio.charset.Charset -interface WebSocketContext { +interface WebSocketContent { fun writeText(buffer: ByteBuffer) fun writeText(bytes: ByteArray) = writeText(HeapByteBuffer(bytes)) fun writeText(str: String, charset: Charset = Charsets.UTF_8) = writeText(str.toByteArray(charset)) diff --git a/web/src/main/kotlin/cn/tursom/web/WebSocketHandler.kt b/web/src/main/kotlin/cn/tursom/web/WebSocketHandler.kt index 617541f..2405f68 100644 --- a/web/src/main/kotlin/cn/tursom/web/WebSocketHandler.kt +++ b/web/src/main/kotlin/cn/tursom/web/WebSocketHandler.kt @@ -2,7 +2,7 @@ package cn.tursom.web import cn.tursom.core.buffer.ByteBuffer -interface WebSocketHandler<in T : WebSocketContext> { +interface WebSocketHandler<in T : WebSocketContent> { fun recvText(str: String, context: T) fun recvText(byteBuffer: ByteBuffer, context: T) = recvText(byteBuffer.getString(), context) diff --git a/web/src/main/kotlin/cn/tursom/web/utils/EmptyHttpContent.kt b/web/src/main/kotlin/cn/tursom/web/utils/EmptyHttpContent.kt index e9b4fa3..a0ccb37 100644 --- a/web/src/main/kotlin/cn/tursom/web/utils/EmptyHttpContent.kt +++ b/web/src/main/kotlin/cn/tursom/web/utils/EmptyHttpContent.kt @@ -16,7 +16,8 @@ class EmptyHttpContent( override val body: ByteBuffer? = null, override val clientIp: SocketAddress = InetSocketAddress(0), override val method: String = "GET", - override val cookieMap: Map<String, String> = mapOf() + override val cookieMap: Map<String, String> = mapOf(), + override val requestSendFully: Boolean ) : HttpContent { override fun getHeader(header: String): String? = null override fun getHeaders(header: String): List<String> = listOf() @@ -43,5 +44,7 @@ class EmptyHttpContent( override fun finishChunked(chunked: Chunked) {} override fun finishFile(file: File, chunkSize: Int) {} override fun finishFile(file: RandomAccessFile, offset: Long, length: Long, chunkSize: Int) {} + override fun addBodyParam(body: ByteBuffer) {} + override fun waitBody(action: (end: Boolean) -> Unit) {} }