Merge remote-tracking branch 'origin/master' into master

# Conflicts:
#	AsyncSocket/build.gradle
#	AsyncSocket/src/test/kotlin/cn/tursom/datagram/server/test.kt
#	build.gradle
This commit is contained in:
tursom 2021-02-28 17:45:15 +08:00
commit a1e13580f4
62 changed files with 1455 additions and 315 deletions

View File

@ -3,7 +3,6 @@ dependencies {
compile project(":log")
// kotlin
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
compile project(":log")
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
testRuntime group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-debug', version: '1.3.9'
}

View File

@ -22,29 +22,6 @@ interface AsyncNioChannel : AsyncChannel {
val nioThread: NioThread
val channel: SelectableChannel get() = key.channel()
private inline fun <T> operate(action: () -> T): T {
return try {
action()
} catch (e: Exception) {
waitMode()
throw e
}
}
suspend fun <T> write(timeout: Long, action: () -> T): T {
return operate {
waitWrite(timeout)
action()
}
}
suspend fun <T> read(timeout: Long, action: () -> T): T {
return operate {
waitRead(timeout)
action()
}
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int = write(arrayOf(buffer), timeout).toInt()
@ -56,7 +33,7 @@ interface AsyncNioChannel : AsyncChannel {
file: FileChannel,
position: Long,
count: Long,
timeout: Long
timeout: Long,
): Long = write(timeout) {
file.transferTo(position, count, channel as WritableByteChannel)
}
@ -65,7 +42,7 @@ interface AsyncNioChannel : AsyncChannel {
file: FileChannel,
position: Long,
count: Long,
timeout: Long
timeout: Long,
): Long = read(timeout) {
file.transferFrom(channel as ReadableByteChannel, position, count)
}
@ -177,3 +154,26 @@ interface AsyncNioChannel : AsyncChannel {
val timer: Timer = WheelTimer.timer
}
}
inline fun <T> AsyncNioChannel.operate(action: () -> T): T {
return try {
action()
} catch (e: Exception) {
waitMode()
throw e
}
}
suspend fun <T> AsyncNioChannel.write(timeout: Long, action: () -> T): T {
return operate {
waitWrite(timeout)
action()
}
}
suspend fun <T> AsyncNioChannel.read(timeout: Long, action: () -> T): T {
return operate {
waitRead(timeout)
action()
}
}

View File

@ -1,6 +1,8 @@
package cn.tursom.datagram
import cn.tursom.channel.AsyncNioChannel
import cn.tursom.channel.read
import cn.tursom.channel.write
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
@ -12,8 +14,7 @@ interface AsyncDatagram : AsyncNioChannel {
override val channel: DatagramChannel
override fun getBuffed(pool: MemoryPool): BufferedAsyncDatagram = BufferedNioDatagram(pool, this)
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long =
write(timeout) { channel.write(buffer) }
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long = channel.write(buffer)
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long =
read(timeout) { channel.read(buffer) }

View File

@ -8,16 +8,17 @@ import java.nio.channels.SelectionKey
open class NioDatagram(
override val channel: DatagramChannel,
override val key: SelectionKey,
override val nioThread: NioThread
override val nioThread: NioThread,
) : AsyncDatagram {
companion object {
suspend operator fun invoke(host: String, port: Int) = AsyncDatagramClient.connect(host, port)
suspend operator fun invoke(address: SocketAddress) = AsyncDatagramClient.connect(address)
}
override val open: Boolean get() = channel.isOpen && key.isValid
override val remoteAddress: SocketAddress get() = channel.remoteAddress
override fun writeMode() {}
override suspend fun <T> write(timeout: Long, action: () -> T): T {
return action()
}
override fun close() {
if (channel.isOpen || key.isValid) {
nioThread.execute {

View File

@ -2,6 +2,8 @@ package cn.tursom.socket
import cn.tursom.channel.AsyncNioChannel
import cn.tursom.channel.BufferedAsyncChannel
import cn.tursom.channel.read
import cn.tursom.channel.write
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write

View File

@ -2,6 +2,8 @@ package cn.tursom.socket
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode
import cn.tursom.channel.read
import cn.tursom.channel.write
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
@ -12,7 +14,18 @@ import java.nio.channels.SocketChannel
/**
* 异步协程套接字对象
*/
class NioSocket(override val key: SelectionKey, override val nioThread: NioThread) : AsyncSocket {
class NioSocket internal constructor(
override val key: SelectionKey,
override val nioThread: NioThread,
) : AsyncSocket {
companion object {
suspend operator fun invoke(
host: String,
port: Int,
timeout: Long = 0,
) = NioClient.connect(host, port, timeout)
}
override val channel: SocketChannel = key.channel() as SocketChannel
override val open: Boolean get() = channel.isOpen && key.isValid

View File

@ -13,7 +13,7 @@ import kotlinx.coroutines.GlobalScope
* 带内存池的 NIO 套接字服务器
* 在处理结束后会自动释放由内存池分配的内存
*/
open class BuffedNioServer(
open class BufferedNioServer(
port: Int,
val memoryPool: MemoryPool,
backlog: Int = 50,

View File

@ -0,0 +1,34 @@
package cn.tursom.socket
import cn.tursom.core.pool.HeapMemoryPool
import cn.tursom.socket.server.BufferedNioServer
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
private val sockets = ConcurrentHashMap<AsyncSocket, Unit>().keySet(Unit)
val handler: suspend BufferedAsyncSocket.() -> Unit = {
sockets.add(this)
try {
while (open) {
val read = read(60 * 1000)
println(read.toString(read.readable))
sockets.forEach {
System.err.println(it)
it.write(read)
}
}
} finally {
sockets.remove(this)
}
}
fun main() {
val server = BufferedNioServer(12345, handler = handler)
server.run()
runBlocking {
BufferedNioSocket(NioSocket("localhost", 12345), server.memoryPool).use { socket ->
socket.write("hello")
println(socket.read().getString())
}
}
}

View File

@ -3,5 +3,5 @@ dependencies {
implementation project(":AsyncSocket")
// kotlin
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
}

View File

@ -18,3 +18,5 @@ include 'utils:delegation'
include 'utils:observer'
include 'utils:TrafficForward'
include 'utils:performance-test'
include 'utils:math'
include 'utils:json'

View File

@ -3,5 +3,5 @@ dependencies {
compile project(":socket")
// kotlin
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
}

View File

@ -3,6 +3,7 @@ package cn.tursom.socket.server
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.pool.invoke
import cn.tursom.socket.AsyncNioSocket
/**
@ -14,7 +15,7 @@ class BuffedAsyncNioServer(
port: Int,
memoryPool: MemoryPool,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit,
) : IAsyncNioServer by AsyncNioServer(port, backlog, {
memoryPool {
handler(it)
@ -25,6 +26,6 @@ class BuffedAsyncNioServer(
blockSize: Int = 1024,
blockCount: Int = 128,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit,
) : this(port, DirectMemoryPool(blockSize, blockCount), backlog, handler)
}

View File

@ -2,6 +2,7 @@ package cn.tursom.core
import java.util.concurrent.TimeUnit
import java.util.concurrent.ScheduledThreadPoolExecutor
import kotlin.concurrent.thread
object CurrentTimeMillisClock {
@ -11,11 +12,12 @@ object CurrentTimeMillisClock {
val now get() = tick
init {
ScheduledThreadPoolExecutor(1) { runnable ->
val thread = Thread(runnable, "current-time-millis")
thread.isDaemon = true
thread
}.scheduleAtFixedRate({ tick = System.currentTimeMillis() }, 1, 1, TimeUnit.MILLISECONDS)
thread(name = "current-time-millis", isDaemon = true) {
while (true) {
tick = System.currentTimeMillis()
Thread.sleep(1)
}
}
}
//val now get() = System.currentTimeMillis()

View File

@ -0,0 +1,34 @@
package cn.tursom.core
import java.lang.ref.PhantomReference
import java.lang.ref.Reference
import java.lang.ref.ReferenceQueue
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread
object Finalized {
private val referenceQueue = ReferenceQueue<Any?>()
private val handlerMap = ConcurrentHashMap<Reference<*>, () -> Unit>()
init {
thread(isDaemon = true) {
while (true) {
val action = handlerMap.remove(referenceQueue.remove() ?: return@thread) ?: continue
try {
action()
} catch (e: Exception) {
}
}
}
}
fun <T> listen(obj: T, action: () -> Unit): Reference<T> = PhantomReference(obj, referenceQueue).also {
handlerMap[it] = action
}
fun remove(reference: Reference<*>) {
handlerMap.remove(reference)
}
}
fun <T> T.finalized(action: () -> Unit) = Finalized.listen(this, action)

View File

@ -6,15 +6,23 @@ import java.lang.reflect.Field
import java.lang.reflect.Member
import java.lang.reflect.Modifier
val fieldModifiers: Field = Field::class.java.getDeclaredField("modifiers").apply {
private val fieldModifiersField: Field? = try {
Field::class.java.getDeclaredField("modifiers").apply {
isAccessible = true
}
} catch (e: Throwable) {
null
}
var fieldModifiers: (Field, Int) -> Unit = { field, modifer ->
fieldModifiersField!!.set(field, modifer)
}
var Field.public: Boolean
get() = Modifier.isPublic(this.modifiers)
set(value) {
val modifier = Modifier.PUBLIC
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -28,7 +36,7 @@ var Field.private: Boolean
get() = Modifier.isPrivate(this.modifiers)
set(value) {
val modifier = Modifier.PRIVATE
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -42,7 +50,7 @@ var Field.protected: Boolean
get() = Modifier.isProtected(this.modifiers)
set(value) {
val modifier = Modifier.PROTECTED
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -56,7 +64,7 @@ var Field.static: Boolean
get() = Modifier.isStatic(this.modifiers)
set(value) {
val modifier = Modifier.STATIC
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -70,7 +78,7 @@ var Field.final: Boolean
get() = Modifier.isFinal(this.modifiers)
set(value) {
val modifier = Modifier.FINAL
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -84,7 +92,7 @@ var Field.synchronized: Boolean
get() = Modifier.isSynchronized(this.modifiers)
set(value) {
val modifier = Modifier.SYNCHRONIZED
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -98,7 +106,7 @@ var Field.volatile: Boolean
get() = Modifier.isVolatile(this.modifiers)
set(value) {
val modifier = Modifier.VOLATILE
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -112,7 +120,7 @@ var Field.transient: Boolean
get() = Modifier.isTransient(this.modifiers)
set(value) {
val modifier = Modifier.TRANSIENT
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -126,7 +134,7 @@ var Field.native: Boolean
get() = Modifier.isNative(this.modifiers)
set(value) {
val modifier = Modifier.NATIVE
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -140,7 +148,7 @@ var Field.`interface`: Boolean
get() = Modifier.isInterface(this.modifiers)
set(value) {
val modifier = Modifier.INTERFACE
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -154,7 +162,7 @@ var Field.abstract: Boolean
get() = Modifier.isAbstract(this.modifiers)
set(value) {
val modifier = Modifier.ABSTRACT
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier
@ -168,7 +176,7 @@ var Field.strict: Boolean
get() = Modifier.isStrict(this.modifiers)
set(value) {
val modifier = Modifier.STRICT
fieldModifiers.set(
fieldModifiers(
this,
if (value) {
modifiers or modifier

View File

@ -99,6 +99,7 @@ interface ByteBuffer : Closeable {
fun toString(size: Int): String {
val bytes = getBytes(size)
// 将测试的字节返还回来
readPosition -= bytes.size
return String(bytes)
}

View File

@ -0,0 +1,9 @@
package cn.tursom.core.buffer
class ClosedBufferException : Exception {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(message, cause, enableSuppression, writableStackTrace)
}

View File

@ -1,6 +1,4 @@
package cn.tursom.buffer
import cn.tursom.core.buffer.ByteBuffer
package cn.tursom.core.buffer
interface ProxyByteBuffer : ByteBuffer {
val agent: ByteBuffer

View File

@ -0,0 +1,63 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.buffer.ProxyByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
open class CloseSafeByteBuffer(
override val agent: ByteBuffer,
) : ByteBuffer by agent, ProxyByteBuffer {
/**
* 这个变量保证 buffer 不会被释放多次
*/
private val open = AtomicBoolean(true)
override val closed: Boolean
get() = !open.get()
fun tryClose() = open.compareAndSet(true, false)
override fun readBuffer(): java.nio.ByteBuffer {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.readBuffer()
}
override fun writeBuffer(): java.nio.ByteBuffer {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.writeBuffer()
}
override val array: ByteArray
get() {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.array
}
override fun reset() {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.reset()
}
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.slice(position, size, readPosition, writePosition)
}
override fun resize(newSize: Int): Boolean {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.resize(newSize)
}
}

View File

@ -1,7 +1,7 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.pool.MemoryPool
class InstantByteBuffer(

View File

@ -2,7 +2,7 @@ package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.MarkableByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
class MarkedByteBuffer(override val agent: ByteBuffer) : ProxyByteBuffer, MarkableByteBuffer, ByteBuffer by agent {
private var writeMark = 0

View File

@ -1,36 +1,47 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.pool.MemoryPool
import java.util.concurrent.atomic.AtomicBoolean
import java.lang.ref.PhantomReference
import java.lang.ref.Reference
import java.lang.ref.ReferenceQueue
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/**
* 在被垃圾回收时能保证释放占用的内存池内存
*/
class PooledByteBuffer(
override val agent: ByteBuffer,
agent: ByteBuffer,
val pool: MemoryPool,
val token: Int
) : ProxyByteBuffer, ByteBuffer by agent {
/**
* 这个变量保证 buffer 不会被释放多次
*/
private val open = AtomicBoolean(true)
val token: Int,
autoClose: Boolean = false,
) : ProxyByteBuffer, CloseSafeByteBuffer(agent) {
private val reference = if (autoClose) PhantomReference(this, allocatedReferenceQueue) else null
init {
if (reference != null) allocatedMap[reference] = pool to token
}
private val childCount = AtomicInteger(0)
override val resized get() = agent.resized
override val closed: Boolean get() = !open.get() && !resized
override fun close() {
if (tryClose()) {
if (childCount.get() == 0) {
if (open.compareAndSet(true, false)) {
if (reference != null) allocatedMap.remove(reference)
pool.free(this)
}
}
}
override fun resize(newSize: Int): Boolean {
if (closed) {
return false
}
val successful = agent.resize(newSize)
if (successful) {
close()
@ -39,14 +50,35 @@ class PooledByteBuffer(
}
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) {
throw ClosedBufferException("PooledByteBuffer has closed.")
}
return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition))
}
override fun toString(): String {
return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, open=$open)"
return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, closed=$closed)"
}
protected fun finalize() {
pool.free(this)
//protected fun finalize() {
// pool.free(this)
//}
companion object {
private val allocatedReferenceQueue = ReferenceQueue<PooledByteBuffer>()
private val allocatedMap = ConcurrentHashMap<Reference<PooledByteBuffer>, Pair<MemoryPool, Int>>()
init {
thread(isDaemon = true) {
while (true) {
val (pool, token) = allocatedMap.remove(allocatedReferenceQueue.remove() ?: return@thread) ?: continue
try {
pool.free(token)
} catch (e: Exception) {
}
}
}
}
}
}

View File

@ -1,7 +1,7 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

View File

@ -1,8 +1,12 @@
package cn.tursom.core.datastruct
import cn.tursom.core.randomInt
import cn.tursom.core.usingTime
import java.io.Serializable
import java.lang.reflect.Field
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLongArray
import kotlin.random.Random
class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : Serializable {
@Volatile
@ -25,6 +29,7 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
bitSet.array.forEach { count += it.bitCount }
return count
}
val upCount get() = trueCount
init {
val default = if (defaultState) -1L else 0L
@ -37,17 +42,19 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
return bitSet[(index shr 6).toInt()] and getArr[index.toInt() and 63] != 0L
}
fun up(index: Long): Boolean {
fun up(index: Long, fromDownToUp: Boolean = true): Boolean {
val arrayIndex = (index shr 6).toInt()
//bitSet[arrayIndex] = bitSet[arrayIndex] or getArr[index.toInt() and 63]
val expect = bitSet[arrayIndex]
var expect = bitSet[arrayIndex]
if (fromDownToUp) expect = expect and setArr[index.toInt() and 63]
return bitSet.compareAndSet(arrayIndex, expect, expect or getArr[index.toInt() and 63])
}
fun down(index: Long): Boolean {
fun down(index: Long, fromUpToDown: Boolean = true): Boolean {
val arrayIndex = (index shr 6).toInt()
//bitSet[arrayIndex] = bitSet[arrayIndex] and setArr[index.toInt() and 63]
val expect = bitSet[arrayIndex]
var expect = bitSet[arrayIndex]
if (fromUpToDown) expect = expect and getArr[index.toInt() and 63]
return bitSet.compareAndSet(arrayIndex, expect, expect and setArr[index.toInt() and 63])
}
@ -80,7 +87,33 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
}
fun firstUp(): Long {
bitSet.forEachIndexed { index, l ->
return scanUp(0, bitSet.length())
}
fun randomUpIndex(): Long {
val startIndex = Random.nextInt(bitSet.length())
var scan = scanUp(startIndex, bitSet.length() - startIndex)
if (scan >= 0) return scan
scan = scanUp(startIndex - 1, startIndex, false)
if (scan >= 0) return scan
return -1
}
fun firstDown(): Long {
return scanDown(0, bitSet.length())
}
fun getDownIndex(): Long {
val startIndex = Random.nextInt(bitSet.length())
var scan = scanDown(startIndex, bitSet.length() - startIndex)
if (scan >= 0) return scan
scan = scanDown(startIndex - 1, startIndex, false)
if (scan >= 0) return scan
return -1
}
private fun scanUp(fromIndex: Int, length: Int, asc: Boolean = true): Long {
bitSet.forEachIndexed(fromIndex, length, asc) { index, l ->
if (l != 0L) {
for (i in 0 until 8) {
if (l and scanArray[i] != 0L) {
@ -95,8 +128,8 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
return -1
}
fun firstDown(): Long {
bitSet.forEachIndexed { index, l ->
private fun scanDown(fromIndex: Int, length: Int, asc: Boolean = true): Long {
bitSet.forEachIndexed(fromIndex, length, asc) { index, l ->
if (l != -1L) {
for (i in 0 until 8) {
if (l.inv() and scanArray[i] != 0L) {
@ -177,10 +210,45 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
array.isAccessible = true
}
inline fun AtomicLongArray.forEachIndexed(action: (index: Int, Long) -> Unit) {
private inline fun AtomicLongArray.forEachIndexed(action: (index: Int, Long) -> Unit) {
repeat(length()) {
action(it, get(it))
}
}
private inline fun AtomicLongArray.forEachIndexed(startIndex: Int, length: Int = length(), asc: Boolean = true, action: (index: Int, Long) -> Unit) {
repeat(length) {
val index = if (asc) {
startIndex + it
} else {
startIndex - it
}
//scand.incrementAndGet()
action(index, get(index))
}
}
}
}
//val scand = AtomicInteger(0)
fun main() {
val size = 1000000
val bitSet = AtomicBitSet(size.toLong())
println(usingTime {
repeat(1000) {
bitSet.downAll()
repeat(size) {
val index = bitSet.getDownIndex()
bitSet.up(index)
repeat(randomInt(0, 3) / 2) {
val randomUpIndex = bitSet.randomUpIndex()
if (randomUpIndex >= 0) {
bitSet.down(randomUpIndex)
}
}
}
}
})
//println(scand.get() / 100)
}

View File

@ -1,14 +1,21 @@
package cn.tursom.core.encrypt
import cn.tursom.core.toHexString
import cn.tursom.core.toUTF8String
import java.security.*
import java.security.interfaces.RSAPublicKey
import java.security.spec.X509EncodedKeySpec
import javax.crypto.Cipher
import kotlin.experimental.xor
import kotlin.math.min
import kotlin.random.Random
@Suppress("unused", "MemberVisibilityCanBePrivate")
abstract class AbstractPublicKeyEncrypt(
val algorithm: String,
final override val publicKey: PublicKey,
final override val privateKey: PrivateKey? = null
final override val privateKey: PrivateKey? = null,
val modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : PublicKeyEncrypt {
val publicKeyEncoded get() = publicKey.encoded!!
val privateKeyEncoded get() = privateKey?.encoded
@ -39,15 +46,92 @@ abstract class AbstractPublicKeyEncrypt(
override fun signature(digest: String): String = this@AbstractPublicKeyEncrypt.signature(digest)
}
constructor(algorithm: String, keyPair: KeyPair) : this(algorithm, keyPair.public as PublicKey, keyPair.private as PrivateKey)
private val blockCipher: Encrypt = when (modeOfOperation) {
BlockCipherModeOfOperation.ECB -> ECBBlockCipher()
BlockCipherModeOfOperation.CBC -> CBCBlockCipher()
else -> TODO()
}
constructor(algorithm: String, keySize: Int = 1024) : this(algorithm, KeyPairGenerator.getInstance(algorithm).let {
override var encryptInitVector: ByteArray?
get() = blockCipher.encryptInitVector
set(value) {
blockCipher.encryptInitVector = value
}
override var decryptInitVector: ByteArray?
get() = blockCipher.decryptInitVector
set(value) {
blockCipher.decryptInitVector = value
}
constructor(
algorithm: String,
keyPair: KeyPair,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : this(algorithm, keyPair.public as PublicKey, keyPair.private as PrivateKey, modeOfOperation = modeOfOperation)
constructor(
algorithm: String,
keySize: Int = 1024,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : this(
algorithm,
KeyPairGenerator.getInstance(algorithm).let {
it.initialize(keySize)
it.generateKeyPair()
})
},
modeOfOperation = modeOfOperation
)
constructor(algorithm: String, publicKey: ByteArray) : this(algorithm, KeyFactory.getInstance(algorithm).generatePublic(X509EncodedKeySpec(publicKey)) as PublicKey)
constructor(
algorithm: String,
publicKey: ByteArray,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : this(
algorithm,
KeyFactory.getInstance(algorithm).generatePublic(X509EncodedKeySpec(publicKey)) as PublicKey,
modeOfOperation = modeOfOperation
)
override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray = blockCipher.encrypt(data, offset, size)
override fun decrypt(data: ByteArray, offset: Int, size: Int): ByteArray = blockCipher.decrypt(data, offset, size)
override fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int = blockCipher.encrypt(data, buffer, bufferOffset, offset, size)
override fun decrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int = blockCipher.decrypt(data, buffer, bufferOffset, offset, size)
protected open fun signature(digest: String) = "${digest}with$algorithm"
override fun sign(data: ByteArray, digest: String): ByteArray {
val signature: Signature = Signature.getInstance(signature(digest))
signature.initSign(privateKey)
signature.update(data)
return signature.sign()
}
override fun verify(data: ByteArray, sign: ByteArray, digest: String): Boolean {
val signature = Signature.getInstance(signature(digest))
signature.initVerify(publicKey)
signature.update(data)
return signature.verify(sign)
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as RSA
if (publicKey != other.publicKey) return false
if (privateKey != other.privateKey) return false
return true
}
override fun hashCode(): Int {
var result = publicKey.hashCode()
result = 31 * result + (privateKey?.hashCode() ?: 0)
return result
}
protected inner class ECBBlockCipher : Encrypt {
override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray {
return if (size < encryptMaxLen) {
encryptCipher.doFinal(data, offset, size)
@ -89,7 +173,7 @@ abstract class AbstractPublicKeyEncrypt(
buffer: ByteArray,
cipher: Cipher,
blockSize: Int,
bufferOffset: Int = 0
bufferOffset: Int = 0,
): Int {
var readPosition = offset
var writeIndex = bufferOffset
@ -100,38 +184,108 @@ abstract class AbstractPublicKeyEncrypt(
writeIndex += cipher.doFinal(data, readPosition, size - readPosition, buffer, writeIndex)
return writeIndex - bufferOffset
}
protected open fun signature(digest: String) = "${digest}with$algorithm"
override fun sign(data: ByteArray, digest: String): ByteArray {
val signature: Signature = Signature.getInstance(signature(digest))
signature.initSign(privateKey)
signature.update(data)
return signature.sign()
}
override fun verify(data: ByteArray, sign: ByteArray, digest: String): Boolean {
val signature = Signature.getInstance(signature(digest))
signature.initVerify(publicKey)
signature.update(data)
return signature.verify(sign)
protected inner class CBCBlockCipher : Encrypt {
override var encryptInitVector: ByteArray? = Random.nextBytes(encryptMaxLen)
set(value) {
value ?: return
field = value
encBuf = value
}
override var decryptInitVector: ByteArray? = null
set(value) {
field = value
decBuf = value
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
private var encBuf = encryptInitVector!!
private var decBuf: ByteArray? = decryptInitVector
other as RSA
if (publicKey != other.publicKey) return false
if (privateKey != other.privateKey) return false
return true
override fun encrypt(data: ByteArray, offset: Int, size: Int): ByteArray {
val buffer = ByteArray(((size - 1) / encryptMaxLen + 1) * decryptMaxLen)
//return buffer.copyOf(encrypt(data, buffer, 0, offset, size))
encrypt(data, buffer, 0, offset, size)
return buffer
}
override fun hashCode(): Int {
var result = publicKey.hashCode()
result = 31 * result + (privateKey?.hashCode() ?: 0)
return result
override fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int {
var end = offset
var start: Int
var writeIndex = bufferOffset
do {
start = end
end += encryptMaxLen
end = min(data.size, end)
(0 until end - start).forEach { index ->
encBuf[index] = encBuf[index] xor data[start + index]
}
writeIndex += encryptCipher.doFinal(encBuf, 0, encBuf.size, buffer, writeIndex)
//println("${data.size} $start->$end $writeIndex")
} while (end < offset + size)
return writeIndex - bufferOffset
}
override fun decrypt(data: ByteArray, offset: Int, size: Int): ByteArray {
val decryptInitVector = decBuf!!
var start: Int
var end = offset
val buffer = ByteArray(((size - 1) / decryptMaxLen + 1) * encryptMaxLen + 11)
var writeIndex = 0
do {
start = end
end += decryptMaxLen
end = min(data.size, end)
println("${data.size}, $start->$end, ${buffer.size}, $writeIndex")
val writeIndexBefore = writeIndex
writeIndex += decryptCipher.doFinal(data, start, end - start, buffer, writeIndex)
if (start == 0) {
repeat(encryptMaxLen) {
buffer[it] = buffer[it] xor decryptInitVector[it]
}
} else {
repeat(writeIndex - writeIndexBefore) {
buffer[writeIndexBefore + it] = buffer[writeIndexBefore + it] xor data[start + it]
}
}
} while (end < offset + size)
decBuf = buffer.copyOfRange(buffer.size - encryptMaxLen, buffer.size)
return buffer.copyOf(writeIndex)
}
override fun decrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int): Int {
TODO("Not yet implemented")
}
//private fun doFinal(data: ByteArray, buffer: ByteArray, bufferOffset: Int, offset: Int, size: Int, cipher: Cipher): Int {
// var start = offset
// var end = offset
// var writeIndex = bufferOffset
// do {
// end += decryptMaxLen
// end = min(data.size, end)
// encBuf.indices.forEach { index ->
// encBuf[index] = encBuf[index] xor data[start + index]
// }
// writeIndex += cipher.doFinal(encBuf, 0, encBuf.size, buffer, writeIndex)
// start += decryptMaxLen
// } while (end < offset + size)
// return writeIndex - bufferOffset
//}
}
companion object {
private val random = Random(System.currentTimeMillis())
}
}
fun main() {
val source = "HelloWorld".repeat(100).toByteArray()
val rsa = RSA()
val decodeRsa = rsa.public
decodeRsa.decryptInitVector = rsa.encryptInitVector
val encrypt = rsa.encrypt(source)
//println(encrypt.toHexString())
println(decodeRsa.decrypt(encrypt).toUTF8String())
}

View File

@ -0,0 +1,5 @@
package cn.tursom.core.encrypt
enum class BlockCipherModeOfOperation {
ECB, CBC, CFB, OFB, CTR,
}

View File

@ -3,6 +3,13 @@ package cn.tursom.core.encrypt
import cn.tursom.core.buffer.ByteBuffer
interface Encrypt {
var encryptInitVector: ByteArray?
get() = null
set(_) {}
var decryptInitVector: ByteArray?
get() = null
set(_) {}
fun encrypt(data: ByteArray, offset: Int = 0, size: Int = data.size - offset): ByteArray
fun decrypt(data: ByteArray, offset: Int = 0, size: Int = data.size - offset): ByteArray
fun encrypt(data: ByteArray, buffer: ByteArray, bufferOffset: Int = 0, offset: Int = 0, size: Int = data.size - offset): Int

View File

@ -10,28 +10,49 @@ import java.security.spec.X509EncodedKeySpec
@Suppress("unused", "MemberVisibilityCanBePrivate")
class RSA(
publicKey: RSAPublicKey,
privateKey: RSAPrivateKey? = null
) : AbstractPublicKeyEncrypt("RSA", publicKey, privateKey) {
privateKey: RSAPrivateKey? = null,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : AbstractPublicKeyEncrypt("RSA", publicKey, privateKey, modeOfOperation = modeOfOperation) {
val keySize = publicKey.modulus.bitLength()
override val decryptMaxLen = keySize / 8
override val encryptMaxLen = decryptMaxLen - 11
val keySize get() = (publicKey as RSAPublicKey).modulus.bitLength()
override val decryptMaxLen get() = keySize / 8
override val encryptMaxLen get() = decryptMaxLen - 11
override val public by lazy {
if (privateKey == null) {
this
} else {
RSA(publicKey)
RSA(publicKey, modeOfOperation = modeOfOperation)
}
}
constructor(keyPair: KeyPair) : this(keyPair.public as RSAPublicKey, keyPair.private as RSAPrivateKey)
constructor(
keyPair: KeyPair,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : this(
keyPair.public as RSAPublicKey,
keyPair.private as RSAPrivateKey,
modeOfOperation
)
constructor(keySize: Int = 1024) : this(KeyPairGenerator.getInstance("RSA").let {
constructor(
keySize: Int = 1024,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : this(
KeyPairGenerator.getInstance("RSA").let {
it.initialize(keySize)
it.generateKeyPair()
})
},
modeOfOperation
)
constructor(publicKey: ByteArray) : this(KeyFactory.getInstance("RSA").generatePublic(X509EncodedKeySpec(publicKey)) as RSAPublicKey)
constructor(
publicKey: ByteArray,
modeOfOperation: BlockCipherModeOfOperation = BlockCipherModeOfOperation.ECB,
) : this(
KeyFactory.getInstance("RSA").generatePublic(X509EncodedKeySpec(publicKey)) as RSAPublicKey,
null,
modeOfOperation
)
}

View File

@ -0,0 +1,4 @@
package cn.tursom.core
inline fun <T, R> with(v: T, crossinline action: (T) -> R): () -> R = { action(v) }
inline fun <T1, T2, R> with(v1: T1, v2: T2, crossinline action: (T1, T2) -> R): () -> R = { action(v1, v2) }

View File

@ -8,38 +8,43 @@ import cn.tursom.core.datastruct.AtomicBitSet
abstract class AbstractMemoryPool(
val blockSize: Int,
val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) },
private val memoryPool: ByteBuffer
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer,
override var autoCollection: Boolean = false,
) : MemoryPool {
private val bitMap = AtomicBitSet(blockCount.toLong())
val allocated: Int get() = bitMap.trueCount.toInt()
private fun getMemory(token: Int): ByteBuffer = synchronized(this) {
PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token)
PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection)
}
/**
* @return token
*/
private fun allocate(): Int {
var index = bitMap.firstDown()
var index = bitMap.getDownIndex()
while (index in 0 until blockCount) {
if (bitMap.up(index)) {
return index.toInt()
}
index = if (bitMap[index]) bitMap.firstDown() else index
index = if (bitMap[index]) bitMap.getDownIndex() else index
}
return -1
}
override fun free(memory: ByteBuffer) {
if (memory is PooledByteBuffer && memory.pool == this) {
val token = memory.token
@Suppress("ControlFlowWithEmptyBody")
if (token in 0 until blockCount) while (!bitMap.down(token.toLong()));
free(memory.token)
}
}
override fun free(token: Int) {
@Suppress("ControlFlowWithEmptyBody")
if (token in 0 until blockCount) while (!bitMap.down(token.toLong(), false));
}
override fun getMemoryOrNull(): ByteBuffer? {
val token = allocate()
return if (token in 0 until blockCount) {

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class DirectMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : AbstractMemoryPool(
blockSize,
blockCount,

View File

@ -8,7 +8,10 @@ import java.util.concurrent.atomic.AtomicBoolean
* 可自动申请新内存空间的内存池
* 线程安全
*/
class ExpandableMemoryPool(val maxPoolCount: Int = -1, private val poolFactory: () -> MemoryPool) : MemoryPool {
class ExpandableMemoryPool(
val maxPoolCount: Int = -1,
private val poolFactory: () -> MemoryPool,
) : MemoryPool {
private val poolList = ConcurrentLinkedQueue<MemoryPool>()
@Volatile
@ -30,7 +33,7 @@ class ExpandableMemoryPool(val maxPoolCount: Int = -1, private val poolFactory:
poolList.add(usingPool)
}
override fun free(memory: ByteBuffer) = throw NotImplementedError("ExpandableMemoryPool won't allocate any memory")
override fun free(memory: ByteBuffer) = Unit
override fun getMemory(): ByteBuffer {
var buffer = usingPool.getMemoryOrNull()

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class HeapMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : AbstractMemoryPool(
blockSize,
blockCount,

View File

@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
class InstantMemoryPool(
val blockSize: Int,
val newMemory: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) }
val newMemory: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : MemoryPool {
private val memoryList = ConcurrentLinkedQueue<SoftReference<InstantByteBuffer>>()

View File

@ -10,8 +10,9 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer
*/
abstract class LongBitSetAbstractMemoryPool(
val blockSize: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) },
private val memoryPool: ByteBuffer
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer,
override var autoCollection: Boolean = false,
) : MemoryPool {
private val bitMap = LongBitSet()
val allocated: Int get() = bitMap.trueCount.toInt()
@ -22,7 +23,7 @@ abstract class LongBitSetAbstractMemoryPool(
}
private fun getMemory(token: Int): ByteBuffer = synchronized(this) {
return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token)
return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection)
}
/**
@ -41,11 +42,14 @@ abstract class LongBitSetAbstractMemoryPool(
override fun free(memory: ByteBuffer) {
if (memory is PooledByteBuffer && memory.pool == this) {
val token = memory.token
free(memory.token)
}
}
override fun free(token: Int) {
@Suppress("ControlFlowWithEmptyBody")
if (token >= 0) while (!bitMap.down(token));
}
}
override fun getMemoryOrNull(): ByteBuffer? {
val token = allocate()

View File

@ -5,7 +5,7 @@ import cn.tursom.core.buffer.impl.DirectByteBuffer
class LongBitSetDirectMemoryPool(
blockSize: Int,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { DirectByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::DirectByteBuffer
) : LongBitSetAbstractMemoryPool(blockSize, emptyPoolBuffer, DirectByteBuffer(64 * blockSize)) {
override fun toString(): String {
return "LongBitSetDirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, allocated=$allocated)"

View File

@ -5,7 +5,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class LongBitSetHeapMemoryPool (
blockSize: Int,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : LongBitSetAbstractMemoryPool(blockSize, emptyPoolBuffer, HeapByteBuffer(64 * blockSize)) {
override fun toString(): String {
return "LongBitSetDirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, allocated=$allocated)"

View File

@ -3,6 +3,7 @@ package cn.tursom.core.pool
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.PooledByteBuffer
import java.io.Closeable
import java.lang.ref.SoftReference
/**
* 可以记录与释放分配内存的内存池
@ -11,21 +12,23 @@ import java.io.Closeable
* 非线程安全
*/
class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Closeable {
private val allocatedList = ArrayList<ByteBuffer>(2)
private val allocatedList = ArrayList<SoftReference<ByteBuffer>>(2)
override fun getMemory(): ByteBuffer {
val memory = pool.getMemory()
allocatedList.add(memory)
allocatedList.add(SoftReference(memory))
return memory
}
override fun getMemoryOrNull(): ByteBuffer? {
val memory = pool.getMemoryOrNull()
if (memory != null) allocatedList.add(memory)
if (memory != null) allocatedList.add(SoftReference(memory))
return memory
}
override fun close() {
allocatedList.forEach(ByteBuffer::close)
allocatedList.forEach {
it.get()?.close()
}
allocatedList.clear()
}
@ -36,12 +39,13 @@ class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Close
override fun toString(): String {
val allocated = ArrayList<Int>(allocatedList.size)
allocatedList.forEach {
if (it is PooledByteBuffer && !it.closed) allocated.add(it.token)
val buffer = it.get()
if (buffer is PooledByteBuffer && !buffer.closed) allocated.add(buffer.token)
}
return "MarkedMemoryPool(pool=$pool, allocated=$allocated)"
}
protected fun finalize() {
close()
}
//protected fun finalize() {
// close()
//}
}

View File

@ -7,24 +7,28 @@ import cn.tursom.core.buffer.ByteBuffer
*/
interface MemoryPool {
val staticSize: Boolean get() = true
var autoCollection: Boolean
get() = false
set(value) {}
// fun allocate(): Int
fun free(memory: ByteBuffer)
fun free(token: Int) = Unit
fun getMemory(): ByteBuffer
fun getMemoryOrNull(): ByteBuffer?
override fun toString(): String
suspend operator fun <T> invoke(action: suspend (ByteBuffer) -> T): T {
return getMemory().use { buffer ->
action(buffer)
}
}
fun get() = getMemory()
operator fun get(blockCount: Int): Array<ByteBuffer> = Array(blockCount) { get() }
fun gc() {}
}
inline operator fun <T> MemoryPool.invoke(action: (ByteBuffer) -> T): T {
return getMemory().use { buffer ->
action(buffer)
}
}

View File

@ -31,7 +31,7 @@ class ScalabilityMemoryPool(private val poolFactory: () -> MemoryPool) : MemoryP
}
}
override fun free(memory: ByteBuffer) = throw NotImplementedError("ExpandableMemoryPool won't allocate any memory")
override fun free(memory: ByteBuffer) = Unit
override fun getMemory(): ByteBuffer {
var buffer = usingPool.getMemoryOrNull()

View File

@ -2,7 +2,9 @@ package cn.tursom.core.pool
import cn.tursom.core.buffer.ByteBuffer
class ThreadLocalMemoryPool(private val poolFactory: () -> MemoryPool) : MemoryPool {
class ThreadLocalMemoryPool(
private val poolFactory: () -> MemoryPool
) : MemoryPool {
private val threadLocal = ThreadLocal<MemoryPool>()
override fun free(memory: ByteBuffer) = throw NotImplementedError("ThreadLocalMemoryPool won't allocate any memory")

View File

@ -8,8 +8,9 @@ import cn.tursom.core.datastruct.ArrayBitSet
abstract class ThreadUnsafeAbstractMemoryPool(
val blockSize: Int,
val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) },
private val memoryPool: ByteBuffer
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer,
override var autoCollection: Boolean = false,
) : MemoryPool {
private val bitMap = ArrayBitSet(blockCount.toLong())
val allocated: Int get() = bitMap.trueCount.toInt()
@ -29,7 +30,7 @@ abstract class ThreadUnsafeAbstractMemoryPool(
}
private fun unsafeGetMemory(token: Int): ByteBuffer {
return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token)
return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token,autoCollection)
}
/**

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class ThreadUnsafeDirectMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
) : ThreadUnsafeAbstractMemoryPool(
blockSize,
blockCount,

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class ThreadUnsafeHeapMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
) : ThreadUnsafeAbstractMemoryPool(
blockSize,
blockCount,

View File

@ -1,6 +1,7 @@
package cn.tursom.core.timer
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
interface Timer {
fun exec(timeout: Long, task: () -> Unit): TimerTask
@ -12,7 +13,7 @@ interface Timer {
fun runNow(taskList: TaskQueue) {
threadPool.execute {
while (true) {
val task = taskList.take() ?: return@execute
val task = taskList.take() ?: break
try {
task()
} catch (e: Throwable) {
@ -29,11 +30,11 @@ interface Timer {
0L, TimeUnit.MILLISECONDS,
LinkedTransferQueue(),
object : ThreadFactory {
var threadNumber = 0
var threadNumber = AtomicInteger(0)
override fun newThread(r: Runnable): Thread {
val thread = Thread(r)
thread.isDaemon = true
thread.name = "timer-worker-$threadNumber"
thread.name = "timer-worker-${threadNumber.incrementAndGet()}"
return thread
}
})

View File

@ -14,10 +14,11 @@ class WheelTimer(
val tick: Long = 200,
val wheelSize: Int = 512,
val name: String = "wheelTimerLooper",
val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() }
val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() },
) : Timer {
var closed = false
val taskQueueArray = AtomicReferenceArray(Array(wheelSize) { taskQueueFactory() })
@Volatile
private var position = 0
@ -52,7 +53,7 @@ class WheelTimer(
// runNow(outTimeQueue)
//}
thread(isDaemon = true, name = name) {
val startTime = CurrentTimeMillisClock.now
var startTime = CurrentTimeMillisClock.now
while (!closed) {
position %= wheelSize
@ -74,8 +75,10 @@ class WheelTimer(
runNow(outTimeQueue)
val nextSleep = startTime + tick * position - CurrentTimeMillisClock.now
startTime += tick
val nextSleep = startTime - CurrentTimeMillisClock.now
if (nextSleep > 0) sleep(tick)
//else System.err.println("timer has no delay")
}
}
}
@ -87,7 +90,7 @@ class WheelTimer(
var2: Long,
var4: Long,
var6: TimeUnit,
var1: () -> Unit
var1: () -> Unit,
): ScheduledFuture<*> = scheduleAtFixedRate(var1, var2, var4, var6)
}
}

View File

@ -4,7 +4,7 @@ dependencies {
api project(":utils:xml")
// kotlin
//implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
// kotlin
//implementation "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"
// OkHttp

View File

@ -2,10 +2,11 @@ dependencies {
compile project(":")
api "com.google.code.gson:gson:2.8.2"
// kotlin
api 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
api 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
// kotlin
api "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"
//
api 'org.apache.lucene:lucene-core:4.0.0'
api group: "io.netty", name: "netty-all", version: "4.1.43.Final"
api group: "io.netty", name: "netty-all", version: "4.1.43.Final"
}

0
utils/json/build.gradle Normal file
View File

View File

@ -0,0 +1,204 @@
package cn.tursom.utils.json
import com.sun.org.apache.xalan.internal.lib.ExsltMath.power
import java.lang.RuntimeException
object Json {
class JsonFormatException(message: String? = null) : RuntimeException(message) {
internal constructor(content: JsonParseContent) : this("${content.json}[${content.index}]")
}
fun parse(json: String): Any? {
val content = JsonParseContent(json)
val parse = parse(content)
jumpWhitespace(content)
if (content.index != json.length) throw JsonFormatException("$json[${json[content.index]}] remain characters")
return parse
}
internal data class JsonParseContent(val json: String, var index: Int = 0)
private fun parse(content: JsonParseContent): Any? {
jumpWhitespace(content)
return when (content.json[content.index]) {
'{' -> parseObj(content)
'[' -> parseArray(content)
'"' -> parseString(content)
'+', '-', in '0'..'9' -> parseNumber(content)
't', 'f' -> parseBoolean(content)
'n' -> parseNull(content)
else -> throw JsonFormatException(content)
}
}
private fun parseNull(content: JsonParseContent) = if (content.json.startsWith("null", content.index)) {
content.index += 4
null
} else throw JsonFormatException(content)
@Suppress("ControlFlowWithEmptyBody")
private fun parseBoolean(content: JsonParseContent) = when {
content.json.startsWith("true", content.index) -> {
content.index += 4
true
}
content.json.startsWith("false", content.index) -> {
content.index += 5
false
}
else -> throw JsonFormatException(content)
}
private fun jumpWhitespaceLoopCondition(json: String, index: Int) = index < json.length && json[index] in " \t\r\n"
private fun jumpWhitespace(content: JsonParseContent) {
@Suppress("ControlFlowWithEmptyBody")
if (jumpWhitespaceLoopCondition(content.json, content.index)) while (jumpWhitespaceLoopCondition(content.json, ++content.index));
}
private fun charToInt(char: Char): Int {
val indexOf = char - '0'
if (indexOf < 0 || indexOf > 9) throw JsonFormatException("$char is not an number")
return indexOf
}
private fun parseInt(content: JsonParseContent): Number {
var number = charToInt(content.json[content.index]).toLong()
while (++content.index < content.json.length && content.json[content.index] in '0'..'9') {
number = number * 10 + charToInt(content.json[content.index])
}
return if (number <= Int.MAX_VALUE) number.toInt() else number
}
private fun parseNumber(content: JsonParseContent): Number {
val negative = content.json[content.index] == '-'
if (negative || content.json[content.index] == '+') content.index++
var number: Number = when (content.json[content.index]) {
in '0'..'9' -> parseInt(content)
else -> throw JsonFormatException(content)
}
if (content.index < content.json.length && content.json[content.index] == '.') {
if (++content.index >= content.json.length) throw JsonFormatException(content)
var base = 0.1
var double = charToInt(content.json[content.index]) * base
while (++content.index < content.json.length && content.json[content.index] in '0'..'9') {
base *= 0.1
double += charToInt(content.json[content.index]) * base
}
number = number.toDouble() + double
}
if (content.index < content.json.length && content.json[content.index] in "eE") {
val powerNegative = when (content.json[++content.index]) {
'-' -> true
'+' -> false
else -> {
content.index--
false
}
}
content.index++
number = number.toDouble() * power(10.0, parseInt(content).toLong() * if (powerNegative) -1.0 else 1.0)
}
return if (negative) when (number) {
is Int -> -number
is Long -> -number
else -> -number.toDouble()
} else number
}
private fun parseString(content: JsonParseContent): String {
if (content.json[content.index++] != '"') throw JsonFormatException("string not begin with '\"'")
val builder = StringBuilder()
while (content.index < content.json.length) when (content.json[content.index]) {
'\\' -> {
when (content.json[++content.index]) {
'b' -> builder.append('\b')
'f' -> builder.append('\u000C')
'n' -> builder.append('\n')
'r' -> builder.append('\r')
't' -> builder.append('\t')
'u' -> {
var char = 0
repeat(4) {
val indexOf = "0123456789abcdef".indexOf(content.json[++content.index].toLowerCase())
if (indexOf < 0) throw JsonFormatException(content)
char = char * 16 + indexOf
}
builder.append(char.toChar())
}
else -> builder.append(content.json[content.index])
}
content.index++
}
'"' -> {
content.index++
return builder.toString()
}
else -> builder.append(content.json[content.index++])
}
throw JsonFormatException(content)
}
private fun parseObj(content: JsonParseContent): Map<String, Any?> {
if (content.json[content.index++] != '{') throw JsonFormatException(content)
jumpWhitespace(content)
if (content.json[content.index] == '}') {
content.index++
return emptyMap()
}
val map = HashMap<String, Any?>()
while (true) {
jumpWhitespace(content)
val key = parseString(content)
jumpWhitespace(content)
if (content.json[content.index++] != ':') throw JsonFormatException(content)
map[key] = parse(content)
jumpWhitespace(content)
when (content.json[content.index++]) {
',' -> continue
'}' -> break
else -> throw JsonFormatException("json object not ends with '}'")
}
}
return map
}
private fun parseArray(content: JsonParseContent): List<Any?> {
if (content.json[content.index++] != '[') throw JsonFormatException(content)
jumpWhitespace(content)
if (content.json[content.index] == ']') {
content.index++
return emptyList()
}
val array = ArrayList<Any?>()
while (true) {
array.add(parse(content))
jumpWhitespace(content)
if (content.index >= content.json.length) throw JsonFormatException(content)
when (content.json[content.index++]) {
',' -> continue
']' -> break
else -> throw JsonFormatException(content)
}
}
return array
}
}
//fun main() {
// println(Json.parse(" null "))
// println(Json.parse(" true "))
// println(Json.parse(" false "))
// println(Json.parse(" 123 "))
// println(Json.parse(" -123 "))
// println(Json.parse(" 123.0 "))
// println(Json.parse(" 123.0 "))
// println(Json.parse(" 123e2 "))
// println(Json.parse(" 123e10 "))
// println(Json.parse(" -123.5e10 "))
// println(Json.parse(" \"bb-12\\t3\\\".5e10aa\" "))
// println(Json.parse(" {} "))
// println(Json.parse(" {\"a\":3, \"c\": {}} "))
// println(Json.parse("[1,3, 4 ,\"cc\\u0041\" , true , false , null , {}, {\"a\":\"b\"} , [ ] , [] , {\"a\":3, \"c\": {}, \"b\":[]}]"))
// println(Json.parse(" [1,3,4] "))
//}

2
utils/math/build.gradle Normal file
View File

@ -0,0 +1,2 @@
dependencies {
}

View File

@ -0,0 +1,270 @@
package cn.tursom.math
import java.util.*
import kotlin.math.cos
import kotlin.math.sin
data class Complex(
var r: Double = 0.0,
var i: Double = 0.0,
) {
constructor(r: Int, i: Int) : this(r.toDouble(), i.toDouble())
operator fun plus(complex: Complex) = Complex(r + complex.r, i + complex.i)
operator fun minus(complex: Complex) = Complex(r - complex.r, i - complex.i)
operator fun times(complex: Complex) = Complex(r * complex.r, i * complex.i)
operator fun plusAssign(complex: Complex) {
r += complex.r
i += complex.i
}
operator fun timesAssign(complex: Complex) {
r *= complex.r
i *= complex.i
}
// return abs/modulus/magnitude
fun abs(): Double {
return Math.hypot(r, i)
}
// return angle/phase/argument, normalized to be between -pi and pi
fun phase(): Double {
return Math.atan2(i, r)
}
// return a new object whose value is (this * alpha)
fun scale(alpha: Double): Complex {
return Complex(alpha * r, alpha * i)
}
fun conjugate(): Complex {
return Complex(r, -i)
}
fun reciprocal(): Complex {
val scale = r * r + i * i
return Complex(r / scale, -i / scale)
}
// return the real or imaginary part
fun re(): Double {
return r
}
fun im(): Double {
return r
}
// return a / b
fun divides(b: Complex): Complex {
val a = this
return a.times(b.reciprocal())
}
// return a new Complex object whose value is the complex exponential of this
fun exp(): Complex {
return Complex(Math.exp(r) * Math.cos(i), Math.exp(r) * Math.sin(i))
}
// return a new Complex object whose value is the complex sine of this
fun sin(): Complex {
return Complex(Math.sin(r) * Math.cosh(i), Math.cos(r) * Math.sinh(i))
}
// return a new Complex object whose value is the complex cosine of this
fun cos(): Complex {
return Complex(Math.cos(r) * Math.cosh(i), -Math.sin(r) * Math.sinh(i))
}
// return a new Complex object whose value is the complex tangent of this
fun tan(): Complex {
return sin().divides(cos())
}
override fun toString(): String {
return "($r,$i)"
}
}
object FFT {
fun fft(x: Array<out Complex>): Array<Complex> {
val n = x.size
if (n == 1) return arrayOf(x[0])
require(n % 2 == 0) { "n is not a power of 2" }
val even = Array(n / 2) { k ->
x[2 * k]
}
val evenFFT = fft(even)
for (k in 0 until n / 2) {
even[k] = x[2 * k + 1]
}
val oddFFT = fft(even)
val y = arrayOfNulls<Complex>(n)
for (k in 0 until n / 2) {
val kth = -2 * k * Math.PI / n
val wk = Complex(cos(kth), sin(kth))
y[k] = evenFFT[k].plus(wk.times(oddFFT[k]))
y[k + n / 2] = evenFFT[k].minus(wk.times(oddFFT[k]))
}
@Suppress("UNCHECKED_CAST")
return y as Array<Complex>
}
// compute the inverse FFT of x[], assuming its length n is a power of 2
fun ifft(x: Array<out Complex>): Array<out Complex> {
val n = x.size
var y = Array(n) { i ->
x[i].conjugate()
}
// compute forward FFT
y = fft(y as Array<out Complex>)
// take conjugate again
for (i in 0 until n) {
y[i] = y[i].conjugate()
}
// divide by n
for (i in 0 until n) {
y[i] = y[i].scale(1.0 / n)
}
return y
}
fun cconvolve(x: Array<out Complex>, y: Array<out Complex>): Array<out Complex> {
require(x.size == y.size) { "Dimensions don't agree" }
val n = x.size
val a = fft(x)
val b = fft(y)
val c = Array(n) { i ->
a[i].times(b[i])
}
return ifft(c)
}
fun convolve(x: Array<out Complex>, y: Array<out Complex>): Array<out Complex> {
val ZERO = Complex(0, 0)
val a = Array(2 * x.size) { i ->
if (i in x.indices) {
x[i]
} else {
ZERO
}
}
val b = Array(2 * y.size) { i ->
if (i in y.indices) {
y[i]
} else {
ZERO
}
}
return cconvolve(a, b)
}
// compute the DFT of x[] via brute force (n^2 time)
fun dft(x: Array<out Complex>): Array<out Complex> {
val n = x.size
val ZERO = Complex(0, 0)
val y = Array<Complex>(n) { k ->
val data = ZERO
for (j in 0 until n) {
val power = k * j % n
val kth = -2 * power * Math.PI / n
val wkj = Complex(cos(kth), sin(kth))
data.plusAssign(x[j] * wkj)
}
data
}
return y
}
// display an array of Complex numbers to standard output
fun show(x: Array<out Complex?>, title: String?) {
println(title)
println("-------------------")
for (i in x.indices) {
println(x[i])
}
println()
}
/***************************************************************************
* Test client and sample execution
*
* % java FFT 4
* x
* -------------------
* -0.03480425839330703
* 0.07910192950176387
* 0.7233322451735928
* 0.1659819820667019
*
* y = fft(x)
* -------------------
* 0.9336118983487516
* -0.7581365035668999 + 0.08688005256493803i
* 0.44344407521182005
* -0.7581365035668999 - 0.08688005256493803i
*
* z = ifft(y)
* -------------------
* -0.03480425839330703
* 0.07910192950176387 + 2.6599344570851287E-18i
* 0.7233322451735928
* 0.1659819820667019 - 2.6599344570851287E-18i
*
* c = cconvolve(x, x)
* -------------------
* 0.5506798633981853
* 0.23461407150576394 - 4.033186818023279E-18i
* -0.016542951108772352
* 0.10288019294318276 + 4.033186818023279E-18i
*
* d = convolve(x, x)
* -------------------
* 0.001211336402308083 - 3.122502256758253E-17i
* -0.005506167987577068 - 5.058885073636224E-17i
* -0.044092969479563274 + 2.1934338938072244E-18i
* 0.10288019294318276 - 3.6147323062478115E-17i
* 0.5494685269958772 + 3.122502256758253E-17i
* 0.240120239493341 + 4.655566391833896E-17i
* 0.02755001837079092 - 2.1934338938072244E-18i
* 4.01805098805014E-17i
*
*/
@JvmStatic
fun main(args: Array<String>) {
//val n = args[0].toInt()
val n = 8
val x = Array(n) { i ->
Complex(sin(i.toDouble()), 0.0)
}
show(x, "x")
// FFT of original data
val y = fft(x)
show(y, "y = fft(x)")
// FFT of original data
val y2 = dft(x)
show(y2, "y2 = dft(x)")
// take inverse FFT
val z = ifft(y)
show(z, "z = ifft(y)")
// circular convolution of x with itself
val c = cconvolve(x, x)
show(c, "c = cconvolve(x, x)")
// linear convolution of x with itself
val d = convolve(x, x)
show(d, "d = convolve(x, x)")
}
}

View File

@ -0,0 +1,33 @@
package cn.tursom.math
import kotlin.math.PI
import kotlin.math.cos
import kotlin.math.sin
fun fft1(a: Array<Complex>): Array<Complex> {
if (a.size == 1) return a
val a0 = Array(a.size shr 1) {
a[it shl 1]
}
val a1 = Array(a.size shr 1) {
a[(it shl 1) + 1]
}
fft1(a0)
fft1(a1)
val wn = Complex(cos(2 * PI / a.size), sin(2 * PI / a.size))
val w = Complex(1.0, 0.0)
repeat(a.size shr 1) { k ->
a[k] = a0[k] + w * a1[k]
a[k + (a.size shr 1)] = a0[k] - w * a1[k]
w.plusAssign(wn)
}
return a
}
fun main() {
val source = Array(8) {
Complex(sin(it.toDouble()))
}
println(source.asList())
println(fft1(source).asList())
}

View File

@ -0,0 +1,22 @@
package cn.tursom.utils
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOutboundHandlerAdapter
import io.netty.channel.ChannelPromise
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
@ChannelHandler.Sharable
object WebSocketFrameWrapper : ChannelOutboundHandlerAdapter() {
override fun write(ctx: ChannelHandlerContext, msg: Any?, promise: ChannelPromise?) {
ctx.write(when (msg) {
is String -> TextWebSocketFrame(msg)
is ByteArray -> BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg))
is ByteBuf -> BinaryWebSocketFrame(msg)
else -> msg
}, promise)
}
}

View File

@ -1,6 +1,6 @@
dependencies {
compile project(":")
api project(":log")
api project(":utils")
implementation project(":utils")
compile group: "io.netty", name: "netty-all", version: "4.1.43.Final"
}

View File

@ -1,6 +1,7 @@
package cn.tursom.ws
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.utils.WebSocketFrameWrapper
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
@ -17,13 +18,24 @@ import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.websocketx.*
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import java.net.URI
class WebSocketClient(uri: String, val handler: WebSocketHandler) {
private val uri: URI = URI.create(uri)
@Suppress("unused")
class WebSocketClient(
url: String,
val handler: WebSocketHandler,
val autoWrap: Boolean = true,
val log: Boolean = false,
val compressed: Boolean = true,
val maxContextLength: Int = 4096,
private val headers: Map<String, String>? = null,
private val handshakerUri: URI? = null,
) {
private val uri: URI = URI.create(url)
internal var ch: Channel? = null
fun open() {
@ -53,35 +65,52 @@ class WebSocketClient(uri: String, val handler: WebSocketHandler) {
} else {
null
}
val handler = WebSocketClientChannelHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, DefaultHttpHeaders()
), this, handler
)
val b = Bootstrap()
b.group(group)
val httpHeaders = DefaultHttpHeaders()
headers?.forEach { (k, v) ->
httpHeaders[k] = v
}
val handshakerAdapter = WebSocketClientHandshakerAdapter(WebSocketClientHandshakerFactory.newHandshaker(
handshakerUri ?: uri, WebSocketVersion.V13, null, true, httpHeaders
), this, handler)
val handler = WebSocketClientChannelHandler(this, handler)
val bootstrap = Bootstrap()
bootstrap.group(group)
.channel(NioSocketChannel::class.java)
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
val p = ch.pipeline()
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port))
ch.pipeline().apply {
if (log) {
addLast(LoggingHandler())
}
if (sslCtx != null) {
addLast(sslCtx.newHandler(ch.alloc(), host, port))
}
addLast(HttpClientCodec())
addLast(HttpObjectAggregator(maxContextLength))
if (compressed) {
addLast(WebSocketClientCompressionHandler.INSTANCE)
}
addLast(handshakerAdapter)
//if (log) {
// addLast(LoggingHandler())
//}
addLast(handler)
if (autoWrap) {
addLast(WebSocketFrameWrapper)
}
}
p.addLast(
HttpClientCodec(),
HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
handler
)
}
})
b.connect(uri.host, port)
bootstrap.connect(uri.host, port)
//handler.handshakeFuture().sync()
}
fun close() {
fun close(reasonText: String? = null) {
if (reasonText == null) {
ch?.writeAndFlush(CloseWebSocketFrame())
} else {
ch?.writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus.NORMAL_CLOSURE, reasonText))
}
ch?.closeFuture()?.sync()
}
@ -123,6 +152,44 @@ class WebSocketClient(uri: String, val handler: WebSocketHandler) {
return ch!!.writeAndFlush(TextWebSocketFrame(data))
}
fun ping(data: ByteArray): ChannelFuture {
return ch!!.writeAndFlush(PingWebSocketFrame(Unpooled.wrappedBuffer(data)))
}
fun ping(data: ByteBuffer): ChannelFuture {
return ch!!.writeAndFlush(
PingWebSocketFrame(
when (data) {
is NettyByteBuffer -> data.byteBuf
else -> Unpooled.wrappedBuffer(data.getBytes())
}
)
)
}
fun ping(data: ByteBuf): ChannelFuture {
return ch!!.writeAndFlush(PingWebSocketFrame(data))
}
fun pong(data: ByteArray): ChannelFuture {
return ch!!.writeAndFlush(PongWebSocketFrame(Unpooled.wrappedBuffer(data)))
}
fun pong(data: ByteBuffer): ChannelFuture {
return ch!!.writeAndFlush(
PongWebSocketFrame(
when (data) {
is NettyByteBuffer -> data.byteBuf
else -> Unpooled.wrappedBuffer(data.getBytes())
}
)
)
}
fun pong(data: ByteBuf): ChannelFuture {
return ch!!.writeAndFlush(PongWebSocketFrame(data))
}
companion object {
private val group: EventLoopGroup = NioEventLoopGroup()
}

View File

@ -5,32 +5,14 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import io.netty.handler.codec.http.websocketx.*
import io.netty.util.CharsetUtil
class WebSocketClientChannelHandler(
private val handshaker: WebSocketClientHandshaker,
val client: WebSocketClient,
val handler: WebSocketHandler
) : SimpleChannelInboundHandler<Any>() {
private var handshakeFuture: ChannelPromise? = null
fun handshakeFuture(): ChannelFuture? {
return handshakeFuture
}
override fun handlerAdded(ctx: ChannelHandlerContext) {
handshakeFuture = ctx.newPromise()
}
override fun channelActive(ctx: ChannelHandlerContext) {
client.ch = ctx.channel()
handshaker.handshake(ctx.channel())
}
val handler: WebSocketHandler,
) : SimpleChannelInboundHandler<WebSocketFrame>() {
override fun channelInactive(ctx: ChannelHandlerContext) {
handler.onClose(client)
@ -39,34 +21,14 @@ class WebSocketClientChannelHandler(
}
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
override fun channelRead0(ctx: ChannelHandlerContext, msg: WebSocketFrame) {
val ch = ctx.channel()
if (!handshaker.isHandshakeComplete) {
// web socket client connected
handshaker.finishHandshake(ch, msg as FullHttpResponse)
handshakeFuture!!.setSuccess()
handler.onOpen(client)
return
}
if (msg is FullHttpResponse) {
throw Exception("Unexpected FullHttpResponse (getStatus=${msg.status()}, content=${msg.content().toString(CharsetUtil.UTF_8)})")
}
when (msg) {
is TextWebSocketFrame -> handler.readMessage(client, msg)
is BinaryWebSocketFrame -> handler.readMessage(client, msg)
is PingWebSocketFrame -> handler.readPing(client, msg)
is PongWebSocketFrame -> handler.readPong(client, msg)
is CloseWebSocketFrame -> ch.close()
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
try {
handler.onError(client, cause)
} catch (e: Exception) {
e.printStackTrace()
if (!handshakeFuture!!.isDone) {
handshakeFuture!!.setFailure(cause)
}
ctx.close()
}
}
}

View File

@ -0,0 +1,52 @@
package cn.tursom.ws
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import io.netty.util.CharsetUtil
class WebSocketClientHandshakerAdapter(
private val handshaker: WebSocketClientHandshaker,
private val client: WebSocketClient,
private val handler: WebSocketHandler,
) : SimpleChannelInboundHandler<FullHttpResponse>() {
private var handshakeFuture: ChannelPromise? = null
override fun handlerAdded(ctx: ChannelHandlerContext) {
handshakeFuture = ctx.newPromise()
}
override fun channelActive(ctx: ChannelHandlerContext) {
client.ch = ctx.channel()
handshaker.handshake(ctx.channel())
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpResponse) {
if (!handshaker.isHandshakeComplete) {
handshaker.finishHandshake(ctx.channel(), msg)
handshakeFuture!!.setSuccess()
msg.retain()
ctx.fireChannelRead(msg)
handler.onOpen(client)
return
} else {
throw Exception("Unexpected FullHttpResponse (getStatus=${msg.status()}, content=${
msg.content().toString(CharsetUtil.UTF_8)
})")
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
try {
handler.onError(client, cause)
} catch (e: Exception) {
e.printStackTrace()
if (!handshakeFuture!!.isDone) {
handshakeFuture!!.setFailure(cause)
}
ctx.close()
}
}
}

View File

@ -1,9 +1,12 @@
package cn.tursom.ws
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.toUTF8String
import cn.tursom.utils.bytebuffer.NettyByteBuffer
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
interface WebSocketHandler {
@ -31,4 +34,42 @@ interface WebSocketHandler {
fun readMessage(client: WebSocketClient, msg: BinaryWebSocketFrame) {
readMessage(client, msg.content())
}
fun readPing(client: WebSocketClient, msg: PingWebSocketFrame) {
readPing(client, msg.content())
}
fun readPing(client: WebSocketClient, msg: ByteBuf) {
readPing(client, NettyByteBuffer(msg))
}
fun readPing(client: WebSocketClient, msg: ByteBuffer) {
readPing(client, msg.getBytes())
}
fun readPing(client: WebSocketClient, msg: ByteArray) {
readPing(client, msg.toUTF8String())
}
fun readPing(client: WebSocketClient, msg: String) {
}
fun readPong(client: WebSocketClient, msg: PongWebSocketFrame) {
readPong(client, msg.content())
}
fun readPong(client: WebSocketClient, msg: ByteBuf) {
readPong(client, NettyByteBuffer(msg))
}
fun readPong(client: WebSocketClient, msg: ByteBuffer) {
readPong(client, msg.getBytes())
}
fun readPong(client: WebSocketClient, msg: ByteArray) {
readPong(client, msg.toUTF8String())
}
fun readPong(client: WebSocketClient, msg: String) {
}
}

View File

@ -1,5 +1,6 @@
package cn.tursom.web.netty
import cn.tursom.utils.WebSocketFrameWrapper
import cn.tursom.web.HttpHandler
import cn.tursom.web.HttpServer
import cn.tursom.web.WebSocketHandler
@ -29,8 +30,9 @@ class NettyHttpServer(
var webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContent>>> = listOf(),
var readTimeout: Int? = 60,
var writeTimeout: Int? = null,
decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART,
backlog: Int = 1024
decodeType: NettyHttpDecodeType = if (webSocketPath.iterator().hasNext()) NettyHttpDecodeType.FULL_HTTP else NettyHttpDecodeType.MULTI_PART,
backlog: Int = 1024,
val wrapWebSocketFrame: Boolean = false,
) : HttpServer {
constructor(
port: Int,
@ -39,14 +41,16 @@ class NettyHttpServer(
webSocketPath: Iterable<Pair<String, WebSocketHandler<NettyWebSocketContent>>> = listOf(),
readTimeout: Int? = 60,
writeTimeout: Int? = null,
decodeType: NettyHttpDecodeType = NettyHttpDecodeType.MULTI_PART,
handler: (content: NettyHttpContent) -> Unit
decodeType: NettyHttpDecodeType = if (webSocketPath.iterator().hasNext()) NettyHttpDecodeType.FULL_HTTP else NettyHttpDecodeType.MULTI_PART,
backlog: Int = 1024,
wrapWebSocketFrame: Boolean = false,
handler: (content: NettyHttpContent) -> Unit,
) : this(
port,
object : HttpHandler<NettyHttpContent, NettyExceptionContent> {
override fun handle(content: NettyHttpContent) = handler(content)
},
bodySize, autoRun, webSocketPath, readTimeout, writeTimeout, decodeType
bodySize, autoRun, webSocketPath, readTimeout, writeTimeout, decodeType, backlog, wrapWebSocketFrame
)
var decodeType: NettyHttpDecodeType = decodeType
@ -84,6 +88,9 @@ class NettyHttpServer(
pipeline.addLast("ws-$webSocketPath", WebSocketServerProtocolHandler(webSocketPath))
pipeline.addLast("wsHandler-$webSocketPath", NettyWebSocketHandler(ch, handler))
}
if (wrapWebSocketFrame && webSocketPath.iterator().hasNext()) {
pipeline.addLast(WebSocketFrameWrapper)
}
pipeline.addLast("handle", httpHandler)
}
})

View File

@ -19,12 +19,13 @@ interface ResponseHeaderAdapter {
mustRevalidate: Boolean = false
) = setResponseHeader(
"Cache-Control", "$cacheControl${
if (maxAge != null && maxAge > 0) ", max-age=$maxAge" else ""}${
if (maxAge != null && maxAge > 0) ", max-age=$maxAge" else ""
}${
if (mustRevalidate) ", must-revalidate" else ""
}"
)
fun addCookie(cookie: Cookie) = addCookie(cookie.name, cookie.value, cookie.maxAge, cookie.domain, cookie.path, cookie.sameSite)
fun addCookie(cookie: Cookie) = addResponseHeader("Set-Cookie", cookie)
fun addCookie(
name: String,
value: Any,
@ -32,15 +33,7 @@ interface ResponseHeaderAdapter {
domain: String? = null,
path: String? = null,
sameSite: SameSite? = null
) = addResponseHeader(
"Set-Cookie",
"$name=$value${
if (maxAge > 0) "; Max-Age=$maxAge" else ""}${
if (domain != null) "; Domain=$domain" else ""}${
if (path != null) "; Path=$path" else ""}${
if (sameSite != null) ": SameSite=$sameSite" else ""
}"
)
) = addCookie(Cookie(name, value.toString(), maxAge = maxAge, domain = domain, path = path, sameSite = sameSite))
fun setLanguage(language: String) {
setResponseHeader("Content-Language", language)

View File

@ -2,6 +2,6 @@ dependencies {
implementation project(":web")
api project(":json")
api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.29'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
compile group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlinVersion
}