Improve error handling (#56)

This commit is contained in:
antonio2368 2020-12-04 10:20:13 +01:00 committed by Antonio Andelic
parent 200ce5f45e
commit f7b764607d
5 changed files with 120 additions and 94 deletions

View File

@ -165,7 +165,7 @@ RecoveredSnapshot LoadSnapshot(
if (!version)
throw RecoveryFailure("Couldn't read snapshot magic and/or version!");
if (!IsVersionSupported(*version))
throw RecoveryFailure("Invalid snapshot version!");
throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
// Cleanup of loaded data in case of failure.
bool success = false;
@ -173,6 +173,7 @@ RecoveredSnapshot LoadSnapshot(
if (!success) {
edges->clear();
vertices->clear();
epoch_history->clear();
}
});

View File

@ -28,14 +28,15 @@ Storage::ReplicationClient::ReplicationClient(
}
rpc_client_.emplace(endpoint, &*rpc_context_);
InitializeClient();
TryInitializeClient();
if (config.timeout) {
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
timeout_.emplace(*config.timeout);
timeout_dispatcher_.emplace();
}
}
/// @throws rpc::RpcFailedException
void Storage::ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
auto stream{
@ -74,6 +75,25 @@ void Storage::ReplicationClient::InitializeClient() {
}
}
void Storage::ReplicationClient::TryInitializeClient() {
try {
InitializeClient();
} catch (const rpc::RpcFailedException &) {
std::unique_lock client_guarde{client_lock_};
replica_state_.store(replication::ReplicaState::INVALID);
LOG(ERROR) << "Failed to connect to replica " << name_ << " at "
<< rpc_client_->Endpoint();
}
}
void Storage::ReplicationClient::HandleRpcFailure() {
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClient();
});
}
SnapshotRes Storage::ReplicationClient::TransferSnapshot(
const std::filesystem::path &path) {
auto stream{rpc_client_->Stream<SnapshotRpc>()};
@ -103,44 +123,39 @@ OnlySnapshotRes Storage::ReplicationClient::TransferOnlySnapshot(
return stream.AwaitResponse();
}
bool Storage::ReplicationClient::StartTransactionReplication(
void Storage::ReplicationClient::StartTransactionReplication(
const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_);
const auto status = replica_state_.load();
switch (status) {
case replication::ReplicaState::RECOVERY:
DLOG(INFO) << "Replica " << name_ << " is behind MAIN instance";
return false;
return;
case replication::ReplicaState::REPLICATING:
DLOG(INFO) << "Replica missed a transaction, going to recovery";
DLOG(INFO) << "Replica " << name_ << " missed a transaction";
// We missed a transaction because we're still replicating
// the previous transaction so we need to go to RECOVERY
// state to catch up with the missing transaction
// We cannot queue the recovery process here because
// an error can happen while we're replicating the previous
// transaction after which the client should go to
// INVALID state before starting the recovery process
replica_state_.store(replication::ReplicaState::RECOVERY);
// If it's in replicating state, it should have been up to date with all
// the commits until now so the replica should contain the
// last_commit_timestamp
thread_pool_.AddTask([=, this] {
this->RecoverReplica(storage_->last_commit_timestamp_.load());
});
return false;
return;
case replication::ReplicaState::INVALID:
LOG(ERROR) << "Couldn't replicate data to " << name_;
return false;
HandleRpcFailure();
return;
case replication::ReplicaState::READY:
CHECK(!replica_stream_);
try {
replica_stream_.emplace(
ReplicaStream{this, storage_->last_commit_timestamp_.load(),
current_wal_seq_num});
} catch (const rpc::RpcFailedException &) {
replica_state_.store(replication::ReplicaState::INVALID);
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_->Abort();
InitializeClient();
});
return false;
}
replica_state_.store(replication::ReplicaState::REPLICATING);
return true;
} catch (const rpc::RpcFailedException &) {
HandleRpcFailure();
}
return;
}
}
@ -150,11 +165,7 @@ void Storage::ReplicationClient::IfStreamingTransaction(
try {
callback(*replica_stream_);
} catch (const rpc::RpcFailedException &) {
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_->Abort();
InitializeClient();
});
HandleRpcFailure();
}
}
}
@ -209,25 +220,16 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
if (replica_stream_) {
try {
auto response = replica_stream_->Finalize();
if (!response.success) {
{
std::unique_lock client_guard{client_lock_};
if (!response.success ||
replica_state_ == replication::ReplicaState::RECOVERY) {
replica_state_.store(replication::ReplicaState::RECOVERY);
}
thread_pool_.AddTask([&, this] {
this->RecoverReplica(response.current_commit_timestamp);
});
}
} catch (const rpc::RpcFailedException &) {
LOG(ERROR) << "Couldn't replicate data to " << name_;
{
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::INVALID);
}
thread_pool_.AddTask([this] {
rpc_client_->Abort();
InitializeClient();
});
HandleRpcFailure();
}
replica_stream_.reset();
}
@ -244,6 +246,7 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
const auto steps = GetRecoverySteps(replica_commit, &file_locker);
for (const auto &recovery_step : steps) {
try {
std::visit(
[&, this]<typename T>(T &&arg) {
using StepType = std::remove_cvref_t<T>;
@ -251,13 +254,16 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
DLOG(INFO) << "Sending the latest snapshot file: " << arg;
auto response = TransferSnapshot(arg);
replica_commit = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: "
<< replica_commit;
} else if constexpr (std::is_same_v<StepType, RecoveryWals>) {
DLOG(INFO) << "Sending the latest wal files";
auto response = TransferWalFiles(arg);
replica_commit = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit;
} else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) {
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: "
<< replica_commit;
} else if constexpr (std::is_same_v<StepType,
RecoveryCurrentWal>) {
std::unique_lock transaction_guard(storage_->engine_lock_);
if (storage_->wal_file_ &&
storage_->wal_file_->SequenceNumber() ==
@ -266,8 +272,8 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
transaction_guard.unlock();
DLOG(INFO) << "Sending current wal file";
replica_commit = ReplicateCurrentWal();
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: "
<< replica_commit;
DLOG(INFO)
<< "CURRENT TIMESTAMP ON REPLICA: " << replica_commit;
storage_->wal_file_->EnableFlushing();
}
} else if constexpr (std::is_same_v<StepType,
@ -283,6 +289,9 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
}
},
recovery_step);
} catch (const rpc::RpcFailedException &) {
HandleRpcFailure();
}
}
if (storage_->last_commit_timestamp_.load() == replica_commit) {
@ -391,7 +400,6 @@ Storage::ReplicationClient::GetRecoverySteps(
return recovery_steps;
}
++rwal_it;
uint64_t previous_seq_num{rwal_it->seq_num};
for (; rwal_it != wal_files->rend(); ++rwal_it) {
// If the difference between two consecutive wal files is not 0 or 1
@ -403,11 +411,12 @@ Storage::ReplicationClient::GetRecoverySteps(
// Find first WAL that contains up to replica commit, i.e. WAL
// that is before the replica commit or conatins the replica commit
// as the last committed transaction.
if (replica_commit >= rwal_it->from_timestamp &&
replica_commit >= rwal_it->to_timestamp) {
if (replica_commit >= rwal_it->from_timestamp) {
if (replica_commit >= rwal_it->to_timestamp) {
// We want the WAL after because the replica already contains all the
// commits from this WAL
--rwal_it;
}
std::vector<std::filesystem::path> wal_chain;
auto distance_from_first = std::distance(rwal_it, wal_files->rend() - 1);
// We have managed to create WAL chain

View File

@ -90,7 +90,7 @@ class Storage::ReplicationClient {
rpc::Client::StreamHandler<CurrentWalRpc> stream_;
};
bool StartTransactionReplication(uint64_t current_wal_seq_num);
void StartTransactionReplication(uint64_t current_wal_seq_num);
// Replication clients can be removed at any point
// so to avoid any complexity of checking if the client was removed whenever
@ -155,6 +155,10 @@ class Storage::ReplicationClient {
void InitializeClient();
void TryInitializeClient();
void HandleRpcFailure();
std::string name_;
Storage *storage_;
@ -190,6 +194,19 @@ class Storage::ReplicationClient {
std::optional<TimeoutDispatcher> timeout_dispatcher_;
utils::SpinLock client_lock_;
// This thread pool is used for background tasks so we don't
// block the main storage thread
// We use only 1 thread for 2 reasons:
// - background tasks ALWAYS contain some kind of RPC communication.
// We can't have multiple RPC communication from a same client
// because that's not logically valid (e.g. you cannot send a snapshot
// and WAL at a same time because WAL will arrive earlier and be applied
// before the snapshot which is not correct)
// - the implementation is simplified as we have a total control of what
// this pool is executing. Also, we can simply queue multiple tasks
// and be sure of the execution order.
// Not having mulitple possible threads in the same client allows us
// to ignore concurrency problems inside the client.
utils::ThreadPool thread_pool_{1};
std::atomic<replication::ReplicaState> replica_state_{
replication::ReplicaState::INVALID};

View File

@ -112,6 +112,13 @@ void Storage::ReplicationServer::AppendDeltasHandler(
auto maybe_epoch_id = decoder.ReadString();
CHECK(maybe_epoch_id) << "Invalid replication message";
// Different epoch ids should not be possible in AppendDeltas
// because Recovery and Heartbeat handlers should resolve
// any issues with timestamp and epoch id
CHECK(*maybe_epoch_id == storage_->epoch_id_)
<< "Received Deltas from transaction with incompatible"
" epoch id";
const auto read_delta =
[&]() -> std::pair<uint64_t, durability::WalDeltaData> {
try {
@ -126,11 +133,6 @@ void Storage::ReplicationServer::AppendDeltasHandler(
}
};
// TODO (antonio2368): Add error handling for different epoch id
if (*maybe_epoch_id != storage_->epoch_id_) {
throw utils::BasicException("Invalid epoch id");
}
if (req.previous_commit_timestamp !=
storage_->last_commit_timestamp_.load()) {
// Empty the stream
@ -532,7 +534,6 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader,
storage_->edges_.clear();
storage_->constraints_ = Constraints();
// TODO (antonio2368): Check if there's a less hacky way
storage_->indices_.label_index = LabelIndex(
&storage_->indices_, &storage_->constraints_, storage_->config_.items);
storage_->indices_.label_property_index = LabelPropertyIndex(
@ -558,8 +559,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader,
recovered_snapshot.indices_constraints, &storage_->indices_,
&storage_->constraints_, &storage_->vertices_);
} catch (const durability::RecoveryFailure &e) {
// TODO (antonio2368): What to do if the sent snapshot is invalid
LOG(WARNING) << "Couldn't load the snapshot because of: " << e.what();
LOG(FATAL) << "Couldn't load the snapshot because of: " << e.what();
}
storage_->last_commit_timestamp_ = storage_->timestamp_ - 1;
storage_guard.unlock();

View File

@ -2003,7 +2003,6 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
std::string name, io::network::Endpoint endpoint,
const replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config) {
// TODO (antonio2368): This shouldn't stop the main instance
CHECK(replication_role_.load() == ReplicationRole::MAIN)
<< "Only main instance can register a replica!";