Merge branch 'T0912-MG-in-memory-shard-map' of github.com:memgraph/memgraph into T0916-MG-fbthrift-transport

This commit is contained in:
Tyler Neely 2022-08-19 06:47:29 +00:00
commit 0f28408db4
6 changed files with 107 additions and 53 deletions

View File

@ -87,10 +87,22 @@ struct DeregisterStorageEngineResponse {
bool success;
};
using WriteRequests = std::variant<AllocateHlcBatchRequest, AllocateEdgeIdBatchRequest, SplitShardRequest,
RegisterStorageEngineRequest, DeregisterStorageEngineRequest>;
using WriteResponses = std::variant<AllocateHlcBatchResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse>;
struct InitializeLabelRequest {
std::string label_name;
Hlc last_shard_map_version;
};
struct InitializeLabelResponse {
bool success;
std::optional<ShardMap> fresher_shard_map;
};
using WriteRequests =
std::variant<AllocateHlcBatchRequest, AllocateEdgeIdBatchRequest, SplitShardRequest, RegisterStorageEngineRequest,
DeregisterStorageEngineRequest, InitializeLabelRequest>;
using WriteResponses =
std::variant<AllocateHlcBatchResponse, AllocateEdgeIdBatchResponse, SplitShardResponse,
RegisterStorageEngineResponse, DeregisterStorageEngineResponse, InitializeLabelResponse>;
using ReadRequests = std::variant<HlcRequest, GetShardMapRequest>;
using ReadResponses = std::variant<HlcResponse, GetShardMapResponse>;
@ -116,14 +128,14 @@ class Coordinator {
uint64_t highest_allocated_edge_id_;
/// Increment our
ReadResponses Read(HlcRequest hlc_request) {
ReadResponses HandleRead(HlcRequest &&hlc_request) {
HlcResponse res{};
auto hlc_shard_map = shard_map_.GetHlc();
MG_ASSERT(!(hlc_request.last_shard_map_version.logical_id > hlc_shard_map.logical_id));
res.new_hlc = shard_map_.UpdateShardMapVersion();
res.new_hlc = shard_map_.IncrementShardMapVersion();
// res.fresher_shard_map = hlc_request.last_shard_map_version.logical_id < hlc_shard_map.logical_id
// ? std::make_optional(shard_map_)
@ -135,7 +147,7 @@ class Coordinator {
return res;
}
GetShardMapResponse Read(GetShardMapRequest &&get_shard_map_request) {
ReadResponses HandleRead(GetShardMapRequest &&get_shard_map_request) {
GetShardMapResponse res;
res.shard_map = shard_map_;
return res;
@ -199,27 +211,28 @@ class Coordinator {
return res;
}
WriteResponses ApplyWrite(InitializeLabelRequest &&initialize_label_request) {
InitializeLabelResponse res{};
bool success = shard_map_.InitializeNewLabel(initialize_label_request.label_name,
initialize_label_request.last_shard_map_version);
if (success) {
res.fresher_shard_map = shard_map_;
res.success = false;
} else {
res.fresher_shard_map = std::nullopt;
res.success = true;
}
return res;
}
public:
explicit Coordinator(ShardMap sm) : shard_map_{(sm)} {}
ReadResponses Read(ReadRequests requests) {
// if (std::get_if<HlcRequest>(&requests)) {
// std::cout << "HlcRequest" << std::endl;
// } else if (std::get_if<GetShardMapRequest>(&requests)) {
// std::cout << "GetShardMapRequest" << std::endl;
// } else {
// std::cout << "idk requests" << std::endl;
// }
// std::cout << "Coordinator Read()" << std::endl;
auto ret = std::visit([&](auto requests) { return Read(requests); }, (requests));
// if (std::get_if<HlcResponse>(&ret)) {
// std::cout << "HlcResponse" << std::endl;
// } else if (std::get_if<GetShardMapResponse>(&ret)) {
// std::cout << "GetShardMapResponse" << std::endl;
// } else {
// std::cout << "idk response" << std::endl;
// }
return ret;
return std::visit([&](auto &&requests) { return HandleRead(std::move(requests)); }, std::move(requests));
}
WriteResponses Apply(WriteRequests requests) {

View File

@ -46,16 +46,9 @@ struct ShardMap {
Hlc shard_map_version;
std::map<Label, Shards> shards;
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well. This function should just be
// replaced with operator== since it is already overloaded for Hlc
// objects.
bool CompareShardMapVersions(Hlc one, Hlc two) { return one.logical_id == two.logical_id; }
public:
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well
Hlc UpdateShardMapVersion() noexcept {
Hlc IncrementShardMapVersion() noexcept {
++shard_map_version.logical_id;
return shard_map_version;
}
@ -83,12 +76,29 @@ struct ShardMap {
// Apply the split
shards_in_map[key] = shard_to_map_to;
return true;
}
return false;
}
bool InitializeNewLabel(std::string label_name, Hlc last_shard_map_version) {
if (shard_map_version != last_shard_map_version) {
return false;
}
if (shards.contains(label_name)) {
return false;
}
shards.emplace(label_name, Shards{});
IncrementShardMapVersion();
return true;
}
void AddServer(Address server_address) {
// Find a random place for the server to plug in
}
@ -98,14 +108,28 @@ struct ShardMap {
Shards GetShardsForRange(Label label, CompoundKey start, CompoundKey end);
Shard GetShardForKey(Label label, CompoundKey key) {
// return shards.at(label).at(key);
std::cout << "label" << std::endl;
auto asd1 = shards.at(label);
std::cout << "key" << std::endl;
auto asd2 = asd1[key];
auto shard_for_label = shards.at(label);
return asd2;
auto max = (--shard_for_label.end())->first;
if (key > max) {
return shard_for_label[max];
}
for (auto it = shard_for_label.lower_bound(key);; --it) {
MG_ASSERT(it->first <= key);
return it->second;
}
MG_ASSERT(false, "failed to find shard with a key that is less than or equal to the provided key");
}
private:
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well. This function should just be
// replaced with operator== since it is already overloaded for Hlc
// objects.
bool CompareShardMapVersions(Hlc one, Hlc two) { return one.logical_id == two.logical_id; }
};
} // namespace memgraph::coordinator

View File

@ -17,6 +17,8 @@
#include "io/rsm/raft.hpp"
#include "utils/result.hpp"
namespace memgraph::io::rsm {
using memgraph::io::Address;
using memgraph::io::Duration;
using memgraph::io::ResponseEnvelope;
@ -73,7 +75,7 @@ class RsmClient {
const Time before = io_.Now();
do {
spdlog::debug("client sending CasRequest to Leader {}", leader_.ToString());
spdlog::debug("client sending WriteRequest to Leader {}", leader_.ToString());
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
@ -105,7 +107,7 @@ class RsmClient {
const Time before = io_.Now();
do {
spdlog::debug("client sending GetRequest to Leader {}", leader_.ToString());
spdlog::debug("client sending ReadRequest to Leader {}", leader_.ToString());
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
@ -131,3 +133,5 @@ class RsmClient {
return TimedOut{};
}
};
} // namespace memgraph::io::rsm

View File

@ -11,6 +11,16 @@
#pragma once
/// The StorageRsm is a simple in-memory raft-backed kv store that can be used for simple testing
/// and implementation of some query engine logic before storage engines are fully implemented.
///
/// To implement multiple read and write commands, change the StorageRead* and StorageWrite* requests
/// and responses to a std::variant of the different options, and route them to specific handlers in
/// the StorageRsm's Read and Apply methods. Remember that Read is called immediately when the Raft
/// leader receives the request, and does not replicate anything over Raft. Apply is called only
/// AFTER the StorageWriteRequest is replicated to a majority of Raft peers, and the result of calling
/// StorageRsm::Apply(StorageWriteRequest) is returned to the client that submitted the request.
#include <deque>
#include <iostream>
#include <map>
@ -60,11 +70,11 @@ struct StorageWriteResponse {
std::optional<Hlc> latest_known_shard_map_version{std::nullopt};
};
struct StorageGetRequest {
struct StorageReadRequest {
ShardRsmKey key;
};
struct StorageGetResponse {
struct StorageReadResponse {
bool shard_rsm_success;
std::optional<int> value;
// Only has a value if the given shard does not contain the requested key
@ -86,8 +96,8 @@ class StorageRsm {
}
public:
StorageGetResponse Read(StorageGetRequest request) {
StorageGetResponse ret;
StorageReadResponse Read(StorageReadRequest request) {
StorageReadResponse ret;
if (!IsKeyInRange(request.key)) {
ret.latest_known_shard_map_version = shard_map_version_;

View File

@ -20,9 +20,9 @@
#include "io/address.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "utils/rsm_client.hpp"
using memgraph::io::Address;
using memgraph::io::Duration;
@ -34,6 +34,7 @@ using memgraph::io::Time;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::RsmClient;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;

View File

@ -22,11 +22,11 @@
#include "io/errors.hpp"
#include "io/rsm/coordinator_rsm.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "utils/result.hpp"
#include "utils/rsm_client.hpp"
using memgraph::coordinator::Address;
using memgraph::coordinator::AddressAndStatus;
@ -48,8 +48,9 @@ using memgraph::io::rsm::CoordinatorRsm;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::StorageGetRequest;
using memgraph::io::rsm::StorageGetResponse;
using memgraph::io::rsm::RsmClient;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageRsm;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
@ -61,8 +62,8 @@ using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::utils::BasicResult;
using StorageClient =
RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageGetRequest, StorageGetResponse>;
using StorageClient = RsmClient<Io<SimulatorTransport>, StorageWriteRequest, StorageWriteResponse, StorageReadRequest,
StorageReadResponse>;
namespace {
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
@ -120,11 +121,12 @@ std::optional<StorageClient> DetermineShardLocation(Shard target_shard, const st
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteStorageRsm = Raft<SimulatorTransport, StorageRsm, StorageWriteRequest, StorageWriteResponse,
StorageGetRequest, StorageGetResponse>;
StorageReadRequest, StorageReadResponse>;
template <typename IoImpl>
void RunStorageRaft(
Raft<IoImpl, StorageRsm, StorageWriteRequest, StorageWriteResponse, StorageGetRequest, StorageGetResponse> server) {
Raft<IoImpl, StorageRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>
server) {
server.Run();
}
@ -303,7 +305,7 @@ int main() {
// Have client use shard map to decide which shard to communicate
// with to read that same value back
StorageGetRequest storage_get_req;
StorageReadRequest storage_get_req;
storage_get_req.key = {write_key_1, write_key_2};
auto get_response_result = storage_client.SendReadRequest(storage_get_req);