diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3e102cf65..d2052abb6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -134,7 +134,6 @@ add_custom_target(generate_lcp DEPENDS ${generated_lcp_files}) add_capnp(communication/rpc/messages.capnp) add_capnp(durability/recovery.capnp) -add_capnp(query/common.capnp) add_capnp(query/frontend/ast/ast.capnp) add_capnp(query/frontend/semantic/symbol.capnp) add_capnp(query/serialization.capnp) diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index 162fa2b79..85d010160 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -625,8 +625,7 @@ class Master { *storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_); TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{server_}; database::MasterCounters counters_{&server_}; - distributed::BfsSubcursorStorage subcursor_storage_{self_, - &bfs_subcursor_clients_}; + distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_}; distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_, &subcursor_storage_}; distributed::BfsRpcClients bfs_subcursor_clients_{ @@ -1008,8 +1007,7 @@ class Worker { TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{ *coordination_.GetClientPool(0)}; database::WorkerCounters counters_{coordination_.GetClientPool(0)}; - distributed::BfsSubcursorStorage subcursor_storage_{self_, - &bfs_subcursor_clients_}; + distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_}; distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_, &subcursor_storage_}; distributed::BfsRpcClients bfs_subcursor_clients_{ diff --git a/src/distributed/bfs_rpc_clients.cpp b/src/distributed/bfs_rpc_clients.cpp index aaec6924d..e1a13ae04 100644 --- a/src/distributed/bfs_rpc_clients.cpp +++ b/src/distributed/bfs_rpc_clients.cpp @@ -16,20 +16,23 @@ BfsRpcClients::BfsRpcClients(database::DistributedGraphDb *db, data_manager_(data_manager) {} std::unordered_map<int16_t, int64_t> BfsRpcClients::CreateBfsSubcursors( - tx::TransactionId tx_id, query::EdgeAtom::Direction direction, + database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction, const std::vector<storage::EdgeType> &edge_types, - query::GraphView graph_view) { + const query::plan::ExpansionLambda &filter_lambda, + const query::SymbolTable &symbol_table, + const query::EvaluationContext &evaluation_context) { auto futures = coordination_->ExecuteOnWorkers<std::pair<int16_t, int64_t>>( - db_->WorkerId(), - [tx_id, direction, &edge_types, graph_view](int worker_id, auto &client) { + db_->WorkerId(), [&](int worker_id, auto &client) { auto res = client.template Call<CreateBfsSubcursorRpc>( - tx_id, direction, edge_types, graph_view); + dba->transaction_id(), direction, edge_types, filter_lambda, + symbol_table, evaluation_context); return std::make_pair(worker_id, res.member); }); std::unordered_map<int16_t, int64_t> subcursor_ids; subcursor_ids.emplace( db_->WorkerId(), - subcursor_storage_->Create(tx_id, direction, edge_types, graph_view)); + subcursor_storage_->Create(dba, direction, edge_types, symbol_table, + nullptr, filter_lambda, evaluation_context)); for (auto &future : futures) { auto got = subcursor_ids.emplace(future.get()); CHECK(got.second) << "CreateBfsSubcursors failed: duplicate worker id"; @@ -55,8 +58,7 @@ void BfsRpcClients::ResetSubcursors( const std::unordered_map<int16_t, int64_t> &subcursor_ids) { auto futures = coordination_->ExecuteOnWorkers<void>( db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) { - client.template Call<ResetSubcursorRpc>( - subcursor_ids.at(worker_id)); + client.template Call<ResetSubcursorRpc>(subcursor_ids.at(worker_id)); }); subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))->Reset(); // Wait and get all of the replies. @@ -85,13 +87,14 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull( return subcursor_storage_->Get(subcursor_id)->Pull(); } - auto res = coordination_->GetClientPool(worker_id)->CallWithLoad<SubcursorPullRpc>( - [this, dba](const auto &reader) { - SubcursorPullRes res; - Load(&res, reader, dba, this->data_manager_); - return res; - }, - subcursor_id); + auto res = + coordination_->GetClientPool(worker_id)->CallWithLoad<SubcursorPullRpc>( + [this, dba](const auto &reader) { + SubcursorPullRes res; + Load(&res, reader, dba, this->data_manager_); + return res; + }, + subcursor_id); return res.vertex; } @@ -101,7 +104,15 @@ bool BfsRpcClients::ExpandLevel( db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) { auto res = client.template Call<ExpandLevelRpc>(subcursor_ids.at(worker_id)); - return res.member; + switch (res.result) { + case ExpandResult::SUCCESS: + return true; + case ExpandResult::FAILURE: + return false; + case ExpandResult::LAMBDA_ERROR: + throw query::QueryRuntimeException( + "Expansion condition must evaluate to boolean or null"); + } }); bool expanded = subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))->ExpandLevel(); @@ -133,9 +144,10 @@ bool BfsRpcClients::ExpandToRemoteVertex( CHECK(!vertex.is_local()) << "ExpandToRemoteVertex should not be called with local vertex"; int worker_id = vertex.address().worker_id(); - auto res = coordination_->GetClientPool(worker_id)->Call<ExpandToRemoteVertexRpc>( - subcursor_ids.at(worker_id), edge.GlobalAddress(), - vertex.GlobalAddress()); + auto res = + coordination_->GetClientPool(worker_id)->Call<ExpandToRemoteVertexRpc>( + subcursor_ids.at(worker_id), edge.GlobalAddress(), + vertex.GlobalAddress()); return res.member; } @@ -179,14 +191,16 @@ PathSegment BfsRpcClients::ReconstructPath( } void BfsRpcClients::PrepareForExpand( - const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear) { + const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear, + const std::vector<query::TypedValue> &frame) { auto futures = coordination_->ExecuteOnWorkers<void>( - db_->WorkerId(), [clear, &subcursor_ids](int worker_id, auto &client) { + db_->WorkerId(), + [this, clear, &frame, &subcursor_ids](int worker_id, auto &client) { client.template Call<PrepareForExpandRpc>( - subcursor_ids.at(worker_id), clear); + subcursor_ids.at(worker_id), clear, frame, db_->WorkerId()); }); subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId())) - ->PrepareForExpand(clear); + ->PrepareForExpand(clear, frame); // Wait and get all of the replies. for (auto &future : futures) { if (future.valid()) future.get(); diff --git a/src/distributed/bfs_rpc_clients.hpp b/src/distributed/bfs_rpc_clients.hpp index 56e6f369d..0f89e4050 100644 --- a/src/distributed/bfs_rpc_clients.hpp +++ b/src/distributed/bfs_rpc_clients.hpp @@ -23,13 +23,14 @@ class BfsRpcClients { public: BfsRpcClients(database::DistributedGraphDb *db, BfsSubcursorStorage *subcursor_storage, - Coordination *coordination, - DataManager *data_manager); + Coordination *coordination, DataManager *data_manager); std::unordered_map<int16_t, int64_t> CreateBfsSubcursors( - tx::TransactionId tx_id, query::EdgeAtom::Direction direction, + database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction, const std::vector<storage::EdgeType> &edge_types, - query::GraphView graph_view); + const query::plan::ExpansionLambda &filter_lambda, + const query::SymbolTable &symbol_table, + const query::EvaluationContext &evaluation_context); void RegisterSubcursors( const std::unordered_map<int16_t, int64_t> &subcursor_ids); @@ -61,7 +62,8 @@ class BfsRpcClients { storage::VertexAddress vertex, database::GraphDbAccessor *dba); void PrepareForExpand( - const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear); + const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear, + const std::vector<query::TypedValue> &frame); private: database::DistributedGraphDb *db_; diff --git a/src/distributed/bfs_rpc_messages.lcp b/src/distributed/bfs_rpc_messages.lcp index f458911a0..0f4404d19 100644 --- a/src/distributed/bfs_rpc_messages.lcp +++ b/src/distributed/bfs_rpc_messages.lcp @@ -6,7 +6,9 @@ #include "communication/rpc/messages.hpp" #include "distributed/bfs_rpc_messages.capnp.h" #include "distributed/bfs_subcursor.hpp" -#include "query/plan/operator.hpp" +#include "query/frontend/semantic/symbol_table.hpp" +#include "query/plan/distributed_ops.hpp" +#include "query/serialization.hpp" #include "storage/serialization.hpp" #include "transactions/type.hpp" #include "utils/serialization.hpp" @@ -17,12 +19,15 @@ cpp<# (lcp:capnp-namespace "distributed") (lcp:capnp-import 'ast "/query/frontend/ast/ast.capnp") -(lcp:capnp-import 'query "/query/common.capnp") +(lcp:capnp-import 'dist-ops "/query/plan/distributed_ops.capnp") +(lcp:capnp-import 'query "/query/serialization.capnp") (lcp:capnp-import 'storage "/storage/serialization.capnp") +(lcp:capnp-import 'symbol "/query/frontend/semantic/symbol.capnp") (lcp:capnp-import 'utils "/utils/serialization.capnp") (lcp:capnp-type-conversion "storage::EdgeAddress" "Storage.Address") (lcp:capnp-type-conversion "storage::VertexAddress" "Storage.Address") +(lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common") (lcp:define-rpc create-bfs-subcursor (:request @@ -35,17 +40,36 @@ cpp<# :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction" "query::EdgeAtom::Direction" '(in out both))) - ;; TODO(mtomic): Why isn't edge-types serialized? (edge-types "std::vector<storage::EdgeType>" - :capnp-save :dont-save) - (graph-view "query::GraphView" - :capnp-type "Query.GraphView" :capnp-init nil - :capnp-save (lcp:capnp-save-enum "::query::capnp::GraphView" - "query::GraphView" - '(old new)) - :capnp-load (lcp:capnp-load-enum "::query::capnp::GraphView" - "query::GraphView" - '(old new))))) + :capnp-save (lcp:capnp-save-vector "::storage::capnp::Common" + "storage::EdgeType") + :capnp-load (lcp:capnp-load-vector "::storage::capnp::Common" + "storage::EdgeType")) + (filter-lambda "query::plan::ExpansionLambda" + :capnp-type "DistOps.ExpansionLambda" + :capnp-save (lambda (builder member capnp-name) + #>cpp + std::vector<int> saved_ast_uids; + Save(${member}, &${builder}, &saved_ast_uids); + cpp<#) + :capnp-load (lambda (reader member capnp-name) + #>cpp + std::vector<int> loaded_ast_uids; + Load(&${member}, ${reader}, ast_storage, &loaded_ast_uids); + cpp<#)) + (symbol-table "query::SymbolTable" + :capnp-type "Symbol.SymbolTable") + (evaluation-context "query::EvaluationContext" + :capnp-type "Query.EvaluationContext" + :capnp-save (lambda (builder member capnp-name) + #>cpp + query::SaveEvaluationContext(${member}, &${builder}); + cpp<#) + :capnp-load (lambda (reader member capnp-name) + #>cpp + query::LoadEvaluationContext(${reader}, &${member}); + cpp<#))) + (:serialize :capnp :load-args '((ast-storage "query::AstStorage *")))) (:response ((member :int64_t)))) (lcp:define-rpc register-subcursors @@ -86,9 +110,13 @@ cpp<# (:request ((member :int64_t))) (:response ())) +(lcp:define-enum expand-result + (success failure lambda-error) + (:serialize :capnp)) + (lcp:define-rpc expand-level (:request ((member :int64_t))) - (:response ((member :bool)))) + (:response ((result "ExpandResult")))) (lcp:define-rpc subcursor-pull (:request ((member :int64_t))) @@ -193,7 +221,28 @@ cpp<# (lcp:define-rpc prepare-for-expand (:request ((subcursor-id :int64_t) - (clear :bool))) + (clear :bool) + (frame "std::vector<query::TypedValue>" + :capnp-type "List(Query.TypedValue)" + :capnp-save (lcp:capnp-save-vector + "query::capnp::TypedValue" + "query::TypedValue" + "[&self](auto *builder, const auto &value) { + query::SaveCapnpTypedValue(value, builder, + storage::SendVersions::ONLY_OLD, self.worker_id); + }") + :capnp-load (lcp:capnp-load-vector + "query::capnp::TypedValue" + "query::TypedValue" + "[dba, data_manager](const auto &reader) { + query::TypedValue value; + query::LoadCapnpTypedValue(reader, &value, dba, data_manager); + return value; + }")) + (worker-id :int :capnp-save :dont-save)) + (:serialize :capnp + :load-args '((dba "database::GraphDbAccessor *") + (data-manager "distributed::DataManager *")))) (:response ())) (lcp:pop-namespace) ;; distributed diff --git a/src/distributed/bfs_rpc_server.hpp b/src/distributed/bfs_rpc_server.hpp index 4a4be0295..965284f92 100644 --- a/src/distributed/bfs_rpc_server.hpp +++ b/src/distributed/bfs_rpc_server.hpp @@ -24,9 +24,15 @@ class BfsRpcServer { server_->Register<CreateBfsSubcursorRpc>( [this](const auto &req_reader, auto *res_builder) { CreateBfsSubcursorReq req; - Load(&req, req_reader); - CreateBfsSubcursorRes res(subcursor_storage_->Create( - req.tx_id, req.direction, req.edge_types, req.graph_view)); + auto ast_storage = std::make_unique<query::AstStorage>(); + Load(&req, req_reader, ast_storage.get()); + auto db_accessor = db_->Access(req.tx_id); + auto id = subcursor_storage_->Create( + db_accessor.get(), req.direction, req.edge_types, + std::move(req.symbol_table), std::move(ast_storage), + req.filter_lambda, std::move(req.evaluation_context)); + db_accessors_[id] = std::move(db_accessor); + CreateBfsSubcursorRes res(id); Save(res, res_builder); }); @@ -53,6 +59,7 @@ class BfsRpcServer { [this](const auto &req_reader, auto *res_builder) { RemoveBfsSubcursorReq req; Load(&req, req_reader); + db_accessors_.erase(req.member); subcursor_storage_->Erase(req.member); RemoveBfsSubcursorRes res; Save(res, res_builder); @@ -67,13 +74,21 @@ class BfsRpcServer { Save(res, res_builder); }); - server_->Register<ExpandLevelRpc>([this](const auto &req_reader, - auto *res_builder) { - ExpandLevelReq req; - Load(&req, req_reader); - ExpandLevelRes res(subcursor_storage_->Get(req.member)->ExpandLevel()); - Save(res, res_builder); - }); + server_->Register<ExpandLevelRpc>( + [this](const auto &req_reader, auto *res_builder) { + ExpandLevelReq req; + Load(&req, req_reader); + auto subcursor = subcursor_storage_->Get(req.member); + ExpandResult result; + try { + result = subcursor->ExpandLevel() ? ExpandResult::SUCCESS + : ExpandResult::FAILURE; + } catch (const query::QueryRuntimeException &) { + result = ExpandResult::LAMBDA_ERROR; + } + ExpandLevelRes res(result); + Save(res, res_builder); + }); server_->Register<SubcursorPullRpc>( [this](const auto &req_reader, auto *res_builder) { @@ -107,15 +122,18 @@ class BfsRpcServer { } else { LOG(FATAL) << "`edge` or `vertex` should be set in ReconstructPathReq"; } - ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge); + ReconstructPathRes res(result.edges, result.next_vertex, + result.next_edge); Save(res, res_builder, db_->WorkerId()); }); server_->Register<PrepareForExpandRpc>([this](const auto &req_reader, auto *res_builder) { PrepareForExpandReq req; - Load(&req, req_reader); - subcursor_storage_->Get(req.subcursor_id)->PrepareForExpand(req.clear); + auto subcursor_id = req_reader.getSubcursorId(); + auto *subcursor = subcursor_storage_->Get(subcursor_id); + Load(&req, req_reader, subcursor->db_accessor(), &db_->data_manager()); + subcursor->PrepareForExpand(req.clear, std::move(req.frame)); PrepareForExpandRes res; Save(res, res_builder); }); @@ -125,6 +143,7 @@ class BfsRpcServer { database::DistributedGraphDb *db_; communication::rpc::Server *server_; + std::map<int64_t, std::unique_ptr<database::GraphDbAccessor>> db_accessors_; BfsSubcursorStorage *subcursor_storage_; }; diff --git a/src/distributed/bfs_subcursor.cpp b/src/distributed/bfs_subcursor.cpp index ccb3fba98..e3c4366fb 100644 --- a/src/distributed/bfs_subcursor.cpp +++ b/src/distributed/bfs_subcursor.cpp @@ -4,6 +4,7 @@ #include "database/distributed_graph_db.hpp" #include "distributed/bfs_rpc_clients.hpp" +#include "query/exceptions.hpp" #include "query/plan/operator.hpp" #include "storage/address_types.hpp" #include "storage/vertex_accessor.hpp" @@ -13,15 +14,23 @@ namespace distributed { using query::TypedValue; ExpandBfsSubcursor::ExpandBfsSubcursor( - database::GraphDb *db, tx::TransactionId tx_id, - query::EdgeAtom::Direction direction, - std::vector<storage::EdgeType> edge_types, query::GraphView graph_view, + database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction, + std::vector<storage::EdgeType> edge_types, query::SymbolTable symbol_table, + std::unique_ptr<query::AstStorage> ast_storage, + query::plan::ExpansionLambda filter_lambda, + query::EvaluationContext evaluation_context, BfsRpcClients *bfs_subcursor_clients) : bfs_subcursor_clients_(bfs_subcursor_clients), - dba_(db->Access(tx_id)), + dba_(dba), direction_(direction), edge_types_(std::move(edge_types)), - graph_view_(graph_view) { + symbol_table_(std::move(symbol_table)), + ast_storage_(std::move(ast_storage)), + filter_lambda_(filter_lambda), + evaluation_context_(std::move(evaluation_context)), + frame_(symbol_table_.max_position()), + expression_evaluator_(&frame_, symbol_table_, evaluation_context_, dba_, + query::GraphView::OLD) { Reset(); } @@ -35,14 +44,15 @@ void ExpandBfsSubcursor::Reset() { void ExpandBfsSubcursor::SetSource(storage::VertexAddress source_address) { Reset(); auto source = VertexAccessor(source_address, *dba_); - SwitchAccessor(source, graph_view_); processed_.emplace(source, std::experimental::nullopt); ExpandFromVertex(source); } -void ExpandBfsSubcursor::PrepareForExpand(bool clear) { +void ExpandBfsSubcursor::PrepareForExpand( + bool clear, std::vector<query::TypedValue> frame) { if (clear) { Reset(); + frame_.elems() = std::move(frame); } else { std::swap(to_visit_current_, to_visit_next_); to_visit_next_.clear(); @@ -71,7 +81,6 @@ bool ExpandBfsSubcursor::ExpandToLocalVertex(storage::EdgeAddress edge, << "ExpandToLocalVertex called with remote vertex"; edge = dba_->db().storage().LocalizedAddressIfPossible(edge); - SwitchAccessor(vertex, graph_view_); std::lock_guard<std::mutex> lock(mutex_); auto got = processed_.emplace(vertex, edge); @@ -146,7 +155,18 @@ void ExpandBfsSubcursor::ReconstructPathHelper(VertexAccessor vertex, bool ExpandBfsSubcursor::ExpandToVertex(EdgeAccessor edge, VertexAccessor vertex) { - // TODO(mtomic): lambda filtering in distributed + if (filter_lambda_.expression) { + frame_[filter_lambda_.inner_edge_symbol] = edge; + frame_[filter_lambda_.inner_node_symbol] = vertex; + TypedValue result = + filter_lambda_.expression->Accept(expression_evaluator_); + if (!result.IsNull() && !result.IsBool()) { + throw query::QueryRuntimeException( + "Expansion condition must evaluate to boolean or null"); + } + if (result.IsNull() || !result.ValueBool()) return false; + } + return vertex.is_local() ? ExpandToLocalVertex(edge.address(), vertex) : bfs_subcursor_clients_->ExpandToRemoteVertex( subcursor_ids_, edge, vertex); @@ -165,20 +185,22 @@ bool ExpandBfsSubcursor::ExpandFromVertex(VertexAccessor vertex) { return expanded; } -BfsSubcursorStorage::BfsSubcursorStorage(database::GraphDb *db, - BfsRpcClients *bfs_subcursor_clients) - : db_(db), bfs_subcursor_clients_(bfs_subcursor_clients) {} +BfsSubcursorStorage::BfsSubcursorStorage(BfsRpcClients *bfs_subcursor_clients) + : bfs_subcursor_clients_(bfs_subcursor_clients) {} -int64_t BfsSubcursorStorage::Create(tx::TransactionId tx_id, - query::EdgeAtom::Direction direction, - std::vector<storage::EdgeType> edge_types, - query::GraphView graph_view) { +int64_t BfsSubcursorStorage::Create( + database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction, + std::vector<storage::EdgeType> edge_types, query::SymbolTable symbol_table, + std::unique_ptr<query::AstStorage> ast_storage, + query::plan::ExpansionLambda filter_lambda, + query::EvaluationContext evaluation_context) { std::lock_guard<std::mutex> lock(mutex_); int64_t id = next_subcursor_id_++; - auto got = - storage_.emplace(id, std::make_unique<ExpandBfsSubcursor>( - db_, tx_id, direction, std::move(edge_types), - graph_view, bfs_subcursor_clients_)); + auto got = storage_.emplace( + id, std::make_unique<ExpandBfsSubcursor>( + dba, direction, std::move(edge_types), std::move(symbol_table), + std::move(ast_storage), filter_lambda, + std::move(evaluation_context), bfs_subcursor_clients_)); CHECK(got.second) << "Subcursor with ID " << id << " already exists"; return id; } diff --git a/src/distributed/bfs_subcursor.hpp b/src/distributed/bfs_subcursor.hpp index 36221f174..9d3cc9cc1 100644 --- a/src/distributed/bfs_subcursor.hpp +++ b/src/distributed/bfs_subcursor.hpp @@ -8,6 +8,9 @@ #include "glog/logging.h" #include "database/graph_db_accessor.hpp" +#include "query/context.hpp" +#include "query/frontend/semantic/symbol_table.hpp" +#include "query/interpret/eval.hpp" #include "query/plan/operator.hpp" namespace database { @@ -32,10 +35,13 @@ struct PathSegment { /// class per worker, and those instances communicate via RPC calls. class ExpandBfsSubcursor { public: - ExpandBfsSubcursor(database::GraphDb *db, tx::TransactionId tx_id, + ExpandBfsSubcursor(database::GraphDbAccessor *dba, query::EdgeAtom::Direction direction, std::vector<storage::EdgeType> edge_types, - query::GraphView graph_view, + query::SymbolTable symbol_table, + std::unique_ptr<query::AstStorage> ast_storage, + query::plan::ExpansionLambda filter_lambda, + query::EvaluationContext evaluation_context, BfsRpcClients *bfs_subcursor_clients); // Stores subcursor ids of other workers. @@ -55,7 +61,8 @@ class ExpandBfsSubcursor { /// /// @param clear if set to true, `Reset` will be called instead of moving /// `to_visit_next_` - void PrepareForExpand(bool clear); + // @param frame frame for evaluation of filter lambda expression + void PrepareForExpand(bool clear, std::vector<query::TypedValue> frame); /// Expands the BFS frontier once. Returns true if there was a successful /// expansion. @@ -77,7 +84,7 @@ class ExpandBfsSubcursor { /// Reconstruct the part of path to given vertex stored on this worker. PathSegment ReconstructPath(storage::VertexAddress vertex_addr); - database::GraphDbAccessor *db_accessor() { return dba_.get(); } + database::GraphDbAccessor *db_accessor() { return dba_; } /// Used to reset subcursor state before starting expansion from new source. void Reset(); @@ -96,14 +103,26 @@ class ExpandBfsSubcursor { BfsRpcClients *bfs_subcursor_clients_{nullptr}; - std::unique_ptr<database::GraphDbAccessor> dba_; + database::GraphDbAccessor *dba_; /// IDs of subcursors on other workers, used when sending RPCs. std::unordered_map<int16_t, int64_t> subcursor_ids_; query::EdgeAtom::Direction direction_; std::vector<storage::EdgeType> edge_types_; - query::GraphView graph_view_; + + /// Symbol table and AstStorage for filter lambda evaluation. If subcursor + /// doesn't own the filter lambda expression, `ast_storage_` is set to + /// nullptr. + query::SymbolTable symbol_table_; + std::unique_ptr<query::AstStorage> ast_storage_; + query::plan::ExpansionLambda filter_lambda_; + + /// Evaluation context, frame and expression evaluator for evaluation of + /// filter lambda. + query::EvaluationContext evaluation_context_; + query::Frame frame_; + query::ExpressionEvaluator expression_evaluator_; /// Mutex protecting `to_visit_next_` and `processed_`, because there is a /// race between expansions done locally using `ExpandToLocalVertex` and @@ -130,17 +149,19 @@ class ExpandBfsSubcursor { /// Thread-safe storage for BFS subcursors. class BfsSubcursorStorage { public: - explicit BfsSubcursorStorage(database::GraphDb *db, - BfsRpcClients *bfs_subcursor_clients); + explicit BfsSubcursorStorage(BfsRpcClients *bfs_subcursor_clients); - int64_t Create(tx::TransactionId tx_id, query::EdgeAtom::Direction direction, + int64_t Create(database::GraphDbAccessor *dba, + query::EdgeAtom::Direction direction, std::vector<storage::EdgeType> edge_types, - query::GraphView graph_view); + query::SymbolTable symbol_table, + std::unique_ptr<query::AstStorage> ast_storage, + query::plan::ExpansionLambda filter_lambda, + query::EvaluationContext evaluation_context); void Erase(int64_t subcursor_id); ExpandBfsSubcursor *Get(int64_t subcursor_id); private: - database::GraphDb *db_{nullptr}; BfsRpcClients *bfs_subcursor_clients_{nullptr}; std::mutex mutex_; diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp index 0acf077e6..debee5f7b 100644 --- a/src/distributed/pull_produce_rpc_messages.lcp +++ b/src/distributed/pull_produce_rpc_messages.lcp @@ -189,33 +189,14 @@ to the appropriate value. Not used on side that generates the response.") (command-id "tx::CommandId") (evaluation-context "query::EvaluationContext" :capnp-type "Query.EvaluationContext" - :capnp-save - (lambda (builder member capnp-name) - (declare (ignore capnp-name)) - #>cpp - ${builder}.setTimestamp(${member}.timestamp); - auto params_builder = ${builder}.initParams().initEntries(${member}.parameters.size()); - size_t i = 0; - for (auto &entry : ${member}.parameters) { - auto builder = params_builder[i]; - auto key_builder = builder.initKey(); - key_builder.setValue(entry.first); - auto value_builder = builder.initValue(); - storage::SaveCapnpPropertyValue(entry.second, &value_builder); - ++i; - } - cpp<#) - :capnp-load - (lambda (reader member capnp-name) - (declare (ignore capnp-name)) - #>cpp - ${member}.timestamp = ${reader}.getTimestamp(); - for (const auto &entry_reader : ${reader}.getParams().getEntries()) { - PropertyValue value; - storage::LoadCapnpPropertyValue(entry_reader.getValue(), &value); - ${member}.parameters.Add(entry_reader.getKey().getValue(), value); - } - cpp<#)) + :capnp-save (lambda (builder member capnp-name) + #>cpp + query::SaveEvaluationContext(${member}, &${builder}); + cpp<#) + :capnp-load (lambda (reader member capnp-name) + #>cpp + query::LoadEvaluationContext(${reader}, &${member}); + cpp<#)) (symbols "std::vector<query::Symbol>" :capnp-type "List(Sem.Symbol)") (accumulate :bool) (batch-size :int64_t) diff --git a/src/query/common.capnp b/src/query/common.capnp deleted file mode 100644 index 5dd38e778..000000000 --- a/src/query/common.capnp +++ /dev/null @@ -1,15 +0,0 @@ -@0xcbc2c66202fdf643; - -using Cxx = import "/capnp/c++.capnp"; -$Cxx.namespace("query::capnp"); - -using Ast = import "/query/frontend/ast/ast.capnp"; - -enum GraphView { - old @0; - new @1; -} - -struct TypedValueVectorCompare { - ordering @0 :List(Ast.Ordering); -} diff --git a/src/query/common.hpp b/src/query/common.hpp index dd2388223..682f4e459 100644 --- a/src/query/common.hpp +++ b/src/query/common.hpp @@ -7,11 +7,10 @@ #include "query/exceptions.hpp" #include "query/frontend/ast/ast.hpp" #include "query/frontend/semantic/symbol.hpp" +#include "query/serialization.capnp.h" #include "query/typed_value.hpp" #include "storage/types.hpp" -#include "query/common.capnp.h" - namespace query { // These are the functions for parsing literals and parameter names from diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index a766da6f2..afd265ae5 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -123,7 +123,7 @@ DistributedExpandBfs::DistributedExpandBfs( const std::vector<storage::EdgeType> &edge_types, const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, bool existing_node, GraphView graph_view, Expression *lower_bound, - Expression *upper_bound, const ExpandVariable::Lambda &filter_lambda) + Expression *upper_bound, const ExpansionLambda &filter_lambda) : ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input, input_symbol, existing_node, graph_view), lower_bound_(lower_bound), @@ -1023,18 +1023,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { DistributedExpandBfsCursor(const DistributedExpandBfs &self, database::GraphDbAccessor &db) : self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db)) { - // TODO: Pass in a DistributedGraphDb. - if (auto *distributed_db = - dynamic_cast<database::DistributedGraphDb *>(&db.db())) { - bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients(); - } - CHECK(bfs_subcursor_clients_); - subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors( - db_.transaction_id(), self_.direction_, self_.edge_types_, - self_.graph_view_); - bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_); - VLOG(10) << "BFS subcursors initialized"; - pull_pos_ = subcursor_ids_.end(); + CHECK(self_.graph_view_ == GraphView::OLD) + << "ExpandVariable should only be planned with GraphView::OLD"; } ~DistributedExpandBfsCursor() { @@ -1042,10 +1032,28 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_); } + void InitSubcursors(database::GraphDbAccessor *dba, + const query::SymbolTable &symbol_table, + const EvaluationContext &evaluation_context) { + // TODO: Pass in a DistributedGraphDb. + if (auto *distributed_db = + dynamic_cast<database::DistributedGraphDb *>(&dba->db())) { + bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients(); + } + CHECK(bfs_subcursor_clients_); + subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors( + dba, self_.direction_, self_.edge_types_, self_.filter_lambda_, + symbol_table, evaluation_context); + bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_); + VLOG(10) << "BFS subcursors initialized"; + pull_pos_ = subcursor_ids_.end(); + } + bool Pull(Frame &frame, Context &context) override { - // TODO(mtomic): lambda filtering in distributed - if (self_.filter_lambda_.expression) { - throw utils::NotYetImplemented("lambda filtering in distributed BFS"); + if (!subcursors_initialized_) { + InitSubcursors(&context.db_accessor_, context.symbol_table_, + context.evaluation_context_); + subcursors_initialized_ = true; } // Evaluator for the filtering condition and expansion depth. @@ -1064,7 +1072,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { pull_pos_->second, &db_); if (vertex) { last_vertex = *vertex; - SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_); break; } VLOG(10) << "Nothing to pull from " << pull_pos_->first; @@ -1075,8 +1082,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { // Handle existence flag if (self_.existing_node_) { TypedValue &node = frame[self_.node_symbol_]; - // Due to optional matching the existing node could be null - if (node.IsNull() || (node != last_vertex).ValueBool()) continue; + if ((node != last_vertex).ValueBool()) continue; // There is no point in traversing the rest of the graph because BFS // can find only one path to a certain node. skip_rest_ = true; @@ -1115,8 +1121,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { if (!current_vertex_addr && !current_edge_addr) break; } std::reverse(edges.begin(), edges.end()); - for (auto &edge : edges) - SwitchAccessor(edge.ValueEdge(), self_.graph_view_); frame[self_.edge_symbol_] = std::move(edges); return true; } @@ -1128,7 +1132,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { if (current_depth_ < upper_bound_) { VLOG(10) << "Trying to expand again..."; current_depth_++; - bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false); + bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false, {}); if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) { continue; } @@ -1141,8 +1145,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { auto vertex_value = frame[self_.input_symbol_]; - // It is possible that the vertex is Null due to optional matching. + // Source or sink node could be null due to optional matching. if (vertex_value.IsNull()) continue; + if (self_.existing_node_ && frame[self_.node_symbol_].IsNull()) continue; auto vertex = vertex_value.ValueVertex(); lower_bound_ = self_.lower_bound_ @@ -1153,16 +1158,18 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { ? EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in breadth-first expansion") : std::numeric_limits<int64_t>::max(); + + if (upper_bound_ < 1 || lower_bound_ > upper_bound_) continue; + skip_rest_ = false; - if (upper_bound_ < 1) { - throw QueryRuntimeException( - "Max depth in breadth-first expansion must be at least 1"); - } + pull_pos_ = subcursor_ids_.begin(); VLOG(10) << "Starting BFS from " << vertex << " with limits " << lower_bound_ << ".." << upper_bound_; - bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true); + + bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true, + frame.elems()); bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress()); current_depth_ = 1; } @@ -1199,6 +1206,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { // Next worker master should try pulling from. std::unordered_map<int16_t, int64_t>::iterator pull_pos_; + + bool subcursors_initialized_{false}; }; // Returns a random worker id. Worker ID is obtained from the Db. @@ -1349,8 +1358,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex); return dest_node_value.Value<VertexAccessor>(); } else { - return CreateVertexOnWorker(worker_id, self_->node_atom_, frame, - context); + return CreateVertexOnWorker(worker_id, self_->node_atom_, frame, context); } } diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp index 0e725ee45..965e321c0 100644 --- a/src/query/plan/distributed_ops.lcp +++ b/src/query/plan/distributed_ops.lcp @@ -160,7 +160,7 @@ Logic of the synchronize operator is: :capnp-save (save-ast-vector "Expression *") :capnp-load (load-ast-vector "Expression *")) (compare "TypedValueVectorCompare" :scope :public - :capnp-type "Common.TypedValueVectorCompare")) + :capnp-type "Query.TypedValueVectorCompare")) (:documentation "Operator that merges distributed OrderBy operators. Instead of using a regular OrderBy on master (which would collect all remote @@ -220,9 +220,17 @@ by having only one result from each worker.") :capnp-type "Ast.Tree" :capnp-init nil :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *")) - (filter-lambda "ExpandVariable::Lambda" :scope :public + (filter-lambda "ExpansionLambda" :scope :public :documentation "Filter that must be satisfied for expansion to succeed." - :capnp-type "ExpandVariable.Lambda")) + :capnp-type "ExpansionLambda" + :capnp-save (lambda (builder member capnp-name) + #>cpp + Save(${member}, &${builder}, &helper->saved_ast_uids); + cpp<#) + :capnp-load (lambda (reader member capnp-name) + #>cpp + Load(&${member}, ${reader}, &helper->ast_storage, &helper->loaded_ast_uids); + cpp<#))) (:documentation "BFS expansion operator suited for distributed execution.") (:public #>cpp @@ -234,7 +242,7 @@ by having only one result from each worker.") Symbol input_symbol, bool existing_node, GraphView graph_view, Expression *lower_bound, Expression *upper_bound, - const ExpandVariable::Lambda &filter_lambda); + const ExpansionLambda &filter_lambda); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr<Cursor> MakeCursor( diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 7914866d8..bc34c8273 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -584,8 +584,8 @@ ExpandVariable::ExpandVariable( const std::vector<storage::EdgeType> &edge_types, bool is_reverse, Expression *lower_bound, Expression *upper_bound, const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, - bool existing_node, Lambda filter_lambda, - std::experimental::optional<Lambda> weight_lambda, + bool existing_node, ExpansionLambda filter_lambda, + std::experimental::optional<ExpansionLambda> weight_lambda, std::experimental::optional<Symbol> total_weight, GraphView graph_view) : ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input, input_symbol, existing_node, graph_view), @@ -1130,9 +1130,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { CHECK(self_.graph_view_ == GraphView::OLD) << "ExpandVariable should only be planned with GraphView::OLD"; CHECK(!self_.existing_node_) << "Single source shortest path algorithm " - "should not be used when `existing_node` " - "flag is set, s-t shortest path algorithm " - "should be used instead!"; + "should not be used when `existing_node` " + "flag is set, s-t shortest path algorithm " + "should be used instead!"; } bool Pull(Frame &frame, Context &context) override { @@ -1191,6 +1191,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { // input if (to_visit_current_.empty()) { if (!input_cursor_->Pull(frame, context)) return false; + to_visit_current_.clear(); to_visit_next_.clear(); processed_.clear(); @@ -1198,9 +1199,6 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { auto vertex_value = frame[self_.input_symbol_]; // it is possible that the vertex is Null due to optional matching if (vertex_value.IsNull()) continue; - auto vertex = vertex_value.Value<VertexAccessor>(); - processed_.emplace(vertex, std::experimental::nullopt); - expand_from_vertex(vertex); lower_bound_ = self_.lower_bound_ ? EvaluateInt(&evaluator, self_.lower_bound_, "Min depth in breadth-first expansion") @@ -1209,9 +1207,12 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { ? EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in breadth-first expansion") : std::numeric_limits<int64_t>::max(); - if (upper_bound_ < 1) - throw QueryRuntimeException( - "Maximum depth in breadth-first expansion must be at least 1."); + + if (upper_bound_ < 1 || lower_bound_ > upper_bound_) continue; + + auto vertex = vertex_value.Value<VertexAccessor>(); + processed_.emplace(vertex, std::experimental::nullopt); + expand_from_vertex(vertex); // go back to loop start and see if we expanded anything continue; diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index 5f87c08da..c912317dd 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -141,13 +141,13 @@ cpp<# (lcp:capnp-import 'storage "/storage/serialization.capnp") (lcp:capnp-import 'ast "/query/frontend/ast/ast.capnp") (lcp:capnp-import 'semantic "/query/frontend/semantic/symbol.capnp") -(lcp:capnp-import 'common "/query/common.capnp") +(lcp:capnp-import 'query "/query/serialization.capnp") (lcp:capnp-type-conversion "Symbol" "Semantic.Symbol") (lcp:capnp-type-conversion "storage::Label" "Storage.Common") (lcp:capnp-type-conversion "storage::Property" "Storage.Common") (lcp:capnp-type-conversion "storage::EdgeType" "Storage.Common") -(lcp:capnp-type-conversion "GraphView" "Common.GraphView") +(lcp:capnp-type-conversion "GraphView" "Query.GraphView") (lcp:define-class logical-operator ("::utils::Visitable<HierarchicalLogicalOperatorVisitor>") () @@ -810,6 +810,33 @@ pulled.") cpp<#) (:serialize :capnp :inherit-compose '(expand-common))) +(lcp:define-struct expansion-lambda () + ((inner-edge-symbol "Symbol" :documentation "Currently expanded edge symbol.") + (inner-node-symbol "Symbol" :documentation "Currently expanded node symbol.") + (expression "Expression *" :documentation "Expression used in lambda during expansion." + :capnp-type "Ast.Tree" :capnp-init nil + :capnp-save (lambda (builder member capnp-name) + #>cpp + if (${member}) { + auto expression_builder = ${builder}->initExpression(); + ${member}->Save(&expression_builder, saved_ast_uids); + } + cpp<#) + :capnp-load (lambda (reader member capnp-name) + #>cpp + if (${reader}.hasExpression()) { + ${member} = static_cast<Expression *>( + ast_storage->Load(${reader}.getExpression(), + loaded_ast_uids)); + } else { + ${member} = nullptr; + } + cpp<#))) + (:serialize :capnp + :save-args '((saved-ast-uids "std::vector<int> *")) + :load-args '((ast-storage "AstStorage *") + (loaded-ast-uids "std::vector<int> *")))) + (lcp:define-class expand-variable (logical-operator expand-common) ((type "EdgeAtom::Type" :scope :public :capnp-type "Ast.EdgeAtom.Type" :capnp-init nil @@ -827,14 +854,29 @@ pulled.") :capnp-type "Ast.Tree" :capnp-init nil :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *") :documentation "Optional upper bound of the variable length expansion, defaults are (1, inf)") - (filter-lambda "Lambda" :scope :public) - (weight-lambda "std::experimental::optional<Lambda>" :scope :public + (filter-lambda "ExpansionLambda" + :scope :public + :capnp-save (lambda (builder member capnp-name) + #>cpp + Save(${member}, &${builder}, &helper->saved_ast_uids); + cpp<#) + :capnp-load (lambda (reader member capnp-name) + #>cpp + Load(&${member}, ${reader}, &helper->ast_storage, &helper->loaded_ast_uids); + cpp<#)) + (weight-lambda "std::experimental::optional<ExpansionLambda>" :scope :public :capnp-save (lcp:capnp-save-optional - "capnp::ExpandVariable::Lambda" "ExpandVariable::Lambda" - "[helper](auto *builder, const auto &val) { Save(val, builder, helper); }") + "capnp::ExpansionLambda" "ExpansionLambda" + "[helper](auto *builder, const auto &val) { + Save(val, builder, &helper->saved_ast_uids); + }") :capnp-load (lcp:capnp-load-optional - "capnp::ExpandVariable::Lambda" "ExpandVariable::Lambda" - "[helper](const auto &reader) { ExpandVariable::Lambda val; Load(&val, reader, helper); return val; }")) + "capnp::ExpansionLambda" "ExpansionLambda" + "[helper](const auto &reader) { + ExpansionLambda val; + Load(&val, reader, &helper->ast_storage, &helper->loaded_ast_uids); + return val; + }")) (total-weight "std::experimental::optional<Symbol>" :scope :public :capnp-save (lcp:capnp-save-optional "::query::capnp::Symbol" "Symbol") :capnp-load (lcp:capnp-load-optional "::query::capnp::Symbol" "Symbol"))) @@ -856,15 +898,6 @@ MATCH (a) MATCH (a)--(b)), only expansions that match defined equalities are succesfully pulled.") (:public - (lcp:define-struct lambda () - ((inner-edge-symbol "Symbol" :documentation "Currently expanded edge symbol.") - (inner-node-symbol "Symbol" :documentation "Currently expanded node symbol.") - (expression "Expression *" :documentation "Expression used in lambda during expansion." - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *"))) - (:serialize :capnp - :save-args '((helper "LogicalOperator::SaveHelper *")) - :load-args '((helper "LogicalOperator::LoadHelper *")))) #>cpp ExpandVariable() {} @@ -897,8 +930,9 @@ pulled.") bool is_reverse, Expression *lower_bound, Expression *upper_bound, const std::shared_ptr<LogicalOperator> &input, - Symbol input_symbol, bool existing_node, Lambda filter_lambda, - std::experimental::optional<Lambda> weight_lambda, + Symbol input_symbol, bool existing_node, + ExpansionLambda filter_lambda, + std::experimental::optional<ExpansionLambda> weight_lambda, std::experimental::optional<Symbol> total_weight, GraphView graph_view); @@ -1774,7 +1808,7 @@ input should be performed).") :capnp-save #'save-operator-pointer :capnp-load #'load-operator-pointer) (compare "TypedValueVectorCompare" :scope :public - :capnp-type "Common.TypedValueVectorCompare") + :capnp-type "Query.TypedValueVectorCompare") (order-by "std::vector<Expression *>" :scope :public :capnp-type "List(Ast.Tree)" :capnp-save (save-ast-vector "Expression *") diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp index e24230e23..71e30d21e 100644 --- a/src/query/plan/rule_based_planner.hpp +++ b/src/query/plan/rule_based_planner.hpp @@ -406,11 +406,11 @@ class RuleBasedPlanner { DCHECK(!utils::Contains(bound_symbols, edge_symbol)) << "Existing edges are not supported"; if (edge->IsVariable()) { - std::experimental::optional<ExpandVariable::Lambda> weight_lambda; + std::experimental::optional<ExpansionLambda> weight_lambda; std::experimental::optional<Symbol> total_weight; if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) { - weight_lambda.emplace(ExpandVariable::Lambda{ + weight_lambda.emplace(ExpansionLambda{ symbol_table.at(*edge->weight_lambda_.inner_edge), symbol_table.at(*edge->weight_lambda_.inner_node), edge->weight_lambda_.expression}); @@ -418,7 +418,7 @@ class RuleBasedPlanner { total_weight.emplace(symbol_table.at(*edge->total_weight_)); } - ExpandVariable::Lambda filter_lambda; + ExpansionLambda filter_lambda; filter_lambda.inner_edge_symbol = symbol_table.at(*edge->filter_lambda_.inner_edge); filter_lambda.inner_node_symbol = diff --git a/src/query/serialization.capnp b/src/query/serialization.capnp index 7b0f82444..9c3c75432 100644 --- a/src/query/serialization.capnp +++ b/src/query/serialization.capnp @@ -1,11 +1,21 @@ @0xf47e119e21912f20; +using Ast = import "/query/frontend/ast/ast.capnp"; using Cxx = import "/capnp/c++.capnp"; using Storage = import "/storage/serialization.capnp"; using Utils = import "/utils/serialization.capnp"; $Cxx.namespace("query::capnp"); +enum GraphView { + old @0; + new @1; +} + +struct TypedValueVectorCompare { + ordering @0 :List(Ast.Ordering); +} + struct EvaluationContext { timestamp @0 : Int64; params @1 : Utils.Map(Utils.BoxInt64, Storage.PropertyValue); diff --git a/src/query/serialization.cpp b/src/query/serialization.cpp index 5a7fae301..648c363ae 100644 --- a/src/query/serialization.cpp +++ b/src/query/serialization.cpp @@ -139,4 +139,30 @@ void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader, } } +void SaveEvaluationContext(const EvaluationContext &ctx, + capnp::EvaluationContext::Builder *builder) { + builder->setTimestamp(ctx.timestamp); + auto params_builder = + builder->initParams().initEntries(ctx.parameters.size()); + size_t i = 0; + for (auto &entry : ctx.parameters) { + auto builder = params_builder[i]; + auto key_builder = builder.initKey(); + key_builder.setValue(entry.first); + auto value_builder = builder.initValue(); + storage::SaveCapnpPropertyValue(entry.second, &value_builder); + ++i; + } +} + +void LoadEvaluationContext(const capnp::EvaluationContext::Reader &reader, + EvaluationContext *ctx) { + ctx->timestamp = reader.getTimestamp(); + for (const auto &entry_reader : reader.getParams().getEntries()) { + PropertyValue value; + storage::LoadCapnpPropertyValue(entry_reader.getValue(), &value); + ctx->parameters.Add(entry_reader.getKey().getValue(), value); + } +} + } // namespace query diff --git a/src/query/serialization.hpp b/src/query/serialization.hpp index 0c6eeff1e..bdee10649 100644 --- a/src/query/serialization.hpp +++ b/src/query/serialization.hpp @@ -1,5 +1,6 @@ #pragma once +#include "query/context.hpp" #include "query/serialization.capnp.h" #include "query/typed_value.hpp" #include "storage/serialization.hpp" @@ -19,4 +20,10 @@ void LoadCapnpTypedValue(const capnp::TypedValue::Reader &reader, database::GraphDbAccessor *dba, distributed::DataManager *data_manager); +void SaveEvaluationContext(const EvaluationContext &ctx, + capnp::EvaluationContext::Builder *builder); + +void LoadEvaluationContext(const capnp::EvaluationContext::Reader &reader, + EvaluationContext *ctx); + } // namespace query diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index c20bb1d8f..1bd4e0de8 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -58,9 +58,6 @@ target_link_libraries(${test_prefix}datastructure_union_find memgraph_lib kvstor add_unit_test(deferred_deleter.cpp) target_link_libraries(${test_prefix}deferred_deleter memgraph_lib kvstore_dummy_lib) -add_unit_test(distributed_bfs.cpp) -target_link_libraries(${test_prefix}distributed_bfs memgraph_lib kvstore_dummy_lib) - add_unit_test(distributed_coordination.cpp) target_link_libraries(${test_prefix}distributed_coordination memgraph_lib kvstore_dummy_lib) @@ -101,6 +98,12 @@ target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dum # add_unit_test(distributed_token_sharing.cpp) # target_link_libraries(${test_prefix}distributed_token_sharing memgraph_lib kvstore_dummy_lib) +add_unit_test(bfs_distributed.cpp) +target_link_libraries(${test_prefix}bfs_distributed memgraph_lib kvstore_dummy_lib) + +add_unit_test(bfs_single_node.cpp) +target_link_libraries(${test_prefix}bfs_single_node memgraph_lib kvstore_dummy_lib) + add_unit_test(distributed_dgp_partitioner.cpp) target_link_libraries(${test_prefix}distributed_dgp_partitioner memgraph_lib kvstore_dummy_lib) diff --git a/tests/unit/bfs_common.hpp b/tests/unit/bfs_common.hpp new file mode 100644 index 000000000..2e815fcf2 --- /dev/null +++ b/tests/unit/bfs_common.hpp @@ -0,0 +1,478 @@ +#pragma once + +#include "gtest/gtest.h" + +#include "query/context.hpp" +#include "query/frontend/ast/ast.hpp" +#include "query/interpret/frame.hpp" +#include "query/plan/operator.hpp" +#include "query_common.hpp" + +namespace query { +void PrintTo(const query::EdgeAtom::Direction &dir, std::ostream *os) { + switch (dir) { + case query::EdgeAtom::Direction::IN: + *os << "IN"; + break; + case query::EdgeAtom::Direction::OUT: + *os << "OUT"; + break; + case query::EdgeAtom::Direction::BOTH: + *os << "BOTH"; + break; + } +} +} // namespace query + +const auto kVertexCount = 6; +// Maps vertices to workers +const std::vector<int> kVertexLocations = {0, 1, 1, 0, 2, 2}; +// Edge list in form of (from, to, edge_type). +const std::vector<std::tuple<int, int, std::string>> kEdges = { + {0, 1, "a"}, {1, 2, "b"}, {2, 4, "b"}, {2, 5, "a"}, {4, 1, "a"}, + {4, 5, "a"}, {5, 3, "b"}, {5, 4, "a"}, {5, 5, "b"}}; + +// Filters input edge list by edge type and direction and returns a list of +// pairs representing valid directed edges. +std::vector<std::pair<int, int>> GetEdgeList( + const std::vector<std::tuple<int, int, std::string>> &edges, + query::EdgeAtom::Direction dir, + const std::vector<std::string> &edge_types) { + std::vector<std::pair<int, int>> ret; + for (const auto &e : edges) { + if (edge_types.empty() || utils::Contains(edge_types, std::get<2>(e))) + ret.emplace_back(std::get<0>(e), std::get<1>(e)); + } + switch (dir) { + case query::EdgeAtom::Direction::OUT: + break; + case query::EdgeAtom::Direction::IN: + for (auto &e : ret) std::swap(e.first, e.second); + break; + case query::EdgeAtom::Direction::BOTH: + std::transform( + ret.begin(), ret.end(), std::back_inserter(ret), + [](const auto &e) { return std::make_pair(e.second, e.first); }); + break; + } + return ret; +} + +// Floyd-Warshall algorithm. Given a graph, returns its distance matrix. If +// there is no path between two vertices, corresponding matrix entry will be +// -1. +std::vector<std::vector<int>> FloydWarshall( + int num_vertices, const std::vector<std::pair<int, int>> &edges) { + int inf = std::numeric_limits<int>::max(); + std::vector<std::vector<int>> dist(num_vertices, + std::vector<int>(num_vertices, inf)); + + for (const auto &e : edges) dist[e.first][e.second] = 1; + for (int i = 0; i < num_vertices; ++i) dist[i][i] = 0; + + for (int k = 0; k < num_vertices; ++k) { + for (int i = 0; i < num_vertices; ++i) { + for (int j = 0; j < num_vertices; ++j) { + if (dist[i][k] == inf || dist[k][j] == inf) continue; + dist[i][j] = std::min(dist[i][j], dist[i][k] + dist[k][j]); + } + } + } + + for (int i = 0; i < num_vertices; ++i) + for (int j = 0; j < num_vertices; ++j) + if (dist[i][j] == inf) dist[i][j] = -1; + + return dist; +} + +class Yield : public query::plan::LogicalOperator { + public: + Yield(const std::shared_ptr<query::plan::LogicalOperator> &input, + const std::vector<query::Symbol> &modified_symbols, + const std::vector<std::vector<query::TypedValue>> &values) + : input_(input ? input : std::make_shared<query::plan::Once>()), + modified_symbols_(modified_symbols), + values_(values) {} + + std::unique_ptr<query::plan::Cursor> MakeCursor( + database::GraphDbAccessor &dba) const override { + return std::make_unique<YieldCursor>(this, input_->MakeCursor(dba)); + } + std::vector<query::Symbol> ModifiedSymbols( + const query::SymbolTable &) const override { + return modified_symbols_; + } + bool HasSingleInput() const override { return true; } + std::shared_ptr<query::plan::LogicalOperator> input() const override { + return input_; + } + void set_input(std::shared_ptr<query::plan::LogicalOperator> input) override { + input_ = input; + } + bool Accept(query::plan::HierarchicalLogicalOperatorVisitor &) override { + LOG(FATAL) << "Please go away, visitor!"; + } + + std::shared_ptr<query::plan::LogicalOperator> input_; + std::vector<query::Symbol> modified_symbols_; + std::vector<std::vector<query::TypedValue>> values_; + + class YieldCursor : public query::plan::Cursor { + public: + YieldCursor(const Yield *self, + std::unique_ptr<query::plan::Cursor> input_cursor) + : self_(self), + input_cursor_(std::move(input_cursor)), + pull_index_(self_->values_.size()) {} + bool Pull(query::Frame &frame, query::Context &context) override { + if (pull_index_ == self_->values_.size()) { + if (!input_cursor_->Pull(frame, context)) return false; + pull_index_ = 0; + } + for (size_t i = 0; i < self_->values_[pull_index_].size(); ++i) { + frame[self_->modified_symbols_[i]] = self_->values_[pull_index_][i]; + } + pull_index_++; + return true; + } + void Reset() override { + input_cursor_->Reset(); + pull_index_ = self_->values_.size(); + } + + void Shutdown() override {} + private: + const Yield *self_; + std::unique_ptr<query::plan::Cursor> input_cursor_; + size_t pull_index_; + }; +}; + +std::vector<std::vector<query::TypedValue>> PullResults( + query::plan::LogicalOperator *last_op, query::Context *context, + std::vector<query::Symbol> output_symbols) { + auto cursor = last_op->MakeCursor(context->db_accessor_); + std::vector<std::vector<query::TypedValue>> output; + { + query::Frame frame(context->symbol_table_.max_position()); + while (cursor->Pull(frame, *context)) { + output.emplace_back(); + for (const auto &symbol : output_symbols) { + output.back().push_back(frame[symbol]); + } + } + } + return output; +} + +/* Various types of lambdas. + * NONE - No filter lambda used. + * USE_FRAME - Block a single edge or vertex. Tests if frame is sent over + * the network properly in distributed BFS. + * USE_FRAME_NULL - Block a single node or vertex, but lambda returns null + * instead of false. + * USE_CTX - Block a vertex by checking if its ID is equal to a + * parameter. Tests if evaluation context is sent over the + * network properly in distributed BFS. + * ERROR - Lambda that evaluates to an integer instead of null or + * boolean.In distributed BFS, it will fail on worker other + * than master, to test if errors are propagated correctly. + */ + +enum class FilterLambdaType { NONE, USE_FRAME, USE_FRAME_NULL, USE_CTX, ERROR }; + +// Common interface for single-node and distributed Memgraph. +class Database { + public: + virtual std::unique_ptr<database::GraphDbAccessor> Access() = 0; + virtual void AdvanceCommand(tx::TransactionId tx_id) = 0; + virtual std::unique_ptr<query::plan::LogicalOperator> MakeBfsOperator( + query::Symbol source_sym, query::Symbol sink_sym, query::Symbol edge_sym, + query::EdgeAtom::Direction direction, + const std::vector<storage::EdgeType> &edge_types, + const std::shared_ptr<query::plan::LogicalOperator> &input, + bool existing_node, query::Expression *lower_bound, + query::Expression *upper_bound, + const query::plan::ExpansionLambda &filter_lambda) = 0; + virtual std::pair<std::vector<storage::VertexAddress>, + std::vector<storage::EdgeAddress>> + BuildGraph(database::GraphDbAccessor *dba, + const std::vector<int> &vertex_locations, + const std::vector<std::tuple<int, int, std::string>> &edges) = 0; + + virtual ~Database() {} +}; + +// Returns an operator that yields vertices given by their address. We will also +// include query::TypedValue::Null to account for the optional match case. +std::unique_ptr<query::plan::LogicalOperator> YieldVertices( + database::GraphDbAccessor *dba, + std::vector<storage::VertexAddress> vertices, query::Symbol symbol, + std::shared_ptr<query::plan::LogicalOperator> input_op) { + std::vector<std::vector<query::TypedValue>> frames; + frames.push_back(std::vector<query::TypedValue>{query::TypedValue::Null}); + for (const auto &vertex : vertices) { + frames.push_back( + std::vector<query::TypedValue>{VertexAccessor(vertex, *dba)}); + } + return std::make_unique<Yield>(input_op, std::vector<query::Symbol>{symbol}, + frames); +} + +// Returns an operator that yields edges and vertices given by their address. +std::unique_ptr<query::plan::LogicalOperator> YieldEntities( + database::GraphDbAccessor *dba, + std::vector<storage::VertexAddress> vertices, + std::vector<storage::EdgeAddress> edges, query::Symbol symbol, + std::shared_ptr<query::plan::LogicalOperator> input_op) { + std::vector<std::vector<query::TypedValue>> frames; + for (const auto &vertex : vertices) { + frames.push_back( + std::vector<query::TypedValue>{VertexAccessor(vertex, *dba)}); + } + for (const auto &edge : edges) { + frames.push_back(std::vector<query::TypedValue>{EdgeAccessor(edge, *dba)}); + } + return std::make_unique<Yield>(input_op, std::vector<query::Symbol>{symbol}, + frames); +} + +template <class TRecord> +auto GetProp(const RecordAccessor<TRecord> &rec, std::string prop, + database::GraphDbAccessor *dba) { + return rec.PropsAt(dba->Property(prop)); +} + +// Checks if the given path is actually a path from source to sink and if all +// of its edges exist in the given edge list. +void CheckPath(database::GraphDbAccessor *dba, const VertexAccessor &source, + const VertexAccessor &sink, + const std::vector<query::TypedValue> &path, + const std::vector<std::pair<int, int>> &edges) { + VertexAccessor curr = source; + for (const auto &edge_tv : path) { + ASSERT_TRUE(edge_tv.IsEdge()); + auto edge = edge_tv.ValueEdge(); + + ASSERT_TRUE(edge.from() == curr || edge.to() == curr); + auto next = edge.from_is(curr) ? edge.to() : edge.from(); + + int from = GetProp(curr, "id", dba).Value<int64_t>(); + int to = GetProp(next, "id", dba).Value<int64_t>(); + ASSERT_TRUE(utils::Contains(edges, std::make_pair(from, to))); + + curr = next; + } + ASSERT_EQ(curr, sink); +} + +// Given a list of BFS results of form (from, to, path, blocked entity), +// checks if all paths are valid and returns the distance matrix. +std::vector<std::vector<int>> CheckPathsAndExtractDistances( + database::GraphDbAccessor *dba, + const std::vector<std::pair<int, int>> edges, + const std::vector<std::vector<query::TypedValue>> &results) { + std::vector<std::vector<int>> distances(kVertexCount, + std::vector<int>(kVertexCount, -1)); + + for (size_t i = 0; i < kVertexCount; ++i) distances[i][i] = 0; + + for (const auto &row : results) { + auto source = GetProp(row[0].ValueVertex(), "id", dba).Value<int64_t>(); + auto sink = GetProp(row[1].ValueVertex(), "id", dba).Value<int64_t>(); + distances[source][sink] = row[2].ValueList().size(); + CheckPath(dba, row[0].ValueVertex(), row[1].ValueVertex(), + row[2].ValueList(), edges); + } + return distances; +} + +void BfsTest(Database *db, int lower_bound, int upper_bound, + query::EdgeAtom::Direction direction, + std::vector<std::string> edge_types, bool known_sink, + FilterLambdaType filter_lambda_type) { + auto dba_ptr = db->Access(); + auto &dba = *dba_ptr; + query::AstStorage storage; + query::Context context(*dba_ptr); + query::Symbol blocked_sym = + context.symbol_table_.CreateSymbol("blocked", true); + query::Symbol source_sym = context.symbol_table_.CreateSymbol("source", true); + query::Symbol sink_sym = context.symbol_table_.CreateSymbol("sink", true); + query::Symbol edges_sym = context.symbol_table_.CreateSymbol("edges", true); + query::Symbol inner_node_sym = + context.symbol_table_.CreateSymbol("inner_node", true); + query::Symbol inner_edge_sym = + context.symbol_table_.CreateSymbol("inner_edge", true); + query::Identifier *blocked = IDENT("blocked"); + query::Identifier *inner_node = IDENT("inner_node"); + query::Identifier *inner_edge = IDENT("inner_edge"); + context.symbol_table_[*blocked] = blocked_sym; + context.symbol_table_[*inner_node] = inner_node_sym; + context.symbol_table_[*inner_edge] = inner_edge_sym; + + std::vector<storage::VertexAddress> vertices; + std::vector<storage::EdgeAddress> edges; + + std::tie(vertices, edges) = + db->BuildGraph(dba_ptr.get(), kVertexLocations, kEdges); + + db->AdvanceCommand(dba_ptr->transaction_id()); + + std::shared_ptr<query::plan::LogicalOperator> input_op; + + query::Expression *filter_expr; + + // First build a filter lambda and an operator yielding blocked entities. + switch (filter_lambda_type) { + case FilterLambdaType::NONE: + // No filter lambda, nothing is ever blocked. + input_op = std::make_shared<Yield>( + nullptr, std::vector<query::Symbol>{blocked_sym}, + std::vector<std::vector<query::TypedValue>>{ + {query::TypedValue::Null}}); + filter_expr = nullptr; + break; + case FilterLambdaType::USE_FRAME: + // We block each entity in the graph and run BFS. + input_op = + YieldEntities(dba_ptr.get(), vertices, edges, blocked_sym, nullptr); + filter_expr = AND(NEQ(inner_node, blocked), NEQ(inner_edge, blocked)); + break; + case FilterLambdaType::USE_FRAME_NULL: + // We block each entity in the graph and run BFS. + input_op = + YieldEntities(dba_ptr.get(), vertices, edges, blocked_sym, nullptr); + filter_expr = IF(AND(NEQ(inner_node, blocked), NEQ(inner_edge, blocked)), + LITERAL(true), LITERAL(PropertyValue::Null)); + break; + case FilterLambdaType::USE_CTX: + // We only block vertex #5 and run BFS. + input_op = std::make_shared<Yield>( + nullptr, std::vector<query::Symbol>{blocked_sym}, + std::vector<std::vector<query::TypedValue>>{ + {VertexAccessor(vertices[5], *dba_ptr)}}); + filter_expr = NEQ(PROPERTY_LOOKUP(inner_node, PROPERTY_PAIR("id")), + PARAMETER_LOOKUP(0)); + context.evaluation_context_.parameters.Add(0, 5); + break; + case FilterLambdaType::ERROR: + // Evaluate to 42 for vertex #5 which is on worker 1. + filter_expr = + IF(EQ(PROPERTY_LOOKUP(inner_node, PROPERTY_PAIR("id")), LITERAL(5)), + LITERAL(42), LITERAL(true)); + } + + // We run BFS once from each vertex for each blocked entity. + input_op = YieldVertices(dba_ptr.get(), vertices, source_sym, input_op); + + // If the sink is known, we run BFS for all posible combinations of source, + // sink and blocked entity. + if (known_sink) { + input_op = YieldVertices(dba_ptr.get(), vertices, sink_sym, input_op); + } + + std::vector<storage::EdgeType> storage_edge_types; + for (const auto &t : edge_types) { + storage_edge_types.push_back(dba_ptr->EdgeType(t)); + } + + input_op = db->MakeBfsOperator( + source_sym, sink_sym, edges_sym, direction, storage_edge_types, input_op, + known_sink, lower_bound == -1 ? nullptr : LITERAL(lower_bound), + upper_bound == -1 ? nullptr : LITERAL(upper_bound), + query::plan::ExpansionLambda{inner_edge_sym, inner_node_sym, + filter_expr}); + + std::vector<std::vector<query::TypedValue>> results; + + // An exception should be thrown on one of the pulls. + if (filter_lambda_type == FilterLambdaType::ERROR) { + EXPECT_THROW(PullResults(input_op.get(), &context, + std::vector<query::Symbol>{ + source_sym, sink_sym, edges_sym, blocked_sym}), + query::QueryRuntimeException); + return; + } + + results = PullResults( + input_op.get(), &context, + std::vector<query::Symbol>{source_sym, sink_sym, edges_sym, blocked_sym}); + + // Group results based on blocked entity and compare them to results + // obtained by running Floyd-Warshall. + for (size_t i = 0; i < results.size();) { + int j = i; + auto blocked = results[j][3]; + while (j < results.size() && + query::TypedValue::BoolEqual{}(results[j][3], blocked)) + ++j; + + SCOPED_TRACE(fmt::format("blocked entity = {}", blocked)); + + // When an edge is blocked, it is blocked in both directions so we remove + // it before modifying edge list to account for direction and edge types; + auto edges = kEdges; + if (blocked.IsEdge()) { + int from = + GetProp(blocked.ValueEdge(), "from", dba_ptr.get()).Value<int64_t>(); + int to = + GetProp(blocked.ValueEdge(), "to", dba_ptr.get()).Value<int64_t>(); + edges.erase(std::remove_if(edges.begin(), edges.end(), + [from, to](const auto &e) { + return std::get<0>(e) == from && + std::get<1>(e) == to; + }), + edges.end()); + } + + // Now add edges in opposite direction if necessary. + auto edges_blocked = GetEdgeList(edges, direction, edge_types); + + // When a vertex is blocked, we remove all edges that lead into it. + if (blocked.IsVertex()) { + int id = + GetProp(blocked.ValueVertex(), "id", dba_ptr.get()).Value<int64_t>(); + edges_blocked.erase( + std::remove_if(edges_blocked.begin(), edges_blocked.end(), + [id](const auto &e) { return e.second == id; }), + edges_blocked.end()); + } + + auto correct_with_bounds = FloydWarshall(kVertexCount, edges_blocked); + + if (lower_bound == -1) lower_bound = 0; + if (upper_bound == -1) upper_bound = kVertexCount; + + // Remove paths whose length doesn't satisfy given length bounds. + for (int a = 0; a < kVertexCount; ++a) { + for (int b = 0; b < kVertexCount; ++b) { + if (a != b && (correct_with_bounds[a][b] < lower_bound || + correct_with_bounds[a][b] > upper_bound)) + correct_with_bounds[a][b] = -1; + } + } + + int num_results = 0; + for (int a = 0; a < kVertexCount; ++a) + for (int b = 0; b < kVertexCount; ++b) + if (a != b && correct_with_bounds[a][b] != -1) { + ++num_results; + } + // There should be exactly 1 successful pull for each existing path. + EXPECT_EQ(j - i, num_results); + + auto distances = CheckPathsAndExtractDistances( + dba_ptr.get(), edges_blocked, + std::vector<std::vector<query::TypedValue>>(results.begin() + i, + results.begin() + j)); + + // The distances should also match. + EXPECT_EQ(distances, correct_with_bounds); + + i = j; + } + + dba_ptr->Abort(); +} diff --git a/tests/unit/bfs_distributed.cpp b/tests/unit/bfs_distributed.cpp new file mode 100644 index 000000000..f085f3beb --- /dev/null +++ b/tests/unit/bfs_distributed.cpp @@ -0,0 +1,133 @@ +// Macros from query_common.hpp break enum declaration in distributed_ops.hpp +// (because of the SHOW_STREAMS macro), so we must be careful with the order of +// includes. +#include "query/plan/distributed_ops.hpp" + +#include "bfs_common.hpp" +#include "distributed_common.hpp" + +using namespace query; +using namespace query::plan; + +class DistributedDb : public Database { + public: + DistributedDb() : cluster_(3, "DistributedBfsTest") {} + + std::unique_ptr<database::GraphDbAccessor> Access() override { + return cluster_.master()->Access(); + } + + void AdvanceCommand(tx::TransactionId tx_id) override { + cluster_.AdvanceCommand(tx_id); + } + + std::unique_ptr<LogicalOperator> MakeBfsOperator( + Symbol source_sym, Symbol sink_sym, Symbol edge_sym, + EdgeAtom::Direction direction, + const std::vector<storage::EdgeType> &edge_types, + const std::shared_ptr<LogicalOperator> &input, bool existing_node, + Expression *lower_bound, Expression *upper_bound, + const ExpansionLambda &filter_lambda) override { + return std::make_unique<DistributedExpandBfs>( + sink_sym, edge_sym, direction, edge_types, input, source_sym, + existing_node, GraphView::OLD, lower_bound, upper_bound, filter_lambda); + } + + std::pair<std::vector<storage::VertexAddress>, + std::vector<storage::EdgeAddress>> + BuildGraph( + database::GraphDbAccessor *dba, const std::vector<int> &vertex_locations, + const std::vector<std::tuple<int, int, std::string>> &edges) override { + std::vector<storage::VertexAddress> vertex_addr; + std::vector<storage::EdgeAddress> edge_addr; + + for (size_t id = 0; id < vertex_locations.size(); ++id) { + if (vertex_locations[id] == 0) { + auto vertex = dba->InsertVertex(); + vertex.PropsSet(dba->Property("id"), (int64_t)id); + vertex_addr.push_back(vertex.GlobalAddress()); + } else { + auto vertex = database::InsertVertexIntoRemote( + dba, vertex_locations[id], {}, {{dba->Property("id"), (int64_t)id}}, + std::experimental::nullopt); + vertex_addr.push_back(vertex.GlobalAddress()); + } + } + + for (auto e : edges) { + int u, v; + std::string type; + std::tie(u, v, type) = e; + VertexAccessor from(vertex_addr[u], *dba); + VertexAccessor to(vertex_addr[v], *dba); + auto edge = dba->InsertEdge(from, to, dba->EdgeType(type)); + edge.PropsSet(dba->Property("from"), u); + edge.PropsSet(dba->Property("to"), v); + edge_addr.push_back(edge.GlobalAddress()); + } + + return std::make_pair(vertex_addr, edge_addr); + } + + private: + Cluster cluster_; +}; + +class DistributedBfsTest + : public ::testing::TestWithParam< + std::tuple<int, int, EdgeAtom::Direction, std::vector<std::string>, + bool, FilterLambdaType>> { + public: + static void SetUpTestCase() { db_ = std::make_unique<DistributedDb>(); } + static void TearDownTestCase() { db_ = nullptr; } + + protected: + static std::unique_ptr<DistributedDb> db_; +}; + +std::unique_ptr<DistributedDb> DistributedBfsTest::db_{nullptr}; + +TEST_P(DistributedBfsTest, All) { + int lower_bound; + int upper_bound; + EdgeAtom::Direction direction; + std::vector<std::string> edge_types; + bool known_sink; + FilterLambdaType filter_lambda_type; + std::tie(lower_bound, upper_bound, direction, edge_types, known_sink, + filter_lambda_type) = GetParam(); + BfsTest(db_.get(), lower_bound, upper_bound, direction, edge_types, + known_sink, filter_lambda_type); +} + +INSTANTIATE_TEST_CASE_P( + DirectionAndExpansionDepth, DistributedBfsTest, + testing::Combine(testing::Range(-1, kVertexCount), + testing::Range(-1, kVertexCount), + testing::Values(EdgeAtom::Direction::OUT, + EdgeAtom::Direction::IN, + EdgeAtom::Direction::BOTH), + testing::Values(std::vector<std::string>{}), + testing::Bool(), testing::Values(FilterLambdaType::NONE))); + +INSTANTIATE_TEST_CASE_P( + EdgeType, DistributedBfsTest, + testing::Combine(testing::Values(-1), testing::Values(-1), + testing::Values(EdgeAtom::Direction::OUT, + EdgeAtom::Direction::IN, + EdgeAtom::Direction::BOTH), + testing::Values(std::vector<std::string>{}, + std::vector<std::string>{"a"}, + std::vector<std::string>{"b"}, + std::vector<std::string>{"a", "b"}), + testing::Bool(), testing::Values(FilterLambdaType::NONE))); + +INSTANTIATE_TEST_CASE_P( + FilterLambda, DistributedBfsTest, + testing::Combine( + testing::Values(-1), testing::Values(-1), + testing::Values(EdgeAtom::Direction::OUT, EdgeAtom::Direction::IN, + EdgeAtom::Direction::BOTH), + testing::Values(std::vector<std::string>{}), testing::Bool(), + testing::Values(FilterLambdaType::NONE, FilterLambdaType::USE_FRAME, + FilterLambdaType::USE_CTX, FilterLambdaType::ERROR))); diff --git a/tests/unit/bfs_single_node.cpp b/tests/unit/bfs_single_node.cpp new file mode 100644 index 000000000..998bc33f6 --- /dev/null +++ b/tests/unit/bfs_single_node.cpp @@ -0,0 +1,124 @@ +#include "bfs_common.hpp" + +using namespace query; +using namespace query::plan; + +class SingleNodeDb : public Database { + public: + SingleNodeDb() : db_() {} + + std::unique_ptr<database::GraphDbAccessor> Access() override { + return db_.Access(); + } + + void AdvanceCommand(tx::TransactionId tx_id) override { + auto dba = db_.Access(tx_id); + dba->AdvanceCommand(); + } + + std::unique_ptr<LogicalOperator> MakeBfsOperator( + Symbol source_sym, Symbol sink_sym, Symbol edge_sym, + EdgeAtom::Direction direction, + const std::vector<storage::EdgeType> &edge_types, + const std::shared_ptr<LogicalOperator> &input, bool existing_node, + Expression *lower_bound, Expression *upper_bound, + const ExpansionLambda &filter_lambda) override { + return std::make_unique<ExpandVariable>( + sink_sym, edge_sym, EdgeAtom::Type::BREADTH_FIRST, direction, + edge_types, false, lower_bound, upper_bound, input, source_sym, + existing_node, filter_lambda, std::experimental::nullopt, + std::experimental::nullopt, GraphView::OLD); + } + + std::pair<std::vector<storage::VertexAddress>, + std::vector<storage::EdgeAddress>> + BuildGraph( + database::GraphDbAccessor *dba, const std::vector<int> &vertex_locations, + const std::vector<std::tuple<int, int, std::string>> &edges) override { + std::vector<storage::VertexAddress> vertex_addr; + std::vector<storage::EdgeAddress> edge_addr; + + for (size_t id = 0; id < vertex_locations.size(); ++id) { + auto vertex = dba->InsertVertex(); + vertex.PropsSet(dba->Property("id"), (int64_t)id); + vertex_addr.push_back(vertex.GlobalAddress()); + } + + for (auto e : edges) { + int u, v; + std::string type; + std::tie(u, v, type) = e; + VertexAccessor from(vertex_addr[u], *dba); + VertexAccessor to(vertex_addr[v], *dba); + auto edge = dba->InsertEdge(from, to, dba->EdgeType(type)); + edge.PropsSet(dba->Property("from"), u); + edge.PropsSet(dba->Property("to"), v); + edge_addr.push_back(edge.GlobalAddress()); + } + + return std::make_pair(vertex_addr, edge_addr); + } + + protected: + database::SingleNode db_; +}; + +class SingleNodeBfsTest + : public ::testing::TestWithParam< + std::tuple<int, int, EdgeAtom::Direction, std::vector<std::string>, + bool, FilterLambdaType>> { + public: + static void SetUpTestCase() { db_ = std::make_unique<SingleNodeDb>(); } + static void TearDownTestCase() { db_ = nullptr; } + + protected: + static std::unique_ptr<SingleNodeDb> db_; +}; + +TEST_P(SingleNodeBfsTest, All) { + int lower_bound; + int upper_bound; + EdgeAtom::Direction direction; + std::vector<std::string> edge_types; + bool known_sink; + FilterLambdaType filter_lambda_type; + std::tie(lower_bound, upper_bound, direction, edge_types, known_sink, + filter_lambda_type) = GetParam(); + BfsTest(db_.get(), lower_bound, upper_bound, direction,edge_types, + known_sink, filter_lambda_type); +} + +std::unique_ptr<SingleNodeDb> SingleNodeBfsTest::db_{nullptr}; + +INSTANTIATE_TEST_CASE_P( + DirectionAndExpansionDepth, SingleNodeBfsTest, + testing::Combine(testing::Range(-1, kVertexCount), + testing::Range(-1, kVertexCount), + testing::Values(EdgeAtom::Direction::OUT, + EdgeAtom::Direction::IN, + EdgeAtom::Direction::BOTH), + testing::Values(std::vector<std::string>{}), + testing::Bool(), testing::Values(FilterLambdaType::NONE))); + +INSTANTIATE_TEST_CASE_P( + EdgeType, SingleNodeBfsTest, + testing::Combine(testing::Values(-1), testing::Values(-1), + testing::Values(EdgeAtom::Direction::OUT, + EdgeAtom::Direction::IN, + EdgeAtom::Direction::BOTH), + testing::Values(std::vector<std::string>{}, + std::vector<std::string>{"a"}, + std::vector<std::string>{"b"}, + std::vector<std::string>{"a", "b"}), + testing::Bool(), testing::Values(FilterLambdaType::NONE))); + +INSTANTIATE_TEST_CASE_P( + FilterLambda, SingleNodeBfsTest, + testing::Combine( + testing::Values(-1), testing::Values(-1), + testing::Values(EdgeAtom::Direction::OUT, EdgeAtom::Direction::IN, + EdgeAtom::Direction::BOTH), + testing::Values(std::vector<std::string>{}), testing::Bool(), + testing::Values(FilterLambdaType::NONE, FilterLambdaType::USE_FRAME, + FilterLambdaType::USE_FRAME_NULL, + FilterLambdaType::USE_CTX, FilterLambdaType::ERROR))); diff --git a/tests/unit/distributed_bfs.cpp b/tests/unit/distributed_bfs.cpp deleted file mode 100644 index 861258b3b..000000000 --- a/tests/unit/distributed_bfs.cpp +++ /dev/null @@ -1,114 +0,0 @@ -#include "gtest/gtest.h" - -#include "database/graph_db_accessor.hpp" -#include "distributed/bfs_rpc_clients.hpp" - -#include "distributed_common.hpp" - -using namespace database; - -std::vector<int> V = {0, 1, 1, 0, 1, 2}; -std::vector<std::pair<int, int>> E = {{0, 1}, {1, 2}, {1, 5}, - {2, 4}, {2, 5}, {3, 4}}; - -class BfsTest : public DistributedGraphDbTest { - protected: - void SetUp() override { - DistributedGraphDbTest::SetUp(); - - for (int v : V) { - auto vertex = v == 0 ? InsertVertex(master()) : InsertVertex(worker(v)); - vertices.emplace_back(vertex); - } - - for (auto e : E) { - edges[e] = InsertEdge(vertices[e.first], vertices[e.second], "Edge"); - } - } - - public: - BfsTest() : DistributedGraphDbTest("bfs") {} - std::vector<storage::VertexAddress> vertices; - std::map<std::pair<int, int>, storage::EdgeAddress> edges; -}; - -TEST_F(BfsTest, Expansion) { - auto dba = master().Access(); - - auto &clients = master().bfs_subcursor_clients(); - auto subcursor_ids = clients.CreateBfsSubcursors( - dba->transaction_id(), query::EdgeAtom::Direction::BOTH, - {dba->EdgeType("Edge")}, query::GraphView::OLD); - clients.RegisterSubcursors(subcursor_ids); - - clients.SetSource(subcursor_ids, vertices[0]); - - auto pull = [&clients, &subcursor_ids, &dba](int worker_id) { - return clients.Pull(worker_id, subcursor_ids[worker_id], dba.get()); - }; - - EXPECT_EQ(pull(0), std::experimental::nullopt); - EXPECT_EQ(pull(1)->GlobalAddress(), vertices[1]); - EXPECT_EQ(pull(2), std::experimental::nullopt); - - clients.PrepareForExpand(subcursor_ids, false); - clients.ExpandLevel(subcursor_ids); - - EXPECT_EQ(pull(0), std::experimental::nullopt); - EXPECT_EQ(pull(1)->GlobalAddress(), vertices[2]); - EXPECT_EQ(pull(1), std::experimental::nullopt); - EXPECT_EQ(pull(2)->GlobalAddress(), vertices[5]); - EXPECT_EQ(pull(2), std::experimental::nullopt); - - clients.PrepareForExpand(subcursor_ids, false); - clients.ExpandLevel(subcursor_ids); - - EXPECT_EQ(pull(0), std::experimental::nullopt); - EXPECT_EQ(pull(1)->GlobalAddress(), vertices[4]); - EXPECT_EQ(pull(1), std::experimental::nullopt); - EXPECT_EQ(pull(2), std::experimental::nullopt); - - clients.PrepareForExpand(subcursor_ids, false); - clients.ExpandLevel(subcursor_ids); - - EXPECT_EQ(pull(0)->GlobalAddress(), vertices[3]); - EXPECT_EQ(pull(0), std::experimental::nullopt); - EXPECT_EQ(pull(1), std::experimental::nullopt); - EXPECT_EQ(pull(2), std::experimental::nullopt); - - auto compare = [this](const std::vector<EdgeAccessor> &lhs, - const std::vector<std::pair<int, int>> &rhs) { - EXPECT_EQ(lhs.size(), rhs.size()); - if (lhs.size() != rhs.size()) return; - for (auto idx = 0u; idx < lhs.size(); ++idx) { - EXPECT_EQ(lhs[idx].GlobalAddress(), edges[rhs[idx]]); - } - }; - - distributed::PathSegment ps; - - ps = clients.ReconstructPath(subcursor_ids, vertices[3], dba.get()); - ASSERT_EQ(ps.next_vertex, vertices[4]); - ASSERT_EQ(ps.next_edge, std::experimental::nullopt); - compare(ps.edges, {{3, 4}}); - - ps = clients.ReconstructPath(subcursor_ids, vertices[4], dba.get()); - EXPECT_EQ(ps.next_vertex, std::experimental::nullopt); - EXPECT_EQ(ps.next_edge, (edges[{0, 1}])); - compare(ps.edges, {{2, 4}, {1, 2}}); - - ps = clients.ReconstructPath(subcursor_ids, edges[{0, 1}], dba.get()); - EXPECT_EQ(ps.next_vertex, std::experimental::nullopt); - EXPECT_EQ(ps.next_edge, std::experimental::nullopt); - compare(ps.edges, {{0, 1}}); - - clients.PrepareForExpand(subcursor_ids, true); - clients.SetSource(subcursor_ids, vertices[3]); - - EXPECT_EQ(pull(0), std::experimental::nullopt); - EXPECT_EQ(pull(1)->GlobalAddress(), vertices[4]); - EXPECT_EQ(pull(1), std::experimental::nullopt); - EXPECT_EQ(pull(2), std::experimental::nullopt); - - clients.RemoveBfsSubcursors(subcursor_ids); -} diff --git a/tests/unit/query_common.hpp b/tests/unit/query_common.hpp index 33b52b6f2..9461a7477 100644 --- a/tests/unit/query_common.hpp +++ b/tests/unit/query_common.hpp @@ -483,6 +483,8 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match, std::make_pair(property_name, dba.Property(property_name)) #define PROPERTY_LOOKUP(...) \ query::test_common::GetPropertyLookup(storage, dba, __VA_ARGS__) +#define PARAMETER_LOOKUP(token_position) \ + storage.Create<query::ParameterLookup>((token_position)) #define NEXPR(name, expr) storage.Create<query::NamedExpression>((name), (expr)) // AS is alternative to NEXPR which does not initialize NamedExpression with // Expression. It should be used with RETURN or WITH. For example: @@ -554,6 +556,8 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match, #define OR(expr1, expr2) storage.Create<query::OrOperator>((expr1), (expr2)) #define IN_LIST(expr1, expr2) \ storage.Create<query::InListOperator>((expr1), (expr2)) +#define IF(cond, then, else) \ + storage.Create<query::IfOperator>((cond), (then), (else)) // Function call #define FN(function_name, ...) \ storage.Create<query::Function>( \ diff --git a/tests/unit/query_cost_estimator.cpp b/tests/unit/query_cost_estimator.cpp index d0b368476..e2ed63f0a 100644 --- a/tests/unit/query_cost_estimator.cpp +++ b/tests/unit/query_cost_estimator.cpp @@ -174,7 +174,7 @@ TEST_F(QueryCostEstimator, ExpandVariable) { NextSymbol(), NextSymbol(), EdgeAtom::Type::DEPTH_FIRST, EdgeAtom::Direction::IN, std::vector<storage::EdgeType>{}, false, nullptr, nullptr, last_op_, NextSymbol(), false, - ExpandVariable::Lambda{NextSymbol(), NextSymbol(), nullptr}, + ExpansionLambda{NextSymbol(), NextSymbol(), nullptr}, std::experimental::nullopt, std::experimental::nullopt, GraphView::OLD); EXPECT_COST(CardParam::kExpandVariable * CostParam::kExpandVariable); } diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index 3f538cef4..a8d940c49 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -569,9 +569,9 @@ class QueryPlanExpandVariable : public testing::Test { n_to_sym, edge_sym, EdgeAtom::Type::DEPTH_FIRST, direction, edge_types, is_reverse, convert(lower), convert(upper), filter_op, n_from.sym_, false, - ExpandVariable::Lambda{symbol_table.CreateSymbol("inner_edge", false), - symbol_table.CreateSymbol("inner_node", false), - nullptr}, + ExpansionLambda{symbol_table.CreateSymbol("inner_edge", false), + symbol_table.CreateSymbol("inner_node", false), + nullptr}, std::experimental::nullopt, std::experimental::nullopt, graph_view); } else return std::make_shared<Expand>(n_to_sym, edge_sym, direction, edge_types, @@ -832,642 +832,6 @@ struct hash<std::pair<int, int>> { }; } // namespace std -std::vector<std::vector<int>> FloydWarshall( - int num_vertices, const std::vector<std::pair<int, int>> &edges, - EdgeAtom::Direction dir) { - auto has_edge = [&](int u, int v) -> bool { - bool res = false; - if (dir != EdgeAtom::Direction::IN) - res |= utils::Contains(edges, std::make_pair(u, v)); - if (dir != EdgeAtom::Direction::OUT) - res |= utils::Contains(edges, std::make_pair(v, u)); - return res; - }; - - int inf = std::numeric_limits<int>::max(); - std::vector<std::vector<int>> dist(num_vertices, - std::vector<int>(num_vertices, inf)); - - for (int i = 0; i < num_vertices; ++i) - for (int j = 0; j < num_vertices; ++j) - if (has_edge(i, j)) dist[i][j] = 1; - for (int i = 0; i < num_vertices; ++i) dist[i][i] = 0; - - for (int k = 0; k < num_vertices; ++k) { - for (int i = 0; i < num_vertices; ++i) { - for (int j = 0; j < num_vertices; ++j) { - if (dist[i][k] == inf || dist[k][j] == inf) continue; - dist[i][j] = std::min(dist[i][j], dist[i][k] + dist[k][j]); - } - } - } - - for (int i = 0; i < num_vertices; ++i) - for (int j = 0; j < num_vertices; ++j) - if (dist[i][j] == inf) dist[i][j] = -1; - - return dist; -} - -class STShortestPathTest : public ::testing::Test { - protected: - STShortestPathTest() : db(), dba_ptr(db.Access()), dba(*dba_ptr) {} - - void SetUp() { - for (int i = 0; i < NUM_VERTICES; ++i) { - vertices.emplace_back(dba.InsertVertex()); - vertices[i].PropsSet(dba.Property("id"), i); - } - for (auto edge : EDGES) { - edges.emplace_back(dba.InsertEdge( - vertices[edge.first], vertices[edge.second], dba.EdgeType("Edge"))); - edges.back().PropsSet(dba.Property("id"), - fmt::format("{}-{}", edge.first, edge.second)); - } - - dba.AdvanceCommand(); - - ASSERT_EQ(dba.VerticesCount(), NUM_VERTICES); - ASSERT_EQ(dba.EdgesCount(), EDGES.size()); - } - - std::vector<std::vector<TypedValue>> ShortestPaths( - std::shared_ptr<query::plan::LogicalOperator> input_cursor, - Symbol source_symbol, Symbol sink_symbol, EdgeAtom::Direction dir, - Expression *lower_bound = nullptr, Expression *upper_bound = nullptr, - std::experimental::optional<ExpandVariable::Lambda> expand_lambda = - std::experimental::nullopt) { - if (!expand_lambda) { - expand_lambda = ExpandVariable::Lambda{ - symbol_table.CreateSymbol("inner_edge", true), - symbol_table.CreateSymbol("inner_node", true), nullptr}; - } - - auto edges_symbol = symbol_table.CreateSymbol("edges_symbol", true); - - auto expand_variable = std::make_shared<ExpandVariable>( - sink_symbol, edges_symbol, EdgeAtom::Type::BREADTH_FIRST, dir, - std::vector<storage::EdgeType>{dba.EdgeType("Edge")}, false, - lower_bound, upper_bound, input_cursor, source_symbol, true, - *expand_lambda, std::experimental::nullopt, std::experimental::nullopt, - GraphView::OLD); - - auto source_output_symbol = - symbol_table.CreateSymbol("s", true, Symbol::Type::Vertex); - auto sink_output_symbol = - symbol_table.CreateSymbol("t", true, Symbol::Type::Vertex); - auto edges_output_symbol = - symbol_table.CreateSymbol("edge", true, Symbol::Type::EdgeList); - - auto source_id = IDENT("s"); - auto sink_id = IDENT("t"); - auto edges_id = IDENT("e"); - - symbol_table[*source_id] = source_symbol; - symbol_table[*sink_id] = sink_symbol; - symbol_table[*edges_id] = edges_symbol; - - auto source_ne = NEXPR("s", source_id); - auto sink_ne = NEXPR("s", sink_id); - auto edges_ne = NEXPR("e", edges_id); - - symbol_table[*source_ne] = source_output_symbol; - symbol_table[*sink_ne] = sink_output_symbol; - symbol_table[*edges_ne] = edges_output_symbol; - - auto produce = MakeProduce(expand_variable, source_ne, sink_ne, edges_ne); - return CollectProduce(produce.get(), symbol_table, dba); - } - - void CheckPath(const VertexAccessor &source, const VertexAccessor &sink, - EdgeAtom::Direction dir, const std::vector<TypedValue> &path) { - // Check that the given path is actually a path from source to sink, that - // expansion direction is correct and that given edges actually exist in the - // test graph - VertexAccessor curr = source; - for (const auto &edge_tv : path) { - EXPECT_TRUE(edge_tv.IsEdge()); - auto edge = edge_tv.ValueEdge(); - EXPECT_TRUE(edge.from() == curr || edge.to() == curr); - EXPECT_TRUE(curr == edge.from() || dir != EdgeAtom::Direction::OUT); - EXPECT_TRUE(curr == edge.to() || dir != EdgeAtom::Direction::IN); - int from = edge.from().PropsAt(dba.Property("id")).Value<int64_t>(); - int to = edge.to().PropsAt(dba.Property("id")).Value<int64_t>(); - EXPECT_TRUE(utils::Contains(EDGES, std::make_pair(from, to))); - curr = curr == edge.from() ? edge.to() : edge.from(); - } - EXPECT_EQ(curr, sink); - } - - database::SingleNode db; - std::unique_ptr<database::GraphDbAccessor> dba_ptr; - database::GraphDbAccessor &dba; - std::vector<VertexAccessor> vertices; - std::vector<EdgeAccessor> edges; - - AstStorage storage; - SymbolTable symbol_table; - - const int NUM_VERTICES = 6; - const std::vector<std::pair<int, int>> EDGES = { - {0, 1}, {1, 2}, {2, 4}, {2, 5}, {4, 1}, {4, 5}, {5, 4}, {5, 5}, {5, 3}}; -}; - -TEST_F(STShortestPathTest, DirectionAndExpansionDepth) { - auto lower_bounds = iter::range(-1, NUM_VERTICES + 1); - auto upper_bounds = iter::range(-1, NUM_VERTICES + 1); - auto directions = std::vector<EdgeAtom::Direction>{EdgeAtom::Direction::IN, - EdgeAtom::Direction::OUT, - EdgeAtom::Direction::BOTH}; - - for (const auto &test : - iter::product(lower_bounds, upper_bounds, directions)) { - int lower_bound; - int upper_bound; - EdgeAtom::Direction dir; - std::tie(lower_bound, upper_bound, dir) = test; - - auto dist = FloydWarshall(NUM_VERTICES, EDGES, dir); - - auto source = MakeScanAll(storage, symbol_table, "s"); - auto sink = MakeScanAll(storage, symbol_table, "t", source.op_); - - auto results = - ShortestPaths(sink.op_, source.sym_, sink.sym_, dir, - lower_bound == -1 ? nullptr : LITERAL(lower_bound), - upper_bound == -1 ? nullptr : LITERAL(upper_bound)); - - if (lower_bound == -1) lower_bound = 0; - if (upper_bound == -1) upper_bound = NUM_VERTICES; - size_t output_count = 0; - for (int i = 0; i < NUM_VERTICES; ++i) { - for (int j = 0; j < NUM_VERTICES; ++j) { - if (i != j && dist[i][j] != -1 && dist[i][j] >= lower_bound && - dist[i][j] <= upper_bound) - ++output_count; - } - } - - EXPECT_EQ(results.size(), output_count); - - for (const auto &result : results) { - int s = - result[0].ValueVertex().PropsAt(dba.Property("id")).Value<int64_t>(); - int t = - result[1].ValueVertex().PropsAt(dba.Property("id")).Value<int64_t>(); - EXPECT_EQ(dist[s][t], (int)result[2].ValueList().size()); - CheckPath(result[0].ValueVertex(), result[1].ValueVertex(), dir, - result[2].ValueList()); - } - } -} - -TEST_F(STShortestPathTest, ExpandLambda) { - Symbol inner_node_symbol = symbol_table.CreateSymbol("inner_node", true); - Symbol inner_edge_symbol = symbol_table.CreateSymbol("inner_edge", true); - auto inner_node = IDENT("inner_node"); - auto inner_edge = IDENT("inner_edge"); - - symbol_table[*inner_node] = inner_node_symbol; - symbol_table[*inner_edge] = inner_edge_symbol; - - // (filter expression, expected shortest path length) - std::vector<std::pair<Expression *, int>> tests = { - // Block vertex 1 (this stops expansion from source side) - {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(1)), -1}, - // Block vertex 5 (this stops expansion from sink side) - {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(5)), -1}, - // Block source vertex - {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(0)), 4}, - // Block sink vertex - {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(3)), -1}, - // Block edge 0-1 (this stops expansion from source side) - {NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("0-1")), - -1}, - // Block edge 5-3 (this stops expansion from sink side) - {NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("5-3")), - -1}, - // Block edges 2-5 and 4-1 - {AND(NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("2-5")), - NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), - LITERAL("4-1"))), - 5}}; - - for (auto test : tests) { - Expression *expression; - int length; - - std::tie(expression, length) = test; - - auto s = MakeScanAll(storage, symbol_table, "s"); - auto source = std::make_shared<Filter>( - s.op_, EQ(PROPERTY_LOOKUP(s.node_->identifier_, dba.Property("id")), - LITERAL(0))); - - auto t = MakeScanAll(storage, symbol_table, "t", source); - auto sink = std::make_shared<Filter>( - t.op_, EQ(PROPERTY_LOOKUP(t.node_->identifier_, dba.Property("id")), - LITERAL(3))); - - auto results = ShortestPaths( - sink, s.sym_, t.sym_, EdgeAtom::Direction::BOTH, nullptr, nullptr, - ExpandVariable::Lambda{inner_edge_symbol, inner_node_symbol, - expression}); - - if (length == -1) { - EXPECT_EQ(results.size(), 0); - } else { - ASSERT_EQ(results.size(), 1); - EXPECT_EQ(results[0][2].ValueList().size(), length); - } - } -} - -TEST_F(STShortestPathTest, OptionalMatch) { - for (int i = 0; i <= 2; ++i) { - auto s = MakeScanAll(storage, symbol_table, "s"); - auto source = std::make_shared<Filter>( - s.op_, EQ(PROPERTY_LOOKUP(s.node_->identifier_, dba.Property("id")), - LITERAL(i == 0 ? 0 : -1))); - auto source_opt = std::make_shared<Optional>( - std::make_shared<Once>(), source, std::vector<Symbol>{s.sym_}); - - auto t = MakeScanAll(storage, symbol_table, "t"); - auto sink = std::make_shared<Filter>( - t.op_, EQ(PROPERTY_LOOKUP(t.node_->identifier_, dba.Property("id")), - LITERAL(i == 1 ? 3 : -1))); - auto sink_opt = std::make_shared<Optional>(source_opt, sink, - std::vector<Symbol>{t.sym_}); - auto results = - ShortestPaths(sink_opt, s.sym_, t.sym_, EdgeAtom::Direction::BOTH); - EXPECT_EQ(results.size(), 0); - } -} - -enum class TestType { SINGLE_NODE, DISTRIBUTED }; - -/** A test fixture for breadth first expansion */ -class QueryPlanExpandBfs - : public testing::TestWithParam<std::pair<TestType, int>> { - private: - std::unique_ptr<Cluster> cluster_; - std::unique_ptr<database::SingleNode> single_node_; - database::GraphDb *db_{nullptr}; - std::unordered_map<std::pair<int, int>, storage::EdgeAddress> e_; - - protected: - QueryPlanExpandBfs() - : cluster_(GetParam().first == TestType::DISTRIBUTED - ? new Cluster(GetParam().second, "QueryPlanExpandBfs") - : nullptr), - single_node_(GetParam().first == TestType::DISTRIBUTED - ? nullptr - : new database::SingleNode()), - db_([&]() -> database::GraphDb * { - if (cluster_) return cluster_->master(); - return single_node_.get(); - }()), - dba_ptr(db_->Access()), - dba(*dba_ptr) {} - - // Worker IDs where vertices are located. - const std::vector<int> vertices = {0, 1, 1, 0, 1, 2}; - // Edges in graph. - const std::vector<std::pair<int, int>> edges = { - {0, 1}, {1, 2}, {2, 4}, {2, 5}, {4, 1}, {4, 5}, {5, 4}, {5, 5}, {5, 3}}; - - // Style-guide non-conformant name due to PROPERTY_PAIR and PROPERTY_LOOKUP - // macro requirements. - std::unique_ptr<database::GraphDbAccessor> dba_ptr; - database::GraphDbAccessor &dba; - std::vector<storage::VertexAddress> v; - - AstStorage storage; - SymbolTable symbol_table; - - std::pair<std::string, storage::Property> prop = PROPERTY_PAIR("property"); - storage::EdgeType edge_type = dba.EdgeType("edge_type"); - // Inner edge and vertex symbols. - // Edge from a to b has `prop` with the value ab (all ints). - Symbol inner_edge = symbol_table.CreateSymbol("inner_edge", true); - Symbol inner_node = symbol_table.CreateSymbol("inner_node", true); - - int plan_id_{0}; - - void SetUp() { - for (auto p : iter::enumerate(vertices)) { - int id, worker; - std::tie(id, worker) = p; - if (GetParam().first == TestType::SINGLE_NODE || worker == 0) { - auto vertex = dba.InsertVertex(); - vertex.PropsSet(prop.second, id); - v.push_back(vertex.GlobalAddress()); - } else { - auto vertex = database::InsertVertexIntoRemote( - &dba, worker, {}, {{prop.second, id}}, std::experimental::nullopt); - v.push_back(vertex.GlobalAddress()); - } - } - - for (auto p : edges) { - VertexAccessor from(v[p.first], dba); - VertexAccessor to(v[p.second], dba); - auto edge = dba.InsertEdge(from, to, edge_type); - edge.PropsSet(prop.second, p.first * 10 + p.second); - e_.emplace(p, edge.GlobalAddress()); - } - - AdvanceCommand(dba.transaction_id()); - } - - // Defines and performs a breadth-first expansion with the given parameters. - // Returns a vector of pairs. Each pair is (vector-of-edges, vertex). - auto ExpandBF(EdgeAtom::Direction direction, int min_depth, int max_depth, - Expression *where, std::vector<int> source_ids = {}, - std::experimental::optional<TypedValue> existing_node = - std::experimental::nullopt) { - // If source_ids are empty, we set ID to -1 so no nodes are matched (used - // for optional match test case). - if (source_ids.empty()) { - source_ids = std::vector<int>{-1}; - } - - std::vector<PropertyValue> sources; - for (const auto id : source_ids) sources.emplace_back(id); - - auto s = MakeScanAll(storage, symbol_table, "s"); - std::shared_ptr<LogicalOperator> last_op = std::make_shared<Filter>( - s.op_, IN_LIST(PROPERTY_LOOKUP(s.node_->identifier_, prop.second), - LITERAL(sources))); - - auto node_sym = symbol_table.CreateSymbol("node", true); - auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true); - - if (GetParam().first == TestType::DISTRIBUTED) { - cluster_->master()->plan_dispatcher().DispatchPlan(plan_id_, last_op, - symbol_table); - last_op = std::make_shared<PullRemote>(last_op, plan_id_, - std::vector<Symbol>{s.sym_}); - plan_id_++; - } - - last_op = std::make_shared<Optional>(std::make_shared<Once>(), last_op, - std::vector<Symbol>{s.sym_}); - - if (GetParam().first == TestType::DISTRIBUTED) { - last_op = std::make_shared<DistributedExpandBfs>( - node_sym, edge_list_sym, direction, std::vector<storage::EdgeType>{}, - last_op, s.sym_, !!existing_node, GraphView::OLD, LITERAL(min_depth), - LITERAL(max_depth), - ExpandVariable::Lambda{inner_edge, inner_node, where}); - } else { - last_op = std::make_shared<ExpandVariable>( - node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction, - std::vector<storage::EdgeType>{}, false, LITERAL(min_depth), - LITERAL(max_depth), last_op, s.sym_, !!existing_node, - ExpandVariable::Lambda{inner_edge, inner_node, where}, - std::experimental::nullopt, std::experimental::nullopt, - GraphView::OLD); - } - - Frame frame(symbol_table.max_position()); - - if (existing_node) { - frame[node_sym] = *existing_node; - } - - auto cursor = last_op->MakeCursor(dba); - std::vector<std::pair<std::vector<EdgeAccessor>, VertexAccessor>> results; - Context context(dba); - context.symbol_table_ = symbol_table; - while (cursor->Pull(frame, context)) { - results.emplace_back(std::vector<EdgeAccessor>(), - frame[node_sym].Value<VertexAccessor>()); - for (const TypedValue &edge : frame[edge_list_sym].ValueList()) - results.back().first.emplace_back(edge.Value<EdgeAccessor>()); - } - - return results; - } - - template <typename TAccessor> - auto GetProp(const TAccessor &accessor) { - return accessor.PropsAt(prop.second).template Value<int64_t>(); - } - - bool EdgesEqual(const std::vector<EdgeAccessor> &edges, - const std::vector<int> &ids) { - if (edges.size() != ids.size()) return false; - for (size_t i = 0; i < edges.size(); ++i) { - if (GetProp(edges[i]) != ids[i]) return false; - } - return true; - } - - void ApplyUpdates(tx::TransactionId tx_id) { - if (GetParam().first == TestType::DISTRIBUTED) - cluster_->ApplyUpdates(tx_id); - } - - void AdvanceCommand(tx::TransactionId tx_id) { - if (GetParam().first == TestType::DISTRIBUTED) - cluster_->AdvanceCommand(tx_id); - else - db_->Access(tx_id)->AdvanceCommand(); - } -}; - -TEST_P(QueryPlanExpandBfs, Basic) { - auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0}); - - ASSERT_EQ(results.size(), 5); - - EXPECT_EQ(GetProp(results[0].second), 1); - EXPECT_TRUE(EdgesEqual(results[0].first, {1})); - - if (GetProp(results[1].second) == 4) { - std::swap(results[1], results[2]); - } - - EXPECT_EQ(GetProp(results[1].second), 2); - EXPECT_TRUE(EdgesEqual(results[1].first, {1, 12})); - - EXPECT_EQ(GetProp(results[2].second), 4); - EXPECT_TRUE(EdgesEqual(results[2].first, {1, 41})); - - EXPECT_EQ(GetProp(results[3].second), 5); - EXPECT_TRUE(EdgesEqual(results[3].first, {1, 41, 45}) || - EdgesEqual(results[3].first, {1, 41, 54}) || - EdgesEqual(results[3].first, {1, 12, 25})); - - EXPECT_EQ(GetProp(results[4].second), 3); - EXPECT_TRUE(EdgesEqual(results[4].first, {1, 41, 45, 53}) || - EdgesEqual(results[4].first, {1, 41, 54, 53}) || - EdgesEqual(results[4].first, {1, 12, 25, 53})); -} - -TEST_P(QueryPlanExpandBfs, EdgeDirection) { - { - auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, {4}); - ASSERT_EQ(results.size(), 4); - - if (GetProp(results[0].second) == 5) { - std::swap(results[0], results[1]); - } - - EXPECT_EQ(GetProp(results[0].second), 1); - EXPECT_TRUE(EdgesEqual(results[0].first, {41})); - - EXPECT_EQ(GetProp(results[1].second), 5); - EXPECT_TRUE(EdgesEqual(results[1].first, {45})); - - if (GetProp(results[2].second) == 3) { - std::swap(results[2], results[3]); - } - - EXPECT_EQ(GetProp(results[2].second), 2); - EXPECT_TRUE(EdgesEqual(results[2].first, {41, 12})); - - EXPECT_EQ(GetProp(results[3].second), 3); - EXPECT_TRUE(EdgesEqual(results[3].first, {45, 53})); - } - - { - auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, {4}); - ASSERT_EQ(results.size(), 4); - - if (GetProp(results[0].second) == 5) { - std::swap(results[0], results[1]); - } - - EXPECT_EQ(GetProp(results[0].second), 2); - EXPECT_TRUE(EdgesEqual(results[0].first, {24})); - - EXPECT_EQ(GetProp(results[1].second), 5); - EXPECT_TRUE(EdgesEqual(results[1].first, {54})); - - EXPECT_EQ(GetProp(results[2].second), 1); - EXPECT_TRUE(EdgesEqual(results[2].first, {24, 12})); - - EXPECT_EQ(GetProp(results[3].second), 0); - EXPECT_TRUE(EdgesEqual(results[3].first, {24, 12, 1})); - } -} - -TEST_P(QueryPlanExpandBfs, Where) { - // TODO(mtomic): lambda filtering in distributed - if (GetParam().first == TestType::DISTRIBUTED) { - return; - } - - auto ident = IDENT("inner_element"); - { - symbol_table[*ident] = inner_node; - auto filter_expr = LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(4)); - auto results = - ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, {0}); - ASSERT_EQ(results.size(), 2); - EXPECT_EQ(GetProp(results[0].second), 1); - EXPECT_EQ(GetProp(results[1].second), 2); - } - { - symbol_table[*ident] = inner_edge; - auto filter_expr = AND(LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(50)), - NEQ(PROPERTY_LOOKUP(ident, prop), LITERAL(12))); - auto results = - ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, {0}); - ASSERT_EQ(results.size(), 4); - EXPECT_EQ(GetProp(results[0].second), 1); - EXPECT_EQ(GetProp(results[1].second), 4); - - if (GetProp(results[2].second) == 5) { - std::swap(results[2], results[3]); - } - EXPECT_EQ(GetProp(results[2].second), 2); - EXPECT_EQ(GetProp(results[3].second), 5); - EXPECT_TRUE(EdgesEqual(results[3].first, {1, 41, 45})); - } -} - -TEST_P(QueryPlanExpandBfs, MultipleInputs) { - auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0, 3}); - // Expect that each vertex has been returned 2 times. - EXPECT_EQ(results.size(), 10); - std::vector<int> found(5, 0); - for (const auto &row : results) found[GetProp(row.second)]++; - EXPECT_EQ(found, (std::vector<int>{1, 2, 2, 1, 2})); -} - -TEST_P(QueryPlanExpandBfs, ExistingNode) { - // In single-node, this is handled by STShortestPath cursor instead of - // SingleSourceShortestPath cursor. - using testing::ElementsAre; - using testing::WhenSorted; - - { - auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0}, - VertexAccessor(v[3], dba)); - EXPECT_EQ(results.size(), 1); - EXPECT_EQ(GetProp(results[0].second), 3); - } - { - auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, - {0, 1, 2, 3, 4}, VertexAccessor(v[5], dba)); - - std::vector<int> nodes; - for (auto &row : results) { - EXPECT_EQ(GetProp(row.second), 5); - nodes.push_back(GetProp(row.first[0]) % 10); - } - EXPECT_THAT(nodes, WhenSorted(ElementsAre(1, 2, 3, 4))); - } - { - auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, - {0, 1, 2, 3, 4}, VertexAccessor(v[5], dba)); - - std::vector<int> nodes; - for (auto &row : results) { - EXPECT_EQ(GetProp(row.second), 5); - nodes.push_back(GetProp(row.first[0]) / 10); - } - EXPECT_THAT(nodes, WhenSorted(ElementsAre(0, 1, 2, 4))); - } -} - -TEST_P(QueryPlanExpandBfs, OptionalMatch) { - { - auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {}); - EXPECT_EQ(results.size(), 0); - } - { - auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {0}, - TypedValue::Null); - EXPECT_EQ(results.size(), 0); - } -} - -TEST_P(QueryPlanExpandBfs, ExpansionDepth) { - { - auto results = ExpandBF(EdgeAtom::Direction::BOTH, 2, 3, nullptr, {0}); - EXPECT_EQ(results.size(), 3); - if (GetProp(results[0].second) == 4) { - std::swap(results[0], results[1]); - } - - EXPECT_EQ(GetProp(results[0].second), 2); - EXPECT_EQ(GetProp(results[1].second), 4); - EXPECT_EQ(GetProp(results[2].second), 5); - } -} - -INSTANTIATE_TEST_CASE_P(SingleNode, QueryPlanExpandBfs, - ::testing::Values(std::make_pair(TestType::SINGLE_NODE, - 0))); - -INSTANTIATE_TEST_CASE_P(Distributed, QueryPlanExpandBfs, - ::testing::Values(std::make_pair(TestType::DISTRIBUTED, - 2))); - /** A test fixture for weighted shortest path expansion */ class QueryPlanExpandWeightedShortestPath : public testing::Test { public: @@ -1561,9 +925,9 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test { direction, std::vector<storage::EdgeType>{}, false, nullptr, max_depth ? LITERAL(max_depth.value()) : nullptr, last_op, n.sym_, existing_node_input != nullptr, - ExpandVariable::Lambda{filter_edge, filter_node, where}, - ExpandVariable::Lambda{weight_edge, weight_node, - PROPERTY_LOOKUP(ident_e, prop)}, + ExpansionLambda{filter_edge, filter_node, where}, + ExpansionLambda{weight_edge, weight_node, + PROPERTY_LOOKUP(ident_e, prop)}, total_weight, graph_view); Frame frame(symbol_table.max_position());