This commit is contained in:
tursom 2020-05-18 20:02:11 +08:00
parent 8eee7a644b
commit 28c4fe9ad5
8 changed files with 62 additions and 10 deletions

View File

@ -26,4 +26,8 @@ open class NioDatagram(
nioThread.wakeup()
}
}
override fun toString(): String {
return "NioDatagram(channel=$channel, key=$key, nioThread=$nioThread)"
}
}

View File

@ -5,18 +5,26 @@ import kotlinx.coroutines.launch
import java.net.SocketAddress
import java.util.concurrent.ConcurrentHashMap
class AsyncDatagramServer(
open class AsyncDatagramServer(
port: Int,
private val handler: suspend ServerNioDatagram.() -> Unit
) : LoopDatagramServer(port, DatagramProtocol) {
private val channelMap = ConcurrentHashMap<SocketAddress, ServerNioDatagram>()
private fun initChannel(address: SocketAddress): ServerNioDatagram {
val datagram = ServerNioDatagram(address, listenChannel, key, bossNioThread)
GlobalScope.launch { handler(datagram) }
val datagram = ServerNioDatagram(address, this, listenChannel, key, bossNioThread)
GlobalScope.launch {
datagram.use { datagram ->
handler(datagram)
}
}
return datagram
}
fun closeChannel(address: SocketAddress): ServerNioDatagram? {
return channelMap.remove(address)
}
fun getChannel(address: SocketAddress): ServerNioDatagram {
var channel = channelMap[address]
if (channel != null) {
@ -27,4 +35,3 @@ class AsyncDatagramServer(
return channel
}
}

View File

@ -0,0 +1,9 @@
package cn.tursom.datagram.server
import cn.tursom.core.pool.MemoryPool
class BufferedAsyncDatagramServer(
port: Int,
private val memoryPool: MemoryPool,
private val handler: suspend BufferedServerNioDatagram.() -> Unit
) : AsyncDatagramServer(port, { handler(BufferedServerNioDatagram(memoryPool, this)) })

View File

@ -0,0 +1,15 @@
package cn.tursom.datagram.server
import cn.tursom.core.pool.MemoryPool
import cn.tursom.datagram.AsyncDatagram
import cn.tursom.datagram.BufferedAsyncDatagram
import java.net.SocketAddress
class BufferedServerNioDatagram(override val pool: MemoryPool, override val prevChannel: ServerNioDatagram) :
BufferedAsyncDatagram,
AsyncDatagram by prevChannel {
val remoteAddress: SocketAddress get() = prevChannel.remoteAddress
override fun toString(): String {
return "BufferedServerNioDatagram(pool=$pool, prevChannel=$prevChannel)"
}
}

View File

@ -18,6 +18,7 @@ import kotlin.coroutines.suspendCoroutine
class ServerNioDatagram(
val remoteAddress: SocketAddress,
val server: AsyncDatagramServer,
channel: DatagramChannel,
key: SelectionKey,
nioThread: NioThread
@ -78,4 +79,12 @@ class ServerNioDatagram(
read(buf, timeout)
return buf
}
override fun close() {
server.closeChannel(remoteAddress)
}
override fun toString(): String {
return "ServerNioDatagram(remoteAddress=$remoteAddress, localAddress=${channel.localAddress})"
}
}

View File

@ -74,6 +74,10 @@ class WorkerLoopNioThread(
}
}
override fun toString(): String {
return "WorkerLoopNioThread(threadName='$threadName', selector=$selector, daemon=$daemon, timeout=$timeout, workLoop=$workLoop)"
}
class Future<T> : NioThreadTaskFuture<T> {
private val lock = Object()
private var exception: Throwable? = null
@ -111,4 +115,6 @@ class WorkerLoopNioThread(
}
}
}
}

View File

@ -65,6 +65,10 @@ class NioSocket(override val key: SelectionKey, override val nioThread: NioThrea
close()
}
override fun toString(): String {
return "NioSocket(key=$key, nioThread=$nioThread, channel=$channel)"
}
/**
* 伴生对象
*/

View File

@ -3,18 +3,16 @@ package cn.tursom.datagram.server
import cn.tursom.core.log
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.datagram.AsyncDatagramClient
import cn.tursom.datagram.BufferedNioDatagram
import kotlinx.coroutines.runBlocking
fun main() {
val port = 12345
val pool = DirectMemoryPool(1024, 16)
val server = AsyncDatagramServer(port) {
val buffed = BufferedNioDatagram(pool, this)
val server = BufferedAsyncDatagramServer(port, pool) {
while (true) {
val buffer = buffed.read()
log("recv from client $remoteAddress: ${buffer.toString(buffer.readable)}")
buffed.write(buffer)
val buffer = read()
log("$this recv from client $remoteAddress: ${buffer.toString(buffer.readable)}")
write(buffer)
}
}
//val server = LoopDatagramServer(port, protocol = object : NioProtocol {