A number of clean-ups related to the coordinator and its tests

This commit is contained in:
Tyler Neely 2022-08-30 15:14:28 +00:00
parent 025df32d9e
commit 61b1fbfbc9
4 changed files with 97 additions and 80 deletions

View File

@ -18,9 +18,11 @@
#include "io/simulator/simulator.hpp"
#include "io/time.hpp"
#include "io/transport.hpp"
#include "storage/v3/id_types.hpp"
namespace memgraph::coordinator {
using memgraph::storage::v3::LabelId;
using Address = memgraph::io::Address;
using SimT = memgraph::io::simulator::SimulatorTransport;
@ -63,7 +65,7 @@ struct AllocateEdgeIdBatchResponse {
struct SplitShardRequest {
Hlc previous_shard_map_version;
Label label;
LabelId label_id;
CompoundKey split_key;
};
@ -183,7 +185,7 @@ class Coordinator {
if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
res.success = false;
} else {
res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label,
res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id,
split_shard_request.split_key);
}

View File

@ -16,10 +16,14 @@
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
namespace memgraph::coordinator {
using memgraph::io::Address;
using memgraph::storage::v3::LabelId;
enum class Status : uint8_t {
CONSENSUS_PARTICIPANT,
INITIALIZING,
@ -33,18 +37,16 @@ struct AddressAndStatus {
Status status;
};
using memgraph::io::Address;
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
using Shard = std::vector<AddressAndStatus>;
using Shards = std::map<CompoundKey, Shard>;
// use string for intermachine communication and NameIdMapper within the machine
using Label = std::string;
using LabelName = std::string;
struct ShardMap {
Hlc shard_map_version;
std::map<Label, Shards> shards;
std::map<LabelName, LabelId> labels;
std::map<LabelId, Shards> shards;
uint64_t max_label_id;
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well
@ -55,32 +57,35 @@ struct ShardMap {
Hlc GetHlc() const noexcept { return shard_map_version; }
bool SplitShard(Hlc previous_shard_map_version, Label label, CompoundKey key) {
if (previous_shard_map_version == shard_map_version) {
MG_ASSERT(shards.contains(label));
auto &shards_in_map = shards[label];
MG_ASSERT(!shards_in_map.contains(key));
// Finding the Shard that the new CompoundKey should map to.
Shard shard_to_map_to;
CompoundKey prev_key = ((*shards_in_map.begin()).first);
for (auto iter = std::next(shards_in_map.begin()); iter != shards_in_map.end(); ++iter) {
const auto &current_key = (*iter).first;
if (key > prev_key && key < current_key) {
shard_to_map_to = shards_in_map[prev_key];
}
prev_key = (*iter).first;
}
// Apply the split
shards_in_map[key] = shard_to_map_to;
return true;
bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, CompoundKey key) {
if (previous_shard_map_version != shard_map_version) {
return false;
}
return false;
if (!shards.contains(label_id)) {
return false;
}
auto &shards_in_map = shards.at(label_id);
MG_ASSERT(!shards_in_map.contains(key));
// Finding the Shard that the new CompoundKey should map to.
Shard shard_to_map_to;
CompoundKey prev_key = ((*shards_in_map.begin()).first);
for (auto iter = std::next(shards_in_map.begin()); iter != shards_in_map.end(); ++iter) {
const auto &current_key = (*iter).first;
if (key > prev_key && key < current_key) {
shard_to_map_to = shards_in_map[prev_key];
}
prev_key = (*iter).first;
}
// Apply the split
shards_in_map[key] = shard_to_map_to;
return true;
}
bool InitializeNewLabel(std::string label_name, Hlc last_shard_map_version) {
@ -88,11 +93,14 @@ struct ShardMap {
return false;
}
if (shards.contains(label_name)) {
if (labels.contains(label_name)) {
return false;
}
shards.emplace(label_name, Shards{});
const LabelId label_id = LabelId::FromUint(++max_label_id);
labels.emplace(label_name, label_id);
shards.emplace(label_id, Shards{});
IncrementShardMapVersion();
@ -103,18 +111,19 @@ struct ShardMap {
// Find a random place for the server to plug in
}
std::map<Label, Shards> &GetShards() noexcept { return shards; }
Shards GetShardsForRange(Label label, CompoundKey start_key, CompoundKey end_key) {
Shards GetShardsForRange(LabelName label_name, CompoundKey start_key, CompoundKey end_key) {
MG_ASSERT(start_key <= end_key);
MG_ASSERT(labels.contains(label_name));
const auto &shard_for_label = shards.at(label);
LabelId label_id = labels.at(label_name);
Shards shards{};
const auto &shard_for_label = shards.at(label_id);
auto it = std::prev(shard_for_label.upper_bound(start_key));
const auto end_it = shard_for_label.upper_bound(end_key);
Shards shards{};
for (; it != end_it; it++) {
shards.emplace(it->first, it->second);
}
@ -122,8 +131,12 @@ struct ShardMap {
return shards;
}
Shard GetShardForKey(Label label, CompoundKey key) {
const auto &shard_for_label = shards.at(label);
Shard GetShardForKey(LabelName label_name, CompoundKey key) {
MG_ASSERT(labels.contains(label_name));
LabelId label_id = labels.at(label_name);
const auto &shard_for_label = shards.at(label_id);
return std::prev(shard_for_label.upper_bound(key))->second;
}

View File

@ -34,31 +34,25 @@
#include "io/rsm/raft.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/logging.hpp"
namespace memgraph::io::rsm {
using memgraph::coordinator::Hlc;
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
namespace memgraph::io::rsm {
using ShardRsmKey = std::vector<PropertyValue>;
struct StorageWriteRequest {
LabelId label_id;
Hlc transaction_id;
ShardRsmKey key;
std::optional<int> value;
};
@ -71,6 +65,8 @@ struct StorageWriteResponse {
};
struct StorageReadRequest {
LabelId label_id;
Hlc transaction_id;
ShardRsmKey key;
};

View File

@ -27,6 +27,7 @@
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "storage/v3/id_types.hpp"
#include "utils/result.hpp"
using memgraph::coordinator::AddressAndStatus;
@ -61,48 +62,52 @@ using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::utils::BasicResult;
using StorageClient =
RsmClient<SimulatorTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
namespace {
const std::string label_name = std::string("test_label");
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
ShardMap sm1;
auto &shards = sm1.GetShards();
ShardMap sm;
// 1
std::string label1 = std::string("label1");
auto key1 = memgraph::storage::v3::PropertyValue(3);
auto key2 = memgraph::storage::v3::PropertyValue(4);
CompoundKey cm1 = {key1, key2};
// register new label space
bool label_success = sm.InitializeNewLabel(label_name, sm.shard_map_version);
MG_ASSERT(label_success);
LabelId label_id = sm.labels.at(label_name);
Shards &shards_for_label = sm.shards.at(label_id);
// add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard1 = {aas1_1, aas1_2, aas1_3};
Shards shards1;
shards1[cm1] = shard1;
// 2
std::string label2 = std::string("label2");
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey cm2 = {key3, key4};
auto key1 = memgraph::storage::v3::PropertyValue(0);
auto key2 = memgraph::storage::v3::PropertyValue(0);
CompoundKey compound_key_1 = {key1, key2};
shards_for_label[compound_key_1] = shard1;
// add second shard at [12, 13]
AddressAndStatus aas2_1{.address = b_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard2 = {aas2_1, aas2_2, aas2_3};
Shards shards2;
shards2[cm2] = shard2;
shards[label1] = shards1;
shards[label2] = shards2;
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey compound_key_2 = {key3, key4};
shards_for_label[compound_key_2] = shard2;
return sm1;
return sm;
}
std::optional<StorageClient> DetermineShardLocation(Shard target_shard, const std::vector<Address> &a_addrs,
@ -268,25 +273,24 @@ int main() {
client_shard_map = hlc_response.fresher_shard_map.value();
}
// TODO(gabor) check somewhere in the call chain if the entries are actually valid
// for (auto &[key, val] : client_shard_map.GetShards()) {
// std::cout << "key: " << key << std::endl;
// }
auto target_shard = client_shard_map.GetShardForKey(label_name, compound_key);
auto target_shard = client_shard_map.GetShardForKey(std::string("label1"), compound_key);
// Determine which shard to send the requests to
// Determine which shard to send the requests to. This should be a more proper client cache in the "real" version.
auto storage_client_opt = DetermineShardLocation(target_shard, a_addrs, shard_a_client, b_addrs, shard_b_client);
MG_ASSERT(storage_client_opt);
auto storage_client = storage_client_opt.value();
LabelId label_id = client_shard_map.labels.at(label_name);
// Have client use shard map to decide which shard to communicate
// with in order to write a new value
// client_shard_map.
StorageWriteRequest storage_req;
storage_req.label_id = label_id;
storage_req.key = compound_key;
storage_req.value = 1000;
storage_req.transaction_id = transaction_id;
auto write_response_result = storage_client.SendWriteRequest(storage_req);
if (write_response_result.HasError()) {
@ -304,7 +308,9 @@ int main() {
// with to read that same value back
StorageReadRequest storage_get_req;
storage_get_req.label_id = label_id;
storage_get_req.key = compound_key;
storage_get_req.transaction_id = transaction_id;
auto get_response_result = storage_client.SendReadRequest(storage_get_req);
if (get_response_result.HasError()) {