diff --git a/AsyncSocket/build.gradle b/AsyncSocket/build.gradle index 90e629e..c3cbc6a 100644 --- a/AsyncSocket/build.gradle +++ b/AsyncSocket/build.gradle @@ -3,7 +3,6 @@ dependencies { compile project(":log") // kotlin 协程 - compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' - compile project(":log") + compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' testRuntime group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-debug', version: '1.3.9' } \ No newline at end of file diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt b/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt index 8c2345b..6fce8d6 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt @@ -22,29 +22,6 @@ interface AsyncNioChannel : AsyncChannel { val nioThread: NioThread val channel: SelectableChannel get() = key.channel() - private inline fun operate(action: () -> T): T { - return try { - action() - } catch (e: Exception) { - waitMode() - throw e - } - } - - suspend fun write(timeout: Long, action: () -> T): T { - return operate { - waitWrite(timeout) - action() - } - } - - suspend fun read(timeout: Long, action: () -> T): T { - return operate { - waitRead(timeout) - action() - } - } - override suspend fun write(buffer: Array, timeout: Long): Long override suspend fun read(buffer: Array, timeout: Long): Long override suspend fun write(buffer: ByteBuffer, timeout: Long): Int = write(arrayOf(buffer), timeout).toInt() @@ -56,7 +33,7 @@ interface AsyncNioChannel : AsyncChannel { file: FileChannel, position: Long, count: Long, - timeout: Long + timeout: Long, ): Long = write(timeout) { file.transferTo(position, count, channel as WritableByteChannel) } @@ -65,7 +42,7 @@ interface AsyncNioChannel : AsyncChannel { file: FileChannel, position: Long, count: Long, - timeout: Long + timeout: Long, ): Long = read(timeout) { file.transferFrom(channel as ReadableByteChannel, position, count) } @@ -176,4 +153,27 @@ interface AsyncNioChannel : AsyncChannel { //val timer = StaticWheelTimer.timer val timer: Timer = WheelTimer.timer } +} + +inline fun AsyncNioChannel.operate(action: () -> T): T { + return try { + action() + } catch (e: Exception) { + waitMode() + throw e + } +} + +suspend fun AsyncNioChannel.write(timeout: Long, action: () -> T): T { + return operate { + waitWrite(timeout) + action() + } +} + +suspend fun AsyncNioChannel.read(timeout: Long, action: () -> T): T { + return operate { + waitRead(timeout) + action() + } } \ No newline at end of file diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/datagram/AsyncDatagram.kt b/AsyncSocket/src/main/kotlin/cn/tursom/datagram/AsyncDatagram.kt index 3f43844..b9fb3bd 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/datagram/AsyncDatagram.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/datagram/AsyncDatagram.kt @@ -1,6 +1,8 @@ package cn.tursom.datagram import cn.tursom.channel.AsyncNioChannel +import cn.tursom.channel.read +import cn.tursom.channel.write import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.read import cn.tursom.core.buffer.write @@ -12,8 +14,7 @@ interface AsyncDatagram : AsyncNioChannel { override val channel: DatagramChannel override fun getBuffed(pool: MemoryPool): BufferedAsyncDatagram = BufferedNioDatagram(pool, this) - override suspend fun write(buffer: Array, timeout: Long): Long = - write(timeout) { channel.write(buffer) } + override suspend fun write(buffer: Array, timeout: Long): Long = channel.write(buffer) override suspend fun read(buffer: Array, timeout: Long): Long = read(timeout) { channel.read(buffer) } diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt b/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt index d245720..bca2924 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt @@ -8,16 +8,17 @@ import java.nio.channels.SelectionKey open class NioDatagram( override val channel: DatagramChannel, override val key: SelectionKey, - override val nioThread: NioThread + override val nioThread: NioThread, ) : AsyncDatagram { + companion object { + suspend operator fun invoke(host: String, port: Int) = AsyncDatagramClient.connect(host, port) + suspend operator fun invoke(address: SocketAddress) = AsyncDatagramClient.connect(address) + } + override val open: Boolean get() = channel.isOpen && key.isValid override val remoteAddress: SocketAddress get() = channel.remoteAddress override fun writeMode() {} - override suspend fun write(timeout: Long, action: () -> T): T { - return action() - } - override fun close() { if (channel.isOpen || key.isValid) { nioThread.execute { diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt index 879ad86..56d24ab 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt @@ -2,6 +2,8 @@ package cn.tursom.socket import cn.tursom.channel.AsyncNioChannel import cn.tursom.channel.BufferedAsyncChannel +import cn.tursom.channel.read +import cn.tursom.channel.write import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.read import cn.tursom.core.buffer.write diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt index fd79217..7ff3dba 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt @@ -2,6 +2,8 @@ package cn.tursom.socket import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode +import cn.tursom.channel.read +import cn.tursom.channel.write import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.read import cn.tursom.core.buffer.write @@ -12,7 +14,18 @@ import java.nio.channels.SocketChannel /** * 异步协程套接字对象 */ -class NioSocket(override val key: SelectionKey, override val nioThread: NioThread) : AsyncSocket { +class NioSocket internal constructor( + override val key: SelectionKey, + override val nioThread: NioThread, +) : AsyncSocket { + companion object { + suspend operator fun invoke( + host: String, + port: Int, + timeout: Long = 0, + ) = NioClient.connect(host, port, timeout) + } + override val channel: SocketChannel = key.channel() as SocketChannel override val open: Boolean get() = channel.isOpen && key.isValid diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BufferedNioServer.kt similarity index 97% rename from AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt rename to AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BufferedNioServer.kt index 834b19f..2b3a2a9 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BuffedNioServer.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/server/BufferedNioServer.kt @@ -13,7 +13,7 @@ import kotlinx.coroutines.GlobalScope * 带内存池的 NIO 套接字服务器。 * 在处理结束后会自动释放由内存池分配的内存 */ -open class BuffedNioServer( +open class BufferedNioServer( port: Int, val memoryPool: MemoryPool, backlog: Int = 50, diff --git a/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt b/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt new file mode 100644 index 0000000..148aa56 --- /dev/null +++ b/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt @@ -0,0 +1,34 @@ +package cn.tursom.socket + +import cn.tursom.core.pool.HeapMemoryPool +import cn.tursom.socket.server.BufferedNioServer +import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap + +private val sockets = ConcurrentHashMap().keySet(Unit) +val handler: suspend BufferedAsyncSocket.() -> Unit = { + sockets.add(this) + try { + while (open) { + val read = read(60 * 1000) + println(read.toString(read.readable)) + sockets.forEach { + System.err.println(it) + it.write(read) + } + } + } finally { + sockets.remove(this) + } +} + +fun main() { + val server = BufferedNioServer(12345, handler = handler) + server.run() + runBlocking { + BufferedNioSocket(NioSocket("localhost", 12345), server.memoryPool).use { socket -> + socket.write("hello") + println(socket.read().getString()) + } + } +} \ No newline at end of file diff --git a/microservices/build.gradle b/microservices/build.gradle index 98ae62a..d12e5bb 100644 --- a/microservices/build.gradle +++ b/microservices/build.gradle @@ -3,5 +3,5 @@ dependencies { implementation project(":AsyncSocket") // kotlin 协程 - implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' } diff --git a/settings.gradle b/settings.gradle index c17cacc..b8462db 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,4 +17,6 @@ include 'utils:csv' include 'utils:delegation' include 'utils:observer' include 'utils:TrafficForward' -include 'utils:performance-test' \ No newline at end of file +include 'utils:performance-test' +include 'utils:math' +include 'utils:json' \ No newline at end of file diff --git a/socket/socket-async/build.gradle b/socket/socket-async/build.gradle index 78554ba..2a34a6a 100644 --- a/socket/socket-async/build.gradle +++ b/socket/socket-async/build.gradle @@ -3,5 +3,5 @@ dependencies { compile project(":socket") // kotlin 协程 - compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1' + compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' } \ No newline at end of file diff --git a/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/BuffedAsyncNioServer.kt b/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/BuffedAsyncNioServer.kt index 140f9dc..60034aa 100644 --- a/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/BuffedAsyncNioServer.kt +++ b/socket/socket-async/src/main/kotlin/cn/tursom/socket/server/BuffedAsyncNioServer.kt @@ -3,6 +3,7 @@ package cn.tursom.socket.server import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.pool.DirectMemoryPool import cn.tursom.core.pool.MemoryPool +import cn.tursom.core.pool.invoke import cn.tursom.socket.AsyncNioSocket /** @@ -11,20 +12,20 @@ import cn.tursom.socket.AsyncNioSocket * 当内存池用完之后会换为 ByteArrayByteBuffer。 */ class BuffedAsyncNioServer( - port: Int, - memoryPool: MemoryPool, - backlog: Int = 50, - handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit + port: Int, + memoryPool: MemoryPool, + backlog: Int = 50, + handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit, ) : IAsyncNioServer by AsyncNioServer(port, backlog, { memoryPool { handler(it) } }) { constructor( - port: Int, - blockSize: Int = 1024, - blockCount: Int = 128, - backlog: Int = 50, - handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit + port: Int, + blockSize: Int = 1024, + blockCount: Int = 128, + backlog: Int = 50, + handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit, ) : this(port, DirectMemoryPool(blockSize, blockCount), backlog, handler) } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt b/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt index 0731255..d23c40e 100644 --- a/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt +++ b/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt @@ -2,6 +2,7 @@ package cn.tursom.core import java.util.concurrent.TimeUnit import java.util.concurrent.ScheduledThreadPoolExecutor +import kotlin.concurrent.thread object CurrentTimeMillisClock { @@ -11,11 +12,12 @@ object CurrentTimeMillisClock { val now get() = tick init { - ScheduledThreadPoolExecutor(1) { runnable -> - val thread = Thread(runnable, "current-time-millis") - thread.isDaemon = true - thread - }.scheduleAtFixedRate({ tick = System.currentTimeMillis() }, 1, 1, TimeUnit.MILLISECONDS) + thread(name = "current-time-millis", isDaemon = true) { + while (true) { + tick = System.currentTimeMillis() + Thread.sleep(1) + } + } } //val now get() = System.currentTimeMillis() diff --git a/src/main/kotlin/cn/tursom/core/Finalized.kt b/src/main/kotlin/cn/tursom/core/Finalized.kt new file mode 100644 index 0000000..2525b22 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/Finalized.kt @@ -0,0 +1,34 @@ +package cn.tursom.core + +import java.lang.ref.PhantomReference +import java.lang.ref.Reference +import java.lang.ref.ReferenceQueue +import java.util.concurrent.ConcurrentHashMap +import kotlin.concurrent.thread + +object Finalized { + private val referenceQueue = ReferenceQueue() + private val handlerMap = ConcurrentHashMap, () -> Unit>() + + init { + thread(isDaemon = true) { + while (true) { + val action = handlerMap.remove(referenceQueue.remove() ?: return@thread) ?: continue + try { + action() + } catch (e: Exception) { + } + } + } + } + + fun listen(obj: T, action: () -> Unit): Reference = PhantomReference(obj, referenceQueue).also { + handlerMap[it] = action + } + + fun remove(reference: Reference<*>) { + handlerMap.remove(reference) + } +} + +fun T.finalized(action: () -> Unit) = Finalized.listen(this, action) \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/Members.kt b/src/main/kotlin/cn/tursom/core/Members.kt index e4a49bc..30812ca 100644 --- a/src/main/kotlin/cn/tursom/core/Members.kt +++ b/src/main/kotlin/cn/tursom/core/Members.kt @@ -6,15 +6,23 @@ import java.lang.reflect.Field import java.lang.reflect.Member import java.lang.reflect.Modifier -val fieldModifiers: Field = Field::class.java.getDeclaredField("modifiers").apply { - isAccessible = true +private val fieldModifiersField: Field? = try { + Field::class.java.getDeclaredField("modifiers").apply { + isAccessible = true + } +} catch (e: Throwable) { + null +} + +var fieldModifiers: (Field, Int) -> Unit = { field, modifer -> + fieldModifiersField!!.set(field, modifer) } var Field.public: Boolean get() = Modifier.isPublic(this.modifiers) set(value) { val modifier = Modifier.PUBLIC - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -28,7 +36,7 @@ var Field.private: Boolean get() = Modifier.isPrivate(this.modifiers) set(value) { val modifier = Modifier.PRIVATE - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -42,7 +50,7 @@ var Field.protected: Boolean get() = Modifier.isProtected(this.modifiers) set(value) { val modifier = Modifier.PROTECTED - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -56,7 +64,7 @@ var Field.static: Boolean get() = Modifier.isStatic(this.modifiers) set(value) { val modifier = Modifier.STATIC - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -70,7 +78,7 @@ var Field.final: Boolean get() = Modifier.isFinal(this.modifiers) set(value) { val modifier = Modifier.FINAL - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -84,7 +92,7 @@ var Field.synchronized: Boolean get() = Modifier.isSynchronized(this.modifiers) set(value) { val modifier = Modifier.SYNCHRONIZED - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -98,7 +106,7 @@ var Field.volatile: Boolean get() = Modifier.isVolatile(this.modifiers) set(value) { val modifier = Modifier.VOLATILE - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -112,7 +120,7 @@ var Field.transient: Boolean get() = Modifier.isTransient(this.modifiers) set(value) { val modifier = Modifier.TRANSIENT - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -126,7 +134,7 @@ var Field.native: Boolean get() = Modifier.isNative(this.modifiers) set(value) { val modifier = Modifier.NATIVE - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -140,7 +148,7 @@ var Field.`interface`: Boolean get() = Modifier.isInterface(this.modifiers) set(value) { val modifier = Modifier.INTERFACE - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -154,7 +162,7 @@ var Field.abstract: Boolean get() = Modifier.isAbstract(this.modifiers) set(value) { val modifier = Modifier.ABSTRACT - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier @@ -168,7 +176,7 @@ var Field.strict: Boolean get() = Modifier.isStrict(this.modifiers) set(value) { val modifier = Modifier.STRICT - fieldModifiers.set( + fieldModifiers( this, if (value) { modifiers or modifier diff --git a/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt index 905d82b..4e223cc 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt @@ -99,6 +99,7 @@ interface ByteBuffer : Closeable { fun toString(size: Int): String { val bytes = getBytes(size) + // 将测试的字节返还回来 readPosition -= bytes.size return String(bytes) } diff --git a/src/main/kotlin/cn/tursom/core/buffer/ClosedBufferException.kt b/src/main/kotlin/cn/tursom/core/buffer/ClosedBufferException.kt new file mode 100644 index 0000000..20b7886 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/buffer/ClosedBufferException.kt @@ -0,0 +1,9 @@ +package cn.tursom.core.buffer + +class ClosedBufferException : Exception { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(message, cause, enableSuppression, writableStackTrace) +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/buffer/ProxyByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/ProxyByteBuffer.kt index 54fa742..a3b3439 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/ProxyByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/ProxyByteBuffer.kt @@ -1,6 +1,4 @@ -package cn.tursom.buffer - -import cn.tursom.core.buffer.ByteBuffer +package cn.tursom.core.buffer interface ProxyByteBuffer : ByteBuffer { val agent: ByteBuffer diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/CloseSafeByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/CloseSafeByteBuffer.kt new file mode 100644 index 0000000..10c3889 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/CloseSafeByteBuffer.kt @@ -0,0 +1,63 @@ +package cn.tursom.core.buffer.impl + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.ClosedBufferException +import cn.tursom.core.buffer.ProxyByteBuffer +import java.util.concurrent.atomic.AtomicBoolean + +open class CloseSafeByteBuffer( + override val agent: ByteBuffer, +) : ByteBuffer by agent, ProxyByteBuffer { + /** + * 这个变量保证 buffer 不会被释放多次 + */ + private val open = AtomicBoolean(true) + + override val closed: Boolean + get() = !open.get() + + fun tryClose() = open.compareAndSet(true, false) + + override fun readBuffer(): java.nio.ByteBuffer { + if (closed) { + throw ClosedBufferException("byte buffer has closed.") + } + return agent.readBuffer() + } + + override fun writeBuffer(): java.nio.ByteBuffer { + if (closed) { + throw ClosedBufferException("byte buffer has closed.") + } + return agent.writeBuffer() + } + + override val array: ByteArray + get() { + if (closed) { + throw ClosedBufferException("byte buffer has closed.") + } + return agent.array + } + + override fun reset() { + if (closed) { + throw ClosedBufferException("byte buffer has closed.") + } + return agent.reset() + } + + override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { + if (closed) { + throw ClosedBufferException("byte buffer has closed.") + } + return agent.slice(position, size, readPosition, writePosition) + } + + override fun resize(newSize: Int): Boolean { + if (closed) { + throw ClosedBufferException("byte buffer has closed.") + } + return agent.resize(newSize) + } +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt index 45ad3f9..490516a 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt @@ -1,7 +1,7 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.buffer.ByteBuffer -import cn.tursom.buffer.ProxyByteBuffer +import cn.tursom.core.buffer.ProxyByteBuffer import cn.tursom.core.pool.MemoryPool class InstantByteBuffer( diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/MarkedByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/MarkedByteBuffer.kt index 5c3a46c..c82ef89 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/impl/MarkedByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/MarkedByteBuffer.kt @@ -2,7 +2,7 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.buffer.ByteBuffer import cn.tursom.buffer.MarkableByteBuffer -import cn.tursom.buffer.ProxyByteBuffer +import cn.tursom.core.buffer.ProxyByteBuffer class MarkedByteBuffer(override val agent: ByteBuffer) : ProxyByteBuffer, MarkableByteBuffer, ByteBuffer by agent { private var writeMark = 0 diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt index 3e9686d..bec27ea 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt @@ -1,36 +1,47 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.buffer.ByteBuffer -import cn.tursom.buffer.ProxyByteBuffer +import cn.tursom.core.buffer.ProxyByteBuffer +import cn.tursom.core.buffer.ClosedBufferException import cn.tursom.core.pool.MemoryPool -import java.util.concurrent.atomic.AtomicBoolean +import java.lang.ref.PhantomReference +import java.lang.ref.Reference +import java.lang.ref.ReferenceQueue +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread /** * 在被垃圾回收时能保证释放占用的内存池内存 */ class PooledByteBuffer( - override val agent: ByteBuffer, + agent: ByteBuffer, val pool: MemoryPool, - val token: Int -) : ProxyByteBuffer, ByteBuffer by agent { - /** - * 这个变量保证 buffer 不会被释放多次 - */ - private val open = AtomicBoolean(true) + val token: Int, + autoClose: Boolean = false, +) : ProxyByteBuffer, CloseSafeByteBuffer(agent) { + private val reference = if (autoClose) PhantomReference(this, allocatedReferenceQueue) else null + + init { + if (reference != null) allocatedMap[reference] = pool to token + } + private val childCount = AtomicInteger(0) override val resized get() = agent.resized - override val closed: Boolean get() = !open.get() && !resized override fun close() { - if (childCount.get() == 0) { - if (open.compareAndSet(true, false)) { + if (tryClose()) { + if (childCount.get() == 0) { + if (reference != null) allocatedMap.remove(reference) pool.free(this) } } } override fun resize(newSize: Int): Boolean { + if (closed) { + return false + } val successful = agent.resize(newSize) if (successful) { close() @@ -39,14 +50,35 @@ class PooledByteBuffer( } override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { + if (closed) { + throw ClosedBufferException("PooledByteBuffer has closed.") + } return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition)) } + override fun toString(): String { - return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, open=$open)" + return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, closed=$closed)" } - protected fun finalize() { - pool.free(this) + //protected fun finalize() { + // pool.free(this) + //} + + companion object { + private val allocatedReferenceQueue = ReferenceQueue() + private val allocatedMap = ConcurrentHashMap, Pair>() + + init { + thread(isDaemon = true) { + while (true) { + val (pool, token) = allocatedMap.remove(allocatedReferenceQueue.remove() ?: return@thread) ?: continue + try { + pool.free(token) + } catch (e: Exception) { + } + } + } + } } } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt b/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt index 5de4917..99c8b2b 100644 --- a/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt +++ b/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt @@ -1,7 +1,7 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.buffer.ByteBuffer -import cn.tursom.buffer.ProxyByteBuffer +import cn.tursom.core.buffer.ProxyByteBuffer import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger diff --git a/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt b/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt index 20e77af..b3a2307 100644 --- a/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt +++ b/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt @@ -1,8 +1,12 @@ package cn.tursom.core.datastruct +import cn.tursom.core.randomInt +import cn.tursom.core.usingTime import java.io.Serializable import java.lang.reflect.Field +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLongArray +import kotlin.random.Random class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : Serializable { @Volatile @@ -25,6 +29,7 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S bitSet.array.forEach { count += it.bitCount } return count } + val upCount get() = trueCount init { val default = if (defaultState) -1L else 0L @@ -37,17 +42,19 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S return bitSet[(index shr 6).toInt()] and getArr[index.toInt() and 63] != 0L } - fun up(index: Long): Boolean { + fun up(index: Long, fromDownToUp: Boolean = true): Boolean { val arrayIndex = (index shr 6).toInt() //bitSet[arrayIndex] = bitSet[arrayIndex] or getArr[index.toInt() and 63] - val expect = bitSet[arrayIndex] + var expect = bitSet[arrayIndex] + if (fromDownToUp) expect = expect and setArr[index.toInt() and 63] return bitSet.compareAndSet(arrayIndex, expect, expect or getArr[index.toInt() and 63]) } - fun down(index: Long): Boolean { + fun down(index: Long, fromUpToDown: Boolean = true): Boolean { val arrayIndex = (index shr 6).toInt() //bitSet[arrayIndex] = bitSet[arrayIndex] and setArr[index.toInt() and 63] - val expect = bitSet[arrayIndex] + var expect = bitSet[arrayIndex] + if (fromUpToDown) expect = expect and getArr[index.toInt() and 63] return bitSet.compareAndSet(arrayIndex, expect, expect and setArr[index.toInt() and 63]) } @@ -80,7 +87,33 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S } fun firstUp(): Long { - bitSet.forEachIndexed { index, l -> + return scanUp(0, bitSet.length()) + } + + fun randomUpIndex(): Long { + val startIndex = Random.nextInt(bitSet.length()) + var scan = scanUp(startIndex, bitSet.length() - startIndex) + if (scan >= 0) return scan + scan = scanUp(startIndex - 1, startIndex, false) + if (scan >= 0) return scan + return -1 + } + + fun firstDown(): Long { + return scanDown(0, bitSet.length()) + } + + fun getDownIndex(): Long { + val startIndex = Random.nextInt(bitSet.length()) + var scan = scanDown(startIndex, bitSet.length() - startIndex) + if (scan >= 0) return scan + scan = scanDown(startIndex - 1, startIndex, false) + if (scan >= 0) return scan + return -1 + } + + private fun scanUp(fromIndex: Int, length: Int, asc: Boolean = true): Long { + bitSet.forEachIndexed(fromIndex, length, asc) { index, l -> if (l != 0L) { for (i in 0 until 8) { if (l and scanArray[i] != 0L) { @@ -95,8 +128,8 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S return -1 } - fun firstDown(): Long { - bitSet.forEachIndexed { index, l -> + private fun scanDown(fromIndex: Int, length: Int, asc: Boolean = true): Long { + bitSet.forEachIndexed(fromIndex, length, asc) { index, l -> if (l != -1L) { for (i in 0 until 8) { if (l.inv() and scanArray[i] != 0L) { @@ -164,9 +197,9 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S private val Long.bitCount get() = bitCountArray[toInt().and(0xff)] + bitCountArray[shr(8).toInt().and(0xff)] + - bitCountArray[shr(16).toInt().and(0xff)] + bitCountArray[shr(24).toInt().and(0xff)] + - bitCountArray[shr(32).toInt().and(0xff)] + bitCountArray[shr(40).toInt().and(0xff)] + - bitCountArray[shr(48).toInt().and(0xff)] + bitCountArray[shr(56).toInt().and(0xff)] + bitCountArray[shr(16).toInt().and(0xff)] + bitCountArray[shr(24).toInt().and(0xff)] + + bitCountArray[shr(32).toInt().and(0xff)] + bitCountArray[shr(40).toInt().and(0xff)] + + bitCountArray[shr(48).toInt().and(0xff)] + bitCountArray[shr(56).toInt().and(0xff)] private val AtomicLongArray.size get() = length() @@ -177,10 +210,45 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S array.isAccessible = true } - inline fun AtomicLongArray.forEachIndexed(action: (index: Int, Long) -> Unit) { + private inline fun AtomicLongArray.forEachIndexed(action: (index: Int, Long) -> Unit) { repeat(length()) { action(it, get(it)) } } + + private inline fun AtomicLongArray.forEachIndexed(startIndex: Int, length: Int = length(), asc: Boolean = true, action: (index: Int, Long) -> Unit) { + repeat(length) { + val index = if (asc) { + startIndex + it + } else { + startIndex - it + } + //scand.incrementAndGet() + action(index, get(index)) + } + } } } + +//val scand = AtomicInteger(0) + +fun main() { + val size = 1000000 + val bitSet = AtomicBitSet(size.toLong()) + println(usingTime { + repeat(1000) { + bitSet.downAll() + repeat(size) { + val index = bitSet.getDownIndex() + bitSet.up(index) + repeat(randomInt(0, 3) / 2) { + val randomUpIndex = bitSet.randomUpIndex() + if (randomUpIndex >= 0) { + bitSet.down(randomUpIndex) + } + } + } + } + }) + //println(scand.get() / 100) +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/encrypt/AbstractPublicKeyEncrypt.kt b/src/main/kotlin/cn/tursom/core/encrypt/AbstractPublicKeyEncrypt.kt index e9844f5..0bb9244 100644 --- a/src/main/kotlin/cn/tursom/core/encrypt/AbstractPublicKeyEncrypt.kt +++ b/src/main/kotlin/cn/tursom/core/encrypt/AbstractPublicKeyEncrypt.kt @@ -1,14 +1,21 @@ package cn.tursom.core.encrypt +import cn.tursom.core.toHexString +import cn.tursom.core.toUTF8String import java.security.* +import java.security.interfaces.RSAPublicKey import java.security.spec.X509EncodedKeySpec import javax.crypto.Cipher +import kotlin.experimental.xor +import kotlin.math.min +import kotlin.random.Random @Suppress("unused", "MemberVisibilityCanBePrivate") abstract class AbstractPublicKeyEncrypt( val algorithm: String, final override val publicKey: PublicKey, - final override val privateKey: PrivateKey? = null + final override val privateKey: PrivateKey? = null, + val modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, ) : PublicKeyEncrypt { val publicKeyEncoded get() = publicKey.encoded!! val privateKeyEncoded get() = privateKey?.encoded @@ -39,67 +46,56 @@ abstract class AbstractPublicKeyEncrypt( override fun signature(digest: String): String = this@AbstractPublicKeyEncrypt.signature(digest) } - constructor(algorithm: String, keyPair: KeyPair) : this(algorithm, keyPair.public as PublicKey, keyPair.private as PrivateKey) - - constructor(algorithm: String, keySize: Int = 1024) : this(algorithm, KeyPairGenerator.getInstance(algorithm).let { - it.initialize(keySize) - it.generateKeyPair() - }) - - constructor(algorithm: String, publicKey: ByteArray) : this(algorithm, KeyFactory.getInstance(algorithm).generatePublic(X509EncodedKeySpec(publicKey)) as PublicKey) - - override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray { - return if (size < encryptMaxLen) { - encryptCipher.doFinal(data, offset, size) - } else { - val buffer = ByteArray(((size - 1) / encryptMaxLen + 1) * decryptMaxLen) - buffer.copyOf(doFinal(data, offset, size, buffer, encryptCipher, encryptMaxLen)) - } + private val blockCipher: Encrypt = when (modeOfOperation) { + BlockCipherModeOfOperation.ECB -> ECBBlockCipher() + BlockCipherModeOfOperation.CBC -> CBCBlockCipher() + else -> TODO() } - override fun decrypt(data: ByteArray, offset: Int, size: Int): ByteArray { - return if (data.size < decryptMaxLen) { - decryptCipher.doFinal(data, offset, size) - } else { - val buffer = ByteArray(size / decryptMaxLen * encryptMaxLen + 11) - buffer.copyOf(doFinal(data, offset, size, buffer, decryptCipher, decryptMaxLen)) + override var encryptInitVector: ByteArray? + get() = blockCipher.encryptInitVector + set(value) { + blockCipher.encryptInitVector = value + } + override var decryptInitVector: ByteArray? + get() = blockCipher.decryptInitVector + set(value) { + blockCipher.decryptInitVector = value } - } - override fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int { - return if (data.size < decryptMaxLen) { - encryptCipher.doFinal(data, offset, size, buffer, bufferOffset) - } else { - doFinal(data, offset, size, buffer, encryptCipher, decryptMaxLen, bufferOffset) - } - } + constructor( + algorithm: String, + keyPair: KeyPair, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, + ) : this(algorithm, keyPair.public as PublicKey, keyPair.private as PrivateKey, modeOfOperation = modeOfOperation) - override fun decrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int { - return if (data.size < decryptMaxLen) { - decryptCipher.doFinal(data, offset, size, buffer, bufferOffset) - } else { - doFinal(data, offset, size, buffer, decryptCipher, decryptMaxLen, bufferOffset) - } - } + constructor( + algorithm: String, + keySize: Int = 1024, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, + ) : this( + algorithm, + KeyPairGenerator.getInstance(algorithm).let { + it.initialize(keySize) + it.generateKeyPair() + }, + modeOfOperation = modeOfOperation + ) - private fun doFinal( - data: ByteArray, - offset: Int, - size: Int, - buffer: ByteArray, - cipher: Cipher, - blockSize: Int, - bufferOffset: Int = 0 - ): Int { - var readPosition = offset - var writeIndex = bufferOffset - while (readPosition + blockSize < size) { - writeIndex += cipher.doFinal(data, readPosition, blockSize, buffer, writeIndex) - readPosition += blockSize - } - writeIndex += cipher.doFinal(data, readPosition, size - readPosition, buffer, writeIndex) - return writeIndex - bufferOffset - } + constructor( + algorithm: String, + publicKey: ByteArray, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, + ) : this( + algorithm, + KeyFactory.getInstance(algorithm).generatePublic(X509EncodedKeySpec(publicKey)) as PublicKey, + modeOfOperation = modeOfOperation + ) + + override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray = blockCipher.encrypt(data, offset, size) + override fun decrypt(data: ByteArray, offset: Int, size: Int): ByteArray = blockCipher.decrypt(data, offset, size) + override fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int = blockCipher.encrypt(data, buffer, bufferOffset, offset, size) + override fun decrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int = blockCipher.decrypt(data, buffer, bufferOffset, offset, size) protected open fun signature(digest: String) = "${digest}with$algorithm" @@ -134,4 +130,162 @@ abstract class AbstractPublicKeyEncrypt( result = 31 * result + (privateKey?.hashCode() ?: 0) return result } + + protected inner class ECBBlockCipher : Encrypt { + override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray { + return if (size < encryptMaxLen) { + encryptCipher.doFinal(data, offset, size) + } else { + val buffer = ByteArray(((size - 1) / encryptMaxLen + 1) * decryptMaxLen) + buffer.copyOf(doFinal(data, offset, size, buffer, encryptCipher, encryptMaxLen)) + } + } + + override fun decrypt(data: ByteArray, offset: Int, size: Int): ByteArray { + return if (data.size < decryptMaxLen) { + decryptCipher.doFinal(data, offset, size) + } else { + val buffer = ByteArray(size / decryptMaxLen * encryptMaxLen + 11) + buffer.copyOf(doFinal(data, offset, size, buffer, decryptCipher, decryptMaxLen)) + } + } + + override fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int { + return if (data.size < decryptMaxLen) { + encryptCipher.doFinal(data, offset, size, buffer, bufferOffset) + } else { + doFinal(data, offset, size, buffer, encryptCipher, decryptMaxLen, bufferOffset) + } + } + + override fun decrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int { + return if (data.size < decryptMaxLen) { + decryptCipher.doFinal(data, offset, size, buffer, bufferOffset) + } else { + doFinal(data, offset, size, buffer, decryptCipher, decryptMaxLen, bufferOffset) + } + } + + private fun doFinal( + data: ByteArray, + offset: Int, + size: Int, + buffer: ByteArray, + cipher: Cipher, + blockSize: Int, + bufferOffset: Int = 0, + ): Int { + var readPosition = offset + var writeIndex = bufferOffset + while (readPosition + blockSize < size) { + writeIndex += cipher.doFinal(data, readPosition, blockSize, buffer, writeIndex) + readPosition += blockSize + } + writeIndex += cipher.doFinal(data, readPosition, size - readPosition, buffer, writeIndex) + return writeIndex - bufferOffset + } + } + + protected inner class CBCBlockCipher : Encrypt { + override var encryptInitVector: ByteArray? = Random.nextBytes(encryptMaxLen) + set(value) { + value ?: return + field = value + encBuf = value + } + override var decryptInitVector: ByteArray? = null + set(value) { + field = value + decBuf = value + } + + private var encBuf = encryptInitVector!! + private var decBuf: ByteArray? = decryptInitVector + + override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray { + val buffer = ByteArray(((size - 1) / encryptMaxLen + 1) * decryptMaxLen) + //return buffer.copyOf(encrypt(data, buffer, 0, offset, size)) + encrypt(data, buffer, 0, offset, size) + return buffer + } + + override fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int { + var end = offset + var start: Int + var writeIndex = bufferOffset + do { + start = end + end += encryptMaxLen + end = min(data.size, end) + (0 until end - start).forEach { index -> + encBuf[index] = encBuf[index] xor data[start + index] + } + writeIndex += encryptCipher.doFinal(encBuf, 0, encBuf.size, buffer, writeIndex) + //println("${data.size} $start->$end $writeIndex") + } while (end < offset + size) + return writeIndex - bufferOffset + } + + override fun decrypt(data: ByteArray, offset: Int, size: Int): ByteArray { + val decryptInitVector = decBuf!! + var start: Int + var end = offset + val buffer = ByteArray(((size - 1) / decryptMaxLen + 1) * encryptMaxLen + 11) + var writeIndex = 0 + do { + start = end + end += decryptMaxLen + end = min(data.size, end) + println("${data.size}, $start->$end, ${buffer.size}, $writeIndex") + val writeIndexBefore = writeIndex + writeIndex += decryptCipher.doFinal(data, start, end - start, buffer, writeIndex) + if (start == 0) { + repeat(encryptMaxLen) { + buffer[it] = buffer[it] xor decryptInitVector[it] + } + } else { + repeat(writeIndex - writeIndexBefore) { + buffer[writeIndexBefore + it] = buffer[writeIndexBefore + it] xor data[start + it] + } + } + } while (end < offset + size) + decBuf = buffer.copyOfRange(buffer.size - encryptMaxLen, buffer.size) + return buffer.copyOf(writeIndex) + } + + override fun decrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int { + TODO("Not yet implemented") + } + + //private fun doFinal(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int, cipher: Cipher): Int { + // var start = offset + // var end = offset + // var writeIndex = bufferOffset + // do { + // end += decryptMaxLen + // end = min(data.size, end) + // encBuf.indices.forEach { index -> + // encBuf[index] = encBuf[index] xor data[start + index] + // } + // writeIndex += cipher.doFinal(encBuf, 0, encBuf.size, buffer, writeIndex) + // start += decryptMaxLen + // } while (end < offset + size) + // return writeIndex - bufferOffset + //} + } + + companion object { + private val random = Random(System.currentTimeMillis()) + } +} + + +fun main() { + val source = "HelloWorld".repeat(100).toByteArray() + val rsa = RSA() + val decodeRsa = rsa.public + decodeRsa.decryptInitVector = rsa.encryptInitVector + val encrypt = rsa.encrypt(source) + //println(encrypt.toHexString()) + println(decodeRsa.decrypt(encrypt).toUTF8String()) } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/encrypt/BlockCipherModeOfOperation.kt b/src/main/kotlin/cn/tursom/core/encrypt/BlockCipherModeOfOperation.kt new file mode 100644 index 0000000..d1925fa --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/encrypt/BlockCipherModeOfOperation.kt @@ -0,0 +1,5 @@ +package cn.tursom.core.encrypt + +enum class BlockCipherModeOfOperation { + ECB, CBC, CFB, OFB, CTR, +} \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/encrypt/Encrypt.kt b/src/main/kotlin/cn/tursom/core/encrypt/Encrypt.kt index c36e53e..5c24efe 100644 --- a/src/main/kotlin/cn/tursom/core/encrypt/Encrypt.kt +++ b/src/main/kotlin/cn/tursom/core/encrypt/Encrypt.kt @@ -3,6 +3,13 @@ package cn.tursom.core.encrypt import cn.tursom.core.buffer.ByteBuffer interface Encrypt { + var encryptInitVector: ByteArray? + get() = null + set(_) {} + var decryptInitVector: ByteArray? + get() = null + set(_) {} + fun encrypt(data: ByteArray, offset: Int = 0, size: Int = data.size - offset): ByteArray fun decrypt(data: ByteArray, offset: Int = 0, size: Int = data.size - offset): ByteArray fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int = 0, offset: Int = 0, size: Int = data.size - offset): Int diff --git a/src/main/kotlin/cn/tursom/core/encrypt/RSA.kt b/src/main/kotlin/cn/tursom/core/encrypt/RSA.kt index ca9d7e5..cc13f15 100644 --- a/src/main/kotlin/cn/tursom/core/encrypt/RSA.kt +++ b/src/main/kotlin/cn/tursom/core/encrypt/RSA.kt @@ -10,28 +10,49 @@ import java.security.spec.X509EncodedKeySpec @Suppress("unused", "MemberVisibilityCanBePrivate") class RSA( publicKey: RSAPublicKey, - privateKey: RSAPrivateKey? = null -) : AbstractPublicKeyEncrypt("RSA", publicKey, privateKey) { + privateKey: RSAPrivateKey? = null, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, +) : AbstractPublicKeyEncrypt("RSA", publicKey, privateKey, modeOfOperation = modeOfOperation) { - val keySize = publicKey.modulus.bitLength() - override val decryptMaxLen = keySize / 8 - override val encryptMaxLen = decryptMaxLen - 11 + val keySize get() = (publicKey as RSAPublicKey).modulus.bitLength() + override val decryptMaxLen get() = keySize / 8 + override val encryptMaxLen get() = decryptMaxLen - 11 - override val public by lazy { + override val public by lazy { if (privateKey == null) { this } else { - RSA(publicKey) + RSA(publicKey, modeOfOperation = modeOfOperation) } } - constructor(keyPair: KeyPair) : this(keyPair.public as RSAPublicKey, keyPair.private as RSAPrivateKey) + constructor( + keyPair: KeyPair, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, + ) : this( + keyPair.public as RSAPublicKey, + keyPair.private as RSAPrivateKey, + modeOfOperation + ) - constructor(keySize: Int = 1024) : this(KeyPairGenerator.getInstance("RSA").let { - it.initialize(keySize) - it.generateKeyPair() - }) + constructor( + keySize: Int = 1024, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, + ) : this( + KeyPairGenerator.getInstance("RSA").let { + it.initialize(keySize) + it.generateKeyPair() + }, + modeOfOperation + ) - constructor(publicKey: ByteArray) : this(KeyFactory.getInstance("RSA").generatePublic(X509EncodedKeySpec(publicKey)) as RSAPublicKey) + constructor( + publicKey: ByteArray, + modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB, + ) : this( + KeyFactory.getInstance("RSA").generatePublic(X509EncodedKeySpec(publicKey)) as RSAPublicKey, + null, + modeOfOperation + ) } diff --git a/src/main/kotlin/cn/tursom/core/functional.kt b/src/main/kotlin/cn/tursom/core/functional.kt new file mode 100644 index 0000000..26c8346 --- /dev/null +++ b/src/main/kotlin/cn/tursom/core/functional.kt @@ -0,0 +1,4 @@ +package cn.tursom.core + +inline fun with(v: T, crossinline action: (T) -> R): () -> R = { action(v) } +inline fun with(v1: T1, v2: T2, crossinline action: (T1, T2) -> R): () -> R = { action(v1, v2) } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt index 6ca2928..43112b7 100644 --- a/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt @@ -6,40 +6,45 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer import cn.tursom.core.datastruct.AtomicBitSet abstract class AbstractMemoryPool( - val blockSize: Int, - val blockCount: Int, - val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) }, - private val memoryPool: ByteBuffer + val blockSize: Int, + val blockCount: Int, + val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, + private val memoryPool: ByteBuffer, + override var autoCollection: Boolean = false, ) : MemoryPool { private val bitMap = AtomicBitSet(blockCount.toLong()) val allocated: Int get() = bitMap.trueCount.toInt() + private fun getMemory(token: Int): ByteBuffer = synchronized(this) { - PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) + PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection) } /** * @return token */ private fun allocate(): Int { - var index = bitMap.firstDown() + var index = bitMap.getDownIndex() while (index in 0 until blockCount) { if (bitMap.up(index)) { return index.toInt() } - index = if (bitMap[index]) bitMap.firstDown() else index + index = if (bitMap[index]) bitMap.getDownIndex() else index } return -1 } override fun free(memory: ByteBuffer) { if (memory is PooledByteBuffer && memory.pool == this) { - val token = memory.token - @Suppress("ControlFlowWithEmptyBody") - if (token in 0 until blockCount) while (!bitMap.down(token.toLong())); + free(memory.token) } } + override fun free(token: Int) { + @Suppress("ControlFlowWithEmptyBody") + if (token in 0 until blockCount) while (!bitMap.down(token.toLong(), false)); + } + override fun getMemoryOrNull(): ByteBuffer? { val token = allocate() return if (token in 0 until blockCount) { diff --git a/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt index 76af9b2..55f1c55 100644 --- a/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/DirectMemoryPool.kt @@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer class DirectMemoryPool( blockSize: Int = 1024, blockCount: Int = 16, - emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) } + emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer ) : AbstractMemoryPool( blockSize, blockCount, diff --git a/src/main/kotlin/cn/tursom/core/pool/ExpandableMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ExpandableMemoryPool.kt index 8bd3e8e..b851c9e 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ExpandableMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ExpandableMemoryPool.kt @@ -8,7 +8,10 @@ import java.util.concurrent.atomic.AtomicBoolean * 可自动申请新内存空间的内存池 * 线程安全 */ -class ExpandableMemoryPool(val maxPoolCount: Int = -1, private val poolFactory: () -> MemoryPool) : MemoryPool { +class ExpandableMemoryPool( + val maxPoolCount: Int = -1, + private val poolFactory: () -> MemoryPool, +) : MemoryPool { private val poolList = ConcurrentLinkedQueue() @Volatile @@ -30,7 +33,7 @@ class ExpandableMemoryPool(val maxPoolCount: Int = -1, private val poolFactory: poolList.add(usingPool) } - override fun free(memory: ByteBuffer) = throw NotImplementedError("ExpandableMemoryPool won't allocate any memory") + override fun free(memory: ByteBuffer) = Unit override fun getMemory(): ByteBuffer { var buffer = usingPool.getMemoryOrNull() diff --git a/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt index 65e1797..88af606 100644 --- a/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/HeapMemoryPool.kt @@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer class HeapMemoryPool( blockSize: Int = 1024, blockCount: Int = 16, - emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) } + emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer ) : AbstractMemoryPool( blockSize, blockCount, diff --git a/src/main/kotlin/cn/tursom/core/pool/InstantMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/InstantMemoryPool.kt index c8b426e..88fa58a 100644 --- a/src/main/kotlin/cn/tursom/core/pool/InstantMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/InstantMemoryPool.kt @@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentLinkedQueue class InstantMemoryPool( val blockSize: Int, - val newMemory: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) } + val newMemory: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer ) : MemoryPool { private val memoryList = ConcurrentLinkedQueue>() diff --git a/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt index f8154dc..0e4e041 100644 --- a/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt @@ -9,9 +9,10 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer * 无锁,固定容量的内存池 */ abstract class LongBitSetAbstractMemoryPool( - val blockSize: Int, - val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) }, - private val memoryPool: ByteBuffer + val blockSize: Int, + val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, + private val memoryPool: ByteBuffer, + override var autoCollection: Boolean = false, ) : MemoryPool { private val bitMap = LongBitSet() val allocated: Int get() = bitMap.trueCount.toInt() @@ -22,7 +23,7 @@ abstract class LongBitSetAbstractMemoryPool( } private fun getMemory(token: Int): ByteBuffer = synchronized(this) { - return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) + return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection) } /** @@ -41,12 +42,15 @@ abstract class LongBitSetAbstractMemoryPool( override fun free(memory: ByteBuffer) { if (memory is PooledByteBuffer && memory.pool == this) { - val token = memory.token - @Suppress("ControlFlowWithEmptyBody") - if (token >= 0) while (!bitMap.down(token)); + free(memory.token) } } + override fun free(token: Int) { + @Suppress("ControlFlowWithEmptyBody") + if (token >= 0) while (!bitMap.down(token)); + } + override fun getMemoryOrNull(): ByteBuffer? { val token = allocate() return if (token >= 0) { diff --git a/src/main/kotlin/cn/tursom/core/pool/LongBitSetDirectMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/LongBitSetDirectMemoryPool.kt index 77fd120..32b07b5 100644 --- a/src/main/kotlin/cn/tursom/core/pool/LongBitSetDirectMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/LongBitSetDirectMemoryPool.kt @@ -5,7 +5,7 @@ import cn.tursom.core.buffer.impl.DirectByteBuffer class LongBitSetDirectMemoryPool( blockSize: Int, - emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { DirectByteBuffer(it) } + emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::DirectByteBuffer ) : LongBitSetAbstractMemoryPool(blockSize, emptyPoolBuffer, DirectByteBuffer(64 * blockSize)) { override fun toString(): String { return "LongBitSetDirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, allocated=$allocated)" diff --git a/src/main/kotlin/cn/tursom/core/pool/LongBitSetHeapMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/LongBitSetHeapMemoryPool.kt index deb7d59..a9bb788 100644 --- a/src/main/kotlin/cn/tursom/core/pool/LongBitSetHeapMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/LongBitSetHeapMemoryPool.kt @@ -5,7 +5,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer class LongBitSetHeapMemoryPool ( blockSize: Int, - emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) } + emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer ) : LongBitSetAbstractMemoryPool(blockSize, emptyPoolBuffer, HeapByteBuffer(64 * blockSize)) { override fun toString(): String { return "LongBitSetDirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, allocated=$allocated)" diff --git a/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt index bdf4787..1cef893 100644 --- a/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt @@ -3,6 +3,7 @@ package cn.tursom.core.pool import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.PooledByteBuffer import java.io.Closeable +import java.lang.ref.SoftReference /** * 可以记录与释放分配内存的内存池 @@ -11,21 +12,23 @@ import java.io.Closeable * 非线程安全 */ class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Closeable { - private val allocatedList = ArrayList(2) + private val allocatedList = ArrayList>(2) override fun getMemory(): ByteBuffer { val memory = pool.getMemory() - allocatedList.add(memory) + allocatedList.add(SoftReference(memory)) return memory } override fun getMemoryOrNull(): ByteBuffer? { val memory = pool.getMemoryOrNull() - if (memory != null) allocatedList.add(memory) + if (memory != null) allocatedList.add(SoftReference(memory)) return memory } override fun close() { - allocatedList.forEach(ByteBuffer::close) + allocatedList.forEach { + it.get()?.close() + } allocatedList.clear() } @@ -36,12 +39,13 @@ class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Close override fun toString(): String { val allocated = ArrayList(allocatedList.size) allocatedList.forEach { - if (it is PooledByteBuffer && !it.closed) allocated.add(it.token) + val buffer = it.get() + if (buffer is PooledByteBuffer && !buffer.closed) allocated.add(buffer.token) } return "MarkedMemoryPool(pool=$pool, allocated=$allocated)" } - protected fun finalize() { - close() - } + //protected fun finalize() { + // close() + //} } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt index 3764844..af1d0bd 100644 --- a/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt @@ -7,24 +7,28 @@ import cn.tursom.core.buffer.ByteBuffer */ interface MemoryPool { val staticSize: Boolean get() = true + var autoCollection: Boolean + get() = false + set(value) {} // fun allocate(): Int fun free(memory: ByteBuffer) + fun free(token: Int) = Unit fun getMemory(): ByteBuffer fun getMemoryOrNull(): ByteBuffer? override fun toString(): String - suspend operator fun invoke(action: suspend (ByteBuffer) -> T): T { - return getMemory().use { buffer -> - action(buffer) - } - } - fun get() = getMemory() operator fun get(blockCount: Int): Array = Array(blockCount) { get() } fun gc() {} } + +inline operator fun MemoryPool.invoke(action: (ByteBuffer) -> T): T { + return getMemory().use { buffer -> + action(buffer) + } +} diff --git a/src/main/kotlin/cn/tursom/core/pool/ScalabilityMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ScalabilityMemoryPool.kt index eae0087..4694d04 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ScalabilityMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ScalabilityMemoryPool.kt @@ -31,7 +31,7 @@ class ScalabilityMemoryPool(private val poolFactory: () -> MemoryPool) : MemoryP } } - override fun free(memory: ByteBuffer) = throw NotImplementedError("ExpandableMemoryPool won't allocate any memory") + override fun free(memory: ByteBuffer) = Unit override fun getMemory(): ByteBuffer { var buffer = usingPool.getMemoryOrNull() diff --git a/src/main/kotlin/cn/tursom/core/pool/ThreadLocalMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ThreadLocalMemoryPool.kt index e7dedec..c7edb65 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ThreadLocalMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ThreadLocalMemoryPool.kt @@ -2,7 +2,9 @@ package cn.tursom.core.pool import cn.tursom.core.buffer.ByteBuffer -class ThreadLocalMemoryPool(private val poolFactory: () -> MemoryPool) : MemoryPool { +class ThreadLocalMemoryPool( + private val poolFactory: () -> MemoryPool +) : MemoryPool { private val threadLocal = ThreadLocal() override fun free(memory: ByteBuffer) = throw NotImplementedError("ThreadLocalMemoryPool won't allocate any memory") diff --git a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt index b08fc55..1fd68f5 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt @@ -6,10 +6,11 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer import cn.tursom.core.datastruct.ArrayBitSet abstract class ThreadUnsafeAbstractMemoryPool( - val blockSize: Int, - val blockCount: Int, - val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) }, - private val memoryPool: ByteBuffer + val blockSize: Int, + val blockCount: Int, + val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, + private val memoryPool: ByteBuffer, + override var autoCollection: Boolean = false, ) : MemoryPool { private val bitMap = ArrayBitSet(blockCount.toLong()) val allocated: Int get() = bitMap.trueCount.toInt() @@ -29,7 +30,7 @@ abstract class ThreadUnsafeAbstractMemoryPool( } private fun unsafeGetMemory(token: Int): ByteBuffer { - return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) + return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token,autoCollection) } /** diff --git a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeDirectMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeDirectMemoryPool.kt index d8d59a7..c6b27df 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeDirectMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeDirectMemoryPool.kt @@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer class ThreadUnsafeDirectMemoryPool( blockSize: Int = 1024, blockCount: Int = 16, - emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) } + emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, ) : ThreadUnsafeAbstractMemoryPool( blockSize, blockCount, diff --git a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeHeapMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeHeapMemoryPool.kt index 0b93907..57edd50 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeHeapMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeHeapMemoryPool.kt @@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer class ThreadUnsafeHeapMemoryPool( blockSize: Int = 1024, blockCount: Int = 16, - emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) } + emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, ) : ThreadUnsafeAbstractMemoryPool( blockSize, blockCount, diff --git a/src/main/kotlin/cn/tursom/core/timer/Timer.kt b/src/main/kotlin/cn/tursom/core/timer/Timer.kt index ece3f7c..9f2330f 100644 --- a/src/main/kotlin/cn/tursom/core/timer/Timer.kt +++ b/src/main/kotlin/cn/tursom/core/timer/Timer.kt @@ -1,6 +1,7 @@ package cn.tursom.core.timer import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicInteger interface Timer { fun exec(timeout: Long, task: () -> Unit): TimerTask @@ -12,7 +13,7 @@ interface Timer { fun runNow(taskList: TaskQueue) { threadPool.execute { while (true) { - val task = taskList.take() ?: return@execute + val task = taskList.take() ?: break try { task() } catch (e: Throwable) { @@ -29,11 +30,11 @@ interface Timer { 0L, TimeUnit.MILLISECONDS, LinkedTransferQueue(), object : ThreadFactory { - var threadNumber = 0 + var threadNumber = AtomicInteger(0) override fun newThread(r: Runnable): Thread { val thread = Thread(r) thread.isDaemon = true - thread.name = "timer-worker-$threadNumber" + thread.name = "timer-worker-${threadNumber.incrementAndGet()}" return thread } }) diff --git a/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt b/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt index 55e92be..db2c6a5 100644 --- a/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt +++ b/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt @@ -11,13 +11,14 @@ import kotlin.concurrent.thread @Suppress("CanBeParameter", "MemberVisibilityCanBePrivate") class WheelTimer( - val tick: Long = 200, - val wheelSize: Int = 512, - val name: String = "wheelTimerLooper", - val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() } + val tick: Long = 200, + val wheelSize: Int = 512, + val name: String = "wheelTimerLooper", + val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() }, ) : Timer { var closed = false val taskQueueArray = AtomicReferenceArray(Array(wheelSize) { taskQueueFactory() }) + @Volatile private var position = 0 @@ -52,7 +53,7 @@ class WheelTimer( // runNow(outTimeQueue) //} thread(isDaemon = true, name = name) { - val startTime = CurrentTimeMillisClock.now + var startTime = CurrentTimeMillisClock.now while (!closed) { position %= wheelSize @@ -74,8 +75,10 @@ class WheelTimer( runNow(outTimeQueue) - val nextSleep = startTime + tick * position - CurrentTimeMillisClock.now + startTime += tick + val nextSleep = startTime - CurrentTimeMillisClock.now if (nextSleep > 0) sleep(tick) + //else System.err.println("timer has no delay") } } } @@ -84,10 +87,10 @@ class WheelTimer( val timer by lazy { WheelTimer(200, 1024) } val smoothTimer by lazy { WheelTimer(20, 128) } fun ScheduledThreadPoolExecutor.scheduleAtFixedRate( - var2: Long, - var4: Long, - var6: TimeUnit, - var1: () -> Unit + var2: Long, + var4: Long, + var6: TimeUnit, + var1: () -> Unit, ): ScheduledFuture<*> = scheduleAtFixedRate(var1, var2, var4, var6) } } \ No newline at end of file diff --git a/utils/async-http/build.gradle b/utils/async-http/build.gradle index 5758c26..0268fa7 100644 --- a/utils/async-http/build.gradle +++ b/utils/async-http/build.gradle @@ -4,7 +4,7 @@ dependencies { api project(":utils:xml") // kotlin 协程 - //implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1' + compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' // kotlin 反射 //implementation "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion" // OkHttp diff --git a/utils/build.gradle b/utils/build.gradle index 2ec4e36..3a9b114 100644 --- a/utils/build.gradle +++ b/utils/build.gradle @@ -2,10 +2,11 @@ dependencies { compile project(":") api "com.google.code.gson:gson:2.8.2" // kotlin 协程 - api 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1' + api 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' // kotlin 反射 api "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion" // 计算对象大小 api 'org.apache.lucene:lucene-core:4.0.0' api group: "io.netty", name: "netty-all", version: "4.1.43.Final" + api group: "io.netty", name: "netty-all", version: "4.1.43.Final" } diff --git a/utils/json/build.gradle b/utils/json/build.gradle new file mode 100644 index 0000000..e69de29 diff --git a/utils/json/src/main/kotlin/cn/tursom/utils/json/Json.kt b/utils/json/src/main/kotlin/cn/tursom/utils/json/Json.kt new file mode 100644 index 0000000..bde038c --- /dev/null +++ b/utils/json/src/main/kotlin/cn/tursom/utils/json/Json.kt @@ -0,0 +1,204 @@ +package cn.tursom.utils.json + +import com.sun.org.apache.xalan.internal.lib.ExsltMath.power +import java.lang.RuntimeException + +object Json { + class JsonFormatException(message: String? = null) : RuntimeException(message) { + internal constructor(content: JsonParseContent) : this("${content.json}[${content.index}]") + } + + fun parse(json: String): Any? { + val content = JsonParseContent(json) + val parse = parse(content) + jumpWhitespace(content) + if (content.index != json.length) throw JsonFormatException("$json[${json[content.index]}] remain characters") + return parse + } + + internal data class JsonParseContent(val json: String, var index: Int = 0) + + private fun parse(content: JsonParseContent): Any? { + jumpWhitespace(content) + return when (content.json[content.index]) { + '{' -> parseObj(content) + '[' -> parseArray(content) + '"' -> parseString(content) + '+', '-', in '0'..'9' -> parseNumber(content) + 't', 'f' -> parseBoolean(content) + 'n' -> parseNull(content) + else -> throw JsonFormatException(content) + } + } + + private fun parseNull(content: JsonParseContent) = if (content.json.startsWith("null", content.index)) { + content.index += 4 + null + } else throw JsonFormatException(content) + + @Suppress("ControlFlowWithEmptyBody") + private fun parseBoolean(content: JsonParseContent) = when { + content.json.startsWith("true", content.index) -> { + content.index += 4 + true + } + content.json.startsWith("false", content.index) -> { + content.index += 5 + false + } + else -> throw JsonFormatException(content) + } + + private fun jumpWhitespaceLoopCondition(json: String, index: Int) = index < json.length && json[index] in " \t\r\n" + + private fun jumpWhitespace(content: JsonParseContent) { + @Suppress("ControlFlowWithEmptyBody") + if (jumpWhitespaceLoopCondition(content.json, content.index)) while (jumpWhitespaceLoopCondition(content.json, ++content.index)); + } + + private fun charToInt(char: Char): Int { + val indexOf = char - '0' + if (indexOf < 0 || indexOf > 9) throw JsonFormatException("$char is not an number") + return indexOf + } + + private fun parseInt(content: JsonParseContent): Number { + var number = charToInt(content.json[content.index]).toLong() + while (++content.index < content.json.length && content.json[content.index] in '0'..'9') { + number = number * 10 + charToInt(content.json[content.index]) + } + return if (number <= Int.MAX_VALUE) number.toInt() else number + } + + private fun parseNumber(content: JsonParseContent): Number { + val negative = content.json[content.index] == '-' + if (negative || content.json[content.index] == '+') content.index++ + var number: Number = when (content.json[content.index]) { + in '0'..'9' -> parseInt(content) + else -> throw JsonFormatException(content) + } + if (content.index < content.json.length && content.json[content.index] == '.') { + if (++content.index >= content.json.length) throw JsonFormatException(content) + var base = 0.1 + var double = charToInt(content.json[content.index]) * base + while (++content.index < content.json.length && content.json[content.index] in '0'..'9') { + base *= 0.1 + double += charToInt(content.json[content.index]) * base + } + number = number.toDouble() + double + } + if (content.index < content.json.length && content.json[content.index] in "eE") { + val powerNegative = when (content.json[++content.index]) { + '-' -> true + '+' -> false + else -> { + content.index-- + false + } + } + content.index++ + number = number.toDouble() * power(10.0, parseInt(content).toLong() * if (powerNegative) -1.0 else 1.0) + } + return if (negative) when (number) { + is Int -> -number + is Long -> -number + else -> -number.toDouble() + } else number + } + + private fun parseString(content: JsonParseContent): String { + if (content.json[content.index++] != '"') throw JsonFormatException("string not begin with '\"'") + val builder = StringBuilder() + while (content.index < content.json.length) when (content.json[content.index]) { + '\\' -> { + when (content.json[++content.index]) { + 'b' -> builder.append('\b') + 'f' -> builder.append('\u000C') + 'n' -> builder.append('\n') + 'r' -> builder.append('\r') + 't' -> builder.append('\t') + 'u' -> { + var char = 0 + repeat(4) { + val indexOf = "0123456789abcdef".indexOf(content.json[++content.index].toLowerCase()) + if (indexOf < 0) throw JsonFormatException(content) + char = char * 16 + indexOf + } + builder.append(char.toChar()) + } + else -> builder.append(content.json[content.index]) + } + content.index++ + } + '"' -> { + content.index++ + return builder.toString() + } + else -> builder.append(content.json[content.index++]) + } + throw JsonFormatException(content) + } + + private fun parseObj(content: JsonParseContent): Map { + if (content.json[content.index++] != '{') throw JsonFormatException(content) + jumpWhitespace(content) + if (content.json[content.index] == '}') { + content.index++ + return emptyMap() + } + val map = HashMap() + while (true) { + jumpWhitespace(content) + val key = parseString(content) + jumpWhitespace(content) + if (content.json[content.index++] != ':') throw JsonFormatException(content) + map[key] = parse(content) + jumpWhitespace(content) + when (content.json[content.index++]) { + ',' -> continue + '}' -> break + else -> throw JsonFormatException("json object not ends with '}'") + } + } + return map + } + + private fun parseArray(content: JsonParseContent): List { + if (content.json[content.index++] != '[') throw JsonFormatException(content) + jumpWhitespace(content) + if (content.json[content.index] == ']') { + content.index++ + return emptyList() + } + val array = ArrayList() + while (true) { + array.add(parse(content)) + jumpWhitespace(content) + if (content.index >= content.json.length) throw JsonFormatException(content) + when (content.json[content.index++]) { + ',' -> continue + ']' -> break + else -> throw JsonFormatException(content) + } + } + return array + } +} + +//fun main() { +// println(Json.parse(" null ")) +// println(Json.parse(" true ")) +// println(Json.parse(" false ")) +// println(Json.parse(" 123 ")) +// println(Json.parse(" -123 ")) +// println(Json.parse(" 123.0 ")) +// println(Json.parse(" 123.0 ")) +// println(Json.parse(" 123e2 ")) +// println(Json.parse(" 123e10 ")) +// println(Json.parse(" -123.5e10 ")) +// println(Json.parse(" \"bb-12\\t3\\\".5e10aa\" ")) +// println(Json.parse(" {} ")) +// println(Json.parse(" {\"a\":3, \"c\": {}} ")) +// println(Json.parse("[1,3, 4 ,\"cc\\u0041\" , true , false , null , {}, {\"a\":\"b\"} , [ ] , [] , {\"a\":3, \"c\": {}, \"b\":[]}]")) +// println(Json.parse(" [1,3,4] ")) +//} \ No newline at end of file diff --git a/utils/math/build.gradle b/utils/math/build.gradle new file mode 100644 index 0000000..7d82dc7 --- /dev/null +++ b/utils/math/build.gradle @@ -0,0 +1,2 @@ +dependencies { +} diff --git a/utils/math/src/main/kotlin/cn/tursom/math/Complex.kt b/utils/math/src/main/kotlin/cn/tursom/math/Complex.kt new file mode 100644 index 0000000..481b2e0 --- /dev/null +++ b/utils/math/src/main/kotlin/cn/tursom/math/Complex.kt @@ -0,0 +1,270 @@ +package cn.tursom.math + +import java.util.* +import kotlin.math.cos +import kotlin.math.sin + + +data class Complex( + var r: Double = 0.0, + var i: Double = 0.0, +) { + constructor(r: Int, i: Int) : this(r.toDouble(), i.toDouble()) + + operator fun plus(complex: Complex) = Complex(r + complex.r, i + complex.i) + operator fun minus(complex: Complex) = Complex(r - complex.r, i - complex.i) + operator fun times(complex: Complex) = Complex(r * complex.r, i * complex.i) + operator fun plusAssign(complex: Complex) { + r += complex.r + i += complex.i + } + + operator fun timesAssign(complex: Complex) { + r *= complex.r + i *= complex.i + } + + // return abs/modulus/magnitude + fun abs(): Double { + return Math.hypot(r, i) + } + + // return angle/phase/argument, normalized to be between -pi and pi + fun phase(): Double { + return Math.atan2(i, r) + } + + // return a new object whose value is (this * alpha) + fun scale(alpha: Double): Complex { + return Complex(alpha * r, alpha * i) + } + + fun conjugate(): Complex { + return Complex(r, -i) + } + + fun reciprocal(): Complex { + val scale = r * r + i * i + return Complex(r / scale, -i / scale) + } + + // return the real or imaginary part + fun re(): Double { + return r + } + + fun im(): Double { + return r + } + + // return a / b + fun divides(b: Complex): Complex { + val a = this + return a.times(b.reciprocal()) + } + + // return a new Complex object whose value is the complex exponential of this + fun exp(): Complex { + return Complex(Math.exp(r) * Math.cos(i), Math.exp(r) * Math.sin(i)) + } + + // return a new Complex object whose value is the complex sine of this + fun sin(): Complex { + return Complex(Math.sin(r) * Math.cosh(i), Math.cos(r) * Math.sinh(i)) + } + + // return a new Complex object whose value is the complex cosine of this + fun cos(): Complex { + return Complex(Math.cos(r) * Math.cosh(i), -Math.sin(r) * Math.sinh(i)) + } + + // return a new Complex object whose value is the complex tangent of this + fun tan(): Complex { + return sin().divides(cos()) + } + + override fun toString(): String { + return "($r,$i)" + } +} + +object FFT { + fun fft(x: Array): Array { + val n = x.size + if (n == 1) return arrayOf(x[0]) + require(n % 2 == 0) { "n is not a power of 2" } + val even = Array(n / 2) { k -> + x[2 * k] + } + val evenFFT = fft(even) + for (k in 0 until n / 2) { + even[k] = x[2 * k + 1] + } + val oddFFT = fft(even) + val y = arrayOfNulls(n) + for (k in 0 until n / 2) { + val kth = -2 * k * Math.PI / n + val wk = Complex(cos(kth), sin(kth)) + y[k] = evenFFT[k].plus(wk.times(oddFFT[k])) + y[k + n / 2] = evenFFT[k].minus(wk.times(oddFFT[k])) + } + @Suppress("UNCHECKED_CAST") + return y as Array + } + + // compute the inverse FFT of x[], assuming its length n is a power of 2 + fun ifft(x: Array): Array { + val n = x.size + var y = Array(n) { i -> + x[i].conjugate() + } + + // compute forward FFT + y = fft(y as Array) + + // take conjugate again + for (i in 0 until n) { + y[i] = y[i].conjugate() + } + + // divide by n + for (i in 0 until n) { + y[i] = y[i].scale(1.0 / n) + } + return y + } + + fun cconvolve(x: Array, y: Array): Array { + require(x.size == y.size) { "Dimensions don't agree" } + val n = x.size + + val a = fft(x) + val b = fft(y) + + val c = Array(n) { i -> + a[i].times(b[i]) + } + return ifft(c) + } + + fun convolve(x: Array, y: Array): Array { + val ZERO = Complex(0, 0) + val a = Array(2 * x.size) { i -> + if (i in x.indices) { + x[i] + } else { + ZERO + } + } + val b = Array(2 * y.size) { i -> + if (i in y.indices) { + y[i] + } else { + ZERO + } + } + return cconvolve(a, b) + } + + // compute the DFT of x[] via brute force (n^2 time) + fun dft(x: Array): Array { + val n = x.size + val ZERO = Complex(0, 0) + val y = Array(n) { k -> + val data = ZERO + for (j in 0 until n) { + val power = k * j % n + val kth = -2 * power * Math.PI / n + val wkj = Complex(cos(kth), sin(kth)) + data.plusAssign(x[j] * wkj) + } + data + } + return y + } + + // display an array of Complex numbers to standard output + fun show(x: Array, title: String?) { + println(title) + println("-------------------") + for (i in x.indices) { + println(x[i]) + } + println() + } + + /*************************************************************************** + * Test client and sample execution + * + * % java FFT 4 + * x + * ------------------- + * -0.03480425839330703 + * 0.07910192950176387 + * 0.7233322451735928 + * 0.1659819820667019 + * + * y = fft(x) + * ------------------- + * 0.9336118983487516 + * -0.7581365035668999 + 0.08688005256493803i + * 0.44344407521182005 + * -0.7581365035668999 - 0.08688005256493803i + * + * z = ifft(y) + * ------------------- + * -0.03480425839330703 + * 0.07910192950176387 + 2.6599344570851287E-18i + * 0.7233322451735928 + * 0.1659819820667019 - 2.6599344570851287E-18i + * + * c = cconvolve(x, x) + * ------------------- + * 0.5506798633981853 + * 0.23461407150576394 - 4.033186818023279E-18i + * -0.016542951108772352 + * 0.10288019294318276 + 4.033186818023279E-18i + * + * d = convolve(x, x) + * ------------------- + * 0.001211336402308083 - 3.122502256758253E-17i + * -0.005506167987577068 - 5.058885073636224E-17i + * -0.044092969479563274 + 2.1934338938072244E-18i + * 0.10288019294318276 - 3.6147323062478115E-17i + * 0.5494685269958772 + 3.122502256758253E-17i + * 0.240120239493341 + 4.655566391833896E-17i + * 0.02755001837079092 - 2.1934338938072244E-18i + * 4.01805098805014E-17i + * + */ + @JvmStatic + fun main(args: Array) { + //val n = args[0].toInt() + val n = 8 + val x = Array(n) { i -> + Complex(sin(i.toDouble()), 0.0) + } + + show(x, "x") + + // FFT of original data + val y = fft(x) + show(y, "y = fft(x)") + + // FFT of original data + val y2 = dft(x) + show(y2, "y2 = dft(x)") + + // take inverse FFT + val z = ifft(y) + show(z, "z = ifft(y)") + + // circular convolution of x with itself + val c = cconvolve(x, x) + show(c, "c = cconvolve(x, x)") + + // linear convolution of x with itself + val d = convolve(x, x) + show(d, "d = convolve(x, x)") + } +} \ No newline at end of file diff --git a/utils/math/src/main/kotlin/cn/tursom/math/FFT.kt b/utils/math/src/main/kotlin/cn/tursom/math/FFT.kt new file mode 100644 index 0000000..005660d --- /dev/null +++ b/utils/math/src/main/kotlin/cn/tursom/math/FFT.kt @@ -0,0 +1,33 @@ +package cn.tursom.math + +import kotlin.math.PI +import kotlin.math.cos +import kotlin.math.sin + +fun fft1(a: Array): Array { + if (a.size == 1) return a + val a0 = Array(a.size shr 1) { + a[it shl 1] + } + val a1 = Array(a.size shr 1) { + a[(it shl 1) + 1] + } + fft1(a0) + fft1(a1) + val wn = Complex(cos(2 * PI / a.size), sin(2 * PI / a.size)) + val w = Complex(1.0, 0.0) + repeat(a.size shr 1) { k -> + a[k] = a0[k] + w * a1[k] + a[k + (a.size shr 1)] = a0[k] - w * a1[k] + w.plusAssign(wn) + } + return a +} + +fun main() { + val source = Array(8) { + Complex(sin(it.toDouble())) + } + println(source.asList()) + println(fft1(source).asList()) +} \ No newline at end of file diff --git a/utils/src/main/kotlin/cn/tursom/utils/WebSocketFrameWrapper.kt b/utils/src/main/kotlin/cn/tursom/utils/WebSocketFrameWrapper.kt new file mode 100644 index 0000000..4ace9e3 --- /dev/null +++ b/utils/src/main/kotlin/cn/tursom/utils/WebSocketFrameWrapper.kt @@ -0,0 +1,22 @@ +package cn.tursom.utils + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelOutboundHandlerAdapter +import io.netty.channel.ChannelPromise +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame + +@ChannelHandler.Sharable +object WebSocketFrameWrapper : ChannelOutboundHandlerAdapter() { + override fun write(ctx: ChannelHandlerContext, msg: Any?, promise: ChannelPromise?) { + ctx.write(when (msg) { + is String -> TextWebSocketFrame(msg) + is ByteArray -> BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)) + is ByteBuf -> BinaryWebSocketFrame(msg) + else -> msg + }, promise) + } +} \ No newline at end of file diff --git a/utils/ws-client/build.gradle b/utils/ws-client/build.gradle index 4fadabb..107fa9e 100644 --- a/utils/ws-client/build.gradle +++ b/utils/ws-client/build.gradle @@ -1,6 +1,6 @@ dependencies { compile project(":") api project(":log") - api project(":utils") + implementation project(":utils") compile group: "io.netty", name: "netty-all", version: "4.1.43.Final" } \ No newline at end of file diff --git a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt index d7fcbe8..b6a4c68 100644 --- a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt +++ b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClient.kt @@ -1,6 +1,7 @@ package cn.tursom.ws import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.utils.WebSocketFrameWrapper import cn.tursom.utils.bytebuffer.NettyByteBuffer import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf @@ -17,13 +18,24 @@ import io.netty.handler.codec.http.HttpClientCodec import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.websocketx.* import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler +import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.util.InsecureTrustManagerFactory import java.net.URI -class WebSocketClient(uri: String, val handler: WebSocketHandler) { - private val uri: URI = URI.create(uri) +@Suppress("unused") +class WebSocketClient( + url: String, + val handler: WebSocketHandler, + val autoWrap: Boolean = true, + val log: Boolean = false, + val compressed: Boolean = true, + val maxContextLength: Int = 4096, + private val headers: Map? = null, + private val handshakerUri: URI? = null, +) { + private val uri: URI = URI.create(url) internal var ch: Channel? = null fun open() { @@ -53,35 +65,52 @@ class WebSocketClient(uri: String, val handler: WebSocketHandler) { } else { null } - - val handler = WebSocketClientChannelHandler( - WebSocketClientHandshakerFactory.newHandshaker( - uri, WebSocketVersion.V13, null, false, DefaultHttpHeaders() - ), this, handler - ) - val b = Bootstrap() - b.group(group) + val httpHeaders = DefaultHttpHeaders() + headers?.forEach { (k, v) -> + httpHeaders[k] = v + } + val handshakerAdapter = WebSocketClientHandshakerAdapter(WebSocketClientHandshakerFactory.newHandshaker( + handshakerUri ?: uri, WebSocketVersion.V13, null, true, httpHeaders + ), this, handler) + val handler = WebSocketClientChannelHandler(this, handler) + val bootstrap = Bootstrap() + bootstrap.group(group) .channel(NioSocketChannel::class.java) .handler(object : ChannelInitializer() { override fun initChannel(ch: SocketChannel) { - val p = ch.pipeline() - if (sslCtx != null) { - p.addLast(sslCtx.newHandler(ch.alloc(), host, port)) + ch.pipeline().apply { + if (log) { + addLast(LoggingHandler()) + } + if (sslCtx != null) { + addLast(sslCtx.newHandler(ch.alloc(), host, port)) + } + addLast(HttpClientCodec()) + addLast(HttpObjectAggregator(maxContextLength)) + if (compressed) { + addLast(WebSocketClientCompressionHandler.INSTANCE) + } + addLast(handshakerAdapter) + //if (log) { + // addLast(LoggingHandler()) + //} + addLast(handler) + if (autoWrap) { + addLast(WebSocketFrameWrapper) + } } - p.addLast( - HttpClientCodec(), - HttpObjectAggregator(8192), - WebSocketClientCompressionHandler.INSTANCE, - handler - ) } }) - b.connect(uri.host, port) + bootstrap.connect(uri.host, port) //handler.handshakeFuture().sync() } - fun close() { - ch?.writeAndFlush(CloseWebSocketFrame()) + fun close(reasonText: String? = null) { + if (reasonText == null) { + ch?.writeAndFlush(CloseWebSocketFrame()) + } else { + ch?.writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus.NORMAL_CLOSURE, reasonText)) + } ch?.closeFuture()?.sync() } @@ -123,6 +152,44 @@ class WebSocketClient(uri: String, val handler: WebSocketHandler) { return ch!!.writeAndFlush(TextWebSocketFrame(data)) } + fun ping(data: ByteArray): ChannelFuture { + return ch!!.writeAndFlush(PingWebSocketFrame(Unpooled.wrappedBuffer(data))) + } + + fun ping(data: ByteBuffer): ChannelFuture { + return ch!!.writeAndFlush( + PingWebSocketFrame( + when (data) { + is NettyByteBuffer -> data.byteBuf + else -> Unpooled.wrappedBuffer(data.getBytes()) + } + ) + ) + } + + fun ping(data: ByteBuf): ChannelFuture { + return ch!!.writeAndFlush(PingWebSocketFrame(data)) + } + + fun pong(data: ByteArray): ChannelFuture { + return ch!!.writeAndFlush(PongWebSocketFrame(Unpooled.wrappedBuffer(data))) + } + + fun pong(data: ByteBuffer): ChannelFuture { + return ch!!.writeAndFlush( + PongWebSocketFrame( + when (data) { + is NettyByteBuffer -> data.byteBuf + else -> Unpooled.wrappedBuffer(data.getBytes()) + } + ) + ) + } + + fun pong(data: ByteBuf): ChannelFuture { + return ch!!.writeAndFlush(PongWebSocketFrame(data)) + } + companion object { private val group: EventLoopGroup = NioEventLoopGroup() } diff --git a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt index c4e1fe2..dae7df2 100644 --- a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt +++ b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientChannelHandler.kt @@ -5,32 +5,14 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelPromise import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.http.FullHttpResponse -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame -import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker +import io.netty.handler.codec.http.websocketx.* import io.netty.util.CharsetUtil class WebSocketClientChannelHandler( - private val handshaker: WebSocketClientHandshaker, val client: WebSocketClient, - val handler: WebSocketHandler -) : SimpleChannelInboundHandler() { - private var handshakeFuture: ChannelPromise? = null - - fun handshakeFuture(): ChannelFuture? { - return handshakeFuture - } - - override fun handlerAdded(ctx: ChannelHandlerContext) { - handshakeFuture = ctx.newPromise() - } - - override fun channelActive(ctx: ChannelHandlerContext) { - client.ch = ctx.channel() - handshaker.handshake(ctx.channel()) - } + val handler: WebSocketHandler, +) : SimpleChannelInboundHandler() { override fun channelInactive(ctx: ChannelHandlerContext) { handler.onClose(client) @@ -39,34 +21,14 @@ class WebSocketClientChannelHandler( } } - override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { + override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) { val ch = ctx.channel() - if (!handshaker.isHandshakeComplete) { - // web socket client connected - handshaker.finishHandshake(ch, msg as FullHttpResponse) - handshakeFuture!!.setSuccess() - handler.onOpen(client) - return - } - if (msg is FullHttpResponse) { - throw Exception("Unexpected FullHttpResponse (getStatus=${msg.status()}, content=${msg.content().toString(CharsetUtil.UTF_8)})") - } when (msg) { is TextWebSocketFrame -> handler.readMessage(client, msg) is BinaryWebSocketFrame -> handler.readMessage(client, msg) + is PingWebSocketFrame -> handler.readPing(client, msg) + is PongWebSocketFrame -> handler.readPong(client, msg) is CloseWebSocketFrame -> ch.close() } } - - override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - try { - handler.onError(client, cause) - } catch (e: Exception) { - e.printStackTrace() - if (!handshakeFuture!!.isDone) { - handshakeFuture!!.setFailure(cause) - } - ctx.close() - } - } } \ No newline at end of file diff --git a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientHandshakerAdapter.kt b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientHandshakerAdapter.kt new file mode 100644 index 0000000..acc560a --- /dev/null +++ b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketClientHandshakerAdapter.kt @@ -0,0 +1,52 @@ +package cn.tursom.ws + +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPromise +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.codec.http.FullHttpResponse +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker +import io.netty.util.CharsetUtil + +class WebSocketClientHandshakerAdapter( + private val handshaker: WebSocketClientHandshaker, + private val client: WebSocketClient, + private val handler: WebSocketHandler, +) : SimpleChannelInboundHandler() { + private var handshakeFuture: ChannelPromise? = null + + override fun handlerAdded(ctx: ChannelHandlerContext) { + handshakeFuture = ctx.newPromise() + } + + override fun channelActive(ctx: ChannelHandlerContext) { + client.ch = ctx.channel() + handshaker.handshake(ctx.channel()) + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpResponse) { + if (!handshaker.isHandshakeComplete) { + handshaker.finishHandshake(ctx.channel(), msg) + handshakeFuture!!.setSuccess() + msg.retain() + ctx.fireChannelRead(msg) + handler.onOpen(client) + return + } else { + throw Exception("Unexpected FullHttpResponse (getStatus=${msg.status()}, content=${ + msg.content().toString(CharsetUtil.UTF_8) + })") + } + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + try { + handler.onError(client, cause) + } catch (e: Exception) { + e.printStackTrace() + if (!handshakeFuture!!.isDone) { + handshakeFuture!!.setFailure(cause) + } + ctx.close() + } + } +} \ No newline at end of file diff --git a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketHandler.kt b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketHandler.kt index 7bb4e54..342052b 100644 --- a/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketHandler.kt +++ b/utils/ws-client/src/main/kotlin/cn/tursom/ws/WebSocketHandler.kt @@ -1,9 +1,12 @@ package cn.tursom.ws import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.toUTF8String import cn.tursom.utils.bytebuffer.NettyByteBuffer import io.netty.buffer.ByteBuf import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame import io.netty.handler.codec.http.websocketx.TextWebSocketFrame interface WebSocketHandler { @@ -31,4 +34,42 @@ interface WebSocketHandler { fun readMessage(client: WebSocketClient, msg: BinaryWebSocketFrame) { readMessage(client, msg.content()) } + + fun readPing(client: WebSocketClient, msg: PingWebSocketFrame) { + readPing(client, msg.content()) + } + + fun readPing(client: WebSocketClient, msg: ByteBuf) { + readPing(client, NettyByteBuffer(msg)) + } + + fun readPing(client: WebSocketClient, msg: ByteBuffer) { + readPing(client, msg.getBytes()) + } + + fun readPing(client: WebSocketClient, msg: ByteArray) { + readPing(client, msg.toUTF8String()) + } + + fun readPing(client: WebSocketClient, msg: String) { + } + + fun readPong(client: WebSocketClient, msg: PongWebSocketFrame) { + readPong(client, msg.content()) + } + + fun readPong(client: WebSocketClient, msg: ByteBuf) { + readPong(client, NettyByteBuffer(msg)) + } + + fun readPong(client: WebSocketClient, msg: ByteBuffer) { + readPong(client, msg.getBytes()) + } + + fun readPong(client: WebSocketClient, msg: ByteArray) { + readPong(client, msg.toUTF8String()) + } + + fun readPong(client: WebSocketClient, msg: String) { + } } \ No newline at end of file diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt index 803d445..c6535f3 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/NettyHttpServer.kt @@ -1,5 +1,6 @@ package cn.tursom.web.netty +import cn.tursom.utils.WebSocketFrameWrapper import cn.tursom.web.HttpHandler import cn.tursom.web.HttpServer import cn.tursom.web.WebSocketHandler @@ -29,8 +30,9 @@ class NettyHttpServer( var webSocketPath: Iterable>> = listOf(), var readTimeout: Int? = 60, var writeTimeout: Int? = null, - decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART, - backlog: Int = 1024 + decodeType: NettyHttpDecodeType = if (webSocketPath.iterator().hasNext()) NettyHttpDecodeType.FULL_HTTP else NettyHttpDecodeType.MULTI_PART, + backlog: Int = 1024, + val wrapWebSocketFrame: Boolean = false, ) : HttpServer { constructor( port: Int, @@ -39,14 +41,16 @@ class NettyHttpServer( webSocketPath: Iterable>> = listOf(), readTimeout: Int? = 60, writeTimeout: Int? = null, - decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART, - handler: (content: NettyHttpContent) -> Unit + decodeType: NettyHttpDecodeType = if (webSocketPath.iterator().hasNext()) NettyHttpDecodeType.FULL_HTTP else NettyHttpDecodeType.MULTI_PART, + backlog: Int = 1024, + wrapWebSocketFrame: Boolean = false, + handler: (content: NettyHttpContent) -> Unit, ) : this( port, object : HttpHandler { override fun handle(content: NettyHttpContent) = handler(content) }, - bodySize, autoRun, webSocketPath, readTimeout, writeTimeout, decodeType + bodySize, autoRun, webSocketPath, readTimeout, writeTimeout, decodeType, backlog, wrapWebSocketFrame ) var decodeType: NettyHttpDecodeType = decodeType @@ -84,6 +88,9 @@ class NettyHttpServer( pipeline.addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath)) pipeline.addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler)) } + if (wrapWebSocketFrame && webSocketPath.iterator().hasNext()) { + pipeline.addLast(WebSocketFrameWrapper) + } pipeline.addLast("handle", httpHandler) } }) diff --git a/web/src/main/kotlin/cn/tursom/web/ResponseHeaderAdapter.kt b/web/src/main/kotlin/cn/tursom/web/ResponseHeaderAdapter.kt index 250f7ac..71ad772 100644 --- a/web/src/main/kotlin/cn/tursom/web/ResponseHeaderAdapter.kt +++ b/web/src/main/kotlin/cn/tursom/web/ResponseHeaderAdapter.kt @@ -19,12 +19,13 @@ interface ResponseHeaderAdapter { mustRevalidate: Boolean = false ) = setResponseHeader( "Cache-Control", "$cacheControl${ - if (maxAge != null && maxAge > 0) ", max-age=$maxAge" else ""}${ - if (mustRevalidate) ", must-revalidate" else "" + if (maxAge != null && maxAge > 0) ", max-age=$maxAge" else "" + }${ + if (mustRevalidate) ", must-revalidate" else "" }" ) - fun addCookie(cookie: Cookie) = addCookie(cookie.name, cookie.value, cookie.maxAge, cookie.domain, cookie.path, cookie.sameSite) + fun addCookie(cookie: Cookie) = addResponseHeader("Set-Cookie", cookie) fun addCookie( name: String, value: Any, @@ -32,15 +33,7 @@ interface ResponseHeaderAdapter { domain: String? = null, path: String? = null, sameSite: SameSite? = null - ) = addResponseHeader( - "Set-Cookie", - "$name=$value${ - if (maxAge > 0) "; Max-Age=$maxAge" else ""}${ - if (domain != null) "; Domain=$domain" else ""}${ - if (path != null) "; Path=$path" else ""}${ - if (sameSite != null) ": SameSite=$sameSite" else "" - }" - ) + ) = addCookie(Cookie(name, value.toString(), maxAge = maxAge, domain = domain, path = path, sameSite = sameSite)) fun setLanguage(language: String) { setResponseHeader("Content-Language", language) diff --git a/web/web-coroutine/build.gradle b/web/web-coroutine/build.gradle index a2ba88d..b7b102a 100644 --- a/web/web-coroutine/build.gradle +++ b/web/web-coroutine/build.gradle @@ -2,6 +2,6 @@ dependencies { implementation project(":web") api project(":json") api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.29' - compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1' + compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' compile group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlinVersion } \ No newline at end of file