修复空轮询bug

This commit is contained in:
tursom 2020-05-17 00:39:27 +08:00
parent 0c08352f8c
commit 09699dc619
11 changed files with 48 additions and 33 deletions

View File

@ -33,7 +33,7 @@ interface AsyncChannel : Closeable {
suspend fun <T> write(timeout: Long, action: () -> T): T {
return operate {
waitWrite()
waitWrite(timeout)
action()
}
}
@ -104,7 +104,11 @@ interface AsyncChannel : Closeable {
if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
} else {
nioThread.execute { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) }
nioThread.execute {
if (key.isValid) {
key.interestOps(SelectionKey.OP_WRITE)
}
}
nioThread.wakeup()
}
}

View File

@ -1,45 +1,47 @@
package cn.tursom.channel
import cn.tursom.core.assert
import cn.tursom.core.timer.TimerTask
import cn.tursom.niothread.NioProtocol
import cn.tursom.niothread.NioThread
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
object AsyncProtocol: NioProtocol {
object AsyncProtocol : NioProtocol {
data class Context(val cont: Continuation<Int>, val timeoutTask: TimerTask? = null)
data class ConnectContext(val cont: Continuation<SelectionKey>, val timeoutTask: TimerTask? = null)
override fun handleConnect(key: SelectionKey, nioThread: NioThread) {
key.interestOps(0)
val context = key.attachment() as ConnectContext? ?: return
context.timeoutTask?.cancel()
context.cont.resume(key)
key.channel().assert<SocketChannel> { finishConnect() }
key.attachment().assert<ConnectContext> {
timeoutTask?.cancel()
cont.resume(key)
}
}
override fun handleRead(key: SelectionKey, nioThread: NioThread) {
key.interestOps(0)
//logE("read ready")
val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel()
context.cont.resume(0)
key.attachment().assert<Context> {
timeoutTask?.cancel()
cont.resume(0)
}
}
override fun handleWrite(key: SelectionKey, nioThread: NioThread) {
key.interestOps(0)
val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel()
context.cont.resume(0)
key.attachment().assert<Context> {
timeoutTask?.cancel()
cont.resume(0)
}
}
override fun exceptionCause(key: SelectionKey, nioThread: NioThread, e: Throwable) {
key.interestOps(0)
val context = key.attachment() as Context?
if (context != null)
context.cont.resumeWithException(e)
else {
if (!key.attachment().assert<Context> { cont.resumeWithException(e) }) {
key.cancel()
key.channel().close()
e.printStackTrace()

View File

@ -7,7 +7,7 @@ import java.nio.channels.Selector
@Suppress("MemberVisibilityCanBePrivate", "CanBeParameter")
class WorkerLoopNioThread(
val threadName: String = "nioLoopThread",
override val selector: Selector = Selector.open(),
override var selector: Selector = Selector.open(),
override val daemon: Boolean = false,
override val timeout: Long = 3000,
override val workLoop: (thread: NioThread, key: SelectionKey) -> Unit

View File

@ -44,7 +44,7 @@ interface AsyncSocket : AsyncChannel {
*/
override suspend fun read(pool: MemoryPool, timeout: Long): ByteBuffer = read(timeout) {
val buffer = pool.get()
if (channel.read(buffer) < 0) throw SocketException()
if (channel.read(buffer) < 0) throw SocketException("socket closed")
buffer
}
}

View File

@ -26,7 +26,7 @@ open class NioServer(
try {
socket.handler()
} catch (e: Exception) {
e.printStackTrace()
Exception(e).printStackTrace()
} finally {
try {
socket.close()

View File

@ -340,4 +340,13 @@ fun ByteArray.deflate(): ByteArray {
//
//fun ByteArray.undeflate(): ByteArray {
// return DeflaterInputStream(ByteArrayInputStream(this)).readBytes()
//}
//}
inline fun <reified T : Any?> Any.assert(action: T.() -> Unit): Boolean {
return if (this is T) {
action()
true
} else {
false
}
}

View File

@ -56,12 +56,12 @@ interface ByteBuffer : Closeable {
fun readBuffer(): java.nio.ByteBuffer
fun finishRead(buffer: java.nio.ByteBuffer) {
readPosition = buffer.position()
readPosition += buffer.position()
}
fun writeBuffer(): java.nio.ByteBuffer
fun finishWrite(buffer: java.nio.ByteBuffer) {
writePosition = buffer.position()
writePosition += buffer.position()
}
fun reset()

View File

@ -34,16 +34,16 @@ inline fun <T> ByteBuffer.write(block: (java.nio.ByteBuffer) -> T): T {
}
}
fun ScatteringByteChannel.read(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer) {
fun ReadableByteChannel.read(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is ScatteringByteChannel) {
buffer.writeBuffers { read(it) }.toInt()
} else {
buffer.write { read(it) }
}
}
fun GatheringByteChannel.write(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer) {
fun WritableByteChannel.write(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is GatheringByteChannel) {
buffer.readBuffers { write(it) }.toInt()
} else {
buffer.read { write(it) }

View File

@ -21,7 +21,7 @@ class DirectByteBuffer(
buffer.limit(writePosition)
if (buffer.position() != readPosition)
buffer.position(readPosition)
return buffer
return buffer.slice()
}
override fun writeBuffer(): java.nio.ByteBuffer {
@ -29,7 +29,7 @@ class DirectByteBuffer(
buffer.limit(capacity)
if (buffer.position() != writePosition)
buffer.position(writePosition)
return buffer
return buffer.slice()
}
override fun reset() {

View File

@ -29,7 +29,7 @@ class HeapByteBuffer(
buffer.limit(writePosition)
if (buffer.position() != readPosition)
buffer.position(readPosition)
return buffer
return buffer.slice()
}
override fun writeBuffer(): java.nio.ByteBuffer {
@ -37,7 +37,7 @@ class HeapByteBuffer(
buffer.limit(capacity)
if (buffer.position() != writePosition)
buffer.position(writePosition)
return buffer
return buffer.slice()
}
override fun reset() {

View File

@ -33,11 +33,11 @@ class NettyByteBuffer(
override val resized: Boolean get() = false
override fun readBuffer(): java.nio.ByteBuffer {
return byteBuf.internalNioBuffer(readPosition, readable)
return byteBuf.internalNioBuffer(readPosition, readable).slice()
}
override fun writeBuffer(): java.nio.ByteBuffer {
return byteBuf.internalNioBuffer(writePosition, writeable)
return byteBuf.internalNioBuffer(writePosition, writeable).slice()
}
override val readOffset: Int get() = byteBuf.arrayOffset() + byteBuf.readerIndex()