diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index fd2161829..a8bb42dc9 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -231,6 +231,11 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { if (repl_info.timeout) { replica.timeout = *repl_info.timeout; } + + replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica; + replica.current_number_of_timestamp_behind_master = + repl_info.timestamp_info.current_number_of_timestamp_behind_master; + switch (repl_info.state) { case storage::replication::ReplicaState::READY: replica.state = ReplicationQuery::ReplicaState::READY; @@ -245,6 +250,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { replica.state = ReplicationQuery::ReplicaState::INVALID; break; } + return replica; }; @@ -512,7 +518,13 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::SHOW_REPLICAS: { - callback.header = {"name", "socket_address", "sync_mode", "timeout", "state"}; + callback.header = {"name", + "socket_address", + "sync_mode", + "timeout", + "current_timestamp_of_replica", + "number_of_timestamp_behind_master", + "state"}; callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] { const auto &replicas = handler.ShowReplicas(); auto typed_replicas = std::vector>{}; @@ -539,6 +551,10 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & typed_replica.emplace_back(TypedValue()); } + typed_replica.emplace_back(TypedValue(static_cast(replica.current_timestamp_of_replica))); + typed_replica.emplace_back( + TypedValue(static_cast(replica.current_number_of_timestamp_behind_master))); + switch (replica.state) { case ReplicationQuery::ReplicaState::READY: typed_replica.emplace_back(TypedValue("ready")); diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 
78713be47..155cb28a6 100644
--- a/src/query/interpreter.hpp
+++ b/src/query/interpreter.hpp
@@ -127,6 +127,8 @@ class ReplicationQueryHandler {
     std::string socket_address;
     ReplicationQuery::SyncMode sync_mode;
     std::optional timeout;
+    uint64_t current_timestamp_of_replica;
+    uint64_t current_number_of_timestamp_behind_master;
     ReplicationQuery::ReplicaState state;
   };
 
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index 1fcdcb2c5..9aec05a50 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -537,6 +537,41 @@ std::vector Storage::ReplicationClient
   return recovery_steps;
 }
 
+/// Asks the replica for its last commit timestamp and compares it with main's.
+///
+/// Both fields of the returned info stay 0 when the replica answers with success == false or
+/// when the RPC throws rpc::RpcFailedException; in both cases the replica state is set to
+/// INVALID and HandleRpcFailure() is called.
+Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
+  Storage::TimestampInfo info;
+  info.current_timestamp_of_replica = 0;
+  info.current_number_of_timestamp_behind_master = 0;
+
+  try {
+    auto stream{rpc_client_->Stream()};
+    const auto response = stream.AwaitResponse();
+    if (!response.success) {
+      replica_state_.store(replication::ReplicaState::INVALID);
+      HandleRpcFailure();
+      // Do not report timestamps taken from a response the replica itself flagged as failed.
+      return info;
+    }
+    const auto main_time_stamp = storage_->last_commit_timestamp_.load();
+    info.current_timestamp_of_replica = response.current_commit_timestamp;
+    // NOTE(review): unsigned "replica - main" wraps around when the replica is behind main;
+    // confirm that consumers reinterpret this value as signed.
+    info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp;
+  } catch (const rpc::RpcFailedException &) {
+    {
+      std::unique_lock client_guard(client_lock_);
+      replica_state_.store(replication::ReplicaState::INVALID);
+    }
+    HandleRpcFailure();  // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't block
+  }
+
+  return info;
+}
+
 ////// TimeoutDispatcher //////
 void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
   // Wait for the previous timeout task to finish
diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp
index
d7d13de98..b1be036c4 100644
--- a/src/storage/v2/replication/replication_client.hpp
+++ b/src/storage/v2/replication/replication_client.hpp
@@ -124,6 +124,8 @@ class Storage::ReplicationClient {
 
   const auto &Endpoint() const { return rpc_client_->Endpoint(); }
 
+  Storage::TimestampInfo GetTimestampInfo();
+
  private:
   void FinalizeTransactionReplicationInternal();
 
diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp
index be797b1fe..fed501d6e 100644
--- a/src/storage/v2/replication/replication_server.cpp
+++ b/src/storage/v2/replication/replication_server.cpp
@@ -80,6 +80,10 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
     spdlog::debug("Received CurrentWalRpc");
     this->CurrentWalHandler(req_reader, res_builder);
   });
+  rpc_server_->Register([this](auto *req_reader, auto *res_builder) {
+    spdlog::debug("Received TimestampRpc");
+    this->TimestampHandler(req_reader, res_builder);
+  });
   rpc_server_->Start();
 }
 
@@ -284,6 +288,14 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
   }
 }
 
+void Storage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
+  replication::TimestampReq req;
+  slk::Load(&req, req_reader);
+  // Reply with success = true and this storage's last commit timestamp.
+  replication::TimestampRes res{true, storage_->last_commit_timestamp_.load()};
+  slk::Save(res, res_builder);
+}
+
 Storage::ReplicationServer::~ReplicationServer() {
   if (rpc_server_) {
     rpc_server_->Shutdown();
diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp
index 5aa81424c..083d1c6cf 100644
--- a/src/storage/v2/replication/replication_server.hpp
+++ b/src/storage/v2/replication/replication_server.hpp
@@ -34,6 +34,7 @@ class Storage::ReplicationServer {
   void SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder);
   void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);
   void CurrentWalHandler(slk::Reader
*req_reader, slk::Builder *res_builder);
+  void TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder);
 
   void LoadWal(replication::Decoder *decoder);
   uint64_t ReadAndApplyDelta(durability::BaseDecoder *decoder);
diff --git a/src/storage/v2/replication/rpc.lcp b/src/storage/v2/replication/rpc.lcp
index 40efdaa0f..aa8ef8b7d 100644
--- a/src/storage/v2/replication/rpc.lcp
+++ b/src/storage/v2/replication/rpc.lcp
@@ -67,6 +67,12 @@ cpp<#
     ((success :bool)
      (current-commit-timestamp :uint64_t))))
 
+(lcp:define-rpc timestamp
+    (:request ())
+  (:response
+    ((success :bool)
+     (current-commit-timestamp :uint64_t))))
+
 (lcp:pop-namespace) ;; replication
 (lcp:pop-namespace) ;; storage
 (lcp:pop-namespace) ;; memgraph
diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp
index 471a346c4..e0fbdf0da 100644
--- a/src/storage/v2/storage.cpp
+++ b/src/storage/v2/storage.cpp
@@ -1954,7 +1954,8 @@ std::vector Storage::ReplicasInfo() {
     replica_info.reserve(clients.size());
     std::transform(clients.begin(), clients.end(), std::back_inserter(replica_info),
                    [](const auto &client) -> ReplicaInfo {
-                     return {client->Name(), client->Mode(), client->Timeout(), client->Endpoint(), client->State()};
+                     return {client->Name(), client->Mode(), client->Timeout(),
+                             client->Endpoint(), client->State(), client->GetTimestampInfo()};
                    });
     return replica_info;
   });
diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp
index 50e2e982b..90a5daf16 100644
--- a/src/storage/v2/storage.hpp
+++ b/src/storage/v2/storage.hpp
@@ -425,12 +425,21 @@ class Storage final {
 
   ReplicationRole GetReplicationRole() const;
 
+  /// Commit-timestamp figures reported for a single replica.
+  struct TimestampInfo {
+    /// Last commit timestamp obtained from the replica; 0 until one has been fetched.
+    uint64_t current_timestamp_of_replica{0};
+    /// Difference between the replica's and main's last commit timestamps; 0 by default.
+    uint64_t current_number_of_timestamp_behind_master{0};
+  };
+
   struct ReplicaInfo {
     std::string name;
     replication::ReplicationMode mode;
     std::optional timeout;
     io::network::Endpoint endpoint;
     replication::ReplicaState state;
+    TimestampInfo timestamp_info;
   };
 
   std::vector ReplicasInfo();
diff --git
a/tests/e2e/replication/show.py b/tests/e2e/replication/show.py
index 9d8a92af4..add3d5938 100755
--- a/tests/e2e/replication/show.py
+++ b/tests/e2e/replication/show.py
@@ -12,6 +12,7 @@
 import sys
 
 import pytest
+import time
 
 from common import execute_and_fetch_all
 
@@ -27,5 +28,83 @@ def test_show_replication_role(port, role, connection):
     assert data[0][0] == role
 
 
+def test_show_replicas(connection):
+    """SHOW REPLICAS on main returns the expected columns and one row per registered replica."""
+    cursor = connection(7687, "main").cursor()
+    actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
+
+    expected_column_names = {
+        "name",
+        "socket_address",
+        "sync_mode",
+        "timeout",
+        "current_timestamp_of_replica",
+        "number_of_timestamp_behind_master",
+        "state",
+    }
+    actual_column_names = {x.name for x in cursor.description}
+    assert expected_column_names == actual_column_names
+
+    expected_data = {
+        ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
+        ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
+        ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
+    }
+    assert expected_data == actual_data
+
+
+def test_show_replicas_while_inserting_data(connection):
+    """Check that the timestamps are correctly computed from the information we get from replicas."""
+    # 0/ Check original state of replicas.
+    # 1/ Add some data on main.
+    # 2/ Check state of replicas.
+    # 3/ Execute a read only query.
+    # 4/ Check that the states have not changed.
+
+    # 0/
+    cursor = connection(7687, "main").cursor()
+    actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
+
+    expected_column_names = {
+        "name",
+        "socket_address",
+        "sync_mode",
+        "timeout",
+        "current_timestamp_of_replica",
+        "number_of_timestamp_behind_master",
+        "state",
+    }
+    actual_column_names = {x.name for x in cursor.description}
+    assert expected_column_names == actual_column_names
+
+    expected_data = {
+        ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
+        ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
+        ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
+    }
+    assert expected_data == actual_data
+
+    # 1/
+    execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
+    time.sleep(1)
+
+    # 2/
+    expected_data = {
+        ("replica_1", "127.0.0.1:10001", "sync", 2.0, 4, 0, "ready"),
+        ("replica_2", "127.0.0.1:10002", "sync", 1.0, 4, 0, "ready"),
+        ("replica_3", "127.0.0.1:10003", "async", None, 4, 0, "ready"),
+    }
+    actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
+    assert expected_data == actual_data
+
+    # 3/
+    res = execute_and_fetch_all(cursor, "MATCH (node) return node;")
+    assert 1 == len(res)
+
+    # 4/
+    actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
+    assert expected_data == actual_data
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py
index 73d451cc4..d302a8956 100644
--- a/tests/e2e/replication/show_while_creating_invalid_state.py
+++ b/tests/e2e/replication/show_while_creating_invalid_state.py
@@ -78,16 +78,24 @@ def test_show_replicas(connection):
 
     # 1/
     actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
-    EXPECTED_COLUMN_NAMES = {"name", "socket_address", "sync_mode", "timeout",
"state"} + EXPECTED_COLUMN_NAMES = { + "name", + "socket_address", + "sync_mode", + "timeout", + "current_timestamp_of_replica", + "number_of_timestamp_behind_master", + "state", + } actual_column_names = {x.name for x in cursor.description} assert EXPECTED_COLUMN_NAMES == actual_column_names expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 0, "ready"), - ("replica_2", "127.0.0.1:10002", "sync", 1.0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, "ready"), - ("replica_4", "127.0.0.1:10004", "async", None, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"), } assert expected_data == actual_data @@ -95,9 +103,9 @@ def test_show_replicas(connection): execute_and_fetch_all(cursor, "DROP REPLICA replica_2") actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, "ready"), - ("replica_4", "127.0.0.1:10004", "async", None, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"), } assert expected_data == actual_data @@ -110,9 +118,9 @@ def test_show_replicas(connection): time.sleep(2) actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 0, "invalid"), - ("replica_3", "127.0.0.1:10003", "async", None, "invalid"), - ("replica_4", "127.0.0.1:10004", "async", None, "invalid"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, 0, "invalid"), + ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"), + ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"), } assert 
expected_data == actual_data diff --git a/tests/e2e/replication/workloads.yaml b/tests/e2e/replication/workloads.yaml index 1aae85047..a05fc6aca 100644 --- a/tests/e2e/replication/workloads.yaml +++ b/tests/e2e/replication/workloads.yaml @@ -69,7 +69,7 @@ workloads: args: ["--bolt-port", "7687", "--log-level=TRACE"] log_file: "replication-e2e-main.log" setup_queries: [ - "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'", + "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'", "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'", "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'" ]