Add Synchronize operator stub

Reviewers: florijan, msantl

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1166
This commit is contained in:
Teon Banek 2018-02-02 10:19:19 +01:00
parent fe556e3128
commit 583f6f1794
3 changed files with 53 additions and 8 deletions

View File

@ -2649,6 +2649,19 @@ std::unique_ptr<Cursor> PullRemote::MakeCursor(
return std::make_unique<PullRemote::PullRemoteCursor>(*this, db);
}
bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
if (visitor.PreVisit(*this)) {
input_->Accept(visitor) && pull_remote_->Accept(visitor);
}
return visitor.PostVisit(*this);
}
std::unique_ptr<Cursor> Synchronize::MakeCursor(
database::GraphDbAccessor &) const {
// TODO: Implement a concrete cursor.
return nullptr;
}
} // namespace query::plan
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Once);
@ -2685,3 +2698,4 @@ BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::CreateIndex);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Union);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::ProduceRemote);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::PullRemote);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Synchronize);

View File

@ -99,6 +99,7 @@ class CreateIndex;
class Union;
class ProduceRemote;
class PullRemote;
class Synchronize;
using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel,
@ -108,7 +109,7 @@ using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
ExpandUniquenessFilter<VertexAccessor>,
ExpandUniquenessFilter<EdgeAccessor>, Accumulate, Aggregate, Skip, Limit,
OrderBy, Merge, Optional, Unwind, Distinct, Union, ProduceRemote,
PullRemote>;
PullRemote, Synchronize>;
using LogicalOperatorLeafVisitor = ::utils::LeafVisitor<Once, CreateIndex>;
@ -2317,6 +2318,35 @@ class PullRemote : public LogicalOperator {
}
};
/** Operator used to synchronize the execution of master and workers. */
class Synchronize : public LogicalOperator {
public:
Synchronize(const std::shared_ptr<LogicalOperator> &input,
const std::shared_ptr<PullRemote> &pull_remote,
bool advance_command = false)
: input_(input),
pull_remote_(pull_remote),
advance_command_(advance_command) {}
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
private:
std::shared_ptr<LogicalOperator> input_;
std::shared_ptr<PullRemote> pull_remote_;
bool advance_command_ = false;
Synchronize() {}
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, const unsigned int) {
ar &input_;
ar &pull_remote_;
ar &advance_command_;
}
};
} // namespace plan
} // namespace query
@ -2353,3 +2383,4 @@ BOOST_CLASS_EXPORT_KEY(query::plan::CreateIndex);
BOOST_CLASS_EXPORT_KEY(query::plan::Union);
BOOST_CLASS_EXPORT_KEY(query::plan::ProduceRemote);
BOOST_CLASS_EXPORT_KEY(query::plan::PullRemote);
BOOST_CLASS_EXPORT_KEY(query::plan::Synchronize);

View File

@ -263,8 +263,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
auto remote_pull = [this, plan_id, &params, &symbols](
database::GraphDbAccessor &dba, int worker_id) {
auto remote_pull = [this, &params, &symbols](database::GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, 3);
};
@ -358,9 +358,9 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true);
auto produce = MakeProduce(p, return_n_r, return_m, return_p);
auto check_result = [prop](
int worker_id,
const std::vector<std::vector<query::TypedValue>> &frames) {
auto check_result = [prop](int worker_id,
const std::vector<std::vector<query::TypedValue>>
&frames) {
int offset = worker_id * 10;
ASSERT_EQ(frames.size(), 1);
auto &row = frames[0];
@ -387,8 +387,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
ctx.symbol_table_[*return_m], p_sym};
auto remote_pull = [this, plan_id, &params, &symbols](
database::GraphDbAccessor &dba, int worker_id) {
auto remote_pull = [this, &params, &symbols](database::GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, 3);
};