Remove sync with timeout (#423)
* Remove timeout when registering a sync replica * Simplify jepsen configuration file * Remove timeout from jepsen configuration * Add unit test * Remove TimeoutDispatcher
This commit is contained in:
parent
10ca68bb2a
commit
b737e53456
@ -2375,10 +2375,7 @@ cpp<#
|
||||
(port "Expression *" :initval "nullptr" :scope :public
|
||||
:slk-save #'slk-save-ast-pointer
|
||||
:slk-load (slk-load-ast-pointer "Expression"))
|
||||
(sync_mode "SyncMode" :scope :public)
|
||||
(timeout "Expression *" :initval "nullptr" :scope :public
|
||||
:slk-save #'slk-save-ast-pointer
|
||||
:slk-load (slk-load-ast-pointer "Expression")))
|
||||
(sync_mode "SyncMode" :scope :public))
|
||||
|
||||
(:public
|
||||
(lcp:define-enum action
|
||||
|
@ -275,18 +275,7 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe
|
||||
replication_query->replica_name_ = ctx->replicaName()->symbolicName()->accept(this).as<std::string>();
|
||||
if (ctx->SYNC()) {
|
||||
replication_query->sync_mode_ = memgraph::query::ReplicationQuery::SyncMode::SYNC;
|
||||
if (ctx->WITH() && ctx->TIMEOUT()) {
|
||||
if (ctx->timeout->numberLiteral()) {
|
||||
// we accept both double and integer literals
|
||||
replication_query->timeout_ = ctx->timeout->accept(this);
|
||||
} else {
|
||||
throw SemanticException("Timeout should be a integer or double literal!");
|
||||
}
|
||||
}
|
||||
} else if (ctx->ASYNC()) {
|
||||
if (ctx->WITH() && ctx->TIMEOUT()) {
|
||||
throw SyntaxException("Timeout can be set only for the SYNC replication mode!");
|
||||
}
|
||||
replication_query->sync_mode_ = memgraph::query::ReplicationQuery::SyncMode::ASYNC;
|
||||
}
|
||||
|
||||
|
@ -276,7 +276,6 @@ replicaName : symbolicName ;
|
||||
socketAddress : literal ;
|
||||
|
||||
registerReplica : REGISTER REPLICA replicaName ( SYNC | ASYNC )
|
||||
( WITH TIMEOUT timeout=literal ) ?
|
||||
TO socketAddress ;
|
||||
|
||||
dropReplica : DROP REPLICA replicaName ;
|
||||
|
@ -160,7 +160,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
void RegisterReplica(const std::string &name, const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
|
||||
const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::chrono::seconds replica_check_frequency) override {
|
||||
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
|
||||
// replica can't register another replica
|
||||
@ -183,9 +183,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort);
|
||||
if (maybe_ip_and_port) {
|
||||
auto [ip, port] = *maybe_ip_and_port;
|
||||
auto ret = db_->RegisterReplica(
|
||||
name, {std::move(ip), port}, repl_mode,
|
||||
{.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
|
||||
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode,
|
||||
{.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
|
||||
if (ret.HasError()) {
|
||||
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
|
||||
}
|
||||
@ -228,9 +227,6 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
replica.sync_mode = ReplicationQuery::SyncMode::ASYNC;
|
||||
break;
|
||||
}
|
||||
if (repl_info.timeout) {
|
||||
replica.timeout = *repl_info.timeout;
|
||||
}
|
||||
|
||||
replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica;
|
||||
replica.current_number_of_timestamp_behind_master =
|
||||
@ -487,22 +483,11 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
const auto &name = repl_query->replica_name_;
|
||||
const auto &sync_mode = repl_query->sync_mode_;
|
||||
auto socket_address = repl_query->socket_address_->Accept(evaluator);
|
||||
auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator);
|
||||
const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency;
|
||||
std::optional<double> maybe_timeout;
|
||||
if (timeout.IsDouble()) {
|
||||
maybe_timeout = timeout.ValueDouble();
|
||||
} else if (timeout.IsInt()) {
|
||||
maybe_timeout = static_cast<double>(timeout.ValueInt());
|
||||
}
|
||||
if (maybe_timeout && *maybe_timeout <= 0.0) {
|
||||
throw utils::BasicException("Parameter TIMEOUT must be strictly greater than 0.");
|
||||
}
|
||||
|
||||
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
|
||||
maybe_timeout, replica_check_frequency]() mutable {
|
||||
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,
|
||||
replica_check_frequency);
|
||||
replica_check_frequency]() mutable {
|
||||
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, replica_check_frequency);
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA,
|
||||
@ -522,13 +507,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
}
|
||||
|
||||
case ReplicationQuery::Action::SHOW_REPLICAS: {
|
||||
callback.header = {"name",
|
||||
"socket_address",
|
||||
"sync_mode",
|
||||
"timeout",
|
||||
"current_timestamp_of_replica",
|
||||
"number_of_timestamp_behind_master",
|
||||
"state"};
|
||||
callback.header = {
|
||||
"name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master",
|
||||
"state"};
|
||||
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] {
|
||||
const auto &replicas = handler.ShowReplicas();
|
||||
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
|
||||
@ -549,12 +530,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
break;
|
||||
}
|
||||
|
||||
if (replica.timeout) {
|
||||
typed_replica.emplace_back(TypedValue(*replica.timeout));
|
||||
} else {
|
||||
typed_replica.emplace_back(TypedValue());
|
||||
}
|
||||
|
||||
typed_replica.emplace_back(TypedValue(static_cast<int64_t>(replica.current_timestamp_of_replica)));
|
||||
typed_replica.emplace_back(
|
||||
TypedValue(static_cast<int64_t>(replica.current_number_of_timestamp_behind_master)));
|
||||
|
@ -140,7 +140,7 @@ class ReplicationQueryHandler {
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual void RegisterReplica(const std::string &name, const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
|
||||
ReplicationQuery::SyncMode sync_mode,
|
||||
const std::chrono::seconds replica_check_frequency) = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
namespace memgraph::storage::replication {
|
||||
struct ReplicationClientConfig {
|
||||
std::optional<double> timeout;
|
||||
// The default delay between main checking/pinging replicas is 1s because
|
||||
// that seems like a reasonable timeframe in which main should notice a
|
||||
// replica is down.
|
||||
|
@ -43,12 +43,6 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
|
||||
rpc_client_.emplace(endpoint, &*rpc_context_);
|
||||
TryInitializeClientSync();
|
||||
|
||||
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
|
||||
MG_ASSERT(*config.timeout > 0);
|
||||
timeout_.emplace(*config.timeout);
|
||||
timeout_dispatcher_.emplace();
|
||||
}
|
||||
|
||||
// Help the user to get the most accurate replica state possible.
|
||||
if (config.replica_check_frequency > std::chrono::seconds(0)) {
|
||||
replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); });
|
||||
@ -239,41 +233,6 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
|
||||
|
||||
if (mode_ == replication::ReplicationMode::ASYNC) {
|
||||
thread_pool_.AddTask([this] { this->FinalizeTransactionReplicationInternal(); });
|
||||
} else if (timeout_) {
|
||||
MG_ASSERT(mode_ == replication::ReplicationMode::SYNC, "Only SYNC replica can have a timeout.");
|
||||
MG_ASSERT(timeout_dispatcher_, "Timeout thread is missing");
|
||||
timeout_dispatcher_->WaitForTaskToFinish();
|
||||
|
||||
timeout_dispatcher_->active = true;
|
||||
thread_pool_.AddTask([&, this] {
|
||||
this->FinalizeTransactionReplicationInternal();
|
||||
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
|
||||
// TimerThread can finish waiting for timeout
|
||||
timeout_dispatcher_->active = false;
|
||||
// Notify the main thread
|
||||
timeout_dispatcher_->main_cv.notify_one();
|
||||
});
|
||||
|
||||
timeout_dispatcher_->StartTimeoutTask(*timeout_);
|
||||
|
||||
// Wait until one of the threads notifies us that they finished executing
|
||||
// Both threads should first set the active flag to false
|
||||
{
|
||||
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
|
||||
timeout_dispatcher_->main_cv.wait(main_guard, [&] { return !timeout_dispatcher_->active.load(); });
|
||||
}
|
||||
|
||||
// TODO (antonio2368): Document and/or polish SEMI-SYNC to ASYNC fallback.
|
||||
if (replica_state_ == replication::ReplicaState::REPLICATING) {
|
||||
mode_ = replication::ReplicationMode::ASYNC;
|
||||
timeout_.reset();
|
||||
// This can only happen if we timeouted so we are sure that
|
||||
// Timeout task finished
|
||||
// We need to delete timeout dispatcher AFTER the replication
|
||||
// finished because it tries to acquire the timeout lock
|
||||
// and access the `active` variable
|
||||
thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); });
|
||||
}
|
||||
} else {
|
||||
FinalizeTransactionReplicationInternal();
|
||||
}
|
||||
@ -566,30 +525,6 @@ Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
|
||||
return info;
|
||||
}
|
||||
|
||||
////// TimeoutDispatcher //////
|
||||
void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
|
||||
// Wait for the previous timeout task to finish
|
||||
std::unique_lock main_guard(main_lock);
|
||||
main_cv.wait(main_guard, [&] { return finished; });
|
||||
}
|
||||
|
||||
void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(const double timeout) {
|
||||
timeout_pool.AddTask([timeout, this] {
|
||||
finished = false;
|
||||
using std::chrono::steady_clock;
|
||||
const auto timeout_duration =
|
||||
std::chrono::duration_cast<steady_clock::duration>(std::chrono::duration<double>(timeout));
|
||||
const auto end_time = steady_clock::now() + timeout_duration;
|
||||
while (active && (steady_clock::now() < end_time)) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
|
||||
std::unique_lock main_guard(main_lock);
|
||||
finished = true;
|
||||
active = false;
|
||||
main_cv.notify_one();
|
||||
});
|
||||
}
|
||||
////// ReplicaStream //////
|
||||
Storage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self,
|
||||
const uint64_t previous_commit_timestamp,
|
||||
|
@ -120,8 +120,6 @@ class Storage::ReplicationClient {
|
||||
|
||||
auto Mode() const { return mode_; }
|
||||
|
||||
auto Timeout() const { return timeout_; }
|
||||
|
||||
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
|
||||
|
||||
Storage::TimestampInfo GetTimestampInfo();
|
||||
@ -158,30 +156,6 @@ class Storage::ReplicationClient {
|
||||
std::optional<ReplicaStream> replica_stream_;
|
||||
replication::ReplicationMode mode_{replication::ReplicationMode::SYNC};
|
||||
|
||||
// Dispatcher class for timeout tasks
|
||||
struct TimeoutDispatcher {
|
||||
explicit TimeoutDispatcher(){};
|
||||
|
||||
void WaitForTaskToFinish();
|
||||
|
||||
void StartTimeoutTask(double timeout);
|
||||
|
||||
// If the Timeout task should continue waiting
|
||||
std::atomic<bool> active{false};
|
||||
|
||||
std::mutex main_lock;
|
||||
std::condition_variable main_cv;
|
||||
|
||||
private:
|
||||
// if the Timeout task finished executing
|
||||
bool finished{true};
|
||||
|
||||
utils::ThreadPool timeout_pool{1};
|
||||
};
|
||||
|
||||
std::optional<double> timeout_;
|
||||
std::optional<TimeoutDispatcher> timeout_dispatcher_;
|
||||
|
||||
utils::SpinLock client_lock_;
|
||||
// This thread pool is used for background tasks so we don't
|
||||
// block the main storage thread
|
||||
|
@ -1902,9 +1902,6 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
|
||||
return RegisterReplicaError::END_POINT_EXISTS;
|
||||
}
|
||||
|
||||
MG_ASSERT(replication_mode == replication::ReplicationMode::SYNC || !config.timeout,
|
||||
"Only SYNC mode can have a timeout set");
|
||||
|
||||
auto client = std::make_unique<ReplicationClient>(std::move(name), this, endpoint, replication_mode, config);
|
||||
if (client->State() == replication::ReplicaState::INVALID) {
|
||||
return RegisterReplicaError::CONNECTION_FAILED;
|
||||
@ -1952,11 +1949,10 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
|
||||
return replication_clients_.WithLock([](auto &clients) {
|
||||
std::vector<Storage::ReplicaInfo> replica_info;
|
||||
replica_info.reserve(clients.size());
|
||||
std::transform(clients.begin(), clients.end(), std::back_inserter(replica_info),
|
||||
[](const auto &client) -> ReplicaInfo {
|
||||
return {client->Name(), client->Mode(), client->Timeout(),
|
||||
client->Endpoint(), client->State(), client->GetTimestampInfo()};
|
||||
});
|
||||
std::transform(
|
||||
clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo {
|
||||
return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()};
|
||||
});
|
||||
return replica_info;
|
||||
});
|
||||
}
|
||||
|
@ -433,7 +433,6 @@ class Storage final {
|
||||
struct ReplicaInfo {
|
||||
std::string name;
|
||||
replication::ReplicationMode mode;
|
||||
std::optional<double> timeout;
|
||||
io::network::Endpoint endpoint;
|
||||
replication::ReplicaState state;
|
||||
TimestampInfo timestamp_info;
|
||||
|
@ -67,7 +67,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"REGISTER REPLICA replica1 SYNC TO '127.0.0.1:10001'",
|
||||
"REGISTER REPLICA replica2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
|
||||
"REGISTER REPLICA replica2 SYNC TO '127.0.0.1:10002'",
|
||||
],
|
||||
},
|
||||
}
|
||||
|
@ -36,7 +36,6 @@ def test_show_replicas(connection):
|
||||
"name",
|
||||
"socket_address",
|
||||
"sync_mode",
|
||||
"timeout",
|
||||
"current_timestamp_of_replica",
|
||||
"number_of_timestamp_behind_master",
|
||||
"state",
|
||||
@ -45,9 +44,9 @@ def test_show_replicas(connection):
|
||||
assert expected_column_names == actual_column_names
|
||||
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
|
||||
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
|
||||
}
|
||||
assert expected_data == actual_data
|
||||
|
||||
@ -68,7 +67,6 @@ def test_show_replicas_while_inserting_data(connection):
|
||||
"name",
|
||||
"socket_address",
|
||||
"sync_mode",
|
||||
"timeout",
|
||||
"current_timestamp_of_replica",
|
||||
"number_of_timestamp_behind_master",
|
||||
"state",
|
||||
@ -77,9 +75,9 @@ def test_show_replicas_while_inserting_data(connection):
|
||||
assert expected_column_names == actual_column_names
|
||||
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
|
||||
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
|
||||
}
|
||||
assert expected_data == actual_data
|
||||
|
||||
@ -89,9 +87,9 @@ def test_show_replicas_while_inserting_data(connection):
|
||||
|
||||
# 2/
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2.0, 4, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 1.0, 4, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", None, 4, 0, "ready"),
|
||||
("replica_1", "127.0.0.1:10001", "sync", 4, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 4, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"),
|
||||
}
|
||||
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
|
||||
print("actual_data=" + str(actual_data))
|
||||
|
@ -51,8 +51,8 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001';",
|
||||
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';",
|
||||
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';",
|
||||
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';",
|
||||
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';",
|
||||
"REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';",
|
||||
],
|
||||
@ -78,7 +78,6 @@ def test_show_replicas(connection):
|
||||
"name",
|
||||
"socket_address",
|
||||
"sync_mode",
|
||||
"timeout",
|
||||
"current_timestamp_of_replica",
|
||||
"number_of_timestamp_behind_master",
|
||||
"state",
|
||||
@ -88,10 +87,10 @@ def test_show_replicas(connection):
|
||||
assert EXPECTED_COLUMN_NAMES == actual_column_names
|
||||
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2, 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
|
||||
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
|
||||
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
|
||||
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
|
||||
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
|
||||
}
|
||||
assert expected_data == actual_data
|
||||
|
||||
@ -99,9 +98,9 @@ def test_show_replicas(connection):
|
||||
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
|
||||
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"),
|
||||
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"),
|
||||
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
|
||||
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
|
||||
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
|
||||
}
|
||||
assert expected_data == actual_data
|
||||
|
||||
@ -114,45 +113,12 @@ def test_show_replicas(connection):
|
||||
time.sleep(2)
|
||||
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
|
||||
expected_data = {
|
||||
("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "invalid"),
|
||||
("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"),
|
||||
("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"),
|
||||
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
|
||||
("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"),
|
||||
("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"),
|
||||
}
|
||||
assert expected_data == actual_data
|
||||
|
||||
|
||||
def test_add_replica_invalid_timeout(connection):
|
||||
# Goal of this test is to check the registration of replica with invalid timeout raises an exception
|
||||
CONFIGURATION = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [],
|
||||
},
|
||||
}
|
||||
|
||||
interactive_mg_runner.start_all(CONFIGURATION)
|
||||
|
||||
cursor = connection(7687, "main").cursor()
|
||||
|
||||
with pytest.raises(mgclient.DatabaseError):
|
||||
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';")
|
||||
|
||||
with pytest.raises(mgclient.DatabaseError):
|
||||
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT -5 TO '127.0.0.1:10001';")
|
||||
|
||||
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
|
||||
assert 0 == len(actual_data)
|
||||
|
||||
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10001';")
|
||||
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
|
||||
assert 1 == len(actual_data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
||||
|
@ -29,8 +29,8 @@ template_cluster: &template_cluster
|
||||
args: ["--bolt-port", "7687", "--log-level=TRACE"]
|
||||
log_file: "replication-e2e-main.log"
|
||||
setup_queries: [
|
||||
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'",
|
||||
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
|
||||
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
|
||||
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'",
|
||||
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
|
||||
]
|
||||
<<: *template_validation_queries
|
||||
@ -69,8 +69,8 @@ workloads:
|
||||
args: ["--bolt-port", "7687", "--log-level=TRACE"]
|
||||
log_file: "replication-e2e-main.log"
|
||||
setup_queries: [
|
||||
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'",
|
||||
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
|
||||
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
|
||||
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'",
|
||||
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
|
||||
]
|
||||
validation_queries: []
|
||||
|
@ -3,4 +3,4 @@
|
||||
"n3" {:replication-role :replica :replication-mode :async :port 10000}}
|
||||
{"n1" {:replication-role :main}
|
||||
"n2" {:replication-role :replica :replication-mode :async :port 10000}
|
||||
"n3" {:replication-role :replica :replication-mode :sync :port 10000 :timeout 3}}]
|
||||
"n3" {:replication-role :replica :replication-mode :sync :port 10000}}]
|
||||
|
@ -31,7 +31,7 @@
|
||||
[node-config]
|
||||
(case (:replication-mode node-config)
|
||||
:async "ASYNC"
|
||||
:sync (str "SYNC" (when-let [timeout (:timeout node-config)] (str " WITH TIMEOUT " timeout)))))
|
||||
:sync "SYNC" ))
|
||||
|
||||
(defn create-register-replica-query
|
||||
[name node-config]
|
||||
|
@ -129,13 +129,7 @@
|
||||
:replication-mode
|
||||
(str "Invalid node configuration. "
|
||||
"Every replication node requires "
|
||||
":replication-mode to be defined."))
|
||||
(throw-if-key-missing-in-any
|
||||
(filter #(= (:replication-mode %) :sync) replica-nodes-configs)
|
||||
:timeout
|
||||
(str "Invalid node confiruation. "
|
||||
"Every SYNC replication node requires "
|
||||
":timeout to be defined."))))
|
||||
":replication-mode to be defined."))))
|
||||
|
||||
(map (fn [node-config] (resolve-all-node-hostnames
|
||||
(merge
|
||||
|
@ -2322,17 +2322,13 @@ TEST_P(CypherMainVisitorTest, ShowUsersForRole) {
|
||||
|
||||
void check_replication_query(Base *ast_generator, const ReplicationQuery *query, const std::string name,
|
||||
const std::optional<TypedValue> socket_address, const ReplicationQuery::SyncMode sync_mode,
|
||||
const std::optional<TypedValue> timeout = {}, const std::optional<TypedValue> port = {}) {
|
||||
const std::optional<TypedValue> port = {}) {
|
||||
EXPECT_EQ(query->replica_name_, name);
|
||||
EXPECT_EQ(query->sync_mode_, sync_mode);
|
||||
ASSERT_EQ(static_cast<bool>(query->socket_address_), static_cast<bool>(socket_address));
|
||||
if (socket_address) {
|
||||
ast_generator->CheckLiteral(query->socket_address_, *socket_address);
|
||||
}
|
||||
ASSERT_EQ(static_cast<bool>(query->timeout_), static_cast<bool>(timeout));
|
||||
if (timeout) {
|
||||
ast_generator->CheckLiteral(query->timeout_, *timeout);
|
||||
}
|
||||
ASSERT_EQ(static_cast<bool>(query->port_), static_cast<bool>(port));
|
||||
if (port) {
|
||||
ast_generator->CheckLiteral(query->port_, *port);
|
||||
@ -2390,20 +2386,22 @@ TEST_P(CypherMainVisitorTest, TestSetReplicationMode) {
|
||||
TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
|
||||
auto &ast_generator = *GetParam();
|
||||
|
||||
const std::string faulty_query = "REGISTER REPLICA WITH TIMEOUT TO";
|
||||
const std::string faulty_query = "REGISTER REPLICA TO";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(faulty_query), SyntaxException);
|
||||
|
||||
const std::string no_timeout_query = R"(REGISTER REPLICA replica1 SYNC TO "127.0.0.1")";
|
||||
auto *no_timeout_query_parsed = dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(no_timeout_query));
|
||||
ASSERT_TRUE(no_timeout_query_parsed);
|
||||
check_replication_query(&ast_generator, no_timeout_query_parsed, "replica1", TypedValue("127.0.0.1"),
|
||||
const std::string faulty_query_with_timeout = R"(REGISTER REPLICA replica1 SYNC WITH TIMEOUT 1.0 TO "127.0.0.1")";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(faulty_query_with_timeout), SyntaxException);
|
||||
|
||||
const std::string correct_query = R"(REGISTER REPLICA replica1 SYNC TO "127.0.0.1")";
|
||||
auto *correct_query_parsed = dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(correct_query));
|
||||
check_replication_query(&ast_generator, correct_query_parsed, "replica1", TypedValue("127.0.0.1"),
|
||||
ReplicationQuery::SyncMode::SYNC);
|
||||
|
||||
std::string full_query = R"(REGISTER REPLICA replica2 SYNC WITH TIMEOUT 0.5 TO "1.1.1.1:10000")";
|
||||
std::string full_query = R"(REGISTER REPLICA replica2 SYNC TO "1.1.1.1:10000")";
|
||||
auto *full_query_parsed = dynamic_cast<ReplicationQuery *>(ast_generator.ParseQuery(full_query));
|
||||
ASSERT_TRUE(full_query_parsed);
|
||||
check_replication_query(&ast_generator, full_query_parsed, "replica2", TypedValue("1.1.1.1:10000"),
|
||||
ReplicationQuery::SyncMode::SYNC, TypedValue(0.5));
|
||||
ReplicationQuery::SyncMode::SYNC);
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, TestDeleteReplica) {
|
||||
|
@ -624,10 +624,10 @@ TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
replica_store2.SetReplicaRole(replica2_endpoint);
|
||||
|
||||
const std::string replica1_name{"REPLICA1"};
|
||||
ASSERT_FALSE(main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint,
|
||||
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
|
||||
.HasError());
|
||||
ASSERT_FALSE(
|
||||
main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC)
|
||||
.HasError());
|
||||
|
||||
const std::string replica2_name{"REPLICA2"};
|
||||
ASSERT_FALSE(
|
||||
@ -645,15 +645,12 @@ TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
const auto &first_info = replicas_info[0];
|
||||
ASSERT_EQ(first_info.name, replica1_name);
|
||||
ASSERT_EQ(first_info.mode, memgraph::storage::replication::ReplicationMode::SYNC);
|
||||
ASSERT_TRUE(first_info.timeout);
|
||||
ASSERT_EQ(*first_info.timeout, 2.0);
|
||||
ASSERT_EQ(first_info.endpoint, replica1_endpoint);
|
||||
ASSERT_EQ(first_info.state, memgraph::storage::replication::ReplicaState::READY);
|
||||
|
||||
const auto &second_info = replicas_info[1];
|
||||
ASSERT_EQ(second_info.name, replica2_name);
|
||||
ASSERT_EQ(second_info.mode, memgraph::storage::replication::ReplicationMode::ASYNC);
|
||||
ASSERT_FALSE(second_info.timeout);
|
||||
ASSERT_EQ(second_info.endpoint, replica2_endpoint);
|
||||
ASSERT_EQ(second_info.state, memgraph::storage::replication::ReplicaState::READY);
|
||||
}
|
||||
@ -672,10 +669,10 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
|
||||
replica_store2.SetReplicaRole(replica2_endpoint);
|
||||
|
||||
const std::string replica1_name{"REPLICA1"};
|
||||
ASSERT_FALSE(main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint,
|
||||
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
|
||||
.HasError());
|
||||
ASSERT_FALSE(
|
||||
main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC)
|
||||
.HasError());
|
||||
|
||||
const std::string replica2_name{"REPLICA1"};
|
||||
ASSERT_TRUE(
|
||||
@ -698,10 +695,10 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
|
||||
replica_store2.SetReplicaRole(replica2_endpoint);
|
||||
|
||||
const std::string replica1_name{"REPLICA1"};
|
||||
ASSERT_FALSE(main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint,
|
||||
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
|
||||
.HasError());
|
||||
ASSERT_FALSE(
|
||||
main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC)
|
||||
.HasError());
|
||||
|
||||
const std::string replica2_name{"REPLICA2"};
|
||||
ASSERT_TRUE(
|
||||
|
Loading…
Reference in New Issue
Block a user