Significantly improve the determinism of the coordinator, UUID generation, the machine manager, the shard manager, and the cluster property test
This commit is contained in:
parent
3ad8489735
commit
ce45a548c7
src
coordinator
io
machine_manager
query/v2
storage/v3
tests/simulation
@ -71,6 +71,9 @@ struct QueueInner {
|
||||
// starvation by sometimes randomizing priorities, rather than following a strict
|
||||
// prioritization.
|
||||
std::deque<Message> queue;
|
||||
|
||||
uint64_t submitted = 0;
|
||||
uint64_t calls_to_pop = 0;
|
||||
};
|
||||
|
||||
/// There are two reasons to implement our own Queue instead of using
|
||||
@ -86,6 +89,8 @@ class Queue {
|
||||
MG_ASSERT(inner_.use_count() > 0);
|
||||
std::unique_lock<std::mutex> lock(inner_->mu);
|
||||
|
||||
inner_->submitted++;
|
||||
|
||||
inner_->queue.emplace_back(std::move(message));
|
||||
} // lock dropped before notifying condition variable
|
||||
|
||||
@ -96,6 +101,9 @@ class Queue {
|
||||
MG_ASSERT(inner_.use_count() > 0);
|
||||
std::unique_lock<std::mutex> lock(inner_->mu);
|
||||
|
||||
inner_->calls_to_pop++;
|
||||
inner_->cv.notify_all();
|
||||
|
||||
while (inner_->queue.empty()) {
|
||||
inner_->cv.wait(lock);
|
||||
}
|
||||
@ -105,6 +113,15 @@ class Queue {
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
void BlockOnQuiescence() const {
|
||||
MG_ASSERT(inner_.use_count() > 0);
|
||||
std::unique_lock<std::mutex> lock(inner_->mu);
|
||||
|
||||
while (inner_->calls_to_pop <= inner_->submitted) {
|
||||
inner_->cv.wait(lock);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// A CoordinatorWorker owns Raft<CoordinatorRsm> instances and receives messages from the MachineManager.
|
||||
@ -129,9 +146,7 @@ class CoordinatorWorker {
|
||||
|
||||
public:
|
||||
/// Builds a worker from its transport handle, its message queue and the
/// coordinator state machine it will drive.
///
/// Only the current initializer list is kept; the earlier variant that called
/// the no-argument `io_.ForkLocal()` has been superseded.
CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
    // NOTE(review): `io_` is moved into `coordinator_` right after it is initialised, so `io_` is left in a
    // moved-from state. This presumes `io_` is declared before `coordinator_` and is not used afterwards — confirm.
    : io_(std::move(io)), queue_(std::move(queue)), coordinator_{std::move(io_), {}, std::move(coordinator)} {}
|
||||
|
||||
CoordinatorWorker(CoordinatorWorker &&) noexcept = default;
|
||||
CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default;
|
||||
@ -140,15 +155,12 @@ class CoordinatorWorker {
|
||||
~CoordinatorWorker() = default;
|
||||
|
||||
void Run() {
|
||||
while (true) {
|
||||
bool should_continue = true;
|
||||
while (should_continue) {
|
||||
Message message = queue_.Pop();
|
||||
|
||||
const bool should_continue = std::visit(
|
||||
[this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
|
||||
|
||||
if (!should_continue) {
|
||||
return;
|
||||
}
|
||||
should_continue = std::visit([this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); },
|
||||
std::move(message));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -228,7 +228,7 @@ Hlc ShardMap::IncrementShardMapVersion() noexcept {
|
||||
return shard_map_version;
|
||||
}
|
||||
|
||||
// TODO(antaljanosbenjamin) use a single map for all name id
|
||||
// TODO(antaljanosbenjamin) use a single map for all name id
|
||||
// mapping and a single counter to maintain the next id
|
||||
std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() {
|
||||
std::unordered_map<uint64_t, std::string> id_to_names;
|
||||
@ -248,6 +248,25 @@ std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() {
|
||||
|
||||
/// Returns a copy of the current `shard_map_version`.
Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; }
|
||||
|
||||
/// Derives a UUID from `shard_id` without any randomness: the first eight
/// bytes are zero and the last eight bytes carry `shard_id`, most significant
/// byte first.
boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
  // Extracts the byte of `shard_id` that starts `shift` bits above the lowest bit.
  const auto byte_at = [shard_id](unsigned shift) { return static_cast<unsigned char>(shard_id >> shift); };

  return boost::uuids::uuid{0,           0,           0,           0,           0,           0,
                            0,           0,           byte_at(56), byte_at(48), byte_at(40), byte_at(32),
                            byte_at(24), byte_at(16), byte_at(8),  byte_at(0)};
}
|
||||
|
||||
std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
|
||||
std::set<boost::uuids::uuid> initialized) {
|
||||
std::vector<ShardToInitialize> ret{};
|
||||
@ -268,6 +287,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
|
||||
if (initialized.contains(aas.address.unique_id)) {
|
||||
machine_contains_shard = true;
|
||||
if (aas.status != Status::CONSENSUS_PARTICIPANT) {
|
||||
mutated = true;
|
||||
spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id);
|
||||
aas.status = Status::CONSENSUS_PARTICIPANT;
|
||||
}
|
||||
@ -292,10 +312,13 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
|
||||
}
|
||||
|
||||
if (!machine_contains_shard && shard.size() < label_space.replication_factor) {
|
||||
// increment version for each new uuid for deterministic creation
|
||||
IncrementShardMapVersion();
|
||||
|
||||
Address address = storage_manager;
|
||||
|
||||
// TODO(tyler) use deterministic UUID so that coordinators don't diverge here
|
||||
address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
|
||||
address.unique_id = NewShardUuid(shard_map_version.logical_id);
|
||||
|
||||
spdlog::info("assigning shard manager to shard");
|
||||
|
||||
@ -383,6 +406,7 @@ std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std:
|
||||
/// Placeholder: the body currently contains no statements, so calling this
/// has no effect and `server_address` is unused.
void ShardMap::AddServer(Address server_address) {
|
||||
// Find a random place for the server to plug in
// (not implemented in the visible code)
|
||||
}
|
||||
|
||||
std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
|
||||
if (const auto it = labels.find(label); it != labels.end()) {
|
||||
return it->second;
|
||||
|
@ -68,12 +68,31 @@ struct Address {
|
||||
};
|
||||
}
|
||||
|
||||
// NB: don't use this in test code because it is non-deterministic
|
||||
static Address UniqueLocalAddress() {
  // Only `unique_id` is set explicitly; it receives a freshly generated random UUID.
  boost::uuids::random_generator generate_uuid;
  return Address{
      .unique_id = boost::uuids::uuid{generate_uuid()},
  };
}
|
||||
|
||||
/// `Coordinator`s have constant UUIDs because there is at most one per ip/port pair.
|
||||
Address ForkLocalCoordinator() {
  // Fixed UUID: first byte 1, all remaining bytes 0.
  const boost::uuids::uuid coordinator_uuid{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

  // IP and port are carried over unchanged from this address.
  return Address{
      .unique_id = coordinator_uuid,
      .last_known_ip = last_known_ip,
      .last_known_port = last_known_port,
  };
}
|
||||
|
||||
/// `ShardManager`s have constant UUIDs because there is at most one per ip/port pair.
|
||||
Address ForkLocalShardManager() {
  // Fixed UUID: first byte 2, all remaining bytes 0.
  const boost::uuids::uuid shard_manager_uuid{2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

  // IP and port are carried over unchanged from this address.
  return Address{
      .unique_id = shard_manager_uuid,
      .last_known_ip = last_known_ip,
      .last_known_port = last_known_port,
  };
}
|
||||
|
||||
/// Returns a new ID with the same IP and port but a unique UUID.
|
||||
Address ForkUniqueAddress() {
|
||||
return Address{
|
||||
|
@ -137,7 +137,14 @@ class Io {
|
||||
/// Returns a copy of the address currently held by this Io.
Address GetAddress() { return address_; }
|
||||
/// Replaces the address held by this Io with `address`.
void SetAddress(Address address) { address_ = address; }
|
||||
|
||||
Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); }
|
||||
/// Creates another Io from this one's `implementation_`, addressed with the
/// caller-supplied `uuid` and this Io's last known IP and port.
Io<I> ForkLocal(boost::uuids::uuid uuid) {
  return Io(implementation_, Address{
                                 .unique_id = uuid,
                                 .last_known_ip = address_.last_known_ip,
                                 .last_known_port = address_.last_known_port,
                             });
}
|
||||
|
||||
/// Forwards to `implementation_.ResponseLatencies()` and returns its result.
LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); }
|
||||
};
|
||||
|
@ -42,6 +42,7 @@ struct MachineConfig {
|
||||
boost::asio::ip::address listen_ip;
|
||||
uint16_t listen_port;
|
||||
size_t shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
|
||||
bool sync_message_handling = false;
|
||||
};
|
||||
|
||||
} // namespace memgraph::machine_manager
|
||||
|
@ -78,10 +78,10 @@ class MachineManager {
|
||||
/// Sets up the machine: stores `io` and `config`, derives the coordinator and
/// shard-manager addresses from the machine address using their fixed UUIDs,
/// and starts the coordinator worker on its own thread.
///
/// Only the current initializers are kept; the earlier ones based on
/// `ForkUniqueAddress()` / no-argument `ForkLocal()` have been superseded.
MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
    : io_(io),
      config_(config),
      coordinator_address_(io.GetAddress().ForkLocalCoordinator()),
      // NOTE(review): reads `coordinator_address_`, so that member must be declared before `shard_manager_` — confirm.
      shard_manager_{io.ForkLocal(io.GetAddress().ForkLocalShardManager().unique_id), config.shard_worker_threads,
                     coordinator_address_} {
  // The coordinator gets its own Io addressed with the coordinator UUID.
  auto coordinator_io = io.ForkLocal(coordinator_address_.unique_id);
  CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator};

  // The worker is moved into the thread's closure and runs until its `Run` returns.
  coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
}
|
||||
@ -101,11 +101,23 @@ class MachineManager {
|
||||
/// Returns a copy of the coordinator address chosen in the constructor.
Address CoordinatorAddress() { return coordinator_address_; }
|
||||
|
||||
void Run() {
|
||||
while (!io_.ShouldShutDown()) {
|
||||
while (true) {
|
||||
MaybeBlockOnSyncHandling();
|
||||
|
||||
if (io_.ShouldShutDown()) {
|
||||
break;
|
||||
}
|
||||
|
||||
const auto now = io_.Now();
|
||||
|
||||
uint64_t now_us = now.time_since_epoch().count();
|
||||
uint64_t next_us = next_cron_.time_since_epoch().count();
|
||||
|
||||
if (now >= next_cron_) {
|
||||
spdlog::info("now {} >= next_cron_ {}", now_us, next_us);
|
||||
next_cron_ = Cron();
|
||||
} else {
|
||||
spdlog::info("now {} < next_cron_ {}", now_us, next_us);
|
||||
}
|
||||
|
||||
Duration receive_timeout = std::max(next_cron_, now) - now;
|
||||
@ -194,10 +206,27 @@ class MachineManager {
|
||||
}
|
||||
|
||||
private:
|
||||
// This method exists for controlling concurrency
|
||||
// during deterministic simulation testing.
|
||||
void MaybeBlockOnSyncHandling() {
|
||||
if (!config_.sync_message_handling) {
|
||||
return;
|
||||
}
|
||||
|
||||
// block on coordinator
|
||||
coordinator_queue_.BlockOnQuiescence();
|
||||
|
||||
// block on shards
|
||||
shard_manager_.BlockOnQuiescence();
|
||||
}
|
||||
|
||||
/// Runs one cron round: pushes a `Cron` message to the coordinator queue,
/// runs the shard manager's cron, and returns the time the shard manager
/// reported.
///
/// The superseded early `return shard_manager_.Cron();` is dropped, because
/// it made the synchronisation calls below unreachable.
Time Cron() {
  spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());
  coordinator_queue_.Push(coordinator::coordinator_worker::Cron{});

  // No-op unless `sync_message_handling` is enabled.
  MaybeBlockOnSyncHandling();
  Time ret = shard_manager_.Cron();
  MaybeBlockOnSyncHandling();

  return ret;
}
|
||||
};
|
||||
|
||||
|
@ -800,7 +800,11 @@ InterpreterContext::InterpreterContext(storage::v3::Shard *db, const Interpreter
|
||||
|
||||
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
|
||||
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
|
||||
auto query_io = interpreter_context_->io.ForkLocal();
|
||||
|
||||
// TODO(tyler) make this deterministic so that it can be tested.
|
||||
auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
|
||||
auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
|
||||
|
||||
shard_request_manager_ = std::make_unique<msgs::ShardRequestManager<io::local_transport::LocalTransport>>(
|
||||
coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
|
||||
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
|
||||
|
@ -190,6 +190,12 @@ class ShardManager {
|
||||
});
|
||||
}
|
||||
|
||||
void BlockOnQuiescence() {
|
||||
for (const auto &worker : workers_) {
|
||||
worker.BlockOnQuiescence();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
io::Io<IoImpl> io_;
|
||||
std::vector<shard_worker::Queue> workers_;
|
||||
|
@ -80,6 +80,9 @@ struct QueueInner {
|
||||
// starvation by sometimes randomizing priorities, rather than following a strict
|
||||
// prioritization.
|
||||
std::deque<Message> queue;
|
||||
|
||||
uint64_t submitted = 0;
|
||||
uint64_t calls_to_pop = 0;
|
||||
};
|
||||
|
||||
/// There are two reasons to implement our own Queue instead of using
|
||||
@ -95,6 +98,8 @@ class Queue {
|
||||
MG_ASSERT(inner_.use_count() > 0);
|
||||
std::unique_lock<std::mutex> lock(inner_->mu);
|
||||
|
||||
inner_->submitted++;
|
||||
|
||||
inner_->queue.emplace_back(std::forward<Message>(message));
|
||||
} // lock dropped before notifying condition variable
|
||||
|
||||
@ -105,6 +110,9 @@ class Queue {
|
||||
MG_ASSERT(inner_.use_count() > 0);
|
||||
std::unique_lock<std::mutex> lock(inner_->mu);
|
||||
|
||||
inner_->calls_to_pop++;
|
||||
inner_->cv.notify_all();
|
||||
|
||||
while (inner_->queue.empty()) {
|
||||
inner_->cv.wait(lock);
|
||||
}
|
||||
@ -114,6 +122,15 @@ class Queue {
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
void BlockOnQuiescence() const {
|
||||
MG_ASSERT(inner_.use_count() > 0);
|
||||
std::unique_lock<std::mutex> lock(inner_->mu);
|
||||
|
||||
while (inner_->calls_to_pop <= inner_->submitted) {
|
||||
inner_->cv.wait(lock);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// A ShardWorker owns Raft<ShardRsm> instances and receives messages from the ShardManager.
|
||||
@ -122,7 +139,6 @@ class ShardWorker {
|
||||
io::Io<IoImpl> io_;
|
||||
Queue queue_;
|
||||
std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
|
||||
Time next_cron_ = Time::min();
|
||||
std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
|
||||
|
||||
/// Handles a `ShutDown` message. Always returns false; `Run` uses the value
/// returned by `Process` to decide whether to keep looping.
bool Process(ShutDown && /* shut_down */) { return false; }
|
||||
@ -175,10 +191,7 @@ class ShardWorker {
|
||||
return;
|
||||
}
|
||||
|
||||
auto rsm_io = io_.ForkLocal();
|
||||
auto io_addr = rsm_io.GetAddress();
|
||||
io_addr.unique_id = to_init.uuid;
|
||||
rsm_io.SetAddress(io_addr);
|
||||
auto rsm_io = io_.ForkLocal(to_init.uuid);
|
||||
|
||||
// TODO(tyler) get peers from Coordinator in HeartbeatResponse
|
||||
std::vector<Address> rsm_peers = {};
|
||||
@ -208,15 +221,12 @@ class ShardWorker {
|
||||
~ShardWorker() = default;
|
||||
|
||||
void Run() {
|
||||
while (true) {
|
||||
bool should_continue = true;
|
||||
while (should_continue) {
|
||||
Message message = queue_.Pop();
|
||||
|
||||
const bool should_continue =
|
||||
should_continue =
|
||||
std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
|
||||
|
||||
if (!should_continue) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -33,21 +33,31 @@ using io::Time;
|
||||
using io::simulator::SimulatorConfig;
|
||||
using storage::v3::kMaximumCronInterval;
|
||||
|
||||
// Property: running the same generated cluster configuration and operations
// twice, with the same simulator seed, must produce identical results.
RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) {
  spdlog::cfg::load_env_levels();

  SimulatorConfig sim_config{
      .drop_percent = 0,
      .perform_timeouts = false,
      .scramble_messages = true,
      // The seed is generated per test case rather than fixed.
      .rng_seed = rng_seed,
      .start_time = Time::min(),
      // TODO(tyler) set abort_time to something more restrictive than Time::max()
      .abort_time = Time::max(),
  };

  // Two runs with identical inputs; their outputs are compared below.
  auto [sim_stats_1, latency_stats_1] = RunClusterSimulation(sim_config, cluster_config, ops.ops);
  auto [sim_stats_2, latency_stats_2] = RunClusterSimulation(sim_config, cluster_config, ops.ops);

  // NOTE(review): the simulator-stats assertion is only reached when the latency summaries already differ, and
  // in that case the latency assertion fires first. Confirm whether `sim_stats` should be checked independently.
  if (latency_stats_1 != latency_stats_2) {
    spdlog::error("simulator stats diverged across runs");
    spdlog::error("run 1 simulator stats: {}", sim_stats_1);
    spdlog::error("run 2 simulator stats: {}", sim_stats_2);
    spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
    spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
    RC_ASSERT(latency_stats_1 == latency_stats_2);
    RC_ASSERT(sim_stats_1 == sim_stats_2);
  }
}
|
||||
|
||||
} // namespace memgraph::tests::simulation
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "coordinator/shard_map.hpp"
|
||||
#include "generated_operations.hpp"
|
||||
#include "io/address.hpp"
|
||||
#include "io/message_histogram_collector.hpp"
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
#include "io/simulator/simulator_transport.hpp"
|
||||
@ -57,6 +58,7 @@ using io::simulator::SimulatorStats;
|
||||
using io::simulator::SimulatorTransport;
|
||||
using machine_manager::MachineConfig;
|
||||
using machine_manager::MachineManager;
|
||||
using memgraph::io::LatencyHistogramSummaries;
|
||||
using msgs::ReadRequests;
|
||||
using msgs::ReadResponses;
|
||||
using msgs::WriteRequests;
|
||||
@ -75,6 +77,8 @@ MachineManager<SimulatorTransport> MkMm(Simulator &simulator, std::vector<Addres
|
||||
.is_coordinator = true,
|
||||
.listen_ip = addr.last_known_ip,
|
||||
.listen_port = addr.last_known_port,
|
||||
.shard_worker_threads = 4,
|
||||
.sync_message_handling = true,
|
||||
};
|
||||
|
||||
Io<SimulatorTransport> io = simulator.Register(addr);
|
||||
@ -210,17 +214,19 @@ struct DetachIfDropped {
|
||||
}
|
||||
};
|
||||
|
||||
void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config,
|
||||
const std::vector<Op> &ops) {
|
||||
std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const SimulatorConfig &sim_config,
|
||||
const ClusterConfig &cluster_config,
|
||||
const std::vector<Op> &ops) {
|
||||
spdlog::info("========================== NEW SIMULATION ==========================");
|
||||
|
||||
auto simulator = Simulator(sim_config);
|
||||
|
||||
auto cli_addr = Address::TestAddress(1);
|
||||
auto machine_1_addr = cli_addr.ForkUniqueAddress();
|
||||
auto machine_1_addr = Address::TestAddress(1);
|
||||
auto cli_addr = Address::TestAddress(2);
|
||||
auto cli_addr_2 = Address::TestAddress(3);
|
||||
|
||||
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
|
||||
Io<SimulatorTransport> cli_io_2 = simulator.Register(Address::TestAddress(2));
|
||||
Io<SimulatorTransport> cli_io_2 = simulator.Register(cli_addr_2);
|
||||
|
||||
auto coordinator_addresses = std::vector{
|
||||
machine_1_addr,
|
||||
@ -232,6 +238,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
|
||||
Address coordinator_address = mm_1.CoordinatorAddress();
|
||||
|
||||
auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
|
||||
simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr);
|
||||
|
||||
auto detach_on_error = DetachIfDropped{.handle = mm_thread_1};
|
||||
|
||||
@ -257,6 +264,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
|
||||
|
||||
simulator.ShutDown();
|
||||
|
||||
mm_thread_1.join();
|
||||
|
||||
SimulatorStats stats = simulator.Stats();
|
||||
|
||||
spdlog::info("total messages: {}", stats.total_messages);
|
||||
@ -268,10 +277,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
|
||||
|
||||
auto histo = cli_io_2.ResponseLatencies();
|
||||
|
||||
using memgraph::utils::print_helpers::operator<<;
|
||||
std::cout << "response latencies: " << histo << std::endl;
|
||||
|
||||
spdlog::info("========================== SUCCESS :) ==========================");
|
||||
return std::make_pair(stats, histo);
|
||||
}
|
||||
|
||||
} // namespace memgraph::tests::simulation
|
||||
|
Loading…
Reference in New Issue
Block a user