Add basic skeleton for the coordinator-half of the shard splits

Tyler Neely 2023-01-19 17:23:34 +00:00
parent 00d6b42c37
commit 6e4d9b4b04
4 changed files with 125 additions and 73 deletions

View File

@@ -18,12 +18,7 @@ CoordinatorWriteResponses Coordinator::ApplyWrite(HeartbeatRequest &&heartbeat_r
   // add this storage engine to any under-replicated shards that it is not already a part of
-  auto initializing_rsms_for_shard_manager =
-      shard_map_.AssignShards(heartbeat_request.from_storage_manager, heartbeat_request.initialized_rsms);
-  return HeartbeatResponse{
-      .shards_to_initialize = initializing_rsms_for_shard_manager,
-  };
+  return shard_map_.AssignShards(heartbeat_request.from_storage_manager, heartbeat_request.initialized_rsms);
 }
 
 CoordinatorWriteResponses Coordinator::ApplyWrite(HlcRequest &&hlc_request) {
@@ -67,12 +62,8 @@ CoordinatorWriteResponses Coordinator::ApplyWrite(AllocateEdgeIdBatchRequest &&a
 CoordinatorWriteResponses Coordinator::ApplyWrite(SplitShardRequest &&split_shard_request) {
   SplitShardResponse res{};
 
-  if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
-    res.success = false;
-  } else {
-    res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id,
-                                        split_shard_request.split_key);
-  }
+  res.success = shard_map_.SplitShard(split_shard_request.previous_shard_map_version, split_shard_request.label_id,
+                                      split_shard_request.split_key);
 
   return res;
 }
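Aside (not part of the diff): the stale-version guard moves from Coordinator::ApplyWrite into ShardMap::SplitShard itself, so every caller gets the precondition for free. Below is a minimal, self-contained sketch of that guard with Hlc and the shard map reduced to hypothetical stand-ins (MiniShardMap is not a memgraph type):

#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-ins: only the stale-version guard that this
// commit moves out of the coordinator is modeled here.
struct Hlc {
  std::uint64_t logical_id{0};
  friend bool operator!=(const Hlc &a, const Hlc &b) { return a.logical_id != b.logical_id; }
};

struct MiniShardMap {
  Hlc shard_map_version{};

  // Mirrors the new precondition: a split carrying a stale shard map version
  // is rejected inside SplitShard, so callers no longer pre-check it.
  bool SplitShard(Hlc previous_shard_map_version) {
    if (previous_shard_map_version != shard_map_version) {
      return false;
    }
    shard_map_version.logical_id++;  // a successful split bumps the version
    return true;
  }
};

int main() {
  MiniShardMap shard_map;
  const Hlc observed = shard_map.shard_map_version;
  assert(shard_map.SplitShard(observed));   // fresh version: split accepted
  assert(!shard_map.SplitShard(observed));  // now stale: split rejected
}

Centralizing the check also explains the simplification above: ApplyWrite no longer needs to compare versions before delegating.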

View File

@@ -121,15 +121,6 @@ struct InitializeLabelResponse {
   std::optional<ShardMap> fresher_shard_map;
 };
 
-struct HeartbeatRequest {
-  Address from_storage_manager;
-  std::set<boost::uuids::uuid> initialized_rsms;
-};
-
-struct HeartbeatResponse {
-  std::vector<ShardToInitialize> shards_to_initialize;
-};
-
 using CoordinatorWriteRequests =
     std::variant<HlcRequest, AllocateEdgeIdBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
                  DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest, HeartbeatRequest>;

View File

@@ -9,6 +9,7 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
 
+#include <map>
 #include <optional>
 #include <unordered_map>
 #include <vector>
@@ -267,9 +268,8 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
                             static_cast<unsigned char>(shard_id)};
 }
 
-std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
-                                                      std::set<boost::uuids::uuid> initialized) {
-  std::vector<ShardToInitialize> ret{};
+HeartbeatResponse ShardMap::AssignShards(Address storage_manager, std::set<boost::uuids::uuid> initialized) {
+  HeartbeatResponse ret{};
 
   bool mutated = false;
@@ -281,48 +281,74 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
         high_key = next_it->first;
       }
 
       // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info
-      bool machine_contains_shard = false;
+      bool shard_assigned_to_machine = false;
 
-      for (auto &aas : shard.peers) {
-        if (initialized.contains(aas.address.unique_id)) {
-          machine_contains_shard = true;
-          if (aas.status != Status::CONSENSUS_PARTICIPANT) {
+      for (auto &peer_metadata : shard.peers) {
+        const bool same_machine = peer_metadata.address.last_known_ip == storage_manager.last_known_ip &&
+                                  peer_metadata.address.last_known_port == storage_manager.last_known_port;
+
+        if (initialized.contains(peer_metadata.address.unique_id)) {
+          shard_assigned_to_machine = true;
+
+          if (!same_machine) {
+            // set the last known ip and port to the storage manager that has heartbeated it just now
             mutated = true;
-            spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id);
-            aas.status = Status::CONSENSUS_PARTICIPANT;
+            peer_metadata.address.last_known_ip = storage_manager.last_known_ip;
+            peer_metadata.address.last_known_port = storage_manager.last_known_port;
           }
-        } else {
-          const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip &&
-                                    aas.address.last_known_port == storage_manager.last_known_port;
-          if (same_machine) {
-            machine_contains_shard = true;
-            spdlog::info("reminding shard manager that they should begin participating in shard");
-            ret.push_back(ShardToInitialize{
-                .uuid = aas.address.unique_id,
-                .label_id = label_id,
-                .min_key = low_key,
-                .max_key = high_key,
-                .schema = schemas[label_id],
-                .config = Config{},
-                .id_to_names = IdToNames(),
-            });
+
+          if (peer_metadata.status != Status::CONSENSUS_PARTICIPANT) {
+            mutated = true;
+            spdlog::info("marking shard as full consensus participant: {}", peer_metadata.address.unique_id);
+            peer_metadata.status = Status::CONSENSUS_PARTICIPANT;
           }
+        } else if (same_machine && peer_metadata.status == Status::INITIALIZING) {
+          // we are expecting this shard to be initialized (rather than split) on this machine,
+          // so send it an initialization request
+          shard_assigned_to_machine = true;
+          spdlog::info("reminding shard manager that they should begin participating in shard");
+          ret.shards_to_initialize.push_back(ShardToInitialize{
+              .uuid = peer_metadata.address.unique_id,
+              .label_id = label_id,
+              .min_key = low_key,
+              .max_key = high_key,
+              .schema = schemas[label_id],
+              .config = Config{},
+              .id_to_names = IdToNames(),
+          });
+        } else if (same_machine && peer_metadata.status == Status::PENDING_SPLIT) {
+          // we are expecting this shard to be split, so send it a split request
+          ret.shards_to_split.push_back(ShardToSplit{
+              .shard_to_split_uuid = peer_metadata.split_from,
+              .new_right_side_uuid = peer_metadata.address.unique_id,
+              .split_requested_at = shard.version,
+              .label_id = label_id,
+              .split_key = low_key,
+              .schema = schemas[label_id],
+              .config = Config{},
+              .id_to_names = IdToNames(),
+          });
+        } else {
+          MG_ASSERT(
+              !same_machine,
+              "failed to properly handle a new Status type in the heartbeat management and shard assignment code");
         }
       }
 
-      if (!machine_contains_shard && shard.peers.size() < label_space.replication_factor) {
-        // increment version for each new uuid for deterministic creation
-        IncrementShardMapVersion();
+      if (!shard_assigned_to_machine && shard.peers.size() < label_space.replication_factor) {
         Address address = storage_manager;
+
+        // TODO(tyler) use deterministic UUID so that coordinators don't diverge here
+        // NB: increment version for each new uuid for deterministic creation
+        IncrementShardMapVersion();
+
         address.unique_id = NewShardUuid(shard_map_version.logical_id);
+
         spdlog::info("assigning shard manager to shard");
-        ret.push_back(ShardToInitialize{
+        ret.shards_to_initialize.push_back(ShardToInitialize{
             .uuid = address.unique_id,
             .label_id = label_id,
            .min_key = low_key,
@@ -332,12 +358,12 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
             .id_to_names = IdToNames(),
         });
 
-        AddressAndStatus aas = {
+        PeerMetadata peer_metadata = {
            .address = address,
            .status = Status::INITIALIZING,
         };
 
-        shard.peers.emplace_back(aas);
+        shard.peers.emplace_back(peer_metadata);
       }
     }
   }
@@ -345,11 +371,12 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
   if (mutated) {
     IncrementShardMapVersion();
   }
+
   return ret;
 }
 
 bool ShardMap::SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) {
-  if (previous_shard_map_version != shard_map_version) {
+  if (previous_shard_map_version != shard_map_version || !label_spaces.contains(label_id)) {
     return false;
   }
@@ -358,11 +385,26 @@ bool ShardMap::SplitShard(Hlc previous_shard_map_version, LabelId label_id, cons
   MG_ASSERT(!shards_in_map.empty());
   MG_ASSERT(!shards_in_map.contains(key));
   MG_ASSERT(label_spaces.contains(label_id));
 
-  // Finding the ShardMetadata that the new PrimaryKey should map to.
-  auto prev = std::prev(shards_in_map.upper_bound(key));
-  ShardMetadata duplicated_shard = prev->second;
+  ShardMetadata duplicated_shard = GetShardForKey(label_id, key);
+
+  std::map<boost::uuids::uuid, boost::uuids::uuid> split_mapping = {};
+
+  for (auto &peer_metadata : duplicated_shard.peers) {
+    peer_metadata.status = Status::PENDING_SPLIT;
+    peer_metadata.split_from = peer_metadata.address.unique_id;
+
+    // NB: increment version for each new uuid for deterministic creation
+    IncrementShardMapVersion();
+    auto new_uuid = NewShardUuid(shard_map_version.logical_id);
+
+    // store new uuid for the right side of each shard
+    split_mapping.emplace(peer_metadata.address.unique_id, new_uuid);
+    peer_metadata.address.unique_id = new_uuid;
+  }
 
   // Apply the split
   shards_in_map[key] = duplicated_shard;
@@ -561,8 +603,8 @@ bool ShardMap::ClusterInitialized() const {
       return false;
     }
 
-    for (const auto &aas : shard.peers) {
-      if (aas.status != Status::CONSENSUS_PARTICIPANT) {
+    for (const auto &peer_metadata : shard.peers) {
+      if (peer_metadata.status != Status::CONSENSUS_PARTICIPANT) {
        spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT");
        return false;
      }
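Aside (not part of the diff): the PENDING_SPLIT bookkeeping above duplicates each peer of the shard being split; the old uuid is remembered in split_from, and a freshly minted uuid becomes the right-hand side that later heartbeats will turn into a ShardToSplit request. A self-contained sketch of that loop, with uuids reduced to integers and the version-derived uuid mint replaced by a counter (MiniPeer and the other names here are hypothetical, not the memgraph types):

#include <cstdint>
#include <cstdio>
#include <map>
#include <vector>

using Uuid = std::uint64_t;

struct MiniPeer {
  Uuid unique_id;
  bool pending_split{false};
  Uuid split_from{0};
};

int main() {
  std::vector<MiniPeer> duplicated_peers{{1}, {2}, {3}};
  std::map<Uuid, Uuid> split_mapping;  // left-side uuid -> new right-side uuid
  Uuid next_uuid = 100;                // stands in for NewShardUuid(version)

  for (auto &peer : duplicated_peers) {
    peer.pending_split = true;
    peer.split_from = peer.unique_id;  // remember which shard is being split
    const Uuid new_uuid = next_uuid++;
    split_mapping.emplace(peer.unique_id, new_uuid);
    peer.unique_id = new_uuid;         // the duplicate becomes the right side
  }

  for (const auto &[left, right] : split_mapping) {
    std::printf("%llu -> %llu\n", static_cast<unsigned long long>(left),
                static_cast<unsigned long long>(right));
  }
}

Minting the new uuids from the incremented shard map version is what the "deterministic creation" comments refer to: every coordinator replaying the same writes derives the same uuids.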

View File

@@ -47,38 +47,45 @@ using memgraph::storage::v3::SchemaProperty;
 enum class Status : uint8_t {
   CONSENSUS_PARTICIPANT,
   INITIALIZING,
+  PENDING_SPLIT,
   // TODO(tyler) this will possibly have more states,
   // depending on the reconfiguration protocol that we
   // implement.
 };
 
-struct AddressAndStatus {
+struct PeerMetadata {
   memgraph::io::Address address;
   Status status;
+  boost::uuids::uuid split_from;
 
-  friend bool operator<(const AddressAndStatus &lhs, const AddressAndStatus &rhs) { return lhs.address < rhs.address; }
+  friend bool operator<(const PeerMetadata &lhs, const PeerMetadata &rhs) { return lhs.address < rhs.address; }
 
-  friend std::ostream &operator<<(std::ostream &in, const AddressAndStatus &address_and_status) {
-    in << "AddressAndStatus { address: ";
-    in << address_and_status.address;
-    if (address_and_status.status == Status::CONSENSUS_PARTICIPANT) {
-      in << ", status: CONSENSUS_PARTICIPANT }";
+  friend std::ostream &operator<<(std::ostream &in, const PeerMetadata &peer_metadata) {
+    in << "PeerMetadata { address: ";
+    in << peer_metadata.address;
+    if (peer_metadata.status == Status::CONSENSUS_PARTICIPANT) {
+      in << ", status: CONSENSUS_PARTICIPANT";
+    } else if (peer_metadata.status == Status::INITIALIZING) {
+      in << ", status: INITIALIZING";
+    } else if (peer_metadata.status == Status::PENDING_SPLIT) {
+      in << ", status: PENDING_SPLIT";
     } else {
-      in << ", status: INITIALIZING }";
+      MG_ASSERT(false, "failed to update the operator<< implementation for Status");
     }
+
+    in << ", split_from: " << peer_metadata.split_from << " }";
 
     return in;
   }
 
-  friend bool operator==(const AddressAndStatus &lhs, const AddressAndStatus &rhs) {
-    return lhs.address == rhs.address;
-  }
+  friend bool operator==(const PeerMetadata &lhs, const PeerMetadata &rhs) { return lhs.address == rhs.address; }
 };
 
 using PrimaryKey = std::vector<PropertyValue>;
 
 struct ShardMetadata {
-  std::vector<AddressAndStatus> peers;
+  std::vector<PeerMetadata> peers;
   uint64_t version;
 
   friend std::ostream &operator<<(std::ostream &in, const ShardMetadata &shard) {
@@ -121,6 +128,27 @@ struct ShardToInitialize {
   std::unordered_map<uint64_t, std::string> id_to_names;
 };
 
+struct ShardToSplit {
+  boost::uuids::uuid shard_to_split_uuid;
+  boost::uuids::uuid new_right_side_uuid;
+  Hlc split_requested_at;
+  LabelId label_id;
+  PrimaryKey split_key;
+  std::vector<SchemaProperty> schema;
+  Config config;
+  std::unordered_map<uint64_t, std::string> id_to_names;
+};
+
+struct HeartbeatRequest {
+  Address from_storage_manager;
+  std::set<boost::uuids::uuid> initialized_rsms;
+};
+
+struct HeartbeatResponse {
+  std::vector<ShardToInitialize> shards_to_initialize;
+  std::vector<ShardToSplit> shards_to_split;
+};
+
 PrimaryKey SchemaToMinKey(const std::vector<SchemaProperty> &schema);
 
 struct LabelSpace {
@@ -168,7 +196,7 @@ struct ShardMap {
   std::unordered_map<uint64_t, std::string> IdToNames();
 
   // Returns the shard UUIDs that have been assigned but not yet acknowledged for this storage manager
-  std::vector<ShardToInitialize> AssignShards(Address storage_manager, std::set<boost::uuids::uuid> initialized);
+  HeartbeatResponse AssignShards(Address storage_manager, std::set<boost::uuids::uuid> initialized);
 
   bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key);