From bb679a4b1dadbd26af6d306909fb14a8a5f18b2e Mon Sep 17 00:00:00 2001
From: Teon Banek
Date: Mon, 27 Aug 2018 16:43:34 +0200
Subject: [PATCH] Move distributed operators to their own file

Summary:
This is the first step in separating the implementation of distributed
features out of the operators. The following steps are:

  * decoupling distributed visitors
  * injecting distributed details in operator state
  * minor cleanup or anything else that was overlooked

Reviewers: mtomic, msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1563
---
 .gitignore                             |   3 +-
 src/CMakeLists.txt                     |  22 +-
 src/distributed/plan_rpc_messages.lcp  |   4 +-
 src/distributed/produce_rpc_server.hpp |   2 +
 src/lisp/lcp.lisp                      |   4 +-
 src/query/plan/distributed.cpp         |   1 +
 src/query/plan/distributed_ops.cpp     | 697 +++++++++++++++++++++++++
 src/query/plan/distributed_ops.lcp     | 156 ++++++
 src/query/plan/operator.cpp            | 671 +----------------------
 src/query/plan/operator.lcp            | 155 +-----
 src/query/plan/pretty_print.cpp        |   1 +
 tests/unit/distributed_query_plan.cpp  |   1 +
 tests/unit/distributed_reset.cpp       |   1 +
 tests/unit/query_planner.cpp           |   2 +-
 tools/lcp                              |  10 +-
 15 files changed, 905 insertions(+), 825 deletions(-)
 create mode 100644 src/query/plan/distributed_ops.cpp
 create mode 100644 src/query/plan/distributed_ops.lcp

diff --git a/.gitignore b/.gitignore
index 96fd7c3fa..260c92a1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,7 +66,8 @@ src/distributed/transactional_cache_cleaner_rpc_messages.capnp
 src/distributed/transactional_cache_cleaner_rpc_messages.hpp
 src/distributed/updates_rpc_messages.capnp
 src/distributed/updates_rpc_messages.hpp
-src/query/plan/operator.capnp
+src/query/plan/distributed_ops.capnp
+src/query/plan/distributed_ops.hpp
 src/query/plan/operator.hpp
 src/stats/stats_rpc_messages.capnp
 src/stats/stats_rpc_messages.hpp

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6f54cf476..e6bf5b386 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -55,6 +55,7 @@ set(memgraph_src_files
     query/interpret/awesome_memgraph_functions.cpp
     query/interpreter.cpp
     query/plan/distributed.cpp
+    query/plan/distributed_ops.cpp
     query/plan/operator.cpp
     query/plan/preprocess.cpp
     query/plan/pretty_print.cpp
@@ -112,11 +113,13 @@ set(lcp_src_files lisp/lcp.lisp ${lcp_exe})
 #
 # NOTE: memgraph_src_files and generated_lcp_files are globally updated.
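 # add_lcp accepts CAPNP_SCHEMA, CAPNP_DECLARATION and DEPENDS arguments;
 # for example (as added further down in this diff):
 #   add_lcp(query/plan/operator.lcp CAPNP_DECLARATION)
 #   add_lcp(query/plan/distributed_ops.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42
 #           DEPENDS query/plan/operator.lcp)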
function(add_lcp lcp_file) + set(options CAPNP_DECLARATION) set(one_value_kwargs CAPNP_SCHEMA) - cmake_parse_arguments(KW "" "${one_value_kwargs}" "" ${ARGN}) + set(multi_value_kwargs DEPENDS) + cmake_parse_arguments(KW "${options}" "${one_value_kwargs}" "${multi_value_kwargs}" ${ARGN}) string(REGEX REPLACE "\.lcp$" ".hpp" h_file "${CMAKE_CURRENT_SOURCE_DIR}/${lcp_file}") - if (KW_CAPNP_SCHEMA) + if (KW_CAPNP_SCHEMA AND NOT KW_CAPNP_DECLARATION) string(REGEX REPLACE "\.lcp$" ".capnp" capnp_file "${CMAKE_CURRENT_SOURCE_DIR}/${lcp_file}") set(capnp_id ${KW_CAPNP_SCHEMA}) @@ -125,10 +128,13 @@ function(add_lcp lcp_file) # Update *global* memgraph_src_files set(memgraph_src_files ${memgraph_src_files} ${cpp_file} PARENT_SCOPE) endif() + if (KW_CAPNP_DECLARATION) + set(capnp_id "--capnp-declaration") + endif() add_custom_command(OUTPUT ${h_file} ${cpp_file} ${capnp_file} COMMAND ${lcp_exe} ${lcp_file} ${capnp_id} VERBATIM - DEPENDS ${lcp_file} ${lcp_src_files} ${capnp_depend} + DEPENDS ${lcp_file} ${lcp_src_files} ${capnp_depend} ${KW_DEPENDS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) # Update *global* generated_lcp_files set(generated_lcp_files ${generated_lcp_files} ${h_file} ${cpp_file} ${capnp_file} PARENT_SCOPE) @@ -160,8 +166,14 @@ add_lcp(distributed/transactional_cache_cleaner_rpc_messages.lcp CAPNP_SCHEMA @0 add_capnp(distributed/transactional_cache_cleaner_rpc_messages.capnp) add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a) add_capnp(distributed/updates_rpc_messages.capnp) -add_lcp(query/plan/operator.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42) -add_capnp(query/plan/operator.capnp) + +# distributed_ops.lcp is leading the capnp code generation, so we only want +# function declarations in generated operator.hpp +add_lcp(query/plan/operator.lcp CAPNP_DECLARATION) +add_lcp(query/plan/distributed_ops.lcp CAPNP_SCHEMA @0xe5cae8d045d30c42 + DEPENDS query/plan/operator.lcp) +add_capnp(query/plan/distributed_ops.capnp) + add_lcp(storage/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd) add_capnp(storage/concurrent_id_mapper_rpc_messages.capnp) add_lcp(transactions/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5) diff --git a/src/distributed/plan_rpc_messages.lcp b/src/distributed/plan_rpc_messages.lcp index d70ca8f4b..1a7a3e0e0 100644 --- a/src/distributed/plan_rpc_messages.lcp +++ b/src/distributed/plan_rpc_messages.lcp @@ -4,7 +4,7 @@ #include "communication/rpc/messages.hpp" #include "query/frontend/ast/ast.hpp" #include "query/frontend/semantic/symbol_table.hpp" -#include "query/plan/operator.hpp" +#include "query/plan/distributed_ops.hpp" #include "distributed/plan_rpc_messages.capnp.h" cpp<# @@ -14,7 +14,7 @@ cpp<# (lcp:capnp-namespace "distributed") (lcp:capnp-import 'utils "/utils/serialization.capnp") -(lcp:capnp-import 'plan "/query/plan/operator.capnp") +(lcp:capnp-import 'plan "/query/plan/distributed_ops.capnp") (lcp:capnp-import 'sem "/query/frontend/semantic/symbol.capnp") (defun load-plan (reader member) diff --git a/src/distributed/produce_rpc_server.hpp b/src/distributed/produce_rpc_server.hpp index 0927d889d..8abea9cb8 100644 --- a/src/distributed/produce_rpc_server.hpp +++ b/src/distributed/produce_rpc_server.hpp @@ -1,3 +1,4 @@ +/// @file #pragma once #include @@ -10,6 +11,7 @@ #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "distributed/plan_consumer.hpp" +#include "distributed/pull_produce_rpc_messages.hpp" #include "query/context.hpp" #include 
"query/frontend/semantic/symbol_table.hpp" #include "query/interpret/frame.hpp" diff --git a/src/lisp/lcp.lisp b/src/lisp/lcp.lisp index 92260f0f1..9ffdbf7fa 100644 --- a/src/lisp/lcp.lisp +++ b/src/lisp/lcp.lisp @@ -1570,7 +1570,7 @@ code generation." (declare (ignore to-close)) (format cpp-out "~%}")))) -(defun process-file (lcp-file &key capnp-id) +(defun process-file (lcp-file &key capnp-id capnp-declaration) "Process a LCP-FILE and write the output to .hpp file in the same directory. If CAPNP-ID is passed, generates the Cap'n Proto schema to .capnp file in the same directory, while the loading code is generated in LCP-FILE.cpp source @@ -1584,7 +1584,7 @@ file." (cpp-file (concatenate 'string lcp-file ".cpp")) (capnp-file (concatenate 'string filename ".capnp")) ;; Reset globals - (*capnp-serialize-p* capnp-id) + (*capnp-serialize-p* (or capnp-id capnp-declaration)) (*capnp-namespace* nil) (*capnp-imports* nil) (*capnp-type-converters* nil) diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp index f91b979f0..e4d436dc0 100644 --- a/src/query/plan/distributed.cpp +++ b/src/query/plan/distributed.cpp @@ -6,6 +6,7 @@ // serialization when proper cloning is added. #include +#include "query/plan/distributed_ops.hpp" #include "query/plan/operator.hpp" #include "query/plan/preprocess.hpp" #include "utils/exceptions.hpp" diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp new file mode 100644 index 000000000..869b33f13 --- /dev/null +++ b/src/query/plan/distributed_ops.cpp @@ -0,0 +1,697 @@ +#include "query/plan/distributed_ops.hpp" + +#include "database/distributed_graph_db.hpp" +#include "distributed/bfs_rpc_clients.hpp" +#include "distributed/pull_produce_rpc_messages.hpp" +#include "distributed/pull_rpc_clients.hpp" +#include "distributed/updates_rpc_clients.hpp" +#include "distributed/updates_rpc_server.hpp" +#include "query/context.hpp" +#include "query/exceptions.hpp" +#include "query/interpret/eval.hpp" +#include "query/interpret/frame.hpp" + +DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10, + "Sleep between remote result pulling in microseconds"); + +// macro for the default implementation of LogicalOperator::Accept +// that accepts the visitor and visits it's input_ operator +#define ACCEPT_WITH_INPUT(class_name) \ + bool class_name::Accept(HierarchicalLogicalOperatorVisitor &visitor) { \ + if (visitor.PreVisit(*this)) { \ + input_->Accept(visitor); \ + } \ + return visitor.PostVisit(*this); \ + } + +namespace query::plan { + +bool PullRemote::Accept(HierarchicalLogicalOperatorVisitor &visitor) { + if (visitor.PreVisit(*this)) { + if (input_) input_->Accept(visitor); + } + return visitor.PostVisit(*this); +} + +std::vector PullRemote::OutputSymbols(const SymbolTable &table) const { + return input_ ? 
+  return input_ ? input_->OutputSymbols(table) : std::vector<Symbol>{};
+}
+
+std::vector<Symbol> PullRemote::ModifiedSymbols(
+    const SymbolTable &table) const {
+  auto symbols = symbols_;
+  if (input_) {
+    auto input_symbols = input_->ModifiedSymbols(table);
+    symbols.insert(symbols.end(), input_symbols.begin(), input_symbols.end());
+  }
+  return symbols;
+}
+
+std::vector<Symbol> Synchronize::ModifiedSymbols(
+    const SymbolTable &table) const {
+  auto symbols = input_->ModifiedSymbols(table);
+  if (pull_remote_) {
+    auto pull_symbols = pull_remote_->ModifiedSymbols(table);
+    symbols.insert(symbols.end(), pull_symbols.begin(), pull_symbols.end());
+  }
+  return symbols;
+}
+
+bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
+  if (visitor.PreVisit(*this)) {
+    // pull_remote_ is optional here, so visit it only if we continue visiting
+    // and pull_remote_ does exist.
+    input_->Accept(visitor) && pull_remote_ && pull_remote_->Accept(visitor);
+  }
+  return visitor.PostVisit(*this);
+}
+
+PullRemoteOrderBy::PullRemoteOrderBy(
+    const std::shared_ptr<LogicalOperator> &input, int64_t plan_id,
+    const std::vector<std::pair<Ordering, Expression *>> &order_by,
+    const std::vector<Symbol> &symbols)
+    : input_(input), plan_id_(plan_id), symbols_(symbols) {
+  CHECK(input_ != nullptr)
+      << "PullRemoteOrderBy should always be constructed with input!";
+  std::vector<Ordering> ordering;
+  ordering.reserve(order_by.size());
+  order_by_.reserve(order_by.size());
+  for (const auto &ordering_expression_pair : order_by) {
+    ordering.emplace_back(ordering_expression_pair.first);
+    order_by_.emplace_back(ordering_expression_pair.second);
+  }
+  compare_ = TypedValueVectorCompare(ordering);
+}
+
+ACCEPT_WITH_INPUT(PullRemoteOrderBy);
+
+std::vector<Symbol> PullRemoteOrderBy::OutputSymbols(
+    const SymbolTable &table) const {
+  return input_->OutputSymbols(table);
+}
+
+std::vector<Symbol> PullRemoteOrderBy::ModifiedSymbols(
+    const SymbolTable &table) const {
+  return input_->ModifiedSymbols(table);
+}
+
+namespace {
+
+/** Helper class that wraps remote pulling for cursors that handle results from
+ * distributed workers.
+ *
+ * The command_id should be the command_id at the initialization of a cursor.
+ */
+class RemotePuller {
+ public:
+  RemotePuller(distributed::PullRpcClients *pull_clients,
+               database::GraphDbAccessor &db,
+               const std::vector<Symbol> &symbols, int64_t plan_id,
+               tx::CommandId command_id)
+      : pull_clients_(pull_clients),
+        db_(db),
+        symbols_(symbols),
+        plan_id_(plan_id),
+        command_id_(command_id) {
+    CHECK(pull_clients_);
+    worker_ids_ = pull_clients_->GetWorkerIds();
+    // Remove master from the worker ids list.
+    worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
+  }
+
+  void Initialize(Context &context) {
+    if (!remote_pulls_initialized_) {
+      VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id()
+               << "] [" << plan_id_ << "] [" << command_id_ << "] initialized";
+      for (auto &worker_id : worker_ids_) {
+        UpdatePullForWorker(worker_id, context);
+      }
+      remote_pulls_initialized_ = true;
+    }
+  }
+
+  void Update(Context &context) {
+    // If we don't have results for a worker, check if its remote pull has
+    // finished and save the results locally.
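+    // (For example, if worker 2 returns frames [f0, f1, f2], move_frames
+    // below stores them reversed as [f2, f1, f0], so that
+    // PopResultFromWorker can hand out f0 first by popping from the back.)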
+
+    auto move_frames = [this, &context](int worker_id, auto remote_results) {
+      VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id()
+               << "] [" << plan_id_ << "] [" << command_id_
+               << "] received results from " << worker_id;
+      remote_results_[worker_id] = std::move(remote_results.frames);
+      // Since we return and remove results from the back of the vector,
+      // reverse the results so the first to return is on the end of the
+      // vector.
+      std::reverse(remote_results_[worker_id].begin(),
+                   remote_results_[worker_id].end());
+    };
+
+    for (auto &worker_id : worker_ids_) {
+      if (!remote_results_[worker_id].empty()) continue;
+
+      auto found_it = remote_pulls_.find(worker_id);
+      if (found_it == remote_pulls_.end()) continue;
+
+      auto &remote_pull = found_it->second;
+      if (!remote_pull.IsReady()) continue;
+
+      auto remote_results = remote_pull.get();
+      switch (remote_results.pull_state) {
+        case distributed::PullState::CURSOR_EXHAUSTED:
+          VLOG(10) << "[RemotePuller] ["
+                   << context.db_accessor_.transaction_id() << "] ["
+                   << plan_id_ << "] [" << command_id_
+                   << "] cursor exhausted from " << worker_id;
+          move_frames(worker_id, remote_results);
+          remote_pulls_.erase(found_it);
+          break;
+        case distributed::PullState::CURSOR_IN_PROGRESS:
+          VLOG(10) << "[RemotePuller] ["
+                   << context.db_accessor_.transaction_id() << "] ["
+                   << plan_id_ << "] [" << command_id_
+                   << "] cursor in progress from " << worker_id;
+          move_frames(worker_id, remote_results);
+          UpdatePullForWorker(worker_id, context);
+          break;
+        case distributed::PullState::SERIALIZATION_ERROR:
+          throw mvcc::SerializationError(
+              "Serialization error occurred during PullRemote!");
+        case distributed::PullState::LOCK_TIMEOUT_ERROR:
+          throw utils::LockTimeoutException(
+              "LockTimeout error occurred during PullRemote!");
+        case distributed::PullState::UPDATE_DELETED_ERROR:
+          throw QueryRuntimeException(
+              "RecordDeleted error occurred during PullRemote!");
+        case distributed::PullState::RECONSTRUCTION_ERROR:
+          throw query::ReconstructionException();
+        case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR:
+          throw RemoveAttachedVertexException();
+        case distributed::PullState::HINTED_ABORT_ERROR:
+          throw HintedAbortError();
+        case distributed::PullState::QUERY_ERROR:
+          throw QueryRuntimeException(
+              "Query runtime error occurred during PullRemote!");
+      }
+    }
+  }
+
+  void Reset() {
+    worker_ids_ = pull_clients_->GetWorkerIds();
+    // Remove master from the worker ids list.
+    worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
+
+    // We must clear remote_pulls before resetting cursors to make sure that
+    // all outstanding remote pulls are done. Otherwise we might try to reset
+    // a cursor during its pull.
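+    // (Per the comment above, clearing the map waits on any in-flight pull,
+    // so the ResetCursor calls below presumably cannot race with an
+    // outstanding Pull RPC.)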
+    remote_pulls_.clear();
+    for (auto &worker_id : worker_ids_) {
+      pull_clients_->ResetCursor(&db_, worker_id, plan_id_, command_id_);
+    }
+    remote_results_.clear();
+    remote_pulls_initialized_ = false;
+  }
+
+  auto Workers() { return worker_ids_; }
+
+  int GetWorkerId(int worker_id_index) { return worker_ids_[worker_id_index]; }
+
+  size_t WorkerCount() { return worker_ids_.size(); }
+
+  void ClearWorkers() { worker_ids_.clear(); }
+
+  bool HasPendingPulls() { return !remote_pulls_.empty(); }
+
+  bool HasPendingPullFromWorker(int worker_id) {
+    return remote_pulls_.find(worker_id) != remote_pulls_.end();
+  }
+
+  bool HasResultsFromWorker(int worker_id) {
+    return !remote_results_[worker_id].empty();
+  }
+
+  std::vector<TypedValue> PopResultFromWorker(int worker_id) {
+    auto result = remote_results_[worker_id].back();
+    remote_results_[worker_id].pop_back();
+
+    // Remove the worker if we exhausted all locally stored results and there
+    // are no more pending remote pulls for that worker.
+    if (remote_results_[worker_id].empty() &&
+        remote_pulls_.find(worker_id) == remote_pulls_.end()) {
+      worker_ids_.erase(
+          std::find(worker_ids_.begin(), worker_ids_.end(), worker_id));
+    }
+
+    return result;
+  }
+
+ private:
+  distributed::PullRpcClients *pull_clients_{nullptr};
+  database::GraphDbAccessor &db_;
+  std::vector<Symbol> symbols_;
+  int64_t plan_id_;
+  tx::CommandId command_id_;
+  std::unordered_map<int, utils::Future<distributed::PullData>> remote_pulls_;
+  std::unordered_map<int, std::vector<std::vector<TypedValue>>>
+      remote_results_;
+  std::vector<int> worker_ids_;
+  bool remote_pulls_initialized_ = false;
+
+  void UpdatePullForWorker(int worker_id, Context &context) {
+    remote_pulls_[worker_id] = pull_clients_->Pull(
+        &db_, worker_id, plan_id_, command_id_, context.parameters_, symbols_,
+        context.timestamp_, false);
+  }
+};
+
+class PullRemoteCursor : public Cursor {
+ public:
+  PullRemoteCursor(const PullRemote &self, database::GraphDbAccessor &db)
+      : self_(self),
+        input_cursor_(self.input() ? self.input()->MakeCursor(db) : nullptr),
+        command_id_(db.transaction().cid()),
+        remote_puller_(
+            // TODO: Pass in a Master GraphDb.
+            &dynamic_cast<database::Master *>(&db.db())->pull_clients(), db,
+            self.symbols(), self.plan_id(), command_id_) {}
+
+  bool Pull(Frame &frame, Context &context) override {
+    if (context.db_accessor_.should_abort()) throw HintedAbortError();
+    remote_puller_.Initialize(context);
+
+    bool have_remote_results = false;
+    while (!have_remote_results && remote_puller_.WorkerCount() > 0) {
+      if (context.db_accessor_.should_abort()) throw HintedAbortError();
+      remote_puller_.Update(context);
+
+      // Get locally stored results from workers in a round-robin fashion.
+      int num_workers = remote_puller_.WorkerCount();
+      for (int i = 0; i < num_workers; ++i) {
+        int worker_id_index =
+            (last_pulled_worker_id_index_ + i + 1) % num_workers;
+        int worker_id = remote_puller_.GetWorkerId(worker_id_index);
+
+        if (remote_puller_.HasResultsFromWorker(worker_id)) {
+          last_pulled_worker_id_index_ = worker_id_index;
+          have_remote_results = true;
+          break;
+        }
+      }
+
+      if (!have_remote_results) {
+        if (!remote_puller_.HasPendingPulls()) {
+          remote_puller_.ClearWorkers();
+          break;
+        }
+
+        // If there are no remote results available, try to pull and return
+        // local results.
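+        // (The master itself may hold part of the plan's input, so the
+        // optional input_cursor_ is drained alongside the remote streams.)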
+        if (input_cursor_ && input_cursor_->Pull(frame, context)) {
+          VLOG(10) << "[PullRemoteCursor] ["
+                   << context.db_accessor_.transaction_id() << "] ["
+                   << self_.plan_id() << "] [" << command_id_
+                   << "] producing local results ";
+          return true;
+        }
+
+        VLOG(10) << "[PullRemoteCursor] ["
+                 << context.db_accessor_.transaction_id() << "] ["
+                 << self_.plan_id() << "] [" << command_id_
+                 << "] no results available, sleeping ";
+        // If there aren't any local/remote results available, sleep.
+        std::this_thread::sleep_for(
+            std::chrono::microseconds(FLAGS_remote_pull_sleep_micros));
+      }
+    }
+
+    // No more remote results, make sure local results get exhausted.
+    if (!have_remote_results) {
+      if (input_cursor_ && input_cursor_->Pull(frame, context)) {
+        VLOG(10) << "[PullRemoteCursor] ["
+                 << context.db_accessor_.transaction_id() << "] ["
+                 << self_.plan_id() << "] [" << command_id_
+                 << "] producing local results ";
+        return true;
+      }
+      return false;
+    }
+
+    {
+      int worker_id = remote_puller_.GetWorkerId(last_pulled_worker_id_index_);
+      VLOG(10) << "[PullRemoteCursor] ["
+               << context.db_accessor_.transaction_id() << "] ["
+               << self_.plan_id() << "] [" << command_id_
+               << "] producing results from worker " << worker_id;
+      auto result = remote_puller_.PopResultFromWorker(worker_id);
+      for (size_t i = 0; i < self_.symbols().size(); ++i) {
+        frame[self_.symbols()[i]] = std::move(result[i]);
+      }
+    }
+    return true;
+  }
+
+  void Reset() override {
+    if (input_cursor_) input_cursor_->Reset();
+    remote_puller_.Reset();
+    last_pulled_worker_id_index_ = 0;
+  }
+
+ private:
+  const PullRemote &self_;
+  const std::unique_ptr<Cursor> input_cursor_;
+  tx::CommandId command_id_;
+  RemotePuller remote_puller_;
+  int last_pulled_worker_id_index_ = 0;
+};
+
+class SynchronizeCursor : public Cursor {
+ public:
+  SynchronizeCursor(const Synchronize &self, database::GraphDbAccessor &db)
+      : self_(self),
+        pull_clients_(
+            // TODO: Pass in a Master GraphDb.
+            &dynamic_cast<database::Master *>(&db.db())->pull_clients()),
+        updates_clients_(
+            // TODO: Pass in a Master GraphDb.
+            &dynamic_cast<database::Master *>(&db.db())->updates_clients()),
+        updates_server_(
+            // TODO: Pass in a Master GraphDb.
+            &dynamic_cast<database::Master *>(&db.db())->updates_server()),
+        input_cursor_(self.input()->MakeCursor(db)),
+        pull_remote_cursor_(
+            self.pull_remote() ? self.pull_remote()->MakeCursor(db) : nullptr),
+        command_id_(db.transaction().cid()),
+        master_id_(
+            // TODO: Pass in a Master GraphDb.
+            dynamic_cast<database::Master *>(&db.db())->WorkerId()) {}
+
+  bool Pull(Frame &frame, Context &context) override {
+    if (!initial_pull_done_) {
+      InitialPull(frame, context);
+      initial_pull_done_ = true;
+    }
+    // Yield local stuff while available.
+    if (!local_frames_.empty()) {
+      VLOG(10) << "[SynchronizeCursor] ["
+               << context.db_accessor_.transaction_id()
+               << "] producing local results";
+      auto &result = local_frames_.back();
+      for (size_t i = 0; i < frame.elems().size(); ++i) {
+        if (self_.advance_command()) {
+          query::ReconstructTypedValue(result[i]);
+        }
+        frame.elems()[i] = std::move(result[i]);
+      }
+      local_frames_.resize(local_frames_.size() - 1);
+      return true;
+    }
+
+    // We're out of local stuff, yield from pull_remote if available.
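+    // (This implements step 8 of the Synchronize protocol: yield local
+    // frames first, then whatever the PullRemote cursor produces.)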
+    if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context)) {
+      VLOG(10) << "[SynchronizeCursor] ["
+               << context.db_accessor_.transaction_id()
+               << "] producing remote results";
+      return true;
+    }
+
+    return false;
+  }
+
+  void Reset() override {
+    input_cursor_->Reset();
+    pull_remote_cursor_->Reset();
+    initial_pull_done_ = false;
+    local_frames_.clear();
+  }
+
+ private:
+  const Synchronize &self_;
+  distributed::PullRpcClients *pull_clients_{nullptr};
+  distributed::UpdatesRpcClients *updates_clients_{nullptr};
+  distributed::UpdatesRpcServer *updates_server_{nullptr};
+  const std::unique_ptr<Cursor> input_cursor_;
+  const std::unique_ptr<Cursor> pull_remote_cursor_;
+  bool initial_pull_done_{false};
+  std::vector<std::vector<TypedValue>> local_frames_;
+  tx::CommandId command_id_;
+  int master_id_;
+
+  void InitialPull(Frame &frame, Context &context) {
+    VLOG(10) << "[SynchronizeCursor] ["
+             << context.db_accessor_.transaction_id() << "] initial pull";
+
+    // Tell all workers to accumulate, only if there is a remote pull.
+    std::vector<utils::Future<distributed::PullData>> worker_accumulations;
+    if (pull_remote_cursor_) {
+      for (auto worker_id : pull_clients_->GetWorkerIds()) {
+        if (worker_id == master_id_) continue;
+        worker_accumulations.emplace_back(pull_clients_->Pull(
+            &context.db_accessor_, worker_id, self_.pull_remote()->plan_id(),
+            command_id_, context.parameters_, self_.pull_remote()->symbols(),
+            context.timestamp_, true, 0));
+      }
+    }
+
+    // Accumulate local results
+    while (input_cursor_->Pull(frame, context)) {
+      local_frames_.emplace_back();
+      auto &local_frame = local_frames_.back();
+      local_frame.reserve(frame.elems().size());
+      for (auto &elem : frame.elems()) {
+        local_frame.emplace_back(std::move(elem));
+      }
+    }
+
+    // Wait for all workers to finish accumulation (first sync point).
+    for (auto &accu : worker_accumulations) {
+      switch (accu.get().pull_state) {
+        case distributed::PullState::CURSOR_EXHAUSTED:
+          continue;
+        case distributed::PullState::CURSOR_IN_PROGRESS:
+          throw QueryRuntimeException(
+              "Expected exhausted cursor after remote pull accumulate");
+        case distributed::PullState::SERIALIZATION_ERROR:
+          throw mvcc::SerializationError(
+              "Failed to perform remote accumulate due to "
+              "SerializationError");
+        case distributed::PullState::UPDATE_DELETED_ERROR:
+          throw QueryRuntimeException(
+              "Failed to perform remote accumulate due to "
+              "RecordDeletedError");
+        case distributed::PullState::LOCK_TIMEOUT_ERROR:
+          throw utils::LockTimeoutException(
+              "Failed to perform remote accumulate due to "
+              "LockTimeoutException");
+        case distributed::PullState::RECONSTRUCTION_ERROR:
+          throw QueryRuntimeException(
+              "Failed to perform remote accumulate due to "
+              "ReconstructionError");
+        case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR:
+          throw RemoveAttachedVertexException();
+        case distributed::PullState::HINTED_ABORT_ERROR:
+          throw HintedAbortError();
+        case distributed::PullState::QUERY_ERROR:
+          throw QueryRuntimeException(
+              "Failed to perform remote accumulate due to Query runtime "
+              "error");
+      }
+    }
+
+    if (self_.advance_command()) {
+      context.db_accessor_.AdvanceCommand();
+    }
+
+    // Make all the workers apply their deltas.
+    auto tx_id = context.db_accessor_.transaction_id();
+    auto apply_futures = updates_clients_->UpdateApplyAll(master_id_, tx_id);
+    updates_server_->Apply(tx_id);
+    for (auto &future : apply_futures) {
+      switch (future.get()) {
+        case distributed::UpdateResult::SERIALIZATION_ERROR:
+          throw mvcc::SerializationError(
+              "Failed to apply deferred updates due to SerializationError");
+        case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
+          throw RemoveAttachedVertexException();
+        case distributed::UpdateResult::UPDATE_DELETED_ERROR:
+          throw QueryRuntimeException(
+              "Failed to apply deferred updates due to RecordDeletedError");
+        case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
+          throw utils::LockTimeoutException(
+              "Failed to apply deferred update due to LockTimeoutException");
+        case distributed::UpdateResult::DONE:
+          break;
+      }
+    }
+
+    // If the command advanced, let the workers know.
+    if (self_.advance_command()) {
+      auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id);
+      for (auto &future : futures) future.wait();
+    }
+  }
+};
+
+class PullRemoteOrderByCursor : public Cursor {
+ public:
+  PullRemoteOrderByCursor(const PullRemoteOrderBy &self,
+                          database::GraphDbAccessor &db)
+      : self_(self),
+        input_(self.input()->MakeCursor(db)),
+        command_id_(db.transaction().cid()),
+        remote_puller_(
+            // TODO: Pass in a Master GraphDb.
+            &dynamic_cast<database::Master *>(&db.db())->pull_clients(), db,
+            self.symbols(), self.plan_id(), command_id_) {}
+
+  bool Pull(Frame &frame, Context &context) {
+    if (context.db_accessor_.should_abort()) throw HintedAbortError();
+    ExpressionEvaluator evaluator(frame, &context, GraphView::OLD);
+
+    auto evaluate_result = [this, &evaluator]() {
+      std::vector<TypedValue> order_by;
+      order_by.reserve(self_.order_by().size());
+      for (auto expression_ptr : self_.order_by()) {
+        order_by.emplace_back(expression_ptr->Accept(evaluator));
+      }
+      return order_by;
+    };
+
+    auto restore_frame = [&frame,
+                          this](const std::vector<TypedValue> &restore_from) {
+      for (size_t i = 0; i < restore_from.size(); ++i) {
+        frame[self_.symbols()[i]] = restore_from[i];
+      }
+    };
+
+    if (!merge_initialized_) {
+      VLOG(10) << "[PullRemoteOrderBy] ["
+               << context.db_accessor_.transaction_id() << "] ["
+               << self_.plan_id() << "] [" << command_id_ << "] initialize";
+      remote_puller_.Initialize(context);
+      missing_results_from_ = remote_puller_.Workers();
+      missing_master_result_ = true;
+      merge_initialized_ = true;
+    }
+
+    if (missing_master_result_) {
+      if (input_->Pull(frame, context)) {
+        std::vector<TypedValue> output;
+        output.reserve(self_.symbols().size());
+        for (const Symbol &symbol : self_.symbols()) {
+          output.emplace_back(frame[symbol]);
+        }
+
+        merge_.push_back(MergeResultItem{std::experimental::nullopt, output,
+                                         evaluate_result()});
+      }
+      missing_master_result_ = false;
+    }
+
+    while (!missing_results_from_.empty()) {
+      if (context.db_accessor_.should_abort()) throw HintedAbortError();
+      remote_puller_.Update(context);
+
+      bool has_all_result = true;
+      for (auto &worker_id : missing_results_from_) {
+        if (!remote_puller_.HasResultsFromWorker(worker_id) &&
+            remote_puller_.HasPendingPullFromWorker(worker_id)) {
+          has_all_result = false;
+          break;
+        }
+      }
+
+      if (!has_all_result) {
+        VLOG(10) << "[PullRemoteOrderByCursor] ["
+                 << context.db_accessor_.transaction_id() << "] ["
+                 << self_.plan_id() << "] [" << command_id_
+                 << "] missing results, sleep";
+        // If we don't have results from all workers, sleep before continuing.
+        std::this_thread::sleep_for(
+            std::chrono::microseconds(FLAGS_remote_pull_sleep_micros));
+        continue;
+      }
+
+      for (auto &worker_id : missing_results_from_) {
+        // It is possible that the worker's remote pull finished but it didn't
+        // return any results. In that case, just skip it.
+        if (!remote_puller_.HasResultsFromWorker(worker_id)) continue;
+        auto remote_result = remote_puller_.PopResultFromWorker(worker_id);
+        restore_frame(remote_result);
+        merge_.push_back(
+            MergeResultItem{worker_id, remote_result, evaluate_result()});
+      }
+
+      missing_results_from_.clear();
+    }
+
+    if (merge_.empty()) return false;
+
+    auto result_it = std::min_element(
+        merge_.begin(), merge_.end(), [this](const auto &lhs, const auto &rhs) {
+          return self_.compare()(lhs.order_by, rhs.order_by);
+        });
+
+    restore_frame(result_it->remote_result);
+
+    if (result_it->worker_id) {
+      VLOG(10) << "[PullRemoteOrderByCursor] ["
+               << context.db_accessor_.transaction_id() << "] ["
+               << self_.plan_id() << "] [" << command_id_
+               << "] producing results from worker "
+               << result_it->worker_id.value();
+      missing_results_from_.push_back(result_it->worker_id.value());
+    } else {
+      VLOG(10) << "[PullRemoteOrderByCursor] ["
+               << context.db_accessor_.transaction_id() << "] ["
+               << self_.plan_id() << "] [" << command_id_
+               << "] producing local results";
+      missing_master_result_ = true;
+    }
+
+    merge_.erase(result_it);
+    return true;
+  }
+
+  void Reset() {
+    input_->Reset();
+    remote_puller_.Reset();
+    merge_.clear();
+    missing_results_from_.clear();
+    missing_master_result_ = false;
+    merge_initialized_ = false;
+  }
+
+ private:
+  struct MergeResultItem {
+    std::experimental::optional<int> worker_id;
+    std::vector<TypedValue> remote_result;
+    std::vector<TypedValue> order_by;
+  };
+
+  const PullRemoteOrderBy &self_;
+  std::unique_ptr<Cursor> input_;
+  tx::CommandId command_id_;
+  RemotePuller remote_puller_;
+  std::vector<MergeResultItem> merge_;
+  std::vector<int> missing_results_from_;
+  bool missing_master_result_ = false;
+  bool merge_initialized_ = false;
+};
+
+}  // namespace
+
+std::unique_ptr<Cursor> PullRemote::MakeCursor(
+    database::GraphDbAccessor &db) const {
+  return std::make_unique<PullRemoteCursor>(*this, db);
+}
+
+std::unique_ptr<Cursor> Synchronize::MakeCursor(
+    database::GraphDbAccessor &db) const {
+  return std::make_unique<SynchronizeCursor>(*this, db);
+}
+
+std::unique_ptr<Cursor> PullRemoteOrderBy::MakeCursor(
+    database::GraphDbAccessor &db) const {
+  return std::make_unique<PullRemoteOrderByCursor>(*this, db);
+}
+
+}  // namespace query::plan
diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp
new file mode 100644
index 000000000..3f769f2f0
--- /dev/null
+++ b/src/query/plan/distributed_ops.lcp
@@ -0,0 +1,156 @@
+#>cpp
+/// @file
+
+#pragma once
+
+#include "query/plan/operator.hpp"
+cpp<#
+
+(load "query/plan/operator.lcp")
+
+(lcp:namespace query)
+(lcp:namespace plan)
+
+(lcp:define-class pull-remote (logical-operator)
+  ((input "std::shared_ptr<LogicalOperator>"
+          :capnp-save #'save-operator-pointer
+          :capnp-load #'load-operator-pointer)
+   (plan-id :int64_t :initval 0 :reader t)
+   (symbols "std::vector<Symbol>" :reader t
+            :capnp-save (lcp:capnp-save-vector "::query::capnp::Symbol" "Symbol")
+            :capnp-load (lcp:capnp-load-vector "::query::capnp::Symbol" "Symbol")))
+  (:documentation
+   "An operator in distributed Memgraph that yields both local and remote (from
+other workers) frames. Obtaining remote frames is done through RPC calls to
+`distributed::ProduceRpcServer`s running on all the workers.
+
+This operator aims to yield results as fast as possible and lose minimal
+time on data transfer. It gives no guarantees on result order.")
+  (:public
+    #>cpp
+    PullRemote(const std::shared_ptr<LogicalOperator> &input, int64_t plan_id,
+               const std::vector<Symbol> &symbols)
+        : input_(input), plan_id_(plan_id), symbols_(symbols) {}
+    bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
+    std::unique_ptr<Cursor> MakeCursor(
+        database::GraphDbAccessor &db) const override;
+    std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
+    std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
+
+    bool HasSingleInput() const override { return true; }
+    std::shared_ptr<LogicalOperator> input() const override { return input_; }
+    void set_input(std::shared_ptr<LogicalOperator> input) override {
+      input_ = input;
+    }
+    cpp<#)
+  (:private #>cpp PullRemote() {} cpp<#)
+  (:serialize :capnp))
+
+(defun load-pull-remote (reader member-name)
+  #>cpp
+  ${member-name} = std::static_pointer_cast<PullRemote>(
+      utils::LoadSharedPtr<capnp::LogicalOperator, LogicalOperator>(
+          ${reader},
+          [helper](const auto &reader) {
+            auto op = LogicalOperator::Construct(reader);
+            op->Load(reader, helper);
+            return op.release();
+          }, &helper->loaded_ops));
+  cpp<#)
+
+(lcp:define-class synchronize (logical-operator)
+  ((input "std::shared_ptr<LogicalOperator>"
+          :capnp-save #'save-operator-pointer
+          :capnp-load #'load-operator-pointer)
+   (pull-remote "std::shared_ptr<PullRemote>" :reader t
+                :capnp-save #'save-operator-pointer
+                :capnp-load #'load-pull-remote)
+   (advance-command :bool :initval "false" :reader t))
+  (:documentation
+   "Operator used to synchronize stages of plan execution between the master and
+all the workers. Synchronization is necessary in queries that update the
+graph state because updates (as well as creations and deletions) are deferred
+to avoid multithreaded modification of graph element data (as it's not
+thread-safe).
+
+Logic of the synchronize operator is:
+
+1. If there is a Pull, tell all the workers to pull on that plan and
+   accumulate results without sending them to the master. This is async.
+2. Accumulate local results, in parallel with 1. getting executed on workers.
+3. Wait till the master and all the workers are done accumulating.
+4. Advance the command, if necessary.
+5. Tell all the workers to apply their updates. This is async.
+6. Apply local updates, in parallel with 5. on the workers.
+7. Notify workers that the command has advanced, if necessary.
+8. Yield all the results, first local, then from Pull if available.")
+  (:public
+    #>cpp
+    Synchronize(const std::shared_ptr<LogicalOperator> &input,
+                const std::shared_ptr<PullRemote> &pull_remote,
+                bool advance_command)
+        : input_(input),
+          pull_remote_(pull_remote),
+          advance_command_(advance_command) {}
+    bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
+    std::unique_ptr<Cursor> MakeCursor(
+        database::GraphDbAccessor &db) const override;
+    std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
+
+    std::vector<Symbol> OutputSymbols(
+        const SymbolTable &symbol_table) const override {
+      return input_->OutputSymbols(symbol_table);
+    }
+
+    bool HasSingleInput() const override { return true; }
+    std::shared_ptr<LogicalOperator> input() const override { return input_; }
+    void set_input(std::shared_ptr<LogicalOperator> input) override {
+      input_ = input;
+    }
+    cpp<#)
+  (:private #>cpp Synchronize() {} cpp<#)
+  (:serialize :capnp))
+
+(lcp:define-class pull-remote-order-by (logical-operator)
+  ((input "std::shared_ptr<LogicalOperator>"
+          :capnp-save #'save-operator-pointer
+          :capnp-load #'load-operator-pointer)
+   (plan-id :int64_t :initval 0 :reader t)
+   (symbols "std::vector<Symbol>" :reader t
+            :capnp-save (lcp:capnp-save-vector "::query::capnp::Symbol" "Symbol")
+            :capnp-load (lcp:capnp-load-vector "::query::capnp::Symbol" "Symbol"))
+   (order-by "std::vector<Expression *>" :reader t
+             :capnp-type "List(Ast.Tree)"
+             :capnp-save (save-ast-vector "Expression *")
+             :capnp-load (load-ast-vector "Expression *"))
+   (compare "TypedValueVectorCompare" :reader t
+            :capnp-type "Common.TypedValueVectorCompare"))
+  (:documentation
+   "Operator that merges distributed OrderBy operators.
+Instead of using a regular OrderBy on master (which would collect all remote
+results and order them), we can have each worker do an OrderBy locally and
+have the master rely on the fact that the results are ordered and merge them
+by having only one result from each worker.")
+  (:public
+    #>cpp
+    PullRemoteOrderBy(
+        const std::shared_ptr<LogicalOperator> &input, int64_t plan_id,
+        const std::vector<std::pair<Ordering, Expression *>> &order_by,
+        const std::vector<Symbol> &symbols);
+    bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
+    std::unique_ptr<Cursor> MakeCursor(
+        database::GraphDbAccessor &db) const override;
+
+    std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
+    std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
+
+    bool HasSingleInput() const override { return true; }
+    std::shared_ptr<LogicalOperator> input() const override { return input_; }
+    void set_input(std::shared_ptr<LogicalOperator> input) override {
+      input_ = input;
+    }
+    cpp<#)
+  (:private #>cpp PullRemoteOrderBy() {} cpp<#)
+  (:serialize :capnp))
+
+(lcp:pop-namespace)
+(lcp:pop-namespace)
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index 3ba628763..fff022bbd 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -18,9 +18,6 @@
 #include "database/distributed_graph_db.hpp"
 #include "database/graph_db_accessor.hpp"
 #include "distributed/bfs_rpc_clients.hpp"
-#include "distributed/pull_rpc_clients.hpp"
-#include "distributed/updates_rpc_clients.hpp"
-#include "distributed/updates_rpc_server.hpp"
 #include "glue/auth.hpp"
 #include "glue/communication.hpp"
 #include "integrations/kafka/exceptions.hpp"
@@ -38,8 +35,8 @@
 #include "utils/string.hpp"
 #include "utils/thread/sync.hpp"
 
-DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10,
-                    "Sleep between remote result pulling in microseconds");
+// TODO: Remove this when distributed logic is completely removed from here.
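+// (The DEFINE_HIDDEN_int32 for this flag now lives in distributed_ops.cpp;
+// DECLARE_int32 below merely references that single gflags definition.)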
+DECLARE_int32(remote_pull_sleep_micros); // macro for the default implementation of LogicalOperator::Accept // that accepts the visitor and visits it's input_ operator @@ -3123,46 +3120,6 @@ void Union::UnionCursor::Reset() { right_cursor_->Reset(); } -bool PullRemote::Accept(HierarchicalLogicalOperatorVisitor &visitor) { - if (visitor.PreVisit(*this)) { - if (input_) input_->Accept(visitor); - } - return visitor.PostVisit(*this); -} - -std::vector PullRemote::OutputSymbols(const SymbolTable &table) const { - return input_ ? input_->OutputSymbols(table) : std::vector{}; -} - -std::vector PullRemote::ModifiedSymbols( - const SymbolTable &table) const { - auto symbols = symbols_; - if (input_) { - auto input_symbols = input_->ModifiedSymbols(table); - symbols.insert(symbols.end(), input_symbols.begin(), input_symbols.end()); - } - return symbols; -} - -std::vector Synchronize::ModifiedSymbols( - const SymbolTable &table) const { - auto symbols = input_->ModifiedSymbols(table); - if (pull_remote_) { - auto pull_symbols = pull_remote_->ModifiedSymbols(table); - symbols.insert(symbols.end(), pull_symbols.begin(), pull_symbols.end()); - } - return symbols; -} - -bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) { - if (visitor.PreVisit(*this)) { - // pull_remote_ is optional here, so visit it only if we continue visiting - // and pull_remote_ does exist. - input_->Accept(visitor) && pull_remote_ && pull_remote_->Accept(visitor); - } - return visitor.PostVisit(*this); -} - std::vector Cartesian::ModifiedSymbols(const SymbolTable &table) const { auto symbols = left_op_->ModifiedSymbols(table); auto right = right_op_->ModifiedSymbols(table); @@ -3179,470 +3136,8 @@ bool Cartesian::Accept(HierarchicalLogicalOperatorVisitor &visitor) { WITHOUT_SINGLE_INPUT(Cartesian); -PullRemoteOrderBy::PullRemoteOrderBy( - const std::shared_ptr &input, int64_t plan_id, - const std::vector> &order_by, - const std::vector &symbols) - : input_(input), plan_id_(plan_id), symbols_(symbols) { - CHECK(input_ != nullptr) - << "PullRemoteOrderBy should always be constructed with input!"; - std::vector ordering; - ordering.reserve(order_by.size()); - order_by_.reserve(order_by.size()); - for (const auto &ordering_expression_pair : order_by) { - ordering.emplace_back(ordering_expression_pair.first); - order_by_.emplace_back(ordering_expression_pair.second); - } - compare_ = TypedValueVectorCompare(ordering); -} - -ACCEPT_WITH_INPUT(PullRemoteOrderBy); - -std::vector PullRemoteOrderBy::OutputSymbols( - const SymbolTable &table) const { - return input_->OutputSymbols(table); -} - -std::vector PullRemoteOrderBy::ModifiedSymbols( - const SymbolTable &table) const { - return input_->ModifiedSymbols(table); -} - namespace { -/** Helper class that wraps remote pulling for cursors that handle results from - * distributed workers. - * - * The command_id should be the command_id at the initialization of a cursor. - */ -class RemotePuller { - public: - RemotePuller(distributed::PullRpcClients *pull_clients, - database::GraphDbAccessor &db, - const std::vector &symbols, int64_t plan_id, - tx::CommandId command_id) - : pull_clients_(pull_clients), - db_(db), - symbols_(symbols), - plan_id_(plan_id), - command_id_(command_id) { - CHECK(pull_clients_); - worker_ids_ = pull_clients_->GetWorkerIds(); - // Remove master from the worker ids list. 
- worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); - } - - void Initialize(Context &context) { - if (!remote_pulls_initialized_) { - VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id() - << "] [" << plan_id_ << "] [" << command_id_ << "] initialized"; - for (auto &worker_id : worker_ids_) { - UpdatePullForWorker(worker_id, context); - } - remote_pulls_initialized_ = true; - } - } - - void Update(Context &context) { - // If we don't have results for a worker, check if his remote pull - // finished and save results locally. - - auto move_frames = [this, &context](int worker_id, auto remote_results) { - VLOG(10) << "[RemotePuller] [" << context.db_accessor_.transaction_id() - << "] [" << plan_id_ << "] [" << command_id_ - << "] received results from " << worker_id; - remote_results_[worker_id] = std::move(remote_results.frames); - // Since we return and remove results from the back of the vector, - // reverse the results so the first to return is on the end of the - // vector. - std::reverse(remote_results_[worker_id].begin(), - remote_results_[worker_id].end()); - }; - - for (auto &worker_id : worker_ids_) { - if (!remote_results_[worker_id].empty()) continue; - - auto found_it = remote_pulls_.find(worker_id); - if (found_it == remote_pulls_.end()) continue; - - auto &remote_pull = found_it->second; - if (!remote_pull.IsReady()) continue; - - auto remote_results = remote_pull.get(); - switch (remote_results.pull_state) { - case distributed::PullState::CURSOR_EXHAUSTED: - VLOG(10) << "[RemotePuller] [" - << context.db_accessor_.transaction_id() << "] [" << plan_id_ - << "] [" << command_id_ << "] cursor exhausted from " - << worker_id; - move_frames(worker_id, remote_results); - remote_pulls_.erase(found_it); - break; - case distributed::PullState::CURSOR_IN_PROGRESS: - VLOG(10) << "[RemotePuller] [" - << context.db_accessor_.transaction_id() << "] [" << plan_id_ - << "] [" << command_id_ << "] cursor in progress from " - << worker_id; - move_frames(worker_id, remote_results); - UpdatePullForWorker(worker_id, context); - break; - case distributed::PullState::SERIALIZATION_ERROR: - throw mvcc::SerializationError( - "Serialization error occured during PullRemote !"); - case distributed::PullState::LOCK_TIMEOUT_ERROR: - throw utils::LockTimeoutException( - "LockTimeout error occured during PullRemote !"); - case distributed::PullState::UPDATE_DELETED_ERROR: - throw QueryRuntimeException( - "RecordDeleted error ocured during PullRemote !"); - case distributed::PullState::RECONSTRUCTION_ERROR: - throw query::ReconstructionException(); - case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR: - throw RemoveAttachedVertexException(); - case distributed::PullState::HINTED_ABORT_ERROR: - throw HintedAbortError(); - case distributed::PullState::QUERY_ERROR: - throw QueryRuntimeException( - "Query runtime error occurred during PullRemote !"); - } - } - } - - void Reset() { - worker_ids_ = pull_clients_->GetWorkerIds(); - // Remove master from the worker ids list. - worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); - - // We must clear remote_pulls before reseting cursors to make sure that all - // outstanding remote pulls are done. Otherwise we might try to reset cursor - // during its pull. 
- remote_pulls_.clear(); - for (auto &worker_id : worker_ids_) { - pull_clients_->ResetCursor(&db_, worker_id, plan_id_, command_id_); - } - remote_results_.clear(); - remote_pulls_initialized_ = false; - } - - auto Workers() { return worker_ids_; } - - int GetWorkerId(int worker_id_index) { return worker_ids_[worker_id_index]; } - - size_t WorkerCount() { return worker_ids_.size(); } - - void ClearWorkers() { worker_ids_.clear(); } - - bool HasPendingPulls() { return !remote_pulls_.empty(); } - - bool HasPendingPullFromWorker(int worker_id) { - return remote_pulls_.find(worker_id) != remote_pulls_.end(); - } - - bool HasResultsFromWorker(int worker_id) { - return !remote_results_[worker_id].empty(); - } - - std::vector PopResultFromWorker(int worker_id) { - auto result = remote_results_[worker_id].back(); - remote_results_[worker_id].pop_back(); - - // Remove the worker if we exhausted all locally stored results and there - // are no more pending remote pulls for that worker. - if (remote_results_[worker_id].empty() && - remote_pulls_.find(worker_id) == remote_pulls_.end()) { - worker_ids_.erase( - std::find(worker_ids_.begin(), worker_ids_.end(), worker_id)); - } - - return result; - } - - private: - distributed::PullRpcClients *pull_clients_{nullptr}; - database::GraphDbAccessor &db_; - std::vector symbols_; - int64_t plan_id_; - tx::CommandId command_id_; - std::unordered_map> remote_pulls_; - std::unordered_map>> - remote_results_; - std::vector worker_ids_; - bool remote_pulls_initialized_ = false; - - void UpdatePullForWorker(int worker_id, Context &context) { - remote_pulls_[worker_id] = pull_clients_->Pull( - &db_, worker_id, plan_id_, command_id_, context.parameters_, symbols_, - context.timestamp_, false); - } -}; - -class PullRemoteCursor : public Cursor { - public: - PullRemoteCursor(const PullRemote &self, database::GraphDbAccessor &db) - : self_(self), - input_cursor_(self.input() ? self.input()->MakeCursor(db) : nullptr), - command_id_(db.transaction().cid()), - remote_puller_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast(&db.db())->pull_clients(), db, - self.symbols(), self.plan_id(), command_id_) {} - - bool Pull(Frame &frame, Context &context) override { - if (context.db_accessor_.should_abort()) throw HintedAbortError(); - remote_puller_.Initialize(context); - - bool have_remote_results = false; - while (!have_remote_results && remote_puller_.WorkerCount() > 0) { - if (context.db_accessor_.should_abort()) throw HintedAbortError(); - remote_puller_.Update(context); - - // Get locally stored results from workers in a round-robin fasion. - int num_workers = remote_puller_.WorkerCount(); - for (int i = 0; i < num_workers; ++i) { - int worker_id_index = - (last_pulled_worker_id_index_ + i + 1) % num_workers; - int worker_id = remote_puller_.GetWorkerId(worker_id_index); - - if (remote_puller_.HasResultsFromWorker(worker_id)) { - last_pulled_worker_id_index_ = worker_id_index; - have_remote_results = true; - break; - } - } - - if (!have_remote_results) { - if (!remote_puller_.HasPendingPulls()) { - remote_puller_.ClearWorkers(); - break; - } - - // If there are no remote results available, try to pull and return - // local results. 
- if (input_cursor_ && input_cursor_->Pull(frame, context)) { - VLOG(10) << "[PullRemoteCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ - << "] producing local results "; - return true; - } - - VLOG(10) << "[PullRemoteCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ - << "] no results available, sleeping "; - // If there aren't any local/remote results available, sleep. - std::this_thread::sleep_for( - std::chrono::microseconds(FLAGS_remote_pull_sleep_micros)); - } - } - - // No more remote results, make sure local results get exhausted. - if (!have_remote_results) { - if (input_cursor_ && input_cursor_->Pull(frame, context)) { - VLOG(10) << "[PullRemoteCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ - << "] producing local results "; - return true; - } - return false; - } - - { - int worker_id = remote_puller_.GetWorkerId(last_pulled_worker_id_index_); - VLOG(10) << "[PullRemoteCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ - << "] producing results from worker " << worker_id; - auto result = remote_puller_.PopResultFromWorker(worker_id); - for (size_t i = 0; i < self_.symbols().size(); ++i) { - frame[self_.symbols()[i]] = std::move(result[i]); - } - } - return true; - } - - void Reset() override { - if (input_cursor_) input_cursor_->Reset(); - remote_puller_.Reset(); - last_pulled_worker_id_index_ = 0; - } - - private: - const PullRemote &self_; - const std::unique_ptr input_cursor_; - tx::CommandId command_id_; - RemotePuller remote_puller_; - int last_pulled_worker_id_index_ = 0; -}; - -class SynchronizeCursor : public Cursor { - public: - SynchronizeCursor(const Synchronize &self, database::GraphDbAccessor &db) - : self_(self), - pull_clients_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast(&db.db())->pull_clients()), - updates_clients_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast(&db.db())->updates_clients()), - updates_server_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast(&db.db())->updates_server()), - input_cursor_(self.input()->MakeCursor(db)), - pull_remote_cursor_( - self.pull_remote() ? self.pull_remote()->MakeCursor(db) : nullptr), - command_id_(db.transaction().cid()), - master_id_( - // TODO: Pass in a Master GraphDb. - dynamic_cast(&db.db())->WorkerId()) {} - - bool Pull(Frame &frame, Context &context) override { - if (!initial_pull_done_) { - InitialPull(frame, context); - initial_pull_done_ = true; - } - // Yield local stuff while available. - if (!local_frames_.empty()) { - VLOG(10) << "[SynchronizeCursor] [" - << context.db_accessor_.transaction_id() - << "] producing local results"; - auto &result = local_frames_.back(); - for (size_t i = 0; i < frame.elems().size(); ++i) { - if (self_.advance_command()) { - query::ReconstructTypedValue(result[i]); - } - frame.elems()[i] = std::move(result[i]); - } - local_frames_.resize(local_frames_.size() - 1); - return true; - } - - // We're out of local stuff, yield from pull_remote if available. 
- if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context)) { - VLOG(10) << "[SynchronizeCursor] [" - << context.db_accessor_.transaction_id() - << "] producing remote results"; - return true; - } - - return false; - } - - void Reset() override { - input_cursor_->Reset(); - pull_remote_cursor_->Reset(); - initial_pull_done_ = false; - local_frames_.clear(); - } - - private: - const Synchronize &self_; - distributed::PullRpcClients *pull_clients_{nullptr}; - distributed::UpdatesRpcClients *updates_clients_{nullptr}; - distributed::UpdatesRpcServer *updates_server_{nullptr}; - const std::unique_ptr input_cursor_; - const std::unique_ptr pull_remote_cursor_; - bool initial_pull_done_{false}; - std::vector> local_frames_; - tx::CommandId command_id_; - int master_id_; - - void InitialPull(Frame &frame, Context &context) { - VLOG(10) << "[SynchronizeCursor] [" << context.db_accessor_.transaction_id() - << "] initial pull"; - - // Tell all workers to accumulate, only if there is a remote pull. - std::vector> worker_accumulations; - if (pull_remote_cursor_) { - for (auto worker_id : pull_clients_->GetWorkerIds()) { - if (worker_id == master_id_) continue; - worker_accumulations.emplace_back(pull_clients_->Pull( - &context.db_accessor_, worker_id, self_.pull_remote()->plan_id(), - command_id_, context.parameters_, self_.pull_remote()->symbols(), - context.timestamp_, true, 0)); - } - } - - // Accumulate local results - while (input_cursor_->Pull(frame, context)) { - local_frames_.emplace_back(); - auto &local_frame = local_frames_.back(); - local_frame.reserve(frame.elems().size()); - for (auto &elem : frame.elems()) { - local_frame.emplace_back(std::move(elem)); - } - } - - // Wait for all workers to finish accumulation (first sync point). - for (auto &accu : worker_accumulations) { - switch (accu.get().pull_state) { - case distributed::PullState::CURSOR_EXHAUSTED: - continue; - case distributed::PullState::CURSOR_IN_PROGRESS: - throw QueryRuntimeException( - "Expected exhausted cursor after remote pull accumulate"); - case distributed::PullState::SERIALIZATION_ERROR: - throw mvcc::SerializationError( - "Failed to perform remote accumulate due to " - "SerializationError"); - case distributed::PullState::UPDATE_DELETED_ERROR: - throw QueryRuntimeException( - "Failed to perform remote accumulate due to " - "RecordDeletedError"); - case distributed::PullState::LOCK_TIMEOUT_ERROR: - throw utils::LockTimeoutException( - "Failed to perform remote accumulate due to " - "LockTimeoutException"); - case distributed::PullState::RECONSTRUCTION_ERROR: - throw QueryRuntimeException( - "Failed to perform remote accumulate due to " - "ReconstructionError"); - case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR: - throw RemoveAttachedVertexException(); - case distributed::PullState::HINTED_ABORT_ERROR: - throw HintedAbortError(); - case distributed::PullState::QUERY_ERROR: - throw QueryRuntimeException( - "Failed to perform remote accumulate due to Query runtime " - "error"); - } - } - - if (self_.advance_command()) { - context.db_accessor_.AdvanceCommand(); - } - - // Make all the workers apply their deltas. 
- auto tx_id = context.db_accessor_.transaction_id(); - auto apply_futures = updates_clients_->UpdateApplyAll(master_id_, tx_id); - updates_server_->Apply(tx_id); - for (auto &future : apply_futures) { - switch (future.get()) { - case distributed::UpdateResult::SERIALIZATION_ERROR: - throw mvcc::SerializationError( - "Failed to apply deferred updates due to SerializationError"); - case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: - throw RemoveAttachedVertexException(); - case distributed::UpdateResult::UPDATE_DELETED_ERROR: - throw QueryRuntimeException( - "Failed to apply deferred updates due to RecordDeletedError"); - case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: - throw utils::LockTimeoutException( - "Failed to apply deferred update due to LockTimeoutException"); - case distributed::UpdateResult::DONE: - break; - } - } - - // If the command advanced, let the workers know. - if (self_.advance_command()) { - auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id); - for (auto &future : futures) future.wait(); - } - } -}; - class CartesianCursor : public Cursor { public: CartesianCursor(const Cartesian &self, database::GraphDbAccessor &db) @@ -3723,175 +3218,13 @@ class CartesianCursor : public Cursor { bool cartesian_pull_initialized_{false}; }; -class PullRemoteOrderByCursor : public Cursor { - public: - PullRemoteOrderByCursor(const PullRemoteOrderBy &self, - database::GraphDbAccessor &db) - : self_(self), - input_(self.input()->MakeCursor(db)), - command_id_(db.transaction().cid()), - remote_puller_( - // TODO: Pass in a Master GraphDb. - &dynamic_cast(&db.db())->pull_clients(), db, - self.symbols(), self.plan_id(), command_id_) {} - - bool Pull(Frame &frame, Context &context) { - if (context.db_accessor_.should_abort()) throw HintedAbortError(); - ExpressionEvaluator evaluator(frame, &context, GraphView::OLD); - - auto evaluate_result = [this, &evaluator]() { - std::vector order_by; - order_by.reserve(self_.order_by().size()); - for (auto expression_ptr : self_.order_by()) { - order_by.emplace_back(expression_ptr->Accept(evaluator)); - } - return order_by; - }; - - auto restore_frame = [&frame, - this](const std::vector &restore_from) { - for (size_t i = 0; i < restore_from.size(); ++i) { - frame[self_.symbols()[i]] = restore_from[i]; - } - }; - - if (!merge_initialized_) { - VLOG(10) << "[PullRemoteOrderBy] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ << "] initialize"; - remote_puller_.Initialize(context); - missing_results_from_ = remote_puller_.Workers(); - missing_master_result_ = true; - merge_initialized_ = true; - } - - if (missing_master_result_) { - if (input_->Pull(frame, context)) { - std::vector output; - output.reserve(self_.symbols().size()); - for (const Symbol &symbol : self_.symbols()) { - output.emplace_back(frame[symbol]); - } - - merge_.push_back(MergeResultItem{std::experimental::nullopt, output, - evaluate_result()}); - } - missing_master_result_ = false; - } - - while (!missing_results_from_.empty()) { - if (context.db_accessor_.should_abort()) throw HintedAbortError(); - remote_puller_.Update(context); - - bool has_all_result = true; - for (auto &worker_id : missing_results_from_) { - if (!remote_puller_.HasResultsFromWorker(worker_id) && - remote_puller_.HasPendingPullFromWorker(worker_id)) { - has_all_result = false; - break; - } - } - - if (!has_all_result) { - VLOG(10) << "[PullRemoteOrderByCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << 
self_.plan_id() << "] [" << command_id_ - << "] missing results, sleep"; - // If we don't have results from all workers, sleep before continuing. - std::this_thread::sleep_for( - std::chrono::microseconds(FLAGS_remote_pull_sleep_micros)); - continue; - } - - for (auto &worker_id : missing_results_from_) { - // It is possible that the workers remote pull finished but it didn't - // return any results. In that case, just skip it. - if (!remote_puller_.HasResultsFromWorker(worker_id)) continue; - auto remote_result = remote_puller_.PopResultFromWorker(worker_id); - restore_frame(remote_result); - merge_.push_back( - MergeResultItem{worker_id, remote_result, evaluate_result()}); - } - - missing_results_from_.clear(); - } - - if (merge_.empty()) return false; - - auto result_it = std::min_element( - merge_.begin(), merge_.end(), [this](const auto &lhs, const auto &rhs) { - return self_.compare()(lhs.order_by, rhs.order_by); - }); - - restore_frame(result_it->remote_result); - - if (result_it->worker_id) { - VLOG(10) << "[PullRemoteOrderByCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ - << "] producing results from worker " - << result_it->worker_id.value(); - missing_results_from_.push_back(result_it->worker_id.value()); - } else { - VLOG(10) << "[PullRemoteOrderByCursor] [" - << context.db_accessor_.transaction_id() << "] [" - << self_.plan_id() << "] [" << command_id_ - << "] producing local results"; - missing_master_result_ = true; - } - - merge_.erase(result_it); - return true; - } - - void Reset() { - input_->Reset(); - remote_puller_.Reset(); - merge_.clear(); - missing_results_from_.clear(); - missing_master_result_ = false; - merge_initialized_ = false; - } - - private: - struct MergeResultItem { - std::experimental::optional worker_id; - std::vector remote_result; - std::vector order_by; - }; - - const PullRemoteOrderBy &self_; - std::unique_ptr input_; - tx::CommandId command_id_; - RemotePuller remote_puller_; - std::vector merge_; - std::vector missing_results_from_; - bool missing_master_result_ = false; - bool merge_initialized_ = false; -}; - } // namespace -std::unique_ptr PullRemote::MakeCursor( - database::GraphDbAccessor &db) const { - return std::make_unique(*this, db); -} - -std::unique_ptr Synchronize::MakeCursor( - database::GraphDbAccessor &db) const { - return std::make_unique(*this, db); -} - std::unique_ptr Cartesian::MakeCursor( database::GraphDbAccessor &db) const { return std::make_unique(*this, db); } -std::unique_ptr PullRemoteOrderBy::MakeCursor( - database::GraphDbAccessor &db) const { - return std::make_unique(*this, db); -} - AuthHandler::AuthHandler(AuthQuery::Action action, std::string user, std::string role, std::string user_or_role, Expression *password, diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index bbea378e4..be775b85d 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -10,7 +10,6 @@ #include #include -#include "distributed/pull_produce_rpc_messages.hpp" #include "query/common.hpp" #include "query/frontend/ast/ast.hpp" #include "query/frontend/semantic/symbol.hpp" @@ -21,7 +20,13 @@ #include "utils/hashing/fnv.hpp" #include "utils/visitor.hpp" -#include "query/plan/operator.capnp.h" +// TODO: It makes no sense for this file to include distributed_ops Cap'n +// Proto schema. Unfortunately, forward declares of nested capnp types do not +// work, so we cannot use that mechanism. 
diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp
index bbea378e4..be775b85d 100644
--- a/src/query/plan/operator.lcp
+++ b/src/query/plan/operator.lcp
@@ -10,7 +10,6 @@
 #include 
 #include 
-#include "distributed/pull_produce_rpc_messages.hpp"
 #include "query/common.hpp"
 #include "query/frontend/ast/ast.hpp"
 #include "query/frontend/semantic/symbol.hpp"
@@ -21,7 +20,13 @@
 #include "utils/hashing/fnv.hpp"
 #include "utils/visitor.hpp"
 
-#include "query/plan/operator.capnp.h"
+// TODO: It makes no sense for this file to include the distributed_ops Cap'n
+// Proto schema. Unfortunately, forward declarations of nested capnp types do
+// not work, so we cannot use that mechanism. Perhaps we want to generate
+// serialization code based only on type info, similarly to RPCs. This
+// requires a rework of LCP, and we will need some sort of 'register'
+// mechanism to fill the table of (type info -> operator constructor).
+#include "query/plan/distributed_ops.capnp.h"
 
 namespace database {
 class GraphDbAccessor;
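
The TODO above proposes generating serialization code from type info alone and
filling a table of (type info -> operator constructor) at registration time.
Nothing like that exists in the tree yet; a hedged sketch of what such a
registry could look like, with all names hypothetical:

    #include <functional>
    #include <memory>
    #include <string>
    #include <unordered_map>

    // Hypothetical base class standing in for query::plan::LogicalOperator.
    struct LogicalOperator {
      virtual ~LogicalOperator() = default;
    };

    // Table of (type name -> operator constructor), filled during static
    // initialization.
    using OpFactory = std::function<std::unique_ptr<LogicalOperator>()>;
    std::unordered_map<std::string, OpFactory> &OpRegistry() {
      static std::unordered_map<std::string, OpFactory> registry;
      return registry;
    }

    // Each operator registers itself once; deserialization then looks up the
    // constructor by the type name found in the serialized stream.
    template <class TOp>
    bool RegisterOp(const std::string &name) {
      OpRegistry().emplace(name, [] { return std::make_unique<TOp>(); });
      return true;
    }

    struct PullRemote : LogicalOperator {};
    static const bool kPullRemoteRegistered =
        RegisterOp<PullRemote>("PullRemote");

With such a table in place this file would not need to see the Cap'n Proto
schema at all: the loader would read a type tag and ask the registry for the
matching constructor.
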
@@ -2289,105 +2294,8 @@ vectors of symbols used by each of the inputs.")
     cpp<#)
   (:serialize :capnp))
 
-(lcp:define-class pull-remote (logical-operator)
-  ((input "std::shared_ptr<LogicalOperator>"
-          :capnp-save #'save-operator-pointer
-          :capnp-load #'load-operator-pointer)
-   (plan-id :int64_t :initval 0 :reader t)
-   (symbols "std::vector<Symbol>" :reader t
-            :capnp-save (lcp:capnp-save-vector "::query::capnp::Symbol" "Symbol")
-            :capnp-load (lcp:capnp-load-vector "::query::capnp::Symbol" "Symbol")))
-  (:documentation
-   "An operator in distributed Memgraph that yields both local and remote (from
-other workers) frames. Obtaining remote frames is done through RPC calls to
-`distributed::ProduceRpcServer`s running on all the workers.
-
-This operator aims to yield results as fast as possible and waste minimal
-time on data transfer. It gives no guarantees on result order.")
-  (:public
-   #>cpp
-   PullRemote(const std::shared_ptr<LogicalOperator> &input, int64_t plan_id,
-              const std::vector<Symbol> &symbols)
-       : input_(input), plan_id_(plan_id), symbols_(symbols) {}
-   bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
-   std::unique_ptr<Cursor> MakeCursor(
-       database::GraphDbAccessor &db) const override;
-   std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
-   std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
-
-   bool HasSingleInput() const override { return true; }
-   std::shared_ptr<LogicalOperator> input() const override { return input_; }
-   void set_input(std::shared_ptr<LogicalOperator> input) override {
-     input_ = input;
-   }
-   cpp<#)
-  (:private #>cpp PullRemote() {} cpp<#)
-  (:serialize :capnp))
-
-(defun load-pull-remote (reader member-name)
-  #>cpp
-  ${member-name} = std::static_pointer_cast<PullRemote>(
-      utils::LoadSharedPtr<capnp::LogicalOperator, LogicalOperator>(
-          ${reader}, [helper](const auto &reader) {
-            auto op = LogicalOperator::Construct(reader);
-            op->Load(reader, helper);
-            return op.release();
-          }, &helper->loaded_ops));
-  cpp<#)
-
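
The load-pull-remote helper above threads the &helper->loaded_ops cache
through utils::LoadSharedPtr so that an operator shared by several parents in
the plan DAG is materialized exactly once. A simplified sketch of that
pointer-dedup idea (an illustration only; the real utils::LoadSharedPtr
signature differs):

    #include <cstdint>
    #include <functional>
    #include <map>
    #include <memory>

    // `loaded` maps the pointer id stored in the serialized stream to an
    // already-materialized object, so two operators that shared an input
    // before serialization share it again after loading.
    template <class T>
    std::shared_ptr<T> LoadSharedPtr(
        uint64_t saved_id, const std::function<T *()> &construct,
        std::map<uint64_t, std::shared_ptr<T>> *loaded) {
      auto found = loaded->find(saved_id);
      if (found != loaded->end()) return found->second;  // Reuse, don't copy.
      std::shared_ptr<T> ptr(construct());
      loaded->emplace(saved_id, ptr);
      return ptr;
    }
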
-(lcp:define-class synchronize (logical-operator)
-  ((input "std::shared_ptr<LogicalOperator>"
-          :capnp-save #'save-operator-pointer
-          :capnp-load #'load-operator-pointer)
-   (pull-remote "std::shared_ptr<PullRemote>" :reader t
-                :capnp-save #'save-operator-pointer
-                :capnp-load #'load-pull-remote)
-   (advance-command :bool :initval "false" :reader t))
-  (:documentation
-   "Operator used to synchronize stages of plan execution between the master and
-all the workers. Synchronization is necessary in queries that update the
-graph state because updates (as well as creations and deletions) are deferred
-to avoid multithreaded modification of graph element data (as it's not
-thread-safe).
-
-The logic of the synchronize operator is:
-
-1. If there is a Pull, tell all the workers to pull on that plan and
-   accumulate results without sending them to the master. This is async.
-2. Accumulate local results, in parallel with step 1 running on the workers.
-3. Wait until the master and all the workers are done accumulating.
-4. Advance the command, if necessary.
-5. Tell all the workers to apply their updates. This is async.
-6. Apply local updates, in parallel with step 5 running on the workers.
-7. Notify the workers that the command has advanced, if necessary.
-8. Yield all the results, first the local ones, then those from Pull if
-   available.")
-  (:public
-   #>cpp
-   Synchronize(const std::shared_ptr<LogicalOperator> &input,
-               const std::shared_ptr<PullRemote> &pull_remote,
-               bool advance_command)
-       : input_(input),
-         pull_remote_(pull_remote),
-         advance_command_(advance_command) {}
-   bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
-   std::unique_ptr<Cursor> MakeCursor(
-       database::GraphDbAccessor &db) const override;
-   std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
-
-   std::vector<Symbol> OutputSymbols(
-       const SymbolTable &symbol_table) const override {
-     return input_->OutputSymbols(symbol_table);
-   }
-
-   bool HasSingleInput() const override { return true; }
-   std::shared_ptr<LogicalOperator> input() const override { return input_; }
-   void set_input(std::shared_ptr<LogicalOperator> input) override {
-     input_ = input;
-   }
-   cpp<#)
-  (:private #>cpp Synchronize() {} cpp<#)
-  (:serialize :capnp))
-
+;; TODO: We should probably output this operator in the regular planner, not
+;; just the distributed planner.
 (lcp:define-class cartesian (logical-operator)
   ((left-op "std::shared_ptr<LogicalOperator>" :reader t
             :capnp-save #'save-operator-pointer
@@ -2427,48 +2335,7 @@ Logic of the synchronize operator is:
   (:private #>cpp Cartesian() {} cpp<#)
   (:serialize :capnp))
 
-(lcp:define-class pull-remote-order-by (logical-operator)
-  ((input "std::shared_ptr<LogicalOperator>"
-          :capnp-save #'save-operator-pointer
-          :capnp-load #'load-operator-pointer)
-   (plan-id :int64_t :initval 0 :reader t)
-   (symbols "std::vector<Symbol>" :reader t
-            :capnp-save (lcp:capnp-save-vector "::query::capnp::Symbol" "Symbol")
-            :capnp-load (lcp:capnp-load-vector "::query::capnp::Symbol" "Symbol"))
-   (order-by "std::vector<Expression *>" :reader t
-             :capnp-type "List(Ast.Tree)"
-             :capnp-save (save-ast-vector "Expression *")
-             :capnp-load (load-ast-vector "Expression *"))
-   (compare "TypedValueVectorCompare" :reader t
-            :capnp-type "Common.TypedValueVectorCompare"))
-  (:documentation
-   "Operator that merges distributed OrderBy operators.
-Instead of using a regular OrderBy on the master (which would collect all
-remote results and order them), we can have each worker do an OrderBy locally
-and have the master rely on the fact that the results are ordered, merging
-them while holding only one result from each worker.")
-  (:public
-   #>cpp
-   PullRemoteOrderBy(
-       const std::shared_ptr<LogicalOperator> &input, int64_t plan_id,
-       const std::vector<std::pair<Ordering, Expression *>> &order_by,
-       const std::vector<Symbol> &symbols);
-   bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
-   std::unique_ptr<Cursor> MakeCursor(
-       database::GraphDbAccessor &db) const override;
-
-   std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
-   std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
-
-   bool HasSingleInput() const override { return true; }
-   std::shared_ptr<LogicalOperator> input() const override { return input_; }
-   void set_input(std::shared_ptr<LogicalOperator> input) override {
-     input_ = input;
-   }
-   cpp<#)
-  (:private #>cpp PullRemoteOrderBy() {} cpp<#)
-  (:serialize :capnp))
-
+;;; KAFKA STREAM OPERATORS
 (lcp:define-class create-stream (logical-operator)
   ((stream-name "std::string" :reader t)
    (stream-uri "Expression *"
@@ -2662,6 +2529,8 @@ in the db.")
   #>cpp TestStream() {} cpp<#)
   (:serialize :capnp))
 
+;;; END KAFKA STREAM OPERATORS
+
 (lcp:define-class explain (logical-operator)
   ((input "std::shared_ptr<LogicalOperator>"
           :capnp-save #'save-operator-pointer
diff --git a/src/query/plan/pretty_print.cpp b/src/query/plan/pretty_print.cpp
index 2dfec7b1f..d850eaec8 100644
--- a/src/query/plan/pretty_print.cpp
+++ b/src/query/plan/pretty_print.cpp
@@ -1,6 +1,7 @@
 #include "query/plan/pretty_print.hpp"
 
 #include "database/graph_db_accessor.hpp"
+#include "query/plan/distributed_ops.hpp"
 #include "query/plan/operator.hpp"
 
 namespace query::plan {
diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp
index 8cf355c70..ccac2192e 100644
--- a/tests/unit/distributed_query_plan.cpp
+++ b/tests/unit/distributed_query_plan.cpp
@@ -20,6 +20,7 @@
 #include "query/frontend/semantic/symbol_generator.hpp"
 #include "query/frontend/semantic/symbol_table.hpp"
 #include "query/interpreter.hpp"
+#include "query/plan/distributed_ops.hpp"
 #include "query/plan/planner.hpp"
 #include "query/typed_value.hpp"
 #include "query_common.hpp"
diff --git a/tests/unit/distributed_reset.cpp b/tests/unit/distributed_reset.cpp
index 0a3dbaf1b..35471c092 100644
--- a/tests/unit/distributed_reset.cpp
+++ b/tests/unit/distributed_reset.cpp
@@ -3,6 +3,7 @@
 #include "distributed/plan_dispatcher.hpp"
 #include "distributed_common.hpp"
 #include "query/context.hpp"
+#include "query/plan/distributed_ops.hpp"
 #include "query/interpret/frame.hpp"
 
 class DistributedReset : public DistributedGraphDbTest {
diff --git a/tests/unit/query_planner.cpp b/tests/unit/query_planner.cpp
index 797191d75..47934a967 100644
--- a/tests/unit/query_planner.cpp
+++ b/tests/unit/query_planner.cpp
@@ -12,11 +12,11 @@
 #include "query/frontend/semantic/symbol_generator.hpp"
 #include "query/frontend/semantic/symbol_table.hpp"
 #include "query/plan/distributed.hpp"
+#include "query/plan/distributed_ops.hpp"
 #include "query/plan/operator.hpp"
 #include "query/plan/planner.hpp"
 
 #include 
-#include "query/plan/operator.capnp.h"
 #include "query_common.hpp"
diff --git a/tools/lcp b/tools/lcp
index a02e74efb..8c41ab80c 100755
--- a/tools/lcp
+++ b/tools/lcp
@@ -1,9 +1,11 @@
 #!/bin/bash -e
 
 if [[ $# -ne 1 && $# -ne 2 ]]; then
-  echo "Usage: $0 LCP_FILE [CAPNP_ID]"
+  echo "Usage: $0 LCP_FILE [--capnp-declaration | CAPNP_ID]"
   echo "Convert a LCP_FILE to C++ header file and output to stdout."
   echo "If CAPNP_ID is provided, then the Cap'n Proto schema is generated."
+  echo "Passing the --capnp-declaration flag will declare the serialization"
+  echo "functions in the C++ header without generating the Cap'n Proto schema or code."
   exit 1
 fi
 
@@ -22,7 +24,11 @@ fi
 
 capnp=""
 if [[ $# -eq 2 ]]; then
-  capnp=":capnp-id \"$2\""
+  if [[ "$2" == "--capnp-declaration" ]]; then
+    capnp=":capnp-declaration t"
+  else
+    capnp=":capnp-id \"$2\""
+  fi
 fi
 
 echo \