From 4c18fa6b0202a3320b58c830cfa98be35e2642f9 Mon Sep 17 00:00:00 2001 From: tursom Date: Sun, 10 Apr 2022 17:00:19 +0800 Subject: [PATCH] update --- settings.gradle.kts | 1 + .../src/main/kotlin/cn/tursom/core/utils.kt | 3 +- .../cn/tursom/http/client/AsyncHttpRequest.kt | 32 ++--- .../kotlin/cn/tursom/core/ByteBufferUtil.kt | 58 +++++++++ .../cn/tursom/core/HeapByteBufferUtil.kt | 22 ---- .../cn/tursom/core/buffer/ByteBuffer.kt | 5 +- .../core/buffer/impl/DirectByteBuffer.kt | 6 + .../tursom/core/buffer/impl/HeapByteBuffer.kt | 16 ++- ts-web/build.gradle.kts | 2 + .../web/client/BrotliHttpResponseStream.kt | 42 +++++++ .../tursom/web/client/ChannelHttpResponse.kt | 61 +++++++++ .../web/client/GzipHttpResponseStream.kt | 43 +++++++ .../kotlin/cn/tursom/web/client/HttpClient.kt | 2 +- .../cn/tursom/web/client/HttpRequest.kt | 5 +- .../cn/tursom/web/client/HttpResponse.kt | 2 +- .../tursom/web/client/HttpResponseStream.kt | 22 +++- ts-web/ts-web-netty-client/build.gradle.kts | 21 ++++ .../tursom/web/client/netty/HttpExecutor.kt | 2 + .../web/client/netty/NettyHttpClient.kt | 3 +- .../web/client/netty/NettyHttpRequest.kt | 3 +- .../web/client/netty/NettyHttpResponse.kt | 63 ++++------ .../web/client/netty/NettyHttpResultResume.kt | 2 +- .../web/client/netty/NettyHttpClientTest.kt | 22 ++++ ts-web/ts-web-okhttp/build.gradle.kts | 21 ++++ .../web/client/okhttp/OkhttpHttpClient.kt | 37 ++++++ .../web/client/okhttp/OkhttpHttpRequest.kt | 116 ++++++++++++++++++ .../web/client/okhttp/OkhttpHttpResponse.kt | 26 ++++ .../web/client/okhttp/OkhttpResponseStream.kt | 56 +++++++++ .../web/client/okhttp/OkhttpHttpClientTest.kt | 18 +++ 29 files changed, 619 insertions(+), 93 deletions(-) create mode 100644 ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/ByteBufferUtil.kt delete mode 100644 ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/BrotliHttpResponseStream.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/ChannelHttpResponse.kt create mode 100644 ts-web/src/main/kotlin/cn/tursom/web/client/GzipHttpResponseStream.kt create mode 100644 ts-web/ts-web-netty-client/src/test/kotlin/cn/tursom/web/client/netty/NettyHttpClientTest.kt create mode 100644 ts-web/ts-web-okhttp/build.gradle.kts create mode 100644 ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClient.kt create mode 100644 ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpRequest.kt create mode 100644 ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpResponse.kt create mode 100644 ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpResponseStream.kt create mode 100644 ts-web/ts-web-okhttp/src/test/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClientTest.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index 0c36add..db8c42a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -27,6 +27,7 @@ include("ts-socket") include("ts-web") include("ts-web:ts-web-netty") include("ts-web:ts-web-netty-client") +include("ts-web:ts-web-okhttp") include("ts-web:ts-web-coroutine") include("ts-database") include("ts-database:ts-ktorm") diff --git a/ts-core/src/main/kotlin/cn/tursom/core/utils.kt b/ts-core/src/main/kotlin/cn/tursom/core/utils.kt index 3bd5fe9..b28fb07 100644 --- a/ts-core/src/main/kotlin/cn/tursom/core/utils.kt +++ b/ts-core/src/main/kotlin/cn/tursom/core/utils.kt @@ -430,7 +430,8 @@ fun String.base62Decode(): Long { fun Any.toJson(): String = Utils.gson.toJson(this) fun Any.toPrettyJson(): String = Utils.prettyGson.toJson(this) -inline fun String.fromJson(): T = Utils.gson.fromJson(this, T::class.java) +inline fun String.fromJson(): T = Utils.gson.fromJson(this) +inline fun String.fromJsonTyped(): T = Utils.gson.fromJsonTyped(this) fun Any.serialize(): ByteArray { val outputStream = ByteArrayOutputStream() diff --git a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt index 725f488..d336022 100644 --- a/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt +++ b/ts-core/ts-async-http/src/main/kotlin/cn/tursom/http/client/AsyncHttpRequest.kt @@ -32,7 +32,7 @@ object AsyncHttpRequest { url: String, param: Map? = null, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): Response = client.get(url, param, headers) @JvmOverloads @@ -42,7 +42,7 @@ object AsyncHttpRequest { url: String, body: RequestBody, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): Response = client.post(url, body, headers) @JvmOverloads @@ -52,7 +52,7 @@ object AsyncHttpRequest { url: String, param: Map, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): Response = client.post(url, param, headers) @JvmOverloads @@ -62,7 +62,7 @@ object AsyncHttpRequest { url: String, body: String, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ) = client.post(url, body, headers) @JvmOverloads @@ -72,7 +72,7 @@ object AsyncHttpRequest { url: String, body: File, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ) = client.post(url, body, headers) @JvmOverloads @@ -82,7 +82,7 @@ object AsyncHttpRequest { url: String, body: ByteArray, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ) = client.post(url, body, headers) @Suppress("BlockingMethodInNonBlockingContext") @@ -93,7 +93,7 @@ object AsyncHttpRequest { url: String, param: Map? = null, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): String = client.getStr(url, param, headers) @Suppress("BlockingMethodInNonBlockingContext") @@ -104,7 +104,7 @@ object AsyncHttpRequest { url: String, body: RequestBody, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): String = client.postStr(url, body, headers) @JvmOverloads @@ -114,7 +114,7 @@ object AsyncHttpRequest { url: String, param: Map, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): String = client.postStr(url, param, headers) @JvmOverloads @@ -124,7 +124,7 @@ object AsyncHttpRequest { url: String, body: String, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): String = client.postStr(url, body, headers) @JvmOverloads @@ -134,7 +134,7 @@ object AsyncHttpRequest { url: String, body: File, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): String = client.postStr(url, body, headers) @Suppress("BlockingMethodInNonBlockingContext") @@ -145,7 +145,7 @@ object AsyncHttpRequest { url: String, param: Map? = null, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): ByteArray = client.getByteArray(url, param, headers) @@ -157,7 +157,7 @@ object AsyncHttpRequest { url: String, body: RequestBody, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): ByteArray = client.postByteArray(url, body, headers) @JvmOverloads @@ -167,7 +167,7 @@ object AsyncHttpRequest { url: String, param: Map, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): ByteArray = client.postByteArray(url, param, headers) @JvmOverloads @@ -177,7 +177,7 @@ object AsyncHttpRequest { url: String, body: String, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): ByteArray = client.postByteArray(url, body, headers) @JvmOverloads @@ -187,6 +187,6 @@ object AsyncHttpRequest { url: String, body: File, headers: Map? = null, - client: OkHttpClient = defaultClient, + client: OkHttpClient = Okhttp.default, ): ByteArray = client.postByteArray(url, body, headers) } \ No newline at end of file diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/ByteBufferUtil.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/ByteBufferUtil.kt new file mode 100644 index 0000000..9932d4c --- /dev/null +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/ByteBufferUtil.kt @@ -0,0 +1,58 @@ +package cn.tursom.core + +import cn.tursom.core.buffer.impl.DirectByteBuffer +import cn.tursom.core.buffer.impl.HeapByteBuffer +import java.nio.ByteBuffer + +/** + * hack java.nio.HeapByteBuffer + */ +object ByteBufferUtil { + private val field = ByteBuffer::class.java.getDeclaredField("offset") + private val bufferWrapper = ArrayList<(ByteBuffer, Boolean) -> cn.tursom.core.buffer.ByteBuffer?>() + val empty: cn.tursom.core.buffer.ByteBuffer = HeapByteBuffer(0) + + init { + field.isAccessible = true + addWrapper { it, write -> + if (!it.hasArray()) { + null + } else { + HeapByteBuffer(it, write) + } + } + + addWrapper { it, write -> + if (it.hasArray()) { + null + } else { + DirectByteBuffer(it, write) + } + } + } + + fun addWrapper(wrapper: (ByteBuffer, Boolean) -> cn.tursom.core.buffer.ByteBuffer?) { + bufferWrapper.add(wrapper) + } + + fun wrap(byteBuffer: ByteBuffer, write: Boolean = true): cn.tursom.core.buffer.ByteBuffer { + bufferWrapper.forEach { wrapper -> + return wrapper(byteBuffer, write) ?: return@forEach + } + val buffer = HeapByteBuffer(byteBuffer.limit() - byteBuffer.position()) + if (!write) { + buffer.writeBuffer { + it.put(byteBuffer) + } + } + return buffer + } + + fun wrap(array: ByteArray, offset: Int = 0, size: Int = array.size - offset): ByteBuffer { + val buffer = ByteBuffer.wrap(array, 0, offset + size) + if (offset > 0) field.set(buffer, offset) + return buffer + } + + fun wrap(string: String) = wrap(string.toByteArray()) +} \ No newline at end of file diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt deleted file mode 100644 index d5e8feb..0000000 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/HeapByteBufferUtil.kt +++ /dev/null @@ -1,22 +0,0 @@ -package cn.tursom.core - -import java.nio.ByteBuffer - -/** - * hack java.nio.HeapByteBuffer - */ -object HeapByteBufferUtil { - private val field = ByteBuffer::class.java.getDeclaredField("offset") - - init { - field.isAccessible = true - } - - fun wrap(array: ByteArray, offset: Int = 0, size: Int = array.size - offset): ByteBuffer { - val buffer = ByteBuffer.wrap(array, 0, offset + size) - if (offset > 0) field.set(buffer, offset) - return buffer - } - - fun wrap(string: String) = wrap(string.toByteArray()) -} \ No newline at end of file diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt index a996579..4ccc7fd 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/ByteBuffer.kt @@ -182,8 +182,9 @@ interface ByteBuffer : Closeable { val buffer = buffer ?: bufferThreadLocal.get() read { while (it.remaining() > 0) { - it.put(buffer) - os.write(buffer) + val min = min(it.remaining(), buffer.size) + it.get(buffer, 0, min) + os.write(buffer, 0, min) } } } diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt index 432186a..34cbcf1 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/DirectByteBuffer.kt @@ -9,6 +9,12 @@ class DirectByteBuffer( ) : ByteBuffer { constructor(size: Int) : this(java.nio.ByteBuffer.allocateDirect(size)) + constructor(buffer: java.nio.ByteBuffer, write: Boolean) : this( + buffer, + if (write) 0 else buffer.position(), + if (write) buffer.position() else buffer.limit() + ) + override val hasArray: Boolean = false override val array: ByteArray get() = buffer.array() override val capacity: Int get() = buffer.capacity() diff --git a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt index 6fd1e77..eb44a68 100644 --- a/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt +++ b/ts-core/ts-buffer/src/main/kotlin/cn/tursom/core/buffer/impl/HeapByteBuffer.kt @@ -1,6 +1,6 @@ package cn.tursom.core.buffer.impl -import cn.tursom.core.HeapByteBufferUtil +import cn.tursom.core.ByteBufferUtil import cn.tursom.core.buffer.ByteBuffer class HeapByteBuffer( @@ -11,7 +11,19 @@ class HeapByteBuffer( constructor(size: Int) : this(java.nio.ByteBuffer.allocate(size)) constructor(string: String) : this(string.toByteArray()) constructor(bytes: ByteArray, offset: Int = 0, size: Int = bytes.size - offset) - : this(HeapByteBufferUtil.wrap(bytes, offset, size), offset, offset + size) + : this(ByteBufferUtil.wrap(bytes, offset, size), offset, offset + size) + + constructor(buffer: java.nio.ByteBuffer, write: Boolean) : this( + buffer, + if (write) 0 else buffer.position(), + if (write) buffer.position() else buffer.limit() + ) + + constructor(bytes: ByteArray, write: Boolean) : this( + java.nio.ByteBuffer.wrap(bytes), + 0, + if (write) 0 else bytes.size + ) init { assert(buffer.hasArray()) diff --git a/ts-web/build.gradle.kts b/ts-web/build.gradle.kts index 81ccd1f..3e69c8c 100644 --- a/ts-web/build.gradle.kts +++ b/ts-web/build.gradle.kts @@ -8,6 +8,8 @@ dependencies { implementation(project(":ts-core")) implementation(project(":ts-core:ts-buffer")) implementation(project(":ts-core:ts-datastruct")) + compileOnly(project(":ts-core:ts-coroutine")) compileOnly(project(":ts-core:ts-json")) + compileOnly(group = "com.aayushatharva.brotli4j", name = "brotli4j", version = "1.7.1") implementation(group = "org.slf4j", name = "slf4j-api", version = "1.7.32") } diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/BrotliHttpResponseStream.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/BrotliHttpResponseStream.kt new file mode 100644 index 0000000..ba85bc1 --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/BrotliHttpResponseStream.kt @@ -0,0 +1,42 @@ +package cn.tursom.web.client + +import cn.tursom.core.ByteBufferUtil +import cn.tursom.core.coroutine.GlobalScope +import com.aayushatharva.brotli4j.decoder.DecoderJNI +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.produce + +class BrotliHttpResponseStream( + private val stream: HttpResponseStream, + inputBufferSize: Int = 8 * 1024, +) : ChannelHttpResponse() { + private val decoder = DecoderJNI.Wrapper(inputBufferSize) + + @OptIn(ExperimentalCoroutinesApi::class) + override val bufferChannel = GlobalScope.produce { + while (true) { + val input = stream.buffer() ?: return@produce + when (decoder.status) { + DecoderJNI.Status.DONE -> return@produce + DecoderJNI.Status.OK -> decoder.push(0) + DecoderJNI.Status.NEEDS_MORE_INPUT -> { + if (decoder.hasOutput()) { + val buffer = decoder.pull() + send(ByteBufferUtil.wrap(buffer, false)) + } + val decoderInputBuffer = decoder.inputBuffer + decoderInputBuffer.clear() + input.writeTo(ByteBufferUtil.wrap(decoderInputBuffer, true)) + //decoderInputBuffer.put(input.getBytes(decoderInputBuffer.limit() - decoderInputBuffer.position())) + decoder.push(decoderInputBuffer.position()) + } + DecoderJNI.Status.NEEDS_MORE_OUTPUT -> { + val buffer = decoder.pull() ?: continue + send(ByteBufferUtil.wrap(buffer, false)) + } + else -> return@produce + } + } + } +} + diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/ChannelHttpResponse.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/ChannelHttpResponse.kt new file mode 100644 index 0000000..dd16103 --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/ChannelHttpResponse.kt @@ -0,0 +1,61 @@ +package cn.tursom.web.client + +import cn.tursom.core.buffer.ByteBuffer +import kotlinx.coroutines.channels.ReceiveChannel +import java.io.ByteArrayOutputStream + + +abstract class ChannelHttpResponse : HttpResponseStream { + protected abstract val bufferChannel: ReceiveChannel + private var buffer: ByteBuffer? = null + + override suspend fun buffer(): ByteBuffer? { + while (buffer == null || buffer?.readable == 0) { + buffer?.close() + val receive = bufferChannel.receiveCatching() + buffer = if (receive.isSuccess) { + receive.getOrThrow() + } else { + val e = receive.exceptionOrNull() + if (e != null) { + throw e + } + return null + } + } + return buffer + } + + override suspend fun skip(n: Long): Long { + var skip = 0L + while (skip < n) { + val buffer = buffer() ?: return skip + skip += buffer.skip((n - skip).toInt()) + } + return skip + } + + override suspend fun read(): Int { + val buffer = buffer() ?: return -1 + return buffer.get().toInt() + } + + override suspend fun read(buffer: ByteBuffer) { + val buf = buffer() ?: return + buf.writeTo(buffer) + } + + override fun close() { + bufferChannel.cancel() + } + + override suspend fun readBytes(): ByteArray { + val os = ByteArrayOutputStream() + var buf = buffer() + while (buf != null) { + buf.writeTo(os) + buf = buffer() + } + return os.toByteArray() + } +} \ No newline at end of file diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/GzipHttpResponseStream.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/GzipHttpResponseStream.kt new file mode 100644 index 0000000..6f0a18a --- /dev/null +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/GzipHttpResponseStream.kt @@ -0,0 +1,43 @@ +package cn.tursom.web.client + +import cn.tursom.core.ByteBufferUtil +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.impl.HeapByteBuffer +import cn.tursom.core.coroutine.GlobalScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce +import java.io.InputStream +import java.util.zip.GZIPInputStream + +//TODO impl +class GzipHttpResponseStream( + private val stream: HttpResponseStream, +) : ChannelHttpResponse() { + private class ByteByfferInputStream( + var byteBuffer: ByteBuffer, + ) : InputStream() { + override fun read(): Int { + if (byteBuffer.readable == 0) { + return -1 + } + return byteBuffer.get().toInt() + } + + override fun read(b: ByteArray): Int = byteBuffer.writeTo(b) + override fun read(b: ByteArray, off: Int, len: Int): Int = byteBuffer.writeTo(b, off, len) + } + + private val inputStream = ByteByfferInputStream(ByteBufferUtil.empty) + + @Suppress("BlockingMethodInNonBlockingContext") + @OptIn(ExperimentalCoroutinesApi::class) + override val bufferChannel: ReceiveChannel = GlobalScope.produce { + val gzip = GZIPInputStream(inputStream) + while (true) { + inputStream.byteBuffer = stream.buffer() ?: return@produce + val bytes = gzip.readBytes() + send(HeapByteBuffer(bytes, false)) + } + } +} diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt index 2658ae6..ff37be9 100644 --- a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpClient.kt @@ -1,5 +1,5 @@ package cn.tursom.web.client interface HttpClient { - suspend fun request(method: String, url: String, ssl: Boolean? = null): HttpRequest + suspend fun request(method: String, url: String): HttpRequest } diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt index c55d709..f7535ee 100644 --- a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpRequest.kt @@ -14,9 +14,10 @@ interface HttpRequest { } val headers: Iterable> - fun addHeader(key: String, value: Any) - fun addHeaders(headers: Map) { + fun addHeader(key: String, value: Any): HttpRequest + fun addHeaders(headers: Map): HttpRequest { headers.forEach(::addHeader) + return this } fun body(data: ByteBuffer) diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt index 15085e2..fa8c941 100644 --- a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponse.kt @@ -3,7 +3,7 @@ package cn.tursom.web.client interface HttpResponse { val code: Int val reason: String - val headers: Iterable> + val headers: Iterable> fun getHeader(key: String): String? fun getHeaders(key: String): List val body: HttpResponseStream diff --git a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt index 9d33f6f..3cc17ce 100644 --- a/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt +++ b/ts-web/src/main/kotlin/cn/tursom/web/client/HttpResponseStream.kt @@ -2,10 +2,15 @@ package cn.tursom.web.client import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer +import cn.tursom.core.fromJson +import cn.tursom.core.fromJsonTyped +import cn.tursom.core.toUTF8String +import java.io.ByteArrayOutputStream import java.io.Closeable interface HttpResponseStream : Closeable { - suspend fun skip(n: Long) + suspend fun buffer(): ByteBuffer? + suspend fun skip(n: Long): Long suspend fun read(): Int suspend fun read(buffer: ByteBuffer) suspend fun read( @@ -18,4 +23,19 @@ interface HttpResponseStream : Closeable { read(byteBuffer) return byteBuffer.writePosition } + + suspend fun readBytes(): ByteArray { + val os = ByteArrayOutputStream() + val buffer = ByteArray(1024) + do { + val read = read(buffer) + os.write(buffer, 0, read) + } while (read != 0) + return os.toByteArray() + } + + suspend fun string() = readBytes().toUTF8String() } + +suspend inline fun HttpResponseStream.json(): T = string().fromJson() +suspend inline fun HttpResponseStream.jsonGeneric(): T = string().fromJsonTyped() diff --git a/ts-web/ts-web-netty-client/build.gradle.kts b/ts-web/ts-web-netty-client/build.gradle.kts index 6408b98..2608736 100644 --- a/ts-web/ts-web-netty-client/build.gradle.kts +++ b/ts-web/ts-web-netty-client/build.gradle.kts @@ -4,15 +4,36 @@ plugins { id("ts-gradle") } +val brotliVersion = "1.7.1" +val operatingSystem: OperatingSystem = + org.gradle.nativeplatform.platform.internal.DefaultNativePlatform.getCurrentOperatingSystem() + dependencies { api(project(":ts-core")) api(project(":ts-core:ts-buffer")) api(project(":ts-core:ts-log")) api(project(":ts-web")) api(project(":ts-web:ts-web-netty")) + api(project(":ts-core:ts-coroutine")) api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0") api(group = "io.netty", name = "netty-all", version = "4.1.72.Final") api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32") + implementation(group = "io.netty", name = "netty-tcnative-boringssl-static", version = "2.0.46.Final") + + testApi(group = "junit", name = "junit", version = "4.13.2") + testImplementation(group = "com.aayushatharva.brotli4j", name = "brotli4j", version = brotliVersion) + testImplementation( + group = "com.aayushatharva.brotli4j", + name = "native-${ + if (operatingSystem.isWindows) "windows-x86_64" + else if (operatingSystem.isMacOsX) "osx-x86_64" + else if (operatingSystem.isLinux) + if (org.gradle.nativeplatform.platform.internal.DefaultNativePlatform.getCurrentArchitecture().isArm) "linux-aarch64" + else "native-linux-x86_64" + else "" + }", + version = brotliVersion + ) } diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt index 63c4390..488651c 100644 --- a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/HttpExecutor.kt @@ -7,6 +7,7 @@ import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel import io.netty.handler.codec.http.HttpClientCodec +import io.netty.handler.codec.http.HttpContentDecompressor import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.util.InsecureTrustManagerFactory @@ -35,6 +36,7 @@ object HttpExecutor { addLast(sslCtx.newHandler(ch.alloc(), host, port)) } addLast(HttpClientCodec()) + addLast(HttpContentDecompressor()) } initChannel(ch) } diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt index 88641f8..6a68f08 100644 --- a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpClient.kt @@ -5,7 +5,7 @@ import cn.tursom.web.client.HttpRequest import java.net.URI open class NettyHttpClient : HttpClient { - override suspend fun request(method: String, url: String, ssl: Boolean?): HttpRequest { + override suspend fun request(method: String, url: String): HttpRequest { val uri = URI.create(url) val port = if (uri.port < 0) { when (uri.scheme ?: "http") { @@ -16,7 +16,6 @@ open class NettyHttpClient : HttpClient { } else { uri.port } - val pool = HttpConnectionPool.poolOf(uri.host, port, uri.scheme == "https") val request = NettyHttpRequest(pool) request.method = method diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt index e0b073f..47907b4 100644 --- a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpRequest.kt @@ -43,8 +43,9 @@ class NettyHttpRequest( override val headers: Iterable> get() = request.headers() - override fun addHeader(key: String, value: Any) { + override fun addHeader(key: String, value: Any): NettyHttpRequest { request.headers().add(key, value) + return this } override fun body(data: ByteBuffer) { diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt index 9d67031..55fd500 100644 --- a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResponse.kt @@ -1,12 +1,15 @@ package cn.tursom.web.client.netty -import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.impl.NettyByteBuffer +import cn.tursom.core.coroutine.GlobalScope +import cn.tursom.web.client.ChannelHttpResponse import cn.tursom.web.client.HttpResponse import cn.tursom.web.client.HttpResponseStream import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpObject +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce class NettyHttpResponse( private val response: io.netty.handler.codec.http.HttpResponse, @@ -16,60 +19,38 @@ class NettyHttpResponse( get() = response.status().code() override val reason: String get() = response.status().reasonPhrase() - override val headers get() = response.headers()!! + override val headers by lazy { response.headers().map { (k, v) -> k to v } } override fun getHeader(key: String): String? = response.headers().get(key) override fun getHeaders(key: String): List = response.headers().getAll(key) - override val body: HttpResponseStream = NettyStream(response, channel) + override val body: HttpResponseStream + init { + body = NettyStream(response, channel) + } + + @OptIn(ExperimentalCoroutinesApi::class) private class NettyStream( response: HttpObject, private val channel: ReceiveChannel, - ) : HttpResponseStream { - private var buffer: ByteBuffer? = if (response is HttpContent) { - NettyByteBuffer(response.content()) - } else { - null - } - - private suspend fun buffer(): ByteBuffer? { - if (buffer == null || buffer?.readable == 0) { - val receive = channel.receiveCatching() - buffer = if (receive.isSuccess) { + ) : ChannelHttpResponse() { + override val bufferChannel = GlobalScope.produce { + if (response is HttpContent) { + send(NettyByteBuffer(response.content())) + } + do { + val receive = this@NettyStream.channel.receiveCatching() + if (receive.isSuccess) { val content = receive.getOrThrow() as HttpContent - NettyByteBuffer(content.content()) + send(NettyByteBuffer(content.content())) } else { val e = receive.exceptionOrNull() if (e != null) { - throw e + close(e) } - null } - } - return buffer - } - - override suspend fun skip(n: Long) { - var skip = 0L - while (skip < n) { - val buffer = buffer() ?: return - skip += buffer.skip((n - skip).toInt()) - } - } - - override suspend fun read(): Int { - val buffer = buffer() ?: return -1 - return buffer.get().toInt() - } - - override suspend fun read(buffer: ByteBuffer) { - val buf = buffer() ?: return - buf.writeTo(buffer) - } - - override fun close() { - channel.cancel() + } while (receive.isSuccess) } } } \ No newline at end of file diff --git a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt index c2047cd..9735e31 100644 --- a/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt +++ b/ts-web/ts-web-netty-client/src/main/kotlin/cn/tursom/web/client/netty/NettyHttpResultResume.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.channels.trySendBlocking import java.util.concurrent.atomic.AtomicInteger @ChannelHandler.Sharable -object NettyHttpResultResume : SimpleChannelInboundHandler() { +object NettyHttpResultResume : SimpleChannelInboundHandler(false) { val recvChannelKey = AttributeKey.newInstance>("recvChannelKey")!! val countKey = AttributeKey.newInstance("countKey")!! override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { diff --git a/ts-web/ts-web-netty-client/src/test/kotlin/cn/tursom/web/client/netty/NettyHttpClientTest.kt b/ts-web/ts-web-netty-client/src/test/kotlin/cn/tursom/web/client/netty/NettyHttpClientTest.kt new file mode 100644 index 0000000..f7eefca --- /dev/null +++ b/ts-web/ts-web-netty-client/src/test/kotlin/cn/tursom/web/client/netty/NettyHttpClientTest.kt @@ -0,0 +1,22 @@ +package cn.tursom.web.client.netty + +import io.netty.handler.codec.compression.Brotli +import kotlinx.coroutines.runBlocking +import org.junit.Test + +internal class NettyHttpClientTest { + private val client = NettyHttpClient() + + @Test + fun request() { + Brotli.ensureAvailability() + runBlocking { + val request = client.request("GET", "https://cdn.segmentfault.com/r-e032f7ee/umi.js") + .addHeader("accept-encoding", "br") + request.path = "https://cdn.segmentfault.com/r-e032f7ee/umi.js" + val response = request.send() + println(response.body + .string()) + } + } +} \ No newline at end of file diff --git a/ts-web/ts-web-okhttp/build.gradle.kts b/ts-web/ts-web-okhttp/build.gradle.kts new file mode 100644 index 0000000..96b15fb --- /dev/null +++ b/ts-web/ts-web-okhttp/build.gradle.kts @@ -0,0 +1,21 @@ +plugins { + kotlin("jvm") + `maven-publish` + id("ts-gradle") +} + +dependencies { + api(project(":ts-core")) + api(project(":ts-core:ts-buffer")) + api(project(":ts-core:ts-log")) + api(project(":ts-web")) + api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0") + api(group = "com.squareup.okhttp3", name = "okhttp", version = "4.9.3") + api(group = "io.netty", name = "netty-all", version = "4.1.72.Final") + api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32") + testApi(group = "junit", name = "junit", version = "4.13.2") +} + + + + diff --git a/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClient.kt b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClient.kt new file mode 100644 index 0000000..6cfabc0 --- /dev/null +++ b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClient.kt @@ -0,0 +1,37 @@ +package cn.tursom.web.client.okhttp + +import cn.tursom.web.client.HttpClient +import okhttp3.OkHttpClient +import java.net.InetSocketAddress +import java.net.Proxy +import java.net.SocketAddress + +class OkhttpHttpClient( + private val client: OkHttpClient, +) : HttpClient { + companion object { + val direct = OkhttpHttpClient(OkHttpClient().newBuilder() + .retryOnConnectionFailure(true) + .build()) + val socket = proxy() + val httpProxy = proxy(port = 8080, type = Proxy.Type.HTTP) + + var default = direct + + @JvmOverloads + fun proxy( + host: String = "127.0.0.1", + port: Int = 1080, + type: Proxy.Type = Proxy.Type.SOCKS, + builder: OkHttpClient.Builder = OkHttpClient().newBuilder(), + ) = OkhttpHttpClient(builder + .proxy(Proxy(type, InetSocketAddress(host, port) as SocketAddress)) + .retryOnConnectionFailure(true) + .build()) + } + + override suspend fun request(method: String, url: String): OkhttpHttpRequest { + return OkhttpHttpRequest(client, method, url) + } +} + diff --git a/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpRequest.kt b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpRequest.kt new file mode 100644 index 0000000..5513222 --- /dev/null +++ b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpRequest.kt @@ -0,0 +1,116 @@ +package cn.tursom.web.client.okhttp + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.web.client.HttpRequest +import okhttp3.* +import okhttp3.RequestBody.Companion.toRequestBody +import java.io.IOException +import java.net.URL +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +class OkhttpHttpRequest( + private val client: OkHttpClient, + override var method: String, + url: String, +) : HttpRequest { + override var version: String = "" + private val protocol: String + private val host: String + private val port: Int + override var path: String = "" + set(value) { + field = if (value.startsWith('/')) { + value + } else { + "/$value" + } + } + override val params = HashMap>() + private var ref: String? + private var body: ByteBuffer? = null + + init { + val url = URL(url) + host = url.host + port = url.port + path = url.path + protocol = url.protocol + url.query?.splitToSequence('&')?.forEach { query -> + val i = query.indexOf('=') + if (i <= 0) { + return@forEach + } + val key = query.substring(0, i) + val value = query.substring(i + 1) + addParam(key, value) + } + ref = url.ref + } + + private val portStr: String + get() = if (port <= 0) { + "" + } else { + ":$port" + } + + private val paramStr: String + get() = buildString { + params.forEach { (k, list) -> + list.forEach { v -> + if (isNotEmpty()) { + append('&') + } + append("$k=$v") + } + } + } + + override fun addParam(key: String, value: String) { + params.getOrPut(key) { ArrayList() }.add(value) + } + + data class Header(override val key: String, override val value: String) : Map.Entry + + override val headers = ArrayList
() + + override fun addHeader(key: String, value: Any): OkhttpHttpRequest { + headers.add(Header(key, value.toString())) + return this + } + + override fun body(data: ByteBuffer) { + body = data + } + + override suspend fun send(): OkhttpHttpResponse { + val builder = Request.Builder() + .method(method, body?.getBytes()?.toRequestBody()) + .url(buildString { + append("$protocol://$host$portStr$path") + val paramStr = paramStr + if (paramStr.isNotEmpty()) { + append("?$paramStr") + } + if (ref != null) { + append("#$ref") + } + }) + headers.forEach { (key, value) -> + builder.addHeader(key, value) + } + return OkhttpHttpResponse(suspendCoroutine { + client.newCall(builder.build()).enqueue(object : Callback { + override fun onFailure(call: Call, e: IOException) { + it.resumeWithException(e) + } + + override fun onResponse(call: Call, response: Response) { + it.resume(response) + } + }) + }) + } +} diff --git a/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpResponse.kt b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpResponse.kt new file mode 100644 index 0000000..7831284 --- /dev/null +++ b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpResponse.kt @@ -0,0 +1,26 @@ +package cn.tursom.web.client.okhttp + +import cn.tursom.web.client.HttpResponse +import okhttp3.Response + +class OkhttpHttpResponse( + val response: Response, +) : HttpResponse { + override val code: Int + get() = response.code + override val reason: String + get() = response.message + override val headers: Iterable> + get() = response.headers + + override fun getHeader(key: String): String? { + return response.headers[key] + } + + override fun getHeaders(key: String): List { + return response.headers.filter { it.first == key }.map { it.second } + } + + override val body: OkhttpResponseStream + get() = OkhttpResponseStream(response.body) +} diff --git a/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpResponseStream.kt b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpResponseStream.kt new file mode 100644 index 0000000..d44d61b --- /dev/null +++ b/ts-web/ts-web-okhttp/src/main/kotlin/cn/tursom/web/client/okhttp/OkhttpResponseStream.kt @@ -0,0 +1,56 @@ +package cn.tursom.web.client.okhttp + +import cn.tursom.core.buffer.ByteBuffer +import cn.tursom.core.buffer.impl.HeapByteBuffer +import cn.tursom.web.client.HttpResponseStream +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import okhttp3.ResponseBody + +class OkhttpResponseStream( + val body: ResponseBody?, +) : HttpResponseStream { + override suspend fun buffer(): ByteBuffer? { + val stream = body?.byteStream() ?: return null + val buffer = HeapByteBuffer(1024) + val read = withContext(Dispatchers.IO) { + stream.read(buffer.array, buffer.writeOffset, buffer.writeable) + } + if (read == 0) { + return null + } + buffer.writePosition += read + return buffer + } + + override suspend fun skip(n: Long) = withContext(Dispatchers.IO) { + body?.byteStream()?.skip(n) ?: 0 + } + + override suspend fun read(): Int = withContext(Dispatchers.IO) { + body?.byteStream()?.read() ?: -1 + } + + override suspend fun read(buffer: ByteBuffer) { + body ?: return + withContext(Dispatchers.IO) { + buffer.put(body.byteStream()) + } + } + + override fun close() { + body?.close() + } + + override suspend fun readBytes(): ByteArray { + return withContext(Dispatchers.IO) { + body?.bytes() ?: ByteArray(0) + } + } + + override suspend fun string(): String { + return withContext(Dispatchers.IO) { + body?.string() ?: "" + } + } +} diff --git a/ts-web/ts-web-okhttp/src/test/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClientTest.kt b/ts-web/ts-web-okhttp/src/test/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClientTest.kt new file mode 100644 index 0000000..499952d --- /dev/null +++ b/ts-web/ts-web-okhttp/src/test/kotlin/cn/tursom/web/client/okhttp/OkhttpHttpClientTest.kt @@ -0,0 +1,18 @@ +package cn.tursom.web.client.okhttp + +import kotlinx.coroutines.runBlocking +import org.junit.Test + +internal class OkhttpHttpClientTest { + private val client = OkhttpHttpClient.default + + @Test + fun request() { + runBlocking { + val response = client.request("GET", "https://cdn.segmentfault.com/r-e032f7ee/umi.js") + .addHeader("accept-encoding", "gzip, deflate, br") + .send() + println(response.body.string()) + } + } +} \ No newline at end of file