From acc655f4fd4110e8b3dc1025c20a2cd17efd96e8 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 24 Oct 2022 19:54:09 +0200 Subject: [PATCH 1/2] Model-based testing of simulated full cluster (#584) This PR adds support for generating randomized workloads that will be executed against a simulated cluster, as well as against a correctness model. Initially this just generates ScanAll and CreateVertex requests, and anything that it creates, it also inserts into a `std::set`, and when we do a ScanAll, it asserts that we get the same number of requests back. This will become much more sophisticated over time, but it's already hitting pay-dirt. --- src/coordinator/shard_map.hpp | 46 +++- src/io/address.hpp | 34 +++ src/io/simulator/simulator.hpp | 2 + src/io/simulator/simulator_handle.cpp | 19 +- src/io/simulator/simulator_handle.hpp | 18 +- src/machine_manager/machine_manager.hpp | 6 +- src/query/v2/shard_request_manager.hpp | 2 + src/storage/v3/shard_manager.hpp | 6 +- tests/simulation/CMakeLists.txt | 18 +- tests/simulation/cluster_config.hpp | 51 ++++ tests/simulation/cluster_property_test.cpp | 50 ++++ tests/simulation/generated_operations.hpp | 114 ++++++++ tests/simulation/raft.cpp | 2 +- tests/simulation/shard_rsm.cpp | 2 +- tests/simulation/test_cluster.hpp | 251 ++++++++++++++++++ tests/simulation/testing_constants.hpp | 28 ++ .../query_storage_test.cpp | 4 +- 17 files changed, 613 insertions(+), 40 deletions(-) create mode 100644 tests/simulation/cluster_config.hpp create mode 100644 tests/simulation/cluster_property_test.cpp create mode 100644 tests/simulation/generated_operations.hpp create mode 100644 tests/simulation/test_cluster.hpp create mode 100644 tests/simulation/testing_constants.hpp diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index f869c79d1..aa0f13d09 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -172,14 +172,17 @@ struct ShardMap { for (auto &aas : shard) { if (initialized.contains(aas.address.unique_id)) { - spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); - aas.status = Status::CONSENSUS_PARTICIPANT; machine_contains_shard = true; + if (aas.status != Status::CONSENSUS_PARTICIPANT) { + spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); + aas.status = Status::CONSENSUS_PARTICIPANT; + } } else { const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip && aas.address.last_known_port == storage_manager.last_known_port; if (same_machine) { machine_contains_shard = true; + spdlog::info("reminding shard manager that they should begin participating in shard"); ret.push_back(ShardToInitialize{ .uuid = aas.address.unique_id, .label_id = label_id, @@ -198,12 +201,16 @@ struct ShardMap { // TODO(tyler) use deterministic UUID so that coordinators don't diverge here address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, - ret.push_back(ShardToInitialize{.uuid = address.unique_id, - .label_id = label_id, - .min_key = low_key, - .max_key = high_key, - .schema = schemas[label_id], - .config = Config{}}); + spdlog::info("assigning shard manager to shard"); + + ret.push_back(ShardToInitialize{ + .uuid = address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = high_key, + .schema = schemas[label_id], + .config = Config{}, + }); AddressAndStatus aas = { .address = address, @@ -398,6 +405,29 @@ struct ShardMap { return ret; } + + /// Returns true if all shards have the desired number of replicas and they are in + /// the CONSENSUS_PARTICIPANT state. Note that this does not necessarily mean that + /// there is also an active leader for each shard. + bool ClusterInitialized() const { + for (const auto &[label_id, label_space] : label_spaces) { + for (const auto &[low_key, shard] : label_space.shards) { + if (shard.size() < label_space.replication_factor) { + spdlog::info("label_space below desired replication factor"); + return false; + } + + for (const auto &aas : shard) { + if (aas.status != Status::CONSENSUS_PARTICIPANT) { + spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT"); + return false; + } + } + } + } + + return true; + } }; } // namespace memgraph::coordinator diff --git a/src/io/address.hpp b/src/io/address.hpp index 73c7f1efd..a6e372edb 100644 --- a/src/io/address.hpp +++ b/src/io/address.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -78,6 +79,13 @@ struct Address { }; } + PartialAddress ToPartialAddress() const { + return PartialAddress{ + .ip = last_known_ip, + .port = last_known_port, + }; + } + friend bool operator==(const Address &lhs, const Address &rhs) = default; /// unique_id is most dominant for ordering, then last_known_ip, then last_known_port @@ -103,4 +111,30 @@ struct Address { return in; } }; + }; // namespace memgraph::io + +namespace std { +template <> +struct hash { + size_t operator()(const memgraph::io::PartialAddress &pa) const { + using boost::hash_combine; + using boost::hash_value; + + // Start with a hash value of 0 . + std::size_t seed = 0; + + if (pa.ip.is_v4()) { + auto h = std::hash()(pa.ip.to_v4()); + hash_combine(seed, h); + } else { + auto h = std::hash()(pa.ip.to_v6()); + hash_combine(seed, h); + } + hash_combine(seed, hash_value(pa.port)); + + // Return the result. + return seed; + } +}; +} // namespace std diff --git a/src/io/simulator/simulator.hpp b/src/io/simulator/simulator.hpp index a28ae16df..622c264b4 100644 --- a/src/io/simulator/simulator.hpp +++ b/src/io/simulator/simulator.hpp @@ -29,6 +29,8 @@ class Simulator { explicit Simulator(SimulatorConfig config) : rng_(std::mt19937{config.rng_seed}), simulator_handle_{std::make_shared(config)} {} + ~Simulator() { ShutDown(); } + void ShutDown() { simulator_handle_->ShutDown(); } Io RegisterNew() { diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp index 9fa16fae3..d48fe41ad 100644 --- a/src/io/simulator/simulator_handle.cpp +++ b/src/io/simulator/simulator_handle.cpp @@ -16,12 +16,10 @@ #include "io/simulator/simulator_stats.hpp" #include "io/time.hpp" #include "io/transport.hpp" +#include "utils/exceptions.hpp" namespace memgraph::io::simulator { -using memgraph::io::Duration; -using memgraph::io::Time; - void SimulatorHandle::ShutDown() { std::unique_lock lock(mu_); should_shut_down_ = true; @@ -76,9 +74,15 @@ bool SimulatorHandle::MaybeTickSimulator() { const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)}; cluster_wide_time_microseconds_ += clock_advance; - MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time, - "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " - "in an expected amount of time."); + if (cluster_wide_time_microseconds_ >= config_.abort_time) { + if (should_shut_down_) { + return false; + } + spdlog::error( + "Cluster has executed beyond its configured abort_time, and something may be failing to make progress " + "in an expected amount of time."); + throw utils::BasicException{"Cluster has executed beyond its configured abort_time"}; + } return true; } @@ -121,7 +125,8 @@ bool SimulatorHandle::MaybeTickSimulator() { // don't add it anywhere, let it drop } else { // add to can_receive_ if not - const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address, std::vector()); + const auto &[om_vec, inserted] = + can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector()); om_vec->second.emplace_back(std::move(opaque_message)); } diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 3adf2b7b0..f7b3e89da 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -43,7 +43,7 @@ class SimulatorHandle { std::map promises_; // messages that are sent to servers that may later receive them - std::map> can_receive_; + std::map> can_receive_; Time cluster_wide_time_microseconds_; bool should_shut_down_ = false; @@ -59,7 +59,7 @@ class SimulatorHandle { const Time now = cluster_wide_time_microseconds_; for (auto it = promises_.begin(); it != promises_.end();) { auto &[promise_key, dop] = *it; - if (dop.deadline < now) { + if (dop.deadline < now && config_.perform_timeouts) { spdlog::info("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(), promise_key.replier_address.ToString()); std::move(dop).promise.TimeOut(); @@ -76,6 +76,14 @@ class SimulatorHandle { explicit SimulatorHandle(SimulatorConfig config) : cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {} + ~SimulatorHandle() { + for (auto it = promises_.begin(); it != promises_.end();) { + auto &[promise_key, dop] = *it; + std::move(dop).promise.TimeOut(); + it = promises_.erase(it); + } + } + void IncrementServerCountAndWaitForQuiescentState(Address address); /// This method causes most of the interesting simulation logic to happen, wrt network behavior. @@ -121,9 +129,11 @@ class SimulatorHandle { const Time deadline = cluster_wide_time_microseconds_ + timeout; + auto partial_address = receiver.ToPartialAddress(); + while (!should_shut_down_ && (cluster_wide_time_microseconds_ < deadline)) { - if (can_receive_.contains(receiver)) { - std::vector &can_rx = can_receive_.at(receiver); + if (can_receive_.contains(partial_address)) { + std::vector &can_rx = can_receive_.at(partial_address); if (!can_rx.empty()) { OpaqueMessage message = std::move(can_rx.back()); can_rx.pop_back(); diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp index ea5b0dff9..aab658429 100644 --- a/src/machine_manager/machine_manager.hpp +++ b/src/machine_manager/machine_manager.hpp @@ -65,7 +65,7 @@ class MachineManager { MachineConfig config_; CoordinatorRsm coordinator_; ShardManager shard_manager_; - Time next_cron_; + Time next_cron_ = Time::min(); public: // TODO initialize ShardManager with "real" coordinator addresses instead of io.GetAddress @@ -95,7 +95,7 @@ class MachineManager { WriteResponse, ReadRequest, AppendRequest, WriteRequest>; - spdlog::info("MM waiting on Receive"); + spdlog::info("MM waiting on Receive on address {}", io_.GetAddress().ToString()); // Note: this parameter pack must be kept in-sync with the AllMessages parameter pack above auto request_result = io_.template ReceiveWithTimeout< @@ -106,7 +106,6 @@ class MachineManager { if (request_result.HasError()) { // time to do Cron - spdlog::info("MM got timeout"); continue; } @@ -116,7 +115,6 @@ class MachineManager { // If message is for the coordinator, cast it to subset and pass it to the coordinator bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address; - spdlog::info("coordinator: {}", coordinator_.GetAddress().ToString()); if (to_coordinator) { std::optional conversion_attempt = ConvertVariant, AppendRequest, diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index be8ca6852..4ee36ec4a 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -246,6 +246,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { // TODO(kostasrim) Simplify return result std::vector Request(ExecutionState &state) override { + spdlog::info("shards_map_.size(): {}", shards_map_.GetShards(*state.label).size()); MaybeInitializeExecutionState(state); std::vector responses; @@ -260,6 +261,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { for (const auto &shard : state.shard_cache) { paginated_response_tracker.insert(std::make_pair(shard, PaginatedResponseState::Pending)); } + do { AwaitOnPaginatedRequests(state, responses, paginated_response_tracker); } while (!all_requests_gathered(paginated_response_tracker)); diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index aa9f938e8..a119148e9 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -62,8 +62,8 @@ template using ShardRaft = Raft; using namespace std::chrono_literals; -static constexpr Duration kMinimumCronInterval = 1000ms; -static constexpr Duration kMaximumCronInterval = 2000ms; +static constexpr Duration kMinimumCronInterval = 100ms; +static constexpr Duration kMaximumCronInterval = 200ms; static_assert(kMinimumCronInterval < kMaximumCronInterval, "The minimum cron interval has to be smaller than the maximum cron interval!"); @@ -135,7 +135,7 @@ class ShardManager { io::Io io_; std::map> rsm_map_; std::priority_queue, std::vector>, std::greater<>> cron_schedule_; - Time next_cron_; + Time next_cron_ = Time::min(); Address coordinator_leader_; coordinator::ShardMap shard_map_; std::optional>> heartbeat_res_; diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index 3e8e9879d..9e1a4c71e 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -17,20 +17,18 @@ function(add_simulation_test test_cpp) # requires unique logical target names set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) - # sanitize - target_compile_options(${target_name} PRIVATE -fsanitize=${san}) - target_link_options(${target_name} PRIVATE -fsanitize=${san}) - - target_link_libraries(${target_name} mg-storage-v3 mg-communication gtest gmock mg-utils mg-io mg-io-simulator mg-coordinator Boost::headers mg-query-v2) + target_link_libraries(${target_name} mg-storage-v3 mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2) + target_link_libraries(${target_name} Boost::headers) + target_link_libraries(${target_name} gtest gtest_main gmock rapidcheck rapidcheck_gtest) # register test add_test(${target_name} ${exec_name}) add_dependencies(memgraph__simulation ${target_name}) endfunction(add_simulation_test) -add_simulation_test(basic_request.cpp address) -add_simulation_test(raft.cpp address) -add_simulation_test(trial_query_storage/query_storage_test.cpp address) -add_simulation_test(sharded_map.cpp address) -add_simulation_test(shard_request_manager.cpp address) +add_simulation_test(basic_request.cpp) +add_simulation_test(raft.cpp) +add_simulation_test(trial_query_storage/query_storage_test.cpp) +add_simulation_test(sharded_map.cpp) add_simulation_test(shard_rsm.cpp) +add_simulation_test(cluster_property_test.cpp) diff --git a/tests/simulation/cluster_config.hpp b/tests/simulation/cluster_config.hpp new file mode 100644 index 000000000..70c9ef447 --- /dev/null +++ b/tests/simulation/cluster_config.hpp @@ -0,0 +1,51 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include + +#include "testing_constants.hpp" + +namespace memgraph::tests::simulation { + +struct ClusterConfig { + int servers; + int replication_factor; + int shards; + + friend std::ostream &operator<<(std::ostream &in, const ClusterConfig &cluster) { + in << "ClusterConfig { servers: " << cluster.servers << ", replication_factor: " << cluster.replication_factor + << ", shards: " << cluster.shards << " }"; + return in; + } +}; + +} // namespace memgraph::tests::simulation + +// Required namespace for rapidcheck generator +namespace rc { + +using memgraph::tests::simulation::ClusterConfig; + +template <> +struct Arbitrary { + static Gen arbitrary() { + return gen::build( + // gen::inRange is [inclusive min, exclusive max) + gen::set(&ClusterConfig::servers, gen::inRange(kMinimumServers, kMaximumServers)), + gen::set(&ClusterConfig::replication_factor, + gen::inRange(kMinimumReplicationFactor, kMaximumReplicationFactor)), + gen::set(&ClusterConfig::shards, gen::inRange(kMinimumShards, kMaximumShards))); + } +}; + +} // namespace rc diff --git a/tests/simulation/cluster_property_test.cpp b/tests/simulation/cluster_property_test.cpp new file mode 100644 index 000000000..0de5c21fc --- /dev/null +++ b/tests/simulation/cluster_property_test.cpp @@ -0,0 +1,50 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// This test serves as an example of a property-based model test. +// It generates a cluster configuration and a set of operations to +// apply against both the real system and a greatly simplified model. + +#include + +#include +#include +#include + +#include "generated_operations.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/time.hpp" +#include "storage/v3/shard_manager.hpp" +#include "test_cluster.hpp" + +namespace memgraph::tests::simulation { + +using io::Duration; +using io::Time; +using io::simulator::SimulatorConfig; +using storage::v3::kMaximumCronInterval; + +RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops)) { + // TODO(tyler) set abort_time to something more restrictive than Time::max() + + SimulatorConfig sim_config{ + .drop_percent = 0, + .perform_timeouts = false, + .scramble_messages = true, + .rng_seed = 0, + .start_time = Time::min(), + .abort_time = Time::max(), + }; + + RunClusterSimulation(sim_config, cluster_config, ops.ops); +} + +} // namespace memgraph::tests::simulation diff --git a/tests/simulation/generated_operations.hpp b/tests/simulation/generated_operations.hpp new file mode 100644 index 000000000..3db86a61e --- /dev/null +++ b/tests/simulation/generated_operations.hpp @@ -0,0 +1,114 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include + +#include +#include + +#include "storage/v2/storage.hpp" +#include "testing_constants.hpp" +#include "utils/logging.hpp" + +namespace memgraph::tests::simulation { + +struct CreateVertex { + int first; + int second; + + friend std::ostream &operator<<(std::ostream &in, const CreateVertex &add) { + in << "CreateVertex { first: " << add.first << ", second: " << add.second << " }"; + return in; + } +}; + +struct ScanAll { + friend std::ostream &operator<<(std::ostream &in, const ScanAll &get) { + in << "ScanAll {}"; + return in; + } +}; + +using OpVariant = std::variant; + +struct Op { + OpVariant inner; + + friend std::ostream &operator<<(std::ostream &in, const Op &op) { + std::visit([&](const auto &x) { in << x; }, op.inner); + return in; + } +}; + +struct NonEmptyOpVec { + std::vector ops; + + friend std::ostream &operator<<(std::ostream &in, const NonEmptyOpVec &op) { + in << "["; + bool first = true; + for (const auto &op : op.ops) { + if (!first) { + in << ", "; + } + in << op; + first = false; + } + in << "]"; + + return in; + } +}; + +} // namespace memgraph::tests::simulation + +// Required namespace for rapidcheck generators +namespace rc { + +using namespace memgraph::tests::simulation; + +template <> +struct Arbitrary { + static Gen arbitrary() { + return gen::build(gen::set(&CreateVertex::first, gen::inRange(0, kMaximumShards + 1)), + gen::set(&CreateVertex::second, gen::inRange(0, kMaximumShards + 1))); + } +}; + +template <> +struct Arbitrary { + static Gen arbitrary() { return gen::just(ScanAll{}); } +}; + +OpVariant opHoist(ScanAll op) { return op; } +OpVariant opHoist(CreateVertex op) { return op; } + +template <> +struct ::rc::Arbitrary { + static Gen arbitrary() { + return gen::build(gen::set( + &Op::inner, gen::oneOf(gen::map(gen::arbitrary(), [](CreateVertex op) { return opHoist(op); }), + gen::map(gen::arbitrary(), [](ScanAll op) { return opHoist(op); })))); + } +}; + +template <> +struct Arbitrary { + static Gen arbitrary() { + return gen::build( + gen::set(&NonEmptyOpVec::ops, gen::nonEmpty>())); + } +}; + +} // namespace rc diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp index 92750d744..df619bd42 100644 --- a/tests/simulation/raft.cpp +++ b/tests/simulation/raft.cpp @@ -130,7 +130,7 @@ void RunSimulation() { .scramble_messages = true, .rng_seed = 0, .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, - .abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 128}, + .abort_time = Time::max(), }; auto simulator = Simulator(config); diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index d5f8f2775..cadc66702 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -1074,7 +1074,7 @@ int TestMessages() { .scramble_messages = false, .rng_seed = 0, .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, - .abort_time = Time::min() + std::chrono::microseconds{4 * 8 * 1024 * 1024}, + .abort_time = Time::max(), }; auto simulator = Simulator(config); diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp new file mode 100644 index 000000000..0997f2dc2 --- /dev/null +++ b/tests/simulation/test_cluster.hpp @@ -0,0 +1,251 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include + +#include + +#include "cluster_config.hpp" +#include "coordinator/coordinator_client.hpp" +#include "coordinator/coordinator_rsm.hpp" +#include "coordinator/shard_map.hpp" +#include "generated_operations.hpp" +#include "io/address.hpp" +#include "io/simulator/simulator.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/simulator/simulator_transport.hpp" +#include "machine_manager/machine_config.hpp" +#include "machine_manager/machine_manager.hpp" +#include "query/v2/requests.hpp" +#include "query/v2/shard_request_manager.hpp" +#include "testing_constants.hpp" +#include "utils/variant_helpers.hpp" + +namespace memgraph::tests::simulation { + +using coordinator::Coordinator; +using coordinator::CoordinatorClient; +using coordinator::CoordinatorReadRequests; +using coordinator::CoordinatorWriteRequests; +using coordinator::CoordinatorWriteResponses; +using coordinator::GetShardMapRequest; +using coordinator::GetShardMapResponse; +using coordinator::Hlc; +using coordinator::HlcResponse; +using coordinator::Shard; +using coordinator::ShardMap; +using io::Address; +using io::Io; +using io::rsm::RsmClient; +using io::simulator::Simulator; +using io::simulator::SimulatorConfig; +using io::simulator::SimulatorStats; +using io::simulator::SimulatorTransport; +using machine_manager::MachineConfig; +using machine_manager::MachineManager; +using msgs::ReadRequests; +using msgs::ReadResponses; +using msgs::WriteRequests; +using msgs::WriteResponses; +using storage::v3::LabelId; +using storage::v3::SchemaProperty; + +using CompoundKey = std::pair; +using ShardClient = RsmClient; + +MachineManager MkMm(Simulator &simulator, std::vector
coordinator_addresses, Address addr, + ShardMap shard_map) { + MachineConfig config{ + .coordinator_addresses = coordinator_addresses, + .is_storage = true, + .is_coordinator = true, + .listen_ip = addr.last_known_ip, + .listen_port = addr.last_known_port, + }; + + Io io = simulator.Register(addr); + + Coordinator coordinator{shard_map}; + + return MachineManager{io, config, coordinator, shard_map}; +} + +void RunMachine(MachineManager mm) { mm.Run(); } + +void WaitForShardsToInitialize(CoordinatorClient &coordinator_client) { + // Call coordinator client's read method for GetShardMap and keep + // reading it until the shard map contains proper replicas for + // each shard in the label space. + + while (true) { + GetShardMapRequest req{}; + CoordinatorReadRequests read_req = req; + auto read_res = coordinator_client.SendReadRequest(read_req); + if (read_res.HasError()) { + // timed out + continue; + } + auto response_result = read_res.GetValue(); + auto response = std::get(response_result); + auto shard_map = response.shard_map; + + if (shard_map.ClusterInitialized()) { + spdlog::info("cluster stabilized - beginning workload"); + return; + } + } +} + +ShardMap TestShardMap(int n_splits, int replication_factor) { + ShardMap sm{}; + + const std::string label_name = std::string("test_label"); + + // register new properties + const std::vector property_names = {"property_1", "property_2"}; + const auto properties = sm.AllocatePropertyIds(property_names); + const auto property_id_1 = properties.at("property_1"); + const auto property_id_2 = properties.at("property_2"); + const auto type_1 = memgraph::common::SchemaType::INT; + const auto type_2 = memgraph::common::SchemaType::INT; + + // register new label space + std::vector schema = { + SchemaProperty{.property_id = property_id_1, .type = type_1}, + SchemaProperty{.property_id = property_id_2, .type = type_2}, + }; + + std::optional label_id = sm.InitializeNewLabel(label_name, schema, replication_factor, sm.shard_map_version); + RC_ASSERT(label_id.has_value()); + + // split the shard at N split points + for (int64_t i = 1; i < n_splits; ++i) { + const auto key1 = memgraph::storage::v3::PropertyValue(i); + const auto key2 = memgraph::storage::v3::PropertyValue(0); + + const auto split_point = {key1, key2}; + + const bool split_success = sm.SplitShard(sm.shard_map_version, label_id.value(), split_point); + + RC_ASSERT(split_success); + } + + return sm; +} + +void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, + std::set &correctness_model, CreateVertex create_vertex) { + const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); + const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); + + std::vector primary_key = {msgs::Value(int64_t(create_vertex.first)), + msgs::Value(int64_t(create_vertex.second))}; + + if (correctness_model.contains(std::make_pair(create_vertex.first, create_vertex.second))) { + // TODO(tyler) remove this early-return when we have properly handled setting non-unique vertexes + return; + } + + msgs::ExecutionState state; + + auto label_id = shard_request_manager.NameToLabel("test_label"); + + msgs::NewVertex nv{.primary_key = primary_key}; + nv.label_ids.push_back({label_id}); + + std::vector new_vertices; + new_vertices.push_back(std::move(nv)); + + auto result = shard_request_manager.Request(state, std::move(new_vertices)); + + RC_ASSERT(result.size() == 1); + RC_ASSERT(result[0].success); + + correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); +} + +void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, + std::set &correctness_model, ScanAll scan_all) { + msgs::ExecutionState request{.label = "test_label"}; + + auto results = shard_request_manager.Request(request); + + RC_ASSERT(results.size() == correctness_model.size()); + + for (const auto &vertex_accessor : results) { + const auto properties = vertex_accessor.Properties(); + const auto primary_key = vertex_accessor.Id().second; + const CompoundKey model_key = std::make_pair(primary_key[0].int_v, primary_key[1].int_v); + RC_ASSERT(correctness_model.contains(model_key)); + } +} + +void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, + const std::vector &ops) { + spdlog::info("========================== NEW SIMULATION =========================="); + + auto simulator = Simulator(sim_config); + + auto cli_addr = Address::TestAddress(1); + auto machine_1_addr = cli_addr.ForkUniqueAddress(); + + Io cli_io = simulator.Register(cli_addr); + + auto coordinator_addresses = std::vector{ + machine_1_addr, + }; + + ShardMap initialization_sm = TestShardMap(cluster_config.shards - 1, cluster_config.replication_factor); + + auto mm_1 = MkMm(simulator, coordinator_addresses, machine_1_addr, initialization_sm); + Address coordinator_address = mm_1.CoordinatorAddress(); + + auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); + + // Need to detach this thread so that the destructor does not + // block before we can propagate assertion failures. + mm_thread_1.detach(); + + // TODO(tyler) clarify addresses of coordinator etc... as it's a mess + + CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); + WaitForShardsToInitialize(coordinator_client); + + msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); + + shard_request_manager.StartTransaction(); + + auto correctness_model = std::set{}; + + for (const Op &op : ops) { + std::visit([&](auto &o) { ExecuteOp(shard_request_manager, correctness_model, o); }, op.inner); + } + + simulator.ShutDown(); + + SimulatorStats stats = simulator.Stats(); + + spdlog::info("total messages: {}", stats.total_messages); + spdlog::info("dropped messages: {}", stats.dropped_messages); + spdlog::info("timed out requests: {}", stats.timed_out_requests); + spdlog::info("total requests: {}", stats.total_requests); + spdlog::info("total responses: {}", stats.total_responses); + spdlog::info("simulator ticks: {}", stats.simulator_ticks); + + spdlog::info("========================== SUCCESS :) =========================="); +} + +} // namespace memgraph::tests::simulation diff --git a/tests/simulation/testing_constants.hpp b/tests/simulation/testing_constants.hpp new file mode 100644 index 000000000..70fbc0cba --- /dev/null +++ b/tests/simulation/testing_constants.hpp @@ -0,0 +1,28 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +namespace memgraph::tests::simulation { + +// TODO(tyler) increase this when we start standing up multiple machines in cluster tests +static constexpr auto kMinimumShards = 1; +static constexpr auto kMaximumShards = kMinimumShards + 10; + +// TODO(tyler) increase this when we start standing up multiple machines in cluster tests +static constexpr auto kMinimumServers = 1; +static constexpr auto kMaximumServers = kMinimumServers + 1; + +// TODO(tyler) increase this when we start standing up multiple machines in cluster tests +static constexpr auto kMinimumReplicationFactor = 1; +static constexpr auto kMaximumReplicationFactor = kMinimumReplicationFactor + 1; + +} // namespace memgraph::tests::simulation diff --git a/tests/simulation/trial_query_storage/query_storage_test.cpp b/tests/simulation/trial_query_storage/query_storage_test.cpp index 9cdff4ee6..8ef12bdb8 100644 --- a/tests/simulation/trial_query_storage/query_storage_test.cpp +++ b/tests/simulation/trial_query_storage/query_storage_test.cpp @@ -20,8 +20,8 @@ #include "messages.hpp" namespace memgraph::tests::simulation { -using memgraph::io::Io; -using memgraph::io::simulator::SimulatorTransport; +using io::Io; +using io::simulator::SimulatorTransport; void run_server(Io io) { while (!io.ShouldShutDown()) { From 332afadf21b13e48ffa87595b93b943eeebe96f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Tue, 25 Oct 2022 10:27:13 +0200 Subject: [PATCH 2/2] Split file parsing (#600) Add temporary support for split files. This is only temporary solution until we get the shard splitting implemented. --- src/coordinator/shard_map.cpp | 433 +++++++++++++++++++++++++++ src/coordinator/shard_map.hpp | 306 ++----------------- src/memgraph.cpp | 28 +- src/query/v2/CMakeLists.txt | 2 +- tests/simulation/shard_rsm.cpp | 1 - tests/unit/CMakeLists.txt | 45 +-- tests/unit/coordinator_shard_map.cpp | 104 +++++++ 7 files changed, 604 insertions(+), 315 deletions(-) create mode 100644 tests/unit/coordinator_shard_map.cpp diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 88f878b4e..87b449301 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -9,8 +9,17 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include +#include + +#include "common/types.hpp" #include "coordinator/shard_map.hpp" +#include "spdlog/spdlog.h" +#include "storage/v3/schemas.hpp" #include "storage/v3/temporal.hpp" +#include "utils/cast.hpp" +#include "utils/exceptions.hpp" +#include "utils/string.hpp" namespace memgraph::coordinator { @@ -57,6 +66,259 @@ PrimaryKey SchemaToMinKey(const std::vector &schema) { return ret; } +ShardMap ShardMap::Parse(std::istream &input_stream) { + ShardMap shard_map; + const auto read_size = [&input_stream] { + size_t size{0}; + input_stream >> size; + return size; + }; + + // Reads a string until the next whitespace + const auto read_word = [&input_stream] { + std::string word; + input_stream >> word; + return word; + }; + + const auto read_names = [&read_size, &read_word] { + const auto number_of_names = read_size(); + spdlog::trace("Reading {} names", number_of_names); + std::vector names; + names.reserve(number_of_names); + + for (auto name_index = 0; name_index < number_of_names; ++name_index) { + names.push_back(read_word()); + spdlog::trace("Read '{}'", names.back()); + } + return names; + }; + + const auto read_line = [&input_stream] { + std::string line; + std::getline(input_stream, line); + return line; + }; + + const auto parse_type = [](const std::string &type) { + static const auto type_map = std::unordered_map{ + {"string", common::SchemaType::STRING}, {"int", common::SchemaType::INT}, {"bool", common::SchemaType::BOOL}}; + const auto lower_case_type = utils::ToLowerCase(type); + auto it = type_map.find(lower_case_type); + MG_ASSERT(it != type_map.end(), "Invalid type in split files: {}", type); + return it->second; + }; + + const auto parse_property_value = [](std::string text, const common::SchemaType type) { + if (type == common::SchemaType::STRING) { + return storage::v3::PropertyValue{std::move(text)}; + } + if (type == common::SchemaType::INT) { + size_t processed{0}; + int64_t value = std::stoll(text, &processed); + MG_ASSERT(processed == text.size() || text[processed] == ' ', "Invalid integer format: '{}'", text); + return storage::v3::PropertyValue{value}; + } + LOG_FATAL("Not supported type: {}", utils::UnderlyingCast(type)); + }; + + spdlog::debug("Reading properties"); + const auto properties = read_names(); + MG_ASSERT(shard_map.AllocatePropertyIds(properties).size() == properties.size(), + "Unexpected number of properties created!"); + + spdlog::debug("Reading edge types"); + const auto edge_types = read_names(); + MG_ASSERT(shard_map.AllocateEdgeTypeIds(edge_types).size() == edge_types.size(), + "Unexpected number of properties created!"); + + spdlog::debug("Reading primary labels"); + const auto number_of_primary_labels = read_size(); + spdlog::debug("Reading {} primary labels", number_of_primary_labels); + + for (auto label_index = 0; label_index < number_of_primary_labels; ++label_index) { + const auto primary_label = read_word(); + spdlog::debug("Reading primary label named '{}'", primary_label); + const auto number_of_primary_properties = read_size(); + spdlog::debug("Reading {} primary properties", number_of_primary_properties); + std::vector pp_names; + std::vector pp_types; + pp_names.reserve(number_of_primary_properties); + pp_types.reserve(number_of_primary_properties); + for (auto property_index = 0; property_index < number_of_primary_properties; ++property_index) { + pp_names.push_back(read_word()); + spdlog::debug("Reading primary property named '{}'", pp_names.back()); + pp_types.push_back(parse_type(read_word())); + } + auto pp_mapping = shard_map.AllocatePropertyIds(pp_names); + std::vector schema; + schema.reserve(number_of_primary_properties); + + for (auto property_index = 0; property_index < number_of_primary_properties; ++property_index) { + schema.push_back(storage::v3::SchemaProperty{pp_mapping.at(pp_names[property_index]), pp_types[property_index]}); + } + const auto hlc = shard_map.GetHlc(); + MG_ASSERT(shard_map.InitializeNewLabel(primary_label, schema, 1, hlc).has_value(), + "Cannot initialize new label: {}", primary_label); + + const auto number_of_split_points = read_size(); + spdlog::debug("Reading {} split points", number_of_split_points); + + [[maybe_unused]] const auto remainder_from_last_line = read_line(); + for (auto split_point_index = 0; split_point_index < number_of_split_points; ++split_point_index) { + const auto line = read_line(); + spdlog::debug("Read split point '{}'", line); + MG_ASSERT(line.front() == '[', "Invalid split file format!"); + MG_ASSERT(line.back() == ']', "Invalid split file format!"); + std::string_view line_view{line}; + line_view.remove_prefix(1); + line_view.remove_suffix(1); + static constexpr std::string_view kDelimiter{","}; + auto pk_values_as_text = utils::Split(line_view, kDelimiter); + std::vector pk; + pk.reserve(number_of_primary_properties); + MG_ASSERT(pk_values_as_text.size() == number_of_primary_properties, + "Split point contains invalid number of values '{}'", line); + + for (auto property_index = 0; property_index < number_of_primary_properties; ++property_index) { + pk.push_back(parse_property_value(std::move(pk_values_as_text[property_index]), schema[property_index].type)); + } + shard_map.SplitShard(shard_map.GetHlc(), shard_map.labels.at(primary_label), pk); + } + } + + return shard_map; +} + +std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) { + using utils::print_helpers::operator<<; + + in << "ShardMap { shard_map_version: " << shard_map.shard_map_version; + in << ", max_property_id: " << shard_map.max_property_id; + in << ", max_edge_type_id: " << shard_map.max_edge_type_id; + in << ", properties: " << shard_map.properties; + in << ", edge_types: " << shard_map.edge_types; + in << ", max_label_id: " << shard_map.max_label_id; + in << ", labels: " << shard_map.labels; + in << ", label_spaces: " << shard_map.label_spaces; + in << ", schemas: " << shard_map.schemas; + in << "}"; + return in; +} + +Shards ShardMap::GetShards(const LabelName &label) { + const auto id = labels.at(label); + auto &shards = label_spaces.at(id).shards; + return shards; +} + +// TODO(gabor) later we will want to update the wallclock time with +// the given Io's time as well +Hlc ShardMap::IncrementShardMapVersion() noexcept { + ++shard_map_version.logical_id; + return shard_map_version; +} + +Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; } + +std::vector ShardMap::AssignShards(Address storage_manager, + std::set initialized) { + std::vector ret{}; + + bool mutated = false; + + for (auto &[label_id, label_space] : label_spaces) { + for (auto it = label_space.shards.begin(); it != label_space.shards.end(); it++) { + auto &[low_key, shard] = *it; + std::optional high_key; + if (const auto next_it = std::next(it); next_it != label_space.shards.end()) { + high_key = next_it->first; + } + // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info + bool machine_contains_shard = false; + + for (auto &aas : shard) { + if (initialized.contains(aas.address.unique_id)) { + machine_contains_shard = true; + if (aas.status != Status::CONSENSUS_PARTICIPANT) { + spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); + aas.status = Status::CONSENSUS_PARTICIPANT; + } + } else { + const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip && + aas.address.last_known_port == storage_manager.last_known_port; + if (same_machine) { + machine_contains_shard = true; + spdlog::info("reminding shard manager that they should begin participating in shard"); + ret.push_back(ShardToInitialize{ + .uuid = aas.address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = high_key, + .schema = schemas[label_id], + .config = Config{}, + }); + } + } + } + + if (!machine_contains_shard && shard.size() < label_space.replication_factor) { + Address address = storage_manager; + + // TODO(tyler) use deterministic UUID so that coordinators don't diverge here + address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, + + spdlog::info("assigning shard manager to shard"); + + ret.push_back(ShardToInitialize{ + .uuid = address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = high_key, + .schema = schemas[label_id], + .config = Config{}, + }); + + AddressAndStatus aas = { + .address = address, + .status = Status::INITIALIZING, + }; + + shard.emplace_back(aas); + } + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + return ret; +} + +bool ShardMap::SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) { + if (previous_shard_map_version != shard_map_version) { + return false; + } + + auto &label_space = label_spaces.at(label_id); + auto &shards_in_map = label_space.shards; + + MG_ASSERT(!shards_in_map.empty()); + MG_ASSERT(!shards_in_map.contains(key)); + MG_ASSERT(label_spaces.contains(label_id)); + + // Finding the Shard that the new PrimaryKey should map to. + auto prev = std::prev(shards_in_map.upper_bound(key)); + Shard duplicated_shard = prev->second; + + // Apply the split + shards_in_map[key] = duplicated_shard; + + IncrementShardMapVersion(); + + return true; +} + std::optional ShardMap::InitializeNewLabel(std::string label_name, std::vector schema, size_t replication_factor, Hlc last_shard_map_version) { if (shard_map_version != last_shard_map_version || labels.contains(label_name)) { @@ -88,4 +350,175 @@ std::optional ShardMap::InitializeNewLabel(std::string label_name, std: return label_id; } +void ShardMap::AddServer(Address server_address) { + // Find a random place for the server to plug in +} +std::optional ShardMap::GetLabelId(const std::string &label) const { + if (const auto it = labels.find(label); it != labels.end()) { + return it->second; + } + + return std::nullopt; +} + +std::string ShardMap::GetLabelName(const LabelId label) const { + if (const auto it = + std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; }); + it != labels.end()) { + return it->first; + } + throw utils::BasicException("GetLabelName fails on the given label id!"); +} + +std::optional ShardMap::GetPropertyId(const std::string &property_name) const { + if (const auto it = properties.find(property_name); it != properties.end()) { + return it->second; + } + + return std::nullopt; +} + +std::string ShardMap::GetPropertyName(const PropertyId property) const { + if (const auto it = std::ranges::find_if( + properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); + it != properties.end()) { + return it->first; + } + throw utils::BasicException("PropertyId not found!"); +} + +std::optional ShardMap::GetEdgeTypeId(const std::string &edge_type) const { + if (const auto it = edge_types.find(edge_type); it != edge_types.end()) { + return it->second; + } + + return std::nullopt; +} + +std::string ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { + if (const auto it = std::ranges::find_if( + edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); + it != edge_types.end()) { + return it->first; + } + throw utils::BasicException("EdgeTypeId not found!"); +} +Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, + const PrimaryKey &end_key) const { + MG_ASSERT(start_key <= end_key); + MG_ASSERT(labels.contains(label_name)); + + LabelId label_id = labels.at(label_name); + + const auto &label_space = label_spaces.at(label_id); + + const auto &shards_for_label = label_space.shards; + + MG_ASSERT(shards_for_label.begin()->first <= start_key, + "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); + + auto it = std::prev(shards_for_label.upper_bound(start_key)); + const auto end_it = shards_for_label.upper_bound(end_key); + + Shards shards{}; + + std::copy(it, end_it, std::inserter(shards, shards.end())); + + return shards; +} + +Shard ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const { + MG_ASSERT(labels.contains(label_name)); + + LabelId label_id = labels.at(label_name); + + const auto &label_space = label_spaces.at(label_id); + + MG_ASSERT(label_space.shards.begin()->first <= key, + "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); + + return std::prev(label_space.shards.upper_bound(key))->second; +} + +Shard ShardMap::GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const { + MG_ASSERT(label_spaces.contains(label_id)); + + const auto &label_space = label_spaces.at(label_id); + + MG_ASSERT(label_space.shards.begin()->first <= key, + "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); + + return std::prev(label_space.shards.upper_bound(key))->second; +} + +PropertyMap ShardMap::AllocatePropertyIds(const std::vector &new_properties) { + PropertyMap ret{}; + + bool mutated = false; + + for (const auto &property_name : new_properties) { + if (properties.contains(property_name)) { + auto property_id = properties.at(property_name); + ret.emplace(property_name, property_id); + } else { + mutated = true; + + const PropertyId property_id = PropertyId::FromUint(++max_property_id); + ret.emplace(property_name, property_id); + properties.emplace(property_name, property_id); + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + + return ret; +} + +EdgeTypeIdMap ShardMap::AllocateEdgeTypeIds(const std::vector &new_edge_types) { + EdgeTypeIdMap ret; + + bool mutated = false; + + for (const auto &edge_type_name : new_edge_types) { + if (edge_types.contains(edge_type_name)) { + auto edge_type_id = edge_types.at(edge_type_name); + ret.emplace(edge_type_name, edge_type_id); + } else { + mutated = true; + + const EdgeTypeId edge_type_id = EdgeTypeId::FromUint(++max_edge_type_id); + ret.emplace(edge_type_name, edge_type_id); + edge_types.emplace(edge_type_name, edge_type_id); + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + + return ret; +} + +bool ShardMap::ClusterInitialized() const { + for (const auto &[label_id, label_space] : label_spaces) { + for (const auto &[low_key, shard] : label_space.shards) { + if (shard.size() < label_space.replication_factor) { + spdlog::info("label_space below desired replication factor"); + return false; + } + + for (const auto &aas : shard) { + if (aas.status != Status::CONSENSUS_PARTICIPANT) { + spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT"); + return false; + } + } + } + } + + return true; +} + } // namespace memgraph::coordinator diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index aa0f13d09..b637e2300 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -96,6 +96,7 @@ PrimaryKey SchemaToMinKey(const std::vector &schema); struct LabelSpace { std::vector schema; + // Maps between the smallest primary key stored in the shard and the shard std::map shards; size_t replication_factor; @@ -123,311 +124,48 @@ struct ShardMap { std::map label_spaces; std::map> schemas; - friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) { - using utils::print_helpers::operator<<; + [[nodiscard]] static ShardMap Parse(std::istream &input_stream); + friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map); - in << "ShardMap { shard_map_version: " << shard_map.shard_map_version; - in << ", max_property_id: " << shard_map.max_property_id; - in << ", max_edge_type_id: " << shard_map.max_edge_type_id; - in << ", properties: " << shard_map.properties; - in << ", edge_types: " << shard_map.edge_types; - in << ", max_label_id: " << shard_map.max_label_id; - in << ", labels: " << shard_map.labels; - in << ", label_spaces: " << shard_map.label_spaces; - in << ", schemas: " << shard_map.schemas; - in << "}"; - return in; - } - - Shards GetShards(const LabelName &label) { - const auto id = labels.at(label); - auto &shards = label_spaces.at(id).shards; - return shards; - } + Shards GetShards(const LabelName &label); // TODO(gabor) later we will want to update the wallclock time with // the given Io's time as well - Hlc IncrementShardMapVersion() noexcept { - ++shard_map_version.logical_id; - return shard_map_version; - } - - Hlc GetHlc() const noexcept { return shard_map_version; } + Hlc IncrementShardMapVersion() noexcept; + Hlc GetHlc() const noexcept; // Returns the shard UUIDs that have been assigned but not yet acknowledged for this storage manager - std::vector AssignShards(Address storage_manager, std::set initialized) { - std::vector ret{}; + std::vector AssignShards(Address storage_manager, std::set initialized); - bool mutated = false; - - for (auto &[label_id, label_space] : label_spaces) { - for (auto it = label_space.shards.begin(); it != label_space.shards.end(); it++) { - auto &[low_key, shard] = *it; - std::optional high_key; - if (const auto next_it = std::next(it); next_it != label_space.shards.end()) { - high_key = next_it->first; - } - // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info - bool machine_contains_shard = false; - - for (auto &aas : shard) { - if (initialized.contains(aas.address.unique_id)) { - machine_contains_shard = true; - if (aas.status != Status::CONSENSUS_PARTICIPANT) { - spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); - aas.status = Status::CONSENSUS_PARTICIPANT; - } - } else { - const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip && - aas.address.last_known_port == storage_manager.last_known_port; - if (same_machine) { - machine_contains_shard = true; - spdlog::info("reminding shard manager that they should begin participating in shard"); - ret.push_back(ShardToInitialize{ - .uuid = aas.address.unique_id, - .label_id = label_id, - .min_key = low_key, - .max_key = high_key, - .schema = schemas[label_id], - .config = Config{}, - }); - } - } - } - - if (!machine_contains_shard && shard.size() < label_space.replication_factor) { - Address address = storage_manager; - - // TODO(tyler) use deterministic UUID so that coordinators don't diverge here - address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, - - spdlog::info("assigning shard manager to shard"); - - ret.push_back(ShardToInitialize{ - .uuid = address.unique_id, - .label_id = label_id, - .min_key = low_key, - .max_key = high_key, - .schema = schemas[label_id], - .config = Config{}, - }); - - AddressAndStatus aas = { - .address = address, - .status = Status::INITIALIZING, - }; - - shard.emplace_back(aas); - } - } - } - - if (mutated) { - IncrementShardMapVersion(); - } - - return ret; - } - - bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) { - if (previous_shard_map_version != shard_map_version) { - return false; - } - - auto &label_space = label_spaces.at(label_id); - auto &shards_in_map = label_space.shards; - - MG_ASSERT(!shards_in_map.empty()); - MG_ASSERT(!shards_in_map.contains(key)); - MG_ASSERT(label_spaces.contains(label_id)); - - // Finding the Shard that the new PrimaryKey should map to. - auto prev = std::prev(shards_in_map.upper_bound(key)); - Shard duplicated_shard = prev->second; - - // Apply the split - shards_in_map[key] = duplicated_shard; - - return true; - } + bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key); std::optional InitializeNewLabel(std::string label_name, std::vector schema, size_t replication_factor, Hlc last_shard_map_version); - void AddServer(Address server_address) { - // Find a random place for the server to plug in - } + void AddServer(Address server_address); - std::optional GetLabelId(const std::string &label) const { - if (const auto it = labels.find(label); it != labels.end()) { - return it->second; - } + std::optional GetLabelId(const std::string &label) const; + // TODO(antaljanosbenjamin): Remove this and instead use NameIdMapper + std::string GetLabelName(LabelId label) const; + std::optional GetPropertyId(const std::string &property_name) const; + std::string GetPropertyName(PropertyId property) const; + std::optional GetEdgeTypeId(const std::string &edge_type) const; + std::string GetEdgeTypeName(EdgeTypeId property) const; - return std::nullopt; - } + Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const; - std::string GetLabelName(const LabelId label) const { - if (const auto it = - std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; }); - it != labels.end()) { - return it->first; - } - throw utils::BasicException("GetLabelName fails on the given label id!"); - } + Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const; - std::optional GetPropertyId(const std::string &property_name) const { - if (const auto it = properties.find(property_name); it != properties.end()) { - return it->second; - } + Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const; - return std::nullopt; - } + PropertyMap AllocatePropertyIds(const std::vector &new_properties); - std::string GetPropertyName(const PropertyId property) const { - if (const auto it = std::ranges::find_if( - properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); - it != properties.end()) { - return it->first; - } - throw utils::BasicException("PropertyId not found!"); - } - - std::optional GetEdgeTypeId(const std::string &edge_type) const { - if (const auto it = edge_types.find(edge_type); it != edge_types.end()) { - return it->second; - } - - return std::nullopt; - } - - std::string GetEdgeTypeName(const EdgeTypeId property) const { - if (const auto it = std::ranges::find_if( - edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); - it != edge_types.end()) { - return it->first; - } - throw utils::BasicException("EdgeTypeId not found!"); - } - - Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const { - MG_ASSERT(start_key <= end_key); - MG_ASSERT(labels.contains(label_name)); - - LabelId label_id = labels.at(label_name); - - const auto &label_space = label_spaces.at(label_id); - - const auto &shards_for_label = label_space.shards; - - MG_ASSERT(shards_for_label.begin()->first <= start_key, - "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); - - auto it = std::prev(shards_for_label.upper_bound(start_key)); - const auto end_it = shards_for_label.upper_bound(end_key); - - Shards shards{}; - - std::copy(it, end_it, std::inserter(shards, shards.end())); - - return shards; - } - - Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const { - MG_ASSERT(labels.contains(label_name)); - - LabelId label_id = labels.at(label_name); - - const auto &label_space = label_spaces.at(label_id); - - MG_ASSERT(label_space.shards.begin()->first <= key, - "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); - - return std::prev(label_space.shards.upper_bound(key))->second; - } - - Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const { - MG_ASSERT(label_spaces.contains(label_id)); - - const auto &label_space = label_spaces.at(label_id); - - MG_ASSERT(label_space.shards.begin()->first <= key, - "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); - - return std::prev(label_space.shards.upper_bound(key))->second; - } - - PropertyMap AllocatePropertyIds(const std::vector &new_properties) { - PropertyMap ret{}; - - bool mutated = false; - - for (const auto &property_name : new_properties) { - if (properties.contains(property_name)) { - auto property_id = properties.at(property_name); - ret.emplace(property_name, property_id); - } else { - mutated = true; - - const PropertyId property_id = PropertyId::FromUint(++max_property_id); - ret.emplace(property_name, property_id); - properties.emplace(property_name, property_id); - } - } - - if (mutated) { - IncrementShardMapVersion(); - } - - return ret; - } - - EdgeTypeIdMap AllocateEdgeTypeIds(const std::vector &new_edge_types) { - EdgeTypeIdMap ret; - - bool mutated = false; - - for (const auto &edge_type_name : new_edge_types) { - if (edge_types.contains(edge_type_name)) { - auto edge_type_id = edge_types.at(edge_type_name); - ret.emplace(edge_type_name, edge_type_id); - } else { - mutated = true; - - const EdgeTypeId edge_type_id = EdgeTypeId::FromUint(++max_edge_type_id); - ret.emplace(edge_type_name, edge_type_id); - edge_types.emplace(edge_type_name, edge_type_id); - } - } - - if (mutated) { - IncrementShardMapVersion(); - } - - return ret; - } + EdgeTypeIdMap AllocateEdgeTypeIds(const std::vector &new_edge_types); /// Returns true if all shards have the desired number of replicas and they are in /// the CONSENSUS_PARTICIPANT state. Note that this does not necessarily mean that /// there is also an active leader for each shard. - bool ClusterInitialized() const { - for (const auto &[label_id, label_space] : label_spaces) { - for (const auto &[low_key, shard] : label_space.shards) { - if (shard.size() < label_space.replication_factor) { - spdlog::info("label_space below desired replication factor"); - return false; - } - - for (const auto &aas : shard) { - if (aas.status != Status::CONSENSUS_PARTICIPANT) { - spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT"); - return false; - } - } - } - } - - return true; - } + bool ClusterInitialized() const; }; } // namespace memgraph::coordinator diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 9c4d71e94..c696615b5 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -265,6 +266,10 @@ DEFINE_uint64( "Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap " "is enabled and 90\% of the physical memory otherwise."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(split_file, "", + "Path to the split file which contains the predefined labels, properties, edge types and shard-ranges."); + namespace { using namespace std::literals; inline constexpr std::array isolation_level_mappings{ @@ -639,15 +644,22 @@ int main(int argc, char **argv) { .listen_port = unique_local_addr_query.last_known_port, }; - const std::string property{"property"}; - const std::string label{"label"}; memgraph::coordinator::ShardMap sm; - auto prop_map = sm.AllocatePropertyIds(std::vector{property}); - auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector{"TO"}); - std::vector schema{{prop_map.at(property), memgraph::common::SchemaType::INT}}; - sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version); - sm.SplitShard(sm.GetHlc(), *sm.GetLabelId(label), - std::vector{memgraph::storage::v3::PropertyValue{2}}); + if (FLAGS_split_file.empty()) { + const std::string property{"property"}; + const std::string label{"label"}; + auto prop_map = sm.AllocatePropertyIds(std::vector{property}); + auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector{"TO"}); + std::vector schema{ + {prop_map.at(property), memgraph::common::SchemaType::INT}}; + sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version); + sm.SplitShard(sm.GetHlc(), *sm.GetLabelId(label), + std::vector{memgraph::storage::v3::PropertyValue{2}}); + } else { + std::ifstream input{FLAGS_split_file, std::ios::in}; + MG_ASSERT(input.is_open(), "Cannot open split file to read: {}", FLAGS_split_file); + sm = memgraph::coordinator::ShardMap::Parse(input); + } memgraph::coordinator::Coordinator coordinator{sm}; diff --git a/src/query/v2/CMakeLists.txt b/src/query/v2/CMakeLists.txt index 03167341a..3c3f780c8 100644 --- a/src/query/v2/CMakeLists.txt +++ b/src/query/v2/CMakeLists.txt @@ -33,7 +33,7 @@ add_dependencies(mg-query-v2 generate_lcp_query_v2) target_include_directories(mg-query-v2 PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(mg-query-v2 PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/bindings) target_link_libraries(mg-query-v2 dl cppitertools Boost::headers) -target_link_libraries(mg-query-v2 mg-integrations-pulsar mg-integrations-kafka mg-storage-v3 mg-license mg-utils mg-kvstore mg-memory) +target_link_libraries(mg-query-v2 mg-integrations-pulsar mg-integrations-kafka mg-storage-v3 mg-license mg-utils mg-kvstore mg-memory mg-coordinator) target_link_libraries(mg-query-v2 mg-expr) if(NOT "${MG_PYTHON_PATH}" STREQUAL "") diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index cadc66702..64d0a0861 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -1046,7 +1046,6 @@ void TestExpandOneGraphTwo(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); - auto wrong_edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); auto edge_gid_1 = GetUniqueInteger(); auto edge_gid_2 = GetUniqueInteger(); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index d70c4d867..f3a95c1d3 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -333,36 +333,35 @@ target_link_libraries(${test_prefix}storage_v3_schema mg-storage-v3) # Test mg-query-v2 # These are commented out because of the new TypedValue in the query engine -#add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) -#target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication) +# add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) +# target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication) # -#add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2) +# add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2) # -#add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2 mg-expr) +# add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2 mg-expr) # -#add_unit_test(query_v2_query_plan_bag_semantics.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2) +# add_unit_test(query_v2_query_plan_bag_semantics.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2) # -#add_unit_test(query_v2_query_plan_edge_cases.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_edge_cases mg-communication mg-query-v2) +# add_unit_test(query_v2_query_plan_edge_cases.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_edge_cases mg-communication mg-query-v2) # -#add_unit_test(query_v2_query_plan_v2_create_set_remove_delete.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_v2_create_set_remove_delete mg-query-v2) +# add_unit_test(query_v2_query_plan_v2_create_set_remove_delete.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_v2_create_set_remove_delete mg-query-v2) # -#add_unit_test(query_v2_query_plan_match_filter_return.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_match_filter_return mg-query-v2) +# add_unit_test(query_v2_query_plan_match_filter_return.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_match_filter_return mg-query-v2) # -#add_unit_test(query_v2_cypher_main_visitor.cpp) -#target_link_libraries(${test_prefix}query_v2_cypher_main_visitor mg-query-v2) +# add_unit_test(query_v2_cypher_main_visitor.cpp) +# target_link_libraries(${test_prefix}query_v2_cypher_main_visitor mg-query-v2) # -#add_unit_test(query_v2_query_required_privileges.cpp) -#target_link_libraries(${test_prefix}query_v2_query_required_privileges mg-query-v2) +# add_unit_test(query_v2_query_required_privileges.cpp) +# target_link_libraries(${test_prefix}query_v2_query_required_privileges mg-query-v2) # -#add_unit_test(replication_persistence_helper.cpp) -#target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) - +# add_unit_test(replication_persistence_helper.cpp) +# target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) add_unit_test(query_v2_dummy_test.cpp) target_link_libraries(${test_prefix}query_v2_dummy_test mg-query-v2) @@ -436,3 +435,7 @@ target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-stor add_unit_test(pretty_print_ast_to_original_expression_test.cpp) target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test mg-io mg-expr mg-query-v2) + +# Tests for mg-coordinator +add_unit_test(coordinator_shard_map.cpp) +target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator) diff --git a/tests/unit/coordinator_shard_map.cpp b/tests/unit/coordinator_shard_map.cpp new file mode 100644 index 000000000..4bfb43a24 --- /dev/null +++ b/tests/unit/coordinator_shard_map.cpp @@ -0,0 +1,104 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include + +#include "common/types.hpp" +#include "coordinator/shard_map.hpp" +#include "gtest/gtest.h" +#include "storage/v3/id_types.hpp" +#include "storage/v3/property_value.hpp" +#include "storage/v3/schemas.hpp" + +namespace memgraph::coordinator::tests { +TEST(ShardMap, Parse) { + std::string input = R"(4 +property_1 +property_2 +property_3 +property_4 +3 +edge_type_1 +edge_type_2 +edge_type_3 +2 +label_1 +1 +primary_property_name_1 +string +4 +[asdasd] +[qweqwe] +[bnm] +[tryuryturtyur] +label_2 +3 +property_1 +string +property_2 +int +primary_property_name_2 +InT +2 +[first,1 ,2] +[ second ,-1, -9223372036854775808] +)"; + + std::stringstream stream(input); + auto shard_map = ShardMap::Parse(stream); + EXPECT_EQ(shard_map.properties.size(), 6); + EXPECT_EQ(shard_map.edge_types.size(), 3); + EXPECT_EQ(shard_map.label_spaces.size(), 2); + EXPECT_EQ(shard_map.schemas.size(), 2); + + auto check_label = [&shard_map](const std::string &label_name, const std::vector &expected_schema, + const std::vector &expected_split_points) { + ASSERT_TRUE(shard_map.labels.contains(label_name)); + const auto label_id = shard_map.labels.at(label_name); + const auto &schema = shard_map.schemas.at(label_id); + ASSERT_EQ(schema.size(), expected_schema.size()); + for (auto pp_index = 0; pp_index < schema.size(); ++pp_index) { + EXPECT_EQ(schema[pp_index].property_id, expected_schema[pp_index].property_id); + EXPECT_EQ(schema[pp_index].type, expected_schema[pp_index].type); + } + + const auto &label_space = shard_map.label_spaces.at(label_id); + + ASSERT_EQ(label_space.shards.size(), expected_split_points.size()); + for (const auto &split_point : expected_split_points) { + EXPECT_TRUE(label_space.shards.contains(split_point)) << split_point[0]; + } + }; + + check_label("label_1", + {SchemaProperty{shard_map.properties.at("primary_property_name_1"), common::SchemaType::STRING}}, + std::vector{ + PrimaryKey{PropertyValue{""}}, + PrimaryKey{PropertyValue{"asdasd"}}, + PrimaryKey{PropertyValue{"qweqwe"}}, + PrimaryKey{PropertyValue{"bnm"}}, + PrimaryKey{PropertyValue{"tryuryturtyur"}}, + }); + + static constexpr int64_t kMinInt = std::numeric_limits::min(); + check_label("label_2", + {SchemaProperty{shard_map.properties.at("property_1"), common::SchemaType::STRING}, + SchemaProperty{shard_map.properties.at("property_2"), common::SchemaType::INT}, + SchemaProperty{shard_map.properties.at("primary_property_name_2"), common::SchemaType::INT}}, + std::vector{ + PrimaryKey{PropertyValue{""}, PropertyValue{kMinInt}, PropertyValue{kMinInt}}, + PrimaryKey{PropertyValue{"first"}, PropertyValue{1}, PropertyValue{2}}, + PrimaryKey{PropertyValue{" second "}, PropertyValue{-1}, + PropertyValue{int64_t{-9223372036854775807LL - 1LL}}}, + }); +} +} // namespace memgraph::coordinator::tests