From 228a1137b772cf1dc6f23d9288c74ff08b7aa6a2 Mon Sep 17 00:00:00 2001 From: tursom Date: Sun, 11 Jul 2021 22:24:04 +0800 Subject: [PATCH] update ShutdownHook --- .../kotlin/cn/tursom/core/ShutdownHook.kt | 35 ++++++++-- .../kotlin/cn/tursom/http}/HttpRequest.kt | 53 ++++++++------- .../main/kotlin/cn/tursom/core/AsyncFile.kt | 66 +++++++++++++++---- .../cn/tursom/core/buffer/ByteBuffer.kt | 14 ++-- .../tursom/core/buffer/MultipleByteBuffer.kt | 2 +- .../core/buffer/impl/NettyByteBuffer.kt | 50 ++++++++++++-- 6 files changed, 167 insertions(+), 53 deletions(-) rename ts-core/{ts-buffer/src/main/kotlin/cn/tursom/core => ts-async-http/src/main/kotlin/cn/tursom/http}/HttpRequest.kt (99%) diff --git a/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt b/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt index e78d80f..dedf9ba 100644 --- a/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt +++ b/ts-core/src/main/kotlin/cn/tursom/core/ShutdownHook.kt @@ -1,6 +1,8 @@ package cn.tursom.core import com.sun.org.slf4j.internal.LoggerFactory +import java.lang.ref.Reference +import java.lang.ref.SoftReference import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicInteger @@ -12,15 +14,40 @@ import java.util.concurrent.atomic.AtomicInteger object ShutdownHook { private val logger = LoggerFactory.getLogger(ShutdownHook::class.java) - private val shutdownHooks = ConcurrentLinkedDeque<() -> Unit>() + private val shutdownHooks = ConcurrentLinkedDeque Unit)?>>() private val availableThreadCount = Runtime.getRuntime().availableProcessors() * 2 private val activeThreadCount = AtomicInteger() - fun addHook(hook: () -> Unit): Boolean { + interface Reference { + fun get(): T + } + + class Hook( + private val hook: () -> Unit, + private val reference: Reference<(() -> Unit)?> + ) { + fun cancel() { + shutdownHooks.remove(reference) + } + } + + fun addHook(softReference: Boolean = false, hook: () -> Unit): Hook { if (activeThreadCount.incrementAndGet() <= availableThreadCount) { addWorkThread() } - return shutdownHooks.add(hook) + + val reference = if (softReference) { + object : Reference<(() -> Unit)?> { + private val ref = SoftReference(hook) + override fun get(): (() -> Unit)? = ref.get() + } + } else { + object : Reference<() -> Unit> { + override fun get(): () -> Unit = hook + } + } + shutdownHooks.add(reference) + return Hook(hook, reference) } private fun addWorkThread() { @@ -28,7 +55,7 @@ object ShutdownHook { var hook = shutdownHooks.poll() while (hook != null) { try { - hook() + hook.get()?.invoke() } catch (e: Throwable) { //error("an exception caused on hook", e) } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HttpRequest.kt b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/HttpRequest.kt similarity index 99% rename from ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HttpRequest.kt rename to ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/HttpRequest.kt index c6ec725..55b157c 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HttpRequest.kt +++ b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/HttpRequest.kt @@ -1,4 +1,4 @@ -package cn.tursom.core +package cn.tursom.http import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer @@ -72,6 +72,13 @@ object HttpRequest { return conn } + fun send( + method: String = "GET", + url: String, + headers: Map = defaultHeader, + data: ByteArray? + ) = send(method, url, headers, data?.let { HeapByteBuffer(data) }) + fun getContextStream( method: String = "GET", url: String, @@ -96,13 +103,6 @@ object HttpRequest { return conn.getRealInputStream().readBytes().toString(conn.getCharset()) } - fun send( - method: String = "GET", - url: String, - headers: Map = defaultHeader, - data: ByteArray? - ) = send(method, url, headers, data?.let { HeapByteBuffer(data) }) - fun doGet( url: String, param: String? = null, @@ -132,6 +132,24 @@ object HttpRequest { }, headers) } + fun doPost( + url: String, + data: ByteArray, + headers: Map = defaultHeader + ): String = getContextStr("POST", url, headers, HeapByteBuffer(data)) + + fun doPost( + url: String, + param: Map, + headers: Map = defaultHeader + ): String { + val sb = StringBuilder() + param.forEach { (key, value) -> + sb.append("${URLEncoder.encode(key, "utf-8")}=${URLEncoder.encode(value, "utf-8")}&") + } + if (sb.isNotEmpty()) sb.deleteCharAt(sb.lastIndex) + return doPost(url, sb.toString().toByteArray(), headers) + } fun doHead( url: String, @@ -154,23 +172,4 @@ object HttpRequest { }, headers) } - fun doPost( - url: String, - data: ByteArray, - headers: Map = defaultHeader - ): String = getContextStr("POST", url, headers, HeapByteBuffer(data)) - - fun doPost( - url: String, - param: Map, - headers: Map = defaultHeader - ): String { - val sb = StringBuilder() - param.forEach { (key, value) -> - sb.append("${URLEncoder.encode(key, "utf-8")}=${URLEncoder.encode(value, "utf-8")}&") - } - if (sb.isNotEmpty()) sb.deleteCharAt(sb.lastIndex) - return doPost(url, sb.toString().toByteArray(), headers) - } - } \ No newline at end of file diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt index fd4f0c2..61e02b6 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/AsyncFile.kt @@ -9,17 +9,26 @@ import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths import java.nio.file.StandardOpenOption +import java.util.concurrent.Future import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine -@Suppress("MemberVisibilityCanBePrivate") +@Suppress("MemberVisibilityCanBePrivate", "unused", "DuplicatedCode") class AsyncFile(val path: Path) { constructor(path: String) : this(Paths.get(path)) - private var existsCache = false + interface Writer { + suspend fun writeAndWait(file: AsyncFile, position: Long): Int + } + + interface Reader { + suspend fun read(file: AsyncFile, position: Long): Int + } + + private var existsCache = exists val exists: Boolean get() { @@ -27,21 +36,28 @@ class AsyncFile(val path: Path) { existsCache = exists return exists } + val size get() = if (existsCache || exists) Files.size(path) else 0 + var writePosition: Long = 0 + var readPosition: Long = 0 val writeChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.WRITE) } val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) } - fun write(buffer: ByteBuffer, position: Long = 0) { - create() - buffer.read { writeChannel.write(it, position) } + fun write(buffer: ByteBuffer, position: Long = writePosition): Future { + return buffer.read { + writeChannel.write(it, position) + } } - suspend fun writeAndWait(buffer: ByteBuffer, position: Long = 0): Int { - create() - return suspendCoroutine { cont -> - buffer.read { writeChannel.write(it, position, cont, handler) } + suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int { + val writeSize = buffer.fileWriter?.writeAndWait(this, position) ?: buffer.read { + suspendCoroutine { cont -> + writeChannel.write(it, position, cont, handler) + } } + writePosition += writeSize + return writeSize } fun append(buffer: ByteBuffer, position: Long = size) { @@ -52,13 +68,17 @@ class AsyncFile(val path: Path) { return writeAndWait(buffer, position) } - suspend fun read(buffer: ByteBuffer, position: Long = 0): Int { - return suspendCoroutine { cont -> - buffer.write { readChannel.read(it, position, cont, handler) } + suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int { + val readSize = buffer.fileReader?.read(this, position) ?: buffer.write { + suspendCoroutine { cont -> + readChannel.read(it, position, cont, handler) + } } + readPosition += readSize + return readSize } - fun create() = if (existsCache || !exists) { + fun create() = if (!existsCache || !exists) { Files.createFile(path) existsCache = true true @@ -85,5 +105,25 @@ class AsyncFile(val path: Path) { override fun completed(result: Int, attachment: Continuation) = attachment.resume(result) override fun failed(exc: Throwable, attachment: Continuation) = attachment.resumeWithException(exc) } + + @JvmStatic + val writePositionHandler = object : CompletionHandler { + override fun completed(result: Int, attachment: AsyncFile) { + attachment.writePosition += result + } + + override fun failed(exc: Throwable, attachment: AsyncFile) { + } + } + + @JvmStatic + val readPositionHandler = object : CompletionHandler { + override fun completed(result: Int, attachment: AsyncFile) { + attachment.readPosition += result + } + + override fun failed(exc: Throwable, attachment: AsyncFile) { + } + } } } \ No newline at end of file diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt index 1e499f2..93b24af 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt @@ -1,5 +1,6 @@ package cn.tursom.core.buffer +import cn.tursom.core.AsyncFile import cn.tursom.core.forEachIndex import java.io.Closeable import java.io.IOException @@ -41,6 +42,9 @@ interface ByteBuffer : Closeable { val readable: Int get() = writePosition - readPosition val writeable: Int get() = capacity - writePosition + val isReadable: Boolean get() = readable != 0 + val isWriteable: Boolean get() = writeable != 0 + val hasArray: Boolean val array: ByteArray @@ -52,6 +56,9 @@ interface ByteBuffer : Closeable { val closed: Boolean get() = false val resized: Boolean + val fileReader: AsyncFile.Reader? get() = null + val fileWriter: AsyncFile.Writer? get() = null + override fun close() { } @@ -119,14 +126,14 @@ interface ByteBuffer : Closeable { return readSize } - fun writeTo(os: OutputStream): Int { + fun writeTo(os: OutputStream, buffer: ByteArray? = null): Int { val size = readable if (hasArray) { os.write(array, readOffset, size) readPosition += size reset() } else { - val buffer = ByteArray(1024) + val buffer = buffer ?: ByteArray(1024) read { while (it.remaining() > 0) { it.put(buffer) @@ -282,5 +289,4 @@ interface ByteBuffer : Closeable { readPosition += size return size } -} - +} \ No newline at end of file diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt index abf4455..ec8a10a 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/MultipleByteBuffer.kt @@ -150,7 +150,7 @@ interface MultipleByteBuffer : List, Closeable, ByteBuffer { return write } - override fun writeTo(os: OutputStream): Int { + override fun writeTo(os: OutputStream, buffer: ByteArray?): Int { var write = 0 try { while (true) { diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt index 3e1a48d..6ad15eb 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/NettyByteBuffer.kt @@ -1,9 +1,12 @@ package cn.tursom.core.buffer.impl +import cn.tursom.core.AsyncFile import cn.tursom.core.buffer.ByteBuffer import io.netty.buffer.ByteBuf import java.io.OutputStream import java.nio.ByteOrder +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.suspendCoroutine class NettyByteBuffer( val byteBuf: ByteBuf @@ -32,7 +35,47 @@ class NettyByteBuffer( byteBuf.readerIndex(value) } override val resized: Boolean get() = false - override var closed: Boolean = false + override val closed get() = atomicClosed.get() + override val isReadable get() = byteBuf.isReadable + override val isWriteable get() = byteBuf.isWritable + + private val atomicClosed = AtomicBoolean(false) + + override val fileReader: AsyncFile.Reader = object : AsyncFile.Reader { + override suspend fun read(file: AsyncFile, position: Long): Int { + val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.capacity()) + var readPosition = position + for (nioBuffer in nioBuffers) { + while (nioBuffer.hasRemaining()) { + val readSize = suspendCoroutine { cont -> + file.writeChannel.read(nioBuffer, readPosition, cont, AsyncFile.handler) + } + if (readSize <= 0) break + readPosition += readSize + byteBuf.writerIndex(byteBuf.writerIndex() + readSize) + } + } + return (readPosition - position).toInt() + } + } + + override val fileWriter: AsyncFile.Writer = object : AsyncFile.Writer { + override suspend fun writeAndWait(file: AsyncFile, position: Long): Int { + val nioBuffers = byteBuf.nioBuffers() + var writePosition = position + for (nioBuffer in nioBuffers) { + while (nioBuffer.hasRemaining()) { + val writeSize = suspendCoroutine { cont -> + file.writeChannel.write(nioBuffer, writePosition, cont, AsyncFile.handler) + } + if (writeSize <= 0) break + writePosition += writeSize + byteBuf.readerIndex(byteBuf.readerIndex() + writeSize) + } + } + return (writePosition - position).toInt() + } + } override fun readBuffer(): java.nio.ByteBuffer { return byteBuf.internalNioBuffer(readPosition, readable).slice() @@ -85,7 +128,7 @@ class NettyByteBuffer( return size } - override fun writeTo(os: OutputStream): Int { + override fun writeTo(os: OutputStream, buffer: ByteArray?): Int { val size = readable byteBuf.readBytes(os, size) reset() @@ -135,8 +178,7 @@ class NettyByteBuffer( } override fun close() { - if (closed) { - closed = true + if (atomicClosed.compareAndSet(false, true)) { byteBuf.release() } }