diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 591d7b6d7..b5c7ea2dd 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -16,8 +16,444 @@ #include "pretty_print_ast_to_original_expression.hpp" #include "storage/v3/bindings/db_accessor.hpp" #include "storage/v3/expr.hpp" +#include "storage/v3/value_conversions.hpp" namespace memgraph::storage::v3 { +using msgs::Label; +using msgs::PropertyId; + +using conversions::ConvertPropertyMap; +using conversions::ConvertPropertyVector; +using conversions::ConvertValueVector; +using conversions::FromPropertyValueToValue; +using conversions::ToMsgsVertexId; +using conversions::ToPropertyValue; + +namespace { +namespace msgs = msgs; + +using AllEdgePropertyDataSructure = std::map; +using SpecificEdgePropertyDataSructure = std::vector; + +using AllEdgeProperties = std::tuple; +using SpecificEdgeProperties = std::tuple; + +using SpecificEdgePropertiesVector = std::vector; +using AllEdgePropertiesVector = std::vector; + +struct VertexIdCmpr { + bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; } +}; + +std::optional> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema &schema) { + std::map ret; + auto props = acc.Properties(view); + auto maybe_pk = acc.PrimaryKey(view); + if (maybe_pk.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex primary key."); + return std::nullopt; + } + auto &pk = maybe_pk.GetValue(); + MG_ASSERT(schema.second.size() == pk.size(), "PrimaryKey size does not match schema!"); + for (size_t i{0}; i < schema.second.size(); ++i) { + ret.emplace(schema.second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); + } + + return ret; +} + +struct LocalError {}; + +std::optional> FillUpSourceVertexSecondaryLabels(const std::optional &v_acc, + const msgs::ExpandOneRequest &req) { + auto secondary_labels = v_acc->Labels(View::NEW); + if (secondary_labels.HasError()) { + spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + + auto &sec_labels = secondary_labels.GetValue(); + std::vector msgs_secondary_labels; + msgs_secondary_labels.reserve(sec_labels.size()); + + std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels), + [](auto label_id) { return msgs::Label{.id = label_id}; }); + + return msgs_secondary_labels; +} + +std::optional> FillUpSourceVertexProperties(const std::optional &v_acc, + const msgs::ExpandOneRequest &req, + storage::v3::View view, + const Schemas::Schema &schema) { + std::map src_vertex_properties; + + if (!req.src_vertex_properties) { + auto props = v_acc->Properties(View::NEW); + if (props.HasError()) { + spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + + for (auto &[key, val] : props.GetValue()) { + src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val)))); + } + auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema); + if (pks) { + src_vertex_properties.merge(*pks); + } + + } else if (req.src_vertex_properties.value().empty()) { + // NOOP + } else { + for (const auto &prop : req.src_vertex_properties.value()) { + auto prop_val = v_acc->GetProperty(prop, View::OLD); + if (prop_val.HasError()) { + spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue())))); + } + } + + return src_vertex_properties; +} + +std::optional, 2>> FillUpConnectingEdges( + const std::optional &v_acc, const msgs::ExpandOneRequest &req, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness) { + std::vector edge_types{}; + edge_types.reserve(req.edge_types.size()); + std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types), + [](const msgs::EdgeType &edge_type) { return edge_type.id; }); + + std::vector in_edges; + std::vector out_edges; + + switch (req.direction) { + case msgs::EdgeDirection::OUT: { + auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); + if (out_edges_result.HasError()) { + spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + out_edges = + maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); + break; + } + case msgs::EdgeDirection::IN: { + auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); + if (in_edges_result.HasError()) { + spdlog::debug( + "Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id + .logical_id]); + return std::nullopt; + } + in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); + break; + } + case msgs::EdgeDirection::BOTH: { + auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); + if (in_edges_result.HasError()) { + spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); + auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); + if (out_edges_result.HasError()) { + spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + out_edges = + maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); + break; + } + } + return std::array, 2>{std::move(in_edges), std::move(out_edges)}; +} + +using AllEdgePropertyDataSructure = std::map; +using SpecificEdgePropertyDataSructure = std::vector; + +using AllEdgeProperties = std::tuple; +using SpecificEdgeProperties = std::tuple; + +using SpecificEdgePropertiesVector = std::vector; +using AllEdgePropertiesVector = std::vector; + +using EdgeFiller = std::function; + +template +bool FillEdges(const std::vector &edges, msgs::ExpandOneResultRow &row, const EdgeFiller &edge_filler) { + for (const auto &edge : edges) { + if (!edge_filler(edge, are_in_edges, row)) { + return false; + } + } + + return true; +} + +}; // namespace + +std::optional> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, + const std::vector &props, + View view) { + std::map ret; + + for (const auto &prop : props) { + auto result = acc.GetProperty(prop, view); + if (result.HasError()) { + spdlog::debug("Encountered an Error while trying to get a vertex property."); + return std::nullopt; + } + auto &value = result.GetValue(); + ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); + } + + return ret; +} + +std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, + const std::vector &expressions, + std::string_view node_name) { + std::vector evaluated_expressions; + evaluated_expressions.reserve(expressions.size()); + + std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), + [&dba, &v_acc, &node_name](const auto &expression) { + return ComputeExpression(dba, v_acc, std::nullopt, expression, node_name, ""); + }); + + return evaluated_expressions; +} + +std::optional> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema &schema) { + std::map ret; + auto props = acc.Properties(view); + if (props.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex properties."); + return std::nullopt; + } + + auto &properties = props.GetValue(); + std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), + [](std::pair &pair) { + return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); + }); + properties.clear(); + + auto pks = PrimaryKeysFromAccessor(acc, view, schema); + if (pks) { + ret.merge(*pks); + } + + return ret; +} + +EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) { + // Functions to select connecting edges based on uniquness + EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness; + + if (only_unique_neighbor_rows) { + maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges, + msgs::EdgeDirection edge_direction) -> EdgeAccessors { + std::function &, const storage::v3::EdgeAccessor &)> + is_edge_unique; + switch (edge_direction) { + case msgs::EdgeDirection::OUT: { + is_edge_unique = [](std::set &other_vertex_set, + const storage::v3::EdgeAccessor &edge_acc) { + auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.ToVertex()); + return insertion_happened; + }; + break; + } + case msgs::EdgeDirection::IN: { + is_edge_unique = [](std::set &other_vertex_set, + const storage::v3::EdgeAccessor &edge_acc) { + auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.FromVertex()); + return insertion_happened; + }; + break; + } + case msgs::EdgeDirection::BOTH: + MG_ASSERT(false, "This is should never happen, msgs::EdgeDirection::BOTH should not be passed here."); + } + + EdgeAccessors ret; + std::set other_vertex_set; + + for (const auto &edge : edges) { + if (is_edge_unique(other_vertex_set, edge)) { + ret.emplace_back(edge); + } + } + + return ret; + }; + } else { + maybe_filter_based_on_edge_uniquness = + [](EdgeAccessors &&edges, msgs::EdgeDirection /*edge_direction*/) -> EdgeAccessors { return std::move(edges); }; + } + + return maybe_filter_based_on_edge_uniquness; +} + +EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { + EdgeFiller edge_filler; + + if (!req.edge_properties) { + edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge, + msgs::ExpandOneResultRow &result_row) -> bool { + auto properties_results = edge.Properties(View::NEW); + if (properties_results.HasError()) { + spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id); + return false; + } + + std::map value_properties; + for (auto &[prop_key, prop_val] : properties_results.GetValue()) { + value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val)))); + } + using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties; + EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { + result_row.in_edges_with_all_properties.push_back(std::move(edges)); + } else { + result_row.out_edges_with_all_properties.push_back(std::move(edges)); + } + return true; + }; + } else { + // TODO(gvolfing) - do we want to set the action_successful here? + edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge, + msgs::ExpandOneResultRow &result_row) -> bool { + std::vector value_properties; + value_properties.reserve(req.edge_properties.value().size()); + for (const auto &edge_prop : req.edge_properties.value()) { + auto property_result = edge.GetProperty(edge_prop, View::NEW); + if (property_result.HasError()) { + spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", + req.transaction_id.logical_id); + return false; + } + value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue()))); + } + using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; + EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { + result_row.in_edges_with_specific_properties.push_back(std::move(edges)); + } else { + result_row.out_edges_with_specific_properties.push_back(std::move(edges)); + } + return true; + }; + } + + return edge_filler; +} + +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters, + const std::string_view node_name) { + return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { + auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); + return res.IsBool() && res.ValueBool(); + }); +} + +std::optional GetExpandOneResult( + Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema) { + /// Fill up source vertex + const auto primary_key = ConvertPropertyVector(src_vertex.second); + auto v_acc = acc.FindVertex(primary_key, View::NEW); + + msgs::Vertex source_vertex = {.id = src_vertex}; + if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { + source_vertex.labels = *maybe_secondary_labels; + } else { + return std::nullopt; + } + + std::optional> src_vertex_properties; + src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); + + if (!src_vertex_properties) { + return std::nullopt; + } + + /// Fill up connecting edges + auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req, maybe_filter_based_on_edge_uniquness); + if (!fill_up_connecting_edges) { + return std::nullopt; + } + + auto [in_edges, out_edges] = fill_up_connecting_edges.value(); + + msgs::ExpandOneResultRow result_row; + result_row.src_vertex = std::move(source_vertex); + result_row.src_vertex_properties = std::move(*src_vertex_properties); + static constexpr bool kInEdges = true; + static constexpr bool kOutEdges = false; + if (!in_edges.empty() && !FillEdges(in_edges, result_row, edge_filler)) { + return std::nullopt; + } + if (!out_edges.empty() && !FillEdges(out_edges, result_row, edge_filler)) { + return std::nullopt; + } + + return result_row; +} + +std::optional GetExpandOneResult( + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + std::vector in_edge_accessors, std::vector out_edge_accessors, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema) { + /// Fill up source vertex + msgs::Vertex source_vertex = {.id = src_vertex}; + if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { + source_vertex.labels = *maybe_secondary_labels; + } else { + return std::nullopt; + } + + /// Fill up source vertex properties + auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); + if (!src_vertex_properties) { + return std::nullopt; + } + + /// Fill up connecting edges + auto in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edge_accessors), msgs::EdgeDirection::IN); + auto out_edges = maybe_filter_based_on_edge_uniquness(std::move(out_edge_accessors), msgs::EdgeDirection::OUT); + + msgs::ExpandOneResultRow result_row; + result_row.src_vertex = std::move(source_vertex); + result_row.src_vertex_properties = std::move(*src_vertex_properties); + static constexpr bool kInEdges = true; + static constexpr bool kOutEdges = false; + if (!in_edges.empty() && !FillEdges(in_edges, result_row, edge_filler)) { + return std::nullopt; + } + if (!out_edges.empty() && !FillEdges(out_edges, result_row, edge_filler)) { + return std::nullopt; + } + + return result_row; +} VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector &start_ids, const View view) { diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 4d3018566..035cc1ae5 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -22,6 +22,11 @@ #include "storage/v3/shard.hpp" #include "storage/v3/vertex_accessor.hpp" namespace memgraph::storage::v3 { +using EdgeAccessors = std::vector; +using EdgeUniqunessFunction = std::function; +using EdgeFiller = std::function; +using msgs::Value; + template concept ObjectAccessor = std::is_same_v || std::is_same_v; @@ -194,4 +199,40 @@ std::vector>::const_iterator GetStartOrderedElementsIter std::array, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction); + +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters, + const std::string_view node_name); + +std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, + const std::vector &expressions, + std::string_view node_name); + +std::optional> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, + const std::vector &props, + View view); + +std::optional> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema &schema); + +EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows); + +EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req); + +std::optional> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, + const std::vector &props, + View view); + +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters, + const std::string_view node_name); + +std::optional GetExpandOneResult( + Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema); + +std::optional GetExpandOneResult( + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + std::vector in_edge_accessors, std::vector out_edge_accessors, + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema &schema); } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index bf31e8ced..841b64e44 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -45,469 +45,18 @@ #include "utils/logging.hpp" namespace memgraph::storage::v3 { -using msgs::Label; +using msgs::Label; // #NoCommit not needed? using msgs::PropertyId; using msgs::Value; +using conversions::ConvertPropertyMap; using conversions::ConvertPropertyVector; using conversions::ConvertValueVector; +using conversions::FromMap; using conversions::FromPropertyValueToValue; using conversions::ToMsgsVertexId; using conversions::ToPropertyValue; -namespace { -namespace msgs = msgs; - -using AllEdgePropertyDataSructure = std::map; -using SpecificEdgePropertyDataSructure = std::vector; - -using AllEdgeProperties = std::tuple; -using SpecificEdgeProperties = std::tuple; - -using SpecificEdgePropertiesVector = std::vector; -using AllEdgePropertiesVector = std::vector; - -using EdgeAccessors = std::vector; - -using EdgeFiller = std::function; -using EdgeUniqunessFunction = std::function; - -struct VertexIdCmpr { - bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; } -}; - -std::vector> ConvertPropertyMap( - std::vector> &&properties) { - std::vector> ret; - ret.reserve(properties.size()); - - std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()), - std::back_inserter(ret), [](std::pair &&property) { - return std::make_pair(property.first, ToPropertyValue(std::move(property.second))); - }); - - return ret; -} - -std::vector> FromMap(const std::map &properties) { - std::vector> ret; - ret.reserve(properties.size()); - - std::transform(properties.begin(), properties.end(), std::back_inserter(ret), - [](const auto &property) { return std::make_pair(property.first, property.second); }); - - return ret; -} - -std::optional> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, - const std::vector &props, - View view) { - std::map ret; - - for (const auto &prop : props) { - auto result = acc.GetProperty(prop, view); - if (result.HasError()) { - spdlog::debug("Encountered an Error while trying to get a vertex property."); - return std::nullopt; - } - auto &value = result.GetValue(); - ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); - } - - return ret; -} - -std::optional> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view, - const Schemas::Schema &schema) { - std::map ret; - auto props = acc.Properties(view); - auto maybe_pk = acc.PrimaryKey(view); - if (maybe_pk.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex primary key."); - return std::nullopt; - } - auto &pk = maybe_pk.GetValue(); - MG_ASSERT(schema.second.size() == pk.size(), "PrimaryKey size does not match schema!"); - for (size_t i{0}; i < schema.second.size(); ++i) { - ret.emplace(schema.second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); - } - - return ret; -} - -std::optional> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, - const Schemas::Schema &schema) { - std::map ret; - auto props = acc.Properties(view); - if (props.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex properties."); - return std::nullopt; - } - - auto &properties = props.GetValue(); - std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), - [](std::pair &pair) { - return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); - }); - properties.clear(); - - auto pks = PrimaryKeysFromAccessor(acc, view, schema); - if (pks) { - ret.merge(*pks); - } - - return ret; -} - -bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters, - const std::string_view node_name) { - return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { - auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); - return res.IsBool() && res.ValueBool(); - }); -} - -std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, - const std::vector &expressions, - std::string_view node_name) { - std::vector evaluated_expressions; - evaluated_expressions.reserve(expressions.size()); - - std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), - [&dba, &v_acc, &node_name](const auto &expression) { - return ComputeExpression(dba, v_acc, std::nullopt, expression, node_name, ""); - }); - - return evaluated_expressions; -} - -struct LocalError {}; - -std::optional> FillUpSourceVertexSecondaryLabels(const std::optional &v_acc, - const msgs::ExpandOneRequest &req) { - auto secondary_labels = v_acc->Labels(View::NEW); - if (secondary_labels.HasError()) { - spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - - auto &sec_labels = secondary_labels.GetValue(); - std::vector msgs_secondary_labels; - msgs_secondary_labels.reserve(sec_labels.size()); - - std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels), - [](auto label_id) { return msgs::Label{.id = label_id}; }); - - return msgs_secondary_labels; -} - -std::optional> FillUpSourceVertexProperties(const std::optional &v_acc, - const msgs::ExpandOneRequest &req, - storage::v3::View view, - const Schemas::Schema &schema) { - std::map src_vertex_properties; - - if (!req.src_vertex_properties) { - auto props = v_acc->Properties(View::NEW); - if (props.HasError()) { - spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - - for (auto &[key, val] : props.GetValue()) { - src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val)))); - } - auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema); - if (pks) { - src_vertex_properties.merge(*pks); - } - - } else if (req.src_vertex_properties.value().empty()) { - // NOOP - } else { - for (const auto &prop : req.src_vertex_properties.value()) { - auto prop_val = v_acc->GetProperty(prop, View::OLD); - if (prop_val.HasError()) { - spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue())))); - } - } - - return src_vertex_properties; -} - -std::optional, 2>> FillUpConnectingEdges( - const std::optional &v_acc, const msgs::ExpandOneRequest &req, - const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness) { - std::vector edge_types{}; - edge_types.reserve(req.edge_types.size()); - std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types), - [](const msgs::EdgeType &edge_type) { return edge_type.id; }); - - std::vector in_edges; - std::vector out_edges; - - switch (req.direction) { - case msgs::EdgeDirection::OUT: { - auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); - if (out_edges_result.HasError()) { - spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - out_edges = - maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); - break; - } - case msgs::EdgeDirection::IN: { - auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); - if (in_edges_result.HasError()) { - spdlog::debug( - "Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id - .logical_id]); - return std::nullopt; - } - in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); - break; - } - case msgs::EdgeDirection::BOTH: { - auto in_edges_result = v_acc->InEdges(View::NEW, edge_types); - if (in_edges_result.HasError()) { - spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); - auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); - if (out_edges_result.HasError()) { - spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - out_edges = - maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT); - break; - } - } - return std::array, 2>{std::move(in_edges), std::move(out_edges)}; -} - -using AllEdgePropertyDataSructure = std::map; -using SpecificEdgePropertyDataSructure = std::vector; - -using AllEdgeProperties = std::tuple; -using SpecificEdgeProperties = std::tuple; - -using SpecificEdgePropertiesVector = std::vector; -using AllEdgePropertiesVector = std::vector; - -using EdgeFiller = std::function; - -template -bool FillEdges(const std::vector &edges, msgs::ExpandOneResultRow &row, const EdgeFiller &edge_filler) { - for (const auto &edge : edges) { - if (!edge_filler(edge, are_in_edges, row)) { - return false; - } - } - - return true; -} - -std::optional GetExpandOneResult( - Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, - const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, - const Schemas::Schema &schema) { - /// Fill up source vertex - const auto primary_key = ConvertPropertyVector(src_vertex.second); - auto v_acc = acc.FindVertex(primary_key, View::NEW); - - msgs::Vertex source_vertex = {.id = src_vertex}; - if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { - source_vertex.labels = *maybe_secondary_labels; - } else { - return std::nullopt; - } - - std::optional> src_vertex_properties; - src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); - - if (!src_vertex_properties) { - return std::nullopt; - } - - /// Fill up connecting edges - auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req, maybe_filter_based_on_edge_uniquness); - if (!fill_up_connecting_edges) { - return std::nullopt; - } - - auto [in_edges, out_edges] = fill_up_connecting_edges.value(); - - msgs::ExpandOneResultRow result_row; - result_row.src_vertex = std::move(source_vertex); - result_row.src_vertex_properties = std::move(*src_vertex_properties); - static constexpr bool kInEdges = true; - static constexpr bool kOutEdges = false; - if (!in_edges.empty() && !FillEdges(in_edges, result_row, edge_filler)) { - return std::nullopt; - } - if (!out_edges.empty() && !FillEdges(out_edges, result_row, edge_filler)) { - return std::nullopt; - } - - return result_row; -} - -std::optional GetExpandOneResult( - VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, - std::vector in_edge_accessors, std::vector out_edge_accessors, - const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, - const Schemas::Schema &schema) { - /// Fill up source vertex - msgs::Vertex source_vertex = {.id = src_vertex}; - if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { - source_vertex.labels = *maybe_secondary_labels; - } else { - return std::nullopt; - } - - /// Fill up source vertex properties - auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); - if (!src_vertex_properties) { - return std::nullopt; - } - - /// Fill up connecting edges - auto in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edge_accessors), msgs::EdgeDirection::IN); - auto out_edges = maybe_filter_based_on_edge_uniquness(std::move(out_edge_accessors), msgs::EdgeDirection::OUT); - - msgs::ExpandOneResultRow result_row; - result_row.src_vertex = std::move(source_vertex); - result_row.src_vertex_properties = std::move(*src_vertex_properties); - static constexpr bool kInEdges = true; - static constexpr bool kOutEdges = false; - if (!in_edges.empty() && !FillEdges(in_edges, result_row, edge_filler)) { - return std::nullopt; - } - if (!out_edges.empty() && !FillEdges(out_edges, result_row, edge_filler)) { - return std::nullopt; - } - - return result_row; -} - -EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) { - // Functions to select connecting edges based on uniquness - EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness; - - if (only_unique_neighbor_rows) { - maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges, - msgs::EdgeDirection edge_direction) -> EdgeAccessors { - std::function &, const storage::v3::EdgeAccessor &)> - is_edge_unique; - switch (edge_direction) { - case msgs::EdgeDirection::OUT: { - is_edge_unique = [](std::set &other_vertex_set, - const storage::v3::EdgeAccessor &edge_acc) { - auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.ToVertex()); - return insertion_happened; - }; - break; - } - case msgs::EdgeDirection::IN: { - is_edge_unique = [](std::set &other_vertex_set, - const storage::v3::EdgeAccessor &edge_acc) { - auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.FromVertex()); - return insertion_happened; - }; - break; - } - case msgs::EdgeDirection::BOTH: - MG_ASSERT(false, "This is should never happen, msgs::EdgeDirection::BOTH should not be passed here."); - } - - EdgeAccessors ret; - std::set other_vertex_set; - - for (const auto &edge : edges) { - if (is_edge_unique(other_vertex_set, edge)) { - ret.emplace_back(edge); - } - } - - return ret; - }; - } else { - maybe_filter_based_on_edge_uniquness = - [](EdgeAccessors &&edges, msgs::EdgeDirection /*edge_direction*/) -> EdgeAccessors { return std::move(edges); }; - } - - return maybe_filter_based_on_edge_uniquness; -} - -EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { - EdgeFiller edge_filler; - - if (!req.edge_properties) { - edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge, - msgs::ExpandOneResultRow &result_row) -> bool { - auto properties_results = edge.Properties(View::NEW); - if (properties_results.HasError()) { - spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id); - return false; - } - - std::map value_properties; - for (auto &[prop_key, prop_val] : properties_results.GetValue()) { - value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val)))); - } - using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties; - EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, - edge.Gid().AsUint(), std::move(value_properties)}; - if (is_in_edge) { - result_row.in_edges_with_all_properties.push_back(std::move(edges)); - } else { - result_row.out_edges_with_all_properties.push_back(std::move(edges)); - } - return true; - }; - } else { - // TODO(gvolfing) - do we want to set the action_successful here? - edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge, - msgs::ExpandOneResultRow &result_row) -> bool { - std::vector value_properties; - value_properties.reserve(req.edge_properties.value().size()); - for (const auto &edge_prop : req.edge_properties.value()) { - auto property_result = edge.GetProperty(edge_prop, View::NEW); - if (property_result.HasError()) { - spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", - req.transaction_id.logical_id); - return false; - } - value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue()))); - } - using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; - EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, - edge.Gid().AsUint(), std::move(value_properties)}; - if (is_in_edge) { - result_row.in_edges_with_specific_properties.push_back(std::move(edges)); - } else { - result_row.out_edges_with_specific_properties.push_back(std::move(edges)); - } - return true; - }; - } - - return edge_filler; -} - -}; // namespace msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { auto acc = shard_->Access(req.transaction_id); diff --git a/src/storage/v3/value_conversions.hpp b/src/storage/v3/value_conversions.hpp index 05fd1394b..80b6f02b9 100644 --- a/src/storage/v3/value_conversions.hpp +++ b/src/storage/v3/value_conversions.hpp @@ -129,4 +129,27 @@ inline std::vector ConvertValueVector(const std::vector> ConvertPropertyMap( + std::vector> &&properties) { + std::vector> ret; + ret.reserve(properties.size()); + + std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()), + std::back_inserter(ret), [](std::pair &&property) { + return std::make_pair(property.first, ToPropertyValue(std::move(property.second))); + }); + + return ret; +} + +inline std::vector> FromMap(const std::map &properties) { + std::vector> ret; + ret.reserve(properties.size()); + + std::transform(properties.begin(), properties.end(), std::back_inserter(ret), + [](const auto &property) { return std::make_pair(property.first, property.second); }); + + return ret; +} } // namespace memgraph::storage::conversions