update ShutdownHook

This commit is contained in:
tursom 2021-07-11 22:24:04 +08:00
parent 4ea5450819
commit 228a1137b7
6 changed files with 167 additions and 53 deletions

View File

@ -1,6 +1,8 @@
package cn.tursom.core
import com.sun.org.slf4j.internal.LoggerFactory
import java.lang.ref.Reference
import java.lang.ref.SoftReference
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicInteger
@ -12,15 +14,40 @@ import java.util.concurrent.atomic.AtomicInteger
object ShutdownHook {
private val logger = LoggerFactory.getLogger(ShutdownHook::class.java)
private val shutdownHooks = ConcurrentLinkedDeque<() -> Unit>()
private val shutdownHooks = ConcurrentLinkedDeque<Reference<(() -> Unit)?>>()
private val availableThreadCount = Runtime.getRuntime().availableProcessors() * 2
private val activeThreadCount = AtomicInteger()
fun addHook(hook: () -> Unit): Boolean {
interface Reference<out T> {
fun get(): T
}
class Hook(
private val hook: () -> Unit,
private val reference: Reference<(() -> Unit)?>
) {
fun cancel() {
shutdownHooks.remove(reference)
}
}
fun addHook(softReference: Boolean = false, hook: () -> Unit): Hook {
if (activeThreadCount.incrementAndGet() <= availableThreadCount) {
addWorkThread()
}
return shutdownHooks.add(hook)
val reference = if (softReference) {
object : Reference<(() -> Unit)?> {
private val ref = SoftReference(hook)
override fun get(): (() -> Unit)? = ref.get()
}
} else {
object : Reference<() -> Unit> {
override fun get(): () -> Unit = hook
}
}
shutdownHooks.add(reference)
return Hook(hook, reference)
}
private fun addWorkThread() {
@ -28,7 +55,7 @@ object ShutdownHook {
var hook = shutdownHooks.poll()
while (hook != null) {
try {
hook()
hook.get()?.invoke()
} catch (e: Throwable) {
//error("an exception caused on hook", e)
}

View File

@ -1,4 +1,4 @@
package cn.tursom.core
package cn.tursom.http
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
@ -72,6 +72,13 @@ object HttpRequest {
return conn
}
fun send(
method: String = "GET",
url: String,
headers: Map<String, String> = defaultHeader,
data: ByteArray?
) = send(method, url, headers, data?.let { HeapByteBuffer(data) })
fun getContextStream(
method: String = "GET",
url: String,
@ -96,13 +103,6 @@ object HttpRequest {
return conn.getRealInputStream().readBytes().toString(conn.getCharset())
}
fun send(
method: String = "GET",
url: String,
headers: Map<String, String> = defaultHeader,
data: ByteArray?
) = send(method, url, headers, data?.let { HeapByteBuffer(data) })
fun doGet(
url: String,
param: String? = null,
@ -132,6 +132,24 @@ object HttpRequest {
}, headers)
}
fun doPost(
url: String,
data: ByteArray,
headers: Map<String, String> = defaultHeader
): String = getContextStr("POST", url, headers, HeapByteBuffer(data))
fun doPost(
url: String,
param: Map<String, String>,
headers: Map<String, String> = defaultHeader
): String {
val sb = StringBuilder()
param.forEach { (key, value) ->
sb.append("${URLEncoder.encode(key, "utf-8")}=${URLEncoder.encode(value, "utf-8")}&")
}
if (sb.isNotEmpty()) sb.deleteCharAt(sb.lastIndex)
return doPost(url, sb.toString().toByteArray(), headers)
}
fun doHead(
url: String,
@ -154,23 +172,4 @@ object HttpRequest {
}, headers)
}
fun doPost(
url: String,
data: ByteArray,
headers: Map<String, String> = defaultHeader
): String = getContextStr("POST", url, headers, HeapByteBuffer(data))
fun doPost(
url: String,
param: Map<String, String>,
headers: Map<String, String> = defaultHeader
): String {
val sb = StringBuilder()
param.forEach { (key, value) ->
sb.append("${URLEncoder.encode(key, "utf-8")}=${URLEncoder.encode(value, "utf-8")}&")
}
if (sb.isNotEmpty()) sb.deleteCharAt(sb.lastIndex)
return doPost(url, sb.toString().toByteArray(), headers)
}
}

View File

@ -9,17 +9,26 @@ import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.util.concurrent.Future
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@Suppress("MemberVisibilityCanBePrivate")
@Suppress("MemberVisibilityCanBePrivate", "unused", "DuplicatedCode")
class AsyncFile(val path: Path) {
constructor(path: String) : this(Paths.get(path))
private var existsCache = false
interface Writer {
suspend fun writeAndWait(file: AsyncFile, position: Long): Int
}
interface Reader {
suspend fun read(file: AsyncFile, position: Long): Int
}
private var existsCache = exists
val exists: Boolean
get() {
@ -27,21 +36,28 @@ class AsyncFile(val path: Path) {
existsCache = exists
return exists
}
val size get() = if (existsCache || exists) Files.size(path) else 0
var writePosition: Long = 0
var readPosition: Long = 0
val writeChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.WRITE) }
val readChannel: AsynchronousFileChannel by lazy { AsynchronousFileChannel.open(path, StandardOpenOption.READ) }
fun write(buffer: ByteBuffer, position: Long = 0) {
create()
buffer.read { writeChannel.write(it, position) }
fun write(buffer: ByteBuffer, position: Long = writePosition): Future<Int> {
return buffer.read {
writeChannel.write(it, position)
}
}
suspend fun writeAndWait(buffer: ByteBuffer, position: Long = 0): Int {
create()
return suspendCoroutine { cont ->
buffer.read { writeChannel.write(it, position, cont, handler) }
suspend fun writeAndWait(buffer: ByteBuffer, position: Long = writePosition): Int {
val writeSize = buffer.fileWriter?.writeAndWait(this, position) ?: buffer.read {
suspendCoroutine { cont ->
writeChannel.write(it, position, cont, handler)
}
}
writePosition += writeSize
return writeSize
}
fun append(buffer: ByteBuffer, position: Long = size) {
@ -52,13 +68,17 @@ class AsyncFile(val path: Path) {
return writeAndWait(buffer, position)
}
suspend fun read(buffer: ByteBuffer, position: Long = 0): Int {
return suspendCoroutine { cont ->
buffer.write { readChannel.read(it, position, cont, handler) }
suspend fun read(buffer: ByteBuffer, position: Long = readPosition): Int {
val readSize = buffer.fileReader?.read(this, position) ?: buffer.write {
suspendCoroutine { cont ->
readChannel.read(it, position, cont, handler)
}
}
readPosition += readSize
return readSize
}
fun create() = if (existsCache || !exists) {
fun create() = if (!existsCache || !exists) {
Files.createFile(path)
existsCache = true
true
@ -85,5 +105,25 @@ class AsyncFile(val path: Path) {
override fun completed(result: Int, attachment: Continuation<Int>) = attachment.resume(result)
override fun failed(exc: Throwable, attachment: Continuation<Int>) = attachment.resumeWithException(exc)
}
@JvmStatic
val writePositionHandler = object : CompletionHandler<Int, AsyncFile> {
override fun completed(result: Int, attachment: AsyncFile) {
attachment.writePosition += result
}
override fun failed(exc: Throwable, attachment: AsyncFile) {
}
}
@JvmStatic
val readPositionHandler = object : CompletionHandler<Int, AsyncFile> {
override fun completed(result: Int, attachment: AsyncFile) {
attachment.readPosition += result
}
override fun failed(exc: Throwable, attachment: AsyncFile) {
}
}
}
}

View File

@ -1,5 +1,6 @@
package cn.tursom.core.buffer
import cn.tursom.core.AsyncFile
import cn.tursom.core.forEachIndex
import java.io.Closeable
import java.io.IOException
@ -41,6 +42,9 @@ interface ByteBuffer : Closeable {
val readable: Int get() = writePosition - readPosition
val writeable: Int get() = capacity - writePosition
val isReadable: Boolean get() = readable != 0
val isWriteable: Boolean get() = writeable != 0
val hasArray: Boolean
val array: ByteArray
@ -52,6 +56,9 @@ interface ByteBuffer : Closeable {
val closed: Boolean get() = false
val resized: Boolean
val fileReader: AsyncFile.Reader? get() = null
val fileWriter: AsyncFile.Writer? get() = null
override fun close() {
}
@ -119,14 +126,14 @@ interface ByteBuffer : Closeable {
return readSize
}
fun writeTo(os: OutputStream): Int {
fun writeTo(os: OutputStream, buffer: ByteArray? = null): Int {
val size = readable
if (hasArray) {
os.write(array, readOffset, size)
readPosition += size
reset()
} else {
val buffer = ByteArray(1024)
val buffer = buffer ?: ByteArray(1024)
read {
while (it.remaining() > 0) {
it.put(buffer)
@ -282,5 +289,4 @@ interface ByteBuffer : Closeable {
readPosition += size
return size
}
}
}

View File

@ -150,7 +150,7 @@ interface MultipleByteBuffer : List<ByteBuffer>, Closeable, ByteBuffer {
return write
}
override fun writeTo(os: OutputStream): Int {
override fun writeTo(os: OutputStream, buffer: ByteArray?): Int {
var write = 0
try {
while (true) {

View File

@ -1,9 +1,12 @@
package cn.tursom.core.buffer.impl
import cn.tursom.core.AsyncFile
import cn.tursom.core.buffer.ByteBuffer
import io.netty.buffer.ByteBuf
import java.io.OutputStream
import java.nio.ByteOrder
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.suspendCoroutine
class NettyByteBuffer(
val byteBuf: ByteBuf
@ -32,7 +35,47 @@ class NettyByteBuffer(
byteBuf.readerIndex(value)
}
override val resized: Boolean get() = false
override var closed: Boolean = false
override val closed get() = atomicClosed.get()
override val isReadable get() = byteBuf.isReadable
override val isWriteable get() = byteBuf.isWritable
private val atomicClosed = AtomicBoolean(false)
override val fileReader: AsyncFile.Reader = object : AsyncFile.Reader {
override suspend fun read(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.capacity())
var readPosition = position
for (nioBuffer in nioBuffers) {
while (nioBuffer.hasRemaining()) {
val readSize = suspendCoroutine<Int> { cont ->
file.writeChannel.read(nioBuffer, readPosition, cont, AsyncFile.handler)
}
if (readSize <= 0) break
readPosition += readSize
byteBuf.writerIndex(byteBuf.writerIndex() + readSize)
}
}
return (readPosition - position).toInt()
}
}
override val fileWriter: AsyncFile.Writer = object : AsyncFile.Writer {
override suspend fun writeAndWait(file: AsyncFile, position: Long): Int {
val nioBuffers = byteBuf.nioBuffers()
var writePosition = position
for (nioBuffer in nioBuffers) {
while (nioBuffer.hasRemaining()) {
val writeSize = suspendCoroutine<Int> { cont ->
file.writeChannel.write(nioBuffer, writePosition, cont, AsyncFile.handler)
}
if (writeSize <= 0) break
writePosition += writeSize
byteBuf.readerIndex(byteBuf.readerIndex() + writeSize)
}
}
return (writePosition - position).toInt()
}
}
override fun readBuffer(): java.nio.ByteBuffer {
return byteBuf.internalNioBuffer(readPosition, readable).slice()
@ -85,7 +128,7 @@ class NettyByteBuffer(
return size
}
override fun writeTo(os: OutputStream): Int {
override fun writeTo(os: OutputStream, buffer: ByteArray?): Int {
val size = readable
byteBuf.readBytes(os, size)
reset()
@ -135,8 +178,7 @@ class NettyByteBuffer(
}
override fun close() {
if (closed) {
closed = true
if (atomicClosed.compareAndSet(false, true)) {
byteBuf.release()
}
}