diff --git a/src/coordinator/coordinator.hpp b/src/coordinator/coordinator.hpp index aa36af6a2..d37e93f53 100644 --- a/src/coordinator/coordinator.hpp +++ b/src/coordinator/coordinator.hpp @@ -107,15 +107,20 @@ class Coordinator { ReadResponses Read(HlcRequest &&hlc_request) { HlcResponse res{}; + std::cout << "HlcRequest->HlcResponse" << std::endl; + auto hlc_shard_map = shard_map_.GetHlc(); MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id)); res.new_hlc = shard_map_.UpdateShardMapVersion(); - res.fresher_shard_map = hlc_request.last_shard_map_version.logical_id < hlc_shard_map.logical_id - ? std::make_optional(shard_map_) - : std::nullopt; + // res.fresher_shard_map = hlc_request.last_shard_map_version.logical_id < hlc_shard_map.logical_id + // ? std::make_optional(shard_map_) + // : std::nullopt; + + // Allways return fresher shard_map for now. + res.fresher_shard_map = std::make_optional(shard_map_); return res; } diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index e78699b1b..cf7cec541 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -97,7 +97,7 @@ struct ShardMap { Shards GetShardsForRange(Label label, CompoundKey start, CompoundKey end); - Shard GetShardForKey(Label label, CompoundKey key); + Shard GetShardForKey(Label label, CompoundKey key) { return shards.at(label).at(key); } }; } // namespace memgraph::coordinator diff --git a/tests/simulation/sharded_map.cpp b/tests/simulation/sharded_map.cpp index f547dc9c6..9b38ac123 100644 --- a/tests/simulation/sharded_map.cpp +++ b/tests/simulation/sharded_map.cpp @@ -55,9 +55,12 @@ using memgraph::io::simulator::SimulatorConfig; using memgraph::io::simulator::SimulatorStats; using memgraph::io::simulator::SimulatorTransport; +using StorageClient = + RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageGetRequest, StorageGetResponse>; namespace { -ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Address b_io_1, Address b_io_2, - Address b_io_3) { +ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2, + memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1, + memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) { ShardMap sm1; auto &shards = sm1.GetShards(); @@ -91,6 +94,21 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add return sm1; } + +std::optional<StorageClient> DetermineShardLocation(Shard target_shard, const std::vector<Address> &a_addrs, + StorageClient a_client, const std::vector<Address> &b_addrs, + StorageClient b_client) { + for (const auto &addr : target_shard) { + if (addr.address == b_addrs[0]) { + return b_client; + } + if (addr.address == a_addrs[0]) { + return a_client; + } + } + return {}; +} + } // namespace using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>; @@ -215,10 +233,8 @@ int main() { using CoordinatorClient = RsmClient<Io<SimulatorTransport>, memgraph::coordinator::WriteRequests, memgraph::coordinator::WriteResponses, memgraph::coordinator::ReadRequests, memgraph::coordinator::ReadResponses>; - CoordinatorClient coordinator_client(cli_io, c_addrs[2], c_addrs); + CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); - using StorageClient = RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageGetRequest, - StorageGetResponse>; StorageClient shard_a_client(cli_io, a_addrs[0], a_addrs); StorageClient shard_b_client(cli_io, b_addrs[0], b_addrs); @@ -229,30 +245,56 @@ int main() { req.last_shard_map_version = client_shard_map.GetHlc(); while (true) { - // auto read_res_opt = coordinator_client.SendReadRequest(req); - // if(!read_res_opt) - // { - // std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!0" << std::endl; - // continue; - // } - // auto read_res = read_res_opt.value(); + // Create CompoundKey + auto cm_key_1 = memgraph::storage::v3::PropertyValue(3); + auto cm_key_2 = memgraph::storage::v3::PropertyValue(4); - // auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return); + CompoundKey cm_k = {cm_key_1, cm_key_2}; - // auto transaction_id = res.new_hlc; + // Look for Shard + auto read_res_opt = coordinator_client.SendReadRequest(req); + if (!read_res_opt) { + std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!0" << std::endl; + continue; + } - // client_shard_map = res.fresher_shard_map.value(); + std::cout << "Before" << std::endl; + auto read_res = read_res_opt.value(); + std::cout << "After" << std::endl; - // // Have client use shard map to decide which shard to communicate - // // with in order to write a new value + auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return); + auto transaction_id = res.new_hlc; - // //client_shard_map. + std::cout << "transaction_id: " << transaction_id.logical_id << std::endl; + + if (!res.fresher_shard_map) { + // continue; + std::cout << "Something is really not OK..." << std::endl; + } + + std::cout << "Before2" << std::endl; + client_shard_map = res.fresher_shard_map.value(); + std::cout << "After2" << std::endl; + + auto target_shard = client_shard_map.GetShardForKey("label1", cm_k); + + // Determine which shard to send the requests to + auto storage_client_opt = DetermineShardLocation(target_shard, a_addrs, shard_a_client, b_addrs, shard_b_client); + MG_ASSERT(storage_client_opt); + + std::cout << "Before3" << std::endl; + auto storage_client = storage_client_opt.value(); + std::cout << "After3" << std::endl; + + // Have client use shard map to decide which shard to communicate + // with in order to write a new value + // client_shard_map. StorageWriteRequest storage_req; auto write_key_1 = memgraph::storage::PropertyValue(3); auto write_key_2 = memgraph::storage::PropertyValue(4); storage_req.key = {write_key_1, write_key_2}; storage_req.value = 1000; - auto write_res_opt = shard_a_client.SendWriteRequest(storage_req); + auto write_res_opt = storage_client.SendWriteRequest(storage_req); if (!write_res_opt) { std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1" << std::endl; continue; @@ -265,14 +307,12 @@ int main() { continue; } - // ... write_res. - // Have client use shard map to decide which shard to communicate // with to read that same value back StorageGetRequest storage_get_req; storage_get_req.key = {write_key_1, write_key_2}; - auto get_res_opt = shard_a_client.SendReadRequest(storage_get_req); + auto get_res_opt = storage_client.SendReadRequest(storage_get_req); if (!get_res_opt) { std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!2" << std::endl; continue;