From 2fb5d872f25cc2108f171d1d0137a92aeb3f126a Mon Sep 17 00:00:00 2001 From: tursom Date: Thu, 12 Mar 2020 00:00:45 +0800 Subject: [PATCH] add AsyncBufferedStorageHandler --- .../mongodb/async/AbstractSubscriber.kt | 5 +- .../mongodb/async/AsyncMongoOperator.kt | 10 ++++ .../storage/AsyncBufferedStorageHandler.kt | 58 +++++++++++++++++++ 3 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 utils/src/main/kotlin/cn/tursom/utils/storage/AsyncBufferedStorageHandler.kt diff --git a/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AbstractSubscriber.kt b/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AbstractSubscriber.kt index 3105e1e..e228742 100644 --- a/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AbstractSubscriber.kt +++ b/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AbstractSubscriber.kt @@ -3,7 +3,7 @@ package cn.tursom.mongodb.async import org.reactivestreams.Subscriber import org.reactivestreams.Subscription -abstract class AbstractSubscriber : Subscriber { +abstract class AbstractSubscriber(val autoRequest: Boolean = false) : Subscriber { var compete = false lateinit var subscription: Subscription @@ -13,5 +13,8 @@ abstract class AbstractSubscriber : Subscriber { override fun onSubscribe(s: Subscription) { subscription = s + if (autoRequest) { + subscription.request(Int.MAX_VALUE.toLong()) + } } } \ No newline at end of file diff --git a/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AsyncMongoOperator.kt b/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AsyncMongoOperator.kt index 1096b34..fe3a635 100644 --- a/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AsyncMongoOperator.kt +++ b/database/mongodb/mongodb-async/src/main/kotlin/cn/tursom/mongodb/async/AsyncMongoOperator.kt @@ -195,4 +195,14 @@ class AsyncMongoOperator( val document = convertToBson(entity) upsert(document, document) } + + suspend fun count(): Long { + val publisher = countDocuments() + return suspendCoroutine { cont -> + publisher.subscribe(object : AbstractSubscriber(true) { + override fun onNext(t: Long) = cont.resume(t) + override fun onError(t: Throwable) = cont.resumeWithException(t) + }) + } + } } \ No newline at end of file diff --git a/utils/src/main/kotlin/cn/tursom/utils/storage/AsyncBufferedStorageHandler.kt b/utils/src/main/kotlin/cn/tursom/utils/storage/AsyncBufferedStorageHandler.kt new file mode 100644 index 0000000..d00c3ad --- /dev/null +++ b/utils/src/main/kotlin/cn/tursom/utils/storage/AsyncBufferedStorageHandler.kt @@ -0,0 +1,58 @@ +package cn.tursom.utils.storage + +import cn.tursom.core.storage.StorageHandler +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicBoolean + +class AsyncBufferedStorageHandler( + // 最小缓冲时间 + private val minBufTime: Long = 500, + private val singleThreadWrite: Boolean = true, + // 数据批量写入处理器 + private val writeHandler: suspend (list: Collection) -> Unit +) : StorageHandler { + private val onWrite = AtomicBoolean(false) + + @Volatile + private var msgList = ConcurrentLinkedQueue() + + private val write = object { + suspend operator fun invoke() { + val list = msgList + delay(minBufTime) + msgList = ConcurrentLinkedQueue() + // 可能还有未释放 msgList 对象的线程,要稍微等待一下 + delay(1) + if (singleThreadWrite) { + try { + writeHandler(list) + } finally { + if (msgList.isNotEmpty()) { + val write = this + GlobalScope.launch { write() } + } else { + onWrite.set(false) + } + } + } else { + onWrite.set(false) + writeHandler(list) + } + } + } + + /** + * 向缓冲中添加一个写入对象 + */ + override fun add(obj: T) { + msgList.add(obj) + if (onWrite.compareAndSet(false, true)) { + GlobalScope.launch { + write() + } + } + } +} \ No newline at end of file