diff --git a/ts-core/src/main/kotlin/cn/tursom/core/Tools.kt b/ts-core/src/main/kotlin/cn/tursom/core/Tools.kt index 4583190..eaeccec 100644 --- a/ts-core/src/main/kotlin/cn/tursom/core/Tools.kt +++ b/ts-core/src/main/kotlin/cn/tursom/core/Tools.kt @@ -134,6 +134,9 @@ inline fun <T> Any?.cast() = this as T @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") inline fun <T> Any?.uncheckedCast() = this as T +@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") +inline fun <T> Any?.uncheckedCastNullable() = this as? T + @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") inline fun <reified T> Any?.castOrNull() = if (this is T) this else null diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt index 6c1f40b..c1f044f 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt @@ -1,8 +1,8 @@ package cn.tursom.core -import cn.tursom.core.buffer.ByteBuffer -import cn.tursom.core.buffer.read -import cn.tursom.core.buffer.write +import cn.tursom.core.AsyncFile.Reader +import cn.tursom.core.AsyncFile.Writer +import cn.tursom.core.buffer.* import java.nio.channels.AsynchronousFileChannel import java.nio.channels.CompletionHandler import java.nio.file.Files @@ -19,11 +19,69 @@ import kotlin.coroutines.suspendCoroutine class AsyncFile(val path: Path) { constructor(path: String) : this(Paths.get(path)) - interface Writer { + fun interface Writer { + companion object : ByteBufferExtensionKey<Writer> { + override tailrec fun get(buffer: ByteBuffer): Writer? { + return when (buffer) { + is MultipleByteBuffer -> Writer { file, position -> + var writePosition = position + val nioBuffers = buffer.readBuffers().toList() + + run { + nioBuffers.forEach { readBuf -> + while (readBuf.position() < readBuf.limit()) { + val writeSize = file.write(readBuf, writePosition) + if (writeSize > 0) { + writePosition += writeSize + } else { + return@run + } + } + } + } + + buffer.finishRead(nioBuffers.iterator()) + (writePosition - position).toInt() + } + is ProxyByteBuffer -> get(buffer.agent) + else -> null + } + } + } + suspend fun writeAsyncFile(file: AsyncFile, position: Long): Int } - interface Reader { + fun interface Reader { + companion object : ByteBufferExtensionKey<Reader> { + override tailrec fun get(buffer: ByteBuffer): Reader? { + return when (buffer) { + is MultipleByteBuffer -> Reader { file, position -> + var readPosition = position + val nioBuffers = buffer.writeBuffers().toList() + + run { + nioBuffers.forEach { nioBuf -> + while (nioBuf.position() < nioBuf.limit()) { + val readSize = file.read(nioBuf, readPosition) + if (readSize > 0) { + readPosition += readSize + } else { + return@run + } + } + } + } + + buffer.finishWrite(nioBuffers.iterator()) + (readPosition - position).toInt() + } + is ProxyByteBuffer -> get(buffer.agent) + else -> null + } + } + } + suspend fun readAsyncFile(file: AsyncFile, position: Long): Int } @@ -44,33 +102,35 @@ class AsyncFile(val path: Path) { val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) } suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int { - val writeSize = if (buffer is Writer) { - buffer.writeAsyncFile(this, position) - } else buffer.read { - suspendCoroutine { cont -> - writeChannel.write(it, position, cont, handler) + val writeSize = buffer.getExtension(Writer)?.writeAsyncFile(this, position) + ?: buffer.read { + write(it, position) } - } writePosition += writeSize return writeSize } + private suspend fun write(nioBuf: java.nio.ByteBuffer, position: Long): Int = suspendCoroutine { cont -> + writeChannel.write(nioBuf, position, cont, handler) + } + suspend fun appendAndWait(buffer: ByteBuffer, position: Long = size): Int { return writeAndWait(buffer, position) } suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int { - val readSize = if (buffer is Reader) { - buffer.readAsyncFile(this, position) - } else buffer.write { - suspendCoroutine { cont -> - readChannel.read(it, position, cont, handler) + val readSize = buffer.getExtension(Reader)?.readAsyncFile(this, position) + ?: buffer.write { + read(it, position) } - } readPosition += readSize return readSize } + private suspend fun read(nioBuf: java.nio.ByteBuffer, position: Long): Int = suspendCoroutine { cont -> + readChannel.read(nioBuf, position, cont, handler) + } + fun create() = if (!existsCache || !exists) { Files.createFile(path) existsCache = true diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/BuffersArray.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/BuffersArray.kt new file mode 100644 index 0000000..d7a1432 --- /dev/null +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/BuffersArray.kt @@ -0,0 +1 @@ +package cn.tursom.core.buffer diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt index 23e1d19..8a2c3b4 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt @@ -16,6 +16,8 @@ import kotlin.math.min */ @Suppress("unused") interface ByteBuffer : Closeable { + fun <T> getExtension(key: ByteBufferExtensionKey<T>): T? = key.get(this) + /** * 使用读 buffer,ByteBuffer 实现类有义务维护指针正常推进 */ diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt index 365102e..31e32d1 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtension.kt @@ -5,7 +5,6 @@ package cn.tursom.core.buffer -import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.core.buffer.impl.ArrayByteBuffer import cn.tursom.core.toBytes import cn.tursom.core.toInt @@ -87,8 +86,8 @@ fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long { val bufferList = ArrayList<java.nio.ByteBuffer>() buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - bufferList.add(it.writeBuffer()) + it.writeBuffers().forEach { nioBuffer -> + bufferList.add(nioBuffer) } } else { bufferList.add(it.writeBuffer()) @@ -99,13 +98,12 @@ fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long { read(bufferArray) } finally { var index = 0 + val nioBuffers = bufferList.iterator() buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - it.finishWrite(bufferArray[index]) - } + it.finishWrite(nioBuffers) } else { - it.finishWrite(bufferArray[index]) + it.finishWrite(nioBuffers.next()) } } index++ @@ -116,8 +114,8 @@ fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long { val bufferList = ArrayList<java.nio.ByteBuffer>() buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - bufferList.add(it.readBuffer()) + it.readBuffers().forEach { nioBuffer -> + bufferList.add(nioBuffer) } } else { bufferList.add(it.readBuffer()) @@ -128,13 +126,12 @@ fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long { write(bufferArray) } finally { var index = 0 + val iterator = bufferList.iterator() buffers.forEach { if (it is MultipleByteBuffer) { - it.buffers.forEach { - it.finishRead(bufferArray[index]) - } + it.finishRead(iterator) } else { - it.finishRead(bufferArray[index]) + it.finishRead(iterator.next()) } } index++ diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtensionKey.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtensionKey.kt new file mode 100644 index 0000000..d6d3580 --- /dev/null +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBufferExtensionKey.kt @@ -0,0 +1,5 @@ +package cn.tursom.core.buffer + +interface ByteBufferExtensionKey<T> { + fun get(buffer: ByteBuffer): T? = null +} diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt index 5840f76..d32d123 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt @@ -1,6 +1,5 @@ -package cn.tursom.buffer +package cn.tursom.core.buffer -import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.ListByteBuffer import cn.tursom.core.forEachIndex import java.io.Closeable diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt index 9cb0105..984be6c 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt @@ -6,7 +6,7 @@ import cn.tursom.core.buffer.ByteBuffer class HeapByteBuffer( private var buffer: java.nio.ByteBuffer, override var readPosition: Int = 0, - override var writePosition: Int = 0 + override var writePosition: Int = 0, ) : ByteBuffer { constructor(size: Int) : this(java.nio.ByteBuffer.allocate(size)) constructor(string: String) : this(string.toByteArray()) diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt index 3a09b53..23a99d4 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/ListByteBuffer.kt @@ -1,7 +1,7 @@ package cn.tursom.core.buffer.impl -import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.MultipleByteBuffer import java.nio.ByteOrder @Suppress("MemberVisibilityCanBePrivate") diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt index 2791a30..12dfa4a 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt @@ -2,7 +2,9 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.AsyncFile import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.ByteBufferExtensionKey import cn.tursom.core.reference.FreeReference +import cn.tursom.core.uncheckedCast import cn.tursom.log.impl.Slf4jImpl import io.netty.buffer.ByteBuf import java.io.OutputStream @@ -59,6 +61,13 @@ class NettyByteBuffer( private val atomicClosed = AtomicBoolean(false) + override fun <T> getExtension(key: ByteBufferExtensionKey<T>): T? { + return when (key) { + AsyncFile.Reader, AsyncFile.Writer -> this.uncheckedCast() + else -> super.getExtension(key) + } + } + override suspend fun readAsyncFile(file: AsyncFile, position: Long): Int { val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()) var readPosition = position diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt index 740485e..915346b 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/SplitByteBuffer.kt @@ -1,6 +1,7 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.ClosedBufferException import cn.tursom.core.buffer.ProxyByteBuffer import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger @@ -26,6 +27,9 @@ class SplitByteBuffer( } override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { + if (closed) { + throw ClosedBufferException("SplitByteBuffer was closed.") + } return SplitByteBuffer(parent, childCount, agent.slice(position, size, readPosition, writePosition)) } diff --git a/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt b/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt index 3a14782..c43e579 100644 --- a/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt +++ b/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/InstantByteBuffer.kt @@ -1,38 +1,37 @@ package cn.tursom.core.buffer.impl import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.ClosedBufferException import cn.tursom.core.buffer.ProxyByteBuffer import cn.tursom.core.pool.MemoryPool +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger class InstantByteBuffer( override val agent: ByteBuffer, val pool: MemoryPool, ) : ProxyByteBuffer, ByteBuffer by agent { - override var closed = false + override val closed get() = aClosed.get() private val childCount = AtomicInteger(0) + private val aClosed = AtomicBoolean(false) override fun close() { - if (childCount.get() == 0) { + if (childCount.get() == 0 && aClosed.compareAndSet(false, true)) { agent.close() pool.free(this) - closed = true } } override fun closeChild(child: ByteBuffer) { if (child is SplitByteBuffer && child.parent == this && childCount.decrementAndGet() == 0) { - if (closed) { - close() - } + close() } } - override fun slice(position: Int, size: Int): ByteBuffer { - return SplitByteBuffer(this, childCount, agent.slice(position, size)) - } - override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { + if (closed) { + throw ClosedBufferException("InstantByteBuffer was closed.") + } return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition)) } diff --git a/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt b/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt index f6aaaa0..06a0ca5 100644 --- a/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt +++ b/ts-core/ts-pool/src/main/kotlin/cn/tursom/core/buffer/impl/PooledByteBuffer.kt @@ -57,7 +57,7 @@ class PooledByteBuffer( override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer { if (closed) { - throw ClosedBufferException("PooledByteBuffer has closed.") + throw ClosedBufferException("PooledByteBuffer was closed.") } return SplitByteBuffer(this, childCount, agent.slice(position, size, readPosition, writePosition)) } diff --git a/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt b/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt index 3d79c6c..b35ba02 100644 --- a/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt +++ b/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncChannel.kt @@ -1,7 +1,7 @@ package cn.tursom.channel -import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.MultipleByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.pool.MemoryPool import java.io.Closeable diff --git a/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt b/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt index 6fce8d6..e1bb977 100644 --- a/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt +++ b/ts-socket/src/main/kotlin/cn/tursom/channel/AsyncNioChannel.kt @@ -1,9 +1,9 @@ package cn.tursom.channel -import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.MultipleByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.buffer.read import cn.tursom.core.pool.MemoryPool diff --git a/ts-socket/src/main/kotlin/cn/tursom/datagram/server/ServerNioDatagram.kt b/ts-socket/src/main/kotlin/cn/tursom/datagram/server/ServerNioDatagram.kt index faa22f0..f0bf64b 100644 --- a/ts-socket/src/main/kotlin/cn/tursom/datagram/server/ServerNioDatagram.kt +++ b/ts-socket/src/main/kotlin/cn/tursom/datagram/server/ServerNioDatagram.kt @@ -1,7 +1,7 @@ package cn.tursom.datagram.server -import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.MultipleByteBuffer import cn.tursom.core.buffer.read import cn.tursom.core.pool.MemoryPool import cn.tursom.core.timer.TimerTask