是 ByteArrayAdvanceByteBuffer 性能太低

This commit is contained in:
tursom 2019-11-06 22:11:09 +08:00
parent be04d7ff73
commit d713d31efd
6 changed files with 155 additions and 157 deletions

View File

@ -1,7 +1,10 @@
package cn.tursom.socket
import cn.tursom.core.logE
import cn.tursom.socket.niothread.WorkerLoopNioThread
import java.net.InetSocketAddress
import java.net.SocketException
import java.nio.channels.SelectableChannel
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import java.util.concurrent.TimeoutException
@ -9,101 +12,84 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@Suppress("MemberVisibilityCanBePrivate")
object AsyncNioClient {
private const val TIMEOUT = 1000L
private val protocol = AsyncNioSocket.nioSocketProtocol
@JvmStatic
private val nioThread = WorkerLoopNioThread("nioClient") { nioThread ->
val selector = nioThread.selector
//logE("AsyncNioClient selector select")
if (selector.select(TIMEOUT) != 0) {
//logE("AsyncNioClient selector select successfully")
val keyIter = selector.selectedKeys().iterator()
while (keyIter.hasNext()) {
val key = keyIter.next()
keyIter.remove()
try {
when {
!key.isValid -> {
}
key.isReadable -> {
protocol.handleRead(key, nioThread)
}
key.isWritable -> {
protocol.handleWrite(key, nioThread)
}
key.isConnectable -> {
protocol.handleConnect(key, nioThread)
}
}
} catch (e: Throwable) {
try {
protocol.exceptionCause(key, nioThread, e)
} catch (e1: Throwable) {
e.printStackTrace()
e1.printStackTrace()
}
}
}
}
//logE("AsyncNioClient selector select end")
}
private const val TIMEOUT = 1000L
private val protocol = AsyncNioSocket.nioSocketProtocol
@JvmStatic
private val nioThread = WorkerLoopNioThread("nioClient", isDaemon = true) { nioThread ->
val selector = nioThread.selector
//logE("client keys: ${selector.keys().size}")
//logE("client op read: ${selector.keys().filter { key ->
// key.isValid && key.interestOps() == SelectionKey.OP_READ
//}.size}")
//logE("client op write: ${selector.keys().filter { key ->
// key.isValid && key.interestOps() == SelectionKey.OP_WRITE
//}.size}")
//logE("AsyncNioClient selector select")
if (selector.select(TIMEOUT) != 0) {
//logE("AsyncNioClient selector select successfully")
val keyIter = selector.selectedKeys().iterator()
while (keyIter.hasNext()) {
val key = keyIter.next()
keyIter.remove()
try {
when {
//!key.isValid -> {
//}
//key.isConnectable -> {
// protocol.handleConnect(key, nioThread)
//}
key.isReadable -> {
protocol.handleRead(key, nioThread)
}
key.isWritable -> {
protocol.handleWrite(key, nioThread)
}
}
} catch (e: Throwable) {
try {
protocol.exceptionCause(key, nioThread, e)
} catch (e1: Throwable) {
e.printStackTrace()
e1.printStackTrace()
key.cancel()
key.channel().close()
}
}
}
}
//logE("AsyncNioClient selector select end")
}
@Suppress("DuplicatedCode")
fun connect(host: String, port: Int): AsyncNioSocket {
val selector = nioThread.selector
val channel = SocketChannel.open()
channel.connect(InetSocketAddress(host, port))
channel.configureBlocking(false)
val f = nioThread.submit<SelectionKey> {
channel.register(selector, 0)
}
selector.wakeup()
val key: SelectionKey = f.get()
return AsyncNioSocket(key, nioThread)
}
suspend fun connect(host: String, port: Int): AsyncNioSocket {
return connect(host, port, 0)
}
@Suppress("DuplicatedCode")
suspend fun suspendConnect(host: String, port: Int): AsyncNioSocket {
val key: SelectionKey = suspendCoroutine { cont ->
try {
val channel = SocketChannel.open()
channel.connect(InetSocketAddress(host, port))
channel.configureBlocking(false)
nioThread.submit {
nioThread.register(channel, 0) { key ->
cont.resume(key)
}
}
nioThread.wakeup()
} catch (e: Exception) {
cont.resumeWithException(e)
}
}
return AsyncNioSocket(key, nioThread)
}
suspend fun connect(host: String, port: Int, timeout: Long): AsyncNioSocket {
val key: SelectionKey = suspendCoroutine { cont ->
val channel = getConnection(host, port)
val timeoutTask = if (timeout > 0) AsyncNioSocket.timer.exec(timeout) {
channel.close()
cont.resumeWithException(TimeoutException())
} else {
null
}
nioThread.register(channel, 0) { key ->
//key.attach(AsyncNioSocket.ConnectContext(cont, timeoutTask))
timeoutTask?.cancel()
cont.resume(key)
}
}
return AsyncNioSocket(key, nioThread)
}
@Suppress("DuplicatedCode")
suspend fun suspendConnect(host: String, port: Int, timeout: Long): AsyncNioSocket {
if (timeout <= 0) return suspendConnect(host, port)
val key: SelectionKey = suspendCoroutine { cont ->
val channel = SocketChannel.open()
channel.connect(InetSocketAddress(host, port))
channel.configureBlocking(false)
val timeoutTask = AsyncNioSocket.timer.exec(timeout) {
channel.close()
cont.resumeWithException(TimeoutException())
}
try {
nioThread.register(channel, 0) { key ->
timeoutTask.cancel()
cont.resume(key)
}
nioThread.wakeup()
} catch (e: Exception) {
cont.resumeWithException(e)
}
}
return AsyncNioSocket(key, nioThread)
}
private fun getConnection(host: String, port: Int): SelectableChannel {
val channel = SocketChannel.open()!!
if (!channel.connect(InetSocketAddress(host, port))) {
throw SocketException("connection failed")
}
channel.configureBlocking(false)
return channel
}
}

View File

@ -21,7 +21,7 @@ import kotlin.coroutines.suspendCoroutine
*/
class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INioThread) : IAsyncNioSocket {
override val channel: SocketChannel = key.channel() as SocketChannel
override suspend fun read(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode
return operate {
@ -31,7 +31,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.read(buffer)
}
}
override suspend fun read(buffer: Array<out ByteBuffer>): Long {
if (buffer.isEmpty()) return emptyBufferLongCode
return operate {
@ -39,7 +39,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.read(buffer)
}
}
override suspend fun write(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode
return operate {
@ -47,7 +47,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer)
}
}
override suspend fun write(buffer: Array<out ByteBuffer>): Long {
if (buffer.isEmpty()) return emptyBufferLongCode
return operate {
@ -55,7 +55,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer)
}
}
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int {
//logE("AsyncNioSocket.read(buffer: ByteBuffer, timeout: Long): $buffer, $timeout")
if (timeout <= 0) return read(buffer)
@ -67,7 +67,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.read(buffer)
}
}
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (timeout <= 0) return read(buffer)
if (buffer.isEmpty()) return emptyBufferLongCode
@ -76,7 +76,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.read(buffer)
}
}
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int {
if (timeout <= 0) return write(buffer)
if (buffer.remaining() == 0) return emptyBufferCode
@ -85,7 +85,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer)
}
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (timeout <= 0) return write(buffer)
if (buffer.isEmpty()) return emptyBufferLongCode
@ -94,7 +94,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
channel.write(buffer)
}
}
override fun close() {
nioThread.execute {
channel.close()
@ -102,16 +102,16 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
}
nioThread.wakeup()
}
private inline fun <T> operate(action: () -> T): T {
return try {
action()
} catch (e: Exception) {
waitMode()
throw RuntimeException(e)
throw e
}
}
private suspend inline fun waitRead(timeout: Long) {
suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) {
@ -123,7 +123,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
nioThread.wakeup()
}
}
private suspend inline fun waitWrite(timeout: Long) {
suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) {
@ -135,7 +135,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
nioThread.wakeup()
}
}
private suspend inline fun waitRead() {
suspendCoroutine<Int> {
//logE("waitRead() attach")
@ -146,7 +146,7 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
nioThread.wakeup()
}
}
private suspend inline fun waitWrite() {
suspendCoroutine<Int> {
key.attach(Context(it))
@ -154,13 +154,19 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
nioThread.wakeup()
}
}
data class Context(val cont: Continuation<Int>, val timeoutTask: TimerTask? = null)
data class ConnectContext(val cont: Continuation<SelectionKey>, val timeoutTask: TimerTask? = null)
companion object {
val nioSocketProtocol = object : INioProtocol {
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {}
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
val context = key.attachment() as ConnectContext? ?: return
context.timeoutTask?.cancel()
context.cont.resume(key)
}
override fun handleRead(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
//logE("read ready")
@ -168,14 +174,14 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
context.timeoutTask?.cancel()
context.cont.resume(0)
}
override fun handleWrite(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel()
context.cont.resume(0)
}
override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) {
key.interestOps(0)
val context = key.attachment() as Context?
@ -188,10 +194,10 @@ class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INi
}
}
}
//val timer = StaticWheelTimer.timer
val timer = WheelTimer.timer
const val emptyBufferCode = 0
const val emptyBufferLongCode = 0L
}

View File

@ -1,6 +1,7 @@
package cn.tursom.socket
import cn.tursom.core.bytebuffer.AdvanceByteBuffer
import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer
import cn.tursom.core.bytebuffer.readNioBuffer
import cn.tursom.core.bytebuffer.writeNioBuffer
import cn.tursom.core.logE
@ -13,7 +14,7 @@ interface AsyncSocket : Closeable {
suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt()
suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt()
override fun close()
suspend fun write(buffer: AdvanceByteBuffer, timeout: Long = 0): Int {
return if (buffer.bufferCount == 1) {
buffer.readNioBuffer {
@ -28,12 +29,15 @@ interface AsyncSocket : Closeable {
value
}
}
suspend fun read(buffer: AdvanceByteBuffer, timeout: Long = 0): Int {
//logE("buffer.bufferCount: ${buffer.bufferCount}")
//logE("AsyncSocket.read(buffer: AdvanceByteBuffer, timeout: Long = 0): buffer: $buffer")
return if (buffer.bufferCount == 1) {
buffer.writeNioBuffer {
//if (buffer is ByteArrayAdvanceByteBuffer) {
// logE(it.toString())
//}
read(it, timeout)
}
} else {

View File

@ -32,18 +32,5 @@ class AsyncNioServer(
}
}
}
}, backlog) {
/**
* 次要构造方法为使用Spring的同学们准备的
*/
constructor(
port: Int,
backlog: Int = 50,
handler: Handler
) : this(port, backlog, { handler.handle(this) })
interface Handler {
fun handle(socket: AsyncNioSocket)
}
}
}, backlog)

View File

@ -1,5 +1,6 @@
import cn.tursom.core.bytebuffer.ByteArrayAdvanceByteBuffer
import cn.tursom.core.log
import cn.tursom.core.bytebuffer.HeapNioAdvanceByteBuffer
import cn.tursom.core.logE
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.usingAdvanceByteBuffer
import cn.tursom.socket.AsyncNioClient
@ -7,23 +8,23 @@ import cn.tursom.socket.server.AsyncNioServer
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.lang.Thread.sleep
import java.net.SocketException
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
fun main() {
// 服务器端口,可任意指定
val port = 12345
val port = 12346
// 创建一个直接内存池每个块是1024字节共有256个快
val memoryPool = DirectMemoryPool(1024, 256)
val memoryPool = DirectMemoryPool(1024, 512)
// 创建服务器对象
val server = AsyncNioServer(port) {
//log("get new connection")
// 这里处理业务逻辑,套接字对象被以 this 的方式传进来
// 从内存池中获取一个内存块
memoryPool.usingAdvanceByteBuffer {
// 检查是否获取成功,不成功就创建一个堆缓冲
val buffer = it ?: ByteArrayAdvanceByteBuffer(1024)
val buffer = it ?: HeapNioAdvanceByteBuffer(1024)
try {
while (true) {
buffer.clear()
@ -38,39 +39,47 @@ fun main() {
//log("server send [$writeSize] bytes")
}
} catch (e: TimeoutException) {
e.printStackTrace()
}
// 代码块结束后,框架会自动释放连接
}
}
server.run()
val connectionCount = 300
val dataPerConn = 10
val connectionCount = 1000
val dataPerConn = 1
val testData = "testData".toByteArray()
val remain = AtomicInteger(connectionCount)
val remain = AtomicInteger(connectionCount * dataPerConn)
val clientMemoryPool = DirectMemoryPool(1024, connectionCount)
val start = System.currentTimeMillis()
repeat(connectionCount) {
GlobalScope.launch {
val socket = AsyncNioClient.connect("127.0.0.1", port)
clientMemoryPool.usingAdvanceByteBuffer {
// 检查是否获取成功,不成功就创建一个堆缓冲
val buffer = it ?: ByteArrayAdvanceByteBuffer(1024)
try {
//val buffer = it!!
val buffer = ByteArrayAdvanceByteBuffer(1024)
repeat(dataPerConn) {
buffer.clear()
buffer.put(testData)
//log("client sending: [${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}")
val writeSize = socket.write(buffer)
if (writeSize == 0) {
logE("write size is zero")
} else if (writeSize < 0) {
return@usingAdvanceByteBuffer
}
//log("client write [$writeSize] bytes")
//log(buffer.toString())
val readSize = socket.read(buffer)
//log(buffer.toString())
//log("client recv: [$readSize:${buffer.readableSize}] ${buffer.toString(buffer.readableSize)}")
remain.decrementAndGet()
}
} catch (e: Exception) {
Exception(e).printStackTrace()
@ -78,15 +87,14 @@ fun main() {
socket.close()
}
}
remain.decrementAndGet()
}
}
while (remain.get() != 0) {
println(remain.get())
sleep(500)
}
val end = System.currentTimeMillis()
println(end - start)
server.close()

View File

@ -21,12 +21,12 @@ class NioServer(
) : ISocketServer {
private val listenChannel = ServerSocketChannel.open()
private val threadList = ConcurrentLinkedDeque<INioThread>()
init {
listenChannel.socket().bind(InetSocketAddress(port), backLog)
listenChannel.configureBlocking(false)
}
constructor(
port: Int,
protocol: INioProtocol,
@ -34,28 +34,35 @@ class NioServer(
) : this(port, protocol, backLog, { name, workLoop ->
WorkerLoopNioThread(name, workLoop = workLoop, isDaemon = false)
})
override fun run() {
val nioThread = nioThreadGenerator("nio worker", LoopHandler(protocol)::handle)
nioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {}
threadList.add(nioThread)
}
override fun close() {
listenChannel.close()
threadList.forEach {
it.close()
}
}
class LoopHandler(val protocol: INioProtocol) {
fun handle(nioThread: INioThread) {
//logE("wake up")
val selector = nioThread.selector
//logE("server keys: ${selector.keys().size}")
//logE("server op read: ${selector.keys().filter { key ->
// key.isValid && key.interestOps() == SelectionKey.OP_READ
//}.size}")
//logE("server op write: ${selector.keys().filter { key ->
// key.isValid && key.interestOps() == SelectionKey.OP_WRITE
//}.size}")
if (selector.isOpen) {
if (selector.select(TIMEOUT) != 0) {
val keyIter = selector.selectedKeys().iterator()
while (keyIter.hasNext()) run whileBlock@{
while (keyIter.hasNext()) {
val key = keyIter.next()
keyIter.remove()
//logE("selected key: $key: ${key.attachment()}")
@ -94,7 +101,7 @@ class NioServer(
}
}
}
companion object {
private const val TIMEOUT = 1000L
}