From 84029c86131b61fe3f7dcf42df62bd14859b5178 Mon Sep 17 00:00:00 2001 From: tursom Date: Tue, 13 Jul 2021 10:08:58 +0800 Subject: [PATCH] add FreeReference --- .../kotlin/cn/tursom/core/FreeReference.kt | 41 ++++++++++++++++++ .../kotlin/cn/tursom/core/ShutdownHook.kt | 38 +++++++++------- .../core/buffer/impl/NettyByteBuffer.kt | 18 +++++++- ts-core/ts-pool/build.gradle.kts | 5 ++- .../core/buffer/impl/PooledByteBuffer.kt | 43 ++++++------------- .../cn/tursom/core/ws/WebSocketClient.kt | 7 +-- 6 files changed, 98 insertions(+), 54 deletions(-) create mode 100644 ts-core/src/main/kotlin/cn/tursom/core/FreeReference.kt diff --git a/ts-core/src/main/kotlin/cn/tursom/core/FreeReference.kt b/ts-core/src/main/kotlin/cn/tursom/core/FreeReference.kt new file mode 100644 index 0000000..e92dd4d --- /dev/null +++ b/ts-core/src/main/kotlin/cn/tursom/core/FreeReference.kt @@ -0,0 +1,41 @@ +package cn.tursom.core + +import org.slf4j.LoggerFactory +import java.lang.ref.PhantomReference +import java.lang.ref.ReferenceQueue +import kotlin.concurrent.thread + + +abstract class FreeReference(referent: T) : PhantomReference(referent, referenceQueue) { + companion object { + private val logger = LoggerFactory.getLogger(FreeReference::class.java) + private val referenceQueue = ReferenceQueue() + private val freeThread = thread(isDaemon = true) { + while (true) { + val freeReference = referenceQueue.remove(1000) ?: continue + try { + if (freeReference is FreeReference<*> && !freeReference.cancel) { + freeReference.free() + } + } catch (e: Throwable) { + logger.error("an exception caused on free reference", e) + } + } + } + } + + private var cancel: Boolean = false + + override fun enqueue(): Boolean { + return if (cancel) { + false + } else { + super.enqueue() + } + } + + abstract fun free() + fun cancel() { + cancel = true + } +} diff --git a/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt b/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt index 6b2df5b..1499426 100644 --- a/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt +++ b/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt @@ -1,7 +1,6 @@ package cn.tursom.core import com.sun.org.slf4j.internal.LoggerFactory -import java.lang.ref.SoftReference import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicInteger @@ -13,20 +12,30 @@ import java.util.concurrent.atomic.AtomicInteger object ShutdownHook { private val logger = LoggerFactory.getLogger(ShutdownHook::class.java) - interface Reference { - fun get(): T + internal interface HookReference { + fun get(): (() -> Unit)? } - class Hook( + class Hook internal constructor( private val hook: () -> Unit, - private val reference: Reference<(() -> Unit)?> + private val reference: HookReference, ) { fun cancel() { shutdownHooks.remove(reference) } } - private val shutdownHooks = ConcurrentLinkedDeque Unit)?>>() + private class SoftReference( + hook: () -> Unit, + ) : FreeReference<() -> Unit>(hook) { + override fun free() { + shutdownHooks.removeIf { + it.get() == null + } + } + } + + private val shutdownHooks = ConcurrentLinkedDeque() private val availableThreadCount = Runtime.getRuntime().availableProcessors() * 2 private val activeThreadCount = AtomicInteger() @@ -35,16 +44,13 @@ object ShutdownHook { addWorkThread() } - val reference = if (softReference) { - object : Reference<(() -> Unit)?> { - private val ref = SoftReference(hook) - override fun get(): (() -> Unit)? = ref.get() - } - } else { - object : Reference<() -> Unit> { - override fun get(): () -> Unit = hook - } + val reference = if (softReference) object : HookReference { + private val ref = SoftReference(hook) + override fun get(): (() -> Unit)? = ref.get() + } else object : HookReference { + override fun get(): () -> Unit = hook } + shutdownHooks.add(reference) return Hook(hook, reference) } @@ -56,7 +62,7 @@ object ShutdownHook { try { hook.get()?.invoke() } catch (e: Throwable) { - //error("an exception caused on hook", e) + logger.error("an exception caused on hook", e) } hook = shutdownHooks.poll() } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt index 6ad15eb..54d4273 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt @@ -1,6 +1,7 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.AsyncFile +import cn.tursom.core.FreeReference import cn.tursom.core.buffer.ByteBuffer import io.netty.buffer.ByteBuf import java.io.OutputStream @@ -9,17 +10,27 @@ import java.util.concurrent.atomic.AtomicBoolean import kotlin.coroutines.suspendCoroutine class NettyByteBuffer( - val byteBuf: ByteBuf + val byteBuf: ByteBuf, + autoClose: Boolean = false, ) : ByteBuffer { constructor( byteBuf: ByteBuf, readPosition: Int = byteBuf.readerIndex(), - writePosition: Int = byteBuf.writerIndex() + writePosition: Int = byteBuf.writerIndex(), ) : this(byteBuf) { this.writePosition = writePosition this.readPosition = readPosition } + class AutoFreeReference( + nettyByteBuffer: NettyByteBuffer, + private val byteBuf: ByteBuf, + ) : FreeReference(nettyByteBuffer) { + override fun free() { + byteBuf.release() + } + } + override val hasArray: Boolean get() = byteBuf.hasArray() override var writePosition: Int get() = byteBuf.writerIndex() @@ -77,6 +88,8 @@ class NettyByteBuffer( } } + private val reference = if (autoClose) AutoFreeReference(this, byteBuf) else null + override fun readBuffer(): java.nio.ByteBuffer { return byteBuf.internalNioBuffer(readPosition, readable).slice() } @@ -180,6 +193,7 @@ class NettyByteBuffer( override fun close() { if (atomicClosed.compareAndSet(false, true)) { byteBuf.release() + reference?.cancel() } } } \ No newline at end of file diff --git a/ts-core/ts-pool/build.gradle.kts b/ts-core/ts-pool/build.gradle.kts index 9d5081f..52c3a95 100644 --- a/ts-core/ts-pool/build.gradle.kts +++ b/ts-core/ts-pool/build.gradle.kts @@ -4,8 +4,9 @@ plugins { } dependencies { - implementation(project(":")) - implementation(project(":ts-core:ts-buffer")) + api(project(":")) + api(project(":ts-core")) + api(project(":ts-core:ts-buffer")) implementation(project(":ts-core:ts-datastruct")) } diff --git a/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt b/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt index a3a98ca..2581a4e 100644 --- a/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt +++ b/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt @@ -1,15 +1,11 @@ package cn.tursom.core.buffer.impl +import cn.tursom.core.FreeReference import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ClosedBufferException import cn.tursom.core.buffer.ProxyByteBuffer import cn.tursom.core.pool.MemoryPool -import java.lang.ref.PhantomReference -import java.lang.ref.Reference -import java.lang.ref.ReferenceQueue -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger -import kotlin.concurrent.thread /** * 在被垃圾回收时能保证释放占用的内存池内存 @@ -20,19 +16,25 @@ class PooledByteBuffer( val token: Int, autoClose: Boolean = false, ) : ProxyByteBuffer, CloseSafeByteBuffer(agent) { - private val reference = if (autoClose) PhantomReference(this, allocatedReferenceQueue) else null - - init { - if (reference != null) allocatedMap[reference] = pool to token + class AutoFreeReference( + pooledByteBuffer: PooledByteBuffer, + val pool: MemoryPool, + val token: Int, + ) : FreeReference(pooledByteBuffer) { + override fun free() { + pool.free(token) + } } + private val reference = if (autoClose) AutoFreeReference(this, pool, token) else null + private val childCount = AtomicInteger(0) override val resized get() = agent.resized override fun close() { if (tryClose()) { if (childCount.get() == 0) { - if (reference != null) allocatedMap.remove(reference) + reference?.cancel() pool.free(this) } } @@ -60,25 +62,4 @@ class PooledByteBuffer( override fun toString(): String { return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, closed=$closed)" } - - //protected fun finalize() { - // pool.free(this) - //} - - companion object { - private val allocatedReferenceQueue = ReferenceQueue() - private val allocatedMap = ConcurrentHashMap, Pair>() - - init { - thread(isDaemon = true) { - while (true) { - val (pool, token) = allocatedMap.remove(allocatedReferenceQueue.remove() ?: return@thread) ?: continue - try { - pool.free(token) - } catch (e: Exception) { - } - } - } - } - } } \ No newline at end of file diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt index a3a7e22..2c4c1eb 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt @@ -45,11 +45,12 @@ open class WebSocketClient, H : WebSocketHandler() - ShutdownHook.addHook { - close() - } } fun open(): ChannelFuture? {