This commit is contained in:
tursom 2022-04-08 20:28:01 +08:00
parent ebd7f40380
commit 68648b1f77
37 changed files with 1082 additions and 368 deletions

View File

@ -26,6 +26,7 @@ include("ts-core:ts-reflectasm")
include("ts-socket")
include("ts-web")
include("ts-web:ts-web-netty")
include("ts-web:ts-web-netty-client")
include("ts-web:ts-web-coroutine")
include("ts-database")
include("ts-database:ts-ktorm")

View File

@ -20,16 +20,16 @@ class ArrayContextEnv : ContextEnv {
override operator fun <T> get(key: ContextKey<T>): T? {
checkEnv(key)
return if (array.size < key.id) {
null
} else {
return if (array.size > key.id) {
array[key.id]?.uncheckedCast<T>()
} else {
null
}
}
override fun <T> set(key: ContextKey<T>, value: T): ArrayContext {
checkEnv(key)
if (array.size < key.id) {
if (array.size <= key.id) {
array = array.copyOf(idGenerator.get())
}
array[key.id] = value

View File

@ -9,14 +9,13 @@ dependencies {
api(project(":ts-core"))
api(project(":ts-core:ts-buffer"))
implementation(project(":ts-core:ts-xml"))
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
compileOnly("com.squareup.okhttp3:okhttp:4.9.3")
api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0")
api(group = "com.squareup.okhttp3", name = "okhttp", version = "4.9.3")
compileOnly(group = "io.netty", name = "netty-all", version = "4.1.72.Final")
//api(group = "com.squareup.retrofit2", name = "converter-gson", version = "2.9.0")
//api(group = "com.squareup.retrofit2", name = "retrofit", version = "2.9.0")
// https://mvnrepository.com/artifact/org.jsoup/jsoup
api(group = "org.jsoup", name = "jsoup", version = "1.14.3")
//api(group = "org.jsoup", name = "jsoup", version = "1.14.3")
testImplementation(project(":ts-core:ts-coroutine"))
}

View File

@ -1,321 +1,192 @@
package cn.tursom.http.client
import okhttp3.*
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.Call
import okhttp3.OkHttpClient
import okhttp3.RequestBody
import okhttp3.Response
import java.io.File
import java.io.IOException
import java.net.InetSocketAddress
import java.net.Proxy
import java.net.SocketAddress
import java.net.URLEncoder
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@Deprecated("this object is deprecated. it will be removed on 2022/5")
@Suppress("unused", "MemberVisibilityCanBePrivate")
object AsyncHttpRequest {
val defaultClient: OkHttpClient = OkHttpClient().newBuilder()
.retryOnConnectionFailure(true)
.build()
val socketClient: OkHttpClient = proxyClient()
val httpProxyClient: OkHttpClient =
proxyClient(port = 8080, type = Proxy.Type.HTTP)
@Deprecated("it should replace by Okhttp.defaultClient",
ReplaceWith("Okhttp.default"))
var defaultClient: OkHttpClient by Okhttp::default
fun proxyClient(
host: String = "127.0.0.1",
port: Int = 1080,
type: Proxy.Type = Proxy.Type.SOCKS
): OkHttpClient = OkHttpClient().newBuilder()
.proxy(Proxy(type, InetSocketAddress(host, port) as SocketAddress))
.retryOnConnectionFailure(true)
.build()
@Deprecated("it should replace by Okhttp.socket",
ReplaceWith("Okhttp.socket"))
val socketClient: OkHttpClient by Okhttp::socket
suspend fun sendRequest(call: Call): Response = suspendCoroutine {
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
it.resumeWithException(e)
}
@Deprecated("it should replace by Okhttp.httpProxy",
ReplaceWith("Okhttp.httpProxy"))
val httpProxyClient: OkHttpClient by Okhttp::httpProxy
override fun onResponse(call: Call, response: Response) {
it.resume(response)
}
})
}
@Deprecated("it should replace by call.sendRequest()",
ReplaceWith("call.sendRequest()"))
suspend fun sendRequest(call: Call): Response = call.sendRequest()
@JvmOverloads
@Deprecated("it should replace by client.get()",
ReplaceWith("client.get(url, param, headers)"))
suspend fun get(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient
): Response {
val paramSB = StringBuilder()
param?.forEach {
paramSB.append("${URLEncoder.encode(it.key, "UTF-8")}=${URLEncoder.encode(it.value, "UTF-8")}&")
}
if (paramSB.isNotEmpty())
paramSB.deleteCharAt(paramSB.length - 1)
client: OkHttpClient = defaultClient,
): Response = client.get(url, param, headers)
val requestBuilder = Request.Builder().get()
.url("$url?$paramSB")
headers?.forEach { t, u ->
requestBuilder.addHeader(t, u)
}
return sendRequest(
client.newCall(
requestBuilder.build()
)
)
}
private suspend fun post(
@JvmOverloads
@Deprecated("it should replace by client.post()",
ReplaceWith("client.post(url, body, headers)"))
suspend fun post(
url: String,
body: RequestBody,
headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient
): Response {
val requestBuilder = Request.Builder()
.post(body)
.url(url)
headers?.forEach { t, u ->
requestBuilder.addHeader(t, u)
}
return sendRequest(client.newCall(requestBuilder.build()))
}
client: OkHttpClient = defaultClient,
): Response = client.post(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.post()",
ReplaceWith("client.post(url, param, headers)"))
suspend fun post(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient
): Response {
val formBuilder = FormBody.Builder()
param.forEach { (t, u) ->
formBuilder.add(t, u)
}
return post(url, formBuilder.build(), headers, client)
}
client: OkHttpClient = defaultClient,
): Response = client.post(url, param, headers)
@JvmOverloads
@Deprecated("it should replace by client.post()",
ReplaceWith("client.post(url, body, headers)"))
suspend fun post(
url: String,
body: String,
headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient
) = post(
url,
RequestBody.create("text/plain;charset=utf-8".toMediaTypeOrNull(), body),
headers,
client
)
client: OkHttpClient = defaultClient,
) = client.post(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.post()",
ReplaceWith("client.post(url, body, headers)"))
suspend fun post(
url: String,
body: File,
headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient
) = post(
url,
RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body),
headers,
client
)
client: OkHttpClient = defaultClient,
) = client.post(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.post()",
ReplaceWith("client.post(url, body, headers)"))
suspend fun post(
url: String,
body: ByteArray,
headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient
) = post(
url,
RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body),
headers,
client
)
suspend fun getStr(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null
): String {
return getStr(url, param, headers, defaultClient)
}
client: OkHttpClient = defaultClient,
) = client.post(url, body, headers)
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
@Deprecated("it should replace by client.getStr()",
ReplaceWith("client.getStr(url, param, headers)"))
suspend fun getStr(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null,
client: OkHttpClient
): String {
val response = get(url, param, headers, client)
return response.body!!.string()
}
client: OkHttpClient = defaultClient,
): String = client.getStr(url, param, headers)
@Suppress("BlockingMethodInNonBlockingContext")
private suspend fun postStr(
@JvmOverloads
@Deprecated("it should replace by client.postStr()",
ReplaceWith("client.postStr(url, body, headers)"))
suspend fun postStr(
url: String,
body: RequestBody,
headers: Map<String, String>? = null,
client: OkHttpClient
): String = post(url, body, headers, client).body!!.string()
suspend fun postStr(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null
): String =
postStr(url, param, headers, defaultClient)
client: OkHttpClient = defaultClient,
): String = client.postStr(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.postStr()",
ReplaceWith("client.postStr(url, param, headers)"))
suspend fun postStr(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null,
client: OkHttpClient
): String {
val formBuilder = FormBody.Builder()
param.forEach { (t, u) ->
formBuilder.add(t, u)
}
return postStr(url, formBuilder.build(), headers, client)
}
suspend fun postStr(
url: String,
body: String,
headers: Map<String, String>? = null
): String =
postStr(url, body, headers, defaultClient)
client: OkHttpClient = defaultClient,
): String = client.postStr(url, param, headers)
@JvmOverloads
@Deprecated("it should replace by client.postStr()",
ReplaceWith("client.postStr(url, body, headers)"))
suspend fun postStr(
url: String,
body: String,
headers: Map<String, String>? = null,
client: OkHttpClient
): String = postStr(
url,
RequestBody.create("text/plain;charset=utf-8".toMediaTypeOrNull(), body),
headers,
client
)
suspend fun postStr(
url: String,
body: File,
headers: Map<String, String>? = null
): String =
postStr(url, body, headers, defaultClient)
client: OkHttpClient = defaultClient,
): String = client.postStr(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.postStr()",
ReplaceWith("client.postStr(url, body, headers)"))
suspend fun postStr(
url: String,
body: File,
headers: Map<String, String>? = null,
client: OkHttpClient
): String = postStr(
url,
RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body),
headers,
client
)
suspend fun getByteArray(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null
): ByteArray = getByteArray(
url,
param,
headers,
defaultClient
)
client: OkHttpClient = defaultClient,
): String = client.postStr(url, body, headers)
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
@Deprecated("it should replace by client.getByteArray()",
ReplaceWith("client.getByteArray(url, param, headers)"))
suspend fun getByteArray(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null,
client: OkHttpClient
): ByteArray = get(url, param, headers, client).body!!.bytes()
client: OkHttpClient = defaultClient,
): ByteArray = client.getByteArray(url, param, headers)
@Suppress("BlockingMethodInNonBlockingContext")
private suspend fun postByteArray(
@JvmOverloads
@Deprecated("it should replace by client.postByteArray()",
ReplaceWith("client.postByteArray(url, body, headers)"))
suspend fun postByteArray(
url: String,
body: RequestBody,
headers: Map<String, String>? = null,
client: OkHttpClient
): ByteArray = post(url, body, headers, client).body!!.bytes()
suspend fun postByteArray(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null
): ByteArray = postByteArray(
url,
param,
headers,
defaultClient
)
client: OkHttpClient = defaultClient,
): ByteArray = client.postByteArray(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.postByteArray()",
ReplaceWith("client.postByteArray(url, param, headers)"))
suspend fun postByteArray(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null,
client: OkHttpClient
): ByteArray {
val formBuilder = FormBody.Builder()
param.forEach { (t, u) ->
formBuilder.add(t, u)
}
return postByteArray(url, formBuilder.build(), headers, client)
}
suspend fun postByteArray(
url: String,
body: String,
headers: Map<String, String>? = null
): ByteArray = postByteArray(
url,
body,
headers,
defaultClient
)
client: OkHttpClient = defaultClient,
): ByteArray = client.postByteArray(url, param, headers)
@JvmOverloads
@Deprecated("it should replace by client.postByteArray()",
ReplaceWith("client.postByteArray(url, body, headers)"))
suspend fun postByteArray(
url: String,
body: String,
headers: Map<String, String>? = null,
client: OkHttpClient
): ByteArray = postByteArray(
url,
RequestBody.create("text/plain;charset=utf-8".toMediaTypeOrNull(), body),
headers,
client
)
suspend fun postByteArray(
url: String,
body: File,
headers: Map<String, String>? = null
): ByteArray = postByteArray(
url,
body,
headers,
defaultClient
)
client: OkHttpClient = defaultClient,
): ByteArray = client.postByteArray(url, body, headers)
@JvmOverloads
@Deprecated("it should replace by client.postByteArray()",
ReplaceWith("client.postByteArray(url, body, headers)"))
suspend fun postByteArray(
url: String,
body: File,
headers: Map<String, String>? = null,
client: OkHttpClient
): ByteArray = postByteArray(
url,
RequestBody.create("application/octet-stream".toMediaTypeOrNull(), body),
headers,
client
)
client: OkHttpClient = defaultClient,
): ByteArray = client.postByteArray(url, body, headers)
}

View File

@ -0,0 +1,32 @@
package cn.tursom.http.client
import okhttp3.Call
import okhttp3.Response
@JvmOverloads
suspend fun Call.Factory.get(
url: String,
param: Map<String, String?>? = null,
headers: Map<String, String>? = null,
): Response = newCall {
url(url) {
addQueryParameters(param)
}
addHeaders(headers)
}.sendRequest()
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
suspend fun Call.Factory.getStr(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null,
): String = get(url, param, headers).body!!.string()
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
suspend fun Call.Factory.getByteArray(
url: String,
param: Map<String, String>? = null,
headers: Map<String, String>? = null,
): ByteArray = get(url, param, headers).body!!.bytes()

View File

@ -0,0 +1,156 @@
package cn.tursom.http.client
import okhttp3.*
import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.RequestBody.Companion.asRequestBody
import okhttp3.RequestBody.Companion.toRequestBody
import java.io.File
@JvmOverloads
@OkhttpMaker
suspend inline fun Call.Factory.post(
url: String,
headers: Map<String, String>? = null,
body: Request.Builder.() -> RequestBody,
): Response = newCall {
post(body())
url(url)
addHeaders(headers)
}.sendRequest()
@JvmOverloads
suspend fun Call.Factory.post(
url: String,
body: RequestBody,
headers: Map<String, String>? = null,
): Response = newCall {
post(body)
url(url)
addHeaders(headers)
}.sendRequest()
@JvmOverloads
suspend fun Call.Factory.post(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null,
): Response = post(url, headers) {
form {
add(param)
}
}
@JvmOverloads
suspend fun Call.Factory.post(
url: String,
body: String,
headers: Map<String, String>? = null,
) = post(url, headers) {
body.toRequestBody("text/plain;charset=utf-8".toMediaTypeOrNull())
}
@JvmOverloads
suspend fun Call.Factory.post(
url: String,
body: File,
headers: Map<String, String>? = null,
) = post(url, headers) {
body.asRequestBody("application/octet-stream".toMediaTypeOrNull())
}
@JvmOverloads
suspend fun Call.Factory.post(
url: String,
body: ByteArray,
headers: Map<String, String>? = null,
) = post(url, headers) {
body.toRequestBody("application/octet-stream".toMediaTypeOrNull())
}
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
@OkhttpMaker
suspend inline fun Call.Factory.postStr(
url: String,
headers: Map<String, String>? = null,
body: Request.Builder.() -> RequestBody,
): String = post(url, headers, body).body!!.string()
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
suspend fun Call.Factory.postStr(
url: String,
body: RequestBody,
headers: Map<String, String>? = null,
): String = post(url, body, headers).body!!.string()
@JvmOverloads
suspend fun Call.Factory.postStr(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null,
): String = postStr(url, headers) {
FormBody.Builder().add(param).build()
}
@JvmOverloads
suspend fun Call.Factory.postStr(
url: String,
body: String,
headers: Map<String, String>? = null,
): String = postStr(url, headers) {
body.toRequestBody("text/plain;charset=utf-8".toMediaTypeOrNull())
}
@JvmOverloads
suspend fun Call.Factory.postStr(
url: String,
body: File,
headers: Map<String, String>? = null,
): String = postStr(url, headers) {
body.asRequestBody("application/octet-stream".toMediaTypeOrNull())
}
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
@OkhttpMaker
suspend inline fun Call.Factory.postByteArray(
url: String,
headers: Map<String, String>? = null,
body: Request.Builder.() -> RequestBody,
): ByteArray = post(url, headers, body).body!!.bytes()
@Suppress("BlockingMethodInNonBlockingContext")
@JvmOverloads
suspend fun Call.Factory.postByteArray(
url: String,
body: RequestBody,
headers: Map<String, String>? = null,
): ByteArray = post(url, body, headers).body!!.bytes()
@JvmOverloads
suspend fun Call.Factory.postByteArray(
url: String,
param: Map<String, String>,
headers: Map<String, String>? = null,
): ByteArray = postByteArray(url, headers) {
FormBody.Builder().add(param).build()
}
@JvmOverloads
suspend fun Call.Factory.postByteArray(
url: String,
body: String,
headers: Map<String, String>? = null,
): ByteArray = postByteArray(url, headers) {
body.toRequestBody("text/plain;charset=utf-8".toMediaTypeOrNull())
}
@JvmOverloads
suspend fun Call.Factory.postByteArray(
url: String,
body: File,
headers: Map<String, String>? = null,
): ByteArray = postByteArray(url, headers) {
body.asRequestBody("application/octet-stream".toMediaTypeOrNull())
}

View File

@ -0,0 +1,108 @@
@file:Suppress("unused")
package cn.tursom.http.client
import okhttp3.*
import okhttp3.HttpUrl.Companion.toHttpUrl
import java.io.IOException
import java.net.InetSocketAddress
import java.net.Proxy
import java.net.SocketAddress
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@DslMarker
@Retention(AnnotationRetention.BINARY)
annotation class OkhttpMaker
@OkhttpMaker
object Okhttp : Call.Factory, WebSocket.Factory {
val direct: OkHttpClient = OkHttpClient().newBuilder()
.retryOnConnectionFailure(true)
.build()
val socket: OkHttpClient = proxy()
val httpProxy: OkHttpClient = proxy(port = 8080, type = Proxy.Type.HTTP)
var default: OkHttpClient = direct
override fun newCall(request: Request): Call = default.newCall(request)
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket =
default.newWebSocket(request, listener)
@JvmOverloads
fun proxy(
host: String = "127.0.0.1",
port: Int = 1080,
type: Proxy.Type = Proxy.Type.SOCKS,
builder: OkHttpClient.Builder = OkHttpClient().newBuilder(),
): OkHttpClient = builder
.proxy(Proxy(type, InetSocketAddress(host, port) as SocketAddress))
.retryOnConnectionFailure(true)
.build()
}
@OkhttpMaker
inline fun <B : Request.Builder> B.url(url: String? = null, builder: HttpUrl.Builder.() -> Unit): B = apply {
val urlBuilder = url?.toHttpUrl()?.newBuilder() ?: HttpUrl.Builder()
urlBuilder.builder()
url(urlBuilder.build())
}
fun <B : Request.Builder> B.addHeaders(headers: Map<String, String>?): B = apply {
headers?.forEach { (k, v) ->
addHeader(k, v)
}
}
inline fun Request.Builder.form(builder: FormBody.Builder.() -> Unit): FormBody =
FormBody.Builder().build(builder)
fun HttpUrl.Builder.addQueryParameters(params: Map<String, String?>?) {
params?.forEach { (k, v) ->
addQueryParameter(k, v)
}
}
@OkhttpMaker
inline infix fun FormBody.Builder.build(builder: FormBody.Builder.() -> Unit): FormBody {
val form = FormBody.Builder()
form.builder()
return form.build()
}
fun FormBody.Builder.add(forms: Map<String, String>?) = apply {
forms?.forEach { (k, v) ->
add(k, v)
}
}
fun FormBody.Builder.addEncoded(forms: Map<String, String>?) = apply {
forms?.forEach { (k, v) ->
addEncoded(k, v)
}
}
suspend fun Call.sendRequest(): Response = suspendCoroutine {
enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
it.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
it.resume(response)
}
})
}
suspend fun Call.str() = sendRequest().body!!.string()
suspend fun Call.bytes() = sendRequest().body!!.bytes()
@OkhttpMaker
inline fun WebSocket.Factory.newWebSocket(listener: WebSocketListener, builder: Request.Builder.() -> Unit): WebSocket =
newWebSocket(Request.Builder().apply(builder).build(), listener)
@OkhttpMaker
inline fun Call.Factory.newCall(builder: Request.Builder.() -> Unit) =
newCall(Request.Builder().apply(builder).build())

View File

@ -9,8 +9,9 @@ plugins {
dependencies {
api(project(":ts-core"))
api("cglib", "cglib", "3.3.0")
implementation("org.apache.commons", "commons-lang3", "3.8.1")
api(project(":ts-core:ts-reflectasm"))
api(group = "cglib", name = "cglib", version = "3.3.0")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.8.1")
testApi(group = "junit", name = "junit", version = "4.13.2")
}

View File

@ -1,27 +1,32 @@
package cn.tursom.proxy
import cn.tursom.core.context.Context
class ListProxyContainer : MutableProxyContainer {
private val proxyList: MutableList<ProxyMethod> = ArrayList()
override fun addProxy(proxy: ProxyMethod): Int {
class ListProxyContainer(
private val proxyList: MutableCollection<ProxyMethod> = ArrayList(),
) : MutableProxyContainer {
override var lastModify: Long = System.currentTimeMillis()
private set
override val context: Context = ProxyContainer.contextEnv.newContext()
override fun addProxy(proxy: ProxyMethod) {
lastModify = System.currentTimeMillis()
proxyList.add(proxy)
return proxyList.size - 1
}
override fun addAllProxy(proxy: Collection<ProxyMethod>?): Boolean {
lastModify = System.currentTimeMillis()
return proxyList.addAll(proxy!!)
}
override fun removeProxy(proxy: ProxyMethod) {
lastModify = System.currentTimeMillis()
proxyList.remove(proxy)
}
override fun removeProxy(index: Int) {
proxyList.removeAt(index)
}
override fun iterator(): MutableIterator<ProxyMethod> {
lastModify = System.currentTimeMillis()
return proxyList.iterator()
}
}

View File

@ -1,8 +1,7 @@
package cn.tursom.proxy
interface MutableProxyContainer : ProxyContainer {
fun addProxy(proxy: ProxyMethod): Int
fun addProxy(proxy: ProxyMethod)
fun addAllProxy(proxy: Collection<ProxyMethod>?): Boolean
fun removeProxy(proxy: ProxyMethod)
fun removeProxy(index: Int)
}

View File

@ -2,26 +2,32 @@ package cn.tursom.proxy
import cn.tursom.core.uncheckedCast
import net.sf.cglib.proxy.Enhancer
import net.sf.cglib.proxy.Factory
import java.util.concurrent.ConcurrentHashMap
object Proxy {
val enhancerMap = ConcurrentHashMap<Class<*>, Enhancer>()
private val cache = ConcurrentHashMap<Class<*>, Class<*>>()
private fun <T> getTarget(clazz: Class<T>): Class<T> = cache.computeIfAbsent(clazz) {
val enhancer = Enhancer()
enhancerMap[clazz] = enhancer
enhancer.setSuperclass(clazz)
enhancer.setCallbackType(ProxyInterceptor::class.java)
enhancer.setCallbackFilter { 0 }
enhancer.createClass()
}.uncheckedCast()
operator fun <T> get(clazz: Class<T>, builder: (Class<T>) -> T): Pair<T, MutableProxyContainer> {
operator fun <T> get(
clazz: Class<T>,
builder: (Class<T>) -> T,
): Pair<T, MutableProxyContainer> {
val target = getTarget(clazz)
val container = ListProxyContainer()
synchronized(target) {
Enhancer.registerCallbacks(target, arrayOf(ProxyInterceptor(container)))
return builder(target) to container
}
val obj = builder(target)
obj as Factory
obj.setCallback(0, ProxyInterceptor(container))
return obj to container
}
inline fun <reified T> get() = get(T::class.java)
@ -30,13 +36,13 @@ object Proxy {
arguments: Array<out Any?>,
) = get(T::class.java, argumentTypes, arguments)
operator fun <T> get(clazz: Class<T>): Pair<T, MutableProxyContainer> = get(clazz, Class<T>::newInstance)
operator fun <T> get(clazz: Class<T>) = get(clazz, Class<T>::newInstance)
operator fun <T> get(
clazz: Class<T>,
argumentTypes: Array<out Class<*>>,
arguments: Array<out Any?>,
): Pair<T, MutableProxyContainer> = get(clazz) {
) = get(clazz) {
it.getConstructor(*argumentTypes).newInstance(*arguments)
}
}

View File

@ -1,47 +1,30 @@
package cn.tursom.proxy
import cn.tursom.core.uncheckedCast
data class ProxyResult<out R>(
val result: R,
val success: Boolean = false,
) {
companion object {
val failed: ProxyResult<*> = ProxyResult<Any?>(null, false)
fun <R> of(): ProxyResult<R?> {
return of(null)
}
/**
* 返回一个临时使用的 Result 对象
* 因为是临时对象所以不要把这个对象放到任何当前函数堆栈以外的地方
* 如果要长期储存对象请 new Result
*/
fun <R> of(result: R): ProxyResult<R> {
return ProxyResult(result, true)
}
fun <R> failed(): ProxyResult<R> {
return failed.uncheckedCast()
}
}
}
import cn.tursom.core.context.ArrayContextEnv
import cn.tursom.core.context.Context
interface ProxyContainer : Iterable<ProxyMethod> {
}
companion object {
val contextEnv = ArrayContextEnv()
inline fun ProxyContainer.forEachProxy(action: (ProxyMethod) -> Unit) {
inline fun ProxyContainer.forEachProxy(action: (ProxyMethod) -> Unit) {
for (t in this) {
action(t)
}
}
}
inline fun <R> ProxyContainer.forFirstProxy(action: (ProxyMethod) -> ProxyResult<R>?): ProxyResult<R> {
inline fun <R> ProxyContainer.forFirstProxy(action: (ProxyMethod) -> ProxyResult<R>): ProxyResult<R> {
for (t in this) {
val result = action(t)
if (result != null && result.success) {
if (result.success) {
return result
}
}
return ProxyResult.failed()
}
}
// to impl, use override val context: Context = ProxyContainer.contextEnv.newContext()
val context: Context
val lastModify: Long
}

View File

@ -5,19 +5,19 @@ import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap
object ProxyContainerHandlerCache {
private val handlerMap: MutableMap<Method, (Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<Any?>> =
private val handlerMap: MutableMap<MethodProxy, (Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<Any?>> =
ConcurrentHashMap()
val callSuper = { obj: Any, _: ProxyContainer, _: Method, args: Array<out Any?>, proxy: MethodProxy ->
ProxyResult.of<Any?>(proxy.invokeSuper(obj, args))
}
val empty = { _: Any, _: ProxyContainer, _: Method, _: Array<out Any?>, _: MethodProxy -> ProxyResult.failed<Any?>() }
fun getHandler(method: Method): ((Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<Any?>)? {
fun getHandler(method: MethodProxy): ((Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<Any?>)? {
return handlerMap[method]
}
fun setHandler(
method: Method,
method: MethodProxy,
onProxy: ((Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<Any?>)?,
) {
handlerMap[method] = onProxy ?: callSuper

View File

@ -8,9 +8,10 @@ import java.util.*
class ProxyInterceptor(
private val container: ProxyContainer = ListProxyContainer(),
private val supportCallSuper: Boolean = true,
) : MethodInterceptor {
companion object {
private val HANDLE_DEQUE_THREAD_LOCAL = ThreadLocal<ArrayDeque<Any>>()
val callSuper = ThreadLocal<Boolean?>()
private val parameterTypes = arrayOf(Method::class.java, Array<Any>::class.java, MethodProxy::class.java)
private val parameterTypesField: Field = Method::class.java.getDeclaredField("parameterTypes").apply {
isAccessible = true
@ -26,18 +27,22 @@ class ProxyInterceptor(
}
fun isOnProxyMethod(method: Method): Boolean {
return equalsMethod(method, "onProxy", parameterTypes)
//return callSuper.get() == true || equalsMethod(method, "onProxy", parameterTypes)
return callSuper.get() == true
}
}
@Throws(Throwable::class)
override fun intercept(obj: Any, method: Method, args: Array<out Any?>, proxy: MethodProxy): Any? {
if (!isOnProxyMethod(method)) {
val result = ProxyRunner.onProxy(obj, container, method, args, proxy)
if (result != null && result.success) {
return result.result
}
}
if (supportCallSuper && callSuper.get() == true) {
callSuper.remove()
return proxy.invokeSuper(obj, args)
}
val result = ProxyRunner.onProxy(obj, container, method, args, proxy)
return if (result.success) {
result.result
} else {
proxy.invokeSuper(obj, args)
}
}
}

View File

@ -2,23 +2,44 @@ package cn.tursom.proxy
import cn.tursom.proxy.annotation.ForEachProxy
import cn.tursom.proxy.annotation.ForFirstProxy
import cn.tursom.reflect.asm.ReflectAsmUtils
import net.sf.cglib.proxy.MethodProxy
import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap
interface ProxyMethod {
@Throws(Throwable::class)
fun onProxy(obj: Any?, method: Method, args: Array<out Any?>, proxy: MethodProxy): ProxyResult<*>? {
val selfMethod: Method
fun onProxy(obj: Any?, method: Method, args: Array<out Any?>, proxy: MethodProxy): ProxyResult<*> {
val handlerCacheMap = getHandlerCacheMap(javaClass)
val methodResult = handlerCacheMap[method]
if (methodResult != null) {
return if (methodResult.success) {
ProxyResult.of(methodResult.result(this, *args))
ProxyResult.of(methodResult.result(this, args))
} else {
ProxyResult.failed<Any>()
}
}
val reflectAsmMethod = try {
ReflectAsmUtils.getMethod(
javaClass,
method.name,
paramTypes = method.parameterTypes,
returnType = method.returnType,
)
} catch (e: Exception) {
e.printStackTrace()
null
}
if (reflectAsmMethod != null) {
val (methodAccess, index) = reflectAsmMethod
handlerCacheMap[method] = ProxyResult({ p, a ->
methodAccess.invoke(p, index, *a)
}, true)
return ProxyResult.of(methodAccess.invoke(this, index, *args))
}
val selfMethod: Method
try {
var methodName = method.name
for (annotation in method.annotations) {
@ -36,19 +57,24 @@ interface ProxyMethod {
}
selfMethod = javaClass.getMethod(methodName, *method.parameterTypes)
selfMethod.isAccessible = true
handlerCacheMap[method] = ProxyResult(selfMethod, true)
} catch (e: Exception) {
handlerCacheMap[method] = ProxyResult({ p, a ->
selfMethod(p, *a)
}, true)
return ProxyResult.of<Any>(selfMethod(this, *args))
} catch (_: Exception) {
}
handlerCacheMap[method] = ProxyResult.failed()
return ProxyResult.failed<Any>()
}
return ProxyResult.of<Any>(selfMethod(this, *args))
}
companion object {
private val handlerCacheMapMap: MutableMap<Class<out ProxyMethod>, MutableMap<Method, ProxyResult<Method>>> =
private val handlerCacheMapMap: MutableMap<
Class<out ProxyMethod>,
MutableMap<Method, ProxyResult<(proxy: ProxyMethod, args: Array<out Any?>) -> Any?>>> =
HashMap()
fun getHandlerCacheMap(type: Class<out ProxyMethod>): MutableMap<Method, ProxyResult<Method>> {
fun getHandlerCacheMap(type: Class<out ProxyMethod>): MutableMap<Method, ProxyResult<(proxy: ProxyMethod, args: Array<out Any?>) -> Any?>> {
var handlerCacheMap = handlerCacheMapMap[type]
if (handlerCacheMap == null) synchronized(handlerCacheMapMap) {
handlerCacheMap = handlerCacheMapMap[type]
@ -59,13 +85,5 @@ interface ProxyMethod {
}
return handlerCacheMap!!
}
fun getProxyMethod(clazz: Class<*>, name: String, vararg parameterTypes: Class<*>): Method? {
return try {
clazz.getDeclaredMethod(name, *parameterTypes)
} catch (e: NoSuchMethodException) {
throw RuntimeException(e)
}
}
}
}

View File

@ -0,0 +1,32 @@
package cn.tursom.proxy
import net.sf.cglib.proxy.MethodProxy
import java.lang.reflect.Method
typealias ProxyMethodCacheFunction = (obj: Any?, method: Method, args: Array<out Any?>, proxy: MethodProxy) -> ProxyResult<*>
class ProxyMethodCache(
private var lastModify: Long = 0,
private var function: ProxyMethodCacheFunction = failed,
) {
companion object {
val failed: ProxyMethodCacheFunction = { _, _, _, _ -> ProxyResult.failed }
}
fun update(lastModify: Long, function: ProxyMethodCacheFunction = this.function) {
this.lastModify = lastModify
this.function = function
}
operator fun invoke(
lastModify: Long,
obj: Any?,
method: Method,
args: Array<out Any?>,
proxy: MethodProxy,
): ProxyResult<*>? = if (lastModify != this.lastModify) {
null
} else {
function(obj, method, args, proxy)
}
}

View File

@ -0,0 +1,17 @@
package cn.tursom.proxy
import cn.tursom.core.uncheckedCast
data class ProxyResult<out R>(
val result: R,
val success: Boolean = false,
val cache: Boolean = false,
) {
companion object {
val success: ProxyResult<*> = ProxyResult<Any?>(null, true)
val failed: ProxyResult<*> = ProxyResult<Any?>(null, false)
fun <R> of(): ProxyResult<R?> = success.uncheckedCast()
fun <R> of(result: R): ProxyResult<R> = ProxyResult(result, true)
fun <R> failed(): ProxyResult<R> = failed.uncheckedCast()
}
}

View File

@ -1,5 +1,7 @@
package cn.tursom.proxy
import cn.tursom.proxy.ProxyContainer.Companion.forEachProxy
import cn.tursom.proxy.ProxyContainer.Companion.forFirstProxy
import cn.tursom.proxy.annotation.ForEachProxy
import cn.tursom.proxy.annotation.ForFirstProxy
import net.sf.cglib.proxy.MethodProxy
@ -7,8 +9,13 @@ import org.apache.commons.lang3.StringUtils
import java.lang.reflect.Method
import java.util.*
object ProxyRunner {
private val errMsgSearchList = arrayOf("%M", "%B", "%A")
private val forFirstProxyCacheKey =
ProxyContainer.contextEnv.newKey<ProxyMethodCache>().withDefault { ProxyMethodCache() }
private val forEachProxyCacheKey =
ProxyContainer.contextEnv.newKey<ProxyMethodCache>().withDefault { ProxyMethodCache() }
/**
* will be call when proxy method invoke.
@ -16,7 +23,7 @@ object ProxyRunner {
*/
@Throws(Throwable::class)
fun onProxy(obj: Any, c: ProxyContainer, method: Method, args: Array<out Any?>, proxy: MethodProxy): ProxyResult<*> {
var handler = ProxyContainerHandlerCache.getHandler(method)
var handler = ProxyContainerHandlerCache.getHandler(proxy)
if (handler != null) {
return handler(obj, c, method, args, proxy)
}
@ -34,18 +41,20 @@ object ProxyRunner {
//handler = ProxyContainerHandlerCache.callSuper
handler = onForFirstProxy(ForFirstProxy.EMPTY)
}
ProxyContainerHandlerCache.setHandler(method, handler)
ProxyContainerHandlerCache.setHandler(proxy, handler)
return handler(obj, c, method, args, proxy)
}
fun onForFirstProxy(forFirstProxy: ForFirstProxy) =
{ o: Any, c: ProxyContainer, m: Method, a: Array<out Any?>, p: MethodProxy ->
onForFirstProxy(o, c, m, a, p, forFirstProxy, when (forFirstProxy.value.size) {
private fun onForFirstProxy(forFirstProxy: ForFirstProxy): (Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<*> {
return { o: Any, c: ProxyContainer, m: Method, a: Array<out Any?>, p: MethodProxy ->
c.context[forFirstProxyCacheKey](c.lastModify, o, m, a, p) ?: onForFirstProxy(o, c, m, a, p, forFirstProxy,
when (forFirstProxy.value.size) {
0 -> emptyList()
1 -> listOf(forFirstProxy.value[0].java)
else -> forFirstProxy.value.asSequence().map { it.java }.toSet()
})
}
}
private fun onForFirstProxy(
obj: Any,
@ -56,9 +65,14 @@ object ProxyRunner {
forFirstProxy: ForFirstProxy,
classes: Collection<Class<*>>,
): ProxyResult<*> {
val cache = container.context[forFirstProxyCacheKey]
val result = container.forFirstProxy { p ->
if (classes.isEmpty() || classes.stream().anyMatch { c: Class<*> -> c.isInstance(p) }) {
return@forFirstProxy p.onProxy(obj, method, args, proxy)
val result = p.onProxy(obj, method, args, proxy)
if (forFirstProxy.cache && result.success && result.cache) {
cache.update(container.lastModify, p::onProxy)
}
return@forFirstProxy result
} else {
return@forFirstProxy ProxyResult.failed
}
@ -87,7 +101,16 @@ object ProxyRunner {
}
errMsg = StringUtils.replaceEach(errMsg, errMsgSearchList, replacementList)
throw forFirstProxy.errClass.java.getConstructor(String::class.java).newInstance(errMsg)
val exceptionConstructor = forFirstProxy.errClass.java.getConstructor(String::class.java)
if (forFirstProxy.cache) {
cache.update(container.lastModify) { _, _, _, _ ->
throw exceptionConstructor.newInstance(errMsg)
}
}
throw exceptionConstructor.newInstance(errMsg)
}
if (forFirstProxy.cache) {
cache.update(container.lastModify)
}
return ProxyResult.failed
}
@ -100,12 +123,14 @@ object ProxyRunner {
private fun onForeachProxy(
classes: Collection<Class<*>>,
) = label@{ o: Any, c: ProxyContainer, m: Method, a: Array<out Any?>, proxy1: MethodProxy ->
): (Any, ProxyContainer, Method, Array<out Any?>, MethodProxy) -> ProxyResult<Any?> {
return (label@{ o: Any, c: ProxyContainer, m: Method, a: Array<out Any?>, proxy1: MethodProxy ->
c.forEachProxy { p ->
if (classes.isEmpty() || classes.any { c: Class<*> -> c.isInstance(p) }) {
p.onProxy(o, m, a, proxy1)
}
}
ProxyResult.failed<Any?>()
})
}
}

View File

@ -14,6 +14,7 @@ annotation class ForFirstProxy(
val must: Boolean = false,
val errMsg: String = "",
val errClass: KClass<out RuntimeException> = RuntimeException::class,
val cache: Boolean = true,
) {
companion object {
val EMPTY = ForFirstProxy()

View File

@ -1,31 +1,32 @@
package cn.tursom.proxy
import cn.tursom.proxy.Example.GetA
import cn.tursom.proxy.annotation.ForEachProxy
import org.junit.Test
class Example {
open class TestClass protected constructor() {
@get:ForEachProxy
open val a: Int = 0
//@get:ForEachProxy
open var a: Int = 0
}
fun interface GetA : ProxyMethod {
fun getA(): Int
class GetA(
private val t: TestClass,
) : ProxyMethod {
fun getA(): Int {
ProxyInterceptor.callSuper.set(true)
return t.a + 1
}
}
@Test
fun test() {
repeat(3) {
val (t, container) = Proxy.get<TestClass>()
container.addProxy(GetA {
println("on proxy method")
0
})
container.addProxy(GetA(t))
println(t.javaClass)
println(t.a)
repeat(1000000000) {
t.a = t.a
//println(t.a)
}
}
}

View File

@ -179,6 +179,4 @@ object ReflectAsmUtils {
methodAccess.invoke(this, index, a1, a2, a3, a4, a5, a6) as R
}
}
}

View File

@ -0,0 +1,12 @@
package cn.tursom.web
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.web.utils.Chunked
interface ChunkedMessage {
fun writeChunkedHeader()
fun addChunked(buffer: ByteBuffer) = addChunked { buffer }
fun addChunked(buffer: () -> ByteBuffer)
fun finishChunked()
fun finishChunked(chunked: Chunked)
}

View File

@ -2,14 +2,13 @@ package cn.tursom.web
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.urlDecode
import cn.tursom.web.utils.Chunked
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.io.RandomAccessFile
import java.net.SocketAddress
interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter {
interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter, ChunkedMessage {
val requestSendFully: Boolean
val finished: Boolean
val uri: String
@ -132,18 +131,12 @@ interface HttpContent : ResponseHeaderAdapter, RequestHeaderAdapter {
fun deleteCookie(name: String, path: String = "/") =
addCookie(name, "deleted; expires=Thu, 01 Jan 1970 00:00:00 GMT", path = path)
fun writeChunkedHeader()
fun addChunked(buffer: ByteBuffer) = addChunked { buffer }
fun addChunked(buffer: () -> ByteBuffer)
fun finishChunked()
fun finishChunked(chunked: Chunked)
fun finishFile(file: File, chunkSize: Int = 8192)
fun finishFile(
file: RandomAccessFile,
offset: Long = 0,
length: Long = file.length() - offset,
chunkSize: Int = 8192
chunkSize: Int = 8192,
)
fun jump(url: String) = temporaryMoved(url)

View File

@ -0,0 +1,5 @@
package cn.tursom.web.client
interface HttpClient {
suspend fun request(method: String, url: String, ssl: Boolean? = null): HttpRequest
}

View File

@ -0,0 +1,25 @@
package cn.tursom.web.client
import cn.tursom.core.buffer.ByteBuffer
interface HttpRequest {
var version: String
var method: String
var path: String
val params: Map<String, List<String>>
fun addParam(key: String, value: String)
fun addParams(params: Map<String, String>) {
params.forEach(::addParam)
}
val headers: Iterable<Map.Entry<String, String>>
fun addHeader(key: String, value: Any)
fun addHeaders(headers: Map<String, Any>) {
headers.forEach(::addHeader)
}
fun body(data: ByteBuffer)
suspend fun send(): HttpResponse
}

View File

@ -0,0 +1,10 @@
package cn.tursom.web.client
interface HttpResponse {
val code: Int
val reason: String
val headers: Iterable<Map.Entry<String, String>>
fun getHeader(key: String): String?
fun getHeaders(key: String): List<String>
val body: HttpResponseStream
}

View File

@ -0,0 +1,21 @@
package cn.tursom.web.client
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import java.io.Closeable
interface HttpResponseStream : Closeable {
suspend fun skip(n: Long)
suspend fun read(): Int
suspend fun read(buffer: ByteBuffer)
suspend fun read(
buffer: ByteArray,
offset: Int = 0,
len: Int = buffer.size - offset,
): Int {
val byteBuffer = HeapByteBuffer(buffer, offset, len)
byteBuffer.clear()
read(byteBuffer)
return byteBuffer.writePosition
}
}

View File

@ -0,0 +1,5 @@
package cn.tursom.web.utils
enum class HttpVersion {
HTTP_1_0, HTTP_1_1,
}

View File

@ -0,0 +1,20 @@
plugins {
kotlin("jvm")
`maven-publish`
id("ts-gradle")
}
dependencies {
api(project(":ts-core"))
api(project(":ts-core:ts-buffer"))
api(project(":ts-core:ts-log"))
api(project(":ts-web"))
api(project(":ts-web:ts-web-netty"))
api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0")
api(group = "io.netty", name = "netty-all", version = "4.1.72.Final")
api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32")
}

View File

@ -0,0 +1,48 @@
package cn.tursom.web.client.netty
import io.netty.handler.timeout.WriteTimeoutHandler
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
class HttpConnectionPool(
host: String,
port: Int,
ssl: Boolean,
private val maxConn: Int = 5,
) {
companion object {
private data class PoolDesc(val host: String, val port: Int, val ssl: Boolean)
private val poolCache = ConcurrentHashMap<PoolDesc, HttpConnectionPool>()
fun poolOf(host: String, port: Int, ssl: Boolean) = poolCache.getOrPut(PoolDesc(host, port, ssl)) {
HttpConnectionPool(host, port, ssl)
}
}
private val group = HttpExecutor.group(host, port, ssl) {
it.attr(NettyHttpResultResume.countKey).set(conn)
it.pipeline()
.addLast(NettyHttpResultResume)
.addLast(WriteTimeoutHandler(60))
}
private val pool = Channel<NettyHttpConnection>(maxConn)
private val conn = AtomicInteger()
suspend fun <R> useConnection(handler: suspend (NettyHttpConnection) -> R): R {
val client = if (conn.getAndIncrement() < maxConn) {
group()
} else {
conn.decrementAndGet()
pool.receive()
}
val result = try {
handler(client)
} catch (e: Exception) {
client.channel.close()
throw e
}
pool.send(client)
return result
}
}

View File

@ -0,0 +1,65 @@
package cn.tursom.web.client.netty
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
object HttpExecutor {
private val group = NioEventLoopGroup()
fun group(
host: String,
port: Int,
ssl: Boolean,
initChannel: (SocketChannel) -> Unit = {},
): suspend () -> NettyHttpConnection {
val sslCtx = if (ssl) {
SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build()
} else {
null
}
val bootstrap = Bootstrap()
.group(group)
.channel(NioSocketChannel::class.java)
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
ch.pipeline().apply {
if (sslCtx != null) {
addLast(sslCtx.newHandler(ch.alloc(), host, port))
}
addLast(HttpClientCodec())
}
initChannel(ch)
}
override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
super.exceptionCaught(ctx, cause)
}
})
return {
val channelFuture = bootstrap.connect(host, port)
channelFuture.awaitSuspend()
HttpClientImpl(channelFuture.channel() as SocketChannel)
}
}
suspend fun connect(
host: String,
port: Int,
ssl: Boolean,
initChannel: (SocketChannel) -> Unit = {},
): NettyHttpConnection {
return group(host, port, ssl, initChannel)()
}
private class HttpClientImpl(
override val channel: SocketChannel,
) : NettyHttpConnection
}

View File

@ -0,0 +1,26 @@
package cn.tursom.web.client.netty
import cn.tursom.web.client.HttpClient
import cn.tursom.web.client.HttpRequest
import java.net.URI
open class NettyHttpClient : HttpClient {
override suspend fun request(method: String, url: String, ssl: Boolean?): HttpRequest {
val uri = URI.create(url)
val port = if (uri.port < 0) {
when (uri.scheme ?: "http") {
"http" -> 80
"https" -> 443
else -> -1
}
} else {
uri.port
}
val pool = HttpConnectionPool.poolOf(uri.host, port, uri.scheme == "https")
val request = NettyHttpRequest(pool)
request.method = method
request.path = uri.path
return request
}
}

View File

@ -0,0 +1,18 @@
package cn.tursom.web.client.netty
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpObject
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
interface NettyHttpConnection {
val channel: SocketChannel
suspend fun request(request: FullHttpRequest): ReceiveChannel<HttpObject> {
val ktChannel = Channel<HttpObject>(Channel.UNLIMITED)
channel.attr(NettyHttpResultResume.recvChannelKey).set(ktChannel)
channel.writeAndFlush(request)
return ktChannel
}
}

View File

@ -0,0 +1,66 @@
package cn.tursom.web.client.netty
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.NettyByteBuffer
import cn.tursom.log.impl.Slf4jImpl
import cn.tursom.web.client.HttpRequest
import cn.tursom.web.client.HttpResponse
import io.netty.buffer.Unpooled
import io.netty.handler.codec.http.DefaultFullHttpRequest
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
class NettyHttpRequest(
private val pool: HttpConnectionPool,
) : HttpRequest {
companion object : Slf4jImpl()
private var request: FullHttpRequest = DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "")
override var version: String
get() = request.protocolVersion().text()
set(value) {
request.protocolVersion = HttpVersion.valueOf(value)
}
override var method: String
get() = request.method().name()
set(value) {
request.method = HttpMethod.valueOf(value)
}
override var path: String
get() = request.uri()
set(value) {
request.uri = value
}
override val params = HashMap<String, ArrayList<String>>().withDefault { ArrayList() }
override fun addParam(key: String, value: String) {
params[key]!!.add(value)
}
override val headers: Iterable<Map.Entry<String, String>>
get() = request.headers()
override fun addHeader(key: String, value: Any) {
request.headers().add(key, value)
}
override fun body(data: ByteBuffer) {
val byteBuf = if (data is NettyByteBuffer) {
data.byteBuf
} else {
Unpooled.wrappedBuffer(data.getBytes())
}
request = request.replace(byteBuf)
}
override suspend fun send(): HttpResponse {
val receiveChannel = pool.useConnection {
it.request(request)
}
val httpResponse = receiveChannel.receive() as io.netty.handler.codec.http.HttpResponse
return NettyHttpResponse(httpResponse, receiveChannel)
}
}

View File

@ -0,0 +1,75 @@
package cn.tursom.web.client.netty
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.NettyByteBuffer
import cn.tursom.web.client.HttpResponse
import cn.tursom.web.client.HttpResponseStream
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpObject
import kotlinx.coroutines.channels.ReceiveChannel
class NettyHttpResponse(
private val response: io.netty.handler.codec.http.HttpResponse,
channel: ReceiveChannel<HttpObject>,
) : HttpResponse {
override val code: Int
get() = response.status().code()
override val reason: String
get() = response.status().reasonPhrase()
override val headers get() = response.headers()!!
override fun getHeader(key: String): String? = response.headers().get(key)
override fun getHeaders(key: String): List<String> = response.headers().getAll(key)
override val body: HttpResponseStream = NettyStream(response, channel)
private class NettyStream(
response: HttpObject,
private val channel: ReceiveChannel<HttpObject>,
) : HttpResponseStream {
private var buffer: ByteBuffer? = if (response is HttpContent) {
NettyByteBuffer(response.content())
} else {
null
}
private suspend fun buffer(): ByteBuffer? {
if (buffer == null || buffer?.readable == 0) {
val receive = channel.receiveCatching()
buffer = if (receive.isSuccess) {
val content = receive.getOrThrow() as HttpContent
NettyByteBuffer(content.content())
} else {
val e = receive.exceptionOrNull()
if (e != null) {
throw e
}
null
}
}
return buffer
}
override suspend fun skip(n: Long) {
var skip = 0L
while (skip < n) {
val buffer = buffer() ?: return
skip += buffer.skip((n - skip).toInt())
}
}
override suspend fun read(): Int {
val buffer = buffer() ?: return -1
return buffer.get().toInt()
}
override suspend fun read(buffer: ByteBuffer) {
val buf = buffer() ?: return
buf.writeTo(buffer)
}
override fun close() {
channel.cancel()
}
}
}

View File

@ -0,0 +1,40 @@
package cn.tursom.web.client.netty
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpObject
import io.netty.handler.codec.http.LastHttpContent
import io.netty.util.AttributeKey
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.trySendBlocking
import java.util.concurrent.atomic.AtomicInteger
@ChannelHandler.Sharable
object NettyHttpResultResume : SimpleChannelInboundHandler<HttpObject>() {
val recvChannelKey = AttributeKey.newInstance<SendChannel<HttpObject>>("recvChannelKey")!!
val countKey = AttributeKey.newInstance<AtomicInteger>("countKey")!!
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
val channel = ctx.channel().attr(recvChannelKey).get() ?: return
val send = channel.trySendBlocking(msg)
if (send.isFailure || msg is LastHttpContent) {
channel.close()
ctx.channel().attr(recvChannelKey).set(null)
}
}
@Suppress("OVERRIDE_DEPRECATION", "DEPRECATION")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val channel = ctx.channel().attr(recvChannelKey).get()
if (channel == null) {
super.exceptionCaught(ctx, cause)
return
}
channel.close(cause)
}
override fun channelInactive(ctx: ChannelHandlerContext) {
ctx.channel().attr(countKey).get()?.decrementAndGet()
super.channelInactive(ctx)
}
}

View File

@ -0,0 +1,27 @@
package cn.tursom.web.client.netty
import io.netty.util.concurrent.Future
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
suspend fun <T> Future<T>.awaitSuspend() = suspendCoroutine<T> { cont ->
addListener {
if (isSuccess) {
cont.resume(now)
} else {
cont.resumeWithException(cause())
}
}
}
suspend fun <T> Future<T>.awaitCancelable() = suspendCancellableCoroutine<T> { cont ->
addListener {
if (isSuccess) {
cont.resume(now)
} else {
cont.resumeWithException(cause())
}
}
}