diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index 33d397754..6b5b5a16f 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2375,10 +2375,7 @@ cpp<# (port "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) - (sync_mode "SyncMode" :scope :public) - (timeout "Expression *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression"))) + (sync_mode "SyncMode" :scope :public)) (:public (lcp:define-enum action diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index f4a269dfd..a12643298 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -275,18 +275,7 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe replication_query->replica_name_ = ctx->replicaName()->symbolicName()->accept(this).as(); if (ctx->SYNC()) { replication_query->sync_mode_ = memgraph::query::ReplicationQuery::SyncMode::SYNC; - if (ctx->WITH() && ctx->TIMEOUT()) { - if (ctx->timeout->numberLiteral()) { - // we accept both double and integer literals - replication_query->timeout_ = ctx->timeout->accept(this); - } else { - throw SemanticException("Timeout should be a integer or double literal!"); - } - } } else if (ctx->ASYNC()) { - if (ctx->WITH() && ctx->TIMEOUT()) { - throw SyntaxException("Timeout can be set only for the SYNC replication mode!"); - } replication_query->sync_mode_ = memgraph::query::ReplicationQuery::SyncMode::ASYNC; } diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index b412a474a..8790b8731 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -276,7 +276,6 @@ replicaName : symbolicName ; socketAddress : literal ; registerReplica : REGISTER REPLICA replicaName ( SYNC | ASYNC ) - ( WITH TIMEOUT timeout=literal ) ? TO socketAddress ; dropReplica : DROP REPLICA replicaName ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index c1d206a72..17d62b240 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -160,7 +160,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. void RegisterReplica(const std::string &name, const std::string &socket_address, - const ReplicationQuery::SyncMode sync_mode, const std::optional timeout, + const ReplicationQuery::SyncMode sync_mode, const std::chrono::seconds replica_check_frequency) override { if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) { // replica can't register another replica @@ -183,9 +183,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort); if (maybe_ip_and_port) { auto [ip, port] = *maybe_ip_and_port; - auto ret = db_->RegisterReplica( - name, {std::move(ip), port}, repl_mode, - {.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt}); + auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, + {.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt}); if (ret.HasError()) { throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name)); } @@ -228,9 +227,6 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { replica.sync_mode = ReplicationQuery::SyncMode::ASYNC; break; } - if (repl_info.timeout) { - replica.timeout = *repl_info.timeout; - } replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica; replica.current_number_of_timestamp_behind_master = @@ -487,22 +483,11 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & const auto &name = repl_query->replica_name_; const auto &sync_mode = repl_query->sync_mode_; auto socket_address = repl_query->socket_address_->Accept(evaluator); - auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator); const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency; - std::optional maybe_timeout; - if (timeout.IsDouble()) { - maybe_timeout = timeout.ValueDouble(); - } else if (timeout.IsInt()) { - maybe_timeout = static_cast(timeout.ValueInt()); - } - if (maybe_timeout && *maybe_timeout <= 0.0) { - throw utils::BasicException("Parameter TIMEOUT must be strictly greater than 0."); - } callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode, - maybe_timeout, replica_check_frequency]() mutable { - handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout, - replica_check_frequency); + replica_check_frequency]() mutable { + handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, replica_check_frequency); return std::vector>(); }; notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA, @@ -522,13 +507,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::SHOW_REPLICAS: { - callback.header = {"name", - "socket_address", - "sync_mode", - "timeout", - "current_timestamp_of_replica", - "number_of_timestamp_behind_master", - "state"}; + callback.header = { + "name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master", + "state"}; callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] { const auto &replicas = handler.ShowReplicas(); auto typed_replicas = std::vector>{}; @@ -549,12 +530,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & break; } - if (replica.timeout) { - typed_replica.emplace_back(TypedValue(*replica.timeout)); - } else { - typed_replica.emplace_back(TypedValue()); - } - typed_replica.emplace_back(TypedValue(static_cast(replica.current_timestamp_of_replica))); typed_replica.emplace_back( TypedValue(static_cast(replica.current_number_of_timestamp_behind_master))); diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 155cb28a6..5373588d9 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -140,7 +140,7 @@ class ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. virtual void RegisterReplica(const std::string &name, const std::string &socket_address, - const ReplicationQuery::SyncMode sync_mode, const std::optional timeout, + ReplicationQuery::SyncMode sync_mode, const std::chrono::seconds replica_check_frequency) = 0; /// @throw QueryRuntimeException if an error ocurred. diff --git a/src/storage/v2/replication/config.hpp b/src/storage/v2/replication/config.hpp index 124e7e9d2..6c7e40ba9 100644 --- a/src/storage/v2/replication/config.hpp +++ b/src/storage/v2/replication/config.hpp @@ -15,7 +15,6 @@ namespace memgraph::storage::replication { struct ReplicationClientConfig { - std::optional timeout; // The default delay between main checking/pinging replicas is 1s because // that seems like a reasonable timeframe in which main should notice a // replica is down. diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 19fdf4d9b..9b6401391 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -43,12 +43,6 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage rpc_client_.emplace(endpoint, &*rpc_context_); TryInitializeClientSync(); - if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) { - MG_ASSERT(*config.timeout > 0); - timeout_.emplace(*config.timeout); - timeout_dispatcher_.emplace(); - } - // Help the user to get the most accurate replica state possible. if (config.replica_check_frequency > std::chrono::seconds(0)) { replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); }); @@ -239,41 +233,6 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() { if (mode_ == replication::ReplicationMode::ASYNC) { thread_pool_.AddTask([this] { this->FinalizeTransactionReplicationInternal(); }); - } else if (timeout_) { - MG_ASSERT(mode_ == replication::ReplicationMode::SYNC, "Only SYNC replica can have a timeout."); - MG_ASSERT(timeout_dispatcher_, "Timeout thread is missing"); - timeout_dispatcher_->WaitForTaskToFinish(); - - timeout_dispatcher_->active = true; - thread_pool_.AddTask([&, this] { - this->FinalizeTransactionReplicationInternal(); - std::unique_lock main_guard(timeout_dispatcher_->main_lock); - // TimerThread can finish waiting for timeout - timeout_dispatcher_->active = false; - // Notify the main thread - timeout_dispatcher_->main_cv.notify_one(); - }); - - timeout_dispatcher_->StartTimeoutTask(*timeout_); - - // Wait until one of the threads notifies us that they finished executing - // Both threads should first set the active flag to false - { - std::unique_lock main_guard(timeout_dispatcher_->main_lock); - timeout_dispatcher_->main_cv.wait(main_guard, [&] { return !timeout_dispatcher_->active.load(); }); - } - - // TODO (antonio2368): Document and/or polish SEMI-SYNC to ASYNC fallback. - if (replica_state_ == replication::ReplicaState::REPLICATING) { - mode_ = replication::ReplicationMode::ASYNC; - timeout_.reset(); - // This can only happen if we timeouted so we are sure that - // Timeout task finished - // We need to delete timeout dispatcher AFTER the replication - // finished because it tries to acquire the timeout lock - // and acces the `active` variable` - thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); }); - } } else { FinalizeTransactionReplicationInternal(); } @@ -566,30 +525,6 @@ Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() { return info; } -////// TimeoutDispatcher ////// -void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() { - // Wait for the previous timeout task to finish - std::unique_lock main_guard(main_lock); - main_cv.wait(main_guard, [&] { return finished; }); -} - -void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(const double timeout) { - timeout_pool.AddTask([timeout, this] { - finished = false; - using std::chrono::steady_clock; - const auto timeout_duration = - std::chrono::duration_cast(std::chrono::duration(timeout)); - const auto end_time = steady_clock::now() + timeout_duration; - while (active && (steady_clock::now() < end_time)) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - - std::unique_lock main_guard(main_lock); - finished = true; - active = false; - main_cv.notify_one(); - }); -} ////// ReplicaStream ////// Storage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_commit_timestamp, diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index b1be036c4..71c674062 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -120,8 +120,6 @@ class Storage::ReplicationClient { auto Mode() const { return mode_; } - auto Timeout() const { return timeout_; } - const auto &Endpoint() const { return rpc_client_->Endpoint(); } Storage::TimestampInfo GetTimestampInfo(); @@ -158,30 +156,6 @@ class Storage::ReplicationClient { std::optional replica_stream_; replication::ReplicationMode mode_{replication::ReplicationMode::SYNC}; - // Dispatcher class for timeout tasks - struct TimeoutDispatcher { - explicit TimeoutDispatcher(){}; - - void WaitForTaskToFinish(); - - void StartTimeoutTask(double timeout); - - // If the Timeout task should continue waiting - std::atomic active{false}; - - std::mutex main_lock; - std::condition_variable main_cv; - - private: - // if the Timeout task finished executing - bool finished{true}; - - utils::ThreadPool timeout_pool{1}; - }; - - std::optional timeout_; - std::optional timeout_dispatcher_; - utils::SpinLock client_lock_; // This thread pool is used for background tasks so we don't // block the main storage thread diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index e0fbdf0da..1e66ffd6e 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -1902,9 +1902,6 @@ utils::BasicResult Storage::RegisterReplica( return RegisterReplicaError::END_POINT_EXISTS; } - MG_ASSERT(replication_mode == replication::ReplicationMode::SYNC || !config.timeout, - "Only SYNC mode can have a timeout set"); - auto client = std::make_unique(std::move(name), this, endpoint, replication_mode, config); if (client->State() == replication::ReplicaState::INVALID) { return RegisterReplicaError::CONNECTION_FAILED; @@ -1952,11 +1949,10 @@ std::vector Storage::ReplicasInfo() { return replication_clients_.WithLock([](auto &clients) { std::vector replica_info; replica_info.reserve(clients.size()); - std::transform(clients.begin(), clients.end(), std::back_inserter(replica_info), - [](const auto &client) -> ReplicaInfo { - return {client->Name(), client->Mode(), client->Timeout(), - client->Endpoint(), client->State(), client->GetTimestampInfo()}; - }); + std::transform( + clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo { + return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()}; + }); return replica_info; }); } diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 90a5daf16..bd8b351e5 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -433,7 +433,6 @@ class Storage final { struct ReplicaInfo { std::string name; replication::ReplicationMode mode; - std::optional timeout; io::network::Endpoint endpoint; replication::ReplicaState state; TimestampInfo timestamp_info; diff --git a/tests/e2e/interactive_mg_runner.py b/tests/e2e/interactive_mg_runner.py index 707eb8cfd..b7eaf60da 100644 --- a/tests/e2e/interactive_mg_runner.py +++ b/tests/e2e/interactive_mg_runner.py @@ -67,7 +67,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { "log_file": "main.log", "setup_queries": [ "REGISTER REPLICA replica1 SYNC TO '127.0.0.1:10001'", - "REGISTER REPLICA replica2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'", + "REGISTER REPLICA replica2 SYNC TO '127.0.0.1:10002'", ], }, } diff --git a/tests/e2e/replication/show.py b/tests/e2e/replication/show.py index add3d5938..3857ea7eb 100755 --- a/tests/e2e/replication/show.py +++ b/tests/e2e/replication/show.py @@ -36,7 +36,6 @@ def test_show_replicas(connection): "name", "socket_address", "sync_mode", - "timeout", "current_timestamp_of_replica", "number_of_timestamp_behind_master", "state", @@ -45,9 +44,9 @@ def test_show_replicas(connection): assert expected_column_names == actual_column_names expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"), - ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), } assert expected_data == actual_data @@ -68,7 +67,6 @@ def test_show_replicas_while_inserting_data(connection): "name", "socket_address", "sync_mode", - "timeout", "current_timestamp_of_replica", "number_of_timestamp_behind_master", "state", @@ -77,9 +75,9 @@ def test_show_replicas_while_inserting_data(connection): assert expected_column_names == actual_column_names expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"), - ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), } assert expected_data == actual_data @@ -89,9 +87,9 @@ def test_show_replicas_while_inserting_data(connection): # 2/ expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 2.0, 4, 0, "ready"), - ("replica_2", "127.0.0.1:10002", "sync", 1.0, 4, 0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, 4, 0, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 4, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 4, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"), } actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) print("actual_data=" + str(actual_data)) diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index 151119be8..6386edcf2 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -51,8 +51,8 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { "args": ["--bolt-port", "7687", "--log-level=TRACE"], "log_file": "main.log", "setup_queries": [ - "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001';", - "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';", + "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';", "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';", "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';", ], @@ -78,7 +78,6 @@ def test_show_replicas(connection): "name", "socket_address", "sync_mode", - "timeout", "current_timestamp_of_replica", "number_of_timestamp_behind_master", "state", @@ -88,10 +87,10 @@ def test_show_replicas(connection): assert EXPECTED_COLUMN_NAMES == actual_column_names expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 2, 0, 0, "ready"), - ("replica_2", "127.0.0.1:10002", "sync", 1.0, 0, 0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"), - ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), } assert expected_data == actual_data @@ -99,9 +98,9 @@ def test_show_replicas(connection): execute_and_fetch_all(cursor, "DROP REPLICA replica_2") actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "ready"), - ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "ready"), - ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "ready"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), } assert expected_data == actual_data @@ -114,45 +113,12 @@ def test_show_replicas(connection): time.sleep(2) actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) expected_data = { - ("replica_1", "127.0.0.1:10001", "sync", 2.0, 0, 0, "invalid"), - ("replica_3", "127.0.0.1:10003", "async", None, 0, 0, "invalid"), - ("replica_4", "127.0.0.1:10004", "async", None, 0, 0, "invalid"), + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"), } assert expected_data == actual_data -def test_add_replica_invalid_timeout(connection): - # Goal of this test is to check the registration of replica with invalid timeout raises an exception - CONFIGURATION = { - "replica_1": { - "args": ["--bolt-port", "7688", "--log-level=TRACE"], - "log_file": "replica1.log", - "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], - }, - "main": { - "args": ["--bolt-port", "7687", "--log-level=TRACE"], - "log_file": "main.log", - "setup_queries": [], - }, - } - - interactive_mg_runner.start_all(CONFIGURATION) - - cursor = connection(7687, "main").cursor() - - with pytest.raises(mgclient.DatabaseError): - execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';") - - with pytest.raises(mgclient.DatabaseError): - execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT -5 TO '127.0.0.1:10001';") - - actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;") - assert 0 == len(actual_data) - - execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10001';") - actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;") - assert 1 == len(actual_data) - - if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/replication/workloads.yaml b/tests/e2e/replication/workloads.yaml index c7e11d12e..d9830ae04 100644 --- a/tests/e2e/replication/workloads.yaml +++ b/tests/e2e/replication/workloads.yaml @@ -29,8 +29,8 @@ template_cluster: &template_cluster args: ["--bolt-port", "7687", "--log-level=TRACE"] log_file: "replication-e2e-main.log" setup_queries: [ - "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'", - "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'", + "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'", + "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'", "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'" ] <<: *template_validation_queries @@ -69,8 +69,8 @@ workloads: args: ["--bolt-port", "7687", "--log-level=TRACE"] log_file: "replication-e2e-main.log" setup_queries: [ - "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 2 TO '127.0.0.1:10001'", - "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'", + "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'", + "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'", "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'" ] validation_queries: [] diff --git a/tests/jepsen/resources/node-config.edn b/tests/jepsen/resources/node-config.edn index 13c453366..93b2bffff 100644 --- a/tests/jepsen/resources/node-config.edn +++ b/tests/jepsen/resources/node-config.edn @@ -3,4 +3,4 @@ "n3" {:replication-role :replica :replication-mode :async :port 10000}} {"n1" {:replication-role :main} "n2" {:replication-role :replica :replication-mode :async :port 10000} - "n3" {:replication-role :replica :replication-mode :sync :port 10000 :timeout 3}}] + "n3" {:replication-role :replica :replication-mode :sync :port 10000}}] diff --git a/tests/jepsen/src/jepsen/memgraph/client.clj b/tests/jepsen/src/jepsen/memgraph/client.clj index 8f7ce2fb3..176f2433f 100644 --- a/tests/jepsen/src/jepsen/memgraph/client.clj +++ b/tests/jepsen/src/jepsen/memgraph/client.clj @@ -31,7 +31,7 @@ [node-config] (case (:replication-mode node-config) :async "ASYNC" - :sync (str "SYNC" (when-let [timeout (:timeout node-config)] (str " WITH TIMEOUT " timeout))))) + :sync "SYNC" )) (defn create-register-replica-query [name node-config] diff --git a/tests/jepsen/src/jepsen/memgraph/core.clj b/tests/jepsen/src/jepsen/memgraph/core.clj index 77bf6a163..545367bcc 100644 --- a/tests/jepsen/src/jepsen/memgraph/core.clj +++ b/tests/jepsen/src/jepsen/memgraph/core.clj @@ -129,13 +129,7 @@ :replication-mode (str "Invalid node configuration. " "Every replication node requires " - ":replication-mode to be defined.")) - (throw-if-key-missing-in-any - (filter #(= (:replication-mode %) :sync) replica-nodes-configs) - :timeout - (str "Invalid node confiruation. " - "Every SYNC replication node requires " - ":timeout to be defined.")))) + ":replication-mode to be defined.")))) (map (fn [node-config] (resolve-all-node-hostnames (merge diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 39663e2ea..40aeb0161 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -2322,17 +2322,13 @@ TEST_P(CypherMainVisitorTest, ShowUsersForRole) { void check_replication_query(Base *ast_generator, const ReplicationQuery *query, const std::string name, const std::optional socket_address, const ReplicationQuery::SyncMode sync_mode, - const std::optional timeout = {}, const std::optional port = {}) { + const std::optional port = {}) { EXPECT_EQ(query->replica_name_, name); EXPECT_EQ(query->sync_mode_, sync_mode); ASSERT_EQ(static_cast(query->socket_address_), static_cast(socket_address)); if (socket_address) { ast_generator->CheckLiteral(query->socket_address_, *socket_address); } - ASSERT_EQ(static_cast(query->timeout_), static_cast(timeout)); - if (timeout) { - ast_generator->CheckLiteral(query->timeout_, *timeout); - } ASSERT_EQ(static_cast(query->port_), static_cast(port)); if (port) { ast_generator->CheckLiteral(query->port_, *port); @@ -2390,20 +2386,22 @@ TEST_P(CypherMainVisitorTest, TestSetReplicationMode) { TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) { auto &ast_generator = *GetParam(); - const std::string faulty_query = "REGISTER REPLICA WITH TIMEOUT TO"; + const std::string faulty_query = "REGISTER REPLICA TO"; ASSERT_THROW(ast_generator.ParseQuery(faulty_query), SyntaxException); - const std::string no_timeout_query = R"(REGISTER REPLICA replica1 SYNC TO "127.0.0.1")"; - auto *no_timeout_query_parsed = dynamic_cast(ast_generator.ParseQuery(no_timeout_query)); - ASSERT_TRUE(no_timeout_query_parsed); - check_replication_query(&ast_generator, no_timeout_query_parsed, "replica1", TypedValue("127.0.0.1"), + const std::string faulty_query_with_timeout = R"(REGISTER REPLICA replica1 SYNC WITH TIMEOUT 1.0 TO "127.0.0.1")"; + ASSERT_THROW(ast_generator.ParseQuery(faulty_query_with_timeout), SyntaxException); + + const std::string correct_query = R"(REGISTER REPLICA replica1 SYNC TO "127.0.0.1")"; + auto *correct_query_parsed = dynamic_cast(ast_generator.ParseQuery(correct_query)); + check_replication_query(&ast_generator, correct_query_parsed, "replica1", TypedValue("127.0.0.1"), ReplicationQuery::SyncMode::SYNC); - std::string full_query = R"(REGISTER REPLICA replica2 SYNC WITH TIMEOUT 0.5 TO "1.1.1.1:10000")"; + std::string full_query = R"(REGISTER REPLICA replica2 SYNC TO "1.1.1.1:10000")"; auto *full_query_parsed = dynamic_cast(ast_generator.ParseQuery(full_query)); ASSERT_TRUE(full_query_parsed); check_replication_query(&ast_generator, full_query_parsed, "replica2", TypedValue("1.1.1.1:10000"), - ReplicationQuery::SyncMode::SYNC, TypedValue(0.5)); + ReplicationQuery::SyncMode::SYNC); } TEST_P(CypherMainVisitorTest, TestDeleteReplica) { diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index a32c9c78f..75b8a0b27 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -624,10 +624,10 @@ TEST_F(ReplicationTest, ReplicationInformation) { replica_store2.SetReplicaRole(replica2_endpoint); const std::string replica1_name{"REPLICA1"}; - ASSERT_FALSE(main_store - .RegisterReplica(replica1_name, replica1_endpoint, - memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0}) - .HasError()); + ASSERT_FALSE( + main_store + .RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC) + .HasError()); const std::string replica2_name{"REPLICA2"}; ASSERT_FALSE( @@ -645,15 +645,12 @@ TEST_F(ReplicationTest, ReplicationInformation) { const auto &first_info = replicas_info[0]; ASSERT_EQ(first_info.name, replica1_name); ASSERT_EQ(first_info.mode, memgraph::storage::replication::ReplicationMode::SYNC); - ASSERT_TRUE(first_info.timeout); - ASSERT_EQ(*first_info.timeout, 2.0); ASSERT_EQ(first_info.endpoint, replica1_endpoint); ASSERT_EQ(first_info.state, memgraph::storage::replication::ReplicaState::READY); const auto &second_info = replicas_info[1]; ASSERT_EQ(second_info.name, replica2_name); ASSERT_EQ(second_info.mode, memgraph::storage::replication::ReplicationMode::ASYNC); - ASSERT_FALSE(second_info.timeout); ASSERT_EQ(second_info.endpoint, replica2_endpoint); ASSERT_EQ(second_info.state, memgraph::storage::replication::ReplicaState::READY); } @@ -672,10 +669,10 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { replica_store2.SetReplicaRole(replica2_endpoint); const std::string replica1_name{"REPLICA1"}; - ASSERT_FALSE(main_store - .RegisterReplica(replica1_name, replica1_endpoint, - memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0}) - .HasError()); + ASSERT_FALSE( + main_store + .RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC) + .HasError()); const std::string replica2_name{"REPLICA1"}; ASSERT_TRUE( @@ -698,10 +695,10 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { replica_store2.SetReplicaRole(replica2_endpoint); const std::string replica1_name{"REPLICA1"}; - ASSERT_FALSE(main_store - .RegisterReplica(replica1_name, replica1_endpoint, - memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0}) - .HasError()); + ASSERT_FALSE( + main_store + .RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC) + .HasError()); const std::string replica2_name{"REPLICA2"}; ASSERT_TRUE(