Merge remote-tracking branch 'origin/project-pineapples' into E118-MG-lexicographically-ordered-storage
This commit is contained in:
@ -1,10 +1,9 @@
find_package(fmt REQUIRED)
find_package(Threads REQUIRED)
add_library(mg-coordinator STATIC ${coordinator_src_files})
target_link_libraries(mg-coordinator stdc++fs Threads::Threads fmt::fmt mg-utils)
target_link_libraries(mg-coordinator stdc++fs Threads::Threads fmt::fmt mg-utils mg-storage-v3)
Normal file
Normal file
@ -0,0 +1,129 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <coordinator/coordinator.hpp>
namespace memgraph::coordinator {
CoordinatorWriteResponses Coordinator::ApplyWrite(HeartbeatRequest &&heartbeat_request) {
spdlog::info("Coordinator handling HeartbeatRequest");
// add this storage engine to any under-replicated shards that it is not already a part of
auto initializing_rsms_for_shard_manager =
shard_map_.AssignShards(heartbeat_request.from_storage_manager, heartbeat_request.initialized_rsms);
return HeartbeatResponse{
.shards_to_initialize = initializing_rsms_for_shard_manager,
CoordinatorWriteResponses Coordinator::ApplyWrite(HlcRequest &&hlc_request) {
HlcResponse res{};
auto hlc_shard_map = shard_map_.GetHlc();
MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id));
res.new_hlc = Hlc{
.logical_id = ++highest_allocated_timestamp_,
// TODO(tyler) probably pass some more context to the Coordinator here
// so that we can use our wall clock and enforce monotonicity.
// .coordinator_wall_clock = io_.Now(),
// Allways return fresher shard_map for now.
res.fresher_shard_map = std::make_optional(shard_map_);
return res;
CoordinatorWriteResponses Coordinator::ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) {
AllocateEdgeIdBatchResponse res{};
uint64_t low = highest_allocated_edge_id_;
highest_allocated_edge_id_ += ahr.batch_size;
uint64_t high = highest_allocated_edge_id_;
res.low = low;
res.high = high;
return res;
/// This splits the shard immediately beneath the provided
/// split key, keeping the assigned peers identical for now,
/// but letting them be gradually migrated over time.
CoordinatorWriteResponses Coordinator::ApplyWrite(SplitShardRequest &&split_shard_request) {
SplitShardResponse res{};
if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
res.success = false;
} else {
res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id,
return res;
/// This adds the provided storage engine to the standby storage engine pool,
/// which can be used to rebalance storage over time.
CoordinatorWriteResponses Coordinator::ApplyWrite(
RegisterStorageEngineRequest && /* register_storage_engine_request */) {
RegisterStorageEngineResponse res{};
return res;
/// This begins the process of draining the provided storage engine from all raft
/// clusters that it might be participating in.
CoordinatorWriteResponses Coordinator::ApplyWrite(
DeregisterStorageEngineRequest && /* register_storage_engine_request */) {
DeregisterStorageEngineResponse res{};
return res;
CoordinatorWriteResponses Coordinator::ApplyWrite(InitializeLabelRequest &&initialize_label_request) {
InitializeLabelResponse res{};
std::optional<LabelId> new_label_id = shard_map_.InitializeNewLabel(
initialize_label_request.label_name, initialize_label_request.schema, initialize_label_request.replication_factor,
if (new_label_id) {
res.new_label_id = new_label_id.value();
res.fresher_shard_map = std::nullopt;
res.success = true;
} else {
res.fresher_shard_map = shard_map_;
res.success = false;
return res;
CoordinatorWriteResponses Coordinator::ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) {
AllocatePropertyIdsResponse res{};
auto property_ids = shard_map_.AllocatePropertyIds(allocate_property_ids_request.property_names);
res.property_ids = property_ids;
return res;
} // namespace memgraph::coordinator
@ -12,26 +12,30 @@
#pragma once
#include <optional>
#include <set>
#include <string>
#include <unordered_set>
#include <variant>
#include <vector>
#include "coordinator/hybrid_logical_clock.hpp"
#include "coordinator/shard_map.hpp"
#include "io/simulator/simulator.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp"
#include <boost/uuid/uuid.hpp>
#include <coordinator/hybrid_logical_clock.hpp>
#include <coordinator/shard_map.hpp>
#include <io/simulator/simulator.hpp>
#include <io/time.hpp>
#include <io/transport.hpp>
#include <storage/v3/id_types.hpp>
#include <storage/v3/schemas.hpp>
namespace memgraph::coordinator {
using memgraph::io::Address;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyId;
using Address = memgraph::io::Address;
using SimT = memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::SchemaProperty;
using SimT = memgraph::io::simulator::SimulatorTransport;
using PrimaryKey = std::vector<PropertyValue>;
struct HlcRequest {
Hlc last_shard_map_version;
@ -76,7 +80,7 @@ struct AllocatePropertyIdsResponse {
struct SplitShardRequest {
Hlc previous_shard_map_version;
LabelId label_id;
CompoundKey split_key;
PrimaryKey split_key;
struct SplitShardResponse {
@ -102,26 +106,34 @@ struct DeregisterStorageEngineResponse {
struct InitializeLabelRequest {
std::string label_name;
std::vector<SchemaProperty> schema;
size_t replication_factor;
Hlc last_shard_map_version;
struct InitializeLabelResponse {
bool success;
LabelId new_label_id;
std::optional<ShardMap> fresher_shard_map;
struct HeartbeatRequest {};
struct HeartbeatResponse {};
struct HeartbeatRequest {
Address from_storage_manager;
std::set<boost::uuids::uuid> initialized_rsms;
struct HeartbeatResponse {
std::vector<ShardToInitialize> shards_to_initialize;
using CoordinatorWriteRequests =
std::variant<HlcRequest, AllocateEdgeIdBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest>;
using CoordinatorWriteResponses =
std::variant<HlcResponse, AllocateEdgeIdBatchResponse, SplitShardResponse, RegisterStorageEngineResponse,
DeregisterStorageEngineResponse, InitializeLabelResponse, AllocatePropertyIdsResponse>;
DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest, HeartbeatRequest>;
using CoordinatorWriteResponses = std::variant<HlcResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse,
InitializeLabelResponse, AllocatePropertyIdsResponse, HeartbeatResponse>;
using CoordinatorReadRequests = std::variant<GetShardMapRequest, HeartbeatRequest>;
using CoordinatorReadResponses = std::variant<GetShardMapResponse, HeartbeatResponse>;
using CoordinatorReadRequests = std::variant<GetShardMapRequest>;
using CoordinatorReadResponses = std::variant<GetShardMapResponse>;
class Coordinator {
@ -146,114 +158,34 @@ class Coordinator {
/// Query engines need to periodically request batches of unique edge IDs.
uint64_t highest_allocated_edge_id_;
static CoordinatorReadResponses HandleRead(HeartbeatRequest && /* heartbeat_request */) {
return HeartbeatResponse{};
CoordinatorReadResponses HandleRead(GetShardMapRequest && /* get_shard_map_request */) {
GetShardMapResponse res;
res.shard_map = shard_map_;
return res;
CoordinatorWriteResponses ApplyWrite(HlcRequest &&hlc_request) {
HlcResponse res{};
CoordinatorWriteResponses ApplyWrite(HeartbeatRequest &&heartbeat_request);
auto hlc_shard_map = shard_map_.GetHlc();
CoordinatorWriteResponses ApplyWrite(HlcRequest &&hlc_request);
MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id));
res.new_hlc = Hlc{
.logical_id = ++highest_allocated_timestamp_,
// TODO(tyler) probably pass some more context to the Coordinator here
// so that we can use our wall clock and enforce monotonicity.
// .coordinator_wall_clock = io_.Now(),
// Allways return fresher shard_map for now.
res.fresher_shard_map = std::make_optional(shard_map_);
return res;
CoordinatorWriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) {
AllocateEdgeIdBatchResponse res{};
uint64_t low = highest_allocated_edge_id_;
highest_allocated_edge_id_ += ahr.batch_size;
uint64_t high = highest_allocated_edge_id_;
res.low = low;
res.high = high;
return res;
CoordinatorWriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr);
/// This splits the shard immediately beneath the provided
/// split key, keeping the assigned peers identical for now,
/// but letting them be gradually migrated over time.
CoordinatorWriteResponses ApplyWrite(SplitShardRequest &&split_shard_request) {
SplitShardResponse res{};
if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
res.success = false;
} else {
res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id,
return res;
CoordinatorWriteResponses ApplyWrite(SplitShardRequest &&split_shard_request);
/// This adds the provided storage engine to the standby storage engine pool,
/// which can be used to rebalance storage over time.
static CoordinatorWriteResponses ApplyWrite(RegisterStorageEngineRequest && /* register_storage_engine_request */) {
RegisterStorageEngineResponse res{};
return res;
static CoordinatorWriteResponses ApplyWrite(RegisterStorageEngineRequest && /* register_storage_engine_request */);
/// This begins the process of draining the provided storage engine from all raft
/// clusters that it might be participating in.
static CoordinatorWriteResponses ApplyWrite(DeregisterStorageEngineRequest && /* register_storage_engine_request */) {
DeregisterStorageEngineResponse res{};
// const Address &address = register_storage_engine_request.address;
// storage_engine_pool_.erase(address);
// res.success = true;
static CoordinatorWriteResponses ApplyWrite(DeregisterStorageEngineRequest && /* register_storage_engine_request */);
return res;
CoordinatorWriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request);
CoordinatorWriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request) {
InitializeLabelResponse res{};
bool success = shard_map_.InitializeNewLabel(initialize_label_request.label_name, initialize_label_request.schema,
if (success) {
res.fresher_shard_map = shard_map_;
res.success = false;
} else {
res.fresher_shard_map = std::nullopt;
res.success = true;
return res;
CoordinatorWriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) {
AllocatePropertyIdsResponse res{};
auto property_ids = shard_map_.AllocatePropertyIds(allocate_property_ids_request.property_names);
res.property_ids = property_ids;
return res;
CoordinatorWriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request);
} // namespace memgraph::coordinator
Normal file
Normal file
@ -0,0 +1,90 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordinator/shard_map.hpp"
#include "storage/v3/temporal.hpp"
namespace memgraph::coordinator {
using memgraph::common::SchemaType;
using memgraph::storage::v3::TemporalData;
using memgraph::storage::v3::TemporalType;
PrimaryKey SchemaToMinKey(const std::vector<SchemaProperty> &schema) {
PrimaryKey ret{};
const int64_t min_int = std::numeric_limits<int64_t>::min();
const TemporalData date{TemporalType::Date, min_int};
const TemporalData local_time{TemporalType::LocalTime, min_int};
const TemporalData local_date_time{TemporalType::LocalDateTime, min_int};
const TemporalData duration{TemporalType::Duration, min_int};
for (const auto &schema_property : schema) {
switch (schema_property.type) {
case SchemaType::BOOL:
case SchemaType::INT:
case SchemaType::STRING:
case SchemaType::DATE:
case SchemaType::LOCALTIME:
case SchemaType::LOCALDATETIME:
case SchemaType::DURATION:
return ret;
std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std::vector<SchemaProperty> schema,
size_t replication_factor, Hlc last_shard_map_version) {
if (shard_map_version != last_shard_map_version || labels.contains(label_name)) {
return std::nullopt;
const LabelId label_id = LabelId::FromUint(++max_label_id);
labels.emplace(std::move(label_name), label_id);
PrimaryKey initial_key = SchemaToMinKey(schema);
Shard empty_shard = {};
Shards shards = {
{initial_key, empty_shard},
LabelSpace label_space{
.schema = std::move(schema),
.shards = shards,
.replication_factor = replication_factor,
label_spaces.emplace(label_id, label_space);
return label_id;
} // namespace memgraph::coordinator
@ -11,11 +11,18 @@
#pragma once
#include <limits>
#include <map>
#include <set>
#include <vector>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include "common/types.hpp"
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "storage/v3/config.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
@ -23,8 +30,10 @@
namespace memgraph::coordinator {
using memgraph::io::Address;
using memgraph::storage::v3::Config;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyId;
using memgraph::storage::v3::PropertyValue;
using memgraph::storage::v3::SchemaProperty;
enum class Status : uint8_t {
@ -40,16 +49,27 @@ struct AddressAndStatus {
Status status;
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
using PrimaryKey = std::vector<PropertyValue>;
using Shard = std::vector<AddressAndStatus>;
using Shards = std::map<CompoundKey, Shard>;
using Shards = std::map<PrimaryKey, Shard>;
using LabelName = std::string;
using PropertyName = std::string;
using PropertyMap = std::map<PropertyName, PropertyId>;
struct ShardToInitialize {
boost::uuids::uuid uuid;
LabelId label_id;
PrimaryKey min_key;
std::optional<PrimaryKey> max_key;
Config config;
PrimaryKey SchemaToMinKey(const std::vector<SchemaProperty> &schema);
struct LabelSpace {
std::vector<SchemaProperty> schema;
std::map<CompoundKey, Shard> shards;
std::map<PrimaryKey, Shard> shards;
size_t replication_factor;
struct ShardMap {
@ -70,7 +90,70 @@ struct ShardMap {
Hlc GetHlc() const noexcept { return shard_map_version; }
bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const CompoundKey &key) {
// Returns the shard UUIDs that have been assigned but not yet acknowledged for this storage manager
std::vector<ShardToInitialize> AssignShards(Address storage_manager, std::set<boost::uuids::uuid> initialized) {
std::vector<ShardToInitialize> ret{};
bool mutated = false;
for (auto &[label_id, label_space] : label_spaces) {
for (auto &[low_key, shard] : label_space.shards) {
// TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info
bool machine_contains_shard = false;
for (auto &aas : shard) {
if (initialized.contains(aas.address.unique_id)) {
spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id);
aas.status = Status::CONSENSUS_PARTICIPANT;
machine_contains_shard = true;
} else {
const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip &&
aas.address.last_known_port == storage_manager.last_known_port;
if (same_machine) {
machine_contains_shard = true;
.uuid = aas.address.unique_id,
.label_id = label_id,
.min_key = low_key,
.max_key = std::nullopt,
.config = Config{},
if (!machine_contains_shard && shard.size() < label_space.replication_factor) {
Address address = storage_manager;
// TODO(tyler) use deterministic UUID so that coordinators don't diverge here
address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
.uuid = address.unique_id,
.label_id = label_id,
.min_key = low_key,
.max_key = std::nullopt,
.config = Config{},
AddressAndStatus aas = {
.address = address,
.status = Status::INITIALIZING,
if (mutated) {
return ret;
bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) {
if (previous_shard_map_version != shard_map_version) {
return false;
@ -78,10 +161,11 @@ struct ShardMap {
auto &label_space =;
auto &shards_in_map = label_space.shards;
// Finding the Shard that the new CompoundKey should map to.
// Finding the Shard that the new PrimaryKey should map to.
auto prev = std::prev(shards_in_map.upper_bound(key));
Shard duplicated_shard = prev->second;
@ -91,32 +175,14 @@ struct ShardMap {
return true;
bool InitializeNewLabel(std::string label_name, std::vector<SchemaProperty> schema, Hlc last_shard_map_version) {
if (shard_map_version != last_shard_map_version || labels.contains(label_name)) {
return false;
const LabelId label_id = LabelId::FromUint(++max_label_id);
labels.emplace(std::move(label_name), label_id);
LabelSpace label_space{
.schema = std::move(schema),
.shards = Shards{},
label_spaces.emplace(label_id, label_space);
return true;
std::optional<LabelId> InitializeNewLabel(std::string label_name, std::vector<SchemaProperty> schema,
size_t replication_factor, Hlc last_shard_map_version);
void AddServer(Address server_address) {
// Find a random place for the server to plug in
Shards GetShardsForRange(const LabelName &label_name, const CompoundKey &start_key, const CompoundKey &end_key) const {
Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const {
MG_ASSERT(start_key <= end_key);
@ -139,7 +205,7 @@ struct ShardMap {
return shards;
Shard GetShardForKey(const LabelName &label_name, const CompoundKey &key) const {
Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const {
LabelId label_id =;
@ -99,7 +99,7 @@ class Shared {
bool IsReady() const {
std::unique_lock<std::mutex> lock(mu_);
return item_;
return item_.has_value();
std::optional<T> TryGet() {
@ -148,12 +148,6 @@ class Future {
old.consumed_or_moved_ = true;
Future &operator=(Future &&old) noexcept {
MG_ASSERT(!old.consumed_or_moved_, "Future moved from after already being moved from or consumed.");
shared_ = std::move(old.shared_);
old.consumed_or_moved_ = true;
Future(const Future &) = delete;
Future &operator=(const Future &) = delete;
~Future() = default;
@ -25,7 +25,7 @@ class LocalSystem {
Io<LocalTransport> Register(Address address) {
LocalTransport local_transport(local_transport_handle_, address);
LocalTransport local_transport(local_transport_handle_);
return Io{local_transport, address};
@ -25,18 +25,16 @@ namespace memgraph::io::local_transport {
class LocalTransport {
std::shared_ptr<LocalTransportHandle> local_transport_handle_;
const Address address_;
LocalTransport(std::shared_ptr<LocalTransportHandle> local_transport_handle, Address address)
: local_transport_handle_(std::move(local_transport_handle)), address_(address) {}
explicit LocalTransport(std::shared_ptr<LocalTransportHandle> local_transport_handle)
: local_transport_handle_(std::move(local_transport_handle)) {}
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address to_address, RequestId request_id, RequestT request, Duration timeout) {
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestId request_id, RequestT request,
Duration timeout) {
auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
Address from_address = address_;
local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
@ -44,9 +42,8 @@ class LocalTransport {
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
Address from_address = address_;
return local_transport_handle_->template Receive<Ms...>(timeout);
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Address receiver_address, Duration timeout) {
return local_transport_handle_->template Receive<Ms...>(receiver_address, timeout);
template <Message M>
@ -36,6 +36,13 @@ class LocalTransportHandle {
std::vector<OpaqueMessage> can_receive_;
~LocalTransportHandle() {
for (auto &&[pk, promise] : promises_) {
void ShutDown() {
std::unique_lock<std::mutex> lock(mu_);
should_shut_down_ = true;
@ -53,11 +60,13 @@ class LocalTransportHandle {
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Address /* receiver_address */, Duration timeout) {
std::unique_lock lock(mu_);
Time before = Now();
spdlog::info("can_receive_ size: {}", can_receive_.size());
while (can_receive_.empty()) {
Time now = Now();
@ -89,23 +98,26 @@ class LocalTransportHandle {
template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
std::any message_any(std::forward<M>(message));
OpaqueMessage opaque_message{
.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
OpaqueMessage opaque_message{.to_address = to_address,
.from_address = from_address,
.request_id = request_id,
.message = std::move(message_any)};
PromiseKey promise_key{.requester_address = to_address,
.request_id = opaque_message.request_id,
.replier_address = opaque_message.from_address};
PromiseKey promise_key{
.requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address};
std::unique_lock<std::mutex> lock(mu_);
if (promises_.contains(promise_key)) {
spdlog::info("using message to fill promise");
// complete waiting promise if it's there
DeadlineAndOpaquePromise dop = std::move(;
} else {
spdlog::info("placing message in can_receive_");
} // lock dropped
@ -203,4 +203,30 @@ struct DeadlineAndOpaquePromise {
OpaquePromise promise;
template <class From>
std::type_info const &type_info_for_variant(From const &from) {
return std::visit([](auto &&x) -> decltype(auto) { return typeid(x); }, from);
template <typename From, typename Return, typename Head, typename... Rest>
std::optional<Return> ConvertVariantInner(From &&a) {
if (typeid(Head) == type_info_for_variant(a)) {
Head concrete = std::get<Head>(std::forward<From>(a));
return concrete;
if constexpr (sizeof...(Rest) > 0) {
return ConvertVariantInner<From, Return, Rest...>(std::forward<From>(a));
} else {
return std::nullopt;
/// This function converts a variant to another variant holding a subset OR superset of
/// possible types.
template <class From, class... Ms>
requires(sizeof...(Ms) > 0) std::optional<std::variant<Ms...>> ConvertVariant(From &&from) {
return ConvertVariantInner<From, std::variant<Ms...>, Ms...>(std::forward<From>(from));
} // namespace memgraph::io
Normal file
Normal file
@ -0,0 +1,48 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <variant>
#include <coordinator/coordinator.hpp>
#include <io/rsm/raft.hpp>
#include <io/rsm/shard_rsm.hpp>
#include "utils/concepts.hpp"
namespace memgraph::io::messages {
using memgraph::coordinator::CoordinatorReadRequests;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
// TODO(everbody) change these to the real shard messages
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::AppendRequest;
using memgraph::io::rsm::AppendResponse;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::VoteRequest;
using memgraph::io::rsm::VoteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using CoordinatorMessages =
std::variant<ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>, AppendResponse,
WriteRequest<CoordinatorWriteRequests>, VoteRequest, VoteResponse>;
using ShardMessages = std::variant<ReadRequest<StorageReadRequest>, AppendRequest<StorageWriteRequest>, AppendResponse,
WriteRequest<StorageWriteRequest>, VoteRequest, VoteResponse>;
using ShardManagerMessages = std::variant<WriteResponse<CoordinatorWriteResponses>>;
} // namespace memgraph::io::messages
@ -53,7 +53,6 @@ static constexpr size_t kMaximumAppendBatchSize = 1024;
using Term = uint64_t;
using LogIndex = uint64_t;
using LogSize = uint64_t;
using RequestId = uint64_t;
template <typename WriteOperation>
struct WriteRequest {
@ -230,22 +229,60 @@ class Raft {
Io<IoImpl> io_;
std::vector<Address> peers_;
ReplicatedState replicated_state_;
Time next_cron_;
Raft(Io<IoImpl> &&io, std::vector<Address> peers, ReplicatedState &&replicated_state)
: io_(std::forward<Io<IoImpl>>(io)),
replicated_state_(std::forward<ReplicatedState>(replicated_state)) {}
replicated_state_(std::forward<ReplicatedState>(replicated_state)) {
if (peers.empty()) {
role_ = Leader{};
/// Periodic protocol maintenance. Returns the time that Cron should be called again
/// in the future.
Time Cron() {
// dispatch periodic logic based on our role to a specific Cron method.
std::optional<Role> new_role = std::visit([&](auto &role) { return Cron(role); }, role_);
if (new_role) {
role_ = std::move(new_role).value();
const Duration random_cron_interval = RandomTimeout(kMinimumCronInterval, kMaximumCronInterval);
return io_.Now() + random_cron_interval;
/// Returns the Address for our underlying Io implementation
Address GetAddress() { return io_.GetAddress(); }
using ReceiveVariant = std::variant<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
WriteRequest<WriteOperation>, VoteRequest, VoteResponse>;
void Handle(ReceiveVariant &&message_variant, RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in the Handle first argument,
// or it can be `auto` if it's a handler for several roles
// or messages.
std::optional<Role> new_role = std::visit(
[&](auto &&msg, auto &role) mutable {
return Handle(role, std::forward<decltype(msg)>(msg), request_id, from_address);
std::forward<ReceiveVariant>(message_variant), role_);
// TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc...
if (new_role) {
role_ = std::move(new_role).value();
void Run() {
Time last_cron = io_.Now();
while (!io_.ShouldShutDown()) {
const auto now = io_.Now();
const Duration random_cron_interval = RandomTimeout(kMinimumCronInterval, kMaximumCronInterval);
if (now - last_cron > random_cron_interval) {
last_cron = now;
if (now >= next_cron_) {
next_cron_ = Cron();
const Duration receive_timeout = RandomTimeout(kMinimumReceiveTimeout, kMaximumReceiveTimeout);
@ -449,16 +486,6 @@ class Raft {
/// been received.
/// Periodic protocol maintenance.
void Cron() {
// dispatch periodic logic based on our role to a specific Cron method.
std::optional<Role> new_role = std::visit([&](auto &role) { return Cron(role); }, role_);
if (new_role) {
role_ = std::move(new_role).value();
// Raft paper - 5.2
// Candidates keep sending Vote to peers until:
// 1. receiving Append with a higher term (become Follower)
@ -541,26 +568,6 @@ class Raft {
/// message that has been received.
using ReceiveVariant = std::variant<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
WriteRequest<WriteOperation>, VoteRequest, VoteResponse>;
void Handle(ReceiveVariant &&message_variant, RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in the Handle first argument,
// or it can be `auto` if it's a handler for several roles
// or messages.
std::optional<Role> new_role = std::visit(
[&](auto &&msg, auto &role) mutable {
return Handle(role, std::forward<decltype(msg)>(msg), request_id, from_address);
std::forward<ReceiveVariant>(message_variant), role_);
// TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc...
if (new_role) {
role_ = std::move(new_role).value();
// all roles can receive Vote and possibly become a follower
template <AllRoles ALL>
std::optional<Role> Handle(ALL & /* variable */, VoteRequest &&req, RequestId request_id, Address from_address) {
@ -906,7 +913,11 @@ class Raft {
leader.pending_client_requests.emplace(log_index, pcr);
if (peers_.empty()) {
} else {
return std::nullopt;
@ -33,19 +33,21 @@ class SimulatorTransport {
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address address, uint64_t request_id, RequestT request, Duration timeout) {
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, uint64_t request_id, RequestT request,
Duration timeout) {
std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
auto [future, promise] =
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout, std::move(promise));
simulator_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
return std::move(future);
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Duration timeout) {
return simulator_handle_->template Receive<Ms...>(address_, timeout);
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(Address receiver_address, Duration timeout) {
return simulator_handle_->template Receive<Ms...>(receiver_address, timeout);
template <Message M>
@ -83,23 +83,26 @@ class Io {
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> RequestWithTimeout(Address address, RequestT request, Duration timeout) {
const RequestId request_id = ++request_id_counter_;
return implementation_.template Request<RequestT, ResponseT>(address, request_id, request, timeout);
const Address from_address = address_;
return implementation_.template Request<RequestT, ResponseT>(address, from_address, request_id, request, timeout);
/// Issue a request that times out after the default timeout. This tends
/// to be used by clients.
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address address, RequestT request) {
ResponseFuture<ResponseT> Request(Address to_address, RequestT request) {
const RequestId request_id = ++request_id_counter_;
const Duration timeout = default_timeout_;
return implementation_.template Request<RequestT, ResponseT>(address, request_id, std::move(request), timeout);
const Address from_address = address_;
return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, request_id,
std::move(request), timeout);
/// Wait for an explicit number of microseconds for a request of one of the
/// provided types to arrive. This tends to be used by servers.
template <Message... Ms>
RequestResult<Ms...> ReceiveWithTimeout(Duration timeout) {
return implementation_.template Receive<Ms...>(timeout);
return implementation_.template Receive<Ms...>(address_, timeout);
/// Wait the default number of microseconds for a request of one of the
@ -107,7 +110,7 @@ class Io {
template <Message... Ms>
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive() {
const Duration timeout = default_timeout_;
return implementation_.template Receive<Ms...>(timeout);
return implementation_.template Receive<Ms...>(address_, timeout);
/// Send a message in a best-effort fashion. This is used for messaging where
@ -134,5 +137,8 @@ class Io {
Address GetAddress() { return address_; }
void SetAddress(Address address) { address_ = address; }
Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); }
}; // namespace memgraph::io
Normal file
Normal file
@ -0,0 +1,42 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <boost/asio/ip/tcp.hpp>
#include "io/address.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
namespace memgraph::machine_manager {
using memgraph::io::Address;
using memgraph::storage::v3::SchemaProperty;
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
struct InitialLabelSpace {
std::string label_name;
std::vector<SchemaProperty> schema;
size_t replication_factor;
std::vector<CompoundKey> split_points;
struct MachineConfig {
std::vector<InitialLabelSpace> initial_label_spaces;
std::vector<Address> coordinator_addresses;
bool is_storage;
bool is_coordinator;
bool is_query_engine;
boost::asio::ip::address listen_ip;
uint16_t listen_port;
} // namespace memgraph::machine_manager
Normal file
Normal file
@ -0,0 +1,180 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <boost/uuid/uuid.hpp>
#include <coordinator/coordinator_rsm.hpp>
#include <io/message_conversion.hpp>
#include <io/messages.hpp>
#include <io/rsm/rsm_client.hpp>
#include <io/time.hpp>
#include <machine_manager/machine_config.hpp>
#include <storage/v3/shard_manager.hpp>
namespace memgraph::machine_manager {
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorReadRequests;
using memgraph::coordinator::CoordinatorReadResponses;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
using memgraph::io::ConvertVariant;
using memgraph::io::Duration;
using memgraph::io::RequestId;
using memgraph::io::Time;
using memgraph::io::messages::CoordinatorMessages;
using memgraph::io::messages::ShardManagerMessages;
using memgraph::io::messages::ShardMessages;
using memgraph::io::rsm::AppendRequest;
using memgraph::io::rsm::AppendResponse;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::VoteRequest;
using memgraph::io::rsm::VoteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::storage::v3::ShardManager;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageWriteRequest;
/// The MachineManager is responsible for:
/// * starting the entire system and ensuring that high-level
/// operational requirements continue to be met
/// * acting as a machine's single caller of Io::Receive
/// * routing incoming messages from the Io interface to the
/// appropriate Coordinator or to the StorageManager
/// (it's not necessary to route anything in this layer
/// to the query engine because the query engine only
/// communicates using higher-level Futures that will
/// be filled immediately when the response comes in
/// at the lower-level transport layer.
/// Every storage engine has exactly one RsmEngine.
template <typename IoImpl>
class MachineManager {
io::Io<IoImpl> io_;
MachineConfig config_;
CoordinatorRsm<IoImpl> coordinator_;
ShardManager<IoImpl> shard_manager_;
Time next_cron_;
// TODO initialize ShardManager with "real" coordinator addresses instead of io.GetAddress
// which is only true for single-machine config.
MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
: io_(io),
coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
shard_manager_(ShardManager{io.ForkLocal(), coordinator_.GetAddress()}) {}
Address CoordinatorAddress() { return coordinator_.GetAddress(); }
void Run() {
while (!io_.ShouldShutDown()) {
const auto now = io_.Now();
if (now >= next_cron_) {
next_cron_ = Cron();
Duration receive_timeout = next_cron_ - now;
// Note: this parameter pack must be kept in-sync with the ReceiveWithTimeout parameter pack below
using AllMessages =
std::variant<ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>, AppendResponse,
WriteRequest<CoordinatorWriteRequests>, VoteRequest, VoteResponse,
WriteResponse<CoordinatorWriteResponses>, ReadRequest<StorageReadRequest>,
AppendRequest<StorageWriteRequest>, WriteRequest<StorageWriteRequest>>;
spdlog::info("MM waiting on Receive");
// Note: this parameter pack must be kept in-sync with the AllMessages parameter pack above
auto request_result = io_.template ReceiveWithTimeout<
ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>, AppendResponse,
WriteRequest<CoordinatorWriteRequests>, VoteRequest, VoteResponse, WriteResponse<CoordinatorWriteResponses>,
ReadRequest<StorageReadRequest>, AppendRequest<StorageWriteRequest>, WriteRequest<StorageWriteRequest>>(
if (request_result.HasError()) {
// time to do Cron
spdlog::info("MM got timeout");
auto &&request_envelope = std::move(request_result.GetValue());
spdlog::info("MM got message to {}", request_envelope.to_address.ToString());
// If message is for the coordinator, cast it to subset and pass it to the coordinator
bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address;
spdlog::info("coordinator: {}", coordinator_.GetAddress().ToString());
if (to_coordinator) {
std::optional<CoordinatorMessages> conversion_attempt =
ConvertVariant<AllMessages, ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>,
AppendResponse, WriteRequest<CoordinatorWriteRequests>, VoteRequest, VoteResponse>(
MG_ASSERT(conversion_attempt.has_value(), "coordinator message conversion failed");
spdlog::info("got coordinator message");
CoordinatorMessages &&cm = std::move(conversion_attempt.value());
coordinator_.Handle(std::forward<CoordinatorMessages>(cm), request_envelope.request_id,
bool to_sm = shard_manager_.GetAddress() == request_envelope.to_address;
spdlog::info("smm: {}", shard_manager_.GetAddress().ToString());
if (to_sm) {
std::optional<ShardManagerMessages> conversion_attempt =
ConvertVariant<AllMessages, WriteResponse<CoordinatorWriteResponses>>(std::move(request_envelope.message));
MG_ASSERT(conversion_attempt.has_value(), "shard manager message conversion failed");
spdlog::info("got shard manager message");
ShardManagerMessages &&smm = std::move(conversion_attempt.value());
shard_manager_.Receive(std::forward<ShardManagerMessages>(smm), request_envelope.request_id,
// treat this as a message to a specific shard rsm and cast it accordingly
std::optional<ShardMessages> conversion_attempt =
ConvertVariant<AllMessages, ReadRequest<StorageReadRequest>, AppendRequest<StorageWriteRequest>,
AppendResponse, WriteRequest<StorageWriteRequest>, VoteRequest, VoteResponse>(
MG_ASSERT(conversion_attempt.has_value(), "shard rsm message conversion failed for {}",
spdlog::info("got shard rsm message");
ShardMessages &&sm = std::move(conversion_attempt.value());
shard_manager_.Route(std::forward<ShardMessages>(sm), request_envelope.request_id, request_envelope.to_address,
Time Cron() {
spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());
return shard_manager_.Cron();
} // namespace memgraph::machine_manager
Normal file
Normal file
@ -0,0 +1,222 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <queue>
#include <set>
#include <boost/uuid/uuid.hpp>
#include <coordinator/coordinator.hpp>
#include <io/address.hpp>
#include <io/message_conversion.hpp>
#include <io/messages.hpp>
#include <io/rsm/raft.hpp>
#include <io/rsm/shard_rsm.hpp>
#include <io/time.hpp>
#include <io/transport.hpp>
namespace memgraph::storage::v3 {
using boost::uuids::uuid;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
using memgraph::coordinator::HeartbeatRequest;
using memgraph::coordinator::HeartbeatResponse;
using memgraph::io::Address;
using memgraph::io::Duration;
using memgraph::io::Message;
using memgraph::io::RequestId;
using memgraph::io::ResponseFuture;
using memgraph::io::Time;
using memgraph::io::messages::CoordinatorMessages;
using memgraph::io::messages::ShardManagerMessages;
using memgraph::io::messages::ShardMessages;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ShardRsm;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using ShardManagerOrRsmMessage = std::variant<ShardMessages, ShardManagerMessages>;
using TimeUuidPair = std::pair<Time, uuid>;
template <typename IoImpl>
using ShardRaft =
Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
using namespace std::chrono_literals;
static constexpr Duration kMinimumCronInterval = 1000ms;
static constexpr Duration kMaximumCronInterval = 2000ms;
static_assert(kMinimumCronInterval < kMaximumCronInterval,
"The minimum cron interval has to be smaller than the maximum cron interval!");
/// The ShardManager is responsible for:
/// * reconciling the storage engine's local configuration with the Coordinator's
/// intentions for how it should participate in multiple raft clusters
/// * replying to heartbeat requests to the Coordinator
/// * routing incoming messages to the appropriate sRSM
/// Every storage engine has exactly one RsmEngine.
template <typename IoImpl>
class ShardManager {
ShardManager(io::Io<IoImpl> io, Address coordinator_leader) : io_(io), coordinator_leader_(coordinator_leader) {}
/// Periodic protocol maintenance. Returns the time that Cron should be called again
/// in the future.
Time Cron() {
spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
Time now = io_.Now();
if (now >= next_cron_) {
std::uniform_int_distribution time_distrib(kMinimumCronInterval.count(), kMaximumCronInterval.count());
const auto rand = io_.Rand(time_distrib);
next_cron_ = now + Duration{rand};
if (!cron_schedule_.empty()) {
const auto &[time, uuid] =;
if (time <= now) {
auto &rsm =;
Time next_for_uuid = rsm.Cron();
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
const auto &[next_time, _uuid] =;
return std::min(next_cron_, next_time);
return next_cron_;
/// Returns the Address for our underlying Io implementation
Address GetAddress() { return io_.GetAddress(); }
void Receive(ShardManagerMessages &&smm, RequestId request_id, Address from) {}
void Route(ShardMessages &&sm, RequestId request_id, Address to, Address from) {
Address address = io_.GetAddress();
MG_ASSERT(address.last_known_port == to.last_known_port);
MG_ASSERT(address.last_known_ip == to.last_known_ip);
auto &rsm =;
rsm.Handle(std::forward<ShardMessages>(sm), request_id, from);
io::Io<IoImpl> io_;
std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
Time next_cron_;
Address coordinator_leader_;
std::optional<ResponseFuture<WriteResponse<CoordinatorWriteResponses>>> heartbeat_res_;
// TODO(tyler) over time remove items from initialized_but_not_confirmed_rsm_
// after the Coordinator is clearly aware of them
std::set<boost::uuids::uuid> initialized_but_not_confirmed_rsm_;
void Reconciliation() {
if (heartbeat_res_.has_value()) {
if (heartbeat_res_->IsReady()) {
io::ResponseResult<WriteResponse<CoordinatorWriteResponses>> response_result =
if (response_result.HasError()) {
spdlog::error("SM timed out while trying to reach C");
} else {
auto response_envelope = response_result.GetValue();
WriteResponse<CoordinatorWriteResponses> wr = response_envelope.message;
if (wr.retry_leader.has_value()) {
spdlog::info("SM redirected to new C leader");
coordinator_leader_ = wr.retry_leader.value();
} else if (wr.success) {
CoordinatorWriteResponses cwr = wr.write_return;
HeartbeatResponse hr = std::get<HeartbeatResponse>(cwr);
spdlog::info("SM received heartbeat response from C");
} else {
HeartbeatRequest req{
.from_storage_manager = GetAddress(),
.initialized_rsms = initialized_but_not_confirmed_rsm_,
CoordinatorWriteRequests cwr = req;
WriteRequest<CoordinatorWriteRequests> ww;
ww.operation = cwr;
spdlog::info("SM sending heartbeat to coordinator {}", coordinator_leader_.ToString());
io_.template Request<WriteRequest<CoordinatorWriteRequests>, WriteResponse<CoordinatorWriteResponses>>(
coordinator_leader_, ww)));
spdlog::info("SM sent heartbeat");
void EnsureShardsInitialized(HeartbeatResponse hr) {
for (const auto &shard_to_initialize : hr.shards_to_initialize) {
/// Returns true if the RSM was able to be initialized, and false if it was already initialized
void InitializeRsm(coordinator::ShardToInitialize to_init) {
if (rsm_map_.contains(to_init.uuid)) {
// it's not a bug for the coordinator to send us UUIDs that we have
// already created, because there may have been lag that caused
// the coordinator not to hear back from us.
auto rsm_io = io_.ForkLocal();
auto io_addr = rsm_io.GetAddress();
io_addr.unique_id = to_init.uuid;
// TODO(tyler) get geers from Coordinator in HeartbeatResponse
std::vector<Address> rsm_peers = {};
// TODO(everbody) change this to storage::Shard
ShardRsm rsm_state{};
ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};
spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
rsm_map_.emplace(to_init.uuid, std::move(rsm));
} // namespace memgraph::storage::v3
@ -11,11 +11,13 @@ function(add_simulation_test test_cpp)
get_filename_component(exec_name ${test_cpp} NAME_WE)
set(target_name ${test_prefix}${exec_name})
add_executable(${target_name} ${test_cpp})
# OUTPUT_NAME sets the real name of a target when it is built and can be
# used to help create two targets of the same name even though CMake
# requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} mg-storage-v3 mg-communication gtest gmock mg-utils mg-io mg-io-simulator Boost::headers)
target_link_libraries(${target_name} mg-storage-v3 mg-communication gtest gmock mg-utils mg-io mg-io-simulator mg-coordinator Boost::headers)
# register test
add_test(${target_name} ${exec_name})
add_dependencies(memgraph__simulation ${target_name})
@ -32,13 +32,14 @@
#include "storage/v3/schemas.hpp"
#include "utils/result.hpp"
using memgraph::common::SchemaType;
using memgraph::coordinator::AddressAndStatus;
using memgraph::coordinator::CompoundKey;
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorClient;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::HlcRequest;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::PrimaryKey;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::coordinator::Shards;
@ -65,9 +66,11 @@ using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
using memgraph::storage::v3::SchemaProperty;
using memgraph::utils::BasicResult;
using PrimaryKey = std::vector<PropertyValue>;
using ShardClient =
RsmClient<SimulatorTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
namespace {
@ -83,18 +86,20 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
const auto properties = sm.AllocatePropertyIds(property_names);
const auto property_id_1 ="property_1");
const auto property_id_2 ="property_2");
const auto type_1 = memgraph::common::SchemaType::INT;
const auto type_2 = memgraph::common::SchemaType::INT;
const auto type_1 = SchemaType::INT;
const auto type_2 = SchemaType::INT;
// register new label space
std::vector<SchemaProperty> schema = {
SchemaProperty{.property_id = property_id_1, .type = type_1},
SchemaProperty{.property_id = property_id_2, .type = type_2},
bool label_success = sm.InitializeNewLabel(label_name, schema, sm.shard_map_version);
size_t replication_factor = 3;
std::optional<LabelId> label_id_opt =
sm.InitializeNewLabel(label_name, schema, replication_factor, sm.shard_map_version);
const LabelId label_id =;
const LabelId label_id = label_id_opt.value();
auto &label_space =;
Shards &shards_for_label = label_space.shards;
@ -105,9 +110,9 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
Shard shard1 = {aas1_1, aas1_2, aas1_3};
const auto key1 = memgraph::storage::v3::PropertyValue(0);
const auto key2 = memgraph::storage::v3::PropertyValue(0);
const CompoundKey compound_key_1 = {key1, key2};
const auto key1 = PropertyValue(0);
const auto key2 = PropertyValue(0);
const PrimaryKey compound_key_1 = {key1, key2};
shards_for_label.emplace(compound_key_1, shard1);
// add second shard at [12, 13]
@ -117,9 +122,9 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
Shard shard2 = {aas2_1, aas2_2, aas2_3};
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey compound_key_2 = {key3, key4};
auto key3 = PropertyValue(12);
auto key4 = PropertyValue(13);
PrimaryKey compound_key_2 = {key3, key4};
shards_for_label[compound_key_2] = shard2;
return sm;
@ -263,11 +268,11 @@ int main() {
req.last_shard_map_version = client_shard_map.GetHlc();
while (true) {
// Create CompoundKey
const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
// Create PrimaryKey
const auto cm_key_1 = PropertyValue(3);
const auto cm_key_2 = PropertyValue(4);
const CompoundKey compound_key = {cm_key_1, cm_key_2};
const PrimaryKey compound_key = {cm_key_1, cm_key_2};
// Look for Shard
BasicResult<TimedOut, memgraph::coordinator::CoordinatorWriteResponses> read_res =
@ -414,10 +414,14 @@ add_custom_target(test_lcp ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_lcp)
add_test(test_lcp ${CMAKE_CURRENT_BINARY_DIR}/test_lcp)
add_dependencies(memgraph__unit test_lcp)
# Test future
# Test Future
target_link_libraries(${test_prefix}future mg-io)
# Test Local Transport
# Test LocalTransport
target_link_libraries(${test_prefix}local_transport mg-io)
# Test MachineManager with LocalTransport
target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-storage-v3)
Normal file
Normal file
@ -0,0 +1,209 @@
// Copyright 2022 Memgraph Ltd.
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <limits>
#include <thread>
#include <gtest/gtest.h>
#include <coordinator/coordinator.hpp>
#include <coordinator/coordinator_client.hpp>
#include <coordinator/hybrid_logical_clock.hpp>
#include <coordinator/shard_map.hpp>
#include <io/local_transport/local_system.hpp>
#include <io/local_transport/local_transport.hpp>
#include <io/rsm/rsm_client.hpp>
#include <io/transport.hpp>
#include <machine_manager/machine_config.hpp>
#include <machine_manager/machine_manager.hpp>
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp"
namespace memgraph::io::tests {
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorClient;
using memgraph::coordinator::CoordinatorReadRequests;
using memgraph::coordinator::CoordinatorReadResponses;
using memgraph::coordinator::CoordinatorWriteRequests;
using memgraph::coordinator::CoordinatorWriteResponses;
using memgraph::coordinator::Hlc;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::io::Io;
using memgraph::io::local_transport::LocalSystem;
using memgraph::io::local_transport::LocalTransport;
using memgraph::io::rsm::RsmClient;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::machine_manager::MachineConfig;
using memgraph::machine_manager::MachineManager;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::SchemaProperty;
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
using ShardClient =
RsmClient<LocalTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
ShardMap TestShardMap() {
ShardMap sm{};
const std::string label_name = std::string("test_label");
// register new properties
const std::vector<std::string> property_names = {"property_1", "property_2"};
const auto properties = sm.AllocatePropertyIds(property_names);
const auto property_id_1 ="property_1");
const auto property_id_2 ="property_2");
const auto type_1 = memgraph::common::SchemaType::INT;
const auto type_2 = memgraph::common::SchemaType::INT;
// register new label space
std::vector<SchemaProperty> schema = {
SchemaProperty{.property_id = property_id_1, .type = type_1},
SchemaProperty{.property_id = property_id_2, .type = type_2},
const size_t replication_factor = 1;
std::optional<LabelId> label_id = sm.InitializeNewLabel(label_name, schema, replication_factor, sm.shard_map_version);
// split the shard at N split points
// NB: this is the logic that should be provided by the "split file"
// TODO(tyler) split points should account for signedness
const size_t n_splits = 16;
const auto split_interval = std::numeric_limits<int64_t>::max() / n_splits;
for (int64_t i = 0; i < n_splits; ++i) {
const int64_t value = i * split_interval;
const auto key1 = memgraph::storage::v3::PropertyValue(value);
const auto key2 = memgraph::storage::v3::PropertyValue(0);
const CompoundKey split_point = {key1, key2};
const bool split_success = sm.SplitShard(sm.shard_map_version, label_id.value(), split_point);
return sm;
MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr,
ShardMap shard_map) {
MachineConfig config{
.coordinator_addresses = coordinator_addresses,
.is_storage = true,
.is_coordinator = true,
.listen_ip = addr.last_known_ip,
.listen_port = addr.last_known_port,
Io<LocalTransport> io = local_system.Register(addr);
Coordinator coordinator{shard_map};
return MachineManager{io, config, coordinator};
void RunMachine(MachineManager<LocalTransport> mm) { mm.Run(); }
void WaitForShardsToInitialize(CoordinatorClient<LocalTransport> &cc) {
// TODO(tyler) call coordinator client's read method for GetShardMap
// and keep reading it until the shard map contains proper replicas
// for each shard in the label space.
TEST(MachineManager, BasicFunctionality) {
LocalSystem local_system;
auto cli_addr = Address::TestAddress(1);
auto machine_1_addr = cli_addr.ForkUniqueAddress();
Io<LocalTransport> cli_io = local_system.Register(cli_addr);
auto coordinator_addresses = std::vector{
ShardMap initialization_sm = TestShardMap();
auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm);
Address coordinator_address = mm_1.CoordinatorAddress();
auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
// TODO(tyler) clarify addresses of coordinator etc... as it's a mess
CoordinatorClient<LocalTransport> cc{cli_io, coordinator_address, {coordinator_address}};
using namespace std::chrono_literals;
// get ShardMap from coordinator
memgraph::coordinator::HlcRequest req;
req.last_shard_map_version = Hlc{
.logical_id = 0,
BasicResult<TimedOut, memgraph::coordinator::CoordinatorWriteResponses> read_res = cc.SendWriteRequest(req);
MG_ASSERT(!read_res.HasError(), "HLC request unexpectedly timed out");
auto coordinator_read_response = read_res.GetValue();
HlcResponse hlc_response = std::get<HlcResponse>(coordinator_read_response);
ShardMap sm = hlc_response.fresher_shard_map.value();
// Get shard for key and create rsm client
const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
const CompoundKey compound_key = {cm_key_1, cm_key_2};
std::string label_name = "test_label";
Shard shard_for_key = sm.GetShardForKey(label_name, compound_key);
auto shard_for_client = std::vector<Address>{};
for (const auto &aas : shard_for_key) {
spdlog::info("got address for shard: {}", aas.address.ToString());
ShardClient shard_client{cli_io, shard_for_client[0], shard_for_client};
// submit a read request and assert that the requested key does not yet exist
LabelId label_id =;
StorageReadRequest storage_get_req;
storage_get_req.label_id = label_id;
storage_get_req.key = compound_key;
storage_get_req.transaction_id = hlc_response.new_hlc;
auto get_response_result = shard_client.SendReadRequest(storage_get_req);
auto get_response = get_response_result.GetValue();
auto val = get_response.value;
} // namespace memgraph::io::tests
Reference in New Issue
Block a user