0
0
mirror of https://github.com/tursom/TursomServer.git synced 2025-03-07 08:10:08 +08:00
This commit is contained in:
tursom 2020-05-27 03:10:09 +08:00
parent 5b00f97963
commit 58b214bb5f
5 changed files with 94 additions and 5 deletions

View File

@ -5,12 +5,16 @@ import cn.tursom.niothread.NioProtocol
import java.nio.channels.SelectionKey
import java.nio.channels.ServerSocketChannel
class BossLoopHandler(
open class BossLoopHandler(
private val protocol: NioProtocol,
private val workerThread: NioThread? = null
) : (NioThread, SelectionKey) -> Unit {
override fun invoke(nioThread: NioThread, key: SelectionKey) {
val workerThread: NioThread = workerThread ?: nioThread
handle(nioThread, key, workerThread)
}
fun handle(nioThread: NioThread, key: SelectionKey, workerThread: NioThread) {
try {
when {
key.isAcceptable -> {

View File

@ -0,0 +1,21 @@
package cn.tursom.niothread.loophandler
import cn.tursom.core.randomInt
import cn.tursom.niothread.NioThread
import cn.tursom.niothread.NioProtocol
import java.nio.channels.SelectionKey
import java.nio.channels.ServerSocketChannel
class MultithreadingBossLoopHandler(
protocol: NioProtocol,
private val workerThread: List<NioThread> = emptyList()
) : BossLoopHandler(protocol, null) {
override fun invoke(nioThread: NioThread, key: SelectionKey) {
val workerThread: NioThread = if (workerThread.isEmpty()) {
nioThread
} else {
workerThread[randomInt(0, workerThread.size - 1)]
}
handle(nioThread, key, workerThread)
}
}

View File

@ -18,8 +18,9 @@ open class BuffedNioServer(
val memoryPool: MemoryPool,
backlog: Int = 50,
coroutineScope: CoroutineScope = GlobalScope,
autoCloseSocket: Boolean = true,
handler: suspend BufferedAsyncSocket.() -> Unit
) : NioServer(port, backlog, coroutineScope, {
) : NioServer(port, backlog, coroutineScope, autoCloseSocket, {
MarkedMemoryPool(memoryPool).use { marked ->
BufferedNioSocket(this, marked).handler()
}
@ -30,12 +31,14 @@ open class BuffedNioServer(
blockCount: Int = 128,
backlog: Int = 50,
coroutineScope: CoroutineScope = GlobalScope,
autoCloseSocket: Boolean = true,
handler: suspend BufferedAsyncSocket.() -> Unit
) : this(
port,
ExpandableMemoryPool { DirectMemoryPool(blockSize, blockCount) },
backlog,
coroutineScope,
autoCloseSocket,
handler
)

View File

@ -0,0 +1,62 @@
package cn.tursom.socket.server
import cn.tursom.niothread.NioProtocol
import cn.tursom.niothread.NioThread
import cn.tursom.niothread.WorkerLoopNioThread
import cn.tursom.niothread.loophandler.MultithreadingBossLoopHandler
import cn.tursom.niothread.loophandler.WorkerLoopHandler
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.ServerSocketChannel
import java.util.concurrent.atomic.AtomicBoolean
/**
* 工作在单线程上的 Nio 服务器
*/
@Suppress("MemberVisibilityCanBePrivate")
class MultithreadingNioLoopServer(
override val port: Int,
private val protocol: NioProtocol,
val backLog: Int = 50,
val workerThreads: Int = Runtime.getRuntime().availableProcessors(),
nioThreadFactory: (
threadName: String,
workLoop: (thread: NioThread, key: SelectionKey) -> Unit
) -> NioThread = { name, workLoop ->
WorkerLoopNioThread(name, workLoop = workLoop, daemon = false)
}
) : SocketServer {
private val listenChannel = ServerSocketChannel.open()
private val workerNioThread = Array(workerThreads) {
nioThreadFactory("nio-worker-$it", WorkerLoopHandler(protocol))
}.asList()
private val bossNioThread = nioThreadFactory(
"nio-boss", MultithreadingBossLoopHandler(
protocol,
workerNioThread
)
)
private val started = AtomicBoolean(false)
override fun run() {
if (started.compareAndSet(false, true)) {
listenChannel.socket().bind(InetSocketAddress(port), backLog)
listenChannel.configureBlocking(false)
bossNioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {}
}
}
override fun close() {
listenChannel.close()
workerNioThread.forEach(NioThread::close)
bossNioThread.close()
}
protected fun finalize() {
close()
}
companion object {
private const val TIMEOUT = 1000L
}
}

View File

@ -18,6 +18,7 @@ open class NioServer(
override val port: Int,
backlog: Int = 50,
coroutineScope: CoroutineScope = GlobalScope,
var autoCloseSocket: Boolean = true,
private val handler: suspend AsyncSocket.() -> Unit
) : SocketServer by NioLoopServer(port, object : NioProtocol by AsyncProtocol {
override fun handleConnect(key: SelectionKey, nioThread: NioThread) {
@ -25,10 +26,8 @@ open class NioServer(
val socket = NioSocket(key, nioThread)
try {
socket.handler()
} catch (e: Exception) {
Exception(e).printStackTrace()
} finally {
try {
if (autoCloseSocket) try {
socket.close()
} catch (e: Exception) {
}