From 83a81961cab1ee5b470482f920823307e269c16d Mon Sep 17 00:00:00 2001
From: Him188 <Him188@mamoe.net>
Date: Mon, 26 Apr 2021 21:04:11 +0800
Subject: [PATCH] Properly suspend coroutine is not available for send

---
 .../kotlin/network/handler/NetworkHandler.kt  |  2 ++
 .../network/handler/NetworkHandlerSupport.kt  |  3 ++-
 .../selector/SelectorNetworkHandler.kt        |  7 ++++-
 .../network/impl/netty/NettyNetworkHandler.kt | 26 +++++++++++++------
 .../kotlin/network/framework/testUtils.kt     |  8 ++++++
 .../KeepAliveNetworkHandlerSelectorTest.kt    |  8 ++++--
 6 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt
index 3c1281171..e502e5653 100644
--- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt
+++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt
@@ -113,6 +113,8 @@ internal interface NetworkHandler : CoroutineScope {
     /**
      * Suspends the coroutine until [sendAndExpect] can be executed without suspension.
      *
+     * In other words, if this functions returns, it indicates that [state] is [State.LOADING] or [State.OK]
+     *
      * May throw exception that had caused current state to fail.
      * @see State
      */
diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt
index 108a8328f..082367ded 100644
--- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt
+++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandlerSupport.kt
@@ -174,7 +174,8 @@ internal abstract class NetworkHandlerSupport(
 
     final override val state: NetworkHandler.State get() = _state.correspondingState
 
-    private var _stateChangedDeferred = CompletableDeferred<NetworkHandler.State>()
+    protected var _stateChangedDeferred = CompletableDeferred<NetworkHandler.State>()
+        private set
 
     /**
      * For suspension until a state. e.g login.
diff --git a/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt
index 36658989f..37612f527 100644
--- a/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt
+++ b/mirai-core/src/commonMain/kotlin/network/handler/selector/SelectorNetworkHandler.kt
@@ -23,9 +23,14 @@ import kotlin.coroutines.CoroutineContext
 /**
  * A proxy to [NetworkHandler] that delegates calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance].
  *
+ * This is useful to implement a delegation of [NetworkHandler]. The functionality of *selection* is provided by the strategy [selector][NetworkHandlerSelector].
+ *
+ * ### Important notes
+ *
  * [NetworkHandlerSelector.awaitResumeInstance] is called everytime when an operation in [NetworkHandler] is called.
  *
- * This is useful to implement a delegation of [NetworkHandler]. The functionality of *selection* is provided by the strategy [selector][NetworkHandlerSelector].
+ * Before every [sendAndExpect] call, [resumeConnection] is invoked.
+ *
  * @see NetworkHandlerSelector
  */
 internal class SelectorNetworkHandler(
diff --git a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt
index 0eb16d834..db665c357 100644
--- a/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt
+++ b/mirai-core/src/commonMain/kotlin/network/impl/netty/NettyNetworkHandler.kt
@@ -47,9 +47,11 @@ internal open class NettyNetworkHandler(
 
     private fun closeSuper(cause: Throwable?) = super.close(cause)
 
-    override suspend fun sendPacketImpl(packet: OutgoingPacket) {
+    final override tailrec suspend fun sendPacketImpl(packet: OutgoingPacket) {
         val state = _state as NettyState
-        state.sendPacketImpl(packet)
+        if (state.sendPacketImpl(packet)) return
+        _stateChangedDeferred.join() // wait for next state
+        return sendPacketImpl(packet)
     }
 
     override fun toString(): String {
@@ -169,12 +171,17 @@ internal open class NettyNetworkHandler(
     protected abstract inner class NettyState(
         correspondingState: State
     ) : BaseStateImpl(correspondingState) {
-        abstract suspend fun sendPacketImpl(packet: OutgoingPacket)
+        /**
+         * @return `true` if packet has been sent, `false` if state is not ready for send.
+         * @throws IllegalStateException if is [StateClosed].
+         */
+        abstract suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean
     }
 
     protected inner class StateInitialized : NettyState(State.INITIALIZED) {
-        override suspend fun sendPacketImpl(packet: OutgoingPacket) {
-            error("Cannot send packet when connection is not set. (resumeConnection not called.)")
+        override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
+//            error("Cannot send packet when connection is not set. (resumeConnection not called.)")
+            return false
         }
 
         override suspend fun resumeConnection0() {
@@ -234,9 +241,10 @@ internal open class NettyNetworkHandler(
 
         override fun getCause(): Throwable? = collectiveExceptions.getLast()
 
-        override suspend fun sendPacketImpl(packet: OutgoingPacket) {
+        override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
             connection.await() // split line number
                 .writeAndFlush(packet)
+            return true
         }
 
         override suspend fun resumeConnection0() {
@@ -257,8 +265,9 @@ internal open class NettyNetworkHandler(
     protected inner class StateLoading(
         private val connection: NettyChannel
     ) : NettyState(State.LOADING) {
-        override suspend fun sendPacketImpl(packet: OutgoingPacket) {
+        override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
             connection.writeAndFlush(packet)
+            return true
         }
 
         private val configPush = this@NettyNetworkHandler.launch(CoroutineName("ConfigPush sync")) {
@@ -323,8 +332,9 @@ internal open class NettyNetworkHandler(
             context[KeyRefreshProcessor].keyRefreshLoop(this@NettyNetworkHandler)
         }
 
-        override suspend fun sendPacketImpl(packet: OutgoingPacket) {
+        override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
             connection.writeAndFlush(packet)
+            return true
         }
 
         override suspend fun resumeConnection0() {
diff --git a/mirai-core/src/commonTest/kotlin/network/framework/testUtils.kt b/mirai-core/src/commonTest/kotlin/network/framework/testUtils.kt
index f5de2bc20..72014becc 100644
--- a/mirai-core/src/commonTest/kotlin/network/framework/testUtils.kt
+++ b/mirai-core/src/commonTest/kotlin/network/framework/testUtils.kt
@@ -54,9 +54,17 @@ internal open class TestNetworkHandler(
         val resumeCount = AtomicInteger(0)
         val onResume get() = resumeDeferred.onJoin
 
+        @Synchronized
         override suspend fun resumeConnection0() {
             resumeCount.incrementAndGet()
             resumeDeferred.complete(Unit)
+            when (this.correspondingState) {
+                NetworkHandler.State.INITIALIZED -> {
+                    setState(NetworkHandler.State.CONNECTING)
+                }
+                else -> {
+                }
+            }
         }
     }
 
diff --git a/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt b/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt
index 17e05526c..7c4aa6aac 100644
--- a/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt
+++ b/mirai-core/src/commonTest/kotlin/network/handler/KeepAliveNetworkHandlerSelectorTest.kt
@@ -29,8 +29,12 @@ private class TestSelector(val createInstance0: () -> NetworkHandler) :
 internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerTest() {
     @Test
     fun `can initialize instance`() {
-        val selector = TestSelector { createNetworkHandler().apply { setState(State.OK) } }
-        runBlockingUnit(timeout = 3.seconds) { selector.awaitResumeInstance() }
+        val selector = TestSelector {
+            createNetworkHandler().apply {
+                setState(State.OK)
+            }
+        }
+        runBlockingUnit(timeout = 1.seconds) { selector.awaitResumeInstance() }
         assertNotNull(selector.getResumedInstance())
     }