1047 lines
40 KiB
Python
1047 lines
40 KiB
Python
|
# Copyright 2022 Memgraph Ltd.
|
||
|
#
|
||
|
# Use of this software is governed by the Business Source License
|
||
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||
|
# License, and you may not use this file except in compliance with the Business Source License.
|
||
|
#
|
||
|
# As of the Change Date specified in that file, in accordance with
|
||
|
# the Business Source License, use of this software will be governed
|
||
|
# by the Apache License, Version 2.0, included in the file
|
||
|
# licenses/APL.txt.
|
||
|
|
||
|
import atexit
|
||
|
import os
|
||
|
import shutil
|
||
|
import sys
|
||
|
import tempfile
|
||
|
import time
|
||
|
from functools import partial
|
||
|
|
||
|
import interactive_mg_runner
|
||
|
import mgclient
|
||
|
import pytest
|
||
|
from common import execute_and_fetch_all
|
||
|
from mg_utils import mg_sleep_and_assert
|
||
|
|
||
|
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||
|
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
|
||
|
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
|
||
|
)
|
||
|
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
|
||
|
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
|
||
|
|
||
|
BOLT_PORTS = {"main": 7687, "replica_1": 7688, "replica_2": 7689}
|
||
|
REPLICATION_PORTS = {"replica_1": 10001, "replica_2": 10002}
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};"],
|
||
|
},
|
||
|
"replica_2": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica2.log",
|
||
|
"setup_queries": [f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};"],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
|
||
|
f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
TEMP_DIR = tempfile.TemporaryDirectory().name
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY = {
|
||
|
"replica_1": {
|
||
|
"args": [
|
||
|
"--bolt-port",
|
||
|
f"{BOLT_PORTS['replica_1']}",
|
||
|
"--log-level=TRACE",
|
||
|
"--replication-restore-state-on-startup",
|
||
|
"--data-recovery-on-startup",
|
||
|
],
|
||
|
"log_file": "replica1.log",
|
||
|
"data_directory": TEMP_DIR + "/replica1",
|
||
|
},
|
||
|
"replica_2": {
|
||
|
"args": [
|
||
|
"--bolt-port",
|
||
|
f"{BOLT_PORTS['replica_2']}",
|
||
|
"--log-level=TRACE",
|
||
|
"--replication-restore-state-on-startup",
|
||
|
"--data-recovery-on-startup",
|
||
|
],
|
||
|
"log_file": "replica2.log",
|
||
|
"data_directory": TEMP_DIR + "/replica2",
|
||
|
},
|
||
|
"main": {
|
||
|
"args": [
|
||
|
"--bolt-port",
|
||
|
f"{BOLT_PORTS['main']}",
|
||
|
"--log-level=TRACE",
|
||
|
"--replication-restore-state-on-startup",
|
||
|
"--data-recovery-on-startup",
|
||
|
],
|
||
|
"log_file": "main.log",
|
||
|
"data_directory": TEMP_DIR + "/main",
|
||
|
},
|
||
|
}
|
||
|
|
||
|
|
||
|
def safe_execute(function, *args):
|
||
|
try:
|
||
|
function(*args)
|
||
|
except:
|
||
|
pass
|
||
|
|
||
|
|
||
|
def setup_replication(connection):
|
||
|
# Setup replica1
|
||
|
cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
execute_and_fetch_all(cursor, f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};")
|
||
|
# Setup replica2
|
||
|
cursor = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
execute_and_fetch_all(cursor, f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};")
|
||
|
# Setup main
|
||
|
cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
execute_and_fetch_all(cursor, f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';")
|
||
|
execute_and_fetch_all(cursor, f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';")
|
||
|
|
||
|
|
||
|
def setup_main(main_cursor):
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
|
||
|
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE B;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'B'});")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'B'});")
|
||
|
|
||
|
|
||
|
def show_replicas_func(cursor, db_name):
|
||
|
def func():
|
||
|
execute_and_fetch_all(cursor, f"USE DATABASE {db_name};")
|
||
|
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
|
||
|
|
||
|
return func
|
||
|
|
||
|
|
||
|
def show_databases_func(cursor):
|
||
|
def func():
|
||
|
return execute_and_fetch_all(cursor, "SHOW DATABASES;")
|
||
|
|
||
|
return func
|
||
|
|
||
|
|
||
|
def get_number_of_nodes_func(cursor, db_name):
|
||
|
def func():
|
||
|
execute_and_fetch_all(cursor, f"USE DATABASE {db_name};")
|
||
|
return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(*);")[0][0]
|
||
|
|
||
|
return func
|
||
|
|
||
|
|
||
|
def get_number_of_edges_func(cursor, db_name):
|
||
|
def func():
|
||
|
execute_and_fetch_all(cursor, f"USE DATABASE {db_name};")
|
||
|
return execute_and_fetch_all(cursor, "MATCH ()-[r]->() RETURN count(*);")[0][0]
|
||
|
|
||
|
return func
|
||
|
|
||
|
|
||
|
def test_manual_databases_create_multitenancy_replication(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA which already
|
||
|
# has the clean databases we need
|
||
|
# 0/ MAIN CREATE DATABASE A + B
|
||
|
# REPLICA CREATE DATABASE A + B
|
||
|
# Setup replication
|
||
|
# 1/ Write to MAIN A, Write to MAIN B
|
||
|
# 2/ Validate replication of changes to A + B have arrived at REPLICA
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"CREATE DATABASE B;",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||
|
],
|
||
|
},
|
||
|
"replica_2": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica2.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"CREATE DATABASE B;",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};",
|
||
|
],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"CREATE DATABASE B;",
|
||
|
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
|
||
|
f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL)
|
||
|
cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(cursor, "CREATE ();")
|
||
|
execute_and_fetch_all(cursor, "USE DATABASE B;")
|
||
|
execute_and_fetch_all(cursor, "CREATE ()-[:EDGE]->();")
|
||
|
|
||
|
# 2/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "B"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 1
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 0
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 1
|
||
|
|
||
|
cursor_replica2 = connection(BOLT_PORTS["replica_1"], "replica_2").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica2, "A")() == 1
|
||
|
assert get_number_of_edges_func(cursor_replica2, "A")() == 0
|
||
|
assert get_number_of_nodes_func(cursor_replica2, "B")() == 2
|
||
|
assert get_number_of_edges_func(cursor_replica2, "B")() == 1
|
||
|
|
||
|
|
||
|
def test_manual_databases_create_multitenancy_replication_branching(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA which already
|
||
|
# has all the databases and the same data
|
||
|
# 0/ MAIN CREATE DATABASE A + B and fill with data
|
||
|
# REPLICA CREATE DATABASE A + B and fil with exact data
|
||
|
# Setup REPLICA
|
||
|
# 1/ Registering REPLICA on MAIN should not fail due to tenant branching
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE ()",
|
||
|
"CREATE DATABASE B;",
|
||
|
"USE DATABASE B;",
|
||
|
"CREATE ()-[:EDGE]->()",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||
|
],
|
||
|
},
|
||
|
"replica_2": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica2.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE ()",
|
||
|
"CREATE DATABASE B;",
|
||
|
"USE DATABASE B;",
|
||
|
"CREATE ()-[:EDGE]->()",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};",
|
||
|
],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE ()",
|
||
|
"CREATE DATABASE B;",
|
||
|
"USE DATABASE B;",
|
||
|
"CREATE ()-[:EDGE]->()",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL)
|
||
|
cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
failed = False
|
||
|
try:
|
||
|
execute_and_fetch_all(
|
||
|
cursor, f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';"
|
||
|
)
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert not failed
|
||
|
|
||
|
try:
|
||
|
execute_and_fetch_all(
|
||
|
cursor, f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';"
|
||
|
)
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert not failed
|
||
|
|
||
|
|
||
|
def test_manual_databases_create_multitenancy_replication_dirty_replica(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA which already
|
||
|
# has all the databases we need, even when they branched
|
||
|
# 0/ MAIN CREATE DATABASE A
|
||
|
# REPLICA CREATE DATABASE A
|
||
|
# REPLICA write to A
|
||
|
# Setup REPLICA
|
||
|
# 1/ Register replica; should fail
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE (:Node{from:'A'})",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||
|
],
|
||
|
},
|
||
|
"replica_2": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica2.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE (:Node{from:'A'})",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};",
|
||
|
],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL)
|
||
|
cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
failed = False
|
||
|
try:
|
||
|
execute_and_fetch_all(
|
||
|
cursor, f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';"
|
||
|
)
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert not failed
|
||
|
|
||
|
try:
|
||
|
execute_and_fetch_all(
|
||
|
cursor, f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';"
|
||
|
)
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert not failed
|
||
|
|
||
|
|
||
|
def test_manual_databases_create_multitenancy_replication_main_behind(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA which has
|
||
|
# different branched databases
|
||
|
# 0/ REPLICA CREATE DATABASE A
|
||
|
# REPLICA write to A
|
||
|
# Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A
|
||
|
# 2/ Check that database has been replicated
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE (:Node{from:'A'})",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||
|
],
|
||
|
},
|
||
|
"replica_2": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica2.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE (:Node{from:'A'})",
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_2']};",
|
||
|
],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
|
||
|
f"REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_2']}';",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
|
||
|
# 2/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
databases_on_main = show_databases_func(main_cursor)()
|
||
|
|
||
|
replica_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
mg_sleep_and_assert(databases_on_main, show_databases_func(replica_cursor))
|
||
|
|
||
|
replica_cursor = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
mg_sleep_and_assert(databases_on_main, show_databases_func(replica_cursor))
|
||
|
|
||
|
|
||
|
def test_automatic_databases_create_multitenancy_replication(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA where a new databases
|
||
|
# needs replication
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A
|
||
|
# 2/ Write to MAIN A
|
||
|
# 3/ Validate replication of changes to A have arrived at REPLICA
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
|
||
|
|
||
|
# 3/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 0
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 0
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
|
||
|
def test_automatic_databases_multitenancy_replication_predefined(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA which doesn't
|
||
|
# have any additional databases; MAIN's database clean at registration time
|
||
|
# 0/ MAIN CREATE DATABASE A + B
|
||
|
# Setup replication
|
||
|
# 1/ Write to MAIN A, Write to MAIN B
|
||
|
# 2/ Validate replication of changes to A + B have arrived at REPLICA
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||
|
],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"CREATE DATABASE B;",
|
||
|
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL)
|
||
|
cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(cursor, "CREATE ();")
|
||
|
execute_and_fetch_all(cursor, "USE DATABASE B;")
|
||
|
execute_and_fetch_all(cursor, "CREATE ()-[:EDGE]->();")
|
||
|
|
||
|
# 2/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "B"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 1
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 0
|
||
|
|
||
|
|
||
|
def test_automatic_databases_create_multitenancy_replication_dirty_main(connection):
|
||
|
# Goal: to show that replication can be established against REPLICA which doesn't
|
||
|
# have any additional databases; MAIN's database dirty at registration time
|
||
|
# 0/ MAIN CREATE DATABASE A
|
||
|
# MAIN write to A
|
||
|
# Setup replication
|
||
|
# 1/ Validate
|
||
|
|
||
|
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||
|
"replica_1": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||
|
"log_file": "replica1.log",
|
||
|
"setup_queries": [
|
||
|
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||
|
],
|
||
|
},
|
||
|
"main": {
|
||
|
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||
|
"log_file": "main.log",
|
||
|
"setup_queries": [
|
||
|
"CREATE DATABASE A;",
|
||
|
"USE DATABASE A;",
|
||
|
"CREATE (:Node{from:'A'})",
|
||
|
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL)
|
||
|
cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
execute_and_fetch_all(cursor_replica, "USE DATABASE A;")
|
||
|
actual_data = execute_and_fetch_all(cursor_replica, "MATCH (n) RETURN count(*);")
|
||
|
assert actual_data[0][0] == 1 # one node
|
||
|
actual_data = execute_and_fetch_all(cursor_replica, "MATCH ()-[r]->() RETURN count(*);")
|
||
|
assert actual_data[0][0] == 0 # zero relationships
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("replica_name", [("replica_1"), ("replica_2")])
|
||
|
def test_multitenancy_replication_restart_replica_w_fc(connection, replica_name):
|
||
|
# Goal: show that a replica can be recovered with the frequent checker
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A and B
|
||
|
# 2/ Write on MAIN to A and B
|
||
|
# 3/ Restart replica
|
||
|
# 4/ Validate data on replica
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
setup_main(main_cursor)
|
||
|
|
||
|
# 3/
|
||
|
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
|
||
|
time.sleep(3) # In order for the frequent check to run
|
||
|
# Check that the FC did invalidate
|
||
|
expected_data = {
|
||
|
"replica_1": {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "invalid"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
|
||
|
},
|
||
|
"replica_2": {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "invalid"),
|
||
|
},
|
||
|
}
|
||
|
assert expected_data[replica_name] == show_replicas_func(main_cursor, "A")()
|
||
|
# Restart
|
||
|
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
|
||
|
|
||
|
# 4/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
|
||
|
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("replica_name", [("replica_1"), ("replica_2")])
|
||
|
def test_multitenancy_replication_restart_replica_wo_fc(connection, replica_name):
|
||
|
# Goal: show that a replica can be recovered without the frequent checker detecting it being down
|
||
|
# needs replicating over
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A and B
|
||
|
# 2/ Write on MAIN to A and B
|
||
|
# 3/ Restart replica
|
||
|
# 4/ Validate data on replica
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
setup_main(main_cursor)
|
||
|
|
||
|
# 3/
|
||
|
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
|
||
|
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
|
||
|
|
||
|
# 4/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("replica_name", [("replica_1"), ("replica_2")])
|
||
|
def test_multitenancy_replication_restart_replica_w_fc_w_rec(connection, replica_name):
|
||
|
# Goal: show that a replica recovers data on reconnect
|
||
|
# needs replicating over
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A and B
|
||
|
# 2/ Write on MAIN to A and B
|
||
|
# 3/ Restart replica
|
||
|
# 4/ Validate data on replica
|
||
|
|
||
|
# 0/
|
||
|
# Tmp dir should already be removed, but sometimes its not...
|
||
|
safe_execute(shutil.rmtree, TEMP_DIR)
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY)
|
||
|
setup_replication(connection)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
setup_main(main_cursor)
|
||
|
|
||
|
# 3/
|
||
|
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY, replica_name)
|
||
|
safe_execute(execute_and_fetch_all, main_cursor, "USE DATABASE A;")
|
||
|
safe_execute(execute_and_fetch_all, main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
safe_execute(execute_and_fetch_all, main_cursor, "USE DATABASE B;")
|
||
|
safe_execute(execute_and_fetch_all, main_cursor, "CREATE (:Node{on:'B'});")
|
||
|
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY, replica_name)
|
||
|
|
||
|
# 4/
|
||
|
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
|
||
|
|
||
|
mg_sleep_and_assert(8, get_number_of_nodes_func(cursor_replica, "A"))
|
||
|
mg_sleep_and_assert(3, get_number_of_edges_func(cursor_replica, "A"))
|
||
|
|
||
|
mg_sleep_and_assert(3, get_number_of_nodes_func(cursor_replica, "B"))
|
||
|
mg_sleep_and_assert(0, get_number_of_edges_func(cursor_replica, "B"))
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("replica_name", [("replica_1"), ("replica_2")])
|
||
|
def test_multitenancy_replication_drop_replica(connection, replica_name):
|
||
|
# Goal: show that the cluster can recover if a replica is dropped and registered again
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A and B
|
||
|
# 2/ Write on MAIN to A and B
|
||
|
# 3/ Drop and add the same replica
|
||
|
# 4/ Validate data on replica
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
setup_main(main_cursor)
|
||
|
|
||
|
# 3/
|
||
|
execute_and_fetch_all(main_cursor, f"DROP REPLICA {replica_name};")
|
||
|
sync = {"replica_1": "SYNC", "replica_2": "ASYNC"}
|
||
|
execute_and_fetch_all(
|
||
|
main_cursor,
|
||
|
f"REGISTER REPLICA {replica_name} {sync[replica_name]} TO '127.0.0.1:{REPLICATION_PORTS[replica_name]}';",
|
||
|
)
|
||
|
|
||
|
# 4/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
|
||
|
def test_multitenancy_replication_restart_main(connection):
|
||
|
# Goal: show that the cluster can restore to a correct state if the MAIN restarts
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A and B
|
||
|
# 2/ Write on MAIN to A and B
|
||
|
# 3/ Restart main and write new data
|
||
|
# 4/ Validate data on replica
|
||
|
|
||
|
# 0/
|
||
|
# Tmp dir should already be removed, but sometimes its not...
|
||
|
safe_execute(shutil.rmtree, TEMP_DIR)
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY)
|
||
|
setup_replication(connection)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
setup_main(main_cursor)
|
||
|
|
||
|
# 3/
|
||
|
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY, "main")
|
||
|
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY, "main")
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE B;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'B'});")
|
||
|
|
||
|
# 4/
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
execute_and_fetch_all(cursor_replica, "USE DATABASE A;")
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 8
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 3
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
cursor_replica = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
execute_and_fetch_all(cursor_replica, "USE DATABASE A;")
|
||
|
assert get_number_of_nodes_func(cursor_replica, "A")() == 8
|
||
|
assert get_number_of_edges_func(cursor_replica, "A")() == 3
|
||
|
assert get_number_of_nodes_func(cursor_replica, "B")() == 3
|
||
|
assert get_number_of_edges_func(cursor_replica, "B")() == 0
|
||
|
|
||
|
|
||
|
def test_automatic_databases_drop_multitenancy_replication(connection):
|
||
|
# Goal: show that drop database can be replicated
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A
|
||
|
# 2/ Write to MAIN A
|
||
|
# 3/ Validate replication of changes to A have arrived at REPLICA
|
||
|
# 4/ DROP DATABASE A/B
|
||
|
# 5/ Check that the drop replicated
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
|
||
|
# 3/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
|
||
|
|
||
|
# 4/
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE memgraph;")
|
||
|
execute_and_fetch_all(main_cursor, "DROP DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "DROP DATABASE B;")
|
||
|
|
||
|
# 5/
|
||
|
databases_on_main = show_databases_func(main_cursor)()
|
||
|
|
||
|
replica_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
mg_sleep_and_assert(databases_on_main, show_databases_func(replica_cursor))
|
||
|
|
||
|
replica_cursor = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
mg_sleep_and_assert(databases_on_main, show_databases_func(replica_cursor))
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("replica_name", [("replica_1"), ("replica_2")])
|
||
|
def test_drop_multitenancy_replication_restart_replica(connection, replica_name):
|
||
|
# Goal: show that the drop database can be restored
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A and B
|
||
|
# 2/ Write on MAIN to A and B
|
||
|
# 3/ Restart SYNC replica and drop database
|
||
|
# 4/ Validate data on replica
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
|
||
|
# 2/
|
||
|
setup_main(main_cursor)
|
||
|
|
||
|
# 3/
|
||
|
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE memgraph;")
|
||
|
execute_and_fetch_all(main_cursor, "DROP DATABASE B;")
|
||
|
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
|
||
|
|
||
|
# 4/
|
||
|
databases_on_main = show_databases_func(main_cursor)()
|
||
|
|
||
|
replica_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
mg_sleep_and_assert(databases_on_main, show_databases_func(replica_cursor))
|
||
|
|
||
|
replica_cursor = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
mg_sleep_and_assert(databases_on_main, show_databases_func(replica_cursor))
|
||
|
|
||
|
|
||
|
def test_multitenancy_drop_while_replica_using(connection):
|
||
|
# Goal: show that the replica can handle a transaction on a database being dropped (will persist until tx finishes)
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A
|
||
|
# 2/ Write to MAIN A
|
||
|
# 3/ Validate replication of changes to A have arrived at REPLICA
|
||
|
# 4/ Start A transaction on replica 1, Use A on replica2
|
||
|
# 5/ Check that the drop replicated
|
||
|
# 6/ Validate that the transaction is still active and working and that the replica2 is not pointing to anything
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
|
||
|
# 2/
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
|
||
|
# 3/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
# 4/
|
||
|
replica1_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
replica2_cursor = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
|
||
|
execute_and_fetch_all(replica1_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(replica1_cursor, "BEGIN")
|
||
|
execute_and_fetch_all(replica2_cursor, "USE DATABASE A;")
|
||
|
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE memgraph;")
|
||
|
execute_and_fetch_all(main_cursor, "DROP DATABASE A;")
|
||
|
|
||
|
# 5/
|
||
|
# TODO Remove this once there is a replica state for the system
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE B;")
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
|
||
|
|
||
|
# 6/
|
||
|
assert execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN count(*);")[0][0] == 1
|
||
|
execute_and_fetch_all(replica1_cursor, "COMMIT")
|
||
|
failed = False
|
||
|
try:
|
||
|
execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN n;")
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert failed
|
||
|
|
||
|
failed = False
|
||
|
try:
|
||
|
execute_and_fetch_all(replica2_cursor, "MATCH(n) RETURN n;")
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert failed
|
||
|
|
||
|
|
||
|
def test_multitenancy_drop_and_recreate_while_replica_using(connection):
|
||
|
# Goal: show that the replica can handle a transaction on a database being dropped and the same name reused
|
||
|
# Original storage should persist in a nameless state until tx is over
|
||
|
# needs replicating over
|
||
|
# 0/ Setup replication
|
||
|
# 1/ MAIN CREATE DATABASE A
|
||
|
# 2/ Write to MAIN A
|
||
|
# 3/ Validate replication of changes to A have arrived at REPLICA
|
||
|
# 4/ Start A transaction on replica 1, Use A on replica2
|
||
|
# 5/ Check that the drop/create replicated
|
||
|
# 6/ Validate that the transaction is still active and working and that the replica2 is not pointing to anything
|
||
|
|
||
|
# 0/
|
||
|
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||
|
main_cursor = connection(BOLT_PORTS["main"], "main").cursor()
|
||
|
|
||
|
# 1/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
|
||
|
# 2/
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
|
||
|
|
||
|
# 3/
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
# 4/
|
||
|
replica1_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
|
||
|
replica2_cursor = connection(BOLT_PORTS["replica_2"], "replica").cursor()
|
||
|
|
||
|
execute_and_fetch_all(replica1_cursor, "USE DATABASE A;")
|
||
|
execute_and_fetch_all(replica1_cursor, "BEGIN")
|
||
|
execute_and_fetch_all(replica2_cursor, "USE DATABASE A;")
|
||
|
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE memgraph;")
|
||
|
execute_and_fetch_all(main_cursor, "DROP DATABASE A;")
|
||
|
|
||
|
# 5/
|
||
|
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
|
||
|
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
|
||
|
|
||
|
expected_data = {
|
||
|
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
|
||
|
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
|
||
|
}
|
||
|
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
|
||
|
|
||
|
# 6/
|
||
|
assert execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN count(*);")[0][0] == 1
|
||
|
execute_and_fetch_all(replica1_cursor, "COMMIT")
|
||
|
failed = False
|
||
|
try:
|
||
|
execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN n;")
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert failed
|
||
|
|
||
|
failed = False
|
||
|
try:
|
||
|
execute_and_fetch_all(replica2_cursor, "MATCH(n) RETURN n;")
|
||
|
except mgclient.DatabaseError:
|
||
|
failed = True
|
||
|
assert failed
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
interactive_mg_runner.cleanup_directories_on_exit()
|
||
|
sys.exit(pytest.main([__file__, "-rA"]))
|