Add a lock around replication timeout map in Raft
Summary: Concurent access to the map that contains replication log timeouts caused the HA version to often report replication log timeout errors. Adding locks around the access prevets them from happening. Performance on Apollo reports write speed around 8k/s. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1920
This commit is contained in:
parent
1c768287e8
commit
f97872170a
@ -48,6 +48,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
|
||||
db_recover_on_startup_(db_recover_on_startup),
|
||||
commit_index_(0),
|
||||
last_applied_(0),
|
||||
replication_timeout_(config.replication_timeout),
|
||||
disk_storage_(fs::path(durability_dir) / kRaftDir) {}
|
||||
|
||||
void RaftServer::Start() {
|
||||
@ -409,7 +410,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
|
||||
for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now;
|
||||
|
||||
// From this point on, we can say that the replication of a LogEntry started.
|
||||
replication_timeout_[tx_id] = config_.replication_timeout + now;
|
||||
replication_timeout_.Insert(tx_id);
|
||||
|
||||
state_changed_.notify_all();
|
||||
}
|
||||
@ -435,7 +436,7 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
|
||||
return true;
|
||||
case Mode::LEADER:
|
||||
if (rlog_->is_active(tx_id)) {
|
||||
if (replication_timeout_[tx_id] < Clock::now()) {
|
||||
if (replication_timeout_.CheckTimeout(tx_id)) {
|
||||
throw ReplicationTimeoutException();
|
||||
}
|
||||
|
||||
@ -529,7 +530,7 @@ void RaftServer::Transition(const Mode &new_mode) {
|
||||
|
||||
db_->Reset();
|
||||
ResetReplicationLog();
|
||||
replication_timeout_.clear();
|
||||
replication_timeout_.Clear();
|
||||
|
||||
// Re-apply raft log.
|
||||
uint64_t starting_index = 1;
|
||||
@ -662,7 +663,7 @@ void RaftServer::AdvanceCommitIndex() {
|
||||
<< "Log entry should consist of at least two state deltas.";
|
||||
auto tx_id = deltas[0].transaction_id;
|
||||
rlog_->set_replicated(tx_id);
|
||||
replication_timeout_.erase(tx_id);
|
||||
replication_timeout_.Remove(tx_id);
|
||||
}
|
||||
|
||||
commit_index_ = new_commit_index;
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "raft/raft_interface.hpp"
|
||||
#include "raft/raft_rpc_messages.hpp"
|
||||
#include "raft/replication_log.hpp"
|
||||
#include "raft/replication_timeout_map.hpp"
|
||||
#include "raft/snapshot_metadata.hpp"
|
||||
#include "storage/common/kvstore/kvstore.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
@ -238,7 +239,7 @@ class RaftServer final : public RaftInterface {
|
||||
|
||||
// Tracks timepoints until a transactions is allowed to be in the replication
|
||||
// process.
|
||||
std::unordered_map<tx::TransactionId, TimePoint> replication_timeout_;
|
||||
ReplicationTimeoutMap replication_timeout_;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// persistent state on all servers
|
||||
|
74
src/raft/replication_timeout_map.hpp
Normal file
74
src/raft/replication_timeout_map.hpp
Normal file
@ -0,0 +1,74 @@
|
||||
/// @file
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "transactions/type.hpp"
|
||||
|
||||
namespace raft {
|
||||
|
||||
using Clock = std::chrono::system_clock;
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
|
||||
/// A wrapper around an unordered_map whose reads/writes are protected with a
|
||||
/// lock. It's also specialized to serve the sole purpose of tracking
|
||||
/// replication timeout.
|
||||
class ReplicationTimeoutMap final {
|
||||
public:
|
||||
ReplicationTimeoutMap() = delete;
|
||||
|
||||
ReplicationTimeoutMap(const ReplicationTimeoutMap &) = delete;
|
||||
ReplicationTimeoutMap(ReplicationTimeoutMap &&) = delete;
|
||||
ReplicationTimeoutMap operator=(const ReplicationTimeoutMap &) = delete;
|
||||
ReplicationTimeoutMap operator=(ReplicationTimeoutMap &&) = delete;
|
||||
|
||||
explicit ReplicationTimeoutMap(std::chrono::milliseconds replication_timeout)
|
||||
: replication_timeout_(replication_timeout) {}
|
||||
|
||||
/// Remove all entries from the map.
|
||||
void Clear() {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
timeout_.clear();
|
||||
}
|
||||
|
||||
/// Remove a single entry from the map.
|
||||
void Remove(const tx::TransactionId &tx_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
timeout_.erase(tx_id);
|
||||
}
|
||||
|
||||
/// Inserts and entry in the map by setting a point in time until it needs to
|
||||
/// replicated.
|
||||
void Insert(const tx::TransactionId &tx_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
timeout_.emplace(tx_id, replication_timeout_ + Clock::now());
|
||||
}
|
||||
|
||||
/// Checks if the given entry has timed out.
|
||||
/// @returns bool True if it exceeded timeout, false otherwise.
|
||||
bool CheckTimeout(const tx::TransactionId &tx_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
auto found = timeout_.find(tx_id);
|
||||
// If we didn't set the timeout yet, or we already deleted it, we didn't
|
||||
// time out.
|
||||
if (found == timeout_.end()) return false;
|
||||
if (found->second < Clock::now()) {
|
||||
LOG(INFO) << "Timeout happened for " << tx_id << " "
|
||||
<< std::chrono::system_clock::to_time_t(Clock::now()) << " "
|
||||
<< std::chrono::system_clock::to_time_t(found->second);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::chrono::milliseconds replication_timeout_;
|
||||
|
||||
mutable std::mutex lock_;
|
||||
std::unordered_map<tx::TransactionId, TimePoint> timeout_;
|
||||
};
|
||||
|
||||
} // namespace raft
|
Loading…
Reference in New Issue
Block a user