Fix epoch id handling (#73)

This commit is contained in:
antonio2368 2021-01-12 19:08:08 +01:00 committed by Antonio Andelic
parent b19cd4f5d1
commit 10c8256ec9
5 changed files with 44 additions and 89 deletions

View File

@ -969,9 +969,7 @@ void CreateSnapshot(
for (uint64_t i = 0; i < *pos; ++i) {
const auto &[seq_num, from_timestamp, to_timestamp, wal_path] =
wal_files[i];
if (!utils::DeleteFile(wal_path)) {
LOG(WARNING) << "Couldn't delete WAL file " << wal_path << "!";
}
file_retainer->DeleteFile(wal_path);
}
}
}

View File

@ -6,6 +6,7 @@
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/file_locker.hpp"
namespace storage {
@ -39,23 +40,40 @@ Storage::ReplicationClient::ReplicationClient(
/// @throws rpc::RpcFailedException
void Storage::ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
auto stream{
rpc_client_->Stream<HeartbeatRpc>(storage_->last_commit_timestamp_)};
replication::Encoder encoder{stream.GetBuilder()};
// Write epoch id
std::optional<std::string> epoch_id;
{
// We need to lock so the epoch id isn't overwritten
std::unique_lock engine_guard{storage_->engine_lock_};
encoder.WriteString(storage_->epoch_id_);
// epoch_id_ can be changed if we don't take this lock
std::unique_lock engine_guard(storage_->engine_lock_);
epoch_id.emplace(storage_->epoch_id_);
}
auto stream{rpc_client_->Stream<HeartbeatRpc>(
storage_->last_commit_timestamp_, std::move(*epoch_id))};
const auto response = stream.AwaitResponse();
if (!response.success) {
LOG(ERROR)
<< "Replica " << name_
<< " is ahead of this instance. The branching point is on commit "
<< response.current_commit_timestamp;
std::optional<uint64_t> branching_point;
if (response.epoch_id != storage_->epoch_id_ &&
response.current_commit_timestamp != kTimestampInitialId) {
const auto &epoch_history = storage_->epoch_history_;
const auto epoch_info_iter =
std::find_if(epoch_history.crbegin(), epoch_history.crend(),
[&](const auto &epoch_info) {
return epoch_info.first == response.epoch_id;
});
if (epoch_info_iter == epoch_history.crend()) {
branching_point = 0;
} else if (epoch_info_iter->second != response.current_commit_timestamp) {
branching_point = epoch_info_iter->second;
}
}
if (branching_point) {
LOG(ERROR) << "Replica " << name_ << " cannot be used with this instance."
<< " Please start a clean instance of Memgraph server"
<< " on the specified endpoint.";
return;
}
current_commit_timestamp = response.current_commit_timestamp;
DLOG(INFO) << "Current timestamp on replica: " << current_commit_timestamp;
DLOG(INFO) << "Current MAIN timestamp: "

View File

@ -62,43 +62,8 @@ void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader,
slk::Builder *res_builder) {
HeartbeatReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder{req_reader};
auto maybe_epoch_id = decoder.ReadString();
CHECK(maybe_epoch_id) << "Invalid value read form HeartbeatRpc!";
if (storage_->last_commit_timestamp_ == kTimestampInitialId) {
// The replica has no commits
// use the main's epoch id
storage_->epoch_id_ = std::move(*maybe_epoch_id);
} else if (*maybe_epoch_id != storage_->epoch_id_) {
auto &epoch_history = storage_->epoch_history_;
const auto result =
std::find_if(epoch_history.rbegin(), epoch_history.rend(),
[&](const auto &epoch_info) {
return epoch_info.first == *maybe_epoch_id;
});
auto branching_point = kTimestampInitialId;
if (result == epoch_history.rend()) {
// We couldn't find the main's epoch id in our epoch history. If the main
// has the same or a larger commit timestamp than this replica, we assume
// that some old replica became the main.
// This isn't always the case: an old main can become a replica and then a
// main again, in which case it should have a commit timestamp larger than
// the one on this replica.
if (req.main_commit_timestamp >= storage_->last_commit_timestamp_) {
epoch_history.emplace_back(std::move(storage_->epoch_id_),
storage_->last_commit_timestamp_);
storage_->epoch_id_ = std::move(*maybe_epoch_id);
HeartbeatRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
return;
}
} else {
branching_point = result->second;
}
HeartbeatRes res{false, branching_point};
slk::Save(res, res_builder);
return;
}
HeartbeatRes res{true, storage_->last_commit_timestamp_.load()};
HeartbeatRes res{true, storage_->last_commit_timestamp_.load(),
storage_->epoch_id_};
slk::Save(res, res_builder);
}
@ -112,12 +77,11 @@ void Storage::ReplicationServer::AppendDeltasHandler(
auto maybe_epoch_id = decoder.ReadString();
CHECK(maybe_epoch_id) << "Invalid replication message";
// Different epoch ids should not be possible in AppendDeltas
// because Recovery and Heartbeat handlers should resolve
// any issues with timestamp and epoch id
CHECK(*maybe_epoch_id == storage_->epoch_id_)
<< "Received Deltas from transaction with incompatible"
" epoch id";
if (*maybe_epoch_id != storage_->epoch_id_) {
storage_->epoch_history_.emplace_back(std::move(storage_->epoch_id_),
storage_->last_commit_timestamp_);
storage_->epoch_id_ = std::move(*maybe_epoch_id);
}
const auto read_delta =
[&]() -> std::pair<uint64_t, durability::WalDeltaData> {

View File

@ -3,6 +3,7 @@
#include <cstdint>
#include <cstring>
#include <string>
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
@ -23,10 +24,13 @@ cpp<#
(current-commit-timestamp :uint64_t))))
(lcp:define-rpc heartbeat
(:request ((main-commit-timestamp :uint64_t)))
(:request
((main-commit-timestamp :uint64_t)
(epoch-id "std::string")))
(:response
((success :bool)
(current-commit-timestamp :uint64_t))))
(current-commit-timestamp :uint64_t)
(epoch-id "std::string"))))
(lcp:define-rpc snapshot
(:request ())

View File

@ -30,12 +30,6 @@
#include "storage/v2/replication/rpc.hpp"
#endif
#ifdef MG_ENTERPRISE
DEFINE_bool(main, false, "Set to true to be the main");
DEFINE_bool(replica, false, "Set to true to be the replica");
DEFINE_bool(async_replica, false, "Set to true to be the replica");
#endif
namespace storage {
namespace {
@ -421,29 +415,6 @@ Storage::Storage(Config config)
gc_runner_.Run("Storage GC", config_.gc.interval,
[this] { this->CollectGarbage(); });
}
#ifdef MG_ENTERPRISE
// For testing purposes until we can define the instance type from
// a query.
if (FLAGS_main) {
if (RegisterReplica("REPLICA_SYNC",
io::network::Endpoint{"127.0.0.1", 10000},
replication::ReplicationMode::SYNC)
.HasError()) {
LOG(WARNING) << "Couldn't connect to REPLICA_SYNC";
}
if (RegisterReplica("REPLICA_ASYNC",
io::network::Endpoint{"127.0.0.1", 10002},
replication::ReplicationMode::ASYNC)
.HasError()) {
LOG(WARNING) << "Couldn't connect to REPLICA_SYNC";
}
} else if (FLAGS_replica) {
SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
} else if (FLAGS_async_replica) {
SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10002});
}
#endif
}
Storage::~Storage() {