Implement ReplicationQueryHandler class (#52)
* Refactor io::network::Endpoint class * Add ParseSocketOrIpAddress static method to Endpoint class * Implement ReplQueryHandler methods * Add implementation of SetReplicationRole to ReplQueryHandler * Fix PrepareReplicationQuery (create PullPlanVector) Co-authored-by: jseljan <josip.seljan@memgraph.io>
This commit is contained in:
parent
f23e2e12c4
commit
a0fb3fc463
@ -6,35 +6,86 @@
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "utils/string.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
namespace io::network {
|
||||
|
||||
Endpoint::Endpoint() {}
|
||||
Endpoint::Endpoint(const std::string &address, uint16_t port)
|
||||
: address_(address), port_(port) {
|
||||
Endpoint::IpFamily Endpoint::GetIpFamily(const std::string &ip_address) {
|
||||
in_addr addr4;
|
||||
in6_addr addr6;
|
||||
int ipv4_result = inet_pton(AF_INET, address_.c_str(), &addr4);
|
||||
int ipv6_result = inet_pton(AF_INET6, address_.c_str(), &addr6);
|
||||
if (ipv4_result == 1)
|
||||
family_ = 4;
|
||||
else if (ipv6_result == 1)
|
||||
family_ = 6;
|
||||
CHECK(family_ != 0) << "Not a valid IPv4 or IPv6 address: " << address;
|
||||
int ipv4_result = inet_pton(AF_INET, ip_address.c_str(), &addr4);
|
||||
int ipv6_result = inet_pton(AF_INET6, ip_address.c_str(), &addr6);
|
||||
if (ipv4_result == 1) {
|
||||
return IpFamily::IP4;
|
||||
} else if (ipv6_result == 1) {
|
||||
return IpFamily::IP6;
|
||||
} else {
|
||||
return IpFamily::NONE;
|
||||
}
|
||||
}
|
||||
|
||||
bool Endpoint::operator==(const Endpoint &other) const {
|
||||
return address_ == other.address_ && port_ == other.port_ &&
|
||||
family_ == other.family_;
|
||||
std::optional<std::pair<std::string, uint16_t>>
|
||||
Endpoint::ParseSocketOrIpAddress(
|
||||
const std::string &address,
|
||||
const std::optional<uint16_t> default_port = {}) {
|
||||
/// expected address format:
|
||||
/// - "ip_address:port_number"
|
||||
/// - "ip_address"
|
||||
/// We parse the address first. If it's an IP address, a default port must
|
||||
// be given, or we return nullopt. If it's a socket address, we try to parse
|
||||
// it into an ip address and a port number; even if a default port is given,
|
||||
// it won't be used, as we expect that it is given in the address string.
|
||||
const std::string delimiter = ":";
|
||||
std::string ip_address;
|
||||
|
||||
std::vector<std::string> parts = utils::Split(address, delimiter);
|
||||
if (parts.size() == 1) {
|
||||
if (default_port) {
|
||||
return std::pair{address, *default_port};
|
||||
}
|
||||
} else if (parts.size() == 2) {
|
||||
ip_address = std::move(parts[0]);
|
||||
int64_t int_port{0};
|
||||
try {
|
||||
int_port = utils::ParseInt(parts[1]);
|
||||
} catch (utils::BasicException &e) {
|
||||
LOG(ERROR) << "Invalid port number: " << parts[1];
|
||||
return std::nullopt;
|
||||
}
|
||||
if (int_port > std::numeric_limits<uint16_t>::max()) {
|
||||
LOG(ERROR) << "Port number exceeded maximum possible size!";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return std::pair{ip_address, static_cast<uint16_t>(int_port)};
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string Endpoint::SocketAddress() const {
|
||||
auto ip_address = address.empty() ? "EMPTY" : address;
|
||||
return ip_address + ":" + std::to_string(port);
|
||||
}
|
||||
|
||||
Endpoint::Endpoint() {}
|
||||
Endpoint::Endpoint(std::string ip_address, uint16_t port)
|
||||
: address(std::move(ip_address)), port(port) {
|
||||
IpFamily ip_family = GetIpFamily(address);
|
||||
CHECK(ip_family != IpFamily::NONE)
|
||||
<< "Not a valid IPv4 or IPv6 address: " << ip_address;
|
||||
family = ip_family;
|
||||
}
|
||||
|
||||
std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint) {
|
||||
if (endpoint.family() == 6) {
|
||||
return os << "[" << endpoint.address() << "]"
|
||||
<< ":" << endpoint.port();
|
||||
// no need to cover the IpFamily::NONE case, as you can't even construct an
|
||||
// Endpoint object if the IpFamily is NONE (i.e. the IP address is invalid)
|
||||
if (endpoint.family == Endpoint::IpFamily::IP6) {
|
||||
return os << "[" << endpoint.address << "]"
|
||||
<< ":" << endpoint.port;
|
||||
}
|
||||
return os << endpoint.address() << ":" << endpoint.port();
|
||||
return os << endpoint.address << ":" << endpoint.port;
|
||||
}
|
||||
|
||||
} // namespace io::network
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <netinet/in.h>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
|
||||
namespace io::network {
|
||||
@ -12,22 +13,35 @@ namespace io::network {
|
||||
* It is used when connecting to an address and to get the current
|
||||
* connection address.
|
||||
*/
|
||||
class Endpoint {
|
||||
public:
|
||||
struct Endpoint {
|
||||
Endpoint();
|
||||
Endpoint(const std::string &address, uint16_t port);
|
||||
Endpoint(std::string ip_address, uint16_t port);
|
||||
|
||||
// TODO: Remove these since members are public
|
||||
std::string address() const { return address_; }
|
||||
uint16_t port() const { return port_; }
|
||||
unsigned char family() const { return family_; }
|
||||
enum class IpFamily : std::uint8_t { NONE, IP4, IP6 };
|
||||
|
||||
bool operator==(const Endpoint &other) const;
|
||||
std::string SocketAddress() const;
|
||||
|
||||
bool operator==(const Endpoint &other) const = default;
|
||||
friend std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint);
|
||||
|
||||
std::string address_;
|
||||
uint16_t port_{0};
|
||||
unsigned char family_{0};
|
||||
std::string address;
|
||||
uint16_t port{0};
|
||||
IpFamily family{IpFamily::NONE};
|
||||
|
||||
/**
|
||||
* Tries to parse the given string as either a socket address or ip address.
|
||||
* Expected address format:
|
||||
* - "ip_address:port_number"
|
||||
* - "ip_address"
|
||||
* We parse the address first. If it's an IP address, a default port must
|
||||
* be given, or we return nullopt. If it's a socket address, we try to parse
|
||||
* it into an ip address and a port number; even if a default port is given,
|
||||
* it won't be used, as we expect that it is given in the address string.
|
||||
*/
|
||||
static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrIpAddress(
|
||||
const std::string &address, const std::optional<uint16_t> default_port);
|
||||
|
||||
static IpFamily GetIpFamily(const std::string &ip_address);
|
||||
};
|
||||
|
||||
} // namespace io::network
|
||||
|
@ -60,8 +60,8 @@ bool Socket::IsOpen() const { return socket_ != -1; }
|
||||
bool Socket::Connect(const Endpoint &endpoint) {
|
||||
if (socket_ != -1) return false;
|
||||
|
||||
auto info = AddrInfo::Get(endpoint.address().c_str(),
|
||||
std::to_string(endpoint.port()).c_str());
|
||||
auto info = AddrInfo::Get(endpoint.address.c_str(),
|
||||
std::to_string(endpoint.port).c_str());
|
||||
|
||||
for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) {
|
||||
int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
|
||||
@ -84,8 +84,8 @@ bool Socket::Connect(const Endpoint &endpoint) {
|
||||
bool Socket::Bind(const Endpoint &endpoint) {
|
||||
if (socket_ != -1) return false;
|
||||
|
||||
auto info = AddrInfo::Get(endpoint.address().c_str(),
|
||||
std::to_string(endpoint.port()).c_str());
|
||||
auto info = AddrInfo::Get(endpoint.address.c_str(),
|
||||
std::to_string(endpoint.port).c_str());
|
||||
|
||||
for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) {
|
||||
int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
|
||||
@ -122,7 +122,7 @@ bool Socket::Bind(const Endpoint &endpoint) {
|
||||
return false;
|
||||
}
|
||||
|
||||
endpoint_ = Endpoint(endpoint.address(), ntohs(portdata.sin6_port));
|
||||
endpoint_ = Endpoint(endpoint.address, ntohs(portdata.sin6_port));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -228,8 +228,8 @@ class BoltSession final
|
||||
for (const auto &kv : params)
|
||||
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
|
||||
#ifdef MG_ENTERPRISE
|
||||
audit_log_->Record(endpoint_.address(), user_ ? user_->username() : "",
|
||||
query, storage::PropertyValue(params_pv));
|
||||
audit_log_->Record(endpoint_.address, user_ ? user_->username() : "", query,
|
||||
storage::PropertyValue(params_pv));
|
||||
#endif
|
||||
try {
|
||||
auto result = interpreter_.Prepare(query, params_pv);
|
||||
|
8
src/query/constants.hpp
Normal file
8
src/query/constants.hpp
Normal file
@ -0,0 +1,8 @@
|
||||
#pragma once
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
namespace query {
|
||||
constexpr uint16_t kDefaultReplicationPort = 10000;
|
||||
constexpr auto *kDefaultReplicationServerIp = "127.0.0.1";
|
||||
} // namespace query
|
@ -9,6 +9,7 @@
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/dump.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/constants.hpp"
|
||||
#include "query/frontend/ast/cypher_main_visitor.hpp"
|
||||
#include "query/frontend/opencypher/parser.hpp"
|
||||
#include "query/frontend/semantic/required_privileges.hpp"
|
||||
@ -162,6 +163,158 @@ TypedValue EvaluateOptionalExpression(Expression *expression,
|
||||
return expression ? expression->Accept(*eval) : TypedValue();
|
||||
}
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
public:
|
||||
explicit ReplQueryHandler(storage::Storage *db) : db_(db) {}
|
||||
|
||||
bool SetReplicationRole(ReplicationQuery::ReplicationRole replication_role,
|
||||
std::optional<int64_t> port) override {
|
||||
if (replication_role == ReplicationQuery::ReplicationRole::MAIN) {
|
||||
return db_->SetMainReplicationRole();
|
||||
}
|
||||
if (replication_role == ReplicationQuery::ReplicationRole::REPLICA) {
|
||||
if (!port || *port > std::numeric_limits<uint16_t>::max()) {
|
||||
return false;
|
||||
}
|
||||
return db_->SetReplicaRole(io::network::Endpoint(
|
||||
query::kDefaultReplicationServerIp, static_cast<uint16_t>(*port)));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
ReplicationQuery::ReplicationRole ShowReplicationRole() const override {
|
||||
switch (db_->GetReplicationRole()) {
|
||||
case storage::ReplicationRole::MAIN:
|
||||
return ReplicationQuery::ReplicationRole::MAIN;
|
||||
case storage::ReplicationRole::REPLICA:
|
||||
return ReplicationQuery::ReplicationRole::REPLICA;
|
||||
}
|
||||
throw QueryRuntimeException("Couldn't get replication role!");
|
||||
}
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
bool RegisterReplica(const std::string &name,
|
||||
const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<double> timeout) override {
|
||||
storage::replication::ReplicationMode repl_mode;
|
||||
switch (sync_mode) {
|
||||
case ReplicationQuery::SyncMode::ASYNC: {
|
||||
repl_mode = storage::replication::ReplicationMode::ASYNC;
|
||||
break;
|
||||
}
|
||||
case ReplicationQuery::SyncMode::SYNC: {
|
||||
repl_mode = storage::replication::ReplicationMode::SYNC;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
auto maybe_ip_and_port = io::network::Endpoint::ParseSocketOrIpAddress(
|
||||
socket_address, query::kDefaultReplicationPort);
|
||||
if (maybe_ip_and_port) {
|
||||
auto [ip, port] = *maybe_ip_and_port;
|
||||
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode);
|
||||
return (!ret.HasError());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
throw QueryRuntimeException(
|
||||
fmt::format("Couldn't register replica! Reason: {}", e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
/// returns false if the desired replica couldn't be dropped
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
bool DropReplica(const std::string &replica_name) override {
|
||||
try {
|
||||
return db_->UnregisterReplica(replica_name);
|
||||
} catch (std::exception &e) {
|
||||
throw QueryRuntimeException(
|
||||
fmt::format("Couldn't unregister replica! Reason: {}", e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
using Replica = ReplicationQueryHandler::Replica;
|
||||
std::vector<Replica> ShowReplicas() const override {
|
||||
auto repl_infos = db_->ReplicasInfo();
|
||||
std::vector<Replica> replicas;
|
||||
replicas.reserve(repl_infos.size());
|
||||
|
||||
const auto from_info = [](const auto &repl_info) -> Replica {
|
||||
Replica replica;
|
||||
replica.name = repl_info.name;
|
||||
replica.socket_address = repl_info.endpoint.SocketAddress();
|
||||
switch (repl_info.mode) {
|
||||
case storage::replication::ReplicationMode::SYNC:
|
||||
replica.sync_mode = ReplicationQuery::SyncMode::SYNC;
|
||||
break;
|
||||
case storage::replication::ReplicationMode::ASYNC:
|
||||
replica.sync_mode = ReplicationQuery::SyncMode::ASYNC;
|
||||
break;
|
||||
}
|
||||
if (repl_info.timeout) {
|
||||
replica.timeout = *repl_info.timeout;
|
||||
}
|
||||
|
||||
return replica;
|
||||
};
|
||||
|
||||
std::transform(repl_infos.begin(), repl_infos.end(),
|
||||
std::back_inserter(replicas), from_info);
|
||||
return replicas;
|
||||
}
|
||||
|
||||
private:
|
||||
storage::Storage *db_;
|
||||
};
|
||||
/// returns false if the replication role can't be set
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
#else
|
||||
|
||||
class NoReplicationInCommunity : public query::QueryRuntimeException {
|
||||
public:
|
||||
NoReplicationInCommunity()
|
||||
: query::QueryRuntimeException::QueryRuntimeException(
|
||||
"Replication is not supported in Memgraph Community!") {}
|
||||
};
|
||||
|
||||
class ReplQueryHandler : public query::ReplicationQueryHandler {
|
||||
public:
|
||||
// Dummy ctor - just there to make the replication query handler work
|
||||
// in both community and enterprise versions.
|
||||
explicit ReplQueryHandler(storage::Storage *db) {}
|
||||
bool SetReplicationRole(ReplicationQuery::ReplicationRole replication_role,
|
||||
std::optional<int64_t> port) override {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
ReplicationQuery::ReplicationRole ShowReplicationRole() const override {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
bool RegisterReplica(const std::string &name,
|
||||
const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<double> timeout) {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
bool DropReplica(const std::string &replica_name) override {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
|
||||
using Replica = ReplicationQueryHandler::Replica;
|
||||
|
||||
std::vector<Replica> ShowReplicas() const override {
|
||||
throw NoReplicationInCommunity();
|
||||
}
|
||||
};
|
||||
#endif
|
||||
|
||||
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth,
|
||||
const Parameters ¶meters,
|
||||
DbAccessor *db_accessor) {
|
||||
@ -324,7 +477,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth,
|
||||
}
|
||||
|
||||
Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
ReplicationQueryHandler *handler,
|
||||
ReplQueryHandler *handler,
|
||||
const Parameters ¶meters,
|
||||
DbAccessor *db_accessor) {
|
||||
Frame frame(0);
|
||||
@ -341,7 +494,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
Callback callback;
|
||||
switch (repl_query->action_) {
|
||||
case ReplicationQuery::Action::SET_REPLICATION_ROLE: {
|
||||
auto port = repl_query->port_->Accept(evaluator);
|
||||
auto port = EvaluateOptionalExpression(repl_query->port_, &evaluator);
|
||||
std::optional<int64_t> maybe_port;
|
||||
if (port.IsInt()) {
|
||||
maybe_port = port.ValueInt();
|
||||
@ -386,8 +539,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
callback.fn = [handler, name, socket_address, sync_mode, opt_timeout] {
|
||||
CHECK(socket_address.IsString());
|
||||
if (!handler->RegisterReplica(name,
|
||||
std::string(socket_address.ValueString()),
|
||||
sync_mode, opt_timeout)) {
|
||||
std::string(socket_address.ValueString()),
|
||||
sync_mode, opt_timeout)) {
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't create the desired replica '{}'.", name);
|
||||
}
|
||||
@ -406,7 +559,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query,
|
||||
return callback;
|
||||
}
|
||||
case ReplicationQuery::Action::SHOW_REPLICAS: {
|
||||
callback.header = {"name", "hostname", "sync_mode", "timeout"};
|
||||
callback.header = {"name", "socket_address", "sync_mode", "timeout"};
|
||||
callback.fn = [handler, replica_nfields = callback.header.size()] {
|
||||
const auto &replicas = handler->ShowReplicas();
|
||||
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
|
||||
@ -1016,50 +1169,32 @@ PreparedQuery PrepareAuthQuery(
|
||||
}};
|
||||
}
|
||||
|
||||
PreparedQuery PrepareReplicationQuery(
|
||||
ParsedQuery parsed_query, bool in_explicit_transaction,
|
||||
std::map<std::string, TypedValue> *summary,
|
||||
InterpreterContext *interpreter_context, DbAccessor *dba,
|
||||
utils::MonotonicBufferResource *execution_memory) {
|
||||
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query,
|
||||
bool in_explicit_transaction,
|
||||
InterpreterContext *interpreter_context,
|
||||
DbAccessor *dba) {
|
||||
if (in_explicit_transaction) {
|
||||
throw ReplicationModificationInMulticommandTxException();
|
||||
}
|
||||
|
||||
auto *replication_query =
|
||||
utils::Downcast<ReplicationQuery>(parsed_query.query);
|
||||
auto callback =
|
||||
HandleReplicationQuery(replication_query, interpreter_context->repl,
|
||||
parsed_query.parameters, dba);
|
||||
ReplQueryHandler handler{interpreter_context->db};
|
||||
auto callback = HandleReplicationQuery(replication_query, &handler,
|
||||
parsed_query.parameters, dba);
|
||||
|
||||
SymbolTable symbol_table;
|
||||
std::vector<Symbol> output_symbols;
|
||||
for (const auto &column : callback.header) {
|
||||
output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
|
||||
}
|
||||
|
||||
auto plan =
|
||||
std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
|
||||
std::make_unique<plan::OutputTable>(
|
||||
output_symbols,
|
||||
[fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
|
||||
0.0, AstStorage{}, symbol_table));
|
||||
auto pull_plan =
|
||||
std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba,
|
||||
interpreter_context, execution_memory);
|
||||
return PreparedQuery{
|
||||
callback.header, std::move(parsed_query.required_privileges),
|
||||
[pull_plan = std::move(pull_plan), callback = std::move(callback),
|
||||
output_symbols = std::move(output_symbols),
|
||||
summary](AnyStream *stream,
|
||||
std::optional<int> n) -> std::optional<QueryHandlerResult> {
|
||||
if (pull_plan->Pull(stream, n, output_symbols, summary)) {
|
||||
return callback.should_abort_query ? QueryHandlerResult::ABORT
|
||||
: QueryHandlerResult::COMMIT;
|
||||
[pull_plan = std::make_shared<PullPlanVector>(callback.fn())](
|
||||
AnyStream *stream,
|
||||
std::optional<int> n) -> std::optional<QueryHandlerResult> {
|
||||
if (pull_plan->Pull(stream, n)) {
|
||||
return QueryHandlerResult::COMMIT;
|
||||
}
|
||||
return std::nullopt;
|
||||
}};
|
||||
}
|
||||
|
||||
|
||||
PreparedQuery PrepareInfoQuery(
|
||||
ParsedQuery parsed_query, bool in_explicit_transaction,
|
||||
std::map<std::string, TypedValue> *summary,
|
||||
@ -1443,8 +1578,7 @@ Interpreter::PrepareResult Interpreter::Prepare(
|
||||
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareReplicationQuery(
|
||||
std::move(parsed_query), in_explicit_transaction_,
|
||||
&query_execution->summary, interpreter_context_,
|
||||
&*execution_db_accessor_, &query_execution->execution_memory);
|
||||
interpreter_context_, &*execution_db_accessor_);
|
||||
} else {
|
||||
LOG(FATAL) << "Should not get here -- unknown query type!";
|
||||
}
|
||||
|
@ -101,11 +101,12 @@ enum class QueryHandlerResult { COMMIT, ABORT, NOTHING };
|
||||
class ReplicationQueryHandler {
|
||||
public:
|
||||
ReplicationQueryHandler() = default;
|
||||
~ReplicationQueryHandler() = default;
|
||||
virtual ~ReplicationQueryHandler() = default;
|
||||
|
||||
ReplicationQueryHandler(const ReplicationQueryHandler &) = delete;
|
||||
ReplicationQueryHandler(ReplicationQueryHandler &&) = delete;
|
||||
ReplicationQueryHandler &operator=(const ReplicationQueryHandler &) = delete;
|
||||
|
||||
ReplicationQueryHandler(ReplicationQueryHandler &&) = delete;
|
||||
ReplicationQueryHandler &operator=(ReplicationQueryHandler &&) = delete;
|
||||
|
||||
struct Replica {
|
||||
@ -118,7 +119,7 @@ class ReplicationQueryHandler {
|
||||
/// returns false if the replication role can't be set
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual bool SetReplicationRole(
|
||||
ReplicationQuery::ReplicationRole replication_mode,
|
||||
ReplicationQuery::ReplicationRole replication_role,
|
||||
std::optional<int64_t> port) = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
@ -127,9 +128,9 @@ class ReplicationQueryHandler {
|
||||
/// returns false if the replica can't be registered
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual bool RegisterReplica(const std::string &name,
|
||||
const std::string &hostname,
|
||||
ReplicationQuery::SyncMode sync_mode,
|
||||
std::optional<double> timeout) = 0;
|
||||
const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<double> timeout) = 0;
|
||||
|
||||
/// returns false if the desired replica couldn't be dropped
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
@ -246,7 +247,6 @@ struct InterpreterContext {
|
||||
double execution_timeout_sec{180.0};
|
||||
|
||||
AuthQueryHandler *auth{nullptr};
|
||||
ReplicationQueryHandler *repl{nullptr};
|
||||
|
||||
utils::SkipList<QueryCacheEntry> ast_cache;
|
||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||
|
@ -1956,25 +1956,26 @@ uint64_t Storage::CommitTimestamp(
|
||||
}
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
void Storage::SetReplicaRole(
|
||||
bool Storage::SetReplicaRole(
|
||||
io::network::Endpoint endpoint,
|
||||
const replication::ReplicationServerConfig &config) {
|
||||
// We don't want to restart the server if we're already a REPLICA
|
||||
if (replication_role_ == ReplicationRole::REPLICA) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
replication_server_ =
|
||||
std::make_unique<ReplicationServer>(this, std::move(endpoint), config);
|
||||
|
||||
replication_role_.store(ReplicationRole::REPLICA);
|
||||
return true;
|
||||
}
|
||||
|
||||
void Storage::SetMainReplicationRole() {
|
||||
bool Storage::SetMainReplicationRole() {
|
||||
// We don't want to generate new epoch_id and do the
|
||||
// cleanup if we're already a MAIN
|
||||
if (replication_role_ == ReplicationRole::MAIN) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Main instance does not need replication server
|
||||
@ -1997,6 +1998,7 @@ void Storage::SetMainReplicationRole() {
|
||||
}
|
||||
|
||||
replication_role_.store(ReplicationRole::MAIN);
|
||||
return true;
|
||||
}
|
||||
|
||||
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
|
||||
|
@ -417,10 +417,10 @@ class Storage final {
|
||||
|
||||
#if MG_ENTERPRISE
|
||||
|
||||
void SetReplicaRole(io::network::Endpoint endpoint,
|
||||
bool SetReplicaRole(io::network::Endpoint endpoint,
|
||||
const replication::ReplicationServerConfig &config = {});
|
||||
|
||||
void SetMainReplicationRole();
|
||||
bool SetMainReplicationRole();
|
||||
|
||||
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, CONNECTION_FAILED };
|
||||
|
||||
@ -584,8 +584,8 @@ class Storage final {
|
||||
// we don't want to create the client directly inside the vector
|
||||
// because that would require the lock on the list putting all
|
||||
// commits (they iterate list of clients) to halt.
|
||||
// This way we can initiliaze client in main thread which means
|
||||
// that we can immediately notify the user if the intiialization
|
||||
// This way we can initialize client in main thread which means
|
||||
// that we can immediately notify the user if the initialization
|
||||
// failed.
|
||||
using ReplicationClientList =
|
||||
utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>,
|
||||
|
@ -72,7 +72,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
|
||||
// start clients
|
||||
std::vector<std::thread> clients;
|
||||
for (int i = 0; i < Nc; ++i)
|
||||
clients.push_back(std::thread(client_run, i, interface, ep.port()));
|
||||
clients.push_back(std::thread(client_run, i, interface, ep.port));
|
||||
|
||||
// wait for 2s and stop clients
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
|
@ -30,7 +30,7 @@ TEST(Network, Server) {
|
||||
std::vector<std::thread> clients;
|
||||
for (int i = 0; i < N; ++i)
|
||||
clients.push_back(
|
||||
std::thread(client_run, i, interface, ep.port(), data, 30000, SIZE));
|
||||
std::thread(client_run, i, interface, ep.port, data, 30000, SIZE));
|
||||
|
||||
// cleanup clients
|
||||
for (int i = 0; i < N; ++i) clients[i].join();
|
||||
|
@ -33,8 +33,8 @@ TEST(Network, SessionLeak) {
|
||||
const auto &ep = server.endpoint();
|
||||
int testlen = 3000;
|
||||
for (int i = 0; i < N; ++i) {
|
||||
clients.push_back(std::thread(client_run, i, interface, ep.port(), data,
|
||||
testlen, testlen));
|
||||
clients.push_back(
|
||||
std::thread(client_run, i, interface, ep.port, data, testlen, testlen));
|
||||
std::this_thread::sleep_for(10ms);
|
||||
}
|
||||
|
||||
|
@ -12,9 +12,9 @@ TEST(Endpoint, IPv4) {
|
||||
|
||||
// test constructor
|
||||
endpoint = endpoint_t("127.0.0.1", 12347);
|
||||
EXPECT_EQ(endpoint.address(), "127.0.0.1");
|
||||
EXPECT_EQ(endpoint.port(), 12347);
|
||||
EXPECT_EQ(endpoint.family(), 4);
|
||||
EXPECT_EQ(endpoint.address, "127.0.0.1");
|
||||
EXPECT_EQ(endpoint.port, 12347);
|
||||
EXPECT_EQ(endpoint.family, endpoint_t::IpFamily::IP4);
|
||||
|
||||
// test address invalid
|
||||
EXPECT_DEATH(endpoint_t("invalid", 12345), "address");
|
||||
@ -25,9 +25,9 @@ TEST(Endpoint, IPv6) {
|
||||
|
||||
// test constructor
|
||||
endpoint = endpoint_t("ab:cd:ef::3", 12347);
|
||||
EXPECT_EQ(endpoint.address(), "ab:cd:ef::3");
|
||||
EXPECT_EQ(endpoint.port(), 12347);
|
||||
EXPECT_EQ(endpoint.family(), 6);
|
||||
EXPECT_EQ(endpoint.address, "ab:cd:ef::3");
|
||||
EXPECT_EQ(endpoint.port, 12347);
|
||||
EXPECT_EQ(endpoint.family, endpoint_t::IpFamily::IP6);
|
||||
|
||||
// test address invalid
|
||||
EXPECT_DEATH(endpoint_t("::g", 12345), "address");
|
||||
|
Loading…
Reference in New Issue
Block a user