mirror of
https://github.com/tursom/TursomServer.git
synced 2025-03-04 06:40:11 +08:00
update Subscriber
This commit is contained in:
parent
465636b0dd
commit
7b47f3382a
@ -1,20 +0,0 @@
|
||||
package cn.tursom.mongodb.async
|
||||
|
||||
import org.reactivestreams.Subscriber
|
||||
import org.reactivestreams.Subscription
|
||||
|
||||
abstract class AbstractSubscriber<T>(val autoRequest: Boolean = false) : Subscriber<T> {
|
||||
var compete = false
|
||||
lateinit var subscription: Subscription
|
||||
|
||||
override fun onComplete() {
|
||||
compete = true
|
||||
}
|
||||
|
||||
override fun onSubscribe(s: Subscription) {
|
||||
subscription = s
|
||||
if (autoRequest) {
|
||||
subscription.request(Int.MAX_VALUE.toLong())
|
||||
}
|
||||
}
|
||||
}
|
@ -1,33 +1,20 @@
|
||||
package cn.tursom.mongodb.async
|
||||
|
||||
import cn.tursom.core.Disposable
|
||||
import cn.tursom.mongodb.BsonFactoryImpl
|
||||
import cn.tursom.mongodb.IndexBuilder
|
||||
import cn.tursom.mongodb.MongoUtil
|
||||
import cn.tursom.mongodb.Update
|
||||
import cn.tursom.mongodb.async.subscriber.*
|
||||
import cn.tursom.utils.AsyncIterator
|
||||
import cn.tursom.utils.forEach
|
||||
import com.mongodb.MongoClientSettings
|
||||
import com.mongodb.ServerAddress
|
||||
import com.mongodb.client.model.DeleteOptions
|
||||
import com.mongodb.client.model.InsertManyOptions
|
||||
import com.mongodb.client.model.InsertOneOptions
|
||||
import com.mongodb.client.model.UpdateOptions
|
||||
import com.mongodb.client.model.*
|
||||
import com.mongodb.client.result.InsertManyResult
|
||||
import com.mongodb.client.result.InsertOneResult
|
||||
import com.mongodb.client.result.UpdateResult
|
||||
import com.mongodb.reactivestreams.client.MongoClients
|
||||
import com.mongodb.reactivestreams.client.MongoCollection
|
||||
import com.mongodb.reactivestreams.client.MongoDatabase
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.bson.Document
|
||||
import org.bson.conversions.Bson
|
||||
import org.reactivestreams.Publisher
|
||||
import org.reactivestreams.Subscriber
|
||||
import org.reactivestreams.Subscription
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
import kotlin.coroutines.suspendCoroutine
|
||||
import kotlin.reflect.KProperty1
|
||||
|
||||
@ -38,103 +25,71 @@ class AsyncMongoOperator<T : Any>(
|
||||
constructor(clazz: Class<T>, db: MongoDatabase) : this(db.getCollection(MongoUtil.collectionName(clazz)), clazz)
|
||||
|
||||
suspend fun save(entity: T, options: InsertOneOptions = InsertOneOptions()) {
|
||||
val publisher = collection.insertOne(convertToBson(entity), options)
|
||||
suspendCoroutine<Unit> { cont ->
|
||||
publisher.subscribe(object : Subscriber<InsertOneResult> {
|
||||
override fun onComplete() = cont.resume(Unit)
|
||||
override fun onSubscribe(s: Subscription?) {}
|
||||
override fun onNext(t: InsertOneResult?) {}
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
})
|
||||
}
|
||||
insertOne(entity, options)
|
||||
}
|
||||
|
||||
|
||||
suspend fun save(entities: Collection<T>, options: InsertManyOptions = InsertManyOptions()) {
|
||||
val publisher = collection.insertMany(entities.map { convertToBson(it) }, options)
|
||||
suspendCoroutine<Unit> { cont ->
|
||||
publisher.subscribe(object : Subscriber<InsertManyResult> {
|
||||
override fun onComplete() = cont.resume(Unit)
|
||||
override fun onSubscribe(s: Subscription?) {}
|
||||
override fun onNext(t: InsertManyResult) {}
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
})
|
||||
insertMany(entities, options)
|
||||
}
|
||||
|
||||
suspend fun insertOne(entity: T, options: InsertOneOptions = InsertOneOptions()): InsertOneResult? {
|
||||
return suspendCoroutine { cont ->
|
||||
insertOne(convertToBson(entity), options).subscribe(SuspendInsertOneSubscriber(cont))
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun update(update: Bson, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult {
|
||||
suspend fun insertMany(entities: Collection<T>, options: InsertManyOptions = InsertManyOptions()): List<InsertManyResult> {
|
||||
val publisher = collection.insertMany(entities.map { convertToBson(it) }, options)
|
||||
return suspendCoroutine { cont ->
|
||||
publisher.subscribe(SuspendInsertListSubscriber(cont, entities.size.toLong()))
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun update(update: Bson, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult? {
|
||||
val publisher = collection.updateOne(where, update, options)
|
||||
return suspendCoroutine { cont ->
|
||||
publisher.subscribe(object : Subscriber<UpdateResult> {
|
||||
override fun onComplete() {}
|
||||
override fun onSubscribe(s: Subscription) {}
|
||||
override fun onNext(t: UpdateResult) = cont.resume(t)
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
})
|
||||
publisher.subscribe(SuspendInsertOneSubscriber(cont))
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun update(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult {
|
||||
suspend fun update(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult? {
|
||||
return update(convertToBson(entity), where, options)
|
||||
}
|
||||
|
||||
@Suppress("SpellCheckingInspection")
|
||||
suspend fun upsert(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult {
|
||||
suspend fun upsert(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult? {
|
||||
return update(entity, where, options.upsert(true))
|
||||
}
|
||||
|
||||
@Suppress("SpellCheckingInspection")
|
||||
suspend fun upsert(update: Bson, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult {
|
||||
suspend fun upsert(update: Bson, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult? {
|
||||
return update(update, where, options.upsert(true))
|
||||
}
|
||||
|
||||
suspend fun add(field: KProperty1<T, Number?>, value: Number, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult {
|
||||
suspend fun add(field: KProperty1<T, Number?>, value: Number, where: Bson, options: UpdateOptions = UpdateOptions()): UpdateResult? {
|
||||
return upsert(
|
||||
Update { field inc value },
|
||||
where, options
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun inc(field: KProperty1<T, Number?>, where: Bson): UpdateResult {
|
||||
suspend fun inc(field: KProperty1<T, Number?>, where: Bson): UpdateResult? {
|
||||
return add(field, 1, where)
|
||||
}
|
||||
|
||||
suspend fun getOne(where: Bson? = null): T? {
|
||||
val publisher = if (where == null) find() else find(where)
|
||||
return suspendCoroutine { cont ->
|
||||
publisher.subscribe(object : AbstractSubscriber<Document>() {
|
||||
var resumed = false
|
||||
override fun onComplete() {
|
||||
super.onComplete()
|
||||
if (!resumed) cont.resume(null)
|
||||
}
|
||||
|
||||
override fun onSubscribe(s: Subscription) = s.request(1)
|
||||
override fun onNext(t: Document) = cont.resume(parse(t))
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
})
|
||||
publisher.subscribe(SuspendOneSubscriber(this, cont))
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun list(where: Bson? = null): List<T> {
|
||||
val publisher = if (where == null) find() else find(where)
|
||||
val resultList = ArrayList<T>()
|
||||
suspendCoroutine<Unit> { cont ->
|
||||
publisher.subscribe(object : AbstractSubscriber<Document>() {
|
||||
override fun onComplete() {
|
||||
super.onComplete()
|
||||
cont.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onSubscribe(s: Subscription) = s.request(Int.MAX_VALUE.toLong())
|
||||
override fun onNext(t: Document) {
|
||||
resultList.add(parse(t))
|
||||
}
|
||||
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
})
|
||||
return suspendCoroutine { cont ->
|
||||
publisher.subscribe(SuspendListSubscriber(this, cont))
|
||||
}
|
||||
return resultList
|
||||
}
|
||||
|
||||
fun get(where: Bson? = null, bufSize: Int = 32): AsyncIterator<T> {
|
||||
@ -145,44 +100,7 @@ class AsyncMongoOperator<T : Any>(
|
||||
fun aggregate(vararg pipeline: Bson, bufSize: Int = 32) = iterator(aggregate(pipeline.asList()), bufSize)
|
||||
|
||||
private fun iterator(publisher: Publisher<Document>, bufSize: Int = 32): AsyncIterator<T> {
|
||||
val subscriber = object : AbstractSubscriber<Document>(), AsyncIterator<T> {
|
||||
private var cont = Disposable<Continuation<T>>()
|
||||
private var notify = Disposable<Continuation<Unit>>()
|
||||
private val cache = ConcurrentLinkedQueue<T>()
|
||||
|
||||
override fun onComplete() {
|
||||
super.onComplete()
|
||||
cont.get()?.resumeWithException(Exception())
|
||||
notify.get()?.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onNext(t: Document) {
|
||||
val entity = parse(t)
|
||||
cont.get()?.resume(entity) ?: cache.add(entity)
|
||||
notify.get()?.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onError(t: Throwable) {
|
||||
cont.get()?.resumeWithException(t) ?: t.printStackTrace()
|
||||
}
|
||||
|
||||
override suspend fun next(): T {
|
||||
return cache.poll() ?: suspendCoroutine { cont ->
|
||||
this.cont.set(cont)
|
||||
subscription.request(bufSize.toLong())
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun hasNext(): Boolean {
|
||||
if (cache.isEmpty() && !compete) {
|
||||
suspendCoroutine<Unit> {
|
||||
notify.set(it)
|
||||
subscription.request(bufSize.toLong())
|
||||
}
|
||||
}
|
||||
return cache.isNotEmpty()
|
||||
}
|
||||
}
|
||||
val subscriber = AsyncIteratorSubscriber(this, bufSize)
|
||||
publisher.subscribe(subscriber)
|
||||
return subscriber
|
||||
}
|
||||
@ -199,10 +117,23 @@ class AsyncMongoOperator<T : Any>(
|
||||
suspend fun count(): Long {
|
||||
val publisher = countDocuments()
|
||||
return suspendCoroutine { cont ->
|
||||
publisher.subscribe(object : AbstractSubscriber<Long>(true) {
|
||||
override fun onNext(t: Long) = cont.resume(t)
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
})
|
||||
publisher.subscribe(SuspendInsertOneSubscriber(cont))
|
||||
} ?: 0L
|
||||
}
|
||||
|
||||
suspend fun createIndexSuspend(key: Bson, indexOptions: IndexOptions = IndexOptions()): String? {
|
||||
return suspendCoroutine { cont ->
|
||||
createIndex(key, indexOptions).subscribe(SuspendInsertOneSubscriber(cont))
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun createIndexSuspend(indexBuilder: IndexBuilder.() -> Unit): String? {
|
||||
val builder = IndexBuilder()
|
||||
builder.indexBuilder()
|
||||
return createIndexSuspend(builder.indexDocument, builder.indexOption)
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "MongoOperator(collection=$collection, clazz=$clazz)"
|
||||
}
|
||||
}
|
@ -8,42 +8,42 @@ import com.mongodb.client.model.InsertOneOptions
|
||||
import com.mongodb.client.model.UpdateOptions
|
||||
import com.mongodb.reactivestreams.client.MongoClient
|
||||
import com.mongodb.reactivestreams.client.MongoClients
|
||||
import com.mongodb.reactivestreams.client.MongoDatabase
|
||||
import org.bson.conversions.Bson
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class AsyncMongoTemplate(
|
||||
val mongoClient: MongoClient,
|
||||
db: String
|
||||
) {
|
||||
constructor(db: String, vararg servers: ServerAddress) : this(MongoClientSettings.builder().let {
|
||||
private val db: MongoDatabase
|
||||
) : MongoDatabase by db {
|
||||
constructor(
|
||||
mongoClient: MongoClient,
|
||||
db: String
|
||||
) : this(mongoClient.getDatabase(db))
|
||||
|
||||
constructor(db: String, mongoClientSettingsBuilder: MongoClientSettings.Builder) : this(MongoClients.create(mongoClientSettingsBuilder.build()), db)
|
||||
|
||||
constructor(db: String, vararg servers: ServerAddress) : this(db, MongoClientSettings.builder().also {
|
||||
it.applyToClusterSettings { builder ->
|
||||
builder.hosts(servers.asList())
|
||||
}
|
||||
MongoClients.create(it.build())
|
||||
}, db)
|
||||
})
|
||||
|
||||
constructor(host: String, port: Int, db: String) : this(db, ServerAddress(host, port))
|
||||
|
||||
val db = mongoClient.getDatabase(db)
|
||||
private val operatorMap = ConcurrentHashMap<Class<*>, AsyncMongoOperator<*>>()
|
||||
|
||||
|
||||
suspend fun <T : Any> save(entity: T, options: InsertOneOptions = InsertOneOptions()) {
|
||||
suspend fun <T : Any> save(entity: T, options: InsertOneOptions = InsertOneOptions()) =
|
||||
getCollection(entity.javaClass).save(entity, options)
|
||||
}
|
||||
|
||||
suspend inline fun <reified T : Any> save(entities: Collection<T>, options: InsertManyOptions = InsertManyOptions()) {
|
||||
suspend inline fun <reified T : Any> save(entities: Collection<T>, options: InsertManyOptions = InsertManyOptions()) =
|
||||
getCollection<T>().save(entities, options)
|
||||
}
|
||||
|
||||
suspend fun <T : Any> update(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()) {
|
||||
suspend fun <T : Any> update(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()) =
|
||||
getCollection(entity.javaClass).update(entity, where, options)
|
||||
}
|
||||
|
||||
@Suppress("SpellCheckingInspection")
|
||||
suspend fun <T : Any> upsert(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()) {
|
||||
suspend fun <T : Any> upsert(entity: T, where: Bson, options: UpdateOptions = UpdateOptions()) =
|
||||
getCollection(entity.javaClass).upsert(entity, where, options)
|
||||
}
|
||||
|
||||
inline fun <reified T : Any> getCollection(): AsyncMongoOperator<T> = getCollection(T::class.java)
|
||||
|
||||
@ -54,4 +54,12 @@ class AsyncMongoTemplate(
|
||||
operator
|
||||
}
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "MongoTemplate(db=$db)"
|
||||
}
|
||||
|
||||
//override fun close() {
|
||||
// //mongoClient.close()
|
||||
//}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
package cn.tursom.mongodb.async.subscriber
|
||||
|
||||
import cn.tursom.mongodb.BsonFactory
|
||||
import org.bson.Document
|
||||
import org.reactivestreams.Subscriber
|
||||
import org.reactivestreams.Subscription
|
||||
|
||||
@Suppress("ReactiveStreamsSubscriberImplementation")
|
||||
abstract class AbstractSubscriber<T>(
|
||||
private val bsonFactory: BsonFactory<T>,
|
||||
val size: Long = 1
|
||||
) : Subscriber<Document> {
|
||||
abstract fun next(t: T)
|
||||
|
||||
protected var requested = 0
|
||||
protected lateinit var s: Subscription
|
||||
protected var compete = false
|
||||
|
||||
override fun onComplete() {
|
||||
compete = true
|
||||
}
|
||||
|
||||
override fun onSubscribe(s: Subscription) {
|
||||
this.s = s
|
||||
s.request(size - requested)
|
||||
}
|
||||
|
||||
override fun onNext(t: Document) {
|
||||
next(bsonFactory.parse(t))
|
||||
requested++
|
||||
if (size - requested > 0) {
|
||||
s.request(size - requested)
|
||||
} else {
|
||||
onComplete()
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package cn.tursom.mongodb.async.subscriber
|
||||
|
||||
import cn.tursom.core.Disposable
|
||||
import cn.tursom.mongodb.BsonFactory
|
||||
import cn.tursom.utils.AsyncIterator
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
import kotlin.coroutines.suspendCoroutine
|
||||
|
||||
class AsyncIteratorSubscriber<T>(
|
||||
bsonFactory: BsonFactory<T>,
|
||||
val bufSize: Int = 32,
|
||||
requestSize: Long = 1
|
||||
) : AbstractSubscriber<T>(bsonFactory, requestSize), AsyncIterator<T>, BsonFactory<T> by bsonFactory {
|
||||
private var cont = Disposable<Continuation<T>>()
|
||||
private var notify = Disposable<Continuation<Unit>>()
|
||||
private val cache = ConcurrentLinkedQueue<T>()
|
||||
|
||||
override fun onComplete() {
|
||||
super.onComplete()
|
||||
cont.get()?.resumeWithException(Exception())
|
||||
notify.get()?.resume(Unit)
|
||||
}
|
||||
|
||||
override fun next(t: T) {
|
||||
cont.get()?.resume(t) ?: cache.add(t)
|
||||
notify.get()?.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onError(t: Throwable) {
|
||||
cont.get()?.resumeWithException(t) ?: t.printStackTrace()
|
||||
}
|
||||
|
||||
override suspend fun next(): T {
|
||||
return cache.poll() ?: suspendCoroutine { cont ->
|
||||
this.cont.set(cont)
|
||||
s.request(bufSize.toLong())
|
||||
cache.poll()?.let { this.cont.get()?.resume(it) ?: cache.add(it) }
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun hasNext(): Boolean {
|
||||
if (cache.isEmpty() && !compete) {
|
||||
suspendCoroutine<Unit> {
|
||||
notify.set(it)
|
||||
s.request(bufSize.toLong())
|
||||
if (cache.isNotEmpty()) {
|
||||
notify.get()?.resume(Unit)
|
||||
}
|
||||
}
|
||||
}
|
||||
return cache.isNotEmpty()
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package cn.tursom.mongodb.async.subscriber
|
||||
|
||||
import org.reactivestreams.Subscriber
|
||||
import org.reactivestreams.Subscription
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
|
||||
@Suppress("ReactiveStreamsSubscriberImplementation")
|
||||
class SuspendInsertListSubscriber<T>(
|
||||
val cont: Continuation<List<T>>,
|
||||
val requestSize: Long = Long.MAX_VALUE
|
||||
) : Subscriber<T> {
|
||||
val resultList = ArrayList<T>()
|
||||
override fun onComplete() {
|
||||
cont.resume(resultList)
|
||||
}
|
||||
|
||||
override fun onSubscribe(s: Subscription) = s.request(requestSize)
|
||||
override fun onNext(t: T) {
|
||||
resultList.add(t)
|
||||
}
|
||||
|
||||
override fun onError(t: Throwable) {
|
||||
cont.resumeWithException(t)
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package cn.tursom.mongodb.async.subscriber
|
||||
|
||||
import cn.tursom.core.Disposable
|
||||
import org.reactivestreams.Subscriber
|
||||
import org.reactivestreams.Subscription
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
|
||||
@Suppress("ReactiveStreamsSubscriberImplementation")
|
||||
class SuspendInsertOneSubscriber<T>(
|
||||
cont: Continuation<T?>,
|
||||
val requestSize: Long = 1
|
||||
) : Subscriber<T> {
|
||||
val contDisposable = Disposable(cont)
|
||||
override fun onComplete() {
|
||||
contDisposable.get()?.resume(null)
|
||||
}
|
||||
|
||||
override fun onSubscribe(s: Subscription) = s.request(requestSize)
|
||||
override fun onNext(t: T) {
|
||||
contDisposable.get()?.resume(t)
|
||||
}
|
||||
|
||||
override fun onError(t: Throwable) {
|
||||
contDisposable.get()?.resumeWithException(t)
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package cn.tursom.mongodb.async.subscriber
|
||||
|
||||
import cn.tursom.mongodb.BsonFactory
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
|
||||
@Suppress("ReactiveStreamsSubscriberImplementation")
|
||||
class SuspendListSubscriber<T>(
|
||||
bsonFactory: BsonFactory<T>,
|
||||
val cont: Continuation<List<T>>,
|
||||
size: Long = Long.MAX_VALUE
|
||||
) : AbstractSubscriber<T>(bsonFactory, size) {
|
||||
val resultList = ArrayList<T>()
|
||||
override fun onComplete() = cont.resume(resultList)
|
||||
override fun onError(t: Throwable) = cont.resumeWithException(t)
|
||||
|
||||
override fun next(t: T) {
|
||||
resultList.add(t)
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package cn.tursom.mongodb.async.subscriber
|
||||
|
||||
import cn.tursom.core.Disposable
|
||||
import cn.tursom.mongodb.BsonFactory
|
||||
import kotlin.coroutines.Continuation
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
|
||||
@Suppress("ReactiveStreamsSubscriberImplementation")
|
||||
class SuspendOneSubscriber<T>(
|
||||
bsonFactory: BsonFactory<T>,
|
||||
cont: Continuation<T?>,
|
||||
size: Long = 1
|
||||
) : AbstractSubscriber<T>(bsonFactory, size) {
|
||||
val contDisposable = Disposable(cont)
|
||||
override fun onComplete() {
|
||||
contDisposable.get()?.resume(null)
|
||||
}
|
||||
|
||||
override fun next(t: T) {
|
||||
contDisposable.get()?.resume(t)
|
||||
}
|
||||
|
||||
override fun onError(t: Throwable) {
|
||||
contDisposable.get()?.resumeWithException(t)
|
||||
}
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package cn.tursom.mongodb.async
|
||||
|
||||
import com.mongodb.reactivestreams.client.MongoClient
|
||||
|
||||
fun MongoClient.getMongoTemplate(db: String) = AsyncMongoTemplate(this, db)
|
@ -0,0 +1,135 @@
|
||||
package cn.tursom.mongodb
|
||||
|
||||
import com.mongodb.client.model.Collation
|
||||
import com.mongodb.client.model.IndexOptions
|
||||
import org.bson.Document
|
||||
import org.bson.conversions.Bson
|
||||
import java.lang.reflect.Field
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
@Suppress("unused")
|
||||
class IndexBuilder {
|
||||
val indexDocument = Document()
|
||||
val indexOption = IndexOptions()
|
||||
|
||||
var background
|
||||
get() = indexOption.isBackground
|
||||
set(value) {
|
||||
indexOption.background(value)
|
||||
}
|
||||
var unique
|
||||
get() = indexOption.isUnique
|
||||
set(value) {
|
||||
indexOption.unique(value)
|
||||
}
|
||||
var name: String?
|
||||
get() = indexOption.name
|
||||
set(value) {
|
||||
indexOption.name(value)
|
||||
}
|
||||
var sparse
|
||||
get() = indexOption.isSparse
|
||||
set(value) {
|
||||
indexOption.sparse(value)
|
||||
}
|
||||
var expireAfterSeconds: Long?
|
||||
get() = indexOption.getExpireAfter(TimeUnit.SECONDS)
|
||||
set(value) {
|
||||
indexOption.expireAfter(value, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun getExpireAfter(timeUnit: TimeUnit): Long? = indexOption.getExpireAfter(timeUnit)
|
||||
fun expireAfter(expireAfter: Long?, timeUnit: TimeUnit): IndexOptions? = indexOption.expireAfter(expireAfter, timeUnit)
|
||||
|
||||
var version: Int?
|
||||
get() = indexOption.version
|
||||
set(value) {
|
||||
indexOption.version(value)
|
||||
}
|
||||
var weights: Bson?
|
||||
get() = indexOption.weights
|
||||
set(value) {
|
||||
indexOption.weights(value)
|
||||
}
|
||||
var defaultLanguage: String?
|
||||
get() = indexOption.defaultLanguage
|
||||
set(value) {
|
||||
indexOption.defaultLanguage(value)
|
||||
}
|
||||
var languageOverride: String?
|
||||
get() = indexOption.languageOverride
|
||||
set(value) {
|
||||
indexOption.languageOverride(value)
|
||||
}
|
||||
var textVersion: Int?
|
||||
get() = indexOption.textVersion
|
||||
set(value) {
|
||||
indexOption.textVersion(value)
|
||||
}
|
||||
var sphereVersion: Int?
|
||||
get() = indexOption.sphereVersion
|
||||
set(value) {
|
||||
indexOption.sphereVersion(value)
|
||||
}
|
||||
var bits: Int?
|
||||
get() = indexOption.bits
|
||||
set(value) {
|
||||
indexOption.bits(value)
|
||||
}
|
||||
var min: Double?
|
||||
get() = indexOption.min
|
||||
set(value) {
|
||||
indexOption.min(value)
|
||||
}
|
||||
var max: Double?
|
||||
get() = indexOption.max
|
||||
set(value) {
|
||||
indexOption.max(value)
|
||||
}
|
||||
var bucketSize: Double?
|
||||
get() = indexOption.bucketSize
|
||||
set(value) {
|
||||
indexOption.bucketSize(value)
|
||||
}
|
||||
var storageEngine: Bson?
|
||||
get() = indexOption.storageEngine
|
||||
set(value) {
|
||||
indexOption.storageEngine(value)
|
||||
}
|
||||
var partialFilterExpression: Bson?
|
||||
get() = indexOption.partialFilterExpression
|
||||
set(value) {
|
||||
indexOption.partialFilterExpression(value)
|
||||
}
|
||||
var collation: Collation?
|
||||
get() = indexOption.collation
|
||||
set(value) {
|
||||
indexOption.collation(value)
|
||||
}
|
||||
var wildcardProjection: Bson
|
||||
get() = indexOption.wildcardProjection
|
||||
set(value) {
|
||||
indexOption.wildcardProjection(value)
|
||||
}
|
||||
|
||||
|
||||
enum class SortEnum(val code: Int) {
|
||||
DSC(1), DESC(-1)
|
||||
}
|
||||
|
||||
infix fun String.index(sort: SortEnum) {
|
||||
indexDocument[this] = sort.code
|
||||
}
|
||||
|
||||
operator fun String.unaryPlus() = this index SortEnum.DSC
|
||||
operator fun String.unaryMinus() = this index SortEnum.DESC
|
||||
|
||||
infix fun KProperty<*>.index(sort: SortEnum) = MongoUtil.fieldName(this) index sort
|
||||
operator fun KProperty<*>.unaryPlus() = +MongoUtil.fieldName(this)
|
||||
operator fun KProperty<*>.unaryMinus() = -MongoUtil.fieldName(this)
|
||||
|
||||
infix fun Field.index(sort: SortEnum) = MongoUtil.fieldName(this) index sort
|
||||
operator fun Field.unaryPlus() = +MongoUtil.fieldName(this)
|
||||
operator fun Field.unaryMinus() = -MongoUtil.fieldName(this)
|
||||
}
|
82
database/src/main/kotlin/cn/tursom/database/AutoTable.kt
Normal file
82
database/src/main/kotlin/cn/tursom/database/AutoTable.kt
Normal file
@ -0,0 +1,82 @@
|
||||
package cn.tursom.database
|
||||
|
||||
import cn.tursom.core.cast
|
||||
import cn.tursom.database.SqlUtils.tableField
|
||||
import cn.tursom.database.SqlUtils.tableName
|
||||
import cn.tursom.database.annotations.TableField
|
||||
import cn.tursom.utils.clone.Property
|
||||
import cn.tursom.utils.clone.inject
|
||||
import cn.tursom.utils.clone.instance
|
||||
import me.liuwj.ktorm.dsl.QueryRowSet
|
||||
import me.liuwj.ktorm.schema.BaseTable
|
||||
import me.liuwj.ktorm.schema.Column
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.reflect.KProperty
|
||||
import kotlin.reflect.KProperty0
|
||||
import kotlin.reflect.KProperty1
|
||||
import kotlin.reflect.full.memberProperties
|
||||
import kotlin.reflect.jvm.javaField
|
||||
|
||||
open class AutoTable<T : Any>(
|
||||
entityClass: KClass<T>,
|
||||
tableName: String = entityClass.tableName,
|
||||
alias: String? = null,
|
||||
val unsafe: Boolean = true
|
||||
) : BaseTable<T>(tableName, alias, entityClass) {
|
||||
private val fieldMap: Map<String, KProperty<*>>
|
||||
private val fieldColumns: MutableMap<KProperty<*>, Column<*>> = HashMap()
|
||||
private val fieldNameColumnMap: MutableMap<String, Column<*>> = HashMap()
|
||||
|
||||
init {
|
||||
fieldMap = entityClass.memberProperties.associateBy { it.tableField }
|
||||
entityClass.memberProperties.forEach {
|
||||
val field = it.javaField ?: return@forEach
|
||||
val tableField: TableField? = field.getAnnotation(TableField::class.java)
|
||||
if (tableField?.exist == false) return@forEach
|
||||
//TypeAdapterFactory.register(this, it)
|
||||
val column = TypeAdapterFactory.register(this, it) ?: return@forEach
|
||||
fieldColumns[it] = column
|
||||
fieldNameColumnMap[it.name] = column
|
||||
}
|
||||
}
|
||||
|
||||
override fun doCreateEntity(row: QueryRowSet, withReferences: Boolean): T {
|
||||
val instance = instance(unsafe, entityClass!!.java)!!
|
||||
columns.forEach {
|
||||
val field = fieldMap[it.name] ?: return@forEach
|
||||
row[it]?.inject(instance, field.cast<Property<Any>>())
|
||||
}
|
||||
return instance
|
||||
}
|
||||
|
||||
operator fun <R : Any> get(property: KProperty1<T, R?>): Column<R> = fieldColumns[property].cast()
|
||||
//operator fun <R : Any> get(property: KProperty1<T, R?>): Column<R> = this[property.simpTableField].cast()
|
||||
|
||||
fun <V : Any> field(): FieldProxy<T, V> = fieldProxyInstance.cast()
|
||||
fun <V : Any> field(property: KProperty0<*>): Column<V> = fieldNameColumnMap[property.name].cast()
|
||||
|
||||
companion object {
|
||||
private val fieldProxyInstance = FieldProxy<Any, Any>()
|
||||
private val autoTableMap = ConcurrentHashMap<Class<*>, AutoTable<*>>()
|
||||
operator fun <T : Any> get(clazz: KClass<T>): AutoTable<T> = get(clazz.java)
|
||||
|
||||
operator fun <T : Any> get(clazz: Class<T>): AutoTable<T> {
|
||||
var autoTable = autoTableMap[clazz]
|
||||
if (autoTable == null) {
|
||||
synchronized(autoTableMap) {
|
||||
autoTable = AutoTable(clazz.kotlin)
|
||||
autoTableMap[clazz] = autoTable.cast()
|
||||
}
|
||||
}
|
||||
return autoTable.cast()
|
||||
}
|
||||
|
||||
class FieldProxy<T : Any, V : Any> {
|
||||
operator fun getValue(
|
||||
autoTable: AutoTable<T>,
|
||||
property: KProperty<*>
|
||||
): Column<V> = autoTable.fieldNameColumnMap[property.name].cast()
|
||||
}
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@ object TypeAdapterFactory {
|
||||
private val adapterMap = ConcurrentSkipListMap<Int, ConcurrentLinkedQueue<TypeAdapter<*>>>()
|
||||
|
||||
init {
|
||||
scanPackage("cn.tursom.database.typeadapter")
|
||||
scanPackage(TypeAdapterFactory::class.java.`package`.name + ".typeadapter")
|
||||
}
|
||||
|
||||
private fun getAdapterQueue(level: Int): ConcurrentLinkedQueue<TypeAdapter<*>> {
|
||||
|
@ -3,4 +3,4 @@ package cn.tursom.database.annotations
|
||||
@MustBeDocumented
|
||||
@Retention(AnnotationRetention.RUNTIME)
|
||||
@Target(AnnotationTarget.PROPERTY)
|
||||
annotation class TableField(val fieldName: String)
|
||||
annotation class TableField(val fieldName: String, val exist: Boolean = true)
|
@ -2,18 +2,20 @@ package cn.tursom.core
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class Disposable<T> {
|
||||
class Disposable<T>(
|
||||
value: T? = null
|
||||
) {
|
||||
private var value = AtomicReference<T>()
|
||||
|
||||
init {
|
||||
if (value != null) {
|
||||
this.value.set(value)
|
||||
}
|
||||
}
|
||||
|
||||
fun set(value: T) {
|
||||
this.value.set(value)
|
||||
}
|
||||
|
||||
fun get(): T? {
|
||||
val value = value.get() ?: return null
|
||||
return if (this.value.compareAndSet(value, null)) {
|
||||
value
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
fun get(): T? = value.getAndSet(null)
|
||||
}
|
Loading…
Reference in New Issue
Block a user