diff --git a/src/storage/v3/expr.hpp b/src/storage/v3/expr.hpp index c3199abf1..7eefdd71c 100644 --- a/src/storage/v3/expr.hpp +++ b/src/storage/v3/expr.hpp @@ -9,6 +9,8 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#pragma once + #include <vector> #include "db_accessor.hpp" diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index bb1c8bca4..7dab4c5c1 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -19,43 +19,6 @@ namespace memgraph::storage::v3 { -std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable, - std::vector<msgs::OrderBy> &order_bys) { - std::vector<Element> ordered; - ordered.reserve(acc.ApproximateVertexCount()); - std::vector<Ordering> ordering; - ordering.reserve(order_bys.size()); - for (const auto &order : order_bys) { - switch (order.direction) { - case memgraph::msgs::OrderingDirection::ASCENDING: { - ordering.push_back(Ordering::ASC); - break; - } - case memgraph::msgs::OrderingDirection::DESCENDING: { - ordering.push_back(Ordering::DESC); - break; - } - } - } - auto compare_typed_values = TypedValueVectorCompare(ordering); - for (auto it = vertices_iterable.begin(); it != vertices_iterable.end(); ++it) { - std::vector<TypedValue> properties_order_by; - properties_order_by.reserve(order_bys.size()); - - for (const auto &order_by : order_bys) { - const auto val = - ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression, expr::identifier_node_symbol, ""); - properties_order_by.push_back(val); - } - ordered.push_back({std::move(properties_order_by), *it}); - } - - std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { - return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); - }); - return ordered; -} - VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector<PropertyValue> &start_ids, const View view) { auto it = vertex_iterable.begin(); @@ -68,15 +31,47 @@ VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_itera return it; } -std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements, - const std::vector<PropertyValue> &start_ids, - const View view) { +std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator( + const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &start_ids, + const View view) { for (auto it = ordered_elements.begin(); it != ordered_elements.end(); ++it) { - if (const auto &vertex = it->vertex_acc; start_ids <= vertex.PrimaryKey(view).GetValue()) { + if (const auto &vertex = it->object_acc; start_ids <= vertex.PrimaryKey(view).GetValue()) { return it; } } return ordered_elements.end(); } +std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, + const msgs::EdgeDirection direction) { + switch (direction) { + case memgraph::msgs::EdgeDirection::IN: { + auto edges = vertex_accessor.InEdges(View::OLD); + if (edges.HasValue()) { + return edges.GetValue(); + } + return {}; + } + case memgraph::msgs::EdgeDirection::OUT: { + auto edges = vertex_accessor.OutEdges(View::OLD); + if (edges.HasValue()) { + return edges.GetValue(); + } + return {}; + } + case memgraph::msgs::EdgeDirection::BOTH: { + auto maybe_in_edges = vertex_accessor.InEdges(View::OLD); + auto maybe_out_edges = vertex_accessor.OutEdges(View::OLD); + std::vector<EdgeAccessor> edges; + if (maybe_in_edges.HasValue()) { + edges = maybe_in_edges.GetValue(); + } + if (maybe_out_edges.HasValue()) { + edges.insert(edges.end(), maybe_out_edges.GetValue().begin(), maybe_out_edges.GetValue().end()); + } + return edges; + } + } +} + } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 24ed40f8c..6212c4a69 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -9,14 +9,21 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#pragma once + #include <vector> #include "ast/ast.hpp" +#include "pretty_print_ast_to_original_expression.hpp" // #NoCommit why like this? +#include "query/v2/requests.hpp" #include "storage/v3/bindings/typed_value.hpp" +#include "storage/v3/edge_accessor.hpp" +#include "storage/v3/expr.hpp" #include "storage/v3/shard.hpp" #include "storage/v3/vertex_accessor.hpp" - namespace memgraph::storage::v3 { +template <typename T> +concept ObjectAccessor = std::is_same_v<T, VertexAccessor> || std::is_same_v<T, EdgeAccessor>; inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) { // in ordering null comes after everything else @@ -99,18 +106,62 @@ class TypedValueVectorCompare final { std::vector<Ordering> ordering_; }; +template <ObjectAccessor TObjectAccessor> struct Element { std::vector<TypedValue> properties_order_by; - VertexAccessor vertex_acc; + TObjectAccessor object_acc; }; -std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable, - std::vector<msgs::OrderBy> &order_bys); +// #NoCommit in cpp +template <ObjectAccessor TObjectAccessor, typename TIterable> +std::vector<Element<TObjectAccessor>> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, TIterable &iterable, + std::vector<msgs::OrderBy> &order_bys) { + std::vector<Element<TObjectAccessor>> ordered; + ordered.reserve(acc.ApproximateVertexCount()); + std::vector<Ordering> ordering; + ordering.reserve(order_bys.size()); + for (const auto &order : order_bys) { + switch (order.direction) { + case memgraph::msgs::OrderingDirection::ASCENDING: { + ordering.push_back(Ordering::ASC); + break; + } + case memgraph::msgs::OrderingDirection::DESCENDING: { + ordering.push_back(Ordering::DESC); + break; + } + } + } + auto compare_typed_values = TypedValueVectorCompare(ordering); + auto it = iterable.begin(); + for (; it != iterable.end(); ++it) { + std::vector<TypedValue> properties_order_by; + properties_order_by.reserve(order_bys.size()); + + for (const auto &order_by : order_bys) { + if constexpr (std::is_same_v<TIterable, VerticesIterable>) { + properties_order_by.push_back(ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression, + expr::identifier_node_symbol, expr::identifier_edge_symbol)); + } else { + properties_order_by.push_back(ComputeExpression(dba, std::nullopt, *it, order_by.expression.expression, + expr::identifier_node_symbol, expr::identifier_edge_symbol)); + } + } + ordered.push_back({std::move(properties_order_by), *it}); + } + + std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { + return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); + }); + return ordered; +} VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector<PropertyValue> &start_ids, View view); -std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements, - const std::vector<PropertyValue> &start_ids, - View view); +std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator( + const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &start_ids, + View view); + +std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction); } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 679f95172..97b275530 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -15,6 +15,7 @@ #include <unordered_set> #include <utility> +#include <unordered_map> #include "parser/opencypher/parser.hpp" #include "query/v2/requests.hpp" #include "storage/v3/bindings/ast/ast.hpp" @@ -26,6 +27,7 @@ #include "storage/v3/bindings/symbol_generator.hpp" #include "storage/v3/bindings/symbol_table.hpp" #include "storage/v3/bindings/typed_value.hpp" +#include "storage/v3/edge_accessor.hpp" #include "storage/v3/expr.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/key_store.hpp" @@ -802,18 +804,18 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { uint64_t sample_counter{0}; auto vertex_iterable = acc.Vertices(view); if (!req.order_bys.empty()) { - const auto ordered = OrderByElements(acc, dba, vertex_iterable, req.order_bys); + const auto ordered = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_bys); // we are traversing Elements auto it = GetStartOrderedElementsIterator(ordered, start_id, View(req.storage_view)); for (; it != ordered.end(); ++it) { - emplace_scan_result(it->vertex_acc); + emplace_scan_result(it->object_acc); ++sample_counter; if (req.batch_limit && sample_counter == req.batch_limit) { // Reached the maximum specified batch size. // Get the next element before exiting. ++it; if (it != ordered.end()) { - const auto &next_vertex = it->vertex_acc; + const auto &next_vertex = it->object_acc; next_start_id = ConstructValueVertex(next_vertex, view).vertex_v.id; } @@ -854,36 +856,97 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { bool action_successful = true; std::vector<msgs::ExpandOneResultRow> results; + auto batch_limit = req.limit; + auto dba = DbAccessor{&acc}; auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); auto edge_filler = InitializeEdgeFillerFunction(req); - for (auto &src_vertex : req.src_vertices) { - // Get Vertex acc - auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); - if (!src_vertex_acc_opt) { - action_successful = false; - spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", - req.transaction_id.logical_id); - break; - } + //#NoCommit code below is duplicated, one needs to factor it once it's clear + if (req.order_by) { + auto vertex_iterable = acc.Vertices(View::OLD); + const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, *req.order_by); + std::vector<std::pair<VertexAccessor, std::vector<Element<EdgeAccessor>>>> vertex_ordered_edges; + vertex_ordered_edges.reserve(req.src_vertices.size()); - if (!req.filters.empty()) { - // NOTE - DbAccessor might get removed in the future. - auto dba = DbAccessor{&acc}; - const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); - if (!eval) { - continue; + for (auto &src_vertex : req.src_vertices) { + // Get Vertex acc + auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); + if (!src_vertex_acc_opt) { + action_successful = false; + spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", + req.transaction_id.logical_id); + break; + } + + if (!req.filters.empty()) { + // NOTE - DbAccessor might get removed in the future. + auto dba = DbAccessor{&acc}; + const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + if (!eval) { + continue; + } + } + auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); + + if (!result) { + action_successful = false; + break; + } + + results.emplace_back(result.value()); + if (batch_limit) { // #NoCommit can ebd one differently + --*batch_limit; + if (batch_limit < 0) { + break; + } } } - auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); + // Now we have to construct response like this, and here we will care about + // limit: + // v1 e1 + // v1 e2 + // v1 e3 + // v2 e4 + // v2 e5 + // v2 e6 + // v2 e7 + // v3 e8 + // v3 e9 + } else { + for (auto &src_vertex : req.src_vertices) { + // Get Vertex acc + auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); + if (!src_vertex_acc_opt) { + action_successful = false; + spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", + req.transaction_id.logical_id); + break; + } - if (!result) { - action_successful = false; - break; + if (!req.filters.empty()) { + // NOTE - DbAccessor might get removed in the future. + auto dba = DbAccessor{&acc}; + const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + if (!eval) { + continue; + } + } + auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); + + if (!result) { + action_successful = false; + break; + } + + results.emplace_back(result.value()); + if (batch_limit) { // #NoCommit can ebd one differently + --*batch_limit; + if (batch_limit < 0) { + break; + } + } } - - results.emplace_back(result.value()); } msgs::ExpandOneResponse resp{}; diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index d5f8f2775..6c3ea7d60 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -680,6 +680,61 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_ } } +void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) { + // Source vertex + msgs::Label label = {.id = get_primary_label()}; + auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); + + // Edge type + auto edge_type = msgs::EdgeType{}; + edge_type.id = edge_type_id; + + // Edge direction + auto edge_direction = msgs::EdgeDirection::OUT; + + // Source Vertex properties to look for + std::optional<std::vector<PropertyId>> src_vertex_properties = {}; + + // Edge properties to look for + std::optional<std::vector<PropertyId>> edge_properties = {}; + + std::vector<std::string> expressions; + std::optional<std::vector<msgs::OrderBy>> order_by = {}; + // std::optional<size_t> limit = 5; + // std::optional<msgs::Filter> filter = {}; + + msgs::ExpandOneRequest expand_one_req{}; + + expand_one_req.direction = edge_direction; + expand_one_req.edge_properties = edge_properties; + expand_one_req.edge_types = {edge_type}; + // expand_one_req.expressions = expressions; #NoCommit not existing? + // expand_one_req.filter = filter; + // expand_one_req.limit = limit; + expand_one_req.order_by = order_by; + expand_one_req.src_vertex_properties = src_vertex_properties; + expand_one_req.src_vertices = {src_vertex}; + expand_one_req.transaction_id.logical_id = GetTransactionId(); + + while (true) { + auto read_res = client.SendReadRequest(expand_one_req); + if (read_res.HasError()) { + continue; + } + + auto write_response_result = read_res.GetValue(); + auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); + MG_ASSERT(write_response.result.size() == 1); + // MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 10); // #NoCommit where does that come + // from? + // auto number_of_properties_on_edge = + // (std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0])) + // .size(); + // MG_ASSERT(number_of_properties_on_edge == 1); + break; + } +} + void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) { // Source vertex @@ -1021,6 +1076,9 @@ void TestExpandOneGraphOne(ShardClient &client) { auto edge_prop_id = GetUniqueInteger(); auto edge_prop_val = GetUniqueInteger(); + std::vector<uint64_t> edges_ids(10); + std::generate(edges_ids.begin(), edges_ids.end(), GetUniqueInteger); + // (V1)-[edge_type_id]->(V2) MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid_1, edge_prop_id, edge_prop_val, {edge_type_id})); @@ -1028,6 +1086,13 @@ void TestExpandOneGraphOne(ShardClient &client) { MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_3, edge_gid_2, edge_prop_id, edge_prop_val, {edge_type_id})); + // (V2)-[edge_type_id]->(V3) x 10 + std::for_each(edges_ids.begin(), edges_ids.end(), [&](const auto &edge_id) { + MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_2, unique_prop_val_3, edge_id, edge_prop_id, + edge_prop_val, {edge_type_id})); + }); + AttemptToExpandOneLimitAndOrderBy(client, unique_prop_val_2, edge_type_id); // #NoCommit + AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id); AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, wrong_edge_type_id); AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id);