1
0
mirror of https://github.com/mamoe/mirai.git synced 2025-04-25 13:03:35 +08:00

[core] Move Latch and OnDemandValueScope to mirai-core-utils,

and rename OnDemandValueScope to OnDemandSendChannel and OnDemandReceiveChannel
This commit is contained in:
Him188 2023-04-22 12:42:35 +01:00
parent 36d90c57fe
commit 99f592d614
9 changed files with 177 additions and 137 deletions

View File

@ -0,0 +1,18 @@
/*
* Copyright 2019-2023 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils.channels
public class IllegalProducerStateException internal constructor(
private val state: ProducerState<*, *>,
message: String? = state.toString(),
cause: Throwable? = null,
) : IllegalStateException(message, cause) {
public val lastStateWasSucceed: Boolean get() = (state is ProducerState.Finished) && state.isSuccess
}

View File

@ -7,7 +7,7 @@
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.auth
package net.mamoe.mirai.utils.channels
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
@ -16,34 +16,26 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancel
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.UtilsLogger
import net.mamoe.mirai.utils.childScope
import net.mamoe.mirai.utils.debug
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.cancellation.CancellationException
internal class IllegalProducerStateException(
private val state: ProducerState<*, *>,
message: String? = state.toString(),
cause: Throwable? = null,
) : IllegalStateException(message, cause) {
val lastStateWasSucceed get() = (state is ProducerState.Finished) && state.isSuccess
}
internal class CoroutineOnDemandValueScope<T, V>(
internal class CoroutineOnDemandReceiveChannel<T, V>(
parentCoroutineContext: CoroutineContext,
private val logger: MiraiLogger,
private val producerCoroutine: suspend OnDemandProducerScope<T, V>.(initialTicket: T) -> Unit,
) : OnDemandConsumer<T, V> {
private val coroutineScope = parentCoroutineContext.childScope("CoroutineOnDemandValueScope")
private val logger: UtilsLogger,
private val producerCoroutine: suspend OnDemandSendChannel<T, V>.(initialTicket: T) -> Unit,
) : OnDemandReceiveChannel<T, V> {
private val coroutineScope = parentCoroutineContext.childScope("CoroutineOnDemandReceiveChannel")
private val state: AtomicRef<ProducerState<T, V>> = atomic(ProducerState.JustInitialized())
inner class Producer(
private val initialTicket: T,
) : OnDemandProducerScope<T, V> {
) : OnDemandSendChannel<T, V> {
init {
coroutineScope.launch {
try {
@ -132,7 +124,23 @@ internal class CoroutineOnDemandValueScope<T, V>(
override suspend fun receiveOrNull(): V? {
state.loop { state ->
when (state) {
is ProducerState.Producing -> {
is ProducerState.Consuming -> {
// value is ready, switch state to Consumed
if (compareAndSetState(state, ProducerState.Consumed(state.producer, state.producerLatch))) {
return try {
// This actually won't suspend, since the value is already completed
// Just to be error-tolerating
state.value.await()
} catch (e: Exception) {
throw ProducerFailureException(cause = e)
}
}
}
// note: actually, this case should be the first case (for code consistency) in `when`,
// but atomicfu 1.8.10 fails on this.
is ProducerState.Producing<T, V> -> {
// still producing value
state.deferred.await() // just wait for value, but does not return it.
@ -143,22 +151,6 @@ internal class CoroutineOnDemandValueScope<T, V>(
// Here we will loop again, to atomically switch to Consumed state.
}
is ProducerState.Consuming -> {
// value is ready, switch state to ProducerReady
if (compareAndSetState(
state,
ProducerState.Consumed(state.producer, state.producerLatch)
)
) {
return try {
state.value.await() // won't suspend, since value is already completed
} catch (e: Exception) {
throw ProducerFailureException(cause = e)
}
}
}
is ProducerState.Finished -> {
state.exception?.let { err ->
throw ProducerFailureException(cause = err)

View File

@ -0,0 +1,90 @@
/*
* Copyright 2019-2023 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils.channels
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import net.mamoe.mirai.utils.UtilsLogger
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.cancellation.CancellationException
/**
* 按需供给的 [Channel].
*/
public interface OnDemandSendChannel<T, V> {
/**
* 挂起协程, 直到 [OnDemandReceiveChannel] 期望接收一个 [V], 届时将 [value] 传递给 [OnDemandReceiveChannel.receiveOrNull], 成为其返回值.
*
* 若在调用 [emit] 时已经有 [OnDemandReceiveChannel] 正在等待, 则该 [OnDemandReceiveChannel] 协程会立即[恢复][Continuation.resumeWith].
*
* [OnDemandReceiveChannel] 已经[完结][OnDemandReceiveChannel.finish], [OnDemandSendChannel.emit] 会抛出 [IllegalProducerStateException].
*/
public suspend fun emit(value: V): T
/**
* 标记此 [OnDemandSendChannel] 在生产 [V] 的过程中出现错误.
*
* 这也会终止此 [OnDemandSendChannel], 随后 [OnDemandReceiveChannel.receiveOrNull] 将会抛出 [ProducerFailureException].
*/
public fun finishExceptionally(exception: Throwable)
/**
* 标记此 [OnDemandSendChannel] 已经没有更多 [V] 可生产.
*
* 随后 [OnDemandReceiveChannel.receiveOrNull] 将会抛出 [IllegalStateException].
*/
public fun finish()
}
/**
* 按需消费者.
*
* [ReceiveChannel] 不同, [OnDemandReceiveChannel] 只有在调用 [expectMore] 后才会让[生产者][OnDemandSendChannel] 开始生产下一个 [V].
*/
public interface OnDemandReceiveChannel<T, V> {
/**
* 挂起协程并等待从 [OnDemandSendChannel] [接收][OnDemandSendChannel.emit]一个 [V].
*
* 当此函数被多个线程 (协程) 同时调用时, 只有一个线程挂起并获得 [V], 其他线程将会
*
* @throws ProducerFailureException [OnDemandSendChannel.finishExceptionally] 时抛出.
* @throws CancellationException 当协程被取消时抛出
* @throws IllegalProducerStateException 当状态异常, 如未调用 [expectMore] 时抛出
*/
@Throws(ProducerFailureException::class, CancellationException::class)
public suspend fun receiveOrNull(): V?
/**
* 期待 [OnDemandSendChannel] 再生产一个 [V]. 期望生产后必须在之后调用 [receiveOrNull] [finish] 来消耗生产的 [V].
*
* 在成功发起期待后返回 `true`; [OnDemandSendChannel] 已经[完结][OnDemandSendChannel.finish] 时返回 `false`.
*
* @throws IllegalProducerStateException [expectMore] 被调用后, 没有调用 [receiveOrNull] 就又调用了 [expectMore] 时抛出
*/
public fun expectMore(ticket: T): Boolean
/**
* 标记此 [OnDemandReceiveChannel] 已经完结.
*
* 如果 [OnDemandSendChannel] 仍在运行, 将会 (正常地) 取消 [OnDemandSendChannel].
*
* 随后 [OnDemandSendChannel.emit] 将会抛出 [IllegalStateException].
*/
public fun finish()
}
public fun <T, V> OnDemandReceiveChannel(
parentCoroutineContext: CoroutineContext = EmptyCoroutineContext,
logger: UtilsLogger = UtilsLogger.noop(),
producerCoroutine: suspend OnDemandSendChannel<T, V>.(initialTicket: T) -> Unit,
): OnDemandReceiveChannel<T, V> = CoroutineOnDemandReceiveChannel(parentCoroutineContext, logger, producerCoroutine)

View File

@ -0,0 +1,15 @@
/*
* Copyright 2019-2023 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils.channels
public class ProducerFailureException(
override val message: String? = null,
override val cause: Throwable?
) : Exception()

View File

@ -7,11 +7,12 @@
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.auth
package net.mamoe.mirai.utils.channels
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import net.mamoe.mirai.utils.sync.Latch
import kotlin.coroutines.CoroutineContext
/**
@ -97,14 +98,14 @@ internal sealed interface ProducerState<T, V> {
}
sealed interface HasProducer<T, V> : ProducerState<T, V> {
val producer: OnDemandProducerScope<T, V>
val producer: OnDemandSendChannel<T, V>
}
class ProducerReady<T, V>(
launchProducer: () -> OnDemandProducerScope<T, V>,
launchProducer: () -> OnDemandSendChannel<T, V>,
) : HasProducer<T, V> {
// Lazily start the producer job since it's on-demand
override val producer: OnDemandProducerScope<T, V> by lazy(launchProducer) // `lazy` is synchronized
override val producer: OnDemandSendChannel<T, V> by lazy(launchProducer) // `lazy` is synchronized
fun startProducerIfNotYet() {
producer
@ -114,18 +115,18 @@ internal sealed interface ProducerState<T, V> {
}
class Producing<T, V>(
override val producer: OnDemandProducerScope<T, V>,
override val producer: OnDemandSendChannel<T, V>,
val deferred: CompletableDeferred<V>,
) : HasProducer<T, V> {
override fun toString(): String = "Producing(deferred.completed=${deferred.isCompleted})"
}
class Consuming<T, V>(
override val producer: OnDemandProducerScope<T, V>,
override val producer: OnDemandSendChannel<T, V>,
val value: Deferred<V>,
parentCoroutineContext: CoroutineContext,
) : HasProducer<T, V> {
val producerLatch = Latch<T>(parentCoroutineContext)
val producerLatch: Latch<T> = Latch(parentCoroutineContext)
override fun toString(): String {
@OptIn(ExperimentalCoroutinesApi::class)
@ -136,17 +137,17 @@ internal sealed interface ProducerState<T, V> {
}
class Consumed<T, V>(
override val producer: OnDemandProducerScope<T, V>,
override val producer: OnDemandSendChannel<T, V>,
val producerLatch: Latch<T>
) : HasProducer<T, V> {
override fun toString(): String = "Consumed($producerLatch)"
}
class Finished<T, V>(
val previousState: ProducerState<T, V>,
private val previousState: ProducerState<T, V>,
val exception: Throwable?,
) : ProducerState<T, V> {
val isSuccess get() = exception == null
val isSuccess: Boolean get() = exception == null
fun createAlreadyFinishedException(cause: Throwable?): IllegalProducerStateException {
val exception = exception

View File

@ -7,31 +7,33 @@
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.auth
package net.mamoe.mirai.utils.sync
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.completeWith
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
internal interface Latch<T> {
public interface Latch<T> {
/**
* Suspends and waits to acquire the latch.
* @throws Throwable if [resumeWith] is called with [Result.Failure]
*/
suspend fun acquire(): T
public suspend fun acquire(): T
/**
* Release the latch, resuming the coroutines waiting for the latch.
*
* This function will return immediately unless a client is calling [acquire] concurrently.
*/
fun resumeWith(result: Result<T>)
public fun resumeWith(result: Result<T>)
}
internal fun <T> Latch(parentCoroutineContext: CoroutineContext): Latch<T> = LatchImpl(parentCoroutineContext)
public fun <T> Latch(parentCoroutineContext: CoroutineContext = EmptyCoroutineContext): Latch<T> =
LatchImpl(parentCoroutineContext)
private class LatchImpl<T>(
parentCoroutineContext: CoroutineContext

View File

@ -12,9 +12,12 @@ package net.mamoe.mirai.internal.network.auth
import net.mamoe.mirai.auth.BotAuthInfo
import net.mamoe.mirai.auth.BotAuthorization
import net.mamoe.mirai.internal.network.components.SsoProcessorImpl
import net.mamoe.mirai.internal.utils.asUtilsLogger
import net.mamoe.mirai.internal.utils.subLogger
import net.mamoe.mirai.utils.ExceptionCollector
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.channels.OnDemandReceiveChannel
import net.mamoe.mirai.utils.channels.ProducerFailureException
import net.mamoe.mirai.utils.debug
import net.mamoe.mirai.utils.verbose
import kotlin.coroutines.CoroutineContext
@ -35,8 +38,11 @@ internal class AuthControl(
) {
internal val exceptionCollector = ExceptionCollector()
private val userDecisions: OnDemandConsumer<Throwable?, SsoProcessorImpl.AuthMethod> =
CoroutineOnDemandValueScope(parentCoroutineContext, logger.subLogger("AuthControl/UserDecisions")) { _ ->
private val userDecisions: OnDemandReceiveChannel<Throwable?, SsoProcessorImpl.AuthMethod> =
OnDemandReceiveChannel(
parentCoroutineContext,
logger.subLogger("AuthControl/UserDecisions").asUtilsLogger()
) { _ ->
val sessionImpl = SafeBotAuthSession(this)
try {

View File

@ -1,86 +0,0 @@
/*
* Copyright 2019-2023 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.internal.network.auth
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlin.coroutines.Continuation
import kotlin.coroutines.cancellation.CancellationException
/**
* 按需供给的值制造器.
*/
internal interface OnDemandProducerScope<T, V> {
/**
* 挂起协程, 直到 [OnDemandConsumer] 期望接收一个 [V], 届时将 [value] 传递给 [OnDemandConsumer.receiveOrNull], 成为其返回值.
*
* 若在调用 [emit] 时已经有 [OnDemandConsumer] 正在等待, 则该 [OnDemandConsumer] 协程会立即[恢复][Continuation.resumeWith].
*
* [OnDemandConsumer] 已经[完结][OnDemandConsumer.finish], [OnDemandProducerScope.emit] 会抛出 [IllegalProducerStateException].
*/
suspend fun emit(value: V): T
/**
* 标记此 [OnDemandProducerScope] 在生产 [V] 的过程中出现错误.
*
* 这也会终止此 [OnDemandProducerScope], 随后 [OnDemandConsumer.receiveOrNull] 将会抛出 [ProducerFailureException].
*/
fun finishExceptionally(exception: Throwable)
/**
* 标记此 [OnDemandProducerScope] 已经没有更多 [V] 可生产.
*
* 随后 [OnDemandConsumer.receiveOrNull] 将会抛出 [IllegalStateException].
*/
fun finish()
}
/**
* 按需消费者.
*
* [ReceiveChannel] 不同, [OnDemandConsumer] 只有在调用 [expectMore] 后才会期待[生产者][OnDemandProducerScope] 生产下一个 [V].
*/
internal interface OnDemandConsumer<T, V> {
/**
* 挂起协程并等待从 [OnDemandProducerScope] [接收][OnDemandProducerScope.emit]一个 [V].
*
* 当此函数被多个线程 (协程) 同时调用时, 只有一个线程挂起并获得 [V], 其他线程将会
*
* @throws ProducerFailureException [OnDemandProducerScope.finishExceptionally] 时抛出.
* @throws CancellationException 当协程被取消时抛出
* @throws IllegalProducerStateException 当状态异常, 如未调用 [expectMore] 时抛出
*/
@Throws(ProducerFailureException::class, CancellationException::class)
suspend fun receiveOrNull(): V?
/**
* 期待 [OnDemandProducerScope] 再生产一个 [V]. 期望生产后必须在之后调用 [receiveOrNull] [finish] 来消耗生产的 [V].
*
* 在成功发起期待后返回 `true`; [OnDemandProducerScope] 已经[完结][OnDemandProducerScope.finish] 时返回 `false`.
*
* @throws IllegalProducerStateException [expectMore] 被调用后, 没有调用 [receiveOrNull] 就又调用了 [expectMore] 时抛出
*/
fun expectMore(ticket: T): Boolean
/**
* 标记此 [OnDemandConsumer] 已经完结.
*
* 如果 [OnDemandProducerScope] 仍在运行, 将会 (正常地) 取消 [OnDemandProducerScope].
*
* 随后 [OnDemandProducerScope.emit] 将会抛出 [IllegalStateException].
*/
fun finish()
}
internal class ProducerFailureException(
override val message: String? = null,
override val cause: Throwable?
) : Exception()

View File

@ -12,12 +12,14 @@ package net.mamoe.mirai.internal.network.auth
import net.mamoe.mirai.auth.BotAuthResult
import net.mamoe.mirai.internal.network.components.SsoProcessorImpl
import net.mamoe.mirai.utils.SecretsProtection
import net.mamoe.mirai.utils.channels.IllegalProducerStateException
import net.mamoe.mirai.utils.channels.OnDemandSendChannel
/**
* Implements [BotAuthSessionInternal] from API, to be called by the user, to receive user's decisions.
*/
internal class SafeBotAuthSession(
private val producer: OnDemandProducerScope<Throwable?, SsoProcessorImpl.AuthMethod>
private val producer: OnDemandSendChannel<Throwable?, SsoProcessorImpl.AuthMethod>
) : BotAuthSessionInternal() {
private val authResultImpl = object : BotAuthResult {}