Machine manager and shard stitch (#569)

This commit is contained in:
Tyler Neely 2022-09-22 13:55:16 +02:00 committed by GitHub
parent b8186bea2e
commit ce788f5f65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 41 additions and 35 deletions

View File

@ -26,6 +26,7 @@
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/temporal.hpp"
namespace memgraph::coordinator {

View File

@ -15,7 +15,7 @@
#include <coordinator/coordinator.hpp>
#include <io/rsm/raft.hpp>
#include <io/rsm/shard_rsm.hpp>
#include "query/v2/requests.hpp"
#include "utils/concepts.hpp"
namespace memgraph::io::messages {
@ -23,10 +23,8 @@ namespace memgraph::io::messages {
using memgraph::coordinator::CoordinatorReadRequests;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
// TODO(everbody) change these to the real shard messages
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageWriteRequest;
using StorageReadRequest = msgs::ReadRequests;
using StorageWriteRequest = msgs::WriteRequests;
using memgraph::io::rsm::AppendRequest;
using memgraph::io::rsm::AppendResponse;

View File

@ -38,7 +38,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
server_addresses_.insert(address);
while (true) {
const size_t blocked_servers = blocked_on_receive_;
const size_t blocked_servers = blocked_on_receive_.size();
const bool all_servers_blocked = blocked_servers == server_addresses_.size();
@ -53,7 +53,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
bool SimulatorHandle::MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
const size_t blocked_servers = blocked_on_receive_;
const size_t blocked_servers = blocked_on_receive_.size();
if (blocked_servers < server_addresses_.size()) {
// we only need to advance the simulator when all

View File

@ -48,7 +48,7 @@ class SimulatorHandle {
Time cluster_wide_time_microseconds_;
bool should_shut_down_ = false;
SimulatorStats stats_;
size_t blocked_on_receive_ = 0;
std::set<Address> blocked_on_receive_;
std::set<Address> server_addresses_;
std::mt19937 rng_;
SimulatorConfig config_;
@ -115,7 +115,7 @@ class SimulatorHandle {
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
std::unique_lock<std::mutex> lock(mu_);
blocked_on_receive_ += 1;
blocked_on_receive_.emplace(receiver);
const Time deadline = cluster_wide_time_microseconds_ + timeout;
@ -130,7 +130,7 @@ class SimulatorHandle {
// than asserting that the last item in can_rx matches.
auto m_opt = std::move(message).Take<Ms...>();
blocked_on_receive_ -= 1;
blocked_on_receive_.erase(receiver);
return std::move(m_opt).value();
}
@ -144,7 +144,7 @@ class SimulatorHandle {
}
}
blocked_on_receive_ -= 1;
blocked_on_receive_.erase(receiver);
return TimedOut{};
}

View File

@ -11,8 +11,6 @@
#pragma once
#include <boost/uuid/uuid.hpp>
#include <coordinator/coordinator_rsm.hpp>
#include <io/message_conversion.hpp>
#include <io/messages.hpp>
@ -36,6 +34,8 @@ using memgraph::io::Time;
using memgraph::io::messages::CoordinatorMessages;
using memgraph::io::messages::ShardManagerMessages;
using memgraph::io::messages::ShardMessages;
using memgraph::io::messages::StorageReadRequest;
using memgraph::io::messages::StorageWriteRequest;
using memgraph::io::rsm::AppendRequest;
using memgraph::io::rsm::AppendResponse;
using memgraph::io::rsm::ReadRequest;
@ -45,9 +45,6 @@ using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::storage::v3::ShardManager;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageWriteRequest;
/// The MachineManager is responsible for:
/// * starting the entire system and ensuring that high-level
/// operational requirements continue to be met

View File

@ -21,9 +21,12 @@
#include <io/message_conversion.hpp>
#include <io/messages.hpp>
#include <io/rsm/raft.hpp>
#include <io/rsm/shard_rsm.hpp>
#include <io/time.hpp>
#include <io/transport.hpp>
#include <query/v2/requests.hpp>
#include <storage/v3/shard.hpp>
#include <storage/v3/shard_rsm.hpp>
#include "storage/v3/config.hpp"
namespace memgraph::storage::v3 {
@ -43,20 +46,19 @@ using memgraph::io::messages::CoordinatorMessages;
using memgraph::io::messages::ShardManagerMessages;
using memgraph::io::messages::ShardMessages;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ShardRsm;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::msgs::ReadRequests;
using memgraph::msgs::ReadResponses;
using memgraph::msgs::WriteRequests;
using memgraph::msgs::WriteResponses;
using memgraph::storage::v3::ShardRsm;
using ShardManagerOrRsmMessage = std::variant<ShardMessages, ShardManagerMessages>;
using TimeUuidPair = std::pair<Time, uuid>;
template <typename IoImpl>
using ShardRaft =
Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
using namespace std::chrono_literals;
static constexpr Duration kMinimumCronInterval = 1000ms;
@ -208,8 +210,10 @@ class ShardManager {
// TODO(tyler) get geers from Coordinator in HeartbeatResponse
std::vector<Address> rsm_peers = {};
// TODO(everbody) change this to storage::Shard
ShardRsm rsm_state{};
std::unique_ptr<Shard> shard =
std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key, to_init.config);
ShardRsm rsm_state{std::move(shard)};
ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};

View File

@ -25,8 +25,8 @@
#include <io/transport.hpp>
#include <machine_manager/machine_config.hpp>
#include <machine_manager/machine_manager.hpp>
#include <query/v2/requests.hpp>
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp"
@ -45,19 +45,19 @@ using memgraph::coordinator::ShardMap;
using memgraph::io::Io;
using memgraph::io::local_transport::LocalSystem;
using memgraph::io::local_transport::LocalTransport;
using memgraph::io::rsm::RsmClient;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::machine_manager::MachineConfig;
using memgraph::machine_manager::MachineManager;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::SchemaProperty;
using memgraph::io::rsm::RsmClient;
using memgraph::msgs::ReadRequests;
using memgraph::msgs::ReadResponses;
using memgraph::msgs::WriteRequests;
using memgraph::msgs::WriteResponses;
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
using ShardClient =
RsmClient<LocalTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
using ShardClient = RsmClient<LocalTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
ShardMap TestShardMap() {
ShardMap sm{};
@ -192,16 +192,22 @@ TEST(MachineManager, BasicFunctionality) {
// submit a read request and assert that the requested key does not yet exist
LabelId label_id = sm.labels.at(label_name);
StorageReadRequest storage_get_req;
ReadRequests storage_get_req;
/*
TODO(tyler,kostas) set this to a real request
storage_get_req.label_id = label_id;
storage_get_req.key = compound_key;
storage_get_req.transaction_id = hlc_response.new_hlc;
*/
auto get_response_result = shard_client.SendReadRequest(storage_get_req);
auto get_response = get_response_result.GetValue();
/*
auto val = get_response.value;
MG_ASSERT(!val.has_value());
*/
local_system.ShutDown();
};