update buffers

This commit is contained in:
tursom 2021-07-16 15:07:51 +08:00
parent cc3ea3c0a5
commit d76bfdc89b
16 changed files with 127 additions and 48 deletions

View File

@ -134,6 +134,9 @@ inline fun <T> Any?.cast() = this as T
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
inline fun <T> Any?.uncheckedCast() = this as T inline fun <T> Any?.uncheckedCast() = this as T
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
inline fun <T> Any?.uncheckedCastNullable() = this as? T
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
inline fun <reified T> Any?.castOrNull() = if (this is T) this else null inline fun <reified T> Any?.castOrNull() = if (this is T) this else null

View File

@ -1,8 +1,8 @@
package cn.tursom.core package cn.tursom.core
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.AsyncFile.Reader
import cn.tursom.core.buffer.read import cn.tursom.core.AsyncFile.Writer
import cn.tursom.core.buffer.write import cn.tursom.core.buffer.*
import java.nio.channels.AsynchronousFileChannel import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler import java.nio.channels.CompletionHandler
import java.nio.file.Files import java.nio.file.Files
@ -19,11 +19,69 @@ import kotlin.coroutines.suspendCoroutine
class AsyncFile(val path: Path) { class AsyncFile(val path: Path) {
constructor(path: String) : this(Paths.get(path)) constructor(path: String) : this(Paths.get(path))
interface Writer { fun interface Writer {
companion object : ByteBufferExtensionKey<Writer> {
override tailrec fun get(buffer: ByteBuffer): Writer? {
return when (buffer) {
is MultipleByteBuffer -> Writer { file, position ->
var writePosition = position
val nioBuffers = buffer.readBuffers().toList()
run {
nioBuffers.forEach { readBuf ->
while (readBuf.position() < readBuf.limit()) {
val writeSize = file.write(readBuf, writePosition)
if (writeSize > 0) {
writePosition += writeSize
} else {
return@run
}
}
}
}
buffer.finishRead(nioBuffers.iterator())
(writePosition - position).toInt()
}
is ProxyByteBuffer -> get(buffer.agent)
else -> null
}
}
}
suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int
} }
interface Reader { fun interface Reader {
companion object : ByteBufferExtensionKey<Reader> {
override tailrec fun get(buffer: ByteBuffer): Reader? {
return when (buffer) {
is MultipleByteBuffer -> Reader { file, position ->
var readPosition = position
val nioBuffers = buffer.writeBuffers().toList()
run {
nioBuffers.forEach { nioBuf ->
while (nioBuf.position() < nioBuf.limit()) {
val readSize = file.read(nioBuf, readPosition)
if (readSize > 0) {
readPosition += readSize
} else {
return@run
}
}
}
}
buffer.finishWrite(nioBuffers.iterator())
(readPosition - position).toInt()
}
is ProxyByteBuffer -> get(buffer.agent)
else -> null
}
}
}
suspend fun readAsyncFile(file: AsyncFile, position: Long): Int suspend fun readAsyncFile(file: AsyncFile, position: Long): Int
} }
@ -44,33 +102,35 @@ class AsyncFile(val path: Path) {
val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) } val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) }
suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int { suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int {
val writeSize = if (buffer is Writer) { val writeSize = buffer.getExtension(Writer)?.writeAsyncFile(this, position)
buffer.writeAsyncFile(this, position) ?: buffer.read {
} else buffer.read { write(it, position)
suspendCoroutine { cont ->
writeChannel.write(it, position, cont, handler)
} }
}
writePosition += writeSize writePosition += writeSize
return writeSize return writeSize
} }
private suspend fun write(nioBuf: java.nio.ByteBuffer, position: Long): Int = suspendCoroutine { cont ->
writeChannel.write(nioBuf, position, cont, handler)
}
suspend fun appendAndWait(buffer: ByteBuffer, position: Long = size): Int { suspend fun appendAndWait(buffer: ByteBuffer, position: Long = size): Int {
return writeAndWait(buffer, position) return writeAndWait(buffer, position)
} }
suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int { suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int {
val readSize = if (buffer is Reader) { val readSize = buffer.getExtension(Reader)?.readAsyncFile(this, position)
buffer.readAsyncFile(this, position) ?: buffer.write {
} else buffer.write { read(it, position)
suspendCoroutine { cont ->
readChannel.read(it, position, cont, handler)
} }
}
readPosition += readSize readPosition += readSize
return readSize return readSize
} }
private suspend fun read(nioBuf: java.nio.ByteBuffer, position: Long): Int = suspendCoroutine { cont ->
readChannel.read(nioBuf, position, cont, handler)
}
fun create() = if (!existsCache || !exists) { fun create() = if (!existsCache || !exists) {
Files.createFile(path) Files.createFile(path)
existsCache = true existsCache = true

View File

@ -0,0 +1 @@
package cn.tursom.core.buffer

View File

@ -16,6 +16,8 @@ import kotlin.math.min
*/ */
@Suppress("unused") @Suppress("unused")
interface ByteBuffer : Closeable { interface ByteBuffer : Closeable {
fun <T> getExtension(key: ByteBufferExtensionKey<T>): T? = key.get(this)
/** /**
* 使用读 bufferByteBuffer 实现类有义务维护指针正常推进 * 使用读 bufferByteBuffer 实现类有义务维护指针正常推进
*/ */

View File

@ -5,7 +5,6 @@
package cn.tursom.core.buffer package cn.tursom.core.buffer
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.impl.ArrayByteBuffer import cn.tursom.core.buffer.impl.ArrayByteBuffer
import cn.tursom.core.toBytes import cn.tursom.core.toBytes
import cn.tursom.core.toInt import cn.tursom.core.toInt
@ -87,8 +86,8 @@ fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long {
val bufferList = ArrayList<java.nio.ByteBuffer>() val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach { buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
it.buffers.forEach { it.writeBuffers().forEach { nioBuffer ->
bufferList.add(it.writeBuffer()) bufferList.add(nioBuffer)
} }
} else { } else {
bufferList.add(it.writeBuffer()) bufferList.add(it.writeBuffer())
@ -99,13 +98,12 @@ fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long {
read(bufferArray) read(bufferArray)
} finally { } finally {
var index = 0 var index = 0
val nioBuffers = bufferList.iterator()
buffers.forEach { buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
it.buffers.forEach { it.finishWrite(nioBuffers)
it.finishWrite(bufferArray[index])
}
} else { } else {
it.finishWrite(bufferArray[index]) it.finishWrite(nioBuffers.next())
} }
} }
index++ index++
@ -116,8 +114,8 @@ fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long {
val bufferList = ArrayList<java.nio.ByteBuffer>() val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach { buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
it.buffers.forEach { it.readBuffers().forEach { nioBuffer ->
bufferList.add(it.readBuffer()) bufferList.add(nioBuffer)
} }
} else { } else {
bufferList.add(it.readBuffer()) bufferList.add(it.readBuffer())
@ -128,13 +126,12 @@ fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long {
write(bufferArray) write(bufferArray)
} finally { } finally {
var index = 0 var index = 0
val iterator = bufferList.iterator()
buffers.forEach { buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
it.buffers.forEach { it.finishRead(iterator)
it.finishRead(bufferArray[index])
}
} else { } else {
it.finishRead(bufferArray[index]) it.finishRead(iterator.next())
} }
} }
index++ index++

View File

@ -0,0 +1,5 @@
package cn.tursom.core.buffer
interface ByteBufferExtensionKey<T> {
fun get(buffer: ByteBuffer): T? = null
}

View File

@ -1,6 +1,5 @@
package cn.tursom.buffer package cn.tursom.core.buffer
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.ListByteBuffer import cn.tursom.core.buffer.impl.ListByteBuffer
import cn.tursom.core.forEachIndex import cn.tursom.core.forEachIndex
import java.io.Closeable import java.io.Closeable

View File

@ -6,7 +6,7 @@ import cn.tursom.core.buffer.ByteBuffer
class HeapByteBuffer( class HeapByteBuffer(
private var buffer: java.nio.ByteBuffer, private var buffer: java.nio.ByteBuffer,
override var readPosition: Int = 0, override var readPosition: Int = 0,
override var writePosition: Int = 0 override var writePosition: Int = 0,
) : ByteBuffer { ) : ByteBuffer {
constructor(size: Int) : this(java.nio.ByteBuffer.allocate(size)) constructor(size: Int) : this(java.nio.ByteBuffer.allocate(size))
constructor(string: String) : this(string.toByteArray()) constructor(string: String) : this(string.toByteArray())

View File

@ -1,7 +1,7 @@
package cn.tursom.core.buffer.impl package cn.tursom.core.buffer.impl
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer
import java.nio.ByteOrder import java.nio.ByteOrder
@Suppress("MemberVisibilityCanBePrivate") @Suppress("MemberVisibilityCanBePrivate")

View File

@ -2,7 +2,9 @@ package cn.tursom.core.buffer.impl
import cn.tursom.core.AsyncFile import cn.tursom.core.AsyncFile
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ByteBufferExtensionKey
import cn.tursom.core.reference.FreeReference import cn.tursom.core.reference.FreeReference
import cn.tursom.core.uncheckedCast
import cn.tursom.log.impl.Slf4jImpl import cn.tursom.log.impl.Slf4jImpl
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import java.io.OutputStream import java.io.OutputStream
@ -59,6 +61,13 @@ class NettyByteBuffer(
private val atomicClosed = AtomicBoolean(false) private val atomicClosed = AtomicBoolean(false)
override fun <T> getExtension(key: ByteBufferExtensionKey<T>): T? {
return when (key) {
AsyncFile.Reader, AsyncFile.Writer -> this.uncheckedCast()
else -> super.getExtension(key)
}
}
override suspend fun readAsyncFile(file: AsyncFile, position: Long): Int { override suspend fun readAsyncFile(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()) val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes())
var readPosition = position var readPosition = position

View File

@ -1,6 +1,7 @@
package cn.tursom.core.buffer.impl package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.buffer.ProxyByteBuffer import cn.tursom.core.buffer.ProxyByteBuffer
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@ -26,6 +27,9 @@ class SplitByteBuffer(
} }
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) {
throw ClosedBufferException("SplitByteBuffer was closed.")
}
return SplitByteBuffer(parent, childCount, agent.slice(position, size, readPosition, writePosition)) return SplitByteBuffer(parent, childCount, agent.slice(position, size, readPosition, writePosition))
} }

View File

@ -1,38 +1,37 @@
package cn.tursom.core.buffer.impl package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ClosedBufferException
import cn.tursom.core.buffer.ProxyByteBuffer import cn.tursom.core.buffer.ProxyByteBuffer
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
class InstantByteBuffer( class InstantByteBuffer(
override val agent: ByteBuffer, override val agent: ByteBuffer,
val pool: MemoryPool, val pool: MemoryPool,
) : ProxyByteBuffer, ByteBuffer by agent { ) : ProxyByteBuffer, ByteBuffer by agent {
override var closed = false override val closed get() = aClosed.get()
private val childCount = AtomicInteger(0) private val childCount = AtomicInteger(0)
private val aClosed = AtomicBoolean(false)
override fun close() { override fun close() {
if (childCount.get() == 0) { if (childCount.get() == 0 && aClosed.compareAndSet(false, true)) {
agent.close() agent.close()
pool.free(this) pool.free(this)
closed = true
} }
} }
override fun closeChild(child: ByteBuffer) { override fun closeChild(child: ByteBuffer) {
if (child is SplitByteBuffer && child.parent == this && childCount.decrementAndGet() == 0) { if (child is SplitByteBuffer && child.parent == this && childCount.decrementAndGet() == 0) {
if (closed) { close()
close()
}
} }
} }
override fun slice(position: Int, size: Int): ByteBuffer {
return SplitByteBuffer(this, childCount, agent.slice(position, size))
}
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) {
throw ClosedBufferException("InstantByteBuffer was closed.")
}
return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition)) return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition))
} }

View File

@ -57,7 +57,7 @@ class PooledByteBuffer(
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer {
if (closed) { if (closed) {
throw ClosedBufferException("PooledByteBuffer has closed.") throw ClosedBufferException("PooledByteBuffer was closed.")
} }
return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition)) return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition))
} }

View File

@ -1,7 +1,7 @@
package cn.tursom.channel package cn.tursom.channel
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
import java.io.Closeable import java.io.Closeable

View File

@ -1,9 +1,9 @@
package cn.tursom.channel package cn.tursom.channel
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.buffer.read import cn.tursom.core.buffer.read
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool

View File

@ -1,7 +1,7 @@
package cn.tursom.datagram.server package cn.tursom.datagram.server
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.read import cn.tursom.core.buffer.read
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.timer.TimerTask import cn.tursom.core.timer.TimerTask