添加StorageHandler

This commit is contained in:
tursom 2020-03-09 23:33:28 +08:00
parent e4ed7896dc
commit 395866dc06
15 changed files with 264 additions and 77 deletions

View File

@ -6,6 +6,7 @@ import com.mongodb.client.AggregateIterable
import com.mongodb.client.FindIterable
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.DeleteOptions
import com.mongodb.client.model.InsertManyOptions
import com.mongodb.client.model.InsertOneOptions
import com.mongodb.client.model.UpdateOptions
@ -111,7 +112,16 @@ class MongoOperator<T : Any>(
fun aggregate(vararg pipeline: Bson): AggregateIterable<Document> = aggregate(pipeline.asList())
private fun convertToBson(entity: Any): Document {
fun delete(entity: T, options: DeleteOptions = DeleteOptions()) {
deleteOne(convertToBson(entity), options)
}
fun saveIfNotExists(entity: T) {
val document = convertToBson(entity)
upsert(document, document)
}
fun convertToBson(entity: Any): Document {
val bson = Document()
fields.forEach {
MongoUtil.injectValue(bson, it.get(entity) ?: return@forEach, it)
@ -119,7 +129,7 @@ class MongoOperator<T : Any>(
return bson
}
private fun convertToEntity(bson: Document): T {
fun convertToEntity(bson: Document): T {
val entity = try {
clazz.newInstance()
} catch (e: Exception) {

View File

@ -15,12 +15,16 @@ class MongoTemplate(
val host: String = "127.0.0.1",
val port: Int = 27017
) {
val client = MongoClient(host, port)
val db: MongoDatabase = client.getDatabase(db)
var client = MongoClient(host, port)
var db: MongoDatabase = client.getDatabase(db)
private val operatorMap = ConcurrentHashMap<Class<*>, MongoOperator<*>>()
fun connect() {
client = MongoClient(host, port)
db = client.getDatabase(db.name)
}
fun <T : Any> save(entity: T, options: InsertOneOptions = InsertOneOptions()) {
getCollection(entity.javaClass).save(entity, options)
}

View File

@ -12,7 +12,7 @@ class ThreadLocalSimpleDateFormat(val format: String = "YYYY-MM-dd'T'HH:mm:ssZZ"
companion object {
val iso8601 = ThreadLocalSimpleDateFormat()
val standard = ThreadLocalSimpleDateFormat("YYYY-MM-dd HH:mm:ssZZ")
val standard = ThreadLocalSimpleDateFormat("YYYY-MM-dd HH:mm:ss")
val simp = ThreadLocalSimpleDateFormat("YY-MM-dd HH:mm:ss")
val cn = ThreadLocalSimpleDateFormat("YYYY'年'MM'月'dd'日' HH'时'mm'分'ss'秒'")
}

View File

@ -28,14 +28,14 @@ fun printNonDaemonThread() {
currentGroup.enumerate(lstThreads)
lstThreads.excludeNull().forEach { t ->
if (!t.isDaemon) {
println("${System.currentTimeMillis()}: ${t.name}")
log(t.name)
}
}
println()
}
fun log(log: String) = println("${System.currentTimeMillis()}: $log")
fun logE(log: String) = System.err.println("${System.currentTimeMillis()}: $log")
fun log(log: String) = println("${ThreadLocalSimpleDateFormat.standard.format(System.currentTimeMillis())}: $log")
fun logE(log: String) = System.err.println("${ThreadLocalSimpleDateFormat.standard.format(System.currentTimeMillis())}: $log")
val String.urlDecode: String get() = URLDecoder.decode(this, "utf-8")
val String.urlEncode: String get() = URLEncoder.encode(this, "utf-8")

View File

@ -0,0 +1,63 @@
package cn.tursom.core.storage
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicBoolean
/**
* 缓冲数据集中写入
* 自动选择合适时间执行批量写入操作
* 工作流程
* - 插入
* A添加对象写入->加入缓冲队列->第一个加入缓冲队列的向线程池添加写缓冲任务
* - 写缓冲任务:
* 保存现有列表->等待一定时间使后续请求能够继续写入缓冲->计时到时刷新缓冲并写入旧缓冲
* 通过 AtomicBoolean onWrite 控制同一时刻只有一个线程在等待缓冲
*/
class BufferedStorageHandler<T>(
// 任务执行线程池
private val executor: Executor,
// 最小缓冲时间
private val minBufTime: Long = 500,
private val singleThreadWrite: Boolean = true,
// 数据批量写入处理器
private val writeHandler: (list: Collection<T>) -> Unit
) : StorageHandler<T> {
private val onWrite = AtomicBoolean(false)
@Volatile
private var msgList = ConcurrentLinkedQueue<T>()
private val write = object : Runnable {
override fun run() {
val list = msgList
Thread.sleep(minBufTime)
msgList = ConcurrentLinkedQueue()
// 可能还有未释放 msgList 对象的线程,要稍微等待一下
Thread.sleep(1)
if (singleThreadWrite) {
try {
writeHandler(list)
} finally {
if (msgList.isNotEmpty()) {
executor.execute(this)
} else {
onWrite.set(false)
}
}
} else {
onWrite.set(false)
writeHandler(list)
}
}
}
/**
* 向缓冲中添加一个写入对象
*/
override fun add(obj: T) {
msgList.add(obj)
if (onWrite.compareAndSet(false, true)) {
executor.execute(write)
}
}
}

View File

@ -0,0 +1,16 @@
package cn.tursom.core.storage
/**
* 即时写入存储处理器
*/
class InstantStorageHandler<T>(
// 数据批量写入处理器
private val writeHandler: (obj: T) -> Unit
) : StorageHandler<T> {
/**
* 向缓冲中添加一个写入对象
*/
override fun add(obj: T) {
writeHandler(obj)
}
}

View File

@ -0,0 +1,10 @@
package cn.tursom.core.storage
interface StorageHandler<T> : (T) -> Unit {
/**
* 向存储服务中写入一个对象
*/
fun add(obj: T)
override fun invoke(obj: T) = add(obj)
}

View File

@ -0,0 +1,22 @@
package cn.tursom.core.storage
import java.util.concurrent.Executor
/**
* 使用线程池进行写入的存储处理器
*/
class ThreadPoolStorageHandler<T>(
// 任务执行线程池
private val executor: Executor,
// 数据批量写入处理器
private val writeHandler: (obj: T) -> Unit
) : StorageHandler<T> {
/**
* 向缓冲中添加一个写入对象
*/
override fun add(obj: T) {
executor.execute {
writeHandler(obj)
}
}
}

View File

@ -19,12 +19,14 @@ fun addMailListener(
folder: Folder,
freq: Long, timeUnit: TimeUnit,
listener: MessageCountListener,
newFolder: () -> Folder? = { null }
newFolder: () -> Folder? = { null },
onHeartBeat: Folder. () -> Unit = {}
): ScheduledFuture<*> {
folder.addMessageCountListener(listener)
var mailFOlder = folder
return threadPool.scheduleAtFixedRate({
try {
mailFOlder.onHeartBeat()
mailFOlder.messageCount
} catch (e: FolderClosedException) {
mailFOlder = newFolder() ?: throw e
@ -39,8 +41,9 @@ fun addMailListener(
fun addMailListener(
newFolder: () -> Folder,
freq: Long, timeUnit: TimeUnit,
listener: MessageCountListener
): ScheduledFuture<*> = addMailListener(newFolder(), freq, timeUnit, listener, newFolder)
listener: MessageCountListener,
onHeartBeat: Folder. () -> Unit = {}
): ScheduledFuture<*> = addMailListener(newFolder(), freq, timeUnit, listener, newFolder, onHeartBeat)
fun getStore(host: String, port: Int, account: String, password: String): Store {
val props = System.getProperties()

View File

@ -0,0 +1,86 @@
package cn.tursom.utils
import cn.tursom.core.cast
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.TypeAdapter
import com.google.gson.TypeAdapterFactory
import com.google.gson.internal.LinkedTreeMap
import com.google.gson.reflect.TypeToken
import com.google.gson.stream.JsonReader
import com.google.gson.stream.JsonToken
import com.google.gson.stream.JsonWriter
class GsonDataTypeAdaptor internal constructor(private val gson: Gson) : TypeAdapter<Map<String, Any>?>() {
override fun write(out: JsonWriter, value: Map<String, Any>?) {
if (value == null) {
out.nullValue()
return
}
out.beginObject()
value.forEach { (t, u) ->
out.name(t)
System.err.println(u)
gson.getAdapter(Any::class.java).write(out, u)
}
out.endObject()
}
override fun read(`in`: JsonReader): Map<String, Any> = readInternal(`in`).cast()
private fun readInternal(`in`: JsonReader): Any? {
return when (`in`.peek()) {
JsonToken.BEGIN_ARRAY -> {
val list: MutableList<Any?> = ArrayList()
`in`.beginArray()
while (`in`.hasNext()) {
list.add(readInternal(`in`))
}
`in`.endArray()
list
}
JsonToken.BEGIN_OBJECT -> {
val map: MutableMap<String, Any?> = LinkedTreeMap()
`in`.beginObject()
while (`in`.hasNext()) {
map[`in`.nextName()] = readInternal(`in`)
}
`in`.endObject()
map
}
JsonToken.STRING -> `in`.nextString()
JsonToken.NUMBER -> {
//将其作为一个字符串读取出来
val numberStr: String = `in`.nextString()
//返回的numberStr不会为null
if (numberStr.contains(".") || numberStr.contains("e")
|| numberStr.contains("E")
) {
numberStr.toDouble()
} else numberStr.toLong()
}
JsonToken.BOOLEAN -> `in`.nextBoolean()
JsonToken.NULL -> {
`in`.nextNull()
null
}
else -> throw IllegalStateException()
}
}
companion object {
val FACTORY: TypeAdapterFactory = object :
TypeAdapterFactory {
override fun <T> create(gson: Gson, type: TypeToken<T>): TypeAdapter<T>? {
return if (type.rawType == Map::class.java) {
GsonDataTypeAdaptor(gson).cast()
} else null
}
}
val gson = GsonBuilder()
.registerTypeAdapterFactory(FACTORY)
.create()
}
}

View File

@ -1,6 +1,7 @@
package cn.tursom.utils
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import kotlinx.coroutines.*
import java.util.concurrent.Executor
import kotlin.coroutines.resume
@ -8,7 +9,15 @@ import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@Suppress("unused", "SpellCheckingInspection")
val gson = Gson()
val gson =GsonBuilder()
.registerTypeAdapterFactory(GsonDataTypeAdaptor.FACTORY)
.create()
@Suppress("unused", "SpellCheckingInspection")
val prettyGson = GsonBuilder()
.registerTypeAdapterFactory(GsonDataTypeAdaptor.FACTORY)
.setPrettyPrinting()
.create()
inline fun <reified T : Any> Gson.fromJson(json: String) = this.fromJson(json, T::class.java)!!

View File

@ -23,7 +23,7 @@ import java.net.URI
class WebSocketClient(uri: String, val handler: WebSocketHandler) {
private val uri: URI = URI.create(uri)
private var ch: Channel? = null
internal var ch: Channel? = null
fun open() {
close()
@ -72,7 +72,7 @@ class WebSocketClient(uri: String, val handler: WebSocketHandler) {
handler)
}
})
ch = b.connect(uri.host, port).sync().channel()
b.connect(uri.host, port)
//handler.handshakeFuture().sync()
}

View File

@ -12,7 +12,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import io.netty.util.CharsetUtil
class WebSocketClientChannelHandler(
private val handshaker: WebSocketClientHandshaker,
val client: WebSocketClient,
@ -29,16 +28,21 @@ class WebSocketClientChannelHandler(
}
override fun channelActive(ctx: ChannelHandlerContext) {
client.ch = ctx.channel()
handshaker.handshake(ctx.channel())
}
override fun channelInactive(ctx: ChannelHandlerContext) {
handler.onClose(client)
if (client.ch == ctx.channel()) {
client.ch = null
}
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
val ch = ctx.channel()
if (!handshaker.isHandshakeComplete) { // web socket client connected
if (!handshaker.isHandshakeComplete) {
// web socket client connected
handshaker.finishHandshake(ch, msg as FullHttpResponse)
handshakeFuture!!.setSuccess()
handler.onOpen(client)

View File

@ -223,36 +223,16 @@ open class RoutedHttpHandler(
}
protected fun getRoutes(annotation: Annotation) = when (annotation) {
is Mapping -> {
annotation.route to getRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it })
}
is GetMapping -> {
annotation.route to getRouter("GET")
}
is PostMapping -> {
annotation.route to getRouter("POST")
}
is PutMapping -> {
annotation.route to getRouter("PUT")
}
is DeleteMapping -> {
annotation.route to getRouter("DELETE")
}
is PatchMapping -> {
annotation.route to getRouter("PATCH")
}
is TraceMapping -> {
annotation.route to getRouter("TRACE")
}
is HeadMapping -> {
annotation.route to getRouter("HEAD")
}
is OptionsMapping -> {
annotation.route to getRouter("OPTIONS")
}
is ConnectMapping -> {
annotation.route to getRouter("CONNECT")
}
is Mapping -> annotation.route to getRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it })
is GetMapping -> annotation.route to getRouter("GET")
is PostMapping -> annotation.route to getRouter("POST")
is PutMapping -> annotation.route to getRouter("PUT")
is DeleteMapping -> annotation.route to getRouter("DELETE")
is PatchMapping -> annotation.route to getRouter("PATCH")
is TraceMapping -> annotation.route to getRouter("TRACE")
is HeadMapping -> annotation.route to getRouter("HEAD")
is OptionsMapping -> annotation.route to getRouter("OPTIONS")
is ConnectMapping -> annotation.route to getRouter("CONNECT")
else -> null
}

View File

@ -150,36 +150,16 @@ open class AsyncRoutedHttpHandler(
protected fun getAsyncRoutes(annotation: Annotation) = when (annotation) {
is Mapping -> {
annotation.route to getAsyncRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it })
}
is GetMapping -> {
annotation.route to getAsyncRouter("GET")
}
is PostMapping -> {
annotation.route to getAsyncRouter("POST")
}
is PutMapping -> {
annotation.route to getAsyncRouter("PUT")
}
is DeleteMapping -> {
annotation.route to getAsyncRouter("DELETE")
}
is PatchMapping -> {
annotation.route to getAsyncRouter("PATCH")
}
is TraceMapping -> {
annotation.route to getAsyncRouter("TRACE")
}
is HeadMapping -> {
annotation.route to getAsyncRouter("HEAD")
}
is OptionsMapping -> {
annotation.route to getAsyncRouter("OPTIONS")
}
is ConnectMapping -> {
annotation.route to getAsyncRouter("CONNECT")
}
is Mapping -> annotation.route to getAsyncRouter(annotation.method.let { if (it.isEmpty()) annotation.methodEnum.method else it })
is GetMapping -> annotation.route to getAsyncRouter("GET")
is PostMapping -> annotation.route to getAsyncRouter("POST")
is PutMapping -> annotation.route to getAsyncRouter("PUT")
is DeleteMapping -> annotation.route to getAsyncRouter("DELETE")
is PatchMapping -> annotation.route to getAsyncRouter("PATCH")
is TraceMapping -> annotation.route to getAsyncRouter("TRACE")
is HeadMapping -> annotation.route to getAsyncRouter("HEAD")
is OptionsMapping -> annotation.route to getAsyncRouter("OPTIONS")
is ConnectMapping -> annotation.route to getAsyncRouter("CONNECT")
else -> null
}