diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/FlvChecker.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/FlvChecker.kt
deleted file mode 100644
index 8d09527..0000000
--- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/FlvChecker.kt
+++ /dev/null
@@ -1,151 +0,0 @@
-package cn.tursom.core
-
-import cn.tursom.core.stream.InputStream
-import cn.tursom.core.stream.OutputStream
-import java.io.IOException
-
-/**
- * 原理 https://www.cnblogs.com/lidabo/p/9018548.html
- */
-class FlvChecker {
-  // 用于缓冲
-  private val buffer = ByteArray(1024 * 1024 * 16)
-
-  /**
-   * 从头部开始Check, 重新锚定时间戳, 将最后一帧(不管是否完整)去掉
-   *
-   * @param path
-   * @throws IOException
-   */
-// 用于统计时间戳
-  private val lastTimestampRead = intArrayOf(-1, -1)
-  private val lastTimestampWrite = intArrayOf(-1, -1)
-
-  /**
-   * 从头部开始Check, 重新锚定时间戳, 将最后一帧(不管是否完整)去掉
-   */
-  fun check(raf: InputStream, rafNew: OutputStream) {
-    // 用于排除无效尾巴帧
-    // 复制头部
-    raf.read(buffer, 0, 9)
-    rafNew.write(buffer, 0, 9)
-    try {
-      //var remain = 40
-      //var timestamp = 0
-      loop@ while (true) {
-        //remain--
-        // 读取前一个tag size
-        readBytesToInt(raf, 4)
-        // Logger.print("前一个长度为:" + predataSize);
-        // 读取tag
-        // tag 类型
-        when (val tagType = raf.read().toInt()) {
-          8, 9 -> {
-            rafNew.write(buffer, 0, 4)
-            rafNew.write(tagType.toByte())
-            // tag data size 3个字节。表示tag data的长度。从streamd id 后算起。
-            val dataSize = readBytesToInt(raf, 3)
-            rafNew.write(buffer, 0, 3)
-            // 时间戳 3
-            val timestamp = readBytesToInt(raf, 3) + (raf.read().toInt() shl 24)
-            //timestamp += timestampEx
-            dealTimestamp(rafNew, timestamp, tagType - 8)
-            raf.read(buffer, 0, 3 + dataSize)
-            rafNew.write(buffer, 0, 3 + dataSize)
-          }
-          18 -> {
-            // 18 scripts
-            // 如果是scripts脚本,默认为第一个tag,此时将前一个tag Size 置零
-            val zeroTimestamp = byteArrayOf(0, 0, 0, 0)
-            rafNew.write(zeroTimestamp)
-            rafNew.write(tagType.toByte())
-            val dataSize = readBytesToInt(raf, 3)
-            rafNew.write(buffer, 0, 3)
-            raf.read(buffer, 0, 4)
-            val zeros = byteArrayOf(0, 0, 0)
-            rafNew.write(zeros) // 时间戳 0
-            rafNew.write(0) // 时间戳扩展 0
-            raf.read(buffer, 0, 3 + dataSize)
-            rafNew.write(buffer, 0, 3 + dataSize)
-
-          }
-          else -> {
-            break@loop
-          }
-        }
-      }
-    } catch (e: Exception) {
-      e.printStackTrace()
-    }
-  }
-
-  /**
-   * 处理音/视频时间戳
-   *
-   * @param raf
-   * @param timestamp
-   * @throws IOException
-   * @return 是否忽略该tag
-   */
-  private fun dealTimestamp(raf: OutputStream, timestamp: Int, tagType: Int): Boolean {
-    // 如果是首帧
-    if (lastTimestampRead[tagType] == -1) {
-      lastTimestampWrite[tagType] = 0
-    } else if (timestamp >= lastTimestampRead[tagType]) {
-      // 如果时序正常
-      // 间隔十分巨大(1s),那么重新开始即可
-      if (timestamp > lastTimestampRead[tagType] + 1000) {
-        lastTimestampWrite[tagType] += 10
-      } else {
-        lastTimestampWrite[tagType] = timestamp - lastTimestampRead[tagType] + lastTimestampWrite[tagType]
-      }
-    } else {
-      // 如果出现倒序时间戳
-      // 如果间隔不大,那么如实反馈
-      if (lastTimestampRead[tagType] - timestamp < 5 * 1000) {
-        var tmp = timestamp - lastTimestampRead[tagType] + lastTimestampWrite[tagType]
-        tmp = if (tmp > 0) tmp else 1
-        lastTimestampWrite[tagType] = tmp
-      } else {
-        // 间隔十分巨大,那么重新开始即可
-        lastTimestampWrite[tagType] += 10
-      }
-    }
-    lastTimestampRead[tagType] = timestamp
-    // 低于0xffffff部分
-    val lowCurrentTime = lastTimestampWrite[tagType] and 0xffffff
-    raf.write(int2Bytes(lowCurrentTime), 1, 3)
-    // 高于0xffffff部分
-    val highCurrentTime = lastTimestampWrite[tagType] shr 24
-    raf.write(highCurrentTime.toByte())
-    return true
-  }
-
-  /**
-   * @param raf
-   * @param byteLength
-   * @return
-   * @throws IOException
-   */
-  private fun readBytesToInt(raf: InputStream, byteLength: Int): Int {
-    raf.read(buffer, 0, byteLength)
-    return bytes2Int(buffer, byteLength)
-  }
-
-  private fun int2Bytes(value: Int): ByteArray {
-    val byteRet = ByteArray(4)
-    for (i in 0..3) {
-      byteRet[3 - i] = (value shr 8 * i and 0xff).toByte()
-    }
-    return byteRet
-  }
-
-  private fun bytes2Int(bytes: ByteArray, byteLength: Int): Int {
-    var result = 0
-    for (i in 0 until byteLength) {
-      result = result or (bytes[byteLength - 1 - i].toInt() and 0xff shl i * 8)
-      // System.out.printf("%x ",(bytes[i] & 0xff));
-    }
-    return result
-  }
-}
diff --git a/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/ChannelPipeline.kt b/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/ChannelPipeline.kt
new file mode 100644
index 0000000..85d9e89
--- /dev/null
+++ b/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/ChannelPipeline.kt
@@ -0,0 +1,34 @@
+package cn.tursom.channel.enhance
+
+import cn.tursom.core.ShutdownHook
+import cn.tursom.core.pool.HeapMemoryPool
+import cn.tursom.core.pool.MemoryPool
+import kotlinx.coroutines.DelicateCoroutinesApi
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.launch
+import java.io.Closeable
+import kotlin.coroutines.CoroutineContext
+
+@OptIn(DelicateCoroutinesApi::class)
+class ChannelPipeline<V>(
+  context: CoroutineContext,
+  private val reader: ChannelReader<V>,
+  private val writer: ChannelWriter<V>,
+  private val pool: MemoryPool = HeapMemoryPool(),
+) : Closeable {
+  private val job = GlobalScope.launch(context) {
+    while (true) {
+      val buffer = reader.read(pool)
+      writer.write(buffer)
+    }
+  }
+
+  @Suppress("unused")
+  private val hook = ShutdownHook.addHook(softReference = true) {
+    close()
+  }
+
+  override fun close() {
+    job.cancel()
+  }
+}
diff --git a/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/impl/CoroutineContextChannelReader.kt b/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/impl/CoroutineContextChannelReader.kt
new file mode 100644
index 0000000..42bc856
--- /dev/null
+++ b/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/impl/CoroutineContextChannelReader.kt
@@ -0,0 +1,21 @@
+package cn.tursom.channel.enhance.impl
+
+import cn.tursom.channel.enhance.ChannelReader
+import cn.tursom.core.pool.MemoryPool
+import kotlinx.coroutines.withContext
+import kotlin.coroutines.CoroutineContext
+
+class CoroutineContextChannelReader<T>(
+  var context: CoroutineContext,
+  private val prevReader: ChannelReader<T>,
+) : ChannelReader<T> {
+  override suspend fun read(pool: MemoryPool, timeout: Long): T {
+    return withContext(context) {
+      prevReader.read(pool, timeout)
+    }
+  }
+
+  override fun close() {
+    prevReader.close()
+  }
+}
diff --git a/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/impl/CoroutineContextChannelWriter.kt b/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/impl/CoroutineContextChannelWriter.kt
new file mode 100644
index 0000000..d0cbc0b
--- /dev/null
+++ b/ts-socket/src/main/kotlin/cn/tursom/channel/enhance/impl/CoroutineContextChannelWriter.kt
@@ -0,0 +1,45 @@
+package cn.tursom.channel.enhance.impl
+
+import cn.tursom.channel.enhance.ChannelWriter
+import kotlinx.coroutines.withContext
+import kotlin.coroutines.CoroutineContext
+
+class CoroutineContextChannelWriter<T>(
+  var context: CoroutineContext,
+  private val prevWriter: ChannelWriter<T>,
+) : ChannelWriter<T> {
+  override suspend fun write(vararg value: T) {
+    withContext(context) {
+      prevWriter.write(*value)
+    }
+  }
+
+  override suspend fun write(value: Collection<T>) {
+    withContext(context) {
+      prevWriter.write(value)
+    }
+  }
+
+  override suspend fun writeAndFlush(value: T, timeout: Long): Long {
+    return withContext(context) {
+      prevWriter.writeAndFlush(value, timeout)
+    }
+  }
+
+  override suspend fun write(value: T) {
+    withContext(context) {
+      prevWriter.write(value)
+    }
+  }
+
+  override suspend fun flush(timeout: Long): Long {
+    return withContext(context) {
+      prevWriter.flush()
+    }
+  }
+
+  override fun close() {
+    prevWriter.close()
+  }
+}
+