diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt index 495c8af..5073c66 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt @@ -6,32 +6,78 @@ import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame import io.netty.handler.codec.http.websocketx.PingWebSocketFrame import io.netty.handler.codec.http.websocketx.PongWebSocketFrame import io.netty.handler.codec.http.websocketx.TextWebSocketFrame +import kotlin.reflect.KMutableProperty0 +import kotlin.reflect.KMutableProperty1 -@Suppress("unused") +@Suppress("unused", "MemberVisibilityCanBePrivate", "NOTHING_TO_INLINE") open class AbstractWebSocketHandler, H : WebSocketHandler> : WebSocketHandler { - private var onOpen: ((client: T) -> Unit)? = null + inline fun addListener( + p: KMutableProperty0<((A1) -> Unit)?>, + crossinline newHandler: (A1) -> Unit, + ) { + val oldHandler = p.get() + p.set { a1: A1 -> + oldHandler?.invoke(a1) + newHandler(a1) + } + } - fun onOpen(onOpen: ((client: T) -> Unit)) { + inline fun addListener( + p: KMutableProperty1, ((A1) -> Unit)?>, + crossinline newHandler: (A1) -> Unit, + ) { + val oldHandler = p.get(this) + p.set(this) { a1: A1 -> + oldHandler?.invoke(a1) + newHandler(a1) + } + } + + inline fun addListener( + p: KMutableProperty0<((A1, A2) -> Unit)?>, + noinline newHandler: (A1, A2) -> Unit, + ) { + val oldHandler = p.get() + p.set { a1: A1, a2: A2 -> + oldHandler?.invoke(a1, a2) + newHandler(a1, a2) + } + } + + inline fun addListener( + p: KMutableProperty1, ((A1, A2) -> Unit)?>, + crossinline newHandler: (A1, A2) -> Unit, + ) { + val oldHandler = p.get(this) + p.set(this) { a1: A1, a2: A2 -> + oldHandler?.invoke(a1, a2) + newHandler(a1, a2) + } + } + + var onOpen: ((client: T) -> Unit)? = null + + fun onOpen(onOpen: ((client: T) -> Unit)?) { this.onOpen = onOpen } override fun onOpen(client: T) { - onOpen?.also { it(client) } ?: super.onOpen(client) + onOpen?.invoke(client) } - private var onClose: ((client: T) -> Unit)? = null + var onClose: ((client: T) -> Unit)? = null - fun onClose(onClose: ((client: T) -> Unit)) { + fun onClose(onClose: ((client: T) -> Unit)?) { this.onClose = onClose } override fun onClose(client: T) { - onClose?.also { it(client) } ?: super.onClose(client) + onClose?.invoke(client) } - private var onError: ((client: T, e: Throwable) -> Unit)? = null + var onError: ((client: T, e: Throwable) -> Unit)? = null - fun onError(onError: ((client: T, e: Throwable) -> Unit)) { + fun onError(onError: ((client: T, e: Throwable) -> Unit)?) { this.onError = onError } @@ -39,7 +85,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand onError?.also { it(client, e) } ?: super.onError(client, e) } - private var readMessage1: ((client: T, msg: String) -> Unit)? = null + var readMessage1: ((client: T, msg: String) -> Unit)? = null @JvmName("readMessage1") fun readMessage(readMessage: (client: T, msg: String) -> Unit) { @@ -50,7 +96,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readMessage1?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage2: ((client: T, msg: TextWebSocketFrame) -> Unit)? = null + var readMessage2: ((client: T, msg: TextWebSocketFrame) -> Unit)? = null @JvmName("readMessage2") fun readMessage(readMessage: (client: T, msg: TextWebSocketFrame) -> Unit) { @@ -61,7 +107,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readMessage2?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage3: ((client: T, msg: ByteArray) -> Unit)? = null + var readMessage3: ((client: T, msg: ByteArray) -> Unit)? = null @JvmName("readMessage3") fun readMessage(readMessage: (client: T, msg: ByteArray) -> Unit) { @@ -72,7 +118,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readMessage3?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage4: ((client: T, msg: ByteBuf) -> Unit)? = null + var readMessage4: ((client: T, msg: ByteBuf) -> Unit)? = null @JvmName("readMessage4") fun readMessage(readMessage: (client: T, msg: ByteBuf) -> Unit) { @@ -83,7 +129,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readMessage4?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage5: ((client: T, msg: ByteBuffer) -> Unit)? = null + var readMessage5: ((client: T, msg: ByteBuffer) -> Unit)? = null @JvmName("readMessage5") fun readMessage(readMessage: (client: T, msg: ByteBuffer) -> Unit) { @@ -94,7 +140,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readMessage5?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage6: ((client: T, msg: BinaryWebSocketFrame) -> Unit)? = null + var readMessage6: ((client: T, msg: BinaryWebSocketFrame) -> Unit)? = null @JvmName("readMessage6") fun readMessage(readMessage: (client: T, msg: BinaryWebSocketFrame) -> Unit) { @@ -105,7 +151,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readMessage6?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readPing1: ((client: T, msg: PingWebSocketFrame) -> Unit)? = null + var readPing1: ((client: T, msg: PingWebSocketFrame) -> Unit)? = null @JvmName("readPing1") fun readPing(readMessage: (client: T, msg: PingWebSocketFrame) -> Unit) { @@ -116,7 +162,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPing1?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing2: ((client: T, msg: ByteBuf) -> Unit)? = null + var readPing2: ((client: T, msg: ByteBuf) -> Unit)? = null @JvmName("readPing2") fun readPing(readMessage: (client: T, msg: ByteBuf) -> Unit) { @@ -127,7 +173,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPing2?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing3: ((client: T, msg: ByteBuffer) -> Unit)? = null + var readPing3: ((client: T, msg: ByteBuffer) -> Unit)? = null @JvmName("readPing3") fun readPing(readMessage: (client: T, msg: ByteBuffer) -> Unit) { @@ -138,7 +184,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPing3?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing4: ((client: T, msg: ByteArray) -> Unit)? = null + var readPing4: ((client: T, msg: ByteArray) -> Unit)? = null @JvmName("readPing4") fun readPing(readMessage: (client: T, msg: ByteArray) -> Unit) { @@ -149,7 +195,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPing4?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing5: ((client: T, msg: String) -> Unit)? = null + var readPing5: ((client: T, msg: String) -> Unit)? = null @JvmName("readPing5") fun readPing(readMessage: (client: T, msg: String) -> Unit) { @@ -160,7 +206,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPing5?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPong1: ((client: T, msg: PongWebSocketFrame) -> Unit)? = null + var readPong1: ((client: T, msg: PongWebSocketFrame) -> Unit)? = null @JvmName("readPong1") fun readPong(readMessage: (client: T, msg: PongWebSocketFrame) -> Unit) { @@ -171,7 +217,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPong1?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong2: ((client: T, msg: ByteBuf) -> Unit)? = null + var readPong2: ((client: T, msg: ByteBuf) -> Unit)? = null @JvmName("readPong2") fun readPong(readMessage: (client: T, msg: ByteBuf) -> Unit) { @@ -182,7 +228,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPong2?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong3: ((client: T, msg: ByteBuffer) -> Unit)? = null + var readPong3: ((client: T, msg: ByteBuffer) -> Unit)? = null @JvmName("readPong3") fun readPong(readMessage: (client: T, msg: ByteBuffer) -> Unit) { @@ -193,7 +239,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPong3?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong4: ((client: T, msg: ByteArray) -> Unit)? = null + var readPong4: ((client: T, msg: ByteArray) -> Unit)? = null @JvmName("readPong4") fun readPong(readMessage: (client: T, msg: ByteArray) -> Unit) { @@ -204,7 +250,7 @@ open class AbstractWebSocketHandler, H : WebSocketHand readPong4?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong5: ((client: T, msg: String) -> Unit)? = null + var readPong5: ((client: T, msg: String) -> Unit)? = null @JvmName("readPong5") fun readPong(readMessage: (client: T, msg: String) -> Unit) { @@ -214,4 +260,4 @@ open class AbstractWebSocketHandler, H : WebSocketHand override fun readPong(client: T, msg: String) { readPong5?.also { it(client, msg) } ?: super.readPong(client, msg) } -} \ No newline at end of file +} diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt index 218cd60..a3a7e22 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt @@ -3,7 +3,9 @@ package cn.tursom.core.ws import cn.tursom.core.ShutdownHook import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.NettyByteBuffer +import cn.tursom.core.notifyAll import cn.tursom.core.uncheckedCast +import cn.tursom.core.wait import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled @@ -20,6 +22,8 @@ import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.util.InsecureTrustManagerFactory import java.net.URI +import java.util.concurrent.ThreadFactory +import java.util.concurrent.atomic.AtomicInteger @Suppress("unused") @@ -38,8 +42,11 @@ open class WebSocketClient, H : WebSocketHandler() ShutdownHook.addHook { close() } @@ -82,7 +89,7 @@ open class WebSocketClient, H : WebSocketHandler() { override fun initChannel(ch: SocketChannel) { @@ -196,12 +203,24 @@ open class WebSocketClient, H : WebSocketHandler