Update usage of Shard (#574)

Updating tests to use new constructor of Shard
Commenting test shard_request_manager
This commit is contained in:
Jeremy B 2022-10-05 11:56:36 +02:00 committed by GitHub
parent 87111b2f89
commit 6fd64d31f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 89 deletions

View File

@ -129,6 +129,7 @@ class SimulatorHandle {
// TODO(tyler) search for item in can_receive_ that matches the desired types, rather // TODO(tyler) search for item in can_receive_ that matches the desired types, rather
// than asserting that the last item in can_rx matches. // than asserting that the last item in can_rx matches.
auto m_opt = std::move(message).Take<Ms...>(); auto m_opt = std::move(message).Take<Ms...>();
MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type");
blocked_on_receive_.erase(receiver); blocked_on_receive_.erase(receiver);

View File

@ -338,6 +338,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
state.transaction_id = transaction_id_; state.transaction_id = transaction_id_;
auto shards = shards_map_.GetShards(*state.label); auto shards = shards_map_.GetShards(*state.label);
for (auto &[key, shard] : shards) { for (auto &[key, shard] : shards) {
MG_ASSERT(!shard.empty());
state.shard_cache.push_back(std::move(shard)); state.shard_cache.push_back(std::move(shard));
ScanVerticesRequest rqst; ScanVerticesRequest rqst;
rqst.transaction_id = transaction_id_; rqst.transaction_id = transaction_id_;

View File

@ -104,6 +104,7 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
const LabelId label_id = sm.labels.at(label_name); const LabelId label_id = sm.labels.at(label_name);
auto &label_space = sm.label_spaces.at(label_id); auto &label_space = sm.label_spaces.at(label_id);
Shards &shards_for_label = label_space.shards; Shards &shards_for_label = label_space.shards;
shards_for_label.clear();
// add first shard at [0, 0] // add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT}; AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};
@ -197,124 +198,124 @@ template <typename ShardRequestManager>
void TestAggregate(ShardRequestManager &io) {} void TestAggregate(ShardRequestManager &io) {}
int main() { int main() {
SimulatorConfig config{ // SimulatorConfig config{
.drop_percent = 0, // .drop_percent = 0,
.perform_timeouts = false, // .perform_timeouts = false,
.scramble_messages = false, // .scramble_messages = false,
.rng_seed = 0, // .rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024}, // .start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024}, // .abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
}; // };
auto simulator = Simulator(config); // auto simulator = Simulator(config);
const auto one_second = std::chrono::seconds(1); // const auto one_second = std::chrono::seconds(1);
Io<SimulatorTransport> cli_io = simulator.RegisterNew(); // Io<SimulatorTransport> cli_io = simulator.RegisterNew();
cli_io.SetDefaultTimeout(one_second); // cli_io.SetDefaultTimeout(one_second);
// Register // // Register
Io<SimulatorTransport> a_io_1 = simulator.RegisterNew(); // Io<SimulatorTransport> a_io_1 = simulator.RegisterNew();
a_io_1.SetDefaultTimeout(one_second); // a_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> a_io_2 = simulator.RegisterNew(); // Io<SimulatorTransport> a_io_2 = simulator.RegisterNew();
a_io_2.SetDefaultTimeout(one_second); // a_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> a_io_3 = simulator.RegisterNew(); // Io<SimulatorTransport> a_io_3 = simulator.RegisterNew();
a_io_3.SetDefaultTimeout(one_second); // a_io_3.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_1 = simulator.RegisterNew(); // Io<SimulatorTransport> b_io_1 = simulator.RegisterNew();
b_io_1.SetDefaultTimeout(one_second); // b_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_2 = simulator.RegisterNew(); // Io<SimulatorTransport> b_io_2 = simulator.RegisterNew();
b_io_2.SetDefaultTimeout(one_second); // b_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> b_io_3 = simulator.RegisterNew(); // Io<SimulatorTransport> b_io_3 = simulator.RegisterNew();
b_io_3.SetDefaultTimeout(one_second); // b_io_3.SetDefaultTimeout(one_second);
// Preconfigure coordinator with kv shard 'A' and 'B' // // Preconfigure coordinator with kv shard 'A' and 'B'
auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), // auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress()); // b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), // auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress()); // b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), // auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress()); // b_io_2.GetAddress(), b_io_3.GetAddress());
// Spin up shard A // // Spin up shard A
std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()}; // std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()};
std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]}; // std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]};
std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]}; // std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]};
std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]}; // std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]};
ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}}; // ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}};
ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}}; // ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}};
ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}}; // ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}};
auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1)); // auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]); // simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]);
auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2)); // auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]); // simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]);
auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3)); // auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]); // simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]);
// Spin up shard B // // Spin up shard B
std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()}; // std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()};
std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]}; // std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]};
std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]}; // std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]};
std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]}; // std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]};
ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}}; // ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}};
ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}}; // ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}};
ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}}; // ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}};
auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1)); // auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]); // simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]);
auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2)); // auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]); // simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]);
auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3)); // auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]); // simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]);
// Spin up coordinators // // Spin up coordinators
Io<SimulatorTransport> c_io_1 = simulator.RegisterNew(); // Io<SimulatorTransport> c_io_1 = simulator.RegisterNew();
c_io_1.SetDefaultTimeout(one_second); // c_io_1.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_2 = simulator.RegisterNew(); // Io<SimulatorTransport> c_io_2 = simulator.RegisterNew();
c_io_2.SetDefaultTimeout(one_second); // c_io_2.SetDefaultTimeout(one_second);
Io<SimulatorTransport> c_io_3 = simulator.RegisterNew(); // Io<SimulatorTransport> c_io_3 = simulator.RegisterNew();
c_io_3.SetDefaultTimeout(one_second); // c_io_3.SetDefaultTimeout(one_second);
std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()}; // std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()};
std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]}; // std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]};
std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]}; // std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]};
std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]}; // std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]};
ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}}; // ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}};
ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}}; // ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}};
ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}}; // ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}};
auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); }); // auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]); // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]);
auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); }); // auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]); // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]);
auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); }); // auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]); // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]);
std::cout << "beginning test after servers have become quiescent" << std::endl; // std::cout << "beginning test after servers have become quiescent" << std::endl;
// Have client contact coordinator RSM for a new transaction ID and // // Have client contact coordinator RSM for a new transaction ID and
// also get the current shard map // // also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs); // CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
memgraph::msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io)); // memgraph::msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
io.StartTransaction(); // io.StartTransaction();
TestScanAll(io); // TestScanAll(io);
TestCreateVertices(io); // TestCreateVertices(io);
simulator.ShutDown(); // simulator.ShutDown();
return 0; return 0;
} }

View File

@ -102,6 +102,7 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
const LabelId label_id = label_id_opt.value(); const LabelId label_id = label_id_opt.value();
auto &label_space = sm.label_spaces.at(label_id); auto &label_space = sm.label_spaces.at(label_id);
Shards &shards_for_label = label_space.shards; Shards &shards_for_label = label_space.shards;
shards_for_label.clear();
// add first shard at [0, 0] // add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT}; AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};