update buffers

This commit is contained in:
tursom 2021-07-14 13:58:53 +08:00
parent 9ce5ab7779
commit 306b3cddd6
8 changed files with 66 additions and 108 deletions

View File

@ -9,7 +9,6 @@ import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.util.concurrent.Future
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
@ -21,11 +20,11 @@ class AsyncFile(val path: Path) {
constructor(path: String) : this(Paths.get(path))
interface Writer {
suspend fun writeAndWait(file: AsyncFile, position: Long): Int
suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int
}
interface Reader {
suspend fun read(file: AsyncFile, position: Long): Int
suspend fun readAsyncFile(file: AsyncFile, position: Long): Int
}
private var existsCache = exists
@ -44,14 +43,10 @@ class AsyncFile(val path: Path) {
val writeChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.WRITE) }
val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) }
fun write(buffer: ByteBuffer, position: Long = writePosition): Future<Int> {
return buffer.read {
writeChannel.write(it, position)
}
}
suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int {
val writeSize = buffer.fileWriter?.writeAndWait(this, position) ?: buffer.read {
val writeSize = if (buffer is Writer) {
buffer.writeAsyncFile(this, position)
} else buffer.read {
suspendCoroutine { cont ->
writeChannel.write(it, position, cont, handler)
}
@ -60,16 +55,14 @@ class AsyncFile(val path: Path) {
return writeSize
}
fun append(buffer: ByteBuffer, position: Long = size) {
write(buffer, position)
}
suspend fun appendAndWait(buffer: ByteBuffer, position: Long = size): Int {
return writeAndWait(buffer, position)
}
suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int {
val readSize = buffer.fileReader?.read(this, position) ?: buffer.write {
val readSize = if (buffer is Reader) {
buffer.readAsyncFile(this, position)
} else buffer.write {
suspendCoroutine { cont ->
readChannel.read(it, position, cont, handler)
}

View File

@ -3,7 +3,7 @@ package cn.tursom.core
import java.nio.ByteBuffer
/**
* HOOK java.nio.HeapByteBuffer
* hack java.nio.HeapByteBuffer
*/
object HeapByteBufferUtil {
private val field = ByteBuffer::class.java.getDeclaredField("offset")
@ -14,10 +14,6 @@ object HeapByteBufferUtil {
fun wrap(array: ByteArray, offset: Int = 0, size: Int = array.size - offset): ByteBuffer {
val buffer = ByteBuffer.wrap(array, 0, offset + size)
//return if (offset == 0) buffer else {
// buffer.position(offset)
// buffer.slice()
//}
if (offset > 0) field.set(buffer, offset)
return buffer
}

View File

@ -1,6 +1,5 @@
package cn.tursom.core.buffer
import cn.tursom.core.AsyncFile
import cn.tursom.core.Utils.bufferThreadLocal
import cn.tursom.core.forEachIndex
import cn.tursom.core.reverseBytes
@ -58,9 +57,6 @@ interface ByteBuffer : Closeable {
val closed: Boolean get() = false
val resized: Boolean
val fileReader: AsyncFile.Reader? get() = null
val fileWriter: AsyncFile.Writer? get() = null
override fun close() {}
fun readBuffer(): java.nio.ByteBuffer

View File

@ -40,7 +40,7 @@ inline fun <T> ByteBuffer.write(block: (java.nio.ByteBuffer) -> T): T {
}
}
inline fun <T> MultipleByteBuffer.reads(block: (List<java.nio.ByteBuffer>) -> T): T {
inline fun <T> MultipleByteBuffer.reads(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val bufferList = readBuffers()
try {
return block(bufferList)
@ -50,7 +50,7 @@ inline fun <T> MultipleByteBuffer.reads(block: (List<java.nio.ByteBuffer>) -> T)
}
inline fun <T> MultipleByteBuffer.writes(block: (List<java.nio.ByteBuffer>) -> T): T {
inline fun <T> MultipleByteBuffer.writes(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val bufferList = writeBuffers()
try {
return block(bufferList)
@ -61,7 +61,7 @@ inline fun <T> MultipleByteBuffer.writes(block: (List<java.nio.ByteBuffer>) -> T
fun ReadableByteChannel.read(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is ScatteringByteChannel) {
buffer.writeBuffers { read(it.toTypedArray()) }.toInt()
buffer.writeBuffers { read(it.toList().toTypedArray()) }.toInt()
} else {
buffer.write { read(it) }
}
@ -69,18 +69,18 @@ fun ReadableByteChannel.read(buffer: ByteBuffer): Int {
fun WritableByteChannel.write(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is GatheringByteChannel) {
buffer.readBuffers { write(it.toTypedArray()) }.toInt()
buffer.readBuffers { write(it.toList().toTypedArray()) }.toInt()
} else {
buffer.read { write(it) }
}
}
fun ScatteringByteChannel.read(buffer: MultipleByteBuffer): Long {
return buffer.writeBuffers { read(it.toTypedArray()) }
return buffer.writeBuffers { read(it.toList().toTypedArray()) }
}
fun GatheringByteChannel.write(buffer: MultipleByteBuffer): Long {
return buffer.readBuffers { write(it.toTypedArray()) }
return buffer.readBuffers { write(it.toList().toTypedArray()) }
}
fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long {

View File

@ -10,15 +10,15 @@ import java.nio.ByteOrder
@Suppress("unused")
interface MultipleByteBuffer : Closeable, ByteBuffer {
val buffers: List<ByteBuffer>
val buffersArray: Array<out ByteBuffer> get() = buffers.toTypedArray()
val buffers: List<ByteBuffer> get() = listOf(this)
val buffersArray: Array<out ByteBuffer> get() = arrayOf(this)
fun append(buffer: ByteBuffer)
/**
* 使用读 bufferByteBuffer 实现类有义务维护指针正常推进
*/
fun <T> readBuffers(block: (List<java.nio.ByteBuffer>) -> T): T {
fun <T> readBuffers(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val buffer = readBuffers()
return try {
block(buffer)
@ -30,7 +30,7 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
/**
* 使用写 bufferByteBuffer 实现类有义务维护指针正常推进
*/
fun <T> writeBuffers(block: (List<java.nio.ByteBuffer>) -> T): T {
fun <T> writeBuffers(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val buffer = writeBuffers()
return try {
block(buffer)
@ -39,60 +39,44 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
}
}
fun readBuffers(): List<java.nio.ByteBuffer> {
val bufferList = ArrayList<java.nio.ByteBuffer>()
fun readBuffers(): Sequence<java.nio.ByteBuffer> = sequence {
buffers.forEach {
if (it is MultipleByteBuffer) {
it.buffers.forEach {
bufferList.add(it.readBuffer())
}
yieldAll(it.readBuffers())
} else {
bufferList.add(it.readBuffer())
yield(it.readBuffer())
}
}
return bufferList
}
fun writeBuffers(): List<java.nio.ByteBuffer> {
val bufferList = ArrayList<java.nio.ByteBuffer>()
fun writeBuffers(): Sequence<java.nio.ByteBuffer> = sequence {
buffers.forEach {
if (it is MultipleByteBuffer) {
it.buffers.forEach {
bufferList.add(it.writeBuffer())
}
yieldAll(it.writeBuffers())
} else {
bufferList.add(it.writeBuffer())
yield(it.writeBuffer())
}
}
return bufferList
}
fun finishRead(buffers: List<java.nio.ByteBuffer>) {
var index = 0
fun finishRead(buffers: Sequence<java.nio.ByteBuffer>) = finishRead(buffers.iterator())
fun finishRead(buffers: Iterator<java.nio.ByteBuffer>) {
this.buffers.forEach {
if (it is MultipleByteBuffer) {
it.buffers.forEach {
it.finishRead(buffers[index])
index++
}
it.finishRead(buffers)
} else {
it.finishRead(buffers[index])
index++
it.finishRead(buffers.next())
}
}
}
fun finishWrite(buffers: List<java.nio.ByteBuffer>) {
var index = 0
fun finishWrite(buffers: Sequence<java.nio.ByteBuffer>) = finishWrite(buffers.iterator())
fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>) {
this.buffers.forEach { subBuf ->
if (subBuf is MultipleByteBuffer) {
subBuf.buffers.forEach { writeBuf ->
writeBuf.finishWrite(buffers[index])
index++
}
subBuf.finishWrite(buffers)
} else {
subBuf.finishWrite(buffers[index])
index++
subBuf.finishWrite(buffers.next())
}
}
}

View File

@ -4,11 +4,14 @@ import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer
import java.nio.ByteOrder
@Suppress("MemberVisibilityCanBePrivate")
open class ListByteBuffer(
final override val buffers: MutableList<ByteBuffer> = ArrayList(),
) : MultipleByteBuffer {
private var readOperator = buffers.firstOrNull()
private var writeOperator = buffers.firstOrNull()
var readArrayPosition: Int = 0
var writeArrayPosition: Int = 0
var readOperator = buffers.firstOrNull()
var writeOperator = buffers.firstOrNull()
private var buffersArrayCache: Array<out ByteBuffer>? = null
override val buffersArray: Array<out ByteBuffer>
@ -21,17 +24,7 @@ open class ListByteBuffer(
override val hasArray: Boolean get() = false
override val array: ByteArray get() = throw UnsupportedOperationException()
override val capacity: Int
get() {
var capacity = 0
buffers.forEach {
capacity += it.capacity
}
return capacity
}
var writeArrayPosition: Int = 0
var readArrayPosition: Int = 0
override val capacity: Int get() = buffers.sumOf { it.capacity }
override var writePosition: Int = buffers.sumOf { it.writePosition }
override var readPosition: Int = buffers.sumOf { it.readPosition }
@ -66,14 +59,14 @@ open class ListByteBuffer(
override fun resize(newSize: Int): Boolean = throw UnsupportedOperationException()
fun updateRead() {
while (readArrayPosition < buffers.size && (readOperator == null || !readOperator!!.isReadable)) {
private fun updateRead() {
while (readArrayPosition < buffers.size && readOperator?.isReadable != true) {
readOperator = buffers[readArrayPosition++]
}
}
fun updateWrite() {
while (writeArrayPosition < buffers.size && (writeOperator == null || !writeOperator!!.isReadable)) {
private fun updateWrite() {
while (writeArrayPosition < buffers.size && writeOperator?.isWriteable == true) {
writeOperator = buffers[writeArrayPosition++]
}
}

View File

@ -13,7 +13,7 @@ import kotlin.coroutines.suspendCoroutine
class NettyByteBuffer(
val byteBuf: ByteBuf,
autoClose: Boolean = false,
) : ByteBuffer {
) : ByteBuffer, AsyncFile.Reader, AsyncFile.Writer {
companion object : Slf4jImpl()
constructor(
@ -59,40 +59,36 @@ class NettyByteBuffer(
private val atomicClosed = AtomicBoolean(false)
override val fileReader: AsyncFile.Reader = object : AsyncFile.Reader {
override suspend fun read(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.capacity())
var readPosition = position
for (nioBuffer in nioBuffers) {
while (nioBuffer.hasRemaining()) {
val readSize = suspendCoroutine<Int> { cont ->
file.writeChannel.read(nioBuffer, readPosition, cont, AsyncFile.handler)
}
if (readSize <= 0) break
readPosition += readSize
byteBuf.writerIndex(byteBuf.writerIndex() + readSize)
override suspend fun readAsyncFile(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes())
var readPosition = position
for (nioBuffer in nioBuffers) {
while (nioBuffer.hasRemaining()) {
val readSize = suspendCoroutine<Int> { cont ->
file.writeChannel.read(nioBuffer, readPosition, cont, AsyncFile.handler)
}
if (readSize <= 0) break
readPosition += readSize
byteBuf.writerIndex(byteBuf.writerIndex() + readSize)
}
return (readPosition - position).toInt()
}
return (readPosition - position).toInt()
}
override val fileWriter: AsyncFile.Writer = object : AsyncFile.Writer {
override suspend fun writeAndWait(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers()
var writePosition = position
for (nioBuffer in nioBuffers) {
while (nioBuffer.hasRemaining()) {
val writeSize = suspendCoroutine<Int> { cont ->
file.writeChannel.write(nioBuffer, writePosition, cont, AsyncFile.handler)
}
if (writeSize <= 0) break
writePosition += writeSize
byteBuf.readerIndex(byteBuf.readerIndex() + writeSize)
override suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers()
var writePosition = position
for (nioBuffer in nioBuffers) {
while (nioBuffer.hasRemaining()) {
val writeSize = suspendCoroutine<Int> { cont ->
file.writeChannel.write(nioBuffer, writePosition, cont, AsyncFile.handler)
}
if (writeSize <= 0) break
writePosition += writeSize
byteBuf.readerIndex(byteBuf.readerIndex() + writeSize)
}
return (writePosition - position).toInt()
}
return (writePosition - position).toInt()
}
private val reference = if (autoClose) {