|
|
|
@ -1,4 +1,4 @@
|
|
|
|
|
// Copyright 2023 Memgraph Ltd.
|
|
|
|
|
// Copyright 2024 Memgraph Ltd.
|
|
|
|
|
//
|
|
|
|
|
// Use of this software is governed by the Business Source License
|
|
|
|
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
|
|
|
@ -1057,7 +1057,8 @@ class ExpandVariableCursor : public Cursor {
|
|
|
|
|
if (!self_.common_.existing_node) {
|
|
|
|
|
frame[self_.common_.node_symbol] = start_vertex;
|
|
|
|
|
return true;
|
|
|
|
|
} else if (CheckExistingNode(start_vertex, self_.common_.node_symbol, frame)) {
|
|
|
|
|
}
|
|
|
|
|
if (CheckExistingNode(start_vertex, self_.common_.node_symbol, frame)) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1243,6 +1244,10 @@ class ExpandVariableCursor : public Cursor {
|
|
|
|
|
MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(),
|
|
|
|
|
"Accumulated path must be path");
|
|
|
|
|
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
|
|
|
|
// Shrink the accumulated path including current level if necessary
|
|
|
|
|
while (accumulated_path.size() >= edges_on_frame.size()) {
|
|
|
|
|
accumulated_path.Shrink();
|
|
|
|
|
}
|
|
|
|
|
accumulated_path.Expand(current_edge.first);
|
|
|
|
|
accumulated_path.Expand(current_vertex);
|
|
|
|
|
}
|
|
|
|
@ -1260,10 +1265,9 @@ class ExpandVariableCursor : public Cursor {
|
|
|
|
|
if (self_.common_.existing_node && !CheckExistingNode(current_vertex, self_.common_.node_symbol, frame)) continue;
|
|
|
|
|
|
|
|
|
|
// We only yield true if we satisfy the lower bound.
|
|
|
|
|
if (static_cast<int64_t>(edges_on_frame.size()) >= lower_bound_)
|
|
|
|
|
if (static_cast<int64_t>(edges_on_frame.size()) >= lower_bound_) {
|
|
|
|
|
return true;
|
|
|
|
|
else
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
@ -1527,8 +1531,8 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
: self_(self),
|
|
|
|
|
input_cursor_(self_.input()->MakeCursor(mem)),
|
|
|
|
|
processed_(mem),
|
|
|
|
|
to_visit_current_(mem),
|
|
|
|
|
to_visit_next_(mem) {
|
|
|
|
|
to_visit_next_(mem),
|
|
|
|
|
to_visit_current_(mem) {
|
|
|
|
|
MG_ASSERT(!self_.common_.existing_node,
|
|
|
|
|
"Single source shortest path algorithm "
|
|
|
|
|
"should not be used when `existing_node` "
|
|
|
|
@ -1558,12 +1562,14 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
#endif
|
|
|
|
|
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
|
|
|
|
|
frame[self_.filter_lambda_.inner_node_symbol] = vertex;
|
|
|
|
|
std::optional<Path> curr_acc_path = std::nullopt;
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(),
|
|
|
|
|
"Accumulated path must have Path type");
|
|
|
|
|
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
|
|
|
|
accumulated_path.Expand(edge);
|
|
|
|
|
accumulated_path.Expand(vertex);
|
|
|
|
|
curr_acc_path = accumulated_path;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (self_.filter_lambda_.expression) {
|
|
|
|
@ -1578,21 +1584,33 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
throw QueryRuntimeException("Expansion condition must evaluate to boolean or null.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
to_visit_next_.emplace_back(edge, vertex);
|
|
|
|
|
to_visit_next_.emplace_back(edge, vertex, std::move(curr_acc_path));
|
|
|
|
|
processed_.emplace(vertex, edge);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
auto restore_frame_state_after_expansion = [this, &frame]() {
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath().Shrink();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// populates the to_visit_next_ structure with expansions
|
|
|
|
|
// from the given vertex. skips expansions that don't satisfy
|
|
|
|
|
// the "where" condition.
|
|
|
|
|
auto expand_from_vertex = [this, &expand_pair](const auto &vertex) {
|
|
|
|
|
auto expand_from_vertex = [this, &expand_pair, &restore_frame_state_after_expansion](const auto &vertex) {
|
|
|
|
|
if (self_.common_.direction != EdgeAtom::Direction::IN) {
|
|
|
|
|
auto out_edges = UnwrapEdgesResult(vertex.OutEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
|
|
|
|
for (const auto &edge : out_edges) expand_pair(edge, edge.To());
|
|
|
|
|
for (const auto &edge : out_edges) {
|
|
|
|
|
expand_pair(edge, edge.To());
|
|
|
|
|
restore_frame_state_after_expansion();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (self_.common_.direction != EdgeAtom::Direction::OUT) {
|
|
|
|
|
auto in_edges = UnwrapEdgesResult(vertex.InEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
|
|
|
|
for (const auto &edge : in_edges) expand_pair(edge, edge.From());
|
|
|
|
|
for (const auto &edge : in_edges) {
|
|
|
|
|
expand_pair(edge, edge.From());
|
|
|
|
|
restore_frame_state_after_expansion();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -1638,14 +1656,14 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// take the next expansion from the queue
|
|
|
|
|
auto expansion = to_visit_current_.back();
|
|
|
|
|
auto [curr_edge, curr_vertex, curr_acc_path] = to_visit_current_.back();
|
|
|
|
|
to_visit_current_.pop_back();
|
|
|
|
|
|
|
|
|
|
// create the frame value for the edges
|
|
|
|
|
auto *pull_memory = context.evaluation_context.memory;
|
|
|
|
|
utils::pmr::vector<TypedValue> edge_list(pull_memory);
|
|
|
|
|
edge_list.emplace_back(expansion.first);
|
|
|
|
|
auto last_vertex = expansion.second;
|
|
|
|
|
edge_list.emplace_back(curr_edge);
|
|
|
|
|
auto last_vertex = curr_vertex;
|
|
|
|
|
while (true) {
|
|
|
|
|
const EdgeAccessor &last_edge = edge_list.back().ValueEdge();
|
|
|
|
|
last_vertex = last_edge.From() == last_vertex ? last_edge.To() : last_edge.From();
|
|
|
|
@ -1657,11 +1675,17 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// expand only if what we've just expanded is less then max depth
|
|
|
|
|
if (static_cast<int64_t>(edge_list.size()) < upper_bound_) expand_from_vertex(expansion.second);
|
|
|
|
|
if (static_cast<int64_t>(edge_list.size()) < upper_bound_) {
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
MG_ASSERT(curr_acc_path.has_value(), "Expected non-null accumulated path");
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = std::move(curr_acc_path.value());
|
|
|
|
|
}
|
|
|
|
|
expand_from_vertex(curr_vertex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (static_cast<int64_t>(edge_list.size()) < lower_bound_) continue;
|
|
|
|
|
|
|
|
|
|
frame[self_.common_.node_symbol] = expansion.second;
|
|
|
|
|
frame[self_.common_.node_symbol] = curr_vertex;
|
|
|
|
|
|
|
|
|
|
// place edges on the frame in the correct order
|
|
|
|
|
std::reverse(edge_list.begin(), edge_list.end());
|
|
|
|
@ -1693,9 +1717,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
// edge because the root does not get expanded from anything.
|
|
|
|
|
// contains visited vertices as well as those scheduled to be visited.
|
|
|
|
|
utils::pmr::unordered_map<VertexAccessor, std::optional<EdgeAccessor>> processed_;
|
|
|
|
|
// edge/vertex pairs we have yet to visit, for current and next depth
|
|
|
|
|
utils::pmr::vector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_current_;
|
|
|
|
|
utils::pmr::vector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_;
|
|
|
|
|
// edge, vertex we have yet to visit, for current and next depth and their accumulated paths
|
|
|
|
|
utils::pmr::vector<std::tuple<EdgeAccessor, VertexAccessor, std::optional<Path>>> to_visit_next_;
|
|
|
|
|
utils::pmr::vector<std::tuple<EdgeAccessor, VertexAccessor, std::optional<Path>>> to_visit_current_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
@ -1768,6 +1792,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
|
|
|
|
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
|
|
|
|
storage::View::OLD);
|
|
|
|
|
|
|
|
|
|
auto create_state = [this](const VertexAccessor &vertex, int64_t depth) {
|
|
|
|
|
return std::make_pair(vertex, upper_bound_set_ ? depth : 0);
|
|
|
|
|
};
|
|
|
|
@ -1791,6 +1816,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
frame[self_.weight_lambda_->inner_node_symbol] = vertex;
|
|
|
|
|
TypedValue next_weight = CalculateNextWeight(self_.weight_lambda_, total_weight, evaluator);
|
|
|
|
|
|
|
|
|
|
std::optional<Path> curr_acc_path = std::nullopt;
|
|
|
|
|
if (self_.filter_lambda_.expression) {
|
|
|
|
|
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
|
|
|
|
|
frame[self_.filter_lambda_.inner_node_symbol] = vertex;
|
|
|
|
@ -1800,6 +1826,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
|
|
|
|
accumulated_path.Expand(edge);
|
|
|
|
|
accumulated_path.Expand(vertex);
|
|
|
|
|
curr_acc_path = accumulated_path;
|
|
|
|
|
|
|
|
|
|
if (self_.filter_lambda_.accumulated_weight_symbol) {
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_weight_symbol.value()] = next_weight;
|
|
|
|
@ -1815,24 +1842,32 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
if (found_it != total_cost_.end() && (found_it->second.IsNull() || (found_it->second <= next_weight).ValueBool()))
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
pq_.emplace(next_weight, depth + 1, vertex, edge);
|
|
|
|
|
pq_.emplace(next_weight, depth + 1, vertex, edge, curr_acc_path);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
auto restore_frame_state_after_expansion = [this, &frame]() {
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath().Shrink();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Populates the priority queue structure with expansions
|
|
|
|
|
// from the given vertex. skips expansions that don't satisfy
|
|
|
|
|
// the "where" condition.
|
|
|
|
|
auto expand_from_vertex = [this, &expand_pair](const VertexAccessor &vertex, const TypedValue &weight,
|
|
|
|
|
int64_t depth) {
|
|
|
|
|
auto expand_from_vertex = [this, &expand_pair, &restore_frame_state_after_expansion](
|
|
|
|
|
const VertexAccessor &vertex, const TypedValue &weight, int64_t depth) {
|
|
|
|
|
if (self_.common_.direction != EdgeAtom::Direction::IN) {
|
|
|
|
|
auto out_edges = UnwrapEdgesResult(vertex.OutEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
|
|
|
|
for (const auto &edge : out_edges) {
|
|
|
|
|
expand_pair(edge, edge.To(), weight, depth);
|
|
|
|
|
restore_frame_state_after_expansion();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (self_.common_.direction != EdgeAtom::Direction::OUT) {
|
|
|
|
|
auto in_edges = UnwrapEdgesResult(vertex.InEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
|
|
|
|
for (const auto &edge : in_edges) {
|
|
|
|
|
expand_pair(edge, edge.From(), weight, depth);
|
|
|
|
|
restore_frame_state_after_expansion();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
@ -1850,9 +1885,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
// Skip expansion for such nodes.
|
|
|
|
|
if (node.IsNull()) continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::optional<Path> curr_acc_path;
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
// Add initial vertex of path to the accumulated path
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex);
|
|
|
|
|
curr_acc_path = Path(vertex);
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = curr_acc_path.value();
|
|
|
|
|
}
|
|
|
|
|
if (self_.upper_bound_) {
|
|
|
|
|
upper_bound_ = EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in weighted shortest path expansion");
|
|
|
|
@ -1876,7 +1914,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
total_cost_.clear();
|
|
|
|
|
yielded_vertices_.clear();
|
|
|
|
|
|
|
|
|
|
pq_.emplace(current_weight, 0, vertex, std::nullopt);
|
|
|
|
|
pq_.emplace(current_weight, 0, vertex, std::nullopt, curr_acc_path);
|
|
|
|
|
// We are adding the starting vertex to the set of yielded vertices
|
|
|
|
|
// because we don't want to yield paths that end with the starting
|
|
|
|
|
// vertex.
|
|
|
|
@ -1885,7 +1923,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
|
|
|
|
|
while (!pq_.empty()) {
|
|
|
|
|
AbortCheck(context);
|
|
|
|
|
auto [current_weight, current_depth, current_vertex, current_edge] = pq_.top();
|
|
|
|
|
auto [current_weight, current_depth, current_vertex, current_edge, curr_acc_path] = pq_.top();
|
|
|
|
|
pq_.pop();
|
|
|
|
|
|
|
|
|
|
auto current_state = create_state(current_vertex, current_depth);
|
|
|
|
@ -1898,7 +1936,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
total_cost_.emplace(current_state, current_weight);
|
|
|
|
|
|
|
|
|
|
// Expand only if what we've just expanded is less than max depth.
|
|
|
|
|
if (current_depth < upper_bound_) expand_from_vertex(current_vertex, current_weight, current_depth);
|
|
|
|
|
if (current_depth < upper_bound_) {
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = std::move(curr_acc_path.value());
|
|
|
|
|
}
|
|
|
|
|
expand_from_vertex(current_vertex, current_weight, current_depth);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If we yielded a path for a vertex already, make the expansion but
|
|
|
|
|
// don't return the path again.
|
|
|
|
@ -1921,12 +1964,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
// Place destination node on the frame, handle existence flag.
|
|
|
|
|
if (self_.common_.existing_node) {
|
|
|
|
|
const auto &node = frame[self_.common_.node_symbol];
|
|
|
|
|
if ((node != TypedValue(current_vertex, pull_memory)).ValueBool())
|
|
|
|
|
if ((node != TypedValue(current_vertex, pull_memory)).ValueBool()) {
|
|
|
|
|
continue;
|
|
|
|
|
else
|
|
|
|
|
// Prevent expanding other paths, because we found the
|
|
|
|
|
// shortest to existing node.
|
|
|
|
|
ClearQueue();
|
|
|
|
|
}
|
|
|
|
|
// Prevent expanding other paths, because we found the
|
|
|
|
|
// shortest to existing node.
|
|
|
|
|
ClearQueue();
|
|
|
|
|
} else {
|
|
|
|
|
frame[self_.common_.node_symbol] = current_vertex;
|
|
|
|
|
}
|
|
|
|
@ -1979,8 +2022,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
// Priority queue comparator. Keep lowest weight on top of the queue.
|
|
|
|
|
class PriorityQueueComparator {
|
|
|
|
|
public:
|
|
|
|
|
bool operator()(const std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>> &lhs,
|
|
|
|
|
const std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>> &rhs) {
|
|
|
|
|
bool operator()(
|
|
|
|
|
const std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>, std::optional<Path>> &lhs,
|
|
|
|
|
const std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>, std::optional<Path>> &rhs) {
|
|
|
|
|
const auto &lhs_weight = std::get<0>(lhs);
|
|
|
|
|
const auto &rhs_weight = std::get<0>(rhs);
|
|
|
|
|
// Null defines minimum value for all types
|
|
|
|
@ -1997,8 +2041,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::priority_queue<std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>>,
|
|
|
|
|
utils::pmr::vector<std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>>>,
|
|
|
|
|
std::priority_queue<std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>, std::optional<Path>>,
|
|
|
|
|
utils::pmr::vector<std::tuple<TypedValue, int64_t, VertexAccessor, std::optional<EdgeAccessor>,
|
|
|
|
|
std::optional<Path>>>,
|
|
|
|
|
PriorityQueueComparator>
|
|
|
|
|
pq_;
|
|
|
|
|
|
|
|
|
@ -2024,6 +2069,9 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
|
|
|
|
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
|
|
|
|
storage::View::OLD);
|
|
|
|
|
|
|
|
|
|
auto *memory = context.evaluation_context.memory;
|
|
|
|
|
|
|
|
|
|
auto create_state = [this](const VertexAccessor &vertex, int64_t depth) {
|
|
|
|
|
return std::make_pair(vertex, upper_bound_set_ ? depth : 0);
|
|
|
|
|
};
|
|
|
|
@ -2041,6 +2089,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
TypedValue next_weight = CalculateNextWeight(self_.weight_lambda_, total_weight, evaluator);
|
|
|
|
|
|
|
|
|
|
// If filter expression exists, evaluate filter
|
|
|
|
|
std::optional<Path> curr_acc_path = std::nullopt;
|
|
|
|
|
if (self_.filter_lambda_.expression) {
|
|
|
|
|
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
|
|
|
|
|
frame[self_.filter_lambda_.inner_node_symbol] = next_vertex;
|
|
|
|
@ -2050,6 +2099,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath();
|
|
|
|
|
accumulated_path.Expand(edge);
|
|
|
|
|
accumulated_path.Expand(next_vertex);
|
|
|
|
|
curr_acc_path = accumulated_path;
|
|
|
|
|
|
|
|
|
|
if (self_.filter_lambda_.accumulated_weight_symbol) {
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_weight_symbol.value()] = next_weight;
|
|
|
|
@ -2076,14 +2126,20 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DirectedEdge directed_edge = {edge, direction, next_weight};
|
|
|
|
|
pq_.emplace(next_weight, depth + 1, next_vertex, directed_edge);
|
|
|
|
|
pq_.emplace(next_weight, depth + 1, next_vertex, directed_edge, curr_acc_path);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
auto restore_frame_state_after_expansion = [this, &frame]() {
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath().Shrink();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Populates the priority queue structure with expansions
|
|
|
|
|
// from the given vertex. skips expansions that don't satisfy
|
|
|
|
|
// the "where" condition.
|
|
|
|
|
auto expand_from_vertex = [this, &expand_vertex, &context](const VertexAccessor &vertex, const TypedValue &weight,
|
|
|
|
|
int64_t depth) {
|
|
|
|
|
auto expand_from_vertex = [this, &expand_vertex, &context, &restore_frame_state_after_expansion](
|
|
|
|
|
const VertexAccessor &vertex, const TypedValue &weight, int64_t depth) {
|
|
|
|
|
if (self_.common_.direction != EdgeAtom::Direction::IN) {
|
|
|
|
|
auto out_edges = UnwrapEdgesResult(vertex.OutEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
|
|
|
|
for (const auto &edge : out_edges) {
|
|
|
|
@ -2096,6 +2152,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
expand_vertex(edge, EdgeAtom::Direction::OUT, weight, depth);
|
|
|
|
|
restore_frame_state_after_expansion();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (self_.common_.direction != EdgeAtom::Direction::OUT) {
|
|
|
|
@ -2110,12 +2167,12 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
expand_vertex(edge, EdgeAtom::Direction::IN, weight, depth);
|
|
|
|
|
restore_frame_state_after_expansion();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::optional<VertexAccessor> start_vertex;
|
|
|
|
|
auto *memory = context.evaluation_context.memory;
|
|
|
|
|
|
|
|
|
|
auto create_path = [this, &frame, &memory]() {
|
|
|
|
|
auto ¤t_level = traversal_stack_.back();
|
|
|
|
@ -2167,11 +2224,11 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
return true;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
auto create_DFS_traversal_tree = [this, &context, &memory, &create_state, &expand_from_vertex]() {
|
|
|
|
|
auto create_DFS_traversal_tree = [this, &context, &memory, &frame, &create_state, &expand_from_vertex]() {
|
|
|
|
|
while (!pq_.empty()) {
|
|
|
|
|
AbortCheck(context);
|
|
|
|
|
|
|
|
|
|
const auto [current_weight, current_depth, current_vertex, directed_edge] = pq_.top();
|
|
|
|
|
auto [current_weight, current_depth, current_vertex, directed_edge, acc_path] = pq_.top();
|
|
|
|
|
pq_.pop();
|
|
|
|
|
|
|
|
|
|
const auto &[current_edge, direction, weight] = directed_edge;
|
|
|
|
@ -2183,6 +2240,10 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
} else {
|
|
|
|
|
total_cost_.emplace(current_state, current_weight);
|
|
|
|
|
if (current_depth < upper_bound_) {
|
|
|
|
|
if (self_.filter_lambda_.accumulated_path_symbol) {
|
|
|
|
|
DMG_ASSERT(acc_path.has_value(), "Path must be already filled in AllShortestPath DFS traversals");
|
|
|
|
|
frame[self_.filter_lambda_.accumulated_path_symbol.value()] = std::move(acc_path.value());
|
|
|
|
|
}
|
|
|
|
|
expand_from_vertex(current_vertex, current_weight, current_depth);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -2315,8 +2376,8 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
// Priority queue comparator. Keep lowest weight on top of the queue.
|
|
|
|
|
class PriorityQueueComparator {
|
|
|
|
|
public:
|
|
|
|
|
bool operator()(const std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge> &lhs,
|
|
|
|
|
const std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge> &rhs) {
|
|
|
|
|
bool operator()(const std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge, std::optional<Path>> &lhs,
|
|
|
|
|
const std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge, std::optional<Path>> &rhs) {
|
|
|
|
|
const auto &lhs_weight = std::get<0>(lhs);
|
|
|
|
|
const auto &rhs_weight = std::get<0>(rhs);
|
|
|
|
|
// Null defines minimum value for all types
|
|
|
|
@ -2335,9 +2396,10 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
|
|
|
|
|
|
|
|
|
// Priority queue - core element of the algorithm.
|
|
|
|
|
// Stores: {weight, depth, next vertex, edge and direction}
|
|
|
|
|
std::priority_queue<std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge>,
|
|
|
|
|
utils::pmr::vector<std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge>>,
|
|
|
|
|
PriorityQueueComparator>
|
|
|
|
|
std::priority_queue<
|
|
|
|
|
std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge, std::optional<Path>>,
|
|
|
|
|
utils::pmr::vector<std::tuple<TypedValue, int64_t, VertexAccessor, DirectedEdge, std::optional<Path>>>,
|
|
|
|
|
PriorityQueueComparator>
|
|
|
|
|
pq_;
|
|
|
|
|
|
|
|
|
|
void ClearQueue() {
|
|
|
|
|