Implement PullRemote
logical operator
Summary: PullRemoteCursor will pull all clients in a RoundRobin fashion until all clients are exhausted and there are no more results to return. Reviewers: teon.banek, florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1147
This commit is contained in:
parent
bfb3a0d9b1
commit
a73a4c3762
@ -525,6 +525,7 @@ class GraphDbAccessor {
|
||||
|
||||
const tx::Transaction &transaction() const { return transaction_; }
|
||||
durability::WriteAheadLog &wal();
|
||||
auto &db() { return db_; }
|
||||
|
||||
/**
|
||||
* Returns the current value of the counter with the given name, and
|
||||
|
@ -30,6 +30,8 @@ class RemotePullRpcClients {
|
||||
->member);
|
||||
}
|
||||
|
||||
auto GetWorkerIds() { return clients_.GetWorkerIds(); }
|
||||
|
||||
// Notifies all workers that the given transaction/plan is done. Otherwise the
|
||||
// server is left with potentially unconsumed Cursors that never get deleted.
|
||||
//
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "distributed/remote_pull_rpc_clients.hpp"
|
||||
#include "query/context.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
@ -2559,7 +2560,7 @@ ProduceRemote::ProduceRemote(const std::shared_ptr<LogicalOperator> &input,
|
||||
ACCEPT_WITH_INPUT(ProduceRemote)
|
||||
|
||||
std::unique_ptr<Cursor> ProduceRemote::MakeCursor(
|
||||
database::GraphDbAccessor &db) const {
|
||||
database::GraphDbAccessor &) const {
|
||||
// TODO: Implement a concrete cursor.
|
||||
return nullptr;
|
||||
}
|
||||
@ -2572,10 +2573,77 @@ PullRemote::PullRemote(const std::shared_ptr<LogicalOperator> &input,
|
||||
|
||||
ACCEPT_WITH_INPUT(PullRemote);
|
||||
|
||||
PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self,
|
||||
database::GraphDbAccessor &db)
|
||||
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {
|
||||
worker_ids_ = db_.db().remote_pull_clients().GetWorkerIds();
|
||||
// remove master from the worker_ids list
|
||||
worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
|
||||
}
|
||||
|
||||
void PullRemote::PullRemoteCursor::EndRemotePull() {
|
||||
if (remote_pull_ended_) return;
|
||||
db_.db().remote_pull_clients().EndRemotePull(db_.transaction().id_,
|
||||
self_.plan_id());
|
||||
remote_pull_ended_ = true;
|
||||
}
|
||||
|
||||
bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
while (worker_ids_.size() > 0 && results_.empty()) {
|
||||
last_pulled_worker_ = (last_pulled_worker_ + 1) % worker_ids_.size();
|
||||
auto remote_results = db_.db().remote_pull_clients().RemotePull(
|
||||
db_.transaction().id_, worker_ids_[last_pulled_worker_],
|
||||
self_.plan_id(), context.parameters_, self_.symbols());
|
||||
|
||||
auto get_results = [&]() {
|
||||
for (auto &result : remote_results.frames) {
|
||||
results_.emplace(std::move(result));
|
||||
}
|
||||
};
|
||||
|
||||
switch (remote_results.pull_state) {
|
||||
case distributed::RemotePullState::CURSOR_EXHAUSTED:
|
||||
get_results();
|
||||
worker_ids_.erase(worker_ids_.begin() + last_pulled_worker_);
|
||||
break;
|
||||
case distributed::RemotePullState::CURSOR_IN_PROGRESS:
|
||||
get_results();
|
||||
break;
|
||||
case distributed::RemotePullState::SERIALIZATION_ERROR:
|
||||
EndRemotePull();
|
||||
throw mvcc::SerializationError(
|
||||
"Serialization error occured during PullRemote!");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// if the results_ are still empty, we've exhausted all worker results
|
||||
if (results_.empty()) {
|
||||
EndRemotePull();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto &result = results_.front();
|
||||
for (size_t i = 0; i < self_.symbols().size(); ++i) {
|
||||
frame[self_.symbols()[i]] = std::move(result[i]);
|
||||
}
|
||||
results_.pop();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void PullRemote::PullRemoteCursor::Reset() {
|
||||
EndRemotePull();
|
||||
throw QueryRuntimeException("Unsupported: Reset during PullRemote!");
|
||||
}
|
||||
|
||||
std::unique_ptr<Cursor> PullRemote::MakeCursor(
|
||||
database::GraphDbAccessor &db) const {
|
||||
// TODO: Implement a concrete cursor.
|
||||
return nullptr;
|
||||
return std::make_unique<PullRemote::PullRemoteCursor>(*this, db);
|
||||
}
|
||||
|
||||
} // namespace query::plan
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include <experimental/optional>
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
@ -163,7 +164,7 @@ class LogicalOperator
|
||||
friend class boost::serialization::access;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, const unsigned int) {}
|
||||
void serialize(TArchive &, const unsigned int) {}
|
||||
};
|
||||
|
||||
template <class TArchive>
|
||||
@ -2267,6 +2268,7 @@ class PullRemote : public LogicalOperator {
|
||||
database::GraphDbAccessor &db) const override;
|
||||
|
||||
const auto &symbols() const { return symbols_; }
|
||||
auto plan_id() const { return plan_id_; }
|
||||
|
||||
private:
|
||||
std::shared_ptr<LogicalOperator> input_;
|
||||
@ -2275,6 +2277,24 @@ class PullRemote : public LogicalOperator {
|
||||
|
||||
PullRemote() {}
|
||||
|
||||
class PullRemoteCursor : public Cursor {
|
||||
public:
|
||||
PullRemoteCursor(const PullRemote &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
void EndRemotePull();
|
||||
|
||||
const PullRemote &self_;
|
||||
database::GraphDbAccessor &db_;
|
||||
const std::unique_ptr<Cursor> input_cursor_;
|
||||
std::queue<std::vector<query::TypedValue>> results_;
|
||||
std::vector<int> worker_ids_;
|
||||
int last_pulled_worker_ = -1;
|
||||
bool remote_pull_ended_ = false;
|
||||
};
|
||||
|
||||
friend class boost::serialization::access;
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, const unsigned int) {
|
||||
|
Loading…
Reference in New Issue
Block a user