Implement NetworkHandlerSelector, serverList, and implement them into Bot

This commit is contained in:
Him188 2021-04-16 10:24:59 +08:00
parent ff80434d2d
commit 382c351e08
7 changed files with 295 additions and 94 deletions

View File

@ -28,6 +28,7 @@ import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.event.events.BotOfflineEvent
import net.mamoe.mirai.internal.network.DefaultServerList
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.ServerList
import net.mamoe.mirai.supervisorJob
import net.mamoe.mirai.utils.*
import kotlin.coroutines.CoroutineContext
@ -170,7 +171,8 @@ internal abstract class AbstractBot constructor(
// network
///////////////////////////////////////////////////////////////////////////
internal val serverList: MutableList<Pair<String, Int>> = mutableListOf()
internal val serverList: MutableList<Pair<String, Int>> = mutableListOf() // TODO: 2021/4/16 remove old
internal val serverListNew = ServerList() // TODO: 2021/4/16 load server list from cache (add a provider)
// TODO: 2021/4/14 handle serverList

View File

@ -21,10 +21,8 @@ import net.mamoe.mirai.internal.contact.info.FriendInfoImpl
import net.mamoe.mirai.internal.contact.info.StrangerInfoImpl
import net.mamoe.mirai.internal.contact.uin
import net.mamoe.mirai.internal.network.*
import net.mamoe.mirai.internal.network.handler.BdhSessionSyncer
import net.mamoe.mirai.internal.network.handler.NetworkHandler
import net.mamoe.mirai.internal.network.handler.NetworkHandlerContextImpl
import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandler
import net.mamoe.mirai.internal.network.handler.*
import net.mamoe.mirai.internal.network.handler.impl.netty.NettyNetworkHandlerFactory
import net.mamoe.mirai.internal.network.net.protocol.SsoContext
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketWithRespType
@ -36,7 +34,6 @@ import net.mamoe.mirai.internal.utils.friendCacheFile
import net.mamoe.mirai.internal.utils.io.serialization.toByteArray
import net.mamoe.mirai.utils.*
import java.io.File
import java.net.InetSocketAddress
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
@ -165,10 +162,11 @@ internal class QQAndroidBot constructor(
}
override fun createNetworkHandler(coroutineContext: CoroutineContext): NetworkHandler {
return NettyNetworkHandler(
NetworkHandlerContextImpl(this, this),
InetSocketAddress("123", 1) // TODO: 2021/4/14 address
) // TODO: 2021/4/14
val context = NetworkHandlerContextImpl(this, this)
return SelectorNetworkHandler(
context,
FactoryKeepAliveNetworkHandlerSelector(NettyNetworkHandlerFactory, serverListNew, context)
) // We can move the factory to configuration but this is not necessary for now.
}
@JvmField

View File

@ -9,7 +9,6 @@
package net.mamoe.mirai.internal.network.handler
import kotlinx.atomicfu.atomic
import net.mamoe.mirai.Bot
import net.mamoe.mirai.internal.QQAndroidBot
import net.mamoe.mirai.internal.network.Packet
@ -33,8 +32,6 @@ internal interface NetworkHandlerContext {
val logger: MiraiLogger
val ssoContext: SsoContext
val configuration: BotConfiguration
fun getNextAddress(): SocketAddress // FIXME: 2021/4/14
}
internal class NetworkHandlerContextImpl(
@ -44,10 +41,6 @@ internal class NetworkHandlerContextImpl(
override val configuration: BotConfiguration
get() = bot.configuration
override fun getNextAddress(): SocketAddress {
TODO("Not yet implemented")
}
override val logger: MiraiLogger by lazy { configuration.networkLoggerSupplier(bot) }
}
@ -99,9 +92,12 @@ internal interface NetworkHandler {
}
/**
* Attempts to resume the connection. Throws no exception but changes [state]
* Attempts to resume the connection.
*
* May throw exception that had caused current state to fail.
* @see State
*/
@Throws(Exception::class)
suspend fun resumeConnection()
@ -168,80 +164,4 @@ internal interface NetworkHandlerFactory<H : NetworkHandler> {
* Create an instance of [H]. The returning [H] has [NetworkHandler.state] of [State.INITIALIZED]
*/
fun create(context: NetworkHandlerContext, address: SocketAddress): H
}
/**
* A lazy stateful selector of [NetworkHandler].
*
* - Calls [factory.create][NetworkHandlerFactory.create] to create [NetworkHandler]s.
* - Re-initialize [NetworkHandler] instances if the old one is dead.
* - Suspends requests when connection is not available.
*
* No connection is created until first invocation of [getResumedInstance],
* and new connections are created only when calling [getResumedInstance] if the old connection was dead.
*/
internal abstract class NetworkHandlerSelector<H : NetworkHandler> {
/**
* Returns an instance immediately without suspension, or `null` if instance not ready.
* @see awaitResumeInstance
*/
abstract fun getResumedInstance(): H?
/**
* Returns an alive [NetworkHandler], or suspends the coroutine until the connection has been made again.
*/
abstract suspend fun awaitResumeInstance(): H
}
// TODO: 2021/4/14 better naming
internal abstract class AutoReconnectNetworkHandlerSelector<H : NetworkHandler> : NetworkHandlerSelector<H>() {
private val current = atomic<H?>(null)
protected abstract fun createInstance(): H
final override fun getResumedInstance(): H? = current.value
final override tailrec suspend fun awaitResumeInstance(): H {
val current = getResumedInstance()
return if (current != null) {
when (current.state) {
State.OK -> current
State.CLOSED -> {
this.current.compareAndSet(current, null) // invalidate the instance and try again.
awaitResumeInstance()
}
else -> {
current.resumeConnection() // try to advance state.
awaitResumeInstance()
}
}
} else {
this.current.compareAndSet(current, createInstance())
awaitResumeInstance()
}
}
}
/**
* Delegates [NetworkHandler] calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance].
*/
internal class SelectorNetworkHandler(
override val context: NetworkHandlerContext,
private val selector: NetworkHandlerSelector<*>
) : NetworkHandler {
private suspend inline fun instance(): NetworkHandler = selector.awaitResumeInstance()
override val state: State get() = selector.getResumedInstance()?.state ?: State.INITIALIZED
override suspend fun resumeConnection() {
instance() // the selector will resume connection for us.
}
override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int) =
instance().sendAndExpect(packet, timeout, attempts)
override suspend fun sendWithoutExpect(packet: OutgoingPacket) = instance().sendWithoutExpect(packet)
override fun close() {
selector.getResumedInstance()?.close()
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler
import kotlinx.atomicfu.atomic
import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
/**
* A proxy to [NetworkHandler] that delegates calls to instance returned by [NetworkHandlerSelector.awaitResumeInstance].
*
* [NetworkHandlerSelector.awaitResumeInstance] is called everytime when an operation in [NetworkHandler] is called.
*
* This is useful to implement a delegation of [NetworkHandler]. The functionality of *selection* is provided by the strategy [selector][NetworkHandlerSelector].
* @see NetworkHandlerSelector
*/
internal class SelectorNetworkHandler(
override val context: NetworkHandlerContext, // impl notes: may consider to move into function member.
private val selector: NetworkHandlerSelector<*>,
) : NetworkHandler {
private suspend inline fun instance(): NetworkHandler = selector.awaitResumeInstance()
override val state: NetworkHandler.State
get() = selector.getResumedInstance()?.state ?: NetworkHandler.State.INITIALIZED
override suspend fun resumeConnection() {
instance() // the selector will resume connection for us.
}
override suspend fun sendAndExpect(packet: OutgoingPacket, timeout: Long, attempts: Int) =
instance().sendAndExpect(packet, timeout, attempts)
override suspend fun sendWithoutExpect(packet: OutgoingPacket) = instance().sendWithoutExpect(packet)
override fun close() {
selector.getResumedInstance()?.close()
}
}
internal class ExceptionInSelectorResumeException(
cause: Throwable
) : RuntimeException(cause)
/**
* A lazy stateful selector of [NetworkHandler]. This is used as a director([selector][SelectorNetworkHandler.selector]) to [SelectorNetworkHandler].
*/
internal interface NetworkHandlerSelector<H : NetworkHandler> {
/**
* Returns an instance immediately without suspension, or `null` if instance not ready.
*
* This function should not throw any exception.
* @see awaitResumeInstance
*/
fun getResumedInstance(): H?
/**
* Returns an alive [NetworkHandler], or suspends the coroutine until the connection has been made again.
*
* This function may throw exceptions, which would be propagated to the original caller of [SelectorNetworkHandler.resumeConnection].
*/
suspend fun awaitResumeInstance(): H
}
/**
* A lazy stateful implementation of [NetworkHandlerSelector].
*
* - Calls [factory.create][NetworkHandlerFactory.create] to create [NetworkHandler]s.
* - Re-initialize [NetworkHandler] instances if the old one is dead.
* - Suspends requests when connection is not available.
*
* No connection is created until first invocation of [getResumedInstance],
* and new connections are created only when calling [getResumedInstance] if the old connection was dead.
*/
// may be replaced with a better name.
internal abstract class AbstractKeepAliveNetworkHandlerSelector<H : NetworkHandler> : NetworkHandlerSelector<H> {
private val current = atomic<H?>(null)
protected abstract fun createInstance(): H
final override fun getResumedInstance(): H? = current.value
// TODO: 2021/4/16 add test for AbstractKeepAliveNetworkHandlerSelector
final override tailrec suspend fun awaitResumeInstance(): H {
val current = getResumedInstance()
return if (current != null) {
when (current.state) {
NetworkHandler.State.OK -> current
NetworkHandler.State.CLOSED -> {
this.current.compareAndSet(current, null) // invalidate the instance and try again.
awaitResumeInstance()
}
else -> {
current.resumeConnection() // try to advance state.
awaitResumeInstance()
}
}
} else {
this.current.compareAndSet(current, createInstance())
awaitResumeInstance()
}
}
}
/**
* [AbstractKeepAliveNetworkHandlerSelector] implementation delegating [createInstance] to [factory]
*/
internal class FactoryKeepAliveNetworkHandlerSelector<H : NetworkHandler>(
private val factory: NetworkHandlerFactory<H>,
private val serverList: ServerList,
private val context: NetworkHandlerContext,
) : AbstractKeepAliveNetworkHandlerSelector<H>() {
override fun createInstance(): H =
factory.create(context, serverList.pollCurrent()?.toSocketAddress() ?: throw NoServerAvailableException())
}
internal class NoServerAvailableException :
NoSuchElementException("No server available. (Failed to connect to any of the servers)")

View File

@ -0,0 +1,87 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network.handler
import java.net.InetSocketAddress
import java.util.*
internal data class ServerAddress(
val host: String,
val port: Int
) {
init {
require(port >= 0) { "port must be positive: '$port'" }
require(host.isNotBlank()) { "host is invalid: '$host'" }
}
fun toSocketAddress(): InetSocketAddress = InetSocketAddress.createUnresolved(host, port)
}
/**
* Queue of servers. Pop each time when trying to connect.
*/
internal class ServerList(
initial: Collection<ServerAddress> = emptyList()
) {
@Volatile
private var preferred: Set<ServerAddress> = DefaultServerList
@Volatile
private var current: Queue<ServerAddress> = ArrayDeque(initial)
@Synchronized
fun setPreferred(list: Collection<ServerAddress>) {
require(list.isNotEmpty()) { "list cannot be empty." }
preferred = list.toSet()
}
init {
refresh()
}
@Synchronized
fun refresh() {
current = preferred.toCollection(ArrayDeque(current.size))
check(current.isNotEmpty()) {
"Internal error: failed to fill server list. No server available."
}
}
/**
* [Poll][Queue.poll] from current address list. Returns `null` if current address list is empty.
*/
@Synchronized
fun pollCurrent(): ServerAddress? {
return current.poll()
}
/**
* [Poll][Queue.poll] from current address list, before which the list is filled with preferred addresses or default list if empty.
*/
@Synchronized
fun pollAny(): ServerAddress {
if (current.isEmpty()) refresh()
return current.remove()
}
companion object {
internal val DefaultServerList: Set<ServerAddress> =
"""msfwifi.3g.qq.com:8080, 14.215.138.110:8080, 113.96.12.224:8080,
|157.255.13.77:14000, 120.232.18.27:443,
|183.3.235.162:14000, 163.177.89.195:443, 183.232.94.44:80,
|203.205.255.224:8080, 203.205.255.221:8080""".trimMargin()
.split(", ", "\n").filterNot(String::isBlank)
.map {
val host = it.substringBefore(':')
val port = it.substringAfter(':').toInt()
ServerAddress(host, port)
}.shuffled().toMutableSet()
}
}

View File

@ -122,6 +122,10 @@ internal abstract class NetworkHandlerSupport(
protected abstract inner class BaseStateImpl(
val correspondingState: NetworkHandler.State,
) : CoroutineScope by CoroutineScope(coroutineContext + SupervisorJob(coroutineContext.job)) {
/**
* May throw any exception that caused the state to fail.
*/
@Throws(Exception::class)
abstract suspend fun resumeConnection()
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2019-2021 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
package net.mamoe.mirai.internal.network
import net.mamoe.mirai.internal.network.handler.ServerAddress
import net.mamoe.mirai.internal.network.handler.ServerList
import kotlin.test.*
internal class ServerListTest {
@Test
fun canInitializeDefaults() {
assertNotEquals(0, ServerList.DefaultServerList.size)
}
@Test
fun `can poll current for initial`() {
assertNotNull(ServerList().pollCurrent())
}
@Test
fun `not empty for initial`() {
assertNotNull(ServerList().pollAny())
}
@Test
fun `poll current will end with null`() {
val instance = ServerList()
repeat(100) {
instance.pollCurrent()
}
assertNull(instance.pollCurrent())
}
@Test
fun `poll any is always not null`() {
val instance = ServerList()
repeat(100) {
instance.pollAny()
}
assertNotNull(instance.pollAny())
}
@Test
fun `preferred cannot be empty`() {
assertFailsWith<IllegalArgumentException> {
ServerList().setPreferred(emptyList())
}
}
@Test
fun `use preferred`() {
val instance = ServerList()
val addr = ServerAddress("test", 1)
instance.setPreferred(listOf(addr))
repeat(100) {
instance.pollAny()
}
assertSame(addr, instance.pollAny())
}
}