Don't implicitly wait on futures during query execution
Summary: Even though we may not need the results of a RPC, we should check that it completed without error. Reviewers: mferencevic, mtomic Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1610
This commit is contained in:
parent
9980f899c9
commit
91566bb9fc
@ -47,6 +47,10 @@ void BfsRpcClients::RegisterSubcursors(
|
||||
});
|
||||
subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))
|
||||
->RegisterSubcursors(subcursor_ids);
|
||||
// Wait and get all of the replies.
|
||||
for (auto &future : futures) {
|
||||
if (future.valid()) future.get();
|
||||
}
|
||||
}
|
||||
|
||||
void BfsRpcClients::ResetSubcursors(
|
||||
@ -58,6 +62,10 @@ void BfsRpcClients::ResetSubcursors(
|
||||
CHECK(res) << "ResetSubcursor RPC failed!";
|
||||
});
|
||||
subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))->Reset();
|
||||
// Wait and get all of the replies.
|
||||
for (auto &future : futures) {
|
||||
if (future.valid()) future.get();
|
||||
}
|
||||
}
|
||||
|
||||
void BfsRpcClients::RemoveBfsSubcursors(
|
||||
@ -69,6 +77,10 @@ void BfsRpcClients::RemoveBfsSubcursors(
|
||||
CHECK(res) << "RemoveBfsSubcursor RPC failed!";
|
||||
});
|
||||
subcursor_storage_->Erase(subcursor_ids.at(db_->WorkerId()));
|
||||
// Wait and get all of the replies.
|
||||
for (auto &future : futures) {
|
||||
if (future.valid()) future.get();
|
||||
}
|
||||
}
|
||||
|
||||
std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
|
||||
@ -178,7 +190,7 @@ PathSegment BfsRpcClients::ReconstructPath(
|
||||
|
||||
void BfsRpcClients::PrepareForExpand(
|
||||
const std::unordered_map<int16_t, int64_t> &subcursor_ids, bool clear) {
|
||||
auto res = clients_->ExecuteOnWorkers<void>(
|
||||
auto futures = clients_->ExecuteOnWorkers<void>(
|
||||
db_->WorkerId(), [clear, &subcursor_ids](int worker_id, auto &client) {
|
||||
auto res = client.template Call<PrepareForExpandRpc>(
|
||||
subcursor_ids.at(worker_id), clear);
|
||||
@ -186,6 +198,10 @@ void BfsRpcClients::PrepareForExpand(
|
||||
});
|
||||
subcursor_storage_->Get(subcursor_ids.at(db_->WorkerId()))
|
||||
->PrepareForExpand(clear);
|
||||
// Wait and get all of the replies.
|
||||
for (auto &future : futures) {
|
||||
if (future.valid()) future.get();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace distributed
|
||||
|
@ -78,6 +78,7 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
|
||||
}
|
||||
} else {
|
||||
cursor_state_ = PullState::CURSOR_EXHAUSTED;
|
||||
cursor_->Shutdown();
|
||||
}
|
||||
} catch (const mvcc::SerializationError &) {
|
||||
cursor_state_ = PullState::SERIALIZATION_ERROR;
|
||||
|
@ -120,6 +120,7 @@ class Interpreter {
|
||||
access.remove(kv.first);
|
||||
}
|
||||
}
|
||||
cursor_->Shutdown();
|
||||
}
|
||||
|
||||
return return_value;
|
||||
|
@ -278,6 +278,15 @@ class RemotePuller {
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() {
|
||||
// Explicitly get all of the requested RPC futures, so that we register any
|
||||
// exceptions.
|
||||
for (auto &remote_pull : remote_pulls_) {
|
||||
if (remote_pull.second.valid()) remote_pull.second.get();
|
||||
}
|
||||
remote_pulls_.clear();
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
worker_ids_ = pull_clients_->GetWorkerIds();
|
||||
// Remove master from the worker ids list.
|
||||
@ -286,6 +295,9 @@ class RemotePuller {
|
||||
// We must clear remote_pulls before reseting cursors to make sure that all
|
||||
// outstanding remote pulls are done. Otherwise we might try to reset cursor
|
||||
// during its pull.
|
||||
for (auto &remote_pull : remote_pulls_) {
|
||||
if (remote_pull.second.valid()) remote_pull.second.get();
|
||||
}
|
||||
remote_pulls_.clear();
|
||||
for (auto &worker_id : worker_ids_) {
|
||||
pull_clients_->ResetCursor(&db_, worker_id, plan_id_, command_id_);
|
||||
@ -432,6 +444,11 @@ class PullRemoteCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override {
|
||||
if (input_cursor_) input_cursor_->Reset();
|
||||
remote_puller_.Shutdown();
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
if (input_cursor_) input_cursor_->Reset();
|
||||
remote_puller_.Reset();
|
||||
@ -499,9 +516,14 @@ class SynchronizeCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override {
|
||||
input_cursor_->Shutdown();
|
||||
if (pull_remote_cursor_) pull_remote_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
pull_remote_cursor_->Reset();
|
||||
if (pull_remote_cursor_) pull_remote_cursor_->Reset();
|
||||
initial_pull_done_ = false;
|
||||
local_frames_.clear();
|
||||
}
|
||||
@ -608,7 +630,7 @@ class SynchronizeCursor : public Cursor {
|
||||
// If the command advanced, let the workers know.
|
||||
if (self_.advance_command()) {
|
||||
auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id);
|
||||
for (auto &future : futures) future.wait();
|
||||
for (auto &future : futures) future.get();
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -625,7 +647,7 @@ class PullRemoteOrderByCursor : public Cursor {
|
||||
&dynamic_cast<database::Master *>(&db.db())->pull_clients(), db,
|
||||
self.symbols(), self.plan_id(), command_id_) {}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) {
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
if (context.db_accessor_.should_abort()) throw HintedAbortError();
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table_,
|
||||
context.evaluation_context_,
|
||||
@ -736,7 +758,12 @@ class PullRemoteOrderByCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
void Shutdown() override {
|
||||
input_->Shutdown();
|
||||
remote_puller_.Shutdown();
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
input_->Reset();
|
||||
remote_puller_.Reset();
|
||||
merge_.clear();
|
||||
@ -768,7 +795,7 @@ class DistributedExpandCursor : public query::plan::Cursor {
|
||||
database::GraphDbAccessor *db)
|
||||
: input_cursor_(self->input()->MakeCursor(*db)), self_(self) {}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) {
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
// A helper function for expanding a node from an edge.
|
||||
auto pull_node = [this, &frame](const EdgeAccessor &new_edge,
|
||||
EdgeAtom::Direction direction) {
|
||||
@ -888,12 +915,27 @@ class DistributedExpandCursor : public query::plan::Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
void Shutdown() override {
|
||||
input_cursor_->Shutdown();
|
||||
// Explicitly get all of the requested RPC futures, so that we register any
|
||||
// exceptions.
|
||||
for (auto &future_expand : future_expands_) {
|
||||
if (future_expand.edge_to.valid()) future_expand.edge_to.get();
|
||||
}
|
||||
future_expands_.clear();
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
in_edges_ = std::experimental::nullopt;
|
||||
in_edges_it_ = std::experimental::nullopt;
|
||||
out_edges_ = std::experimental::nullopt;
|
||||
out_edges_it_ = std::experimental::nullopt;
|
||||
// Explicitly get all of the requested RPC futures, so that we register any
|
||||
// exceptions.
|
||||
for (auto &future_expand : future_expands_) {
|
||||
if (future_expand.edge_to.valid()) future_expand.edge_to.get();
|
||||
}
|
||||
future_expands_.clear();
|
||||
last_frame_.clear();
|
||||
}
|
||||
@ -1125,7 +1167,10 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_);
|
||||
pull_pos_ = subcursor_ids_.end();
|
||||
}
|
||||
@ -1217,7 +1262,7 @@ class DistributedCreateNodeCursor : public query::plan::Cursor {
|
||||
CHECK(node_atom_);
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) {
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
if (on_random_worker_) {
|
||||
CreateVertexOnWorker(RandomWorkerId(*db_), node_atom_, frame, context);
|
||||
@ -1229,7 +1274,9 @@ class DistributedCreateNodeCursor : public query::plan::Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Reset() { input_cursor_->Reset(); }
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
|
||||
private:
|
||||
std::unique_ptr<query::plan::Cursor> input_cursor_;
|
||||
@ -1249,7 +1296,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor {
|
||||
CHECK(db_);
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, Context &context) {
|
||||
bool Pull(Frame &frame, Context &context) override {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// get the origin vertex
|
||||
@ -1289,7 +1336,9 @@ class DistributedCreateExpandCursor : public query::plan::Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Reset() { input_cursor_->Reset(); }
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
|
||||
VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context) {
|
||||
if (self_->existing_node()) {
|
||||
|
@ -84,6 +84,8 @@ std::unique_ptr<Cursor> Once::MakeCursor(database::GraphDbAccessor &) const {
|
||||
|
||||
WITHOUT_SINGLE_INPUT(Once);
|
||||
|
||||
void Once::OnceCursor::Shutdown() {}
|
||||
|
||||
void Once::OnceCursor::Reset() { did_pull_ = false; }
|
||||
|
||||
CreateNode::CreateNode(const std::shared_ptr<LogicalOperator> &input,
|
||||
@ -135,6 +137,8 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void CreateNode::CreateNodeCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
CreateExpand::CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom,
|
||||
@ -206,6 +210,8 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void CreateExpand::CreateExpandCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void CreateExpand::CreateExpandCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
|
||||
@ -263,6 +269,8 @@ class ScanAllCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
vertices_ = std::experimental::nullopt;
|
||||
@ -507,6 +515,8 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) {
|
||||
}
|
||||
}
|
||||
|
||||
void Expand::ExpandCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Expand::ExpandCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
in_edges_ = std::experimental::nullopt;
|
||||
@ -682,6 +692,8 @@ class ExpandVariableCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
edges_.clear();
|
||||
@ -924,6 +936,8 @@ class STShortestPathCursor : public query::plan::Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
|
||||
private:
|
||||
@ -1232,6 +1246,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
processed_.clear();
|
||||
@ -1436,6 +1453,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
previous_.clear();
|
||||
@ -1586,6 +1605,8 @@ class ConstructNamedPathCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
|
||||
private:
|
||||
@ -1639,6 +1660,8 @@ bool Filter::FilterCursor::Pull(Frame &frame, Context &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Filter::FilterCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Filter::FilterCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
Produce::Produce(const std::shared_ptr<LogicalOperator> &input,
|
||||
@ -1683,6 +1706,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, Context &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Produce::ProduceCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Produce::ProduceCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
Delete::Delete(const std::shared_ptr<LogicalOperator> &input_,
|
||||
@ -1754,6 +1779,8 @@ bool Delete::DeleteCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Delete::DeleteCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Delete::DeleteCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
SetProperty::SetProperty(const std::shared_ptr<LogicalOperator> &input,
|
||||
@ -1810,6 +1837,8 @@ bool SetProperty::SetPropertyCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void SetProperty::SetPropertyCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void SetProperty::SetPropertyCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
SetProperties::SetProperties(const std::shared_ptr<LogicalOperator> &input,
|
||||
@ -1860,6 +1889,10 @@ bool SetProperties::SetPropertiesCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void SetProperties::SetPropertiesCursor::Shutdown() {
|
||||
input_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void SetProperties::SetPropertiesCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
template <typename TRecordAccessor>
|
||||
@ -1948,6 +1981,8 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void SetLabels::SetLabelsCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void SetLabels::SetLabelsCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
RemoveProperty::RemoveProperty(const std::shared_ptr<LogicalOperator> &input,
|
||||
@ -2007,6 +2042,10 @@ bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame,
|
||||
return true;
|
||||
}
|
||||
|
||||
void RemoveProperty::RemovePropertyCursor::Shutdown() {
|
||||
input_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void RemoveProperty::RemovePropertyCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
RemoveLabels::RemoveLabels(const std::shared_ptr<LogicalOperator> &input,
|
||||
@ -2048,6 +2087,8 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void RemoveLabels::RemoveLabelsCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void RemoveLabels::RemoveLabelsCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
||||
template <typename TAccessor>
|
||||
@ -2129,6 +2170,12 @@ bool ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::Pull(
|
||||
return false;
|
||||
}
|
||||
|
||||
template <typename TAccessor>
|
||||
void ExpandUniquenessFilter<
|
||||
TAccessor>::ExpandUniquenessFilterCursor::Shutdown() {
|
||||
input_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
template <typename TAccessor>
|
||||
void ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
@ -2184,6 +2231,8 @@ bool Accumulate::AccumulateCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Accumulate::AccumulateCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Accumulate::AccumulateCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
cache_.clear();
|
||||
@ -2433,6 +2482,8 @@ void Aggregate::AggregateCursor::Update(
|
||||
} // end loop over all aggregations
|
||||
}
|
||||
|
||||
void Aggregate::AggregateCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Aggregate::AggregateCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
aggregation_.clear();
|
||||
@ -2524,6 +2575,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, Context &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Skip::SkipCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Skip::SkipCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
to_skip_ = -1;
|
||||
@ -2582,6 +2635,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, Context &context) {
|
||||
return input_cursor_->Pull(frame, context);
|
||||
}
|
||||
|
||||
void Limit::LimitCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Limit::LimitCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
limit_ = -1;
|
||||
@ -2669,6 +2724,8 @@ bool OrderBy::OrderByCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void OrderBy::OrderByCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void OrderBy::OrderByCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
did_pull_all_ = false;
|
||||
@ -2746,6 +2803,12 @@ bool Merge::MergeCursor::Pull(Frame &frame, Context &context) {
|
||||
}
|
||||
}
|
||||
|
||||
void Merge::MergeCursor::Shutdown() {
|
||||
input_cursor_->Shutdown();
|
||||
merge_match_cursor_->Shutdown();
|
||||
merge_create_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void Merge::MergeCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
merge_match_cursor_->Reset();
|
||||
@ -2820,6 +2883,11 @@ bool Optional::OptionalCursor::Pull(Frame &frame, Context &context) {
|
||||
}
|
||||
}
|
||||
|
||||
void Optional::OptionalCursor::Shutdown() {
|
||||
input_cursor_->Shutdown();
|
||||
optional_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void Optional::OptionalCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
optional_cursor_->Reset();
|
||||
@ -2876,6 +2944,8 @@ bool Unwind::UnwindCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Unwind::UnwindCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Unwind::UnwindCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
input_value_.clear();
|
||||
@ -2920,6 +2990,8 @@ bool Distinct::DistinctCursor::Pull(Frame &frame, Context &context) {
|
||||
}
|
||||
}
|
||||
|
||||
void Distinct::DistinctCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Distinct::DistinctCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
seen_rows_.clear();
|
||||
@ -2953,6 +3025,8 @@ class CreateIndexCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { did_create_ = false; }
|
||||
|
||||
private:
|
||||
@ -3029,6 +3103,11 @@ bool Union::UnionCursor::Pull(Frame &frame, Context &context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Union::UnionCursor::Shutdown() {
|
||||
left_cursor_->Shutdown();
|
||||
right_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void Union::UnionCursor::Reset() {
|
||||
left_cursor_->Reset();
|
||||
right_cursor_->Reset();
|
||||
@ -3113,6 +3192,11 @@ class CartesianCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override {
|
||||
left_op_cursor_->Shutdown();
|
||||
right_op_cursor_->Shutdown();
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
left_op_cursor_->Reset();
|
||||
right_op_cursor_->Reset();
|
||||
@ -3503,6 +3587,8 @@ class AuthHandlerCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override {
|
||||
LOG(FATAL) << "AuthHandler cursor should never be reset";
|
||||
}
|
||||
@ -3587,6 +3673,8 @@ class CreateStreamCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { throw utils::NotYetImplemented("Create Stream"); }
|
||||
|
||||
private:
|
||||
@ -3621,6 +3709,8 @@ class DropStreamCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { throw utils::NotYetImplemented("Drop Stream"); }
|
||||
|
||||
private:
|
||||
@ -3677,6 +3767,8 @@ class ShowStreamsCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { throw utils::NotYetImplemented("Show Streams"); }
|
||||
|
||||
private:
|
||||
@ -3734,6 +3826,8 @@ class StartStopStreamCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { throw utils::NotYetImplemented("Start/Stop Stream"); }
|
||||
|
||||
private:
|
||||
@ -3773,6 +3867,8 @@ class StartStopAllStreamsCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override {
|
||||
throw utils::NotYetImplemented("Start/Stop All Streams");
|
||||
}
|
||||
@ -3846,6 +3942,8 @@ class TestStreamCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { throw utils::NotYetImplemented("Test Stream"); }
|
||||
|
||||
private:
|
||||
@ -3901,6 +3999,8 @@ class ExplainCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Shutdown() override {}
|
||||
|
||||
void Reset() override { print_it_ = printed_plan_rows_.begin(); }
|
||||
|
||||
private:
|
||||
|
@ -45,30 +45,31 @@ cpp<#
|
||||
(lcp:namespace plan)
|
||||
|
||||
#>cpp
|
||||
/** @brief Base class for iteration cursors of @c LogicalOperator classes.
|
||||
*
|
||||
* Each @c LogicalOperator must produce a concrete @c Cursor, which provides
|
||||
* the iteration mechanism.
|
||||
*/
|
||||
/// Base class for iteration cursors of @c LogicalOperator classes.
|
||||
///
|
||||
/// Each @c LogicalOperator must produce a concrete @c Cursor, which provides
|
||||
/// the iteration mechanism.
|
||||
class Cursor {
|
||||
public:
|
||||
/** @brief Run an iteration of a @c LogicalOperator.
|
||||
*
|
||||
* Since operators may be chained, the iteration may pull results from
|
||||
* multiple operators.
|
||||
*
|
||||
* @param Frame May be read from or written to while performing the
|
||||
* iteration.
|
||||
* @param Context Used to get the position of symbols in frame and other
|
||||
* information.
|
||||
*/
|
||||
/// Run an iteration of a @c LogicalOperator.
|
||||
///
|
||||
/// Since operators may be chained, the iteration may pull results from
|
||||
/// multiple operators.
|
||||
///
|
||||
/// @param Frame May be read from or written to while performing the
|
||||
/// iteration.
|
||||
/// @param Context Used to get the position of symbols in frame and other
|
||||
/// information.
|
||||
///
|
||||
/// @throws QueryRuntimeException if something went wrong with execution
|
||||
virtual bool Pull(Frame &, Context &) = 0;
|
||||
|
||||
/**
|
||||
* Resets the Cursor to it's initial state.
|
||||
*/
|
||||
/// Resets the Cursor to its initial state.
|
||||
virtual void Reset() = 0;
|
||||
|
||||
/// Perform cleanup which may throw an exception
|
||||
virtual void Shutdown() = 0;
|
||||
|
||||
virtual ~Cursor() {}
|
||||
};
|
||||
|
||||
@ -317,6 +318,7 @@ and false on every following Pull.")
|
||||
public:
|
||||
OnceCursor() {}
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -371,6 +373,7 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or
|
||||
public:
|
||||
CreateNodeCursor(const CreateNode &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -446,6 +449,7 @@ chained in cases when longer paths need creating.
|
||||
public:
|
||||
CreateExpandCursor(const CreateExpand &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -791,6 +795,7 @@ pulled.")
|
||||
public:
|
||||
ExpandCursor(const Expand &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -993,6 +998,7 @@ a boolean value.")
|
||||
public:
|
||||
FilterCursor(const Filter &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1042,6 +1048,7 @@ RETURN clause) the Produce's pull succeeds exactly once.")
|
||||
public:
|
||||
ProduceCursor(const Produce &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1091,6 +1098,7 @@ Has a flag for using DETACH DELETE when deleting vertices.")
|
||||
public:
|
||||
DeleteCursor(const Delete &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1139,6 +1147,7 @@ can be stored (a TypedValue that can be converted to PropertyValue).")
|
||||
public:
|
||||
SetPropertyCursor(const SetProperty &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1198,6 +1207,7 @@ that the old props are discarded and replaced with new ones.")
|
||||
SetPropertiesCursor(const SetProperties &self,
|
||||
database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1251,6 +1261,7 @@ It does NOT remove labels that are already set on that Vertex.")
|
||||
public:
|
||||
SetLabelsCursor(const SetLabels &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1293,6 +1304,7 @@ It does NOT remove labels that are already set on that Vertex.")
|
||||
RemovePropertyCursor(const RemoveProperty &self,
|
||||
database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1337,6 +1349,7 @@ If a label does not exist on a Vertex, nothing happens.")
|
||||
public:
|
||||
RemoveLabelsCursor(const RemoveLabels &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1401,6 +1414,7 @@ between edges and an edge lists).")
|
||||
ExpandUniquenessFilterCursor(const ExpandUniquenessFilter &self,
|
||||
database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1468,6 +1482,7 @@ has been cached will be reconstructed before Pull returns.
|
||||
public:
|
||||
AccumulateCursor(const Accumulate &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1569,6 +1584,7 @@ elements are in an undefined state after aggregation.")
|
||||
public:
|
||||
AggregateCursor(const Aggregate &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1688,6 +1704,7 @@ operator's implementation does not expect this.")
|
||||
public:
|
||||
SkipCursor(const Skip &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1746,6 +1763,7 @@ input should be performed).")
|
||||
public:
|
||||
LimitCursor(const Limit &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1808,6 +1826,7 @@ are valid for usage after the OrderBy operator.")
|
||||
public:
|
||||
OrderByCursor(const OrderBy &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1875,6 +1894,7 @@ documentation.")
|
||||
public:
|
||||
MergeCursor(const Merge &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -1933,6 +1953,7 @@ and returns true, once.")
|
||||
public:
|
||||
OptionalCursor(const Optional &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -2116,6 +2137,7 @@ Input is optional (unwind can be the first clause in a query).")
|
||||
public:
|
||||
UnwindCursor(const Unwind &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -2169,6 +2191,7 @@ This implementation maintains input ordering.")
|
||||
DistinctCursor(const Distinct &self, database::GraphDbAccessor &db);
|
||||
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
@ -2260,6 +2283,7 @@ vectors of symbols used by each of the inputs.")
|
||||
public:
|
||||
UnionCursor(const Union &self, database::GraphDbAccessor &db);
|
||||
bool Pull(Frame &, Context &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
|
Loading…
Reference in New Issue
Block a user