update Reference & Stream

This commit is contained in:
tursom 2021-07-13 13:35:14 +08:00
parent bbed157b4a
commit 5a19df7feb
22 changed files with 327 additions and 159 deletions

View File

@ -1,41 +0,0 @@
package cn.tursom.core
import org.slf4j.LoggerFactory
import java.lang.ref.PhantomReference
import java.lang.ref.ReferenceQueue
import kotlin.concurrent.thread
abstract class FreeReference<T>(referent: T) : PhantomReference<T>(referent, referenceQueue) {
companion object {
private val logger = LoggerFactory.getLogger(FreeReference::class.java)
private val referenceQueue = ReferenceQueue<Any?>()
private val freeThread = thread(isDaemon = true) {
while (true) {
val freeReference = referenceQueue.remove(1000) ?: continue
try {
if (freeReference is FreeReference<*> && !freeReference.cancel) {
freeReference.free()
}
} catch (e: Throwable) {
logger.error("an exception caused on free reference", e)
}
}
}
}
private var cancel: Boolean = false
override fun enqueue(): Boolean {
return if (cancel) {
false
} else {
super.enqueue()
}
}
abstract fun free()
fun cancel() {
cancel = true
}
}

View File

@ -1,5 +1,6 @@
package cn.tursom.core
import cn.tursom.core.reference.FreeSoftReference
import com.sun.org.slf4j.internal.LoggerFactory
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicInteger
@ -40,8 +41,8 @@ object ShutdownHook {
private class SoftReference(
hook: () -> Unit,
) : FreeReference<() -> Unit>(hook) {
override fun free() {
) : FreeSoftReference<() -> Unit>(hook) {
override fun release() {
shutdownHooks.removeIf {
it.get() == null
}

View File

@ -39,6 +39,8 @@ object Utils {
const val dollar = '$'
val random = Random(System.currentTimeMillis())
val bufferThreadLocal = SimpThreadLocal { ByteArray(1024) }
@Suppress("unused", "SpellCheckingInspection")
val gson: Gson by lazy {
GsonBuilder()
@ -567,7 +569,7 @@ fun ByteArray.undeflate(): ByteArray {
val inf = Inflater()
inf.setInput(this)
val bos = ByteArrayOutputStream()
val outByte = ByteArray(1024)
val outByte = Utils.bufferThreadLocal.get()
bos.use {
while (!inf.finished()) {
val len = inf.inflate(outByte)
@ -586,7 +588,7 @@ fun ByteArray.deflate(): ByteArray {
def.setInput(this)
def.finish()
val bos = ByteArrayOutputStream()
val outputByte = ByteArray(1024)
val outputByte = Utils.bufferThreadLocal.get()
bos.use {
while (!def.finished()) {
val len = def.deflate(outputByte)

View File

@ -0,0 +1,24 @@
package cn.tursom.core.reference
import java.lang.ref.PhantomReference
abstract class FreeReference<T>(
referent: T,
) : PhantomReference<T>(referent, ReleasableReference.referenceQueue),
ReleasableReference {
private var cancel: Boolean = false
override fun enqueue(): Boolean {
return if (cancel) {
false
} else {
super.enqueue()
}
}
fun cancel() {
cancel = true
}
}

View File

@ -0,0 +1,24 @@
package cn.tursom.core.reference
import java.lang.ref.SoftReference
abstract class FreeSoftReference<T>(
referent: T,
) : SoftReference<T>(referent, ReleasableReference.referenceQueue),
ReleasableReference {
private var cancel: Boolean = false
override fun enqueue(): Boolean {
return if (cancel) {
false
} else {
super.enqueue()
}
}
fun cancel() {
cancel = true
}
}

View File

@ -0,0 +1,24 @@
package cn.tursom.core.reference
import java.lang.ref.WeakReference
abstract class FreeWeakReference<T>(
referent: T,
) : WeakReference<T>(referent, ReleasableReference.referenceQueue),
ReleasableReference {
private var cancel: Boolean = false
override fun enqueue(): Boolean {
return if (cancel) {
false
} else {
super.enqueue()
}
}
fun cancel() {
cancel = true
}
}

View File

@ -0,0 +1,26 @@
package cn.tursom.core.reference
import org.slf4j.LoggerFactory
import java.lang.ref.ReferenceQueue
import kotlin.concurrent.thread
interface ReleasableReference {
companion object {
private val logger = LoggerFactory.getLogger(ReleasableReference::class.java)
val referenceQueue = ReferenceQueue<Any?>()
private val freeThread = thread(isDaemon = true) {
while (true) {
val freeReference = referenceQueue.remove(1000) ?: continue
try {
if (freeReference is ReleasableReference) {
freeReference.release()
}
} catch (e: Throwable) {
logger.error("an exception caused on free reference", e)
}
}
}
}
fun release()
}

View File

@ -4,6 +4,7 @@ plugins {
}
dependencies {
implementation(project(":ts-core:ts-log"))
implementation(project(":ts-core"))
compileOnly(group = "io.netty", name = "netty-all", version = "4.1.43.Final")
}

View File

@ -1,6 +1,7 @@
package cn.tursom.core.buffer
import cn.tursom.core.AsyncFile
import cn.tursom.core.Utils.bufferThreadLocal
import cn.tursom.core.forEachIndex
import java.io.Closeable
import java.io.IOException
@ -59,8 +60,7 @@ interface ByteBuffer : Closeable {
val fileReader: AsyncFile.Reader? get() = null
val fileWriter: AsyncFile.Writer? get() = null
override fun close() {
}
override fun close() {}
fun readBuffer(): java.nio.ByteBuffer
fun finishRead(buffer: java.nio.ByteBuffer) {
@ -133,7 +133,8 @@ interface ByteBuffer : Closeable {
readPosition += size
reset()
} else {
val buffer = buffer ?: ByteArray(1024)
@Suppress("NAME_SHADOWING")
val buffer = buffer ?: bufferThreadLocal.get()
read {
while (it.remaining() > 0) {
it.put(buffer)
@ -219,7 +220,7 @@ interface ByteBuffer : Closeable {
writePosition += read
read
} else {
val buffer = ByteArray(10 * 1024)
val buffer = bufferThreadLocal.get()
val read = inputStream.read(buffer)
put(buffer, 0, read)
}

View File

@ -1,8 +1,8 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.AsyncFile
import cn.tursom.core.FreeReference
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.reference.FreeReference
import io.netty.buffer.ByteBuf
import java.io.OutputStream
import java.nio.ByteOrder
@ -22,12 +22,15 @@ class NettyByteBuffer(
this.readPosition = readPosition
}
class AutoFreeReference(
internal class AutoFreeReference(
nettyByteBuffer: NettyByteBuffer,
private val atomicClosed: AtomicBoolean,
private val byteBuf: ByteBuf,
) : FreeReference<NettyByteBuffer>(nettyByteBuffer) {
override fun free() {
byteBuf.release()
override fun release() {
if (atomicClosed.compareAndSet(false, true)) {
byteBuf.release()
}
}
}
@ -88,7 +91,11 @@ class NettyByteBuffer(
}
}
private val reference = if (autoClose) AutoFreeReference(this, byteBuf) else null
private val reference = if (autoClose) {
AutoFreeReference(this, atomicClosed, byteBuf)
} else {
null
}
override fun readBuffer(): java.nio.ByteBuffer {
return byteBuf.internalNioBuffer(readPosition, readable).slice()
@ -186,14 +193,14 @@ class NettyByteBuffer(
return byteBuf.writerIndex() - writePosition
}
override fun toString(): String {
return "Nettyjava.nio.ByteBuffer(byteBuf=$byteBuf)"
}
override fun close() {
if (atomicClosed.compareAndSet(false, true)) {
byteBuf.release()
reference?.cancel()
}
}
override fun toString(): String {
return "Nettyjava.nio.ByteBuffer(byteBuf=$byteBuf)"
}
}

View File

@ -13,7 +13,7 @@ interface OutputStream : Closeable {
fun write(
buffer: ByteArray,
offset: Int = 0,
len: Int = buffer.size - offset
len: Int = buffer.size - offset,
): Int {
val byteBuffer = HeapByteBuffer(buffer, offset, len)
write(byteBuffer)
@ -22,4 +22,3 @@ interface OutputStream : Closeable {
fun flush() {}
}

View File

@ -2,14 +2,14 @@ package cn.tursom.core.stream
import cn.tursom.core.buffer.ByteBuffer
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
interface SuspendInputStream : InputStream {
fun skip(n: Long, handler: () -> Unit)
fun read(handler: (Int) -> Unit)
fun read(buffer: ByteArray, handler: (Int) -> Unit)
fun read(buffer: ByteArray, offset: Int, len: Int, handler: (Int) -> Unit)
fun read(buffer: ByteBuffer, handler: () -> Unit)
fun skip(n: Long, handler: (Throwable?) -> Unit)
fun read(handler: (Int, Throwable?) -> Unit)
fun read(buffer: ByteArray, offset: Int = 0, len: Int = buffer.size - offset, handler: (Int, Throwable?) -> Unit)
fun read(buffer: ByteBuffer, handler: (Throwable?) -> Unit)
suspend fun suspendSkip(n: Long) {
suspendCoroutine<Unit> { cont ->
@ -21,32 +21,36 @@ interface SuspendInputStream : InputStream {
suspend fun suspendRead(): Int {
return suspendCoroutine { cont ->
read {
cont.resume(it)
read { it, e ->
if (e != null) {
cont.resumeWithException(e)
} else {
cont.resume(it)
}
}
}
}
suspend fun suspendRead(buffer: ByteArray) {
suspendCoroutine<Unit> { cont ->
read(buffer) {
cont.resume(Unit)
}
}
}
suspend fun suspendRead(buffer: ByteArray, offset: Int, len: Int) {
suspendCoroutine<Unit> { cont ->
read(buffer, offset, len) {
cont.resume(Unit)
suspend fun suspendRead(buffer: ByteArray, offset: Int = 0, len: Int = buffer.size - offset): Int {
return suspendCoroutine { cont ->
read(buffer, offset, len) { it, e ->
if (e != null) {
cont.resumeWithException(e)
} else {
cont.resume(it)
}
}
}
}
suspend fun suspendRead(buffer: ByteBuffer) {
suspendCoroutine<Unit> { cont ->
read(buffer) {
cont.resume(Unit)
read(buffer) { e ->
if (e != null) {
cont.resumeWithException(e)
} else {
cont.resume(Unit)
}
}
}
}

View File

@ -0,0 +1,23 @@
package cn.tursom.core.stream
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
interface SuspendOutputStream : OutputStream {
suspend fun suspendWrite(buffer: ByteBuffer)
suspend fun suspendWrite(byte: Byte) {
suspendWrite(byteArrayOf(byte))
}
suspend fun suspendWrite(
buffer: ByteArray,
offset: Int = 0,
len: Int = buffer.size - offset,
): Int {
val byteBuffer = HeapByteBuffer(buffer, offset, len)
suspendWrite(byteBuffer)
return byteBuffer.readPosition
}
suspend fun suspendFlush() {}
}

View File

@ -2,39 +2,28 @@ package cn.tursom.core.stream.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.stream.IOStream
import cn.tursom.core.stream.InputStream
import cn.tursom.core.stream.OutputStream
import cn.tursom.core.stream.SuspendInputStream
import cn.tursom.core.stream.SuspendOutputStream
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
class ByteBufferIOStream private constructor(
class ByteBufferIOStream(
private val buffer: ByteBuffer,
val inputStream: ByteBufferInputStream,
val outputStream: ByteBufferOutputStream
private val lock: Lock?,
val inputStream: ByteBufferInputStream = ByteBufferInputStream(buffer, lock),
val outputStream: ByteBufferOutputStream = ByteBufferOutputStream(buffer, lock),
) : IOStream,
SuspendInputStream,
InputStream by inputStream,
OutputStream by outputStream {
constructor(buffer: ByteBuffer) : this(buffer, ByteBufferInputStream(buffer), ByteBufferOutputStream(buffer))
override fun skip(n: Long, handler: () -> Unit) = handler()
override fun skip(n: Long) {}
override fun read(handler: (Int) -> Unit) {
handler(read())
}
override fun read(buffer: ByteArray, handler: (Int) -> Unit) {
handler(read(buffer))
}
override fun read(buffer: ByteArray, offset: Int, len: Int, handler: (Int) -> Unit) {
handler(read(buffer, offset, len))
}
override fun read(buffer: ByteBuffer, handler: () -> Unit) {
read(buffer)
handler()
}
SuspendInputStream by inputStream,
SuspendOutputStream by outputStream {
constructor(
buffer: ByteBuffer,
lock: Boolean = true,
) : this(
buffer,
if (lock) ReentrantLock() else null,
ByteBufferInputStream(buffer),
ByteBufferOutputStream(buffer)
)
override fun close() {
buffer.close()

View File

@ -1,29 +1,90 @@
package cn.tursom.core.stream.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.stream.InputStream
import cn.tursom.core.stream.SuspendInputStream
import java.util.concurrent.locks.Lock
import kotlin.concurrent.withLock
class ByteBufferInputStream(
val buffer: ByteBuffer
) : InputStream {
val buffer: ByteBuffer,
private val lock: Lock? = null,
) : SuspendInputStream {
private var closed = false
override val available: Int get() = buffer.readable
override fun skip(n: Long) {
checkClosed {
buffer.readPosition += n.toInt()
private inline fun <R> withLock(func: () -> R): R {
return if (lock != null) {
lock.withLock(func)
} else {
func()
}
}
override fun read(): Int = if (closed || buffer.readable == 0) -1 else buffer.get().toInt()
override val available: Int get() = buffer.readable
override fun skip(n: Long, handler: (Throwable?) -> Unit) {
handler(try {
skip(n)
null
} catch (e: Exception) {
e
})
}
override fun skip(n: Long) {
checkClosed {
withLock {
buffer.readPosition += n.toInt()
}
}
}
override fun read(handler: (Int, Throwable?) -> Unit) {
var n = 0
var t: Throwable? = null
try {
n = read()
} catch (e: Throwable) {
t = e
}
handler(n, t)
}
override fun read(buffer: ByteArray, offset: Int, len: Int, handler: (Int, Throwable?) -> Unit) {
var n = 0
var t: Throwable? = null
try {
n = read(buffer, offset, len)
} catch (e: Throwable) {
t = e
}
handler(n, t)
}
override fun read(buffer: ByteBuffer, handler: (Throwable?) -> Unit) {
handler(try {
read(buffer)
null
} catch (e: Throwable) {
e
})
}
override fun read(): Int {
return withLock {
if (closed || buffer.readable == 0) -1 else buffer.get().toInt()
}
}
override fun read(buffer: ByteArray, offset: Int, len: Int) = checkClosed {
this.buffer.writeTo(buffer, offset, len)
withLock {
this.buffer.writeTo(buffer, offset, len)
}
} ?: -1
override fun read(buffer: ByteBuffer) {
checkClosed {
this.buffer.writeTo(buffer)
withLock {
this.buffer.writeTo(buffer)
}
}
}

View File

@ -1,28 +1,54 @@
package cn.tursom.core.stream.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.stream.OutputStream
import cn.tursom.core.stream.SuspendOutputStream
import java.util.concurrent.locks.Lock
import kotlin.concurrent.withLock
class ByteBufferOutputStream(
val buffer: ByteBuffer
) : OutputStream {
val buffer: ByteBuffer,
private val lock: Lock? = null,
) : SuspendOutputStream {
private var closed = false
private inline fun <R> withLock(func: () -> R): R {
return if (lock != null) {
lock.withLock(func)
} else {
func()
}
}
override fun write(byte: Byte) {
checkClosed {
buffer.put(byte)
withLock {
buffer.put(byte)
}
}
}
override fun write(buffer: ByteArray, offset: Int, len: Int): Int = checkClosed {
this.buffer.put(buffer, offset, len)
withLock {
this.buffer.put(buffer, offset, len)
}
} ?: -1
override fun write(buffer: ByteBuffer) {
checkClosed {
buffer.writeTo(this.buffer)
withLock {
buffer.writeTo(this.buffer)
}
}
}
override suspend fun suspendWrite(byte: Byte) {
write(byte)
}
override suspend fun suspendWrite(buffer: ByteBuffer) {
write(buffer)
}
override fun flush() {}
private inline fun <T> checkClosed(action: () -> T): T? = if (!closed) action() else null

View File

@ -10,10 +10,9 @@ class BytesIOStream private constructor(
) : IOStream by buBufferIOStream, SuspendInputStream {
constructor(bytes: ByteArray) : this(ByteBufferIOStream(HeapByteBuffer(bytes).apply { clear() }))
override fun skip(n: Long, handler: () -> Unit) = buBufferIOStream.skip(n, handler)
override fun read(handler: (Int) -> Unit) = buBufferIOStream.read(handler)
override fun read(buffer: ByteBuffer, handler: () -> Unit) = buBufferIOStream.read(buffer, handler)
override fun read(buffer: ByteArray, handler: (Int) -> Unit) = buBufferIOStream.read(buffer, handler)
override fun read(buffer: ByteArray, offset: Int, len: Int, handler: (Int) -> Unit) =
override fun skip(n: Long, handler: (Throwable?) -> Unit) = buBufferIOStream.skip(n, handler)
override fun read(handler: (Int, Throwable?) -> Unit) = buBufferIOStream.read(handler)
override fun read(buffer: ByteBuffer, handler: (Throwable?) -> Unit) = buBufferIOStream.read(buffer, handler)
override fun read(buffer: ByteArray, offset: Int, len: Int, handler: (Int, Throwable?) -> Unit) =
buBufferIOStream.read(buffer, offset, len, handler)
}

View File

@ -4,7 +4,8 @@ import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.stream.InputStream
class JavaInputStream(
@Suppress("MemberVisibilityCanBePrivate") val inputStream: java.io.InputStream
@Suppress("MemberVisibilityCanBePrivate")
val inputStream: java.io.InputStream,
) : InputStream {
override val available: Int get() = inputStream.available()
override fun skip(n: Long) {

View File

@ -19,4 +19,4 @@ class PairIOStream(
inputStream.close()
outputStream.close()
}
}
}

View File

@ -5,14 +5,14 @@ import cn.tursom.core.toByteArray
import kotlin.math.pow
@Suppress("MemberVisibilityCanBePrivate")
class BloomFilter(val hash: Hash = murmur3Hash, val bitSize: Long, val hashCount: Int = 5) {
class BloomFilter(val hash: Hash = Murmur3Hash, val bitSize: Long, val hashCount: Int = 5) {
private val bitSet = ArrayBitSet(bitSize)
constructor(
valueCount: Long = 1000000,
hashCount: Int = 5,
wrongChance: Double = 0.03,
hash: Hash = murmur3Hash
hash: Hash = Murmur3Hash,
) : this(
hash,
wrongChance.run {
@ -78,7 +78,7 @@ class BloomFilter(val hash: Hash = murmur3Hash, val bitSize: Long, val hashCount
}
companion object {
val murmur3Hash = object : Hash {
object Murmur3Hash : Hash {
override fun hash(src: ByteArray): HashContent {
val hash = MurmurHash3.murmurHash3_x64_128(src)
return object : HashContent {

View File

@ -1,9 +1,8 @@
@file:Suppress("DuplicatedCode")
package cn.tursom.core.hash
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.toHexString
import cn.tursom.core.toUTF8String
import java.io.File
@Suppress("unused", "FunctionName", "NAME_SHADOWING")
@ -28,7 +27,9 @@ object MurmurHash3 {
return k
}
/** Gets a long from a byte buffer in little endian byte order. */
/**
* Gets a long from a byte buffer in little endian byte order.
*/
private fun getLongLittleEndian(buf: ByteArray, offset: Int): Long {
return (buf[offset + 7].toLong() shl 56 // no mask needed
or (buf[offset + 6].toLong() and 0xffL shl 48)
@ -40,7 +41,9 @@ object MurmurHash3 {
or (buf[offset].toLong() and 0xffL)) // no shift needed
}
/** Returns the MurmurHash3_x86_32 hash. */
/**
* Returns the MurmurHash3_x86_32 hash.
*/
fun murmurHash3_x86_32(data: ByteArray, offset: Int, len: Int, seed: Int): Int {
val c1 = -0x3361d2af
val c2 = 0x1b873593
@ -79,7 +82,8 @@ object MurmurHash3 {
return fmix32(h1 xor len)
}
/** Returns the MurmurHash3_x86_32 hash of the UTF-8 bytes of the String without actually encoding
/**
* Returns the MurmurHash3_x86_32 hash of the UTF-8 bytes of the String without actually encoding
* the string to a temporary buffer. This is more than 2x faster than hashing the result
* of String.getBytes().
*/
@ -160,18 +164,20 @@ object MurmurHash3 {
offset: Int = 0,
len: Int = key.size - offset,
seed: Int,
out: LongPair = LongPair()
out: LongPair = LongPair(),
): LongPair {
return murmurHash3_x64_128(key, offset, len, seed.toLong() and 0x00000000FFFFFFFFL, out)
}
/** Returns the MurmurHash3_x64_128 hash, placing the result in "out". */
/**
* Returns the MurmurHash3_x64_128 hash, placing the result in "out".
*/
fun murmurHash3_x64_128(
key: ByteArray,
offset: Int = 0,
len: Int = key.size - offset,
seed: Long = 0,
out: LongPair = LongPair()
out: LongPair = LongPair(),
): LongPair {
// The original algorithm does have a 32 bit unsigned seed.
// We have to mask to match the behavior of the unsigned types and prevent sign extension.
@ -245,7 +251,9 @@ object MurmurHash3 {
return out
}
/** 128 bits of state */
/**
* 128 bits of state
*/
data class LongPair(var val1: Long = 0, var val2: Long = 0) {
fun toByteArray(): ByteArray {
val buffer = HeapByteBuffer(16)
@ -255,11 +263,3 @@ object MurmurHash3 {
}
}
}
fun main() {
val data = File("build.gradle").readText().toByteArray()
println(data.toUTF8String())
val hash = MurmurHash3.murmurHash3_x64_128(data, seed = 100)
println(hash)
println(hash.toByteArray().toHexString())
}

View File

@ -1,15 +1,12 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.FreeReference
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.reference.FreeReference
import java.util.concurrent.atomic.AtomicInteger
/**
* 在被垃圾回收时能保证释放占用的内存池内存
*/
class PooledByteBuffer(
agent: ByteBuffer,
val pool: MemoryPool,
@ -21,7 +18,7 @@ class PooledByteBuffer(
val pool: MemoryPool,
val token: Int,
) : FreeReference<PooledByteBuffer>(pooledByteBuffer) {
override fun free() {
override fun release() {
pool.free(token)
}
}