This commit is contained in:
tursom 2021-04-11 23:04:57 +08:00
parent 1670296122
commit d45dae40d5
19 changed files with 321 additions and 534 deletions

View File

@ -10,6 +10,7 @@ include("ts-core:ts-delegation")
include("ts-core:ts-clone")
include("ts-core:ts-mail")
include("ts-core:ts-coroutine")
include("ts-core:ts-coroutine:ts-coroutine-lock")
include("ts-core:ts-ws-client")
include("ts-core:ts-yaml")
include("ts-socket")

View File

@ -0,0 +1,31 @@
plugins {
kotlin("jvm")
`maven-publish`
}
dependencies {
api(project(":ts-core"))
}
@kotlin.Suppress("UNCHECKED_CAST")
(rootProject.ext["excludeTest"] as (Project, TaskContainer) -> Unit)(project, tasks)
tasks.register("install") {
finalizedBy(tasks["publishToMavenLocal"])
}
publishing {
publications {
create<MavenPublication>("maven") {
groupId = project.group.toString()
artifactId = project.name
version = project.version.toString()
from(components["java"])
try {
artifact(tasks["sourcesJar"])
} catch (e: Exception) {
}
}
}
}

View File

@ -0,0 +1,59 @@
package cn.tursom.core.coroutine.lock
import kotlinx.coroutines.delay
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
interface AsyncLock {
suspend fun sync(block: suspend () -> Unit)
suspend fun isLock(): Boolean
suspend operator fun <T> invoke(block: suspend () -> T): T
suspend fun AtomicBoolean.lock(delayTime: Long) {
// 如果得不到锁先自旋20次
var maxLoopTime = 20
while (maxLoopTime-- > 0) {
if (compareAndSet(false, true)) return
}
while (!compareAndSet(false, true)) {
delay(delayTime)
}
}
suspend fun AtomicBoolean.release() {
set(false)
}
suspend fun AtomicBoolean.wait(delayTime: Long) {
// 如果得不到锁先自旋20次
var maxLoopTime = 20
while (maxLoopTime-- > 0) {
if (!get()) return
}
while (get()) {
delay(delayTime)
}
}
suspend fun AsyncLock.wait(delayTime: Long) {
// 如果得不到锁先自旋20次
var maxLoopTime = 20
while (maxLoopTime-- > 0) {
if (!isLock()) return
}
while (isLock()) {
delay(delayTime)
}
}
suspend fun AtomicInteger.wait(delayTime: Long) {
// 如果得不到锁先自旋20次
var maxLoopTime = 20
while (maxLoopTime-- > 0) {
if (get() <= 0) return
}
while (get() > 0) {
delay(delayTime)
}
}
}

View File

@ -0,0 +1,22 @@
package cn.tursom.core.coroutine.lock
import java.util.concurrent.atomic.AtomicBoolean
class AsyncLoopLock : AsyncLock {
private val lock = AtomicBoolean(false)
override suspend fun sync(block: suspend () -> Unit) {
invoke(block)
}
override suspend fun isLock(): Boolean = lock.get()
override suspend fun <T> invoke(block: suspend () -> T): T {
while (!lock.compareAndSet(false, true));
try {
return block()
} finally {
lock.set(false)
}
}
}

View File

@ -0,0 +1,47 @@
package cn.tursom.core.coroutine.lock
import java.util.concurrent.atomic.AtomicBoolean
class AsyncMutexLock : AsyncLock {
private val lock = AtomicBoolean(false)
private val waitList = AsyncWaitList()
suspend fun wait() {
var loopTime = 20
while (loopTime-- > 0) if (!lock.get()) return
waitList.wait()
waitList.resume()
}
override suspend fun sync(block: suspend () -> Unit) {
invoke(block)
}
override suspend fun isLock(): Boolean {
return lock.get()
}
override suspend operator fun <T> invoke(block: suspend () -> T): T {
lock.lock()
try {
return block()
} finally {
lock.release()
}
}
private suspend fun AtomicBoolean.lock() {
var loopTime = 20
while (loopTime-- > 0) if (compareAndSet(false, true)) return
waitList.wait()
}
override suspend fun AtomicBoolean.release() {
if (waitList.notEmpty) {
waitList.resume()
} else {
set(false)
}
}
}

View File

@ -0,0 +1,6 @@
package cn.tursom.core.coroutine.lock
interface AsyncRWLock : AsyncLock {
suspend fun <T> doRead(block: suspend () -> T): T
suspend fun <T> doWrite(block: suspend () -> T): T
}

View File

@ -0,0 +1,41 @@
package cn.tursom.core.coroutine.lock
import java.util.concurrent.atomic.AtomicInteger
/**
* 读优化锁
*/
@Suppress("MemberVisibilityCanBePrivate")
class AsyncReadFirstRWLock() : AsyncRWLock {
private val lock = AsyncMutexLock()
private val readNumber = AtomicInteger(0)
private val writeList = AsyncWaitList()
override suspend fun <T> doRead(block: suspend () -> T): T {
readNumber.incrementAndGet()
lock.wait()
try {
return block()
} finally {
readNumber.decrementAndGet()
if (readNumber.get() == 0) writeList.resume()
}
}
override suspend fun <T> doWrite(block: suspend () -> T): T {
return invoke(block)
}
override suspend fun sync(block: suspend () -> Unit) {
invoke(block)
}
override suspend fun <T> invoke(block: suspend () -> T): T {
while (readNumber.get() != 0) writeList.wait()
return lock { block() }
}
override suspend fun isLock(): Boolean {
return lock.isLock()
}
}

View File

@ -0,0 +1,58 @@
package cn.tursom.core.coroutine.lock
import cn.tursom.core.Unsafe.unsafe
import java.io.Closeable
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
@Suppress("MemberVisibilityCanBePrivate")
class AsyncWaitList : Closeable {
val empty: Boolean get() = lockList == null
val notEmpty: Boolean get() = lockList != null
suspend fun wait() = suspendCoroutine<Int> { cont ->
var list = lockList
while (!unsafe.compareAndSwapObject(this, listOffset, list, LockNode(cont, list))) {
list = lockList
}
}
fun resume(): Boolean {
var list = lockList ?: return false
while (!unsafe.compareAndSwapObject(this, listOffset, list, list.next)) {
list = lockList ?: return false
}
list.cont.resume(0)
return true
}
fun resumeAll(): Boolean {
var list: LockNode? = lockList ?: return false
while (!unsafe.compareAndSwapObject(this, listOffset, list, null)) {
list = lockList ?: return false
}
while (list != null) {
list.cont.resume(0)
list = list.next
}
return true
}
override fun close() {
resumeAll()
}
@Volatile
private var lockList: LockNode? = null
//private val listLock = AsyncLoopLock()
private data class LockNode(val cont: Continuation<Int>, val next: LockNode? = null)
companion object {
val listOffset = run {
unsafe.objectFieldOffset(AsyncWaitList::class.java.getDeclaredField("lockList"))
}
}
}

View File

@ -0,0 +1,56 @@
package cn.tursom.core.coroutine.lock
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
@Suppress("MemberVisibilityCanBePrivate")
class AsyncWriteFirstRWLock(val delayTime: Long = 10) : AsyncRWLock {
private val lock = AtomicBoolean(false)
private val readNumber = AtomicInteger(0)
private val writeNumber = AtomicInteger(0)
override suspend fun <T> doWrite(block: suspend () -> T): T {
return invoke(block)
}
override suspend fun <T> doRead(block: suspend () -> T): T {
// 先等待通知锁关闭
writeNumber.wait(delayTime)
// 添加读计数
readNumber.incrementAndGet()
try {
return block()
} finally {
// 减少读计数
readNumber.decrementAndGet()
}
}
override suspend fun sync(block: suspend () -> Unit) {
invoke(block)
}
override suspend fun <T> invoke(block: suspend () -> T): T {
writeNumber.incrementAndGet()
repeat(20) {}
readNumber.wait(delayTime)
lock.lock(delayTime)
try {
return block()
} finally {
lock.release()
writeNumber.decrementAndGet()
}
}
override suspend fun isLock(): Boolean {
return lock.get()
}
}

View File

@ -1,12 +1,8 @@
package cn.tursom.ws
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx.*
import io.netty.util.CharsetUtil
class WebSocketClientChannelHandler(

View File

@ -1,11 +0,0 @@
package cn.tursom.utils.coroutine
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
class CoroutineContextScope(override val coroutineContext: CoroutineContext) : CoroutineScope {
companion object {
suspend fun get() = CoroutineContextScope(coroutineContext)
}
}

View File

@ -1,53 +0,0 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import cn.tursom.core.toHexString
import kotlinx.coroutines.Job
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
open class CoroutineLocal<T> {
open suspend fun get(): T? {
var attach: MutableMap<CoroutineLocal<*>, Any?>? = coroutineContext[CoroutineLocalContext]
if (attach == null) {
if (injectCoroutineLocalContext()) {
attach = coroutineContext[CoroutineLocalContext]
}
if (attach == null) {
val job = coroutineContext[Job] ?: return null
attach = attachMap[job]
}
}
return attach?.get(this)?.cast()
}
open suspend infix fun set(value: T): Boolean {
var attach: MutableMap<CoroutineLocal<*>, Any?>? = coroutineContext[CoroutineLocalContext]
if (attach == null) {
if (injectCoroutineLocalContext()) {
attach = coroutineContext[CoroutineLocalContext]
}
if (attach == null) {
val job = coroutineContext[Job] ?: return false
attach = attachMap[job]
if (attach == null) {
attach = HashMap()
attachMap[job] = attach
job.invokeOnCompletion {
attachMap.remove(job)
}
}
}
}
attach[this] = value
return true
}
override fun toString(): String = "CoroutineLocal@${hashCode().toHexString(false)}"
companion object {
private val attachMap = ConcurrentHashMap<CoroutineContext, MutableMap<CoroutineLocal<*>, Any?>>()
override fun toString(): String = attachMap.toString()
}
}

View File

@ -1,45 +0,0 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlinx.coroutines.ThreadContextElement
import kotlin.coroutines.CoroutineContext
open class CoroutineLocalContext(
private val mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) }
) : CoroutineContext.Element, ThreadContextElement<MutableMap<CoroutineLocal<*>, Any?>>, MutableMap<CoroutineLocal<*>, Any?> {
override val key: CoroutineContext.Key<*> get() = Key
private var map: MutableMap<CoroutineLocal<*>, Any?> = mapBuilder()
override fun toString(): String {
return "CoroutineLocalContext$map"
}
companion object Key : CoroutineContext.Key<CoroutineLocalContext>
override fun restoreThreadContext(context: CoroutineContext, oldState: MutableMap<CoroutineLocal<*>, Any?>) {
map = oldState
}
override fun updateThreadContext(context: CoroutineContext): MutableMap<CoroutineLocal<*>, Any?> {
val oldState = map
map = mapBuilder()
return oldState
}
@JvmName("getTyped")
operator fun <T> get(key: CoroutineLocal<T>): T? = map[key].cast()
operator fun <T> set(key: CoroutineLocal<T>, value: T?): T? = map.put(key, value).cast()
override val size: Int get() = map.size
override fun containsKey(key: CoroutineLocal<*>): Boolean = map.containsKey(key)
override fun containsValue(value: Any?): Boolean = map.containsValue(value)
override fun get(key: CoroutineLocal<*>): Any? = map[key]
override fun isEmpty(): Boolean = map.isEmpty()
override val entries: MutableSet<MutableMap.MutableEntry<CoroutineLocal<*>, Any?>> get() = map.entries
override val keys: MutableSet<CoroutineLocal<*>> get() = map.keys
override val values: MutableCollection<Any?> get() = map.values
override fun clear() = map.clear()
override fun put(key: CoroutineLocal<*>, value: Any?): Any? = map.put(key, value)
override fun putAll(from: Map<out CoroutineLocal<*>, Any?>) = map.putAll(from)
override fun remove(key: CoroutineLocal<*>): Any? = map.remove(key)
}

View File

@ -1,16 +0,0 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
class CoroutineLocalContinuation(
private val completion: Continuation<*>
) : Continuation<Any?> by completion.cast() {
override val context: CoroutineContext = completion.context + if (completion.context[CoroutineLocalContext] == null) {
CoroutineLocalContext()
} else {
EmptyCoroutineContext
}
}

View File

@ -1,37 +0,0 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
class CoroutineScopeContext(
var coroutineScope: CoroutineScope = GlobalScope
) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = Companion
override fun toString(): String = "CoroutineScopeContext(coroutineScope=$coroutineScope)"
companion object : CoroutineContext.Key<CoroutineScopeContext>, CoroutineLocal<CoroutineScope>() {
override suspend fun get(): CoroutineScope = coroutineContext[this]?.coroutineScope ?: super.get()
?: coroutineContext[Job]?.let {
if (it is CoroutineScope) {
it.cast<CoroutineScope>()
} else {
null
}
} ?: CoroutineContextScope(coroutineContext)
override suspend infix fun set(value: CoroutineScope): Boolean {
val coroutineScopeContext = coroutineContext[this]
return if (coroutineScopeContext != null) {
coroutineScopeContext.coroutineScope = value
true
} else {
super.set(value)
}
}
}
}

View File

@ -1,75 +0,0 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.SimpThreadLocal
import cn.tursom.core.cast
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
object CurrentThreadCoroutineScope {
private val eventLoopThreadLocal: SimpThreadLocal<CoroutineDispatcher> = SimpThreadLocal {
newBlockingEventLoop()
}
private suspend fun getCoroutineScope(): CoroutineScope {
val eventLoop = eventLoopThreadLocal.get()
val coroutineScopeContext = CoroutineScopeContext()
val newBlockingCoroutine = newBlockingCoroutine(
coroutineContext + coroutineScopeContext + Dispatchers.Unconfined,
Thread.currentThread(),
eventLoop
)
coroutineScopeContext.coroutineScope = newBlockingCoroutine
return newBlockingCoroutine
}
suspend fun launch(
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val coroutineScope = getCoroutineScope()
//coroutineScope.launch(start = start, block = block)
coroutineScope.start(start, block = block)
return coroutineScope as Job
}
private val EventLoop = Class.forName("kotlinx.coroutines.EventLoop")
private val EventLoopShouldBeProcessedFromContext = EventLoop.methods
.first { it.name == "shouldBeProcessedFromContext" }
.apply { isAccessible = true }
private val BlockingEventLoop = Class.forName("kotlinx.coroutines.BlockingEventLoop")
private val BlockingEventLoopConstructor = BlockingEventLoop
.getConstructor(Thread::class.java)
.apply { isAccessible = true }
private fun newBlockingEventLoop(thread: Thread = Thread.currentThread()): CoroutineDispatcher {
return BlockingEventLoopConstructor.newInstance(thread) as CoroutineDispatcher
}
private val BlockingCoroutine = Class.forName("kotlinx.coroutines.BlockingCoroutine")
private val BlockingCoroutineConstructor = BlockingCoroutine.constructors[0].apply { isAccessible = true }
private val BlockingCoroutineStart = BlockingCoroutine.methods.first { it.name == "start" && it.parameters.size == 3 }.apply { isAccessible = true }
private val BlockingCoroutineJoinBlocking = BlockingCoroutine.methods.first { it.name == "joinBlocking" }.apply { isAccessible = true }
//private val BlockingCoroutineOnCompleted = BlockingCoroutine.methods.first { it.name == "onCompleted" }.apply { isAccessible = true }
private fun newBlockingCoroutine(
coroutineContext: CoroutineContext,
thread: Thread = Thread.currentThread(),
eventLoop: CoroutineDispatcher
): CoroutineScope {
return BlockingCoroutineConstructor.newInstance(coroutineContext, thread, eventLoop).cast()
}
private fun <T> CoroutineScope.start(
start: CoroutineStart = CoroutineStart.DEFAULT,
receiver: CoroutineScope = this,
block: suspend CoroutineScope.() -> T
) {
BlockingCoroutineStart.invoke(this, start, receiver, block)
}
private fun <T> CoroutineScope.joinBlocking(): T {
return BlockingCoroutineJoinBlocking.invoke(this).cast()
}
}

View File

@ -1,13 +0,0 @@
package cn.tursom.utils.coroutine
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.MainCoroutineDispatcher
import kotlinx.coroutines.internal.MainDispatcherFactory
@Suppress("unused")
@InternalCoroutinesApi
class MainCoroutineDispatcherFactory : MainDispatcherFactory {
override val loadPriority: Int = 1
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher = MainDispatcher
}

View File

@ -1,51 +0,0 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.MainCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import java.io.Closeable
import java.lang.reflect.Field
import java.lang.reflect.Modifier
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
import kotlin.coroutines.CoroutineContext
object MainDispatcher : MainCoroutineDispatcher(), Closeable {
private val loaded = AtomicBoolean(false)
private val dispatcher: ExecutorCoroutineDispatcher = Executors.newSingleThreadExecutor {
thread(start = false, block = it::run, name = "MainDispatcher", isDaemon = false)
}.asCoroutineDispatcher()
private var oldDispatcher: MainCoroutineDispatcher? = null
private val mainDispatcherLoader: Class<*> = Class.forName("kotlinx.coroutines.internal.MainDispatcherLoader")
private val dispatcherField: Field = mainDispatcherLoader.getDeclaredField("dispatcher").also { dispatcher ->
dispatcher.isAccessible = true
val mf: Field = Field::class.java.getDeclaredField("modifiers")
mf.isAccessible = true
mf.setInt(dispatcher, dispatcher.modifiers and Modifier.FINAL.inv())
}
fun init() {
if (loaded.compareAndSet(false, true)) {
oldDispatcher = dispatcherField.get(null).cast()
dispatcherField.set(null, this)
}
}
fun resume() {
if (loaded.compareAndSet(true, false) && oldDispatcher != null) {
dispatcherField.set(null, oldDispatcher)
}
}
override val immediate: MainCoroutineDispatcher get() = this
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatcher.dispatch(context, block)
}
override fun close() {
dispatcher.close()
}
}

View File

@ -1,229 +0,0 @@
@file:Suppress("unused")
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import cn.tursom.core.forAllFields
import cn.tursom.core.isInheritanceFrom
import kotlinx.coroutines.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
fun CoroutineScope.launchWithCoroutineLocalContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> Unit
): Job {
return launch(context + CoroutineLocalContext(mapBuilder), start, block)
}
@Suppress("DeferredIsResult")
fun <T> CoroutineScope.asyncWithCoroutineLocalContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): Deferred<T> {
return async(context + CoroutineLocalContext(mapBuilder), start, block)
}
suspend fun <T> withCoroutineLocalContext(
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): T {
return withContext(CoroutineLocalContext(mapBuilder), block)
}
@Throws(InterruptedException::class)
fun <T> runBlockingWithCoroutineLocalContext(
context: CoroutineContext = EmptyCoroutineContext,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): T {
return runBlocking(context + CoroutineLocalContext(mapBuilder), block)
}
fun CoroutineScope.launchWithCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
return launch(context + CoroutineScopeContext(this), start, block)
}
@Suppress("DeferredIsResult")
fun <T> CoroutineScope.asyncWithCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
return async(context + CoroutineScopeContext(this), start, block)
}
fun CoroutineScope.launchWithCoroutineLocalAndCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> Unit
): Job {
return launch(context + CoroutineLocalContext(mapBuilder) + CoroutineScopeContext(this), start, block)
}
@Suppress("DeferredIsResult")
fun <T> CoroutineScope.asyncWithCoroutineLocalAndCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): Deferred<T> {
return async(context + CoroutineLocalContext(mapBuilder) + CoroutineScopeContext(this), start, block)
}
@Throws(InterruptedException::class)
fun <T> runBlockingWithCoroutineLocalAndCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): T {
return runBlocking(context + CoroutineLocalContext(mapBuilder) + CoroutineScopeContext()) {
CoroutineScopeContext set this
block()
}
}
fun CoroutineScope.launchWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> Unit
): Job {
return launch(context + CoroutineLocalContext(mapBuilder), start, block)
}
@Suppress("DeferredIsResult")
fun <T> CoroutineScope.asyncWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): Deferred<T> {
return async(context + CoroutineLocalContext(mapBuilder), start, block)
}
@Throws(InterruptedException::class)
fun <T> runBlockingWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
): T {
return runBlocking(context + CoroutineLocalContext(mapBuilder)) {
block()
}
}
suspend fun <T> runWithCoroutineLocalContext(
block: suspend () -> T
): T {
val continuation: Any? = getContinuation()
val coroutineLocalContinuation = if (continuation is Continuation<*>) {
CoroutineLocalContinuation(continuation.cast())
} else {
return continuation.cast()
}
return (block.cast<(Any?) -> T>()).invoke(coroutineLocalContinuation)
}
suspend fun <T> runWithCoroutineLocal(
block: suspend () -> T
): T {
if (coroutineContext[CoroutineLocalContext] == null) {
return runWithCoroutineLocalContext(block)
}
return block()
}
@Suppress("NOTHING_TO_INLINE")
inline fun getContinuation(continuation: Continuation<*>): Continuation<*> {
return continuation
}
suspend inline fun getContinuation(): Continuation<*> {
val getContinuation: (continuation: Continuation<*>) -> Continuation<*> = ::getContinuation
return (getContinuation.cast<suspend () -> Continuation<*>>()).invoke()
}
suspend fun injectCoroutineContext(
coroutineContext: CoroutineContext,
key: CoroutineContext.Key<out CoroutineContext.Element>? = null
): Boolean {
return if (key == null || coroutineContext[key] == null) {
getContinuation().injectCoroutineContext(coroutineContext, key)
} else {
true
}
}
suspend fun injectCoroutineLocalContext(coroutineLocalContext: CoroutineLocalContext? = null): Boolean {
return if (coroutineContext[CoroutineLocalContext] == null) {
getContinuation().injectCoroutineLocalContext(coroutineLocalContext)
} else {
true
}
}
fun Continuation<*>.injectCoroutineLocalContext(
coroutineLocalContext: CoroutineLocalContext? = null
): Boolean {
return if (context[CoroutineLocalContext] == null) {
injectCoroutineContext(coroutineLocalContext ?: CoroutineLocalContext(), CoroutineLocalContext)
} else {
true
}
}
private val BaseContinuationImpl = Class.forName("kotlin.coroutines.jvm.internal.BaseContinuationImpl")
private val BaseContinuationImplCompletion = BaseContinuationImpl.getDeclaredField("completion").apply { isAccessible = true }
fun Continuation<*>.injectCoroutineContext(
coroutineContext: CoroutineContext,
key: CoroutineContext.Key<out CoroutineContext.Element>? = null
): Boolean {
if (key != null && context[key] != null) return true
if (BaseContinuationImpl.isInstance(this)) {
BaseContinuationImplCompletion.get(this).cast<Continuation<*>>().injectCoroutineContext(coroutineContext, key)
}
combinedContext(context)
if (context[CoroutineLocalContext] != null) return true
javaClass.forAllFields {
if (!it.type.isInheritanceFrom(CoroutineContext::class.java)) {
return@forAllFields
}
it.isAccessible = true
it.set(this, it.get(this).cast<CoroutineContext>() + coroutineContext)
}
return context[CoroutineLocalContext] != null
}
private val combinedContextClass = Class.forName("kotlin.coroutines.CombinedContext")
private val left = combinedContextClass.getDeclaredField("left").apply { isAccessible = true }
fun combinedContext(coroutineContext: CoroutineContext): Boolean {
if (!combinedContextClass.isInstance(coroutineContext)) return false
if (coroutineContext[CoroutineLocalContext] == null) {
val leftObj = left.get(coroutineContext).cast<CoroutineContext>()
left.set(coroutineContext, leftObj + CoroutineLocalContext())
}
return true
}
//fun CoroutineScope.runOnUiThread(action: suspend CoroutineScope.() -> Unit): Job {
// return launch(Dispatchers.Main, block = action)
//}
suspend fun <T> runOnUiThread(coroutineContext: CoroutineContext = EmptyCoroutineContext, action: suspend CoroutineScope.() -> T): T {
return withContext(coroutineContext + Dispatchers.Main, action)
}