diff --git a/src/coordinator/coordinator.hpp b/src/coordinator/coordinator.hpp index 69a2e5cd7..259d10f9c 100644 --- a/src/coordinator/coordinator.hpp +++ b/src/coordinator/coordinator.hpp @@ -105,7 +105,6 @@ class Coordinator { /// Increment our ReadResponses Read(HlcRequest hlc_request) { - std::cout << "HlcRequest->HlcResponse" << std::endl; HlcResponse res{}; auto hlc_shard_map = shard_map_.GetHlc(); @@ -125,8 +124,6 @@ class Coordinator { } GetShardMapResponse Read(GetShardMapRequest &&get_shard_map_request) { - std::cout << "GetShardMapRequest" << std::endl; - GetShardMapResponse res; res.shard_map = shard_map_; return res; @@ -179,22 +176,22 @@ class Coordinator { explicit Coordinator(ShardMap sm) : shard_map_{(sm)} {} ReadResponses Read(ReadRequests requests) { - if (std::get_if<HlcRequest>(&requests)) { - std::cout << "HlcRequest" << std::endl; - } else if (std::get_if<GetShardMapRequest>(&requests)) { - std::cout << "GetShardMapRequest" << std::endl; - } else { - std::cout << "idk requests" << std::endl; - } - std::cout << "Coordinator Read()" << std::endl; + // if (std::get_if<HlcRequest>(&requests)) { + // std::cout << "HlcRequest" << std::endl; + // } else if (std::get_if<GetShardMapRequest>(&requests)) { + // std::cout << "GetShardMapRequest" << std::endl; + // } else { + // std::cout << "idk requests" << std::endl; + // } + // std::cout << "Coordinator Read()" << std::endl; auto ret = std::visit([&](auto requests) { return Read(requests); }, (requests)); - if (std::get_if<HlcResponse>(&ret)) { - std::cout << "HlcResponse" << std::endl; - } else if (std::get_if<GetShardMapResponse>(&ret)) { - std::cout << "GetShardMapResponse" << std::endl; - } else { - std::cout << "idk response" << std::endl; - } + // if (std::get_if<HlcResponse>(&ret)) { + // std::cout << "HlcResponse" << std::endl; + // } else if (std::get_if<GetShardMapResponse>(&ret)) { + // std::cout << "GetShardMapResponse" << std::endl; + // } else { + // std::cout << "idk response" << std::endl; + // } return ret; } diff --git a/src/io/rsm/shard_rsm.hpp b/src/io/rsm/shard_rsm.hpp index ea45c33fc..8cce21664 100644 --- a/src/io/rsm/shard_rsm.hpp +++ b/src/io/rsm/shard_rsm.hpp @@ -90,15 +90,12 @@ class StorageRsm { StorageGetResponse ret; if (!IsKeyInRange(request.key)) { - std::cout << "ONE" << std::endl; ret.latest_known_shard_map_version = shard_map_version_; ret.shard_rsm_success = false; } else if (state_.contains(request.key)) { - std::cout << "TWO" << std::endl; ret.value = state_[request.key]; ret.shard_rsm_success = true; } else { - std::cout << "THREE" << std::endl; ret.shard_rsm_success = false; ret.value = std::nullopt; } @@ -112,12 +109,10 @@ class StorageRsm { if (!IsKeyInRange(request.key)) { ret.latest_known_shard_map_version = shard_map_version_; ret.shard_rsm_success = false; - std::cout << "WRITE 0" << std::endl; } // Key exist else if (state_.contains(request.key)) { auto &val = state_[request.key]; - std::cout << "WRITE 1" << std::endl; /* * Delete @@ -126,7 +121,6 @@ class StorageRsm { ret.shard_rsm_success = true; ret.last_value = val; state_.erase(state_.find(request.key)); - std::cout << "WRITE 2" << std::endl; } /* @@ -138,12 +132,10 @@ class StorageRsm { ret.shard_rsm_success = true; val = request.value.value(); - std::cout << "WRITE 3" << std::endl; } else { ret.last_value = val; ret.shard_rsm_success = false; - std::cout << "WRITE 4" << std::endl; } } /* @@ -154,10 +146,8 @@ class StorageRsm { ret.shard_rsm_success = true; state_.emplace(request.key, std::move(request.value).value()); - std::cout << "WRITE 5" << std::endl; } - std::cout << "WRITE ret" << std::endl; return ret; } }; diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index c0cb84476..199cce5ce 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -117,91 +117,6 @@ class TestState { } }; -// template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT, -// typename ReadResponseT> -// class RsmClient { -// using ServerPool = std::vector<Address>; - -// IoImpl io_; -// Address leader_; - -// std::mt19937 cli_rng_{0}; -// ServerPool server_addrs_; - -// template <typename ResponseT> -// std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) { -// if (response.retry_leader) { -// MG_ASSERT(!response.success, "retry_leader should never be set for successful responses"); -// leader_ = response.retry_leader.value(); -// std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl; -// } else if (!response.success) { -// std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1)); -// size_t addr_index = addr_distrib(cli_rng_); -// leader_ = server_addrs_[addr_index]; - -// std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index -// << " with port " << leader_.last_known_port << std::endl; -// return {}; -// } - -// return response; -// } - -// public: -// RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs) -// : io_{io}, leader_{leader}, server_addrs_{server_addrs} {} - -// RsmClient() = delete; - -// std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) { -// WriteRequest<WriteRequestT> client_req; -// client_req.operation = req; - -// std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl; -// ResponseFuture<WriteResponse<WriteResponseT>> response_future = -// io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req); -// ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait(); - -// if (response_result.HasError()) { -// std::cout << "client timed out while trying to communicate with leader server " << std::endl; -// // continue; -// return std::nullopt; -// } - -// ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue(); -// WriteResponse<WriteResponseT> write_response = response_envelope.message; - -// return CheckForCorrectLeader(write_response); -// } - -// std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) { -// ReadRequest<ReadRequestT> read_req; -// read_req.operation = req; - -// std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl; -// ResponseFuture<ReadResponse<ReadResponseT>> get_response_future = -// io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req); - -// // receive response -// ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait(); - -// if (get_response_result.HasError()) { -// std::cout << "client timed out while trying to communicate with leader server " << std::endl; -// return {}; -// } - -// ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue(); -// ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message; - -// if (!read_get_response.success) { -// // sent to a non-leader -// return {}; -// } - -// return CheckForCorrectLeader(read_get_response); -// } -// }; - template <typename IoImpl> void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) { server.Run(); diff --git a/tests/simulation/sharded_map.cpp b/tests/simulation/sharded_map.cpp index ba24c4295..8b0928ecc 100644 --- a/tests/simulation/sharded_map.cpp +++ b/tests/simulation/sharded_map.cpp @@ -129,22 +129,13 @@ int main() { .scramble_messages = true, .rng_seed = 0, .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, - .abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 1024}, + .abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024}, }; auto simulator = Simulator(config); Io<SimulatorTransport> cli_io = simulator.RegisterNew(); - // auto c_thread_1 = std::jthread(RunRaft< Coordinator>, std::move(c_1)); - // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]); - - // auto c_thread_2 = std::jthread(RunRaft< Coordinator>, std::move(c_2)); - // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]); - - // auto c_thread_3 = std::jthread(RunRaft<Coordinator>, std::move(c_3)); - // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]); - // Register Io<SimulatorTransport> a_io_1 = simulator.RegisterNew(); Io<SimulatorTransport> a_io_2 = simulator.RegisterNew(); @@ -255,37 +246,25 @@ int main() { // Look for Shard auto read_res_opt = coordinator_client.SendReadRequest(req); if (!read_res_opt) { - std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!0" << std::endl; continue; } if (!read_res_opt.value().success) { - std::cout << "Not successful." << std::endl; continue; } - std::cout << "Before" << std::endl; auto read_res = read_res_opt.value(); - std::cout << "After" << std::endl; auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return); + // Transaction ID to be used later... auto transaction_id = res.new_hlc; - std::cout << "transaction_id: " << transaction_id.logical_id << std::endl; - - if (!res.fresher_shard_map) { - // continue; - std::cout << "Something is really not OK..." << std::endl; - } - - std::cout << "Before2" << std::endl; client_shard_map = res.fresher_shard_map.value(); - std::cout << "After2" << std::endl; // TODO(gabor) check somewhere in the call chain if the entries are actually valid - for (auto &[key, val] : client_shard_map.GetShards()) { - std::cout << "key: " << key << std::endl; - } + // for (auto &[key, val] : client_shard_map.GetShards()) { + // std::cout << "key: " << key << std::endl; + // } auto target_shard = client_shard_map.GetShardForKey(std::string("label1"), cm_k); @@ -293,9 +272,7 @@ int main() { auto storage_client_opt = DetermineShardLocation(target_shard, a_addrs, shard_a_client, b_addrs, shard_b_client); MG_ASSERT(storage_client_opt); - std::cout << "Before3" << std::endl; auto storage_client = storage_client_opt.value(); - std::cout << "After3" << std::endl; // Have client use shard map to decide which shard to communicate // with in order to write a new value @@ -307,7 +284,6 @@ int main() { storage_req.value = 1000; auto write_res_opt = storage_client.SendWriteRequest(storage_req); if (!write_res_opt) { - std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1" << std::endl; continue; } auto write_res = write_res_opt.value().write_return; @@ -325,19 +301,27 @@ int main() { storage_get_req.key = {write_key_1, write_key_2}; auto get_res_opt = storage_client.SendReadRequest(storage_get_req); if (!get_res_opt) { - std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!2" << std::endl; continue; } auto get_res = get_res_opt.value(); auto val = get_res.read_return.value.value(); - std::cout << "val -> " << val << std::endl; - - MG_ASSERT(get_res.read_return.value == 1000); + MG_ASSERT(val == 1000); break; } simulator.ShutDown(); + SimulatorStats stats = simulator.Stats(); + + std::cout << "total messages: " << stats.total_messages << std::endl; + std::cout << "dropped messages: " << stats.dropped_messages << std::endl; + std::cout << "timed out requests: " << stats.timed_out_requests << std::endl; + std::cout << "total requests: " << stats.total_requests << std::endl; + std::cout << "total responses: " << stats.total_responses << std::endl; + std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl; + + std::cout << "========================== SUCCESS :) ==========================" << std::endl; + return 0; }