添加对文件的支持

This commit is contained in:
tursom 2019-11-29 17:49:00 +08:00
parent d587e90024
commit cd43c1dc83
3 changed files with 85 additions and 52 deletions

View File

@ -2,31 +2,60 @@ package cn.tursom.socket
import cn.tursom.buffer.MultipleByteBuffer
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.read
import cn.tursom.core.buffer.write
import cn.tursom.core.pool.MemoryPool
import cn.tursom.niothread.NioThread
import java.io.Closeable
import java.net.SocketException
import java.nio.channels.FileChannel
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
@Suppress("unused")
interface AsyncSocket : Closeable {
val open: Boolean
val channel: SocketChannel
val key: SelectionKey
val nioThread: NioThread
suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long
suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long
suspend fun <T> write(timeout: Long, action: () -> T): T
suspend fun <T> read(timeout: Long, action: () -> T): T
suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long = write(timeout) { channel.write(buffer) }
suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long = read(timeout) { channel.read(buffer) }
suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt()
suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt()
suspend fun write(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = write(buffer.buffers, timeout)
suspend fun read(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = read(buffer.buffers, timeout)
suspend fun write(
file: FileChannel,
position: Long,
count: Long,
timeout: Long = 0
): Long = write(timeout) {
file.transferTo(position, count, channel)
}
suspend fun read(
file: FileChannel,
position: Long,
count: Long,
timeout: Long = 0
): Long = read(timeout) {
file.transferFrom(channel, position, count)
}
/**
* 在有数据读取的时候自动由内存池分配内存
*/
@Throws(SocketException::class)
suspend fun read(pool: MemoryPool, timeout: Long = 0L): ByteBuffer
suspend fun read(pool: MemoryPool, timeout: Long = 0L): ByteBuffer = read(timeout) {
val buffer = pool.get()
if (channel.read(buffer) < 0) throw SocketException()
buffer
}
override fun close()

View File

@ -9,6 +9,7 @@ import cn.tursom.core.timer.TimerTask
import cn.tursom.core.timer.WheelTimer
import cn.tursom.niothread.NioThread
import java.net.SocketException
import java.nio.channels.FileChannel
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import java.util.concurrent.TimeoutException
@ -24,45 +25,48 @@ class NioSocket(override val key: SelectionKey, override val nioThread: NioThrea
override val channel: SocketChannel = key.channel() as SocketChannel
override val open: Boolean get() = channel.isOpen && key.isValid
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int {
if (buffer.writeable == 0) return emptyBufferCode
override suspend fun <T> write(timeout: Long, action: () -> T): T {
return operate {
waitWrite(timeout)
action()
}
}
override suspend fun <T> read(timeout: Long, action: () -> T): T {
return operate {
waitRead(timeout)
action()
}
}
override suspend fun read(buffer: ByteBuffer, timeout: Long): Int {
if (buffer.writeable == 0) return emptyBufferCode
return write(timeout) {
channel.read(buffer)
}
}
override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (buffer.isEmpty() && buffer.all { it.writeable != 0 }) return emptyBufferLongCode
return operate {
waitRead(timeout)
return read(timeout) {
channel.read(buffer)
}
}
override suspend fun write(buffer: ByteBuffer, timeout: Long): Int {
if (buffer.readable == 0) return emptyBufferCode
return operate {
waitWrite(timeout)
return write(timeout) {
channel.write(buffer)
}
}
override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long {
if (buffer.isEmpty() && buffer.all { it.readable != 0 }) return emptyBufferLongCode
return operate {
waitWrite(timeout)
return write(timeout) {
channel.write(buffer)
}
}
override suspend fun read(pool: MemoryPool, timeout: Long): ByteBuffer = operate {
waitRead(timeout)
val buffer = pool.get()
if (channel.read(buffer) < 0) throw SocketException()
buffer
}
override fun close() {
if (channel.isOpen || key.isValid) {
nioThread.execute {

View File

@ -11,38 +11,38 @@ import java.util.HashMap
* HTTP请求参数解析器, 支持GET, POST
*/
object RequestParser {
fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> {
val method = fullReq.method()
val paramMap = HashMap<String, List<String>>()
when {
HttpMethod.GET === method -> try {
// 是GET请求
val decoder = QueryStringDecoder(fullReq.uri())
decoder.parameters().entries.forEach { entry ->
paramMap[entry.key] = entry.value
}
} catch (e: Exception) {
}
HttpMethod.POST === method -> try {
// 是POST请求
val decoder = HttpPostRequestDecoder(fullReq)
decoder.offer(fullReq)
val paramList = decoder.bodyHttpDatas
for (param in paramList) {
val data = param as Attribute
if (!paramMap.containsKey(data.name)) {
paramMap[data.name] = ArrayList()
}
(paramMap[data.name] as ArrayList).add(data.value)
}
} catch (e: Exception) {
}
}
return paramMap
}
fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> {
val method = fullReq.method()
val paramMap = HashMap<String, List<String>>()
when (method) {
HttpMethod.GET -> try {
// 是GET请求
val decoder = QueryStringDecoder(fullReq.uri())
decoder.parameters().entries.forEach { entry ->
paramMap[entry.key] = entry.value
}
} catch (e: Exception) {
}
HttpMethod.POST -> try {
// 是POST请求
val decoder = HttpPostRequestDecoder(fullReq)
decoder.offer(fullReq)
val paramList = decoder.bodyHttpDatas
for (param in paramList) {
val data = param as Attribute
if (!paramMap.containsKey(data.name)) {
paramMap[data.name] = ArrayList()
}
(paramMap[data.name] as ArrayList).add(data.value)
}
} catch (e: Exception) {
}
}
return paramMap
}
}