diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt b/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt index b24d4d6..7f600f3 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt @@ -33,7 +33,7 @@ interface AsyncChannel : Closeable { suspend fun <T> write(timeout: Long, action: () -> T): T { return operate { - waitWrite() + waitWrite(timeout) action() } } @@ -104,7 +104,11 @@ interface AsyncChannel : Closeable { if (Thread.currentThread() == nioThread.thread) { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) } else { - nioThread.execute { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) } + nioThread.execute { + if (key.isValid) { + key.interestOps(SelectionKey.OP_WRITE) + } + } nioThread.wakeup() } } diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncProtocol.kt b/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncProtocol.kt index 635c353..61046a9 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncProtocol.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncProtocol.kt @@ -1,45 +1,47 @@ package cn.tursom.channel +import cn.tursom.core.assert import cn.tursom.core.timer.TimerTask import cn.tursom.niothread.NioProtocol import cn.tursom.niothread.NioThread import java.nio.channels.SelectionKey +import java.nio.channels.SocketChannel import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException -object AsyncProtocol: NioProtocol { +object AsyncProtocol : NioProtocol { data class Context(val cont: Continuation<Int>, val timeoutTask: TimerTask? = null) data class ConnectContext(val cont: Continuation<SelectionKey>, val timeoutTask: TimerTask? = null) override fun handleConnect(key: SelectionKey, nioThread: NioThread) { key.interestOps(0) - val context = key.attachment() as ConnectContext? ?: return - context.timeoutTask?.cancel() - context.cont.resume(key) + key.channel().assert<SocketChannel> { finishConnect() } + key.attachment().assert<ConnectContext> { + timeoutTask?.cancel() + cont.resume(key) + } } override fun handleRead(key: SelectionKey, nioThread: NioThread) { key.interestOps(0) - //logE("read ready") - val context = key.attachment() as Context? ?: return - context.timeoutTask?.cancel() - context.cont.resume(0) + key.attachment().assert<Context> { + timeoutTask?.cancel() + cont.resume(0) + } } override fun handleWrite(key: SelectionKey, nioThread: NioThread) { key.interestOps(0) - val context = key.attachment() as Context? ?: return - context.timeoutTask?.cancel() - context.cont.resume(0) + key.attachment().assert<Context> { + timeoutTask?.cancel() + cont.resume(0) + } } override fun exceptionCause(key: SelectionKey, nioThread: NioThread, e: Throwable) { key.interestOps(0) - val context = key.attachment() as Context? - if (context != null) - context.cont.resumeWithException(e) - else { + if (!key.attachment().assert<Context> { cont.resumeWithException(e) }) { key.cancel() key.channel().close() e.printStackTrace() diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/WorkerLoopNioThread.kt b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/WorkerLoopNioThread.kt index fd101cb..ea5eaf4 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/niothread/WorkerLoopNioThread.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/niothread/WorkerLoopNioThread.kt @@ -7,7 +7,7 @@ import java.nio.channels.Selector @Suppress("MemberVisibilityCanBePrivate", "CanBeParameter") class WorkerLoopNioThread( val threadName: String = "nioLoopThread", - override val selector: Selector = Selector.open(), + override var selector: Selector = Selector.open(), override val daemon: Boolean = false, override val timeout: Long = 3000, override val workLoop: (thread: NioThread, key: SelectionKey) -> Unit diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt index 64599b0..4eb7448 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt @@ -44,7 +44,7 @@ interface AsyncSocket : AsyncChannel { */ override suspend fun read(pool: MemoryPool, timeout: Long): ByteBuffer = read(timeout) { val buffer = pool.get() - if (channel.read(buffer) < 0) throw SocketException() + if (channel.read(buffer) < 0) throw SocketException("socket closed") buffer } } \ No newline at end of file diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt index e4eb3c4..a51f7f2 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/NioServer.kt @@ -26,7 +26,7 @@ open class NioServer( try { socket.handler() } catch (e: Exception) { - e.printStackTrace() + Exception(e).printStackTrace() } finally { try { socket.close() diff --git a/src/main/kotlin/cn/tursom/core/Tools.kt b/src/main/kotlin/cn/tursom/core/Tools.kt index 40083f0..52994a0 100644 --- a/src/main/kotlin/cn/tursom/core/Tools.kt +++ b/src/main/kotlin/cn/tursom/core/Tools.kt @@ -340,4 +340,13 @@ fun ByteArray.deflate(): ByteArray { // //fun ByteArray.undeflate(): ByteArray { // return DeflaterInputStream(ByteArrayInputStream(this)).readBytes() -//} \ No newline at end of file +//} + +inline fun <reified T : Any?> Any.assert(action: T.() -> Unit): Boolean { + return if (this is T) { + action() + true + } else { + false + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt index e650864..5789c87 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt @@ -56,12 +56,12 @@ interface ByteBuffer : Closeable { fun readBuffer(): java.nio.ByteBuffer fun finishRead(buffer: java.nio.ByteBuffer) { - readPosition = buffer.position() + readPosition += buffer.position() } fun writeBuffer(): java.nio.ByteBuffer fun finishWrite(buffer: java.nio.ByteBuffer) { - writePosition = buffer.position() + writePosition += buffer.position() } fun reset() diff --git a/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt b/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt index bf60f18..370d041 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt @@ -34,16 +34,16 @@ inline fun <T> ByteBuffer.write(block: (java.nio.ByteBuffer) -> T): T { } } -fun ScatteringByteChannel.read(buffer: ByteBuffer): Int { - return if (buffer is MultipleByteBuffer) { +fun ReadableByteChannel.read(buffer: ByteBuffer): Int { + return if (buffer is MultipleByteBuffer && this is ScatteringByteChannel) { buffer.writeBuffers { read(it) }.toInt() } else { buffer.write { read(it) } } } -fun GatheringByteChannel.write(buffer: ByteBuffer): Int { - return if (buffer is MultipleByteBuffer) { +fun WritableByteChannel.write(buffer: ByteBuffer): Int { + return if (buffer is MultipleByteBuffer && this is GatheringByteChannel) { buffer.readBuffers { write(it) }.toInt() } else { buffer.read { write(it) } diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt index 57030a3..3fa0361 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt @@ -21,7 +21,7 @@ class DirectByteBuffer( buffer.limit(writePosition) if (buffer.position() != readPosition) buffer.position(readPosition) - return buffer + return buffer.slice() } override fun writeBuffer(): java.nio.ByteBuffer { @@ -29,7 +29,7 @@ class DirectByteBuffer( buffer.limit(capacity) if (buffer.position() != writePosition) buffer.position(writePosition) - return buffer + return buffer.slice() } override fun reset() { diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt index 1051fd4..2eae40a 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt @@ -29,7 +29,7 @@ class HeapByteBuffer( buffer.limit(writePosition) if (buffer.position() != readPosition) buffer.position(readPosition) - return buffer + return buffer.slice() } override fun writeBuffer(): java.nio.ByteBuffer { @@ -37,7 +37,7 @@ class HeapByteBuffer( buffer.limit(capacity) if (buffer.position() != writePosition) buffer.position(writePosition) - return buffer + return buffer.slice() } override fun reset() { diff --git a/utils/src/main/kotlin/cn/tursom/utils/bytebuffer/NettyByteBuffer.kt b/utils/src/main/kotlin/cn/tursom/utils/bytebuffer/NettyByteBuffer.kt index 47a2bd9..785a5ce 100644 --- a/utils/src/main/kotlin/cn/tursom/utils/bytebuffer/NettyByteBuffer.kt +++ b/utils/src/main/kotlin/cn/tursom/utils/bytebuffer/NettyByteBuffer.kt @@ -33,11 +33,11 @@ class NettyByteBuffer( override val resized: Boolean get() = false override fun readBuffer(): java.nio.ByteBuffer { - return byteBuf.internalNioBuffer(readPosition, readable) + return byteBuf.internalNioBuffer(readPosition, readable).slice() } override fun writeBuffer(): java.nio.ByteBuffer { - return byteBuf.internalNioBuffer(writePosition, writeable) + return byteBuf.internalNioBuffer(writePosition, writeable).slice() } override val readOffset: Int get() = byteBuf.arrayOffset() + byteBuf.readerIndex()