fix thread safe

This commit is contained in:
tursom 2020-10-21 16:18:27 +08:00
parent 8fab6e1918
commit 4643862bb7
12 changed files with 140 additions and 63 deletions

View File

@ -8,8 +8,13 @@ import java.nio.channels.SelectionKey
open class NioDatagram( open class NioDatagram(
override val channel: DatagramChannel, override val channel: DatagramChannel,
override val key: SelectionKey, override val key: SelectionKey,
override val nioThread: NioThread override val nioThread: NioThread,
) : AsyncDatagram { ) : AsyncDatagram {
companion object {
suspend operator fun invoke(host: String, port: Int) = AsyncDatagramClient.connect(host, port)
suspend operator fun invoke(address: SocketAddress) = AsyncDatagramClient.connect(address)
}
override val open: Boolean get() = channel.isOpen && key.isValid override val open: Boolean get() = channel.isOpen && key.isValid
override val remoteAddress: SocketAddress get() = channel.remoteAddress override val remoteAddress: SocketAddress get() = channel.remoteAddress
override fun writeMode() {} override fun writeMode() {}

View File

@ -1,20 +1,33 @@
package cn.tursom.datagram.server package cn.tursom.datagram.server
import cn.tursom.channel.AsyncChannel
import cn.tursom.channel.BufferedAsyncChannel import cn.tursom.channel.BufferedAsyncChannel
import cn.tursom.core.log import cn.tursom.core.log
import cn.tursom.core.logE
import cn.tursom.core.pool.DirectMemoryPool import cn.tursom.core.pool.DirectMemoryPool
import cn.tursom.datagram.AsyncDatagramClient import cn.tursom.datagram.AsyncDatagramClient
import cn.tursom.socket.NioClient import kotlinx.coroutines.GlobalScope
import cn.tursom.socket.NioSocket import kotlinx.coroutines.launch
import cn.tursom.socket.server.BufferedNioServer
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
private val coroutineNumber = AtomicInteger(0)
private val sockets = ConcurrentHashMap<AsyncChannel, Unit>().keySet(Unit)
val echoHandler: suspend BufferedAsyncChannel.() -> Unit = { val echoHandler: suspend BufferedAsyncChannel.() -> Unit = {
while (true) { //System.err.println(coroutineNumber.incrementAndGet())
val buffer = read() sockets.add(this)
log("$this recv from client $remoteAddress: ${buffer.toString(buffer.readable)}") try {
//Throwable().printStackTrace() while (open) {
write(buffer) val buffer = read(60 * 1000)
//log("$this recv from client $remoteAddress: ${buffer.toString(buffer.readable)}")
sockets.forEach {
//System.err.println(it)
it.write(buffer.slice(buffer.readPosition, buffer.readable, writePosition = buffer.readable))
}
}
} finally {
sockets.remove(this)
} }
} }
@ -39,22 +52,27 @@ fun main() {
runBlocking { runBlocking {
val input = System.`in`.bufferedReader() val input = System.`in`.bufferedReader()
var client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool) var client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
GlobalScope.launch {
while (true) {
val read = try {
client.read(3000)
} catch (e: Exception) {
logE("socket closed")
client.close()
client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
//client.write(line)
client.read(3000)
}
log("recv from server: ${read.getString()}")
read.close()
}
}
while (true) { while (true) {
try { try {
print(">>>") print(">>>")
val line = input.readLine() val line = input.readLine()
if (line.isEmpty()) continue if (line.isEmpty()) continue
client.write(line) client.write(line)
val read = try {
client.read(3000)
} catch (e: Exception) {
client.close()
client = AsyncDatagramClient.connect("127.0.0.1", port).getBuffed(pool)
client.write(line)
client.read(3000)
}
log("recv from server: ${read.getString()}")
read.close()
} catch (e: Exception) { } catch (e: Exception) {
Exception(e).printStackTrace() Exception(e).printStackTrace()
client.close() client.close()

View File

@ -3,12 +3,22 @@ package cn.tursom.socket
import cn.tursom.core.pool.HeapMemoryPool import cn.tursom.core.pool.HeapMemoryPool
import cn.tursom.socket.server.BufferedNioServer import cn.tursom.socket.server.BufferedNioServer
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
private val sockets = ConcurrentHashMap<AsyncSocket, Unit>().keySet(Unit)
val handler: suspend BufferedAsyncSocket.() -> Unit = { val handler: suspend BufferedAsyncSocket.() -> Unit = {
while (open) { sockets.add(this)
val read = read() try {
println(read.toString(read.readable)) while (open) {
write(read) val read = read(60 * 1000)
println(read.toString(read.readable))
sockets.forEach {
System.err.println(it)
it.write(read)
}
}
} finally {
sockets.remove(this)
} }
} }

View File

@ -2,6 +2,7 @@ package cn.tursom.core
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.ScheduledThreadPoolExecutor
import kotlin.concurrent.thread
object CurrentTimeMillisClock { object CurrentTimeMillisClock {
@ -11,11 +12,12 @@ object CurrentTimeMillisClock {
val now get() = tick val now get() = tick
init { init {
ScheduledThreadPoolExecutor(1) { runnable -> thread(name = "current-time-millis", isDaemon = true) {
val thread = Thread(runnable, "current-time-millis") while (true) {
thread.isDaemon = true tick = System.currentTimeMillis()
thread Thread.sleep(1)
}.scheduleAtFixedRate({ tick = System.currentTimeMillis() }, 1, 1, TimeUnit.MILLISECONDS) }
}
} }
//val now get() = System.currentTimeMillis() //val now get() = System.currentTimeMillis()

View File

@ -3,6 +3,7 @@ package cn.tursom.core.datastruct
import java.io.Serializable import java.io.Serializable
import java.lang.reflect.Field import java.lang.reflect.Field
import java.util.concurrent.atomic.AtomicLongArray import java.util.concurrent.atomic.AtomicLongArray
import kotlin.random.Random
class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : Serializable { class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : Serializable {
@Volatile @Volatile
@ -37,17 +38,19 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
return bitSet[(index shr 6).toInt()] and getArr[index.toInt() and 63] != 0L return bitSet[(index shr 6).toInt()] and getArr[index.toInt() and 63] != 0L
} }
fun up(index: Long): Boolean { fun up(index: Long, fromDownToUp: Boolean = true): Boolean {
val arrayIndex = (index shr 6).toInt() val arrayIndex = (index shr 6).toInt()
//bitSet[arrayIndex] = bitSet[arrayIndex] or getArr[index.toInt() and 63] //bitSet[arrayIndex] = bitSet[arrayIndex] or getArr[index.toInt() and 63]
val expect = bitSet[arrayIndex] var expect = bitSet[arrayIndex]
if (fromDownToUp) expect = expect and setArr[index.toInt() and 63]
return bitSet.compareAndSet(arrayIndex, expect, expect or getArr[index.toInt() and 63]) return bitSet.compareAndSet(arrayIndex, expect, expect or getArr[index.toInt() and 63])
} }
fun down(index: Long): Boolean { fun down(index: Long, fromUpToDown: Boolean = true): Boolean {
val arrayIndex = (index shr 6).toInt() val arrayIndex = (index shr 6).toInt()
//bitSet[arrayIndex] = bitSet[arrayIndex] and setArr[index.toInt() and 63] //bitSet[arrayIndex] = bitSet[arrayIndex] and setArr[index.toInt() and 63]
val expect = bitSet[arrayIndex] var expect = bitSet[arrayIndex]
if (fromUpToDown) expect = expect and getArr[index.toInt() and 63]
return bitSet.compareAndSet(arrayIndex, expect, expect and setArr[index.toInt() and 63]) return bitSet.compareAndSet(arrayIndex, expect, expect and setArr[index.toInt() and 63])
} }
@ -96,7 +99,20 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
} }
fun firstDown(): Long { fun firstDown(): Long {
bitSet.forEachIndexed { index, l -> return scanDown(0, bitSet.length())
}
fun getDownIndex(): Long {
val startIndex = Random.nextInt(0, bitSet.length())
var scan = scanDown(startIndex, bitSet.length() - startIndex)
if (scan >= 0) return scan
scan = scanDown(startIndex - 1, startIndex, false)
if (scan >= 0) return scan
return -1
}
private fun scanDown(fromIndex: Int, length: Int, asc: Boolean = true): Long {
bitSet.forEachIndexed(fromIndex, length, asc) { index, l ->
if (l != -1L) { if (l != -1L) {
for (i in 0 until 8) { for (i in 0 until 8) {
if (l.inv() and scanArray[i] != 0L) { if (l.inv() and scanArray[i] != 0L) {
@ -164,9 +180,9 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
private val Long.bitCount private val Long.bitCount
get() = bitCountArray[toInt().and(0xff)] + bitCountArray[shr(8).toInt().and(0xff)] + get() = bitCountArray[toInt().and(0xff)] + bitCountArray[shr(8).toInt().and(0xff)] +
bitCountArray[shr(16).toInt().and(0xff)] + bitCountArray[shr(24).toInt().and(0xff)] + bitCountArray[shr(16).toInt().and(0xff)] + bitCountArray[shr(24).toInt().and(0xff)] +
bitCountArray[shr(32).toInt().and(0xff)] + bitCountArray[shr(40).toInt().and(0xff)] + bitCountArray[shr(32).toInt().and(0xff)] + bitCountArray[shr(40).toInt().and(0xff)] +
bitCountArray[shr(48).toInt().and(0xff)] + bitCountArray[shr(56).toInt().and(0xff)] bitCountArray[shr(48).toInt().and(0xff)] + bitCountArray[shr(56).toInt().and(0xff)]
private val AtomicLongArray.size get() = length() private val AtomicLongArray.size get() = length()
@ -182,5 +198,16 @@ class AtomicBitSet(beginSize: Long = 256, val defaultState: Boolean = false) : S
action(it, get(it)) action(it, get(it))
} }
} }
inline fun AtomicLongArray.forEachIndexed(startIndex: Int, length: Int = length(), asc: Boolean = true, action: (index: Int, Long) -> Unit) {
repeat(length) {
val index = if (asc) {
startIndex + it
} else {
startIndex - it
}
action(index, get(index))
}
}
} }
} }

View File

@ -10,24 +10,26 @@ abstract class AbstractMemoryPool(
val blockCount: Int, val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer, private val memoryPool: ByteBuffer,
override var autoCollection: Boolean = false,
) : MemoryPool { ) : MemoryPool {
private val bitMap = AtomicBitSet(blockCount.toLong()) private val bitMap = AtomicBitSet(blockCount.toLong())
val allocated: Int get() = bitMap.trueCount.toInt() val allocated: Int get() = bitMap.trueCount.toInt()
private fun getMemory(token: Int): ByteBuffer = synchronized(this) { private fun getMemory(token: Int): ByteBuffer = synchronized(this) {
PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection)
} }
/** /**
* @return token * @return token
*/ */
private fun allocate(): Int { private fun allocate(): Int {
var index = bitMap.firstDown() var index = bitMap.getDownIndex()
while (index in 0 until blockCount) { while (index in 0 until blockCount) {
if (bitMap.up(index)) { if (bitMap.up(index)) {
return index.toInt() return index.toInt()
} }
index = if (bitMap[index]) bitMap.firstDown() else index index = if (bitMap[index]) bitMap.getDownIndex() else index
} }
return -1 return -1
} }
@ -40,7 +42,7 @@ abstract class AbstractMemoryPool(
override fun free(token: Int) { override fun free(token: Int) {
@Suppress("ControlFlowWithEmptyBody") @Suppress("ControlFlowWithEmptyBody")
if (token in 0 until blockCount) while (!bitMap.down(token.toLong())); if (token in 0 until blockCount) while (!bitMap.down(token.toLong(), false));
} }
override fun getMemoryOrNull(): ByteBuffer? { override fun getMemoryOrNull(): ByteBuffer? {

View File

@ -12,6 +12,7 @@ abstract class LongBitSetAbstractMemoryPool(
val blockSize: Int, val blockSize: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer, private val memoryPool: ByteBuffer,
override var autoCollection: Boolean = false,
) : MemoryPool { ) : MemoryPool {
private val bitMap = LongBitSet() private val bitMap = LongBitSet()
val allocated: Int get() = bitMap.trueCount.toInt() val allocated: Int get() = bitMap.trueCount.toInt()
@ -22,7 +23,7 @@ abstract class LongBitSetAbstractMemoryPool(
} }
private fun getMemory(token: Int): ByteBuffer = synchronized(this) { private fun getMemory(token: Int): ByteBuffer = synchronized(this) {
return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token, autoCollection)
} }
/** /**

View File

@ -3,6 +3,7 @@ package cn.tursom.core.pool
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.PooledByteBuffer import cn.tursom.core.buffer.impl.PooledByteBuffer
import java.io.Closeable import java.io.Closeable
import java.lang.ref.SoftReference
/** /**
* 可以记录与释放分配内存的内存池 * 可以记录与释放分配内存的内存池
@ -11,21 +12,23 @@ import java.io.Closeable
* 非线程安全 * 非线程安全
*/ */
class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Closeable { class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Closeable {
private val allocatedList = ArrayList<ByteBuffer>(2) private val allocatedList = ArrayList<SoftReference<ByteBuffer>>(2)
override fun getMemory(): ByteBuffer { override fun getMemory(): ByteBuffer {
val memory = pool.getMemory() val memory = pool.getMemory()
allocatedList.add(memory) allocatedList.add(SoftReference(memory))
return memory return memory
} }
override fun getMemoryOrNull(): ByteBuffer? { override fun getMemoryOrNull(): ByteBuffer? {
val memory = pool.getMemoryOrNull() val memory = pool.getMemoryOrNull()
if (memory != null) allocatedList.add(memory) if (memory != null) allocatedList.add(SoftReference(memory))
return memory return memory
} }
override fun close() { override fun close() {
allocatedList.forEach(ByteBuffer::close) allocatedList.forEach {
it.get()?.close()
}
allocatedList.clear() allocatedList.clear()
} }
@ -36,12 +39,13 @@ class MarkedMemoryPool(private val pool: MemoryPool) : MemoryPool by pool, Close
override fun toString(): String { override fun toString(): String {
val allocated = ArrayList<Int>(allocatedList.size) val allocated = ArrayList<Int>(allocatedList.size)
allocatedList.forEach { allocatedList.forEach {
if (it is PooledByteBuffer && !it.closed) allocated.add(it.token) val buffer = it.get()
if (buffer is PooledByteBuffer && !buffer.closed) allocated.add(buffer.token)
} }
return "MarkedMemoryPool(pool=$pool, allocated=$allocated)" return "MarkedMemoryPool(pool=$pool, allocated=$allocated)"
} }
protected fun finalize() { //protected fun finalize() {
close() // close()
} //}
} }

View File

@ -7,6 +7,9 @@ import cn.tursom.core.buffer.ByteBuffer
*/ */
interface MemoryPool { interface MemoryPool {
val staticSize: Boolean get() = true val staticSize: Boolean get() = true
var autoCollection: Boolean
get() = false
set(value) {}
// fun allocate(): Int // fun allocate(): Int
fun free(memory: ByteBuffer) fun free(memory: ByteBuffer)

View File

@ -10,6 +10,7 @@ abstract class ThreadUnsafeAbstractMemoryPool(
val blockCount: Int, val blockCount: Int,
val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer, val emptyPoolBuffer: (blockSize: Int) -> ByteBuffer = ::HeapByteBuffer,
private val memoryPool: ByteBuffer, private val memoryPool: ByteBuffer,
override var autoCollection: Boolean = false,
) : MemoryPool { ) : MemoryPool {
private val bitMap = ArrayBitSet(blockCount.toLong()) private val bitMap = ArrayBitSet(blockCount.toLong())
val allocated: Int get() = bitMap.trueCount.toInt() val allocated: Int get() = bitMap.trueCount.toInt()
@ -29,7 +30,7 @@ abstract class ThreadUnsafeAbstractMemoryPool(
} }
private fun unsafeGetMemory(token: Int): ByteBuffer { private fun unsafeGetMemory(token: Int): ByteBuffer {
return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token) return PooledByteBuffer(memoryPool.slice(token * blockSize, blockSize), this, token,autoCollection)
} }
/** /**

View File

@ -1,6 +1,7 @@
package cn.tursom.core.timer package cn.tursom.core.timer
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
interface Timer { interface Timer {
fun exec(timeout: Long, task: () -> Unit): TimerTask fun exec(timeout: Long, task: () -> Unit): TimerTask
@ -12,7 +13,7 @@ interface Timer {
fun runNow(taskList: TaskQueue) { fun runNow(taskList: TaskQueue) {
threadPool.execute { threadPool.execute {
while (true) { while (true) {
val task = taskList.take() ?: return@execute val task = taskList.take() ?: break
try { try {
task() task()
} catch (e: Throwable) { } catch (e: Throwable) {
@ -29,11 +30,11 @@ interface Timer {
0L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS,
LinkedTransferQueue(), LinkedTransferQueue(),
object : ThreadFactory { object : ThreadFactory {
var threadNumber = 0 var threadNumber = AtomicInteger(0)
override fun newThread(r: Runnable): Thread { override fun newThread(r: Runnable): Thread {
val thread = Thread(r) val thread = Thread(r)
thread.isDaemon = true thread.isDaemon = true
thread.name = "timer-worker-$threadNumber" thread.name = "timer-worker-${threadNumber.incrementAndGet()}"
return thread return thread
} }
}) })

View File

@ -11,13 +11,14 @@ import kotlin.concurrent.thread
@Suppress("CanBeParameter", "MemberVisibilityCanBePrivate") @Suppress("CanBeParameter", "MemberVisibilityCanBePrivate")
class WheelTimer( class WheelTimer(
val tick: Long = 200, val tick: Long = 200,
val wheelSize: Int = 512, val wheelSize: Int = 512,
val name: String = "wheelTimerLooper", val name: String = "wheelTimerLooper",
val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() } val taskQueueFactory: () -> TaskQueue = { NonLockTaskQueue() },
) : Timer { ) : Timer {
var closed = false var closed = false
val taskQueueArray = AtomicReferenceArray(Array(wheelSize) { taskQueueFactory() }) val taskQueueArray = AtomicReferenceArray(Array(wheelSize) { taskQueueFactory() })
@Volatile @Volatile
private var position = 0 private var position = 0
@ -52,7 +53,7 @@ class WheelTimer(
// runNow(outTimeQueue) // runNow(outTimeQueue)
//} //}
thread(isDaemon = true, name = name) { thread(isDaemon = true, name = name) {
val startTime = CurrentTimeMillisClock.now var startTime = CurrentTimeMillisClock.now
while (!closed) { while (!closed) {
position %= wheelSize position %= wheelSize
@ -74,8 +75,10 @@ class WheelTimer(
runNow(outTimeQueue) runNow(outTimeQueue)
val nextSleep = startTime + tick * position - CurrentTimeMillisClock.now startTime += tick
val nextSleep = startTime - CurrentTimeMillisClock.now
if (nextSleep > 0) sleep(tick) if (nextSleep > 0) sleep(tick)
//else System.err.println("timer has no delay")
} }
} }
} }
@ -84,10 +87,10 @@ class WheelTimer(
val timer by lazy { WheelTimer(200, 1024) } val timer by lazy { WheelTimer(200, 1024) }
val smoothTimer by lazy { WheelTimer(20, 128) } val smoothTimer by lazy { WheelTimer(20, 128) }
fun ScheduledThreadPoolExecutor.scheduleAtFixedRate( fun ScheduledThreadPoolExecutor.scheduleAtFixedRate(
var2: Long, var2: Long,
var4: Long, var4: Long,
var6: TimeUnit, var6: TimeUnit,
var1: () -> Unit var1: () -> Unit,
): ScheduledFuture<*> = scheduleAtFixedRate(var1, var2, var4, var6) ): ScheduledFuture<*> = scheduleAtFixedRate(var1, var2, var4, var6)
} }
} }