From f07fd21a9e290a883a33c06bba87568692ef1340 Mon Sep 17 00:00:00 2001
From: antoniofilipovic <filipovicantonio1998@gmail.com>
Date: Mon, 12 Feb 2024 11:33:50 +0100
Subject: [PATCH] add working test for demote to replica, not replicating in
 new cluster

---
 .../not_replicate_from_old_main.py            | 147 +++++++++++++++++-
 1 file changed, 144 insertions(+), 3 deletions(-)

diff --git a/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py b/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py
index d6f6f7da4..b859cae84 100644
--- a/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py
+++ b/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py
@@ -10,11 +10,13 @@
 # licenses/APL.txt.
 
 import os
+import shutil
 import sys
+import tempfile
 
 import interactive_mg_runner
 import pytest
-from common import execute_and_fetch_all
+from common import execute_and_fetch_all, safe_execute
 from mg_utils import mg_sleep_and_assert
 
 interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -38,7 +40,7 @@ MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = {
 }
 
 
-MEMGRAPH_INSTANCES_DESCRIPTION = {
+MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = {
     "replica": {
         "args": ["--bolt-port", "7689", "--log-level", "TRACE"],
         "log_file": "replica.log",
@@ -71,7 +73,7 @@ def test_replication_works_on_failover(connection):
     assert actual_data_on_main == expected_data_on_main
 
     # 3
-    interactive_mg_runner.start_all_keep_others(MEMGRAPH_INSTANCES_DESCRIPTION)
+    interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_CLUSTER_DESCRIPTION)
 
     # 4
     new_main_cursor = connection(7690, "main_2").cursor()
@@ -113,5 +115,144 @@ def test_replication_works_on_failover(connection):
     interactive_mg_runner.stop_all()
 
 
+def test_not_replicate_old_main_register_new_cluster(connection):
+    # Goal of this test is to check that although replica is registered in one cluster
+    # it can be re-registered to new cluster
+    # This flow checks if Registering replica is idempotent and that old main cannot talk to replica
+    # 1. We start all replicas and main in one cluster
+    # 2. Main from first cluster can see all replicas
+    # 3. We start all replicas and main in second cluster, by reusing one replica from first cluster
+    # 4. New main should see replica. Registration should pass (idempotent registration)
+    # 5. Old main should not talk to new replica
+    # 6. New main should talk to replica
+
+    TEMP_DIR = tempfile.TemporaryDirectory().name
+    MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION = {
+        "shared_instance": {
+            "args": [
+                "--bolt-port",
+                "7688",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10011",
+            ],
+            "log_file": "instance_1.log",
+            "data_directory": f"{TEMP_DIR}/shared_instance",
+            "setup_queries": [],
+        },
+        "instance_2": {
+            "args": [
+                "--bolt-port",
+                "7689",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10012",
+            ],
+            "log_file": "instance_2.log",
+            "data_directory": f"{TEMP_DIR}/instance_2",
+            "setup_queries": [],
+        },
+        "coordinator_1": {
+            "args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"],
+            "log_file": "coordinator.log",
+            "setup_queries": [
+                "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
+                "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
+                "SET INSTANCE instance_2 TO MAIN",
+            ],
+        },
+    }
+
+    # 1
+    interactive_mg_runner.start_all_keep_others(MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION)
+
+    # 2
+
+    first_cluster_coord_cursor = connection(7690, "coord_1").cursor()
+
+    def show_repl_cluster():
+        return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;")))
+
+    expected_data_up_first_cluster = [
+        ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
+        ("instance_2", "", "127.0.0.1:10012", True, "main"),
+        ("shared_instance", "", "127.0.0.1:10011", True, "replica"),
+    ]
+
+    mg_sleep_and_assert(expected_data_up_first_cluster, show_repl_cluster)
+
+    # 3
+
+    MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION = {
+        "instance_3": {
+            "args": [
+                "--bolt-port",
+                "7687",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10013",
+            ],
+            "log_file": "instance_3.log",
+            "data_directory": f"{TEMP_DIR}/instance_3",
+            "setup_queries": [],
+        },
+        "coordinator_2": {
+            "args": ["--bolt-port", "7691", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10112"],
+            "log_file": "coordinator.log",
+            "setup_queries": [],
+        },
+    }
+
+    interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION)
+    second_cluster_coord_cursor = connection(7691, "coord_2").cursor()
+    execute_and_fetch_all(
+        second_cluster_coord_cursor, "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';"
+    )
+    execute_and_fetch_all(
+        second_cluster_coord_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';"
+    )
+    execute_and_fetch_all(second_cluster_coord_cursor, "SET INSTANCE instance_3 TO MAIN")
+
+    # 4
+
+    def show_repl_cluster():
+        return sorted(list(execute_and_fetch_all(second_cluster_coord_cursor, "SHOW INSTANCES;")))
+
+    expected_data_up_second_cluster = [
+        ("coordinator_1", "127.0.0.1:10112", "", True, "coordinator"),
+        ("instance_3", "", "127.0.0.1:10013", True, "main"),
+        ("shared_instance", "", "127.0.0.1:10011", True, "replica"),
+    ]
+
+    mg_sleep_and_assert(expected_data_up_second_cluster, show_repl_cluster)
+
+    # 5
+    main_1_cursor = connection(7689, "main_1").cursor()
+    with pytest.raises(Exception) as e:
+        execute_and_fetch_all(main_1_cursor, "CREATE ();")
+    assert (
+        str(e.value)
+        == "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
+    )
+
+    shared_replica_cursor = connection(7688, "shared_replica").cursor()
+    res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0]
+    assert res == 0, "Old main should not replicate to 'shared' replica"
+
+    # 6
+    main_2_cursor = connection(7687, "main_2").cursor()
+
+    execute_and_fetch_all(main_2_cursor, "CREATE ();")
+
+    shared_replica_cursor = connection(7688, "shared_replica").cursor()
+    res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0]
+    assert res == 1, "New main should replicate to 'shared' replica"
+
+    interactive_mg_runner.stop_all()
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main([__file__, "-rA"]))