From d0cad6e6baae84e03943d9ec13109de4dfbedaee Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 31 Oct 2022 17:25:08 +0000 Subject: [PATCH] Temporarily duplicate shard management logic from ShardManger in ShardWorker --- src/storage/v3/shard_manager.hpp | 6 ++-- src/storage/v3/shard_worker.hpp | 62 +++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 12 deletions(-) diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index 8f53c7a90..0b13b1f09 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -78,9 +78,8 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval, template class ShardManager { public: - ShardManager(io::Io io, size_t n_shard_worker_threads, Address coordinator_leader, - coordinator::ShardMap shard_map) - : io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)} { + ShardManager(io::Io io, size_t n_shard_worker_threads, Address coordinator_leader) + : io_(io), coordinator_leader_(coordinator_leader) { MG_ASSERT(n_shard_worker_threads >= 1); shard_worker::Queue queue; @@ -234,7 +233,6 @@ class ShardManager { } } - /// Returns true if the RSM was able to be initialized, and false if it was already initialized void InitializeRsm(coordinator::ShardToInitialize to_init) { if (rsm_map_.contains(to_init.uuid)) { // it's not a bug for the coordinator to send us UUIDs that we have diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp index 3ec1d9bb9..0cbae8c6c 100644 --- a/src/storage/v3/shard_worker.hpp +++ b/src/storage/v3/shard_worker.hpp @@ -22,6 +22,7 @@ #include "coordinator/shard_map.hpp" #include "io/address.hpp" #include "io/future.hpp" +#include "io/messages.hpp" #include "io/rsm/raft.hpp" #include "io/time.hpp" #include "io/transport.hpp" @@ -33,12 +34,16 @@ namespace memgraph::storage::v3::shard_worker { /// Obligations: /// * ShutDown /// * Cron -/// * Handle -/// * InitializeRsm +/// * RouteMessage +/// * ShardToInitialize using boost::uuids::uuid; +using coordinator::ShardToInitialize; +using io::Address; +using io::RequestId; using io::Time; +using io::messages::ShardMessages; using io::rsm::Raft; using msgs::ReadRequests; using msgs::ReadResponses; @@ -57,11 +62,14 @@ struct Cron { io::Promise request_next_cron_at; }; -struct InitializeRsm {}; +struct RouteMessage { + ShardMessages message; + RequestId request_id; + Address to; + Address from; +}; -struct Handle {}; - -using Message = std::variant; +using Message = std::variant; struct QueueInner { std::mutex mu{}; @@ -129,9 +137,19 @@ class ShardWorker { return true; } - bool Process(InitializeRsm &&initialize_rsm) { return true; } + bool Process(ShardToInitialize &&shard_to_initialize) { + InitializeRsm(std::forward(shard_to_initialize)); - bool Process(Handle &&handle) { return true; } + return true; + } + + bool Process(RouteMessage &&route_message) { + auto &rsm = rsm_map_.at(route_message.to.unique_id); + + rsm.Handle(std::move(route_message.message), route_message.request_id, route_message.from); + + return true; + } Time Cron() { spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString()); @@ -154,6 +172,34 @@ class ShardWorker { return next_cron_; } + void InitializeRsm(ShardToInitialize to_init) { + if (rsm_map_.contains(to_init.uuid)) { + // it's not a bug for the coordinator to send us UUIDs that we have + // already created, because there may have been lag that caused + // the coordinator not to hear back from us. + return; + } + + auto rsm_io = io_.ForkLocal(); + auto io_addr = rsm_io.GetAddress(); + io_addr.unique_id = to_init.uuid; + rsm_io.SetAddress(io_addr); + + // TODO(tyler) get peers from Coordinator in HeartbeatResponse + std::vector
rsm_peers = {}; + + std::unique_ptr shard = std::make_unique(to_init.label_id, to_init.min_key, to_init.max_key, + to_init.schema, to_init.config, to_init.id_to_names); + + ShardRsm rsm_state{std::move(shard)}; + + ShardRaft rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)}; + + spdlog::info("SM created a new shard with UUID {}", to_init.uuid); + + rsm_map_.emplace(to_init.uuid, std::move(rsm)); + } + public: ShardWorker(io::Io io, Queue queue) : io_(io), queue_(queue) {} ShardWorker(ShardWorker &&) = default;