Added a ScanVertices Request to the RequestClientManager and polished the surrounding code

This commit is contained in:
Kostas Kyrimis 2022-09-05 21:13:12 +03:00
parent e442bf435a
commit c8bc4c7dbc
4 changed files with 155 additions and 90 deletions
src
tests/simulation

View File

@ -36,6 +36,8 @@ enum class Status : uint8_t {
// Pairs a server address with its liveness/membership status.
struct AddressAndStatus {
memgraph::io::Address address;
Status status;
// Ordering considers only the address; `status` is deliberately ignored so a
// server keeps the same position in ordered containers when its status changes.
friend bool operator<(const AddressAndStatus &lhs, const AddressAndStatus &rhs) { return lhs.address < rhs.address; }
};
using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
@ -53,6 +55,16 @@ struct ShardMap {
std::map<LabelName, LabelId> labels;
std::map<LabelId, Shards> shards;
// Finds the shard whose key-range contains `key` for the label `name`: the
// entry with the greatest minimum key that is <= `key`. Returns a reverse
// iterator into the label's shard map pointing at that entry.
// Precondition: the label exists and at least one shard's minimum key is <= `key`.
auto FindShardToInsert(const LabelName &name, CompoundKey &key) {
  MG_ASSERT(labels.contains(name));
  const auto id = labels.find(name)->second;
  auto &shards_ref = shards[id];
  // Scan from the highest minimum key downwards; the first entry whose minimum
  // key is <= `key` owns the range that `key` falls into.
  auto it =
      std::find_if(shards_ref.rbegin(), shards_ref.rend(), [&key](const auto &shard) { return shard.first <= key; });
  // BUGFIX: std::find_if over reverse iterators returns rend() when no element
  // matches; rbegin() is a perfectly valid hit (the shard with the largest
  // minimum key). The old `it != shards_ref.rbegin()` assert both missed the
  // not-found case and fired spuriously on that valid hit.
  MG_ASSERT(it != shards_ref.rend());
  return it;
}
// TODO(gabor) later we will want to update the wallclock time with
// the given Io<impl>'s time as well
Hlc IncrementShardMapVersion() noexcept {
@ -116,6 +128,8 @@ struct ShardMap {
// Find a random place for the server to plug in
}
LabelId GetLabelId(const std::string &label) const { return labels.at(label); }
Shards GetShardsForRange(LabelName label_name, CompoundKey start_key, CompoundKey end_key) {
MG_ASSERT(start_key <= end_key);
MG_ASSERT(labels.contains(label_name));
@ -146,6 +160,14 @@ struct ShardMap {
return std::prev(shard_for_label.upper_bound(key))->second;
}
// Returns the shard owning `key` for `label_id`: the entry with the greatest
// minimum key that is <= `key`.
Shard GetShardForKey(LabelId label_id, CompoundKey key) {
MG_ASSERT(shards.contains(label_id));
const auto &shard_for_label = shards.at(label_id);
// NOTE(review): if `key` sorts before the first shard's minimum key,
// upper_bound() returns begin() and std::prev(begin()) is UB — presumably the
// first shard always starts at the minimal key; confirm with callers.
return std::prev(shard_for_label.upper_bound(key))->second;
}
PropertyMap AllocatePropertyIds(std::vector<PropertyName> &new_properties) {
PropertyMap ret{};

View File

@ -41,30 +41,34 @@ template <typename TStorageClient>
class RsmStorageClientManager {
public:
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
RsmStorageClientManager() = default;
RsmStorageClientManager(const RsmStorageClientManager &) = delete;
RsmStorageClientManager(RsmStorageClientManager &&) = delete;
void AddClient(const std::string &label, CompoundKey key, TStorageClient client) {
void AddClient(const std::string &label, Shard key, TStorageClient client) {
cli_cache_[label].insert({std::move(key), std::move(client)});
}
bool Exists(const std::string &label, const CompoundKey &key) { return cli_cache_[label].contains(key); }
bool Exists(const std::string &label, const Shard &key) { return cli_cache_[label].contains(key); }
void PurgeCache() { cli_cache_.clear(); }
TStorageClient &GetClient(const std::string &label, const CompoundKey &key) {
return cli_cache_[label].find(key)->second;
}
TStorageClient &GetClient(const std::string &label, const Shard &key) { return cli_cache_[label].find(key)->second; }
private:
std::unordered_map<std::string, std::map<CompoundKey, TStorageClient>> cli_cache_;
std::map<std::string, std::map<Shard, TStorageClient>> cli_cache_;
};
template <typename TRequest>
struct ExecutionState {
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
const std::string label;
enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED };
// label is optional because some operators can create/remove vertices, etc. These kinds of requests carry the label
// on the request itself.
std::optional<std::string> label;
// CompoundKey is optional because some operators require to iterate over all the available keys
// of a shard. One example is ScanAll, where we only require the field label.
std::optional<CompoundKey> key;
@ -74,10 +78,11 @@ struct ExecutionState {
// the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that
// it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes
// empty, it means that all of the requests have completed successfully.
std::optional<std::vector<Shard>> state_;
// 1-1 mapping with `state_`.
std::vector<Shard> shard_cache;
// 1-1 mapping with `shard_cache`.
// A vector that tracks request metadata for each shard (for example, next_id for a ScanAll on Shard A)
std::vector<TRequest> requests;
State state = INITIALIZING;
};
class ShardRequestManagerInterface {
@ -126,13 +131,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
shards_map_ = hlc_response.fresher_shard_map.value();
}
// TODO(kostasrim) Simplify return result
std::vector<ScanVerticesResponse> Request(ExecutionState<ScanVerticesRequest> &state) override {
MaybeInitializeExecutionState(state);
std::vector<ScanVerticesResponse> responses;
auto &state_ref = *state.state_;
auto &shard_cacheref = state.shard_cache;
size_t id = 0;
for (auto shard_it = state_ref.begin(); shard_it != state_ref.end(); ++id) {
auto &storage_client = GetStorageClientForShard(state.label, state.requests[id].start_id.second);
for (auto shard_it = shard_cacheref.begin(); shard_it != shard_cacheref.end(); ++id) {
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.second);
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture instead.
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
// RETRY on timeouts?
@ -145,131 +151,150 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
responses.push_back(read_response_result.GetValue());
if (!read_response_result.GetValue().next_start_id) {
shard_it = state_ref.erase(shard_it);
shard_it = shard_cacheref.erase(shard_it);
} else {
state.requests[id].start_id.second = read_response_result.GetValue().next_start_id->second;
++shard_it;
}
}
// We are done with this state
MaybeCompleteState(state);
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return responses;
}
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state) {}
// size_t TestRequest(ExecutionState &state) {
// MaybeUpdateShardMap(state);
// MaybeUpdateExecutionState(state);
// for (auto &st : *state.state_) {
// auto &storage_client = GetStorageClientForShard(state.label, *state.key);
//
// memgraph::storage::v3::LabelId label_id = shards_map_.labels.at(state.label);
//
// rsm::StorageWriteRequest storage_req;
// storage_req.label_id = label_id;
// storage_req.transaction_id = state.transaction_id;
// storage_req.key = *state.key;
// storage_req.value = 469;
// auto write_response_result = storage_client.SendWriteRequest(storage_req);
// if (write_response_result.HasError()) {
// throw std::runtime_error("Handle gracefully!");
// }
// auto write_response = write_response_result.GetValue();
//
// bool cas_succeeded = write_response.shard_rsm_success;
//
// if (!cas_succeeded) {
// throw std::runtime_error("Handler gracefully!");
// }
// rsm::StorageReadRequest storage_get_req;
// storage_get_req.key = *state.key;
//
// auto get_response_result = storage_client.SendReadRequest(storage_get_req);
// if (get_response_result.HasError()) {
// throw std::runtime_error("Handler gracefully!");
// }
// auto get_response = get_response_result.GetValue();
// auto val = get_response.value.value();
// return val;
// }
// return 0;
// }
// Sends the CreateVertices requests built from `new_vertices` to their owning
// shards, one request per shard, and collects the responses. The execution
// state is initialized on first use and marked COMPLETED when every shard has
// answered. Throws std::runtime_error on transport failure or a rejected write.
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertexLabel> new_vertices) {
MG_ASSERT(!new_vertices.empty());
MaybeInitializeExecutionState(state, std::move(new_vertices));
std::vector<CreateVerticesResponse> responses;
auto &shard_cache_ref = state.shard_cache;
// `id` indexes `state.requests`, which is kept 1-1 with `shard_cache`.
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
// This is fine because all new_vertices of each request end up on the same shard
Label label = state.requests[id].new_vertices[0].label_ids;
auto primary_key = state.requests[id].new_vertices[0].primary_key;
auto &storage_client = GetStorageClientForShard(label, primary_key);
// NOTE(review): this is a create (write) request but goes through
// SendReadRequest, and the error strings below say "Write request" — confirm
// whether SendWriteRequest is intended once the RSM client API settles.
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
// RETRY on timeouts?
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
if (read_response_result.HasError()) {
throw std::runtime_error("Write request error");
}
if (read_response_result.GetValue().success == false) {
throw std::runtime_error("Write request did not succeed");
}
responses.push_back(read_response_result.GetValue());
// Each shard is visited exactly once, so drop it from the cache; erase()
// returns the next iterator, which is why the loop header does not advance.
shard_it = shard_cache_ref.erase(shard_it);
}
// We are done with this state
MaybeCompleteState(state);
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return responses;
}
private:
template <typename TRequest>
void MaybeUpdateShardMap(TRequest &state) {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
auto read_res = coord_cli_.SendReadRequest(req);
if (read_res.HasError()) {
throw std::runtime_error("HLC request failed");
}
auto coordinator_read_response = read_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_read_response);
if (hlc_response.fresher_shard_map) {
// throw std::runtime_error("Shouldn'");
// error here new shard map shouldn't exist
template <typename ExecutionState>
void ThrowIfStateCompleted(ExecutionState &state) const {
if (state.state == ExecutionState::COMPLETED) [[unlikely]] {
throw std::runtime_error("State is completed and must be reset");
}
}
// Transaction ID to be used later...
state.transaction_id = hlc_response.new_hlc;
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
} else {
throw std::runtime_error("Should handle gracefully!");
// Marks `state` COMPLETED once no per-shard requests remain; otherwise a no-op.
// NOTE(review): the visible Request() paths erase exhausted shards from
// `shard_cache` but never shrink `requests`, so this condition may never become
// true as written — confirm whether `shard_cache.empty()` was intended.
template <typename ExecutionState>
void MaybeCompleteState(ExecutionState &state) const {
if (state.requests.empty()) {
state.state = ExecutionState::COMPLETED;
}
}
template <typename ExecutionState>
bool ShallNotInitializeState(ExecutionState &state) const {
return state.state != ExecutionState::INITIALIZING;
}
template <typename TRequest>
void MaybeUpdateExecutionState(TRequest &state) {
if (state.state_) {
if (state.shard_cache) {
return;
}
state.transaction_id = transaction_id_;
state.state_ = std::make_optional<std::vector<Shard>>();
state.shard_cache = std::make_optional<std::vector<Shard>>();
const auto &shards = shards_map_.shards[shards_map_.labels[state.label]];
if (state.key) {
if (auto it = shards.find(*state.key); it != shards.end()) {
state.state_->push_back(it->second);
state.shard_cache->push_back(it->second);
return;
}
// throw here
}
for (const auto &[key, shard] : shards) {
state.state_->push_back(shard);
state.shard_cache->push_back(shard);
}
}
void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) {
if (state.state_) {
void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertexLabel> new_vertices) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
state.state_ = std::make_optional<std::vector<Shard>>();
const auto &shards = shards_map_.shards[shards_map_.labels[state.label]];
std::map<Shard, CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
auto shard = shards_map_.GetShardForKey(new_vertex.label, new_vertex.primary_key);
if (!per_shard_request_table.contains(shard)) {
CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_};
per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst)));
state.shard_cache.push_back(shard);
}
per_shard_request_table[shard].new_vertices.push_back(
NewVertex{.label_ids = shards_map_.GetLabelId(new_vertex.label),
.primary_key = std::move(new_vertex.primary_key),
.properties = std::move(new_vertex.properties)});
}
for (auto &[shard, rqst] : per_shard_request_table) {
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<CreateVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
const auto &shards = shards_map_.shards[shards_map_.labels[*state.label]];
for (const auto &[key, shard] : shards) {
state.state_->push_back(shard);
state.shard_cache.push_back(shard);
ScanVerticesRequest rqst;
rqst.transaction_id = transaction_id_;
rqst.start_id.second = key;
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
}
// std::vector<storageclient> GetStorageClientFromShardforRange(const std::string &label, const CompoundKey &start,
// const CompoundKey &end);
StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &key) {
if (storage_cli_manager_.Exists(label, key)) {
return storage_cli_manager_.GetClient(label, key);
template <typename TLabel>
StorageClient &GetStorageClientForShard(const TLabel &label, const CompoundKey &key) {
auto shard = shards_map_.GetShardForKey(label, key);
if (!storage_cli_manager_.Exists(label, shard)) {
AddStorageClientToManager(shard, label);
}
auto target_shard = shards_map_.GetShardForKey(label, key);
AddStorageClientToManager(std::move(target_shard), label, key);
return storage_cli_manager_.GetClient(label, key);
return storage_cli_manager_.GetClient(label, shard);
}
void AddStorageClientToManager(Shard target_shard, const std::string &label, const CompoundKey &cm_k) {
void AddStorageClientToManager(Shard target_shard, const std::string &label) {
MG_ASSERT(!target_shard.empty());
auto leader_addr = target_shard.front();
std::vector<Address> addresses;
@ -278,7 +303,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
addresses.push_back(std::move(address.address));
}
auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses));
storage_cli_manager_.AddClient(label, cm_k, std::move(cli));
storage_cli_manager_.AddClient(label, target_shard, std::move(cli));
}
ShardMap shards_map_;

View File

@ -24,9 +24,10 @@
#include "storage/v3/property_value.hpp"
using memgraph::coordinator::Hlc;
using memgraph::storage::v3::LabelId;
struct Label {
size_t id;
LabelId id;
};
// TODO(kostasrim) update this with CompoundKey, same for the rest of the file.
@ -196,11 +197,19 @@ struct ExpandOneResponse {
};
struct NewVertex {
std::vector<Label> label_ids;
Label label_ids;
PrimaryKey primary_key;
std::map<PropertyId, Value> properties;
};
// Client-facing form of a new vertex: carries the label by name. The shard
// request manager resolves the name to a LabelId when it builds the per-shard
// NewVertex entries.
struct NewVertexLabel {
std::string label;
PrimaryKey primary_key;
std::map<PropertyId, Value> properties;
};
// One per-shard batch of vertex creations, stamped with the transaction's HLC.
struct CreateVerticesRequest {
std::string label;
Hlc transaction_id;
// All vertices in one request are routed to the same shard by the sender.
std::vector<NewVertex> new_vertices;
};

View File

@ -153,6 +153,15 @@ void TestScanAll(ShardRequestManager &io) {
MG_ASSERT(result.size() == 0);
}
// TODO(review): stub — exercise the CreateVertices Request() path once wired up.
template <typename ShardRequestManager>
void TestCreateVertices(ShardRequestManager &io) {}
// TODO(review): stub — exercise the ExpandOne Request() path once wired up.
template <typename ShardRequestManager>
void TestExpand(ShardRequestManager &io) {}
// TODO(review): stub — exercise aggregation requests once wired up.
template <typename ShardRequestManager>
void TestAggregate(ShardRequestManager &io) {}
int main() {
SimulatorConfig config{
.drop_percent = 0,