Refactor operations into StateDeltas

Summary: Operations are moved and renamed from WAL to a separate file in preparation for HA and distributed storage.

Reviewers: florijan, mtomic, mislav.bradac

Reviewed By: florijan

Subscribers: mislav.bradac, pullbot

Differential Revision: https://phabricator.memgraph.io/D1034
This commit is contained in:
Dominik Gleich 2017-12-07 13:09:34 +01:00
parent b1cfe46c4a
commit 03db948d7e
11 changed files with 713 additions and 475 deletions

View File

@ -16,6 +16,7 @@ set(memgraph_src_files
database/graph_db.cpp database/graph_db.cpp
database/graph_db_config.cpp database/graph_db_config.cpp
database/graph_db_accessor.cpp database/graph_db_accessor.cpp
database/state_delta.cpp
durability/paths.cpp durability/paths.cpp
durability/recovery.cpp durability/recovery.cpp
durability/snapshooter.cpp durability/snapshooter.cpp

View File

@ -11,7 +11,7 @@
GraphDbAccessor::GraphDbAccessor(GraphDb &db) GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db), transaction_(MasterEngine().Begin()) { : db_(db), transaction_(MasterEngine().Begin()) {
db_.wal_.TxBegin(transaction_->id_); db_.wal_.Emplace(database::StateDelta::TxBegin(transaction_->id_));
} }
GraphDbAccessor::~GraphDbAccessor() { GraphDbAccessor::~GraphDbAccessor() {
@ -33,7 +33,7 @@ void GraphDbAccessor::Commit() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
auto tid = transaction_->id_; auto tid = transaction_->id_;
MasterEngine().Commit(*transaction_); MasterEngine().Commit(*transaction_);
db_.wal_.TxCommit(tid); db_.wal_.Emplace(database::StateDelta::TxCommit(tid));
commited_ = true; commited_ = true;
} }
@ -41,7 +41,7 @@ void GraphDbAccessor::Abort() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
auto tid = transaction_->id_; auto tid = transaction_->id_;
MasterEngine().Abort(*transaction_); MasterEngine().Abort(*transaction_);
db_.wal_.TxAbort(tid); db_.wal_.Emplace(database::StateDelta::TxAbort(tid));
aborted_ = true; aborted_ = true;
} }
@ -68,7 +68,8 @@ VertexAccessor GraphDbAccessor::InsertVertex(
bool success = db_.vertices_.access().insert(id, vertex_vlist).second; bool success = db_.vertices_.access().insert(id, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id; CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id;
db_.wal_.CreateVertex(transaction_->id_, vertex_vlist->gid_); db_.wal_.Emplace(database::StateDelta::CreateVertex(transaction_->id_,
vertex_vlist->gid_));
return VertexAccessor(*vertex_vlist, *this); return VertexAccessor(*vertex_vlist, *this);
} }
@ -148,8 +149,8 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
// reason. // reason.
auto wal_build_index_tx_id = dba.transaction_id(); auto wal_build_index_tx_id = dba.transaction_id();
dba.Commit(); dba.Commit();
db_.wal_.BuildIndex(wal_build_index_tx_id, LabelName(label), db_.wal_.Emplace(database::StateDelta::BuildIndex(
PropertyName(property)); wal_build_index_tx_id, LabelName(label), PropertyName(property)));
// After these two operations we are certain that everything is contained in // After these two operations we are certain that everything is contained in
// the index under the assumption that this transaction contained no // the index under the assumption that this transaction contained no
@ -252,7 +253,8 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0) if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0)
return false; return false;
db_.wal_.RemoveVertex(transaction_->id_, vertex_accessor.vlist_->gid_); db_.wal_.Emplace(database::StateDelta::RemoveVertex(
transaction_->id_, vertex_accessor.vlist_->gid_));
vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_); vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_);
return true; return true;
@ -305,8 +307,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
to.SwitchNew(); to.SwitchNew();
to.update().in_.emplace(from.vlist_, edge_vlist, edge_type); to.update().in_.emplace(from.vlist_, edge_vlist, edge_type);
db_.wal_.CreateEdge(transaction_->id_, edge_vlist->gid_, from.vlist_->gid_, db_.wal_.Emplace(database::StateDelta::CreateEdge(
to.vlist_->gid_, EdgeTypeName(edge_type)); transaction_->id_, edge_vlist->gid_, from.vlist_->gid_, to.vlist_->gid_,
EdgeTypeName(edge_type)));
return EdgeAccessor(*edge_vlist, *this, from.vlist_, to.vlist_, edge_type); return EdgeAccessor(*edge_vlist, *this, from.vlist_, to.vlist_, edge_type);
} }
@ -329,7 +332,8 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor,
edge_accessor.to().update().in_.RemoveEdge(edge_accessor.vlist_); edge_accessor.to().update().in_.RemoveEdge(edge_accessor.vlist_);
edge_accessor.vlist_->remove(edge_accessor.current_, *transaction_); edge_accessor.vlist_->remove(edge_accessor.current_, *transaction_);
db_.wal_.RemoveEdge(transaction_->id_, edge_accessor.gid()); db_.wal_.Emplace(
database::StateDelta::RemoveEdge(transaction_->id_, edge_accessor.gid()));
} }
GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) { GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) {

View File

@ -0,0 +1,296 @@
#include <string>
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
namespace database {
std::pair<std::string, std::string> StateDelta::IndexName() const {
CHECK(StateDelta::type() == StateDelta::Type::BUILD_INDEX)
<< "Invalid operation type to try to get index name";
return std::make_pair(label_, property_);
}
StateDelta StateDelta::TxBegin(tx::transaction_id_t tx_id) {
return {StateDelta::Type::TRANSACTION_BEGIN, tx_id};
}
StateDelta StateDelta::TxCommit(tx::transaction_id_t tx_id) {
return {StateDelta::Type::TRANSACTION_COMMIT, tx_id};
}
StateDelta StateDelta::TxAbort(tx::transaction_id_t tx_id) {
return {StateDelta::Type::TRANSACTION_ABORT, tx_id};
}
StateDelta StateDelta::CreateVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id) {
StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
return op;
}
StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
gid::Gid vertex_from_id,
gid::Gid vertex_to_id,
const std::string &edge_type) {
StateDelta op(StateDelta::Type::CREATE_EDGE, tx_id);
op.edge_id_ = edge_id;
op.vertex_from_id_ = vertex_from_id;
op.vertex_to_id_ = vertex_to_id;
op.edge_type_ = edge_type;
return op;
}
StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
const std::string &property,
const PropertyValue &value) {
StateDelta op(StateDelta::Type::SET_PROPERTY_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
op.property_ = property;
op.value_ = value;
return op;
}
StateDelta StateDelta::PropsSetEdge(tx::transaction_id_t tx_id,
gid::Gid edge_id,
const std::string &property,
const PropertyValue &value) {
StateDelta op(StateDelta::Type::SET_PROPERTY_EDGE, tx_id);
op.edge_id_ = edge_id;
op.property_ = property;
op.value_ = value;
return op;
}
StateDelta StateDelta::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label) {
StateDelta op(StateDelta::Type::ADD_LABEL, tx_id);
op.vertex_id_ = vertex_id;
op.label_ = label;
return op;
}
StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
const std::string &label) {
StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id);
op.vertex_id_ = vertex_id;
op.label_ = label;
return op;
}
StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id) {
StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
return op;
}
StateDelta StateDelta::RemoveEdge(tx::transaction_id_t tx_id,
gid::Gid edge_id) {
StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id);
op.edge_id_ = edge_id;
return op;
}
StateDelta StateDelta::BuildIndex(tx::transaction_id_t tx_id,
const std::string &label,
const std::string &property) {
StateDelta op(StateDelta::Type::BUILD_INDEX, tx_id);
op.label_ = label;
op.property_ = property;
return op;
}
void StateDelta::Encode(
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const {
encoder.WriteInt(static_cast<int64_t>(type_));
encoder.WriteInt(static_cast<int64_t>(transaction_id_));
switch (type_) {
case Type::TRANSACTION_BEGIN:
case Type::TRANSACTION_COMMIT:
case Type::TRANSACTION_ABORT:
break;
case Type::CREATE_VERTEX:
encoder.WriteInt(vertex_id_);
break;
case Type::CREATE_EDGE:
encoder.WriteInt(edge_id_);
encoder.WriteInt(vertex_from_id_);
encoder.WriteInt(vertex_to_id_);
encoder.WriteString(edge_type_);
break;
case Type::SET_PROPERTY_VERTEX:
encoder.WriteInt(vertex_id_);
encoder.WriteString(property_);
encoder.WritePropertyValue(value_);
break;
case Type::SET_PROPERTY_EDGE:
encoder.WriteInt(edge_id_);
encoder.WriteString(property_);
encoder.WritePropertyValue(value_);
break;
case Type::ADD_LABEL:
case Type::REMOVE_LABEL:
encoder.WriteInt(vertex_id_);
encoder.WriteString(label_);
break;
case Type::REMOVE_VERTEX:
encoder.WriteInt(vertex_id_);
break;
case Type::REMOVE_EDGE:
encoder.WriteInt(edge_id_);
break;
case Type::BUILD_INDEX:
encoder.WriteString(label_);
encoder.WriteString(property_);
break;
}
writer.WriteValue(writer.hash());
}
#define DECODE_MEMBER(member, value_f) \
if (!decoder.ReadValue(&dv)) return nullopt; \
r_val.member = dv.value_f();
std::experimental::optional<StateDelta> StateDelta::Decode(
HashedFileReader &reader,
communication::bolt::Decoder<HashedFileReader> &decoder) {
using std::experimental::nullopt;
StateDelta r_val;
// The decoded value used as a temporary while decoding.
communication::bolt::DecodedValue dv;
try {
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.type_ = static_cast<enum StateDelta::Type>(dv.ValueInt());
DECODE_MEMBER(transaction_id_, ValueInt)
switch (r_val.type_) {
case Type::TRANSACTION_BEGIN:
case Type::TRANSACTION_COMMIT:
case Type::TRANSACTION_ABORT:
break;
case Type::CREATE_VERTEX:
DECODE_MEMBER(vertex_id_, ValueInt)
break;
case Type::CREATE_EDGE:
DECODE_MEMBER(edge_id_, ValueInt)
DECODE_MEMBER(vertex_from_id_, ValueInt)
DECODE_MEMBER(vertex_to_id_, ValueInt)
DECODE_MEMBER(edge_type_, ValueString)
break;
case Type::SET_PROPERTY_VERTEX:
DECODE_MEMBER(vertex_id_, ValueInt)
DECODE_MEMBER(property_, ValueString)
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.value_ = static_cast<PropertyValue>(dv);
break;
case Type::SET_PROPERTY_EDGE:
DECODE_MEMBER(edge_id_, ValueInt)
DECODE_MEMBER(property_, ValueString)
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.value_ = static_cast<PropertyValue>(dv);
break;
case Type::ADD_LABEL:
case Type::REMOVE_LABEL:
DECODE_MEMBER(vertex_id_, ValueInt)
DECODE_MEMBER(label_, ValueString)
break;
case Type::REMOVE_VERTEX:
DECODE_MEMBER(vertex_id_, ValueInt)
break;
case Type::REMOVE_EDGE:
DECODE_MEMBER(edge_id_, ValueInt)
break;
case Type::BUILD_INDEX:
DECODE_MEMBER(label_, ValueString)
DECODE_MEMBER(property_, ValueString)
break;
}
auto decoder_hash = reader.hash();
uint64_t encoded_hash;
if (!reader.ReadType(encoded_hash, true)) return nullopt;
if (decoder_hash != encoded_hash) return nullopt;
return r_val;
} catch (communication::bolt::DecodedValueException &) {
return nullopt;
} catch (std::ifstream::failure &) {
return nullopt;
}
}
#undef DECODE_MEMBER
void StateDelta::Apply(GraphDbAccessor &dba) const {
switch (type_) {
// Transactional state is not recovered.
case Type::TRANSACTION_BEGIN:
case Type::TRANSACTION_COMMIT:
case Type::TRANSACTION_ABORT:
LOG(FATAL) << "Transaction handling not handled in Apply";
break;
case Type::CREATE_VERTEX:
dba.InsertVertex(vertex_id_);
break;
case Type::CREATE_EDGE: {
auto from = dba.FindVertex(vertex_from_id_, true);
auto to = dba.FindVertex(vertex_to_id_, true);
DCHECK(from) << "Failed to find vertex.";
DCHECK(to) << "Failed to find vertex.";
dba.InsertEdge(*from, *to, dba.EdgeType(edge_type_), edge_id_);
break;
}
case Type::SET_PROPERTY_VERTEX: {
auto vertex = dba.FindVertex(vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
vertex->PropsSet(dba.Property(property_), value_);
break;
}
case Type::SET_PROPERTY_EDGE: {
auto edge = dba.FindEdge(edge_id_, true);
DCHECK(edge) << "Failed to find edge.";
edge->PropsSet(dba.Property(property_), value_);
break;
}
case Type::ADD_LABEL: {
auto vertex = dba.FindVertex(vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
vertex->add_label(dba.Label(label_));
break;
}
case Type::REMOVE_LABEL: {
auto vertex = dba.FindVertex(vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
vertex->remove_label(dba.Label(label_));
break;
}
case Type::REMOVE_VERTEX: {
auto vertex = dba.FindVertex(vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
dba.DetachRemoveVertex(*vertex);
break;
}
case Type::REMOVE_EDGE: {
auto edge = dba.FindEdge(edge_id_, true);
DCHECK(edge) << "Failed to find edge.";
dba.RemoveEdge(*edge);
break;
}
case Type::BUILD_INDEX: {
LOG(FATAL) << "Index handling not handled in Apply";
break;
}
}
}
}; // namespace database

View File

@ -0,0 +1,101 @@
#pragma once
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/hashed_file_writer.hpp"
#include "storage/gid.hpp"
#include "storage/property_value.hpp"
#include "transactions/transaction.hpp"
namespace database {
/** Describes single change to the database state. Used for durability (WAL) and
* state communication over network in HA and for distributed remote storage
* changes*/
class StateDelta {
public:
/** Defines StateDelta type. For each type the comment indicates which values
* need to be stored. All deltas have the transaction_id member, so that's
* omitted in the comment. */
enum class Type {
TRANSACTION_BEGIN,
TRANSACTION_COMMIT,
TRANSACTION_ABORT,
CREATE_VERTEX, // vertex_id
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type
SET_PROPERTY_VERTEX, // vertex_id, property, property_value
SET_PROPERTY_EDGE, // edge_id, property, property_value
// remove property is done by setting a PropertyValue::Null
ADD_LABEL, // vertex_id, label
REMOVE_LABEL, // vertex_id, label
REMOVE_VERTEX, // vertex_id
REMOVE_EDGE, // edge_id
BUILD_INDEX // label, property
};
StateDelta() = default;
StateDelta(const enum Type &type, tx::transaction_id_t tx_id)
: type_(type), transaction_id_(tx_id) {}
/** Attempts to decode a StateDelta from the given decoder. Returns the
* decoded value if successful, otherwise returns nullopt. */
static std::experimental::optional<StateDelta> Decode(
HashedFileReader &reader,
communication::bolt::Decoder<HashedFileReader> &decoder);
/** Encodes the delta using primitive encoder, and writes out the new hash
* with delta to the writer */
void Encode(
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const;
tx::transaction_id_t transaction_id() const { return transaction_id_; }
Type type() const { return type_; }
static StateDelta TxBegin(tx::transaction_id_t tx_id);
static StateDelta TxCommit(tx::transaction_id_t tx_id);
static StateDelta TxAbort(tx::transaction_id_t tx_id);
static StateDelta CreateVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id);
static StateDelta CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
const std::string &edge_type);
static StateDelta PropsSetVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
const std::string &property,
const PropertyValue &value);
static StateDelta PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
const std::string &property,
const PropertyValue &value);
static StateDelta AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label);
static StateDelta RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label);
static StateDelta RemoveVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id);
static StateDelta RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id);
static StateDelta BuildIndex(tx::transaction_id_t tx_id,
const std::string &label,
const std::string &property);
std::pair<std::string, std::string> IndexName() const;
/// Applies CRUD delta to database accessor. Fails on other types of deltas
void Apply(GraphDbAccessor &dba) const;
private:
// Members valid for every delta.
enum Type type_;
tx::transaction_id_t transaction_id_;
// Members valid only for some deltas, see StateDelta::Type comments above.
gid::Gid vertex_id_;
gid::Gid edge_id_;
gid::Gid vertex_from_id_;
gid::Gid vertex_to_id_;
std::string edge_type_;
std::string property_;
PropertyValue value_ = PropertyValue::Null;
std::string label_;
};
} // namespace database

View File

@ -138,68 +138,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file,
#undef RETURN_IF_NOT #undef RETURN_IF_NOT
void ApplyOp(const WriteAheadLog::Op &op, GraphDbAccessor &dba) {
switch (op.type_) {
// Transactional state is not recovered.
case WriteAheadLog::Op::Type::TRANSACTION_BEGIN:
case WriteAheadLog::Op::Type::TRANSACTION_COMMIT:
case WriteAheadLog::Op::Type::TRANSACTION_ABORT:
LOG(FATAL) << "Transaction handling not handled in ApplyOp";
break;
case WriteAheadLog::Op::Type::CREATE_VERTEX:
dba.InsertVertex(op.vertex_id_);
break;
case WriteAheadLog::Op::Type::CREATE_EDGE: {
auto from = dba.FindVertex(op.vertex_from_id_, true);
auto to = dba.FindVertex(op.vertex_to_id_, true);
DCHECK(from) << "Failed to find vertex.";
DCHECK(to) << "Failed to find vertex.";
dba.InsertEdge(*from, *to, dba.EdgeType(op.edge_type_), op.edge_id_);
break;
}
case WriteAheadLog::Op::Type::SET_PROPERTY_VERTEX: {
auto vertex = dba.FindVertex(op.vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
vertex->PropsSet(dba.Property(op.property_), op.value_);
break;
}
case WriteAheadLog::Op::Type::SET_PROPERTY_EDGE: {
auto edge = dba.FindEdge(op.edge_id_, true);
DCHECK(edge) << "Failed to find edge.";
edge->PropsSet(dba.Property(op.property_), op.value_);
break;
}
case WriteAheadLog::Op::Type::ADD_LABEL: {
auto vertex = dba.FindVertex(op.vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
vertex->add_label(dba.Label(op.label_));
break;
}
case WriteAheadLog::Op::Type::REMOVE_LABEL: {
auto vertex = dba.FindVertex(op.vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
vertex->remove_label(dba.Label(op.label_));
break;
}
case WriteAheadLog::Op::Type::REMOVE_VERTEX: {
auto vertex = dba.FindVertex(op.vertex_id_, true);
DCHECK(vertex) << "Failed to find vertex.";
dba.DetachRemoveVertex(*vertex);
break;
}
case WriteAheadLog::Op::Type::REMOVE_EDGE: {
auto edge = dba.FindEdge(op.edge_id_, true);
DCHECK(edge) << "Failed to find edge.";
dba.RemoveEdge(*edge);
break;
}
case WriteAheadLog::Op::Type::BUILD_INDEX: {
LOG(FATAL) << "Index handling not handled in ApplyOp";
break;
}
}
}
// TODO - finer-grained recovery feedback could be useful here. // TODO - finer-grained recovery feedback could be useful here.
bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor, bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
RecoveryData &recovery_data) { RecoveryData &recovery_data) {
@ -231,19 +169,19 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
to_skip.emplace(recovery_data.snapshooter_tx_id); to_skip.emplace(recovery_data.snapshooter_tx_id);
} }
// A buffer for the WAL transaction ops. Accumulate and apply them in the // A buffer for the WAL transaction deltas. Accumulate and apply them in the
// right transactional sequence. // right transactional sequence.
std::map<tx::transaction_id_t, std::vector<WriteAheadLog::Op>> ops; std::map<tx::transaction_id_t, std::vector<database::StateDelta>> deltas;
// Track which transactions were aborted/committed in the WAL. // Track which transactions were aborted/committed in the WAL.
std::set<tx::transaction_id_t> aborted; std::set<tx::transaction_id_t> aborted;
std::set<tx::transaction_id_t> committed; std::set<tx::transaction_id_t> committed;
auto apply_all_possible = [&]() { auto apply_all_possible = [&]() {
while (true) { while (true) {
// Remove old ops from memory. // Remove old deltas from memory.
for (auto it = ops.begin(); it != ops.end();) { for (auto it = deltas.begin(); it != deltas.end();) {
if (it->first < next_to_recover) if (it->first < next_to_recover)
it = ops.erase(it); it = deltas.erase(it);
else else
++it; ++it;
} }
@ -254,9 +192,9 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
else if (utils::Contains(aborted, next_to_recover)) { else if (utils::Contains(aborted, next_to_recover)) {
next_to_recover++; next_to_recover++;
} else if (utils::Contains(committed, next_to_recover)) { } else if (utils::Contains(committed, next_to_recover)) {
auto found = ops.find(next_to_recover); auto found = deltas.find(next_to_recover);
if (found != ops.end()) if (found != deltas.end())
for (const auto &op : found->second) ApplyOp(op, db_accessor); for (const auto &delta : found->second) delta.Apply(db_accessor);
next_to_recover++; next_to_recover++;
} else } else
break; break;
@ -273,36 +211,37 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
if (!wal_reader.Open(wal_file)) return false; if (!wal_reader.Open(wal_file)) return false;
communication::bolt::Decoder<HashedFileReader> decoder(wal_reader); communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
while (true) { while (true) {
auto op = WriteAheadLog::Op::Decode(wal_reader, decoder); auto delta = database::StateDelta::Decode(wal_reader, decoder);
if (!op) break; if (!delta) break;
switch (op->type_) { switch (delta->type()) {
case WriteAheadLog::Op::Type::TRANSACTION_BEGIN: case database::StateDelta::Type::TRANSACTION_BEGIN:
DCHECK(ops.find(op->transaction_id_) == ops.end()) DCHECK(deltas.find(delta->transaction_id()) == deltas.end())
<< "Double transaction start"; << "Double transaction start";
if (to_skip.find(op->transaction_id_) == to_skip.end()) if (to_skip.find(delta->transaction_id()) == to_skip.end())
ops.emplace(op->transaction_id_, std::vector<WriteAheadLog::Op>{}); deltas.emplace(delta->transaction_id(),
std::vector<database::StateDelta>{});
break; break;
case WriteAheadLog::Op::Type::TRANSACTION_ABORT: { case database::StateDelta::Type::TRANSACTION_ABORT: {
auto it = ops.find(op->transaction_id_); auto it = deltas.find(delta->transaction_id());
if (it != ops.end()) ops.erase(it); if (it != deltas.end()) deltas.erase(it);
aborted.emplace(op->transaction_id_); aborted.emplace(delta->transaction_id());
apply_all_possible(); apply_all_possible();
break; break;
} }
case WriteAheadLog::Op::Type::TRANSACTION_COMMIT: case database::StateDelta::Type::TRANSACTION_COMMIT:
committed.emplace(op->transaction_id_); committed.emplace(delta->transaction_id());
apply_all_possible(); apply_all_possible();
break; break;
case WriteAheadLog::Op::Type::BUILD_INDEX: { case database::StateDelta::Type::BUILD_INDEX: {
recovery_data.indexes.emplace_back(op->label_, op->property_); recovery_data.indexes.emplace_back(delta->IndexName());
break; break;
} }
default: { default: {
auto it = ops.find(op->transaction_id_); auto it = deltas.find(delta->transaction_id());
if (it != ops.end()) it->second.emplace_back(*op); if (it != deltas.end()) it->second.emplace_back(*delta);
} }
} }
} // reading all Ops in a single wal file } // reading all deltas in a single wal file
} // reading all wal files } // reading all wal files
apply_all_possible(); apply_all_possible();

View File

@ -9,8 +9,8 @@ DEFINE_HIDDEN_int32(
"Interval between two write-ahead log flushes, in milliseconds."); "Interval between two write-ahead log flushes, in milliseconds.");
DEFINE_HIDDEN_int32( DEFINE_HIDDEN_int32(
wal_rotate_ops_count, 10000, wal_rotate_deltas_count, 10000,
"How many write-ahead ops should be stored in a single WAL file " "How many write-ahead deltas should be stored in a single WAL file "
"before rotating it."); "before rotating it.");
DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096, DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
@ -19,237 +19,22 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
namespace durability { namespace durability {
void WriteAheadLog::Op::Encode(
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const {
encoder.WriteInt(static_cast<int64_t>(type_));
encoder.WriteInt(static_cast<int64_t>(transaction_id_));
switch (type_) {
case Type::TRANSACTION_BEGIN:
case Type::TRANSACTION_COMMIT:
case Type::TRANSACTION_ABORT:
break;
case Type::CREATE_VERTEX:
encoder.WriteInt(vertex_id_);
break;
case Type::CREATE_EDGE:
encoder.WriteInt(edge_id_);
encoder.WriteInt(vertex_from_id_);
encoder.WriteInt(vertex_to_id_);
encoder.WriteString(edge_type_);
break;
case Type::SET_PROPERTY_VERTEX:
encoder.WriteInt(vertex_id_);
encoder.WriteString(property_);
encoder.WritePropertyValue(value_);
break;
case Type::SET_PROPERTY_EDGE:
encoder.WriteInt(edge_id_);
encoder.WriteString(property_);
encoder.WritePropertyValue(value_);
break;
case Type::ADD_LABEL:
case Type::REMOVE_LABEL:
encoder.WriteInt(vertex_id_);
encoder.WriteString(label_);
break;
case Type::REMOVE_VERTEX:
encoder.WriteInt(vertex_id_);
break;
case Type::REMOVE_EDGE:
encoder.WriteInt(edge_id_);
break;
case Type::BUILD_INDEX:
encoder.WriteString(label_);
encoder.WriteString(property_);
break;
}
writer.WriteValue(writer.hash());
}
#define DECODE_MEMBER(member, value_f) \
if (!decoder.ReadValue(&dv)) return nullopt; \
r_val.member = dv.value_f();
std::experimental::optional<WriteAheadLog::Op> WriteAheadLog::Op::Decode(
HashedFileReader &reader,
communication::bolt::Decoder<HashedFileReader> &decoder) {
using std::experimental::nullopt;
Op r_val;
// The decoded value used as a temporary while decoding.
communication::bolt::DecodedValue dv;
try {
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.type_ = static_cast<Op::Type>(dv.ValueInt());
DECODE_MEMBER(transaction_id_, ValueInt)
switch (r_val.type_) {
case Type::TRANSACTION_BEGIN:
case Type::TRANSACTION_COMMIT:
case Type::TRANSACTION_ABORT:
break;
case Type::CREATE_VERTEX:
DECODE_MEMBER(vertex_id_, ValueInt)
break;
case Type::CREATE_EDGE:
DECODE_MEMBER(edge_id_, ValueInt)
DECODE_MEMBER(vertex_from_id_, ValueInt)
DECODE_MEMBER(vertex_to_id_, ValueInt)
DECODE_MEMBER(edge_type_, ValueString)
break;
case Type::SET_PROPERTY_VERTEX:
DECODE_MEMBER(vertex_id_, ValueInt)
DECODE_MEMBER(property_, ValueString)
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.value_ = static_cast<PropertyValue>(dv);
break;
case Type::SET_PROPERTY_EDGE:
DECODE_MEMBER(edge_id_, ValueInt)
DECODE_MEMBER(property_, ValueString)
if (!decoder.ReadValue(&dv)) return nullopt;
r_val.value_ = static_cast<PropertyValue>(dv);
break;
case Type::ADD_LABEL:
case Type::REMOVE_LABEL:
DECODE_MEMBER(vertex_id_, ValueInt)
DECODE_MEMBER(label_, ValueString)
break;
case Type::REMOVE_VERTEX:
DECODE_MEMBER(vertex_id_, ValueInt)
break;
case Type::REMOVE_EDGE:
DECODE_MEMBER(edge_id_, ValueInt)
break;
case Type::BUILD_INDEX:
DECODE_MEMBER(label_, ValueString)
DECODE_MEMBER(property_, ValueString)
break;
}
auto decoder_hash = reader.hash();
uint64_t encoded_hash;
if (!reader.ReadType(encoded_hash, true)) return nullopt;
if (decoder_hash != encoded_hash) return nullopt;
return r_val;
} catch (communication::bolt::DecodedValueException &) {
return nullopt;
} catch (std::ifstream::failure &) {
return nullopt;
}
}
#undef DECODE_MEMBER
WriteAheadLog::WriteAheadLog( WriteAheadLog::WriteAheadLog(
const std::experimental::filesystem::path &durability_dir, const std::experimental::filesystem::path &durability_dir,
bool durability_enabled) bool durability_enabled)
: ops_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} { : deltas_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} {
if (durability_enabled) { if (durability_enabled) {
CheckDurabilityDir(durability_dir); CheckDurabilityDir(durability_dir);
wal_file_.Init(); wal_file_.Init();
scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis), scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(ops_); }); [this]() { wal_file_.Flush(deltas_); });
} }
} }
WriteAheadLog::~WriteAheadLog() { WriteAheadLog::~WriteAheadLog() {
// TODO review : scheduler.Stop() legal if it wasn't started? // TODO review : scheduler.Stop() legal if it wasn't started?
scheduler_.Stop(); scheduler_.Stop();
if (enabled_) wal_file_.Flush(ops_); if (enabled_) wal_file_.Flush(deltas_);
}
void WriteAheadLog::TxBegin(tx::transaction_id_t tx_id) {
Emplace({Op::Type::TRANSACTION_BEGIN, tx_id});
}
void WriteAheadLog::TxCommit(tx::transaction_id_t tx_id) {
Emplace({Op::Type::TRANSACTION_COMMIT, tx_id});
}
void WriteAheadLog::TxAbort(tx::transaction_id_t tx_id) {
Emplace({Op::Type::TRANSACTION_ABORT, tx_id});
}
void WriteAheadLog::CreateVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id) {
Op op(Op::Type::CREATE_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
Emplace(std::move(op));
}
void WriteAheadLog::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
const std::string &edge_type) {
Op op(Op::Type::CREATE_EDGE, tx_id);
op.edge_id_ = edge_id;
op.vertex_from_id_ = vertex_from_id;
op.vertex_to_id_ = vertex_to_id;
op.edge_type_ = edge_type;
Emplace(std::move(op));
}
void WriteAheadLog::PropsSetVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
const std::string &property,
const PropertyValue &value) {
Op op(Op::Type::SET_PROPERTY_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
op.property_ = property;
op.value_ = value;
Emplace(std::move(op));
}
void WriteAheadLog::PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
const std::string &property,
const PropertyValue &value) {
Op op(Op::Type::SET_PROPERTY_EDGE, tx_id);
op.edge_id_ = edge_id;
op.property_ = property;
op.value_ = value;
Emplace(std::move(op));
}
void WriteAheadLog::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label) {
Op op(Op::Type::ADD_LABEL, tx_id);
op.vertex_id_ = vertex_id;
op.label_ = label;
Emplace(std::move(op));
}
void WriteAheadLog::RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label) {
Op op(Op::Type::REMOVE_LABEL, tx_id);
op.vertex_id_ = vertex_id;
op.label_ = label;
Emplace(std::move(op));
}
void WriteAheadLog::RemoveVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id) {
Op op(Op::Type::REMOVE_VERTEX, tx_id);
op.vertex_id_ = vertex_id;
Emplace(std::move(op));
}
void WriteAheadLog::RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id) {
Op op(Op::Type::REMOVE_EDGE, tx_id);
op.edge_id_ = edge_id;
Emplace(std::move(op));
}
void WriteAheadLog::BuildIndex(tx::transaction_id_t tx_id,
const std::string &label,
const std::string &property) {
Op op(Op::Type::BUILD_INDEX, tx_id);
op.label_ = label;
op.property_ = property;
Emplace(std::move(op));
} }
WriteAheadLog::WalFile::WalFile( WriteAheadLog::WalFile::WalFile(
@ -275,10 +60,10 @@ void WriteAheadLog::WalFile::Init() {
} }
} }
latest_tx_ = 0; latest_tx_ = 0;
current_wal_file_ops_count_ = 0; current_wal_file_delta_count_ = 0;
} }
void WriteAheadLog::WalFile::Flush(RingBuffer<Op> &buffer) { void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
if (current_wal_file_.empty()) { if (current_wal_file_.empty()) {
LOG(ERROR) << "Write-ahead log file uninitialized, discarding data."; LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
buffer.clear(); buffer.clear();
@ -287,11 +72,11 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<Op> &buffer) {
try { try {
while (true) { while (true) {
auto op = buffer.pop(); auto delta = buffer.pop();
if (!op) break; if (!delta) break;
latest_tx_ = std::max(latest_tx_, op->transaction_id_); latest_tx_ = std::max(latest_tx_, delta->transaction_id());
op->Encode(writer_, encoder_); delta->Encode(writer_, encoder_);
if (++current_wal_file_ops_count_ >= FLAGS_wal_rotate_ops_count) if (++current_wal_file_delta_count_ >= FLAGS_wal_rotate_deltas_count)
RotateFile(); RotateFile();
} }
writer_.Flush(); writer_.Flush();
@ -312,9 +97,9 @@ void WriteAheadLog::WalFile::RotateFile() {
Init(); Init();
} }
void WriteAheadLog::Emplace(Op &&op) { void WriteAheadLog::Emplace(database::StateDelta &&delta) {
if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) if (enabled_ && FLAGS_wal_flush_interval_millis >= 0)
ops_.emplace(std::move(op)); deltas_.emplace(std::move(delta));
} }
} // namespace durability } // namespace durability

View File

@ -12,8 +12,7 @@
#include "communication/bolt/v1/encoder/primitive_encoder.hpp" #include "communication/bolt/v1/encoder/primitive_encoder.hpp"
#include "data_structures/ring_buffer.hpp" #include "data_structures/ring_buffer.hpp"
#include "database/graph_db_datatypes.hpp" #include "database/graph_db_datatypes.hpp"
#include "durability/hashed_file_reader.hpp" #include "database/state_delta.hpp"
#include "durability/hashed_file_writer.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
#include "storage/property_value.hpp" #include "storage/property_value.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
@ -21,8 +20,8 @@
namespace durability { namespace durability {
/** A database operation log for durability. Buffers and periodically serializes /** A database StateDelta log for durability. Buffers and periodically
* small-granulation database operations (Ops). * serializes small-granulation database deltas (StateDelta).
* *
* The order is not deterministic in a multithreaded scenario (multiple DB * The order is not deterministic in a multithreaded scenario (multiple DB
* transactions). This is fine, the recovery process should be immune to this * transactions). This is fine, the recovery process should be immune to this
@ -30,63 +29,6 @@ namespace durability {
*/ */
class WriteAheadLog { class WriteAheadLog {
public: public:
/** A single operation that needs to be written to the write-ahead log. Either
* a transaction operation (start, commit or abort), or a database storage
* operation being executed within a transaction. */
class Op {
public:
/** Defines operation type. For each type the comment indicates which values
* need to be stored. All ops have the transaction_id member, so that's
* omitted in the comment. */
enum class Type {
TRANSACTION_BEGIN,
TRANSACTION_COMMIT,
TRANSACTION_ABORT,
CREATE_VERTEX, // vertex_id
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type
SET_PROPERTY_VERTEX, // vertex_id, property, property_value
SET_PROPERTY_EDGE, // edge_id, property, property_value
// remove property is done by setting a PropertyValue::Null
ADD_LABEL, // vertex_id, label
REMOVE_LABEL, // vertex_id, label
REMOVE_VERTEX, // vertex_id
REMOVE_EDGE, // edge_id
BUILD_INDEX // label, property
};
Op() = default;
Op(const Type &type, tx::transaction_id_t tx_id)
: type_(type), transaction_id_(tx_id) {}
// Members valid for every op.
Type type_;
tx::transaction_id_t transaction_id_;
// Hash obtained from the HashedFileWriter obtained after writing all the Op
// values. Cumulative with previous writes.
uint32_t hash_;
// Members valid only for some ops, see Op::Type comments above.
gid::Gid vertex_id_;
gid::Gid edge_id_;
gid::Gid vertex_from_id_;
gid::Gid vertex_to_id_;
std::string edge_type_;
std::string property_;
PropertyValue value_ = PropertyValue::Null;
std::string label_;
void Encode(
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const;
public:
/** Attempts to decode a WAL::Op from the given decoder. Returns the decoded
* value if successful, otherwise returns nullopt. */
static std::experimental::optional<Op> Decode(
HashedFileReader &reader,
communication::bolt::Decoder<HashedFileReader> &decoder);
};
WriteAheadLog(const std::experimental::filesystem::path &durability_dir, WriteAheadLog(const std::experimental::filesystem::path &durability_dir,
bool durability_enabled); bool durability_enabled);
~WriteAheadLog(); ~WriteAheadLog();
@ -95,25 +37,8 @@ class WriteAheadLog {
* (optional) recovery. */ * (optional) recovery. */
void Enable() { enabled_ = true; } void Enable() { enabled_ = true; }
void TxBegin(tx::transaction_id_t tx_id); // Emplaces the given DeltaState onto the buffer, if the WAL is enabled.
void TxCommit(tx::transaction_id_t tx_id); void Emplace(database::StateDelta &&delta);
void TxAbort(tx::transaction_id_t tx_id);
void CreateVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id);
void CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
const std::string &edge_type);
void PropsSetVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &property, const PropertyValue &value);
void PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
const std::string &property, const PropertyValue &value);
void AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label);
void RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
const std::string &label);
void RemoveVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id);
void RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id);
void BuildIndex(tx::transaction_id_t tx_id, const std::string &label,
const std::string &property);
private: private:
/** Groups the logic of WAL file handling (flushing, naming, rotating) */ /** Groups the logic of WAL file handling (flushing, naming, rotating) */
@ -126,9 +51,9 @@ class WriteAheadLog {
* called after Flush() to re-initialize stuff. */ * called after Flush() to re-initialize stuff. */
void Init(); void Init();
/** Flushes all the ops in the buffer to the WAL file. If necessary rotates /** Flushes all the deltas in the buffer to the WAL file. If necessary
* the file. */ * rotates the file. */
void Flush(RingBuffer<Op> &buffer); void Flush(RingBuffer<database::StateDelta> &buffer);
private: private:
const std::experimental::filesystem::path wal_dir_; const std::experimental::filesystem::path wal_dir_;
@ -139,8 +64,8 @@ class WriteAheadLog {
// moved when the WAL gets rotated. // moved when the WAL gets rotated.
std::experimental::filesystem::path current_wal_file_; std::experimental::filesystem::path current_wal_file_;
// Number of Ops in the current wal file. // Number of deltas in the current wal file.
int current_wal_file_ops_count_{0}; int current_wal_file_delta_count_{0};
// The latest transaction whose delta is recorded in the current WAL file. // The latest transaction whose delta is recorded in the current WAL file.
// Zero indicates that no deltas have so far been written to the current WAL // Zero indicates that no deltas have so far been written to the current WAL
@ -150,13 +75,10 @@ class WriteAheadLog {
void RotateFile(); void RotateFile();
}; };
RingBuffer<Op> ops_; RingBuffer<database::StateDelta> deltas_;
Scheduler scheduler_; Scheduler scheduler_;
WalFile wal_file_; WalFile wal_file_;
// Used for disabling the WAL during DB recovery. // Used for disabling the WAL during DB recovery.
bool enabled_{false}; bool enabled_{false};
// Emplaces the given Op onto the buffer, if the WAL is enabled.
void Emplace(Op &&op);
}; };
} // namespace durability } // namespace durability

View File

@ -22,8 +22,8 @@ void RecordAccessor<Vertex>::PropsSet(GraphDbTypes::Property key,
Vertex &vertex = update(); Vertex &vertex = update();
vertex.properties_.set(key, value); vertex.properties_.set(key, value);
auto &dba = db_accessor(); auto &dba = db_accessor();
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_, dba.wal().Emplace(database::StateDelta::PropsSetVertex(
dba.PropertyName(key), value); dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), value));
db_accessor().UpdatePropertyIndex(key, *this, &vertex); db_accessor().UpdatePropertyIndex(key, *this, &vertex);
} }
@ -32,23 +32,25 @@ void RecordAccessor<Edge>::PropsSet(GraphDbTypes::Property key,
PropertyValue value) { PropertyValue value) {
update().properties_.set(key, value); update().properties_.set(key, value);
auto &dba = db_accessor(); auto &dba = db_accessor();
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_, dba.wal().Emplace(database::StateDelta::PropsSetEdge(
dba.PropertyName(key), value); dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), value));
} }
template <> template <>
size_t RecordAccessor<Vertex>::PropsErase(GraphDbTypes::Property key) { size_t RecordAccessor<Vertex>::PropsErase(GraphDbTypes::Property key) {
auto &dba = db_accessor(); auto &dba = db_accessor();
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_, dba.wal().Emplace(database::StateDelta::PropsSetVertex(
dba.PropertyName(key), PropertyValue::Null); dba.transaction_id(), vlist_->gid_, dba.PropertyName(key),
PropertyValue::Null));
return update().properties_.erase(key); return update().properties_.erase(key);
} }
template <> template <>
size_t RecordAccessor<Edge>::PropsErase(GraphDbTypes::Property key) { size_t RecordAccessor<Edge>::PropsErase(GraphDbTypes::Property key) {
auto &dba = db_accessor(); auto &dba = db_accessor();
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_, dba.wal().Emplace(database::StateDelta::PropsSetEdge(
dba.PropertyName(key), PropertyValue::Null); dba.transaction_id(), vlist_->gid_, dba.PropertyName(key),
PropertyValue::Null));
return update().properties_.erase(key); return update().properties_.erase(key);
} }
@ -57,8 +59,9 @@ void RecordAccessor<Vertex>::PropsClear() {
auto &updated = update(); auto &updated = update();
auto &dba = db_accessor(); auto &dba = db_accessor();
for (const auto &kv : updated.properties_) for (const auto &kv : updated.properties_)
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_, dba.wal().Emplace(database::StateDelta::PropsSetVertex(
dba.PropertyName(kv.first), PropertyValue::Null); dba.transaction_id(), vlist_->gid_, dba.PropertyName(kv.first),
PropertyValue::Null));
updated.properties_.clear(); updated.properties_.clear();
} }
@ -67,8 +70,9 @@ void RecordAccessor<Edge>::PropsClear() {
auto &updated = update(); auto &updated = update();
auto &dba = db_accessor(); auto &dba = db_accessor();
for (const auto &kv : updated.properties_) for (const auto &kv : updated.properties_)
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_, dba.wal().Emplace(database::StateDelta::PropsSetEdge(
dba.PropertyName(kv.first), PropertyValue::Null); dba.transaction_id(), vlist_->gid_, dba.PropertyName(kv.first),
PropertyValue::Null));
updated.properties_.clear(); updated.properties_.clear();
} }

View File

@ -19,7 +19,8 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) {
vertex.labels_.emplace_back(label); vertex.labels_.emplace_back(label);
auto &dba = db_accessor(); auto &dba = db_accessor();
dba.UpdateLabelIndices(label, *this, &vertex); dba.UpdateLabelIndices(label, *this, &vertex);
dba.wal().AddLabel(dba.transaction_id(), gid(), dba.LabelName(label)); dba.wal().Emplace(database::StateDelta::AddLabel(dba.transaction_id(), gid(),
dba.LabelName(label)));
return true; return true;
} }
@ -31,7 +32,8 @@ size_t VertexAccessor::remove_label(GraphDbTypes::Label label) {
std::swap(*found, labels.back()); std::swap(*found, labels.back());
labels.pop_back(); labels.pop_back();
auto &dba = db_accessor(); auto &dba = db_accessor();
dba.wal().RemoveLabel(dba.transaction_id(), gid(), dba.LabelName(label)); dba.wal().Emplace(database::StateDelta::RemoveLabel(
dba.transaction_id(), gid(), dba.LabelName(label)));
return 1; return 1;
} }

View File

@ -12,6 +12,7 @@
#include "communication/bolt/v1/decoder/decoder.hpp" #include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "durability/hashed_file_reader.hpp" #include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp" #include "durability/paths.hpp"
#include "durability/recovery.hpp" #include "durability/recovery.hpp"
@ -20,7 +21,7 @@
#include "utils/string.hpp" #include "utils/string.hpp"
DECLARE_int32(wal_flush_interval_millis); DECLARE_int32(wal_flush_interval_millis);
DECLARE_int32(wal_rotate_ops_count); DECLARE_int32(wal_rotate_deltas_count);
namespace fs = std::experimental::filesystem; namespace fs = std::experimental::filesystem;
@ -297,13 +298,15 @@ class Durability : public ::testing::Test {
durability_dir_ = tmp_dir_ / utils::RandomString(24); durability_dir_ = tmp_dir_ / utils::RandomString(24);
snapshot_dir_ = durability_dir_ / durability::kSnapshotDir; snapshot_dir_ = durability_dir_ / durability::kSnapshotDir;
wal_dir_ = durability_dir_ / durability::kWalDir; wal_dir_ = durability_dir_ / durability::kWalDir;
FLAGS_wal_rotate_ops_count = 1000; FLAGS_wal_rotate_deltas_count = 1000;
CleanDurability(); CleanDurability();
} }
void TearDown() override { CleanDurability(); } void TearDown() override { CleanDurability(); }
}; };
// Tests wal encoder to encode correctly non-CRUD deltas, and that all deltas
// are written in the correct order
TEST_F(Durability, WalEncoding) { TEST_F(Durability, WalEncoding) {
auto gid0 = gid::Create(0, 0); auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1); auto gid1 = gid::Create(0, 1);
@ -332,55 +335,42 @@ TEST_F(Durability, WalEncoding) {
ASSERT_EQ(DirFiles(wal_dir_).size(), 1); ASSERT_EQ(DirFiles(wal_dir_).size(), 1);
ASSERT_TRUE(reader.Open(GetLastFile(wal_dir_))); ASSERT_TRUE(reader.Open(GetLastFile(wal_dir_)));
communication::bolt::Decoder<HashedFileReader> decoder{reader}; communication::bolt::Decoder<HashedFileReader> decoder{reader};
std::vector<durability::WriteAheadLog::Op> ops; std::vector<database::StateDelta> deltas;
while (true) { while (true) {
auto op = durability::WriteAheadLog::Op::Decode(reader, decoder); auto delta = database::StateDelta::Decode(reader, decoder);
if (op) { if (delta) {
ops.emplace_back(*op); deltas.emplace_back(*delta);
} else { } else {
break; break;
} }
} }
reader.Close(); reader.Close();
ASSERT_EQ(ops.size(), 11); ASSERT_EQ(deltas.size(), 11);
using Type = durability::WriteAheadLog::Op::Type; using Type = enum database::StateDelta::Type;
EXPECT_EQ(ops[0].type_, Type::TRANSACTION_BEGIN); EXPECT_EQ(deltas[0].type(), Type::TRANSACTION_BEGIN);
EXPECT_EQ(ops[0].transaction_id_, 1); EXPECT_EQ(deltas[0].transaction_id(), 1);
EXPECT_EQ(ops[1].type_, Type::CREATE_VERTEX); EXPECT_EQ(deltas[1].type(), Type::CREATE_VERTEX);
EXPECT_EQ(ops[1].transaction_id_, 1); EXPECT_EQ(deltas[1].transaction_id(), 1);
EXPECT_EQ(ops[1].vertex_id_, gid0); EXPECT_EQ(deltas[2].type(), Type::ADD_LABEL);
EXPECT_EQ(ops[2].type_, Type::ADD_LABEL); EXPECT_EQ(deltas[2].transaction_id(), 1);
EXPECT_EQ(ops[2].transaction_id_, 1); EXPECT_EQ(deltas[3].type(), Type::SET_PROPERTY_VERTEX);
EXPECT_EQ(ops[2].label_, "l0"); EXPECT_EQ(deltas[3].transaction_id(), 1);
EXPECT_EQ(ops[3].type_, Type::SET_PROPERTY_VERTEX); EXPECT_EQ(deltas[4].type(), Type::CREATE_VERTEX);
EXPECT_EQ(ops[3].transaction_id_, 1); EXPECT_EQ(deltas[4].transaction_id(), 1);
EXPECT_EQ(ops[3].vertex_id_, gid0); EXPECT_EQ(deltas[5].type(), Type::CREATE_EDGE);
EXPECT_EQ(ops[3].property_, "p0"); EXPECT_EQ(deltas[5].transaction_id(), 1);
EXPECT_EQ(ops[3].value_.type(), PropertyValue::Type::Int); EXPECT_EQ(deltas[6].type(), Type::SET_PROPERTY_EDGE);
EXPECT_EQ(ops[3].value_.Value<int64_t>(), 42); EXPECT_EQ(deltas[6].transaction_id(), 1);
EXPECT_EQ(ops[4].type_, Type::CREATE_VERTEX); // The next two deltas are the BuildIndex internal transactions.
EXPECT_EQ(ops[4].transaction_id_, 1); EXPECT_EQ(deltas[7].type(), Type::TRANSACTION_BEGIN);
EXPECT_EQ(ops[4].vertex_id_, gid1); EXPECT_EQ(deltas[8].type(), Type::TRANSACTION_COMMIT);
EXPECT_EQ(ops[5].type_, Type::CREATE_EDGE); EXPECT_EQ(deltas[9].type(), Type::BUILD_INDEX);
EXPECT_EQ(ops[5].transaction_id_, 1); auto index_name = deltas[9].IndexName();
EXPECT_EQ(ops[5].edge_id_, gid0); EXPECT_EQ(index_name.first, "l1");
EXPECT_EQ(ops[5].vertex_from_id_, gid0); EXPECT_EQ(index_name.second, "p1");
EXPECT_EQ(ops[5].vertex_to_id_, gid1); EXPECT_EQ(deltas[10].type(), Type::TRANSACTION_COMMIT);
EXPECT_EQ(ops[5].edge_type_, "et0"); EXPECT_EQ(deltas[10].transaction_id(), 1);
EXPECT_EQ(ops[6].type_, Type::SET_PROPERTY_EDGE);
EXPECT_EQ(ops[6].transaction_id_, 1);
EXPECT_EQ(ops[6].edge_id_, gid0);
EXPECT_EQ(ops[6].property_, "p0");
EXPECT_EQ(ops[6].value_.type(), PropertyValue::Type::List);
// The next two ops are the BuildIndex internal transactions.
EXPECT_EQ(ops[7].type_, Type::TRANSACTION_BEGIN);
EXPECT_EQ(ops[8].type_, Type::TRANSACTION_COMMIT);
EXPECT_EQ(ops[9].type_, Type::BUILD_INDEX);
EXPECT_EQ(ops[9].label_, "l1");
EXPECT_EQ(ops[9].property_, "p1");
EXPECT_EQ(ops[10].type_, Type::TRANSACTION_COMMIT);
EXPECT_EQ(ops[10].transaction_id_, 1);
} }
TEST_F(Durability, SnapshotEncoding) { TEST_F(Durability, SnapshotEncoding) {
@ -645,7 +635,7 @@ TEST_F(Durability, SnapshotRetention) {
} }
TEST_F(Durability, WalRetention) { TEST_F(Durability, WalRetention) {
FLAGS_wal_rotate_ops_count = 100; FLAGS_wal_rotate_deltas_count = 100;
auto config = DbConfig(); auto config = DbConfig();
config.durability_enabled = true; config.durability_enabled = true;
GraphDb db{config}; GraphDb db{config};

194
tests/unit/state_delta.cpp Normal file
View File

@ -0,0 +1,194 @@
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
TEST(StateDelta, CreateVertex) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::CreateVertex(dba.transaction_id(), gid0);
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto vertex = dba.FindVertex(gid0, false);
EXPECT_TRUE(vertex);
}
}
TEST(StateDelta, RemoveVertex) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::RemoveVertex(dba.transaction_id(), gid0);
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto vertex = dba.FindVertex(gid0, false);
EXPECT_FALSE(vertex);
}
}
TEST(StateDelta, CreateEdge) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
dba.InsertVertex(gid1);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::CreateEdge(dba.transaction_id(), gid2,
gid0, gid1, "edge");
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto edge = dba.FindEdge(gid2, false);
EXPECT_TRUE(edge);
}
}
TEST(StateDelta, RemoveEdge) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
{
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex(gid0);
auto v1 = dba.InsertVertex(gid1);
dba.InsertEdge(v0, v1, dba.EdgeType("edge"), gid2);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::RemoveEdge(dba.transaction_id(), gid2);
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto edge = dba.FindEdge(gid2, false);
EXPECT_FALSE(edge);
}
}
TEST(StateDelta, AddLabel) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta =
database::StateDelta::AddLabel(dba.transaction_id(), gid0, "label");
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto vertex = dba.FindVertex(gid0, false);
EXPECT_TRUE(vertex);
auto labels = vertex->labels();
EXPECT_EQ(labels.size(), 1);
EXPECT_EQ(labels[0], dba.Label("label"));
}
}
TEST(StateDelta, RemoveLabel) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
{
GraphDbAccessor dba(db);
auto vertex = dba.InsertVertex(gid0);
vertex.add_label(dba.Label("label"));
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta =
database::StateDelta::RemoveLabel(dba.transaction_id(), gid0, "label");
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto vertex = dba.FindVertex(gid0, false);
EXPECT_TRUE(vertex);
auto labels = vertex->labels();
EXPECT_EQ(labels.size(), 0);
}
}
TEST(StateDelta, SetPropertyVertex) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::PropsSetVertex(
dba.transaction_id(), gid0, "property", PropertyValue(2212));
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto vertex = dba.FindVertex(gid0, false);
EXPECT_TRUE(vertex);
auto prop = vertex->PropsAt(dba.Property("property"));
EXPECT_EQ(prop.Value<int64_t>(), 2212);
}
}
TEST(StateDelta, SetPropertyEdge) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
{
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex(gid0);
auto v1 = dba.InsertVertex(gid1);
dba.InsertEdge(v0, v1, dba.EdgeType("edge"), gid2);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::PropsSetEdge(
dba.transaction_id(), gid2, "property", PropertyValue(2212));
delta.Apply(dba);
dba.Commit();
}
{
GraphDbAccessor dba(db);
auto edge = dba.FindEdge(gid2, false);
EXPECT_TRUE(edge);
auto prop = edge->PropsAt(dba.Property("property"));
EXPECT_EQ(prop.Value<int64_t>(), 2212);
}
}