Add replica recovery process (#40)

* Add file transfer over RPC
* Snapshot transfer implementation
* Allow snapshot creation only for MAIN instances
* Replica and main can have replication clients
* Use only snapshots and WALs that are from the Main storage
* Add flush lock and expose buffer
* Add fstat for file size and TryFlushing method
* Use lseek for size

Co-authored-by: Antonio Andelic <antonio.andelic@memgraph.io>
This commit is contained in:
antonio2368 2020-11-12 09:55:56 +01:00 committed by Antonio Andelic
parent 42f6118c00
commit bc0c944910
21 changed files with 1167 additions and 316 deletions

View File

@ -19,6 +19,8 @@ if(MG_ENTERPRISE)
set(storage_v2_src_files
${storage_v2_src_files}
replication/replication.cpp
replication/serialization.cpp
replication/slk.cpp
${lcp_storage_cpp_files})
endif()

View File

@ -50,6 +50,99 @@ void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
<< ". Please start the process as user " << user_directory << "!";
}
std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
const std::filesystem::path &snapshot_directory,
const std::string_view uuid) {
std::vector<std::pair<std::filesystem::path, std::string>> snapshot_files;
std::error_code error_code;
if (utils::DirExists(snapshot_directory)) {
for (const auto &item :
std::filesystem::directory_iterator(snapshot_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadSnapshotInfo(item.path());
if (uuid.empty() || info.uuid == uuid) {
snapshot_files.emplace_back(item.path(), info.uuid);
}
} catch (const RecoveryFailure &) {
continue;
}
}
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
<< error_code.message() << "!";
}
return snapshot_files;
}
std::optional<std::vector<
std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>>
GetWalFiles(const std::filesystem::path &wal_directory,
const std::string_view uuid,
const std::optional<size_t> current_seq_num) {
if (!utils::DirExists(wal_directory)) return std::nullopt;
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
wal_files;
std::error_code error_code;
for (const auto &item :
std::filesystem::directory_iterator(wal_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadWalInfo(item.path());
if ((uuid.empty() || info.uuid == uuid) &&
(!current_seq_num || info.seq_num < current_seq_num))
wal_files.emplace_back(info.seq_num, info.from_timestamp,
info.to_timestamp, item.path());
} catch (const RecoveryFailure &e) {
DLOG(WARNING) << "Failed to read " << item.path();
continue;
}
}
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
<< error_code.message() << "!";
std::sort(wal_files.begin(), wal_files.end());
return std::move(wal_files);
}
// Re-creates every index and constraint listed in `indices_constraints` on
// top of the already recovered `vertices`. It has to run after the data
// itself is recovered so that the indices and constraints end up consistent
// with the final state of the storage. Any creation that does not succeed is
// reported by throwing RecoveryFailure.
void RecoverIndicesAndConstraints(
    const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
    Constraints *constraints, utils::SkipList<Vertex> *vertices) {
  // Label indices.
  for (const auto &label : indices_constraints.indices.label) {
    const auto created =
        indices->label_index.CreateIndex(label, vertices->access());
    if (!created) {
      throw RecoveryFailure("The label index must be created here!");
    }
  }

  // Label+property indices.
  for (const auto &entry : indices_constraints.indices.label_property) {
    const auto created = indices->label_property_index.CreateIndex(
        entry.first, entry.second, vertices->access());
    if (!created) {
      throw RecoveryFailure("The label+property index must be created here!");
    }
  }

  // Existence constraints.
  for (const auto &entry : indices_constraints.constraints.existence) {
    auto result = CreateExistenceConstraint(constraints, entry.first,
                                            entry.second, vertices->access());
    const bool ok = !result.HasError() && result.GetValue();
    if (!ok) {
      throw RecoveryFailure("The existence constraint must be created here!");
    }
  }

  // Unique constraints.
  for (const auto &entry : indices_constraints.constraints.unique) {
    auto result = constraints->unique_constraints.CreateConstraint(
        entry.first, entry.second, vertices->access());
    const bool ok =
        !result.HasError() &&
        result.GetValue() == UniqueConstraints::CreationStatus::SUCCESS;
    if (!ok) {
      throw RecoveryFailure("The unique constraint must be created here!");
    }
  }
}
std::optional<RecoveryInfo> RecoverData(
const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
@ -60,64 +153,13 @@ std::optional<RecoveryInfo> RecoverData(
if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory))
return std::nullopt;
// Helper lambda used to recover all discovered indices and constraints. The
// indices and constraints must be recovered after the data recovery is done
// to ensure that the indices and constraints are consistent at the end of the
// recovery process.
auto recover_indices_and_constraints = [&](const auto &indices_constraints) {
// Recover label indices.
for (const auto &item : indices_constraints.indices.label) {
if (!indices->label_index.CreateIndex(item, vertices->access()))
throw RecoveryFailure("The label index must be created here!");
}
// Recover label+property indices.
for (const auto &item : indices_constraints.indices.label_property) {
if (!indices->label_property_index.CreateIndex(item.first, item.second,
vertices->access()))
throw RecoveryFailure("The label+property index must be created here!");
}
// Recover existence constraints.
for (const auto &item : indices_constraints.constraints.existence) {
auto ret = CreateExistenceConstraint(constraints, item.first, item.second,
vertices->access());
if (ret.HasError() || !ret.GetValue())
throw RecoveryFailure("The existence constraint must be created here!");
}
// Recover unique constraints.
for (const auto &item : indices_constraints.constraints.unique) {
auto ret = constraints->unique_constraints.CreateConstraint(
item.first, item.second, vertices->access());
if (ret.HasError() ||
ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
throw RecoveryFailure("The unique constraint must be created here!");
}
};
// Array of all discovered snapshots, ordered by name.
std::vector<std::pair<std::filesystem::path, std::string>> snapshot_files;
std::error_code error_code;
if (utils::DirExists(snapshot_directory)) {
for (const auto &item :
std::filesystem::directory_iterator(snapshot_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadSnapshotInfo(item.path());
snapshot_files.emplace_back(item.path(), info.uuid);
} catch (const RecoveryFailure &) {
continue;
}
}
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
<< error_code.message() << "!";
}
auto snapshot_files = GetSnapshotFiles(snapshot_directory);
RecoveryInfo recovery_info;
RecoveredIndicesAndConstraints indices_constraints;
std::optional<uint64_t> snapshot_timestamp;
if (!snapshot_files.empty()) {
// Order the files by name
std::sort(snapshot_files.begin(), snapshot_files.end());
// UUID used for durability is the UUID of the last snapshot file.
*uuid = snapshot_files.back().second;
@ -149,10 +191,12 @@ std::optional<RecoveryInfo> RecoverData(
indices_constraints = std::move(recovered_snapshot->indices_constraints);
snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp;
if (!utils::DirExists(wal_directory)) {
recover_indices_and_constraints(indices_constraints);
RecoverIndicesAndConstraints(indices_constraints, indices, constraints,
vertices);
return recovered_snapshot->recovery_info;
}
} else {
std::error_code error_code;
if (!utils::DirExists(wal_directory)) return std::nullopt;
// Array of all discovered WAL files, ordered by name.
std::vector<std::pair<std::filesystem::path, std::string>> wal_files;
@ -174,23 +218,12 @@ std::optional<RecoveryInfo> RecoverData(
*uuid = wal_files.back().second;
}
auto maybe_wal_files = GetWalFiles(wal_directory, *uuid);
if (!maybe_wal_files) return std::nullopt;
// Array of all discovered WAL files, ordered by sequence number.
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
wal_files;
for (const auto &item :
std::filesystem::directory_iterator(wal_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadWalInfo(item.path());
if (info.uuid != *uuid) continue;
wal_files.emplace_back(info.seq_num, info.from_timestamp,
info.to_timestamp, item.path());
} catch (const RecoveryFailure &e) {
continue;
}
}
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
<< error_code.message() << "!";
auto &wal_files = *maybe_wal_files;
// By this point we should have recovered from a snapshot, or we should have
// found some WAL files to recover from in the above `else`. This is just a
// sanity check to circumvent the following case: The database didn't recover
@ -250,7 +283,8 @@ std::optional<RecoveryInfo> RecoverData(
*wal_seq_num = *previous_seq_num + 1;
}
recover_indices_and_constraints(indices_constraints);
RecoverIndicesAndConstraints(indices_constraints, indices, constraints,
vertices);
return recovery_info;
}

View File

@ -9,6 +9,7 @@
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/name_id_mapper.hpp"
@ -23,6 +24,41 @@ namespace storage::durability {
void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
const std::filesystem::path &storage_directory);
/// Get list of snapshot files with their UUID.
/// @param snapshot_directory Directory containing the Snapshot files.
/// @param uuid UUID of the Snapshot files. If not empty, fetch only Snapshot
/// files with the specified UUID. Otherwise, fetch all Snapshot files in the
/// snapshot_directory.
/// @return List of snapshot files, each defined with its path and UUID. The
/// list is empty if the snapshot_directory doesn't exist.
std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
const std::filesystem::path &snapshot_directory,
std::string_view uuid = "");
/// Get list of WAL files ordered by the sequence number.
/// @param wal_directory Directory containing the WAL files.
/// @param uuid UUID of the WAL files. If not empty, fetch only WAL files
/// with the specified UUID. Otherwise, fetch all WAL files in the
/// wal_directory.
/// @param current_seq_num Sequence number of the WAL file which is currently
/// being written. If specified, load only finalized WAL files, i.e. WAL files
/// with seq_num < current_seq_num.
/// @return List of WAL files. Each WAL file is defined with its sequence
/// number, from timestamp, to timestamp and path. If the wal_directory
/// doesn't exist, std::nullopt is returned.
std::optional<std::vector<
std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>>
GetWalFiles(const std::filesystem::path &wal_directory,
std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});
/// Helper function used to recover all discovered indices and constraints.
/// The indices and constraints must be recovered after the data recovery is
/// done to ensure that the indices and constraints are consistent at the end
/// of the recovery process.
/// @param indices_constraints Indices and constraints to create.
/// @param indices Storage indices in which the label and label+property
/// indices are created.
/// @param constraints Storage constraints in which the existence and unique
/// constraints are created.
/// @param vertices Vertices over which the indices and constraints are built.
/// @throw RecoveryFailure
void RecoverIndicesAndConstraints(
const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
Constraints *constraints, utils::SkipList<Vertex> *vertices);
/// Recovers data either from a snapshot and/or WAL files.
/// @throw RecoveryFailure
/// @throw std::bad_alloc

View File

@ -119,6 +119,18 @@ void Encoder::Finalize() {
file_.Close();
}
// The functions below only forward to the underlying output file `file_`.

// Forwards to `file_.DisableFlushing()`.
void Encoder::DisableFlushing() { file_.DisableFlushing(); }
// Forwards to `file_.EnableFlushing()`.
void Encoder::EnableFlushing() { file_.EnableFlushing(); }
// Forwards to `file_.TryFlushing()`.
void Encoder::TryFlushing() { file_.TryFlushing(); }
// Returns whatever `file_.CurrentBuffer()` reports: a pointer to the buffer
// together with a size.
// NOTE(review): the returned pointer presumably refers to memory owned by
// `file_` — confirm how long it stays valid.
std::pair<const uint8_t *, size_t> Encoder::CurrentFileBuffer() const {
return file_.CurrentBuffer();
}
// Returns the size reported by `file_.GetSize()`.
size_t Encoder::GetSize() { return file_.GetSize(); }
//////////////////////////
// Decoder implementation.
//////////////////////////

View File

@ -51,6 +51,18 @@ class Encoder final : public BaseEncoder {
void Finalize();
// Disable flushing of the internal buffer.
void DisableFlushing();
// Enable flushing of the internal buffer.
void EnableFlushing();
// Try flushing the internal buffer.
void TryFlushing();
// Get the current internal buffer with its size.
std::pair<const uint8_t *, size_t> CurrentFileBuffer() const;
// Get the total size of the current file.
size_t GetSize();
private:
utils::OutputFile file_;
};

View File

@ -9,6 +9,7 @@
#include "storage/v2/edge_ref.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file_locker.hpp"
namespace storage::durability {
@ -582,7 +583,8 @@ void CreateSnapshot(Transaction *transaction,
utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints,
Config::Items items, const std::string &uuid) {
Config::Items items, const std::string &uuid,
utils::FileRetainer *file_retainer) {
// Ensure that the storage directory exists.
utils::EnsureDirOrDie(snapshot_directory);
@ -865,10 +867,7 @@ void CreateSnapshot(Transaction *transaction,
old_snapshot_files.size() - (snapshot_retention_count - 1);
for (size_t i = 0; i < num_to_erase; ++i) {
const auto &[start_timestamp, snapshot_path] = old_snapshot_files[i];
if (!utils::DeleteFile(snapshot_path)) {
LOG(WARNING) << "Couldn't delete snapshot file " << snapshot_path
<< "!";
}
file_retainer->DeleteFile(snapshot_path);
}
old_snapshot_files.erase(old_snapshot_files.begin(),
old_snapshot_files.begin() + num_to_erase);

View File

@ -12,6 +12,7 @@
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/file_locker.hpp"
#include "utils/skip_list.hpp"
namespace storage::durability {
@ -60,6 +61,7 @@ void CreateSnapshot(Transaction *transaction,
utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints,
Config::Items items, const std::string &uuid);
Config::Items items, const std::string &uuid,
utils::FileRetainer *file_retainer);
} // namespace storage::durability

View File

@ -976,7 +976,8 @@ WalFile::WalFile(const std::filesystem::path &wal_directory,
path_(wal_directory / MakeWalName()),
from_timestamp_(0),
to_timestamp_(0),
count_(0) {
count_(0),
seq_num_(seq_num) {
// Ensure that the storage directory exists.
utils::EnsureDirOrDie(wal_directory);
@ -1055,7 +1056,9 @@ void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label,
void WalFile::Sync() { wal_.Sync(); }
uint64_t WalFile::GetSize() { return wal_.GetPosition(); }
uint64_t WalFile::GetSize() { return wal_.GetSize(); }
uint64_t WalFile::SequenceNumber() const { return seq_num_; }
void WalFile::UpdateStats(uint64_t timestamp) {
if (count_ == 0) from_timestamp_ = timestamp;
@ -1063,4 +1066,14 @@ void WalFile::UpdateStats(uint64_t timestamp) {
count_ += 1;
}
// The functions below only forward to the WAL encoder `wal_`.

// Forwards to `wal_.DisableFlushing()`.
void WalFile::DisableFlushing() { wal_.DisableFlushing(); }
// Forwards to `wal_.EnableFlushing()`.
void WalFile::EnableFlushing() { wal_.EnableFlushing(); }
// Forwards to `wal_.TryFlushing()`.
void WalFile::TryFlushing() { wal_.TryFlushing(); }
// Returns the buffer pointer and size reported by `wal_.CurrentFileBuffer()`.
std::pair<const uint8_t *, size_t> WalFile::CurrentFileBuffer() const {
return wal_.CurrentFileBuffer();
}
} // namespace storage::durability

View File

@ -181,6 +181,20 @@ class WalFile {
uint64_t GetSize();
uint64_t SequenceNumber() const;
// Disable flushing of the internal buffer.
void DisableFlushing();
// Enable flushing of the internal buffer.
void EnableFlushing();
// Try flushing the internal buffer.
void TryFlushing();
// Get the internal buffer with its size.
std::pair<const uint8_t *, size_t> CurrentFileBuffer() const;
// Get the path of the current WAL file.
const auto &Path() const { return path_; }
private:
void UpdateStats(uint64_t timestamp);
@ -191,6 +205,7 @@ class WalFile {
uint64_t from_timestamp_;
uint64_t to_timestamp_;
uint64_t count_;
uint64_t seq_num_;
};
} // namespace storage::durability

View File

@ -0,0 +1,107 @@
#include "storage/v2/replication/replication.hpp"
namespace storage::replication {
////// ReplicationClient //////
// Stores the client's name, the name-id mapper and the storage items, creates
// the RPC context (with or without SSL) and an RPC client for `endpoint` that
// uses that context.
// NOTE(review): `rpc_client_` receives a pointer to `rpc_context_`, so this
// relies on `rpc_context_` being declared before `rpc_client_` in the class —
// confirm against the header.
ReplicationClient::ReplicationClient(std::string name,
NameIdMapper *name_id_mapper,
Config::Items items,
const io::network::Endpoint &endpoint,
bool use_ssl)
: name_(std::move(name)),
name_id_mapper_(name_id_mapper),
items_(items),
rpc_context_(use_ssl),
rpc_client_(endpoint, &rpc_context_) {}
void ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
auto stream{rpc_client_.Stream<SnapshotRpc>()};
Encoder encoder(stream.GetBuilder());
encoder.WriteFile(path);
stream.AwaitResponse();
}
void ReplicationClient::TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files) {
CHECK(!wal_files.empty()) << "Wal files list is empty!";
auto stream{rpc_client_.Stream<WalFilesRpc>()};
Encoder encoder(stream.GetBuilder());
encoder.WriteUint(wal_files.size());
for (const auto &wal : wal_files) {
encoder.WriteFile(wal);
}
stream.AwaitResponse();
}
////// TransactionHandler //////
// Remembers the owning client and opens an AppendDeltasRpc stream on the
// client's RPC client; all Append* calls write into that stream.
ReplicationClient::TransactionHandler::TransactionHandler(
ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<AppendDeltasRpc>()) {}
// Encodes a vertex delta into the AppendDeltasRpc stream using the client's
// name-id mapper and storage items.
void ReplicationClient::TransactionHandler::AppendDelta(
const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->name_id_mapper_, self_->items_, delta, vertex,
final_commit_timestamp);
}
// Encodes an edge delta into the AppendDeltasRpc stream using the client's
// name-id mapper (the edge overload does not take the storage items).
void ReplicationClient::TransactionHandler::AppendDelta(
const Delta &delta, const Edge &edge, uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->name_id_mapper_, delta, edge,
final_commit_timestamp);
}
// Encodes the transaction-end record for `final_commit_timestamp` into the
// AppendDeltasRpc stream.
void ReplicationClient::TransactionHandler::AppendTransactionEnd(
uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeTransactionEnd(&encoder, final_commit_timestamp);
}
// Encodes a storage-global operation (with its label, properties and
// timestamp) into the AppendDeltasRpc stream using the client's name-id
// mapper.
void ReplicationClient::TransactionHandler::AppendOperation(
durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, self_->name_id_mapper_, operation, label,
properties, timestamp);
}
// Finishes the transfer by waiting for the response on the AppendDeltasRpc
// stream; the response itself is discarded here.
void ReplicationClient::TransactionHandler::Finalize() {
stream_.AwaitResponse();
}
////// CurrentWalHandler //////
// Remembers the owning client, opens a WalFilesRpc stream and immediately
// writes the unsigned value 1 into it.
// NOTE(review): the 1 is presumably the file count, mirroring
// TransferWalFiles which writes wal_files.size() first on the same RPC —
// confirm against the receiving side.
ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<WalFilesRpc>()) {
Encoder encoder(stream_.GetBuilder());
encoder.WriteUint(1);
}
// Writes `filename` as a string into the WalFilesRpc stream.
void ReplicationClient::CurrentWalHandler::AppendFilename(
const std::string &filename) {
Encoder encoder(stream_.GetBuilder());
encoder.WriteString(filename);
}
// Writes `size` as an unsigned integer into the WalFilesRpc stream.
void ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) {
Encoder encoder(stream_.GetBuilder());
encoder.WriteUint(size);
}
// Copies the contents of `file` into the WalFilesRpc stream (raw bytes only,
// see Encoder::WriteFileData).
void ReplicationClient::CurrentWalHandler::AppendFileData(
utils::InputFile *file) {
Encoder encoder(stream_.GetBuilder());
encoder.WriteFileData(file);
}
// Writes `buffer_size` raw bytes starting at `buffer` into the WalFilesRpc
// stream.
void ReplicationClient::CurrentWalHandler::AppendBufferData(
const uint8_t *buffer, const size_t buffer_size) {
Encoder encoder(stream_.GetBuilder());
encoder.WriteBuffer(buffer, buffer_size);
}
// Finishes the transfer by waiting for the response on the WalFilesRpc
// stream; the response itself is discarded here.
void ReplicationClient::CurrentWalHandler::Finalize() {
stream_.AwaitResponse();
}
} // namespace storage::replication

View File

@ -10,6 +10,7 @@
#include "storage/v2/property_value.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "utils/file.hpp"
namespace storage::replication {
@ -17,62 +18,73 @@ class ReplicationClient {
public:
ReplicationClient(std::string name, NameIdMapper *name_id_mapper,
Config::Items items, const io::network::Endpoint &endpoint,
bool use_ssl)
: name_(std::move(name)),
name_id_mapper_(name_id_mapper),
items_(items),
rpc_context_(use_ssl),
rpc_client_(endpoint, &rpc_context_) {}
bool use_ssl);
class Handler {
// Handler used for transfering the current transaction.
class TransactionHandler {
private:
friend class ReplicationClient;
/// @throw rpc::RpcFailedException
explicit Handler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<AppendDeltasRpc>()) {}
explicit TransactionHandler(ReplicationClient *self);
public:
/// @throw rpc::RpcFailedException
void AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->name_id_mapper_, self_->items_, delta,
vertex, final_commit_timestamp);
}
uint64_t final_commit_timestamp);
/// @throw rpc::RpcFailedException
void AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->name_id_mapper_, delta, edge,
final_commit_timestamp);
}
uint64_t final_commit_timestamp);
/// @throw rpc::RpcFailedException
void AppendTransactionEnd(uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeTransactionEnd(&encoder, final_commit_timestamp);
}
void AppendTransactionEnd(uint64_t final_commit_timestamp);
/// @throw rpc::RpcFailedException
void AppendOperation(durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties,
uint64_t timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, self_->name_id_mapper_, operation, label,
properties, timestamp);
}
uint64_t timestamp);
/// @throw rpc::RpcFailedException
void Finalize() { stream_.AwaitResponse(); }
void Finalize();
private:
ReplicationClient *self_;
rpc::Client::StreamHandler<AppendDeltasRpc> stream_;
};
Handler ReplicateTransaction() { return Handler(this); }
TransactionHandler ReplicateTransaction() { return TransactionHandler(this); }
// Transfer the snapshot file.
// @param path Path of the snapshot file.
void TransferSnapshot(const std::filesystem::path &path);
// Handler for transfering the current WAL file whose data is
// contained in the internal buffer and the file.
class CurrentWalHandler {
private:
friend class ReplicationClient;
explicit CurrentWalHandler(ReplicationClient *self);
public:
void AppendFilename(const std::string &filename);
void AppendSize(size_t size);
void AppendFileData(utils::InputFile *file);
void AppendBufferData(const uint8_t *buffer, size_t buffer_size);
/// @throw rpc::RpcFailedException
void Finalize();
private:
ReplicationClient *self_;
rpc::Client::StreamHandler<WalFilesRpc> stream_;
};
CurrentWalHandler TransferCurrentWalFile() { return CurrentWalHandler{this}; }
// Transfer the WAL files
void TransferWalFiles(const std::vector<std::filesystem::path> &wal_files);
const auto &Name() const { return name_; }

View File

@ -1,15 +1,14 @@
#>cpp
#pragma once
#include <cstring>
#include <cstdint>
#include <cstring>
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
#include "slk/streams.hpp"
cpp<#
;; TODO(mferencevic): Change namespace to `storage::replication` once LCP is
;; TODO(antonio2368): Change namespace to `storage::replication` once LCP is
;; updated to support such namespaces.
(lcp:namespace storage)
@ -29,4 +28,16 @@ cpp<#
((success :bool)
(term :uint64_t))))
(lcp:define-rpc snapshot
(:request ())
(:response
((success :bool)
(term :uint64_t))))
(lcp:define-rpc wal-files
(:request ())
(:response
((success :bool)
(term :uint64_t))))
(lcp:pop-namespace) ;; storage

View File

@ -0,0 +1,165 @@
#include "storage/v2/replication/serialization.hpp"
namespace storage::replication {
////// Encoder //////
// Saves `marker` into the SLK builder.
void Encoder::WriteMarker(durability::Marker marker) {
slk::Save(marker, builder_);
}
// Writes the TYPE_BOOL marker followed by `value`.
void Encoder::WriteBool(bool value) {
WriteMarker(durability::Marker::TYPE_BOOL);
slk::Save(value, builder_);
}
// Writes the TYPE_INT marker followed by `value`.
void Encoder::WriteUint(uint64_t value) {
WriteMarker(durability::Marker::TYPE_INT);
slk::Save(value, builder_);
}
// Writes the TYPE_DOUBLE marker followed by `value`.
void Encoder::WriteDouble(double value) {
WriteMarker(durability::Marker::TYPE_DOUBLE);
slk::Save(value, builder_);
}
// Writes the TYPE_STRING marker followed by `value`.
void Encoder::WriteString(const std::string_view &value) {
WriteMarker(durability::Marker::TYPE_STRING);
slk::Save(value, builder_);
}
// Writes the TYPE_PROPERTY_VALUE marker followed by `value`.
void Encoder::WritePropertyValue(const PropertyValue &value) {
WriteMarker(durability::Marker::TYPE_PROPERTY_VALUE);
slk::Save(value, builder_);
}
// Saves `buffer_size` raw bytes from `buffer` into the SLK builder. Unlike
// the other Write* functions, no marker is written in front of the data.
void Encoder::WriteBuffer(const uint8_t *buffer, const size_t buffer_size) {
builder_->Save(buffer, buffer_size);
}
// Copies `file->GetSize()` bytes from `file` into the builder, going through
// a stack buffer in chunks of at most utils::kFileBufferSize bytes. Only the
// raw bytes are written; no name, size or marker precedes them.
// NOTE(review): the result of `file->Read` is not inspected here — confirm
// whether a failed read needs handling.
void Encoder::WriteFileData(utils::InputFile *file) {
  uint8_t chunk[utils::kFileBufferSize];
  for (auto remaining = file->GetSize(); remaining > 0;) {
    const auto to_copy = std::min(remaining, utils::kFileBufferSize);
    file->Read(chunk, to_copy);
    WriteBuffer(chunk, to_copy);
    remaining -= to_copy;
  }
}
void Encoder::WriteFile(const std::filesystem::path &path) {
utils::InputFile file;
CHECK(file.Open(path)) << "Failed to open file " << path;
CHECK(path.has_filename()) << "Path does not have a filename!";
const auto &filename = path.filename().generic_string();
WriteString(filename);
auto file_size = file.GetSize();
WriteUint(file_size);
WriteFileData(&file);
file.Close();
}
////// Decoder //////
// Loads the next marker from the SLK reader. As written, this function always
// returns a value and never std::nullopt.
std::optional<durability::Marker> Decoder::ReadMarker() {
durability::Marker marker;
slk::Load(&marker, reader_);
return marker;
}
// Reads the next marker and, if it is TYPE_BOOL, the boolean that follows.
// Returns std::nullopt when the marker is missing or of a different type; the
// marker has been consumed from the reader in either case.
std::optional<bool> Decoder::ReadBool() {
  const auto tag = ReadMarker();
  if (!tag || *tag != durability::Marker::TYPE_BOOL) return std::nullopt;

  bool value;
  slk::Load(&value, reader_);
  return value;
}
// Reads the next marker and, if it is TYPE_INT, the unsigned integer that
// follows. Returns std::nullopt when the marker is missing or of a different
// type; the marker has been consumed from the reader in either case.
std::optional<uint64_t> Decoder::ReadUint() {
  const auto tag = ReadMarker();
  if (!tag || *tag != durability::Marker::TYPE_INT) return std::nullopt;

  uint64_t value;
  slk::Load(&value, reader_);
  return value;
}
// Reads the next marker and, if it is TYPE_DOUBLE, the double that follows.
// Returns std::nullopt when the marker is missing or of a different type; the
// marker has been consumed from the reader in either case.
std::optional<double> Decoder::ReadDouble() {
  const auto tag = ReadMarker();
  if (!tag || *tag != durability::Marker::TYPE_DOUBLE) return std::nullopt;

  double value;
  slk::Load(&value, reader_);
  return value;
}
// Reads the next marker and, if it is TYPE_STRING, the string that follows.
// Returns std::nullopt when the marker is missing or of a different type; the
// marker has been consumed from the reader in either case.
std::optional<std::string> Decoder::ReadString() {
  const auto tag = ReadMarker();
  if (!tag || *tag != durability::Marker::TYPE_STRING) return std::nullopt;

  std::string text;
  slk::Load(&text, reader_);
  return std::move(text);
}
std::optional<PropertyValue> Decoder::ReadPropertyValue() {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_PROPERTY_VALUE)
return std::nullopt;
PropertyValue value;
slk::Load(&value, reader_);
return std::move(value);
}
// Consumes one string from the reader without returning it. The string is
// still fully loaded into a temporary. Returns false when the next marker is
// missing or is not TYPE_STRING, true otherwise.
bool Decoder::SkipString() {
  const auto tag = ReadMarker();
  if (!tag || *tag != durability::Marker::TYPE_STRING) return false;

  std::string discarded;
  slk::Load(&discarded, reader_);
  return true;
}
bool Decoder::SkipPropertyValue() {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_PROPERTY_VALUE)
return false;
PropertyValue value;
slk::Load(&value, reader_);
return true;
}
// Reads one streamed file (filename, size, raw contents — the layout produced
// by Encoder::WriteFile) and stores it inside `directory`.
//
// @param directory Existing directory which will contain the read file.
// @return Path of the file inside `directory`.
//
// If a file with the same name already exists it is left untouched, but its
// contents are still consumed from the reader so that the stream stays
// positioned after the file. A missing filename, an unusable filename or a
// missing size is fatal (CHECK).
std::optional<std::filesystem::path> Decoder::ReadFile(
    const std::filesystem::path &directory) {
  CHECK(std::filesystem::exists(directory) &&
        std::filesystem::is_directory(directory))
      << "Sent path for streamed files should be a valid directory!";

  const auto maybe_filename = ReadString();
  CHECK(maybe_filename) << "Filename missing for the file";
  // The name is supplied by the remote peer. Keep only its last component so
  // that an absolute path or a path containing ".." cannot place the file
  // outside of `directory`; the sender only ever transmits a bare filename.
  const auto filename = std::filesystem::path(*maybe_filename).filename();
  CHECK(!filename.empty() && filename != "." && filename != "..")
      << "Invalid filename received for the file";
  auto path = directory / filename;

  // Check if the file already exists so we don't overwrite it
  const bool file_exists = std::filesystem::exists(path);
  // TODO (antonio2368): Maybe append filename with custom suffix so we have
  // both copies?
  utils::OutputFile file;
  if (!file_exists) {
    file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
  }

  std::optional<size_t> maybe_file_size = ReadUint();
  CHECK(maybe_file_size) << "File size missing";
  auto file_size = *maybe_file_size;

  uint8_t buffer[utils::kFileBufferSize];
  while (file_size > 0) {
    const auto chunk_size = std::min(file_size, utils::kFileBufferSize);
    // Always drain the bytes from the reader, even when they are discarded.
    reader_->Load(buffer, chunk_size);
    if (!file_exists) {
      file.Write(buffer, chunk_size);
    }
    file_size -= chunk_size;
  }

  // Close only what was actually opened above.
  if (!file_exists) {
    file.Close();
  }
  return std::move(path);
}
} // namespace storage::replication

View File

@ -1,9 +1,12 @@
#pragma once
#include <filesystem>
#include "slk/streams.hpp"
#include "storage/v2/durability/serialization.hpp"
#include "storage/v2/replication/slk.hpp"
#include "utils/cast.hpp"
#include "utils/file.hpp"
namespace storage::replication {
@ -11,34 +14,23 @@ class Encoder final : public durability::BaseEncoder {
public:
explicit Encoder(slk::Builder *builder) : builder_(builder) {}
void WriteMarker(durability::Marker marker) override {
slk::Save(marker, builder_);
}
void WriteMarker(durability::Marker marker) override;
void WriteBool(bool value) override {
WriteMarker(durability::Marker::TYPE_BOOL);
slk::Save(value, builder_);
}
void WriteBool(bool value) override;
void WriteUint(uint64_t value) override {
WriteMarker(durability::Marker::TYPE_INT);
slk::Save(value, builder_);
}
void WriteUint(uint64_t value) override;
void WriteDouble(double value) override {
WriteMarker(durability::Marker::TYPE_DOUBLE);
slk::Save(value, builder_);
}
void WriteDouble(double value) override;
void WriteString(const std::string_view &value) override {
WriteMarker(durability::Marker::TYPE_STRING);
slk::Save(value, builder_);
}
void WriteString(const std::string_view &value) override;
void WritePropertyValue(const PropertyValue &value) override {
WriteMarker(durability::Marker::TYPE_PROPERTY_VALUE);
slk::Save(value, builder_);
}
void WritePropertyValue(const PropertyValue &value) override;
void WriteBuffer(const uint8_t *buffer, size_t buffer_size);
void WriteFileData(utils::InputFile *file);
void WriteFile(const std::filesystem::path &path);
private:
slk::Builder *builder_;
@ -48,74 +40,27 @@ class Decoder final : public durability::BaseDecoder {
public:
explicit Decoder(slk::Reader *reader) : reader_(reader) {}
std::optional<durability::Marker> ReadMarker() override {
durability::Marker marker;
slk::Load(&marker, reader_);
return marker;
}
std::optional<durability::Marker> ReadMarker() override;
std::optional<bool> ReadBool() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_BOOL)
return std::nullopt;
bool value;
slk::Load(&value, reader_);
return value;
}
std::optional<bool> ReadBool() override;
std::optional<uint64_t> ReadUint() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_INT)
return std::nullopt;
uint64_t value;
slk::Load(&value, reader_);
return value;
}
std::optional<uint64_t> ReadUint() override;
std::optional<double> ReadDouble() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_DOUBLE)
return std::nullopt;
double value;
slk::Load(&value, reader_);
return value;
}
std::optional<double> ReadDouble() override;
std::optional<std::string> ReadString() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_STRING)
return std::nullopt;
std::string value;
slk::Load(&value, reader_);
return std::move(value);
}
std::optional<std::string> ReadString() override;
std::optional<PropertyValue> ReadPropertyValue() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_PROPERTY_VALUE)
return std::nullopt;
PropertyValue value;
slk::Load(&value, reader_);
return std::move(value);
}
std::optional<PropertyValue> ReadPropertyValue() override;
bool SkipString() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_STRING)
return false;
std::string value;
slk::Load(&value, reader_);
return true;
}
bool SkipString() override;
bool SkipPropertyValue() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_PROPERTY_VALUE)
return false;
PropertyValue value;
slk::Load(&value, reader_);
return true;
}
bool SkipPropertyValue() override;
/// Read the file and save it inside the specified directory.
/// @param directory Directory which will contain the read file.
/// @return If the read was successful, path to the read file.
std::optional<std::filesystem::path> ReadFile(
const std::filesystem::path &directory);
private:
slk::Reader *reader_;

View File

@ -11,9 +11,13 @@
#include "io/network/endpoint.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/mvcc.hpp"
#include "utils/file.hpp"
#include "utils/rw_lock.hpp"
#include "utils/spin_lock.hpp"
#include "utils/stat.hpp"
@ -394,23 +398,8 @@ Storage::Storage(Config config)
}
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run(
"Snapshot", config_.durability.snapshot_interval, [this] {
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction();
// Create snapshot.
durability::CreateSnapshot(
&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_,
&name_id_mapper_, &indices_, &constraints_, config_.items, uuid_);
// Finalize snapshot transaction.
commit_log_.MarkFinished(transaction.start_timestamp);
});
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval,
[this] { this->CreateSnapshot(); });
}
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Run("Storage GC", config_.gc.interval,
@ -435,8 +424,10 @@ Storage::~Storage() {
}
#ifdef MG_ENTERPRISE
{
std::lock_guard<utils::RWLock> replication_guard(replication_lock_);
rpc_context_.emplace<std::monostate>();
// Clear replication data
std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
replication_server_.reset();
replication_clients_.WithLock([&](auto &clients) { clients.clear(); });
}
#endif
wal_file_ = std::nullopt;
@ -445,20 +436,7 @@ Storage::~Storage() {
snapshot_runner_.Stop();
}
if (config_.durability.snapshot_on_exit) {
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction();
// Create snapshot.
durability::CreateSnapshot(
&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_,
&name_id_mapper_, &indices_, &constraints_, config_.items, uuid_);
// Finalize snapshot transaction.
commit_log_.MarkFinished(transaction.start_timestamp);
CreateSnapshot();
}
}
@ -1640,6 +1618,12 @@ void Storage::FinalizeWalFile() {
config_.durability.wal_file_size_kibibytes) {
wal_file_ = std::nullopt;
wal_unsynced_transactions_ = 0;
} else {
// Try writing the internal buffer if possible; if not,
// the data should be written as soon as it's possible
// (triggered by a new transaction commit, or by some
// reading thread calling EnableFlushing)
wal_file_->TryFlushing();
}
}
@ -1655,17 +1639,17 @@ void Storage::AppendToWal(const Transaction &transaction,
// We need to keep this lock because handler takes a pointer to the client
// from which it was created
std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
std::list<replication::ReplicationClient::Handler> streams;
std::list<replication::ReplicationClient::TransactionHandler> streams;
if (replication_state_.load() == ReplicationState::MAIN) {
auto &replication_clients = GetRpcContext<ReplicationClientList>();
try {
std::transform(replication_clients.begin(), replication_clients.end(),
std::back_inserter(streams), [](auto &client) {
return client.ReplicateTransaction();
});
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
replication_clients_.WithLock([&](auto &clients) {
try {
std::transform(
clients.begin(), clients.end(), std::back_inserter(streams),
[](auto &client) { return client.ReplicateTransaction(); });
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
});
}
#endif
@ -1845,43 +1829,67 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation,
wal_file_->AppendOperation(operation, label, properties,
final_commit_timestamp);
#ifdef MG_ENTERPRISE
std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
if (replication_state_.load() == ReplicationState::MAIN) {
auto &replication_clients = GetRpcContext<ReplicationClientList>();
for (auto &client : replication_clients) {
auto stream = client.ReplicateTransaction();
try {
stream.AppendOperation(operation, label, properties,
final_commit_timestamp);
stream.Finalize();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
{
std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
if (replication_state_.load() == ReplicationState::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
auto stream = client.ReplicateTransaction();
try {
stream.AppendOperation(operation, label, properties,
final_commit_timestamp);
stream.Finalize();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
}
});
}
}
replication_guard.unlock();
#endif
FinalizeWalFile();
}
void Storage::CreateSnapshot() {
#ifdef MG_ENTERPRISE
if (replication_state_.load() != ReplicationState::MAIN) {
LOG(WARNING) << "Snapshots are disabled for replicas!";
return;
}
#endif
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction();
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count,
&vertices_, &edges_, &name_id_mapper_, &indices_,
&constraints_, config_.items, uuid_,
&file_retainer_);
// Finalize snapshot transaction.
commit_log_.MarkFinished(transaction.start_timestamp);
}
#ifdef MG_ENTERPRISE
void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
rpc_context_.emplace<ReplicationServer>();
auto &replication_server = GetRpcContext<ReplicationServer>();
replication_server_.emplace();
// Create RPC server.
// TODO(mferencevic): Add support for SSL.
replication_server.rpc_server_context.emplace();
// TODO (antonio2368): Add support for SSL.
replication_server_->rpc_server_context.emplace();
// NOTE: The replication server must have a single thread for processing
// because there is no need for more processing threads - each replica can
// have only a single main server. Also, the single-threaded guarantee
// simplifies the rest of the implementation.
// TODO(mferencevic): Make endpoint configurable.
replication_server.rpc_server.emplace(endpoint,
&*replication_server.rpc_server_context,
/* workers_count = */ 1);
replication_server.rpc_server->Register<
replication_server_->rpc_server.emplace(
endpoint, &*replication_server_->rpc_server_context,
/* workers_count = */ 1);
replication_server_->rpc_server->Register<
AppendDeltasRpc>([this, endpoint = std::move(endpoint)](
auto *req_reader, auto *res_builder) {
AppendDeltasReq req;
@ -2251,36 +2259,209 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
AppendDeltasRes res;
slk::Save(res, res_builder);
});
replication_server.rpc_server->Start();
replication_server_->rpc_server->Register<SnapshotRpc>(
[this](auto *req_reader, auto *res_builder) {
DLOG(INFO) << "Received SnapshotRpc";
SnapshotReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(snapshot_directory_);
CHECK(maybe_snapshot_path) << "Failed to load snapshot!";
DLOG(INFO) << "Received snapshot saved to " << *maybe_snapshot_path;
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
// Clear the database
vertices_.clear();
edges_.clear();
constraints_ = Constraints();
// TODO (antonio2368): Check if there's a less hacky way
indices_.label_index =
LabelIndex(&indices_, &constraints_, config_.items);
indices_.label_property_index =
LabelPropertyIndex(&indices_, &constraints_, config_.items);
try {
DLOG(INFO) << "Loading snapshot";
auto recovered_snapshot = durability::LoadSnapshot(
*maybe_snapshot_path, &vertices_, &edges_, &name_id_mapper_,
&edge_count_, config_.items);
DLOG(INFO) << "Snapshot loaded successfully";
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from the snapshot
uuid_ = recovered_snapshot.snapshot_info.uuid;
const auto &recovery_info = recovered_snapshot.recovery_info;
vertex_id_ = recovery_info.next_vertex_id;
edge_id_ = recovery_info.next_edge_id;
timestamp_ = std::max(timestamp_, recovery_info.next_timestamp);
durability::RecoverIndicesAndConstraints(
recovered_snapshot.indices_constraints, &indices_, &constraints_,
&vertices_);
} catch (const durability::RecoveryFailure &e) {
// TODO (antonio2368): What to do if the sent snapshot is invalid
LOG(WARNING) << "Couldn't load the snapshot because of: " << e.what();
}
storage_guard.unlock();
SnapshotRes res;
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Register<WalFilesRpc>(
[this](auto *req_reader, auto *res_builder) {
DLOG(INFO) << "Received WalFilesRpc";
WalFilesReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(wal_directory_);
const auto maybe_wal_file_number = decoder.ReadUint();
CHECK(maybe_wal_file_number)
<< "Failed to read number of received WAL files!";
const auto wal_file_number = *maybe_wal_file_number;
DLOG(INFO) << "Received WAL files: " << wal_file_number;
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
durability::RecoveredIndicesAndConstraints indices_constraints;
for (auto i = 0; i < wal_file_number; ++i) {
const auto maybe_wal_path = decoder.ReadFile(wal_directory_);
CHECK(maybe_wal_path) << "Failed to load WAL!";
DLOG(INFO) << "Received WAL saved to " << *maybe_wal_path;
// TODO (antonio2368): Use timestamp_ for now, but the timestamp_ can
// increase by running read queries so define variable that will keep
// last received timestamp
try {
auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
if (wal_info.seq_num == 0) {
uuid_ = wal_info.uuid;
}
auto info = durability::LoadWal(
*maybe_wal_path, &indices_constraints, timestamp_, &vertices_,
&edges_, &name_id_mapper_, &edge_count_, config_.items);
vertex_id_ = std::max(vertex_id_.load(), info.next_vertex_id);
edge_id_ = std::max(edge_id_.load(), info.next_edge_id);
timestamp_ = std::max(timestamp_, info.next_timestamp);
DLOG(INFO) << *maybe_wal_path << " loaded successfully";
} catch (const durability::RecoveryFailure &e) {
LOG(FATAL) << "Couldn't recover WAL deltas from " << *maybe_wal_path
<< " because of: " << e.what();
}
}
durability::RecoverIndicesAndConstraints(indices_constraints, &indices_,
&constraints_, &vertices_);
storage_guard.unlock();
WalFilesRes res;
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Start();
}
void Storage::RegisterReplica(std::string name,
io::network::Endpoint endpoint) {
std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
std::shared_lock guard(replication_lock_);
CHECK(replication_state_.load() == ReplicationState::MAIN)
<< "Only main instance can register a replica!";
auto &replication_clients = GetRpcContext<ReplicationClientList>();
// TODO (antonio2368): Check if it's okay to acquire the shared lock first
// and later on acquire the write lock only if it's necessary
// because here we wait for the write lock even though there exists
// a replica with the same name
if (std::any_of(replication_clients.begin(), replication_clients.end(),
[&](auto &client) { return client.Name() == name; })) {
throw utils::BasicException("Replica with a same name already exists!");
// We can safely add new elements to the list because it doesn't invalidate
// existing references/iterators
auto &client = replication_clients_.WithLock([&](auto &clients) -> auto & {
if (std::any_of(clients.begin(), clients.end(),
[&](auto &client) { return client.Name() == name; })) {
throw utils::BasicException("Replica with a same name already exists!");
}
clients.emplace_back(std::move(name), &name_id_mapper_, config_.items,
endpoint, false);
return clients.back();
});
// Recovery
auto recovery_locker = file_retainer_.AddLocker();
std::optional<std::filesystem::path> snapshot_file;
std::vector<std::filesystem::path> wal_files;
std::optional<uint64_t> latest_seq_num;
{
auto recovery_locker_acc = recovery_locker.Access();
// For now we assume we need to send the latest snapshot
auto snapshot_files =
durability::GetSnapshotFiles(snapshot_directory_, uuid_);
if (!snapshot_files.empty()) {
std::sort(snapshot_files.begin(), snapshot_files.end());
// TODO (antonio2368): Send the last snapshot for now
// check if additional logic is necessary
// Also, prevent the deletion of the snapshot file and the required
// WALs
snapshot_file.emplace(std::move(snapshot_files.back().first));
recovery_locker_acc.AddFile(*snapshot_file);
}
{
// So the wal_file_ isn't overwritten
std::unique_lock engine_guard(engine_lock_);
if (wal_file_) {
// For now let's disable flushing until we send the current WAL
wal_file_->DisableFlushing();
latest_seq_num = wal_file_->SequenceNumber();
}
}
auto maybe_wal_files =
durability::GetWalFiles(wal_directory_, uuid_, latest_seq_num);
CHECK(maybe_wal_files) << "Failed to find WAL files";
auto &wal_files_with_seq = *maybe_wal_files;
std::optional<uint64_t> previous_seq_num;
for (const auto &[seq_num, from_timestamp, to_timestamp, path] :
wal_files_with_seq) {
if (previous_seq_num && *previous_seq_num + 1 != seq_num) {
LOG(FATAL) << "You are missing a WAL file with the sequence number "
<< *previous_seq_num + 1 << "!";
}
previous_seq_num = seq_num;
wal_files.push_back(path);
recovery_locker_acc.AddFile(path);
}
}
replication_clients.emplace_back(std::move(name), &name_id_mapper_,
config_.items, endpoint, false);
if (snapshot_file) {
DLOG(INFO) << "Sending the latest snapshot file: " << *snapshot_file;
client.TransferSnapshot(*snapshot_file);
}
if (!wal_files.empty()) {
DLOG(INFO) << "Sending the latest wal files";
client.TransferWalFiles(wal_files);
}
// We have a current WAL
if (latest_seq_num) {
auto stream = client.TransferCurrentWalFile();
stream.AppendFilename(wal_file_->Path().filename());
utils::InputFile file;
CHECK(file.Open(wal_file_->Path())) << "Failed to open current WAL file!";
const auto [buffer, buffer_size] = wal_file_->CurrentFileBuffer();
stream.AppendSize(file.GetSize() + buffer_size);
stream.AppendFileData(&file);
stream.AppendBufferData(buffer, buffer_size);
stream.Finalize();
wal_file_->EnableFlushing();
}
}
void Storage::UnregisterReplica(const std::string &name) {
std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
CHECK(replication_state_.load() == ReplicationState::MAIN)
<< "Only main instance can unregister a replica!";
auto &replication_clients = GetRpcContext<ReplicationClientList>();
replication_clients.remove_if(
[&](const auto &client) { return client.Name() == name; });
replication_clients_.WithLock([&](auto &clients) {
clients.remove_if(
[&](const auto &client) { return client.Name() == name; });
});
}
#endif

View File

@ -4,7 +4,6 @@
#include <filesystem>
#include <optional>
#include <shared_mutex>
#include <variant>
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
@ -19,6 +18,7 @@
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file_locker.hpp"
#include "utils/rw_lock.hpp"
#include "utils/scheduler.hpp"
#include "utils/skip_list.hpp"
@ -413,12 +413,12 @@ class Storage final {
}
std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
rpc_context_.emplace<std::monostate>();
if constexpr (state == ReplicationState::REPLICA) {
ConfigureReplica(std::forward<Args>(args)...);
} else if (state == ReplicationState::MAIN) {
rpc_context_.emplace<ReplicationClientList>();
// Main instance does not need replication server
replication_server_.reset();
}
replication_state_.store(state);
@ -444,6 +444,8 @@ class Storage final {
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
void CreateSnapshot();
#ifdef MG_ENTERPRISE
void ConfigureReplica(io::network::Endpoint endpoint);
#endif
@ -521,6 +523,8 @@ class Storage final {
std::optional<durability::WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
utils::FileRetainer file_retainer_;
// Replication
#ifdef MG_ENTERPRISE
utils::RWLock replication_lock_{utils::RWLock::Priority::WRITE};
@ -543,18 +547,12 @@ class Storage final {
}
};
using ReplicationClientList = std::list<replication::ReplicationClient>;
// Monostate is used for explicitly calling the destructor of the current
// type
std::variant<ReplicationClientList, ReplicationServer, std::monostate>
rpc_context_;
using ReplicationClientList =
utils::Synchronized<std::list<replication::ReplicationClient>,
utils::SpinLock>;
template <typename TRpcContext>
TRpcContext &GetRpcContext() {
auto *context = std::get_if<TRpcContext>(&rpc_context_);
CHECK(context) << "Wrong type set for the current replication state!";
return *context;
}
std::optional<ReplicationServer> replication_server_;
ReplicationClientList replication_clients_;
std::atomic<ReplicationState> replication_state_{ReplicationState::MAIN};
#endif

View File

@ -7,6 +7,8 @@
#include <cstring>
#include <fstream>
#include <mutex>
#include <shared_mutex>
#include <type_traits>
#include <glog/logging.h>
@ -289,9 +291,9 @@ OutputFile::~OutputFile() {
OutputFile::OutputFile(OutputFile &&other) noexcept
: fd_(other.fd_),
written_since_last_sync_(other.written_since_last_sync_),
path_(std::move(other.path_)),
buffer_position_(other.buffer_position_) {
path_(std::move(other.path_)) {
memcpy(buffer_, other.buffer_, kFileBufferSize);
buffer_position_.store(other.buffer_position_.load());
other.fd_ = -1;
other.written_since_last_sync_ = 0;
other.buffer_position_ = 0;
@ -303,7 +305,7 @@ OutputFile &OutputFile::operator=(OutputFile &&other) noexcept {
fd_ = other.fd_;
written_since_last_sync_ = other.written_since_last_sync_;
path_ = std::move(other.path_);
buffer_position_ = other.buffer_position_;
buffer_position_ = other.buffer_position_.load();
memcpy(buffer_, other.buffer_, kFileBufferSize);
other.fd_ = -1;
@ -350,13 +352,21 @@ const std::filesystem::path &OutputFile::path() const { return path_; }
void OutputFile::Write(const uint8_t *data, size_t size) {
while (size > 0) {
FlushBuffer(false);
auto buffer_left = kFileBufferSize - buffer_position_;
auto to_write = size < buffer_left ? size : buffer_left;
memcpy(buffer_ + buffer_position_, data, to_write);
size -= to_write;
data += to_write;
buffer_position_ += to_write;
written_since_last_sync_ += to_write;
{
// Reading thread can call EnableFlushing which triggers
// TryFlushing.
// We can't use a single shared lock for the entire Write
// because FlushBuffer acquires the unique_lock.
std::shared_lock flush_guard(flush_lock_);
const size_t buffer_position = buffer_position_.load();
auto buffer_left = kFileBufferSize - buffer_position;
auto to_write = size < buffer_left ? size : buffer_left;
memcpy(buffer_ + buffer_position, data, to_write);
size -= to_write;
data += to_write;
buffer_position_.fetch_add(to_write);
written_since_last_sync_ += to_write;
}
}
}
@ -367,13 +377,7 @@ void OutputFile::Write(const std::string_view &data) {
Write(data.data(), data.size());
}
size_t OutputFile::GetPosition() {
return SetPosition(Position::RELATIVE_TO_CURRENT, 0);
}
size_t OutputFile::SetPosition(Position position, ssize_t offset) {
FlushBuffer(true);
size_t OutputFile::SeekFile(const Position position, const ssize_t offset) {
int whence;
switch (position) {
case Position::SET:
@ -398,6 +402,15 @@ size_t OutputFile::SetPosition(Position position, ssize_t offset) {
}
}
/// Returns the position reported by
/// SetPosition(Position::RELATIVE_TO_CURRENT, 0).
/// Because it delegates to SetPosition, the internal buffer is force-flushed
/// as a side effect of this call.
size_t OutputFile::GetPosition() {
return SetPosition(Position::RELATIVE_TO_CURRENT, 0);
}
/// Force-flushes the internal buffer and then seeks the file.
/// @param position Reference point the offset is relative to.
/// @param offset Offset passed on to SeekFile.
/// @return The value returned by SeekFile(position, offset).
size_t OutputFile::SetPosition(Position position, ssize_t offset) {
// Buffered bytes must reach the file before the position is changed.
FlushBuffer(true);
return SeekFile(position, offset);
}
bool OutputFile::AcquireLock() {
CHECK(IsOpen()) << "Trying to acquire a write lock on an unopened file!";
int ret = -1;
@ -497,14 +510,20 @@ void OutputFile::Close() noexcept {
void OutputFile::FlushBuffer(bool force_flush) {
CHECK(IsOpen());
if (!force_flush && buffer_position_ < kFileBufferSize) return;
if (!force_flush && buffer_position_.load() < kFileBufferSize) return;
std::unique_lock flush_guard(flush_lock_);
FlushBufferInternal();
}
void OutputFile::FlushBufferInternal() {
CHECK(buffer_position_ <= kFileBufferSize)
<< "While trying to write to " << path_
<< " more file was written to the buffer than the buffer has space!";
auto *buffer = buffer_;
while (buffer_position_ > 0) {
auto buffer_position = buffer_position_.load();
while (buffer_position > 0) {
auto written = write(fd_, buffer, buffer_position_);
if (written == -1 && errno == EINTR) {
continue;
@ -517,9 +536,40 @@ void OutputFile::FlushBuffer(bool force_flush) {
<< " bytes of data were lost from this call and possibly "
<< written_since_last_sync_ << " bytes were lost from previous calls.";
buffer_position_ -= written;
buffer_position -= written;
buffer += written;
}
buffer_position_.store(buffer_position);
}
/// Disables flushing by taking flush_lock_ in shared mode and keeping it held.
/// The lock is released again by EnableFlushing, so every call must be paired
/// with exactly one EnableFlushing call.
void OutputFile::DisableFlushing() { flush_lock_.lock_shared(); }
/// Re-enables flushing: releases the shared hold on flush_lock_ taken by
/// DisableFlushing and then attempts a flush of the internal buffer.
void OutputFile::EnableFlushing() {
// Release first; TryFlushing needs to acquire the lock exclusively.
flush_lock_.unlock_shared();
TryFlushing();
}
std::pair<const uint8_t *, size_t> OutputFile::CurrentBuffer() const {
return {buffer_, buffer_position_.load()};
}
/// Computes the size of the file including data still held in the buffer.
/// @return The value of SeekFile(Position::RELATIVE_TO_END, 0) plus the
///         current buffer position.
size_t OutputFile::GetSize() {
  // Design note kept from the original implementation (not verified here):
  // the file's size could alternatively be fetched using fstat. lseek should
  // be faster for a smaller number of clients while fstat should have an
  // advantage for a high number of clients, because of how those functions
  // support multi-threading: lseek uses locks, fstat is lock-free.
  // For now, lseek should be good enough. If at any point this proves to
  // be a bottleneck, fstat should be considered.
  const size_t flushed_bytes = SeekFile(Position::RELATIVE_TO_END, 0);
  return flushed_bytes + buffer_position_.load();
}
void OutputFile::TryFlushing() {
if (std::unique_lock guard(flush_lock_, std::try_to_lock);
guard.owns_lock()) {
FlushBufferInternal();
}
}
} // namespace utils

View File

@ -6,12 +6,15 @@
*/
#pragma once
#include <atomic>
#include <filesystem>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
#include "utils/rw_lock.hpp"
namespace utils {
/// Get the path of the current executable.
@ -152,7 +155,11 @@ class InputFile {
/// written to permanent storage.
///
/// This class *isn't* thread safe. It is implemented as a wrapper around low
/// level system calls used for file manipulation.
/// level system calls used for file manipulation. It allows concurrent
/// READING of the file that is being written. To read the file, disable the
/// flushing of the internal buffer using `DisableFlushing`. Don't forget to
/// enable flushing again after you're done with reading using the
/// 'EnableFlushing' method!
class OutputFile {
public:
enum class Mode {
@ -220,14 +227,37 @@ class OutputFile {
/// file. On failure and misuse it crashes the program.
void Close() noexcept;
/// Disable flushing of the internal buffer.
void DisableFlushing();
/// Enable flushing of the internal buffer.
/// Once flushing is enabled again, a flush of the internal
/// buffer is attempted.
void EnableFlushing();
/// Try flushing the internal buffer.
void TryFlushing();
/// Get the internal buffer with its current size.
std::pair<const uint8_t *, size_t> CurrentBuffer() const;
/// Get the size of the file.
size_t GetSize();
private:
void FlushBuffer(bool force_flush);
void FlushBufferInternal();
size_t SeekFile(Position position, ssize_t offset);
int fd_{-1};
size_t written_since_last_sync_{0};
std::filesystem::path path_;
uint8_t buffer_[kFileBufferSize];
size_t buffer_position_{0};
std::atomic<size_t> buffer_position_{0};
// Flushing buffer should be a higher priority
utils::RWLock flush_lock_{RWLock::Priority::WRITE};
};
} // namespace utils

View File

@ -79,7 +79,7 @@ class Synchronized {
LockedPtr Lock() { return LockedPtr(&object_, &mutex_); }
template <class TCallable>
auto WithLock(TCallable &&callable) {
decltype(auto) WithLock(TCallable &&callable) {
return callable(*Lock());
}

View File

@ -350,3 +350,134 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
}
// Checks that a replica registered on a main instance which already has
// durability files ends up with the same vertices and properties as main,
// receives later changes (labels) as well, and can afterwards be restarted
// from the files stored in its own storage directory.
TEST_F(ReplicationTest, RecoveryProcess) {
std::vector<storage::Gid> vertex_gids;
// Force the creation of snapshot
{
storage::Storage main_store(
{.durability = {
.storage_directory = storage_directory,
.recover_on_startup = true,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
.snapshot_on_exit = true,
}});
{
auto acc = main_store.Access();
// Create the vertex before registering a replica
auto v = acc.CreateVertex();
vertex_gids.emplace_back(v.Gid());
ASSERT_FALSE(acc.Commit().HasError());
}
// main_store is destroyed at the end of this scope with snapshot_on_exit
// enabled.
}
{
// Create second WAL
storage::Storage main_store(
{.durability = {.storage_directory = storage_directory,
.recover_on_startup = true,
.snapshot_wal_mode = storage::Config::Durability::
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}});
// Create vertices in 2 different transactions
{
auto acc = main_store.Access();
auto v = acc.CreateVertex();
vertex_gids.emplace_back(v.Gid());
ASSERT_FALSE(acc.Commit().HasError());
}
{
auto acc = main_store.Access();
auto v = acc.CreateVertex();
vertex_gids.emplace_back(v.Gid());
ASSERT_FALSE(acc.Commit().HasError());
}
}
// The main instance used for the rest of the test; it recovers from the
// same storage directory the two previous instances wrote to.
storage::Storage main_store(
{.durability = {
.storage_directory = storage_directory,
.recover_on_startup = true,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
constexpr const auto *property_name = "property_name";
constexpr const auto property_value = 1;
{
// Force the creation of current WAL file
auto acc = main_store.Access();
for (const auto &vertex_gid : vertex_gids) {
auto v = acc.FindVertex(vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
ASSERT_TRUE(v->SetProperty(main_store.NameToProperty(property_name),
storage::PropertyValue(property_value))
.HasValue());
}
ASSERT_FALSE(acc.Commit().HasError());
}
// The replica gets its own storage directory, removed when the test ends.
std::filesystem::path replica_storage_directory{
std::filesystem::temp_directory_path() /
"MG_test_unit_storage_v2_replication_replica"};
utils::OnScopeExit replica_directory_cleaner(
[&]() { std::filesystem::remove_all(replica_storage_directory); });
{
storage::Storage replica_store(
{.durability = {.storage_directory = replica_storage_directory}});
replica_store.SetReplicationState<storage::ReplicationState::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
main_store.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000});
constexpr const auto *vertex_label = "vertex_label";
{
// Change made on main after the replica was registered.
auto acc = main_store.Access();
for (const auto &vertex_gid : vertex_gids) {
auto v = acc.FindVertex(vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
ASSERT_TRUE(
v->AddLabel(main_store.NameToLabel(vertex_label)).HasValue());
}
ASSERT_FALSE(acc.Commit().HasError());
}
{
// The replica must see every vertex with both the label added after
// registration and the property set before registration.
auto acc = replica_store.Access();
for (const auto &vertex_gid : vertex_gids) {
auto v = acc.FindVertex(vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
const auto labels = v->Labels(storage::View::OLD);
ASSERT_TRUE(labels.HasValue());
ASSERT_THAT(*labels, UnorderedElementsAre(
replica_store.NameToLabel(vertex_label)));
const auto properties = v->Properties(storage::View::OLD);
ASSERT_TRUE(properties.HasValue());
ASSERT_THAT(*properties,
UnorderedElementsAre(std::make_pair(
replica_store.NameToProperty(property_name),
storage::PropertyValue(property_value))));
}
ASSERT_FALSE(acc.Commit().HasError());
}
}
// Test recovery of the replica with the files received from Main
{
storage::Storage replica_store(
{.durability = {.storage_directory = replica_storage_directory,
.recover_on_startup = true}});
{
auto acc = replica_store.Access();
for (const auto &vertex_gid : vertex_gids) {
auto v = acc.FindVertex(vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
const auto labels = v->Labels(storage::View::OLD);
ASSERT_TRUE(labels.HasValue());
// Labels are received with AppendDeltasRpc so they are not saved on
// disk
ASSERT_EQ(labels->size(), 0);
}
ASSERT_FALSE(acc.Commit().HasError());
}
}
}

View File

@ -1,13 +1,17 @@
#include <chrono>
#include <fstream>
#include <map>
#include <string>
#include <thread>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "utils/file.hpp"
#include "utils/spin_lock.hpp"
#include "utils/string.hpp"
#include "utils/synchronized.hpp"
namespace fs = std::filesystem;
@ -283,3 +287,95 @@ TEST_F(UtilsFileTest, OutputFileDescriptorLeackage) {
utils::OutputFile::Mode::APPEND_TO_EXISTING);
}
}
// Checks that reader threads always observe a consistent byte sequence (file
// contents followed by the not-yet-flushed buffer) of an OutputFile while a
// writer thread keeps appending to it, and that at least one reader ends up
// seeing all of the written data.
TEST_F(UtilsFileTest, ConcurrentReadingAndWritting) {
  const auto file_path = storage / "existing_dir_777" / "existing_file_777";
  utils::OutputFile handle;
  handle.Open(file_path, utils::OutputFile::Mode::OVERWRITE_EXISTING);

  // Random engines and distributions are not safe for concurrent use, so
  // every thread owns its engine and distribution; the seeds are derived
  // from a fixed base so the waits stay deterministic per thread.
  constexpr std::default_random_engine::result_type base_seed = 586478780;
  const auto short_wait = [](std::default_random_engine &engine) {
    std::uniform_int_distribution<int> random_short_wait(1, 10);
    std::this_thread::sleep_for(
        std::chrono::milliseconds(random_short_wait(engine)));
  };

  constexpr size_t number_of_writes = 500;
  std::thread writer_thread([&] {
    std::default_random_engine engine(base_seed);
    uint8_t current_number = 0;
    for (size_t i = 0; i < number_of_writes; ++i) {
      handle.Write(&current_number, 1);
      ++current_number;
      handle.TryFlushing();
      short_wait(engine);
    }
  });

  constexpr size_t reader_threads_num = 7;
  // number_of_reads needs to be higher than number_of_writes
  // so we maximize the chance of having at least one reading
  // thread that will read all of the data.
  constexpr size_t number_of_reads = 550;
  // Only reserve the capacity: constructing the vector with a size would add
  // default-constructed (non-joinable) threads in front of the real ones.
  std::vector<std::thread> reader_threads;
  reader_threads.reserve(reader_threads_num);
  utils::Synchronized<std::vector<size_t>, utils::SpinLock> max_read_counts;
  for (size_t thread_index = 0; thread_index < reader_threads_num;
       ++thread_index) {
    reader_threads.emplace_back([&, thread_index] {
      std::default_random_engine engine(
          static_cast<std::default_random_engine::result_type>(
              base_seed + 1 + thread_index));
      for (size_t i = 0; i < number_of_reads; ++i) {
        // While flushing is disabled the buffer content cannot move to the
        // file, so file + buffer form a stable snapshot of the written data.
        handle.DisableFlushing();
        auto [buffer, buffer_size] = handle.CurrentBuffer();
        utils::InputFile input_handle;
        input_handle.Open(file_path);
        std::optional<uint8_t> previous_number;
        size_t total_read_count = 0;
        uint8_t current_number;
        // Read the file
        while (input_handle.Read(&current_number, 1)) {
          if (previous_number) {
            const uint8_t expected_next = *previous_number + 1;
            ASSERT_TRUE(current_number == expected_next);
          }
          previous_number = current_number;
          ++total_read_count;
        }
        // Read the buffer
        while (buffer_size > 0) {
          if (previous_number) {
            const uint8_t expected_next = *previous_number + 1;
            ASSERT_TRUE(*buffer == expected_next);
          }
          previous_number = *buffer;
          ++buffer;
          --buffer_size;
          ++total_read_count;
        }
        handle.EnableFlushing();
        input_handle.Close();
        // Last read will always have the highest amount of
        // bytes read.
        if (i == number_of_reads - 1) {
          max_read_counts.WithLock([&](auto &read_counts) {
            read_counts.push_back(total_read_count);
          });
        }
        short_wait(engine);
      }
    });
  }

  if (writer_thread.joinable()) {
    writer_thread.join();
  }
  for (auto &reader_thread : reader_threads) {
    if (reader_thread.joinable()) {
      reader_thread.join();
    }
  }
  handle.Close();

  // Check if any of the threads read the entire data.
  ASSERT_TRUE(max_read_counts.WithLock([&](auto &read_counts) {
    return std::any_of(
        read_counts.cbegin(), read_counts.cend(),
        [](const auto read_count) { return read_count == number_of_writes; });
  }));
}