Merge branch 'project-pineapples' into T1159-MG-Add-memgraph-functions

This commit is contained in:
Kostas Kyrimis 2022-11-23 14:53:53 +02:00
commit 407418e8f5
34 changed files with 404 additions and 198 deletions

View File

@ -71,6 +71,9 @@ struct QueueInner {
// starvation by sometimes randomizing priorities, rather than following a strict // starvation by sometimes randomizing priorities, rather than following a strict
// prioritization. // prioritization.
std::deque<Message> queue; std::deque<Message> queue;
uint64_t submitted = 0;
uint64_t calls_to_pop = 0;
}; };
/// There are two reasons to implement our own Queue instead of using /// There are two reasons to implement our own Queue instead of using
@ -86,6 +89,8 @@ class Queue {
MG_ASSERT(inner_.use_count() > 0); MG_ASSERT(inner_.use_count() > 0);
std::unique_lock<std::mutex> lock(inner_->mu); std::unique_lock<std::mutex> lock(inner_->mu);
inner_->submitted++;
inner_->queue.emplace_back(std::move(message)); inner_->queue.emplace_back(std::move(message));
} // lock dropped before notifying condition variable } // lock dropped before notifying condition variable
@ -96,6 +101,9 @@ class Queue {
MG_ASSERT(inner_.use_count() > 0); MG_ASSERT(inner_.use_count() > 0);
std::unique_lock<std::mutex> lock(inner_->mu); std::unique_lock<std::mutex> lock(inner_->mu);
inner_->calls_to_pop++;
inner_->cv.notify_all();
while (inner_->queue.empty()) { while (inner_->queue.empty()) {
inner_->cv.wait(lock); inner_->cv.wait(lock);
} }
@ -105,6 +113,15 @@ class Queue {
return message; return message;
} }
void BlockOnQuiescence() const {
MG_ASSERT(inner_.use_count() > 0);
std::unique_lock<std::mutex> lock(inner_->mu);
while (inner_->calls_to_pop <= inner_->submitted) {
inner_->cv.wait(lock);
}
}
}; };
/// A CoordinatorWorker owns Raft<CoordinatorRsm> instances. receives messages from the MachineManager. /// A CoordinatorWorker owns Raft<CoordinatorRsm> instances. receives messages from the MachineManager.
@ -129,9 +146,7 @@ class CoordinatorWorker {
public: public:
CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator) CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
: io_(std::move(io)), : io_(std::move(io)), queue_(std::move(queue)), coordinator_{std::move(io_), {}, std::move(coordinator)} {}
queue_(std::move(queue)),
coordinator_{std::move(io_.ForkLocal()), {}, std::move(coordinator)} {}
CoordinatorWorker(CoordinatorWorker &&) noexcept = default; CoordinatorWorker(CoordinatorWorker &&) noexcept = default;
CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default; CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default;
@ -140,15 +155,12 @@ class CoordinatorWorker {
~CoordinatorWorker() = default; ~CoordinatorWorker() = default;
void Run() { void Run() {
while (true) { bool should_continue = true;
while (should_continue) {
Message message = queue_.Pop(); Message message = queue_.Pop();
const bool should_continue = std::visit( should_continue = std::visit([this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); },
[this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); }, std::move(message)); std::move(message));
if (!should_continue) {
return;
}
} }
} }
}; };

View File

@ -228,7 +228,7 @@ Hlc ShardMap::IncrementShardMapVersion() noexcept {
return shard_map_version; return shard_map_version;
} }
// TODO(antaljanosbenjamin) use a single map for all name id // TODO(antaljanosbenjamin) use a single map for all name id
// mapping and a single counter to maintain the next id // mapping and a single counter to maintain the next id
std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() { std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() {
std::unordered_map<uint64_t, std::string> id_to_names; std::unordered_map<uint64_t, std::string> id_to_names;
@ -248,6 +248,25 @@ std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() {
Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; } Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; }
boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
return boost::uuids::uuid{0,
0,
0,
0,
0,
0,
0,
0,
static_cast<unsigned char>(shard_id >> 56U),
static_cast<unsigned char>(shard_id >> 48U),
static_cast<unsigned char>(shard_id >> 40U),
static_cast<unsigned char>(shard_id >> 32U),
static_cast<unsigned char>(shard_id >> 24U),
static_cast<unsigned char>(shard_id >> 16U),
static_cast<unsigned char>(shard_id >> 8U),
static_cast<unsigned char>(shard_id)};
}
std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager, std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
std::set<boost::uuids::uuid> initialized) { std::set<boost::uuids::uuid> initialized) {
std::vector<ShardToInitialize> ret{}; std::vector<ShardToInitialize> ret{};
@ -268,6 +287,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
if (initialized.contains(aas.address.unique_id)) { if (initialized.contains(aas.address.unique_id)) {
machine_contains_shard = true; machine_contains_shard = true;
if (aas.status != Status::CONSENSUS_PARTICIPANT) { if (aas.status != Status::CONSENSUS_PARTICIPANT) {
mutated = true;
spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id);
aas.status = Status::CONSENSUS_PARTICIPANT; aas.status = Status::CONSENSUS_PARTICIPANT;
} }
@ -292,10 +312,13 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
} }
if (!machine_contains_shard && shard.size() < label_space.replication_factor) { if (!machine_contains_shard && shard.size() < label_space.replication_factor) {
// increment version for each new uuid for deterministic creation
IncrementShardMapVersion();
Address address = storage_manager; Address address = storage_manager;
// TODO(tyler) use deterministic UUID so that coordinators don't diverge here // TODO(tyler) use deterministic UUID so that coordinators don't diverge here
address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, address.unique_id = NewShardUuid(shard_map_version.logical_id);
spdlog::info("assigning shard manager to shard"); spdlog::info("assigning shard manager to shard");
@ -383,6 +406,7 @@ std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std:
void ShardMap::AddServer(Address server_address) { void ShardMap::AddServer(Address server_address) {
// Find a random place for the server to plug in // Find a random place for the server to plug in
} }
std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const { std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
if (const auto it = labels.find(label); it != labels.end()) { if (const auto it = labels.find(label); it != labels.end()) {
return it->second; return it->second;

View File

@ -20,6 +20,8 @@
#include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp> #include <boost/uuid/uuid_io.hpp>
#include "utils/logging.hpp"
namespace memgraph::io { namespace memgraph::io {
struct PartialAddress { struct PartialAddress {
@ -58,18 +60,39 @@ struct Address {
uint16_t last_known_port; uint16_t last_known_port;
static Address TestAddress(uint16_t port) { static Address TestAddress(uint16_t port) {
MG_ASSERT(port <= 255);
return Address{ return Address{
.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, .unique_id = boost::uuids::uuid{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, static_cast<unsigned char>(port)},
.last_known_port = port, .last_known_port = port,
}; };
} }
// NB: don't use this in test code because it is non-deterministic
static Address UniqueLocalAddress() { static Address UniqueLocalAddress() {
return Address{ return Address{
.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
}; };
} }
/// `Coordinator`s have constant UUIDs because there is at most one per ip/port pair.
Address ForkLocalCoordinator() {
return Address{
.unique_id = boost::uuids::uuid{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
.last_known_ip = last_known_ip,
.last_known_port = last_known_port,
};
}
/// `ShardManager`s have constant UUIDs because there is at most one per ip/port pair.
Address ForkLocalShardManager() {
return Address{
.unique_id = boost::uuids::uuid{2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
.last_known_ip = last_known_ip,
.last_known_port = last_known_port,
};
}
/// Returns a new ID with the same IP and port but a unique UUID. /// Returns a new ID with the same IP and port but a unique UUID.
Address ForkUniqueAddress() { Address ForkUniqueAddress() {
return Address{ return Address{

View File

@ -64,7 +64,6 @@ class Shared {
waiting_ = true; waiting_ = true;
while (!item_) { while (!item_) {
bool simulator_progressed = false;
if (simulator_notifier_) [[unlikely]] { if (simulator_notifier_) [[unlikely]] {
// We can't hold our own lock while notifying // We can't hold our own lock while notifying
// the simulator because notifying the simulator // the simulator because notifying the simulator
@ -77,7 +76,7 @@ class Shared {
// so we have to get out of its way to avoid // so we have to get out of its way to avoid
// a cyclical deadlock. // a cyclical deadlock.
lock.unlock(); lock.unlock();
simulator_progressed = std::invoke(simulator_notifier_); std::invoke(simulator_notifier_);
lock.lock(); lock.lock();
if (item_) { if (item_) {
// item may have been filled while we // item may have been filled while we
@ -85,8 +84,7 @@ class Shared {
// the simulator of our waiting_ status. // the simulator of our waiting_ status.
break; break;
} }
} } else {
if (!simulator_progressed) [[likely]] {
cv_.wait(lock); cv_.wait(lock);
} }
MG_ASSERT(!consumed_, "Future consumed twice!"); MG_ASSERT(!consumed_, "Future consumed twice!");

View File

@ -13,6 +13,7 @@
#include <chrono> #include <chrono>
#include <cmath> #include <cmath>
#include <compare>
#include <unordered_map> #include <unordered_map>
#include <boost/core/demangle.hpp> #include <boost/core/demangle.hpp>
@ -39,6 +40,8 @@ struct LatencyHistogramSummary {
Duration p100; Duration p100;
Duration sum; Duration sum;
friend bool operator==(const LatencyHistogramSummary &lhs, const LatencyHistogramSummary &rhs) = default;
friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) { friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) {
in << "{ \"count\": " << histo.count; in << "{ \"count\": " << histo.count;
in << ", \"p0\": " << histo.p0.count(); in << ", \"p0\": " << histo.p0.count();
@ -80,6 +83,8 @@ struct LatencyHistogramSummaries {
return output; return output;
} }
friend bool operator==(const LatencyHistogramSummaries &lhs, const LatencyHistogramSummaries &rhs) = default;
friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) { friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) {
using memgraph::utils::print_helpers::operator<<; using memgraph::utils::print_helpers::operator<<;
in << histo.latencies; in << histo.latencies;

View File

@ -19,7 +19,6 @@
#include <map> #include <map>
#include <set> #include <set>
#include <thread> #include <thread>
#include <unordered_map>
#include <vector> #include <vector>
#include <boost/core/demangle.hpp> #include <boost/core/demangle.hpp>
@ -182,7 +181,7 @@ struct PendingClientRequest {
struct Leader { struct Leader {
std::map<Address, FollowerTracker> followers; std::map<Address, FollowerTracker> followers;
std::unordered_map<LogIndex, PendingClientRequest> pending_client_requests; std::map<LogIndex, PendingClientRequest> pending_client_requests;
Time last_broadcast = Time::min(); Time last_broadcast = Time::min();
std::string static ToString() { return "\tLeader \t"; } std::string static ToString() { return "\tLeader \t"; }
@ -683,7 +682,7 @@ class Raft {
return Leader{ return Leader{
.followers = std::move(followers), .followers = std::move(followers),
.pending_client_requests = std::unordered_map<LogIndex, PendingClientRequest>(), .pending_client_requests = std::map<LogIndex, PendingClientRequest>(),
}; };
} }

View File

@ -23,6 +23,12 @@ namespace memgraph::io::simulator {
void SimulatorHandle::ShutDown() { void SimulatorHandle::ShutDown() {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
should_shut_down_ = true; should_shut_down_ = true;
for (auto it = promises_.begin(); it != promises_.end();) {
auto &[promise_key, dop] = *it;
std::move(dop).promise.TimeOut();
it = promises_.erase(it);
}
can_receive_.clear();
cv_.notify_all(); cv_.notify_all();
} }
@ -46,52 +52,84 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
const bool all_servers_blocked = blocked_servers == server_addresses_.size(); const bool all_servers_blocked = blocked_servers == server_addresses_.size();
if (all_servers_blocked) { if (all_servers_blocked) {
spdlog::trace("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers,
server_addresses_.size());
return; return;
} }
spdlog::trace("not returning from quiescent because we see {} blocked out of {}", blocked_servers,
server_addresses_.size());
cv_.wait(lock); cv_.wait(lock);
} }
} }
bool SortInFlight(const std::pair<Address, OpaqueMessage> &lhs, const std::pair<Address, OpaqueMessage> &rhs) {
// NB: never sort based on the request ID etc...
// This should only be used from std::stable_sort
// because by comparing on the from_address alone,
// we expect the sender ordering to remain
// deterministic.
const auto &[addr_1, om_1] = lhs;
const auto &[addr_2, om_2] = rhs;
return om_1.from_address < om_2.from_address;
}
bool SimulatorHandle::MaybeTickSimulator() { bool SimulatorHandle::MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
const size_t blocked_servers = blocked_on_receive_.size(); const size_t blocked_servers = blocked_on_receive_.size();
if (blocked_servers < server_addresses_.size()) { if (should_shut_down_ || blocked_servers < server_addresses_.size()) {
// we only need to advance the simulator when all // we only need to advance the simulator when all
// servers have reached a quiescent state, blocked // servers have reached a quiescent state, blocked
// on their own futures or receive methods. // on their own futures or receive methods.
return false; return false;
} }
// We allow the simulator to progress the state of the system only
// after all servers are blocked on receive.
spdlog::trace("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
stats_.simulator_ticks++; stats_.simulator_ticks++;
blocked_on_receive_.clear();
cv_.notify_all(); cv_.notify_all();
TimeoutPromisesPastDeadline(); bool timed_anything_out = TimeoutPromisesPastDeadline();
if (timed_anything_out) {
spdlog::trace("simulator progressing: timed out a request");
}
const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
// We don't always want to advance the clock with every message that we deliver because
// when we advance it for every message, it causes timeouts to occur for any "request path"
// over a certain length. Alternatively, we don't want to simply deliver multiple messages
// in a single simulator tick because that would reduce the amount of concurrent message
// mixing that may naturally occur in production. This approach is to mod the random clock
// advance by a prime number (hopefully avoiding most harmonic effects that would be introduced
// by only advancing the clock by an even amount etc...) and only advancing the clock close to
// half of the time.
if (clock_advance.count() % 97 > 49) {
spdlog::trace("simulator progressing: clock advanced by {}", clock_advance.count());
cluster_wide_time_microseconds_ += clock_advance;
stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time;
}
if (cluster_wide_time_microseconds_ >= config_.abort_time) {
spdlog::error(
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
"in an expected amount of time. The SimulatorConfig.rng_seed for this run is {}",
config_.rng_seed);
throw utils::BasicException{"Cluster has executed beyond its configured abort_time"};
}
if (in_flight_.empty()) { if (in_flight_.empty()) {
// return early here because there are no messages to schedule
// We tick the clock forward when all servers are blocked but
// there are no in-flight messages to schedule delivery of.
const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
cluster_wide_time_microseconds_ += clock_advance;
if (cluster_wide_time_microseconds_ >= config_.abort_time) {
if (should_shut_down_) {
return false;
}
spdlog::error(
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
"in an expected amount of time.");
throw utils::BasicException{"Cluster has executed beyond its configured abort_time"};
}
return true; return true;
} }
if (config_.scramble_messages) { std::stable_sort(in_flight_.begin(), in_flight_.end(), SortInFlight);
if (config_.scramble_messages && in_flight_.size() > 1) {
// scramble messages // scramble messages
std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1); std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1);
const size_t swap_index = swap_distrib(rng_); const size_t swap_index = swap_distrib(rng_);
@ -120,17 +158,22 @@ bool SimulatorHandle::MaybeTickSimulator() {
if (should_drop || normal_timeout) { if (should_drop || normal_timeout) {
stats_.timed_out_requests++; stats_.timed_out_requests++;
dop.promise.TimeOut(); dop.promise.TimeOut();
spdlog::trace("simulator timing out request ");
} else { } else {
stats_.total_responses++; stats_.total_responses++;
Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at; Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
auto type_info = opaque_message.type_info; auto type_info = opaque_message.type_info;
dop.promise.Fill(std::move(opaque_message), response_latency); dop.promise.Fill(std::move(opaque_message), response_latency);
histograms_.Measure(type_info, response_latency); histograms_.Measure(type_info, response_latency);
spdlog::trace("simulator replying to request");
} }
} else if (should_drop) { } else if (should_drop) {
// don't add it anywhere, let it drop // don't add it anywhere, let it drop
spdlog::trace("simulator silently dropping request");
} else { } else {
// add to can_receive_ if not // add to can_receive_ if not
spdlog::trace("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
opaque_message.to_address.last_known_port);
const auto &[om_vec, inserted] = const auto &[om_vec, inserted] =
can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>()); can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
om_vec->second.emplace_back(std::move(opaque_message)); om_vec->second.emplace_back(std::move(opaque_message));

View File

@ -52,26 +52,29 @@ class SimulatorHandle {
std::set<Address> blocked_on_receive_; std::set<Address> blocked_on_receive_;
std::set<Address> server_addresses_; std::set<Address> server_addresses_;
std::mt19937 rng_; std::mt19937 rng_;
std::uniform_int_distribution<int> time_distrib_{5, 50}; std::uniform_int_distribution<int> time_distrib_{0, 30000};
std::uniform_int_distribution<int> drop_distrib_{0, 99}; std::uniform_int_distribution<int> drop_distrib_{0, 99};
SimulatorConfig config_; SimulatorConfig config_;
MessageHistogramCollector histograms_; MessageHistogramCollector histograms_;
RequestId request_id_counter_{0}; RequestId request_id_counter_{0};
void TimeoutPromisesPastDeadline() { bool TimeoutPromisesPastDeadline() {
bool timed_anything_out = false;
const Time now = cluster_wide_time_microseconds_; const Time now = cluster_wide_time_microseconds_;
for (auto it = promises_.begin(); it != promises_.end();) { for (auto it = promises_.begin(); it != promises_.end();) {
auto &[promise_key, dop] = *it; auto &[promise_key, dop] = *it;
if (dop.deadline < now && config_.perform_timeouts) { if (dop.deadline < now && config_.perform_timeouts) {
spdlog::info("timing out request from requester {}.", promise_key.requester_address.ToString()); spdlog::trace("timing out request from requester {}.", promise_key.requester_address.ToString());
std::move(dop).promise.TimeOut(); std::move(dop).promise.TimeOut();
it = promises_.erase(it); it = promises_.erase(it);
stats_.timed_out_requests++; stats_.timed_out_requests++;
timed_anything_out = true;
} else { } else {
++it; ++it;
} }
} }
return timed_anything_out;
} }
public: public:
@ -103,6 +106,7 @@ class SimulatorHandle {
template <Message Request, Message Response> template <Message Request, Message Response>
ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
std::function<bool()> &&maybe_tick_simulator) { std::function<bool()> &&maybe_tick_simulator) {
spdlog::trace("submitting request to {}", to_address.last_known_port);
auto type_info = TypeInfoFor(request); auto type_info = TypeInfoFor(request);
auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>( auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
@ -146,8 +150,6 @@ class SimulatorHandle {
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) { requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
blocked_on_receive_.emplace(receiver);
const Time deadline = cluster_wide_time_microseconds_ + timeout; const Time deadline = cluster_wide_time_microseconds_ + timeout;
auto partial_address = receiver.ToPartialAddress(); auto partial_address = receiver.ToPartialAddress();
@ -164,38 +166,40 @@ class SimulatorHandle {
auto m_opt = std::move(message).Take<Ms...>(); auto m_opt = std::move(message).Take<Ms...>();
MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type"); MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type");
blocked_on_receive_.erase(receiver);
return std::move(m_opt).value(); return std::move(m_opt).value();
} }
} }
lock.unlock(); if (!should_shut_down_) {
bool made_progress = MaybeTickSimulator(); if (!blocked_on_receive_.contains(receiver)) {
lock.lock(); blocked_on_receive_.emplace(receiver);
if (!should_shut_down_ && !made_progress) { spdlog::trace("blocking receiver {}", receiver.ToPartialAddress().port);
cv_.notify_all();
}
cv_.wait(lock); cv_.wait(lock);
} }
} }
spdlog::trace("timing out receiver {}", receiver.ToPartialAddress().port);
blocked_on_receive_.erase(receiver);
return TimedOut{}; return TimedOut{};
} }
template <Message M> template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M message) { void Send(Address to_address, Address from_address, RequestId request_id, M message) {
spdlog::trace("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port);
auto type_info = TypeInfoFor(message); auto type_info = TypeInfoFor(message);
std::unique_lock<std::mutex> lock(mu_); {
std::any message_any(std::move(message)); std::unique_lock<std::mutex> lock(mu_);
OpaqueMessage om{.to_address = to_address, std::any message_any(std::move(message));
.from_address = from_address, OpaqueMessage om{.to_address = to_address,
.request_id = request_id, .from_address = from_address,
.message = std::move(message_any), .request_id = request_id,
.type_info = type_info}; .message = std::move(message_any),
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om))); .type_info = type_info};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages++; stats_.total_messages++;
} // lock dropped before cv notification
cv_.notify_all(); cv_.notify_all();
} }

View File

@ -13,6 +13,10 @@
#include <cstdint> #include <cstdint>
#include <fmt/format.h>
#include "io/time.hpp"
namespace memgraph::io::simulator { namespace memgraph::io::simulator {
struct SimulatorStats { struct SimulatorStats {
uint64_t total_messages = 0; uint64_t total_messages = 0;
@ -21,5 +25,22 @@ struct SimulatorStats {
uint64_t total_requests = 0; uint64_t total_requests = 0;
uint64_t total_responses = 0; uint64_t total_responses = 0;
uint64_t simulator_ticks = 0; uint64_t simulator_ticks = 0;
Duration elapsed_time;
friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default;
friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(stats.elapsed_time).count();
std::string formated = fmt::format(
"SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, "
"total_responses: {}, simulator_ticks: {}, elapsed_time: {}ms }}",
stats.total_messages, stats.dropped_messages, stats.timed_out_requests, stats.total_requests,
stats.total_responses, stats.simulator_ticks, elapsed_ms);
in << formated;
return in;
}
}; };
}; // namespace memgraph::io::simulator }; // namespace memgraph::io::simulator

View File

@ -137,7 +137,14 @@ class Io {
Address GetAddress() { return address_; } Address GetAddress() { return address_; }
void SetAddress(Address address) { address_ = address; } void SetAddress(Address address) { address_ = address; }
Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); } Io<I> ForkLocal(boost::uuids::uuid uuid) {
Address new_address{
.unique_id = uuid,
.last_known_ip = address_.last_known_ip,
.last_known_port = address_.last_known_port,
};
return Io(implementation_, new_address);
}
LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); } LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); }
}; };

View File

@ -42,6 +42,7 @@ struct MachineConfig {
boost::asio::ip::address listen_ip; boost::asio::ip::address listen_ip;
uint16_t listen_port; uint16_t listen_port;
size_t shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency()); size_t shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
bool sync_message_handling = false;
}; };
} // namespace memgraph::machine_manager } // namespace memgraph::machine_manager

View File

@ -78,10 +78,10 @@ class MachineManager {
MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator) MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
: io_(io), : io_(io),
config_(config), config_(config),
coordinator_address_(io.GetAddress().ForkUniqueAddress()), coordinator_address_(io.GetAddress().ForkLocalCoordinator()),
shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} { shard_manager_{io.ForkLocal(io.GetAddress().ForkLocalShardManager().unique_id), config.shard_worker_threads,
auto coordinator_io = io.ForkLocal(); coordinator_address_} {
coordinator_io.SetAddress(coordinator_address_); auto coordinator_io = io.ForkLocal(coordinator_address_.unique_id);
CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator}; CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator};
coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); }); coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
} }
@ -101,11 +101,23 @@ class MachineManager {
Address CoordinatorAddress() { return coordinator_address_; } Address CoordinatorAddress() { return coordinator_address_; }
void Run() { void Run() {
while (!io_.ShouldShutDown()) { while (true) {
MaybeBlockOnSyncHandling();
if (io_.ShouldShutDown()) {
break;
}
const auto now = io_.Now(); const auto now = io_.Now();
uint64_t now_us = now.time_since_epoch().count();
uint64_t next_us = next_cron_.time_since_epoch().count();
if (now >= next_cron_) { if (now >= next_cron_) {
spdlog::info("now {} >= next_cron_ {}", now_us, next_us);
next_cron_ = Cron(); next_cron_ = Cron();
} else {
spdlog::info("now {} < next_cron_ {}", now_us, next_us);
} }
Duration receive_timeout = std::max(next_cron_, now) - now; Duration receive_timeout = std::max(next_cron_, now) - now;
@ -194,10 +206,27 @@ class MachineManager {
} }
private: private:
// This method exists for controlling concurrency
// during deterministic simulation testing.
void MaybeBlockOnSyncHandling() {
if (!config_.sync_message_handling) {
return;
}
// block on coordinator
coordinator_queue_.BlockOnQuiescence();
// block on shards
shard_manager_.BlockOnQuiescence();
}
Time Cron() { Time Cron() {
spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString()); spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());
coordinator_queue_.Push(coordinator::coordinator_worker::Cron{}); coordinator_queue_.Push(coordinator::coordinator_worker::Cron{});
return shard_manager_.Cron(); MaybeBlockOnSyncHandling();
Time ret = shard_manager_.Cron();
MaybeBlockOnSyncHandling();
return ret;
} }
}; };

View File

@ -37,7 +37,7 @@
// This cannot be avoided by simple include orderings so we // This cannot be avoided by simple include orderings so we
// simply undefine those macros as we're sure that libkrb5 // simply undefine those macros as we're sure that libkrb5
// won't and can't be used anywhere in the query engine. // won't and can't be used anywhere in the query engine.
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "utils/result.hpp" #include "utils/result.hpp"

View File

@ -47,7 +47,6 @@
#include "query/v2/shard_request_manager.hpp" #include "query/v2/shard_request_manager.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp" #include "storage/v3/shard.hpp"
#include "storage/v3/storage.hpp"
#include "utils/algorithm.hpp" #include "utils/algorithm.hpp"
#include "utils/csv_parsing.hpp" #include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp" #include "utils/event_counter.hpp"
@ -800,7 +799,11 @@ InterpreterContext::InterpreterContext(storage::v3::Shard *db, const Interpreter
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
auto query_io = interpreter_context_->io.ForkLocal();
// TODO(tyler) make this deterministic so that it can be tested.
auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
shard_request_manager_ = std::make_unique<msgs::ShardRequestManager<io::local_transport::LocalTransport>>( shard_request_manager_ = std::make_unique<msgs::ShardRequestManager<io::local_transport::LocalTransport>>(
coordinator::CoordinatorClient<io::local_transport::LocalTransport>( coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),

View File

@ -16,12 +16,10 @@ set(storage_v3_src_files
schemas.cpp schemas.cpp
schema_validator.cpp schema_validator.cpp
shard.cpp shard.cpp
storage.cpp
shard_rsm.cpp shard_rsm.cpp
bindings/typed_value.cpp bindings/typed_value.cpp
expr.cpp expr.cpp
request_helper.cpp request_helper.cpp)
storage.cpp)
# ###################### # ######################
find_package(gflags REQUIRED) find_package(gflags REQUIRED)

View File

@ -190,6 +190,12 @@ class ShardManager {
}); });
} }
void BlockOnQuiescence() {
for (const auto &worker : workers_) {
worker.BlockOnQuiescence();
}
}
private: private:
io::Io<IoImpl> io_; io::Io<IoImpl> io_;
std::vector<shard_worker::Queue> workers_; std::vector<shard_worker::Queue> workers_;

View File

@ -37,7 +37,6 @@
#include "storage/v3/schemas.hpp" #include "storage/v3/schemas.hpp"
#include "storage/v3/shard.hpp" #include "storage/v3/shard.hpp"
#include "storage/v3/shard_rsm.hpp" #include "storage/v3/shard_rsm.hpp"
#include "storage/v3/storage.hpp"
#include "storage/v3/value_conversions.hpp" #include "storage/v3/value_conversions.hpp"
#include "storage/v3/vertex_accessor.hpp" #include "storage/v3/vertex_accessor.hpp"
#include "storage/v3/vertex_id.hpp" #include "storage/v3/vertex_id.hpp"

View File

@ -80,6 +80,9 @@ struct QueueInner {
// starvation by sometimes randomizing priorities, rather than following a strict // starvation by sometimes randomizing priorities, rather than following a strict
// prioritization. // prioritization.
std::deque<Message> queue; std::deque<Message> queue;
uint64_t submitted = 0;
uint64_t calls_to_pop = 0;
}; };
/// There are two reasons to implement our own Queue instead of using /// There are two reasons to implement our own Queue instead of using
@ -95,6 +98,8 @@ class Queue {
MG_ASSERT(inner_.use_count() > 0); MG_ASSERT(inner_.use_count() > 0);
std::unique_lock<std::mutex> lock(inner_->mu); std::unique_lock<std::mutex> lock(inner_->mu);
inner_->submitted++;
inner_->queue.emplace_back(std::forward<Message>(message)); inner_->queue.emplace_back(std::forward<Message>(message));
} // lock dropped before notifying condition variable } // lock dropped before notifying condition variable
@ -105,6 +110,9 @@ class Queue {
MG_ASSERT(inner_.use_count() > 0); MG_ASSERT(inner_.use_count() > 0);
std::unique_lock<std::mutex> lock(inner_->mu); std::unique_lock<std::mutex> lock(inner_->mu);
inner_->calls_to_pop++;
inner_->cv.notify_all();
while (inner_->queue.empty()) { while (inner_->queue.empty()) {
inner_->cv.wait(lock); inner_->cv.wait(lock);
} }
@ -114,6 +122,15 @@ class Queue {
return message; return message;
} }
void BlockOnQuiescence() const {
MG_ASSERT(inner_.use_count() > 0);
std::unique_lock<std::mutex> lock(inner_->mu);
while (inner_->calls_to_pop <= inner_->submitted) {
inner_->cv.wait(lock);
}
}
}; };
/// A ShardWorker owns Raft<ShardRsm> instances. receives messages from the ShardManager. /// A ShardWorker owns Raft<ShardRsm> instances. receives messages from the ShardManager.
@ -122,7 +139,6 @@ class ShardWorker {
io::Io<IoImpl> io_; io::Io<IoImpl> io_;
Queue queue_; Queue queue_;
std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_; std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
Time next_cron_ = Time::min();
std::map<uuid, ShardRaft<IoImpl>> rsm_map_; std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
bool Process(ShutDown && /* shut_down */) { return false; } bool Process(ShutDown && /* shut_down */) { return false; }
@ -175,10 +191,7 @@ class ShardWorker {
return; return;
} }
auto rsm_io = io_.ForkLocal(); auto rsm_io = io_.ForkLocal(to_init.uuid);
auto io_addr = rsm_io.GetAddress();
io_addr.unique_id = to_init.uuid;
rsm_io.SetAddress(io_addr);
// TODO(tyler) get peers from Coordinator in HeartbeatResponse // TODO(tyler) get peers from Coordinator in HeartbeatResponse
std::vector<Address> rsm_peers = {}; std::vector<Address> rsm_peers = {};
@ -208,15 +221,12 @@ class ShardWorker {
~ShardWorker() = default; ~ShardWorker() = default;
void Run() { void Run() {
while (true) { bool should_continue = true;
while (should_continue) {
Message message = queue_.Pop(); Message message = queue_.Pop();
const bool should_continue = should_continue =
std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message)); std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
if (!should_continue) {
return;
}
} }
} }
}; };

View File

@ -1,20 +0,0 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v3/storage.hpp"
#include "storage/v3/config.hpp"
namespace memgraph::storage::v3 {
Storage::Storage(Config config) : config_{config} {}
} // namespace memgraph::storage::v3

View File

@ -1,34 +0,0 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <vector>
#include <boost/asio/thread_pool.hpp>
#include "storage/v3/shard.hpp"
namespace memgraph::storage::v3 {
class Storage {
public:
explicit Storage(Config config);
// Interface toward shard manipulation
// Shard handler -> will use rsm client
private:
std::vector<Shard> shards_;
boost::asio::thread_pool shard_handlers_;
Config config_;
};
} // namespace memgraph::storage::v3

View File

@ -9,17 +9,25 @@
// by the Apache License, Version 2.0, included in the file // by the Apache License, Version 2.0, included in the file
// licenses/APL.txt. // licenses/APL.txt.
#include <memory>
#include <thread> #include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <spdlog/cfg/env.h>
#include "io/message_histogram_collector.hpp"
#include "io/simulator/simulator.hpp" #include "io/simulator/simulator.hpp"
#include "utils/print_helpers.hpp" #include "utils/print_helpers.hpp"
using memgraph::io::Address; using memgraph::io::Address;
using memgraph::io::Io; using memgraph::io::Io;
using memgraph::io::LatencyHistogramSummaries;
using memgraph::io::ResponseFuture; using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult; using memgraph::io::ResponseResult;
using memgraph::io::simulator::Simulator; using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig; using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport; using memgraph::io::simulator::SimulatorTransport;
struct CounterRequest { struct CounterRequest {
@ -40,6 +48,7 @@ void run_server(Io<SimulatorTransport> io) {
std::cout << "[SERVER] Error, continue" << std::endl; std::cout << "[SERVER] Error, continue" << std::endl;
continue; continue;
} }
std::cout << "[SERVER] Got message" << std::endl;
auto request_envelope = request_result.GetValue(); auto request_envelope = request_result.GetValue();
auto req = std::get<CounterRequest>(request_envelope.message); auto req = std::get<CounterRequest>(request_envelope.message);
@ -50,13 +59,7 @@ void run_server(Io<SimulatorTransport> io) {
} }
} }
int main() { std::pair<SimulatorStats, LatencyHistogramSummaries> RunWorkload(SimulatorConfig &config) {
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
};
auto simulator = Simulator(config); auto simulator = Simulator(config);
auto cli_addr = Address::TestAddress(1); auto cli_addr = Address::TestAddress(1);
@ -72,21 +75,47 @@ int main() {
// send request // send request
CounterRequest cli_req; CounterRequest cli_req;
cli_req.proposal = i; cli_req.proposal = i;
spdlog::info("[CLIENT] calling Request");
auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req); auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
spdlog::info("[CLIENT] calling Wait");
auto res_rez = std::move(res_f).Wait(); auto res_rez = std::move(res_f).Wait();
spdlog::info("[CLIENT] Wait returned");
if (!res_rez.HasError()) { if (!res_rez.HasError()) {
std::cout << "[CLIENT] Got a valid response" << std::endl; spdlog::info("[CLIENT] Got a valid response");
auto env = res_rez.GetValue(); auto env = res_rez.GetValue();
MG_ASSERT(env.message.highest_seen == i); MG_ASSERT(env.message.highest_seen == i);
std::cout << "response latency: " << env.response_latency.count() << " microseconds" << std::endl; spdlog::info("response latency: {} microseconds", env.response_latency.count());
} else { } else {
std::cout << "[CLIENT] Got an error" << std::endl; spdlog::info("[CLIENT] Got an error");
} }
} }
using memgraph::utils::print_helpers::operator<<;
std::cout << "response latencies: " << cli_io.ResponseLatencies() << std::endl;
simulator.ShutDown(); simulator.ShutDown();
return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
}
int main() {
spdlog::cfg::load_env_levels();
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
};
auto [sim_stats_1, latency_stats_1] = RunWorkload(config);
auto [sim_stats_2, latency_stats_2] = RunWorkload(config);
if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
spdlog::error("simulator stats diverged across runs");
spdlog::error("run 1 simulator stats: {}", sim_stats_1);
spdlog::error("run 2 simulator stats: {}", sim_stats_2);
spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
std::terminate();
}
return 0; return 0;
} }

View File

@ -33,21 +33,31 @@ using io::Time;
using io::simulator::SimulatorConfig; using io::simulator::SimulatorConfig;
using storage::v3::kMaximumCronInterval; using storage::v3::kMaximumCronInterval;
RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops)) { RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) {
// TODO(tyler) set abort_time to something more restrictive than Time::max()
spdlog::cfg::load_env_levels(); spdlog::cfg::load_env_levels();
SimulatorConfig sim_config{ SimulatorConfig sim_config{
.drop_percent = 0, .drop_percent = 0,
.perform_timeouts = false, .perform_timeouts = false,
.scramble_messages = true, .scramble_messages = true,
.rng_seed = 0, .rng_seed = rng_seed,
.start_time = Time::min(), .start_time = Time::min(),
// TODO(tyler) set abort_time to something more restrictive than Time::max()
.abort_time = Time::max(), .abort_time = Time::max(),
}; };
RunClusterSimulation(sim_config, cluster_config, ops.ops); auto [sim_stats_1, latency_stats_1] = RunClusterSimulation(sim_config, cluster_config, ops.ops);
auto [sim_stats_2, latency_stats_2] = RunClusterSimulation(sim_config, cluster_config, ops.ops);
if (latency_stats_1 != latency_stats_2) {
spdlog::error("simulator stats diverged across runs");
spdlog::error("run 1 simulator stats: {}", sim_stats_1);
spdlog::error("run 2 simulator stats: {}", sim_stats_2);
spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
RC_ASSERT(latency_stats_1 == latency_stats_2);
RC_ASSERT(sim_stats_1 == sim_stats_2);
}
} }
} // namespace memgraph::tests::simulation } // namespace memgraph::tests::simulation

View File

@ -14,11 +14,15 @@
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <optional> #include <optional>
#include <random>
#include <set> #include <set>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <spdlog/cfg/env.h>
#include "io/address.hpp" #include "io/address.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/rsm/raft.hpp" #include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp" #include "io/rsm/rsm_client.hpp"
#include "io/simulator/simulator.hpp" #include "io/simulator/simulator.hpp"
@ -27,6 +31,7 @@
using memgraph::io::Address; using memgraph::io::Address;
using memgraph::io::Duration; using memgraph::io::Duration;
using memgraph::io::Io; using memgraph::io::Io;
using memgraph::io::LatencyHistogramSummaries;
using memgraph::io::ResponseEnvelope; using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture; using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult; using memgraph::io::ResponseResult;
@ -123,16 +128,7 @@ void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetRes
server.Run(); server.Run();
} }
void RunSimulation() { std::pair<SimulatorStats, LatencyHistogramSummaries> RunSimulation(SimulatorConfig &config) {
SimulatorConfig config{
.drop_percent = 5,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::max(),
};
auto simulator = Simulator(config); auto simulator = Simulator(config);
auto cli_addr = Address::TestAddress(1); auto cli_addr = Address::TestAddress(1);
@ -189,6 +185,7 @@ void RunSimulation() {
auto write_cas_response_result = client.SendWriteRequest(cas_req); auto write_cas_response_result = client.SendWriteRequest(cas_req);
if (write_cas_response_result.HasError()) { if (write_cas_response_result.HasError()) {
spdlog::debug("timed out");
// timed out // timed out
continue; continue;
} }
@ -229,6 +226,10 @@ void RunSimulation() {
simulator.ShutDown(); simulator.ShutDown();
srv_thread_1.join();
srv_thread_2.join();
srv_thread_3.join();
SimulatorStats stats = simulator.Stats(); SimulatorStats stats = simulator.Stats();
spdlog::info("total messages: {}", stats.total_messages); spdlog::info("total messages: {}", stats.total_messages);
@ -240,21 +241,51 @@ void RunSimulation() {
spdlog::info("========================== SUCCESS :) =========================="); spdlog::info("========================== SUCCESS :) ==========================");
/* return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
this is implicit in jthread's dtor }
srv_thread_1.join();
srv_thread_2.join(); void RunWithSeed(uint64_t seed) {
srv_thread_3.join(); SimulatorConfig config{
*/ .drop_percent = 5,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = seed,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::seconds{3600},
};
spdlog::error("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",
seed);
spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
auto [sim_stats_1, latency_stats_1] = RunSimulation(config);
spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",
seed);
spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
auto [sim_stats_2, latency_stats_2] = RunSimulation(config);
if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
spdlog::error("simulator stats diverged across runs for test rng_seed: {}", seed);
spdlog::error("run 1 simulator stats: {}", sim_stats_1);
spdlog::error("run 2 simulator stats: {}", sim_stats_2);
spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
std::terminate();
}
} }
int main() { int main() {
spdlog::cfg::load_env_levels();
std::random_device random_device;
std::mt19937 generator(random_device());
std::uniform_int_distribution<> distribution;
int n_tests = 50; int n_tests = 50;
for (int i = 0; i < n_tests; i++) { for (int i = 0; i < n_tests; i++) {
spdlog::info("========================== NEW SIMULATION {} ==========================", i); uint64_t seed = distribution(generator);
spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n"); RunWithSeed(seed);
RunSimulation();
} }
spdlog::info("passed {} tests!", n_tests); spdlog::info("passed {} tests!", n_tests);

View File

@ -1115,11 +1115,12 @@ int TestMessages() {
ConcreteShardRsm shard_server3(std::move(shard_server_io_3), address_for_3, ShardRsm(std::move(shard_ptr3))); ConcreteShardRsm shard_server3(std::move(shard_server_io_3), address_for_3, ShardRsm(std::move(shard_ptr3)));
auto server_thread1 = std::jthread([&shard_server1]() { shard_server1.Run(); }); auto server_thread1 = std::jthread([&shard_server1]() { shard_server1.Run(); });
auto server_thread2 = std::jthread([&shard_server2]() { shard_server2.Run(); });
auto server_thread3 = std::jthread([&shard_server3]() { shard_server3.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_1_address); simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_1_address);
auto server_thread2 = std::jthread([&shard_server2]() { shard_server2.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_2_address); simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_2_address);
auto server_thread3 = std::jthread([&shard_server3]() { shard_server3.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_3_address); simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_3_address);
std::cout << "Beginning test after servers have become quiescent." << std::endl; std::cout << "Beginning test after servers have become quiescent." << std::endl;

View File

@ -24,6 +24,7 @@
#include "coordinator/shard_map.hpp" #include "coordinator/shard_map.hpp"
#include "generated_operations.hpp" #include "generated_operations.hpp"
#include "io/address.hpp" #include "io/address.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/simulator/simulator.hpp" #include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_config.hpp" #include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_transport.hpp" #include "io/simulator/simulator_transport.hpp"
@ -57,6 +58,7 @@ using io::simulator::SimulatorStats;
using io::simulator::SimulatorTransport; using io::simulator::SimulatorTransport;
using machine_manager::MachineConfig; using machine_manager::MachineConfig;
using machine_manager::MachineManager; using machine_manager::MachineManager;
using memgraph::io::LatencyHistogramSummaries;
using msgs::ReadRequests; using msgs::ReadRequests;
using msgs::ReadResponses; using msgs::ReadResponses;
using msgs::WriteRequests; using msgs::WriteRequests;
@ -75,6 +77,8 @@ MachineManager<SimulatorTransport> MkMm(Simulator &simulator, std::vector<Addres
.is_coordinator = true, .is_coordinator = true,
.listen_ip = addr.last_known_ip, .listen_ip = addr.last_known_ip,
.listen_port = addr.last_known_port, .listen_port = addr.last_known_port,
.shard_worker_threads = 4,
.sync_message_handling = true,
}; };
Io<SimulatorTransport> io = simulator.Register(addr); Io<SimulatorTransport> io = simulator.Register(addr);
@ -210,17 +214,19 @@ struct DetachIfDropped {
} }
}; };
void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const SimulatorConfig &sim_config,
const std::vector<Op> &ops) { const ClusterConfig &cluster_config,
const std::vector<Op> &ops) {
spdlog::info("========================== NEW SIMULATION =========================="); spdlog::info("========================== NEW SIMULATION ==========================");
auto simulator = Simulator(sim_config); auto simulator = Simulator(sim_config);
auto cli_addr = Address::TestAddress(1); auto machine_1_addr = Address::TestAddress(1);
auto machine_1_addr = cli_addr.ForkUniqueAddress(); auto cli_addr = Address::TestAddress(2);
auto cli_addr_2 = Address::TestAddress(3);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr); Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> cli_io_2 = simulator.Register(Address::TestAddress(2)); Io<SimulatorTransport> cli_io_2 = simulator.Register(cli_addr_2);
auto coordinator_addresses = std::vector{ auto coordinator_addresses = std::vector{
machine_1_addr, machine_1_addr,
@ -232,6 +238,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
Address coordinator_address = mm_1.CoordinatorAddress(); Address coordinator_address = mm_1.CoordinatorAddress();
auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr);
auto detach_on_error = DetachIfDropped{.handle = mm_thread_1}; auto detach_on_error = DetachIfDropped{.handle = mm_thread_1};
@ -257,6 +264,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
simulator.ShutDown(); simulator.ShutDown();
mm_thread_1.join();
SimulatorStats stats = simulator.Stats(); SimulatorStats stats = simulator.Stats();
spdlog::info("total messages: {}", stats.total_messages); spdlog::info("total messages: {}", stats.total_messages);
@ -268,10 +277,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
auto histo = cli_io_2.ResponseLatencies(); auto histo = cli_io_2.ResponseLatencies();
using memgraph::utils::print_helpers::operator<<;
std::cout << "response latencies: " << histo << std::endl;
spdlog::info("========================== SUCCESS :) =========================="); spdlog::info("========================== SUCCESS :) ==========================");
return std::make_pair(stats, histo);
} }
} // namespace memgraph::tests::simulation } // namespace memgraph::tests::simulation

View File

@ -21,7 +21,7 @@
#include "query/v2/context.hpp" #include "query/v2/context.hpp"
#include "query/v2/db_accessor.hpp" #include "query/v2/db_accessor.hpp"
#include "query/v2/plan/operator.hpp" #include "query/v2/plan/operator.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "query_v2_query_common.hpp" #include "query_v2_query_common.hpp"

View File

@ -30,7 +30,7 @@
#include "storage/v3/id_types.hpp" #include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp" #include "storage/v3/schemas.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "storage/v3/vertex.hpp" #include "storage/v3/vertex.hpp"
#include "storage/v3/view.hpp" #include "storage/v3/view.hpp"

View File

@ -21,7 +21,7 @@
#include "query/v2/interpreter.hpp" #include "query/v2/interpreter.hpp"
#include "result_stream_faker.hpp" #include "result_stream_faker.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
DECLARE_bool(query_cost_planner); DECLARE_bool(query_cost_planner);

View File

@ -15,7 +15,7 @@
#include "query/v2/plan/operator.hpp" #include "query/v2/plan/operator.hpp"
#include "query_v2_query_plan_common.hpp" #include "query_v2_query_plan_common.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
namespace memgraph::query::v2::tests { namespace memgraph::query::v2::tests {

View File

@ -16,7 +16,7 @@
#include "glue/v2/communication.hpp" #include "glue/v2/communication.hpp"
#include "query/v2/bindings/typed_value.hpp" #include "query/v2/bindings/typed_value.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "utils/algorithm.hpp" #include "utils/algorithm.hpp"
/** /**

View File

@ -18,7 +18,7 @@
#include "storage/v3/id_types.hpp" #include "storage/v3/id_types.hpp"
#include "storage/v3/name_id_mapper.hpp" #include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "storage/v3/temporal.hpp" #include "storage/v3/temporal.hpp"
#include "storage/v3/view.hpp" #include "storage/v3/view.hpp"

View File

@ -13,7 +13,7 @@
#include "storage/v3/isolation_level.hpp" #include "storage/v3/isolation_level.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
namespace memgraph::storage::v3::tests { namespace memgraph::storage::v3::tests {
int64_t VerticesCount(Shard::Accessor &accessor) { int64_t VerticesCount(Shard::Accessor &accessor) {

View File

@ -23,7 +23,7 @@
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/schema_validator.hpp" #include "storage/v3/schema_validator.hpp"
#include "storage/v3/schemas.hpp" #include "storage/v3/schemas.hpp"
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "storage/v3/temporal.hpp" #include "storage/v3/temporal.hpp"
using testing::Pair; using testing::Pair;

View File

@ -11,7 +11,7 @@
#pragma once #pragma once
#include "storage/v3/storage.hpp" #include "storage/v3/shard.hpp"
#include "storage/v3/view.hpp" #include "storage/v3/view.hpp"
namespace memgraph::storage::v3::tests { namespace memgraph::storage::v3::tests {