Attempt impl
This commit is contained in:
parent
55e0dbca80
commit
b82e8748ad
@ -404,7 +404,7 @@ struct ExpandOneRequest {
|
||||
std::vector<std::string> vertex_expressions;
|
||||
std::vector<std::string> edge_expressions;
|
||||
|
||||
std::optional<std::vector<OrderBy>> order_by;
|
||||
std::vector<OrderBy> order_by;
|
||||
// Limit the edges or the vertices?
|
||||
std::optional<size_t> limit;
|
||||
std::vector<std::string> filters;
|
||||
|
@ -42,36 +42,38 @@ std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIter
|
||||
return ordered_elements.end();
|
||||
}
|
||||
|
||||
std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
|
||||
const msgs::EdgeDirection direction) {
|
||||
std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
|
||||
const msgs::EdgeDirection direction) {
|
||||
std::vector<EdgeAccessor> in_edges;
|
||||
std::vector<EdgeAccessor> out_edges;
|
||||
|
||||
switch (direction) {
|
||||
case memgraph::msgs::EdgeDirection::IN: {
|
||||
auto edges = vertex_accessor.InEdges(View::OLD);
|
||||
if (edges.HasValue()) {
|
||||
return edges.GetValue();
|
||||
in_edges = edges.GetValue();
|
||||
}
|
||||
return {};
|
||||
}
|
||||
case memgraph::msgs::EdgeDirection::OUT: {
|
||||
auto edges = vertex_accessor.OutEdges(View::OLD);
|
||||
if (edges.HasValue()) {
|
||||
return edges.GetValue();
|
||||
out_edges = edges.GetValue();
|
||||
}
|
||||
return {};
|
||||
}
|
||||
case memgraph::msgs::EdgeDirection::BOTH: {
|
||||
auto maybe_in_edges = vertex_accessor.InEdges(View::OLD);
|
||||
auto maybe_out_edges = vertex_accessor.OutEdges(View::OLD);
|
||||
std::vector<EdgeAccessor> edges;
|
||||
if (maybe_in_edges.HasValue()) {
|
||||
edges = maybe_in_edges.GetValue();
|
||||
in_edges = maybe_in_edges.GetValue();
|
||||
}
|
||||
if (maybe_out_edges.HasValue()) {
|
||||
edges.insert(edges.end(), maybe_out_edges.GetValue().begin(), maybe_out_edges.GetValue().end());
|
||||
out_edges = maybe_out_edges.GetValue();
|
||||
}
|
||||
return edges;
|
||||
}
|
||||
}
|
||||
|
||||
return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges};
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -163,5 +163,6 @@ std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIter
|
||||
const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &start_ids,
|
||||
View view);
|
||||
|
||||
std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction);
|
||||
std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
|
||||
msgs::EdgeDirection direction);
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -354,6 +354,41 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
return result_row;
|
||||
}
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult2(
|
||||
VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) {
|
||||
/// Fill up source vertex
|
||||
auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex);
|
||||
if (!source_vertex) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Fill up source vertex properties
|
||||
auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req);
|
||||
if (!src_vertex_properties) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Fill up connecting edges
|
||||
auto in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edge_accessors), msgs::EdgeDirection::IN);
|
||||
auto out_edges = maybe_filter_based_on_edge_uniquness(std::move(out_edge_accessors), msgs::EdgeDirection::OUT);
|
||||
|
||||
msgs::ExpandOneResultRow result_row;
|
||||
result_row.src_vertex = std::move(*source_vertex);
|
||||
result_row.src_vertex_properties = std::move(*src_vertex_properties);
|
||||
static constexpr bool kInEdges = true;
|
||||
static constexpr bool kOutEdges = false;
|
||||
if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, req, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, req, result_row, edge_filler)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return result_row;
|
||||
}
|
||||
|
||||
EdgeUniqunessFunction InitializeEdgeUniqunessFunction(bool only_unique_neighbor_rows) {
|
||||
// Functions to select connecting edges based on uniquness
|
||||
EdgeUniqunessFunction maybe_filter_based_on_edge_uniquness;
|
||||
@ -862,57 +897,67 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
|
||||
auto edge_filler = InitializeEdgeFillerFunction(req);
|
||||
|
||||
//#NoCommit code below is duplicated, one needs to factor it once it's clear
|
||||
if (req.order_by) {
|
||||
//#NoCommit code below is duplicated, one needs to factor it once it's clear!
|
||||
if (!req.order_by.empty()) {
|
||||
auto vertex_iterable = acc.Vertices(View::OLD);
|
||||
const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, *req.order_by);
|
||||
std::vector<std::pair<VertexAccessor, std::vector<Element<EdgeAccessor>>>> vertex_ordered_edges;
|
||||
vertex_ordered_edges.reserve(req.src_vertices.size());
|
||||
const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_by);
|
||||
|
||||
for (auto &src_vertex : req.src_vertices) {
|
||||
for (auto &ordered_vertice : ordered_vertices) {
|
||||
// Get Vertex acc
|
||||
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
|
||||
if (!src_vertex_acc_opt) {
|
||||
action_successful = false;
|
||||
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
break;
|
||||
}
|
||||
auto src_vertex_acc = ordered_vertice.object_acc;
|
||||
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
auto dba = DbAccessor{&acc};
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
|
||||
|
||||
if (!result) {
|
||||
auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction);
|
||||
const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by);
|
||||
const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by);
|
||||
|
||||
std::vector<EdgeAccessor> in_edge_ordered_accessors;
|
||||
std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors),
|
||||
[](auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
std::vector<EdgeAccessor> out_edge_ordered_accessors;
|
||||
std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors),
|
||||
[](auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
auto label_id = src_vertex_acc.PrimaryLabel(View::NEW);
|
||||
if (label_id.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
auto primary_key = src_vertex_acc.PrimaryKey(View::NEW);
|
||||
if (primary_key.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(result.value());
|
||||
if (batch_limit) { // #NoCommit can ebd one differently
|
||||
msgs::VertexId src_vertice;
|
||||
src_vertice.first = msgs::Label{.id = *label_id};
|
||||
src_vertice.second = conversions::ConvertValueVector(*primary_key);
|
||||
auto maybe_result =
|
||||
GetExpandOneResult2(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors,
|
||||
maybe_filter_based_on_edge_uniquness, edge_filler);
|
||||
|
||||
if (!maybe_result) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(maybe_result.value());
|
||||
|
||||
if (batch_limit) { // #NoCommit can be done differently
|
||||
--*batch_limit;
|
||||
if (batch_limit < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now we have to construct response like this, and here we will care about
|
||||
// limit:
|
||||
// v1 e1
|
||||
// v1 e2
|
||||
// v1 e3
|
||||
// v2 e4
|
||||
// v2 e5
|
||||
// v2 e6
|
||||
// v2 e7
|
||||
// v3 e8
|
||||
// v3 e9
|
||||
} else {
|
||||
for (auto &src_vertex : req.src_vertices) {
|
||||
// Get Vertex acc
|
||||
|
@ -531,7 +531,7 @@ void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_verte
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -586,7 +586,7 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, Edge
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -642,7 +642,7 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -698,9 +698,9 @@ void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_
|
||||
// Edge properties to look for
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
// std::optional<size_t> limit = 5;
|
||||
std::vector<msgs::OrderBy> order_by = {
|
||||
{msgs::Expression{"MG_SYMBOL_NODE.prop1"}, msgs::OrderingDirection::DESCENDING}};
|
||||
std::optional<size_t> limit = 3;
|
||||
// std::optional<msgs::Filter> filter = {};
|
||||
|
||||
msgs::ExpandOneRequest expand_one_req{};
|
||||
@ -708,9 +708,8 @@ void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_
|
||||
expand_one_req.direction = edge_direction;
|
||||
expand_one_req.edge_properties = edge_properties;
|
||||
expand_one_req.edge_types = {edge_type};
|
||||
// expand_one_req.expressions = expressions; #NoCommit not existing?
|
||||
// expand_one_req.filter = filter;
|
||||
// expand_one_req.limit = limit;
|
||||
// expand_one_req.filter = filter; #NoCommit later?
|
||||
expand_one_req.limit = limit;
|
||||
expand_one_req.order_by = order_by;
|
||||
expand_one_req.src_vertex_properties = src_vertex_properties;
|
||||
expand_one_req.src_vertices = {src_vertex};
|
||||
@ -725,12 +724,9 @@ void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_
|
||||
auto write_response_result = read_res.GetValue();
|
||||
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
|
||||
MG_ASSERT(write_response.result.size() == 1);
|
||||
// MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 10); // #NoCommit where does that come
|
||||
// from?
|
||||
// auto number_of_properties_on_edge =
|
||||
// (std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0]))
|
||||
// .size();
|
||||
// MG_ASSERT(number_of_properties_on_edge == 1);
|
||||
MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == 10);
|
||||
auto number_of_properties_on_edge = write_response.result[0].out_edges_with_all_properties.size();
|
||||
MG_ASSERT(number_of_properties_on_edge == 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -756,7 +752,7 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -816,7 +812,7 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {specified_edge_prop};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
@ -875,7 +871,7 @@ void AttemptToExpandOneWithFilters(ShardClient &client, uint64_t src_vertex_val,
|
||||
std::optional<std::vector<PropertyId>> edge_properties = {};
|
||||
|
||||
std::vector<std::string> expressions;
|
||||
std::optional<std::vector<msgs::OrderBy>> order_by = {};
|
||||
std::vector<msgs::OrderBy> order_by = {};
|
||||
std::optional<size_t> limit = {};
|
||||
std::vector<std::string> filter = {};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user