From 395866dc06f01e6876b251378eafccb758fdad35 Mon Sep 17 00:00:00 2001 From: tursom Date: Mon, 9 Mar 2020 23:33:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0StorageHandler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kotlin/cn/tursom/mongodb/MongoOperator.kt | 14 ++- .../kotlin/cn/tursom/mongodb/MongoTemplate.kt | 10 ++- .../core/ThreadLocalSimpleDateFormat.kt | 2 +- src/main/kotlin/cn/tursom/core/Tools.kt | 6 +- .../core/storage/BufferedStorageHandler.kt | 63 ++++++++++++++ .../core/storage/InstantStorageHandler.kt | 16 ++++ .../cn/tursom/core/storage/StorageHandler.kt | 10 +++ .../core/storage/ThreadPoolStorageHandler.kt | 22 +++++ .../src/main/kotlin/cn/tursom/mail/read.kt | 9 +- .../cn/tursom/utils/GsonDataTypeAdaptor.kt | 86 +++++++++++++++++++ .../src/main/kotlin/cn/tursom/utils/Tools.kt | 11 ++- .../kotlin/cn/tursom/ws/WebSocketClient.kt | 4 +- .../ws/WebSocketClientChannelHandler.kt | 8 +- .../cn/tursom/web/router/RoutedHttpHandler.kt | 40 +++------ .../web/router/AsyncRoutedHttpHandler.kt | 40 +++------ 15 files changed, 264 insertions(+), 77 deletions(-) create mode 100644 src/main/kotlin/cn/tursom/core/storage/BufferedStorageHandler.kt create mode 100644 src/main/kotlin/cn/tursom/core/storage/InstantStorageHandler.kt create mode 100644 src/main/kotlin/cn/tursom/core/storage/StorageHandler.kt create mode 100644 src/main/kotlin/cn/tursom/core/storage/ThreadPoolStorageHandler.kt create mode 100644 utils/src/main/kotlin/cn/tursom/utils/GsonDataTypeAdaptor.kt diff --git a/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoOperator.kt b/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoOperator.kt index 019fc0f..50244b8 100644 --- a/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoOperator.kt +++ b/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoOperator.kt @@ -6,6 +6,7 @@ import com.mongodb.client.AggregateIterable import com.mongodb.client.FindIterable import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase +import com.mongodb.client.model.DeleteOptions import com.mongodb.client.model.InsertManyOptions import com.mongodb.client.model.InsertOneOptions import com.mongodb.client.model.UpdateOptions @@ -111,7 +112,16 @@ class MongoOperator( fun aggregate(vararg pipeline: Bson): AggregateIterable = aggregate(pipeline.asList()) - private fun convertToBson(entity: Any): Document { + fun delete(entity: T, options: DeleteOptions = DeleteOptions()) { + deleteOne(convertToBson(entity), options) + } + + fun saveIfNotExists(entity: T) { + val document = convertToBson(entity) + upsert(document, document) + } + + fun convertToBson(entity: Any): Document { val bson = Document() fields.forEach { MongoUtil.injectValue(bson, it.get(entity) ?: return@forEach, it) @@ -119,7 +129,7 @@ class MongoOperator( return bson } - private fun convertToEntity(bson: Document): T { + fun convertToEntity(bson: Document): T { val entity = try { clazz.newInstance() } catch (e: Exception) { diff --git a/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoTemplate.kt b/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoTemplate.kt index 98276bc..f6763e0 100644 --- a/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoTemplate.kt +++ b/database/mongodb/src/main/kotlin/cn/tursom/mongodb/MongoTemplate.kt @@ -15,12 +15,16 @@ class MongoTemplate( val host: String = "127.0.0.1", val port: Int = 27017 ) { - val client = MongoClient(host, port) - - val db: MongoDatabase = client.getDatabase(db) + var client = MongoClient(host, port) + var db: MongoDatabase = client.getDatabase(db) private val operatorMap = ConcurrentHashMap, MongoOperator<*>>() + fun connect() { + client = MongoClient(host, port) + db = client.getDatabase(db.name) + } + fun save(entity: T, options: InsertOneOptions = InsertOneOptions()) { getCollection(entity.javaClass).save(entity, options) } diff --git a/src/main/kotlin/cn/tursom/core/ThreadLocalSimpleDateFormat.kt b/src/main/kotlin/cn/tursom/core/ThreadLocalSimpleDateFormat.kt index d1fef67..9e3b20e 100644 --- a/src/main/kotlin/cn/tursom/core/ThreadLocalSimpleDateFormat.kt +++ b/src/main/kotlin/cn/tursom/core/ThreadLocalSimpleDateFormat.kt @@ -12,7 +12,7 @@ class ThreadLocalSimpleDateFormat(val format: String = "YYYY-MM-dd'T'HH:mm:ssZZ" companion object { val iso8601 = ThreadLocalSimpleDateFormat() - val standard = ThreadLocalSimpleDateFormat("YYYY-MM-dd HH:mm:ssZZ") + val standard = ThreadLocalSimpleDateFormat("YYYY-MM-dd HH:mm:ss") val simp = ThreadLocalSimpleDateFormat("YY-MM-dd HH:mm:ss") val cn = ThreadLocalSimpleDateFormat("YYYY'年'MM'月'dd'日' HH'时'mm'分'ss'秒'") } diff --git a/src/main/kotlin/cn/tursom/core/Tools.kt b/src/main/kotlin/cn/tursom/core/Tools.kt index 6f49291..63c590c 100644 --- a/src/main/kotlin/cn/tursom/core/Tools.kt +++ b/src/main/kotlin/cn/tursom/core/Tools.kt @@ -28,14 +28,14 @@ fun printNonDaemonThread() { currentGroup.enumerate(lstThreads) lstThreads.excludeNull().forEach { t -> if (!t.isDaemon) { - println("${System.currentTimeMillis()}: ${t.name}") + log(t.name) } } println() } -fun log(log: String) = println("${System.currentTimeMillis()}: $log") -fun logE(log: String) = System.err.println("${System.currentTimeMillis()}: $log") +fun log(log: String) = println("${ThreadLocalSimpleDateFormat.standard.format(System.currentTimeMillis())}: $log") +fun logE(log: String) = System.err.println("${ThreadLocalSimpleDateFormat.standard.format(System.currentTimeMillis())}: $log") val String.urlDecode: String get() = URLDecoder.decode(this, "utf-8") val String.urlEncode: String get() = URLEncoder.encode(this, "utf-8") diff --git a/src/main/kotlin/cn/tursom/core/storage/BufferedStorageHandler.kt b/src/main/kotlin/cn/tursom/core/storage/BufferedStorageHandler.kt new file mode 100644 index 0000000..fc05d16 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/storage/BufferedStorageHandler.kt @@ -0,0 +1,63 @@ +package cn.tursom.core.storage + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.Executor +import java.util.concurrent.atomic.AtomicBoolean + +/** + * 缓冲数据集中写入 + * 自动选择合适时间执行批量写入操作 + * 工作流程: + * - 插入: + * A添加对象写入->加入缓冲队列->第一个加入缓冲队列的向线程池添加写缓冲任务 + * - 写缓冲任务: + * 保存现有列表->等待一定时间,使后续请求能够继续写入缓冲->计时到时刷新缓冲并写入旧缓冲 + * 通过 AtomicBoolean onWrite 控制同一时刻只有一个线程在等待缓冲 + */ +class BufferedStorageHandler( + // 任务执行线程池 + private val executor: Executor, + // 最小缓冲时间 + private val minBufTime: Long = 500, + private val singleThreadWrite: Boolean = true, + // 数据批量写入处理器 + private val writeHandler: (list: Collection) -> Unit +) : StorageHandler { + private val onWrite = AtomicBoolean(false) + @Volatile + private var msgList = ConcurrentLinkedQueue() + + private val write = object : Runnable { + override fun run() { + val list = msgList + Thread.sleep(minBufTime) + msgList = ConcurrentLinkedQueue() + // 可能还有未释放 msgList 对象的线程,要稍微等待一下 + Thread.sleep(1) + if (singleThreadWrite) { + try { + writeHandler(list) + } finally { + if (msgList.isNotEmpty()) { + executor.execute(this) + } else { + onWrite.set(false) + } + } + } else { + onWrite.set(false) + writeHandler(list) + } + } + } + + /** + * 向缓冲中添加一个写入对象 + */ + override fun add(obj: T) { + msgList.add(obj) + if (onWrite.compareAndSet(false, true)) { + executor.execute(write) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/storage/InstantStorageHandler.kt b/src/main/kotlin/cn/tursom/core/storage/InstantStorageHandler.kt new file mode 100644 index 0000000..4e3cad9 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/storage/InstantStorageHandler.kt @@ -0,0 +1,16 @@ +package cn.tursom.core.storage + +/** + * 即时写入存储处理器 + */ +class InstantStorageHandler( + // 数据批量写入处理器 + private val writeHandler: (obj: T) -> Unit +) : StorageHandler { + /** + * 向缓冲中添加一个写入对象 + */ + override fun add(obj: T) { + writeHandler(obj) + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/storage/StorageHandler.kt b/src/main/kotlin/cn/tursom/core/storage/StorageHandler.kt new file mode 100644 index 0000000..4544785 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/storage/StorageHandler.kt @@ -0,0 +1,10 @@ +package cn.tursom.core.storage + +interface StorageHandler : (T) -> Unit { + /** + * 向存储服务中写入一个对象 + */ + fun add(obj: T) + + override fun invoke(obj: T) = add(obj) +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/storage/ThreadPoolStorageHandler.kt b/src/main/kotlin/cn/tursom/core/storage/ThreadPoolStorageHandler.kt new file mode 100644 index 0000000..dde392a --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/storage/ThreadPoolStorageHandler.kt @@ -0,0 +1,22 @@ +package cn.tursom.core.storage + +import java.util.concurrent.Executor + +/** + * 使用线程池进行写入的存储处理器 + */ +class ThreadPoolStorageHandler( + // 任务执行线程池 + private val executor: Executor, + // 数据批量写入处理器 + private val writeHandler: (obj: T) -> Unit +) : StorageHandler { + /** + * 向缓冲中添加一个写入对象 + */ + override fun add(obj: T) { + executor.execute { + writeHandler(obj) + } + } +} diff --git a/utils/mail/src/main/kotlin/cn/tursom/mail/read.kt b/utils/mail/src/main/kotlin/cn/tursom/mail/read.kt index f7a5841..d204194 100644 --- a/utils/mail/src/main/kotlin/cn/tursom/mail/read.kt +++ b/utils/mail/src/main/kotlin/cn/tursom/mail/read.kt @@ -19,12 +19,14 @@ fun addMailListener( folder: Folder, freq: Long, timeUnit: TimeUnit, listener: MessageCountListener, - newFolder: () -> Folder? = { null } + newFolder: () -> Folder? = { null }, + onHeartBeat: Folder. () -> Unit = {} ): ScheduledFuture<*> { folder.addMessageCountListener(listener) var mailFOlder = folder return threadPool.scheduleAtFixedRate({ try { + mailFOlder.onHeartBeat() mailFOlder.messageCount } catch (e: FolderClosedException) { mailFOlder = newFolder() ?: throw e @@ -39,8 +41,9 @@ fun addMailListener( fun addMailListener( newFolder: () -> Folder, freq: Long, timeUnit: TimeUnit, - listener: MessageCountListener -): ScheduledFuture<*> = addMailListener(newFolder(), freq, timeUnit, listener, newFolder) + listener: MessageCountListener, + onHeartBeat: Folder. () -> Unit = {} +): ScheduledFuture<*> = addMailListener(newFolder(), freq, timeUnit, listener, newFolder, onHeartBeat) fun getStore(host: String, port: Int, account: String, password: String): Store { val props = System.getProperties() diff --git a/utils/src/main/kotlin/cn/tursom/utils/GsonDataTypeAdaptor.kt b/utils/src/main/kotlin/cn/tursom/utils/GsonDataTypeAdaptor.kt new file mode 100644 index 0000000..82ccca6 --- /dev/null +++ b/utils/src/main/kotlin/cn/tursom/utils/GsonDataTypeAdaptor.kt @@ -0,0 +1,86 @@ +package cn.tursom.utils + +import cn.tursom.core.cast +import com.google.gson.Gson +import com.google.gson.GsonBuilder +import com.google.gson.TypeAdapter +import com.google.gson.TypeAdapterFactory +import com.google.gson.internal.LinkedTreeMap +import com.google.gson.reflect.TypeToken +import com.google.gson.stream.JsonReader +import com.google.gson.stream.JsonToken +import com.google.gson.stream.JsonWriter + +class GsonDataTypeAdaptor internal constructor(private val gson: Gson) : TypeAdapter?>() { + override fun write(out: JsonWriter, value: Map?) { + if (value == null) { + out.nullValue() + return + } + out.beginObject() + value.forEach { (t, u) -> + out.name(t) + System.err.println(u) + gson.getAdapter(Any::class.java).write(out, u) + } + out.endObject() + } + + override fun read(`in`: JsonReader): Map = readInternal(`in`).cast() + + private fun readInternal(`in`: JsonReader): Any? { + return when (`in`.peek()) { + JsonToken.BEGIN_ARRAY -> { + val list: MutableList = ArrayList() + `in`.beginArray() + while (`in`.hasNext()) { + list.add(readInternal(`in`)) + } + `in`.endArray() + list + } + JsonToken.BEGIN_OBJECT -> { + val map: MutableMap = LinkedTreeMap() + `in`.beginObject() + while (`in`.hasNext()) { + map[`in`.nextName()] = readInternal(`in`) + } + `in`.endObject() + map + } + JsonToken.STRING -> `in`.nextString() + JsonToken.NUMBER -> { + //将其作为一个字符串读取出来 + val numberStr: String = `in`.nextString() + //返回的numberStr不会为null + if (numberStr.contains(".") || numberStr.contains("e") + || numberStr.contains("E") + ) { + numberStr.toDouble() + } else numberStr.toLong() + } + JsonToken.BOOLEAN -> `in`.nextBoolean() + JsonToken.NULL -> { + `in`.nextNull() + null + } + else -> throw IllegalStateException() + } + } + + companion object { + val FACTORY: TypeAdapterFactory = object : + TypeAdapterFactory { + override fun create(gson: Gson, type: TypeToken): TypeAdapter? { + return if (type.rawType == Map::class.java) { + GsonDataTypeAdaptor(gson).cast() + } else null + } + } + + val gson = GsonBuilder() + .registerTypeAdapterFactory(FACTORY) + .create() + } + +} \ No newline at end of file diff --git a/utils/src/main/kotlin/cn/tursom/utils/Tools.kt b/utils/src/main/kotlin/cn/tursom/utils/Tools.kt index 3a15b71..a51a970 100644 --- a/utils/src/main/kotlin/cn/tursom/utils/Tools.kt +++ b/utils/src/main/kotlin/cn/tursom/utils/Tools.kt @@ -1,6 +1,7 @@ package cn.tursom.utils import com.google.gson.Gson +import com.google.gson.GsonBuilder import kotlinx.coroutines.* import java.util.concurrent.Executor import kotlin.coroutines.resume @@ -8,7 +9,15 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @Suppress("unused", "SpellCheckingInspection") -val gson = Gson() +val gson =GsonBuilder() + .registerTypeAdapterFactory(GsonDataTypeAdaptor.FACTORY) + .create() + +@Suppress("unused", "SpellCheckingInspection") +val prettyGson = GsonBuilder() + .registerTypeAdapterFactory(GsonDataTypeAdaptor.FACTORY) + .setPrettyPrinting() + .create() inline fun Gson.fromJson(json: String) = this.fromJson(json, T::class.java)!! diff --git a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt index 5dffc8b..85058b8 100644 --- a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt +++ b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt @@ -23,7 +23,7 @@ import java.net.URI class WebSocketClient(uri: String, val handler: WebSocketHandler) { private val uri: URI = URI.create(uri) - private var ch: Channel? = null + internal var ch: Channel? = null fun open() { close() @@ -72,7 +72,7 @@ class WebSocketClient(uri: String, val handler: WebSocketHandler) { handler) } }) - ch = b.connect(uri.host, port).sync().channel() + b.connect(uri.host, port) //handler.handshakeFuture().sync() } diff --git a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt index 1593add..c4e1fe2 100644 --- a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt +++ b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt @@ -12,7 +12,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker import io.netty.util.CharsetUtil - class WebSocketClientChannelHandler( private val handshaker: WebSocketClientHandshaker, val client: WebSocketClient, @@ -29,16 +28,21 @@ class WebSocketClientChannelHandler( } override fun channelActive(ctx: ChannelHandlerContext) { + client.ch = ctx.channel() handshaker.handshake(ctx.channel()) } override fun channelInactive(ctx: ChannelHandlerContext) { handler.onClose(client) + if (client.ch == ctx.channel()) { + client.ch = null + } } override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { val ch = ctx.channel() - if (!handshaker.isHandshakeComplete) { // web socket client connected + if (!handshaker.isHandshakeComplete) { + // web socket client connected handshaker.finishHandshake(ch, msg as FullHttpResponse) handshakeFuture!!.setSuccess() handler.onOpen(client) diff --git a/web/src/main/kotlin/cn/tursom/web/router/RoutedHttpHandler.kt b/web/src/main/kotlin/cn/tursom/web/router/RoutedHttpHandler.kt index b686357..c8a5f58 100644 --- a/web/src/main/kotlin/cn/tursom/web/router/RoutedHttpHandler.kt +++ b/web/src/main/kotlin/cn/tursom/web/router/RoutedHttpHandler.kt @@ -223,36 +223,16 @@ open class RoutedHttpHandler( } protected fun getRoutes(annotation: Annotation) = when (annotation) { - is Mapping -> { - annotation.route to getRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it }) - } - is GetMapping -> { - annotation.route to getRouter("GET") - } - is PostMapping -> { - annotation.route to getRouter("POST") - } - is PutMapping -> { - annotation.route to getRouter("PUT") - } - is DeleteMapping -> { - annotation.route to getRouter("DELETE") - } - is PatchMapping -> { - annotation.route to getRouter("PATCH") - } - is TraceMapping -> { - annotation.route to getRouter("TRACE") - } - is HeadMapping -> { - annotation.route to getRouter("HEAD") - } - is OptionsMapping -> { - annotation.route to getRouter("OPTIONS") - } - is ConnectMapping -> { - annotation.route to getRouter("CONNECT") - } + is Mapping -> annotation.route to getRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it }) + is GetMapping -> annotation.route to getRouter("GET") + is PostMapping -> annotation.route to getRouter("POST") + is PutMapping -> annotation.route to getRouter("PUT") + is DeleteMapping -> annotation.route to getRouter("DELETE") + is PatchMapping -> annotation.route to getRouter("PATCH") + is TraceMapping -> annotation.route to getRouter("TRACE") + is HeadMapping -> annotation.route to getRouter("HEAD") + is OptionsMapping -> annotation.route to getRouter("OPTIONS") + is ConnectMapping -> annotation.route to getRouter("CONNECT") else -> null } diff --git a/web/web-coroutine/src/main/kotlin/cn/tursom/web/router/AsyncRoutedHttpHandler.kt b/web/web-coroutine/src/main/kotlin/cn/tursom/web/router/AsyncRoutedHttpHandler.kt index ee7f3ee..7512d8d 100644 --- a/web/web-coroutine/src/main/kotlin/cn/tursom/web/router/AsyncRoutedHttpHandler.kt +++ b/web/web-coroutine/src/main/kotlin/cn/tursom/web/router/AsyncRoutedHttpHandler.kt @@ -150,36 +150,16 @@ open class AsyncRoutedHttpHandler( protected fun getAsyncRoutes(annotation: Annotation) = when (annotation) { - is Mapping -> { - annotation.route to getAsyncRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it }) - } - is GetMapping -> { - annotation.route to getAsyncRouter("GET") - } - is PostMapping -> { - annotation.route to getAsyncRouter("POST") - } - is PutMapping -> { - annotation.route to getAsyncRouter("PUT") - } - is DeleteMapping -> { - annotation.route to getAsyncRouter("DELETE") - } - is PatchMapping -> { - annotation.route to getAsyncRouter("PATCH") - } - is TraceMapping -> { - annotation.route to getAsyncRouter("TRACE") - } - is HeadMapping -> { - annotation.route to getAsyncRouter("HEAD") - } - is OptionsMapping -> { - annotation.route to getAsyncRouter("OPTIONS") - } - is ConnectMapping -> { - annotation.route to getAsyncRouter("CONNECT") - } + is Mapping -> annotation.route to getAsyncRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it }) + is GetMapping -> annotation.route to getAsyncRouter("GET") + is PostMapping -> annotation.route to getAsyncRouter("POST") + is PutMapping -> annotation.route to getAsyncRouter("PUT") + is DeleteMapping -> annotation.route to getAsyncRouter("DELETE") + is PatchMapping -> annotation.route to getAsyncRouter("PATCH") + is TraceMapping -> annotation.route to getAsyncRouter("TRACE") + is HeadMapping -> annotation.route to getAsyncRouter("HEAD") + is OptionsMapping -> annotation.route to getAsyncRouter("OPTIONS") + is ConnectMapping -> annotation.route to getAsyncRouter("CONNECT") else -> null }