修复 AsyncSocket 的 bug

This commit is contained in:
tursom 2019-11-03 23:22:39 +08:00
parent 5af7493ff5
commit 9765c5ead5
21 changed files with 1443 additions and 1132 deletions

View File

@ -1,8 +1,10 @@
package cn.tursom.socket package cn.tursom.socket
import cn.tursom.socket.niothread.INioThread import cn.tursom.core.log
import cn.tursom.core.logE
import cn.tursom.core.timer.TimerTask import cn.tursom.core.timer.TimerTask
import cn.tursom.core.timer.WheelTimer import cn.tursom.core.timer.WheelTimer
import cn.tursom.socket.niothread.INioThread
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.SelectionKey import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
@ -19,15 +21,17 @@ import kotlin.coroutines.suspendCoroutine
*/ */
class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INioThread) : IAsyncNioSocket { class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INioThread) : IAsyncNioSocket {
override val channel: SocketChannel = key.channel() as SocketChannel override val channel: SocketChannel = key.channel() as SocketChannel
override suspend fun read(buffer: ByteBuffer): Int { override suspend fun read(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode if (buffer.remaining() == 0) return emptyBufferCode
return operate { return operate {
//logE("read(buffer: ByteBuffer) wait read")
waitRead() waitRead()
//logE("read(buffer: ByteBuffer) wait read complete")
channel.read(buffer) channel.read(buffer)
} }
} }
override suspend fun read(buffer: Array<out ByteBuffer>): Long { override suspend fun read(buffer: Array<out ByteBuffer>): Long {
if (buffer.isEmpty()) return emptyBufferLongCode if (buffer.isEmpty()) return emptyBufferLongCode
return operate { return operate {
@ -35,7 +39,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.read(buffer) channel.read(buffer)
} }
} }
override suspend fun write(buffer: ByteBuffer): Int { override suspend fun write(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode if (buffer.remaining() == 0) return emptyBufferCode
return operate { return operate {
@ -43,7 +47,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer) channel.write(buffer)
} }
} }
override suspend fun write(buffer: Array<out ByteBuffer>): Long { override suspend fun write(buffer: Array<out ByteBuffer>): Long {
if (buffer.isEmpty()) return emptyBufferLongCode if (buffer.isEmpty()) return emptyBufferLongCode
return operate { return operate {
@ -51,16 +55,19 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer) channel.write(buffer)
} }
} }
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int { override suspend fun read(buffer: ByteBuffer, timeout: Long): Int {
//logE("AsyncNioSocket.read(buffer: ByteBuffer, timeout: Long): $buffer, $timeout")
if (timeout <= 0) return read(buffer) if (timeout <= 0) return read(buffer)
if (buffer.remaining() == 0) return emptyBufferCode if (buffer.remaining() == 0) return emptyBufferCode
return operate { return operate {
//logE("wait read")
waitRead(timeout) waitRead(timeout)
//logE("wait read complete")
channel.read(buffer) channel.read(buffer)
} }
} }
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long { override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (timeout <= 0) return read(buffer) if (timeout <= 0) return read(buffer)
if (buffer.isEmpty()) return emptyBufferLongCode if (buffer.isEmpty()) return emptyBufferLongCode
@ -69,7 +76,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.read(buffer) channel.read(buffer)
} }
} }
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int { override suspend fun write(buffer: ByteBuffer, timeout: Long): Int {
if (timeout <= 0) return write(buffer) if (timeout <= 0) return write(buffer)
if (buffer.remaining() == 0) return emptyBufferCode if (buffer.remaining() == 0) return emptyBufferCode
@ -78,7 +85,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer) channel.write(buffer)
} }
} }
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long { override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (timeout <= 0) return write(buffer) if (timeout <= 0) return write(buffer)
if (buffer.isEmpty()) return emptyBufferLongCode if (buffer.isEmpty()) return emptyBufferLongCode
@ -87,7 +94,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer) channel.write(buffer)
} }
} }
override fun close() { override fun close() {
nioThread.execute { nioThread.execute {
channel.close() channel.close()
@ -95,7 +102,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
} }
nioThread.wakeup() nioThread.wakeup()
} }
private inline fun <T> operate(action: () -> T): T { private inline fun <T> operate(action: () -> T): T {
return try { return try {
action() action()
@ -104,37 +111,42 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
throw RuntimeException(e) throw RuntimeException(e)
} }
} }
private suspend inline fun waitRead(timeout: Long) { private suspend inline fun waitRead(timeout: Long) {
suspendCoroutine<Int> { suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) { key.attach(Context(it, timer.exec(timeout) {
key.attach(null) key.attach(null)
waitMode()
it.resumeWithException(TimeoutException()) it.resumeWithException(TimeoutException())
readMode()
nioThread.wakeup()
})) }))
}
}
private suspend inline fun waitWrite(timeout: Long) {
suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) {
key.attach(null)
it.resumeWithException(TimeoutException())
writeMode()
nioThread.wakeup()
}))
}
}
private suspend inline fun waitRead() {
suspendCoroutine<Int> {
key.attach(Context(it))
readMode() readMode()
nioThread.wakeup() nioThread.wakeup()
} }
} }
private suspend inline fun waitWrite(timeout: Long) {
suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) {
key.attach(null)
waitMode()
it.resumeWithException(TimeoutException())
}))
writeMode()
nioThread.wakeup()
}
}
private suspend inline fun waitRead() {
suspendCoroutine<Int> {
//logE("waitRead() attach")
key.attach(Context(it))
//logE("waitRead() readMode()")
readMode()
//logE("waitRead() wakeup()")
nioThread.wakeup()
}
}
private suspend inline fun waitWrite() { private suspend inline fun waitWrite() {
suspendCoroutine<Int> { suspendCoroutine<Int> {
key.attach(Context(it)) key.attach(Context(it))
@ -142,26 +154,28 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
nioThread.wakeup() nioThread.wakeup()
} }
} }
data class Context(val cont: Continuation<Int>, val timeoutTask: TimerTask? = null) data class Context(val cont: Continuation<Int>, val timeoutTask: TimerTask? = null)
companion object { companion object {
val nioSocketProtocol = object : INioProtocol { val nioSocketProtocol = object : INioProtocol {
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {} override fun handleConnect(key: SelectionKey, nioThread: INioThread) {}
override fun handleRead(key: SelectionKey, nioThread: INioThread) { override fun handleRead(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
//logE("read ready")
val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel()
context.cont.resume(0)
}
override fun handleWrite(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0) key.interestOps(0)
val context = key.attachment() as Context? ?: return val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel() context.timeoutTask?.cancel()
context.cont.resume(0) context.cont.resume(0)
} }
override fun handleWrite(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
val context = key.attachment() as Context? ?: return
context.cont.resume(0)
}
override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) { override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) {
key.interestOps(0) key.interestOps(0)
val context = key.attachment() as Context? val context = key.attachment() as Context?
@ -174,10 +188,10 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
} }
} }
} }
//val timer = StaticWheelTimer.timer //val timer = StaticWheelTimer.timer
val timer = WheelTimer.timer val timer = WheelTimer.timer
const val emptyBufferCode = 0 const val emptyBufferCode = 0
const val emptyBufferLongCode = 0L const val emptyBufferLongCode = 0L
} }

View File

@ -3,41 +3,45 @@ package cn.tursom.socket
import cn.tursom.core.bytebuffer.AdvanceByteBuffer import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import cn.tursom.core.bytebuffer.readNioBuffer import cn.tursom.core.bytebuffer.readNioBuffer
import cn.tursom.core.bytebuffer.writeNioBuffer import cn.tursom.core.bytebuffer.writeNioBuffer
import cn.tursom.core.logE
import java.io.Closeable import java.io.Closeable
import java.nio.ByteBuffer import java.nio.ByteBuffer
interface AsyncSocket : Closeable { interface AsyncSocket : Closeable {
suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long
suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long
suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer)).toInt() suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt()
suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer)).toInt() suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt()
override fun close() override fun close()
suspend fun write(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { suspend fun write(buffer: AdvanceByteBuffer, timeout: Long = 0): Int {
return if (buffer.bufferCount == 1) { return if (buffer.bufferCount == 1) {
buffer.readNioBuffer { buffer.readNioBuffer {
write(it, timeout) //logE(it.toString())
} write(it, timeout)
} else { }
val readMode = buffer.readMode } else {
buffer.readMode() val readMode = buffer.readMode
val value = write(buffer.nioBuffers, timeout).toInt() buffer.readMode()
if (!readMode) buffer.resumeWriteMode() val value = write(buffer.nioBuffers, timeout).toInt()
value if (!readMode) buffer.resumeWriteMode()
} value
} }
}
suspend fun read(buffer: AdvanceByteBuffer, timeout: Long = 0): Int {
return if (buffer.bufferCount == 1) { suspend fun read(buffer: AdvanceByteBuffer, timeout: Long = 0): Int {
buffer.writeNioBuffer { //logE("buffer.bufferCount: ${buffer.bufferCount}")
read(it, timeout) //logE("AsyncSocket.read(buffer: AdvanceByteBuffer, timeout: Long = 0): buffer: $buffer")
} return if (buffer.bufferCount == 1) {
} else { buffer.writeNioBuffer {
val readMode = buffer.readMode read(it, timeout)
buffer.resumeWriteMode() }
val value = read(buffer.nioBuffers, timeout).toInt() } else {
if (readMode) buffer.readMode() val readMode = buffer.readMode
value buffer.resumeWriteMode()
} val value = read(buffer.nioBuffers, timeout).toInt()
} if (readMode) buffer.readMode()
value
}
}
} }

View File

@ -3,6 +3,7 @@ package cn.tursom.socket
import cn.tursom.socket.niothread.INioThread import cn.tursom.socket.niothread.INioThread
import cn.tursom.core.bytebuffer.AdvanceByteBuffer import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import cn.tursom.core.bytebuffer.writeNioBuffer import cn.tursom.core.bytebuffer.writeNioBuffer
import cn.tursom.core.logE
import java.net.SocketException import java.net.SocketException
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.SelectionKey import java.nio.channels.SelectionKey
@ -12,7 +13,7 @@ interface IAsyncNioSocket : AsyncSocket {
val channel: SocketChannel val channel: SocketChannel
val key: SelectionKey val key: SelectionKey
val nioThread: INioThread val nioThread: INioThread
fun waitMode() { fun waitMode() {
if (Thread.currentThread() == nioThread.thread) { if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
@ -21,16 +22,21 @@ interface IAsyncNioSocket : AsyncSocket {
nioThread.wakeup() nioThread.wakeup()
} }
} }
fun readMode() { fun readMode() {
//logE("readMode()")
if (Thread.currentThread() == nioThread.thread) { if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
} else { } else {
nioThread.execute { if (key.isValid) key.interestOps(SelectionKey.OP_READ) } nioThread.execute {
//logE("readMode() interest")
if (key.isValid) key.interestOps(SelectionKey.OP_READ)
//logE("readMode interestOps ${key.isValid} ${key.interestOps()}")
}
nioThread.wakeup() nioThread.wakeup()
} }
} }
fun writeMode() { fun writeMode() {
if (Thread.currentThread() == nioThread.thread) { if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
@ -39,7 +45,7 @@ interface IAsyncNioSocket : AsyncSocket {
nioThread.wakeup() nioThread.wakeup()
} }
} }
suspend fun read(buffer: ByteBuffer): Int = read(arrayOf(buffer)).toInt() suspend fun read(buffer: ByteBuffer): Int = read(arrayOf(buffer)).toInt()
suspend fun write(buffer: ByteBuffer): Int = write(arrayOf(buffer)).toInt() suspend fun write(buffer: ByteBuffer): Int = write(arrayOf(buffer)).toInt()
suspend fun read(buffer: Array<out ByteBuffer>): Long suspend fun read(buffer: Array<out ByteBuffer>): Long
@ -55,7 +61,7 @@ interface IAsyncNioSocket : AsyncSocket {
} }
return readSize return readSize
} }
suspend fun recv(buffer: ByteBuffer, timeout: Long): Int { suspend fun recv(buffer: ByteBuffer, timeout: Long): Int {
if (buffer.remaining() == 0) return emptyBufferCode if (buffer.remaining() == 0) return emptyBufferCode
val readSize = read(buffer, timeout) val readSize = read(buffer, timeout)
@ -64,7 +70,7 @@ interface IAsyncNioSocket : AsyncSocket {
} }
return readSize return readSize
} }
suspend fun recv(buffers: Array<out ByteBuffer>, timeout: Long): Long { suspend fun recv(buffers: Array<out ByteBuffer>, timeout: Long): Long {
if (buffers.isEmpty()) return emptyBufferLongCode if (buffers.isEmpty()) return emptyBufferLongCode
val readSize = read(buffers, timeout) val readSize = read(buffers, timeout)
@ -73,7 +79,7 @@ interface IAsyncNioSocket : AsyncSocket {
} }
return readSize return readSize
} }
suspend fun recv(buffer: AdvanceByteBuffer, timeout: Long = 0): Int { suspend fun recv(buffer: AdvanceByteBuffer, timeout: Long = 0): Int {
return if (buffer.bufferCount == 1) { return if (buffer.bufferCount == 1) {
buffer.writeNioBuffer { buffer.writeNioBuffer {
@ -87,7 +93,7 @@ interface IAsyncNioSocket : AsyncSocket {
value value
} }
} }
companion object { companion object {
const val emptyBufferCode = 0 const val emptyBufferCode = 0
const val emptyBufferLongCode = 0L const val emptyBufferLongCode = 0L

View File

@ -23,7 +23,7 @@ class AsyncNioServer(
try { try {
socket.handler() socket.handler()
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() Exception(e).printStackTrace()
} finally { } finally {
try { try {
socket.close() socket.close()
@ -41,7 +41,7 @@ class AsyncNioServer(
backlog: Int = 50, backlog: Int = 50,
handler: Handler handler: Handler
) : this(port, backlog, { handler.handle(this) }) ) : this(port, backlog, { handler.handle(this) })
interface Handler { interface Handler {
fun handle(socket: AsyncNioSocket) fun handle(socket: AsyncNioSocket)
} }

View File

@ -0,0 +1,93 @@
import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer
import cn.tursom.core.log
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.usingAdvanceByteBuffer
import cn.tursom.socket.AsyncNioClient
import cn.tursom.socket.server.AsyncNioServer
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.lang.Thread.sleep
import java.net.SocketException
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
fun main() {
// 服务器端口,可任意指定
val port = 12345
// 创建一个直接内存池每个块是1024字节共有256个快
val memoryPool = DirectMemoryPool(1024, 256)
// 创建服务器对象
val server = AsyncNioServer(port) {
// 这里处理业务逻辑,套接字对象被以 this 的方式传进来
// 从内存池中获取一个内存块
memoryPool.usingAdvanceByteBuffer {
// 检查是否获取成功,不成功就创建一个堆缓冲
val buffer = it ?: ByteArrayAdvanceByteBuffer(1024)
try {
while (true) {
buffer.clear()
// 从套接字中读数据,五秒之内没有数据就抛出异常
if (read(buffer, 10_000) < 0) {
return@AsyncNioServer
}
// 输出读取到的数据
//log("server recv from ${channel.remoteAddress}: [${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}")
// 原封不动的返回数据
val writeSize = write(buffer)
//log("server send [$writeSize] bytes")
}
} catch (e: TimeoutException) {
}
// 代码块结束后,框架会自动释放连接
}
}
server.run()
val connectionCount = 300
val dataPerConn = 10
val testData = "testData".toByteArray()
val remain = AtomicInteger(connectionCount)
val clientMemoryPool = DirectMemoryPool(1024, connectionCount)
val start = System.currentTimeMillis()
repeat(connectionCount) {
GlobalScope.launch {
val socket = AsyncNioClient.connect("127.0.0.1", port)
clientMemoryPool.usingAdvanceByteBuffer {
// 检查是否获取成功,不成功就创建一个堆缓冲
val buffer = it ?: ByteArrayAdvanceByteBuffer(1024)
try {
repeat(dataPerConn) {
buffer.clear()
buffer.put(testData)
//log("client sending: [${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}")
val writeSize = socket.write(buffer)
//log("client write [$writeSize] bytes")
//log(buffer.toString())
val readSize = socket.read(buffer)
//log(buffer.toString())
//log("client recv: [$readSize:${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}")
}
} catch (e: Exception) {
Exception(e).printStackTrace()
} finally {
socket.close()
}
}
remain.decrementAndGet()
}
}
while (remain.get() != 0) {
println(remain.get())
sleep(500)
}
val end = System.currentTimeMillis()
println(end - start)
server.close()
}

View File

@ -0,0 +1,59 @@
package cn.tursom.socket.server
import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer
import cn.tursom.core.log
import cn.tursom.core.logE
import cn.tursom.socket.AsyncAioClient
import cn.tursom.socket.SocketClient
import kotlinx.coroutines.runBlocking
import org.junit.Test
class AsyncNioServerTest {
private val testMsg = "hello"
private val port = 12345
private val server = AsyncNioServer(port) {
log("new connection")
val buffer = ByteArrayAdvanceByteBuffer(1024)
while (true) {
buffer.clear()
read(buffer, 5000)
logE("server recv: ${buffer.toString(buffer.readableSize)}")
write(buffer, 5000)
}
}
init {
server.run()
}
//@Test
fun testAsyncNioServer() {
runBlocking {
val client = AsyncAioClient.connect("127.0.0.1", port)
log("connect to server")
val buffer = ByteArrayAdvanceByteBuffer(1024)
repeat(10) {
buffer.clear()
buffer.put(testMsg)
client.write(buffer, 5000)
buffer.clear()
client.read(buffer, 5000)
log("server recv: ${buffer.getString()}")
}
}
}
//@Test
fun testAsyncNioServerSocket() {
SocketClient("localhost", port) {
val buffer = ByteArray(1024)
repeat(10) {
send(testMsg)
val readSize = inputStream.read(buffer)
val recv = String(buffer, 0, readSize)
log(recv)
}
}
}
}

View File

@ -12,65 +12,65 @@ open class BaseSocket(
val socket: Socket, val socket: Socket,
val timeout: Int = Companion.timeout val timeout: Int = Companion.timeout
) : Closeable { ) : Closeable {
val address = socket.inetAddress?.toString()?.drop(1) ?: "0.0.0.0" val address = socket.inetAddress?.toString()?.drop(1) ?: "0.0.0.0"
val port = socket.port val port = socket.port
val localPort = socket.localPort val localPort = socket.localPort
val inputStream = socket.getInputStream()!! val inputStream by lazy { socket.getInputStream()!! }
val outputStream = socket.getOutputStream()!! val outputStream by lazy { socket.getOutputStream()!! }
fun send(message: String?) { fun send(message: String?) {
send((message ?: return).toByteArray()) send((message ?: return).toByteArray())
}
fun send(message: ByteArray?) {
outputStream.write(message ?: return)
}
fun send(message: Int) {
val buffer = ByteArray(4)
buffer.put(message)
send(buffer)
}
fun send(message: Long) {
val buffer = ByteArray(8)
buffer.put(message)
send(buffer)
}
override fun close() {
closeSocket()
}
protected fun closeSocket() {
if (!socket.isClosed) {
closeInputStream()
closeOutputStream()
socket.close()
} }
}
fun send(message: ByteArray?) {
outputStream.write(message ?: return) private fun closeInputStream() {
try {
inputStream.close()
} catch (e: Exception) {
} }
}
fun send(message: Int) {
val buffer = ByteArray(4) private fun closeOutputStream() {
buffer.put(message) try {
send(buffer) outputStream.close()
} } catch (e: Exception) {
fun send(message: Long) {
val buffer = ByteArray(8)
buffer.put(message)
send(buffer)
}
override fun close() {
closeSocket()
}
protected fun closeSocket() {
if (!socket.isClosed) {
closeInputStream()
closeOutputStream()
socket.close()
}
}
private fun closeInputStream() {
try {
inputStream.close()
} catch (e: Exception) {
}
}
private fun closeOutputStream() {
try {
outputStream.close()
} catch (e: Exception) {
}
}
fun isConnected(): Boolean {
return socket.isConnected
}
companion object Companion {
const val defaultReadSize: Int = 1024 * 8
const val timeout: Int = 60 * 1000
} }
}
fun isConnected(): Boolean {
return socket.isConnected
}
companion object Companion {
const val defaultReadSize: Int = 1024 * 8
const val timeout: Int = 60 * 1000
}
} }

View File

@ -5,47 +5,47 @@ import java.net.Socket
import java.net.SocketException import java.net.SocketException
class SocketClient( class SocketClient(
socket: Socket, socket: Socket,
timeout: Int = Companion.timeout, timeout: Int = Companion.timeout,
private val ioException: IOException.() -> Unit = { printStackTrace() }, private val ioException: IOException.() -> Unit = { printStackTrace() },
private val exception: Exception.() -> Unit = { printStackTrace() }, private val exception: Exception.() -> Unit = { printStackTrace() },
func: (SocketClient.() -> Unit)? = null func: (SocketClient.() -> Unit)? = null
) : BaseSocket(socket, timeout) { ) : BaseSocket(socket, timeout) {
init { init {
func?.let { func?.let {
use(it) invoke(it)
} }
} }
constructor( constructor(
host: String, host: String,
port: Int, port: Int,
timeout: Int = Companion.timeout, timeout: Int = Companion.timeout,
ioException: IOException.() -> Unit = { printStackTrace() }, ioException: IOException.() -> Unit = { printStackTrace() },
exception: Exception.() -> Unit = { printStackTrace() }, exception: Exception.() -> Unit = { printStackTrace() },
func: (SocketClient.() -> Unit)? = null func: (SocketClient.() -> Unit)? = null
) : this(Socket(host, port), timeout, ioException, exception, func) ) : this(Socket(host, port), timeout, ioException, exception, func)
fun execute(func: SocketClient.() -> Unit) { fun execute(func: SocketClient.() -> Unit) {
try { try {
func() func()
} catch (io: IOException) { } catch (io: IOException) {
io.ioException() io.ioException()
} catch (e: SocketException) { } catch (e: SocketException) {
if (e.message == null) { if (e.message == null) {
e.printStackTrace() e.printStackTrace()
} else { } else {
System.err.println("$address: ${e::class.java}: ${e.message}") System.err.println("$address: ${e::class.java}: ${e.message}")
} }
} catch (e: Exception) { } catch (e: Exception) {
e.exception() e.exception()
} }
} }
fun use(func: SocketClient.() -> Unit) { operator fun invoke(func: SocketClient.() -> Unit) {
val ret = execute(func) val ret = execute(func)
closeSocket() closeSocket()
return ret return ret
} }
} }

View File

@ -1,5 +1,7 @@
package cn.tursom.socket.server package cn.tursom.socket.server
import cn.tursom.core.log
import cn.tursom.core.logE
import cn.tursom.socket.INioProtocol import cn.tursom.socket.INioProtocol
import cn.tursom.socket.niothread.INioThread import cn.tursom.socket.niothread.INioThread
import cn.tursom.socket.niothread.WorkerLoopNioThread import cn.tursom.socket.niothread.WorkerLoopNioThread
@ -19,12 +21,12 @@ class NioServer(
) : ISocketServer { ) : ISocketServer {
private val listenChannel = ServerSocketChannel.open() private val listenChannel = ServerSocketChannel.open()
private val threadList = ConcurrentLinkedDeque<INioThread>() private val threadList = ConcurrentLinkedDeque<INioThread>()
init { init {
listenChannel.socket().bind(InetSocketAddress(port), backLog) listenChannel.socket().bind(InetSocketAddress(port), backLog)
listenChannel.configureBlocking(false) listenChannel.configureBlocking(false)
} }
constructor( constructor(
port: Int, port: Int,
protocol: INioProtocol, protocol: INioProtocol,
@ -32,22 +34,23 @@ class NioServer(
) : this(port, protocol, backLog, { name, workLoop -> ) : this(port, protocol, backLog, { name, workLoop ->
WorkerLoopNioThread(name, workLoop = workLoop, isDaemon = false) WorkerLoopNioThread(name, workLoop = workLoop, isDaemon = false)
}) })
override fun run() { override fun run() {
val nioThread = nioThreadGenerator("nio worker", LoopHandler(protocol)::handle) val nioThread = nioThreadGenerator("nio worker", LoopHandler(protocol)::handle)
nioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {} nioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {}
threadList.add(nioThread) threadList.add(nioThread)
} }
override fun close() { override fun close() {
listenChannel.close() listenChannel.close()
threadList.forEach { threadList.forEach {
it.close() it.close()
} }
} }
class LoopHandler(val protocol: INioProtocol) { class LoopHandler(val protocol: INioProtocol) {
fun handle(nioThread: INioThread) { fun handle(nioThread: INioThread) {
//logE("wake up")
val selector = nioThread.selector val selector = nioThread.selector
if (selector.isOpen) { if (selector.isOpen) {
if (selector.select(TIMEOUT) != 0) { if (selector.select(TIMEOUT) != 0) {
@ -55,6 +58,7 @@ class NioServer(
while (keyIter.hasNext()) run whileBlock@{ while (keyIter.hasNext()) run whileBlock@{
val key = keyIter.next() val key = keyIter.next()
keyIter.remove() keyIter.remove()
//logE("selected key: $key: ${key.attachment()}")
try { try {
when { when {
key.isAcceptable -> { key.isAcceptable -> {
@ -90,7 +94,7 @@ class NioServer(
} }
} }
} }
companion object { companion object {
private const val TIMEOUT = 1000L private const val TIMEOUT = 1000L
} }

View File

@ -0,0 +1,66 @@
package cn.tursom.socket.server
import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer
import cn.tursom.core.bytebuffer.readNioBuffer
import cn.tursom.core.bytebuffer.writeNioBuffer
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.MemoryPool
import cn.tursom.socket.INioProtocol
import cn.tursom.socket.SocketClient
import cn.tursom.socket.niothread.INioThread
import org.junit.Test
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
class NioServerTest {
private val port = 12345
@Test
fun testNioServer() {
val memoryPool: MemoryPool = DirectMemoryPool(1024, 256)
val server = NioServer(port, object : INioProtocol {
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {
val memoryToken = memoryPool.allocate()
key.attach(memoryToken to (memoryPool.getAdvanceByteBuffer(memoryToken) ?: ByteArrayAdvanceByteBuffer(1024)))
key.interestOps(SelectionKey.OP_READ)
}
override fun handleRead(key: SelectionKey, nioThread: INioThread) {
val channel = key.channel() as SocketChannel
val buffer = (key.attachment() as Pair<Int, AdvanceByteBuffer>).second
buffer.writeNioBuffer {
channel.read(it)
}
println("record from client: ${buffer.toString(buffer.readableSize)}")
key.interestOps(SelectionKey.OP_WRITE)
}
override fun handleWrite(key: SelectionKey, nioThread: INioThread) {
val channel = key.channel() as SocketChannel
val buffer = (key.attachment() as Pair<Int, AdvanceByteBuffer>).second
println("send to client: ${buffer.toString(buffer.readableSize)}")
buffer.readNioBuffer {
channel.write(it)
}
buffer.reset()
key.interestOps(SelectionKey.OP_READ)
}
override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) {
super.exceptionCause(key, nioThread, e)
val memoryToken = (key.attachment() as Pair<Int, AdvanceByteBuffer>).first
memoryPool.free(memoryToken)
key.channel().close()
key.cancel()
}
})
server.run()
val socket = SocketClient("127.0.0.1", port)
val buffer = ByteArray(1024)
socket.outputStream.write("Hello".toByteArray())
val readCount = socket.inputStream.read(buffer)
println(buffer.copyOfRange(0, readCount).toString(Charsets.UTF_8))
server.close()
}
}

View File

@ -1,435 +1,445 @@
package cn.tursom.core.bytebuffer package cn.tursom.core.bytebuffer
import cn.tursom.core.forEachIndex import cn.tursom.core.forEachIndex
import cn.tursom.core.logE
import sun.reflect.generics.reflectiveObjects.NotImplementedException import sun.reflect.generics.reflectiveObjects.NotImplementedException
import java.io.OutputStream import java.io.OutputStream
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kotlin.math.min import kotlin.math.min
interface AdvanceByteBuffer { interface AdvanceByteBuffer {
val nioBuffer: ByteBuffer val nioBuffer: ByteBuffer
val nioBuffers: Array<out ByteBuffer> get() = arrayOf(nioBuffer) val nioBuffers: Array<out ByteBuffer> get() = arrayOf(nioBuffer)
/** /**
* 各种位置变量 * 各种位置变量
*/ */
val readOnly: Boolean val readOnly: Boolean
val bufferCount: Int get() = 1 val bufferCount: Int get() = 1
var writePosition: Int var writePosition: Int
var limit: Int var limit: Int
val capacity: Int val capacity: Int
val hasArray: Boolean val hasArray: Boolean
val array: ByteArray val array: ByteArray
val arrayOffset: Int val arrayOffset: Int
var readPosition: Int var readPosition: Int
val readOffset: Int get() = arrayOffset + readPosition val readOffset: Int get() = arrayOffset + readPosition
val readableSize: Int val readableSize: Int
val available: Int get() = readableSize val available: Int get() = readableSize
val writeOffset: Int get() = arrayOffset + writePosition val writeOffset: Int get() = arrayOffset + writePosition
val writeableSize: Int get() = limit - writePosition val writeableSize: Int get() = limit - writePosition
val size: Int val size: Int
val readMode: Boolean val readMode: Boolean
fun readMode() fun readMode()
fun resumeWriteMode(usedSize: Int = 0) fun resumeWriteMode(usedSize: Int = 0)
fun needReadSize(size: Int) { fun needReadSize(size: Int) {
if (readableSize < size) throw OutOfBufferException() if (readableSize < size) throw OutOfBufferException()
} }
fun useReadSize(size: Int): Int { fun useReadSize(size: Int): Int {
needReadSize(size) needReadSize(size)
readPosition += size readPosition += size
return size return size
} }
fun take(size: Int): Int { fun take(size: Int): Int {
needReadSize(size) needReadSize(size)
val offset = readOffset val offset = readOffset
readPosition += size readPosition += size
return offset return offset
} }
fun push(size: Int): Int { fun push(size: Int): Int {
val offset = writeOffset val offset = writeOffset
writePosition += size writePosition += size
return offset return offset
} }
fun readAllSize() = useReadSize(readableSize) fun readAllSize() = useReadSize(readableSize)
fun takeAll() = take(readableSize) fun takeAll() = take(readableSize)
fun clear() fun clear()
fun reset() { fun reset() {
if (hasArray) { if (hasArray) {
array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition) array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition)
writePosition = readableSize writePosition = readableSize
readPosition = 0 readPosition = 0
} else {
readMode()
nioBuffer.compact()
val writePosition = readPosition
resumeWriteMode()
readPosition = 0
this.writePosition = writePosition
}
}
fun reset(outputStream: OutputStream) {
if (hasArray) {
outputStream.write(array, readOffset, arrayOffset + writePosition)
} else {
outputStream.write(getBytes())
}
writePosition = 0
readPosition = 0
}
fun requireAvailableSize(size: Int) {
if (limit - readPosition < size) reset()
}
/*
* 数据获取方法
*/
fun get(): Byte = if (readMode) {
nioBuffer.get()
} else { } else {
readMode() readMode()
val value = nioBuffer.get() nioBuffer.compact()
resumeWriteMode() val writePosition = readPosition
value resumeWriteMode()
readPosition = 0
this.writePosition = writePosition
} }
}
fun getChar(): Char = if (readMode) {
nioBuffer.char fun reset(outputStream: OutputStream) {
if (hasArray) {
outputStream.write(array, readOffset, arrayOffset + writePosition)
} else { } else {
readMode() outputStream.write(getBytes())
val value = nioBuffer.char
resumeWriteMode()
value
} }
writePosition = 0
fun getShort(): Short = if (readMode) { readPosition = 0
nioBuffer.short }
fun requireAvailableSize(size: Int) {
if (limit - readPosition < size) reset()
}
/*
* 数据获取方法
*/
fun get(): Byte = if (readMode) {
nioBuffer.get()
} else {
readMode()
val value = nioBuffer.get()
resumeWriteMode()
value
}
fun getChar(): Char = if (readMode) {
nioBuffer.char
} else {
readMode()
val value = nioBuffer.char
resumeWriteMode()
value
}
fun getShort(): Short = if (readMode) {
nioBuffer.short
} else {
readMode()
val value = nioBuffer.short
resumeWriteMode()
value
}
fun getInt(): Int = if (readMode) {
nioBuffer.int
} else {
readMode()
val value = nioBuffer.int
resumeWriteMode()
value
}
fun getLong(): Long = if (readMode) {
nioBuffer.long
} else {
readMode()
val value = nioBuffer.long
resumeWriteMode()
value
}
fun getFloat(): Float = if (readMode) {
nioBuffer.float
} else {
readMode()
val value = nioBuffer.float
resumeWriteMode()
value
}
fun getDouble(): Double = if (readMode) {
nioBuffer.double
} else {
readMode()
val value = nioBuffer.double
resumeWriteMode()
value
}
fun getBytes(size: Int = readableSize): ByteArray = if (readMode) {
val bytes = ByteArray(size)
nioBuffer.get(bytes)
readPosition = writePosition
bytes
} else {
readMode()
val bytes = ByteArray(size)
nioBuffer.get(bytes)
readPosition = writePosition
resumeWriteMode()
bytes
}
fun getString(size: Int = readableSize): String = String(getBytes())
fun toString(size: Int): String {
//logE("AdvanceByteBuffer.toString(size: Int): $this")
//val rp = readPosition
val bytes = getBytes(size)
//readPosition = rp
//logE("AdvanceByteBuffer.toString(size: Int): $this")
return String(bytes)
}
fun writeTo(buffer: ByteArray, bufferOffset: Int = 0, size: Int = min(readableSize, buffer.size)): Int {
val readSize = min(readableSize, size)
if (hasArray) {
array.copyInto(buffer, bufferOffset, readOffset, readOffset + readSize)
readPosition += readOffset
reset()
} else { } else {
readMode() readBuffer {
val value = nioBuffer.short it.put(buffer, bufferOffset, readSize)
resumeWriteMode() }
value
} }
return readSize
fun getInt(): Int = if (readMode) { }
nioBuffer.int
fun writeTo(os: OutputStream): Int {
val size = readableSize
if (hasArray) {
os.write(array, arrayOffset + readPosition, size)
readPosition += size
reset()
} else { } else {
readMode() val buffer = ByteArray(1024)
val value = nioBuffer.int readBuffer {
resumeWriteMode() while (it.remaining() > 0) {
value it.put(buffer)
os.write(buffer)
}
}
} }
return size
fun getLong(): Long = if (readMode) { }
nioBuffer.long
fun writeTo(buffer: AdvanceByteBuffer): Int {
val size = min(readableSize, buffer.writeableSize)
if (hasArray && buffer.hasArray) {
array.copyInto(buffer.array, buffer.writeOffset, readOffset, readOffset + size)
buffer.writePosition += size
readPosition += size
reset()
} else { } else {
readMode() readBuffer {
val value = nioBuffer.long buffer.nioBuffer.put(it)
resumeWriteMode() }
value
} }
return size
fun getFloat(): Float = if (readMode) { }
nioBuffer.float
fun toByteArray() = getBytes()
/*
* 数据写入方法
*/
fun put(byte: Byte) {
if (readMode) {
resumeWriteMode()
nioBuffer.put(byte)
readMode()
} else { } else {
readMode() nioBuffer.put(byte)
val value = nioBuffer.float
resumeWriteMode()
value
} }
}
fun getDouble(): Double = if (readMode) {
nioBuffer.double fun put(char: Char) {
if (readMode) {
resumeWriteMode()
nioBuffer.putChar(char)
readMode()
} else { } else {
readMode() nioBuffer.putChar(char)
val value = nioBuffer.double
resumeWriteMode()
value
} }
}
fun getBytes(): ByteArray = if (readMode) { fun put(short: Short) {
val bytes = ByteArray(readableSize) if (readMode) {
nioBuffer.get(bytes) resumeWriteMode()
readPosition = writePosition nioBuffer.putShort(short)
bytes readMode()
} else { } else {
readMode() nioBuffer.putShort(short)
val bytes = ByteArray(readableSize)
nioBuffer.get(bytes)
readPosition = writePosition
resumeWriteMode()
bytes
} }
}
fun getString(size: Int = readableSize): String = String(getBytes()) fun put(int: Int) {
fun toString(size: Int): String { if (readMode) {
val rp = readPosition resumeWriteMode()
val bytes = getBytes() nioBuffer.putInt(int)
readPosition = rp readMode()
return String(bytes) } else {
nioBuffer.putInt(int)
} }
}
fun writeTo(buffer: ByteArray, bufferOffset: Int = 0, size: Int = min(readableSize, buffer.size)): Int {
val readSize = min(readableSize, size) fun put(long: Long) {
if (hasArray) { if (readMode) {
array.copyInto(buffer, bufferOffset, readOffset, readOffset + readSize) resumeWriteMode()
readPosition += readOffset nioBuffer.putLong(long)
reset() readMode()
} else { } else {
readBuffer { nioBuffer.putLong(long)
it.put(buffer, bufferOffset, readSize)
}
}
return readSize
} }
}
fun writeTo(os: OutputStream): Int {
val size = readableSize fun put(float: Float) {
if (hasArray) { if (readMode) {
os.write(array, arrayOffset + readPosition, size) resumeWriteMode()
readPosition += size nioBuffer.putFloat(float)
reset() readMode()
} else { } else {
val buffer = ByteArray(1024) nioBuffer.putFloat(float)
readBuffer {
while (it.remaining() > 0) {
it.put(buffer)
os.write(buffer)
}
}
}
return size
} }
}
fun writeTo(buffer: AdvanceByteBuffer): Int {
val size = min(readableSize, buffer.writeableSize) fun put(double: Double) {
if (hasArray && buffer.hasArray) { if (readMode) {
array.copyInto(buffer.array, buffer.writeOffset, readOffset, readOffset + size) resumeWriteMode()
buffer.writePosition += size nioBuffer.putDouble(double)
readPosition += size readMode()
reset() } else {
} else { nioBuffer.putDouble(double)
readBuffer {
buffer.nioBuffer.put(it)
}
}
return size
} }
}
fun toByteArray() = getBytes()
fun put(str: String) {
put(str.toByteArray())
/* }
* 数据写入方法
*/ fun put(byteArray: ByteArray, startIndex: Int = 0, endIndex: Int = byteArray.size - startIndex) {
if (readMode) {
fun put(byte: Byte) { resumeWriteMode()
if (readMode) { nioBuffer.put(byteArray, startIndex, endIndex - startIndex)
resumeWriteMode() readMode()
nioBuffer.put(byte) } else {
readMode() nioBuffer.put(byteArray, startIndex, endIndex - startIndex)
} else {
nioBuffer.put(byte)
}
} }
}
fun put(char: Char) {
if (readMode) { fun put(array: CharArray, index: Int = 0, size: Int = array.size - index) {
resumeWriteMode() if (readMode) {
nioBuffer.putChar(char) resumeWriteMode()
readMode() array.forEachIndex(index, index + size - 1, this::put)
} else { readMode()
nioBuffer.putChar(char) } else {
} array.forEachIndex(index, index + size - 1, this::put)
} }
}
fun put(short: Short) {
if (readMode) { fun put(array: ShortArray, index: Int = 0, size: Int = array.size - index) {
resumeWriteMode() if (readMode) {
nioBuffer.putShort(short) resumeWriteMode()
readMode() array.forEachIndex(index, index + size - 1, this::put)
} else { readMode()
nioBuffer.putShort(short) } else {
} array.forEachIndex(index, index + size - 1, this::put)
} }
}
fun put(int: Int) {
if (readMode) { fun put(array: IntArray, index: Int = 0, size: Int = array.size - index) {
resumeWriteMode() if (readMode) {
nioBuffer.putInt(int) resumeWriteMode()
readMode() array.forEachIndex(index, index + size - 1, this::put)
} else { readMode()
nioBuffer.putInt(int) } else {
} array.forEachIndex(index, index + size - 1, this::put)
} }
}
fun put(long: Long) {
if (readMode) { fun put(array: LongArray, index: Int = 0, size: Int = array.size - index) {
resumeWriteMode() if (readMode) {
nioBuffer.putLong(long) resumeWriteMode()
readMode() array.forEachIndex(index, index + size - 1, this::put)
} else { readMode()
nioBuffer.putLong(long) } else {
} array.forEachIndex(index, index + size - 1, this::put)
} }
}
fun put(float: Float) {
if (readMode) { fun put(array: FloatArray, index: Int = 0, size: Int = array.size - index) {
resumeWriteMode() if (readMode) {
nioBuffer.putFloat(float) resumeWriteMode()
readMode() array.forEachIndex(index, index + size - 1, this::put)
} else { readMode()
nioBuffer.putFloat(float) } else {
} array.forEachIndex(index, index + size - 1, this::put)
} }
}
fun put(double: Double) {
if (readMode) { fun put(array: DoubleArray, index: Int = 0, size: Int = array.size - index) {
resumeWriteMode() if (readMode) {
nioBuffer.putDouble(double) resumeWriteMode()
readMode() array.forEachIndex(index, index + size - 1, this::put)
} else { readMode()
nioBuffer.putDouble(double) } else {
} array.forEachIndex(index, index + size - 1, this::put)
} }
}
fun put(str: String) {
if (readMode) { fun peekString(size: Int = readableSize): String {
resumeWriteMode() val readP = readPosition
nioBuffer.put(str.toByteArray()) val str = getString(size)
readMode() readPosition = readP
} else { return str
nioBuffer.put(str.toByteArray()) }
}
} fun <T> readBuffer(action: (nioBuffer: ByteBuffer) -> T): T = readNioBuffer(action)
fun <T> writeBuffer(action: (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer(action)
fun put(byteArray: ByteArray, startIndex: Int = 0, endIndex: Int = byteArray.size) {
if (readMode) { suspend fun <T> readSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = readNioBuffer { action(it) }
resumeWriteMode() suspend fun <T> writeSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer { action(it) }
nioBuffer.put(byteArray, startIndex, endIndex - startIndex)
readMode() fun split(from: Int = readPosition, to: Int = writePosition): AdvanceByteBuffer {
} else { return if (hasArray) {
nioBuffer.put(byteArray, startIndex, endIndex - startIndex) ByteArrayAdvanceByteBuffer(array, arrayOffset + readPosition, to - from)
} } else {
} throw NotImplementedException()
fun put(array: CharArray, index: Int = 0, size: Int = array.size - index) {
if (readMode) {
resumeWriteMode()
array.forEachIndex(index, index + size - 1, this::put)
readMode()
} else {
array.forEachIndex(index, index + size - 1, this::put)
}
}
fun put(array: ShortArray, index: Int = 0, size: Int = array.size - index) {
if (readMode) {
resumeWriteMode()
array.forEachIndex(index, index + size - 1, this::put)
readMode()
} else {
array.forEachIndex(index, index + size - 1, this::put)
}
}
fun put(array: IntArray, index: Int = 0, size: Int = array.size - index) {
if (readMode) {
resumeWriteMode()
array.forEachIndex(index, index + size - 1, this::put)
readMode()
} else {
array.forEachIndex(index, index + size - 1, this::put)
}
}
fun put(array: LongArray, index: Int = 0, size: Int = array.size - index) {
if (readMode) {
resumeWriteMode()
array.forEachIndex(index, index + size - 1, this::put)
readMode()
} else {
array.forEachIndex(index, index + size - 1, this::put)
}
}
fun put(array: FloatArray, index: Int = 0, size: Int = array.size - index) {
if (readMode) {
resumeWriteMode()
array.forEachIndex(index, index + size - 1, this::put)
readMode()
} else {
array.forEachIndex(index, index + size - 1, this::put)
}
}
fun put(array: DoubleArray, index: Int = 0, size: Int = array.size - index) {
if (readMode) {
resumeWriteMode()
array.forEachIndex(index, index + size - 1, this::put)
readMode()
} else {
array.forEachIndex(index, index + size - 1, this::put)
}
}
fun peekString(size: Int = readableSize): String {
val readP = readPosition
val str = getString(size)
readPosition = readP
return str
}
fun <T> readBuffer(action: (nioBuffer: ByteBuffer) -> T): T = readNioBuffer(action)
fun <T> writeBuffer(action: (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer(action)
suspend fun <T> readSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = readNioBuffer { action(it) }
suspend fun <T> writeSuspendBuffer(action: suspend (nioBuffer: ByteBuffer) -> T): T = writeNioBuffer { action(it) }
fun split(from: Int = readPosition, to: Int = writePosition): AdvanceByteBuffer {
return if (hasArray) {
ByteArrayAdvanceByteBuffer(array, arrayOffset + readPosition, to - from)
} else {
throw NotImplementedException()
}
} }
}
override fun toString(): String
} }
inline fun <T> AdvanceByteBuffer.readNioBuffer(action: (nioBuffer: ByteBuffer) -> T): T { inline fun <T> AdvanceByteBuffer.readNioBuffer(action: (nioBuffer: ByteBuffer) -> T): T {
readMode() val readMode = this.readMode
val buffer = nioBuffer readMode()
val position = nioBuffer.position() val buffer = nioBuffer
return try { val position = readPosition
action(buffer) val bufferPosition = nioBuffer.position()
} finally { return try {
resumeWriteMode(buffer.position() - position) //logE(buffer.toString())
action(buffer)
} finally {
if (!readMode) {
resumeWriteMode(buffer.position() - position)
readPosition = position + (buffer.position() - bufferPosition)
} }
}
} }
inline fun <T> AdvanceByteBuffer.writeNioBuffer(action: (nioBuffer: ByteBuffer) -> T): T { inline fun <T> AdvanceByteBuffer.writeNioBuffer(action: (nioBuffer: ByteBuffer) -> T): T {
val buffer = nioBuffer val readMode = readMode
val position = writePosition resumeWriteMode()
val bufferPosition = nioBuffer.position() val buffer = nioBuffer
return try { val position = writePosition
action(buffer) val bufferPosition = nioBuffer.position()
} finally { return try {
writePosition = position + (buffer.position() - bufferPosition) action(buffer)
} finally {
if (readMode) {
writePosition = position + (buffer.position() - bufferPosition)
readMode()
} }
}
} }

View File

@ -5,150 +5,157 @@ import java.io.OutputStream
import java.nio.ByteBuffer import java.nio.ByteBuffer
class ByteArrayAdvanceByteBuffer( class ByteArrayAdvanceByteBuffer(
override val array: ByteArray, override val array: ByteArray,
val offset: Int = 0, val offset: Int = 0,
override val size: Int = array.size - offset, override val size: Int = array.size - offset,
override var readPosition: Int = 0, override var readPosition: Int = 0,
override var writePosition: Int = size override var writePosition: Int = size
) : AdvanceByteBuffer { ) : AdvanceByteBuffer {
constructor(size: Int) : this(ByteArray(size), 0, size, 0, 0) constructor(size: Int) : this(ByteArray(size), 0, size, 0, 0)
override val hasArray: Boolean get() = true override val hasArray: Boolean get() = true
override var readOnly: Boolean = false override var readOnly: Boolean = false
override val nioBuffer: ByteBuffer override val nioBuffer: ByteBuffer
get() = if (readMode) readByteBuffer get() = if (readMode) readByteBuffer
else writeByteBuffer else writeByteBuffer
override var limit: Int = size override var limit: Int = size
override val capacity: Int get() = size override val capacity: Int get() = size
override val arrayOffset: Int get() = offset override val arrayOffset: Int get() = offset
override val available: Int override val available: Int
get() = readableSize get() = readableSize
override val writeableSize: Int get() = limit - writePosition override val writeableSize: Int get() = limit - writePosition
override var readMode: Boolean = false override var readMode: Boolean = false
override fun readMode() { override fun readMode() {
readMode = true readMode = true
} }
override fun resumeWriteMode(usedSize: Int) { override fun resumeWriteMode(usedSize: Int) {
readPosition += usedSize readPosition += usedSize
readMode = false readMode = false
} }
override fun writeTo(os: OutputStream): Int { override fun writeTo(os: OutputStream): Int {
val size = readableSize val size = readableSize
os.write(array, readOffset, size) os.write(array, readOffset, size)
reset() reset()
return size return size
} }
override val readOffset get() = offset + readPosition override val readOffset get() = offset + readPosition
override val writeOffset get() = offset + writePosition override val writeOffset get() = offset + writePosition
val readByteBuffer get() = HeapByteBuffer.wrap(array, offset + readPosition, writePosition - readPosition) val readByteBuffer get() = HeapByteBuffer.wrap(array, offset + readPosition, writePosition - readPosition)
val writeByteBuffer get() = HeapByteBuffer.wrap(array, offset + writePosition, limit - writePosition) val writeByteBuffer get() = HeapByteBuffer.wrap(array, offset + writePosition, limit - writePosition)
override val readableSize get() = writePosition - readPosition override val readableSize get() = writePosition - readPosition
val position get() = "ArrayByteBuffer(size=$size, writePosition=$writePosition, readPosition=$readPosition)" val position get() = "ArrayByteBuffer(size=$size, writePosition=$writePosition, readPosition=$readPosition)"
/* /*
* 位置控制方法 * 位置控制方法
*/ */
override fun clear() { override fun clear() {
writePosition = 0 writePosition = 0
readPosition = 0 readPosition = 0
} }
override fun reset() { override fun reset() {
array.copyInto(array, offset, readOffset, offset + writePosition) array.copyInto(array, offset, readOffset, offset + writePosition)
writePosition = readableSize writePosition = readableSize
readPosition = 0 readPosition = 0
} }
override fun reset(outputStream: OutputStream) { override fun reset(outputStream: OutputStream) {
outputStream.write(array, readOffset, offset + writePosition) outputStream.write(array, readOffset, offset + writePosition)
writePosition = 0 writePosition = 0
readPosition = 0 readPosition = 0
} }
override fun needReadSize(size: Int) { override fun needReadSize(size: Int) {
if (readableSize < size) throw OutOfBufferException() if (readableSize < size) throw OutOfBufferException()
} }
override fun take(size: Int): Int { override fun take(size: Int): Int {
needReadSize(size) needReadSize(size)
val offset = readOffset val offset = readOffset
readPosition += size readPosition += size
return offset return offset
} }
override fun useReadSize(size: Int): Int { override fun useReadSize(size: Int): Int {
needReadSize(size) needReadSize(size)
readPosition += size readPosition += size
return size return size
} }
override fun push(size: Int): Int { override fun push(size: Int): Int {
val offset = writeOffset val offset = writeOffset
writePosition += size writePosition += size
return offset return offset
} }
override fun readAllSize() = useReadSize(readableSize) override fun readAllSize() = useReadSize(readableSize)
override fun takeAll() = take(readableSize) override fun takeAll() = take(readableSize)
/* /*
* 数据获取方法 * 数据获取方法
*/ */
override fun get() = array[take(1)] override fun get() = array[take(1)]
override fun getChar() = array.toChar(take(2)) override fun getChar() = array.toChar(take(2))
override fun getShort() = array.toShort(take(2)) override fun getShort() = array.toShort(take(2))
override fun getInt() = array.toInt(take(4)) override fun getInt() = array.toInt(take(4))
override fun getLong() = array.toLong(take(8)) override fun getLong() = array.toLong(take(8))
override fun getFloat() = array.toFloat(take(4)) override fun getFloat() = array.toFloat(take(4))
override fun getDouble() = array.toDouble(take(8)) override fun getDouble() = array.toDouble(take(8))
override fun getBytes() = array.copyOfRange(readPosition, readAllSize()) override fun getBytes(size: Int): ByteArray {
override fun getString(size: Int) = String(array, readPosition, useReadSize(size)) val readMode = readMode
readMode()
override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int { val array = array.copyOfRange(readPosition, useReadSize(size))
array.copyInto(buffer, bufferOffset, offset, useReadSize(size)) if (!readMode) resumeWriteMode(size)
return size return array
} }
override fun toByteArray() = getBytes() override fun getString(size: Int) = String(array, readPosition, useReadSize(size))
override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int {
/* array.copyInto(buffer, bufferOffset, offset, useReadSize(size))
* 数据写入方法 return size
*/ }
override fun put(byte: Byte) { override fun toByteArray() = getBytes()
array.put(byte, push(1))
}
/*
override fun put(char: Char) = array.put(char, push(2)) * 数据写入方法
override fun put(short: Short) = array.put(short, push(2)) */
override fun put(int: Int) = array.put(int, push(4))
override fun put(long: Long) = array.put(long, push(8)) override fun put(byte: Byte) {
override fun put(float: Float) = array.put(float, push(4)) array.put(byte, push(1))
override fun put(double: Double) = array.put(double, push(8)) }
override fun put(str: String) = put(str.toByteArray())
override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { override fun put(char: Char) = array.put(char, push(2))
byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex) override fun put(short: Short) = array.put(short, push(2))
} override fun put(int: Int) = array.put(int, push(4))
override fun put(long: Long) = array.put(long, push(8))
override fun toString(): String { override fun put(float: Float) = array.put(float, push(4))
//return String(array, readOffset, readableSize) override fun put(double: Double) = array.put(double, push(8))
return "ByteArrayAdvanceByteBuffer(size=$size, readPosition=$readPosition, writePosition=$writePosition)" override fun put(str: String) = put(str.toByteArray())
} override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) {
byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex)
/** }
* 缓冲区用完异常
*/ override fun toString(): String {
class OutOfBufferException : Exception() //return String(array, readOffset, readableSize)
return "ByteArrayAdvanceByteBuffer(size=$size, readPosition=$readPosition, writePosition=$writePosition)"
}
/**
* 缓冲区用完异常
*/
class OutOfBufferException : Exception()
} }

View File

@ -1,53 +1,68 @@
package cn.tursom.core.bytebuffer package cn.tursom.core.bytebuffer
import cn.tursom.core.logE
import java.nio.ByteBuffer import java.nio.ByteBuffer
class DirectNioAdvanceByteBuffer(val buffer: ByteBuffer) : AdvanceByteBuffer { class DirectNioAdvanceByteBuffer(val buffer: ByteBuffer) : AdvanceByteBuffer {
override val nioBuffer: ByteBuffer get() = buffer override val nioBuffer: ByteBuffer get() = buffer
override val readOnly: Boolean get() = buffer.isReadOnly override val readOnly: Boolean get() = buffer.isReadOnly
override var writePosition: Int = buffer.position() var writeMark = 0
get() = field override var writePosition: Int
set(value) { get() {
if (!readMode) buffer.position(value) return if (readMode) writeMark
field = value else buffer.position()
} }
override var limit: Int = buffer.limit() set(value) {
get() = if (!readMode) buffer.limit() else field if (!readMode) buffer.position(value)
set(value) { else buffer.limit(value)
if (!readMode) buffer.limit(value) }
field = value override var limit: Int = buffer.limit()
} get() = if (!readMode) buffer.limit() else field
override val capacity: Int get() = buffer.capacity() set(value) {
if (!readMode) buffer.limit(value)
override val hasArray: Boolean get() = false field = value
override val array: ByteArray get() = buffer.array() }
override val arrayOffset: Int = 0 override val capacity: Int get() = buffer.capacity()
override var readPosition: Int = 0
get() = if (readMode) buffer.position() else field override val hasArray: Boolean get() = false
set(value) { override val array: ByteArray get() = buffer.array()
if (readMode) buffer.position(value) override val arrayOffset: Int = 0
field = value override var readPosition: Int = 0
} get() = if (readMode) buffer.position() else field
override val readableSize: Int get() = if (readMode) buffer.remaining() else writePosition - readPosition set(value) {
override val size: Int get() = buffer.capacity() if (readMode) buffer.position(value)
override var readMode: Boolean = false field = value
}
override fun readMode() { override val readableSize: Int get() = if (readMode) buffer.remaining() else writePosition - readPosition
if (!readMode) { override val size: Int get() = buffer.capacity()
readMode = true override var readMode: Boolean = false
buffer.flip()
} override fun readMode() {
} if (!readMode) {
writeMark = writePosition
override fun resumeWriteMode(usedSize: Int) { //logE("readMode() $this $writeMark $writePosition ${buffer.position()}")
if (readMode) { readMode = true
readMode = false buffer.flip()
buffer.limit(capacity) buffer.position(readPosition)
buffer.position(writePosition) //logE("readMode() $this $writeMark $writePosition ${buffer.position()}")
} }
} }
override fun clear() { override fun resumeWriteMode(usedSize: Int) {
buffer.clear() if (readMode) {
} readMode = false
buffer.limit(capacity)
buffer.position(writeMark)
}
}
override fun clear() {
resumeWriteMode()
buffer.clear()
readPosition = 0
}
override fun toString(): String {
return "DirectNioAdvanceByteBuffer(buffer=$buffer, readMode=$readMode, readPosition=$readPosition, writePosition=$writePosition)"
}
} }

View File

@ -6,160 +6,171 @@ import java.nio.ByteBuffer
@Suppress("unused", "MemberVisibilityCanBePrivate") @Suppress("unused", "MemberVisibilityCanBePrivate")
class HeapNioAdvanceByteBuffer(val buffer: ByteBuffer) : AdvanceByteBuffer { class HeapNioAdvanceByteBuffer(val buffer: ByteBuffer) : AdvanceByteBuffer {
constructor(size: Int) : this(ByteBuffer.allocate(size)) constructor(size: Int) : this(ByteBuffer.allocate(size))
constructor(buffer: ByteArray, offset: Int = 0, size: Int = buffer.size - offset) : this(HeapByteBuffer.wrap(buffer, offset, size)) constructor(buffer: ByteArray, offset: Int = 0, size: Int = buffer.size - offset) : this(HeapByteBuffer.wrap(buffer, offset, size))
override val nioBuffer: ByteBuffer get() = buffer override val nioBuffer: ByteBuffer get() = buffer
override val hasArray: Boolean get() = buffer.hasArray() override val hasArray: Boolean get() = buffer.hasArray()
override val readOnly: Boolean get() = buffer.isReadOnly override val readOnly: Boolean get() = buffer.isReadOnly
private var _readMode = false private var _readMode = false
var readMark = 0 var readMark = 0
var writeMark = 0 var writeMark = 0
/** /**
* 各种位置变量 * 各种位置变量
*/ */
override var writePosition override var writePosition
get() = buffer.position() get() = buffer.position()
set(value) { set(value) {
buffer.position(value) buffer.position(value)
} }
override var limit override var limit
get() = buffer.limit() get() = buffer.limit()
set(value) { set(value) {
buffer.limit(value) buffer.limit(value)
} }
override val capacity: Int = buffer.capacity() override val capacity: Int = buffer.capacity()
override val array: ByteArray get() = buffer.array() override val array: ByteArray get() = buffer.array()
override val arrayOffset: Int get() = buffer.arrayOffset() override val arrayOffset: Int get() = buffer.arrayOffset()
override var readPosition: Int = 0 override var readPosition: Int = 0
override val readOffset get() = arrayOffset + readPosition override val readOffset get() = arrayOffset + readPosition
override val readableSize get() = writePosition - readPosition override val readableSize get() = writePosition - readPosition
override val available get() = readableSize override val available get() = readableSize
override val writeOffset get() = arrayOffset + writePosition override val writeOffset get() = arrayOffset + writePosition
override val writeableSize get() = limit - writePosition override val writeableSize get() = limit - writePosition
override val size = buffer.capacity() override val size = buffer.capacity()
override val readMode get() = _readMode override val readMode get() = _readMode
/* /*
* 位置控制方法 * 位置控制方法
*/ */
override fun readMode() { override fun readMode() {
writeMark = buffer.position() writeMark = buffer.position()
readMark = readPosition readMark = readPosition
buffer.limit(buffer.position()) buffer.limit(buffer.position())
buffer.position(readPosition) buffer.position(readPosition)
_readMode = true _readMode = true
} }
override fun resumeWriteMode(usedSize: Int) { override fun resumeWriteMode(usedSize: Int) {
readPosition = readMark + usedSize readPosition = readMark + usedSize
buffer.limit(buffer.capacity()) buffer.limit(buffer.capacity())
buffer.position(writeMark) buffer.position(writeMark)
_readMode = false _readMode = false
} }
override fun needReadSize(size: Int) { override fun needReadSize(size: Int) {
if (readableSize < size) throw OutOfBufferException() if (readableSize < size) throw OutOfBufferException()
} }
override fun useReadSize(size: Int): Int { override fun useReadSize(size: Int): Int {
needReadSize(size) needReadSize(size)
readPosition += size readPosition += size
return size return size
} }
override fun take(size: Int): Int { override fun take(size: Int): Int {
needReadSize(size) needReadSize(size)
val offset = readOffset val offset = readOffset
readPosition += size readPosition += size
return offset return offset
} }
override fun push(size: Int): Int { override fun push(size: Int): Int {
val offset = writeOffset val offset = writeOffset
writePosition += size writePosition += size
return offset return offset
} }
override fun readAllSize() = useReadSize(readableSize) override fun readAllSize() = useReadSize(readableSize)
override fun takeAll() = take(readableSize) override fun takeAll() = take(readableSize)
override fun clear() { override fun clear() {
readPosition = 0 readPosition = 0
buffer.clear() buffer.clear()
} }
override fun reset() { override fun reset() {
array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition) array.copyInto(array, arrayOffset, readOffset, arrayOffset + writePosition)
writePosition = readableSize writePosition = readableSize
readPosition = 0 readPosition = 0
} }
override fun reset(outputStream: OutputStream) { override fun reset(outputStream: OutputStream) {
outputStream.write(array, readOffset, arrayOffset + writePosition) outputStream.write(array, readOffset, arrayOffset + writePosition)
writePosition = 0 writePosition = 0
readPosition = 0 readPosition = 0
} }
override fun requireAvailableSize(size: Int) { override fun requireAvailableSize(size: Int) {
if (limit - readPosition < size) reset() if (limit - readPosition < size) reset()
} }
/* /*
* 数据获取方法 * 数据获取方法
*/ */
override fun get() = array[take(1)] override fun get() = array[take(1)]
override fun getChar() = array.toChar(take(2)) override fun getChar() = array.toChar(take(2))
override fun getShort() = array.toShort(take(2)) override fun getShort() = array.toShort(take(2))
override fun getInt() = array.toInt(take(4)) override fun getInt() = array.toInt(take(4))
override fun getLong() = array.toLong(take(8)) override fun getLong() = array.toLong(take(8))
override fun getFloat() = array.toFloat(take(4)) override fun getFloat() = array.toFloat(take(4))
override fun getDouble() = array.toDouble(take(8)) override fun getDouble() = array.toDouble(take(8))
override fun getBytes() = array.copyOfRange(arrayOffset, readAllSize()) override fun getBytes(size: Int): ByteArray {
override fun getString(size: Int) = String(array, readOffset, useReadSize(size)) val readMode = readMode
readMode()
override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int { val array = array.copyOfRange(readPosition, useReadSize(size))
array.copyInto(buffer, bufferOffset, arrayOffset, useReadSize(size)) if (!readMode) resumeWriteMode(size)
return size return array
} }
override fun toByteArray() = getBytes() override fun getString(size: Int) = String(array, readOffset, useReadSize(size))
override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int {
/* array.copyInto(buffer, bufferOffset, arrayOffset, useReadSize(size))
* 数据写入方法 return size
*/ }
override fun put(byte: Byte) { override fun toByteArray() = getBytes()
buffer.put(byte)
}
/*
override fun put(char: Char) = array.put(char, push(2)) * 数据写入方法
override fun put(short: Short) = array.put(short, push(2)) */
override fun put(int: Int) = array.put(int, push(4))
override fun put(long: Long) = array.put(long, push(8)) override fun put(byte: Byte) {
override fun put(float: Float) = array.put(float, push(4)) buffer.put(byte)
override fun put(double: Double) = array.put(double, push(8)) }
override fun put(str: String) = put(str.toByteArray())
override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { override fun put(char: Char) = array.put(char, push(2))
byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex) override fun put(short: Short) = array.put(short, push(2))
} override fun put(int: Int) = array.put(int, push(4))
override fun put(long: Long) = array.put(long, push(8))
override fun split(from: Int, to: Int): AdvanceByteBuffer { override fun put(float: Float) = array.put(float, push(4))
val readMark = readPosition override fun put(double: Double) = array.put(double, push(8))
val writeMark = writePosition override fun put(str: String) = put(str.toByteArray())
buffer.position(readMark) override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) {
buffer.limit(writeMark) byteArray.copyInto(array, push(endIndex - startIndex), startIndex, endIndex)
val slice = HeapNioAdvanceByteBuffer(buffer.slice()) }
readPosition = readMark
writePosition = writeMark override fun split(from: Int, to: Int): AdvanceByteBuffer {
return slice val readMark = readPosition
} val writeMark = writePosition
buffer.position(readMark)
buffer.limit(writeMark)
val slice = HeapNioAdvanceByteBuffer(buffer.slice())
readPosition = readMark
writePosition = writeMark
return slice
}
override fun toString(): String {
return "HeapNioAdvanceByteBuffer(buffer=$buffer, readMode=$_readMode, readMark=$readMark, writeMark=$writeMark, capacity=$capacity, readPosition=$readPosition, size=$size)"
}
} }

View File

@ -1,101 +1,108 @@
package cn.tursom.core.bytebuffer package cn.tursom.core.bytebuffer
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.*
import kotlin.collections.ArrayList
class MultiAdvanceByteBuffer(vararg val buffers: AdvanceByteBuffer) : AdvanceByteBuffer { class MultiAdvanceByteBuffer(vararg val buffers: AdvanceByteBuffer) : AdvanceByteBuffer {
init { init {
resumeWriteMode() resumeWriteMode()
} }
var writeBufferIndex = 0 var writeBufferIndex = 0
var readBufferIndex = 0 var readBufferIndex = 0
val readBuffer get() = buffers[writeBufferIndex] val readBuffer get() = buffers[writeBufferIndex]
val writeBuffer get() = buffers[writeBufferIndex] val writeBuffer get() = buffers[writeBufferIndex]
val operatorBuffer val operatorBuffer
get() = if (readMode) { get() = if (readMode) {
readBuffer readBuffer
} else { } else {
writeBuffer writeBuffer
} }
override val nioBuffers: Array<out ByteBuffer> override val nioBuffers: Array<out ByteBuffer>
get() { get() {
val bufList = ArrayList<ByteBuffer>() val bufList = ArrayList<ByteBuffer>()
buffers.forEach { buffer -> buffers.forEach { buffer ->
if (buffer.bufferCount == 1) { if (buffer.bufferCount == 1) {
bufList.add(buffer.nioBuffer) bufList.add(buffer.nioBuffer)
} else { } else {
buffer.nioBuffers.forEach { buffer.nioBuffers.forEach {
bufList.add(it) bufList.add(it)
} }
} }
} }
return bufList.toTypedArray() return bufList.toTypedArray()
} }
override val hasArray: Boolean get() = false override val hasArray: Boolean get() = false
override val readOnly: Boolean get() = false override val readOnly: Boolean get() = false
override val bufferCount: Int get() = buffers.size override val bufferCount: Int get() = buffers.size
override val nioBuffer: ByteBuffer get() = operatorBuffer.nioBuffer override val nioBuffer: ByteBuffer get() = operatorBuffer.nioBuffer
override var writePosition: Int override var writePosition: Int
get() = operatorBuffer.writePosition get() = operatorBuffer.writePosition
set(value) { set(value) {
operatorBuffer.writePosition = value operatorBuffer.writePosition = value
} }
override var limit: Int override var limit: Int
get() = operatorBuffer.limit get() = operatorBuffer.limit
set(value) { set(value) {
operatorBuffer.limit = value operatorBuffer.limit = value
} }
override val capacity: Int get() = operatorBuffer.capacity override val capacity: Int get() = operatorBuffer.capacity
override val array: ByteArray get() = operatorBuffer.array override val array: ByteArray get() = operatorBuffer.array
override val arrayOffset: Int get() = operatorBuffer.arrayOffset override val arrayOffset: Int get() = operatorBuffer.arrayOffset
override var readPosition: Int override var readPosition: Int
get() = operatorBuffer.readPosition get() = operatorBuffer.readPosition
set(value) { set(value) {
operatorBuffer.readPosition = value operatorBuffer.readPosition = value
} }
override val readOffset: Int get() = operatorBuffer.readOffset override val readOffset: Int get() = operatorBuffer.readOffset
override val readableSize: Int get() = operatorBuffer.readableSize override val readableSize: Int get() = operatorBuffer.readableSize
override val available: Int get() = operatorBuffer.available override val available: Int get() = operatorBuffer.available
override val writeOffset: Int get() = operatorBuffer.writeOffset override val writeOffset: Int get() = operatorBuffer.writeOffset
override val writeableSize: Int get() = operatorBuffer.writeableSize override val writeableSize: Int get() = operatorBuffer.writeableSize
override val size: Int get() = operatorBuffer.size override val size: Int get() = operatorBuffer.size
override var readMode: Boolean = false override var readMode: Boolean = false
override fun readMode() { override fun readMode() {
readMode = true readMode = true
buffers.forEach(AdvanceByteBuffer::readMode) buffers.forEach(AdvanceByteBuffer::readMode)
} }
override fun resumeWriteMode(usedSize: Int) { override fun resumeWriteMode(usedSize: Int) {
readMode = false readMode = false
buffers.forEach { it.resumeWriteMode() } buffers.forEach { it.resumeWriteMode() }
} }
override fun clear() { override fun clear() {
writeBufferIndex = 0 writeBufferIndex = 0
readBufferIndex = 0 readBufferIndex = 0
buffers.forEach { buffer -> buffer.clear() } buffers.forEach { buffer -> buffer.clear() }
} }
override fun get(): Byte = readBuffer.get() override fun get(): Byte = readBuffer.get()
override fun getChar(): Char = readBuffer.getChar() override fun getChar(): Char = readBuffer.getChar()
override fun getShort(): Short = readBuffer.getShort() override fun getShort(): Short = readBuffer.getShort()
override fun getInt(): Int = readBuffer.getInt() override fun getInt(): Int = readBuffer.getInt()
override fun getLong(): Long = readBuffer.getLong() override fun getLong(): Long = readBuffer.getLong()
override fun getFloat(): Float = readBuffer.getFloat() override fun getFloat(): Float = readBuffer.getFloat()
override fun getDouble(): Double = readBuffer.getDouble() override fun getDouble(): Double = readBuffer.getDouble()
override fun getBytes(): ByteArray = readBuffer.getBytes() override fun getBytes(size: Int) = readBuffer.getBytes(size)
override fun getString(size: Int): String = readBuffer.getString(size) override fun getString(size: Int): String = readBuffer.getString(size)
override fun put(byte: Byte) = writeBuffer.put(byte) override fun put(byte: Byte) = writeBuffer.put(byte)
override fun put(char: Char) = writeBuffer.put(char) override fun put(char: Char) = writeBuffer.put(char)
override fun put(short: Short) = writeBuffer.put(short) override fun put(short: Short) = writeBuffer.put(short)
override fun put(int: Int) = writeBuffer.put(int) override fun put(int: Int) = writeBuffer.put(int)
override fun put(long: Long) = writeBuffer.put(long) override fun put(long: Long) = writeBuffer.put(long)
override fun put(float: Float) = writeBuffer.put(float) override fun put(float: Float) = writeBuffer.put(float)
override fun put(double: Double) = writeBuffer.put(double) override fun put(double: Double) = writeBuffer.put(double)
override fun put(str: String) = writeBuffer.put(str) override fun put(str: String) = writeBuffer.put(str)
override fun toString(): String {
return "MultiAdvanceByteBuffer(buffers=${Arrays.toString(buffers)}, writeBufferIndex=$writeBufferIndex, readBufferIndex=$readBufferIndex, readMode=$readMode)"
}
} }

View File

@ -3,8 +3,8 @@ package cn.tursom.core.bytebuffer
import java.nio.ByteBuffer import java.nio.ByteBuffer
class NioAdvanceByteBuffer(val buffer: ByteBuffer) : class NioAdvanceByteBuffer(val buffer: ByteBuffer) :
AdvanceByteBuffer by if (buffer.hasArray()) { AdvanceByteBuffer by if (buffer.hasArray()) {
HeapNioAdvanceByteBuffer(buffer) HeapNioAdvanceByteBuffer(buffer)
} else { } else {
DirectNioAdvanceByteBuffer(buffer) DirectNioAdvanceByteBuffer(buffer)
} }

View File

@ -47,4 +47,8 @@ class DirectMemoryPool(override val blockSize: Int = 1024, override val blockCou
} else { } else {
null null
} }
override fun toString(): String {
return "DirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, bitMap=$bitMap)"
}
} }

View File

@ -48,4 +48,8 @@ class HeapMemoryPool(override val blockSize: Int = 1024, override val blockCount
} else { } else {
null null
} }
override fun toString(): String {
return "HeapMemoryPool(blockSize=$blockSize, blockCount=$blockCount, bitMap=$bitMap)"
}
} }

View File

@ -11,7 +11,7 @@ import java.nio.ByteBuffer
interface MemoryPool { interface MemoryPool {
val blockSize: Int val blockSize: Int
val blockCount: Int val blockCount: Int
fun allocate(): Int fun allocate(): Int
fun free(token: Int) fun free(token: Int)
fun getMemory(token: Int): ByteBuffer? fun getMemory(token: Int): ByteBuffer?
@ -23,6 +23,8 @@ interface MemoryPool {
null null
} }
} }
override fun toString(): String
} }

View File

@ -8,103 +8,105 @@ import kotlin.concurrent.thread
class WheelTimer( class WheelTimer(
val tick: Long = 200, val tick: Long = 200,
val wheelSize: Int = 512 val wheelSize: Int = 512
) : Timer { ) : Timer {
var closed = false var closed = false
val taskQueueArray = Array(wheelSize) { TaskQueue() } val taskQueueArray = Array(wheelSize) { TaskQueue() }
private var position = 0 private var position = 0
override fun exec(timeout: Long, task: () -> Unit): TimerTask { override fun exec(timeout: Long, task: () -> Unit): TimerTask {
val index = ((timeout / tick + position + if (timeout % tick == 0L) 0 else 1) % wheelSize).toInt() val index = ((timeout / tick + position + if (timeout % tick == 0L) 0 else 1) % wheelSize).toInt()
return taskQueueArray[index].offer(task, timeout) return taskQueueArray[index].offer(task, timeout)
} }
init { init {
thread(isDaemon = true, name = "wheelTimerLooper") { thread(isDaemon = true, name = "wheelTimerLooper") {
while (!closed) { val startTime = System.currentTimeMillis()
position %= wheelSize while (!closed) {
position %= wheelSize
val newQueue = TaskQueue()
val taskQueue = taskQueueArray[position] val newQueue = TaskQueue()
taskQueueArray[position] = newQueue val taskQueue = taskQueueArray[position]
taskQueueArray[position] = newQueue
val time = System.currentTimeMillis()
var node = taskQueue.root.next val time = System.currentTimeMillis()
while (node != null) { var node = taskQueue.root.next
node = if (node.isOutTime(time)) { while (node != null) {
val sNode = node node = if (node.isOutTime(time)) {
threadPool.execute { sNode.task() } val sNode = node
node.next threadPool.execute { sNode.task() }
} else { node.next
val next = node.next } else {
newQueue.offer(node) val next = node.next
next newQueue.offer(node)
} next
} }
}
position++
sleep(tick) position++
} val nextSleep = startTime + tick * position - System.currentTimeMillis()
} if (nextSleep > 0) sleep(tick)
} }
}
}
class TaskQueue {
val root: TaskNode = TaskNode(0, {}, null, null)
class TaskQueue {
fun offer(task: () -> Unit, timeout: Long): TaskNode { val root: TaskNode = TaskNode(0, {}, null, null)
synchronized(root) {
val insert = TaskNode(timeout, task, root, root.next) fun offer(task: () -> Unit, timeout: Long): TaskNode {
root.next = insert synchronized(root) {
insert.next?.prev = insert val insert = TaskNode(timeout, task, root, root.next)
return insert root.next = insert
} insert.next?.prev = insert
} return insert
}
fun offer(node: TaskNode): TaskNode { }
synchronized(root) {
node.next = root.next fun offer(node: TaskNode): TaskNode {
node.next = node synchronized(root) {
node.next?.prev = node node.next = root.next
return node node.next = node
} node.next?.prev = node
} return node
}
inner class TaskNode( }
val timeout: Long,
val task: () -> Unit, inner class TaskNode(
var prev: TaskNode?, val timeout: Long,
var next: TaskNode? val task: () -> Unit,
) : TimerTask { var prev: TaskNode?,
val outTime = System.currentTimeMillis() + timeout var next: TaskNode?
val isOutTime get() = System.currentTimeMillis() > outTime ) : TimerTask {
val outTime = System.currentTimeMillis() + timeout
fun isOutTime(time: Long) = time > outTime val isOutTime get() = System.currentTimeMillis() > outTime
override fun run() = task() fun isOutTime(time: Long) = time > outTime
override fun cancel() { override fun run() = task()
synchronized(root) {
prev?.next = next override fun cancel() {
next?.prev = prev synchronized(root) {
} prev?.next = next
} next?.prev = prev
} }
} }
}
companion object { }
val threadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
object : ThreadFactory { companion object {
var threadNumber = 0 val threadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
override fun newThread(r: Runnable): Thread { object : ThreadFactory {
val thread = Thread(r) var threadNumber = 0
thread.isDaemon = true override fun newThread(r: Runnable): Thread {
thread.name = "wheelTimerWorker-$threadNumber" val thread = Thread(r)
return thread thread.isDaemon = true
} thread.name = "wheelTimerWorker-$threadNumber"
}) return thread
val timer by lazy { WheelTimer(200, 1024) } }
val smoothTimer by lazy { WheelTimer(20, 128) } })
} val timer by lazy { WheelTimer(200, 1024) }
val smoothTimer by lazy { WheelTimer(20, 128) }
}
} }

View File

@ -80,8 +80,8 @@ class NettyAdvanceByteBuffer(val byteBuf: ByteBuf) : AdvanceByteBuffer {
override fun getFloat(): Float = byteBuf.readFloat() override fun getFloat(): Float = byteBuf.readFloat()
override fun getDouble(): Double = byteBuf.readDouble() override fun getDouble(): Double = byteBuf.readDouble()
override fun getBytes(): ByteArray { override fun getBytes(size: Int): ByteArray {
val bytes = ByteArray(byteBuf.readableBytes()) val bytes = ByteArray(size)
byteBuf.readBytes(bytes) byteBuf.readBytes(bytes)
return bytes return bytes
} }
@ -139,5 +139,8 @@ class NettyAdvanceByteBuffer(val byteBuf: ByteBuf) : AdvanceByteBuffer {
override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) { override fun put(byteArray: ByteArray, startIndex: Int, endIndex: Int) {
byteBuf.writeBytes(byteArray, startIndex, endIndex - startIndex) byteBuf.writeBytes(byteArray, startIndex, endIndex - startIndex)
} }
override fun toString(): String {
return "NettyAdvanceByteBuffer(byteBuf=$byteBuf, readMode=$readMode)"
}
} }