diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index 217c43c07..13491b9e5 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2551,7 +2551,7 @@ cpp<# ((info-type "InfoType" :scope :public)) (:public (lcp:define-enum info-type - (storage index constraint) + (storage index constraint raft) (:serialize)) #>cpp diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 04ad1ebbc..b9611a09f 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -61,6 +61,9 @@ antlrcpp::Any CypherMainVisitor::visitInfoQuery( } else if (ctx->constraintInfo()) { info_query->info_type_ = InfoQuery::InfoType::CONSTRAINT; return info_query; + } else if (ctx->raftInfo()) { + info_query->info_type_ = InfoQuery::InfoType::RAFT; + return info_query; } else { throw utils::NotYetImplemented("Info query: '{}'", ctx->getText()); } diff --git a/src/query/frontend/opencypher/grammar/Cypher.g4 b/src/query/frontend/opencypher/grammar/Cypher.g4 index 626c6bb21..51a460257 100644 --- a/src/query/frontend/opencypher/grammar/Cypher.g4 +++ b/src/query/frontend/opencypher/grammar/Cypher.g4 @@ -45,7 +45,9 @@ indexInfo : INDEX INFO ; constraintInfo : CONSTRAINT INFO ; -infoQuery : SHOW ( storageInfo | indexInfo | constraintInfo ) ; +raftInfo : RAFT INFO ; + +infoQuery : SHOW ( storageInfo | indexInfo | constraintInfo | raftInfo ) ; explainQuery : EXPLAIN cypherQuery ; diff --git a/src/query/frontend/opencypher/grammar/CypherLexer.g4 b/src/query/frontend/opencypher/grammar/CypherLexer.g4 index e93321630..38ad6736b 100644 --- a/src/query/frontend/opencypher/grammar/CypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/CypherLexer.g4 @@ -113,6 +113,7 @@ OPTIONAL : O P T I O N A L ; OR : O R ; ORDER : O R D E R ; PROFILE : P R O F I L E ; +RAFT : R A F T ; REDUCE : R E D U C E ; REMOVE : R E M O V E ; RETURN : R E T U R N ; diff --git a/src/query/frontend/semantic/required_privileges.cpp b/src/query/frontend/semantic/required_privileges.cpp index d24bb1611..524ef6d9b 100644 --- a/src/query/frontend/semantic/required_privileges.cpp +++ b/src/query/frontend/semantic/required_privileges.cpp @@ -45,6 +45,11 @@ class PrivilegeExtractor : public QueryVisitor, // for *or* with privileges. AddPrivilege(AuthQuery::Privilege::CONSTRAINT); break; + case InfoQuery::InfoType::RAFT: + // This query should always be available to everyone. It is essential + // for correct operation of the HA cluster. Because of that we don't + // add any privileges here. + break; } } diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 001f51234..fb59e0971 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -631,6 +631,19 @@ Callback HandleInfoQuery(InfoQuery *info_query, case InfoQuery::InfoType::CONSTRAINT: throw utils::NotYetImplemented("constraint info"); break; + case InfoQuery::InfoType::RAFT: +#if defined(MG_SINGLE_NODE_HA) + callback.header = {"info", "value"}; + callback.fn = [db_accessor] { + std::vector> results( + {{"is_leader", db_accessor->raft()->IsLeader()}, + {"term_id", static_cast(db_accessor->raft()->TermId())}}); + return results; + }; +#else + throw utils::NotYetImplemented("raft info"); +#endif + break; } return callback; } @@ -672,13 +685,6 @@ Interpreter::Results Interpreter::operator()( const std::string &query_string, database::GraphDbAccessor &db_accessor, const std::map ¶ms, bool in_explicit_transaction) { -#ifdef MG_SINGLE_NODE_HA - if (!db_accessor.raft()->IsLeader()) { - throw QueryException( - "Memgraph High Availability: Can't execute queries if not leader."); - } -#endif - AstStorage ast_storage; Parameters parameters; std::map summary; @@ -704,6 +710,18 @@ Interpreter::Results Interpreter::operator()( // we must ensure it lives during the whole interpretation. std::shared_ptr plan{nullptr}; +#ifdef MG_SINGLE_NODE_HA + { + InfoQuery *info_query = nullptr; + if (!db_accessor.raft()->IsLeader() && + (!(info_query = utils::Downcast(parsed_query.query)) || + info_query->info_type_ != InfoQuery::InfoType::RAFT)) { + throw QueryException( + "Memgraph High Availability: Can't execute queries if not leader."); + } + } +#endif + if (auto *cypher_query = utils::Downcast(parsed_query.query)) { plan = CypherQueryToPlan(stripped_query.hash(), cypher_query, std::move(ast_storage), parameters, &db_accessor); diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp index a521e38b9..1522536fa 100644 --- a/src/raft/raft_interface.hpp +++ b/src/raft/raft_interface.hpp @@ -21,6 +21,9 @@ class RaftInterface { /// Returns true if the current servers mode is LEADER. False otherwise. virtual bool IsLeader() = 0; + /// Returns the term ID of the current leader. + virtual uint64_t TermId() = 0; + protected: ~RaftInterface() {} }; diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 22330d1ae..71d50426b 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -454,6 +454,8 @@ void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) { bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; } +uint64_t RaftServer::TermId() { return current_term_; } + RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) : raft_server_(raft_server) { CHECK(raft_server_) << "RaftServer can't be nullptr"; @@ -874,7 +876,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { // TODO(ipaljak): Consider backoff. wait_until = TimePoint::max(); - auto request_term = current_term_; + auto request_term = current_term_.load(); auto peer_future = coordination_->ExecuteOnWorker( peer_id, [&](int worker_id, auto &client) { auto last_entry_data = LastEntryData(); diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index be9e77198..5a5450ce0 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -113,6 +113,9 @@ class RaftServer final : public RaftInterface { /// Returns true if the current servers mode is LEADER. False otherwise. bool IsLeader() override; + /// Returns the term ID of the current leader. + uint64_t TermId() override; + void GarbageCollectReplicationLog(const tx::TransactionId &tx_id); private: @@ -253,7 +256,7 @@ class RaftServer final : public RaftInterface { std::experimental::optional voted_for_; - uint64_t current_term_; + std::atomic current_term_; uint64_t log_size_; /// Recovers persistent data from disk and stores its in-memory copies diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp index 81abdc0d0..c19a52fae 100644 --- a/tests/unit/transaction_engine_single_node_ha.cpp +++ b/tests/unit/transaction_engine_single_node_ha.cpp @@ -20,6 +20,8 @@ class RaftMock final : public raft::RaftInterface { bool IsLeader() override { return true; } + uint64_t TermId() override { return 1; } + std::vector GetLogForTx( const tx::TransactionId &tx_id) { return log_[tx_id];