diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 0896929ed..81d91f354 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -525,6 +525,7 @@ class GraphDbAccessor { const tx::Transaction &transaction() const { return transaction_; } durability::WriteAheadLog &wal(); + auto &db() { return db_; } /** * Returns the current value of the counter with the given name, and diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 984987c21..22e70804d 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -30,6 +30,8 @@ class RemotePullRpcClients { ->member); } + auto GetWorkerIds() { return clients_.GetWorkerIds(); } + // Notifies all workers that the given transaction/plan is done. Otherwise the // server is left with potentially unconsumed Cursors that never get deleted. // diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index dcf5abb5c..7bd2f6933 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -12,6 +12,7 @@ #include "glog/logging.h" #include "database/graph_db_accessor.hpp" +#include "distributed/remote_pull_rpc_clients.hpp" #include "query/context.hpp" #include "query/exceptions.hpp" #include "query/frontend/ast/ast.hpp" @@ -320,10 +321,10 @@ std::unique_ptr ScanAllByLabelPropertyRange::MakeCursor( context.symbol_table_, db, graph_view_); auto convert = [&evaluator](const auto &bound) -> std::experimental::optional> { - if (!bound) return std::experimental::nullopt; - return std::experimental::make_optional(utils::Bound( - bound.value().value()->Accept(evaluator), bound.value().type())); - }; + if (!bound) return std::experimental::nullopt; + return std::experimental::make_optional(utils::Bound( + bound.value().value()->Accept(evaluator), bound.value().type())); + }; return db.Vertices(label_, property_, convert(lower_bound()), convert(upper_bound()), graph_view_ == GraphView::NEW); }; @@ -2559,7 +2560,7 @@ ProduceRemote::ProduceRemote(const std::shared_ptr &input, ACCEPT_WITH_INPUT(ProduceRemote) std::unique_ptr ProduceRemote::MakeCursor( - database::GraphDbAccessor &db) const { + database::GraphDbAccessor &) const { // TODO: Implement a concrete cursor. return nullptr; } @@ -2572,10 +2573,77 @@ PullRemote::PullRemote(const std::shared_ptr &input, ACCEPT_WITH_INPUT(PullRemote); +PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self, + database::GraphDbAccessor &db) + : self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) { + worker_ids_ = db_.db().remote_pull_clients().GetWorkerIds(); + // remove master from the worker_ids list + worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); +} + +void PullRemote::PullRemoteCursor::EndRemotePull() { + if (remote_pull_ended_) return; + db_.db().remote_pull_clients().EndRemotePull(db_.transaction().id_, + self_.plan_id()); + remote_pull_ended_ = true; +} + +bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { + if (input_cursor_->Pull(frame, context)) { + return true; + } + + while (worker_ids_.size() > 0 && results_.empty()) { + last_pulled_worker_ = (last_pulled_worker_ + 1) % worker_ids_.size(); + auto remote_results = db_.db().remote_pull_clients().RemotePull( + db_.transaction().id_, worker_ids_[last_pulled_worker_], + self_.plan_id(), context.parameters_, self_.symbols()); + + auto get_results = [&]() { + for (auto &result : remote_results.frames) { + results_.emplace(std::move(result)); + } + }; + + switch (remote_results.pull_state) { + case distributed::RemotePullState::CURSOR_EXHAUSTED: + get_results(); + worker_ids_.erase(worker_ids_.begin() + last_pulled_worker_); + break; + case distributed::RemotePullState::CURSOR_IN_PROGRESS: + get_results(); + break; + case distributed::RemotePullState::SERIALIZATION_ERROR: + EndRemotePull(); + throw mvcc::SerializationError( + "Serialization error occured during PullRemote!"); + break; + } + } + + // if the results_ are still empty, we've exhausted all worker results + if (results_.empty()) { + EndRemotePull(); + return false; + } + + auto &result = results_.front(); + for (size_t i = 0; i < self_.symbols().size(); ++i) { + frame[self_.symbols()[i]] = std::move(result[i]); + } + results_.pop(); + + return true; +} + +void PullRemote::PullRemoteCursor::Reset() { + EndRemotePull(); + throw QueryRuntimeException("Unsupported: Reset during PullRemote!"); +} + std::unique_ptr PullRemote::MakeCursor( database::GraphDbAccessor &db) const { - // TODO: Implement a concrete cursor. - return nullptr; + return std::make_unique(*this, db); } } // namespace query::plan diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index ece032cbe..b8694bef4 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -163,7 +164,7 @@ class LogicalOperator friend class boost::serialization::access; template - void serialize(TArchive &ar, const unsigned int) {} + void serialize(TArchive &, const unsigned int) {} }; template @@ -2267,6 +2268,7 @@ class PullRemote : public LogicalOperator { database::GraphDbAccessor &db) const override; const auto &symbols() const { return symbols_; } + auto plan_id() const { return plan_id_; } private: std::shared_ptr input_; @@ -2275,6 +2277,24 @@ class PullRemote : public LogicalOperator { PullRemote() {} + class PullRemoteCursor : public Cursor { + public: + PullRemoteCursor(const PullRemote &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + void EndRemotePull(); + + const PullRemote &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + std::queue> results_; + std::vector worker_ids_; + int last_pulled_worker_ = -1; + bool remote_pull_ended_ = false; + }; + friend class boost::serialization::access; template void serialize(TArchive &ar, const unsigned int) {