diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 356e2c0da..69cdc084b 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -403,7 +403,9 @@ struct ExpandOneRequest { std::vector<std::string> vertex_expressions; std::vector<std::string> edge_expressions; - std::optional<std::vector<OrderBy>> order_by; + std::vector<OrderBy> order_by_vertices; + std::vector<OrderBy> order_by_edges; + // Limit the edges or the vertices? std::optional<size_t> limit; std::vector<std::string> filters; diff --git a/src/storage/v3/expr.cpp b/src/storage/v3/expr.cpp index 1d6739433..8062a2662 100644 --- a/src/storage/v3/expr.cpp +++ b/src/storage/v3/expr.cpp @@ -165,7 +165,7 @@ std::any ParseExpression(const std::string &expr, memgraph::expr::AstStorage &st return visitor.visit(ast); } -TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, +TypedValue ComputeExpression(DbAccessor &dba, const memgraph::storage::v3::VertexAccessor &v_acc, const std::optional<memgraph::storage::v3::EdgeAccessor> &e_acc, const std::string &expression, std::string_view node_name, std::string_view edge_name) { AstStorage storage; @@ -192,10 +192,11 @@ TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::stor return position_symbol_pair.second.name() == node_name; }) != symbol_table.table().end()); - frame[symbol_table.at(node_identifier)] = *v_acc; + frame[symbol_table.at(node_identifier)] = v_acc; } if (edge_identifier.symbol_pos_ != -1) { + MG_ASSERT(e_acc.has_value()); MG_ASSERT(std::find_if(symbol_table.table().begin(), symbol_table.table().end(), [&edge_name](const std::pair<int32_t, Symbol> &position_symbol_pair) { return position_symbol_pair.second.name() == edge_name; diff --git a/src/storage/v3/expr.hpp b/src/storage/v3/expr.hpp index c3199abf1..d344d36b2 100644 --- a/src/storage/v3/expr.hpp +++ b/src/storage/v3/expr.hpp @@ -9,6 +9,8 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#pragma once + #include <vector> #include "db_accessor.hpp" @@ -48,8 +50,7 @@ auto Eval(TExpression *expr, EvaluationContext &ctx, AstStorage &storage, Expres std::any ParseExpression(const std::string &expr, AstStorage &storage); -TypedValue ComputeExpression(DbAccessor &dba, const std::optional<VertexAccessor> &v_acc, - const std::optional<EdgeAccessor> &e_acc, const std::string &expression, - std::string_view node_name, std::string_view edge_name); +TypedValue ComputeExpression(DbAccessor &dba, const VertexAccessor &v_acc, const std::optional<EdgeAccessor> &e_acc, + const std::string &expression, std::string_view node_name, std::string_view edge_name); } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 0be659c43..8288a6154 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -13,54 +13,435 @@ #include <vector> -#include "pretty_print_ast_to_original_expression.hpp" #include "storage/v3/bindings/db_accessor.hpp" +#include "storage/v3/bindings/pretty_print_ast_to_original_expression.hpp" #include "storage/v3/expr.hpp" +#include "storage/v3/value_conversions.hpp" namespace memgraph::storage::v3 { +using msgs::Label; +using msgs::PropertyId; -std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable, - std::vector<msgs::OrderBy> &order_bys) { - std::vector<Element> ordered; - ordered.reserve(acc.ApproximateVertexCount()); - std::vector<Ordering> ordering; - ordering.reserve(order_bys.size()); - for (const auto &order : order_bys) { - switch (order.direction) { - case memgraph::msgs::OrderingDirection::ASCENDING: { - ordering.push_back(Ordering::ASC); - break; - } - case memgraph::msgs::OrderingDirection::DESCENDING: { - ordering.push_back(Ordering::DESC); - break; - } - } +using conversions::ConvertPropertyVector; +using conversions::FromPropertyValueToValue; +using conversions::ToMsgsVertexId; + +namespace { + +using AllEdgePropertyDataSructure = std::map<PropertyId, msgs::Value>; +using SpecificEdgePropertyDataSructure = std::vector<msgs::Value>; + +using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataSructure>; +using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataSructure>; + +using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>; +using AllEdgePropertiesVector = std::vector<AllEdgeProperties>; + +struct VertexIdCmpr { + bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; } +}; + +std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema &schema) { + std::map<PropertyId, Value> ret; + auto props = acc.Properties(view); + auto maybe_pk = acc.PrimaryKey(view); + if (maybe_pk.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex primary key."); + return std::nullopt; } - auto compare_typed_values = TypedValueVectorCompare(ordering); - for (auto it = vertices_iterable.begin(); it != vertices_iterable.end(); ++it) { - std::vector<TypedValue> properties_order_by; - properties_order_by.reserve(order_bys.size()); - - for (const auto &order_by : order_bys) { - const auto val = - ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression, expr::identifier_node_symbol, ""); - properties_order_by.push_back(val); - } - ordered.push_back({std::move(properties_order_by), *it}); + auto &pk = maybe_pk.GetValue(); + MG_ASSERT(schema.second.size() == pk.size(), "PrimaryKey size does not match schema!"); + for (size_t i{0}; i < schema.second.size(); ++i) { + ret.emplace(schema.second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); } - std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { - return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); + return ret; +} + +std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc, + const msgs::ExpandOneRequest &req) { + auto secondary_labels = v_acc->Labels(View::NEW); + if (secondary_labels.HasError()) { + spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + + auto &sec_labels = secondary_labels.GetValue(); + std::vector<msgs::Label> msgs_secondary_labels; + msgs_secondary_labels.reserve(sec_labels.size()); + + std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels), + [](auto label_id) { return msgs::Label{.id = label_id}; }); + + return msgs_secondary_labels; +} + +std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc, + const msgs::ExpandOneRequest &req, + storage::v3::View view, + const Schemas::Schema &schema) { + std::map<PropertyId, Value> src_vertex_properties; + + if (!req.src_vertex_properties) { + auto props = v_acc->Properties(View::NEW); + if (props.HasError()) { + spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + + for (auto &[key, val] : props.GetValue()) { + src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val)))); + } + auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema); + if (pks) { + src_vertex_properties.merge(*pks); + } + + } else if (req.src_vertex_properties.value().empty()) { + // NOOP + } else { + for (const auto &prop : req.src_vertex_properties.value()) { + auto prop_val = v_acc->GetProperty(prop, View::OLD); + if (prop_val.HasError()) { + spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue())))); + } + } + + return src_vertex_properties; +} + +std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges( + const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness) { + std::vector<EdgeTypeId> edge_types{}; + edge_types.reserve(req.edge_types.size()); + std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types), + [](const msgs::EdgeType &edge_type) { return edge_type.id; }); + + std::vector<EdgeAccessor> in_edges; + std::vector<EdgeAccessor> out_edges; + + switch (req.direction) { + case msgs::EdgeDirection::OUT: { + auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); + if (out_edges_result.HasError()) { + spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + out_edges = + maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); + break; + } + case msgs::EdgeDirection::IN: { + auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); + if (in_edges_result.HasError()) { + spdlog::debug( + "Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id + .logical_id]); + return std::nullopt; + } + in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); + break; + } + case msgs::EdgeDirection::BOTH: { + auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); + if (in_edges_result.HasError()) { + spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); + auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); + if (out_edges_result.HasError()) { + spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + out_edges = + maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); + break; + } + } + return std::array<std::vector<EdgeAccessor>, 2>{std::move(in_edges), std::move(out_edges)}; +} + +template <bool are_in_edges> +bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow &row, const EdgeFiller &edge_filler) { + for (const auto &edge : edges) { + if (!edge_filler(edge, are_in_edges, row)) { + return false; + } + } + + return true; +} + +}; // namespace + +std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, + const std::vector<PropertyId> &props, + View view) { + std::map<PropertyId, Value> ret; + + for (const auto &prop : props) { + auto result = acc.GetProperty(prop, view); + if (result.HasError()) { + spdlog::debug("Encountered an Error while trying to get a vertex property."); + return std::nullopt; + } + auto &value = result.GetValue(); + ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); + } + + return ret; +} + +std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, + const std::vector<std::string> &expressions, + std::string_view node_name) { + std::vector<TypedValue> evaluated_expressions; + evaluated_expressions.reserve(expressions.size()); + + std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), + [&dba, &v_acc, &node_name](const auto &expression) { + return ComputeExpression(dba, v_acc, std::nullopt, expression, node_name, ""); + }); + + return evaluated_expressions; +} + +std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema &schema) { + std::map<PropertyId, Value> ret; + auto props = acc.Properties(view); + if (props.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex properties."); + return std::nullopt; + } + + auto &properties = props.GetValue(); + std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), + [](std::pair<const PropertyId, PropertyValue> &pair) { + return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); + }); + properties.clear(); + + auto pks = PrimaryKeysFromAccessor(acc, view, schema); + if (pks) { + ret.merge(*pks); + } + + return ret; +} + +EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) { + // Functions to select connecting edges based on uniquness + EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness; + + if (only_unique_neighbor_rows) { + maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges, + msgs::EdgeDirection edge_direction) -> EdgeAccessors { + std::function<bool(std::set<const storage::v3::VertexId *, VertexIdCmpr> &, const storage::v3::EdgeAccessor &)> + is_edge_unique; + switch (edge_direction) { + case msgs::EdgeDirection::OUT: { + is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set, + const storage::v3::EdgeAccessor &edge_acc) { + auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.ToVertex()); + return insertion_happened; + }; + break; + } + case msgs::EdgeDirection::IN: { + is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set, + const storage::v3::EdgeAccessor &edge_acc) { + auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.FromVertex()); + return insertion_happened; + }; + break; + } + case msgs::EdgeDirection::BOTH: + MG_ASSERT(false, "This is should never happen, msgs::EdgeDirection::BOTH should not be passed here."); + } + + EdgeAccessors ret; + std::set<const storage::v3::VertexId *, VertexIdCmpr> other_vertex_set; + + for (const auto &edge : edges) { + if (is_edge_unique(other_vertex_set, edge)) { + ret.emplace_back(edge); + } + } + + return ret; + }; + } else { + maybe_filter_based_on_edge_uniquness = + [](EdgeAccessors &&edges, msgs::EdgeDirection /*edge_direction*/) -> EdgeAccessors { return std::move(edges); }; + } + + return maybe_filter_based_on_edge_uniquness; +} + +EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { + EdgeFiller edge_filler; + + if (!req.edge_properties) { + edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge, + msgs::ExpandOneResultRow &result_row) -> bool { + auto properties_results = edge.Properties(View::NEW); + if (properties_results.HasError()) { + spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id); + return false; + } + + std::map<PropertyId, msgs::Value> value_properties; + for (auto &[prop_key, prop_val] : properties_results.GetValue()) { + value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val)))); + } + using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties; + EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { + result_row.in_edges_with_all_properties.push_back(std::move(edges)); + } else { + result_row.out_edges_with_all_properties.push_back(std::move(edges)); + } + return true; + }; + } else { + // TODO(gvolfing) - do we want to set the action_successful here? + edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge, + msgs::ExpandOneResultRow &result_row) -> bool { + std::vector<msgs::Value> value_properties; + value_properties.reserve(req.edge_properties.value().size()); + for (const auto &edge_prop : req.edge_properties.value()) { + auto property_result = edge.GetProperty(edge_prop, View::NEW); + if (property_result.HasError()) { + spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", + req.transaction_id.logical_id); + return false; + } + value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue()))); + } + using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; + EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { + result_row.in_edges_with_specific_properties.push_back(std::move(edges)); + } else { + result_row.out_edges_with_specific_properties.push_back(std::move(edges)); + } + return true; + }; + } + + return edge_filler; +} + +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters, + const std::string_view node_name) { + return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { + auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); + return res.IsBool() && res.ValueBool(); }); - return ordered; +} + +std::optional<msgs::ExpandOneResultRow> GetExpandOneResult( + Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema) { + /// Fill up source vertex + const auto primary_key = ConvertPropertyVector(src_vertex.second); + auto v_acc = acc.FindVertex(primary_key, View::NEW); + + msgs::Vertex source_vertex = {.id = src_vertex}; + if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { + source_vertex.labels = *maybe_secondary_labels; + } else { + return std::nullopt; + } + + auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); + + if (!src_vertex_properties) { + return std::nullopt; + } + + /// Fill up connecting edges + auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req, maybe_filter_based_on_edge_uniquness); + if (!fill_up_connecting_edges) { + return std::nullopt; + } + + auto [in_edges, out_edges] = fill_up_connecting_edges.value(); + + msgs::ExpandOneResultRow result_row; + result_row.src_vertex = std::move(source_vertex); + result_row.src_vertex_properties = std::move(*src_vertex_properties); + static constexpr bool kInEdges = true; + static constexpr bool kOutEdges = false; + if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, result_row, edge_filler)) { + return std::nullopt; + } + if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, result_row, edge_filler)) { + return std::nullopt; + } + + return result_row; +} + +std::optional<msgs::ExpandOneResultRow> GetExpandOneResult( + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema) { + /// Fill up source vertex + msgs::Vertex source_vertex = {.id = src_vertex}; + if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { + source_vertex.labels = *maybe_secondary_labels; + } else { + return std::nullopt; + } + + /// Fill up source vertex properties + auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); + if (!src_vertex_properties) { + return std::nullopt; + } + + /// Fill up connecting edges + auto in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edge_accessors), msgs::EdgeDirection::IN); + auto out_edges = maybe_filter_based_on_edge_uniquness(std::move(out_edge_accessors), msgs::EdgeDirection::OUT); + + msgs::ExpandOneResultRow result_row; + result_row.src_vertex = std::move(source_vertex); + result_row.src_vertex_properties = std::move(*src_vertex_properties); + static constexpr bool kInEdges = true; + static constexpr bool kOutEdges = false; + if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, result_row, edge_filler)) { + return std::nullopt; + } + if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, result_row, edge_filler)) { + return std::nullopt; + } + + return result_row; } VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, - const std::vector<PropertyValue> &start_ids, const View view) { + const std::vector<PropertyValue> &primary_key, const View view) { auto it = vertex_iterable.begin(); while (it != vertex_iterable.end()) { - if (const auto &vertex = *it; start_ids <= vertex.PrimaryKey(view).GetValue()) { + if (const auto &vertex = *it; primary_key <= vertex.PrimaryKey(view).GetValue()) { break; } ++it; @@ -68,17 +449,79 @@ VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_itera return it; } -std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements, - const std::vector<PropertyValue> &start_ids, - const View view) { +std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator( + const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &primary_key, + const View view) { for (auto it = ordered_elements.begin(); it != ordered_elements.end(); ++it) { - if (const auto &vertex = it->vertex_acc; start_ids <= vertex.PrimaryKey(view).GetValue()) { + if (const auto &vertex = it->object_acc; primary_key <= vertex.PrimaryKey(view).GetValue()) { return it; } } return ordered_elements.end(); } +std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, + const msgs::EdgeDirection direction) { + std::vector<EdgeAccessor> in_edges; + std::vector<EdgeAccessor> out_edges; + + switch (direction) { + case memgraph::msgs::EdgeDirection::IN: { + auto edges = vertex_accessor.InEdges(View::OLD); + if (edges.HasValue()) { + in_edges = edges.GetValue(); + } + } + case memgraph::msgs::EdgeDirection::OUT: { + auto edges = vertex_accessor.OutEdges(View::OLD); + if (edges.HasValue()) { + out_edges = edges.GetValue(); + } + } + case memgraph::msgs::EdgeDirection::BOTH: { + auto maybe_in_edges = vertex_accessor.InEdges(View::OLD); + auto maybe_out_edges = vertex_accessor.OutEdges(View::OLD); + std::vector<EdgeAccessor> edges; + if (maybe_in_edges.HasValue()) { + in_edges = maybe_in_edges.GetValue(); + } + if (maybe_out_edges.HasValue()) { + out_edges = maybe_out_edges.GetValue(); + } + } + } + + return std::array<std::vector<EdgeAccessor>, 2>{std::move(in_edges), std::move(out_edges)}; +} + +std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<EdgeAccessor> &iterable, + std::vector<msgs::OrderBy> &order_by_edges, + const VertexAccessor &vertex_acc) { + std::vector<Ordering> ordering; + ordering.reserve(order_by_edges.size()); + std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(ordering), + [](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); }); + + std::vector<Element<EdgeAccessor>> ordered; + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + std::vector<TypedValue> properties_order_by; + properties_order_by.reserve(order_by_edges.size()); + std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(properties_order_by), + [&dba, &vertex_acc, &it](const auto &order_by) { + return ComputeExpression(dba, vertex_acc, *it, order_by.expression.expression, + expr::identifier_node_symbol, expr::identifier_edge_symbol); + }); + + ordered.push_back({std::move(properties_order_by), *it}); + } + + auto compare_typed_values = TypedValueVectorCompare(ordering); + std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { + return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); + }); + return ordered; +} + void LogResultError(const ResultErrorType &error, const std::string_view action) { std::visit( [action]<typename T>(T &&error) { diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 1d26e338d..f6c7c841a 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -9,15 +9,28 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#pragma once + #include <vector> -#include "ast/ast.hpp" +#include "query/v2/requests.hpp" +#include "storage/v3/bindings/ast/ast.hpp" +#include "storage/v3/bindings/pretty_print_ast_to_original_expression.hpp" #include "storage/v3/bindings/typed_value.hpp" +#include "storage/v3/edge_accessor.hpp" +#include "storage/v3/expr.hpp" #include "storage/v3/shard.hpp" #include "storage/v3/vertex_accessor.hpp" #include "utils/template_utils.hpp" namespace memgraph::storage::v3 { +using EdgeAccessors = std::vector<storage::v3::EdgeAccessor>; +using EdgeUniqunessFunction = std::function<EdgeAccessors(EdgeAccessors &&, msgs::EdgeDirection)>; +using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>; +using msgs::Value; + +template <typename T> +concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>; inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) { // in ordering null comes after everything else @@ -73,6 +86,17 @@ inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) { } } +inline Ordering ConvertMsgsOrderByToOrdering(msgs::OrderingDirection ordering) { + switch (ordering) { + case memgraph::msgs::OrderingDirection::ASCENDING: + return memgraph::storage::v3::Ordering::ASC; + case memgraph::msgs::OrderingDirection::DESCENDING: + return memgraph::storage::v3::Ordering::DESC; + default: + LOG_FATAL("Unknown ordering direction"); + } +} + class TypedValueVectorCompare final { public: explicit TypedValueVectorCompare(const std::vector<Ordering> &ordering) : ordering_(ordering) {} @@ -100,20 +124,86 @@ class TypedValueVectorCompare final { std::vector<Ordering> ordering_; }; +template <ObjectAccessor TObjectAccessor> struct Element { std::vector<TypedValue> properties_order_by; - VertexAccessor vertex_acc; + TObjectAccessor object_acc; }; -std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable, - std::vector<msgs::OrderBy> &order_bys); +template <typename T> +concept VerticesIt = utils::SameAsAnyOf<T, VerticesIterable, std::vector<VertexAccessor>>; -VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, - const std::vector<PropertyValue> &start_ids, View view); +template <VerticesIt TIterable> +std::vector<Element<VertexAccessor>> OrderByVertices(DbAccessor &dba, TIterable &iterable, + std::vector<msgs::OrderBy> &order_by_vertices) { + std::vector<Ordering> ordering; + ordering.reserve(order_by_vertices.size()); + std::transform(order_by_vertices.begin(), order_by_vertices.end(), std::back_inserter(ordering), + [](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); }); -std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements, - const std::vector<PropertyValue> &start_ids, - View view); + std::vector<Element<VertexAccessor>> ordered; + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + std::vector<TypedValue> properties_order_by; + properties_order_by.reserve(order_by_vertices.size()); + + std::transform(order_by_vertices.begin(), order_by_vertices.end(), std::back_inserter(properties_order_by), + [&dba, &it](const auto &order_by) { + return ComputeExpression(dba, *it, std::nullopt /*e_acc*/, order_by.expression.expression, + expr::identifier_node_symbol, expr::identifier_edge_symbol); + }); + + ordered.push_back({std::move(properties_order_by), *it}); + } + + auto compare_typed_values = TypedValueVectorCompare(ordering); + std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { + return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); + }); + return ordered; +} void LogResultError(const ResultErrorType &error, std::string_view action); + +std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<EdgeAccessor> &iterable, + std::vector<msgs::OrderBy> &order_by_edges, + const VertexAccessor &vertex_acc); + +VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, + const std::vector<PropertyValue> &primary_key, View view); + +std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator( + const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &primary_key, + View view); + +std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, + msgs::EdgeDirection direction); + +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters, + std::string_view node_name); + +std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, + const std::vector<std::string> &expressions, + std::string_view node_name); + +std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, + const std::vector<PropertyId> &props, + View view); + +std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema &schema); + +EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows); + +EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req); + +std::optional<msgs::ExpandOneResultRow> GetExpandOneResult( + Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema); + +std::optional<msgs::ExpandOneResultRow> GetExpandOneResult( + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema); } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 1f70f25a5..f2c6504b7 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -41,431 +41,21 @@ #include "storage/v3/vertex_accessor.hpp" #include "storage/v3/vertex_id.hpp" #include "storage/v3/view.hpp" +#include "utils/logging.hpp" namespace memgraph::storage::v3 { using msgs::Label; using msgs::PropertyId; using msgs::Value; +using conversions::ConvertPropertyMap; using conversions::ConvertPropertyVector; using conversions::ConvertValueVector; +using conversions::FromMap; using conversions::FromPropertyValueToValue; using conversions::ToMsgsVertexId; using conversions::ToPropertyValue; -namespace { -namespace msgs = msgs; - -using AllEdgePropertyDataStructure = std::map<PropertyId, msgs::Value>; -using SpecificEdgePropertyDataStructure = std::vector<msgs::Value>; - -using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataStructure>; -using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataStructure>; - -using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>; -using AllEdgePropertiesVector = std::vector<AllEdgeProperties>; - -using EdgeAccessors = std::vector<storage::v3::EdgeAccessor>; - -using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>; -using EdgeUniquenessFunction = std::function<EdgeAccessors(EdgeAccessors &&, msgs::EdgeDirection)>; - -struct VertexIdCmpr { - bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; } -}; - -std::vector<std::pair<PropertyId, PropertyValue>> ConvertPropertyMap( - std::vector<std::pair<PropertyId, Value>> &&properties) { - std::vector<std::pair<PropertyId, PropertyValue>> ret; - ret.reserve(properties.size()); - - std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()), - std::back_inserter(ret), [](std::pair<PropertyId, Value> &&property) { - return std::make_pair(property.first, ToPropertyValue(std::move(property.second))); - }); - - return ret; -} - -std::vector<std::pair<PropertyId, Value>> FromMap(const std::map<PropertyId, Value> &properties) { - std::vector<std::pair<PropertyId, Value>> ret; - ret.reserve(properties.size()); - - std::transform(properties.begin(), properties.end(), std::back_inserter(ret), - [](const auto &property) { return std::make_pair(property.first, property.second); }); - - return ret; -} - -std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, - const std::vector<PropertyId> &props, - View view) { - std::map<PropertyId, Value> ret; - - for (const auto &prop : props) { - auto result = acc.GetProperty(prop, view); - if (result.HasError()) { - spdlog::debug("Encountered an Error while trying to get a vertex property."); - return std::nullopt; - } - auto &value = result.GetValue(); - ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); - } - - return ret; -} - -std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view, - const Schemas::Schema *schema) { - std::map<PropertyId, Value> ret; - auto props = acc.Properties(view); - auto maybe_pk = acc.PrimaryKey(view); - if (maybe_pk.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex primary key."); - return std::nullopt; - } - auto &pk = maybe_pk.GetValue(); - MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!"); - for (size_t i{0}; i < schema->second.size(); ++i) { - ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); - } - - return ret; -} - -std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, - const Schemas::Schema *schema) { - std::map<PropertyId, Value> ret; - auto props = acc.Properties(view); - if (props.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex properties."); - return std::nullopt; - } - - auto &properties = props.GetValue(); - std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), - [](std::pair<const PropertyId, PropertyValue> &pair) { - return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); - }); - properties.clear(); - - auto pks = PrimaryKeysFromAccessor(acc, view, schema); - if (pks) { - ret.merge(*pks); - } - - return ret; -} - -bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters, - const std::string_view node_name) { - return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { - auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); - return res.IsBool() && res.ValueBool(); - }); -} - -std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, - const std::vector<std::string> &expressions, - std::string_view node_name) { - std::vector<TypedValue> evaluated_expressions; - evaluated_expressions.reserve(expressions.size()); - - std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), - [&dba, &v_acc, &node_name](const auto &expression) { - return ComputeExpression(dba, v_acc, std::nullopt, expression, node_name, ""); - }); - - return evaluated_expressions; -} - -std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc, - const msgs::ExpandOneRequest &req) { - auto secondary_labels = v_acc->Labels(View::NEW); - if (secondary_labels.HasError()) { - spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - - auto &sec_labels = secondary_labels.GetValue(); - std::vector<msgs::Label> msgs_secondary_labels; - msgs_secondary_labels.reserve(sec_labels.size()); - - std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels), - [](auto label_id) { return msgs::Label{.id = label_id}; }); - - return msgs_secondary_labels; -} - -std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc, - const msgs::ExpandOneRequest &req, - storage::v3::View view, - const Schemas::Schema *schema) { - std::map<PropertyId, Value> src_vertex_properties; - - if (!req.src_vertex_properties) { - auto props = v_acc->Properties(View::NEW); - if (props.HasError()) { - spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - - for (auto &[key, val] : props.GetValue()) { - src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val)))); - } - auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema); - if (pks) { - src_vertex_properties.merge(*pks); - } - - } else if (req.src_vertex_properties.value().empty()) { - // NOOP - } else { - for (const auto &prop : req.src_vertex_properties.value()) { - auto prop_val = v_acc->GetProperty(prop, View::OLD); - if (prop_val.HasError()) { - spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue())))); - } - } - - return src_vertex_properties; -} - -std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges( - const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req, - const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniquness) { - std::vector<EdgeTypeId> edge_types{}; - edge_types.reserve(req.edge_types.size()); - std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types), - [](const msgs::EdgeType &edge_type) { return edge_type.id; }); - - std::vector<EdgeAccessor> in_edges; - std::vector<EdgeAccessor> out_edges; - - switch (req.direction) { - case msgs::EdgeDirection::OUT: { - auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); - if (out_edges_result.HasError()) { - spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - out_edges = - maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); - break; - } - case msgs::EdgeDirection::IN: { - auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); - if (in_edges_result.HasError()) { - spdlog::debug( - "Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id - .logical_id]); - return std::nullopt; - } - in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); - break; - } - case msgs::EdgeDirection::BOTH: { - auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); - if (in_edges_result.HasError()) { - spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); - auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); - if (out_edges_result.HasError()) { - spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - out_edges = - maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); - break; - } - } - return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges}; -} - -using AllEdgePropertyDataStructure = std::map<PropertyId, msgs::Value>; -using SpecificEdgePropertyDataStructure = std::vector<msgs::Value>; - -using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataStructure>; -using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataStructure>; - -using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>; -using AllEdgePropertiesVector = std::vector<AllEdgeProperties>; - -using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>; - -template <bool are_in_edges> -bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow &row, const EdgeFiller &edge_filler) { - for (const auto &edge : edges) { - if (!edge_filler(edge, are_in_edges, row)) { - return false; - } - } - - return true; -} - -std::optional<msgs::ExpandOneResultRow> GetExpandOneResult( - Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, - const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, - const Schemas::Schema *schema) { - /// Fill up source vertex - const auto primary_key = ConvertPropertyVector(src_vertex.second); - auto v_acc = acc.FindVertex(primary_key, View::NEW); - - msgs::Vertex source_vertex = {.id = src_vertex}; - if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { - source_vertex.labels = *maybe_secondary_labels; - } else { - return std::nullopt; - } - - std::optional<std::map<PropertyId, Value>> src_vertex_properties; - src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); - - if (!src_vertex_properties) { - return std::nullopt; - } - - /// Fill up connecting edges - auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req, maybe_filter_based_on_edge_uniquness); - if (!fill_up_connecting_edges) { - return std::nullopt; - } - - auto [in_edges, out_edges] = fill_up_connecting_edges.value(); - - msgs::ExpandOneResultRow result_row; - result_row.src_vertex = std::move(source_vertex); - result_row.src_vertex_properties = std::move(*src_vertex_properties); - static constexpr bool kInEdges = true; - static constexpr bool kOutEdges = false; - if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, result_row, edge_filler)) { - return std::nullopt; - } - if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, result_row, edge_filler)) { - return std::nullopt; - } - - return result_row; -} - -EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows) { - // Functions to select connecting edges based on uniquness - EdgeUniquenessFunction maybe_filter_based_on_edge_uniquness; - - if (only_unique_neighbor_rows) { - maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges, - msgs::EdgeDirection edge_direction) -> EdgeAccessors { - std::function<bool(std::set<const storage::v3::VertexId *, VertexIdCmpr> &, const storage::v3::EdgeAccessor &)> - is_edge_unique; - switch (edge_direction) { - case msgs::EdgeDirection::OUT: { - is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set, - const storage::v3::EdgeAccessor &edge_acc) { - auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.ToVertex()); - return insertion_happened; - }; - break; - } - case msgs::EdgeDirection::IN: { - is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set, - const storage::v3::EdgeAccessor &edge_acc) { - auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.FromVertex()); - return insertion_happened; - }; - break; - } - case msgs::EdgeDirection::BOTH: - MG_ASSERT(false, "This is should never happen, msgs::EdgeDirection::BOTH should not be passed here."); - } - - EdgeAccessors ret; - std::set<const storage::v3::VertexId *, VertexIdCmpr> other_vertex_set; - - for (const auto &edge : edges) { - if (is_edge_unique(other_vertex_set, edge)) { - ret.emplace_back(edge); - } - } - - return ret; - }; - } else { - maybe_filter_based_on_edge_uniquness = - [](EdgeAccessors &&edges, msgs::EdgeDirection /*edge_direction*/) -> EdgeAccessors { return std::move(edges); }; - } - - return maybe_filter_based_on_edge_uniquness; -} - -EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { - EdgeFiller edge_filler; - - if (!req.edge_properties) { - edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge, - msgs::ExpandOneResultRow &result_row) -> bool { - auto properties_results = edge.Properties(View::NEW); - if (properties_results.HasError()) { - spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id); - return false; - } - - std::map<PropertyId, msgs::Value> value_properties; - for (auto &[prop_key, prop_val] : properties_results.GetValue()) { - value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val)))); - } - using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties; - EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, - edge.Gid().AsUint(), std::move(value_properties)}; - if (is_in_edge) { - result_row.in_edges_with_all_properties.push_back(std::move(edges)); - } else { - result_row.out_edges_with_all_properties.push_back(std::move(edges)); - } - return true; - }; - } else { - // TODO(gvolfing) - do we want to set the action_successful here? - edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge, - msgs::ExpandOneResultRow &result_row) -> bool { - std::vector<msgs::Value> value_properties; - value_properties.reserve(req.edge_properties.value().size()); - for (const auto &edge_prop : req.edge_properties.value()) { - auto property_result = edge.GetProperty(edge_prop, View::NEW); - if (property_result.HasError()) { - spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", - req.transaction_id.logical_id); - return false; - } - value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue()))); - } - using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; - EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, - edge.Gid().AsUint(), std::move(value_properties)}; - if (is_in_edge) { - result_row.in_edges_with_specific_properties.push_back(std::move(edges)); - } else { - result_row.out_edges_with_specific_properties.push_back(std::move(edges)); - } - return true; - }; - } - - return edge_filler; -} - -}; // namespace msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { auto acc = shard_->Access(req.transaction_id); @@ -475,7 +65,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { /// TODO(gvolfing) Consider other methods than converting. Change either /// the way that the property map is stored in the messages, or the /// signature of CreateVertexAndValidate. - auto converted_property_map = ConvertPropertyMap(std::move(new_vertex.properties)); + auto converted_property_map = ConvertPropertyMap(new_vertex.properties); // TODO(gvolfing) make sure if this conversion is actually needed. std::vector<LabelId> converted_label_ids; @@ -764,7 +354,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { found_props = CollectSpecificPropertiesFromAccessor(vertex, req.props_to_return.value(), view); } else { const auto *schema = shard_->GetSchema(shard_->PrimaryLabel()); - found_props = CollectAllPropertiesFromAccessor(vertex, view, schema); + MG_ASSERT(schema); + found_props = CollectAllPropertiesFromAccessor(vertex, view, *schema); } // TODO(gvolfing) -VERIFY- @@ -783,18 +374,18 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { uint64_t sample_counter{0}; auto vertex_iterable = acc.Vertices(view); if (!req.order_bys.empty()) { - const auto ordered = OrderByElements(acc, dba, vertex_iterable, req.order_bys); + const auto ordered = OrderByVertices(dba, vertex_iterable, req.order_bys); // we are traversing Elements auto it = GetStartOrderedElementsIterator(ordered, start_id, View(req.storage_view)); for (; it != ordered.end(); ++it) { - emplace_scan_result(it->vertex_acc); + emplace_scan_result(it->object_acc); ++sample_counter; if (req.batch_limit && sample_counter == req.batch_limit) { // Reached the maximum specified batch size. // Get the next element before exiting. ++it; if (it != ordered.end()) { - const auto &next_vertex = it->vertex_acc; + const auto &next_vertex = it->object_acc; next_start_id = ConstructValueVertex(next_vertex, view).vertex_v.id; } @@ -835,10 +426,14 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { bool action_successful = true; std::vector<msgs::ExpandOneResultRow> results; + const auto batch_limit = req.limit; + auto dba = DbAccessor{&acc}; - auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniquenessFunction(req.only_unique_neighbor_rows); + auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); auto edge_filler = InitializeEdgeFillerFunction(req); + std::vector<VertexAccessor> vertex_accessors; + vertex_accessors.reserve(req.src_vertices.size()); for (auto &src_vertex : req.src_vertices) { // Get Vertex acc auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); @@ -848,24 +443,77 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { req.transaction_id.logical_id); break; } - if (!req.filters.empty()) { // NOTE - DbAccessor might get removed in the future. - auto dba = DbAccessor{&acc}; const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); if (!eval) { continue; } } - auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler, - shard_->GetSchema(shard_->PrimaryLabel())); - if (!result) { + vertex_accessors.emplace_back(src_vertex_acc_opt.value()); + } + + if (!req.order_by_vertices.empty()) { + // Can we do differently to avoid this? We need OrderByElements but currently it returns vector<Element>, so this + // workaround is here to avoid more duplication later + auto local_sorted_vertices = OrderByVertices(dba, vertex_accessors, req.order_by_vertices); + vertex_accessors.clear(); + std::transform(local_sorted_vertices.begin(), local_sorted_vertices.end(), std::back_inserter(vertex_accessors), + [](auto &vertex) { return vertex.object_acc; }); + } + + for (const auto &src_vertex_acc : vertex_accessors) { + auto label_id = src_vertex_acc.PrimaryLabel(View::NEW); + if (label_id.HasError()) { action_successful = false; break; } - results.emplace_back(result.value()); + auto primary_key = src_vertex_acc.PrimaryKey(View::NEW); + if (primary_key.HasError()) { + action_successful = false; + break; + } + + msgs::VertexId src_vertex(msgs::Label{.id = *label_id}, conversions::ConvertValueVector(*primary_key)); + + std::optional<msgs::ExpandOneResultRow> maybe_result; + + if (req.order_by_edges.empty()) { + const auto *schema = shard_->GetSchema(shard_->PrimaryLabel()); + MG_ASSERT(schema); + maybe_result = + GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler, *schema); + + } else { + auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); + const auto in_ordered_edges = OrderByEdges(dba, in_edge_accessors, req.order_by_edges, src_vertex_acc); + const auto out_ordered_edges = OrderByEdges(dba, out_edge_accessors, req.order_by_edges, src_vertex_acc); + + std::vector<EdgeAccessor> in_edge_ordered_accessors; + std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors), + [](const auto &edge_element) { return edge_element.object_acc; }); + + std::vector<EdgeAccessor> out_edge_ordered_accessors; + std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors), + [](const auto &edge_element) { return edge_element.object_acc; }); + const auto *schema = shard_->GetSchema(shard_->PrimaryLabel()); + MG_ASSERT(schema); + maybe_result = + GetExpandOneResult(src_vertex_acc, src_vertex, req, in_edge_ordered_accessors, out_edge_ordered_accessors, + maybe_filter_based_on_edge_uniquness, edge_filler, *schema); + } + + if (!maybe_result) { + action_successful = false; + break; + } + + results.emplace_back(std::move(maybe_result.value())); + if (batch_limit.has_value() && results.size() >= batch_limit.value()) { + break; + } } msgs::ExpandOneResponse resp{}; diff --git a/src/storage/v3/value_conversions.hpp b/src/storage/v3/value_conversions.hpp index 05fd1394b..53374e1ed 100644 --- a/src/storage/v3/value_conversions.hpp +++ b/src/storage/v3/value_conversions.hpp @@ -129,4 +129,27 @@ inline std::vector<Value> ConvertValueVector(const std::vector<v3::PropertyValue inline msgs::VertexId ToMsgsVertexId(const v3::VertexId &vertex_id) { return {msgs::Label{vertex_id.primary_label}, ConvertValueVector(vertex_id.primary_key)}; } + +inline std::vector<std::pair<v3::PropertyId, v3::PropertyValue>> ConvertPropertyMap( + std::vector<std::pair<v3::PropertyId, Value>> &properties) { + std::vector<std::pair<v3::PropertyId, v3::PropertyValue>> ret; + ret.reserve(properties.size()); + + std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()), + std::back_inserter(ret), [](std::pair<v3::PropertyId, Value> &&property) { + return std::make_pair(property.first, ToPropertyValue(std::move(property.second))); + }); + + return ret; +} + +inline std::vector<std::pair<PropertyId, Value>> FromMap(const std::map<PropertyId, Value> &properties) { + std::vector<std::pair<PropertyId, Value>> ret; + ret.reserve(properties.size()); + + std::transform(properties.begin(), properties.end(), std::back_inserter(ret), + [](const auto &property) { return std::make_pair(property.first, property.second); }); + + return ret; +} } // namespace memgraph::storage::conversions diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 41162ff19..0fa1a46ce 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -565,7 +565,8 @@ void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_verte std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by_vertices = {}; + std::vector<msgs::OrderBy> order_by_edges = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -577,7 +578,8 @@ void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_verte expand_one_req.vertex_expressions = expressions; expand_one_req.filters = filter; expand_one_req.limit = limit; - expand_one_req.order_by = order_by; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; expand_one_req.transaction_id.logical_id = GetTransactionId(); @@ -620,7 +622,8 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, Edge std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by_vertices = {}; + std::vector<msgs::OrderBy> order_by_edges = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -632,7 +635,8 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, Edge expand_one_req.vertex_expressions = expressions; expand_one_req.filters = filter; expand_one_req.limit = limit; - expand_one_req.order_by = order_by; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; expand_one_req.transaction_id.logical_id = GetTransactionId(); @@ -676,7 +680,8 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_ std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by_vertices = {}; + std::vector<msgs::OrderBy> order_by_edges = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -688,7 +693,8 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_ expand_one_req.vertex_expressions = expressions; expand_one_req.filters = filter; expand_one_req.limit = limit; - expand_one_req.order_by = order_by; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; expand_one_req.only_unique_neighbor_rows = true; @@ -714,6 +720,88 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_ } } +void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_val, uint64_t other_src_vertex_val, + EdgeTypeId edge_type_id) { + // Source vertex + msgs::Label label = {.id = get_primary_label()}; + auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); + auto other_src_vertex = std::make_pair(label, GetPrimaryKey(other_src_vertex_val)); + + // Edge type + auto edge_type = msgs::EdgeType{}; + edge_type.id = edge_type_id; + + // Edge direction + auto edge_direction = msgs::EdgeDirection::OUT; + + // Source Vertex properties to look for + std::optional<std::vector<PropertyId>> src_vertex_properties = {}; + + // Edge properties to look for + std::optional<std::vector<PropertyId>> edge_properties = {}; + + std::vector<msgs::OrderBy> order_by_vertices = { + {msgs::Expression{"MG_SYMBOL_NODE.prop1"}, msgs::OrderingDirection::ASCENDING}}; + std::vector<msgs::OrderBy> order_by_edges = { + {msgs::Expression{"MG_SYMBOL_EDGE.prop4"}, msgs::OrderingDirection::DESCENDING}}; + + size_t limit = 1; + std::vector<std::string> filters = {"MG_SYMBOL_NODE.prop1 != -1"}; + + msgs::ExpandOneRequest expand_one_req{}; + + expand_one_req.direction = edge_direction; + expand_one_req.edge_properties = edge_properties; + expand_one_req.edge_types = {edge_type}; + expand_one_req.filters = filters; + expand_one_req.limit = limit; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; + expand_one_req.src_vertex_properties = src_vertex_properties; + expand_one_req.src_vertices = {src_vertex, other_src_vertex}; + expand_one_req.transaction_id.logical_id = GetTransactionId(); + + while (true) { + auto read_res = client.SendReadRequest(expand_one_req); + if (read_res.HasError()) { + continue; + } + + auto write_response_result = read_res.GetValue(); + auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); + + // We check that we do not have more results than the limit. Based on the data in the graph, we know that we should + // receive exactly limit responses. + auto expected_number_of_rows = std::min(expand_one_req.src_vertices.size(), limit); + MG_ASSERT(expected_number_of_rows == 1); + MG_ASSERT(write_response.result.size() == expected_number_of_rows); + + // We know there are 1 out-going edges from V1->V2 + // We know there are 10 out-going edges from V2->V3 + // Since we sort on prop1 and limit 1, we will have a single response + // with two edges corresponding to V1->V2 and V1->V3 + const auto expected_number_of_edges = 2; + MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == expected_number_of_edges); + MG_ASSERT(write_response.result[0] + .out_edges_with_specific_properties.empty()); // We are not asking for specific properties + + // We also check that the vertices are ordered by prop1 DESC + + auto is_sorted = std::is_sorted(write_response.result.cbegin(), write_response.result.cend(), + [](const auto &vertex, const auto &other_vertex) { + const auto primary_key = vertex.src_vertex.id.second; + const auto other_primary_key = other_vertex.src_vertex.id.second; + + MG_ASSERT(primary_key.size() == 1); + MG_ASSERT(other_primary_key.size() == 1); + return primary_key[0].int_v > other_primary_key[0].int_v; + }); + + MG_ASSERT(is_sorted); + break; + } +} + void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) { // Source vertex @@ -735,7 +823,8 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by_vertices = {}; + std::vector<msgs::OrderBy> order_by_edges = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -747,7 +836,8 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin expand_one_req.vertex_expressions = expressions; expand_one_req.filters = filter; expand_one_req.limit = limit; - expand_one_req.order_by = order_by; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; expand_one_req.transaction_id.logical_id = GetTransactionId(); @@ -795,7 +885,8 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t std::optional<std::vector<PropertyId>> edge_properties = {specified_edge_prop}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by_vertices = {}; + std::vector<msgs::OrderBy> order_by_edges = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -807,7 +898,8 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t expand_one_req.vertex_expressions = expressions; expand_one_req.filters = filter; expand_one_req.limit = limit; - expand_one_req.order_by = order_by; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; expand_one_req.transaction_id.logical_id = GetTransactionId(); @@ -854,7 +946,8 @@ void AttemptToExpandOneWithFilters(ShardClient &client, uint64_t src_vertex_val, std::optional<std::vector<PropertyId>> edge_properties = {}; std::vector<std::string> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::vector<msgs::OrderBy> order_by_vertices = {}; + std::vector<msgs::OrderBy> order_by_edges = {}; std::optional<size_t> limit = {}; std::vector<std::string> filter = {}; @@ -866,7 +959,8 @@ void AttemptToExpandOneWithFilters(ShardClient &client, uint64_t src_vertex_val, expand_one_req.vertex_expressions = expressions; expand_one_req.filters = {filter_expr1}; expand_one_req.limit = limit; - expand_one_req.order_by = order_by; + expand_one_req.order_by_vertices = order_by_vertices; + expand_one_req.order_by_edges = order_by_edges; expand_one_req.src_vertex_properties = src_vertex_properties; expand_one_req.src_vertices = {src_vertex}; expand_one_req.transaction_id.logical_id = GetTransactionId(); @@ -1057,6 +1151,9 @@ void TestExpandOneGraphOne(ShardClient &client) { auto edge_prop_id = GetUniqueInteger(); auto edge_prop_val = GetUniqueInteger(); + std::vector<uint64_t> edges_ids(10); + std::generate(edges_ids.begin(), edges_ids.end(), GetUniqueInteger); + // (V1)-[edge_type_id]->(V2) MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid_1, edge_prop_id, edge_prop_val, {edge_type_id})); @@ -1064,7 +1161,14 @@ void TestExpandOneGraphOne(ShardClient &client) { MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_3, edge_gid_2, edge_prop_id, edge_prop_val, {edge_type_id})); + // (V2)-[edge_type_id]->(V3) x 10 + std::for_each(edges_ids.begin(), edges_ids.end(), [&](const auto &edge_id) { + MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_2, unique_prop_val_3, edge_id, edge_prop_id, + edge_prop_val, {edge_type_id})); + }); + AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id); + AttemptToExpandOneLimitAndOrderBy(client, unique_prop_val_1, unique_prop_val_2, edge_type_id); AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, wrong_edge_type_id); AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id); AttemptToExpandOneWithSpecifiedEdgeProperties(client, unique_prop_val_1, edge_type_id, edge_prop_id);