add fastSlowTicker

This commit is contained in:
tursom 2022-04-05 18:12:49 +08:00
parent 5e6edec783
commit b94956cabc

View File

@ -19,7 +19,7 @@ fun CoroutineScope.launchWithCoroutineLocalContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> Unit
block: suspend CoroutineScope.() -> Unit,
): Job {
return launch(context + CoroutineLocalContext(mapBuilder), start, block)
}
@ -29,14 +29,14 @@ fun <T> CoroutineScope.asyncWithCoroutineLocalContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): Deferred<T> {
return async(context + CoroutineLocalContext(mapBuilder), start, block)
}
suspend fun <T> withCoroutineLocalContext(
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): T {
return withContext(CoroutineLocalContext(mapBuilder), block)
}
@ -45,7 +45,7 @@ suspend fun <T> withCoroutineLocalContext(
fun <T> runBlockingWithCoroutineLocalContext(
context: CoroutineContext = EmptyCoroutineContext,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): T {
return runBlocking(context + CoroutineLocalContext(mapBuilder), block)
}
@ -53,7 +53,7 @@ fun <T> runBlockingWithCoroutineLocalContext(
fun CoroutineScope.launchWithCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
block: suspend CoroutineScope.() -> Unit,
): Job {
return launch(context + CoroutineScopeContext(this), start, block)
}
@ -62,7 +62,7 @@ fun CoroutineScope.launchWithCoroutineScopeContext(
fun <T> CoroutineScope.asyncWithCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): Deferred<T> {
return async(context + CoroutineScopeContext(this), start, block)
}
@ -71,7 +71,7 @@ fun CoroutineScope.launchWithCoroutineLocalAndCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> Unit
block: suspend CoroutineScope.() -> Unit,
): Job {
return launch(context + CoroutineLocalContext(mapBuilder) + CoroutineScopeContext(this), start, block)
}
@ -81,7 +81,7 @@ fun <T> CoroutineScope.asyncWithCoroutineLocalAndCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): Deferred<T> {
return async(context + CoroutineLocalContext(mapBuilder) + CoroutineScopeContext(this), start, block)
}
@ -90,7 +90,7 @@ fun <T> CoroutineScope.asyncWithCoroutineLocalAndCoroutineScopeContext(
fun <T> runBlockingWithCoroutineLocalAndCoroutineScopeContext(
context: CoroutineContext = EmptyCoroutineContext,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): T {
return runBlocking(context + CoroutineLocalContext(mapBuilder) + CoroutineScopeContext()) {
CoroutineScopeContext set this
@ -103,7 +103,7 @@ fun CoroutineScope.launchWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> Unit
block: suspend CoroutineScope.() -> Unit,
): Job {
return launch(context + CoroutineLocalContext(mapBuilder), start, block)
}
@ -113,7 +113,7 @@ fun <T> CoroutineScope.asyncWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): Deferred<T> {
return async(context + CoroutineLocalContext(mapBuilder), start, block)
}
@ -122,7 +122,7 @@ fun <T> CoroutineScope.asyncWithEnhanceContext(
fun <T> runBlockingWithEnhanceContext(
context: CoroutineContext = EmptyCoroutineContext,
mapBuilder: () -> MutableMap<CoroutineLocal<*>, Any?> = { HashMap(4) },
block: suspend CoroutineScope.() -> T
block: suspend CoroutineScope.() -> T,
): T {
return runBlocking(context + CoroutineLocalContext(mapBuilder)) {
block()
@ -130,7 +130,7 @@ fun <T> runBlockingWithEnhanceContext(
}
suspend fun <T> runWithCoroutineLocalContext(
block: suspend () -> T
block: suspend () -> T,
): T {
val continuation: Any? = getContinuation()
val coroutineLocalContinuation = if (continuation is Continuation<*>) {
@ -142,7 +142,7 @@ suspend fun <T> runWithCoroutineLocalContext(
}
suspend fun <T> runWithCoroutineLocal(
block: suspend () -> T
block: suspend () -> T,
): T {
if (coroutineContext[CoroutineLocalContext] == null) {
return runWithCoroutineLocalContext(block)
@ -164,7 +164,7 @@ suspend inline fun <T> getTypedContinuation(): Continuation<T> = getContinuation
suspend fun injectCoroutineContext(
coroutineContext: CoroutineContext,
key: CoroutineContext.Key<out CoroutineContext.Element>? = null
key: CoroutineContext.Key<out CoroutineContext.Element>? = null,
): Boolean {
return if (key == null || coroutineContext[key] == null) {
getContinuation().injectCoroutineContext(coroutineContext, key)
@ -182,7 +182,7 @@ suspend fun injectCoroutineLocalContext(coroutineLocalContext: CoroutineLocalCon
}
fun Continuation<*>.injectCoroutineLocalContext(
coroutineLocalContext: CoroutineLocalContext? = null
coroutineLocalContext: CoroutineLocalContext? = null,
): Boolean {
return if (context[CoroutineLocalContext] == null) {
injectCoroutineContext(coroutineLocalContext ?: CoroutineLocalContext(), CoroutineLocalContext)
@ -197,7 +197,7 @@ private val BaseContinuationImplCompletion =
fun Continuation<*>.injectCoroutineContext(
coroutineContext: CoroutineContext,
key: CoroutineContext.Key<out CoroutineContext.Element>? = null
key: CoroutineContext.Key<out CoroutineContext.Element>? = null,
): Boolean {
if (key != null && context[key] != null) return true
if (BaseContinuationImpl.isInstance(this)) {
@ -233,12 +233,12 @@ fun combinedContext(coroutineContext: CoroutineContext): Boolean {
suspend fun <T> runOnUiThread(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
action: suspend CoroutineScope.() -> T
action: suspend CoroutineScope.() -> T,
): T {
return withContext(coroutineContext + Dispatchers.Main, action)
}
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
@OptIn(ExperimentalCoroutinesApi::class)
fun bufferTicker(
delayMillis: Long,
capacity: Int = 16,
@ -260,3 +260,31 @@ fun bufferTicker(
}
}
}
@OptIn(ExperimentalCoroutinesApi::class)
fun fastSlowTicker(
fastTicker: ReceiveChannel<Unit>,
slowTicker: ReceiveChannel<Unit>,
) = GlobalScope.produce {
var failure = 0
while (true) {
var receive = fastTicker.tryReceive()
if (receive.isSuccess) {
failure = 0
send(Unit)
continue
}
failure++
if (failure < 5) {
send(fastTicker.receive())
continue
}
send(slowTicker.receive())
do {
receive = fastTicker.tryReceive()
} while (receive.isSuccess)
failure--
}
}