Add information on show replicas to express how up-to-date a replica is (#412)

* Add test

* Add implementation and adapted test

* Update workloads.yaml to have a timeout > 0

* Update tests (failing due to merging of "add replica state")
This commit is contained in:
Jeremy B 2022-06-23 10:22:57 +02:00 committed by GitHub
parent 7a2bbd4bb3
commit 65a7ba01da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 175 additions and 14 deletions

View File

@ -231,6 +231,11 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
if (repl_info.timeout) {
replica.timeout = *repl_info.timeout;
}
replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica;
replica.current_number_of_timestamp_behind_master =
repl_info.timestamp_info.current_number_of_timestamp_behind_master;
switch (repl_info.state) {
case storage::replication::ReplicaState::READY:
replica.state = ReplicationQuery::ReplicaState::READY;
@ -245,6 +250,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
replica.state = ReplicationQuery::ReplicaState::INVALID;
break;
}
return replica;
};
@ -512,7 +518,13 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
callback.header = {"name", "socket_address", "sync_mode", "timeout", "state"};
callback.header = {"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state"};
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] {
const auto &replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
@ -539,6 +551,10 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue());
}
typed_replica.emplace_back(TypedValue(static_cast<int64_t>(replica.current_timestamp_of_replica)));
typed_replica.emplace_back(
TypedValue(static_cast<int64_t>(replica.current_number_of_timestamp_behind_master)));
switch (replica.state) {
case ReplicationQuery::ReplicaState::READY:
typed_replica.emplace_back(TypedValue("ready"));

View File

@ -127,6 +127,8 @@ class ReplicationQueryHandler {
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
ReplicationQuery::ReplicaState state;
};

View File

@ -537,6 +537,33 @@ std::vector<Storage::ReplicationClient::RecoveryStep> Storage::ReplicationClient
return recovery_steps;
}
Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
Storage::TimestampInfo info;
info.current_timestamp_of_replica = 0;
info.current_number_of_timestamp_behind_master = 0;
try {
auto stream{rpc_client_->Stream<replication::TimestampRpc>()};
const auto response = stream.AwaitResponse();
const auto is_success = response.success;
if (!is_success) {
replica_state_.store(replication::ReplicaState::INVALID);
HandleRpcFailure();
}
auto main_time_stamp = storage_->last_commit_timestamp_.load();
info.current_timestamp_of_replica = response.current_commit_timestamp;
info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp;
} catch (const rpc::RpcFailedException &) {
{
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't block
}
return info;
}
////// TimeoutDispatcher //////
void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
// Wait for the previous timeout task to finish

View File

@ -124,6 +124,8 @@ class Storage::ReplicationClient {
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
Storage::TimestampInfo GetTimestampInfo();
private:
void FinalizeTransactionReplicationInternal();

View File

@ -80,6 +80,10 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
spdlog::debug("Received CurrentWalRpc");
this->CurrentWalHandler(req_reader, res_builder);
});
rpc_server_->Register<replication::TimestampRpc>([this](auto *req_reader, auto *res_builder) {
spdlog::debug("Received TimestampRpc");
this->TimestampHandler(req_reader, res_builder);
});
rpc_server_->Start();
}
@ -284,6 +288,14 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
}
}
void Storage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::TimestampReq req;
slk::Load(&req, req_reader);
replication::TimestampRes res{true, storage_->last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
Storage::ReplicationServer::~ReplicationServer() {
if (rpc_server_) {
rpc_server_->Shutdown();

View File

@ -34,6 +34,7 @@ class Storage::ReplicationServer {
void SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void LoadWal(replication::Decoder *decoder);
uint64_t ReadAndApplyDelta(durability::BaseDecoder *decoder);

View File

@ -67,6 +67,12 @@ cpp<#
((success :bool)
(current-commit-timestamp :uint64_t))))
(lcp:define-rpc timestamp
(:request ())
(:response
((success :bool)
(current-commit-timestamp :uint64_t))))
(lcp:pop-namespace) ;; replication
(lcp:pop-namespace) ;; storage
(lcp:pop-namespace) ;; memgraph

View File

@ -1954,7 +1954,8 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
replica_info.reserve(clients.size());
std::transform(clients.begin(), clients.end(), std::back_inserter(replica_info),
[](const auto &client) -> ReplicaInfo {
return {client->Name(), client->Mode(), client->Timeout(), client->Endpoint(), client->State()};
return {client->Name(), client->Mode(), client->Timeout(),
client->Endpoint(), client->State(), client->GetTimestampInfo()};
});
return replica_info;
});

View File

@ -425,12 +425,18 @@ class Storage final {
ReplicationRole GetReplicationRole() const;
struct TimestampInfo {
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
};
struct ReplicaInfo {
std::string name;
replication::ReplicationMode mode;
std::optional<double> timeout;
io::network::Endpoint endpoint;
replication::ReplicaState state;
TimestampInfo timestamp_info;
};
std::vector<ReplicaInfo> ReplicasInfo();

View File

@ -12,6 +12,7 @@
import sys
import pytest
import time
from common import execute_and_fetch_all
@ -27,5 +28,84 @@ def test_show_replication_role(port, role, connection):
assert data[0][0] == role
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_column_names = {
"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
}
assert expected_data == actual_data
def test_show_replicas_while_inserting_data(connection):
# Goal is to check the timestamp are correctly computed from the information we get from replicas.
# 0/ Check original state of replicas.
# 1/ Add some data on main.
# 2/ Check state of replicas.
# 3/ Execute a read only query.
# 4/ Check that the states have not changed.
# 0/
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_column_names = {
"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
}
assert expected_data == actual_data
# 1/
execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
time.sleep(1)
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2.0, 4, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 4, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 4, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
print("actual_data=" + str(actual_data))
print("expected_data=" + str(expected_data))
assert expected_data == actual_data
# 3/
res = execute_and_fetch_all(cursor, "MATCH (node) return node;")
assert 1 == len(res)
# 4/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -78,16 +78,24 @@ def test_show_replicas(connection):
# 1/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
EXPECTED_COLUMN_NAMES = {"name", "socket_address", "sync_mode", "timeout", "state"}
EXPECTED_COLUMN_NAMES = {
"name",
"socket_address",
"sync_mode",
"timeout",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert EXPECTED_COLUMN_NAMES == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
}
assert expected_data == actual_data
@ -95,9 +103,9 @@ def test_show_replicas(connection):
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, "ready"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
}
assert expected_data == actual_data
@ -110,9 +118,9 @@ def test_show_replicas(connection):
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", None, "invalid"),
("replica_4", "127.0.0.1:10004", "async", None, "invalid"),
("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"),
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"),
}
assert expected_data == actual_data

View File

@ -69,7 +69,7 @@ workloads:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]