Introduce higher performance compression/decompression methods for different purposes for common

This commit is contained in:
Him188 2022-06-04 16:33:55 +01:00
parent 4f61417af6
commit 21d883b630
10 changed files with 805 additions and 509 deletions

View File

@ -24,8 +24,92 @@ public fun String.sha1(): ByteArray = toByteArray().sha1()
public expect fun ByteArray.sha1(offset: Int = 0, length: Int = size - offset): ByteArray
///////////////////////////////////////////////////////////////////////////
// How to choose 'inflate', 'inflateAllAvailable', 'InflateInput'?
//
// On JVM, performance 'inflateAllAvailable' > 'InflateInput' > 'inflate'
// On Native, performance 'inflateAllAvailable' = 'InflateInput' > 'inflate'
//
// So you should use `inflateAllAvailable` if you need have an Input and you need a ByteArray.
// If you have a ByteArray and you need an InputStream, use 'InflateInput'.
// Use 'inflate' only if the input and desired output type are both ByteArray.
//
// Specially if you are using `.decodeToString()` after reading a ByteArray, then you'd prefer 'InflateInput' with 'readText()'.
///////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
// Processing ByteArray
///////////////////////////////////////////////////////////////////////////
public expect fun ByteArray.gzip(offset: Int = 0, length: Int = size - offset): ByteArray
public expect fun ByteArray.ungzip(offset: Int = 0, length: Int = size - offset): ByteArray
public expect fun ByteArray.inflate(offset: Int = 0, length: Int = size - offset): ByteArray
public expect fun ByteArray.deflate(offset: Int = 0, length: Int = size - offset): ByteArray
///////////////////////////////////////////////////////////////////////////
// Consuming input
///////////////////////////////////////////////////////////////////////////
/**
* Input will be closed.
*/
public expect fun Input.gzipAllAvailable(): ByteArray
/**
* Input will be closed.
*/
public expect fun Input.ungzipAllAvailable(): ByteArray
/**
* Input will be closed.
*/
public expect fun Input.inflateAllAvailable(): ByteArray
/**
* Input will be closed.
*/
public expect fun Input.deflateAllAvailable(): ByteArray
///////////////////////////////////////////////////////////////////////////
// Input adapters.
///////////////////////////////////////////////////////////////////////////
//@Suppress("FunctionName")
//public expect fun GzipCompressionInput(source: Input): Input // No GzipInputStream for decompression on JVM
/**
* [source] will be closed on returned [Input.close]
*/
@Suppress("FunctionName")
public expect fun GzipDecompressionInput(source: Input): Input
/**
* @see GzipDecompressionInput
*/
public inline fun Input.gzipDecompressionInput(): Input = GzipDecompressionInput(this)
/**
* [source] will be closed on returned [Input.close]
*/
@Suppress("FunctionName")
public expect fun InflateInput(source: Input): Input
/**
* @see InflateInput
*/
public inline fun Input.inflateInput(): Input = InflateInput(this)
/**
* [source] will be closed on returned [Input.close]
*/
@Suppress("FunctionName")
public expect fun DeflateInput(source: Input): Input
/**
* @see DeflateInput
*/
public inline fun Input.deflateInput(): Input = DeflateInput(this)

View File

@ -43,17 +43,51 @@ public inline fun <I : Closeable, O : Closeable, R> I.withOut(output: O, block:
return use { output.use { block(this, output) } }
}
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
/**
* It's caller's responsibility to close the input
*/
public inline fun <R> ByteReadPacket.useBytes(
n: Int = remaining.toInt(),//not that safe but adequate
n: Int = remaining.toIntOrFail(),
block: (data: ByteArray, length: Int) -> R
): R = ByteArrayPool.useInstance(n) {
this.readFully(it, 0, n)
block(it, n)
}
/**
* It's caller's responsibility to close the input
*/
public inline fun <R> Input.useBytes(
n: Int? = null,
block: (data: ByteArray, length: Int) -> R
): R {
return when {
n != null -> {
this.readBytes(n).let { block(it, it.size) }
}
this is ByteReadPacket -> {
val count = this.remaining.toIntOrFail()
ByteArrayPool.useInstance(count) {
this.readFully(it, 0, count)
block(it, count)
}
}
else -> {
this.readBytes().let { block(it, it.size) }
}
}
}
public fun Long.toIntOrFail(): Int {
if (this >= Int.MAX_VALUE || this <= Int.MIN_VALUE) {
throw IllegalArgumentException("$this does not fit in Int range")
}
return this.toInt()
}
public inline fun ByteReadPacket.readPacketExact(
n: Int = remaining.toInt()//not that safe but adequate
n: Int = remaining.toIntOrFail()
): ByteReadPacket = this.readBytes(n).toReadPacket()

File diff suppressed because one or more lines are too long

View File

@ -12,36 +12,17 @@
package net.mamoe.mirai.utils
import io.ktor.utils.io.core.*
import io.ktor.utils.io.streams.asInput
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.security.MessageDigest
import java.util.zip.Deflater
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.util.zip.Inflater
import java.util.zip.*
public actual val DEFAULT_BUFFER_SIZE: Int get() = kotlin.io.DEFAULT_BUFFER_SIZE
public actual fun ByteArray.inflate(offset: Int, length: Int): ByteArray {
checkOffsetAndLength(offset, length)
if (length == 0) return ByteArray(0)
val inflater = Inflater()
inflater.reset()
ByteArrayOutputStream().use { output ->
inflater.setInput(this, offset, length)
ByteArray(DEFAULT_BUFFER_SIZE).let {
while (!inflater.finished()) {
output.write(it, 0, inflater.inflate(it))
}
}
inflater.end()
return output.toByteArray()
}
}
public fun InputStream.md5(): ByteArray {
return digest("md5")
}
@ -81,11 +62,6 @@ public actual fun ByteArray.sha1(offset: Int, length: Int): ByteArray {
return MessageDigest.getInstance("SHA-1").apply { update(this@sha1, offset, length) }.digest()
}
@JvmOverloads
public actual fun ByteArray.ungzip(offset: Int, length: Int): ByteArray {
return GZIPInputStream(inputStream(offset, length)).use { it.readBytes() }
}
@JvmOverloads
public actual fun ByteArray.gzip(offset: Int, length: Int): ByteArray {
ByteArrayOutputStream().use { buf ->
@ -97,6 +73,31 @@ public actual fun ByteArray.gzip(offset: Int, length: Int): ByteArray {
}
}
@JvmOverloads
public actual fun ByteArray.ungzip(offset: Int, length: Int): ByteArray {
return GZIPInputStream(inputStream(offset, length)).use { it.readBytes() }
}
public actual fun ByteArray.inflate(offset: Int, length: Int): ByteArray {
checkOffsetAndLength(offset, length)
if (length == 0) return ByteArray(0)
val inflater = Inflater()
inflater.reset()
return InflaterInputStream(ByteArrayInputStream(this, offset, length), inflater).readBytes()
// ByteArrayOutputStream().use { output ->
// inflater.setInput(this, offset, length)
// ByteArray(DEFAULT_BUFFER_SIZE).let {
// while (!inflater.finished()) {
// output.write(it, 0, inflater.inflate(it))
// }
// }
//
// inflater.end()
// return output.toByteArray()
// }
}
@JvmOverloads
public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
checkOffsetAndLength(offset, length)
@ -111,3 +112,91 @@ public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
}
}
/**
* Input will be closed.
*/
public actual fun Input.gzipAllAvailable(): ByteArray {
return this.readBytes().gzip()
// The following doesn't work, input's release won't becalled. Possibly Ktor bug.
// return this.use {
// ByteArrayOutputStream().use { buf ->
// GZIPOutputStream(buf).use { gzip ->
// copyTo(gzip.asOutput())
// }
// buf.flush()
// buf.toByteArray()
// }
// }
}
/**
* Input will be closed.
*/
public actual fun Input.ungzipAllAvailable(): ByteArray {
return GZIPInputStream(this.asStream()).use { it.readBytes() }
}
/**
* Input will be closed.
*/
public actual fun Input.inflateAllAvailable(): ByteArray {
return this.inflateInput().use { it.readBytes() }
}
/**
* Input will be closed.
*/
public actual fun Input.deflateAllAvailable(): ByteArray {
return this.deflateInput().use { it.readBytes() }
}
private fun Input.asStream(): InputStream = object : InputStream() {
override fun read(): Int {
if (endOfInput) return -1
return readByte().toIntUnsigned()
}
override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
if (this@asStream.endOfInput) return -1
return readAvailable(buffer, offset, length)
}
override fun skip(count: Long): Long = discard(count)
override fun close() {
this@asStream.close()
}
}
/**
* [source] will be closed on returned [Input.close]
*/
@Suppress("FunctionName")
public actual fun GzipDecompressionInput(source: Input): Input {
return GZIPInputStream(source.asStream()).asInput()
}
/**
* [source] will be closed on returned [Input.close]
*/
@Suppress("FunctionName")
public actual fun InflateInput(source: Input): Input {
val inflater = Inflater()
inflater.reset()
return InflaterInputStream(source.asStream(), inflater).asInput()
}
/**
* [source] will be closed on returned [Input.close]
*/
@Suppress("FunctionName")
public actual fun DeflateInput(source: Input): Input {
val deflater = Deflater()
deflater.reset()
return DeflaterInputStream(source.asStream(), deflater).asInput()
}

View File

@ -0,0 +1,12 @@
/*
* Copyright 2019-2022 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
package net.mamoe.mirai.utils
internal actual class ByteArrayOpTest : CommonByteArrayOpTest()

View File

@ -38,30 +38,62 @@ public actual fun ByteArray.sha1(offset: Int, length: Int): ByteArray = SHA1.cre
public var ZLIB_BUFFER_SIZE: Long = 8192
public actual fun ByteArray.gzip(offset: Int, length: Int): ByteArray {
return GzipCompressionInput(this.toReadPacket(offset, length)).use { it.readBytes() }
}
public actual fun ByteArray.ungzip(offset: Int, length: Int): ByteArray {
return GzipDecompressionInput(this.toReadPacket(offset, length)).use { it.readBytes() }
}
public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
return DeflateInput(this.toReadPacket(offset, length)).use { it.readBytes() }
}
public actual fun ByteArray.inflate(offset: Int, length: Int): ByteArray {
return InflateInput(this.toReadPacket(offset, length)).use { it.readBytes() }
}
@Suppress("FunctionName")
public fun GzipCompressionInput(source: Input): Input {
return ZlibInput(
source = this.toReadPacket(offset, length),
source = source,
zlibInit = { deflateInit2(it, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 or 16, 8, Z_DEFAULT_STRATEGY) },
zlibProcess = { z, flush -> deflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_FINISH else Z_NO_FLUSH },
zlibEnd = { deflateEnd(it) },
).readBytes()
)
}
public actual fun ByteArray.ungzip(offset: Int, length: Int): ByteArray {
@Suppress("FunctionName")
public actual fun GzipDecompressionInput(source: Input): Input {
return ZlibInput(
source = this.toReadPacket(offset, length),
source = source,
zlibInit = { inflateInit2(it, 15 or 16) },
zlibProcess = { z, flush -> inflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_SYNC_FLUSH else Z_NO_FLUSH },
zlibEnd = { inflateEnd(it) },
).readBytes()
)
}
public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
@Suppress("FunctionName")
public actual fun InflateInput(source: Input): Input {
return ZlibInput(
source = this.toReadPacket(offset, length),
source = source,
zlibInit = { inflateInit2(it, 15) },
zlibProcess = { z, flush -> inflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_SYNC_FLUSH else Z_NO_FLUSH },
zlibEnd = { inflateEnd(it) },
)
}
@Suppress("FunctionName")
public actual fun DeflateInput(source: Input): Input {
return ZlibInput(
source = source,
zlibInit = { deflateInit(it, Z_DEFAULT_COMPRESSION) },
zlibProcess = { z, flush -> deflate(z, flush) },
zlibHasPending = { z ->
@ -69,9 +101,7 @@ public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
val pendingBytes = cValue<UIntVar>().ptr
val pendingBits = cValue<IntVar>().ptr
debug { "deflatePending checking" }
if (deflatePending(z, pendingBytes, pendingBits) != Z_OK) {
debug { "deflatePending: failed" }
false
} else {
pendingBytes.pointed.value > 0u || pendingBits.pointed.value > 0
@ -80,391 +110,256 @@ public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
},
zlibFlushMode = { if (it) Z_FINISH else Z_NO_FLUSH },
zlibEnd = { deflateEnd(it) },
).readBytes()
)
}
public actual fun ByteArray.inflate(offset: Int, length: Int): ByteArray {
// val input = this
// val output = ByteArray((10 * length) + 12)
// this.usePinned { pin ->
// output.usePinned { outPin ->
// memScoped {
// val values = ulongArrayOf(output.size.toULong())
// values.usePinned { sizes ->
// uncompress(
// outPin.addressOf(0).reinterpret(),
// sizes.addressOf(0).reinterpret(),
// pin.addressOf(offset).reinterpret(),
// length.convert(),
// )
// }
//
// return output.copyOf(values[0].toInt())
//
// }
// }
// return output
// }
return ZlibInput(
source = this.toReadPacket(offset, length),
zlibInit = { inflateInit2(it, 15) },
zlibProcess = { z, flush -> inflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_SYNC_FLUSH else Z_NO_FLUSH },
zlibEnd = { inflateEnd(it) },
).readBytes()
/**
* Input will be closed.
*/
public actual fun Input.gzipAllAvailable(): ByteArray {
return GzipCompressionInput(this).use { it.readBytes() }
}
private const val debugging = false
private inline fun debug(string: () -> String) {
if (debugging) println(string())
/**
* Input will be closed.
*/
public actual fun Input.ungzipAllAvailable(): ByteArray {
return GzipDecompressionInput(this).use { it.readBytes() }
}
private inline fun debug() {
if (debugging) println()
/**
* Input will be closed.
*/
public actual fun Input.inflateAllAvailable(): ByteArray {
return InflateInput(this).use { it.readBytes() }
}
private fun ZlibInput(
source: Input,
/**
* Input will be closed.
*/
public actual fun Input.deflateAllAvailable(): ByteArray {
return DeflateInput(this).use { it.readBytes() }
}
/**
* [source] will be closed on [ZlibInput.close]
*/
internal class ZlibInput(
private val source: Input,
zlibInit: (z_streamp) -> Int,
zlibProcess: (z_streamp, flush: Int) -> Int,
zlibHasPending: ((z_streamp) -> Boolean)?, // null lambda means operation not defined
zlibFlushMode: (shouldFlushAll: Boolean) -> Int,
zlibEnd: (z_streamp) -> Int,
): Input {
val z = nativeHeap.alloc<z_stream>()
private val zlibProcess: (z_streamp, flush: Int) -> Int,
private val zlibHasPending: ((z_streamp) -> Boolean)?, // null lambda means operation not defined
private val zlibFlushMode: (shouldFlushAll: Boolean) -> Int,
private val zlibEnd: (z_streamp) -> Int,
) : Input {
private val z: z_stream = nativeHeap.alloc()
// Zlib manual: https://refspecs.linuxbase.org/LSB_3.0.0/LSB-Core-generic/LSB-Core-generic/zlib-inflate-1.html
val r = zlibInit(z.ptr)
if (r != 0) {
nativeHeap.free(z)
error("Failed to init zlib: $r (${getZlibError(r)})")
}
return object : Input {
@Deprecated(
"Not supported anymore. All operations are big endian by default. Use readXXXLittleEndian or readXXX then X.reverseByteOrder() instead.",
level = DeprecationLevel.ERROR
)
override var byteOrder: ByteOrder
get() = throw UnsupportedOperationException()
set(_) {
throw UnsupportedOperationException()
}
private inline val BUFFER_TOTAL_SIZE get() = ZLIB_BUFFER_SIZE
private var bufferReadableSize = 0
private val inputBuffer = nativeHeap.allocArray<ByteVar>(BUFFER_TOTAL_SIZE)
private val buffer = nativeHeap.allocArray<ByteVar>(BUFFER_TOTAL_SIZE)
private var bufferIndex = 0L
private var closed: Boolean = false
override val endOfInput: Boolean
get() = closed || !prepare()
override fun close() {
debug { "closing" }
if (closed) return
this.closed = true
zlibEnd(z.ptr)
init {
val r = zlibInit(z.ptr)
if (r != 0) {
nativeHeap.free(z)
nativeHeap.free(buffer)
nativeHeap.free(inputBuffer)
error("Failed to init zlib: $r (${getZlibError(r)})")
}
}
@Deprecated(
"Not supported anymore. All operations are big endian by default. Use readXXXLittleEndian or readXXX then X.reverseByteOrder() instead.",
level = DeprecationLevel.ERROR
)
override var byteOrder: ByteOrder
get() = throw UnsupportedOperationException()
set(_) {
throw UnsupportedOperationException()
}
override fun discard(n: Long): Long {
if (closed) {
return 0
private var bufferReadableSize = 0
private val inputBuffer = nativeHeap.allocArray<ByteVar>(ZLIB_BUFFER_SIZE)
private val buffer = nativeHeap.allocArray<ByteVar>(ZLIB_BUFFER_SIZE)
private var bufferIndex = 0L
private var closed: Boolean = false
override val endOfInput: Boolean
get() = closed || !prepare()
override fun close() {
debug { "closing" }
if (closed) return
this.closed = true
source.close()
zlibEnd(z.ptr)
nativeHeap.free(z)
nativeHeap.free(buffer)
nativeHeap.free(inputBuffer)
}
override fun discard(n: Long): Long {
if (closed) {
return 0
}
val old = bufferIndex
if (old >= bufferReadableSize) {
if (prepare()) return discard(n)
return 0
}
bufferIndex = (bufferIndex + n).coerceAtMost(ZLIB_BUFFER_SIZE)
return bufferIndex - old
}
override fun peekTo(destination: Memory, destinationOffset: Long, offset: Long, min: Long, max: Long): Long {
debug()
debug { "peekTo" }
require(min <= max) { "min > max" }
if (!prepare()) return 0
val readableLength = (bufferReadableSize - bufferIndex - offset).coerceAtLeast(0)
if (offset > readableLength) {
if (min == 0L) {
throw EOFException("offset($offset) > readableLength($readableLength)")
}
val old = bufferIndex
if (old >= bufferReadableSize) {
if (prepare()) return discard(n)
return 0
}
bufferIndex = (bufferIndex + n).coerceAtMost(ZLIB_BUFFER_SIZE)
return bufferIndex - old
}
if (min > readableLength) return 0
val len = readableLength.coerceAtMost(max).coerceAtMost(destination.size)
debug { "peekTo: read $len" }
buffer.copyTo(destination, bufferIndex + offset, len, destinationOffset)
return len
}
override fun readByte(): Byte {
if (!prepare()) {
throw EOFException("One more byte required")
}
return buffer[bufferIndex++]
}
override fun tryPeek(): Int {
if (!prepare()) {
return -1
}
return buffer[bufferIndex].toIntUnsigned()
}
private fun prepare(): Boolean {
if (closed) {
return false
}
debug { "prepare: bufferIndex = $bufferIndex, bufferReadableSize = $bufferReadableSize" }
if (bufferIndex < bufferReadableSize) {
debug { "prepare returned, because " }
return true // has buf unused
}
override fun peekTo(destination: Memory, destinationOffset: Long, offset: Long, min: Long, max: Long): Long {
debug()
debug { "peekTo" }
require(min <= max) { "min > max" }
if (!prepare()) return 0
val readableLength = (bufferReadableSize - bufferIndex - offset).coerceAtLeast(0)
if (offset > readableLength) {
if (min == 0L) {
throw EOFException("offset($offset) > readableLength($readableLength)")
}
}
if (min > readableLength) return 0
val len = readableLength.coerceAtMost(max).coerceAtMost(destination.size)
debug { "peekTo: read $len" }
buffer.copyTo(destination, bufferIndex + offset, len, destinationOffset)
bufferIndex = 0
debug { "prepare: previous value: z.avail_in=${z.avail_in}, z.avail_out=${z.avail_out}" }
return len
if (z.avail_in == 0u) {
// These two cases are similar.
// if (z.avail_out == 0u) {
// // Last time we used all the output, there is either something cached in Zlib, or no further source.
// } else {
// // We did not use all the inputs, meaning least time we used all avail_in.
// }
// bot input and output are used
val flush = updateAvailIn() ?: return false
copyOutputsFromZlib(flush)
} else {
// Inputs not used up.
copyOutputsFromZlib(Z_NO_FLUSH)
}
override fun readByte(): Byte {
if (!prepare()) {
throw EOFException("One more byte required")
}
return buffer[bufferIndex++]
return true
}
private fun copyOutputsFromZlib(flush: Int): Boolean {
z.avail_out = ZLIB_BUFFER_SIZE.toUInt()
z.next_out = buffer.reinterpret()
// We still have input, no need to update.
debug { "Set z.avail_out=${z.avail_out}, z.next_out=buffer.reinterpret()" }
debug { "Calling zlib, flush = $flush" }
val p = zlibProcess(z.ptr, flush)
when (p) {
Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
Z_DATA_ERROR -> error("Zlib failed to process data. (Z_DATA_ERROR)")
Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
else -> debug { "zlib: $p" }
}
bufferReadableSize = (ZLIB_BUFFER_SIZE.toUInt() - z.avail_out).toInt()
override fun tryPeek(): Int {
if (!prepare()) {
return -1
}
return buffer[bufferIndex].toIntUnsigned()
}
private fun prepare(): Boolean {
if (closed) {
return false
}
debug { "prepare: bufferIndex = $bufferIndex, bufferReadableSize = $bufferReadableSize" }
if (bufferIndex < bufferReadableSize) {
debug { "prepare returned, because " }
return true // has buf unused
}
bufferIndex = 0
debug { "prepare: previous value: z.avail_in=${z.avail_in}, z.avail_out=${z.avail_out}" }
if (z.avail_in == 0u) {
if (z.avail_out == 0u) {
// Last time we used all the output, there is either something cached in Zlib, or no further source.
debug { "both used" }
// bot input and output are used
val flush = updateAvailIn() ?: return false
copyOutputsFromZlib(flush)
} else {
// We did not use all the inputs, meaning least time we used all avail_in.
debug { "both used2" }
val flush = updateAvailIn() ?: return false
copyOutputsFromZlib(flush)
}
} else {
// Inputs not used up.
copyOutputsFromZlib(Z_NO_FLUSH)
}
debug { "Zlib produced bufferReadableSize=$bufferReadableSize bytes" }
debug { "Partial output: ${buffer.readBytes(bufferReadableSize).toUHexString()}" }
debug { "Now z.avail_in=${z.avail_in}, z.avail_out=${z.avail_out}" }
if (p == Z_FINISH) {
debug { "Zlib returned Z_FINISH. Ignoring result check." }
return true
}
private fun copyOutputsFromZlib(flush: Int): Boolean {
z.avail_out = BUFFER_TOTAL_SIZE.toUInt()
z.next_out = buffer.reinterpret()
if (p == Z_STREAM_END) {
debug { "Zlib returned Z_STREAM_END. Ignoring result check." }
return true
}
// We still have input, no need to update.
debug { "Set z.avail_out=${z.avail_out}, z.next_out=buffer.reinterpret()" }
debug { "Calling zlib, flush = $flush" }
val p = zlibProcess(z.ptr, flush)
when (p) {
Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
Z_DATA_ERROR -> error("Zlib failed to process data. (Z_DATA_ERROR)")
Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
else -> debug { "zlib: $p" }
}
bufferReadableSize = (BUFFER_TOTAL_SIZE.toUInt() - z.avail_out).toInt()
debug { "Zlib produced bufferReadableSize=$bufferReadableSize bytes" }
debug { "Partial output: ${buffer.readBytes(bufferReadableSize).toUHexString()}" }
debug { "Now z.avail_in=${z.avail_in}, z.avail_out=${z.avail_out}" }
if (p == Z_FINISH) {
debug { "Zlib returned Z_FINISH. Ignoring result check." }
return true
}
if (p == Z_STREAM_END) {
debug { "Zlib returned Z_STREAM_END. Ignoring result check." }
return true
}
if (bufferReadableSize == 0 && (z.avail_in == 0u && source.endOfInput)) {
if (zlibHasPending?.invoke(z.ptr) == true) {
// has pending. So the data must be incomplete.
error("Failed to process data, possibly bad data inputted.")
if (bufferReadableSize == 0 && (z.avail_in == 0u && source.endOfInput)) {
if (zlibHasPending?.invoke(z.ptr) == true) {
// has pending. So the data must be incomplete.
error("Failed to process data, possibly bad data inputted.")
// if (z.avail_in == 0u && source.endOfInput) {
// // no any input.
// } else {
// // there's some input, so we can still read.
// }
} else {
// no pending, but we should expect Z_FINISH in this case.
error("Zlib read 0 byte, but it should not happen.")
}
// can't read
} else {
// no pending, but we should expect Z_FINISH in this case.
error("Zlib read 0 byte, but it should not happen.")
}
return true
}
private fun updateAvailIn(): Int? {
val read = source.readAvailable(inputBuffer, 0, BUFFER_TOTAL_SIZE)
if (read == 0L) {
debug { "updateAvailIn: endOfInput, closing" }
close() // automatically close
return null // no more source available
}
z.avail_in = read.toUInt()
val flush = zlibFlushMode(read < BUFFER_TOTAL_SIZE || source.endOfInput)
println(inputBuffer.readBytes(read.toInt()).toUHexString())
z.next_in = inputBuffer.reinterpret()
debug { "Updated availIn: z.avail_in=${z.avail_in}, z.next_in = inputBuffer.reinterpret()" }
return flush
// can't read
}
return true
}
// https://refspecs.linuxbase.org/LSB_3.0.0/LSB-Core-generic/LSB-Core-generic/zlib-inflate-1.html
private fun updateAvailIn(): Int? {
val read = source.readAvailable(inputBuffer, 0, ZLIB_BUFFER_SIZE)
if (read == 0L) {
debug { "updateAvailIn: endOfInput, closing" }
close() // automatically close
return null // no more source available
}
z.avail_in = read.toUInt()
val flush = zlibFlushMode(read < ZLIB_BUFFER_SIZE || source.endOfInput)
debug { "inputBuffer content: " + inputBuffer.readBytes(read.toInt()).toUHexString() }
z.next_in = inputBuffer.reinterpret()
debug { "Updated availIn: z.avail_in=${z.avail_in}, z.next_in = inputBuffer.reinterpret()" }
return flush
}
// val input = source
// input.usePinned { pin ->
// buildPacket {
// memScoped {
// val outBuffer = ByteArray(ZLIB_BUFFER_SIZE)
// check(outBuffer.isNotEmpty())
// outBuffer.usePinned { out ->
//
// val z = alloc<z_stream>()
// z.avail_out = outBuffer.size.toUInt()
// val outBufferAddr = out.addressOf(0)
// z.next_out = outBufferAddr.reinterpret()
//
// checkZlibResp(zlibInit(z.ptr), "zlibInit")
//
// // On success, inflate() shall return
// // Z_OK
// // if decompression progress has been made, or
// //
// // Z_STREAM_END
// // if all of the input data has been decompressed and
// // there was sufficient space in the output buffer to store the uncompressed result.
// //
// // On error, inflate() shall return a value to indicate the error.
// // Z_BUF_ERROR
// // No progress is possible; either avail_in or avail_out was zero.
// //
// // Z_MEM_ERROR
// // Insufficient memory.
// //
// // Z_STREAM_ERROR
// // The state (as represented in stream) is inconsistent, or stream was NULL.
// //
// // Z_NEED_DICT
// // A preset dictionary is required. The adler field shall be set to the Adler-32 checksum of the dictionary chosen by the compressor.
//
// var flush: Boolean = false;
// do {
// input.readAvailable()
// z.avail_in = length.toUInt()
// z.next_in = pin.addressOf(offset).reinterpret()
//
// } while ()
//
// while (true) {
// val r = zlibProcess(z.ptr)
// when (r) {
// Z_OK, Z_STREAM_END -> {
// val wroteSize = outBuffer.size - z.avail_out.toInt()
// debug(wroteSize)
//
// this.writeFully(outBuffer, 0, wroteSize)
//
// if (z.avail_out == 0u) {
// if (z.avail_in == 0u) {
//
// } else {
// z.avail_out = outBuffer.size.toUInt()
// z.next_out = outBufferAddr.reinterpret()
// continue
// }
// } else {
//// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
//// return build()
//
// }
// if (r == Z_STREAM_END) {
//// check(r == Z_STREAM_END) { "Zlib result must be Z_STREAM_END" }
// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
// return this.build()
// }
//
// }
// Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
// Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
// Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
// Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
// else -> error("Internal error: unexpected result from Zlib: $r")
// }
//
//// if (r == Z_STREAM_END && z.) {
//// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
//// return build()
//// }
// }
//
//// while (true) {
//// val r = zlibProcess(z.ptr)
//// when (r) {
//// Z_OK, Z_STREAM_END -> {
//// val wroteSize = outBuffer.size - z.avail_out.toInt()
//// debug(wroteSize)
////
//// writeFully(outBuffer, 0, wroteSize)
////
//// if (z.avail_out == 0u) {
//// if (z.avail_in == 0u) {
////
//// } else {
//// z.avail_out = outBuffer.size.toUInt()
//// z.next_out = outBufferAddr.reinterpret()
//// continue
//// }
//// } else {
////// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
////// return build()
////
//// }
//// if (r == Z_STREAM_END) {
////// check(r == Z_STREAM_END) { "Zlib result must be Z_STREAM_END" }
//// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
//// return build()
//// }
////
//// }
//// Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
//// Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
//// Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
//// Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
//// else -> error("Internal error: unexpected result from Zlib: $r")
//// }
////
////// if (r == Z_STREAM_END && z.) {
////// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
////// return build()
////// }
//// }
//
// }
// }
// }
// }
// error("Unreachable") // no contract was declared for memScoped and usePinned
}
private companion object {
private fun getZlibError(it: Int): String {
return when (it) {
Z_DATA_ERROR -> "Z_DATA_ERROR"
Z_STREAM_ERROR -> "Z_STREAM_ERROR"
else -> "Unknown error $it"
}
}
private fun getZlibError(it: Int): String {
return when (it) {
Z_DATA_ERROR -> "Z_DATA_ERROR"
Z_STREAM_ERROR -> "Z_STREAM_ERROR"
else -> "Unknown error $it"
private const val debugging = false
private inline fun debug(string: () -> String) {
if (debugging) println(string())
}
private inline fun debug() {
if (debugging) println()
}
}
}

File diff suppressed because one or more lines are too long

View File

@ -24,10 +24,7 @@ import net.mamoe.mirai.internal.message.protocol.serialization.MessageSerializer
import net.mamoe.mirai.internal.message.runWithBugReport
import net.mamoe.mirai.internal.network.protocol.data.proto.ImMsgBody
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.utils.deflate
import net.mamoe.mirai.utils.hexToBytes
import net.mamoe.mirai.utils.inflate
import net.mamoe.mirai.utils.toUHexString
import net.mamoe.mirai.utils.*
/**
* Handles:
@ -143,7 +140,7 @@ internal class RichMessageProtocol : MessageProtocol() {
{ "resId=" + lightApp.msgResid + "data=" + lightApp.data.toUHexString() }) {
when (lightApp.data[0].toInt()) {
0 -> lightApp.data.decodeToString(startIndex = 1)
1 -> lightApp.data.inflate(1).decodeToString()
1 -> lightApp.data.toReadPacket(offset = 1).inflateInput().readText()
else -> error("unknown compression flag=${lightApp.data[0]}")
}
}
@ -162,7 +159,7 @@ internal class RichMessageProtocol : MessageProtocol() {
val content = runWithBugReport("解析 richMsg", { richMsg.template1.toUHexString() }) {
when (richMsg.template1[0].toInt()) {
0 -> richMsg.template1.decodeToString(startIndex = 1)
1 -> richMsg.template1.inflate(1).decodeToString()
1 -> richMsg.template1.toReadPacket(offset = 1).inflateInput().readText()
else -> error("unknown compression flag=${richMsg.template1[0]}")
}
}

View File

@ -222,14 +222,12 @@ internal class PacketCodecImpl : PacketCodec {
}
1 -> {
input.discardExact(4)
input.useBytes { data, length ->
data.inflate(0, length).let {
val size = it.toInt()
if (size == it.size || size == it.size + 4) {
it.toReadPacket(offset = 4)
} else {
it.toReadPacket()
}
input.inflateAllAvailable().let { bytes ->
val size = bytes.toInt()
if (size == bytes.size || size == bytes.size + 4) {
bytes.toReadPacket(offset = 4)
} else {
bytes.toReadPacket()
}
}
}

View File

@ -558,7 +558,7 @@ internal class ImMsgBody : ProtoBuf {
return when (byteArray[0].toInt()) {
0 -> byteArrayOf(0) + byteArray.decodeToString(startIndex = 1).toByteArray()
1 -> byteArrayOf(0) + byteArray.inflate(offset = 1).decodeToString().toByteArray()
1 -> byteArrayOf(0) + byteArray.inflate(offset = 1)
else -> error("unknown compression flag=${byteArray[0]}")
}
}