Properly suspend coroutine is not available for send

This commit is contained in:
Him188 2021-04-26 21:04:11 +08:00
parent eb80a4836b
commit 83a81961ca
6 changed files with 42 additions and 12 deletions

View File

@ -113,6 +113,8 @@ internal interface NetworkHandler : CoroutineScope {
/**
* Suspends the coroutine until [sendAndExpect] can be executed without suspension.
*
* In other words, if this functions returns, it indicates that [state] is [State.LOADING] or [State.OK]
*
* May throw exception that had caused current state to fail.
* @see State
*/

View File

@ -174,7 +174,8 @@ internal abstract class NetworkHandlerSupport(
final override val state: NetworkHandler.State get() = _state.correspondingState
private var _stateChangedDeferred = CompletableDeferred<NetworkHandler.State>()
protected var _stateChangedDeferred = CompletableDeferred<NetworkHandler.State>()
private set
/**
* For suspension until a state. e.g login.

View File

@ -23,9 +23,14 @@ import kotlin.coroutines.CoroutineContext
/**
* A proxy to [NetworkHandler] that delegates calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance].
*
* This is useful to implement a delegation of [NetworkHandler]. The functionality of *selection* is provided by the strategy [selector][NetworkHandlerSelector].
*
* ### Important notes
*
* [NetworkHandlerSelector.awaitResumeInstance] is called everytime when an operation in [NetworkHandler] is called.
*
* This is useful to implement a delegation of [NetworkHandler]. The functionality of *selection* is provided by the strategy [selector][NetworkHandlerSelector].
* Before every [sendAndExpect] call, [resumeConnection] is invoked.
*
* @see NetworkHandlerSelector
*/
internal class SelectorNetworkHandler(

View File

@ -47,9 +47,11 @@ internal open class NettyNetworkHandler(
private fun closeSuper(cause: Throwable?) = super.close(cause)
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
final override tailrec suspend fun sendPacketImpl(packet: OutgoingPacket) {
val state = _state as NettyState
state.sendPacketImpl(packet)
if (state.sendPacketImpl(packet)) return
_stateChangedDeferred.join() // wait for next state
return sendPacketImpl(packet)
}
override fun toString(): String {
@ -169,12 +171,17 @@ internal open class NettyNetworkHandler(
protected abstract inner class NettyState(
correspondingState: State
) : BaseStateImpl(correspondingState) {
abstract suspend fun sendPacketImpl(packet: OutgoingPacket)
/**
* @return `true` if packet has been sent, `false` if state is not ready for send.
* @throws IllegalStateException if is [StateClosed].
*/
abstract suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean
}
protected inner class StateInitialized : NettyState(State.INITIALIZED) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
error("Cannot send packet when connection is not set. (resumeConnection not called.)")
override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
// error("Cannot send packet when connection is not set. (resumeConnection not called.)")
return false
}
override suspend fun resumeConnection0() {
@ -234,9 +241,10 @@ internal open class NettyNetworkHandler(
override fun getCause(): Throwable? = collectiveExceptions.getLast()
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
connection.await() // split line number
.writeAndFlush(packet)
return true
}
override suspend fun resumeConnection0() {
@ -257,8 +265,9 @@ internal open class NettyNetworkHandler(
protected inner class StateLoading(
private val connection: NettyChannel
) : NettyState(State.LOADING) {
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
connection.writeAndFlush(packet)
return true
}
private val configPush = this@NettyNetworkHandler.launch(CoroutineName("ConfigPush sync")) {
@ -323,8 +332,9 @@ internal open class NettyNetworkHandler(
context[KeyRefreshProcessor].keyRefreshLoop(this@NettyNetworkHandler)
}
override suspend fun sendPacketImpl(packet: OutgoingPacket) {
override suspend fun sendPacketImpl(packet: OutgoingPacket): Boolean {
connection.writeAndFlush(packet)
return true
}
override suspend fun resumeConnection0() {

View File

@ -54,9 +54,17 @@ internal open class TestNetworkHandler(
val resumeCount = AtomicInteger(0)
val onResume get() = resumeDeferred.onJoin
@Synchronized
override suspend fun resumeConnection0() {
resumeCount.incrementAndGet()
resumeDeferred.complete(Unit)
when (this.correspondingState) {
NetworkHandler.State.INITIALIZED -> {
setState(NetworkHandler.State.CONNECTING)
}
else -> {
}
}
}
}

View File

@ -29,8 +29,12 @@ private class TestSelector(val createInstance0: () -> NetworkHandler) :
internal class KeepAliveNetworkHandlerSelectorTest : AbstractMockNetworkHandlerTest() {
@Test
fun `can initialize instance`() {
val selector = TestSelector { createNetworkHandler().apply { setState(State.OK) } }
runBlockingUnit(timeout = 3.seconds) { selector.awaitResumeInstance() }
val selector = TestSelector {
createNetworkHandler().apply {
setState(State.OK)
}
}
runBlockingUnit(timeout = 1.seconds) { selector.awaitResumeInstance() }
assertNotNull(selector.getResumedInstance())
}