From 68648b1f7742761e335f432e65795a6938c99aad Mon Sep 17 00:00:00 2001 From: tursom Date: Fri, 8 Apr 2022 20:28:01 +0800 Subject: [PATCH] update --- settings.gradle.kts | 1 + .../cn/tursom/core/context/ArrayContextEnv.kt | 8 +- ts-core/ts-async-http/build.gradle.kts | 9 +- .../cn/tursom/http/client/AsyncHttpRequest.kt | 329 ++++++------------ .../kotlin/cn/tursom/http/client/OkHttpGet.kt | 32 ++ .../cn/tursom/http/client/OkHttpPost.kt | 156 +++++++++ .../kotlin/cn/tursom/http/client/okhttp.kt | 108 ++++++ ts-core/ts-proxy/build.gradle.kts | 5 +- .../cn/tursom/proxy/ListProxyContainer.kt | 21 +- .../cn/tursom/proxy/MutableProxyContainer.kt | 3 +- .../src/main/kotlin/cn/tursom/proxy/Proxy.kt | 20 +- .../kotlin/cn/tursom/proxy/ProxyContainer.kt | 59 ++-- .../proxy/ProxyContainerHandlerCache.kt | 6 +- .../cn/tursom/proxy/ProxyInterceptor.kt | 23 +- .../kotlin/cn/tursom/proxy/ProxyMethod.kt | 54 ++- .../cn/tursom/proxy/ProxyMethodCache.kt | 32 ++ .../kotlin/cn/tursom/proxy/ProxyResult.kt | 17 + .../kotlin/cn/tursom/proxy/ProxyRunner.kt | 61 +++- .../tursom/proxy/annotation/ForFirstProxy.kt | 1 + .../test/kotlin/cn/tursom/proxy/Example.kt | 29 +- .../cn/tursom/reflect/asm/ReflectAsmUtils.kt | 2 - .../kotlin/cn/tursom/web/ChunkedMessage.kt | 12 + .../main/kotlin/cn/tursom/web/HttpContent.kt | 11 +- .../kotlin/cn/tursom/web/client/HttpClient.kt | 5 + .../cn/tursom/web/client/HttpRequest.kt | 25 ++ .../cn/tursom/web/client/HttpResponse.kt | 10 + .../tursom/web/client/HttpResponseStream.kt | 21 ++ .../kotlin/cn/tursom/web/utils/HttpVersion.kt | 5 + ts-web/ts-web-netty-client/build.gradle.kts | 20 ++ .../web/client/netty/HttpConnectionPool.kt | 48 +++ .../tursom/web/client/netty/HttpExecutor.kt | 65 ++++ .../web/client/netty/NettyHttpClient.kt | 26 ++ .../web/client/netty/NettyHttpConnection.kt | 18 + .../web/client/netty/NettyHttpRequest.kt | 66 ++++ .../web/client/netty/NettyHttpResponse.kt | 75 ++++ .../web/client/netty/NettyHttpResultResume.kt | 40 +++ .../cn/tursom/web/client/netty/netty.kt | 27 ++ 37 files changed, 1082 insertions(+), 368 deletions(-) create mode 100644 ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpGet.kt create mode 100644 ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpPost.kt create mode 100644 ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/okhttp.kt create mode 100644 ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethodCache.kt create mode 100644 ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyResult.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/ChunkedMessage.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/utils/HttpVersion.kt create mode 100644 ts-web/ts-web-netty-client/build.gradle.kts create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpConnectionPool.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpConnection.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt create mode 100644 ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/netty.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index 01f60f6..0c36add 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -26,6 +26,7 @@ include("ts-core:ts-reflectasm") include("ts-socket") include("ts-web") include("ts-web:ts-web-netty") +include("ts-web:ts-web-netty-client") include("ts-web:ts-web-coroutine") include("ts-database") include("ts-database:ts-ktorm") diff --git a/ts-core/src/main/kotlin/cn/tursom/core/context/ArrayContextEnv.kt b/ts-core/src/main/kotlin/cn/tursom/core/context/ArrayContextEnv.kt index b800e8b..4a00e57 100644 --- a/ts-core/src/main/kotlin/cn/tursom/core/context/ArrayContextEnv.kt +++ b/ts-core/src/main/kotlin/cn/tursom/core/context/ArrayContextEnv.kt @@ -20,16 +20,16 @@ class ArrayContextEnv : ContextEnv { override operator fun get(key: ContextKey): T? { checkEnv(key) - return if (array.size < key.id) { - null - } else { + return if (array.size > key.id) { array[key.id]?.uncheckedCast() + } else { + null } } override fun set(key: ContextKey, value: T): ArrayContext { checkEnv(key) - if (array.size < key.id) { + if (array.size <= key.id) { array = array.copyOf(idGenerator.get()) } array[key.id] = value diff --git a/ts-core/ts-async-http/build.gradle.kts b/ts-core/ts-async-http/build.gradle.kts index c018262..f6b54be 100644 --- a/ts-core/ts-async-http/build.gradle.kts +++ b/ts-core/ts-async-http/build.gradle.kts @@ -9,14 +9,13 @@ dependencies { api(project(":ts-core")) api(project(":ts-core:ts-buffer")) implementation(project(":ts-core:ts-xml")) - api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0") - compileOnly("com.squareup.okhttp3:okhttp:4.9.3") + api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0") + api(group = "com.squareup.okhttp3", name = "okhttp", version = "4.9.3") + compileOnly(group = "io.netty", name = "netty-all", version = "4.1.72.Final") //api(group = "com.squareup.retrofit2", name = "converter-gson", version = "2.9.0") //api(group = "com.squareup.retrofit2", name = "retrofit", version = "2.9.0") // https://mvnrepository.com/artifact/org.jsoup/jsoup - api(group = "org.jsoup", name = "jsoup", version = "1.14.3") - - + //api(group = "org.jsoup", name = "jsoup", version = "1.14.3") testImplementation(project(":ts-core:ts-coroutine")) } diff --git a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt index e6a615f..725f488 100644 --- a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt +++ b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt @@ -1,321 +1,192 @@ package cn.tursom.http.client -import okhttp3.* -import okhttp3.MediaType.Companion.toMediaTypeOrNull +import okhttp3.Call +import okhttp3.OkHttpClient +import okhttp3.RequestBody +import okhttp3.Response import java.io.File -import java.io.IOException -import java.net.InetSocketAddress -import java.net.Proxy -import java.net.SocketAddress -import java.net.URLEncoder -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.coroutines.suspendCoroutine +@Deprecated("this object is deprecated. it will be removed on 2022/5") @Suppress("unused", "MemberVisibilityCanBePrivate") object AsyncHttpRequest { - val defaultClient: OkHttpClient = OkHttpClient().newBuilder() - .retryOnConnectionFailure(true) - .build() - val socketClient: OkHttpClient = proxyClient() - val httpProxyClient: OkHttpClient = - proxyClient(port = 8080, type = Proxy.Type.HTTP) + @Deprecated("it should replace by Okhttp.defaultClient", + ReplaceWith("Okhttp.default")) + var defaultClient: OkHttpClient by Okhttp::default - fun proxyClient( - host: String = "127.0.0.1", - port: Int = 1080, - type: Proxy.Type = Proxy.Type.SOCKS - ): OkHttpClient = OkHttpClient().newBuilder() - .proxy(Proxy(type, InetSocketAddress(host, port) as SocketAddress)) - .retryOnConnectionFailure(true) - .build() + @Deprecated("it should replace by Okhttp.socket", + ReplaceWith("Okhttp.socket")) + val socketClient: OkHttpClient by Okhttp::socket - suspend fun sendRequest(call: Call): Response = suspendCoroutine { - call.enqueue(object : Callback { - override fun onFailure(call: Call, e: IOException) { - it.resumeWithException(e) - } + @Deprecated("it should replace by Okhttp.httpProxy", + ReplaceWith("Okhttp.httpProxy")) + val httpProxyClient: OkHttpClient by Okhttp::httpProxy - override fun onResponse(call: Call, response: Response) { - it.resume(response) - } - }) - } + @Deprecated("it should replace by call.sendRequest()", + ReplaceWith("call.sendRequest()")) + suspend fun sendRequest(call: Call): Response = call.sendRequest() + @JvmOverloads + @Deprecated("it should replace by client.get()", + ReplaceWith("client.get(url, param, headers)")) suspend fun get( url: String, param: Map? = null, headers: Map? = null, - client: OkHttpClient = defaultClient - ): Response { - val paramSB = StringBuilder() - param?.forEach { - paramSB.append("${URLEncoder.encode(it.key, "UTF-8")}=${URLEncoder.encode(it.value, "UTF-8")}&") - } - if (paramSB.isNotEmpty()) - paramSB.deleteCharAt(paramSB.length - 1) + client: OkHttpClient = defaultClient, + ): Response = client.get(url, param, headers) - val requestBuilder = Request.Builder().get() - .url("$url?$paramSB") - - headers?.forEach { t, u -> - requestBuilder.addHeader(t, u) - } - - return sendRequest( - client.newCall( - requestBuilder.build() - ) - ) - } - - private suspend fun post( + @JvmOverloads + @Deprecated("it should replace by client.post()", + ReplaceWith("client.post(url, body, headers)")) + suspend fun post( url: String, body: RequestBody, headers: Map? = null, - client: OkHttpClient = defaultClient - ): Response { - val requestBuilder = Request.Builder() - .post(body) - .url(url) - - headers?.forEach { t, u -> - requestBuilder.addHeader(t, u) - } - - return sendRequest(client.newCall(requestBuilder.build())) - } + client: OkHttpClient = defaultClient, + ): Response = client.post(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.post()", + ReplaceWith("client.post(url, param, headers)")) suspend fun post( url: String, param: Map, headers: Map? = null, - client: OkHttpClient = defaultClient - ): Response { - val formBuilder = FormBody.Builder() - param.forEach { (t, u) -> - formBuilder.add(t, u) - } - return post(url, formBuilder.build(), headers, client) - } + client: OkHttpClient = defaultClient, + ): Response = client.post(url, param, headers) + @JvmOverloads + @Deprecated("it should replace by client.post()", + ReplaceWith("client.post(url, body, headers)")) suspend fun post( url: String, body: String, headers: Map? = null, - client: OkHttpClient = defaultClient - ) = post( - url, - RequestBody.create("text/plain;charset=utf-8".toMediaTypeOrNull(), body), - headers, - client - ) + client: OkHttpClient = defaultClient, + ) = client.post(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.post()", + ReplaceWith("client.post(url, body, headers)")) suspend fun post( url: String, body: File, headers: Map? = null, - client: OkHttpClient = defaultClient - ) = post( - url, - RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body), - headers, - client - ) + client: OkHttpClient = defaultClient, + ) = client.post(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.post()", + ReplaceWith("client.post(url, body, headers)")) suspend fun post( url: String, body: ByteArray, headers: Map? = null, - client: OkHttpClient = defaultClient - ) = post( - url, - RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body), - headers, - client - ) - - suspend fun getStr( - url: String, - param: Map? = null, - headers: Map? = null - ): String { - return getStr(url, param, headers, defaultClient) - } + client: OkHttpClient = defaultClient, + ) = client.post(url, body, headers) @Suppress("BlockingMethodInNonBlockingContext") + @JvmOverloads + @Deprecated("it should replace by client.getStr()", + ReplaceWith("client.getStr(url, param, headers)")) suspend fun getStr( url: String, param: Map? = null, headers: Map? = null, - client: OkHttpClient - ): String { - val response = get(url, param, headers, client) - return response.body!!.string() - } + client: OkHttpClient = defaultClient, + ): String = client.getStr(url, param, headers) @Suppress("BlockingMethodInNonBlockingContext") - private suspend fun postStr( + @JvmOverloads + @Deprecated("it should replace by client.postStr()", + ReplaceWith("client.postStr(url, body, headers)")) + suspend fun postStr( url: String, body: RequestBody, headers: Map? = null, - client: OkHttpClient - ): String = post(url, body, headers, client).body!!.string() - - suspend fun postStr( - url: String, - param: Map, - headers: Map? = null - ): String = - postStr(url, param, headers, defaultClient) + client: OkHttpClient = defaultClient, + ): String = client.postStr(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.postStr()", + ReplaceWith("client.postStr(url, param, headers)")) suspend fun postStr( url: String, param: Map, headers: Map? = null, - client: OkHttpClient - ): String { - val formBuilder = FormBody.Builder() - param.forEach { (t, u) -> - formBuilder.add(t, u) - } - return postStr(url, formBuilder.build(), headers, client) - } - - suspend fun postStr( - url: String, - body: String, - headers: Map? = null - ): String = - postStr(url, body, headers, defaultClient) + client: OkHttpClient = defaultClient, + ): String = client.postStr(url, param, headers) + @JvmOverloads + @Deprecated("it should replace by client.postStr()", + ReplaceWith("client.postStr(url, body, headers)")) suspend fun postStr( url: String, body: String, headers: Map? = null, - client: OkHttpClient - ): String = postStr( - url, - RequestBody.create("text/plain;charset=utf-8".toMediaTypeOrNull(), body), - headers, - client - ) - - suspend fun postStr( - url: String, - body: File, - headers: Map? = null - ): String = - postStr(url, body, headers, defaultClient) + client: OkHttpClient = defaultClient, + ): String = client.postStr(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.postStr()", + ReplaceWith("client.postStr(url, body, headers)")) suspend fun postStr( url: String, body: File, headers: Map? = null, - client: OkHttpClient - ): String = postStr( - url, - RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body), - headers, - client - ) - - suspend fun getByteArray( - url: String, - param: Map? = null, - headers: Map? = null - ): ByteArray = getByteArray( - url, - param, - headers, - defaultClient - ) + client: OkHttpClient = defaultClient, + ): String = client.postStr(url, body, headers) @Suppress("BlockingMethodInNonBlockingContext") + @JvmOverloads + @Deprecated("it should replace by client.getByteArray()", + ReplaceWith("client.getByteArray(url, param, headers)")) suspend fun getByteArray( url: String, param: Map? = null, headers: Map? = null, - client: OkHttpClient - ): ByteArray = get(url, param, headers, client).body!!.bytes() + client: OkHttpClient = defaultClient, + ): ByteArray = client.getByteArray(url, param, headers) @Suppress("BlockingMethodInNonBlockingContext") - private suspend fun postByteArray( + @JvmOverloads + @Deprecated("it should replace by client.postByteArray()", + ReplaceWith("client.postByteArray(url, body, headers)")) + suspend fun postByteArray( url: String, body: RequestBody, headers: Map? = null, - client: OkHttpClient - ): ByteArray = post(url, body, headers, client).body!!.bytes() - - - suspend fun postByteArray( - url: String, - param: Map, - headers: Map? = null - ): ByteArray = postByteArray( - url, - param, - headers, - defaultClient - ) + client: OkHttpClient = defaultClient, + ): ByteArray = client.postByteArray(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.postByteArray()", + ReplaceWith("client.postByteArray(url, param, headers)")) suspend fun postByteArray( url: String, param: Map, headers: Map? = null, - client: OkHttpClient - ): ByteArray { - val formBuilder = FormBody.Builder() - param.forEach { (t, u) -> - formBuilder.add(t, u) - } - return postByteArray(url, formBuilder.build(), headers, client) - } - - suspend fun postByteArray( - url: String, - body: String, - headers: Map? = null - ): ByteArray = postByteArray( - url, - body, - headers, - defaultClient - ) + client: OkHttpClient = defaultClient, + ): ByteArray = client.postByteArray(url, param, headers) + @JvmOverloads + @Deprecated("it should replace by client.postByteArray()", + ReplaceWith("client.postByteArray(url, body, headers)")) suspend fun postByteArray( url: String, body: String, headers: Map? = null, - client: OkHttpClient - ): ByteArray = postByteArray( - url, - RequestBody.create("text/plain;charset=utf-8".toMediaTypeOrNull(), body), - headers, - client - ) - - suspend fun postByteArray( - url: String, - body: File, - headers: Map? = null - ): ByteArray = postByteArray( - url, - body, - headers, - defaultClient - ) + client: OkHttpClient = defaultClient, + ): ByteArray = client.postByteArray(url, body, headers) + @JvmOverloads + @Deprecated("it should replace by client.postByteArray()", + ReplaceWith("client.postByteArray(url, body, headers)")) suspend fun postByteArray( url: String, body: File, headers: Map? = null, - client: OkHttpClient - ): ByteArray = postByteArray( - url, - RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body), - headers, - client - ) + client: OkHttpClient = defaultClient, + ): ByteArray = client.postByteArray(url, body, headers) } \ No newline at end of file diff --git a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpGet.kt b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpGet.kt new file mode 100644 index 0000000..13222e8 --- /dev/null +++ b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpGet.kt @@ -0,0 +1,32 @@ +package cn.tursom.http.client + +import okhttp3.Call +import okhttp3.Response + +@JvmOverloads +suspend fun Call.Factory.get( + url: String, + param: Map? = null, + headers: Map? = null, +): Response = newCall { + url(url) { + addQueryParameters(param) + } + addHeaders(headers) +}.sendRequest() + +@Suppress("BlockingMethodInNonBlockingContext") +@JvmOverloads +suspend fun Call.Factory.getStr( + url: String, + param: Map? = null, + headers: Map? = null, +): String = get(url, param, headers).body!!.string() + +@Suppress("BlockingMethodInNonBlockingContext") +@JvmOverloads +suspend fun Call.Factory.getByteArray( + url: String, + param: Map? = null, + headers: Map? = null, +): ByteArray = get(url, param, headers).body!!.bytes() diff --git a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpPost.kt b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpPost.kt new file mode 100644 index 0000000..470b52c --- /dev/null +++ b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/OkHttpPost.kt @@ -0,0 +1,156 @@ +package cn.tursom.http.client + +import okhttp3.* +import okhttp3.MediaType.Companion.toMediaTypeOrNull +import okhttp3.RequestBody.Companion.asRequestBody +import okhttp3.RequestBody.Companion.toRequestBody +import java.io.File + +@JvmOverloads +@OkhttpMaker +suspend inline fun Call.Factory.post( + url: String, + headers: Map? = null, + body: Request.Builder.() -> RequestBody, +): Response = newCall { + post(body()) + url(url) + addHeaders(headers) +}.sendRequest() + +@JvmOverloads +suspend fun Call.Factory.post( + url: String, + body: RequestBody, + headers: Map? = null, +): Response = newCall { + post(body) + url(url) + addHeaders(headers) +}.sendRequest() + +@JvmOverloads +suspend fun Call.Factory.post( + url: String, + param: Map, + headers: Map? = null, +): Response = post(url, headers) { + form { + add(param) + } +} + +@JvmOverloads +suspend fun Call.Factory.post( + url: String, + body: String, + headers: Map? = null, +) = post(url, headers) { + body.toRequestBody("text/plain;charset=utf-8".toMediaTypeOrNull()) +} + +@JvmOverloads +suspend fun Call.Factory.post( + url: String, + body: File, + headers: Map? = null, +) = post(url, headers) { + body.asRequestBody("application/octet-stream".toMediaTypeOrNull()) +} + +@JvmOverloads +suspend fun Call.Factory.post( + url: String, + body: ByteArray, + headers: Map? = null, +) = post(url, headers) { + body.toRequestBody("application/octet-stream".toMediaTypeOrNull()) +} + +@Suppress("BlockingMethodInNonBlockingContext") +@JvmOverloads +@OkhttpMaker +suspend inline fun Call.Factory.postStr( + url: String, + headers: Map? = null, + body: Request.Builder.() -> RequestBody, +): String = post(url, headers, body).body!!.string() + +@Suppress("BlockingMethodInNonBlockingContext") +@JvmOverloads +suspend fun Call.Factory.postStr( + url: String, + body: RequestBody, + headers: Map? = null, +): String = post(url, body, headers).body!!.string() + +@JvmOverloads +suspend fun Call.Factory.postStr( + url: String, + param: Map, + headers: Map? = null, +): String = postStr(url, headers) { + FormBody.Builder().add(param).build() +} + +@JvmOverloads +suspend fun Call.Factory.postStr( + url: String, + body: String, + headers: Map? = null, +): String = postStr(url, headers) { + body.toRequestBody("text/plain;charset=utf-8".toMediaTypeOrNull()) +} + +@JvmOverloads +suspend fun Call.Factory.postStr( + url: String, + body: File, + headers: Map? = null, +): String = postStr(url, headers) { + body.asRequestBody("application/octet-stream".toMediaTypeOrNull()) +} + +@Suppress("BlockingMethodInNonBlockingContext") +@JvmOverloads +@OkhttpMaker +suspend inline fun Call.Factory.postByteArray( + url: String, + headers: Map? = null, + body: Request.Builder.() -> RequestBody, +): ByteArray = post(url, headers, body).body!!.bytes() + +@Suppress("BlockingMethodInNonBlockingContext") +@JvmOverloads +suspend fun Call.Factory.postByteArray( + url: String, + body: RequestBody, + headers: Map? = null, +): ByteArray = post(url, body, headers).body!!.bytes() + +@JvmOverloads +suspend fun Call.Factory.postByteArray( + url: String, + param: Map, + headers: Map? = null, +): ByteArray = postByteArray(url, headers) { + FormBody.Builder().add(param).build() +} + +@JvmOverloads +suspend fun Call.Factory.postByteArray( + url: String, + body: String, + headers: Map? = null, +): ByteArray = postByteArray(url, headers) { + body.toRequestBody("text/plain;charset=utf-8".toMediaTypeOrNull()) +} + +@JvmOverloads +suspend fun Call.Factory.postByteArray( + url: String, + body: File, + headers: Map? = null, +): ByteArray = postByteArray(url, headers) { + body.asRequestBody("application/octet-stream".toMediaTypeOrNull()) +} diff --git a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/okhttp.kt b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/okhttp.kt new file mode 100644 index 0000000..436b164 --- /dev/null +++ b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/okhttp.kt @@ -0,0 +1,108 @@ +@file:Suppress("unused") + +package cn.tursom.http.client + +import okhttp3.* +import okhttp3.HttpUrl.Companion.toHttpUrl +import java.io.IOException +import java.net.InetSocketAddress +import java.net.Proxy +import java.net.SocketAddress +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +@DslMarker +@Retention(AnnotationRetention.BINARY) +annotation class OkhttpMaker + +@OkhttpMaker +object Okhttp : Call.Factory, WebSocket.Factory { + val direct: OkHttpClient = OkHttpClient().newBuilder() + .retryOnConnectionFailure(true) + .build() + val socket: OkHttpClient = proxy() + val httpProxy: OkHttpClient = proxy(port = 8080, type = Proxy.Type.HTTP) + + var default: OkHttpClient = direct + + override fun newCall(request: Request): Call = default.newCall(request) + override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket = + default.newWebSocket(request, listener) + + @JvmOverloads + fun proxy( + host: String = "127.0.0.1", + port: Int = 1080, + type: Proxy.Type = Proxy.Type.SOCKS, + builder: OkHttpClient.Builder = OkHttpClient().newBuilder(), + ): OkHttpClient = builder + .proxy(Proxy(type, InetSocketAddress(host, port) as SocketAddress)) + .retryOnConnectionFailure(true) + .build() +} + +@OkhttpMaker +inline fun B.url(url: String? = null, builder: HttpUrl.Builder.() -> Unit): B = apply { + val urlBuilder = url?.toHttpUrl()?.newBuilder() ?: HttpUrl.Builder() + + urlBuilder.builder() + url(urlBuilder.build()) +} + +fun B.addHeaders(headers: Map?): B = apply { + headers?.forEach { (k, v) -> + addHeader(k, v) + } +} + +inline fun Request.Builder.form(builder: FormBody.Builder.() -> Unit): FormBody = + FormBody.Builder().build(builder) + +fun HttpUrl.Builder.addQueryParameters(params: Map?) { + params?.forEach { (k, v) -> + addQueryParameter(k, v) + } +} + +@OkhttpMaker +inline infix fun FormBody.Builder.build(builder: FormBody.Builder.() -> Unit): FormBody { + val form = FormBody.Builder() + form.builder() + return form.build() +} + +fun FormBody.Builder.add(forms: Map?) = apply { + forms?.forEach { (k, v) -> + add(k, v) + } +} + +fun FormBody.Builder.addEncoded(forms: Map?) = apply { + forms?.forEach { (k, v) -> + addEncoded(k, v) + } +} + +suspend fun Call.sendRequest(): Response = suspendCoroutine { + enqueue(object : Callback { + override fun onFailure(call: Call, e: IOException) { + it.resumeWithException(e) + } + + override fun onResponse(call: Call, response: Response) { + it.resume(response) + } + }) +} + +suspend fun Call.str() = sendRequest().body!!.string() +suspend fun Call.bytes() = sendRequest().body!!.bytes() + +@OkhttpMaker +inline fun WebSocket.Factory.newWebSocket(listener: WebSocketListener, builder: Request.Builder.() -> Unit): WebSocket = + newWebSocket(Request.Builder().apply(builder).build(), listener) + +@OkhttpMaker +inline fun Call.Factory.newCall(builder: Request.Builder.() -> Unit) = + newCall(Request.Builder().apply(builder).build()) diff --git a/ts-core/ts-proxy/build.gradle.kts b/ts-core/ts-proxy/build.gradle.kts index ad20fb1..684afd9 100644 --- a/ts-core/ts-proxy/build.gradle.kts +++ b/ts-core/ts-proxy/build.gradle.kts @@ -9,8 +9,9 @@ plugins { dependencies { api(project(":ts-core")) - api("cglib", "cglib", "3.3.0") - implementation("org.apache.commons", "commons-lang3", "3.8.1") + api(project(":ts-core:ts-reflectasm")) + api(group = "cglib", name = "cglib", version = "3.3.0") + implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.8.1") testApi(group = "junit", name = "junit", version = "4.13.2") } diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ListProxyContainer.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ListProxyContainer.kt index caf4f03..c63322e 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ListProxyContainer.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ListProxyContainer.kt @@ -1,27 +1,32 @@ package cn.tursom.proxy +import cn.tursom.core.context.Context -class ListProxyContainer : MutableProxyContainer { - private val proxyList: MutableList = ArrayList() - override fun addProxy(proxy: ProxyMethod): Int { +class ListProxyContainer( + private val proxyList: MutableCollection = ArrayList(), +) : MutableProxyContainer { + override var lastModify: Long = System.currentTimeMillis() + private set + override val context: Context = ProxyContainer.contextEnv.newContext() + + override fun addProxy(proxy: ProxyMethod) { + lastModify = System.currentTimeMillis() proxyList.add(proxy) - return proxyList.size - 1 } override fun addAllProxy(proxy: Collection?): Boolean { + lastModify = System.currentTimeMillis() return proxyList.addAll(proxy!!) } override fun removeProxy(proxy: ProxyMethod) { + lastModify = System.currentTimeMillis() proxyList.remove(proxy) } - override fun removeProxy(index: Int) { - proxyList.removeAt(index) - } - override fun iterator(): MutableIterator { + lastModify = System.currentTimeMillis() return proxyList.iterator() } } \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/MutableProxyContainer.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/MutableProxyContainer.kt index b34658e..444d5be 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/MutableProxyContainer.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/MutableProxyContainer.kt @@ -1,8 +1,7 @@ package cn.tursom.proxy interface MutableProxyContainer : ProxyContainer { - fun addProxy(proxy: ProxyMethod): Int + fun addProxy(proxy: ProxyMethod) fun addAllProxy(proxy: Collection?): Boolean fun removeProxy(proxy: ProxyMethod) - fun removeProxy(index: Int) } \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/Proxy.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/Proxy.kt index 7d3608d..e521f08 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/Proxy.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/Proxy.kt @@ -2,26 +2,32 @@ package cn.tursom.proxy import cn.tursom.core.uncheckedCast import net.sf.cglib.proxy.Enhancer +import net.sf.cglib.proxy.Factory import java.util.concurrent.ConcurrentHashMap object Proxy { + val enhancerMap = ConcurrentHashMap, Enhancer>() private val cache = ConcurrentHashMap, Class<*>>() private fun getTarget(clazz: Class): Class = cache.computeIfAbsent(clazz) { val enhancer = Enhancer() + enhancerMap[clazz] = enhancer enhancer.setSuperclass(clazz) enhancer.setCallbackType(ProxyInterceptor::class.java) enhancer.setCallbackFilter { 0 } enhancer.createClass() }.uncheckedCast() - operator fun get(clazz: Class, builder: (Class) -> T): Pair { + operator fun get( + clazz: Class, + builder: (Class) -> T, + ): Pair { val target = getTarget(clazz) val container = ListProxyContainer() - synchronized(target) { - Enhancer.registerCallbacks(target, arrayOf(ProxyInterceptor(container))) - return builder(target) to container - } + val obj = builder(target) + obj as Factory + obj.setCallback(0, ProxyInterceptor(container)) + return obj to container } inline fun get() = get(T::class.java) @@ -30,13 +36,13 @@ object Proxy { arguments: Array, ) = get(T::class.java, argumentTypes, arguments) - operator fun get(clazz: Class): Pair = get(clazz, Class::newInstance) + operator fun get(clazz: Class) = get(clazz, Class::newInstance) operator fun get( clazz: Class, argumentTypes: Array>, arguments: Array, - ): Pair = get(clazz) { + ) = get(clazz) { it.getConstructor(*argumentTypes).newInstance(*arguments) } } \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainer.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainer.kt index 1c9cf3b..1d0f0a4 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainer.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainer.kt @@ -1,47 +1,30 @@ package cn.tursom.proxy -import cn.tursom.core.uncheckedCast - -data class ProxyResult( - val result: R, - val success: Boolean = false, -) { - companion object { - val failed: ProxyResult<*> = ProxyResult(null, false) - fun of(): ProxyResult { - return of(null) - } - - /** - * 返回一个临时使用的 Result 对象 - * 因为是临时对象,所以不要把这个对象放到任何当前函数堆栈以外的地方 - * 如果要长期储存对象请 new Result - */ - fun of(result: R): ProxyResult { - return ProxyResult(result, true) - } - - fun failed(): ProxyResult { - return failed.uncheckedCast() - } - } -} +import cn.tursom.core.context.ArrayContextEnv +import cn.tursom.core.context.Context interface ProxyContainer : Iterable { -} + companion object { + val contextEnv = ArrayContextEnv() -inline fun ProxyContainer.forEachProxy(action: (ProxyMethod) -> Unit) { - for (t in this) { - action(t) - } -} + inline fun ProxyContainer.forEachProxy(action: (ProxyMethod) -> Unit) { + for (t in this) { + action(t) + } + } -inline fun ProxyContainer.forFirstProxy(action: (ProxyMethod) -> ProxyResult?): ProxyResult { - for (t in this) { - val result = action(t) - if (result != null && result.success) { - return result + inline fun ProxyContainer.forFirstProxy(action: (ProxyMethod) -> ProxyResult): ProxyResult { + for (t in this) { + val result = action(t) + if (result.success) { + return result + } + } + return ProxyResult.failed() } } - return ProxyResult.failed() + + // to impl, use override val context: Context = ProxyContainer.contextEnv.newContext() + val context: Context + val lastModify: Long } \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainerHandlerCache.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainerHandlerCache.kt index 461b0bd..1ac35a0 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainerHandlerCache.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyContainerHandlerCache.kt @@ -5,19 +5,19 @@ import java.lang.reflect.Method import java.util.concurrent.ConcurrentHashMap object ProxyContainerHandlerCache { - private val handlerMap: MutableMap, MethodProxy) -> ProxyResult> = + private val handlerMap: MutableMap, MethodProxy) -> ProxyResult> = ConcurrentHashMap() val callSuper = { obj: Any, _: ProxyContainer, _: Method, args: Array, proxy: MethodProxy -> ProxyResult.of(proxy.invokeSuper(obj, args)) } val empty = { _: Any, _: ProxyContainer, _: Method, _: Array, _: MethodProxy -> ProxyResult.failed() } - fun getHandler(method: Method): ((Any, ProxyContainer, Method, Array, MethodProxy) -> ProxyResult)? { + fun getHandler(method: MethodProxy): ((Any, ProxyContainer, Method, Array, MethodProxy) -> ProxyResult)? { return handlerMap[method] } fun setHandler( - method: Method, + method: MethodProxy, onProxy: ((Any, ProxyContainer, Method, Array, MethodProxy) -> ProxyResult)?, ) { handlerMap[method] = onProxy ?: callSuper diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyInterceptor.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyInterceptor.kt index 48f1b5e..ace0992 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyInterceptor.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyInterceptor.kt @@ -8,9 +8,10 @@ import java.util.* class ProxyInterceptor( private val container: ProxyContainer = ListProxyContainer(), + private val supportCallSuper: Boolean = true, ) : MethodInterceptor { companion object { - private val HANDLE_DEQUE_THREAD_LOCAL = ThreadLocal>() + val callSuper = ThreadLocal() private val parameterTypes = arrayOf(Method::class.java, Array::class.java, MethodProxy::class.java) private val parameterTypesField: Field = Method::class.java.getDeclaredField("parameterTypes").apply { isAccessible = true @@ -26,18 +27,22 @@ class ProxyInterceptor( } fun isOnProxyMethod(method: Method): Boolean { - return equalsMethod(method, "onProxy", parameterTypes) + //return callSuper.get() == true || equalsMethod(method, "onProxy", parameterTypes) + return callSuper.get() == true } } - @Throws(Throwable::class) override fun intercept(obj: Any, method: Method, args: Array, proxy: MethodProxy): Any? { - if (!isOnProxyMethod(method)) { - val result = ProxyRunner.onProxy(obj, container, method, args, proxy) - if (result != null && result.success) { - return result.result - } + if (supportCallSuper && callSuper.get() == true) { + callSuper.remove() + return proxy.invokeSuper(obj, args) + } + + val result = ProxyRunner.onProxy(obj, container, method, args, proxy) + return if (result.success) { + result.result + } else { + proxy.invokeSuper(obj, args) } - return proxy.invokeSuper(obj, args) } } \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethod.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethod.kt index e79b5a3..5142dae 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethod.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethod.kt @@ -2,23 +2,44 @@ package cn.tursom.proxy import cn.tursom.proxy.annotation.ForEachProxy import cn.tursom.proxy.annotation.ForFirstProxy +import cn.tursom.reflect.asm.ReflectAsmUtils import net.sf.cglib.proxy.MethodProxy import java.lang.reflect.Method import java.util.concurrent.ConcurrentHashMap interface ProxyMethod { @Throws(Throwable::class) - fun onProxy(obj: Any?, method: Method, args: Array, proxy: MethodProxy): ProxyResult<*>? { - val selfMethod: Method + fun onProxy(obj: Any?, method: Method, args: Array, proxy: MethodProxy): ProxyResult<*> { val handlerCacheMap = getHandlerCacheMap(javaClass) val methodResult = handlerCacheMap[method] if (methodResult != null) { return if (methodResult.success) { - ProxyResult.of(methodResult.result(this, *args)) + ProxyResult.of(methodResult.result(this, args)) } else { ProxyResult.failed() } } + + val reflectAsmMethod = try { + ReflectAsmUtils.getMethod( + javaClass, + method.name, + paramTypes = method.parameterTypes, + returnType = method.returnType, + ) + } catch (e: Exception) { + e.printStackTrace() + null + } + if (reflectAsmMethod != null) { + val (methodAccess, index) = reflectAsmMethod + handlerCacheMap[method] = ProxyResult({ p, a -> + methodAccess.invoke(p, index, *a) + }, true) + return ProxyResult.of(methodAccess.invoke(this, index, *args)) + } + + val selfMethod: Method try { var methodName = method.name for (annotation in method.annotations) { @@ -36,19 +57,24 @@ interface ProxyMethod { } selfMethod = javaClass.getMethod(methodName, *method.parameterTypes) selfMethod.isAccessible = true - handlerCacheMap[method] = ProxyResult(selfMethod, true) - } catch (e: Exception) { - handlerCacheMap[method] = ProxyResult.failed() - return ProxyResult.failed() + handlerCacheMap[method] = ProxyResult({ p, a -> + selfMethod(p, *a) + }, true) + return ProxyResult.of(selfMethod(this, *args)) + } catch (_: Exception) { } - return ProxyResult.of(selfMethod(this, *args)) + + handlerCacheMap[method] = ProxyResult.failed() + return ProxyResult.failed() } companion object { - private val handlerCacheMapMap: MutableMap, MutableMap>> = + private val handlerCacheMapMap: MutableMap< + Class, + MutableMap) -> Any?>>> = HashMap() - fun getHandlerCacheMap(type: Class): MutableMap> { + fun getHandlerCacheMap(type: Class): MutableMap) -> Any?>> { var handlerCacheMap = handlerCacheMapMap[type] if (handlerCacheMap == null) synchronized(handlerCacheMapMap) { handlerCacheMap = handlerCacheMapMap[type] @@ -59,13 +85,5 @@ interface ProxyMethod { } return handlerCacheMap!! } - - fun getProxyMethod(clazz: Class<*>, name: String, vararg parameterTypes: Class<*>): Method? { - return try { - clazz.getDeclaredMethod(name, *parameterTypes) - } catch (e: NoSuchMethodException) { - throw RuntimeException(e) - } - } } } \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethodCache.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethodCache.kt new file mode 100644 index 0000000..3a57a6e --- /dev/null +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyMethodCache.kt @@ -0,0 +1,32 @@ +package cn.tursom.proxy + +import net.sf.cglib.proxy.MethodProxy +import java.lang.reflect.Method + +typealias ProxyMethodCacheFunction = (obj: Any?, method: Method, args: Array, proxy: MethodProxy) -> ProxyResult<*> + +class ProxyMethodCache( + private var lastModify: Long = 0, + private var function: ProxyMethodCacheFunction = failed, +) { + companion object { + val failed: ProxyMethodCacheFunction = { _, _, _, _ -> ProxyResult.failed } + } + + fun update(lastModify: Long, function: ProxyMethodCacheFunction = this.function) { + this.lastModify = lastModify + this.function = function + } + + operator fun invoke( + lastModify: Long, + obj: Any?, + method: Method, + args: Array, + proxy: MethodProxy, + ): ProxyResult<*>? = if (lastModify != this.lastModify) { + null + } else { + function(obj, method, args, proxy) + } +} \ No newline at end of file diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyResult.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyResult.kt new file mode 100644 index 0000000..fddc51f --- /dev/null +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyResult.kt @@ -0,0 +1,17 @@ +package cn.tursom.proxy + +import cn.tursom.core.uncheckedCast + +data class ProxyResult( + val result: R, + val success: Boolean = false, + val cache: Boolean = false, +) { + companion object { + val success: ProxyResult<*> = ProxyResult(null, true) + val failed: ProxyResult<*> = ProxyResult(null, false) + fun of(): ProxyResult = success.uncheckedCast() + fun of(result: R): ProxyResult = ProxyResult(result, true) + fun failed(): ProxyResult = failed.uncheckedCast() + } +} diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyRunner.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyRunner.kt index 3eb2c23..fb89881 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyRunner.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/ProxyRunner.kt @@ -1,5 +1,7 @@ package cn.tursom.proxy +import cn.tursom.proxy.ProxyContainer.Companion.forEachProxy +import cn.tursom.proxy.ProxyContainer.Companion.forFirstProxy import cn.tursom.proxy.annotation.ForEachProxy import cn.tursom.proxy.annotation.ForFirstProxy import net.sf.cglib.proxy.MethodProxy @@ -7,8 +9,13 @@ import org.apache.commons.lang3.StringUtils import java.lang.reflect.Method import java.util.* + object ProxyRunner { private val errMsgSearchList = arrayOf("%M", "%B", "%A") + private val forFirstProxyCacheKey = + ProxyContainer.contextEnv.newKey().withDefault { ProxyMethodCache() } + private val forEachProxyCacheKey = + ProxyContainer.contextEnv.newKey().withDefault { ProxyMethodCache() } /** * will be call when proxy method invoke. @@ -16,7 +23,7 @@ object ProxyRunner { */ @Throws(Throwable::class) fun onProxy(obj: Any, c: ProxyContainer, method: Method, args: Array, proxy: MethodProxy): ProxyResult<*> { - var handler = ProxyContainerHandlerCache.getHandler(method) + var handler = ProxyContainerHandlerCache.getHandler(proxy) if (handler != null) { return handler(obj, c, method, args, proxy) } @@ -34,18 +41,20 @@ object ProxyRunner { //handler = ProxyContainerHandlerCache.callSuper handler = onForFirstProxy(ForFirstProxy.EMPTY) } - ProxyContainerHandlerCache.setHandler(method, handler) + ProxyContainerHandlerCache.setHandler(proxy, handler) return handler(obj, c, method, args, proxy) } - fun onForFirstProxy(forFirstProxy: ForFirstProxy) = - { o: Any, c: ProxyContainer, m: Method, a: Array, p: MethodProxy -> - onForFirstProxy(o, c, m, a, p, forFirstProxy, when (forFirstProxy.value.size) { - 0 -> emptyList() - 1 -> listOf(forFirstProxy.value[0].java) - else -> forFirstProxy.value.asSequence().map { it.java }.toSet() - }) + private fun onForFirstProxy(forFirstProxy: ForFirstProxy): (Any, ProxyContainer, Method, Array, MethodProxy) -> ProxyResult<*> { + return { o: Any, c: ProxyContainer, m: Method, a: Array, p: MethodProxy -> + c.context[forFirstProxyCacheKey](c.lastModify, o, m, a, p) ?: onForFirstProxy(o, c, m, a, p, forFirstProxy, + when (forFirstProxy.value.size) { + 0 -> emptyList() + 1 -> listOf(forFirstProxy.value[0].java) + else -> forFirstProxy.value.asSequence().map { it.java }.toSet() + }) } + } private fun onForFirstProxy( obj: Any, @@ -56,9 +65,14 @@ object ProxyRunner { forFirstProxy: ForFirstProxy, classes: Collection>, ): ProxyResult<*> { + val cache = container.context[forFirstProxyCacheKey] val result = container.forFirstProxy { p -> if (classes.isEmpty() || classes.stream().anyMatch { c: Class<*> -> c.isInstance(p) }) { - return@forFirstProxy p.onProxy(obj, method, args, proxy) + val result = p.onProxy(obj, method, args, proxy) + if (forFirstProxy.cache && result.success && result.cache) { + cache.update(container.lastModify, p::onProxy) + } + return@forFirstProxy result } else { return@forFirstProxy ProxyResult.failed } @@ -87,7 +101,16 @@ object ProxyRunner { } errMsg = StringUtils.replaceEach(errMsg, errMsgSearchList, replacementList) - throw forFirstProxy.errClass.java.getConstructor(String::class.java).newInstance(errMsg) + val exceptionConstructor = forFirstProxy.errClass.java.getConstructor(String::class.java) + if (forFirstProxy.cache) { + cache.update(container.lastModify) { _, _, _, _ -> + throw exceptionConstructor.newInstance(errMsg) + } + } + throw exceptionConstructor.newInstance(errMsg) + } + if (forFirstProxy.cache) { + cache.update(container.lastModify) } return ProxyResult.failed } @@ -100,12 +123,14 @@ object ProxyRunner { private fun onForeachProxy( classes: Collection>, - ) = label@{ o: Any, c: ProxyContainer, m: Method, a: Array, proxy1: MethodProxy -> - c.forEachProxy { p -> - if (classes.isEmpty() || classes.any { c: Class<*> -> c.isInstance(p) }) { - p.onProxy(o, m, a, proxy1) + ): (Any, ProxyContainer, Method, Array, MethodProxy) -> ProxyResult { + return (label@{ o: Any, c: ProxyContainer, m: Method, a: Array, proxy1: MethodProxy -> + c.forEachProxy { p -> + if (classes.isEmpty() || classes.any { c: Class<*> -> c.isInstance(p) }) { + p.onProxy(o, m, a, proxy1) + } } - } - ProxyResult.failed() + ProxyResult.failed() + }) } -} \ No newline at end of file +} diff --git a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/annotation/ForFirstProxy.kt b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/annotation/ForFirstProxy.kt index 0d75068..36122ce 100644 --- a/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/annotation/ForFirstProxy.kt +++ b/ts-core/ts-proxy/src/main/kotlin/cn/tursom/proxy/annotation/ForFirstProxy.kt @@ -14,6 +14,7 @@ annotation class ForFirstProxy( val must: Boolean = false, val errMsg: String = "", val errClass: KClass = RuntimeException::class, + val cache: Boolean = true, ) { companion object { val EMPTY = ForFirstProxy() diff --git a/ts-core/ts-proxy/src/test/kotlin/cn/tursom/proxy/Example.kt b/ts-core/ts-proxy/src/test/kotlin/cn/tursom/proxy/Example.kt index 5c91b85..f5ce338 100644 --- a/ts-core/ts-proxy/src/test/kotlin/cn/tursom/proxy/Example.kt +++ b/ts-core/ts-proxy/src/test/kotlin/cn/tursom/proxy/Example.kt @@ -1,31 +1,32 @@ package cn.tursom.proxy -import cn.tursom.proxy.Example.GetA -import cn.tursom.proxy.annotation.ForEachProxy import org.junit.Test class Example { open class TestClass protected constructor() { - @get:ForEachProxy - open val a: Int = 0 + //@get:ForEachProxy + open var a: Int = 0 } - fun interface GetA : ProxyMethod { - fun getA(): Int + class GetA( + private val t: TestClass, + ) : ProxyMethod { + fun getA(): Int { + ProxyInterceptor.callSuper.set(true) + return t.a + 1 + } } @Test fun test() { - repeat(3) { - val (t, container) = Proxy.get() - container.addProxy(GetA { - println("on proxy method") - 0 - }) + val (t, container) = Proxy.get() + container.addProxy(GetA(t)) - println(t.javaClass) - println(t.a) + println(t.javaClass) + repeat(1000000000) { + t.a = t.a + //println(t.a) } } } \ No newline at end of file diff --git a/ts-core/ts-reflectasm/src/main/kotlin/cn/tursom/reflect/asm/ReflectAsmUtils.kt b/ts-core/ts-reflectasm/src/main/kotlin/cn/tursom/reflect/asm/ReflectAsmUtils.kt index d61872d..ff0818b 100644 --- a/ts-core/ts-reflectasm/src/main/kotlin/cn/tursom/reflect/asm/ReflectAsmUtils.kt +++ b/ts-core/ts-reflectasm/src/main/kotlin/cn/tursom/reflect/asm/ReflectAsmUtils.kt @@ -179,6 +179,4 @@ object ReflectAsmUtils { methodAccess.invoke(this, index, a1, a2, a3, a4, a5, a6) as R } } - - } diff --git a/ts-web/src/main/kotlin/cn/tursom/web/ChunkedMessage.kt b/ts-web/src/main/kotlin/cn/tursom/web/ChunkedMessage.kt new file mode 100644 index 0000000..d278a7b --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/ChunkedMessage.kt @@ -0,0 +1,12 @@ +package cn.tursom.web + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.web.utils.Chunked + +interface ChunkedMessage { + fun writeChunkedHeader() + fun addChunked(buffer: ByteBuffer) = addChunked { buffer } + fun addChunked(buffer: () -> ByteBuffer) + fun finishChunked() + fun finishChunked(chunked: Chunked) +} \ No newline at end of file diff --git a/ts-web/src/main/kotlin/cn/tursom/web/HttpContent.kt b/ts-web/src/main/kotlin/cn/tursom/web/HttpContent.kt index 7f64b6f..d95034d 100644 --- a/ts-web/src/main/kotlin/cn/tursom/web/HttpContent.kt +++ b/ts-web/src/main/kotlin/cn/tursom/web/HttpContent.kt @@ -2,14 +2,13 @@ package cn.tursom.web import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.urlDecode -import cn.tursom.web.utils.Chunked import org.slf4j.Logger import org.slf4j.LoggerFactory import java.io.File import java.io.RandomAccessFile import java.net.SocketAddress -interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter { +interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter, ChunkedMessage { val requestSendFully: Boolean val finished: Boolean val uri: String @@ -132,18 +131,12 @@ interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter { fun deleteCookie(name: String, path: String = "/") = addCookie(name, "deleted; expires=Thu, 01 Jan 1970 00:00:00 GMT", path = path) - fun writeChunkedHeader() - fun addChunked(buffer: ByteBuffer) = addChunked { buffer } - fun addChunked(buffer: () -> ByteBuffer) - fun finishChunked() - fun finishChunked(chunked: Chunked) - fun finishFile(file: File, chunkSize: Int = 8192) fun finishFile( file: RandomAccessFile, offset: Long = 0, length: Long = file.length() - offset, - chunkSize: Int = 8192 + chunkSize: Int = 8192, ) fun jump(url: String) = temporaryMoved(url) diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt new file mode 100644 index 0000000..2658ae6 --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt @@ -0,0 +1,5 @@ +package cn.tursom.web.client + +interface HttpClient { + suspend fun request(method: String, url: String, ssl: Boolean? = null): HttpRequest +} diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt new file mode 100644 index 0000000..c55d709 --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt @@ -0,0 +1,25 @@ +package cn.tursom.web.client + +import cn.tursom.core.buffer.ByteBuffer + +interface HttpRequest { + var version: String + var method: String + var path: String + + val params: Map> + fun addParam(key: String, value: String) + fun addParams(params: Map) { + params.forEach(::addParam) + } + + val headers: Iterable> + fun addHeader(key: String, value: Any) + fun addHeaders(headers: Map) { + headers.forEach(::addHeader) + } + + fun body(data: ByteBuffer) + + suspend fun send(): HttpResponse +} diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt new file mode 100644 index 0000000..15085e2 --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt @@ -0,0 +1,10 @@ +package cn.tursom.web.client + +interface HttpResponse { + val code: Int + val reason: String + val headers: Iterable> + fun getHeader(key: String): String? + fun getHeaders(key: String): List + val body: HttpResponseStream +} diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt new file mode 100644 index 0000000..9d33f6f --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt @@ -0,0 +1,21 @@ +package cn.tursom.web.client + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.impl.HeapByteBuffer +import java.io.Closeable + +interface HttpResponseStream : Closeable { + suspend fun skip(n: Long) + suspend fun read(): Int + suspend fun read(buffer: ByteBuffer) + suspend fun read( + buffer: ByteArray, + offset: Int = 0, + len: Int = buffer.size - offset, + ): Int { + val byteBuffer = HeapByteBuffer(buffer, offset, len) + byteBuffer.clear() + read(byteBuffer) + return byteBuffer.writePosition + } +} diff --git a/ts-web/src/main/kotlin/cn/tursom/web/utils/HttpVersion.kt b/ts-web/src/main/kotlin/cn/tursom/web/utils/HttpVersion.kt new file mode 100644 index 0000000..e594563 --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/utils/HttpVersion.kt @@ -0,0 +1,5 @@ +package cn.tursom.web.utils + +enum class HttpVersion { + HTTP_1_0, HTTP_1_1, +} \ No newline at end of file diff --git a/ts-web/ts-web-netty-client/build.gradle.kts b/ts-web/ts-web-netty-client/build.gradle.kts new file mode 100644 index 0000000..6408b98 --- /dev/null +++ b/ts-web/ts-web-netty-client/build.gradle.kts @@ -0,0 +1,20 @@ +plugins { + kotlin("jvm") + `maven-publish` + id("ts-gradle") +} + +dependencies { + api(project(":ts-core")) + api(project(":ts-core:ts-buffer")) + api(project(":ts-core:ts-log")) + api(project(":ts-web")) + api(project(":ts-web:ts-web-netty")) + api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0") + api(group = "io.netty", name = "netty-all", version = "4.1.72.Final") + api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32") +} + + + + diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpConnectionPool.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpConnectionPool.kt new file mode 100644 index 0000000..6094388 --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpConnectionPool.kt @@ -0,0 +1,48 @@ +package cn.tursom.web.client.netty + +import io.netty.handler.timeout.WriteTimeoutHandler +import kotlinx.coroutines.channels.Channel +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +class HttpConnectionPool( + host: String, + port: Int, + ssl: Boolean, + private val maxConn: Int = 5, +) { + companion object { + private data class PoolDesc(val host: String, val port: Int, val ssl: Boolean) + + private val poolCache = ConcurrentHashMap() + fun poolOf(host: String, port: Int, ssl: Boolean) = poolCache.getOrPut(PoolDesc(host, port, ssl)) { + HttpConnectionPool(host, port, ssl) + } + } + + private val group = HttpExecutor.group(host, port, ssl) { + it.attr(NettyHttpResultResume.countKey).set(conn) + it.pipeline() + .addLast(NettyHttpResultResume) + .addLast(WriteTimeoutHandler(60)) + } + private val pool = Channel(maxConn) + private val conn = AtomicInteger() + + suspend fun useConnection(handler: suspend (NettyHttpConnection) -> R): R { + val client = if (conn.getAndIncrement() < maxConn) { + group() + } else { + conn.decrementAndGet() + pool.receive() + } + val result = try { + handler(client) + } catch (e: Exception) { + client.channel.close() + throw e + } + pool.send(client) + return result + } +} diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt new file mode 100644 index 0000000..63c4390 --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt @@ -0,0 +1,65 @@ +package cn.tursom.web.client.netty + +import io.netty.bootstrap.Bootstrap +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInitializer +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.http.HttpClientCodec +import io.netty.handler.ssl.SslContextBuilder +import io.netty.handler.ssl.util.InsecureTrustManagerFactory + +object HttpExecutor { + private val group = NioEventLoopGroup() + + fun group( + host: String, + port: Int, + ssl: Boolean, + initChannel: (SocketChannel) -> Unit = {}, + ): suspend () -> NettyHttpConnection { + val sslCtx = if (ssl) { + SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE).build() + } else { + null + } + val bootstrap = Bootstrap() + .group(group) + .channel(NioSocketChannel::class.java) + .handler(object : ChannelInitializer() { + override fun initChannel(ch: SocketChannel) { + ch.pipeline().apply { + if (sslCtx != null) { + addLast(sslCtx.newHandler(ch.alloc(), host, port)) + } + addLast(HttpClientCodec()) + } + initChannel(ch) + } + + override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) { + super.exceptionCaught(ctx, cause) + } + }) + return { + val channelFuture = bootstrap.connect(host, port) + channelFuture.awaitSuspend() + HttpClientImpl(channelFuture.channel() as SocketChannel) + } + } + + suspend fun connect( + host: String, + port: Int, + ssl: Boolean, + initChannel: (SocketChannel) -> Unit = {}, + ): NettyHttpConnection { + return group(host, port, ssl, initChannel)() + } + + private class HttpClientImpl( + override val channel: SocketChannel, + ) : NettyHttpConnection +} diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt new file mode 100644 index 0000000..88641f8 --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt @@ -0,0 +1,26 @@ +package cn.tursom.web.client.netty + +import cn.tursom.web.client.HttpClient +import cn.tursom.web.client.HttpRequest +import java.net.URI + +open class NettyHttpClient : HttpClient { + override suspend fun request(method: String, url: String, ssl: Boolean?): HttpRequest { + val uri = URI.create(url) + val port = if (uri.port < 0) { + when (uri.scheme ?: "http") { + "http" -> 80 + "https" -> 443 + else -> -1 + } + } else { + uri.port + } + + val pool = HttpConnectionPool.poolOf(uri.host, port, uri.scheme == "https") + val request = NettyHttpRequest(pool) + request.method = method + request.path = uri.path + return request + } +} diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpConnection.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpConnection.kt new file mode 100644 index 0000000..356a0dc --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpConnection.kt @@ -0,0 +1,18 @@ +package cn.tursom.web.client.netty + +import io.netty.channel.socket.SocketChannel +import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.HttpObject +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel + +interface NettyHttpConnection { + val channel: SocketChannel + + suspend fun request(request: FullHttpRequest): ReceiveChannel { + val ktChannel = Channel(Channel.UNLIMITED) + channel.attr(NettyHttpResultResume.recvChannelKey).set(ktChannel) + channel.writeAndFlush(request) + return ktChannel + } +} \ No newline at end of file diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt new file mode 100644 index 0000000..e0b073f --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt @@ -0,0 +1,66 @@ +package cn.tursom.web.client.netty + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.impl.NettyByteBuffer +import cn.tursom.log.impl.Slf4jImpl +import cn.tursom.web.client.HttpRequest +import cn.tursom.web.client.HttpResponse +import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.DefaultFullHttpRequest +import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpVersion + +class NettyHttpRequest( + private val pool: HttpConnectionPool, +) : HttpRequest { + companion object : Slf4jImpl() + + private var request: FullHttpRequest = DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "") + + override var version: String + get() = request.protocolVersion().text() + set(value) { + request.protocolVersion = HttpVersion.valueOf(value) + } + + override var method: String + get() = request.method().name() + set(value) { + request.method = HttpMethod.valueOf(value) + } + override var path: String + get() = request.uri() + set(value) { + request.uri = value + } + override val params = HashMap>().withDefault { ArrayList() } + + override fun addParam(key: String, value: String) { + params[key]!!.add(value) + } + + override val headers: Iterable> + get() = request.headers() + + override fun addHeader(key: String, value: Any) { + request.headers().add(key, value) + } + + override fun body(data: ByteBuffer) { + val byteBuf = if (data is NettyByteBuffer) { + data.byteBuf + } else { + Unpooled.wrappedBuffer(data.getBytes()) + } + request = request.replace(byteBuf) + } + + override suspend fun send(): HttpResponse { + val receiveChannel = pool.useConnection { + it.request(request) + } + val httpResponse = receiveChannel.receive() as io.netty.handler.codec.http.HttpResponse + return NettyHttpResponse(httpResponse, receiveChannel) + } +} diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt new file mode 100644 index 0000000..9d67031 --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt @@ -0,0 +1,75 @@ +package cn.tursom.web.client.netty + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.impl.NettyByteBuffer +import cn.tursom.web.client.HttpResponse +import cn.tursom.web.client.HttpResponseStream +import io.netty.handler.codec.http.HttpContent +import io.netty.handler.codec.http.HttpObject +import kotlinx.coroutines.channels.ReceiveChannel + +class NettyHttpResponse( + private val response: io.netty.handler.codec.http.HttpResponse, + channel: ReceiveChannel, +) : HttpResponse { + override val code: Int + get() = response.status().code() + override val reason: String + get() = response.status().reasonPhrase() + override val headers get() = response.headers()!! + + override fun getHeader(key: String): String? = response.headers().get(key) + override fun getHeaders(key: String): List = response.headers().getAll(key) + + override val body: HttpResponseStream = NettyStream(response, channel) + + private class NettyStream( + response: HttpObject, + private val channel: ReceiveChannel, + ) : HttpResponseStream { + private var buffer: ByteBuffer? = if (response is HttpContent) { + NettyByteBuffer(response.content()) + } else { + null + } + + private suspend fun buffer(): ByteBuffer? { + if (buffer == null || buffer?.readable == 0) { + val receive = channel.receiveCatching() + buffer = if (receive.isSuccess) { + val content = receive.getOrThrow() as HttpContent + NettyByteBuffer(content.content()) + } else { + val e = receive.exceptionOrNull() + if (e != null) { + throw e + } + null + } + } + return buffer + } + + override suspend fun skip(n: Long) { + var skip = 0L + while (skip < n) { + val buffer = buffer() ?: return + skip += buffer.skip((n - skip).toInt()) + } + } + + override suspend fun read(): Int { + val buffer = buffer() ?: return -1 + return buffer.get().toInt() + } + + override suspend fun read(buffer: ByteBuffer) { + val buf = buffer() ?: return + buf.writeTo(buffer) + } + + override fun close() { + channel.cancel() + } + } +} \ No newline at end of file diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt new file mode 100644 index 0000000..c2047cd --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt @@ -0,0 +1,40 @@ +package cn.tursom.web.client.netty + +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.codec.http.HttpObject +import io.netty.handler.codec.http.LastHttpContent +import io.netty.util.AttributeKey +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.trySendBlocking +import java.util.concurrent.atomic.AtomicInteger + +@ChannelHandler.Sharable +object NettyHttpResultResume : SimpleChannelInboundHandler() { + val recvChannelKey = AttributeKey.newInstance>("recvChannelKey")!! + val countKey = AttributeKey.newInstance("countKey")!! + override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { + val channel = ctx.channel().attr(recvChannelKey).get() ?: return + val send = channel.trySendBlocking(msg) + if (send.isFailure || msg is LastHttpContent) { + channel.close() + ctx.channel().attr(recvChannelKey).set(null) + } + } + + @Suppress("OVERRIDE_DEPRECATION", "DEPRECATION") + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + val channel = ctx.channel().attr(recvChannelKey).get() + if (channel == null) { + super.exceptionCaught(ctx, cause) + return + } + channel.close(cause) + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + ctx.channel().attr(countKey).get()?.decrementAndGet() + super.channelInactive(ctx) + } +} diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/netty.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/netty.kt new file mode 100644 index 0000000..c8df28c --- /dev/null +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/netty.kt @@ -0,0 +1,27 @@ +package cn.tursom.web.client.netty + +import io.netty.util.concurrent.Future +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +suspend fun Future.awaitSuspend() = suspendCoroutine { cont -> + addListener { + if (isSuccess) { + cont.resume(now) + } else { + cont.resumeWithException(cause()) + } + } +} + +suspend fun Future.awaitCancelable() = suspendCancellableCoroutine { cont -> + addListener { + if (isSuccess) { + cont.resume(now) + } else { + cont.resumeWithException(cause()) + } + } +}