memgraph/src/durability/wal.cpp

Write-ahead log

Summary: My dear fellow Memgraphians. It's Friday afternoon, and I am as ready to pop as WAL is to get reviewed...

What's done:
- Vertices and Edges have global IDs, stored in `VersionList`. Main storage is now a concurrent map ID->vlist_ptr.
- WriteAheadLog class added. It's based around buffering WAL::Op objects (elementary DB changes) and periodically serializing and flushing them to disk.
- Snapshot recovery refactored, WAL recovery added. Snapshot format changed again to include necessary info.
- Durability testing completely reworked.

What's not done (and should be when we decide how):
- Old WAL file purging.
- Config refactor (naming and organization). Will do when we discuss what we want.
- Changelog and new feature documentation (both depending on the point above).
- Better error handling and recovery feedback. Currently it's all returning bools, which is not fine-grained enough (neither for errors nor for partial successes; also, EOF is reported as a failure at the moment).
- Moving the implementation of WAL stuff to .cpp where possible.
- Not sure if there are transactions being created outside of `GraphDbAccessor` and its `BuildIndex`. Need to look into it.
- True write-ahead logic (flag controlled): not committing a DB transaction if the WAL has not flushed its data. We can discuss the gain/effort ratio for this feature.

Reviewers: buda, mislav.bradac, teon.banek, dgleich
Reviewed By: dgleich
Subscribers: mtomic, pullbot
Differential Revision: https://phabricator.memgraph.io/D958
2017-11-13 16:50:49 +08:00
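The commit message describes WriteAheadLog as buffering `WAL::Op` objects and periodically serializing and flushing them to disk. Below is a minimal sketch of that buffer-then-flush pattern, under loudly stated assumptions: `PeriodicFlushLog` is a hypothetical class, not Memgraph's API; ops are plain strings appended to an in-memory sink rather than encoded into a file; and rejecting an op when the buffer is full is an assumption of the sketch that may differ from what `RingBuffer` does. As in `~WriteAheadLog`, the destructor stops the periodic work and flushes once more.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for WriteAheadLog: producers append ops to a bounded
// in-memory buffer, and a background thread drains that buffer into `sink`
// at a fixed interval. The destructor stops the thread and flushes once more.
class PeriodicFlushLog {
 public:
  PeriodicFlushLog(std::vector<std::string> &sink,
                   std::chrono::milliseconds interval, std::size_t capacity)
      : sink_(sink),
        capacity_(capacity),
        worker_([this, interval] { Run(interval); }) {}

  ~PeriodicFlushLog() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();
    Flush();  // Catch anything emplaced after the worker's last flush.
  }

  // Returns false and drops the op when the buffer is full (an assumption
  // of this sketch, not necessarily RingBuffer's behavior).
  bool Emplace(std::string op) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (buffer_.size() >= capacity_) return false;
    buffer_.push_back(std::move(op));
    return true;
  }

 private:
  // Wakes up every `interval` (or immediately on stop) and flushes.
  void Run(std::chrono::milliseconds interval) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!stop_) {
      cv_.wait_for(lock, interval, [this] { return stop_; });
      lock.unlock();
      Flush();
      lock.lock();
    }
  }

  // Moves all buffered ops to the sink, preserving FIFO order. Only the
  // worker thread calls this until the destructor has joined it.
  void Flush() {
    std::deque<std::string> pending;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      pending.swap(buffer_);
    }
    for (auto &op : pending) sink_.push_back(std::move(op));
  }

  std::vector<std::string> &sink_;
  std::size_t capacity_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::string> buffer_;
  bool stop_ = false;
  std::thread worker_;  // Declared last, so it starts after the members above.
};
```

With a long interval nothing reaches the sink until the log is destroyed; with a short one, ops arrive in batches roughly one interval after they were emplaced. That gap is exactly what the "true write-ahead logic" item in the commit message is about: in this design a transaction can commit before its ops have been flushed.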
#include "wal.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "durability/paths.hpp"
#include "utils/flag_validation.hpp"

DEFINE_HIDDEN_int32(
    wal_flush_interval_millis, 2,
    "Interval between two write-ahead log flushes, in milliseconds.");

DEFINE_HIDDEN_int32(
    wal_rotate_ops_count, 10000,
    "How many write-ahead ops should be stored in a single WAL file "
    "before rotating it.");

DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
                              "Write-ahead log buffer size.",
                              FLAG_IN_RANGE(1, 1 << 30));

namespace durability {

// Serializes the op as its type, its transaction ID, the type-specific
// fields, and finally the writer's current hash, which Decode verifies.
void WriteAheadLog::Op::Encode(
    HashedFileWriter &writer,
    communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const {
  encoder.WriteInt(static_cast<int64_t>(type_));
  encoder.WriteInt(static_cast<int64_t>(transaction_id_));

  switch (type_) {
    case Type::TRANSACTION_BEGIN:
    case Type::TRANSACTION_COMMIT:
    case Type::TRANSACTION_ABORT:
      break;
    case Type::CREATE_VERTEX:
      encoder.WriteInt(vertex_id_);
      break;
    case Type::CREATE_EDGE:
      encoder.WriteInt(edge_id_);
      encoder.WriteInt(vertex_from_id_);
      encoder.WriteInt(vertex_to_id_);
      encoder.WriteString(edge_type_);
      break;
    case Type::SET_PROPERTY_VERTEX:
      encoder.WriteInt(vertex_id_);
      encoder.WriteString(property_);
      encoder.WritePropertyValue(value_);
      break;
    case Type::SET_PROPERTY_EDGE:
      encoder.WriteInt(edge_id_);
      encoder.WriteString(property_);
      encoder.WritePropertyValue(value_);
      break;
    case Type::ADD_LABEL:
    case Type::REMOVE_LABEL:
      encoder.WriteInt(vertex_id_);
      encoder.WriteString(label_);
      break;
    case Type::REMOVE_VERTEX:
      encoder.WriteInt(vertex_id_);
      break;
    case Type::REMOVE_EDGE:
      encoder.WriteInt(edge_id_);
      break;
    case Type::BUILD_INDEX:
      encoder.WriteString(label_);
      encoder.WriteString(property_);
      break;
  }

  writer.WriteValue(writer.hash());
}
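
// Reads the next value into `dv` and assigns it to `r_val.member` through the
// DecodedValue accessor `value_f` (for example ValueInt or ValueString). If
// the read fails, the enclosing Decode returns `nullopt`, which Decode brings
// into scope with a using-declaration.
#define DECODE_MEMBER(member, value_f)           \
  if (!decoder.ReadValue(&dv)) return nullopt; \
  r_val.member = dv.value_f();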

std::experimental::optional<WriteAheadLog::Op> WriteAheadLog::Op::Decode(
    HashedFileReader &reader,
    communication::bolt::Decoder<HashedFileReader> &decoder) {
  using std::experimental::nullopt;

  Op r_val;
  // The decoded value used as a temporary while decoding.
  communication::bolt::DecodedValue dv;

  try {
    if (!decoder.ReadValue(&dv)) return nullopt;
    r_val.type_ = static_cast<Op::Type>(dv.ValueInt());
    DECODE_MEMBER(transaction_id_, ValueInt)

    switch (r_val.type_) {
      case Type::TRANSACTION_BEGIN:
      case Type::TRANSACTION_COMMIT:
      case Type::TRANSACTION_ABORT:
        break;
      case Type::CREATE_VERTEX:
        DECODE_MEMBER(vertex_id_, ValueInt)
        break;
      case Type::CREATE_EDGE:
        DECODE_MEMBER(edge_id_, ValueInt)
        DECODE_MEMBER(vertex_from_id_, ValueInt)
        DECODE_MEMBER(vertex_to_id_, ValueInt)
        DECODE_MEMBER(edge_type_, ValueString)
        break;
      case Type::SET_PROPERTY_VERTEX:
        DECODE_MEMBER(vertex_id_, ValueInt)
        DECODE_MEMBER(property_, ValueString)
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.value_ = static_cast<PropertyValue>(dv);
        break;
      case Type::SET_PROPERTY_EDGE:
        DECODE_MEMBER(edge_id_, ValueInt)
        DECODE_MEMBER(property_, ValueString)
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.value_ = static_cast<PropertyValue>(dv);
        break;
      case Type::ADD_LABEL:
      case Type::REMOVE_LABEL:
        DECODE_MEMBER(vertex_id_, ValueInt)
        DECODE_MEMBER(label_, ValueString)
        break;
      case Type::REMOVE_VERTEX:
        DECODE_MEMBER(vertex_id_, ValueInt)
        break;
      case Type::REMOVE_EDGE:
        DECODE_MEMBER(edge_id_, ValueInt)
        break;
      case Type::BUILD_INDEX:
        DECODE_MEMBER(label_, ValueString)
        DECODE_MEMBER(property_, ValueString)
        break;
    }

    auto decoder_hash = reader.hash();
    uint64_t encoded_hash;
    if (!reader.ReadType(encoded_hash, true)) return nullopt;
    if (decoder_hash != encoded_hash) return nullopt;

    return r_val;
  } catch (communication::bolt::DecodedValueException &) {
    return nullopt;
  } catch (std::ifstream::failure &) {
    return nullopt;
  }
}

#undef DECODE_MEMBER

WriteAheadLog::WriteAheadLog(
    const std::experimental::filesystem::path &durability_dir,
    bool durability_enabled)
    : ops_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} {
  if (durability_enabled) {
    CheckDurabilityDir(durability_dir);
    wal_file_.Init();
    scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
                   [this]() { wal_file_.Flush(ops_); });
  }
}

WriteAheadLog::~WriteAheadLog() {
  // TODO review: is scheduler_.Stop() legal if it was never started?
  scheduler_.Stop();
  if (enabled_) wal_file_.Flush(ops_);
}

void WriteAheadLog::TxBegin(tx::transaction_id_t tx_id) {
  Emplace({Op::Type::TRANSACTION_BEGIN, tx_id});
}

void WriteAheadLog::TxCommit(tx::transaction_id_t tx_id) {
  Emplace({Op::Type::TRANSACTION_COMMIT, tx_id});
}

void WriteAheadLog::TxAbort(tx::transaction_id_t tx_id) {
  Emplace({Op::Type::TRANSACTION_ABORT, tx_id});
}

void WriteAheadLog::CreateVertex(tx::transaction_id_t tx_id,
                                 int64_t vertex_id) {
  Op op(Op::Type::CREATE_VERTEX, tx_id);
  op.vertex_id_ = vertex_id;
  Emplace(std::move(op));
}

void WriteAheadLog::CreateEdge(tx::transaction_id_t tx_id, int64_t edge_id,
                               int64_t vertex_from_id, int64_t vertex_to_id,
                               const std::string &edge_type) {
  Op op(Op::Type::CREATE_EDGE, tx_id);
  op.edge_id_ = edge_id;
  op.vertex_from_id_ = vertex_from_id;
  op.vertex_to_id_ = vertex_to_id;
  op.edge_type_ = edge_type;
  Emplace(std::move(op));
}

void WriteAheadLog::PropsSetVertex(tx::transaction_id_t tx_id,
                                   int64_t vertex_id,
                                   const std::string &property,
                                   const PropertyValue &value) {
  Op op(Op::Type::SET_PROPERTY_VERTEX, tx_id);
  op.vertex_id_ = vertex_id;
  op.property_ = property;
  op.value_ = value;
  Emplace(std::move(op));
}

void WriteAheadLog::PropsSetEdge(tx::transaction_id_t tx_id, int64_t edge_id,
                                 const std::string &property,
                                 const PropertyValue &value) {
  Op op(Op::Type::SET_PROPERTY_EDGE, tx_id);
  op.edge_id_ = edge_id;
  op.property_ = property;
  op.value_ = value;
  Emplace(std::move(op));
}

void WriteAheadLog::AddLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
                             const std::string &label) {
  Op op(Op::Type::ADD_LABEL, tx_id);
  op.vertex_id_ = vertex_id;
  op.label_ = label;
  Emplace(std::move(op));
}

void WriteAheadLog::RemoveLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
                                const std::string &label) {
  Op op(Op::Type::REMOVE_LABEL, tx_id);
  op.vertex_id_ = vertex_id;
  op.label_ = label;
  Emplace(std::move(op));
}

void WriteAheadLog::RemoveVertex(tx::transaction_id_t tx_id,
                                 int64_t vertex_id) {
  Op op(Op::Type::REMOVE_VERTEX, tx_id);
  op.vertex_id_ = vertex_id;
  Emplace(std::move(op));
}

void WriteAheadLog::RemoveEdge(tx::transaction_id_t tx_id, int64_t edge_id) {
  Op op(Op::Type::REMOVE_EDGE, tx_id);
  op.edge_id_ = edge_id;
  Emplace(std::move(op));
}

void WriteAheadLog::BuildIndex(tx::transaction_id_t tx_id,
                               const std::string &label,
                               const std::string &property) {
  Op op(Op::Type::BUILD_INDEX, tx_id);
  op.label_ = label;
  op.property_ = property;
  Emplace(std::move(op));
}

WriteAheadLog::WalFile::WalFile(
    const std::experimental::filesystem::path &durability_dir)
    : wal_dir_{durability_dir / kWalDir} {}

WriteAheadLog::WalFile::~WalFile() {
  if (!current_wal_file_.empty()) writer_.Close();
}

void WriteAheadLog::WalFile::Init() {
  if (!std::experimental::filesystem::exists(wal_dir_) &&
      !std::experimental::filesystem::create_directories(wal_dir_)) {
    LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_;
    current_wal_file_ = std::experimental::filesystem::path();
  } else {
    current_wal_file_ = WalFilenameForTransactionId(wal_dir_);
    try {
      writer_.Open(current_wal_file_);
    } catch (std::ios_base::failure &) {
      LOG(ERROR) << "Failed to open write-ahead log file: "
                 << current_wal_file_;
      current_wal_file_ = std::experimental::filesystem::path();
    }
  }
  latest_tx_ = 0;
  current_wal_file_ops_count_ = 0;
}

// Drains `buffer` into the current WAL file, rotating to a new file every
// FLAGS_wal_rotate_ops_count ops. If the file is uninitialized, or a write or
// rotation fails, the remaining buffered ops are discarded.
void WriteAheadLog::WalFile::Flush(RingBuffer<Op> &buffer) {
  if (current_wal_file_.empty()) {
    LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
    buffer.clear();
    return;
  }

  try {
    while (true) {
      auto op = buffer.pop();
      if (!op) break;
      latest_tx_ = std::max(latest_tx_, op->transaction_id_);
      op->Encode(writer_, encoder_);
      if (++current_wal_file_ops_count_ >= FLAGS_wal_rotate_ops_count)
        RotateFile();
    }
    writer_.Flush();
  } catch (std::ios_base::failure &) {
    LOG(ERROR) << "Failed to write to write-ahead log, discarding data.";
    buffer.clear();
    return;
  } catch (std::experimental::filesystem::filesystem_error &) {
    LOG(ERROR) << "Failed to rotate write-ahead log.";
    buffer.clear();
    return;
  }
}

void WriteAheadLog::WalFile::RotateFile() {
  writer_.Close();
  std::experimental::filesystem::rename(
      current_wal_file_, WalFilenameForTransactionId(wal_dir_, latest_tx_));
  Init();
}

void WriteAheadLog::Emplace(Op &&op) {
  if (enabled_ && FLAGS_wal_flush_interval_millis >= 0)
    ops_.emplace(std::move(op));
}

}  // namespace durability
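`Op::Encode` writes each op's fields followed by the writer's hash, and `Op::Decode` rejects a record whose stored hash does not match the reader's, returning `nullopt` as it also does on a failed read. The framing idea can be illustrated in isolation with the hypothetical sketch below. `Record`, `RecordWriter`, and `RecordReader` are not Memgraph types; FNV-1a stands in for whatever hash `HashedFileWriter` actually computes; fields are raw native-endian integers rather than Bolt-encoded values; C++17 `std::optional` replaces `std::experimental::optional`; and the hash running across the whole stream (rather than resetting per record) is an assumption, since this file does not show how `HashedFileWriter` manages it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

// Hypothetical, simplified WAL record (not Memgraph's Op): three integers.
struct Record {
  std::int64_t type;
  std::int64_t tx_id;
  std::int64_t payload;
};

constexpr std::uint64_t kFnvOffset = 14695981039346656037ULL;
constexpr std::uint64_t kFnvPrime = 1099511628211ULL;

// FNV-1a continuing from `hash`; an illustrative stand-in only.
std::uint64_t HashBytes(std::uint64_t hash, const unsigned char *data,
                        std::size_t len) {
  for (std::size_t i = 0; i < len; ++i) {
    hash ^= data[i];
    hash *= kFnvPrime;
  }
  return hash;
}

class RecordWriter {
 public:
  // Appends the fields (hashed), then the running hash itself (not hashed).
  void Write(const Record &r) {
    Append(r.type);
    Append(r.tx_id);
    Append(r.payload);
    const std::uint64_t checksum = hash_;
    const auto *p = reinterpret_cast<const unsigned char *>(&checksum);
    bytes_.insert(bytes_.end(), p, p + sizeof(checksum));
  }
  const std::vector<unsigned char> &bytes() const { return bytes_; }

 private:
  void Append(std::int64_t value) {
    const auto *p = reinterpret_cast<const unsigned char *>(&value);
    bytes_.insert(bytes_.end(), p, p + sizeof(value));
    hash_ = HashBytes(hash_, p, sizeof(value));
  }

  std::vector<unsigned char> bytes_;
  std::uint64_t hash_ = kFnvOffset;  // Runs across all records (assumption).
};

class RecordReader {
 public:
  explicit RecordReader(const std::vector<unsigned char> &bytes)
      : bytes_(bytes) {}

  // Returns nullopt for a truncated record or a checksum mismatch.
  std::optional<Record> Read() {
    Record r{};
    if (!TakeHashed(&r.type) || !TakeHashed(&r.tx_id) ||
        !TakeHashed(&r.payload))
      return std::nullopt;
    const std::uint64_t expected = hash_;
    std::uint64_t stored = 0;
    if (!TakeRaw(&stored) || stored != expected) return std::nullopt;
    return r;
  }

 private:
  // Copies sizeof(T) bytes into *out if that many bytes remain.
  template <typename T>
  bool TakeRaw(T *out) {
    if (bytes_.size() - pos_ < sizeof(T)) return false;
    std::memcpy(out, bytes_.data() + pos_, sizeof(T));
    pos_ += sizeof(T);
    return true;
  }

  // Like TakeRaw, but also feeds the consumed bytes into the running hash.
  bool TakeHashed(std::int64_t *out) {
    const std::size_t start = pos_;
    if (!TakeRaw(out)) return false;
    hash_ = HashBytes(hash_, bytes_.data() + start, sizeof(std::int64_t));
    return true;
  }

  const std::vector<unsigned char> &bytes_;
  std::size_t pos_ = 0;
  std::uint64_t hash_ = kFnvOffset;
};
```

In this sketch a caller would keep calling `Read()` until it returns `std::nullopt`, so a torn or corrupted tail ends replay instead of being applied. Note that, as the commit message points out for the real code, a bare "no value" cannot distinguish a clean end of file from corruption.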