diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6a64e48a6..ab074a10a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,7 @@ set(memgraph_src_files database/graph_db.cpp database/graph_db_config.cpp database/graph_db_accessor.cpp + database/state_delta.cpp durability/paths.cpp durability/recovery.cpp durability/snapshooter.cpp diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index b323c5c43..88751c500 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -11,7 +11,7 @@ GraphDbAccessor::GraphDbAccessor(GraphDb &db) : db_(db), transaction_(MasterEngine().Begin()) { - db_.wal_.TxBegin(transaction_->id_); + db_.wal_.Emplace(database::StateDelta::TxBegin(transaction_->id_)); } GraphDbAccessor::~GraphDbAccessor() { @@ -33,7 +33,7 @@ void GraphDbAccessor::Commit() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; auto tid = transaction_->id_; MasterEngine().Commit(*transaction_); - db_.wal_.TxCommit(tid); + db_.wal_.Emplace(database::StateDelta::TxCommit(tid)); commited_ = true; } @@ -41,7 +41,7 @@ void GraphDbAccessor::Abort() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; auto tid = transaction_->id_; MasterEngine().Abort(*transaction_); - db_.wal_.TxAbort(tid); + db_.wal_.Emplace(database::StateDelta::TxAbort(tid)); aborted_ = true; } @@ -68,7 +68,8 @@ VertexAccessor GraphDbAccessor::InsertVertex( bool success = db_.vertices_.access().insert(id, vertex_vlist).second; CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id; - db_.wal_.CreateVertex(transaction_->id_, vertex_vlist->gid_); + db_.wal_.Emplace(database::StateDelta::CreateVertex(transaction_->id_, + vertex_vlist->gid_)); return VertexAccessor(*vertex_vlist, *this); } @@ -148,8 +149,8 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label, // reason. auto wal_build_index_tx_id = dba.transaction_id(); dba.Commit(); - db_.wal_.BuildIndex(wal_build_index_tx_id, LabelName(label), - PropertyName(property)); + db_.wal_.Emplace(database::StateDelta::BuildIndex( + wal_build_index_tx_id, LabelName(label), PropertyName(property))); // After these two operations we are certain that everything is contained in // the index under the assumption that this transaction contained no @@ -252,7 +253,8 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) { if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0) return false; - db_.wal_.RemoveVertex(transaction_->id_, vertex_accessor.vlist_->gid_); + db_.wal_.Emplace(database::StateDelta::RemoveVertex( + transaction_->id_, vertex_accessor.vlist_->gid_)); vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_); return true; @@ -305,8 +307,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge( to.SwitchNew(); to.update().in_.emplace(from.vlist_, edge_vlist, edge_type); - db_.wal_.CreateEdge(transaction_->id_, edge_vlist->gid_, from.vlist_->gid_, - to.vlist_->gid_, EdgeTypeName(edge_type)); + db_.wal_.Emplace(database::StateDelta::CreateEdge( + transaction_->id_, edge_vlist->gid_, from.vlist_->gid_, to.vlist_->gid_, + EdgeTypeName(edge_type))); return EdgeAccessor(*edge_vlist, *this, from.vlist_, to.vlist_, edge_type); } @@ -329,7 +332,8 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor, edge_accessor.to().update().in_.RemoveEdge(edge_accessor.vlist_); edge_accessor.vlist_->remove(edge_accessor.current_, *transaction_); - db_.wal_.RemoveEdge(transaction_->id_, edge_accessor.gid()); + db_.wal_.Emplace( + database::StateDelta::RemoveEdge(transaction_->id_, edge_accessor.gid())); } GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) { diff --git a/src/database/state_delta.cpp b/src/database/state_delta.cpp new file mode 100644 index 000000000..cdf301910 --- /dev/null +++ b/src/database/state_delta.cpp @@ -0,0 +1,296 @@ +#include + +#include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "database/graph_db_accessor.hpp" +#include "database/state_delta.hpp" + +namespace database { + +std::pair StateDelta::IndexName() const { + CHECK(StateDelta::type() == StateDelta::Type::BUILD_INDEX) + << "Invalid operation type to try to get index name"; + return std::make_pair(label_, property_); +} + +StateDelta StateDelta::TxBegin(tx::transaction_id_t tx_id) { + return {StateDelta::Type::TRANSACTION_BEGIN, tx_id}; +} + +StateDelta StateDelta::TxCommit(tx::transaction_id_t tx_id) { + return {StateDelta::Type::TRANSACTION_COMMIT, tx_id}; +} + +StateDelta StateDelta::TxAbort(tx::transaction_id_t tx_id) { + return {StateDelta::Type::TRANSACTION_ABORT, tx_id}; +} + +StateDelta StateDelta::CreateVertex(tx::transaction_id_t tx_id, + gid::Gid vertex_id) { + StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id); + op.vertex_id_ = vertex_id; + return op; +} + +StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, + gid::Gid vertex_from_id, + gid::Gid vertex_to_id, + const std::string &edge_type) { + StateDelta op(StateDelta::Type::CREATE_EDGE, tx_id); + op.edge_id_ = edge_id; + op.vertex_from_id_ = vertex_from_id; + op.vertex_to_id_ = vertex_to_id; + op.edge_type_ = edge_type; + return op; +} + +StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id, + gid::Gid vertex_id, + const std::string &property, + const PropertyValue &value) { + StateDelta op(StateDelta::Type::SET_PROPERTY_VERTEX, tx_id); + op.vertex_id_ = vertex_id; + op.property_ = property; + op.value_ = value; + return op; +} + +StateDelta StateDelta::PropsSetEdge(tx::transaction_id_t tx_id, + gid::Gid edge_id, + const std::string &property, + const PropertyValue &value) { + StateDelta op(StateDelta::Type::SET_PROPERTY_EDGE, tx_id); + op.edge_id_ = edge_id; + op.property_ = property; + op.value_ = value; + return op; +} + +StateDelta StateDelta::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, + const std::string &label) { + StateDelta op(StateDelta::Type::ADD_LABEL, tx_id); + op.vertex_id_ = vertex_id; + op.label_ = label; + return op; +} + +StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id, + gid::Gid vertex_id, + const std::string &label) { + StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id); + op.vertex_id_ = vertex_id; + op.label_ = label; + return op; +} + +StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id, + gid::Gid vertex_id) { + StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id); + op.vertex_id_ = vertex_id; + return op; +} + +StateDelta StateDelta::RemoveEdge(tx::transaction_id_t tx_id, + gid::Gid edge_id) { + StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id); + op.edge_id_ = edge_id; + return op; +} + +StateDelta StateDelta::BuildIndex(tx::transaction_id_t tx_id, + const std::string &label, + const std::string &property) { + StateDelta op(StateDelta::Type::BUILD_INDEX, tx_id); + op.label_ = label; + op.property_ = property; + return op; +} + +void StateDelta::Encode( + HashedFileWriter &writer, + communication::bolt::PrimitiveEncoder &encoder) const { + encoder.WriteInt(static_cast(type_)); + encoder.WriteInt(static_cast(transaction_id_)); + + switch (type_) { + case Type::TRANSACTION_BEGIN: + case Type::TRANSACTION_COMMIT: + case Type::TRANSACTION_ABORT: + break; + case Type::CREATE_VERTEX: + encoder.WriteInt(vertex_id_); + break; + case Type::CREATE_EDGE: + encoder.WriteInt(edge_id_); + encoder.WriteInt(vertex_from_id_); + encoder.WriteInt(vertex_to_id_); + encoder.WriteString(edge_type_); + break; + case Type::SET_PROPERTY_VERTEX: + encoder.WriteInt(vertex_id_); + encoder.WriteString(property_); + encoder.WritePropertyValue(value_); + break; + case Type::SET_PROPERTY_EDGE: + encoder.WriteInt(edge_id_); + encoder.WriteString(property_); + encoder.WritePropertyValue(value_); + break; + case Type::ADD_LABEL: + case Type::REMOVE_LABEL: + encoder.WriteInt(vertex_id_); + encoder.WriteString(label_); + break; + case Type::REMOVE_VERTEX: + encoder.WriteInt(vertex_id_); + break; + case Type::REMOVE_EDGE: + encoder.WriteInt(edge_id_); + break; + case Type::BUILD_INDEX: + encoder.WriteString(label_); + encoder.WriteString(property_); + break; + } + + writer.WriteValue(writer.hash()); +} + +#define DECODE_MEMBER(member, value_f) \ + if (!decoder.ReadValue(&dv)) return nullopt; \ + r_val.member = dv.value_f(); + +std::experimental::optional StateDelta::Decode( + HashedFileReader &reader, + communication::bolt::Decoder &decoder) { + using std::experimental::nullopt; + + StateDelta r_val; + // The decoded value used as a temporary while decoding. + communication::bolt::DecodedValue dv; + + try { + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.type_ = static_cast(dv.ValueInt()); + DECODE_MEMBER(transaction_id_, ValueInt) + + switch (r_val.type_) { + case Type::TRANSACTION_BEGIN: + case Type::TRANSACTION_COMMIT: + case Type::TRANSACTION_ABORT: + break; + case Type::CREATE_VERTEX: + DECODE_MEMBER(vertex_id_, ValueInt) + break; + case Type::CREATE_EDGE: + DECODE_MEMBER(edge_id_, ValueInt) + DECODE_MEMBER(vertex_from_id_, ValueInt) + DECODE_MEMBER(vertex_to_id_, ValueInt) + DECODE_MEMBER(edge_type_, ValueString) + break; + case Type::SET_PROPERTY_VERTEX: + DECODE_MEMBER(vertex_id_, ValueInt) + DECODE_MEMBER(property_, ValueString) + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.value_ = static_cast(dv); + break; + case Type::SET_PROPERTY_EDGE: + DECODE_MEMBER(edge_id_, ValueInt) + DECODE_MEMBER(property_, ValueString) + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.value_ = static_cast(dv); + break; + case Type::ADD_LABEL: + case Type::REMOVE_LABEL: + DECODE_MEMBER(vertex_id_, ValueInt) + DECODE_MEMBER(label_, ValueString) + break; + case Type::REMOVE_VERTEX: + DECODE_MEMBER(vertex_id_, ValueInt) + break; + case Type::REMOVE_EDGE: + DECODE_MEMBER(edge_id_, ValueInt) + break; + case Type::BUILD_INDEX: + DECODE_MEMBER(label_, ValueString) + DECODE_MEMBER(property_, ValueString) + break; + } + + auto decoder_hash = reader.hash(); + uint64_t encoded_hash; + if (!reader.ReadType(encoded_hash, true)) return nullopt; + if (decoder_hash != encoded_hash) return nullopt; + + return r_val; + } catch (communication::bolt::DecodedValueException &) { + return nullopt; + } catch (std::ifstream::failure &) { + return nullopt; + } +} + +#undef DECODE_MEMBER + +void StateDelta::Apply(GraphDbAccessor &dba) const { + switch (type_) { + // Transactional state is not recovered. + case Type::TRANSACTION_BEGIN: + case Type::TRANSACTION_COMMIT: + case Type::TRANSACTION_ABORT: + LOG(FATAL) << "Transaction handling not handled in Apply"; + break; + case Type::CREATE_VERTEX: + dba.InsertVertex(vertex_id_); + break; + case Type::CREATE_EDGE: { + auto from = dba.FindVertex(vertex_from_id_, true); + auto to = dba.FindVertex(vertex_to_id_, true); + DCHECK(from) << "Failed to find vertex."; + DCHECK(to) << "Failed to find vertex."; + dba.InsertEdge(*from, *to, dba.EdgeType(edge_type_), edge_id_); + break; + } + case Type::SET_PROPERTY_VERTEX: { + auto vertex = dba.FindVertex(vertex_id_, true); + DCHECK(vertex) << "Failed to find vertex."; + vertex->PropsSet(dba.Property(property_), value_); + break; + } + case Type::SET_PROPERTY_EDGE: { + auto edge = dba.FindEdge(edge_id_, true); + DCHECK(edge) << "Failed to find edge."; + edge->PropsSet(dba.Property(property_), value_); + break; + } + case Type::ADD_LABEL: { + auto vertex = dba.FindVertex(vertex_id_, true); + DCHECK(vertex) << "Failed to find vertex."; + vertex->add_label(dba.Label(label_)); + break; + } + case Type::REMOVE_LABEL: { + auto vertex = dba.FindVertex(vertex_id_, true); + DCHECK(vertex) << "Failed to find vertex."; + vertex->remove_label(dba.Label(label_)); + break; + } + case Type::REMOVE_VERTEX: { + auto vertex = dba.FindVertex(vertex_id_, true); + DCHECK(vertex) << "Failed to find vertex."; + dba.DetachRemoveVertex(*vertex); + break; + } + case Type::REMOVE_EDGE: { + auto edge = dba.FindEdge(edge_id_, true); + DCHECK(edge) << "Failed to find edge."; + dba.RemoveEdge(*edge); + break; + } + case Type::BUILD_INDEX: { + LOG(FATAL) << "Index handling not handled in Apply"; + break; + } + } +} + +}; // namespace database diff --git a/src/database/state_delta.hpp b/src/database/state_delta.hpp new file mode 100644 index 000000000..211b2ebd4 --- /dev/null +++ b/src/database/state_delta.hpp @@ -0,0 +1,101 @@ +#pragma once + +#include "communication/bolt/v1/decoder/decoder.hpp" +#include "communication/bolt/v1/encoder/primitive_encoder.hpp" +#include "durability/hashed_file_reader.hpp" +#include "durability/hashed_file_writer.hpp" +#include "storage/gid.hpp" +#include "storage/property_value.hpp" +#include "transactions/transaction.hpp" + +namespace database { +/** Describes single change to the database state. Used for durability (WAL) and + * state communication over network in HA and for distributed remote storage + * changes*/ +class StateDelta { + public: + /** Defines StateDelta type. For each type the comment indicates which values + * need to be stored. All deltas have the transaction_id member, so that's + * omitted in the comment. */ + enum class Type { + TRANSACTION_BEGIN, + TRANSACTION_COMMIT, + TRANSACTION_ABORT, + CREATE_VERTEX, // vertex_id + CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type + SET_PROPERTY_VERTEX, // vertex_id, property, property_value + SET_PROPERTY_EDGE, // edge_id, property, property_value + // remove property is done by setting a PropertyValue::Null + ADD_LABEL, // vertex_id, label + REMOVE_LABEL, // vertex_id, label + REMOVE_VERTEX, // vertex_id + REMOVE_EDGE, // edge_id + BUILD_INDEX // label, property + }; + + StateDelta() = default; + StateDelta(const enum Type &type, tx::transaction_id_t tx_id) + : type_(type), transaction_id_(tx_id) {} + + /** Attempts to decode a StateDelta from the given decoder. Returns the + * decoded value if successful, otherwise returns nullopt. */ + static std::experimental::optional Decode( + HashedFileReader &reader, + communication::bolt::Decoder &decoder); + + /** Encodes the delta using primitive encoder, and writes out the new hash + * with delta to the writer */ + void Encode( + HashedFileWriter &writer, + communication::bolt::PrimitiveEncoder &encoder) const; + + tx::transaction_id_t transaction_id() const { return transaction_id_; } + Type type() const { return type_; } + + static StateDelta TxBegin(tx::transaction_id_t tx_id); + static StateDelta TxCommit(tx::transaction_id_t tx_id); + static StateDelta TxAbort(tx::transaction_id_t tx_id); + static StateDelta CreateVertex(tx::transaction_id_t tx_id, + gid::Gid vertex_id); + static StateDelta CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, + gid::Gid vertex_from_id, gid::Gid vertex_to_id, + const std::string &edge_type); + static StateDelta PropsSetVertex(tx::transaction_id_t tx_id, + gid::Gid vertex_id, + const std::string &property, + const PropertyValue &value); + static StateDelta PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, + const std::string &property, + const PropertyValue &value); + static StateDelta AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, + const std::string &label); + static StateDelta RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, + const std::string &label); + static StateDelta RemoveVertex(tx::transaction_id_t tx_id, + gid::Gid vertex_id); + static StateDelta RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id); + static StateDelta BuildIndex(tx::transaction_id_t tx_id, + const std::string &label, + const std::string &property); + + std::pair IndexName() const; + + /// Applies CRUD delta to database accessor. Fails on other types of deltas + void Apply(GraphDbAccessor &dba) const; + + private: + // Members valid for every delta. + enum Type type_; + tx::transaction_id_t transaction_id_; + + // Members valid only for some deltas, see StateDelta::Type comments above. + gid::Gid vertex_id_; + gid::Gid edge_id_; + gid::Gid vertex_from_id_; + gid::Gid vertex_to_id_; + std::string edge_type_; + std::string property_; + PropertyValue value_ = PropertyValue::Null; + std::string label_; +}; +} // namespace database diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 04bf6541a..4120bcf01 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -138,68 +138,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, #undef RETURN_IF_NOT -void ApplyOp(const WriteAheadLog::Op &op, GraphDbAccessor &dba) { - switch (op.type_) { - // Transactional state is not recovered. - case WriteAheadLog::Op::Type::TRANSACTION_BEGIN: - case WriteAheadLog::Op::Type::TRANSACTION_COMMIT: - case WriteAheadLog::Op::Type::TRANSACTION_ABORT: - LOG(FATAL) << "Transaction handling not handled in ApplyOp"; - break; - case WriteAheadLog::Op::Type::CREATE_VERTEX: - dba.InsertVertex(op.vertex_id_); - break; - case WriteAheadLog::Op::Type::CREATE_EDGE: { - auto from = dba.FindVertex(op.vertex_from_id_, true); - auto to = dba.FindVertex(op.vertex_to_id_, true); - DCHECK(from) << "Failed to find vertex."; - DCHECK(to) << "Failed to find vertex."; - dba.InsertEdge(*from, *to, dba.EdgeType(op.edge_type_), op.edge_id_); - break; - } - case WriteAheadLog::Op::Type::SET_PROPERTY_VERTEX: { - auto vertex = dba.FindVertex(op.vertex_id_, true); - DCHECK(vertex) << "Failed to find vertex."; - vertex->PropsSet(dba.Property(op.property_), op.value_); - break; - } - case WriteAheadLog::Op::Type::SET_PROPERTY_EDGE: { - auto edge = dba.FindEdge(op.edge_id_, true); - DCHECK(edge) << "Failed to find edge."; - edge->PropsSet(dba.Property(op.property_), op.value_); - break; - } - case WriteAheadLog::Op::Type::ADD_LABEL: { - auto vertex = dba.FindVertex(op.vertex_id_, true); - DCHECK(vertex) << "Failed to find vertex."; - vertex->add_label(dba.Label(op.label_)); - break; - } - case WriteAheadLog::Op::Type::REMOVE_LABEL: { - auto vertex = dba.FindVertex(op.vertex_id_, true); - DCHECK(vertex) << "Failed to find vertex."; - vertex->remove_label(dba.Label(op.label_)); - break; - } - case WriteAheadLog::Op::Type::REMOVE_VERTEX: { - auto vertex = dba.FindVertex(op.vertex_id_, true); - DCHECK(vertex) << "Failed to find vertex."; - dba.DetachRemoveVertex(*vertex); - break; - } - case WriteAheadLog::Op::Type::REMOVE_EDGE: { - auto edge = dba.FindEdge(op.edge_id_, true); - DCHECK(edge) << "Failed to find edge."; - dba.RemoveEdge(*edge); - break; - } - case WriteAheadLog::Op::Type::BUILD_INDEX: { - LOG(FATAL) << "Index handling not handled in ApplyOp"; - break; - } - } -} - // TODO - finer-grained recovery feedback could be useful here. bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor, RecoveryData &recovery_data) { @@ -231,19 +169,19 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor, to_skip.emplace(recovery_data.snapshooter_tx_id); } - // A buffer for the WAL transaction ops. Accumulate and apply them in the + // A buffer for the WAL transaction deltas. Accumulate and apply them in the // right transactional sequence. - std::map> ops; + std::map> deltas; // Track which transactions were aborted/committed in the WAL. std::set aborted; std::set committed; auto apply_all_possible = [&]() { while (true) { - // Remove old ops from memory. - for (auto it = ops.begin(); it != ops.end();) { + // Remove old deltas from memory. + for (auto it = deltas.begin(); it != deltas.end();) { if (it->first < next_to_recover) - it = ops.erase(it); + it = deltas.erase(it); else ++it; } @@ -254,9 +192,9 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor, else if (utils::Contains(aborted, next_to_recover)) { next_to_recover++; } else if (utils::Contains(committed, next_to_recover)) { - auto found = ops.find(next_to_recover); - if (found != ops.end()) - for (const auto &op : found->second) ApplyOp(op, db_accessor); + auto found = deltas.find(next_to_recover); + if (found != deltas.end()) + for (const auto &delta : found->second) delta.Apply(db_accessor); next_to_recover++; } else break; @@ -273,36 +211,37 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor, if (!wal_reader.Open(wal_file)) return false; communication::bolt::Decoder decoder(wal_reader); while (true) { - auto op = WriteAheadLog::Op::Decode(wal_reader, decoder); - if (!op) break; - switch (op->type_) { - case WriteAheadLog::Op::Type::TRANSACTION_BEGIN: - DCHECK(ops.find(op->transaction_id_) == ops.end()) + auto delta = database::StateDelta::Decode(wal_reader, decoder); + if (!delta) break; + switch (delta->type()) { + case database::StateDelta::Type::TRANSACTION_BEGIN: + DCHECK(deltas.find(delta->transaction_id()) == deltas.end()) << "Double transaction start"; - if (to_skip.find(op->transaction_id_) == to_skip.end()) - ops.emplace(op->transaction_id_, std::vector{}); + if (to_skip.find(delta->transaction_id()) == to_skip.end()) + deltas.emplace(delta->transaction_id(), + std::vector{}); break; - case WriteAheadLog::Op::Type::TRANSACTION_ABORT: { - auto it = ops.find(op->transaction_id_); - if (it != ops.end()) ops.erase(it); - aborted.emplace(op->transaction_id_); + case database::StateDelta::Type::TRANSACTION_ABORT: { + auto it = deltas.find(delta->transaction_id()); + if (it != deltas.end()) deltas.erase(it); + aborted.emplace(delta->transaction_id()); apply_all_possible(); break; } - case WriteAheadLog::Op::Type::TRANSACTION_COMMIT: - committed.emplace(op->transaction_id_); + case database::StateDelta::Type::TRANSACTION_COMMIT: + committed.emplace(delta->transaction_id()); apply_all_possible(); break; - case WriteAheadLog::Op::Type::BUILD_INDEX: { - recovery_data.indexes.emplace_back(op->label_, op->property_); + case database::StateDelta::Type::BUILD_INDEX: { + recovery_data.indexes.emplace_back(delta->IndexName()); break; } default: { - auto it = ops.find(op->transaction_id_); - if (it != ops.end()) it->second.emplace_back(*op); + auto it = deltas.find(delta->transaction_id()); + if (it != deltas.end()) it->second.emplace_back(*delta); } } - } // reading all Ops in a single wal file + } // reading all deltas in a single wal file } // reading all wal files apply_all_possible(); diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index 315e4c076..edc2de374 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -9,8 +9,8 @@ DEFINE_HIDDEN_int32( "Interval between two write-ahead log flushes, in milliseconds."); DEFINE_HIDDEN_int32( - wal_rotate_ops_count, 10000, - "How many write-ahead ops should be stored in a single WAL file " + wal_rotate_deltas_count, 10000, + "How many write-ahead deltas should be stored in a single WAL file " "before rotating it."); DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096, @@ -19,237 +19,22 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096, namespace durability { -void WriteAheadLog::Op::Encode( - HashedFileWriter &writer, - communication::bolt::PrimitiveEncoder &encoder) const { - encoder.WriteInt(static_cast(type_)); - encoder.WriteInt(static_cast(transaction_id_)); - - switch (type_) { - case Type::TRANSACTION_BEGIN: - case Type::TRANSACTION_COMMIT: - case Type::TRANSACTION_ABORT: - break; - case Type::CREATE_VERTEX: - encoder.WriteInt(vertex_id_); - break; - case Type::CREATE_EDGE: - encoder.WriteInt(edge_id_); - encoder.WriteInt(vertex_from_id_); - encoder.WriteInt(vertex_to_id_); - encoder.WriteString(edge_type_); - break; - case Type::SET_PROPERTY_VERTEX: - encoder.WriteInt(vertex_id_); - encoder.WriteString(property_); - encoder.WritePropertyValue(value_); - break; - case Type::SET_PROPERTY_EDGE: - encoder.WriteInt(edge_id_); - encoder.WriteString(property_); - encoder.WritePropertyValue(value_); - break; - case Type::ADD_LABEL: - case Type::REMOVE_LABEL: - encoder.WriteInt(vertex_id_); - encoder.WriteString(label_); - break; - case Type::REMOVE_VERTEX: - encoder.WriteInt(vertex_id_); - break; - case Type::REMOVE_EDGE: - encoder.WriteInt(edge_id_); - break; - case Type::BUILD_INDEX: - encoder.WriteString(label_); - encoder.WriteString(property_); - break; - } - - writer.WriteValue(writer.hash()); -} - -#define DECODE_MEMBER(member, value_f) \ - if (!decoder.ReadValue(&dv)) return nullopt; \ - r_val.member = dv.value_f(); - -std::experimental::optional WriteAheadLog::Op::Decode( - HashedFileReader &reader, - communication::bolt::Decoder &decoder) { - using std::experimental::nullopt; - - Op r_val; - // The decoded value used as a temporary while decoding. - communication::bolt::DecodedValue dv; - - try { - if (!decoder.ReadValue(&dv)) return nullopt; - r_val.type_ = static_cast(dv.ValueInt()); - DECODE_MEMBER(transaction_id_, ValueInt) - - switch (r_val.type_) { - case Type::TRANSACTION_BEGIN: - case Type::TRANSACTION_COMMIT: - case Type::TRANSACTION_ABORT: - break; - case Type::CREATE_VERTEX: - DECODE_MEMBER(vertex_id_, ValueInt) - break; - case Type::CREATE_EDGE: - DECODE_MEMBER(edge_id_, ValueInt) - DECODE_MEMBER(vertex_from_id_, ValueInt) - DECODE_MEMBER(vertex_to_id_, ValueInt) - DECODE_MEMBER(edge_type_, ValueString) - break; - case Type::SET_PROPERTY_VERTEX: - DECODE_MEMBER(vertex_id_, ValueInt) - DECODE_MEMBER(property_, ValueString) - if (!decoder.ReadValue(&dv)) return nullopt; - r_val.value_ = static_cast(dv); - break; - case Type::SET_PROPERTY_EDGE: - DECODE_MEMBER(edge_id_, ValueInt) - DECODE_MEMBER(property_, ValueString) - if (!decoder.ReadValue(&dv)) return nullopt; - r_val.value_ = static_cast(dv); - break; - case Type::ADD_LABEL: - case Type::REMOVE_LABEL: - DECODE_MEMBER(vertex_id_, ValueInt) - DECODE_MEMBER(label_, ValueString) - break; - case Type::REMOVE_VERTEX: - DECODE_MEMBER(vertex_id_, ValueInt) - break; - case Type::REMOVE_EDGE: - DECODE_MEMBER(edge_id_, ValueInt) - break; - case Type::BUILD_INDEX: - DECODE_MEMBER(label_, ValueString) - DECODE_MEMBER(property_, ValueString) - break; - } - - auto decoder_hash = reader.hash(); - uint64_t encoded_hash; - if (!reader.ReadType(encoded_hash, true)) return nullopt; - if (decoder_hash != encoded_hash) return nullopt; - - return r_val; - } catch (communication::bolt::DecodedValueException &) { - return nullopt; - } catch (std::ifstream::failure &) { - return nullopt; - } -} - -#undef DECODE_MEMBER - WriteAheadLog::WriteAheadLog( const std::experimental::filesystem::path &durability_dir, bool durability_enabled) - : ops_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} { + : deltas_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} { if (durability_enabled) { CheckDurabilityDir(durability_dir); wal_file_.Init(); scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis), - [this]() { wal_file_.Flush(ops_); }); + [this]() { wal_file_.Flush(deltas_); }); } } WriteAheadLog::~WriteAheadLog() { // TODO review : scheduler.Stop() legal if it wasn't started? scheduler_.Stop(); - if (enabled_) wal_file_.Flush(ops_); -} - -void WriteAheadLog::TxBegin(tx::transaction_id_t tx_id) { - Emplace({Op::Type::TRANSACTION_BEGIN, tx_id}); -} - -void WriteAheadLog::TxCommit(tx::transaction_id_t tx_id) { - Emplace({Op::Type::TRANSACTION_COMMIT, tx_id}); -} - -void WriteAheadLog::TxAbort(tx::transaction_id_t tx_id) { - Emplace({Op::Type::TRANSACTION_ABORT, tx_id}); -} - -void WriteAheadLog::CreateVertex(tx::transaction_id_t tx_id, - gid::Gid vertex_id) { - Op op(Op::Type::CREATE_VERTEX, tx_id); - op.vertex_id_ = vertex_id; - Emplace(std::move(op)); -} - -void WriteAheadLog::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, - gid::Gid vertex_from_id, gid::Gid vertex_to_id, - const std::string &edge_type) { - Op op(Op::Type::CREATE_EDGE, tx_id); - op.edge_id_ = edge_id; - op.vertex_from_id_ = vertex_from_id; - op.vertex_to_id_ = vertex_to_id; - op.edge_type_ = edge_type; - Emplace(std::move(op)); -} - -void WriteAheadLog::PropsSetVertex(tx::transaction_id_t tx_id, - gid::Gid vertex_id, - const std::string &property, - const PropertyValue &value) { - Op op(Op::Type::SET_PROPERTY_VERTEX, tx_id); - op.vertex_id_ = vertex_id; - op.property_ = property; - op.value_ = value; - Emplace(std::move(op)); -} - -void WriteAheadLog::PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, - const std::string &property, - const PropertyValue &value) { - Op op(Op::Type::SET_PROPERTY_EDGE, tx_id); - op.edge_id_ = edge_id; - op.property_ = property; - op.value_ = value; - Emplace(std::move(op)); -} - -void WriteAheadLog::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, - const std::string &label) { - Op op(Op::Type::ADD_LABEL, tx_id); - op.vertex_id_ = vertex_id; - op.label_ = label; - Emplace(std::move(op)); -} - -void WriteAheadLog::RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, - const std::string &label) { - Op op(Op::Type::REMOVE_LABEL, tx_id); - op.vertex_id_ = vertex_id; - op.label_ = label; - Emplace(std::move(op)); -} - -void WriteAheadLog::RemoveVertex(tx::transaction_id_t tx_id, - gid::Gid vertex_id) { - Op op(Op::Type::REMOVE_VERTEX, tx_id); - op.vertex_id_ = vertex_id; - Emplace(std::move(op)); -} - -void WriteAheadLog::RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id) { - Op op(Op::Type::REMOVE_EDGE, tx_id); - op.edge_id_ = edge_id; - Emplace(std::move(op)); -} - -void WriteAheadLog::BuildIndex(tx::transaction_id_t tx_id, - const std::string &label, - const std::string &property) { - Op op(Op::Type::BUILD_INDEX, tx_id); - op.label_ = label; - op.property_ = property; - Emplace(std::move(op)); + if (enabled_) wal_file_.Flush(deltas_); } WriteAheadLog::WalFile::WalFile( @@ -275,10 +60,10 @@ void WriteAheadLog::WalFile::Init() { } } latest_tx_ = 0; - current_wal_file_ops_count_ = 0; + current_wal_file_delta_count_ = 0; } -void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { +void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { if (current_wal_file_.empty()) { LOG(ERROR) << "Write-ahead log file uninitialized, discarding data."; buffer.clear(); @@ -287,11 +72,11 @@ void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { try { while (true) { - auto op = buffer.pop(); - if (!op) break; - latest_tx_ = std::max(latest_tx_, op->transaction_id_); - op->Encode(writer_, encoder_); - if (++current_wal_file_ops_count_ >= FLAGS_wal_rotate_ops_count) + auto delta = buffer.pop(); + if (!delta) break; + latest_tx_ = std::max(latest_tx_, delta->transaction_id()); + delta->Encode(writer_, encoder_); + if (++current_wal_file_delta_count_ >= FLAGS_wal_rotate_deltas_count) RotateFile(); } writer_.Flush(); @@ -312,9 +97,9 @@ void WriteAheadLog::WalFile::RotateFile() { Init(); } -void WriteAheadLog::Emplace(Op &&op) { +void WriteAheadLog::Emplace(database::StateDelta &&delta) { if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) - ops_.emplace(std::move(op)); + deltas_.emplace(std::move(delta)); } } // namespace durability diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp index 2b72a47a4..5ca961839 100644 --- a/src/durability/wal.hpp +++ b/src/durability/wal.hpp @@ -12,8 +12,7 @@ #include "communication/bolt/v1/encoder/primitive_encoder.hpp" #include "data_structures/ring_buffer.hpp" #include "database/graph_db_datatypes.hpp" -#include "durability/hashed_file_reader.hpp" -#include "durability/hashed_file_writer.hpp" +#include "database/state_delta.hpp" #include "storage/gid.hpp" #include "storage/property_value.hpp" #include "transactions/type.hpp" @@ -21,8 +20,8 @@ namespace durability { -/** A database operation log for durability. Buffers and periodically serializes - * small-granulation database operations (Ops). +/** A database StateDelta log for durability. Buffers and periodically + * serializes small-granulation database deltas (StateDelta). * * The order is not deterministic in a multithreaded scenario (multiple DB * transactions). This is fine, the recovery process should be immune to this @@ -30,63 +29,6 @@ namespace durability { */ class WriteAheadLog { public: - /** A single operation that needs to be written to the write-ahead log. Either - * a transaction operation (start, commit or abort), or a database storage - * operation being executed within a transaction. */ - class Op { - public: - /** Defines operation type. For each type the comment indicates which values - * need to be stored. All ops have the transaction_id member, so that's - * omitted in the comment. */ - enum class Type { - TRANSACTION_BEGIN, - TRANSACTION_COMMIT, - TRANSACTION_ABORT, - CREATE_VERTEX, // vertex_id - CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type - SET_PROPERTY_VERTEX, // vertex_id, property, property_value - SET_PROPERTY_EDGE, // edge_id, property, property_value - // remove property is done by setting a PropertyValue::Null - ADD_LABEL, // vertex_id, label - REMOVE_LABEL, // vertex_id, label - REMOVE_VERTEX, // vertex_id - REMOVE_EDGE, // edge_id - BUILD_INDEX // label, property - }; - - Op() = default; - Op(const Type &type, tx::transaction_id_t tx_id) - : type_(type), transaction_id_(tx_id) {} - - // Members valid for every op. - Type type_; - tx::transaction_id_t transaction_id_; - // Hash obtained from the HashedFileWriter obtained after writing all the Op - // values. Cumulative with previous writes. - uint32_t hash_; - - // Members valid only for some ops, see Op::Type comments above. - gid::Gid vertex_id_; - gid::Gid edge_id_; - gid::Gid vertex_from_id_; - gid::Gid vertex_to_id_; - std::string edge_type_; - std::string property_; - PropertyValue value_ = PropertyValue::Null; - std::string label_; - - void Encode( - HashedFileWriter &writer, - communication::bolt::PrimitiveEncoder &encoder) const; - - public: - /** Attempts to decode a WAL::Op from the given decoder. Returns the decoded - * value if successful, otherwise returns nullopt. */ - static std::experimental::optional Decode( - HashedFileReader &reader, - communication::bolt::Decoder &decoder); - }; - WriteAheadLog(const std::experimental::filesystem::path &durability_dir, bool durability_enabled); ~WriteAheadLog(); @@ -95,25 +37,8 @@ class WriteAheadLog { * (optional) recovery. */ void Enable() { enabled_ = true; } - void TxBegin(tx::transaction_id_t tx_id); - void TxCommit(tx::transaction_id_t tx_id); - void TxAbort(tx::transaction_id_t tx_id); - void CreateVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id); - void CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, - gid::Gid vertex_from_id, gid::Gid vertex_to_id, - const std::string &edge_type); - void PropsSetVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id, - const std::string &property, const PropertyValue &value); - void PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, - const std::string &property, const PropertyValue &value); - void AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, - const std::string &label); - void RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, - const std::string &label); - void RemoveVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id); - void RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id); - void BuildIndex(tx::transaction_id_t tx_id, const std::string &label, - const std::string &property); + // Emplaces the given DeltaState onto the buffer, if the WAL is enabled. + void Emplace(database::StateDelta &&delta); private: /** Groups the logic of WAL file handling (flushing, naming, rotating) */ @@ -126,9 +51,9 @@ class WriteAheadLog { * called after Flush() to re-initialize stuff. */ void Init(); - /** Flushes all the ops in the buffer to the WAL file. If necessary rotates - * the file. */ - void Flush(RingBuffer &buffer); + /** Flushes all the deltas in the buffer to the WAL file. If necessary + * rotates the file. */ + void Flush(RingBuffer &buffer); private: const std::experimental::filesystem::path wal_dir_; @@ -139,8 +64,8 @@ class WriteAheadLog { // moved when the WAL gets rotated. std::experimental::filesystem::path current_wal_file_; - // Number of Ops in the current wal file. - int current_wal_file_ops_count_{0}; + // Number of deltas in the current wal file. + int current_wal_file_delta_count_{0}; // The latest transaction whose delta is recorded in the current WAL file. // Zero indicates that no deltas have so far been written to the current WAL @@ -150,13 +75,10 @@ class WriteAheadLog { void RotateFile(); }; - RingBuffer ops_; + RingBuffer deltas_; Scheduler scheduler_; WalFile wal_file_; // Used for disabling the WAL during DB recovery. bool enabled_{false}; - - // Emplaces the given Op onto the buffer, if the WAL is enabled. - void Emplace(Op &&op); }; } // namespace durability diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 2b41f9e1f..426cd263b 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -22,8 +22,8 @@ void RecordAccessor::PropsSet(GraphDbTypes::Property key, Vertex &vertex = update(); vertex.properties_.set(key, value); auto &dba = db_accessor(); - dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_, - dba.PropertyName(key), value); + dba.wal().Emplace(database::StateDelta::PropsSetVertex( + dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), value)); db_accessor().UpdatePropertyIndex(key, *this, &vertex); } @@ -32,23 +32,25 @@ void RecordAccessor::PropsSet(GraphDbTypes::Property key, PropertyValue value) { update().properties_.set(key, value); auto &dba = db_accessor(); - dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_, - dba.PropertyName(key), value); + dba.wal().Emplace(database::StateDelta::PropsSetEdge( + dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), value)); } template <> size_t RecordAccessor::PropsErase(GraphDbTypes::Property key) { auto &dba = db_accessor(); - dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_, - dba.PropertyName(key), PropertyValue::Null); + dba.wal().Emplace(database::StateDelta::PropsSetVertex( + dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), + PropertyValue::Null)); return update().properties_.erase(key); } template <> size_t RecordAccessor::PropsErase(GraphDbTypes::Property key) { auto &dba = db_accessor(); - dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_, - dba.PropertyName(key), PropertyValue::Null); + dba.wal().Emplace(database::StateDelta::PropsSetEdge( + dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), + PropertyValue::Null)); return update().properties_.erase(key); } @@ -57,8 +59,9 @@ void RecordAccessor::PropsClear() { auto &updated = update(); auto &dba = db_accessor(); for (const auto &kv : updated.properties_) - dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_, - dba.PropertyName(kv.first), PropertyValue::Null); + dba.wal().Emplace(database::StateDelta::PropsSetVertex( + dba.transaction_id(), vlist_->gid_, dba.PropertyName(kv.first), + PropertyValue::Null)); updated.properties_.clear(); } @@ -67,8 +70,9 @@ void RecordAccessor::PropsClear() { auto &updated = update(); auto &dba = db_accessor(); for (const auto &kv : updated.properties_) - dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_, - dba.PropertyName(kv.first), PropertyValue::Null); + dba.wal().Emplace(database::StateDelta::PropsSetEdge( + dba.transaction_id(), vlist_->gid_, dba.PropertyName(kv.first), + PropertyValue::Null)); updated.properties_.clear(); } diff --git a/src/storage/vertex_accessor.cpp b/src/storage/vertex_accessor.cpp index eda09283a..faacfdbee 100644 --- a/src/storage/vertex_accessor.cpp +++ b/src/storage/vertex_accessor.cpp @@ -19,7 +19,8 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) { vertex.labels_.emplace_back(label); auto &dba = db_accessor(); dba.UpdateLabelIndices(label, *this, &vertex); - dba.wal().AddLabel(dba.transaction_id(), gid(), dba.LabelName(label)); + dba.wal().Emplace(database::StateDelta::AddLabel(dba.transaction_id(), gid(), + dba.LabelName(label))); return true; } @@ -31,7 +32,8 @@ size_t VertexAccessor::remove_label(GraphDbTypes::Label label) { std::swap(*found, labels.back()); labels.pop_back(); auto &dba = db_accessor(); - dba.wal().RemoveLabel(dba.transaction_id(), gid(), dba.LabelName(label)); + dba.wal().Emplace(database::StateDelta::RemoveLabel( + dba.transaction_id(), gid(), dba.LabelName(label))); return 1; } diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index a73cfad97..e55bc01c2 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -12,6 +12,7 @@ #include "communication/bolt/v1/decoder/decoder.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" +#include "database/state_delta.hpp" #include "durability/hashed_file_reader.hpp" #include "durability/paths.hpp" #include "durability/recovery.hpp" @@ -20,7 +21,7 @@ #include "utils/string.hpp" DECLARE_int32(wal_flush_interval_millis); -DECLARE_int32(wal_rotate_ops_count); +DECLARE_int32(wal_rotate_deltas_count); namespace fs = std::experimental::filesystem; @@ -297,13 +298,15 @@ class Durability : public ::testing::Test { durability_dir_ = tmp_dir_ / utils::RandomString(24); snapshot_dir_ = durability_dir_ / durability::kSnapshotDir; wal_dir_ = durability_dir_ / durability::kWalDir; - FLAGS_wal_rotate_ops_count = 1000; + FLAGS_wal_rotate_deltas_count = 1000; CleanDurability(); } void TearDown() override { CleanDurability(); } }; +// Tests wal encoder to encode correctly non-CRUD deltas, and that all deltas +// are written in the correct order TEST_F(Durability, WalEncoding) { auto gid0 = gid::Create(0, 0); auto gid1 = gid::Create(0, 1); @@ -332,55 +335,42 @@ TEST_F(Durability, WalEncoding) { ASSERT_EQ(DirFiles(wal_dir_).size(), 1); ASSERT_TRUE(reader.Open(GetLastFile(wal_dir_))); communication::bolt::Decoder decoder{reader}; - std::vector ops; + std::vector deltas; while (true) { - auto op = durability::WriteAheadLog::Op::Decode(reader, decoder); - if (op) { - ops.emplace_back(*op); + auto delta = database::StateDelta::Decode(reader, decoder); + if (delta) { + deltas.emplace_back(*delta); } else { break; } } reader.Close(); - ASSERT_EQ(ops.size(), 11); + ASSERT_EQ(deltas.size(), 11); - using Type = durability::WriteAheadLog::Op::Type; - EXPECT_EQ(ops[0].type_, Type::TRANSACTION_BEGIN); - EXPECT_EQ(ops[0].transaction_id_, 1); - EXPECT_EQ(ops[1].type_, Type::CREATE_VERTEX); - EXPECT_EQ(ops[1].transaction_id_, 1); - EXPECT_EQ(ops[1].vertex_id_, gid0); - EXPECT_EQ(ops[2].type_, Type::ADD_LABEL); - EXPECT_EQ(ops[2].transaction_id_, 1); - EXPECT_EQ(ops[2].label_, "l0"); - EXPECT_EQ(ops[3].type_, Type::SET_PROPERTY_VERTEX); - EXPECT_EQ(ops[3].transaction_id_, 1); - EXPECT_EQ(ops[3].vertex_id_, gid0); - EXPECT_EQ(ops[3].property_, "p0"); - EXPECT_EQ(ops[3].value_.type(), PropertyValue::Type::Int); - EXPECT_EQ(ops[3].value_.Value(), 42); - EXPECT_EQ(ops[4].type_, Type::CREATE_VERTEX); - EXPECT_EQ(ops[4].transaction_id_, 1); - EXPECT_EQ(ops[4].vertex_id_, gid1); - EXPECT_EQ(ops[5].type_, Type::CREATE_EDGE); - EXPECT_EQ(ops[5].transaction_id_, 1); - EXPECT_EQ(ops[5].edge_id_, gid0); - EXPECT_EQ(ops[5].vertex_from_id_, gid0); - EXPECT_EQ(ops[5].vertex_to_id_, gid1); - EXPECT_EQ(ops[5].edge_type_, "et0"); - EXPECT_EQ(ops[6].type_, Type::SET_PROPERTY_EDGE); - EXPECT_EQ(ops[6].transaction_id_, 1); - EXPECT_EQ(ops[6].edge_id_, gid0); - EXPECT_EQ(ops[6].property_, "p0"); - EXPECT_EQ(ops[6].value_.type(), PropertyValue::Type::List); - // The next two ops are the BuildIndex internal transactions. - EXPECT_EQ(ops[7].type_, Type::TRANSACTION_BEGIN); - EXPECT_EQ(ops[8].type_, Type::TRANSACTION_COMMIT); - EXPECT_EQ(ops[9].type_, Type::BUILD_INDEX); - EXPECT_EQ(ops[9].label_, "l1"); - EXPECT_EQ(ops[9].property_, "p1"); - EXPECT_EQ(ops[10].type_, Type::TRANSACTION_COMMIT); - EXPECT_EQ(ops[10].transaction_id_, 1); + using Type = enum database::StateDelta::Type; + EXPECT_EQ(deltas[0].type(), Type::TRANSACTION_BEGIN); + EXPECT_EQ(deltas[0].transaction_id(), 1); + EXPECT_EQ(deltas[1].type(), Type::CREATE_VERTEX); + EXPECT_EQ(deltas[1].transaction_id(), 1); + EXPECT_EQ(deltas[2].type(), Type::ADD_LABEL); + EXPECT_EQ(deltas[2].transaction_id(), 1); + EXPECT_EQ(deltas[3].type(), Type::SET_PROPERTY_VERTEX); + EXPECT_EQ(deltas[3].transaction_id(), 1); + EXPECT_EQ(deltas[4].type(), Type::CREATE_VERTEX); + EXPECT_EQ(deltas[4].transaction_id(), 1); + EXPECT_EQ(deltas[5].type(), Type::CREATE_EDGE); + EXPECT_EQ(deltas[5].transaction_id(), 1); + EXPECT_EQ(deltas[6].type(), Type::SET_PROPERTY_EDGE); + EXPECT_EQ(deltas[6].transaction_id(), 1); + // The next two deltas are the BuildIndex internal transactions. + EXPECT_EQ(deltas[7].type(), Type::TRANSACTION_BEGIN); + EXPECT_EQ(deltas[8].type(), Type::TRANSACTION_COMMIT); + EXPECT_EQ(deltas[9].type(), Type::BUILD_INDEX); + auto index_name = deltas[9].IndexName(); + EXPECT_EQ(index_name.first, "l1"); + EXPECT_EQ(index_name.second, "p1"); + EXPECT_EQ(deltas[10].type(), Type::TRANSACTION_COMMIT); + EXPECT_EQ(deltas[10].transaction_id(), 1); } TEST_F(Durability, SnapshotEncoding) { @@ -645,7 +635,7 @@ TEST_F(Durability, SnapshotRetention) { } TEST_F(Durability, WalRetention) { - FLAGS_wal_rotate_ops_count = 100; + FLAGS_wal_rotate_deltas_count = 100; auto config = DbConfig(); config.durability_enabled = true; GraphDb db{config}; diff --git a/tests/unit/state_delta.cpp b/tests/unit/state_delta.cpp new file mode 100644 index 000000000..3f6fe9c7f --- /dev/null +++ b/tests/unit/state_delta.cpp @@ -0,0 +1,194 @@ +#include "gtest/gtest.h" + +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "database/state_delta.hpp" + +TEST(StateDelta, CreateVertex) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + { + GraphDbAccessor dba(db); + auto delta = database::StateDelta::CreateVertex(dba.transaction_id(), gid0); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto vertex = dba.FindVertex(gid0, false); + EXPECT_TRUE(vertex); + } +} + +TEST(StateDelta, RemoveVertex) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + { + GraphDbAccessor dba(db); + dba.InsertVertex(gid0); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = database::StateDelta::RemoveVertex(dba.transaction_id(), gid0); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto vertex = dba.FindVertex(gid0, false); + EXPECT_FALSE(vertex); + } +} + +TEST(StateDelta, CreateEdge) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + auto gid1 = gid::Create(0, 1); + auto gid2 = gid::Create(0, 2); + { + GraphDbAccessor dba(db); + dba.InsertVertex(gid0); + dba.InsertVertex(gid1); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = database::StateDelta::CreateEdge(dba.transaction_id(), gid2, + gid0, gid1, "edge"); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto edge = dba.FindEdge(gid2, false); + EXPECT_TRUE(edge); + } +} + +TEST(StateDelta, RemoveEdge) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + auto gid1 = gid::Create(0, 1); + auto gid2 = gid::Create(0, 2); + { + GraphDbAccessor dba(db); + auto v0 = dba.InsertVertex(gid0); + auto v1 = dba.InsertVertex(gid1); + dba.InsertEdge(v0, v1, dba.EdgeType("edge"), gid2); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = database::StateDelta::RemoveEdge(dba.transaction_id(), gid2); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto edge = dba.FindEdge(gid2, false); + EXPECT_FALSE(edge); + } +} + +TEST(StateDelta, AddLabel) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + { + GraphDbAccessor dba(db); + dba.InsertVertex(gid0); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = + database::StateDelta::AddLabel(dba.transaction_id(), gid0, "label"); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto vertex = dba.FindVertex(gid0, false); + EXPECT_TRUE(vertex); + auto labels = vertex->labels(); + EXPECT_EQ(labels.size(), 1); + EXPECT_EQ(labels[0], dba.Label("label")); + } +} + +TEST(StateDelta, RemoveLabel) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + { + GraphDbAccessor dba(db); + auto vertex = dba.InsertVertex(gid0); + vertex.add_label(dba.Label("label")); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = + database::StateDelta::RemoveLabel(dba.transaction_id(), gid0, "label"); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto vertex = dba.FindVertex(gid0, false); + EXPECT_TRUE(vertex); + auto labels = vertex->labels(); + EXPECT_EQ(labels.size(), 0); + } +} + +TEST(StateDelta, SetPropertyVertex) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + { + GraphDbAccessor dba(db); + dba.InsertVertex(gid0); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = database::StateDelta::PropsSetVertex( + dba.transaction_id(), gid0, "property", PropertyValue(2212)); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto vertex = dba.FindVertex(gid0, false); + EXPECT_TRUE(vertex); + auto prop = vertex->PropsAt(dba.Property("property")); + EXPECT_EQ(prop.Value(), 2212); + } +} + +TEST(StateDelta, SetPropertyEdge) { + GraphDb db; + auto gid0 = gid::Create(0, 0); + auto gid1 = gid::Create(0, 1); + auto gid2 = gid::Create(0, 2); + { + GraphDbAccessor dba(db); + auto v0 = dba.InsertVertex(gid0); + auto v1 = dba.InsertVertex(gid1); + dba.InsertEdge(v0, v1, dba.EdgeType("edge"), gid2); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto delta = database::StateDelta::PropsSetEdge( + dba.transaction_id(), gid2, "property", PropertyValue(2212)); + delta.Apply(dba); + dba.Commit(); + } + { + GraphDbAccessor dba(db); + auto edge = dba.FindEdge(gid2, false); + EXPECT_TRUE(edge); + auto prop = edge->PropsAt(dba.Property("property")); + EXPECT_EQ(prop.Value(), 2212); + } +}