Load WAL on replica using transactions (#95)

This commit is contained in:
antonio2368 2021-06-16 13:22:48 +02:00 committed by GitHub
parent 644a3a0b2a
commit cbf826e0c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 226 additions and 236 deletions

View File

@ -1,12 +1,33 @@
#include "storage/v2/replication/replication_server.hpp"
#include <atomic>
#include <filesystem>
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/serialization.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/exceptions.hpp"
namespace storage {
namespace {
std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder *decoder) {
try {
auto timestamp = ReadWalDeltaHeader(decoder);
SPDLOG_INFO(" Timestamp {}", timestamp);
auto delta = ReadWalDeltaData(decoder);
return {timestamp, delta};
} catch (const slk::SlkReaderException &) {
throw utils::BasicException("Missing data!");
} catch (const durability::RecoveryFailure &) {
throw utils::BasicException("Invalid data!");
}
};
} // namespace
Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config)
: storage_(storage) {
@ -68,33 +89,6 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
storage_->epoch_id_ = std::move(*maybe_epoch_id);
}
const auto read_delta = [&]() -> std::pair<uint64_t, durability::WalDeltaData> {
try {
auto timestamp = ReadWalDeltaHeader(&decoder);
SPDLOG_INFO(" Timestamp {}", timestamp);
auto delta = ReadWalDeltaData(&decoder);
return {timestamp, delta};
} catch (const slk::SlkReaderException &) {
throw utils::BasicException("Missing data!");
} catch (const durability::RecoveryFailure &) {
throw utils::BasicException("Invalid data!");
}
};
if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) {
// Empty the stream
bool transaction_complete = false;
while (!transaction_complete) {
SPDLOG_INFO("Skipping delta");
const auto [timestamp, delta] = read_delta();
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
}
AppendDeltasRes res{false, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
return;
}
if (storage_->wal_file_) {
if (req.seq_num > storage_->wal_file_->SequenceNumber() || *maybe_epoch_id != storage_->epoch_id_) {
storage_->wal_file_->FinalizeWal();
@ -108,6 +102,173 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
storage_->wal_seq_num_ = req.seq_num;
}
if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) {
// Empty the stream
bool transaction_complete = false;
while (!transaction_complete) {
SPDLOG_INFO("Skipping delta");
const auto [timestamp, delta] = ReadDelta(&decoder);
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
}
AppendDeltasRes res{false, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
return;
}
ReadAndApplyDelta(&decoder);
AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
SnapshotReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(storage_->snapshot_directory_);
MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
// Clear the database
storage_->vertices_.clear();
storage_->edges_.clear();
storage_->constraints_ = Constraints();
storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
storage_->indices_.label_property_index =
LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
try {
spdlog::debug("Loading snapshot");
auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
&storage_->epoch_history_, &storage_->name_id_mapper_,
&storage_->edge_count_, storage_->config_.items);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost
storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
const auto &recovery_info = recovered_snapshot.recovery_info;
storage_->vertex_id_ = recovery_info.next_vertex_id;
storage_->edge_id_ = recovery_info.next_edge_id;
storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
&storage_->constraints_, &storage_->vertices_);
} catch (const durability::RecoveryFailure &e) {
LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
}
storage_guard.unlock();
SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
// Delete other durability files
auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
for (const auto &[path, uuid, _] : snapshot_files) {
if (path != *maybe_snapshot_path) {
storage_->file_retainer_.DeleteFile(path);
}
}
auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_);
if (wal_files) {
for (const auto &wal_file : *wal_files) {
storage_->file_retainer_.DeleteFile(wal_file.path);
}
storage_->wal_file_.reset();
}
}
void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
WalFilesReq req;
slk::Load(&req, req_reader);
const auto wal_file_number = req.file_number;
spdlog::debug("Received WAL files: {}", wal_file_number);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->wal_directory_);
for (auto i = 0; i < wal_file_number; ++i) {
LoadWal(&decoder);
}
WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
CurrentWalReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->wal_directory_);
LoadWal(&decoder);
CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory;
utils::EnsureDir(temp_wal_directory);
auto maybe_wal_path = decoder->ReadFile(temp_wal_directory);
MG_ASSERT(maybe_wal_path, "Failed to load WAL!");
spdlog::trace("Received WAL saved to {}", *maybe_wal_path);
try {
auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
if (wal_info.seq_num == 0) {
storage_->uuid_ = wal_info.uuid;
}
if (wal_info.epoch_id != storage_->epoch_id_) {
storage_->epoch_history_.emplace_back(wal_info.epoch_id, storage_->last_commit_timestamp_);
storage_->epoch_id_ = std::move(wal_info.epoch_id);
}
if (storage_->wal_file_) {
if (storage_->wal_file_->SequenceNumber() != wal_info.seq_num) {
storage_->wal_file_->FinalizeWal();
storage_->wal_seq_num_ = wal_info.seq_num;
storage_->wal_file_.reset();
}
} else {
storage_->wal_seq_num_ = wal_info.seq_num;
}
durability::Decoder wal;
const auto version = wal.Initialize(*maybe_wal_path, durability::kWalMagic);
if (!version) throw durability::RecoveryFailure("Couldn't read WAL magic and/or version!");
if (!durability::IsVersionSupported(*version)) throw durability::RecoveryFailure("Invalid WAL version!");
wal.SetPosition(wal_info.offset_deltas);
for (size_t i = 0; i < wal_info.num_deltas;) {
i += ReadAndApplyDelta(&wal);
}
spdlog::debug("{} loaded successfully", *maybe_wal_path);
} catch (const durability::RecoveryFailure &e) {
LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", *maybe_wal_path, e.what());
}
}
Storage::ReplicationServer::~ReplicationServer() {
if (rpc_server_) {
rpc_server_->Shutdown();
rpc_server_->AwaitShutdown();
}
}
uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
auto edge_acc = storage_->edges_.access();
auto vertex_acc = storage_->vertices_.access();
@ -121,11 +282,22 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
return &commit_timestamp_and_accessor->second;
};
bool transaction_complete = false;
for (uint64_t i = 0; !transaction_complete; ++i) {
SPDLOG_INFO(" Delta {}", i);
const auto [timestamp, delta] = read_delta();
uint64_t applied_deltas = 0;
auto max_commit_timestamp = storage_->last_commit_timestamp_.load();
for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) {
const auto [timestamp, delta] = ReadDelta(decoder);
if (timestamp > max_commit_timestamp) {
max_commit_timestamp = timestamp;
}
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
if (timestamp < storage_->timestamp_) {
continue;
}
SPDLOG_INFO(" Delta {}", applied_deltas);
switch (delta.type) {
case durability::WalDeltaData::Type::VERTEX_CREATE: {
spdlog::trace(" Create vertex {}", delta.vertex_create_delete.gid.AsUint());
@ -368,189 +540,12 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
break;
}
}
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
}
if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid data!");
AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
storage_->last_commit_timestamp_ = max_commit_timestamp;
void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
SnapshotReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(storage_->snapshot_directory_);
MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
{
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
// Clear the database
storage_->vertices_.clear();
storage_->edges_.clear();
storage_->constraints_ = Constraints();
storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
storage_->indices_.label_property_index =
LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
try {
spdlog::debug("Loading snapshot");
auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
&storage_->epoch_history_, &storage_->name_id_mapper_,
&storage_->edge_count_, storage_->config_.items);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost
storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
const auto &recovery_info = recovered_snapshot.recovery_info;
storage_->vertex_id_ = recovery_info.next_vertex_id;
storage_->edge_id_ = recovery_info.next_edge_id;
storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
storage_->commit_log_.emplace(storage_->timestamp_);
durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
&storage_->constraints_, &storage_->vertices_);
} catch (const durability::RecoveryFailure &e) {
LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
}
}
SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
// Delete other durability files
auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
for (const auto &[path, uuid, _] : snapshot_files) {
if (path != *maybe_snapshot_path) {
storage_->file_retainer_.DeleteFile(path);
}
}
auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_);
if (wal_files) {
for (const auto &wal_file : *wal_files) {
storage_->file_retainer_.DeleteFile(wal_file.path);
}
storage_->wal_file_.reset();
}
}
void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
WalFilesReq req;
slk::Load(&req, req_reader);
const auto wal_file_number = req.file_number;
spdlog::debug("Received WAL files: {}", wal_file_number);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->wal_directory_);
{
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
durability::RecoveredIndicesAndConstraints indices_constraints;
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
if (wal_info.seq_num == 0) {
storage_->uuid_ = wal_info.uuid;
}
// Check the seq number of the first wal file to see if it's the
// finalized form of the current wal on replica
if (storage_->wal_file_) {
if (storage_->wal_file_->SequenceNumber() == wal_info.seq_num && storage_->wal_file_->Path() != path) {
storage_->wal_file_->DeleteWal();
}
storage_->wal_file_.reset();
}
for (auto i = 1; i < wal_file_number; ++i) {
LoadWal(&decoder, &indices_constraints);
}
durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
&storage_->vertices_);
}
WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
CurrentWalReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->wal_directory_);
{
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
durability::RecoveredIndicesAndConstraints indices_constraints;
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
if (wal_info.seq_num == 0) {
storage_->uuid_ = wal_info.uuid;
}
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == wal_info.seq_num &&
storage_->wal_file_->Path() != path) {
// Delete the old wal file
storage_->file_retainer_.DeleteFile(storage_->wal_file_->Path());
}
MG_ASSERT(storage_->config_.durability.snapshot_wal_mode ==
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL);
storage_->wal_file_.emplace(std::move(path), storage_->config_.items, &storage_->name_id_mapper_, wal_info.seq_num,
wal_info.from_timestamp, wal_info.to_timestamp, wal_info.num_deltas,
&storage_->file_retainer_);
durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
&storage_->vertices_);
}
CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
std::pair<durability::WalInfo, std::filesystem::path> Storage::ReplicationServer::LoadWal(
replication::Decoder *decoder, durability::RecoveredIndicesAndConstraints *indices_constraints) {
auto maybe_wal_path = decoder->ReadFile(storage_->wal_directory_, "_MAIN");
MG_ASSERT(maybe_wal_path, "Failed to load WAL!");
spdlog::trace("Received WAL saved to {}", *maybe_wal_path);
try {
auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
if (wal_info.epoch_id != storage_->epoch_id_) {
storage_->epoch_history_.emplace_back(wal_info.epoch_id, storage_->last_commit_timestamp_);
storage_->epoch_id_ = std::move(wal_info.epoch_id);
}
const auto last_loaded_timestamp =
storage_->timestamp_ == kTimestampInitialId ? std::nullopt : std::optional{storage_->timestamp_ - 1};
auto info = durability::LoadWal(*maybe_wal_path, indices_constraints, last_loaded_timestamp, &storage_->vertices_,
&storage_->edges_, &storage_->name_id_mapper_, &storage_->edge_count_,
storage_->config_.items);
storage_->vertex_id_ = std::max(storage_->vertex_id_.load(), info.next_vertex_id);
storage_->edge_id_ = std::max(storage_->edge_id_.load(), info.next_edge_id);
storage_->timestamp_ = std::max(storage_->timestamp_, info.next_timestamp);
storage_->commit_log_.emplace(storage_->timestamp_);
if (info.last_commit_timestamp) {
storage_->last_commit_timestamp_ = *info.last_commit_timestamp;
}
spdlog::debug("{} loaded successfully", *maybe_wal_path);
return {std::move(wal_info), std::move(*maybe_wal_path)};
} catch (const durability::RecoveryFailure &e) {
LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", *maybe_wal_path, e.what());
}
}
Storage::ReplicationServer::~ReplicationServer() {
if (rpc_server_) {
rpc_server_->Shutdown();
rpc_server_->AwaitShutdown();
}
return applied_deltas;
}
} // namespace storage

View File

@ -23,8 +23,8 @@ class Storage::ReplicationServer {
void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder);
std::pair<durability::WalInfo, std::filesystem::path> LoadWal(
replication::Decoder *decoder, durability::RecoveredIndicesAndConstraints *indices_constraints);
void LoadWal(replication::Decoder *decoder);
uint64_t ReadAndApplyDelta(durability::BaseDecoder *decoder);
std::optional<communication::ServerContext> rpc_server_context_;
std::optional<rpc::Server> rpc_server_;

View File

@ -118,8 +118,10 @@
(filter #(= :ok (:type %)))
(filter #(= :read (:f %))))
bad-reads (->> ok-reads
(map #(->> % :value :accounts))
(filter #(= (count %) 5))
(map (fn [op]
(let [balances (->> op :value :accounts (map :balance))
(let [balances (map :balance op)
expected-total (* account-num starting-balance)]
(cond (and
(not-empty balances)

View File

@ -9,19 +9,7 @@
[jepsen.memgraph.client :as c]))
(dbclient/defquery get-all-nodes
"MATCH (n:Node) RETURN n;")
(dbclient/defquery get-max-id
"MATCH (n:Node)
RETURN n.id AS id
ORDER BY id DESC
LIMIT 1;")
(dbclient/defquery get-min-id
"MATCH (n:Node)
RETURN n.id AS id
ORDER BY id
LIMIT 1;")
"MATCH (n:Node) RETURN n ORDER BY n.id;")
(dbclient/defquery create-node
"CREATE (n:Node {id: $id});")
@ -29,19 +17,23 @@
(dbclient/defquery delete-node-with-id
"MATCH (n:Node {id: $id}) DELETE n;")
(def next-node-for-add (atom 0))
(defn add-next-node
"Add a new node with its id set to the next highest"
[conn]
(dbclient/with-transaction conn tx
(let [max-id (-> (get-max-id tx) first :id)]
(create-node tx {:id (inc max-id)}))))
(when (dbclient/with-transaction conn tx
(create-node tx {:id (swap! next-node-for-add identity)}))
(swap! next-node-for-add inc)))
(def next-node-for-delete (atom 0))
(defn delete-oldest-node
"Delete a node with the lowest id"
[conn]
(dbclient/with-transaction conn tx
(let [min-id (-> (get-min-id tx) first :id)]
(delete-node-with-id tx {:id min-id}))))
(when (dbclient/with-transaction conn tx
(delete-node-with-id tx {:id (swap! next-node-for-delete identity)}))
(swap! next-node-for-delete inc)))
(c/replication-client Client []
(open! [this test node]
@ -123,11 +115,12 @@
(when (not-empty ids)
(cond ((complement strictly-increasing) ids)
{:type :not-increasing-ids
:op op}
((complement increased-by-1) ids)
{:type :ids-missing
:op op})))))
;; if there are multiple threads not sure how to guarante that the ids are created in order
;;((complement increased-by-1) ids)
;;{:type :ids-missing
;; :op op})))))
(filter identity)
(into []))
empty-nodes (let [all-nodes (->> ok-reads