Update snapshot format

Summary:
Set vertex/edge generator id from recovery

Add tests

Reviewers: florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1061
This commit is contained in:
Dominik Gleich 2017-12-20 12:48:19 +01:00
parent 20aa8d563a
commit 1556d78d15
10 changed files with 167 additions and 53 deletions

View File

@ -101,13 +101,11 @@ void GraphDb::Shutdown() {
void GraphDb::StartSnapshooting() {
if (config_.durability_enabled) {
auto create_snapshot = [this]() -> void {
GraphDbAccessor db_accessor(*this);
if (!durability::MakeSnapshot(db_accessor,
if (!durability::MakeSnapshot(*this,
fs::path(config_.durability_directory),
config_.snapshot_max_retained)) {
LOG(WARNING) << "Durability: snapshot creation failed";
}
db_accessor.Commit();
};
snapshot_creator_.Run(std::chrono::seconds(config_.snapshot_cycle_sec),
create_snapshot);
@ -178,11 +176,10 @@ GraphDb::~GraphDb() {
// Create last database snapshot
if (config_.snapshot_on_exit == true) {
GraphDbAccessor db_accessor(*this);
LOG(INFO) << "Creating snapshot on shutdown..." << std::endl;
const bool status = durability::MakeSnapshot(
db_accessor, fs::path(config_.durability_directory),
config_.snapshot_max_retained);
const bool status =
durability::MakeSnapshot(*this, fs::path(config_.durability_directory),
config_.snapshot_max_retained);
if (status) {
std::cout << "Snapshot created successfully." << std::endl;
} else {

View File

@ -103,6 +103,9 @@ class GraphDb {
void CollectGarbage();
gid::GidGenerator &VertexGenerator() { return vertex_generator_; }
gid::GidGenerator &EdgeGenerator() { return edge_generator_; }
/** When this is false, no new transactions should be created. */
std::atomic<bool> is_accepting_transactions_{true};

View File

@ -52,8 +52,7 @@ struct RecoveryData {
return false; \
}
bool RecoverSnapshot(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor,
bool RecoverSnapshot(const fs::path &snapshot_file, GraphDb &db,
RecoveryData &recovery_data) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
@ -77,6 +76,16 @@ bool RecoverSnapshot(const fs::path &snapshot_file,
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
dv.ValueInt() == durability::kVersion);
// Vertex and edge generator ids
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
uint64_t vertex_generator_cnt = dv.ValueInt();
db.VertexGenerator().SetId(
std::max(db.VertexGenerator().LocalCount(), vertex_generator_cnt));
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
uint64_t edge_generator_cnt = dv.ValueInt();
db.EdgeGenerator().SetId(
std::max(db.EdgeGenerator().LocalCount(), edge_generator_cnt));
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
recovery_data.snapshooter_tx_id = dv.ValueInt();
// Transaction snapshot of the transaction that created the snapshot.
@ -98,16 +107,17 @@ bool RecoverSnapshot(const fs::path &snapshot_file,
property.ValueString());
}
GraphDbAccessor dba(db);
for (int64_t i = 0; i < vertex_count; ++i) {
DecodedValue vertex_dv;
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
auto vertex_accessor = db_accessor.InsertVertex(vertex.id);
auto vertex_accessor = dba.InsertVertex(vertex.id);
for (const auto &label : vertex.labels) {
vertex_accessor.add_label(db_accessor.Label(label));
vertex_accessor.add_label(dba.Label(label));
}
for (const auto &property_pair : vertex.properties) {
vertex_accessor.PropsSet(db_accessor.Property(property_pair.first),
vertex_accessor.PropsSet(dba.Property(property_pair.first),
query::TypedValue(property_pair.second));
}
vertices.insert({vertex.id, vertex_accessor});
@ -119,12 +129,11 @@ bool RecoverSnapshot(const fs::path &snapshot_file,
auto it_from = vertices.find(edge.from);
auto it_to = vertices.find(edge.to);
RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
auto edge_accessor =
db_accessor.InsertEdge(it_from->second, it_to->second,
db_accessor.EdgeType(edge.type), edge.id);
auto edge_accessor = dba.InsertEdge(it_from->second, it_to->second,
dba.EdgeType(edge.type), edge.id);
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(db_accessor.Property(property_pair.first),
edge_accessor.PropsSet(dba.Property(property_pair.first),
query::TypedValue(property_pair.second));
}
@ -132,8 +141,12 @@ bool RecoverSnapshot(const fs::path &snapshot_file,
// hash.
reader.ReadType(vertex_count);
reader.ReadType(edge_count);
if (!reader.Close()) return false;
return reader.hash() == hash;
if (!reader.Close() || reader.hash() != hash) {
dba.Abort();
return false;
}
dba.Commit();
return true;
}
#undef RETURN_IF_NOT
@ -266,22 +279,19 @@ bool Recover(const fs::path &durability_dir, GraphDb &db) {
snapshot_files.emplace_back(file);
std::sort(snapshot_files.rbegin(), snapshot_files.rend());
for (auto &snapshot_file : snapshot_files) {
GraphDbAccessor db_accessor{db};
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db_accessor, recovery_data)) {
db_accessor.Abort();
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
recovery_data.Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
continue;
} else {
LOG(INFO) << "Snapshot recovery successful.";
db_accessor.Commit();
break;
}
}
// Write-ahead-log recovery.
GraphDbAccessor db_accessor{db};
GraphDbAccessor db_accessor(db);
// WAL recovery does not have to be complete for the recovery to be
// considered successful. For the time being ignore the return value,
// consider a better system.

View File

@ -27,4 +27,4 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
*/
bool Recover(const std::experimental::filesystem::path &durability_dir,
GraphDb &db);
}
} // namespace durability

View File

@ -16,7 +16,7 @@ namespace fs = std::experimental::filesystem;
namespace durability {
namespace {
bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) {
bool Encode(const fs::path &snapshot_file, GraphDb &db, GraphDbAccessor &dba) {
try {
HashedFileWriter buffer(snapshot_file);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
@ -26,14 +26,19 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) {
durability::kMagicNumber.size());
encoder.WriteInt(durability::kVersion);
// Write the number of generated vertex and edges, used to recover
// generators internal states
encoder.WriteInt(db.VertexGenerator().LocalCount());
encoder.WriteInt(db.EdgeGenerator().LocalCount());
// Write the ID of the transaction doing the snapshot.
encoder.WriteInt(db_accessor_.transaction_id());
encoder.WriteInt(dba.transaction_id());
// Write the transaction snapshot into the snapshot. It's used when
// recovering from the combination of snapshot and write-ahead-log.
{
std::vector<query::TypedValue> tx_snapshot;
for (int64_t tx : db_accessor_.transaction().snapshot())
for (int64_t tx : dba.transaction().snapshot())
tx_snapshot.emplace_back(tx);
encoder.WriteList(tx_snapshot);
}
@ -41,18 +46,18 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) {
// Write label+property indexes as list ["label", "property", ...]
{
std::vector<query::TypedValue> index_vec;
for (const auto &key : db_accessor_.GetIndicesKeys()) {
index_vec.emplace_back(db_accessor_.LabelName(key.label_));
index_vec.emplace_back(db_accessor_.PropertyName(key.property_));
for (const auto &key : dba.GetIndicesKeys()) {
index_vec.emplace_back(dba.LabelName(key.label_));
index_vec.emplace_back(dba.PropertyName(key.property_));
}
encoder.WriteList(index_vec);
}
for (const auto &vertex : db_accessor_.Vertices(false)) {
for (const auto &vertex : dba.Vertices(false)) {
encoder.WriteVertex(vertex);
vertex_num++;
}
for (const auto &edge : db_accessor_.Edges(false)) {
for (const auto &edge : dba.Edges(false)) {
encoder.WriteEdge(edge);
edge_num++;
}
@ -110,14 +115,16 @@ fs::path MakeSnapshotPath(const fs::path &durability_dir) {
return durability_dir / kSnapshotDir / date_str;
}
bool MakeSnapshot(GraphDbAccessor &db_accessor_, const fs::path &durability_dir,
bool MakeSnapshot(GraphDb &db, const fs::path &durability_dir,
const int snapshot_max_retained) {
if (!EnsureDir(durability_dir / kSnapshotDir)) return false;
const auto snapshot_file = MakeSnapshotPath(durability_dir);
if (fs::exists(snapshot_file)) return false;
if (Encode(snapshot_file, db_accessor_)) {
GraphDbAccessor dba(db);
if (Encode(snapshot_file, db, dba)) {
RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained);
RemoveOldWals(durability_dir / kWalDir, db_accessor_.transaction());
RemoveOldWals(durability_dir / kWalDir, dba.transaction());
dba.Commit();
return true;
} else {
std::error_code error_code; // Just for exception suppression.

View File

@ -2,7 +2,7 @@
#include <experimental/filesystem>
class GraphDbAccessor;
#include "database/graph_db.hpp"
namespace durability {
using path = std::experimental::filesystem::path;
@ -14,10 +14,10 @@ path MakeSnapshotPath(const path &durability_dir);
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db_accessor- GraphDbAccessor used to access elements of GraphDb.
* @param db - database for which we are creating a snapshot
* @param durability_dir - directory where durability data is stored.
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
bool MakeSnapshot(GraphDbAccessor &db_accessor, const path &durability_dir,
bool MakeSnapshot(GraphDb &db, const path &durability_dir,
int snapshot_max_retained);
}
} // namespace durability

View File

@ -8,5 +8,5 @@ namespace durability {
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
// The current default version of snapshot and WAL enconding / decoding.
constexpr int64_t kVersion{3};
}
constexpr int64_t kVersion{4};
} // namespace durability

View File

@ -34,8 +34,12 @@ static inline Gid Create(uint64_t worker_id, uint64_t local_id) {
}
/**
* @brief - Threadsafe generation of new global ids which belong to the
* worker_id machine
* Threadsafe generation of new global ids which belong to the
* worker_id machine. Never call SetId after calling Next without an Id you are
* sure is going to be used for gid, i.e. SetId should only be called before
* first Next call. We want to make sure that every id that we generate is
* larger than the id set by SetId, we can ensure that by not allowing calls to
* SetId after Next which generated new id (incremented internal id counter).
*/
class GidGenerator {
public:
@ -45,15 +49,29 @@ class GidGenerator {
* @param local_id - force local id instead of generating a new one
*/
gid::Gid Next(std::experimental::optional<Gid> local_id) {
auto id = local_id ? *local_id : id_++;
auto id = local_id ? *local_id : next_local_id_++;
if (local_id) {
utils::EnsureAtomicGe(id_, id + 1);
utils::EnsureAtomicGe(next_local_id_, id + 1);
} else {
generated_id_ = true;
}
return gid::Create(worker_id_, id);
}
/// Returns number of locally generated ids
uint64_t LocalCount() const { return next_local_id_; };
// Sets a new id from which every new gid will be generated, should only be
// set before first Next is called
void SetId(uint64_t id) {
DCHECK(!generated_id_)
<< "Id should be set only before first id is generated";
next_local_id_ = id;
}
private:
bool generated_id_{false};
int worker_id_;
std::atomic<Gid> id_{0};
std::atomic<uint64_t> next_local_id_{0};
};
}; // namespace gid
} // namespace gid

View File

@ -148,7 +148,7 @@ class DbGenerator {
}
};
/** Returns true if the given databases have the same contents (indices,
/** Checks if the given databases have the same contents (indices,
* vertices and edges). */
void CompareDbs(GraphDb &a, GraphDb &b) {
GraphDbAccessor dba_a(a);
@ -288,10 +288,8 @@ class Durability : public ::testing::Test {
}
void MakeSnapshot(GraphDb &db, int snapshot_max_retained = -1) {
GraphDbAccessor dba(db);
ASSERT_TRUE(
durability::MakeSnapshot(dba, durability_dir_, snapshot_max_retained));
dba.Commit();
durability::MakeSnapshot(db, durability_dir_, snapshot_max_retained));
}
void SetUp() override {
@ -418,6 +416,12 @@ TEST_F(Durability, SnapshotEncoding) {
communication::bolt::DecodedValue dv;
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
// Number of generated vertex ids.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsInt());
// Number of generated edge ids.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsInt());
// Transaction ID.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsInt());
@ -494,6 +498,73 @@ TEST_F(Durability, SnapshotRecovery) {
}
}
TEST_F(Durability, SnapshotNoVerticesIdRecovery) {
GraphDb db{DbConfig()};
MakeDb(db, 10);
// Erase all vertices, this should cause snapshot to not have any more
// vertices which should make it not change any id after snapshot recovery,
// but we still have to make sure that the id for generators is recovered
{
GraphDbAccessor dba(db);
for (auto vertex : dba.Vertices(false)) dba.RemoveVertex(vertex);
dba.Commit();
}
MakeSnapshot(db);
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
EXPECT_EQ(db.VertexGenerator().LocalCount(),
recovered.VertexGenerator().LocalCount());
EXPECT_EQ(db.EdgeGenerator().LocalCount(),
recovered.EdgeGenerator().LocalCount());
}
}
TEST_F(Durability, SnapshotAndWalIdRecovery) {
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
MakeDb(db, 300);
MakeSnapshot(db);
MakeDb(db, 300);
// Sleep to ensure the WAL gets flushed.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
ASSERT_EQ(DirFiles(snapshot_dir_).size(), 1);
EXPECT_GT(DirFiles(wal_dir_).size(), 1);
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
EXPECT_EQ(db.VertexGenerator().LocalCount(),
recovered.VertexGenerator().LocalCount());
EXPECT_EQ(db.EdgeGenerator().LocalCount(),
recovered.EdgeGenerator().LocalCount());
}
}
TEST_F(Durability, OnlyWalIdRecovery) {
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
MakeDb(db, 300);
// Sleep to ensure the WAL gets flushed.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
ASSERT_EQ(DirFiles(snapshot_dir_).size(), 0);
EXPECT_GT(DirFiles(wal_dir_).size(), 1);
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
EXPECT_EQ(db.VertexGenerator().LocalCount(),
recovered.VertexGenerator().LocalCount());
EXPECT_EQ(db.EdgeGenerator().LocalCount(),
recovered.EdgeGenerator().LocalCount());
}
}
TEST_F(Durability, WalRecovery) {
auto config = DbConfig();
config.durability_enabled = true;

View File

@ -342,6 +342,14 @@ void Convert(const std::vector<std::string> &nodes,
encoder.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder.WriteTypedValue(durability::kVersion);
// The following two entries indicate the starting points for generating new
// Vertex/Edge IDs in the DB. They are only important when there are
// vertices/edges that were moved to another worker (in distributed
// Memgraph), so it's safe to set them to 0 in snapshot generation.
encoder.WriteInt(0); // Internal Id of vertex generator
encoder.WriteInt(0); // Internal Id of edge generator
encoder.WriteInt(0); // Id of transaction that is snapshooting.
encoder.WriteList({}); // Transactional snapshot.
encoder.WriteList({}); // Label + property indexes.
@ -373,8 +381,8 @@ std::string GetOutputPath() {
// If we have the 'out' flag, use that.
if (!utils::Trim(FLAGS_out).empty()) return FLAGS_out;
// Without the 'out', fall back to reading the memgraph configuration for
// durability_directory. Hopefully, memgraph configuration doesn't contain other
// flags which are defined in this file.
// durability_directory. Hopefully, memgraph configuration doesn't contain
// other flags which are defined in this file.
LoadConfig();
// Without durability_directory, we have to require 'out' flag.
if (utils::Trim(FLAGS_durability_directory).empty())