Use Zlib for compression/decompression on native

This commit is contained in:
Him188 2022-06-04 15:01:54 +01:00
parent 89fa379f8f
commit 4f61417af6
3 changed files with 498 additions and 81 deletions

View File

@ -14,7 +14,7 @@ import kotlin.annotation.AnnotationTarget.*
@RequiresOptIn("This can only be used in tests.", level = ERROR)
@Target(CLASS, FUNCTION, PROPERTY, CLASS, CONSTRUCTOR, FUNCTION, PROPERTY_GETTER)
@Target(CLASS, FUNCTION, PROPERTY, CLASS, CONSTRUCTOR, FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
public annotation class TestOnly
/**

View File

@ -11,6 +11,8 @@
package net.mamoe.mirai.utils
import io.ktor.utils.io.bits.*
import io.ktor.utils.io.core.*
import kotlinx.cinterop.*
import platform.zlib.*
@ -29,93 +31,440 @@ public actual fun ByteArray.sha1(offset: Int, length: Int): ByteArray = SHA1.cre
return digest().bytes
}
public actual fun ByteArray.gzip(offset: Int, length: Int): ByteArray {
val output = ByteArray(length * 5)
output.usePinned { out ->
usePinned { pin ->
memScoped {
val z = alloc<z_stream>()
z.avail_in = size.toUInt()
z.next_in = pin.addressOf(0).reinterpret()
z.avail_out = output.size.toUInt()
val initialOutAddress = out.addressOf(0)
z.next_out = initialOutAddress.reinterpret()
deflateInit2(z.ptr, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 or 16, 8, Z_DEFAULT_STRATEGY)
deflate(z.ptr, Z_FINISH) // TODO: 2022/5/28 buf
deflateEnd(z.ptr)
/**
* WARNING: DO NOT SET THIS BUFFER TOO SMALL, OR YOU WILL SEE COMPRESSION ERROR.
*/
@set:TestOnly
public var ZLIB_BUFFER_SIZE: Long = 8192
val resultSize = z.next_out.toLong() - initialOutAddress.toLong()
return output.copyOf(resultSize.toInt())
}
}
}
public actual fun ByteArray.gzip(offset: Int, length: Int): ByteArray {
return ZlibInput(
source = this.toReadPacket(offset, length),
zlibInit = { deflateInit2(it, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 or 16, 8, Z_DEFAULT_STRATEGY) },
zlibProcess = { z, flush -> deflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_FINISH else Z_NO_FLUSH },
zlibEnd = { deflateEnd(it) },
).readBytes()
}
// TODO: 2022/5/28 optimize length
public actual fun ByteArray.ungzip(offset: Int, length: Int): ByteArray {
val output = ByteArray(length)
output.usePinned { out ->
usePinned { pin ->
memScoped {
val z = alloc<z_stream>()
z.avail_in = size.toUInt()
z.next_in = pin.addressOf(0).reinterpret()
z.avail_out = output.size.toUInt()
val initialOutAddress = out.addressOf(0)
z.next_out = initialOutAddress.reinterpret()
inflateInit2(z.ptr, 15 or 16)
inflate(z.ptr, Z_FINISH)
inflateEnd(z.ptr)
val resultSize = z.next_out.toLong() - initialOutAddress.toLong()
return output.copyOf(resultSize.toInt())
}
}
}
return ZlibInput(
source = this.toReadPacket(offset, length),
zlibInit = { inflateInit2(it, 15 or 16) },
zlibProcess = { z, flush -> inflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_SYNC_FLUSH else Z_NO_FLUSH },
zlibEnd = { inflateEnd(it) },
).readBytes()
}
public actual fun ByteArray.deflate(offset: Int, length: Int): ByteArray {
val output = ByteArray(length * 2)
output.usePinned { out ->
usePinned { pin ->
return ZlibInput(
source = this.toReadPacket(offset, length),
zlibInit = { deflateInit(it, Z_DEFAULT_COMPRESSION) },
zlibProcess = { z, flush -> deflate(z, flush) },
zlibHasPending = { z ->
memScoped {
val z = alloc<z_stream>()
z.avail_in = size.toUInt()
z.next_in = pin.addressOf(0).reinterpret()
z.avail_out = output.size.toUInt()
val initialOutAddress = out.addressOf(0)
z.next_out = initialOutAddress.reinterpret()
deflateInit(z.ptr, Z_DEFAULT_COMPRESSION)
deflate(z.ptr, Z_FINISH)
deflateEnd(z.ptr)
val pendingBytes = cValue<UIntVar>().ptr
val pendingBits = cValue<IntVar>().ptr
val resultSize = z.next_out.toLong() - initialOutAddress.toLong()
return output.copyOf(resultSize.toInt())
debug { "deflatePending checking" }
if (deflatePending(z, pendingBytes, pendingBits) != Z_OK) {
debug { "deflatePending: failed" }
false
} else {
pendingBytes.pointed.value > 0u || pendingBits.pointed.value > 0
}
}
}
}
},
zlibFlushMode = { if (it) Z_FINISH else Z_NO_FLUSH },
zlibEnd = { deflateEnd(it) },
).readBytes()
}
public actual fun ByteArray.inflate(offset: Int, length: Int): ByteArray {
val output = ByteArray(length)
output.usePinned { out ->
usePinned { pin ->
memScoped {
val z = alloc<z_stream>()
z.avail_in = size.toUInt()
z.next_in = pin.addressOf(0).reinterpret()
z.avail_out = output.size.toUInt()
val initialOutAddress = out.addressOf(0)
z.next_out = initialOutAddress.reinterpret()
inflateInit(z.ptr)
inflate(z.ptr, Z_FINISH)
inflateEnd(z.ptr)
// val input = this
// val output = ByteArray((10 * length) + 12)
// this.usePinned { pin ->
// output.usePinned { outPin ->
// memScoped {
// val values = ulongArrayOf(output.size.toULong())
// values.usePinned { sizes ->
// uncompress(
// outPin.addressOf(0).reinterpret(),
// sizes.addressOf(0).reinterpret(),
// pin.addressOf(offset).reinterpret(),
// length.convert(),
// )
// }
//
// return output.copyOf(values[0].toInt())
//
// }
// }
// return output
// }
val resultSize = z.next_out.toLong() - initialOutAddress.toLong()
return output.copyOf(resultSize.toInt())
return ZlibInput(
source = this.toReadPacket(offset, length),
zlibInit = { inflateInit2(it, 15) },
zlibProcess = { z, flush -> inflate(z, flush) },
zlibHasPending = null,
zlibFlushMode = { if (it) Z_SYNC_FLUSH else Z_NO_FLUSH },
zlibEnd = { inflateEnd(it) },
).readBytes()
}
private const val debugging = false
private inline fun debug(string: () -> String) {
if (debugging) println(string())
}
private inline fun debug() {
if (debugging) println()
}
private fun ZlibInput(
source: Input,
zlibInit: (z_streamp) -> Int,
zlibProcess: (z_streamp, flush: Int) -> Int,
zlibHasPending: ((z_streamp) -> Boolean)?, // null lambda means operation not defined
zlibFlushMode: (shouldFlushAll: Boolean) -> Int,
zlibEnd: (z_streamp) -> Int,
): Input {
val z = nativeHeap.alloc<z_stream>()
val r = zlibInit(z.ptr)
if (r != 0) {
nativeHeap.free(z)
error("Failed to init zlib: $r (${getZlibError(r)})")
}
return object : Input {
@Deprecated(
"Not supported anymore. All operations are big endian by default. Use readXXXLittleEndian or readXXX then X.reverseByteOrder() instead.",
level = DeprecationLevel.ERROR
)
override var byteOrder: ByteOrder
get() = throw UnsupportedOperationException()
set(_) {
throw UnsupportedOperationException()
}
private inline val BUFFER_TOTAL_SIZE get() = ZLIB_BUFFER_SIZE
private var bufferReadableSize = 0
private val inputBuffer = nativeHeap.allocArray<ByteVar>(BUFFER_TOTAL_SIZE)
private val buffer = nativeHeap.allocArray<ByteVar>(BUFFER_TOTAL_SIZE)
private var bufferIndex = 0L
private var closed: Boolean = false
override val endOfInput: Boolean
get() = closed || !prepare()
override fun close() {
debug { "closing" }
if (closed) return
this.closed = true
zlibEnd(z.ptr)
nativeHeap.free(z)
nativeHeap.free(buffer)
nativeHeap.free(inputBuffer)
}
override fun discard(n: Long): Long {
if (closed) {
return 0
}
val old = bufferIndex
if (old >= bufferReadableSize) {
if (prepare()) return discard(n)
return 0
}
bufferIndex = (bufferIndex + n).coerceAtMost(ZLIB_BUFFER_SIZE)
return bufferIndex - old
}
override fun peekTo(destination: Memory, destinationOffset: Long, offset: Long, min: Long, max: Long): Long {
debug()
debug { "peekTo" }
require(min <= max) { "min > max" }
if (!prepare()) return 0
val readableLength = (bufferReadableSize - bufferIndex - offset).coerceAtLeast(0)
if (offset > readableLength) {
if (min == 0L) {
throw EOFException("offset($offset) > readableLength($readableLength)")
}
}
if (min > readableLength) return 0
val len = readableLength.coerceAtMost(max).coerceAtMost(destination.size)
debug { "peekTo: read $len" }
buffer.copyTo(destination, bufferIndex + offset, len, destinationOffset)
return len
}
override fun readByte(): Byte {
if (!prepare()) {
throw EOFException("One more byte required")
}
return buffer[bufferIndex++]
}
override fun tryPeek(): Int {
if (!prepare()) {
return -1
}
return buffer[bufferIndex].toIntUnsigned()
}
private fun prepare(): Boolean {
if (closed) {
return false
}
debug { "prepare: bufferIndex = $bufferIndex, bufferReadableSize = $bufferReadableSize" }
if (bufferIndex < bufferReadableSize) {
debug { "prepare returned, because " }
return true // has buf unused
}
bufferIndex = 0
debug { "prepare: previous value: z.avail_in=${z.avail_in}, z.avail_out=${z.avail_out}" }
if (z.avail_in == 0u) {
if (z.avail_out == 0u) {
// Last time we used all the output, there is either something cached in Zlib, or no further source.
debug { "both used" }
// bot input and output are used
val flush = updateAvailIn() ?: return false
copyOutputsFromZlib(flush)
} else {
// We did not use all the inputs, meaning least time we used all avail_in.
debug { "both used2" }
val flush = updateAvailIn() ?: return false
copyOutputsFromZlib(flush)
}
} else {
// Inputs not used up.
copyOutputsFromZlib(Z_NO_FLUSH)
}
return true
}
private fun copyOutputsFromZlib(flush: Int): Boolean {
z.avail_out = BUFFER_TOTAL_SIZE.toUInt()
z.next_out = buffer.reinterpret()
// We still have input, no need to update.
debug { "Set z.avail_out=${z.avail_out}, z.next_out=buffer.reinterpret()" }
debug { "Calling zlib, flush = $flush" }
val p = zlibProcess(z.ptr, flush)
when (p) {
Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
Z_DATA_ERROR -> error("Zlib failed to process data. (Z_DATA_ERROR)")
Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
else -> debug { "zlib: $p" }
}
bufferReadableSize = (BUFFER_TOTAL_SIZE.toUInt() - z.avail_out).toInt()
debug { "Zlib produced bufferReadableSize=$bufferReadableSize bytes" }
debug { "Partial output: ${buffer.readBytes(bufferReadableSize).toUHexString()}" }
debug { "Now z.avail_in=${z.avail_in}, z.avail_out=${z.avail_out}" }
if (p == Z_FINISH) {
debug { "Zlib returned Z_FINISH. Ignoring result check." }
return true
}
if (p == Z_STREAM_END) {
debug { "Zlib returned Z_STREAM_END. Ignoring result check." }
return true
}
if (bufferReadableSize == 0 && (z.avail_in == 0u && source.endOfInput)) {
if (zlibHasPending?.invoke(z.ptr) == true) {
// has pending. So the data must be incomplete.
error("Failed to process data, possibly bad data inputted.")
// if (z.avail_in == 0u && source.endOfInput) {
// // no any input.
// } else {
// // there's some input, so we can still read.
// }
} else {
// no pending, but we should expect Z_FINISH in this case.
error("Zlib read 0 byte, but it should not happen.")
}
// can't read
}
return true
}
private fun updateAvailIn(): Int? {
val read = source.readAvailable(inputBuffer, 0, BUFFER_TOTAL_SIZE)
if (read == 0L) {
debug { "updateAvailIn: endOfInput, closing" }
close() // automatically close
return null // no more source available
}
z.avail_in = read.toUInt()
val flush = zlibFlushMode(read < BUFFER_TOTAL_SIZE || source.endOfInput)
println(inputBuffer.readBytes(read.toInt()).toUHexString())
z.next_in = inputBuffer.reinterpret()
debug { "Updated availIn: z.avail_in=${z.avail_in}, z.next_in = inputBuffer.reinterpret()" }
return flush
}
}
// https://refspecs.linuxbase.org/LSB_3.0.0/LSB-Core-generic/LSB-Core-generic/zlib-inflate-1.html
// val input = source
// input.usePinned { pin ->
// buildPacket {
// memScoped {
// val outBuffer = ByteArray(ZLIB_BUFFER_SIZE)
// check(outBuffer.isNotEmpty())
// outBuffer.usePinned { out ->
//
// val z = alloc<z_stream>()
// z.avail_out = outBuffer.size.toUInt()
// val outBufferAddr = out.addressOf(0)
// z.next_out = outBufferAddr.reinterpret()
//
// checkZlibResp(zlibInit(z.ptr), "zlibInit")
//
// // On success, inflate() shall return
// // Z_OK
// // if decompression progress has been made, or
// //
// // Z_STREAM_END
// // if all of the input data has been decompressed and
// // there was sufficient space in the output buffer to store the uncompressed result.
// //
// // On error, inflate() shall return a value to indicate the error.
// // Z_BUF_ERROR
// // No progress is possible; either avail_in or avail_out was zero.
// //
// // Z_MEM_ERROR
// // Insufficient memory.
// //
// // Z_STREAM_ERROR
// // The state (as represented in stream) is inconsistent, or stream was NULL.
// //
// // Z_NEED_DICT
// // A preset dictionary is required. The adler field shall be set to the Adler-32 checksum of the dictionary chosen by the compressor.
//
// var flush: Boolean = false;
// do {
// input.readAvailable()
// z.avail_in = length.toUInt()
// z.next_in = pin.addressOf(offset).reinterpret()
//
// } while ()
//
// while (true) {
// val r = zlibProcess(z.ptr)
// when (r) {
// Z_OK, Z_STREAM_END -> {
// val wroteSize = outBuffer.size - z.avail_out.toInt()
// debug(wroteSize)
//
// this.writeFully(outBuffer, 0, wroteSize)
//
// if (z.avail_out == 0u) {
// if (z.avail_in == 0u) {
//
// } else {
// z.avail_out = outBuffer.size.toUInt()
// z.next_out = outBufferAddr.reinterpret()
// continue
// }
// } else {
//// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
//// return build()
//
// }
// if (r == Z_STREAM_END) {
//// check(r == Z_STREAM_END) { "Zlib result must be Z_STREAM_END" }
// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
// return this.build()
// }
//
// }
// Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
// Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
// Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
// Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
// else -> error("Internal error: unexpected result from Zlib: $r")
// }
//
//// if (r == Z_STREAM_END && z.) {
//// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
//// return build()
//// }
// }
//
//// while (true) {
//// val r = zlibProcess(z.ptr)
//// when (r) {
//// Z_OK, Z_STREAM_END -> {
//// val wroteSize = outBuffer.size - z.avail_out.toInt()
//// debug(wroteSize)
////
//// writeFully(outBuffer, 0, wroteSize)
////
//// if (z.avail_out == 0u) {
//// if (z.avail_in == 0u) {
////
//// } else {
//// z.avail_out = outBuffer.size.toUInt()
//// z.next_out = outBufferAddr.reinterpret()
//// continue
//// }
//// } else {
////// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
////// return build()
////
//// }
//// if (r == Z_STREAM_END) {
////// check(r == Z_STREAM_END) { "Zlib result must be Z_STREAM_END" }
//// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
//// return build()
//// }
////
//// }
//// Z_BUF_ERROR -> error("Zlib failed to process data. (Z_BUF_ERROR)")
//// Z_MEM_ERROR -> throw OutOfMemoryError("Insufficient native heap memory for Zlib. (Z_MEM_ERROR)")
//// Z_STREAM_ERROR -> error("Zlib failed to process data. (Z_STREAM_ERROR)")
//// Z_NEED_DICT -> error("Zlib failed to process data. (Z_NEED_DICT)")
//// else -> error("Internal error: unexpected result from Zlib: $r")
//// }
////
////// if (r == Z_STREAM_END && z.) {
////// checkZlibResp(zlibEnd(z.ptr), "zlibEnd")
////// return build()
////// }
//// }
//
// }
// }
// }
// }
// error("Unreachable") // no contract was declared for memScoped and usePinned
}
private fun getZlibError(it: Int): String {
return when (it) {
Z_DATA_ERROR -> "Z_DATA_ERROR"
Z_STREAM_ERROR -> "Z_STREAM_ERROR"
else -> "Unknown error $it"
}
}

File diff suppressed because one or more lines are too long