diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioClient.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioClient.kt index bcb6b5d..240001d 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioClient.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioClient.kt @@ -1,7 +1,10 @@ package cn.tursom.socket +import cn.tursom.core.logE import cn.tursom.socket.niothread.WorkerLoopNioThread import java.net.InetSocketAddress +import java.net.SocketException +import java.nio.channels.SelectableChannel import java.nio.channels.SelectionKey import java.nio.channels.SocketChannel import java.util.concurrent.TimeoutException @@ -9,101 +12,84 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine +@Suppress("MemberVisibilityCanBePrivate") object AsyncNioClient { - private const val TIMEOUT = 1000L - private val protocol = AsyncNioSocket.nioSocketProtocol - @JvmStatic - private val nioThread = WorkerLoopNioThread("nioClient") { nioThread -> - val selector = nioThread.selector - //logE("AsyncNioClient selector select") - if (selector.select(TIMEOUT) != 0) { - //logE("AsyncNioClient selector select successfully") - val keyIter = selector.selectedKeys().iterator() - while (keyIter.hasNext()) { - val key = keyIter.next() - keyIter.remove() - try { - when { - !key.isValid -> { - } - key.isReadable -> { - protocol.handleRead(key, nioThread) - } - key.isWritable -> { - protocol.handleWrite(key, nioThread) - } - key.isConnectable -> { - protocol.handleConnect(key, nioThread) - } - } - } catch (e: Throwable) { - try { - protocol.exceptionCause(key, nioThread, e) - } catch (e1: Throwable) { - e.printStackTrace() - e1.printStackTrace() - } - } - } - } - //logE("AsyncNioClient selector select end") - } + private const val TIMEOUT = 1000L + private val protocol = AsyncNioSocket.nioSocketProtocol + @JvmStatic + private val nioThread = WorkerLoopNioThread("nioClient", isDaemon = true) { nioThread -> + val selector = nioThread.selector + //logE("client keys: ${selector.keys().size}") + //logE("client op read: ${selector.keys().filter { key -> + // key.isValid && key.interestOps() == SelectionKey.OP_READ + //}.size}") + //logE("client op write: ${selector.keys().filter { key -> + // key.isValid && key.interestOps() == SelectionKey.OP_WRITE + //}.size}") + //logE("AsyncNioClient selector select") + if (selector.select(TIMEOUT) != 0) { + //logE("AsyncNioClient selector select successfully") + val keyIter = selector.selectedKeys().iterator() + while (keyIter.hasNext()) { + val key = keyIter.next() + keyIter.remove() + try { + when { + //!key.isValid -> { + //} + //key.isConnectable -> { + // protocol.handleConnect(key, nioThread) + //} + key.isReadable -> { + protocol.handleRead(key, nioThread) + } + key.isWritable -> { + protocol.handleWrite(key, nioThread) + } + } + } catch (e: Throwable) { + try { + protocol.exceptionCause(key, nioThread, e) + } catch (e1: Throwable) { + e.printStackTrace() + e1.printStackTrace() + key.cancel() + key.channel().close() + } + } + } + } + //logE("AsyncNioClient selector select end") + } - @Suppress("DuplicatedCode") - fun connect(host: String, port: Int): AsyncNioSocket { - val selector = nioThread.selector - val channel = SocketChannel.open() - channel.connect(InetSocketAddress(host, port)) - channel.configureBlocking(false) - val f = nioThread.submit { - channel.register(selector, 0) - } - selector.wakeup() - val key: SelectionKey = f.get() - return AsyncNioSocket(key, nioThread) - } + suspend fun connect(host: String, port: Int): AsyncNioSocket { + return connect(host, port, 0) + } - @Suppress("DuplicatedCode") - suspend fun suspendConnect(host: String, port: Int): AsyncNioSocket { - val key: SelectionKey = suspendCoroutine { cont -> - try { - val channel = SocketChannel.open() - channel.connect(InetSocketAddress(host, port)) - channel.configureBlocking(false) - nioThread.submit { - nioThread.register(channel, 0) { key -> - cont.resume(key) - } - } - nioThread.wakeup() - } catch (e: Exception) { - cont.resumeWithException(e) - } - } - return AsyncNioSocket(key, nioThread) - } + suspend fun connect(host: String, port: Int, timeout: Long): AsyncNioSocket { + val key: SelectionKey = suspendCoroutine { cont -> + val channel = getConnection(host, port) + val timeoutTask = if (timeout > 0) AsyncNioSocket.timer.exec(timeout) { + channel.close() + cont.resumeWithException(TimeoutException()) + } else { + null + } + nioThread.register(channel, 0) { key -> + //key.attach(AsyncNioSocket.ConnectContext(cont, timeoutTask)) + timeoutTask?.cancel() + cont.resume(key) + } + } + return AsyncNioSocket(key, nioThread) + } - @Suppress("DuplicatedCode") - suspend fun suspendConnect(host: String, port: Int, timeout: Long): AsyncNioSocket { - if (timeout <= 0) return suspendConnect(host, port) - val key: SelectionKey = suspendCoroutine { cont -> - val channel = SocketChannel.open() - channel.connect(InetSocketAddress(host, port)) - channel.configureBlocking(false) - val timeoutTask = AsyncNioSocket.timer.exec(timeout) { - channel.close() - cont.resumeWithException(TimeoutException()) - } - try { - nioThread.register(channel, 0) { key -> - timeoutTask.cancel() - cont.resume(key) - } - nioThread.wakeup() - } catch (e: Exception) { - cont.resumeWithException(e) - } - } - return AsyncNioSocket(key, nioThread) - } + private fun getConnection(host: String, port: Int): SelectableChannel { + val channel = SocketChannel.open()!! + if (!channel.connect(InetSocketAddress(host, port))) { + throw SocketException("connection failed") + } + channel.configureBlocking(false) + return channel + } } \ No newline at end of file diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt index 12f73b1..d916211 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt @@ -21,7 +21,7 @@ import kotlin.coroutines.suspendCoroutine */ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INioThread) : IAsyncNioSocket { override val channel: SocketChannel = key.channel() as SocketChannel - + override suspend fun read(buffer: ByteBuffer): Int { if (buffer.remaining() == 0) return emptyBufferCode return operate { @@ -31,7 +31,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.read(buffer) } } - + override suspend fun read(buffer: Array): Long { if (buffer.isEmpty()) return emptyBufferLongCode return operate { @@ -39,7 +39,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.read(buffer) } } - + override suspend fun write(buffer: ByteBuffer): Int { if (buffer.remaining() == 0) return emptyBufferCode return operate { @@ -47,7 +47,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override suspend fun write(buffer: Array): Long { if (buffer.isEmpty()) return emptyBufferLongCode return operate { @@ -55,7 +55,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override suspend fun read(buffer: ByteBuffer, timeout: Long): Int { //logE("AsyncNioSocket.read(buffer: ByteBuffer, timeout: Long): $buffer, $timeout") if (timeout <= 0) return read(buffer) @@ -67,7 +67,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.read(buffer) } } - + override suspend fun read(buffer: Array, timeout: Long): Long { if (timeout <= 0) return read(buffer) if (buffer.isEmpty()) return emptyBufferLongCode @@ -76,7 +76,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.read(buffer) } } - + override suspend fun write(buffer: ByteBuffer, timeout: Long): Int { if (timeout <= 0) return write(buffer) if (buffer.remaining() == 0) return emptyBufferCode @@ -85,7 +85,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override suspend fun write(buffer: Array, timeout: Long): Long { if (timeout <= 0) return write(buffer) if (buffer.isEmpty()) return emptyBufferLongCode @@ -94,7 +94,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override fun close() { nioThread.execute { channel.close() @@ -102,16 +102,16 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi } nioThread.wakeup() } - + private inline fun operate(action: () -> T): T { return try { action() } catch (e: Exception) { waitMode() - throw RuntimeException(e) + throw e } } - + private suspend inline fun waitRead(timeout: Long) { suspendCoroutine { key.attach(Context(it, timer.exec(timeout) { @@ -123,7 +123,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi nioThread.wakeup() } } - + private suspend inline fun waitWrite(timeout: Long) { suspendCoroutine { key.attach(Context(it, timer.exec(timeout) { @@ -135,7 +135,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi nioThread.wakeup() } } - + private suspend inline fun waitRead() { suspendCoroutine { //logE("waitRead() attach") @@ -146,7 +146,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi nioThread.wakeup() } } - + private suspend inline fun waitWrite() { suspendCoroutine { key.attach(Context(it)) @@ -154,13 +154,19 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi nioThread.wakeup() } } - + data class Context(val cont: Continuation, val timeoutTask: TimerTask? = null) - + data class ConnectContext(val cont: Continuation, val timeoutTask: TimerTask? = null) + companion object { val nioSocketProtocol = object : INioProtocol { - override fun handleConnect(key: SelectionKey, nioThread: INioThread) {} - + override fun handleConnect(key: SelectionKey, nioThread: INioThread) { + key.interestOps(0) + val context = key.attachment() as ConnectContext? ?: return + context.timeoutTask?.cancel() + context.cont.resume(key) + } + override fun handleRead(key: SelectionKey, nioThread: INioThread) { key.interestOps(0) //logE("read ready") @@ -168,14 +174,14 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi context.timeoutTask?.cancel() context.cont.resume(0) } - + override fun handleWrite(key: SelectionKey, nioThread: INioThread) { key.interestOps(0) val context = key.attachment() as Context? ?: return context.timeoutTask?.cancel() context.cont.resume(0) } - + override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) { key.interestOps(0) val context = key.attachment() as Context? @@ -188,10 +194,10 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi } } } - + //val timer = StaticWheelTimer.timer val timer = WheelTimer.timer - + const val emptyBufferCode = 0 const val emptyBufferLongCode = 0L } diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt index 407683b..74fac0e 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt @@ -1,6 +1,7 @@ package cn.tursom.socket import cn.tursom.core.bytebuffer.AdvanceByteBuffer +import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer import cn.tursom.core.bytebuffer.readNioBuffer import cn.tursom.core.bytebuffer.writeNioBuffer import cn.tursom.core.logE @@ -13,7 +14,7 @@ interface AsyncSocket : Closeable { suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt() suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt() override fun close() - + suspend fun write(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { return if (buffer.bufferCount == 1) { buffer.readNioBuffer { @@ -28,12 +29,15 @@ interface AsyncSocket : Closeable { value } } - + suspend fun read(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { //logE("buffer.bufferCount: ${buffer.bufferCount}") //logE("AsyncSocket.read(buffer: AdvanceByteBuffer, timeout: Long = 0): buffer: $buffer") return if (buffer.bufferCount == 1) { buffer.writeNioBuffer { + //if (buffer is ByteArrayAdvanceByteBuffer) { + // logE(it.toString()) + //} read(it, timeout) } } else { diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt index e015715..c4aa658 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt @@ -32,18 +32,5 @@ class AsyncNioServer( } } } -}, backlog) { - /** - * 次要构造方法,为使用Spring的同学们准备的 - */ - constructor( - port: Int, - backlog: Int = 50, - handler: Handler - ) : this(port, backlog, { handler.handle(this) }) - - interface Handler { - fun handle(socket: AsyncNioSocket) - } -} +}, backlog) diff --git a/socket/socket-async/src/test/kotlin/ProcessorTest.kt b/socket/socket-async/src/test/kotlin/ProcessorTest.kt index af76a01..22c2a12 100644 --- a/socket/socket-async/src/test/kotlin/ProcessorTest.kt +++ b/socket/socket-async/src/test/kotlin/ProcessorTest.kt @@ -1,5 +1,6 @@ import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer -import cn.tursom.core.log +import cn.tursom.core.bytebuffer.HeapNioAdvanceByteBuffer +import cn.tursom.core.logE import cn.tursom.core.pool.DirectMemoryPool import cn.tursom.core.pool.usingAdvanceByteBuffer import cn.tursom.socket.AsyncNioClient @@ -7,23 +8,23 @@ import cn.tursom.socket.server.AsyncNioServer import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import java.lang.Thread.sleep -import java.net.SocketException import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger fun main() { // 服务器端口,可任意指定 - val port = 12345 - + val port = 12346 + // 创建一个直接内存池,每个块是1024字节,共有256个快 - val memoryPool = DirectMemoryPool(1024, 256) + val memoryPool = DirectMemoryPool(1024, 512) // 创建服务器对象 val server = AsyncNioServer(port) { + //log("get new connection") // 这里处理业务逻辑,套接字对象被以 this 的方式传进来 // 从内存池中获取一个内存块 memoryPool.usingAdvanceByteBuffer { // 检查是否获取成功,不成功就创建一个堆缓冲 - val buffer = it ?: ByteArrayAdvanceByteBuffer(1024) + val buffer = it ?: HeapNioAdvanceByteBuffer(1024) try { while (true) { buffer.clear() @@ -38,39 +39,47 @@ fun main() { //log("server send [$writeSize] bytes") } } catch (e: TimeoutException) { + e.printStackTrace() } // 代码块结束后,框架会自动释放连接 } } server.run() - - val connectionCount = 300 - val dataPerConn = 10 + + val connectionCount = 1000 + val dataPerConn = 1 val testData = "testData".toByteArray() - - val remain = AtomicInteger(connectionCount) - + + val remain = AtomicInteger(connectionCount * dataPerConn) + val clientMemoryPool = DirectMemoryPool(1024, connectionCount) - + val start = System.currentTimeMillis() - + repeat(connectionCount) { GlobalScope.launch { val socket = AsyncNioClient.connect("127.0.0.1", port) clientMemoryPool.usingAdvanceByteBuffer { // 检查是否获取成功,不成功就创建一个堆缓冲 - val buffer = it ?: ByteArrayAdvanceByteBuffer(1024) try { + //val buffer = it!! + val buffer = ByteArrayAdvanceByteBuffer(1024) repeat(dataPerConn) { buffer.clear() buffer.put(testData) //log("client sending: [${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}") val writeSize = socket.write(buffer) + if (writeSize == 0) { + logE("write size is zero") + } else if (writeSize < 0) { + return@usingAdvanceByteBuffer + } //log("client write [$writeSize] bytes") //log(buffer.toString()) val readSize = socket.read(buffer) //log(buffer.toString()) //log("client recv: [$readSize:${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}") + remain.decrementAndGet() } } catch (e: Exception) { Exception(e).printStackTrace() @@ -78,15 +87,14 @@ fun main() { socket.close() } } - remain.decrementAndGet() } } - + while (remain.get() != 0) { println(remain.get()) sleep(500) } - + val end = System.currentTimeMillis() println(end - start) server.close() diff --git a/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt b/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt index 6012d54..23fcf36 100644 --- a/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt +++ b/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt @@ -21,12 +21,12 @@ class NioServer( ) : ISocketServer { private val listenChannel = ServerSocketChannel.open() private val threadList = ConcurrentLinkedDeque() - + init { listenChannel.socket().bind(InetSocketAddress(port), backLog) listenChannel.configureBlocking(false) } - + constructor( port: Int, protocol: INioProtocol, @@ -34,28 +34,35 @@ class NioServer( ) : this(port, protocol, backLog, { name, workLoop -> WorkerLoopNioThread(name, workLoop = workLoop, isDaemon = false) }) - + override fun run() { val nioThread = nioThreadGenerator("nio worker", LoopHandler(protocol)::handle) nioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {} threadList.add(nioThread) } - + override fun close() { listenChannel.close() threadList.forEach { it.close() } } - + class LoopHandler(val protocol: INioProtocol) { fun handle(nioThread: INioThread) { //logE("wake up") val selector = nioThread.selector + //logE("server keys: ${selector.keys().size}") + //logE("server op read: ${selector.keys().filter { key -> + // key.isValid && key.interestOps() == SelectionKey.OP_READ + //}.size}") + //logE("server op write: ${selector.keys().filter { key -> + // key.isValid && key.interestOps() == SelectionKey.OP_WRITE + //}.size}") if (selector.isOpen) { if (selector.select(TIMEOUT) != 0) { val keyIter = selector.selectedKeys().iterator() - while (keyIter.hasNext()) run whileBlock@{ + while (keyIter.hasNext()) { val key = keyIter.next() keyIter.remove() //logE("selected key: $key: ${key.attachment()}") @@ -94,7 +101,7 @@ class NioServer( } } } - + companion object { private const val TIMEOUT = 1000L }