Move RsmClient into a separate folder and header

This commit is contained in:
gvolfing 2022-08-08 17:59:39 +02:00 committed by Tyler Neely
parent 74469d7f79
commit 169320d27b
2 changed files with 67 additions and 66 deletions
tests/simulation

View File

@ -31,4 +31,4 @@ add_simulation_test(raft.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
add_simulation_test(sharded_map.cpp address)
#add_simulation_test(sharded_map.cpp address)

View File

@ -22,6 +22,7 @@
#include "io/rsm/raft.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "utils/rsm_client.hpp"
using memgraph::io::Address;
using memgraph::io::Duration;
@ -116,90 +117,90 @@ class TestState {
}
};
template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT,
typename ReadResponseT>
class RsmClient {
using ServerPool = std::vector<Address>;
// template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT,
// typename ReadResponseT>
// class RsmClient {
// using ServerPool = std::vector<Address>;
IoImpl io_;
Address leader_;
// IoImpl io_;
// Address leader_;
std::mt19937 cli_rng_{0};
ServerPool server_addrs_;
// std::mt19937 cli_rng_{0};
// ServerPool server_addrs_;
template <typename ResponseT>
std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) {
if (response.retry_leader) {
MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
leader_ = response.retry_leader.value();
std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl;
} else if (!response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
size_t addr_index = addr_distrib(cli_rng_);
leader_ = server_addrs_[addr_index];
// template <typename ResponseT>
// std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) {
// if (response.retry_leader) {
// MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
// leader_ = response.retry_leader.value();
// std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl;
// } else if (!response.success) {
// std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
// size_t addr_index = addr_distrib(cli_rng_);
// leader_ = server_addrs_[addr_index];
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
<< " with port " << leader_.last_known_port << std::endl;
return {};
}
// std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
// << " with port " << leader_.last_known_port << std::endl;
// return {};
// }
return response;
}
// return response;
// }
public:
RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs)
: io_{io}, leader_{leader}, server_addrs_{server_addrs} {}
// public:
// RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs)
// : io_{io}, leader_{leader}, server_addrs_{server_addrs} {}
RsmClient() = delete;
// RsmClient() = delete;
std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) {
WriteRequest<WriteRequestT> client_req;
client_req.operation = req;
// std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) {
// WriteRequest<WriteRequestT> client_req;
// client_req.operation = req;
std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl;
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
// std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl;
// ResponseFuture<WriteResponse<WriteResponseT>> response_future =
// io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
// ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
if (response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
// continue;
return std::nullopt;
}
// if (response_result.HasError()) {
// std::cout << "client timed out while trying to communicate with leader server " << std::endl;
// // continue;
// return std::nullopt;
// }
ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue();
WriteResponse<WriteResponseT> write_response = response_envelope.message;
// ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue();
// WriteResponse<WriteResponseT> write_response = response_envelope.message;
return CheckForCorrectLeader(write_response);
}
// return CheckForCorrectLeader(write_response);
// }
std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) {
ReadRequest<ReadRequestT> read_req;
read_req.operation = req;
// std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) {
// ReadRequest<ReadRequestT> read_req;
// read_req.operation = req;
std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl;
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
// std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl;
// ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
// io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
// receive response
ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
// // receive response
// ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
if (get_response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
return {};
}
// if (get_response_result.HasError()) {
// std::cout << "client timed out while trying to communicate with leader server " << std::endl;
// return {};
// }
ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue();
ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message;
// ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue();
// ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message;
if (!read_get_response.success) {
// sent to a non-leader
return {};
}
// if (!read_get_response.success) {
// // sent to a non-leader
// return {};
// }
return CheckForCorrectLeader(read_get_response);
}
};
// return CheckForCorrectLeader(read_get_response);
// }
// };
template <typename IoImpl>
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) {