1
0
mirror of https://github.com/google/leveldb.git synced 2025-04-25 14:00:27 +08:00

Merge branch 'google:main' into main

This commit is contained in:
innoyiya 2024-02-23 14:26:28 +08:00 committed by GitHub
commit c627c9f379
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 370 additions and 73 deletions

View File

@ -66,10 +66,11 @@ jobs:
- name: Install dependencies on Linux
if: ${{ runner.os == 'Linux' }}
# libgoogle-perftools-dev is temporarily removed from the package list
# because it is currently broken on GitHub's Ubuntu 22.04.
run: |
sudo apt-get update
sudo apt-get install libgoogle-perftools-dev libkyotocabinet-dev \
libsnappy-dev libsqlite3-dev
sudo apt-get install libkyotocabinet-dev libsnappy-dev libsqlite3-dev
- name: Generate build config
run: >-

View File

@ -40,6 +40,7 @@ check_include_file("unistd.h" HAVE_UNISTD_H)
include(CheckLibraryExists)
check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
check_library_exists(zstd zstd_compress "" HAVE_ZSTD)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
include(CheckCXXSymbolExists)
@ -273,6 +274,9 @@ endif(HAVE_CRC32C)
if(HAVE_SNAPPY)
target_link_libraries(leveldb snappy)
endif(HAVE_SNAPPY)
if(HAVE_ZSTD)
target_link_libraries(leveldb zstd)
endif(HAVE_ZSTD)
if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC)

View File

@ -18,7 +18,7 @@ Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com)
* Multiple changes can be made in one atomic batch.
* Users can create a transient snapshot to get a consistent view of data.
* Forward and backward iteration is supported over the data.
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/).
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported.
* External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions.
# Documentation

View File

@ -60,7 +60,9 @@ static const char* FLAGS_benchmarks =
"fill100K,"
"crc32c,"
"snappycomp,"
"snappyuncomp,";
"snappyuncomp,"
"zstdcomp,"
"zstduncomp,";
// Number of key/values to place in database
static int FLAGS_num = 1000000;
@ -124,6 +126,9 @@ static bool FLAGS_compression = true;
// Use the db with the following name.
static const char* FLAGS_db = nullptr;
// ZSTD compression level to try out
static int FLAGS_zstd_compression_level = 1;
namespace leveldb {
namespace {
@ -367,6 +372,57 @@ struct ThreadState {
ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
};
void Compress(
ThreadState* thread, std::string name,
std::function<bool(const char*, size_t, std::string*)> compress_func) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = compress_func(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp();
}
if (!ok) {
thread->stats.AddMessage("(" + name + " failure)");
} else {
char buf[100];
std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
}
void Uncompress(
ThreadState* thread, std::string name,
std::function<bool(const char*, size_t, std::string*)> compress_func,
std::function<bool(const char*, size_t, char*)> uncompress_func) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = compress_func(input.data(), input.size(), &compressed);
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = uncompress_func(compressed.data(), compressed.size(), uncompressed);
bytes += input.size();
thread->stats.FinishedSingleOp();
}
delete[] uncompressed;
if (!ok) {
thread->stats.AddMessage("(" + name + " failure)");
} else {
thread->stats.AddBytes(bytes);
}
}
} // namespace
class Benchmark {
@ -579,6 +635,10 @@ class Benchmark {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress;
} else if (name == Slice("zstdcomp")) {
method = &Benchmark::ZstdCompress;
} else if (name == Slice("zstduncomp")) {
method = &Benchmark::ZstdUncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
@ -713,50 +773,30 @@ class Benchmark {
}
void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp();
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
char buf[100];
std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
Compress(thread, "snappy", &port::Snappy_Compress);
}
void SnappyUncompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
bytes += input.size();
thread->stats.FinishedSingleOp();
}
delete[] uncompressed;
Uncompress(thread, "snappy", &port::Snappy_Compress,
&port::Snappy_Uncompress);
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
thread->stats.AddBytes(bytes);
}
void ZstdCompress(ThreadState* thread) {
Compress(thread, "zstd",
[](const char* input, size_t length, std::string* output) {
return port::Zstd_Compress(FLAGS_zstd_compression_level, input,
length, output);
});
}
void ZstdUncompress(ThreadState* thread) {
Uncompress(
thread, "zstd",
[](const char* input, size_t length, std::string* output) {
return port::Zstd_Compress(FLAGS_zstd_compression_level, input,
length, output);
},
&port::Zstd_Uncompress);
}
void Open() {

View File

@ -1368,8 +1368,22 @@ Status DBImpl::MakeRoomForWrite(bool force) {
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);

View File

@ -65,6 +65,19 @@ class AtomicCounter {
void DelayMilliseconds(int millis) {
Env::Default()->SleepForMicroseconds(millis * 1000);
}
bool IsLdbFile(const std::string& f) {
return strstr(f.c_str(), ".ldb") != nullptr;
}
bool IsLogFile(const std::string& f) {
return strstr(f.c_str(), ".log") != nullptr;
}
bool IsManifestFile(const std::string& f) {
return strstr(f.c_str(), "MANIFEST") != nullptr;
}
} // namespace
// Test Env to override default Env behavior for testing.
@ -100,6 +113,10 @@ class TestEnv : public EnvWrapper {
// Special Env used to delay background operations.
class SpecialEnv : public EnvWrapper {
public:
// For historical reasons, the std::atomic<> fields below are currently
// accessed via acquired loads and release stores. We should switch
// to plain load(), store() calls that provide sequential consistency.
// sstable/log Sync() calls are blocked while this pointer is non-null.
std::atomic<bool> delay_data_sync_;
@ -118,6 +135,9 @@ class SpecialEnv : public EnvWrapper {
// Force write to manifest files to fail while this pointer is non-null.
std::atomic<bool> manifest_write_error_;
// Force log file close to fail while this bool is true.
std::atomic<bool> log_file_close_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
@ -129,6 +149,7 @@ class SpecialEnv : public EnvWrapper {
non_writable_(false),
manifest_sync_error_(false),
manifest_write_error_(false),
log_file_close_(false),
count_random_reads_(false) {}
Status NewWritableFile(const std::string& f, WritableFile** r) {
@ -136,9 +157,12 @@ class SpecialEnv : public EnvWrapper {
private:
SpecialEnv* const env_;
WritableFile* const base_;
const std::string fname_;
public:
DataFile(SpecialEnv* env, WritableFile* base) : env_(env), base_(base) {}
DataFile(SpecialEnv* env, WritableFile* base, const std::string& fname)
: env_(env), base_(base), fname_(fname) {}
~DataFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->no_space_.load(std::memory_order_acquire)) {
@ -148,7 +172,14 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Close() {
Status s = base_->Close();
if (s.ok() && IsLogFile(fname_) &&
env_->log_file_close_.load(std::memory_order_acquire)) {
s = Status::IOError("simulated log file Close error");
}
return s;
}
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->data_sync_error_.load(std::memory_order_acquire)) {
@ -192,10 +223,9 @@ class SpecialEnv : public EnvWrapper {
Status s = target()->NewWritableFile(f, r);
if (s.ok()) {
if (strstr(f.c_str(), ".ldb") != nullptr ||
strstr(f.c_str(), ".log") != nullptr) {
*r = new DataFile(this, *r);
} else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
if (IsLdbFile(f) || IsLogFile(f)) {
*r = new DataFile(this, *r, f);
} else if (IsManifestFile(f)) {
*r = new ManifestFile(this, *r);
}
}
@ -1692,8 +1722,14 @@ TEST_F(DBTest, DestroyEmptyDir) {
ASSERT_TRUE(env.FileExists(dbname));
std::vector<std::string> children;
ASSERT_LEVELDB_OK(env.GetChildren(dbname, &children));
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(https://crbug.com/1428746): Chromium's file system abstraction always
// filters out '.' and '..'.
ASSERT_EQ(0, children.size());
#else
// The stock Env's do not filter out '.' and '..' special files.
ASSERT_EQ(2, children.size());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
ASSERT_LEVELDB_OK(DestroyDB(dbname, opts));
ASSERT_TRUE(!env.FileExists(dbname));
@ -1941,6 +1977,33 @@ TEST_F(DBTest, BloomFilter) {
delete options.filter_policy;
}
TEST_F(DBTest, LogCloseError) {
// Regression test for bug where we could ignore log file
// Close() error when switching to a new log file.
const int kValueSize = 20000;
const int kWriteCount = 10;
const int kWriteBufferSize = (kValueSize * kWriteCount) / 2;
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = kWriteBufferSize; // Small write buffer
Reopen(&options);
env_->log_file_close_.store(true, std::memory_order_release);
std::string value(kValueSize, 'x');
Status s;
for (int i = 0; i < kWriteCount && s.ok(); i++) {
s = Put(Key(i), value);
}
ASSERT_TRUE(!s.ok()) << "succeeded even after log file Close failure";
// Future writes should also fail after an earlier error.
s = Put("hello", "world");
ASSERT_TRUE(!s.ok()) << "write succeeded after log file Close failure";
env_->log_file_close_.store(false, std::memory_order_release);
}
// Multi-threaded test:
namespace {

View File

@ -328,7 +328,12 @@ TEST_F(RecoveryTest, ManifestMissing) {
RemoveManifestFile();
Status status = OpenWithStatus();
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(crbug.com/760362): See comment in MakeIOError() from env_chromium.cc.
ASSERT_TRUE(status.IsIOError());
#else
ASSERT_TRUE(status.IsCorruption());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
}
} // namespace leveldb

View File

@ -26,7 +26,8 @@ enum CompressionType {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1
kSnappyCompression = 0x1,
kZstdCompression = 0x2,
};
// Options to control the behavior of a database (passed to DB::Open)
@ -130,6 +131,10 @@ struct LEVELDB_EXPORT Options {
// efficiently detect that and will switch to uncompressed mode.
CompressionType compression = kSnappyCompression;
// Compression level for zstd.
// Currently only the range [-5,22] is supported. Default is 1.
int zstd_compression_level = 1;
// EXPERIMENTAL: If true, append to existing MANIFEST and log files
// when a database is opened. This can significantly speed up open.
//

View File

@ -30,4 +30,9 @@
#cmakedefine01 HAVE_SNAPPY
#endif // !defined(HAVE_SNAPPY)
#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
// Define to 1 if you have Zstd.
#if !defined(HAVE_Zstd)
#cmakedefine01 HAVE_ZSTD
#endif // !defined(HAVE_ZSTD)
#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_

View File

@ -72,7 +72,7 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
size_t* result);
// Attempt to snappy uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid lightweight
// Returns true if successful, false if the input is invalid snappy
// compressed data.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
@ -81,6 +81,26 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
bool Snappy_Uncompress(const char* input_data, size_t input_length,
char* output);
// Store the zstd compression of "input[0,input_length-1]" in *output.
// Returns false if zstd is not supported by this port.
bool Zstd_Compress(int level, const char* input, size_t input_length,
std::string* output);
// If input[0,input_length-1] looks like a valid zstd compressed
// buffer, store the size of the uncompressed data in *result and
// return true. Else return false.
bool Zstd_GetUncompressedLength(const char* input, size_t length,
size_t* result);
// Attempt to zstd uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid zstd
// compressed data.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
// where "n" is the result of a successful call to
// Zstd_GetUncompressedLength.
bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output);
// ------------------ Miscellaneous -------------------
// If heap profiling is not supported, returns false.

View File

@ -28,6 +28,10 @@
#if HAVE_SNAPPY
#include <snappy.h>
#endif // HAVE_SNAPPY
#if HAVE_ZSTD
#define ZSTD_STATIC_LINKING_ONLY // For ZSTD_compressionParameters.
#include <zstd.h>
#endif // HAVE_ZSTD
#include <cassert>
#include <condition_variable> // NOLINT
@ -126,6 +130,74 @@ inline bool Snappy_Uncompress(const char* input, size_t length, char* output) {
#endif // HAVE_SNAPPY
}
inline bool Zstd_Compress(int level, const char* input, size_t length,
std::string* output) {
#if HAVE_ZSTD
// Get the MaxCompressedLength.
size_t outlen = ZSTD_compressBound(length);
if (ZSTD_isError(outlen)) {
return false;
}
output->resize(outlen);
ZSTD_CCtx* ctx = ZSTD_createCCtx();
ZSTD_compressionParameters parameters =
ZSTD_getCParams(level, std::max(length, size_t{1}), /*dictSize=*/0);
ZSTD_CCtx_setCParams(ctx, parameters);
outlen = ZSTD_compress2(ctx, &(*output)[0], output->size(), input, length);
ZSTD_freeCCtx(ctx);
if (ZSTD_isError(outlen)) {
return false;
}
output->resize(outlen);
return true;
#else
// Silence compiler warnings about unused arguments.
(void)level;
(void)input;
(void)length;
(void)output;
return false;
#endif // HAVE_ZSTD
}
inline bool Zstd_GetUncompressedLength(const char* input, size_t length,
size_t* result) {
#if HAVE_ZSTD
size_t size = ZSTD_getFrameContentSize(input, length);
if (size == 0) return false;
*result = size;
return true;
#else
// Silence compiler warnings about unused arguments.
(void)input;
(void)length;
(void)result;
return false;
#endif // HAVE_ZSTD
}
inline bool Zstd_Uncompress(const char* input, size_t length, char* output) {
#if HAVE_ZSTD
size_t outlen;
if (!Zstd_GetUncompressedLength(input, length, &outlen)) {
return false;
}
ZSTD_DCtx* ctx = ZSTD_createDCtx();
outlen = ZSTD_decompressDCtx(ctx, output, outlen, input, length);
ZSTD_freeDCtx(ctx);
if (ZSTD_isError(outlen)) {
return false;
}
return true;
#else
// Silence compiler warnings about unused arguments.
(void)input;
(void)length;
(void)output;
return false;
#endif // HAVE_ZSTD
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
// Silence compiler warnings about unused arguments.
(void)func;

View File

@ -5,6 +5,7 @@
#include "table/format.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "port/port.h"
#include "table/block.h"
#include "util/coding.h"
@ -40,6 +41,10 @@ void Footer::EncodeTo(std::string* dst) const {
}
Status Footer::DecodeFrom(Slice* input) {
if (input->size() < kEncodedLength) {
return Status::Corruption("not an sstable (footer too short)");
}
const char* magic_ptr = input->data() + kEncodedLength - 8;
const uint32_t magic_lo = DecodeFixed32(magic_ptr);
const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4);
@ -116,13 +121,31 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
size_t ulength = 0;
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
return Status::Corruption("corrupted snappy compressed block length");
}
char* ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted compressed block contents");
return Status::Corruption("corrupted snappy compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);
result->heap_allocated = true;
result->cachable = true;
break;
}
case kZstdCompression: {
size_t ulength = 0;
if (!port::Zstd_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted zstd compressed block length");
}
char* ubuf = new char[ulength];
if (!port::Zstd_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted zstd compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);

View File

@ -168,6 +168,21 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
}
break;
}
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zstd not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();

View File

@ -14,6 +14,7 @@
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/table_builder.h"
#include "table/block.h"
#include "table/block_builder.h"
@ -784,15 +785,29 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
}
static bool SnappyCompressionSupported() {
static bool CompressionSupported(CompressionType type) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
if (type == kSnappyCompression) {
return port::Snappy_Compress(in.data(), in.size(), &out);
} else if (type == kZstdCompression) {
return port::Zstd_Compress(/*level=*/1, in.data(), in.size(), &out);
}
return false;
}
TEST(TableTest, ApproximateOffsetOfCompressed) {
if (!SnappyCompressionSupported())
GTEST_SKIP() << "skipping compression tests";
class CompressionTableTest
: public ::testing::TestWithParam<std::tuple<CompressionType>> {};
INSTANTIATE_TEST_SUITE_P(CompressionTests, CompressionTableTest,
::testing::Values(kSnappyCompression,
kZstdCompression));
TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) {
CompressionType type = ::testing::get<0>(GetParam());
if (!CompressionSupported(type)) {
GTEST_SKIP() << "skipping compression test: " << type;
}
Random rnd(301);
TableConstructor c(BytewiseComparator());
@ -805,7 +820,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
KVMap kvmap;
Options options;
options.block_size = 1024;
options.compression = kSnappyCompression;
options.compression = type;
c.Finish(options, &keys, &kvmap);
// Expected upper and lower bounds of space used by compressible strings.

@ -1 +1 @@
Subproject commit 7d0d9061d83b663ce05d9de5da3d5865a3845b79
Subproject commit f7547e29ccaed7b64ef4f7495ecfff1c9f6f3d03

View File

@ -96,40 +96,45 @@ TEST_F(EnvTest, RunMany) {
struct RunState {
port::Mutex mu;
port::CondVar cvar{&mu};
int last_id = 0;
int run_count = 0;
};
struct Callback {
RunState* state_; // Pointer to shared state.
const int id_; // Order# for the execution of this callback.
RunState* const state_; // Pointer to shared state.
bool run = false;
Callback(RunState* s, int id) : state_(s), id_(id) {}
Callback(RunState* s) : state_(s) {}
static void Run(void* arg) {
Callback* callback = reinterpret_cast<Callback*>(arg);
RunState* state = callback->state_;
MutexLock l(&state->mu);
ASSERT_EQ(state->last_id, callback->id_ - 1);
state->last_id = callback->id_;
state->run_count++;
callback->run = true;
state->cvar.Signal();
}
};
RunState state;
Callback callback1(&state, 1);
Callback callback2(&state, 2);
Callback callback3(&state, 3);
Callback callback4(&state, 4);
Callback callback1(&state);
Callback callback2(&state);
Callback callback3(&state);
Callback callback4(&state);
env_->Schedule(&Callback::Run, &callback1);
env_->Schedule(&Callback::Run, &callback2);
env_->Schedule(&Callback::Run, &callback3);
env_->Schedule(&Callback::Run, &callback4);
MutexLock l(&state.mu);
while (state.last_id != 4) {
while (state.run_count != 4) {
state.cvar.Wait();
}
ASSERT_TRUE(callback1.run);
ASSERT_TRUE(callback2.run);
ASSERT_TRUE(callback3.run);
ASSERT_TRUE(callback4.run);
}
struct State {
@ -175,11 +180,21 @@ TEST_F(EnvTest, TestOpenNonExistentFile) {
RandomAccessFile* random_access_file;
Status status =
env_->NewRandomAccessFile(non_existent_file, &random_access_file);
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(crbug.com/760362): See comment in MakeIOError() from env_chromium.cc.
ASSERT_TRUE(status.IsIOError());
#else
ASSERT_TRUE(status.IsNotFound());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
SequentialFile* sequential_file;
status = env_->NewSequentialFile(non_existent_file, &sequential_file);
#if defined(LEVELDB_PLATFORM_CHROMIUM)
// TODO(crbug.com/760362): See comment in MakeIOError() from env_chromium.cc.
ASSERT_TRUE(status.IsIOError());
#else
ASSERT_TRUE(status.IsNotFound());
#endif // defined(LEVELDB_PLATFORM_CHROMIUM)
}
TEST_F(EnvTest, ReopenWritableFile) {