This commit is contained in:
tursom 2021-04-11 23:40:14 +08:00
parent 914f6502c3
commit def6a9f752
98 changed files with 0 additions and 4328 deletions

View File

@ -1,4 +0,0 @@
dependencies {
compile project(":")
compile project(":log")
}

View File

@ -1,35 +0,0 @@
###异步套接字的协程封装
这个包实现了对异步的套接字的语句同步化封装,适用于 Kotlin 协程执行环境。
但是因为需要协程作为执行环境,所以无法在 Java 环境下正常创建。
其核心分别是对 AIO 进行封装的 AsyncAioSocket 和对 NIO 进行封装的
AsyncNioSocket。AsyncAioSocket 实现简单,但是可塑性较低,缺陷也较难解决;
AsyncNioSocket 虽然实现复杂,但是可塑性很高,优化空间大,缺陷一般也都可以解决。
---
AsyncAioSocket 和 AsyncNioSocket 分别通过对应的服务器与客户端创建。
创建一个异步服务器的形式和同步服务器的形式是完全一样的:
```kotlin
// 创建一个自带内存池的异步套接字服务器
val server = BufferedAsyncNioServer(port) { buffer->
// do any thing
// 这里都是用同步语法写出的异步套接字操作
read(buffer)
write(buffer)
}
// 异步服务器不需要创建新线程来执行
server.run()
// 异步套接字的创建既可以在普通环境下,也可以在协程环境下
val client = AsyncNioClient.connect("localhost", port)
runBlocking {
val buffer = ByteArrayAdvanceByteBuffer(1024)
// 向套接字内写数据
buffer.put("Hello!")
client.write(buffer)
// 从套接字内读数据
buffer.reset()
client.read(buffer)
log(buffer.getString())
client.close()
}
```

View File

@ -1,7 +0,0 @@
dependencies {
//implementation project(":")
compile project(":socket")
// kotlin
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
}

View File

@ -1,7 +0,0 @@
package cn.tursom.channel.enhance
import cn.tursom.socket.IAsyncNioSocket
interface EnhanceSocket<Read, Write> : SocketReader<Read>, SocketWriter<Write>, IAsyncNioSocket {
override fun close()
}

View File

@ -1,10 +0,0 @@
package cn.tursom.channel.enhance
import cn.tursom.core.buffer.ByteBuffer
import java.io.Closeable
interface SocketReader<T> : Closeable {
suspend fun get(buffer: ByteBuffer, timeout: Long = 0): T
override fun close()
}

View File

@ -1,13 +0,0 @@
package cn.tursom.channel.enhance
import java.io.Closeable
interface SocketWriter<T> : Closeable {
suspend fun put(value: T, timeout: Long) {
put(value)
flush(timeout)
}
suspend fun put(value: T)
suspend fun flush(timeout: Long = 0)
}

View File

@ -1,45 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.SocketReader
class LengthFieldBasedFrameReader(
val prevReader: SocketReader<ByteBuffer>
) : SocketReader<ByteBuffer> {
constructor(socket: IAsyncNioSocket) : this(SimpSocketReader(socket))
override suspend fun get(buffer: ByteBuffer, timeout: Long): ByteBuffer {
val rBuf = prevReader.get(buffer, timeout)
val blockSize = rBuf.getInt()
if (rBuf.readable == blockSize) {
return rBuf
} else if (rBuf.readable > blockSize) {
return if (rBuf.hasArray) {
val retBuf = HeapByteBuffer(rBuf.array, rBuf.readOffset, blockSize)
rBuf.readPosition += blockSize
retBuf
} else {
val targetBuffer = HeapByteBuffer(blockSize)
rBuf.writeTo(targetBuffer)
targetBuffer
}
}
val targetBuffer = HeapByteBuffer(blockSize)
rBuf.writeTo(targetBuffer)
while (targetBuffer.writeable != 0) {
val rBuf2 = prevReader.get(buffer, timeout)
if (rBuf2.readable == 0) return targetBuffer
rBuf2.writeTo(targetBuffer)
}
return targetBuffer
}
override fun close() {
prevReader.close()
}
}

View File

@ -1,36 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.SocketWriter
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.ExpandableMemoryPool
class LengthFieldPrependWriter(
val prevWriter: SocketWriter<ByteBuffer>
) : SocketWriter<ByteBuffer> {
constructor(socket: IAsyncNioSocket) : this(SimpSocketWriter(socket))
override suspend fun put(value: ByteBuffer) {
val buffer = directMemoryPool.getMemory()
buffer.put(value.readable)
prevWriter.put(buffer)
prevWriter.put(value)
buffer.close()
}
override suspend fun flush(timeout: Long) {
prevWriter.flush()
}
override fun close() {
prevWriter.close()
}
companion object {
@JvmStatic
private val directMemoryPool = ExpandableMemoryPool { DirectMemoryPool(4, 64) }
}
}

View File

@ -1,21 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.SocketReader
class SimpSocketReader(
val socket: IAsyncNioSocket
) : SocketReader<ByteBuffer> {
override suspend fun get(buffer: ByteBuffer, timeout: Long): ByteBuffer {
buffer.reset()
if (socket.read(buffer) < 0) {
socket.close()
}
return buffer
}
override fun close() {
socket.close()
}
}

View File

@ -1,26 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.SocketWriter
import java.util.concurrent.ConcurrentLinkedQueue
class SimpSocketWriter(
val socket: IAsyncNioSocket
) : SocketWriter<ByteBuffer> {
private val bufferQueue = ConcurrentLinkedQueue<ByteBuffer>()
override suspend fun put(value: ByteBuffer) {
bufferQueue.offer(value)
}
override suspend fun flush(timeout: Long) {
val buffers = bufferQueue.toTypedArray()
bufferQueue.clear()
socket.write(buffers, timeout)
buffers.forEach { it.close() }
}
override fun close() {
socket.close()
}
}

View File

@ -1,19 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.SocketReader
class StringReader(
val prevReader: SocketReader<ByteBuffer>
) : SocketReader<String> {
constructor(socket: IAsyncNioSocket) : this(LengthFieldBasedFrameReader(socket))
override suspend fun get(buffer: ByteBuffer, timeout: Long): String {
return prevReader.get(buffer, timeout).getString()
}
override fun close() {
prevReader.close()
}
}

View File

@ -1,17 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.EnhanceSocket
import cn.tursom.channel.enhance.SocketReader
import cn.tursom.channel.enhance.SocketWriter
class StringSocket(
socket: IAsyncNioSocket,
prevReader: SocketReader<ByteBuffer> = LengthFieldBasedFrameReader(socket),
prevWriter: SocketWriter<ByteBuffer> = LengthFieldPrependWriter(socket)
) : EnhanceSocket<String, String> by UnionEnhanceSocket(
socket,
StringReader(prevReader),
StringWriter(prevWriter)
)

View File

@ -1,26 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.SocketWriter
class StringWriter(
val prevWriter: SocketWriter<ByteBuffer>
) : SocketWriter<String> {
constructor(socket: IAsyncNioSocket) : this(LengthFieldPrependWriter(socket))
override suspend fun put(value: String) {
val buf = HeapByteBuffer(value.toByteArray())
buf.writePosition = buf.capacity
prevWriter.put(buf)
}
override suspend fun flush(timeout: Long) {
prevWriter.flush(timeout)
}
override fun close() {
prevWriter.close()
}
}

View File

@ -1,25 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.timer.TimerTask
import cn.tursom.core.timer.WheelTimer
import cn.tursom.channel.enhance.SocketReader
class TimeoutReader<Read>(val prevReader: SocketReader<Read>, val timeout: Long = 5000L) : SocketReader<Read> {
private var timerTask: TimerTask? = null
override suspend fun get(buffer: ByteBuffer, timeout: Long): Read {
timerTask?.cancel()
timerTask = timer.exec(this.timeout) {
prevReader.close()
}
return prevReader.get(buffer, timeout)
}
override fun close() {
prevReader.close()
}
companion object {
val timer = WheelTimer.timer
}
}

View File

@ -1,21 +0,0 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.socket.IAsyncNioSocket
import cn.tursom.channel.enhance.EnhanceSocket
import cn.tursom.channel.enhance.SocketReader
import cn.tursom.channel.enhance.SocketWriter
@Suppress("MemberVisibilityCanBePrivate")
class UnionEnhanceSocket<Read, Write>(
val socket: IAsyncNioSocket,
val prevReader: SocketReader<Read>,
val prevWriter: SocketWriter<Write>
) : EnhanceSocket<Read, Write>,
SocketReader<Read> by prevReader,
SocketWriter<Write> by prevWriter,
IAsyncNioSocket by socket {
override fun close() {
socket.close()
}
}

View File

@ -1,32 +0,0 @@
package cn.tursom.socket
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
object AsyncAioClient {
private val handler = object : CompletionHandler<Void, Continuation<Void?>> {
override fun completed(result: Void?, attachment: Continuation<Void?>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Void?>) {
attachment.resumeWithException(exc)
}
}
suspend fun connect(host: String, port: Int): AsyncAioSocket {
@Suppress("BlockingMethodInNonBlockingContext")
return connect(AsynchronousSocketChannel.open()!!, host, port)
}
suspend fun connect(socketChannel: AsynchronousSocketChannel, host: String, port: Int): AsyncAioSocket {
suspendCoroutine<Void?> { cont -> socketChannel.connect(InetSocketAddress(host, port) as SocketAddress, cont, handler) }
return AsyncAioSocket(socketChannel)
}
}

View File

@ -1,72 +0,0 @@
package cn.tursom.socket
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@Suppress("MemberVisibilityCanBePrivate")
open class AsyncAioSocket(val socketChannel: AsynchronousSocketChannel) : AsyncSocket {
val address: SocketAddress get() = socketChannel.remoteAddress
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int {
return suspendCoroutine { cont ->
this.socketChannel.write(buffer, timeout, TimeUnit.MILLISECONDS, cont, awaitHandler)
}
}
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int {
return suspendCoroutine { cont ->
this.socketChannel.read(buffer, timeout, TimeUnit.MILLISECONDS, cont, awaitHandler)
}
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
return suspendCoroutine { cont ->
this.socketChannel.write(buffer, 0, buffer.size, timeout, TimeUnit.MILLISECONDS, cont, awaitLongHandler)
}
}
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long {
return suspendCoroutine { cont ->
this.socketChannel.read(buffer, 0, buffer.size, timeout, TimeUnit.MILLISECONDS, cont, awaitLongHandler)
}
}
override fun close() {
socketChannel.close()
}
companion object {
const val defaultTimeout = 60_000L
@JvmStatic
private val awaitHandler =
object : CompletionHandler<Int, Continuation<Int>> {
override fun completed(result: Int, attachment: Continuation<Int>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Int>) {
attachment.resumeWithException(exc)
}
}
@JvmStatic
private val awaitLongHandler =
object : CompletionHandler<Long, Continuation<Long>> {
override fun completed(result: Long, attachment: Continuation<Long>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Long>) {
attachment.resumeWithException(exc)
}
}
}
}

View File

@ -1,109 +0,0 @@
package cn.tursom.socket
import cn.tursom.socket.niothread.WorkerLoopNioThread
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import java.util.concurrent.TimeoutException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
object AsyncNioClient {
private const val TIMEOUT = 1000L
private val protocol = AsyncNioSocket.nioSocketProtocol
@JvmStatic
private val nioThread = WorkerLoopNioThread("nioClient") { nioThread ->
val selector = nioThread.selector
//logE("AsyncNioClient selector select")
if (selector.select(TIMEOUT) != 0) {
//logE("AsyncNioClient selector select successfully")
val keyIter = selector.selectedKeys().iterator()
while (keyIter.hasNext()) {
val key = keyIter.next()
keyIter.remove()
try {
when {
!key.isValid -> {
}
key.isReadable -> {
protocol.handleRead(key, nioThread)
}
key.isWritable -> {
protocol.handleWrite(key, nioThread)
}
key.isConnectable -> {
protocol.handleConnect(key, nioThread)
}
}
} catch (e: Throwable) {
try {
protocol.exceptionCause(key, nioThread, e)
} catch (e1: Throwable) {
e.printStackTrace()
e1.printStackTrace()
}
}
}
}
//logE("AsyncNioClient selector select end")
}
@Suppress("DuplicatedCode")
fun connect(host: String, port: Int): AsyncNioSocket {
val selector = nioThread.selector
val channel = SocketChannel.open()
channel.connect(InetSocketAddress(host, port))
channel.configureBlocking(false)
val f = nioThread.submit<SelectionKey> {
channel.register(selector, 0)
}
selector.wakeup()
val key: SelectionKey = f.get()
return AsyncNioSocket(key, nioThread)
}
@Suppress("DuplicatedCode")
suspend fun suspendConnect(host: String, port: Int): AsyncNioSocket {
val key: SelectionKey = suspendCoroutine { cont ->
try {
val channel = SocketChannel.open()
channel.connect(InetSocketAddress(host, port))
channel.configureBlocking(false)
nioThread.submit {
nioThread.register(channel, 0) { key ->
cont.resume(key)
}
}
nioThread.wakeup()
} catch (e: Exception) {
cont.resumeWithException(e)
}
}
return AsyncNioSocket(key, nioThread)
}
@Suppress("DuplicatedCode")
suspend fun suspendConnect(host: String, port: Int, timeout: Long): AsyncNioSocket {
if (timeout <= 0) return suspendConnect(host, port)
val key: SelectionKey = suspendCoroutine { cont ->
val channel = SocketChannel.open()
channel.connect(InetSocketAddress(host, port))
channel.configureBlocking(false)
val timeoutTask = AsyncNioSocket.timer.exec(timeout) {
channel.close()
cont.resumeWithException(TimeoutException())
}
try {
nioThread.register(channel, 0) { key ->
timeoutTask.cancel()
cont.resume(key)
}
nioThread.wakeup()
} catch (e: Exception) {
cont.resumeWithException(e)
}
}
return AsyncNioSocket(key, nioThread)
}
}

View File

@ -1,196 +0,0 @@
package cn.tursom.socket
import cn.tursom.core.timer.TimerTask
import cn.tursom.core.timer.WheelTimer
import cn.tursom.socket.niothread.INioThread
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import java.util.concurrent.TimeoutException
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
/**
* 利用 SelectionKey attachment 进行状态的传输
* 导致该类无法利用 SelectionKey attachment
* 但是对于一般的应用而言是足够使用的
*/
class AsyncNioSocket(override val key: SelectionKey, override val nioThread: INioThread) : IAsyncNioSocket {
override val channel: SocketChannel = key.channel() as SocketChannel
override suspend fun read(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode
return operate {
//logE("read(buffer: ByteBuffer) wait read")
waitRead()
//logE("read(buffer: ByteBuffer) wait read complete")
channel.read(buffer)
}
}
override suspend fun read(buffer: Array<out ByteBuffer>): Long {
if (buffer.isEmpty()) return emptyBufferLongCode
return operate {
waitRead()
channel.read(buffer)
}
}
override suspend fun write(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode
return operate {
waitWrite()
channel.write(buffer)
}
}
override suspend fun write(buffer: Array<out ByteBuffer>): Long {
if (buffer.isEmpty()) return emptyBufferLongCode
return operate {
waitWrite()
channel.write(buffer)
}
}
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int {
//logE("AsyncNioSocket.read(buffer: ByteBuffer, timeout: Long): $buffer, $timeout")
if (timeout <= 0) return read(buffer)
if (buffer.remaining() == 0) return emptyBufferCode
return operate {
//logE("wait read")
waitRead(timeout)
//logE("wait read complete")
channel.read(buffer)
}
}
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (timeout <= 0) return read(buffer)
if (buffer.isEmpty()) return emptyBufferLongCode
return operate {
waitRead(timeout)
channel.read(buffer)
}
}
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int {
if (timeout <= 0) return write(buffer)
if (buffer.remaining() == 0) return emptyBufferCode
return operate {
waitWrite(timeout)
channel.write(buffer)
}
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (timeout <= 0) return write(buffer)
if (buffer.isEmpty()) return emptyBufferLongCode
return operate {
waitWrite(timeout)
channel.write(buffer)
}
}
override fun close() {
nioThread.execute {
channel.close()
key.cancel()
}
nioThread.wakeup()
}
private inline fun <T> operate(action: () -> T): T {
return try {
action()
} catch (e: Exception) {
waitMode()
throw RuntimeException(e)
}
}
private suspend inline fun waitRead(timeout: Long) {
suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) {
key.attach(null)
waitMode()
it.resumeWithException(TimeoutException())
}))
readMode()
nioThread.wakeup()
}
}
private suspend inline fun waitWrite(timeout: Long) {
suspendCoroutine<Int> {
key.attach(Context(it, timer.exec(timeout) {
key.attach(null)
waitMode()
it.resumeWithException(TimeoutException())
}))
writeMode()
nioThread.wakeup()
}
}
private suspend inline fun waitRead() {
suspendCoroutine<Int> {
//logE("waitRead() attach")
key.attach(Context(it))
//logE("waitRead() readMode()")
readMode()
//logE("waitRead() wakeup()")
nioThread.wakeup()
}
}
private suspend inline fun waitWrite() {
suspendCoroutine<Int> {
key.attach(Context(it))
writeMode()
nioThread.wakeup()
}
}
data class Context(val cont: Continuation<Int>, val timeoutTask: TimerTask? = null)
companion object {
val nioSocketProtocol = object : INioProtocol {
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {}
override fun handleRead(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
//logE("read ready")
val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel()
context.cont.resume(0)
}
override fun handleWrite(key: SelectionKey, nioThread: INioThread) {
key.interestOps(0)
val context = key.attachment() as Context? ?: return
context.timeoutTask?.cancel()
context.cont.resume(0)
}
override fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) {
key.interestOps(0)
val context = key.attachment() as Context?
if (context != null)
context.cont.resumeWithException(e)
else {
key.cancel()
key.channel().close()
e.printStackTrace()
}
}
}
//val timer = StaticWheelTimer.timer
val timer = WheelTimer.timer
const val emptyBufferCode = 0
const val emptyBufferLongCode = 0L
}
}

View File

@ -1,33 +0,0 @@
package cn.tursom.socket
import cn.tursom.core.encrypt.Encrypt
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
class AsyncSecurityAioSocket(
private val asyncAioSocket: AsyncAioSocket,
private val key: Encrypt
) : AsyncSocket by asyncAioSocket {
constructor(socketChannel: AsynchronousSocketChannel, key: Encrypt) : this(AsyncAioSocket(socketChannel), key)
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int {
return asyncAioSocket.write(
ByteBuffer.wrap(key.encrypt(
buffer.array(),
buffer.arrayOffset() + buffer.position(),
buffer.limit()
)),
timeout
)
}
suspend fun readBytes(buffer: ByteBuffer, timeout: Long): ByteArray {
asyncAioSocket.read(buffer, timeout)
buffer.flip()
return key.encrypt(
buffer.array(),
buffer.arrayOffset() + buffer.position(),
buffer.limit()
)
}
}

View File

@ -1,40 +0,0 @@
package cn.tursom.socket
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
import java.io.Closeable
import java.nio.ByteBuffer
interface AsyncSocket : Closeable {
suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long
suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long
suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt()
suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt()
override fun close()
suspend fun write(buffer: cn.tursom.core.buffer.ByteBuffer, timeout: Long = 0): Int {
return buffer.read {
write(it, timeout)
}
}
suspend fun read(buffer: cn.tursom.core.buffer.ByteBuffer, timeout: Long = 0): Int {
return buffer.write {
read(it, timeout)
}
}
suspend fun write(buffers: Array<out cn.tursom.core.buffer.ByteBuffer>, timeout: Long): Long {
val nioBuffer = buffers.map { it.readBuffer() }.toTypedArray()
val writeSize = write(nioBuffer, timeout)
buffers.forEachIndexed { index, byteBuffer -> byteBuffer.finishRead(nioBuffer[index]) }
return writeSize
}
suspend fun write(buffers: Collection<cn.tursom.core.buffer.ByteBuffer>, timeout: Long): Long {
val nioBuffer = buffers.map { it.readBuffer() }.toTypedArray()
val writeSize = write(nioBuffer, timeout)
buffers.forEachIndexed { index, byteBuffer -> byteBuffer.finishRead(nioBuffer[index]) }
return writeSize
}
}

View File

@ -1,91 +0,0 @@
package cn.tursom.socket
import cn.tursom.core.buffer.write
import cn.tursom.socket.niothread.INioThread
import java.net.SocketException
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
interface IAsyncNioSocket : AsyncSocket {
val channel: SocketChannel
val key: SelectionKey
val nioThread: INioThread
fun waitMode() {
if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
} else {
nioThread.execute { if (key.isValid) key.interestOps(0) }
nioThread.wakeup()
}
}
fun readMode() {
//logE("readMode()")
if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
} else {
nioThread.execute {
//logE("readMode() interest")
if (key.isValid) key.interestOps(SelectionKey.OP_READ)
//logE("readMode interestOps ${key.isValid} ${key.interestOps()}")
}
nioThread.wakeup()
}
}
fun writeMode() {
if (Thread.currentThread() == nioThread.thread) {
if (key.isValid) key.interestOps(SelectionKey.OP_WRITE)
} else {
nioThread.execute { if (key.isValid) key.interestOps(SelectionKey.OP_WRITE) }
nioThread.wakeup()
}
}
suspend fun read(buffer: ByteBuffer): Int = read(arrayOf(buffer)).toInt()
suspend fun write(buffer: ByteBuffer): Int = write(arrayOf(buffer)).toInt()
suspend fun read(buffer: Array<out ByteBuffer>): Long
suspend fun write(buffer: Array<out ByteBuffer>): Long
/**
* 如果通道已断开则会抛出异常
*/
suspend fun recv(buffer: ByteBuffer): Int {
if (buffer.remaining() == 0) return emptyBufferCode
val readSize = read(buffer)
if (readSize < 0) {
throw SocketException("channel closed")
}
return readSize
}
suspend fun recv(buffer: ByteBuffer, timeout: Long): Int {
if (buffer.remaining() == 0) return emptyBufferCode
val readSize = read(buffer, timeout)
if (readSize < 0) {
throw SocketException("channel closed")
}
return readSize
}
suspend fun recv(buffers: Array<out ByteBuffer>, timeout: Long): Long {
if (buffers.isEmpty()) return emptyBufferLongCode
val readSize = read(buffers, timeout)
if (readSize < 0) {
throw SocketException("channel closed")
}
return readSize
}
suspend fun recv(buffer: cn.tursom.core.buffer.ByteBuffer, timeout: Long = 0): Int {
return buffer.write {
recv(it, timeout)
}
}
companion object {
const val emptyBufferCode = 0
const val emptyBufferLongCode = 0L
}
}

View File

@ -1,41 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.AsyncNioSocket
import cn.tursom.socket.INioProtocol
import cn.tursom.socket.niothread.INioThread
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.nio.channels.SelectionKey
/**
* 有多个工作线程的协程套接字服务器
* 不过因为结构复杂所以性能一般比单个工作线程的 AsyncNioServer
*/
@Suppress("MemberVisibilityCanBePrivate")
class AsyncGroupNioServer(
override val port: Int,
val threads: Int = Runtime.getRuntime().availableProcessors(),
backlog: Int = 50,
override val handler: suspend AsyncNioSocket.() -> Unit
) : IAsyncNioServer, ISocketServer by GroupNioServer(
port,
threads,
object : INioProtocol by AsyncNioSocket.nioSocketProtocol {
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {
GlobalScope.launch {
val socket = AsyncNioSocket(key, nioThread)
try {
socket.handler()
} catch (e: Exception) {
e.printStackTrace()
} finally {
try {
nioThread.execute { socket.close() }
} catch (e: Exception) {
}
}
}
}
},
backlog
)

View File

@ -1,49 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.AsyncNioSocket
import cn.tursom.socket.INioProtocol
import cn.tursom.socket.niothread.INioThread
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.nio.channels.SelectionKey
/**
* 只有一个工作线程的协程套接字服务器
* 不过因为结构更加简单所以性能一般比多个工作线程的 ProtocolGroupAsyncNioServer
* 而且协程是天生多线程并不需要太多的接受线程来处理所以一般只需要用本服务器即可
*/
class AsyncNioServer(
override val port: Int,
backlog: Int = 50,
override val handler: suspend AsyncNioSocket.() -> Unit
) : IAsyncNioServer, ISocketServer by NioServer(port, object : INioProtocol by AsyncNioSocket.nioSocketProtocol {
override fun handleConnect(key: SelectionKey, nioThread: INioThread) {
GlobalScope.launch {
val socket = AsyncNioSocket(key, nioThread)
try {
socket.handler()
} catch (e: Exception) {
Exception(e).printStackTrace()
} finally {
try {
socket.close()
} catch (e: Exception) {
}
}
}
}
}, backlog) {
/**
* 次要构造方法为使用Spring的同学们准备的
*/
constructor(
port: Int,
backlog: Int = 50,
handler: Handler
) : this(port, backlog, { handler.handle(this) })
interface Handler {
fun handle(socket: AsyncNioSocket)
}
}

View File

@ -1,50 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.AsyncAioSocket
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousCloseException
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
class AsyncSocketServer(
override val port: Int,
host: String = "0.0.0.0",
private val handler: suspend AsyncAioSocket.() -> Unit
) : ISocketServer {
private val server = AsynchronousServerSocketChannel
.open()
.bind(InetSocketAddress(host, port))
override fun run() {
server.accept(0, object : CompletionHandler<AsynchronousSocketChannel, Int> {
override fun completed(result: AsynchronousSocketChannel?, attachment: Int) {
try {
server.accept(attachment + 1, this)
} catch (e: Throwable) {
e.printStackTrace()
}
result ?: return
GlobalScope.launch {
AsyncAioSocket(result).handler()
}
}
override fun failed(exc: Throwable?, attachment: Int?) {
when (exc) {
is AsynchronousCloseException -> {
}
else -> exc?.printStackTrace()
}
}
})
}
override fun close() {
server.close()
}
}

View File

@ -1,31 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.pool.invoke
import cn.tursom.socket.AsyncNioSocket
/**
* 带内存池的 NIO 套接字服务器<br />
* 其构造函数是标准写法的改造会向 handler 方法传入一个 ByteBuffer默认是 DirectByteBuffer
* 当内存池用完之后会换为 ByteArrayByteBuffer
*/
class BuffedAsyncNioServer(
port: Int,
memoryPool: MemoryPool,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit,
) : IAsyncNioServer by AsyncNioServer(port, backlog, {
memoryPool {
handler(it)
}
}) {
constructor(
port: Int,
blockSize: Int = 1024,
blockCount: Int = 128,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit,
) : this(port, DirectMemoryPool(blockSize, blockCount), backlog, handler)
}

View File

@ -1,7 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.AsyncNioSocket
interface IAsyncNioServer : ISocketServer {
val handler: suspend AsyncNioSocket.() -> Unit
}

View File

@ -1,8 +0,0 @@
package cn.tursom.datagram
object UdpPackageSize {
//定义不同环境下数据报的最大大小
const val LANNetLen = 1472
const val internetLen = 548
const val defaultLen = internetLen
}

View File

@ -1,46 +0,0 @@
package cn.tursom.datagram.broadcast
import cn.tursom.datagram.UdpPackageSize.LANNetLen
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetAddress
import java.util.*
class BroadcastServer(val port: Int, val bufSize: Int = LANNetLen) {
private val socket = DatagramSocket()
private val server by lazy { DatagramSocket(port) }
private val buffer = DatagramPacket(ByteArray(bufSize), bufSize)
constructor(port: Int) : this(port, LANNetLen)
private fun send(packet: DatagramPacket) {
socket.send(packet)
}
fun send(packet: ByteArray) {
send(packet, 0, packet.size)
}
fun send(packet: ByteArray, offset: Int = 0, size: Int = packet.size) {
send(DatagramPacket(packet, offset, size, broadcastInetAddr, port))
}
fun recv(): ByteArray {
server.receive(buffer)
return Arrays.copyOfRange(buffer.data, 0, buffer.length)
}
fun recvBuffer(): DatagramPacket {
server.receive(buffer)
return buffer
}
companion object {
const val BROADCAST_IP = "255.255.255.255"
@JvmStatic
val broadcastInetAddr = InetAddress.getByName(BROADCAST_IP)
@JvmStatic
fun takeOut(packet: DatagramPacket) = Arrays.copyOfRange(packet.data, 0, packet.length)
}
}

View File

@ -1,43 +0,0 @@
package cn.tursom.datagram.client
import cn.tursom.datagram.UdpPackageSize.defaultLen
import java.io.Closeable
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetSocketAddress
import java.net.SocketAddress
@Suppress("CanBeParameter")
class UdpClient(
private val host: String,
private val port: Int,
private val packageSize: Int = defaultLen
) : Closeable {
private val socket = DatagramSocket()
val address: SocketAddress = InetSocketAddress(host, port)
fun send(data: ByteArray, callback: ((data: ByteArray, size: Int) -> Unit)? = null): SocketAddress {
socket.send(DatagramPacket(data, data.size, address))
callback?.let {
//定义接受网络数据的字节数组
val inBuff = ByteArray(packageSize)
//已指定字节数组创建准备接受数据的DatagramPacket对象
val inPacket = DatagramPacket(inBuff, inBuff.size, address)
socket.receive(inPacket)
it(inPacket.data ?: return@let, inPacket.length)
}
return address
}
fun recv(buffer: ByteArray, callback: (ByteArray, size: Int) -> Unit) {
val inPacket = DatagramPacket(buffer, buffer.size, address)
socket.receive(inPacket)
callback(inPacket.data ?: return, inPacket.length)
}
override fun close() {
socket.close()
}
}

View File

@ -1,13 +0,0 @@
package cn.tursom.datagram.server
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketAddress
abstract class AbstractUdpServer : UDPServer {
protected abstract val socket: DatagramSocket
fun send(address: SocketAddress, buffer: ByteArray, size: Int = buffer.size, offset: Int = 0) {
socket.send(DatagramPacket(buffer, offset, size, address))
}
}

View File

@ -1,118 +0,0 @@
package cn.tursom.datagram.server
import cn.tursom.core.timer.WheelTimer
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.util.concurrent.*
class AioUdpServer(
override val port: Int,
private val threadPool: ThreadPoolExecutor = ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(32)
),
private val queue: BlockingQueue<() -> Unit> = ArrayBlockingQueue(128),
private val handler: AioUdpServer.(channel: DatagramChannel, address: SocketAddress, buffer: ByteBuffer) -> Unit
) : UDPServer {
private val excWheelTimer = WheelTimer.timer
private val channel = DatagramChannel.open()!!
private val selector = Selector.open()!!
private var closed: Boolean = false
private val connectionMap = ConcurrentHashMap<SocketAddress, AioUdpServer.(
channel: DatagramChannel,
address: SocketAddress,
buffer: ByteBuffer
) -> Unit>()
init {
channel.configureBlocking(false)
channel.socket().bind(InetSocketAddress(port))
channel.register(selector, SelectionKey.OP_READ)
}
override fun run() {
val byteBuffer = ByteBuffer.allocateDirect(2048)
while (!closed) {
try {
val taskQueue = queue.iterator()
while (taskQueue.hasNext()) {
taskQueue.next()()
taskQueue.remove()
}
// 进行选择
val select = selector.select(60000)
if (select > 0) {
// 获取以选择的键的集合
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext()) {
val key = iterator.next() as SelectionKey
// 必须手动删除
iterator.remove()
if (key.isReadable) {
val datagramChannel = key.channel() as DatagramChannel
// 读取
byteBuffer.clear()
println(datagramChannel === channel)
val address = datagramChannel.receive(byteBuffer) ?: continue
val action =
connectionMap[address] ?: handler
threadPool.execute { action(datagramChannel, address, byteBuffer) }
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
override fun start() {
Thread(this, "AioUdpSer").start()
}
override fun close() {
closed = true
channel.close()
threadPool.shutdown()
selector.close()
}
fun read(
address: SocketAddress,
timeout: Long = 0L,
exc: (e: Exception) -> Unit = { it.printStackTrace() },
onComplete: (byteBuffer: ByteBuffer) -> Unit
) {
val timeoutTask = if (timeout > 0) {
excWheelTimer.exec(timeout) {
connectionMap.remove(address)
exc(TimeoutException("cn.tursom.datagram address $address read time out"))
}
} else {
null
}
connectionMap[address] = { _, _, buffer ->
timeoutTask?.cancel()
onComplete(buffer)
}
}
fun send(
channel: DatagramChannel,
address: SocketAddress,
buffer: ByteBuffer
) {
channel.send(buffer, address)
}
}

View File

@ -1,81 +0,0 @@
package cn.tursom.datagram.server
import cn.tursom.core.timer.WheelTimer
import cn.tursom.datagram.UdpPackageSize
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketAddress
import java.net.SocketException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeoutException
class MultiThreadUDPServer(
override val port: Int,
val thread: Int = Runtime.getRuntime().availableProcessors(),
private val packageSize: Int = UdpPackageSize.defaultLen,
private val exception: Exception.() -> Unit = { printStackTrace() },
private val handler: MultiThreadUDPServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) : AbstractUdpServer() {
private val excWheelTimer = WheelTimer.timer
private val connectionMap: java.util.AbstractMap<
SocketAddress,
MultiThreadUDPServer.(
address: SocketAddress,
buffer: ByteArray,
size: Int
) -> Unit
> = ConcurrentHashMap()
override val socket = DatagramSocket(port)
override fun run() {
val inBuff = ByteArray(packageSize)
val inPacket = DatagramPacket(inBuff, inBuff.size)
while (true) {
try {
//读取inPacket的数据
socket.receive(inPacket)
val address = inPacket.socketAddress
(connectionMap[address] ?: handler)(address, inPacket.data, inPacket.length)
} catch (e: SocketException) {
if (e.message == "Socket closed" || e.message == "cn.tursom.socket closed") {
break
} else {
e.exception()
}
} catch (e: Exception) {
e.exception()
}
}
}
override fun start() {
for (i in 1..thread) {
Thread(this, "MTUdpSer$i").start()
}
}
@Suppress("NAME_SHADOWING")
fun recv(
address: SocketAddress,
timeout: Long = 0L,
onTimeout: (e: Exception) -> Unit = { it.printStackTrace() },
handler: MultiThreadUDPServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) {
val timeoutTask = if (timeout > 0L) {
excWheelTimer.exec(timeout) {
onTimeout(TimeoutException())
}
} else {
null
}
connectionMap[address] = { address: SocketAddress, buffer: ByteArray, size: Int ->
timeoutTask?.cancel()
handler(address, buffer, size)
}
}
override fun close() {
socket.close()
}
}

View File

@ -1,46 +0,0 @@
package cn.tursom.datagram.server
import cn.tursom.datagram.UdpPackageSize
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketAddress
import java.net.SocketException
class SimpleUdpServer(
override val port: Int,
private val packageSize: Int = UdpPackageSize.defaultLen,
private val exception: Exception.() -> Unit = { printStackTrace() },
private val handler: SimpleUdpServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) : AbstractUdpServer() {
override val socket = DatagramSocket(port)
override fun start() {
Thread(this, "SUdpServer").start()
}
override fun run() {
val inBuff = ByteArray(packageSize)
val inPacket = DatagramPacket(inBuff, inBuff.size)
while (true) {
try {
//读取inPacket的数据
socket.receive(inPacket)
val address = inPacket.socketAddress
handler(address, inPacket.data, inPacket.length)
} catch (e: SocketException) {
if (e.message == "Socket closed" || e.message == "cn.tursom.socket closed") {
break
} else {
e.exception()
}
} catch (e: Exception) {
e.exception()
}
}
}
override fun close() {
socket.close()
}
}

View File

@ -1,89 +0,0 @@
package cn.tursom.datagram.server
import cn.tursom.core.timer.WheelTimer
import cn.tursom.datagram.UdpPackageSize
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketAddress
import java.net.SocketException
import java.util.concurrent.*
class SingleThreadUdpServer(
override val port: Int,
private val connectionMap: java.util.AbstractMap<
SocketAddress,
SingleThreadUdpServer.(
address: SocketAddress,
buffer: ByteArray,
size: Int
) -> Unit
> = HashMap(),
private val queue: BlockingQueue<() -> Unit> = ArrayBlockingQueue(128),
private val packageSize: Int = UdpPackageSize.defaultLen,
private val exception: Exception.() -> Unit = { printStackTrace() },
private val handler: SingleThreadUdpServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) : AbstractUdpServer() {
private val excWheelTimer = WheelTimer.timer
override val socket = DatagramSocket(port)
override fun run() {
val inBuff = ByteArray(packageSize)
val inPacket = DatagramPacket(inBuff, inBuff.size)
while (true) {
try {
val taskQueue = queue.iterator()
while (taskQueue.hasNext()) {
try {
taskQueue.next()()
} catch (e: Exception) {
e.exception()
}
taskQueue.remove()
}
//读取inPacket的数据
socket.receive(inPacket)
val address = inPacket.socketAddress
(connectionMap[address] ?: handler)(address, inPacket.data, inPacket.length)
} catch (e: SocketException) {
if (e.message == "Socket closed" || e.message == "cn.tursom.socket closed") {
break
} else {
e.exception()
}
} catch (e: Exception) {
e.exception()
}
}
}
fun recv(
address: SocketAddress,
timeout: Long = 0L,
onTimeout: (e: Exception) -> Unit = { it.printStackTrace() },
handler: SingleThreadUdpServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) {
val timeoutTask = if (timeout > 0L) {
excWheelTimer.exec(timeout) {
onTimeout(TimeoutException())
}
} else {
null
}
queue.add {
connectionMap[address] = { address: SocketAddress, buffer: ByteArray, size: Int ->
timeoutTask?.cancel()
handler(address, buffer, size)
}
}
}
override fun start() {
Thread(this, "STUdpSer").start()
}
override fun close() {
socket.close()
}
}

View File

@ -1,103 +0,0 @@
package cn.tursom.datagram.server
import cn.tursom.core.timer.WheelTimer
import cn.tursom.datagram.UdpPackageSize
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.SocketAddress
import java.net.SocketException
import java.util.concurrent.*
/**
* 单线程UDP服务器
*/
class ThreadPoolUDPServer(
override val port: Int,
private val connectionMap: java.util.AbstractMap<
SocketAddress,
ThreadPoolUDPServer.(
address: SocketAddress,
buffer: ByteArray,
size: Int
) -> Unit
> = HashMap(),
private val queue: BlockingQueue<() -> Unit> = LinkedBlockingQueue(128),
private val packageSize: Int = UdpPackageSize.defaultLen,
private val threadPool: ThreadPoolExecutor = ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60_000L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(32)
),
private val exception: Exception.() -> Unit = { printStackTrace() },
private val handler: ThreadPoolUDPServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) : AbstractUdpServer() {
private val excWheelTimer = WheelTimer.timer
override val socket = DatagramSocket(port)
override fun run() {
val inBuff = ByteArray(packageSize)
val inPacket = DatagramPacket(inBuff, inBuff.size)
while (true) {
try {
val taskQueue = queue.iterator()
while (taskQueue.hasNext()) {
try {
taskQueue.next()()
} catch (e: Exception) {
e.exception()
}
taskQueue.remove()
}
//读取inPacket的数据
socket.receive(inPacket)
val address = inPacket.socketAddress
val handler = (connectionMap[address] ?: handler)
threadPool.execute {
this.handler(address, inPacket.data, inPacket.length)
}
} catch (e: SocketException) {
if (e.message == "Socket closed" || e.message == "cn.tursom.socket closed") {
break
} else {
e.exception()
}
} catch (e: Exception) {
e.exception()
}
}
}
fun recv(
address: SocketAddress,
timeout: Long = 0L,
onTimeout: (e: Exception) -> Unit = { it.printStackTrace() },
handler: ThreadPoolUDPServer.(address: SocketAddress, buffer: ByteArray, size: Int) -> Unit
) {
val timeoutTask = if (timeout > 0L) {
excWheelTimer.exec(timeout) {
onTimeout(TimeoutException())
}
} else {
null
}
queue.add {
connectionMap[address] = { address: SocketAddress, buffer: ByteArray, size: Int ->
timeoutTask?.cancel()
handler(address, buffer, size)
}
}
}
override fun start() {
Thread(this, "TPUdpSer").start()
}
override fun close() {
socket.close()
}
}

View File

@ -1,10 +0,0 @@
package cn.tursom.datagram.server
import java.io.Closeable
interface UDPServer : Runnable, Closeable {
val port: Int
fun start()
}

View File

@ -1,76 +0,0 @@
package cn.tursom.socket
import cn.tursom.core.put
import java.io.Closeable
import java.net.Socket
/**
* 对基础的Socket做了些许封装
*/
@Suppress("unused", "MemberVisibilityCanBePrivate")
open class BaseSocket(
val socket: Socket,
val timeout: Int = Companion.timeout
) : Closeable {
val address = socket.inetAddress?.toString()?.drop(1) ?: "0.0.0.0"
val port = socket.port
val localPort = socket.localPort
val inputStream by lazy { socket.getInputStream()!! }
val outputStream by lazy { socket.getOutputStream()!! }
fun send(message: String?) {
send((message ?: return).toByteArray())
}
fun send(message: ByteArray?) {
outputStream.write(message ?: return)
}
fun send(message: Int) {
val buffer = ByteArray(4)
buffer.put(message)
send(buffer)
}
fun send(message: Long) {
val buffer = ByteArray(8)
buffer.put(message)
send(buffer)
}
override fun close() {
closeSocket()
}
protected fun closeSocket() {
if (!socket.isClosed) {
closeInputStream()
closeOutputStream()
socket.close()
}
}
private fun closeInputStream() {
try {
inputStream.close()
} catch (e: Exception) {
}
}
private fun closeOutputStream() {
try {
outputStream.close()
} catch (e: Exception) {
}
}
fun isConnected(): Boolean {
return socket.isConnected
}
companion object Companion {
const val defaultReadSize: Int = 1024 * 8
const val timeout: Int = 60 * 1000
}
}

View File

@ -1,23 +0,0 @@
package cn.tursom.socket
import cn.tursom.socket.niothread.INioThread
import java.nio.channels.SelectionKey
interface INioProtocol {
@Throws(Throwable::class)
fun handleConnect(key: SelectionKey, nioThread: INioThread) {
}
@Throws(Throwable::class)
fun handleRead(key: SelectionKey, nioThread: INioThread)
@Throws(Throwable::class)
fun handleWrite(key: SelectionKey, nioThread: INioThread)
@Throws(Throwable::class)
fun exceptionCause(key: SelectionKey, nioThread: INioThread, e: Throwable) {
key.cancel()
key.channel().close()
e.printStackTrace()
}
}

View File

@ -1,6 +0,0 @@
package cn.tursom.socket
/**
* 为了解决一个attachment不能携带多个数据的问题
*/
data class NioAttachment(var attachment: Any?, var protocol: INioProtocol)

View File

@ -1,51 +0,0 @@
package cn.tursom.socket
import java.io.IOException
import java.net.Socket
import java.net.SocketException
class SocketClient(
socket: Socket,
timeout: Int = Companion.timeout,
private val ioException: IOException.() -> Unit = { printStackTrace() },
private val exception: Exception.() -> Unit = { printStackTrace() },
func: (SocketClient.() -> Unit)? = null
) : BaseSocket(socket, timeout) {
init {
func?.let {
invoke(it)
}
}
constructor(
host: String,
port: Int,
timeout: Int = Companion.timeout,
ioException: IOException.() -> Unit = { printStackTrace() },
exception: Exception.() -> Unit = { printStackTrace() },
func: (SocketClient.() -> Unit)? = null
) : this(Socket(host, port), timeout, ioException, exception, func)
fun execute(func: SocketClient.() -> Unit) {
try {
func()
} catch (io: IOException) {
io.ioException()
} catch (e: SocketException) {
if (e.message == null) {
e.printStackTrace()
} else {
System.err.println("$address: ${e::class.java}: ${e.message}")
}
} catch (e: Exception) {
e.exception()
}
}
operator fun invoke(func: SocketClient.() -> Unit) {
val ret = execute(func)
closeSocket()
return ret
}
}

View File

@ -1,58 +0,0 @@
package cn.tursom.socket.niothread
import java.io.Closeable
import java.nio.channels.SelectableChannel
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.util.concurrent.Callable
/**
* 一个 nio 工作线程
* 一个线程对应一个 Selector 选择器
*/
interface INioThread : Closeable {
val selector: Selector
val closed: Boolean
val workLoop: (thread: INioThread) -> Unit
val thread: Thread
val isDaemon: Boolean
fun wakeup() {
if (Thread.currentThread() != thread) selector.wakeup()
}
/**
* 将通道注册到线程对应的选择器上
*/
fun register(channel: SelectableChannel, ops: Int, onComplete: (key: SelectionKey) -> Unit) {
if (Thread.currentThread() == thread) {
val key = channel.register(selector, ops)
onComplete(key)
} else {
execute {
val key = channel.register(selector, ops)
onComplete(key)
}
wakeup()
}
}
fun execute(command: Runnable)
fun execute(command: () -> Unit) {
execute(Runnable { command() })
}
fun <T> call(task: Callable<T>): T {
return submit(task).get()
}
fun <T> call(task: () -> T): T {
return call(Callable<T> { task() })
}
fun <T> submit(task: Callable<T>): NioThreadFuture<T>
fun <T> submit(task: () -> T): NioThreadFuture<T> {
return submit(Callable<T> { task() })
}
}

View File

@ -1,9 +0,0 @@
package cn.tursom.socket.niothread
import java.io.Closeable
import java.nio.channels.SelectableChannel
interface IWorkerGroup : Closeable {
val isDaemon: Boolean
fun register(channel: SelectableChannel, onComplete: (key: SelectionContext) -> Unit)
}

View File

@ -1,5 +0,0 @@
package cn.tursom.socket.niothread
interface NioThreadFuture<T> {
fun get(): T
}

View File

@ -1,5 +0,0 @@
package cn.tursom.socket.niothread
import java.nio.channels.SelectionKey
data class SelectionContext(val key: SelectionKey, val nioThread: INioThread)

View File

@ -1,68 +0,0 @@
package cn.tursom.socket.niothread
import java.nio.channels.SelectableChannel
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.util.concurrent.*
@Suppress("MemberVisibilityCanBePrivate")
class ThreadPoolNioThread(
val threadName: String = "",
override val selector: Selector = Selector.open(),
override val isDaemon: Boolean = false,
override val workLoop: (thread: INioThread) -> Unit
) : INioThread {
override lateinit var thread: Thread
val threadPool: ExecutorService = ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
LinkedBlockingQueue<Runnable>(),
ThreadFactory {
val thread = Thread(it)
this.thread = thread
thread.isDaemon = isDaemon
thread.name = threadName
thread
})
override var closed: Boolean = false
init {
threadPool.execute(object : Runnable {
override fun run() {
workLoop(this@ThreadPoolNioThread)
if (!threadPool.isShutdown) threadPool.execute(this)
}
})
}
override fun wakeup() {
if (Thread.currentThread() != thread) {
selector.wakeup()
}
}
override fun register(channel: SelectableChannel, ops: Int, onComplete: (key: SelectionKey) -> Unit) {
if (Thread.currentThread() == thread) {
onComplete(channel.register(selector, ops))
} else {
threadPool.execute { register(channel, ops, onComplete) }
wakeup()
}
}
override fun execute(command: Runnable) = threadPool.execute(command)
override fun <T> call(task: Callable<T>): T = threadPool.submit(task).get()
override fun <T> submit(task: Callable<T>): NioThreadFuture<T> = ThreadPoolFuture(threadPool.submit(task))
override fun close() {
closed = true
threadPool.shutdown()
}
class ThreadPoolFuture<T>(val future: Future<T>) : NioThreadFuture<T> {
override fun get(): T = future.get()
}
override fun toString(): String {
return "SingleThreadNioThread($threadName)"
}
}

View File

@ -1,29 +0,0 @@
package cn.tursom.socket.niothread
import java.nio.channels.SelectableChannel
@Suppress("MemberVisibilityCanBePrivate")
class ThreadPoolWorkerGroup(
val poolSize: Int = Runtime.getRuntime().availableProcessors(),
val groupName: String = "",
override val isDaemon: Boolean = false,
val worker: (thread: INioThread) -> Unit
) : IWorkerGroup {
val workerGroup = Array(poolSize) {
ThreadPoolNioThread("$groupName-$it", isDaemon = isDaemon, workLoop = worker)
}
var registered = 0
override fun register(channel: SelectableChannel, onComplete: (key: SelectionContext) -> Unit) {
val workerThread = workerGroup[registered++ % poolSize]
workerThread.register(channel, 0) {
onComplete(SelectionContext(it, workerThread))
}
}
override fun close() {
workerGroup.forEach {
it.close()
it.selector.close()
}
}
}

View File

@ -1,108 +0,0 @@
package cn.tursom.socket.niothread
import java.nio.channels.Selector
import java.util.concurrent.Callable
import java.util.concurrent.LinkedBlockingDeque
@Suppress("MemberVisibilityCanBePrivate", "CanBeParameter")
class WorkerLoopNioThread(
val threadName: String = "nioLoopThread",
override val selector: Selector = Selector.open(),
override val isDaemon: Boolean = false,
override val workLoop: (thread: INioThread) -> Unit
) : INioThread {
override var closed: Boolean = false
val waitQueue = LinkedBlockingDeque<Runnable>()
val taskQueue = LinkedBlockingDeque<Future<Any?>>()
override val thread = Thread {
while (!closed) {
try {
workLoop(this)
} catch (e: Exception) {
e.printStackTrace()
}
//System.err.println("$threadName worker loop finish once")
while (waitQueue.isNotEmpty()) try {
waitQueue.poll().run()
} catch (e: Exception) {
e.printStackTrace()
}
while (taskQueue.isNotEmpty()) try {
val task = taskQueue.poll()
try {
task.resume(task.task.call())
} catch (e: Throwable) {
task.resumeWithException(e)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
init {
thread.name = threadName
thread.isDaemon = isDaemon
thread.start()
}
override fun execute(command: Runnable) {
waitQueue.add(command)
}
override fun <T> submit(task: Callable<T>): NioThreadFuture<T> {
val f = Future(task)
@Suppress("UNCHECKED_CAST")
taskQueue.add(f as Future<Any?>)
return f
}
override fun close() {
closed = true
}
override fun wakeup() {
if (Thread.currentThread() != thread) {
selector.wakeup()
}
}
class Future<T>(val task: Callable<T>) : NioThreadFuture<T> {
private val lock = Object()
private var exception: Throwable? = null
private var result: Pair<T, Boolean>? = null
override fun get(): T {
val result = this.result
return when {
exception != null -> throw RuntimeException(exception)
result != null -> result.first
else -> synchronized(lock) {
lock.wait()
val exception = this.exception
if (exception != null) {
throw RuntimeException(exception)
} else {
this.result!!.first
}
}
}
}
fun resume(value: T) {
result = value to true
synchronized(lock) {
lock.notifyAll()
}
}
fun resumeWithException(e: Throwable) {
exception = e
synchronized(lock) {
lock.notifyAll()
}
}
}
}

View File

@ -1,104 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.INioProtocol
import cn.tursom.socket.niothread.INioThread
import cn.tursom.socket.niothread.IWorkerGroup
import cn.tursom.socket.niothread.ThreadPoolNioThread
import cn.tursom.socket.niothread.ThreadPoolWorkerGroup
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.util.concurrent.LinkedBlockingDeque
/**
* 拥有一个连接线程和多个工作线程的 nio 服务器
*/
@Suppress("MemberVisibilityCanBePrivate")
class GroupNioServer(
override val port: Int,
val threads: Int = Runtime.getRuntime().availableProcessors(),
private val protocol: INioProtocol,
backlog: Int = 50,
val nioThreadGenerator: (
threadName: String,
threads: Int,
worker: (thread: INioThread) -> Unit
) -> IWorkerGroup = { name, _, worker ->
ThreadPoolWorkerGroup(threads, name, false, worker)
}
) : ISocketServer {
private val listenChannel = ServerSocketChannel.open()
private val listenThreads = LinkedBlockingDeque<INioThread>()
private val workerGroupList = LinkedBlockingDeque<IWorkerGroup>()
init {
listenChannel.socket().bind(InetSocketAddress(port), backlog)
listenChannel.configureBlocking(false)
}
override fun run() {
val workerGroup = nioThreadGenerator(
"nioWorkerGroup", threads,
NioServer.LoopHandler(protocol)::handle
)
workerGroupList.add(workerGroup)
val nioThread = ThreadPoolNioThread("nioAccepter") { nioThread ->
val selector = nioThread.selector
if (selector.isOpen) {
forEachKey(selector) { key ->
try {
when {
key.isAcceptable -> {
val serverChannel = key.channel() as ServerSocketChannel
var channel = serverChannel.accept()
while (channel != null) {
channel.configureBlocking(false)
workerGroup.register(channel) { (key, thread) ->
protocol.handleConnect(key, thread)
}
channel = serverChannel.accept()
}
}
}
} catch (e: Throwable) {
try {
protocol.exceptionCause(key, nioThread, e)
} catch (e1: Throwable) {
e.printStackTrace()
e1.printStackTrace()
key.cancel()
key.channel().close()
}
}
nioThread.execute(this)
}
}
}
listenThreads.add(nioThread)
listenChannel.register(nioThread.selector, SelectionKey.OP_ACCEPT)
nioThread.wakeup()
}
override fun close() {
listenChannel.close()
listenThreads.forEach { it.close() }
workerGroupList.forEach { it.close() }
}
companion object {
const val TIMEOUT = 3000L
inline fun forEachKey(selector: Selector, action: (key: SelectionKey) -> Unit) {
if (selector.select(TIMEOUT) != 0) {
val keyIter = selector.selectedKeys().iterator()
while (keyIter.hasNext()) run whileBlock@{
val key = keyIter.next()
keyIter.remove()
action(key)
}
}
}
}
}

View File

@ -1,11 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.BaseSocket
interface ISimpleSocketServer : ISocketServer {
val handler: BaseSocket.() -> Unit
interface Handler {
fun handle(socket: BaseSocket)
}
}

View File

@ -1,14 +0,0 @@
package cn.tursom.socket.server
import java.io.Closeable
/**
* 套接字服务器的基本形式提供运行关闭的基本操作
* 其应支持最基本的创建形式
* XXXServer(port) {
* // 业务逻辑
* }
*/
interface ISocketServer : Runnable, Closeable {
val port: Int
}

View File

@ -1,62 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.core.cpuNumber
import cn.tursom.socket.BaseSocket
import java.net.ServerSocket
/**
* 这是一个自动启用多个线程来处理请求的套接字服务器
*/
class MultithreadingSocketServer(
private val serverSocket: ServerSocket,
private val threadNumber: Int = cpuNumber,
override val handler: BaseSocket.() -> Unit
) : ISimpleSocketServer {
override val port = serverSocket.localPort
constructor(
port: Int,
threadNumber: Int = cpuNumber,
handler: BaseSocket.() -> Unit
) : this(ServerSocket(port), threadNumber, handler)
constructor(
port: Int,
handler: BaseSocket.() -> Unit
) : this(port, cpuNumber, handler)
constructor(
port: Int,
threadNumber: Int = cpuNumber,
handler: ISimpleSocketServer.Handler
) : this(ServerSocket(port), threadNumber, handler::handle)
constructor(
port: Int,
handler: ISimpleSocketServer.Handler
) : this(port, cpuNumber, handler::handle)
private val threadList = ArrayList<Thread>()
override fun run() {
for (i in 1..threadNumber) {
val thread = Thread {
while (true) {
serverSocket.accept().use {
try {
BaseSocket(it).handler()
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
thread.start()
threadList.add(thread)
}
}
override fun close() {
serverSocket.close()
}
}

View File

@ -1,99 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.INioProtocol
import cn.tursom.socket.niothread.INioThread
import cn.tursom.socket.niothread.WorkerLoopNioThread
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.ServerSocketChannel
import java.util.concurrent.ConcurrentLinkedDeque
/**
* 工作在单线程上的 Nio 服务器
*/
class NioServer(
override val port: Int,
private val protocol: INioProtocol,
backLog: Int = 50,
val nioThreadGenerator: (threadName: String, workLoop: (thread: INioThread) -> Unit) -> INioThread
) : ISocketServer {
private val listenChannel = ServerSocketChannel.open()
private val threadList = ConcurrentLinkedDeque<INioThread>()
init {
listenChannel.socket().bind(InetSocketAddress(port), backLog)
listenChannel.configureBlocking(false)
}
constructor(
port: Int,
protocol: INioProtocol,
backLog: Int = 50
) : this(port, protocol, backLog, { name, workLoop ->
WorkerLoopNioThread(name, workLoop = workLoop, isDaemon = false)
})
override fun run() {
val nioThread = nioThreadGenerator("nio worker", LoopHandler(protocol)::handle)
nioThread.register(listenChannel, SelectionKey.OP_ACCEPT) {}
threadList.add(nioThread)
}
override fun close() {
listenChannel.close()
threadList.forEach {
it.close()
}
}
class LoopHandler(val protocol: INioProtocol) {
fun handle(nioThread: INioThread) {
//logE("wake up")
val selector = nioThread.selector
if (selector.isOpen) {
if (selector.select(TIMEOUT) != 0) {
val keyIter = selector.selectedKeys().iterator()
while (keyIter.hasNext()) run whileBlock@{
val key = keyIter.next()
keyIter.remove()
//logE("selected key: $key: ${key.attachment()}")
try {
when {
key.isAcceptable -> {
val serverChannel = key.channel() as ServerSocketChannel
var channel = serverChannel.accept()
while (channel != null) {
channel.configureBlocking(false)
nioThread.register(channel, 0) {
protocol.handleConnect(it, nioThread)
}
channel = serverChannel.accept()
}
}
key.isReadable -> {
protocol.handleRead(key, nioThread)
}
key.isWritable -> {
protocol.handleWrite(key, nioThread)
}
}
} catch (e: Throwable) {
try {
protocol.exceptionCause(key, nioThread, e)
} catch (e1: Throwable) {
e.printStackTrace()
e1.printStackTrace()
key.cancel()
key.channel().close()
}
}
}
}
}
}
}
companion object {
private const val TIMEOUT = 1000L
}
}

View File

@ -1,54 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.BaseSocket
import java.net.ServerSocket
import java.net.SocketException
/**
* 单线程套接字服务器
* 可以用多个线程同时运行该服务器可以正常工作
*/
class SingleThreadSocketServer(
private val serverSocket: ServerSocket,
override val handler: BaseSocket.() -> Unit
) : ISimpleSocketServer {
override val port = serverSocket.localPort
constructor(
port: Int,
handler: BaseSocket.() -> Unit
) : this(ServerSocket(port), handler)
constructor(
port: Int,
handler: ISimpleSocketServer.Handler
) : this(ServerSocket(port), handler::handle)
override fun run() {
while (!serverSocket.isClosed) {
try {
serverSocket.accept().use {
try {
BaseSocket(it).handler()
} catch (e: Exception) {
e.printStackTrace()
}
}
} catch (e: SocketException) {
if (e.message == "Socket closed" || e.message == "cn.tursom.socket closed") {
break
} else {
e.printStackTrace()
}
}
}
}
override fun close() {
try {
serverSocket.close()
} catch (e: Exception) {
e.printStackTrace()
}
}
}

View File

@ -1,137 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.log.impl.Slf4jImpl
import cn.tursom.socket.BaseSocket
import java.io.IOException
import java.net.ServerSocket
import java.net.Socket
import java.net.SocketException
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
/**
* SocketServer线程池服务器
* 每当有新连接接入时就会将handler:Runnable加入线程池的任务队列中运行
* 通过重载handler:Runnable的getter处理业务逻辑
* start()函数实现无限循环监听同时自动处理异常
* 最新接入的套接字出存在socket变量中
* 通过调用close()或closeServer()关闭服务器造成的异常会被自动处理
*
* 标准使用例
* object : ThreadPoolSocketServer(port) {
* override val handler: Runnable
* get() = object : ServerHandler(cn.tursom.socket) {
* override fun handle() {
* ... // 业务逻辑代码
* }
* }
* }
*
*/
class ThreadPoolSocketServer
/**
* 使用代码而不是配置文件的构造函数
*
* @param port 运行端口必须指定
* @param threads 线程池最大线程数
* @param queueSize 线程池任务队列大小
* @param keepAliveTime 线程最长存活时间
* @param timeUnit timeout的单位默认毫秒
* @param handler 对套接字处理的业务逻辑
*/(
override val port: Int,
threads: Int = 1,
queueSize: Int = 1,
keepAliveTime: Long = 60_000L,
timeUnit: TimeUnit = TimeUnit.MILLISECONDS,
override val handler: BaseSocket.() -> Unit
) : ISimpleSocketServer {
constructor(
port: Int,
handler: BaseSocket.() -> Unit
) : this(port, 1, 1, 60_000L, TimeUnit.MILLISECONDS, handler)
var socket = Socket()
private val pool: ThreadPoolExecutor =
ThreadPoolExecutor(threads, threads, keepAliveTime, timeUnit, LinkedBlockingQueue(queueSize))
private var serverSocket: ServerSocket = ServerSocket(port)
/**
* 主要作用
* 循环接受连接请求
* 讲接收的连接交给handler处理
* 连接初期异常处理
* 自动关闭套接字服务器与线程池
*/
override fun run() {
while (!serverSocket.isClosed) {
try {
socket = serverSocket.accept()
debug("run(): get connect: {}", socket)
pool.execute {
socket.use {
BaseSocket(it).handler()
}
}
} catch (e: IOException) {
if (pool.isShutdown || serverSocket.isClosed) {
System.err.println("server closed")
break
}
e.printStackTrace()
} catch (e: SocketException) {
e.printStackTrace()
break
} catch (e: RejectedExecutionException) {
socket.getOutputStream()?.write(poolIsFull)
} catch (e: Exception) {
e.printStackTrace()
break
}
}
close()
System.err.println("server closed")
}
/**
* 关闭服务器套接字
*/
private fun closeServer() {
if (!serverSocket.isClosed) {
serverSocket.close()
}
}
/**
* 关闭线程池
*/
private fun shutdownPool() {
if (!pool.isShutdown) {
pool.shutdown()
}
}
/**
* 服务器是否已经关闭
*/
@Suppress("unused")
fun isClosed() = pool.isShutdown || serverSocket.isClosed
/**
* 关闭服务器
*/
override fun close() {
shutdownPool()
closeServer()
}
companion object : Slf4jImpl() {
/**
* 线程池满时返回给客户端的信息
*/
val poolIsFull = "server pool is full".toByteArray()
}
}

View File

@ -1,21 +0,0 @@
package cn.tursom.socket.server
import cn.tursom.socket.BaseSocket
import org.junit.Test
class SingleThreadSocketServerTest {
@Test
fun testCreateServer() {
val port = 12345
// Kotlin 写法
SingleThreadSocketServer(port) {
}.close()
// Java 写法
SingleThreadSocketServer(port, object : ISimpleSocketServer.Handler {
override fun handle(socket: BaseSocket) {
}
}).close()
}
}

View File

@ -1,7 +0,0 @@
dependencies {
compile project(":")
compile project(":AsyncSocket")
compile project(":utils")
compile project(":utils:ws-client")
api group: "io.netty", name: "netty-all", version: "4.1.43.Final"
}

View File

@ -1,11 +0,0 @@
package cn.tursom.forward
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import cn.tursom.core.stream.OutputStream
import io.netty.buffer.ByteBuf
interface Forward : OutputStream {
var forward: Forward?
fun write(byteBuf: ByteBuf) = write(NettyByteBuffer(byteBuf))
override fun flush() {}
}

View File

@ -1,20 +0,0 @@
package cn.tursom.forward
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
interface NettyHandler {
fun onOpen(ctx: ChannelHandlerContext) {
}
fun recvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) {
}
fun onClose(ctx: ChannelHandlerContext) {
}
fun exceptionCaused(cause: Throwable, ctx: ChannelHandlerContext) {
cause.printStackTrace()
ctx.close()
}
}

View File

@ -1,94 +0,0 @@
package cn.tursom.forward.datagram
import io.netty.buffer.ByteBuf
import io.netty.channel.*
import io.netty.channel.socket.DatagramPacket
import io.netty.util.ReferenceCountUtil
import io.netty.util.internal.RecyclableArrayList
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
class DatagramChannel constructor(
private val serverChannel: ServerDatagramChannel,
private val remote: InetSocketAddress
) : AbstractChannel(serverChannel) {
@Volatile
private var open = true
private var reading = AtomicBoolean(false)
private val metadata = ChannelMetadata(false)
private val config = DefaultChannelConfig(this)
private val buffers = ConcurrentLinkedQueue<ByteBuf>()
override fun metadata(): ChannelMetadata = metadata
override fun config(): ChannelConfig = config
override fun isActive(): Boolean = open
override fun isOpen(): Boolean = isActive
override fun doDisconnect() = doClose()
internal fun addBuffer(buffer: ByteBuf) = buffers.add(buffer)
override fun isCompatible(eventloop: EventLoop): Boolean = eventloop is DefaultEventLoop
override fun newUnsafe(): AbstractUnsafe = UdpChannelUnsafe()
override fun localAddress0(): SocketAddress = serverChannel.localAddress0()
override fun remoteAddress0(): SocketAddress = remote
override fun doBind(addr: SocketAddress) = throw UnsupportedOperationException()
override fun doClose() {
open = false
serverChannel.removeChannel(this)
}
override fun doBeginRead() {
if (!reading.compareAndSet(false, true)) return
try {
while (!buffers.isEmpty()) {
val buffer = buffers.poll() ?: continue
pipeline().fireChannelRead(buffer)
}
pipeline().fireChannelReadComplete()
} finally {
reading.set(false)
}
}
override fun doWrite(buffer: ChannelOutboundBuffer) {
val list = RecyclableArrayList.newInstance()
var freeList = true
try {
while (!buffer.isEmpty) {
val buf = buffer.current() as ByteBuf? ?: continue
list.add(buf.retain())
buffer.remove()
}
freeList = false
} finally {
if (freeList) {
for (obj in list) {
ReferenceCountUtil.safeRelease(obj)
}
list.recycle()
}
}
serverChannel.eventLoop().execute {
try {
for (buf in list) {
if (buf is ByteBuf) {
serverChannel.unsafe().write(DatagramPacket(buf, remote), voidPromise())
}
}
serverChannel.unsafe().flush()
} finally {
list.recycle()
}
}
}
private inner class UdpChannelUnsafe : AbstractUnsafe() {
override fun connect(
remoteAddress: SocketAddress?,
localAddress: SocketAddress?,
promise: ChannelPromise?
) = throw UnsupportedOperationException()
}
}

View File

@ -1,39 +0,0 @@
package cn.tursom.forward.datagram
import cn.tursom.forward.NettyHandler
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
import io.netty.channel.DefaultEventLoopGroup
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler
open class DatagramServer(
val port: Int,
var handler: (DatagramChannel) -> NettyHandler,
var readTimeout: Int? = 60,
var writeTimeout: Int? = 60
) {
private val bossGroup: EventLoopGroup = NioEventLoopGroup()
private val workerGroup: EventLoopGroup = DefaultEventLoopGroup()
private var b: ServerBootstrap? = null
private var future: ChannelFuture? = null
fun start() {
b = ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(ServerDatagramChannel::class.java)
.childHandler(object : ChannelInitializer<DatagramChannel>() {
override fun initChannel(ch: DatagramChannel) {
val pipeline = ch.pipeline()
if (readTimeout != null) pipeline.addLast(ReadTimeoutHandler(readTimeout!!))
if (writeTimeout != null) pipeline.addLast(WriteTimeoutHandler(writeTimeout!!))
pipeline.addLast("handle", DatagramServerHandler(handler(ch)))
}
})
future = b?.bind(port)
future?.sync()
}
}

View File

@ -1,26 +0,0 @@
package cn.tursom.forward.datagram
import cn.tursom.forward.NettyHandler
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
class DatagramServerHandler(private val handler: NettyHandler) : SimpleChannelInboundHandler<ByteBuf>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf) {
handler.recvBytes(msg, ctx)
}
override fun channelInactive(ctx: ChannelHandlerContext) {
handler.onClose(ctx)
super.channelInactive(ctx)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
handler.exceptionCaused(cause, ctx)
}
override fun channelActive(ctx: ChannelHandlerContext) {
handler.onOpen(ctx)
super.channelActive(ctx)
}
}

View File

@ -1,100 +0,0 @@
package cn.tursom.forward.datagram
import io.netty.channel.Channel
import io.netty.channel.ChannelMetadata
import io.netty.channel.ChannelOutboundBuffer
import io.netty.channel.nio.AbstractNioMessageChannel
import io.netty.channel.socket.DatagramPacket
import io.netty.channel.socket.ServerSocketChannel
import io.netty.util.internal.PlatformDependent
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.net.StandardProtocolFamily
import java.nio.channels.SelectionKey
import java.nio.channels.spi.SelectorProvider
import java.util.concurrent.ConcurrentHashMap
class ServerDatagramChannel constructor(
datagramChannel: java.nio.channels.DatagramChannel
) : AbstractNioMessageChannel(null, datagramChannel, SelectionKey.OP_READ), ServerSocketChannel {
constructor() : this(SelectorProvider.provider().openDatagramChannel(StandardProtocolFamily.INET))
private val metadata = ChannelMetadata(true)
private val config: ServerDatagramChannelConfig = ServerDatagramChannelConfig(this, datagramChannel)
private val channels = ConcurrentHashMap<InetSocketAddress, DatagramChannel>()
override fun localAddress(): InetSocketAddress = super.localAddress() as InetSocketAddress
public override fun localAddress0(): SocketAddress = javaChannel().socket().localSocketAddress
override fun remoteAddress() = null
override fun remoteAddress0() = null
override fun metadata() = metadata
override fun config() = config
override fun isActive(): Boolean = javaChannel().isOpen && javaChannel().socket().isBound
override fun javaChannel() = super.javaChannel() as java.nio.channels.DatagramChannel
override fun doBind(localAddress: SocketAddress) = javaChannel().socket().bind(localAddress)
override fun doConnect(addr1: SocketAddress, addr2: SocketAddress) = throw UnsupportedOperationException()
override fun doFinishConnect() = throw UnsupportedOperationException()
override fun doDisconnect() = throw UnsupportedOperationException()
override fun doClose() {
for (channel in channels.values) channel.close()
javaChannel().close()
}
fun removeChannel(channel: Channel) {
if (channel is DatagramChannel) {
eventLoop().submit {
val remote = channel.remoteAddress() as InetSocketAddress
if (channels[remote] === channel) {
channels.remove(remote)
}
}
}
}
override fun doReadMessages(list: MutableList<Any>): Int {
val javaChannel = javaChannel()
val allocatorHandle = unsafe().recvBufAllocHandle()
val buffer = allocatorHandle.allocate(config.allocator)
allocatorHandle.attemptedBytesRead(buffer.writableBytes())
var freeBuffer = true
return try {
val nioBuffer = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes())
val nioPos = nioBuffer.position()
val socketAddress = javaChannel.receive(nioBuffer) ?: return 0
allocatorHandle.lastBytesRead(nioBuffer.position() - nioPos)
buffer.writerIndex(buffer.writerIndex() + allocatorHandle.lastBytesRead())
var udpChannel = channels[socketAddress as InetSocketAddress]
if (udpChannel == null || !udpChannel.isOpen) {
udpChannel = DatagramChannel(this, socketAddress)
channels[socketAddress] = udpChannel
list.add(udpChannel)
udpChannel.addBuffer(buffer)
freeBuffer = false
1
} else {
udpChannel.addBuffer(buffer)
freeBuffer = false
if (udpChannel.isRegistered) udpChannel.read()
0
}
} catch (t: Throwable) {
PlatformDependent.throwException(t)
-1
} finally {
if (freeBuffer) buffer.release()
}
}
override fun doWriteMessage(msg: Any, buffer: ChannelOutboundBuffer): Boolean {
if (msg !is DatagramPacket) return false
val recipient = msg.recipient()
val byteBuf = msg.content()
val readableBytes = byteBuf.readableBytes()
if (readableBytes == 0) return true
val internalNioBuffer = byteBuf.internalNioBuffer(
byteBuf.readerIndex(), readableBytes
)
return javaChannel().send(internalNioBuffer, recipient) > 0
}
}

View File

@ -1,70 +0,0 @@
package cn.tursom.forward.datagram
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.*
import io.netty.channel.socket.ServerSocketChannelConfig
import java.net.SocketException
import java.nio.channels.DatagramChannel
class ServerDatagramChannelConfig(
channel: Channel,
private val datagramChannel: DatagramChannel
) : DefaultChannelConfig(channel), ServerSocketChannelConfig {
init {
setRecvByteBufAllocator(FixedRecvByteBufAllocator(2048))
}
private inline fun withThis(action: () -> Unit): ServerDatagramChannelConfig {
action()
return this
}
@Suppress("DEPRECATION")
@Deprecated("super deprecated")
override fun setMaxMessagesPerRead(n: Int) = withThis { super.setMaxMessagesPerRead(n) }
override fun getBacklog(): Int = 1
override fun setBacklog(backlog: Int) = this
override fun setConnectTimeoutMillis(timeout: Int) = this
override fun setPerformancePreferences(arg0: Int, arg1: Int, arg2: Int) = this
override fun setAllocator(alloc: ByteBufAllocator) = withThis { super.setAllocator(alloc) }
override fun setAutoRead(autoread: Boolean) = withThis { super.setAutoRead(true) }
override fun setMessageSizeEstimator(est: MessageSizeEstimator) = withThis { super.setMessageSizeEstimator(est) }
override fun setWriteSpinCount(spincount: Int) = withThis { super.setWriteSpinCount(spincount) }
override fun setRecvByteBufAllocator(alloc: RecvByteBufAllocator) =
withThis { super.setRecvByteBufAllocator(alloc) }
override fun setWriteBufferHighWaterMark(writeBufferHighWaterMark: Int) =
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark) as ServerSocketChannelConfig
override fun setWriteBufferLowWaterMark(writeBufferLowWaterMark: Int) =
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark) as ServerSocketChannelConfig
override fun setWriteBufferWaterMark(writeBufferWaterMark: WriteBufferWaterMark) =
super.setWriteBufferWaterMark(writeBufferWaterMark) as ServerSocketChannelConfig
override fun getReceiveBufferSize(): Int = try {
datagramChannel.socket().receiveBufferSize
} catch (ex: SocketException) {
throw ChannelException(ex)
}
override fun setReceiveBufferSize(size: Int) = try {
datagramChannel.socket().receiveBufferSize = size
this
} catch (ex: SocketException) {
throw ChannelException(ex)
}
override fun isReuseAddress(): Boolean = try {
datagramChannel.socket().reuseAddress
} catch (ex: SocketException) {
throw ChannelException(ex)
}
override fun setReuseAddress(reuseaddr: Boolean) = try {
datagramChannel.socket().reuseAddress = true
this
} catch (ex: SocketException) {
throw ChannelException(ex)
}
}

View File

@ -1,53 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.encrypt.AES
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.forward.Forward
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.nio.channels.SelectableChannel
class ActiveAesNIOForward(
host: SocketAddress,
channel: SelectableChannel = udp(),
timeout: Long = 3,
forward: Forward? = null,
publicKeyEncrypt: PublicKeyEncrypt = RSA()
) : NIOForward(host, channel, timeout, forward) {
constructor(
host: String,
port: Int,
channel: SelectableChannel = udp(),
timeout: Long = 3,
forward: Forward? = null,
publicKeyEncrypt: PublicKeyEncrypt = RSA()
) : this(InetSocketAddress(host, port), channel, timeout, forward, publicKeyEncrypt)
private lateinit var aes: AES
protected var recvMsgHandler: (ByteBuffer) -> Unit = ::recvAESKey
private lateinit var publicKeyEncrypt: PublicKeyEncrypt
init {
@Suppress("LeakingThis")
write(publicKeyEncrypt.publicKey!!.encoded)
}
override fun recvMsg(msg: ByteBuffer) {
recvMsgHandler(msg)
}
private fun superRecvMsg(msg: ByteBuffer) {
super.recvMsg(HeapByteBuffer(publicKeyEncrypt.decrypt(msg.toByteArray())))
}
private fun recvAESKey(buf: ByteBuffer) {
aes = AES(publicKeyEncrypt.decrypt(buf))
recvMsgHandler = ::superRecvMsg
}
}

View File

@ -1,38 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.encrypt.AES
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.forward.Forward
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
open class ActiveAesNettyForward(
channel: Channel,
forward: Forward? = null,
private val publicKeyEncrypt: PublicKeyEncrypt = RSA()
) : NettyForward(channel, forward) {
protected var recvBytesHandler: (buf: ByteBuf, ctx: ChannelHandlerContext) -> Unit = ::recvAESKey
private lateinit var aes: AES
init {
channel.writeAndFlush(Unpooled.wrappedBuffer(publicKeyEncrypt.publicKey!!.encoded))
}
override fun recvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) {
val buffer = ByteArray(buf.readableBytes())
buf.readBytes(buffer)
recvBytesHandler(Unpooled.wrappedBuffer(aes.decrypt(buffer)), ctx)
}
private fun superRecvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) = super.recvBytes(buf, ctx)
private fun recvAESKey(buf: ByteBuf, ctx: ChannelHandlerContext) {
val key = ByteArray(buf.readableBytes())
buf.readBytes(key)
aes = AES(publicKeyEncrypt.decrypt(key))
recvBytesHandler = ::superRecvBytes
}
}

View File

@ -1,18 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.forward.Forward
import java.net.InetSocketAddress
import java.net.SocketAddress
class DatagramForward(
host: SocketAddress,
timeout: Long = 3,
forward: Forward? = null
) : NIOForward(host, udp(), timeout, forward) {
constructor(
host: String,
port: Int,
timeout: Long = 3,
forward: Forward? = null
) : this(InetSocketAddress(host, port), timeout, forward)
}

View File

@ -1,164 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.ExpandableMemoryPool
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.timer.WheelTimer
import cn.tursom.core.unaryPlus
import cn.tursom.forward.Forward
import cn.tursom.niothread.NioProtocol
import cn.tursom.niothread.NioThread
import cn.tursom.niothread.WorkerLoopNioThread
import cn.tursom.niothread.loophandler.WorkerLoopHandler
import io.netty.util.internal.logging.InternalLogger
import io.netty.util.internal.logging.Slf4JLoggerFactory
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.nio.channels.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/**
* only support UDP(DatagramChannel) and TCP(SocketChannel)
*/
open class NIOForward(
host: SocketAddress,
private val channel: SelectableChannel = udp(),
val timeout: Long = 3,
final override var forward: Forward? = null
) : Forward {
private lateinit var key: SelectionKey
private var timeoutTask = timer.exec(timeout * 1000) {
close()
}
constructor(
host: String,
port: Int,
channel: SelectableChannel = udp(),
timeout: Long = 3,
forward: Forward? = null
) : this(InetSocketAddress(host, port), channel, timeout, forward)
init {
when (channel) {
is SocketChannel, is DatagramChannel -> {
}
else -> throw UnsupportedOperationException()
}
@Suppress("LeakingThis")
forward?.forward = this
channel.configureBlocking(false)
when (channel) {
is SocketChannel -> channel.connect(host)
is DatagramChannel -> channel.connect(host)
}
val latch = CountDownLatch(1)
nioThread.register(
channel,
if (channel is DatagramChannel) SelectionKey.OP_READ else SelectionKey.OP_CONNECT
) { key ->
this.key = key
if (channel is DatagramChannel) {
latch.countDown()
key.attach(this)
} else {
key.attach(this to latch)
}
}
latch.await(3, TimeUnit.SECONDS)
}
private fun resetTimeout() {
timeoutTask.cancel()
timeoutTask = timer.exec(timeout * 1000) {
close()
}
}
override fun write(buffer: ByteBuffer) {
log.debug("recv msg from agent {}", +{ buffer.toString(buffer.readable) })
resetTimeout()
if (channel is WritableByteChannel) {
channel.write(buffer)
buffer.close()
} else {
buffer.close()
throw UnsupportedOperationException()
}
}
override fun close() {
nioThread.execute {
key.cancel()
}
}
open fun recvMsg(msg: ByteBuffer) {
log.debug("connected from tcp {}", +{ msg.toString(msg.readable) })
resetTimeout()
forward?.write(msg)
}
@Suppress("unused")
companion object {
fun tcp(): SocketChannel = SocketChannel.open()
fun udp(): DatagramChannel = DatagramChannel.open()
private val timer = WheelTimer.timer
private val log: InternalLogger = Slf4JLoggerFactory.getInstance(NIOForward::class.java)
private val memoryPool: MemoryPool = ExpandableMemoryPool(128) { DirectMemoryPool(1024, 16) }
@Suppress("UNCHECKED_CAST")
private val nioThread: NioThread = WorkerLoopNioThread(
"NIOForwardLooper",
workLoop = WorkerLoopHandler(object : NioProtocol {
override fun handleConnect(key: SelectionKey, nioThread: NioThread) {
key.interestOps(SelectionKey.OP_READ)
val (forward, latch) = (key.attachment() as Pair<NIOForward, CountDownLatch>)
latch.countDown()
key.attach(forward)
}
override fun handleRead(key: SelectionKey, nioThread: NioThread) {
val channel = key.channel()
val forward = key.attachment() as NIOForward
val buffer = memoryPool.get()
if (channel is ReadableByteChannel) {
channel.read(buffer)
}
log.debug("recv msg from {}: {}", +{
when (channel) {
is SocketChannel -> channel.remoteAddress
is DatagramChannel -> channel.remoteAddress
else -> null
}
}, +{
buffer.toString(buffer.readable)
})
forward.recvMsg(buffer)
buffer.close()
}
override fun handleWrite(key: SelectionKey, nioThread: NioThread) {}
override fun exceptionCause(key: SelectionKey, nioThread: NioThread, e: Throwable) {
val channel = key.channel()
log.error("exception caused on handler msg, address: {}", +{
when (channel) {
is SocketChannel -> channel.remoteAddress
is DatagramChannel -> channel.remoteAddress
else -> null
}
}, e)
val forward = key.attachment() as NIOForward
forward.close()
forward.forward?.close()
}
})
)
}
}

View File

@ -1,61 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.unaryPlus
import cn.tursom.forward.Forward
import cn.tursom.forward.NettyHandler
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.WriteTimeoutException
import io.netty.util.internal.logging.InternalLogger
import io.netty.util.internal.logging.Slf4JLoggerFactory
open class NettyForward(
protected val channel: Channel,
final override var forward: Forward? = null
) : NettyHandler, Forward {
companion object {
private val log: InternalLogger = Slf4JLoggerFactory.getInstance(NettyForward::class.java)
}
init {
@Suppress("LeakingThis")
forward?.forward = this
}
override fun write(buffer: ByteBuffer) {
log.debug("recv msg from agent {}", +{ buffer.toString(buffer.readable) })
val future = channel.writeAndFlush(Unpooled.wrappedBuffer(buffer.readBuffer()))
future.addListener {
buffer.close()
}
}
override fun close() {
channel.close()
}
override fun recvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) {
val buffer = NettyByteBuffer(buf)
log.debug(
"recv msg from socket client {}: {}",
ctx.channel().remoteAddress(),
+{ buffer.toString(buffer.readable) })
forward?.write(buffer) ?: buf.release()
}
override fun exceptionCaused(cause: Throwable, ctx: ChannelHandlerContext) {
if (cause !is ReadTimeoutException && cause !is WriteTimeoutException) {
log.error("exception caused on socket forward", cause)
}
ctx.close()
}
override fun onClose(ctx: ChannelHandlerContext) {
forward?.close()
}
}

View File

@ -1,51 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.encrypt.AES
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.forward.Forward
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.nio.channels.SelectableChannel
/**
* only support UDP(DatagramChannel) and TCP(SocketChannel)
*/
open class PassiveAesNIOForward(
host: SocketAddress,
channel: SelectableChannel = udp(),
timeout: Long = 3,
forward: Forward? = null,
private val aes: AES = AES(),
private val publicKeyEncryptBuilder: (publicKey: ByteArray) -> PublicKeyEncrypt = { RSA(it) }
) : NIOForward(host, channel, timeout, forward) {
constructor(
host: String,
port: Int,
channel: SelectableChannel = udp(),
timeout: Long = 3,
forward: Forward? = null,
aes: AES = AES(),
publicKeyEncryptBuilder: (publicKey: ByteArray) -> PublicKeyEncrypt = { RSA(it) }
) : this(InetSocketAddress(host, port), channel, timeout, forward, aes, publicKeyEncryptBuilder)
protected var recvMsgHandler: (ByteBuffer) -> Unit = ::recvPublicKeyEncrypt
private lateinit var publicKeyEncrypt: PublicKeyEncrypt
override fun recvMsg(msg: ByteBuffer) {
recvMsgHandler(msg)
}
private fun superRecvMsg(msg: ByteBuffer) {
super.recvMsg(HeapByteBuffer(publicKeyEncrypt.decrypt(msg.toByteArray())))
}
private fun recvPublicKeyEncrypt(msg: ByteBuffer) {
publicKeyEncrypt = publicKeyEncryptBuilder(msg.toByteArray())
write(aes.secKey.encoded)
recvMsgHandler = ::superRecvMsg
}
}

View File

@ -1,34 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.encrypt.AES
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.forward.Forward
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
open class PassiveAesNettyForward(
channel: Channel,
forward: Forward? = null,
private val aes: AES = AES(),
private val publicKeyEncryptBuilder: (ByteArray) -> PublicKeyEncrypt = { RSA(it) }
) : NettyForward(channel, forward) {
protected var recvBytesHandler: (buf: ByteBuf, ctx: ChannelHandlerContext) -> Unit = ::recvAESKey
override fun recvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) {
val buffer = ByteArray(buf.readableBytes())
buf.readBytes(buffer)
recvBytesHandler(Unpooled.wrappedBuffer(aes.decrypt(buffer)), ctx)
}
private fun superRecvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) = super.recvBytes(buf, ctx)
private fun recvAESKey(buf: ByteBuf, ctx: ChannelHandlerContext) {
val key = ByteArray(buf.readableBytes())
buf.readBytes(key)
val publicKeyEncrypt = publicKeyEncryptBuilder(key)
write(publicKeyEncrypt.encrypt(aes.secKey.encoded))
recvBytesHandler = ::superRecvBytes
}
}

View File

@ -1,19 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.forward.Forward
import java.net.InetSocketAddress
import java.net.SocketAddress
class SocketForward(
host: SocketAddress,
timeout: Long = 3,
forward: Forward? = null
) : NIOForward(host, tcp(), timeout, forward) {
constructor(
host: String,
port: Int,
timeout: Long = 3,
forward: Forward? = null
) : this(InetSocketAddress(host, port), timeout, forward)
}

View File

@ -1,65 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.unaryPlus
import cn.tursom.forward.Forward
import cn.tursom.forward.ws.WebSocketHandler
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.WriteTimeoutException
import io.netty.util.internal.logging.InternalLogger
import io.netty.util.internal.logging.Slf4JLoggerFactory
class WSForward(
private val wsChannel: SocketChannel,
override var forward: Forward? = null
) : WebSocketHandler, Forward {
companion object {
private val log: InternalLogger = Slf4JLoggerFactory.getInstance(WSForward::class.java)
}
init {
forward?.forward = this
}
override fun write(buffer: ByteBuffer) {
log.debug("recv msg from agent {}", +{ buffer.toString(buffer.readable) })
val future = wsChannel.writeAndFlush(TextWebSocketFrame(Unpooled.wrappedBuffer(buffer.readBuffer())))
future.addListener {
buffer.close()
}
}
override fun close() {
wsChannel.close()
}
override fun recvStr(str: ByteBuf, ctx: ChannelHandlerContext) {
recvBytes(str, ctx)
}
override fun recvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) {
val buffer = NettyByteBuffer(buf)
log.debug(
"recv msg from ws client {}: {}",
ctx.channel().remoteAddress(),
+{ buffer.toString(buffer.readable) })
forward?.write(buffer) ?: buf.release()
}
override fun exceptionCaused(cause: Throwable, ctx: ChannelHandlerContext) {
if (cause !is ReadTimeoutException && cause !is WriteTimeoutException) {
log.error("exception caused on web socket forward", cause)
}
ctx.close()
}
override fun onClose(ctx: ChannelHandlerContext) {
forward?.close()
}
}

View File

@ -1,49 +0,0 @@
package cn.tursom.forward.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.forward.Forward
import cn.tursom.ws.WebSocketClient
import cn.tursom.ws.WebSocketHandler
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.util.internal.logging.InternalLogger
import io.netty.util.internal.logging.Slf4JLoggerFactory
class WsClientForward(uri: String, override var forward: Forward? = null) : Forward, WebSocketHandler {
companion object {
private val log: InternalLogger = Slf4JLoggerFactory.getInstance(WsClientForward::class.java)
}
val wsClient = WebSocketClient(uri, this)
init {
forward?.forward = this
}
override fun readMessage(client: WebSocketClient, msg: TextWebSocketFrame) {
readMessage(client, msg.content())
}
override fun readMessage(client: WebSocketClient, msg: ByteBuffer) {
forward?.write(msg) ?: msg.close()
}
override fun write(buffer: ByteBuffer) {
val future = wsClient.writeText(buffer)
future.addListener {
buffer.close()
} ?: buffer.close()
}
override fun onClose(client: WebSocketClient) {
forward?.close()
}
override fun onError(client: WebSocketClient, e: Throwable) {
log.error("exception caused on web socket client", e)
close()
}
override fun close() {
wsClient.close()
}
}

View File

@ -1,18 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.forward.Forward
import cn.tursom.forward.datagram.DatagramServer
import cn.tursom.forward.impl.ActiveAesNettyForward
import cn.tursom.forward.impl.NettyForward
open class ActiveAesDatagramForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
private val aesBuilder: () -> PublicKeyEncrypt = { RSA() },
forward: () -> Forward
) : DatagramServer(
port, { ActiveAesNettyForward(it, forward(), aesBuilder()) }, readTimeout, writeTimeout
)

View File

@ -1,17 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.forward.Forward
import cn.tursom.forward.impl.ActiveAesNettyForward
import cn.tursom.forward.socket.SocketServer
open class ActiveAesSocketForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
private val aesBuilder: () -> PublicKeyEncrypt = { RSA() },
forward: () -> Forward
) : SocketServer(
port, { ActiveAesNettyForward(it, forward(), aesBuilder()) }, readTimeout, writeTimeout
)

View File

@ -1,14 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.forward.Forward
import cn.tursom.forward.datagram.DatagramServer
import cn.tursom.forward.impl.NettyForward
open class DatagramForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
forward: () -> Forward
) : DatagramServer(
port, { NettyForward(it, forward()) }, readTimeout, writeTimeout
)

View File

@ -1,22 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.core.encrypt.AES
import cn.tursom.core.encrypt.AESPool
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.core.pool.Pool
import cn.tursom.forward.Forward
import cn.tursom.forward.datagram.DatagramServer
import cn.tursom.forward.impl.NettyForward
import cn.tursom.forward.impl.PassiveAesNettyForward
open class PassiveAesDatagramForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
private val aes: Pool<AES> = AESPool(128),
private val publicKeyEncryptBuilder: (ByteArray) -> PublicKeyEncrypt = { RSA(it) },
forward: () -> Forward
) : DatagramServer(
port, { PassiveAesNettyForward(it, forward(), aes.forceGet(), publicKeyEncryptBuilder) }, readTimeout, writeTimeout
)

View File

@ -1,21 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.core.encrypt.AES
import cn.tursom.core.encrypt.AESPool
import cn.tursom.core.encrypt.PublicKeyEncrypt
import cn.tursom.core.encrypt.RSA
import cn.tursom.core.pool.Pool
import cn.tursom.forward.Forward
import cn.tursom.forward.impl.PassiveAesNettyForward
import cn.tursom.forward.socket.SocketServer
open class PassiveAesSocketForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
private val aes: Pool<AES> = AESPool(128),
private val publicKeyEncryptBuilder: (ByteArray) -> PublicKeyEncrypt = { RSA(it) },
forward: () -> Forward
) : SocketServer(
port, { PassiveAesNettyForward(it, forward(), aes.forceGet(), publicKeyEncryptBuilder) }, readTimeout, writeTimeout
)

View File

@ -1,14 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.forward.Forward
import cn.tursom.forward.impl.NettyForward
import cn.tursom.forward.socket.SocketServer
open class SocketForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
forward: () -> Forward
) : SocketServer(
port, { NettyForward(it, forward()) }, readTimeout, writeTimeout
)

View File

@ -1,16 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.forward.Forward
import cn.tursom.forward.impl.WSForward
import cn.tursom.forward.ws.WebSocketServer
open class WsForwardServer(
port: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
webSocketPath: String = "/ws",
bodySize: Int = 512 * 1024,
forward: () -> Forward
) : WebSocketServer(
port, { WSForward(it, forward()) }, readTimeout, writeTimeout, webSocketPath, bodySize
)

View File

@ -1,42 +0,0 @@
package cn.tursom.forward.server
import cn.tursom.forward.impl.NIOForward
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.nio.channels.DatagramChannel
/**
* 预组装流量转发服务器
* 监听wsPort端口并讲所有流量以UDP的方式转发到udpHost上
* 每个WS连接分别对应一个UDP连接
*/
class WsUDPForwardServer(
var udpHost: SocketAddress,
wsPort: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
webSocketPath: String = "/ws",
bodySize: Int = 512 * 1024
) : WsForwardServer(
wsPort,
readTimeout,
writeTimeout,
webSocketPath,
bodySize,
{ NIOForward(udpHost, DatagramChannel.open()) }
) {
constructor(
udpHost: String,
udpPort: Int,
wsPort: Int,
readTimeout: Int? = 60,
writeTimeout: Int? = 60,
webSocketPath: String = "/ws",
bodySize: Int = 512 * 1024
) : this(InetSocketAddress(udpHost, udpPort), wsPort, readTimeout, writeTimeout, webSocketPath, bodySize)
fun setUdpHost(host: String, port: Int) {
udpHost = InetSocketAddress(host, port)
}
}

View File

@ -1,44 +0,0 @@
package cn.tursom.forward.socket
import cn.tursom.forward.NettyHandler
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler
open class SocketServer(
val port: Int,
var handler: (SocketChannel) -> NettyHandler,
var readTimeout: Int? = 60,
var writeTimeout: Int? = 60
) {
private val bossGroup: EventLoopGroup = NioEventLoopGroup()
private val workerGroup: EventLoopGroup = NioEventLoopGroup()
private var b: ServerBootstrap? = null
private var future: ChannelFuture? = null
fun start() {
b = ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
if (readTimeout != null) pipeline.addLast(ReadTimeoutHandler(readTimeout!!))
if (writeTimeout != null) pipeline.addLast(WriteTimeoutHandler(writeTimeout!!))
try {
pipeline.addLast("handle", SocketServerHandler(handler(ch)))
} catch (e: Exception) {
ch.close()
}
}
})
future = b?.bind(port)
future?.sync()
}
}

View File

@ -1,26 +0,0 @@
package cn.tursom.forward.socket
import cn.tursom.forward.NettyHandler
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
class SocketServerHandler(private val handler: NettyHandler) : SimpleChannelInboundHandler<ByteBuf>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf) {
handler.recvBytes(msg, ctx)
}
override fun channelInactive(ctx: ChannelHandlerContext) {
handler.onClose(ctx)
super.channelInactive(ctx)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
handler.exceptionCaused(cause, ctx)
}
override fun channelActive(ctx: ChannelHandlerContext) {
handler.onOpen(ctx)
super.channelActive(ctx)
}
}

View File

@ -1,32 +0,0 @@
package cn.tursom.forward.ws
import cn.tursom.forward.ws.WebSocketHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketFrame
class WebSocketFrameHandler(private val handler: WebSocketHandler) : SimpleChannelInboundHandler<WebSocketFrame>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) {
if (msg is TextWebSocketFrame) {
handler.recvStr(msg.content(), ctx)
} else if (msg is BinaryWebSocketFrame) {
handler.recvBytes(msg.content(), ctx)
}
}
override fun channelInactive(ctx: ChannelHandlerContext) {
handler.onClose(ctx)
super.channelInactive(ctx)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
handler.exceptionCaused(cause, ctx)
}
override fun channelActive(ctx: ChannelHandlerContext) {
handler.onOpen(ctx)
super.channelActive(ctx)
}
}

View File

@ -1,34 +0,0 @@
package cn.tursom.forward.ws
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import java.nio.charset.StandardCharsets
interface WebSocketHandler {
fun onOpen(ctx: ChannelHandlerContext) {
}
fun recvStr(str: String, ctx: ChannelHandlerContext) {
}
fun recvStr(str: ByteBuf, ctx: ChannelHandlerContext) {
recvStr(str.toString(StandardCharsets.UTF_8), ctx)
}
fun recvBytes(bytes: ByteArray, ctx: ChannelHandlerContext) {
}
fun recvBytes(buf: ByteBuf, ctx: ChannelHandlerContext) {
val buffer = ByteArray(buf.readableBytes())
buf.readBytes(buffer)
recvBytes(buffer, ctx)
}
fun onClose(ctx: ChannelHandlerContext) {
}
fun exceptionCaused(cause: Throwable, ctx: ChannelHandlerContext) {
cause.printStackTrace()
ctx.close()
}
}

View File

@ -1,49 +0,0 @@
package cn.tursom.forward.ws
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler
open class WebSocketServer(
val port: Int,
var handler: (SocketChannel) -> WebSocketHandler,
var readTimeout: Int? = 60,
var writeTimeout: Int? = 60,
var webSocketPath: String = "/ws",
var bodySize: Int = 512 * 1024
) {
private val bossGroup: EventLoopGroup = NioEventLoopGroup()
private val workerGroup: EventLoopGroup = NioEventLoopGroup()
private var b: ServerBootstrap? = null
private var future: ChannelFuture? = null
fun start() {
b = ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
if (readTimeout != null) pipeline.addLast(ReadTimeoutHandler(readTimeout!!))
if (writeTimeout != null) pipeline.addLast(WriteTimeoutHandler(writeTimeout!!))
pipeline.addLast("codec", HttpServerCodec())
.addLast("aggregator", HttpObjectAggregator(bodySize))
.addLast("http-chunked", ChunkedWriteHandler())
pipeline.addLast("ws", WebSocketServerProtocolHandler(webSocketPath))
pipeline.addLast("handle", WebSocketFrameHandler(handler(ch)))
}
})
future = b?.bind(port)
future?.sync()
}
}

View File

@ -1,3 +0,0 @@
dependencies {
compile project(":")
}

View File

@ -1,2 +0,0 @@
dependencies {
}

View File

@ -1,270 +0,0 @@
package cn.tursom.math
import java.util.*
import kotlin.math.cos
import kotlin.math.sin
data class Complex(
var r: Double = 0.0,
var i: Double = 0.0,
) {
constructor(r: Int, i: Int) : this(r.toDouble(), i.toDouble())
operator fun plus(complex: Complex) = Complex(r + complex.r, i + complex.i)
operator fun minus(complex: Complex) = Complex(r - complex.r, i - complex.i)
operator fun times(complex: Complex) = Complex(r * complex.r, i * complex.i)
operator fun plusAssign(complex: Complex) {
r += complex.r
i += complex.i
}
operator fun timesAssign(complex: Complex) {
r *= complex.r
i *= complex.i
}
// return abs/modulus/magnitude
fun abs(): Double {
return Math.hypot(r, i)
}
// return angle/phase/argument, normalized to be between -pi and pi
fun phase(): Double {
return Math.atan2(i, r)
}
// return a new object whose value is (this * alpha)
fun scale(alpha: Double): Complex {
return Complex(alpha * r, alpha * i)
}
fun conjugate(): Complex {
return Complex(r, -i)
}
fun reciprocal(): Complex {
val scale = r * r + i * i
return Complex(r / scale, -i / scale)
}
// return the real or imaginary part
fun re(): Double {
return r
}
fun im(): Double {
return r
}
// return a / b
fun divides(b: Complex): Complex {
val a = this
return a.times(b.reciprocal())
}
// return a new Complex object whose value is the complex exponential of this
fun exp(): Complex {
return Complex(Math.exp(r) * Math.cos(i), Math.exp(r) * Math.sin(i))
}
// return a new Complex object whose value is the complex sine of this
fun sin(): Complex {
return Complex(Math.sin(r) * Math.cosh(i), Math.cos(r) * Math.sinh(i))
}
// return a new Complex object whose value is the complex cosine of this
fun cos(): Complex {
return Complex(Math.cos(r) * Math.cosh(i), -Math.sin(r) * Math.sinh(i))
}
// return a new Complex object whose value is the complex tangent of this
fun tan(): Complex {
return sin().divides(cos())
}
override fun toString(): String {
return "($r,$i)"
}
}
object FFT {
fun fft(x: Array<out Complex>): Array<Complex> {
val n = x.size
if (n == 1) return arrayOf(x[0])
require(n % 2 == 0) { "n is not a power of 2" }
val even = Array(n / 2) { k ->
x[2 * k]
}
val evenFFT = fft(even)
for (k in 0 until n / 2) {
even[k] = x[2 * k + 1]
}
val oddFFT = fft(even)
val y = arrayOfNulls<Complex>(n)
for (k in 0 until n / 2) {
val kth = -2 * k * Math.PI / n
val wk = Complex(cos(kth), sin(kth))
y[k] = evenFFT[k].plus(wk.times(oddFFT[k]))
y[k + n / 2] = evenFFT[k].minus(wk.times(oddFFT[k]))
}
@Suppress("UNCHECKED_CAST")
return y as Array<Complex>
}
// compute the inverse FFT of x[], assuming its length n is a power of 2
fun ifft(x: Array<out Complex>): Array<out Complex> {
val n = x.size
var y = Array(n) { i ->
x[i].conjugate()
}
// compute forward FFT
y = fft(y as Array<out Complex>)
// take conjugate again
for (i in 0 until n) {
y[i] = y[i].conjugate()
}
// divide by n
for (i in 0 until n) {
y[i] = y[i].scale(1.0 / n)
}
return y
}
fun cconvolve(x: Array<out Complex>, y: Array<out Complex>): Array<out Complex> {
require(x.size == y.size) { "Dimensions don't agree" }
val n = x.size
val a = fft(x)
val b = fft(y)
val c = Array(n) { i ->
a[i].times(b[i])
}
return ifft(c)
}
fun convolve(x: Array<out Complex>, y: Array<out Complex>): Array<out Complex> {
val ZERO = Complex(0, 0)
val a = Array(2 * x.size) { i ->
if (i in x.indices) {
x[i]
} else {
ZERO
}
}
val b = Array(2 * y.size) { i ->
if (i in y.indices) {
y[i]
} else {
ZERO
}
}
return cconvolve(a, b)
}
// compute the DFT of x[] via brute force (n^2 time)
fun dft(x: Array<out Complex>): Array<out Complex> {
val n = x.size
val ZERO = Complex(0, 0)
val y = Array<Complex>(n) { k ->
val data = ZERO
for (j in 0 until n) {
val power = k * j % n
val kth = -2 * power * Math.PI / n
val wkj = Complex(cos(kth), sin(kth))
data.plusAssign(x[j] * wkj)
}
data
}
return y
}
// display an array of Complex numbers to standard output
fun show(x: Array<out Complex?>, title: String?) {
println(title)
println("-------------------")
for (i in x.indices) {
println(x[i])
}
println()
}
/***************************************************************************
* Test client and sample execution
*
* % java FFT 4
* x
* -------------------
* -0.03480425839330703
* 0.07910192950176387
* 0.7233322451735928
* 0.1659819820667019
*
* y = fft(x)
* -------------------
* 0.9336118983487516
* -0.7581365035668999 + 0.08688005256493803i
* 0.44344407521182005
* -0.7581365035668999 - 0.08688005256493803i
*
* z = ifft(y)
* -------------------
* -0.03480425839330703
* 0.07910192950176387 + 2.6599344570851287E-18i
* 0.7233322451735928
* 0.1659819820667019 - 2.6599344570851287E-18i
*
* c = cconvolve(x, x)
* -------------------
* 0.5506798633981853
* 0.23461407150576394 - 4.033186818023279E-18i
* -0.016542951108772352
* 0.10288019294318276 + 4.033186818023279E-18i
*
* d = convolve(x, x)
* -------------------
* 0.001211336402308083 - 3.122502256758253E-17i
* -0.005506167987577068 - 5.058885073636224E-17i
* -0.044092969479563274 + 2.1934338938072244E-18i
* 0.10288019294318276 - 3.6147323062478115E-17i
* 0.5494685269958772 + 3.122502256758253E-17i
* 0.240120239493341 + 4.655566391833896E-17i
* 0.02755001837079092 - 2.1934338938072244E-18i
* 4.01805098805014E-17i
*
*/
@JvmStatic
fun main(args: Array<String>) {
//val n = args[0].toInt()
val n = 8
val x = Array(n) { i ->
Complex(sin(i.toDouble()), 0.0)
}
show(x, "x")
// FFT of original data
val y = fft(x)
show(y, "y = fft(x)")
// FFT of original data
val y2 = dft(x)
show(y2, "y2 = dft(x)")
// take inverse FFT
val z = ifft(y)
show(z, "z = ifft(y)")
// circular convolution of x with itself
val c = cconvolve(x, x)
show(c, "c = cconvolve(x, x)")
// linear convolution of x with itself
val d = convolve(x, x)
show(d, "d = convolve(x, x)")
}
}

View File

@ -1,4 +0,0 @@
dependencies {
compile project(":")
compile project(":utils:delegation")
}

View File

@ -1,10 +0,0 @@
package cn.tursom.observer
/**
* 标识一个属性可以被指定的 FieldChangeListener 监听
* 属性的实现者应该实现相应的逻辑
*/
@RequiresOptIn
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.FIELD, AnnotationTarget.PROPERTY, AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
annotation class Observable

View File

@ -1,64 +0,0 @@
package cn.tursom.observer
import cn.tursom.core.cast
import cn.tursom.delegation.DecoratorMutableDelegatedField
import cn.tursom.delegation.DelegatedFieldAttachmentKey
import cn.tursom.delegation.MutableDelegatedField
import java.util.concurrent.ConcurrentLinkedDeque
import kotlin.reflect.KProperty
@Observable
class ObservableMutableDelegatedField<T, V>(
override val mutableDelegatedField: MutableDelegatedField<T, V>,
) : MutableDelegatedField<T, V> by mutableDelegatedField, DecoratorMutableDelegatedField<T, V> {
companion object : DelegatedFieldAttachmentKey<Boolean>
override fun <K> get(key: DelegatedFieldAttachmentKey<K>): K? {
return if (key == Companion) true.cast() else super.get(key)
}
private val listenerList = ConcurrentLinkedDeque<T.(old: V, new: V) -> Unit>()
override fun setValue(thisRef: T, property: KProperty<*>, value: V) {
val oldValue = getValue()
listenerList.forEach {
thisRef.it(oldValue, value)
}
mutableDelegatedField.setValue(thisRef, property, value)
}
override fun valueOnSet(thisRef: T, property: KProperty<*>, value: V, oldValue: V) {
listenerList.forEach {
thisRef.it(oldValue, value)
}
mutableDelegatedField.valueOnSet(thisRef, property, value, oldValue)
}
fun addListener(listener: T.(old: V, new: V) -> Unit): Observer<T, V> {
var catch: (T.(old: V, new: V, e: Throwable) -> Unit)? = null
var finally: (T.(old: V, new: V) -> Unit)? = null
listenerList.add { old, new ->
try {
listener(old, new)
} catch (e: Throwable) {
catch?.invoke(this, old, new, e)
} finally {
finally?.invoke(this, old, new)
}
}
return object : Observer<T, V> {
override fun cancel() = listenerList.remove(listener)
override fun catch(handler: T.(old: V, new: V, e: Throwable) -> Unit): Observer<T, V> {
catch = handler
return this
}
override fun finally(handler: T.(old: V, new: V) -> Unit): Observer<T, V> {
finally = handler
return this
}
}
}
}

View File

@ -1,6 +0,0 @@
package cn.tursom.observer
interface ObservableObserver<out T, V> : Observer<T, V> {
infix fun addListener(listener: T.(old: V, new: V) -> Unit): Observer<T, V>
override fun catch(handler: T.(old: V, new: V, e: Throwable) -> Unit): ObservableObserver<T, V>
}

View File

@ -1,7 +0,0 @@
package cn.tursom.observer
interface Observer<out T, V> {
fun cancel(): Boolean
infix fun catch(handler: T.(old: V, new: V, e: Throwable) -> Unit): Observer<T, V>
infix fun finally(handler: T.(old: V, new: V) -> Unit): Observer<T, V>
}

View File

@ -1,14 +0,0 @@
package cn.tursom.observer
class UnmonitoredFieldException : Exception {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -1,120 +0,0 @@
package cn.tursom.observer
import cn.tursom.core.cast
import cn.tursom.core.receiver
import cn.tursom.delegation.*
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KMutableProperty0
import kotlin.reflect.KProperty0
import kotlin.reflect.jvm.isAccessible
@Observable
fun <V> KMutableProperty0<V>.listenable(): MutableDelegatedField<Any, V> {
isAccessible = true
val delegate = getDelegate()
return if (delegate is MutableDelegatedField<*, *> && delegate[ObservableMutableDelegatedField] == true) {
delegate.cast()
} else {
ObservableMutableDelegatedField(KPropertyMutableDelegatedField(cast()))
}
}
@Observable
fun <V> listenable(initValue: V): MutableDelegatedField<Any, V> = ObservableMutableDelegatedField(
MutableDelegatedFieldValue(initValue)
)
@Observable
fun <T, V> MutableDelegatedField<T, V>.listenable(): MutableDelegatedField<T, V> =
ObservableMutableDelegatedField(this)
@OptIn(Observable::class)
fun <V> KProperty0<V>.getListenableMutableDelegatedField(): ObservableMutableDelegatedField<Any, V>? {
isAccessible = true
var delegate = getDelegate() ?: getDelegate(receiver, name)
if (delegate is DelegatedField<*, *>) {
while (true) {
if (delegate is ObservableMutableDelegatedField<*, *>) {
return delegate.cast()
}
if (delegate is DecoratorDelegatedField<*, *>) {
delegate = delegate.delegatedField
} else {
break
}
}
} else if (delegate is KProperty0<*>) {
return delegate.cast<KProperty0<V>>().getListenableMutableDelegatedField()
}
return null
}
inline fun <T, V> T.addChangeListener(
property: T.() -> KProperty0<V>,
): ObservableObserver<T, V> {
val kProperty0 = property()
@OptIn(Observable::class)
val delegatedField = kProperty0
.getListenableMutableDelegatedField()
.cast<ObservableMutableDelegatedField<T, V>?>()
?: throw UnmonitoredFieldException(kProperty0.toString())
return object : ObservableObserver<T, V> {
private val selfList = ConcurrentLinkedQueue<Observer<T, V>>()
override fun addListener(listener: T.(old: V, new: V) -> Unit): Observer<T, V> {
@OptIn(Observable::class)
val listener1 = delegatedField.addListener(listener)
selfList.add(listener1)
return listener1
}
override fun catch(handler: T.(old: V, new: V, e: Throwable) -> Unit): ObservableObserver<T, V> {
selfList.forEach {
it.catch(handler)
}
return this
}
override fun cancel(): Boolean {
selfList.forEach {
it.cancel()
}
return true
}
override fun finally(handler: T.(old: V, new: V) -> Unit): Observer<T, V> {
selfList.forEach {
it.finally(handler)
}
return this
}
}
}
infix operator fun <T, V> ObservableObserver<T, V>.invoke(listener: T.(old: V, new: V) -> Unit): Observer<T, V> =
addListener(listener)
infix fun <T, V> ObservableObserver<T, V>.with(listener: T.(old: V, new: V) -> Unit): Observer<T, V> =
addListener(listener)
infix fun <T, V> ObservableObserver<T, V>.and(listener: T.(old: V, new: V) -> Unit): Observer<T, V> =
addListener(listener)
infix operator fun <T, V> ObservableObserver<T, V>.plus(listener: T.(old: V, new: V) -> Unit): Observer<T, V> =
addListener(listener)
infix fun <V> KProperty0<V>.listen(listener: Any.(old: V, new: V) -> Unit): Observer<Any, V> {
@OptIn(Observable::class)
return getListenableMutableDelegatedField()
?.addListener(listener)
?: throw UnmonitoredFieldException(toString())
}
fun <T, V> T.listen(property: KProperty0<V>, listener: T.(old: V, new: V) -> Unit): Observer<Any, V> {
@OptIn(Observable::class)
return property.getListenableMutableDelegatedField()
?.addListener(listener.cast())
?: throw UnmonitoredFieldException(toString())
}