Added request for edge id batch request to coordinator rsm

This commit is contained in:
Tyler Neely 2022-08-10 11:29:20 +00:00
parent 3c9fe7ef42
commit 27fcea40cc

View File

@ -52,6 +52,15 @@ struct AllocateHlcBatchResponse {
Hlc high;
};
struct AllocateEdgeIdBatchRequest {
size_t batch_size;
};
struct AllocateEdgeIdBatchResponse {
uint64_t low;
uint64_t high;
};
struct SplitShardRequest {
Hlc previous_shard_map_version;
Label label;
@ -78,10 +87,10 @@ struct DeregisterStorageEngineResponse {
bool success;
};
using WriteRequests = std::variant<AllocateHlcBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest>;
using WriteResponses = std::variant<AllocateHlcBatchResponse, SplitShardResponse, RegisterStorageEngineResponse,
DeregisterStorageEngineResponse>;
using WriteRequests = std::variant<AllocateHlcBatchRequest, AllocateEdgeIdBatchRequest, SplitShardRequest,
RegisterStorageEngineRequest, DeregisterStorageEngineRequest>;
using WriteResponses = std::variant<AllocateHlcBatchResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse>;
using ReadRequests = std::variant<HlcRequest, GetShardMapRequest>;
using ReadResponses = std::variant<HlcResponse, GetShardMapResponse>;
@ -103,6 +112,9 @@ class Coordinator {
uint64_t highest_allocated_timestamp_;
uint64_t highest_reserved_timestamp_;
/// Query engines need to periodically request batches of unique edge IDs.
uint64_t highest_allocated_edge_id_;
/// Increment our
ReadResponses Read(HlcRequest hlc_request) {
HlcResponse res{};
@ -129,16 +141,31 @@ class Coordinator {
return res;
}
WriteResponses Apply(AllocateHlcBatchRequest &&ahr) {
WriteResponses ApplyWrite(AllocateHlcBatchRequest &&ahr) {
AllocateHlcBatchResponse res{};
return res;
}
WriteResponses ApplyWrite(AllocateEdgeIdBatchRequest &&ahr) {
AllocateEdgeIdBatchResponse res{};
uint64_t low = highest_allocated_edge_id_;
highest_allocated_edge_id_ += ahr.batch_size;
uint64_t high = highest_allocated_edge_id_;
res.low = low;
res.high = high;
return res;
}
/// This splits the shard immediately beneath the provided
/// split key, keeping the assigned peers identical for now,
/// but letting them be gradually migrated over time.
WriteResponses Apply(SplitShardRequest &&split_shard_request) {
WriteResponses ApplyWrite(SplitShardRequest &&split_shard_request) {
SplitShardResponse res{};
if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
@ -153,7 +180,7 @@ class Coordinator {
/// This adds the provided storage engine to the standby storage engine pool,
/// which can be used to rebalance storage over time.
WriteResponses Apply(RegisterStorageEngineRequest &&register_storage_engine_request) {
WriteResponses ApplyWrite(RegisterStorageEngineRequest &&register_storage_engine_request) {
RegisterStorageEngineResponse res{};
// TODO
@ -162,7 +189,7 @@ class Coordinator {
/// This begins the process of draining the provided storage engine from all raft
/// clusters that it might be participating in.
WriteResponses Apply(DeregisterStorageEngineRequest &&register_storage_engine_request) {
WriteResponses ApplyWrite(DeregisterStorageEngineRequest &&register_storage_engine_request) {
DeregisterStorageEngineResponse res{};
// TODO
// const Address &address = register_storage_engine_request.address;
@ -196,7 +223,7 @@ class Coordinator {
}
WriteResponses Apply(WriteRequests requests) {
return std::visit([&](auto &&requests) { return Apply(requests); }, std::move(requests));
return std::visit([&](auto &&requests) { return ApplyWrite(std::move(requests)); }, std::move(requests));
}
};