Small replication fixes (#59)

* Set state to invalid after exception

* Add proper locking

* Start background replicating only if in valid state

* Freeze transaction timestamp on replica

* Timeout fixes

* Fix Jepsen run script

* Disable perf checker and enable nemesis

* Add documentation for some chunks of code

* Decrease timeout so main doesn't hang on network partitions too long
antonio2368 2020-12-18 09:17:32 +01:00 committed by Antonio Andelic
parent bff9cf07de
commit adc355a22a
7 changed files with 94 additions and 66 deletions
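
Before the per-file diffs, it helps to see the small state machine these fixes revolve around. The sketch below is a hedged illustration, not Memgraph's actual class: the state names (READY, REPLICATING, RECOVERY, INVALID) and the lock-before-store pattern come straight from the diff, while ReplicaStateMachine and its method names are hypothetical.

#include <atomic>
#include <mutex>

// States as they appear in the diff; the real type is
// replication::ReplicaState inside Memgraph's storage code.
enum class ReplicaState { READY, REPLICATING, RECOVERY, INVALID };

class ReplicaStateMachine {
 public:
  // Mirrors the `std::unique_lock client_guard{client_lock_}` pattern
  // in the diff: every transition happens under the client lock.
  void OnTransactionStart() {
    std::unique_lock<std::mutex> guard{lock_};
    state_.store(ReplicaState::REPLICATING);
  }
  // The central fix of this commit: any RPC failure marks the replica
  // INVALID instead of leaving the previous state in place.
  void OnRpcFailure() {
    std::unique_lock<std::mutex> guard{lock_};
    state_.store(ReplicaState::INVALID);
  }
  // A finalized transaction either leaves the replica READY or, if the
  // replica fell behind, sends it to RECOVERY.
  void OnFinalize(bool success) {
    std::unique_lock<std::mutex> guard{lock_};
    state_.store(success ? ReplicaState::READY : ReplicaState::RECOVERY);
  }
  ReplicaState Get() const { return state_.load(); }

 private:
  std::mutex lock_;
  std::atomic<ReplicaState> state_{ReplicaState::INVALID};
};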

View File

@@ -324,7 +324,7 @@ jobs:
release_jepsen_test:
name: "Release Jepsen Test"
runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl]
continue-on-error: true
#continue-on-error: true
env:
THREADS: 24
@@ -356,7 +356,7 @@ jobs:
# NOTE: docker exec -t is NOT ok because gh user does NOT have TTY.
# NOTE: ~/.bashrc has to be manually sourced when bash -c is used
# because some Jepsen config is there.
docker exec jepsen-control bash -c "source ~/.bashrc && cd memgraph && lein run test-all --local-binary /opt/memgraph/memgraph --node n1"
docker exec jepsen-control bash -c "source ~/.bashrc && cd memgraph && lein run test-all --local-binary /opt/memgraph/memgraph --node-configs resources/node-config.edn"
docker exec jepsen-control bash -c 'tar -czvf /jepsen/memgraph/Jepsen.tar.gz $(readlink -f /jepsen/memgraph/store/latest)'
docker cp jepsen-control:/jepsen/memgraph/Jepsen.tar.gz ./

View File

@@ -57,15 +57,15 @@ void Storage::ReplicationClient::InitializeClient() {
return;
}
current_commit_timestamp = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP: " << current_commit_timestamp;
DLOG(INFO) << "CURRENT MAIN TIMESTAMP: "
DLOG(INFO) << "Current timestamp on replica: " << current_commit_timestamp;
DLOG(INFO) << "Current MAIN timestamp: "
<< storage_->last_commit_timestamp_.load();
if (current_commit_timestamp == storage_->last_commit_timestamp_.load()) {
DLOG(INFO) << "REPLICA UP TO DATE";
DLOG(INFO) << "Replica up to date";
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
} else {
DLOG(INFO) << "REPLICA IS BEHIND";
DLOG(INFO) << "Replica is behind";
{
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::RECOVERY);
@@ -153,6 +153,7 @@ void Storage::ReplicationClient::StartTransactionReplication(
current_wal_seq_num});
replica_state_.store(replication::ReplicaState::REPLICATING);
} catch (const rpc::RpcFailedException &) {
replica_state_.store(replication::ReplicaState::INVALID);
HandleRpcFailure();
}
return;
@@ -161,16 +162,34 @@
void Storage::ReplicationClient::IfStreamingTransaction(
const std::function<void(ReplicaStream &handler)> &callback) {
if (replica_stream_) {
try {
callback(*replica_stream_);
} catch (const rpc::RpcFailedException &) {
HandleRpcFailure();
// We can simply check the state here because it is guaranteed to be
// valid only during a single transaction replication (assuming that
// this and the other transaction replication functions are called
// from a single thread).
if (replica_state_ != replication::ReplicaState::REPLICATING) {
return;
}
try {
callback(*replica_stream_);
} catch (const rpc::RpcFailedException &) {
{
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure();
}
}
void Storage::ReplicationClient::FinalizeTransactionReplication() {
// We can simply check the state here because it is guaranteed to be
// valid only during a single transaction replication (assuming that
// this and the other transaction replication functions are called
// from a single thread).
if (replica_state_ != replication::ReplicaState::REPLICATING) {
return;
}
if (mode_ == replication::ReplicationMode::ASYNC) {
thread_pool_.AddTask(
[this] { this->FinalizeTransactionReplicationInternal(); });
@@ -192,10 +211,10 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
timeout_dispatcher_->StartTimeoutTask(*timeout_);
// Wait until one of the threads notifies us that they finished executing
// Both threads should first set the active flag to false
{
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
// Wait until one of the threads notifies us that they finished executing
// Both threads should first set the active flag to false
timeout_dispatcher_->main_cv.wait(
main_guard, [&] { return !timeout_dispatcher_->active.load(); });
}
@@ -217,26 +236,27 @@
}
void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
if (replica_stream_) {
try {
auto response = replica_stream_->Finalize();
std::unique_lock client_guard{client_lock_};
if (!response.success ||
replica_state_ == replication::ReplicaState::RECOVERY) {
replica_state_.store(replication::ReplicaState::RECOVERY);
thread_pool_.AddTask([&, this] {
this->RecoverReplica(response.current_commit_timestamp);
});
}
} catch (const rpc::RpcFailedException &) {
HandleRpcFailure();
}
CHECK(replica_stream_) << "Missing stream for transaction deltas";
try {
auto response = replica_stream_->Finalize();
replica_stream_.reset();
}
std::unique_lock guard(client_lock_);
if (replica_state_.load() == replication::ReplicaState::REPLICATING) {
replica_state_.store(replication::ReplicaState::READY);
std::unique_lock client_guard(client_lock_);
if (!response.success ||
replica_state_ == replication::ReplicaState::RECOVERY) {
replica_state_.store(replication::ReplicaState::RECOVERY);
// Capture the timestamp by value: `response` is a local and may be
// destroyed before the thread pool runs this task.
const auto recovery_timestamp = response.current_commit_timestamp;
thread_pool_.AddTask([this, recovery_timestamp] {
this->RecoverReplica(recovery_timestamp);
});
} else {
replica_state_.store(replication::ReplicaState::READY);
}
} catch (const rpc::RpcFailedException &) {
replica_stream_.reset();
{
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure();
}
}
@@ -254,13 +274,13 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
DLOG(INFO) << "Sending the latest snapshot file: " << arg;
auto response = TransferSnapshot(arg);
replica_commit = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: "
DLOG(INFO) << "Current timestamp on replica: "
<< replica_commit;
} else if constexpr (std::is_same_v<StepType, RecoveryWals>) {
DLOG(INFO) << "Sending the latest wal files";
auto response = TransferWalFiles(arg);
replica_commit = response.current_commit_timestamp;
DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: "
DLOG(INFO) << "Current timestamp on replica: "
<< replica_commit;
} else if constexpr (std::is_same_v<StepType,
RecoveryCurrentWal>) {
@@ -273,7 +293,7 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
DLOG(INFO) << "Sending current wal file";
replica_commit = ReplicateCurrentWal();
DLOG(INFO)
<< "CURRENT TIMESTAMP ON REPLICA: " << replica_commit;
<< "Current timestamp on replica: " << replica_commit;
storage_->wal_file_->EnableFlushing();
}
} else if constexpr (std::is_same_v<StepType,
@@ -290,12 +310,25 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
},
recovery_step);
} catch (const rpc::RpcFailedException &) {
{
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure();
}
}
// To avoid the situation where one thread reads the correct commit
// timestamp, then another thread commits a different transaction, and
// only THEN the first thread sets the state to READY, we take this
// lock before checking the timestamp.
// Without the lock the stale state would only be detected on the next
// commit: AppendDeltasRpc sends the last commit timestamp, the replica
// compares it with the last one it received, and on a mismatch we go
// to recovery.
// By taking the lock here we avoid that round trip and go to RECOVERY
// immediately.
std::unique_lock client_guard{client_lock_};
if (storage_->last_commit_timestamp_.load() == replica_commit) {
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
return;
}
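
The comment above describes a classic check-then-act race. A minimal sketch of the difference, with hypothetical stand-ins (client_lock for client_lock_, last_commit_ts for last_commit_timestamp_):

#include <atomic>
#include <cstdint>
#include <mutex>

enum class State { READY, RECOVERY };

std::mutex client_lock;                   // stands in for client_lock_
std::atomic<uint64_t> last_commit_ts{0};  // stands in for last_commit_timestamp_
std::atomic<State> state{State::RECOVERY};

// Racy: another thread can commit between the comparison and the
// store, leaving us READY while the replica is already behind.
void FinishRecoveryRacy(uint64_t replica_commit) {
  if (last_commit_ts.load() == replica_commit) {
    std::unique_lock<std::mutex> guard{client_lock};  // taken too late
    state.store(State::READY);
  }
}

// What the diff does: the lock covers both the check and the
// transition, so a concurrent commit cannot slip in between.
void FinishRecoveryFixed(uint64_t replica_commit) {
  std::unique_lock<std::mutex> guard{client_lock};
  if (last_commit_ts.load() == replica_commit) {
    state.store(State::READY);
  }
}

This only closes the race because the committing path takes the same lock before changing the state, which is exactly what the other hunks in this file add.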
@@ -491,14 +524,14 @@ void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(
const double timeout) {
timeout_pool.AddTask([&, this] {
timeout_pool.AddTask([timeout, this] {
finished = false;
using std::chrono::steady_clock;
const auto timeout_duration =
std::chrono::duration_cast<steady_clock::duration>(
std::chrono::duration<double>(timeout));
const auto end_time = steady_clock::now() + timeout_duration;
while (active && steady_clock::now() < end_time) {
while (active && (steady_clock::now() < end_time)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
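
The [&, this] to [timeout, this] change above is a lifetime fix: timeout is a parameter of StartTimeoutTask, and a task queued on a thread pool can outlive the enclosing call, so capturing it by reference leaves a dangling reference. A self-contained illustration of the idiom, using a detached std::thread in place of utils::ThreadPool:

#include <chrono>
#include <thread>

// BAD: `timeout` is captured by reference; once the function returns,
// the reference dangles and the still-running task reads garbage.
void StartTimeoutTaskBad(double timeout) {
  std::thread([&] {
    std::this_thread::sleep_for(std::chrono::duration<double>(timeout));
  }).detach();
}

// GOOD (what the diff does): capture by value so the task owns its own
// copy of the timeout for as long as it runs.
void StartTimeoutTaskGood(double timeout) {
  std::thread([timeout] {
    std::this_thread::sleep_for(std::chrono::duration<double>(timeout));
  }).detach();
}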

View File

@@ -185,7 +185,7 @@ class Storage::ReplicationClient {
private:
// if the Timeout task finished executing
bool finished{false};
bool finished{true};
utils::ThreadPool timeout_pool{1};
};
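
Flipping the finished default to true matters for the first caller of WaitForTaskToFinish: if no timeout task was ever started, a false default would make that wait block forever. A hedged sketch of the flag/wait pairing; the condition variable and method bodies are assumptions modeled on the surrounding diff, not the actual implementation:

#include <condition_variable>
#include <mutex>

struct TimeoutDispatcherSketch {
  std::mutex lock;
  std::condition_variable cv;
  // Starts as true ("no task in flight") so the very first
  // WaitForTaskToFinish() returns immediately instead of hanging.
  bool finished{true};

  void WaitForTaskToFinish() {
    std::unique_lock<std::mutex> guard{lock};
    cv.wait(guard, [this] { return finished; });
  }

  void RunTimeoutTask() {
    {
      std::unique_lock<std::mutex> guard{lock};
      finished = false;  // a task is now in flight
    }
    // ... the sleep/poll loop from the diff would run here ...
    {
      std::unique_lock<std::mutex> guard{lock};
      finished = true;
    }
    cv.notify_all();
  }
};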

View File

@@ -1226,14 +1226,6 @@ bool Storage::CreateIndex(
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.CreateIndex(label, vertices_.access()))
return false;
// Here it is safe to use `timestamp_` as the final commit timestamp of this
// operation even though this operation isn't transactional. The `timestamp_`
// variable holds the next timestamp that will be used. Because the above
// `storage_guard` ensures that no transactions are currently active, the
// value of `timestamp_` is guaranteed to be used as a start timestamp for the
// next regular transaction after this operation. This prevents collisions of
// commit timestamps between non-transactional operations and transactional
// operations.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {},
commit_timestamp);
@@ -1251,8 +1243,6 @@ bool Storage::CreateIndex(
if (!indices_.label_property_index.CreateIndex(label, property,
vertices_.access()))
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE,
label, {property}, commit_timestamp);
@@ -1267,8 +1257,6 @@ bool Storage::DropIndex(
LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.DropIndex(label)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {},
commit_timestamp);
@@ -1310,8 +1298,6 @@ Storage::CreateExistenceConstraint(
auto ret = ::storage::CreateExistenceConstraint(&constraints_, label,
property, vertices_.access());
if (ret.HasError() || !ret.GetValue()) return ret;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE,
label, {property}, commit_timestamp);
@@ -1328,8 +1314,6 @@ bool Storage::DropExistenceConstraint(
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!::storage::DropExistenceConstraint(&constraints_, label, property))
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP,
label, {property}, commit_timestamp);
@@ -1351,8 +1335,6 @@ Storage::CreateUniqueConstraint(
ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) {
return ret;
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE,
label, properties, commit_timestamp);
@@ -1371,8 +1353,6 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) {
return ret;
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label,
properties, commit_timestamp);
@@ -1436,7 +1416,21 @@ Transaction Storage::CreateTransaction() {
{
std::lock_guard<utils::SpinLock> guard(engine_lock_);
transaction_id = transaction_id_++;
#ifdef MG_ENTERPRISE
// A replica should only run read queries; the write queries come from
// the main instance and can carry any past timestamp.
// To preserve snapshot isolation we set the start timestamp of every
// query on the replica to the last committed transaction, which is
// timestamp_, because only the commit of a write transaction can
// change its value.
if (replication_role_ == ReplicationRole::REPLICA) {
start_timestamp = timestamp_;
} else {
start_timestamp = timestamp_++;
}
#else
start_timestamp = timestamp_++;
#endif
}
return {transaction_id, start_timestamp};
}
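
The CreateTransaction hunk above implements the "freeze transaction timestamp on replica" fix from the commit message. A condensed sketch of the allocation rule; the class and method names are illustrative, while timestamp_ and the locking mirror the diff:

#include <cstdint>
#include <mutex>

enum class ReplicationRole { MAIN, REPLICA };

class TimestampAllocator {
 public:
  // On MAIN every transaction consumes a fresh timestamp. On a REPLICA
  // local queries are read-only, so they reuse timestamp_ as-is: it
  // only advances when a replicated write from MAIN commits, which
  // pins every replica read to the newest committed transaction.
  uint64_t StartTimestamp(ReplicationRole role) {
    std::lock_guard<std::mutex> guard(lock_);
    return role == ReplicationRole::REPLICA ? timestamp_ : timestamp_++;
  }

 private:
  std::mutex lock_;
  uint64_t timestamp_{0};
};

Because replica reads never consume timestamps, only replicated writes from the main instance (which carry their own commit timestamps) move the counter forward, preserving snapshot isolation for local readers.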

View File

@@ -3,4 +3,4 @@
"n3" {:replication-role :replica :replication-mode :async :port 10000}}
{"n1" {:replication-role :main}
"n2" {:replication-role :replica :replication-mode :async :port 10000}
"n3" {:replication-role :replica :replication-mode :sync :port 10000 :timeout 5}}]
"n3" {:replication-role :replica :replication-mode :sync :port 10000 :timeout 3}}]

View File

@@ -23,9 +23,9 @@
(def nemesis-configuration
"Nemesis configuration"
{:interval 5
:kill-node true
:partition-halves true})
{:interval 5
:kill-node? true
:partition-halves? true})
(defn memgraph-test
"Given an options map from the command line runner (e.g. :nodes, :ssh,
@@ -53,7 +53,7 @@
:checker (checker/compose
{:stats (checker/stats)
:exceptions (checker/unhandled-exceptions)
:perf (checker/perf)
;:perf (checker/perf) really expensive
:workload (:checker workload)})
:nemesis (:nemesis nemesis)
:generator gen})))
@@ -67,7 +67,7 @@
{:replication-role :replica
:replication-mode :sync
:port 10000
:timeout 10}}))
:timeout 5}}))
{}
nodes))

View File

@@ -35,7 +35,8 @@
[(cycle (map op [:start-partition-halves :stop-partition-halves]))])]
(apply concat)
gen/mix
(gen/stagger (:interval opts))))
(gen/stagger (:interval opts))
(gen/phases (gen/sleep 10))))
(defn nemesis
"Composite nemesis and generator"