Add new test, start to fill out coordinator RSM

This commit is contained in:
Tyler Neely 2022-08-04 10:55:05 +00:00
parent 523e2b9186
commit e3dd404865
5 changed files with 140 additions and 26 deletions

View File

@ -20,9 +20,28 @@
namespace memgraph::coordinator {
using Address = memgraph::io::Address;
using Io = memgraph::io::Io;
using SimT = memgraph::io::simulator::SimulatorTransport;
struct HlcRequest {
Hlc last_shard_map_version;
};
struct HlcResponse {
Hlc new_hlc;
std::optional<ShardMap> fresher_shard_map;
};
struct AllocateHlcBatchRequest {
Hlc low;
Hlc high;
};
struct AllocateHlcBatchResponse {
bool success;
Hlc low;
Hlc high;
};
struct SplitShardRequest {
Hlc previous_shard_map_version;
Label label;
@ -49,43 +68,53 @@ struct DeregisterStorageEngineResponse {
bool success;
};
struct HlcRequest {
Hlc last_shard_map_version;
};
using WriteRequests = std::variant<AllocateHlcBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest>;
using WriteResponses = std::variant<AllocateHlcBatchResponse, SplitShardResponse, RegisterStorageEngineResponse,
DeregisterStorageEngineResponse>;
struct HlcResponse {
Hlc new_hlc;
std::optional<ShardMap> fresher_shard_map;
};
using ReadRequests = std::variant<HlcRequest>;
using ReadResponses = std::variant<HlcResponse>;
class Coordinator {
ShardMap shard_map_;
Io<SimT> io_;
/// This splits the shard
void Handle(SplitShardRequest &split_shard_request, Address from_addr) {
WriteResponses Apply(AllocateHlcBatchRequest &&ahr) {
AllocateHlcBatchResponse res{};
return res;
}
/// This splits the shard immediately beneath the provided
/// split key, keeping the assigned peers identical for now,
/// but letting them be gradually migrated over time.
WriteResponses Apply(SplitShardRequest &&split_shard_request) {
SplitShardResponse res{};
if (split_shard_request.previous_shard_map_version != shard_map_.shard_map_version) {
// TODO reply with failure
}
return res;
}
void Handle(RegisterStorageEngineRequest &register_storage_engine_request, Address from_addr) {}
WriteResponses Apply(RegisterStorageEngineRequest &&register_storage_engine_request) {
RegisterStorageEngineResponse res{};
return res;
}
WriteResponses Apply(DeregisterStorageEngineRequest &&register_storage_engine_request) {
DeregisterStorageEngineResponse res{};
return res;
}
public:
void Run() {
while (!io_.ShouldShutDown()) {
std::cout << "[Coordinator] Is receiving..." << std::endl;
auto request_result =
io_.Receive<SplitShardRequest, RegisterStorageEngineRequest, DeregisterStorageEngineRequest, HlcRequest>();
if (request_result.HasError()) {
std::cout << "[Coordinator] Error, continue" << std::endl;
continue;
}
ReadResponses Read(ReadRequests requests) { return HlcResponse{}; }
auto request_envelope = request_result.GetValue();
// TODO std::visit to determine whether to handle shard split, registration etc... (see raft.hpp Run / Handle
// methods in T0941)
}
WriteResponses Apply(WriteRequests requests) {
return std::visit([&](auto &&requests) { return Apply(requests); }, std::move(requests));
}
};

View File

@ -21,6 +21,10 @@ using Time = memgraph::io::Time;
struct Hlc {
uint64_t logical_id;
Time coordinator_wall_clock;
bool operator==(const Hlc &other) const {
return (logical_id == other.logical_id) && (coordinator_wall_clock == other.coordinator_wall_clock);
}
};
} // namespace memgraph::coordinator

View File

@ -16,6 +16,7 @@
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "storage/v3/property_value.hpp"
namespace memgraph::coordinator {
@ -32,7 +33,7 @@ struct AddressAndStatus {
Status status;
};
using CompoundKey = std::vector<memgraph::storage::PropertyValue>;
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
using Shard = std::vector<AddressAndStatus>;
using Shards = std::map<CompoundKey, Shard>;

View File

@ -30,3 +30,5 @@ add_simulation_test(future.cpp thread)
add_simulation_test(basic_request.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
add_simulation_test(sharded_map.cpp address)

View File

@ -0,0 +1,78 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <vector>
#include "io/address.hpp"
#include "io/rsm/coordinator_rsm.hpp"
#include "io/rsm/raft.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
int main() {
SimulatorConfig config{
/*
.drop_percent = 5,
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
.start_time = 256 * 1024,
.abort_time = std::chrono::microseconds{8 * 1024 * 1024},
*/
};
auto simulator = Simulator(config);
auto cli_addr = Address::TestAddress(1);
auto srv_addr_1 = Address::TestAddress(2);
auto srv_addr_2 = Address::TestAddress(3);
auto srv_addr_3 = Address::TestAddress(4);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> srv_io_1 = simulator.Register(srv_addr_1);
Io<SimulatorTransport> srv_io_2 = simulator.Register(srv_addr_2);
Io<SimulatorTransport> srv_io_3 = simulator.Register(srv_addr_3);
std::vector<Address> srv_1_peers = {srv_addr_2, srv_addr_3};
std::vector<Address> srv_2_peers = {srv_addr_1, srv_addr_3};
std::vector<Address> srv_3_peers = {srv_addr_1, srv_addr_2};
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
ConcreteCoordinatorRsm srv_1{std::move(srv_io_1), srv_1_peers, Coordinator{}};
ConcreteCoordinatorRsm srv_2{std::move(srv_io_2), srv_2_peers, Coordinator{}};
ConcreteCoordinatorRsm srv_3{std::move(srv_io_3), srv_3_peers, Coordinator{}};
return 0;
}