add register instance and set instance to main queries and logic

This commit is contained in:
antoniofilipovic 2024-01-23 16:20:58 +01:00 committed by Andi Skrgat
parent dc9a2c45c4
commit 0a28fee34b
23 changed files with 546 additions and 95 deletions

View File

@ -58,6 +58,8 @@ void CoordinatorClient::StartFrequentCheck() {
});
}
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); }
@ -86,5 +88,20 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(
return false;
}
auto CoordinatorClient::SendSetToReplicaRpc(CoordinatorClient::ReplClientInfo replication_client_info) const -> bool {
try {
auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(std::move(replication_client_info))};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to set main to replica!");
return false;
}
spdlog::info("Sent request RPC from coordinator to instance to set it as replica!");
return true;
} catch (const rpc::RpcFailedException &) {
spdlog::error("Failed to send failover RPC from coordinator to new main!");
}
return false;
}
} // namespace memgraph::coordination
#endif

View File

@ -36,6 +36,22 @@ void PromoteReplicaToMainRes::Load(PromoteReplicaToMainRes *self, memgraph::slk:
memgraph::slk::Load(self, reader);
}
void SetMainToReplicaReq::Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void SetMainToReplicaReq::Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void SetMainToReplicaRes::Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void SetMainToReplicaRes::Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace coordination
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@ -44,6 +60,12 @@ constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::Ty
constexpr utils::TypeInfo coordination::PromoteReplicaToMainRes::kType{utils::TypeId::COORD_FAILOVER_RES,
"CoordPromoteReplicaToMainRes", nullptr};
constexpr utils::TypeInfo coordination::SetMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ,
"CoordSetReplMainReq", nullptr};
constexpr utils::TypeInfo coordination::SetMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordSetReplMainRes", nullptr};
namespace slk {
void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) {
@ -62,6 +84,22 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::
memgraph::slk::Load(&self->replication_clients_info, reader);
}
void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.replication_client_info, builder);
}
void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->replication_client_info, reader);
}
void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.success, builder);
}
void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->success, reader);
}
} // namespace slk
} // namespace memgraph

View File

@ -10,6 +10,9 @@
// licenses/APL.txt.
#include "coordination/coordinator_state.hpp"
#include <algorithm>
#include "coordination/coordinator_client_info.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#ifdef MG_ENTERPRISE
@ -38,6 +41,163 @@ CoordinatorState::CoordinatorState() {
}
}
auto CoordinatorState::RegisterInstanceOnCoordinator(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
const auto name_endpoint_status =
std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
},
[&config](const CoordinatorData &coordinator_data) {
if (memgraph::coordination::CheckName(
coordinator_data.registered_replicas_, config)) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
return RegisterInstanceCoordinatorStatus::SUCCESS;
}},
data_);
if (name_endpoint_status != RegisterInstanceCoordinatorStatus::SUCCESS) {
return name_endpoint_status;
}
auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & {
MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
auto &coord_data = std::get<CoordinatorData>(coord_state->data_);
std::shared_lock<utils::RWLock> lock{coord_data.coord_data_lock_};
auto replica_client_info = std::ranges::find_if(
coord_data.registered_replicas_info_,
[instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; });
if (replica_client_info != coord_data.registered_replicas_info_.end()) {
return *replica_client_info;
}
MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name,
"Instance is neither a replica nor main...");
return *coord_data.registered_main_info_;
};
// TODO MERGE WITH ANDI's WORK
auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
auto &client_info = find_client_info(coord_state, instance_name);
client_info.UpdateLastResponseTime();
};
auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
auto &client_info = find_client_info(coord_state, instance_name);
client_info.UpdateInstanceStatus();
};
CoordinatorClientConfig::ReplicationClientInfo replication_client_info = *config.replication_client_info;
auto *coord_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(
this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb));
coord_client->SendSetToReplicaRpc(replication_client_info);
std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
coord_client->SocketAddress());
coord_client->StartFrequentCheck();
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
// TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks?
// We should probably at that point search for main also in instances as for replicas...
auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & {
MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
MG_ASSERT(std::get<CoordinatorData>(coord_state->data_).registered_main_info_.has_value(),
"Main info is not set, but callback is called");
auto &coord_data = std::get<CoordinatorData>(coord_state->data_);
std::shared_lock<utils::RWLock> lock{coord_data.coord_data_lock_};
// TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
// this point....
auto &registered_main_info = coord_data.registered_main_info_;
MG_ASSERT(registered_main_info->InstanceName() == instance_name,
"Callback called for wrong instance name: {}, expected: {}", instance_name,
registered_main_info->InstanceName());
return *registered_main_info;
};
auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
auto &registered_main_info = get_client_info(coord_state, instance_name);
registered_main_info.UpdateLastResponseTime();
};
auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
auto &registered_main_info = get_client_info(coord_state, instance_name);
if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
spdlog::warn("Main is not alive, starting failover");
switch (auto failover_status = DoFailover(); failover_status) {
using enum DoFailoverStatus;
case ALL_REPLICAS_DOWN:
spdlog::warn("Failover aborted since all replicas are down!");
case MAIN_ALIVE:
spdlog::warn("Failover aborted since main is alive!");
case CLUSTER_UNINITIALIZED:
spdlog::warn("Failover aborted since cluster is uninitialized!");
case SUCCESS:
break;
}
}
};
auto &registered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
// Find replica we already registered
auto registered_replica =
std::find_if(registered_replicas.begin(), registered_replicas.end(), [instance_name](const auto &replica_client) {
std::cout << "replica name: " << replica_client.InstanceName() << ", instance name: " << instance_name
<< std::endl;
return replica_client.InstanceName() == instance_name;
});
std::for_each(registered_replicas.begin(), registered_replicas.end(),
[](const auto &client) { std::cout << "replica names: " << client.InstanceName() << std::endl; });
// if replica not found...
if (registered_replica == registered_replicas.end()) {
spdlog::error("You didn't register instance with given name {}", instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
registered_replica->StopFrequentCheck();
// Set instance as MAIN
// THIS WILL SHUT DOWN CLIENT
auto &registered_main = std::get<CoordinatorData>(data_).registered_main_;
registered_main =
std::make_unique<CoordinatorClient>(this, registered_replica->Config(), std::move(succ_cb), std::move(fail_cb));
std::get<CoordinatorData>(data_).registered_main_info_.emplace(registered_main->InstanceName(),
registered_main->SocketAddress());
std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
repl_clients_info.reserve(registered_replicas.size() - 1);
std::ranges::for_each(registered_replicas,
[registered_replica, &repl_clients_info](const CoordinatorClient &replica) {
if (replica != *registered_replica) {
repl_clients_info.emplace_back(replica.ReplicationClientInfo());
}
});
// PROMOTE REPLICA TO MAIN
// THIS SHOULD FAIL HERE IF IT IS DOWN
if (auto result = registered_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
registered_replica->StartFrequentCheck();
registered_main.reset();
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
registered_main->StartFrequentCheck();
registered_replicas.erase(registered_replica);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");

View File

@ -39,6 +39,7 @@ class CoordinatorClient {
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
void StartFrequentCheck();
void StopFrequentCheck();
void PauseFrequentCheck();
void ResumeFrequentCheck();
@ -50,6 +51,8 @@ class CoordinatorClient {
auto ReplicationClientInfo() const -> const std::optional<ReplClientInfo> &;
auto ResetReplicationClientInfo() -> void;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void;

View File

@ -48,6 +48,36 @@ struct PromoteReplicaToMainRes {
using PromoteReplicaToMainRpc = rpc::RequestResponse<PromoteReplicaToMainReq, PromoteReplicaToMainRes>;
struct SetMainToReplicaReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader);
static void Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder);
explicit SetMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info)
: replication_client_info(std::move(replication_client_info)) {}
SetMainToReplicaReq() = default;
CoordinatorClientConfig::ReplicationClientInfo replication_client_info;
};
struct SetMainToReplicaRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader);
static void Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder);
explicit SetMainToReplicaRes(bool success) : success(success) {}
SetMainToReplicaRes() = default;
bool success;
};
using SetMainToReplicaRpc = rpc::RequestResponse<SetMainToReplicaReq, SetMainToReplicaRes>;
} // namespace memgraph::coordination
// SLK serialization declarations
@ -61,6 +91,14 @@ void Save(const memgraph::coordination::PromoteReplicaToMainReq &self, memgraph:
void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk
#endif

View File

@ -38,6 +38,10 @@ class CoordinatorState {
[[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
[[nodiscard]] auto RegisterInstanceOnCoordinator(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowReplicas() const -> std::vector<CoordinatorInstanceStatus>;
auto ShowMain() const -> std::optional<CoordinatorInstanceStatus>;

View File

@ -25,5 +25,20 @@ enum class RegisterMainReplicaCoordinatorStatus : uint8_t {
SUCCESS
};
enum class RegisterInstanceCoordinatorStatus : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
COULD_NOT_BE_PERSISTED,
NOT_COORDINATOR,
SUCCESS
};
enum class SetInstanceToMainCoordinatorStatus : uint8_t {
NO_INSTANCE_WITH_NAME,
NOT_COORDINATOR,
SUCCESS,
COULD_NOT_PROMOTE_TO_MAIN,
};
} // namespace memgraph::coordination
#endif

View File

@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/register_main_replica_coordinator_status.hpp"
#ifdef MG_ENTERPRISE
#include "dbms/coordinator_handler.hpp"
@ -29,6 +30,16 @@ auto CoordinatorHandler::RegisterMainOnCoordinator(memgraph::coordination::Coord
return dbms_handler_.CoordinatorState().RegisterMain(std::move(config));
}
auto CoordinatorHandler::RegisterInstanceOnCoordinator(memgraph::coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus {
return dbms_handler_.CoordinatorState().RegisterInstanceOnCoordinator(std::move(config));
}
auto CoordinatorHandler::SetInstanceToMain(std::string instance_name)
-> coordination::SetInstanceToMainCoordinatorStatus {
return dbms_handler_.CoordinatorState().SetInstanceToMain(std::move(instance_name));
}
auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorInstanceStatus> {
return dbms_handler_.CoordinatorState().ShowReplicas();
}

View File

@ -38,6 +38,11 @@ class CoordinatorHandler {
auto RegisterMainOnCoordinator(coordination::CoordinatorClientConfig config)
-> coordination::RegisterMainReplicaCoordinatorStatus;
auto RegisterInstanceOnCoordinator(coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorInstanceStatus>;
auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorInstanceStatus>;

View File

@ -31,6 +31,37 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) {
spdlog::info("Received PromoteReplicaToMainRpc");
CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder);
});
server.Register<coordination::SetMainToReplicaRpc>(
[&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received PromoteReplicaToMainRpc from coordinator server");
CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder) {
auto &repl_state = dbms_handler.ReplicationState();
if (!repl_state.IsMain()) {
spdlog::error("Setting to replica must be performed on main.");
slk::Save(coordination::SetMainToReplicaRes{false}, res_builder);
return;
}
coordination::SetMainToReplicaReq req;
slk::Load(&req, req_reader);
replication::ReplicationServerConfig clients_config{.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
if (bool success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) {
spdlog::error("Setting main to replica failed!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
}
slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder);
}
void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,

View File

@ -26,6 +26,7 @@ class CoordinatorHandlers {
private:
static void PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -44,6 +44,46 @@ inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) {
return true;
};
inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler,
const memgraph::replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (dbms_handler.ReplicationState().IsReplica()) {
return false;
}
// TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are
// deleting the replica.
// Remove database specific clients
dbms_handler.ForEach([&](Database *db) {
auto *storage = db->storage();
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
});
// Remove instance level clients
std::get<replication::RoleMainData>(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear();
// Creates the server
dbms_handler.ReplicationState().SetReplicationRoleReplica(config);
// Start
const auto success =
std::visit(utils::Overloaded{[](replication::RoleMainData const &) {
// ASSERT
return false;
},
[&dbms_handler](replication::RoleReplicaData const &data) {
// Register handlers
InMemoryReplicationHandlers::Register(&dbms_handler, *data.server);
if (!data.server->Start()) {
spdlog::error("Unable to start the replication server.");
return false;
}
return true;
}},
dbms_handler.ReplicationState().ReplicationData());
// TODO Handle error (restore to main?)
return success;
}
inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
replication::ReplicationClient &instance_client) {
if (!allow_mt_repl && dbms_handler.All().size() > 1) {

View File

@ -3071,14 +3071,14 @@ class CoordinatorQuery : public memgraph::query::Query {
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class Action {
REGISTER_INSTANCE_ON_COORDINATOR,
REGISTER_MAIN_COORDINATOR_SERVER,
REGISTER_REPLICA_COORDINATOR_SERVER,
SET_INSTANCE_TO_MAIN,
SHOW_REPLICATION_CLUSTER,
DO_FAILOVER
};
enum class ReplicationRole { MAIN, REPLICA };
enum class SyncMode { SYNC, ASYNC };
CoordinatorQuery() = default;
@ -3086,18 +3086,17 @@ class CoordinatorQuery : public memgraph::query::Query {
DEFVISITABLE(QueryVisitor<void>);
memgraph::query::CoordinatorQuery::Action action_;
memgraph::query::CoordinatorQuery::ReplicationRole role_;
std::string instance_name_;
memgraph::query::Expression *socket_address_{nullptr};
memgraph::query::Expression *replication_socket_address_{nullptr};
memgraph::query::Expression *coordinator_socket_address_{nullptr};
memgraph::query::CoordinatorQuery::SyncMode sync_mode_;
CoordinatorQuery *Clone(AstStorage *storage) const override {
auto *object = storage->Create<CoordinatorQuery>();
object->action_ = action_;
object->role_ = role_;
object->instance_name_ = instance_name_;
object->socket_address_ = socket_address_ ? socket_address_->Clone(storage) : nullptr;
object->replication_socket_address_ =
replication_socket_address_ ? replication_socket_address_->Clone(storage) : nullptr;
object->sync_mode_ = sync_mode_;
object->coordinator_socket_address_ =
coordinator_socket_address_ ? coordinator_socket_address_->Clone(storage) : nullptr;

View File

@ -375,10 +375,28 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe
}
// License check is done in the interpreter.
antlrcpp::Any CypherMainVisitor::visitRegisterCoordinatorServer(MemgraphCypher::RegisterCoordinatorServerContext *ctx) {
MG_ASSERT(ctx->children.size() == 1, "RegisterCoordinatorServerQuery should have exactly one child!");
auto *coordinator_query = std::any_cast<CoordinatorQuery *>(ctx->children[0]->accept(this));
query_ = coordinator_query;
antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator(
MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
if (!ctx->replicationSocketAddress()->literal()->StringLiteral()) {
throw SemanticException("Replication socket address should be a string literal!");
}
if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) {
throw SemanticException("Coordinator socket address should be a string literal!");
}
coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR;
coordinator_query->replication_socket_address_ =
std::any_cast<Expression *>(ctx->replicationSocketAddress()->accept(this));
coordinator_query->coordinator_socket_address_ =
std::any_cast<Expression *>(ctx->coordinatorSocketAddress()->accept(this));
coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this));
if (ctx->ASYNC()) {
coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::ASYNC;
} else {
coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::SYNC;
}
return coordinator_query;
}
@ -389,48 +407,6 @@ antlrcpp::Any CypherMainVisitor::visitShowReplicationCluster(MemgraphCypher::Sho
return coordinator_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitRegisterReplicaCoordinatorServer(
MemgraphCypher::RegisterReplicaCoordinatorServerContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
if (!ctx->socketAddress()->literal()->StringLiteral()) {
throw SemanticException("Socket address should be a string literal!");
}
if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) {
throw SemanticException("Coordinator socket address should be a string literal!");
}
coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_REPLICA_COORDINATOR_SERVER;
coordinator_query->role_ = CoordinatorQuery::ReplicationRole::REPLICA;
coordinator_query->socket_address_ = std::any_cast<Expression *>(ctx->socketAddress()->accept(this));
coordinator_query->coordinator_socket_address_ =
std::any_cast<Expression *>(ctx->coordinatorSocketAddress()->accept(this));
coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this));
if (ctx->SYNC()) {
coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::SYNC;
} else if (ctx->ASYNC()) {
coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::ASYNC;
}
return coordinator_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitRegisterMainCoordinatorServer(
MemgraphCypher::RegisterMainCoordinatorServerContext *ctx) {
if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) {
throw SemanticException("Coordinator socket address should be a string literal!");
}
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_MAIN_COORDINATOR_SERVER;
coordinator_query->role_ = CoordinatorQuery::ReplicationRole::MAIN;
coordinator_query->coordinator_socket_address_ =
std::any_cast<Expression *>(ctx->coordinatorSocketAddress()->accept(this));
coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this));
return coordinator_query;
}
antlrcpp::Any CypherMainVisitor::visitDropReplica(MemgraphCypher::DropReplicaContext *ctx) {
auto *replication_query = storage_->Create<ReplicationQuery>();
replication_query->action_ = ReplicationQuery::Action::DROP_REPLICA;
@ -452,6 +428,15 @@ antlrcpp::Any CypherMainVisitor::visitDoFailover(MemgraphCypher::DoFailoverConte
return coordinator_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitSetInstanceToMain(MemgraphCypher::SetInstanceToMainContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN;
coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this));
query_ = coordinator_query;
return coordinator_query;
}
antlrcpp::Any CypherMainVisitor::visitLockPathQuery(MemgraphCypher::LockPathQueryContext *ctx) {
auto *lock_query = storage_->Create<LockPathQuery>();
if (ctx->STATUS()) {

View File

@ -241,18 +241,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitRegisterCoordinatorServer(MemgraphCypher::RegisterCoordinatorServerContext *ctx) override;
antlrcpp::Any visitRegisterInstanceOnCoordinator(MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) override;
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitRegisterMainCoordinatorServer(MemgraphCypher::RegisterMainCoordinatorServerContext *ctx) override;
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitRegisterReplicaCoordinatorServer(
MemgraphCypher::RegisterReplicaCoordinatorServerContext *ctx) override;
antlrcpp::Any visitSetInstanceToMain(MemgraphCypher::SetInstanceToMainContext *ctx) override;
/**
* @return CoordinatorQuery*

View File

@ -102,6 +102,7 @@ FILTER : F I L T E R ;
IN : I N ;
INDEX : I N D E X ;
INFO : I N F O ;
INSTANCE : I N S T A N C E ;
IS : I S ;
KB : K B ;
KEY : K E Y ;
@ -122,6 +123,7 @@ PROCEDURE : P R O C E D U R E ;
PROFILE : P R O F I L E ;
QUERY : Q U E R Y ;
REDUCE : R E D U C E ;
REGISTER : R E G I S T E R;
REMOVE : R E M O V E ;
RETURN : R E T U R N ;
SET : S E T ;

View File

@ -63,6 +63,7 @@ memgraphCypherKeyword : cypherKeyword
| GRANT
| HEADER
| IDENTIFIED
| INSTANCE
| NODE_LABELS
| NULLIF
| IMPORT
@ -186,7 +187,8 @@ replicationQuery : setReplicationRole
| showReplicas
;
coordinatorQuery : registerCoordinatorServer
coordinatorQuery : registerInstanceOnCoordinator
| setInstanceToMain
| showReplicationCluster
| doFailover
;
@ -382,15 +384,14 @@ instanceName : symbolicName ;
socketAddress : literal ;
coordinatorSocketAddress : literal ;
replicationSocketAddress : literal ;
registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC )
TO socketAddress ;
registerReplicaCoordinatorServer: REGISTER REPLICA instanceName ( ASYNC | SYNC ) TO socketAddress WITH COORDINATOR SERVER ON coordinatorSocketAddress ;
registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
registerMainCoordinatorServer: REGISTER MAIN instanceName WITH COORDINATOR SERVER ON coordinatorSocketAddress ;
registerCoordinatorServer : registerMainCoordinatorServer | registerReplicaCoordinatorServer ;
setInstanceToMain : SET INSTANCE instanceName TO MAIN ;
dropReplica : DROP REPLICA instanceName ;

View File

@ -79,6 +79,7 @@ IMPORT : I M P O R T ;
INACTIVE : I N A C T I V E ;
IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ;
IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ;
INSTANCE : I N S T A N C E ;
ISOLATION : I S O L A T I O N ;
KAFKA : K A F K A ;
LABELS : L A B E L S ;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -218,7 +218,8 @@ const trie::Trie kKeywords = {"union",
"directory",
"lock",
"unlock",
"build"};
"build",
"instance"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@ -544,6 +544,71 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
throw QueryRuntimeException("Invalid replication socket address!");
}
const auto maybe_coordinator_ip_port =
io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
throw QueryRuntimeException("Invalid replication socket address!");
}
const auto [replication_ip, replication_port] = *maybe_replication_ip_port;
const auto [coordinator_server_ip, coordinator_server_port] = *maybe_coordinator_ip_port;
const auto repl_config = coordination::CoordinatorClientConfig::ReplicationClientInfo{
.instance_name = instance_name,
.replication_mode = convertFromCoordinatorToReplicationMode(sync_mode),
.replication_ip_address = replication_ip,
.replication_port = replication_port};
auto coordinator_client_config =
coordination::CoordinatorClientConfig{.instance_name = instance_name,
.ip_address = coordinator_server_ip,
.port = coordinator_server_port,
.health_check_frequency_sec = instance_check_frequency,
.replication_client_info = repl_config,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterInstanceOnCoordinator(std::move(coordinator_client_config));
switch (status) {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!");
case END_POINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
case COULD_NOT_BE_PERSISTED:
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
case SUCCESS:
break;
}
}
void SetInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetInstanceToMain(instance_name);
switch (status) {
using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus;
case NO_INSTANCE_WITH_NAME:
throw QueryRuntimeException("No instance with such name!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't set replica instance to main since this instance is not a coordinator!");
case COULD_NOT_PROMOTE_TO_MAIN:
throw QueryRuntimeException(
"Couldn't set replica instance to main. Check coordinator and replica for more logs");
case SUCCESS:
break;
}
}
/// @throw QueryRuntimeException if an error ocurred.
void DoFailover() const override {
if (!FLAGS_coordinator) {
@ -1039,24 +1104,8 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
// the argument to Callback.
EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
callback.fn = [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv,
main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_]() mutable {
handler.RegisterMainCoordinatorServer(std::string(coordinator_socket_address_tv.ValueString()),
main_check_frequency, instance_name);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::REGISTER_COORDINATOR_SERVER,
fmt::format("Coordinator has registered coordinator server on {} for instance {}.",
coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_));
throw QueryRuntimeException("Query is disabled for now");
return callback;
#endif
}
@ -1075,20 +1124,37 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
throw QueryRuntimeException("Query is disabled for now");
return callback;
#endif
}
case CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
#ifdef MG_ENTERPRISE
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
// the argument to Callback.
EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
auto replication_socket_address_tv = coordinator_query->socket_address_->Accept(evaluator);
callback.fn =
[handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, replication_socket_address_tv,
replica_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_, sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterReplicaCoordinatorServer(std::string(replication_socket_address_tv.ValueString()),
std::string(coordinator_socket_address_tv.ValueString()),
replica_check_frequency, instance_name, sync_mode);
auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
auto replication_socket_address_tv = coordinator_query->replication_socket_address_->Accept(evaluator);
callback.fn = [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv,
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterInstanceOnCoordinator(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
@ -1097,6 +1163,33 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
fmt::format("Coordinator has registered coordinator server on {} for instance {}.",
coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_));
return callback;
#endif
}
case CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
#ifdef MG_ENTERPRISE
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
// the argument to Callback.
EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
callback.fn = [handler = CoordQueryHandler{dbms_handler},
instance_name = coordinator_query->instance_name_]() mutable {
handler.SetInstanceToMain(instance_name);
return std::vector<std::vector<TypedValue>>();
};
return callback;
#endif
}
case CoordinatorQuery::Action::SHOW_REPLICATION_CLUSTER: {

View File

@ -116,6 +116,14 @@ class CoordinatorQueryHandler {
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name) = 0;
virtual void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void SetInstanceToMain(const std::string &instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual std::vector<coordination::CoordinatorInstanceStatus> ShowReplicasOnCoordinator() const = 0;

View File

@ -89,7 +89,9 @@ class Scheduler {
* @throw std::system_error
*/
void Stop() {
is_paused_.store(false);
is_working_.store(false);
pause_cv_.notify_one();
condition_variable_.notify_one();
if (thread_.joinable()) thread_.join();
}

View File

@ -97,6 +97,8 @@ enum class TypeId : uint64_t {
// Coordinator
COORD_FAILOVER_REQ,
COORD_FAILOVER_RES,
COORD_SET_REPL_MAIN_REQ,
COORD_SET_REPL_MAIN_RES,
// AST
AST_LABELIX = 3000,