1
0
mirror of https://github.com/google/leveldb.git synced 2025-04-25 14:00:27 +08:00
This commit is contained in:
tanghengjian 2025-02-06 19:43:21 +08:00 committed by GitHub
commit 2a0a4b5520
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 192 additions and 4 deletions

View File

@ -42,6 +42,7 @@ check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
check_library_exists(zstd zstd_compress "" HAVE_ZSTD)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
# Probe for a function that liblz4 actually exports; HAVE_LZ4 is only set when
# the named symbol can be linked, and it gates both the LZ4 code paths and the
# link step.
check_library_exists(lz4 LZ4_compress_default "" HAVE_LZ4)
include(CheckCXXSymbolExists)
# Using check_cxx_symbol_exists() instead of check_c_symbol_exists() because
@ -281,6 +282,10 @@ if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC)
if(HAVE_LZ4)
target_link_libraries(leveldb lz4)
endif(HAVE_LZ4)
# Needed by port_stdcxx.h
find_package(Threads REQUIRED)
target_link_libraries(leveldb Threads::Threads)

View File

@ -61,6 +61,8 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"lz4comp,"
"lz4uncomp,"
"zstdcomp,"
"zstduncomp,";
@ -635,6 +637,10 @@ class Benchmark {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress;
} else if (name == Slice("lz4comp")) {
method = &Benchmark::Lz4Compress;
} else if (name == Slice("lz4uncomp")) {
method = &Benchmark::Lz4Uncompress;
} else if (name == Slice("zstdcomp")) {
method = &Benchmark::ZstdCompress;
} else if (name == Slice("zstduncomp")) {
@ -799,6 +805,55 @@ class Benchmark {
&port::Zstd_Uncompress);
}
// Benchmark: LZ4-compresses one generated block of Options().block_size bytes
// over and over until 1 GiB of input has been consumed (or a call fails).
// On success reports the output size as a percentage of the input size and
// the number of input bytes processed; on failure reports an error message.
void Lz4Compress(ThreadState* thread) {
  const int64_t kTargetBytes = 1024 * 1048576;  // Compress 1G
  RandomGenerator gen;
  Slice input = gen.Generate(Options().block_size);
  std::string scratch;
  int64_t consumed = 0;
  int64_t emitted = 0;
  bool succeeded = true;
  while (succeeded && consumed < kTargetBytes) {
    succeeded = port::Lz4_Compress(input.data(), input.size(), &scratch);
    emitted += scratch.size();
    consumed += input.size();
    thread->stats.FinishedSingleOp();
  }

  if (!succeeded) {
    thread->stats.AddMessage("(Lz4Compress failure)");
    return;
  }

  // Compressed size expressed as a percentage of the uncompressed size.
  char ratio[100];
  std::snprintf(ratio, sizeof(ratio), "(output: %.1f)",
                (emitted * 100.0) / consumed);
  thread->stats.AddMessage(ratio);
  thread->stats.AddBytes(consumed);
}
// Benchmark: compresses one generated block of Options().block_size bytes
// once, then LZ4-decompresses it repeatedly until 1 GiB of output has been
// produced (or a call fails). Reports the bytes processed on success and an
// error message on failure.
void Lz4Uncompress(ThreadState* thread) {
  RandomGenerator gen;
  Slice input = gen.Generate(Options().block_size);
  std::string compressed;
  bool ok = port::Lz4_Compress(input.data(), input.size(), &compressed);
  int64_t bytes = 0;
  // Destination buffer sized to the original input; owned by a std::string so
  // it is released on every exit path without a manual delete[].
  std::string uncompressed(input.size(), '\0');
  while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
    ok = port::Lz4_UnCompress(compressed.data(), compressed.size(),
                              &uncompressed[0], uncompressed.size());
    bytes += input.size();
    thread->stats.FinishedSingleOp();
  }

  if (!ok) {
    thread->stats.AddMessage("(Lz4Uncompress failure)");
  } else {
    thread->stats.AddBytes(bytes);
  }
}
void Open() {
assert(db_ == nullptr);
Options options;

View File

@ -28,6 +28,7 @@ enum CompressionType {
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZstdCompression = 0x2,
kLz4Compression = 0x3,
};
// Options to control the behavior of a database (passed to DB::Open)

View File

@ -82,7 +82,8 @@ class LEVELDB_EXPORT TableBuilder {
private:
bool ok() const { return status().ok(); }
void WriteBlock(BlockBuilder* block, BlockHandle* handle);
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
size_t rawsize = 0);
struct Rep;
Rep* rep_;

View File

@ -33,6 +33,10 @@
#include <zstd.h>
#endif // HAVE_ZSTD
#if HAVE_LZ4
#include <lz4.h>
#endif // HAVE_LZ4
#include <cassert>
#include <condition_variable> // NOLINT
#include <cstddef>
@ -217,6 +221,61 @@ inline uint32_t AcceleratedCRC32C(uint32_t crc, const char* buf, size_t size) {
#endif // HAVE_CRC32C
}
// Compresses input[0, length) with LZ4 and stores the result in *output.
//
// Returns true on success, in which case *output holds exactly the compressed
// bytes. Returns false if the input is too large for LZ4, if compression
// fails, or if this build has no LZ4 support (HAVE_LZ4 unset); callers must
// not use *output in that case.
inline bool Lz4_Compress(const char* input, size_t length,
                         std::string* output) {
#if HAVE_LZ4
  // The LZ4 API works on int sizes. Reject oversized inputs up front rather
  // than letting the size_t -> int conversion wrap.
  if (length > static_cast<size_t>(LZ4_MAX_INPUT_SIZE)) return false;
  const int src_size = static_cast<int>(length);

  // LZ4_compressBound() gives the worst-case compressed size for src_size.
  const int max_dst_size = LZ4_compressBound(src_size);
  if (max_dst_size <= 0) return false;
  output->resize(static_cast<size_t>(max_dst_size));

  const int compressed_size =
      LZ4_compress_default(input, &(*output)[0], src_size, max_dst_size);
  if (compressed_size <= 0) return false;

  // Trim the worst-case buffer down to the bytes actually produced.
  output->resize(static_cast<size_t>(compressed_size));
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif  // HAVE_LZ4
}
// Decompresses the LZ4 block input[0, length) into output, which must have
// room for at least rawsize bytes.
//
// Returns true if LZ4 decoded the block and produced at least one byte.
// Returns false on malformed input, if either size is outside what LZ4 can
// handle, or if this build has no LZ4 support (HAVE_LZ4 unset).
//
// NOTE(review): success does not require the decoded size to equal rawsize;
// callers that treat all rawsize bytes as valid may want that stricter check.
inline bool Lz4_UnCompress(const char* input, size_t length, char* output,
                           size_t rawsize) {
#if HAVE_LZ4
  // LZ4_decompress_safe() takes int sizes. Sizes beyond what LZ4 could have
  // produced from a legal input cannot describe a valid block, so reject them
  // instead of letting the size_t -> int conversion wrap.
  if (length > static_cast<size_t>(LZ4_COMPRESSBOUND(LZ4_MAX_INPUT_SIZE)) ||
      rawsize > static_cast<size_t>(LZ4_MAX_INPUT_SIZE)) {
    return false;
  }

  const int decompressed_size = LZ4_decompress_safe(
      input, output, static_cast<int>(length), static_cast<int>(rawsize));

  // Negative means malformed input or an undersized destination; a zero-byte
  // result is also reported as failure.
  return decompressed_size > 0;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)output;
  (void)rawsize;
  return false;
#endif  // HAVE_LZ4
}
} // namespace port
} // namespace leveldb

View File

@ -75,6 +75,7 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
// Read the block contents as well as the type/crc footer.
// See table_builder.cc for the code that built this structure.
size_t n = static_cast<size_t>(handle.size());
size_t rawsize = static_cast<size_t>(handle.rawsize());
char* buf = new char[n + kBlockTrailerSize];
Slice contents;
Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf);
@ -153,6 +154,19 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
result->cachable = true;
break;
}
// LZ4 block: decompress into a freshly allocated buffer of rawsize bytes.
// The compressed payload written by port::Lz4_Compress() has no length
// prefix, so the output size comes from the block handle instead.
// NOTE(review): rawsize is taken from handle.rawsize() and used unvalidated
// for the allocation below -- confirm the handle's rawsize is always
// initialized and persisted before this path can be reached.
case kLz4Compression: {
char* ubuf = new char[rawsize];
if (!port::Lz4_UnCompress(data, n, ubuf, rawsize)) {
// Decompression failed: release both the read buffer and the output buffer.
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted compressed block contents");
}
// The compressed bytes are no longer needed; the result takes ubuf.
delete[] buf;
result->data = Slice(ubuf, rawsize);
result->heap_allocated = true;
result->cachable = true;
break;
}
default:
delete[] buf;
return Status::Corruption("bad block type");

View File

@ -35,12 +35,17 @@ class BlockHandle {
uint64_t size() const { return size_; }
void set_size(uint64_t size) { size_ = size; }
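// The size of the block's contents before compression, as supplied by the
// table builder (0 when the writer did not provide one).
// NOTE(review): confirm this field is initialized by the constructor and
// round-tripped by EncodeTo()/DecodeFrom().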
uint64_t rawsize() const { return rawsize_; }
void set_rawsize(uint64_t size) { rawsize_ = size; }
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* input);
private:
uint64_t offset_;
uint64_t size_;
uint64_t rawsize_;
};
// Footer encapsulates the fixed information stored at the tail

View File

@ -168,7 +168,17 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
}
break;
}
case kLz4Compression: {
  // Keep the LZ4 output only if it saves at least 12.5% over the raw block;
  // otherwise (or if LZ4 is unavailable / fails) store the block as-is.
  std::string* compressed = &r->compressed_output;
  if (port::Lz4_Compress(raw.data(), raw.size(), compressed) &&
      compressed->size() < raw.size() - (raw.size() / 8u)) {
    block_contents = *compressed;
  } else {
    // store uncompressed form
    block_contents = raw;
    type = kNoCompression;
  }
  break;
}
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
@ -184,16 +194,18 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
break;
}
}
WriteRawBlock(block_contents, type, handle);
WriteRawBlock(block_contents, type, handle, raw.size());
r->compressed_output.clear();
block->Reset();
}
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
CompressionType type, BlockHandle* handle,
size_t rawsize) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
handle->set_rawsize(rawsize);
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];

View File

@ -839,4 +839,40 @@ TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 2 * min_z, 2 * max_z));
}
// Builds a table with LZ4 compression containing two small and two large
// (roughly 4:1 compressible) values, then checks that ApproximateOffsetOf()
// reports offsets consistent with the compressed, not raw, block sizes.
TEST(TableTest, ApproximateOffsetOfLZ4Compressed) {
  // Probe LZ4 itself rather than Snappy: port::Lz4_Compress() returns false
  // when the library is not compiled in, and the table builder then falls
  // back to uncompressed blocks, which would invalidate the bounds below.
  {
    const std::string probe_in(64, 'a');
    std::string probe_out;
    if (!port::Lz4_Compress(probe_in.data(), probe_in.size(), &probe_out)) {
      std::fprintf(stderr, "skipping LZ4 compression tests\n");
      return;
    }
  }

  Random rnd(301);
  TableConstructor c(BytewiseComparator());
  std::string tmp;
  c.Add("k01", "hello");
  c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
  c.Add("k03", "hello3");
  c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
  std::vector<std::string> keys;
  KVMap kvmap;
  Options options;
  options.block_size = 1024;
  options.compression = kLz4Compression;
  c.Finish(options, &keys, &kvmap);

  // Expected upper and lower bounds of space used by compressible strings.
  static const int kSlop = 1000;  // Compressor effectiveness varies.
  const int expected = 2500;      // 10000 * compression ratio (0.25)
  const int min_z = expected - kSlop;
  const int max_z = expected + kSlop;

  ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, kSlop));
  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, kSlop));
  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, kSlop));
  // Have now emitted a large compressible string, so adjust expected offset.
  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), min_z, max_z));
  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), min_z, max_z));
  // Have now emitted two large compressible strings, so adjust expected offset.
  ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 2 * min_z, 2 * max_z));
}
} // namespace leveldb