From 9765c5ead5f96e815716c213e16ba57805e68332 Mon Sep 17 00:00:00 2001 From: tursom Date: Sun, 3 Nov 2019 23:22:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20AsyncSocket=20=E7=9A=84=20?= =?UTF-8?q?bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kotlin/cn/tursom/socket/AsyncNioSocket.kt | 102 ++- .../kotlin/cn/tursom/socket/AsyncSocket.kt | 70 +- .../cn/tursom/socket/IAsyncNioSocket.kt | 24 +- .../cn/tursom/socket/server/AsyncNioServer.kt | 4 +- .../src/test/kotlin/ProcessorTest.kt | 93 +++ .../socket/server/AsyncNioServerTest.kt | 59 ++ .../kotlin/cn/tursom/socket/BaseSocket.kt | 116 +-- .../kotlin/cn/tursom/socket/SocketClient.kt | 84 +- .../cn/tursom/socket/server/NioServer.kt | 16 +- .../cn/tursom/socket/server/NioServerTest.kt | 66 ++ .../core/bytebuffer/AdvanceByteBuffer.kt | 786 +++++++++--------- .../bytebuffer/ByteArrayAdvanceByteBuffer.kt | 297 +++---- .../bytebuffer/DirectNioAdvanceByteBuffer.kt | 109 +-- .../bytebuffer/HeapNioAdvanceByteBuffer.kt | 323 +++---- .../core/bytebuffer/MultiAdvanceByteBuffer.kt | 197 ++--- .../core/bytebuffer/NioAdvanceByteBuffer.kt | 10 +- .../cn/tursom/core/pool/DirectMemoryPool.kt | 4 + .../cn/tursom/core/pool/HeapMemoryPool.kt | 4 + .../kotlin/cn/tursom/core/pool/MemoryPool.kt | 4 +- .../kotlin/cn/tursom/core/timer/WheelTimer.kt | 198 ++--- .../web/netty/NettyAdvanceByteBuffer.kt | 9 +- 21 files changed, 1443 insertions(+), 1132 deletions(-) create mode 100644 socket/socket-async/src/test/kotlin/ProcessorTest.kt create mode 100644 socket/socket-async/src/test/kotlin/cn/tursom/socket/server/AsyncNioServerTest.kt create mode 100644 socket/src/test/kotlin/cn/tursom/socket/server/NioServerTest.kt diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt index 870880f..12f73b1 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncNioSocket.kt @@ -1,8 +1,10 @@ package cn.tursom.socket -import cn.tursom.socket.niothread.INioThread +import cn.tursom.core.log +import cn.tursom.core.logE import cn.tursom.core.timer.TimerTask import cn.tursom.core.timer.WheelTimer +import cn.tursom.socket.niothread.INioThread import java.nio.ByteBuffer import java.nio.channels.SelectionKey import java.nio.channels.SocketChannel @@ -19,15 +21,17 @@ import kotlin.coroutines.suspendCoroutine */ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INioThread) : IAsyncNioSocket { override val channel: SocketChannel = key.channel() as SocketChannel - + override suspend fun read(buffer: ByteBuffer): Int { if (buffer.remaining() == 0) return emptyBufferCode return operate { + //logE("read(buffer: ByteBuffer) wait read") waitRead() + //logE("read(buffer: ByteBuffer) wait read complete") channel.read(buffer) } } - + override suspend fun read(buffer: Array): Long { if (buffer.isEmpty()) return emptyBufferLongCode return operate { @@ -35,7 +39,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.read(buffer) } } - + override suspend fun write(buffer: ByteBuffer): Int { if (buffer.remaining() == 0) return emptyBufferCode return operate { @@ -43,7 +47,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override suspend fun write(buffer: Array): Long { if (buffer.isEmpty()) return emptyBufferLongCode return operate { @@ -51,16 +55,19 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override suspend fun read(buffer: ByteBuffer, timeout: Long): Int { + //logE("AsyncNioSocket.read(buffer: ByteBuffer, timeout: Long): $buffer, $timeout") if (timeout <= 0) return read(buffer) if (buffer.remaining() == 0) return emptyBufferCode return operate { + //logE("wait read") waitRead(timeout) + //logE("wait read complete") channel.read(buffer) } } - + override suspend fun read(buffer: Array, timeout: Long): Long { if (timeout <= 0) return read(buffer) if (buffer.isEmpty()) return emptyBufferLongCode @@ -69,7 +76,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.read(buffer) } } - + override suspend fun write(buffer: ByteBuffer, timeout: Long): Int { if (timeout <= 0) return write(buffer) if (buffer.remaining() == 0) return emptyBufferCode @@ -78,7 +85,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override suspend fun write(buffer: Array, timeout: Long): Long { if (timeout <= 0) return write(buffer) if (buffer.isEmpty()) return emptyBufferLongCode @@ -87,7 +94,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi channel.write(buffer) } } - + override fun close() { nioThread.execute { channel.close() @@ -95,7 +102,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi } nioThread.wakeup() } - + private inline fun operate(action: () -> T): T { return try { action() @@ -104,37 +111,42 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi throw RuntimeException(e) } } - + private suspend inline fun waitRead(timeout: Long) { suspendCoroutine { key.attach(Context(it, timer.exec(timeout) { key.attach(null) + waitMode() it.resumeWithException(TimeoutException()) - readMode() - nioThread.wakeup() })) - } - } - - private suspend inline fun waitWrite(timeout: Long) { - suspendCoroutine { - key.attach(Context(it, timer.exec(timeout) { - key.attach(null) - it.resumeWithException(TimeoutException()) - writeMode() - nioThread.wakeup() - })) - } - } - - private suspend inline fun waitRead() { - suspendCoroutine { - key.attach(Context(it)) readMode() nioThread.wakeup() } } - + + private suspend inline fun waitWrite(timeout: Long) { + suspendCoroutine { + key.attach(Context(it, timer.exec(timeout) { + key.attach(null) + waitMode() + it.resumeWithException(TimeoutException()) + })) + writeMode() + nioThread.wakeup() + } + } + + private suspend inline fun waitRead() { + suspendCoroutine { + //logE("waitRead() attach") + key.attach(Context(it)) + //logE("waitRead() readMode()") + readMode() + //logE("waitRead() wakeup()") + nioThread.wakeup() + } + } + private suspend inline fun waitWrite() { suspendCoroutine { key.attach(Context(it)) @@ -142,26 +154,28 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi nioThread.wakeup() } } - + data class Context(val cont: Continuation, val timeoutTask: TimerTask? = null) - + companion object { val nioSocketProtocol = object : INioProtocol { override fun handleConnect(key: SelectionKey, nioThread: INioThread) {} - + override fun handleRead(key: SelectionKey, nioThread: INioThread) { + key.interestOps(0) + //logE("read ready") + val context = key.attachment() as Context? ?: return + context.timeoutTask?.cancel() + context.cont.resume(0) + } + + override fun handleWrite(key: SelectionKey, nioThread: INioThread) { key.interestOps(0) val context = key.attachment() as Context? ?: return context.timeoutTask?.cancel() context.cont.resume(0) } - - override fun handleWrite(key: SelectionKey, nioThread: INioThread) { - key.interestOps(0) - val context = key.attachment() as Context? ?: return - context.cont.resume(0) - } - + override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) { key.interestOps(0) val context = key.attachment() as Context? @@ -174,10 +188,10 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi } } } - + //val timer = StaticWheelTimer.timer val timer = WheelTimer.timer - + const val emptyBufferCode = 0 const val emptyBufferLongCode = 0L } diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt index 9ba0b5b..407683b 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt @@ -3,41 +3,45 @@ package cn.tursom.socket import cn.tursom.core.bytebuffer.AdvanceByteBuffer import cn.tursom.core.bytebuffer.readNioBuffer import cn.tursom.core.bytebuffer.writeNioBuffer +import cn.tursom.core.logE import java.io.Closeable import java.nio.ByteBuffer interface AsyncSocket : Closeable { - suspend fun write(buffer: Array, timeout: Long = 0L): Long - suspend fun read(buffer: Array, timeout: Long = 0L): Long - suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer)).toInt() - suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer)).toInt() - override fun close() - - suspend fun write(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { - return if (buffer.bufferCount == 1) { - buffer.readNioBuffer { - write(it, timeout) - } - } else { - val readMode = buffer.readMode - buffer.readMode() - val value = write(buffer.nioBuffers, timeout).toInt() - if (!readMode) buffer.resumeWriteMode() - value - } - } - - suspend fun read(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { - return if (buffer.bufferCount == 1) { - buffer.writeNioBuffer { - read(it, timeout) - } - } else { - val readMode = buffer.readMode - buffer.resumeWriteMode() - val value = read(buffer.nioBuffers, timeout).toInt() - if (readMode) buffer.readMode() - value - } - } + suspend fun write(buffer: Array, timeout: Long = 0L): Long + suspend fun read(buffer: Array, timeout: Long = 0L): Long + suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt() + suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt() + override fun close() + + suspend fun write(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { + return if (buffer.bufferCount == 1) { + buffer.readNioBuffer { + //logE(it.toString()) + write(it, timeout) + } + } else { + val readMode = buffer.readMode + buffer.readMode() + val value = write(buffer.nioBuffers, timeout).toInt() + if (!readMode) buffer.resumeWriteMode() + value + } + } + + suspend fun read(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { + //logE("buffer.bufferCount: ${buffer.bufferCount}") + //logE("AsyncSocket.read(buffer: AdvanceByteBuffer, timeout: Long = 0): buffer: $buffer") + return if (buffer.bufferCount == 1) { + buffer.writeNioBuffer { + read(it, timeout) + } + } else { + val readMode = buffer.readMode + buffer.resumeWriteMode() + val value = read(buffer.nioBuffers, timeout).toInt() + if (readMode) buffer.readMode() + value + } + } } \ No newline at end of file diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/IAsyncNioSocket.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/IAsyncNioSocket.kt index 7345fa4..53c13c4 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/IAsyncNioSocket.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/IAsyncNioSocket.kt @@ -3,6 +3,7 @@ package cn.tursom.socket import cn.tursom.socket.niothread.INioThread import cn.tursom.core.bytebuffer.AdvanceByteBuffer import cn.tursom.core.bytebuffer.writeNioBuffer +import cn.tursom.core.logE import java.net.SocketException import java.nio.ByteBuffer import java.nio.channels.SelectionKey @@ -12,7 +13,7 @@ interface IAsyncNioSocket : AsyncSocket { val channel: SocketChannel val key: SelectionKey val nioThread: INioThread - + fun waitMode() { if (Thread.currentThread() == nioThread.thread) { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) @@ -21,16 +22,21 @@ interface IAsyncNioSocket : AsyncSocket { nioThread.wakeup() } } - + fun readMode() { + //logE("readMode()") if (Thread.currentThread() == nioThread.thread) { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) } else { - nioThread.execute { if (key.isValid) key.interestOps(SelectionKey.OP_READ) } + nioThread.execute { + //logE("readMode() interest") + if (key.isValid) key.interestOps(SelectionKey.OP_READ) + //logE("readMode interestOps ${key.isValid} ${key.interestOps()}") + } nioThread.wakeup() } } - + fun writeMode() { if (Thread.currentThread() == nioThread.thread) { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) @@ -39,7 +45,7 @@ interface IAsyncNioSocket : AsyncSocket { nioThread.wakeup() } } - + suspend fun read(buffer: ByteBuffer): Int = read(arrayOf(buffer)).toInt() suspend fun write(buffer: ByteBuffer): Int = write(arrayOf(buffer)).toInt() suspend fun read(buffer: Array): Long @@ -55,7 +61,7 @@ interface IAsyncNioSocket : AsyncSocket { } return readSize } - + suspend fun recv(buffer: ByteBuffer, timeout: Long): Int { if (buffer.remaining() == 0) return emptyBufferCode val readSize = read(buffer, timeout) @@ -64,7 +70,7 @@ interface IAsyncNioSocket : AsyncSocket { } return readSize } - + suspend fun recv(buffers: Array, timeout: Long): Long { if (buffers.isEmpty()) return emptyBufferLongCode val readSize = read(buffers, timeout) @@ -73,7 +79,7 @@ interface IAsyncNioSocket : AsyncSocket { } return readSize } - + suspend fun recv(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { return if (buffer.bufferCount == 1) { buffer.writeNioBuffer { @@ -87,7 +93,7 @@ interface IAsyncNioSocket : AsyncSocket { value } } - + companion object { const val emptyBufferCode = 0 const val emptyBufferLongCode = 0L diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt index 7631d57..e015715 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/AsyncNioServer.kt @@ -23,7 +23,7 @@ class AsyncNioServer( try { socket.handler() } catch (e: Exception) { - e.printStackTrace() + Exception(e).printStackTrace() } finally { try { socket.close() @@ -41,7 +41,7 @@ class AsyncNioServer( backlog: Int = 50, handler: Handler ) : this(port, backlog, { handler.handle(this) }) - + interface Handler { fun handle(socket: AsyncNioSocket) } diff --git a/socket/socket-async/src/test/kotlin/ProcessorTest.kt b/socket/socket-async/src/test/kotlin/ProcessorTest.kt new file mode 100644 index 0000000..af76a01 --- /dev/null +++ b/socket/socket-async/src/test/kotlin/ProcessorTest.kt @@ -0,0 +1,93 @@ +import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer +import cn.tursom.core.log +import cn.tursom.core.pool.DirectMemoryPool +import cn.tursom.core.pool.usingAdvanceByteBuffer +import cn.tursom.socket.AsyncNioClient +import cn.tursom.socket.server.AsyncNioServer +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import java.lang.Thread.sleep +import java.net.SocketException +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger + +fun main() { + // 服务器端口,可任意指定 + val port = 12345 + + // 创建一个直接内存池,每个块是1024字节,共有256个快 + val memoryPool = DirectMemoryPool(1024, 256) + // 创建服务器对象 + val server = AsyncNioServer(port) { + // 这里处理业务逻辑,套接字对象被以 this 的方式传进来 + // 从内存池中获取一个内存块 + memoryPool.usingAdvanceByteBuffer { + // 检查是否获取成功,不成功就创建一个堆缓冲 + val buffer = it ?: ByteArrayAdvanceByteBuffer(1024) + try { + while (true) { + buffer.clear() + // 从套接字中读数据,五秒之内没有数据就抛出异常 + if (read(buffer, 10_000) < 0) { + return@AsyncNioServer + } + // 输出读取到的数据 + //log("server recv from ${channel.remoteAddress}: [${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}") + // 原封不动的返回数据 + val writeSize = write(buffer) + //log("server send [$writeSize] bytes") + } + } catch (e: TimeoutException) { + } + // 代码块结束后,框架会自动释放连接 + } + } + server.run() + + val connectionCount = 300 + val dataPerConn = 10 + val testData = "testData".toByteArray() + + val remain = AtomicInteger(connectionCount) + + val clientMemoryPool = DirectMemoryPool(1024, connectionCount) + + val start = System.currentTimeMillis() + + repeat(connectionCount) { + GlobalScope.launch { + val socket = AsyncNioClient.connect("127.0.0.1", port) + clientMemoryPool.usingAdvanceByteBuffer { + // 检查是否获取成功,不成功就创建一个堆缓冲 + val buffer = it ?: ByteArrayAdvanceByteBuffer(1024) + try { + repeat(dataPerConn) { + buffer.clear() + buffer.put(testData) + //log("client sending: [${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}") + val writeSize = socket.write(buffer) + //log("client write [$writeSize] bytes") + //log(buffer.toString()) + val readSize = socket.read(buffer) + //log(buffer.toString()) + //log("client recv: [$readSize:${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}") + } + } catch (e: Exception) { + Exception(e).printStackTrace() + } finally { + socket.close() + } + } + remain.decrementAndGet() + } + } + + while (remain.get() != 0) { + println(remain.get()) + sleep(500) + } + + val end = System.currentTimeMillis() + println(end - start) + server.close() +} \ No newline at end of file diff --git a/socket/socket-async/src/test/kotlin/cn/tursom/socket/server/AsyncNioServerTest.kt b/socket/socket-async/src/test/kotlin/cn/tursom/socket/server/AsyncNioServerTest.kt new file mode 100644 index 0000000..f22ce42 --- /dev/null +++ b/socket/socket-async/src/test/kotlin/cn/tursom/socket/server/AsyncNioServerTest.kt @@ -0,0 +1,59 @@ +package cn.tursom.socket.server + +import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer +import cn.tursom.core.log +import cn.tursom.core.logE +import cn.tursom.socket.AsyncAioClient +import cn.tursom.socket.SocketClient +import kotlinx.coroutines.runBlocking +import org.junit.Test + +class AsyncNioServerTest { + private val testMsg = "hello" + private val port = 12345 + private val server = AsyncNioServer(port) { + log("new connection") + val buffer = ByteArrayAdvanceByteBuffer(1024) + while (true) { + buffer.clear() + read(buffer, 5000) + logE("server recv: ${buffer.toString(buffer.readableSize)}") + write(buffer, 5000) + } + } + + init { + server.run() + } + + //@Test + fun testAsyncNioServer() { + runBlocking { + val client = AsyncAioClient.connect("127.0.0.1", port) + log("connect to server") + val buffer = ByteArrayAdvanceByteBuffer(1024) + repeat(10) { + buffer.clear() + buffer.put(testMsg) + client.write(buffer, 5000) + buffer.clear() + client.read(buffer, 5000) + log("server recv: ${buffer.getString()}") + } + } + } + + + //@Test + fun testAsyncNioServerSocket() { + SocketClient("localhost", port) { + val buffer = ByteArray(1024) + repeat(10) { + send(testMsg) + val readSize = inputStream.read(buffer) + val recv = String(buffer, 0, readSize) + log(recv) + } + } + } +} \ No newline at end of file diff --git a/socket/src/main/kotlin/cn/tursom/socket/BaseSocket.kt b/socket/src/main/kotlin/cn/tursom/socket/BaseSocket.kt index 5d6deeb..7c93c5b 100644 --- a/socket/src/main/kotlin/cn/tursom/socket/BaseSocket.kt +++ b/socket/src/main/kotlin/cn/tursom/socket/BaseSocket.kt @@ -12,65 +12,65 @@ open class BaseSocket( val socket: Socket, val timeout: Int = Companion.timeout ) : Closeable { - - val address = socket.inetAddress?.toString()?.drop(1) ?: "0.0.0.0" - val port = socket.port - val localPort = socket.localPort - val inputStream = socket.getInputStream()!! - val outputStream = socket.getOutputStream()!! - - fun send(message: String?) { - send((message ?: return).toByteArray()) + + val address = socket.inetAddress?.toString()?.drop(1) ?: "0.0.0.0" + val port = socket.port + val localPort = socket.localPort + val inputStream by lazy { socket.getInputStream()!! } + val outputStream by lazy { socket.getOutputStream()!! } + + fun send(message: String?) { + send((message ?: return).toByteArray()) + } + + fun send(message: ByteArray?) { + outputStream.write(message ?: return) + } + + fun send(message: Int) { + val buffer = ByteArray(4) + buffer.put(message) + send(buffer) + } + + fun send(message: Long) { + val buffer = ByteArray(8) + buffer.put(message) + send(buffer) + } + + override fun close() { + closeSocket() + } + + protected fun closeSocket() { + if (!socket.isClosed) { + closeInputStream() + closeOutputStream() + socket.close() } - - fun send(message: ByteArray?) { - outputStream.write(message ?: return) + } + + private fun closeInputStream() { + try { + inputStream.close() + } catch (e: Exception) { } - - fun send(message: Int) { - val buffer = ByteArray(4) - buffer.put(message) - send(buffer) - } - - fun send(message: Long) { - val buffer = ByteArray(8) - buffer.put(message) - send(buffer) - } - - override fun close() { - closeSocket() - } - - protected fun closeSocket() { - if (!socket.isClosed) { - closeInputStream() - closeOutputStream() - socket.close() - } - } - - private fun closeInputStream() { - try { - inputStream.close() - } catch (e: Exception) { - } - } - - private fun closeOutputStream() { - try { - outputStream.close() - } catch (e: Exception) { - } - } - - fun isConnected(): Boolean { - return socket.isConnected - } - - companion object Companion { - const val defaultReadSize: Int = 1024 * 8 - const val timeout: Int = 60 * 1000 + } + + private fun closeOutputStream() { + try { + outputStream.close() + } catch (e: Exception) { } + } + + fun isConnected(): Boolean { + return socket.isConnected + } + + companion object Companion { + const val defaultReadSize: Int = 1024 * 8 + const val timeout: Int = 60 * 1000 + } } \ No newline at end of file diff --git a/socket/src/main/kotlin/cn/tursom/socket/SocketClient.kt b/socket/src/main/kotlin/cn/tursom/socket/SocketClient.kt index 797c57f..b748ca6 100644 --- a/socket/src/main/kotlin/cn/tursom/socket/SocketClient.kt +++ b/socket/src/main/kotlin/cn/tursom/socket/SocketClient.kt @@ -5,47 +5,47 @@ import java.net.Socket import java.net.SocketException class SocketClient( - socket: Socket, - timeout: Int = Companion.timeout, - private val ioException: IOException.() -> Unit = { printStackTrace() }, - private val exception: Exception.() -> Unit = { printStackTrace() }, - func: (SocketClient.() -> Unit)? = null + socket: Socket, + timeout: Int = Companion.timeout, + private val ioException: IOException.() -> Unit = { printStackTrace() }, + private val exception: Exception.() -> Unit = { printStackTrace() }, + func: (SocketClient.() -> Unit)? = null ) : BaseSocket(socket, timeout) { - - init { - func?.let { - use(it) - } - } - - constructor( - host: String, - port: Int, - timeout: Int = Companion.timeout, - ioException: IOException.() -> Unit = { printStackTrace() }, - exception: Exception.() -> Unit = { printStackTrace() }, - func: (SocketClient.() -> Unit)? = null - ) : this(Socket(host, port), timeout, ioException, exception, func) - - fun execute(func: SocketClient.() -> Unit) { - try { - func() - } catch (io: IOException) { - io.ioException() - } catch (e: SocketException) { - if (e.message == null) { - e.printStackTrace() - } else { - System.err.println("$address: ${e::class.java}: ${e.message}") - } - } catch (e: Exception) { - e.exception() - } - } - - fun use(func: SocketClient.() -> Unit) { - val ret = execute(func) - closeSocket() - return ret - } + + init { + func?.let { + invoke(it) + } + } + + constructor( + host: String, + port: Int, + timeout: Int = Companion.timeout, + ioException: IOException.() -> Unit = { printStackTrace() }, + exception: Exception.() -> Unit = { printStackTrace() }, + func: (SocketClient.() -> Unit)? = null + ) : this(Socket(host, port), timeout, ioException, exception, func) + + fun execute(func: SocketClient.() -> Unit) { + try { + func() + } catch (io: IOException) { + io.ioException() + } catch (e: SocketException) { + if (e.message == null) { + e.printStackTrace() + } else { + System.err.println("$address: ${e::class.java}: ${e.message}") + } + } catch (e: Exception) { + e.exception() + } + } + + operator fun invoke(func: SocketClient.() -> Unit) { + val ret = execute(func) + closeSocket() + return ret + } } diff --git a/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt b/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt index 5505bdc..6012d54 100644 --- a/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt +++ b/socket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt @@ -1,5 +1,7 @@ package cn.tursom.socket.server +import cn.tursom.core.log +import cn.tursom.core.logE import cn.tursom.socket.INioProtocol import cn.tursom.socket.niothread.INioThread import cn.tursom.socket.niothread.WorkerLoopNioThread @@ -19,12 +21,12 @@ class NioServer( ) : ISocketServer { private val listenChannel = ServerSocketChannel.open() private val threadList = ConcurrentLinkedDeque() - + init { listenChannel.socket().bind(InetSocketAddress(port), backLog) listenChannel.configureBlocking(false) } - + constructor( port: Int, protocol: INioProtocol, @@ -32,22 +34,23 @@ class NioServer( ) : this(port, protocol, backLog, { name, workLoop -> WorkerLoopNioThread(name, workLoop = workLoop, isDaemon = false) }) - + override fun run() { val nioThread = nioThreadGenerator("nio worker", LoopHandler(protocol)::handle) nioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {} threadList.add(nioThread) } - + override fun close() { listenChannel.close() threadList.forEach { it.close() } } - + class LoopHandler(val protocol: INioProtocol) { fun handle(nioThread: INioThread) { + //logE("wake up") val selector = nioThread.selector if (selector.isOpen) { if (selector.select(TIMEOUT) != 0) { @@ -55,6 +58,7 @@ class NioServer( while (keyIter.hasNext()) run whileBlock@{ val key = keyIter.next() keyIter.remove() + //logE("selected key: $key: ${key.attachment()}") try { when { key.isAcceptable -> { @@ -90,7 +94,7 @@ class NioServer( } } } - + companion object { private const val TIMEOUT = 1000L } diff --git a/socket/src/test/kotlin/cn/tursom/socket/server/NioServerTest.kt b/socket/src/test/kotlin/cn/tursom/socket/server/NioServerTest.kt new file mode 100644 index 0000000..8ee99e2 --- /dev/null +++ b/socket/src/test/kotlin/cn/tursom/socket/server/NioServerTest.kt @@ -0,0 +1,66 @@ +package cn.tursom.socket.server + +import cn.tursom.core.bytebuffer.AdvanceByteBuffer +import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer +import cn.tursom.core.bytebuffer.readNioBuffer +import cn.tursom.core.bytebuffer.writeNioBuffer +import cn.tursom.core.pool.DirectMemoryPool +import cn.tursom.core.pool.MemoryPool +import cn.tursom.socket.INioProtocol +import cn.tursom.socket.SocketClient +import cn.tursom.socket.niothread.INioThread +import org.junit.Test +import java.nio.channels.SelectionKey +import java.nio.channels.SocketChannel + +class NioServerTest { + private val port = 12345 + @Test + fun testNioServer() { + val memoryPool: MemoryPool = DirectMemoryPool(1024, 256) + val server = NioServer(port, object : INioProtocol { + override fun handleConnect(key: SelectionKey, nioThread: INioThread) { + val memoryToken = memoryPool.allocate() + key.attach(memoryToken to (memoryPool.getAdvanceByteBuffer(memoryToken) ?: ByteArrayAdvanceByteBuffer(1024))) + key.interestOps(SelectionKey.OP_READ) + } + + override fun handleRead(key: SelectionKey, nioThread: INioThread) { + val channel = key.channel() as SocketChannel + val buffer = (key.attachment() as Pair).second + buffer.writeNioBuffer { + channel.read(it) + } + println("record from client: ${buffer.toString(buffer.readableSize)}") + key.interestOps(SelectionKey.OP_WRITE) + } + + override fun handleWrite(key: SelectionKey, nioThread: INioThread) { + val channel = key.channel() as SocketChannel + val buffer = (key.attachment() as Pair).second + println("send to client: ${buffer.toString(buffer.readableSize)}") + buffer.readNioBuffer { + channel.write(it) + } + buffer.reset() + key.interestOps(SelectionKey.OP_READ) + } + + override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) { + super.exceptionCause(key, nioThread, e) + val memoryToken = (key.attachment() as Pair).first + memoryPool.free(memoryToken) + key.channel().close() + key.cancel() + } + }) + + server.run() + val socket = SocketClient("127.0.0.1", port) + val buffer = ByteArray(1024) + socket.outputStream.write("Hello".toByteArray()) + val readCount = socket.inputStream.read(buffer) + println(buffer.copyOfRange(0, readCount).toString(Charsets.UTF_8)) + server.close() + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/bytebuffer/AdvanceByteBuffer.kt b/src/main/kotlin/cn/tursom/core/bytebuffer/AdvanceByteBuffer.kt index 57b3eca..b5c01d0 100644 --- a/src/main/kotlin/cn/tursom/core/bytebuffer/AdvanceByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/bytebuffer/AdvanceByteBuffer.kt @@ -1,435 +1,445 @@ package cn.tursom.core.bytebuffer import cn.tursom.core.forEachIndex +import cn.tursom.core.logE import sun.reflect.generics.reflectiveObjects.NotImplementedException import java.io.OutputStream import java.nio.ByteBuffer import kotlin.math.min interface AdvanceByteBuffer { - val nioBuffer: ByteBuffer - val nioBuffers: Array get() = arrayOf(nioBuffer) - - /** - * 各种位置变量 - */ - val readOnly: Boolean - val bufferCount: Int get() = 1 - - var writePosition: Int - var limit: Int - val capacity: Int - val hasArray: Boolean - val array: ByteArray - val arrayOffset: Int - var readPosition: Int - val readOffset: Int get() = arrayOffset + readPosition - val readableSize: Int - val available: Int get() = readableSize - val writeOffset: Int get() = arrayOffset + writePosition - val writeableSize: Int get() = limit - writePosition - val size: Int - val readMode: Boolean - - - fun readMode() - fun resumeWriteMode(usedSize: Int = 0) - - fun needReadSize(size: Int) { - if (readableSize < size) throw OutOfBufferException() - } - - fun useReadSize(size: Int): Int { - needReadSize(size) - readPosition += size - return size - } - - fun take(size: Int): Int { - needReadSize(size) - val offset = readOffset - readPosition += size - return offset - } - - fun push(size: Int): Int { - val offset = writeOffset - writePosition += size - return offset - } - - fun readAllSize() = useReadSize(readableSize) - fun takeAll() = take(readableSize) - - fun clear() - - fun reset() { - if (hasArray) { - array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition) - writePosition = readableSize - readPosition = 0 - } else { - readMode() - nioBuffer.compact() - val writePosition = readPosition - resumeWriteMode() - readPosition = 0 - this.writePosition = writePosition - } - } - - fun reset(outputStream: OutputStream) { - if (hasArray) { - outputStream.write(array, readOffset, arrayOffset + writePosition) - } else { - outputStream.write(getBytes()) - } - writePosition = 0 - readPosition = 0 - } - - fun requireAvailableSize(size: Int) { - if (limit - readPosition < size) reset() - } - - - /* - * 数据获取方法 - */ - - - fun get(): Byte = if (readMode) { - nioBuffer.get() + val nioBuffer: ByteBuffer + val nioBuffers: Array get() = arrayOf(nioBuffer) + + /** + * 各种位置变量 + */ + val readOnly: Boolean + val bufferCount: Int get() = 1 + + var writePosition: Int + var limit: Int + val capacity: Int + val hasArray: Boolean + val array: ByteArray + val arrayOffset: Int + var readPosition: Int + val readOffset: Int get() = arrayOffset + readPosition + val readableSize: Int + val available: Int get() = readableSize + val writeOffset: Int get() = arrayOffset + writePosition + val writeableSize: Int get() = limit - writePosition + val size: Int + val readMode: Boolean + + + fun readMode() + fun resumeWriteMode(usedSize: Int = 0) + + fun needReadSize(size: Int) { + if (readableSize < size) throw OutOfBufferException() + } + + fun useReadSize(size: Int): Int { + needReadSize(size) + readPosition += size + return size + } + + fun take(size: Int): Int { + needReadSize(size) + val offset = readOffset + readPosition += size + return offset + } + + fun push(size: Int): Int { + val offset = writeOffset + writePosition += size + return offset + } + + fun readAllSize() = useReadSize(readableSize) + fun takeAll() = take(readableSize) + + fun clear() + + fun reset() { + if (hasArray) { + array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition) + writePosition = readableSize + readPosition = 0 } else { - readMode() - val value = nioBuffer.get() - resumeWriteMode() - value + readMode() + nioBuffer.compact() + val writePosition = readPosition + resumeWriteMode() + readPosition = 0 + this.writePosition = writePosition } - - fun getChar(): Char = if (readMode) { - nioBuffer.char + } + + fun reset(outputStream: OutputStream) { + if (hasArray) { + outputStream.write(array, readOffset, arrayOffset + writePosition) } else { - readMode() - val value = nioBuffer.char - resumeWriteMode() - value + outputStream.write(getBytes()) } - - fun getShort(): Short = if (readMode) { - nioBuffer.short + writePosition = 0 + readPosition = 0 + } + + fun requireAvailableSize(size: Int) { + if (limit - readPosition < size) reset() + } + + + /* + * 数据获取方法 + */ + + + fun get(): Byte = if (readMode) { + nioBuffer.get() + } else { + readMode() + val value = nioBuffer.get() + resumeWriteMode() + value + } + + fun getChar(): Char = if (readMode) { + nioBuffer.char + } else { + readMode() + val value = nioBuffer.char + resumeWriteMode() + value + } + + fun getShort(): Short = if (readMode) { + nioBuffer.short + } else { + readMode() + val value = nioBuffer.short + resumeWriteMode() + value + } + + fun getInt(): Int = if (readMode) { + nioBuffer.int + } else { + readMode() + val value = nioBuffer.int + resumeWriteMode() + value + } + + fun getLong(): Long = if (readMode) { + nioBuffer.long + } else { + readMode() + val value = nioBuffer.long + resumeWriteMode() + value + } + + fun getFloat(): Float = if (readMode) { + nioBuffer.float + } else { + readMode() + val value = nioBuffer.float + resumeWriteMode() + value + } + + fun getDouble(): Double = if (readMode) { + nioBuffer.double + } else { + readMode() + val value = nioBuffer.double + resumeWriteMode() + value + } + + + fun getBytes(size: Int = readableSize): ByteArray = if (readMode) { + val bytes = ByteArray(size) + nioBuffer.get(bytes) + readPosition = writePosition + bytes + } else { + readMode() + val bytes = ByteArray(size) + nioBuffer.get(bytes) + readPosition = writePosition + resumeWriteMode() + bytes + } + + + fun getString(size: Int = readableSize): String = String(getBytes()) + fun toString(size: Int): String { + //logE("AdvanceByteBuffer.toString(size: Int): $this") + //val rp = readPosition + val bytes = getBytes(size) + //readPosition = rp + //logE("AdvanceByteBuffer.toString(size: Int): $this") + return String(bytes) + } + + fun writeTo(buffer: ByteArray, bufferOffset: Int = 0, size: Int = min(readableSize, buffer.size)): Int { + val readSize = min(readableSize, size) + if (hasArray) { + array.copyInto(buffer, bufferOffset, readOffset, readOffset + readSize) + readPosition += readOffset + reset() } else { - readMode() - val value = nioBuffer.short - resumeWriteMode() - value + readBuffer { + it.put(buffer, bufferOffset, readSize) + } } - - fun getInt(): Int = if (readMode) { - nioBuffer.int + return readSize + } + + fun writeTo(os: OutputStream): Int { + val size = readableSize + if (hasArray) { + os.write(array, arrayOffset + readPosition, size) + readPosition += size + reset() } else { - readMode() - val value = nioBuffer.int - resumeWriteMode() - value + val buffer = ByteArray(1024) + readBuffer { + while (it.remaining() > 0) { + it.put(buffer) + os.write(buffer) + } + } } - - fun getLong(): Long = if (readMode) { - nioBuffer.long + return size + } + + fun writeTo(buffer: AdvanceByteBuffer): Int { + val size = min(readableSize, buffer.writeableSize) + if (hasArray && buffer.hasArray) { + array.copyInto(buffer.array, buffer.writeOffset, readOffset, readOffset + size) + buffer.writePosition += size + readPosition += size + reset() } else { - readMode() - val value = nioBuffer.long - resumeWriteMode() - value + readBuffer { + buffer.nioBuffer.put(it) + } } - - fun getFloat(): Float = if (readMode) { - nioBuffer.float + return size + } + + fun toByteArray() = getBytes() + + + /* + * 数据写入方法 + */ + + fun put(byte: Byte) { + if (readMode) { + resumeWriteMode() + nioBuffer.put(byte) + readMode() } else { - readMode() - val value = nioBuffer.float - resumeWriteMode() - value + nioBuffer.put(byte) } - - fun getDouble(): Double = if (readMode) { - nioBuffer.double + } + + fun put(char: Char) { + if (readMode) { + resumeWriteMode() + nioBuffer.putChar(char) + readMode() } else { - readMode() - val value = nioBuffer.double - resumeWriteMode() - value + nioBuffer.putChar(char) } - - - fun getBytes(): ByteArray = if (readMode) { - val bytes = ByteArray(readableSize) - nioBuffer.get(bytes) - readPosition = writePosition - bytes + } + + fun put(short: Short) { + if (readMode) { + resumeWriteMode() + nioBuffer.putShort(short) + readMode() } else { - readMode() - val bytes = ByteArray(readableSize) - nioBuffer.get(bytes) - readPosition = writePosition - resumeWriteMode() - bytes + nioBuffer.putShort(short) } - - - fun getString(size: Int = readableSize): String = String(getBytes()) - fun toString(size: Int): String { - val rp = readPosition - val bytes = getBytes() - readPosition = rp - return String(bytes) + } + + fun put(int: Int) { + if (readMode) { + resumeWriteMode() + nioBuffer.putInt(int) + readMode() + } else { + nioBuffer.putInt(int) } - - fun writeTo(buffer: ByteArray, bufferOffset: Int = 0, size: Int = min(readableSize, buffer.size)): Int { - val readSize = min(readableSize, size) - if (hasArray) { - array.copyInto(buffer, bufferOffset, readOffset, readOffset + readSize) - readPosition += readOffset - reset() - } else { - readBuffer { - it.put(buffer, bufferOffset, readSize) - } - } - return readSize + } + + fun put(long: Long) { + if (readMode) { + resumeWriteMode() + nioBuffer.putLong(long) + readMode() + } else { + nioBuffer.putLong(long) } - - fun writeTo(os: OutputStream): Int { - val size = readableSize - if (hasArray) { - os.write(array, arrayOffset + readPosition, size) - readPosition += size - reset() - } else { - val buffer = ByteArray(1024) - readBuffer { - while (it.remaining() > 0) { - it.put(buffer) - os.write(buffer) - } - } - } - return size + } + + fun put(float: Float) { + if (readMode) { + resumeWriteMode() + nioBuffer.putFloat(float) + readMode() + } else { + nioBuffer.putFloat(float) } - - fun writeTo(buffer: AdvanceByteBuffer): Int { - val size = min(readableSize, buffer.writeableSize) - if (hasArray && buffer.hasArray) { - array.copyInto(buffer.array, buffer.writeOffset, readOffset, readOffset + size) - buffer.writePosition += size - readPosition += size - reset() - } else { - readBuffer { - buffer.nioBuffer.put(it) - } - } - return size + } + + fun put(double: Double) { + if (readMode) { + resumeWriteMode() + nioBuffer.putDouble(double) + readMode() + } else { + nioBuffer.putDouble(double) } - - fun toByteArray() = getBytes() - - - /* - * 数据写入方法 - */ - - fun put(byte: Byte) { - if (readMode) { - resumeWriteMode() - nioBuffer.put(byte) - readMode() - } else { - nioBuffer.put(byte) - } + } + + fun put(str: String) { + put(str.toByteArray()) + } + + fun put(byteArray: ByteArray, startIndex: Int = 0, endIndex: Int = byteArray.size - startIndex) { + if (readMode) { + resumeWriteMode() + nioBuffer.put(byteArray, startIndex, endIndex - startIndex) + readMode() + } else { + nioBuffer.put(byteArray, startIndex, endIndex - startIndex) } - - fun put(char: Char) { - if (readMode) { - resumeWriteMode() - nioBuffer.putChar(char) - readMode() - } else { - nioBuffer.putChar(char) - } + } + + fun put(array: CharArray, index: Int = 0, size: Int = array.size - index) { + if (readMode) { + resumeWriteMode() + array.forEachIndex(index, index + size - 1, this::put) + readMode() + } else { + array.forEachIndex(index, index + size - 1, this::put) } - - fun put(short: Short) { - if (readMode) { - resumeWriteMode() - nioBuffer.putShort(short) - readMode() - } else { - nioBuffer.putShort(short) - } + } + + fun put(array: ShortArray, index: Int = 0, size: Int = array.size - index) { + if (readMode) { + resumeWriteMode() + array.forEachIndex(index, index + size - 1, this::put) + readMode() + } else { + array.forEachIndex(index, index + size - 1, this::put) } - - fun put(int: Int) { - if (readMode) { - resumeWriteMode() - nioBuffer.putInt(int) - readMode() - } else { - nioBuffer.putInt(int) - } + } + + fun put(array: IntArray, index: Int = 0, size: Int = array.size - index) { + if (readMode) { + resumeWriteMode() + array.forEachIndex(index, index + size - 1, this::put) + readMode() + } else { + array.forEachIndex(index, index + size - 1, this::put) } - - fun put(long: Long) { - if (readMode) { - resumeWriteMode() - nioBuffer.putLong(long) - readMode() - } else { - nioBuffer.putLong(long) - } + } + + fun put(array: LongArray, index: Int = 0, size: Int = array.size - index) { + if (readMode) { + resumeWriteMode() + array.forEachIndex(index, index + size - 1, this::put) + readMode() + } else { + array.forEachIndex(index, index + size - 1, this::put) } - - fun put(float: Float) { - if (readMode) { - resumeWriteMode() - nioBuffer.putFloat(float) - readMode() - } else { - nioBuffer.putFloat(float) - } + } + + fun put(array: FloatArray, index: Int = 0, size: Int = array.size - index) { + if (readMode) { + resumeWriteMode() + array.forEachIndex(index, index + size - 1, this::put) + readMode() + } else { + array.forEachIndex(index, index + size - 1, this::put) } - - fun put(double: Double) { - if (readMode) { - resumeWriteMode() - nioBuffer.putDouble(double) - readMode() - } else { - nioBuffer.putDouble(double) - } + } + + fun put(array: DoubleArray, index: Int = 0, size: Int = array.size - index) { + if (readMode) { + resumeWriteMode() + array.forEachIndex(index, index + size - 1, this::put) + readMode() + } else { + array.forEachIndex(index, index + size - 1, this::put) } - - fun put(str: String) { - if (readMode) { - resumeWriteMode() - nioBuffer.put(str.toByteArray()) - readMode() - } else { - nioBuffer.put(str.toByteArray()) - } - } - - fun put(byteArray: ByteArray, startIndex: Int = 0, endIndex: Int = byteArray.size) { - if (readMode) { - resumeWriteMode() - nioBuffer.put(byteArray, startIndex, endIndex - startIndex) - readMode() - } else { - nioBuffer.put(byteArray, startIndex, endIndex - startIndex) - } - } - - fun put(array: CharArray, index: Int = 0, size: Int = array.size - index) { - if (readMode) { - resumeWriteMode() - array.forEachIndex(index, index + size - 1, this::put) - readMode() - } else { - array.forEachIndex(index, index + size - 1, this::put) - } - } - - fun put(array: ShortArray, index: Int = 0, size: Int = array.size - index) { - if (readMode) { - resumeWriteMode() - array.forEachIndex(index, index + size - 1, this::put) - readMode() - } else { - array.forEachIndex(index, index + size - 1, this::put) - } - } - - fun put(array: IntArray, index: Int = 0, size: Int = array.size - index) { - if (readMode) { - resumeWriteMode() - array.forEachIndex(index, index + size - 1, this::put) - readMode() - } else { - array.forEachIndex(index, index + size - 1, this::put) - } - } - - fun put(array: LongArray, index: Int = 0, size: Int = array.size - index) { - if (readMode) { - resumeWriteMode() - array.forEachIndex(index, index + size - 1, this::put) - readMode() - } else { - array.forEachIndex(index, index + size - 1, this::put) - } - } - - fun put(array: FloatArray, index: Int = 0, size: Int = array.size - index) { - if (readMode) { - resumeWriteMode() - array.forEachIndex(index, index + size - 1, this::put) - readMode() - } else { - array.forEachIndex(index, index + size - 1, this::put) - } - } - - fun put(array: DoubleArray, index: Int = 0, size: Int = array.size - index) { - if (readMode) { - resumeWriteMode() - array.forEachIndex(index, index + size - 1, this::put) - readMode() - } else { - array.forEachIndex(index, index + size - 1, this::put) - } - } - - fun peekString(size: Int = readableSize): String { - val readP = readPosition - val str = getString(size) - readPosition = readP - return str - } - - fun readBuffer(action: (nioBuffer: ByteBuffer) -> T): T = readNioBuffer(action) - fun writeBuffer(action: (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer(action) - - suspend fun readSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = readNioBuffer { action(it) } - suspend fun writeSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer { action(it) } - - fun split(from: Int = readPosition, to: Int = writePosition): AdvanceByteBuffer { - return if (hasArray) { - ByteArrayAdvanceByteBuffer(array, arrayOffset + readPosition, to - from) - } else { - throw NotImplementedException() - } + } + + fun peekString(size: Int = readableSize): String { + val readP = readPosition + val str = getString(size) + readPosition = readP + return str + } + + fun readBuffer(action: (nioBuffer: ByteBuffer) -> T): T = readNioBuffer(action) + fun writeBuffer(action: (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer(action) + + suspend fun readSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = readNioBuffer { action(it) } + suspend fun writeSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer { action(it) } + + fun split(from: Int = readPosition, to: Int = writePosition): AdvanceByteBuffer { + return if (hasArray) { + ByteArrayAdvanceByteBuffer(array, arrayOffset + readPosition, to - from) + } else { + throw NotImplementedException() } + } + + override fun toString(): String } inline fun AdvanceByteBuffer.readNioBuffer(action: (nioBuffer: ByteBuffer) -> T): T { - readMode() - val buffer = nioBuffer - val position = nioBuffer.position() - return try { - action(buffer) - } finally { - resumeWriteMode(buffer.position() - position) + val readMode = this.readMode + readMode() + val buffer = nioBuffer + val position = readPosition + val bufferPosition = nioBuffer.position() + return try { + //logE(buffer.toString()) + action(buffer) + } finally { + if (!readMode) { + resumeWriteMode(buffer.position() - position) + readPosition = position + (buffer.position() - bufferPosition) } + } } inline fun AdvanceByteBuffer.writeNioBuffer(action: (nioBuffer: ByteBuffer) -> T): T { - val buffer = nioBuffer - val position = writePosition - val bufferPosition = nioBuffer.position() - return try { - action(buffer) - } finally { - writePosition = position + (buffer.position() - bufferPosition) + val readMode = readMode + resumeWriteMode() + val buffer = nioBuffer + val position = writePosition + val bufferPosition = nioBuffer.position() + return try { + action(buffer) + } finally { + if (readMode) { + writePosition = position + (buffer.position() - bufferPosition) + readMode() } + } } diff --git a/src/main/kotlin/cn/tursom/core/bytebuffer/ByteArrayAdvanceByteBuffer.kt b/src/main/kotlin/cn/tursom/core/bytebuffer/ByteArrayAdvanceByteBuffer.kt index e33c1bd..9a39462 100644 --- a/src/main/kotlin/cn/tursom/core/bytebuffer/ByteArrayAdvanceByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/bytebuffer/ByteArrayAdvanceByteBuffer.kt @@ -5,150 +5,157 @@ import java.io.OutputStream import java.nio.ByteBuffer class ByteArrayAdvanceByteBuffer( - override val array: ByteArray, - val offset: Int = 0, - override val size: Int = array.size - offset, - override var readPosition: Int = 0, - override var writePosition: Int = size + override val array: ByteArray, + val offset: Int = 0, + override val size: Int = array.size - offset, + override var readPosition: Int = 0, + override var writePosition: Int = size ) : AdvanceByteBuffer { - constructor(size: Int) : this(ByteArray(size), 0, size, 0, 0) - - override val hasArray: Boolean get() = true - override var readOnly: Boolean = false - override val nioBuffer: ByteBuffer - get() = if (readMode) readByteBuffer - else writeByteBuffer - override var limit: Int = size - override val capacity: Int get() = size - override val arrayOffset: Int get() = offset - override val available: Int - get() = readableSize - override val writeableSize: Int get() = limit - writePosition - override var readMode: Boolean = false - - override fun readMode() { - readMode = true - } - - override fun resumeWriteMode(usedSize: Int) { - readPosition += usedSize - readMode = false - } - - override fun writeTo(os: OutputStream): Int { - val size = readableSize - os.write(array, readOffset, size) - reset() - return size - } - - - override val readOffset get() = offset + readPosition - override val writeOffset get() = offset + writePosition - - val readByteBuffer get() = HeapByteBuffer.wrap(array, offset + readPosition, writePosition - readPosition) - val writeByteBuffer get() = HeapByteBuffer.wrap(array, offset + writePosition, limit - writePosition) - - override val readableSize get() = writePosition - readPosition - - val position get() = "ArrayByteBuffer(size=$size, writePosition=$writePosition, readPosition=$readPosition)" - - /* - * 位置控制方法 - */ - - override fun clear() { - writePosition = 0 - readPosition = 0 - } - - override fun reset() { - array.copyInto(array, offset, readOffset, offset + writePosition) - writePosition = readableSize - readPosition = 0 - } - - override fun reset(outputStream: OutputStream) { - outputStream.write(array, readOffset, offset + writePosition) - writePosition = 0 - readPosition = 0 - } - - override fun needReadSize(size: Int) { - if (readableSize < size) throw OutOfBufferException() - } - - override fun take(size: Int): Int { - needReadSize(size) - val offset = readOffset - readPosition += size - return offset - } - - override fun useReadSize(size: Int): Int { - needReadSize(size) - readPosition += size - return size - } - - override fun push(size: Int): Int { - val offset = writeOffset - writePosition += size - return offset - } - - override fun readAllSize() = useReadSize(readableSize) - override fun takeAll() = take(readableSize) - - - /* - * 数据获取方法 - */ - - override fun get() = array[take(1)] - override fun getChar() = array.toChar(take(2)) - override fun getShort() = array.toShort(take(2)) - override fun getInt() = array.toInt(take(4)) - override fun getLong() = array.toLong(take(8)) - override fun getFloat() = array.toFloat(take(4)) - override fun getDouble() = array.toDouble(take(8)) - override fun getBytes() = array.copyOfRange(readPosition, readAllSize()) - override fun getString(size: Int) = String(array, readPosition, useReadSize(size)) - - override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int { - array.copyInto(buffer, bufferOffset, offset, useReadSize(size)) - return size - } - - override fun toByteArray() = getBytes() - - - /* - * 数据写入方法 - */ - - override fun put(byte: Byte) { - array.put(byte, push(1)) - } - - override fun put(char: Char) = array.put(char, push(2)) - override fun put(short: Short) = array.put(short, push(2)) - override fun put(int: Int) = array.put(int, push(4)) - override fun put(long: Long) = array.put(long, push(8)) - override fun put(float: Float) = array.put(float, push(4)) - override fun put(double: Double) = array.put(double, push(8)) - override fun put(str: String) = put(str.toByteArray()) - override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { - byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex) - } - - override fun toString(): String { - //return String(array, readOffset, readableSize) - return "ByteArrayAdvanceByteBuffer(size=$size, readPosition=$readPosition, writePosition=$writePosition)" - } - - /** - * 缓冲区用完异常 - */ - class OutOfBufferException : Exception() + constructor(size: Int) : this(ByteArray(size), 0, size, 0, 0) + + override val hasArray: Boolean get() = true + override var readOnly: Boolean = false + override val nioBuffer: ByteBuffer + get() = if (readMode) readByteBuffer + else writeByteBuffer + override var limit: Int = size + override val capacity: Int get() = size + override val arrayOffset: Int get() = offset + override val available: Int + get() = readableSize + override val writeableSize: Int get() = limit - writePosition + override var readMode: Boolean = false + + override fun readMode() { + readMode = true + } + + override fun resumeWriteMode(usedSize: Int) { + readPosition += usedSize + readMode = false + } + + override fun writeTo(os: OutputStream): Int { + val size = readableSize + os.write(array, readOffset, size) + reset() + return size + } + + + override val readOffset get() = offset + readPosition + override val writeOffset get() = offset + writePosition + + val readByteBuffer get() = HeapByteBuffer.wrap(array, offset + readPosition, writePosition - readPosition) + val writeByteBuffer get() = HeapByteBuffer.wrap(array, offset + writePosition, limit - writePosition) + + override val readableSize get() = writePosition - readPosition + + val position get() = "ArrayByteBuffer(size=$size, writePosition=$writePosition, readPosition=$readPosition)" + + /* + * 位置控制方法 + */ + + override fun clear() { + writePosition = 0 + readPosition = 0 + } + + override fun reset() { + array.copyInto(array, offset, readOffset, offset + writePosition) + writePosition = readableSize + readPosition = 0 + } + + override fun reset(outputStream: OutputStream) { + outputStream.write(array, readOffset, offset + writePosition) + writePosition = 0 + readPosition = 0 + } + + override fun needReadSize(size: Int) { + if (readableSize < size) throw OutOfBufferException() + } + + override fun take(size: Int): Int { + needReadSize(size) + val offset = readOffset + readPosition += size + return offset + } + + override fun useReadSize(size: Int): Int { + needReadSize(size) + readPosition += size + return size + } + + override fun push(size: Int): Int { + val offset = writeOffset + writePosition += size + return offset + } + + override fun readAllSize() = useReadSize(readableSize) + override fun takeAll() = take(readableSize) + + + /* + * 数据获取方法 + */ + + override fun get() = array[take(1)] + override fun getChar() = array.toChar(take(2)) + override fun getShort() = array.toShort(take(2)) + override fun getInt() = array.toInt(take(4)) + override fun getLong() = array.toLong(take(8)) + override fun getFloat() = array.toFloat(take(4)) + override fun getDouble() = array.toDouble(take(8)) + override fun getBytes(size: Int): ByteArray { + val readMode = readMode + readMode() + val array = array.copyOfRange(readPosition, useReadSize(size)) + if (!readMode) resumeWriteMode(size) + return array + } + + override fun getString(size: Int) = String(array, readPosition, useReadSize(size)) + + override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int { + array.copyInto(buffer, bufferOffset, offset, useReadSize(size)) + return size + } + + override fun toByteArray() = getBytes() + + + /* + * 数据写入方法 + */ + + override fun put(byte: Byte) { + array.put(byte, push(1)) + } + + override fun put(char: Char) = array.put(char, push(2)) + override fun put(short: Short) = array.put(short, push(2)) + override fun put(int: Int) = array.put(int, push(4)) + override fun put(long: Long) = array.put(long, push(8)) + override fun put(float: Float) = array.put(float, push(4)) + override fun put(double: Double) = array.put(double, push(8)) + override fun put(str: String) = put(str.toByteArray()) + override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { + byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex) + } + + override fun toString(): String { + //return String(array, readOffset, readableSize) + return "ByteArrayAdvanceByteBuffer(size=$size, readPosition=$readPosition, writePosition=$writePosition)" + } + + /** + * 缓冲区用完异常 + */ + class OutOfBufferException : Exception() } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/bytebuffer/DirectNioAdvanceByteBuffer.kt b/src/main/kotlin/cn/tursom/core/bytebuffer/DirectNioAdvanceByteBuffer.kt index 02f3949..cd4cb66 100644 --- a/src/main/kotlin/cn/tursom/core/bytebuffer/DirectNioAdvanceByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/bytebuffer/DirectNioAdvanceByteBuffer.kt @@ -1,53 +1,68 @@ package cn.tursom.core.bytebuffer +import cn.tursom.core.logE import java.nio.ByteBuffer class DirectNioAdvanceByteBuffer(val buffer: ByteBuffer) : AdvanceByteBuffer { - override val nioBuffer: ByteBuffer get() = buffer - override val readOnly: Boolean get() = buffer.isReadOnly - override var writePosition: Int = buffer.position() - get() = field - set(value) { - if (!readMode) buffer.position(value) - field = value - } - override var limit: Int = buffer.limit() - get() = if (!readMode) buffer.limit() else field - set(value) { - if (!readMode) buffer.limit(value) - field = value - } - override val capacity: Int get() = buffer.capacity() - - override val hasArray: Boolean get() = false - override val array: ByteArray get() = buffer.array() - override val arrayOffset: Int = 0 - override var readPosition: Int = 0 - get() = if (readMode) buffer.position() else field - set(value) { - if (readMode) buffer.position(value) - field = value - } - override val readableSize: Int get() = if (readMode) buffer.remaining() else writePosition - readPosition - override val size: Int get() = buffer.capacity() - override var readMode: Boolean = false - - override fun readMode() { - if (!readMode) { - readMode = true - buffer.flip() - } - } - - override fun resumeWriteMode(usedSize: Int) { - if (readMode) { - readMode = false - buffer.limit(capacity) - buffer.position(writePosition) - } - } - - override fun clear() { - buffer.clear() - } + override val nioBuffer: ByteBuffer get() = buffer + override val readOnly: Boolean get() = buffer.isReadOnly + var writeMark = 0 + override var writePosition: Int + get() { + return if (readMode) writeMark + else buffer.position() + } + set(value) { + if (!readMode) buffer.position(value) + else buffer.limit(value) + } + override var limit: Int = buffer.limit() + get() = if (!readMode) buffer.limit() else field + set(value) { + if (!readMode) buffer.limit(value) + field = value + } + override val capacity: Int get() = buffer.capacity() + + override val hasArray: Boolean get() = false + override val array: ByteArray get() = buffer.array() + override val arrayOffset: Int = 0 + override var readPosition: Int = 0 + get() = if (readMode) buffer.position() else field + set(value) { + if (readMode) buffer.position(value) + field = value + } + override val readableSize: Int get() = if (readMode) buffer.remaining() else writePosition - readPosition + override val size: Int get() = buffer.capacity() + override var readMode: Boolean = false + + override fun readMode() { + if (!readMode) { + writeMark = writePosition + //logE("readMode() $this $writeMark $writePosition ${buffer.position()}") + readMode = true + buffer.flip() + buffer.position(readPosition) + //logE("readMode() $this $writeMark $writePosition ${buffer.position()}") + } + } + + override fun resumeWriteMode(usedSize: Int) { + if (readMode) { + readMode = false + buffer.limit(capacity) + buffer.position(writeMark) + } + } + + override fun clear() { + resumeWriteMode() + buffer.clear() + readPosition = 0 + } + + override fun toString(): String { + return "DirectNioAdvanceByteBuffer(buffer=$buffer, readMode=$readMode, readPosition=$readPosition, writePosition=$writePosition)" + } } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/bytebuffer/HeapNioAdvanceByteBuffer.kt b/src/main/kotlin/cn/tursom/core/bytebuffer/HeapNioAdvanceByteBuffer.kt index c43190e..3df55d4 100644 --- a/src/main/kotlin/cn/tursom/core/bytebuffer/HeapNioAdvanceByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/bytebuffer/HeapNioAdvanceByteBuffer.kt @@ -6,160 +6,171 @@ import java.nio.ByteBuffer @Suppress("unused", "MemberVisibilityCanBePrivate") class HeapNioAdvanceByteBuffer(val buffer: ByteBuffer) : AdvanceByteBuffer { - constructor(size: Int) : this(ByteBuffer.allocate(size)) - constructor(buffer: ByteArray, offset: Int = 0, size: Int = buffer.size - offset) : this(HeapByteBuffer.wrap(buffer, offset, size)) - - override val nioBuffer: ByteBuffer get() = buffer - - override val hasArray: Boolean get() = buffer.hasArray() - override val readOnly: Boolean get() = buffer.isReadOnly - - private var _readMode = false - var readMark = 0 - var writeMark = 0 - - /** - * 各种位置变量 - */ - override var writePosition - get() = buffer.position() - set(value) { - buffer.position(value) - } - override var limit - get() = buffer.limit() - set(value) { - buffer.limit(value) - } - - override val capacity: Int = buffer.capacity() - override val array: ByteArray get() = buffer.array() - override val arrayOffset: Int get() = buffer.arrayOffset() - override var readPosition: Int = 0 - override val readOffset get() = arrayOffset + readPosition - override val readableSize get() = writePosition - readPosition - override val available get() = readableSize - override val writeOffset get() = arrayOffset + writePosition - override val writeableSize get() = limit - writePosition - override val size = buffer.capacity() - override val readMode get() = _readMode - - /* - * 位置控制方法 - */ - - override fun readMode() { - writeMark = buffer.position() - readMark = readPosition - buffer.limit(buffer.position()) - buffer.position(readPosition) - _readMode = true - } - - override fun resumeWriteMode(usedSize: Int) { - readPosition = readMark + usedSize - buffer.limit(buffer.capacity()) - buffer.position(writeMark) - _readMode = false - } - - override fun needReadSize(size: Int) { - if (readableSize < size) throw OutOfBufferException() - } - - override fun useReadSize(size: Int): Int { - needReadSize(size) - readPosition += size - return size - } - - override fun take(size: Int): Int { - needReadSize(size) - val offset = readOffset - readPosition += size - return offset - } - - override fun push(size: Int): Int { - val offset = writeOffset - writePosition += size - return offset - } - - override fun readAllSize() = useReadSize(readableSize) - override fun takeAll() = take(readableSize) - - override fun clear() { - readPosition = 0 - buffer.clear() - } - - override fun reset() { - array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition) - writePosition = readableSize - readPosition = 0 - } - - override fun reset(outputStream: OutputStream) { - outputStream.write(array, readOffset, arrayOffset + writePosition) - writePosition = 0 - readPosition = 0 - } - - override fun requireAvailableSize(size: Int) { - if (limit - readPosition < size) reset() - } - - - /* - * 数据获取方法 - */ - - override fun get() = array[take(1)] - override fun getChar() = array.toChar(take(2)) - override fun getShort() = array.toShort(take(2)) - override fun getInt() = array.toInt(take(4)) - override fun getLong() = array.toLong(take(8)) - override fun getFloat() = array.toFloat(take(4)) - override fun getDouble() = array.toDouble(take(8)) - override fun getBytes() = array.copyOfRange(arrayOffset, readAllSize()) - override fun getString(size: Int) = String(array, readOffset, useReadSize(size)) - - override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int { - array.copyInto(buffer, bufferOffset, arrayOffset, useReadSize(size)) - return size - } - - override fun toByteArray() = getBytes() - - - /* - * 数据写入方法 - */ - - override fun put(byte: Byte) { - buffer.put(byte) - } - - override fun put(char: Char) = array.put(char, push(2)) - override fun put(short: Short) = array.put(short, push(2)) - override fun put(int: Int) = array.put(int, push(4)) - override fun put(long: Long) = array.put(long, push(8)) - override fun put(float: Float) = array.put(float, push(4)) - override fun put(double: Double) = array.put(double, push(8)) - override fun put(str: String) = put(str.toByteArray()) - override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { - byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex) - } - - override fun split(from: Int, to: Int): AdvanceByteBuffer { - val readMark = readPosition - val writeMark = writePosition - buffer.position(readMark) - buffer.limit(writeMark) - val slice = HeapNioAdvanceByteBuffer(buffer.slice()) - readPosition = readMark - writePosition = writeMark - return slice - } + constructor(size: Int) : this(ByteBuffer.allocate(size)) + constructor(buffer: ByteArray, offset: Int = 0, size: Int = buffer.size - offset) : this(HeapByteBuffer.wrap(buffer, offset, size)) + + override val nioBuffer: ByteBuffer get() = buffer + + override val hasArray: Boolean get() = buffer.hasArray() + override val readOnly: Boolean get() = buffer.isReadOnly + + private var _readMode = false + var readMark = 0 + var writeMark = 0 + + /** + * 各种位置变量 + */ + override var writePosition + get() = buffer.position() + set(value) { + buffer.position(value) + } + override var limit + get() = buffer.limit() + set(value) { + buffer.limit(value) + } + + override val capacity: Int = buffer.capacity() + override val array: ByteArray get() = buffer.array() + override val arrayOffset: Int get() = buffer.arrayOffset() + override var readPosition: Int = 0 + override val readOffset get() = arrayOffset + readPosition + override val readableSize get() = writePosition - readPosition + override val available get() = readableSize + override val writeOffset get() = arrayOffset + writePosition + override val writeableSize get() = limit - writePosition + override val size = buffer.capacity() + override val readMode get() = _readMode + + /* + * 位置控制方法 + */ + + override fun readMode() { + writeMark = buffer.position() + readMark = readPosition + buffer.limit(buffer.position()) + buffer.position(readPosition) + _readMode = true + } + + override fun resumeWriteMode(usedSize: Int) { + readPosition = readMark + usedSize + buffer.limit(buffer.capacity()) + buffer.position(writeMark) + _readMode = false + } + + override fun needReadSize(size: Int) { + if (readableSize < size) throw OutOfBufferException() + } + + override fun useReadSize(size: Int): Int { + needReadSize(size) + readPosition += size + return size + } + + override fun take(size: Int): Int { + needReadSize(size) + val offset = readOffset + readPosition += size + return offset + } + + override fun push(size: Int): Int { + val offset = writeOffset + writePosition += size + return offset + } + + override fun readAllSize() = useReadSize(readableSize) + override fun takeAll() = take(readableSize) + + override fun clear() { + readPosition = 0 + buffer.clear() + } + + override fun reset() { + array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition) + writePosition = readableSize + readPosition = 0 + } + + override fun reset(outputStream: OutputStream) { + outputStream.write(array, readOffset, arrayOffset + writePosition) + writePosition = 0 + readPosition = 0 + } + + override fun requireAvailableSize(size: Int) { + if (limit - readPosition < size) reset() + } + + + /* + * 数据获取方法 + */ + + override fun get() = array[take(1)] + override fun getChar() = array.toChar(take(2)) + override fun getShort() = array.toShort(take(2)) + override fun getInt() = array.toInt(take(4)) + override fun getLong() = array.toLong(take(8)) + override fun getFloat() = array.toFloat(take(4)) + override fun getDouble() = array.toDouble(take(8)) + override fun getBytes(size: Int): ByteArray { + val readMode = readMode + readMode() + val array = array.copyOfRange(readPosition, useReadSize(size)) + if (!readMode) resumeWriteMode(size) + return array + } + + override fun getString(size: Int) = String(array, readOffset, useReadSize(size)) + + override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int { + array.copyInto(buffer, bufferOffset, arrayOffset, useReadSize(size)) + return size + } + + override fun toByteArray() = getBytes() + + + /* + * 数据写入方法 + */ + + override fun put(byte: Byte) { + buffer.put(byte) + } + + override fun put(char: Char) = array.put(char, push(2)) + override fun put(short: Short) = array.put(short, push(2)) + override fun put(int: Int) = array.put(int, push(4)) + override fun put(long: Long) = array.put(long, push(8)) + override fun put(float: Float) = array.put(float, push(4)) + override fun put(double: Double) = array.put(double, push(8)) + override fun put(str: String) = put(str.toByteArray()) + override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { + byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex) + } + + override fun split(from: Int, to: Int): AdvanceByteBuffer { + val readMark = readPosition + val writeMark = writePosition + buffer.position(readMark) + buffer.limit(writeMark) + val slice = HeapNioAdvanceByteBuffer(buffer.slice()) + readPosition = readMark + writePosition = writeMark + return slice + } + + override fun toString(): String { + return "HeapNioAdvanceByteBuffer(buffer=$buffer, readMode=$_readMode, readMark=$readMark, writeMark=$writeMark, capacity=$capacity, readPosition=$readPosition, size=$size)" + } } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/bytebuffer/MultiAdvanceByteBuffer.kt b/src/main/kotlin/cn/tursom/core/bytebuffer/MultiAdvanceByteBuffer.kt index f4aab8c..5d68b54 100644 --- a/src/main/kotlin/cn/tursom/core/bytebuffer/MultiAdvanceByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/bytebuffer/MultiAdvanceByteBuffer.kt @@ -1,101 +1,108 @@ package cn.tursom.core.bytebuffer import java.nio.ByteBuffer +import java.util.* +import kotlin.collections.ArrayList class MultiAdvanceByteBuffer(vararg val buffers: AdvanceByteBuffer) : AdvanceByteBuffer { - init { - resumeWriteMode() - } - - var writeBufferIndex = 0 - var readBufferIndex = 0 - val readBuffer get() = buffers[writeBufferIndex] - val writeBuffer get() = buffers[writeBufferIndex] - - val operatorBuffer - get() = if (readMode) { - readBuffer - } else { - writeBuffer - } - - override val nioBuffers: Array - get() { - val bufList = ArrayList() - buffers.forEach { buffer -> - if (buffer.bufferCount == 1) { - bufList.add(buffer.nioBuffer) - } else { - buffer.nioBuffers.forEach { - bufList.add(it) - } - } - } - return bufList.toTypedArray() - } - override val hasArray: Boolean get() = false - override val readOnly: Boolean get() = false - override val bufferCount: Int get() = buffers.size - - override val nioBuffer: ByteBuffer get() = operatorBuffer.nioBuffer - override var writePosition: Int - get() = operatorBuffer.writePosition - set(value) { - operatorBuffer.writePosition = value - } - override var limit: Int - get() = operatorBuffer.limit - set(value) { - operatorBuffer.limit = value - } - override val capacity: Int get() = operatorBuffer.capacity - override val array: ByteArray get() = operatorBuffer.array - override val arrayOffset: Int get() = operatorBuffer.arrayOffset - override var readPosition: Int - get() = operatorBuffer.readPosition - set(value) { - operatorBuffer.readPosition = value - } - override val readOffset: Int get() = operatorBuffer.readOffset - override val readableSize: Int get() = operatorBuffer.readableSize - override val available: Int get() = operatorBuffer.available - override val writeOffset: Int get() = operatorBuffer.writeOffset - override val writeableSize: Int get() = operatorBuffer.writeableSize - override val size: Int get() = operatorBuffer.size - override var readMode: Boolean = false - - override fun readMode() { - readMode = true - buffers.forEach(AdvanceByteBuffer::readMode) - } - - override fun resumeWriteMode(usedSize: Int) { - readMode = false - buffers.forEach { it.resumeWriteMode() } - } - - override fun clear() { - writeBufferIndex = 0 - readBufferIndex = 0 - buffers.forEach { buffer -> buffer.clear() } - } - - override fun get(): Byte = readBuffer.get() - override fun getChar(): Char = readBuffer.getChar() - override fun getShort(): Short = readBuffer.getShort() - override fun getInt(): Int = readBuffer.getInt() - override fun getLong(): Long = readBuffer.getLong() - override fun getFloat(): Float = readBuffer.getFloat() - override fun getDouble(): Double = readBuffer.getDouble() - override fun getBytes(): ByteArray = readBuffer.getBytes() - override fun getString(size: Int): String = readBuffer.getString(size) - - override fun put(byte: Byte) = writeBuffer.put(byte) - override fun put(char: Char) = writeBuffer.put(char) - override fun put(short: Short) = writeBuffer.put(short) - override fun put(int: Int) = writeBuffer.put(int) - override fun put(long: Long) = writeBuffer.put(long) - override fun put(float: Float) = writeBuffer.put(float) - override fun put(double: Double) = writeBuffer.put(double) - override fun put(str: String) = writeBuffer.put(str) + init { + resumeWriteMode() + } + + var writeBufferIndex = 0 + var readBufferIndex = 0 + val readBuffer get() = buffers[writeBufferIndex] + val writeBuffer get() = buffers[writeBufferIndex] + + val operatorBuffer + get() = if (readMode) { + readBuffer + } else { + writeBuffer + } + + override val nioBuffers: Array + get() { + val bufList = ArrayList() + buffers.forEach { buffer -> + if (buffer.bufferCount == 1) { + bufList.add(buffer.nioBuffer) + } else { + buffer.nioBuffers.forEach { + bufList.add(it) + } + } + } + return bufList.toTypedArray() + } + override val hasArray: Boolean get() = false + override val readOnly: Boolean get() = false + override val bufferCount: Int get() = buffers.size + + override val nioBuffer: ByteBuffer get() = operatorBuffer.nioBuffer + override var writePosition: Int + get() = operatorBuffer.writePosition + set(value) { + operatorBuffer.writePosition = value + } + override var limit: Int + get() = operatorBuffer.limit + set(value) { + operatorBuffer.limit = value + } + override val capacity: Int get() = operatorBuffer.capacity + override val array: ByteArray get() = operatorBuffer.array + override val arrayOffset: Int get() = operatorBuffer.arrayOffset + override var readPosition: Int + get() = operatorBuffer.readPosition + set(value) { + operatorBuffer.readPosition = value + } + override val readOffset: Int get() = operatorBuffer.readOffset + override val readableSize: Int get() = operatorBuffer.readableSize + override val available: Int get() = operatorBuffer.available + override val writeOffset: Int get() = operatorBuffer.writeOffset + override val writeableSize: Int get() = operatorBuffer.writeableSize + override val size: Int get() = operatorBuffer.size + override var readMode: Boolean = false + + override fun readMode() { + readMode = true + buffers.forEach(AdvanceByteBuffer::readMode) + } + + override fun resumeWriteMode(usedSize: Int) { + readMode = false + buffers.forEach { it.resumeWriteMode() } + } + + override fun clear() { + writeBufferIndex = 0 + readBufferIndex = 0 + buffers.forEach { buffer -> buffer.clear() } + } + + override fun get(): Byte = readBuffer.get() + override fun getChar(): Char = readBuffer.getChar() + override fun getShort(): Short = readBuffer.getShort() + override fun getInt(): Int = readBuffer.getInt() + override fun getLong(): Long = readBuffer.getLong() + override fun getFloat(): Float = readBuffer.getFloat() + override fun getDouble(): Double = readBuffer.getDouble() + override fun getBytes(size: Int) = readBuffer.getBytes(size) + override fun getString(size: Int): String = readBuffer.getString(size) + + override fun put(byte: Byte) = writeBuffer.put(byte) + override fun put(char: Char) = writeBuffer.put(char) + override fun put(short: Short) = writeBuffer.put(short) + override fun put(int: Int) = writeBuffer.put(int) + override fun put(long: Long) = writeBuffer.put(long) + override fun put(float: Float) = writeBuffer.put(float) + override fun put(double: Double) = writeBuffer.put(double) + override fun put(str: String) = writeBuffer.put(str) + override fun toString(): String { + return "MultiAdvanceByteBuffer(buffers=${Arrays.toString(buffers)}, writeBufferIndex=$writeBufferIndex, readBufferIndex=$readBufferIndex, readMode=$readMode)" + } + + } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/bytebuffer/NioAdvanceByteBuffer.kt b/src/main/kotlin/cn/tursom/core/bytebuffer/NioAdvanceByteBuffer.kt index e07b120..7ddb0c0 100644 --- a/src/main/kotlin/cn/tursom/core/bytebuffer/NioAdvanceByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/bytebuffer/NioAdvanceByteBuffer.kt @@ -3,8 +3,8 @@ package cn.tursom.core.bytebuffer import java.nio.ByteBuffer class NioAdvanceByteBuffer(val buffer: ByteBuffer) : - AdvanceByteBuffer by if (buffer.hasArray()) { - HeapNioAdvanceByteBuffer(buffer) - } else { - DirectNioAdvanceByteBuffer(buffer) - } + AdvanceByteBuffer by if (buffer.hasArray()) { + HeapNioAdvanceByteBuffer(buffer) + } else { + DirectNioAdvanceByteBuffer(buffer) + } diff --git a/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt index 3b75cb4..8fa760e 100644 --- a/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt @@ -47,4 +47,8 @@ class DirectMemoryPool(override val blockSize: Int = 1024, override val blockCou } else { null } + + override fun toString(): String { + return "DirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, bitMap=$bitMap)" + } } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt index 78a9018..020a04f 100644 --- a/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt @@ -48,4 +48,8 @@ class HeapMemoryPool(override val blockSize: Int = 1024, override val blockCount } else { null } + + override fun toString(): String { + return "HeapMemoryPool(blockSize=$blockSize, blockCount=$blockCount, bitMap=$bitMap)" + } } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt index 6089b3e..28f3905 100644 --- a/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt @@ -11,7 +11,7 @@ import java.nio.ByteBuffer interface MemoryPool { val blockSize: Int val blockCount: Int - + fun allocate(): Int fun free(token: Int) fun getMemory(token: Int): ByteBuffer? @@ -23,6 +23,8 @@ interface MemoryPool { null } } + + override fun toString(): String } diff --git a/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt b/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt index a64f5e7..48f90ac 100644 --- a/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt +++ b/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt @@ -8,103 +8,105 @@ import kotlin.concurrent.thread class WheelTimer( - val tick: Long = 200, - val wheelSize: Int = 512 + val tick: Long = 200, + val wheelSize: Int = 512 ) : Timer { - var closed = false - val taskQueueArray = Array(wheelSize) { TaskQueue() } - private var position = 0 - - override fun exec(timeout: Long, task: () -> Unit): TimerTask { - val index = ((timeout / tick + position + if (timeout % tick == 0L) 0 else 1) % wheelSize).toInt() - return taskQueueArray[index].offer(task, timeout) - } - - init { - thread(isDaemon = true, name = "wheelTimerLooper") { - while (!closed) { - position %= wheelSize - - val newQueue = TaskQueue() - val taskQueue = taskQueueArray[position] - taskQueueArray[position] = newQueue - - val time = System.currentTimeMillis() - var node = taskQueue.root.next - while (node != null) { - node = if (node.isOutTime(time)) { - val sNode = node - threadPool.execute { sNode.task() } - node.next - } else { - val next = node.next - newQueue.offer(node) - next - } - } - - position++ - sleep(tick) - } - } - } - - - class TaskQueue { - val root: TaskNode = TaskNode(0, {}, null, null) - - fun offer(task: () -> Unit, timeout: Long): TaskNode { - synchronized(root) { - val insert = TaskNode(timeout, task, root, root.next) - root.next = insert - insert.next?.prev = insert - return insert - } - } - - fun offer(node: TaskNode): TaskNode { - synchronized(root) { - node.next = root.next - node.next = node - node.next?.prev = node - return node - } - } - - inner class TaskNode( - val timeout: Long, - val task: () -> Unit, - var prev: TaskNode?, - var next: TaskNode? - ) : TimerTask { - val outTime = System.currentTimeMillis() + timeout - val isOutTime get() = System.currentTimeMillis() > outTime - - fun isOutTime(time: Long) = time > outTime - - override fun run() = task() - - override fun cancel() { - synchronized(root) { - prev?.next = next - next?.prev = prev - } - } - } - } - - companion object { - val threadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), - object : ThreadFactory { - var threadNumber = 0 - override fun newThread(r: Runnable): Thread { - val thread = Thread(r) - thread.isDaemon = true - thread.name = "wheelTimerWorker-$threadNumber" - return thread - } - }) - val timer by lazy { WheelTimer(200, 1024) } - val smoothTimer by lazy { WheelTimer(20, 128) } - } + var closed = false + val taskQueueArray = Array(wheelSize) { TaskQueue() } + private var position = 0 + + override fun exec(timeout: Long, task: () -> Unit): TimerTask { + val index = ((timeout / tick + position + if (timeout % tick == 0L) 0 else 1) % wheelSize).toInt() + return taskQueueArray[index].offer(task, timeout) + } + + init { + thread(isDaemon = true, name = "wheelTimerLooper") { + val startTime = System.currentTimeMillis() + while (!closed) { + position %= wheelSize + + val newQueue = TaskQueue() + val taskQueue = taskQueueArray[position] + taskQueueArray[position] = newQueue + + val time = System.currentTimeMillis() + var node = taskQueue.root.next + while (node != null) { + node = if (node.isOutTime(time)) { + val sNode = node + threadPool.execute { sNode.task() } + node.next + } else { + val next = node.next + newQueue.offer(node) + next + } + } + + position++ + val nextSleep = startTime + tick * position - System.currentTimeMillis() + if (nextSleep > 0) sleep(tick) + } + } + } + + + class TaskQueue { + val root: TaskNode = TaskNode(0, {}, null, null) + + fun offer(task: () -> Unit, timeout: Long): TaskNode { + synchronized(root) { + val insert = TaskNode(timeout, task, root, root.next) + root.next = insert + insert.next?.prev = insert + return insert + } + } + + fun offer(node: TaskNode): TaskNode { + synchronized(root) { + node.next = root.next + node.next = node + node.next?.prev = node + return node + } + } + + inner class TaskNode( + val timeout: Long, + val task: () -> Unit, + var prev: TaskNode?, + var next: TaskNode? + ) : TimerTask { + val outTime = System.currentTimeMillis() + timeout + val isOutTime get() = System.currentTimeMillis() > outTime + + fun isOutTime(time: Long) = time > outTime + + override fun run() = task() + + override fun cancel() { + synchronized(root) { + prev?.next = next + next?.prev = prev + } + } + } + } + + companion object { + val threadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + object : ThreadFactory { + var threadNumber = 0 + override fun newThread(r: Runnable): Thread { + val thread = Thread(r) + thread.isDaemon = true + thread.name = "wheelTimerWorker-$threadNumber" + return thread + } + }) + val timer by lazy { WheelTimer(200, 1024) } + val smoothTimer by lazy { WheelTimer(20, 128) } + } } \ No newline at end of file diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyAdvanceByteBuffer.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyAdvanceByteBuffer.kt index 8668c96..d4429d3 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyAdvanceByteBuffer.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyAdvanceByteBuffer.kt @@ -80,8 +80,8 @@ class NettyAdvanceByteBuffer(val byteBuf: ByteBuf) : AdvanceByteBuffer { override fun getFloat(): Float = byteBuf.readFloat() override fun getDouble(): Double = byteBuf.readDouble() - override fun getBytes(): ByteArray { - val bytes = ByteArray(byteBuf.readableBytes()) + override fun getBytes(size: Int): ByteArray { + val bytes = ByteArray(size) byteBuf.readBytes(bytes) return bytes } @@ -139,5 +139,8 @@ class NettyAdvanceByteBuffer(val byteBuf: ByteBuf) : AdvanceByteBuffer { override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { byteBuf.writeBytes(byteArray, startIndex, endIndex - startIndex) } - + + override fun toString(): String { + return "NettyAdvanceByteBuffer(byteBuf=$byteBuf, readMode=$readMode)" + } } \ No newline at end of file