Merge branch 'T0912-MG-in-memory-shard-map' of github.com:memgraph/memgraph into T0916-MG-fbthrift-transport
This commit is contained in:
commit
a994b364ca
@ -72,6 +72,9 @@ class Io {
|
||||
/// without an explicit timeout set.
|
||||
void SetDefaultTimeout(Duration timeout) { default_timeout_ = timeout; }
|
||||
|
||||
/// Returns the current default timeout for this Io instance.
|
||||
Duration GetDefaultTimeout() { return default_timeout_; }
|
||||
|
||||
/// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> RequestWithTimeout(Address address, Request request, Duration timeout) {
|
||||
|
@ -187,13 +187,12 @@ void RunSimulation() {
|
||||
|
||||
cas_req.new_value = i;
|
||||
|
||||
auto write_cas_response_opt = client.SendWriteRequest(cas_req);
|
||||
if (!write_cas_response_opt) {
|
||||
auto write_cas_response_result = client.SendWriteRequest(cas_req);
|
||||
if (write_cas_response_result.HasError()) {
|
||||
// timed out
|
||||
continue;
|
||||
}
|
||||
auto write_cas_response = write_cas_response_opt.value();
|
||||
|
||||
CasResponse cas_response = write_cas_response.write_return;
|
||||
CasResponse cas_response = write_cas_response_result.GetValue();
|
||||
|
||||
bool cas_succeeded = cas_response.cas_success;
|
||||
|
||||
@ -213,13 +212,12 @@ void RunSimulation() {
|
||||
GetRequest get_req;
|
||||
get_req.key = key;
|
||||
|
||||
auto read_get_response_opt = client.SendReadRequest(get_req);
|
||||
if (!read_get_response_opt) {
|
||||
auto read_get_response_result = client.SendReadRequest(get_req);
|
||||
if (read_get_response_result.HasError()) {
|
||||
// timed out
|
||||
continue;
|
||||
}
|
||||
auto read_get_response = read_get_response_opt.value();
|
||||
|
||||
GetResponse get_response = read_get_response.read_return;
|
||||
GetResponse get_response = read_get_response_result.GetValue();
|
||||
|
||||
MG_ASSERT(get_response.value == i);
|
||||
|
||||
|
@ -19,17 +19,21 @@
|
||||
#include <vector>
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/rsm/coordinator_rsm.hpp"
|
||||
#include "io/rsm/raft.hpp"
|
||||
#include "io/rsm/shard_rsm.hpp"
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "io/simulator/simulator_transport.hpp"
|
||||
#include "utils/result.hpp"
|
||||
#include "utils/rsm_client.hpp"
|
||||
|
||||
using memgraph::coordinator::Address;
|
||||
using memgraph::coordinator::AddressAndStatus;
|
||||
using memgraph::coordinator::CompoundKey;
|
||||
using memgraph::coordinator::Coordinator;
|
||||
using memgraph::coordinator::HlcRequest;
|
||||
using memgraph::coordinator::HlcResponse;
|
||||
using memgraph::coordinator::Shard;
|
||||
using memgraph::coordinator::ShardMap;
|
||||
using memgraph::coordinator::Shards;
|
||||
@ -39,6 +43,7 @@ using memgraph::io::Io;
|
||||
using memgraph::io::ResponseEnvelope;
|
||||
using memgraph::io::ResponseFuture;
|
||||
using memgraph::io::Time;
|
||||
using memgraph::io::TimedOut;
|
||||
using memgraph::io::rsm::CoordinatorRsm;
|
||||
using memgraph::io::rsm::Raft;
|
||||
using memgraph::io::rsm::ReadRequest;
|
||||
@ -54,6 +59,7 @@ using memgraph::io::simulator::Simulator;
|
||||
using memgraph::io::simulator::SimulatorConfig;
|
||||
using memgraph::io::simulator::SimulatorStats;
|
||||
using memgraph::io::simulator::SimulatorTransport;
|
||||
using memgraph::utils::BasicResult;
|
||||
|
||||
using StorageClient =
|
||||
RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageGetRequest, StorageGetResponse>;
|
||||
@ -236,29 +242,27 @@ int main() {
|
||||
|
||||
while (true) {
|
||||
// Create CompoundKey
|
||||
auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
|
||||
auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
|
||||
const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
|
||||
const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
|
||||
|
||||
CompoundKey cm_k = {cm_key_1, cm_key_2};
|
||||
const CompoundKey cm_k = {cm_key_1, cm_key_2};
|
||||
|
||||
// Look for Shard
|
||||
auto read_res_opt = coordinator_client.SendReadRequest(req);
|
||||
if (!read_res_opt) {
|
||||
BasicResult<TimedOut, memgraph::coordinator::ReadResponses> read_res = coordinator_client.SendReadRequest(req);
|
||||
|
||||
if (read_res.HasError()) {
|
||||
// timeout
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!read_res_opt.value().success) {
|
||||
continue;
|
||||
}
|
||||
auto coordinator_read_response = read_res.GetValue();
|
||||
HlcResponse hlc_response = std::get<HlcResponse>(coordinator_read_response);
|
||||
|
||||
auto read_res = read_res_opt.value();
|
||||
|
||||
auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return);
|
||||
// Transaction ID to be used later...
|
||||
auto transaction_id = res.new_hlc;
|
||||
auto transaction_id = hlc_response.new_hlc;
|
||||
|
||||
if (res.fresher_shard_map) {
|
||||
client_shard_map = res.fresher_shard_map.value();
|
||||
if (hlc_response.fresher_shard_map) {
|
||||
client_shard_map = hlc_response.fresher_shard_map.value();
|
||||
}
|
||||
|
||||
// TODO(gabor) check somewhere in the call chain if the entries are actually valid
|
||||
@ -277,34 +281,38 @@ int main() {
|
||||
// Have client use shard map to decide which shard to communicate
|
||||
// with in order to write a new value
|
||||
// client_shard_map.
|
||||
StorageWriteRequest storage_req;
|
||||
auto write_key_1 = memgraph::storage::PropertyValue(3);
|
||||
auto write_key_2 = memgraph::storage::PropertyValue(4);
|
||||
|
||||
StorageWriteRequest storage_req;
|
||||
storage_req.key = {write_key_1, write_key_2};
|
||||
storage_req.value = 1000;
|
||||
auto write_res_opt = storage_client.SendWriteRequest(storage_req);
|
||||
if (!write_res_opt) {
|
||||
|
||||
auto write_response_result = storage_client.SendWriteRequest(storage_req);
|
||||
if (write_response_result.HasError()) {
|
||||
// timed out
|
||||
continue;
|
||||
}
|
||||
auto write_res = write_res_opt.value().write_return;
|
||||
auto write_response = write_response_result.GetValue();
|
||||
|
||||
bool cas_succeeded = write_res.shard_rsm_success;
|
||||
bool cas_succeeded = write_response.shard_rsm_success;
|
||||
|
||||
if (!cas_succeeded) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Have client use shard map to decide which shard to communicate
|
||||
// with to read that same value back
|
||||
|
||||
StorageGetRequest storage_get_req;
|
||||
storage_get_req.key = {write_key_1, write_key_2};
|
||||
auto get_res_opt = storage_client.SendReadRequest(storage_get_req);
|
||||
if (!get_res_opt) {
|
||||
|
||||
auto get_response_result = storage_client.SendReadRequest(storage_get_req);
|
||||
if (get_response_result.HasError()) {
|
||||
// timed out
|
||||
continue;
|
||||
}
|
||||
auto get_res = get_res_opt.value();
|
||||
auto val = get_res.read_return.value.value();
|
||||
auto get_response = get_response_result.GetValue();
|
||||
auto val = get_response.value.value();
|
||||
|
||||
MG_ASSERT(val == 1000);
|
||||
break;
|
||||
|
@ -15,15 +15,20 @@
|
||||
|
||||
#include "io/address.hpp"
|
||||
#include "io/rsm/raft.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
using memgraph::io::Address;
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::ResponseEnvelope;
|
||||
using memgraph::io::ResponseFuture;
|
||||
using memgraph::io::ResponseResult;
|
||||
using memgraph::io::Time;
|
||||
using memgraph::io::TimedOut;
|
||||
using memgraph::io::rsm::ReadRequest;
|
||||
using memgraph::io::rsm::ReadResponse;
|
||||
using memgraph::io::rsm::WriteRequest;
|
||||
using memgraph::io::rsm::WriteResponse;
|
||||
using memgraph::utils::BasicResult;
|
||||
|
||||
template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT,
|
||||
typename ReadResponseT>
|
||||
@ -37,22 +42,21 @@ class RsmClient {
|
||||
ServerPool server_addrs_;
|
||||
|
||||
template <typename ResponseT>
|
||||
std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) {
|
||||
void PossiblyRedirectLeader(const ResponseT &response) {
|
||||
if (response.retry_leader) {
|
||||
MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
|
||||
leader_ = response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl;
|
||||
spdlog::debug("client redirected to leader server {}", leader_.ToString());
|
||||
} else if (!response.success) {
|
||||
std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader_ = server_addrs_[addr_index];
|
||||
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader_.last_known_port << std::endl;
|
||||
return std::nullopt;
|
||||
spdlog::debug(
|
||||
"client NOT redirected to leader server despite our success failing to be processed (it probably was sent to "
|
||||
"a RSM Candidate) trying a random one at index {} with address {}",
|
||||
addr_index, leader_.ToString());
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
public:
|
||||
@ -61,52 +65,69 @@ class RsmClient {
|
||||
|
||||
RsmClient() = delete;
|
||||
|
||||
std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) {
|
||||
BasicResult<TimedOut, WriteResponseT> SendWriteRequest(WriteRequestT req) {
|
||||
WriteRequest<WriteRequestT> client_req;
|
||||
client_req.operation = req;
|
||||
|
||||
std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl;
|
||||
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
|
||||
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
|
||||
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
|
||||
const Duration overall_timeout = io_.GetDefaultTimeout();
|
||||
const Time before = io_.Now();
|
||||
|
||||
if (response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
// continue;
|
||||
return std::nullopt;
|
||||
}
|
||||
do {
|
||||
spdlog::debug("client sending CasRequest to Leader {}", leader_.ToString());
|
||||
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
|
||||
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
|
||||
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
|
||||
|
||||
ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue();
|
||||
WriteResponse<WriteResponseT> write_response = response_envelope.message;
|
||||
if (response_result.HasError()) {
|
||||
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
|
||||
// continue;
|
||||
return response_result.GetError();
|
||||
}
|
||||
|
||||
return CheckForCorrectLeader(write_response);
|
||||
ResponseEnvelope<WriteResponse<WriteResponseT>> &&response_envelope = std::move(response_result.GetValue());
|
||||
WriteResponse<WriteResponseT> &&write_response = std::move(response_envelope.message);
|
||||
|
||||
if (write_response.success) {
|
||||
return std::move(write_response.write_return);
|
||||
}
|
||||
|
||||
PossiblyRedirectLeader(write_response);
|
||||
} while (io_.Now() < before + overall_timeout);
|
||||
|
||||
return TimedOut{};
|
||||
}
|
||||
|
||||
std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) {
|
||||
BasicResult<TimedOut, ReadResponseT> SendReadRequest(ReadRequestT req) {
|
||||
ReadRequest<ReadRequestT> read_req;
|
||||
read_req.operation = req;
|
||||
|
||||
std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl;
|
||||
const Duration overall_timeout = io_.GetDefaultTimeout();
|
||||
const Time before = io_.Now();
|
||||
|
||||
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
|
||||
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
|
||||
do {
|
||||
spdlog::debug("client sending GetRequest to Leader {}", leader_.ToString());
|
||||
|
||||
// receive response
|
||||
ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
|
||||
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
|
||||
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
|
||||
|
||||
if (get_response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
return std::nullopt;
|
||||
}
|
||||
// receive response
|
||||
ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
|
||||
|
||||
ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue();
|
||||
ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message;
|
||||
if (get_response_result.HasError()) {
|
||||
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
|
||||
return get_response_result.GetError();
|
||||
}
|
||||
|
||||
// if (!read_get_response.success) {
|
||||
// // sent to a non-leader
|
||||
// return {};
|
||||
// }
|
||||
ResponseEnvelope<ReadResponse<ReadResponseT>> &&get_response_envelope = std::move(get_response_result.GetValue());
|
||||
ReadResponse<ReadResponseT> &&read_get_response = std::move(get_response_envelope.message);
|
||||
|
||||
return CheckForCorrectLeader(read_get_response);
|
||||
if (read_get_response.success) {
|
||||
return std::move(read_get_response.read_return);
|
||||
}
|
||||
|
||||
PossiblyRedirectLeader(read_get_response);
|
||||
} while (io_.Now() < before + overall_timeout);
|
||||
|
||||
return TimedOut{};
|
||||
}
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user