From bc0c9449101c44d404f7fa47f1684dbff2ba94b0 Mon Sep 17 00:00:00 2001 From: antonio2368 Date: Thu, 12 Nov 2020 09:55:56 +0100 Subject: [PATCH] Add replica recovery process (#40) * Add file transfer over RPC * Snapshot transfer implementation * Allow snapshot creation only for MAIN instances * Replica and main can have replication clients * Use only snapshots and WALs that are from the Main storage * Add flush lock and expose buffer * Add fstat for file size and TryFlushing method * Use lseek for size Co-authored-by: Antonio Andelic --- src/storage/v2/CMakeLists.txt | 2 + src/storage/v2/durability/durability.cpp | 176 ++++++---- src/storage/v2/durability/durability.hpp | 36 ++ src/storage/v2/durability/serialization.cpp | 12 + src/storage/v2/durability/serialization.hpp | 12 + src/storage/v2/durability/snapshot.cpp | 9 +- src/storage/v2/durability/snapshot.hpp | 4 +- src/storage/v2/durability/wal.cpp | 17 +- src/storage/v2/durability/wal.hpp | 15 + src/storage/v2/replication/replication.cpp | 107 ++++++ src/storage/v2/replication/replication.hpp | 76 ++-- src/storage/v2/replication/rpc.lcp | 17 +- src/storage/v2/replication/serialization.cpp | 165 +++++++++ src/storage/v2/replication/serialization.hpp | 113 ++---- src/storage/v2/storage.cpp | 343 ++++++++++++++----- src/storage/v2/storage.hpp | 26 +- src/utils/file.cpp | 90 +++-- src/utils/file.hpp | 34 +- src/utils/synchronized.hpp | 2 +- tests/unit/storage_v2_replication.cpp | 131 +++++++ tests/unit/utils_file.cpp | 96 ++++++ 21 files changed, 1167 insertions(+), 316 deletions(-) create mode 100644 src/storage/v2/replication/replication.cpp create mode 100644 src/storage/v2/replication/serialization.cpp diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 79e8ef52d..641dc5607 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -19,6 +19,8 @@ if(MG_ENTERPRISE) set(storage_v2_src_files ${storage_v2_src_files} + replication/replication.cpp + replication/serialization.cpp replication/slk.cpp ${lcp_storage_cpp_files}) endif() diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index bd0426bc7..679e53a11 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -50,6 +50,99 @@ void VerifyStorageDirectoryOwnerAndProcessUserOrDie( << ". Please start the process as user " << user_directory << "!"; } +std::vector> GetSnapshotFiles( + const std::filesystem::path &snapshot_directory, + const std::string_view uuid) { + std::vector> snapshot_files; + std::error_code error_code; + if (utils::DirExists(snapshot_directory)) { + for (const auto &item : + std::filesystem::directory_iterator(snapshot_directory, error_code)) { + if (!item.is_regular_file()) continue; + try { + auto info = ReadSnapshotInfo(item.path()); + if (uuid.empty() || info.uuid == uuid) { + snapshot_files.emplace_back(item.path(), info.uuid); + } + } catch (const RecoveryFailure &) { + continue; + } + } + CHECK(!error_code) << "Couldn't recover data because an error occurred: " + << error_code.message() << "!"; + } + + return snapshot_files; +} + +std::optional>> +GetWalFiles(const std::filesystem::path &wal_directory, + const std::string_view uuid, + const std::optional current_seq_num) { + if (!utils::DirExists(wal_directory)) return std::nullopt; + + std::vector> + wal_files; + std::error_code error_code; + for (const auto &item : + std::filesystem::directory_iterator(wal_directory, error_code)) { + if (!item.is_regular_file()) continue; + try { + auto info = ReadWalInfo(item.path()); + if ((uuid.empty() || info.uuid == uuid) && + (!current_seq_num || info.seq_num < current_seq_num)) + wal_files.emplace_back(info.seq_num, info.from_timestamp, + info.to_timestamp, item.path()); + } catch (const RecoveryFailure &e) { + DLOG(WARNING) << "Failed to read " << item.path(); + continue; + } + } + CHECK(!error_code) << "Couldn't recover data because an error occurred: " + << error_code.message() << "!"; + std::sort(wal_files.begin(), wal_files.end()); + return std::move(wal_files); +} + +// Function used to recover all discovered indices and constraints. The +// indices and constraints must be recovered after the data recovery is done +// to ensure that the indices and constraints are consistent at the end of the +// recovery process. +void RecoverIndicesAndConstraints( + const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, + Constraints *constraints, utils::SkipList *vertices) { + // Recover label indices. + for (const auto &item : indices_constraints.indices.label) { + if (!indices->label_index.CreateIndex(item, vertices->access())) + throw RecoveryFailure("The label index must be created here!"); + } + + // Recover label+property indices. + for (const auto &item : indices_constraints.indices.label_property) { + if (!indices->label_property_index.CreateIndex(item.first, item.second, + vertices->access())) + throw RecoveryFailure("The label+property index must be created here!"); + } + + // Recover existence constraints. + for (const auto &item : indices_constraints.constraints.existence) { + auto ret = CreateExistenceConstraint(constraints, item.first, item.second, + vertices->access()); + if (ret.HasError() || !ret.GetValue()) + throw RecoveryFailure("The existence constraint must be created here!"); + } + + // Recover unique constraints. + for (const auto &item : indices_constraints.constraints.unique) { + auto ret = constraints->unique_constraints.CreateConstraint( + item.first, item.second, vertices->access()); + if (ret.HasError() || + ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) + throw RecoveryFailure("The unique constraint must be created here!"); + } +} + std::optional RecoverData( const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid, @@ -60,64 +153,13 @@ std::optional RecoverData( if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory)) return std::nullopt; - // Helper lambda used to recover all discovered indices and constraints. The - // indices and constraints must be recovered after the data recovery is done - // to ensure that the indices and constraints are consistent at the end of the - // recovery process. - auto recover_indices_and_constraints = [&](const auto &indices_constraints) { - // Recover label indices. - for (const auto &item : indices_constraints.indices.label) { - if (!indices->label_index.CreateIndex(item, vertices->access())) - throw RecoveryFailure("The label index must be created here!"); - } - - // Recover label+property indices. - for (const auto &item : indices_constraints.indices.label_property) { - if (!indices->label_property_index.CreateIndex(item.first, item.second, - vertices->access())) - throw RecoveryFailure("The label+property index must be created here!"); - } - - // Recover existence constraints. - for (const auto &item : indices_constraints.constraints.existence) { - auto ret = CreateExistenceConstraint(constraints, item.first, item.second, - vertices->access()); - if (ret.HasError() || !ret.GetValue()) - throw RecoveryFailure("The existence constraint must be created here!"); - } - - // Recover unique constraints. - for (const auto &item : indices_constraints.constraints.unique) { - auto ret = constraints->unique_constraints.CreateConstraint( - item.first, item.second, vertices->access()); - if (ret.HasError() || - ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) - throw RecoveryFailure("The unique constraint must be created here!"); - } - }; - - // Array of all discovered snapshots, ordered by name. - std::vector> snapshot_files; - std::error_code error_code; - if (utils::DirExists(snapshot_directory)) { - for (const auto &item : - std::filesystem::directory_iterator(snapshot_directory, error_code)) { - if (!item.is_regular_file()) continue; - try { - auto info = ReadSnapshotInfo(item.path()); - snapshot_files.emplace_back(item.path(), info.uuid); - } catch (const RecoveryFailure &) { - continue; - } - } - CHECK(!error_code) << "Couldn't recover data because an error occurred: " - << error_code.message() << "!"; - } + auto snapshot_files = GetSnapshotFiles(snapshot_directory); RecoveryInfo recovery_info; RecoveredIndicesAndConstraints indices_constraints; std::optional snapshot_timestamp; if (!snapshot_files.empty()) { + // Order the files by name std::sort(snapshot_files.begin(), snapshot_files.end()); // UUID used for durability is the UUID of the last snapshot file. *uuid = snapshot_files.back().second; @@ -149,10 +191,12 @@ std::optional RecoverData( indices_constraints = std::move(recovered_snapshot->indices_constraints); snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp; if (!utils::DirExists(wal_directory)) { - recover_indices_and_constraints(indices_constraints); + RecoverIndicesAndConstraints(indices_constraints, indices, constraints, + vertices); return recovered_snapshot->recovery_info; } } else { + std::error_code error_code; if (!utils::DirExists(wal_directory)) return std::nullopt; // Array of all discovered WAL files, ordered by name. std::vector> wal_files; @@ -174,23 +218,12 @@ std::optional RecoverData( *uuid = wal_files.back().second; } + auto maybe_wal_files = GetWalFiles(wal_directory, *uuid); + if (!maybe_wal_files) return std::nullopt; + // Array of all discovered WAL files, ordered by sequence number. - std::vector> - wal_files; - for (const auto &item : - std::filesystem::directory_iterator(wal_directory, error_code)) { - if (!item.is_regular_file()) continue; - try { - auto info = ReadWalInfo(item.path()); - if (info.uuid != *uuid) continue; - wal_files.emplace_back(info.seq_num, info.from_timestamp, - info.to_timestamp, item.path()); - } catch (const RecoveryFailure &e) { - continue; - } - } - CHECK(!error_code) << "Couldn't recover data because an error occurred: " - << error_code.message() << "!"; + auto &wal_files = *maybe_wal_files; + // By this point we should have recovered from a snapshot, or we should have // found some WAL files to recover from in the above `else`. This is just a // sanity check to circumvent the following case: The database didn't recover @@ -250,7 +283,8 @@ std::optional RecoverData( *wal_seq_num = *previous_seq_num + 1; } - recover_indices_and_constraints(indices_constraints); + RecoverIndicesAndConstraints(indices_constraints, indices, constraints, + vertices); return recovery_info; } diff --git a/src/storage/v2/durability/durability.hpp b/src/storage/v2/durability/durability.hpp index 047429066..8f260335b 100644 --- a/src/storage/v2/durability/durability.hpp +++ b/src/storage/v2/durability/durability.hpp @@ -9,6 +9,7 @@ #include "storage/v2/config.hpp" #include "storage/v2/constraints.hpp" #include "storage/v2/durability/metadata.hpp" +#include "storage/v2/durability/wal.hpp" #include "storage/v2/edge.hpp" #include "storage/v2/indices.hpp" #include "storage/v2/name_id_mapper.hpp" @@ -23,6 +24,41 @@ namespace storage::durability { void VerifyStorageDirectoryOwnerAndProcessUserOrDie( const std::filesystem::path &storage_directory); +/// Get list of snapshot files with their UUID. +/// @param snapshot_directory Directory containing the Snapshot files. +/// @param uuid UUID of the Snapshot files. If not empty, fetch only Snapshot +/// file with the specified UUID. Otherwise, fetch only Snapshot files in the +/// snapshot_directory. +/// @return List of snapshot files defined with its path and UUID. +std::vector> GetSnapshotFiles( + const std::filesystem::path &snapshot_directory, + std::string_view uuid = ""); + +/// Get list of WAL files ordered by the sequence number +/// @param wal_directory Directory containing the WAL files. +/// @param uuid UUID of the WAL files. If not empty, fetch only WAL files +/// with the specified UUID. Otherwise, fetch all WAL files in the +/// wal_directory. +/// @param current_seq_num Sequence number of the WAL file which is currently +/// being written. If specified, load only finalized WAL files, i.e. WAL files +/// with seq_num < current_seq_num. +/// @return List of WAL files. Each WAL file is defined with its sequence +/// number, from timestamp, to timestamp and path. +std::optional>> +GetWalFiles(const std::filesystem::path &wal_directory, + std::string_view uuid = "", + std::optional current_seq_num = {}); + +// Helper function used to recover all discovered indices and constraints. The +// indices and constraints must be recovered after the data recovery is done +// to ensure that the indices and constraints are consistent at the end of the +// recovery process. +/// @throw RecoveryFailure +void RecoverIndicesAndConstraints( + const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, + Constraints *constraints, utils::SkipList *vertices); + /// Recovers data either from a snapshot and/or WAL files. /// @throw RecoveryFailure /// @throw std::bad_alloc diff --git a/src/storage/v2/durability/serialization.cpp b/src/storage/v2/durability/serialization.cpp index 0c3764003..faaa659b2 100644 --- a/src/storage/v2/durability/serialization.cpp +++ b/src/storage/v2/durability/serialization.cpp @@ -119,6 +119,18 @@ void Encoder::Finalize() { file_.Close(); } +void Encoder::DisableFlushing() { file_.DisableFlushing(); } + +void Encoder::EnableFlushing() { file_.EnableFlushing(); } + +void Encoder::TryFlushing() { file_.TryFlushing(); } + +std::pair Encoder::CurrentFileBuffer() const { + return file_.CurrentBuffer(); +} + +size_t Encoder::GetSize() { return file_.GetSize(); } + ////////////////////////// // Decoder implementation. ////////////////////////// diff --git a/src/storage/v2/durability/serialization.hpp b/src/storage/v2/durability/serialization.hpp index bdd3cd74f..66c5679ec 100644 --- a/src/storage/v2/durability/serialization.hpp +++ b/src/storage/v2/durability/serialization.hpp @@ -51,6 +51,18 @@ class Encoder final : public BaseEncoder { void Finalize(); + // Disable flushing of the internal buffer. + void DisableFlushing(); + // Enable flushing of the internal buffer. + void EnableFlushing(); + // Try flushing the internal buffer. + void TryFlushing(); + // Get the current internal buffer with its size. + std::pair CurrentFileBuffer() const; + + // Get the total size of the current file. + size_t GetSize(); + private: utils::OutputFile file_; }; diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 478601e78..6b88b4b5c 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -9,6 +9,7 @@ #include "storage/v2/edge_ref.hpp" #include "storage/v2/mvcc.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/file_locker.hpp" namespace storage::durability { @@ -582,7 +583,8 @@ void CreateSnapshot(Transaction *transaction, utils::SkipList *vertices, utils::SkipList *edges, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, - Config::Items items, const std::string &uuid) { + Config::Items items, const std::string &uuid, + utils::FileRetainer *file_retainer) { // Ensure that the storage directory exists. utils::EnsureDirOrDie(snapshot_directory); @@ -865,10 +867,7 @@ void CreateSnapshot(Transaction *transaction, old_snapshot_files.size() - (snapshot_retention_count - 1); for (size_t i = 0; i < num_to_erase; ++i) { const auto &[start_timestamp, snapshot_path] = old_snapshot_files[i]; - if (!utils::DeleteFile(snapshot_path)) { - LOG(WARNING) << "Couldn't delete snapshot file " << snapshot_path - << "!"; - } + file_retainer->DeleteFile(snapshot_path); } old_snapshot_files.erase(old_snapshot_files.begin(), old_snapshot_files.begin() + num_to_erase); diff --git a/src/storage/v2/durability/snapshot.hpp b/src/storage/v2/durability/snapshot.hpp index 885c4c6e5..a7d317b83 100644 --- a/src/storage/v2/durability/snapshot.hpp +++ b/src/storage/v2/durability/snapshot.hpp @@ -12,6 +12,7 @@ #include "storage/v2/name_id_mapper.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex.hpp" +#include "utils/file_locker.hpp" #include "utils/skip_list.hpp" namespace storage::durability { @@ -60,6 +61,7 @@ void CreateSnapshot(Transaction *transaction, utils::SkipList *vertices, utils::SkipList *edges, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, - Config::Items items, const std::string &uuid); + Config::Items items, const std::string &uuid, + utils::FileRetainer *file_retainer); } // namespace storage::durability diff --git a/src/storage/v2/durability/wal.cpp b/src/storage/v2/durability/wal.cpp index 49f1723b6..68460146f 100644 --- a/src/storage/v2/durability/wal.cpp +++ b/src/storage/v2/durability/wal.cpp @@ -976,7 +976,8 @@ WalFile::WalFile(const std::filesystem::path &wal_directory, path_(wal_directory / MakeWalName()), from_timestamp_(0), to_timestamp_(0), - count_(0) { + count_(0), + seq_num_(seq_num) { // Ensure that the storage directory exists. utils::EnsureDirOrDie(wal_directory); @@ -1055,7 +1056,9 @@ void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label, void WalFile::Sync() { wal_.Sync(); } -uint64_t WalFile::GetSize() { return wal_.GetPosition(); } +uint64_t WalFile::GetSize() { return wal_.GetSize(); } + +uint64_t WalFile::SequenceNumber() const { return seq_num_; } void WalFile::UpdateStats(uint64_t timestamp) { if (count_ == 0) from_timestamp_ = timestamp; @@ -1063,4 +1066,14 @@ void WalFile::UpdateStats(uint64_t timestamp) { count_ += 1; } +void WalFile::DisableFlushing() { wal_.DisableFlushing(); } + +void WalFile::EnableFlushing() { wal_.EnableFlushing(); } + +void WalFile::TryFlushing() { wal_.TryFlushing(); } + +std::pair WalFile::CurrentFileBuffer() const { + return wal_.CurrentFileBuffer(); +} + } // namespace storage::durability diff --git a/src/storage/v2/durability/wal.hpp b/src/storage/v2/durability/wal.hpp index 7c14aabfb..f8e0985ec 100644 --- a/src/storage/v2/durability/wal.hpp +++ b/src/storage/v2/durability/wal.hpp @@ -181,6 +181,20 @@ class WalFile { uint64_t GetSize(); + uint64_t SequenceNumber() const; + + // Disable flushing of the internal buffer. + void DisableFlushing(); + // Enable flushing of the internal buffer. + void EnableFlushing(); + // Try flushing the internal buffer. + void TryFlushing(); + // Get the internal buffer with its size. + std::pair CurrentFileBuffer() const; + + // Get the path of the current WAL file. + const auto &Path() const { return path_; } + private: void UpdateStats(uint64_t timestamp); @@ -191,6 +205,7 @@ class WalFile { uint64_t from_timestamp_; uint64_t to_timestamp_; uint64_t count_; + uint64_t seq_num_; }; } // namespace storage::durability diff --git a/src/storage/v2/replication/replication.cpp b/src/storage/v2/replication/replication.cpp new file mode 100644 index 000000000..747054c07 --- /dev/null +++ b/src/storage/v2/replication/replication.cpp @@ -0,0 +1,107 @@ +#include "storage/v2/replication/replication.hpp" + +namespace storage::replication { + +////// ReplicationClient ////// +ReplicationClient::ReplicationClient(std::string name, + NameIdMapper *name_id_mapper, + Config::Items items, + const io::network::Endpoint &endpoint, + bool use_ssl) + : name_(std::move(name)), + name_id_mapper_(name_id_mapper), + items_(items), + rpc_context_(use_ssl), + rpc_client_(endpoint, &rpc_context_) {} + +void ReplicationClient::TransferSnapshot(const std::filesystem::path &path) { + auto stream{rpc_client_.Stream()}; + Encoder encoder(stream.GetBuilder()); + encoder.WriteFile(path); + stream.AwaitResponse(); +} + +void ReplicationClient::TransferWalFiles( + const std::vector &wal_files) { + CHECK(!wal_files.empty()) << "Wal files list is empty!"; + auto stream{rpc_client_.Stream()}; + Encoder encoder(stream.GetBuilder()); + encoder.WriteUint(wal_files.size()); + for (const auto &wal : wal_files) { + encoder.WriteFile(wal); + } + + stream.AwaitResponse(); +} + +////// TransactionHandler ////// +ReplicationClient::TransactionHandler::TransactionHandler( + ReplicationClient *self) + : self_(self), stream_(self_->rpc_client_.Stream()) {} + +void ReplicationClient::TransactionHandler::AppendDelta( + const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeDelta(&encoder, self_->name_id_mapper_, self_->items_, delta, vertex, + final_commit_timestamp); +} + +void ReplicationClient::TransactionHandler::AppendDelta( + const Delta &delta, const Edge &edge, uint64_t final_commit_timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeDelta(&encoder, self_->name_id_mapper_, delta, edge, + final_commit_timestamp); +} + +void ReplicationClient::TransactionHandler::AppendTransactionEnd( + uint64_t final_commit_timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeTransactionEnd(&encoder, final_commit_timestamp); +} + +void ReplicationClient::TransactionHandler::AppendOperation( + durability::StorageGlobalOperation operation, LabelId label, + const std::set &properties, uint64_t timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeOperation(&encoder, self_->name_id_mapper_, operation, label, + properties, timestamp); +} + +void ReplicationClient::TransactionHandler::Finalize() { + stream_.AwaitResponse(); +} + +////// CurrentWalHandler ////// +ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self) + : self_(self), stream_(self_->rpc_client_.Stream()) { + Encoder encoder(stream_.GetBuilder()); + encoder.WriteUint(1); +} + +void ReplicationClient::CurrentWalHandler::AppendFilename( + const std::string &filename) { + Encoder encoder(stream_.GetBuilder()); + encoder.WriteString(filename); +} + +void ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) { + Encoder encoder(stream_.GetBuilder()); + encoder.WriteUint(size); +} + +void ReplicationClient::CurrentWalHandler::AppendFileData( + utils::InputFile *file) { + Encoder encoder(stream_.GetBuilder()); + encoder.WriteFileData(file); +} + +void ReplicationClient::CurrentWalHandler::AppendBufferData( + const uint8_t *buffer, const size_t buffer_size) { + Encoder encoder(stream_.GetBuilder()); + encoder.WriteBuffer(buffer, buffer_size); +} + +void ReplicationClient::CurrentWalHandler::Finalize() { + stream_.AwaitResponse(); +} +} // namespace storage::replication diff --git a/src/storage/v2/replication/replication.hpp b/src/storage/v2/replication/replication.hpp index 52fea3f99..deb695e95 100644 --- a/src/storage/v2/replication/replication.hpp +++ b/src/storage/v2/replication/replication.hpp @@ -10,6 +10,7 @@ #include "storage/v2/property_value.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" +#include "utils/file.hpp" namespace storage::replication { @@ -17,62 +18,73 @@ class ReplicationClient { public: ReplicationClient(std::string name, NameIdMapper *name_id_mapper, Config::Items items, const io::network::Endpoint &endpoint, - bool use_ssl) - : name_(std::move(name)), - name_id_mapper_(name_id_mapper), - items_(items), - rpc_context_(use_ssl), - rpc_client_(endpoint, &rpc_context_) {} + bool use_ssl); - class Handler { + // Handler used for transfering the current transaction. + class TransactionHandler { private: friend class ReplicationClient; - - /// @throw rpc::RpcFailedException - explicit Handler(ReplicationClient *self) - : self_(self), stream_(self_->rpc_client_.Stream()) {} + explicit TransactionHandler(ReplicationClient *self); public: /// @throw rpc::RpcFailedException void AppendDelta(const Delta &delta, const Vertex &vertex, - uint64_t final_commit_timestamp) { - Encoder encoder(stream_.GetBuilder()); - EncodeDelta(&encoder, self_->name_id_mapper_, self_->items_, delta, - vertex, final_commit_timestamp); - } + uint64_t final_commit_timestamp); /// @throw rpc::RpcFailedException void AppendDelta(const Delta &delta, const Edge &edge, - uint64_t final_commit_timestamp) { - Encoder encoder(stream_.GetBuilder()); - EncodeDelta(&encoder, self_->name_id_mapper_, delta, edge, - final_commit_timestamp); - } + uint64_t final_commit_timestamp); /// @throw rpc::RpcFailedException - void AppendTransactionEnd(uint64_t final_commit_timestamp) { - Encoder encoder(stream_.GetBuilder()); - EncodeTransactionEnd(&encoder, final_commit_timestamp); - } + void AppendTransactionEnd(uint64_t final_commit_timestamp); /// @throw rpc::RpcFailedException void AppendOperation(durability::StorageGlobalOperation operation, LabelId label, const std::set &properties, - uint64_t timestamp) { - Encoder encoder(stream_.GetBuilder()); - EncodeOperation(&encoder, self_->name_id_mapper_, operation, label, - properties, timestamp); - } + uint64_t timestamp); /// @throw rpc::RpcFailedException - void Finalize() { stream_.AwaitResponse(); } + void Finalize(); private: ReplicationClient *self_; rpc::Client::StreamHandler stream_; }; - Handler ReplicateTransaction() { return Handler(this); } + TransactionHandler ReplicateTransaction() { return TransactionHandler(this); } + + // Transfer the snapshot file. + // @param path Path of the snapshot file. + void TransferSnapshot(const std::filesystem::path &path); + + // Handler for transfering the current WAL file whose data is + // contained in the internal buffer and the file. + class CurrentWalHandler { + private: + friend class ReplicationClient; + explicit CurrentWalHandler(ReplicationClient *self); + + public: + void AppendFilename(const std::string &filename); + + void AppendSize(size_t size); + + void AppendFileData(utils::InputFile *file); + + void AppendBufferData(const uint8_t *buffer, size_t buffer_size); + + /// @throw rpc::RpcFailedException + void Finalize(); + + private: + ReplicationClient *self_; + rpc::Client::StreamHandler stream_; + }; + + CurrentWalHandler TransferCurrentWalFile() { return CurrentWalHandler{this}; } + + // Transfer the WAL files + void TransferWalFiles(const std::vector &wal_files); const auto &Name() const { return name_; } diff --git a/src/storage/v2/replication/rpc.lcp b/src/storage/v2/replication/rpc.lcp index 15c31d836..e1986c822 100644 --- a/src/storage/v2/replication/rpc.lcp +++ b/src/storage/v2/replication/rpc.lcp @@ -1,15 +1,14 @@ #>cpp #pragma once -#include #include +#include #include "rpc/messages.hpp" #include "slk/serialization.hpp" #include "slk/streams.hpp" cpp<# - -;; TODO(mferencevic): Change namespace to `storage::replication` once LCP is +;; TODO(antonio2368): Change namespace to `storage::replication` once LCP is ;; updated to support such namespaces. (lcp:namespace storage) @@ -29,4 +28,16 @@ cpp<# ((success :bool) (term :uint64_t)))) +(lcp:define-rpc snapshot + (:request ()) + (:response + ((success :bool) + (term :uint64_t)))) + +(lcp:define-rpc wal-files + (:request ()) + (:response + ((success :bool) + (term :uint64_t)))) + (lcp:pop-namespace) ;; storage diff --git a/src/storage/v2/replication/serialization.cpp b/src/storage/v2/replication/serialization.cpp new file mode 100644 index 000000000..bcdd48da6 --- /dev/null +++ b/src/storage/v2/replication/serialization.cpp @@ -0,0 +1,165 @@ +#include "storage/v2/replication/serialization.hpp" + +namespace storage::replication { +////// Encoder ////// +void Encoder::WriteMarker(durability::Marker marker) { + slk::Save(marker, builder_); +} + +void Encoder::WriteBool(bool value) { + WriteMarker(durability::Marker::TYPE_BOOL); + slk::Save(value, builder_); +} + +void Encoder::WriteUint(uint64_t value) { + WriteMarker(durability::Marker::TYPE_INT); + slk::Save(value, builder_); +} + +void Encoder::WriteDouble(double value) { + WriteMarker(durability::Marker::TYPE_DOUBLE); + slk::Save(value, builder_); +} + +void Encoder::WriteString(const std::string_view &value) { + WriteMarker(durability::Marker::TYPE_STRING); + slk::Save(value, builder_); +} + +void Encoder::WritePropertyValue(const PropertyValue &value) { + WriteMarker(durability::Marker::TYPE_PROPERTY_VALUE); + slk::Save(value, builder_); +} + +void Encoder::WriteBuffer(const uint8_t *buffer, const size_t buffer_size) { + builder_->Save(buffer, buffer_size); +} + +void Encoder::WriteFileData(utils::InputFile *file) { + auto file_size = file->GetSize(); + uint8_t buffer[utils::kFileBufferSize]; + while (file_size > 0) { + const auto chunk_size = std::min(file_size, utils::kFileBufferSize); + file->Read(buffer, chunk_size); + WriteBuffer(buffer, chunk_size); + file_size -= chunk_size; + } +} + +void Encoder::WriteFile(const std::filesystem::path &path) { + utils::InputFile file; + CHECK(file.Open(path)) << "Failed to open file " << path; + CHECK(path.has_filename()) << "Path does not have a filename!"; + const auto &filename = path.filename().generic_string(); + WriteString(filename); + auto file_size = file.GetSize(); + WriteUint(file_size); + WriteFileData(&file); + file.Close(); +} + +////// Decoder ////// +std::optional Decoder::ReadMarker() { + durability::Marker marker; + slk::Load(&marker, reader_); + return marker; +} + +std::optional Decoder::ReadBool() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_BOOL) + return std::nullopt; + bool value; + slk::Load(&value, reader_); + return value; +} + +std::optional Decoder::ReadUint() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_INT) + return std::nullopt; + uint64_t value; + slk::Load(&value, reader_); + return value; +} + +std::optional Decoder::ReadDouble() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_DOUBLE) + return std::nullopt; + double value; + slk::Load(&value, reader_); + return value; +} + +std::optional Decoder::ReadString() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_STRING) + return std::nullopt; + std::string value; + slk::Load(&value, reader_); + return std::move(value); +} + +std::optional Decoder::ReadPropertyValue() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_PROPERTY_VALUE) + return std::nullopt; + PropertyValue value; + slk::Load(&value, reader_); + return std::move(value); +} + +bool Decoder::SkipString() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_STRING) + return false; + std::string value; + slk::Load(&value, reader_); + return true; +} + +bool Decoder::SkipPropertyValue() { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_PROPERTY_VALUE) + return false; + PropertyValue value; + slk::Load(&value, reader_); + return true; +} + +std::optional Decoder::ReadFile( + const std::filesystem::path &directory) { + CHECK(std::filesystem::exists(directory) && + std::filesystem::is_directory(directory)) + << "Sent path for streamed files should be a valid directory!"; + utils::OutputFile file; + const auto maybe_filename = ReadString(); + CHECK(maybe_filename) << "Filename missing for the file"; + const auto &filename = *maybe_filename; + auto path = directory / filename; + + // Check if the file already exists so we don't overwrite it + const bool file_exists = std::filesystem::exists(path); + + // TODO (antonio2368): Maybe append filename with custom suffix so we have + // both copies? + if (!file_exists) { + file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING); + } + std::optional maybe_file_size = ReadUint(); + CHECK(maybe_file_size) << "File size missing"; + auto file_size = *maybe_file_size; + uint8_t buffer[utils::kFileBufferSize]; + while (file_size > 0) { + const auto chunk_size = std::min(file_size, utils::kFileBufferSize); + reader_->Load(buffer, chunk_size); + if (!file_exists) { + file.Write(buffer, chunk_size); + } + file_size -= chunk_size; + } + file.Close(); + return std::move(path); +} +} // namespace storage::replication diff --git a/src/storage/v2/replication/serialization.hpp b/src/storage/v2/replication/serialization.hpp index 9d2db59c0..bc42faca6 100644 --- a/src/storage/v2/replication/serialization.hpp +++ b/src/storage/v2/replication/serialization.hpp @@ -1,9 +1,12 @@ #pragma once +#include + #include "slk/streams.hpp" #include "storage/v2/durability/serialization.hpp" #include "storage/v2/replication/slk.hpp" #include "utils/cast.hpp" +#include "utils/file.hpp" namespace storage::replication { @@ -11,34 +14,23 @@ class Encoder final : public durability::BaseEncoder { public: explicit Encoder(slk::Builder *builder) : builder_(builder) {} - void WriteMarker(durability::Marker marker) override { - slk::Save(marker, builder_); - } + void WriteMarker(durability::Marker marker) override; - void WriteBool(bool value) override { - WriteMarker(durability::Marker::TYPE_BOOL); - slk::Save(value, builder_); - } + void WriteBool(bool value) override; - void WriteUint(uint64_t value) override { - WriteMarker(durability::Marker::TYPE_INT); - slk::Save(value, builder_); - } + void WriteUint(uint64_t value) override; - void WriteDouble(double value) override { - WriteMarker(durability::Marker::TYPE_DOUBLE); - slk::Save(value, builder_); - } + void WriteDouble(double value) override; - void WriteString(const std::string_view &value) override { - WriteMarker(durability::Marker::TYPE_STRING); - slk::Save(value, builder_); - } + void WriteString(const std::string_view &value) override; - void WritePropertyValue(const PropertyValue &value) override { - WriteMarker(durability::Marker::TYPE_PROPERTY_VALUE); - slk::Save(value, builder_); - } + void WritePropertyValue(const PropertyValue &value) override; + + void WriteBuffer(const uint8_t *buffer, size_t buffer_size); + + void WriteFileData(utils::InputFile *file); + + void WriteFile(const std::filesystem::path &path); private: slk::Builder *builder_; @@ -48,74 +40,27 @@ class Decoder final : public durability::BaseDecoder { public: explicit Decoder(slk::Reader *reader) : reader_(reader) {} - std::optional ReadMarker() override { - durability::Marker marker; - slk::Load(&marker, reader_); - return marker; - } + std::optional ReadMarker() override; - std::optional ReadBool() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_BOOL) - return std::nullopt; - bool value; - slk::Load(&value, reader_); - return value; - } + std::optional ReadBool() override; - std::optional ReadUint() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_INT) - return std::nullopt; - uint64_t value; - slk::Load(&value, reader_); - return value; - } + std::optional ReadUint() override; - std::optional ReadDouble() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_DOUBLE) - return std::nullopt; - double value; - slk::Load(&value, reader_); - return value; - } + std::optional ReadDouble() override; - std::optional ReadString() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_STRING) - return std::nullopt; - std::string value; - slk::Load(&value, reader_); - return std::move(value); - } + std::optional ReadString() override; - std::optional ReadPropertyValue() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_PROPERTY_VALUE) - return std::nullopt; - PropertyValue value; - slk::Load(&value, reader_); - return std::move(value); - } + std::optional ReadPropertyValue() override; - bool SkipString() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_STRING) - return false; - std::string value; - slk::Load(&value, reader_); - return true; - } + bool SkipString() override; - bool SkipPropertyValue() override { - if (const auto marker = ReadMarker(); - !marker || marker != durability::Marker::TYPE_PROPERTY_VALUE) - return false; - PropertyValue value; - slk::Load(&value, reader_); - return true; - } + bool SkipPropertyValue() override; + + /// Read the file and save it inside the specified directory. + /// @param directory Directory which will contain the read file. + /// @return If the read was successful, path to the read file. + std::optional ReadFile( + const std::filesystem::path &directory); private: slk::Reader *reader_; diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index a26882e36..60e782a57 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -11,9 +11,13 @@ #include "io/network/endpoint.hpp" #include "storage/v2/durability/durability.hpp" +#include "storage/v2/durability/metadata.hpp" #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/snapshot.hpp" +#include "storage/v2/durability/wal.hpp" +#include "storage/v2/indices.hpp" #include "storage/v2/mvcc.hpp" +#include "utils/file.hpp" #include "utils/rw_lock.hpp" #include "utils/spin_lock.hpp" #include "utils/stat.hpp" @@ -394,23 +398,8 @@ Storage::Storage(Config config) } if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { - snapshot_runner_.Run( - "Snapshot", config_.durability.snapshot_interval, [this] { - // Take master RW lock (for reading). - std::shared_lock storage_guard(main_lock_); - - // Create the transaction used to create the snapshot. - auto transaction = CreateTransaction(); - - // Create snapshot. - durability::CreateSnapshot( - &transaction, snapshot_directory_, wal_directory_, - config_.durability.snapshot_retention_count, &vertices_, &edges_, - &name_id_mapper_, &indices_, &constraints_, config_.items, uuid_); - - // Finalize snapshot transaction. - commit_log_.MarkFinished(transaction.start_timestamp); - }); + snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, + [this] { this->CreateSnapshot(); }); } if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Run("Storage GC", config_.gc.interval, @@ -435,8 +424,10 @@ Storage::~Storage() { } #ifdef MG_ENTERPRISE { - std::lock_guard replication_guard(replication_lock_); - rpc_context_.emplace(); + // Clear replication data + std::unique_lock replication_guard(replication_lock_); + replication_server_.reset(); + replication_clients_.WithLock([&](auto &clients) { clients.clear(); }); } #endif wal_file_ = std::nullopt; @@ -445,20 +436,7 @@ Storage::~Storage() { snapshot_runner_.Stop(); } if (config_.durability.snapshot_on_exit) { - // Take master RW lock (for reading). - std::shared_lock storage_guard(main_lock_); - - // Create the transaction used to create the snapshot. - auto transaction = CreateTransaction(); - - // Create snapshot. - durability::CreateSnapshot( - &transaction, snapshot_directory_, wal_directory_, - config_.durability.snapshot_retention_count, &vertices_, &edges_, - &name_id_mapper_, &indices_, &constraints_, config_.items, uuid_); - - // Finalize snapshot transaction. - commit_log_.MarkFinished(transaction.start_timestamp); + CreateSnapshot(); } } @@ -1640,6 +1618,12 @@ void Storage::FinalizeWalFile() { config_.durability.wal_file_size_kibibytes) { wal_file_ = std::nullopt; wal_unsynced_transactions_ = 0; + } else { + // Try writing the internal buffer if possible, if not + // the data should be written as soon as it's possible + // (triggered by the new transaction commit, or some + // reading thread EnabledFlushing) + wal_file_->TryFlushing(); } } @@ -1655,17 +1639,17 @@ void Storage::AppendToWal(const Transaction &transaction, // We need to keep this lock because handler takes a pointer to the client // from which it was created std::shared_lock replication_guard(replication_lock_); - std::list streams; + std::list streams; if (replication_state_.load() == ReplicationState::MAIN) { - auto &replication_clients = GetRpcContext(); - try { - std::transform(replication_clients.begin(), replication_clients.end(), - std::back_inserter(streams), [](auto &client) { - return client.ReplicateTransaction(); - }); - } catch (const rpc::RpcFailedException &) { - LOG(FATAL) << "Couldn't replicate data!"; - } + replication_clients_.WithLock([&](auto &clients) { + try { + std::transform( + clients.begin(), clients.end(), std::back_inserter(streams), + [](auto &client) { return client.ReplicateTransaction(); }); + } catch (const rpc::RpcFailedException &) { + LOG(FATAL) << "Couldn't replicate data!"; + } + }); } #endif @@ -1845,43 +1829,67 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp); #ifdef MG_ENTERPRISE - std::shared_lock replication_guard(replication_lock_); - if (replication_state_.load() == ReplicationState::MAIN) { - auto &replication_clients = GetRpcContext(); - for (auto &client : replication_clients) { - auto stream = client.ReplicateTransaction(); - try { - stream.AppendOperation(operation, label, properties, - final_commit_timestamp); - stream.Finalize(); - } catch (const rpc::RpcFailedException &) { - LOG(FATAL) << "Couldn't replicate data!"; - } + { + std::shared_lock replication_guard(replication_lock_); + if (replication_state_.load() == ReplicationState::MAIN) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + auto stream = client.ReplicateTransaction(); + try { + stream.AppendOperation(operation, label, properties, + final_commit_timestamp); + stream.Finalize(); + } catch (const rpc::RpcFailedException &) { + LOG(FATAL) << "Couldn't replicate data!"; + } + } + }); } } - replication_guard.unlock(); #endif FinalizeWalFile(); } +void Storage::CreateSnapshot() { +#ifdef MG_ENTERPRISE + if (replication_state_.load() != ReplicationState::MAIN) { + LOG(WARNING) << "Snapshots are disabled for replicas!"; + return; + } +#endif + + // Take master RW lock (for reading). + std::shared_lock storage_guard(main_lock_); + + // Create the transaction used to create the snapshot. + auto transaction = CreateTransaction(); + + // Create snapshot. + durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_, + config_.durability.snapshot_retention_count, + &vertices_, &edges_, &name_id_mapper_, &indices_, + &constraints_, config_.items, uuid_, + &file_retainer_); + + // Finalize snapshot transaction. + commit_log_.MarkFinished(transaction.start_timestamp); +} + #ifdef MG_ENTERPRISE void Storage::ConfigureReplica(io::network::Endpoint endpoint) { - rpc_context_.emplace(); - - auto &replication_server = GetRpcContext(); + replication_server_.emplace(); // Create RPC server. - // TODO(mferencevic): Add support for SSL. - replication_server.rpc_server_context.emplace(); + // TODO (antonio2368): Add support for SSL. + replication_server_->rpc_server_context.emplace(); // NOTE: The replication server must have a single thread for processing // because there is no need for more processing threads - each replica can // have only a single main server. Also, the single-threaded guarantee // simplifies the rest of the implementation. - // TODO(mferencevic): Make endpoint configurable. - replication_server.rpc_server.emplace(endpoint, - &*replication_server.rpc_server_context, - /* workers_count = */ 1); - replication_server.rpc_server->Register< + replication_server_->rpc_server.emplace( + endpoint, &*replication_server_->rpc_server_context, + /* workers_count = */ 1); + replication_server_->rpc_server->Register< AppendDeltasRpc>([this, endpoint = std::move(endpoint)]( auto *req_reader, auto *res_builder) { AppendDeltasReq req; @@ -2251,36 +2259,209 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) { AppendDeltasRes res; slk::Save(res, res_builder); }); - replication_server.rpc_server->Start(); + replication_server_->rpc_server->Register( + [this](auto *req_reader, auto *res_builder) { + DLOG(INFO) << "Received SnapshotRpc"; + SnapshotReq req; + slk::Load(&req, req_reader); + + replication::Decoder decoder(req_reader); + + utils::EnsureDirOrDie(snapshot_directory_); + + const auto maybe_snapshot_path = decoder.ReadFile(snapshot_directory_); + CHECK(maybe_snapshot_path) << "Failed to load snapshot!"; + DLOG(INFO) << "Received snapshot saved to " << *maybe_snapshot_path; + + std::unique_lock storage_guard(main_lock_); + // Clear the database + vertices_.clear(); + edges_.clear(); + + constraints_ = Constraints(); + // TODO (antonio2368): Check if there's a less hacky way + indices_.label_index = + LabelIndex(&indices_, &constraints_, config_.items); + indices_.label_property_index = + LabelPropertyIndex(&indices_, &constraints_, config_.items); + try { + DLOG(INFO) << "Loading snapshot"; + auto recovered_snapshot = durability::LoadSnapshot( + *maybe_snapshot_path, &vertices_, &edges_, &name_id_mapper_, + &edge_count_, config_.items); + DLOG(INFO) << "Snapshot loaded successfully"; + // If this step is present it should always be the first step of + // the recovery so we use the UUID we read from snasphost + uuid_ = recovered_snapshot.snapshot_info.uuid; + const auto &recovery_info = recovered_snapshot.recovery_info; + vertex_id_ = recovery_info.next_vertex_id; + edge_id_ = recovery_info.next_edge_id; + timestamp_ = std::max(timestamp_, recovery_info.next_timestamp); + + durability::RecoverIndicesAndConstraints( + recovered_snapshot.indices_constraints, &indices_, &constraints_, + &vertices_); + } catch (const durability::RecoveryFailure &e) { + // TODO (antonio2368): What to do if the sent snapshot is invalid + LOG(WARNING) << "Couldn't load the snapshot because of: " << e.what(); + } + storage_guard.unlock(); + + SnapshotRes res; + slk::Save(res, res_builder); + }); + replication_server_->rpc_server->Register( + [this](auto *req_reader, auto *res_builder) { + DLOG(INFO) << "Received WalFilesRpc"; + WalFilesReq req; + slk::Load(&req, req_reader); + + replication::Decoder decoder(req_reader); + + utils::EnsureDirOrDie(wal_directory_); + + const auto maybe_wal_file_number = decoder.ReadUint(); + CHECK(maybe_wal_file_number) + << "Failed to read number of received WAL files!"; + const auto wal_file_number = *maybe_wal_file_number; + + DLOG(INFO) << "Received WAL files: " << wal_file_number; + + std::unique_lock storage_guard(main_lock_); + durability::RecoveredIndicesAndConstraints indices_constraints; + for (auto i = 0; i < wal_file_number; ++i) { + const auto maybe_wal_path = decoder.ReadFile(wal_directory_); + CHECK(maybe_wal_path) << "Failed to load WAL!"; + DLOG(INFO) << "Received WAL saved to " << *maybe_wal_path; + // TODO (antonio2368): Use timestamp_ for now, but the timestamp_ can + // increase by running read queries so define variable that will keep + // last received timestamp + try { + auto wal_info = durability::ReadWalInfo(*maybe_wal_path); + if (wal_info.seq_num == 0) { + uuid_ = wal_info.uuid; + } + auto info = durability::LoadWal( + *maybe_wal_path, &indices_constraints, timestamp_, &vertices_, + &edges_, &name_id_mapper_, &edge_count_, config_.items); + vertex_id_ = std::max(vertex_id_.load(), info.next_vertex_id); + edge_id_ = std::max(edge_id_.load(), info.next_edge_id); + timestamp_ = std::max(timestamp_, info.next_timestamp); + DLOG(INFO) << *maybe_wal_path << " loaded successfully"; + } catch (const durability::RecoveryFailure &e) { + LOG(FATAL) << "Couldn't recover WAL deltas from " << *maybe_wal_path + << " because of: " << e.what(); + } + } + durability::RecoverIndicesAndConstraints(indices_constraints, &indices_, + &constraints_, &vertices_); + storage_guard.unlock(); + + WalFilesRes res; + slk::Save(res, res_builder); + }); + replication_server_->rpc_server->Start(); } void Storage::RegisterReplica(std::string name, io::network::Endpoint endpoint) { - std::unique_lock replication_guard(replication_lock_); + std::shared_lock guard(replication_lock_); CHECK(replication_state_.load() == ReplicationState::MAIN) << "Only main instance can register a replica!"; - auto &replication_clients = GetRpcContext(); - // TODO (antonio2368): Check if it's okay to aquire first the shared lock - // and later on aquire the write lock only if it's necessary - // because here we wait for the write lock even though there exists - // a replica with a same name - if (std::any_of(replication_clients.begin(), replication_clients.end(), - [&](auto &client) { return client.Name() == name; })) { - throw utils::BasicException("Replica with a same name already exists!"); + // We can safely add new elements to the list because it doesn't validate + // existing references/iteratos + auto &client = replication_clients_.WithLock([&](auto &clients) -> auto & { + if (std::any_of(clients.begin(), clients.end(), + [&](auto &client) { return client.Name() == name; })) { + throw utils::BasicException("Replica with a same name already exists!"); + } + clients.emplace_back(std::move(name), &name_id_mapper_, config_.items, + endpoint, false); + return clients.back(); + }); + + // Recovery + auto recovery_locker = file_retainer_.AddLocker(); + std::optional snapshot_file; + std::vector wal_files; + std::optional latest_seq_num; + { + auto recovery_locker_acc = recovery_locker.Access(); + // For now we assume we need to send the latest snapshot + auto snapshot_files = + durability::GetSnapshotFiles(snapshot_directory_, uuid_); + if (!snapshot_files.empty()) { + std::sort(snapshot_files.begin(), snapshot_files.end()); + // TODO (antonio2368): Send the last snapshot for now + // check if additional logic is necessary + // Also, prevent the deletion of the snapshot file and the required + // WALs + snapshot_file.emplace(std::move(snapshot_files.back().first)); + recovery_locker_acc.AddFile(*snapshot_file); + } + + { + // So the wal_file_ isn't overwritten + std::unique_lock engine_guard(engine_lock_); + if (wal_file_) { + // For now let's disable flushing until we send the current WAL + wal_file_->DisableFlushing(); + latest_seq_num = wal_file_->SequenceNumber(); + } + } + + auto maybe_wal_files = + durability::GetWalFiles(wal_directory_, uuid_, latest_seq_num); + CHECK(maybe_wal_files) << "Failed to find WAL files"; + auto &wal_files_with_seq = *maybe_wal_files; + + std::optional previous_seq_num; + for (const auto &[seq_num, from_timestamp, to_timestamp, path] : + wal_files_with_seq) { + if (previous_seq_num && *previous_seq_num + 1 != seq_num) { + LOG(FATAL) << "You are missing a WAL file with the sequence number " + << *previous_seq_num + 1 << "!"; + } + previous_seq_num = seq_num; + wal_files.push_back(path); + recovery_locker_acc.AddFile(path); + } } - replication_clients.emplace_back(std::move(name), &name_id_mapper_, - config_.items, endpoint, false); + if (snapshot_file) { + DLOG(INFO) << "Sending the latest snapshot file: " << *snapshot_file; + client.TransferSnapshot(*snapshot_file); + } + + if (!wal_files.empty()) { + DLOG(INFO) << "Sending the latest wal files"; + client.TransferWalFiles(wal_files); + } + + // We have a current WAL + if (latest_seq_num) { + auto stream = client.TransferCurrentWalFile(); + stream.AppendFilename(wal_file_->Path().filename()); + utils::InputFile file; + CHECK(file.Open(wal_file_->Path())) << "Failed to open current WAL file!"; + const auto [buffer, buffer_size] = wal_file_->CurrentFileBuffer(); + stream.AppendSize(file.GetSize() + buffer_size); + stream.AppendFileData(&file); + stream.AppendBufferData(buffer, buffer_size); + stream.Finalize(); + wal_file_->EnableFlushing(); + } } void Storage::UnregisterReplica(const std::string &name) { std::unique_lock replication_guard(replication_lock_); CHECK(replication_state_.load() == ReplicationState::MAIN) << "Only main instance can unregister a replica!"; - auto &replication_clients = GetRpcContext(); - replication_clients.remove_if( - [&](const auto &client) { return client.Name() == name; }); + replication_clients_.WithLock([&](auto &clients) { + clients.remove_if( + [&](const auto &client) { return client.Name() == name; }); + }); } #endif diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 240b19a28..06c28e19f 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -4,7 +4,6 @@ #include #include #include -#include #include "storage/v2/commit_log.hpp" #include "storage/v2/config.hpp" @@ -19,6 +18,7 @@ #include "storage/v2/transaction.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/file_locker.hpp" #include "utils/rw_lock.hpp" #include "utils/scheduler.hpp" #include "utils/skip_list.hpp" @@ -413,12 +413,12 @@ class Storage final { } std::unique_lock replication_guard(replication_lock_); - rpc_context_.emplace(); if constexpr (state == ReplicationState::REPLICA) { ConfigureReplica(std::forward(args)...); } else if (state == ReplicationState::MAIN) { - rpc_context_.emplace(); + // Main instance does not need replication server + replication_server_.reset(); } replication_state_.store(state); @@ -444,6 +444,8 @@ class Storage final { const std::set &properties, uint64_t final_commit_timestamp); + void CreateSnapshot(); + #ifdef MG_ENTERPRISE void ConfigureReplica(io::network::Endpoint endpoint); #endif @@ -521,6 +523,8 @@ class Storage final { std::optional wal_file_; uint64_t wal_unsynced_transactions_{0}; + utils::FileRetainer file_retainer_; + // Replication #ifdef MG_ENTERPRISE utils::RWLock replication_lock_{utils::RWLock::Priority::WRITE}; @@ -543,18 +547,12 @@ class Storage final { } }; - using ReplicationClientList = std::list; - // Monostate is used for explicitly calling the destructor of the current - // type - std::variant - rpc_context_; + using ReplicationClientList = + utils::Synchronized, + utils::SpinLock>; - template - TRpcContext &GetRpcContext() { - auto *context = std::get_if(&rpc_context_); - CHECK(context) << "Wrong type set for the current replication state!"; - return *context; - } + std::optional replication_server_; + ReplicationClientList replication_clients_; std::atomic replication_state_{ReplicationState::MAIN}; #endif diff --git a/src/utils/file.cpp b/src/utils/file.cpp index aaf3d76c1..7fa49a052 100644 --- a/src/utils/file.cpp +++ b/src/utils/file.cpp @@ -7,6 +7,8 @@ #include #include +#include +#include #include #include @@ -289,9 +291,9 @@ OutputFile::~OutputFile() { OutputFile::OutputFile(OutputFile &&other) noexcept : fd_(other.fd_), written_since_last_sync_(other.written_since_last_sync_), - path_(std::move(other.path_)), - buffer_position_(other.buffer_position_) { + path_(std::move(other.path_)) { memcpy(buffer_, other.buffer_, kFileBufferSize); + buffer_position_.store(other.buffer_position_.load()); other.fd_ = -1; other.written_since_last_sync_ = 0; other.buffer_position_ = 0; @@ -303,7 +305,7 @@ OutputFile &OutputFile::operator=(OutputFile &&other) noexcept { fd_ = other.fd_; written_since_last_sync_ = other.written_since_last_sync_; path_ = std::move(other.path_); - buffer_position_ = other.buffer_position_; + buffer_position_ = other.buffer_position_.load(); memcpy(buffer_, other.buffer_, kFileBufferSize); other.fd_ = -1; @@ -350,13 +352,21 @@ const std::filesystem::path &OutputFile::path() const { return path_; } void OutputFile::Write(const uint8_t *data, size_t size) { while (size > 0) { FlushBuffer(false); - auto buffer_left = kFileBufferSize - buffer_position_; - auto to_write = size < buffer_left ? size : buffer_left; - memcpy(buffer_ + buffer_position_, data, to_write); - size -= to_write; - data += to_write; - buffer_position_ += to_write; - written_since_last_sync_ += to_write; + { + // Reading thread can call EnableFlushing which triggers + // TryFlushing. + // We can't use a single shared lock for the entire Write + // because FlushBuffer acquires the unique_lock. + std::shared_lock flush_guard(flush_lock_); + const size_t buffer_position = buffer_position_.load(); + auto buffer_left = kFileBufferSize - buffer_position; + auto to_write = size < buffer_left ? size : buffer_left; + memcpy(buffer_ + buffer_position, data, to_write); + size -= to_write; + data += to_write; + buffer_position_.fetch_add(to_write); + written_since_last_sync_ += to_write; + } } } @@ -367,13 +377,7 @@ void OutputFile::Write(const std::string_view &data) { Write(data.data(), data.size()); } -size_t OutputFile::GetPosition() { - return SetPosition(Position::RELATIVE_TO_CURRENT, 0); -} - -size_t OutputFile::SetPosition(Position position, ssize_t offset) { - FlushBuffer(true); - +size_t OutputFile::SeekFile(const Position position, const ssize_t offset) { int whence; switch (position) { case Position::SET: @@ -398,6 +402,15 @@ size_t OutputFile::SetPosition(Position position, ssize_t offset) { } } +size_t OutputFile::GetPosition() { + return SetPosition(Position::RELATIVE_TO_CURRENT, 0); +} + +size_t OutputFile::SetPosition(Position position, ssize_t offset) { + FlushBuffer(true); + return SeekFile(position, offset); +} + bool OutputFile::AcquireLock() { CHECK(IsOpen()) << "Trying to acquire a write lock on an unopened file!"; int ret = -1; @@ -497,14 +510,20 @@ void OutputFile::Close() noexcept { void OutputFile::FlushBuffer(bool force_flush) { CHECK(IsOpen()); - if (!force_flush && buffer_position_ < kFileBufferSize) return; + if (!force_flush && buffer_position_.load() < kFileBufferSize) return; + std::unique_lock flush_guard(flush_lock_); + FlushBufferInternal(); +} + +void OutputFile::FlushBufferInternal() { CHECK(buffer_position_ <= kFileBufferSize) << "While trying to write to " << path_ << " more file was written to the buffer than the buffer has space!"; auto *buffer = buffer_; - while (buffer_position_ > 0) { + auto buffer_position = buffer_position_.load(); + while (buffer_position > 0) { auto written = write(fd_, buffer, buffer_position_); if (written == -1 && errno == EINTR) { continue; @@ -517,9 +536,40 @@ void OutputFile::FlushBuffer(bool force_flush) { << " bytes of data were lost from this call and possibly " << written_since_last_sync_ << " bytes were lost from previous calls."; - buffer_position_ -= written; + buffer_position -= written; buffer += written; } + + buffer_position_.store(buffer_position); +} + +void OutputFile::DisableFlushing() { flush_lock_.lock_shared(); } + +void OutputFile::EnableFlushing() { + flush_lock_.unlock_shared(); + TryFlushing(); +} + +std::pair OutputFile::CurrentBuffer() const { + return {buffer_, buffer_position_.load()}; +} + +size_t OutputFile::GetSize() { + // There's an alternative way of fetching the files size using fstat. + // lseek should be faster for smaller number of clients while fstat + // should have an advantage for high number of clients. + // The reason for this is the way those functions implement the + // support for multi-threading. While lseek uses locks, fstat is lockfree. + // For now, lseek should be good enough. If at any point this proves to + // be a bottleneck, fstat should be considered. + return SeekFile(Position::RELATIVE_TO_END, 0) + buffer_position_.load(); +} + +void OutputFile::TryFlushing() { + if (std::unique_lock guard(flush_lock_, std::try_to_lock); + guard.owns_lock()) { + FlushBufferInternal(); + } } } // namespace utils diff --git a/src/utils/file.hpp b/src/utils/file.hpp index cfdd5466a..578c5a50b 100644 --- a/src/utils/file.hpp +++ b/src/utils/file.hpp @@ -6,12 +6,15 @@ */ #pragma once +#include #include #include #include #include #include +#include "utils/rw_lock.hpp" + namespace utils { /// Get the path of the current executable. @@ -152,7 +155,11 @@ class InputFile { /// written to permanent storage. /// /// This class *isn't* thread safe. It is implemented as a wrapper around low -/// level system calls used for file manipulation. +/// level system calls used for file manipulation. It allows concurrent +/// READING of the file that is being written. To read the file, disable the +/// flushing of the internal buffer using `DisableFlushing`. Don't forget to +/// enable flushing again after you're done with reading using the +/// 'EnableFlushing' method! class OutputFile { public: enum class Mode { @@ -220,14 +227,37 @@ class OutputFile { /// file. On failure and misuse it crashes the program. void Close() noexcept; + /// Disable flushing of the internal buffer. + void DisableFlushing(); + + /// Enable flushing of the internal buffer. + /// Before the flushing is enabled, the internal buffer + /// is flushed. + void EnableFlushing(); + + /// Try flushing the internal buffer. + void TryFlushing(); + + /// Get the internal buffer with its current size. + std::pair CurrentBuffer() const; + + /// Get the size of the file. + size_t GetSize(); + private: void FlushBuffer(bool force_flush); + void FlushBufferInternal(); + + size_t SeekFile(Position position, ssize_t offset); int fd_{-1}; size_t written_since_last_sync_{0}; std::filesystem::path path_; uint8_t buffer_[kFileBufferSize]; - size_t buffer_position_{0}; + std::atomic buffer_position_{0}; + + // Flushing buffer should be a higher priority + utils::RWLock flush_lock_{RWLock::Priority::WRITE}; }; } // namespace utils diff --git a/src/utils/synchronized.hpp b/src/utils/synchronized.hpp index fa7b90e47..28c41c336 100644 --- a/src/utils/synchronized.hpp +++ b/src/utils/synchronized.hpp @@ -79,7 +79,7 @@ class Synchronized { LockedPtr Lock() { return LockedPtr(&object_, &mutex_); } template - auto WithLock(TCallable &&callable) { + decltype(auto) WithLock(TCallable &&callable) { return callable(*Lock()); } diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index d60d1bb03..3d003d3da 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -350,3 +350,134 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { ASSERT_FALSE(acc.Commit().HasError()); } } + +TEST_F(ReplicationTest, RecoveryProcess) { + std::vector vertex_gids; + // Force the creation of snapshot + { + storage::Storage main_store( + {.durability = { + .storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_on_exit = true, + }}); + { + auto acc = main_store.Access(); + // Create the vertex before registering a replica + auto v = acc.CreateVertex(); + vertex_gids.emplace_back(v.Gid()); + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + { + // Create second WAL + storage::Storage main_store( + {.durability = {.storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}}); + // Create vertices in 2 different transactions + { + auto acc = main_store.Access(); + auto v = acc.CreateVertex(); + vertex_gids.emplace_back(v.Gid()); + ASSERT_FALSE(acc.Commit().HasError()); + } + { + auto acc = main_store.Access(); + auto v = acc.CreateVertex(); + vertex_gids.emplace_back(v.Gid()); + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + storage::Storage main_store( + {.durability = { + .storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + }}); + + constexpr const auto *property_name = "property_name"; + constexpr const auto property_value = 1; + { + // Force the creation of current WAL file + auto acc = main_store.Access(); + for (const auto &vertex_gid : vertex_gids) { + auto v = acc.FindVertex(vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + ASSERT_TRUE(v->SetProperty(main_store.NameToProperty(property_name), + storage::PropertyValue(property_value)) + .HasValue()); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + std::filesystem::path replica_storage_directory{ + std::filesystem::temp_directory_path() / + "MG_test_unit_storage_v2_replication_replica"}; + utils::OnScopeExit replica_directory_cleaner( + [&]() { std::filesystem::remove_all(replica_storage_directory); }); + { + storage::Storage replica_store( + {.durability = {.storage_directory = replica_storage_directory}}); + + replica_store.SetReplicationState( + io::network::Endpoint{"127.0.0.1", 10000}); + + main_store.RegisterReplica("REPLICA1", + io::network::Endpoint{"127.0.0.1", 10000}); + constexpr const auto *vertex_label = "vertex_label"; + { + auto acc = main_store.Access(); + for (const auto &vertex_gid : vertex_gids) { + auto v = acc.FindVertex(vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + ASSERT_TRUE( + v->AddLabel(main_store.NameToLabel(vertex_label)).HasValue()); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + { + auto acc = replica_store.Access(); + for (const auto &vertex_gid : vertex_gids) { + auto v = acc.FindVertex(vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + const auto labels = v->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_THAT(*labels, UnorderedElementsAre( + replica_store.NameToLabel(vertex_label))); + const auto properties = v->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_THAT(*properties, + UnorderedElementsAre(std::make_pair( + replica_store.NameToProperty(property_name), + storage::PropertyValue(property_value)))); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + } + // Test recovery of the replica with the files received from Main + { + storage::Storage replica_store( + {.durability = {.storage_directory = replica_storage_directory, + .recover_on_startup = true}}); + { + auto acc = replica_store.Access(); + for (const auto &vertex_gid : vertex_gids) { + auto v = acc.FindVertex(vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + const auto labels = v->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + // Labels are received with AppendDeltasRpc so they are not saved on + // disk + ASSERT_EQ(labels->size(), 0); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + } +} diff --git a/tests/unit/utils_file.cpp b/tests/unit/utils_file.cpp index a149d48c3..ca219c29e 100644 --- a/tests/unit/utils_file.cpp +++ b/tests/unit/utils_file.cpp @@ -1,13 +1,17 @@ +#include #include #include #include +#include #include #include #include #include "utils/file.hpp" +#include "utils/spin_lock.hpp" #include "utils/string.hpp" +#include "utils/synchronized.hpp" namespace fs = std::filesystem; @@ -283,3 +287,95 @@ TEST_F(UtilsFileTest, OutputFileDescriptorLeackage) { utils::OutputFile::Mode::APPEND_TO_EXISTING); } } + +TEST_F(UtilsFileTest, ConcurrentReadingAndWritting) { + const auto file_path = storage / "existing_dir_777" / "existing_file_777"; + utils::OutputFile handle; + handle.Open(file_path, utils::OutputFile::Mode::OVERWRITE_EXISTING); + + std::default_random_engine engine(586478780); + std::uniform_int_distribution random_short_wait(1, 10); + + const auto sleep_for = [&](int milliseconds) { + std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); + }; + + constexpr size_t number_of_writes = 500; + std::thread writer_thread([&] { + uint8_t current_number = 0; + for (size_t i = 0; i < number_of_writes; ++i) { + handle.Write(¤t_number, 1); + ++current_number; + handle.TryFlushing(); + sleep_for(random_short_wait(engine)); + } + }); + + constexpr size_t reader_threads_num = 7; + // number_of_reads needs to be higher than number_of_writes + // so we maximize the chance of having at least one reading + // thread that will read all of the data. + constexpr size_t number_of_reads = 550; + std::vector reader_threads(reader_threads_num); + utils::Synchronized, utils::SpinLock> max_read_counts; + for (size_t i = 0; i < reader_threads_num; ++i) { + reader_threads.emplace_back([&] { + for (size_t i = 0; i < number_of_reads; ++i) { + handle.DisableFlushing(); + auto [buffer, buffer_size] = handle.CurrentBuffer(); + utils::InputFile input_handle; + input_handle.Open(file_path); + std::optional previous_number; + size_t total_read_count = 0; + uint8_t current_number; + // Read the file + while (input_handle.Read(¤t_number, 1)) { + if (previous_number) { + const uint8_t expected_next = *previous_number + 1; + ASSERT_TRUE(current_number == expected_next); + } + previous_number = current_number; + ++total_read_count; + } + // Read the buffer + while (buffer_size > 0) { + if (previous_number) { + const uint8_t expected_next = *previous_number + 1; + ASSERT_TRUE(*buffer == expected_next); + } + previous_number = *buffer; + ++buffer; + --buffer_size; + ++total_read_count; + } + handle.EnableFlushing(); + input_handle.Close(); + // Last read will always have the highest amount of + // bytes read. + if (i == number_of_reads - 1) { + max_read_counts.WithLock([&](auto &read_counts) { + read_counts.push_back(total_read_count); + }); + } + sleep_for(random_short_wait(engine)); + } + }); + } + + if (writer_thread.joinable()) { + writer_thread.join(); + } + for (auto &reader_thread : reader_threads) { + if (reader_thread.joinable()) { + reader_thread.join(); + } + } + + handle.Close(); + // Check if any of the threads read the entire data. + ASSERT_TRUE(max_read_counts.WithLock([&](auto &read_counts) { + return std::any_of( + read_counts.cbegin(), read_counts.cend(), + [](const auto read_count) { return read_count == number_of_writes; }); + })); +}