添加协程UDP服务端支持

This commit is contained in:
tursom 2020-05-18 19:43:40 +08:00
parent 4d52a4f8d8
commit 8eee7a644b
11 changed files with 254 additions and 7 deletions

View File

@ -3,6 +3,7 @@ package cn.tursom.channel
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.timer.Timer
import cn.tursom.core.timer.WheelTimer
@ -78,7 +79,11 @@ interface AsyncChannel : Closeable {
return write(str.toByteArray(charset))
}
suspend fun read(pool: MemoryPool, timeout: Long = 0L): ByteBuffer
suspend fun read(pool: MemoryPool, timeout: Long = 0L): ByteBuffer = read(timeout) {
val buffer = pool.get()
if ((channel as ReadableByteChannel).read(buffer) < 0) throw SocketException()
buffer
}
fun waitMode() {
if (Thread.currentThread() == nioThread.thread) {

View File

@ -4,7 +4,7 @@ import cn.tursom.niothread.NioThread
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
class NioDatagram(
open class NioDatagram(
override val channel: DatagramChannel,
override val key: SelectionKey,
override val nioThread: NioThread

View File

@ -0,0 +1,30 @@
package cn.tursom.datagram.server
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.net.SocketAddress
import java.util.concurrent.ConcurrentHashMap
class AsyncDatagramServer(
port: Int,
private val handler: suspend ServerNioDatagram.() -> Unit
) : LoopDatagramServer(port, DatagramProtocol) {
private val channelMap = ConcurrentHashMap<SocketAddress, ServerNioDatagram>()
private fun initChannel(address: SocketAddress): ServerNioDatagram {
val datagram = ServerNioDatagram(address, listenChannel, key, bossNioThread)
GlobalScope.launch { handler(datagram) }
return datagram
}
fun getChannel(address: SocketAddress): ServerNioDatagram {
var channel = channelMap[address]
if (channel != null) {
return channel
}
channel = initChannel(address)
channelMap[address] = channel
return channel
}
}

View File

@ -0,0 +1,24 @@
package cn.tursom.datagram.server
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.buffer.write
import cn.tursom.niothread.NioProtocol
import cn.tursom.niothread.NioThread
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
import kotlin.coroutines.resume
object DatagramProtocol : NioProtocol {
override fun handleRead(key: SelectionKey, nioThread: NioThread) {
val attr = key.attachment() as AsyncDatagramServer
val datagramChannel = key.channel() as DatagramChannel
val buffer = HeapByteBuffer(1024)
val address = buffer.write { datagramChannel.receive(it) }
val channel = attr.getChannel(address)
channel.addBuffer(buffer)
channel.cont?.resume(0)
}
override fun handleWrite(key: SelectionKey, nioThread: NioThread) {
}
}

View File

@ -0,0 +1,48 @@
package cn.tursom.datagram.server
import cn.tursom.niothread.NioProtocol
import cn.tursom.niothread.NioThread
import cn.tursom.niothread.WorkerLoopNioThread
import cn.tursom.niothread.loophandler.BossLoopHandler
import cn.tursom.niothread.loophandler.WorkerLoopHandler
import cn.tursom.socket.server.SocketServer
import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
import java.util.concurrent.atomic.AtomicBoolean
open class LoopDatagramServer(
override val port: Int,
protocol: NioProtocol,
nioThreadFactory: (
threadName: String,
workLoop: (thread: NioThread, key: SelectionKey) -> Unit
) -> NioThread = { name, workLoop ->
WorkerLoopNioThread(name, workLoop = workLoop, daemon = false)
}
) : SocketServer {
protected val listenChannel: DatagramChannel = DatagramChannel.open()
val bossNioThread = nioThreadFactory("nio-boss", WorkerLoopHandler(protocol))
private val started = AtomicBoolean(false)
protected lateinit var key: SelectionKey
override fun run() {
if (started.compareAndSet(false, true)) {
listenChannel.bind(InetSocketAddress(port))
listenChannel.configureBlocking(false)
bossNioThread.register(listenChannel, SelectionKey.OP_READ) {
it.attach(this)
key = it
}
}
}
override fun close() {
listenChannel.close()
bossNioThread.close()
}
protected fun finalize() {
close()
}
}

View File

@ -0,0 +1,81 @@
package cn.tursom.datagram.server
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.timer.TimerTask
import cn.tursom.core.timer.WheelTimer
import cn.tursom.datagram.NioDatagram
import cn.tursom.niothread.NioThread
import java.net.SocketAddress
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeoutException
import kotlin.coroutines.Continuation
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
class ServerNioDatagram(
val remoteAddress: SocketAddress,
channel: DatagramChannel,
key: SelectionKey,
nioThread: NioThread
) : NioDatagram(channel, key, nioThread) {
companion object {
val timer = WheelTimer.timer
}
private val bufferList = ConcurrentLinkedQueue<ByteBuffer>()
private var readBuffer: ByteBuffer? = null
var cont: Continuation<Int>? = null
private var timeoutTask: TimerTask? = null
fun addBuffer(buffer: ByteBuffer) {
bufferList.add(buffer)
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
var write = 0L
buffer.forEach { buf ->
write += buf.read { channel.send(it, remoteAddress) }
}
return write
}
override suspend fun waitRead(timeout: Long) {
suspendCoroutine<Int> { cont ->
this.cont = cont
if (timeout > 0) {
timeoutTask = timer.exec(timeout) {
cont.resumeWithException(TimeoutException())
}
}
}
cont = null
timeoutTask?.cancel()
}
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (bufferList.isEmpty()) waitRead()
val bufferIterator = buffer.iterator()
var write = 0L
while (bufferIterator.hasNext()) {
val buf = bufferIterator.next()
while (buf.writeable != 0) {
if (readBuffer == null || readBuffer?.readable == 0) {
readBuffer = bufferList.poll() ?: return write
}
write += buf.put(readBuffer!!)
}
}
return write
}
override suspend fun read(pool: MemoryPool, timeout: Long): ByteBuffer {
if (bufferList.isEmpty()) waitRead()
val buf = pool.get()
read(buf, timeout)
return buf
}
}

View File

@ -1,6 +1,5 @@
package cn.tursom.niothread
import cn.tursom.niothread.NioThread
import java.nio.channels.SelectionKey
interface NioProtocol {

View File

@ -14,7 +14,7 @@ import cn.tursom.socket.enhance.impl.SocketWriterImpl
object AsyncSocketSecurityUtil {
private val memoryPool = HeapMemoryPool(4096)
suspend fun initAESByRSAServer(socket: AsyncSocket, rsa: PublicKeyEncrypt): EnhanceSocket<ByteArray, ByteArray> {
suspend fun initActiveAESSocket(socket: AsyncSocket, rsa: PublicKeyEncrypt): EnhanceSocket<ByteArray, ByteArray> {
// 发送RSA公钥
socket.write(HeapByteBuffer(rsa.publicKey!!.encoded))
// 接受AES密钥
@ -23,7 +23,7 @@ object AsyncSocketSecurityUtil {
return SecurityEnhanceSocket(SocketReaderImpl(socket), ByteArrayWriter(SocketWriterImpl(socket)), aes)
}
suspend fun AsyncSocket.initAESByRSAClient(
suspend fun initPassiveAESSocket(
socket: AsyncSocket,
publicKeyEncryptBuilder: (key: ByteArray) -> PublicKeyEncrypt = { RSA(it) }
): EnhanceSocket<ByteArray, ByteArray> {

View File

@ -13,7 +13,7 @@ class SecurityNioServer(
@Suppress("MemberVisibilityCanBePrivate") val rsa: RSA = RSA(),
val handler: suspend AsyncSocket.() -> Unit
) : NioServer(port, backlog, coroutineScope, {
AsyncSocketSecurityUtil.initAESByRSAServer(this, rsa)
AsyncSocketSecurityUtil.initActiveAESSocket(this, rsa)
handler()
}) {
constructor(

View File

@ -0,0 +1,60 @@
package cn.tursom.datagram.server
import cn.tursom.core.log
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.datagram.AsyncDatagramClient
import cn.tursom.datagram.BufferedNioDatagram
import kotlinx.coroutines.runBlocking
fun main() {
val port = 12345
val pool = DirectMemoryPool(1024, 16)
val server = AsyncDatagramServer(port) {
val buffed = BufferedNioDatagram(pool, this)
while (true) {
val buffer = buffed.read()
log("recv from client $remoteAddress: ${buffer.toString(buffer.readable)}")
buffed.write(buffer)
}
}
//val server = LoopDatagramServer(port, protocol = object : NioProtocol {
// override fun handleRead(key: SelectionKey, nioThread: NioThread) {
// val datagramChannel = key.channel() as DatagramChannel
// val buffer = HeapByteBuffer(1024)
// val address = buffer.write { datagramChannel.receive(it) }
// log("recv from client $address: ${buffer.toString(buffer.readable)}")
// buffer.read { datagramChannel.send(it, address) }
// }
//
// override fun handleWrite(key: SelectionKey, nioThread: NioThread) {
// }
//})
server.run()
runBlocking {
val input = System.`in`.bufferedReader()
var client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
while (true) {
try {
print(">>>")
val line = input.readLine()
if (line.isEmpty()) continue
client.write(line)
val read = try {
client.read(3000)
} catch (e: Exception) {
client.close()
client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
client.write(line)
client.read(3000)
}
log("recv from server: ${read.getString()}")
read.close()
} catch (e: Exception) {
Exception(e).printStackTrace()
client.close()
client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
}
}
}
}

View File

@ -136,7 +136,7 @@ interface ByteBuffer : Closeable {
}
fun writeTo(buffer: ByteBuffer): Int {
val size = min(readable, buffer.readable)
val size = min(readable, buffer.writeable)
if (hasArray) {
buffer.put(array, readOffset, size)
readPosition += size