diff --git a/db/version_set.cc b/db/version_set.cc index 4e37bf9..2bab498 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -827,6 +827,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { } if (!s.ok()) { Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); + if (ManifestContains(record)) { + Log(options_->info_log, + "MANIFEST contains log record despite error; advancing to new " + "version to prevent mismatch between in-memory and logged state"); + s = Status::OK(); + } } } @@ -1113,6 +1119,29 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { int(current_->files_[6].size())); return scratch->buffer; } +bool VersionSet::ManifestContains(const std::string& record) const { + std::string fname = DescriptorFileName(dbname_, manifest_file_number_); + Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str()); + SequentialFile* file = NULL; + Status s = env_->NewSequentialFile(fname, &file); + if (!s.ok()) { + Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str()); + return false; + } + log::Reader reader(file, NULL, true/*checksum*/, 0); + Slice r; + std::string scratch; + bool result = false; + while (reader.ReadRecord(&r, &scratch)) { + if (r == Slice(record)) { + result = true; + break; + } + } + delete file; + Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0); + return result; +} uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; diff --git a/db/version_set.h b/db/version_set.h index ea0c925..f3afb96 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -293,6 +293,9 @@ class VersionSet { void AppendVersion(Version* v); + // Return true iff the manifest contains the specified record. + bool ManifestContains(const std::string& record) const; + Env* const env_; const std::string dbname_; const Options* const options_;