From 463a5cf44c3ca9267584bce0c73917bc8ea532bb Mon Sep 17 00:00:00 2001 From: tursom <tursom@foxmail.com> Date: Tue, 6 Jul 2021 10:03:38 +0800 Subject: [PATCH 1/5] update AbstractWebSocketHandler --- .../core/ws/AbstractWebSocketHandler.kt | 103 +++++++++++++----- .../cn/tursom/core/ws/WebSocketClient.kt | 1 + 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt index 495c8af..d966947 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt @@ -1,37 +1,88 @@ package cn.tursom.core.ws import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.uncheckedCast import io.netty.buffer.ByteBuf import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame import io.netty.handler.codec.http.websocketx.PingWebSocketFrame import io.netty.handler.codec.http.websocketx.PongWebSocketFrame import io.netty.handler.codec.http.websocketx.TextWebSocketFrame +import kotlin.reflect.KMutableProperty0 +import kotlin.reflect.KMutableProperty1 -@Suppress("unused") +@Suppress("unused", "MemberVisibilityCanBePrivate") open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHandler<T, H>> : WebSocketHandler<T, H> { - private var onOpen: ((client: T) -> Unit)? = null + fun <F : (A1) -> Unit, A1> addListener1( + p: KMutableProperty0<F?>, + newHandler: F?, + ) { + newHandler ?: return + val oldHandler = p.get() + p.set({ a1: A1 -> + oldHandler?.invoke(a1) + newHandler(a1) + }.uncheckedCast()) + } - fun onOpen(onOpen: ((client: T) -> Unit)) { + fun <F : (A1) -> Unit, A1> addListener1( + p: KMutableProperty1<AbstractWebSocketHandler<T, H>, F?>, + newHandler: F?, + ) { + newHandler ?: return + val oldHandler = p.get(this) + p.set(this, { a1: A1 -> + oldHandler?.invoke(a1) + newHandler(a1) + }.uncheckedCast()) + } + + fun <F : (A1, A2) -> Unit, A1, A2> addListener2( + p: KMutableProperty0<F?>, + newHandler: F?, + ) { + newHandler ?: return + val oldHandler = p.get() + p.set({ a1: A1, a2: A2 -> + oldHandler?.invoke(a1, a2) + newHandler(a1, a2) + }.uncheckedCast()) + } + + fun <F : (A1, A2) -> Unit, A1, A2> addListener2( + p: KMutableProperty1<AbstractWebSocketHandler<T, H>, F?>, + newHandler: F?, + ) { + newHandler ?: return + val oldHandler = p.get(this) + p.set(this, { a1: A1, a2: A2 -> + oldHandler?.invoke(a1, a2) + newHandler(a1, a2) + }.uncheckedCast()) + } + + var onOpen: ((client: T) -> Unit)? = null + + fun onOpen(onOpen: ((client: T) -> Unit)?) { this.onOpen = onOpen } override fun onOpen(client: T) { - onOpen?.also { it(client) } ?: super.onOpen(client) + onOpen?.invoke(client) } - private var onClose: ((client: T) -> Unit)? = null + var onClose: ((client: T) -> Unit)? = null - fun onClose(onClose: ((client: T) -> Unit)) { + fun onClose(onClose: ((client: T) -> Unit)?) { this.onClose = onClose } override fun onClose(client: T) { - onClose?.also { it(client) } ?: super.onClose(client) + onClose?.invoke(client) } - private var onError: ((client: T, e: Throwable) -> Unit)? = null + var onError: ((client: T, e: Throwable) -> Unit)? = null - fun onError(onError: ((client: T, e: Throwable) -> Unit)) { + fun onError(onError: ((client: T, e: Throwable) -> Unit)?) { this.onError = onError } @@ -39,7 +90,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand onError?.also { it(client, e) } ?: super.onError(client, e) } - private var readMessage1: ((client: T, msg: String) -> Unit)? = null + var readMessage1: ((client: T, msg: String) -> Unit)? = null @JvmName("readMessage1") fun readMessage(readMessage: (client: T, msg: String) -> Unit) { @@ -50,7 +101,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readMessage1?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage2: ((client: T, msg: TextWebSocketFrame) -> Unit)? = null + var readMessage2: ((client: T, msg: TextWebSocketFrame) -> Unit)? = null @JvmName("readMessage2") fun readMessage(readMessage: (client: T, msg: TextWebSocketFrame) -> Unit) { @@ -61,7 +112,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readMessage2?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage3: ((client: T, msg: ByteArray) -> Unit)? = null + var readMessage3: ((client: T, msg: ByteArray) -> Unit)? = null @JvmName("readMessage3") fun readMessage(readMessage: (client: T, msg: ByteArray) -> Unit) { @@ -72,7 +123,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readMessage3?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage4: ((client: T, msg: ByteBuf) -> Unit)? = null + var readMessage4: ((client: T, msg: ByteBuf) -> Unit)? = null @JvmName("readMessage4") fun readMessage(readMessage: (client: T, msg: ByteBuf) -> Unit) { @@ -83,7 +134,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readMessage4?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage5: ((client: T, msg: ByteBuffer) -> Unit)? = null + var readMessage5: ((client: T, msg: ByteBuffer) -> Unit)? = null @JvmName("readMessage5") fun readMessage(readMessage: (client: T, msg: ByteBuffer) -> Unit) { @@ -94,7 +145,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readMessage5?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readMessage6: ((client: T, msg: BinaryWebSocketFrame) -> Unit)? = null + var readMessage6: ((client: T, msg: BinaryWebSocketFrame) -> Unit)? = null @JvmName("readMessage6") fun readMessage(readMessage: (client: T, msg: BinaryWebSocketFrame) -> Unit) { @@ -105,7 +156,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readMessage6?.also { it(client, msg) } ?: super.readMessage(client, msg) } - private var readPing1: ((client: T, msg: PingWebSocketFrame) -> Unit)? = null + var readPing1: ((client: T, msg: PingWebSocketFrame) -> Unit)? = null @JvmName("readPing1") fun readPing(readMessage: (client: T, msg: PingWebSocketFrame) -> Unit) { @@ -116,7 +167,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPing1?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing2: ((client: T, msg: ByteBuf) -> Unit)? = null + var readPing2: ((client: T, msg: ByteBuf) -> Unit)? = null @JvmName("readPing2") fun readPing(readMessage: (client: T, msg: ByteBuf) -> Unit) { @@ -127,7 +178,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPing2?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing3: ((client: T, msg: ByteBuffer) -> Unit)? = null + var readPing3: ((client: T, msg: ByteBuffer) -> Unit)? = null @JvmName("readPing3") fun readPing(readMessage: (client: T, msg: ByteBuffer) -> Unit) { @@ -138,7 +189,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPing3?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing4: ((client: T, msg: ByteArray) -> Unit)? = null + var readPing4: ((client: T, msg: ByteArray) -> Unit)? = null @JvmName("readPing4") fun readPing(readMessage: (client: T, msg: ByteArray) -> Unit) { @@ -149,7 +200,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPing4?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPing5: ((client: T, msg: String) -> Unit)? = null + var readPing5: ((client: T, msg: String) -> Unit)? = null @JvmName("readPing5") fun readPing(readMessage: (client: T, msg: String) -> Unit) { @@ -160,7 +211,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPing5?.also { it(client, msg) } ?: super.readPing(client, msg) } - private var readPong1: ((client: T, msg: PongWebSocketFrame) -> Unit)? = null + var readPong1: ((client: T, msg: PongWebSocketFrame) -> Unit)? = null @JvmName("readPong1") fun readPong(readMessage: (client: T, msg: PongWebSocketFrame) -> Unit) { @@ -171,7 +222,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPong1?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong2: ((client: T, msg: ByteBuf) -> Unit)? = null + var readPong2: ((client: T, msg: ByteBuf) -> Unit)? = null @JvmName("readPong2") fun readPong(readMessage: (client: T, msg: ByteBuf) -> Unit) { @@ -182,7 +233,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPong2?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong3: ((client: T, msg: ByteBuffer) -> Unit)? = null + var readPong3: ((client: T, msg: ByteBuffer) -> Unit)? = null @JvmName("readPong3") fun readPong(readMessage: (client: T, msg: ByteBuffer) -> Unit) { @@ -193,7 +244,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPong3?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong4: ((client: T, msg: ByteArray) -> Unit)? = null + var readPong4: ((client: T, msg: ByteArray) -> Unit)? = null @JvmName("readPong4") fun readPong(readMessage: (client: T, msg: ByteArray) -> Unit) { @@ -204,7 +255,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand readPong4?.also { it(client, msg) } ?: super.readPong(client, msg) } - private var readPong5: ((client: T, msg: String) -> Unit)? = null + var readPong5: ((client: T, msg: String) -> Unit)? = null @JvmName("readPong5") fun readPong(readMessage: (client: T, msg: String) -> Unit) { @@ -214,4 +265,4 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand override fun readPong(client: T, msg: String) { readPong5?.also { it(client, msg) } ?: super.readPong(client, msg) } -} \ No newline at end of file +} diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt index 218cd60..d49870c 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt @@ -40,6 +40,7 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T, internal set init { + uncheckedCast<T>() ShutdownHook.addHook { close() } From 5d2ab81a6d0ec67fc9e874cf489219a478e174ba Mon Sep 17 00:00:00 2001 From: tursom <tursom@foxmail.com> Date: Tue, 6 Jul 2021 10:04:18 +0800 Subject: [PATCH 2/5] update AbstractWebSocketHandler --- .../kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt index d966947..9749f3c 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt @@ -12,7 +12,7 @@ import kotlin.reflect.KMutableProperty1 @Suppress("unused", "MemberVisibilityCanBePrivate") open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHandler<T, H>> : WebSocketHandler<T, H> { - fun <F : (A1) -> Unit, A1> addListener1( + fun <F : (A1) -> Unit, A1> addListener( p: KMutableProperty0<F?>, newHandler: F?, ) { @@ -24,7 +24,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand }.uncheckedCast()) } - fun <F : (A1) -> Unit, A1> addListener1( + fun <F : (A1) -> Unit, A1> addListener( p: KMutableProperty1<AbstractWebSocketHandler<T, H>, F?>, newHandler: F?, ) { @@ -36,7 +36,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand }.uncheckedCast()) } - fun <F : (A1, A2) -> Unit, A1, A2> addListener2( + fun <F : (A1, A2) -> Unit, A1, A2> addListener( p: KMutableProperty0<F?>, newHandler: F?, ) { @@ -48,7 +48,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand }.uncheckedCast()) } - fun <F : (A1, A2) -> Unit, A1, A2> addListener2( + fun <F : (A1, A2) -> Unit, A1, A2> addListener( p: KMutableProperty1<AbstractWebSocketHandler<T, H>, F?>, newHandler: F?, ) { From bf555ba987988f33a51548381b1a1286a1210c27 Mon Sep 17 00:00:00 2001 From: tursom <tursom@foxmail.com> Date: Tue, 6 Jul 2021 10:11:43 +0800 Subject: [PATCH 3/5] update AbstractWebSocketHandler --- .../core/ws/AbstractWebSocketHandler.kt | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt index 9749f3c..bfc1d0f 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt @@ -1,7 +1,6 @@ package cn.tursom.core.ws import cn.tursom.core.buffer.ByteBuffer -import cn.tursom.core.uncheckedCast import io.netty.buffer.ByteBuf import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame import io.netty.handler.codec.http.websocketx.PingWebSocketFrame @@ -12,52 +11,52 @@ import kotlin.reflect.KMutableProperty1 @Suppress("unused", "MemberVisibilityCanBePrivate") open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHandler<T, H>> : WebSocketHandler<T, H> { - fun <F : (A1) -> Unit, A1> addListener( - p: KMutableProperty0<F?>, - newHandler: F?, + fun <A1> addListener( + p: KMutableProperty0<((A1) -> Unit)?>, + newHandler: ((A1) -> Unit)?, ) { newHandler ?: return val oldHandler = p.get() - p.set({ a1: A1 -> + p.set { a1: A1 -> oldHandler?.invoke(a1) newHandler(a1) - }.uncheckedCast()) + } } - fun <F : (A1) -> Unit, A1> addListener( - p: KMutableProperty1<AbstractWebSocketHandler<T, H>, F?>, - newHandler: F?, + fun <A1> addListener( + p: KMutableProperty1<AbstractWebSocketHandler<T, H>, ((A1) -> Unit)?>, + newHandler: ((A1) -> Unit)?, ) { newHandler ?: return val oldHandler = p.get(this) - p.set(this, { a1: A1 -> + p.set(this) { a1: A1 -> oldHandler?.invoke(a1) newHandler(a1) - }.uncheckedCast()) + } } - fun <F : (A1, A2) -> Unit, A1, A2> addListener( - p: KMutableProperty0<F?>, - newHandler: F?, + fun <A1, A2> addListener( + p: KMutableProperty0<((A1, A2) -> Unit)?>, + newHandler: ((A1, A2) -> Unit)?, ) { newHandler ?: return val oldHandler = p.get() - p.set({ a1: A1, a2: A2 -> + p.set { a1: A1, a2: A2 -> oldHandler?.invoke(a1, a2) newHandler(a1, a2) - }.uncheckedCast()) + } } - fun <F : (A1, A2) -> Unit, A1, A2> addListener( - p: KMutableProperty1<AbstractWebSocketHandler<T, H>, F?>, - newHandler: F?, + fun <A1, A2> addListener( + p: KMutableProperty1<AbstractWebSocketHandler<T, H>, ((A1, A2) -> Unit)?>, + newHandler: ((A1, A2) -> Unit)?, ) { newHandler ?: return val oldHandler = p.get(this) - p.set(this, { a1: A1, a2: A2 -> + p.set(this) { a1: A1, a2: A2 -> oldHandler?.invoke(a1, a2) newHandler(a1, a2) - }.uncheckedCast()) + } } var onOpen: ((client: T) -> Unit)? = null From f6369cd1eded155296472cc0f1e37b416d49740f Mon Sep 17 00:00:00 2001 From: tursom <tursom@foxmail.com> Date: Tue, 6 Jul 2021 10:14:33 +0800 Subject: [PATCH 4/5] update AbstractWebSocketHandler --- .../core/ws/AbstractWebSocketHandler.kt | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt index bfc1d0f..5073c66 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/AbstractWebSocketHandler.kt @@ -9,13 +9,12 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame import kotlin.reflect.KMutableProperty0 import kotlin.reflect.KMutableProperty1 -@Suppress("unused", "MemberVisibilityCanBePrivate") +@Suppress("unused", "MemberVisibilityCanBePrivate", "NOTHING_TO_INLINE") open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHandler<T, H>> : WebSocketHandler<T, H> { - fun <A1> addListener( + inline fun <A1> addListener( p: KMutableProperty0<((A1) -> Unit)?>, - newHandler: ((A1) -> Unit)?, + crossinline newHandler: (A1) -> Unit, ) { - newHandler ?: return val oldHandler = p.get() p.set { a1: A1 -> oldHandler?.invoke(a1) @@ -23,11 +22,10 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand } } - fun <A1> addListener( + inline fun <A1> addListener( p: KMutableProperty1<AbstractWebSocketHandler<T, H>, ((A1) -> Unit)?>, - newHandler: ((A1) -> Unit)?, + crossinline newHandler: (A1) -> Unit, ) { - newHandler ?: return val oldHandler = p.get(this) p.set(this) { a1: A1 -> oldHandler?.invoke(a1) @@ -35,11 +33,10 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand } } - fun <A1, A2> addListener( + inline fun <A1, A2> addListener( p: KMutableProperty0<((A1, A2) -> Unit)?>, - newHandler: ((A1, A2) -> Unit)?, + noinline newHandler: (A1, A2) -> Unit, ) { - newHandler ?: return val oldHandler = p.get() p.set { a1: A1, a2: A2 -> oldHandler?.invoke(a1, a2) @@ -47,11 +44,10 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand } } - fun <A1, A2> addListener( + inline fun <A1, A2> addListener( p: KMutableProperty1<AbstractWebSocketHandler<T, H>, ((A1, A2) -> Unit)?>, - newHandler: ((A1, A2) -> Unit)?, + crossinline newHandler: (A1, A2) -> Unit, ) { - newHandler ?: return val oldHandler = p.get(this) p.set(this) { a1: A1, a2: A2 -> oldHandler?.invoke(a1, a2) From 3aac8700458603cf5ed4da65f4adac85f86557df Mon Sep 17 00:00:00 2001 From: tursom <tursom@foxmail.com> Date: Tue, 6 Jul 2021 15:13:21 +0800 Subject: [PATCH 5/5] update --- .../cn/tursom/core/ws/WebSocketClient.kt | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt index d49870c..a3a7e22 100644 --- a/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt +++ b/ts-core/ts-ws-client/src/main/kotlin/cn/tursom/core/ws/WebSocketClient.kt @@ -3,7 +3,9 @@ package cn.tursom.core.ws import cn.tursom.core.ShutdownHook import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.NettyByteBuffer +import cn.tursom.core.notifyAll import cn.tursom.core.uncheckedCast +import cn.tursom.core.wait import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled @@ -20,6 +22,8 @@ import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.util.InsecureTrustManagerFactory import java.net.URI +import java.util.concurrent.ThreadFactory +import java.util.concurrent.atomic.AtomicInteger @Suppress("unused") @@ -38,6 +42,8 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T, private val uri: URI = URI.create(url) var ch: Channel? = null internal set + var closed: Boolean = false + private set init { uncheckedCast<T>() @@ -83,7 +89,7 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T, ) val handler = WebSocketClientChannelHandler(uncheckedCast(), handler, autoRelease) val bootstrap = Bootstrap() - bootstrap.group(group) + .group(group) .channel(NioSocketChannel::class.java) .handler(object : ChannelInitializer<SocketChannel>() { override fun initChannel(ch: SocketChannel) { @@ -197,12 +203,24 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T, } open fun onOpen() { + closed = false } open fun onClose() { + closed = true + notifyAll { } + } + + fun waitClose() { + if (!closed) wait { } } companion object { - val group: EventLoopGroup = NioEventLoopGroup() + private val threadId = AtomicInteger() + private val group: EventLoopGroup = NioEventLoopGroup(0, ThreadFactory { + val thread = Thread(it, "WebSocketClient-${threadId.incrementAndGet()}") + thread.isDaemon = true + thread + }) } }