From f97872170a2321be34eca98ecdca6a8ebab43b2d Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Tue, 12 Mar 2019 16:20:52 +0100 Subject: [PATCH] Add a lock around replication timeout map in Raft Summary: Concurent access to the map that contains replication log timeouts caused the HA version to often report replication log timeout errors. Adding locks around the access prevets them from happening. Performance on Apollo reports write speed around 8k/s. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1920 --- src/raft/raft_server.cpp | 9 ++-- src/raft/raft_server.hpp | 3 +- src/raft/replication_timeout_map.hpp | 74 ++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 src/raft/replication_timeout_map.hpp diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index f7cc9809a..6dba8973c 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -48,6 +48,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, db_recover_on_startup_(db_recover_on_startup), commit_index_(0), last_applied_(0), + replication_timeout_(config.replication_timeout), disk_storage_(fs::path(durability_dir) / kRaftDir) {} void RaftServer::Start() { @@ -409,7 +410,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id, for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now; // From this point on, we can say that the replication of a LogEntry started. - replication_timeout_[tx_id] = config_.replication_timeout + now; + replication_timeout_.Insert(tx_id); state_changed_.notify_all(); } @@ -435,7 +436,7 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { return true; case Mode::LEADER: if (rlog_->is_active(tx_id)) { - if (replication_timeout_[tx_id] < Clock::now()) { + if (replication_timeout_.CheckTimeout(tx_id)) { throw ReplicationTimeoutException(); } @@ -529,7 +530,7 @@ void RaftServer::Transition(const Mode &new_mode) { db_->Reset(); ResetReplicationLog(); - replication_timeout_.clear(); + replication_timeout_.Clear(); // Re-apply raft log. uint64_t starting_index = 1; @@ -662,7 +663,7 @@ void RaftServer::AdvanceCommitIndex() { << "Log entry should consist of at least two state deltas."; auto tx_id = deltas[0].transaction_id; rlog_->set_replicated(tx_id); - replication_timeout_.erase(tx_id); + replication_timeout_.Remove(tx_id); } commit_index_ = new_commit_index; diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 9fb10e6e6..06736106e 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -16,6 +16,7 @@ #include "raft/raft_interface.hpp" #include "raft/raft_rpc_messages.hpp" #include "raft/replication_log.hpp" +#include "raft/replication_timeout_map.hpp" #include "raft/snapshot_metadata.hpp" #include "storage/common/kvstore/kvstore.hpp" #include "transactions/type.hpp" @@ -238,7 +239,7 @@ class RaftServer final : public RaftInterface { // Tracks timepoints until a transactions is allowed to be in the replication // process. - std::unordered_map replication_timeout_; + ReplicationTimeoutMap replication_timeout_; ////////////////////////////////////////////////////////////////////////////// // persistent state on all servers diff --git a/src/raft/replication_timeout_map.hpp b/src/raft/replication_timeout_map.hpp new file mode 100644 index 000000000..0d2ce6985 --- /dev/null +++ b/src/raft/replication_timeout_map.hpp @@ -0,0 +1,74 @@ +/// @file +#pragma once + +#include +#include +#include + +#include "transactions/type.hpp" + +namespace raft { + +using Clock = std::chrono::system_clock; +using TimePoint = std::chrono::system_clock::time_point; + +/// A wrapper around an unordered_map whose reads/writes are protected with a +/// lock. It's also specialized to serve the sole purpose of tracking +/// replication timeout. +class ReplicationTimeoutMap final { + public: + ReplicationTimeoutMap() = delete; + + ReplicationTimeoutMap(const ReplicationTimeoutMap &) = delete; + ReplicationTimeoutMap(ReplicationTimeoutMap &&) = delete; + ReplicationTimeoutMap operator=(const ReplicationTimeoutMap &) = delete; + ReplicationTimeoutMap operator=(ReplicationTimeoutMap &&) = delete; + + explicit ReplicationTimeoutMap(std::chrono::milliseconds replication_timeout) + : replication_timeout_(replication_timeout) {} + + /// Remove all entries from the map. + void Clear() { + std::lock_guard guard(lock_); + timeout_.clear(); + } + + /// Remove a single entry from the map. + void Remove(const tx::TransactionId &tx_id) { + std::lock_guard guard(lock_); + timeout_.erase(tx_id); + } + + /// Inserts and entry in the map by setting a point in time until it needs to + /// replicated. + void Insert(const tx::TransactionId &tx_id) { + std::lock_guard guard(lock_); + timeout_.emplace(tx_id, replication_timeout_ + Clock::now()); + } + + /// Checks if the given entry has timed out. + /// @returns bool True if it exceeded timeout, false otherwise. + bool CheckTimeout(const tx::TransactionId &tx_id) { + std::lock_guard guard(lock_); + auto found = timeout_.find(tx_id); + // If we didn't set the timeout yet, or we already deleted it, we didn't + // time out. + if (found == timeout_.end()) return false; + if (found->second < Clock::now()) { + LOG(INFO) << "Timeout happened for " << tx_id << " " + << std::chrono::system_clock::to_time_t(Clock::now()) << " " + << std::chrono::system_clock::to_time_t(found->second); + return true; + } else { + return false; + } + } + + private: + std::chrono::milliseconds replication_timeout_; + + mutable std::mutex lock_; + std::unordered_map timeout_; +}; + +} // namespace raft