This commit is contained in:
tursom 2022-04-10 17:00:19 +08:00
parent 56375ce172
commit 4c18fa6b02
29 changed files with 619 additions and 93 deletions

View File

@ -27,6 +27,7 @@ include("ts-socket")
include("ts-web") include("ts-web")
include("ts-web:ts-web-netty") include("ts-web:ts-web-netty")
include("ts-web:ts-web-netty-client") include("ts-web:ts-web-netty-client")
include("ts-web:ts-web-okhttp")
include("ts-web:ts-web-coroutine") include("ts-web:ts-web-coroutine")
include("ts-database") include("ts-database")
include("ts-database:ts-ktorm") include("ts-database:ts-ktorm")

View File

@ -430,7 +430,8 @@ fun String.base62Decode(): Long {
fun Any.toJson(): String = Utils.gson.toJson(this) fun Any.toJson(): String = Utils.gson.toJson(this)
fun Any.toPrettyJson(): String = Utils.prettyGson.toJson(this) fun Any.toPrettyJson(): String = Utils.prettyGson.toJson(this)
inline fun <reified T : Any> String.fromJson(): T = Utils.gson.fromJson(this, T::class.java) inline fun <reified T : Any> String.fromJson(): T = Utils.gson.fromJson(this)
inline fun <reified T : Any> String.fromJsonTyped(): T = Utils.gson.fromJsonTyped(this)
fun Any.serialize(): ByteArray { fun Any.serialize(): ByteArray {
val outputStream = ByteArrayOutputStream() val outputStream = ByteArrayOutputStream()

View File

@ -32,7 +32,7 @@ object AsyncHttpRequest {
url: String, url: String,
param: Map<String, String>? = null, param: Map<String, String>? = null,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): Response = client.get(url, param, headers) ): Response = client.get(url, param, headers)
@JvmOverloads @JvmOverloads
@ -42,7 +42,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: RequestBody, body: RequestBody,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): Response = client.post(url, body, headers) ): Response = client.post(url, body, headers)
@JvmOverloads @JvmOverloads
@ -52,7 +52,7 @@ object AsyncHttpRequest {
url: String, url: String,
param: Map<String, String>, param: Map<String, String>,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): Response = client.post(url, param, headers) ): Response = client.post(url, param, headers)
@JvmOverloads @JvmOverloads
@ -62,7 +62,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: String, body: String,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
) = client.post(url, body, headers) ) = client.post(url, body, headers)
@JvmOverloads @JvmOverloads
@ -72,7 +72,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: File, body: File,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
) = client.post(url, body, headers) ) = client.post(url, body, headers)
@JvmOverloads @JvmOverloads
@ -82,7 +82,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: ByteArray, body: ByteArray,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
) = client.post(url, body, headers) ) = client.post(url, body, headers)
@Suppress("BlockingMethodInNonBlockingContext") @Suppress("BlockingMethodInNonBlockingContext")
@ -93,7 +93,7 @@ object AsyncHttpRequest {
url: String, url: String,
param: Map<String, String>? = null, param: Map<String, String>? = null,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): String = client.getStr(url, param, headers) ): String = client.getStr(url, param, headers)
@Suppress("BlockingMethodInNonBlockingContext") @Suppress("BlockingMethodInNonBlockingContext")
@ -104,7 +104,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: RequestBody, body: RequestBody,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): String = client.postStr(url, body, headers) ): String = client.postStr(url, body, headers)
@JvmOverloads @JvmOverloads
@ -114,7 +114,7 @@ object AsyncHttpRequest {
url: String, url: String,
param: Map<String, String>, param: Map<String, String>,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): String = client.postStr(url, param, headers) ): String = client.postStr(url, param, headers)
@JvmOverloads @JvmOverloads
@ -124,7 +124,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: String, body: String,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): String = client.postStr(url, body, headers) ): String = client.postStr(url, body, headers)
@JvmOverloads @JvmOverloads
@ -134,7 +134,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: File, body: File,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): String = client.postStr(url, body, headers) ): String = client.postStr(url, body, headers)
@Suppress("BlockingMethodInNonBlockingContext") @Suppress("BlockingMethodInNonBlockingContext")
@ -145,7 +145,7 @@ object AsyncHttpRequest {
url: String, url: String,
param: Map<String, String>? = null, param: Map<String, String>? = null,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): ByteArray = client.getByteArray(url, param, headers) ): ByteArray = client.getByteArray(url, param, headers)
@ -157,7 +157,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: RequestBody, body: RequestBody,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): ByteArray = client.postByteArray(url, body, headers) ): ByteArray = client.postByteArray(url, body, headers)
@JvmOverloads @JvmOverloads
@ -167,7 +167,7 @@ object AsyncHttpRequest {
url: String, url: String,
param: Map<String, String>, param: Map<String, String>,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): ByteArray = client.postByteArray(url, param, headers) ): ByteArray = client.postByteArray(url, param, headers)
@JvmOverloads @JvmOverloads
@ -177,7 +177,7 @@ object AsyncHttpRequest {
url: String, url: String,
body: String, body: String,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): ByteArray = client.postByteArray(url, body, headers) ): ByteArray = client.postByteArray(url, body, headers)
@JvmOverloads @JvmOverloads
@ -187,6 +187,6 @@ object AsyncHttpRequest {
url: String, url: String,
body: File, body: File,
headers: Map<String, String>? = null, headers: Map<String, String>? = null,
client: OkHttpClient = defaultClient, client: OkHttpClient = Okhttp.default,
): ByteArray = client.postByteArray(url, body, headers) ): ByteArray = client.postByteArray(url, body, headers)
} }

View File

@ -0,0 +1,58 @@
package cn.tursom.core
import cn.tursom.core.buffer.impl.DirectByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import java.nio.ByteBuffer
/**
* hack java.nio.HeapByteBuffer
*/
object ByteBufferUtil {
private val field = ByteBuffer::class.java.getDeclaredField("offset")
private val bufferWrapper = ArrayList<(ByteBuffer, Boolean) -> cn.tursom.core.buffer.ByteBuffer?>()
val empty: cn.tursom.core.buffer.ByteBuffer = HeapByteBuffer(0)
init {
field.isAccessible = true
addWrapper { it, write ->
if (!it.hasArray()) {
null
} else {
HeapByteBuffer(it, write)
}
}
addWrapper { it, write ->
if (it.hasArray()) {
null
} else {
DirectByteBuffer(it, write)
}
}
}
fun addWrapper(wrapper: (ByteBuffer, Boolean) -> cn.tursom.core.buffer.ByteBuffer?) {
bufferWrapper.add(wrapper)
}
fun wrap(byteBuffer: ByteBuffer, write: Boolean = true): cn.tursom.core.buffer.ByteBuffer {
bufferWrapper.forEach { wrapper ->
return wrapper(byteBuffer, write) ?: return@forEach
}
val buffer = HeapByteBuffer(byteBuffer.limit() - byteBuffer.position())
if (!write) {
buffer.writeBuffer {
it.put(byteBuffer)
}
}
return buffer
}
fun wrap(array: ByteArray, offset: Int = 0, size: Int = array.size - offset): ByteBuffer {
val buffer = ByteBuffer.wrap(array, 0, offset + size)
if (offset > 0) field.set(buffer, offset)
return buffer
}
fun wrap(string: String) = wrap(string.toByteArray())
}

View File

@ -1,22 +0,0 @@
package cn.tursom.core
import java.nio.ByteBuffer
/**
* hack java.nio.HeapByteBuffer
*/
object HeapByteBufferUtil {
private val field = ByteBuffer::class.java.getDeclaredField("offset")
init {
field.isAccessible = true
}
fun wrap(array: ByteArray, offset: Int = 0, size: Int = array.size - offset): ByteBuffer {
val buffer = ByteBuffer.wrap(array, 0, offset + size)
if (offset > 0) field.set(buffer, offset)
return buffer
}
fun wrap(string: String) = wrap(string.toByteArray())
}

View File

@ -182,8 +182,9 @@ interface ByteBuffer : Closeable {
val buffer = buffer ?: bufferThreadLocal.get() val buffer = buffer ?: bufferThreadLocal.get()
read { read {
while (it.remaining() > 0) { while (it.remaining() > 0) {
it.put(buffer) val min = min(it.remaining(), buffer.size)
os.write(buffer) it.get(buffer, 0, min)
os.write(buffer, 0, min)
} }
} }
} }

View File

@ -9,6 +9,12 @@ class DirectByteBuffer(
) : ByteBuffer { ) : ByteBuffer {
constructor(size: Int) : this(java.nio.ByteBuffer.allocateDirect(size)) constructor(size: Int) : this(java.nio.ByteBuffer.allocateDirect(size))
constructor(buffer: java.nio.ByteBuffer, write: Boolean) : this(
buffer,
if (write) 0 else buffer.position(),
if (write) buffer.position() else buffer.limit()
)
override val hasArray: Boolean = false override val hasArray: Boolean = false
override val array: ByteArray get() = buffer.array() override val array: ByteArray get() = buffer.array()
override val capacity: Int get() = buffer.capacity() override val capacity: Int get() = buffer.capacity()

View File

@ -1,6 +1,6 @@
package cn.tursom.core.buffer.impl package cn.tursom.core.buffer.impl
import cn.tursom.core.HeapByteBufferUtil import cn.tursom.core.ByteBufferUtil
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
class HeapByteBuffer( class HeapByteBuffer(
@ -11,7 +11,19 @@ class HeapByteBuffer(
constructor(size: Int) : this(java.nio.ByteBuffer.allocate(size)) constructor(size: Int) : this(java.nio.ByteBuffer.allocate(size))
constructor(string: String) : this(string.toByteArray()) constructor(string: String) : this(string.toByteArray())
constructor(bytes: ByteArray, offset: Int = 0, size: Int = bytes.size - offset) constructor(bytes: ByteArray, offset: Int = 0, size: Int = bytes.size - offset)
: this(HeapByteBufferUtil.wrap(bytes, offset, size), offset, offset + size) : this(ByteBufferUtil.wrap(bytes, offset, size), offset, offset + size)
constructor(buffer: java.nio.ByteBuffer, write: Boolean) : this(
buffer,
if (write) 0 else buffer.position(),
if (write) buffer.position() else buffer.limit()
)
constructor(bytes: ByteArray, write: Boolean) : this(
java.nio.ByteBuffer.wrap(bytes),
0,
if (write) 0 else bytes.size
)
init { init {
assert(buffer.hasArray()) assert(buffer.hasArray())

View File

@ -8,6 +8,8 @@ dependencies {
implementation(project(":ts-core")) implementation(project(":ts-core"))
implementation(project(":ts-core:ts-buffer")) implementation(project(":ts-core:ts-buffer"))
implementation(project(":ts-core:ts-datastruct")) implementation(project(":ts-core:ts-datastruct"))
compileOnly(project(":ts-core:ts-coroutine"))
compileOnly(project(":ts-core:ts-json")) compileOnly(project(":ts-core:ts-json"))
compileOnly(group = "com.aayushatharva.brotli4j", name = "brotli4j", version = "1.7.1")
implementation(group = "org.slf4j", name = "slf4j-api", version = "1.7.32") implementation(group = "org.slf4j", name = "slf4j-api", version = "1.7.32")
} }

View File

@ -0,0 +1,42 @@
package cn.tursom.web.client
import cn.tursom.core.ByteBufferUtil
import cn.tursom.core.coroutine.GlobalScope
import com.aayushatharva.brotli4j.decoder.DecoderJNI
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
class BrotliHttpResponseStream(
private val stream: HttpResponseStream,
inputBufferSize: Int = 8 * 1024,
) : ChannelHttpResponse() {
private val decoder = DecoderJNI.Wrapper(inputBufferSize)
@OptIn(ExperimentalCoroutinesApi::class)
override val bufferChannel = GlobalScope.produce {
while (true) {
val input = stream.buffer() ?: return@produce
when (decoder.status) {
DecoderJNI.Status.DONE -> return@produce
DecoderJNI.Status.OK -> decoder.push(0)
DecoderJNI.Status.NEEDS_MORE_INPUT -> {
if (decoder.hasOutput()) {
val buffer = decoder.pull()
send(ByteBufferUtil.wrap(buffer, false))
}
val decoderInputBuffer = decoder.inputBuffer
decoderInputBuffer.clear()
input.writeTo(ByteBufferUtil.wrap(decoderInputBuffer, true))
//decoderInputBuffer.put(input.getBytes(decoderInputBuffer.limit() - decoderInputBuffer.position()))
decoder.push(decoderInputBuffer.position())
}
DecoderJNI.Status.NEEDS_MORE_OUTPUT -> {
val buffer = decoder.pull() ?: continue
send(ByteBufferUtil.wrap(buffer, false))
}
else -> return@produce
}
}
}
}

View File

@ -0,0 +1,61 @@
package cn.tursom.web.client
import cn.tursom.core.buffer.ByteBuffer
import kotlinx.coroutines.channels.ReceiveChannel
import java.io.ByteArrayOutputStream
abstract class ChannelHttpResponse : HttpResponseStream {
protected abstract val bufferChannel: ReceiveChannel<ByteBuffer>
private var buffer: ByteBuffer? = null
override suspend fun buffer(): ByteBuffer? {
while (buffer == null || buffer?.readable == 0) {
buffer?.close()
val receive = bufferChannel.receiveCatching()
buffer = if (receive.isSuccess) {
receive.getOrThrow()
} else {
val e = receive.exceptionOrNull()
if (e != null) {
throw e
}
return null
}
}
return buffer
}
override suspend fun skip(n: Long): Long {
var skip = 0L
while (skip < n) {
val buffer = buffer() ?: return skip
skip += buffer.skip((n - skip).toInt())
}
return skip
}
override suspend fun read(): Int {
val buffer = buffer() ?: return -1
return buffer.get().toInt()
}
override suspend fun read(buffer: ByteBuffer) {
val buf = buffer() ?: return
buf.writeTo(buffer)
}
override fun close() {
bufferChannel.cancel()
}
override suspend fun readBytes(): ByteArray {
val os = ByteArrayOutputStream()
var buf = buffer()
while (buf != null) {
buf.writeTo(os)
buf = buffer()
}
return os.toByteArray()
}
}

View File

@ -0,0 +1,43 @@
package cn.tursom.web.client
import cn.tursom.core.ByteBufferUtil
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.coroutine.GlobalScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import java.io.InputStream
import java.util.zip.GZIPInputStream
//TODO impl
class GzipHttpResponseStream(
private val stream: HttpResponseStream,
) : ChannelHttpResponse() {
private class ByteByfferInputStream(
var byteBuffer: ByteBuffer,
) : InputStream() {
override fun read(): Int {
if (byteBuffer.readable == 0) {
return -1
}
return byteBuffer.get().toInt()
}
override fun read(b: ByteArray): Int = byteBuffer.writeTo(b)
override fun read(b: ByteArray, off: Int, len: Int): Int = byteBuffer.writeTo(b, off, len)
}
private val inputStream = ByteByfferInputStream(ByteBufferUtil.empty)
@Suppress("BlockingMethodInNonBlockingContext")
@OptIn(ExperimentalCoroutinesApi::class)
override val bufferChannel: ReceiveChannel<ByteBuffer> = GlobalScope.produce {
val gzip = GZIPInputStream(inputStream)
while (true) {
inputStream.byteBuffer = stream.buffer() ?: return@produce
val bytes = gzip.readBytes()
send(HeapByteBuffer(bytes, false))
}
}
}

View File

@ -1,5 +1,5 @@
package cn.tursom.web.client package cn.tursom.web.client
interface HttpClient { interface HttpClient {
suspend fun request(method: String, url: String, ssl: Boolean? = null): HttpRequest suspend fun request(method: String, url: String): HttpRequest
} }

View File

@ -14,9 +14,10 @@ interface HttpRequest {
} }
val headers: Iterable<Map.Entry<String, String>> val headers: Iterable<Map.Entry<String, String>>
fun addHeader(key: String, value: Any) fun addHeader(key: String, value: Any): HttpRequest
fun addHeaders(headers: Map<String, Any>) { fun addHeaders(headers: Map<String, Any>): HttpRequest {
headers.forEach(::addHeader) headers.forEach(::addHeader)
return this
} }
fun body(data: ByteBuffer) fun body(data: ByteBuffer)

View File

@ -3,7 +3,7 @@ package cn.tursom.web.client
interface HttpResponse { interface HttpResponse {
val code: Int val code: Int
val reason: String val reason: String
val headers: Iterable<Map.Entry<String, String>> val headers: Iterable<Pair<String, String>>
fun getHeader(key: String): String? fun getHeader(key: String): String?
fun getHeaders(key: String): List<String> fun getHeaders(key: String): List<String>
val body: HttpResponseStream val body: HttpResponseStream

View File

@ -2,10 +2,15 @@ package cn.tursom.web.client
import cn.tursom.core.buffer.ByteBuffer import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.core.fromJson
import cn.tursom.core.fromJsonTyped
import cn.tursom.core.toUTF8String
import java.io.ByteArrayOutputStream
import java.io.Closeable import java.io.Closeable
interface HttpResponseStream : Closeable { interface HttpResponseStream : Closeable {
suspend fun skip(n: Long) suspend fun buffer(): ByteBuffer?
suspend fun skip(n: Long): Long
suspend fun read(): Int suspend fun read(): Int
suspend fun read(buffer: ByteBuffer) suspend fun read(buffer: ByteBuffer)
suspend fun read( suspend fun read(
@ -18,4 +23,19 @@ interface HttpResponseStream : Closeable {
read(byteBuffer) read(byteBuffer)
return byteBuffer.writePosition return byteBuffer.writePosition
} }
suspend fun readBytes(): ByteArray {
val os = ByteArrayOutputStream()
val buffer = ByteArray(1024)
do {
val read = read(buffer)
os.write(buffer, 0, read)
} while (read != 0)
return os.toByteArray()
} }
suspend fun string() = readBytes().toUTF8String()
}
suspend inline fun <reified T : Any> HttpResponseStream.json(): T = string().fromJson()
suspend inline fun <reified T : Any> HttpResponseStream.jsonGeneric(): T = string().fromJsonTyped()

View File

@ -4,15 +4,36 @@ plugins {
id("ts-gradle") id("ts-gradle")
} }
val brotliVersion = "1.7.1"
val operatingSystem: OperatingSystem =
org.gradle.nativeplatform.platform.internal.DefaultNativePlatform.getCurrentOperatingSystem()
dependencies { dependencies {
api(project(":ts-core")) api(project(":ts-core"))
api(project(":ts-core:ts-buffer")) api(project(":ts-core:ts-buffer"))
api(project(":ts-core:ts-log")) api(project(":ts-core:ts-log"))
api(project(":ts-web")) api(project(":ts-web"))
api(project(":ts-web:ts-web-netty")) api(project(":ts-web:ts-web-netty"))
api(project(":ts-core:ts-coroutine"))
api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0") api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0")
api(group = "io.netty", name = "netty-all", version = "4.1.72.Final") api(group = "io.netty", name = "netty-all", version = "4.1.72.Final")
api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32") api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32")
implementation(group = "io.netty", name = "netty-tcnative-boringssl-static", version = "2.0.46.Final")
testApi(group = "junit", name = "junit", version = "4.13.2")
testImplementation(group = "com.aayushatharva.brotli4j", name = "brotli4j", version = brotliVersion)
testImplementation(
group = "com.aayushatharva.brotli4j",
name = "native-${
if (operatingSystem.isWindows) "windows-x86_64"
else if (operatingSystem.isMacOsX) "osx-x86_64"
else if (operatingSystem.isLinux)
if (org.gradle.nativeplatform.platform.internal.DefaultNativePlatform.getCurrentArchitecture().isArm) "linux-aarch64"
else "native-linux-x86_64"
else ""
}",
version = brotliVersion
)
} }

View File

@ -7,6 +7,7 @@ import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.HttpClientCodec import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpContentDecompressor
import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory import io.netty.handler.ssl.util.InsecureTrustManagerFactory
@ -35,6 +36,7 @@ object HttpExecutor {
addLast(sslCtx.newHandler(ch.alloc(), host, port)) addLast(sslCtx.newHandler(ch.alloc(), host, port))
} }
addLast(HttpClientCodec()) addLast(HttpClientCodec())
addLast(HttpContentDecompressor())
} }
initChannel(ch) initChannel(ch)
} }

View File

@ -5,7 +5,7 @@ import cn.tursom.web.client.HttpRequest
import java.net.URI import java.net.URI
open class NettyHttpClient : HttpClient { open class NettyHttpClient : HttpClient {
override suspend fun request(method: String, url: String, ssl: Boolean?): HttpRequest { override suspend fun request(method: String, url: String): HttpRequest {
val uri = URI.create(url) val uri = URI.create(url)
val port = if (uri.port < 0) { val port = if (uri.port < 0) {
when (uri.scheme ?: "http") { when (uri.scheme ?: "http") {
@ -16,7 +16,6 @@ open class NettyHttpClient : HttpClient {
} else { } else {
uri.port uri.port
} }
val pool = HttpConnectionPool.poolOf(uri.host, port, uri.scheme == "https") val pool = HttpConnectionPool.poolOf(uri.host, port, uri.scheme == "https")
val request = NettyHttpRequest(pool) val request = NettyHttpRequest(pool)
request.method = method request.method = method

View File

@ -43,8 +43,9 @@ class NettyHttpRequest(
override val headers: Iterable<Map.Entry<String, String>> override val headers: Iterable<Map.Entry<String, String>>
get() = request.headers() get() = request.headers()
override fun addHeader(key: String, value: Any) { override fun addHeader(key: String, value: Any): NettyHttpRequest {
request.headers().add(key, value) request.headers().add(key, value)
return this
} }
override fun body(data: ByteBuffer) { override fun body(data: ByteBuffer) {

View File

@ -1,12 +1,15 @@
package cn.tursom.web.client.netty package cn.tursom.web.client.netty
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.NettyByteBuffer import cn.tursom.core.buffer.impl.NettyByteBuffer
import cn.tursom.core.coroutine.GlobalScope
import cn.tursom.web.client.ChannelHttpResponse
import cn.tursom.web.client.HttpResponse import cn.tursom.web.client.HttpResponse
import cn.tursom.web.client.HttpResponseStream import cn.tursom.web.client.HttpResponseStream
import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpObject import io.netty.handler.codec.http.HttpObject
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
class NettyHttpResponse( class NettyHttpResponse(
private val response: io.netty.handler.codec.http.HttpResponse, private val response: io.netty.handler.codec.http.HttpResponse,
@ -16,60 +19,38 @@ class NettyHttpResponse(
get() = response.status().code() get() = response.status().code()
override val reason: String override val reason: String
get() = response.status().reasonPhrase() get() = response.status().reasonPhrase()
override val headers get() = response.headers()!! override val headers by lazy { response.headers().map { (k, v) -> k to v } }
override fun getHeader(key: String): String? = response.headers().get(key) override fun getHeader(key: String): String? = response.headers().get(key)
override fun getHeaders(key: String): List<String> = response.headers().getAll(key) override fun getHeaders(key: String): List<String> = response.headers().getAll(key)
override val body: HttpResponseStream = NettyStream(response, channel) override val body: HttpResponseStream
init {
body = NettyStream(response, channel)
}
@OptIn(ExperimentalCoroutinesApi::class)
private class NettyStream( private class NettyStream(
response: HttpObject, response: HttpObject,
private val channel: ReceiveChannel<HttpObject>, private val channel: ReceiveChannel<HttpObject>,
) : HttpResponseStream { ) : ChannelHttpResponse() {
private var buffer: ByteBuffer? = if (response is HttpContent) { override val bufferChannel = GlobalScope.produce {
NettyByteBuffer(response.content()) if (response is HttpContent) {
} else { send(NettyByteBuffer(response.content()))
null
} }
do {
private suspend fun buffer(): ByteBuffer? { val receive = this@NettyStream.channel.receiveCatching()
if (buffer == null || buffer?.readable == 0) { if (receive.isSuccess) {
val receive = channel.receiveCatching()
buffer = if (receive.isSuccess) {
val content = receive.getOrThrow() as HttpContent val content = receive.getOrThrow() as HttpContent
NettyByteBuffer(content.content()) send(NettyByteBuffer(content.content()))
} else { } else {
val e = receive.exceptionOrNull() val e = receive.exceptionOrNull()
if (e != null) { if (e != null) {
throw e close(e)
}
null
} }
} }
return buffer } while (receive.isSuccess)
}
override suspend fun skip(n: Long) {
var skip = 0L
while (skip < n) {
val buffer = buffer() ?: return
skip += buffer.skip((n - skip).toInt())
}
}
override suspend fun read(): Int {
val buffer = buffer() ?: return -1
return buffer.get().toInt()
}
override suspend fun read(buffer: ByteBuffer) {
val buf = buffer() ?: return
buf.writeTo(buffer)
}
override fun close() {
channel.cancel()
} }
} }
} }

View File

@ -11,7 +11,7 @@ import kotlinx.coroutines.channels.trySendBlocking
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@ChannelHandler.Sharable @ChannelHandler.Sharable
object NettyHttpResultResume : SimpleChannelInboundHandler<HttpObject>() { object NettyHttpResultResume : SimpleChannelInboundHandler<HttpObject>(false) {
val recvChannelKey = AttributeKey.newInstance<SendChannel<HttpObject>>("recvChannelKey")!! val recvChannelKey = AttributeKey.newInstance<SendChannel<HttpObject>>("recvChannelKey")!!
val countKey = AttributeKey.newInstance<AtomicInteger>("countKey")!! val countKey = AttributeKey.newInstance<AtomicInteger>("countKey")!!
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {

View File

@ -0,0 +1,22 @@
package cn.tursom.web.client.netty
import io.netty.handler.codec.compression.Brotli
import kotlinx.coroutines.runBlocking
import org.junit.Test
internal class NettyHttpClientTest {
private val client = NettyHttpClient()
@Test
fun request() {
Brotli.ensureAvailability()
runBlocking {
val request = client.request("GET", "https://cdn.segmentfault.com/r-e032f7ee/umi.js")
.addHeader("accept-encoding", "br")
request.path = "https://cdn.segmentfault.com/r-e032f7ee/umi.js"
val response = request.send()
println(response.body
.string())
}
}
}

View File

@ -0,0 +1,21 @@
plugins {
kotlin("jvm")
`maven-publish`
id("ts-gradle")
}
dependencies {
api(project(":ts-core"))
api(project(":ts-core:ts-buffer"))
api(project(":ts-core:ts-log"))
api(project(":ts-web"))
api(group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version = "1.6.0")
api(group = "com.squareup.okhttp3", name = "okhttp", version = "4.9.3")
api(group = "io.netty", name = "netty-all", version = "4.1.72.Final")
api(group = "org.slf4j", name = "slf4j-api", version = "1.7.32")
testApi(group = "junit", name = "junit", version = "4.13.2")
}

View File

@ -0,0 +1,37 @@
package cn.tursom.web.client.okhttp
import cn.tursom.web.client.HttpClient
import okhttp3.OkHttpClient
import java.net.InetSocketAddress
import java.net.Proxy
import java.net.SocketAddress
class OkhttpHttpClient(
private val client: OkHttpClient,
) : HttpClient {
companion object {
val direct = OkhttpHttpClient(OkHttpClient().newBuilder()
.retryOnConnectionFailure(true)
.build())
val socket = proxy()
val httpProxy = proxy(port = 8080, type = Proxy.Type.HTTP)
var default = direct
@JvmOverloads
fun proxy(
host: String = "127.0.0.1",
port: Int = 1080,
type: Proxy.Type = Proxy.Type.SOCKS,
builder: OkHttpClient.Builder = OkHttpClient().newBuilder(),
) = OkhttpHttpClient(builder
.proxy(Proxy(type, InetSocketAddress(host, port) as SocketAddress))
.retryOnConnectionFailure(true)
.build())
}
override suspend fun request(method: String, url: String): OkhttpHttpRequest {
return OkhttpHttpRequest(client, method, url)
}
}

View File

@ -0,0 +1,116 @@
package cn.tursom.web.client.okhttp
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.web.client.HttpRequest
import okhttp3.*
import okhttp3.RequestBody.Companion.toRequestBody
import java.io.IOException
import java.net.URL
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
class OkhttpHttpRequest(
private val client: OkHttpClient,
override var method: String,
url: String,
) : HttpRequest {
override var version: String = ""
private val protocol: String
private val host: String
private val port: Int
override var path: String = ""
set(value) {
field = if (value.startsWith('/')) {
value
} else {
"/$value"
}
}
override val params = HashMap<String, MutableList<String>>()
private var ref: String?
private var body: ByteBuffer? = null
init {
val url = URL(url)
host = url.host
port = url.port
path = url.path
protocol = url.protocol
url.query?.splitToSequence('&')?.forEach { query ->
val i = query.indexOf('=')
if (i <= 0) {
return@forEach
}
val key = query.substring(0, i)
val value = query.substring(i + 1)
addParam(key, value)
}
ref = url.ref
}
private val portStr: String
get() = if (port <= 0) {
""
} else {
":$port"
}
private val paramStr: String
get() = buildString {
params.forEach { (k, list) ->
list.forEach { v ->
if (isNotEmpty()) {
append('&')
}
append("$k=$v")
}
}
}
override fun addParam(key: String, value: String) {
params.getOrPut(key) { ArrayList() }.add(value)
}
data class Header(override val key: String, override val value: String) : Map.Entry<String, String>
override val headers = ArrayList<Header>()
override fun addHeader(key: String, value: Any): OkhttpHttpRequest {
headers.add(Header(key, value.toString()))
return this
}
override fun body(data: ByteBuffer) {
body = data
}
override suspend fun send(): OkhttpHttpResponse {
val builder = Request.Builder()
.method(method, body?.getBytes()?.toRequestBody())
.url(buildString {
append("$protocol://$host$portStr$path")
val paramStr = paramStr
if (paramStr.isNotEmpty()) {
append("?$paramStr")
}
if (ref != null) {
append("#$ref")
}
})
headers.forEach { (key, value) ->
builder.addHeader(key, value)
}
return OkhttpHttpResponse(suspendCoroutine<Response> {
client.newCall(builder.build()).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
it.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
it.resume(response)
}
})
})
}
}

View File

@ -0,0 +1,26 @@
package cn.tursom.web.client.okhttp
import cn.tursom.web.client.HttpResponse
import okhttp3.Response
class OkhttpHttpResponse(
val response: Response,
) : HttpResponse {
override val code: Int
get() = response.code
override val reason: String
get() = response.message
override val headers: Iterable<Pair<String, String>>
get() = response.headers
override fun getHeader(key: String): String? {
return response.headers[key]
}
override fun getHeaders(key: String): List<String> {
return response.headers.filter { it.first == key }.map { it.second }
}
override val body: OkhttpResponseStream
get() = OkhttpResponseStream(response.body)
}

View File

@ -0,0 +1,56 @@
package cn.tursom.web.client.okhttp
import cn.tursom.core.buffer.ByteBuffer
import cn.tursom.core.buffer.impl.HeapByteBuffer
import cn.tursom.web.client.HttpResponseStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okhttp3.ResponseBody
class OkhttpResponseStream(
val body: ResponseBody?,
) : HttpResponseStream {
override suspend fun buffer(): ByteBuffer? {
val stream = body?.byteStream() ?: return null
val buffer = HeapByteBuffer(1024)
val read = withContext(Dispatchers.IO) {
stream.read(buffer.array, buffer.writeOffset, buffer.writeable)
}
if (read == 0) {
return null
}
buffer.writePosition += read
return buffer
}
override suspend fun skip(n: Long) = withContext(Dispatchers.IO) {
body?.byteStream()?.skip(n) ?: 0
}
override suspend fun read(): Int = withContext(Dispatchers.IO) {
body?.byteStream()?.read() ?: -1
}
override suspend fun read(buffer: ByteBuffer) {
body ?: return
withContext(Dispatchers.IO) {
buffer.put(body.byteStream())
}
}
override fun close() {
body?.close()
}
override suspend fun readBytes(): ByteArray {
return withContext(Dispatchers.IO) {
body?.bytes() ?: ByteArray(0)
}
}
override suspend fun string(): String {
return withContext(Dispatchers.IO) {
body?.string() ?: ""
}
}
}

View File

@ -0,0 +1,18 @@
package cn.tursom.web.client.okhttp
import kotlinx.coroutines.runBlocking
import org.junit.Test
internal class OkhttpHttpClientTest {
private val client = OkhttpHttpClient.default
@Test
fun request() {
runBlocking {
val response = client.request("GET", "https://cdn.segmentfault.com/r-e032f7ee/umi.js")
.addHeader("accept-encoding", "gzip, deflate, br")
.send()
println(response.body.string())
}
}
}