add NioBuffers

This commit is contained in:
tursom 2021-07-18 01:16:08 +08:00
parent 278e19511c
commit d403a6192c
10 changed files with 336 additions and 107 deletions

View File

@ -21,11 +21,14 @@ class AsyncFile(val path: Path) {
fun interface Writer { fun interface Writer {
companion object : ByteBufferExtensionKey<Writer> { companion object : ByteBufferExtensionKey<Writer> {
override val extensionClass: Class<Writer> = Writer::class.java
override tailrec fun get(buffer: ByteBuffer): Writer? { override tailrec fun get(buffer: ByteBuffer): Writer? {
return when (buffer) { val sequences = buffer.getExtension(NioBuffers.Sequences)
is MultipleByteBuffer -> Writer { file, position -> return when {
sequences != null -> Writer { file, position ->
var writePosition = position var writePosition = position
val nioBuffers = buffer.readBuffers().toList() val nioBuffers = sequences.readBufferSequence().toList()
run { run {
nioBuffers.forEach { readBuf -> nioBuffers.forEach { readBuf ->
@ -40,10 +43,10 @@ class AsyncFile(val path: Path) {
} }
} }
buffer.finishRead(nioBuffers.iterator()) sequences.finishRead(nioBuffers.iterator())
(writePosition - position).toInt() (writePosition - position).toInt()
} }
is ProxyByteBuffer -> get(buffer.agent) buffer is ProxyByteBuffer -> get(buffer.agent)
else -> null else -> null
} }
} }
@ -54,11 +57,14 @@ class AsyncFile(val path: Path) {
fun interface Reader { fun interface Reader {
companion object : ByteBufferExtensionKey<Reader> { companion object : ByteBufferExtensionKey<Reader> {
override val extensionClass: Class<Reader> = Reader::class.java
override tailrec fun get(buffer: ByteBuffer): Reader? { override tailrec fun get(buffer: ByteBuffer): Reader? {
return when (buffer) { val sequences = buffer.getExtension(NioBuffers.Sequences)
is MultipleByteBuffer -> Reader { file, position -> return when {
sequences != null -> Reader { file, position ->
var readPosition = position var readPosition = position
val nioBuffers = buffer.writeBuffers().toList() val nioBuffers = sequences.writeBufferSequence().toList()
run { run {
nioBuffers.forEach { nioBuf -> nioBuffers.forEach { nioBuf ->
@ -73,10 +79,10 @@ class AsyncFile(val path: Path) {
} }
} }
buffer.finishWrite(nioBuffers.iterator()) sequences.finishWrite(nioBuffers.iterator())
(readPosition - position).toInt() (readPosition - position).toInt()
} }
is ProxyByteBuffer -> get(buffer.agent) buffer is ProxyByteBuffer -> get(buffer.agent)
else -> null else -> null
} }
} }

View File

@ -11,8 +11,7 @@ import java.nio.ByteOrder
import kotlin.math.min import kotlin.math.min
/** /**
* 针对 java nio 的弱智 ByteBuffer 的简单封装 * 针对其他库的字节缓冲的封装
* 支持读写 buffer 分离
*/ */
@Suppress("unused") @Suppress("unused")
interface ByteBuffer : Closeable { interface ByteBuffer : Closeable {

View File

@ -5,7 +5,19 @@
package cn.tursom.core.buffer package cn.tursom.core.buffer
import cn.tursom.core.buffer.NioBuffers.Arrays.Companion.readArrays
import cn.tursom.core.buffer.NioBuffers.Arrays.Companion.writeArrays
import cn.tursom.core.buffer.NioBuffers.Lists.Companion.readLists
import cn.tursom.core.buffer.NioBuffers.Lists.Companion.writeLists
import cn.tursom.core.buffer.NioBuffers.Sequences.Companion.readSequences
import cn.tursom.core.buffer.NioBuffers.Sequences.Companion.writeSequences
import cn.tursom.core.buffer.NioBuffers.finishRead
import cn.tursom.core.buffer.NioBuffers.finishWrite
import cn.tursom.core.buffer.NioBuffers.getReadNioBufferList
import cn.tursom.core.buffer.NioBuffers.getWriteNioBufferList
import cn.tursom.core.buffer.impl.ArrayByteBuffer import cn.tursom.core.buffer.impl.ArrayByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.buffer.impl.ListByteBuffer
import cn.tursom.core.toBytes import cn.tursom.core.toBytes
import cn.tursom.core.toInt import cn.tursom.core.toInt
import java.nio.ByteOrder import java.nio.ByteOrder
@ -39,102 +51,104 @@ inline fun <T> ByteBuffer.write(block: (java.nio.ByteBuffer) -> T): T {
} }
} }
inline fun <T> MultipleByteBuffer.reads(block: (Sequence<java.nio.ByteBuffer>) -> T): T { //inline fun <T> MultipleByteBuffer.reads(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val bufferList = readBuffers() // val bufferList = readBufferSequence()
try { // try {
return block(bufferList) // return block(bufferList)
} finally { // } finally {
finishRead(bufferList) // finishRead(bufferList)
} // }
} //}
//inline fun <T> MultipleByteBuffer.writes(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
inline fun <T> MultipleByteBuffer.writes(block: (Sequence<java.nio.ByteBuffer>) -> T): T { // val bufferList = writeBufferSequence()
val bufferList = writeBuffers() // try {
try { // return block(bufferList)
return block(bufferList) // } finally {
} finally { // finishWrite(bufferList)
finishWrite(bufferList) // }
} //}
}
fun ReadableByteChannel.read(buffer: ByteBuffer): Int { fun ReadableByteChannel.read(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is ScatteringByteChannel) { if (this is ScatteringByteChannel) {
buffer.writeBuffers { read(it.toList().toTypedArray()) }.toInt() val arrays = buffer.getExtension(NioBuffers.Arrays)
} else { if (arrays != null) {
buffer.write { read(it) } return arrays.writeArrays { nioBuffers ->
read(nioBuffers)
}.toInt()
}
val list = buffer.getExtension(NioBuffers.Lists)
if (list != null) {
return list.writeLists { nioBuffers ->
read(nioBuffers.toTypedArray())
}.toInt()
}
val sequences = buffer.getExtension(NioBuffers.Sequences)
if (sequences != null) {
return sequences.writeSequences { nioBuffers ->
read(nioBuffers.toList().toTypedArray())
}.toInt()
}
} }
return buffer.write { read(it) }
} }
fun WritableByteChannel.write(buffer: ByteBuffer): Int { fun WritableByteChannel.write(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is GatheringByteChannel) { if (this is GatheringByteChannel) {
buffer.readBuffers { write(it.toList().toTypedArray()) }.toInt() val arrays = buffer.getExtension(NioBuffers.Arrays)
} else { if (arrays != null) {
buffer.read { write(it) } return arrays.readArrays { nioBuffers ->
} write(nioBuffers)
} }.toInt()
}
fun ScatteringByteChannel.read(buffer: MultipleByteBuffer): Long { val list = buffer.getExtension(NioBuffers.Lists)
return buffer.writeBuffers { read(it.toList().toTypedArray()) } if (list != null) {
} return list.readLists { nioBuffers ->
write(nioBuffers.toTypedArray())
}.toInt()
}
fun GatheringByteChannel.write(buffer: MultipleByteBuffer): Long { val sequences = buffer.getExtension(NioBuffers.Sequences)
return buffer.readBuffers { write(it.toList().toTypedArray()) } if (sequences != null) {
} return sequences.readSequences { nioBuffers ->
write(nioBuffers.toList().toTypedArray())
fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long { }.toInt()
val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach {
if (it is MultipleByteBuffer) {
it.writeBuffers().forEach { nioBuffer ->
bufferList.add(nioBuffer)
}
} else {
bufferList.add(it.writeBuffer())
} }
} }
return buffer.read { write(it) }
}
//fun ScatteringByteChannel.read(buffer: MultipleByteBuffer): Long {
// return buffer.writeBuffers { read(it.toList().toTypedArray()) }
//}
//fun GatheringByteChannel.write(buffer: MultipleByteBuffer): Long {
// return buffer.readBuffers { write(it.toList().toTypedArray()) }
//}
fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long {
val bufferList = buffers.iterator().getWriteNioBufferList()
val bufferArray = bufferList.toTypedArray() val bufferArray = bufferList.toTypedArray()
return try { return try {
read(bufferArray) read(bufferArray)
} finally { } finally {
var index = 0 val iterator = bufferList.iterator()
val nioBuffers = bufferList.iterator() buffers.iterator().finishWrite(iterator)
buffers.forEach {
if (it is MultipleByteBuffer) {
it.finishWrite(nioBuffers)
} else {
it.finishWrite(nioBuffers.next())
}
}
index++
} }
} }
fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long { fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long {
val bufferList = ArrayList<java.nio.ByteBuffer>() val bufferList = buffers.iterator().getReadNioBufferList()
buffers.forEach {
if (it is MultipleByteBuffer) {
it.readBuffers().forEach { nioBuffer ->
bufferList.add(nioBuffer)
}
} else {
bufferList.add(it.readBuffer())
}
}
val bufferArray = bufferList.toTypedArray() val bufferArray = bufferList.toTypedArray()
return try { return try {
write(bufferArray) write(bufferArray)
} finally { } finally {
var index = 0
val iterator = bufferList.iterator() val iterator = bufferList.iterator()
buffers.forEach { buffers.iterator().finishRead(iterator)
if (it is MultipleByteBuffer) {
it.finishRead(iterator)
} else {
it.finishRead(iterator.next())
}
}
index++
} }
} }
@ -218,3 +232,7 @@ fun ByteBuffer.putLongWithSize(l: Long, size: Int, byteOrder: ByteOrder = ByteOr
} }
} }
fun main() {
println(HeapByteBuffer(1).getExtension(NioBuffers.Sequences))
println(ListByteBuffer().getExtension(NioBuffers.Sequences))
}

View File

@ -1,5 +1,13 @@
package cn.tursom.core.buffer package cn.tursom.core.buffer
interface ByteBufferExtensionKey<T> { interface ByteBufferExtensionKey<T> {
fun get(buffer: ByteBuffer): T? = null val extensionClass: Class<T>
fun get(buffer: ByteBuffer): T? = if (extensionClass.isInstance(buffer)) {
@Suppress("UNCHECKED_CAST")
buffer as T
} else {
null
}
} }

View File

@ -8,7 +8,7 @@ import java.io.OutputStream
import java.nio.ByteOrder import java.nio.ByteOrder
@Suppress("unused") @Suppress("unused")
interface MultipleByteBuffer : Closeable, ByteBuffer { interface MultipleByteBuffer : Closeable, ByteBuffer, NioBuffers.Sequences {
val buffers: List<ByteBuffer> get() = listOf(this) val buffers: List<ByteBuffer> get() = listOf(this)
val buffersArray: Array<out ByteBuffer> get() = arrayOf(this) val buffersArray: Array<out ByteBuffer> get() = arrayOf(this)
@ -18,7 +18,7 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
* 使用读 bufferByteBuffer 实现类有义务维护指针正常推进 * 使用读 bufferByteBuffer 实现类有义务维护指针正常推进
*/ */
fun <T> readBuffers(block: (Sequence<java.nio.ByteBuffer>) -> T): T { fun <T> readBuffers(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val buffer = readBuffers() val buffer = readBufferSequence()
return try { return try {
block(buffer) block(buffer)
} finally { } finally {
@ -30,7 +30,7 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
* 使用写 bufferByteBuffer 实现类有义务维护指针正常推进 * 使用写 bufferByteBuffer 实现类有义务维护指针正常推进
*/ */
fun <T> writeBuffers(block: (Sequence<java.nio.ByteBuffer>) -> T): T { fun <T> writeBuffers(block: (Sequence<java.nio.ByteBuffer>) -> T): T {
val buffer = writeBuffers() val buffer = writeBufferSequence()
return try { return try {
block(buffer) block(buffer)
} finally { } finally {
@ -38,20 +38,20 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
} }
} }
fun readBuffers(): Sequence<java.nio.ByteBuffer> = sequence { override fun readBufferSequence(): Sequence<java.nio.ByteBuffer> = sequence {
buffers.forEach { buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
yieldAll(it.readBuffers()) yieldAll(it.readBufferSequence())
} else { } else {
yield(it.readBuffer()) yield(it.readBuffer())
} }
} }
} }
fun writeBuffers(): Sequence<java.nio.ByteBuffer> = sequence { override fun writeBufferSequence(): Sequence<java.nio.ByteBuffer> = sequence {
buffers.forEach { buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
yieldAll(it.writeBuffers()) yieldAll(it.writeBufferSequence())
} else { } else {
yield(it.writeBuffer()) yield(it.writeBuffer())
} }
@ -59,7 +59,7 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
} }
fun finishRead(buffers: Sequence<java.nio.ByteBuffer>) = finishRead(buffers.iterator()) fun finishRead(buffers: Sequence<java.nio.ByteBuffer>) = finishRead(buffers.iterator())
fun finishRead(buffers: Iterator<java.nio.ByteBuffer>) { override fun finishRead(buffers: Iterator<java.nio.ByteBuffer>) {
this.buffers.forEach { this.buffers.forEach {
if (it is MultipleByteBuffer) { if (it is MultipleByteBuffer) {
it.finishRead(buffers) it.finishRead(buffers)
@ -70,7 +70,7 @@ interface MultipleByteBuffer : Closeable, ByteBuffer {
} }
fun finishWrite(buffers: Sequence<java.nio.ByteBuffer>) = finishWrite(buffers.iterator()) fun finishWrite(buffers: Sequence<java.nio.ByteBuffer>) = finishWrite(buffers.iterator())
fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>) { override fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>) {
this.buffers.forEach { subBuf -> this.buffers.forEach { subBuf ->
if (subBuf is MultipleByteBuffer) { if (subBuf is MultipleByteBuffer) {
subBuf.finishWrite(buffers) subBuf.finishWrite(buffers)

View File

@ -0,0 +1,157 @@
package cn.tursom.core.buffer
object NioBuffers {
fun Array<out ByteBuffer>.getReadNioBufferList(): List<java.nio.ByteBuffer> = iterator().getReadNioBufferList()
fun Sequence<ByteBuffer>.getReadNioBufferList(): List<java.nio.ByteBuffer> = iterator().getReadNioBufferList()
fun Iterable<ByteBuffer>.getReadNioBufferList(): List<java.nio.ByteBuffer> = iterator().getReadNioBufferList()
fun Iterator<ByteBuffer>.getReadNioBufferList(): List<java.nio.ByteBuffer> {
val bufferList = ArrayList<java.nio.ByteBuffer>()
forEach {
val nioBuffersList = it.getExtension(Sequences)
if (nioBuffersList != null) {
bufferList.addAll(nioBuffersList.readBufferSequence())
return@forEach
}
bufferList.add(it.readBuffer())
}
return bufferList
}
fun Array<out ByteBuffer>.finishRead(iterator: Iterator<java.nio.ByteBuffer>) = iterator().finishRead(iterator)
fun Sequence<ByteBuffer>.finishRead(iterator: Iterator<java.nio.ByteBuffer>) = iterator().finishRead(iterator)
fun Iterable<ByteBuffer>.finishRead(iterator: Iterator<java.nio.ByteBuffer>) = iterator().finishRead(iterator)
fun Iterator<ByteBuffer>.finishRead(iterator: Iterator<java.nio.ByteBuffer>) {
forEach {
val nioBuffersList = it.getExtension(Sequences)
if (nioBuffersList != null) {
nioBuffersList.finishRead(iterator)
return@forEach
}
it.finishRead(iterator.next())
}
}
fun Array<out ByteBuffer>.getWriteNioBufferList(): List<java.nio.ByteBuffer> = iterator().getWriteNioBufferList()
fun Sequence<ByteBuffer>.getWriteNioBufferList(): List<java.nio.ByteBuffer> = iterator().getWriteNioBufferList()
fun Iterable<ByteBuffer>.getWriteNioBufferList(): List<java.nio.ByteBuffer> = iterator().getWriteNioBufferList()
fun Iterator<ByteBuffer>.getWriteNioBufferList(): List<java.nio.ByteBuffer> {
val bufferList = ArrayList<java.nio.ByteBuffer>()
forEach {
val nioBuffersList = it.getExtension(Sequences)
if (nioBuffersList != null) {
bufferList.addAll(nioBuffersList.writeBufferSequence())
return@forEach
}
bufferList.add(it.writeBuffer())
}
return bufferList
}
fun Array<out ByteBuffer>.finishWrite(iterator: Iterator<java.nio.ByteBuffer>) = iterator().finishWrite(iterator)
fun Sequence<ByteBuffer>.finishWrite(iterator: Iterator<java.nio.ByteBuffer>) = iterator().finishWrite(iterator)
fun Iterable<ByteBuffer>.finishWrite(iterator: Iterator<java.nio.ByteBuffer>) = iterator().finishWrite(iterator)
fun Iterator<ByteBuffer>.finishWrite(iterator: Iterator<java.nio.ByteBuffer>) {
forEach {
val nioBuffersList = it.getExtension(Sequences)
if (nioBuffersList != null) {
nioBuffersList.finishWrite(iterator)
return@forEach
}
it.finishRead(iterator.next())
}
}
interface Sequences {
companion object : ByteBufferExtensionKey<Sequences> {
override val extensionClass: Class<Sequences> = Sequences::class.java
inline fun <T> Sequences.readSequences(action: (nioBuffers: Sequence<java.nio.ByteBuffer>) -> T): T {
val sequence = readBufferSequence()
val ret = action(sequence)
finishRead(sequence.iterator())
return ret
}
inline fun <T> Sequences.writeSequences(action: (nioBuffers: Sequence<java.nio.ByteBuffer>) -> T): T {
val sequence = writeBufferSequence()
val ret = action(sequence)
finishWrite(sequence.iterator())
return ret
}
}
fun readBufferSequence(): Sequence<java.nio.ByteBuffer>
fun finishRead(buffers: Iterator<java.nio.ByteBuffer>)
fun writeBufferSequence(): Sequence<java.nio.ByteBuffer>
fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>)
}
interface Lists : Sequences {
companion object : ByteBufferExtensionKey<Lists> {
override val extensionClass: Class<Lists> = Lists::class.java
inline fun <T> Lists.readLists(action: (nioBuffers: List<java.nio.ByteBuffer>) -> T): T {
val list = readBufferList()
val ret = action(list)
finishRead(list.iterator())
return ret
}
inline fun <T> Lists.writeLists(action: (nioBuffers: List<java.nio.ByteBuffer>) -> T): T {
val list = writeBufferList()
val ret = action(list)
finishWrite(list.iterator())
return ret
}
}
override fun readBufferSequence(): Sequence<java.nio.ByteBuffer> = readBufferList().asSequence()
override fun writeBufferSequence(): Sequence<java.nio.ByteBuffer> = writeBufferList().asSequence()
fun readBufferList(): List<java.nio.ByteBuffer>
override fun finishRead(buffers: Iterator<java.nio.ByteBuffer>)
fun writeBufferList(): List<java.nio.ByteBuffer>
override fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>)
}
interface Arrays : Lists {
companion object : ByteBufferExtensionKey<Arrays> {
override val extensionClass: Class<Arrays> = Arrays::class.java
inline fun <T> Arrays.readArrays(action: (nioBuffers: Array<out java.nio.ByteBuffer>) -> T): T {
val arrayOfByteBuffers = readBufferArray()
val ret = action(arrayOfByteBuffers)
finishRead(arrayOfByteBuffers.iterator())
return ret
}
inline fun <T> Arrays.writeArrays(action: (nioBuffers: Array<out java.nio.ByteBuffer>) -> T): T {
val arrayOfByteBuffers = writeBufferArray()
val ret = action(arrayOfByteBuffers)
finishWrite(arrayOfByteBuffers.iterator())
return ret
}
}
override fun readBufferSequence(): Sequence<java.nio.ByteBuffer> = readBufferArray().asSequence()
override fun writeBufferSequence(): Sequence<java.nio.ByteBuffer> = writeBufferArray().asSequence()
override fun readBufferList(): List<java.nio.ByteBuffer> = readBufferArray().asList()
override fun writeBufferList(): List<java.nio.ByteBuffer> = writeBufferArray().asList()
fun readBufferArray(): Array<out java.nio.ByteBuffer>
override fun finishRead(buffers: Iterator<java.nio.ByteBuffer>)
fun writeBufferArray(): Array<out java.nio.ByteBuffer>
override fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>)
}
}

View File

@ -3,6 +3,7 @@ package cn.tursom.core.buffer.impl
import cn.tursom.core.AsyncFile import cn.tursom.core.AsyncFile
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.ByteBufferExtensionKey import cn.tursom.core.buffer.ByteBufferExtensionKey
import cn.tursom.core.buffer.NioBuffers
import cn.tursom.core.reference.FreeReference import cn.tursom.core.reference.FreeReference
import cn.tursom.core.uncheckedCast import cn.tursom.core.uncheckedCast
import cn.tursom.log.impl.Slf4jImpl import cn.tursom.log.impl.Slf4jImpl
@ -15,7 +16,7 @@ import kotlin.coroutines.suspendCoroutine
class NettyByteBuffer( class NettyByteBuffer(
val byteBuf: ByteBuf, val byteBuf: ByteBuf,
autoClose: Boolean = false, autoClose: Boolean = false,
) : ByteBuffer, AsyncFile.Reader, AsyncFile.Writer { ) : ByteBuffer, AsyncFile.Reader, AsyncFile.Writer, NioBuffers.Arrays {
companion object : Slf4jImpl() companion object : Slf4jImpl()
constructor( constructor(
@ -61,6 +62,11 @@ class NettyByteBuffer(
private val atomicClosed = AtomicBoolean(false) private val atomicClosed = AtomicBoolean(false)
private var nioReadPosition = 0
private var nioReadBuffersNum = 0
private var nioWritePosition = 0
private var nioWriteBuffersNum = 0
override fun <T> getExtension(key: ByteBufferExtensionKey<T>): T? { override fun <T> getExtension(key: ByteBufferExtensionKey<T>): T? {
return when (key) { return when (key) {
AsyncFile.Reader, AsyncFile.Writer -> this.uncheckedCast() AsyncFile.Reader, AsyncFile.Writer -> this.uncheckedCast()
@ -263,6 +269,38 @@ class NettyByteBuffer(
} }
} }
override fun readBufferArray(): Array<out java.nio.ByteBuffer> {
val nioBuffers = byteBuf.nioBuffers()
nioReadPosition = nioBuffers.sumOf { it.position() }
nioReadBuffersNum = nioBuffers.size
return nioBuffers
}
override fun finishRead(buffers: Iterator<java.nio.ByteBuffer>) {
var readPositionResult = 0
repeat(nioReadBuffersNum) {
readPositionResult += buffers.next().position()
}
nioReadBuffersNum = 0
byteBuf.readerIndex(byteBuf.readerIndex() + readPositionResult - nioReadPosition)
}
override fun writeBufferArray(): Array<out java.nio.ByteBuffer> {
val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes())
nioWritePosition = nioBuffers.sumOf { it.position() }
nioWriteBuffersNum = nioBuffers.size
return nioBuffers
}
override fun finishWrite(buffers: Iterator<java.nio.ByteBuffer>) {
var writePositionResult = 0
repeat(nioWriteBuffersNum) {
writePositionResult += buffers.next().position()
}
nioWriteBuffersNum = 0
byteBuf.writerIndex(byteBuf.writerIndex() + writePositionResult - nioWritePosition)
}
override fun toString(): String { override fun toString(): String {
return "Nettyjava.nio.ByteBuffer(byteBuf=$byteBuf)" return "Nettyjava.nio.ByteBuffer(byteBuf=$byteBuf)"
} }

View File

@ -1,7 +1,6 @@
package cn.tursom.channel package cn.tursom.channel
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
import java.io.Closeable import java.io.Closeable
@ -21,8 +20,8 @@ interface AsyncChannel : Closeable {
suspend fun read(buffer: List<ByteBuffer>, timeout: Long = 0L): Long = read(buffer.toTypedArray(), timeout) suspend fun read(buffer: List<ByteBuffer>, timeout: Long = 0L): Long = read(buffer.toTypedArray(), timeout)
suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt() suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt()
suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt() suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt()
suspend fun write(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = write(buffer.buffersArray, timeout) //suspend fun write(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = write(buffer.buffersArray, timeout)
suspend fun read(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = read(buffer.buffersArray, timeout) //suspend fun read(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = read(buffer.buffersArray, timeout)
suspend fun write( suspend fun write(
file: FileChannel, file: FileChannel,

View File

@ -3,7 +3,6 @@ package cn.tursom.channel
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode import cn.tursom.channel.AsyncChannel.Companion.emptyBufferCode
import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode import cn.tursom.channel.AsyncChannel.Companion.emptyBufferLongCode
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.buffer.read import cn.tursom.core.buffer.read
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
@ -26,8 +25,8 @@ interface AsyncNioChannel : AsyncChannel {
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int = write(arrayOf(buffer), timeout).toInt() override suspend fun write(buffer: ByteBuffer, timeout: Long): Int = write(arrayOf(buffer), timeout).toInt()
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int = read(arrayOf(buffer), timeout).toInt() override suspend fun read(buffer: ByteBuffer, timeout: Long): Int = read(arrayOf(buffer), timeout).toInt()
override suspend fun write(buffer: MultipleByteBuffer, timeout: Long): Long = write(buffer.buffers, timeout) //override suspend fun write(buffer: MultipleByteBuffer, timeout: Long): Long = write(buffer.buffers, timeout)
override suspend fun read(buffer: MultipleByteBuffer, timeout: Long): Long = read(buffer.buffers, timeout) //override suspend fun read(buffer: MultipleByteBuffer, timeout: Long): Long = read(buffer.buffers, timeout)
override suspend fun write( override suspend fun write(
file: FileChannel, file: FileChannel,

View File

@ -1,8 +1,8 @@
package cn.tursom.datagram.server package cn.tursom.datagram.server
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.MultipleByteBuffer import cn.tursom.core.buffer.NioBuffers.finishRead
import cn.tursom.core.buffer.read import cn.tursom.core.buffer.NioBuffers.getReadNioBufferList
import cn.tursom.core.pool.MemoryPool import cn.tursom.core.pool.MemoryPool
import cn.tursom.core.timer.TimerTask import cn.tursom.core.timer.TimerTask
import cn.tursom.core.timer.WheelTimer import cn.tursom.core.timer.WheelTimer
@ -38,11 +38,16 @@ class ServerNioDatagram(
} }
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long { override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
val nioBufferList = buffer.getReadNioBufferList()
buffer.finishRead(nioBufferList.iterator())
var write = 0L var write = 0L
buffer.forEach { buf -> nioBufferList.forEach { buf ->
if (buf is MultipleByteBuffer) { if (buf.remaining() != 0) {
} else { val send = channel.send(buf, remoteAddress)
write += buf.read { channel.send(it, remoteAddress) } if (send <= 0) {
return write
}
write += send
} }
} }
return write return write