重构Channel增强类

This commit is contained in:
tursom 2020-05-22 16:06:03 +08:00
parent 777a8317bf
commit 94a0a311cb
17 changed files with 911 additions and 439 deletions

View File

@ -1,5 +1,5 @@
dependencies {
implementation project(":")
compile project(":")
// kotlin
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'

View File

@ -3,5 +3,7 @@ package cn.tursom.channel.enhance
import java.io.Closeable
interface EnhanceChannel<Read, Write> : ChannelReader<Read>, ChannelWriter<Write>, Closeable {
val reader: ChannelReader<Read>
val writer: ChannelWriter<Write>
override fun close()
}

View File

@ -0,0 +1,14 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.channel.BufferedAsyncChannel
import cn.tursom.channel.enhance.ChannelReader
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.pool.MemoryPool
class BufferedChannelReaderImpl(
private val socket: BufferedAsyncChannel
) : ChannelReader<ByteBuffer> {
suspend fun read(timeout: Long) = socket.read(socket.pool, timeout)
override suspend fun read(pool: MemoryPool, timeout: Long) = socket.read(pool, timeout)
override fun close() = socket.close()
}

View File

@ -0,0 +1,11 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.channel.AsyncChannel
import cn.tursom.core.buffer.ByteBuffer
class EnhanceChannel<Channel : AsyncChannel>(
val channel: Channel
) : EnhanceChannelImpl<ByteBuffer, ByteBuffer>(
ChannelReaderImpl(channel),
ChannelWriterImpl(channel)
)

View File

@ -0,0 +1,16 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.channel.enhance.ChannelReader
import cn.tursom.channel.enhance.ChannelWriter
import cn.tursom.channel.enhance.EnhanceChannel
import cn.tursom.core.pool.MemoryPool
open class EnhanceChannelImpl<Read, Write>(
override val reader: ChannelReader<Read>,
override val writer: ChannelWriter<Write>
) : EnhanceChannel<Read, Write>, ChannelReader<Read> by reader, ChannelWriter<Write> by writer {
override fun close() {
reader.close()
writer.close()
}
}

View File

@ -6,13 +6,14 @@ import cn.tursom.core.pool.ExpandableMemoryPool
import cn.tursom.core.pool.LongBitSetDirectMemoryPool
import cn.tursom.core.pool.MemoryPool
import cn.tursom.channel.enhance.ChannelReader
import cn.tursom.core.buffer.impl.ListByteBuffer
class LengthFieldBasedFrameReader(
private val prevReader: ChannelReader<ByteBuffer>
) : ChannelReader<List<ByteBuffer>> {
) : ChannelReader<ByteBuffer> {
private var lastRead: ByteBuffer? = null
override suspend fun read(pool: MemoryPool, timeout: Long): List<ByteBuffer> {
override suspend fun read(pool: MemoryPool, timeout: Long): ByteBuffer {
val startTime = CurrentTimeMillisClock.now
val maxSize = prevReader.read(prevPool, timeout).let {
val size = it.getInt()
@ -28,24 +29,26 @@ class LengthFieldBasedFrameReader(
while (readSize < maxSize) {
val buffer = prevReader.read(pool, timeout - (CurrentTimeMillisClock.now - startTime))
readSize += buffer.readable
bufList.add(if (readSize > maxSize) {
lastRead = buffer.slice(
buffer.readPosition + readSize - maxSize,
buffer.capacity - readSize - maxSize,
0,
buffer.capacity - readSize - maxSize
)
buffer.slice(
buffer.readPosition,
readSize - maxSize,
0,
readSize - maxSize
)
} else {
buffer
})
bufList.add(
if (readSize > maxSize) {
lastRead = buffer.slice(
buffer.readPosition + readSize - maxSize,
buffer.capacity - readSize - maxSize,
0,
buffer.capacity - readSize - maxSize
)
buffer.slice(
buffer.readPosition,
readSize - maxSize,
0,
readSize - maxSize
)
} else {
buffer
}
)
}
return bufList
return ListByteBuffer(bufList)
}
override fun close() = prevReader.close()

View File

@ -0,0 +1,13 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.channel.enhance.ChannelReader
import cn.tursom.channel.enhance.ChannelWriter
import cn.tursom.core.buffer.ByteBuffer
class LengthFieldChannel(
reader: ChannelReader<ByteBuffer>,
writer: ChannelWriter<ByteBuffer>
) : EnhanceChannelImpl<ByteBuffer, ByteBuffer>(
LengthFieldBasedFrameReader(reader),
LengthFieldPrependWriter(writer)
)

View File

@ -0,0 +1,19 @@
package cn.tursom.channel.enhance.impl
import cn.tursom.channel.enhance.ChannelReader
import cn.tursom.channel.enhance.ChannelWriter
import cn.tursom.channel.enhance.EnhanceChannel
import cn.tursom.core.buffer.ByteBuffer
class StringChannel(
prevReader: ChannelReader<ByteBuffer>,
prevWriter: ChannelWriter<ByteBuffer>
) : EnhanceChannelImpl<String, String>(
StringReader(prevReader),
StringWriter(prevWriter)
) {
constructor(enhanceChannel: EnhanceChannel<ByteBuffer, ByteBuffer>) : this(
enhanceChannel.reader,
enhanceChannel.writer
)
}

View File

@ -1,5 +1,6 @@
package cn.tursom.datagram.server
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.pool.MemoryPool
@ -39,7 +40,10 @@ class ServerNioDatagram(
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
var write = 0L
buffer.forEach { buf ->
write += buf.read { channel.send(it, remoteAddress) }
if (buf is MultipleByteBuffer) {
} else {
write += buf.read { channel.send(it, remoteAddress) }
}
}
return write
}

View File

@ -8,23 +8,26 @@ import cn.tursom.channel.enhance.ChannelReader
import cn.tursom.channel.enhance.ChannelWriter
class SecurityEnhanceChannel(
val reader: ChannelReader<ByteBuffer>,
val writer: ChannelWriter<ByteArray>,
val preReader: ChannelReader<ByteBuffer>,
val preWriter: ChannelWriter<ByteArray>,
val aes: AES
) : EnhanceChannel<ByteArray, ByteArray> {
override val reader: ChannelReader<ByteArray> get() = this
override val writer: ChannelWriter<ByteArray> get() = this
override fun close() {
reader.close()
writer.close()
preReader.close()
preWriter.close()
}
override suspend fun read(pool: MemoryPool, timeout: Long): ByteArray {
val buffer = reader.read(pool, timeout)
val buffer = preReader.read(pool, timeout)
return aes.decrypt(buffer)
}
override suspend fun write(value: ByteArray) {
writer.write(aes.encrypt(value))
preWriter.write(aes.encrypt(value))
}
override suspend fun flush(timeout: Long): Long = writer.flush()
override suspend fun flush(timeout: Long): Long = preWriter.flush()
}

View File

@ -15,7 +15,7 @@ import kotlinx.coroutines.GlobalScope
*/
open class BuffedNioServer(
port: Int,
private val memoryPool: MemoryPool,
val memoryPool: MemoryPool,
backlog: Int = 50,
coroutineScope: CoroutineScope = GlobalScope,
handler: suspend BufferedAsyncSocket.() -> Unit

File diff suppressed because it is too large Load Diff

View File

@ -34,9 +34,28 @@ inline fun <T> ByteBuffer.write(block: (java.nio.ByteBuffer) -> T): T {
}
}
inline fun <T> MultipleByteBuffer.reads(block: (List<java.nio.ByteBuffer>) -> T): T {
val bufferList = readBuffers()
try {
return block(bufferList)
} finally {
finishRead(bufferList)
}
}
inline fun <T> MultipleByteBuffer.writes(block: (List<java.nio.ByteBuffer>) -> T): T {
val bufferList = writeBuffers()
try {
return block(bufferList)
} finally {
finishWrite(bufferList)
}
}
fun ReadableByteChannel.read(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is ScatteringByteChannel) {
buffer.writeBuffers { read(it) }.toInt()
buffer.writeBuffers { read(it.toTypedArray()) }.toInt()
} else {
buffer.write { read(it) }
}
@ -44,35 +63,75 @@ fun ReadableByteChannel.read(buffer: ByteBuffer): Int {
fun WritableByteChannel.write(buffer: ByteBuffer): Int {
return if (buffer is MultipleByteBuffer && this is GatheringByteChannel) {
buffer.readBuffers { write(it) }.toInt()
buffer.readBuffers { write(it.toTypedArray()) }.toInt()
} else {
buffer.read { write(it) }
}
}
fun ScatteringByteChannel.read(buffer: MultipleByteBuffer): Long {
return buffer.writeBuffers { read(it) }
return buffer.writeBuffers { read(it.toTypedArray()) }
}
fun GatheringByteChannel.write(buffer: MultipleByteBuffer): Long {
return buffer.readBuffers { write(it) }
return buffer.readBuffers { write(it.toTypedArray()) }
}
fun ScatteringByteChannel.read(buffers: Array<out ByteBuffer>): Long {
val bufferArray = Array(buffers.size) { buffers[it].writeBuffer() }
val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach {
if (it is MultipleByteBuffer) {
it.forEach {
bufferList.add(it.writeBuffer())
}
} else {
bufferList.add(it.writeBuffer())
}
}
val bufferArray = bufferList.toTypedArray()
return try {
read(bufferArray)
} finally {
buffers.forEachIndexed { index, byteBuffer -> byteBuffer.finishWrite(bufferArray[index]) }
var index = 0
buffers.forEach {
if (it is MultipleByteBuffer) {
it.forEach {
it.finishWrite(bufferArray[index])
}
} else {
it.finishRead(bufferArray[index])
}
}
index++
}
}
fun GatheringByteChannel.write(buffers: Array<out ByteBuffer>): Long {
val bufferArray = Array(buffers.size) { buffers[it].readBuffer() }
val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach {
if (it is MultipleByteBuffer) {
it.forEach {
bufferList.add(it.readBuffer())
}
} else {
bufferList.add(it.readBuffer())
}
}
val bufferArray = bufferList.toTypedArray()
return try {
write(bufferArray)
} finally {
buffers.forEachIndexed { index, byteBuffer -> byteBuffer.finishRead(bufferArray[index]) }
var index = 0
buffers.forEach {
if (it is MultipleByteBuffer) {
it.forEach {
it.finishRead(bufferArray[index])
}
} else {
it.finishRead(bufferArray[index])
}
}
index++
}
}

View File

@ -2,16 +2,22 @@ package cn.tursom.buffer
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.ListByteBuffer
import cn.tursom.core.forEachIndex
import cn.tursom.core.toBytes
import java.io.Closeable
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import kotlin.math.min
@Suppress("unused")
interface MultipleByteBuffer : List<ByteBuffer>, Closeable {
interface MultipleByteBuffer : List<ByteBuffer>, Closeable, ByteBuffer {
val buffers: Array<out ByteBuffer> get() = toTypedArray()
/**
* 使用读 bufferByteBuffer 实现类有义务维护指针正常推进
*/
fun <T> readBuffers(block: (Array<out java.nio.ByteBuffer>) -> T): T {
fun <T> readBuffers(block: (List<java.nio.ByteBuffer>) -> T): T {
val buffer = readBuffers()
return try {
block(buffer)
@ -23,7 +29,7 @@ interface MultipleByteBuffer : List<ByteBuffer>, Closeable {
/**
* 使用写 bufferByteBuffer 实现类有义务维护指针正常推进
*/
fun <T> writeBuffers(block: (Array<out java.nio.ByteBuffer>) -> T): T {
fun <T> writeBuffers(block: (List<java.nio.ByteBuffer>) -> T): T {
val buffer = writeBuffers()
return try {
block(buffer)
@ -32,20 +38,169 @@ interface MultipleByteBuffer : List<ByteBuffer>, Closeable {
}
}
fun readBuffers(): Array<out java.nio.ByteBuffer> = Array(size) { this[it].readBuffer() }
fun writeBuffers(): Array<out java.nio.ByteBuffer> = Array(size) { this[it].writeBuffer() }
fun finishRead(buffers: Array<out java.nio.ByteBuffer>) = buffers.forEachIndexed { index, byteBuffer ->
this[index].finishRead(byteBuffer)
fun readBuffers(): List<java.nio.ByteBuffer> {
val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach {
if (it is MultipleByteBuffer) {
it.forEach {
bufferList.add(it.readBuffer())
}
} else {
bufferList.add(it.readBuffer())
}
}
return bufferList
}
fun finishWrite(buffers: Array<out java.nio.ByteBuffer>) = buffers.forEachIndexed { index, byteBuffer ->
this[index].finishWrite(byteBuffer)
fun writeBuffers(): List<java.nio.ByteBuffer> {
val bufferList = ArrayList<java.nio.ByteBuffer>()
buffers.forEach {
if (it is MultipleByteBuffer) {
it.forEach {
bufferList.add(it.writeBuffer())
}
} else {
bufferList.add(it.writeBuffer())
}
}
return bufferList
}
fun finishRead(buffers: List<java.nio.ByteBuffer>) {
var index = 0
forEach {
if (it is MultipleByteBuffer) {
it.forEach {
it.finishRead(buffers[index])
index++
}
} else {
it.finishRead(buffers[index])
index++
}
}
}
fun finishWrite(buffers: List<java.nio.ByteBuffer>) {
var index = 0
forEach {
if (it is MultipleByteBuffer) {
it.forEach {
it.finishWrite(buffers[index])
index++
}
} else {
it.finishWrite(buffers[index])
index++
}
}
}
override fun close() = forEach(ByteBuffer::close)
fun slice(offset: Int, size: Int): MultipleByteBuffer = ListByteBuffer(subList(offset, offset + size))
fun fill(byte: Byte) = forEach { it.fill(byte) }
fun clear() = forEach(ByteBuffer::clear)
fun reset() = forEach(ByteBuffer::reset)
override fun slice(offset: Int, size: Int): MultipleByteBuffer = ListByteBuffer(subList(offset, offset + size))
override fun fill(byte: Byte) = forEach { it.fill(byte) }
override fun clear() = forEach(ByteBuffer::clear)
override fun reset() = forEach(ByteBuffer::reset)
override val resized: Boolean get() = false
override val hasArray: Boolean get() = false
override val array: ByteArray
get() = throw UnsupportedOperationException()
override val arrayOffset: Int get() = 0
override val capacity: Int
get() {
var capacity = 0
forEach {
capacity += it.capacity
}
return capacity
}
override fun readBuffer(): java.nio.ByteBuffer = throw UnsupportedOperationException()
override fun writeBuffer(): java.nio.ByteBuffer = throw UnsupportedOperationException()
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer =
throw UnsupportedOperationException()
override fun resize(newSize: Int): Boolean = false
override fun get(): Byte
override fun getChar(): Char = cn.tursom.core.toChar(::get)
override fun getShort(): Short = cn.tursom.core.toShort(::get)
override fun getInt(): Int = cn.tursom.core.toInt(::get)
override fun getLong(): Long = cn.tursom.core.toLong(::get)
override fun getFloat(): Float = cn.tursom.core.toFloat(::get)
override fun getDouble(): Double = cn.tursom.core.toDouble(::get)
override fun getBytes(size: Int): ByteArray {
val buffer = ByteArray(size)
buffer.indices.forEach {
buffer[it] = get()
}
return buffer
}
override fun writeTo(buffer: ByteArray, bufferOffset: Int, size: Int): Int {
var write = 0
try {
repeat(size) {
buffer[bufferOffset + it] = get()
write++
}
} catch (e: Exception) {
}
return write
}
override fun writeTo(os: OutputStream): Int {
var write = 0
try {
while (true) {
os.write(get().toInt())
write++
}
} catch (e: Exception) {
}
return write
}
override fun writeTo(buffer: ByteBuffer): Int {
var write = 0
try {
while (true) {
buffer.put(get().toInt())
write++
}
} catch (e: Exception) {
}
return write
}
override fun put(byte: Byte): Unit
override fun put(char: Char) = char.toBytes { put(it) }
override fun put(short: Short) = short.toBytes { put(it) }
override fun put(int: Int) = int.toBytes { put(it) }
override fun put(long: Long) = long.toBytes { put(it) }
override fun put(float: Float) = float.toBytes { put(it) }
override fun put(double: Double) = double.toBytes { put(it) }
override fun put(byteArray: ByteArray, offset: Int, len: Int): Int {
var write = 0
byteArray.forEachIndex(offset, offset + len) {
put(it)
write++
}
return write
}
override fun put(inputStream: InputStream, size: Int): Int {
var read = 0
try {
put(inputStream.read().toByte())
read++
} catch (e: Exception) {
}
return read
}
override fun split(maxSize: Int): Array<out ByteBuffer> = throw UnsupportedOperationException()
override fun readAllSize(): Int = throw UnsupportedOperationException()
}

View File

@ -4,5 +4,5 @@ import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.MultipleByteBuffer
class ArrayByteBuffer(
override vararg val buffers: ByteBuffer
) : MultipleByteBuffer, List<ByteBuffer> by buffers.asList()
override vararg val buffers: ByteBuffer
) : ListByteBuffer(buffers.asList())

View File

@ -1,8 +0,0 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.MultipleByteBuffer
class ArrayListByteBuffer : MultipleByteBuffer, MutableList<ByteBuffer> by ArrayList() {
override fun clear() = super.clear()
}

View File

@ -1,6 +1,59 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer
class ListByteBuffer(bufferList: List<ByteBuffer>) : MultipleByteBuffer, List<ByteBuffer> by bufferList
open class ListByteBuffer(val bufferList: List<ByteBuffer>) : MultipleByteBuffer, List<ByteBuffer> by bufferList {
private var readOperator = bufferList.firstOrNull()
private var writeOperator = bufferList.firstOrNull()
override val hasArray: Boolean get() = false
override val array: ByteArray
get() = throw UnsupportedOperationException()
override val capacity: Int
get() {
var capacity = 0
bufferList.forEach {
capacity += it.capacity
}
return capacity
}
override val arrayOffset: Int get() = 0
override var writePosition: Int = 0
override var readPosition: Int = 0
override val resized: Boolean get() = false
override fun readBuffer(): java.nio.ByteBuffer = throw UnsupportedOperationException()
override fun writeBuffer(): java.nio.ByteBuffer = throw UnsupportedOperationException()
override fun slice(position: Int, size: Int, readPosition: Int, writePosition: Int): ByteBuffer =
throw UnsupportedOperationException()
override fun resize(newSize: Int): Boolean = throw UnsupportedOperationException()
fun updateRead() {
if (readOperator == null || readOperator!!.readable == 0) {
readOperator = bufferList[readPosition++]
}
}
fun updateWrite() {
if (writeOperator == null || writeOperator!!.readable == 0) {
writeOperator = bufferList[writePosition++]
}
}
override fun get(): Byte {
updateRead()
return readOperator!!.get()
}
override fun put(byte: Byte) {
TODO("Not yet implemented")
}
override fun split(maxSize: Int): Array<out ByteBuffer> {
TODO("Not yet implemented")
}
override fun readAllSize(): Int {
TODO("Not yet implemented")
}
}