Add the coordinator part to the sharded_map test
This commit is contained in:
parent
eb71f3750c
commit
bbd3d352ee
@ -107,15 +107,20 @@ class Coordinator {
|
||||
ReadResponses Read(HlcRequest &&hlc_request) {
|
||||
HlcResponse res{};
|
||||
|
||||
std::cout << "HlcRequest->HlcResponse" << std::endl;
|
||||
|
||||
auto hlc_shard_map = shard_map_.GetHlc();
|
||||
|
||||
MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id));
|
||||
|
||||
res.new_hlc = shard_map_.UpdateShardMapVersion();
|
||||
|
||||
res.fresher_shard_map = hlc_request.last_shard_map_version.logical_id < hlc_shard_map.logical_id
|
||||
? std::make_optional(shard_map_)
|
||||
: std::nullopt;
|
||||
// res.fresher_shard_map = hlc_request.last_shard_map_version.logical_id < hlc_shard_map.logical_id
|
||||
// ? std::make_optional(shard_map_)
|
||||
// : std::nullopt;
|
||||
|
||||
// Allways return fresher shard_map for now.
|
||||
res.fresher_shard_map = std::make_optional(shard_map_);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ struct ShardMap {
|
||||
|
||||
Shards GetShardsForRange(Label label, CompoundKey start, CompoundKey end);
|
||||
|
||||
Shard GetShardForKey(Label label, CompoundKey key);
|
||||
Shard GetShardForKey(Label label, CompoundKey key) { return shards.at(label).at(key); }
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordinator
|
||||
|
@ -55,9 +55,12 @@ using memgraph::io::simulator::SimulatorConfig;
|
||||
using memgraph::io::simulator::SimulatorStats;
|
||||
using memgraph::io::simulator::SimulatorTransport;
|
||||
|
||||
using StorageClient =
|
||||
RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageGetRequest, StorageGetResponse>;
|
||||
namespace {
|
||||
ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Address b_io_1, Address b_io_2,
|
||||
Address b_io_3) {
|
||||
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
|
||||
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
|
||||
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
|
||||
ShardMap sm1;
|
||||
auto &shards = sm1.GetShards();
|
||||
|
||||
@ -91,6 +94,21 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
|
||||
|
||||
return sm1;
|
||||
}
|
||||
|
||||
std::optional<StorageClient> DetermineShardLocation(Shard target_shard, const std::vector<Address> &a_addrs,
|
||||
StorageClient a_client, const std::vector<Address> &b_addrs,
|
||||
StorageClient b_client) {
|
||||
for (const auto &addr : target_shard) {
|
||||
if (addr.address == b_addrs[0]) {
|
||||
return b_client;
|
||||
}
|
||||
if (addr.address == a_addrs[0]) {
|
||||
return a_client;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
|
||||
@ -215,10 +233,8 @@ int main() {
|
||||
using CoordinatorClient =
|
||||
RsmClient<Io<SimulatorTransport>, memgraph::coordinator::WriteRequests, memgraph::coordinator::WriteResponses,
|
||||
memgraph::coordinator::ReadRequests, memgraph::coordinator::ReadResponses>;
|
||||
CoordinatorClient coordinator_client(cli_io, c_addrs[2], c_addrs);
|
||||
CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs);
|
||||
|
||||
using StorageClient = RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageGetRequest,
|
||||
StorageGetResponse>;
|
||||
StorageClient shard_a_client(cli_io, a_addrs[0], a_addrs);
|
||||
StorageClient shard_b_client(cli_io, b_addrs[0], b_addrs);
|
||||
|
||||
@ -229,30 +245,56 @@ int main() {
|
||||
req.last_shard_map_version = client_shard_map.GetHlc();
|
||||
|
||||
while (true) {
|
||||
// auto read_res_opt = coordinator_client.SendReadRequest(req);
|
||||
// if(!read_res_opt)
|
||||
// {
|
||||
// std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!0" << std::endl;
|
||||
// continue;
|
||||
// }
|
||||
// auto read_res = read_res_opt.value();
|
||||
// Create CompoundKey
|
||||
auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
|
||||
auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
|
||||
|
||||
// auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return);
|
||||
CompoundKey cm_k = {cm_key_1, cm_key_2};
|
||||
|
||||
// auto transaction_id = res.new_hlc;
|
||||
// Look for Shard
|
||||
auto read_res_opt = coordinator_client.SendReadRequest(req);
|
||||
if (!read_res_opt) {
|
||||
std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!0" << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
// client_shard_map = res.fresher_shard_map.value();
|
||||
std::cout << "Before" << std::endl;
|
||||
auto read_res = read_res_opt.value();
|
||||
std::cout << "After" << std::endl;
|
||||
|
||||
// // Have client use shard map to decide which shard to communicate
|
||||
// // with in order to write a new value
|
||||
auto res = std::get<memgraph::coordinator::HlcResponse>(read_res.read_return);
|
||||
auto transaction_id = res.new_hlc;
|
||||
|
||||
// //client_shard_map.
|
||||
std::cout << "transaction_id: " << transaction_id.logical_id << std::endl;
|
||||
|
||||
if (!res.fresher_shard_map) {
|
||||
// continue;
|
||||
std::cout << "Something is really not OK..." << std::endl;
|
||||
}
|
||||
|
||||
std::cout << "Before2" << std::endl;
|
||||
client_shard_map = res.fresher_shard_map.value();
|
||||
std::cout << "After2" << std::endl;
|
||||
|
||||
auto target_shard = client_shard_map.GetShardForKey("label1", cm_k);
|
||||
|
||||
// Determine which shard to send the requests to
|
||||
auto storage_client_opt = DetermineShardLocation(target_shard, a_addrs, shard_a_client, b_addrs, shard_b_client);
|
||||
MG_ASSERT(storage_client_opt);
|
||||
|
||||
std::cout << "Before3" << std::endl;
|
||||
auto storage_client = storage_client_opt.value();
|
||||
std::cout << "After3" << std::endl;
|
||||
|
||||
// Have client use shard map to decide which shard to communicate
|
||||
// with in order to write a new value
|
||||
// client_shard_map.
|
||||
StorageWriteRequest storage_req;
|
||||
auto write_key_1 = memgraph::storage::PropertyValue(3);
|
||||
auto write_key_2 = memgraph::storage::PropertyValue(4);
|
||||
storage_req.key = {write_key_1, write_key_2};
|
||||
storage_req.value = 1000;
|
||||
auto write_res_opt = shard_a_client.SendWriteRequest(storage_req);
|
||||
auto write_res_opt = storage_client.SendWriteRequest(storage_req);
|
||||
if (!write_res_opt) {
|
||||
std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1" << std::endl;
|
||||
continue;
|
||||
@ -265,14 +307,12 @@ int main() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// ... write_res.
|
||||
|
||||
// Have client use shard map to decide which shard to communicate
|
||||
// with to read that same value back
|
||||
|
||||
StorageGetRequest storage_get_req;
|
||||
storage_get_req.key = {write_key_1, write_key_2};
|
||||
auto get_res_opt = shard_a_client.SendReadRequest(storage_get_req);
|
||||
auto get_res_opt = storage_client.SendReadRequest(storage_get_req);
|
||||
if (!get_res_opt) {
|
||||
std::cout << "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!2" << std::endl;
|
||||
continue;
|
||||
|
Loading…
Reference in New Issue
Block a user