From 4643862bb779559b5ff677242bab5e33812c30a0 Mon Sep 17 00:00:00 2001 From: tursom Date: Wed, 21 Oct 2020 16:18:27 +0800 Subject: [PATCH] fix thread safe --- .../kotlin/cn/tursom/datagram/NioDatagram.kt | 7 ++- .../kotlin/cn/tursom/datagram/server/test.kt | 54 ++++++++++++------- .../src/test/kotlin/cn/tursom/socket/test.kt | 18 +++++-- .../cn/tursom/core/CurrentTimeMillisClock.kt | 12 +++-- .../cn/tursom/core/datastruct/AtomicBitSet.kt | 43 ++++++++++++--- .../cn/tursom/core/pool/AbstractMemoryPool.kt | 10 ++-- .../core/pool/LongBitSetAbstractMemoryPool.kt | 3 +- .../cn/tursom/core/pool/MarkedMemoryPool.kt | 20 ++++--- .../kotlin/cn/tursom/core/pool/MemoryPool.kt | 3 ++ .../pool/ThreadUnsafeAbstractMemoryPool.kt | 3 +- src/main/kotlin/cn/tursom/core/timer/Timer.kt | 7 +-- .../kotlin/cn/tursom/core/timer/WheelTimer.kt | 23 ++++---- 12 files changed, 140 insertions(+), 63 deletions(-) diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt b/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt index 52373ef..bca2924 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/datagram/NioDatagram.kt @@ -8,8 +8,13 @@ import java.nio.channels.SelectionKey open class NioDatagram( override val channel: DatagramChannel, override val key: SelectionKey, - override val nioThread: NioThread + override val nioThread: NioThread, ) : AsyncDatagram { + companion object { + suspend operator fun invoke(host: String, port: Int) = AsyncDatagramClient.connect(host, port) + suspend operator fun invoke(address: SocketAddress) = AsyncDatagramClient.connect(address) + } + override val open: Boolean get() = channel.isOpen && key.isValid override val remoteAddress: SocketAddress get() = channel.remoteAddress override fun writeMode() {} diff --git a/AsyncSocket/src/test/kotlin/cn/tursom/datagram/server/test.kt b/AsyncSocket/src/test/kotlin/cn/tursom/datagram/server/test.kt index 3c578fe..dd643f1 100644 --- a/AsyncSocket/src/test/kotlin/cn/tursom/datagram/server/test.kt +++ b/AsyncSocket/src/test/kotlin/cn/tursom/datagram/server/test.kt @@ -1,20 +1,33 @@ package cn.tursom.datagram.server +import cn.tursom.channel.AsyncChannel import cn.tursom.channel.BufferedAsyncChannel import cn.tursom.core.log +import cn.tursom.core.logE import cn.tursom.core.pool.DirectMemoryPool import cn.tursom.datagram.AsyncDatagramClient -import cn.tursom.socket.NioClient -import cn.tursom.socket.NioSocket -import cn.tursom.socket.server.BufferedNioServer +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +private val coroutineNumber = AtomicInteger(0) +private val sockets = ConcurrentHashMap().keySet(Unit) val echoHandler: suspend BufferedAsyncChannel.() -> Unit = { - while (true) { - val buffer = read() - log("$this recv from client $remoteAddress: ${buffer.toString(buffer.readable)}") - //Throwable().printStackTrace() - write(buffer) + //System.err.println(coroutineNumber.incrementAndGet()) + sockets.add(this) + try { + while (open) { + val buffer = read(60 * 1000) + //log("$this recv from client $remoteAddress: ${buffer.toString(buffer.readable)}") + sockets.forEach { + //System.err.println(it) + it.write(buffer.slice(buffer.readPosition, buffer.readable, writePosition = buffer.readable)) + } + } + } finally { + sockets.remove(this) } } @@ -39,22 +52,27 @@ fun main() { runBlocking { val input = System.`in`.bufferedReader() var client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool) + GlobalScope.launch { + while (true) { + val read = try { + client.read(3000) + } catch (e: Exception) { + logE("socket closed") + client.close() + client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool) + //client.write(line) + client.read(3000) + } + log("recv from server: ${read.getString()}") + read.close() + } + } while (true) { try { print(">>>") val line = input.readLine() if (line.isEmpty()) continue client.write(line) - val read = try { - client.read(3000) - } catch (e: Exception) { - client.close() - client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool) - client.write(line) - client.read(3000) - } - log("recv from server: ${read.getString()}") - read.close() } catch (e: Exception) { Exception(e).printStackTrace() client.close() diff --git a/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt b/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt index 1bf81b6..148aa56 100644 --- a/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt +++ b/AsyncSocket/src/test/kotlin/cn/tursom/socket/test.kt @@ -3,12 +3,22 @@ package cn.tursom.socket import cn.tursom.core.pool.HeapMemoryPool import cn.tursom.socket.server.BufferedNioServer import kotlinx.coroutines.runBlocking +import java.util.concurrent.ConcurrentHashMap +private val sockets = ConcurrentHashMap().keySet(Unit) val handler: suspend BufferedAsyncSocket.() -> Unit = { - while (open) { - val read = read() - println(read.toString(read.readable)) - write(read) + sockets.add(this) + try { + while (open) { + val read = read(60 * 1000) + println(read.toString(read.readable)) + sockets.forEach { + System.err.println(it) + it.write(read) + } + } + } finally { + sockets.remove(this) } } diff --git a/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt b/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt index 0731255..d23c40e 100644 --- a/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt +++ b/src/main/kotlin/cn/tursom/core/CurrentTimeMillisClock.kt @@ -2,6 +2,7 @@ package cn.tursom.core import java.util.concurrent.TimeUnit import java.util.concurrent.ScheduledThreadPoolExecutor +import kotlin.concurrent.thread object CurrentTimeMillisClock { @@ -11,11 +12,12 @@ object CurrentTimeMillisClock { val now get() = tick init { - ScheduledThreadPoolExecutor(1) { runnable -> - val thread = Thread(runnable, "current-time-millis") - thread.isDaemon = true - thread - }.scheduleAtFixedRate({ tick = System.currentTimeMillis() }, 1, 1, TimeUnit.MILLISECONDS) + thread(name = "current-time-millis", isDaemon = true) { + while (true) { + tick = System.currentTimeMillis() + Thread.sleep(1) + } + } } //val now get() = System.currentTimeMillis() diff --git a/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt b/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt index 20e77af..26e1d09 100644 --- a/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt +++ b/src/main/kotlin/cn/tursom/core/datastruct/AtomicBitSet.kt @@ -3,6 +3,7 @@ package cn.tursom.core.datastruct import java.io.Serializable import java.lang.reflect.Field import java.util.concurrent.atomic.AtomicLongArray +import kotlin.random.Random class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : Serializable { @Volatile @@ -37,17 +38,19 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S return bitSet[(index shr 6).toInt()] and getArr[index.toInt() and 63] != 0L } - fun up(index: Long): Boolean { + fun up(index: Long, fromDownToUp: Boolean = true): Boolean { val arrayIndex = (index shr 6).toInt() //bitSet[arrayIndex] = bitSet[arrayIndex] or getArr[index.toInt() and 63] - val expect = bitSet[arrayIndex] + var expect = bitSet[arrayIndex] + if (fromDownToUp) expect = expect and setArr[index.toInt() and 63] return bitSet.compareAndSet(arrayIndex, expect, expect or getArr[index.toInt() and 63]) } - fun down(index: Long): Boolean { + fun down(index: Long, fromUpToDown: Boolean = true): Boolean { val arrayIndex = (index shr 6).toInt() //bitSet[arrayIndex] = bitSet[arrayIndex] and setArr[index.toInt() and 63] - val expect = bitSet[arrayIndex] + var expect = bitSet[arrayIndex] + if (fromUpToDown) expect = expect and getArr[index.toInt() and 63] return bitSet.compareAndSet(arrayIndex, expect, expect and setArr[index.toInt() and 63]) } @@ -96,7 +99,20 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S } fun firstDown(): Long { - bitSet.forEachIndexed { index, l -> + return scanDown(0, bitSet.length()) + } + + fun getDownIndex(): Long { + val startIndex = Random.nextInt(0, bitSet.length()) + var scan = scanDown(startIndex, bitSet.length() - startIndex) + if (scan >= 0) return scan + scan = scanDown(startIndex - 1, startIndex, false) + if (scan >= 0) return scan + return -1 + } + + private fun scanDown(fromIndex: Int, length: Int, asc: Boolean = true): Long { + bitSet.forEachIndexed(fromIndex, length, asc) { index, l -> if (l != -1L) { for (i in 0 until 8) { if (l.inv() and scanArray[i] != 0L) { @@ -164,9 +180,9 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S private val Long.bitCount get() = bitCountArray[toInt().and(0xff)] + bitCountArray[shr(8).toInt().and(0xff)] + - bitCountArray[shr(16).toInt().and(0xff)] + bitCountArray[shr(24).toInt().and(0xff)] + - bitCountArray[shr(32).toInt().and(0xff)] + bitCountArray[shr(40).toInt().and(0xff)] + - bitCountArray[shr(48).toInt().and(0xff)] + bitCountArray[shr(56).toInt().and(0xff)] + bitCountArray[shr(16).toInt().and(0xff)] + bitCountArray[shr(24).toInt().and(0xff)] + + bitCountArray[shr(32).toInt().and(0xff)] + bitCountArray[shr(40).toInt().and(0xff)] + + bitCountArray[shr(48).toInt().and(0xff)] + bitCountArray[shr(56).toInt().and(0xff)] private val AtomicLongArray.size get() = length() @@ -182,5 +198,16 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S action(it, get(it)) } } + + inline fun AtomicLongArray.forEachIndexed(startIndex: Int, length: Int = length(), asc: Boolean = true, action: (index: Int, Long) -> Unit) { + repeat(length) { + val index = if (asc) { + startIndex + it + } else { + startIndex - it + } + action(index, get(index)) + } + } } } diff --git a/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt index d7182ed..43112b7 100644 --- a/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/AbstractMemoryPool.kt @@ -10,24 +10,26 @@ abstract class AbstractMemoryPool( val blockCount: Int, val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, private val memoryPool: ByteBuffer, + override var autoCollection: Boolean = false, ) : MemoryPool { private val bitMap = AtomicBitSet(blockCount.toLong()) val allocated: Int get() = bitMap.trueCount.toInt() + private fun getMemory(token: Int): ByteBuffer = synchronized(this) { - PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) + PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection) } /** * @return token */ private fun allocate(): Int { - var index = bitMap.firstDown() + var index = bitMap.getDownIndex() while (index in 0 until blockCount) { if (bitMap.up(index)) { return index.toInt() } - index = if (bitMap[index]) bitMap.firstDown() else index + index = if (bitMap[index]) bitMap.getDownIndex() else index } return -1 } @@ -40,7 +42,7 @@ abstract class AbstractMemoryPool( override fun free(token: Int) { @Suppress("ControlFlowWithEmptyBody") - if (token in 0 until blockCount) while (!bitMap.down(token.toLong())); + if (token in 0 until blockCount) while (!bitMap.down(token.toLong(), false)); } override fun getMemoryOrNull(): ByteBuffer? { diff --git a/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt index 8690341..0e4e041 100644 --- a/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/LongBitSetAbstractMemoryPool.kt @@ -12,6 +12,7 @@ abstract class LongBitSetAbstractMemoryPool( val blockSize: Int, val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, private val memoryPool: ByteBuffer, + override var autoCollection: Boolean = false, ) : MemoryPool { private val bitMap = LongBitSet() val allocated: Int get() = bitMap.trueCount.toInt() @@ -22,7 +23,7 @@ abstract class LongBitSetAbstractMemoryPool( } private fun getMemory(token: Int): ByteBuffer = synchronized(this) { - return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) + return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection) } /** diff --git a/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt index bdf4787..1cef893 100644 --- a/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/MarkedMemoryPool.kt @@ -3,6 +3,7 @@ package cn.tursom.core.pool import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.PooledByteBuffer import java.io.Closeable +import java.lang.ref.SoftReference /** * 可以记录与释放分配内存的内存池 @@ -11,21 +12,23 @@ import java.io.Closeable * 非线程安全 */ class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Closeable { - private val allocatedList = ArrayList(2) + private val allocatedList = ArrayList>(2) override fun getMemory(): ByteBuffer { val memory = pool.getMemory() - allocatedList.add(memory) + allocatedList.add(SoftReference(memory)) return memory } override fun getMemoryOrNull(): ByteBuffer? { val memory = pool.getMemoryOrNull() - if (memory != null) allocatedList.add(memory) + if (memory != null) allocatedList.add(SoftReference(memory)) return memory } override fun close() { - allocatedList.forEach(ByteBuffer::close) + allocatedList.forEach { + it.get()?.close() + } allocatedList.clear() } @@ -36,12 +39,13 @@ class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Close override fun toString(): String { val allocated = ArrayList(allocatedList.size) allocatedList.forEach { - if (it is PooledByteBuffer && !it.closed) allocated.add(it.token) + val buffer = it.get() + if (buffer is PooledByteBuffer && !buffer.closed) allocated.add(buffer.token) } return "MarkedMemoryPool(pool=$pool, allocated=$allocated)" } - protected fun finalize() { - close() - } + //protected fun finalize() { + // close() + //} } \ No newline at end of file diff --git a/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt index 0de3240..af1d0bd 100644 --- a/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/MemoryPool.kt @@ -7,6 +7,9 @@ import cn.tursom.core.buffer.ByteBuffer */ interface MemoryPool { val staticSize: Boolean get() = true + var autoCollection: Boolean + get() = false + set(value) {} // fun allocate(): Int fun free(memory: ByteBuffer) diff --git a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt index 36bef7e..1fd68f5 100644 --- a/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt +++ b/src/main/kotlin/cn/tursom/core/pool/ThreadUnsafeAbstractMemoryPool.kt @@ -10,6 +10,7 @@ abstract class ThreadUnsafeAbstractMemoryPool( val blockCount: Int, val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, private val memoryPool: ByteBuffer, + override var autoCollection: Boolean = false, ) : MemoryPool { private val bitMap = ArrayBitSet(blockCount.toLong()) val allocated: Int get() = bitMap.trueCount.toInt() @@ -29,7 +30,7 @@ abstract class ThreadUnsafeAbstractMemoryPool( } private fun unsafeGetMemory(token: Int): ByteBuffer { - return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) + return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token,autoCollection) } /** diff --git a/src/main/kotlin/cn/tursom/core/timer/Timer.kt b/src/main/kotlin/cn/tursom/core/timer/Timer.kt index ece3f7c..9f2330f 100644 --- a/src/main/kotlin/cn/tursom/core/timer/Timer.kt +++ b/src/main/kotlin/cn/tursom/core/timer/Timer.kt @@ -1,6 +1,7 @@ package cn.tursom.core.timer import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicInteger interface Timer { fun exec(timeout: Long, task: () -> Unit): TimerTask @@ -12,7 +13,7 @@ interface Timer { fun runNow(taskList: TaskQueue) { threadPool.execute { while (true) { - val task = taskList.take() ?: return@execute + val task = taskList.take() ?: break try { task() } catch (e: Throwable) { @@ -29,11 +30,11 @@ interface Timer { 0L, TimeUnit.MILLISECONDS, LinkedTransferQueue(), object : ThreadFactory { - var threadNumber = 0 + var threadNumber = AtomicInteger(0) override fun newThread(r: Runnable): Thread { val thread = Thread(r) thread.isDaemon = true - thread.name = "timer-worker-$threadNumber" + thread.name = "timer-worker-${threadNumber.incrementAndGet()}" return thread } }) diff --git a/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt b/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt index 55e92be..db2c6a5 100644 --- a/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt +++ b/src/main/kotlin/cn/tursom/core/timer/WheelTimer.kt @@ -11,13 +11,14 @@ import kotlin.concurrent.thread @Suppress("CanBeParameter", "MemberVisibilityCanBePrivate") class WheelTimer( - val tick: Long = 200, - val wheelSize: Int = 512, - val name: String = "wheelTimerLooper", - val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() } + val tick: Long = 200, + val wheelSize: Int = 512, + val name: String = "wheelTimerLooper", + val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() }, ) : Timer { var closed = false val taskQueueArray = AtomicReferenceArray(Array(wheelSize) { taskQueueFactory() }) + @Volatile private var position = 0 @@ -52,7 +53,7 @@ class WheelTimer( // runNow(outTimeQueue) //} thread(isDaemon = true, name = name) { - val startTime = CurrentTimeMillisClock.now + var startTime = CurrentTimeMillisClock.now while (!closed) { position %= wheelSize @@ -74,8 +75,10 @@ class WheelTimer( runNow(outTimeQueue) - val nextSleep = startTime + tick * position - CurrentTimeMillisClock.now + startTime += tick + val nextSleep = startTime - CurrentTimeMillisClock.now if (nextSleep > 0) sleep(tick) + //else System.err.println("timer has no delay") } } } @@ -84,10 +87,10 @@ class WheelTimer( val timer by lazy { WheelTimer(200, 1024) } val smoothTimer by lazy { WheelTimer(20, 128) } fun ScheduledThreadPoolExecutor.scheduleAtFixedRate( - var2: Long, - var4: Long, - var6: TimeUnit, - var1: () -> Unit + var2: Long, + var4: Long, + var6: TimeUnit, + var1: () -> Unit, ): ScheduledFuture<*> = scheduleAtFixedRate(var1, var2, var4, var6) } } \ No newline at end of file