From c15e75b48cea40e980706a44647753842e63fb61 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Thu, 1 Dec 2022 17:40:58 +0200 Subject: [PATCH] Remove old shard request manager header --- src/query/v2/shard_request_manager.hpp | 812 ------------------------- 1 file changed, 812 deletions(-) delete mode 100644 src/query/v2/shard_request_manager.hpp diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp deleted file mode 100644 index 4db77e645..000000000 --- a/src/query/v2/shard_request_manager.hpp +++ /dev/null @@ -1,812 +0,0 @@ -// Copyright 2022 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#pragma once - -#include <chrono> -#include <deque> -#include <iostream> -#include <iterator> -#include <map> -#include <numeric> -#include <optional> -#include <random> -#include <set> -#include <stdexcept> -#include <thread> -#include <unordered_map> -#include <vector> - -#include "coordinator/coordinator.hpp" -#include "coordinator/coordinator_client.hpp" -#include "coordinator/coordinator_rsm.hpp" -#include "coordinator/shard_map.hpp" -#include "io/address.hpp" -#include "io/errors.hpp" -#include "io/rsm/raft.hpp" -#include "io/rsm/rsm_client.hpp" -#include "io/rsm/shard_rsm.hpp" -#include "io/simulator/simulator.hpp" -#include "io/simulator/simulator_transport.hpp" -#include "query/v2/accessors.hpp" -#include "query/v2/requests.hpp" -#include "storage/v3/id_types.hpp" -#include "storage/v3/value_conversions.hpp" -#include "utils/result.hpp" - -namespace memgraph::msgs { -template <typename TStorageClient> -class RsmStorageClientManager { - public: - using CompoundKey = memgraph::io::rsm::ShardRsmKey; - using Shard = memgraph::coordinator::Shard; - using LabelId = memgraph::storage::v3::LabelId; - RsmStorageClientManager() = default; - RsmStorageClientManager(const RsmStorageClientManager &) = delete; - RsmStorageClientManager(RsmStorageClientManager &&) = delete; - RsmStorageClientManager &operator=(const RsmStorageClientManager &) = delete; - RsmStorageClientManager &operator=(RsmStorageClientManager &&) = delete; - ~RsmStorageClientManager() = default; - - void AddClient(Shard key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); } - - bool Exists(const Shard &key) { return cli_cache_.contains(key); } - - void PurgeCache() { cli_cache_.clear(); } - - TStorageClient &GetClient(const Shard &key) { - auto it = cli_cache_.find(key); - MG_ASSERT(it != cli_cache_.end(), "Non-existing shard client"); - return it->second; - } - - private: - std::map<Shard, TStorageClient> cli_cache_; -}; - -template <typename TRequest> -struct ExecutionState { - using CompoundKey = memgraph::io::rsm::ShardRsmKey; - using Shard = memgraph::coordinator::Shard; - - enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED }; - // label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label - // on the request itself. - std::optional<std::string> label; - // CompoundKey is optional because some operators require to iterate over all the available keys - // of a shard. One example is ScanAll, where we only require the field label. - std::optional<CompoundKey> key; - // Transaction id to be filled by the ShardRequestManager implementation - memgraph::coordinator::Hlc transaction_id; - // Initialized by ShardRequestManager implementation. This vector is filled with the shards that - // the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that - // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes - // empty, it means that all of the requests have completed succefully. - // TODO(gvolfing) - // Maybe make this into a more complex object to be able to keep track of paginated resutls. E.g. instead of a vector - // of Shards make it into a std::vector<std::pair<Shard, PaginatedResultType>> (probably a struct instead of a pair) - // where PaginatedResultType is an enum signaling the progress on the given request. This way we can easily check if - // a partial response on a shard(if there is one) is finished and we can send off the request for the next batch. - std::vector<Shard> shard_cache; - // 1-1 mapping with `shard_cache`. - // A vector that tracks request metadata for each shard (For example, next_id for a ScanAll on Shard A) - std::vector<TRequest> requests; - State state = INITIALIZING; -}; - -class ShardRequestManagerInterface { - public: - using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; - using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor; - ShardRequestManagerInterface() = default; - ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; - ShardRequestManagerInterface &operator=(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface &&operator=(ShardRequestManagerInterface &&) = delete; - - virtual ~ShardRequestManagerInterface() = default; - - virtual void StartTransaction() = 0; - virtual void Commit() = 0; - virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0; - virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state, - std::vector<NewVertex> new_vertices) = 0; - virtual std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, - ExpandOneRequest request) = 0; - virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state, - std::vector<NewExpand> new_edges) = 0; - virtual std::vector<GetPropertiesResponse> Request(ExecutionState<GetPropertiesRequest> &state, - GetPropertiesRequest request) = 0; - virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0; - virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0; - virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0; - virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0; - virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0; - virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0; - virtual bool IsPrimaryLabel(LabelId label) const = 0; - virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0; -}; - -// TODO(kostasrim)rename this class template -template <typename TTransport> -class ShardRequestManager : public ShardRequestManagerInterface { - public: - using StorageClient = - memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>; - using CoordinatorWriteRequests = memgraph::coordinator::CoordinatorWriteRequests; - using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>; - using Address = memgraph::io::Address; - using Shard = memgraph::coordinator::Shard; - using ShardMap = memgraph::coordinator::ShardMap; - using CompoundKey = memgraph::coordinator::PrimaryKey; - using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; - using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor; - ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io) - : coord_cli_(std::move(coord)), io_(std::move(io)) {} - - ShardRequestManager(const ShardRequestManager &) = delete; - ShardRequestManager(ShardRequestManager &&) = delete; - ShardRequestManager &operator=(const ShardRequestManager &) = delete; - ShardRequestManager &operator=(ShardRequestManager &&) = delete; - - ~ShardRequestManager() override {} - - void StartTransaction() override { - memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; - CoordinatorWriteRequests write_req = req; - auto write_res = coord_cli_.SendWriteRequest(write_req); - if (write_res.HasError()) { - throw std::runtime_error("HLC request failed"); - } - auto coordinator_write_response = write_res.GetValue(); - auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response); - - // Transaction ID to be used later... - transaction_id_ = hlc_response.new_hlc; - - if (hlc_response.fresher_shard_map) { - shards_map_ = hlc_response.fresher_shard_map.value(); - SetUpNameIdMappers(); - } - } - - void Commit() override { - memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; - CoordinatorWriteRequests write_req = req; - auto write_res = coord_cli_.SendWriteRequest(write_req); - if (write_res.HasError()) { - throw std::runtime_error("HLC request for commit failed"); - } - auto coordinator_write_response = write_res.GetValue(); - auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response); - - if (hlc_response.fresher_shard_map) { - shards_map_ = hlc_response.fresher_shard_map.value(); - SetUpNameIdMappers(); - } - auto commit_timestamp = hlc_response.new_hlc; - - msgs::CommitRequest commit_req{.transaction_id = transaction_id_, .commit_timestamp = commit_timestamp}; - - for (const auto &[label, space] : shards_map_.label_spaces) { - for (const auto &[key, shard] : space.shards) { - auto &storage_client = GetStorageClientForShard(shard); - // TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture - // instead. - auto commit_response = storage_client.SendWriteRequest(commit_req); - // RETRY on timeouts? - // Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test - if (commit_response.HasError()) { - throw std::runtime_error("Commit request timed out"); - } - WriteResponses write_response_variant = commit_response.GetValue(); - auto &response = std::get<CommitResponse>(write_response_variant); - if (response.error) { - throw std::runtime_error("Commit request did not succeed"); - } - } - } - } - - storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const override { - return shards_map_.GetEdgeTypeId(name).value(); - } - - storage::v3::PropertyId NameToProperty(const std::string &name) const override { - return shards_map_.GetPropertyId(name).value(); - } - - storage::v3::LabelId NameToLabel(const std::string &name) const override { - return shards_map_.GetLabelId(name).value(); - } - - const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { - return properties_.IdToName(id.AsUint()); - } - const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override { - return labels_.IdToName(id.AsUint()); - } - const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override { - return edge_types_.IdToName(id.AsUint()); - } - - bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { - const auto schema_it = shards_map_.schemas.find(primary_label); - MG_ASSERT(schema_it != shards_map_.schemas.end(), "Invalid primary label id: {}", primary_label.AsUint()); - - return std::find_if(schema_it->second.begin(), schema_it->second.end(), [property](const auto &schema_prop) { - return schema_prop.property_id == property; - }) != schema_it->second.end(); - } - - bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); } - - // TODO(kostasrim) Simplify return result - std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override { - MaybeInitializeExecutionState(state); - std::vector<ScanVerticesResponse> responses; - - SendAllRequests(state); - auto all_requests_gathered = [](auto &paginated_rsp_tracker) { - return std::ranges::all_of(paginated_rsp_tracker, [](const auto &state) { - return state.second == PaginatedResponseState::PartiallyFinished; - }); - }; - - std::map<Shard, PaginatedResponseState> paginated_response_tracker; - for (const auto &shard : state.shard_cache) { - paginated_response_tracker.insert(std::make_pair(shard, PaginatedResponseState::Pending)); - } - - do { - AwaitOnPaginatedRequests(state, responses, paginated_response_tracker); - } while (!all_requests_gathered(paginated_response_tracker)); - - MaybeCompleteState(state); - // TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return - // result of storage_client.SendReadRequest()). - return PostProcess(std::move(responses)); - } - - std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state, - std::vector<NewVertex> new_vertices) override { - MG_ASSERT(!new_vertices.empty()); - MaybeInitializeExecutionState(state, new_vertices); - std::vector<CreateVerticesResponse> responses; - auto &shard_cache_ref = state.shard_cache; - - // 1. Send the requests. - SendAllRequests(state, shard_cache_ref); - - // 2. Block untill all the futures are exhausted - do { - AwaitOnResponses(state, responses); - } while (!state.shard_cache.empty()); - - MaybeCompleteState(state); - // TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return - // result of storage_client.SendReadRequest()). - return responses; - } - - std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state, - std::vector<NewExpand> new_edges) override { - MG_ASSERT(!new_edges.empty()); - MaybeInitializeExecutionState(state, new_edges); - std::vector<CreateExpandResponse> responses; - auto &shard_cache_ref = state.shard_cache; - size_t id{0}; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) { - auto &storage_client = GetStorageClientForShard(*shard_it); - WriteRequests req = state.requests[id]; - auto write_response_result = storage_client.SendWriteRequest(std::move(req)); - if (write_response_result.HasError()) { - throw std::runtime_error("CreateVertices request timedout"); - } - WriteResponses response_variant = write_response_result.GetValue(); - CreateExpandResponse mapped_response = std::get<CreateExpandResponse>(response_variant); - - if (mapped_response.error) { - throw std::runtime_error("CreateExpand request did not succeed"); - } - responses.push_back(mapped_response); - shard_it = shard_cache_ref.erase(shard_it); - } - // We are done with this state - MaybeCompleteState(state); - return responses; - } - - std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) override { - // TODO(kostasrim)Update to limit the batch size here - // Expansions of the destination must be handled by the caller. For example - // match (u:L1 { prop : 1 })-[:Friend]-(v:L1) - // For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties - // must be fetched again with an ExpandOne(Edges.dst) - MaybeInitializeExecutionState(state, std::move(request)); - std::vector<ExpandOneResponse> responses; - auto &shard_cache_ref = state.shard_cache; - - // 1. Send the requests. - SendAllRequests(state, shard_cache_ref); - - // 2. Block untill all the futures are exhausted - do { - AwaitOnResponses(state, responses); - } while (!state.shard_cache.empty()); - std::vector<ExpandOneResultRow> result_rows; - const auto total_row_count = std::accumulate( - responses.begin(), responses.end(), 0, - [](const int64_t partial_count, const ExpandOneResponse &resp) { return partial_count + resp.result.size(); }); - result_rows.reserve(total_row_count); - - for (auto &response : responses) { - result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()), - std::make_move_iterator(response.result.end())); - } - MaybeCompleteState(state); - return result_rows; - } - - std::vector<GetPropertiesResponse> Request(ExecutionState<GetPropertiesRequest> &state, - GetPropertiesRequest requests) override { - MaybeInitializeExecutionState(state, std::move(requests)); - SendAllRequests(state); - - std::vector<GetPropertiesResponse> responses; - // 2. Block untill all the futures are exhausted - do { - AwaitOnResponses(state, responses); - } while (!state.shard_cache.empty()); - - MaybeCompleteState(state); - return responses; - } - - private: - enum class PaginatedResponseState { Pending, PartiallyFinished }; - - std::vector<VertexAccessor> PostProcess(std::vector<ScanVerticesResponse> &&responses) const { - std::vector<VertexAccessor> accessors; - for (auto &response : responses) { - for (auto &result_row : response.results) { - accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this)); - } - } - return accessors; - } - - template <typename ExecutionState> - void ThrowIfStateCompleted(ExecutionState &state) const { - if (state.state == ExecutionState::COMPLETED) [[unlikely]] { - throw std::runtime_error("State is completed and must be reset"); - } - } - - template <typename ExecutionState> - void ThrowIfStateExecuting(ExecutionState &state) const { - if (state.state == ExecutionState::EXECUTING) [[unlikely]] { - throw std::runtime_error("State is completed and must be reset"); - } - } - - template <typename ExecutionState> - void MaybeCompleteState(ExecutionState &state) const { - if (state.requests.empty()) { - state.state = ExecutionState::COMPLETED; - } - } - - template <typename ExecutionState> - bool ShallNotInitializeState(ExecutionState &state) const { - return state.state != ExecutionState::INITIALIZING; - } - - void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state, - std::vector<NewVertex> new_vertices) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - state.transaction_id = transaction_id_; - - std::map<Shard, CreateVerticesRequest> per_shard_request_table; - - for (auto &new_vertex : new_vertices) { - MG_ASSERT(!new_vertex.label_ids.empty(), "This is error!"); - auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id, - storage::conversions::ConvertPropertyVector(new_vertex.primary_key)); - if (!per_shard_request_table.contains(shard)) { - CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_}; - per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst))); - state.shard_cache.push_back(shard); - } - per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex)); - } - - for (auto &[shard, rqst] : per_shard_request_table) { - state.requests.push_back(std::move(rqst)); - } - state.state = ExecutionState<CreateVerticesRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<CreateExpandRequest> &state, std::vector<NewExpand> new_expands) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - state.transaction_id = transaction_id_; - - std::map<Shard, CreateExpandRequest> per_shard_request_table; - auto ensure_shard_exists_in_table = [&per_shard_request_table, - transaction_id = transaction_id_](const Shard &shard) { - if (!per_shard_request_table.contains(shard)) { - CreateExpandRequest create_expand_request{.transaction_id = transaction_id}; - per_shard_request_table.insert({shard, std::move(create_expand_request)}); - } - }; - - for (auto &new_expand : new_expands) { - const auto shard_src_vertex = shards_map_.GetShardForKey( - new_expand.src_vertex.first.id, storage::conversions::ConvertPropertyVector(new_expand.src_vertex.second)); - const auto shard_dest_vertex = shards_map_.GetShardForKey( - new_expand.dest_vertex.first.id, storage::conversions::ConvertPropertyVector(new_expand.dest_vertex.second)); - - ensure_shard_exists_in_table(shard_src_vertex); - - if (shard_src_vertex != shard_dest_vertex) { - ensure_shard_exists_in_table(shard_dest_vertex); - per_shard_request_table[shard_dest_vertex].new_expands.push_back(new_expand); - } - per_shard_request_table[shard_src_vertex].new_expands.push_back(std::move(new_expand)); - } - - for (auto &[shard, request] : per_shard_request_table) { - state.shard_cache.push_back(shard); - state.requests.push_back(std::move(request)); - } - state.state = ExecutionState<CreateExpandRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - - std::vector<coordinator::Shards> multi_shards; - state.transaction_id = transaction_id_; - if (!state.label) { - multi_shards = shards_map_.GetAllShards(); - } else { - const auto label_id = shards_map_.GetLabelId(*state.label); - MG_ASSERT(label_id); - MG_ASSERT(IsPrimaryLabel(*label_id)); - multi_shards = {shards_map_.GetShardsForLabel(*state.label)}; - } - for (auto &shards : multi_shards) { - for (auto &[key, shard] : shards) { - MG_ASSERT(!shard.empty()); - state.shard_cache.push_back(std::move(shard)); - ScanVerticesRequest rqst; - rqst.transaction_id = transaction_id_; - rqst.start_id.second = storage::conversions::ConvertValueVector(key); - state.requests.push_back(std::move(rqst)); - } - } - state.state = ExecutionState<ScanVerticesRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - state.transaction_id = transaction_id_; - - std::map<Shard, ExpandOneRequest> per_shard_request_table; - auto top_level_rqst_template = request; - top_level_rqst_template.transaction_id = transaction_id_; - top_level_rqst_template.src_vertices.clear(); - state.requests.clear(); - for (auto &vertex : request.src_vertices) { - auto shard = - shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); - if (!per_shard_request_table.contains(shard)) { - per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); - state.shard_cache.push_back(shard); - } - per_shard_request_table[shard].src_vertices.push_back(vertex); - } - - for (auto &[shard, rqst] : per_shard_request_table) { - state.requests.push_back(std::move(rqst)); - } - state.state = ExecutionState<ExpandOneRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<GetPropertiesRequest> &state, GetPropertiesRequest request) { - ThrowIfStateCompleted(state); - ThrowIfStateExecuting(state); - - std::map<Shard, GetPropertiesRequest> per_shard_request_table; - auto top_level_rqst_template = request; - top_level_rqst_template.transaction_id = transaction_id_; - top_level_rqst_template.vertices_and_edges.clear(); - - state.transaction_id = transaction_id_; - - for (auto &[vertex, maybe_edge] : request.vertices_and_edges) { - auto shard = - shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); - if (!per_shard_request_table.contains(shard)) { - per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); - state.shard_cache.push_back(shard); - } - per_shard_request_table[shard].vertices_and_edges.push_back({std::move(vertex), maybe_edge}); - } - - for (auto &[shard, rqst] : per_shard_request_table) { - state.requests.push_back(std::move(rqst)); - } - state.state = ExecutionState<GetPropertiesRequest>::EXECUTING; - } - - StorageClient &GetStorageClientForShard(Shard shard) { - if (!storage_cli_manager_.Exists(shard)) { - AddStorageClientToManager(shard); - } - return storage_cli_manager_.GetClient(shard); - } - - StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &key) { - auto shard = shards_map_.GetShardForKey(label, key); - return GetStorageClientForShard(std::move(shard)); - } - - void AddStorageClientToManager(Shard target_shard) { - MG_ASSERT(!target_shard.empty()); - auto leader_addr = target_shard.front(); - std::vector<Address> addresses; - addresses.reserve(target_shard.size()); - for (auto &address : target_shard) { - addresses.push_back(std::move(address.address)); - } - auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses)); - storage_cli_manager_.AddClient(target_shard, std::move(cli)); - } - - template <typename TRequest> - void SendAllRequests(ExecutionState<TRequest> &state) { - int64_t shard_idx = 0; - for (const auto &request : state.requests) { - const auto ¤t_shard = state.shard_cache[shard_idx]; - - auto &storage_client = GetStorageClientForShard(current_shard); - ReadRequests req = request; - storage_client.SendAsyncReadRequest(request); - - ++shard_idx; - } - } - - void SendAllRequests(ExecutionState<CreateVerticesRequest> &state, - std::vector<memgraph::coordinator::Shard> &shard_cache_ref) { - size_t id = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { - // This is fine because all new_vertices of each request end up on the same shard - const auto labels = state.requests[id].new_vertices[0].label_ids; - auto req_deep_copy = state.requests[id]; - - for (auto &new_vertex : req_deep_copy.new_vertices) { - new_vertex.label_ids.erase(new_vertex.label_ids.begin()); - } - - auto &storage_client = GetStorageClientForShard(*shard_it); - - WriteRequests req = req_deep_copy; - storage_client.SendAsyncWriteRequest(req); - ++id; - } - } - - void SendAllRequests(ExecutionState<ExpandOneRequest> &state, - std::vector<memgraph::coordinator::Shard> &shard_cache_ref) { - size_t id = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { - auto &storage_client = GetStorageClientForShard(*shard_it); - ReadRequests req = state.requests[id]; - storage_client.SendAsyncReadRequest(req); - ++id; - } - } - - void AwaitOnResponses(ExecutionState<CreateVerticesRequest> &state, std::vector<CreateVerticesResponse> &responses) { - auto &shard_cache_ref = state.shard_cache; - int64_t request_idx = 0; - - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto poll_result = storage_client.AwaitAsyncWriteRequest(); - if (!poll_result) { - ++shard_it; - ++request_idx; - - continue; - } - - if (poll_result->HasError()) { - throw std::runtime_error("CreateVertices request timed out"); - } - - WriteResponses response_variant = poll_result->GetValue(); - auto response = std::get<CreateVerticesResponse>(response_variant); - - if (response.error) { - throw std::runtime_error("CreateVertices request did not succeed"); - } - responses.push_back(response); - - shard_it = shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - } - } - - void AwaitOnResponses(ExecutionState<ExpandOneRequest> &state, std::vector<ExpandOneResponse> &responses) { - auto &shard_cache_ref = state.shard_cache; - int64_t request_idx = 0; - - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto poll_result = storage_client.PollAsyncReadRequest(); - if (!poll_result) { - ++shard_it; - ++request_idx; - continue; - } - - if (poll_result->HasError()) { - throw std::runtime_error("ExpandOne request timed out"); - } - - ReadResponses response_variant = poll_result->GetValue(); - auto response = std::get<ExpandOneResponse>(response_variant); - // -NOTE- - // Currently a boolean flag for signaling the overall success of the - // ExpandOne request does not exist. But it should, so here we assume - // that it is already in place. - if (response.error) { - throw std::runtime_error("ExpandOne request did not succeed"); - } - - responses.push_back(std::move(response)); - shard_it = shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - } - } - - void AwaitOnResponses(ExecutionState<GetPropertiesRequest> &state, std::vector<GetPropertiesResponse> &responses) { - auto &shard_cache_ref = state.shard_cache; - int64_t request_idx = 0; - - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto poll_result = storage_client.PollAsyncReadRequest(); - if (!poll_result) { - ++shard_it; - ++request_idx; - continue; - } - - if (poll_result->HasError()) { - throw std::runtime_error("GetProperties request timed out"); - } - - ReadResponses response_variant = poll_result->GetValue(); - auto response = std::get<GetPropertiesResponse>(response_variant); - if (response.result != GetPropertiesResponse::SUCCESS) { - throw std::runtime_error("GetProperties request did not succeed"); - } - - responses.push_back(std::move(response)); - shard_it = shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - } - } - - void AwaitOnPaginatedRequests(ExecutionState<ScanVerticesRequest> &state, - std::vector<ScanVerticesResponse> &responses, - std::map<Shard, PaginatedResponseState> &paginated_response_tracker) { - auto &shard_cache_ref = state.shard_cache; - - // Find the first request that is not holding a paginated response. - int64_t request_idx = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - if (paginated_response_tracker.at(*shard_it) != PaginatedResponseState::Pending) { - ++shard_it; - ++request_idx; - continue; - } - - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto await_result = storage_client.AwaitAsyncReadRequest(); - - if (!await_result) { - // Redirection has occured. - ++shard_it; - ++request_idx; - continue; - } - - if (await_result->HasError()) { - throw std::runtime_error("ScanAll request timed out"); - } - - ReadResponses read_response_variant = await_result->GetValue(); - auto response = std::get<ScanVerticesResponse>(read_response_variant); - if (response.error) { - throw std::runtime_error("ScanAll request did not succeed"); - } - - if (!response.next_start_id) { - paginated_response_tracker.erase((*shard_it)); - shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - - } else { - state.requests[request_idx].start_id.second = response.next_start_id->second; - paginated_response_tracker[*shard_it] = PaginatedResponseState::PartiallyFinished; - } - responses.push_back(std::move(response)); - } - } - - void SetUpNameIdMappers() { - std::unordered_map<uint64_t, std::string> id_to_name; - for (const auto &[name, id] : shards_map_.labels) { - id_to_name.emplace(id.AsUint(), name); - } - labels_.StoreMapping(std::move(id_to_name)); - id_to_name.clear(); - for (const auto &[name, id] : shards_map_.properties) { - id_to_name.emplace(id.AsUint(), name); - } - properties_.StoreMapping(std::move(id_to_name)); - id_to_name.clear(); - for (const auto &[name, id] : shards_map_.edge_types) { - id_to_name.emplace(id.AsUint(), name); - } - edge_types_.StoreMapping(std::move(id_to_name)); - } - - ShardMap shards_map_; - storage::v3::NameIdMapper properties_; - storage::v3::NameIdMapper edge_types_; - storage::v3::NameIdMapper labels_; - CoordinatorClient coord_cli_; - RsmStorageClientManager<StorageClient> storage_cli_manager_; - memgraph::io::Io<TTransport> io_; - memgraph::coordinator::Hlc transaction_id_; - // TODO(kostasrim) Add batch prefetching -}; -} // namespace memgraph::msgs