diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 269ac7996..1b41e88e2 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -39,6 +39,8 @@ BOOST_CLASS_EXPORT(tx::AbortReq); BOOST_CLASS_EXPORT(tx::AbortRes); BOOST_CLASS_EXPORT(tx::SnapshotReq); BOOST_CLASS_EXPORT(tx::SnapshotRes); +BOOST_CLASS_EXPORT(tx::CommandReq); +BOOST_CLASS_EXPORT(tx::CommandRes); BOOST_CLASS_EXPORT(tx::GcSnapshotReq); BOOST_CLASS_EXPORT(tx::ClogInfoReq); BOOST_CLASS_EXPORT(tx::ClogInfoRes); @@ -70,6 +72,8 @@ BOOST_CLASS_EXPORT(distributed::RemotePullReq); BOOST_CLASS_EXPORT(distributed::RemotePullRes); BOOST_CLASS_EXPORT(distributed::EndRemotePullReq); BOOST_CLASS_EXPORT(distributed::EndRemotePullRes); +BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq); +BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes); // Distributed indexes. BOOST_CLASS_EXPORT(distributed::BuildIndexReq); @@ -86,3 +90,7 @@ BOOST_CLASS_EXPORT(stats::BatchStatsRes); BOOST_CLASS_EXPORT(database::StateDelta); BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); +BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq); +BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes); +BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardReq); +BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardRes); diff --git a/src/distributed/remote_cache.hpp b/src/distributed/remote_cache.hpp index f9dfb91f6..c5da17741 100644 --- a/src/distributed/remote_cache.hpp +++ b/src/distributed/remote_cache.hpp @@ -89,6 +89,15 @@ class RemoteCache { std::make_pair(std::move(old_record), std::move(new_record)); } + /// Removes all the cached data. All the pointers to that data still held by + /// RecordAccessors will become invalid and must never be dereferenced after + /// this call. To make a RecordAccessor valid again Reconstruct must be called + /// on it. This is typically done after the command advanced. + void ClearCache() { + std::lock_guard guard{lock_}; + cache_.clear(); + } + private: std::mutex lock_; distributed::RemoteDataRpcClients &remote_data_clients_; diff --git a/src/distributed/remote_data_manager.hpp b/src/distributed/remote_data_manager.hpp index 113789a5b..20ec4fe57 100644 --- a/src/distributed/remote_data_manager.hpp +++ b/src/distributed/remote_data_manager.hpp @@ -41,6 +41,12 @@ class RemoteDataManager { template auto &Elements(tx::transaction_id_t tx_id); + /// Calls RemoteCache::ClearCache on vertex and edge caches. + void ClearCaches(tx::transaction_id_t tx_id) { + Vertices(tx_id).ClearCache(); + Edges(tx_id).ClearCache(); + } + private: RemoteDataRpcClients &remote_data_clients_; SpinLock lock_; diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index dde7bdb41..d691af58d 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -10,7 +10,9 @@ #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "distributed/plan_consumer.hpp" +#include "distributed/remote_data_manager.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp" +#include "query/common.hpp" #include "query/context.hpp" #include "query/exceptions.hpp" #include "query/frontend/semantic/symbol_table.hpp" @@ -29,7 +31,10 @@ namespace distributed { * identified. */ class RemoteProduceRpcServer { - /** Encapsulates an execution in progress. 
*/ + /// Encapsulates a Cursor execution in progress. Can be used for pulling a + /// single result from the execution, or pulling all and accumulating the + /// results. Accumulations are used for synchronizing updates in distributed + /// MG (see query::plan::Synchronize). class OngoingProduce { public: OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id, @@ -45,12 +50,60 @@ class RemoteProduceRpcServer { context_.parameters_ = std::move(parameters); } - /** Returns a vector of typed values (one for each `pull_symbol`), and a - * `bool` indicating if the pull was successful (or the cursor is - * exhausted). */ + /// Returns a vector of typed values (one for each `pull_symbol`), and an + /// indication of the pull result. The result data is valid only if the + /// returned state is CURSOR_IN_PROGRESS. std::pair, RemotePullState> Pull() { - RemotePullState state = RemotePullState::CURSOR_IN_PROGRESS; + if (!accumulation_.empty()) { + auto results = std::move(accumulation_.back()); + accumulation_.pop_back(); + for (auto &element : results) { + try { + query::ReconstructTypedValue(element); + } catch (query::ReconstructionException &) { + cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; + return std::make_pair(std::move(results), cursor_state_); + } + } + + return std::make_pair(std::move(results), + RemotePullState::CURSOR_IN_PROGRESS); + } + + return PullOneFromCursor(); + } + + /// Accumulates all the frames pulled from the cursor and returns + /// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned. + RemotePullState Accumulate() { + while (true) { + auto result = PullOneFromCursor(); + if (result.second != RemotePullState::CURSOR_IN_PROGRESS) + return result.second; + else + accumulation_.emplace_back(std::move(result.first)); + } + } + + private: + database::GraphDbAccessor dba_; + std::unique_ptr cursor_; + query::Context context_; + std::vector pull_symbols_; + query::Frame frame_; + RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS}; + std::vector> accumulation_; + + std::pair, RemotePullState> + PullOneFromCursor() { std::vector results; + + // Check if we already exhausted this cursor (or it entered an error + // state). This happens when we accumulate before normal pull. 
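+      // cursor_state_ is sticky: once the cursor reports exhaustion or an
+      // error, every subsequent call returns that same state and the cursor
+      // is not pulled again.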
+      if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) { + return std::make_pair(results, cursor_state_); + } + try { if (cursor_->Pull(frame_, context_)) { results.reserve(pull_symbols_.size()); @@ -58,32 +111,21 @@ class RemoteProduceRpcServer { results.emplace_back(std::move(frame_[symbol])); } } else { - state = RemotePullState::CURSOR_EXHAUSTED; + cursor_state_ = RemotePullState::CURSOR_EXHAUSTED; } } catch (const mvcc::SerializationError &) { - state = RemotePullState::SERIALIZATION_ERROR; + cursor_state_ = RemotePullState::SERIALIZATION_ERROR; } catch (const LockTimeoutException &) { - state = RemotePullState::LOCK_TIMEOUT_ERROR; + cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR; } catch (const RecordDeletedError &) { - state = RemotePullState::UPDATE_DELETED_ERROR; + cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR; } catch (const query::ReconstructionException &) { - state = RemotePullState::RECONSTRUCTION_ERROR; + cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; } catch (const query::QueryRuntimeException &) { - state = RemotePullState::QUERY_ERROR; + cursor_state_ = RemotePullState::QUERY_ERROR; } - return std::make_pair(std::move(results), state); + return std::make_pair(std::move(results), cursor_state_); } - - private: - // TODO currently each OngoingProduce has it's own GDBA. There is no sharing - // of them in the same transaction. This should be correct, but it's - // inefficient in multi-command queries, and when a single query will get - // broken down into multiple parts. - database::GraphDbAccessor dba_; - std::unique_ptr cursor_; - query::Context context_; - std::vector pull_symbols_; - query::Frame frame_; }; public: @@ -106,6 +148,13 @@ class RemoteProduceRpcServer { ongoing_produces_.erase(it); return std::make_unique(); }); + + remote_produce_rpc_server_.Register( + [this](const TransactionCommandAdvancedReq &req) { + db_.tx_engine().UpdateCommand(req.member); + db_.remote_data_manager().ClearCaches(req.member); + return std::make_unique(); + }); } private: @@ -140,11 +189,19 @@ class RemoteProduceRpcServer { RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new}; result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS; + if (req.accumulate) { + result.state_and_frames.pull_state = ongoing_produce.Accumulate(); + // If an error occurred, we need to return that error.
+ if (result.state_and_frames.pull_state != RemotePullState::CURSOR_EXHAUSTED) { + return result; + } + } + for (int i = 0; i < req.batch_size; ++i) { auto pull_result = ongoing_produce.Pull(); result.state_and_frames.pull_state = pull_result.second; - if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) - break; + if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break; result.state_and_frames.frames.emplace_back(std::move(pull_result.first)); } diff --git a/src/distributed/remote_pull_produce_rpc_messages.hpp b/src/distributed/remote_pull_produce_rpc_messages.hpp index 7b7f9f488..1eaa7ee6d 100644 --- a/src/distributed/remote_pull_produce_rpc_messages.hpp +++ b/src/distributed/remote_pull_produce_rpc_messages.hpp @@ -39,11 +39,12 @@ struct RemotePullReq : public communication::rpc::Message { RemotePullReq() {} RemotePullReq(tx::transaction_id_t tx_id, int64_t plan_id, const Parameters &params, std::vector symbols, - int batch_size, bool send_old, bool send_new) + bool accumulate, int batch_size, bool send_old, bool send_new) : tx_id(tx_id), plan_id(plan_id), params(params), symbols(symbols), + accumulate(accumulate), batch_size(batch_size), send_old(send_old), send_new(send_new) {} @@ -52,6 +53,7 @@ struct RemotePullReq : public communication::rpc::Message { int64_t plan_id; Parameters params; std::vector symbols; + bool accumulate; int batch_size; // Indicates which of (old, new) records of a graph element should be sent. bool send_old; @@ -72,6 +74,7 @@ struct RemotePullReq : public communication::rpc::Message { utils::SaveTypedValue(ar, kv.second); } ar << symbols; + ar << accumulate; ar << batch_size; ar << send_old; ar << send_new; @@ -93,6 +96,7 @@ struct RemotePullReq : public communication::rpc::Message { params.Add(token_pos, param); } ar >> symbols; + ar >> accumulate; ar >> batch_size; ar >> send_old; ar >> send_new; @@ -360,11 +364,16 @@ using RemotePullRpc = // optimization not to have to send the full RemotePullReqData pack every // time. -using EndRemotePullReqData = std::pair; -RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullReqData); +using EndRemotePullData = std::pair; +RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullData); RPC_NO_MEMBER_MESSAGE(EndRemotePullRes); - using EndRemotePullRpc = communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t); +RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes); +using TransactionCommandAdvancedRpc = + communication::rpc::RequestResponse; + } // namespace distributed diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 215896a54..048870dbd 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -27,16 +27,20 @@ class RemotePullRpcClients { /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this /// function for the same (tx_id, worker_id, plan_id) before the previous call /// has ended. + /// + /// @todo: it might be cleaner to split RemotePull into {InitRemoteCursor, + /// RemotePull, RemoteAccumulate}, but that's a lot of refactoring and more + /// RPC calls.
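+  ///
+  /// When `accumulate` is set, the worker first exhausts the cursor and
+  /// accumulates every frame (see OngoingProduce::Accumulate) before any
+  /// results are returned. query::plan::Synchronize uses this with a batch
+  /// size of 0 as its first synchronization point.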
std::future RemotePull( database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, const Parameters &params, const std::vector &symbols, - int batch_size = kDefaultBatchSize) { + bool accumulate, int batch_size = kDefaultBatchSize) { return clients_.ExecuteOnWorker( - worker_id, - [&dba, plan_id, params, symbols, batch_size](ClientPool &client_pool) { + worker_id, [&dba, plan_id, params, symbols, accumulate, + batch_size](ClientPool &client_pool) { auto result = client_pool.Call( - dba.transaction_id(), plan_id, params, symbols, batch_size, true, - true); + dba.transaction_id(), plan_id, params, symbols, accumulate, + batch_size, true, true); auto handle_vertex = [&dba](auto &v) { dba.db() @@ -86,7 +90,7 @@ class RemotePullRpcClients { // Notifies a worker that the given transaction/plan is done. Otherwise the // server is left with potentially unconsumed Cursors that never get deleted. // - // TODO - maybe this needs to be done with hooks into the transactional + // @todo - this needs to be done with hooks into the transactional // engine, so that the Worker discards it's stuff when the relevant // transaction are done. std::future EndRemotePull(int worker_id, tx::transaction_id_t tx_id, @@ -94,7 +98,7 @@ class RemotePullRpcClients { return clients_.ExecuteOnWorker( worker_id, [tx_id, plan_id](ClientPool &client_pool) { return client_pool.Call( - EndRemotePullReqData{tx_id, plan_id}); + EndRemotePullData{tx_id, plan_id}); }); } @@ -107,6 +111,13 @@ class RemotePullRpcClients { for (auto &future : futures) future.wait(); } + std::vector> NotifyAllTransactionCommandAdvanced( + tx::transaction_id_t tx_id) { + return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { + client.template Call(tx_id); + }); + } + private: RpcWorkerClients clients_; }; diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index d45ca2f1f..fee2a1438 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -33,6 +33,16 @@ class RemoteUpdatesRpcClients { ->member; } + /// Calls for all the workers (except the given one) to apply their updates + /// and returns the future results. + std::vector> RemoteUpdateApplyAll( + int skip_worker_id, tx::transaction_id_t tx_id) { + return worker_clients_.ExecuteOnWorkers( + skip_worker_id, [tx_id](auto &client) { + return client.template Call(tx_id)->member; + }); + } + /// Calls for the worker with the given ID to discard remote updates.
void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) { worker_clients_.GetClientPool(worker_id).Call( diff --git a/src/query/interpret/frame.hpp b/src/query/interpret/frame.hpp index 9449eb2ca..02688d8e9 100644 --- a/src/query/interpret/frame.hpp +++ b/src/query/interpret/frame.hpp @@ -18,6 +18,8 @@ class Frame { return elems_[symbol.position()]; } + auto &elems() { return elems_; } + private: int size_; std::vector elems_; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 220af4dd3..316e3c29c 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -14,6 +14,8 @@ #include "database/graph_db_accessor.hpp" #include "distributed/remote_pull_rpc_clients.hpp" +#include "distributed/remote_updates_rpc_clients.hpp" +#include "distributed/remote_updates_rpc_server.hpp" #include "query/context.hpp" #include "query/exceptions.hpp" #include "query/frontend/ast/ast.hpp" @@ -2720,18 +2722,6 @@ void Union::UnionCursor::Reset() { right_cursor_->Reset(); } -ProduceRemote::ProduceRemote(const std::shared_ptr &input, - const std::vector &symbols) - : input_(input ? input : std::make_shared()), symbols_(symbols) {} - -ACCEPT_WITH_INPUT(ProduceRemote) - -std::unique_ptr ProduceRemote::MakeCursor( - database::GraphDbAccessor &) const { - // TODO: Implement a concrete cursor. - return nullptr; -} - PullRemote::PullRemote(const std::shared_ptr &input, int64_t plan_id, const std::vector &symbols) : input_(input), plan_id_(plan_id), symbols_(symbols) {} @@ -2761,7 +2751,8 @@ void PullRemote::PullRemoteCursor::EndRemotePull() { bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { auto insert_future_for_worker = [&](int worker_id) { remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull( - db_, worker_id, self_.plan_id(), context.parameters_, self_.symbols()); + db_, worker_id, self_.plan_id(), context.parameters_, self_.symbols(), + false); }; if (!remote_pulls_initialized_) { @@ -2804,7 +2795,8 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { "LockTimeout error occured during PullRemote !"); case distributed::RemotePullState::UPDATE_DELETED_ERROR: EndRemotePull(); - throw RecordDeletedError(); + throw QueryRuntimeException( + "RecordDeleted error occurred during PullRemote !"); case distributed::RemotePullState::RECONSTRUCTION_ERROR: EndRemotePull(); throw query::ReconstructionException(); @@ -2896,10 +2888,141 @@ bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) { return visitor.PostVisit(*this); } +namespace { +class SynchronizeCursor : public Cursor { + public: + SynchronizeCursor(const Synchronize &self, database::GraphDbAccessor &db) + : self_(self), + input_cursor_(self.input()->MakeCursor(db)), + pull_remote_cursor_( + self.pull_remote() ? self.pull_remote()->MakeCursor(db) : nullptr) { + } + + bool Pull(Frame &frame, Context &context) override { + if (!initial_pull_done_) { + InitialPull(frame, context); + initial_pull_done_ = true; + } + // Yield local stuff while available. + if (!local_frames_.empty()) { + auto &result = local_frames_.back(); + for (size_t i = 0; i < frame.elems().size(); ++i) { + if (self_.advance_command()) { + query::ReconstructTypedValue(result[i]); + } + frame.elems()[i] = std::move(result[i]); + } + local_frames_.resize(local_frames_.size() - 1); + return true; + } + + // We're out of local stuff, yield from pull_remote if available.
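+    // The remote results were already produced and accumulated on the workers
+    // during InitialPull (RemotePull with accumulate set), so these pulls only
+    // stream back frames that have already been computed.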
+ if (pull_remote_cursor_ && pull_remote_cursor_->Pull(frame, context)) + return true; + + return false; + } + + void Reset() override { + throw QueryRuntimeException("Unsupported: Reset during Synchronize!"); + } + + private: + const Synchronize &self_; + const std::unique_ptr input_cursor_; + const std::unique_ptr pull_remote_cursor_; + bool initial_pull_done_{false}; + std::vector> local_frames_; + + void InitialPull(Frame &frame, Context &context) { + auto &db = context.db_accessor_.db(); + + // Tell all workers to accumulate, only if there is a remote pull. + std::vector> worker_accumulations; + if (pull_remote_cursor_) { + for (auto worker_id : db.remote_pull_clients().GetWorkerIds()) { + if (worker_id == db.WorkerId()) continue; + worker_accumulations.emplace_back(db.remote_pull_clients().RemotePull( + context.db_accessor_, worker_id, self_.pull_remote()->plan_id(), + context.parameters_, self_.pull_remote()->symbols(), true, 0)); + } + } + + // Accumulate local results + while (input_cursor_->Pull(frame, context)) { + local_frames_.emplace_back(); + auto &local_frame = local_frames_.back(); + local_frame.reserve(frame.elems().size()); + for (auto &elem : frame.elems()) { + local_frame.emplace_back(std::move(elem)); + } + } + + // Wait for all workers to finish accumulation (first sync point). + for (auto &accu : worker_accumulations) { + switch (accu.get().pull_state) { + case distributed::RemotePullState::CURSOR_EXHAUSTED: + continue; + case distributed::RemotePullState::CURSOR_IN_PROGRESS: + throw QueryRuntimeException( + "Expected exhausted cursor after remote pull accumulate"); + case distributed::RemotePullState::SERIALIZATION_ERROR: + throw mvcc::SerializationError( + "Failed to perform remote accumulate due to SerializationError"); + case distributed::RemotePullState::UPDATE_DELETED_ERROR: + throw QueryRuntimeException( + "Failed to perform remote accumulate due to RecordDeletedError"); + case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: + throw LockTimeoutException( + "Failed to perform remote accumulate due to " + "LockTimeoutException"); + case distributed::RemotePullState::RECONSTRUCTION_ERROR: + throw QueryRuntimeException( + "Failed to perform remote accumulate due to ReconstructionError"); + case distributed::RemotePullState::QUERY_ERROR: + throw QueryRuntimeException( + "Failed to perform remote accumulate due to Query runtime error"); + } + } + + if (self_.advance_command()) { + context.db_accessor_.AdvanceCommand(); + } + + // Make all the workers apply their deltas. + auto tx_id = context.db_accessor_.transaction_id(); + auto apply_futures = + db.remote_updates_clients().RemoteUpdateApplyAll(db.WorkerId(), tx_id); + db.remote_updates_server().Apply(tx_id); + for (auto &future : apply_futures) { + switch (future.get()) { + case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: + throw mvcc::SerializationError( + "Failed to apply deferred updates due to SerializationError"); + case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: + throw QueryRuntimeException( + "Failed to apply deferred updates due to RecordDeletedError"); + case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + throw LockTimeoutException( + "Failed to apply deferred update due to LockTimeoutException"); + case distributed::RemoteUpdateResult::DONE: + break; + } + } + + // If the command advanced, let the workers know. 
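+    // On the workers this triggers TransactionCommandAdvancedRpc, which syncs
+    // the cached command id with the master (tx::Engine::UpdateCommand) and
+    // clears the per-transaction remote data caches, so records cached under
+    // the old command are reconstructed instead of reused.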
+ if (self_.advance_command()) { + auto futures = + db.remote_pull_clients().NotifyAllTransactionCommandAdvanced(tx_id); + for (auto &future : futures) future.wait(); + } + } +}; +} + std::unique_ptr Synchronize::MakeCursor( - database::GraphDbAccessor &) const { - // TODO: Implement a concrete cursor. - return nullptr; + database::GraphDbAccessor &db) const { + return std::make_unique(*this, db); } } // namespace query::plan @@ -2936,6 +3059,5 @@ BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Unwind); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Distinct); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::CreateIndex); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Union); -BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::ProduceRemote); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::PullRemote); BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Synchronize); diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index 7f15b92f0..f938d818c 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -97,7 +97,6 @@ class Unwind; class Distinct; class CreateIndex; class Union; -class ProduceRemote; class PullRemote; class Synchronize; @@ -108,8 +107,7 @@ using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor< SetProperties, SetLabels, RemoveProperty, RemoveLabels, ExpandUniquenessFilter, ExpandUniquenessFilter, Accumulate, Aggregate, Skip, Limit, - OrderBy, Merge, Optional, Unwind, Distinct, Union, ProduceRemote, - PullRemote, Synchronize>; + OrderBy, Merge, Optional, Unwind, Distinct, Union, PullRemote, Synchronize>; using LogicalOperatorLeafVisitor = ::utils::LeafVisitor; @@ -2271,29 +2269,14 @@ class Union : public LogicalOperator { } }; -class ProduceRemote : public LogicalOperator { - public: - ProduceRemote(const std::shared_ptr &input, - const std::vector &symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - - private: - std::shared_ptr input_; - std::vector symbols_; - - ProduceRemote() {} - - friend class boost::serialization::access; - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &symbols_; - } -}; - +/** + * An operator in distributed Memgraph that yields both local and remote (from + * other workers) frames. Obtaining remote frames is done through RPC calls to + * `distributed::RemoteProduceRpcServer`s running on all the workers. + * + * This operator aims to yield results as fast as possible and lose minimal + * time on data transfer. It gives no guarantees on result order. + */ class PullRemote : public LogicalOperator { public: PullRemote(const std::shared_ptr &input, int64_t plan_id, @@ -2346,12 +2329,30 @@ class PullRemote : public LogicalOperator { } }; -/** Operator used to synchronize the execution of master and workers. */ +/** + * Operator used to synchronize stages of plan execution between the master and + * all the workers. Synchronization is necessary in queries that update the + * graph state because updates (as well as creations and deletions) are deferred + * to avoid multithreaded modification of graph element data (as it's not + * thread-safe). + * + * Logic of the synchronize operator is: + * + * 1. If there is a RemotePull, tell all the workers to pull on that plan and + * accumulate results without sending them to the master. This is async. + * 2. Accumulate local results, in parallel with 1. getting executed on workers. + * 3.
Wait till the master and all the workers are done accumulating. * 4. Advance the command, if necessary. * 5. Tell all the workers to apply their updates. This is async. * 6. Apply local updates, in parallel with 5. on the workers. * 7. Notify workers that the command has advanced, if necessary. * 8. Yield all the results, first local, then from RemotePull if available. */ class Synchronize : public LogicalOperator { public: Synchronize(const std::shared_ptr &input, const std::shared_ptr &pull_remote, - bool advance_command = false) + bool advance_command) : input_(input), pull_remote_(pull_remote), advance_command_(advance_command) {} @@ -2413,6 +2414,5 @@ BOOST_CLASS_EXPORT_KEY(query::plan::Unwind); BOOST_CLASS_EXPORT_KEY(query::plan::Distinct); BOOST_CLASS_EXPORT_KEY(query::plan::CreateIndex); BOOST_CLASS_EXPORT_KEY(query::plan::Union); -BOOST_CLASS_EXPORT_KEY(query::plan::ProduceRemote); BOOST_CLASS_EXPORT_KEY(query::plan::PullRemote); BOOST_CLASS_EXPORT_KEY(query::plan::Synchronize); diff --git a/src/storage/locking/record_lock.cpp b/src/storage/locking/record_lock.cpp index f8994f698..dee8430b2 100644 --- a/src/storage/locking/record_lock.cpp +++ b/src/storage/locking/record_lock.cpp @@ -55,6 +55,19 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { tx::transaction_id_t owner = owner_; if (owner_ == tx.id_) return LockStatus::AlreadyHeld; + // In a distributed worker the transaction objects (and the locks they own) + // are not destructed at the same time as on the master. Consequently a lock + // might be active for a dead transaction. By asking the transaction engine + // for transaction info, we'll make the worker refresh its knowledge about + // live transactions and release obsolete locks. + auto info = engine.Info(owner); + if (!info.is_active()) { + if (lock_.try_lock()) { + owner_ = tx.id_; + return LockStatus::Acquired; + } + } + // Insert edge into local lock_graph. auto accessor = engine.local_lock_graph().access(); auto it = accessor.insert(tx.id_, owner).first; @@ -63,8 +76,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { // Find oldest transaction in lock cycle if cycle exists and notify that // transaction that it should abort. // TODO: maybe we can be smarter and abort some other transaction and not - // the - // oldest one. + // the oldest one. auto oldest = FindOldestTxInLockCycle(tx.id_, accessor); if (oldest) { engine.LocalForEachActiveTransaction([&](tx::Transaction &t) { diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index fd955a4d8..80a26eef9 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -28,6 +28,9 @@ class Engine { /// Advances the command on the transaction with the given id. virtual command_id_t Advance(transaction_id_t id) = 0; + /// Updates the command on the workers to the master's value. + virtual command_id_t UpdateCommand(transaction_id_t id) = 0; + /// Comits the given transaction. Deletes the transaction object, it's not /// valid after this function executes.
virtual void Commit(const Transaction &t) = 0; diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 59b74b76e..d15e87849 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -13,7 +13,7 @@ MasterEngine::MasterEngine(communication::rpc::System &system, durability::WriteAheadLog *wal) : SingleNodeEngine(wal), rpc_server_(system, kTransactionEngineRpc) { rpc_server_.Register([this](const BeginReq &) { - auto tx = Begin(); + auto tx = Begin(); return std::make_unique(TxAndSnapshot{tx->id_, tx->snapshot()}); }); @@ -22,12 +22,12 @@ MasterEngine::MasterEngine(communication::rpc::System &system, }); rpc_server_.Register([this](const CommitReq &req) { - Commit(*RunningTransaction(req.member)); + Commit(*RunningTransaction(req.member)); return std::make_unique(); }); rpc_server_.Register([this](const AbortReq &req) { - Abort(*RunningTransaction(req.member)); + Abort(*RunningTransaction(req.member)); return std::make_unique(); }); @@ -38,6 +38,12 @@ MasterEngine::MasterEngine(communication::rpc::System &system, RunningTransaction(req.member)->snapshot()); }); + rpc_server_.Register([this](const CommandReq &req) { + // It is guaranteed that the Worker will not be requesting this for a + // transaction that's done, and that there are no race conditions here. + return std::make_unique(RunningTransaction(req.member)->cid()); + }); + rpc_server_.Register( [this](const communication::rpc::Message &) { return std::make_unique(GlobalGcSnapshot()); diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index 431fa7278..b5841018f 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -23,8 +23,7 @@ struct TxAndSnapshot { } }; RPC_SINGLE_MEMBER_MESSAGE(BeginRes, TxAndSnapshot); -using BeginRpc = - communication::rpc::RequestResponse; +using BeginRpc = communication::rpc::RequestResponse; RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, transaction_id_t); RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, command_id_t); @@ -43,6 +42,10 @@ RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot) using SnapshotRpc = communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(CommandReq, transaction_id_t) +RPC_SINGLE_MEMBER_MESSAGE(CommandRes, command_id_t) +using CommandRpc = communication::rpc::RequestResponse; + RPC_NO_MEMBER_MESSAGE(GcSnapshotReq) using GcSnapshotRpc = communication::rpc::RequestResponse; diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index 459a50a16..da8ec69f6 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -41,6 +41,14 @@ command_id_t SingleNodeEngine::Advance(transaction_id_t id) { return ++(t->cid_); } +command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) { + std::lock_guard guard(lock_); + auto it = store_.find(id); + DCHECK(it != store_.end()) + << "UpdateCommand on non-existing transaction"; + return it->second->cid_; +} + void SingleNodeEngine::Commit(const Transaction &t) { std::lock_guard guard(lock_); clog_.set_committed(t.id_); diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 63b0999e7..4d9b34580 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -31,6 +31,7 @@ class SingleNodeEngine : public Engine { Transaction *Begin() override; command_id_t Advance(transaction_id_t id) override; + command_id_t
UpdateCommand(transaction_id_t id) override; void Commit(const Transaction &t) override; void Abort(const Transaction &t) override; CommitLog::Info Info(transaction_id_t tx) const override; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 9e470e1a1..c7f8989a8 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -34,16 +34,32 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { return res->member; } +command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { + command_id_t cmd_id = rpc_client_pool_.Call(tx_id)->member; + + // Assume there is no concurrent work being done on this worker in the given + // transaction. This assumption is sound because command advancing needs to be + // done in a synchronized fashion, while no workers are executing in that + // transaction. That assumption lets us freely modify the command id in the + // cached transaction object, and ensures there are no race conditions on + // caching a transaction object if it wasn't cached already. + + auto access = active_.access(); + auto found = access.find(tx_id); + if (found != access.end()) { + found->second->cid_ = cmd_id; + } + return cmd_id; +} + void WorkerEngine::Commit(const Transaction &t) { auto res = rpc_client_pool_.Call(t.id_); - auto removal = active_.access().remove(t.id_); - CHECK(removal) << "Can't commit a transaction not in local cache"; + ClearCache(t.id_); } void WorkerEngine::Abort(const Transaction &t) { auto res = rpc_client_pool_.Call(t.id_); - auto removal = active_.access().remove(t.id_); - CHECK(removal) << "Can't abort a transaction not in local cache"; + ClearCache(t.id_); } CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { @@ -54,12 +70,11 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { // @review: this version of Call is just used because Info has no // default constructor. info = rpc_client_pool_.Call(tid)->member; - DCHECK(info.is_committed() || info.is_aborted()) - << "It is expected that the transaction is not running anymore. 
This " "function should be used only after the snapshot of the current " "transaction is checked (in MVCC)."; - if (info.is_committed()) clog_.set_committed(tid); - if (info.is_aborted()) clog_.set_aborted(tid); + if (!info.is_active()) { + if (info.is_committed()) clog_.set_committed(tid); + if (info.is_aborted()) clog_.set_aborted(tid); + ClearCache(tid); + } } return info; @@ -91,10 +106,19 @@ tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) { Snapshot snapshot( std::move(rpc_client_pool_.Call(tx_id)->member)); - auto insertion = - accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this)); + auto new_tx = new Transaction(tx_id, snapshot, *this); + auto insertion = accessor.insert(tx_id, new_tx); + if (!insertion.second) delete new_tx; utils::EnsureAtomicGe(local_last_, tx_id); return insertion.first->second; } +void WorkerEngine::ClearCache(transaction_id_t tx_id) const { + auto access = active_.access(); + auto found = access.find(tx_id); + if (found != access.end()) { + delete found->second; + access.remove(tx_id); + } +} } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 8a406b2e9..c931e9424 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -22,6 +22,7 @@ class WorkerEngine : public Engine { Transaction *Begin() override; command_id_t Advance(transaction_id_t id) override; + command_id_t UpdateCommand(transaction_id_t id) override; void Commit(const Transaction &t) override; void Abort(const Transaction &t) override; CommitLog::Info Info(transaction_id_t tid) const override; @@ -35,13 +36,16 @@ class WorkerEngine : public Engine { private: // Local caches. - ConcurrentMap active_; + mutable ConcurrentMap active_; std::atomic local_last_{0}; // Mutable because just getting info can cause a cache fill. mutable CommitLog clog_; // Communication to the transactional master. mutable communication::rpc::ClientPool rpc_client_pool_; -}; + // Removes (destructs) a Transaction that's expired. If there is no cached + // transaction for the given id, nothing is done. + void ClearCache(transaction_id_t tx_id) const; +}; } // namespace tx diff --git a/tests/manual/query_planner.cpp b/tests/manual/query_planner.cpp index 1c5812ff8..79b364182 100644 --- a/tests/manual/query_planner.cpp +++ b/tests/manual/query_planner.cpp @@ -502,8 +502,6 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor { return true; } - PRE_VISIT(ProduceRemote); bool PreVisit(query::plan::PullRemote &op) override { WithPrintLn([&op](auto &out) { out << "* PullRemote {"; diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 1080de58f..969dd4de9 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -3,6 +3,7 @@ #include #include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" #include "transactions/engine_master.hpp" class DistributedGraphDbTest : public ::testing::Test { @@ -63,6 +64,44 @@ class DistributedGraphDbTest : public ::testing::Test { return workers_[worker_id - 1]->worker_; } + /// Inserts a vertex and returns its global address. Does it in a new + /// transaction. + auto InsertVertex(database::GraphDb &db) { + database::GraphDbAccessor dba{db}; + auto r_val = dba.InsertVertex().GlobalAddress(); + dba.Commit(); + return r_val; + } + + /// Inserts an edge (on the 'from' side) and returns its global address.
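+  /// Typical usage from the distributed tests (e.g. the Synchronize test):
+  ///
+  ///   auto from = InsertVertex(worker(1));
+  ///   auto to = InsertVertex(worker(2));
+  ///   auto edge = InsertEdge(from, to, "et");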
+ auto InsertEdge(Edges::VertexAddress from, Edges::VertexAddress to, + const std::string &edge_type_name) { + database::GraphDbAccessor dba{worker(from.worker_id())}; + auto from_v = dba.FindVertexChecked(from.gid(), false); + auto edge_type = dba.EdgeType(edge_type_name); + + // If 'to' is on the same worker as 'from', create and send local. + if (to.worker_id() == from.worker_id()) { + auto to_v = dba.FindVertexChecked(to.gid(), false); + auto r_val = dba.InsertEdge(from_v, to_v, edge_type).GlobalAddress(); + dba.Commit(); + return r_val; + } + + // 'to' is not on the same worker as 'from' + auto edge_ga = dba.InsertOnlyEdge(from, to, edge_type, + dba.db().storage().EdgeGenerator().Next()) + .GlobalAddress(); + from_v.update().out_.emplace(to, edge_ga, edge_type); + database::GraphDbAccessor dba_to{worker(to.worker_id()), + dba.transaction_id()}; + auto to_v = dba_to.FindVertexChecked(to.gid(), false); + to_v.update().in_.emplace(from, edge_ga, edge_type); + + dba.Commit(); + return edge_ga; + } + private: std::unique_ptr master_; std::vector> workers_; diff --git a/tests/unit/distributed_data_exchange.cpp b/tests/unit/distributed_data_exchange.cpp index dd8689adb..ee979088e 100644 --- a/tests/unit/distributed_data_exchange.cpp +++ b/tests/unit/distributed_data_exchange.cpp @@ -65,36 +65,9 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) { TEST_F(DistributedGraphDbTest, RemoteExpansion) { // Model (v1)-->(v2), where each vertex is on one worker. - std::vector v_ga; - { - GraphDbAccessor dba{master()}; - v_ga.emplace_back(GraphDbAccessor(worker(1), dba.transaction_id()) - .InsertVertex() - .GlobalAddress()); - v_ga.emplace_back(GraphDbAccessor(worker(2), dba.transaction_id()) - .InsertVertex() - .GlobalAddress()); - dba.Commit(); - } - { - GraphDbAccessor dba{master()}; - auto edge_type = dba.EdgeType("et"); - auto prop = dba.Property("prop"); - GraphDbAccessor dba1{worker(1), dba.transaction_id()}; - auto edge_ga = - dba1.InsertOnlyEdge(v_ga[0], v_ga[1], edge_type, - dba1.db().storage().EdgeGenerator().Next()) - .GlobalAddress(); - for (int i : {0, 1}) { - GraphDbAccessor dba_w{worker(i + 1), dba.transaction_id()}; - auto v = dba_w.FindVertexChecked(v_ga[i].gid(), false); - // Update the vertex to create a new record. - v.PropsSet(prop, 42); - auto &edges = i == 0 ? v.GetNew()->out_ : v.GetNew()->in_; - edges.emplace(v_ga[(i + 1) % 2], edge_ga, edge_type); - } - dba.Commit(); - } + auto from = InsertVertex(worker(1)); + auto to = InsertVertex(worker(2)); + InsertEdge(from, to, "et"); { // Expand on the master for three hops. Collect vertex gids. GraphDbAccessor dba{master()}; @@ -107,11 +80,11 @@ TEST_F(DistributedGraphDbTest, RemoteExpansion) { }; // Do a few hops back and forth, all on the master. - VertexAccessor v{v_ga[0], dba}; + VertexAccessor v{from, dba}; for (int i = 0; i < 5; ++i) { v = expand(v); EXPECT_FALSE(v.address().is_local()); - EXPECT_EQ(v.address(), v_ga[(i + 1) % 2]); + EXPECT_EQ(v.address(), i % 2 ? 
from : to); } } } diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index f93321595..325bb7445 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -149,7 +149,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba, int worker_id) { return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, - params, symbols, 3); + params, symbols, false, 3); }; auto expect_first_batch = [](auto &batch) { EXPECT_EQ(batch.pull_state, @@ -271,7 +271,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba, int worker_id) { return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, - params, symbols, 3); + params, symbols, false, 3); }; auto future_w1_results = remote_pull(dba, 1); auto future_w2_results = remote_pull(dba, 2); @@ -339,3 +339,57 @@ TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) { VertexAccessor v_in_w2{v_ga, dba_w2}; EXPECT_EQ(v_in_w2.PropsAt(prop).Value(), 42); } + +TEST_F(DistributedGraphDbTest, Synchronize) { + auto from = InsertVertex(worker(1)); + auto to = InsertVertex(worker(2)); + InsertEdge(from, to, "et"); + + // Query: MATCH (n)--(m) SET m.prop = 42 RETURN n.prop + // This query ensures that a remote update gets applied and the local stuff + // gets reconstructed. + auto &db = master(); + GraphDbAccessor dba{db}; + Context ctx{dba}; + SymbolGenerator symbol_generator{ctx.symbol_table_}; + AstTreeStorage storage; + // MATCH + auto n = MakeScanAll(storage, ctx.symbol_table_, "n"); + auto r_m = + MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r", + EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD); + + // SET + auto literal = LITERAL(42); + auto prop = PROPERTY_PAIR("prop"); + auto m_p = PROPERTY_LOOKUP("m", prop); + ctx.symbol_table_[*m_p->expression_] = r_m.node_sym_; + auto set_m_p = std::make_shared(r_m.op_, m_p, literal); + + const int plan_id = 42; + master().plan_dispatcher().DispatchPlan(plan_id, set_m_p, ctx.symbol_table_); + + // Master-side PullRemote, Synchronize + auto pull_remote = std::make_shared( + nullptr, plan_id, std::vector{n.sym_}); + auto synchronize = + std::make_shared(set_m_p, pull_remote, true); + + // RETURN + auto n_p = + storage.Create(storage.Create("n"), prop); + ctx.symbol_table_[*n_p->expression_] = n.sym_; + auto return_n_p = NEXPR("n.prop", n_p); + auto return_n_p_sym = ctx.symbol_table_.CreateSymbol("n.p", true); + ctx.symbol_table_[*return_n_p] = return_n_p_sym; + auto produce = MakeProduce(synchronize, return_n_p); + + auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba); + ASSERT_EQ(results.size(), 2); + ASSERT_EQ(results[0].size(), 1); + EXPECT_EQ(results[0][0].ValueInt(), 42); + ASSERT_EQ(results[1].size(), 1); + EXPECT_EQ(results[1][0].ValueInt(), 42); + + // TODO test without advance command? +} diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index 84d33f3c4..b11b2d308 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -1,8 +1,3 @@ -// -// Copyright 2017 Memgraph -// Created by Florijan Stamenkovic on 14.03.17.
-// - #include #include #include diff --git a/tests/unit/query_planner.cpp b/tests/unit/query_planner.cpp index 3a6bc386c..5c59ad918 100644 --- a/tests/unit/query_planner.cpp +++ b/tests/unit/query_planner.cpp @@ -113,7 +113,6 @@ class PlanChecker : public HierarchicalLogicalOperatorVisitor { return true; } - PRE_VISIT(ProduceRemote); PRE_VISIT(PullRemote); bool PreVisit(Synchronize &op) override { @@ -1685,9 +1684,8 @@ TYPED_TEST(TestPlanner, WhereIndexedLabelPropertyRange) { AstTreeStorage storage; auto lit_42 = LITERAL(42); auto n_prop = PROPERTY_LOOKUP("n", property); - auto check_planned_range = [&label, &property, &db](const auto &rel_expr, - auto lower_bound, - auto upper_bound) { + auto check_planned_range = [&label, &property, &db]( + const auto &rel_expr, auto lower_bound, auto upper_bound) { // Shadow the first storage, so that the query is created in this one. AstTreeStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))), WHERE(rel_expr), @@ -2020,5 +2018,4 @@ TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) { MakeCheckers(ExpectScanAll(), ExpectCreateNode(), acc)}; CheckDistributedPlan(planner.plan(), symbol_table, expected); } - } // namespace
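A minimal sketch of how the new pieces fit together on the master, mirroring the Synchronize unit test above; `plan_id`, `n_sym`, `update_subtree` and `return_expr` are placeholders, and `MakeProduce` is the test helper used in that test:

    // The update subtree is dispatched to the workers under `plan_id` so they
    // can pull-and-accumulate it; the same subtree is Synchronize's input on
    // the master.
    auto pull_remote = std::make_shared<query::plan::PullRemote>(
        nullptr, plan_id, std::vector<query::Symbol>{n_sym});
    auto synchronize = std::make_shared<query::plan::Synchronize>(
        update_subtree, pull_remote, /* advance_command = */ true);
    auto produce = MakeProduce(synchronize, return_expr);

    // Pulling from `produce` drives SynchronizeCursor::InitialPull: every
    // worker gets RemotePull(..., accumulate = true, batch_size = 0), local
    // results are accumulated, deferred updates are applied everywhere, the
    // command is advanced and the workers are notified before any frame is
    // yielded.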