Stop bfs early when possible

Summary: When doing bfs with given endpoint, we can stop the traversal on first successful pull.

Reviewers: teon.banek, msantl, mculinovic, buda

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1469
This commit is contained in:
Marin Tomic 2018-07-06 15:26:59 +02:00 committed by Teon Banek
parent 1c2f599a93
commit e2f9eb6fa5

View File

@ -1079,8 +1079,10 @@ class ExpandBfsCursor : public query::plan::Cursor {
// if current is still empty, it means both are empty, so pull from // if current is still empty, it means both are empty, so pull from
// input // input
if (to_visit_current_.empty()) { if (skip_rest_ || to_visit_current_.empty()) {
if (!input_cursor_->Pull(frame, context)) return false; if (!input_cursor_->Pull(frame, context)) return false;
to_visit_current_.clear();
to_visit_next_.clear();
processed_.clear(); processed_.clear();
auto vertex_value = frame[self_.input_symbol_]; auto vertex_value = frame[self_.input_symbol_];
@ -1098,6 +1100,7 @@ class ExpandBfsCursor : public query::plan::Cursor {
? EvaluateInt(evaluator, self_.upper_bound_, ? EvaluateInt(evaluator, self_.upper_bound_,
"Max depth in breadth-first expansion") "Max depth in breadth-first expansion")
: std::numeric_limits<int>::max(); : std::numeric_limits<int>::max();
skip_rest_ = false;
if (upper_bound_ < 1) if (upper_bound_ < 1)
throw QueryRuntimeException( throw QueryRuntimeException(
"Max depth in breadth-first expansion must be greater then " "Max depth in breadth-first expansion must be greater then "
@ -1137,6 +1140,9 @@ class ExpandBfsCursor : public query::plan::Cursor {
TypedValue &node = frame[self_.node_symbol_]; TypedValue &node = frame[self_.node_symbol_];
// due to optional matching the existing node could be null // due to optional matching the existing node could be null
if (node.IsNull() || (node != expansion.second).Value<bool>()) continue; if (node.IsNull() || (node != expansion.second).Value<bool>()) continue;
// there is no point in traversing the rest of the graph because bfs
// can find only one path to a certain node
skip_rest_ = true;
} else } else
frame[self_.node_symbol_] = expansion.second; frame[self_.node_symbol_] = expansion.second;
@ -1163,6 +1169,9 @@ class ExpandBfsCursor : public query::plan::Cursor {
int lower_bound_{-1}; int lower_bound_{-1};
int upper_bound_{-1}; int upper_bound_{-1};
// when set to true, expansion is restarted from a new source
bool skip_rest_{false};
// maps vertices to the edge they got expanded from. it is an optional // maps vertices to the edge they got expanded from. it is an optional
// edge because the root does not get expanded from anything. // edge because the root does not get expanded from anything.
// contains visited vertices as well as those scheduled to be visited. // contains visited vertices as well as those scheduled to be visited.
@ -1203,76 +1212,82 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
while (true) { while (true) {
TypedValue last_vertex; TypedValue last_vertex;
if (current_depth_ >= lower_bound_) { if (!skip_rest_) {
for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) { if (current_depth_ >= lower_bound_) {
auto vertex = db_.db().bfs_subcursor_clients().Pull( for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) {
pull_pos_->first, pull_pos_->second, &db_); auto vertex = db_.db().bfs_subcursor_clients().Pull(
if (vertex) { pull_pos_->first, pull_pos_->second, &db_);
last_vertex = *vertex; if (vertex) {
SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_); last_vertex = *vertex;
break; SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_);
break;
}
VLOG(10) << "Nothing to pull from " << pull_pos_->first;
} }
VLOG(10) << "Nothing to pull from " << pull_pos_->first;
}
}
if (last_vertex.IsVertex()) {
// Handle existence flag
if (self_.existing_node_) {
TypedValue &node = frame[self_.node_symbol_];
// Due to optional matching the existing node could be null
if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
} else {
frame[self_.node_symbol_] = last_vertex;
} }
VLOG(10) << "Expanded to vertex: " << last_vertex; if (last_vertex.IsVertex()) {
// Handle existence flag
if (self_.existing_node_) {
TypedValue &node = frame[self_.node_symbol_];
// Due to optional matching the existing node could be null
if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
// There is no point in traversing the rest of the graph because BFS
// can find only one path to a certain node.
skip_rest_ = true;
} else {
frame[self_.node_symbol_] = last_vertex;
}
// Reconstruct path VLOG(10) << "Expanded to vertex: " << last_vertex;
std::vector<TypedValue> edges;
// During path reconstruction, edges crossing worker boundary are // Reconstruct path
// obtained from edge owner to reduce network traffic. If the last std::vector<TypedValue> edges;
// worker queried for its path segment owned the crossing edge,
// `current_vertex_addr` will be set. Otherwise, `current_edge_addr`
// will be set.
std::experimental::optional<storage::VertexAddress>
current_vertex_addr = last_vertex.ValueVertex().GlobalAddress();
std::experimental::optional<storage::EdgeAddress> current_edge_addr;
while (true) { // During path reconstruction, edges crossing worker boundary are
DCHECK(static_cast<bool>(current_edge_addr) ^ // obtained from edge owner to reduce network traffic. If the last
static_cast<bool>(current_vertex_addr)) // worker queried for its path segment owned the crossing edge,
<< "Exactly one of `current_edge_addr` or `current_vertex_addr` " // `current_vertex_addr` will be set. Otherwise, `current_edge_addr`
"should be set during path reconstruction"; // will be set.
auto ret = current_edge_addr std::experimental::optional<storage::VertexAddress>
? db_.db().bfs_subcursor_clients().ReconstructPath( current_vertex_addr = last_vertex.ValueVertex().GlobalAddress();
subcursor_ids_, *current_edge_addr, &db_) std::experimental::optional<storage::EdgeAddress> current_edge_addr;
: db_.db().bfs_subcursor_clients().ReconstructPath(
subcursor_ids_, *current_vertex_addr, &db_); while (true) {
edges.insert(edges.end(), ret.edges.begin(), ret.edges.end()); DCHECK(static_cast<bool>(current_edge_addr) ^
current_vertex_addr = ret.next_vertex; static_cast<bool>(current_vertex_addr))
current_edge_addr = ret.next_edge; << "Exactly one of `current_edge_addr` or "
if (!current_vertex_addr && !current_edge_addr) break; "`current_vertex_addr` "
"should be set during path reconstruction";
auto ret = current_edge_addr
? db_.db().bfs_subcursor_clients().ReconstructPath(
subcursor_ids_, *current_edge_addr, &db_)
: db_.db().bfs_subcursor_clients().ReconstructPath(
subcursor_ids_, *current_vertex_addr, &db_);
edges.insert(edges.end(), ret.edges.begin(), ret.edges.end());
current_vertex_addr = ret.next_vertex;
current_edge_addr = ret.next_edge;
if (!current_vertex_addr && !current_edge_addr) break;
}
std::reverse(edges.begin(), edges.end());
for (auto &edge : edges)
SwitchAccessor(edge.ValueEdge(), self_.graph_view_);
frame[self_.edge_symbol_] = std::move(edges);
return true;
} }
std::reverse(edges.begin(), edges.end());
for (auto &edge : edges)
SwitchAccessor(edge.ValueEdge(), self_.graph_view_);
frame[self_.edge_symbol_] = std::move(edges);
return true;
}
// We're done pulling for this level // We're done pulling for this level
pull_pos_ = subcursor_ids_.begin(); pull_pos_ = subcursor_ids_.begin();
// Try to expand again // Try to expand again
if (current_depth_ < upper_bound_) { if (current_depth_ < upper_bound_) {
VLOG(10) << "Trying to expand again..."; VLOG(10) << "Trying to expand again...";
current_depth_++; current_depth_++;
db_.db().bfs_subcursor_clients().PrepareForExpand(subcursor_ids_, db_.db().bfs_subcursor_clients().PrepareForExpand(subcursor_ids_,
false); false);
if (db_.db().bfs_subcursor_clients().ExpandLevel(subcursor_ids_)) { if (db_.db().bfs_subcursor_clients().ExpandLevel(subcursor_ids_)) {
continue; continue;
}
} }
} }
@ -1294,6 +1309,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
? EvaluateInt(evaluator, self_.upper_bound_, ? EvaluateInt(evaluator, self_.upper_bound_,
"Max depth in breadth-first expansion") "Max depth in breadth-first expansion")
: std::numeric_limits<int>::max(); : std::numeric_limits<int>::max();
skip_rest_ = false;
if (upper_bound_ < 1) { if (upper_bound_ < 1) {
throw QueryRuntimeException( throw QueryRuntimeException(
@ -1323,6 +1339,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
int lower_bound_{-1}; int lower_bound_{-1};
int upper_bound_{-1}; int upper_bound_{-1};
// When set to true, expansion is restarted from a new source.
bool skip_rest_{false};
// Current depth. Reset for each new expansion, the initial value is // Current depth. Reset for each new expansion, the initial value is
// irrelevant. // irrelevant.
int current_depth_{-1}; int current_depth_{-1};