Merge pull request #611 from memgraph/T1083-MG-limit-and-order-expand-one_v3
Add Limit and OrderBy to ExpandOne
This commit is contained in:
commit
c647134916
@ -403,7 +403,9 @@ struct ExpandOneRequest {
|
||||
std::vector<std::string> vertex_expressions;
|
||||
std::vector<std::string> edge_expressions;
|
||||
|
||||
std::optional<std::vector<OrderBy>> order_by;
|
||||
std::vector<OrderBy> order_by_vertices;
|
||||
std::vector<OrderBy> order_by_edges;
|
||||
|
||||
// Limit the edges or the vertices?
|
||||
std::optional<size_t> limit;
|
||||
std::vector<std::string> filters;
|
||||
|
@ -165,7 +165,7 @@ std::any ParseExpression(const std::string &expr, memgraph::expr::AstStorage &st
|
||||
return visitor.visit(ast);
|
||||
}
|
||||
|
||||
TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc,
|
||||
TypedValue ComputeExpression(DbAccessor &dba, const memgraph::storage::v3::VertexAccessor &v_acc,
|
||||
const std::optional<memgraph::storage::v3::EdgeAccessor> &e_acc,
|
||||
const std::string &expression, std::string_view node_name, std::string_view edge_name) {
|
||||
AstStorage storage;
|
||||
@ -192,10 +192,11 @@ TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::stor
|
||||
return position_symbol_pair.second.name() == node_name;
|
||||
}) != symbol_table.table().end());
|
||||
|
||||
frame[symbol_table.at(node_identifier)] = *v_acc;
|
||||
frame[symbol_table.at(node_identifier)] = v_acc;
|
||||
}
|
||||
|
||||
if (edge_identifier.symbol_pos_ != -1) {
|
||||
MG_ASSERT(e_acc.has_value());
|
||||
MG_ASSERT(std::find_if(symbol_table.table().begin(), symbol_table.table().end(),
|
||||
[&edge_name](const std::pair<int32_t, Symbol> &position_symbol_pair) {
|
||||
return position_symbol_pair.second.name() == edge_name;
|
||||
|
@ -9,6 +9,8 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "db_accessor.hpp"
|
||||
@ -48,8 +50,7 @@ auto Eval(TExpression *expr, EvaluationContext &ctx, AstStorage &storage, Expres
|
||||
|
||||
std::any ParseExpression(const std::string &expr, AstStorage &storage);
|
||||
|
||||
TypedValue ComputeExpression(DbAccessor &dba, const std::optional<VertexAccessor> &v_acc,
|
||||
const std::optional<EdgeAccessor> &e_acc, const std::string &expression,
|
||||
std::string_view node_name, std::string_view edge_name);
|
||||
TypedValue ComputeExpression(DbAccessor &dba, const VertexAccessor &v_acc, const std::optional<EdgeAccessor> &e_acc,
|
||||
const std::string &expression, std::string_view node_name, std::string_view edge_name);
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -13,54 +13,435 @@
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "pretty_print_ast_to_original_expression.hpp"
|
||||
#include "storage/v3/bindings/db_accessor.hpp"
|
||||
#include "storage/v3/bindings/pretty_print_ast_to_original_expression.hpp"
|
||||
#include "storage/v3/expr.hpp"
|
||||
#include "storage/v3/value_conversions.hpp"
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
using msgs::Label;
|
||||
using msgs::PropertyId;
|
||||
|
||||
std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable,
|
||||
std::vector<msgs::OrderBy> &order_bys) {
|
||||
std::vector<Element> ordered;
|
||||
ordered.reserve(acc.ApproximateVertexCount());
|
||||
std::vector<Ordering> ordering;
|
||||
ordering.reserve(order_bys.size());
|
||||
for (const auto &order : order_bys) {
|
||||
switch (order.direction) {
|
||||
case memgraph::msgs::OrderingDirection::ASCENDING: {
|
||||
ordering.push_back(Ordering::ASC);
|
||||
break;
|
||||
}
|
||||
case memgraph::msgs::OrderingDirection::DESCENDING: {
|
||||
ordering.push_back(Ordering::DESC);
|
||||
break;
|
||||
}
|
||||
}
|
||||
using conversions::ConvertPropertyVector;
|
||||
using conversions::FromPropertyValueToValue;
|
||||
using conversions::ToMsgsVertexId;
|
||||
|
||||
namespace {
|
||||
|
||||
using AllEdgePropertyDataSructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataSructure = std::vector<msgs::Value>;
|
||||
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataSructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataSructure>;
|
||||
|
||||
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
|
||||
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
|
||||
|
||||
struct VertexIdCmpr {
|
||||
bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; }
|
||||
};
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema &schema) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
auto props = acc.Properties(view);
|
||||
auto maybe_pk = acc.PrimaryKey(view);
|
||||
if (maybe_pk.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get vertex primary key.");
|
||||
return std::nullopt;
|
||||
}
|
||||
auto compare_typed_values = TypedValueVectorCompare(ordering);
|
||||
for (auto it = vertices_iterable.begin(); it != vertices_iterable.end(); ++it) {
|
||||
std::vector<TypedValue> properties_order_by;
|
||||
properties_order_by.reserve(order_bys.size());
|
||||
|
||||
for (const auto &order_by : order_bys) {
|
||||
const auto val =
|
||||
ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression, expr::identifier_node_symbol, "");
|
||||
properties_order_by.push_back(val);
|
||||
}
|
||||
ordered.push_back({std::move(properties_order_by), *it});
|
||||
auto &pk = maybe_pk.GetValue();
|
||||
MG_ASSERT(schema.second.size() == pk.size(), "PrimaryKey size does not match schema!");
|
||||
for (size_t i{0}; i < schema.second.size(); ++i) {
|
||||
ret.emplace(schema.second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
|
||||
}
|
||||
|
||||
std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) {
|
||||
return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by);
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req) {
|
||||
auto secondary_labels = v_acc->Labels(View::NEW);
|
||||
if (secondary_labels.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto &sec_labels = secondary_labels.GetValue();
|
||||
std::vector<msgs::Label> msgs_secondary_labels;
|
||||
msgs_secondary_labels.reserve(sec_labels.size());
|
||||
|
||||
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels),
|
||||
[](auto label_id) { return msgs::Label{.id = label_id}; });
|
||||
|
||||
return msgs_secondary_labels;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req,
|
||||
storage::v3::View view,
|
||||
const Schemas::Schema &schema) {
|
||||
std::map<PropertyId, Value> src_vertex_properties;
|
||||
|
||||
if (!req.src_vertex_properties) {
|
||||
auto props = v_acc->Properties(View::NEW);
|
||||
if (props.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
for (auto &[key, val] : props.GetValue()) {
|
||||
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
|
||||
}
|
||||
auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema);
|
||||
if (pks) {
|
||||
src_vertex_properties.merge(*pks);
|
||||
}
|
||||
|
||||
} else if (req.src_vertex_properties.value().empty()) {
|
||||
// NOOP
|
||||
} else {
|
||||
for (const auto &prop : req.src_vertex_properties.value()) {
|
||||
auto prop_val = v_acc->GetProperty(prop, View::OLD);
|
||||
if (prop_val.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue()))));
|
||||
}
|
||||
}
|
||||
|
||||
return src_vertex_properties;
|
||||
}
|
||||
|
||||
std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
|
||||
const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness) {
|
||||
std::vector<EdgeTypeId> edge_types{};
|
||||
edge_types.reserve(req.edge_types.size());
|
||||
std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types),
|
||||
[](const msgs::EdgeType &edge_type) { return edge_type.id; });
|
||||
|
||||
std::vector<EdgeAccessor> in_edges;
|
||||
std::vector<EdgeAccessor> out_edges;
|
||||
|
||||
switch (req.direction) {
|
||||
case msgs::EdgeDirection::OUT: {
|
||||
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
|
||||
if (out_edges_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
out_edges =
|
||||
maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT);
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::IN: {
|
||||
auto in_edges_result = v_acc->InEdges(View::NEW, edge_types);
|
||||
if (in_edges_result.HasError()) {
|
||||
spdlog::debug(
|
||||
"Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id
|
||||
.logical_id]);
|
||||
return std::nullopt;
|
||||
}
|
||||
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::BOTH: {
|
||||
auto in_edges_result = v_acc->InEdges(View::NEW, edge_types);
|
||||
if (in_edges_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
|
||||
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
|
||||
if (out_edges_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
out_edges =
|
||||
maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return std::array<std::vector<EdgeAccessor>, 2>{std::move(in_edges), std::move(out_edges)};
|
||||
}
|
||||
|
||||
template <bool are_in_edges>
|
||||
bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow &row, const EdgeFiller &edge_filler) {
|
||||
for (const auto &edge : edges) {
|
||||
if (!edge_filler(edge, are_in_edges, row)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}; // namespace
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc,
|
||||
const std::vector<PropertyId> &props,
|
||||
View view) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
|
||||
for (const auto &prop : props) {
|
||||
auto result = acc.GetProperty(prop, view);
|
||||
if (result.HasError()) {
|
||||
spdlog::debug("Encountered an Error while trying to get a vertex property.");
|
||||
return std::nullopt;
|
||||
}
|
||||
auto &value = result.GetValue();
|
||||
ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value))));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc,
|
||||
const std::vector<std::string> &expressions,
|
||||
std::string_view node_name) {
|
||||
std::vector<TypedValue> evaluated_expressions;
|
||||
evaluated_expressions.reserve(expressions.size());
|
||||
|
||||
std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions),
|
||||
[&dba, &v_acc, &node_name](const auto &expression) {
|
||||
return ComputeExpression(dba, v_acc, std::nullopt, expression, node_name, "");
|
||||
});
|
||||
|
||||
return evaluated_expressions;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema &schema) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
auto props = acc.Properties(view);
|
||||
if (props.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get vertex properties.");
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto &properties = props.GetValue();
|
||||
std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()),
|
||||
[](std::pair<const PropertyId, PropertyValue> &pair) {
|
||||
return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second)));
|
||||
});
|
||||
properties.clear();
|
||||
|
||||
auto pks = PrimaryKeysFromAccessor(acc, view, schema);
|
||||
if (pks) {
|
||||
ret.merge(*pks);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) {
|
||||
// Functions to select connecting edges based on uniquness
|
||||
EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness;
|
||||
|
||||
if (only_unique_neighbor_rows) {
|
||||
maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges,
|
||||
msgs::EdgeDirection edge_direction) -> EdgeAccessors {
|
||||
std::function<bool(std::set<const storage::v3::VertexId *, VertexIdCmpr> &, const storage::v3::EdgeAccessor &)>
|
||||
is_edge_unique;
|
||||
switch (edge_direction) {
|
||||
case msgs::EdgeDirection::OUT: {
|
||||
is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set,
|
||||
const storage::v3::EdgeAccessor &edge_acc) {
|
||||
auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.ToVertex());
|
||||
return insertion_happened;
|
||||
};
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::IN: {
|
||||
is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set,
|
||||
const storage::v3::EdgeAccessor &edge_acc) {
|
||||
auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.FromVertex());
|
||||
return insertion_happened;
|
||||
};
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::BOTH:
|
||||
MG_ASSERT(false, "This is should never happen, msgs::EdgeDirection::BOTH should not be passed here.");
|
||||
}
|
||||
|
||||
EdgeAccessors ret;
|
||||
std::set<const storage::v3::VertexId *, VertexIdCmpr> other_vertex_set;
|
||||
|
||||
for (const auto &edge : edges) {
|
||||
if (is_edge_unique(other_vertex_set, edge)) {
|
||||
ret.emplace_back(edge);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
};
|
||||
} else {
|
||||
maybe_filter_based_on_edge_uniquness =
|
||||
[](EdgeAccessors &&edges, msgs::EdgeDirection /*edge_direction*/) -> EdgeAccessors { return std::move(edges); };
|
||||
}
|
||||
|
||||
return maybe_filter_based_on_edge_uniquness;
|
||||
}
|
||||
|
||||
EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) {
|
||||
EdgeFiller edge_filler;
|
||||
|
||||
if (!req.edge_properties) {
|
||||
edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge,
|
||||
msgs::ExpandOneResultRow &result_row) -> bool {
|
||||
auto properties_results = edge.Properties(View::NEW);
|
||||
if (properties_results.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
std::map<PropertyId, msgs::Value> value_properties;
|
||||
for (auto &[prop_key, prop_val] : properties_results.GetValue()) {
|
||||
value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val))));
|
||||
}
|
||||
using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties;
|
||||
EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()},
|
||||
edge.Gid().AsUint(), std::move(value_properties)};
|
||||
if (is_in_edge) {
|
||||
result_row.in_edges_with_all_properties.push_back(std::move(edges));
|
||||
} else {
|
||||
result_row.out_edges_with_all_properties.push_back(std::move(edges));
|
||||
}
|
||||
return true;
|
||||
};
|
||||
} else {
|
||||
// TODO(gvolfing) - do we want to set the action_successful here?
|
||||
edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge,
|
||||
msgs::ExpandOneResultRow &result_row) -> bool {
|
||||
std::vector<msgs::Value> value_properties;
|
||||
value_properties.reserve(req.edge_properties.value().size());
|
||||
for (const auto &edge_prop : req.edge_properties.value()) {
|
||||
auto property_result = edge.GetProperty(edge_prop, View::NEW);
|
||||
if (property_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return false;
|
||||
}
|
||||
value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue())));
|
||||
}
|
||||
using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties;
|
||||
EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()},
|
||||
edge.Gid().AsUint(), std::move(value_properties)};
|
||||
if (is_in_edge) {
|
||||
result_row.in_edges_with_specific_properties.push_back(std::move(edges));
|
||||
} else {
|
||||
result_row.out_edges_with_specific_properties.push_back(std::move(edges));
|
||||
}
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
return edge_filler;
|
||||
}
|
||||
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters,
|
||||
const std::string_view node_name) {
|
||||
return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) {
|
||||
auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, "");
|
||||
return res.IsBool() && res.ValueBool();
|
||||
});
|
||||
return ordered;
|
||||
}
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema &schema) {
|
||||
/// Fill up source vertex
|
||||
const auto primary_key = ConvertPropertyVector(src_vertex.second);
|
||||
auto v_acc = acc.FindVertex(primary_key, View::NEW);
|
||||
|
||||
msgs::Vertex source_vertex = {.id = src_vertex};
|
||||
if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) {
|
||||
source_vertex.labels = *maybe_secondary_labels;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
|
||||
|
||||
if (!src_vertex_properties) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Fill up connecting edges
|
||||
auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req, maybe_filter_based_on_edge_uniquness);
|
||||
if (!fill_up_connecting_edges) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto [in_edges, out_edges] = fill_up_connecting_edges.value();
|
||||
|
||||
msgs::ExpandOneResultRow result_row;
|
||||
result_row.src_vertex = std::move(source_vertex);
|
||||
result_row.src_vertex_properties = std::move(*src_vertex_properties);
|
||||
static constexpr bool kInEdges = true;
|
||||
static constexpr bool kOutEdges = false;
|
||||
if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return result_row;
|
||||
}
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema &schema) {
|
||||
/// Fill up source vertex
|
||||
msgs::Vertex source_vertex = {.id = src_vertex};
|
||||
if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) {
|
||||
source_vertex.labels = *maybe_secondary_labels;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Fill up source vertex properties
|
||||
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
|
||||
if (!src_vertex_properties) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Fill up connecting edges
|
||||
auto in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edge_accessors), msgs::EdgeDirection::IN);
|
||||
auto out_edges = maybe_filter_based_on_edge_uniquness(std::move(out_edge_accessors), msgs::EdgeDirection::OUT);
|
||||
|
||||
msgs::ExpandOneResultRow result_row;
|
||||
result_row.src_vertex = std::move(source_vertex);
|
||||
result_row.src_vertex_properties = std::move(*src_vertex_properties);
|
||||
static constexpr bool kInEdges = true;
|
||||
static constexpr bool kOutEdges = false;
|
||||
if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return result_row;
|
||||
}
|
||||
|
||||
VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable,
|
||||
const std::vector<PropertyValue> &start_ids, const View view) {
|
||||
const std::vector<PropertyValue> &primary_key, const View view) {
|
||||
auto it = vertex_iterable.begin();
|
||||
while (it != vertex_iterable.end()) {
|
||||
if (const auto &vertex = *it; start_ids <= vertex.PrimaryKey(view).GetValue()) {
|
||||
if (const auto &vertex = *it; primary_key <= vertex.PrimaryKey(view).GetValue()) {
|
||||
break;
|
||||
}
|
||||
++it;
|
||||
@ -68,17 +449,79 @@ VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_itera
|
||||
return it;
|
||||
}
|
||||
|
||||
std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements,
|
||||
const std::vector<PropertyValue> &start_ids,
|
||||
const View view) {
|
||||
std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator(
|
||||
const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &primary_key,
|
||||
const View view) {
|
||||
for (auto it = ordered_elements.begin(); it != ordered_elements.end(); ++it) {
|
||||
if (const auto &vertex = it->vertex_acc; start_ids <= vertex.PrimaryKey(view).GetValue()) {
|
||||
if (const auto &vertex = it->object_acc; primary_key <= vertex.PrimaryKey(view).GetValue()) {
|
||||
return it;
|
||||
}
|
||||
}
|
||||
return ordered_elements.end();
|
||||
}
|
||||
|
||||
std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
|
||||
const msgs::EdgeDirection direction) {
|
||||
std::vector<EdgeAccessor> in_edges;
|
||||
std::vector<EdgeAccessor> out_edges;
|
||||
|
||||
switch (direction) {
|
||||
case memgraph::msgs::EdgeDirection::IN: {
|
||||
auto edges = vertex_accessor.InEdges(View::OLD);
|
||||
if (edges.HasValue()) {
|
||||
in_edges = edges.GetValue();
|
||||
}
|
||||
}
|
||||
case memgraph::msgs::EdgeDirection::OUT: {
|
||||
auto edges = vertex_accessor.OutEdges(View::OLD);
|
||||
if (edges.HasValue()) {
|
||||
out_edges = edges.GetValue();
|
||||
}
|
||||
}
|
||||
case memgraph::msgs::EdgeDirection::BOTH: {
|
||||
auto maybe_in_edges = vertex_accessor.InEdges(View::OLD);
|
||||
auto maybe_out_edges = vertex_accessor.OutEdges(View::OLD);
|
||||
std::vector<EdgeAccessor> edges;
|
||||
if (maybe_in_edges.HasValue()) {
|
||||
in_edges = maybe_in_edges.GetValue();
|
||||
}
|
||||
if (maybe_out_edges.HasValue()) {
|
||||
out_edges = maybe_out_edges.GetValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return std::array<std::vector<EdgeAccessor>, 2>{std::move(in_edges), std::move(out_edges)};
|
||||
}
|
||||
|
||||
std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<EdgeAccessor> &iterable,
|
||||
std::vector<msgs::OrderBy> &order_by_edges,
|
||||
const VertexAccessor &vertex_acc) {
|
||||
std::vector<Ordering> ordering;
|
||||
ordering.reserve(order_by_edges.size());
|
||||
std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(ordering),
|
||||
[](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); });
|
||||
|
||||
std::vector<Element<EdgeAccessor>> ordered;
|
||||
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
||||
std::vector<TypedValue> properties_order_by;
|
||||
properties_order_by.reserve(order_by_edges.size());
|
||||
std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(properties_order_by),
|
||||
[&dba, &vertex_acc, &it](const auto &order_by) {
|
||||
return ComputeExpression(dba, vertex_acc, *it, order_by.expression.expression,
|
||||
expr::identifier_node_symbol, expr::identifier_edge_symbol);
|
||||
});
|
||||
|
||||
ordered.push_back({std::move(properties_order_by), *it});
|
||||
}
|
||||
|
||||
auto compare_typed_values = TypedValueVectorCompare(ordering);
|
||||
std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) {
|
||||
return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by);
|
||||
});
|
||||
return ordered;
|
||||
}
|
||||
|
||||
void LogResultError(const ResultErrorType &error, const std::string_view action) {
|
||||
std::visit(
|
||||
[action]<typename T>(T &&error) {
|
||||
|
@ -9,15 +9,28 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "ast/ast.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v3/bindings/ast/ast.hpp"
|
||||
#include "storage/v3/bindings/pretty_print_ast_to_original_expression.hpp"
|
||||
#include "storage/v3/bindings/typed_value.hpp"
|
||||
#include "storage/v3/edge_accessor.hpp"
|
||||
#include "storage/v3/expr.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/vertex_accessor.hpp"
|
||||
#include "utils/template_utils.hpp"
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
using EdgeAccessors = std::vector<storage::v3::EdgeAccessor>;
|
||||
using EdgeUniqunessFunction = std::function<EdgeAccessors(EdgeAccessors &&, msgs::EdgeDirection)>;
|
||||
using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>;
|
||||
using msgs::Value;
|
||||
|
||||
template <typename T>
|
||||
concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>;
|
||||
|
||||
inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) {
|
||||
// in ordering null comes after everything else
|
||||
@ -73,6 +86,17 @@ inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) {
|
||||
}
|
||||
}
|
||||
|
||||
inline Ordering ConvertMsgsOrderByToOrdering(msgs::OrderingDirection ordering) {
|
||||
switch (ordering) {
|
||||
case memgraph::msgs::OrderingDirection::ASCENDING:
|
||||
return memgraph::storage::v3::Ordering::ASC;
|
||||
case memgraph::msgs::OrderingDirection::DESCENDING:
|
||||
return memgraph::storage::v3::Ordering::DESC;
|
||||
default:
|
||||
LOG_FATAL("Unknown ordering direction");
|
||||
}
|
||||
}
|
||||
|
||||
class TypedValueVectorCompare final {
|
||||
public:
|
||||
explicit TypedValueVectorCompare(const std::vector<Ordering> &ordering) : ordering_(ordering) {}
|
||||
@ -100,20 +124,86 @@ class TypedValueVectorCompare final {
|
||||
std::vector<Ordering> ordering_;
|
||||
};
|
||||
|
||||
template <ObjectAccessor TObjectAccessor>
|
||||
struct Element {
|
||||
std::vector<TypedValue> properties_order_by;
|
||||
VertexAccessor vertex_acc;
|
||||
TObjectAccessor object_acc;
|
||||
};
|
||||
|
||||
std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable,
|
||||
std::vector<msgs::OrderBy> &order_bys);
|
||||
template <typename T>
|
||||
concept VerticesIt = utils::SameAsAnyOf<T, VerticesIterable, std::vector<VertexAccessor>>;
|
||||
|
||||
VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable,
|
||||
const std::vector<PropertyValue> &start_ids, View view);
|
||||
template <VerticesIt TIterable>
|
||||
std::vector<Element<VertexAccessor>> OrderByVertices(DbAccessor &dba, TIterable &iterable,
|
||||
std::vector<msgs::OrderBy> &order_by_vertices) {
|
||||
std::vector<Ordering> ordering;
|
||||
ordering.reserve(order_by_vertices.size());
|
||||
std::transform(order_by_vertices.begin(), order_by_vertices.end(), std::back_inserter(ordering),
|
||||
[](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); });
|
||||
|
||||
std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements,
|
||||
const std::vector<PropertyValue> &start_ids,
|
||||
View view);
|
||||
std::vector<Element<VertexAccessor>> ordered;
|
||||
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
||||
std::vector<TypedValue> properties_order_by;
|
||||
properties_order_by.reserve(order_by_vertices.size());
|
||||
|
||||
std::transform(order_by_vertices.begin(), order_by_vertices.end(), std::back_inserter(properties_order_by),
|
||||
[&dba, &it](const auto &order_by) {
|
||||
return ComputeExpression(dba, *it, std::nullopt /*e_acc*/, order_by.expression.expression,
|
||||
expr::identifier_node_symbol, expr::identifier_edge_symbol);
|
||||
});
|
||||
|
||||
ordered.push_back({std::move(properties_order_by), *it});
|
||||
}
|
||||
|
||||
auto compare_typed_values = TypedValueVectorCompare(ordering);
|
||||
std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) {
|
||||
return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by);
|
||||
});
|
||||
return ordered;
|
||||
}
|
||||
|
||||
void LogResultError(const ResultErrorType &error, std::string_view action);
|
||||
|
||||
std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<EdgeAccessor> &iterable,
|
||||
std::vector<msgs::OrderBy> &order_by_edges,
|
||||
const VertexAccessor &vertex_acc);
|
||||
|
||||
VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable,
|
||||
const std::vector<PropertyValue> &primary_key, View view);
|
||||
|
||||
std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator(
|
||||
const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &primary_key,
|
||||
View view);
|
||||
|
||||
std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
|
||||
msgs::EdgeDirection direction);
|
||||
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters,
|
||||
std::string_view node_name);
|
||||
|
||||
std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc,
|
||||
const std::vector<std::string> &expressions,
|
||||
std::string_view node_name);
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc,
|
||||
const std::vector<PropertyId> &props,
|
||||
View view);
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema &schema);
|
||||
|
||||
EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows);
|
||||
|
||||
EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req);
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema &schema);
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema &schema);
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -41,431 +41,21 @@
|
||||
#include "storage/v3/vertex_accessor.hpp"
|
||||
#include "storage/v3/vertex_id.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
using msgs::Label;
|
||||
using msgs::PropertyId;
|
||||
using msgs::Value;
|
||||
|
||||
using conversions::ConvertPropertyMap;
|
||||
using conversions::ConvertPropertyVector;
|
||||
using conversions::ConvertValueVector;
|
||||
using conversions::FromMap;
|
||||
using conversions::FromPropertyValueToValue;
|
||||
using conversions::ToMsgsVertexId;
|
||||
using conversions::ToPropertyValue;
|
||||
|
||||
namespace {
|
||||
namespace msgs = msgs;
|
||||
|
||||
using AllEdgePropertyDataStructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataStructure = std::vector<msgs::Value>;
|
||||
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataStructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataStructure>;
|
||||
|
||||
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
|
||||
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
|
||||
|
||||
using EdgeAccessors = std::vector<storage::v3::EdgeAccessor>;
|
||||
|
||||
using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>;
|
||||
using EdgeUniquenessFunction = std::function<EdgeAccessors(EdgeAccessors &&, msgs::EdgeDirection)>;
|
||||
|
||||
struct VertexIdCmpr {
|
||||
bool operator()(const storage::v3::VertexId *lhs, const storage::v3::VertexId *rhs) const { return *lhs < *rhs; }
|
||||
};
|
||||
|
||||
std::vector<std::pair<PropertyId, PropertyValue>> ConvertPropertyMap(
|
||||
std::vector<std::pair<PropertyId, Value>> &&properties) {
|
||||
std::vector<std::pair<PropertyId, PropertyValue>> ret;
|
||||
ret.reserve(properties.size());
|
||||
|
||||
std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()),
|
||||
std::back_inserter(ret), [](std::pair<PropertyId, Value> &&property) {
|
||||
return std::make_pair(property.first, ToPropertyValue(std::move(property.second)));
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::vector<std::pair<PropertyId, Value>> FromMap(const std::map<PropertyId, Value> &properties) {
|
||||
std::vector<std::pair<PropertyId, Value>> ret;
|
||||
ret.reserve(properties.size());
|
||||
|
||||
std::transform(properties.begin(), properties.end(), std::back_inserter(ret),
|
||||
[](const auto &property) { return std::make_pair(property.first, property.second); });
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc,
|
||||
const std::vector<PropertyId> &props,
|
||||
View view) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
|
||||
for (const auto &prop : props) {
|
||||
auto result = acc.GetProperty(prop, view);
|
||||
if (result.HasError()) {
|
||||
spdlog::debug("Encountered an Error while trying to get a vertex property.");
|
||||
return std::nullopt;
|
||||
}
|
||||
auto &value = result.GetValue();
|
||||
ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value))));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema *schema) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
auto props = acc.Properties(view);
|
||||
auto maybe_pk = acc.PrimaryKey(view);
|
||||
if (maybe_pk.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get vertex primary key.");
|
||||
return std::nullopt;
|
||||
}
|
||||
auto &pk = maybe_pk.GetValue();
|
||||
MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
|
||||
for (size_t i{0}; i < schema->second.size(); ++i) {
|
||||
ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
|
||||
const Schemas::Schema *schema) {
|
||||
std::map<PropertyId, Value> ret;
|
||||
auto props = acc.Properties(view);
|
||||
if (props.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get vertex properties.");
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto &properties = props.GetValue();
|
||||
std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()),
|
||||
[](std::pair<const PropertyId, PropertyValue> &pair) {
|
||||
return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second)));
|
||||
});
|
||||
properties.clear();
|
||||
|
||||
auto pks = PrimaryKeysFromAccessor(acc, view, schema);
|
||||
if (pks) {
|
||||
ret.merge(*pks);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters,
|
||||
const std::string_view node_name) {
|
||||
return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) {
|
||||
auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, "");
|
||||
return res.IsBool() && res.ValueBool();
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc,
|
||||
const std::vector<std::string> &expressions,
|
||||
std::string_view node_name) {
|
||||
std::vector<TypedValue> evaluated_expressions;
|
||||
evaluated_expressions.reserve(expressions.size());
|
||||
|
||||
std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions),
|
||||
[&dba, &v_acc, &node_name](const auto &expression) {
|
||||
return ComputeExpression(dba, v_acc, std::nullopt, expression, node_name, "");
|
||||
});
|
||||
|
||||
return evaluated_expressions;
|
||||
}
|
||||
|
||||
std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req) {
|
||||
auto secondary_labels = v_acc->Labels(View::NEW);
|
||||
if (secondary_labels.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto &sec_labels = secondary_labels.GetValue();
|
||||
std::vector<msgs::Label> msgs_secondary_labels;
|
||||
msgs_secondary_labels.reserve(sec_labels.size());
|
||||
|
||||
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels),
|
||||
[](auto label_id) { return msgs::Label{.id = label_id}; });
|
||||
|
||||
return msgs_secondary_labels;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req,
|
||||
storage::v3::View view,
|
||||
const Schemas::Schema *schema) {
|
||||
std::map<PropertyId, Value> src_vertex_properties;
|
||||
|
||||
if (!req.src_vertex_properties) {
|
||||
auto props = v_acc->Properties(View::NEW);
|
||||
if (props.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
for (auto &[key, val] : props.GetValue()) {
|
||||
src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
|
||||
}
|
||||
auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema);
|
||||
if (pks) {
|
||||
src_vertex_properties.merge(*pks);
|
||||
}
|
||||
|
||||
} else if (req.src_vertex_properties.value().empty()) {
|
||||
// NOOP
|
||||
} else {
|
||||
for (const auto &prop : req.src_vertex_properties.value()) {
|
||||
auto prop_val = v_acc->GetProperty(prop, View::OLD);
|
||||
if (prop_val.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue()))));
|
||||
}
|
||||
}
|
||||
|
||||
return src_vertex_properties;
|
||||
}
|
||||
|
||||
std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
|
||||
const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniquness) {
|
||||
std::vector<EdgeTypeId> edge_types{};
|
||||
edge_types.reserve(req.edge_types.size());
|
||||
std::transform(req.edge_types.begin(), req.edge_types.end(), std::back_inserter(edge_types),
|
||||
[](const msgs::EdgeType &edge_type) { return edge_type.id; });
|
||||
|
||||
std::vector<EdgeAccessor> in_edges;
|
||||
std::vector<EdgeAccessor> out_edges;
|
||||
|
||||
switch (req.direction) {
|
||||
case msgs::EdgeDirection::OUT: {
|
||||
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
|
||||
if (out_edges_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
out_edges =
|
||||
maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT);
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::IN: {
|
||||
auto in_edges_result = v_acc->InEdges(View::NEW, edge_types);
|
||||
if (in_edges_result.HasError()) {
|
||||
spdlog::debug(
|
||||
"Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id
|
||||
.logical_id]);
|
||||
return std::nullopt;
|
||||
}
|
||||
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::BOTH: {
|
||||
auto in_edges_result = v_acc->InEdges(View::NEW, edge_types);
|
||||
if (in_edges_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
|
||||
auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
|
||||
if (out_edges_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
out_edges =
|
||||
maybe_filter_based_on_edge_uniquness(std::move(out_edges_result.GetValue()), msgs::EdgeDirection::OUT);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges};
|
||||
}
|
||||
|
||||
using AllEdgePropertyDataStructure = std::map<PropertyId, msgs::Value>;
|
||||
using SpecificEdgePropertyDataStructure = std::vector<msgs::Value>;
|
||||
|
||||
using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataStructure>;
|
||||
using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataStructure>;
|
||||
|
||||
using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>;
|
||||
using AllEdgePropertiesVector = std::vector<AllEdgeProperties>;
|
||||
|
||||
using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>;
|
||||
|
||||
template <bool are_in_edges>
|
||||
bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow &row, const EdgeFiller &edge_filler) {
|
||||
for (const auto &edge : edges) {
|
||||
if (!edge_filler(edge, are_in_edges, row)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema *schema) {
|
||||
/// Fill up source vertex
|
||||
const auto primary_key = ConvertPropertyVector(src_vertex.second);
|
||||
auto v_acc = acc.FindVertex(primary_key, View::NEW);
|
||||
|
||||
msgs::Vertex source_vertex = {.id = src_vertex};
|
||||
if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) {
|
||||
source_vertex.labels = *maybe_secondary_labels;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
|
||||
src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
|
||||
|
||||
if (!src_vertex_properties) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Fill up connecting edges
|
||||
auto fill_up_connecting_edges = FillUpConnectingEdges(v_acc, req, maybe_filter_based_on_edge_uniquness);
|
||||
if (!fill_up_connecting_edges) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto [in_edges, out_edges] = fill_up_connecting_edges.value();
|
||||
|
||||
msgs::ExpandOneResultRow result_row;
|
||||
result_row.src_vertex = std::move(source_vertex);
|
||||
result_row.src_vertex_properties = std::move(*src_vertex_properties);
|
||||
static constexpr bool kInEdges = true;
|
||||
static constexpr bool kOutEdges = false;
|
||||
if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return result_row;
|
||||
}
|
||||
|
||||
EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows) {
|
||||
// Functions to select connecting edges based on uniquness
|
||||
EdgeUniquenessFunction maybe_filter_based_on_edge_uniquness;
|
||||
|
||||
if (only_unique_neighbor_rows) {
|
||||
maybe_filter_based_on_edge_uniquness = [](EdgeAccessors &&edges,
|
||||
msgs::EdgeDirection edge_direction) -> EdgeAccessors {
|
||||
std::function<bool(std::set<const storage::v3::VertexId *, VertexIdCmpr> &, const storage::v3::EdgeAccessor &)>
|
||||
is_edge_unique;
|
||||
switch (edge_direction) {
|
||||
case msgs::EdgeDirection::OUT: {
|
||||
is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set,
|
||||
const storage::v3::EdgeAccessor &edge_acc) {
|
||||
auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.ToVertex());
|
||||
return insertion_happened;
|
||||
};
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::IN: {
|
||||
is_edge_unique = [](std::set<const storage::v3::VertexId *, VertexIdCmpr> &other_vertex_set,
|
||||
const storage::v3::EdgeAccessor &edge_acc) {
|
||||
auto [it, insertion_happened] = other_vertex_set.insert(&edge_acc.FromVertex());
|
||||
return insertion_happened;
|
||||
};
|
||||
break;
|
||||
}
|
||||
case msgs::EdgeDirection::BOTH:
|
||||
MG_ASSERT(false, "This is should never happen, msgs::EdgeDirection::BOTH should not be passed here.");
|
||||
}
|
||||
|
||||
EdgeAccessors ret;
|
||||
std::set<const storage::v3::VertexId *, VertexIdCmpr> other_vertex_set;
|
||||
|
||||
for (const auto &edge : edges) {
|
||||
if (is_edge_unique(other_vertex_set, edge)) {
|
||||
ret.emplace_back(edge);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
};
|
||||
} else {
|
||||
maybe_filter_based_on_edge_uniquness =
|
||||
[](EdgeAccessors &&edges, msgs::EdgeDirection /*edge_direction*/) -> EdgeAccessors { return std::move(edges); };
|
||||
}
|
||||
|
||||
return maybe_filter_based_on_edge_uniquness;
|
||||
}
|
||||
|
||||
EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) {
|
||||
EdgeFiller edge_filler;
|
||||
|
||||
if (!req.edge_properties) {
|
||||
edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge,
|
||||
msgs::ExpandOneResultRow &result_row) -> bool {
|
||||
auto properties_results = edge.Properties(View::NEW);
|
||||
if (properties_results.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
std::map<PropertyId, msgs::Value> value_properties;
|
||||
for (auto &[prop_key, prop_val] : properties_results.GetValue()) {
|
||||
value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val))));
|
||||
}
|
||||
using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties;
|
||||
EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()},
|
||||
edge.Gid().AsUint(), std::move(value_properties)};
|
||||
if (is_in_edge) {
|
||||
result_row.in_edges_with_all_properties.push_back(std::move(edges));
|
||||
} else {
|
||||
result_row.out_edges_with_all_properties.push_back(std::move(edges));
|
||||
}
|
||||
return true;
|
||||
};
|
||||
} else {
|
||||
// TODO(gvolfing) - do we want to set the action_successful here?
|
||||
edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge,
|
||||
msgs::ExpandOneResultRow &result_row) -> bool {
|
||||
std::vector<msgs::Value> value_properties;
|
||||
value_properties.reserve(req.edge_properties.value().size());
|
||||
for (const auto &edge_prop : req.edge_properties.value()) {
|
||||
auto property_result = edge.GetProperty(edge_prop, View::NEW);
|
||||
if (property_result.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
return false;
|
||||
}
|
||||
value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue())));
|
||||
}
|
||||
using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties;
|
||||
EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()},
|
||||
edge.Gid().AsUint(), std::move(value_properties)};
|
||||
if (is_in_edge) {
|
||||
result_row.in_edges_with_specific_properties.push_back(std::move(edges));
|
||||
} else {
|
||||
result_row.out_edges_with_specific_properties.push_back(std::move(edges));
|
||||
}
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
return edge_filler;
|
||||
}
|
||||
|
||||
}; // namespace
|
||||
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
|
||||
auto acc = shard_->Access(req.transaction_id);
|
||||
|
||||
@ -475,7 +65,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
|
||||
/// TODO(gvolfing) Consider other methods than converting. Change either
|
||||
/// the way that the property map is stored in the messages, or the
|
||||
/// signature of CreateVertexAndValidate.
|
||||
auto converted_property_map = ConvertPropertyMap(std::move(new_vertex.properties));
|
||||
auto converted_property_map = ConvertPropertyMap(new_vertex.properties);
|
||||
|
||||
// TODO(gvolfing) make sure if this conversion is actually needed.
|
||||
std::vector<LabelId> converted_label_ids;
|
||||
@ -764,7 +354,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
|
||||
found_props = CollectSpecificPropertiesFromAccessor(vertex, req.props_to_return.value(), view);
|
||||
} else {
|
||||
const auto *schema = shard_->GetSchema(shard_->PrimaryLabel());
|
||||
found_props = CollectAllPropertiesFromAccessor(vertex, view, schema);
|
||||
MG_ASSERT(schema);
|
||||
found_props = CollectAllPropertiesFromAccessor(vertex, view, *schema);
|
||||
}
|
||||
|
||||
// TODO(gvolfing) -VERIFY-
|
||||
@ -783,18 +374,18 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
|
||||
uint64_t sample_counter{0};
|
||||
auto vertex_iterable = acc.Vertices(view);
|
||||
if (!req.order_bys.empty()) {
|
||||
const auto ordered = OrderByElements(acc, dba, vertex_iterable, req.order_bys);
|
||||
const auto ordered = OrderByVertices(dba, vertex_iterable, req.order_bys);
|
||||
// we are traversing Elements
|
||||
auto it = GetStartOrderedElementsIterator(ordered, start_id, View(req.storage_view));
|
||||
for (; it != ordered.end(); ++it) {
|
||||
emplace_scan_result(it->vertex_acc);
|
||||
emplace_scan_result(it->object_acc);
|
||||
++sample_counter;
|
||||
if (req.batch_limit && sample_counter == req.batch_limit) {
|
||||
// Reached the maximum specified batch size.
|
||||
// Get the next element before exiting.
|
||||
++it;
|
||||
if (it != ordered.end()) {
|
||||
const auto &next_vertex = it->vertex_acc;
|
||||
const auto &next_vertex = it->object_acc;
|
||||
next_start_id = ConstructValueVertex(next_vertex, view).vertex_v.id;
|
||||
}
|
||||
|
||||
@ -835,10 +426,14 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
||||
bool action_successful = true;
|
||||
|
||||
std::vector<msgs::ExpandOneResultRow> results;
|
||||
const auto batch_limit = req.limit;
|
||||
auto dba = DbAccessor{&acc};
|
||||
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniquenessFunction(req.only_unique_neighbor_rows);
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
|
||||
auto edge_filler = InitializeEdgeFillerFunction(req);
|
||||
|
||||
std::vector<VertexAccessor> vertex_accessors;
|
||||
vertex_accessors.reserve(req.src_vertices.size());
|
||||
for (auto &src_vertex : req.src_vertices) {
|
||||
// Get Vertex acc
|
||||
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
|
||||
@ -848,24 +443,77 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
||||
req.transaction_id.logical_id);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
auto dba = DbAccessor{&acc};
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler,
|
||||
shard_->GetSchema(shard_->PrimaryLabel()));
|
||||
|
||||
if (!result) {
|
||||
vertex_accessors.emplace_back(src_vertex_acc_opt.value());
|
||||
}
|
||||
|
||||
if (!req.order_by_vertices.empty()) {
|
||||
// Can we do differently to avoid this? We need OrderByElements but currently it returns vector<Element>, so this
|
||||
// workaround is here to avoid more duplication later
|
||||
auto local_sorted_vertices = OrderByVertices(dba, vertex_accessors, req.order_by_vertices);
|
||||
vertex_accessors.clear();
|
||||
std::transform(local_sorted_vertices.begin(), local_sorted_vertices.end(), std::back_inserter(vertex_accessors),
|
||||
[](auto &vertex) { return vertex.object_acc; });
|
||||
}
|
||||
|
||||
for (const auto &src_vertex_acc : vertex_accessors) {
|
||||
auto label_id = src_vertex_acc.PrimaryLabel(View::NEW);
|
||||
if (label_id.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(result.value());
|
||||
auto primary_key = src_vertex_acc.PrimaryKey(View::NEW);
|
||||
if (primary_key.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
msgs::VertexId src_vertex(msgs::Label{.id = *label_id}, conversions::ConvertValueVector(*primary_key));
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> maybe_result;
|
||||
|
||||
if (req.order_by_edges.empty()) {
|
||||
const auto *schema = shard_->GetSchema(shard_->PrimaryLabel());
|
||||
MG_ASSERT(schema);
|
||||
maybe_result =
|
||||
GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler, *schema);
|
||||
|
||||
} else {
|
||||
auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction);
|
||||
const auto in_ordered_edges = OrderByEdges(dba, in_edge_accessors, req.order_by_edges, src_vertex_acc);
|
||||
const auto out_ordered_edges = OrderByEdges(dba, out_edge_accessors, req.order_by_edges, src_vertex_acc);
|
||||
|
||||
std::vector<EdgeAccessor> in_edge_ordered_accessors;
|
||||
std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors),
|
||||
[](const auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
std::vector<EdgeAccessor> out_edge_ordered_accessors;
|
||||
std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors),
|
||||
[](const auto &edge_element) { return edge_element.object_acc; });
|
||||
const auto *schema = shard_->GetSchema(shard_->PrimaryLabel());
|
||||
MG_ASSERT(schema);
|
||||
maybe_result =
|
||||
GetExpandOneResult(src_vertex_acc, src_vertex, req, in_edge_ordered_accessors, out_edge_ordered_accessors,
|
||||
maybe_filter_based_on_edge_uniquness, edge_filler, *schema);
|
||||
}
|
||||
|
||||
if (!maybe_result) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(std::move(maybe_result.value()));
|
||||
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
msgs::ExpandOneResponse resp{};
|
||||
|
@ -129,4 +129,27 @@ inline std::vector<Value> ConvertValueVector(const std::vector<v3::PropertyValue
|
||||
inline msgs::VertexId ToMsgsVertexId(const v3::VertexId &vertex_id) {
|
||||
return {msgs::Label{vertex_id.primary_label}, ConvertValueVector(vertex_id.primary_key)};
|
||||
}
|
||||
|
||||
inline std::vector<std::pair<v3::PropertyId, v3::PropertyValue>> ConvertPropertyMap(
|
||||
std::vector<std::pair<v3::PropertyId, Value>> &properties) {
|
||||
std::vector<std::pair<v3::PropertyId, v3::PropertyValue>> ret;
|
||||
ret.reserve(properties.size());
|
||||
|
||||
std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()),
|
||||
std::back_inserter(ret), [](std::pair<v3::PropertyId, Value> &&property) {
|
||||
return std::make_pair(property.first, ToPropertyValue(std::move(property.second)));
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline std::vector<std::pair<PropertyId, Value>> FromMap(const std::map<PropertyId, Value> &properties) {
|
||||
std::vector<std::pair<PropertyId, Value>> ret;
|
||||
ret.reserve(properties.size());
|
||||
|
||||
std::transform(properties.begin(), properties.end(), std::back_inserter(ret),
|
||||
[](const auto &property) { return std::make_pair(property.first, property.second); });
|
||||
|
||||
return ret;
|
||||
}
|
||||
} // namespace memgraph::storage::conversions
|
||||
|
@ -565,7 +565,8 @@ void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_verte
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -577,7 +578,8 @@ void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_verte
|
||||
expand_one_req.vertex_expressions = expressions;
|
||||
expand_one_req.filters = filter;
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
expand_one_req.transaction_id.logical_id = GetTransactionId();
|
||||
@ -620,7 +622,8 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, Edge
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -632,7 +635,8 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, Edge
|
||||
expand_one_req.vertex_expressions = expressions;
|
||||
expand_one_req.filters = filter;
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
expand_one_req.transaction_id.logical_id = GetTransactionId();
|
||||
@ -676,7 +680,8 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -688,7 +693,8 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_
|
||||
expand_one_req.vertex_expressions = expressions;
|
||||
expand_one_req.filters = filter;
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
expand_one_req.only_unique_neighbor_rows = true;
|
||||
@ -714,6 +720,88 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_
|
||||
}
|
||||
}
|
||||
|
||||
void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_val, uint64_t other_src_vertex_val,
|
||||
EdgeTypeId edge_type_id) {
|
||||
// Source vertex
|
||||
msgs::Label label = {.id = get_primary_label()};
|
||||
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
|
||||
auto other_src_vertex = std::make_pair(label, GetPrimaryKey(other_src_vertex_val));
|
||||
|
||||
// Edge type
|
||||
auto edge_type = msgs::EdgeType{};
|
||||
edge_type.id = edge_type_id;
|
||||
|
||||
// Edge direction
|
||||
auto edge_direction = msgs::EdgeDirection::OUT;
|
||||
|
||||
// Source Vertex properties to look for
|
||||
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
|
||||
|
||||
// Edge properties to look for
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {
|
||||
{msgs::Expression{"MG_SYMBOL_NODE.prop1"}, msgs::OrderingDirection::ASCENDING}};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {
|
||||
{msgs::Expression{"MG_SYMBOL_EDGE.prop4"}, msgs::OrderingDirection::DESCENDING}};
|
||||
|
||||
size_t limit = 1;
|
||||
std::vector<std::string> filters = {"MG_SYMBOL_NODE.prop1 != -1"};
|
||||
|
||||
msgs::ExpandOneRequest expand_one_req{};
|
||||
|
||||
expand_one_req.direction = edge_direction;
|
||||
expand_one_req.edge_properties = edge_properties;
|
||||
expand_one_req.edge_types = {edge_type};
|
||||
expand_one_req.filters = filters;
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex, other_src_vertex};
|
||||
expand_one_req.transaction_id.logical_id = GetTransactionId();
|
||||
|
||||
while (true) {
|
||||
auto read_res = client.SendReadRequest(expand_one_req);
|
||||
if (read_res.HasError()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto write_response_result = read_res.GetValue();
|
||||
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
|
||||
|
||||
// We check that we do not have more results than the limit. Based on the data in the graph, we know that we should
|
||||
// receive exactly limit responses.
|
||||
auto expected_number_of_rows = std::min(expand_one_req.src_vertices.size(), limit);
|
||||
MG_ASSERT(expected_number_of_rows == 1);
|
||||
MG_ASSERT(write_response.result.size() == expected_number_of_rows);
|
||||
|
||||
// We know there are 1 out-going edges from V1->V2
|
||||
// We know there are 10 out-going edges from V2->V3
|
||||
// Since we sort on prop1 and limit 1, we will have a single response
|
||||
// with two edges corresponding to V1->V2 and V1->V3
|
||||
const auto expected_number_of_edges = 2;
|
||||
MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == expected_number_of_edges);
|
||||
MG_ASSERT(write_response.result[0]
|
||||
.out_edges_with_specific_properties.empty()); // We are not asking for specific properties
|
||||
|
||||
// We also check that the vertices are ordered by prop1 DESC
|
||||
|
||||
auto is_sorted = std::is_sorted(write_response.result.cbegin(), write_response.result.cend(),
|
||||
[](const auto &vertex, const auto &other_vertex) {
|
||||
const auto primary_key = vertex.src_vertex.id.second;
|
||||
const auto other_primary_key = other_vertex.src_vertex.id.second;
|
||||
|
||||
MG_ASSERT(primary_key.size() == 1);
|
||||
MG_ASSERT(other_primary_key.size() == 1);
|
||||
return primary_key[0].int_v > other_primary_key[0].int_v;
|
||||
});
|
||||
|
||||
MG_ASSERT(is_sorted);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val,
|
||||
EdgeTypeId edge_type_id) {
|
||||
// Source vertex
|
||||
@ -735,7 +823,8 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -747,7 +836,8 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin
|
||||
expand_one_req.vertex_expressions = expressions;
|
||||
expand_one_req.filters = filter;
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
expand_one_req.transaction_id.logical_id = GetTransactionId();
|
||||
@ -795,7 +885,8 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {specified_edge_prop};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -807,7 +898,8 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t
|
||||
expand_one_req.vertex_expressions = expressions;
|
||||
expand_one_req.filters = filter;
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
expand_one_req.transaction_id.logical_id = GetTransactionId();
|
||||
@ -854,7 +946,8 @@ void AttemptToExpandOneWithFilters(ShardClient &client, uint64_t src_vertex_val,
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by_vertices = {};
|
||||
std::vector<msgs::OrderBy> order_by_edges = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -866,7 +959,8 @@ void AttemptToExpandOneWithFilters(ShardClient &client, uint64_t src_vertex_val,
|
||||
expand_one_req.vertex_expressions = expressions;
|
||||
expand_one_req.filters = {filter_expr1};
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.order_by_vertices = order_by_vertices;
|
||||
expand_one_req.order_by_edges = order_by_edges;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
expand_one_req.transaction_id.logical_id = GetTransactionId();
|
||||
@ -1057,6 +1151,9 @@ void TestExpandOneGraphOne(ShardClient &client) {
|
||||
auto edge_prop_id = GetUniqueInteger();
|
||||
auto edge_prop_val = GetUniqueInteger();
|
||||
|
||||
std::vector<uint64_t> edges_ids(10);
|
||||
std::generate(edges_ids.begin(), edges_ids.end(), GetUniqueInteger);
|
||||
|
||||
// (V1)-[edge_type_id]->(V2)
|
||||
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid_1, edge_prop_id,
|
||||
edge_prop_val, {edge_type_id}));
|
||||
@ -1064,7 +1161,14 @@ void TestExpandOneGraphOne(ShardClient &client) {
|
||||
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_3, edge_gid_2, edge_prop_id,
|
||||
edge_prop_val, {edge_type_id}));
|
||||
|
||||
// (V2)-[edge_type_id]->(V3) x 10
|
||||
std::for_each(edges_ids.begin(), edges_ids.end(), [&](const auto &edge_id) {
|
||||
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_2, unique_prop_val_3, edge_id, edge_prop_id,
|
||||
edge_prop_val, {edge_type_id}));
|
||||
});
|
||||
|
||||
AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id);
|
||||
AttemptToExpandOneLimitAndOrderBy(client, unique_prop_val_1, unique_prop_val_2, edge_type_id);
|
||||
AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, wrong_edge_type_id);
|
||||
AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id);
|
||||
AttemptToExpandOneWithSpecifiedEdgeProperties(client, unique_prop_val_1, edge_type_id, edge_prop_id);
|
||||
|
Loading…
Reference in New Issue
Block a user