From 03cf264b766127317b865a0c00e308595f1ea5cf Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 28 Oct 2022 08:29:27 +0000 Subject: [PATCH 1/7] Add test for 1k shards, 1k create vertices, and then a scan all --- tests/unit/1k_shards_1k_create_scanall.cpp | 258 +++++++++++++++++++++ tests/unit/CMakeLists.txt | 4 + 2 files changed, 262 insertions(+) create mode 100644 tests/unit/1k_shards_1k_create_scanall.cpp diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp new file mode 100644 index 000000000..5fdbfafa8 --- /dev/null +++ b/tests/unit/1k_shards_1k_create_scanall.cpp @@ -0,0 +1,258 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include + +#include + +#include "coordinator/coordinator_client.hpp" +#include "coordinator/coordinator_rsm.hpp" +#include "coordinator/shard_map.hpp" +#include "io/address.hpp" +#include "io/local_transport/local_system.hpp" +#include "io/local_transport/local_transport.hpp" +#include "io/simulator/simulator.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/simulator/simulator_transport.hpp" +#include "machine_manager/machine_config.hpp" +#include "machine_manager/machine_manager.hpp" +#include "query/v2/requests.hpp" +#include "query/v2/shard_request_manager.hpp" +#include "utils/print_helpers.hpp" +#include "utils/variant_helpers.hpp" + +namespace memgraph::tests::simulation { + +using coordinator::Coordinator; +using coordinator::CoordinatorClient; +using coordinator::CoordinatorReadRequests; +using coordinator::CoordinatorWriteRequests; +using coordinator::CoordinatorWriteResponses; +using coordinator::GetShardMapRequest; +using coordinator::GetShardMapResponse; +using coordinator::Hlc; +using coordinator::HlcResponse; +using coordinator::Shard; +using coordinator::ShardMap; +using io::Address; +using io::Io; +using io::local_transport::LocalSystem; +using io::local_transport::LocalTransport; +using io::rsm::RsmClient; +using machine_manager::MachineConfig; +using machine_manager::MachineManager; +using msgs::ReadRequests; +using msgs::ReadResponses; +using msgs::WriteRequests; +using msgs::WriteResponses; +using storage::v3::LabelId; +using storage::v3::SchemaProperty; + +using CompoundKey = std::pair; +using ShardClient = RsmClient; + +struct CreateVertex { + int first; + int second; + + friend std::ostream &operator<<(std::ostream &in, const CreateVertex &add) { + in << "CreateVertex { first: " << add.first << ", second: " << add.second << " }"; + return in; + } +}; + +struct ScanAll { + friend std::ostream &operator<<(std::ostream &in, const ScanAll &get) { + in << "ScanAll {}"; + return in; + } +}; + +MachineManager MkMm(LocalSystem &local_system, std::vector
coordinator_addresses, Address addr, + ShardMap shard_map) { + MachineConfig config{ + .coordinator_addresses = coordinator_addresses, + .is_storage = true, + .is_coordinator = true, + .listen_ip = addr.last_known_ip, + .listen_port = addr.last_known_port, + }; + + Io io = local_system.Register(addr); + + Coordinator coordinator{shard_map}; + + return MachineManager{io, config, coordinator, shard_map}; +} + +void RunMachine(MachineManager mm) { mm.Run(); } + +void WaitForShardsToInitialize(CoordinatorClient &coordinator_client) { + // Call coordinator client's read method for GetShardMap and keep + // reading it until the shard map contains proper replicas for + // each shard in the label space. + + while (true) { + GetShardMapRequest req{}; + CoordinatorReadRequests read_req = req; + auto read_res = coordinator_client.SendReadRequest(read_req); + if (read_res.HasError()) { + // timed out + continue; + } + auto response_result = read_res.GetValue(); + auto response = std::get(response_result); + auto shard_map = response.shard_map; + + if (shard_map.ClusterInitialized()) { + spdlog::info("cluster stabilized - beginning workload"); + return; + } + } +} + +ShardMap TestShardMap(int n_splits, int replication_factor) { + ShardMap sm{}; + + const std::string label_name = std::string("test_label"); + + // register new properties + const std::vector property_names = {"property_1", "property_2"}; + const auto properties = sm.AllocatePropertyIds(property_names); + const auto property_id_1 = properties.at("property_1"); + const auto property_id_2 = properties.at("property_2"); + const auto type_1 = memgraph::common::SchemaType::INT; + const auto type_2 = memgraph::common::SchemaType::INT; + + // register new label space + std::vector schema = { + SchemaProperty{.property_id = property_id_1, .type = type_1}, + SchemaProperty{.property_id = property_id_2, .type = type_2}, + }; + + std::optional label_id = sm.InitializeNewLabel(label_name, schema, replication_factor, sm.shard_map_version); + MG_ASSERT(label_id.has_value()); + + // split the shard at N split points + for (int64_t i = 1; i < n_splits; ++i) { + const auto key1 = memgraph::storage::v3::PropertyValue(i); + const auto key2 = memgraph::storage::v3::PropertyValue(0); + + const auto split_point = {key1, key2}; + + const bool split_success = sm.SplitShard(sm.shard_map_version, label_id.value(), split_point); + + MG_ASSERT(split_success); + } + + return sm; +} + +void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, + std::set &correctness_model, CreateVertex create_vertex) { + const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); + const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); + + std::vector primary_key = {msgs::Value(int64_t(create_vertex.first)), + msgs::Value(int64_t(create_vertex.second))}; + + if (correctness_model.contains(std::make_pair(create_vertex.first, create_vertex.second))) { + // TODO(tyler) remove this early-return when we have properly handled setting non-unique vertexes + return; + } + + msgs::ExecutionState state; + + auto label_id = shard_request_manager.NameToLabel("test_label"); + + msgs::NewVertex nv{.primary_key = primary_key}; + nv.label_ids.push_back({label_id}); + + std::vector new_vertices; + new_vertices.push_back(std::move(nv)); + + auto result = shard_request_manager.Request(state, std::move(new_vertices)); + + MG_ASSERT(result.size() == 1); + MG_ASSERT(result[0].success); + + correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); +} + +void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, + std::set &correctness_model, ScanAll scan_all) { + msgs::ExecutionState request{.label = "test_label"}; + + auto results = shard_request_manager.Request(request); + + MG_ASSERT(results.size() == correctness_model.size()); + + for (const auto &vertex_accessor : results) { + const auto properties = vertex_accessor.Properties(); + const auto primary_key = vertex_accessor.Id().second; + const CompoundKey model_key = std::make_pair(primary_key[0].int_v, primary_key[1].int_v); + MG_ASSERT(correctness_model.contains(model_key)); + } +} + +TEST(MachineManager, ManyShards) { + LocalSystem local_system; + + auto cli_addr = Address::TestAddress(1); + auto machine_1_addr = cli_addr.ForkUniqueAddress(); + + Io cli_io = local_system.Register(cli_addr); + Io cli_io_2 = local_system.Register(Address::TestAddress(2)); + + auto coordinator_addresses = std::vector{ + machine_1_addr, + }; + + auto shard_splits = 1024; + auto replication_factor = 1; + auto create_ops = 1000; + + ShardMap initialization_sm = TestShardMap(shard_splits, replication_factor); + + auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm); + Address coordinator_address = mm_1.CoordinatorAddress(); + + auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); + + CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); + WaitForShardsToInitialize(coordinator_client); + + msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); + + shard_request_manager.StartTransaction(); + + auto correctness_model = std::set{}; + + for (int i = 0; i < create_ops; i++) { + ExecuteOp(shard_request_manager, correctness_model, CreateVertex{.first = i, .second = i}); + } + + ExecuteOp(shard_request_manager, correctness_model, ScanAll{}); + + local_system.ShutDown(); + + auto histo = cli_io_2.ResponseLatencies(); + + using memgraph::utils::print_helpers::operator<<; + std::cout << "response latencies: " << histo << std::endl; +} + +} // namespace memgraph::tests::simulation diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index c8ed0d33a..bb723c1d3 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -442,3 +442,7 @@ target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test # Tests for mg-coordinator add_unit_test(coordinator_shard_map.cpp) target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator) + +# Tests for mg-coordinator +add_unit_test(1k_shards_1k_create_scanall.cpp) +target_link_libraries(${test_prefix}1k_shards_1k_create_scanall mg-io mg-coordinator mg-storage-v3 mg-query-v2) From 6b0168cb3dbafe8549d21de06263f7d8e4856c3c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 28 Oct 2022 08:51:12 +0000 Subject: [PATCH 2/7] Add LocalTransport::ResponseLatencies --- src/io/local_transport/local_transport.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp index 719605081..4fc6a4361 100644 --- a/src/io/local_transport/local_transport.hpp +++ b/src/io/local_transport/local_transport.hpp @@ -60,5 +60,9 @@ class LocalTransport { std::random_device rng; return distrib(rng); } + + std::unordered_map ResponseLatencies() { + return local_transport_handle_->ResponseLatencies(); + } }; }; // namespace memgraph::io::local_transport From 119da2d7a767244d54ef6520c023695d6329fdae Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 1 Nov 2022 09:51:43 +0100 Subject: [PATCH 3/7] Update CMakeLists.txt --- tests/unit/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index bb723c1d3..52e3a7eef 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -443,6 +443,6 @@ target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test add_unit_test(coordinator_shard_map.cpp) target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator) -# Tests for mg-coordinator +# Tests for 1000 shards, 1000 creates, scan add_unit_test(1k_shards_1k_create_scanall.cpp) target_link_libraries(${test_prefix}1k_shards_1k_create_scanall mg-io mg-coordinator mg-storage-v3 mg-query-v2) From 5674ef40160a13bd7bc9732ae73865dd599a1d06 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 1 Nov 2022 16:30:37 +0100 Subject: [PATCH 4/7] Update tests/unit/1k_shards_1k_create_scanall.cpp Co-authored-by: Kostas Kyrimis --- tests/unit/1k_shards_1k_create_scanall.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp index 5fdbfafa8..7bfbbc27b 100644 --- a/tests/unit/1k_shards_1k_create_scanall.cpp +++ b/tests/unit/1k_shards_1k_create_scanall.cpp @@ -84,7 +84,7 @@ struct ScanAll { MachineManager MkMm(LocalSystem &local_system, std::vector
coordinator_addresses, Address addr, ShardMap shard_map) { MachineConfig config{ - .coordinator_addresses = coordinator_addresses, + .coordinator_addresses = std::move(coordinator_addresses), .is_storage = true, .is_coordinator = true, .listen_ip = addr.last_known_ip, From c55ca836a4ac4a6a840e6e2994c30ff7b5d96a17 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 1 Nov 2022 16:31:06 +0100 Subject: [PATCH 5/7] Update tests/unit/1k_shards_1k_create_scanall.cpp Co-authored-by: Kostas Kyrimis --- tests/unit/1k_shards_1k_create_scanall.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp index 7bfbbc27b..402d87cc4 100644 --- a/tests/unit/1k_shards_1k_create_scanall.cpp +++ b/tests/unit/1k_shards_1k_create_scanall.cpp @@ -95,7 +95,7 @@ MachineManager MkMm(LocalSystem &local_system, std::vector mm) { mm.Run(); } From 9a1258a708a6eccc2e974b81fc92ec6802d9901c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 1 Nov 2022 16:31:14 +0100 Subject: [PATCH 6/7] Update tests/unit/1k_shards_1k_create_scanall.cpp Co-authored-by: Kostas Kyrimis --- tests/unit/1k_shards_1k_create_scanall.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp index 402d87cc4..603dc6a2f 100644 --- a/tests/unit/1k_shards_1k_create_scanall.cpp +++ b/tests/unit/1k_shards_1k_create_scanall.cpp @@ -127,7 +127,7 @@ void WaitForShardsToInitialize(CoordinatorClient &coordinator_cl ShardMap TestShardMap(int n_splits, int replication_factor) { ShardMap sm{}; - const std::string label_name = std::string("test_label"); + const auto label_name = std::string("test_label"); // register new properties const std::vector property_names = {"property_1", "property_2"}; From 6dd57426f80a49ec3bdb965e42f2a280a6187171 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 1 Nov 2022 15:51:37 +0000 Subject: [PATCH 7/7] Revert broken code suggestion --- tests/unit/1k_shards_1k_create_scanall.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp index 603dc6a2f..468a0e8b0 100644 --- a/tests/unit/1k_shards_1k_create_scanall.cpp +++ b/tests/unit/1k_shards_1k_create_scanall.cpp @@ -95,7 +95,7 @@ MachineManager MkMm(LocalSystem &local_system, std::vector mm) { mm.Run(); }