From 583f6f179456d16d01794df4307a9fb3a43aee21 Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Fri, 2 Feb 2018 10:19:19 +0100 Subject: [PATCH] Add Synchronize operator stub Reviewers: florijan, msantl Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1166 --- src/query/plan/operator.cpp | 14 ++++++++++++ src/query/plan/operator.hpp | 33 ++++++++++++++++++++++++++++- tests/unit/distributed_graph_db.cpp | 14 ++++++------ 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index c3d41560f..00f3be3ec 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -2649,6 +2649,19 @@ std::unique_ptr PullRemote::MakeCursor( return std::make_unique(*this, db); } +bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) { + if (visitor.PreVisit(*this)) { + input_->Accept(visitor) && pull_remote_->Accept(visitor); + } + return visitor.PostVisit(*this); +} + +std::unique_ptr Synchronize::MakeCursor( + database::GraphDbAccessor &) const { + // TODO: Implement a concrete cursor. + return nullptr; +} + } // namespace query::plan BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Once); @@ -2685,3 +2698,4 @@ BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::CreateIndex); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Union); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::ProduceRemote); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::PullRemote); +BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Synchronize); diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index d710c777f..de32f923f 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -99,6 +99,7 @@ class CreateIndex; class Union; class ProduceRemote; class PullRemote; +class Synchronize; using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor< Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, @@ -108,7 +109,7 @@ using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor< ExpandUniquenessFilter, ExpandUniquenessFilter, Accumulate, Aggregate, Skip, Limit, OrderBy, Merge, Optional, Unwind, Distinct, Union, ProduceRemote, - PullRemote>; + PullRemote, Synchronize>; using LogicalOperatorLeafVisitor = ::utils::LeafVisitor; @@ -2317,6 +2318,35 @@ class PullRemote : public LogicalOperator { } }; +/** Operator used to synchronize the execution of master and workers. */ +class Synchronize : public LogicalOperator { + public: + Synchronize(const std::shared_ptr &input, + const std::shared_ptr &pull_remote, + bool advance_command = false) + : input_(input), + pull_remote_(pull_remote), + advance_command_(advance_command) {} + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + + private: + std::shared_ptr input_; + std::shared_ptr pull_remote_; + bool advance_command_ = false; + + Synchronize() {} + + friend class boost::serialization::access; + template + void serialize(TArchive &ar, const unsigned int) { + ar &input_; + ar &pull_remote_; + ar &advance_command_; + } +}; + } // namespace plan } // namespace query @@ -2353,3 +2383,4 @@ BOOST_CLASS_EXPORT_KEY(query::plan::CreateIndex); BOOST_CLASS_EXPORT_KEY(query::plan::Union); BOOST_CLASS_EXPORT_KEY(query::plan::ProduceRemote); BOOST_CLASS_EXPORT_KEY(query::plan::PullRemote); +BOOST_CLASS_EXPORT_KEY(query::plan::Synchronize); diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index f8777675e..d2a5ea142 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -263,8 +263,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { Parameters params; std::vector symbols{ctx.symbol_table_[*x_ne]}; - auto remote_pull = [this, plan_id, ¶ms, &symbols]( - database::GraphDbAccessor &dba, int worker_id) { + auto remote_pull = [this, ¶ms, &symbols](database::GraphDbAccessor &dba, + int worker_id) { return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, params, symbols, 3); }; @@ -358,9 +358,9 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true); auto produce = MakeProduce(p, return_n_r, return_m, return_p); - auto check_result = [prop]( - int worker_id, - const std::vector> &frames) { + auto check_result = [prop](int worker_id, + const std::vector> + &frames) { int offset = worker_id * 10; ASSERT_EQ(frames.size(), 1); auto &row = frames[0]; @@ -387,8 +387,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { Parameters params; std::vector symbols{ctx.symbol_table_[*return_n_r], ctx.symbol_table_[*return_m], p_sym}; - auto remote_pull = [this, plan_id, ¶ms, &symbols]( - database::GraphDbAccessor &dba, int worker_id) { + auto remote_pull = [this, ¶ms, &symbols](database::GraphDbAccessor &dba, + int worker_id) { return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, params, symbols, 3); };