Temporarily duplicate shard management logic from ShardManger in ShardWorker

This commit is contained in:
Tyler Neely 2022-10-31 17:25:08 +00:00
parent 6138277972
commit d0cad6e6ba
2 changed files with 56 additions and 12 deletions

View File

@ -78,9 +78,8 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
template <typename IoImpl> template <typename IoImpl>
class ShardManager { class ShardManager {
public: public:
ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader, ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader)
coordinator::ShardMap shard_map) : io_(io), coordinator_leader_(coordinator_leader) {
: io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)} {
MG_ASSERT(n_shard_worker_threads >= 1); MG_ASSERT(n_shard_worker_threads >= 1);
shard_worker::Queue queue; shard_worker::Queue queue;
@ -234,7 +233,6 @@ class ShardManager {
} }
} }
/// Returns true if the RSM was able to be initialized, and false if it was already initialized
void InitializeRsm(coordinator::ShardToInitialize to_init) { void InitializeRsm(coordinator::ShardToInitialize to_init) {
if (rsm_map_.contains(to_init.uuid)) { if (rsm_map_.contains(to_init.uuid)) {
// it's not a bug for the coordinator to send us UUIDs that we have // it's not a bug for the coordinator to send us UUIDs that we have

View File

@ -22,6 +22,7 @@
#include "coordinator/shard_map.hpp" #include "coordinator/shard_map.hpp"
#include "io/address.hpp" #include "io/address.hpp"
#include "io/future.hpp" #include "io/future.hpp"
#include "io/messages.hpp"
#include "io/rsm/raft.hpp" #include "io/rsm/raft.hpp"
#include "io/time.hpp" #include "io/time.hpp"
#include "io/transport.hpp" #include "io/transport.hpp"
@ -33,12 +34,16 @@ namespace memgraph::storage::v3::shard_worker {
/// Obligations: /// Obligations:
/// * ShutDown /// * ShutDown
/// * Cron /// * Cron
/// * Handle /// * RouteMessage
/// * InitializeRsm /// * ShardToInitialize
using boost::uuids::uuid; using boost::uuids::uuid;
using coordinator::ShardToInitialize;
using io::Address;
using io::RequestId;
using io::Time; using io::Time;
using io::messages::ShardMessages;
using io::rsm::Raft; using io::rsm::Raft;
using msgs::ReadRequests; using msgs::ReadRequests;
using msgs::ReadResponses; using msgs::ReadResponses;
@ -57,11 +62,14 @@ struct Cron {
io::Promise<io::Time> request_next_cron_at; io::Promise<io::Time> request_next_cron_at;
}; };
struct InitializeRsm {}; struct RouteMessage {
ShardMessages message;
RequestId request_id;
Address to;
Address from;
};
struct Handle {}; using Message = std::variant<ShutDown, Cron, ShardToInitialize, RouteMessage>;
using Message = std::variant<ShutDown, Cron, InitializeRsm, Handle>;
struct QueueInner { struct QueueInner {
std::mutex mu{}; std::mutex mu{};
@ -129,9 +137,19 @@ class ShardWorker {
return true; return true;
} }
bool Process(InitializeRsm &&initialize_rsm) { return true; } bool Process(ShardToInitialize &&shard_to_initialize) {
InitializeRsm(std::forward<ShardToInitialize>(shard_to_initialize));
bool Process(Handle &&handle) { return true; } return true;
}
bool Process(RouteMessage &&route_message) {
auto &rsm = rsm_map_.at(route_message.to.unique_id);
rsm.Handle(std::move(route_message.message), route_message.request_id, route_message.from);
return true;
}
Time Cron() { Time Cron() {
spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString()); spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
@ -154,6 +172,34 @@ class ShardWorker {
return next_cron_; return next_cron_;
} }
void InitializeRsm(ShardToInitialize to_init) {
if (rsm_map_.contains(to_init.uuid)) {
// it's not a bug for the coordinator to send us UUIDs that we have
// already created, because there may have been lag that caused
// the coordinator not to hear back from us.
return;
}
auto rsm_io = io_.ForkLocal();
auto io_addr = rsm_io.GetAddress();
io_addr.unique_id = to_init.uuid;
rsm_io.SetAddress(io_addr);
// TODO(tyler) get peers from Coordinator in HeartbeatResponse
std::vector<Address> rsm_peers = {};
std::unique_ptr<Shard> shard = std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key,
to_init.schema, to_init.config, to_init.id_to_names);
ShardRsm rsm_state{std::move(shard)};
ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};
spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
rsm_map_.emplace(to_init.uuid, std::move(rsm));
}
public: public:
ShardWorker(io::Io<IoImpl> io, Queue queue) : io_(io), queue_(queue) {} ShardWorker(io::Io<IoImpl> io, Queue queue) : io_(io), queue_(queue) {}
ShardWorker(ShardWorker &&) = default; ShardWorker(ShardWorker &&) = default;