Worker id in snapshot/wal

Summary:
Adds worker id to snapshot and wal filename.
Adds a new worker_id flag to be used for recovering a worker with a distributed snapshot.
Adds worker_id field to snapshot to check for consistency.

Reviewers: florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1096
This commit is contained in:
Dominik Gleich 2018-01-18 10:22:54 +01:00
parent 189fd75369
commit 2a130e784e
13 changed files with 111 additions and 50 deletions

View File

@ -2,6 +2,7 @@
## Next Release
* Snapshot format changed (not backward compatible).
## v0.9.0

View File

@ -1,6 +1,5 @@
#include "database/graph_db.hpp"
#include "communication/messaging/distributed.hpp"
#include "database/graph_db.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "durability/paths.hpp"
@ -75,7 +74,8 @@ class SingleNode : public Base {
private:
Storage storage_{0};
durability::WriteAheadLog wal_{config_.durability_directory,
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
tx::SingleNodeEngine tx_engine_{&wal_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
@ -91,7 +91,8 @@ class Master : public Base {
private:
communication::messaging::System system_{config_.master_endpoint};
Storage storage_{0};
durability::WriteAheadLog wal_{config_.durability_directory,
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
tx::MasterEngine tx_engine_{system_, &wal_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
@ -113,7 +114,8 @@ class Worker : public Base {
tx::WorkerEngine tx_engine_{system_, config_.master_endpoint};
Storage storage_{config_.worker_id};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
durability::WriteAheadLog wal_{config_.durability_directory,
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{system_,
config_.master_endpoint};
@ -144,6 +146,8 @@ GraphDb::~GraphDb() {
if (impl_->config_.snapshot_on_exit) MakeSnapshot();
}
int GraphDb::WorkerId() const { return impl_->config_.worker_id; }
Storage &GraphDb::storage() { return impl_->storage(); }
durability::WriteAheadLog &GraphDb::wal() { return impl_->wal(); }

View File

@ -76,6 +76,7 @@ class GraphDb {
storage::ConcurrentIdMapper<storage::Property> &property_mapper();
database::Counters &counters();
void CollectGarbage();
int WorkerId() const;
protected:
explicit GraphDb(std::unique_ptr<impl::Base> impl);

View File

@ -1,3 +1,5 @@
#include "durability/paths.hpp"
#include <experimental/filesystem>
#include <experimental/optional>
#include <string>
@ -22,9 +24,9 @@ bool EnsureDir(const fs::path &dir) {
void CheckDurabilityDir(const std::string &durability_dir) {
namespace fs = std::experimental::filesystem;
if (fs::exists(durability_dir)) {
CHECK(fs::is_directory(durability_dir)) << "The durability directory path '"
<< durability_dir
<< "' is not a directory!";
CHECK(fs::is_directory(durability_dir))
<< "The durability directory path '" << durability_dir
<< "' is not a directory!";
} else {
bool success = EnsureDir(durability_dir);
CHECK(success) << "Failed to create durability directory '"
@ -42,20 +44,20 @@ std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
const std::string &name) {
auto nullopt = std::experimental::nullopt;
// Get the max_transaction_id from the file name that has format
// "XXXXX__max_transaction_<MAX_TRANS_ID>"
// "XXXXX__max_transaction_<MAX_TRANS_ID>_worker_<Worker_ID>"
auto file_name_split = utils::RSplit(name, "__", 1);
if (file_name_split.size() != 2) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
return nullopt;
}
if (file_name_split[1] == "current")
if (utils::StartsWith(file_name_split[1], "current"))
return std::numeric_limits<tx::transaction_id_t>::max();
file_name_split = utils::RSplit(file_name_split[1], "_", 1);
if (file_name_split.size() != 2) {
file_name_split = utils::Split(file_name_split[1], "_");
if (file_name_split.size() != 5) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
return nullopt;
}
auto &tx_id_str = file_name_split[1];
auto &tx_id_str = file_name_split[2];
try {
return std::stoll(tx_id_str);
} catch (std::invalid_argument &) {
@ -67,19 +69,27 @@ std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
}
}
fs::path MakeSnapshotPath(const fs::path &durability_dir, const int worker_id) {
std::string date_str =
Timestamp(Timestamp::now())
.to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
auto file_name = date_str + "_worker_" + std::to_string(worker_id);
return durability_dir / kSnapshotDir / file_name;
}
/// Generates a file path for a write-ahead log file. If given a transaction ID
/// the file name will contain it. Otherwise the file path is for the "current"
/// WAL file for which the max tx id is still unknown.
fs::path WalFilenameForTransactionId(
const std::experimental::filesystem::path &wal_dir,
std::experimental::optional<tx::transaction_id_t> tx_id =
std::experimental::nullopt) {
const std::experimental::filesystem::path &wal_dir, int worker_id,
std::experimental::optional<tx::transaction_id_t> tx_id) {
auto file_name = Timestamp::now().to_iso8601();
if (tx_id) {
file_name += "__max_transaction_" + std::to_string(*tx_id);
} else {
file_name += "__current";
}
file_name = file_name + "_Worker_" + std::to_string(worker_id);
return wal_dir / file_name;
}
} // namespace durability
} // namespace durability

View File

@ -25,11 +25,16 @@ void CheckDurabilityDir(const std::string &durability_dir);
std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
const std::string &name);
/// Generates a file path for a write-ahead log file. If given a transaction ID
/// the file name will contain it. Otherwise the file path is for the "current"
/// WAL file for which the max tx id is still unknown.
/** Generates a path for a DB snapshot in the given folder in a well-defined
* sortable format with worker id appended to the file name. */
std::experimental::filesystem::path MakeSnapshotPath(
const std::experimental::filesystem::path &durability_dir, int worker_id);
/// Generates a file path for a write-ahead log file of a specified worker. If
/// given a transaction ID the file name will contain it. Otherwise the file
/// path is for the "current" WAL file for which the max tx id is still unknown.
std::experimental::filesystem::path WalFilenameForTransactionId(
const std::experimental::filesystem::path &wal_dir,
const std::experimental::filesystem::path &wal_dir, int worker_id,
std::experimental::optional<tx::transaction_id_t> tx_id =
std::experimental::nullopt);
} // namespace durability

View File

@ -76,6 +76,10 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
dv.ValueInt() == durability::kVersion);
// Checks worker id was set correctly
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
dv.ValueInt() == db.WorkerId());
// Vertex and edge generator ids
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
uint64_t vertex_generator_cnt = dv.ValueInt();

View File

@ -27,6 +27,10 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
durability::kMagicNumber.size());
encoder.WriteInt(durability::kVersion);
// Writes the worker id to snapshot, used to guarantee consistent cluster
// state after recovery
encoder.WriteInt(db.WorkerId());
// Write the number of generated vertex and edges, used to recover
// generators internal states
encoder.WriteInt(db.storage().VertexGenerator().LocalCount());
@ -109,17 +113,10 @@ void RemoveOldWals(const fs::path &wal_dir,
}
} // namespace
fs::path MakeSnapshotPath(const fs::path &durability_dir) {
std::string date_str =
Timestamp(Timestamp::now())
.to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
return durability_dir / kSnapshotDir / date_str;
}
bool MakeSnapshot(database::GraphDb &db, const fs::path &durability_dir,
const int snapshot_max_retained) {
if (!EnsureDir(durability_dir / kSnapshotDir)) return false;
const auto snapshot_file = MakeSnapshotPath(durability_dir);
const auto snapshot_file = MakeSnapshotPath(durability_dir, db.WorkerId());
if (fs::exists(snapshot_file)) return false;
database::GraphDbAccessor dba(db);
if (Encode(snapshot_file, db, dba)) {

View File

@ -5,12 +5,6 @@
#include "database/graph_db.hpp"
namespace durability {
using path = std::experimental::filesystem::path;
/** Generates a path for a DB snapshot in the given folder in a well-defined
* sortable format. */
// TODO review - move to paths.hpp?
path MakeSnapshotPath(const path &durability_dir);
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
@ -18,6 +12,7 @@ path MakeSnapshotPath(const path &durability_dir);
* @param durability_dir - directory where durability data is stored.
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
bool MakeSnapshot(database::GraphDb &db, const path &durability_dir,
bool MakeSnapshot(database::GraphDb &db,
const std::experimental::filesystem::path &durability_dir,
int snapshot_max_retained);
} // namespace durability

View File

@ -8,5 +8,5 @@ namespace durability {
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
// The current default version of snapshot and WAL enconding / decoding.
constexpr int64_t kVersion{4};
constexpr int64_t kVersion{5};
} // namespace durability

View File

@ -20,9 +20,9 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
namespace durability {
WriteAheadLog::WriteAheadLog(
const std::experimental::filesystem::path &durability_dir,
int worker_id, const std::experimental::filesystem::path &durability_dir,
bool durability_enabled)
: deltas_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} {
: deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} {
if (durability_enabled) {
CheckDurabilityDir(durability_dir);
wal_file_.Init();
@ -38,8 +38,8 @@ WriteAheadLog::~WriteAheadLog() {
}
WriteAheadLog::WalFile::WalFile(
const std::experimental::filesystem::path &durability_dir)
: wal_dir_{durability_dir / kWalDir} {}
int worker_id, const std::experimental::filesystem::path &durability_dir)
: worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {}
WriteAheadLog::WalFile::~WalFile() {
if (!current_wal_file_.empty()) writer_.Close();
@ -50,7 +50,7 @@ void WriteAheadLog::WalFile::Init() {
LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_;
current_wal_file_ = std::experimental::filesystem::path();
} else {
current_wal_file_ = WalFilenameForTransactionId(wal_dir_);
current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_);
try {
writer_.Open(current_wal_file_);
} catch (std::ios_base::failure &) {
@ -93,7 +93,8 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
void WriteAheadLog::WalFile::RotateFile() {
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_, WalFilenameForTransactionId(wal_dir_, latest_tx_));
current_wal_file_,
WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_));
Init();
}

View File

@ -29,12 +29,12 @@ namespace durability {
*/
class WriteAheadLog {
public:
WriteAheadLog(const std::experimental::filesystem::path &durability_dir,
WriteAheadLog(int worker_id,
const std::experimental::filesystem::path &durability_dir,
bool durability_enabled);
~WriteAheadLog();
/** Enables the WAL. Called at the end of database::GraphDb construction,
* after
/** Enables the WAL. Called at the end of GraphDb construction, after
* (optional) recovery. */
void Enable() { enabled_ = true; }
@ -45,7 +45,7 @@ class WriteAheadLog {
/** Groups the logic of WAL file handling (flushing, naming, rotating) */
class WalFile {
public:
explicit WalFile(const std::experimental::filesystem::path &wal__dir);
WalFile(int worker_id, const std::experimental::filesystem::path &wal__dir);
~WalFile();
/** Initializes the WAL file. Must be called before first flush. Can be
@ -57,6 +57,7 @@ class WriteAheadLog {
void Flush(RingBuffer<database::StateDelta> &buffer);
private:
int worker_id_;
const std::experimental::filesystem::path wal_dir_;
HashedFileWriter writer_;
communication::bolt::PrimitiveEncoder<HashedFileWriter> encoder_{writer_};

View File

@ -422,6 +422,9 @@ TEST_F(Durability, SnapshotEncoding) {
communication::bolt::DecodedValue dv;
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
// Worker id
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), 0);
// Number of generated vertex ids.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsInt());
@ -478,8 +481,8 @@ TEST_F(Durability, SnapshotEncoding) {
EXPECT_EQ(decoded_edges[gid1].type, "et1");
EXPECT_EQ(decoded_edges[gid1].properties.size(), 0);
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
// Vertex and edge counts are included in the hash. Re-read them to update
// the hash.
buffer.ReadType(vertex_count);
buffer.ReadType(edge_count);
buffer.Close();
@ -736,3 +739,37 @@ TEST_F(Durability, SnapshotOnExit) {
}
EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1);
}
TEST_F(Durability, WorkerIdRecovery) {
auto config = DbConfig();
config.worker_id = 5;
database::SingleNode db{config};
MakeDb(db, 100);
MakeSnapshot(db);
EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1);
// WorkerIds are equal and recovery should be sucessful
{
auto config = DbConfig();
config.worker_id = 5;
config.db_recover_on_startup = true;
database::SingleNode recovered{config};
EXPECT_EQ(recovered.WorkerId(), config.worker_id);
CompareDbs(db, recovered);
database::GraphDbAccessor dba(recovered);
EXPECT_NE(dba.VerticesCount(), 0);
EXPECT_NE(dba.EdgesCount(), 0);
}
// WorkerIds are not equal and recovery should fail
{
auto config = DbConfig();
config.worker_id = 10;
config.db_recover_on_startup = true;
database::SingleNode recovered{config};
EXPECT_NE(recovered.WorkerId(), db.WorkerId());
database::GraphDbAccessor dba(recovered);
EXPECT_EQ(dba.VerticesCount(), 0);
EXPECT_EQ(dba.EdgesCount(), 0);
}
}

View File

@ -11,6 +11,7 @@
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "config.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/snapshooter.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
@ -361,6 +362,9 @@ void Convert(const std::vector<std::string> &nodes,
durability::kMagicNumber.size());
encoder.WriteTypedValue(durability::kVersion);
encoder.WriteInt(0); // Worker Id - for this use case it's okay to set to 0
// since we are using a single-node version of
// memgraph here
// The following two entries indicate the starting points for generating new
// Vertex/Edge IDs in the DB. They are only important when there are
// vertices/edges that were moved to another worker (in distributed
@ -417,7 +421,8 @@ std::string GetOutputPath() {
} catch (const std::experimental::filesystem::filesystem_error &error) {
LOG(FATAL) << error.what();
}
return std::string(durability::MakeSnapshotPath(durability_dir));
int worker_id = 0;
return std::string(durability::MakeSnapshotPath(durability_dir, worker_id));
}
int main(int argc, char *argv[]) {