add AsyncBufferedStorageHandler

This commit is contained in:
tursom 2020-03-12 00:00:45 +08:00
parent 689490748e
commit 2fb5d872f2
3 changed files with 72 additions and 1 deletions

View File

@ -3,7 +3,7 @@ package cn.tursom.mongodb.async
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
abstract class AbstractSubscriber<T> : Subscriber<T> {
abstract class AbstractSubscriber<T>(val autoRequest: Boolean = false) : Subscriber<T> {
var compete = false
lateinit var subscription: Subscription
@ -13,5 +13,8 @@ abstract class AbstractSubscriber<T> : Subscriber<T> {
override fun onSubscribe(s: Subscription) {
subscription = s
if (autoRequest) {
subscription.request(Int.MAX_VALUE.toLong())
}
}
}

View File

@ -195,4 +195,14 @@ class AsyncMongoOperator<T : Any>(
val document = convertToBson(entity)
upsert(document, document)
}
suspend fun count(): Long {
val publisher = countDocuments()
return suspendCoroutine { cont ->
publisher.subscribe(object : AbstractSubscriber<Long>(true) {
override fun onNext(t: Long) = cont.resume(t)
override fun onError(t: Throwable) = cont.resumeWithException(t)
})
}
}
}

View File

@ -0,0 +1,58 @@
package cn.tursom.utils.storage
import cn.tursom.core.storage.StorageHandler
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
class AsyncBufferedStorageHandler<T>(
// 最小缓冲时间
private val minBufTime: Long = 500,
private val singleThreadWrite: Boolean = true,
// 数据批量写入处理器
private val writeHandler: suspend (list: Collection<T>) -> Unit
) : StorageHandler<T> {
private val onWrite = AtomicBoolean(false)
@Volatile
private var msgList = ConcurrentLinkedQueue<T>()
private val write = object {
suspend operator fun invoke() {
val list = msgList
delay(minBufTime)
msgList = ConcurrentLinkedQueue()
// 可能还有未释放 msgList 对象的线程,要稍微等待一下
delay(1)
if (singleThreadWrite) {
try {
writeHandler(list)
} finally {
if (msgList.isNotEmpty()) {
val write = this
GlobalScope.launch { write() }
} else {
onWrite.set(false)
}
}
} else {
onWrite.set(false)
writeHandler(list)
}
}
}
/**
* 向缓冲中添加一个写入对象
*/
override fun add(obj: T) {
msgList.add(obj)
if (onWrite.compareAndSet(false, true)) {
GlobalScope.launch {
write()
}
}
}
}