1
0
mirror of https://github.com/google/leveldb.git synced 2025-04-25 14:00:27 +08:00

Merge branch 'main' into master

This commit is contained in:
tanghengjian 2023-09-13 11:49:47 +08:00 committed by GitHub
commit 47760bd45f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 390 additions and 83 deletions

View File

@ -66,10 +66,11 @@ jobs:
- name: Install dependencies on Linux
if: ${{ runner.os == 'Linux' }}
# libgoogle-perftools-dev is temporarily removed from the package list
# because it is currently broken on GitHub's Ubuntu 22.04.
run: |
sudo apt-get update
sudo apt-get install libgoogle-perftools-dev libkyotocabinet-dev \
libsnappy-dev libsqlite3-dev
sudo apt-get install libkyotocabinet-dev libsnappy-dev libsqlite3-dev
- name: Generate build config
run: >-

View File

@ -40,6 +40,7 @@ check_include_file("unistd.h" HAVE_UNISTD_H)
include(CheckLibraryExists)
check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
check_library_exists(zstd zstd_compress "" HAVE_ZSTD)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
check_library_exists(lz4 LZ4_compress_default "" HAVE_LZ4)
@ -274,6 +275,9 @@ endif(HAVE_CRC32C)
if(HAVE_SNAPPY)
target_link_libraries(leveldb snappy)
endif(HAVE_SNAPPY)
if(HAVE_ZSTD)
target_link_libraries(leveldb zstd)
endif(HAVE_ZSTD)
if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC)

View File

@ -1,4 +1,9 @@
**LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.**
LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.
> **This repository is receiving very limited maintenance. We will only review the following types of changes.**
>
> * Fixes for critical bugs, such as data loss or memory corruption
> * Changes absolutely needed by internally supported leveldb clients. These typically fix breakage introduced by a language/standard library/OS update
[![ci](https://github.com/google/leveldb/actions/workflows/build.yml/badge.svg)](https://github.com/google/leveldb/actions/workflows/build.yml)
@ -13,12 +18,12 @@ Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com)
* Multiple changes can be made in one atomic batch.
* Users can create a transient snapshot to get a consistent view of data.
* Forward and backward iteration is supported over the data.
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/).
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported.
* External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions.
# Documentation
[LevelDB library documentation](https://github.com/google/leveldb/blob/master/doc/index.md) is online and bundled with the source code.
[LevelDB library documentation](https://github.com/google/leveldb/blob/main/doc/index.md) is online and bundled with the source code.
# Limitations
@ -72,6 +77,11 @@ Please see the CMake documentation and `CMakeLists.txt` for more advanced usage.
# Contributing to the leveldb Project
> **This repository is receiving very limited maintenance. We will only review the following types of changes.**
>
> * Bug fixes
> * Changes absolutely needed by internally supported leveldb clients. These typically fix breakage introduced by a language/standard library/OS update
The leveldb project welcomes contributions. leveldb's primary goal is to be
a reliable and fast key/value store. Changes that are in line with the
features/limitations outlined above, and meet the requirements below,
@ -113,7 +123,7 @@ Contributor License Agreement (CLA) at https://cla.developers.google.com/.
In order to keep the commit timeline linear
[squash](https://git-scm.com/book/en/v2/Git-Tools-Rewriting-History#Squashing-Commits)
your changes down to a single commit and [rebase](https://git-scm.com/docs/git-rebase)
on google/leveldb/master. This keeps the commit timeline linear and more easily sync'ed
on google/leveldb/main. This keeps the commit timeline linear and more easily sync'ed
with the internal repository at Google. More information at GitHub's
[About Git rebase](https://help.github.com/articles/about-git-rebase/) page.

View File

@ -63,6 +63,8 @@ static const char* FLAGS_benchmarks =
"snappyuncomp,"
"lz4comp,"
"lz4uncomp,";
"zstdcomp,"
"zstduncomp,";
// Number of key/values to place in database
static int FLAGS_num = 1000000;
@ -120,9 +122,15 @@ static bool FLAGS_use_existing_db = false;
// If true, reuse existing log/MANIFEST files when re-opening a database.
static bool FLAGS_reuse_logs = false;
// If true, use compression.
static bool FLAGS_compression = true;
// Use the db with the following name.
static const char* FLAGS_db = nullptr;
// ZSTD compression level to try out
static int FLAGS_zstd_compression_level = 1;
namespace leveldb {
namespace {
@ -366,6 +374,57 @@ struct ThreadState {
ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
};
void Compress(
ThreadState* thread, std::string name,
std::function<bool(const char*, size_t, std::string*)> compress_func) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = compress_func(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp();
}
if (!ok) {
thread->stats.AddMessage("(" + name + " failure)");
} else {
char buf[100];
std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
}
void Uncompress(
ThreadState* thread, std::string name,
std::function<bool(const char*, size_t, std::string*)> compress_func,
std::function<bool(const char*, size_t, char*)> uncompress_func) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = compress_func(input.data(), input.size(), &compressed);
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = uncompress_func(compressed.data(), compressed.size(), uncompressed);
bytes += input.size();
thread->stats.FinishedSingleOp();
}
delete[] uncompressed;
if (!ok) {
thread->stats.AddMessage("(" + name + " failure)");
} else {
thread->stats.AddBytes(bytes);
}
}
} // namespace
class Benchmark {
@ -582,6 +641,10 @@ class Benchmark {
method = &Benchmark::Lz4Compress;
} else if (name == Slice("lz4uncomp")) {
method = &Benchmark::Lz4Uncompress;
} else if (name == Slice("zstdcomp")) {
method = &Benchmark::ZstdCompress;
} else if (name == Slice("zstduncomp")) {
method = &Benchmark::ZstdUncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
@ -716,50 +779,30 @@ class Benchmark {
}
void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp();
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
char buf[100];
std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
Compress(thread, "snappy", &port::Snappy_Compress);
}
void SnappyUncompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
bytes += input.size();
thread->stats.FinishedSingleOp();
}
delete[] uncompressed;
Uncompress(thread, "snappy", &port::Snappy_Compress,
&port::Snappy_Uncompress);
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
thread->stats.AddBytes(bytes);
}
// Benchmark method: measures zstd compression throughput by delegating to
// the shared Compress() helper, binding the compression level to the
// --zstd_compression_level command-line flag.
void ZstdCompress(ThreadState* thread) {
Compress(thread, "zstd",
[](const char* input, size_t length, std::string* output) {
return port::Zstd_Compress(FLAGS_zstd_compression_level, input,
length, output);
});
}
// Benchmark method: measures zstd decompression throughput. The lambda
// produces the compressed input once (at the configured compression
// level); the shared Uncompress() helper then repeatedly decompresses it
// via port::Zstd_Uncompress.
void ZstdUncompress(ThreadState* thread) {
Uncompress(
thread, "zstd",
[](const char* input, size_t length, std::string* output) {
return port::Zstd_Compress(FLAGS_zstd_compression_level, input,
length, output);
},
&port::Zstd_Uncompress);
}
void Lz4Compress(ThreadState* thread) {
@ -826,6 +869,8 @@ class Benchmark {
options.max_open_files = FLAGS_open_files;
options.filter_policy = filter_policy_;
options.reuse_logs = FLAGS_reuse_logs;
options.compression =
FLAGS_compression ? kSnappyCompression : kNoCompression;
Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) {
std::fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -1100,6 +1145,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_reuse_logs = n;
} else if (sscanf(argv[i], "--compression=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_compression = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {

View File

@ -1368,8 +1368,22 @@ Status DBImpl::MakeRoomForWrite(bool force) {
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);

View File

@ -65,6 +65,19 @@ class AtomicCounter {
void DelayMilliseconds(int millis) {
Env::Default()->SleepForMicroseconds(millis * 1000);
}
// Returns true if the file name contains the ".ldb" (table file) marker.
bool IsLdbFile(const std::string& f) {
  return f.find(".ldb") != std::string::npos;
}

// Returns true if the file name contains the ".log" (write-ahead log) marker.
bool IsLogFile(const std::string& f) {
  return f.find(".log") != std::string::npos;
}

// Returns true if the file name contains the "MANIFEST" marker.
bool IsManifestFile(const std::string& f) {
  return f.find("MANIFEST") != std::string::npos;
}
} // namespace
// Test Env to override default Env behavior for testing.
@ -100,6 +113,10 @@ class TestEnv : public EnvWrapper {
// Special Env used to delay background operations.
class SpecialEnv : public EnvWrapper {
public:
// For historical reasons, the std::atomic<> fields below are currently
// accessed via acquired loads and release stores. We should switch
// to plain load(), store() calls that provide sequential consistency.
// sstable/log Sync() calls are blocked while this pointer is non-null.
std::atomic<bool> delay_data_sync_;
@ -118,6 +135,9 @@ class SpecialEnv : public EnvWrapper {
// Force write to manifest files to fail while this pointer is non-null.
std::atomic<bool> manifest_write_error_;
// Force log file close to fail while this bool is true.
std::atomic<bool> log_file_close_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
@ -129,6 +149,7 @@ class SpecialEnv : public EnvWrapper {
non_writable_(false),
manifest_sync_error_(false),
manifest_write_error_(false),
log_file_close_(false),
count_random_reads_(false) {}
Status NewWritableFile(const std::string& f, WritableFile** r) {
@ -136,9 +157,12 @@ class SpecialEnv : public EnvWrapper {
private:
SpecialEnv* const env_;
WritableFile* const base_;
const std::string fname_;
public:
DataFile(SpecialEnv* env, WritableFile* base) : env_(env), base_(base) {}
DataFile(SpecialEnv* env, WritableFile* base, const std::string& fname)
: env_(env), base_(base), fname_(fname) {}
~DataFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->no_space_.load(std::memory_order_acquire)) {
@ -148,7 +172,14 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
// Closes the wrapped file; for log files only, injects a simulated
// Close() failure when the owning SpecialEnv has log_file_close_ set.
Status Close() {
  Status result = base_->Close();
  if (!result.ok()) {
    return result;
  }
  const bool inject_error =
      IsLogFile(fname_) &&
      env_->log_file_close_.load(std::memory_order_acquire);
  if (inject_error) {
    result = Status::IOError("simulated log file Close error");
  }
  return result;
}
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->data_sync_error_.load(std::memory_order_acquire)) {
@ -192,10 +223,9 @@ class SpecialEnv : public EnvWrapper {
Status s = target()->NewWritableFile(f, r);
if (s.ok()) {
if (strstr(f.c_str(), ".ldb") != nullptr ||
strstr(f.c_str(), ".log") != nullptr) {
*r = new DataFile(this, *r);
} else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
if (IsLdbFile(f) || IsLogFile(f)) {
*r = new DataFile(this, *r, f);
} else if (IsManifestFile(f)) {
*r = new ManifestFile(this, *r);
}
}
@ -1692,8 +1722,14 @@ TEST_F(DBTest, DestroyEmptyDir) {
ASSERT_TRUE(env.FileExists(dbname));
std::vector<std::string> children;
ASSERT_LEVELDB_OK(env.GetChildren(dbname, &children));
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(https://crbug.com/1428746): Chromium's file system abstraction always
// filters out '.' and '..'.
ASSERT_EQ(0, children.size());
#else
// The stock Env's do not filter out '.' and '..' special files.
ASSERT_EQ(2, children.size());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
ASSERT_LEVELDB_OK(DestroyDB(dbname, opts));
ASSERT_TRUE(!env.FileExists(dbname));
@ -1941,6 +1977,33 @@ TEST_F(DBTest, BloomFilter) {
delete options.filter_policy;
}
TEST_F(DBTest, LogCloseError) {
// Regression test for bug where we could ignore log file
// Close() error when switching to a new log file.
const int kValueSize = 20000;
const int kWriteCount = 10;
// Write buffer holds only half the total payload, so the memtable fills
// mid-test and DBImpl must switch log files, which Close()s the old one.
const int kWriteBufferSize = (kValueSize * kWriteCount) / 2;
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = kWriteBufferSize; // Small write buffer
Reopen(&options);
// From here on, every log-file Close() in SpecialEnv reports an IOError.
env_->log_file_close_.store(true, std::memory_order_release);
std::string value(kValueSize, 'x');
Status s;
for (int i = 0; i < kWriteCount && s.ok(); i++) {
s = Put(Key(i), value);
}
ASSERT_TRUE(!s.ok()) << "succeeded even after log file Close failure";
// Future writes should also fail after an earlier error.
s = Put("hello", "world");
ASSERT_TRUE(!s.ok()) << "write succeeded after log file Close failure";
// Reset so the harness can close and destroy the DB cleanly in teardown.
env_->log_file_close_.store(false, std::memory_order_release);
}
// Multi-threaded test:
namespace {

View File

@ -328,7 +328,12 @@ TEST_F(RecoveryTest, ManifestMissing) {
RemoveManifestFile();
Status status = OpenWithStatus();
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(crbug.com/760362): See comment in MakeIOError() from env_chromium.cc.
ASSERT_TRUE(status.IsIOError());
#else
ASSERT_TRUE(status.IsCorruption());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
}
} // namespace leveldb

View File

@ -90,9 +90,9 @@ div.bsql {
<h4>Benchmark Source Code</h4>
<p>We wrote benchmark tools for SQLite and Kyoto TreeDB based on LevelDB's <span class="code">db_bench</span>. The code for each of the benchmarks resides here:</p>
<ul>
<li> <b>LevelDB:</b> <a href="https://github.com/google/leveldb/blob/master/benchmarks/db_bench.cc">benchmarks/db_bench.cc</a>.</li>
<li> <b>SQLite:</b> <a href="https://github.com/google/leveldb/blob/master/benchmarks/db_bench_sqlite3.cc">benchmarks/db_bench_sqlite3.cc</a>.</li>
<li> <b>Kyoto TreeDB:</b> <a href="https://github.com/google/leveldb/blob/master/benchmarks/db_bench_tree_db.cc">benchmarks/db_bench_tree_db.cc</a>.</li>
<li> <b>LevelDB:</b> <a href="https://github.com/google/leveldb/blob/main/benchmarks/db_bench.cc">benchmarks/db_bench.cc</a>.</li>
<li> <b>SQLite:</b> <a href="https://github.com/google/leveldb/blob/main/benchmarks/db_bench_sqlite3.cc">benchmarks/db_bench_sqlite3.cc</a>.</li>
<li> <b>Kyoto TreeDB:</b> <a href="https://github.com/google/leveldb/blob/main/benchmarks/db_bench_tree_db.cc">benchmarks/db_bench_tree_db.cc</a>.</li>
</ul>
<h4>Custom Build Specifications</h4>

View File

@ -27,7 +27,8 @@ enum CompressionType {
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kLz4Compression = 0x2,
kZstdCompression = 0x2,
kLz4Compression = 0x3,
};
// Options to control the behavior of a database (passed to DB::Open)
@ -131,6 +132,10 @@ struct LEVELDB_EXPORT Options {
// efficiently detect that and will switch to uncompressed mode.
CompressionType compression = kSnappyCompression;
// Compression level for zstd.
// Currently only the range [-5,22] is supported. Default is 1.
int zstd_compression_level = 1;
// EXPERIMENTAL: If true, append to existing MANIFEST and log files
// when a database is opened. This can significantly speed up open.
//
@ -145,8 +150,6 @@ struct LEVELDB_EXPORT Options {
// Options that control read operations
struct LEVELDB_EXPORT ReadOptions {
ReadOptions() = default;
// If true, all data read from underlying storage will be
// verified against corresponding checksums.
bool verify_checksums = false;

View File

@ -30,4 +30,9 @@
#cmakedefine01 HAVE_SNAPPY
#endif // !defined(HAVE_SNAPPY)
#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
// Define to 1 if you have Zstd.
#if !defined(HAVE_ZSTD)
#cmakedefine01 HAVE_ZSTD
#endif // !defined(HAVE_ZSTD)
#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_

View File

@ -72,7 +72,7 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
size_t* result);
// Attempt to snappy uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid lightweight
// Returns true if successful, false if the input is invalid snappy
// compressed data.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
@ -81,6 +81,26 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
bool Snappy_Uncompress(const char* input_data, size_t input_length,
char* output);
// Store the zstd compression of "input[0,input_length-1]" in *output.
// Returns false if zstd is not supported by this port.
bool Zstd_Compress(int level, const char* input, size_t input_length,
std::string* output);
// If input[0,input_length-1] looks like a valid zstd compressed
// buffer, store the size of the uncompressed data in *result and
// return true. Else return false.
bool Zstd_GetUncompressedLength(const char* input, size_t length,
size_t* result);
// Attempt to zstd uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid zstd
// compressed data.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
// where "n" is the result of a successful call to
// Zstd_GetUncompressedLength.
bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output);
// ------------------ Miscellaneous -------------------
// If heap profiling is not supported, returns false.

View File

@ -28,6 +28,10 @@
#if HAVE_SNAPPY
#include <snappy.h>
#endif // HAVE_SNAPPY
#if HAVE_ZSTD
#define ZSTD_STATIC_LINKING_ONLY // For ZSTD_compressionParameters.
#include <zstd.h>
#endif // HAVE_ZSTD
#if HAVE_LZ4
#include <lz4.h>
@ -130,6 +134,74 @@ inline bool Snappy_Uncompress(const char* input, size_t length, char* output) {
#endif // HAVE_SNAPPY
}
// Stores the zstd compression of input[0,length-1] in *output using the
// given compression level. Returns false if zstd is not available in this
// build, if context allocation fails, or if compression fails.
inline bool Zstd_Compress(int level, const char* input, size_t length,
                          std::string* output) {
#if HAVE_ZSTD
  // Reserve the worst-case (MaxCompressedLength) output size up front.
  size_t outlen = ZSTD_compressBound(length);
  if (ZSTD_isError(outlen)) {
    return false;
  }
  output->resize(outlen);
  ZSTD_CCtx* ctx = ZSTD_createCCtx();
  if (ctx == nullptr) {
    // Allocation failure inside zstd; the original code passed the null
    // context on to ZSTD_compress2.
    return false;
  }
  ZSTD_compressionParameters parameters =
      ZSTD_getCParams(level, std::max(length, size_t{1}), /*dictSize=*/0);
  ZSTD_CCtx_setCParams(ctx, parameters);
  outlen = ZSTD_compress2(ctx, &(*output)[0], output->size(), input, length);
  ZSTD_freeCCtx(ctx);
  if (ZSTD_isError(outlen)) {
    return false;
  }
  // Shrink to the actual compressed size.
  output->resize(outlen);
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)level;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif  // HAVE_ZSTD
}
// If input[0,length-1] is a valid zstd frame whose header records the
// uncompressed content size, stores that size in *result and returns
// true. Returns false on error, on an unknown content size, or when zstd
// is not available in this build.
inline bool Zstd_GetUncompressedLength(const char* input, size_t length,
                                       size_t* result) {
#if HAVE_ZSTD
  // ZSTD_getFrameContentSize() returns an unsigned long long and signals
  // failure with the sentinels ZSTD_CONTENTSIZE_UNKNOWN (-1) and
  // ZSTD_CONTENTSIZE_ERROR (-2). Truncating to size_t and testing only
  // for zero (as the original code did) reports either sentinel as a
  // huge "valid" size, which callers would then try to allocate.
  unsigned long long size = ZSTD_getFrameContentSize(input, length);
  if (size == ZSTD_CONTENTSIZE_UNKNOWN || size == ZSTD_CONTENTSIZE_ERROR ||
      size == 0) {
    return false;
  }
  *result = static_cast<size_t>(size);
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)result;
  return false;
#endif  // HAVE_ZSTD
}
// Attempts to zstd-uncompress input[0,length-1] into output, whose size
// must be at least the value reported by Zstd_GetUncompressedLength().
// Returns false on invalid input, context allocation failure, or when
// zstd is not available in this build.
inline bool Zstd_Uncompress(const char* input, size_t length, char* output) {
#if HAVE_ZSTD
  size_t outlen;
  if (!Zstd_GetUncompressedLength(input, length, &outlen)) {
    return false;
  }
  ZSTD_DCtx* ctx = ZSTD_createDCtx();
  if (ctx == nullptr) {
    // Allocation failure inside zstd; the original code passed the null
    // context on to ZSTD_decompressDCtx.
    return false;
  }
  outlen = ZSTD_decompressDCtx(ctx, output, outlen, input, length);
  ZSTD_freeDCtx(ctx);
  if (ZSTD_isError(outlen)) {
    return false;
  }
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif  // HAVE_ZSTD
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
// Silence compiler warnings about unused arguments.
(void)func;

View File

@ -5,6 +5,7 @@
#include "table/format.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "port/port.h"
#include "table/block.h"
#include "util/coding.h"
@ -40,6 +41,10 @@ void Footer::EncodeTo(std::string* dst) const {
}
Status Footer::DecodeFrom(Slice* input) {
if (input->size() < kEncodedLength) {
return Status::Corruption("not an sstable (footer too short)");
}
const char* magic_ptr = input->data() + kEncodedLength - 8;
const uint32_t magic_lo = DecodeFixed32(magic_ptr);
const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4);
@ -117,13 +122,31 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
size_t ulength = 0;
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
return Status::Corruption("corrupted snappy compressed block length");
}
char* ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted compressed block contents");
return Status::Corruption("corrupted snappy compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);
result->heap_allocated = true;
result->cachable = true;
break;
}
case kZstdCompression: {
size_t ulength = 0;
if (!port::Zstd_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted zstd compressed block length");
}
char* ubuf = new char[ulength];
if (!port::Zstd_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted zstd compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);

View File

@ -168,14 +168,25 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
}
break;
}
case kLz4Compression: {
std::string* compressed = &r->compressed_output;
if (port::Lz4_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zstd not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;

View File

@ -14,6 +14,7 @@
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/table_builder.h"
#include "table/block.h"
#include "table/block_builder.h"
@ -784,16 +785,28 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
}
// Returns true if the port layer can compress with the given algorithm;
// used to skip compression tests when the library was not compiled in.
// (This span previously retained the superseded SnappyCompressionSupported
// signature and an unconditional early return before the type dispatch.)
static bool CompressionSupported(CompressionType type) {
  std::string out;
  Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
  if (type == kSnappyCompression) {
    return port::Snappy_Compress(in.data(), in.size(), &out);
  } else if (type == kZstdCompression) {
    return port::Zstd_Compress(/*level=*/1, in.data(), in.size(), &out);
  }
  return false;
}
TEST(TableTest, ApproximateOffsetOfCompressed) {
if (!SnappyCompressionSupported()) {
std::fprintf(stderr, "skipping compression tests\n");
return;
class CompressionTableTest
: public ::testing::TestWithParam<std::tuple<CompressionType>> {};
INSTANTIATE_TEST_SUITE_P(CompressionTests, CompressionTableTest,
::testing::Values(kSnappyCompression,
kZstdCompression));
TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) {
CompressionType type = ::testing::get<0>(GetParam());
if (!CompressionSupported(type)) {
GTEST_SKIP() << "skipping compression test: " << type;
}
Random rnd(301);
@ -807,7 +820,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
KVMap kvmap;
Options options;
options.block_size = 1024;
options.compression = kSnappyCompression;
options.compression = type;
c.Finish(options, &keys, &kvmap);
// Expected upper and lower bounds of space used by compressible strings.

@ -1 +1 @@
Subproject commit 7d0d9061d83b663ce05d9de5da3d5865a3845b79
Subproject commit f7547e29ccaed7b64ef4f7495ecfff1c9f6f3d03

View File

@ -96,40 +96,45 @@ TEST_F(EnvTest, RunMany) {
struct RunState {
port::Mutex mu;
port::CondVar cvar{&mu};
int last_id = 0;
int run_count = 0;
};
struct Callback {
RunState* state_; // Pointer to shared state.
const int id_; // Order# for the execution of this callback.
RunState* const state_; // Pointer to shared state.
bool run = false;
Callback(RunState* s, int id) : state_(s), id_(id) {}
Callback(RunState* s) : state_(s) {}
static void Run(void* arg) {
Callback* callback = reinterpret_cast<Callback*>(arg);
RunState* state = callback->state_;
MutexLock l(&state->mu);
ASSERT_EQ(state->last_id, callback->id_ - 1);
state->last_id = callback->id_;
state->run_count++;
callback->run = true;
state->cvar.Signal();
}
};
RunState state;
Callback callback1(&state, 1);
Callback callback2(&state, 2);
Callback callback3(&state, 3);
Callback callback4(&state, 4);
Callback callback1(&state);
Callback callback2(&state);
Callback callback3(&state);
Callback callback4(&state);
env_->Schedule(&Callback::Run, &callback1);
env_->Schedule(&Callback::Run, &callback2);
env_->Schedule(&Callback::Run, &callback3);
env_->Schedule(&Callback::Run, &callback4);
MutexLock l(&state.mu);
while (state.last_id != 4) {
while (state.run_count != 4) {
state.cvar.Wait();
}
ASSERT_TRUE(callback1.run);
ASSERT_TRUE(callback2.run);
ASSERT_TRUE(callback3.run);
ASSERT_TRUE(callback4.run);
}
struct State {
@ -175,11 +180,21 @@ TEST_F(EnvTest, TestOpenNonExistentFile) {
RandomAccessFile* random_access_file;
Status status =
env_->NewRandomAccessFile(non_existent_file, &random_access_file);
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(crbug.com/760362): See comment in MakeIOError() from env_chromium.cc.
ASSERT_TRUE(status.IsIOError());
#else
ASSERT_TRUE(status.IsNotFound());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
SequentialFile* sequential_file;
status = env_->NewSequentialFile(non_existent_file, &sequential_file);
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(crbug.com/760362): See comment in MakeIOError() from env_chromium.cc.
ASSERT_TRUE(status.IsIOError());
#else
ASSERT_TRUE(status.IsNotFound());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
}
TEST_F(EnvTest, ReopenWritableFile) {