Merge project-pineapples

Kostas Kyrimis 2022-09-06 13:05:48 +03:00
commit 5fa4e8c33a
10 changed files with 192 additions and 182 deletions


@ -11,7 +11,11 @@
#pragma once
#include <optional>
#include <string>
#include <unordered_set>
#include <variant>
#include <vector>
#include "coordinator/hybrid_logical_clock.hpp"
#include "coordinator/shard_map.hpp"
@ -19,6 +23,7 @@
#include "io/time.hpp"
#include "io/transport.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp"
namespace memgraph::coordinator {
@ -26,6 +31,7 @@ using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyId;
using Address = memgraph::io::Address;
using SimT = memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::SchemaProperty;
struct HlcRequest {
Hlc last_shard_map_version;
@ -100,6 +106,7 @@ struct DeregisterStorageEngineResponse {
struct InitializeLabelRequest {
std::string label_name;
std::vector<SchemaProperty> schema;
Hlc last_shard_map_version;
};
@ -108,49 +115,65 @@ struct InitializeLabelResponse {
std::optional<ShardMap> fresher_shard_map;
};
using WriteRequests =
std::variant<AllocateHlcBatchRequest, AllocateEdgeIdBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest>;
using WriteResponses = std::variant<AllocateHlcBatchResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse,
InitializeLabelResponse, AllocatePropertyIdsResponse>;
struct HeartbeatRequest {};
struct HeartbeatResponse {};
using ReadRequests = std::variant<HlcRequest, GetShardMapRequest>;
using ReadResponses = std::variant<HlcResponse, GetShardMapResponse>;
using CoordinatorWriteRequests =
std::variant<HlcRequest, AllocateEdgeIdBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest>;
using CoordinatorWriteResponses =
std::variant<HlcResponse, AllocateEdgeIdBatchResponse, SplitShardResponse, RegisterStorageEngineResponse,
DeregisterStorageEngineResponse, InitializeLabelResponse, AllocatePropertyIdsResponse>;
using CoordinatorReadRequests = std::variant<GetShardMapRequest, HeartbeatRequest>;
using CoordinatorReadResponses = std::variant<GetShardMapResponse, HeartbeatResponse>;
class Coordinator {
public:
explicit Coordinator(ShardMap sm) : shard_map_{std::move(sm)} {}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
CoordinatorReadResponses Read(CoordinatorReadRequests requests) {
return std::visit([&](auto &&request) { return HandleRead(std::forward<decltype(request)>(request)); },
std::move(requests)); // NOLINT(hicpp-move-const-arg,performance-move-const-arg)
}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
CoordinatorWriteResponses Apply(CoordinatorWriteRequests requests) {
return std::visit([&](auto &&request) mutable { return ApplyWrite(std::forward<decltype(request)>(request)); },
std::move(requests));
}
private:
ShardMap shard_map_;
/// The highest reserved timestamp / highest allocated timestamp
/// is a way of minimizing the communication involved when query
/// engines reserve Hlcs for their transaction processing.
/// Periodically, the coordinator will allocate a batch of timestamps
/// and this will need to go over consensus. From that point forward,
/// each timestamp in that batch can be given out to "readers" who issue
/// HlcRequest without blocking on consensus first. But if
/// highest_allocated_timestamp_ approaches highest_reserved_timestamp_,
/// it is time to allocate another batch, so that we can keep guaranteeing
/// forward progress.
/// Any time a coordinator becomes a new leader, it will need to issue
/// a new AllocateHlcBatchRequest to create a pool of IDs to allocate.
uint64_t highest_allocated_timestamp_;
uint64_t highest_reserved_timestamp_;
/// Query engines need to periodically request batches of unique edge IDs.
uint64_t highest_allocated_edge_id_;
/// Increment our
ReadResponses HandleRead(HlcRequest &&hlc_request) {
static CoordinatorReadResponses HandleRead(HeartbeatRequest && /* heartbeat_request */) {
return HeartbeatResponse{};
}
CoordinatorReadResponses HandleRead(GetShardMapRequest && /* get_shard_map_request */) {
GetShardMapResponse res;
res.shard_map = shard_map_;
return res;
}
CoordinatorWriteResponses ApplyWrite(HlcRequest &&hlc_request) {
HlcResponse res{};
auto hlc_shard_map = shard_map_.GetHlc();
MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id));
res.new_hlc = shard_map_.IncrementShardMapVersion();
// res.fresher_shard_map = hlc_request.last_shard_map_version.logical_id < hlc_shard_map.logical_id
// ? std::make_optional(shard_map_)
// : std::nullopt;
res.new_hlc = Hlc{
.logical_id = ++highest_allocated_timestamp_,
// TODO(tyler) probably pass some more context to the Coordinator here
// so that we can use our wall clock and enforce monotonicity.
// .coordinator_wall_clock = io_.Now(),
};
// Always return a fresher shard_map for now.
res.fresher_shard_map = std::make_optional(shard_map_);
@ -158,19 +181,7 @@ class Coordinator {
return res;
}
ReadResponses HandleRead(GetShardMapRequest &&get_shard_map_request) {
GetShardMapResponse res;
res.shard_map = shard_map_;
return res;
}
WriteResponses ApplyWrite(AllocateHlcBatchRequest &&ahr) {
AllocateHlcBatchResponse res{};
return res;
}
WriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) {
CoordinatorWriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) {
AllocateEdgeIdBatchResponse res{};
uint64_t low = highest_allocated_edge_id_;
@ -188,7 +199,7 @@ class Coordinator {
/// This splits the shard immediately beneath the provided
/// split key, keeping the assigned peers identical for now,
/// but letting them be gradually migrated over time.
WriteResponses ApplyWrite(SplitShardRequest &&split_shard_request) {
CoordinatorWriteResponses ApplyWrite(SplitShardRequest &&split_shard_request) {
SplitShardResponse res{};
if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
@ -203,7 +214,7 @@ class Coordinator {
/// This adds the provided storage engine to the standby storage engine pool,
/// which can be used to rebalance storage over time.
WriteResponses ApplyWrite(RegisterStorageEngineRequest &&register_storage_engine_request) {
static CoordinatorWriteResponses ApplyWrite(RegisterStorageEngineRequest && /* register_storage_engine_request */) {
RegisterStorageEngineResponse res{};
// TODO
@ -212,7 +223,7 @@ class Coordinator {
/// This begins the process of draining the provided storage engine from all raft
/// clusters that it might be participating in.
WriteResponses ApplyWrite(DeregisterStorageEngineRequest &&register_storage_engine_request) {
static CoordinatorWriteResponses ApplyWrite(DeregisterStorageEngineRequest && /* register_storage_engine_request */) {
DeregisterStorageEngineResponse res{};
// TODO
// const Address &address = register_storage_engine_request.address;
@ -222,10 +233,10 @@ class Coordinator {
return res;
}
WriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request) {
CoordinatorWriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request) {
InitializeLabelResponse res{};
bool success = shard_map_.InitializeNewLabel(initialize_label_request.label_name,
bool success = shard_map_.InitializeNewLabel(initialize_label_request.label_name, initialize_label_request.schema,
initialize_label_request.last_shard_map_version);
if (success) {
@ -239,7 +250,7 @@ class Coordinator {
return res;
}
WriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) {
CoordinatorWriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) {
AllocatePropertyIdsResponse res{};
auto property_ids = shard_map_.AllocatePropertyIds(allocate_property_ids_request.property_names);
@ -248,19 +259,6 @@ class Coordinator {
return res;
}
public:
explicit Coordinator(ShardMap sm) : shard_map_{(sm)} {}
ReadResponses Read(ReadRequests requests) {
return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },
std::move(requests));
}
WriteResponses Apply(WriteRequests requests) {
return std::visit([&](auto &&request) mutable { return ApplyWrite(std::forward<decltype(request)>(request)); },
std::move(requests));
}
};
} // namespace memgraph::coordinator
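The timestamp-batching scheme described in the Coordinator's comments above can be illustrated with a small stand-alone sketch. The names HlcAllocatorSketch, kBatchSize, and kLowWaterMark are hypothetical and not part of this commit; the real coordinator would replenish the reservation by replicating an AllocateHlcBatchRequest through consensus.

#include <cstdint>

// Hypothetical sketch of the reserve-then-allocate pattern: a batch of
// timestamps is reserved once (through consensus in the real system), and
// individual timestamps are then handed out locally until the batch nears
// exhaustion.
class HlcAllocatorSketch {
 public:
  uint64_t Allocate() {
    if (highest_allocated_ + kLowWaterMark >= highest_reserved_) {
      // Stand-in for issuing an AllocateHlcBatchRequest over consensus.
      highest_reserved_ += kBatchSize;
    }
    return ++highest_allocated_;
  }

 private:
  static constexpr uint64_t kBatchSize = 1024;
  static constexpr uint64_t kLowWaterMark = 64;
  uint64_t highest_allocated_ = 0;
  uint64_t highest_reserved_ = 0;
};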


@ -19,6 +19,6 @@ namespace memgraph::coordinator {
using memgraph::io::rsm::RsmClient;
template <typename IoImpl>
using CoordinatorClient = RsmClient<IoImpl, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
using CoordinatorClient = RsmClient<IoImpl, CoordinatorWriteRequests, CoordinatorWriteResponses,
CoordinatorReadRequests, CoordinatorReadResponses>;
} // namespace memgraph::coordinator


@ -17,7 +17,7 @@
namespace memgraph::coordinator {
template <typename IoImpl>
using CoordinatorRsm =
memgraph::io::rsm::Raft<IoImpl, Coordinator, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
using CoordinatorRsm = memgraph::io::rsm::Raft<IoImpl, Coordinator, CoordinatorWriteRequests, CoordinatorWriteResponses,
CoordinatorReadRequests, CoordinatorReadResponses>;
} // namespace memgraph::coordinator


@ -18,12 +18,14 @@
#include "io/address.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schemas.hpp"
namespace memgraph::coordinator {
using memgraph::io::Address;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyId;
using memgraph::storage::v3::SchemaProperty;
enum class Status : uint8_t {
CONSENSUS_PARTICIPANT,
@ -36,7 +38,6 @@ enum class Status : uint8_t {
struct AddressAndStatus {
memgraph::io::Address address;
Status status;
friend bool operator<(const AddressAndStatus &lhs, const AddressAndStatus &rhs) { return lhs.address < rhs.address; }
};
@ -47,18 +48,30 @@ using LabelName = std::string;
using PropertyName = std::string;
using PropertyMap = std::map<PropertyName, PropertyId>;
struct LabelSpace {
std::vector<SchemaProperty> schema;
std::map<CompoundKey, Shard> shards;
};
struct ShardMap {
Hlc shard_map_version;
uint64_t max_property_id;
std::map<PropertyName, PropertyId> properties;
uint64_t max_label_id;
std::map<LabelName, LabelId> labels;
std::map<LabelId, Shards> shards;
std::map<LabelId, LabelSpace> label_spaces;
std::map<LabelId, std::vector<SchemaProperty>> schemas;
Shards GetShards(const LabelName &label) {
const auto id = labels[label];
auto &shards = label_spaces[id].shards;
return shards;
}
auto FindShardToInsert(const LabelName &name, CompoundKey &key) {
MG_ASSERT(labels.contains(name));
const auto id = labels.find(name)->second;
auto &shards_ref = shards[id];
auto &shards_ref = label_spaces[id].shards;
auto it =
std::find_if(shards_ref.rbegin(), shards_ref.rend(), [&key](const auto &shard) { return shard.first <= key; });
MG_ASSERT(it != shards_ref.rbegin());
@ -74,50 +87,42 @@ struct ShardMap {
Hlc GetHlc() const noexcept { return shard_map_version; }
bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, CompoundKey key) {
bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const CompoundKey &key) {
if (previous_shard_map_version != shard_map_version) {
return false;
}
if (!shards.contains(label_id)) {
return false;
}
auto &label_space = label_spaces.at(label_id);
auto &shards_in_map = label_space.shards;
auto &shards_in_map = shards.at(label_id);
MG_ASSERT(!shards_in_map.contains(key));
MG_ASSERT(label_spaces.contains(label_id));
// Finding the Shard that the new CompoundKey should map to.
Shard shard_to_map_to;
CompoundKey prev_key = ((*shards_in_map.begin()).first);
for (auto iter = std::next(shards_in_map.begin()); iter != shards_in_map.end(); ++iter) {
const auto &current_key = (*iter).first;
if (key > prev_key && key < current_key) {
shard_to_map_to = shards_in_map[prev_key];
}
prev_key = (*iter).first;
}
auto prev = std::prev(shards_in_map.upper_bound(key));
Shard duplicated_shard = prev->second;
// Apply the split
shards_in_map[key] = shard_to_map_to;
shards_in_map[key] = duplicated_shard;
return true;
}
bool InitializeNewLabel(std::string label_name, Hlc last_shard_map_version) {
if (shard_map_version != last_shard_map_version) {
return false;
}
if (labels.contains(label_name)) {
bool InitializeNewLabel(std::string label_name, std::vector<SchemaProperty> schema, Hlc last_shard_map_version) {
if (shard_map_version != last_shard_map_version || labels.contains(label_name)) {
return false;
}
const LabelId label_id = LabelId::FromUint(++max_label_id);
labels.emplace(label_name, label_id);
shards.emplace(label_id, Shards{});
labels.emplace(std::move(label_name), label_id);
LabelSpace label_space{
.schema = std::move(schema),
.shards = Shards{},
};
label_spaces.emplace(label_id, label_space);
IncrementShardMapVersion();
@ -129,46 +134,44 @@ struct ShardMap {
}
LabelId GetLabelId(const std::string &label) const { return labels.at(label); }
Shards GetShardsForRange(LabelName label_name, CompoundKey start_key, CompoundKey end_key) {
Shards GetShardsForRange(const LabelName &label_name, const CompoundKey &start_key,
const CompoundKey &end_key) const {
MG_ASSERT(start_key <= end_key);
MG_ASSERT(labels.contains(label_name));
LabelId label_id = labels.at(label_name);
const auto &shard_for_label = shards.at(label_id);
const auto &label_space = label_spaces.at(label_id);
auto it = std::prev(shard_for_label.upper_bound(start_key));
const auto end_it = shard_for_label.upper_bound(end_key);
const auto &shards_for_label = label_space.shards;
MG_ASSERT(shards_for_label.begin()->first <= start_key,
"the ShardMap must always contain a minimal key that is less than or equal to any requested key");
auto it = std::prev(shards_for_label.upper_bound(start_key));
const auto end_it = shards_for_label.upper_bound(end_key);
Shards shards{};
for (; it != end_it; it++) {
shards.emplace(it->first, it->second);
}
std::copy(it, end_it, std::inserter(shards, shards.end()));
return shards;
}
Shard GetShardForKey(LabelName label_name, CompoundKey key) {
Shard GetShardForKey(const LabelName &label_name, const CompoundKey &key) const {
MG_ASSERT(labels.contains(label_name));
LabelId label_id = labels.at(label_name);
const auto &shard_for_label = shards.at(label_id);
const auto &label_space = label_spaces.at(label_id);
return std::prev(shard_for_label.upper_bound(key))->second;
MG_ASSERT(label_space.shards.begin()->first <= key,
"the ShardMap must always contain a minimal key that is less than or equal to any requested key");
return std::prev(label_space.shards.upper_bound(key))->second;
}
Shard GetShardForKey(LabelId label_id, CompoundKey key) {
MG_ASSERT(shards.contains(label_id));
const auto &shard_for_label = shards.at(label_id);
return std::prev(shard_for_label.upper_bound(key))->second;
}
PropertyMap AllocatePropertyIds(std::vector<PropertyName> &new_properties) {
PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties) {
PropertyMap ret{};
bool mutated = false;
@ -181,9 +184,7 @@ struct ShardMap {
mutated = true;
const PropertyId property_id = PropertyId::FromUint(++max_property_id);
ret.emplace(property_name, property_id);
properties.emplace(property_name, property_id);
}
}
@ -195,12 +196,12 @@ struct ShardMap {
return ret;
}
std::optional<PropertyId> GetPropertyId(std::string &property_name) {
std::optional<PropertyId> GetPropertyId(const std::string &property_name) const {
if (properties.contains(property_name)) {
return properties.at(property_name);
} else {
return std::nullopt;
}
return std::nullopt;
}
};
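The lookup convention used throughout ShardMap (GetShardForKey, GetShardsForRange, and the rewritten SplitShard) keys an ordered map by each shard's minimum key, so std::prev(map.upper_bound(key)) yields the shard that owns the key, provided a minimal key is always present. A minimal stand-alone illustration, with int and std::string standing in for CompoundKey and Shard:

#include <cassert>
#include <map>
#include <string>

int main() {
  // Map keys are shard lower bounds; the values stand in for Shard.
  std::map<int, std::string> shards{{0, "shard-a"}, {12, "shard-b"}, {100, "shard-c"}};

  // upper_bound(25) points at {100, ...}; std::prev gives {12, "shard-b"},
  // the shard whose range [12, 100) contains 25.
  assert(std::prev(shards.upper_bound(25))->second == "shard-b");

  // A split at 50 duplicates the covering shard under the new minimum key,
  // mirroring the SplitShard logic above.
  shards[50] = std::prev(shards.upper_bound(50))->second;
  assert(shards.at(50) == "shard-b");
}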


@ -45,6 +45,15 @@ struct Address {
};
}
/// Returns a new Address with the same IP and port but a fresh unique UUID.
Address ForkUniqueAddress() {
return Address{
.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
.last_known_ip = last_known_ip,
.last_known_port = last_known_port,
};
}
friend bool operator==(const Address &lhs, const Address &rhs) = default;
/// unique_id is most dominant for ordering, then last_known_ip, then last_known_port
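ForkUniqueAddress lets several in-process state machines share one endpoint while staying distinguishable by UUID. A hedged, self-contained sketch with a simplified stand-in struct (AddressSketch is illustrative, not the real memgraph::io::Address, which carries an Ip type rather than a raw integer):

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <cstdint>

struct AddressSketch {
  boost::uuids::uuid unique_id;
  uint32_t last_known_ip;
  uint16_t last_known_port;

  AddressSketch ForkUniqueAddress() const {
    // Same endpoint, new identity: only the UUID changes.
    return AddressSketch{boost::uuids::random_generator()(), last_known_ip, last_known_port};
  }
};

int main() {
  AddressSketch machine{boost::uuids::random_generator()(), 0x7f000001, 7687};
  // Several RSMs can share one simulated endpoint yet remain distinguishable.
  AddressSketch coordinator = machine.ForkUniqueAddress();
  AddressSketch shard = machine.ForkUniqueAddress();
  static_cast<void>(coordinator);
  static_cast<void>(shard);
}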


@ -41,8 +41,6 @@ class RsmClient {
Io<IoImpl> io_;
Address leader_;
std::mt19937 cli_rng_{0};
ServerPool server_addrs_;
template <typename ResponseT>
@ -53,7 +51,7 @@ class RsmClient {
spdlog::debug("client redirected to leader server {}", leader_.ToString());
} else if (!response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
size_t addr_index = addr_distrib(cli_rng_);
size_t addr_index = io_.Rand(addr_distrib);
leader_ = server_addrs_[addr_index];
spdlog::debug(
@ -84,7 +82,6 @@ class RsmClient {
if (response_result.HasError()) {
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
// continue;
return response_result.GetError();
}
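The hunk above drops the client's private std::mt19937 and draws fallback-leader indices through io_.Rand instead, presumably so that the simulator's rng_seed governs all randomness and failed-leader retries replay deterministically. A hedged, self-contained sketch of that pattern; SimIoSketch and PickFallbackLeader are illustrative names, not APIs from this commit:

#include <cstddef>
#include <random>
#include <vector>

// The transport owns the RNG, so every random choice a client makes is
// reproducible from a single seed (SimulatorConfig::rng_seed in the real code).
struct SimIoSketch {
  std::mt19937 rng{0};
  template <typename Distribution>
  auto Rand(Distribution &distrib) { return distrib(rng); }
};

template <typename Io, typename Address>
Address PickFallbackLeader(Io &io, const std::vector<Address> &server_addrs) {
  std::uniform_int_distribution<size_t> addr_distrib(0, server_addrs.size() - 1);
  return server_addrs[io.Rand(addr_distrib)];  // mirrors leader_ = server_addrs_[io_.Rand(addr_distrib)]
}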


@ -32,8 +32,6 @@
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "io/rsm/raft.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/logging.hpp"
@ -41,14 +39,10 @@
namespace memgraph::io::rsm {
using memgraph::coordinator::Hlc;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>;
using ShardRsmKey = std::vector<PropertyValue>;
struct StorageWriteRequest {
LabelId label_id;
@ -84,7 +78,7 @@ class ShardRsm {
Hlc shard_map_version_;
// Returns whether the key falls within this shard's key range.
bool IsKeyInRange(const ShardRsmKey &key) {
bool IsKeyInRange(const ShardRsmKey &key) const {
if (maximum_key_) [[likely]] {
return (key >= minimum_key_ && key <= maximum_key_);
}
@ -92,14 +86,14 @@ class ShardRsm {
}
public:
StorageReadResponse Read(StorageReadRequest request) {
StorageReadResponse Read(StorageReadRequest request) const {
StorageReadResponse ret;
if (!IsKeyInRange(request.key)) {
ret.latest_known_shard_map_version = shard_map_version_;
ret.shard_rsm_success = false;
} else if (state_.contains(request.key)) {
ret.value = state_[request.key];
ret.value = state_.at(request.key);
ret.shard_rsm_success = true;
} else {
ret.shard_rsm_success = false;
@ -151,7 +145,7 @@ class ShardRsm {
ret.last_value = std::nullopt;
ret.shard_rsm_success = true;
state_.emplace(request.key, std::move(request.value).value());
state_.emplace(request.key, request.value.value());
}
return ret;
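The Read path above becomes const, which is why state_[request.key] changes to state_.at(request.key): std::map::operator[] default-constructs missing entries and is therefore non-const. A hedged equivalent using find(), which also avoids the exception that at() throws for absent keys (the Lookup helper and KeySketch alias below are illustrative only):

#include <map>
#include <optional>
#include <vector>

using KeySketch = std::vector<int>;  // stand-in for ShardRsmKey

// Const-friendly lookup: no insertion, no exception on a missing key.
std::optional<int> Lookup(const std::map<KeySketch, int> &state, const KeySketch &key) {
  if (auto it = state.find(key); it != state.end()) {
    return it->second;
  }
  return std::nullopt;
}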


@ -115,12 +115,12 @@ class ShardRequestManager : public ShardRequestManagerInterface {
void StartTransaction() override {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
auto read_res = coord_cli_.SendReadRequest(req);
if (read_res.HasError()) {
auto write_res = coord_cli_.SendWriteRequest(req);
if (write_res.HasError()) {
throw std::runtime_error("HLC request failed");
}
auto coordinator_read_response = read_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_read_response);
auto coordinator_write_response = write_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response);
// Transaction ID to be used later...
transaction_id_ = hlc_response.new_hlc;
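StartTransaction now sends the HlcRequest through SendWriteRequest rather than SendReadRequest, which matches the coordinator side of this commit: allocating a timestamp mutates replicated state (highest_allocated_timestamp_), so it has to go through the Apply path. A hedged sketch of extracting the response from the write-response variant; the types below are simplified stand-ins, not the real message structs:

#include <cstdint>
#include <variant>

struct HlcResponseSketch { uint64_t new_hlc; };
struct OtherResponseSketch {};
using WriteResponsesSketch = std::variant<HlcResponseSketch, OtherResponseSketch>;

uint64_t ExtractTimestamp(const WriteResponsesSketch &response) {
  // std::get throws std::bad_variant_access if the coordinator replied with a
  // different alternative, mirroring the std::get call in StartTransaction above.
  return std::get<HlcResponseSketch>(response).new_hlc;
}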
@ -137,7 +137,9 @@ class ShardRequestManager : public ShardRequestManagerInterface {
std::vector<ScanVerticesResponse> responses;
auto &shard_cacheref = state.shard_cache;
size_t id = 0;
std::cout << "ScanVerticesRequest" << std::endl;
for (auto shard_it = shard_cacheref.begin(); shard_it != shard_cacheref.end(); ++id) {
std::cout << "ScanVerticesResponse" << std::endl;
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.second);
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture instead.
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
@ -215,27 +217,6 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return state.state != ExecutionState::INITIALIZING;
}
template <typename TRequest>
void MaybeUpdateExecutionState(TRequest &state) {
if (state.shard_cache) {
return;
}
state.transaction_id = transaction_id_;
state.shard_cache = std::make_optional<std::vector<Shard>>();
const auto &shards = shards_map_.shards[shards_map_.labels[state.label]];
if (state.key) {
if (auto it = shards.find(*state.key); it != shards.end()) {
state.shard_cache->push_back(it->second);
return;
}
// throw here
}
for (const auto &[key, shard] : shards) {
state.shard_cache->push_back(shard);
}
}
void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertexLabel> new_vertices) {
ThrowIfStateCompleted(state);
@ -271,12 +252,12 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return;
}
state.transaction_id = transaction_id_;
const auto &shards = shards_map_.shards[shards_map_.labels[*state.label]];
const auto shards = shards_map_.GetShards(*state.label);
for (const auto &[key, shard] : shards) {
state.shard_cache.push_back(shard);
state.shard_cache.push_back(std::move(shard));
ScanVerticesRequest rqst;
rqst.transaction_id = transaction_id_;
rqst.start_id.second = key;
rqst.start_id.second = std::move(key);
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
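Scan initialization above fans out one ScanVerticesRequest per shard, each seeded with that shard's minimum key taken straight from the ShardMap. A hedged stand-alone sketch of the same shape; RequestSketch and the int/string stand-ins are illustrative only:

#include <map>
#include <string>
#include <vector>

struct RequestSketch {
  int start_key;  // plays the role of rqst.start_id.second
};

// One request per shard, starting at that shard's minimum key.
std::vector<RequestSketch> BuildScanRequests(const std::map<int, std::string> &shards) {
  std::vector<RequestSketch> requests;
  for (const auto &[min_key, shard] : shards) {
    // The shard itself would be cached in state.shard_cache in the real code.
    static_cast<void>(shard);
    requests.push_back(RequestSketch{.start_key = min_key});
  }
  return requests;
}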


@ -19,6 +19,7 @@
#include <vector>
#include "common.hpp"
#include "common/types.hpp"
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "io/address.hpp"
@ -66,6 +67,7 @@ using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::SchemaProperty;
using memgraph::utils::BasicResult;
using ShardClient =
@ -79,12 +81,26 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
static const std::string label_name = std::string("test_label");
ShardMap sm;
// register new properties
const std::vector<std::string> property_names = {"property_1", "property_2"};
const auto properties = sm.AllocatePropertyIds(property_names);
const auto property_id_1 = properties.at("property_1");
const auto property_id_2 = properties.at("property_2");
const auto type_1 = memgraph::common::SchemaType::INT;
const auto type_2 = memgraph::common::SchemaType::INT;
// register new label space
bool label_success = sm.InitializeNewLabel(label_name, sm.shard_map_version);
std::vector<SchemaProperty> schema = {
SchemaProperty{.property_id = property_id_1, .type = type_1},
SchemaProperty{.property_id = property_id_2, .type = type_2},
};
bool label_success = sm.InitializeNewLabel(label_name, schema, sm.shard_map_version);
MG_ASSERT(label_success);
LabelId label_id = sm.labels.at(label_name);
Shards &shards_for_label = sm.shards.at(label_id);
const LabelId label_id = sm.labels.at(label_name);
auto &label_space = sm.label_spaces.at(label_id);
Shards &shards_for_label = label_space.shards;
// add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};


@ -18,6 +18,7 @@
#include <thread>
#include <vector>
#include "common/types.hpp"
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "io/address.hpp"
@ -27,8 +28,8 @@
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp"
#include "utils/result.hpp"
using memgraph::coordinator::AddressAndStatus;
@ -64,6 +65,7 @@ using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::SchemaProperty;
using memgraph::utils::BasicResult;
using ShardClient =
@ -72,17 +74,29 @@ namespace {
const std::string label_name = std::string("test_label");
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Address b_io_1, Address b_io_2,
Address b_io_3) {
ShardMap sm;
// register new properties
const std::vector<std::string> property_names = {"property_1", "property_2"};
const auto properties = sm.AllocatePropertyIds(property_names);
const auto property_id_1 = properties.at("property_1");
const auto property_id_2 = properties.at("property_2");
const auto type_1 = memgraph::common::SchemaType::INT;
const auto type_2 = memgraph::common::SchemaType::INT;
// register new label space
bool label_success = sm.InitializeNewLabel(label_name, sm.shard_map_version);
std::vector<SchemaProperty> schema = {
SchemaProperty{.property_id = property_id_1, .type = type_1},
SchemaProperty{.property_id = property_id_2, .type = type_2},
};
bool label_success = sm.InitializeNewLabel(label_name, schema, sm.shard_map_version);
MG_ASSERT(label_success);
LabelId label_id = sm.labels.at(label_name);
Shards &shards_for_label = sm.shards.at(label_id);
const LabelId label_id = sm.labels.at(label_name);
auto &label_space = sm.label_spaces.at(label_id);
Shards &shards_for_label = label_space.shards;
// add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};
@ -91,10 +105,10 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
Shard shard1 = {aas1_1, aas1_2, aas1_3};
auto key1 = memgraph::storage::v3::PropertyValue(0);
auto key2 = memgraph::storage::v3::PropertyValue(0);
CompoundKey compound_key_1 = {key1, key2};
shards_for_label[compound_key_1] = shard1;
const auto key1 = memgraph::storage::v3::PropertyValue(0);
const auto key2 = memgraph::storage::v3::PropertyValue(0);
const CompoundKey compound_key_1 = {key1, key2};
shards_for_label.emplace(compound_key_1, shard1);
// add second shard at [12, 13]
AddressAndStatus aas2_1{.address = b_io_1, .status = Status::CONSENSUS_PARTICIPANT};
@ -111,7 +125,7 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co
return sm;
}
std::optional<ShardClient> DetermineShardLocation(Shard target_shard, const std::vector<Address> &a_addrs,
std::optional<ShardClient> DetermineShardLocation(const Shard &target_shard, const std::vector<Address> &a_addrs,
ShardClient a_client, const std::vector<Address> &b_addrs,
ShardClient b_client) {
for (const auto &addr : target_shard) {
@ -129,20 +143,19 @@ std::optional<ShardClient> DetermineShardLocation(Shard target_shard, const std:
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteShardRsm = Raft<SimulatorTransport, ShardRsm, StorageWriteRequest, StorageWriteResponse,
ScanVerticesRequest, ScanVerticesResponse>;
StorageReadRequest, StorageReadResponse>;
template <typename IoImpl>
void RunStorageRaft(
Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, ScanVerticesRequest, ScanVerticesResponse>
server) {
Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse> server) {
server.Run();
}
int main() {
SimulatorConfig config{
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = false,
.drop_percent = 5,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
@ -257,7 +270,8 @@ int main() {
const CompoundKey compound_key = {cm_key_1, cm_key_2};
// Look for Shard
BasicResult<TimedOut, memgraph::coordinator::ReadResponses> read_res = coordinator_client.SendReadRequest(req);
BasicResult<TimedOut, memgraph::coordinator::CoordinatorWriteResponses> read_res =
coordinator_client.SendWriteRequest(req);
if (read_res.HasError()) {
// timeout