Distributed BFS filter lambda

Summary:
TODO:

~~1. Figure out how to propagate exceptions during lambda evaluation to master.~~

~~2. Add more complex test cases to verify that everything is sent over~~
~~the network properly (lambdas depending on the frame and the evaluation context).~~

~~3. Support only `GraphView::OLD`.~~

4. [MAYBE] Send only parts of the frame necessary for lambda evaluation.

~~5. Fix EdgeType handling~~

--------------------

Serialize frame and send it in PrepareForExpand RPC

Move Lambda out of ExpandVariable

Send symbol table and filter lambda in CreateBfsSubcursor RPC

Evaluate filter lambda during the expansion

Send evaluation context in CreateBfsSubcursor RPC

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1600
This commit is contained in:
Marin Tomic 2018-09-28 16:29:27 +02:00
parent f53343a415
commit 6894e2aef8
27 changed files with 1138 additions and 963 deletions

View File

@ -134,7 +134,6 @@ add_custom_target(generate_lcp DEPENDS ${generated_lcp_files})
add_capnp(communication/rpc/messages.capnp)
add_capnp(durability/recovery.capnp)
add_capnp(query/common.capnp)
add_capnp(query/frontend/ast/ast.capnp)
add_capnp(query/frontend/semantic/symbol.capnp)
add_capnp(query/serialization.capnp)

View File

@ -625,8 +625,7 @@ class Master {
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{&server_};
distributed::BfsSubcursorStorage subcursor_storage_{self_,
&bfs_subcursor_clients_};
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{
@ -1008,8 +1007,7 @@ class Worker {
TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{
*coordination_.GetClientPool(0)};
database::WorkerCounters counters_{coordination_.GetClientPool(0)};
distributed::BfsSubcursorStorage subcursor_storage_{self_,
&bfs_subcursor_clients_};
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{

View File

@ -16,20 +16,23 @@ BfsRpcClients::BfsRpcClients(database::DistributedGraphDb *db,
data_manager_(data_manager) {}
std::unordered_map<int16_t, int64_t> BfsRpcClients::CreateBfsSubcursors(
tx::TransactionId tx_id, query::EdgeAtom::Direction direction,
database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
query::GraphView graph_view) {
const query::plan::ExpansionLambda &filter_lambda,
const query::SymbolTable &symbol_table,
const query::EvaluationContext &evaluation_context) {
auto futures = coordination_->ExecuteOnWorkers<std::pair<int16_t, int64_t>>(
db_->WorkerId(),
[tx_id, direction, &edge_types, graph_view](int worker_id, auto &client) {
db_->WorkerId(), [&](int worker_id, auto &client) {
auto res = client.template Call<CreateBfsSubcursorRpc>(
tx_id, direction, edge_types, graph_view);
dba->transaction_id(), direction, edge_types, filter_lambda,
symbol_table, evaluation_context);
return std::make_pair(worker_id, res.member);
});
std::unordered_map<int16_t, int64_t> subcursor_ids;
subcursor_ids.emplace(
db_->WorkerId(),
subcursor_storage_->Create(tx_id, direction, edge_types, graph_view));
subcursor_storage_->Create(dba, direction, edge_types, symbol_table,
nullptr, filter_lambda, evaluation_context));
for (auto &future : futures) {
auto got = subcursor_ids.emplace(future.get());
CHECK(got.second) << "CreateBfsSubcursors failed: duplicate worker id";
@ -55,8 +58,7 @@ void BfsRpcClients::ResetSubcursors(
const std::unordered_map<int16_t, int64_t> &subcursor_ids) {
auto futures = coordination_->ExecuteOnWorkers<void>(
db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) {
client.template Call<ResetSubcursorRpc>(
subcursor_ids.at(worker_id));
client.template Call<ResetSubcursorRpc>(subcursor_ids.at(worker_id));
});
subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))->Reset();
// Wait and get all of the replies.
@ -85,13 +87,14 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
return subcursor_storage_->Get(subcursor_id)->Pull();
}
auto res = coordination_->GetClientPool(worker_id)->CallWithLoad<SubcursorPullRpc>(
[this, dba](const auto &reader) {
SubcursorPullRes res;
Load(&res, reader, dba, this->data_manager_);
return res;
},
subcursor_id);
auto res =
coordination_->GetClientPool(worker_id)->CallWithLoad<SubcursorPullRpc>(
[this, dba](const auto &reader) {
SubcursorPullRes res;
Load(&res, reader, dba, this->data_manager_);
return res;
},
subcursor_id);
return res.vertex;
}
@ -101,7 +104,15 @@ bool BfsRpcClients::ExpandLevel(
db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) {
auto res =
client.template Call<ExpandLevelRpc>(subcursor_ids.at(worker_id));
return res.member;
switch (res.result) {
case ExpandResult::SUCCESS:
return true;
case ExpandResult::FAILURE:
return false;
case ExpandResult::LAMBDA_ERROR:
throw query::QueryRuntimeException(
"Expansion condition must evaluate to boolean or null");
}
});
bool expanded =
subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))->ExpandLevel();
@ -133,9 +144,10 @@ bool BfsRpcClients::ExpandToRemoteVertex(
CHECK(!vertex.is_local())
<< "ExpandToRemoteVertex should not be called with local vertex";
int worker_id = vertex.address().worker_id();
auto res = coordination_->GetClientPool(worker_id)->Call<ExpandToRemoteVertexRpc>(
subcursor_ids.at(worker_id), edge.GlobalAddress(),
vertex.GlobalAddress());
auto res =
coordination_->GetClientPool(worker_id)->Call<ExpandToRemoteVertexRpc>(
subcursor_ids.at(worker_id), edge.GlobalAddress(),
vertex.GlobalAddress());
return res.member;
}
@ -179,14 +191,16 @@ PathSegment BfsRpcClients::ReconstructPath(
}
void BfsRpcClients::PrepareForExpand(
const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear) {
const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear,
const std::vector<query::TypedValue> &frame) {
auto futures = coordination_->ExecuteOnWorkers<void>(
db_->WorkerId(), [clear, &subcursor_ids](int worker_id, auto &client) {
db_->WorkerId(),
[this, clear, &frame, &subcursor_ids](int worker_id, auto &client) {
client.template Call<PrepareForExpandRpc>(
subcursor_ids.at(worker_id), clear);
subcursor_ids.at(worker_id), clear, frame, db_->WorkerId());
});
subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))
->PrepareForExpand(clear);
->PrepareForExpand(clear, frame);
// Wait and get all of the replies.
for (auto &future : futures) {
if (future.valid()) future.get();

View File

@ -23,13 +23,14 @@ class BfsRpcClients {
public:
BfsRpcClients(database::DistributedGraphDb *db,
BfsSubcursorStorage *subcursor_storage,
Coordination *coordination,
DataManager *data_manager);
Coordination *coordination, DataManager *data_manager);
std::unordered_map<int16_t, int64_t> CreateBfsSubcursors(
tx::TransactionId tx_id, query::EdgeAtom::Direction direction,
database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
query::GraphView graph_view);
const query::plan::ExpansionLambda &filter_lambda,
const query::SymbolTable &symbol_table,
const query::EvaluationContext &evaluation_context);
void RegisterSubcursors(
const std::unordered_map<int16_t, int64_t> &subcursor_ids);
@ -61,7 +62,8 @@ class BfsRpcClients {
storage::VertexAddress vertex, database::GraphDbAccessor *dba);
void PrepareForExpand(
const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear);
const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear,
const std::vector<query::TypedValue> &frame);
private:
database::DistributedGraphDb *db_;

View File

@ -6,7 +6,9 @@
#include "communication/rpc/messages.hpp"
#include "distributed/bfs_rpc_messages.capnp.h"
#include "distributed/bfs_subcursor.hpp"
#include "query/plan/operator.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/distributed_ops.hpp"
#include "query/serialization.hpp"
#include "storage/serialization.hpp"
#include "transactions/type.hpp"
#include "utils/serialization.hpp"
@ -17,12 +19,15 @@ cpp<#
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'ast "/query/frontend/ast/ast.capnp")
(lcp:capnp-import 'query "/query/common.capnp")
(lcp:capnp-import 'dist-ops "/query/plan/distributed_ops.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'symbol "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-type-conversion "storage::EdgeAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::VertexAddress" "Storage.Address")
(lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common")
(lcp:define-rpc create-bfs-subcursor
(:request
@ -35,17 +40,36 @@ cpp<#
:capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction"
"query::EdgeAtom::Direction"
'(in out both)))
;; TODO(mtomic): Why isn't edge-types serialized?
(edge-types "std::vector<storage::EdgeType>"
:capnp-save :dont-save)
(graph-view "query::GraphView"
:capnp-type "Query.GraphView" :capnp-init nil
:capnp-save (lcp:capnp-save-enum "::query::capnp::GraphView"
"query::GraphView"
'(old new))
:capnp-load (lcp:capnp-load-enum "::query::capnp::GraphView"
"query::GraphView"
'(old new)))))
:capnp-save (lcp:capnp-save-vector "::storage::capnp::Common"
"storage::EdgeType")
:capnp-load (lcp:capnp-load-vector "::storage::capnp::Common"
"storage::EdgeType"))
(filter-lambda "query::plan::ExpansionLambda"
:capnp-type "DistOps.ExpansionLambda"
:capnp-save (lambda (builder member capnp-name)
#>cpp
std::vector<int> saved_ast_uids;
Save(${member}, &${builder}, &saved_ast_uids);
cpp<#)
:capnp-load (lambda (reader member capnp-name)
#>cpp
std::vector<int> loaded_ast_uids;
Load(&${member}, ${reader}, ast_storage, &loaded_ast_uids);
cpp<#))
(symbol-table "query::SymbolTable"
:capnp-type "Symbol.SymbolTable")
(evaluation-context "query::EvaluationContext"
:capnp-type "Query.EvaluationContext"
:capnp-save (lambda (builder member capnp-name)
#>cpp
query::SaveEvaluationContext(${member}, &${builder});
cpp<#)
:capnp-load (lambda (reader member capnp-name)
#>cpp
query::LoadEvaluationContext(${reader}, &${member});
cpp<#)))
(:serialize :capnp :load-args '((ast-storage "query::AstStorage *"))))
(:response ((member :int64_t))))
(lcp:define-rpc register-subcursors
@ -86,9 +110,13 @@ cpp<#
(:request ((member :int64_t)))
(:response ()))
(lcp:define-enum expand-result
(success failure lambda-error)
(:serialize :capnp))
(lcp:define-rpc expand-level
(:request ((member :int64_t)))
(:response ((member :bool))))
(:response ((result "ExpandResult"))))
(lcp:define-rpc subcursor-pull
(:request ((member :int64_t)))
@ -193,7 +221,28 @@ cpp<#
(lcp:define-rpc prepare-for-expand
(:request
((subcursor-id :int64_t)
(clear :bool)))
(clear :bool)
(frame "std::vector<query::TypedValue>"
:capnp-type "List(Query.TypedValue)"
:capnp-save (lcp:capnp-save-vector
"query::capnp::TypedValue"
"query::TypedValue"
"[&self](auto *builder, const auto &value) {
query::SaveCapnpTypedValue(value, builder,
storage::SendVersions::ONLY_OLD, self.worker_id);
}")
:capnp-load (lcp:capnp-load-vector
"query::capnp::TypedValue"
"query::TypedValue"
"[dba, data_manager](const auto &reader) {
query::TypedValue value;
query::LoadCapnpTypedValue(reader, &value, dba, data_manager);
return value;
}"))
(worker-id :int :capnp-save :dont-save))
(:serialize :capnp
:load-args '((dba "database::GraphDbAccessor *")
(data-manager "distributed::DataManager *"))))
(:response ()))
(lcp:pop-namespace) ;; distributed

View File

@ -24,9 +24,15 @@ class BfsRpcServer {
server_->Register<CreateBfsSubcursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
CreateBfsSubcursorReq req;
Load(&req, req_reader);
CreateBfsSubcursorRes res(subcursor_storage_->Create(
req.tx_id, req.direction, req.edge_types, req.graph_view));
auto ast_storage = std::make_unique<query::AstStorage>();
Load(&req, req_reader, ast_storage.get());
auto db_accessor = db_->Access(req.tx_id);
auto id = subcursor_storage_->Create(
db_accessor.get(), req.direction, req.edge_types,
std::move(req.symbol_table), std::move(ast_storage),
req.filter_lambda, std::move(req.evaluation_context));
db_accessors_[id] = std::move(db_accessor);
CreateBfsSubcursorRes res(id);
Save(res, res_builder);
});
@ -53,6 +59,7 @@ class BfsRpcServer {
[this](const auto &req_reader, auto *res_builder) {
RemoveBfsSubcursorReq req;
Load(&req, req_reader);
db_accessors_.erase(req.member);
subcursor_storage_->Erase(req.member);
RemoveBfsSubcursorRes res;
Save(res, res_builder);
@ -67,13 +74,21 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<ExpandLevelRpc>([this](const auto &req_reader,
auto *res_builder) {
ExpandLevelReq req;
Load(&req, req_reader);
ExpandLevelRes res(subcursor_storage_->Get(req.member)->ExpandLevel());
Save(res, res_builder);
});
server_->Register<ExpandLevelRpc>(
[this](const auto &req_reader, auto *res_builder) {
ExpandLevelReq req;
Load(&req, req_reader);
auto subcursor = subcursor_storage_->Get(req.member);
ExpandResult result;
try {
result = subcursor->ExpandLevel() ? ExpandResult::SUCCESS
: ExpandResult::FAILURE;
} catch (const query::QueryRuntimeException &) {
result = ExpandResult::LAMBDA_ERROR;
}
ExpandLevelRes res(result);
Save(res, res_builder);
});
server_->Register<SubcursorPullRpc>(
[this](const auto &req_reader, auto *res_builder) {
@ -107,15 +122,18 @@ class BfsRpcServer {
} else {
LOG(FATAL) << "`edge` or `vertex` should be set in ReconstructPathReq";
}
ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge);
ReconstructPathRes res(result.edges, result.next_vertex,
result.next_edge);
Save(res, res_builder, db_->WorkerId());
});
server_->Register<PrepareForExpandRpc>([this](const auto &req_reader,
auto *res_builder) {
PrepareForExpandReq req;
Load(&req, req_reader);
subcursor_storage_->Get(req.subcursor_id)->PrepareForExpand(req.clear);
auto subcursor_id = req_reader.getSubcursorId();
auto *subcursor = subcursor_storage_->Get(subcursor_id);
Load(&req, req_reader, subcursor->db_accessor(), &db_->data_manager());
subcursor->PrepareForExpand(req.clear, std::move(req.frame));
PrepareForExpandRes res;
Save(res, res_builder);
});
@ -125,6 +143,7 @@ class BfsRpcServer {
database::DistributedGraphDb *db_;
communication::rpc::Server *server_;
std::map<int64_t, std::unique_ptr<database::GraphDbAccessor>> db_accessors_;
BfsSubcursorStorage *subcursor_storage_;
};

View File

@ -4,6 +4,7 @@
#include "database/distributed_graph_db.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "query/plan/operator.hpp"
#include "storage/address_types.hpp"
#include "storage/vertex_accessor.hpp"
@ -13,15 +14,23 @@ namespace distributed {
using query::TypedValue;
ExpandBfsSubcursor::ExpandBfsSubcursor(
database::GraphDb *db, tx::TransactionId tx_id,
query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types, query::GraphView graph_view,
database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types, query::SymbolTable symbol_table,
std::unique_ptr<query::AstStorage> ast_storage,
query::plan::ExpansionLambda filter_lambda,
query::EvaluationContext evaluation_context,
BfsRpcClients *bfs_subcursor_clients)
: bfs_subcursor_clients_(bfs_subcursor_clients),
dba_(db->Access(tx_id)),
dba_(dba),
direction_(direction),
edge_types_(std::move(edge_types)),
graph_view_(graph_view) {
symbol_table_(std::move(symbol_table)),
ast_storage_(std::move(ast_storage)),
filter_lambda_(filter_lambda),
evaluation_context_(std::move(evaluation_context)),
frame_(symbol_table_.max_position()),
expression_evaluator_(&frame_, symbol_table_, evaluation_context_, dba_,
query::GraphView::OLD) {
Reset();
}
@ -35,14 +44,15 @@ void ExpandBfsSubcursor::Reset() {
void ExpandBfsSubcursor::SetSource(storage::VertexAddress source_address) {
Reset();
auto source = VertexAccessor(source_address, *dba_);
SwitchAccessor(source, graph_view_);
processed_.emplace(source, std::experimental::nullopt);
ExpandFromVertex(source);
}
void ExpandBfsSubcursor::PrepareForExpand(bool clear) {
void ExpandBfsSubcursor::PrepareForExpand(
bool clear, std::vector<query::TypedValue> frame) {
if (clear) {
Reset();
frame_.elems() = std::move(frame);
} else {
std::swap(to_visit_current_, to_visit_next_);
to_visit_next_.clear();
@ -71,7 +81,6 @@ bool ExpandBfsSubcursor::ExpandToLocalVertex(storage::EdgeAddress edge,
<< "ExpandToLocalVertex called with remote vertex";
edge = dba_->db().storage().LocalizedAddressIfPossible(edge);
SwitchAccessor(vertex, graph_view_);
std::lock_guard<std::mutex> lock(mutex_);
auto got = processed_.emplace(vertex, edge);
@ -146,7 +155,18 @@ void ExpandBfsSubcursor::ReconstructPathHelper(VertexAccessor vertex,
bool ExpandBfsSubcursor::ExpandToVertex(EdgeAccessor edge,
VertexAccessor vertex) {
// TODO(mtomic): lambda filtering in distributed
if (filter_lambda_.expression) {
frame_[filter_lambda_.inner_edge_symbol] = edge;
frame_[filter_lambda_.inner_node_symbol] = vertex;
TypedValue result =
filter_lambda_.expression->Accept(expression_evaluator_);
if (!result.IsNull() && !result.IsBool()) {
throw query::QueryRuntimeException(
"Expansion condition must evaluate to boolean or null");
}
if (result.IsNull() || !result.ValueBool()) return false;
}
return vertex.is_local() ? ExpandToLocalVertex(edge.address(), vertex)
: bfs_subcursor_clients_->ExpandToRemoteVertex(
subcursor_ids_, edge, vertex);
@ -165,20 +185,22 @@ bool ExpandBfsSubcursor::ExpandFromVertex(VertexAccessor vertex) {
return expanded;
}
BfsSubcursorStorage::BfsSubcursorStorage(database::GraphDb *db,
BfsRpcClients *bfs_subcursor_clients)
: db_(db), bfs_subcursor_clients_(bfs_subcursor_clients) {}
BfsSubcursorStorage::BfsSubcursorStorage(BfsRpcClients *bfs_subcursor_clients)
: bfs_subcursor_clients_(bfs_subcursor_clients) {}
int64_t BfsSubcursorStorage::Create(tx::TransactionId tx_id,
query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types,
query::GraphView graph_view) {
int64_t BfsSubcursorStorage::Create(
database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types, query::SymbolTable symbol_table,
std::unique_ptr<query::AstStorage> ast_storage,
query::plan::ExpansionLambda filter_lambda,
query::EvaluationContext evaluation_context) {
std::lock_guard<std::mutex> lock(mutex_);
int64_t id = next_subcursor_id_++;
auto got =
storage_.emplace(id, std::make_unique<ExpandBfsSubcursor>(
db_, tx_id, direction, std::move(edge_types),
graph_view, bfs_subcursor_clients_));
auto got = storage_.emplace(
id, std::make_unique<ExpandBfsSubcursor>(
dba, direction, std::move(edge_types), std::move(symbol_table),
std::move(ast_storage), filter_lambda,
std::move(evaluation_context), bfs_subcursor_clients_));
CHECK(got.second) << "Subcursor with ID " << id << " already exists";
return id;
}

View File

@ -8,6 +8,9 @@
#include "glog/logging.h"
#include "database/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/eval.hpp"
#include "query/plan/operator.hpp"
namespace database {
@ -32,10 +35,13 @@ struct PathSegment {
/// class per worker, and those instances communicate via RPC calls.
class ExpandBfsSubcursor {
public:
ExpandBfsSubcursor(database::GraphDb *db, tx::TransactionId tx_id,
ExpandBfsSubcursor(database::GraphDbAccessor *dba,
query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types,
query::GraphView graph_view,
query::SymbolTable symbol_table,
std::unique_ptr<query::AstStorage> ast_storage,
query::plan::ExpansionLambda filter_lambda,
query::EvaluationContext evaluation_context,
BfsRpcClients *bfs_subcursor_clients);
// Stores subcursor ids of other workers.
@ -55,7 +61,8 @@ class ExpandBfsSubcursor {
///
/// @param clear if set to true, `Reset` will be called instead of moving
/// `to_visit_next_`
void PrepareForExpand(bool clear);
/// @param frame frame for evaluation of filter lambda expression
void PrepareForExpand(bool clear, std::vector<query::TypedValue> frame);
/// Expands the BFS frontier once. Returns true if there was a successful
/// expansion.
@ -77,7 +84,7 @@ class ExpandBfsSubcursor {
/// Reconstruct the part of path to given vertex stored on this worker.
PathSegment ReconstructPath(storage::VertexAddress vertex_addr);
database::GraphDbAccessor *db_accessor() { return dba_.get(); }
database::GraphDbAccessor *db_accessor() { return dba_; }
/// Used to reset subcursor state before starting expansion from new source.
void Reset();
@ -96,14 +103,26 @@ class ExpandBfsSubcursor {
BfsRpcClients *bfs_subcursor_clients_{nullptr};
std::unique_ptr<database::GraphDbAccessor> dba_;
database::GraphDbAccessor *dba_;
/// IDs of subcursors on other workers, used when sending RPCs.
std::unordered_map<int16_t, int64_t> subcursor_ids_;
query::EdgeAtom::Direction direction_;
std::vector<storage::EdgeType> edge_types_;
query::GraphView graph_view_;
/// Symbol table and AstStorage for filter lambda evaluation. If subcursor
/// doesn't own the filter lambda expression, `ast_storage_` is set to
/// nullptr.
query::SymbolTable symbol_table_;
std::unique_ptr<query::AstStorage> ast_storage_;
query::plan::ExpansionLambda filter_lambda_;
/// Evaluation context, frame and expression evaluator for evaluation of
/// filter lambda.
query::EvaluationContext evaluation_context_;
query::Frame frame_;
query::ExpressionEvaluator expression_evaluator_;
/// Mutex protecting `to_visit_next_` and `processed_`, because there is a
/// race between expansions done locally using `ExpandToLocalVertex` and
@ -130,17 +149,19 @@ class ExpandBfsSubcursor {
/// Thread-safe storage for BFS subcursors.
class BfsSubcursorStorage {
public:
explicit BfsSubcursorStorage(database::GraphDb *db,
BfsRpcClients *bfs_subcursor_clients);
explicit BfsSubcursorStorage(BfsRpcClients *bfs_subcursor_clients);
int64_t Create(tx::TransactionId tx_id, query::EdgeAtom::Direction direction,
int64_t Create(database::GraphDbAccessor *dba,
query::EdgeAtom::Direction direction,
std::vector<storage::EdgeType> edge_types,
query::GraphView graph_view);
query::SymbolTable symbol_table,
std::unique_ptr<query::AstStorage> ast_storage,
query::plan::ExpansionLambda filter_lambda,
query::EvaluationContext evaluation_context);
void Erase(int64_t subcursor_id);
ExpandBfsSubcursor *Get(int64_t subcursor_id);
private:
database::GraphDb *db_{nullptr};
BfsRpcClients *bfs_subcursor_clients_{nullptr};
std::mutex mutex_;

View File

@ -189,33 +189,14 @@ to the appropriate value. Not used on side that generates the response.")
(command-id "tx::CommandId")
(evaluation-context "query::EvaluationContext"
:capnp-type "Query.EvaluationContext"
:capnp-save
(lambda (builder member capnp-name)
(declare (ignore capnp-name))
#>cpp
${builder}.setTimestamp(${member}.timestamp);
auto params_builder = ${builder}.initParams().initEntries(${member}.parameters.size());
size_t i = 0;
for (auto &entry : ${member}.parameters) {
auto builder = params_builder[i];
auto key_builder = builder.initKey();
key_builder.setValue(entry.first);
auto value_builder = builder.initValue();
storage::SaveCapnpPropertyValue(entry.second, &value_builder);
++i;
}
cpp<#)
:capnp-load
(lambda (reader member capnp-name)
(declare (ignore capnp-name))
#>cpp
${member}.timestamp = ${reader}.getTimestamp();
for (const auto &entry_reader : ${reader}.getParams().getEntries()) {
PropertyValue value;
storage::LoadCapnpPropertyValue(entry_reader.getValue(), &value);
${member}.parameters.Add(entry_reader.getKey().getValue(), value);
}
cpp<#))
:capnp-save (lambda (builder member capnp-name)
#>cpp
query::SaveEvaluationContext(${member}, &${builder});
cpp<#)
:capnp-load (lambda (reader member capnp-name)
#>cpp
query::LoadEvaluationContext(${reader}, &${member});
cpp<#))
(symbols "std::vector<query::Symbol>" :capnp-type "List(Sem.Symbol)")
(accumulate :bool)
(batch-size :int64_t)

View File

@ -1,15 +0,0 @@
@0xcbc2c66202fdf643;
using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("query::capnp");
using Ast = import "/query/frontend/ast/ast.capnp";
enum GraphView {
old @0;
new @1;
}
struct TypedValueVectorCompare {
ordering @0 :List(Ast.Ordering);
}

View File

@ -7,11 +7,10 @@
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol.hpp"
#include "query/serialization.capnp.h"
#include "query/typed_value.hpp"
#include "storage/types.hpp"
#include "query/common.capnp.h"
namespace query {
// These are the functions for parsing literals and parameter names from

View File

@ -123,7 +123,7 @@ DistributedExpandBfs::DistributedExpandBfs(
const std::vector<storage::EdgeType> &edge_types,
const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol,
bool existing_node, GraphView graph_view, Expression *lower_bound,
Expression *upper_bound, const ExpandVariable::Lambda &filter_lambda)
Expression *upper_bound, const ExpansionLambda &filter_lambda)
: ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input,
input_symbol, existing_node, graph_view),
lower_bound_(lower_bound),
@ -1023,18 +1023,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
DistributedExpandBfsCursor(const DistributedExpandBfs &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db)) {
// TODO: Pass in a DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&db.db())) {
bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
}
CHECK(bfs_subcursor_clients_);
subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
db_.transaction_id(), self_.direction_, self_.edge_types_,
self_.graph_view_);
bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
VLOG(10) << "BFS subcursors initialized";
pull_pos_ = subcursor_ids_.end();
CHECK(self_.graph_view_ == GraphView::OLD)
<< "ExpandVariable should only be planned with GraphView::OLD";
}
~DistributedExpandBfsCursor() {
@ -1042,10 +1032,28 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
}
void InitSubcursors(database::GraphDbAccessor *dba,
const query::SymbolTable &symbol_table,
const EvaluationContext &evaluation_context) {
// TODO: Pass in a DistributedGraphDb.
if (auto *distributed_db =
dynamic_cast<database::DistributedGraphDb *>(&dba->db())) {
bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
}
CHECK(bfs_subcursor_clients_);
subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
dba, self_.direction_, self_.edge_types_, self_.filter_lambda_,
symbol_table, evaluation_context);
bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
VLOG(10) << "BFS subcursors initialized";
pull_pos_ = subcursor_ids_.end();
}
bool Pull(Frame &frame, Context &context) override {
// TODO(mtomic): lambda filtering in distributed
if (self_.filter_lambda_.expression) {
throw utils::NotYetImplemented("lambda filtering in distributed BFS");
if (!subcursors_initialized_) {
InitSubcursors(&context.db_accessor_, context.symbol_table_,
context.evaluation_context_);
subcursors_initialized_ = true;
}
// Evaluator for the filtering condition and expansion depth.
@ -1064,7 +1072,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
pull_pos_->second, &db_);
if (vertex) {
last_vertex = *vertex;
SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_);
break;
}
VLOG(10) << "Nothing to pull from " << pull_pos_->first;
@ -1075,8 +1082,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
// Handle existence flag
if (self_.existing_node_) {
TypedValue &node = frame[self_.node_symbol_];
// Due to optional matching the existing node could be null
if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
if ((node != last_vertex).ValueBool()) continue;
// There is no point in traversing the rest of the graph because BFS
// can find only one path to a certain node.
skip_rest_ = true;
@ -1115,8 +1121,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
if (!current_vertex_addr && !current_edge_addr) break;
}
std::reverse(edges.begin(), edges.end());
for (auto &edge : edges)
SwitchAccessor(edge.ValueEdge(), self_.graph_view_);
frame[self_.edge_symbol_] = std::move(edges);
return true;
}
@ -1128,7 +1132,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
if (current_depth_ < upper_bound_) {
VLOG(10) << "Trying to expand again...";
current_depth_++;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false);
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false, {});
if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) {
continue;
}
@ -1141,8 +1145,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
auto vertex_value = frame[self_.input_symbol_];
// It is possible that the vertex is Null due to optional matching.
// Source or sink node could be null due to optional matching.
if (vertex_value.IsNull()) continue;
if (self_.existing_node_ && frame[self_.node_symbol_].IsNull()) continue;
auto vertex = vertex_value.ValueVertex();
lower_bound_ = self_.lower_bound_
@ -1153,16 +1158,18 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
? EvaluateInt(&evaluator, self_.upper_bound_,
"Max depth in breadth-first expansion")
: std::numeric_limits<int64_t>::max();
if (upper_bound_ < 1 || lower_bound_ > upper_bound_) continue;
skip_rest_ = false;
if (upper_bound_ < 1) {
throw QueryRuntimeException(
"Max depth in breadth-first expansion must be at least 1");
}
pull_pos_ = subcursor_ids_.begin();
VLOG(10) << "Starting BFS from " << vertex << " with limits "
<< lower_bound_ << ".." << upper_bound_;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true);
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true,
frame.elems());
bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
current_depth_ = 1;
}
@ -1199,6 +1206,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
// Next worker master should try pulling from.
std::unordered_map<int16_t, int64_t>::iterator pull_pos_;
bool subcursors_initialized_{false};
};
// Returns a random worker id. Worker ID is obtained from the Db.
@ -1349,8 +1358,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor {
ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex);
return dest_node_value.Value<VertexAccessor>();
} else {
return CreateVertexOnWorker(worker_id, self_->node_atom_, frame,
context);
return CreateVertexOnWorker(worker_id, self_->node_atom_, frame, context);
}
}

View File

@ -160,7 +160,7 @@ Logic of the synchronize operator is:
:capnp-save (save-ast-vector "Expression *")
:capnp-load (load-ast-vector "Expression *"))
(compare "TypedValueVectorCompare" :scope :public
:capnp-type "Common.TypedValueVectorCompare"))
:capnp-type "Query.TypedValueVectorCompare"))
(:documentation
"Operator that merges distributed OrderBy operators.
Instead of using a regular OrderBy on master (which would collect all remote
@ -220,9 +220,17 @@ by having only one result from each worker.")
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(filter-lambda "ExpandVariable::Lambda" :scope :public
(filter-lambda "ExpansionLambda" :scope :public
:documentation "Filter that must be satisfied for expansion to succeed."
:capnp-type "ExpandVariable.Lambda"))
:capnp-type "ExpansionLambda"
:capnp-save (lambda (builder member capnp-name)
#>cpp
Save(${member}, &${builder}, &helper->saved_ast_uids);
cpp<#)
:capnp-load (lambda (reader member capnp-name)
#>cpp
Load(&${member}, ${reader}, &helper->ast_storage, &helper->loaded_ast_uids);
cpp<#)))
(:documentation "BFS expansion operator suited for distributed execution.")
(:public
#>cpp
@ -234,7 +242,7 @@ by having only one result from each worker.")
Symbol input_symbol, bool existing_node,
GraphView graph_view, Expression *lower_bound,
Expression *upper_bound,
const ExpandVariable::Lambda &filter_lambda);
const ExpansionLambda &filter_lambda);
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(

View File

@ -584,8 +584,8 @@ ExpandVariable::ExpandVariable(
const std::vector<storage::EdgeType> &edge_types, bool is_reverse,
Expression *lower_bound, Expression *upper_bound,
const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol,
bool existing_node, Lambda filter_lambda,
std::experimental::optional<Lambda> weight_lambda,
bool existing_node, ExpansionLambda filter_lambda,
std::experimental::optional<ExpansionLambda> weight_lambda,
std::experimental::optional<Symbol> total_weight, GraphView graph_view)
: ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input,
input_symbol, existing_node, graph_view),
@ -1130,9 +1130,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
CHECK(self_.graph_view_ == GraphView::OLD)
<< "ExpandVariable should only be planned with GraphView::OLD";
CHECK(!self_.existing_node_) << "Single source shortest path algorithm "
"should not be used when `existing_node` "
"flag is set, s-t shortest path algorithm "
"should be used instead!";
"should not be used when `existing_node` "
"flag is set, s-t shortest path algorithm "
"should be used instead!";
}
bool Pull(Frame &frame, Context &context) override {
@ -1191,6 +1191,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
// input
if (to_visit_current_.empty()) {
if (!input_cursor_->Pull(frame, context)) return false;
to_visit_current_.clear();
to_visit_next_.clear();
processed_.clear();
@ -1198,9 +1199,6 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
auto vertex_value = frame[self_.input_symbol_];
// it is possible that the vertex is Null due to optional matching
if (vertex_value.IsNull()) continue;
auto vertex = vertex_value.Value<VertexAccessor>();
processed_.emplace(vertex, std::experimental::nullopt);
expand_from_vertex(vertex);
lower_bound_ = self_.lower_bound_
? EvaluateInt(&evaluator, self_.lower_bound_,
"Min depth in breadth-first expansion")
@ -1209,9 +1207,12 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
? EvaluateInt(&evaluator, self_.upper_bound_,
"Max depth in breadth-first expansion")
: std::numeric_limits<int64_t>::max();
if (upper_bound_ < 1)
throw QueryRuntimeException(
"Maximum depth in breadth-first expansion must be at least 1.");
if (upper_bound_ < 1 || lower_bound_ > upper_bound_) continue;
auto vertex = vertex_value.Value<VertexAccessor>();
processed_.emplace(vertex, std::experimental::nullopt);
expand_from_vertex(vertex);
// go back to loop start and see if we expanded anything
continue;

View File

@ -141,13 +141,13 @@ cpp<#
(lcp:capnp-import 'storage "/storage/serialization.capnp")
(lcp:capnp-import 'ast "/query/frontend/ast/ast.capnp")
(lcp:capnp-import 'semantic "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'common "/query/common.capnp")
(lcp:capnp-import 'query "/query/serialization.capnp")
(lcp:capnp-type-conversion "Symbol" "Semantic.Symbol")
(lcp:capnp-type-conversion "storage::Label" "Storage.Common")
(lcp:capnp-type-conversion "storage::Property" "Storage.Common")
(lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common")
(lcp:capnp-type-conversion "GraphView" "Common.GraphView")
(lcp:capnp-type-conversion "GraphView" "Query.GraphView")
(lcp:define-class logical-operator ("::utils::Visitable<HierarchicalLogicalOperatorVisitor>")
()
@ -810,6 +810,33 @@ pulled.")
cpp<#)
(:serialize :capnp :inherit-compose '(expand-common)))
;; Plain struct describing a lambda that is evaluated during an expansion:
;; the symbols of the currently expanded edge and node plus the expression
;; itself.
;; Unlike the logical operators, its generated Save/Load functions take the
;; AST bookkeeping explicitly (`saved-ast-uids` for saving; `ast-storage` and
;; `loaded-ast-uids` for loading), as declared in the :serialize form below.
(lcp:define-struct expansion-lambda ()
((inner-edge-symbol "Symbol" :documentation "Currently expanded edge symbol.")
(inner-node-symbol "Symbol" :documentation "Currently expanded node symbol.")
(expression "Expression *" :documentation "Expression used in lambda during expansion."
:capnp-type "Ast.Tree" :capnp-init nil
;; A null expression is not written at all; the loader restores it as
;; nullptr when the message has no expression field.
:capnp-save (lambda (builder member capnp-name)
#>cpp
if (${member}) {
auto expression_builder = ${builder}->initExpression();
${member}->Save(&expression_builder, saved_ast_uids);
}
cpp<#)
:capnp-load (lambda (reader member capnp-name)
#>cpp
if (${reader}.hasExpression()) {
${member} = static_cast<Expression *>(
ast_storage->Load(${reader}.getExpression(),
loaded_ast_uids));
} else {
${member} = nullptr;
}
cpp<#)))
(:serialize :capnp
:save-args '((saved-ast-uids "std::vector<int> *"))
:load-args '((ast-storage "AstStorage *")
(loaded-ast-uids "std::vector<int> *"))))
(lcp:define-class expand-variable (logical-operator expand-common)
((type "EdgeAtom::Type" :scope :public :capnp-type "Ast.EdgeAtom.Type"
:capnp-init nil
@ -827,14 +854,29 @@ pulled.")
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *")
:documentation "Optional upper bound of the variable length expansion, defaults are (1, inf)")
(filter-lambda "Lambda" :scope :public)
(weight-lambda "std::experimental::optional<Lambda>" :scope :public
(filter-lambda "ExpansionLambda"
:scope :public
:capnp-save (lambda (builder member capnp-name)
#>cpp
Save(${member}, &${builder}, &helper->saved_ast_uids);
cpp<#)
:capnp-load (lambda (reader member capnp-name)
#>cpp
Load(&${member}, ${reader}, &helper->ast_storage, &helper->loaded_ast_uids);
cpp<#))
(weight-lambda "std::experimental::optional<ExpansionLambda>" :scope :public
:capnp-save (lcp:capnp-save-optional
"capnp::ExpandVariable::Lambda" "ExpandVariable::Lambda"
"[helper](auto *builder, const auto &val) { Save(val, builder, helper); }")
"capnp::ExpansionLambda" "ExpansionLambda"
"[helper](auto *builder, const auto &val) {
Save(val, builder, &helper->saved_ast_uids);
}")
:capnp-load (lcp:capnp-load-optional
"capnp::ExpandVariable::Lambda" "ExpandVariable::Lambda"
"[helper](const auto &reader) { ExpandVariable::Lambda val; Load(&val, reader, helper); return val; }"))
"capnp::ExpansionLambda" "ExpansionLambda"
"[helper](const auto &reader) {
ExpansionLambda val;
Load(&val, reader, &helper->ast_storage, &helper->loaded_ast_uids);
return val;
}"))
(total-weight "std::experimental::optional<Symbol>" :scope :public
:capnp-save (lcp:capnp-save-optional "::query::capnp::Symbol" "Symbol")
:capnp-load (lcp:capnp-load-optional "::query::capnp::Symbol" "Symbol")))
@ -856,15 +898,6 @@ MATCH (a) MATCH (a)--(b)),
only expansions that match defined equalities are succesfully
pulled.")
(:public
(lcp:define-struct lambda ()
((inner-edge-symbol "Symbol" :documentation "Currently expanded edge symbol.")
(inner-node-symbol "Symbol" :documentation "Currently expanded node symbol.")
(expression "Expression *" :documentation "Expression used in lambda during expansion."
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *")))
(:serialize :capnp
:save-args '((helper "LogicalOperator::SaveHelper *"))
:load-args '((helper "LogicalOperator::LoadHelper *"))))
#>cpp
ExpandVariable() {}
@ -897,8 +930,9 @@ pulled.")
bool is_reverse, Expression *lower_bound,
Expression *upper_bound,
const std::shared_ptr<LogicalOperator> &input,
Symbol input_symbol, bool existing_node, Lambda filter_lambda,
std::experimental::optional<Lambda> weight_lambda,
Symbol input_symbol, bool existing_node,
ExpansionLambda filter_lambda,
std::experimental::optional<ExpansionLambda> weight_lambda,
std::experimental::optional<Symbol> total_weight,
GraphView graph_view);
@ -1774,7 +1808,7 @@ input should be performed).")
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(compare "TypedValueVectorCompare" :scope :public
:capnp-type "Common.TypedValueVectorCompare")
:capnp-type "Query.TypedValueVectorCompare")
(order-by "std::vector<Expression *>" :scope :public
:capnp-type "List(Ast.Tree)"
:capnp-save (save-ast-vector "Expression *")

View File

@ -406,11 +406,11 @@ class RuleBasedPlanner {
DCHECK(!utils::Contains(bound_symbols, edge_symbol))
<< "Existing edges are not supported";
if (edge->IsVariable()) {
std::experimental::optional<ExpandVariable::Lambda> weight_lambda;
std::experimental::optional<ExpansionLambda> weight_lambda;
std::experimental::optional<Symbol> total_weight;
if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) {
weight_lambda.emplace(ExpandVariable::Lambda{
weight_lambda.emplace(ExpansionLambda{
symbol_table.at(*edge->weight_lambda_.inner_edge),
symbol_table.at(*edge->weight_lambda_.inner_node),
edge->weight_lambda_.expression});
@ -418,7 +418,7 @@ class RuleBasedPlanner {
total_weight.emplace(symbol_table.at(*edge->total_weight_));
}
ExpandVariable::Lambda filter_lambda;
ExpansionLambda filter_lambda;
filter_lambda.inner_edge_symbol =
symbol_table.at(*edge->filter_lambda_.inner_edge);
filter_lambda.inner_node_symbol =

View File

@ -1,11 +1,21 @@
@0xf47e119e21912f20;
using Ast = import "/query/frontend/ast/ast.capnp";
using Cxx = import "/capnp/c++.capnp";
using Storage = import "/storage/serialization.capnp";
using Utils = import "/utils/serialization.capnp";
$Cxx.namespace("query::capnp");
# Schema-side enumeration with the two values `old` and `new`.
# NOTE(review): presumably mirrors the C++ `GraphView` enum (the LCP files map
# "GraphView" to "Query.GraphView") - confirm the enumerator order matches.
enum GraphView {
old @0;
new @1;
}
# Serialized form of a typed-value vector comparator: a list of
# `Ast.Ordering` values.
# NOTE(review): presumably one ordering per compared position - confirm.
struct TypedValueVectorCompare {
ordering @0 :List(Ast.Ordering);
}
struct EvaluationContext {
timestamp @0 : Int64;
params @1 : Utils.Map(Utils.BoxInt64, Storage.PropertyValue);

View File

@ -139,4 +139,30 @@ void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader,
}
}
// Writes `ctx` into `builder`: the timestamp first, then one map entry per
// query parameter (key = first element of the entry, value = its property
// value).
void SaveEvaluationContext(const EvaluationContext &ctx,
                           capnp::EvaluationContext::Builder *builder) {
  builder->setTimestamp(ctx.timestamp);
  auto entries = builder->initParams().initEntries(ctx.parameters.size());
  size_t index = 0;
  for (auto &param : ctx.parameters) {
    // Entries are filled in iteration order of the parameters.
    auto entry_builder = entries[index];
    auto key_builder = entry_builder.initKey();
    key_builder.setValue(param.first);
    auto value_builder = entry_builder.initValue();
    storage::SaveCapnpPropertyValue(param.second, &value_builder);
    ++index;
  }
}
// Reads an evaluation context from `reader` into `ctx`: restores the
// timestamp and adds every serialized (key, property value) entry to
// `ctx->parameters`.
void LoadEvaluationContext(const capnp::EvaluationContext::Reader &reader,
                           EvaluationContext *ctx) {
  ctx->timestamp = reader.getTimestamp();
  for (const auto &entry : reader.getParams().getEntries()) {
    PropertyValue loaded;
    storage::LoadCapnpPropertyValue(entry.getValue(), &loaded);
    ctx->parameters.Add(entry.getKey().getValue(), loaded);
  }
}
} // namespace query

View File

@ -1,5 +1,6 @@
#pragma once
#include "query/context.hpp"
#include "query/serialization.capnp.h"
#include "query/typed_value.hpp"
#include "storage/serialization.hpp"
@ -19,4 +20,10 @@ void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader,
database::GraphDbAccessor *dba,
distributed::DataManager *data_manager);
// Serializes `ctx` into the given Cap'n Proto builder.
void SaveEvaluationContext(const EvaluationContext &ctx,
capnp::EvaluationContext::Builder *builder);
// Fills `ctx` from the given Cap'n Proto reader.
void LoadEvaluationContext(const capnp::EvaluationContext::Reader &reader,
EvaluationContext *ctx);
} // namespace query

View File

@ -58,9 +58,6 @@ target_link_libraries(${test_prefix}datastructure_union_find memgraph_lib kvstor
add_unit_test(deferred_deleter.cpp)
target_link_libraries(${test_prefix}deferred_deleter memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_bfs.cpp)
target_link_libraries(${test_prefix}distributed_bfs memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_coordination.cpp)
target_link_libraries(${test_prefix}distributed_coordination memgraph_lib kvstore_dummy_lib)
@ -101,6 +98,12 @@ target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dum
# add_unit_test(distributed_token_sharing.cpp)
# target_link_libraries(${test_prefix}distributed_token_sharing memgraph_lib kvstore_dummy_lib)
add_unit_test(bfs_distributed.cpp)
target_link_libraries(${test_prefix}bfs_distributed memgraph_lib kvstore_dummy_lib)
add_unit_test(bfs_single_node.cpp)
target_link_libraries(${test_prefix}bfs_single_node memgraph_lib kvstore_dummy_lib)
add_unit_test(distributed_dgp_partitioner.cpp)
target_link_libraries(${test_prefix}distributed_dgp_partitioner memgraph_lib kvstore_dummy_lib)

478
tests/unit/bfs_common.hpp Normal file
View File

@ -0,0 +1,478 @@
#pragma once
#include "gtest/gtest.h"
#include "query/context.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/interpret/frame.hpp"
#include "query/plan/operator.hpp"
#include "query_common.hpp"
namespace query {

// Writes the enumerator name of `dir` ("IN", "OUT" or "BOTH") to `os`.
// Values outside the enumeration produce no output.
// NOTE(review): presumably found by gtest through ADL when it prints
// test parameters - confirm.
void PrintTo(const query::EdgeAtom::Direction &dir, std::ostream *os) {
  const char *name = nullptr;
  switch (dir) {
    case query::EdgeAtom::Direction::IN:
      name = "IN";
      break;
    case query::EdgeAtom::Direction::OUT:
      name = "OUT";
      break;
    case query::EdgeAtom::Direction::BOTH:
      name = "BOTH";
      break;
  }
  if (name != nullptr) *os << name;
}

}  // namespace query
// Number of vertices in the test graph; vertices are identified by 0..5.
const auto kVertexCount = 6;
// Maps vertices to workers: entry i is the worker that stores vertex i.
const std::vector<int> kVertexLocations = {0, 1, 1, 0, 2, 2};
// Edge list in form of (from, to, edge_type).
// Contains a self-loop (5, 5) and edges in both directions between 4 and 5.
const std::vector<std::tuple<int, int, std::string>> kEdges = {
{0, 1, "a"}, {1, 2, "b"}, {2, 4, "b"}, {2, 5, "a"}, {4, 1, "a"},
{4, 5, "a"}, {5, 3, "b"}, {5, 4, "a"}, {5, 5, "b"}};
// Filters the input edge list by edge type and orients it according to `dir`.
//
// `edges` holds (from, to, edge_type) triples. An edge is kept when
// `edge_types` is empty or contains the edge's type. The result is a list of
// (from, to) pairs describing valid directed edges:
//   OUT  - edges as given,
//   IN   - every edge reversed,
//   BOTH - edges as given, followed by every edge reversed.
std::vector<std::pair<int, int>> GetEdgeList(
    const std::vector<std::tuple<int, int, std::string>> &edges,
    query::EdgeAtom::Direction dir,
    const std::vector<std::string> &edge_types) {
  std::vector<std::pair<int, int>> ret;
  for (const auto &e : edges) {
    if (edge_types.empty() || utils::Contains(edge_types, std::get<2>(e)))
      ret.emplace_back(std::get<0>(e), std::get<1>(e));
  }
  switch (dir) {
    case query::EdgeAtom::Direction::OUT:
      break;
    case query::EdgeAtom::Direction::IN:
      for (auto &e : ret) std::swap(e.first, e.second);
      break;
    case query::EdgeAtom::Direction::BOTH: {
      // Appending to `ret` while iterating over it with iterators is
      // undefined behaviour once the vector reallocates, so walk the original
      // elements by index and reserve the final size up front.
      const size_t forward_count = ret.size();
      ret.reserve(2 * forward_count);
      for (size_t i = 0; i < forward_count; ++i) {
        const int from = ret[i].first;
        const int to = ret[i].second;
        ret.emplace_back(to, from);
      }
      break;
    }
  }
  return ret;
}
// Floyd-Warshall all-pairs shortest paths on a directed graph with unit edge
// weights. Returns the distance matrix; an entry is -1 when there is no path
// between the two vertices, and the diagonal is always 0.
std::vector<std::vector<int>> FloydWarshall(
    int num_vertices, const std::vector<std::pair<int, int>> &edges) {
  const int unreachable = std::numeric_limits<int>::max();
  std::vector<std::vector<int>> dist(
      num_vertices, std::vector<int>(num_vertices, unreachable));
  for (const auto &edge : edges) dist[edge.first][edge.second] = 1;
  for (int v = 0; v < num_vertices; ++v) dist[v][v] = 0;

  for (int via = 0; via < num_vertices; ++via) {
    for (int src = 0; src < num_vertices; ++src) {
      for (int dst = 0; dst < num_vertices; ++dst) {
        const int head = dist[src][via];
        const int tail = dist[via][dst];
        // Only relax through `via` when both halves exist; this also keeps
        // the sum from overflowing.
        const bool connected = head != unreachable && tail != unreachable;
        if (connected && head + tail < dist[src][dst]) {
          dist[src][dst] = head + tail;
        }
      }
    }
  }

  // Translate the internal "infinity" marker into the public -1 convention.
  for (auto &row : dist) {
    for (auto &cell : row) {
      if (cell == unreachable) cell = -1;
    }
  }
  return dist;
}
// Test-only logical operator that, for every row pulled from its input,
// produces one output row per entry of `values`. For each output row, the
// i-th value of the entry is written into the frame slot of the i-th symbol in
// `modified_symbols`.
//
// When `input` is null, a `Once` operator is used as the input.
class Yield : public query::plan::LogicalOperator {
 public:
  Yield(const std::shared_ptr<query::plan::LogicalOperator> &input,
        const std::vector<query::Symbol> &modified_symbols,
        const std::vector<std::vector<query::TypedValue>> &values)
      : input_(input ? input : std::make_shared<query::plan::Once>()),
        modified_symbols_(modified_symbols),
        values_(values) {}

  std::unique_ptr<query::plan::Cursor> MakeCursor(
      database::GraphDbAccessor &dba) const override {
    return std::make_unique<YieldCursor>(this, input_->MakeCursor(dba));
  }

  std::vector<query::Symbol> ModifiedSymbols(
      const query::SymbolTable &) const override {
    return modified_symbols_;
  }

  bool HasSingleInput() const override { return true; }

  std::shared_ptr<query::plan::LogicalOperator> input() const override {
    return input_;
  }

  void set_input(std::shared_ptr<query::plan::LogicalOperator> input) override {
    input_ = input;
  }

  // Visiting is not supported for this helper operator; any attempt aborts.
  bool Accept(query::plan::HierarchicalLogicalOperatorVisitor &) override {
    LOG(FATAL) << "Please go away, visitor!";
  }

  std::shared_ptr<query::plan::LogicalOperator> input_;
  std::vector<query::Symbol> modified_symbols_;
  std::vector<std::vector<query::TypedValue>> values_;

  class YieldCursor : public query::plan::Cursor {
   public:
    // `pull_index_` starts at values_.size(), which means "the current input
    // row is exhausted", so the first Pull fetches an input row.
    YieldCursor(const Yield *self,
                std::unique_ptr<query::plan::Cursor> input_cursor)
        : self_(self),
          input_cursor_(std::move(input_cursor)),
          pull_index_(self_->values_.size()) {}

    bool Pull(query::Frame &frame, query::Context &context) override {
      // A loop instead of a single check: when `values_` is empty the index
      // is still "exhausted" after fetching an input row, so keep consuming
      // input until it runs out rather than indexing into an empty vector.
      while (pull_index_ == self_->values_.size()) {
        if (!input_cursor_->Pull(frame, context)) return false;
        pull_index_ = 0;
      }
      const auto &row = self_->values_[pull_index_];
      for (size_t i = 0; i < row.size(); ++i) {
        frame[self_->modified_symbols_[i]] = row[i];
      }
      pull_index_++;
      return true;
    }

    void Reset() override {
      input_cursor_->Reset();
      pull_index_ = self_->values_.size();
    }

    void Shutdown() override {}

   private:
    const Yield *self_;
    std::unique_ptr<query::plan::Cursor> input_cursor_;
    // Index of the next entry of values_ to emit for the current input row.
    size_t pull_index_;
  };
};
// Pulls `last_op` to exhaustion and returns one row per successful pull,
// containing the frame values of `output_symbols` in the given order.
std::vector<std::vector<query::TypedValue>> PullResults(
    query::plan::LogicalOperator *last_op, query::Context *context,
    std::vector<query::Symbol> output_symbols) {
  auto cursor = last_op->MakeCursor(context->db_accessor_);
  std::vector<std::vector<query::TypedValue>> rows;
  {
    // The frame only lives for the duration of the pulling.
    query::Frame frame(context->symbol_table_.max_position());
    while (cursor->Pull(frame, *context)) {
      std::vector<query::TypedValue> row;
      for (const auto &symbol : output_symbols) {
        row.push_back(frame[symbol]);
      }
      rows.push_back(std::move(row));
    }
  }
  return rows;
}
/* Various types of lambdas.
* NONE - No filter lambda used.
* USE_FRAME - Block a single edge or vertex. Tests if frame is sent over
* the network properly in distributed BFS.
* USE_FRAME_NULL - Block a single edge or vertex, but lambda returns null
* instead of false.
* USE_CTX - Block a vertex by checking if its ID is equal to a
* parameter. Tests if evaluation context is sent over the
* network properly in distributed BFS.
* ERROR - Lambda that evaluates to an integer instead of null or
* boolean. In distributed BFS, it will fail on a worker other
* than master, to test if errors are propagated correctly.
*/
enum class FilterLambdaType { NONE, USE_FRAME, USE_FRAME_NULL, USE_CTX, ERROR };
// Common interface for single-node and distributed Memgraph.
// BfsTest talks to the database under test only through these hooks.
class Database {
public:
// Returns an accessor to the database under test.
virtual std::unique_ptr<database::GraphDbAccessor> Access() = 0;
// Advances the command of the transaction identified by `tx_id`.
virtual void AdvanceCommand(tx::TransactionId tx_id) = 0;
// Builds the BFS expansion operator under test on top of `input`, expanding
// from `source_sym` to `sink_sym` and binding the traversed edges to
// `edge_sym`. `lower_bound` and `upper_bound` may be null.
virtual std::unique_ptr<query::plan::LogicalOperator> MakeBfsOperator(
query::Symbol source_sym, query::Symbol sink_sym, query::Symbol edge_sym,
query::EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
const std::shared_ptr<query::plan::LogicalOperator> &input,
bool existing_node, query::Expression *lower_bound,
query::Expression *upper_bound,
const query::plan::ExpansionLambda &filter_lambda) = 0;
// Creates the test graph: `vertex_locations[i]` is the worker for vertex i
// and `edges` holds (from, to, edge_type) triples. Returns the addresses of
// the created vertices and edges.
virtual std::pair<std::vector<storage::VertexAddress>,
std::vector<storage::EdgeAddress>>
BuildGraph(database::GraphDbAccessor *dba,
const std::vector<int> &vertex_locations,
const std::vector<std::tuple<int, int, std::string>> &edges) = 0;
virtual ~Database() {}
};
// Builds a Yield operator over `input_op` that binds `symbol` first to
// query::TypedValue::Null (to cover the optional match case) and then to each
// of the vertices given by their address.
std::unique_ptr<query::plan::LogicalOperator> YieldVertices(
    database::GraphDbAccessor *dba,
    std::vector<storage::VertexAddress> vertices, query::Symbol symbol,
    std::shared_ptr<query::plan::LogicalOperator> input_op) {
  using Row = std::vector<query::TypedValue>;
  std::vector<Row> rows;
  rows.push_back(Row{query::TypedValue::Null});
  for (const auto &address : vertices) {
    rows.push_back(Row{VertexAccessor(address, *dba)});
  }
  return std::make_unique<Yield>(input_op, std::vector<query::Symbol>{symbol},
                                 rows);
}
// Builds a Yield operator over `input_op` that binds `symbol` to every vertex
// (in the given order) and then to every edge, each given by its address.
std::unique_ptr<query::plan::LogicalOperator> YieldEntities(
    database::GraphDbAccessor *dba,
    std::vector<storage::VertexAddress> vertices,
    std::vector<storage::EdgeAddress> edges, query::Symbol symbol,
    std::shared_ptr<query::plan::LogicalOperator> input_op) {
  using Row = std::vector<query::TypedValue>;
  std::vector<Row> rows;
  for (const auto &vertex_address : vertices) {
    rows.push_back(Row{VertexAccessor(vertex_address, *dba)});
  }
  for (const auto &edge_address : edges) {
    rows.push_back(Row{EdgeAccessor(edge_address, *dba)});
  }
  return std::make_unique<Yield>(input_op, std::vector<query::Symbol>{symbol},
                                 rows);
}
// Returns the value stored on record `rec` under the property named `prop`.
template <class TRecord>
auto GetProp(const RecordAccessor<TRecord> &rec, std::string prop,
             database::GraphDbAccessor *dba) {
  const auto property = dba->Property(prop);
  return rec.PropsAt(property);
}
// Walks `path` edge by edge starting at `source` and asserts that
//  - every element is an edge incident to the vertex reached so far,
//  - the (from id, to id) pair of every step is present in `edges`,
//  - the walk ends at `sink`.
void CheckPath(database::GraphDbAccessor *dba, const VertexAccessor &source,
               const VertexAccessor &sink,
               const std::vector<query::TypedValue> &path,
               const std::vector<std::pair<int, int>> &edges) {
  VertexAccessor current = source;
  for (const auto &step : path) {
    ASSERT_TRUE(step.IsEdge());
    auto edge = step.ValueEdge();
    ASSERT_TRUE(edge.from() == current || edge.to() == current);
    // Step to whichever endpoint of the edge we are not standing on.
    auto neighbour = edge.from_is(current) ? edge.to() : edge.from();
    int from = GetProp(current, "id", dba).Value<int64_t>();
    int to = GetProp(neighbour, "id", dba).Value<int64_t>();
    const auto hop = std::make_pair(from, to);
    ASSERT_TRUE(utils::Contains(edges, hop));
    current = neighbour;
  }
  ASSERT_EQ(current, sink);
}
// Given a list of BFS results of form (from, to, path, blocked entity),
// checks that every path is valid with respect to `edges` and returns the
// distance matrix implied by the results.
//
// Entries for which no result row exists stay at -1; the diagonal is 0.
// `edges` is taken by const reference so the edge list is not copied on every
// call.
std::vector<std::vector<int>> CheckPathsAndExtractDistances(
    database::GraphDbAccessor *dba,
    const std::vector<std::pair<int, int>> &edges,
    const std::vector<std::vector<query::TypedValue>> &results) {
  std::vector<std::vector<int>> distances(kVertexCount,
                                          std::vector<int>(kVertexCount, -1));
  // kVertexCount is a signed int, so use a signed index to avoid a
  // signed/unsigned comparison.
  for (int i = 0; i < kVertexCount; ++i) distances[i][i] = 0;
  for (const auto &row : results) {
    const auto &source_vertex = row[0].ValueVertex();
    const auto &sink_vertex = row[1].ValueVertex();
    const auto &path = row[2].ValueList();
    auto source = GetProp(source_vertex, "id", dba).Value<int64_t>();
    auto sink = GetProp(sink_vertex, "id", dba).Value<int64_t>();
    // The distance is the number of edges on the returned path.
    distances[source][sink] = static_cast<int>(path.size());
    CheckPath(dba, source_vertex, sink_vertex, path, edges);
  }
  return distances;
}
// Runs one BFS scenario against `db` and verifies it against Floyd-Warshall.
//
// The graph described by kVertexLocations/kEdges is built, a BFS operator is
// constructed through `db->MakeBfsOperator` and pulled to exhaustion, and the
// results are grouped by blocked entity and compared with the expected
// distance matrix.
//
// lower_bound, upper_bound - expansion depth bounds; -1 means "no bound" (a
//                            null expression is passed to the operator).
// direction, edge_types    - expansion direction and allowed edge types (an
//                            empty list allows all types).
// known_sink               - when true, the sink symbol is bound by the input
//                            and passed as `existing_node`.
// filter_lambda_type       - which filter lambda to use, see FilterLambdaType.
void BfsTest(Database *db, int lower_bound, int upper_bound,
query::EdgeAtom::Direction direction,
std::vector<std::string> edge_types, bool known_sink,
FilterLambdaType filter_lambda_type) {
auto dba_ptr = db->Access();
auto &dba = *dba_ptr;
query::AstStorage storage;
query::Context context(*dba_ptr);
query::Symbol blocked_sym =
context.symbol_table_.CreateSymbol("blocked", true);
query::Symbol source_sym = context.symbol_table_.CreateSymbol("source", true);
query::Symbol sink_sym = context.symbol_table_.CreateSymbol("sink", true);
query::Symbol edges_sym = context.symbol_table_.CreateSymbol("edges", true);
query::Symbol inner_node_sym =
context.symbol_table_.CreateSymbol("inner_node", true);
query::Symbol inner_edge_sym =
context.symbol_table_.CreateSymbol("inner_edge", true);
query::Identifier *blocked = IDENT("blocked");
query::Identifier *inner_node = IDENT("inner_node");
query::Identifier *inner_edge = IDENT("inner_edge");
context.symbol_table_[*blocked] = blocked_sym;
context.symbol_table_[*inner_node] = inner_node_sym;
context.symbol_table_[*inner_edge] = inner_edge_sym;
std::vector<storage::VertexAddress> vertices;
std::vector<storage::EdgeAddress> edges;
std::tie(vertices, edges) =
db->BuildGraph(dba_ptr.get(), kVertexLocations, kEdges);
db->AdvanceCommand(dba_ptr->transaction_id());
std::shared_ptr<query::plan::LogicalOperator> input_op;
query::Expression *filter_expr;
// First build a filter lambda and an operator yielding blocked entities.
switch (filter_lambda_type) {
case FilterLambdaType::NONE:
// No filter lambda, nothing is ever blocked.
input_op = std::make_shared<Yield>(
nullptr, std::vector<query::Symbol>{blocked_sym},
std::vector<std::vector<query::TypedValue>>{
{query::TypedValue::Null}});
filter_expr = nullptr;
break;
case FilterLambdaType::USE_FRAME:
// We block each entity in the graph and run BFS.
input_op =
YieldEntities(dba_ptr.get(), vertices, edges, blocked_sym, nullptr);
filter_expr = AND(NEQ(inner_node, blocked), NEQ(inner_edge, blocked));
break;
case FilterLambdaType::USE_FRAME_NULL:
// We block each entity in the graph and run BFS; the lambda yields null
// (instead of false) for the blocked entity.
input_op =
YieldEntities(dba_ptr.get(), vertices, edges, blocked_sym, nullptr);
filter_expr = IF(AND(NEQ(inner_node, blocked), NEQ(inner_edge, blocked)),
LITERAL(true), LITERAL(PropertyValue::Null));
break;
case FilterLambdaType::USE_CTX:
// We only block vertex #5 and run BFS.
input_op = std::make_shared<Yield>(
nullptr, std::vector<query::Symbol>{blocked_sym},
std::vector<std::vector<query::TypedValue>>{
{VertexAccessor(vertices[5], *dba_ptr)}});
filter_expr = NEQ(PROPERTY_LOOKUP(inner_node, PROPERTY_PAIR("id")),
PARAMETER_LOOKUP(0));
context.evaluation_context_.parameters.Add(0, 5);
break;
case FilterLambdaType::ERROR:
// Evaluate to 42 for vertex #5, which kVertexLocations places on worker 2.
// input_op stays null here; Yield substitutes a Once input for a null one.
filter_expr =
IF(EQ(PROPERTY_LOOKUP(inner_node, PROPERTY_PAIR("id")), LITERAL(5)),
LITERAL(42), LITERAL(true));
}
// We run BFS once from each vertex for each blocked entity.
input_op = YieldVertices(dba_ptr.get(), vertices, source_sym, input_op);
// If the sink is known, we run BFS for all possible combinations of source,
// sink and blocked entity.
if (known_sink) {
input_op = YieldVertices(dba_ptr.get(), vertices, sink_sym, input_op);
}
std::vector<storage::EdgeType> storage_edge_types;
for (const auto &t : edge_types) {
storage_edge_types.push_back(dba_ptr->EdgeType(t));
}
// A bound of -1 is translated into a null expression.
input_op = db->MakeBfsOperator(
source_sym, sink_sym, edges_sym, direction, storage_edge_types, input_op,
known_sink, lower_bound == -1 ? nullptr : LITERAL(lower_bound),
upper_bound == -1 ? nullptr : LITERAL(upper_bound),
query::plan::ExpansionLambda{inner_edge_sym, inner_node_sym,
filter_expr});
std::vector<std::vector<query::TypedValue>> results;
// An exception should be thrown on one of the pulls.
if (filter_lambda_type == FilterLambdaType::ERROR) {
EXPECT_THROW(PullResults(input_op.get(), &context,
std::vector<query::Symbol>{
source_sym, sink_sym, edges_sym, blocked_sym}),
query::QueryRuntimeException);
return;
}
results = PullResults(
input_op.get(), &context,
std::vector<query::Symbol>{source_sym, sink_sym, edges_sym, blocked_sym});
// Group results based on blocked entity and compare them to results
// obtained by running Floyd-Warshall.
for (size_t i = 0; i < results.size();) {
// [i, j) is the run of consecutive rows sharing the same blocked entity.
int j = i;
auto blocked = results[j][3];
while (j < results.size() &&
query::TypedValue::BoolEqual{}(results[j][3], blocked))
++j;
SCOPED_TRACE(fmt::format("blocked entity = {}", blocked));
// When an edge is blocked, it is blocked in both directions so we remove
// it before modifying edge list to account for direction and edge types.
auto edges = kEdges;
if (blocked.IsEdge()) {
int from =
GetProp(blocked.ValueEdge(), "from", dba_ptr.get()).Value<int64_t>();
int to =
GetProp(blocked.ValueEdge(), "to", dba_ptr.get()).Value<int64_t>();
edges.erase(std::remove_if(edges.begin(), edges.end(),
[from, to](const auto &e) {
return std::get<0>(e) == from &&
std::get<1>(e) == to;
}),
edges.end());
}
// Now add edges in opposite direction if necessary.
auto edges_blocked = GetEdgeList(edges, direction, edge_types);
// When a vertex is blocked, we remove all edges that lead into it.
if (blocked.IsVertex()) {
int id =
GetProp(blocked.ValueVertex(), "id", dba_ptr.get()).Value<int64_t>();
edges_blocked.erase(
std::remove_if(edges_blocked.begin(), edges_blocked.end(),
[id](const auto &e) { return e.second == id; }),
edges_blocked.end());
}
auto correct_with_bounds = FloydWarshall(kVertexCount, edges_blocked);
// From here on the -1 "no bound" markers are replaced by concrete bounds.
if (lower_bound == -1) lower_bound = 0;
if (upper_bound == -1) upper_bound = kVertexCount;
// Remove paths whose length doesn't satisfy given length bounds.
for (int a = 0; a < kVertexCount; ++a) {
for (int b = 0; b < kVertexCount; ++b) {
if (a != b && (correct_with_bounds[a][b] < lower_bound ||
correct_with_bounds[a][b] > upper_bound))
correct_with_bounds[a][b] = -1;
}
}
int num_results = 0;
for (int a = 0; a < kVertexCount; ++a)
for (int b = 0; b < kVertexCount; ++b)
if (a != b && correct_with_bounds[a][b] != -1) {
++num_results;
}
// There should be exactly 1 successful pull for each existing path.
EXPECT_EQ(j - i, num_results);
auto distances = CheckPathsAndExtractDistances(
dba_ptr.get(), edges_blocked,
std::vector<std::vector<query::TypedValue>>(results.begin() + i,
results.begin() + j));
// The distances should also match.
EXPECT_EQ(distances, correct_with_bounds);
i = j;
}
dba_ptr->Abort();
}

View File

@ -0,0 +1,133 @@
// Macros from query_common.hpp break enum declaration in distributed_ops.hpp
// (because of the SHOW_STREAMS macro), so we must be careful with the order of
// includes.
#include "query/plan/distributed_ops.hpp"
#include "bfs_common.hpp"
#include "distributed_common.hpp"
using namespace query;
using namespace query::plan;
// Database implementation backed by a distributed test cluster. BFS is
// performed with the DistributedExpandBfs operator.
class DistributedDb : public Database {
 public:
  DistributedDb() : cluster_(3, "DistributedBfsTest") {}

  // Accessors are always created on the master.
  std::unique_ptr<database::GraphDbAccessor> Access() override {
    return cluster_.master()->Access();
  }

  void AdvanceCommand(tx::TransactionId tx_id) override {
    cluster_.AdvanceCommand(tx_id);
  }

  std::unique_ptr<LogicalOperator> MakeBfsOperator(
      Symbol source_sym, Symbol sink_sym, Symbol edge_sym,
      EdgeAtom::Direction direction,
      const std::vector<storage::EdgeType> &edge_types,
      const std::shared_ptr<LogicalOperator> &input, bool existing_node,
      Expression *lower_bound, Expression *upper_bound,
      const ExpansionLambda &filter_lambda) override {
    return std::make_unique<DistributedExpandBfs>(
        sink_sym, edge_sym, direction, edge_types, input, source_sym,
        existing_node, GraphView::OLD, lower_bound, upper_bound, filter_lambda);
  }

  // Creates vertex i on worker `vertex_locations[i]` (location 0 is inserted
  // through the accessor directly, any other location through
  // InsertVertexIntoRemote) with an "id" property, then creates all edges
  // with "from"/"to" properties. Returns the global addresses of everything
  // created.
  std::pair<std::vector<storage::VertexAddress>,
            std::vector<storage::EdgeAddress>>
  BuildGraph(
      database::GraphDbAccessor *dba, const std::vector<int> &vertex_locations,
      const std::vector<std::tuple<int, int, std::string>> &edges) override {
    std::vector<storage::VertexAddress> vertex_addr;
    std::vector<storage::EdgeAddress> edge_addr;

    for (size_t id = 0; id < vertex_locations.size(); ++id) {
      if (vertex_locations[id] != 0) {
        auto remote_vertex = database::InsertVertexIntoRemote(
            dba, vertex_locations[id], {},
            {{dba->Property("id"), static_cast<int64_t>(id)}},
            std::experimental::nullopt);
        vertex_addr.push_back(remote_vertex.GlobalAddress());
        continue;
      }
      auto local_vertex = dba->InsertVertex();
      local_vertex.PropsSet(dba->Property("id"), static_cast<int64_t>(id));
      vertex_addr.push_back(local_vertex.GlobalAddress());
    }

    for (const auto &edge_spec : edges) {
      int u = std::get<0>(edge_spec);
      int v = std::get<1>(edge_spec);
      const std::string &type = std::get<2>(edge_spec);
      VertexAccessor from(vertex_addr[u], *dba);
      VertexAccessor to(vertex_addr[v], *dba);
      auto edge = dba->InsertEdge(from, to, dba->EdgeType(type));
      edge.PropsSet(dba->Property("from"), u);
      edge.PropsSet(dba->Property("to"), v);
      edge_addr.push_back(edge.GlobalAddress());
    }

    return std::make_pair(vertex_addr, edge_addr);
  }

 private:
  Cluster cluster_;
};
// Value-parameterized fixture. The parameter tuple is
// (lower bound, upper bound, direction, edge types, known sink, filter lambda
// type). A single DistributedDb is shared by all tests of the test case.
class DistributedBfsTest
    : public ::testing::TestWithParam<
          std::tuple<int, int, EdgeAtom::Direction, std::vector<std::string>,
                     bool, FilterLambdaType>> {
 public:
  static void SetUpTestCase() { db_ = std::make_unique<DistributedDb>(); }
  static void TearDownTestCase() { db_.reset(); }

 protected:
  static std::unique_ptr<DistributedDb> db_;
};

std::unique_ptr<DistributedDb> DistributedBfsTest::db_;
// Unpacks the parameter tuple and runs the shared BFS scenario against the
// distributed database.
TEST_P(DistributedBfsTest, All) {
  const auto &param = GetParam();
  const int lower_bound = std::get<0>(param);
  const int upper_bound = std::get<1>(param);
  const EdgeAtom::Direction direction = std::get<2>(param);
  const std::vector<std::string> &edge_types = std::get<3>(param);
  const bool known_sink = std::get<4>(param);
  const FilterLambdaType filter_lambda_type = std::get<5>(param);
  BfsTest(db_.get(), lower_bound, upper_bound, direction, edge_types,
          known_sink, filter_lambda_type);
}
// Parameter tuple order: (lower_bound, upper_bound, direction, edge_types,
// known_sink, filter_lambda_type).
//
// DirectionAndExpansionDepth: every combination of lower and upper bound in
// Range(-1, kVertexCount) for all three directions, with an empty edge type
// list, both known_sink values and no filter lambda.
// NOTE(review): -1 presumably means "no bound"; confirm in BfsTest.
INSTANTIATE_TEST_CASE_P(
DirectionAndExpansionDepth, DistributedBfsTest,
testing::Combine(testing::Range(-1, kVertexCount),
testing::Range(-1, kVertexCount),
testing::Values(EdgeAtom::Direction::OUT,
EdgeAtom::Direction::IN,
EdgeAtom::Direction::BOTH),
testing::Values(std::vector<std::string>{}),
testing::Bool(), testing::Values(FilterLambdaType::NONE)));
// EdgeType: both bounds fixed to -1; varies the edge type list over {}, {"a"},
// {"b"} and {"a", "b"} for all directions and both known_sink values.
INSTANTIATE_TEST_CASE_P(
EdgeType, DistributedBfsTest,
testing::Combine(testing::Values(-1), testing::Values(-1),
testing::Values(EdgeAtom::Direction::OUT,
EdgeAtom::Direction::IN,
EdgeAtom::Direction::BOTH),
testing::Values(std::vector<std::string>{},
std::vector<std::string>{"a"},
std::vector<std::string>{"b"},
std::vector<std::string>{"a", "b"}),
testing::Bool(), testing::Values(FilterLambdaType::NONE)));
// FilterLambda: both bounds fixed to -1 and an empty edge type list; varies the
// filter lambda type over NONE, USE_FRAME, USE_CTX and ERROR.
INSTANTIATE_TEST_CASE_P(
FilterLambda, DistributedBfsTest,
testing::Combine(
testing::Values(-1), testing::Values(-1),
testing::Values(EdgeAtom::Direction::OUT, EdgeAtom::Direction::IN,
EdgeAtom::Direction::BOTH),
testing::Values(std::vector<std::string>{}), testing::Bool(),
testing::Values(FilterLambdaType::NONE, FilterLambdaType::USE_FRAME,
FilterLambdaType::USE_CTX, FilterLambdaType::ERROR)));

View File

@ -0,0 +1,124 @@
#include "bfs_common.hpp"
using namespace query;
using namespace query::plan;
/// Single-node implementation of the `Database` test interface, backed by a
/// `database::SingleNode` instance.
class SingleNodeDb : public Database {
 public:
  SingleNodeDb() : db_() {}

  /// Opens a new accessor on the underlying database.
  std::unique_ptr<database::GraphDbAccessor> Access() override {
    return db_.Access();
  }

  /// Advances the command of transaction `tx_id` through an accessor bound to
  /// that transaction.
  void AdvanceCommand(tx::TransactionId tx_id) override {
    auto dba = db_.Access(tx_id);
    dba->AdvanceCommand();
  }

  /// Creates the operator under test: an `ExpandVariable` of type
  /// BREADTH_FIRST using `GraphView::OLD` and no weight lambda or total
  /// weight symbol.
  std::unique_ptr<LogicalOperator> MakeBfsOperator(
      Symbol source_sym, Symbol sink_sym, Symbol edge_sym,
      EdgeAtom::Direction direction,
      const std::vector<storage::EdgeType> &edge_types,
      const std::shared_ptr<LogicalOperator> &input, bool existing_node,
      Expression *lower_bound, Expression *upper_bound,
      const ExpansionLambda &filter_lambda) override {
    return std::make_unique<ExpandVariable>(
        sink_sym, edge_sym, EdgeAtom::Type::BREADTH_FIRST, direction,
        edge_types, false, lower_bound, upper_bound, input, source_sym,
        existing_node, filter_lambda, std::experimental::nullopt,
        std::experimental::nullopt, GraphView::OLD);
  }

  /// Builds the test graph in the single-node database.
  ///
  /// One vertex is inserted per entry of `vertex_locations`; only the size of
  /// that vector is used here. Every vertex gets an "id" property equal to
  /// its index. Each (from, to, type) triple in `edges` becomes an edge of
  /// that type carrying "from" and "to" properties.
  ///
  /// @return global addresses of the created vertices and edges, in input
  ///         order.
  std::pair<std::vector<storage::VertexAddress>,
            std::vector<storage::EdgeAddress>>
  BuildGraph(
      database::GraphDbAccessor *dba, const std::vector<int> &vertex_locations,
      const std::vector<std::tuple<int, int, std::string>> &edges) override {
    std::vector<storage::VertexAddress> vertex_addr;
    vertex_addr.reserve(vertex_locations.size());
    std::vector<storage::EdgeAddress> edge_addr;
    edge_addr.reserve(edges.size());

    for (size_t id = 0; id < vertex_locations.size(); ++id) {
      auto vertex = dba->InsertVertex();
      vertex.PropsSet(dba->Property("id"), static_cast<int64_t>(id));
      vertex_addr.push_back(vertex.GlobalAddress());
    }

    // Iterate by const reference: each element holds a std::string, so
    // copying the tuple (and then the string again) per edge is wasted work.
    for (const auto &e : edges) {
      int u = std::get<0>(e);
      int v = std::get<1>(e);
      const std::string &type = std::get<2>(e);
      VertexAccessor from(vertex_addr[u], *dba);
      VertexAccessor to(vertex_addr[v], *dba);
      auto edge = dba->InsertEdge(from, to, dba->EdgeType(type));
      edge.PropsSet(dba->Property("from"), u);
      edge.PropsSet(dba->Property("to"), v);
      edge_addr.push_back(edge.GlobalAddress());
    }
    return std::make_pair(vertex_addr, edge_addr);
  }

 protected:
  database::SingleNode db_;
};
/// Value-parameterized fixture for BFS tests on a single-node database.
///
/// The parameter tuple is (lower bound, upper bound, expansion direction,
/// edge type names, known-sink flag, filter lambda type). A single
/// `SingleNodeDb` is created once per test case and shared by all tests.
class SingleNodeBfsTest
    : public ::testing::TestWithParam<
          std::tuple<int, int, EdgeAtom::Direction, std::vector<std::string>,
                     bool, FilterLambdaType>> {
 public:
  /// Creates the shared database before the first test of the case runs.
  static void SetUpTestCase() { db_ = std::make_unique<SingleNodeDb>(); }

  /// Destroys the shared database after the last test of the case.
  static void TearDownTestCase() { db_.reset(); }

 protected:
  static std::unique_ptr<SingleNodeDb> db_;
};
// Runs the shared BfsTest routine against the single-node database with the
// current parameter tuple.
TEST_P(SingleNodeBfsTest, All) {
  // Take a mutable copy so that the individual fields are passed on as
  // non-const lvalues.
  auto params = GetParam();
  auto &lower_bound = std::get<0>(params);
  auto &upper_bound = std::get<1>(params);
  auto &direction = std::get<2>(params);
  auto &edge_types = std::get<3>(params);
  auto &known_sink = std::get<4>(params);
  auto &filter_lambda_type = std::get<5>(params);

  BfsTest(db_.get(), lower_bound, upper_bound, direction, edge_types,
          known_sink, filter_lambda_type);
}
// Definition of the fixture's shared database pointer; starts out null.
std::unique_ptr<SingleNodeDb> SingleNodeBfsTest::db_{nullptr};
// Parameter tuple order: (lower_bound, upper_bound, direction, edge_types,
// known_sink, filter_lambda_type).
//
// DirectionAndExpansionDepth: every combination of lower and upper bound in
// Range(-1, kVertexCount) for all three directions, with an empty edge type
// list, both known_sink values and no filter lambda.
// NOTE(review): -1 presumably means "no bound"; confirm in BfsTest.
INSTANTIATE_TEST_CASE_P(
DirectionAndExpansionDepth, SingleNodeBfsTest,
testing::Combine(testing::Range(-1, kVertexCount),
testing::Range(-1, kVertexCount),
testing::Values(EdgeAtom::Direction::OUT,
EdgeAtom::Direction::IN,
EdgeAtom::Direction::BOTH),
testing::Values(std::vector<std::string>{}),
testing::Bool(), testing::Values(FilterLambdaType::NONE)));
// EdgeType: both bounds fixed to -1; varies the edge type list over {}, {"a"},
// {"b"} and {"a", "b"} for all directions and both known_sink values.
INSTANTIATE_TEST_CASE_P(
EdgeType, SingleNodeBfsTest,
testing::Combine(testing::Values(-1), testing::Values(-1),
testing::Values(EdgeAtom::Direction::OUT,
EdgeAtom::Direction::IN,
EdgeAtom::Direction::BOTH),
testing::Values(std::vector<std::string>{},
std::vector<std::string>{"a"},
std::vector<std::string>{"b"},
std::vector<std::string>{"a", "b"}),
testing::Bool(), testing::Values(FilterLambdaType::NONE)));
// FilterLambda: both bounds fixed to -1 and an empty edge type list; varies the
// filter lambda type over NONE, USE_FRAME, USE_FRAME_NULL, USE_CTX and ERROR.
INSTANTIATE_TEST_CASE_P(
FilterLambda, SingleNodeBfsTest,
testing::Combine(
testing::Values(-1), testing::Values(-1),
testing::Values(EdgeAtom::Direction::OUT, EdgeAtom::Direction::IN,
EdgeAtom::Direction::BOTH),
testing::Values(std::vector<std::string>{}), testing::Bool(),
testing::Values(FilterLambdaType::NONE, FilterLambdaType::USE_FRAME,
FilterLambdaType::USE_FRAME_NULL,
FilterLambdaType::USE_CTX, FilterLambdaType::ERROR)));

View File

@ -1,114 +0,0 @@
#include "gtest/gtest.h"
#include "database/graph_db_accessor.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed_common.hpp"
using namespace database;
// Test graph description. V[i] is the location of vertex i: 0 inserts it on
// the master, any other value v inserts it on worker(v).
std::vector<int> V = {0, 1, 1, 0, 1, 2};
// Edges as (from, to) pairs of indices into V.
std::vector<std::pair<int, int>> E = {{0, 1}, {1, 2}, {1, 5},
{2, 4}, {2, 5}, {3, 4}};
// Fixture that builds the graph described by V and E on top of
// DistributedGraphDbTest before each test.
class BfsTest : public DistributedGraphDbTest {
protected:
void SetUp() override {
DistributedGraphDbTest::SetUp();
// Insert each vertex on the master (location 0) or on the given worker and
// remember its address.
for (int v : V) {
auto vertex = v == 0 ? InsertVertex(master()) : InsertVertex(worker(v));
vertices.emplace_back(vertex);
}
// Insert every edge with type "Edge", keyed by its (from, to) index pair.
for (auto e : E) {
edges[e] = InsertEdge(vertices[e.first], vertices[e.second], "Edge");
}
}
public:
BfsTest() : DistributedGraphDbTest("bfs") {}
// Addresses of inserted vertices, indexed like V.
std::vector<storage::VertexAddress> vertices;
// Addresses of inserted edges, keyed by the (from, to) pair from E.
std::map<std::pair<int, int>, storage::EdgeAddress> edges;
};
// Drives the BFS subcursor RPC clients directly from the master: expands
// level by level from vertex 0, checks which vertices each worker yields,
// then checks path reconstruction and re-use of the subcursors after a reset.
TEST_F(BfsTest, Expansion) {
auto dba = master().Access();
auto &clients = master().bfs_subcursor_clients();
// Create subcursors expanding over "Edge" edges in both directions.
auto subcursor_ids = clients.CreateBfsSubcursors(
dba->transaction_id(), query::EdgeAtom::Direction::BOTH,
{dba->EdgeType("Edge")}, query::GraphView::OLD);
clients.RegisterSubcursors(subcursor_ids);
clients.SetSource(subcursor_ids, vertices[0]);
// Pulls the next result from the subcursor of the given worker.
auto pull = [&clients, &subcursor_ids, &dba](int worker_id) {
return clients.Pull(worker_id, subcursor_ids[worker_id], dba.get());
};
// Expected after setting the source: only vertex 1, from worker 1.
EXPECT_EQ(pull(0), std::experimental::nullopt);
EXPECT_EQ(pull(1)->GlobalAddress(), vertices[1]);
EXPECT_EQ(pull(2), std::experimental::nullopt);
clients.PrepareForExpand(subcursor_ids, false);
clients.ExpandLevel(subcursor_ids);
// Expected next: vertex 2 from worker 1 and vertex 5 from worker 2.
EXPECT_EQ(pull(0), std::experimental::nullopt);
EXPECT_EQ(pull(1)->GlobalAddress(), vertices[2]);
EXPECT_EQ(pull(1), std::experimental::nullopt);
EXPECT_EQ(pull(2)->GlobalAddress(), vertices[5]);
EXPECT_EQ(pull(2), std::experimental::nullopt);
clients.PrepareForExpand(subcursor_ids, false);
clients.ExpandLevel(subcursor_ids);
// Expected next: vertex 4 from worker 1.
EXPECT_EQ(pull(0), std::experimental::nullopt);
EXPECT_EQ(pull(1)->GlobalAddress(), vertices[4]);
EXPECT_EQ(pull(1), std::experimental::nullopt);
EXPECT_EQ(pull(2), std::experimental::nullopt);
clients.PrepareForExpand(subcursor_ids, false);
clients.ExpandLevel(subcursor_ids);
// Expected next: vertex 3 from the master (worker id 0).
EXPECT_EQ(pull(0)->GlobalAddress(), vertices[3]);
EXPECT_EQ(pull(0), std::experimental::nullopt);
EXPECT_EQ(pull(1), std::experimental::nullopt);
EXPECT_EQ(pull(2), std::experimental::nullopt);
// Checks that the returned edge accessors match the expected (from, to)
// pairs, in order.
auto compare = [this](const std::vector<EdgeAccessor> &lhs,
const std::vector<std::pair<int, int>> &rhs) {
EXPECT_EQ(lhs.size(), rhs.size());
if (lhs.size() != rhs.size()) return;
for (auto idx = 0u; idx < lhs.size(); ++idx) {
EXPECT_EQ(lhs[idx].GlobalAddress(), edges[rhs[idx]]);
}
};
// Reconstruct the path back from vertex 3 segment by segment; each segment
// reports the edges it covers and where the reconstruction continues.
distributed::PathSegment ps;
ps = clients.ReconstructPath(subcursor_ids, vertices[3], dba.get());
ASSERT_EQ(ps.next_vertex, vertices[4]);
ASSERT_EQ(ps.next_edge, std::experimental::nullopt);
compare(ps.edges, {{3, 4}});
ps = clients.ReconstructPath(subcursor_ids, vertices[4], dba.get());
EXPECT_EQ(ps.next_vertex, std::experimental::nullopt);
EXPECT_EQ(ps.next_edge, (edges[{0, 1}]));
compare(ps.edges, {{2, 4}, {1, 2}});
ps = clients.ReconstructPath(subcursor_ids, edges[{0, 1}], dba.get());
EXPECT_EQ(ps.next_vertex, std::experimental::nullopt);
EXPECT_EQ(ps.next_edge, std::experimental::nullopt);
compare(ps.edges, {{0, 1}});
// Call PrepareForExpand with `true` (presumably a full reset; confirm in
// BfsRpcClients) and start again from vertex 3.
clients.PrepareForExpand(subcursor_ids, true);
clients.SetSource(subcursor_ids, vertices[3]);
EXPECT_EQ(pull(0), std::experimental::nullopt);
EXPECT_EQ(pull(1)->GlobalAddress(), vertices[4]);
EXPECT_EQ(pull(1), std::experimental::nullopt);
EXPECT_EQ(pull(2), std::experimental::nullopt);
clients.RemoveBfsSubcursors(subcursor_ids);
}

View File

@ -483,6 +483,8 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match,
std::make_pair(property_name, dba.Property(property_name))
#define PROPERTY_LOOKUP(...) \
query::test_common::GetPropertyLookup(storage, dba, __VA_ARGS__)
// Creates a query::ParameterLookup AST node for the given token position.
// Requires a variable named `storage` in the calling scope.
#define PARAMETER_LOOKUP(token_position) \
storage.Create<query::ParameterLookup>((token_position))
#define NEXPR(name, expr) storage.Create<query::NamedExpression>((name), (expr))
// AS is alternative to NEXPR which does not initialize NamedExpression with
// Expression. It should be used with RETURN or WITH. For example:
@ -554,6 +556,8 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match,
#define OR(expr1, expr2) storage.Create<query::OrOperator>((expr1), (expr2))
#define IN_LIST(expr1, expr2) \
storage.Create<query::InListOperator>((expr1), (expr2))
// Creates a query::IfOperator AST node from a condition and its two branch
// expressions. Requires a variable named `storage` in the calling scope.
#define IF(cond, then, else) \
storage.Create<query::IfOperator>((cond), (then), (else))
// Function call
#define FN(function_name, ...) \
storage.Create<query::Function>( \

View File

@ -174,7 +174,7 @@ TEST_F(QueryCostEstimator, ExpandVariable) {
NextSymbol(), NextSymbol(), EdgeAtom::Type::DEPTH_FIRST,
EdgeAtom::Direction::IN, std::vector<storage::EdgeType>{}, false, nullptr,
nullptr, last_op_, NextSymbol(), false,
ExpandVariable::Lambda{NextSymbol(), NextSymbol(), nullptr},
ExpansionLambda{NextSymbol(), NextSymbol(), nullptr},
std::experimental::nullopt, std::experimental::nullopt, GraphView::OLD);
EXPECT_COST(CardParam::kExpandVariable * CostParam::kExpandVariable);
}

View File

@ -569,9 +569,9 @@ class QueryPlanExpandVariable : public testing::Test {
n_to_sym, edge_sym, EdgeAtom::Type::DEPTH_FIRST, direction,
edge_types, is_reverse, convert(lower), convert(upper), filter_op,
n_from.sym_, false,
ExpandVariable::Lambda{symbol_table.CreateSymbol("inner_edge", false),
symbol_table.CreateSymbol("inner_node", false),
nullptr},
ExpansionLambda{symbol_table.CreateSymbol("inner_edge", false),
symbol_table.CreateSymbol("inner_node", false),
nullptr},
std::experimental::nullopt, std::experimental::nullopt, graph_view);
} else
return std::make_shared<Expand>(n_to_sym, edge_sym, direction, edge_types,
@ -832,642 +832,6 @@ struct hash<std::pair<int, int>> {
};
} // namespace std
/// Computes all-pairs shortest path lengths (in number of edges) for a graph
/// of `num_vertices` vertices with the given directed `edges`.
///
/// `dir` selects how edges may be traversed: OUT follows them as given, IN
/// follows them reversed, and any other value (BOTH) allows either way.
///
/// @return a matrix where entry [i][j] is the distance from i to j, 0 on the
///         diagonal and -1 when j is unreachable from i.
std::vector<std::vector<int>> FloydWarshall(
    int num_vertices, const std::vector<std::pair<int, int>> &edges,
    EdgeAtom::Direction dir) {
  const int unreachable = std::numeric_limits<int>::max();
  const bool use_forward = dir != EdgeAtom::Direction::IN;
  const bool use_backward = dir != EdgeAtom::Direction::OUT;

  // True when a single step from `from` to `to` is allowed under `dir`.
  auto connected = [&](int from, int to) -> bool {
    bool forward =
        use_forward && utils::Contains(edges, std::make_pair(from, to));
    bool backward =
        use_backward && utils::Contains(edges, std::make_pair(to, from));
    return forward || backward;
  };

  std::vector<std::vector<int>> dist(
      num_vertices, std::vector<int>(num_vertices, unreachable));

  // Base case: zero on the diagonal, one for directly connected pairs.
  for (int from = 0; from < num_vertices; ++from) {
    for (int to = 0; to < num_vertices; ++to) {
      if (from == to) {
        dist[from][to] = 0;
      } else if (connected(from, to)) {
        dist[from][to] = 1;
      }
    }
  }

  // Relax every pair through each intermediate vertex in turn. Unreachable
  // legs are skipped so that the sentinel is never added to.
  for (int mid = 0; mid < num_vertices; ++mid) {
    for (int from = 0; from < num_vertices; ++from) {
      for (int to = 0; to < num_vertices; ++to) {
        const bool both_legs_known =
            dist[from][mid] != unreachable && dist[mid][to] != unreachable;
        if (both_legs_known) {
          dist[from][to] =
              std::min(dist[from][to], dist[from][mid] + dist[mid][to]);
        }
      }
    }
  }

  // Report unreachable pairs as -1 instead of the internal sentinel.
  for (auto &row : dist) {
    for (auto &entry : row) {
      if (entry == unreachable) entry = -1;
    }
  }
  return dist;
}
/// Fixture for source-to-sink shortest path tests on a single-node database.
///
/// SetUp builds a graph of NUM_VERTICES vertices (each with an integer "id"
/// property) connected by EDGES (each of type "Edge" with a string "id"
/// property of the form "<from>-<to>").
class STShortestPathTest : public ::testing::Test {
 protected:
  STShortestPathTest() : db(), dba_ptr(db.Access()), dba(*dba_ptr) {}

  void SetUp() {
    for (int i = 0; i < NUM_VERTICES; ++i) {
      vertices.emplace_back(dba.InsertVertex());
      vertices[i].PropsSet(dba.Property("id"), i);
    }
    for (const auto &edge : EDGES) {
      edges.emplace_back(dba.InsertEdge(
          vertices[edge.first], vertices[edge.second], dba.EdgeType("Edge")));
      edges.back().PropsSet(dba.Property("id"),
                            fmt::format("{}-{}", edge.first, edge.second));
    }
    dba.AdvanceCommand();
    ASSERT_EQ(dba.VerticesCount(), NUM_VERTICES);
    ASSERT_EQ(dba.EdgesCount(), EDGES.size());
  }

  /// Runs a breadth-first ExpandVariable with an existing sink node on top of
  /// `input_cursor` and returns the produced rows.
  ///
  /// Each row is (source vertex, sink vertex, list of edges). When no
  /// `expand_lambda` is given, a lambda with fresh inner symbols and no
  /// filter expression is used.
  std::vector<std::vector<TypedValue>> ShortestPaths(
      std::shared_ptr<query::plan::LogicalOperator> input_cursor,
      Symbol source_symbol, Symbol sink_symbol, EdgeAtom::Direction dir,
      Expression *lower_bound = nullptr, Expression *upper_bound = nullptr,
      std::experimental::optional<ExpandVariable::Lambda> expand_lambda =
          std::experimental::nullopt) {
    if (!expand_lambda) {
      expand_lambda = ExpandVariable::Lambda{
          symbol_table.CreateSymbol("inner_edge", true),
          symbol_table.CreateSymbol("inner_node", true), nullptr};
    }
    auto edges_symbol = symbol_table.CreateSymbol("edges_symbol", true);
    auto expand_variable = std::make_shared<ExpandVariable>(
        sink_symbol, edges_symbol, EdgeAtom::Type::BREADTH_FIRST, dir,
        std::vector<storage::EdgeType>{dba.EdgeType("Edge")}, false,
        lower_bound, upper_bound, input_cursor, source_symbol, true,
        *expand_lambda, std::experimental::nullopt, std::experimental::nullopt,
        GraphView::OLD);

    auto source_output_symbol =
        symbol_table.CreateSymbol("s", true, Symbol::Type::Vertex);
    auto sink_output_symbol =
        symbol_table.CreateSymbol("t", true, Symbol::Type::Vertex);
    auto edges_output_symbol =
        symbol_table.CreateSymbol("edge", true, Symbol::Type::EdgeList);

    auto source_id = IDENT("s");
    auto sink_id = IDENT("t");
    auto edges_id = IDENT("e");
    symbol_table[*source_id] = source_symbol;
    symbol_table[*sink_id] = sink_symbol;
    symbol_table[*edges_id] = edges_symbol;

    auto source_ne = NEXPR("s", source_id);
    // The sink column is named "t" to match its identifier and output symbol
    // (it was previously a copy-pasted "s").
    auto sink_ne = NEXPR("t", sink_id);
    auto edges_ne = NEXPR("e", edges_id);
    symbol_table[*source_ne] = source_output_symbol;
    symbol_table[*sink_ne] = sink_output_symbol;
    symbol_table[*edges_ne] = edges_output_symbol;

    auto produce = MakeProduce(expand_variable, source_ne, sink_ne, edges_ne);
    return CollectProduce(produce.get(), symbol_table, dba);
  }

  void CheckPath(const VertexAccessor &source, const VertexAccessor &sink,
                 EdgeAtom::Direction dir, const std::vector<TypedValue> &path) {
    // Check that the given path is actually a path from source to sink, that
    // expansion direction is correct and that given edges actually exist in the
    // test graph
    VertexAccessor curr = source;
    for (const auto &edge_tv : path) {
      EXPECT_TRUE(edge_tv.IsEdge());
      auto edge = edge_tv.ValueEdge();
      EXPECT_TRUE(edge.from() == curr || edge.to() == curr);
      EXPECT_TRUE(curr == edge.from() || dir != EdgeAtom::Direction::OUT);
      EXPECT_TRUE(curr == edge.to() || dir != EdgeAtom::Direction::IN);
      int from = edge.from().PropsAt(dba.Property("id")).Value<int64_t>();
      int to = edge.to().PropsAt(dba.Property("id")).Value<int64_t>();
      EXPECT_TRUE(utils::Contains(EDGES, std::make_pair(from, to)));
      // Step to the other endpoint of the edge.
      curr = curr == edge.from() ? edge.to() : edge.from();
    }
    EXPECT_EQ(curr, sink);
  }

  database::SingleNode db;
  std::unique_ptr<database::GraphDbAccessor> dba_ptr;
  database::GraphDbAccessor &dba;
  std::vector<VertexAccessor> vertices;
  std::vector<EdgeAccessor> edges;
  AstStorage storage;
  SymbolTable symbol_table;

  const int NUM_VERTICES = 6;
  // Directed edges of the test graph as (from, to) vertex ids.
  const std::vector<std::pair<int, int>> EDGES = {
      {0, 1}, {1, 2}, {2, 4}, {2, 5}, {4, 1}, {4, 5}, {5, 4}, {5, 5}, {5, 3}};
};
// For every combination of lower bound, upper bound and direction, runs the
// shortest path expansion between all (source, sink) pairs and checks the
// results against distances computed by FloydWarshall.
TEST_F(STShortestPathTest, DirectionAndExpansionDepth) {
auto lower_bounds = iter::range(-1, NUM_VERTICES + 1);
auto upper_bounds = iter::range(-1, NUM_VERTICES + 1);
auto directions = std::vector<EdgeAtom::Direction>{EdgeAtom::Direction::IN,
EdgeAtom::Direction::OUT,
EdgeAtom::Direction::BOTH};
for (const auto &test :
iter::product(lower_bounds, upper_bounds, directions)) {
int lower_bound;
int upper_bound;
EdgeAtom::Direction dir;
std::tie(lower_bound, upper_bound, dir) = test;
auto dist = FloydWarshall(NUM_VERTICES, EDGES, dir);
auto source = MakeScanAll(storage, symbol_table, "s");
auto sink = MakeScanAll(storage, symbol_table, "t", source.op_);
// A bound of -1 is passed to the operator as nullptr (no bound expression).
auto results =
ShortestPaths(sink.op_, source.sym_, sink.sym_, dir,
lower_bound == -1 ? nullptr : LITERAL(lower_bound),
upper_bound == -1 ? nullptr : LITERAL(upper_bound));
// For the expected-count computation, treat a missing bound as [0,
// NUM_VERTICES].
if (lower_bound == -1) lower_bound = 0;
if (upper_bound == -1) upper_bound = NUM_VERTICES;
// Count distinct-vertex pairs whose distance lies within the bounds.
size_t output_count = 0;
for (int i = 0; i < NUM_VERTICES; ++i) {
for (int j = 0; j < NUM_VERTICES; ++j) {
if (i != j && dist[i][j] != -1 && dist[i][j] >= lower_bound &&
dist[i][j] <= upper_bound)
++output_count;
}
}
EXPECT_EQ(results.size(), output_count);
// Every returned path must have the shortest length and be a valid path.
for (const auto &result : results) {
int s =
result[0].ValueVertex().PropsAt(dba.Property("id")).Value<int64_t>();
int t =
result[1].ValueVertex().PropsAt(dba.Property("id")).Value<int64_t>();
EXPECT_EQ(dist[s][t], (int)result[2].ValueList().size());
CheckPath(result[0].ValueVertex(), result[1].ValueVertex(), dir,
result[2].ValueList());
}
}
}
// Checks shortest path expansion from vertex 0 to vertex 3 (direction BOTH)
// under various filter lambdas over the inner node or inner edge.
TEST_F(STShortestPathTest, ExpandLambda) {
Symbol inner_node_symbol = symbol_table.CreateSymbol("inner_node", true);
Symbol inner_edge_symbol = symbol_table.CreateSymbol("inner_edge", true);
auto inner_node = IDENT("inner_node");
auto inner_edge = IDENT("inner_edge");
symbol_table[*inner_node] = inner_node_symbol;
symbol_table[*inner_edge] = inner_edge_symbol;
// (filter expression, expected shortest path length)
// An expected length of -1 means no path should be found.
std::vector<std::pair<Expression *, int>> tests = {
// Block vertex 1 (this stops expansion from source side)
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(1)), -1},
// Block vertex 5 (this stops expansion from sink side)
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(5)), -1},
// Block source vertex
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(0)), 4},
// Block sink vertex
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(3)), -1},
// Block edge 0-1 (this stops expansion from source side)
{NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("0-1")),
-1},
// Block edge 5-3 (this stops expansion from sink side)
{NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("5-3")),
-1},
// Block edges 2-5 and 4-1
{AND(NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("2-5")),
NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")),
LITERAL("4-1"))),
5}};
for (auto test : tests) {
Expression *expression;
int length;
std::tie(expression, length) = test;
// Source is the vertex with id 0, sink is the vertex with id 3.
auto s = MakeScanAll(storage, symbol_table, "s");
auto source = std::make_shared<Filter>(
s.op_, EQ(PROPERTY_LOOKUP(s.node_->identifier_, dba.Property("id")),
LITERAL(0)));
auto t = MakeScanAll(storage, symbol_table, "t", source);
auto sink = std::make_shared<Filter>(
t.op_, EQ(PROPERTY_LOOKUP(t.node_->identifier_, dba.Property("id")),
LITERAL(3)));
auto results = ShortestPaths(
sink, s.sym_, t.sym_, EdgeAtom::Direction::BOTH, nullptr, nullptr,
ExpandVariable::Lambda{inner_edge_symbol, inner_node_symbol,
expression});
if (length == -1) {
EXPECT_EQ(results.size(), 0);
} else {
ASSERT_EQ(results.size(), 1);
EXPECT_EQ(results[0][2].ValueList().size(), length);
}
}
}
// Checks that no results are produced when the source and/or the sink comes
// from an Optional whose filter matches nothing.
TEST_F(STShortestPathTest, OptionalMatch) {
// i == 0: only the source filter can match (id 0), the sink filter uses -1.
// i == 1: only the sink filter can match (id 3), the source filter uses -1.
// i == 2: both filters use -1.
for (int i = 0; i <= 2; ++i) {
auto s = MakeScanAll(storage, symbol_table, "s");
auto source = std::make_shared<Filter>(
s.op_, EQ(PROPERTY_LOOKUP(s.node_->identifier_, dba.Property("id")),
LITERAL(i == 0 ? 0 : -1)));
auto source_opt = std::make_shared<Optional>(
std::make_shared<Once>(), source, std::vector<Symbol>{s.sym_});
auto t = MakeScanAll(storage, symbol_table, "t");
auto sink = std::make_shared<Filter>(
t.op_, EQ(PROPERTY_LOOKUP(t.node_->identifier_, dba.Property("id")),
LITERAL(i == 1 ? 3 : -1)));
auto sink_opt = std::make_shared<Optional>(source_opt, sink,
std::vector<Symbol>{t.sym_});
auto results =
ShortestPaths(sink_opt, s.sym_, t.sym_, EdgeAtom::Direction::BOTH);
EXPECT_EQ(results.size(), 0);
}
}
// Selects whether a parameterized test runs on a single-node database or on a
// distributed cluster.
enum class TestType { SINGLE_NODE, DISTRIBUTED };
/** A test fixture for breadth first expansion */
// The test parameter is a (TestType, int) pair. For DISTRIBUTED the int is
// passed to the Cluster constructor (presumably the worker count; confirm in
// Cluster); for SINGLE_NODE it is unused here.
class QueryPlanExpandBfs
: public testing::TestWithParam<std::pair<TestType, int>> {
private:
// Exactly one of cluster_ and single_node_ is non-null, depending on the
// test type; db_ points at whichever database is in use.
std::unique_ptr<Cluster> cluster_;
std::unique_ptr<database::SingleNode> single_node_;
database::GraphDb *db_{nullptr};
// Addresses of created edges, keyed by their (from, to) pair.
std::unordered_map<std::pair<int, int>, storage::EdgeAddress> e_;
protected:
QueryPlanExpandBfs()
: cluster_(GetParam().first == TestType::DISTRIBUTED
? new Cluster(GetParam().second, "QueryPlanExpandBfs")
: nullptr),
single_node_(GetParam().first == TestType::DISTRIBUTED
? nullptr
: new database::SingleNode()),
db_([&]() -> database::GraphDb * {
if (cluster_) return cluster_->master();
return single_node_.get();
}()),
dba_ptr(db_->Access()),
dba(*dba_ptr) {}
// Worker IDs where vertices are located.
const std::vector<int> vertices = {0, 1, 1, 0, 1, 2};
// Edges in graph.
const std::vector<std::pair<int, int>> edges = {
{0, 1}, {1, 2}, {2, 4}, {2, 5}, {4, 1}, {4, 5}, {5, 4}, {5, 5}, {5, 3}};
// Style-guide non-conformant name due to PROPERTY_PAIR and PROPERTY_LOOKUP
// macro requirements.
std::unique_ptr<database::GraphDbAccessor> dba_ptr;
database::GraphDbAccessor &dba;
// Addresses of created vertices, indexed by vertex id.
std::vector<storage::VertexAddress> v;
AstStorage storage;
SymbolTable symbol_table;
std::pair<std::string, storage::Property> prop = PROPERTY_PAIR("property");
storage::EdgeType edge_type = dba.EdgeType("edge_type");
// Inner edge and vertex symbols.
// Edge from a to b has `prop` with the value ab (all ints).
Symbol inner_edge = symbol_table.CreateSymbol("inner_edge", true);
Symbol inner_node = symbol_table.CreateSymbol("inner_node", true);
// Next plan id to use when dispatching a plan to the cluster.
int plan_id_{0};
// Creates the vertices and edges described above and advances the command.
void SetUp() {
for (auto p : iter::enumerate(vertices)) {
int id, worker;
std::tie(id, worker) = p;
// In single-node mode all vertices are local; in distributed mode only
// those assigned to worker 0 are inserted locally.
if (GetParam().first == TestType::SINGLE_NODE || worker == 0) {
auto vertex = dba.InsertVertex();
vertex.PropsSet(prop.second, id);
v.push_back(vertex.GlobalAddress());
} else {
auto vertex = database::InsertVertexIntoRemote(
&dba, worker, {}, {{prop.second, id}}, std::experimental::nullopt);
v.push_back(vertex.GlobalAddress());
}
}
for (auto p : edges) {
VertexAccessor from(v[p.first], dba);
VertexAccessor to(v[p.second], dba);
auto edge = dba.InsertEdge(from, to, edge_type);
// Encode the endpoints into the edge property: from * 10 + to.
edge.PropsSet(prop.second, p.first * 10 + p.second);
e_.emplace(p, edge.GlobalAddress());
}
AdvanceCommand(dba.transaction_id());
}
// Defines and performs a breadth-first expansion with the given parameters.
// Returns a vector of pairs. Each pair is (vector-of-edges, vertex).
auto ExpandBF(EdgeAtom::Direction direction, int min_depth, int max_depth,
Expression *where, std::vector<int> source_ids = {},
std::experimental::optional<TypedValue> existing_node =
std::experimental::nullopt) {
// If source_ids are empty, we set ID to -1 so no nodes are matched (used
// for optional match test case).
if (source_ids.empty()) {
source_ids = std::vector<int>{-1};
}
std::vector<PropertyValue> sources;
for (const auto id : source_ids) sources.emplace_back(id);
// Scan all vertices and keep those whose property is in `sources`.
auto s = MakeScanAll(storage, symbol_table, "s");
std::shared_ptr<LogicalOperator> last_op = std::make_shared<Filter>(
s.op_, IN_LIST(PROPERTY_LOOKUP(s.node_->identifier_, prop.second),
LITERAL(sources)));
auto node_sym = symbol_table.CreateSymbol("node", true);
auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true);
// In distributed mode, dispatch the scan+filter plan and pull its results
// through PullRemote.
if (GetParam().first == TestType::DISTRIBUTED) {
cluster_->master()->plan_dispatcher().DispatchPlan(plan_id_, last_op,
symbol_table);
last_op = std::make_shared<PullRemote>(last_op, plan_id_,
std::vector<Symbol>{s.sym_});
plan_id_++;
}
last_op = std::make_shared<Optional>(std::make_shared<Once>(), last_op,
std::vector<Symbol>{s.sym_});
// Pick the expansion operator matching the test type.
if (GetParam().first == TestType::DISTRIBUTED) {
last_op = std::make_shared<DistributedExpandBfs>(
node_sym, edge_list_sym, direction, std::vector<storage::EdgeType>{},
last_op, s.sym_, !!existing_node, GraphView::OLD, LITERAL(min_depth),
LITERAL(max_depth),
ExpandVariable::Lambda{inner_edge, inner_node, where});
} else {
last_op = std::make_shared<ExpandVariable>(
node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction,
std::vector<storage::EdgeType>{}, false, LITERAL(min_depth),
LITERAL(max_depth), last_op, s.sym_, !!existing_node,
ExpandVariable::Lambda{inner_edge, inner_node, where},
std::experimental::nullopt, std::experimental::nullopt,
GraphView::OLD);
}
Frame frame(symbol_table.max_position());
// When an existing node is given, place it on the frame before pulling.
if (existing_node) {
frame[node_sym] = *existing_node;
}
auto cursor = last_op->MakeCursor(dba);
std::vector<std::pair<std::vector<EdgeAccessor>, VertexAccessor>> results;
Context context(dba);
context.symbol_table_ = symbol_table;
// Collect (edge list, reached vertex) for every pulled row.
while (cursor->Pull(frame, context)) {
results.emplace_back(std::vector<EdgeAccessor>(),
frame[node_sym].Value<VertexAccessor>());
for (const TypedValue &edge : frame[edge_list_sym].ValueList())
results.back().first.emplace_back(edge.Value<EdgeAccessor>());
}
return results;
}
// Returns the integer value of `prop` on the given vertex or edge accessor.
template <typename TAccessor>
auto GetProp(const TAccessor &accessor) {
return accessor.PropsAt(prop.second).template Value<int64_t>();
}
// Returns true when the `prop` values of `edges` equal `ids` element-wise.
bool EdgesEqual(const std::vector<EdgeAccessor> &edges,
const std::vector<int> &ids) {
if (edges.size() != ids.size()) return false;
for (size_t i = 0; i < edges.size(); ++i) {
if (GetProp(edges[i]) != ids[i]) return false;
}
return true;
}
// Applies updates on the cluster; does nothing in single-node mode.
void ApplyUpdates(tx::TransactionId tx_id) {
if (GetParam().first == TestType::DISTRIBUTED)
cluster_->ApplyUpdates(tx_id);
}
// Advances the command of the given transaction on the cluster or on the
// single-node database.
void AdvanceCommand(tx::TransactionId tx_id) {
if (GetParam().first == TestType::DISTRIBUTED)
cluster_->AdvanceCommand(tx_id);
else
db_->Access(tx_id)->AdvanceCommand();
}
};
// Expands from vertex 0 in both directions and checks the reached vertices and
// the edge lists leading to them. Edge ids are from * 10 + to.
TEST_P(QueryPlanExpandBfs, Basic) {
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0});
ASSERT_EQ(results.size(), 5);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_TRUE(EdgesEqual(results[0].first, {1}));
// Vertices 2 and 4 are accepted in either order; normalize to 2 first.
if (GetProp(results[1].second) == 4) {
std::swap(results[1], results[2]);
}
EXPECT_EQ(GetProp(results[1].second), 2);
EXPECT_TRUE(EdgesEqual(results[1].first, {1, 12}));
EXPECT_EQ(GetProp(results[2].second), 4);
EXPECT_TRUE(EdgesEqual(results[2].first, {1, 41}));
// Several equally short paths to vertex 5 (and then 3) are accepted.
EXPECT_EQ(GetProp(results[3].second), 5);
EXPECT_TRUE(EdgesEqual(results[3].first, {1, 41, 45}) ||
EdgesEqual(results[3].first, {1, 41, 54}) ||
EdgesEqual(results[3].first, {1, 12, 25}));
EXPECT_EQ(GetProp(results[4].second), 3);
EXPECT_TRUE(EdgesEqual(results[4].first, {1, 41, 45, 53}) ||
EdgesEqual(results[4].first, {1, 41, 54, 53}) ||
EdgesEqual(results[4].first, {1, 12, 25, 53}));
}
// Expands from vertex 4 following only outgoing and then only incoming edges.
TEST_P(QueryPlanExpandBfs, EdgeDirection) {
{
// Outgoing edges only.
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, {4});
ASSERT_EQ(results.size(), 4);
// Vertices at the same depth are accepted in either order; normalize.
if (GetProp(results[0].second) == 5) {
std::swap(results[0], results[1]);
}
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_TRUE(EdgesEqual(results[0].first, {41}));
EXPECT_EQ(GetProp(results[1].second), 5);
EXPECT_TRUE(EdgesEqual(results[1].first, {45}));
if (GetProp(results[2].second) == 3) {
std::swap(results[2], results[3]);
}
EXPECT_EQ(GetProp(results[2].second), 2);
EXPECT_TRUE(EdgesEqual(results[2].first, {41, 12}));
EXPECT_EQ(GetProp(results[3].second), 3);
EXPECT_TRUE(EdgesEqual(results[3].first, {45, 53}));
}
{
// Incoming edges only.
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, {4});
ASSERT_EQ(results.size(), 4);
if (GetProp(results[0].second) == 5) {
std::swap(results[0], results[1]);
}
EXPECT_EQ(GetProp(results[0].second), 2);
EXPECT_TRUE(EdgesEqual(results[0].first, {24}));
EXPECT_EQ(GetProp(results[1].second), 5);
EXPECT_TRUE(EdgesEqual(results[1].first, {54}));
EXPECT_EQ(GetProp(results[2].second), 1);
EXPECT_TRUE(EdgesEqual(results[2].first, {24, 12}));
EXPECT_EQ(GetProp(results[3].second), 0);
EXPECT_TRUE(EdgesEqual(results[3].first, {24, 12, 1}));
}
}
// Checks expansion with a filter expression over the inner node and then over
// the inner edge. Skipped in distributed mode.
TEST_P(QueryPlanExpandBfs, Where) {
// TODO(mtomic): lambda filtering in distributed
if (GetParam().first == TestType::DISTRIBUTED) {
return;
}
auto ident = IDENT("inner_element");
{
// Only expand to vertices whose property is less than 4.
symbol_table[*ident] = inner_node;
auto filter_expr = LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(4));
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, {0});
ASSERT_EQ(results.size(), 2);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_EQ(GetProp(results[1].second), 2);
}
{
// Only expand over edges whose property is less than 50 and not 12.
symbol_table[*ident] = inner_edge;
auto filter_expr = AND(LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(50)),
NEQ(PROPERTY_LOOKUP(ident, prop), LITERAL(12)));
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, {0});
ASSERT_EQ(results.size(), 4);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_EQ(GetProp(results[1].second), 4);
// Vertices 2 and 5 are accepted in either order; normalize to 2 first.
if (GetProp(results[2].second) == 5) {
std::swap(results[2], results[3]);
}
EXPECT_EQ(GetProp(results[2].second), 2);
EXPECT_EQ(GetProp(results[3].second), 5);
EXPECT_TRUE(EdgesEqual(results[3].first, {1, 41, 45}));
}
}
// Expands from two sources (vertices 0 and 3) in both directions and checks
// how many times each vertex is reached.
TEST_P(QueryPlanExpandBfs, MultipleInputs) {
  auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0, 3});
  // Each of the two sources reaches the five other vertices.
  EXPECT_EQ(results.size(), 10);
  // One counter per vertex id. The graph has six vertices (ids 0..5), so the
  // vector needs six slots; with five, counting vertex 5 would write out of
  // bounds.
  std::vector<int> found(6, 0);
  for (const auto &row : results) found[GetProp(row.second)]++;
  // The sources 0 and 3 are each reached once (from the other source); every
  // other vertex is reached from both sources.
  EXPECT_EQ(found, (std::vector<int>{1, 2, 2, 1, 2, 2}));
}
// Checks expansion when the destination node is already given on the frame.
TEST_P(QueryPlanExpandBfs, ExistingNode) {
// In single-node, this is handled by STShortestPath cursor instead of
// SingleSourceShortestPath cursor.
using testing::ElementsAre;
using testing::WhenSorted;
{
// From vertex 0 to the existing vertex 3: exactly one result.
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0},
VertexAccessor(v[3], dba));
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(GetProp(results[0].second), 3);
}
{
// Incoming direction towards existing vertex 5 from sources 0..4.
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr,
{0, 1, 2, 3, 4}, VertexAccessor(v[5], dba));
std::vector<int> nodes;
for (auto &row : results) {
EXPECT_EQ(GetProp(row.second), 5);
// Edge ids are from * 10 + to, so % 10 yields the first edge's "to" id.
nodes.push_back(GetProp(row.first[0]) % 10);
}
EXPECT_THAT(nodes, WhenSorted(ElementsAre(1, 2, 3, 4)));
}
{
// Outgoing direction towards existing vertex 5 from sources 0..4.
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr,
{0, 1, 2, 3, 4}, VertexAccessor(v[5], dba));
std::vector<int> nodes;
for (auto &row : results) {
EXPECT_EQ(GetProp(row.second), 5);
// Edge ids are from * 10 + to, so / 10 yields the first edge's "from" id.
nodes.push_back(GetProp(row.first[0]) / 10);
}
EXPECT_THAT(nodes, WhenSorted(ElementsAre(0, 1, 2, 4)));
}
}
// Checks that nothing is produced when the source is unmatched or when the
// existing node is Null.
TEST_P(QueryPlanExpandBfs, OptionalMatch) {
{
// Empty source list: ExpandBF substitutes id -1, which matches no vertex.
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {});
EXPECT_EQ(results.size(), 0);
}
{
// Existing node on the frame is Null.
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0},
TypedValue::Null);
EXPECT_EQ(results.size(), 0);
}
}
// Expands from vertex 0 with depth limited to [2, 3] and checks that only
// vertices 2, 4 and 5 are returned.
TEST_P(QueryPlanExpandBfs, ExpansionDepth) {
{
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 2, 3, nullptr, {0});
EXPECT_EQ(results.size(), 3);
// Vertices 2 and 4 are accepted in either order; normalize to 2 first.
if (GetProp(results[0].second) == 4) {
std::swap(results[0], results[1]);
}
EXPECT_EQ(GetProp(results[0].second), 2);
EXPECT_EQ(GetProp(results[1].second), 4);
EXPECT_EQ(GetProp(results[2].second), 5);
}
}
// Run the QueryPlanExpandBfs tests once on a single-node database and once on
// a distributed cluster. The second pair element is forwarded to the Cluster
// constructor in distributed mode (presumably the worker count; confirm in
// Cluster).
INSTANTIATE_TEST_CASE_P(SingleNode, QueryPlanExpandBfs,
::testing::Values(std::make_pair(TestType::SINGLE_NODE,
0)));
INSTANTIATE_TEST_CASE_P(Distributed, QueryPlanExpandBfs,
::testing::Values(std::make_pair(TestType::DISTRIBUTED,
2)));
/** A test fixture for weighted shortest path expansion */
class QueryPlanExpandWeightedShortestPath : public testing::Test {
public:
@ -1561,9 +925,9 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test {
direction, std::vector<storage::EdgeType>{}, false, nullptr,
max_depth ? LITERAL(max_depth.value()) : nullptr, last_op, n.sym_,
existing_node_input != nullptr,
ExpandVariable::Lambda{filter_edge, filter_node, where},
ExpandVariable::Lambda{weight_edge, weight_node,
PROPERTY_LOOKUP(ident_e, prop)},
ExpansionLambda{filter_edge, filter_node, where},
ExpansionLambda{weight_edge, weight_node,
PROPERTY_LOOKUP(ident_e, prop)},
total_weight, graph_view);
Frame frame(symbol_table.max_position());