add CurrentThreadCoroutineScope

This commit is contained in:
tursom 2020-07-18 15:45:45 +08:00
parent 9b80eabdf0
commit 78c06598e1
6 changed files with 187 additions and 16 deletions

View File

@ -0,0 +1,11 @@
package cn.tursom.utils.coroutine
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
class CoroutineContextScope(override val coroutineContext: CoroutineContext) : CoroutineScope {
companion object {
suspend fun get() = CoroutineContextScope(coroutineContext)
}
}

View File

@ -3,6 +3,7 @@ package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlinx.coroutines.Job
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
open class CoroutineLocal<T> {
@ -33,7 +34,7 @@ open class CoroutineLocal<T> {
}
companion object {
private val attachMap = ConcurrentHashMap<Job, MutableMap<CoroutineLocal<*>, Any?>>()
private val attachMap = ConcurrentHashMap<CoroutineContext, MutableMap<CoroutineLocal<*>, Any?>>()
override fun toString(): String = attachMap.toString()
}
}

View File

@ -1,7 +1,9 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
@ -9,11 +11,20 @@ class CoroutineScopeContext(
var coroutineScope: CoroutineScope = GlobalScope
) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = Companion
override fun toString(): String = "CoroutineScopeContext(coroutineScope=$coroutineScope)"
companion object : CoroutineContext.Key<CoroutineScopeContext>, CoroutineLocal<CoroutineScope>() {
override suspend fun get(): CoroutineScope = coroutineContext[this]?.coroutineScope ?: super.get() ?: GlobalScope
override suspend fun set(value: CoroutineScope): Boolean {
override suspend fun get(): CoroutineScope = coroutineContext[this]?.coroutineScope ?: super.get()
?: coroutineContext[Job]?.let {
if (it is CoroutineScope) {
it.cast<CoroutineScope>()
} else {
null
}
} ?: CoroutineContextScope(coroutineContext)
override suspend infix fun set(value: CoroutineScope): Boolean {
val coroutineScopeContext = coroutineContext[this]
return if (coroutineScopeContext != null) {
coroutineScopeContext.coroutineScope = value

View File

@ -0,0 +1,63 @@
package cn.tursom.utils.coroutine
import cn.tursom.core.cast
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
object CurrentThreadCoroutineScope {
private val currentThreadCoroutineScopeThreadLocal = ThreadLocal<CoroutineScope>()
private suspend fun getCoroutineScope(): CoroutineScope {
return currentThreadCoroutineScopeThreadLocal.get() ?: kotlin.run {
val eventLoop = newBlockingEventLoop()
val coroutineScope = newBlockingCoroutine(coroutineContext, Thread.currentThread(), eventLoop)
currentThreadCoroutineScopeThreadLocal.set(coroutineScope)
coroutineScope
}
}
suspend fun launch(
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
) {
getCoroutineScope().start(start, block = block)
}
private val BlockingEventLoop = Class.forName("kotlinx.coroutines.BlockingEventLoop")
private val BlockingEventLoopConstructor = BlockingEventLoop
.getConstructor(Thread::class.java)
.apply { isAccessible = true }
private fun newBlockingEventLoop(thread: Thread = Thread.currentThread()): CoroutineDispatcher {
return BlockingEventLoopConstructor.newInstance(thread) as CoroutineDispatcher
}
private val BlockingCoroutine = Class.forName("kotlinx.coroutines.BlockingCoroutine")
private val BlockingCoroutineConstructor = BlockingCoroutine.constructors[0].apply { isAccessible = true }
private val BlockingCoroutineStart = BlockingCoroutine.methods.first { it.name == "start" && it.parameters.size == 3 }.apply { isAccessible = true }
private val BlockingCoroutineJoinBlocking = BlockingCoroutine.methods.first { it.name == "joinBlocking" }.apply { isAccessible = true }
//private val BlockingCoroutineOnCompleted = BlockingCoroutine.methods.first { it.name == "onCompleted" }.apply { isAccessible = true }
private fun newBlockingCoroutine(
coroutineContext: CoroutineContext,
thread: Thread = Thread.currentThread(),
eventLoop: CoroutineDispatcher
): CoroutineScope {
return BlockingCoroutineConstructor.newInstance(coroutineContext, thread, eventLoop).cast()
}
private fun <T> CoroutineScope.start(
start: CoroutineStart = CoroutineStart.DEFAULT,
receiver: CoroutineScope = this,
block: suspend CoroutineScope.() -> T
) {
BlockingCoroutineStart.invoke(this, start, receiver, block)
}
private fun <T> CoroutineScope.joinBlocking(): T {
return BlockingCoroutineJoinBlocking.invoke(this).cast()
}
}

View File

@ -87,3 +87,34 @@ fun <T> runBlockingWithCoroutineLocalAndCoroutineScopeContext(
block()
}
}
fun CoroutineScope.launchWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
map: MutableMap<CoroutineLocal<*>, Any?> = HashMap(4),
block: suspend CoroutineScope.() -> Unit
): Job {
return launch(context + CoroutineLocalContext(map), start, block)
}
@Suppress("DeferredIsResult")
fun <T> CoroutineScope.asyncWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
map: MutableMap<CoroutineLocal<*>, Any?> = HashMap(4),
block: suspend CoroutineScope.() -> T
): Deferred<T> {
return async(context + CoroutineLocalContext(map), start, block)
}
@Throws(InterruptedException::class)
fun <T> runBlockingWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
map: MutableMap<CoroutineLocal<*>, Any?> = HashMap(4),
block: suspend CoroutineScope.() -> T
): T {
return runBlocking(context + CoroutineLocalContext(map)) {
block()
}
}

View File

@ -1,24 +1,78 @@
package cn.tursom.utils.coroutine
import kotlinx.coroutines.delay
import cn.tursom.core.cast
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
val testCoroutineLocal = CoroutineLocal<Int>()
val testCoroutineLocalList = Array(100000) {
CoroutineLocal<Int>()
}.asList()
fun main() = runBlockingWithCoroutineLocalAndCoroutineScopeContext {
suspend fun test() {
println(coroutineContext)
CoroutineScopeContext.get().launchWithCoroutineLocalAndCoroutineScopeContext {
println(coroutineContext)
println(CoroutineScopeContext.get())
println(Thread.currentThread().name)
CoroutineScopeContext.get().launchWithCoroutineLocalAndCoroutineScopeContext {
println(coroutineContext)
println(CoroutineScopeContext.get())
println(Thread.currentThread().name)
}
}.join()
println(coroutineContext[Job] is CoroutineScope)
println(CoroutineScopeContext.get())
println(Thread.currentThread().name)
}
val EventLoop = Class.forName("kotlinx.coroutines.EventLoop")
val AbstractCoroutine = Class.forName("kotlinx.coroutines.AbstractCoroutine")
val BlockingEventLoop = Class.forName("kotlinx.coroutines.BlockingEventLoop")
val BlockingEventLoopConstructor = BlockingEventLoop.getConstructor(Thread::class.java).apply { isAccessible = true }
fun newBlockingEventLoop(thread: Thread = Thread.currentThread()): CoroutineDispatcher {
return BlockingEventLoopConstructor.newInstance(thread) as CoroutineDispatcher
}
val BlockingCoroutine = Class.forName("kotlinx.coroutines.BlockingCoroutine")
val BlockingCoroutineConstructor = BlockingCoroutine.constructors[0].apply { isAccessible = true }
val BlockingCoroutineStart = BlockingCoroutine.methods.first { it.name == "start" && it.parameters.size == 3 }.apply { isAccessible = true }
val BlockingCoroutineJoinBlocking = BlockingCoroutine.methods.first { it.name == "joinBlocking" }.apply { isAccessible = true }
fun newBlockingCoroutine(coroutineContext: CoroutineContext, thread: Thread = Thread.currentThread(), eventLoop: CoroutineDispatcher): CoroutineScope {
return BlockingCoroutineConstructor.newInstance(coroutineContext, thread, eventLoop).cast()
}
fun CoroutineScope.start(start: CoroutineStart = CoroutineStart.DEFAULT, receiver: CoroutineScope = this, block: suspend CoroutineScope.() -> Any?) {
return BlockingCoroutineStart.invoke(this, start, receiver, block).cast()
}
fun CoroutineScope.joinBlocking() {
BlockingCoroutineJoinBlocking.invoke(this)
}
suspend fun main() {
test()
CurrentThreadCoroutineScope.launch {
test()
}
delay(1000)
println(CoroutineLocal)
//runBlockingWithEnhanceContext {
// println(coroutineContext)
// println(coroutineContext[Job] is CoroutineScope)
// println(CoroutineScopeContext.get())
// println(Thread.currentThread().name)
// CoroutineContextScope(coroutineContext).launch {
// println(coroutineContext)
// println(coroutineContext[Job] is CoroutineScope)
// println(CoroutineScopeContext.get())
// println(Thread.currentThread().name)
// }.join()
// //CoroutineScopeContext.get().launchWithEnhanceContext {
// // println(coroutineContext)
// // println(coroutineContext[Job] is CoroutineScope)
// // println(CoroutineScopeContext.get())
// // println(Thread.currentThread().name)
// // CoroutineScopeContext.get().launchWithEnhanceContext {
// // println(coroutineContext)
// // println(coroutineContext[Job] is CoroutineScope)
// // println(CoroutineScopeContext.get())
// // println(Thread.currentThread().name)
// // }
// //}.join()
// delay(1000)
// println(CoroutineLocal)
//}
}