This commit is contained in:
tursom 2020-05-16 21:38:39 +08:00
parent a14a4b3e0d
commit b5e4fbf44a
6 changed files with 15 additions and 11 deletions

View File

@ -15,7 +15,7 @@ object AsyncDatagramClient {
private val nioThread = WorkerLoopNioThread( private val nioThread = WorkerLoopNioThread(
"nioClient", "nioClient",
daemon = false, daemon = false,
workLoop = WorkerLoopHandler(AsyncProtocol)::handle workLoop = WorkerLoopHandler(AsyncProtocol)
) )
suspend fun connect(host: String, port: Int): NioDatagram = connect(InetSocketAddress(host, port)) suspend fun connect(host: String, port: Int): NioDatagram = connect(InetSocketAddress(host, port))

View File

@ -5,8 +5,11 @@ import cn.tursom.niothread.NioProtocol
import java.nio.channels.SelectionKey import java.nio.channels.SelectionKey
import java.nio.channels.ServerSocketChannel import java.nio.channels.ServerSocketChannel
class BossLoopHandler(private val protocol: NioProtocol, private val workerThread: NioThread? = null) { class BossLoopHandler(
fun handle(nioThread: NioThread, key: SelectionKey) { private val protocol: NioProtocol,
private val workerThread: NioThread? = null
) : (NioThread, SelectionKey) -> Unit {
override fun invoke(nioThread: NioThread, key: SelectionKey) {
val workerThread: NioThread = workerThread ?: nioThread val workerThread: NioThread = workerThread ?: nioThread
try { try {
when { when {

View File

@ -4,8 +4,8 @@ import cn.tursom.niothread.NioProtocol
import cn.tursom.niothread.NioThread import cn.tursom.niothread.NioThread
import java.nio.channels.SelectionKey import java.nio.channels.SelectionKey
class WorkerLoopHandler(private val protocol: NioProtocol) { class WorkerLoopHandler(private val protocol: NioProtocol) : (NioThread, SelectionKey) -> Unit {
fun handle(nioThread: NioThread, key: SelectionKey) { override fun invoke(nioThread: NioThread, key: SelectionKey) {
try { try {
when { when {
key.isReadable -> { key.isReadable -> {

View File

@ -3,6 +3,6 @@ package cn.tursom.socket
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
class BufferedNioSocket( class BufferedNioSocket(
val socket: AsyncSocket, override val prevChannel: AsyncSocket,
override val pool: MemoryPool override val pool: MemoryPool
) : BufferedAsyncSocket, AsyncSocket by socket ) : BufferedAsyncSocket, AsyncSocket by prevChannel

View File

@ -1,5 +1,6 @@
package cn.tursom.socket package cn.tursom.socket
import cn.tursom.channel.AsyncChannel
import cn.tursom.channel.AsyncProtocol import cn.tursom.channel.AsyncProtocol
import cn.tursom.niothread.WorkerLoopNioThread import cn.tursom.niothread.WorkerLoopNioThread
import cn.tursom.niothread.loophandler.WorkerLoopHandler import cn.tursom.niothread.loophandler.WorkerLoopHandler
@ -19,13 +20,13 @@ object NioClient {
private val nioThread = WorkerLoopNioThread( private val nioThread = WorkerLoopNioThread(
"nioClient", "nioClient",
daemon = false, daemon = false,
workLoop = WorkerLoopHandler(AsyncProtocol)::handle workLoop = WorkerLoopHandler(AsyncProtocol)
) )
suspend fun connect(host: String, port: Int, timeout: Long = 0): NioSocket { suspend fun connect(host: String, port: Int, timeout: Long = 0): NioSocket {
val key: SelectionKey = suspendCoroutine { cont -> val key: SelectionKey = suspendCoroutine { cont ->
val channel = getConnection(host, port) val channel = getConnection(host, port)
val timeoutTask = if (timeout > 0) NioSocket.timer.exec(timeout) { val timeoutTask = if (timeout > 0) AsyncChannel.timer.exec(timeout) {
channel.close() channel.close()
cont.resumeWithException(TimeoutException()) cont.resumeWithException(TimeoutException())
} else { } else {

View File

@ -25,11 +25,11 @@ class NioLoopServer(
} }
) : SocketServer { ) : SocketServer {
private val listenChannel = ServerSocketChannel.open() private val listenChannel = ServerSocketChannel.open()
private val workerNioThread = nioThreadFactory("nio-worker", WorkerLoopHandler(protocol)::handle) private val workerNioThread = nioThreadFactory("nio-worker", WorkerLoopHandler(protocol))
private val bossNioThread = nioThreadFactory("nio-boss", BossLoopHandler( private val bossNioThread = nioThreadFactory("nio-boss", BossLoopHandler(
protocol, protocol,
workerNioThread workerNioThread
)::handle) )::invoke)
private val started = AtomicBoolean(false) private val started = AtomicBoolean(false)
override fun run() { override fun run() {