From b82e8748ad9bf2a0a96a59cf9b4f00eaf9f86ba1 Mon Sep 17 00:00:00 2001 From: jeremy <jeremy.bailleux@memgraph.io> Date: Mon, 24 Oct 2022 12:03:51 +0200 Subject: [PATCH] Attempt impl --- src/query/v2/requests.hpp | 2 +- src/storage/v3/request_helper.cpp | 20 +++--- src/storage/v3/request_helper.hpp | 3 +- src/storage/v3/shard_rsm.cpp | 105 +++++++++++++++++++++--------- tests/simulation/shard_rsm.cpp | 32 ++++----- 5 files changed, 103 insertions(+), 59 deletions(-) diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 49ae346cb..d8d637037 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -404,7 +404,7 @@ struct ExpandOneRequest { std::vector<std::string> vertex_expressions; std::vector<std::string> edge_expressions; - std::optional<std::vector<OrderBy>> order_by; + std::vector<OrderBy> order_by; // Limit the edges or the vertices? std::optional<size_t> limit; std::vector<std::string> filters; diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 7dab4c5c1..60633fbbd 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -42,36 +42,38 @@ std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIter return ordered_elements.end(); } -std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, - const msgs::EdgeDirection direction) { +std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, + const msgs::EdgeDirection direction) { + std::vector<EdgeAccessor> in_edges; + std::vector<EdgeAccessor> out_edges; + switch (direction) { case memgraph::msgs::EdgeDirection::IN: { auto edges = vertex_accessor.InEdges(View::OLD); if (edges.HasValue()) { - return edges.GetValue(); + in_edges = edges.GetValue(); } - return {}; } case memgraph::msgs::EdgeDirection::OUT: { auto edges = vertex_accessor.OutEdges(View::OLD); if (edges.HasValue()) { - return edges.GetValue(); + out_edges = edges.GetValue(); } - return {}; } case memgraph::msgs::EdgeDirection::BOTH: { auto maybe_in_edges = vertex_accessor.InEdges(View::OLD); auto maybe_out_edges = vertex_accessor.OutEdges(View::OLD); std::vector<EdgeAccessor> edges; if (maybe_in_edges.HasValue()) { - edges = maybe_in_edges.GetValue(); + in_edges = maybe_in_edges.GetValue(); } if (maybe_out_edges.HasValue()) { - edges.insert(edges.end(), maybe_out_edges.GetValue().begin(), maybe_out_edges.GetValue().end()); + out_edges = maybe_out_edges.GetValue(); } - return edges; } } + + return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges}; } } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 6212c4a69..63758cf63 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -163,5 +163,6 @@ std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIter const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &start_ids, View view); -std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction); +std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, + msgs::EdgeDirection direction); } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 97b275530..5f30692ec 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -354,6 +354,41 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult( return result_row; } +std::optional<msgs::ExpandOneResultRow> GetExpandOneResult2( + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) { + /// Fill up source vertex + auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex); + if (!source_vertex) { + return std::nullopt; + } + + /// Fill up source vertex properties + auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req); + if (!src_vertex_properties) { + return std::nullopt; + } + + /// Fill up connecting edges + auto in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edge_accessors), msgs::EdgeDirection::IN); + auto out_edges = maybe_filter_based_on_edge_uniquness(std::move(out_edge_accessors), msgs::EdgeDirection::OUT); + + msgs::ExpandOneResultRow result_row; + result_row.src_vertex = std::move(*source_vertex); + result_row.src_vertex_properties = std::move(*src_vertex_properties); + static constexpr bool kInEdges = true; + static constexpr bool kOutEdges = false; + if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, req, result_row, edge_filler)) { + return std::nullopt; + } + if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, req, result_row, edge_filler)) { + return std::nullopt; + } + + return result_row; +} + EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) { // Functions to select connecting edges based on uniquness EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness; @@ -862,57 +897,67 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); auto edge_filler = InitializeEdgeFillerFunction(req); - //#NoCommit code below is duplicated, one needs to factor it once it's clear - if (req.order_by) { + //#NoCommit code below is duplicated, one needs to factor it once it's clear! + if (!req.order_by.empty()) { auto vertex_iterable = acc.Vertices(View::OLD); - const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, *req.order_by); - std::vector<std::pair<VertexAccessor, std::vector<Element<EdgeAccessor>>>> vertex_ordered_edges; - vertex_ordered_edges.reserve(req.src_vertices.size()); + const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_by); - for (auto &src_vertex : req.src_vertices) { + for (auto &ordered_vertice : ordered_vertices) { // Get Vertex acc - auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); - if (!src_vertex_acc_opt) { - action_successful = false; - spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", - req.transaction_id.logical_id); - break; - } + auto src_vertex_acc = ordered_vertice.object_acc; if (!req.filters.empty()) { // NOTE - DbAccessor might get removed in the future. - auto dba = DbAccessor{&acc}; - const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol); if (!eval) { continue; } } - auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); - if (!result) { + auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); + const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by); + const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by); + + std::vector<EdgeAccessor> in_edge_ordered_accessors; + std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors), + [](auto &edge_element) { return edge_element.object_acc; }); + + std::vector<EdgeAccessor> out_edge_ordered_accessors; + std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors), + [](auto &edge_element) { return edge_element.object_acc; }); + + auto label_id = src_vertex_acc.PrimaryLabel(View::NEW); + if (label_id.HasError()) { + action_successful = false; + break; + } + auto primary_key = src_vertex_acc.PrimaryKey(View::NEW); + if (primary_key.HasError()) { action_successful = false; break; } - results.emplace_back(result.value()); - if (batch_limit) { // #NoCommit can ebd one differently + msgs::VertexId src_vertice; + src_vertice.first = msgs::Label{.id = *label_id}; + src_vertice.second = conversions::ConvertValueVector(*primary_key); + auto maybe_result = + GetExpandOneResult2(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors, + maybe_filter_based_on_edge_uniquness, edge_filler); + + if (!maybe_result) { + action_successful = false; + break; + } + + results.emplace_back(maybe_result.value()); + + if (batch_limit) { // #NoCommit can be done differently --*batch_limit; if (batch_limit < 0) { break; } } } - // Now we have to construct response like this, and here we will care about - // limit: - // v1 e1 - // v1 e2 - // v1 e3 - // v2 e4 - // v2 e5 - // v2 e6 - // v2 e7 - // v3 e8 - // v3 e9 } else { for (auto &src_vertex : req.src_vertices) { // Get Vertex acc diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 6c3ea7d60..8883e4952 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -531,7 +531,7 @@ void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_verte std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -586,7 +586,7 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, Edge std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -642,7 +642,7 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_ std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -698,9 +698,9 @@ void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_ // Edge properties to look for std::optional<std::vector<PropertyId>> edge_properties = {}; - std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; - // std::optional<size_t> limit = 5; + std::vector<msgs::OrderBy> order_by = { + {msgs::Expression{"MG_SYMBOL_NODE.prop1"}, msgs::OrderingDirection::DESCENDING}}; + std::optional<size_t> limit = 3; // std::optional<msgs::Filter> filter = {}; msgs::ExpandOneRequest expand_one_req{}; @@ -708,9 +708,8 @@ void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_ expand_one_req.direction = edge_direction; expand_one_req.edge_properties = edge_properties; expand_one_req.edge_types = {edge_type}; - // expand_one_req.expressions = expressions; #NoCommit not existing? - // expand_one_req.filter = filter; - // expand_one_req.limit = limit; + // expand_one_req.filter = filter; #NoCommit later? + expand_one_req.limit = limit; expand_one_req.order_by = order_by; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; @@ -725,12 +724,9 @@ void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_ auto write_response_result = read_res.GetValue(); auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); MG_ASSERT(write_response.result.size() == 1); - // MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 10); // #NoCommit where does that come - // from? - // auto number_of_properties_on_edge = - // (std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0])) - // .size(); - // MG_ASSERT(number_of_properties_on_edge == 1); + MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == 10); + auto number_of_properties_on_edge = write_response.result[0].out_edges_with_all_properties.size(); + MG_ASSERT(number_of_properties_on_edge == 1); break; } } @@ -756,7 +752,7 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -816,7 +812,7 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t std::optional<std::vector<PropertyId>> edge_properties = {specified_edge_prop}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -875,7 +871,7 @@ void AttemptToExpandOneWithFilters(ShardClient &client, uint64_t src_vertex_val, std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {};