Fix bug on AllShortest with multiple edges between nodes (#832)

This commit is contained in:
Bruno Sačarić 2023-03-29 16:39:41 +02:00 committed by GitHub
parent 029be10f1d
commit 0819b40202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 303 additions and 264 deletions

View File

@ -1834,7 +1834,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
: self_(self), : self_(self),
input_cursor_(self_.input_->MakeCursor(mem)), input_cursor_(self_.input_->MakeCursor(mem)),
visited_cost_(mem), visited_cost_(mem),
expanded_(mem), total_cost_(mem),
next_edges_(mem), next_edges_(mem),
traversal_stack_(mem), traversal_stack_(mem),
pq_(mem) {} pq_(mem) {}
@ -1844,6 +1844,9 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::View::OLD); storage::View::OLD);
auto create_state = [this](const VertexAccessor &vertex, int64_t depth) {
return std::make_pair(vertex, upper_bound_set_ ? depth : 0);
};
// For the given (edge, direction, weight, depth) tuple checks if they // For the given (edge, direction, weight, depth) tuple checks if they
// satisfy the "where" condition. if so, places them in the priority // satisfy the "where" condition. if so, places them in the priority
@ -1935,26 +1938,10 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
} }
}; };
// Check if upper bound exists
upper_bound_ = self_.upper_bound_
? EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in all shortest paths expansion")
: std::numeric_limits<int64_t>::max();
// Check if upper bound is valid
if (upper_bound_ < 1) {
throw QueryRuntimeException("Maximum depth in all shortest paths expansion must be at least 1.");
}
std::optional<VertexAccessor> start_vertex; std::optional<VertexAccessor> start_vertex;
auto *memory = context.evaluation_context.memory; auto *memory = context.evaluation_context.memory;
while (true) { auto create_path = [this, &frame, &memory]() {
// Check if there is an external error.
if (MustAbort(context)) throw HintedAbortError();
// If traversal stack if filled, the DFS traversal tree is created. Traverse the tree iteratively by
// preserving the traversal state on stack.
while (!traversal_stack_.empty()) {
auto &current_level = traversal_stack_.back(); auto &current_level = traversal_stack_.back();
auto &edges_on_frame = frame[self_.common_.edge_symbol].ValueList(); auto &edges_on_frame = frame[self_.common_.edge_symbol].ValueList();
@ -1967,7 +1954,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
edges_on_frame.erase(edges_on_frame.begin()); edges_on_frame.erase(edges_on_frame.begin());
} }
traversal_stack_.pop_back(); traversal_stack_.pop_back();
continue; return false;
} }
auto [current_edge, current_edge_direction, current_weight] = current_level.back(); auto [current_edge, current_edge_direction, current_weight] = current_level.back();
@ -1991,17 +1978,78 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
traversal_stack_.emplace_back(std::move(empty)); traversal_stack_.emplace_back(std::move(empty));
} }
if ((current_weight > visited_cost_.at(next_vertex)).ValueBool()) continue; if ((current_weight > visited_cost_.at(next_vertex)).ValueBool()) return false;
// Place destination node on the frame, handle existence flag // Place destination node on the frame, handle existence flag
if (self_.common_.existing_node) { if (self_.common_.existing_node) {
const auto &node = frame[self_.common_.node_symbol]; const auto &node = frame[self_.common_.node_symbol];
ExpectType(self_.common_.node_symbol, node, TypedValue::Type::Vertex); ExpectType(self_.common_.node_symbol, node, TypedValue::Type::Vertex);
if (node.ValueVertex() != next_vertex) continue; if (node.ValueVertex() != next_vertex) return false;
} else { } else {
frame[self_.common_.node_symbol] = next_vertex; frame[self_.common_.node_symbol] = next_vertex;
} }
return true; return true;
};
auto create_DFS_traversal_tree = [this, &context, &memory, &create_state, &expand_from_vertex]() {
while (!pq_.empty()) {
if (MustAbort(context)) throw HintedAbortError();
const auto [current_weight, current_depth, current_vertex, directed_edge] = pq_.top();
pq_.pop();
const auto &[current_edge, direction, weight] = directed_edge;
auto current_state = create_state(current_vertex, current_depth);
auto position = total_cost_.find(current_state);
if (position != total_cost_.end()) {
if ((position->second < current_weight).ValueBool()) continue;
} else {
total_cost_.emplace(current_state, current_weight);
if (current_depth < upper_bound_) {
expand_from_vertex(current_vertex, current_weight, current_depth);
}
}
// Searching for a previous vertex in the expansion
auto prev_vertex = direction == EdgeAtom::Direction::IN ? current_edge.To() : current_edge.From();
// Update the parent
if (next_edges_.find({prev_vertex, current_depth - 1}) == next_edges_.end()) {
utils::pmr::list<DirectedEdge> empty(memory);
next_edges_[{prev_vertex, current_depth - 1}] = std::move(empty);
}
next_edges_.at({prev_vertex, current_depth - 1}).emplace_back(directed_edge);
}
};
// upper_bound_set is used when storing visited edges, because with an upper bound we also consider suboptimal paths
// if they are shorter in depth
if (self_.upper_bound_) {
upper_bound_ = EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in all shortest path expansion");
upper_bound_set_ = true;
} else {
upper_bound_ = std::numeric_limits<int64_t>::max();
upper_bound_set_ = false;
}
// Check if upper bound is valid
if (upper_bound_ < 1) {
throw QueryRuntimeException("Maximum depth in all shortest paths expansion must be at least 1.");
}
// On first Pull run, traversal stack and priority queue are empty, so we start a pulling stream
// and create a DFS traversal tree (main part of algorithm). Then we return the first path
// created from the DFS traversal tree (basically a DFS algorithm).
// On each subsequent Pull run, paths are created from the traversal stack and returned.
while (true) {
// Check if there is an external error.
if (MustAbort(context)) throw HintedAbortError();
// The algorithm is run all at once by create_DFS_traversal_tree, after which we
// traverse the tree iteratively by preserving the traversal state on stack.
while (!traversal_stack_.empty()) {
if (create_path()) return true;
} }
// If priority queue is empty start new pulling stream. // If priority queue is empty start new pulling stream.
@ -2022,9 +2070,9 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
// Clear existing data structures. // Clear existing data structures.
visited_cost_.clear(); visited_cost_.clear();
expanded_.clear();
next_edges_.clear(); next_edges_.clear();
traversal_stack_.clear(); traversal_stack_.clear();
total_cost_.clear();
expand_from_vertex(*start_vertex, TypedValue(), 0); expand_from_vertex(*start_vertex, TypedValue(), 0);
visited_cost_.emplace(*start_vertex, 0); visited_cost_.emplace(*start_vertex, 0);
@ -2032,33 +2080,9 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
} }
// Create a DFS traversal tree from the start node // Create a DFS traversal tree from the start node
while (!pq_.empty()) { create_DFS_traversal_tree();
if (MustAbort(context)) throw HintedAbortError();
const auto [current_weight, current_depth, current_vertex, directed_edge] = pq_.top();
pq_.pop();
const auto &[current_edge, direction, weight] = directed_edge;
if (expanded_.contains(current_edge)) continue;
expanded_.emplace(current_edge);
// Expand only if what we've just expanded is less than max depth.
if (current_depth < upper_bound_) {
expand_from_vertex(current_vertex, current_weight, current_depth);
}
// Searching for a previous vertex in the expansion
auto prev_vertex = direction == EdgeAtom::Direction::IN ? current_edge.To() : current_edge.From();
// Update the parent
if (next_edges_.find({prev_vertex, current_depth - 1}) == next_edges_.end()) {
utils::pmr::list<DirectedEdge> empty(memory);
next_edges_[{prev_vertex, current_depth - 1}] = std::move(empty);
}
next_edges_.at({prev_vertex, current_depth - 1}).emplace_back(directed_edge);
}
// DFS traversal tree is create,
if (start_vertex && next_edges_.find({*start_vertex, 0}) != next_edges_.end()) { if (start_vertex && next_edges_.find({*start_vertex, 0}) != next_edges_.end()) {
auto start_vertex_edges = next_edges_[{*start_vertex, 0}]; auto start_vertex_edges = next_edges_[{*start_vertex, 0}];
traversal_stack_.emplace_back(std::move(start_vertex_edges)); traversal_stack_.emplace_back(std::move(start_vertex_edges));
@ -2071,9 +2095,9 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
void Reset() override { void Reset() override {
input_cursor_->Reset(); input_cursor_->Reset();
visited_cost_.clear(); visited_cost_.clear();
expanded_.clear();
next_edges_.clear(); next_edges_.clear();
traversal_stack_.clear(); traversal_stack_.clear();
total_cost_.clear();
ClearQueue(); ClearQueue();
} }
@ -2083,6 +2107,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
// Upper bound on the path length. // Upper bound on the path length.
int64_t upper_bound_{-1}; int64_t upper_bound_{-1};
bool upper_bound_set_{false};
struct AspStateHash { struct AspStateHash {
size_t operator()(const std::pair<VertexAccessor, int64_t> &key) const { size_t operator()(const std::pair<VertexAccessor, int64_t> &key) const {
@ -2094,8 +2119,8 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
using NextEdgesState = std::pair<VertexAccessor, int64_t>; using NextEdgesState = std::pair<VertexAccessor, int64_t>;
// Maps vertices to minimum weights they got in expansion. // Maps vertices to minimum weights they got in expansion.
utils::pmr::unordered_map<VertexAccessor, TypedValue> visited_cost_; utils::pmr::unordered_map<VertexAccessor, TypedValue> visited_cost_;
// Marking the expanded edges to prevent multiple visits. // Maps vertices to weights they got in expansion.
utils::pmr::unordered_set<EdgeAccessor> expanded_; utils::pmr::unordered_map<NextEdgesState, TypedValue, AspStateHash> total_cost_;
// Maps the vertex with the potential expansion edge. // Maps the vertex with the potential expansion edge.
utils::pmr::unordered_map<NextEdgesState, utils::pmr::list<DirectedEdge>, AspStateHash> next_edges_; utils::pmr::unordered_map<NextEdgesState, utils::pmr::list<DirectedEdge>, AspStateHash> next_edges_;
// Stack indicating the traversal level. // Stack indicating the traversal level.

View File

@ -189,3 +189,17 @@ Feature: All Shortest Path
Then the result should be: Then the result should be:
| COUNT(path) | | COUNT(path) |
| 2 | | 2 |
Scenario: Test allShortest on different edge between two nodes
Given an empty graph
And having executed:
"""
CREATE (n:One), (o:Two), (m:Three), (n)-[:TYPE {cost: 0.3}]->(o), (o)-[:TYPE {cost: 40}]->(m), (o)-[:TYPE {cost: 20}]->(m)
"""
When executing query:
"""
MATCH p=(h:One)-[r*allshortest ..5 (e, v | e.cost) total_cost]->(k:Three) return total_cost;
"""
Then the result should be:
| total_cost |
| 20.3 |