Add AllocatePropertyIds request/response pair for Coordinator + ShardMap

This commit is contained in:
Tyler Neely 2022-08-31 14:22:40 +00:00
parent 61b1fbfbc9
commit 27459a9eb0
2 changed files with 60 additions and 7 deletions

View File

@ -23,6 +23,7 @@
namespace memgraph::coordinator {
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyId;
using Address = memgraph::io::Address;
using SimT = memgraph::io::simulator::SimulatorTransport;
@ -63,6 +64,14 @@ struct AllocateEdgeIdBatchResponse {
uint64_t high;
};
struct AllocatePropertyIdsRequest {
std::vector<std::string> property_names;
};
struct AllocatePropertyIdsResponse {
std::map<std::string, PropertyId> property_ids;
};
struct SplitShardRequest {
Hlc previous_shard_map_version;
LabelId label_id;
@ -101,10 +110,10 @@ struct InitializeLabelResponse {
using WriteRequests =
std::variant<AllocateHlcBatchRequest, AllocateEdgeIdBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest, InitializeLabelRequest>;
using WriteResponses =
std::variant<AllocateHlcBatchResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse, InitializeLabelResponse>;
DeregisterStorageEngineRequest, InitializeLabelRequest, AllocatePropertyIdsRequest>;
using WriteResponses = std::variant<AllocateHlcBatchResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse,
InitializeLabelResponse, AllocatePropertyIdsResponse>;
using ReadRequests = std::variant<HlcRequest, GetShardMapRequest>;
using ReadResponses = std::variant<HlcResponse, GetShardMapResponse>;
@ -230,15 +239,27 @@ class Coordinator {
return res;
}
WriteResponses ApplyWrite(AllocatePropertyIdsRequest &&allocate_property_ids_request) {
AllocatePropertyIdsResponse res{};
auto property_ids = shard_map_.AllocatePropertyIds(allocate_property_ids_request.property_names);
res.property_ids = property_ids;
return res;
}
public:
explicit Coordinator(ShardMap sm) : shard_map_{(sm)} {}
ReadResponses Read(ReadRequests requests) {
return std::visit([&](auto &&requests) { return HandleRead(std::move(requests)); }, std::move(requests));
return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },
std::move(requests));
}
WriteResponses Apply(WriteRequests requests) {
return std::visit([&](auto &&requests) { return ApplyWrite(std::move(requests)); }, std::move(requests));
return std::visit([&](auto &&request) mutable { return ApplyWrite(std::forward<decltype(request)>(request)); },
std::move(requests));
}
};

View File

@ -23,6 +23,7 @@ namespace memgraph::coordinator {
using memgraph::io::Address;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyId;
enum class Status : uint8_t {
CONSENSUS_PARTICIPANT,
@ -41,12 +42,16 @@ using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
using Shard = std::vector<AddressAndStatus>;
using Shards = std::map<CompoundKey, Shard>;
using LabelName = std::string;
using PropertyName = std::string;
using PropertyMap = std::map<PropertyName, PropertyId>;
struct ShardMap {
Hlc shard_map_version;
uint64_t max_property_id;
std::map<PropertyName, PropertyId> properties;
uint64_t max_label_id;
std::map<LabelName, LabelId> labels;
std::map<LabelId, Shards> shards;
uint64_t max_label_id;
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well
@ -140,6 +145,33 @@ struct ShardMap {
return std::prev(shard_for_label.upper_bound(key))->second;
}
PropertyMap AllocatePropertyIds(std::vector<PropertyName> &new_properties) {
PropertyMap ret{};
bool mutated = false;
for (const auto &property_name : new_properties) {
if (properties.contains(property_name)) {
auto property_id = properties.at(property_name);
ret.emplace(property_name, property_id);
} else {
mutated = true;
const PropertyId property_id = PropertyId::FromUint(++max_property_id);
ret.emplace(property_name, property_id);
properties.emplace(property_name, property_id);
}
}
if (mutated) {
IncrementShardMapVersion();
}
return ret;
}
};
} // namespace memgraph::coordinator