diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index b44dacb36..15c426e8c 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -31,4 +31,4 @@ add_simulation_test(raft.cpp address) add_simulation_test(trial_query_storage/query_storage_test.cpp address) -add_simulation_test(sharded_map.cpp address) +#add_simulation_test(sharded_map.cpp address) diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 29f0a97dd..6f6d8fb08 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -22,6 +22,7 @@ #include "io/rsm/raft.hpp" #include "io/simulator/simulator.hpp" #include "io/simulator/simulator_transport.hpp" +#include "utils/rsm_client.hpp" using memgraph::io::Address; using memgraph::io::Duration; @@ -116,90 +117,90 @@ class TestState { } }; -template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT, - typename ReadResponseT> -class RsmClient { - using ServerPool = std::vector<Address>; +// template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT, +// typename ReadResponseT> +// class RsmClient { +// using ServerPool = std::vector<Address>; - IoImpl io_; - Address leader_; +// IoImpl io_; +// Address leader_; - std::mt19937 cli_rng_{0}; - ServerPool server_addrs_; +// std::mt19937 cli_rng_{0}; +// ServerPool server_addrs_; - template <typename ResponseT> - std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) { - if (response.retry_leader) { - MG_ASSERT(!response.success, "retry_leader should never be set for successful responses"); - leader_ = response.retry_leader.value(); - std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl; - } else if (!response.success) { - std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1)); - size_t addr_index = addr_distrib(cli_rng_); - leader_ = server_addrs_[addr_index]; +// template <typename ResponseT> +// std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) { +// if (response.retry_leader) { +// MG_ASSERT(!response.success, "retry_leader should never be set for successful responses"); +// leader_ = response.retry_leader.value(); +// std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl; +// } else if (!response.success) { +// std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1)); +// size_t addr_index = addr_distrib(cli_rng_); +// leader_ = server_addrs_[addr_index]; - std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index - << " with port " << leader_.last_known_port << std::endl; - return {}; - } +// std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index +// << " with port " << leader_.last_known_port << std::endl; +// return {}; +// } - return response; - } +// return response; +// } - public: - RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs) - : io_{io}, leader_{leader}, server_addrs_{server_addrs} {} +// public: +// RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs) +// : io_{io}, leader_{leader}, server_addrs_{server_addrs} {} - RsmClient() = delete; +// RsmClient() = delete; - std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) { - WriteRequest<WriteRequestT> client_req; - client_req.operation = req; +// std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) { +// WriteRequest<WriteRequestT> client_req; +// client_req.operation = req; - std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl; - ResponseFuture<WriteResponse<WriteResponseT>> response_future = - io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req); - ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait(); +// std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl; +// ResponseFuture<WriteResponse<WriteResponseT>> response_future = +// io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req); +// ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait(); - if (response_result.HasError()) { - std::cout << "client timed out while trying to communicate with leader server " << std::endl; - // continue; - return std::nullopt; - } +// if (response_result.HasError()) { +// std::cout << "client timed out while trying to communicate with leader server " << std::endl; +// // continue; +// return std::nullopt; +// } - ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue(); - WriteResponse<WriteResponseT> write_response = response_envelope.message; +// ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue(); +// WriteResponse<WriteResponseT> write_response = response_envelope.message; - return CheckForCorrectLeader(write_response); - } +// return CheckForCorrectLeader(write_response); +// } - std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) { - ReadRequest<ReadRequestT> read_req; - read_req.operation = req; +// std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) { +// ReadRequest<ReadRequestT> read_req; +// read_req.operation = req; - std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl; - ResponseFuture<ReadResponse<ReadResponseT>> get_response_future = - io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req); +// std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl; +// ResponseFuture<ReadResponse<ReadResponseT>> get_response_future = +// io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req); - // receive response - ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait(); +// // receive response +// ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait(); - if (get_response_result.HasError()) { - std::cout << "client timed out while trying to communicate with leader server " << std::endl; - return {}; - } +// if (get_response_result.HasError()) { +// std::cout << "client timed out while trying to communicate with leader server " << std::endl; +// return {}; +// } - ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue(); - ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message; +// ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue(); +// ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message; - if (!read_get_response.success) { - // sent to a non-leader - return {}; - } +// if (!read_get_response.success) { +// // sent to a non-leader +// return {}; +// } - return CheckForCorrectLeader(read_get_response); - } -}; +// return CheckForCorrectLeader(read_get_response); +// } +// }; template <typename IoImpl> void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) {