diff --git a/mirai-core/src/commonMain/kotlin/AbstractBot.kt b/mirai-core/src/commonMain/kotlin/AbstractBot.kt index 751e14885..a2a1dcbbd 100644 --- a/mirai-core/src/commonMain/kotlin/AbstractBot.kt +++ b/mirai-core/src/commonMain/kotlin/AbstractBot.kt @@ -28,6 +28,7 @@ import net.mamoe.mirai.event.events.BotEvent import net.mamoe.mirai.event.events.BotOfflineEvent import net.mamoe.mirai.internal.network.DefaultServerList import net.mamoe.mirai.internal.network.handler.NetworkHandler +import net.mamoe.mirai.internal.network.handler.ServerList import net.mamoe.mirai.supervisorJob import net.mamoe.mirai.utils.* import kotlin.coroutines.CoroutineContext @@ -170,7 +171,8 @@ internal abstract class AbstractBot constructor( // network /////////////////////////////////////////////////////////////////////////// - internal val serverList: MutableList> = mutableListOf() + internal val serverList: MutableList> = mutableListOf() // TODO: 2021/4/16 remove old + internal val serverListNew = ServerList() // TODO: 2021/4/16 load server list from cache (add a provider) // TODO: 2021/4/14 handle serverList diff --git a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt index fdc410149..fa6f12343 100644 --- a/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt +++ b/mirai-core/src/commonMain/kotlin/QQAndroidBot.kt @@ -21,10 +21,8 @@ import net.mamoe.mirai.internal.contact.info.FriendInfoImpl import net.mamoe.mirai.internal.contact.info.StrangerInfoImpl import net.mamoe.mirai.internal.contact.uin import net.mamoe.mirai.internal.network.* -import net.mamoe.mirai.internal.network.handler.BdhSessionSyncer -import net.mamoe.mirai.internal.network.handler.NetworkHandler -import net.mamoe.mirai.internal.network.handler.NetworkHandlerContextImpl -import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandler +import net.mamoe.mirai.internal.network.handler.* +import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandlerFactory import net.mamoe.mirai.internal.network.net.protocol.SsoContext import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType @@ -36,7 +34,6 @@ import net.mamoe.mirai.internal.utils.friendCacheFile import net.mamoe.mirai.internal.utils.io.serialization.toByteArray import net.mamoe.mirai.utils.* import java.io.File -import java.net.InetSocketAddress import kotlin.contracts.contract import kotlin.coroutines.CoroutineContext @@ -165,10 +162,11 @@ internal class QQAndroidBot constructor( } override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler { - return NettyNetworkHandler( - NetworkHandlerContextImpl(this, this), - InetSocketAddress("123", 1) // TODO: 2021/4/14 address - ) // TODO: 2021/4/14 + val context = NetworkHandlerContextImpl(this, this) + return SelectorNetworkHandler( + context, + FactoryKeepAliveNetworkHandlerSelector(NettyNetworkHandlerFactory, serverListNew, context) + ) // We can move the factory to configuration but this is not necessary for now. } @JvmField diff --git a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt index bb250fc0b..33f173ae9 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/NetworkHandler.kt @@ -9,7 +9,6 @@ package net.mamoe.mirai.internal.network.handler -import kotlinx.atomicfu.atomic import net.mamoe.mirai.Bot import net.mamoe.mirai.internal.QQAndroidBot import net.mamoe.mirai.internal.network.Packet @@ -33,8 +32,6 @@ internal interface NetworkHandlerContext { val logger: MiraiLogger val ssoContext: SsoContext val configuration: BotConfiguration - - fun getNextAddress(): SocketAddress // FIXME: 2021/4/14 } internal class NetworkHandlerContextImpl( @@ -44,10 +41,6 @@ internal class NetworkHandlerContextImpl( override val configuration: BotConfiguration get() = bot.configuration - override fun getNextAddress(): SocketAddress { - TODO("Not yet implemented") - } - override val logger: MiraiLogger by lazy { configuration.networkLoggerSupplier(bot) } } @@ -99,9 +92,12 @@ internal interface NetworkHandler { } /** - * Attempts to resume the connection. Throws no exception but changes [state] + * Attempts to resume the connection. + * + * May throw exception that had caused current state to fail. * @see State */ + @Throws(Exception::class) suspend fun resumeConnection() @@ -168,80 +164,4 @@ internal interface NetworkHandlerFactory { * Create an instance of [H]. The returning [H] has [NetworkHandler.state] of [State.INITIALIZED] */ fun create(context: NetworkHandlerContext, address: SocketAddress): H -} - -/** - * A lazy stateful selector of [NetworkHandler]. - * - * - Calls [factory.create][NetworkHandlerFactory.create] to create [NetworkHandler]s. - * - Re-initialize [NetworkHandler] instances if the old one is dead. - * - Suspends requests when connection is not available. - * - * No connection is created until first invocation of [getResumedInstance], - * and new connections are created only when calling [getResumedInstance] if the old connection was dead. - */ -internal abstract class NetworkHandlerSelector { - /** - * Returns an instance immediately without suspension, or `null` if instance not ready. - * @see awaitResumeInstance - */ - abstract fun getResumedInstance(): H? - - /** - * Returns an alive [NetworkHandler], or suspends the coroutine until the connection has been made again. - */ - abstract suspend fun awaitResumeInstance(): H -} - -// TODO: 2021/4/14 better naming -internal abstract class AutoReconnectNetworkHandlerSelector : NetworkHandlerSelector() { - private val current = atomic(null) - - protected abstract fun createInstance(): H - - final override fun getResumedInstance(): H? = current.value - - final override tailrec suspend fun awaitResumeInstance(): H { - val current = getResumedInstance() - return if (current != null) { - when (current.state) { - State.OK -> current - State.CLOSED -> { - this.current.compareAndSet(current, null) // invalidate the instance and try again. - awaitResumeInstance() - } - else -> { - current.resumeConnection() // try to advance state. - awaitResumeInstance() - } - } - } else { - this.current.compareAndSet(current, createInstance()) - awaitResumeInstance() - } - } -} - -/** - * Delegates [NetworkHandler] calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance]. - */ -internal class SelectorNetworkHandler( - override val context: NetworkHandlerContext, - private val selector: NetworkHandlerSelector<*> -) : NetworkHandler { - private suspend inline fun instance(): NetworkHandler = selector.awaitResumeInstance() - - override val state: State get() = selector.getResumedInstance()?.state ?: State.INITIALIZED - - override suspend fun resumeConnection() { - instance() // the selector will resume connection for us. - } - - override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int) = - instance().sendAndExpect(packet, timeout, attempts) - - override suspend fun sendWithoutExpect(packet: OutgoingPacket) = instance().sendWithoutExpect(packet) - override fun close() { - selector.getResumedInstance()?.close() - } } \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/SelectorNetworkHandler.kt b/mirai-core/src/commonMain/kotlin/network/handler/SelectorNetworkHandler.kt new file mode 100644 index 000000000..e6e4dffff --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/SelectorNetworkHandler.kt @@ -0,0 +1,122 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler + +import kotlinx.atomicfu.atomic +import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket + +/** + * A proxy to [NetworkHandler] that delegates calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance]. + * + * [NetworkHandlerSelector.awaitResumeInstance] is called everytime when an operation in [NetworkHandler] is called. + * + * This is useful to implement a delegation of [NetworkHandler]. The functionality of *selection* is provided by the strategy [selector][NetworkHandlerSelector]. + * @see NetworkHandlerSelector + */ +internal class SelectorNetworkHandler( + override val context: NetworkHandlerContext, // impl notes: may consider to move into function member. + private val selector: NetworkHandlerSelector<*>, +) : NetworkHandler { + private suspend inline fun instance(): NetworkHandler = selector.awaitResumeInstance() + + override val state: NetworkHandler.State + get() = selector.getResumedInstance()?.state ?: NetworkHandler.State.INITIALIZED + + override suspend fun resumeConnection() { + instance() // the selector will resume connection for us. + } + + override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int) = + instance().sendAndExpect(packet, timeout, attempts) + + override suspend fun sendWithoutExpect(packet: OutgoingPacket) = instance().sendWithoutExpect(packet) + override fun close() { + selector.getResumedInstance()?.close() + } +} + +internal class ExceptionInSelectorResumeException( + cause: Throwable +) : RuntimeException(cause) + +/** + * A lazy stateful selector of [NetworkHandler]. This is used as a director([selector][SelectorNetworkHandler.selector]) to [SelectorNetworkHandler]. + */ +internal interface NetworkHandlerSelector { + /** + * Returns an instance immediately without suspension, or `null` if instance not ready. + * + * This function should not throw any exception. + * @see awaitResumeInstance + */ + fun getResumedInstance(): H? + + /** + * Returns an alive [NetworkHandler], or suspends the coroutine until the connection has been made again. + * + * This function may throw exceptions, which would be propagated to the original caller of [SelectorNetworkHandler.resumeConnection]. + */ + suspend fun awaitResumeInstance(): H +} + +/** + * A lazy stateful implementation of [NetworkHandlerSelector]. + * + * - Calls [factory.create][NetworkHandlerFactory.create] to create [NetworkHandler]s. + * - Re-initialize [NetworkHandler] instances if the old one is dead. + * - Suspends requests when connection is not available. + * + * No connection is created until first invocation of [getResumedInstance], + * and new connections are created only when calling [getResumedInstance] if the old connection was dead. + */ +// may be replaced with a better name. +internal abstract class AbstractKeepAliveNetworkHandlerSelector : NetworkHandlerSelector { + private val current = atomic(null) + + protected abstract fun createInstance(): H + + final override fun getResumedInstance(): H? = current.value + + // TODO: 2021/4/16 add test for AbstractKeepAliveNetworkHandlerSelector + final override tailrec suspend fun awaitResumeInstance(): H { + val current = getResumedInstance() + return if (current != null) { + when (current.state) { + NetworkHandler.State.OK -> current + NetworkHandler.State.CLOSED -> { + this.current.compareAndSet(current, null) // invalidate the instance and try again. + awaitResumeInstance() + } + else -> { + current.resumeConnection() // try to advance state. + awaitResumeInstance() + } + } + } else { + this.current.compareAndSet(current, createInstance()) + awaitResumeInstance() + } + } +} + +/** + * [AbstractKeepAliveNetworkHandlerSelector] implementation delegating [createInstance] to [factory] + */ +internal class FactoryKeepAliveNetworkHandlerSelector( + private val factory: NetworkHandlerFactory, + private val serverList: ServerList, + private val context: NetworkHandlerContext, +) : AbstractKeepAliveNetworkHandlerSelector() { + override fun createInstance(): H = + factory.create(context, serverList.pollCurrent()?.toSocketAddress() ?: throw NoServerAvailableException()) +} + +internal class NoServerAvailableException : + NoSuchElementException("No server available. (Failed to connect to any of the servers)") \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/ServerList.kt b/mirai-core/src/commonMain/kotlin/network/handler/ServerList.kt new file mode 100644 index 000000000..1cffc180b --- /dev/null +++ b/mirai-core/src/commonMain/kotlin/network/handler/ServerList.kt @@ -0,0 +1,87 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network.handler + +import java.net.InetSocketAddress +import java.util.* + +internal data class ServerAddress( + val host: String, + val port: Int +) { + init { + require(port >= 0) { "port must be positive: '$port'" } + require(host.isNotBlank()) { "host is invalid: '$host'" } + } + + fun toSocketAddress(): InetSocketAddress = InetSocketAddress.createUnresolved(host, port) +} + +/** + * Queue of servers. Pop each time when trying to connect. + */ +internal class ServerList( + initial: Collection = emptyList() +) { + @Volatile + private var preferred: Set = DefaultServerList + + @Volatile + private var current: Queue = ArrayDeque(initial) + + @Synchronized + fun setPreferred(list: Collection) { + require(list.isNotEmpty()) { "list cannot be empty." } + preferred = list.toSet() + } + + init { + refresh() + } + + @Synchronized + fun refresh() { + current = preferred.toCollection(ArrayDeque(current.size)) + check(current.isNotEmpty()) { + "Internal error: failed to fill server list. No server available." + } + } + + /** + * [Poll][Queue.poll] from current address list. Returns `null` if current address list is empty. + */ + @Synchronized + fun pollCurrent(): ServerAddress? { + return current.poll() + } + + /** + * [Poll][Queue.poll] from current address list, before which the list is filled with preferred addresses or default list if empty. + */ + @Synchronized + fun pollAny(): ServerAddress { + if (current.isEmpty()) refresh() + return current.remove() + } + + companion object { + internal val DefaultServerList: Set = + """msfwifi.3g.qq.com:8080, 14.215.138.110:8080, 113.96.12.224:8080, + |157.255.13.77:14000, 120.232.18.27:443, + |183.3.235.162:14000, 163.177.89.195:443, 183.232.94.44:80, + |203.205.255.224:8080, 203.205.255.221:8080""".trimMargin() + .split(", ", "\n").filterNot(String::isBlank) + .map { + val host = it.substringBefore(':') + val port = it.substringAfter(':').toInt() + ServerAddress(host, port) + }.shuffled().toMutableSet() + } +} \ No newline at end of file diff --git a/mirai-core/src/commonMain/kotlin/network/handler/impl/NetworkHandlerSupport.kt b/mirai-core/src/commonMain/kotlin/network/handler/impl/NetworkHandlerSupport.kt index 6d1f6ce87..bad42b0e0 100644 --- a/mirai-core/src/commonMain/kotlin/network/handler/impl/NetworkHandlerSupport.kt +++ b/mirai-core/src/commonMain/kotlin/network/handler/impl/NetworkHandlerSupport.kt @@ -122,6 +122,10 @@ internal abstract class NetworkHandlerSupport( protected abstract inner class BaseStateImpl( val correspondingState: NetworkHandler.State, ) : CoroutineScope by CoroutineScope(coroutineContext + SupervisorJob(coroutineContext.job)) { + + /** + * May throw any exception that caused the state to fail. + */ @Throws(Exception::class) abstract suspend fun resumeConnection() } diff --git a/mirai-core/src/commonTest/kotlin/network/ServerListTest.kt b/mirai-core/src/commonTest/kotlin/network/ServerListTest.kt new file mode 100644 index 000000000..1b1217925 --- /dev/null +++ b/mirai-core/src/commonTest/kotlin/network/ServerListTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2019-2021 Mamoe Technologies and contributors. + * + * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证. + * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link. + * + * https://github.com/mamoe/mirai/blob/master/LICENSE + */ + +package net.mamoe.mirai.internal.network + +import net.mamoe.mirai.internal.network.handler.ServerAddress +import net.mamoe.mirai.internal.network.handler.ServerList +import kotlin.test.* + +internal class ServerListTest { + + @Test + fun canInitializeDefaults() { + assertNotEquals(0, ServerList.DefaultServerList.size) + } + + @Test + fun `can poll current for initial`() { + assertNotNull(ServerList().pollCurrent()) + } + + @Test + fun `not empty for initial`() { + assertNotNull(ServerList().pollAny()) + } + + @Test + fun `poll current will end with null`() { + val instance = ServerList() + repeat(100) { + instance.pollCurrent() + } + assertNull(instance.pollCurrent()) + } + + @Test + fun `poll any is always not null`() { + val instance = ServerList() + repeat(100) { + instance.pollAny() + } + assertNotNull(instance.pollAny()) + } + + @Test + fun `preferred cannot be empty`() { + assertFailsWith { + ServerList().setPreferred(emptyList()) + } + } + + @Test + fun `use preferred`() { + val instance = ServerList() + val addr = ServerAddress("test", 1) + instance.setPreferred(listOf(addr)) + repeat(100) { + instance.pollAny() + } + assertSame(addr, instance.pollAny()) + } +} \ No newline at end of file