From 58eb2caf0f6216c08c697decfbf5d0785f38ffa3 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 21 Sep 2022 16:57:25 +0200 Subject: [PATCH] Add machine manager prototype (#533) --- src/coordinator/CMakeLists.txt | 5 +- src/coordinator/coordinator.cpp | 129 ++++++++++ src/coordinator/coordinator.hpp | 144 +++--------- src/coordinator/shard_map.cpp | 90 +++++++ src/coordinator/shard_map.hpp | 120 +++++++--- src/io/future.hpp | 8 +- src/io/local_transport/local_system.hpp | 2 +- src/io/local_transport/local_transport.hpp | 15 +- .../local_transport_handle.hpp | 24 +- src/io/message_conversion.hpp | 26 ++ src/io/messages.hpp | 48 ++++ src/io/rsm/raft.hpp | 89 ++++--- src/io/simulator/simulator_transport.hpp | 10 +- src/io/transport.hpp | 16 +- src/machine_manager/machine_config.hpp | 42 ++++ src/machine_manager/machine_manager.hpp | 180 ++++++++++++++ src/storage/v3/shard_manager.hpp | 222 ++++++++++++++++++ tests/simulation/CMakeLists.txt | 2 +- tests/simulation/sharded_map.cpp | 37 +-- tests/unit/CMakeLists.txt | 8 +- tests/unit/machine_manager.cpp | 209 +++++++++++++++++ 21 files changed, 1200 insertions(+), 226 deletions(-) create mode 100644 src/coordinator/coordinator.cpp create mode 100644 src/coordinator/shard_map.cpp create mode 100644 src/io/messages.hpp create mode 100644 src/machine_manager/machine_config.hpp create mode 100644 src/machine_manager/machine_manager.hpp create mode 100644 src/storage/v3/shard_manager.hpp create mode 100644 tests/unit/machine_manager.cpp diff --git a/src/coordinator/CMakeLists.txt b/src/coordinator/CMakeLists.txt index 198223f5a..c72dc5157 100644 --- a/src/coordinator/CMakeLists.txt +++ b/src/coordinator/CMakeLists.txt @@ -1,7 +1,6 @@ set(coordinator_src_files - coordinator.hpp - shard_map.hpp - hybrid_logical_clock.hpp) + coordinator.cpp + shard_map.cpp) find_package(fmt REQUIRED) find_package(Threads REQUIRED) diff --git a/src/coordinator/coordinator.cpp b/src/coordinator/coordinator.cpp new file mode 100644 index 000000000..f24fd0f08 --- /dev/null +++ b/src/coordinator/coordinator.cpp @@ -0,0 +1,129 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
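// The definitions below move the Coordinator's write handlers out of coordinator.hpp:
// each ApplyWrite overload consumes one alternative of the CoordinatorWriteRequests
// variant (heartbeat, HLC allocation, edge-id batches, shard splits, storage-engine
// (de)registration, label initialization, property-id allocation) and returns the
// matching alternative of CoordinatorWriteResponses.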
+ +#include + +namespace memgraph::coordinator { + +CoordinatorWriteResponses Coordinator::ApplyWrite(HeartbeatRequest &&heartbeat_request) { + spdlog::info("Coordinator handling HeartbeatRequest"); + + // add this storage engine to any under-replicated shards that it is not already a part of + + auto initializing_rsms_for_shard_manager = + shard_map_.AssignShards(heartbeat_request.from_storage_manager, heartbeat_request.initialized_rsms); + + return HeartbeatResponse{ + .shards_to_initialize = initializing_rsms_for_shard_manager, + }; +} + +CoordinatorWriteResponses Coordinator::ApplyWrite(HlcRequest &&hlc_request) { + HlcResponse res{}; + + auto hlc_shard_map = shard_map_.GetHlc(); + + MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id)); + + res.new_hlc = Hlc{ + .logical_id = ++highest_allocated_timestamp_, + // TODO(tyler) probably pass some more context to the Coordinator here + // so that we can use our wall clock and enforce monotonicity. + // .coordinator_wall_clock = io_.Now(), + }; + + // Allways return fresher shard_map for now. + res.fresher_shard_map = std::make_optional(shard_map_); + + return res; +} + +CoordinatorWriteResponses Coordinator::ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) { + AllocateEdgeIdBatchResponse res{}; + + uint64_t low = highest_allocated_edge_id_; + + highest_allocated_edge_id_ += ahr.batch_size; + + uint64_t high = highest_allocated_edge_id_; + + res.low = low; + res.high = high; + + return res; +} + +/// This splits the shard immediately beneath the provided +/// split key, keeping the assigned peers identical for now, +/// but letting them be gradually migrated over time. +CoordinatorWriteResponses Coordinator::ApplyWrite(SplitShardRequest &&split_shard_request) { + SplitShardResponse res{}; + + if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) { + res.success = false; + } else { + res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id, + split_shard_request.split_key); + } + + return res; +} + +/// This adds the provided storage engine to the standby storage engine pool, +/// which can be used to rebalance storage over time. +CoordinatorWriteResponses Coordinator::ApplyWrite( + RegisterStorageEngineRequest && /* register_storage_engine_request */) { + RegisterStorageEngineResponse res{}; + // TODO + + return res; +} + +/// This begins the process of draining the provided storage engine from all raft +/// clusters that it might be participating in. 
+CoordinatorWriteResponses Coordinator::ApplyWrite( + DeregisterStorageEngineRequest && /* register_storage_engine_request */) { + DeregisterStorageEngineResponse res{}; + // TODO + + return res; +} + +CoordinatorWriteResponses Coordinator::ApplyWrite(InitializeLabelRequest &&initialize_label_request) { + InitializeLabelResponse res{}; + + std::optional new_label_id = shard_map_.InitializeNewLabel( + initialize_label_request.label_name, initialize_label_request.schema, initialize_label_request.replication_factor, + initialize_label_request.last_shard_map_version); + + if (new_label_id) { + res.new_label_id = new_label_id.value(); + res.fresher_shard_map = std::nullopt; + res.success = true; + } else { + res.fresher_shard_map = shard_map_; + res.success = false; + } + + return res; +} + +CoordinatorWriteResponses Coordinator::ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) { + AllocatePropertyIdsResponse res{}; + + auto property_ids = shard_map_.AllocatePropertyIds(allocate_property_ids_request.property_names); + + res.property_ids = property_ids; + + return res; +} + +} // namespace memgraph::coordinator diff --git a/src/coordinator/coordinator.hpp b/src/coordinator/coordinator.hpp index 3382a4006..f363d799e 100644 --- a/src/coordinator/coordinator.hpp +++ b/src/coordinator/coordinator.hpp @@ -12,26 +12,30 @@ #pragma once #include +#include #include #include #include #include -#include "coordinator/hybrid_logical_clock.hpp" -#include "coordinator/shard_map.hpp" -#include "io/simulator/simulator.hpp" -#include "io/time.hpp" -#include "io/transport.hpp" -#include "storage/v3/id_types.hpp" -#include "storage/v3/schemas.hpp" +#include + +#include +#include +#include +#include +#include +#include +#include namespace memgraph::coordinator { +using memgraph::io::Address; using memgraph::storage::v3::LabelId; using memgraph::storage::v3::PropertyId; -using Address = memgraph::io::Address; -using SimT = memgraph::io::simulator::SimulatorTransport; using memgraph::storage::v3::SchemaProperty; +using SimT = memgraph::io::simulator::SimulatorTransport; +using PrimaryKey = std::vector; struct HlcRequest { Hlc last_shard_map_version; @@ -76,7 +80,7 @@ struct AllocatePropertyIdsResponse { struct SplitShardRequest { Hlc previous_shard_map_version; LabelId label_id; - CompoundKey split_key; + PrimaryKey split_key; }; struct SplitShardResponse { @@ -102,26 +106,34 @@ struct DeregisterStorageEngineResponse { struct InitializeLabelRequest { std::string label_name; std::vector schema; + size_t replication_factor; Hlc last_shard_map_version; }; struct InitializeLabelResponse { bool success; + LabelId new_label_id; std::optional fresher_shard_map; }; -struct HeartbeatRequest {}; -struct HeartbeatResponse {}; +struct HeartbeatRequest { + Address from_storage_manager; + std::set initialized_rsms; +}; + +struct HeartbeatResponse { + std::vector shards_to_initialize; +}; using CoordinatorWriteRequests = std::variant; -using CoordinatorWriteResponses = - std::variant; + DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest, HeartbeatRequest>; +using CoordinatorWriteResponses = std::variant; -using CoordinatorReadRequests = std::variant; -using CoordinatorReadResponses = std::variant; +using CoordinatorReadRequests = std::variant; +using CoordinatorReadResponses = std::variant; class Coordinator { public: @@ -146,114 +158,34 @@ class Coordinator { /// Query engines need to periodically request batches of unique edge IDs. 
uint64_t highest_allocated_edge_id_; - static CoordinatorReadResponses HandleRead(HeartbeatRequest && /* heartbeat_request */) { - return HeartbeatResponse{}; - } - CoordinatorReadResponses HandleRead(GetShardMapRequest && /* get_shard_map_request */) { GetShardMapResponse res; res.shard_map = shard_map_; return res; } - CoordinatorWriteResponses ApplyWrite(HlcRequest &&hlc_request) { - HlcResponse res{}; + CoordinatorWriteResponses ApplyWrite(HeartbeatRequest &&heartbeat_request); - auto hlc_shard_map = shard_map_.GetHlc(); + CoordinatorWriteResponses ApplyWrite(HlcRequest &&hlc_request); - MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id)); - - res.new_hlc = Hlc{ - .logical_id = ++highest_allocated_timestamp_, - // TODO(tyler) probably pass some more context to the Coordinator here - // so that we can use our wall clock and enforce monotonicity. - // .coordinator_wall_clock = io_.Now(), - }; - - // Allways return fresher shard_map for now. - res.fresher_shard_map = std::make_optional(shard_map_); - - return res; - } - - CoordinatorWriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) { - AllocateEdgeIdBatchResponse res{}; - - uint64_t low = highest_allocated_edge_id_; - - highest_allocated_edge_id_ += ahr.batch_size; - - uint64_t high = highest_allocated_edge_id_; - - res.low = low; - res.high = high; - - return res; - } + CoordinatorWriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr); /// This splits the shard immediately beneath the provided /// split key, keeping the assigned peers identical for now, /// but letting them be gradually migrated over time. - CoordinatorWriteResponses ApplyWrite(SplitShardRequest &&split_shard_request) { - SplitShardResponse res{}; - - if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) { - res.success = false; - } else { - res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id, - split_shard_request.split_key); - } - - return res; - } + CoordinatorWriteResponses ApplyWrite(SplitShardRequest &&split_shard_request); /// This adds the provided storage engine to the standby storage engine pool, /// which can be used to rebalance storage over time. - static CoordinatorWriteResponses ApplyWrite(RegisterStorageEngineRequest && /* register_storage_engine_request */) { - RegisterStorageEngineResponse res{}; - // TODO - - return res; - } + static CoordinatorWriteResponses ApplyWrite(RegisterStorageEngineRequest && /* register_storage_engine_request */); /// This begins the process of draining the provided storage engine from all raft /// clusters that it might be participating in. 
- static CoordinatorWriteResponses ApplyWrite(DeregisterStorageEngineRequest && /* register_storage_engine_request */) { - DeregisterStorageEngineResponse res{}; - // TODO - // const Address &address = register_storage_engine_request.address; - // storage_engine_pool_.erase(address); - // res.success = true; + static CoordinatorWriteResponses ApplyWrite(DeregisterStorageEngineRequest && /* register_storage_engine_request */); - return res; - } + CoordinatorWriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request); - CoordinatorWriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request) { - InitializeLabelResponse res{}; - - bool success = shard_map_.InitializeNewLabel(initialize_label_request.label_name, initialize_label_request.schema, - initialize_label_request.last_shard_map_version); - - if (success) { - res.fresher_shard_map = shard_map_; - res.success = false; - } else { - res.fresher_shard_map = std::nullopt; - res.success = true; - } - - return res; - } - - CoordinatorWriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) { - AllocatePropertyIdsResponse res{}; - - auto property_ids = shard_map_.AllocatePropertyIds(allocate_property_ids_request.property_names); - - res.property_ids = property_ids; - - return res; - } + CoordinatorWriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request); }; } // namespace memgraph::coordinator diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp new file mode 100644 index 000000000..731c0fdcf --- /dev/null +++ b/src/coordinator/shard_map.cpp @@ -0,0 +1,90 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
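// SchemaToMinKey (defined below) builds the smallest representable primary key for a
// label's schema (false, minimum int, empty string, minimum temporal values), which
// InitializeNewLabel then uses as the inclusive lower bound of the label's first shard,
// so a freshly created label space covers the whole key range with a single shard.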
+ +#include "coordinator/shard_map.hpp" +#include "storage/v3/temporal.hpp" + +namespace memgraph::coordinator { + +using memgraph::common::SchemaType; +using memgraph::storage::v3::TemporalData; +using memgraph::storage::v3::TemporalType; + +PrimaryKey SchemaToMinKey(const std::vector &schema) { + PrimaryKey ret{}; + + const int64_t min_int = std::numeric_limits::min(); + + const TemporalData date{TemporalType::Date, min_int}; + const TemporalData local_time{TemporalType::LocalTime, min_int}; + const TemporalData local_date_time{TemporalType::LocalDateTime, min_int}; + const TemporalData duration{TemporalType::Duration, min_int}; + + for (const auto &schema_property : schema) { + switch (schema_property.type) { + case SchemaType::BOOL: + ret.emplace_back(PropertyValue(false)); + break; + case SchemaType::INT: + ret.emplace_back(PropertyValue(min_int)); + break; + case SchemaType::STRING: + ret.emplace_back(PropertyValue("")); + break; + case SchemaType::DATE: + ret.emplace_back(PropertyValue(date)); + break; + case SchemaType::LOCALTIME: + ret.emplace_back(PropertyValue(local_time)); + break; + case SchemaType::LOCALDATETIME: + ret.emplace_back(PropertyValue(local_date_time)); + break; + case SchemaType::DURATION: + ret.emplace_back(PropertyValue(duration)); + break; + } + } + + return ret; +} + +std::optional ShardMap::InitializeNewLabel(std::string label_name, std::vector schema, + size_t replication_factor, Hlc last_shard_map_version) { + if (shard_map_version != last_shard_map_version || labels.contains(label_name)) { + return std::nullopt; + } + + const LabelId label_id = LabelId::FromUint(++max_label_id); + + labels.emplace(std::move(label_name), label_id); + + PrimaryKey initial_key = SchemaToMinKey(schema); + Shard empty_shard = {}; + + Shards shards = { + {initial_key, empty_shard}, + }; + + LabelSpace label_space{ + .schema = std::move(schema), + .shards = shards, + .replication_factor = replication_factor, + }; + + label_spaces.emplace(label_id, label_space); + + IncrementShardMapVersion(); + + return label_id; +} + +} // namespace memgraph::coordinator diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index ce9f779dd..923406e00 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -11,11 +11,18 @@ #pragma once +#include #include +#include #include +#include +#include + +#include "common/types.hpp" #include "coordinator/hybrid_logical_clock.hpp" #include "io/address.hpp" +#include "storage/v3/config.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/schemas.hpp" @@ -23,8 +30,10 @@ namespace memgraph::coordinator { using memgraph::io::Address; +using memgraph::storage::v3::Config; using memgraph::storage::v3::LabelId; using memgraph::storage::v3::PropertyId; +using memgraph::storage::v3::PropertyValue; using memgraph::storage::v3::SchemaProperty; enum class Status : uint8_t { @@ -40,16 +49,27 @@ struct AddressAndStatus { Status status; }; -using CompoundKey = std::vector; +using PrimaryKey = std::vector; using Shard = std::vector; -using Shards = std::map; +using Shards = std::map; using LabelName = std::string; using PropertyName = std::string; using PropertyMap = std::map; +struct ShardToInitialize { + boost::uuids::uuid uuid; + LabelId label_id; + PrimaryKey min_key; + std::optional max_key; + Config config; +}; + +PrimaryKey SchemaToMinKey(const std::vector &schema); + struct LabelSpace { std::vector schema; - std::map shards; + std::map shards; + size_t 
replication_factor; }; struct ShardMap { @@ -70,7 +90,70 @@ struct ShardMap { Hlc GetHlc() const noexcept { return shard_map_version; } - bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const CompoundKey &key) { + // Returns the shard UUIDs that have been assigned but not yet acknowledged for this storage manager + std::vector AssignShards(Address storage_manager, std::set initialized) { + std::vector ret{}; + + bool mutated = false; + + for (auto &[label_id, label_space] : label_spaces) { + for (auto &[low_key, shard] : label_space.shards) { + // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info + bool machine_contains_shard = false; + + for (auto &aas : shard) { + if (initialized.contains(aas.address.unique_id)) { + spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); + aas.status = Status::CONSENSUS_PARTICIPANT; + machine_contains_shard = true; + } else { + const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip && + aas.address.last_known_port == storage_manager.last_known_port; + if (same_machine) { + machine_contains_shard = true; + ret.push_back(ShardToInitialize{ + .uuid = aas.address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = std::nullopt, + .config = Config{}, + }); + } + } + } + + if (!machine_contains_shard && shard.size() < label_space.replication_factor) { + Address address = storage_manager; + + // TODO(tyler) use deterministic UUID so that coordinators don't diverge here + address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, + + ret.push_back(ShardToInitialize{ + .uuid = address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = std::nullopt, + .config = Config{}, + }); + + AddressAndStatus aas = { + .address = address, + .status = Status::INITIALIZING, + }; + + shard.emplace_back(aas); + } + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + + return ret; + } + + bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) { if (previous_shard_map_version != shard_map_version) { return false; } @@ -78,10 +161,11 @@ struct ShardMap { auto &label_space = label_spaces.at(label_id); auto &shards_in_map = label_space.shards; + MG_ASSERT(!shards_in_map.empty()); MG_ASSERT(!shards_in_map.contains(key)); MG_ASSERT(label_spaces.contains(label_id)); - // Finding the Shard that the new CompoundKey should map to. + // Finding the Shard that the new PrimaryKey should map to. 
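// shards_in_map is keyed by each shard's minimum (inclusive) primary key, so the shard
// owning key is the one with the greatest min-key not greater than key:
// upper_bound(key) is the first shard starting strictly after key, and std::prev of
// that iterator is the owner. The MG_ASSERT(!shards_in_map.empty()) above keeps
// std::prev well defined, and the new shard created at key duplicates that owner's
// peer set, matching the "keep the assigned peers identical for now" note on
// SplitShardRequest.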
auto prev = std::prev(shards_in_map.upper_bound(key)); Shard duplicated_shard = prev->second; @@ -91,32 +175,14 @@ struct ShardMap { return true; } - bool InitializeNewLabel(std::string label_name, std::vector schema, Hlc last_shard_map_version) { - if (shard_map_version != last_shard_map_version || labels.contains(label_name)) { - return false; - } - - const LabelId label_id = LabelId::FromUint(++max_label_id); - - labels.emplace(std::move(label_name), label_id); - - LabelSpace label_space{ - .schema = std::move(schema), - .shards = Shards{}, - }; - - label_spaces.emplace(label_id, label_space); - - IncrementShardMapVersion(); - - return true; - } + std::optional InitializeNewLabel(std::string label_name, std::vector schema, + size_t replication_factor, Hlc last_shard_map_version); void AddServer(Address server_address) { // Find a random place for the server to plug in } - Shards GetShardsForRange(const LabelName &label_name, const CompoundKey &start_key, const CompoundKey &end_key) const { + Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const { MG_ASSERT(start_key <= end_key); MG_ASSERT(labels.contains(label_name)); @@ -139,7 +205,7 @@ struct ShardMap { return shards; } - Shard GetShardForKey(const LabelName &label_name, const CompoundKey &key) const { + Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const { MG_ASSERT(labels.contains(label_name)); LabelId label_id = labels.at(label_name); diff --git a/src/io/future.hpp b/src/io/future.hpp index 7b9a4461c..c697670ab 100644 --- a/src/io/future.hpp +++ b/src/io/future.hpp @@ -99,7 +99,7 @@ class Shared { bool IsReady() const { std::unique_lock lock(mu_); - return item_; + return item_.has_value(); } std::optional TryGet() { @@ -148,12 +148,6 @@ class Future { old.consumed_or_moved_ = true; } - Future &operator=(Future &&old) noexcept { - MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed."); - shared_ = std::move(old.shared_); - old.consumed_or_moved_ = true; - } - Future(const Future &) = delete; Future &operator=(const Future &) = delete; ~Future() = default; diff --git a/src/io/local_transport/local_system.hpp b/src/io/local_transport/local_system.hpp index fd0628943..2e54f8d75 100644 --- a/src/io/local_transport/local_system.hpp +++ b/src/io/local_transport/local_system.hpp @@ -25,7 +25,7 @@ class LocalSystem { public: Io Register(Address address) { - LocalTransport local_transport(local_transport_handle_, address); + LocalTransport local_transport(local_transport_handle_); return Io{local_transport, address}; } diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp index bdf964bed..719605081 100644 --- a/src/io/local_transport/local_transport.hpp +++ b/src/io/local_transport/local_transport.hpp @@ -25,18 +25,16 @@ namespace memgraph::io::local_transport { class LocalTransport { std::shared_ptr local_transport_handle_; - const Address address_; public: - LocalTransport(std::shared_ptr local_transport_handle, Address address) - : local_transport_handle_(std::move(local_transport_handle)), address_(address) {} + explicit LocalTransport(std::shared_ptr local_transport_handle) + : local_transport_handle_(std::move(local_transport_handle)) {} template - ResponseFuture Request(Address to_address, RequestId request_id, RequestT request, Duration timeout) { + ResponseFuture Request(Address to_address, Address from_address, RequestId request_id, RequestT request, + 
Duration timeout) { auto [future, promise] = memgraph::io::FuturePromisePair>(); - Address from_address = address_; - local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout, std::move(promise)); @@ -44,9 +42,8 @@ class LocalTransport { } template - requires(sizeof...(Ms) > 0) RequestResult Receive(Duration timeout) { - Address from_address = address_; - return local_transport_handle_->template Receive(timeout); + requires(sizeof...(Ms) > 0) RequestResult Receive(Address receiver_address, Duration timeout) { + return local_transport_handle_->template Receive(receiver_address, timeout); } template diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp index 1afade9e6..798f47a94 100644 --- a/src/io/local_transport/local_transport_handle.hpp +++ b/src/io/local_transport/local_transport_handle.hpp @@ -36,6 +36,13 @@ class LocalTransportHandle { std::vector can_receive_; public: + ~LocalTransportHandle() { + for (auto &&[pk, promise] : promises_) { + std::move(promise.promise).TimeOut(); + } + promises_.clear(); + } + void ShutDown() { std::unique_lock lock(mu_); should_shut_down_ = true; @@ -53,11 +60,13 @@ class LocalTransportHandle { } template - requires(sizeof...(Ms) > 0) RequestResult Receive(Duration timeout) { + requires(sizeof...(Ms) > 0) RequestResult Receive(Address /* receiver_address */, Duration timeout) { std::unique_lock lock(mu_); Time before = Now(); + spdlog::info("can_receive_ size: {}", can_receive_.size()); + while (can_receive_.empty()) { Time now = Now(); @@ -89,23 +98,26 @@ class LocalTransportHandle { template void Send(Address to_address, Address from_address, RequestId request_id, M &&message) { std::any message_any(std::forward(message)); - OpaqueMessage opaque_message{ - .from_address = from_address, .request_id = request_id, .message = std::move(message_any)}; + OpaqueMessage opaque_message{.to_address = to_address, + .from_address = from_address, + .request_id = request_id, + .message = std::move(message_any)}; - PromiseKey promise_key{.requester_address = to_address, - .request_id = opaque_message.request_id, - .replier_address = opaque_message.from_address}; + PromiseKey promise_key{ + .requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address}; { std::unique_lock lock(mu_); if (promises_.contains(promise_key)) { + spdlog::info("using message to fill promise"); // complete waiting promise if it's there DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key)); promises_.erase(promise_key); dop.promise.Fill(std::move(opaque_message)); } else { + spdlog::info("placing message in can_receive_"); can_receive_.emplace_back(std::move(opaque_message)); } } // lock dropped diff --git a/src/io/message_conversion.hpp b/src/io/message_conversion.hpp index 53881583b..d1f805711 100644 --- a/src/io/message_conversion.hpp +++ b/src/io/message_conversion.hpp @@ -203,4 +203,30 @@ struct DeadlineAndOpaquePromise { OpaquePromise promise; }; +template +std::type_info const &type_info_for_variant(From const &from) { + return std::visit([](auto &&x) -> decltype(auto) { return typeid(x); }, from); +} + +template +std::optional ConvertVariantInner(From &&a) { + if (typeid(Head) == type_info_for_variant(a)) { + Head concrete = std::get(std::forward(a)); + return concrete; + } + + if constexpr (sizeof...(Rest) > 0) { + return ConvertVariantInner(std::forward(a)); + } else { + return std::nullopt; + } +} + +/// This function 
converts a variant to another variant holding a subset OR superset of +/// possible types. +template +requires(sizeof...(Ms) > 0) std::optional> ConvertVariant(From &&from) { + return ConvertVariantInner, Ms...>(std::forward(from)); +} + } // namespace memgraph::io diff --git a/src/io/messages.hpp b/src/io/messages.hpp new file mode 100644 index 000000000..fc74a309a --- /dev/null +++ b/src/io/messages.hpp @@ -0,0 +1,48 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include + +#include +#include +#include +#include "utils/concepts.hpp" + +namespace memgraph::io::messages { + +using memgraph::coordinator::CoordinatorReadRequests; +using memgraph::coordinator::CoordinatorWriteRequests; +using memgraph::coordinator::CoordinatorWriteResponses; + +// TODO(everbody) change these to the real shard messages +using memgraph::io::rsm::StorageReadRequest; +using memgraph::io::rsm::StorageWriteRequest; + +using memgraph::io::rsm::AppendRequest; +using memgraph::io::rsm::AppendResponse; +using memgraph::io::rsm::ReadRequest; +using memgraph::io::rsm::VoteRequest; +using memgraph::io::rsm::VoteResponse; +using memgraph::io::rsm::WriteRequest; +using memgraph::io::rsm::WriteResponse; + +using CoordinatorMessages = + std::variant, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse>; + +using ShardMessages = std::variant, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse>; + +using ShardManagerMessages = std::variant>; + +} // namespace memgraph::io::messages diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp index ea7286cbd..06772eb35 100644 --- a/src/io/rsm/raft.hpp +++ b/src/io/rsm/raft.hpp @@ -53,7 +53,6 @@ static constexpr size_t kMaximumAppendBatchSize = 1024; using Term = uint64_t; using LogIndex = uint64_t; using LogSize = uint64_t; -using RequestId = uint64_t; template struct WriteRequest { @@ -230,22 +229,60 @@ class Raft { Io io_; std::vector
peers_; ReplicatedState replicated_state_; + Time next_cron_; public: Raft(Io &&io, std::vector
peers, ReplicatedState &&replicated_state) : io_(std::forward>(io)), peers_(peers), - replicated_state_(std::forward(replicated_state)) {} + replicated_state_(std::forward(replicated_state)) { + if (peers.empty()) { + role_ = Leader{}; + } + } + + /// Periodic protocol maintenance. Returns the time that Cron should be called again + /// in the future. + Time Cron() { + // dispatch periodic logic based on our role to a specific Cron method. + std::optional new_role = std::visit([&](auto &role) { return Cron(role); }, role_); + + if (new_role) { + role_ = std::move(new_role).value(); + } + const Duration random_cron_interval = RandomTimeout(kMinimumCronInterval, kMaximumCronInterval); + + return io_.Now() + random_cron_interval; + } + + /// Returns the Address for our underlying Io implementation + Address GetAddress() { return io_.GetAddress(); } + + using ReceiveVariant = std::variant, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse>; + + void Handle(ReceiveVariant &&message_variant, RequestId request_id, Address from_address) { + // dispatch the message to a handler based on our role, + // which can be specified in the Handle first argument, + // or it can be `auto` if it's a handler for several roles + // or messages. + std::optional new_role = std::visit( + [&](auto &&msg, auto &role) mutable { + return Handle(role, std::forward(msg), request_id, from_address); + }, + std::forward(message_variant), role_); + + // TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc... + if (new_role) { + role_ = std::move(new_role).value(); + } + } void Run() { - Time last_cron = io_.Now(); - while (!io_.ShouldShutDown()) { const auto now = io_.Now(); - const Duration random_cron_interval = RandomTimeout(kMinimumCronInterval, kMaximumCronInterval); - if (now - last_cron > random_cron_interval) { - Cron(); - last_cron = now; + if (now >= next_cron_) { + next_cron_ = Cron(); } const Duration receive_timeout = RandomTimeout(kMinimumReceiveTimeout, kMaximumReceiveTimeout); @@ -449,16 +486,6 @@ class Raft { /// been received. ///////////////////////////////////////////////////////////// - /// Periodic protocol maintenance. - void Cron() { - // dispatch periodic logic based on our role to a specific Cron method. - std::optional new_role = std::visit([&](auto &role) { return Cron(role); }, role_); - - if (new_role) { - role_ = std::move(new_role).value(); - } - } - // Raft paper - 5.2 // Candidates keep sending Vote to peers until: // 1. receiving Append with a higher term (become Follower) @@ -541,26 +568,6 @@ class Raft { /// message that has been received. ///////////////////////////////////////////////////////////// - using ReceiveVariant = std::variant, AppendRequest, AppendResponse, - WriteRequest, VoteRequest, VoteResponse>; - - void Handle(ReceiveVariant &&message_variant, RequestId request_id, Address from_address) { - // dispatch the message to a handler based on our role, - // which can be specified in the Handle first argument, - // or it can be `auto` if it's a handler for several roles - // or messages. - std::optional new_role = std::visit( - [&](auto &&msg, auto &role) mutable { - return Handle(role, std::forward(msg), request_id, from_address); - }, - std::forward(message_variant), role_); - - // TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc... 
- if (new_role) { - role_ = std::move(new_role).value(); - } - } - // all roles can receive Vote and possibly become a follower template std::optional Handle(ALL & /* variable */, VoteRequest &&req, RequestId request_id, Address from_address) { @@ -906,7 +913,11 @@ class Raft { leader.pending_client_requests.emplace(log_index, pcr); - BroadcastAppendEntries(leader.followers); + if (peers_.empty()) { + BumpCommitIndexAndReplyToClients(leader); + } else { + BroadcastAppendEntries(leader.followers); + } return std::nullopt; } diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 4b5a2e890..5f007da87 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -33,19 +33,21 @@ class SimulatorTransport { : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {} template - ResponseFuture Request(Address address, uint64_t request_id, RequestT request, Duration timeout) { + ResponseFuture Request(Address to_address, Address from_address, uint64_t request_id, RequestT request, + Duration timeout) { std::function maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); }; auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier>(maybe_tick_simulator); - simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise)); + simulator_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout, + std::move(promise)); return std::move(future); } template - requires(sizeof...(Ms) > 0) RequestResult Receive(Duration timeout) { - return simulator_handle_->template Receive(address_, timeout); + requires(sizeof...(Ms) > 0) RequestResult Receive(Address receiver_address, Duration timeout) { + return simulator_handle_->template Receive(receiver_address, timeout); } template diff --git a/src/io/transport.hpp b/src/io/transport.hpp index 04b1d2d62..87e6bc06f 100644 --- a/src/io/transport.hpp +++ b/src/io/transport.hpp @@ -83,23 +83,26 @@ class Io { template ResponseFuture RequestWithTimeout(Address address, RequestT request, Duration timeout) { const RequestId request_id = ++request_id_counter_; - return implementation_.template Request(address, request_id, request, timeout); + const Address from_address = address_; + return implementation_.template Request(address, from_address, request_id, request, timeout); } /// Issue a request that times out after the default timeout. This tends /// to be used by clients. template - ResponseFuture Request(Address address, RequestT request) { + ResponseFuture Request(Address to_address, RequestT request) { const RequestId request_id = ++request_id_counter_; const Duration timeout = default_timeout_; - return implementation_.template Request(address, request_id, std::move(request), timeout); + const Address from_address = address_; + return implementation_.template Request(to_address, from_address, request_id, + std::move(request), timeout); } /// Wait for an explicit number of microseconds for a request of one of the /// provided types to arrive. This tends to be used by servers. 
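/// Note that the concrete transports now take the receiving address explicitly (see
/// LocalTransport::Receive and SimulatorTransport::Receive in this patch) instead of
/// storing it themselves; this wrapper forwards its own address_, which matters once
/// several Io instances are forked from the same underlying transport via ForkLocal().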
template RequestResult ReceiveWithTimeout(Duration timeout) { - return implementation_.template Receive(timeout); + return implementation_.template Receive(address_, timeout); } /// Wait the default number of microseconds for a request of one of the @@ -107,7 +110,7 @@ class Io { template requires(sizeof...(Ms) > 0) RequestResult Receive() { const Duration timeout = default_timeout_; - return implementation_.template Receive(timeout); + return implementation_.template Receive(address_, timeout); } /// Send a message in a best-effort fashion. This is used for messaging where @@ -134,5 +137,8 @@ class Io { } Address GetAddress() { return address_; } + void SetAddress(Address address) { address_ = address; } + + Io ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); } }; }; // namespace memgraph::io diff --git a/src/machine_manager/machine_config.hpp b/src/machine_manager/machine_config.hpp new file mode 100644 index 000000000..6e46d2b83 --- /dev/null +++ b/src/machine_manager/machine_config.hpp @@ -0,0 +1,42 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include "io/address.hpp" +#include "storage/v3/property_value.hpp" +#include "storage/v3/schemas.hpp" + +namespace memgraph::machine_manager { + +using memgraph::io::Address; +using memgraph::storage::v3::SchemaProperty; +using CompoundKey = std::vector; + +struct InitialLabelSpace { + std::string label_name; + std::vector schema; + size_t replication_factor; + std::vector split_points; +}; + +struct MachineConfig { + std::vector initial_label_spaces; + std::vector
coordinator_addresses; + bool is_storage; + bool is_coordinator; + bool is_query_engine; + boost::asio::ip::address listen_ip; + uint16_t listen_port; +}; + +} // namespace memgraph::machine_manager diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp new file mode 100644 index 000000000..909d7235b --- /dev/null +++ b/src/machine_manager/machine_manager.hpp @@ -0,0 +1,180 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace memgraph::machine_manager { + +using memgraph::coordinator::Coordinator; +using memgraph::coordinator::CoordinatorReadRequests; +using memgraph::coordinator::CoordinatorReadResponses; +using memgraph::coordinator::CoordinatorRsm; +using memgraph::coordinator::CoordinatorWriteRequests; +using memgraph::coordinator::CoordinatorWriteResponses; +using memgraph::io::ConvertVariant; +using memgraph::io::Duration; +using memgraph::io::RequestId; +using memgraph::io::Time; +using memgraph::io::messages::CoordinatorMessages; +using memgraph::io::messages::ShardManagerMessages; +using memgraph::io::messages::ShardMessages; +using memgraph::io::rsm::AppendRequest; +using memgraph::io::rsm::AppendResponse; +using memgraph::io::rsm::ReadRequest; +using memgraph::io::rsm::VoteRequest; +using memgraph::io::rsm::VoteResponse; +using memgraph::io::rsm::WriteRequest; +using memgraph::io::rsm::WriteResponse; +using memgraph::storage::v3::ShardManager; + +using memgraph::io::rsm::StorageReadRequest; +using memgraph::io::rsm::StorageWriteRequest; + +/// The MachineManager is responsible for: +/// * starting the entire system and ensuring that high-level +/// operational requirements continue to be met +/// * acting as a machine's single caller of Io::Receive +/// * routing incoming messages from the Io interface to the +/// appropriate Coordinator or to the StorageManager +/// (it's not necessary to route anything in this layer +/// to the query engine because the query engine only +/// communicates using higher-level Futures that will +/// be filled immediately when the response comes in +/// at the lower-level transport layer. +/// +/// Every storage engine has exactly one RsmEngine. +template +class MachineManager { + io::Io io_; + MachineConfig config_; + CoordinatorRsm coordinator_; + ShardManager shard_manager_; + Time next_cron_; + + public: + // TODO initialize ShardManager with "real" coordinator addresses instead of io.GetAddress + // which is only true for single-machine config. 
+ MachineManager(io::Io io, MachineConfig config, Coordinator coordinator) + : io_(io), + config_(config), + coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)}, + shard_manager_(ShardManager{io.ForkLocal(), coordinator_.GetAddress()}) {} + + Address CoordinatorAddress() { return coordinator_.GetAddress(); } + + void Run() { + while (!io_.ShouldShutDown()) { + const auto now = io_.Now(); + + if (now >= next_cron_) { + next_cron_ = Cron(); + } + + Duration receive_timeout = next_cron_ - now; + + // Note: this parameter pack must be kept in-sync with the ReceiveWithTimeout parameter pack below + using AllMessages = + std::variant, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse, + WriteResponse, ReadRequest, + AppendRequest, WriteRequest>; + + spdlog::info("MM waiting on Receive"); + + // Note: this parameter pack must be kept in-sync with the AllMessages parameter pack above + auto request_result = io_.template ReceiveWithTimeout< + ReadRequest, AppendRequest, AppendResponse, + WriteRequest, VoteRequest, VoteResponse, WriteResponse, + ReadRequest, AppendRequest, WriteRequest>( + receive_timeout); + + if (request_result.HasError()) { + // time to do Cron + spdlog::info("MM got timeout"); + continue; + } + + auto &&request_envelope = std::move(request_result.GetValue()); + + spdlog::info("MM got message to {}", request_envelope.to_address.ToString()); + + // If message is for the coordinator, cast it to subset and pass it to the coordinator + bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address; + spdlog::info("coordinator: {}", coordinator_.GetAddress().ToString()); + if (to_coordinator) { + std::optional conversion_attempt = + ConvertVariant, AppendRequest, + AppendResponse, WriteRequest, VoteRequest, VoteResponse>( + std::move(request_envelope.message)); + + MG_ASSERT(conversion_attempt.has_value(), "coordinator message conversion failed"); + + spdlog::info("got coordinator message"); + + CoordinatorMessages &&cm = std::move(conversion_attempt.value()); + + coordinator_.Handle(std::forward(cm), request_envelope.request_id, + request_envelope.from_address); + continue; + } + + bool to_sm = shard_manager_.GetAddress() == request_envelope.to_address; + spdlog::info("smm: {}", shard_manager_.GetAddress().ToString()); + if (to_sm) { + std::optional conversion_attempt = + ConvertVariant>(std::move(request_envelope.message)); + + MG_ASSERT(conversion_attempt.has_value(), "shard manager message conversion failed"); + + spdlog::info("got shard manager message"); + + ShardManagerMessages &&smm = std::move(conversion_attempt.value()); + shard_manager_.Receive(std::forward(smm), request_envelope.request_id, + request_envelope.from_address); + continue; + } + + // treat this as a message to a specific shard rsm and cast it accordingly + + std::optional conversion_attempt = + ConvertVariant, AppendRequest, + AppendResponse, WriteRequest, VoteRequest, VoteResponse>( + std::move(request_envelope.message)); + + MG_ASSERT(conversion_attempt.has_value(), "shard rsm message conversion failed for {}", + request_envelope.to_address.ToString()); + + spdlog::info("got shard rsm message"); + + ShardMessages &&sm = std::move(conversion_attempt.value()); + shard_manager_.Route(std::forward(sm), request_envelope.request_id, request_envelope.to_address, + request_envelope.from_address); + } + } + + private: + Time Cron() { + spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString()); + return shard_manager_.Cron(); + } +}; + 
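// A minimal single-machine usage sketch, mirroring tests/unit/machine_manager.cpp from
// this patch (LocalSystem / LocalTransport come from src/io/local_transport; shard_map
// is assumed to be a previously constructed ShardMap):
//
//   LocalSystem local_system;
//   Address addr = Address::TestAddress(1);
//   auto io = local_system.Register(addr);
//   MachineConfig config{.coordinator_addresses = {addr},
//                        .is_storage = true,
//                        .is_coordinator = true,
//                        .listen_ip = addr.last_known_ip,
//                        .listen_port = addr.last_known_port};
//   Coordinator coordinator{shard_map};
//   MachineManager<LocalTransport> mm{io, config, coordinator};
//   auto mm_thread = std::jthread([](auto mm) { mm.Run(); }, std::move(mm));
//
// The MachineManager owns this machine's single Receive loop, so everything that runs on
// the machine (the CoordinatorRsm and every shard RSM) is reached through it.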
+} // namespace memgraph::machine_manager diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp new file mode 100644 index 000000000..2f0ec2d49 --- /dev/null +++ b/src/storage/v3/shard_manager.hpp @@ -0,0 +1,222 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace memgraph::storage::v3 { + +using boost::uuids::uuid; + +using memgraph::coordinator::CoordinatorWriteRequests; +using memgraph::coordinator::CoordinatorWriteResponses; +using memgraph::coordinator::HeartbeatRequest; +using memgraph::coordinator::HeartbeatResponse; +using memgraph::io::Address; +using memgraph::io::Duration; +using memgraph::io::Message; +using memgraph::io::RequestId; +using memgraph::io::ResponseFuture; +using memgraph::io::Time; +using memgraph::io::messages::CoordinatorMessages; +using memgraph::io::messages::ShardManagerMessages; +using memgraph::io::messages::ShardMessages; +using memgraph::io::rsm::Raft; +using memgraph::io::rsm::ShardRsm; +using memgraph::io::rsm::StorageReadRequest; +using memgraph::io::rsm::StorageReadResponse; +using memgraph::io::rsm::StorageWriteRequest; +using memgraph::io::rsm::StorageWriteResponse; +using memgraph::io::rsm::WriteRequest; +using memgraph::io::rsm::WriteResponse; + +using ShardManagerOrRsmMessage = std::variant; +using TimeUuidPair = std::pair; + +template +using ShardRaft = + Raft; + +using namespace std::chrono_literals; +static constexpr Duration kMinimumCronInterval = 1000ms; +static constexpr Duration kMaximumCronInterval = 2000ms; +static_assert(kMinimumCronInterval < kMaximumCronInterval, + "The minimum cron interval has to be smaller than the maximum cron interval!"); + +/// The ShardManager is responsible for: +/// * reconciling the storage engine's local configuration with the Coordinator's +/// intentions for how it should participate in multiple raft clusters +/// * replying to heartbeat requests to the Coordinator +/// * routing incoming messages to the appropriate sRSM +/// +/// Every storage engine has exactly one RsmEngine. +template +class ShardManager { + public: + ShardManager(io::Io io, Address coordinator_leader) : io_(io), coordinator_leader_(coordinator_leader) {} + + /// Periodic protocol maintenance. Returns the time that Cron should be called again + /// in the future. 
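/// Two schedules are interleaved below: on a randomized tick between kMinimumCronInterval
/// and kMaximumCronInterval the manager runs Reconciliation() (heartbeating the Coordinator
/// and initializing any shards it is asked to host), and in between it pops cron_schedule_
/// to give the due shard RSM its own Cron() turn, returning whichever deadline comes first.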
+ Time Cron() { + spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString()); + Time now = io_.Now(); + + if (now >= next_cron_) { + Reconciliation(); + + std::uniform_int_distribution time_distrib(kMinimumCronInterval.count(), kMaximumCronInterval.count()); + + const auto rand = io_.Rand(time_distrib); + + next_cron_ = now + Duration{rand}; + } + + if (!cron_schedule_.empty()) { + const auto &[time, uuid] = cron_schedule_.top(); + + if (time <= now) { + auto &rsm = rsm_map_.at(uuid); + Time next_for_uuid = rsm.Cron(); + + cron_schedule_.pop(); + cron_schedule_.push(std::make_pair(next_for_uuid, uuid)); + + const auto &[next_time, _uuid] = cron_schedule_.top(); + + return std::min(next_cron_, next_time); + } + } + + return next_cron_; + } + + /// Returns the Address for our underlying Io implementation + Address GetAddress() { return io_.GetAddress(); } + + void Receive(ShardManagerMessages &&smm, RequestId request_id, Address from) {} + + void Route(ShardMessages &&sm, RequestId request_id, Address to, Address from) { + Address address = io_.GetAddress(); + + MG_ASSERT(address.last_known_port == to.last_known_port); + MG_ASSERT(address.last_known_ip == to.last_known_ip); + + auto &rsm = rsm_map_.at(to.unique_id); + + rsm.Handle(std::forward(sm), request_id, from); + } + + private: + io::Io io_; + std::map> rsm_map_; + std::priority_queue, std::vector>, std::greater<>> cron_schedule_; + Time next_cron_; + Address coordinator_leader_; + std::optional>> heartbeat_res_; + + // TODO(tyler) over time remove items from initialized_but_not_confirmed_rsm_ + // after the Coordinator is clearly aware of them + std::set initialized_but_not_confirmed_rsm_; + + void Reconciliation() { + if (heartbeat_res_.has_value()) { + if (heartbeat_res_->IsReady()) { + io::ResponseResult> response_result = + std::move(heartbeat_res_).value().Wait(); + heartbeat_res_.reset(); + + if (response_result.HasError()) { + spdlog::error("SM timed out while trying to reach C"); + } else { + auto response_envelope = response_result.GetValue(); + WriteResponse wr = response_envelope.message; + + if (wr.retry_leader.has_value()) { + spdlog::info("SM redirected to new C leader"); + coordinator_leader_ = wr.retry_leader.value(); + } else if (wr.success) { + CoordinatorWriteResponses cwr = wr.write_return; + HeartbeatResponse hr = std::get(cwr); + spdlog::info("SM received heartbeat response from C"); + + EnsureShardsInitialized(hr); + } + } + } else { + return; + } + } + + HeartbeatRequest req{ + .from_storage_manager = GetAddress(), + .initialized_rsms = initialized_but_not_confirmed_rsm_, + }; + + CoordinatorWriteRequests cwr = req; + WriteRequest ww; + ww.operation = cwr; + + spdlog::info("SM sending heartbeat to coordinator {}", coordinator_leader_.ToString()); + heartbeat_res_.emplace(std::move( + io_.template Request, WriteResponse>( + coordinator_leader_, ww))); + spdlog::info("SM sent heartbeat"); + } + + void EnsureShardsInitialized(HeartbeatResponse hr) { + for (const auto &shard_to_initialize : hr.shards_to_initialize) { + InitializeRsm(shard_to_initialize); + initialized_but_not_confirmed_rsm_.emplace(shard_to_initialize.uuid); + } + } + + /// Returns true if the RSM was able to be initialized, and false if it was already initialized + void InitializeRsm(coordinator::ShardToInitialize to_init) { + if (rsm_map_.contains(to_init.uuid)) { + // it's not a bug for the coordinator to send us UUIDs that we have + // already created, because there may have been lag that caused + // the coordinator not 
to hear back from us. + return; + } + + auto rsm_io = io_.ForkLocal(); + auto io_addr = rsm_io.GetAddress(); + io_addr.unique_id = to_init.uuid; + rsm_io.SetAddress(io_addr); + + // TODO(tyler) get peers from Coordinator in HeartbeatResponse + std::vector
rsm_peers = {}; + + // TODO(everbody) change this to storage::Shard + ShardRsm rsm_state{}; + + ShardRaft rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)}; + + spdlog::info("SM created a new shard with UUID {}", to_init.uuid); + + rsm_map_.emplace(to_init.uuid, std::move(rsm)); + } +}; + +} // namespace memgraph::storage::v3 diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index b44dacb36..a693a1d9d 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -14,7 +14,7 @@ function(add_simulation_test test_cpp san) # used to help create two targets of the same name even though CMake # requires unique logical target names set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) - target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-io-simulator) + target_link_libraries(${target_name} gtest gmock mg-utils mg-io mg-coordinator mg-io-simulator mg-storage-v3) # sanitize target_compile_options(${target_name} PRIVATE -fsanitize=${san}) diff --git a/tests/simulation/sharded_map.cpp b/tests/simulation/sharded_map.cpp index cd9d7db1a..5b1fb2085 100644 --- a/tests/simulation/sharded_map.cpp +++ b/tests/simulation/sharded_map.cpp @@ -32,13 +32,14 @@ #include "storage/v3/schemas.hpp" #include "utils/result.hpp" +using memgraph::common::SchemaType; using memgraph::coordinator::AddressAndStatus; -using memgraph::coordinator::CompoundKey; using memgraph::coordinator::Coordinator; using memgraph::coordinator::CoordinatorClient; using memgraph::coordinator::CoordinatorRsm; using memgraph::coordinator::HlcRequest; using memgraph::coordinator::HlcResponse; +using memgraph::coordinator::PrimaryKey; using memgraph::coordinator::Shard; using memgraph::coordinator::ShardMap; using memgraph::coordinator::Shards; @@ -65,9 +66,11 @@ using memgraph::io::simulator::SimulatorConfig; using memgraph::io::simulator::SimulatorStats; using memgraph::io::simulator::SimulatorTransport; using memgraph::storage::v3::LabelId; +using memgraph::storage::v3::PropertyValue; using memgraph::storage::v3::SchemaProperty; using memgraph::utils::BasicResult; +using PrimaryKey = std::vector; using ShardClient = RsmClient; namespace { @@ -83,18 +86,20 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add const auto properties = sm.AllocatePropertyIds(property_names); const auto property_id_1 = properties.at("property_1"); const auto property_id_2 = properties.at("property_2"); - const auto type_1 = memgraph::common::SchemaType::INT; - const auto type_2 = memgraph::common::SchemaType::INT; + const auto type_1 = SchemaType::INT; + const auto type_2 = SchemaType::INT; // register new label space std::vector schema = { SchemaProperty{.property_id = property_id_1, .type = type_1}, SchemaProperty{.property_id = property_id_2, .type = type_2}, }; - bool label_success = sm.InitializeNewLabel(label_name, schema, sm.shard_map_version); - MG_ASSERT(label_success); + size_t replication_factor = 3; + std::optional label_id_opt = + sm.InitializeNewLabel(label_name, schema, replication_factor, sm.shard_map_version); + MG_ASSERT(label_id_opt.has_value()); - const LabelId label_id = sm.labels.at(label_name); + const LabelId label_id = label_id_opt.value(); auto &label_space = sm.label_spaces.at(label_id); Shards &shards_for_label = label_space.shards; @@ -105,9 +110,9 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add Shard shard1 = {aas1_1, aas1_2, aas1_3}; - const auto key1 = 
memgraph::storage::v3::PropertyValue(0); - const auto key2 = memgraph::storage::v3::PropertyValue(0); - const CompoundKey compound_key_1 = {key1, key2}; + const auto key1 = PropertyValue(0); + const auto key2 = PropertyValue(0); + const PrimaryKey compound_key_1 = {key1, key2}; shards_for_label.emplace(compound_key_1, shard1); // add second shard at [12, 13] @@ -117,9 +122,9 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add Shard shard2 = {aas2_1, aas2_2, aas2_3}; - auto key3 = memgraph::storage::v3::PropertyValue(12); - auto key4 = memgraph::storage::v3::PropertyValue(13); - CompoundKey compound_key_2 = {key3, key4}; + auto key3 = PropertyValue(12); + auto key4 = PropertyValue(13); + PrimaryKey compound_key_2 = {key3, key4}; shards_for_label[compound_key_2] = shard2; return sm; @@ -263,11 +268,11 @@ int main() { req.last_shard_map_version = client_shard_map.GetHlc(); while (true) { - // Create CompoundKey - const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3); - const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4); + // Create PrimaryKey + const auto cm_key_1 = PropertyValue(3); + const auto cm_key_2 = PropertyValue(4); - const CompoundKey compound_key = {cm_key_1, cm_key_2}; + const PrimaryKey compound_key = {cm_key_1, cm_key_2}; // Look for Shard BasicResult read_res = diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 1aab3186e..53c51c2ed 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -391,10 +391,14 @@ add_custom_target(test_lcp ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_lcp) add_test(test_lcp ${CMAKE_CURRENT_BINARY_DIR}/test_lcp) add_dependencies(memgraph__unit test_lcp) -# Test future +# Test Future add_unit_test(future.cpp) target_link_libraries(${test_prefix}future mg-io) -# Test Local Transport +# Test LocalTransport add_unit_test(local_transport.cpp) target_link_libraries(${test_prefix}local_transport mg-io) + +# Test MachineManager with LocalTransport +add_unit_test(machine_manager.cpp) +target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-storage-v3) diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp new file mode 100644 index 000000000..3c3a69ff1 --- /dev/null +++ b/tests/unit/machine_manager.cpp @@ -0,0 +1,209 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
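// This test builds a ShardMap with one label ("test_label") and 16 split points, boots a
// single MachineManager that acts as both coordinator and storage on its own jthread over
// LocalTransport, fetches a fresh ShardMap from it via an HlcRequest issued through a
// CoordinatorClient, and then creates an RSM client for the shard owning a sample key.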
+
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "io/rsm/rsm_client.hpp"
+#include "io/rsm/shard_rsm.hpp"
+#include "storage/v3/id_types.hpp"
+#include "storage/v3/schemas.hpp"
+
+namespace memgraph::io::tests {
+
+using memgraph::coordinator::Coordinator;
+using memgraph::coordinator::CoordinatorClient;
+using memgraph::coordinator::CoordinatorReadRequests;
+using memgraph::coordinator::CoordinatorReadResponses;
+using memgraph::coordinator::CoordinatorWriteRequests;
+using memgraph::coordinator::CoordinatorWriteResponses;
+using memgraph::coordinator::Hlc;
+using memgraph::coordinator::HlcResponse;
+using memgraph::coordinator::Shard;
+using memgraph::coordinator::ShardMap;
+using memgraph::io::Io;
+using memgraph::io::local_transport::LocalSystem;
+using memgraph::io::local_transport::LocalTransport;
+using memgraph::io::rsm::RsmClient;
+using memgraph::io::rsm::StorageReadRequest;
+using memgraph::io::rsm::StorageReadResponse;
+using memgraph::io::rsm::StorageWriteRequest;
+using memgraph::io::rsm::StorageWriteResponse;
+using memgraph::machine_manager::MachineConfig;
+using memgraph::machine_manager::MachineManager;
+using memgraph::storage::v3::LabelId;
+using memgraph::storage::v3::SchemaProperty;
+
+using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
+using ShardClient =
+    RsmClient;
+
+ShardMap TestShardMap() {
+  ShardMap sm{};
+
+  const std::string label_name = std::string("test_label");
+
+  // register new properties
+  const std::vector<std::string> property_names = {"property_1", "property_2"};
+  const auto properties = sm.AllocatePropertyIds(property_names);
+  const auto property_id_1 = properties.at("property_1");
+  const auto property_id_2 = properties.at("property_2");
+  const auto type_1 = memgraph::common::SchemaType::INT;
+  const auto type_2 = memgraph::common::SchemaType::INT;
+
+  // register new label space
+  std::vector<SchemaProperty> schema = {
+      SchemaProperty{.property_id = property_id_1, .type = type_1},
+      SchemaProperty{.property_id = property_id_2, .type = type_2},
+  };
+
+  const size_t replication_factor = 1;
+
+  std::optional<LabelId> label_id = sm.InitializeNewLabel(label_name, schema, replication_factor, sm.shard_map_version);
+  MG_ASSERT(label_id);
+
+  // split the shard at N split points
+  // NB: this is the logic that should be provided by the "split file"
+  // TODO(tyler) split points should account for signedness
+  const size_t n_splits = 16;
+  const auto split_interval = std::numeric_limits<int64_t>::max() / n_splits;
+
+  for (int64_t i = 0; i < n_splits; ++i) {
+    const int64_t value = i * split_interval;
+
+    const auto key1 = memgraph::storage::v3::PropertyValue(value);
+    const auto key2 = memgraph::storage::v3::PropertyValue(0);
+
+    const CompoundKey split_point = {key1, key2};
+
+    const bool split_success = sm.SplitShard(sm.shard_map_version, label_id.value(), split_point);
+
+    MG_ASSERT(split_success);
+  }
+
+  return sm;
+}
+
+MachineManager MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr,
+                    ShardMap shard_map) {
+  MachineConfig config{
+      .coordinator_addresses = coordinator_addresses,
+      .is_storage = true,
+      .is_coordinator = true,
+      .listen_ip = addr.last_known_ip,
+      .listen_port = addr.last_known_port,
+  };
+
+  Io io = local_system.Register(addr);
+
+  Coordinator coordinator{shard_map};
+
+  return MachineManager{io, config, coordinator};
+}
+
+void RunMachine(MachineManager mm) { mm.Run(); }
+
+void WaitForShardsToInitialize(CoordinatorClient &cc) {
+  // TODO(tyler) call coordinator client's read method for GetShardMap
+  // and keep reading it until the shard map contains proper replicas
+  // for each shard in the label space.
+}
+
+TEST(MachineManager, BasicFunctionality) {
+  LocalSystem local_system;
+
+  auto cli_addr = Address::TestAddress(1);
+  auto machine_1_addr = cli_addr.ForkUniqueAddress();
+
+  Io cli_io = local_system.Register(cli_addr);
+
+  auto coordinator_addresses = std::vector<Address>{
+      machine_1_addr,
+  };
+
+  ShardMap initialization_sm = TestShardMap();
+
+  auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm);
+  Address coordinator_address = mm_1.CoordinatorAddress();
+
+  auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
+
+  // TODO(tyler) clarify addresses of coordinator etc... as it's a mess
+  CoordinatorClient cc{cli_io, coordinator_address, {coordinator_address}};
+
+  WaitForShardsToInitialize(cc);
+
+  using namespace std::chrono_literals;
+  std::this_thread::sleep_for(2010ms);
+
+  // get ShardMap from coordinator
+  memgraph::coordinator::HlcRequest req;
+  req.last_shard_map_version = Hlc{
+      .logical_id = 0,
+  };
+
+  BasicResult read_res = cc.SendWriteRequest(req);
+  MG_ASSERT(!read_res.HasError(), "HLC request unexpectedly timed out");
+
+  auto coordinator_read_response = read_res.GetValue();
+  HlcResponse hlc_response = std::get<HlcResponse>(coordinator_read_response);
+  ShardMap sm = hlc_response.fresher_shard_map.value();
+
+  // Get shard for key and create rsm client
+  const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
+  const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
+
+  const CompoundKey compound_key = {cm_key_1, cm_key_2};
+
+  std::string label_name = "test_label";
+
+  Shard shard_for_key = sm.GetShardForKey(label_name, compound_key);
+
+  auto shard_for_client = std::vector<Address>{};
+
+  for (const auto &aas : shard_for_key) {
+    spdlog::info("got address for shard: {}", aas.address.ToString());
+    shard_for_client.push_back(aas.address);
+  }
+
+  ShardClient shard_client{cli_io, shard_for_client[0], shard_for_client};
+
+  // submit a read request and assert that the requested key does not yet exist
+
+  LabelId label_id = sm.labels.at(label_name);
+  StorageReadRequest storage_get_req;
+  storage_get_req.label_id = label_id;
+  storage_get_req.key = compound_key;
+  storage_get_req.transaction_id = hlc_response.new_hlc;
+
+  auto get_response_result = shard_client.SendReadRequest(storage_get_req);
+  auto get_response = get_response_result.GetValue();
+  auto val = get_response.value;
+
+  MG_ASSERT(!val.has_value());
+
+  local_system.ShutDown();
+};
+
+}  // namespace memgraph::io::tests
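
Note on the shard-splitting loop in TestShardMap above: the 16 split points are spaced evenly across the non-negative int64_t range only, which is what the TODO about signedness refers to. Below is a standalone sketch (plain C++, not part of the patch; the 16-way split and the int64_t key type are taken from the test) of the values that loop produces.

// Illustrates the split points TestShardMap generates: 0, max/16, 2*max/16, ...
// No split point is negative, so any key whose first element is negative falls
// below the lowest split point until signedness is accounted for.
#include <cstdint>
#include <iostream>
#include <limits>

int main() {
  const int64_t n_splits = 16;
  const int64_t split_interval = std::numeric_limits<int64_t>::max() / n_splits;

  for (int64_t i = 0; i < n_splits; ++i) {
    // i * split_interval stays below int64_t max for i < 16, so no overflow here.
    std::cout << "split point " << i << ": " << i * split_interval << '\n';
  }
  return 0;
}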