Add support for specifying the replica port to SET REPLICATION ROLE query (#61)
Co-authored-by: jseljan <josip.seljan@memgraph.io>
This commit is contained in:
parent
f7b764607d
commit
87e00f4fef
@ -88,9 +88,12 @@ the role of all replica nodes using the following openCypher query before you
|
|||||||
can enable replication on the main:
|
can enable replication on the main:
|
||||||
|
|
||||||
```plaintext
|
```plaintext
|
||||||
SET REPLICATION ROLE TO (MAIN|REPLICA);
|
SET REPLICATION ROLE TO (MAIN|REPLICA) WITH PORT <port_number>;
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Note that the "WITH PORT <port_number>" part of the query sets the replication port,
|
||||||
|
but it applies only to the replica. In other words, if you try to set the
|
||||||
|
replication port as the main, a semantic exception will be thrown.
|
||||||
After you have set your replica instance to the correct operating role, you can
|
After you have set your replica instance to the correct operating role, you can
|
||||||
enable replication in the main instance by issuing the following openCypher
|
enable replication in the main instance by issuing the following openCypher
|
||||||
command:
|
command:
|
||||||
|
@ -2304,6 +2304,7 @@ cpp<#
|
|||||||
(socket_address "Expression *" :initval "nullptr" :scope :public
|
(socket_address "Expression *" :initval "nullptr" :scope :public
|
||||||
:slk-save #'slk-save-ast-pointer
|
:slk-save #'slk-save-ast-pointer
|
||||||
:slk-load (slk-load-ast-pointer "Expression"))
|
:slk-load (slk-load-ast-pointer "Expression"))
|
||||||
|
(port "Expression *" :initval "nullptr" :scope :public)
|
||||||
(sync_mode "SyncMode" :scope :public)
|
(sync_mode "SyncMode" :scope :public)
|
||||||
(timeout "Expression *" :initval "nullptr" :scope :public
|
(timeout "Expression *" :initval "nullptr" :scope :public
|
||||||
:slk-save #'slk-save-ast-pointer
|
:slk-save #'slk-save-ast-pointer
|
||||||
|
@ -211,13 +211,24 @@ antlrcpp::Any CypherMainVisitor::visitSetReplicationRole(
|
|||||||
auto *replication_query = storage_->Create<ReplicationQuery>();
|
auto *replication_query = storage_->Create<ReplicationQuery>();
|
||||||
replication_query->action_ = ReplicationQuery::Action::SET_REPLICATION_ROLE;
|
replication_query->action_ = ReplicationQuery::Action::SET_REPLICATION_ROLE;
|
||||||
if (ctx->MAIN()) {
|
if (ctx->MAIN()) {
|
||||||
|
if (ctx->WITH() || ctx->PORT()) {
|
||||||
|
throw SemanticException("Main can't set a port!");
|
||||||
|
}
|
||||||
replication_query->role_ = ReplicationQuery::ReplicationRole::MAIN;
|
replication_query->role_ = ReplicationQuery::ReplicationRole::MAIN;
|
||||||
} else if (ctx->REPLICA()) {
|
} else if (ctx->REPLICA()) {
|
||||||
replication_query->role_ = ReplicationQuery::ReplicationRole::REPLICA;
|
replication_query->role_ = ReplicationQuery::ReplicationRole::REPLICA;
|
||||||
|
if (ctx->WITH() && ctx->PORT()) {
|
||||||
|
if (!ctx->port) {
|
||||||
|
throw SyntaxException("Port not given!");
|
||||||
|
}
|
||||||
|
if (!ctx->port->numberLiteral()->integerLiteral()) {
|
||||||
|
throw SyntaxException("Port must be an integer literal!");
|
||||||
|
}
|
||||||
|
replication_query->port_ = ctx->port->accept(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return replication_query;
|
return replication_query;
|
||||||
}
|
}
|
||||||
|
|
||||||
antlrcpp::Any CypherMainVisitor::visitShowReplicationRole(
|
antlrcpp::Any CypherMainVisitor::visitShowReplicationRole(
|
||||||
MemgraphCypher::ShowReplicationRoleContext *ctx) {
|
MemgraphCypher::ShowReplicationRoleContext *ctx) {
|
||||||
auto *replication_query = storage_->Create<ReplicationQuery>();
|
auto *replication_query = storage_->Create<ReplicationQuery>();
|
||||||
|
@ -22,6 +22,7 @@ memgraphCypherKeyword : cypherKeyword
|
|||||||
| MAIN
|
| MAIN
|
||||||
| MODE
|
| MODE
|
||||||
| PASSWORD
|
| PASSWORD
|
||||||
|
| PORT
|
||||||
| PRIVILEGES
|
| PRIVILEGES
|
||||||
| REGISTER
|
| REGISTER
|
||||||
| REPLICA
|
| REPLICA
|
||||||
@ -118,7 +119,8 @@ showUsersForRole : SHOW USERS FOR role=userOrRoleName ;
|
|||||||
|
|
||||||
dumpQuery: DUMP DATABASE ;
|
dumpQuery: DUMP DATABASE ;
|
||||||
|
|
||||||
setReplicationRole : SET REPLICATION ROLE TO ( MAIN | REPLICA ) ;
|
setReplicationRole : SET REPLICATION ROLE TO ( MAIN | REPLICA )
|
||||||
|
( WITH PORT port=literal ) ? ;
|
||||||
|
|
||||||
showReplicationRole : SHOW REPLICATION ROLE ;
|
showReplicationRole : SHOW REPLICATION ROLE ;
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ IDENTIFIED : I D E N T I F I E D ;
|
|||||||
MAIN : M A I N ;
|
MAIN : M A I N ;
|
||||||
MODE : M O D E ;
|
MODE : M O D E ;
|
||||||
PASSWORD : P A S S W O R D ;
|
PASSWORD : P A S S W O R D ;
|
||||||
|
PORT : P O R T ;
|
||||||
PRIVILEGES : P R I V I L E G E S ;
|
PRIVILEGES : P R I V I L E G E S ;
|
||||||
REGISTER : R E G I S T E R ;
|
REGISTER : R E G I S T E R ;
|
||||||
REPLICA : R E P L I C A ;
|
REPLICA : R E P L I C A ;
|
||||||
|
@ -341,8 +341,13 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
|||||||
Callback callback;
|
Callback callback;
|
||||||
switch (repl_query->action_) {
|
switch (repl_query->action_) {
|
||||||
case ReplicationQuery::Action::SET_REPLICATION_ROLE: {
|
case ReplicationQuery::Action::SET_REPLICATION_ROLE: {
|
||||||
callback.fn = [handler, role = repl_query->role_] {
|
auto port = repl_query->port_->Accept(evaluator);
|
||||||
if (!handler->SetReplicationRole(role)) {
|
std::optional<int64_t> maybe_port;
|
||||||
|
if (port.IsInt()) {
|
||||||
|
maybe_port = port.ValueInt();
|
||||||
|
}
|
||||||
|
callback.fn = [handler, role = repl_query->role_, maybe_port] {
|
||||||
|
if (!handler->SetReplicationRole(role, maybe_port)) {
|
||||||
throw QueryRuntimeException(
|
throw QueryRuntimeException(
|
||||||
"Couldn't set the desired replication role.");
|
"Couldn't set the desired replication role.");
|
||||||
}
|
}
|
||||||
|
@ -118,7 +118,8 @@ class ReplicationQueryHandler {
|
|||||||
/// returns false if the replication role can't be set
|
/// returns false if the replication role can't be set
|
||||||
/// @throw QueryRuntimeException if an error ocurred.
|
/// @throw QueryRuntimeException if an error ocurred.
|
||||||
virtual bool SetReplicationRole(
|
virtual bool SetReplicationRole(
|
||||||
ReplicationQuery::ReplicationRole replication_mode) = 0;
|
ReplicationQuery::ReplicationRole replication_mode,
|
||||||
|
std::optional<int64_t> port) = 0;
|
||||||
|
|
||||||
/// @throw QueryRuntimeException if an error ocurred.
|
/// @throw QueryRuntimeException if an error ocurred.
|
||||||
virtual ReplicationQuery::ReplicationRole ShowReplicationRole() const = 0;
|
virtual ReplicationQuery::ReplicationRole ShowReplicationRole() const = 0;
|
||||||
|
@ -2457,7 +2457,8 @@ void check_replication_query(Base *ast_generator, const ReplicationQuery *query,
|
|||||||
const std::string name,
|
const std::string name,
|
||||||
const std::optional<TypedValue> socket_address,
|
const std::optional<TypedValue> socket_address,
|
||||||
const ReplicationQuery::SyncMode sync_mode,
|
const ReplicationQuery::SyncMode sync_mode,
|
||||||
const std::optional<TypedValue> timeout) {
|
const std::optional<TypedValue> timeout = {},
|
||||||
|
const std::optional<TypedValue> port = {}) {
|
||||||
EXPECT_EQ(query->replica_name_, name);
|
EXPECT_EQ(query->replica_name_, name);
|
||||||
EXPECT_EQ(query->sync_mode_, sync_mode);
|
EXPECT_EQ(query->sync_mode_, sync_mode);
|
||||||
ASSERT_EQ(static_cast<bool>(query->socket_address_),
|
ASSERT_EQ(static_cast<bool>(query->socket_address_),
|
||||||
@ -2469,6 +2470,10 @@ void check_replication_query(Base *ast_generator, const ReplicationQuery *query,
|
|||||||
if (timeout) {
|
if (timeout) {
|
||||||
ast_generator->CheckLiteral(query->timeout_, *timeout);
|
ast_generator->CheckLiteral(query->timeout_, *timeout);
|
||||||
}
|
}
|
||||||
|
ASSERT_EQ(static_cast<bool>(query->port_), static_cast<bool>(port));
|
||||||
|
if (port) {
|
||||||
|
ast_generator->CheckLiteral(query->port_, *port);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(CypherMainVisitorTest, TestShowReplicationMode) {
|
TEST_P(CypherMainVisitorTest, TestShowReplicationMode) {
|
||||||
@ -2490,18 +2495,40 @@ TEST_P(CypherMainVisitorTest, TestShowReplicasQuery) {
|
|||||||
|
|
||||||
TEST_P(CypherMainVisitorTest, TestSetReplicationMode) {
|
TEST_P(CypherMainVisitorTest, TestSetReplicationMode) {
|
||||||
auto &ast_generator = *GetParam();
|
auto &ast_generator = *GetParam();
|
||||||
const std::string missing_mode_query = "SET REPLICATION ROLE";
|
|
||||||
ASSERT_THROW(ast_generator.ParseQuery(missing_mode_query), SyntaxException);
|
|
||||||
|
|
||||||
const std::string bad_mode_query = "SET REPLICATION ROLE TO BUTTERY";
|
{
|
||||||
ASSERT_THROW(ast_generator.ParseQuery(bad_mode_query), SyntaxException);
|
const std::string query = "SET REPLICATION ROLE";
|
||||||
|
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||||
|
}
|
||||||
|
|
||||||
const std::string full_query = "SET REPLICATION ROLE TO MAIN";
|
{
|
||||||
auto *parsed_full_query =
|
const std::string query = "SET REPLICATION ROLE TO BUTTERY";
|
||||||
dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(full_query));
|
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||||
EXPECT_EQ(parsed_full_query->action_,
|
}
|
||||||
ReplicationQuery::Action::SET_REPLICATION_ROLE);
|
|
||||||
EXPECT_EQ(parsed_full_query->role_, ReplicationQuery::ReplicationRole::MAIN);
|
{
|
||||||
|
const std::string query = "SET REPLICATION ROLE TO MAIN";
|
||||||
|
auto *parsed_query =
|
||||||
|
dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(query));
|
||||||
|
EXPECT_EQ(parsed_query->action_,
|
||||||
|
ReplicationQuery::Action::SET_REPLICATION_ROLE);
|
||||||
|
EXPECT_EQ(parsed_query->role_, ReplicationQuery::ReplicationRole::MAIN);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const std::string query = "SET REPLICATION ROLE TO MAIN WITH PORT 10000";
|
||||||
|
ASSERT_THROW(ast_generator.ParseQuery(query), SemanticException);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const std::string query = "SET REPLICATION ROLE TO REPLICA WITH PORT 10000";
|
||||||
|
auto *parsed_query =
|
||||||
|
dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(query));
|
||||||
|
EXPECT_EQ(parsed_query->action_,
|
||||||
|
ReplicationQuery::Action::SET_REPLICATION_ROLE);
|
||||||
|
EXPECT_EQ(parsed_query->role_, ReplicationQuery::ReplicationRole::REPLICA);
|
||||||
|
ast_generator.CheckLiteral(parsed_query->port_, TypedValue(10000));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
|
TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
|
||||||
@ -2517,7 +2544,7 @@ TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
|
|||||||
ASSERT_TRUE(no_timeout_query_parsed);
|
ASSERT_TRUE(no_timeout_query_parsed);
|
||||||
check_replication_query(&ast_generator, no_timeout_query_parsed, "replica1",
|
check_replication_query(&ast_generator, no_timeout_query_parsed, "replica1",
|
||||||
TypedValue("127.0.0.1"),
|
TypedValue("127.0.0.1"),
|
||||||
ReplicationQuery::SyncMode::SYNC, {});
|
ReplicationQuery::SyncMode::SYNC);
|
||||||
|
|
||||||
std::string full_query =
|
std::string full_query =
|
||||||
R"(REGISTER REPLICA replica2 SYNC WITH TIMEOUT 0.5 TO "1.1.1.1:10000")";
|
R"(REGISTER REPLICA replica2 SYNC WITH TIMEOUT 0.5 TO "1.1.1.1:10000")";
|
||||||
|
Loading…
Reference in New Issue
Block a user