Extract distributed BFS as a new operator

Reviewers: mtomic, msantl

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1570
This commit is contained in:
Teon Banek 2018-08-29 10:28:05 +02:00
parent f6a56db19e
commit 24e2b31367
9 changed files with 378 additions and 225 deletions

View File

@ -1,3 +1,4 @@
/// @file
#pragma once #pragma once
#include <algorithm> #include <algorithm>
@ -554,4 +555,19 @@ class ExpressionEvaluator : public TreeVisitor<TypedValue> {
const GraphView graph_view_; const GraphView graph_view_;
}; };
/// A helper function for evaluating an expression that's an int.
///
/// @param what - Name of what's getting evaluated. Used for user feedback (via
/// exception) when the evaluated value is not an int.
/// @throw QueryRuntimeException if expression doesn't evaluate to an int.
inline int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr,
const std::string &what) {
TypedValue value = expr->Accept(*evaluator);
try {
return value.Value<int64_t>();
} catch (TypedValueException &e) {
throw QueryRuntimeException(what + " must be an int");
}
}
} // namespace query } // namespace query

View File

@ -338,6 +338,49 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor {
return true; return true;
} }
bool PreVisit(DistributedExpandBfs &exp) override {
prev_ops_.push_back(&exp);
return true;
}
bool PostVisit(DistributedExpandBfs &exp) override {
prev_ops_.pop_back();
if (branch_.subtree) return true;
if (auto found = FindForbidden(exp.input_symbol())) {
SetBranch(exp.input(), &exp, *found);
}
if (exp.existing_node()) {
if (auto found = FindForbidden(exp.node_symbol())) {
SetBranch(exp.input(), &exp, *found);
}
}
CHECK(!FindForbidden(exp.edge_symbol()))
<< "Expand uses an already used edge symbol.";
// Check for bounding expressions.
if (exp.lower_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
exp.lower_bound()->Accept(collector);
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
if (exp.upper_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
exp.upper_bound()->Accept(collector);
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
// Check for lambda expressions
if (exp.filter_lambda().expression) {
UsedSymbolsCollector collector(*symbol_table_);
exp.filter_lambda().expression->Accept(collector);
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
return true;
}
bool PreVisit(ExpandUniquenessFilter<EdgeAccessor> &op) override { bool PreVisit(ExpandUniquenessFilter<EdgeAccessor> &op) override {
prev_ops_.push_back(&op); prev_ops_.push_back(&op);
return true; return true;
@ -935,6 +978,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&exp); prev_ops_.push_back(&exp);
return true; return true;
} }
bool PostVisit(ExpandVariable &exp) override {
prev_ops_.pop_back();
if (exp.type() == EdgeAtom::Type::BREADTH_FIRST) {
auto distributed_bfs = std::make_unique<DistributedExpandBfs>(
exp.node_symbol(), exp.edge_symbol(), exp.direction(),
exp.edge_types(), exp.input(), exp.input_symbol(),
exp.existing_node(), exp.graph_view(), exp.lower_bound(),
exp.upper_bound(), exp.filter_lambda());
SetOnPrevious(std::move(distributed_bfs));
}
return true;
}
// The following operators filter the frame or put something on it. They // The following operators filter the frame or put something on it. They
// should be worker local. // should be worker local.

View File

@ -104,13 +104,37 @@ std::vector<Symbol> PullRemoteOrderBy::ModifiedSymbols(
return input_->ModifiedSymbols(table); return input_->ModifiedSymbols(table);
} }
DistributedExpandBfs::DistributedExpandBfs(
Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol,
bool existing_node, GraphView graph_view, Expression *lower_bound,
Expression *upper_bound, const ExpandVariable::Lambda &filter_lambda)
: ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input,
input_symbol, existing_node, graph_view),
lower_bound_(lower_bound),
upper_bound_(upper_bound),
filter_lambda_(filter_lambda) {}
ACCEPT_WITH_INPUT(DistributedExpandBfs);
std::vector<Symbol> DistributedExpandBfs::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(node_symbol());
symbols.emplace_back(edge_symbol());
return symbols;
}
//////////////////////////////////////////////////////////////////////
// Cursors
//////////////////////////////////////////////////////////////////////
namespace { namespace {
/** Helper class that wraps remote pulling for cursors that handle results from // Helper class that wraps remote pulling for cursors that handle results from
* distributed workers. // distributed workers. The command_id should be the command_id at the
* // initialization of a cursor.
* The command_id should be the command_id at the initialization of a cursor.
*/
class RemotePuller { class RemotePuller {
public: public:
RemotePuller(distributed::PullRpcClients *pull_clients, RemotePuller(distributed::PullRpcClients *pull_clients,
@ -686,6 +710,183 @@ class PullRemoteOrderByCursor : public Cursor {
bool merge_initialized_ = false; bool merge_initialized_ = false;
}; };
class DistributedExpandBfsCursor : public query::plan::Cursor {
public:
DistributedExpandBfsCursor(const DistributedExpandBfs &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db)) {
// TODO: Pass in a DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db.db())) {
bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
}
CHECK(bfs_subcursor_clients_);
subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
db_.transaction_id(), self_.direction(), self_.edge_types(),
self_.graph_view());
bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
VLOG(10) << "BFS subcursors initialized";
pull_pos_ = subcursor_ids_.end();
}
~DistributedExpandBfsCursor() {
VLOG(10) << "Removing BFS subcursors";
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
}
bool Pull(Frame &frame, Context &context) override {
// TODO(mtomic): lambda filtering in distributed
if (self_.filter_lambda().expression) {
throw utils::NotYetImplemented("lambda filtering in distributed BFS");
}
// Evaluator for the filtering condition and expansion depth.
ExpressionEvaluator evaluator(frame, &context, self_.graph_view());
while (true) {
TypedValue last_vertex;
if (!skip_rest_) {
if (current_depth_ >= lower_bound_) {
for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) {
auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first,
pull_pos_->second, &db_);
if (vertex) {
last_vertex = *vertex;
SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view());
break;
}
VLOG(10) << "Nothing to pull from " << pull_pos_->first;
}
}
if (last_vertex.IsVertex()) {
// Handle existence flag
if (self_.existing_node()) {
TypedValue &node = frame[self_.node_symbol()];
// Due to optional matching the existing node could be null
if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
// There is no point in traversing the rest of the graph because BFS
// can find only one path to a certain node.
skip_rest_ = true;
} else {
frame[self_.node_symbol()] = last_vertex;
}
VLOG(10) << "Expanded to vertex: " << last_vertex;
// Reconstruct path
std::vector<TypedValue> edges;
// During path reconstruction, edges crossing worker boundary are
// obtained from edge owner to reduce network traffic. If the last
// worker queried for its path segment owned the crossing edge,
// `current_vertex_addr` will be set. Otherwise, `current_edge_addr`
// will be set.
std::experimental::optional<storage::VertexAddress>
current_vertex_addr = last_vertex.ValueVertex().GlobalAddress();
std::experimental::optional<storage::EdgeAddress> current_edge_addr;
while (true) {
DCHECK(static_cast<bool>(current_edge_addr) ^
static_cast<bool>(current_vertex_addr))
<< "Exactly one of `current_edge_addr` or "
"`current_vertex_addr` "
"should be set during path reconstruction";
auto ret = current_edge_addr
? bfs_subcursor_clients_->ReconstructPath(
subcursor_ids_, *current_edge_addr, &db_)
: bfs_subcursor_clients_->ReconstructPath(
subcursor_ids_, *current_vertex_addr, &db_);
edges.insert(edges.end(), ret.edges.begin(), ret.edges.end());
current_vertex_addr = ret.next_vertex;
current_edge_addr = ret.next_edge;
if (!current_vertex_addr && !current_edge_addr) break;
}
std::reverse(edges.begin(), edges.end());
for (auto &edge : edges)
SwitchAccessor(edge.ValueEdge(), self_.graph_view());
frame[self_.edge_symbol()] = std::move(edges);
return true;
}
// We're done pulling for this level
pull_pos_ = subcursor_ids_.begin();
// Try to expand again
if (current_depth_ < upper_bound_) {
VLOG(10) << "Trying to expand again...";
current_depth_++;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false);
if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) {
continue;
}
}
}
VLOG(10) << "Trying to get a new source...";
// We're done with this source, try getting a new one
if (!input_cursor_->Pull(frame, context)) return false;
auto vertex_value = frame[self_.input_symbol()];
// It is possible that the vertex is Null due to optional matching.
if (vertex_value.IsNull()) continue;
auto vertex = vertex_value.ValueVertex();
lower_bound_ = self_.lower_bound()
? EvaluateInt(&evaluator, self_.lower_bound(),
"Min depth in breadth-first expansion")
: 1;
upper_bound_ = self_.upper_bound()
? EvaluateInt(&evaluator, self_.upper_bound(),
"Max depth in breadth-first expansion")
: std::numeric_limits<int>::max();
skip_rest_ = false;
if (upper_bound_ < 1) {
throw QueryRuntimeException(
"Max depth in breadth-first expansion must be at least 1");
}
VLOG(10) << "Starting BFS from " << vertex << " with limits "
<< lower_bound_ << ".." << upper_bound_;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true);
bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
current_depth_ = 1;
}
}
void Reset() override {
bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_);
pull_pos_ = subcursor_ids_.end();
}
private:
const DistributedExpandBfs &self_;
database::GraphDbAccessor &db_;
distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr};
const std::unique_ptr<query::plan::Cursor> input_cursor_;
// Depth bounds. Calculated on each pull from the input, the initial value
// is irrelevant.
int lower_bound_{-1};
int upper_bound_{-1};
// When set to true, expansion is restarted from a new source.
bool skip_rest_{false};
// Current depth. Reset for each new expansion, the initial value is
// irrelevant.
int current_depth_{-1};
// Map from worker IDs to their corresponding subcursors.
std::unordered_map<int16_t, int64_t> subcursor_ids_;
// Next worker master should try pulling from.
std::unordered_map<int16_t, int64_t>::iterator pull_pos_;
};
} // namespace } // namespace
std::unique_ptr<Cursor> PullRemote::MakeCursor( std::unique_ptr<Cursor> PullRemote::MakeCursor(
@ -703,4 +904,9 @@ std::unique_ptr<Cursor> PullRemoteOrderBy::MakeCursor(
return std::make_unique<PullRemoteOrderByCursor>(*this, db); return std::make_unique<PullRemoteOrderByCursor>(*this, db);
} }
std::unique_ptr<Cursor> DistributedExpandBfs::MakeCursor(
database::GraphDbAccessor &db) const {
return std::make_unique<DistributedExpandBfsCursor>(*this, db);
}
} // namespace query::plan } // namespace query::plan

View File

@ -15,9 +15,11 @@ cpp<#
class PullRemote; class PullRemote;
class Synchronize; class Synchronize;
class PullRemoteOrderBy; class PullRemoteOrderBy;
class DistributedExpandBfs;
using DistributedOperatorCompositeVisitor = using DistributedOperatorCompositeVisitor =
::utils::CompositeVisitor<PullRemote, Synchronize, PullRemoteOrderBy>; ::utils::CompositeVisitor<PullRemote, Synchronize, PullRemoteOrderBy,
DistributedExpandBfs>;
/// Base class for visiting regular and distributed LogicalOperator instances. /// Base class for visiting regular and distributed LogicalOperator instances.
class DistributedOperatorVisitor : public HierarchicalLogicalOperatorVisitor, class DistributedOperatorVisitor : public HierarchicalLogicalOperatorVisitor,
@ -169,5 +171,45 @@ by having only one result from each worker.")
(:private #>cpp PullRemoteOrderBy() {} cpp<#) (:private #>cpp PullRemoteOrderBy() {} cpp<#)
(:serialize :capnp)) (:serialize :capnp))
(lcp:define-class distributed-expand-bfs (logical-operator expand-common)
((lower-bound "Expression *" :reader t
:documentation "Optional lower bound, default is 1"
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(upper-bound "Expression *" :reader t
:documentation "Optional upper bound, default is infinity"
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(filter-lambda "ExpandVariable::Lambda" :reader t
:documentation "Filter that must be satisfied for expansion to succeed."
:capnp-type "ExpandVariable.Lambda"))
(:documentation "BFS expansion operator suited for distributed execution.")
(:public
#>cpp
DistributedExpandBfs(Symbol node_symbol, Symbol edge_symbol,
EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
const std::shared_ptr<LogicalOperator> &input,
Symbol input_symbol, bool existing_node,
GraphView graph_view, Expression *lower_bound,
Expression *upper_bound,
const ExpandVariable::Lambda &filter_lambda);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override {
input_ = input;
}
cpp<#)
(:private #>cpp DistributedExpandBfs() {} cpp<#)
(:serialize :capnp :inherit-compose '(expand-common)))
(lcp:pop-namespace) (lcp:pop-namespace)
(lcp:pop-namespace) (lcp:pop-namespace)

View File

@ -17,7 +17,6 @@
#include "communication/result_stream_faker.hpp" #include "communication/result_stream_faker.hpp"
#include "database/distributed_graph_db.hpp" #include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "glue/auth.hpp" #include "glue/auth.hpp"
#include "glue/communication.hpp" #include "glue/communication.hpp"
#include "integrations/kafka/exceptions.hpp" #include "integrations/kafka/exceptions.hpp"
@ -800,22 +799,6 @@ auto ExpandFromVertex(const VertexAccessor &vertex,
return iter::chain.from_iterable(std::move(chain_elements)); return iter::chain.from_iterable(std::move(chain_elements));
} }
/** A helper function for evaluating an expression that's an int.
*
* @param evaluator
* @param expr
* @param what - Name of what's getting evaluated. Used for user
* feedback (via exception) when the evaluated value is not an int.
*/
int64_t EvaluateInt(ExpressionEvaluator &evaluator, Expression *expr,
const std::string &what) {
TypedValue value = expr->Accept(evaluator);
try {
return value.Value<int64_t>();
} catch (TypedValueException &e) {
throw QueryRuntimeException(what + " must be an int");
}
}
} // namespace } // namespace
class ExpandVariableCursor : public Cursor { class ExpandVariableCursor : public Cursor {
@ -899,7 +882,7 @@ class ExpandVariableCursor : public Cursor {
// Evaluate the upper and lower bounds. // Evaluate the upper and lower bounds.
ExpressionEvaluator evaluator(frame, &context, self_.graph_view_); ExpressionEvaluator evaluator(frame, &context, self_.graph_view_);
auto calc_bound = [&evaluator](auto &bound) { auto calc_bound = [&evaluator](auto &bound) {
auto value = EvaluateInt(evaluator, bound, "Variable expansion bound"); auto value = EvaluateInt(&evaluator, bound, "Variable expansion bound");
if (value < 0) if (value < 0)
throw QueryRuntimeException( throw QueryRuntimeException(
"Variable expansion bound must be positive or zero"); "Variable expansion bound must be positive or zero");
@ -1110,11 +1093,11 @@ class ExpandBfsCursor : public query::plan::Cursor {
processed_.emplace(vertex, std::experimental::nullopt); processed_.emplace(vertex, std::experimental::nullopt);
expand_from_vertex(vertex); expand_from_vertex(vertex);
lower_bound_ = self_.lower_bound_ lower_bound_ = self_.lower_bound_
? EvaluateInt(evaluator, self_.lower_bound_, ? EvaluateInt(&evaluator, self_.lower_bound_,
"Min depth in breadth-first expansion") "Min depth in breadth-first expansion")
: 1; : 1;
upper_bound_ = self_.upper_bound_ upper_bound_ = self_.upper_bound_
? EvaluateInt(evaluator, self_.upper_bound_, ? EvaluateInt(&evaluator, self_.upper_bound_,
"Max depth in breadth-first expansion") "Max depth in breadth-first expansion")
: std::numeric_limits<int>::max(); : std::numeric_limits<int>::max();
skip_rest_ = false; skip_rest_ = false;
@ -1199,183 +1182,6 @@ class ExpandBfsCursor : public query::plan::Cursor {
std::deque<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_; std::deque<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_;
}; };
class DistributedExpandBfsCursor : public query::plan::Cursor {
public:
DistributedExpandBfsCursor(const ExpandVariable &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {
// TODO: Pass in a DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db.db())) {
bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
}
CHECK(bfs_subcursor_clients_);
subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
db_.transaction_id(), self_.direction(), self_.edge_types(),
self_.graph_view());
bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
VLOG(10) << "BFS subcursors initialized";
pull_pos_ = subcursor_ids_.end();
}
~DistributedExpandBfsCursor() {
VLOG(10) << "Removing BFS subcursors";
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
}
bool Pull(Frame &frame, Context &context) override {
// TODO(mtomic): lambda filtering in distributed
if (self_.filter_lambda_.expression) {
throw utils::NotYetImplemented("lambda filtering in distributed BFS");
}
// Evaluator for the filtering condition and expansion depth.
ExpressionEvaluator evaluator(frame, &context, self_.graph_view_);
while (true) {
TypedValue last_vertex;
if (!skip_rest_) {
if (current_depth_ >= lower_bound_) {
for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) {
auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first,
pull_pos_->second, &db_);
if (vertex) {
last_vertex = *vertex;
SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_);
break;
}
VLOG(10) << "Nothing to pull from " << pull_pos_->first;
}
}
if (last_vertex.IsVertex()) {
// Handle existence flag
if (self_.existing_node_) {
TypedValue &node = frame[self_.node_symbol_];
// Due to optional matching the existing node could be null
if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
// There is no point in traversing the rest of the graph because BFS
// can find only one path to a certain node.
skip_rest_ = true;
} else {
frame[self_.node_symbol_] = last_vertex;
}
VLOG(10) << "Expanded to vertex: " << last_vertex;
// Reconstruct path
std::vector<TypedValue> edges;
// During path reconstruction, edges crossing worker boundary are
// obtained from edge owner to reduce network traffic. If the last
// worker queried for its path segment owned the crossing edge,
// `current_vertex_addr` will be set. Otherwise, `current_edge_addr`
// will be set.
std::experimental::optional<storage::VertexAddress>
current_vertex_addr = last_vertex.ValueVertex().GlobalAddress();
std::experimental::optional<storage::EdgeAddress> current_edge_addr;
while (true) {
DCHECK(static_cast<bool>(current_edge_addr) ^
static_cast<bool>(current_vertex_addr))
<< "Exactly one of `current_edge_addr` or "
"`current_vertex_addr` "
"should be set during path reconstruction";
auto ret = current_edge_addr
? bfs_subcursor_clients_->ReconstructPath(
subcursor_ids_, *current_edge_addr, &db_)
: bfs_subcursor_clients_->ReconstructPath(
subcursor_ids_, *current_vertex_addr, &db_);
edges.insert(edges.end(), ret.edges.begin(), ret.edges.end());
current_vertex_addr = ret.next_vertex;
current_edge_addr = ret.next_edge;
if (!current_vertex_addr && !current_edge_addr) break;
}
std::reverse(edges.begin(), edges.end());
for (auto &edge : edges)
SwitchAccessor(edge.ValueEdge(), self_.graph_view_);
frame[self_.edge_symbol_] = std::move(edges);
return true;
}
// We're done pulling for this level
pull_pos_ = subcursor_ids_.begin();
// Try to expand again
if (current_depth_ < upper_bound_) {
VLOG(10) << "Trying to expand again...";
current_depth_++;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false);
if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) {
continue;
}
}
}
VLOG(10) << "Trying to get a new source...";
// We're done with this source, try getting a new one
if (!input_cursor_->Pull(frame, context)) return false;
auto vertex_value = frame[self_.input_symbol_];
// It is possible that the vertex is Null due to optional matching.
if (vertex_value.IsNull()) continue;
auto vertex = vertex_value.ValueVertex();
lower_bound_ = self_.lower_bound_
? EvaluateInt(evaluator, self_.lower_bound_,
"Min depth in breadth-first expansion")
: 1;
upper_bound_ = self_.upper_bound_
? EvaluateInt(evaluator, self_.upper_bound_,
"Max depth in breadth-first expansion")
: std::numeric_limits<int>::max();
skip_rest_ = false;
if (upper_bound_ < 1) {
throw QueryRuntimeException(
"Max depth in breadth-first expansion must be at least 1");
}
VLOG(10) << "Starting BFS from " << vertex << " with limits "
<< lower_bound_ << ".." << upper_bound_;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true);
bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
current_depth_ = 1;
}
}
void Reset() override {
bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_);
pull_pos_ = subcursor_ids_.end();
}
private:
const ExpandVariable &self_;
database::GraphDbAccessor &db_;
distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr};
const std::unique_ptr<query::plan::Cursor> input_cursor_;
// Depth bounds. Calculated on each pull from the input, the initial value
// is irrelevant.
int lower_bound_{-1};
int upper_bound_{-1};
// When set to true, expansion is restarted from a new source.
bool skip_rest_{false};
// Current depth. Reset for each new expansion, the initial value is
// irrelevant.
int current_depth_{-1};
// Map from worker IDs to their corresponding subcursors.
std::unordered_map<int16_t, int64_t> subcursor_ids_;
// Next worker master should try pulling from.
std::unordered_map<int16_t, int64_t>::iterator pull_pos_;
};
class ExpandWeightedShortestPathCursor : public query::plan::Cursor { class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
public: public:
ExpandWeightedShortestPathCursor(const ExpandVariable &self, ExpandWeightedShortestPathCursor(const ExpandVariable &self,
@ -1459,7 +1265,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
SwitchAccessor(vertex, self_.graph_view_); SwitchAccessor(vertex, self_.graph_view_);
if (self_.upper_bound_) { if (self_.upper_bound_) {
upper_bound_ = upper_bound_ =
EvaluateInt(evaluator, self_.upper_bound_, EvaluateInt(&evaluator, self_.upper_bound_,
"Max depth in weighted shortest path expansion"); "Max depth in weighted shortest path expansion");
upper_bound_set_ = true; upper_bound_set_ = true;
} else { } else {
@ -1613,11 +1419,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
std::unique_ptr<Cursor> ExpandVariable::MakeCursor( std::unique_ptr<Cursor> ExpandVariable::MakeCursor(
database::GraphDbAccessor &db) const { database::GraphDbAccessor &db) const {
if (type_ == EdgeAtom::Type::BREADTH_FIRST) { if (type_ == EdgeAtom::Type::BREADTH_FIRST) {
if (db.db().type() == database::GraphDb::Type::SINGLE_NODE) { return std::make_unique<ExpandBfsCursor>(*this, db);
return std::make_unique<ExpandBfsCursor>(*this, db);
} else {
return std::make_unique<DistributedExpandBfsCursor>(*this, db);
}
} else if (type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) { } else if (type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) {
return std::make_unique<ExpandWeightedShortestPathCursor>(*this, db); return std::make_unique<ExpandWeightedShortestPathCursor>(*this, db);
} else { } else {

View File

@ -837,7 +837,7 @@ pulled.")
'(single depth-first breadth-first weighted-shortest-path)) '(single depth-first breadth-first weighted-shortest-path))
:capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Type" "EdgeAtom::Type" :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Type" "EdgeAtom::Type"
'(single depth-first breadth-first weighted-shortest-path))) '(single depth-first breadth-first weighted-shortest-path)))
(is-reverse :bool :documentation (is-reverse :bool :reader t :documentation
"True if the path should be written as expanding from node_symbol to input_symbol.") "True if the path should be written as expanding from node_symbol to input_symbol.")
(lower-bound "Expression *" :reader t (lower-bound "Expression *" :reader t
:capnp-type "Ast.Tree" :capnp-init nil :capnp-type "Ast.Tree" :capnp-init nil
@ -855,7 +855,7 @@ pulled.")
:capnp-load (lcp:capnp-load-optional :capnp-load (lcp:capnp-load-optional
"capnp::ExpandVariable::Lambda" "Lambda" "capnp::ExpandVariable::Lambda" "Lambda"
"[helper](const auto &reader) { Lambda val; val.Load(reader, helper); return val; }")) "[helper](const auto &reader) { Lambda val; val.Load(reader, helper); return val; }"))
(total-weight "std::experimental::optional<Symbol>" (total-weight "std::experimental::optional<Symbol>" :reader t
:capnp-save (lcp:capnp-save-optional "::query::capnp::Symbol" "Symbol") :capnp-save (lcp:capnp-save-optional "::query::capnp::Symbol" "Symbol")
:capnp-load (lcp:capnp-load-optional "::query::capnp::Symbol" "Symbol"))) :capnp-load (lcp:capnp-load-optional "::query::capnp::Symbol" "Symbol")))
(:documentation (:documentation
@ -936,14 +936,13 @@ pulled.")
// the Cursors are not declared in the header because // the Cursors are not declared in the header because
// it's edges_ and edges_it_ are decltyped using a helper function // it's edges_ and edges_it_ are decltyped using a helper function
// that should be inaccessible (private class function won't compile) // that should be inaccessible (private class function won't compile)
friend class DistributedExpandBfsCursor;
friend class ExpandVariableCursor; friend class ExpandVariableCursor;
friend class ExpandBfsCursor; friend class ExpandBfsCursor;
friend class ExpandWeightedShortestPathCursor; friend class ExpandWeightedShortestPathCursor;
ExpandVariable() {} ExpandVariable() {}
cpp<#) cpp<#)
(:serialize :capnp :inherit-compose '(expand-common))) (:serialize :capnp :inherit-compose '(expand-common)))
(lcp:define-class construct-named-path (logical-operator) (lcp:define-class construct-named-path (logical-operator)
((input "std::shared_ptr<LogicalOperator>" ((input "std::shared_ptr<LogicalOperator>"

View File

@ -80,6 +80,14 @@ class PlanPrinter final : public DistributedOperatorVisitor {
return true; return true;
} }
bool PreVisit(query::plan::DistributedExpandBfs &op) override {
WithPrintLn([&](auto &out) {
out << "* DistributedExpandBfs";
PrintExpand(out, op);
});
return true;
}
bool PreVisit(query::plan::Produce &op) override { bool PreVisit(query::plan::Produce &op) override {
WithPrintLn([&](auto &out) { WithPrintLn([&](auto &out) {
out << "* Produce {"; out << "* Produce {";

View File

@ -17,6 +17,7 @@
#include "distributed/updates_rpc_server.hpp" #include "distributed/updates_rpc_server.hpp"
#include "query/context.hpp" #include "query/context.hpp"
#include "query/exceptions.hpp" #include "query/exceptions.hpp"
#include "query/plan/distributed_ops.hpp"
#include "query/plan/operator.hpp" #include "query/plan/operator.hpp"
#include "distributed_common.hpp" #include "distributed_common.hpp"
@ -924,13 +925,21 @@ class QueryPlanExpandBfs
auto node_sym = symbol_table.CreateSymbol("node", true); auto node_sym = symbol_table.CreateSymbol("node", true);
auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true); auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true);
last_op = std::make_shared<ExpandVariable>( if (GetParam().first == TestType::DISTRIBUTED) {
node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction, last_op = std::make_shared<DistributedExpandBfs>(
std::vector<storage::EdgeType>{}, false, LITERAL(min_depth), node_sym, edge_list_sym, direction, std::vector<storage::EdgeType>{},
LITERAL(max_depth), last_op, source_sym, last_op, source_sym, !!existing_node, graph_view, LITERAL(min_depth),
static_cast<bool>(existing_node), LITERAL(max_depth),
ExpandVariable::Lambda{inner_edge, inner_node, where}, ExpandVariable::Lambda{inner_edge, inner_node, where});
std::experimental::nullopt, std::experimental::nullopt, graph_view); } else {
last_op = std::make_shared<ExpandVariable>(
node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction,
std::vector<storage::EdgeType>{}, false, LITERAL(min_depth),
LITERAL(max_depth), last_op, source_sym,
static_cast<bool>(existing_node),
ExpandVariable::Lambda{inner_edge, inner_node, where},
std::experimental::nullopt, std::experimental::nullopt, graph_view);
}
Frame frame(symbol_table.max_position()); Frame frame(symbol_table.max_position());
@ -2394,8 +2403,7 @@ TEST(QueryPlan, ScanAllEqualsScanAllByLabelProperty) {
}; };
// Make sure there are `vertex_count` results when using scan all // Make sure there are `vertex_count` results when using scan all
auto count_with_scan_all = [&db, &label, &prop](int prop_value, auto count_with_scan_all = [&db, &prop](int prop_value, int prop_count) {
int prop_count) {
AstStorage storage; AstStorage storage;
SymbolTable symbol_table; SymbolTable symbol_table;
auto dba_ptr = db.Access(); auto dba_ptr = db.Access();

View File

@ -131,6 +131,7 @@ class PlanChecker : public DistributedOperatorVisitor {
} }
PRE_VISIT(PullRemoteOrderBy); PRE_VISIT(PullRemoteOrderBy);
PRE_VISIT(DistributedExpandBfs);
VISIT(AuthHandler); VISIT(AuthHandler);
@ -192,6 +193,7 @@ using ExpectOrderBy = OpChecker<OrderBy>;
using ExpectUnwind = OpChecker<Unwind>; using ExpectUnwind = OpChecker<Unwind>;
using ExpectDistinct = OpChecker<Distinct>; using ExpectDistinct = OpChecker<Distinct>;
using ExpectShowStreams = OpChecker<ShowStreams>; using ExpectShowStreams = OpChecker<ShowStreams>;
using ExpectDistributedExpandBfs = OpChecker<DistributedExpandBfs>;
class ExpectExpandVariable : public OpChecker<ExpandVariable> { class ExpectExpandVariable : public OpChecker<ExpandVariable> {
public: public:
@ -2241,9 +2243,24 @@ TYPED_TEST(TestPlanner, MatchBfs) {
bfs->filter_lambda_.inner_node = IDENT("n"); bfs->filter_lambda_.inner_node = IDENT("n");
bfs->filter_lambda_.expression = IDENT("n"); bfs->filter_lambda_.expression = IDENT("n");
bfs->upper_bound_ = LITERAL(10); bfs->upper_bound_ = LITERAL(10);
QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN("r"))); auto *as_r = NEXPR("r", IDENT("r"));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectExpandBfs(), QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN(as_r)));
ExpectProduce()); auto symbol_table = MakeSymbolTable(*storage.query());
{
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpandBfs(),
ExpectProduce());
}
{
ExpectPullRemote pull({symbol_table.at(*as_r)});
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(),
ExpectProduce()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
} }
TYPED_TEST(TestPlanner, MatchDoubleScanToExpandExisting) { TYPED_TEST(TestPlanner, MatchDoubleScanToExpandExisting) {