Fix replication queries (#65)
* Fix replication query visitors * Improve error handling in ReplQueryHandler methods Co-authored-by: jseljan <josip.seljan@memgraph.io>
This commit is contained in:
parent
60d742a2dc
commit
bff9cf07de
src
io/network
query
@ -53,6 +53,10 @@ Endpoint::ParseSocketOrIpAddress(
|
||||
LOG(ERROR) << "Invalid port number: " << parts[1];
|
||||
return std::nullopt;
|
||||
}
|
||||
if (int_port < 0) {
|
||||
LOG(ERROR) << "Port number must be a positive integer!";
|
||||
return std::nullopt;
|
||||
}
|
||||
if (int_port > std::numeric_limits<uint16_t>::max()) {
|
||||
LOG(ERROR) << "Port number exceeded maximum possible size!";
|
||||
return std::nullopt;
|
||||
|
@ -4,5 +4,5 @@
|
||||
|
||||
namespace query {
|
||||
constexpr uint16_t kDefaultReplicationPort = 10000;
|
||||
constexpr auto *kDefaultReplicationServerIp = "127.0.0.1";
|
||||
constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
|
||||
} // namespace query
|
||||
|
@ -218,13 +218,12 @@ antlrcpp::Any CypherMainVisitor::visitSetReplicationRole(
|
||||
} else if (ctx->REPLICA()) {
|
||||
replication_query->role_ = ReplicationQuery::ReplicationRole::REPLICA;
|
||||
if (ctx->WITH() && ctx->PORT()) {
|
||||
if (!ctx->port) {
|
||||
throw SyntaxException("Port not given!");
|
||||
}
|
||||
if (!ctx->port->numberLiteral()->integerLiteral()) {
|
||||
if (ctx->port->numberLiteral() &&
|
||||
ctx->port->numberLiteral()->integerLiteral()) {
|
||||
replication_query->port_ = ctx->port->accept(this);
|
||||
} else {
|
||||
throw SyntaxException("Port must be an integer literal!");
|
||||
}
|
||||
replication_query->port_ = ctx->port->accept(this);
|
||||
}
|
||||
}
|
||||
return replication_query;
|
||||
@ -244,22 +243,29 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(
|
||||
ctx->replicaName()->symbolicName()->accept(this).as<std::string>();
|
||||
if (ctx->SYNC()) {
|
||||
replication_query->sync_mode_ = query::ReplicationQuery::SyncMode::SYNC;
|
||||
if (ctx->WITH() && ctx->TIMEOUT()) {
|
||||
if (ctx->timeout->numberLiteral()) {
|
||||
// we accept both double and integer literals
|
||||
replication_query->timeout_ = ctx->timeout->accept(this);
|
||||
} else {
|
||||
throw SemanticException(
|
||||
"Timeout should be a integer or double literal!");
|
||||
}
|
||||
}
|
||||
} else if (ctx->ASYNC()) {
|
||||
if (ctx->WITH() && ctx->TIMEOUT()) {
|
||||
throw SyntaxException(
|
||||
"Timeout can be set only for the SYNC replication mode!");
|
||||
}
|
||||
replication_query->sync_mode_ = query::ReplicationQuery::SyncMode::ASYNC;
|
||||
}
|
||||
|
||||
if (!ctx->socketAddress()->literal()->StringLiteral()) {
|
||||
throw SyntaxException("Socket address should be a string literal!");
|
||||
throw SemanticException("Socket address should be a string literal!");
|
||||
} else {
|
||||
replication_query->socket_address_ = ctx->socketAddress()->accept(this);
|
||||
}
|
||||
if (ctx->timeout) {
|
||||
if (!ctx->timeout->numberLiteral()->doubleLiteral() &&
|
||||
!ctx->timeout->numberLiteral()->integerLiteral()) {
|
||||
throw SyntaxException("Timeout should be a double literal!");
|
||||
} else {
|
||||
replication_query->timeout_ = ctx->timeout->accept(this);
|
||||
}
|
||||
}
|
||||
|
||||
return replication_query;
|
||||
}
|
||||
|
||||
|
@ -168,19 +168,24 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
public:
|
||||
explicit ReplQueryHandler(storage::Storage *db) : db_(db) {}
|
||||
|
||||
bool SetReplicationRole(ReplicationQuery::ReplicationRole replication_role,
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role,
|
||||
std::optional<int64_t> port) override {
|
||||
if (replication_role == ReplicationQuery::ReplicationRole::MAIN) {
|
||||
return db_->SetMainReplicationRole();
|
||||
if (!db_->SetMainReplicationRole()) {
|
||||
throw QueryRuntimeException("Couldn't set role to main!");
|
||||
}
|
||||
}
|
||||
if (replication_role == ReplicationQuery::ReplicationRole::REPLICA) {
|
||||
if (!port || *port > std::numeric_limits<uint16_t>::max()) {
|
||||
return false;
|
||||
if (!port || *port < 0 || *port > std::numeric_limits<uint16_t>::max()) {
|
||||
throw QueryRuntimeException("Port number invalid!");
|
||||
}
|
||||
if (!db_->SetReplicaRole(
|
||||
io::network::Endpoint(query::kDefaultReplicationServerIp,
|
||||
static_cast<uint16_t>(*port)))) {
|
||||
throw QueryRuntimeException("Couldn't set role to replica!");
|
||||
}
|
||||
return db_->SetReplicaRole(io::network::Endpoint(
|
||||
query::kDefaultReplicationServerIp, static_cast<uint16_t>(*port)));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
@ -191,14 +196,20 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
case storage::ReplicationRole::REPLICA:
|
||||
return ReplicationQuery::ReplicationRole::REPLICA;
|
||||
}
|
||||
throw QueryRuntimeException("Couldn't get replication role!");
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't show replication role - invalid role set!");
|
||||
}
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
bool RegisterReplica(const std::string &name,
|
||||
void RegisterReplica(const std::string &name,
|
||||
const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<double> timeout) override {
|
||||
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
|
||||
// replica can't register another replica
|
||||
throw QueryRuntimeException("Replica can't register another replica!");
|
||||
}
|
||||
|
||||
storage::replication::ReplicationMode repl_mode;
|
||||
switch (sync_mode) {
|
||||
case ReplicationQuery::SyncMode::ASYNC: {
|
||||
@ -211,35 +222,42 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
auto maybe_ip_and_port = io::network::Endpoint::ParseSocketOrIpAddress(
|
||||
socket_address, query::kDefaultReplicationPort);
|
||||
if (maybe_ip_and_port) {
|
||||
auto [ip, port] = *maybe_ip_and_port;
|
||||
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode);
|
||||
return (!ret.HasError());
|
||||
auto ret =
|
||||
db_->RegisterReplica(name, {std::move(ip), port}, repl_mode,
|
||||
{.timeout = timeout, .ssl = std::nullopt});
|
||||
if (ret.HasError()) {
|
||||
throw QueryRuntimeException(
|
||||
fmt::format("Couldn't register replica '{}'!", name));
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
throw QueryRuntimeException("Invalid socket address!");
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
throw QueryRuntimeException(
|
||||
fmt::format("Couldn't register replica! Reason: {}", e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
/// returns false if the desired replica couldn't be dropped
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
bool DropReplica(const std::string &replica_name) override {
|
||||
try {
|
||||
return db_->UnregisterReplica(replica_name);
|
||||
} catch (std::exception &e) {
|
||||
void DropReplica(const std::string &replica_name) override {
|
||||
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
|
||||
// replica can't unregister a replica
|
||||
throw QueryRuntimeException("Replica can't unregister a replica!");
|
||||
}
|
||||
if (!db_->UnregisterReplica(replica_name)) {
|
||||
throw QueryRuntimeException(
|
||||
fmt::format("Couldn't unregister replica! Reason: {}", e.what()));
|
||||
fmt::format("Couldn't unregister the replica '{}'", replica_name));
|
||||
}
|
||||
}
|
||||
|
||||
using Replica = ReplicationQueryHandler::Replica;
|
||||
std::vector<Replica> ShowReplicas() const override {
|
||||
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
|
||||
// replica can't show registered replicas (it shouldn't have any)
|
||||
throw QueryRuntimeException(
|
||||
"Replica can't show registered replicas (it shouldn't have any)!");
|
||||
}
|
||||
|
||||
auto repl_infos = db_->ReplicasInfo();
|
||||
std::vector<Replica> replicas;
|
||||
replicas.reserve(repl_infos.size());
|
||||
@ -287,7 +305,7 @@ class ReplQueryHandler : public query::ReplicationQueryHandler {
|
||||
// Dummy ctor - just there to make the replication query handler work
|
||||
// in both community and enterprise versions.
|
||||
explicit ReplQueryHandler(storage::Storage *db) {}
|
||||
bool SetReplicationRole(ReplicationQuery::ReplicationRole replication_role,
|
||||
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role,
|
||||
std::optional<int64_t> port) override {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
@ -296,14 +314,14 @@ class ReplQueryHandler : public query::ReplicationQueryHandler {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
bool RegisterReplica(const std::string &name,
|
||||
void RegisterReplica(const std::string &name,
|
||||
const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<double> timeout) {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
bool DropReplica(const std::string &replica_name) override {
|
||||
void DropReplica(const std::string &replica_name) override {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
@ -500,10 +518,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
maybe_port = port.ValueInt();
|
||||
}
|
||||
callback.fn = [handler, role = repl_query->role_, maybe_port] {
|
||||
if (!handler->SetReplicationRole(role, maybe_port)) {
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't set the desired replication role.");
|
||||
}
|
||||
handler->SetReplicationRole(role, maybe_port);
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
return callback;
|
||||
@ -530,20 +545,17 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
auto socket_address = repl_query->socket_address_->Accept(evaluator);
|
||||
auto timeout =
|
||||
EvaluateOptionalExpression(repl_query->timeout_, &evaluator);
|
||||
std::optional<double> opt_timeout;
|
||||
std::optional<double> maybe_timeout;
|
||||
if (timeout.IsDouble()) {
|
||||
opt_timeout = timeout.ValueDouble();
|
||||
maybe_timeout = timeout.ValueDouble();
|
||||
} else if (timeout.IsInt()) {
|
||||
opt_timeout = static_cast<double>(timeout.ValueInt());
|
||||
maybe_timeout = static_cast<double>(timeout.ValueInt());
|
||||
}
|
||||
callback.fn = [handler, name, socket_address, sync_mode, opt_timeout] {
|
||||
callback.fn = [handler, name, socket_address, sync_mode, maybe_timeout] {
|
||||
CHECK(socket_address.IsString());
|
||||
if (!handler->RegisterReplica(name,
|
||||
std::string(socket_address.ValueString()),
|
||||
sync_mode, opt_timeout)) {
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't create the desired replica '{}'.", name);
|
||||
}
|
||||
handler->RegisterReplica(name,
|
||||
std::string(socket_address.ValueString()),
|
||||
sync_mode, maybe_timeout);
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
return callback;
|
||||
@ -551,9 +563,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
case ReplicationQuery::Action::DROP_REPLICA: {
|
||||
const auto &name = repl_query->replica_name_;
|
||||
callback.fn = [handler, name] {
|
||||
if (!handler->DropReplica(name)) {
|
||||
throw QueryRuntimeException("Couldn't drop the replica '{}'.", name);
|
||||
}
|
||||
handler->DropReplica(name);
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
return callback;
|
||||
|
@ -116,25 +116,22 @@ class ReplicationQueryHandler {
|
||||
std::optional<double> timeout;
|
||||
};
|
||||
|
||||
/// returns false if the replication role can't be set
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual bool SetReplicationRole(
|
||||
virtual void SetReplicationRole(
|
||||
ReplicationQuery::ReplicationRole replication_role,
|
||||
std::optional<int64_t> port) = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual ReplicationQuery::ReplicationRole ShowReplicationRole() const = 0;
|
||||
|
||||
/// returns false if the replica can't be registered
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual bool RegisterReplica(const std::string &name,
|
||||
virtual void RegisterReplica(const std::string &name,
|
||||
const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<double> timeout) = 0;
|
||||
|
||||
/// returns false if the desired replica couldn't be dropped
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual bool DropReplica(const std::string &replica_name) = 0;
|
||||
virtual void DropReplica(const std::string &replica_name) = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual std::vector<Replica> ShowReplicas() const = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user