Define communication process (#49)

* Add basic communication process using commit timestamp
* Add file number to req
* Add proper recovery handling
* Allow loading of WALs with same seq num
* Always allow desired commit timestamp
* Set replica timestamp for operation
* Mark non-transactional timestamp as finished
antonio2368 2020-11-25 12:08:26 +01:00 committed by Antonio Andelic
parent 3c85319701
commit 7e9175052a
15 changed files with 997 additions and 362 deletions

View File

@ -50,10 +50,10 @@ void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
<< ". Please start the process as user " << user_directory << "!";
}
std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
std::vector<SnapshotDurabilityInfo> GetSnapshotFiles(
const std::filesystem::path &snapshot_directory,
const std::string_view uuid) {
std::vector<std::pair<std::filesystem::path, std::string>> snapshot_files;
std::vector<SnapshotDurabilityInfo> snapshot_files;
std::error_code error_code;
if (utils::DirExists(snapshot_directory)) {
for (const auto &item :
@ -62,7 +62,8 @@ std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
try {
auto info = ReadSnapshotInfo(item.path());
if (uuid.empty() || info.uuid == uuid) {
snapshot_files.emplace_back(item.path(), info.uuid);
snapshot_files.emplace_back(item.path(), std::move(info.uuid),
info.start_timestamp);
}
} catch (const RecoveryFailure &) {
continue;
@ -75,15 +76,12 @@ std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
return snapshot_files;
}
std::optional<std::vector<
std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>>
GetWalFiles(const std::filesystem::path &wal_directory,
const std::string_view uuid,
const std::optional<size_t> current_seq_num) {
std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(
const std::filesystem::path &wal_directory, const std::string_view uuid,
const std::optional<size_t> current_seq_num) {
if (!utils::DirExists(wal_directory)) return std::nullopt;
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
wal_files;
std::vector<WalDurabilityInfo> wal_files;
std::error_code error_code;
for (const auto &item :
std::filesystem::directory_iterator(wal_directory, error_code)) {
@ -93,7 +91,8 @@ GetWalFiles(const std::filesystem::path &wal_directory,
if ((uuid.empty() || info.uuid == uuid) &&
(!current_seq_num || info.seq_num < current_seq_num))
wal_files.emplace_back(info.seq_num, info.from_timestamp,
info.to_timestamp, item.path());
info.to_timestamp, std::move(info.uuid),
item.path());
} catch (const RecoveryFailure &e) {
DLOG(WARNING) << "Failed to read " << item.path();
continue;
@ -101,6 +100,7 @@ GetWalFiles(const std::filesystem::path &wal_directory,
}
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
<< error_code.message() << "!";
std::sort(wal_files.begin(), wal_files.end());
return std::move(wal_files);
}
@ -161,11 +161,12 @@ std::optional<RecoveryInfo> RecoverData(
if (!snapshot_files.empty()) {
// Order the files by name
std::sort(snapshot_files.begin(), snapshot_files.end());
// UUID used for durability is the UUID of the last snapshot file.
*uuid = snapshot_files.back().second;
*uuid = snapshot_files.back().uuid;
std::optional<RecoveredSnapshot> recovered_snapshot;
for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) {
const auto &[path, file_uuid] = *it;
const auto &[path, file_uuid, _] = *it;
if (file_uuid != *uuid) {
LOG(WARNING) << "The snapshot file " << path
<< " isn't related to the latest snapshot file!";
@ -236,9 +237,9 @@ std::optional<RecoveryInfo> RecoverData(
"files that match the last WAL file!";
if (!wal_files.empty()) {
std::sort(wal_files.begin(), wal_files.end());
{
const auto &[seq_num, from_timestamp, to_timestamp, path] = wal_files[0];
const auto &[seq_num, from_timestamp, to_timestamp, _, path] =
wal_files[0];
if (seq_num != 0) {
// We don't have all WAL files. We need to see whether we need them all.
if (!snapshot_timestamp) {
@ -257,15 +258,17 @@ std::optional<RecoveryInfo> RecoverData(
}
}
std::optional<uint64_t> previous_seq_num;
for (const auto &[seq_num, from_timestamp, to_timestamp, path] :
auto last_loaded_timestamp = snapshot_timestamp;
for (const auto &[seq_num, from_timestamp, to_timestamp, _, path] :
wal_files) {
if (previous_seq_num && *previous_seq_num + 1 != seq_num) {
if (previous_seq_num && *previous_seq_num + 1 != seq_num &&
*previous_seq_num != seq_num) {
LOG(FATAL) << "You are missing a WAL file with the sequence number "
<< *previous_seq_num + 1 << "!";
}
previous_seq_num = seq_num;
try {
auto info = LoadWal(path, &indices_constraints, snapshot_timestamp,
auto info = LoadWal(path, &indices_constraints, last_loaded_timestamp,
vertices, edges, name_id_mapper, edge_count, items);
recovery_info.next_vertex_id =
std::max(recovery_info.next_vertex_id, info.next_vertex_id);
@ -273,6 +276,7 @@ std::optional<RecoveryInfo> RecoverData(
std::max(recovery_info.next_edge_id, info.next_edge_id);
recovery_info.next_timestamp =
std::max(recovery_info.next_timestamp, info.next_timestamp);
last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
} catch (const RecoveryFailure &e) {
LOG(FATAL) << "Couldn't recover WAL deltas from " << path
<< " because of: " << e.what();

View File

@ -5,6 +5,7 @@
#include <filesystem>
#include <optional>
#include <string>
#include <variant>
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
@ -24,16 +25,52 @@ namespace storage::durability {
void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
const std::filesystem::path &storage_directory);
// Used to capture the snapshot's data related to durability
struct SnapshotDurabilityInfo {
explicit SnapshotDurabilityInfo(std::filesystem::path path, std::string uuid,
const uint64_t start_timestamp)
: path(std::move(path)),
uuid(std::move(uuid)),
start_timestamp(start_timestamp) {}
std::filesystem::path path;
std::string uuid;
uint64_t start_timestamp;
auto operator<=>(const SnapshotDurabilityInfo &) const = default;
};
/// Get list of snapshot files with their UUID.
/// @param snapshot_directory Directory containing the Snapshot files.
/// @param uuid UUID of the Snapshot files. If not empty, fetch only Snapshot
/// file with the specified UUID. Otherwise, fetch only Snapshot files in the
/// snapshot_directory.
/// @return List of snapshot files defined with its path and UUID.
std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
std::vector<SnapshotDurabilityInfo> GetSnapshotFiles(
const std::filesystem::path &snapshot_directory,
std::string_view uuid = "");
/// Used to capture a WAL's data related to durability
struct WalDurabilityInfo {
explicit WalDurabilityInfo(const uint64_t seq_num,
const uint64_t from_timestamp,
const uint64_t to_timestamp, std::string uuid,
std::filesystem::path path)
: seq_num(seq_num),
from_timestamp(from_timestamp),
to_timestamp(to_timestamp),
uuid(std::move(uuid)),
path(std::move(path)) {}
uint64_t seq_num;
uint64_t from_timestamp;
uint64_t to_timestamp;
std::string uuid;
std::filesystem::path path;
auto operator<=>(const WalDurabilityInfo &) const = default;
};
/// Get list of WAL files ordered by the sequence number
/// @param wal_directory Directory containing the WAL files.
/// @param uuid UUID of the WAL files. If not empty, fetch only WAL files
@ -44,11 +81,9 @@ std::vector<std::pair<std::filesystem::path, std::string>> GetSnapshotFiles(
/// with seq_num < current_seq_num.
/// @return List of WAL files. Each WAL file is defined with its sequence
/// number, from timestamp, to timestamp and path.
std::optional<std::vector<
std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>>
GetWalFiles(const std::filesystem::path &wal_directory,
std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});
std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(
const std::filesystem::path &wal_directory, std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});
// Helper function used to recover all discovered indices and constraints. The
// indices and constraints must be recovered after the data recovery is done

View File

@ -24,6 +24,16 @@ void Encoder::Initialize(const std::filesystem::path &path,
sizeof(version_encoded));
}
void Encoder::OpenExisting(const std::filesystem::path &path) {
file_.Open(path, utils::OutputFile::Mode::APPEND_TO_EXISTING);
}
void Encoder::Close() {
if (file_.IsOpen()) {
file_.Close();
}
}
void Encoder::Write(const uint8_t *data, uint64_t size) {
file_.Write(data, size);
}

View File

@ -33,6 +33,9 @@ class Encoder final : public BaseEncoder {
void Initialize(const std::filesystem::path &path,
const std::string_view &magic, uint64_t version);
void OpenExisting(const std::filesystem::path &path);
void Close();
// Main write function, the only one that is allowed to write to the `file_`
// directly.
void Write(const uint8_t *data, uint64_t size);

View File

@ -6,6 +6,7 @@
#include "storage/v2/durability/version.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/file_locker.hpp"
namespace storage::durability {
@ -664,7 +665,7 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
RecoveryInfo LoadWal(const std::filesystem::path &path,
RecoveredIndicesAndConstraints *indices_constraints,
std::optional<uint64_t> snapshot_timestamp,
const std::optional<uint64_t> last_loaded_timestamp,
utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
std::atomic<uint64_t> *edge_count, Config::Items items) {
@ -681,7 +682,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path,
auto info = ReadWalInfo(path);
// Check timestamp.
if (snapshot_timestamp && info.to_timestamp <= *snapshot_timestamp)
if (last_loaded_timestamp && info.to_timestamp <= *last_loaded_timestamp)
return ret;
// Recover deltas.
@ -693,7 +694,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path,
// Read WAL delta header to find out the delta timestamp.
auto timestamp = ReadWalDeltaHeader(&wal);
if (!snapshot_timestamp || timestamp > *snapshot_timestamp) {
if (!last_loaded_timestamp || timestamp > *last_loaded_timestamp) {
// This delta should be loaded.
auto delta = ReadWalDeltaData(&wal);
switch (delta.type) {
@ -970,14 +971,16 @@ RecoveryInfo LoadWal(const std::filesystem::path &path,
WalFile::WalFile(const std::filesystem::path &wal_directory,
const std::string &uuid, Config::Items items,
NameIdMapper *name_id_mapper, uint64_t seq_num)
NameIdMapper *name_id_mapper, uint64_t seq_num,
utils::FileRetainer *file_retainer)
: items_(items),
name_id_mapper_(name_id_mapper),
path_(wal_directory / MakeWalName()),
from_timestamp_(0),
to_timestamp_(0),
count_(0),
seq_num_(seq_num) {
seq_num_(seq_num),
file_retainer_(file_retainer) {
// Ensure that the storage directory exists.
utils::EnsureDirOrDie(wal_directory);
@ -1010,20 +1013,43 @@ WalFile::WalFile(const std::filesystem::path &wal_directory,
wal_.Sync();
}
WalFile::~WalFile() {
if (count_ != 0) {
// Finalize file.
wal_.Finalize();
WalFile::WalFile(std::filesystem::path current_wal_path, Config::Items items,
NameIdMapper *name_id_mapper, uint64_t seq_num,
uint64_t from_timestamp, uint64_t to_timestamp, uint64_t count,
utils::FileRetainer *file_retainer)
: items_(items),
name_id_mapper_(name_id_mapper),
path_(std::move(current_wal_path)),
from_timestamp_(from_timestamp),
to_timestamp_(to_timestamp),
count_(count),
seq_num_(seq_num),
file_retainer_(file_retainer) {
wal_.OpenExisting(path_);
}
void WalFile::FinalizeWal() {
if (count_ != 0) {
wal_.Finalize();
// Rename file.
std::filesystem::path new_path(path_);
new_path.replace_filename(
RemakeWalName(path_.filename(), from_timestamp_, to_timestamp_));
// If the rename fails it isn't a crucial situation. The renaming is done
// only to make the directory structure of the WAL files easier to read
// manually.
utils::RenamePath(path_, new_path);
} else {
utils::CopyFile(path_, new_path);
wal_.Close();
file_retainer_->DeleteFile(path_);
path_ = std::move(new_path);
}
}
void WalFile::DeleteWal() {
wal_.Close();
file_retainer_->DeleteFile(path_);
}
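Routing deletions through the FileRetainer (instead of calling std::filesystem::remove directly) is what lets GetRecoverySteps below pin WAL files with a FileLocker while they are streamed to a replica. A toy sketch of that contract, assuming a hypothetical ToyFileRetainer that is not utils::FileRetainer's real interface: deleting a locked path is deferred until the last locker holding it goes away.

#include <filesystem>
#include <mutex>
#include <set>
#include <vector>

class ToyFileRetainer {
 public:
  // RAII locker: files added here can't be deleted until the locker dies.
  class Locker {
   public:
    explicit Locker(ToyFileRetainer *retainer) : retainer_(retainer) {}
    ~Locker() { retainer_->Release(files_); }
    void AddFile(const std::filesystem::path &path) {
      std::lock_guard guard(retainer_->mutex_);
      retainer_->locked_.insert(path);
      files_.push_back(path);
    }

   private:
    ToyFileRetainer *retainer_;
    std::vector<std::filesystem::path> files_;
  };

  // Delete now if nobody holds the file, otherwise defer the deletion.
  void DeleteFile(const std::filesystem::path &path) {
    std::lock_guard guard(mutex_);
    if (locked_.count(path) != 0) {
      deferred_.push_back(path);
    } else {
      std::filesystem::remove(path);
    }
  }

 private:
  void Release(const std::vector<std::filesystem::path> &files) {
    std::lock_guard guard(mutex_);
    for (const auto &file : files) locked_.erase(locked_.find(file));
    // Perform deletions that were blocked by this locker.
    std::erase_if(deferred_, [&](const auto &path) {
      if (locked_.count(path) != 0) return false;
      std::filesystem::remove(path);
      return true;
    });
  }

  std::mutex mutex_;
  std::multiset<std::filesystem::path> locked_;
  std::vector<std::filesystem::path> deferred_;
};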
WalFile::~WalFile() {
if (count_ == 0) {
// Remove empty WAL file.
utils::DeleteFile(path_);
}

View File

@ -14,6 +14,7 @@
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/file_locker.hpp"
#include "utils/skip_list.hpp"
namespace storage::durability {
@ -149,7 +150,7 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
/// @throw RecoveryFailure
RecoveryInfo LoadWal(const std::filesystem::path &path,
RecoveredIndicesAndConstraints *indices_constraints,
std::optional<uint64_t> snapshot_timestamp,
std::optional<uint64_t> last_loaded_timestamp,
utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
std::atomic<uint64_t> *edge_count, Config::Items items);
@ -158,7 +159,12 @@ RecoveryInfo LoadWal(const std::filesystem::path &path,
class WalFile {
public:
WalFile(const std::filesystem::path &wal_directory, const std::string &uuid,
Config::Items items, NameIdMapper *name_id_mapper, uint64_t seq_num);
Config::Items items, NameIdMapper *name_id_mapper, uint64_t seq_num,
utils::FileRetainer *file_retainer);
WalFile(std::filesystem::path current_wal_path, Config::Items items,
NameIdMapper *name_id_mapper, uint64_t seq_num,
uint64_t from_timestamp, uint64_t to_timestamp, uint64_t count,
utils::FileRetainer *file_retainer);
WalFile(const WalFile &) = delete;
WalFile(WalFile &&) = delete;
@ -183,6 +189,12 @@ class WalFile {
uint64_t SequenceNumber() const;
auto FromTimestamp() const { return from_timestamp_; }
auto ToTimestamp() const { return to_timestamp_; }
auto Count() const { return count_; }
// Disable flushing of the internal buffer.
void DisableFlushing();
// Enable flushing of the internal buffer.
@ -195,6 +207,9 @@ class WalFile {
// Get the path of the current WAL file.
const auto &Path() const { return path_; }
void FinalizeWal();
void DeleteWal();
private:
void UpdateStats(uint64_t timestamp);
@ -206,6 +221,8 @@ class WalFile {
uint64_t to_timestamp_;
uint64_t count_;
uint64_t seq_num_;
utils::FileRetainer *file_retainer_;
};
} // namespace storage::durability

View File

@ -1,41 +1,92 @@
#include "storage/v2/replication/replication.hpp"
#include <algorithm>
#include <type_traits>
#include "storage/v2/durability/durability.hpp"
#include "utils/file_locker.hpp"
namespace storage::replication {
namespace {
template <typename>
[[maybe_unused]] inline constexpr bool always_false_v = false;
} // namespace
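This always_false_v helper is the usual dependent-false trick: in an if constexpr chain over a variant, static_assert(always_false_v<T>) fires only when some alternative falls through to the last branch, turning a forgotten case into a compile error. A minimal standalone demo (the Describe function here is hypothetical and unrelated to the code below):

#include <string>
#include <type_traits>
#include <variant>

template <typename>
inline constexpr bool demo_always_false_v = false;

void Describe(const std::variant<int, std::string> &value) {
  std::visit(
      []<typename T>(const T &) {
        using Type = std::remove_cvref_t<T>;
        if constexpr (std::is_same_v<Type, int>) {
          // handle the int alternative
        } else if constexpr (std::is_same_v<Type, std::string>) {
          // handle the string alternative
        } else {
          // A new variant alternative without a branch here now fails to
          // compile instead of being silently ignored.
          static_assert(demo_always_false_v<Type>,
                        "unhandled variant alternative");
        }
      },
      value);
}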
////// ReplicationClient //////
ReplicationClient::ReplicationClient(std::string name,
NameIdMapper *name_id_mapper,
Config::Items items,
const io::network::Endpoint &endpoint,
bool use_ssl, const ReplicationMode mode)
ReplicationClient::ReplicationClient(
std::string name, const std::atomic<uint64_t> &last_commit_timestamp,
NameIdMapper *name_id_mapper, Config::Items items,
utils::FileRetainer *file_retainer,
const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, const std::string_view uuid,
std::optional<durability::WalFile> *wal_file_ptr,
utils::SpinLock *transaction_engine_lock,
const io::network::Endpoint &endpoint, bool use_ssl,
const ReplicationMode mode)
: name_(std::move(name)),
last_commit_timestamp_{last_commit_timestamp},
name_id_mapper_(name_id_mapper),
items_(items),
file_retainer_(file_retainer),
snapshot_directory_(snapshot_directory),
wal_directory_(wal_directory),
uuid_(uuid),
wal_file_ptr_(wal_file_ptr),
transaction_engine_lock_(transaction_engine_lock),
rpc_context_(use_ssl),
rpc_client_(endpoint, &rpc_context_),
mode_(mode) {}
mode_(mode) {
InitializeClient();
}
void ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
void ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
auto stream{rpc_client_.Stream<HeartbeatRpc>()};
const auto response = stream.AwaitResponse();
current_commit_timestamp = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP: " << current_commit_timestamp;
DLOG(INFO) << "CURRENT MAIN TIMESTAMP: " << last_commit_timestamp_.load();
if (current_commit_timestamp == last_commit_timestamp_.load()) {
DLOG(INFO) << "REPLICA UP TO DATE";
replica_state_.store(ReplicaState::READY);
} else {
DLOG(INFO) << "REPLICA IS BEHIND";
replica_state_.store(ReplicaState::RECOVERY);
thread_pool_.AddTask(
[=, this] { this->RecoverReplica(current_commit_timestamp); });
}
}
SnapshotRes ReplicationClient::TransferSnapshot(
const std::filesystem::path &path) {
auto stream{rpc_client_.Stream<SnapshotRpc>()};
Encoder encoder(stream.GetBuilder());
encoder.WriteFile(path);
stream.AwaitResponse();
return stream.AwaitResponse();
}
void ReplicationClient::TransferWalFiles(
WalFilesRes ReplicationClient::TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files) {
CHECK(!wal_files.empty()) << "Wal files list is empty!";
auto stream{rpc_client_.Stream<WalFilesRpc>()};
auto stream{rpc_client_.Stream<WalFilesRpc>(wal_files.size())};
Encoder encoder(stream.GetBuilder());
encoder.WriteUint(wal_files.size());
for (const auto &wal : wal_files) {
DLOG(INFO) << "Sending wal file: " << wal;
encoder.WriteFile(wal);
}
stream.AwaitResponse();
return stream.AwaitResponse();
}
bool ReplicationClient::StartTransactionReplication() {
OnlySnapshotRes ReplicationClient::TransferOnlySnapshot(
const uint64_t snapshot_timestamp) {
auto stream{rpc_client_.Stream<OnlySnapshotRpc>(snapshot_timestamp)};
return stream.AwaitResponse();
}
bool ReplicationClient::StartTransactionReplication(
const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_);
const auto status = replica_state_.load();
switch (status) {
@ -43,11 +94,26 @@ bool ReplicationClient::StartTransactionReplication() {
DLOG(INFO) << "Replica " << name_ << " is behind MAIN instance";
return false;
case ReplicaState::REPLICATING:
DLOG(INFO) << "Replica missed a transaction, going to recovery";
replica_state_.store(ReplicaState::RECOVERY);
// If it's in replicating state, it should have been up to date with all
// the commits until now so the replica should contain the
// last_commit_timestamp
thread_pool_.AddTask(
[=, this] { this->RecoverReplica(last_commit_timestamp_.load()); });
return false;
case ReplicaState::READY:
CHECK(!replica_stream_);
replica_stream_.emplace(ReplicaStream{this});
try {
replica_stream_.emplace(ReplicaStream{
this, last_commit_timestamp_.load(), current_wal_seq_num});
} catch (const rpc::RpcFailedException &) {
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_.Abort();
InitializeClient();
});
}
replica_state_.store(ReplicaState::REPLICATING);
return true;
}
@ -56,7 +122,15 @@ bool ReplicationClient::StartTransactionReplication() {
void ReplicationClient::IfStreamingTransaction(
const std::function<void(ReplicaStream &handler)> &callback) {
if (replica_stream_) {
callback(*replica_stream_);
try {
callback(*replica_stream_);
} catch (const rpc::RpcFailedException &) {
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_.Abort();
InitializeClient();
});
}
}
}
@ -71,7 +145,21 @@ void ReplicationClient::FinalizeTransactionReplication() {
void ReplicationClient::FinalizeTransactionReplicationInternal() {
if (replica_stream_) {
replica_stream_->Finalize();
try {
auto response = replica_stream_->Finalize();
if (!response.success) {
replica_state_.store(ReplicaState::RECOVERY);
thread_pool_.AddTask([&, this] {
this->RecoverReplica(response.current_commit_timestamp);
});
}
} catch (const rpc::RpcFailedException &) {
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_.Abort();
InitializeClient();
});
}
replica_stream_.reset();
}
@ -80,9 +168,248 @@ void ReplicationClient::FinalizeTransactionReplicationInternal() {
replica_state_.store(ReplicaState::READY);
}
}
void ReplicationClient::RecoverReplica(uint64_t replica_commit) {
while (true) {
auto file_locker = file_retainer_->AddLocker();
const auto steps = GetRecoverySteps(replica_commit, &file_locker);
for (const auto &recovery_step : steps) {
std::visit(
[&, this]<typename T>(T &&arg) {
using StepType = std::remove_cvref_t<T>;
if constexpr (std::is_same_v<StepType, RecoverySnapshot>) {
DLOG(INFO) << "Sending the latest snapshot file: " << arg;
auto response = TransferSnapshot(arg);
replica_commit = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit;
} else if constexpr (std::is_same_v<StepType, RecoveryWals>) {
DLOG(INFO) << "Sending the latest wal files";
auto response = TransferWalFiles(arg);
replica_commit = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit;
} else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) {
auto &wal_file = *wal_file_ptr_;
std::unique_lock transaction_guard(*transaction_engine_lock_);
if (wal_file &&
wal_file->SequenceNumber() == arg.current_wal_seq_num) {
wal_file->DisableFlushing();
transaction_guard.unlock();
DLOG(INFO) << "Sending current wal file";
replica_commit = ReplicateCurrentWal();
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: "
<< replica_commit;
wal_file->EnableFlushing();
}
} else if constexpr (std::is_same_v<StepType,
RecoveryFinalSnapshot>) {
DLOG(INFO) << "Snapshot timestamp is the latest";
auto response = TransferOnlySnapshot(arg.snapshot_timestamp);
if (response.success) {
replica_commit = response.current_commit_timestamp;
}
} else {
static_assert(always_false_v<T>,
"Missing type from variant visitor");
}
},
recovery_step);
}
if (last_commit_timestamp_.load() == replica_commit) {
replica_state_.store(ReplicaState::READY);
return;
}
}
}
uint64_t ReplicationClient::ReplicateCurrentWal() {
auto &wal_file = *wal_file_ptr_;
auto stream = TransferCurrentWalFile();
stream.AppendFilename(wal_file->Path().filename());
utils::InputFile file;
CHECK(file.Open(wal_file->Path())) << "Failed to open current WAL file!";
const auto [buffer, buffer_size] = wal_file->CurrentFileBuffer();
stream.AppendSize(file.GetSize() + buffer_size);
stream.AppendFileData(&file);
stream.AppendBufferData(buffer, buffer_size);
auto response = stream.Finalize();
return response.current_commit_timestamp;
}
/// This method tries to find the optimal path for recovering a single
/// replica. Based on the last commit transferred to the replica, it tries to
/// update the replica using durability files - WALs and snapshots. WAL files
/// are much smaller in size as they contain only the deltas (changes) made
/// during the transactions, while snapshots contain all the data. For that
/// reason we prefer WALs as much as possible. As the WAL file that is
/// currently being updated can change during the process, we ignore it as
/// much as possible. Also, it uses the transaction lock, so locking it can be
/// really expensive. After we fetch the list of finalized WALs, we try to
/// find the longest chain of sequential WALs, starting from the latest one,
/// that will update the replica with all the missed updates. If the WAL chain
/// cannot be created, the replica is behind by a lot, so we use the regular
/// recovery process: we send the latest snapshot and all the necessary WAL
/// files, starting from the newest WAL that contains a timestamp before the
/// snapshot. If we registered the existence of the current WAL, we add the
/// sequence number we read from it to the recovery process. After all the
/// other steps are finished, if the current WAL contains the same sequence
/// number, it's the same WAL we read while fetching the recovery steps, so we
/// can safely send it to the replica. There's also one edge case: if the MAIN
/// instance restarted and the snapshot contained the last change (the
/// creation of that snapshot), the latest timestamp is contained in it. As no
/// changes were made to the data, we only need to send the timestamp of the
/// snapshot so the replica can set its last timestamp to that value.
std::vector<ReplicationClient::RecoveryStep>
ReplicationClient::GetRecoverySteps(
const uint64_t replica_commit,
utils::FileRetainer::FileLocker *file_locker) {
auto &wal_file = *wal_file_ptr_;
// First check if we can recover using the current wal file only
// otherwise save the seq_num of the current wal file
// This lock is also necessary to force the missed transaction to finish.
std::optional<uint64_t> current_wal_seq_num;
if (std::unique_lock transaction_guard(*transaction_engine_lock_); wal_file) {
current_wal_seq_num.emplace(wal_file->SequenceNumber());
}
auto locker_acc = file_locker->Access();
auto wal_files =
durability::GetWalFiles(wal_directory_, uuid_, current_wal_seq_num);
CHECK(wal_files) << "Wal files could not be loaded";
auto snapshot_files =
durability::GetSnapshotFiles(snapshot_directory_, uuid_);
std::optional<durability::SnapshotDurabilityInfo> latest_snapshot;
if (!snapshot_files.empty()) {
std::sort(snapshot_files.begin(), snapshot_files.end());
latest_snapshot.emplace(std::move(snapshot_files.back()));
}
std::vector<RecoveryStep> recovery_steps;
// No finalized WAL files were found. This means the difference is contained
// inside the current WAL or the snapshot was loaded back without any WALs
// after.
if (wal_files->empty()) {
if (current_wal_seq_num) {
recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});
} else {
CHECK(latest_snapshot);
locker_acc.AddFile(latest_snapshot->path);
recovery_steps.emplace_back(
RecoveryFinalSnapshot{latest_snapshot->start_timestamp});
}
return recovery_steps;
}
// Find the longest chain of WALs for recovery.
// The chain consists ONLY of sequential WALs.
auto rwal_it = wal_files->rbegin();
// If the last finalized WAL is before the replica commit,
// then we can recover only from the current WAL or from the snapshot
// if the MAIN just recovered.
if (rwal_it->to_timestamp <= replica_commit) {
if (current_wal_seq_num) {
recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});
} else {
CHECK(latest_snapshot);
locker_acc.AddFile(latest_snapshot->path);
recovery_steps.emplace_back(
RecoveryFinalSnapshot{latest_snapshot->start_timestamp});
}
return recovery_steps;
}
++rwal_it;
uint64_t previous_seq_num{rwal_it->seq_num};
for (; rwal_it != wal_files->rend(); ++rwal_it) {
// If the difference between two consecutive wal files is not 0 or 1
// we have a missing WAL in our chain
if (previous_seq_num - rwal_it->seq_num > 1) {
break;
}
// Find the first WAL that contains up to the replica commit, i.e. a WAL
// that is before the replica commit or contains the replica commit
// as the last committed transaction.
if (replica_commit >= rwal_it->from_timestamp &&
replica_commit >= rwal_it->to_timestamp) {
// We want the WAL after because the replica already contains all the
// commits from this WAL
--rwal_it;
std::vector<std::filesystem::path> wal_chain;
auto distance_from_first = std::distance(rwal_it, wal_files->rend() - 1);
// We have managed to create a WAL chain.
// We need to lock these files and add them to the chain.
for (auto result_wal_it = wal_files->begin() + distance_from_first;
result_wal_it != wal_files->end(); ++result_wal_it) {
locker_acc.AddFile(result_wal_it->path);
wal_chain.push_back(std::move(result_wal_it->path));
}
recovery_steps.emplace_back(std::in_place_type_t<RecoveryWals>{},
std::move(wal_chain));
if (current_wal_seq_num) {
recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});
}
return recovery_steps;
}
previous_seq_num = rwal_it->seq_num;
}
CHECK(latest_snapshot) << "Invalid durability state, missing snapshot";
// We didn't manage to find a WAL chain, so we need to send the latest
// snapshot with its WALs.
locker_acc.AddFile(latest_snapshot->path);
recovery_steps.emplace_back(std::in_place_type_t<RecoverySnapshot>{},
std::move(latest_snapshot->path));
std::vector<std::filesystem::path> recovery_wal_files;
auto wal_it = wal_files->begin();
for (; wal_it != wal_files->end(); ++wal_it) {
// Assuming the recovery process is correct, the snapshot should
// always retain a single WAL that contains a transaction
// before its creation.
if (latest_snapshot->start_timestamp < wal_it->to_timestamp) {
if (latest_snapshot->start_timestamp < wal_it->from_timestamp) {
CHECK(wal_it != wal_files->begin()) << "Invalid durability files state";
--wal_it;
}
break;
}
}
for (; wal_it != wal_files->end(); ++wal_it) {
locker_acc.AddFile(wal_it->path);
recovery_wal_files.push_back(std::move(wal_it->path));
}
// We only have a WAL before the snapshot
if (recovery_wal_files.empty()) {
locker_acc.AddFile(wal_files->back().path);
recovery_wal_files.push_back(std::move(wal_files->back().path));
}
recovery_steps.emplace_back(std::in_place_type_t<RecoveryWals>{},
std::move(recovery_wal_files));
if (current_wal_seq_num) {
recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});
}
return recovery_steps;
}
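A condensed, standalone model of the chain rule implemented above, assuming simplified types (SimpleWal and FirstWalToSend are illustrative names) and ignoring locking, snapshots, and the current WAL: walk backwards from the newest finalized WAL while the sequence numbers stay consecutive, and stop at the first WAL whose commits the replica already has.

#include <cstdint>
#include <optional>
#include <vector>

struct SimpleWal {
  uint64_t seq_num;
  uint64_t from_timestamp;
  uint64_t to_timestamp;
};

// wals are finalized WAL files sorted by seq_num; replica_commit is the last
// commit the replica confirmed. Returns the index of the first WAL to send,
// or nullopt when recovery has to fall back to a snapshot (or the current
// WAL) instead of a chain of finalized WALs.
std::optional<size_t> FirstWalToSend(const std::vector<SimpleWal> &wals,
                                     uint64_t replica_commit) {
  if (wals.empty() || wals.back().to_timestamp <= replica_commit)
    return std::nullopt;  // nothing newer than the replica in finalized WALs
  for (size_t i = wals.size(); i-- > 0;) {
    if (i + 1 < wals.size() && wals[i + 1].seq_num - wals[i].seq_num > 1)
      return std::nullopt;  // gap breaks the chain: snapshot recovery instead
    if (wals[i].to_timestamp <= replica_commit)
      return i + 1;  // wals[i] is already on the replica; send the rest
  }
  return std::nullopt;  // even the oldest WAL is too new to bridge the gap
}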
////// ReplicaStream //////
ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<AppendDeltasRpc>()) {}
ReplicationClient::ReplicaStream::ReplicaStream(
ReplicationClient *self, const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num)
: self_(self),
stream_(self_->rpc_client_.Stream<AppendDeltasRpc>(
previous_commit_timestamp, current_seq_num)) {}
void ReplicationClient::ReplicaStream::AppendDelta(
const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) {
@ -112,14 +439,13 @@ void ReplicationClient::ReplicaStream::AppendOperation(
properties, timestamp);
}
void ReplicationClient::ReplicaStream::Finalize() { stream_.AwaitResponse(); }
AppendDeltasRes ReplicationClient::ReplicaStream::Finalize() {
return stream_.AwaitResponse();
}
////// CurrentWalHandler //////
ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<WalFilesRpc>()) {
Encoder encoder(stream_.GetBuilder());
encoder.WriteUint(1);
}
: self_(self), stream_(self_->rpc_client_.Stream<CurrentWalRpc>()) {}
void ReplicationClient::CurrentWalHandler::AppendFilename(
const std::string &filename) {
@ -144,7 +470,7 @@ void ReplicationClient::CurrentWalHandler::AppendBufferData(
encoder.WriteBuffer(buffer, buffer_size);
}
void ReplicationClient::CurrentWalHandler::Finalize() {
stream_.AwaitResponse();
CurrentWalRes ReplicationClient::CurrentWalHandler::Finalize() {
return stream_.AwaitResponse();
}
} // namespace storage::replication

View File

@ -3,6 +3,7 @@
#include <atomic>
#include <condition_variable>
#include <thread>
#include <variant>
#include "rpc/client.hpp"
#include "storage/v2/config.hpp"
@ -15,6 +16,7 @@
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "utils/file.hpp"
#include "utils/file_locker.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"
#include "utils/thread_pool.hpp"
@ -27,15 +29,25 @@ enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY };
class ReplicationClient {
public:
ReplicationClient(std::string name, NameIdMapper *name_id_mapper,
Config::Items items, const io::network::Endpoint &endpoint,
bool use_ssl, ReplicationMode mode);
ReplicationClient(std::string name,
const std::atomic<uint64_t> &last_commit_timestamp,
NameIdMapper *name_id_mapper, Config::Items items,
utils::FileRetainer *file_retainer,
const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory,
std::string_view uuid,
std::optional<durability::WalFile> *wal_file_ptr,
utils::SpinLock *transaction_engine_lock,
const io::network::Endpoint &endpoint, bool use_ssl,
ReplicationMode mode);
// Handler used for transfering the current transaction.
class ReplicaStream {
private:
friend class ReplicationClient;
explicit ReplicaStream(ReplicationClient *self);
explicit ReplicaStream(ReplicationClient *self,
uint64_t previous_commit_timestamp,
uint64_t current_seq_num);
public:
/// @throw rpc::RpcFailedException
@ -56,7 +68,7 @@ class ReplicationClient {
private:
/// @throw rpc::RpcFailedException
void Finalize();
AppendDeltasRes Finalize();
ReplicationClient *self_;
rpc::Client::StreamHandler<AppendDeltasRpc> stream_;
@ -79,14 +91,14 @@ class ReplicationClient {
void AppendBufferData(const uint8_t *buffer, size_t buffer_size);
/// @throw rpc::RpcFailedException
void Finalize();
CurrentWalRes Finalize();
private:
ReplicationClient *self_;
rpc::Client::StreamHandler<WalFilesRpc> stream_;
rpc::Client::StreamHandler<CurrentWalRpc> stream_;
};
bool StartTransactionReplication();
bool StartTransactionReplication(uint64_t current_wal_seq_num);
// Replication clients can be removed at any point
// so to avoid any complexity of checking if the client was removed whenever
@ -100,12 +112,17 @@ class ReplicationClient {
// Transfer the snapshot file.
// @param path Path of the snapshot file.
void TransferSnapshot(const std::filesystem::path &path);
SnapshotRes TransferSnapshot(const std::filesystem::path &path);
// Transfer the timestamp of the snapshot if it's the only difference
// between main and replica
OnlySnapshotRes TransferOnlySnapshot(uint64_t snapshot_timestamp);
CurrentWalHandler TransferCurrentWalFile() { return CurrentWalHandler{this}; }
// Transfer the WAL files
void TransferWalFiles(const std::vector<std::filesystem::path> &wal_files);
WalFilesRes TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files);
const auto &Name() const { return name_; }
@ -114,9 +131,44 @@ class ReplicationClient {
private:
void FinalizeTransactionReplicationInternal();
void RecoverReplica(uint64_t replica_commit);
uint64_t ReplicateCurrentWal();
using RecoveryWals = std::vector<std::filesystem::path>;
struct RecoveryCurrentWal {
uint64_t current_wal_seq_num;
explicit RecoveryCurrentWal(const uint64_t current_wal_seq_num)
: current_wal_seq_num(current_wal_seq_num) {}
};
using RecoverySnapshot = std::filesystem::path;
struct RecoveryFinalSnapshot {
uint64_t snapshot_timestamp;
explicit RecoveryFinalSnapshot(const uint64_t snapshot_timestamp)
: snapshot_timestamp(snapshot_timestamp) {}
};
using RecoveryStep = std::variant<RecoverySnapshot, RecoveryWals,
RecoveryCurrentWal, RecoveryFinalSnapshot>;
std::vector<RecoveryStep> GetRecoverySteps(
uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker);
void InitializeClient();
std::string name_;
// storage info
const std::atomic<uint64_t> &last_commit_timestamp_;
NameIdMapper *name_id_mapper_;
Config::Items items_;
utils::FileRetainer *file_retainer_;
const std::filesystem::path &snapshot_directory_;
const std::filesystem::path &wal_directory_;
std::string_view uuid_;
std::optional<durability::WalFile> *wal_file_ptr_;
utils::SpinLock *transaction_engine_lock_;
communication::ClientContext rpc_context_;
rpc::Client rpc_client_;
@ -125,7 +177,7 @@ class ReplicationClient {
utils::SpinLock client_lock_;
utils::ThreadPool thread_pool_{1};
std::atomic<ReplicaState> replica_state_{ReplicaState::READY};
std::atomic<ReplicaState> replica_state_;
};
} // namespace storage::replication

View File

@ -15,29 +15,40 @@ cpp<#
(lcp:define-rpc append-deltas
;; The actual deltas are sent as additional data using the RPC client's
;; streaming API.
(:request ())
(:request
((previous-commit-timestamp :uint64_t)
(seq-num :uint64_t)))
(:response
((success :bool)
(term :uint64_t))))
(current-commit-timestamp :uint64_t))))
(lcp:define-rpc heartbeat
(:request
((leader-id :uint16_t)
(term :uint64_t)))
(:request ())
(:response
((success :bool)
(term :uint64_t))))
((current-commit-timestamp :uint64_t))))
(lcp:define-rpc snapshot
(:request ())
(:response
((success :bool)
(term :uint64_t))))
(current-commit-timestamp :uint64_t))))
(lcp:define-rpc only-snapshot
(:request ((snapshot-timestamp :uint64_t)))
(:response
((success :bool)
(current-commit-timestamp :uint64_t))))
(lcp:define-rpc wal-files
(:request ((file-number :uint64_t)))
(:response
((success :bool)
(current-commit-timestamp :uint64_t))))
(lcp:define-rpc current-wal
(:request ())
(:response
((success :bool)
(term :uint64_t))))
(current-commit-timestamp :uint64_t))))
(lcp:pop-namespace) ;; storage
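For orientation, each define-rpc above expands into plain request/response structs with slk serialization on the C++ side. Roughly, for heartbeat (a sketch inferred from how HeartbeatReq/HeartbeatRes are used in storage.cpp below, not the literal LCP output):

#include <cstdint>

struct HeartbeatReq {};  // empty request: the replica is just asked to report

struct HeartbeatRes {
  uint64_t current_commit_timestamp;
};

// The generated code additionally provides slk::Save/slk::Load overloads for
// both structs and bundles them as HeartbeatRpc, which is what
// rpc_client_.Stream<HeartbeatRpc>() in replication.cpp refers to.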

View File

@ -129,24 +129,17 @@ bool Decoder::SkipPropertyValue() {
}
std::optional<std::filesystem::path> Decoder::ReadFile(
const std::filesystem::path &directory) {
const std::filesystem::path &directory, const std::string &suffix) {
CHECK(std::filesystem::exists(directory) &&
std::filesystem::is_directory(directory))
<< "Sent path for streamed files should be a valid directory!";
utils::OutputFile file;
const auto maybe_filename = ReadString();
CHECK(maybe_filename) << "Filename missing for the file";
const auto &filename = *maybe_filename;
const auto filename = *maybe_filename + suffix;
auto path = directory / filename;
// Check if the file already exists so we don't overwrite it
const bool file_exists = std::filesystem::exists(path);
// TODO (antonio2368): Maybe append filename with custom suffix so we have
// both copies?
if (!file_exists) {
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
}
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
std::optional<size_t> maybe_file_size = ReadUint();
CHECK(maybe_file_size) << "File size missing";
auto file_size = *maybe_file_size;
@ -154,9 +147,7 @@ std::optional<std::filesystem::path> Decoder::ReadFile(
while (file_size > 0) {
const auto chunk_size = std::min(file_size, utils::kFileBufferSize);
reader_->Load(buffer, chunk_size);
if (!file_exists) {
file.Write(buffer, chunk_size);
}
file.Write(buffer, chunk_size);
file_size -= chunk_size;
}
file.Close();

View File

@ -58,9 +58,10 @@ class Decoder final : public durability::BaseDecoder {
/// Read the file and save it inside the specified directory.
/// @param directory Directory which will contain the read file.
/// @param suffix Suffix to be added to the received file's filename.
/// @return If the read was successful, path to the read file.
std::optional<std::filesystem::path> ReadFile(
const std::filesystem::path &directory);
const std::filesystem::path &directory, const std::string &suffix = "");
private:
slk::Reader *reader_;

View File

@ -30,6 +30,7 @@
#ifdef MG_ENTERPRISE
DEFINE_bool(main, false, "Set to true to be the main");
DEFINE_bool(replica, false, "Set to true to be the replica");
DEFINE_bool(async_replica, false, "Set to true to be the async replica");
#endif
namespace storage {
@ -361,6 +362,13 @@ Storage::Storage(Config config)
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
timestamp_ = std::max(timestamp_, info->next_timestamp);
#if MG_ENTERPRISE
// After we finish the recovery, info->next_timestamp will
// basically be
// `std::max(latest_snapshot.start_timestamp + 1, latest_wal.to_timestamp
// + 1)`, so the last committed transaction is one before that.
last_commit_timestamp_ = timestamp_ - 1;
#endif
}
} else if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED ||
@ -411,9 +419,14 @@ Storage::Storage(Config config)
// a query.
if (FLAGS_main) {
SetReplicationRole<ReplicationRole::MAIN>();
RegisterReplica("REPLICA_SYNC", io::network::Endpoint{"127.0.0.1", 10000});
RegisterReplica("REPLICA_ASYNC", io::network::Endpoint{"127.0.0.1", 10002});
} else if (FLAGS_replica) {
SetReplicationRole<ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 1000});
io::network::Endpoint{"127.0.0.1", 10000});
} else if (FLAGS_async_replica) {
SetReplicationRole<ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10002});
}
#endif
}
@ -429,7 +442,10 @@ Storage::~Storage() {
replication_clients_.WithLock([&](auto &clients) { clients.clear(); });
}
#endif
wal_file_ = std::nullopt;
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_ = std::nullopt;
}
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Stop();
@ -853,16 +869,8 @@ EdgeTypeId Storage::Accessor::NameToEdgeType(const std::string_view &name) {
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; }
#ifdef MG_ENTERPRISE
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
return Commit(std::nullopt);
}
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
std::optional<uint64_t> desired_commit_timestamp) {
#else
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
#endif
const std::optional<uint64_t> desired_commit_timestamp) {
CHECK(is_transaction_active_) << "The transaction is already terminated!";
CHECK(!transaction_.must_abort) << "The transaction can't be committed!";
@ -899,17 +907,7 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
{
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
#ifdef MG_ENTERPRISE
if (!desired_commit_timestamp) {
commit_timestamp = storage_->timestamp_++;
} else {
commit_timestamp = *desired_commit_timestamp;
storage_->timestamp_ =
std::max(storage_->timestamp_, *desired_commit_timestamp + 1);
}
#else
commit_timestamp = storage_->timestamp_++;
#endif
commit_timestamp = storage_->CommitTimestamp(desired_commit_timestamp);
// Before committing and validating vertices against unique constraints,
// we have to update unique constraints with the vertices that are going
@ -949,7 +947,17 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
// written before actually committing the transaction (before setting
// the commit timestamp) so that no other transaction can see the
// modifications before they are written to disk.
#ifdef MG_ENTERPRISE
// The replica can log only the write transactions received from MAIN,
// so the WAL files are consistent
if (storage_->replication_role_ == ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
storage_->AppendToWal(transaction_, commit_timestamp);
}
#else
storage_->AppendToWal(transaction_, commit_timestamp);
#endif
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
@ -962,6 +970,15 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
<< "Invalid database state!";
transaction_.commit_timestamp->store(commit_timestamp,
std::memory_order_release);
#ifdef MG_ENTERPRISE
// The replica can only update the last commit timestamp with
// the commits received from MAIN.
if (storage_->replication_role_ == ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
storage_->last_commit_timestamp_.store(commit_timestamp);
}
#endif
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
engine_guard.unlock();
@ -1189,7 +1206,8 @@ EdgeTypeId Storage::NameToEdgeType(const std::string_view &name) {
return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name));
}
bool Storage::CreateIndex(LabelId label) {
bool Storage::CreateIndex(
LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.CreateIndex(label, vertices_.access()))
return false;
@ -1201,40 +1219,65 @@ bool Storage::CreateIndex(LabelId label) {
// next regular transaction after this operation. This prevents collisions of
// commit timestamps between non-transactional operations and transactional
// operations.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {},
timestamp_);
commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
bool Storage::CreateIndex(LabelId label, PropertyId property) {
bool Storage::CreateIndex(
LabelId label, PropertyId property,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_property_index.CreateIndex(label, property,
vertices_.access()))
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE,
label, {property}, timestamp_);
label, {property}, commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
bool Storage::DropIndex(LabelId label) {
bool Storage::DropIndex(
LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.DropIndex(label)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {},
timestamp_);
commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
bool Storage::DropIndex(LabelId label, PropertyId property) {
bool Storage::DropIndex(
LabelId label, PropertyId property,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_property_index.DropIndex(label, property)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP,
label, {property}, timestamp_);
label, {property}, commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1245,32 +1288,47 @@ IndicesInfo Storage::ListAllIndices() const {
}
utils::BasicResult<ConstraintViolation, bool>
Storage::CreateExistenceConstraint(LabelId label, PropertyId property) {
Storage::CreateExistenceConstraint(
LabelId label, PropertyId property,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = ::storage::CreateExistenceConstraint(&constraints_, label,
property, vertices_.access());
if (ret.HasError() || !ret.GetValue()) return ret;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE,
label, {property}, timestamp_);
label, {property}, commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
bool Storage::DropExistenceConstraint(LabelId label, PropertyId property) {
bool Storage::DropExistenceConstraint(
LabelId label, PropertyId property,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!::storage::DropExistenceConstraint(&constraints_, label, property))
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP,
label, {property}, timestamp_);
label, {property}, commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus>
Storage::CreateUniqueConstraint(LabelId label,
const std::set<PropertyId> &properties) {
Storage::CreateUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = constraints_.unique_constraints.CreateConstraint(
label, properties, vertices_.access());
@ -1280,13 +1338,19 @@ Storage::CreateUniqueConstraint(LabelId label,
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE,
label, properties, timestamp_);
label, properties, commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return UniqueConstraints::CreationStatus::SUCCESS;
}
UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties) {
LabelId label, const std::set<PropertyId> &properties,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = constraints_.unique_constraints.DropConstraint(label, properties);
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) {
@ -1294,8 +1358,13 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label,
properties, timestamp_);
properties, commit_timestamp);
commit_log_.MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return UniqueConstraints::DeletionStatus::SUCCESS;
}
@ -1352,15 +1421,7 @@ Transaction Storage::CreateTransaction() {
{
std::lock_guard<utils::SpinLock> guard(engine_lock_);
transaction_id = transaction_id_++;
#ifdef MG_ENTERPRISE
if (replication_role_.load() != ReplicationRole::REPLICA) {
start_timestamp = timestamp_++;
} else {
start_timestamp = timestamp_;
}
#else
start_timestamp = timestamp_++;
#endif
}
return {transaction_id, start_timestamp};
}
@ -1486,7 +1547,7 @@ void Storage::CollectGarbage() {
break;
}
case PreviousPtr::Type::DELTA: {
if (prev.delta->timestamp->load(std::memory_order_release) ==
if (prev.delta->timestamp->load(std::memory_order_acquire) ==
commit_timestamp) {
// The delta that is newer than this one is also a delta from this
// transaction. We skip the current delta and will remove it as a
@ -1601,7 +1662,7 @@ bool Storage::InitializeWalFile() {
return false;
if (!wal_file_) {
wal_file_.emplace(wal_directory_, uuid_, config_.items, &name_id_mapper_,
wal_seq_num_++);
wal_seq_num_++, &file_retainer_);
}
return true;
}
@ -1615,6 +1676,7 @@ void Storage::FinalizeWalFile() {
}
if (wal_file_->GetSize() / 1024 >=
config_.durability.wal_file_size_kibibytes) {
wal_file_->FinalizeWal();
wal_file_ = std::nullopt;
wal_unsynced_transactions_ = 0;
} else {
@ -1641,11 +1703,7 @@ void Storage::AppendToWal(const Transaction &transaction,
if (replication_role_.load() == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
try {
client.StartTransactionReplication();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
client.StartTransactionReplication(wal_file_->SequenceNumber());
}
});
}
@ -1669,13 +1727,9 @@ void Storage::AppendToWal(const Transaction &transaction,
#ifdef MG_ENTERPRISE
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
try {
client.IfStreamingTransaction([&](auto &stream) {
stream.AppendDelta(*delta, parent, final_commit_timestamp);
});
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
client.IfStreamingTransaction([&](auto &stream) {
stream.AppendDelta(*delta, parent, final_commit_timestamp);
});
}
});
#endif
@ -1815,14 +1869,10 @@ void Storage::AppendToWal(const Transaction &transaction,
#ifdef MG_ENTERPRISE
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
try {
client.IfStreamingTransaction([&](auto &stream) {
stream.AppendTransactionEnd(final_commit_timestamp);
});
client.FinalizeTransactionReplication();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
client.IfStreamingTransaction([&](auto &stream) {
stream.AppendTransactionEnd(final_commit_timestamp);
});
client.FinalizeTransactionReplication();
}
});
#endif
@ -1840,16 +1890,12 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation,
if (replication_role_.load() == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
try {
client.StartTransactionReplication();
client.IfStreamingTransaction([&](auto &stream) {
stream.AppendOperation(operation, label, properties,
final_commit_timestamp);
});
client.FinalizeTransactionReplication();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
client.StartTransactionReplication(wal_file_->SequenceNumber());
client.IfStreamingTransaction([&](auto &stream) {
stream.AppendOperation(operation, label, properties,
final_commit_timestamp);
});
client.FinalizeTransactionReplication();
}
});
}
@ -1883,7 +1929,47 @@ void Storage::CreateSnapshot() {
commit_log_.MarkFinished(transaction.start_timestamp);
}
uint64_t Storage::CommitTimestamp(
const std::optional<uint64_t> desired_commit_timestamp) {
#ifdef MG_ENTERPRISE
if (!desired_commit_timestamp) {
return timestamp_++;
} else {
const auto commit_timestamp = *desired_commit_timestamp;
timestamp_ = std::max(timestamp_, *desired_commit_timestamp + 1);
return commit_timestamp;
}
#else
return timestamp_++;
#endif
}
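Taken in isolation, the rule above means MAIN hands out fresh timestamps while a REPLICA adopts the timestamp MAIN shipped with the transaction and fast-forwards its local counter past it, so later local allocations can never collide. A standalone sketch (TimestampSourceSketch is a hypothetical stand-in, not the Storage class):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>

struct TimestampSourceSketch {
  uint64_t next{0};
  uint64_t Commit(std::optional<uint64_t> desired = std::nullopt) {
    if (!desired) return next++;          // MAIN: allocate a fresh timestamp
    next = std::max(next, *desired + 1);  // REPLICA: move past MAIN's value
    return *desired;
  }
};

int main() {
  TimestampSourceSketch main_ts;
  TimestampSourceSketch replica_ts;
  const auto t0 = main_ts.Commit();     // MAIN allocates: t0 == 0
  assert(replica_ts.Commit(t0) == t0);  // REPLICA adopts MAIN's timestamp
  assert(replica_ts.next == t0 + 1);    // and fast-forwards its counter
  assert(main_ts.Commit() == t0 + 1);   // MAIN keeps counting monotonically
}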
#ifdef MG_ENTERPRISE
std::pair<durability::WalInfo, std::filesystem::path> Storage::LoadWal(
replication::Decoder *decoder,
durability::RecoveredIndicesAndConstraints *indices_constraints) {
auto maybe_wal_path = decoder->ReadFile(wal_directory_, "_MAIN");
CHECK(maybe_wal_path) << "Failed to load WAL!";
DLOG(INFO) << "Received WAL saved to " << *maybe_wal_path;
try {
auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
auto info = durability::LoadWal(
*maybe_wal_path, indices_constraints, last_commit_timestamp_,
&vertices_, &edges_, &name_id_mapper_, &edge_count_, config_.items);
vertex_id_ = std::max(vertex_id_.load(), info.next_vertex_id);
edge_id_ = std::max(edge_id_.load(), info.next_edge_id);
timestamp_ = std::max(timestamp_, info.next_timestamp);
if (info.next_timestamp != 0) {
last_commit_timestamp_ = info.next_timestamp - 1;
}
DLOG(INFO) << *maybe_wal_path << " loaded successfully";
return {std::move(wal_info), std::move(*maybe_wal_path)};
} catch (const durability::RecoveryFailure &e) {
LOG(FATAL) << "Couldn't recover WAL deltas from " << *maybe_wal_path
<< " because of: " << e.what();
}
}
void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
replication_server_.emplace();
@ -1897,15 +1983,84 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
replication_server_->rpc_server.emplace(
endpoint, &*replication_server_->rpc_server_context,
/* workers_count = */ 1);
replication_server_->rpc_server->Register<HeartbeatRpc>(
[this](auto *req_reader, auto *res_builder) {
HeartbeatReq req;
slk::Load(&req, req_reader);
DLOG(INFO) << "Received HeartbeatRpc:";
HeartbeatRes res{last_commit_timestamp_.load()};
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Register<
AppendDeltasRpc>([this, endpoint = std::move(endpoint)](
auto *req_reader, auto *res_builder) {
AppendDeltasRpc>([this](auto *req_reader, auto *res_builder) {
AppendDeltasReq req;
slk::Load(&req, req_reader);
DLOG(INFO) << "Received AppendDeltasRpc:";
constexpr auto is_transaction_complete =
[](const durability::WalDeltaData::Type delta_type) {
switch (delta_type) {
case durability::WalDeltaData::Type::TRANSACTION_END:
case durability::WalDeltaData::Type::LABEL_INDEX_CREATE:
case durability::WalDeltaData::Type::LABEL_INDEX_DROP:
case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE:
case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP:
return true;
default:
return false;
}
};
replication::Decoder decoder(req_reader);
const auto read_delta =
[&]() -> std::pair<uint64_t, durability::WalDeltaData> {
try {
auto timestamp = ReadWalDeltaHeader(&decoder);
DLOG(INFO) << " Timestamp " << timestamp;
auto delta = ReadWalDeltaData(&decoder);
return {timestamp, delta};
} catch (const slk::SlkReaderException &) {
throw utils::BasicException("Missing data!");
} catch (const durability::RecoveryFailure &) {
throw utils::BasicException("Invalid data!");
}
};
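// Deltas can only be applied on top of the exact commit the main expects;
// on a mismatch the stream is still drained so the connection stays
// consistent, but the request is rejected and the main is told the
// replica's actual last commit timestamp.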
if (req.previous_commit_timestamp != last_commit_timestamp_.load()) {
// Empty the stream
bool transaction_complete = false;
while (!transaction_complete) {
DLOG(INFO) << "Skipping delta";
const auto [timestamp, delta] = read_delta();
transaction_complete = is_transaction_complete(delta.type);
}
AppendDeltasRes res{false, last_commit_timestamp_.load()};
slk::Save(res, res_builder);
return;
}
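// Keep the replica's WAL numbering aligned with the main's: if the main
// moved on to a new WAL file (larger seq_num), finalize the current one;
// if it is still on the same file, the next local WAL gets seq_num + 1.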
if (wal_file_) {
if (req.seq_num > wal_file_->SequenceNumber()) {
wal_file_->FinalizeWal();
wal_file_.reset();
wal_seq_num_ = req.seq_num;
} else {
CHECK(wal_file_->SequenceNumber() == req.seq_num)
<< "Invalid sequence number of the current WAL file";
wal_seq_num_ = req.seq_num + 1;
}
} else {
wal_seq_num_ = req.seq_num;
}
auto edge_acc = edges_.access();
auto vertex_acc = vertices_.access();
@ -1924,19 +2079,8 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
bool transaction_complete = false;
for (uint64_t i = 0; !transaction_complete; ++i) {
uint64_t timestamp;
durability::WalDeltaData delta;
try {
timestamp = ReadWalDeltaHeader(&decoder);
DLOG(INFO) << " Delta " << i;
DLOG(INFO) << " Timestamp " << timestamp;
delta = ReadWalDeltaData(&decoder);
} catch (const slk::SlkReaderException &) {
throw utils::BasicException("Missing data!");
} catch (const durability::RecoveryFailure &) {
throw utils::BasicException("Invalid data!");
}
DLOG(INFO) << " Delta " << i;
const auto [timestamp, delta] = read_delta();
switch (delta.type) {
case durability::WalDeltaData::Type::VERTEX_CREATE: {
@ -2142,18 +2286,17 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
if (ret.HasError())
throw utils::BasicException("Invalid transaction!");
commit_timestamp_and_accessor = std::nullopt;
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_INDEX_CREATE: {
DLOG(INFO) << " Create label index on :"
<< delta.operation_label.label;
// Global operations must be applied with the replicated timestamp
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!CreateIndex(NameToLabel(delta.operation_label.label)))
if (!CreateIndex(NameToLabel(delta.operation_label.label), timestamp))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_INDEX_DROP: {
@ -2161,9 +2304,8 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
<< delta.operation_label.label;
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!DropIndex(NameToLabel(delta.operation_label.label)))
if (!DropIndex(NameToLabel(delta.operation_label.label), timestamp))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: {
@ -2174,9 +2316,9 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
throw utils::BasicException("Invalid transaction!");
if (!CreateIndex(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property)))
NameToProperty(delta.operation_label_property.property),
timestamp))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: {
@ -2187,9 +2329,9 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
throw utils::BasicException("Invalid transaction!");
if (!DropIndex(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property)))
NameToProperty(delta.operation_label_property.property),
timestamp))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: {
@ -2200,10 +2342,10 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
throw utils::BasicException("Invalid transaction!");
auto ret = CreateExistenceConstraint(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property));
NameToProperty(delta.operation_label_property.property),
timestamp);
if (!ret.HasValue() || !ret.GetValue())
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
@ -2214,9 +2356,9 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
throw utils::BasicException("Invalid transaction!");
if (!DropExistenceConstraint(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property)))
NameToProperty(delta.operation_label_property.property),
timestamp))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: {
@ -2232,11 +2374,11 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
properties.emplace(NameToProperty(prop));
}
auto ret = CreateUniqueConstraint(
NameToLabel(delta.operation_label_properties.label), properties);
NameToLabel(delta.operation_label_properties.label), properties,
timestamp);
if (!ret.HasValue() ||
ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: {
@ -2252,19 +2394,20 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
properties.emplace(NameToProperty(prop));
}
auto ret = DropUniqueConstraint(
NameToLabel(delta.operation_label_properties.label), properties);
NameToLabel(delta.operation_label_properties.label), properties,
timestamp);
if (ret != UniqueConstraints::DeletionStatus::SUCCESS)
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
}
transaction_complete = is_transaction_complete(delta.type);
}
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid data!");
AppendDeltasRes res;
AppendDeltasRes res{true, last_commit_timestamp_.load()};
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Register<SnapshotRpc>(
@ -2313,9 +2456,44 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
// TODO (antonio2368): What to do if the sent snapshot is invalid
LOG(WARNING) << "Couldn't load the snapshot because of: " << e.what();
}
last_commit_timestamp_ = timestamp_ - 1;
storage_guard.unlock();
SnapshotRes res;
SnapshotRes res{true, last_commit_timestamp_.load()};
slk::Save(res, res_builder);
// Delete other durability files
auto snapshot_files =
durability::GetSnapshotFiles(snapshot_directory_, uuid_);
for (const auto &[path, uuid, _] : snapshot_files) {
if (path != *maybe_snapshot_path) {
file_retainer_.DeleteFile(path);
}
}
auto wal_files = durability::GetWalFiles(wal_directory_, uuid_);
if (wal_files) {
for (const auto &[seq_num, from_timestamp, to_timestamp, _, path] :
*wal_files) {
file_retainer_.DeleteFile(path);
}
wal_file_.reset();
}
});
replication_server_->rpc_server->Register<OnlySnapshotRpc>(
[this](auto *req_reader, auto *res_builder) {
DLOG(INFO) << "Received OnlySnapshotRpc";
OnlySnapshotReq req;
slk::Load(&req, req_reader);
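// Presumably used when only a snapshot was transferred: the replica
// simply fast-forwards its last commit timestamp to the snapshot's
// timestamp.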
CHECK(last_commit_timestamp_.load() < req.snapshot_timestamp)
<< "Invalid snapshot timestamp, it should be greater than the last "
"committed timestamp";
last_commit_timestamp_.store(req.snapshot_timestamp);
SnapshotRes res{true, last_commit_timestamp_.load()};
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Register<WalFilesRpc>(
@ -2324,48 +2502,73 @@ void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
WalFilesReq req;
slk::Load(&req, req_reader);
const auto wal_file_number = req.file_number;
DLOG(INFO) << "Received WAL files: " << wal_file_number;
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(wal_directory_);
const auto maybe_wal_file_number = decoder.ReadUint();
CHECK(maybe_wal_file_number)
<< "Failed to read number of received WAL files!";
const auto wal_file_number = *maybe_wal_file_number;
DLOG(INFO) << "Received WAL files: " << wal_file_number;
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
durability::RecoveredIndicesAndConstraints indices_constraints;
for (auto i = 0; i < wal_file_number; ++i) {
const auto maybe_wal_path = decoder.ReadFile(wal_directory_);
CHECK(maybe_wal_path) << "Failed to load WAL!";
DLOG(INFO) << "Received WAL saved to " << *maybe_wal_path;
// TODO (antonio2368): Use timestamp_ for now, but timestamp_ can
// increase by running read queries, so define a variable that will keep
// the last received timestamp
try {
auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
if (wal_info.seq_num == 0) {
uuid_ = wal_info.uuid;
}
auto info = durability::LoadWal(
*maybe_wal_path, &indices_constraints, timestamp_, &vertices_,
&edges_, &name_id_mapper_, &edge_count_, config_.items);
vertex_id_ = std::max(vertex_id_.load(), info.next_vertex_id);
edge_id_ = std::max(edge_id_.load(), info.next_edge_id);
timestamp_ = std::max(timestamp_, info.next_timestamp);
DLOG(INFO) << *maybe_wal_path << " loaded successfully";
} catch (const durability::RecoveryFailure &e) {
LOG(FATAL) << "Couldn't recover WAL deltas from " << *maybe_wal_path
<< " because of: " << e.what();
}
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
if (wal_info.seq_num == 0) {
uuid_ = wal_info.uuid;
}
// Check the sequence number of the first WAL file to see if it's the
// finalized form of the replica's current WAL
if (wal_file_) {
if (wal_file_->SequenceNumber() == wal_info.seq_num &&
wal_file_->Path() != path) {
wal_file_->DeleteWal();
}
wal_file_.reset();
}
for (auto i = 1; i < wal_file_number; ++i) {
LoadWal(&decoder, &indices_constraints);
}
durability::RecoverIndicesAndConstraints(indices_constraints, &indices_,
&constraints_, &vertices_);
storage_guard.unlock();
WalFilesRes res;
WalFilesRes res{true, last_commit_timestamp_.load()};
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Register<CurrentWalRpc>(
[this](auto *req_reader, auto *res_builder) {
DLOG(INFO) << "Received CurrentWalRpc";
CurrentWalReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(wal_directory_);
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
durability::RecoveredIndicesAndConstraints indices_constraints;
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
if (wal_info.seq_num == 0) {
uuid_ = wal_info.uuid;
}
if (wal_file_ && wal_file_->SequenceNumber() == wal_info.seq_num &&
wal_file_->Path() != path) {
// Delete the old wal file
file_retainer_.DeleteFile(wal_file_->Path());
}
CHECK(config_.durability.snapshot_wal_mode ==
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL);
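// Reopen the received file as the replica's active WAL so subsequent
// AppendDeltas requests continue appending to it.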
wal_file_.emplace(std::move(path), config_.items, &name_id_mapper_,
wal_info.seq_num, wal_info.from_timestamp,
wal_info.to_timestamp, wal_info.num_deltas,
&file_retainer_);
durability::RecoverIndicesAndConstraints(indices_constraints, &indices_,
&constraints_, &vertices_);
storage_guard.unlock();
CurrentWalRes res{true, last_commit_timestamp_.load()};
slk::Save(res, res_builder);
});
replication_server_->rpc_server->Start();
@ -2381,87 +2584,16 @@ void Storage::RegisterReplica(
// We can safely add new elements to the list because it doesn't invalidate
// existing references/iterators
auto &client = replication_clients_.WithLock([&](auto &clients) -> auto & {
replication_clients_.WithLock([&](auto &clients) {
if (std::any_of(clients.begin(), clients.end(),
[&](auto &client) { return client.Name() == name; })) {
throw utils::BasicException("Replica with a same name already exists!");
}
clients.emplace_back(std::move(name), &name_id_mapper_, config_.items,
endpoint, false, replication_mode);
return clients.back();
clients.emplace_back(std::move(name), last_commit_timestamp_,
&name_id_mapper_, config_.items, &file_retainer_,
snapshot_directory_, wal_directory_, uuid_, &wal_file_,
&engine_lock_, endpoint, false, replication_mode);
});
// Recovery
auto recovery_locker = file_retainer_.AddLocker();
std::optional<std::filesystem::path> snapshot_file;
std::vector<std::filesystem::path> wal_files;
std::optional<uint64_t> latest_seq_num;
{
auto recovery_locker_acc = recovery_locker.Access();
// For now we assume we need to send the latest snapshot
auto snapshot_files =
durability::GetSnapshotFiles(snapshot_directory_, uuid_);
if (!snapshot_files.empty()) {
std::sort(snapshot_files.begin(), snapshot_files.end());
// TODO (antonio2368): Send the last snapshot for now
// check if additional logic is necessary
// Also, prevent the deletion of the snapshot file and the required
// WALs
snapshot_file.emplace(std::move(snapshot_files.back().first));
recovery_locker_acc.AddFile(*snapshot_file);
}
{
// So the wal_file_ isn't overwritten
std::unique_lock engine_guard(engine_lock_);
if (wal_file_) {
// For now let's disable flushing until we send the current WAL
wal_file_->DisableFlushing();
latest_seq_num = wal_file_->SequenceNumber();
}
}
auto maybe_wal_files =
durability::GetWalFiles(wal_directory_, uuid_, latest_seq_num);
CHECK(maybe_wal_files) << "Failed to find WAL files";
auto &wal_files_with_seq = *maybe_wal_files;
std::optional<uint64_t> previous_seq_num;
for (const auto &[seq_num, from_timestamp, to_timestamp, path] :
wal_files_with_seq) {
if (previous_seq_num && *previous_seq_num + 1 != seq_num) {
LOG(FATAL) << "You are missing a WAL file with the sequence number "
<< *previous_seq_num + 1 << "!";
}
previous_seq_num = seq_num;
wal_files.push_back(path);
recovery_locker_acc.AddFile(path);
}
}
if (snapshot_file) {
DLOG(INFO) << "Sending the latest snapshot file: " << *snapshot_file;
client.TransferSnapshot(*snapshot_file);
}
if (!wal_files.empty()) {
DLOG(INFO) << "Sending the latest wal files";
client.TransferWalFiles(wal_files);
}
// We have a current WAL
if (latest_seq_num) {
auto stream = client.TransferCurrentWalFile();
stream.AppendFilename(wal_file_->Path().filename());
utils::InputFile file;
CHECK(file.Open(wal_file_->Path())) << "Failed to open current WAL file!";
const auto [buffer, buffer_size] = wal_file_->CurrentFileBuffer();
stream.AppendSize(file.GetSize() + buffer_size);
stream.AppendFileData(&file);
stream.AppendBufferData(buffer, buffer_size);
stream.Finalize();
wal_file_->EnableFlushing();
}
}
void Storage::UnregisterReplica(const std::string_view name) {

View File

@ -8,6 +8,7 @@
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/edge_accessor.hpp"
@ -312,7 +313,8 @@ class Storage final {
/// transaction violate an existence or unique constraint. In that case the
/// transaction is automatically aborted. Otherwise, void is returned.
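/// When `desired_commit_timestamp` is provided (used by replication), it is
/// used as the transaction's commit timestamp instead of a locally
/// generated one.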
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, void> Commit();
utils::BasicResult<ConstraintViolation, void> Commit(
std::optional<uint64_t> desired_commit_timestamp = {});
/// @throw std::bad_alloc
void Abort();
@ -325,10 +327,6 @@ class Storage final {
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type, storage::Gid gid);
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, void> Commit(
std::optional<uint64_t> desired_commit_timestamp);
#endif
Storage *storage_;
@ -354,14 +352,18 @@ class Storage final {
EdgeTypeId NameToEdgeType(const std::string_view &name);
/// @throw std::bad_alloc
bool CreateIndex(LabelId label);
bool CreateIndex(LabelId label,
std::optional<uint64_t> desired_commit_timestamp = {});
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, PropertyId property);
bool CreateIndex(LabelId label, PropertyId property,
std::optional<uint64_t> desired_commit_timestamp = {});
bool DropIndex(LabelId label);
bool DropIndex(LabelId label,
std::optional<uint64_t> desired_commit_timestamp = {});
bool DropIndex(LabelId label, PropertyId property);
bool DropIndex(LabelId label, PropertyId property,
std::optional<uint64_t> desired_commit_timestamp = {});
IndicesInfo ListAllIndices() const;
@ -372,11 +374,14 @@ class Storage final {
/// @throw std::bad_alloc
/// @throw std::length_error
utils::BasicResult<ConstraintViolation, bool> CreateExistenceConstraint(
LabelId label, PropertyId property);
LabelId label, PropertyId property,
std::optional<uint64_t> desired_commit_timestamp = {});
/// Removes an existence constraint. Returns true if the constraint was
/// removed, and false if it doesn't exist.
bool DropExistenceConstraint(LabelId label, PropertyId property);
bool DropExistenceConstraint(
LabelId label, PropertyId property,
std::optional<uint64_t> desired_commit_timestamp = {});
/// Creates a unique constraint. In the case of two vertices violating the
/// constraint, it returns `ConstraintViolation`. Otherwise returns a
@ -389,7 +394,8 @@ class Storage final {
///
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus>
CreateUniqueConstraint(LabelId label, const std::set<PropertyId> &properties);
CreateUniqueConstraint(LabelId label, const std::set<PropertyId> &properties,
std::optional<uint64_t> desired_commit_timestamp = {});
/// Removes a unique constraint. Returns `UniqueConstraints::DeletionStatus`
/// enum with the following possibilities:
@ -399,7 +405,8 @@ class Storage final {
/// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the
///   limit on the maximum number of properties.
UniqueConstraints::DeletionStatus DropUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties);
LabelId label, const std::set<PropertyId> &properties,
std::optional<uint64_t> desired_commit_timestamp = {});
ConstraintsInfo ListAllConstraints() const;
@ -407,7 +414,7 @@ class Storage final {
#ifdef MG_ENTERPRISE
template <ReplicationRole role, typename... Args>
void SetReplicationRole(Args &&... args) {
void SetReplicationRole(Args &&...args) {
if (replication_role_.load() == role) {
return;
}
@ -450,8 +457,15 @@ class Storage final {
void CreateSnapshot();
uint64_t CommitTimestamp(
std::optional<uint64_t> desired_commit_timestamp = {});
#ifdef MG_ENTERPRISE
void ConfigureReplica(io::network::Endpoint endpoint);
std::pair<durability::WalInfo, std::filesystem::path> LoadWal(
replication::Decoder *decoder,
durability::RecoveredIndicesAndConstraints *indices_constraints);
#endif
// Main storage lock.
@ -531,6 +545,7 @@ class Storage final {
// Replication
#ifdef MG_ENTERPRISE
std::atomic<uint64_t> last_commit_timestamp_{kTimestampInitialId};
utils::RWLock replication_lock_{utils::RWLock::Priority::WRITE};
struct ReplicationServer {

View File

@ -423,16 +423,28 @@ TEST_F(ReplicationTest, RecoveryProcess) {
"MG_test_unit_storage_v2_replication_replica"};
utils::OnScopeExit replica_directory_cleaner(
[&]() { std::filesystem::remove_all(replica_storage_directory); });
constexpr const auto *vertex_label = "vertex_label";
{
storage::Storage replica_store(
{.durability = {.storage_directory = replica_storage_directory}});
{.durability = {.storage_directory = replica_storage_directory,
.recover_on_startup = true,
.snapshot_wal_mode = storage::Config::Durability::
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}});
replica_store.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
main_store.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000});
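// Registering the replica should trigger the recovery process, transferring
// the main's snapshot and WAL files before the replica becomes READY.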
constexpr const auto *vertex_label = "vertex_label";
ASSERT_EQ(main_store.ReplicaState("REPLICA1"),
storage::replication::ReplicaState::RECOVERY);
while (main_store.ReplicaState("REPLICA1") !=
storage::replication::ReplicaState::READY) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
{
auto acc = main_store.Access();
for (const auto &vertex_gid : vertex_gids) {
@ -462,11 +474,12 @@ TEST_F(ReplicationTest, RecoveryProcess) {
ASSERT_FALSE(acc.Commit().HasError());
}
}
// Test recovery of the replica with the files received from Main
{
storage::Storage replica_store(
{.durability = {.storage_directory = replica_storage_directory,
.recover_on_startup = true}});
.recover_on_startup = true,
.snapshot_wal_mode = storage::Config::Durability::
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}});
{
auto acc = replica_store.Access();
for (const auto &vertex_gid : vertex_gids) {
@ -474,9 +487,14 @@ TEST_F(ReplicationTest, RecoveryProcess) {
ASSERT_TRUE(v);
const auto labels = v->Labels(storage::View::OLD);
ASSERT_TRUE(labels.HasValue());
// Labels are received with AppendDeltasRpc so they are not saved on
// disk
ASSERT_EQ(labels->size(), 0);
ASSERT_THAT(*labels, UnorderedElementsAre(
replica_store.NameToLabel(vertex_label)));
const auto properties = v->Properties(storage::View::OLD);
ASSERT_TRUE(properties.HasValue());
ASSERT_THAT(*properties,
UnorderedElementsAre(std::make_pair(
replica_store.NameToProperty(property_name),
storage::PropertyValue(property_value))));
}
ASSERT_FALSE(acc.Commit().HasError());
}
@ -524,27 +542,18 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
ASSERT_EQ(main_store.ReplicaState("REPLICA_ASYNC"),
storage::replication::ReplicaState::RECOVERY);
// Replica should have at least the first vertex
{
auto acc = replica_store_async.Access();
auto v = acc.FindVertex(created_vertices[0], storage::View::OLD);
ASSERT_TRUE(v);
ASSERT_FALSE(acc.Commit().HasError());
while (main_store.ReplicaState("REPLICA_ASYNC") !=
storage::replication::ReplicaState::READY) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Most of the later vertices should be skipped because the
// async replica cannot keep up
ASSERT_FALSE(std::all_of(created_vertices.begin() + 1, created_vertices.end(),
[&](const auto vertex_gid) {
auto acc = replica_store_async.Access();
auto v =
acc.FindVertex(vertex_gid, storage::View::OLD);
const bool exists = v.has_value();
EXPECT_FALSE(acc.Commit().HasError());
return exists;
}));
ASSERT_TRUE(std::all_of(created_vertices.begin(), created_vertices.end(),
[&](const auto vertex_gid) {
auto acc = replica_store_async.Access();
auto v =
acc.FindVertex(vertex_gid, storage::View::OLD);
const bool exists = v.has_value();
EXPECT_FALSE(acc.Commit().HasError());
return exists;
}));
}

View File

@ -12,6 +12,7 @@
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "utils/file.hpp"
#include "utils/file_locker.hpp"
#include "utils/uuid.hpp"
// Helper function used to convert between enum types.
@ -192,7 +193,7 @@ class DeltaGenerator final {
seq_num_(seq_num),
wal_file_(data_directory, uuid_,
{.properties_on_edges = properties_on_edges}, &mapper_,
seq_num) {}
seq_num, &file_retainer_) {}
Transaction CreateTransaction() { return Transaction(this); }
@ -282,6 +283,8 @@ class DeltaGenerator final {
uint64_t tx_from_{0};
uint64_t tx_to_{0};
uint64_t valid_{true};
utils::FileRetainer file_retainer_;
};
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)