Add SHOW RAFT INFO query to HA

Reviewers: msantl, mtomic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1946
This commit is contained in:
Matej Ferencevic 2019-04-08 13:00:28 +02:00
parent 47a1e302a0
commit 63aeed0624
10 changed files with 50 additions and 11 deletions

View File

@ -2551,7 +2551,7 @@ cpp<#
((info-type "InfoType" :scope :public))
(:public
(lcp:define-enum info-type
(storage index constraint)
(storage index constraint raft)
(:serialize))
#>cpp

View File

@ -61,6 +61,9 @@ antlrcpp::Any CypherMainVisitor::visitInfoQuery(
} else if (ctx->constraintInfo()) {
info_query->info_type_ = InfoQuery::InfoType::CONSTRAINT;
return info_query;
} else if (ctx->raftInfo()) {
info_query->info_type_ = InfoQuery::InfoType::RAFT;
return info_query;
} else {
throw utils::NotYetImplemented("Info query: '{}'", ctx->getText());
}

View File

@ -45,7 +45,9 @@ indexInfo : INDEX INFO ;
constraintInfo : CONSTRAINT INFO ;
infoQuery : SHOW ( storageInfo | indexInfo | constraintInfo ) ;
raftInfo : RAFT INFO ;
infoQuery : SHOW ( storageInfo | indexInfo | constraintInfo | raftInfo ) ;
explainQuery : EXPLAIN cypherQuery ;

View File

@ -113,6 +113,7 @@ OPTIONAL : O P T I O N A L ;
OR : O R ;
ORDER : O R D E R ;
PROFILE : P R O F I L E ;
RAFT : R A F T ;
REDUCE : R E D U C E ;
REMOVE : R E M O V E ;
RETURN : R E T U R N ;

View File

@ -45,6 +45,11 @@ class PrivilegeExtractor : public QueryVisitor<void>,
// for *or* with privileges.
AddPrivilege(AuthQuery::Privilege::CONSTRAINT);
break;
case InfoQuery::InfoType::RAFT:
// This query should always be available to everyone. It is essential
// for correct operation of the HA cluster. Because of that we don't
// add any privileges here.
break;
}
}

View File

@ -631,6 +631,19 @@ Callback HandleInfoQuery(InfoQuery *info_query,
case InfoQuery::InfoType::CONSTRAINT:
throw utils::NotYetImplemented("constraint info");
break;
case InfoQuery::InfoType::RAFT:
#if defined(MG_SINGLE_NODE_HA)
callback.header = {"info", "value"};
callback.fn = [db_accessor] {
std::vector<std::vector<TypedValue>> results(
{{"is_leader", db_accessor->raft()->IsLeader()},
{"term_id", static_cast<int64_t>(db_accessor->raft()->TermId())}});
return results;
};
#else
throw utils::NotYetImplemented("raft info");
#endif
break;
}
return callback;
}
@ -672,13 +685,6 @@ Interpreter::Results Interpreter::operator()(
const std::string &query_string, database::GraphDbAccessor &db_accessor,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction) {
#ifdef MG_SINGLE_NODE_HA
if (!db_accessor.raft()->IsLeader()) {
throw QueryException(
"Memgraph High Availability: Can't execute queries if not leader.");
}
#endif
AstStorage ast_storage;
Parameters parameters;
std::map<std::string, TypedValue> summary;
@ -704,6 +710,18 @@ Interpreter::Results Interpreter::operator()(
// we must ensure it lives during the whole interpretation.
std::shared_ptr<CachedPlan> plan{nullptr};
#ifdef MG_SINGLE_NODE_HA
{
InfoQuery *info_query = nullptr;
if (!db_accessor.raft()->IsLeader() &&
(!(info_query = utils::Downcast<InfoQuery>(parsed_query.query)) ||
info_query->info_type_ != InfoQuery::InfoType::RAFT)) {
throw QueryException(
"Memgraph High Availability: Can't execute queries if not leader.");
}
}
#endif
if (auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query)) {
plan = CypherQueryToPlan(stripped_query.hash(), cypher_query,
std::move(ast_storage), parameters, &db_accessor);

View File

@ -21,6 +21,9 @@ class RaftInterface {
/// Returns true if the current servers mode is LEADER. False otherwise.
virtual bool IsLeader() = 0;
/// Returns the term ID of the current leader.
virtual uint64_t TermId() = 0;
protected:
~RaftInterface() {}
};

View File

@ -454,6 +454,8 @@ void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; }
uint64_t RaftServer::TermId() { return current_term_; }
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
: raft_server_(raft_server) {
CHECK(raft_server_) << "RaftServer can't be nullptr";
@ -874,7 +876,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
// TODO(ipaljak): Consider backoff.
wait_until = TimePoint::max();
auto request_term = current_term_;
auto request_term = current_term_.load();
auto peer_future = coordination_->ExecuteOnWorker<RequestVoteRes>(
peer_id, [&](int worker_id, auto &client) {
auto last_entry_data = LastEntryData();

View File

@ -113,6 +113,9 @@ class RaftServer final : public RaftInterface {
/// Returns true if the current servers mode is LEADER. False otherwise.
bool IsLeader() override;
/// Returns the term ID of the current leader.
uint64_t TermId() override;
void GarbageCollectReplicationLog(const tx::TransactionId &tx_id);
private:
@ -253,7 +256,7 @@ class RaftServer final : public RaftInterface {
std::experimental::optional<uint16_t> voted_for_;
uint64_t current_term_;
std::atomic<uint64_t> current_term_;
uint64_t log_size_;
/// Recovers persistent data from disk and stores its in-memory copies

View File

@ -20,6 +20,8 @@ class RaftMock final : public raft::RaftInterface {
bool IsLeader() override { return true; }
uint64_t TermId() override { return 1; }
std::vector<database::StateDelta> GetLogForTx(
const tx::TransactionId &tx_id) {
return log_[tx_id];