Gerneral clean-up

This commit is contained in:
gvolfing 2022-08-10 08:33:40 +02:00 committed by Tyler Neely
parent 08fe7b09db
commit 3c9fe7ef42
4 changed files with 32 additions and 146 deletions
src
tests/simulation

View File

@ -105,7 +105,6 @@ class Coordinator {
/// Increment our
ReadResponses Read(HlcRequest hlc_request) {
std::cout << "HlcRequest->HlcResponse" << std::endl;
HlcResponse res{};
auto hlc_shard_map = shard_map_.GetHlc();
@ -125,8 +124,6 @@ class Coordinator {
}
GetShardMapResponse Read(GetShardMapRequest &&get_shard_map_request) {
std::cout << "GetShardMapRequest" << std::endl;
GetShardMapResponse res;
res.shard_map = shard_map_;
return res;
@ -179,22 +176,22 @@ class Coordinator {
explicit Coordinator(ShardMap sm) : shard_map_{(sm)} {}
ReadResponses Read(ReadRequests requests) {
if (std::get_if<HlcRequest>(&requests)) {
std::cout << "HlcRequest" << std::endl;
} else if (std::get_if<GetShardMapRequest>(&requests)) {
std::cout << "GetShardMapRequest" << std::endl;
} else {
std::cout << "idk requests" << std::endl;
}
std::cout << "Coordinator Read()" << std::endl;
// if (std::get_if<HlcRequest>(&requests)) {
// std::cout << "HlcRequest" << std::endl;
// } else if (std::get_if<GetShardMapRequest>(&requests)) {
// std::cout << "GetShardMapRequest" << std::endl;
// } else {
// std::cout << "idk requests" << std::endl;
// }
// std::cout << "Coordinator Read()" << std::endl;
auto ret = std::visit([&](auto requests) { return Read(requests); }, (requests));
if (std::get_if<HlcResponse>(&ret)) {
std::cout << "HlcResponse" << std::endl;
} else if (std::get_if<GetShardMapResponse>(&ret)) {
std::cout << "GetShardMapResponse" << std::endl;
} else {
std::cout << "idk response" << std::endl;
}
// if (std::get_if<HlcResponse>(&ret)) {
// std::cout << "HlcResponse" << std::endl;
// } else if (std::get_if<GetShardMapResponse>(&ret)) {
// std::cout << "GetShardMapResponse" << std::endl;
// } else {
// std::cout << "idk response" << std::endl;
// }
return ret;
}

View File

@ -90,15 +90,12 @@ class StorageRsm {
StorageGetResponse ret;
if (!IsKeyInRange(request.key)) {
std::cout << "ONE" << std::endl;
ret.latest_known_shard_map_version = shard_map_version_;
ret.shard_rsm_success = false;
} else if (state_.contains(request.key)) {
std::cout << "TWO" << std::endl;
ret.value = state_[request.key];
ret.shard_rsm_success = true;
} else {
std::cout << "THREE" << std::endl;
ret.shard_rsm_success = false;
ret.value = std::nullopt;
}
@ -112,12 +109,10 @@ class StorageRsm {
if (!IsKeyInRange(request.key)) {
ret.latest_known_shard_map_version = shard_map_version_;
ret.shard_rsm_success = false;
std::cout << "WRITE 0" << std::endl;
}
// Key exist
else if (state_.contains(request.key)) {
auto &val = state_[request.key];
std::cout << "WRITE 1" << std::endl;
/*
* Delete
@ -126,7 +121,6 @@ class StorageRsm {
ret.shard_rsm_success = true;
ret.last_value = val;
state_.erase(state_.find(request.key));
std::cout << "WRITE 2" << std::endl;
}
/*
@ -138,12 +132,10 @@ class StorageRsm {
ret.shard_rsm_success = true;
val = request.value.value();
std::cout << "WRITE 3" << std::endl;
} else {
ret.last_value = val;
ret.shard_rsm_success = false;
std::cout << "WRITE 4" << std::endl;
}
}
/*
@ -154,10 +146,8 @@ class StorageRsm {
ret.shard_rsm_success = true;
state_.emplace(request.key, std::move(request.value).value());
std::cout << "WRITE 5" << std::endl;
}
std::cout << "WRITE ret" << std::endl;
return ret;
}
};

View File

@ -117,91 +117,6 @@ class TestState {
}
};
// template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT,
// typename ReadResponseT>
// class RsmClient {
// using ServerPool = std::vector<Address>;
// IoImpl io_;
// Address leader_;
// std::mt19937 cli_rng_{0};
// ServerPool server_addrs_;
// template <typename ResponseT>
// std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) {
// if (response.retry_leader) {
// MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
// leader_ = response.retry_leader.value();
// std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl;
// } else if (!response.success) {
// std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
// size_t addr_index = addr_distrib(cli_rng_);
// leader_ = server_addrs_[addr_index];
// std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
// << " with port " << leader_.last_known_port << std::endl;
// return {};
// }
// return response;
// }
// public:
// RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs)
// : io_{io}, leader_{leader}, server_addrs_{server_addrs} {}
// RsmClient() = delete;
// std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) {
// WriteRequest<WriteRequestT> client_req;
// client_req.operation = req;
// std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl;
// ResponseFuture<WriteResponse<WriteResponseT>> response_future =
// io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
// ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
// if (response_result.HasError()) {
// std::cout << "client timed out while trying to communicate with leader server " << std::endl;
// // continue;
// return std::nullopt;
// }
// ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue();
// WriteResponse<WriteResponseT> write_response = response_envelope.message;
// return CheckForCorrectLeader(write_response);
// }
// std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) {
// ReadRequest<ReadRequestT> read_req;
// read_req.operation = req;
// std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl;
// ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
// io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
// // receive response
// ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
// if (get_response_result.HasError()) {
// std::cout << "client timed out while trying to communicate with leader server " << std::endl;
// return {};
// }
// ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue();
// ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message;
// if (!read_get_response.success) {
// // sent to a non-leader
// return {};
// }
// return CheckForCorrectLeader(read_get_response);
// }
// };
template <typename IoImpl>
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) {
server.Run();

View File

@ -129,22 +129,13 @@ int main() {
.scramble_messages = true,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
};
auto simulator = Simulator(config);
Io<SimulatorTransport> cli_io = simulator.RegisterNew();
// auto c_thread_1 = std::jthread(RunRaft< Coordinator>, std::move(c_1));
// simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]);
// auto c_thread_2 = std::jthread(RunRaft< Coordinator>, std::move(c_2));
// simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]);
// auto c_thread_3 = std::jthread(RunRaft<Coordinator>, std::move(c_3));
// simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]);
// Register
Io<SimulatorTransport> a_io_1 = simulator.RegisterNew();
Io<SimulatorTransport> a_io_2 = simulator.RegisterNew();
@ -255,37 +246,25 @@ int main() {
// Look for Shard
auto read_res_opt = coordinator_client.SendReadRequest(req);
if (!read_res_opt) {
std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!0" << std::endl;
continue;
}
if (!read_res_opt.value().success) {
std::cout << "Not successful." << std::endl;
continue;
}
std::cout << "Before" << std::endl;
auto read_res = read_res_opt.value();
std::cout << "After" << std::endl;
auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return);
// Transaction ID to be used later...
auto transaction_id = res.new_hlc;
std::cout << "transaction_id: " << transaction_id.logical_id << std::endl;
if (!res.fresher_shard_map) {
// continue;
std::cout << "Something is really not OK..." << std::endl;
}
std::cout << "Before2" << std::endl;
client_shard_map = res.fresher_shard_map.value();
std::cout << "After2" << std::endl;
// TODO(gabor) check somewhere in the call chain if the entries are actually valid
for (auto &[key, val] : client_shard_map.GetShards()) {
std::cout << "key: " << key << std::endl;
}
// for (auto &[key, val] : client_shard_map.GetShards()) {
// std::cout << "key: " << key << std::endl;
// }
auto target_shard = client_shard_map.GetShardForKey(std::string("label1"), cm_k);
@ -293,9 +272,7 @@ int main() {
auto storage_client_opt = DetermineShardLocation(target_shard, a_addrs, shard_a_client, b_addrs, shard_b_client);
MG_ASSERT(storage_client_opt);
std::cout << "Before3" << std::endl;
auto storage_client = storage_client_opt.value();
std::cout << "After3" << std::endl;
// Have client use shard map to decide which shard to communicate
// with in order to write a new value
@ -307,7 +284,6 @@ int main() {
storage_req.value = 1000;
auto write_res_opt = storage_client.SendWriteRequest(storage_req);
if (!write_res_opt) {
std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1" << std::endl;
continue;
}
auto write_res = write_res_opt.value().write_return;
@ -325,19 +301,27 @@ int main() {
storage_get_req.key = {write_key_1, write_key_2};
auto get_res_opt = storage_client.SendReadRequest(storage_get_req);
if (!get_res_opt) {
std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!2" << std::endl;
continue;
}
auto get_res = get_res_opt.value();
auto val = get_res.read_return.value.value();
std::cout << "val -> " << val << std::endl;
MG_ASSERT(get_res.read_return.value == 1000);
MG_ASSERT(val == 1000);
break;
}
simulator.ShutDown();
SimulatorStats stats = simulator.Stats();
std::cout << "total messages: " << stats.total_messages << std::endl;
std::cout << "dropped messages: " << stats.dropped_messages << std::endl;
std::cout << "timed out requests: " << stats.timed_out_requests << std::endl;
std::cout << "total requests: " << stats.total_requests << std::endl;
std::cout << "total responses: " << stats.total_responses << std::endl;
std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl;
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
return 0;
}