From f94f72438d81539608bce756a4b5ac941259396d Mon Sep 17 00:00:00 2001 From: Marko Budiselic <marko.budiselic@memgraph.com> Date: Tue, 14 Mar 2023 12:49:41 +0000 Subject: [PATCH] Add simple test and rr implementation --- src/query/v2/request_router.hpp | 35 ++++++++++++++++++++++++++++- tests/simulation/common.hpp | 5 ++++- tests/simulation/request_router.cpp | 10 ++++----- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 6f3d762fc..4da5e7ccc 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -406,7 +406,40 @@ class RequestRouter : public RequestRouterInterface { std::vector<msgs::GraphResponse> GetGraph(msgs::GraphRequest req) override { SPDLOG_WARN("RequestRouter::GetGraph(GraphRequest) not fully implemented"); - return {msgs::GraphResponse{}}; + + // InitializeRequests + // TODO(gitbuda): This seems quite expensive because potentially a lot of requests has to be initialized. + auto multi_shards = shards_map_.GetAllShards(); + std::vector<ShardRequestState<msgs::GraphRequest>> requests = {}; + for (auto &shards : multi_shards) { + for (auto &[key, shard] : shards) { + MG_ASSERT(!shard.peers.empty()); + msgs::GraphRequest request; + request.transaction_id = transaction_id_; + ShardRequestState<msgs::GraphRequest> shard_request_state{ + .shard = shard, + .request = std::move(request), + }; + requests.emplace_back(std::move(shard_request_state)); + } + } + spdlog::trace("created {} Graph requests", requests.size()); + + // SendRequests and CollectResponses + RunningRequests<msgs::GraphRequest> running_requests = {}; + running_requests.reserve(requests.size()); + for (size_t i = 0; i < requests.size(); i++) { + auto &request = requests[i]; + io::ReadinessToken readiness_token{i}; + auto &storage_client = GetStorageClientForShard(request.shard); + storage_client.SendAsyncReadRequest(request.request, notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), request); + } + spdlog::trace("sent {} Graph requests in parallel", running_requests.size()); + auto responses = DriveReadResponses<msgs::GraphRequest, msgs::GraphResponse>(running_requests); + spdlog::trace("got back {} Graph responses after driving to completion", responses.size()); + + return responses; } std::optional<storage::v3::PropertyId> MaybeNameToProperty(const std::string &name) const override { diff --git a/tests/simulation/common.hpp b/tests/simulation/common.hpp index 8e07258a9..6471fe92c 100644 --- a/tests/simulation/common.hpp +++ b/tests/simulation/common.hpp @@ -118,7 +118,10 @@ class MockedShardRsm { return resp; } - msgs::GraphResponse ReadImpl(msgs::GraphRequest rqst) { LOG_FATAL("Implement Simulator ReadImpl GraphRequest"); } + msgs::GraphResponse ReadImpl(msgs::GraphRequest rqst) { + msgs::GraphResponse resp; + return resp; + } ReadResponses Read(ReadRequests read_requests) { return {std::visit([this]<typename T>(T &&request) { return ReadResponses{ReadImpl(std::forward<T>(request))}; }, diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index 9f7210d64..86c386b2b 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -213,17 +213,17 @@ void TestGetProperties(query::v2::RequestRouterInterface &request_router) { auto result = request_router.GetProperties(std::move(request)); MG_ASSERT(result.size() == 3); } - template <typename RequestRouter> void TestAggregate(RequestRouter &request_router) {} void TestGetGraph(query::v2::RequestRouterInterface &rr) { msgs::GraphRequest req; auto graphs = rr.GetGraph(req); - MG_ASSERT(graphs.size() == 1); - auto graph = graphs[0]; - MG_ASSERT(graph.data.vertices.size() == 0); - MG_ASSERT(graph.data.edges.size() == 0); + MG_ASSERT(graphs.size() == 2); + for (const auto &graph : graphs) { + MG_ASSERT(graph.data.vertices.size() == 0); + MG_ASSERT(graph.data.edges.size() == 0); + } } void DoTest() {