From 58b214bb5febe6b46551981cb8e867659a1f3ccf Mon Sep 17 00:00:00 2001 From: tursom Date: Wed, 27 May 2020 03:10:09 +0800 Subject: [PATCH] update --- .../niothread/loophandler/BossLoopHandler.kt | 6 +- .../MultithreadingBossLoopHandler.kt | 21 +++++++ .../tursom/socket/server/BuffedNioServer.kt | 5 +- .../server/MultithreadingNioLoopServer.kt | 62 +++++++++++++++++++ .../cn/tursom/socket/server/NioServer.kt | 5 +- 5 files changed, 94 insertions(+), 5 deletions(-) create mode 100644 AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/MultithreadingBossLoopHandler.kt create mode 100644 AsyncSocket/src/main/kotlin/cn/tursom/socket/server/MultithreadingNioLoopServer.kt diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/BossLoopHandler.kt b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/BossLoopHandler.kt index 71b2f15..e62db72 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/BossLoopHandler.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/BossLoopHandler.kt @@ -5,12 +5,16 @@ import cn.tursom.niothread.NioProtocol import java.nio.channels.SelectionKey import java.nio.channels.ServerSocketChannel -class BossLoopHandler( +open class BossLoopHandler( private val protocol: NioProtocol, private val workerThread: NioThread? = null ) : (NioThread, SelectionKey) -> Unit { override fun invoke(nioThread: NioThread, key: SelectionKey) { val workerThread: NioThread = workerThread ?: nioThread + handle(nioThread, key, workerThread) + } + + fun handle(nioThread: NioThread, key: SelectionKey, workerThread: NioThread) { try { when { key.isAcceptable -> { diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/MultithreadingBossLoopHandler.kt b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/MultithreadingBossLoopHandler.kt new file mode 100644 index 0000000..9cfa714 --- /dev/null +++ b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/loophandler/MultithreadingBossLoopHandler.kt @@ -0,0 +1,21 @@ +package cn.tursom.niothread.loophandler + +import cn.tursom.core.randomInt +import cn.tursom.niothread.NioThread +import cn.tursom.niothread.NioProtocol +import java.nio.channels.SelectionKey +import java.nio.channels.ServerSocketChannel + +class MultithreadingBossLoopHandler( + protocol: NioProtocol, + private val workerThread: List = emptyList() +) : BossLoopHandler(protocol, null) { + override fun invoke(nioThread: NioThread, key: SelectionKey) { + val workerThread: NioThread = if (workerThread.isEmpty()) { + nioThread + } else { + workerThread[randomInt(0, workerThread.size - 1)] + } + handle(nioThread, key, workerThread) + } +} \ No newline at end of file diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt index 230743a..834b19f 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt @@ -18,8 +18,9 @@ open class BuffedNioServer( val memoryPool: MemoryPool, backlog: Int = 50, coroutineScope: CoroutineScope = GlobalScope, + autoCloseSocket: Boolean = true, handler: suspend BufferedAsyncSocket.() -> Unit -) : NioServer(port, backlog, coroutineScope, { +) : NioServer(port, backlog, coroutineScope, autoCloseSocket, { MarkedMemoryPool(memoryPool).use { marked -> BufferedNioSocket(this, marked).handler() } @@ -30,12 +31,14 @@ open class BuffedNioServer( blockCount: Int = 128, backlog: Int = 50, coroutineScope: CoroutineScope = GlobalScope, + autoCloseSocket: Boolean = true, handler: suspend BufferedAsyncSocket.() -> Unit ) : this( port, ExpandableMemoryPool { DirectMemoryPool(blockSize, blockCount) }, backlog, coroutineScope, + autoCloseSocket, handler ) diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/MultithreadingNioLoopServer.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/MultithreadingNioLoopServer.kt new file mode 100644 index 0000000..6841dbf --- /dev/null +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/MultithreadingNioLoopServer.kt @@ -0,0 +1,62 @@ +package cn.tursom.socket.server + +import cn.tursom.niothread.NioProtocol +import cn.tursom.niothread.NioThread +import cn.tursom.niothread.WorkerLoopNioThread +import cn.tursom.niothread.loophandler.MultithreadingBossLoopHandler +import cn.tursom.niothread.loophandler.WorkerLoopHandler +import java.net.InetSocketAddress +import java.nio.channels.SelectionKey +import java.nio.channels.ServerSocketChannel +import java.util.concurrent.atomic.AtomicBoolean + +/** + * 工作在单线程上的 Nio 服务器。 + */ +@Suppress("MemberVisibilityCanBePrivate") +class MultithreadingNioLoopServer( + override val port: Int, + private val protocol: NioProtocol, + val backLog: Int = 50, + val workerThreads: Int = Runtime.getRuntime().availableProcessors(), + nioThreadFactory: ( + threadName: String, + workLoop: (thread: NioThread, key: SelectionKey) -> Unit + ) -> NioThread = { name, workLoop -> + WorkerLoopNioThread(name, workLoop = workLoop, daemon = false) + } +) : SocketServer { + private val listenChannel = ServerSocketChannel.open() + private val workerNioThread = Array(workerThreads) { + nioThreadFactory("nio-worker-$it", WorkerLoopHandler(protocol)) + }.asList() + private val bossNioThread = nioThreadFactory( + "nio-boss", MultithreadingBossLoopHandler( + protocol, + workerNioThread + ) + ) + private val started = AtomicBoolean(false) + + override fun run() { + if (started.compareAndSet(false, true)) { + listenChannel.socket().bind(InetSocketAddress(port), backLog) + listenChannel.configureBlocking(false) + bossNioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {} + } + } + + override fun close() { + listenChannel.close() + workerNioThread.forEach(NioThread::close) + bossNioThread.close() + } + + protected fun finalize() { + close() + } + + companion object { + private const val TIMEOUT = 1000L + } +} \ No newline at end of file diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt index a51f7f2..5b25dfa 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt @@ -18,6 +18,7 @@ open class NioServer( override val port: Int, backlog: Int = 50, coroutineScope: CoroutineScope = GlobalScope, + var autoCloseSocket: Boolean = true, private val handler: suspend AsyncSocket.() -> Unit ) : SocketServer by NioLoopServer(port, object : NioProtocol by AsyncProtocol { override fun handleConnect(key: SelectionKey, nioThread: NioThread) { @@ -25,10 +26,8 @@ open class NioServer( val socket = NioSocket(key, nioThread) try { socket.handler() - } catch (e: Exception) { - Exception(e).printStackTrace() } finally { - try { + if (autoCloseSocket) try { socket.close() } catch (e: Exception) { }