1
0
mirror of https://github.com/mamoe/mirai.git synced 2025-03-30 09:50:12 +08:00

[core] Introduce Flow<T>.toStream() for simply implement Streamable ()

* [core] Introduce `Flow<T>.toStream()` for simply implement `Streamable`

* Drop `unintercepted` support

* fix ci

* update test
This commit is contained in:
微莹·纤绫 2022-10-31 21:09:58 +08:00 committed by GitHub
parent e47b205e0d
commit 4abb3e3703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 305 additions and 4 deletions
mirai-core-api
compatibility-validation
android/api
jvm/api
src/jvmBaseMain/kotlin/utils
mirai-core-utils/src
jvmBaseMain/kotlin
jvmBaseTest/kotlin

View File

@ -6173,7 +6173,7 @@ public final class net/mamoe/mirai/utils/SingleFileLogger : net/mamoe/mirai/util
public abstract interface class net/mamoe/mirai/utils/Streamable {
public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public abstract fun asStream ()Ljava/util/stream/Stream;
public fun asStream ()Ljava/util/stream/Stream;
public fun toList ()Ljava/util/List;
public fun toList (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun toList$suspendImpl (Lnet/mamoe/mirai/utils/Streamable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

View File

@ -6169,7 +6169,7 @@ public final class net/mamoe/mirai/utils/StandardCharImageLoginSolver$Companion
public abstract interface class net/mamoe/mirai/utils/Streamable {
public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
public abstract fun asStream ()Ljava/util/stream/Stream;
public fun asStream ()Ljava/util/stream/Stream;
public fun toList ()Ljava/util/List;
public fun toList (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun toList$suspendImpl (Lnet/mamoe/mirai/utils/Streamable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

View File

@ -11,12 +11,15 @@
package net.mamoe.mirai.utils
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.contact.announcement.Announcement
import net.mamoe.mirai.contact.announcement.Announcements
import net.mamoe.mirai.utils.JdkStreamSupport.toStream
import java.util.stream.Stream
import kotlin.coroutines.EmptyCoroutineContext
/**
* 表示一个可以创建数据流 [Flow] [Stream] 的对象.
@ -38,8 +41,19 @@ public actual interface Streamable<T> {
* 创建一个能获取该群内所有 [T] [Stream].
*
* 实现细节: 为了适合 Java 调用, 实现类似为阻塞式的 [asFlow], 因此不建议在 Kotlin 使用. Kotlin 请使用 [asFlow].
*
* : 为了资源的正确释放, 使用 [Stream] 时需要使用 `try-with-resource`.
*
* ```java
* Streamable<String> tmp;
* try (var stream = tmp.asStream()) {
* System.out.println(stream.findFirst());
* }
* ```
*/
public fun asStream(): Stream<T>
public fun asStream(): Stream<T> = asFlow().toStream(
context = if (this is CoroutineScope) this.coroutineContext else EmptyCoroutineContext,
)
/**
* 获取所有 [T] 列表, 将全部 [T] 都加载后再返回.

View File

@ -13,9 +13,158 @@
package net.mamoe.mirai.utils
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.*
import java.util.Spliterators.AbstractSpliterator
import java.util.concurrent.ArrayBlockingQueue
import java.util.function.Consumer
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlin.coroutines.*
import kotlin.streams.asStream
@JvmSynthetic
public inline fun <T> stream(@BuilderInference noinline block: suspend SequenceScope<T>.() -> Unit): Stream<T> =
sequence(block).asStream()
sequence(block).asStream()
@Suppress("RemoveExplicitTypeArguments")
public object JdkStreamSupport {
private class CompleteToken(val error: Throwable?) {
override fun toString(): String {
return "CompleteToken[$error]"
}
}
private val NULL_PLACEHOLDER = Symbol("null")!!
/*
Implementation:
Spliterator.tryAdvance():
- Resume coroutine
(*Wait for collector.emit*)
- Re-Suspend flow
- Put response to queue
- Fire response to jdk consumer
Completion & Exception caught:
(* Spliterator.tryAdvance(): Resume coroutine *)
(* No more values or exception thrown. *)
(* completion called *)
- Put the exception or the completion token to queue
- Throw exception in Spliterator.tryAdvance() if possible
- Return false in Spliterator.tryAdvance()
*/
public fun <T> Flow<T>.toStream(
context: CoroutineContext = EmptyCoroutineContext,
): Stream<T> {
val spliterator = FlowSpliterator(
flow = this,
coroutineContext = context,
)
return StreamSupport.stream(spliterator, false).onClose {
spliterator.cancelled = true
spliterator.nextStep?.let { nextStep ->
if (nextStep is CancellableContinuation<*>) {
nextStep.cancel()
} else {
nextStep.resumeWithException(CancellationException())
}
}
spliterator.nextStep = null
}
}
private class FlowSpliterator<T>(
private val flow: Flow<T>,
private val coroutineContext: CoroutineContext,
) : AbstractSpliterator<T>(
Long.MAX_VALUE, Spliterator.ORDERED or Spliterator.IMMUTABLE
) {
private val queue = ArrayBlockingQueue<Any?>(1)
private var completed = false
@JvmField
var cancelled = false
@JvmField
var nextStep: Continuation<Unit>? = run {
val completion = object : Continuation<Unit> {
override val context: CoroutineContext get() = coroutineContext
override fun resumeWith(result: Result<Unit>) {
nextStep = null
completed = true
queue.put(CompleteToken(result.exceptionOrNull()))
}
}
return@run (suspend {
flow.collect { item ->
suspendCancellableCoroutine<Unit> { cont ->
nextStep = cont
queue.put(boxValue(item))
}
}
}).createCoroutine(completion)
}
private inline fun boxValue(value: Any?): Any {
return value ?: NULL_PLACEHOLDER
}
private fun unboxResponse(value: Any?, action: Consumer<in T>): Boolean {
if (value is CompleteToken) { // completion & exception caught
value.error?.let { throw boxError(it) }
completed = true
return false // no more value available
}
if (value === NULL_PLACEHOLDER) { // null
@Suppress("UNCHECKED_CAST")
action.accept(null as T)
} else {
@Suppress("UNCHECKED_CAST")
action.accept(value as T)
}
return true
}
override fun tryAdvance(action: Consumer<in T>): Boolean {
if (completed) return false
if (queue.isNotEmpty()) {
return unboxResponse(queue.take(), action)
}
if (cancelled) return false
val step = nextStep!!
nextStep = null
step.resume(Unit)
return unboxResponse(queue.take(), action)
}
}
private fun boxError(error: Throwable): Throwable {
return ExceptionInFlowException(error)
}
// @PublishedApi
public open class ExceptionInFlowException : RuntimeException {
public constructor() : super()
public constructor(msg: String?) : super(msg)
public constructor(cause: Throwable?) : super(cause)
public constructor(msg: String?, cause: Throwable?) : super(msg, cause)
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import net.mamoe.mirai.utils.JdkStreamSupport.toStream
import org.junit.jupiter.api.Test
import java.util.*
import java.util.stream.Collectors
import java.util.stream.Stream
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.*
internal class KotlinFlowToJdkStreamTest {
private fun <T> Stream<T>.collectList(): List<T> = use { collect(Collectors.toList()) }
@Test
internal fun testFlowFinally() = runTest {
var finallyCalled = false
flow<Any?> {
try {
while (true) {
emit("")
}
} finally {
finallyCalled = true
}
}.toStream().use { it.findFirst() }
assertTrue { finallyCalled }
}
@Test
internal fun testNormally() = runTest {
flow<Any?> {
emit("1")
emit("5")
emit("2")
emit("3")
}.toStream().collectList().let {
assertEquals(listOf("1", "5", "2", "3"), it)
}
}
@Test
internal fun testSuspendInFlow() = runTest {
flow<Any?> {
emit("1")
yield() // Suspended
emit("2")
}.toStream(context = Dispatchers.IO).collectList().let {
assertEquals(listOf("1", "2"), it)
}
}
@Test
internal fun testCounter() = runTest {
var counter = 0
flow<Any?> {
while (true) {
counter++
emit(counter)
}
}.toStream().use { stream ->
stream.limit(5).forEach { }
}
assertEquals(5, counter)
}
@Test
internal fun testChannelFlow() = runTest {
channelFlow<Any?> {
send(514)
launch { send(94481) }
launch { send(94481) }
launch { send(94481) }
launch { send(94481) }
}.toStream().collectList().let {
assertEquals(listOf(514, 94481, 94481, 94481, 94481), it)
}
}
@Test
internal fun testExceptionCaught() = runTest {
val msg = UUID.randomUUID().toString()
flow<Any> { error(msg) }.toStream().use { s ->
assertFails(msg) { s.findFirst() }.printStackTrace(System.out)
}
}
@Test
internal fun testErrorInLaunchedContext() = runTest {
lateinit var myError: Throwable
val msg = UUID.randomUUID().toString()
flow<Any> {
myError = Throwable(msg)
throw myError
}.toStream(
context = Dispatchers.IO,
).use { stream ->
assertFailsWith<RuntimeException>(msg) { stream.findFirst() }.let { err ->
assertSame(myError, err.cause)
assertTrue {
err.stackTrace.any { it.className == "net.mamoe.mirai.utils.JdkStreamSupport\$FlowSpliterator" && it.methodName == "tryAdvance" }
}
err.printStackTrace(System.out)
}
}
}
@Test
internal fun errorWillNotCancelJob() = runTest {
val scope = CoroutineScope(EmptyCoroutineContext)
val errmsg = UUID.randomUUID().toString()
flow<Any> { error(errmsg) }.toStream(
context = scope.coroutineContext
).use { assertFails(errmsg) { it.findFirst() } }
val job = scope.coroutineContext.job
assertTrue { job.isActive }
assertFalse { job.isCancelled }
assertFalse { job.isCompleted }
}
}