添加对Http Multipart的支持

This commit is contained in:
tursom 2020-05-20 00:03:12 +08:00
parent 810a116e19
commit 8675eccd33
13 changed files with 200 additions and 70 deletions

View File

@ -10,20 +10,24 @@ import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.*
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder
import io.netty.handler.stream.ChunkedFile
import org.slf4j.LoggerFactory
import java.io.File
import java.io.RandomAccessFile
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.collections.set
@Suppress("MemberVisibilityCanBePrivate", "unused")
open class NettyHttpContent(
val ctx: ChannelHandlerContext,
val msg: FullHttpRequest
val request: HttpRequest
) : MutableHttpContent, NettyResponseHeaderAdapter() {
val decoder = HttpPostRequestDecoder(request)
override var requestSendFully: Boolean = false
override var finished: Boolean = false
override val uri: String by lazy {
var uri = msg.uri()
var uri = request.uri()
while (uri.contains("//")) {
uri = uri.replace("//", "/")
}
@ -31,12 +35,15 @@ open class NettyHttpContent(
}
override val clientIp get() = ctx.channel().remoteAddress()!!
override val realIp: String = super.realIp
val httpMethod: HttpMethod get() = msg.method()
val protocolVersion: HttpVersion get() = msg.protocolVersion()
val headers: HttpHeaders get() = msg.headers()
protected val paramMap by lazy { ParamParser.parse(msg) }
val httpMethod: HttpMethod get() = request.method()
val protocolVersion: HttpVersion get() = request.protocolVersion()
val headers: HttpHeaders get() = request.headers()
protected val paramMap by lazy { ParamParser.parse(request) }
override val cookieMap by lazy { getHeader("Cookie")?.let { decodeCookie(it) } ?: mapOf() }
override val body = msg.content()?.let { NettyByteBuffer(it) }
private var waitBodyHandler = ConcurrentLinkedQueue<(end: Boolean) -> Unit>()
override val body: ByteBuffer? get() = bodyList.poll()?.content()?.let { NettyByteBuffer(it) }
val bodyList = ConcurrentLinkedQueue<HttpContent>()
//override val responseBody = ByteArrayOutputStream()
var responseStatus: HttpResponseStatus = HttpResponseStatus.OK
@ -50,6 +57,40 @@ open class NettyHttpContent(
val chunkedList = ArrayList<() -> ByteBuffer>()
private var responseBodyBuf: CompositeByteBuf? = null
fun newResponseBody(httpContent: HttpContent) {
bodyList.add(httpContent)
val end = if (httpContent is LastHttpContent) {
requestSendFully = true
true
} else {
false
}
while (waitBodyHandler.isNotEmpty()) {
val handler = waitBodyHandler.poll() ?: continue
handler(end)
}
}
override fun waitBody(action: (end: Boolean) -> Unit) {
if (!requestSendFully) {
waitBodyHandler.add(action)
}
}
override fun addBodyParam() {
ParamParser.parse(request, bodyList.poll()!!, paramMap)
}
override fun addBodyParam(body: ByteBuffer) {
val byteBuf = if (body is NettyByteBuffer) {
body.byteBuf
} else {
Unpooled.wrappedBuffer(body.readBuffer())
}
ParamParser.parse(request, DefaultHttpContent(byteBuf), paramMap)
body.close()
}
fun getResponseBodyBuf(): CompositeByteBuf {
if (responseBodyBuf == null) {
responseBodyBuf = ctx.alloc().compositeBuffer()!!
@ -112,13 +153,15 @@ open class NettyHttpContent(
override fun write(buffer: ByteBuffer) {
//buffer.writeTo(responseBody)
log?.trace("write {}", buffer)
getResponseBodyBuf().addComponent(if (buffer is NettyByteBuffer) {
buffer.byteBuf
} else {
val buf = Unpooled.wrappedBuffer(buffer.readBuffer())
buffer.clear()
buf
})
getResponseBodyBuf().addComponent(
if (buffer is NettyByteBuffer) {
buffer.byteBuf
} else {
val buf = Unpooled.wrappedBuffer(buffer.readBuffer())
buffer.clear()
buf
}
)
}
override fun reset() {

View File

@ -0,0 +1,5 @@
package cn.tursom.web.netty
enum class NettyHttpDecodeType {
FULL_HTTP, MULTI_PART
}

View File

@ -12,6 +12,10 @@ import org.slf4j.LoggerFactory
class NettyHttpHandler(
private val handler: HttpHandler<NettyHttpContent, NettyExceptionContent>
) : SimpleChannelInboundHandler<FullHttpRequest>() {
override fun channelActive(ctx: ChannelHandlerContext) {
handler.newRequest()
super.channelActive(ctx)
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val handlerContext = NettyHttpContent(ctx, msg)

View File

@ -0,0 +1,35 @@
package cn.tursom.web.netty
import cn.tursom.web.HttpHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpObject
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.LastHttpContent
import io.netty.util.AttributeKey
class NettyHttpObjectHandler(
private val handler: HttpHandler<NettyHttpContent, NettyExceptionContent>
) : SimpleChannelInboundHandler<HttpObject>() {
companion object {
private val context = AttributeKey.newInstance<NettyHttpContent>("NettyHttpContent")
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
when (msg) {
is HttpRequest -> {
val newHandlerContext = NettyHttpContent(ctx, msg)
ctx.channel().attr(context).set(newHandlerContext)
handler(newHandlerContext)
}
is HttpContent -> {
val content = ctx.channel().attr(context).get()
content.newResponseBody(msg)
}
else -> {
ctx.fireChannelRead(msg)
}
}
}
}

View File

@ -7,9 +7,11 @@ import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObject
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
@ -22,54 +24,71 @@ import org.slf4j.LoggerFactory
class NettyHttpServer(
override val port: Int,
handler: HttpHandler<NettyHttpContent, NettyExceptionContent>,
bodySize: Int = 512 * 1024,
var bodySize: Int = 512 * 1024,
autoRun: Boolean = false,
webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContext>>> = listOf(),
readTimeout: Int? = null,
writeTimeout: Int? = null
var webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContent>>> = listOf(),
var readTimeout: Int? = null,
var writeTimeout: Int? = null,
decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART,
backlog: Int = 1024
) : HttpServer {
constructor(
port: Int,
bodySize: Int = 512 * 1024,
autoRun: Boolean = false,
webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContext>>> = listOf(),
webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContent>>> = listOf(),
readTimeout: Int? = null,
writeTimeout: Int? = null,
decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART,
handler: (content: NettyHttpContent) -> Unit
) : this(
port,
object : HttpHandler<NettyHttpContent, NettyExceptionContent> {
override fun handle(content: NettyHttpContent) = handler(content)
},
bodySize, autoRun, webSocketPath, readTimeout, writeTimeout
bodySize, autoRun, webSocketPath, readTimeout, writeTimeout, decodeType
)
val httpHandler = NettyHttpHandler(handler)
var decodeType: NettyHttpDecodeType = decodeType
set(value) {
if (value != field) {
field = value
updateHandler()
}
}
var handler: HttpHandler<NettyHttpContent, NettyExceptionContent> = handler
set(value) {
field = value
updateHandler()
}
private var httpHandler = updateHandler()
private val group = NioEventLoopGroup()
private val b = ServerBootstrap().group(group)
.channel(NioServerSocketChannel::class.java)
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
ch.pipeline()
.apply {
if (readTimeout != null) addLast(ReadTimeoutHandler(readTimeout))
}
.apply {
if (writeTimeout != null) addLast(WriteTimeoutHandler(writeTimeout))
}
.addLast("codec", HttpServerCodec())
.addLast("aggregator", HttpObjectAggregator(bodySize))
.addLast("http-chunked", ChunkedWriteHandler())
.apply {
webSocketPath.forEach { (webSocketPath, handler) ->
addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath))
addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler))
}
}
.addLast("handle", httpHandler)
val pipeline = ch.pipeline()
readTimeout?.let {
pipeline.addLast(ReadTimeoutHandler(it))
}
writeTimeout?.let {
pipeline.addLast(WriteTimeoutHandler(it))
}
pipeline.addLast("codec", HttpServerCodec())
if (this@NettyHttpServer.decodeType == NettyHttpDecodeType.FULL_HTTP) {
pipeline.addLast("aggregator", HttpObjectAggregator(bodySize))
}
pipeline.addLast("http-chunked", ChunkedWriteHandler())
this@NettyHttpServer.webSocketPath.forEach { (webSocketPath, handler) ->
pipeline.addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath))
pipeline.addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler))
}
pipeline.addLast("handle", httpHandler)
}
})
.option(ChannelOption.SO_BACKLOG, 1024) // determining the number of connections queued
.option(ChannelOption.SO_BACKLOG, backlog) // determining the number of connections queued
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
private val future: ChannelFuture = b.bind(port)
@ -90,6 +109,14 @@ class NettyHttpServer(
future.channel().close()
}
private fun updateHandler(): SimpleChannelInboundHandler<out HttpObject> {
httpHandler = when (decodeType) {
NettyHttpDecodeType.FULL_HTTP -> NettyHttpHandler(handler)
NettyHttpDecodeType.MULTI_PART -> NettyHttpObjectHandler(handler)
}
return httpHandler
}
companion object {
private val log = try {
LoggerFactory.getLogger(NettyHttpServer::class.java)

View File

@ -3,15 +3,15 @@ package cn.tursom.web.netty
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import cn.tursom.web.WebSocketContext
import cn.tursom.web.WebSocketContent
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
class NettyWebSocketContext(
class NettyWebSocketContent(
private val channel: Channel
) : WebSocketContext {
) : WebSocketContent {
override fun writeText(buffer: ByteBuffer) {
if (buffer is NettyByteBuffer) {
channel.writeAndFlush(TextWebSocketFrame(buffer.byteBuf))

View File

@ -11,9 +11,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame
class NettyWebSocketHandler(
channel: Channel,
private val handler: WebSocketHandler<NettyWebSocketContext>
private val handler: WebSocketHandler<NettyWebSocketContent>
) : SimpleChannelInboundHandler<WebSocketFrame>() {
private val webSocketContext = NettyWebSocketContext(channel)
private val webSocketContext = NettyWebSocketContent(channel)
override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) {
when (msg) {

View File

@ -1,8 +1,6 @@
package cn.tursom.web.netty
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.QueryStringDecoder
import io.netty.handler.codec.http.*
import io.netty.handler.codec.http.multipart.Attribute
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder
import java.util.*
@ -13,38 +11,45 @@ import kotlin.collections.set
* HTTP请求参数解析器, 支持GET, POST
*/
object ParamParser {
fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> {
val method = fullReq.method()
fun parse(req: HttpRequest): MutableMap<String, List<String>> {
val paramMap = HashMap<String, List<String>>()
when (method) {
when (req.method()) {
HttpMethod.GET -> try {
// 是GET请求
val decoder = QueryStringDecoder(fullReq.uri())
val decoder = QueryStringDecoder(req.uri())
decoder.parameters().entries.forEach { entry ->
paramMap[entry.key] = entry.value
}
} catch (e: Exception) {
}
HttpMethod.POST -> try {
HttpMethod.POST -> if (req is HttpContent) {
// 是POST请求
val decoder = HttpPostRequestDecoder(fullReq)
decoder.offer(fullReq)
val paramList = decoder.bodyHttpDatas
for (param in paramList) {
val data = param as Attribute
if (!paramMap.containsKey(data.name)) {
paramMap[data.name] = ArrayList()
}
(paramMap[data.name] as ArrayList).add(data.value)
}
} catch (e: Exception) {
parse(req, req, paramMap)
}
}
return paramMap
}
fun parse(
req: HttpRequest,
body: HttpContent,
paramMap: MutableMap<String, List<String>>
): MutableMap<String, List<String>> {
try {
val decoder = HttpPostRequestDecoder(req)
decoder.offer(body)
val paramList = decoder.bodyHttpDatas
for (param in paramList) {
val data = param as Attribute
if (!paramMap.containsKey(data.name)) {
paramMap[data.name] = ArrayList()
}
(paramMap[data.name] as ArrayList).add(data.value)
}
} catch (e: Exception) {
}
return paramMap
}
}

View File

@ -9,6 +9,7 @@ import java.io.RandomAccessFile
import java.net.SocketAddress
interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter {
val requestSendFully: Boolean
val finished: Boolean
val uri: String
var responseCode: Int
@ -21,6 +22,12 @@ interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter {
str.substring(1, str.indexOf(':').let { if (it < 1) str.length else it - 1 })
}
fun waitBody(action: (end: Boolean) -> Unit = { addBodyParam() })
fun addBodyParam(body: ByteBuffer)
fun addBodyParam() {
addBodyParam(body ?: return)
}
fun getParam(param: String): String? = getParams(param)?.firstOrNull()
fun getParams(): Map<String, List<String>>
fun getParams(param: String): List<String>?

View File

@ -1,6 +1,7 @@
package cn.tursom.web
interface HttpHandler<in T : HttpContent, in E : ExceptionContent> {
fun newRequest() {}
fun handle(content: T)
fun exceptionCause(e: E) {

View File

@ -4,7 +4,7 @@ import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import java.nio.charset.Charset
interface WebSocketContext {
interface WebSocketContent {
fun writeText(buffer: ByteBuffer)
fun writeText(bytes: ByteArray) = writeText(HeapByteBuffer(bytes))
fun writeText(str: String, charset: Charset = Charsets.UTF_8) = writeText(str.toByteArray(charset))

View File

@ -2,7 +2,7 @@ package cn.tursom.web
import cn.tursom.core.buffer.ByteBuffer
interface WebSocketHandler<in T : WebSocketContext> {
interface WebSocketHandler<in T : WebSocketContent> {
fun recvText(str: String, context: T)
fun recvText(byteBuffer: ByteBuffer, context: T) = recvText(byteBuffer.getString(), context)

View File

@ -16,7 +16,8 @@ class EmptyHttpContent(
override val body: ByteBuffer? = null,
override val clientIp: SocketAddress = InetSocketAddress(0),
override val method: String = "GET",
override val cookieMap: Map<String, String> = mapOf()
override val cookieMap: Map<String, String> = mapOf(),
override val requestSendFully: Boolean
) : HttpContent {
override fun getHeader(header: String): String? = null
override fun getHeaders(header: String): List<String> = listOf()
@ -43,5 +44,7 @@ class EmptyHttpContent(
override fun finishChunked(chunked: Chunked) {}
override fun finishFile(file: File, chunkSize: Int) {}
override fun finishFile(file: RandomAccessFile, offset: Long, length: Long, chunkSize: Int) {}
override fun addBodyParam(body: ByteBuffer) {}
override fun waitBody(action: (end: Boolean) -> Unit) {}
}