Write-ahead log

Summary:
My dear fellow Memgraphians. It's Friday afternoon, and I am as ready to pop as WAL is to get reviewed...

What's done:
- Vertices and Edges have global IDs, stored in `VersionList`. Main storage is now a concurrent map ID->vlist_ptr.
- WriteAheadLog class added. It's based around buffering WAL::Op objects (elementary DB changes) and periodically serializing and flushing them to disk.
- Snapshot recovery refactored, WAL recovery added. Snapshot format changed again to include necessary info.
- Durability testing completely reworked.

What's not done (and should be when we decide how):
- Old WAL file purging.
- Config refactor (naming and organization). Will do when we discuss what we want.
- Changelog and new feature documentation (both depending on the point above).
- Better error handling and recovery feedback. Currently it's all returning bools, which is not fine-grained enough (neither for errors nor partial successes, also EOF is reported as a failure at the moment).
- Moving the implementation of WAL stuff to .cpp where possible.
- Not sure if there are transactions being created outside of `GraphDbAccessor` and its `BuildIndex`. Need to look into it.
- True write-ahead logic (flag controlled): not committing a DB transaction if the WAL has not flushed its data. We can discuss the gain/effort ratio for this feature.

Reviewers: buda, mislav.bradac, teon.banek, dgleich

Reviewed By: dgleich

Subscribers: mtomic, pullbot

Differential Revision: https://phabricator.memgraph.io/D958
This commit is contained in:
florijan 2017-11-13 09:50:49 +01:00
parent 1d5245cb13
commit 1e0ac8ab8f
49 changed files with 1916 additions and 800 deletions

View File

@ -194,6 +194,7 @@ set(memgraph_src_files
${src_dir}/database/graph_db_accessor.cpp
${src_dir}/durability/recovery.cpp
${src_dir}/durability/snapshooter.cpp
${src_dir}/durability/wal.cpp
${src_dir}/io/network/addrinfo.cpp
${src_dir}/io/network/network_endpoint.cpp
${src_dir}/io/network/socket.cpp

View File

@ -15,7 +15,10 @@
--snapshot-cycle-sec=-1
# create snapshot disabled on db exit
--snapshot-on-exit=true
--snapshot-on-exit=false
# disable WAL
--wal-flush-interval-millis=-1
# max number of snapshots which will be kept on the disk at some point
# if set to -1 the max number of snapshots is unlimited

View File

@ -192,6 +192,40 @@ DecodedValue::operator query::TypedValue() const {
}
}
/**
 * Converts this decoded value into a PropertyValue.
 *
 * Null, Bool, Int, Double and String are converted directly. List and Map
 * are converted element by element using this same conversion. Vertex, Edge,
 * UnboundedEdge and Path cannot be converted and cause a
 * DecodedValueException to be thrown.
 */
DecodedValue::operator PropertyValue() const {
  switch (type_) {
    // Graph elements and paths are not supported by this conversion.
    case Type::Vertex:
    case Type::Edge:
    case Type::UnboundedEdge:
    case Type::Path:
      throw DecodedValueException(
          "Unsupported conversion from DecodedValue to PropertyValue");

    // Primitive values.
    case Type::Null:
      return PropertyValue::Null;
    case Type::Bool:
      return PropertyValue(bool_v);
    case Type::Int:
      return PropertyValue(int_v);
    case Type::Double:
      return PropertyValue(double_v);
    case Type::String:
      return PropertyValue(string_v);

    // Containers: convert every contained value recursively.
    case Type::List: {
      std::vector<PropertyValue> converted;
      converted.reserve(list_v.size());
      for (const auto &element : list_v) {
        converted.emplace_back(static_cast<PropertyValue>(element));
      }
      return PropertyValue(std::move(converted));
    }
    case Type::Map: {
      std::map<std::string, PropertyValue> converted;
      for (const auto &entry : map_v) {
        converted.emplace(entry.first,
                          static_cast<PropertyValue>(entry.second));
      }
      return PropertyValue(std::move(converted));
    }
  }
}
std::ostream &operator<<(std::ostream &os, const DecodedVertex &vertex) {
os << "V(";
utils::PrintIterable(os, vertex.labels, ":",

View File

@ -161,8 +161,9 @@ class DecodedValue {
#undef TYPE_CHECKER
// conversion function to TypedValue
operator query::TypedValue() const;
// PropertyValue operator must be explicit to prevent ambiguity.
explicit operator PropertyValue() const;
friend std::ostream &operator<<(std::ostream &os, const DecodedValue &value);

View File

@ -6,7 +6,6 @@
#include "communication/bolt/v1/codes.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "database/graph_db_accessor.hpp"
#include "utils/bswap.hpp"
#include "utils/underlying_cast.hpp"

View File

@ -40,7 +40,7 @@ class BaseEncoder : public PrimitiveEncoder<Buffer> {
void WriteVertex(const VertexAccessor &vertex) {
this->WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
this->WriteRAW(underlying_cast(Signature::Node));
WriteUInt(vertex.temporary_id());
WriteUInt(vertex.id());
// write labels
const auto &labels = vertex.labels();
@ -62,10 +62,10 @@ class BaseEncoder : public PrimitiveEncoder<Buffer> {
this->WriteRAW(underlying_cast(unbound ? Signature::UnboundRelationship
: Signature::Relationship));
WriteUInt(edge.temporary_id());
WriteUInt(edge.id());
if (!unbound) {
WriteUInt(edge.from().temporary_id());
WriteUInt(edge.to().temporary_id());
WriteUInt(edge.from().id());
WriteUInt(edge.to().id());
}
// write type

View File

@ -13,7 +13,8 @@
/**
* A thread-safe ring buffer. Multi-producer, multi-consumer. Producers get
* blocked if the buffer is full. Consumers get returnd a nullopt.
* blocked if the buffer is full. Consumers are returned a nullopt if the buffer
* is empty. First in, first out.
*
* @tparam TElement - type of element the buffer tracks.
*/
@ -33,6 +34,11 @@ class RingBuffer {
delete[] buffer_;
}
/**
* Emplaces a new element into the buffer. This call blocks until space in the
* buffer is available. If multiple threads are waiting for space to become
* available, there are no order-of-entrance guarantees.
*/
template <typename... TArgs>
void emplace(TArgs &&... args) {
while (true) {
@ -53,6 +59,10 @@ class RingBuffer {
}
}
/**
* Removes and returns the oldest element from the buffer. If the buffer is
* empty, nullopt is returned.
*/
std::experimental::optional<TElement> pop() {
std::lock_guard<SpinLock> guard(lock_);
if (size_ == 0) return std::experimental::nullopt;
@ -63,6 +73,14 @@ class RingBuffer {
return result;
}
/**
 * Removes all elements from the buffer.
 *
 * Takes the buffer lock and resets the element count and both positions to
 * zero.
 * NOTE(review): only the bookkeeping is reset here, the stored elements are
 * not destroyed or overwritten by this call - confirm that is acceptable for
 * element types that hold resources.
 */
void clear() {
  std::lock_guard<SpinLock> guard(lock_);
  size_ = 0;
  write_pos_ = 0;
  read_pos_ = 0;
}
private:
int capacity_;
TElement *buffer_;

View File

@ -7,6 +7,7 @@
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
#include "utils/timer.hpp"
@ -47,9 +48,9 @@ GraphDb::GraphDb()
[this]() { CollectGarbage(); });
}
if (FLAGS_snapshot_recover_on_startup) {
RecoverDatabase(FLAGS_snapshot_directory);
}
if (FLAGS_snapshot_recover_on_startup)
durability::Recover(FLAGS_snapshot_directory, *this);
wal_.Enable();
StartSnapshooting();
if (FLAGS_query_execution_time_sec != -1) {
@ -77,39 +78,17 @@ void GraphDb::StartSnapshooting() {
if (FLAGS_snapshot_cycle_sec != -1) {
auto create_snapshot = [this]() -> void {
GraphDbAccessor db_accessor(*this);
snapshooter_.MakeSnapshot(db_accessor, fs::path(FLAGS_snapshot_directory),
FLAGS_snapshot_max_retained);
if (!durability::MakeSnapshot(db_accessor, fs::path(FLAGS_snapshot_directory),
FLAGS_snapshot_max_retained)) {
LOG(WARNING) << "Durability: snapshot creation failed";
}
db_accessor.Commit();
};
snapshot_creator_.Run(std::chrono::seconds(FLAGS_snapshot_cycle_sec),
create_snapshot);
}
}
void GraphDb::RecoverDatabase(const fs::path &snapshot_db_dir) {
if (snapshot_db_dir.empty()) return;
std::vector<fs::path> snapshots;
for (auto &snapshot_file : fs::directory_iterator(snapshot_db_dir)) {
if (fs::is_regular_file(snapshot_file)) {
snapshots.push_back(snapshot_file);
}
}
std::sort(snapshots.rbegin(), snapshots.rend());
Recovery recovery;
for (auto &snapshot_file : snapshots) {
GraphDbAccessor db_accessor(*this);
std::cout << "Starting database recovery from snapshot " << snapshot_file
<< std::endl;
if (recovery.Recover(snapshot_file.string(), db_accessor)) {
std::cout << "Recovery successful." << std::endl;
return;
} else {
LOG(ERROR) << "Recovery unsuccessful, trying older snapshot..."
<< std::endl;
}
}
}
void GraphDb::CollectGarbage() {
// main garbage collection logic
// see wiki documentation for logic explanation
@ -176,7 +155,7 @@ GraphDb::~GraphDb() {
if (FLAGS_snapshot_on_exit == true) {
GraphDbAccessor db_accessor(*this);
LOG(INFO) << "Creating snapshot on shutdown..." << std::endl;
const bool status = snapshooter_.MakeSnapshot(
const bool status = durability::MakeSnapshot(
db_accessor, fs::path(FLAGS_snapshot_directory),
FLAGS_snapshot_max_retained);
if (status) {
@ -188,8 +167,8 @@ GraphDb::~GraphDb() {
// Delete vertices and edges which weren't collected before, also deletes
// records inside version list
for (auto &vertex : vertices_.access()) delete vertex;
for (auto &edge : edges_.access()) delete edge;
for (auto &id_vlist : vertices_.access()) delete id_vlist.second;
for (auto &id_vlist : edges_.access()) delete id_vlist.second;
// Free expired records with the maximal possible id from all the deleters.
edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());

View File

@ -11,7 +11,7 @@
#include "database/graph_db_datatypes.hpp"
#include "database/indexes/key_index.hpp"
#include "database/indexes/label_property_index.hpp"
#include "durability/snapshooter.hpp"
#include "durability/wal.hpp"
#include "mvcc/version_list.hpp"
#include "storage/deferred_deleter.hpp"
#include "storage/edge.hpp"
@ -74,22 +74,21 @@ class GraphDb {
void StartSnapshooting();
/**
* Recovers database from a snapshot file and starts snapshooting.
* @param snapshot_db path to snapshot folder
*/
void RecoverDatabase(const fs::path &snapshot_db_path);
/** transaction engine related to this database */
tx::Engine tx_engine_;
std::atomic<int64_t> next_vertex_id_{0};
std::atomic<int64_t> next_edge_id{0};
// main storage for the graph
SkipList<mvcc::VersionList<Vertex> *> vertices_;
SkipList<mvcc::VersionList<Edge> *> edges_;
ConcurrentMap<int64_t, mvcc::VersionList<Vertex> *> vertices_;
ConcurrentMap<int64_t, mvcc::VersionList<Edge> *> edges_;
// Garbage collectors
GarbageCollector<Vertex> gc_vertices_;
GarbageCollector<Edge> gc_edges_;
GarbageCollector<ConcurrentMap<int64_t, mvcc::VersionList<Vertex> *>, Vertex>
gc_vertices_;
GarbageCollector<ConcurrentMap<int64_t, mvcc::VersionList<Edge> *>, Edge>
gc_edges_;
// Deleters for not relevant records
DeferredDeleter<Vertex> vertex_record_deleter_;
@ -117,8 +116,7 @@ class GraphDb {
*/
std::atomic<bool> index_build_in_progress_{false};
// snapshooter
Snapshooter snapshooter_;
durability::WriteAheadLog wal_;
// Schedulers
Scheduler gc_scheduler_;

View File

@ -8,7 +8,9 @@
#include "utils/on_scope_exit.hpp"
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db), transaction_(db.tx_engine_.Begin()) {}
: db_(db), transaction_(db.tx_engine_.Begin()) {
db_.wal_.TxBegin(transaction_->id_);
}
GraphDbAccessor::~GraphDbAccessor() {
if (!commited_ && !aborted_) {
@ -16,6 +18,10 @@ GraphDbAccessor::~GraphDbAccessor() {
}
}
// Returns the ID of the transaction held by this accessor.
tx::transaction_id_t GraphDbAccessor::transaction_id() const {
  const auto &own_transaction = *transaction_;
  return own_transaction.id_;
}
void GraphDbAccessor::AdvanceCommand() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
transaction_->engine_.Advance(transaction_->id_);
@ -23,13 +29,17 @@ void GraphDbAccessor::AdvanceCommand() {
// Commits this accessor's transaction and reports the commit to the
// write-ahead log. Must not be called on an accessor that is already
// committed or aborted (checked with DCHECK).
void GraphDbAccessor::Commit() {
  DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";

  // The ID is copied out before the transaction is committed.
  // NOTE(review): presumably because transaction_ should not be read after
  // Commit() - confirm against the transaction engine.
  auto committed_tx_id = transaction_->id_;
  transaction_->Commit();

  // The WAL is notified only after the transaction itself has committed.
  db_.wal_.TxCommit(committed_tx_id);
  commited_ = true;
}
// Aborts this accessor's transaction and reports the abort to the
// write-ahead log. Must not be called on an accessor that is already
// committed or aborted (checked with DCHECK).
void GraphDbAccessor::Abort() {
  DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";

  // The ID is copied out before the transaction is aborted.
  // NOTE(review): presumably because transaction_ should not be read after
  // Abort() - confirm against the transaction engine.
  auto aborted_tx_id = transaction_->id_;
  transaction_->Abort();

  // The WAL is notified only after the transaction itself has aborted.
  db_.wal_.TxAbort(aborted_tx_id);
  aborted_ = true;
}
@ -38,17 +48,51 @@ bool GraphDbAccessor::should_abort() const {
return transaction_->should_abort();
}
VertexAccessor GraphDbAccessor::InsertVertex() {
// Gives access to the write-ahead log owned by the underlying database.
durability::WriteAheadLog &GraphDbAccessor::wal() {
  auto &database_wal = db_.wal_;
  return database_wal;
}
VertexAccessor GraphDbAccessor::InsertVertex(
std::experimental::optional<int64_t> opt_id) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// create a vertex
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_);
auto id = opt_id ? *opt_id : db_.next_vertex_id_++;
if (opt_id) {
while (true) {
auto next_id = db_.next_vertex_id_.load();
if (next_id > id) break;
if (db_.next_vertex_id_.compare_exchange_strong(next_id, id + 1)) break;
}
}
bool success = db_.vertices_.access().insert(vertex_vlist).second;
CHECK(success) << "It is impossible for new version list to already exist";
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_, id);
bool success = db_.vertices_.access().insert(id, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id;
db_.wal_.CreateVertex(transaction_->id_, vertex_vlist->id_);
return VertexAccessor(*vertex_vlist, *this);
}
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
int64_t id, bool current_state) {
auto collection_accessor = db_.vertices_.access();
auto found = collection_accessor.find(id);
if (found == collection_accessor.end()) return std::experimental::nullopt;
VertexAccessor record_accessor(*found->second, *this);
if (!Visible(record_accessor, current_state))
return std::experimental::nullopt;
return record_accessor;
}
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
int64_t id, bool current_state) {
auto collection_accessor = db_.edges_.access();
auto found = collection_accessor.find(id);
if (found == collection_accessor.end()) return std::experimental::nullopt;
EdgeAccessor record_accessor(*found->second, *this);
if (!Visible(record_accessor, current_state))
return std::experimental::nullopt;
return record_accessor;
}
void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
const GraphDbTypes::Property &property) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
@ -87,6 +131,10 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
// We must notify the WAL about this transaction manually since it's not
// handled by a GraphDbAccessor.
db_.wal_.TxBegin(wait_transaction->id_);
db_.wal_.TxCommit(wait_transaction->id_);
wait_transaction->Commit();
}
@ -98,8 +146,12 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
vertex.current_);
}
// Commit transaction as we finished applying method on newest visible
// records.
// records. Write that transaction's ID to the WAL as the index has been built
// at this point even if this DBA's transaction aborts for some reason.
auto wal_build_index_tx_id = dba.transaction_id();
dba.Commit();
db_.wal_.BuildIndex(wal_build_index_tx_id, LabelName(label),
PropertyName(property));
// After these two operations we are certain that everything is contained in
// the index under the assumption that this transaction contained no
@ -202,6 +254,8 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0)
return false;
db_.wal_.RemoveVertex(transaction_->id_, vertex_accessor.vlist_->id_);
vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_);
return true;
}
@ -223,18 +277,26 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_);
}
EdgeAccessor GraphDbAccessor::InsertEdge(VertexAccessor &from,
VertexAccessor &to,
GraphDbTypes::EdgeType edge_type) {
EdgeAccessor GraphDbAccessor::InsertEdge(
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType edge_type,
std::experimental::optional<int64_t> opt_id) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// create an edge
auto edge_vlist = new mvcc::VersionList<Edge>(*transaction_, *from.vlist_,
auto id = opt_id ? *opt_id : db_.next_edge_id++;
if (opt_id) {
while (true) {
auto next_id = db_.next_edge_id.load();
if (next_id > id) break;
if (db_.next_edge_id.compare_exchange_strong(next_id, id + 1)) break;
}
}
auto edge_vlist = new mvcc::VersionList<Edge>(*transaction_, id, *from.vlist_,
*to.vlist_, edge_type);
// We need to insert edge_vlist to edges_ before calling update since update
// can throw and edge_vlist will not be garbage collected if it is not in
// edges_ skiplist.
bool success = db_.edges_.access().insert(edge_vlist).second;
CHECK(success) << "It is impossible for new version list to already exist";
bool success = db_.edges_.access().insert(id, edge_vlist).second;
CHECK(success) << "Attempting to insert an edge with an existing ID: " << id;
// ensure that the "from" accessor has the latest version
from.SwitchNew();
@ -245,10 +307,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge(VertexAccessor &from,
to.SwitchNew();
to.update().in_.emplace(from.vlist_, edge_vlist, edge_type);
// This has to be here because there is no additional method for setting edge
// type.
const auto edge_accessor = EdgeAccessor(*edge_vlist, *this);
return edge_accessor;
db_.wal_.CreateEdge(transaction_->id_, edge_vlist->id_, from.vlist_->id_,
to.vlist_->id_, EdgeTypeName(edge_type));
return EdgeAccessor(*edge_vlist, *this);
}
int64_t GraphDbAccessor::EdgesCount() const {
@ -269,6 +330,8 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor,
if (remove_from_to)
edge_accessor.to().update().in_.RemoveEdge(edge_accessor.vlist_);
edge_accessor.vlist_->remove(edge_accessor.current_, *transaction_);
db_.wal_.RemoveEdge(transaction_->id_, edge_accessor.id());
}
GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) {

View File

@ -31,12 +31,10 @@ class IndexBuildInProgressException : public utils::BasicException {
};
/**
* An accessor for the database object: exposes functions
* for operating on the database. All the functions in
* this class should be self-sufficient: for example the
* function for creating
* a new Vertex should take care of all the book-keeping around
* the creation.
* An accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for
* example the function for creating a new Vertex should take care of all the
* book-keeping around the creation.
*/
class GraphDbAccessor {
@ -56,26 +54,33 @@ class GraphDbAccessor {
explicit GraphDbAccessor(GraphDb &db);
~GraphDbAccessor();
// the GraphDbAccessor can NOT be copied nor moved because
// 1. it ensures transaction cleanup once it's destructed
// 2. it will contain index and side-effect bookkeeping data
// which is unique to the transaction (shared_ptr works but slower)
GraphDbAccessor(const GraphDbAccessor &other) = delete;
GraphDbAccessor(GraphDbAccessor &&other) = delete;
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
/**
* Creates a new Vertex and returns an accessor to it.
* Creates a new Vertex and returns an accessor to it. If the ID is
* provided, the created Vertex will have that ID, and the ID counter will be
* increased to it so collisions are avoided. This should only be used by
* durability recovery, normal vertex creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param opt_id The desired ID. Should only be provided when recovering from
* durability.
* @return See above.
*/
VertexAccessor InsertVertex();
VertexAccessor InsertVertex(
std::experimental::optional<int64_t> opt_id = std::experimental::nullopt);
/**
* Removes the vertex of the given accessor. If the vertex has any outgoing
* or incoming edges, it is not deleted. See `DetachRemoveVertex` if you
* want to remove a vertex regardless of connectivity.
* Removes the vertex of the given accessor. If the vertex has any outgoing or
* incoming edges, it is not deleted. See `DetachRemoveVertex` if you want to
* remove a vertex regardless of connectivity.
*
* If the vertex has already been deleted by the current transaction+command,
* this function will not do anything and will return true.
@ -93,6 +98,20 @@ class GraphDbAccessor {
*/
void DetachRemoveVertex(VertexAccessor &vertex_accessor);
/**
* Obtains the vertex for the given ID. If there is no vertex for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param id - The ID of the sought vertex.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<VertexAccessor> FindVertex(int64_t id,
bool current_state);
/**
* Returns iterable over accessors to all the vertices in the graph
* visible to the current transaction.
@ -105,18 +124,16 @@ class GraphDbAccessor {
auto Vertices(bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors =
iter::imap([this](auto vlist) { return VertexAccessor(*vlist, *this); },
db_.vertices_.access());
auto accessors = iter::imap(
[this](auto id_vlist) {
return VertexAccessor(*id_vlist.second, *this);
},
db_.vertices_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const VertexAccessor &accessor) {
return (accessor.old_ &&
!(current_state &&
accessor.old_->is_expired_by(*transaction_))) ||
(current_state && accessor.new_ &&
!accessor.new_->is_expired_by(*transaction_));
return Visible(accessor, current_state);
},
std::move(accessors));
}
@ -141,6 +158,7 @@ class GraphDbAccessor {
/**
* Return VertexAccessors which contain the current label and property for the
* given transaction visibility.
*
* @param label - label for which to return VertexAccessors
* @param property - property for which to return VertexAccessors
* @param current_state If true then the graph state for the
@ -235,15 +253,26 @@ class GraphDbAccessor {
}
/**
* Creates a new Edge and returns an accessor to it.
* Creates a new Edge and returns an accessor to it. If the ID is
* provided, the created Edge will have that ID, and the ID counter will be
* increased to it so collisions are avoided. This should only be used by
* durability recovery, normal edge creation should not provide the ID.
*
* You should NOT make interleaved recovery and normal DB op calls to this
* function. Doing so will likely mess up the ID generation and crash MG.
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param from The 'from' vertex.
* @param to The 'to' vertex.
* @param type Edge type.
* @param opt_id The desired ID. Should only be provided when recovering from
* durability.
* @return An accessor to the edge.
*/
EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to,
GraphDbTypes::EdgeType type);
EdgeAccessor InsertEdge(
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType type,
std::experimental::optional<int64_t> opt_id = std::experimental::nullopt);
/**
* Removes an edge from the graph. Parameters can indicate if the edge should
@ -260,6 +289,19 @@ class GraphDbAccessor {
void RemoveEdge(EdgeAccessor &edge_accessor, bool remove_from = true,
bool remove_to = true);
/**
* Obtains the edge for the given ID. If there is no edge for the given
* ID, or it's not visible to this accessor's transaction, nullopt is
* returned.
*
* @param id - The ID of the sought edge.
* @param current_state If true then the graph state for the
* current transaction+command is returned (insertions, updates and
* deletions performed in the current transaction+command are not
* ignored).
*/
std::experimental::optional<EdgeAccessor> FindEdge(int64_t id,
bool current_state);
/**
* Returns iterable over accessors to all the edges in the graph
* visible to the current transaction.
@ -273,18 +315,14 @@ class GraphDbAccessor {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// wrap version lists into accessors, which will look for visible versions
auto accessors =
iter::imap([this](auto vlist) { return EdgeAccessor(*vlist, *this); },
db_.edges_.access());
auto accessors = iter::imap(
[this](auto id_vlist) { return EdgeAccessor(*id_vlist.second, *this); },
db_.edges_.access());
// filter out the accessors not visible to the current transaction
return iter::filter(
[this, current_state](const EdgeAccessor &accessor) {
return (accessor.old_ &&
!(current_state &&
accessor.old_->is_expired_by(*transaction_))) ||
(current_state && accessor.new_ &&
!accessor.new_->is_expired_by(*transaction_));
return Visible(accessor, current_state);
},
std::move(accessors));
}
@ -301,8 +339,8 @@ class GraphDbAccessor {
* `accessor`. It has default post-construction `current_` set (`old` if
* available, otherwise `new`).
*
* @param accessor The [Vertex/Edge]Accessor whose underlying graph
* element we want in this GraphDbAccessor.
* @param accessor The [Vertex/Edge]Accessor whose underlying graph element we
* want in this GraphDbAccessor.
* @return See above.
* @tparam TAccessor Either VertexAccessor or EdgeAccessor
*/
@ -323,8 +361,8 @@ class GraphDbAccessor {
* existing vertices that belong to it.
*
* You should never call BuildIndex on a GraphDbAccessor (transaction) on
* which new vertices have been inserted or existing ones updated. Do it in a
* new accessor instead.
* which new vertices have been inserted or existing ones updated. Do it
* in a new accessor instead.
*
* Build index throws if an index for the given (label, property) already
* exists (even if it's being built by a concurrent transaction and is not yet
@ -355,7 +393,7 @@ class GraphDbAccessor {
*/
std::vector<LabelPropertyIndex::Key> GetIndicesKeys() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.label_property_index_.GetIndicesKeys();
return db_.label_property_index_.Keys();
}
/**
@ -373,6 +411,7 @@ class GraphDbAccessor {
/**
* Return approximate number of vertices under indexes with the given label.
* Note that this is always an over-estimate and never an under-estimate.
*
* @param label - label to check for
* @return number of vertices with the given label
*/
@ -380,8 +419,9 @@ class GraphDbAccessor {
/**
* Return approximate number of vertices under indexes with the given label
* and property.
* Note that this is always an over-estimate and never an under-estimate.
* and property. Note that this is always an over-estimate and never an
* under-estimate.
*
* @param label - label to check for
* @param property - property to check for
* @return number of vertices with the given label, fails if no such
@ -457,6 +497,9 @@ class GraphDbAccessor {
*/
const std::string &PropertyName(const GraphDbTypes::Property property) const;
/** Returns the id of this accessor's transaction */
tx::transaction_id_t transaction_id() const;
/**
* Advances transaction's command id by 1.
*/
@ -480,6 +523,9 @@ class GraphDbAccessor {
/** Returns the transaction of this accessor */
const tx::Transaction &transaction() const { return *transaction_; }
/** Returns the database's write-ahead log */
durability::WriteAheadLog &wal();
/**
* Initializes the record pointers in the given accessor.
* The old_ and new_ pointers need to be initialized
@ -539,8 +585,8 @@ class GraphDbAccessor {
/**
* Sets the counter with the given name to the given value. Returns nothing.
* If the counter with the given name does not exist, a new counter is
* created and set to the given value.
* If the counter with the given name does not exist, a new counter is created
* and set to the given value.
*/
void CounterSet(const std::string &name, int64_t value);
@ -553,6 +599,7 @@ class GraphDbAccessor {
/**
* Insert this vertex into corresponding label and label+property (if it
* exists) index.
*
* @param label - label with which to insert vertex label record
* @param vertex_accessor - vertex_accessor to insert
* @param vertex - vertex record to insert
@ -572,6 +619,17 @@ class GraphDbAccessor {
const RecordAccessor<Vertex> &record_accessor,
const Vertex *const vertex);
/**
 * Tells whether the given accessor (made with this GraphDbAccessor) is
 * visible given the `current_state` flag.
 *
 * Without `current_state` the accessor is visible exactly when it has an old
 * record. With `current_state` it is visible when it has an old or a new
 * record that is not expired by this accessor's transaction.
 */
template <typename TRecord>
bool Visible(const RecordAccessor<TRecord> &accessor,
             bool current_state) const {
  if (!current_state) {
    // Only the old record counts; expiry is not consulted in this mode.
    if (accessor.old_) return true;
    return false;
  }
  // The old record is checked first, the new one only if the old one does not
  // qualify.
  if (accessor.old_ && !accessor.old_->is_expired_by(*transaction_))
    return true;
  return accessor.new_ && !accessor.new_->is_expired_by(*transaction_);
}
GraphDb &db_;
/** The current transaction */

View File

@ -376,17 +376,6 @@ class LabelPropertyIndex {
});
}
/**
* @brief - Returns vector of keys of label-property indice.
*/
std::vector<Key> GetIndicesKeys() {
std::vector<Key> indices;
for (auto index : indices_.access()) {
indices.push_back(index.first);
}
return indices;
}
/**
* Returns a vector of keys present in this index.
*/

View File

@ -8,6 +8,8 @@
/**
* Buffer that writes data to file and calculates hash of written data.
* Implements template param Buffer interface from BaseEncoder class.
*
* All of the methods on a HashedFileWriter can throw an exception.
*/
class HashedFileWriter {
public:
@ -21,7 +23,13 @@ class HashedFileWriter {
output_stream_.open(path, std::ios::out | std::ios::binary);
}
/** Closes ofstream. */
/**
 * Opens the writer on the file at the given path.
 *
 * The underlying stream is opened for binary output and the hasher is
 * replaced with a freshly constructed one.
 *
 * @param path Path of the file to open.
 */
void Open(const std::string &path) {
output_stream_.open(path, std::ios::out | std::ios::binary);
// Start from a new Hasher instance for this file.
hasher_ = Hasher();
}
/** Closes the writer. */
void Close() { output_stream_.close(); }
/**

View File

@ -1,14 +1,19 @@
#include "durability/recovery.hpp"
#include <limits>
#include <unordered_map>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/version.hpp"
#include "durability/wal.hpp"
#include "query/typed_value.hpp"
#include "transactions/type.hpp"
#include "utils/string.hpp"
using communication::bolt::DecodedValue;
namespace durability {
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash) {
auto pos = buffer.Tellg();
@ -20,35 +25,43 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
buffer.Seek(pos);
return r_val;
}
}
bool Recovery::Recover(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor) {
if (!fs::exists(snapshot_file)) return false;
if (!Decode(snapshot_file, db_accessor)) {
db_accessor.Abort();
return false;
namespace {
using communication::bolt::DecodedValue;
// A data structure for exchanging info between main recovery function and
// snapshot and WAL recovery functions.
struct RecoveryData {
tx::transaction_id_t snapshooter_tx_id{0};
std::vector<tx::transaction_id_t> snapshooter_tx_snapshot;
// A collection into which the indexes should be added so they
// can be rebuilt at the end of the recovery transaction.
std::vector<std::pair<std::string, std::string>> indexes;
void Clear() {
snapshooter_tx_id = 0;
snapshooter_tx_snapshot.clear();
indexes.clear();
}
db_accessor.Commit();
return true;
}
};
#define RETURN_IF_NOT(condition) \
if (!(condition)) { \
buffer.Close(); \
reader.Close(); \
return false; \
}
bool Recovery::Decode(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor) {
HashedFileReader buffer;
communication::bolt::Decoder<HashedFileReader> decoder(buffer);
bool RecoverSnapshot(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor,
RecoveryData &recovery_data) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
RETURN_IF_NOT(buffer.Open(snapshot_file));
RETURN_IF_NOT(reader.Open(snapshot_file));
std::unordered_map<uint64_t, VertexAccessor> vertices;
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
reader.Read(magic_number.data(), magic_number.size());
RETURN_IF_NOT(magic_number == durability::kMagicNumber);
// Read the vertex and edge count, and the hash, from the end of the snapshot.
@ -56,34 +69,39 @@ bool Recovery::Decode(const fs::path &snapshot_file,
int64_t edge_count;
uint64_t hash;
RETURN_IF_NOT(
durability::ReadSnapshotSummary(buffer, vertex_count, edge_count, hash));
durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash));
DecodedValue dv;
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
dv.ValueInt() == durability::kVersion);
// Transaction snapshot of the transaction that created the snapshot :D In the
// current recovery implementation it's ignored.
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
recovery_data.snapshooter_tx_id = dv.ValueInt();
// Transaction snapshot of the transaction that created the snapshot.
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::List));
for (const auto &value : dv.ValueList()) {
RETURN_IF_NOT(value.IsInt());
recovery_data.snapshooter_tx_snapshot.emplace_back(value.ValueInt());
}
// A list of label+property indexes.
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::List));
auto indexes = dv.ValueList();
for (auto it = indexes.begin(); it != indexes.end();) {
auto index_value = dv.ValueList();
for (auto it = index_value.begin(); it != index_value.end();) {
auto label = *it++;
RETURN_IF_NOT(it != indexes.end());
RETURN_IF_NOT(it != index_value.end());
auto property = *it++;
RETURN_IF_NOT(label.IsString() && property.IsString());
db_accessor.BuildIndex(db_accessor.Label(label.ValueString()),
db_accessor.Property(property.ValueString()));
recovery_data.indexes.emplace_back(label.ValueString(),
property.ValueString());
}
for (int64_t i = 0; i < vertex_count; ++i) {
DecodedValue vertex_dv;
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
auto vertex_accessor = db_accessor.InsertVertex();
auto vertex_accessor = db_accessor.InsertVertex(vertex.id);
for (const auto &label : vertex.labels) {
vertex_accessor.add_label(db_accessor.Label(label));
}
@ -100,8 +118,9 @@ bool Recovery::Decode(const fs::path &snapshot_file,
auto it_from = vertices.find(edge.from);
auto it_to = vertices.find(edge.to);
RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
auto edge_accessor = db_accessor.InsertEdge(
it_from->second, it_to->second, db_accessor.EdgeType(edge.type));
auto edge_accessor =
db_accessor.InsertEdge(it_from->second, it_to->second,
db_accessor.EdgeType(edge.type), edge.id);
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(db_accessor.Property(property_pair.first),
@ -110,10 +129,252 @@ bool Recovery::Decode(const fs::path &snapshot_file,
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
buffer.ReadType(vertex_count);
buffer.ReadType(edge_count);
return (buffer.Close() && buffer.hash() == hash);
reader.ReadType(vertex_count);
reader.ReadType(edge_count);
if (!reader.Close()) return false;
return reader.hash() == hash;
}
#undef RETURN_IF_NOT
// Applies a single storage-level WAL op to the database through `dba`.
//
// Transaction-control ops (begin / commit / abort) and BUILD_INDEX are not
// handled here; passing one of them is a fatal error.
//
// Every vertex / edge an op refers to has to be found through `dba`. The
// lookups are verified with CHECK rather than DCHECK because the ids come from
// files on disk: when DCHECK is compiled out, a failed lookup would otherwise
// be dereferenced without any verification.
void ApplyOp(const WriteAheadLog::Op &op, GraphDbAccessor &dba) {
  // Looks up a vertex by id, terminating the process if it does not exist.
  auto find_vertex = [&dba](int64_t vertex_id) {
    auto vertex = dba.FindVertex(vertex_id, true);
    CHECK(vertex) << "Failed to find vertex.";
    return vertex;
  };
  // Looks up an edge by id, terminating the process if it does not exist.
  auto find_edge = [&dba](int64_t edge_id) {
    auto edge = dba.FindEdge(edge_id, true);
    CHECK(edge) << "Failed to find edge.";
    return edge;
  };

  switch (op.type_) {
    // Transactional state is not recovered.
    case WriteAheadLog::Op::Type::TRANSACTION_BEGIN:
    case WriteAheadLog::Op::Type::TRANSACTION_COMMIT:
    case WriteAheadLog::Op::Type::TRANSACTION_ABORT:
      LOG(FATAL) << "Transaction handling not handled in ApplyOp";
      break;
    case WriteAheadLog::Op::Type::CREATE_VERTEX:
      dba.InsertVertex(op.vertex_id_);
      break;
    case WriteAheadLog::Op::Type::CREATE_EDGE: {
      auto from = find_vertex(op.vertex_from_id_);
      auto to = find_vertex(op.vertex_to_id_);
      dba.InsertEdge(*from, *to, dba.EdgeType(op.edge_type_), op.edge_id_);
      break;
    }
    case WriteAheadLog::Op::Type::SET_PROPERTY_VERTEX: {
      auto vertex = find_vertex(op.vertex_id_);
      vertex->PropsSet(dba.Property(op.property_), op.value_);
      break;
    }
    case WriteAheadLog::Op::Type::SET_PROPERTY_EDGE: {
      auto edge = find_edge(op.edge_id_);
      edge->PropsSet(dba.Property(op.property_), op.value_);
      break;
    }
    case WriteAheadLog::Op::Type::ADD_LABEL: {
      auto vertex = find_vertex(op.vertex_id_);
      vertex->add_label(dba.Label(op.label_));
      break;
    }
    case WriteAheadLog::Op::Type::REMOVE_LABEL: {
      auto vertex = find_vertex(op.vertex_id_);
      vertex->remove_label(dba.Label(op.label_));
      break;
    }
    case WriteAheadLog::Op::Type::REMOVE_VERTEX: {
      auto vertex = find_vertex(op.vertex_id_);
      dba.DetachRemoveVertex(*vertex);
      break;
    }
    case WriteAheadLog::Op::Type::REMOVE_EDGE: {
      auto edge = find_edge(op.edge_id_);
      dba.RemoveEdge(*edge);
      break;
    }
    case WriteAheadLog::Op::Type::BUILD_INDEX: {
      LOG(FATAL) << "Index handling not handled in ApplyOp";
      break;
    }
  }
}
// Returns the transaction id contained in the file name. If the filename is not
// a parseable WAL file name, nullopt is returned. If the filename represents
// the "current" WAL file, then the maximum possible transaction ID is returned.
std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
const std::string &name) {
// Get the max_transaction_id from the file name that has format
// "XXXXX__max_transaction_<MAX_TRANS_ID>"
auto file_name_split = utils::RSplit(name, "__", 1);
if (file_name_split.size() != 2) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
return std::experimental::nullopt;
}
if (file_name_split[1] == "current")
return std::numeric_limits<tx::transaction_id_t>::max();
file_name_split = utils::RSplit(file_name_split[1], "_", 1);
if (file_name_split.size() != 2) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
return std::experimental::nullopt;
}
return std::stoi(file_name_split[1]);
}
// Replays the write-ahead log files found in FLAGS_wal_directory through
// `db_accessor`.
//
// Ops are buffered per transaction and applied only once the transaction's
// COMMIT has been read, strictly in increasing transaction id order (tracked
// by `next_to_recover`). Label+property pairs of BUILD_INDEX ops are appended
// to `recovery_data.indexes` instead of being applied here.
//
// Returns true when the WAL directory does not exist and after all readable
// files have been processed; returns false only when a WAL file that should
// be read can not be opened.
//
// TODO - finer-grained recovery feedback could be useful here.
bool RecoverWal(GraphDbAccessor &db_accessor, RecoveryData &recovery_data) {
// Get paths to all the WAL files and sort them. std::sort orders the paths
// lexicographically; presumably this equals chronological order because WAL
// file names start with a timestamp - TODO confirm.
std::vector<fs::path> wal_files;
// A missing WAL directory is not treated as an error, there is just nothing
// to replay.
if (!fs::exists(FLAGS_wal_directory)) return true;
for (auto &wal_file : fs::directory_iterator(FLAGS_wal_directory))
wal_files.emplace_back(wal_file);
std::sort(wal_files.begin(), wal_files.end());
// Track which transaction should be recovered next.
tx::transaction_id_t next_to_recover = recovery_data.snapshooter_tx_id + 1;
// Some transactions that come after the first to recover need to be skipped
// (if they committed before the snapshot, and are not in the snapshot's tx
// snapshot).
std::set<tx::transaction_id_t> to_skip;
if (!recovery_data.snapshooter_tx_snapshot.empty()) {
std::set<tx::transaction_id_t> txs{
recovery_data.snapshooter_tx_snapshot.begin(),
recovery_data.snapshooter_tx_snapshot.end()};
// Recovery starts from the smallest id in the snapshooter's tx snapshot.
next_to_recover = *txs.begin();
// Every id between that one and the snapshooter's id that is NOT in the
// tx snapshot gets skipped.
for (tx::transaction_id_t i = next_to_recover;
i < recovery_data.snapshooter_tx_id; ++i)
if (txs.find(i) == txs.end()) to_skip.emplace(i);
// We don't try to recover the snapshooter transaction.
to_skip.emplace(recovery_data.snapshooter_tx_id);
}
// A buffer for the WAL transaction ops. Accumulate and apply them in the
// right transactional sequence.
std::map<tx::transaction_id_t, std::vector<WriteAheadLog::Op>> ops;
// Track which transactions were aborted/committed in the WAL.
std::set<tx::transaction_id_t> aborted;
std::set<tx::transaction_id_t> committed;
// Advances `next_to_recover` as far as possible: skipped and aborted
// transactions are stepped over, committed ones have their buffered ops
// applied. Stops at the first transaction whose outcome is not yet known.
auto apply_all_possible = [&]() {
while (true) {
// Remove old ops from memory.
for (auto it = ops.begin(); it != ops.end();) {
if (it->first < next_to_recover)
it = ops.erase(it);
else
++it;
}
// Check if we can skip/apply the next transaction.
if (to_skip.find(next_to_recover) != to_skip.end())
next_to_recover++;
else if (utils::Contains(aborted, next_to_recover)) {
next_to_recover++;
} else if (utils::Contains(committed, next_to_recover)) {
auto found = ops.find(next_to_recover);
// A committed transaction without buffered ops is simply stepped over.
if (found != ops.end())
for (const auto &op : found->second) ApplyOp(op, db_accessor);
next_to_recover++;
} else
break;
}
};
// Read all the WAL files whose max_tx_id is not smaller than
// next_to_recover. Files with an unparseable name are ignored.
for (auto &wal_file : wal_files) {
auto wal_file_tx_id = TransactionIdFromWalFilename(wal_file.filename());
if (!wal_file_tx_id || *wal_file_tx_id < next_to_recover) continue;
HashedFileReader wal_reader;
if (!wal_reader.Open(wal_file)) return false;
communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
while (true) {
auto op = WriteAheadLog::Op::Decode(wal_reader, decoder);
// The first op that fails to decode ends the reading of this file; the
// remaining files are still processed.
if (!op) break;
switch (op->type_) {
case WriteAheadLog::Op::Type::TRANSACTION_BEGIN:
DCHECK(ops.find(op->transaction_id_) == ops.end())
<< "Double transaction start";
// Ops are only buffered for transactions that are not being skipped.
if (to_skip.find(op->transaction_id_) == to_skip.end())
ops.emplace(op->transaction_id_, std::vector<WriteAheadLog::Op>{});
break;
case WriteAheadLog::Op::Type::TRANSACTION_ABORT: {
// Drop whatever was buffered for the aborted transaction.
auto it = ops.find(op->transaction_id_);
if (it != ops.end()) ops.erase(it);
aborted.emplace(op->transaction_id_);
apply_all_possible();
break;
}
case WriteAheadLog::Op::Type::TRANSACTION_COMMIT:
committed.emplace(op->transaction_id_);
apply_all_possible();
break;
case WriteAheadLog::Op::Type::BUILD_INDEX: {
// NOTE(review): the index is recorded as soon as the op is read,
// independently of whether its transaction is committed, aborted or
// skipped - confirm that this is intended.
recovery_data.indexes.emplace_back(op->label_, op->property_);
break;
}
default: {
// Storage ops are buffered only for transactions that have an entry in
// `ops` (created on TRANSACTION_BEGIN); all others are dropped.
auto it = ops.find(op->transaction_id_);
if (it != ops.end()) it->second.emplace_back(*op);
}
}
} // reading all Ops in a single wal file
} // reading all wal files
apply_all_possible();
// TODO when implementing proper error handling return one of the following:
// - WAL fully recovered
// - WAL partially recovered
// - WAL recovery error
return true;
}
} // anonymous namespace
// Recovers `db` in three steps, each in its own GraphDbAccessor:
//  1. the first snapshot file from `snapshot_dir` (tried in descending path
//     order) that recovers successfully,
//  2. the write-ahead log,
//  3. the label+property indexes collected during the first two steps.
// Currently always returns true.
bool Recover(const fs::path &snapshot_dir, GraphDb &db) {
  RecoveryData data;

  // Gather the candidate snapshot files and order them from the last path to
  // the first one (from newest backwards).
  std::vector<fs::path> candidates;
  if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir)) {
    for (auto &entry : fs::directory_iterator(snapshot_dir))
      candidates.emplace_back(entry);
  }
  std::sort(candidates.rbegin(), candidates.rend());

  for (auto &candidate : candidates) {
    GraphDbAccessor snapshot_dba{db};
    LOG(INFO) << "Starting snapshot recovery from: " << candidate;
    if (RecoverSnapshot(candidate, snapshot_dba, data)) {
      LOG(INFO) << "Snapshot recovery successful.";
      snapshot_dba.Commit();
      break;
    }
    // Undo the partial recovery and forget what was collected from the broken
    // snapshot before trying the next one.
    snapshot_dba.Abort();
    data.Clear();
    LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
  }

  // Write-ahead-log recovery.
  GraphDbAccessor wal_dba{db};
  // WAL recovery does not have to be complete for the recovery to be
  // considered successful. For the time being ignore the return value,
  // consider a better system.
  RecoverWal(wal_dba, data);
  wal_dba.Commit();

  // Index recovery.
  GraphDbAccessor index_dba{db};
  for (const auto &index : data.indexes) {
    index_dba.BuildIndex(index_dba.Label(index.first),
                         index_dba.Property(index.second));
  }
  index_dba.Commit();
  return true;
}
} // namespace durability

View File

@ -3,40 +3,28 @@
#include <experimental/filesystem>
#include <unordered_map>
#include "database/graph_db_accessor.hpp"
#include "database/graph_db.hpp"
#include "durability/hashed_file_reader.hpp"
#include "storage/vertex_accessor.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
// TODO review: replacement of Recovery class with a function is coming in
// another diff.
/** Reads snapshot metadata from the end of the file without messing up the
* hash. */
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
}
/**
* Class used to recover database from snapshot file.
* Recovers database from snapshot_file. If recovering fails, false is returned
* and db_accessor aborts transaction, else true is returned and transaction is
* committed.
*
* @param snapshot_dir - Path to snapshot directory.
* @param db - The database to recover into.
* @return - If recovery was successful.
*/
class Recovery {
public:
/**
* Recovers database from snapshot_file. Graph elements are inserted
* in graph using db_accessor. If recovering fails, false is returned and
* db_accessor aborts transaction, else true is returned and transaction is
* commited.
* @param snapshot_file:
* path to snapshot file
* @param db_accessor:
* GraphDbAccessor used to access database.
*/
bool Recover(const fs::path &snapshot_file, GraphDbAccessor &db_accessor);
private:
/**
* Decodes database from snapshot_file. Graph emlements are inserted in
* graph using db_accessor. If decoding fails, false is returned, else ture.
*/
bool Decode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor);
};
bool Recover(const std::experimental::filesystem::path &snapshot_dir,
GraphDb &db);
}

View File

@ -10,25 +10,10 @@
#include "durability/version.hpp"
#include "utils/datetime/timestamp.hpp"
bool Snapshooter::MakeSnapshot(GraphDbAccessor &db_accessor_,
const fs::path &snapshot_folder,
const int snapshot_max_retained) {
if (!fs::exists(snapshot_folder) &&
!fs::create_directories(snapshot_folder)) {
LOG(ERROR) << "Error while creating directory " << snapshot_folder;
return false;
}
const auto snapshot_file = GetSnapshotFileName(snapshot_folder);
if (fs::exists(snapshot_file)) return false;
if (Encode(snapshot_file, db_accessor_)) {
MaintainMaxRetainedFiles(snapshot_folder, snapshot_max_retained);
return true;
}
return false;
}
namespace durability {
bool Snapshooter::Encode(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor_) {
namespace {
bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) {
try {
HashedFileWriter buffer(snapshot_file);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
@ -38,6 +23,9 @@ bool Snapshooter::Encode(const fs::path &snapshot_file,
durability::kMagicNumber.size());
encoder.WriteInt(durability::kVersion);
// Write the ID of the transaction doing the snapshot.
encoder.WriteInt(db_accessor_.transaction_id());
// Write the transaction snapshot into the snapshot. It's used when
// recovering from the combination of snapshot and write-ahead-log.
{
@ -79,31 +67,46 @@ bool Snapshooter::Encode(const fs::path &snapshot_file,
return true;
}
fs::path GetSnapshotFileName(const fs::path &snapshot_folder) {
// Removes files from `snapshot_folder` until at most `snapshot_max_retained`
// remain. Files are ordered by path and removed from the front of that order,
// so the ones that sort last are kept.
//
// Any negative `snapshot_max_retained` means "keep everything". Only -1 used
// to be recognised; other negative values wrapped around in the unsigned
// subtraction below and made the loop index past the end of `files`.
void MaintainMaxRetainedFiles(const fs::path &snapshot_folder,
                              int snapshot_max_retained) {
  if (snapshot_max_retained < 0) return;
  std::vector<fs::path> files;
  for (auto &file : fs::directory_iterator(snapshot_folder))
    files.push_back(file.path());
  const auto max_retained = static_cast<size_t>(snapshot_max_retained);
  if (files.size() <= max_retained) return;
  std::sort(files.begin(), files.end());
  // Safe: files.size() > max_retained is guaranteed by the check above.
  const size_t remove_count = files.size() - max_retained;
  for (size_t i = 0; i < remove_count; ++i) {
    if (!fs::remove(files[i])) {
      LOG(ERROR) << "Error while removing file: " << files[i];
    }
  }
}
} // annonnymous namespace
// Builds the path for a new snapshot: `snapshot_folder` joined with a file
// name generated from the current timestamp.
fs::path MakeSnapshotPath(const fs::path &snapshot_folder) {
  Timestamp now(Timestamp::now());
  std::string file_name =
      now.to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
  return snapshot_folder / file_name;
}
std::vector<fs::path> Snapshooter::GetSnapshotFiles(
const fs::path &snapshot_folder) {
std::vector<fs::path> files;
for (auto &file : fs::directory_iterator(snapshot_folder))
files.push_back(file.path());
return files;
}
void Snapshooter::MaintainMaxRetainedFiles(const fs::path &snapshot_folder,
int snapshot_max_retained) {
if (snapshot_max_retained == -1) return;
std::vector<fs::path> files = GetSnapshotFiles(snapshot_folder);
if (static_cast<int>(files.size()) <= snapshot_max_retained) return;
sort(files.begin(), files.end());
for (int i = 0; i < static_cast<int>(files.size()) - snapshot_max_retained;
++i) {
if (!fs::remove(files[i])) {
LOG(ERROR) << "Error while removing file: " << files[i];
}
// Writes a new snapshot of the database into `snapshot_folder` (created if
// missing) and afterwards trims the folder to `snapshot_max_retained` files.
// Returns false if the folder can't be created, if the generated snapshot
// path already exists, or if encoding fails; in the last case the partially
// written file is removed on a best-effort basis.
bool MakeSnapshot(GraphDbAccessor &db_accessor_,
                  const fs::path &snapshot_folder,
                  const int snapshot_max_retained) {
  const bool folder_ready =
      fs::exists(snapshot_folder) || fs::create_directories(snapshot_folder);
  if (!folder_ready) {
    LOG(ERROR) << "Error while creating directory " << snapshot_folder;
    return false;
  }

  const auto target = MakeSnapshotPath(snapshot_folder);
  if (fs::exists(target)) return false;

  if (!Encode(target, db_accessor_)) {
    std::error_code ignored;  // Just for exception suppression.
    fs::remove(target, ignored);
    return false;
  }

  MaintainMaxRetainedFiles(snapshot_folder, snapshot_max_retained);
  return true;
}
} // namespace durability

View File

@ -1,55 +1,25 @@
#pragma once
#include <cstring>
#include <experimental/filesystem>
#include <vector>
namespace fs = std::experimental::filesystem;
class GraphDbAccessor;
/**
* Returns path to new snapshot file in format snapshot_folder/timestamp.
*/
fs::path GetSnapshotFileName(const fs::path &snapshot_folder);
namespace durability {
using path = std::experimental::filesystem::path;
/** Generates a path for a DB snapshot in the given folder in a well-defined
* sortable format. */
path MakeSnapshotPath(const path &snapshot_folder);
/**
* Class responsible for making snapshots. Snapshots are stored in folder
* memgraph/build/$snapshot_folder/$db_name using bolt protocol.
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db_accessor:
* GraphDbAccessor used to access elements of GraphDb.
* @param snapshot_folder:
* folder where snapshots are stored.
* @param snapshot_max_retained:
* maximum number of snapshots stored in snapshot folder.
*/
class Snapshooter {
public:
Snapshooter(){};
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db_accessor:
* GraphDbAccessor used to access elements of GraphDb.
* @param snapshot_folder:
* folder where snapshots are stored.
* @param snapshot_max_retained:
* maximum number of snapshots stored in snapshot folder.
*/
bool MakeSnapshot(GraphDbAccessor &db_accessor,
const fs::path &snapshot_folder,
int snapshot_max_retained);
private:
/**
* Method used to keep given number of snapshots in snapshot folder. Newest
* max_retained_files snapshots are kept, other snapshots are deleted. If
* max_retained_files is -1, all snapshots are kept.
*/
void MaintainMaxRetainedFiles(const fs::path &snapshot_folder,
const int max_retained_files);
/**
* Function returns list of snapshot files in snapshot folder.
*/
std::vector<fs::path> GetSnapshotFiles(const fs::path &snapshot_folder);
/**
* Encodes graph and stores it in file given as parameter. Graph elements are
* accessed using parameter db_accessor. If function is successfully executed,
* true is returned.
*/
bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor);
};
bool MakeSnapshot(GraphDbAccessor &db_accessor, const path &snapshot_folder,
int snapshot_max_retained);
}

View File

@ -8,5 +8,5 @@ namespace durability {
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
// The current default version of snapshot and WAL encoding / decoding.
constexpr int64_t kVersion{2};
constexpr int64_t kVersion{3};
}

17
src/durability/wal.cpp Normal file
View File

@ -0,0 +1,17 @@
#include "wal.hpp"
#include "utils/flag_validation.hpp"
// Any negative value disables the WAL (the WriteAheadLog only starts flushing
// when the interval is >= 0).
DEFINE_int32(wal_flush_interval_millis, -1,
             "Interval between two write-ahead log flushes, in milliseconds. "
             "Set to -1 to disable the WAL.");
DEFINE_string(wal_directory, "wal",
              "Directory in which the write-ahead log files are stored.");
// Validated to be positive: with a zero or negative value the WAL file would
// get rotated after every single op.
DEFINE_VALIDATED_int32(
    wal_rotate_ops_count, 10000,
    "How many write-ahead ops should be stored in a single WAL file "
    "before rotating it.",
    FLAG_IN_RANGE(1, 1 << 30));
DEFINE_VALIDATED_int32(wal_buffer_size, 4096, "Write-ahead log buffer size.",
                       FLAG_IN_RANGE(1, 1 << 30));

429
src/durability/wal.hpp Normal file
View File

@ -0,0 +1,429 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <experimental/filesystem>
#include <experimental/optional>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
#include "data_structures/ring_buffer.hpp"
#include "database/graph_db_datatypes.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/hashed_file_writer.hpp"
#include "storage/property_value.hpp"
#include "transactions/type.hpp"
#include "utils/datetime/timestamp.hpp"
#include "utils/scheduler.hpp"
#include "utils/timer.hpp"
// The amount of time between two flushes of the write-ahead log,
// in milliseconds.
DECLARE_int32(wal_flush_interval_millis);
// Directory in which the WAL is dumped.
DECLARE_string(wal_directory);
// How many Ops are stored in a single WAL file.
DECLARE_int32(wal_rotate_ops_count);
// The WAL buffer size (number of ops in a buffer).
DECLARE_int32(wal_buffer_size);
namespace durability {
/** A database operation log for durability. Buffers and periodically serializes
* fine-grained database operations (Ops).
*
* The order in which ops are logged is not deterministic in a multithreaded
* scenario (multiple DB transactions). This is fine, the recovery process
* should be immune to this non-determinism.
*/
class WriteAheadLog {
public:
/** A single operation that needs to be written to the write-ahead log. Either
* a transaction operation (start, commit or abort), or a database storage
* operation being executed within a transaction. */
class Op {
public:
/** Defines operation type. For each type the comment indicates which values
* need to be stored. All ops have the transaction_id member, so that's
* omitted in the comment. */
enum class Type {
TRANSACTION_BEGIN,
TRANSACTION_COMMIT,
TRANSACTION_ABORT,
CREATE_VERTEX, // vertex_id
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type
SET_PROPERTY_VERTEX, // vertex_id, property, property_value
SET_PROPERTY_EDGE, // edge_id, property, property_value
// remove property is done by setting a PropertyValue::Null
ADD_LABEL, // vertex_id, label
REMOVE_LABEL, // vertex_id, label
REMOVE_VERTEX, // vertex_id
REMOVE_EDGE, // edge_id
BUILD_INDEX // label, property
};
Op() = default;
Op(const Type &type, tx::transaction_id_t tx_id)
: type_(type), transaction_id_(tx_id) {}
// Members valid for every op.
Type type_;
tx::transaction_id_t transaction_id_;
// Hash obtained from the HashedFileWriter obtained after writing all the Op
// values. Cumulative with previous writes.
uint32_t hash_;
// Members valid only for some ops, see Op::Type comments above.
int64_t vertex_id_;
int64_t edge_id_;
int64_t vertex_from_id_;
int64_t vertex_to_id_;
std::string edge_type_;
std::string property_;
PropertyValue value_ = PropertyValue::Null;
std::string label_;
/** Serializes this op: first the type and the transaction id, then the
 * members relevant for the op type (see the Op::Type comments), and finally
 * the writer's hash as it stands after writing those values. */
void Encode(HashedFileWriter &writer,
            communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder)
    const {
  encoder.WriteInt(static_cast<int64_t>(type_));
  encoder.WriteInt(static_cast<int64_t>(transaction_id_));

  // Ops that carry the same payload layout share a case.
  switch (type_) {
    case Type::TRANSACTION_BEGIN:
    case Type::TRANSACTION_COMMIT:
    case Type::TRANSACTION_ABORT:
      // No payload besides the transaction id.
      break;
    case Type::CREATE_VERTEX:
    case Type::REMOVE_VERTEX:
      encoder.WriteInt(vertex_id_);
      break;
    case Type::CREATE_EDGE:
      encoder.WriteInt(edge_id_);
      encoder.WriteInt(vertex_from_id_);
      encoder.WriteInt(vertex_to_id_);
      encoder.WriteString(edge_type_);
      break;
    case Type::SET_PROPERTY_VERTEX:
    case Type::SET_PROPERTY_EDGE: {
      const bool on_vertex = type_ == Type::SET_PROPERTY_VERTEX;
      encoder.WriteInt(on_vertex ? vertex_id_ : edge_id_);
      encoder.WriteString(property_);
      encoder.WritePropertyValue(value_);
      break;
    }
    case Type::ADD_LABEL:
    case Type::REMOVE_LABEL:
      encoder.WriteInt(vertex_id_);
      encoder.WriteString(label_);
      break;
    case Type::REMOVE_EDGE:
      encoder.WriteInt(edge_id_);
      break;
    case Type::BUILD_INDEX:
      encoder.WriteString(label_);
      encoder.WriteString(property_);
      break;
  }

  writer.WriteValue(writer.hash());
}
#define DECODE_MEMBER(member, value_f) \
if (!decoder.ReadValue(&dv)) return nullopt; \
r_val.member = dv.value_f();
public:
/** Attempts to decode a WAL::Op from the given decoder. Returns the decoded
 * value if successful, otherwise returns nullopt.
 *
 * Decoding fails (nullopt) when a value can't be read, when a value has an
 * unexpected type, when the stored op type is not one of the Op::Type
 * enumerators, or when the hash stored after the op differs from the hash
 * the reader reports for the data read so far.
 *
 * @param reader - the file reader, used for the hash check.
 * @param decoder - the decoder from which the op's values are read.
 */
static std::experimental::optional<Op> Decode(
    HashedFileReader &reader,
    communication::bolt::Decoder<HashedFileReader> &decoder) {
  using std::experimental::nullopt;

  // Value-initialized so that the members not used by the decoded op type
  // hold zeros instead of indeterminate values when the op gets copied.
  Op r_val{};
  // The decoded value used as a temporary while decoding.
  communication::bolt::DecodedValue dv;

  try {
    if (!decoder.ReadValue(&dv)) return nullopt;
    // The type comes from disk: reject anything outside of the enum's range
    // instead of producing an Op with a type no switch handles.
    const int64_t raw_type = dv.ValueInt();
    if (raw_type < static_cast<int64_t>(Type::TRANSACTION_BEGIN) ||
        raw_type > static_cast<int64_t>(Type::BUILD_INDEX))
      return nullopt;
    r_val.type_ = static_cast<Op::Type>(raw_type);
    DECODE_MEMBER(transaction_id_, ValueInt)

    switch (r_val.type_) {
      case Type::TRANSACTION_BEGIN:
      case Type::TRANSACTION_COMMIT:
      case Type::TRANSACTION_ABORT:
        break;
      case Type::CREATE_VERTEX:
        DECODE_MEMBER(vertex_id_, ValueInt)
        break;
      case Type::CREATE_EDGE:
        DECODE_MEMBER(edge_id_, ValueInt)
        DECODE_MEMBER(vertex_from_id_, ValueInt)
        DECODE_MEMBER(vertex_to_id_, ValueInt)
        DECODE_MEMBER(edge_type_, ValueString)
        break;
      case Type::SET_PROPERTY_VERTEX:
        DECODE_MEMBER(vertex_id_, ValueInt)
        DECODE_MEMBER(property_, ValueString)
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.value_ = static_cast<PropertyValue>(dv);
        break;
      case Type::SET_PROPERTY_EDGE:
        DECODE_MEMBER(edge_id_, ValueInt)
        DECODE_MEMBER(property_, ValueString)
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.value_ = static_cast<PropertyValue>(dv);
        break;
      case Type::ADD_LABEL:
      case Type::REMOVE_LABEL:
        DECODE_MEMBER(vertex_id_, ValueInt)
        DECODE_MEMBER(label_, ValueString)
        break;
      case Type::REMOVE_VERTEX:
        DECODE_MEMBER(vertex_id_, ValueInt)
        break;
      case Type::REMOVE_EDGE:
        DECODE_MEMBER(edge_id_, ValueInt)
        break;
      case Type::BUILD_INDEX:
        DECODE_MEMBER(label_, ValueString)
        DECODE_MEMBER(property_, ValueString)
        break;
    }

    // The reader's hash has to be taken before the stored hash is read.
    // NOTE(review): the `true` argument presumably keeps the stored hash out
    // of the running hash - confirm against HashedFileReader::ReadType.
    auto decoder_hash = reader.hash();
    uint64_t encoded_hash;
    if (!reader.ReadType(encoded_hash, true)) return nullopt;
    if (decoder_hash != encoded_hash) return nullopt;

    return r_val;
  } catch (communication::bolt::DecodedValueException &) {
    return nullopt;
  } catch (std::ifstream::failure &) {
    return nullopt;
  }
}
};
#undef DECODE_MEMBER
/** Sets up the log. When FLAGS_wal_flush_interval_millis is non-negative the
 * WAL file is initialized and the scheduler is started with that interval and
 * a task that flushes the op buffer to the WAL file; otherwise nothing is
 * started. */
WriteAheadLog() {
if (FLAGS_wal_flush_interval_millis >= 0) {
wal_file_.Init();
scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(ops_); });
}
}
/** When flushing is configured (non-negative flush interval), stops the
 * scheduler and then flushes the ops still in the buffer one last time. */
~WriteAheadLog() {
if (FLAGS_wal_flush_interval_millis >= 0) {
scheduler_.Stop();
wal_file_.Flush(ops_);
}
}
/** Enables the WAL. Called at the end of GraphDb construction, after
 * (optional) recovery. Until this is called Emplace() drops every op, so
 * nothing is logged. */
void Enable() { enabled_ = true; }
/** Passes a TRANSACTION_BEGIN op for transaction `tx_id` to Emplace(). */
void TxBegin(tx::transaction_id_t tx_id) {
Emplace({Op::Type::TRANSACTION_BEGIN, tx_id});
}
/** Passes a TRANSACTION_COMMIT op for transaction `tx_id` to Emplace(). */
void TxCommit(tx::transaction_id_t tx_id) {
Emplace({Op::Type::TRANSACTION_COMMIT, tx_id});
}
/** Passes a TRANSACTION_ABORT op for transaction `tx_id` to Emplace(). */
void TxAbort(tx::transaction_id_t tx_id) {
Emplace({Op::Type::TRANSACTION_ABORT, tx_id});
}
/** Passes a CREATE_VERTEX op to Emplace(): transaction `tx_id` created the
 * vertex with id `vertex_id`. */
void CreateVertex(tx::transaction_id_t tx_id, int64_t vertex_id) {
Op op(Op::Type::CREATE_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
Emplace(std::move(op));
}
/** Passes a CREATE_EDGE op to Emplace(): transaction `tx_id` created the edge
 * `edge_id` of type `edge_type` from vertex `vertex_from_id` to vertex
 * `vertex_to_id`. */
void CreateEdge(tx::transaction_id_t tx_id, int64_t edge_id,
int64_t vertex_from_id, int64_t vertex_to_id,
const std::string &edge_type) {
Op op(Op::Type::CREATE_EDGE, tx_id);
op.edge_id_ = edge_id;
op.vertex_from_id_ = vertex_from_id;
op.vertex_to_id_ = vertex_to_id;
op.edge_type_ = edge_type;
Emplace(std::move(op));
}
/** Passes a SET_PROPERTY_VERTEX op to Emplace(): transaction `tx_id` set
 * `property` of vertex `vertex_id` to `value`. Per the Op::Type comments a
 * property removal is logged as setting it to PropertyValue::Null. */
void PropsSetVertex(tx::transaction_id_t tx_id, int64_t vertex_id,
const std::string &property, const PropertyValue &value) {
Op op(Op::Type::SET_PROPERTY_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
op.property_ = property;
op.value_ = value;
Emplace(std::move(op));
}
/** Passes a SET_PROPERTY_EDGE op to Emplace(): transaction `tx_id` set
 * `property` of edge `edge_id` to `value`. Per the Op::Type comments a
 * property removal is logged as setting it to PropertyValue::Null. */
void PropsSetEdge(tx::transaction_id_t tx_id, int64_t edge_id,
const std::string &property, const PropertyValue &value) {
Op op(Op::Type::SET_PROPERTY_EDGE, tx_id);
op.edge_id_ = edge_id;
op.property_ = property;
op.value_ = value;
Emplace(std::move(op));
}
/** Passes an ADD_LABEL op to Emplace(): transaction `tx_id` added `label` to
 * vertex `vertex_id`. */
void AddLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
const std::string &label) {
Op op(Op::Type::ADD_LABEL, tx_id);
op.vertex_id_ = vertex_id;
op.label_ = label;
Emplace(std::move(op));
}
/** Passes a REMOVE_LABEL op to Emplace(): transaction `tx_id` removed `label`
 * from vertex `vertex_id`. */
void RemoveLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
const std::string &label) {
Op op(Op::Type::REMOVE_LABEL, tx_id);
op.vertex_id_ = vertex_id;
op.label_ = label;
Emplace(std::move(op));
}
/** Passes a REMOVE_VERTEX op to Emplace(): transaction `tx_id` removed the
 * vertex with id `vertex_id`. */
void RemoveVertex(tx::transaction_id_t tx_id, int64_t vertex_id) {
Op op(Op::Type::REMOVE_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
Emplace(std::move(op));
}
/** Passes a REMOVE_EDGE op to Emplace(): transaction `tx_id` removed the edge
 * with id `edge_id`. */
void RemoveEdge(tx::transaction_id_t tx_id, int64_t edge_id) {
Op op(Op::Type::REMOVE_EDGE, tx_id);
op.edge_id_ = edge_id;
Emplace(std::move(op));
}
/** Passes a BUILD_INDEX op to Emplace(): transaction `tx_id` built the index
 * on the (`label`, `property`) pair. */
void BuildIndex(tx::transaction_id_t tx_id, const std::string &label,
const std::string &property) {
Op op(Op::Type::BUILD_INDEX, tx_id);
op.label_ = label;
op.property_ = property;
Emplace(std::move(op));
}
private:
/** Groups the logic of WAL file handling (flushing, naming, rotating) */
class WalFile {
using path = std::experimental::filesystem::path;
public:
~WalFile() {
if (!current_wal_file_.empty()) writer_.Close();
}
/** Initializes the WAL file. Must be called before first flush. Can be
* called after Flush() to re-initialize stuff. */
void Init() {
if (!std::experimental::filesystem::exists(FLAGS_wal_directory) &&
!std::experimental::filesystem::create_directories(
FLAGS_wal_directory)) {
LOG(ERROR) << "Can't write to WAL directory: " << FLAGS_wal_directory;
current_wal_file_ = path();
} else {
current_wal_file_ = MakeFilePath("__current");
try {
writer_.Open(current_wal_file_);
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to open write-ahead log file: "
<< current_wal_file_;
current_wal_file_ = path();
}
}
latest_tx_ = 0;
current_wal_file_ops_count_ = 0;
}
/** Flushes all the ops in the buffer to the WAL file. If necessary rotates
* the file. */
void Flush(RingBuffer<Op> &buffer) {
if (current_wal_file_.empty()) {
LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
buffer.clear();
return;
}
try {
while (true) {
auto op = buffer.pop();
if (!op) break;
latest_tx_ = std::max(latest_tx_, op->transaction_id_);
op->Encode(writer_, encoder_);
if (++current_wal_file_ops_count_ >= FLAGS_wal_rotate_ops_count)
RotateFile();
}
writer_.Flush();
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to write to write-ahead log, discarding data.";
buffer.clear();
return;
} catch (std::experimental::filesystem::filesystem_error &) {
LOG(ERROR) << "Failed to rotate write-ahead log.";
buffer.clear();
return;
}
}
private:
HashedFileWriter writer_;
communication::bolt::PrimitiveEncoder<HashedFileWriter> encoder_{writer_};
// The file to which the WAL flushes data. The path is fixed, the file gets
// moved when the WAL gets rotated.
std::experimental::filesystem::path current_wal_file_;
// Number of Ops in the current wal file.
int current_wal_file_ops_count_{0};
// The latest transaction whose delta is recorded in the current WAL file.
// Zero indicates that no deltas have so far been written to the current WAL
// file.
tx::transaction_id_t latest_tx_{0};
path MakeFilePath(const std::string &suffix) {
return path(FLAGS_wal_directory) /
(Timestamp::now().to_iso8601() + suffix);
}
void RotateFile() {
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_,
MakeFilePath("__max_transaction_" + std::to_string(latest_tx_)));
Init();
}
};
RingBuffer<Op> ops_{FLAGS_wal_buffer_size};
Scheduler scheduler_;
WalFile wal_file_;
// Used for disabling the WAL during DB recovery.
bool enabled_{false};
// Emplaces the given Op onto the buffer, but only if the WAL has been enabled
// (see Enable()) and flushing is configured (non-negative flush interval).
// Otherwise the op is silently dropped.
void Emplace(Op &&op) {
if (enabled_ && FLAGS_wal_flush_interval_millis >= 0)
ops_.emplace(std::move(op));
}
};
} // namespace durability

View File

@ -19,17 +19,16 @@ class SerializationError : public utils::BasicException {
template <class T>
class VersionList {
public:
using uptr = std::unique_ptr<VersionList<T>>;
using item_t = T;
/**
* @brief Constructor that is used to insert one item into VersionList.
* @param t - transaction
* @param id - Version list identifier. Uniqueness guaranteed by the code
* creating this version list.
* @param args - args forwarded to constructor of item T (for
* creating the first Record (Version) in this VersionList.
*/
template <typename... Args>
VersionList(const tx::Transaction &t, Args &&... args) {
VersionList(tx::Transaction &t, int64_t id, Args &&... args) : id_(id) {
// TODO replace 'new' with something better
auto *v1 = new T(std::forward<Args>(args)...);
v1->mark_created(t);
@ -215,6 +214,8 @@ class VersionList {
record->mark_expired(t);
}
const int64_t id_;
private:
void lock_and_validate(T *record, tx::Transaction &t) {
DCHECK(record != nullptr) << "Record is nullptr on lock and validation.";
@ -256,3 +257,4 @@ class VersionList {
RecordLock lock_;
};
} // namespace mvcc

View File

@ -8,6 +8,7 @@
#include <list>
#include "communication/result_stream_faker.hpp"
#include "database/graph_db.hpp"
namespace query {

View File

@ -679,9 +679,9 @@ size_t TypedValue::Hash::operator()(const TypedValue &value) const {
return hash;
}
case TypedValue::Type::Vertex:
return value.Value<VertexAccessor>().temporary_id();
return value.Value<VertexAccessor>().id();
case TypedValue::Type::Edge:
return value.Value<EdgeAccessor>().temporary_id();
return value.Value<EdgeAccessor>().id();
case TypedValue::Type::Path:
return FnvCollection<std::vector<VertexAccessor>, VertexAccessor>{}(
value.ValuePath().vertices()) ^

View File

@ -1,6 +1,5 @@
#pragma once
#include "database/graph_db.hpp"
#include "storage/edge.hpp"
#include "storage/record_accessor.hpp"
@ -55,6 +54,6 @@ std::ostream &operator<<(std::ostream &, const EdgeAccessor &);
namespace std {
template <>
struct hash<EdgeAccessor> {
size_t operator()(const EdgeAccessor &e) const { return e.temporary_id(); };
size_t operator()(const EdgeAccessor &e) const { return e.id(); };
};
} // namespace std

View File

@ -10,15 +10,17 @@
/**
* @brief - Garbage collects deleted records.
* @Tparam T type of underlying record in mvcc
* @tparam TCollection - type of collection. Must have a SkipList-like API
* (accessors).
* @tparam TRecord - type of underlying record in mvcc.
*/
template <typename T>
template <typename TCollection, typename TRecord>
class GarbageCollector {
public:
GarbageCollector(SkipList<mvcc::VersionList<T> *> &skiplist,
DeferredDeleter<T> &record_deleter,
DeferredDeleter<mvcc::VersionList<T>> &version_list_deleter)
: skiplist_(skiplist),
GarbageCollector(
TCollection &collection, DeferredDeleter<TRecord> &record_deleter,
DeferredDeleter<mvcc::VersionList<TRecord>> &version_list_deleter)
: collection_(collection),
record_deleter_(record_deleter),
version_list_deleter_(version_list_deleter) {}
@ -31,19 +33,21 @@ class GarbageCollector {
* @param engine - reference to engine object
*/
void Run(const tx::Snapshot &snapshot, const tx::Engine &engine) {
auto collection_accessor = this->skiplist_.access();
auto collection_accessor = collection_.access();
uint64_t count = 0;
std::vector<typename DeferredDeleter<T>::DeletedObject> deleted_records;
std::vector<typename DeferredDeleter<mvcc::VersionList<T>>::DeletedObject>
std::vector<typename DeferredDeleter<TRecord>::DeletedObject>
deleted_records;
std::vector<
typename DeferredDeleter<mvcc::VersionList<TRecord>>::DeletedObject>
deleted_version_lists;
for (auto version_list : collection_accessor) {
for (auto id_vlist : collection_accessor) {
mvcc::VersionList<TRecord> *vlist = id_vlist.second;
// If the version_list is empty, i.e. there is nothing else to be read
// from it we can delete it.
auto ret = version_list->GcDeleted(snapshot, engine);
auto ret = vlist->GcDeleted(snapshot, engine);
if (ret.first) {
deleted_version_lists.emplace_back(version_list,
engine.LockFreeCount());
count += collection_accessor.remove(version_list);
deleted_version_lists.emplace_back(vlist, engine.LockFreeCount());
count += collection_accessor.remove(id_vlist.first);
}
if (ret.second != nullptr)
deleted_records.emplace_back(ret.second, engine.LockFreeCount());
@ -61,7 +65,7 @@ class GarbageCollector {
}
private:
SkipList<mvcc::VersionList<T> *> &skiplist_;
DeferredDeleter<T> &record_deleter_;
DeferredDeleter<mvcc::VersionList<T>> &version_list_deleter_;
TCollection &collection_;
DeferredDeleter<TRecord> &record_deleter_;
DeferredDeleter<mvcc::VersionList<TRecord>> &version_list_deleter_;
};

View File

@ -18,14 +18,60 @@ const PropertyValue &RecordAccessor<TRecord>::PropsAt(
return current().properties_.at(key);
}
template <typename TRecord>
size_t RecordAccessor<TRecord>::PropsErase(GraphDbTypes::Property key) {
// Vertex specialization: sets the property on the updatable record, records
// the change in the write-ahead log and then refreshes the property index.
template <>
void RecordAccessor<Vertex>::PropsSet(GraphDbTypes::Property key,
                                      PropertyValue value) {
  Vertex &updated = update();
  updated.properties_.set(key, value);
  GraphDbAccessor &accessor = db_accessor();
  accessor.wal().PropsSetVertex(accessor.transaction_id(), vlist_->id_,
                                accessor.PropertyName(key), value);
  accessor.UpdatePropertyIndex(key, *this, &updated);
}
// Edge specialization: sets the property on the updatable record and records
// the change in the write-ahead log.
template <>
void RecordAccessor<Edge>::PropsSet(GraphDbTypes::Property key,
                                    PropertyValue value) {
  auto &updated = update();
  updated.properties_.set(key, value);
  GraphDbAccessor &accessor = db_accessor();
  accessor.wal().PropsSetEdge(accessor.transaction_id(), vlist_->id_,
                              accessor.PropertyName(key), value);
}
// Vertex specialization: logs the erasure to the write-ahead log as "set the
// property to Null", then erases the property from the updatable record.
// Returns whatever the property store's erase() reports.
template <>
size_t RecordAccessor<Vertex>::PropsErase(GraphDbTypes::Property key) {
  GraphDbAccessor &accessor = db_accessor();
  accessor.wal().PropsSetVertex(accessor.transaction_id(), vlist_->id_,
                                accessor.PropertyName(key),
                                PropertyValue::Null);
  auto &updated = update();
  return updated.properties_.erase(key);
}
template <typename TRecord>
void RecordAccessor<TRecord>::PropsClear() {
update().properties_.clear();
// Edge specialization: logs the erasure to the write-ahead log as "set the
// property to Null", then erases the property from the updatable record.
// Returns whatever the property store's erase() reports.
template <>
size_t RecordAccessor<Edge>::PropsErase(GraphDbTypes::Property key) {
  GraphDbAccessor &accessor = db_accessor();
  accessor.wal().PropsSetEdge(accessor.transaction_id(), vlist_->id_,
                              accessor.PropertyName(key), PropertyValue::Null);
  auto &updated = update();
  return updated.properties_.erase(key);
}
// Vertex specialization: logs every currently stored property to the
// write-ahead log as "set to Null", then clears the property store.
template <>
void RecordAccessor<Vertex>::PropsClear() {
  auto &record = update();
  GraphDbAccessor &accessor = db_accessor();
  for (const auto &property : record.properties_) {
    accessor.wal().PropsSetVertex(accessor.transaction_id(), vlist_->id_,
                                  accessor.PropertyName(property.first),
                                  PropertyValue::Null);
  }
  record.properties_.clear();
}
// Edge specialization: logs every currently stored property to the
// write-ahead log as "set to Null", then clears the property store.
template <>
void RecordAccessor<Edge>::PropsClear() {
  auto &record = update();
  GraphDbAccessor &accessor = db_accessor();
  for (const auto &property : record.properties_) {
    accessor.wal().PropsSetEdge(accessor.transaction_id(), vlist_->id_,
                                accessor.PropertyName(property.first),
                                PropertyValue::Null);
  }
  record.properties_.clear();
}
template <typename TRecord>
@ -39,11 +85,6 @@ GraphDbAccessor &RecordAccessor<TRecord>::db_accessor() const {
return *db_accessor_;
}
template <typename TRecord>
uint64_t RecordAccessor<TRecord>::temporary_id() const {
return (uint64_t)vlist_;
}
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
if (!new_) {
@ -83,18 +124,5 @@ const TRecord &RecordAccessor<TRecord>::current() const {
return *current_;
}
template <>
void RecordAccessor<Vertex>::PropsSet(GraphDbTypes::Property key,
PropertyValue value) {
Vertex &vertex = update();
vertex.properties_.set(key, value);
this->db_accessor().UpdatePropertyIndex(key, *this, &vertex);
}
template <>
void RecordAccessor<Edge>::PropsSet(GraphDbTypes::Property key,
PropertyValue value) {
update().properties_.set(key, value);
}
template class RecordAccessor<Vertex>;
template class RecordAccessor<Edge>;

View File

@ -1,6 +1,6 @@
#pragma once
#include "database/graph_db.hpp"
#include "database/graph_db_datatypes.hpp"
#include "mvcc/version_list.hpp"
#include "storage/property_value.hpp"
#include "utils/total_ordering.hpp"
@ -114,21 +114,11 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
*/
GraphDbAccessor &db_accessor() const;
/**
* Returns a temporary ID of the record stored in this accessor.
*
* This function returns a number that represents the current memory
* location where the record is stored. That number is used only as an
* identification for the database snapshotter. The snapshotter needs an
* ID so that when the database is saved to disk that it can be successfully
* reconstructed.
* IMPORTANT: The ID is valid for identifying graph elements observed in
* the same transaction. It is not valid for comparing graph elements
* observed in different transactions.
*
* @return See above.
/** Returns a database-unique index of this vertex or edge. Note that vertices
* and edges have separate ID domains, there can be a vertex with ID X and an
* edge with the same id.
*/
uint64_t temporary_id() const;
int64_t id() const { return vlist_->id_; }
/*
* Switches this record accessor to use the latest

View File

@ -18,7 +18,9 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) {
// not a duplicate label, add it
Vertex &vertex = update();
vertex.labels_.emplace_back(label);
this->db_accessor().UpdateLabelIndices(label, *this, &vertex);
auto &dba = db_accessor();
dba.UpdateLabelIndices(label, *this, &vertex);
dba.wal().AddLabel(dba.transaction_id(), id(), dba.LabelName(label));
return true;
}
@ -29,6 +31,8 @@ size_t VertexAccessor::remove_label(GraphDbTypes::Label label) {
std::swap(*found, labels.back());
labels.pop_back();
auto &dba = db_accessor();
dba.wal().RemoveLabel(dba.transaction_id(), id(), dba.LabelName(label));
return 1;
}

View File

@ -6,11 +6,9 @@
#include "cppitertools/chain.hpp"
#include "database/graph_db.hpp"
#include "storage/record_accessor.hpp"
#include "storage/util.hpp"
#include "storage/vertex.hpp"
#include "storage/edge_accessor.hpp"
/**
@ -142,6 +140,6 @@ std::ostream &operator<<(std::ostream &, const VertexAccessor &);
namespace std {
template <>
struct hash<VertexAccessor> {
size_t operator()(const VertexAccessor &v) const { return v.temporary_id(); };
size_t operator()(const VertexAccessor &v) const { return v.id(); };
};
}

View File

@ -34,6 +34,9 @@ class Engine : Lockable<SpinLock> {
std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max();
public:
Engine() = default;
/** Begins a transaction and returns a pointer to
* its object.
*
@ -42,7 +45,7 @@ class Engine : Lockable<SpinLock> {
* committed or aborted.
*/
Transaction *Begin() {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
transaction_id_t id{++counter_};
auto t = new Transaction(id, active_, *this);
@ -61,7 +64,7 @@ class Engine : Lockable<SpinLock> {
* @return Pointer to the transaction object for id.
*/
Transaction &Advance(transaction_id_t id) {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
auto *t = store_.get(id);
DCHECK(t != nullptr) << "Transaction::advance on non-existing transaction";
@ -88,7 +91,7 @@ class Engine : Lockable<SpinLock> {
* documentation).
*/
Snapshot GcSnapshot() {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
// No active transactions.
if (active_.size() == 0) {
@ -106,18 +109,16 @@ class Engine : Lockable<SpinLock> {
/** Commits the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Commit(const Transaction &t) {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
clog_.set_committed(t.id_);
Finalize(t);
}
/** Aborts the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Abort(const Transaction &t) {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
clog_.set_aborted(t.id_);
Finalize(t);
}
@ -128,20 +129,20 @@ class Engine : Lockable<SpinLock> {
/** The total number of transactions that have executed since the creation of
* this engine */
auto Count() const {
auto guard = this->acquire_unique();
return counter_.load();
tx::transaction_id_t Count() const {
auto guard = acquire_unique();
return counter_;
}
/** The count of currently active transactions */
int64_t ActiveCount() const {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
return active_.size();
}
/** Calls function f on each active transaction. */
void ForEachActiveTransaction(std::function<void(Transaction &)> f) {
auto guard = this->acquire_unique();
auto guard = acquire_unique();
for (auto transaction : active_) {
f(*store_.get(transaction));
}

View File

@ -13,6 +13,7 @@
#include <vector>
#include "data_structures/concurrent/skiplist.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/graph_db_datatypes.hpp"
#include "mvcc/version_list.hpp"
#include "storage/property_value.hpp"

View File

@ -13,7 +13,7 @@ class Timer {
}
private:
const std::chrono::time_point<std::chrono::steady_clock> start_time_ =
std::chrono::time_point<std::chrono::steady_clock> start_time_ =
std::chrono::steady_clock::now();
};
};

View File

@ -20,7 +20,7 @@ void MvccMix(benchmark::State &state) {
state.PauseTiming();
tx::Engine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Prop> version_list(*t1);
mvcc::VersionList<Prop> version_list(*t1, 0);
t1->Commit();
auto t2 = engine.Begin();

View File

@ -75,6 +75,8 @@ class Writer {
durability::kMagicNumber.size());
encoder_.WriteTypedValue(durability::kVersion);
// Transactional ID of the snapshooter.
encoder_.WriteInt(0);
// Transactional Snapshot is an empty list of transaction IDs.
encoder_.WriteList(std::vector<query::TypedValue>{});
}

View File

@ -195,13 +195,13 @@ TEST(BoltEncoder, VertexAndEdge) {
// and Memgraph now encodes IDs so we need to check the output
// part by part.
CheckOutput(output, vertexedge_encoded, 5, false);
CheckInt(output, va1.temporary_id());
CheckInt(output, va1.id());
CheckOutput(output, vertexedge_encoded + 6, 34, false);
CheckInt(output, va2.temporary_id());
CheckInt(output, va2.id());
CheckOutput(output, vertexedge_encoded + 41, 4, false);
CheckInt(output, ea.temporary_id());
CheckInt(output, va1.temporary_id());
CheckInt(output, va2.temporary_id());
CheckInt(output, ea.id());
CheckInt(output, va1.id());
CheckInt(output, va2.id());
CheckOutput(output, vertexedge_encoded + 48, 26);
}

View File

@ -16,7 +16,7 @@ TEST(LabelsIndex, UniqueInsert) {
GraphDbAccessor dba(db);
tx::Engine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist(*t1);
mvcc::VersionList<Vertex> vlist(*t1, 0);
t1->Commit();
auto t2 = engine.Begin();
@ -45,8 +45,8 @@ TEST(LabelsIndex, UniqueFilter) {
tx::Engine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist1(*t1);
mvcc::VersionList<Vertex> vlist2(*t1);
mvcc::VersionList<Vertex> vlist1(*t1, 0);
mvcc::VersionList<Vertex> vlist2(*t1, 1);
engine.Advance(t1->id_);
auto r1v1 = vlist1.find(*t1);
auto r1v2 = vlist2.find(*t1);
@ -86,8 +86,8 @@ TEST(LabelsIndex, Refresh) {
// add two vertices to database
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist1(*t1);
mvcc::VersionList<Vertex> vlist2(*t1);
mvcc::VersionList<Vertex> vlist1(*t1, 0);
mvcc::VersionList<Vertex> vlist2(*t1, 1);
engine.Advance(t1->id_);
auto v1r1 = vlist1.find(*t1);

View File

@ -22,7 +22,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
index.IndexFinishedBuilding(*key);
t = engine.Begin();
vlist = new mvcc::VersionList<Vertex>(*t);
vlist = new mvcc::VersionList<Vertex>(*t, 0);
engine.Advance(t->id_);
vertex = vlist->find(*t);

631
tests/unit/durability.cpp Normal file
View File

@ -0,0 +1,631 @@
#include <cstdio>
#include <experimental/filesystem>
#include <experimental/optional>
#include <functional>
#include <random>
#include <unordered_map>
#include <unordered_set>
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
DECLARE_string(snapshot_directory);
DECLARE_bool(snapshot_on_exit);
DECLARE_int32(snapshot_cycle_sec);
DECLARE_bool(snapshot_recover_on_startup);
DECLARE_int32(wal_flush_interval_millis);
DECLARE_int32(wal_rotate_ops_count);
namespace fs = std::experimental::filesystem;
// Helper class for performing random CRUD ops on a database. It remembers the
// IDs of the vertices and edges it inserted so that later operations can pick
// random existing elements through the accessor.
class DbGenerator {
  static constexpr int kLabelCount = 3;
  static constexpr int kPropertyCount = 4;
  static constexpr int kEdgeTypeCount = 2;

  // Map a small integer onto a named label / property / edge type.
  auto Label(int i) { return dba_.Label("label" + std::to_string(i)); }
  auto Property(int i) { return dba_.Property("property" + std::to_string(i)); }
  auto EdgeType(int i) {
    return dba_.EdgeType("edge_type" + std::to_string(i));
  }

 public:
  explicit DbGenerator(GraphDbAccessor &dba) : dba_(dba) {}

  // Builds a (label, property) index chosen deterministically from the given
  // sequence number.
  void BuildIndex(int seq_number) {
    dba_.BuildIndex(Label(seq_number % kLabelCount),
                    Property(seq_number % kPropertyCount));
  }

  // Returns a random previously inserted edge. If remove_from_ids is true the
  // edge ID is forgotten by this generator.
  EdgeAccessor RandomEdge(bool remove_from_ids = false) {
    return *dba_.FindEdge(RandomElement(edge_ids_, remove_from_ids), true);
  }

  // Returns a random previously inserted vertex. If remove_from_ids is true
  // the vertex ID is forgotten by this generator.
  VertexAccessor RandomVertex(bool remove_from_ids = false) {
    return *dba_.FindVertex(RandomElement(vertex_ids_, remove_from_ids), true);
  }

  VertexAccessor InsertVertex() {
    auto vertex = dba_.InsertVertex();
    vertex_ids_.emplace_back(vertex.id());
    return vertex;
  }

  void DetachRemoveVertex() {
    auto vertex = RandomVertex(true);
    dba_.RemoveVertex(vertex);
  }

  // Inserts an edge of a random type between two random known vertices.
  EdgeAccessor InsertEdge() {
    auto from = RandomVertex();
    auto to = RandomVertex();
    auto edge = dba_.InsertEdge(from, to, EdgeType(RandomInt(kEdgeTypeCount)));
    edge_ids_.emplace_back(edge.id());
    return edge;
  }

  void RemoveEdge() {
    auto edge = RandomEdge(true);
    dba_.RemoveEdge(edge);
  }

  void SetVertexProperty() {
    auto vertex = RandomVertex();
    vertex.PropsSet(Property(RandomInt(kPropertyCount)), RandomValue());
  }

  // Erases every generator-known property from a random vertex.
  void EraseVertexProperty() {
    auto v = RandomVertex();
    for (int i = 0; i < kPropertyCount; i++) v.PropsErase(Property(i));
  }

  void ClearVertexProperties() { RandomVertex().PropsClear(); }

  void SetEdgeProperty() {
    auto edge = RandomEdge();
    edge.PropsSet(Property(RandomInt(kPropertyCount)), RandomValue());
  }

  // Erases every generator-known property from a random edge.
  void EraseEdgeProperty() {
    auto e = RandomEdge();
    for (int i = 0; i < kPropertyCount; i++) e.PropsErase(Property(i));
  }

  void ClearEdgeProperties() { RandomEdge().PropsClear(); }

  void AddLabel() {
    auto vertex = RandomVertex();
    vertex.add_label(Label(RandomInt(kLabelCount)));
  }

  // Removes all labels from a random vertex. The labels are copied first
  // because remove_label modifies the vertex while we iterate.
  void ClearLabels() {
    auto vertex = RandomVertex();
    auto labels = vertex.labels();
    for (auto label : labels) vertex.remove_label(label);
  }

 private:
  GraphDbAccessor &dba_;
  std::vector<int64_t> vertex_ids_;
  std::vector<int64_t> edge_ids_;

  std::mt19937 gen_{std::random_device{}()};
  std::uniform_real_distribution<> rand_{0.0, 1.0};

  // Picks a random element of the (non-empty) collection. When `remove` is
  // set, the element is removed by overwriting it with the last one.
  int64_t RandomElement(std::vector<int64_t> &collection, bool remove = false) {
    DCHECK(!collection.empty()) << "Random element from empty collection";
    const int64_t index = RandomInt(collection.size());
    const int64_t r_val = collection[index];
    if (remove) {
      collection[index] = collection.back();
      collection.pop_back();
    }
    return r_val;
  }

  // Returns a uniformly distributed integer in [0, upper_bound). An integer
  // distribution is used so the result can never equal upper_bound, which a
  // scaled-and-truncated real could if the real distribution returned 1.0.
  int64_t RandomInt(int64_t upper_bound) {
    if (upper_bound <= 1) return 0;
    return std::uniform_int_distribution<int64_t>{0, upper_bound - 1}(gen_);
  }

  // Returns a random float, int or bool property value.
  PropertyValue RandomValue() {
    switch (RandomInt(3)) {
      case 0:
        return rand_(gen_);  // Float
      case 1:
        return RandomInt(1000);
      case 2:
        return rand_(gen_) < 0.5;
      default:
        LOG(FATAL) << "Unsupported random value";
    }
  }
};
/** Checks, through gtest expectations, that the given databases have the same
 * contents (indices, vertices and edges). Nothing is returned; mismatches are
 * reported as test failures. A failed ASSERT_* returns from this helper only,
 * not from the calling test body. */
void CompareDbs(GraphDb &a, GraphDb &b) {
  GraphDbAccessor dba_a(a);
  GraphDbAccessor dba_b(b);

  {
    auto index_a = dba_a.IndexInfo();
    auto index_b = dba_b.IndexInfo();
    EXPECT_TRUE(
        index_a.size() == index_b.size() &&
        std::is_permutation(index_a.begin(), index_a.end(), index_b.begin()))
        << "Indexes not equal [" << utils::Join(index_a, ", ") << "] != ["
        << utils::Join(index_b, ", ") << "]";
  }

  // Property stores are equal if they hold the same (name, value) pairs in any
  // order. Names are compared, since property IDs may differ between DBs.
  auto is_permutation_props = [&dba_a, &dba_b](const auto &p1,
                                               const auto &p2) {
    return p1.size() == p2.size() &&
           std::is_permutation(
               p1.begin(), p1.end(), p2.begin(),
               [&dba_a, &dba_b](const auto &kv1, const auto &kv2) {
                 return dba_a.PropertyName(kv1.first) ==
                            dba_b.PropertyName(kv2.first) &&
                        query::TypedValue::BoolEqual{}(kv1.second, kv2.second);
               });
  };

  {
    int vertices_a_count = 0;
    for (auto v_a : dba_a.Vertices(false)) {
      vertices_a_count++;
      auto v_b = dba_b.FindVertex(v_a.id(), false);
      ASSERT_TRUE(v_b) << "Vertex not found, id: " << v_a.id();
      // Bind each label collection once so that begin()/end() are guaranteed
      // to refer to the same object.
      const auto &labels_a = v_a.labels();
      const auto &labels_b = v_b->labels();
      ASSERT_EQ(labels_a.size(), labels_b.size());
      EXPECT_TRUE(std::is_permutation(
          labels_a.begin(), labels_a.end(), labels_b.begin(),
          [&dba_a, &dba_b](const auto &la, const auto &lb) {
            return dba_a.LabelName(la) == dba_b.LabelName(lb);
          }));
      EXPECT_TRUE(is_permutation_props(v_a.Properties(), v_b->Properties()));
    }
    auto vertices_b = dba_b.Vertices(false);
    EXPECT_EQ(std::distance(vertices_b.begin(), vertices_b.end()),
              vertices_a_count);
  }

  {
    int edges_a_count = 0;
    for (auto e_a : dba_a.Edges(false)) {
      edges_a_count++;
      auto e_b = dba_b.FindEdge(e_a.id(), false);
      ASSERT_TRUE(e_b) << "Edge not found, id: " << e_a.id();
      EXPECT_EQ(dba_a.EdgeTypeName(e_a.EdgeType()),
                dba_b.EdgeTypeName(e_b->EdgeType()));
      EXPECT_EQ(e_a.from().id(), e_b->from().id());
      EXPECT_EQ(e_a.to().id(), e_b->to().id());
      EXPECT_TRUE(is_permutation_props(e_a.Properties(), e_b->Properties()));
    }
    auto edges_b = dba_b.Edges(false);
    EXPECT_EQ(std::distance(edges_b.begin(), edges_b.end()), edges_a_count);
  }
}
// Directories used by these tests for snapshot and WAL files. Both live under
// "MG_test_unit_durability" inside the system temporary directory.
const fs::path kSnapshotDir =
fs::temp_directory_path() / "MG_test_unit_durability" / "snapshot";
const fs::path kWalDir =
fs::temp_directory_path() / "MG_test_unit_durability" / "wal";
void CleanDurability() {
if (fs::exists(kSnapshotDir))
for (auto file : fs::directory_iterator(kSnapshotDir)) fs::remove(file);
if (fs::exists(kWalDir))
for (auto file : fs::directory_iterator(kWalDir)) fs::remove(file);
}
// Returns the paths of all entries found directly inside the given directory.
std::vector<fs::path> DirFiles(fs::path dir) {
  std::vector<fs::path> result;
  fs::directory_iterator it(dir);
  const fs::directory_iterator end;
  for (; it != end; ++it) result.push_back(it->path());
  return result;
}
// Writes a snapshot of the given database into kSnapshotDir using a fresh
// accessor, then commits that accessor's transaction.
void MakeSnapshot(GraphDb &db, int snapshot_max_retained = -1) {
  GraphDbAccessor accessor(db);
  durability::MakeSnapshot(accessor, kSnapshotDir, snapshot_max_retained);
  accessor.Commit();
}
// Returns the greatest path (by path comparison) among the entries of the
// given directory. Aborts via CHECK if the directory has no entries.
fs::path GetLastFile(fs::path dir) {
  const std::vector<fs::path> entries = DirFiles(dir);
  CHECK(!entries.empty()) << "No files in folder.";
  return *std::max_element(entries.begin(), entries.end());
}
// Populates the database through the given accessor with randomly generated
// content: `scale` vertices, 2 * `scale` edges, then rounds of property/label
// modifications and removals, and finally the requested indices. Does not
// commit the accessor.
void MakeDb(GraphDbAccessor &dba, int scale, std::vector<int> indices = {}) {
  DbGenerator generator{dba};

  // Give the WAL some time to flush, we're pumping ops fast here.
  const auto let_wal_flush = [] {
    std::this_thread::sleep_for(std::chrono::milliseconds(30));
  };

  for (int i = 0; i < scale; i++) generator.InsertVertex();
  for (int i = 0; i < scale * 2; i++) generator.InsertEdge();
  let_wal_flush();

  for (int i = 0; i < scale * 3; i++) {
    generator.SetVertexProperty();
    generator.SetEdgeProperty();
    generator.AddLabel();
    if (i % 500 == 0) let_wal_flush();
  }

  for (int i = 0; i < scale / 2; i++) {
    generator.ClearLabels();
    generator.EraseEdgeProperty();
    generator.EraseVertexProperty();
    generator.ClearEdgeProperties();
    generator.ClearVertexProperties();
    if (i % 500 == 0) let_wal_flush();
  }

  for (auto index : indices) generator.BuildIndex(index);
}
// Convenience overload: populates the database inside a single fresh
// transaction and commits it.
void MakeDb(GraphDb &db, int scale, std::vector<int> indices = {}) {
  GraphDbAccessor accessor{db};
  MakeDb(accessor, scale, indices);
  accessor.Commit();
}
// Test fixture that wipes the durability directories around every test and
// resets the durability flags to a baseline each test can selectively override.
class Durability : public ::testing::Test {
 protected:
  void SetUp() override {
    CleanDurability();

    // Snapshot-related flags.
    FLAGS_snapshot_directory = kSnapshotDir;
    FLAGS_snapshot_cycle_sec = -1;
    FLAGS_snapshot_on_exit = false;
    FLAGS_snapshot_recover_on_startup = false;

    // WAL-related flags. A negative flush interval means ops are not buffered.
    FLAGS_wal_directory = kWalDir;
    FLAGS_wal_flush_interval_millis = -1;
  }

  void TearDown() override { CleanDurability(); }
};
// Performs a fixed sequence of DB operations with the WAL enabled, then decodes
// the single resulting WAL file and checks the recorded ops one by one.
TEST_F(Durability, WalEncoding) {
  FLAGS_wal_flush_interval_millis = 1;
  FLAGS_wal_rotate_ops_count = 5000;
  {
    GraphDb db;
    GraphDbAccessor dba(db);
    auto v0 = dba.InsertVertex();
    ASSERT_EQ(v0.id(), 0);
    v0.add_label(dba.Label("l0"));
    v0.PropsSet(dba.Property("p0"), 42);
    auto v1 = dba.InsertVertex();
    ASSERT_EQ(v1.id(), 1);
    auto e0 = dba.InsertEdge(v0, v1, dba.EdgeType("et0"));
    ASSERT_EQ(e0.id(), 0);
    e0.PropsSet(dba.Property("p0"), std::vector<PropertyValue>{1, 2, 3});
    dba.BuildIndex(dba.Label("l1"), dba.Property("p1"));
    dba.Commit();
  }

  // Sleep to ensure the WAL gets flushed.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  HashedFileReader reader;
  ASSERT_EQ(DirFiles(kWalDir).size(), 1);
  ASSERT_TRUE(reader.Open(GetLastFile(kWalDir)));
  communication::bolt::Decoder<HashedFileReader> decoder{reader};

  // Decode ops until the decoder reports that no further op is available.
  std::vector<durability::WriteAheadLog::Op> ops;
  while (auto decoded = durability::WriteAheadLog::Op::Decode(reader, decoder))
    ops.emplace_back(*decoded);
  reader.Close();
  ASSERT_EQ(ops.size(), 13);

  using Type = durability::WriteAheadLog::Op::Type;

  EXPECT_EQ(ops[0].type_, Type::TRANSACTION_BEGIN);
  EXPECT_EQ(ops[0].transaction_id_, 1);

  EXPECT_EQ(ops[1].type_, Type::CREATE_VERTEX);
  EXPECT_EQ(ops[1].transaction_id_, 1);
  EXPECT_EQ(ops[1].vertex_id_, 0);

  EXPECT_EQ(ops[2].type_, Type::ADD_LABEL);
  EXPECT_EQ(ops[2].transaction_id_, 1);
  EXPECT_EQ(ops[2].label_, "l0");

  EXPECT_EQ(ops[3].type_, Type::SET_PROPERTY_VERTEX);
  EXPECT_EQ(ops[3].transaction_id_, 1);
  EXPECT_EQ(ops[3].vertex_id_, 0);
  EXPECT_EQ(ops[3].property_, "p0");
  EXPECT_EQ(ops[3].value_.type(), PropertyValue::Type::Int);
  EXPECT_EQ(ops[3].value_.Value<int64_t>(), 42);

  EXPECT_EQ(ops[4].type_, Type::CREATE_VERTEX);
  EXPECT_EQ(ops[4].transaction_id_, 1);
  EXPECT_EQ(ops[4].vertex_id_, 1);

  EXPECT_EQ(ops[5].type_, Type::CREATE_EDGE);
  EXPECT_EQ(ops[5].transaction_id_, 1);
  EXPECT_EQ(ops[5].edge_id_, 0);
  EXPECT_EQ(ops[5].vertex_from_id_, 0);
  EXPECT_EQ(ops[5].vertex_to_id_, 1);
  EXPECT_EQ(ops[5].edge_type_, "et0");

  EXPECT_EQ(ops[6].type_, Type::SET_PROPERTY_EDGE);
  EXPECT_EQ(ops[6].transaction_id_, 1);
  EXPECT_EQ(ops[6].edge_id_, 0);
  EXPECT_EQ(ops[6].property_, "p0");
  EXPECT_EQ(ops[6].value_.type(), PropertyValue::Type::List);

  // The next four ops are the BuildIndex internal transactions.
  EXPECT_EQ(ops[7].type_, Type::TRANSACTION_BEGIN);
  EXPECT_EQ(ops[8].type_, Type::TRANSACTION_COMMIT);
  EXPECT_EQ(ops[9].type_, Type::TRANSACTION_BEGIN);
  EXPECT_EQ(ops[10].type_, Type::TRANSACTION_COMMIT);

  EXPECT_EQ(ops[11].type_, Type::BUILD_INDEX);
  EXPECT_EQ(ops[11].label_, "l1");
  EXPECT_EQ(ops[11].property_, "p1");

  EXPECT_EQ(ops[12].type_, Type::TRANSACTION_COMMIT);
  EXPECT_EQ(ops[12].transaction_id_, 1);
}
// Builds a small fixed graph, snapshots it, and then walks the snapshot file
// value by value to verify its layout: magic number, version, transaction id,
// transactional snapshot, index list, vertices, edges, and finally the hash.
// The reads below must stay in exactly this order, since each one advances the
// reader and contributes to the running hash.
TEST_F(Durability, SnapshotEncoding) {
{
GraphDb db;
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex();
ASSERT_EQ(v0.id(), 0);
v0.add_label(dba.Label("l0"));
v0.PropsSet(dba.Property("p0"), 42);
auto v1 = dba.InsertVertex();
ASSERT_EQ(v1.id(), 1);
v1.add_label(dba.Label("l0"));
v1.add_label(dba.Label("l1"));
auto v2 = dba.InsertVertex();
ASSERT_EQ(v2.id(), 2);
v2.PropsSet(dba.Property("p0"), true);
v2.PropsSet(dba.Property("p1"), "Johnny");
auto e0 = dba.InsertEdge(v0, v1, dba.EdgeType("et0"));
ASSERT_EQ(e0.id(), 0);
e0.PropsSet(dba.Property("p0"), std::vector<PropertyValue>{1, 2, 3});
auto e1 = dba.InsertEdge(v2, v1, dba.EdgeType("et1"));
ASSERT_EQ(e1.id(), 1);
dba.BuildIndex(dba.Label("l1"), dba.Property("p1"));
dba.Commit();
MakeSnapshot(db);
}
auto snapshot = GetLastFile(kSnapshotDir);
HashedFileReader buffer;
communication::bolt::Decoder<HashedFileReader> decoder(buffer);
int64_t vertex_count, edge_count;
uint64_t hash;
ASSERT_TRUE(buffer.Open(snapshot));
// The summary provides the element counts and the hash stored in the file.
ASSERT_TRUE(
durability::ReadSnapshotSummary(buffer, vertex_count, edge_count, hash));
ASSERT_EQ(vertex_count, 3);
ASSERT_EQ(edge_count, 2);
// Start from a copy of the expected magic number so the buffer has the right
// size; Read() then overwrites it with the bytes from the file.
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
ASSERT_EQ(magic_number, durability::kMagicNumber);
communication::bolt::DecodedValue dv;
// Snapshot format version.
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
// Transaction ID.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsInt());
// Transactional snapshot.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsList());
// Label property indices.
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueList().size(), 2);
EXPECT_EQ(dv.ValueList()[0].ValueString(), "l1");
EXPECT_EQ(dv.ValueList()[1].ValueString(), "p1");
// Decoded elements are keyed by their ID so they can be checked regardless of
// the order in which they appear in the file.
std::map<int64_t, communication::bolt::DecodedVertex> decoded_vertices;
std::map<int64_t, communication::bolt::DecodedEdge> decoded_edges;
// Decode vertices.
for (int i = 0; i < vertex_count; ++i) {
decoder.ReadValue(&dv);
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Vertex);
auto &vertex = dv.ValueVertex();
decoded_vertices.emplace(vertex.id, vertex);
}
ASSERT_EQ(decoded_vertices.size(), 3);
ASSERT_EQ(decoded_vertices[0].labels.size(), 1);
EXPECT_EQ(decoded_vertices[0].labels[0], "l0");
ASSERT_EQ(decoded_vertices[0].properties.size(), 1);
EXPECT_EQ(decoded_vertices[0].properties["p0"].ValueInt(), 42);
EXPECT_EQ(decoded_vertices[1].labels.size(), 2);
EXPECT_EQ(decoded_vertices[1].properties.size(), 0);
EXPECT_EQ(decoded_vertices[2].labels.size(), 0);
EXPECT_EQ(decoded_vertices[2].properties.size(), 2);
// Decode edges.
for (int i = 0; i < edge_count; ++i) {
decoder.ReadValue(&dv);
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Edge);
auto &edge = dv.ValueEdge();
decoded_edges.emplace(edge.id, edge);
}
ASSERT_EQ(decoded_edges.size(), 2);
ASSERT_EQ(decoded_edges[0].from, 0);
ASSERT_EQ(decoded_edges[0].to, 1);
ASSERT_EQ(decoded_edges[0].type, "et0");
ASSERT_EQ(decoded_edges[0].properties.size(), 1);
ASSERT_EQ(decoded_edges[1].from, 2);
ASSERT_EQ(decoded_edges[1].to, 1);
ASSERT_EQ(decoded_edges[1].type, "et1");
ASSERT_EQ(decoded_edges[1].properties.size(), 0);
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
buffer.ReadType(vertex_count);
buffer.ReadType(edge_count);
buffer.Close();
// The hash accumulated while reading must match the one from the summary.
EXPECT_EQ(buffer.hash(), hash);
}
// Recovers a database purely from a snapshot (the WAL stays disabled by the
// fixture defaults) and checks that it matches the original.
TEST_F(Durability, SnapshotRecovery) {
  GraphDb db;
  // Index sets built at the end of each population round; an empty set means
  // no indices are built in that round.
  const std::vector<std::vector<int>> index_rounds{{0, 1, 2}, {}, {3, 4}};
  for (const auto &indices : index_rounds) MakeDb(db, 300, indices);
  MakeSnapshot(db);

  {
    FLAGS_snapshot_recover_on_startup = true;
    GraphDb recovered;
    CompareDbs(db, recovered);
  }
}
// Recovers a database purely from WAL files (no snapshot is taken) and checks
// that it matches the original.
TEST_F(Durability, WalRecovery) {
  FLAGS_wal_rotate_ops_count = 5000;
  FLAGS_wal_flush_interval_millis = 2;

  GraphDb db;
  const std::vector<std::vector<int>> index_rounds{{0, 1, 2}, {}, {3, 4}};
  for (const auto &indices : index_rounds) MakeDb(db, 300, indices);

  // Sleep to ensure the WAL gets flushed.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  ASSERT_EQ(DirFiles(kSnapshotDir).size(), 0);
  EXPECT_GT(DirFiles(kWalDir).size(), 1);

  {
    FLAGS_snapshot_recover_on_startup = true;
    GraphDb recovered;
    CompareDbs(db, recovered);
  }
}
// Recovers a database from a snapshot taken midway plus the WAL files, and
// checks that it matches the original.
TEST_F(Durability, SnapshotAndWalRecovery) {
  FLAGS_wal_rotate_ops_count = 1000;
  FLAGS_wal_flush_interval_millis = 2;

  GraphDb db;

  // Content created before the snapshot.
  MakeDb(db, 300, {0, 1, 2});
  MakeDb(db, 300);
  MakeSnapshot(db);

  // Content created after the snapshot.
  MakeDb(db, 300, {3, 4});
  MakeDb(db, 300);
  MakeDb(db, 300, {5});

  // Sleep to ensure the WAL gets flushed.
  const auto flush_wait = std::chrono::milliseconds(50);
  std::this_thread::sleep_for(flush_wait);

  ASSERT_EQ(DirFiles(kSnapshotDir).size(), 1);
  EXPECT_GT(DirFiles(kWalDir).size(), 1);

  {
    FLAGS_snapshot_recover_on_startup = true;
    GraphDb recovered;
    CompareDbs(db, recovered);
  }
}
// Exercises recovery when transactions overlap the snapshot: some commit
// before it, one spans it, one starts after it, and one never commits.
TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) {
  FLAGS_wal_rotate_ops_count = 1000;
  FLAGS_wal_flush_interval_millis = 2;

  GraphDb db;

  // The first transaction modifies and commits.
  GraphDbAccessor dba_1{db};
  MakeDb(dba_1, 100);
  dba_1.Commit();

  // The second transaction will commit after snapshot.
  GraphDbAccessor dba_2{db};
  MakeDb(dba_2, 100);

  // The third transaction modifies and commits.
  GraphDbAccessor dba_3{db};
  MakeDb(dba_3, 100);
  dba_3.Commit();

  MakeSnapshot(db);  // Snapshooter takes the fourth transaction.
  dba_2.Commit();

  // The fifth transaction starts and commits after snapshot.
  GraphDbAccessor dba_5{db};
  MakeDb(dba_5, 100);
  dba_5.Commit();

  // The sixth transaction will not commit at all.
  GraphDbAccessor dba_6{db};
  MakeDb(dba_6, 100);

  // Counts the vertices seen by a fresh accessor on the given database.
  auto count_visible_vertices = [](GraphDb &graph) {
    GraphDbAccessor accessor{graph};
    auto visible = accessor.Vertices(false);
    return std::distance(visible.begin(), visible.end());
  };
  ASSERT_EQ(count_visible_vertices(db), 400);

  // Sleep to ensure the WAL gets flushed.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  ASSERT_EQ(DirFiles(kSnapshotDir).size(), 1);
  EXPECT_GT(DirFiles(kWalDir).size(), 1);

  {
    FLAGS_snapshot_recover_on_startup = true;
    GraphDb recovered;
    ASSERT_EQ(count_visible_vertices(recovered), 400);
    CompareDbs(db, recovered);
  }
}
// Checks that recovering a database does not itself produce WAL content: the
// WAL directory may only gain the recovered database's own "current" file.
TEST_F(Durability, NoWalDuringRecovery) {
  FLAGS_wal_rotate_ops_count = 1000;
  FLAGS_wal_flush_interval_millis = 2;

  GraphDb db;
  MakeDb(db, 300, {0, 1, 2});

  // Sleep to ensure the WAL gets flushed.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  const auto wal_files_before = DirFiles(kWalDir);
  ASSERT_GT(wal_files_before.size(), 3);

  {
    FLAGS_snapshot_recover_on_startup = true;
    GraphDb recovered;
    CompareDbs(db, recovered);

    const auto wal_files_after = DirFiles(kWalDir);
    // We get an extra file for the "current" wal of the recovered db.
    EXPECT_EQ(wal_files_after.size(), wal_files_before.size() + 1);
  }
}
// Verifies snapshot pruning. Each pair is (number of snapshots to make,
// maximum number to retain); per the final expectation a negative retain
// value means nothing is pruned.
TEST_F(Durability, SnapshotRetention) {
  GraphDb db;
  for (auto &pair : {std::pair<int, int>{5, 10}, {5, 3}, {7, -1}}) {
    CleanDurability();
    int count, retain;
    std::tie(count, retain) = pair;

    // Track the added snapshots to ensure the correct ones are pruned.
    std::unordered_set<std::string> snapshots;
    for (int i = 0; i < count; ++i) {
      GraphDbAccessor dba(db);
      durability::MakeSnapshot(dba, kSnapshotDir, retain);
      // Scan the directory once per snapshot (the unused duplicate lookup was
      // dropped).
      snapshots.emplace(GetLastFile(kSnapshotDir));
      // Ensures that the latest snapshot was not in the snapshots collection
      // before. Thus ensures that it wasn't pruned.
      EXPECT_EQ(snapshots.size(), i + 1);
    }

    EXPECT_EQ(DirFiles(kSnapshotDir).size(),
              std::min(count, retain < 0 ? count : retain));
  }
}
// With snapshot-on-exit enabled, destroying a database must leave exactly one
// file in the snapshot directory.
TEST_F(Durability, SnapshotOnExit) {
  FLAGS_snapshot_on_exit = true;
  FLAGS_snapshot_directory = kSnapshotDir;
  {
    // Constructed and destroyed right away; only the destructor matters here.
    GraphDb graph_db;
  }
  EXPECT_EQ(DirFiles(kSnapshotDir).size(), 1);
}

View File

@ -18,19 +18,36 @@ TEST(GraphDbAccessorTest, InsertVertex) {
EXPECT_EQ(Count(accessor.Vertices(false)), 0);
accessor.InsertVertex();
EXPECT_EQ(accessor.InsertVertex().id(), 0);
EXPECT_EQ(Count(accessor.Vertices(false)), 0);
EXPECT_EQ(Count(accessor.Vertices(true)), 1);
accessor.AdvanceCommand();
EXPECT_EQ(Count(accessor.Vertices(false)), 1);
accessor.InsertVertex();
EXPECT_EQ(accessor.InsertVertex().id(), 1);
EXPECT_EQ(Count(accessor.Vertices(false)), 1);
EXPECT_EQ(Count(accessor.Vertices(true)), 2);
accessor.AdvanceCommand();
EXPECT_EQ(Count(accessor.Vertices(false)), 2);
}
// Inserts vertices from 50 threads (200 each, one accessor per thread) and
// checks that every returned vertex id is distinct.
TEST(GraphDbAccessorTest, UniqueVertexId) {
  GraphDb db;
  SkipList<int64_t> ids;

  std::vector<std::thread> workers;
  for (int worker = 0; worker < 50; ++worker) {
    workers.emplace_back([&db, &ids]() {
      GraphDbAccessor dba(db);
      auto id_access = ids.access();
      for (int inserted = 0; inserted < 200; ++inserted) {
        id_access.insert(dba.InsertVertex().id());
      }
    });
  }
  for (auto &worker : workers) worker.join();

  // The collected-id count equals the insert count only if no id repeated.
  EXPECT_EQ(ids.access().size(), 50 * 200);
}
TEST(GraphDbAccessorTest, RemoveVertexSameTransaction) {
GraphDb db;
GraphDbAccessor accessor(db);
@ -116,6 +133,27 @@ TEST(GraphDbAccessorTest, InsertEdge) {
EXPECT_EQ(va3.out_degree(), 1);
}
// Inserts edges from 50 threads (200 each, between two vertices created by the
// same thread) and checks that every returned edge id is distinct.
TEST(GraphDbAccessorTest, UniqueEdgeId) {
  GraphDb db;
  SkipList<int64_t> ids;

  std::vector<std::thread> workers;
  for (int worker = 0; worker < 50; ++worker) {
    workers.emplace_back([&db, &ids]() {
      GraphDbAccessor dba(db);
      auto v1 = dba.InsertVertex();
      auto v2 = dba.InsertVertex();
      auto edge_type = dba.EdgeType("edge_type");
      auto id_access = ids.access();
      for (int inserted = 0; inserted < 200; ++inserted) {
        id_access.insert(dba.InsertEdge(v1, v2, edge_type).id());
      }
    });
  }
  for (auto &worker : workers) worker.join();

  // The collected-id count equals the insert count only if no id repeated.
  EXPECT_EQ(ids.access().size(), 50 * 200);
}
TEST(GraphDbAccessorTest, RemoveEdge) {
GraphDb db;
GraphDbAccessor dba(db);

View File

@ -14,8 +14,8 @@ TEST(MVCC, Deadlock) {
tx::Engine engine;
auto t0 = engine.Begin();
mvcc::VersionList<Prop> version_list1(*t0);
mvcc::VersionList<Prop> version_list2(*t0);
mvcc::VersionList<Prop> version_list1(*t0, 0);
mvcc::VersionList<Prop> version_list2(*t0, 1);
t0->Commit();
auto t1 = engine.Begin();
@ -33,7 +33,7 @@ TEST(MVCC, UpdateDontDelete) {
{
tx::Engine engine;
auto t1 = engine.Begin();
mvcc::VersionList<DestrCountRec> version_list(*t1, count);
mvcc::VersionList<DestrCountRec> version_list(*t1, 0, count);
t1->Commit();
auto t2 = engine.Begin();
@ -57,7 +57,7 @@ TEST(MVCC, UpdateDontDelete) {
TEST(MVCC, Oldest) {
tx::Engine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Prop> version_list(*t1);
mvcc::VersionList<Prop> version_list(*t1, 0);
auto first = version_list.Oldest();
EXPECT_NE(first, nullptr);
// TODO Gleich: no need to do 10 checks of the same thing

View File

@ -60,7 +60,7 @@ class Mvcc : public ::testing::Test {
int version_list_size = 0;
tx::Engine engine;
tx::Transaction *t1 = engine.Begin();
mvcc::VersionList<TestClass> version_list{*t1, version_list_size};
mvcc::VersionList<TestClass> version_list{*t1, 0, version_list_size};
TestClass *v1 = nullptr;
tx::Transaction *t2 = nullptr;
tx::transaction_id_t id0, id1, id2;

View File

@ -7,7 +7,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "data_structures/concurrent/skiplist.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "mvcc/record.hpp"
#include "mvcc/version_list.hpp"
#include "storage/garbage_collector.hpp"
@ -25,7 +25,8 @@ class MvccGcTest : public ::testing::Test {
protected:
std::atomic<int> record_destruction_count{0};
mvcc::VersionList<DestrCountRec> version_list{*t0, record_destruction_count};
mvcc::VersionList<DestrCountRec> version_list{*t0, 0,
record_destruction_count};
std::vector<tx::Transaction *> transactions{t0};
void SetUp() override { t0->Commit(); }
@ -115,18 +116,20 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) {
* empty (not visible from any future transaction) from the skiplist.
*/
TEST(GarbageCollector, GcClean) {
SkipList<mvcc::VersionList<DestrCountRec> *> skiplist;
ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection;
tx::Engine engine;
DeferredDeleter<DestrCountRec> deleter;
DeferredDeleter<mvcc::VersionList<DestrCountRec>> vlist_deleter;
GarbageCollector<DestrCountRec> gc(skiplist, deleter, vlist_deleter);
GarbageCollector<decltype(collection), DestrCountRec> gc(collection, deleter,
vlist_deleter);
// create a version list in transaction t1
auto t1 = engine.Begin();
std::atomic<int> record_destruction_count{0};
auto vl = new mvcc::VersionList<DestrCountRec>(*t1, record_destruction_count);
auto access = skiplist.access();
access.insert(vl);
auto vl =
new mvcc::VersionList<DestrCountRec>(*t1, 0, record_destruction_count);
auto access = collection.access();
access.insert(0, vl);
t1->Commit();
// run garbage collection that has nothing to collect

View File

@ -267,9 +267,9 @@ class ExpandFixture : public testing::Test {
EdgeAccessor r2 = dba_.InsertEdge(v1, v3, edge_type);
void SetUp() override {
v1.add_label((GraphDbTypes::Label)1);
v2.add_label((GraphDbTypes::Label)2);
v3.add_label((GraphDbTypes::Label)3);
v1.add_label(dba_.Label("l1"));
v2.add_label(dba_.Label("l2"));
v3.add_label(dba_.Label("l3"));
dba_.AdvanceCommand();
}
};

View File

@ -1,291 +0,0 @@
#include <cstdio>
#include <experimental/filesystem>
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/recovery.hpp"
#include "durability/version.hpp"
DECLARE_int32(snapshot_cycle_sec);
namespace fs = std::experimental::filesystem;
char tmp[] = "XXXXXX";
const fs::path SNAPSHOTS_DIR = mkdtemp(tmp);
std::vector<fs::path> GetFilesFromDir(
const std::string &snapshots_default_db_dir) {
std::vector<fs::path> files;
for (auto &file : fs::directory_iterator(snapshots_default_db_dir))
files.push_back(file.path());
return files;
}
// Removes every entry directly inside SNAPSHOTS_DIR; a no-op when the
// directory does not exist. The listing is collected in full before anything
// is removed.
void CleanDbDir() {
  if (!fs::exists(SNAPSHOTS_DIR)) return;
  const std::vector<fs::path> files = GetFilesFromDir(SNAPSHOTS_DIR);
  // Iterate by const reference to avoid copying each path.
  for (const auto &file : files) {
    fs::remove(file);
  }
}
class RecoveryTest : public ::testing::Test {
protected:
void TearDown() override { CleanDbDir(); }
void SetUp() override {
CleanDbDir();
FLAGS_snapshot_cycle_sec = -1;
}
const int snapshot_max_retained_ = 10;
};
// Builds and commits a three-vertex graph:
//   (liker) -[:likes]-> (target) <-[:hates]- (hater)
void CreateSmallGraph(GraphDb &db) {
  GraphDbAccessor dba(db);

  auto liker = dba.InsertVertex();
  auto target = dba.InsertVertex();
  dba.InsertEdge(liker, target, dba.EdgeType("likes"));

  auto hater = dba.InsertVertex();
  dba.InsertEdge(hater, target, dba.EdgeType("hates"));

  dba.Commit();
}
// Builds and commits a star graph: one inner vertex and 999 outer vertices,
// each outer vertex connected to the inner one by an edge directed towards the
// inner vertex. Every vertex has label "label" and property "prop" = "prop";
// every edge has type "type" and property "prop" = "prop".
void CreateBigGraph(GraphDb &db) {
  GraphDbAccessor dba(db);

  // Inserts a vertex carrying the common label and property.
  auto make_vertex = [&dba]() {
    auto vertex = dba.InsertVertex();
    vertex.add_label(dba.Label("label"));
    vertex.PropsSet(dba.Property("prop"), "prop");
    return vertex;
  };

  auto hub = make_vertex();
  for (int outer = 1; outer < 1000; ++outer) {
    auto spoke = make_vertex();
    auto edge = dba.InsertEdge(spoke, hub, dba.EdgeType("type"));
    edge.PropsSet(dba.Property("prop"), "prop");
  }

  dba.Commit();
}
// Writes a snapshot of `db` into SNAPSHOTS_DIR through a fresh accessor,
// forwarding the retention limit to the snapshooter.
void TakeSnapshot(GraphDb &db, int snapshot_max_retained_) {
  GraphDbAccessor accessor(db);
  Snapshooter snapshot_writer;
  snapshot_writer.MakeSnapshot(accessor, SNAPSHOTS_DIR,
                               snapshot_max_retained_);
}
std::string GetLatestSnapshot() {
std::vector<fs::path> files = GetFilesFromDir(SNAPSHOTS_DIR);
CHECK(static_cast<int>(files.size()) == 1) << "No snapshot files in folder.";
std::sort(files.rbegin(), files.rend());
return files[0];
}
TEST_F(RecoveryTest, TestEncoding) {
// Creates snapshot of the small graph. Uses file_reader_buffer and bolt
// decoder to read data from the snapshot and reads graph from it. After
// reading graph is tested.
GraphDb db;
CreateSmallGraph(db);
TakeSnapshot(db, snapshot_max_retained_);
std::string snapshot = GetLatestSnapshot();
HashedFileReader buffer;
communication::bolt::Decoder<HashedFileReader> decoder(buffer);
int64_t vertex_count, edge_count;
uint64_t hash;
ASSERT_TRUE(buffer.Open(snapshot));
// The summary supplies the vertex/edge counts that drive the loops below and
// the hash the reader's running hash is compared against at the end.
ASSERT_TRUE(
durability::ReadSnapshotSummary(buffer, vertex_count, edge_count, hash));
// The magic number is read raw from the buffer, before any decoded values.
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
ASSERT_EQ(magic_number, durability::kMagicNumber);
// Snapshot format version.
communication::bolt::DecodedValue dv;
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
// Transactional Snapshot, ignore value, just check type.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsList());
// Label property indices.
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueList().size(), 0);
// Vertices come first; only their ids are collected.
std::vector<int64_t> ids;
std::vector<std::string> edge_types;
for (int i = 0; i < vertex_count; ++i) {
communication::bolt::DecodedValue vertex_dv;
decoder.ReadValue(&vertex_dv);
auto &vertex = vertex_dv.ValueVertex();
ids.push_back(vertex.id);
}
// Edges follow; their endpoints and types are collected.
std::vector<int64_t> from, to;
for (int i = 0; i < edge_count; ++i) {
communication::bolt::DecodedValue edge_dv;
decoder.ReadValue(&edge_dv);
auto &edge = edge_dv.ValueEdge();
from.push_back(edge.from);
to.push_back(edge.to);
edge_types.push_back(edge.type);
}
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
buffer.ReadType(vertex_count);
buffer.ReadType(edge_count);
buffer.Close();
ASSERT_EQ(to.size(), 2U);
ASSERT_EQ(from.size(), 2U);
EXPECT_EQ(buffer.hash(), hash);
EXPECT_NE(edge_types.end(),
std::find(edge_types.begin(), edge_types.end(), "hates"));
EXPECT_NE(edge_types.end(),
std::find(edge_types.begin(), edge_types.end(), "likes"));
// Matches the graph built by CreateSmallGraph: both edges end at the same
// vertex but start from different ones, and all endpoints are known vertices.
EXPECT_EQ(to[0], to[1]);
EXPECT_NE(from[0], from[1]);
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), to[0]));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), from[0]));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), from[1]));
}
TEST_F(RecoveryTest, TestEncodingAndDecoding) {
  // Creates snapshot of the small graph. Uses Recovery to recover graph from
  // the snapshot file. After creation graph is tested.
  GraphDb db;
  CreateSmallGraph(db);
  TakeSnapshot(db, snapshot_max_retained_);
  std::string snapshot = GetLatestSnapshot();

  // Recover into a separate database instance.
  GraphDb db_recover;
  GraphDbAccessor dba_recover(db_recover);
  Recovery recovery;
  ASSERT_TRUE(recovery.Recover(snapshot, dba_recover));

  std::vector<VertexAccessor> vertices;
  std::vector<EdgeAccessor> edges;
  GraphDbAccessor dba(db_recover);

  int64_t vertex_count = 0;
  for (const auto &vertex : dba.Vertices(false)) {
    vertices.push_back(vertex);
    vertex_count++;
  }
  EXPECT_EQ(vertex_count, 3);

  int64_t edge_count = 0;
  for (const auto &edge : dba.Edges(false)) {
    // Both endpoints of every recovered edge must be recovered vertices.
    EXPECT_NE(vertices.end(),
              std::find(vertices.begin(), vertices.end(), edge.to()));
    EXPECT_NE(vertices.end(),
              std::find(vertices.begin(), vertices.end(), edge.from()));
    edges.push_back(edge);
    edge_count++;
  }

  // Guards the indexing below. ASSERT fails only this test, whereas the glog
  // CHECK used previously aborted the whole test binary.
  ASSERT_EQ(edges.size(), 2U) << "There should be two edges.";
  EXPECT_EQ(edge_count, 2);
  EXPECT_TRUE(edges[0].to() == edges[1].to());
  EXPECT_FALSE(edges[0].from() == edges[1].from());
}
TEST_F(RecoveryTest, TestEncodingAndRecovering) {
  // Snapshots the big graph, recovers it into a second database and verifies
  // that every vertex and edge carries the expected label / type and property.
  GraphDb db;
  CreateBigGraph(db);
  TakeSnapshot(db, snapshot_max_retained_);
  std::string snapshot_path = GetLatestSnapshot();

  // Recover into a separate database instance.
  GraphDb db_recover;
  GraphDbAccessor dba_recover(db_recover);
  Recovery recovery;
  EXPECT_TRUE(recovery.Recover(snapshot_path, dba_recover));

  GraphDbAccessor dba_get(db_recover);

  int64_t vertices_seen = 0;
  for (const auto &vertex : dba_get.Vertices(false)) {
    EXPECT_EQ(vertex.labels().size(), 1);
    EXPECT_TRUE(vertex.has_label(dba_get.Label("label")));
    auto actual = query::TypedValue(vertex.PropsAt(dba_get.Property("prop")));
    auto expected = query::TypedValue(PropertyValue("prop"));
    EXPECT_TRUE((actual == expected).Value<bool>());
    ++vertices_seen;
  }
  EXPECT_EQ(vertices_seen, 1000);

  int64_t edges_seen = 0;
  for (const auto &edge : dba_get.Edges(false)) {
    EXPECT_EQ(edge.EdgeType(), dba_get.EdgeType("type"));
    auto actual = query::TypedValue(edge.PropsAt(dba_get.Property("prop")));
    auto expected = query::TypedValue(PropertyValue("prop"));
    EXPECT_TRUE((actual == expected).Value<bool>());
    ++edges_seen;
  }
  EXPECT_EQ(edges_seen, 999);

  dba_get.Commit();
}
TEST_F(RecoveryTest, TestLabelPropertyIndexRecovery) {
  // Builds a label+property index, snapshots the big graph and checks that
  // both the index and the graph contents come back after recovery.
  GraphDb db;
  GraphDbAccessor dba(db);
  dba.BuildIndex(dba.Label("label"), dba.Property("prop"));
  dba.Commit();
  CreateBigGraph(db);
  TakeSnapshot(db, snapshot_max_retained_);
  std::string snapshot_path = GetLatestSnapshot();

  // Recover into a separate database instance.
  GraphDb db_recover;
  GraphDbAccessor dba_recover(db_recover);
  Recovery recovery;
  EXPECT_TRUE(recovery.Recover(snapshot_path, dba_recover));

  GraphDbAccessor dba_get(db_recover);

  // Exactly the one index created above must exist after recovery.
  EXPECT_EQ(dba_get.GetIndicesKeys().size(), 1);
  EXPECT_TRUE(dba_get.LabelPropertyIndexExists(dba_get.Label("label"),
                                               dba_get.Property("prop")));

  int64_t vertices_seen = 0;
  for (const auto &vertex : dba_get.Vertices(false)) {
    EXPECT_EQ(vertex.labels().size(), 1);
    EXPECT_TRUE(vertex.has_label(dba_get.Label("label")));
    auto actual = query::TypedValue(vertex.PropsAt(dba_get.Property("prop")));
    auto expected = query::TypedValue(PropertyValue("prop"));
    EXPECT_TRUE((actual == expected).Value<bool>());
    ++vertices_seen;
  }
  EXPECT_EQ(vertices_seen, 1000);

  int64_t edges_seen = 0;
  for (const auto &edge : dba_get.Edges(false)) {
    EXPECT_EQ(edge.EdgeType(), dba_get.EdgeType("type"));
    auto actual = query::TypedValue(edge.PropsAt(dba_get.Property("prop")));
    auto expected = query::TypedValue(PropertyValue("prop"));
    EXPECT_TRUE((actual == expected).Value<bool>());
    ++edges_seen;
  }
  EXPECT_EQ(edges_seen, 999);

  dba_get.Commit();
}
// Test binary entry point: initializes googletest from the command line and
// returns the result of running all registered tests.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

View File

@ -1,117 +0,0 @@
#include <experimental/filesystem>
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/snapshooter.hpp"
DECLARE_bool(snapshot_on_exit);
DECLARE_int32(snapshot_cycle_sec);
DECLARE_string(snapshot_directory);
namespace fs = std::experimental::filesystem;
char tmp[] = "XXXXXX";
const fs::path SNAPSHOTS_DIR = mkdtemp(tmp);
// Other functionality is tested in recovery tests.
// Returns the paths of all entries directly inside the given directory
// (non-recursive), in directory iteration order.
std::vector<fs::path> GetFilesFromDir(
    const fs::path &snapshots_default_db_dir) {
  std::vector<fs::path> paths;
  const fs::directory_iterator end;
  for (fs::directory_iterator entry(snapshots_default_db_dir); entry != end;
       ++entry) {
    paths.push_back(entry->path());
  }
  return paths;
}
// Removes every entry directly inside SNAPSHOTS_DIR; a no-op when the
// directory does not exist. The listing is collected in full before anything
// is removed.
void CleanDbDir() {
  if (!fs::exists(SNAPSHOTS_DIR)) return;
  const std::vector<fs::path> files = GetFilesFromDir(SNAPSHOTS_DIR);
  // Iterate by const reference to avoid copying each path.
  for (const auto &file : files) {
    fs::remove(file);
  }
}
class SnapshotTest : public ::testing::Test {
protected:
virtual void TearDown() { CleanDbDir(); }
virtual void SetUp() {
CleanDbDir();
FLAGS_snapshot_cycle_sec = -1;
}
std::string snapshot_cycle_sec_setup_;
};
// Making fewer snapshots (3) than the retention limit (10) must keep all of
// them on disk.
TEST_F(SnapshotTest, CreateLessThanMaxRetainedSnapshotsTests) {
  const int max_retained = 10;
  GraphDb db;
  for (int made = 0; made < 3; ++made) {
    GraphDbAccessor dba(db);
    Snapshooter snapshooter;
    snapshooter.MakeSnapshot(dba, SNAPSHOTS_DIR, max_retained);
  }
  std::vector<fs::path> snapshot_files = GetFilesFromDir(SNAPSHOTS_DIR);
  EXPECT_EQ(snapshot_files.size(), 3);
}
// Making more snapshots (3) than the retention limit (2) must leave only two
// files on disk, and the very first snapshot must be gone.
TEST_F(SnapshotTest, CreateMoreThanMaxRetainedSnapshotsTests) {
  const int snapshot_max_retained = 2;
  GraphDb db;
  fs::path first_snapshot;
  for (int i = 0; i < 3; ++i) {
    GraphDbAccessor dba(db);
    Snapshooter snapshooter;
    snapshooter.MakeSnapshot(dba, SNAPSHOTS_DIR, snapshot_max_retained);
    if (i == 0) {
      std::vector<fs::path> files_begin = GetFilesFromDir(SNAPSHOTS_DIR);
      // ASSERT rather than EXPECT: the indexing below is only valid when the
      // first snapshot file actually exists.
      ASSERT_EQ(files_begin.size(), 1);
      first_snapshot = files_begin[0];
    }
  }
  std::vector<fs::path> files_end = GetFilesFromDir(SNAPSHOTS_DIR);
  EXPECT_EQ(files_end.size(), 2);
  EXPECT_FALSE(fs::exists(first_snapshot));
}
// With a retention limit of -1 all ten snapshots made must remain on disk.
TEST_F(SnapshotTest, CreateSnapshotWithUnlimitedMaxRetainedSnapshots) {
  const int max_retained = -1;
  GraphDb db;
  for (int made = 0; made < 10; ++made) {
    GraphDbAccessor dba(db);
    Snapshooter snapshooter;
    snapshooter.MakeSnapshot(dba, SNAPSHOTS_DIR, max_retained);
  }
  std::vector<fs::path> snapshot_files = GetFilesFromDir(SNAPSHOTS_DIR);
  EXPECT_EQ(snapshot_files.size(), 10);
}
// With snapshot-on-exit enabled, destroying the database must leave exactly
// one snapshot file in SNAPSHOTS_DIR.
TEST_F(SnapshotTest, TestSnapshotFileOnDbDestruct) {
  FLAGS_snapshot_directory = SNAPSHOTS_DIR;
  FLAGS_snapshot_on_exit = true;
  {
    // Both objects are destroyed at the end of this scope; the snapshot is
    // expected to be written during that teardown.
    GraphDb db;
    GraphDbAccessor dba(db);
  }
  std::vector<fs::path> snapshot_files = GetFilesFromDir(SNAPSHOTS_DIR);
  EXPECT_EQ(snapshot_files.size(), 1);
}
// Test binary entry point: initializes googletest from the command line and
// returns the result of running all registered tests.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

View File

@ -5,6 +5,7 @@ add_executable(mg_import_csv
${memgraph_src_dir}/data_structures/concurrent/skiplist_gc.cpp
${memgraph_src_dir}/database/graph_db_accessor.cpp
${memgraph_src_dir}/durability/snapshooter.cpp
${memgraph_src_dir}/durability/wal.cpp
${memgraph_src_dir}/query/typed_value.cpp
${memgraph_src_dir}/storage/edge_accessor.cpp
${memgraph_src_dir}/storage/locking/record_lock.cpp

View File

@ -331,16 +331,18 @@ void Convert(const std::vector<std::string> &nodes,
int64_t relationship_count = 0;
MemgraphNodeIdMap node_id_map;
// Snapshot file has the following contents in order:
// 1) magic number
// 2) transactional snapshot of the snapshoter. When the snapshot is
// 1) Magic number.
// 2) Transaction ID of the snapshooter. When generated set to 0.
// 3) Transactional snapshot of the snapshooter. When the snapshot is
// generated it's an empty list.
// 3) list of label+property index
// 4) all nodes, sequentially, but not encoded as a list
// 5) all relationships, sequentially, but not encoded as a list
// 5) summary with node count, relationship count and hash digest
// 4) List of label+property index.
// 5) All nodes, sequentially, but not encoded as a list.
// 6) All relationships, sequentially, but not encoded as a list.
// 7) Summary with node count, relationship count and hash digest.
encoder.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder.WriteTypedValue(durability::kVersion);
encoder.WriteInt(0); // Id of transaction that is snapshooting.
encoder.WriteList({}); // Transactional snapshot.
encoder.WriteList({}); // Label + property indexes.
for (const auto &nodes_file : nodes) {
@ -389,7 +391,7 @@ std::string GetOutputPath() {
} catch (const std::experimental::filesystem::filesystem_error &error) {
LOG(FATAL) << error.what();
}
return std::string(GetSnapshotFileName(snapshot_dir));
return std::string(durability::MakeSnapshotPath(snapshot_dir));
}
int main(int argc, char *argv[]) {

View File

@ -8,6 +8,7 @@ add_executable(mg_recovery_check
${memgraph_src_dir}/database/graph_db_accessor.cpp
${memgraph_src_dir}/durability/recovery.cpp
${memgraph_src_dir}/durability/snapshooter.cpp
${memgraph_src_dir}/durability/wal.cpp
${memgraph_src_dir}/query/typed_value.cpp
${memgraph_src_dir}/storage/edge_accessor.cpp
${memgraph_src_dir}/storage/locking/record_lock.cpp

View File

@ -3,6 +3,7 @@
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "database/graph_db_accessor.hpp"
#include "database/graph_db.hpp"
#include "durability/recovery.hpp"
#include "query/typed_value.hpp"
@ -18,10 +19,8 @@ DEFINE_string(snapshot_dir, "", "Path to where the snapshot is stored");
class RecoveryTest : public ::testing::Test {
protected:
void SetUp() override {
Recovery recovery;
std::string snapshot(FLAGS_snapshot_dir + "/snapshot");
GraphDbAccessor dba(db_);
recovery.Recover(snapshot, dba);
std::string snapshot(FLAGS_snapshot_dir);
durability::Recover(snapshot, db_);
}
GraphDb db_;