diff --git a/src/distributed/produce_rpc_server.cpp b/src/distributed/produce_rpc_server.cpp index 8ae02cb4e..4a6630d0d 100644 --- a/src/distributed/produce_rpc_server.cpp +++ b/src/distributed/produce_rpc_server.cpp @@ -20,7 +20,7 @@ ProduceRpcServer::OngoingProduce::OngoingProduce( frame_(plan_pack.symbol_table.max_position()), execution_memory_(std::make_unique<utils::MonotonicBufferResource>( query::kExecutionMemoryBlockSize)), - cursor_(plan_pack.plan->MakeCursor(dba_.get(), execution_memory_.get())) { + cursor_(plan_pack.plan->MakeCursor(execution_memory_.get())) { context_.symbol_table = plan_pack.symbol_table; // TODO: Maybe we want a seperate MemoryResource per pull evaluation context_.evaluation_context.memory = execution_memory_.get(); diff --git a/src/query/distributed/plan/ops.cpp b/src/query/distributed/plan/ops.cpp index b38750bf8..c783731fc 100644 --- a/src/query/distributed/plan/ops.cpp +++ b/src/query/distributed/plan/ops.cpp @@ -216,23 +216,20 @@ namespace { // initialization of a cursor. class RemotePuller { public: - RemotePuller(distributed::PullRpcClients *pull_clients, - database::GraphDbAccessor &db, - const std::vector<Symbol> &symbols, int64_t plan_id, - tx::CommandId command_id) - : pull_clients_(pull_clients), - db_(db), - symbols_(symbols), - plan_id_(plan_id), - command_id_(command_id) { - CHECK(pull_clients_); - worker_ids_ = pull_clients_->GetWorkerIds(); - // Remove master from the worker ids list. - worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); - } + RemotePuller(const std::vector<Symbol> &symbols, int64_t plan_id) + : symbols_(symbols), plan_id_(plan_id) {} - void Initialize(ExecutionContext &context) { + void Initialize(const ExecutionContext &context) { if (!remote_pulls_initialized_) { + db_ = context.db_accessor; + command_id_ = context.db_accessor->transaction().cid(); + // TODO: Pass in a Master GraphDb. + pull_clients_ = + &dynamic_cast<database::Master *>(&db_->db())->pull_clients(); + CHECK(pull_clients_); + worker_ids_ = pull_clients_->GetWorkerIds(); + // Remove master from the worker ids list. + worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); VLOG(10) << "[RemotePuller] [" << context.db_accessor->transaction_id() << "] [" << plan_id_ << "] [" << command_id_ << "] initialized"; for (auto &worker_id : worker_ids_) { @@ -317,6 +314,7 @@ class RemotePuller { } void Reset() { + if (!remote_pulls_initialized_) return; worker_ids_ = pull_clients_->GetWorkerIds(); // Remove master from the worker ids list. worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); @@ -329,7 +327,7 @@ class RemotePuller { } remote_pulls_.clear(); for (auto &worker_id : worker_ids_) { - pull_clients_->ResetCursor(&db_, worker_id, plan_id_, command_id_); + pull_clients_->ResetCursor(db_, worker_id, plan_id_, command_id_); } remote_results_.clear(); remote_pulls_initialized_ = false; @@ -368,9 +366,11 @@ class RemotePuller { return result; } + auto command_id() const { return command_id_; } + private: distributed::PullRpcClients *pull_clients_{nullptr}; - database::GraphDbAccessor &db_; + database::GraphDbAccessor *db_; std::vector<Symbol> symbols_; int64_t plan_id_; tx::CommandId command_id_; @@ -382,23 +382,17 @@ class RemotePuller { void UpdatePullForWorker(int worker_id, const ExecutionContext &context) { remote_pulls_[worker_id] = - pull_clients_->Pull(&db_, worker_id, plan_id_, command_id_, + pull_clients_->Pull(db_, worker_id, plan_id_, command_id_, context.evaluation_context, symbols_, false); } }; class PullRemoteCursor : public Cursor { public: - PullRemoteCursor(const PullRemote &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + PullRemoteCursor(const PullRemote &self, utils::MemoryResource *mem) : self_(self), - input_cursor_(self.input() ? self.input()->MakeCursor(db, mem) - : nullptr), - command_id_(db->transaction().cid()), - remote_puller_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast<database::Master *>(&db->db())->pull_clients(), *db, - self.symbols_, self.plan_id_, command_id_) {} + input_cursor_(self.input() ? self.input()->MakeCursor(mem) : nullptr), + remote_puller_(self.symbols_, self.plan_id_) {} bool Pull(Frame &frame, ExecutionContext &context) override { if (context.db_accessor->should_abort()) throw HintedAbortError(); @@ -434,14 +428,14 @@ class PullRemoteCursor : public Cursor { if (input_cursor_ && input_cursor_->Pull(frame, context)) { VLOG(10) << "[PullRemoteCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] producing local results "; return true; } VLOG(10) << "[PullRemoteCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] no results available, sleeping "; // If there aren't any local/remote results available, sleep. std::this_thread::sleep_for( @@ -454,7 +448,7 @@ class PullRemoteCursor : public Cursor { if (input_cursor_ && input_cursor_->Pull(frame, context)) { VLOG(10) << "[PullRemoteCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] producing local results "; return true; } @@ -465,7 +459,7 @@ class PullRemoteCursor : public Cursor { int worker_id = remote_puller_.GetWorkerId(last_pulled_worker_id_index_); VLOG(10) << "[PullRemoteCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] producing results from worker " << worker_id; auto result = remote_puller_.PopResultFromWorker(worker_id); for (size_t i = 0; i < self_.symbols_.size(); ++i) { @@ -487,41 +481,41 @@ class PullRemoteCursor : public Cursor { } private: + tx::CommandId command_id() const { return remote_puller_.command_id(); } + const PullRemote &self_; const UniqueCursorPtr input_cursor_; - tx::CommandId command_id_; RemotePuller remote_puller_; int last_pulled_worker_id_index_ = 0; }; class SynchronizeCursor : public Cursor { public: - SynchronizeCursor(const Synchronize &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + SynchronizeCursor(const Synchronize &self, utils::MemoryResource *mem) : self_(self), - pull_clients_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast<database::Master *>(&db->db())->pull_clients()), - updates_clients_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast<database::Master *>(&db->db())->updates_clients()), - updates_server_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast<database::Master *>(&db->db())->updates_server()), - input_cursor_(self.input()->MakeCursor(db, mem)), - pull_remote_cursor_(self.pull_remote_ - ? self.pull_remote_->MakeCursor(db, mem) - : nullptr), - command_id_(db->transaction().cid()), - master_id_( - // TODO: Pass in a Master GraphDb. - dynamic_cast<database::Master *>(&db->db())->WorkerId()) {} + input_cursor_(self.input()->MakeCursor(mem)), + pull_remote_cursor_( + self.pull_remote_ ? self.pull_remote_->MakeCursor(mem) : nullptr) {} bool Pull(Frame &frame, ExecutionContext &context) override { if (!initial_pull_done_) { + // TODO: Pass in a Master GraphDb. + pull_clients_ = + &dynamic_cast<database::Master *>(&context.db_accessor->db()) + ->pull_clients(); + updates_clients_ = + &dynamic_cast<database::Master *>(&context.db_accessor->db()) + ->updates_clients(); + updates_server_ = + &dynamic_cast<database::Master *>(&context.db_accessor->db()) + ->updates_server(); + command_id_ = context.db_accessor->transaction().cid(); + master_id_ = dynamic_cast<database::Master *>(&context.db_accessor->db()) + ->WorkerId(); InitialPull(frame, context); initial_pull_done_ = true; } + // Yield local stuff while available. if (!local_frames_.empty()) { VLOG(10) << "[SynchronizeCursor] [" @@ -667,15 +661,10 @@ class SynchronizeCursor : public Cursor { class PullRemoteOrderByCursor : public Cursor { public: PullRemoteOrderByCursor(const PullRemoteOrderBy &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) : self_(self), - input_(self.input()->MakeCursor(db, mem)), - command_id_(db->transaction().cid()), - remote_puller_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast<database::Master *>(&db->db())->pull_clients(), *db, - self.symbols_, self.plan_id_, command_id_) {} + input_(self.input()->MakeCursor(mem)), + remote_puller_(self.symbols_, self.plan_id_) {} bool Pull(Frame &frame, ExecutionContext &context) override { if (context.db_accessor->should_abort()) throw HintedAbortError(); @@ -700,10 +689,10 @@ class PullRemoteOrderByCursor : public Cursor { }; if (!merge_initialized_) { + remote_puller_.Initialize(context); VLOG(10) << "[PullRemoteOrderBy] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ << "] initialize"; - remote_puller_.Initialize(context); + << self_.plan_id_ << "] [" << command_id() << "] initialize"; missing_results_from_ = remote_puller_.Workers(); missing_master_result_ = true; merge_initialized_ = true; @@ -739,7 +728,7 @@ class PullRemoteOrderByCursor : public Cursor { if (!has_all_result) { VLOG(10) << "[PullRemoteOrderByCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] missing results, sleep"; // If we don't have results from all workers, sleep before continuing. std::this_thread::sleep_for( @@ -772,14 +761,14 @@ class PullRemoteOrderByCursor : public Cursor { if (result_it->worker_id) { VLOG(10) << "[PullRemoteOrderByCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] producing results from worker " << result_it->worker_id.value(); missing_results_from_.push_back(result_it->worker_id.value()); } else { VLOG(10) << "[PullRemoteOrderByCursor] [" << context.db_accessor->transaction_id() << "] [" - << self_.plan_id_ << "] [" << command_id_ + << self_.plan_id_ << "] [" << command_id() << "] producing local results"; missing_master_result_ = true; } @@ -809,9 +798,10 @@ class PullRemoteOrderByCursor : public Cursor { std::vector<TypedValue> order_by; }; + tx::CommandId command_id() const { return remote_puller_.command_id(); } + const PullRemoteOrderBy &self_; UniqueCursorPtr input_; - tx::CommandId command_id_; RemotePuller remote_puller_; std::vector<MergeResultItem> merge_; std::vector<int> missing_results_from_; @@ -822,9 +812,8 @@ class PullRemoteOrderByCursor : public Cursor { class DistributedExpandCursor : public query::plan::Cursor { public: DistributedExpandCursor(const DistributedExpand *self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : input_cursor_(self->input()->MakeCursor(db, mem)), self_(self) {} + : input_cursor_(self->input()->MakeCursor(mem)), self_(self) {} bool Pull(Frame &frame, ExecutionContext &context) override { // A helper function for expanding a node from an edge. @@ -1065,11 +1054,8 @@ class DistributedExpandCursor : public query::plan::Cursor { class DistributedExpandBfsCursor : public query::plan::Cursor { public: DistributedExpandBfsCursor(const DistributedExpandBfs &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), - db_(*db), - input_cursor_(self_.input()->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self_.input()->MakeCursor(mem)) {} void InitSubcursors(database::GraphDbAccessor *dba, const query::SymbolTable &symbol_table, @@ -1105,8 +1091,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { if (!skip_rest_) { if (current_depth_ >= lower_bound_) { for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) { - auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first, - pull_pos_->second, &db_); + auto vertex = bfs_subcursor_clients_->Pull( + pull_pos_->first, pull_pos_->second, context.db_accessor); if (vertex) { last_vertex = *vertex; break; @@ -1149,9 +1135,11 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { "should be set during path reconstruction"; auto ret = current_edge_addr ? bfs_subcursor_clients_->ReconstructPath( - subcursor_ids_, *current_edge_addr, &db_) + subcursor_ids_, *current_edge_addr, + context.db_accessor) : bfs_subcursor_clients_->ReconstructPath( - subcursor_ids_, *current_vertex_addr, &db_); + subcursor_ids_, *current_vertex_addr, + context.db_accessor); for (const auto &edge : ret.edges) edges.emplace_back(edge); current_vertex_addr = ret.next_vertex; current_edge_addr = ret.next_edge; @@ -1224,7 +1212,6 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { private: const DistributedExpandBfs &self_; - database::GraphDbAccessor &db_; distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr}; UniqueCursorPtr input_cursor_; @@ -1295,19 +1282,16 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, class DistributedCreateNodeCursor : public query::plan::Cursor { public: DistributedCreateNodeCursor(const DistributedCreateNode *self, - database::GraphDbAccessor *dba, utils::MemoryResource *mem) - : input_cursor_(self->input()->MakeCursor(dba, mem)), - db_(&dba->db()), + : input_cursor_(self->input()->MakeCursor(mem)), node_info_(self->node_info_), - on_random_worker_(self->on_random_worker_) { - CHECK(db_); - } + on_random_worker_(self->on_random_worker_) {} bool Pull(Frame &frame, ExecutionContext &context) override { if (input_cursor_->Pull(frame, context)) { if (on_random_worker_) { - CreateVertexOnWorker(RandomWorkerId(*db_), node_info_, frame, context); + CreateVertexOnWorker(RandomWorkerId(context.db_accessor->db()), + node_info_, frame, context); } else { CreateLocalVertex(node_info_, &frame, context); } @@ -1322,7 +1306,6 @@ class DistributedCreateNodeCursor : public query::plan::Cursor { private: UniqueCursorPtr input_cursor_; - database::GraphDb *db_{nullptr}; NodeCreationInfo node_info_; bool on_random_worker_{false}; }; @@ -1330,13 +1313,8 @@ class DistributedCreateNodeCursor : public query::plan::Cursor { class DistributedCreateExpandCursor : public query::plan::Cursor { public: DistributedCreateExpandCursor(const DistributedCreateExpand *self, - database::GraphDbAccessor *dba, utils::MemoryResource *mem) - : input_cursor_(self->input()->MakeCursor(dba, mem)), - self_(self), - db_(&dba->db()) { - CHECK(db_); - } + : input_cursor_(self->input()->MakeCursor(mem)), self_(self) {} bool Pull(Frame &frame, ExecutionContext &context) override { if (!input_cursor_->Pull(frame, context)) return false; @@ -1408,44 +1386,41 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { private: UniqueCursorPtr input_cursor_; const DistributedCreateExpand *self_{nullptr}; - database::GraphDb *db_{nullptr}; }; } // namespace -UniqueCursorPtr PullRemote::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<PullRemoteCursor>(mem, *this, db, mem); +UniqueCursorPtr PullRemote::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<PullRemoteCursor>(mem, *this, mem); } -UniqueCursorPtr Synchronize::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<SynchronizeCursor>(mem, *this, db, mem); +UniqueCursorPtr Synchronize::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<SynchronizeCursor>(mem, *this, mem); } UniqueCursorPtr PullRemoteOrderBy::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<PullRemoteOrderByCursor>(mem, *this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<PullRemoteOrderByCursor>(mem, *this, mem); } UniqueCursorPtr DistributedExpand::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<DistributedExpandCursor>(mem, this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<DistributedExpandCursor>(mem, this, mem); } UniqueCursorPtr DistributedExpandBfs::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<DistributedExpandBfsCursor>(mem, *this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<DistributedExpandBfsCursor>(mem, *this, mem); } UniqueCursorPtr DistributedCreateNode::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<DistributedCreateNodeCursor>(mem, this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<DistributedCreateNodeCursor>(mem, this, mem); } UniqueCursorPtr DistributedCreateExpand::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<DistributedCreateExpandCursor>(mem, this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<DistributedCreateExpandCursor>(mem, this, mem); } } // namespace query::plan diff --git a/src/query/distributed/plan/ops.lcp b/src/query/distributed/plan/ops.lcp index ab88c5442..db6240dfc 100644 --- a/src/query/distributed/plan/ops.lcp +++ b/src/query/distributed/plan/ops.lcp @@ -68,8 +68,7 @@ time on data transfer. It gives no guarantees on result order.") const std::vector<Symbol> &symbols) : input_(input), plan_id_(plan_id), symbols_(symbols) {} bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -138,8 +137,7 @@ Logic of the synchronize operator is: pull_remote_(pull_remote), advance_command_(advance_command) {} bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; std::vector<Symbol> OutputSymbols( @@ -179,8 +177,7 @@ by having only one result from each worker.") const std::shared_ptr<LogicalOperator> &input, int64_t plan_id, const std::vector<SortItem> &order_by, const std::vector<Symbol> &symbols); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; @@ -216,8 +213,7 @@ by having only one result from each worker.") Symbol input_symbol, const ExpandCommon &common); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -266,8 +262,7 @@ by having only one result from each worker.") const ExpansionLambda &filter_lambda); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -301,8 +296,7 @@ by having only one result from each worker.") const NodeCreationInfo &node_info, bool on_random_worker); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -347,8 +341,7 @@ by having only one result from each worker.") const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, bool existing_node); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 49f6e96c1..61753ebca 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -964,8 +964,7 @@ Interpreter::Results Interpreter::operator()( output_symbols, [cypher_query_plan](Frame *frame, ExecutionContext *context) { utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024); - auto cursor = cypher_query_plan->plan().MakeCursor( - context->db_accessor, &execution_memory); + auto cursor = cypher_query_plan->plan().MakeCursor(&execution_memory); // We are pulling from another plan, so set up the EvaluationContext // correctly. The rest of the context should be good for sharing. diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index ba45a12b1..c8321f01b 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -95,8 +95,7 @@ class Interpreter { plan_(plan), execution_memory_(std::make_unique<utils::MonotonicBufferResource>( kExecutionMemoryBlockSize)), - cursor_( - plan_->plan().MakeCursor(db_accessor, execution_memory_.get())), + cursor_(plan_->plan().MakeCursor(execution_memory_.get())), frame_(plan_->symbol_table().max_position(), execution_memory_.get()), output_symbols_(output_symbols), header_(header), diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 80e98860a..11994af1c 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -106,8 +106,7 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) { return false; } -UniqueCursorPtr Once::MakeCursor(database::GraphDbAccessor *, - utils::MemoryResource *mem) const { +UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const { return MakeUniqueCursorPtr<OnceCursor>(mem); } @@ -144,9 +143,8 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, ACCEPT_WITH_INPUT(CreateNode) -UniqueCursorPtr CreateNode::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<CreateNodeCursor>(mem, *this, db, mem); +UniqueCursorPtr CreateNode::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<CreateNodeCursor>(mem, *this, mem); } std::vector<Symbol> CreateNode::ModifiedSymbols( @@ -157,9 +155,8 @@ std::vector<Symbol> CreateNode::ModifiedSymbols( } CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} bool CreateNode::CreateNodeCursor::Pull(Frame &frame, ExecutionContext &context) { @@ -189,9 +186,8 @@ CreateExpand::CreateExpand(const NodeCreationInfo &node_info, ACCEPT_WITH_INPUT(CreateExpand) -UniqueCursorPtr CreateExpand::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<CreateExpandCursor>(mem, *this, db, mem); +UniqueCursorPtr CreateExpand::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<CreateExpandCursor>(mem, *this, mem); } std::vector<Symbol> CreateExpand::ModifiedSymbols( @@ -202,10 +198,9 @@ std::vector<Symbol> CreateExpand::ModifiedSymbols( return symbols; } -CreateExpand::CreateExpandCursor::CreateExpandCursor( - const CreateExpand &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), db_(*db), input_cursor_(self.input_->MakeCursor(db, mem)) {} +CreateExpand::CreateExpandCursor::CreateExpandCursor(const CreateExpand &self, + utils::MemoryResource *mem) + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} namespace { @@ -245,19 +240,20 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, v2.SwitchNew(); // create an edge between the two nodes + auto *dba = context.db_accessor; switch (self_.edge_info_.direction) { case EdgeAtom::Direction::IN: - CreateEdge(self_.edge_info_, &db_, &v2, &v1, &frame, &evaluator); + CreateEdge(self_.edge_info_, dba, &v2, &v1, &frame, &evaluator); break; case EdgeAtom::Direction::OUT: - CreateEdge(self_.edge_info_, &db_, &v1, &v2, &frame, &evaluator); + CreateEdge(self_.edge_info_, dba, &v1, &v2, &frame, &evaluator); break; case EdgeAtom::Direction::BOTH: // in the case of an undirected CreateExpand we choose an arbitrary // direction. this is used in the MERGE clause // it is not allowed in the CREATE clause, and the semantic // checker needs to ensure it doesn't reach this point - CreateEdge(self_.edge_info_, &db_, &v1, &v2, &frame, &evaluator); + CreateEdge(self_.edge_info_, dba, &v1, &v2, &frame, &evaluator); } return true; @@ -283,17 +279,15 @@ template <class TVerticesFun> class ScanAllCursor : public Cursor { public: explicit ScanAllCursor(Symbol output_symbol, UniqueCursorPtr &&input_cursor, - TVerticesFun &&get_vertices, - database::GraphDbAccessor &db) + TVerticesFun &&get_vertices) : output_symbol_(output_symbol), input_cursor_(std::move(input_cursor)), - get_vertices_(std::move(get_vertices)), - db_(db) {} + get_vertices_(std::move(get_vertices)) {} bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("ScanAll"); - if (db_.should_abort()) throw HintedAbortError(); + if (context.db_accessor->should_abort()) throw HintedAbortError(); while (!vertices_ || vertices_it_.value() == vertices_.value().end()) { if (!input_cursor_->Pull(frame, context)) return false; @@ -328,7 +322,6 @@ class ScanAllCursor : public Cursor { Frame &, ExecutionContext &)>::type::value_type> vertices_; std::optional<decltype(vertices_.value().begin())> vertices_it_; - database::GraphDbAccessor &db_; }; ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, @@ -339,14 +332,13 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(ScanAll) -UniqueCursorPtr ScanAll::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - auto vertices = [this, db](Frame &, ExecutionContext &) { +UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const { + auto vertices = [this](Frame &, ExecutionContext &context) { + auto *db = context.db_accessor; return std::make_optional(db->Vertices(graph_view_ == GraphView::NEW)); }; return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>( - mem, output_symbol_, input_->MakeCursor(db, mem), std::move(vertices), - *db); + mem, output_symbol_, input_->MakeCursor(mem), std::move(vertices)); } std::vector<Symbol> ScanAll::ModifiedSymbols(const SymbolTable &table) const { @@ -362,15 +354,14 @@ ScanAllByLabel::ScanAllByLabel(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(ScanAllByLabel) -UniqueCursorPtr ScanAllByLabel::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - auto vertices = [this, db](Frame &, ExecutionContext &) { +UniqueCursorPtr ScanAllByLabel::MakeCursor(utils::MemoryResource *mem) const { + auto vertices = [this](Frame &, ExecutionContext &context) { + auto *db = context.db_accessor; return std::make_optional( db->Vertices(label_, graph_view_ == GraphView::NEW)); }; return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>( - mem, output_symbol_, input_->MakeCursor(db, mem), std::move(vertices), - *db); + mem, output_symbol_, input_->MakeCursor(mem), std::move(vertices)); } ScanAllByLabelPropertyRange::ScanAllByLabelPropertyRange( @@ -390,10 +381,11 @@ ScanAllByLabelPropertyRange::ScanAllByLabelPropertyRange( ACCEPT_WITH_INPUT(ScanAllByLabelPropertyRange) UniqueCursorPtr ScanAllByLabelPropertyRange::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - auto vertices = [this, db](Frame &frame, ExecutionContext &context) - -> std::optional<decltype( - db->Vertices(label_, property_, std::nullopt, std::nullopt, false))> { + utils::MemoryResource *mem) const { + auto vertices = [this](Frame &frame, ExecutionContext &context) + -> std::optional<decltype(context.db_accessor->Vertices( + label_, property_, std::nullopt, std::nullopt, false))> { + auto *db = context.db_accessor; ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, graph_view_); @@ -421,8 +413,7 @@ UniqueCursorPtr ScanAllByLabelPropertyRange::MakeCursor( graph_view_ == GraphView::NEW)); }; return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>( - mem, output_symbol_, input_->MakeCursor(db, mem), std::move(vertices), - *db); + mem, output_symbol_, input_->MakeCursor(mem), std::move(vertices)); } ScanAllByLabelPropertyValue::ScanAllByLabelPropertyValue( @@ -441,10 +432,11 @@ ScanAllByLabelPropertyValue::ScanAllByLabelPropertyValue( ACCEPT_WITH_INPUT(ScanAllByLabelPropertyValue) UniqueCursorPtr ScanAllByLabelPropertyValue::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - auto vertices = [this, db](Frame &frame, ExecutionContext &context) - -> std::optional<decltype( - db->Vertices(label_, property_, PropertyValue::Null, false))> { + utils::MemoryResource *mem) const { + auto vertices = [this](Frame &frame, ExecutionContext &context) + -> std::optional<decltype(context.db_accessor->Vertices( + label_, property_, PropertyValue::Null, false))> { + auto *db = context.db_accessor; ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, graph_view_); @@ -459,8 +451,7 @@ UniqueCursorPtr ScanAllByLabelPropertyValue::MakeCursor( graph_view_ == GraphView::NEW)); }; return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>( - mem, output_symbol_, input_->MakeCursor(db, mem), std::move(vertices), - *db); + mem, output_symbol_, input_->MakeCursor(mem), std::move(vertices)); } namespace { @@ -485,9 +476,8 @@ Expand::Expand(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(Expand) -UniqueCursorPtr Expand::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<ExpandCursor>(mem, *this, db, mem); +UniqueCursorPtr Expand::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<ExpandCursor>(mem, *this, mem); } std::vector<Symbol> Expand::ModifiedSymbols(const SymbolTable &table) const { @@ -498,9 +488,8 @@ std::vector<Symbol> Expand::ModifiedSymbols(const SymbolTable &table) const { } Expand::ExpandCursor::ExpandCursor(const Expand &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)), db_(*db) {} + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Expand"); @@ -522,7 +511,7 @@ bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) { }; while (true) { - if (db_.should_abort()) throw HintedAbortError(); + if (context.db_accessor->should_abort()) throw HintedAbortError(); // attempt to get a value from the incoming edges if (in_edges_ && *in_edges_it_ != in_edges_->end()) { auto edge = *(*in_edges_it_)++; @@ -711,11 +700,9 @@ auto ExpandFromVertex(const VertexAccessor &vertex, class ExpandVariableCursor : public Cursor { public: - ExpandVariableCursor(const ExpandVariable &self, - database::GraphDbAccessor *db, - utils::MemoryResource *mem) + ExpandVariableCursor(const ExpandVariable &self, utils::MemoryResource *mem) : self_(self), - input_cursor_(self.input_->MakeCursor(db, mem)), + input_cursor_(self.input_->MakeCursor(mem)), edges_(mem), edges_it_(mem) {} @@ -954,10 +941,8 @@ class ExpandVariableCursor : public Cursor { class STShortestPathCursor : public query::plan::Cursor { public: - STShortestPathCursor(const ExpandVariable &self, - database::GraphDbAccessor *dba, - utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input()->MakeCursor(dba, mem)) { + STShortestPathCursor(const ExpandVariable &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self_.input()->MakeCursor(mem)) { CHECK(self_.common_.existing_node) << "s-t shortest path algorithm should only " "be used when `existing_node` flag is " @@ -1197,10 +1182,9 @@ class STShortestPathCursor : public query::plan::Cursor { class SingleSourceShortestPathCursor : public query::plan::Cursor { public: SingleSourceShortestPathCursor(const ExpandVariable &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) : self_(self), - input_cursor_(self_.input()->MakeCursor(db, mem)), + input_cursor_(self_.input()->MakeCursor(mem)), processed_(mem), to_visit_current_(mem), to_visit_next_(mem) { @@ -1367,10 +1351,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { class ExpandWeightedShortestPathCursor : public query::plan::Cursor { public: ExpandWeightedShortestPathCursor(const ExpandVariable &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) : self_(self), - input_cursor_(self_.input_->MakeCursor(db, mem)), + input_cursor_(self_.input_->MakeCursor(mem)), total_cost_(mem), previous_(mem), yielded_vertices_(mem), @@ -1619,21 +1602,20 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { } }; -UniqueCursorPtr ExpandVariable::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { +UniqueCursorPtr ExpandVariable::MakeCursor(utils::MemoryResource *mem) const { switch (type_) { case EdgeAtom::Type::BREADTH_FIRST: if (common_.existing_node) { - return MakeUniqueCursorPtr<STShortestPathCursor>(mem, *this, db, mem); + return MakeUniqueCursorPtr<STShortestPathCursor>(mem, *this, mem); } else { return MakeUniqueCursorPtr<SingleSourceShortestPathCursor>(mem, *this, - db, mem); + mem); } case EdgeAtom::Type::DEPTH_FIRST: - return MakeUniqueCursorPtr<ExpandVariableCursor>(mem, *this, db, mem); + return MakeUniqueCursorPtr<ExpandVariableCursor>(mem, *this, mem); case EdgeAtom::Type::WEIGHTED_SHORTEST_PATH: return MakeUniqueCursorPtr<ExpandWeightedShortestPathCursor>(mem, *this, - db, mem); + mem); case EdgeAtom::Type::SINGLE: LOG(FATAL) << "ExpandVariable should not be planned for a single expansion!"; @@ -1643,9 +1625,8 @@ UniqueCursorPtr ExpandVariable::MakeCursor(database::GraphDbAccessor *db, class ConstructNamedPathCursor : public Cursor { public: ConstructNamedPathCursor(const ConstructNamedPath &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input()->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self_.input()->MakeCursor(mem)) {} bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("ConstructNamedPath"); @@ -1727,8 +1708,8 @@ class ConstructNamedPathCursor : public Cursor { ACCEPT_WITH_INPUT(ConstructNamedPath) UniqueCursorPtr ConstructNamedPath::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<ConstructNamedPathCursor>(mem, *this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<ConstructNamedPathCursor>(mem, *this, mem); } std::vector<Symbol> ConstructNamedPath::ModifiedSymbols( @@ -1745,9 +1726,8 @@ Filter::Filter(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(Filter) -UniqueCursorPtr Filter::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<FilterCursor>(mem, *this, db, mem); +UniqueCursorPtr Filter::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<FilterCursor>(mem, *this, mem); } std::vector<Symbol> Filter::ModifiedSymbols(const SymbolTable &table) const { @@ -1755,9 +1735,8 @@ std::vector<Symbol> Filter::ModifiedSymbols(const SymbolTable &table) const { } Filter::FilterCursor::FilterCursor(const Filter &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {} bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Filter"); @@ -1784,9 +1763,8 @@ Produce::Produce(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(Produce) -UniqueCursorPtr Produce::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<ProduceCursor>(mem, *this, db, mem); +UniqueCursorPtr Produce::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<ProduceCursor>(mem, *this, mem); } std::vector<Symbol> Produce::OutputSymbols( @@ -1803,9 +1781,8 @@ std::vector<Symbol> Produce::ModifiedSymbols(const SymbolTable &table) const { } Produce::ProduceCursor::ProduceCursor(const Produce &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {} bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Produce"); @@ -1833,9 +1810,8 @@ Delete::Delete(const std::shared_ptr<LogicalOperator> &input_, ACCEPT_WITH_INPUT(Delete) -UniqueCursorPtr Delete::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<DeleteCursor>(mem, *this, db, mem); +UniqueCursorPtr Delete::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<DeleteCursor>(mem, *this, mem); } std::vector<Symbol> Delete::ModifiedSymbols(const SymbolTable &table) const { @@ -1843,9 +1819,8 @@ std::vector<Symbol> Delete::ModifiedSymbols(const SymbolTable &table) const { } Delete::DeleteCursor::DeleteCursor(const Delete &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), db_(*db), input_cursor_(self_.input_->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {} bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Delete"); @@ -1868,24 +1843,25 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { expression_results.emplace_back(expression->Accept(evaluator)); } + auto &dba = *context.db_accessor; // delete edges first for (TypedValue &expression_result : expression_results) { - if (db_.should_abort()) throw HintedAbortError(); + if (dba.should_abort()) throw HintedAbortError(); if (expression_result.type() == TypedValue::Type::Edge) - db_.RemoveEdge(expression_result.Value<EdgeAccessor>()); + dba.RemoveEdge(expression_result.Value<EdgeAccessor>()); } // delete vertices for (TypedValue &expression_result : expression_results) { - if (db_.should_abort()) throw HintedAbortError(); + if (dba.should_abort()) throw HintedAbortError(); switch (expression_result.type()) { case TypedValue::Type::Vertex: { VertexAccessor &va = expression_result.Value<VertexAccessor>(); va.SwitchNew(); // necessary because an edge deletion could have // updated if (self_.detach_) - db_.DetachRemoveVertex(va); - else if (!db_.RemoveVertex(va)) + dba.DetachRemoveVertex(va); + else if (!dba.RemoveVertex(va)) throw RemoveAttachedVertexException(); break; } @@ -1915,9 +1891,8 @@ SetProperty::SetProperty(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(SetProperty) -UniqueCursorPtr SetProperty::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<SetPropertyCursor>(mem, *this, db, mem); +UniqueCursorPtr SetProperty::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<SetPropertyCursor>(mem, *this, mem); } std::vector<Symbol> SetProperty::ModifiedSymbols( @@ -1926,9 +1901,8 @@ std::vector<Symbol> SetProperty::ModifiedSymbols( } SetProperty::SetPropertyCursor::SetPropertyCursor(const SetProperty &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} bool SetProperty::SetPropertyCursor::Pull(Frame &frame, ExecutionContext &context) { @@ -1976,9 +1950,8 @@ SetProperties::SetProperties(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(SetProperties) -UniqueCursorPtr SetProperties::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<SetPropertiesCursor>(mem, *this, db, mem); +UniqueCursorPtr SetProperties::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<SetPropertiesCursor>(mem, *this, mem); } std::vector<Symbol> SetProperties::ModifiedSymbols( @@ -1987,9 +1960,59 @@ std::vector<Symbol> SetProperties::ModifiedSymbols( } SetProperties::SetPropertiesCursor::SetPropertiesCursor( - const SetProperties &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), db_(*db), input_cursor_(self.input_->MakeCursor(db, mem)) {} + const SetProperties &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} + +namespace { + +/// Helper function that sets the given values on either a Vertex or an Edge. +/// +/// @tparam TRecordAccessor Either RecordAccessor<Vertex> or +/// RecordAccessor<Edge> +template <typename TRecordAccessor> +void SetPropertiesOnRecord(database::GraphDbAccessor *dba, + TRecordAccessor *record, const TypedValue &rhs, + SetProperties::Op op) { + record->SwitchNew(); + if (op == SetProperties::Op::REPLACE) { + try { + record->PropsClear(); + } catch (const RecordDeletedError &) { + throw QueryRuntimeException( + "Trying to set properties on a deleted graph element."); + } + } + + auto set_props = [record](const auto &properties) { + try { + for (const auto &kv : properties) record->PropsSet(kv.first, kv.second); + } catch (const RecordDeletedError &) { + throw QueryRuntimeException( + "Trying to set properties on a deleted graph element."); + } + }; + + switch (rhs.type()) { + case TypedValue::Type::Edge: + set_props(rhs.Value<EdgeAccessor>().Properties()); + break; + case TypedValue::Type::Vertex: + set_props(rhs.Value<VertexAccessor>().Properties()); + break; + case TypedValue::Type::Map: { + for (const auto &kv : rhs.ValueMap()) + PropsSetChecked(record, dba->Property(std::string(kv.first)), + kv.second); + break; + } + default: + throw QueryRuntimeException( + "Right-hand side in SET expression must be a node, an edge or a " + "map."); + } +} + +} // namespace bool SetProperties::SetPropertiesCursor::Pull(Frame &frame, ExecutionContext &context) { @@ -2007,10 +2030,12 @@ bool SetProperties::SetPropertiesCursor::Pull(Frame &frame, switch (lhs.type()) { case TypedValue::Type::Vertex: - Set(lhs.Value<VertexAccessor>(), rhs); + SetPropertiesOnRecord(context.db_accessor, &lhs.Value<VertexAccessor>(), + rhs, self_.op_); break; case TypedValue::Type::Edge: - Set(lhs.Value<EdgeAccessor>(), rhs); + SetPropertiesOnRecord(context.db_accessor, &lhs.Value<EdgeAccessor>(), + rhs, self_.op_); break; case TypedValue::Type::Null: // Skip setting properties on Null (can occur in optional match). @@ -2028,55 +2053,6 @@ void SetProperties::SetPropertiesCursor::Shutdown() { void SetProperties::SetPropertiesCursor::Reset() { input_cursor_->Reset(); } -template <typename TRecordAccessor> -void SetProperties::SetPropertiesCursor::Set(TRecordAccessor &record, - const TypedValue &rhs) const { - record.SwitchNew(); - if (self_.op_ == Op::REPLACE) { - try { - record.PropsClear(); - } catch (const RecordDeletedError &) { - throw QueryRuntimeException( - "Trying to set properties on a deleted graph element."); - } - } - - auto set_props = [&record](const auto &properties) { - try { - for (const auto &kv : properties) record.PropsSet(kv.first, kv.second); - } catch (const RecordDeletedError &) { - throw QueryRuntimeException( - "Trying to set properties on a deleted graph element."); - } - }; - - switch (rhs.type()) { - case TypedValue::Type::Edge: - set_props(rhs.Value<EdgeAccessor>().Properties()); - break; - case TypedValue::Type::Vertex: - set_props(rhs.Value<VertexAccessor>().Properties()); - break; - case TypedValue::Type::Map: { - for (const auto &kv : rhs.ValueMap()) - PropsSetChecked(&record, db_.Property(std::string(kv.first)), - kv.second); - break; - } - default: - throw QueryRuntimeException( - "Right-hand side in SET expression must be a node, an edge or a " - "map."); - } -} - -// instantiate the SetProperties function with concrete TRecordAccessor -// types -template void SetProperties::SetPropertiesCursor::Set( - RecordAccessor<Vertex> &record, const TypedValue &rhs) const; -template void SetProperties::SetPropertiesCursor::Set( - RecordAccessor<Edge> &record, const TypedValue &rhs) const; - SetLabels::SetLabels(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, const std::vector<storage::Label> &labels) @@ -2084,9 +2060,8 @@ SetLabels::SetLabels(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(SetLabels) -UniqueCursorPtr SetLabels::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<SetLabelsCursor>(mem, *this, db, mem); +UniqueCursorPtr SetLabels::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<SetLabelsCursor>(mem, *this, mem); } std::vector<Symbol> SetLabels::ModifiedSymbols(const SymbolTable &table) const { @@ -2094,9 +2069,8 @@ std::vector<Symbol> SetLabels::ModifiedSymbols(const SymbolTable &table) const { } SetLabels::SetLabelsCursor::SetLabelsCursor(const SetLabels &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)) {} + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("SetLabels"); @@ -2128,9 +2102,8 @@ RemoveProperty::RemoveProperty(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(RemoveProperty) -UniqueCursorPtr RemoveProperty::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<RemovePropertyCursor>(mem, *this, db, mem); +UniqueCursorPtr RemoveProperty::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<RemovePropertyCursor>(mem, *this, mem); } std::vector<Symbol> RemoveProperty::ModifiedSymbols( @@ -2139,9 +2112,8 @@ std::vector<Symbol> RemoveProperty::ModifiedSymbols( } RemoveProperty::RemovePropertyCursor::RemovePropertyCursor( - const RemoveProperty &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)) {} + const RemoveProperty &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame, ExecutionContext &context) { @@ -2195,9 +2167,8 @@ RemoveLabels::RemoveLabels(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(RemoveLabels) -UniqueCursorPtr RemoveLabels::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<RemoveLabelsCursor>(mem, *this, db, mem); +UniqueCursorPtr RemoveLabels::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<RemoveLabelsCursor>(mem, *this, mem); } std::vector<Symbol> RemoveLabels::ModifiedSymbols( @@ -2205,10 +2176,9 @@ std::vector<Symbol> RemoveLabels::ModifiedSymbols( return input_->ModifiedSymbols(table); } -RemoveLabels::RemoveLabelsCursor::RemoveLabelsCursor( - const RemoveLabels &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)) {} +RemoveLabels::RemoveLabelsCursor::RemoveLabelsCursor(const RemoveLabels &self, + utils::MemoryResource *mem) + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &context) { @@ -2245,8 +2215,8 @@ EdgeUniquenessFilter::EdgeUniquenessFilter( ACCEPT_WITH_INPUT(EdgeUniquenessFilter) UniqueCursorPtr EdgeUniquenessFilter::MakeCursor( - database::GraphDbAccessor *db, utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<EdgeUniquenessFilterCursor>(mem, *this, db, mem); + utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<EdgeUniquenessFilterCursor>(mem, *this, mem); } std::vector<Symbol> EdgeUniquenessFilter::ModifiedSymbols( @@ -2255,9 +2225,8 @@ std::vector<Symbol> EdgeUniquenessFilter::ModifiedSymbols( } EdgeUniquenessFilter::EdgeUniquenessFilterCursor::EdgeUniquenessFilterCursor( - const EdgeUniquenessFilter &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), input_cursor_(self.input_->MakeCursor(db, mem)) {} + const EdgeUniquenessFilter &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self.input_->MakeCursor(mem)) {} namespace { /** @@ -2321,16 +2290,13 @@ std::vector<Symbol> Accumulate::ModifiedSymbols(const SymbolTable &) const { class AccumulateCursor : public Cursor { public: - AccumulateCursor(const Accumulate &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), - db_(*db), - input_cursor_(self.input_->MakeCursor(db, mem)), - cache_(mem) {} + AccumulateCursor(const Accumulate &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self.input_->MakeCursor(mem)), cache_(mem) {} bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("Accumulate"); + auto &dba = *context.db_accessor; // cache all the input if (!pulled_all_input_) { while (input_cursor_->Pull(frame, context)) { @@ -2345,13 +2311,13 @@ class AccumulateCursor : public Cursor { cache_it_ = cache_.begin(); if (self_.advance_command_) { - db_.AdvanceCommand(); + dba.AdvanceCommand(); for (auto &row : cache_) for (auto &col : row) query::ReconstructTypedValue(col); } } - if (db_.should_abort()) throw HintedAbortError(); + if (dba.should_abort()) throw HintedAbortError(); if (cache_it_ == cache_.end()) return false; auto row_it = (cache_it_++)->begin(); for (const Symbol &symbol : self_.symbols_) frame[symbol] = *row_it++; @@ -2369,7 +2335,6 @@ class AccumulateCursor : public Cursor { private: const Accumulate &self_; - database::GraphDbAccessor &db_; const UniqueCursorPtr input_cursor_; std::vector< std::vector<TypedValue, utils::Allocator<TypedValue>>, @@ -2379,9 +2344,8 @@ class AccumulateCursor : public Cursor { bool pulled_all_input_{false}; }; -UniqueCursorPtr Accumulate::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<AccumulateCursor>(mem, *this, db, mem); +UniqueCursorPtr Accumulate::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<AccumulateCursor>(mem, *this, mem); } Aggregate::Aggregate(const std::shared_ptr<LogicalOperator> &input, @@ -2426,10 +2390,9 @@ TypedValue DefaultAggregationOpValue(const Aggregate::Element &element, class AggregateCursor : public Cursor { public: - AggregateCursor(const Aggregate &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + AggregateCursor(const Aggregate &self, utils::MemoryResource *mem) : self_(self), - input_cursor_(self_.input_->MakeCursor(db, mem)), + input_cursor_(self_.input_->MakeCursor(mem)), aggregation_(mem) {} bool Pull(Frame &frame, ExecutionContext &context) override { @@ -2730,9 +2693,8 @@ class AggregateCursor : public Cursor { } }; -UniqueCursorPtr Aggregate::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, db, mem); +UniqueCursorPtr Aggregate::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, mem); } Skip::Skip(const std::shared_ptr<LogicalOperator> &input, @@ -2741,9 +2703,8 @@ Skip::Skip(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(Skip) -UniqueCursorPtr Skip::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<SkipCursor>(mem, *this, db, mem); +UniqueCursorPtr Skip::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<SkipCursor>(mem, *this, mem); } std::vector<Symbol> Skip::OutputSymbols(const SymbolTable &symbol_table) const { @@ -2755,9 +2716,8 @@ std::vector<Symbol> Skip::ModifiedSymbols(const SymbolTable &table) const { return input_->ModifiedSymbols(table); } -Skip::SkipCursor::SkipCursor(const Skip &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {} +Skip::SkipCursor::SkipCursor(const Skip &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {} bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Skip"); @@ -2801,9 +2761,8 @@ Limit::Limit(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(Limit) -UniqueCursorPtr Limit::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<LimitCursor>(mem, *this, db, mem); +UniqueCursorPtr Limit::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<LimitCursor>(mem, *this, mem); } std::vector<Symbol> Limit::OutputSymbols( @@ -2816,10 +2775,8 @@ std::vector<Symbol> Limit::ModifiedSymbols(const SymbolTable &table) const { return input_->ModifiedSymbols(table); } -Limit::LimitCursor::LimitCursor(const Limit &self, - database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {} +Limit::LimitCursor::LimitCursor(const Limit &self, utils::MemoryResource *mem) + : self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {} bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Limit"); @@ -2888,10 +2845,9 @@ std::vector<Symbol> OrderBy::ModifiedSymbols(const SymbolTable &table) const { class OrderByCursor : public Cursor { public: - OrderByCursor(const OrderBy &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + OrderByCursor(const OrderBy &self, utils::MemoryResource *mem) : self_(self), - input_cursor_(self_.input_->MakeCursor(db, mem)), + input_cursor_(self_.input_->MakeCursor(mem)), cache_(mem) {} bool Pull(Frame &frame, ExecutionContext &context) override { @@ -2968,9 +2924,8 @@ class OrderByCursor : public Cursor { decltype(cache_.begin()) cache_it_ = cache_.begin(); }; -UniqueCursorPtr OrderBy::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<OrderByCursor>(mem, *this, db, mem); +UniqueCursorPtr OrderBy::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<OrderByCursor>(mem, *this, mem); } Merge::Merge(const std::shared_ptr<LogicalOperator> &input, @@ -2988,9 +2943,8 @@ bool Merge::Accept(HierarchicalLogicalOperatorVisitor &visitor) { return visitor.PostVisit(*this); } -UniqueCursorPtr Merge::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<MergeCursor>(mem, *this, db, mem); +UniqueCursorPtr Merge::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<MergeCursor>(mem, *this, mem); } std::vector<Symbol> Merge::ModifiedSymbols(const SymbolTable &table) const { @@ -3002,12 +2956,10 @@ std::vector<Symbol> Merge::ModifiedSymbols(const SymbolTable &table) const { return symbols; } -Merge::MergeCursor::MergeCursor(const Merge &self, - database::GraphDbAccessor *db, - utils::MemoryResource *mem) - : input_cursor_(self.input_->MakeCursor(db, mem)), - merge_match_cursor_(self.merge_match_->MakeCursor(db, mem)), - merge_create_cursor_(self.merge_create_->MakeCursor(db, mem)) {} +Merge::MergeCursor::MergeCursor(const Merge &self, utils::MemoryResource *mem) + : input_cursor_(self.input_->MakeCursor(mem)), + merge_match_cursor_(self.merge_match_->MakeCursor(mem)), + merge_create_cursor_(self.merge_create_->MakeCursor(mem)) {} bool Merge::MergeCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Merge"); @@ -3075,9 +3027,8 @@ bool Optional::Accept(HierarchicalLogicalOperatorVisitor &visitor) { return visitor.PostVisit(*this); } -UniqueCursorPtr Optional::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<OptionalCursor>(mem, *this, db, mem); +UniqueCursorPtr Optional::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<OptionalCursor>(mem, *this, mem); } std::vector<Symbol> Optional::ModifiedSymbols(const SymbolTable &table) const { @@ -3088,11 +3039,10 @@ std::vector<Symbol> Optional::ModifiedSymbols(const SymbolTable &table) const { } Optional::OptionalCursor::OptionalCursor(const Optional &self, - database::GraphDbAccessor *db, utils::MemoryResource *mem) : self_(self), - input_cursor_(self.input_->MakeCursor(db, mem)), - optional_cursor_(self.optional_->MakeCursor(db, mem)) {} + input_cursor_(self.input_->MakeCursor(mem)), + optional_cursor_(self.optional_->MakeCursor(mem)) {} bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Optional"); @@ -3160,18 +3110,16 @@ std::vector<Symbol> Unwind::ModifiedSymbols(const SymbolTable &table) const { class UnwindCursor : public Cursor { public: - UnwindCursor(const Unwind &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + UnwindCursor(const Unwind &self, utils::MemoryResource *mem) : self_(self), - db_(*db), - input_cursor_(self.input_->MakeCursor(db, mem)), + input_cursor_(self.input_->MakeCursor(mem)), input_value_(mem) {} bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("Unwind"); while (true) { - if (db_.should_abort()) throw HintedAbortError(); + if (context.db_accessor->should_abort()) throw HintedAbortError(); // if we reached the end of our list of values // pull from the input if (input_value_it_ == input_value_.end()) { @@ -3209,7 +3157,6 @@ class UnwindCursor : public Cursor { private: const Unwind &self_; - database::GraphDbAccessor &db_; const UniqueCursorPtr input_cursor_; // typed values we are unwinding and yielding std::vector<TypedValue, utils::Allocator<TypedValue>> input_value_; @@ -3217,17 +3164,15 @@ class UnwindCursor : public Cursor { decltype(input_value_)::iterator input_value_it_ = input_value_.end(); }; -UniqueCursorPtr Unwind::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<UnwindCursor>(mem, *this, db, mem); +UniqueCursorPtr Unwind::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<UnwindCursor>(mem, *this, mem); } class DistinctCursor : public Cursor { public: - DistinctCursor(const Distinct &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + DistinctCursor(const Distinct &self, utils::MemoryResource *mem) : self_(self), - input_cursor_(self.input_->MakeCursor(db, mem)), + input_cursor_(self.input_->MakeCursor(mem)), seen_rows_(mem) {} bool Pull(Frame &frame, ExecutionContext &context) override { @@ -3274,9 +3219,8 @@ Distinct::Distinct(const std::shared_ptr<LogicalOperator> &input, ACCEPT_WITH_INPUT(Distinct) -UniqueCursorPtr Distinct::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<DistinctCursor>(mem, *this, db, mem); +UniqueCursorPtr Distinct::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<DistinctCursor>(mem, *this, mem); } std::vector<Symbol> Distinct::OutputSymbols( @@ -3300,9 +3244,8 @@ Union::Union(const std::shared_ptr<LogicalOperator> &left_op, left_symbols_(left_symbols), right_symbols_(right_symbols) {} -UniqueCursorPtr Union::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<Union::UnionCursor>(mem, *this, db, mem); +UniqueCursorPtr Union::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<Union::UnionCursor>(mem, *this, mem); } bool Union::Accept(HierarchicalLogicalOperatorVisitor &visitor) { @@ -3324,12 +3267,10 @@ std::vector<Symbol> Union::ModifiedSymbols(const SymbolTable &) const { WITHOUT_SINGLE_INPUT(Union); -Union::UnionCursor::UnionCursor(const Union &self, - database::GraphDbAccessor *db, - utils::MemoryResource *mem) +Union::UnionCursor::UnionCursor(const Union &self, utils::MemoryResource *mem) : self_(self), - left_cursor_(self.left_op_->MakeCursor(db, mem)), - right_cursor_(self.right_op_->MakeCursor(db, mem)) {} + left_cursor_(self.left_op_->MakeCursor(mem)), + right_cursor_(self.right_op_->MakeCursor(mem)) {} bool Union::UnionCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Union"); @@ -3389,13 +3330,12 @@ namespace { class CartesianCursor : public Cursor { public: - CartesianCursor(const Cartesian &self, database::GraphDbAccessor *db, - utils::MemoryResource *mem) + CartesianCursor(const Cartesian &self, utils::MemoryResource *mem) : self_(self), left_op_frames_(mem), right_op_frame_(mem), - left_op_cursor_(self.left_op_->MakeCursor(db, mem)), - right_op_cursor_(self_.right_op_->MakeCursor(db, mem)) { + left_op_cursor_(self.left_op_->MakeCursor(mem)), + right_op_cursor_(self_.right_op_->MakeCursor(mem)) { CHECK(left_op_cursor_ != nullptr) << "CartesianCursor: Missing left operator cursor."; CHECK(right_op_cursor_ != nullptr) @@ -3474,9 +3414,8 @@ class CartesianCursor : public Cursor { } // namespace -UniqueCursorPtr Cartesian::MakeCursor(database::GraphDbAccessor *db, - utils::MemoryResource *mem) const { - return MakeUniqueCursorPtr<CartesianCursor>(mem, *this, db, mem); +UniqueCursorPtr Cartesian::MakeCursor(utils::MemoryResource *mem) const { + return MakeUniqueCursorPtr<CartesianCursor>(mem, *this, mem); } OutputTable::OutputTable(std::vector<Symbol> output_symbols, @@ -3531,8 +3470,7 @@ class OutputTableCursor : public Cursor { bool pulled_{false}; }; -UniqueCursorPtr OutputTable::MakeCursor(database::GraphDbAccessor *, - utils::MemoryResource *mem) const { +UniqueCursorPtr OutputTable::MakeCursor(utils::MemoryResource *mem) const { return MakeUniqueCursorPtr<OutputTableCursor>(mem, *this); } @@ -3578,7 +3516,7 @@ class OutputTableStreamCursor : public Cursor { }; UniqueCursorPtr OutputTableStream::MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *mem) const { + utils::MemoryResource *mem) const { return MakeUniqueCursorPtr<OutputTableStreamCursor>(mem, this); } diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index 75eba007d..d968f6bd7 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -20,10 +20,6 @@ #include "utils/hashing/fnv.hpp" #include "utils/memory.hpp" #include "utils/visitor.hpp" - -namespace database { -class GraphDbAccessor; -} cpp<# (lcp:namespace query) @@ -159,13 +155,10 @@ can serve as inputs to others and thus a sequence of operations is formed.") /** Construct a @c Cursor which is used to run this operator. * - * @param database::GraphDbAccessor Used to perform operations on the - * database. * @param utils::MemoryResource Memory resource used for allocations during * the lifetime of the returned Cursor. */ - virtual UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const = 0; + virtual UniqueCursorPtr MakeCursor(utils::MemoryResource *) const = 0; /** Return @c Symbol vector where the query results will be stored. * @@ -300,8 +293,7 @@ and false on every following Pull.") (:public #>cpp DEFVISITABLE(HierarchicalLogicalOperatorVisitor); - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override { return {}; } @@ -395,8 +387,7 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or CreateNode(const std::shared_ptr<LogicalOperator> &input, const NodeCreationInfo &node_info); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -409,8 +400,7 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or #>cpp class CreateNodeCursor : public Cursor { public: - CreateNodeCursor(const CreateNode &, database::GraphDbAccessor *, - utils::MemoryResource *); + CreateNodeCursor(const CreateNode &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -495,8 +485,7 @@ chained in cases when longer paths need creating. const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, bool existing_node); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -509,15 +498,13 @@ chained in cases when longer paths need creating. #>cpp class CreateExpandCursor : public Cursor { public: - CreateExpandCursor(const CreateExpand &, database::GraphDbAccessor *, - utils::MemoryResource *); + CreateExpandCursor(const CreateExpand &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; private: const CreateExpand &self_; - database::GraphDbAccessor &db_; const UniqueCursorPtr input_cursor_; // Get the existing node (if existing_node_ == true), or create a new node @@ -562,8 +549,7 @@ with a constructor argument. ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, GraphView graph_view = GraphView::OLD); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -591,8 +577,7 @@ given label. Symbol output_symbol, storage::Label label, GraphView graph_view = GraphView::OLD); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; cpp<#) (:serialize (:slk)) (:clone)) @@ -701,8 +686,7 @@ property value which is inside a range (inclusive or exlusive). GraphView graph_view = GraphView::OLD); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; cpp<#) (:serialize (:slk)) (:clone)) @@ -742,8 +726,7 @@ property value. GraphView graph_view = GraphView::OLD); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; cpp<#) (:serialize (:slk)) (:clone)) @@ -808,8 +791,7 @@ pulled.") Expand() {} bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -820,8 +802,7 @@ pulled.") class ExpandCursor : public Cursor { public: - ExpandCursor(const Expand &, database::GraphDbAccessor *, - utils::MemoryResource *); + ExpandCursor(const Expand &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -835,7 +816,6 @@ pulled.") const Expand &self_; const UniqueCursorPtr input_cursor_; - database::GraphDbAccessor &db_; // The iterable over edges and the current edge iterator are referenced via // optional because they can not be initialized in the constructor of @@ -959,8 +939,7 @@ pulled.") std::optional<Symbol> total_weight); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -998,8 +977,7 @@ pulled.") path_symbol_(path_symbol), path_elements_(path_elements) {} bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1031,8 +1009,7 @@ a boolean value.") Filter(const std::shared_ptr<LogicalOperator> &input_, Expression *expression_); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1045,8 +1022,7 @@ a boolean value.") #>cpp class FilterCursor : public Cursor { public: - FilterCursor(const Filter &, database::GraphDbAccessor *, - utils::MemoryResource *); + FilterCursor(const Filter &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1083,8 +1059,7 @@ RETURN clause) the Produce's pull succeeds exactly once.") Produce(const std::shared_ptr<LogicalOperator> &input, const std::vector<NamedExpression *> &named_expressions); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -1098,8 +1073,7 @@ RETURN clause) the Produce's pull succeeds exactly once.") #>cpp class ProduceCursor : public Cursor { public: - ProduceCursor(const Produce &, database::GraphDbAccessor *, - utils::MemoryResource *); + ProduceCursor(const Produce &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1133,8 +1107,7 @@ Has a flag for using DETACH DELETE when deleting vertices.") Delete(const std::shared_ptr<LogicalOperator> &input_, const std::vector<Expression *> &expressions, bool detach_); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1147,15 +1120,13 @@ Has a flag for using DETACH DELETE when deleting vertices.") #>cpp class DeleteCursor : public Cursor { public: - DeleteCursor(const Delete &, database::GraphDbAccessor *, - utils::MemoryResource *); + DeleteCursor(const Delete &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; private: const Delete &self_; - database::GraphDbAccessor &db_; const UniqueCursorPtr input_cursor_; }; cpp<#) @@ -1186,8 +1157,7 @@ can be stored (a TypedValue that can be converted to PropertyValue).") storage::Property property, PropertyLookup *lhs, Expression *rhs); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1200,8 +1170,7 @@ can be stored (a TypedValue that can be converted to PropertyValue).") #>cpp class SetPropertyCursor : public Cursor { public: - SetPropertyCursor(const SetProperty &, database::GraphDbAccessor *, - utils::MemoryResource *); + SetPropertyCursor(const SetProperty &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1247,8 +1216,7 @@ that the old properties are discarded and replaced with new ones.") SetProperties(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, Expression *rhs, Op op); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1261,24 +1229,14 @@ that the old properties are discarded and replaced with new ones.") #>cpp class SetPropertiesCursor : public Cursor { public: - SetPropertiesCursor(const SetProperties &, database::GraphDbAccessor *, - utils::MemoryResource *); + SetPropertiesCursor(const SetProperties &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; private: const SetProperties &self_; - database::GraphDbAccessor &db_; const UniqueCursorPtr input_cursor_; - - /** Helper function that sets the given values on either - * a VertexRecord or an EdgeRecord. - * @tparam TRecordAccessor Either RecordAccessor<Vertex> or - * RecordAccessor<Edge> - */ - template <typename TRecordAccessor> - void Set(TRecordAccessor &record, const TypedValue &rhs) const; }; cpp<#) (:serialize (:slk)) @@ -1301,8 +1259,7 @@ It does NOT remove labels that are already set on that Vertex.") SetLabels(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, const std::vector<storage::Label> &labels); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1315,8 +1272,7 @@ It does NOT remove labels that are already set on that Vertex.") #>cpp class SetLabelsCursor : public Cursor { public: - SetLabelsCursor(const SetLabels &, database::GraphDbAccessor *, - utils::MemoryResource *); + SetLabelsCursor(const SetLabels &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1346,8 +1302,7 @@ It does NOT remove labels that are already set on that Vertex.") RemoveProperty(const std::shared_ptr<LogicalOperator> &input, storage::Property property, PropertyLookup *lhs); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1360,8 +1315,7 @@ It does NOT remove labels that are already set on that Vertex.") #>cpp class RemovePropertyCursor : public Cursor { public: - RemovePropertyCursor(const RemoveProperty &, database::GraphDbAccessor *, - utils::MemoryResource *); + RemovePropertyCursor(const RemoveProperty &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1391,8 +1345,7 @@ If a label does not exist on a Vertex, nothing happens.") RemoveLabels(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, const std::vector<storage::Label> &labels); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1405,8 +1358,7 @@ If a label does not exist on a Vertex, nothing happens.") #>cpp class RemoveLabelsCursor : public Cursor { public: - RemoveLabelsCursor(const RemoveLabels &, database::GraphDbAccessor *, - utils::MemoryResource *); + RemoveLabelsCursor(const RemoveLabels &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1447,8 +1399,7 @@ edge lists).") Symbol expand_symbol, const std::vector<Symbol> &previous_symbols); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1462,7 +1413,6 @@ edge lists).") class EdgeUniquenessFilterCursor : public Cursor { public: EdgeUniquenessFilterCursor(const EdgeUniquenessFilter &, - database::GraphDbAccessor *, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; @@ -1516,8 +1466,7 @@ has been cached will be reconstructed before Pull returns. Accumulate(const std::shared_ptr<LogicalOperator> &input, const std::vector<Symbol> &symbols, bool advance_command = false); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1595,8 +1544,7 @@ elements are in an undefined state after aggregation.") const std::vector<Expression *> &group_by, const std::vector<Symbol> &remember); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1633,8 +1581,7 @@ operator's implementation does not expect this.") Skip(const std::shared_ptr<LogicalOperator> &input, Expression *expression); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -1648,8 +1595,7 @@ operator's implementation does not expect this.") #>cpp class SkipCursor : public Cursor { public: - SkipCursor(const Skip &, database::GraphDbAccessor *, - utils::MemoryResource *); + SkipCursor(const Skip &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1694,8 +1640,7 @@ input should be performed).") Limit(const std::shared_ptr<LogicalOperator> &input, Expression *expression); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -1709,8 +1654,7 @@ input should be performed).") #>cpp class LimitCursor : public Cursor { public: - LimitCursor(const Limit &, database::GraphDbAccessor *, - utils::MemoryResource *); + LimitCursor(const Limit &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1755,8 +1699,7 @@ are valid for usage after the OrderBy operator.") const std::vector<SortItem> &order_by, const std::vector<Symbol> &output_symbols); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -1799,8 +1742,7 @@ documentation.") const std::shared_ptr<LogicalOperator> &merge_match, const std::shared_ptr<LogicalOperator> &merge_create); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; // TODO: Consider whether we want to treat Merge as having single input. It @@ -1816,8 +1758,7 @@ documentation.") #>cpp class MergeCursor : public Cursor { public: - MergeCursor(const Merge &, database::GraphDbAccessor *, - utils::MemoryResource *); + MergeCursor(const Merge &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1861,8 +1802,7 @@ and returns true, once.") const std::shared_ptr<LogicalOperator> &optional, const std::vector<Symbol> &optional_symbols); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { return true; } @@ -1875,8 +1815,7 @@ and returns true, once.") #>cpp class OptionalCursor : public Cursor { public: - OptionalCursor(const Optional &, database::GraphDbAccessor *, - utils::MemoryResource *); + OptionalCursor(const Optional &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1916,8 +1855,7 @@ Input is optional (unwind can be the first clause in a query).") Unwind(const std::shared_ptr<LogicalOperator> &input, Expression *input_expression_, Symbol output_symbol); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override { @@ -1950,8 +1888,7 @@ This implementation maintains input ordering.") Distinct(const std::shared_ptr<LogicalOperator> &input, const std::vector<Symbol> &value_symbols); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -1990,8 +1927,7 @@ of symbols used by each of the inputs.") const std::vector<Symbol> &left_symbols, const std::vector<Symbol> &right_symbols); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; @@ -2003,8 +1939,7 @@ of symbols used by each of the inputs.") #>cpp class UnionCursor : public Cursor { public: - UnionCursor(const Union &, database::GraphDbAccessor *, - utils::MemoryResource *); + UnionCursor(const Union &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -2044,8 +1979,7 @@ of symbols used by each of the inputs.") right_symbols_(right_symbols) {} bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override; bool HasSingleInput() const override; @@ -2074,8 +2008,7 @@ of symbols used by each of the inputs.") LOG(FATAL) << "OutputTable operator should not be visited!"; } - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override { return output_symbols_; } @@ -2109,8 +2042,7 @@ at once. Instead, each call of the callback should return a single row of the ta LOG(FATAL) << "OutputTableStream operator should not be visited!"; } - UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *, utils::MemoryResource *) const override; + UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; std::vector<Symbol> OutputSymbols(const SymbolTable &) const override { return output_symbols_; } diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index e324a1f01..7b113c1d0 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -111,7 +111,7 @@ static void Distinct(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get()); + auto cursor = plan_and_cost.first->MakeCursor(memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); @@ -161,7 +161,7 @@ static void ExpandVariable(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = expand_variable.MakeCursor(&dba, memory.get()); + auto cursor = expand_variable.MakeCursor(memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); @@ -197,7 +197,7 @@ static void ExpandBfs(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = expand_variable.MakeCursor(&dba, memory.get()); + auto cursor = expand_variable.MakeCursor(memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); @@ -235,7 +235,7 @@ static void ExpandShortest(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = expand_variable.MakeCursor(&dba, memory.get()); + auto cursor = expand_variable.MakeCursor(memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); for (const auto &dest : dba.Vertices(false)) { @@ -280,7 +280,7 @@ static void ExpandWeightedShortest(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = expand_variable.MakeCursor(&dba, memory.get()); + auto cursor = expand_variable.MakeCursor(memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); for (const auto &dest : dba.Vertices(false)) { @@ -326,7 +326,7 @@ static void Accumulate(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = accumulate.MakeCursor(&dba, memory.get()); + auto cursor = accumulate.MakeCursor(memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); @@ -375,7 +375,7 @@ static void Aggregate(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = aggregate.MakeCursor(&dba, memory.get()); + auto cursor = aggregate.MakeCursor(memory.get()); frame[symbols.front()] = query::TypedValue(0); // initial group_by value while (cursor->Pull(frame, execution_context)) { frame[symbols.front()].ValueInt()++; // new group_by value @@ -425,7 +425,7 @@ static void OrderBy(benchmark::State &state) { evaluation_context}; TMemory memory; query::Frame frame(symbol_table.max_position(), memory.get()); - auto cursor = order_by.MakeCursor(&dba, memory.get()); + auto cursor = order_by.MakeCursor(memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); @@ -464,7 +464,7 @@ static void Unwind(benchmark::State &state) { query::Frame frame(symbol_table.max_position(), memory.get()); frame[list_sym] = query::TypedValue(std::vector<query::TypedValue>(state.range(1))); - auto cursor = unwind.MakeCursor(&dba, memory.get()); + auto cursor = unwind.MakeCursor(memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); diff --git a/tests/unit/bfs_common.hpp b/tests/unit/bfs_common.hpp index f1c2f3acf..9805cac55 100644 --- a/tests/unit/bfs_common.hpp +++ b/tests/unit/bfs_common.hpp @@ -110,10 +110,9 @@ class Yield : public query::plan::LogicalOperator { values_(values) {} query::plan::UniqueCursorPtr MakeCursor( - database::GraphDbAccessor *dba, utils::MemoryResource *mem) const override { return query::plan::MakeUniqueCursorPtr<YieldCursor>( - mem, this, input_->MakeCursor(dba, mem)); + mem, this, input_->MakeCursor(mem)); } std::vector<query::Symbol> ModifiedSymbols( const query::SymbolTable &) const override { @@ -173,8 +172,7 @@ class Yield : public query::plan::LogicalOperator { std::vector<std::vector<query::TypedValue>> PullResults( query::plan::LogicalOperator *last_op, query::ExecutionContext *context, std::vector<query::Symbol> output_symbols) { - auto cursor = - last_op->MakeCursor(context->db_accessor, utils::NewDeleteResource()); + auto cursor = last_op->MakeCursor(utils::NewDeleteResource()); std::vector<std::vector<query::TypedValue>> output; { query::Frame frame(context->symbol_table.max_position()); diff --git a/tests/unit/distributed_reset.cpp b/tests/unit/distributed_reset.cpp index 56c3c55fc..56c760fe6 100644 --- a/tests/unit/distributed_reset.cpp +++ b/tests/unit/distributed_reset.cpp @@ -20,8 +20,7 @@ TEST_F(DistributedReset, ResetTest) { auto dba = master().Access(); query::Frame frame(0); query::ExecutionContext context{dba.get()}; - auto pull_remote_cursor = - pull_remote->MakeCursor(dba.get(), utils::NewDeleteResource()); + auto pull_remote_cursor = pull_remote->MakeCursor(utils::NewDeleteResource()); for (int i = 0; i < 3; ++i) { EXPECT_TRUE(pull_remote_cursor->Pull(frame, context)); diff --git a/tests/unit/query_plan_common.hpp b/tests/unit/query_plan_common.hpp index a4de396cf..568f8b4e8 100644 --- a/tests/unit/query_plan_common.hpp +++ b/tests/unit/query_plan_common.hpp @@ -42,8 +42,7 @@ std::vector<std::vector<TypedValue>> CollectProduce(const Produce &produce, symbols.emplace_back(context->symbol_table.at(*named_expression)); // stream out results - auto cursor = - produce.MakeCursor(context->db_accessor, utils::NewDeleteResource()); + auto cursor = produce.MakeCursor(utils::NewDeleteResource()); std::vector<std::vector<TypedValue>> results; while (cursor->Pull(frame, *context)) { std::vector<TypedValue> values; @@ -56,8 +55,7 @@ std::vector<std::vector<TypedValue>> CollectProduce(const Produce &produce, int PullAll(const LogicalOperator &logical_op, ExecutionContext *context) { Frame frame(context->symbol_table.max_position()); - auto cursor = - logical_op.MakeCursor(context->db_accessor, utils::NewDeleteResource()); + auto cursor = logical_op.MakeCursor(utils::NewDeleteResource()); int count = 0; while (cursor->Pull(frame, *context)) count++; return count; diff --git a/tests/unit/query_plan_create_set_remove_delete.cpp b/tests/unit/query_plan_create_set_remove_delete.cpp index 76aebc436..094bf5933 100644 --- a/tests/unit/query_plan_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_create_set_remove_delete.cpp @@ -281,8 +281,7 @@ TEST(QueryPlan, Delete) { n.op_, std::vector<Expression *>{n_get}, true); Frame frame(symbol_table.max_position()); auto context = MakeContext(storage, symbol_table, &dba); - delete_op->MakeCursor(&dba, utils::NewDeleteResource()) - ->Pull(frame, context); + delete_op->MakeCursor(utils::NewDeleteResource())->Pull(frame, context); dba.AdvanceCommand(); EXPECT_EQ(3, CountIterable(dba.Vertices(false))); EXPECT_EQ(3, CountIterable(dba.Edges(false))); diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index 6cb398f17..8ce7cfeba 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -587,7 +587,7 @@ class QueryPlanExpandVariable : public testing::Test { template <typename TResult> auto GetResults(std::shared_ptr<LogicalOperator> input_op, Symbol symbol) { Frame frame(symbol_table.max_position()); - auto cursor = input_op->MakeCursor(&dba_, utils::NewDeleteResource()); + auto cursor = input_op->MakeCursor(utils::NewDeleteResource()); auto context = MakeContext(storage, symbol_table, &dba_); std::vector<TResult> results; while (cursor->Pull(frame, context)) @@ -885,7 +885,7 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test { total_weight); Frame frame(symbol_table.max_position()); - auto cursor = last_op->MakeCursor(&dba, utils::NewDeleteResource()); + auto cursor = last_op->MakeCursor(utils::NewDeleteResource()); std::vector<ResultType> results; auto context = MakeContext(storage, symbol_table, &dba); while (cursor->Pull(frame, context)) {