Merge branch 'project-pineapples' into T1185-MG-replace-skip-list

commit c24c699c78

This merge replaces the bare `using Shard = std::vector<AddressAndStatus>` alias with a `ShardMetadata` struct carrying the peer list plus a version counter, and updates all call sites to go through the new `peers` member.
@@ -283,7 +283,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
       // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info
       bool machine_contains_shard = false;
 
-      for (auto &aas : shard) {
+      for (auto &aas : shard.peers) {
         if (initialized.contains(aas.address.unique_id)) {
           machine_contains_shard = true;
           if (aas.status != Status::CONSENSUS_PARTICIPANT) {
@@ -311,7 +311,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
         }
       }
 
-      if (!machine_contains_shard && shard.size() < label_space.replication_factor) {
+      if (!machine_contains_shard && shard.peers.size() < label_space.replication_factor) {
         // increment version for each new uuid for deterministic creation
         IncrementShardMapVersion();
 
@@ -337,7 +337,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
             .status = Status::INITIALIZING,
         };
 
-        shard.emplace_back(aas);
+        shard.peers.emplace_back(aas);
       }
     }
   }
@@ -360,9 +360,9 @@ bool ShardMap::SplitShard(Hlc previous_shard_map_version, LabelId label_id, cons
   MG_ASSERT(!shards_in_map.contains(key));
   MG_ASSERT(label_spaces.contains(label_id));
 
-  // Finding the Shard that the new PrimaryKey should map to.
+  // Finding the ShardMetadata that the new PrimaryKey should map to.
   auto prev = std::prev(shards_in_map.upper_bound(key));
-  Shard duplicated_shard = prev->second;
+  ShardMetadata duplicated_shard = prev->second;
 
   // Apply the split
   shards_in_map[key] = duplicated_shard;
@@ -383,7 +383,7 @@ std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std:
   labels.emplace(std::move(label_name), label_id);
 
   PrimaryKey initial_key = SchemaToMinKey(schema);
-  Shard empty_shard = {};
+  ShardMetadata empty_shard = {};
 
   Shards shards = {
       {initial_key, empty_shard},
@@ -479,7 +479,7 @@ Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey
   return shards;
 }
 
-Shard ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const {
+ShardMetadata ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const {
   MG_ASSERT(labels.contains(label_name));
 
   LabelId label_id = labels.at(label_name);
@@ -492,7 +492,7 @@ Shard ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &ke
   return std::prev(label_space.shards.upper_bound(key))->second;
 }
 
-Shard ShardMap::GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const {
+ShardMetadata ShardMap::GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const {
   MG_ASSERT(label_spaces.contains(label_id));
 
   const auto &label_space = label_spaces.at(label_id);
@@ -556,12 +556,12 @@ EdgeTypeIdMap ShardMap::AllocateEdgeTypeIds(const std::vector<EdgeTypeName> &new
 bool ShardMap::ClusterInitialized() const {
   for (const auto &[label_id, label_space] : label_spaces) {
     for (const auto &[low_key, shard] : label_space.shards) {
-      if (shard.size() < label_space.replication_factor) {
+      if (shard.peers.size() < label_space.replication_factor) {
         spdlog::info("label_space below desired replication factor");
         return false;
       }
 
-      for (const auto &aas : shard) {
+      for (const auto &aas : shard.peers) {
        if (aas.status != Status::CONSENSUS_PARTICIPANT) {
          spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT");
          return false;
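Both SplitShard and GetShardForKey above rely on the same lookup idiom: `std::prev(shards_in_map.upper_bound(key))` finds the shard whose smallest primary key is the greatest one not exceeding the lookup key. A minimal standalone sketch of that idiom, assuming hypothetical `Key`, `Meta`, and `LookupOwner` stand-ins rather than the real Memgraph types:

    #include <cassert>
    #include <iterator>
    #include <map>
    #include <string>
    #include <vector>

    using Key = std::vector<int>;  // stand-in for PrimaryKey
    struct Meta {                  // stand-in for ShardMetadata
      std::string name;
    };

    // Returns the entry whose low key is the greatest key <= `key`, i.e. the
    // shard responsible for `key` when the map is keyed by each shard's
    // smallest primary key.
    const Meta &LookupOwner(const std::map<Key, Meta> &shards, const Key &key) {
      auto it = shards.upper_bound(key);  // first low key strictly greater than `key`
      assert(it != shards.begin());       // holds because a minimal key (cf. SchemaToMinKey) is always present
      return std::prev(it)->second;
    }

    int main() {
      std::map<Key, Meta> shards;
      shards[{0}] = Meta{"shard-a"};   // covers keys [0, 12)
      shards[{12}] = Meta{"shard-b"};  // covers keys [12, ...)

      assert(LookupOwner(shards, {5}).name == "shard-a");
      assert(LookupOwner(shards, {12}).name == "shard-b");
    }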
@@ -76,8 +76,35 @@ struct AddressAndStatus {
 };
 
 using PrimaryKey = std::vector<PropertyValue>;
-using Shard = std::vector<AddressAndStatus>;
-using Shards = std::map<PrimaryKey, Shard>;
+
+struct ShardMetadata {
+  std::vector<AddressAndStatus> peers;
+  uint64_t version;
+
+  friend std::ostream &operator<<(std::ostream &in, const ShardMetadata &shard) {
+    using utils::print_helpers::operator<<;
+
+    in << "ShardMetadata { peers: ";
+    in << shard.peers;
+    in << " version: ";
+    in << shard.version;
+    in << " }";
+
+    return in;
+  }
+
+  friend bool operator==(const ShardMetadata &lhs, const ShardMetadata &rhs) = default;
+
+  friend bool operator<(const ShardMetadata &lhs, const ShardMetadata &rhs) {
+    if (lhs.peers != rhs.peers) {
+      return lhs.peers < rhs.peers;
+    }
+
+    return lhs.version < rhs.version;
+  }
+};
+
+using Shards = std::map<PrimaryKey, ShardMetadata>;
 using LabelName = std::string;
 using PropertyName = std::string;
 using EdgeTypeName = std::string;
@@ -99,7 +126,7 @@ PrimaryKey SchemaToMinKey(const std::vector<SchemaProperty> &schema);
 struct LabelSpace {
   std::vector<SchemaProperty> schema;
   // Maps between the smallest primary key stored in the shard and the shard
-  std::map<PrimaryKey, Shard> shards;
+  std::map<PrimaryKey, ShardMetadata> shards;
   size_t replication_factor;
 
   friend std::ostream &operator<<(std::ostream &in, const LabelSpace &label_space) {
@@ -160,9 +187,9 @@ struct ShardMap {
 
   Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const;
 
-  Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const;
+  ShardMetadata GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const;
 
-  Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const;
+  ShardMetadata GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const;
 
   PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties);
 
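The struct replaces a bare vector typedef, and downstream code keys maps by it (e.g. the `std::map<ShardMetadata, TStorageClient>` cache in the next file), which is why `operator<` is defined: std::map requires a strict weak ordering on its key type. A hedged sketch of why that ordering matters, with plain string peer names standing in for AddressAndStatus:

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>
    #include <vector>

    // Trimmed-down model of the ShardMetadata introduced above; peers hold
    // string names here purely for illustration.
    struct ShardMetadata {
      std::vector<std::string> peers;
      uint64_t version;

      friend bool operator==(const ShardMetadata &lhs, const ShardMetadata &rhs) = default;

      // Lexicographic order: peers first, version as tie-breaker. This strict
      // weak ordering is what lets std::map use ShardMetadata as a key.
      friend bool operator<(const ShardMetadata &lhs, const ShardMetadata &rhs) {
        if (lhs.peers != rhs.peers) {
          return lhs.peers < rhs.peers;
        }
        return lhs.version < rhs.version;
      }
    };

    int main() {
      std::map<ShardMetadata, int> clients_per_shard;
      ShardMetadata s1{.peers = {"a", "b", "c"}, .version = 1};
      ShardMetadata s2{.peers = {"a", "b", "c"}, .version = 2};

      clients_per_shard[s1] = 10;
      clients_per_shard[s2] = 20;

      // Same peer set but a different version -> distinct map keys.
      assert(clients_per_shard.size() == 2);
    }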
@@ -50,7 +50,7 @@ template <typename TStorageClient>
 class RsmStorageClientManager {
  public:
   using CompoundKey = io::rsm::ShardRsmKey;
-  using Shard = coordinator::Shard;
+  using ShardMetadata = coordinator::ShardMetadata;
   RsmStorageClientManager() = default;
   RsmStorageClientManager(const RsmStorageClientManager &) = delete;
   RsmStorageClientManager(RsmStorageClientManager &&) = delete;
@@ -58,25 +58,25 @@ class RsmStorageClientManager {
   RsmStorageClientManager &operator=(RsmStorageClientManager &&) = delete;
   ~RsmStorageClientManager() = default;
 
-  void AddClient(Shard key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); }
+  void AddClient(ShardMetadata key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); }
 
-  bool Exists(const Shard &key) { return cli_cache_.contains(key); }
+  bool Exists(const ShardMetadata &key) { return cli_cache_.contains(key); }
 
   void PurgeCache() { cli_cache_.clear(); }
 
-  TStorageClient &GetClient(const Shard &key) {
+  TStorageClient &GetClient(const ShardMetadata &key) {
     auto it = cli_cache_.find(key);
     MG_ASSERT(it != cli_cache_.end(), "Non-existing shard client");
     return it->second;
   }
 
  private:
-  std::map<Shard, TStorageClient> cli_cache_;
+  std::map<ShardMetadata, TStorageClient> cli_cache_;
 };
 
 template <typename TRequest>
 struct ShardRequestState {
-  memgraph::coordinator::Shard shard;
+  memgraph::coordinator::ShardMetadata shard;
   TRequest request;
 };
 
@@ -125,7 +125,7 @@ class RequestRouter : public RequestRouterInterface {
   using CoordinatorWriteRequests = coordinator::CoordinatorWriteRequests;
   using CoordinatorClient = coordinator::CoordinatorClient<TTransport>;
   using Address = io::Address;
-  using Shard = coordinator::Shard;
+  using ShardMetadata = coordinator::ShardMetadata;
   using ShardMap = coordinator::ShardMap;
   using CompoundKey = coordinator::PrimaryKey;
   using VertexAccessor = query::v2::accessors::VertexAccessor;
@@ -403,7 +403,7 @@ class RequestRouter : public RequestRouterInterface {
  private:
   std::vector<ShardRequestState<msgs::CreateVerticesRequest>> RequestsForCreateVertices(
       const std::vector<msgs::NewVertex> &new_vertices) {
-    std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;
+    std::map<ShardMetadata, msgs::CreateVerticesRequest> per_shard_request_table;
 
     for (auto &new_vertex : new_vertices) {
       MG_ASSERT(!new_vertex.label_ids.empty(), "No label_ids provided for new vertex in RequestRouter::CreateVertices");
@@ -431,9 +431,9 @@ class RequestRouter : public RequestRouterInterface {
 
   std::vector<ShardRequestState<msgs::CreateExpandRequest>> RequestsForCreateExpand(
       const std::vector<msgs::NewExpand> &new_expands) {
-    std::map<Shard, msgs::CreateExpandRequest> per_shard_request_table;
+    std::map<ShardMetadata, msgs::CreateExpandRequest> per_shard_request_table;
     auto ensure_shard_exists_in_table = [&per_shard_request_table,
-                                         transaction_id = transaction_id_](const Shard &shard) {
+                                         transaction_id = transaction_id_](const ShardMetadata &shard) {
       if (!per_shard_request_table.contains(shard)) {
         msgs::CreateExpandRequest create_expand_request{.transaction_id = transaction_id};
         per_shard_request_table.insert({shard, std::move(create_expand_request)});
@@ -484,7 +484,7 @@ class RequestRouter : public RequestRouterInterface {
 
     for (auto &shards : multi_shards) {
       for (auto &[key, shard] : shards) {
-        MG_ASSERT(!shard.empty());
+        MG_ASSERT(!shard.peers.empty());
 
         msgs::ScanVerticesRequest request;
         request.transaction_id = transaction_id_;
@@ -503,7 +503,7 @@ class RequestRouter : public RequestRouterInterface {
   }
 
   std::vector<ShardRequestState<msgs::ExpandOneRequest>> RequestsForExpandOne(const msgs::ExpandOneRequest &request) {
-    std::map<Shard, msgs::ExpandOneRequest> per_shard_request_table;
+    std::map<ShardMetadata, msgs::ExpandOneRequest> per_shard_request_table;
     msgs::ExpandOneRequest top_level_rqst_template = request;
     top_level_rqst_template.transaction_id = transaction_id_;
     top_level_rqst_template.src_vertices.clear();
@@ -533,7 +533,7 @@ class RequestRouter : public RequestRouterInterface {
 
   std::vector<ShardRequestState<msgs::GetPropertiesRequest>> RequestsForGetProperties(
       msgs::GetPropertiesRequest &&request) {
-    std::map<Shard, msgs::GetPropertiesRequest> per_shard_request_table;
+    std::map<ShardMetadata, msgs::GetPropertiesRequest> per_shard_request_table;
     auto top_level_rqst_template = request;
     top_level_rqst_template.transaction_id = transaction_id_;
     top_level_rqst_template.vertex_ids.clear();
@@ -571,7 +571,7 @@ class RequestRouter : public RequestRouterInterface {
     return requests;
   }
 
-  StorageClient &GetStorageClientForShard(Shard shard) {
+  StorageClient &GetStorageClientForShard(ShardMetadata shard) {
     if (!storage_cli_manager_.Exists(shard)) {
       AddStorageClientToManager(shard);
     }
@@ -583,12 +583,12 @@ class RequestRouter : public RequestRouterInterface {
     return GetStorageClientForShard(std::move(shard));
   }
 
-  void AddStorageClientToManager(Shard target_shard) {
-    MG_ASSERT(!target_shard.empty());
-    auto leader_addr = target_shard.front();
+  void AddStorageClientToManager(ShardMetadata target_shard) {
+    MG_ASSERT(!target_shard.peers.empty());
+    auto leader_addr = target_shard.peers.front();
     std::vector<Address> addresses;
-    addresses.reserve(target_shard.size());
-    for (auto &address : target_shard) {
+    addresses.reserve(target_shard.peers.size());
+    for (auto &address : target_shard.peers) {
       addresses.push_back(std::move(address.address));
    }
    auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses));
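Every RequestsFor* helper above follows the same shape: bucket the work items into a `std::map<ShardMetadata, Request>` so exactly one request is sent per shard. A minimal sketch of that bucketing pattern, assuming a trimmed ShardMetadata and an invented `Request`/`BucketByShard` pair for illustration:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    // Illustrative stand-in; the real code maps ShardMetadata to
    // msgs::CreateVerticesRequest and friends.
    struct ShardMetadata {
      std::vector<std::string> peers;
      uint64_t version;
      friend bool operator<(const ShardMetadata &lhs, const ShardMetadata &rhs) {
        if (lhs.peers != rhs.peers) return lhs.peers < rhs.peers;
        return lhs.version < rhs.version;
      }
    };

    struct Request {
      std::vector<int> keys;  // payload destined for one shard
    };

    // Group work items by the shard that owns them, mirroring the
    // per_shard_request_table pattern in RequestsForCreateVertices,
    // RequestsForCreateExpand, and RequestsForExpandOne.
    std::map<ShardMetadata, Request> BucketByShard(
        const std::vector<std::pair<ShardMetadata, int>> &items) {
      std::map<ShardMetadata, Request> per_shard_request_table;
      for (const auto &[shard, key] : items) {
        per_shard_request_table[shard].keys.push_back(key);  // entry created on first use
      }
      return per_shard_request_table;
    }

    int main() {
      ShardMetadata a{.peers = {"m1", "m2"}, .version = 1};
      ShardMetadata b{.peers = {"m3", "m4"}, .version = 1};
      auto table = BucketByShard({{a, 1}, {b, 2}, {a, 3}});
      std::cout << table.size() << '\n';  // 2 shards, one request each
    }

As a design note, AddStorageClientToManager above treats `peers.front()` as the initial leader hint when constructing the StorageClient, while the full peers list is passed alongside it, presumably so the RSM client can retry against the other members.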
@@ -46,8 +46,8 @@ using coordinator::CoordinatorClient;
 using coordinator::CoordinatorRsm;
 using coordinator::HlcRequest;
 using coordinator::HlcResponse;
-using coordinator::Shard;
 using coordinator::ShardMap;
+using coordinator::ShardMetadata;
 using coordinator::Shards;
 using coordinator::Status;
 using io::Address;
@@ -113,7 +113,7 @@ ShardMap CreateDummyShardmap(coordinator::Address a_io_1, coordinator::Address a
   AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
   AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
 
-  Shard shard1 = {aas1_1, aas1_2, aas1_3};
+  ShardMetadata shard1 = ShardMetadata{.peers = {aas1_1, aas1_2, aas1_3}, .version = 1};
 
   auto key1 = storage::v3::PropertyValue(0);
   auto key2 = storage::v3::PropertyValue(0);
@@ -125,7 +125,7 @@ ShardMap CreateDummyShardmap(coordinator::Address a_io_1, coordinator::Address a
   AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
   AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
 
-  Shard shard2 = {aas2_1, aas2_2, aas2_3};
+  ShardMetadata shard2 = ShardMetadata{.peers = {aas2_1, aas2_2, aas2_3}, .version = 1};
 
   auto key3 = storage::v3::PropertyValue(12);
   auto key4 = storage::v3::PropertyValue(13);
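The test fixtures now build shards with designated initializers instead of brace-initializing a bare vector. A small sketch of that construction style plus the new stream operator; `Peer` and the printed format are illustrative stand-ins, not the real AddressAndStatus or utils::print_helpers output:

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Peer {  // stand-in for AddressAndStatus
      std::string address;
    };

    struct ShardMetadata {
      std::vector<Peer> peers;
      uint64_t version;

      // Mirrors the shape of the operator<< defined on the real ShardMetadata.
      friend std::ostream &operator<<(std::ostream &in, const ShardMetadata &shard) {
        in << "ShardMetadata { peers: [";
        for (const auto &peer : shard.peers) {
          in << ' ' << peer.address;
        }
        in << " ] version: " << shard.version << " }";
        return in;
      }
    };

    int main() {
      // Designated initializers (C++20), as in the updated CreateDummyShardmap tests.
      ShardMetadata shard1 = ShardMetadata{.peers = {{"io_1"}, {"io_2"}, {"io_3"}}, .version = 1};
      std::cout << shard1 << '\n';
    }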
@@ -40,8 +40,8 @@ using memgraph::coordinator::CoordinatorRsm;
 using memgraph::coordinator::HlcRequest;
 using memgraph::coordinator::HlcResponse;
 using memgraph::coordinator::PrimaryKey;
-using memgraph::coordinator::Shard;
 using memgraph::coordinator::ShardMap;
+using memgraph::coordinator::ShardMetadata;
 using memgraph::coordinator::Shards;
 using memgraph::coordinator::Status;
 using memgraph::io::Address;
@@ -109,7 +109,7 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
   AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
   AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
 
-  Shard shard1 = {aas1_1, aas1_2, aas1_3};
+  ShardMetadata shard1 = ShardMetadata{.peers = {aas1_1, aas1_2, aas1_3}, .version = 1};
 
   const auto key1 = PropertyValue(0);
   const auto key2 = PropertyValue(0);
@@ -121,7 +121,7 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
   AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
   AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
 
-  Shard shard2 = {aas2_1, aas2_2, aas2_3};
+  ShardMetadata shard2 = ShardMetadata{.peers = {aas2_1, aas2_2, aas2_3}, .version = 1};
 
   auto key3 = PropertyValue(12);
   auto key4 = PropertyValue(13);
@@ -131,10 +131,10 @@ ShardMap CreateDummyShardmap(Address a_io_1, Address a_io_2, Address a_io_3, Add
   return sm;
 }
 
-std::optional<ShardClient *> DetermineShardLocation(const Shard &target_shard, const std::vector<Address> &a_addrs,
-                                                    ShardClient &a_client, const std::vector<Address> &b_addrs,
-                                                    ShardClient &b_client) {
-  for (const auto &addr : target_shard) {
+std::optional<ShardClient *> DetermineShardLocation(const ShardMetadata &target_shard,
+                                                    const std::vector<Address> &a_addrs, ShardClient &a_client,
+                                                    const std::vector<Address> &b_addrs, ShardClient &b_client) {
+  for (const auto &addr : target_shard.peers) {
     if (addr.address == b_addrs[0]) {
       return &b_client;
     }
@@ -275,7 +275,7 @@ int main() {
 
   const PrimaryKey compound_key = {cm_key_1, cm_key_2};
 
-  // Look for Shard
+  // Look for ShardMetadata
   BasicResult<TimedOut, memgraph::coordinator::CoordinatorWriteResponses> read_res =
       coordinator_client.SendWriteRequest(req);
 
@@ -47,8 +47,8 @@ using coordinator::GetShardMapRequest;
 using coordinator::GetShardMapResponse;
 using coordinator::Hlc;
 using coordinator::HlcResponse;
-using coordinator::Shard;
 using coordinator::ShardMap;
+using coordinator::ShardMetadata;
 using io::Address;
 using io::Io;
 using io::rsm::RsmClient;
@@ -44,8 +44,8 @@ using coordinator::GetShardMapRequest;
 using coordinator::GetShardMapResponse;
 using coordinator::Hlc;
 using coordinator::HlcResponse;
-using coordinator::Shard;
 using coordinator::ShardMap;
+using coordinator::ShardMetadata;
 using io::Address;
 using io::Io;
 using io::local_transport::LocalSystem;
@@ -45,8 +45,8 @@ using memgraph::coordinator::CoordinatorWriteRequests;
 using memgraph::coordinator::CoordinatorWriteResponses;
 using memgraph::coordinator::Hlc;
 using memgraph::coordinator::HlcResponse;
-using memgraph::coordinator::Shard;
 using memgraph::coordinator::ShardMap;
+using memgraph::coordinator::ShardMetadata;
 using memgraph::io::Io;
 using memgraph::io::local_transport::LocalSystem;
 using memgraph::io::local_transport::LocalTransport;