add FreeReference

This commit is contained in:
tursom 2021-07-13 10:08:58 +08:00
parent 21ad18eecb
commit 84029c8613
6 changed files with 98 additions and 54 deletions

View File

@ -0,0 +1,41 @@
package cn.tursom.core
import org.slf4j.LoggerFactory
import java.lang.ref.PhantomReference
import java.lang.ref.ReferenceQueue
import kotlin.concurrent.thread
abstract class FreeReference<T>(referent: T) : PhantomReference<T>(referent, referenceQueue) {
companion object {
private val logger = LoggerFactory.getLogger(FreeReference::class.java)
private val referenceQueue = ReferenceQueue<Any?>()
private val freeThread = thread(isDaemon = true) {
while (true) {
val freeReference = referenceQueue.remove(1000) ?: continue
try {
if (freeReference is FreeReference<*> && !freeReference.cancel) {
freeReference.free()
}
} catch (e: Throwable) {
logger.error("an exception caused on free reference", e)
}
}
}
}
private var cancel: Boolean = false
override fun enqueue(): Boolean {
return if (cancel) {
false
} else {
super.enqueue()
}
}
abstract fun free()
fun cancel() {
cancel = true
}
}

View File

@ -1,7 +1,6 @@
package cn.tursom.core
import com.sun.org.slf4j.internal.LoggerFactory
import java.lang.ref.SoftReference
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicInteger
@ -13,20 +12,30 @@ import java.util.concurrent.atomic.AtomicInteger
object ShutdownHook {
private val logger = LoggerFactory.getLogger(ShutdownHook::class.java)
interface Reference<out T> {
fun get(): T
internal interface HookReference {
fun get(): (() -> Unit)?
}
class Hook(
class Hook internal constructor(
private val hook: () -> Unit,
private val reference: Reference<(() -> Unit)?>
private val reference: HookReference,
) {
fun cancel() {
shutdownHooks.remove(reference)
}
}
private val shutdownHooks = ConcurrentLinkedDeque<Reference<(() -> Unit)?>>()
private class SoftReference(
hook: () -> Unit,
) : FreeReference<() -> Unit>(hook) {
override fun free() {
shutdownHooks.removeIf {
it.get() == null
}
}
}
private val shutdownHooks = ConcurrentLinkedDeque<HookReference>()
private val availableThreadCount = Runtime.getRuntime().availableProcessors() * 2
private val activeThreadCount = AtomicInteger()
@ -35,16 +44,13 @@ object ShutdownHook {
addWorkThread()
}
val reference = if (softReference) {
object : Reference<(() -> Unit)?> {
private val ref = SoftReference(hook)
override fun get(): (() -> Unit)? = ref.get()
}
} else {
object : Reference<() -> Unit> {
override fun get(): () -> Unit = hook
}
val reference = if (softReference) object : HookReference {
private val ref = SoftReference(hook)
override fun get(): (() -> Unit)? = ref.get()
} else object : HookReference {
override fun get(): () -> Unit = hook
}
shutdownHooks.add(reference)
return Hook(hook, reference)
}
@ -56,7 +62,7 @@ object ShutdownHook {
try {
hook.get()?.invoke()
} catch (e: Throwable) {
//error("an exception caused on hook", e)
logger.error("an exception caused on hook", e)
}
hook = shutdownHooks.poll()
}

View File

@ -1,6 +1,7 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.AsyncFile
import cn.tursom.core.FreeReference
import cn.tursom.core.buffer.ByteBuffer
import io.netty.buffer.ByteBuf
import java.io.OutputStream
@ -9,17 +10,27 @@ import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.suspendCoroutine
class NettyByteBuffer(
val byteBuf: ByteBuf
val byteBuf: ByteBuf,
autoClose: Boolean = false,
) : ByteBuffer {
constructor(
byteBuf: ByteBuf,
readPosition: Int = byteBuf.readerIndex(),
writePosition: Int = byteBuf.writerIndex()
writePosition: Int = byteBuf.writerIndex(),
) : this(byteBuf) {
this.writePosition = writePosition
this.readPosition = readPosition
}
class AutoFreeReference(
nettyByteBuffer: NettyByteBuffer,
private val byteBuf: ByteBuf,
) : FreeReference<NettyByteBuffer>(nettyByteBuffer) {
override fun free() {
byteBuf.release()
}
}
override val hasArray: Boolean get() = byteBuf.hasArray()
override var writePosition: Int
get() = byteBuf.writerIndex()
@ -77,6 +88,8 @@ class NettyByteBuffer(
}
}
private val reference = if (autoClose) AutoFreeReference(this, byteBuf) else null
override fun readBuffer(): java.nio.ByteBuffer {
return byteBuf.internalNioBuffer(readPosition, readable).slice()
}
@ -180,6 +193,7 @@ class NettyByteBuffer(
override fun close() {
if (atomicClosed.compareAndSet(false, true)) {
byteBuf.release()
reference?.cancel()
}
}
}

View File

@ -4,8 +4,9 @@ plugins {
}
dependencies {
implementation(project(":"))
implementation(project(":ts-core:ts-buffer"))
api(project(":"))
api(project(":ts-core"))
api(project(":ts-core:ts-buffer"))
implementation(project(":ts-core:ts-datastruct"))
}

View File

@ -1,15 +1,11 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.FreeReference
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.pool.MemoryPool
import java.lang.ref.PhantomReference
import java.lang.ref.Reference
import java.lang.ref.ReferenceQueue
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/**
* 在被垃圾回收时能保证释放占用的内存池内存
@ -20,19 +16,25 @@ class PooledByteBuffer(
val token: Int,
autoClose: Boolean = false,
) : ProxyByteBuffer, CloseSafeByteBuffer(agent) {
private val reference = if (autoClose) PhantomReference(this, allocatedReferenceQueue) else null
init {
if (reference != null) allocatedMap[reference] = pool to token
class AutoFreeReference(
pooledByteBuffer: PooledByteBuffer,
val pool: MemoryPool,
val token: Int,
) : FreeReference<PooledByteBuffer>(pooledByteBuffer) {
override fun free() {
pool.free(token)
}
}
private val reference = if (autoClose) AutoFreeReference(this, pool, token) else null
private val childCount = AtomicInteger(0)
override val resized get() = agent.resized
override fun close() {
if (tryClose()) {
if (childCount.get() == 0) {
if (reference != null) allocatedMap.remove(reference)
reference?.cancel()
pool.free(this)
}
}
@ -60,25 +62,4 @@ class PooledByteBuffer(
override fun toString(): String {
return "PooledByteBuffer(buffer=$agent, pool=$pool, token=$token, closed=$closed)"
}
//protected fun finalize() {
// pool.free(this)
//}
companion object {
private val allocatedReferenceQueue = ReferenceQueue<PooledByteBuffer>()
private val allocatedMap = ConcurrentHashMap<Reference<PooledByteBuffer>, Pair<MemoryPool, Int>>()
init {
thread(isDaemon = true) {
while (true) {
val (pool, token) = allocatedMap.remove(allocatedReferenceQueue.remove() ?: return@thread) ?: continue
try {
pool.free(token)
} catch (e: Exception) {
}
}
}
}
}
}

View File

@ -45,11 +45,12 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T,
var closed: Boolean = false
private set
private val hook = ShutdownHook.addHook(true) {
close()
}
init {
uncheckedCast<T>()
ShutdownHook.addHook {
close()
}
}
fun open(): ChannelFuture? {