From 306b3cddd6bc7fb4ede8a93b4ed6f5ef27da8db3 Mon Sep 17 00:00:00 2001 From: tursom Date: Wed, 14 Jul 2021 13:58:53 +0800 Subject: [PATCH] update buffers --- .../main/kotlin/cn/tursom/core/Snowflake.kt | 0 .../main/kotlin/cn/tursom/core/AsyncFile.kt | 23 +++----- .../cn/tursom/core/HeapByteBufferUtil.kt | 6 +-- .../cn/tursom/core/buffer/ByteBuffer.kt | 4 -- .../tursom/core/buffer/ByteBufferExtension.kt | 12 ++--- .../tursom/core/buffer/MultipleByteBuffer.kt | 52 +++++++------------ .../tursom/core/buffer/impl/ListByteBuffer.kt | 27 ++++------ .../core/buffer/impl/NettyByteBuffer.kt | 50 ++++++++---------- 8 files changed, 66 insertions(+), 108 deletions(-) rename ts-core/{ts-buffer => }/src/main/kotlin/cn/tursom/core/Snowflake.kt (100%) diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/Snowflake.kt b/ts-core/src/main/kotlin/cn/tursom/core/Snowflake.kt similarity index 100% rename from ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/Snowflake.kt rename to ts-core/src/main/kotlin/cn/tursom/core/Snowflake.kt diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt index 61e02b6..6c1f40b 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt @@ -9,7 +9,6 @@ import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths import java.nio.file.StandardOpenOption -import java.util.concurrent.Future import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -21,11 +20,11 @@ class AsyncFile(val path: Path) { constructor(path: String) : this(Paths.get(path)) interface Writer { - suspend fun writeAndWait(file: AsyncFile, position: Long): Int + suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int } interface Reader { - suspend fun read(file: AsyncFile, position: Long): Int + suspend fun readAsyncFile(file: AsyncFile, position: Long): Int } private var existsCache = exists @@ -44,14 +43,10 @@ class AsyncFile(val path: Path) { val writeChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.WRITE) } val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) } - fun write(buffer: ByteBuffer, position: Long = writePosition): Future { - return buffer.read { - writeChannel.write(it, position) - } - } - suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int { - val writeSize = buffer.fileWriter?.writeAndWait(this, position) ?: buffer.read { + val writeSize = if (buffer is Writer) { + buffer.writeAsyncFile(this, position) + } else buffer.read { suspendCoroutine { cont -> writeChannel.write(it, position, cont, handler) } @@ -60,16 +55,14 @@ class AsyncFile(val path: Path) { return writeSize } - fun append(buffer: ByteBuffer, position: Long = size) { - write(buffer, position) - } - suspend fun appendAndWait(buffer: ByteBuffer, position: Long = size): Int { return writeAndWait(buffer, position) } suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int { - val readSize = buffer.fileReader?.read(this, position) ?: buffer.write { + val readSize = if (buffer is Reader) { + buffer.readAsyncFile(this, position) + } else buffer.write { suspendCoroutine { cont -> readChannel.read(it, position, cont, handler) } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt index c20016c..d5e8feb 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt @@ -3,7 +3,7 @@ package cn.tursom.core import java.nio.ByteBuffer /** - * HOOK java.nio.HeapByteBuffer + * hack java.nio.HeapByteBuffer */ object HeapByteBufferUtil { private val field = ByteBuffer::class.java.getDeclaredField("offset") @@ -14,10 +14,6 @@ object HeapByteBufferUtil { fun wrap(array: ByteArray, offset: Int = 0, size: Int = array.size - offset): ByteBuffer { val buffer = ByteBuffer.wrap(array, 0, offset + size) - //return if (offset == 0) buffer else { - // buffer.position(offset) - // buffer.slice() - //} if (offset > 0) field.set(buffer, offset) return buffer } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt index c7bca7d..1271998 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt @@ -1,6 +1,5 @@ package cn.tursom.core.buffer -import cn.tursom.core.AsyncFile import cn.tursom.core.Utils.bufferThreadLocal import cn.tursom.core.forEachIndex import cn.tursom.core.reverseBytes @@ -58,9 +57,6 @@ interface ByteBuffer : Closeable { val closed: Boolean get() = false val resized: Boolean - val fileReader: AsyncFile.Reader? get() = null - val fileWriter: AsyncFile.Writer? get() = null - override fun close() {} fun readBuffer(): java.nio.ByteBuffer diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt index 24ac961..365102e 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt @@ -40,7 +40,7 @@ inline fun ByteBuffer.write(block: (java.nio.ByteBuffer) -> T): T { } } -inline fun MultipleByteBuffer.reads(block: (List) -> T): T { +inline fun MultipleByteBuffer.reads(block: (Sequence) -> T): T { val bufferList = readBuffers() try { return block(bufferList) @@ -50,7 +50,7 @@ inline fun MultipleByteBuffer.reads(block: (List) -> T) } -inline fun MultipleByteBuffer.writes(block: (List) -> T): T { +inline fun MultipleByteBuffer.writes(block: (Sequence) -> T): T { val bufferList = writeBuffers() try { return block(bufferList) @@ -61,7 +61,7 @@ inline fun MultipleByteBuffer.writes(block: (List) -> T fun ReadableByteChannel.read(buffer: ByteBuffer): Int { return if (buffer is MultipleByteBuffer && this is ScatteringByteChannel) { - buffer.writeBuffers { read(it.toTypedArray()) }.toInt() + buffer.writeBuffers { read(it.toList().toTypedArray()) }.toInt() } else { buffer.write { read(it) } } @@ -69,18 +69,18 @@ fun ReadableByteChannel.read(buffer: ByteBuffer): Int { fun WritableByteChannel.write(buffer: ByteBuffer): Int { return if (buffer is MultipleByteBuffer && this is GatheringByteChannel) { - buffer.readBuffers { write(it.toTypedArray()) }.toInt() + buffer.readBuffers { write(it.toList().toTypedArray()) }.toInt() } else { buffer.read { write(it) } } } fun ScatteringByteChannel.read(buffer: MultipleByteBuffer): Long { - return buffer.writeBuffers { read(it.toTypedArray()) } + return buffer.writeBuffers { read(it.toList().toTypedArray()) } } fun GatheringByteChannel.write(buffer: MultipleByteBuffer): Long { - return buffer.readBuffers { write(it.toTypedArray()) } + return buffer.readBuffers { write(it.toList().toTypedArray()) } } fun ScatteringByteChannel.read(buffers: Array): Long { diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt index 51b989f..5840f76 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt @@ -10,15 +10,15 @@ import java.nio.ByteOrder @Suppress("unused") interface MultipleByteBuffer : Closeable, ByteBuffer { - val buffers: List - val buffersArray: Array get() = buffers.toTypedArray() + val buffers: List get() = listOf(this) + val buffersArray: Array get() = arrayOf(this) fun append(buffer: ByteBuffer) /** * 使用读 buffer,ByteBuffer 实现类有义务维护指针正常推进 */ - fun readBuffers(block: (List) -> T): T { + fun readBuffers(block: (Sequence) -> T): T { val buffer = readBuffers() return try { block(buffer) @@ -30,7 +30,7 @@ interface MultipleByteBuffer : Closeable, ByteBuffer { /** * 使用写 buffer,ByteBuffer 实现类有义务维护指针正常推进 */ - fun writeBuffers(block: (List) -> T): T { + fun writeBuffers(block: (Sequence) -> T): T { val buffer = writeBuffers() return try { block(buffer) @@ -39,60 +39,44 @@ interface MultipleByteBuffer : Closeable, ByteBuffer { } } - fun readBuffers(): List { - val bufferList = ArrayList() + fun readBuffers(): Sequence = sequence { buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - bufferList.add(it.readBuffer()) - } + yieldAll(it.readBuffers()) } else { - bufferList.add(it.readBuffer()) + yield(it.readBuffer()) } } - return bufferList } - fun writeBuffers(): List { - val bufferList = ArrayList() + fun writeBuffers(): Sequence = sequence { buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - bufferList.add(it.writeBuffer()) - } + yieldAll(it.writeBuffers()) } else { - bufferList.add(it.writeBuffer()) + yield(it.writeBuffer()) } } - return bufferList } - fun finishRead(buffers: List) { - var index = 0 + fun finishRead(buffers: Sequence) = finishRead(buffers.iterator()) + fun finishRead(buffers: Iterator) { this.buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - it.finishRead(buffers[index]) - index++ - } + it.finishRead(buffers) } else { - it.finishRead(buffers[index]) - index++ + it.finishRead(buffers.next()) } } } - fun finishWrite(buffers: List) { - var index = 0 + fun finishWrite(buffers: Sequence) = finishWrite(buffers.iterator()) + fun finishWrite(buffers: Iterator) { this.buffers.forEach { subBuf -> if (subBuf is MultipleByteBuffer) { - subBuf.buffers.forEach { writeBuf -> - writeBuf.finishWrite(buffers[index]) - index++ - } + subBuf.finishWrite(buffers) } else { - subBuf.finishWrite(buffers[index]) - index++ + subBuf.finishWrite(buffers.next()) } } } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt index e6bfc82..3a09b53 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt @@ -4,11 +4,14 @@ import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.core.buffer.ByteBuffer import java.nio.ByteOrder +@Suppress("MemberVisibilityCanBePrivate") open class ListByteBuffer( final override val buffers: MutableList = ArrayList(), ) : MultipleByteBuffer { - private var readOperator = buffers.firstOrNull() - private var writeOperator = buffers.firstOrNull() + var readArrayPosition: Int = 0 + var writeArrayPosition: Int = 0 + var readOperator = buffers.firstOrNull() + var writeOperator = buffers.firstOrNull() private var buffersArrayCache: Array? = null override val buffersArray: Array @@ -21,17 +24,7 @@ open class ListByteBuffer( override val hasArray: Boolean get() = false override val array: ByteArray get() = throw UnsupportedOperationException() - override val capacity: Int - get() { - var capacity = 0 - buffers.forEach { - capacity += it.capacity - } - return capacity - } - - var writeArrayPosition: Int = 0 - var readArrayPosition: Int = 0 + override val capacity: Int get() = buffers.sumOf { it.capacity } override var writePosition: Int = buffers.sumOf { it.writePosition } override var readPosition: Int = buffers.sumOf { it.readPosition } @@ -66,14 +59,14 @@ open class ListByteBuffer( override fun resize(newSize: Int): Boolean = throw UnsupportedOperationException() - fun updateRead() { - while (readArrayPosition < buffers.size && (readOperator == null || !readOperator!!.isReadable)) { + private fun updateRead() { + while (readArrayPosition < buffers.size && readOperator?.isReadable != true) { readOperator = buffers[readArrayPosition++] } } - fun updateWrite() { - while (writeArrayPosition < buffers.size && (writeOperator == null || !writeOperator!!.isReadable)) { + private fun updateWrite() { + while (writeArrayPosition < buffers.size && writeOperator?.isWriteable == true) { writeOperator = buffers[writeArrayPosition++] } } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt index 1793fbf..2791a30 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt @@ -13,7 +13,7 @@ import kotlin.coroutines.suspendCoroutine class NettyByteBuffer( val byteBuf: ByteBuf, autoClose: Boolean = false, -) : ByteBuffer { +) : ByteBuffer, AsyncFile.Reader, AsyncFile.Writer { companion object : Slf4jImpl() constructor( @@ -59,40 +59,36 @@ class NettyByteBuffer( private val atomicClosed = AtomicBoolean(false) - override val fileReader: AsyncFile.Reader = object : AsyncFile.Reader { - override suspend fun read(file: AsyncFile, position: Long): Int { - val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.capacity()) - var readPosition = position - for (nioBuffer in nioBuffers) { - while (nioBuffer.hasRemaining()) { - val readSize = suspendCoroutine { cont -> - file.writeChannel.read(nioBuffer, readPosition, cont, AsyncFile.handler) - } - if (readSize <= 0) break - readPosition += readSize - byteBuf.writerIndex(byteBuf.writerIndex() + readSize) + override suspend fun readAsyncFile(file: AsyncFile, position: Long): Int { + val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()) + var readPosition = position + for (nioBuffer in nioBuffers) { + while (nioBuffer.hasRemaining()) { + val readSize = suspendCoroutine { cont -> + file.writeChannel.read(nioBuffer, readPosition, cont, AsyncFile.handler) } + if (readSize <= 0) break + readPosition += readSize + byteBuf.writerIndex(byteBuf.writerIndex() + readSize) } - return (readPosition - position).toInt() } + return (readPosition - position).toInt() } - override val fileWriter: AsyncFile.Writer = object : AsyncFile.Writer { - override suspend fun writeAndWait(file: AsyncFile, position: Long): Int { - val nioBuffers = byteBuf.nioBuffers() - var writePosition = position - for (nioBuffer in nioBuffers) { - while (nioBuffer.hasRemaining()) { - val writeSize = suspendCoroutine { cont -> - file.writeChannel.write(nioBuffer, writePosition, cont, AsyncFile.handler) - } - if (writeSize <= 0) break - writePosition += writeSize - byteBuf.readerIndex(byteBuf.readerIndex() + writeSize) + override suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int { + val nioBuffers = byteBuf.nioBuffers() + var writePosition = position + for (nioBuffer in nioBuffers) { + while (nioBuffer.hasRemaining()) { + val writeSize = suspendCoroutine { cont -> + file.writeChannel.write(nioBuffer, writePosition, cont, AsyncFile.handler) } + if (writeSize <= 0) break + writePosition += writeSize + byteBuf.readerIndex(byteBuf.readerIndex() + writeSize) } - return (writePosition - position).toInt() } + return (writePosition - position).toInt() } private val reference = if (autoClose) {