Default Changelist

This commit is contained in:
tursom 2020-10-21 13:29:39 +08:00
parent ff893294ef
commit 8fab6e1918
36 changed files with 598 additions and 102 deletions

View File

@ -22,29 +22,6 @@ interface AsyncNioChannel : AsyncChannel {
val nioThread: NioThread
val channel: SelectableChannel get() = key.channel()
private inline fun <T> operate(action: () -> T): T {
return try {
action()
} catch (e: Exception) {
waitMode()
throw e
}
}
suspend fun <T> write(timeout: Long, action: () -> T): T {
return operate {
waitWrite(timeout)
action()
}
}
suspend fun <T> read(timeout: Long, action: () -> T): T {
return operate {
waitRead(timeout)
action()
}
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int = write(arrayOf(buffer), timeout).toInt()
@ -56,7 +33,7 @@ interface AsyncNioChannel : AsyncChannel {
file: FileChannel,
position: Long,
count: Long,
timeout: Long
timeout: Long,
): Long = write(timeout) {
file.transferTo(position, count, channel as WritableByteChannel)
}
@ -65,7 +42,7 @@ interface AsyncNioChannel : AsyncChannel {
file: FileChannel,
position: Long,
count: Long,
timeout: Long
timeout: Long,
): Long = read(timeout) {
file.transferFrom(channel as ReadableByteChannel, position, count)
}
@ -176,4 +153,27 @@ interface AsyncNioChannel : AsyncChannel {
//val timer = StaticWheelTimer.timer
val timer: Timer = WheelTimer.timer
}
}
inline fun <T> AsyncNioChannel.operate(action: () -> T): T {
return try {
action()
} catch (e: Exception) {
waitMode()
throw e
}
}
suspend fun <T> AsyncNioChannel.write(timeout: Long, action: () -> T): T {
return operate {
waitWrite(timeout)
action()
}
}
suspend fun <T> AsyncNioChannel.read(timeout: Long, action: () -> T): T {
return operate {
waitRead(timeout)
action()
}
}

View File

@ -1,6 +1,8 @@
package cn.tursom.datagram
import cn.tursom.channel.AsyncNioChannel
import cn.tursom.channel.read
import cn.tursom.channel.write
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
@ -12,8 +14,7 @@ interface AsyncDatagram : AsyncNioChannel {
override val channel: DatagramChannel
override fun getBuffed(pool: MemoryPool): BufferedAsyncDatagram = BufferedNioDatagram(pool, this)
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long =
write(timeout) { channel.write(buffer) }
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long = channel.write(buffer)
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long =
read(timeout) { channel.read(buffer) }

View File

@ -14,10 +14,6 @@ open class NioDatagram(
override val remoteAddress: SocketAddress get() = channel.remoteAddress
override fun writeMode() {}
override suspend fun <T> write(timeout: Long, action: () -> T): T {
return action()
}
override fun close() {
if (channel.isOpen || key.isValid) {
nioThread.execute {

View File

@ -2,6 +2,8 @@ package cn.tursom.socket
import cn.tursom.channel.AsyncNioChannel
import cn.tursom.channel.BufferedAsyncChannel
import cn.tursom.channel.read
import cn.tursom.channel.write
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write

View File

@ -2,6 +2,8 @@ package cn.tursom.socket
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode
import cn.tursom.channel.read
import cn.tursom.channel.write
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
@ -12,7 +14,18 @@ import java.nio.channels.SocketChannel
/**
* 异步协程套接字对象
*/
class NioSocket(override val key: SelectionKey, override val nioThread: NioThread) : AsyncSocket {
class NioSocket internal constructor(
override val key: SelectionKey,
override val nioThread: NioThread,
) : AsyncSocket {
companion object {
suspend operator fun invoke(
host: String,
port: Int,
timeout: Long = 0,
) = NioClient.connect(host, port, timeout)
}
override val channel: SocketChannel = key.channel() as SocketChannel
override val open: Boolean get() = channel.isOpen && key.isValid

View File

@ -13,7 +13,7 @@ import kotlinx.coroutines.GlobalScope
* 带内存池的 NIO 套接字服务器
* 在处理结束后会自动释放由内存池分配的内存
*/
open class BuffedNioServer(
open class BufferedNioServer(
port: Int,
val memoryPool: MemoryPool,
backlog: Int = 50,

View File

@ -5,14 +5,15 @@ import cn.tursom.core.log
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.datagram.AsyncDatagramClient
import cn.tursom.socket.NioClient
import cn.tursom.socket.server.BuffedNioServer
import cn.tursom.socket.NioSocket
import cn.tursom.socket.server.BufferedNioServer
import kotlinx.coroutines.runBlocking
val echoHandler: suspend BufferedAsyncChannel.() -> Unit = {
while (true) {
val buffer = read()
log("$this recv from client $remoteAddress: ${buffer.toString(buffer.readable)}")
Throwable().printStackTrace()
//Throwable().printStackTrace()
write(buffer)
}
}
@ -20,7 +21,7 @@ val echoHandler: suspend BufferedAsyncChannel.() -> Unit = {
fun main() {
val port = 12345
val pool = DirectMemoryPool(1024, 16)
val server = BuffedNioServer(port, pool, handler = echoHandler)
val server = BufferedAsyncDatagramServer(port, pool, handler = echoHandler)
//val server = LoopDatagramServer(port, protocol = object : NioProtocol {
// override fun handleRead(key: SelectionKey, nioThread: NioThread) {
// val datagramChannel = key.channel() as DatagramChannel
@ -37,7 +38,7 @@ fun main() {
runBlocking {
val input = System.`in`.bufferedReader()
var client = NioClient.connect("127.0.0.1", port).getBuffed(pool)
var client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
while (true) {
try {
print(">>>")

View File

@ -0,0 +1,24 @@
package cn.tursom.socket
import cn.tursom.core.pool.HeapMemoryPool
import cn.tursom.socket.server.BufferedNioServer
import kotlinx.coroutines.runBlocking
val handler: suspend BufferedAsyncSocket.() -> Unit = {
while (open) {
val read = read()
println(read.toString(read.readable))
write(read)
}
}
fun main() {
val server = BufferedNioServer(12345, handler = handler)
server.run()
runBlocking {
BufferedNioSocket(NioSocket("localhost", 12345), server.memoryPool).use { socket ->
socket.write("hello")
println(socket.read().getString())
}
}
}

View File

@ -15,4 +15,5 @@ include 'utils:ws-client'
include 'utils:mail'
include 'utils:csv'
include 'utils:TrafficForward'
include 'utils:performance-test'
include 'utils:performance-test'
include 'utils:math'

View File

@ -3,6 +3,7 @@ package cn.tursom.socket.server
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.pool.invoke
import cn.tursom.socket.AsyncNioSocket
/**
@ -11,20 +12,20 @@ import cn.tursom.socket.AsyncNioSocket
* 当内存池用完之后会换为 ByteArrayByteBuffer
*/
class BuffedAsyncNioServer(
port: Int,
memoryPool: MemoryPool,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit
port: Int,
memoryPool: MemoryPool,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit,
) : IAsyncNioServer by AsyncNioServer(port, backlog, {
memoryPool {
handler(it)
}
}) {
constructor(
port: Int,
blockSize: Int = 1024,
blockCount: Int = 128,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit
port: Int,
blockSize: Int = 1024,
blockCount: Int = 128,
backlog: Int = 50,
handler: suspend AsyncNioSocket.(buffer: ByteBuffer) -> Unit,
) : this(port, DirectMemoryPool(blockSize, blockCount), backlog, handler)
}

View File

@ -0,0 +1,34 @@
package cn.tursom.core
import java.lang.ref.PhantomReference
import java.lang.ref.Reference
import java.lang.ref.ReferenceQueue
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread
object Finalized {
private val referenceQueue = ReferenceQueue<Any?>()
private val handlerMap = ConcurrentHashMap<Reference<*>, () -> Unit>()
init {
thread(isDaemon = true) {
while (true) {
val action = handlerMap.remove(referenceQueue.remove() ?: return@thread) ?: continue
try {
action()
} catch (e: Exception) {
}
}
}
}
fun <T> listen(obj: T, action: () -> Unit): Reference<T> = PhantomReference(obj, referenceQueue).also {
handlerMap[it] = action
}
fun remove(reference: Reference<*>) {
handlerMap.remove(reference)
}
}
fun <T> T.finalized(action: () -> Unit) = Finalized.listen(this, action)

View File

@ -0,0 +1,9 @@
package cn.tursom.core.buffer
class ClosedBufferException : Exception {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(message, cause, enableSuppression, writableStackTrace)
}

View File

@ -1,6 +1,4 @@
package cn.tursom.buffer
import cn.tursom.core.buffer.ByteBuffer
package cn.tursom.core.buffer
interface ProxyByteBuffer : ByteBuffer {
val agent: ByteBuffer

View File

@ -0,0 +1,63 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.buffer.ProxyByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
open class CloseSafeByteBuffer(
override val agent: ByteBuffer,
) : ByteBuffer by agent, ProxyByteBuffer {
/**
* 这个变量保证 buffer 不会被释放多次
*/
private val open = AtomicBoolean(true)
override val closed: Boolean
get() = !open.get()
fun tryClose() = open.compareAndSet(true, false)
override fun readBuffer(): java.nio.ByteBuffer {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.readBuffer()
}
override fun writeBuffer(): java.nio.ByteBuffer {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.writeBuffer()
}
override val array: ByteArray
get() {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.array
}
override fun reset() {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.reset()
}
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.slice(position, size, readPosition, writePosition)
}
override fun resize(newSize: Int): Boolean {
if (closed) {
throw ClosedBufferException("byte buffer has closed.")
}
return agent.resize(newSize)
}
}

View File

@ -1,7 +1,7 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.pool.MemoryPool
class InstantByteBuffer(

View File

@ -2,7 +2,7 @@ package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.MarkableByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
class MarkedByteBuffer(override val agent: ByteBuffer) : ProxyByteBuffer, MarkableByteBuffer, ByteBuffer by agent {
private var writeMark = 0

View File

@ -1,36 +1,47 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.pool.MemoryPool
import java.util.concurrent.atomic.AtomicBoolean
import java.lang.ref.PhantomReference
import java.lang.ref.Reference
import java.lang.ref.ReferenceQueue
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/**
* 在被垃圾回收时能保证释放占用的内存池内存
*/
class PooledByteBuffer(
override val agent: ByteBuffer,
agent: ByteBuffer,
val pool: MemoryPool,
val token: Int
) : ProxyByteBuffer, ByteBuffer by agent {
/**
* 这个变量保证 buffer 不会被释放多次
*/
private val open = AtomicBoolean(true)
val token: Int,
autoClose: Boolean = false,
) : ProxyByteBuffer, CloseSafeByteBuffer(agent) {
private val reference = if (autoClose) PhantomReference(this, allocatedReferenceQueue) else null
init {
if (reference != null) allocatedMap[reference] = pool to token
}
private val childCount = AtomicInteger(0)
override val resized get() = agent.resized
override val closed: Boolean get() = !open.get() && !resized
override fun close() {
if (childCount.get() == 0) {
if (open.compareAndSet(true, false)) {
if (tryClose()) {
if (childCount.get() == 0) {
if (reference != null) allocatedMap.remove(reference)
pool.free(this)
}
}
}
override fun resize(newSize: Int): Boolean {
if (closed) {
return false
}
val successful = agent.resize(newSize)
if (successful) {
close()
@ -39,14 +50,35 @@ class PooledByteBuffer(
}
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) {
throw ClosedBufferException("PooledByteBuffer has closed.")
}
return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition))
}
override fun toString(): String {
return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, open=$open)"
return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, closed=$closed)"
}
protected fun finalize() {
pool.free(this)
//protected fun finalize() {
// pool.free(this)
//}
companion object {
private val allocatedReferenceQueue = ReferenceQueue<PooledByteBuffer>()
private val allocatedMap = ConcurrentHashMap<Reference<PooledByteBuffer>, Pair<MemoryPool, Int>>()
init {
thread(isDaemon = true) {
while (true) {
val (pool, token) = allocatedMap.remove(allocatedReferenceQueue.remove() ?: return@thread) ?: continue
try {
pool.free(token)
} catch (e: Exception) {
}
}
}
}
}
}

View File

@ -1,7 +1,7 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.ProxyByteBuffer
import cn.tursom.core.buffer.ProxyByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

View File

@ -0,0 +1,4 @@
package cn.tursom.core
inline fun <T, R> with(v: T, crossinline action: (T) -> R): () -> R = { action(v) }
inline fun <T1, T2, R> with(v1: T1, v2: T2, crossinline action: (T1, T2) -> R): () -> R = { action(v1, v2) }

View File

@ -6,10 +6,10 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer
import cn.tursom.core.datastruct.AtomicBitSet
abstract class AbstractMemoryPool(
val blockSize: Int,
val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) },
private val memoryPool: ByteBuffer
val blockSize: Int,
val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer,
) : MemoryPool {
private val bitMap = AtomicBitSet(blockCount.toLong())
val allocated: Int get() = bitMap.trueCount.toInt()
@ -34,12 +34,15 @@ abstract class AbstractMemoryPool(
override fun free(memory: ByteBuffer) {
if (memory is PooledByteBuffer && memory.pool == this) {
val token = memory.token
@Suppress("ControlFlowWithEmptyBody")
if (token in 0 until blockCount) while (!bitMap.down(token.toLong()));
free(memory.token)
}
}
override fun free(token: Int) {
@Suppress("ControlFlowWithEmptyBody")
if (token in 0 until blockCount) while (!bitMap.down(token.toLong()));
}
override fun getMemoryOrNull(): ByteBuffer? {
val token = allocate()
return if (token in 0 until blockCount) {

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class DirectMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : AbstractMemoryPool(
blockSize,
blockCount,

View File

@ -8,7 +8,10 @@ import java.util.concurrent.atomic.AtomicBoolean
* 可自动申请新内存空间的内存池
* 线程安全
*/
class ExpandableMemoryPool(val maxPoolCount: Int = -1, private val poolFactory: () -> MemoryPool) : MemoryPool {
class ExpandableMemoryPool(
val maxPoolCount: Int = -1,
private val poolFactory: () -> MemoryPool,
) : MemoryPool {
private val poolList = ConcurrentLinkedQueue<MemoryPool>()
@Volatile
@ -30,7 +33,7 @@ class ExpandableMemoryPool(val maxPoolCount: Int = -1, private val poolFactory:
poolList.add(usingPool)
}
override fun free(memory: ByteBuffer) = throw NotImplementedError("ExpandableMemoryPool won't allocate any memory")
override fun free(memory: ByteBuffer) = Unit
override fun getMemory(): ByteBuffer {
var buffer = usingPool.getMemoryOrNull()

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class HeapMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : AbstractMemoryPool(
blockSize,
blockCount,

View File

@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
class InstantMemoryPool(
val blockSize: Int,
val newMemory: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) }
val newMemory: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : MemoryPool {
private val memoryList = ConcurrentLinkedQueue<SoftReference<InstantByteBuffer>>()

View File

@ -9,9 +9,9 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer
* 无锁固定容量的内存池
*/
abstract class LongBitSetAbstractMemoryPool(
val blockSize: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) },
private val memoryPool: ByteBuffer
val blockSize: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer,
) : MemoryPool {
private val bitMap = LongBitSet()
val allocated: Int get() = bitMap.trueCount.toInt()
@ -41,12 +41,15 @@ abstract class LongBitSetAbstractMemoryPool(
override fun free(memory: ByteBuffer) {
if (memory is PooledByteBuffer && memory.pool == this) {
val token = memory.token
@Suppress("ControlFlowWithEmptyBody")
if (token >= 0) while (!bitMap.down(token));
free(memory.token)
}
}
override fun free(token: Int) {
@Suppress("ControlFlowWithEmptyBody")
if (token >= 0) while (!bitMap.down(token));
}
override fun getMemoryOrNull(): ByteBuffer? {
val token = allocate()
return if (token >= 0) {

View File

@ -5,7 +5,7 @@ import cn.tursom.core.buffer.impl.DirectByteBuffer
class LongBitSetDirectMemoryPool(
blockSize: Int,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { DirectByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::DirectByteBuffer
) : LongBitSetAbstractMemoryPool(blockSize, emptyPoolBuffer, DirectByteBuffer(64 * blockSize)) {
override fun toString(): String {
return "LongBitSetDirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, allocated=$allocated)"

View File

@ -5,7 +5,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class LongBitSetHeapMemoryPool (
blockSize: Int,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer
) : LongBitSetAbstractMemoryPool(blockSize, emptyPoolBuffer, HeapByteBuffer(64 * blockSize)) {
override fun toString(): String {
return "LongBitSetDirectMemoryPool(blockSize=$blockSize, blockCount=$blockCount, allocated=$allocated)"

View File

@ -10,21 +10,22 @@ interface MemoryPool {
// fun allocate(): Int
fun free(memory: ByteBuffer)
fun free(token: Int) = Unit
fun getMemory(): ByteBuffer
fun getMemoryOrNull(): ByteBuffer?
override fun toString(): String
suspend operator fun <T> invoke(action: suspend (ByteBuffer) -> T): T {
return getMemory().use { buffer ->
action(buffer)
}
}
fun get() = getMemory()
operator fun get(blockCount: Int): Array<ByteBuffer> = Array(blockCount) { get() }
fun gc() {}
}
inline operator fun <T> MemoryPool.invoke(action: (ByteBuffer) -> T): T {
return getMemory().use { buffer ->
action(buffer)
}
}

View File

@ -31,7 +31,7 @@ class ScalabilityMemoryPool(private val poolFactory: () -> MemoryPool) : MemoryP
}
}
override fun free(memory: ByteBuffer) = throw NotImplementedError("ExpandableMemoryPool won't allocate any memory")
override fun free(memory: ByteBuffer) = Unit
override fun getMemory(): ByteBuffer {
var buffer = usingPool.getMemoryOrNull()

View File

@ -2,7 +2,9 @@ package cn.tursom.core.pool
import cn.tursom.core.buffer.ByteBuffer
class ThreadLocalMemoryPool(private val poolFactory: () -> MemoryPool) : MemoryPool {
class ThreadLocalMemoryPool(
private val poolFactory: () -> MemoryPool
) : MemoryPool {
private val threadLocal = ThreadLocal<MemoryPool>()
override fun free(memory: ByteBuffer) = throw NotImplementedError("ThreadLocalMemoryPool won't allocate any memory")

View File

@ -6,10 +6,10 @@ import cn.tursom.core.buffer.impl.PooledByteBuffer
import cn.tursom.core.datastruct.ArrayBitSet
abstract class ThreadUnsafeAbstractMemoryPool(
val blockSize: Int,
val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(blockSize) },
private val memoryPool: ByteBuffer
val blockSize: Int,
val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer,
) : MemoryPool {
private val bitMap = ArrayBitSet(blockCount.toLong())
val allocated: Int get() = bitMap.trueCount.toInt()

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class ThreadUnsafeDirectMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
) : ThreadUnsafeAbstractMemoryPool(
blockSize,
blockCount,

View File

@ -8,7 +8,7 @@ import cn.tursom.core.buffer.impl.HeapByteBuffer
class ThreadUnsafeHeapMemoryPool(
blockSize: Int = 1024,
blockCount: Int = 16,
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = { HeapByteBuffer(it) }
emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
) : ThreadUnsafeAbstractMemoryPool(
blockSize,
blockCount,

2
utils/math/build.gradle Normal file
View File

@ -0,0 +1,2 @@
dependencies {
}

View File

@ -0,0 +1,270 @@
package cn.tursom.math
import java.util.*
import kotlin.math.cos
import kotlin.math.sin
data class Complex(
var r: Double = 0.0,
var i: Double = 0.0,
) {
constructor(r: Int, i: Int) : this(r.toDouble(), i.toDouble())
operator fun plus(complex: Complex) = Complex(r + complex.r, i + complex.i)
operator fun minus(complex: Complex) = Complex(r - complex.r, i - complex.i)
operator fun times(complex: Complex) = Complex(r * complex.r, i * complex.i)
operator fun plusAssign(complex: Complex) {
r += complex.r
i += complex.i
}
operator fun timesAssign(complex: Complex) {
r *= complex.r
i *= complex.i
}
// return abs/modulus/magnitude
fun abs(): Double {
return Math.hypot(r, i)
}
// return angle/phase/argument, normalized to be between -pi and pi
fun phase(): Double {
return Math.atan2(i, r)
}
// return a new object whose value is (this * alpha)
fun scale(alpha: Double): Complex {
return Complex(alpha * r, alpha * i)
}
fun conjugate(): Complex {
return Complex(r, -i)
}
fun reciprocal(): Complex {
val scale = r * r + i * i
return Complex(r / scale, -i / scale)
}
// return the real or imaginary part
fun re(): Double {
return r
}
fun im(): Double {
return r
}
// return a / b
fun divides(b: Complex): Complex {
val a = this
return a.times(b.reciprocal())
}
// return a new Complex object whose value is the complex exponential of this
fun exp(): Complex {
return Complex(Math.exp(r) * Math.cos(i), Math.exp(r) * Math.sin(i))
}
// return a new Complex object whose value is the complex sine of this
fun sin(): Complex {
return Complex(Math.sin(r) * Math.cosh(i), Math.cos(r) * Math.sinh(i))
}
// return a new Complex object whose value is the complex cosine of this
fun cos(): Complex {
return Complex(Math.cos(r) * Math.cosh(i), -Math.sin(r) * Math.sinh(i))
}
// return a new Complex object whose value is the complex tangent of this
fun tan(): Complex {
return sin().divides(cos())
}
override fun toString(): String {
return "($r,$i)"
}
}
object FFT {
fun fft(x: Array<out Complex>): Array<Complex> {
val n = x.size
if (n == 1) return arrayOf(x[0])
require(n % 2 == 0) { "n is not a power of 2" }
val even = Array(n / 2) { k ->
x[2 * k]
}
val evenFFT = fft(even)
for (k in 0 until n / 2) {
even[k] = x[2 * k + 1]
}
val oddFFT = fft(even)
val y = arrayOfNulls<Complex>(n)
for (k in 0 until n / 2) {
val kth = -2 * k * Math.PI / n
val wk = Complex(cos(kth), sin(kth))
y[k] = evenFFT[k].plus(wk.times(oddFFT[k]))
y[k + n / 2] = evenFFT[k].minus(wk.times(oddFFT[k]))
}
@Suppress("UNCHECKED_CAST")
return y as Array<Complex>
}
// compute the inverse FFT of x[], assuming its length n is a power of 2
fun ifft(x: Array<out Complex>): Array<out Complex> {
val n = x.size
var y = Array(n) { i ->
x[i].conjugate()
}
// compute forward FFT
y = fft(y as Array<out Complex>)
// take conjugate again
for (i in 0 until n) {
y[i] = y[i].conjugate()
}
// divide by n
for (i in 0 until n) {
y[i] = y[i].scale(1.0 / n)
}
return y
}
fun cconvolve(x: Array<out Complex>, y: Array<out Complex>): Array<out Complex> {
require(x.size == y.size) { "Dimensions don't agree" }
val n = x.size
val a = fft(x)
val b = fft(y)
val c = Array(n) { i ->
a[i].times(b[i])
}
return ifft(c)
}
fun convolve(x: Array<out Complex>, y: Array<out Complex>): Array<out Complex> {
val ZERO = Complex(0, 0)
val a = Array(2 * x.size) { i ->
if (i in x.indices) {
x[i]
} else {
ZERO
}
}
val b = Array(2 * y.size) { i ->
if (i in y.indices) {
y[i]
} else {
ZERO
}
}
return cconvolve(a, b)
}
// compute the DFT of x[] via brute force (n^2 time)
fun dft(x: Array<out Complex>): Array<out Complex> {
val n = x.size
val ZERO = Complex(0, 0)
val y = Array<Complex>(n) { k ->
val data = ZERO
for (j in 0 until n) {
val power = k * j % n
val kth = -2 * power * Math.PI / n
val wkj = Complex(cos(kth), sin(kth))
data.plusAssign(x[j] * wkj)
}
data
}
return y
}
// display an array of Complex numbers to standard output
fun show(x: Array<out Complex?>, title: String?) {
println(title)
println("-------------------")
for (i in x.indices) {
println(x[i])
}
println()
}
/***************************************************************************
* Test client and sample execution
*
* % java FFT 4
* x
* -------------------
* -0.03480425839330703
* 0.07910192950176387
* 0.7233322451735928
* 0.1659819820667019
*
* y = fft(x)
* -------------------
* 0.9336118983487516
* -0.7581365035668999 + 0.08688005256493803i
* 0.44344407521182005
* -0.7581365035668999 - 0.08688005256493803i
*
* z = ifft(y)
* -------------------
* -0.03480425839330703
* 0.07910192950176387 + 2.6599344570851287E-18i
* 0.7233322451735928
* 0.1659819820667019 - 2.6599344570851287E-18i
*
* c = cconvolve(x, x)
* -------------------
* 0.5506798633981853
* 0.23461407150576394 - 4.033186818023279E-18i
* -0.016542951108772352
* 0.10288019294318276 + 4.033186818023279E-18i
*
* d = convolve(x, x)
* -------------------
* 0.001211336402308083 - 3.122502256758253E-17i
* -0.005506167987577068 - 5.058885073636224E-17i
* -0.044092969479563274 + 2.1934338938072244E-18i
* 0.10288019294318276 - 3.6147323062478115E-17i
* 0.5494685269958772 + 3.122502256758253E-17i
* 0.240120239493341 + 4.655566391833896E-17i
* 0.02755001837079092 - 2.1934338938072244E-18i
* 4.01805098805014E-17i
*
*/
@JvmStatic
fun main(args: Array<String>) {
//val n = args[0].toInt()
val n = 8
val x = Array(n) { i ->
Complex(sin(i.toDouble()), 0.0)
}
show(x, "x")
// FFT of original data
val y = fft(x)
show(y, "y = fft(x)")
// FFT of original data
val y2 = dft(x)
show(y2, "y2 = dft(x)")
// take inverse FFT
val z = ifft(y)
show(z, "z = ifft(y)")
// circular convolution of x with itself
val c = cconvolve(x, x)
show(c, "c = cconvolve(x, x)")
// linear convolution of x with itself
val d = convolve(x, x)
show(d, "d = convolve(x, x)")
}
}

View File

@ -0,0 +1,33 @@
package cn.tursom.math
import kotlin.math.PI
import kotlin.math.cos
import kotlin.math.sin
fun fft1(a: Array<Complex>): Array<Complex> {
if (a.size == 1) return a
val a0 = Array(a.size shr 1) {
a[it shl 1]
}
val a1 = Array(a.size shr 1) {
a[(it shl 1) + 1]
}
fft1(a0)
fft1(a1)
val wn = Complex(cos(2 * PI / a.size), sin(2 * PI / a.size))
val w = Complex(1.0, 0.0)
repeat(a.size shr 1) { k ->
a[k] = a0[k] + w * a1[k]
a[k + (a.size shr 1)] = a0[k] - w * a1[k]
w.plusAssign(wn)
}
return a
}
fun main() {
val source = Array(8) {
Complex(sin(it.toDouble()))
}
println(source.asList())
println(fft1(source).asList())
}