diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt index 285fe76..0998df9 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/AsyncSocket.kt @@ -2,31 +2,60 @@ package cn.tursom.socket import cn.tursom.buffer.MultipleByteBuffer import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.read +import cn.tursom.core.buffer.write import cn.tursom.core.pool.MemoryPool import cn.tursom.niothread.NioThread import java.io.Closeable import java.net.SocketException +import java.nio.channels.FileChannel import java.nio.channels.SelectionKey import java.nio.channels.SocketChannel +@Suppress("unused") interface AsyncSocket : Closeable { val open: Boolean val channel: SocketChannel val key: SelectionKey val nioThread: NioThread - suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long - suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long + suspend fun <T> write(timeout: Long, action: () -> T): T + suspend fun <T> read(timeout: Long, action: () -> T): T + + suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long = write(timeout) { channel.write(buffer) } + suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long = 0L): Long = read(timeout) { channel.read(buffer) } suspend fun write(buffer: ByteBuffer, timeout: Long = 0L): Int = write(arrayOf(buffer), timeout).toInt() suspend fun read(buffer: ByteBuffer, timeout: Long = 0L): Int = read(arrayOf(buffer), timeout).toInt() suspend fun write(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = write(buffer.buffers, timeout) suspend fun read(buffer: MultipleByteBuffer, timeout: Long = 0L): Long = read(buffer.buffers, timeout) + suspend fun write( + file: FileChannel, + position: Long, + count: Long, + timeout: Long = 0 + ): Long = write(timeout) { + file.transferTo(position, count, channel) + } + + suspend fun read( + file: FileChannel, + position: Long, + count: Long, + timeout: Long = 0 + ): Long = read(timeout) { + file.transferFrom(channel, position, count) + } + /** * 在有数据读取的时候自动由内存池分配内存 */ @Throws(SocketException::class) - suspend fun read(pool: MemoryPool, timeout: Long = 0L): ByteBuffer + suspend fun read(pool: MemoryPool, timeout: Long = 0L): ByteBuffer = read(timeout) { + val buffer = pool.get() + if (channel.read(buffer) < 0) throw SocketException() + buffer + } override fun close() diff --git a/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt b/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt index 38180c6..03a612f 100644 --- a/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt +++ b/AsyncSocket/src/main/kotlin/cn/tursom/socket/NioSocket.kt @@ -9,6 +9,7 @@ import cn.tursom.core.timer.TimerTask import cn.tursom.core.timer.WheelTimer import cn.tursom.niothread.NioThread import java.net.SocketException +import java.nio.channels.FileChannel import java.nio.channels.SelectionKey import java.nio.channels.SocketChannel import java.util.concurrent.TimeoutException @@ -24,45 +25,48 @@ class NioSocket(override val key: SelectionKey, override val nioThread: NioThrea override val channel: SocketChannel = key.channel() as SocketChannel override val open: Boolean get() = channel.isOpen && key.isValid - override suspend fun read(buffer: ByteBuffer, timeout: Long): Int { - if (buffer.writeable == 0) return emptyBufferCode + override suspend fun <T> write(timeout: Long, action: () -> T): T { + return operate { + waitWrite(timeout) + action() + } + } + + override suspend fun <T> read(timeout: Long, action: () -> T): T { return operate { waitRead(timeout) + action() + } + } + + override suspend fun read(buffer: ByteBuffer, timeout: Long): Int { + if (buffer.writeable == 0) return emptyBufferCode + return write(timeout) { channel.read(buffer) } } override suspend fun read(buffer: Array<out ByteBuffer>, timeout: Long): Long { if (buffer.isEmpty() && buffer.all { it.writeable != 0 }) return emptyBufferLongCode - return operate { - waitRead(timeout) + return read(timeout) { channel.read(buffer) } } override suspend fun write(buffer: ByteBuffer, timeout: Long): Int { if (buffer.readable == 0) return emptyBufferCode - return operate { - waitWrite(timeout) + return write(timeout) { channel.write(buffer) } } override suspend fun write(buffer: Array<out ByteBuffer>, timeout: Long): Long { if (buffer.isEmpty() && buffer.all { it.readable != 0 }) return emptyBufferLongCode - return operate { - waitWrite(timeout) + return write(timeout) { channel.write(buffer) } } - override suspend fun read(pool: MemoryPool, timeout: Long): ByteBuffer = operate { - waitRead(timeout) - val buffer = pool.get() - if (channel.read(buffer) < 0) throw SocketException() - buffer - } - override fun close() { if (channel.isOpen || key.isValid) { nioThread.execute { diff --git a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/RequestParser.kt b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/RequestParser.kt index 8ca05b8..ecc724e 100644 --- a/web/netty-web/src/main/kotlin/cn/tursom/web/netty/RequestParser.kt +++ b/web/netty-web/src/main/kotlin/cn/tursom/web/netty/RequestParser.kt @@ -11,38 +11,38 @@ import java.util.HashMap * HTTP请求参数解析器, 支持GET, POST */ object RequestParser { - fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> { - val method = fullReq.method() - - val paramMap = HashMap<String, List<String>>() - - when { - HttpMethod.GET === method -> try { - // 是GET请求 - val decoder = QueryStringDecoder(fullReq.uri()) - decoder.parameters().entries.forEach { entry -> - paramMap[entry.key] = entry.value - } - } catch (e: Exception) { - } - HttpMethod.POST === method -> try { - // 是POST请求 - val decoder = HttpPostRequestDecoder(fullReq) - decoder.offer(fullReq) - - val paramList = decoder.bodyHttpDatas - - for (param in paramList) { - val data = param as Attribute - if (!paramMap.containsKey(data.name)) { - paramMap[data.name] = ArrayList() - } - (paramMap[data.name] as ArrayList).add(data.value) - } - } catch (e: Exception) { - } - } - - return paramMap - } + fun parse(fullReq: FullHttpRequest): HashMap<String, List<String>> { + val method = fullReq.method() + + val paramMap = HashMap<String, List<String>>() + + when (method) { + HttpMethod.GET -> try { + // 是GET请求 + val decoder = QueryStringDecoder(fullReq.uri()) + decoder.parameters().entries.forEach { entry -> + paramMap[entry.key] = entry.value + } + } catch (e: Exception) { + } + HttpMethod.POST -> try { + // 是POST请求 + val decoder = HttpPostRequestDecoder(fullReq) + decoder.offer(fullReq) + + val paramList = decoder.bodyHttpDatas + + for (param in paramList) { + val data = param as Attribute + if (!paramMap.containsKey(data.name)) { + paramMap[data.name] = ArrayList() + } + (paramMap[data.name] as ArrayList).add(data.value) + } + } catch (e: Exception) { + } + } + + return paramMap + } } \ No newline at end of file