Merge remote-tracking branch 'origin/master' into master

This commit is contained in:
tursom 2021-07-08 22:05:40 +08:00
commit 21a6acb8f7
2 changed files with 93 additions and 28 deletions

View File

@ -6,32 +6,78 @@ import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import kotlin.reflect.KMutableProperty0
import kotlin.reflect.KMutableProperty1
@Suppress("unused")
@Suppress("unused", "MemberVisibilityCanBePrivate", "NOTHING_TO_INLINE")
open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHandler<T, H>> : WebSocketHandler<T, H> {
private var onOpen: ((client: T) -> Unit)? = null
inline fun <A1> addListener(
p: KMutableProperty0<((A1) -> Unit)?>,
crossinline newHandler: (A1) -> Unit,
) {
val oldHandler = p.get()
p.set { a1: A1 ->
oldHandler?.invoke(a1)
newHandler(a1)
}
}
fun onOpen(onOpen: ((client: T) -> Unit)) {
inline fun <A1> addListener(
p: KMutableProperty1<AbstractWebSocketHandler<T, H>, ((A1) -> Unit)?>,
crossinline newHandler: (A1) -> Unit,
) {
val oldHandler = p.get(this)
p.set(this) { a1: A1 ->
oldHandler?.invoke(a1)
newHandler(a1)
}
}
inline fun <A1, A2> addListener(
p: KMutableProperty0<((A1, A2) -> Unit)?>,
noinline newHandler: (A1, A2) -> Unit,
) {
val oldHandler = p.get()
p.set { a1: A1, a2: A2 ->
oldHandler?.invoke(a1, a2)
newHandler(a1, a2)
}
}
inline fun <A1, A2> addListener(
p: KMutableProperty1<AbstractWebSocketHandler<T, H>, ((A1, A2) -> Unit)?>,
crossinline newHandler: (A1, A2) -> Unit,
) {
val oldHandler = p.get(this)
p.set(this) { a1: A1, a2: A2 ->
oldHandler?.invoke(a1, a2)
newHandler(a1, a2)
}
}
var onOpen: ((client: T) -> Unit)? = null
fun onOpen(onOpen: ((client: T) -> Unit)?) {
this.onOpen = onOpen
}
override fun onOpen(client: T) {
onOpen?.also { it(client) } ?: super.onOpen(client)
onOpen?.invoke(client)
}
private var onClose: ((client: T) -> Unit)? = null
var onClose: ((client: T) -> Unit)? = null
fun onClose(onClose: ((client: T) -> Unit)) {
fun onClose(onClose: ((client: T) -> Unit)?) {
this.onClose = onClose
}
override fun onClose(client: T) {
onClose?.also { it(client) } ?: super.onClose(client)
onClose?.invoke(client)
}
private var onError: ((client: T, e: Throwable) -> Unit)? = null
var onError: ((client: T, e: Throwable) -> Unit)? = null
fun onError(onError: ((client: T, e: Throwable) -> Unit)) {
fun onError(onError: ((client: T, e: Throwable) -> Unit)?) {
this.onError = onError
}
@ -39,7 +85,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
onError?.also { it(client, e) } ?: super.onError(client, e)
}
private var readMessage1: ((client: T, msg: String) -> Unit)? = null
var readMessage1: ((client: T, msg: String) -> Unit)? = null
@JvmName("readMessage1")
fun readMessage(readMessage: (client: T, msg: String) -> Unit) {
@ -50,7 +96,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readMessage1?.also { it(client, msg) } ?: super.readMessage(client, msg)
}
private var readMessage2: ((client: T, msg: TextWebSocketFrame) -> Unit)? = null
var readMessage2: ((client: T, msg: TextWebSocketFrame) -> Unit)? = null
@JvmName("readMessage2")
fun readMessage(readMessage: (client: T, msg: TextWebSocketFrame) -> Unit) {
@ -61,7 +107,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readMessage2?.also { it(client, msg) } ?: super.readMessage(client, msg)
}
private var readMessage3: ((client: T, msg: ByteArray) -> Unit)? = null
var readMessage3: ((client: T, msg: ByteArray) -> Unit)? = null
@JvmName("readMessage3")
fun readMessage(readMessage: (client: T, msg: ByteArray) -> Unit) {
@ -72,7 +118,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readMessage3?.also { it(client, msg) } ?: super.readMessage(client, msg)
}
private var readMessage4: ((client: T, msg: ByteBuf) -> Unit)? = null
var readMessage4: ((client: T, msg: ByteBuf) -> Unit)? = null
@JvmName("readMessage4")
fun readMessage(readMessage: (client: T, msg: ByteBuf) -> Unit) {
@ -83,7 +129,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readMessage4?.also { it(client, msg) } ?: super.readMessage(client, msg)
}
private var readMessage5: ((client: T, msg: ByteBuffer) -> Unit)? = null
var readMessage5: ((client: T, msg: ByteBuffer) -> Unit)? = null
@JvmName("readMessage5")
fun readMessage(readMessage: (client: T, msg: ByteBuffer) -> Unit) {
@ -94,7 +140,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readMessage5?.also { it(client, msg) } ?: super.readMessage(client, msg)
}
private var readMessage6: ((client: T, msg: BinaryWebSocketFrame) -> Unit)? = null
var readMessage6: ((client: T, msg: BinaryWebSocketFrame) -> Unit)? = null
@JvmName("readMessage6")
fun readMessage(readMessage: (client: T, msg: BinaryWebSocketFrame) -> Unit) {
@ -105,7 +151,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readMessage6?.also { it(client, msg) } ?: super.readMessage(client, msg)
}
private var readPing1: ((client: T, msg: PingWebSocketFrame) -> Unit)? = null
var readPing1: ((client: T, msg: PingWebSocketFrame) -> Unit)? = null
@JvmName("readPing1")
fun readPing(readMessage: (client: T, msg: PingWebSocketFrame) -> Unit) {
@ -116,7 +162,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPing1?.also { it(client, msg) } ?: super.readPing(client, msg)
}
private var readPing2: ((client: T, msg: ByteBuf) -> Unit)? = null
var readPing2: ((client: T, msg: ByteBuf) -> Unit)? = null
@JvmName("readPing2")
fun readPing(readMessage: (client: T, msg: ByteBuf) -> Unit) {
@ -127,7 +173,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPing2?.also { it(client, msg) } ?: super.readPing(client, msg)
}
private var readPing3: ((client: T, msg: ByteBuffer) -> Unit)? = null
var readPing3: ((client: T, msg: ByteBuffer) -> Unit)? = null
@JvmName("readPing3")
fun readPing(readMessage: (client: T, msg: ByteBuffer) -> Unit) {
@ -138,7 +184,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPing3?.also { it(client, msg) } ?: super.readPing(client, msg)
}
private var readPing4: ((client: T, msg: ByteArray) -> Unit)? = null
var readPing4: ((client: T, msg: ByteArray) -> Unit)? = null
@JvmName("readPing4")
fun readPing(readMessage: (client: T, msg: ByteArray) -> Unit) {
@ -149,7 +195,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPing4?.also { it(client, msg) } ?: super.readPing(client, msg)
}
private var readPing5: ((client: T, msg: String) -> Unit)? = null
var readPing5: ((client: T, msg: String) -> Unit)? = null
@JvmName("readPing5")
fun readPing(readMessage: (client: T, msg: String) -> Unit) {
@ -160,7 +206,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPing5?.also { it(client, msg) } ?: super.readPing(client, msg)
}
private var readPong1: ((client: T, msg: PongWebSocketFrame) -> Unit)? = null
var readPong1: ((client: T, msg: PongWebSocketFrame) -> Unit)? = null
@JvmName("readPong1")
fun readPong(readMessage: (client: T, msg: PongWebSocketFrame) -> Unit) {
@ -171,7 +217,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPong1?.also { it(client, msg) } ?: super.readPong(client, msg)
}
private var readPong2: ((client: T, msg: ByteBuf) -> Unit)? = null
var readPong2: ((client: T, msg: ByteBuf) -> Unit)? = null
@JvmName("readPong2")
fun readPong(readMessage: (client: T, msg: ByteBuf) -> Unit) {
@ -182,7 +228,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPong2?.also { it(client, msg) } ?: super.readPong(client, msg)
}
private var readPong3: ((client: T, msg: ByteBuffer) -> Unit)? = null
var readPong3: ((client: T, msg: ByteBuffer) -> Unit)? = null
@JvmName("readPong3")
fun readPong(readMessage: (client: T, msg: ByteBuffer) -> Unit) {
@ -193,7 +239,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPong3?.also { it(client, msg) } ?: super.readPong(client, msg)
}
private var readPong4: ((client: T, msg: ByteArray) -> Unit)? = null
var readPong4: ((client: T, msg: ByteArray) -> Unit)? = null
@JvmName("readPong4")
fun readPong(readMessage: (client: T, msg: ByteArray) -> Unit) {
@ -204,7 +250,7 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
readPong4?.also { it(client, msg) } ?: super.readPong(client, msg)
}
private var readPong5: ((client: T, msg: String) -> Unit)? = null
var readPong5: ((client: T, msg: String) -> Unit)? = null
@JvmName("readPong5")
fun readPong(readMessage: (client: T, msg: String) -> Unit) {
@ -214,4 +260,4 @@ open class AbstractWebSocketHandler<T : WebSocketClient<T, H>, H : WebSocketHand
override fun readPong(client: T, msg: String) {
readPong5?.also { it(client, msg) } ?: super.readPong(client, msg)
}
}
}

View File

@ -3,7 +3,9 @@ package cn.tursom.core.ws
import cn.tursom.core.ShutdownHook
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.NettyByteBuffer
import cn.tursom.core.notifyAll
import cn.tursom.core.uncheckedCast
import cn.tursom.core.wait
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
@ -20,6 +22,8 @@ import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import java.net.URI
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger
@Suppress("unused")
@ -38,8 +42,11 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T,
private val uri: URI = URI.create(url)
var ch: Channel? = null
internal set
var closed: Boolean = false
private set
init {
uncheckedCast<T>()
ShutdownHook.addHook {
close()
}
@ -82,7 +89,7 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T,
)
val handler = WebSocketClientChannelHandler(uncheckedCast(), handler, autoRelease)
val bootstrap = Bootstrap()
bootstrap.group(group)
.group(group)
.channel(NioSocketChannel::class.java)
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
@ -196,12 +203,24 @@ open class WebSocketClient<in T : WebSocketClient<T, H>, H : WebSocketHandler<T,
}
open fun onOpen() {
closed = false
}
open fun onClose() {
closed = true
notifyAll { }
}
fun waitClose() {
if (!closed) wait { }
}
companion object {
val group: EventLoopGroup = NioEventLoopGroup()
private val threadId = AtomicInteger()
private val group: EventLoopGroup = NioEventLoopGroup(0, ThreadFactory {
val thread = Thread(it, "WebSocketClient-${threadId.incrementAndGet()}")
thread.isDaemon = true
thread
})
}
}