对项目进行进一步拆分

This commit is contained in:
tursom 2019-10-08 18:51:38 +08:00
parent 4262983af4
commit 43c5ce13ed
11 changed files with 681 additions and 0 deletions

View File

@ -0,0 +1,61 @@
package cn.tursom.utils.cache
import cn.tursom.database.async.AsyncSqlAdapter
import cn.tursom.database.async.AsyncSqlHelper
import cn.tursom.database.clauses.clause
import cn.tursom.utils.background
import cn.tursom.utils.cache.interfaces.AsyncPotableCacheMap
import java.util.logging.Level
import java.util.logging.Logger
@Suppress("CanBeParameter", "MemberVisibilityCanBePrivate", "SpellCheckingInspection")
class AsyncSqlStringCacheMap(
val db: AsyncSqlHelper,
val timeout: Long,
val table: String,
val updateDelay: Long = 60 * 1000,
val prevCacheMap: AsyncPotableCacheMap<String, String> = DefaultAsyncPotableCacheMap(timeout),
val logger: Logger? = null
) : AsyncPotableCacheMap<String, String> by prevCacheMap {
override suspend fun get(key: String): String? {
return prevCacheMap.get(key) ?: try {
val storage = db.select(AsyncSqlAdapter(StorageData::class.java), where = clause { !StorageData::key equal !key }, maxCount = 1, table = table)
if (storage.isNotEmpty() && storage[0].cacheTime + timeout > System.currentTimeMillis()) {
val value = storage[0].value
set(key, value)
value
} else null
} catch (e: Exception) {
e.printStackTrace()
null
}
}
override suspend fun get(key: String, constructor: suspend () -> String): String {
val memCache = get(key)
return if (memCache != null) memCache
else {
val newValue = constructor()
this.set(key, newValue)
newValue
}
}
override suspend fun set(key: String, value: String): String? {
prevCacheMap.set(key, value)
background { updateStorage(key, value) }
return value
}
suspend fun updateStorage(key: String, value: String) {
try {
val updated = db.replace(table, StorageData(key, value))
logger?.log(Level.INFO, "AsyncSqlStringCacheMap update $updated coloums: $key")
} catch (e: Exception) {
System.err.println("AsyncSqlStringCacheMap cause an Exception on update cn.tusom.database")
e.printStackTrace()
}
}
}

View File

@ -0,0 +1,5 @@
dependencies {
implementation project(":")
implementation project(":web")
implementation group: "io.netty", name: "netty-all", version: "4.1.33.Final"
}

View File

@ -0,0 +1,20 @@
package cn.tursom.web.netty
import cn.tursom.web.HttpContent
import cn.tursom.web.utils.Cookie
import io.netty.handler.codec.DateFormatter
import io.netty.handler.codec.http.cookie.ServerCookieDecoder
import java.util.*
import kotlin.collections.HashMap
fun HttpContent.parseHttpDate(date: CharSequence, start: Int = 0, end: Int = date.length): Date = DateFormatter.parseHttpDate(date, start, end)
fun HttpContent.format(date: Date): String = DateFormatter.format(date)
fun HttpContent.append(date: Date, sb: StringBuilder): StringBuilder = DateFormatter.append(date, sb)
fun HttpContent.decodeCookie(cookie: String): Map<String, Cookie> {
val cookieMap = HashMap<String, Cookie>()
ServerCookieDecoder.STRICT.decode(cookie).forEach {
cookieMap[it.name()] = Cookie(it.name(), it.value(), it.domain(), it.path(), it.maxAge())
}
return cookieMap
}

View File

@ -0,0 +1,143 @@
package cn.tursom.web.netty
import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import io.netty.buffer.ByteBuf
import java.io.OutputStream
import java.nio.ByteBuffer
class NettyAdvanceByteBuffer(val byteBuf: ByteBuf) : AdvanceByteBuffer {
override val hasArray: Boolean get() = byteBuf.hasArray()
override val readOnly: Boolean get() = byteBuf.isReadOnly
override val nioBuffer: ByteBuffer
get() = if (readMode) byteBuf.nioBuffer()
else byteBuf.nioBuffer(writePosition, limit)
override val bufferCount: Int get() = byteBuf.nioBufferCount()
override val nioBuffers: Array<out ByteBuffer> get() = byteBuf.nioBuffers()
override var writePosition: Int
get() = byteBuf.writerIndex()
set(value) {
byteBuf.writerIndex(value)
}
override var limit: Int
get() = byteBuf.capacity()
set(value) {
byteBuf.capacity(value)
}
override val capacity get() = byteBuf.maxCapacity()
override val array: ByteArray get() = byteBuf.array()
override val arrayOffset: Int get() = byteBuf.arrayOffset()
override var readPosition: Int
get() = byteBuf.readerIndex()
set(value) {
byteBuf.readerIndex(value)
}
override val readOffset: Int get() = byteBuf.arrayOffset() + byteBuf.readerIndex()
override val readableSize: Int
get() = byteBuf.readableBytes()
override val available: Int get() = readableSize
override val writeOffset: Int get() = writePosition
override val writeableSize: Int
get() = limit - writePosition
override val size: Int get() = capacity
override var readMode: Boolean = false
override fun readMode() {
readMode = true
}
override fun resumeWriteMode(usedSize: Int) {
readPosition += usedSize
readMode = false
}
override fun needReadSize(size: Int) {
if (size > limit) {
if (size < capacity) byteBuf.capacity(size)
else throw IndexOutOfBoundsException()
}
}
override fun clear() {
byteBuf.clear()
}
override fun reset() {
byteBuf.discardReadBytes()
}
override fun reset(outputStream: OutputStream) {
byteBuf.readBytes(outputStream, readableSize)
byteBuf.clear()
}
override fun get(): Byte = byteBuf.readByte()
override fun getChar(): Char = byteBuf.readChar()
override fun getShort(): Short = byteBuf.readShort()
override fun getInt(): Int = byteBuf.readInt()
override fun getLong(): Long = byteBuf.readLong()
override fun getFloat(): Float = byteBuf.readFloat()
override fun getDouble(): Double = byteBuf.readDouble()
override fun getBytes(): ByteArray {
val bytes = ByteArray(byteBuf.readableBytes())
byteBuf.readBytes(bytes)
return bytes
}
override fun getString(size: Int): String {
val str = byteBuf.toString(readPosition, size, Charsets.UTF_8)
readPosition += size
return str
}
override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int {
byteBuf.readBytes(buffer, bufferOffset, size)
return size
}
override fun writeTo(os: OutputStream): Int {
val size = readableSize
byteBuf.readBytes(os, size)
reset()
return size
}
override fun put(byte: Byte) {
byteBuf.writeByte(byte.toInt())
}
override fun put(char: Char) {
byteBuf.writeChar(char.toInt())
}
override fun put(short: Short) {
byteBuf.writeShort(short.toInt())
}
override fun put(int: Int) {
byteBuf.writeInt(int)
}
override fun put(long: Long) {
byteBuf.writeLong(long)
}
override fun put(float: Float) {
byteBuf.writeFloat(float)
}
override fun put(double: Double) {
byteBuf.writeDouble(double)
}
override fun put(str: String) {
byteBuf.writeCharSequence(str, Charsets.UTF_8)
}
override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) {
byteBuf.writeBytes(byteArray, startIndex, endIndex - startIndex)
}
}

View File

@ -0,0 +1,39 @@
package cn.tursom.web.netty
import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
class NettyChunkedByteBuffer(val bufList: List<AdvanceByteBuffer>) : ChunkedInput<ByteBuf> {
constructor(vararg bufList: AdvanceByteBuffer) : this(bufList.asList())
var iterator = bufList.iterator()
var progress: Long = 0
val length = run {
var len = 0L
bufList.forEach {
len += it.readableSize
}
len
}
override fun progress(): Long = progress
override fun length(): Long = length
override fun isEndOfInput(): Boolean = !iterator.hasNext()
override fun readChunk(ctx: ChannelHandlerContext?): ByteBuf = readChunk()
override fun readChunk(allocator: ByteBufAllocator?): ByteBuf = readChunk()
private fun readChunk(): ByteBuf {
val next = iterator.next()
progress += next.readableSize
return if (next is NettyAdvanceByteBuffer) next.byteBuf
else Unpooled.wrappedBuffer(next.nioBuffer)
}
override fun close() {}
}

View File

@ -0,0 +1,28 @@
package cn.tursom.web.netty
import cn.tursom.web.utils.Chunked
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
class NettyChunkedInput(val chunked: Chunked) : ChunkedInput<ByteBuf> {
override fun progress(): Long = chunked.progress
override fun length() = chunked.length
override fun isEndOfInput(): Boolean = chunked.endOfInput
override fun readChunk(ctx: ChannelHandlerContext?): ByteBuf {
val buf = chunked.readChunk()
return if (buf is NettyAdvanceByteBuffer) buf.byteBuf
else Unpooled.wrappedBuffer(buf.nioBuffer)
}
override fun readChunk(allocator: ByteBufAllocator?): ByteBuf {
val buf = chunked.readChunk()
return if (buf is NettyAdvanceByteBuffer) buf.byteBuf
else Unpooled.wrappedBuffer(buf.nioBuffer)
}
override fun close() = chunked.close()
}

View File

@ -0,0 +1,30 @@
package cn.tursom.web.netty
import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import cn.tursom.web.ExceptionContent
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
class NettyExceptionContent(
val ctx: ChannelHandlerContext,
override val cause: Throwable
) : ExceptionContent {
override fun write(message: String) {
ctx.write(Unpooled.wrappedBuffer(message.toByteArray()))
}
override fun write(bytes: ByteArray, offset: Int, length: Int) {
ctx.write(Unpooled.wrappedBuffer(bytes, offset, length))
}
override fun write(buffer: AdvanceByteBuffer) {
when (buffer) {
is NettyAdvanceByteBuffer -> ctx.write(buffer.byteBuf)
else -> write(buffer.getBytes())
}
}
override fun finish() {
ctx.flush()
}
}

View File

@ -0,0 +1,200 @@
package cn.tursom.web.netty
import cn.tursom.core.buf
import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import cn.tursom.web.AdvanceHttpContent
import cn.tursom.web.utils.Chunked
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.*
import io.netty.handler.stream.ChunkedFile
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.RandomAccessFile
import java.util.*
import kotlin.collections.ArrayList
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
@Suppress("MemberVisibilityCanBePrivate", "unused")
open class NettyHttpContent(
val ctx: ChannelHandlerContext,
val msg: FullHttpRequest
) : AdvanceHttpContent {
override val uri: String by lazy {
var uri = msg.uri()
while (uri.contains("//")) {
uri = uri.replace("//", "/")
}
uri
}
override val clientIp get() = ctx.channel().remoteAddress()!!
override val realIp: String = super.realIp
val httpMethod: HttpMethod get() = msg.method()
val protocolVersion: HttpVersion get() = msg.protocolVersion()
val headers: HttpHeaders get() = msg.headers()
protected val paramMap by lazy { RequestParser.parse(msg) }
override val cookieMap by lazy { getHeader("Cookie")?.let { decodeCookie(it) } ?: mapOf() }
override val body = msg.content()?.let { NettyAdvanceByteBuffer(it) }
val responseMap = HashMap<String, Any>()
val responseListMap = HashMap<String, ArrayList<Any>>()
override val responseBody = ByteArrayOutputStream()
override var responseCode: Int = 200
override var responseMessage: String? = null
override val method: String get() = httpMethod.name()
val chunkedList = ArrayList<AdvanceByteBuffer>()
override fun getHeader(header: String): String? {
return headers[header]
}
override fun getHeaders(): List<Map.Entry<String, String>> {
return headers.toList()
}
override fun getParam(param: String): String? {
return paramMap[param]?.get(0)
}
override fun getParams(): Map<String, List<String>> {
return paramMap
}
override fun getParams(param: String): List<String>? {
return paramMap[param]
}
override fun addParam(key: String, value: String) {
if (!paramMap.containsKey(key)) {
paramMap[key] = ArrayList()
}
(paramMap[key] as ArrayList).add(value)
}
override fun setResponseHeader(name: String, value: Any) {
responseMap[name] = value
}
override fun addResponseHeader(name: String, value: Any) {
val list = responseListMap[name] ?: run {
val newList = ArrayList<Any>()
responseListMap[name] = newList
newList
}
list.add(value)
}
override fun write(message: String) {
responseBody.write(message.toByteArray())
}
override fun write(byte: Byte) {
responseBody.write(byte.toInt())
}
override fun write(bytes: ByteArray, offset: Int, size: Int) {
responseBody.write(bytes, offset, size)
}
override fun write(buffer: AdvanceByteBuffer) {
buffer.writeTo(responseBody)
}
override fun reset() {
responseBody.reset()
}
override fun finish() {
finish(responseBody.buf, 0, responseBody.size())
}
override fun finish(buffer: ByteArray, offset: Int, size: Int) {
finish(Unpooled.wrappedBuffer(buffer, offset, size))
}
override fun finish(buffer: AdvanceByteBuffer) {
if (buffer is NettyAdvanceByteBuffer) {
finish(buffer.byteBuf)
} else {
super.finish(buffer)
}
}
fun finish(buf: ByteBuf) = finish(buf, HttpResponseStatus.valueOf(responseCode))
fun finish(buf: ByteBuf, responseCode: HttpResponseStatus) {
val response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseCode, buf)
finish(response)
}
fun finish(response: FullHttpResponse) {
val heads = response.headers()
addHeaders(
heads, mapOf(
HttpHeaderNames.CONTENT_TYPE to "${HttpHeaderValues.TEXT_PLAIN}; charset=UTF-8",
HttpHeaderNames.CONTENT_LENGTH to response.content().readableBytes(),
HttpHeaderNames.CONNECTION to HttpHeaderValues.KEEP_ALIVE
)
)
ctx.writeAndFlush(response)
}
fun addHeaders(heads: HttpHeaders, defaultHeaders: Map<out CharSequence, Any>) {
responseListMap.forEach { (t, u) ->
u.forEach {
heads.add(t, it)
}
}
defaultHeaders.forEach { (t, u) ->
heads.set(t, u)
}
responseMap.forEach { (t, u) ->
heads.set(t, u)
}
}
override fun writeChunkedHeader() {
val response = DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
response.status = if (responseMessage != null) HttpResponseStatus(responseCode, responseMessage)
else HttpResponseStatus.valueOf(responseCode)
val heads = response.headers()
addHeaders(
heads, mapOf(
HttpHeaderNames.CONTENT_TYPE to "${HttpHeaderValues.TEXT_PLAIN}; charset=UTF-8",
HttpHeaderNames.CONNECTION to HttpHeaderValues.KEEP_ALIVE,
HttpHeaderNames.TRANSFER_ENCODING to "chunked"
)
)
ctx.write(response)
}
override fun addChunked(buffer: AdvanceByteBuffer) {
chunkedList.add(buffer)
}
override fun finishChunked() {
val httpChunkWriter = HttpChunkedInput(NettyChunkedByteBuffer(chunkedList))
ctx.writeAndFlush(httpChunkWriter)
}
override fun finishChunked(chunked: Chunked) {
val httpChunkWriter = HttpChunkedInput(NettyChunkedInput(chunked))
ctx.writeAndFlush(httpChunkWriter)
}
override fun finishFile(file: File, chunkSize: Int) {
writeChunkedHeader()
ctx.writeAndFlush(HttpChunkedInput(ChunkedFile(file, chunkSize)))
}
override fun finishFile(file: RandomAccessFile, offset: Long, length: Long, chunkSize: Int) {
writeChunkedHeader()
ctx.writeAndFlush(HttpChunkedInput(ChunkedFile(file, offset, length, chunkSize)))
}
}

View File

@ -0,0 +1,32 @@
package cn.tursom.web.netty
import cn.tursom.web.HttpHandler
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.FullHttpRequest
@ChannelHandler.Sharable
class NettyHttpHandler(
private val handler: HttpHandler<NettyHttpContent, NettyExceptionContent>
) : SimpleChannelInboundHandler<FullHttpRequest>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val handlerContext = NettyHttpContent(ctx, msg)
try {
handler.handle(handlerContext)
} catch (e: Throwable) {
handlerContext.write("${e.javaClass}: ${e.message}")
}
}
override fun channelReadComplete(ctx: ChannelHandlerContext) {
super.channelReadComplete(ctx)
ctx.flush()
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable?) {
if (cause != null) handler.exception(NettyExceptionContent(ctx, cause))
ctx.close()
}
}

View File

@ -0,0 +1,75 @@
package cn.tursom.web.netty
import cn.tursom.web.HttpHandler
import cn.tursom.web.HttpServer
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseEncoder
import io.netty.handler.stream.ChunkedWriteHandler
class NettyHttpServer(
override val port: Int,
handler: HttpHandler<NettyHttpContent, NettyExceptionContent>,
bodySize: Int = 512 * 1024,
autoRun: Boolean = false
) : HttpServer {
constructor(
port: Int,
bodySize: Int = 512 * 1024,
autoRun: Boolean = false,
handler: (content: NettyHttpContent) -> Unit
) : this(
port,
object : HttpHandler<NettyHttpContent, NettyExceptionContent> {
override fun handle(content: NettyHttpContent) {
handler(content)
}
override fun exception(e: NettyExceptionContent) {
e.cause.printStackTrace()
}
},
bodySize,
autoRun
)
val httpHandler = NettyHttpHandler(handler)
private val group = NioEventLoopGroup()
private val b = ServerBootstrap().group(group)
.channel(NioServerSocketChannel::class.java)
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
ch.pipeline()
.addLast("decoder", HttpRequestDecoder())
.addLast("encoder", HttpResponseEncoder())
.addLast("aggregator", HttpObjectAggregator(bodySize))
.addLast("http-chunked", ChunkedWriteHandler())
.addLast("handle", httpHandler)
}
})
.option(ChannelOption.SO_BACKLOG, 1024) // determining the number of connections queued
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
private lateinit var future: ChannelFuture
init {
if (autoRun) run()
}
override fun run() {
future = b.bind(port)
future.sync()
}
override fun close() {
future.cancel(false)
future.channel().close()
}
}

View File

@ -0,0 +1,48 @@
package cn.tursom.web.netty
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.QueryStringDecoder
import io.netty.handler.codec.http.multipart.Attribute
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder
import java.util.HashMap
/**
* HTTP请求参数解析器, 支持GET, POST
*/
object RequestParser {
fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> {
val method = fullReq.method()
val paramMap = HashMap<String, List<String>>()
when {
HttpMethod.GET === method -> try {
// 是GET请求
val decoder = QueryStringDecoder(fullReq.uri())
decoder.parameters().entries.forEach { entry ->
paramMap[entry.key] = entry.value
}
} catch (e: Exception) {
}
HttpMethod.POST === method -> try {
// 是POST请求
val decoder = HttpPostRequestDecoder(fullReq)
decoder.offer(fullReq)
val paramList = decoder.bodyHttpDatas
for (param in paramList) {
val data = param as Attribute
if (!paramMap.containsKey(data.name)) {
paramMap[data.name] = ArrayList()
}
(paramMap[data.name] as ArrayList).add(data.value)
}
} catch (e: Exception) {
}
}
return paramMap
}
}