From 4abb3e3703b8a84176a57b5417dae981549a91cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BE=AE=E8=8E=B9=C2=B7=E7=BA=A4=E7=BB=AB?=
 <kar@kasukusakura.com>
Date: Mon, 31 Oct 2022 21:09:58 +0800
Subject: [PATCH] [core] Introduce `Flow<T>.toStream()` for simply implement
 `Streamable` (#2259)

* [core] Introduce `Flow<T>.toStream()` for simply implement `Streamable`

* Drop `unintercepted` support

* fix ci

* update test
---
 .../android/api/android.api                   |   2 +-
 .../compatibility-validation/jvm/api/jvm.api  |   2 +-
 .../jvmBaseMain/kotlin/utils/Streamable.kt    |  16 +-
 .../src/jvmBaseMain/kotlin/Streams.kt         | 151 +++++++++++++++++-
 .../kotlin/KotlinFlowToJdkStreamTest.kt       | 138 ++++++++++++++++
 5 files changed, 305 insertions(+), 4 deletions(-)
 create mode 100644 mirai-core-utils/src/jvmBaseTest/kotlin/KotlinFlowToJdkStreamTest.kt

diff --git a/mirai-core-api/compatibility-validation/android/api/android.api b/mirai-core-api/compatibility-validation/android/api/android.api
index 1ee4c9c52..ecb963c03 100644
--- a/mirai-core-api/compatibility-validation/android/api/android.api
+++ b/mirai-core-api/compatibility-validation/android/api/android.api
@@ -6173,7 +6173,7 @@ public final class net/mamoe/mirai/utils/SingleFileLogger : net/mamoe/mirai/util
 
 public abstract interface class net/mamoe/mirai/utils/Streamable {
 	public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
-	public abstract fun asStream ()Ljava/util/stream/Stream;
+	public fun asStream ()Ljava/util/stream/Stream;
 	public fun toList ()Ljava/util/List;
 	public fun toList (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static synthetic fun toList$suspendImpl (Lnet/mamoe/mirai/utils/Streamable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
diff --git a/mirai-core-api/compatibility-validation/jvm/api/jvm.api b/mirai-core-api/compatibility-validation/jvm/api/jvm.api
index 48ce87022..43a9e77f1 100644
--- a/mirai-core-api/compatibility-validation/jvm/api/jvm.api
+++ b/mirai-core-api/compatibility-validation/jvm/api/jvm.api
@@ -6169,7 +6169,7 @@ public final class net/mamoe/mirai/utils/StandardCharImageLoginSolver$Companion
 
 public abstract interface class net/mamoe/mirai/utils/Streamable {
 	public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow;
-	public abstract fun asStream ()Ljava/util/stream/Stream;
+	public fun asStream ()Ljava/util/stream/Stream;
 	public fun toList ()Ljava/util/List;
 	public fun toList (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static synthetic fun toList$suspendImpl (Lnet/mamoe/mirai/utils/Streamable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
diff --git a/mirai-core-api/src/jvmBaseMain/kotlin/utils/Streamable.kt b/mirai-core-api/src/jvmBaseMain/kotlin/utils/Streamable.kt
index fbb212836..3f9c4cca8 100644
--- a/mirai-core-api/src/jvmBaseMain/kotlin/utils/Streamable.kt
+++ b/mirai-core-api/src/jvmBaseMain/kotlin/utils/Streamable.kt
@@ -11,12 +11,15 @@
 
 package net.mamoe.mirai.utils
 
+import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.toList
 import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
 import net.mamoe.mirai.contact.announcement.Announcement
 import net.mamoe.mirai.contact.announcement.Announcements
+import net.mamoe.mirai.utils.JdkStreamSupport.toStream
 import java.util.stream.Stream
+import kotlin.coroutines.EmptyCoroutineContext
 
 /**
  * 表示一个可以创建数据流 [Flow] 和 [Stream] 的对象.
@@ -38,8 +41,19 @@ public actual interface Streamable<T> {
      * 创建一个能获取该群内所有 [T] 的 [Stream].
      *
      * 实现细节: 为了适合 Java 调用, 实现类似为阻塞式的 [asFlow], 因此不建议在 Kotlin 使用. 在 Kotlin 请使用 [asFlow].
+     *
+     * 注: 为了资源的正确释放, 使用 [Stream] 时需要使用 `try-with-resource`. 如
+     *
+     * ```java
+     * Streamable<String> tmp;
+     * try (var stream = tmp.asStream()) {
+     *     System.out.println(stream.findFirst());
+     * }
+     * ```
      */
-    public fun asStream(): Stream<T>
+    public fun asStream(): Stream<T> = asFlow().toStream(
+        context = if (this is CoroutineScope) this.coroutineContext else EmptyCoroutineContext,
+    )
 
     /**
      * 获取所有 [T] 列表, 将全部 [T] 都加载后再返回.
diff --git a/mirai-core-utils/src/jvmBaseMain/kotlin/Streams.kt b/mirai-core-utils/src/jvmBaseMain/kotlin/Streams.kt
index 78e15fd61..47a562df0 100644
--- a/mirai-core-utils/src/jvmBaseMain/kotlin/Streams.kt
+++ b/mirai-core-utils/src/jvmBaseMain/kotlin/Streams.kt
@@ -13,9 +13,158 @@
 
 package net.mamoe.mirai.utils
 
+import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.suspendCancellableCoroutine
+import java.util.*
+import java.util.Spliterators.AbstractSpliterator
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.function.Consumer
 import java.util.stream.Stream
+import java.util.stream.StreamSupport
+import kotlin.coroutines.*
 import kotlin.streams.asStream
 
 @JvmSynthetic
 public inline fun <T> stream(@BuilderInference noinline block: suspend SequenceScope<T>.() -> Unit): Stream<T> =
-    sequence(block).asStream()
\ No newline at end of file
+    sequence(block).asStream()
+
+@Suppress("RemoveExplicitTypeArguments")
+public object JdkStreamSupport {
+    private class CompleteToken(val error: Throwable?) {
+        override fun toString(): String {
+            return "CompleteToken[$error]"
+        }
+    }
+
+    private val NULL_PLACEHOLDER = Symbol("null")!!
+
+    /*
+    Implementation:
+
+    Spliterator.tryAdvance():
+        - Resume coroutine
+        (*Wait for collector.emit*)
+        - Re-Suspend flow
+        - Put response to queue
+
+        - Fire response to jdk consumer
+
+    Completion & Exception caught:
+        (* Spliterator.tryAdvance(): Resume coroutine *)
+        (* No more values or exception thrown. *)
+        (* completion called *)
+        - Put the exception or the completion token to queue
+
+        - Throw exception in Spliterator.tryAdvance() if possible
+        - Return false in Spliterator.tryAdvance()
+
+     */
+    public fun <T> Flow<T>.toStream(
+        context: CoroutineContext = EmptyCoroutineContext,
+    ): Stream<T> {
+        val spliterator = FlowSpliterator(
+            flow = this,
+            coroutineContext = context,
+        )
+
+        return StreamSupport.stream(spliterator, false).onClose {
+            spliterator.cancelled = true
+            spliterator.nextStep?.let { nextStep ->
+                if (nextStep is CancellableContinuation<*>) {
+                    nextStep.cancel()
+                } else {
+                    nextStep.resumeWithException(CancellationException())
+                }
+            }
+            spliterator.nextStep = null
+        }
+    }
+
+    private class FlowSpliterator<T>(
+        private val flow: Flow<T>,
+        private val coroutineContext: CoroutineContext,
+    ) : AbstractSpliterator<T>(
+        Long.MAX_VALUE, Spliterator.ORDERED or Spliterator.IMMUTABLE
+    ) {
+
+        private val queue = ArrayBlockingQueue<Any?>(1)
+        private var completed = false
+
+        @JvmField
+        var cancelled = false
+
+        @JvmField
+        var nextStep: Continuation<Unit>? = run {
+            val completion = object : Continuation<Unit> {
+                override val context: CoroutineContext get() = coroutineContext
+
+                override fun resumeWith(result: Result<Unit>) {
+                    nextStep = null
+                    completed = true
+                    queue.put(CompleteToken(result.exceptionOrNull()))
+                }
+
+            }
+            return@run (suspend {
+                flow.collect { item ->
+                    suspendCancellableCoroutine<Unit> { cont ->
+                        nextStep = cont
+                        queue.put(boxValue(item))
+                    }
+                }
+            }).createCoroutine(completion)
+        }
+
+        private inline fun boxValue(value: Any?): Any {
+            return value ?: NULL_PLACEHOLDER
+        }
+
+        private fun unboxResponse(value: Any?, action: Consumer<in T>): Boolean {
+            if (value is CompleteToken) { // completion & exception caught
+                value.error?.let { throw boxError(it) }
+                completed = true
+                return false // no more value available
+            }
+
+            if (value === NULL_PLACEHOLDER) { // null
+                @Suppress("UNCHECKED_CAST")
+                action.accept(null as T)
+            } else {
+                @Suppress("UNCHECKED_CAST")
+                action.accept(value as T)
+            }
+            return true
+        }
+
+        override fun tryAdvance(action: Consumer<in T>): Boolean {
+            if (completed) return false
+
+            if (queue.isNotEmpty()) {
+                return unboxResponse(queue.take(), action)
+            }
+            if (cancelled) return false
+
+            val step = nextStep!!
+            nextStep = null
+            step.resume(Unit)
+
+            return unboxResponse(queue.take(), action)
+        }
+
+    }
+
+    private fun boxError(error: Throwable): Throwable {
+        return ExceptionInFlowException(error)
+    }
+
+    // @PublishedApi
+    public open class ExceptionInFlowException : RuntimeException {
+        public constructor() : super()
+        public constructor(msg: String?) : super(msg)
+        public constructor(cause: Throwable?) : super(cause)
+        public constructor(msg: String?, cause: Throwable?) : super(msg, cause)
+    }
+}
+
diff --git a/mirai-core-utils/src/jvmBaseTest/kotlin/KotlinFlowToJdkStreamTest.kt b/mirai-core-utils/src/jvmBaseTest/kotlin/KotlinFlowToJdkStreamTest.kt
new file mode 100644
index 000000000..a90e58628
--- /dev/null
+++ b/mirai-core-utils/src/jvmBaseTest/kotlin/KotlinFlowToJdkStreamTest.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2019-2022 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.utils
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.channelFlow
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.test.runTest
+import net.mamoe.mirai.utils.JdkStreamSupport.toStream
+import org.junit.jupiter.api.Test
+import java.util.*
+import java.util.stream.Collectors
+import java.util.stream.Stream
+import kotlin.coroutines.EmptyCoroutineContext
+import kotlin.test.*
+
+internal class KotlinFlowToJdkStreamTest {
+    private fun <T> Stream<T>.collectList(): List<T> = use { collect(Collectors.toList()) }
+
+    @Test
+    internal fun testFlowFinally() = runTest {
+        var finallyCalled = false
+
+        flow<Any?> {
+            try {
+                while (true) {
+                    emit("")
+                }
+            } finally {
+                finallyCalled = true
+            }
+        }.toStream().use { it.findFirst() }
+
+        assertTrue { finallyCalled }
+    }
+
+    @Test
+    internal fun testNormally() = runTest {
+        flow<Any?> {
+            emit("1")
+            emit("5")
+            emit("2")
+            emit("3")
+        }.toStream().collectList().let {
+            assertEquals(listOf("1", "5", "2", "3"), it)
+        }
+    }
+
+    @Test
+    internal fun testSuspendInFlow() = runTest {
+        flow<Any?> {
+            emit("1")
+            yield() // Suspended
+            emit("2")
+        }.toStream(context = Dispatchers.IO).collectList().let {
+            assertEquals(listOf("1", "2"), it)
+        }
+    }
+
+    @Test
+    internal fun testCounter() = runTest {
+        var counter = 0
+        flow<Any?> {
+            while (true) {
+                counter++
+                emit(counter)
+            }
+        }.toStream().use { stream ->
+            stream.limit(5).forEach { }
+        }
+        assertEquals(5, counter)
+    }
+
+    @Test
+    internal fun testChannelFlow() = runTest {
+        channelFlow<Any?> {
+            send(514)
+            launch { send(94481) }
+            launch { send(94481) }
+            launch { send(94481) }
+            launch { send(94481) }
+        }.toStream().collectList().let {
+            assertEquals(listOf(514, 94481, 94481, 94481, 94481), it)
+        }
+    }
+
+    @Test
+    internal fun testExceptionCaught() = runTest {
+        val msg = UUID.randomUUID().toString()
+        flow<Any> { error(msg) }.toStream().use { s ->
+            assertFails(msg) { s.findFirst() }.printStackTrace(System.out)
+        }
+    }
+
+    @Test
+    internal fun testErrorInLaunchedContext() = runTest {
+        lateinit var myError: Throwable
+        val msg = UUID.randomUUID().toString()
+
+        flow<Any> {
+            myError = Throwable(msg)
+            throw myError
+        }.toStream(
+            context = Dispatchers.IO,
+        ).use { stream ->
+            assertFailsWith<RuntimeException>(msg) { stream.findFirst() }.let { err ->
+                assertSame(myError, err.cause)
+                assertTrue {
+                    err.stackTrace.any { it.className == "net.mamoe.mirai.utils.JdkStreamSupport\$FlowSpliterator" && it.methodName == "tryAdvance" }
+                }
+
+                err.printStackTrace(System.out)
+            }
+        }
+    }
+
+    @Test
+    internal fun errorWillNotCancelJob() = runTest {
+        val scope = CoroutineScope(EmptyCoroutineContext)
+        val errmsg = UUID.randomUUID().toString()
+
+        flow<Any> { error(errmsg) }.toStream(
+            context = scope.coroutineContext
+        ).use { assertFails(errmsg) { it.findFirst() } }
+
+        val job = scope.coroutineContext.job
+        assertTrue { job.isActive }
+        assertFalse { job.isCancelled }
+        assertFalse { job.isCompleted }
+    }
+}
\ No newline at end of file