Merge branch 'automatic-failover' of github.com:memgraph/memgraph into automatic-failover

This commit is contained in:
antoniofilipovic 2024-01-25 14:00:43 +01:00
commit 62d1b68c2f
8 changed files with 260 additions and 134 deletions

View File

@ -96,7 +96,7 @@ jobs:
- name: Python code analysis
run: |
CHANGED_FILES=$(git diff -U0 ${{ env.BASE_BRANCH }}... --name-only)
CHANGED_FILES=$(git diff -U0 ${{ env.BASE_BRANCH }}... --name-only --diff-filter=d)
for file in ${CHANGED_FILES}; do
echo ${file}
if [[ ${file} == *.py ]]; then

View File

@ -506,7 +506,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica because promotion on replica failed! Check replica for more logs");
"Couldn't register replica because promotion on replica failed! Check logs on replica to find out more "
"info!");
case SUCCESS:
break;
}

View File

@ -1,7 +1,6 @@
find_package(gflags REQUIRED)
copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental show_replication_cluster.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental common.py)
copy_e2e_python_files(ha_experimental conftest.py)

View File

@ -28,33 +28,77 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
"setup_queries": [],
},
"instance_2": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
"setup_queries": [],
},
"instance_3": {
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002'",
],
"setup_queries": [],
},
"coordinator": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"],
"log_file": "replica3.log",
"setup_queries": [
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10011';",
"REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012';",
"REGISTER MAIN instance_3 WITH COORDINATOR SERVER ON '127.0.0.1:10013';",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
def test_show_replication_cluster(connection):
# Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that all replicas and main have the correct state: they should all be alive.
# 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 1.
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
# 2.
# We leave some time for the coordinator to contact the instances and learn their state.
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
# 3.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
# 4.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data = [
("instance_1", "127.0.0.1:10011", False, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
def test_simple_automatic_failover(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
@ -98,7 +142,7 @@ def test_registering_replica_fails_name_exists(connection):
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10051' WITH COORDINATOR SERVER ON '127.0.0.1:10111';",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"
@ -110,7 +154,7 @@ def test_registering_replica_fails_endpoint_exists(connection):
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER REPLICA instance_5 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10013';",
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10001' WITH '127.0.0.1:10013';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!"

View File

@ -78,7 +78,7 @@ def test_main_and_replicas_cannot_register_coord_server(port, role, connection):
with pytest.raises(Exception) as e:
execute_and_fetch_all(
cursor,
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10011';",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10001' WITH '127.0.0.1:10011';",
)
assert str(e.value) == "Only coordinator can register coordinator server!"

View File

@ -1,105 +0,0 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import sys
import interactive_mg_runner
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"instance_2": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"instance_3": {
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002'",
],
},
"coordinator": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"],
"log_file": "replica3.log",
"setup_queries": [
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10011';",
"REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012';",
"REGISTER MAIN instance_3 WITH COORDINATOR SERVER ON '127.0.0.1:10013';",
],
},
}
def test_show_replication_cluster(connection):
# Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that all replicas and main have the correct state: they should all be alive.
# 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 1.
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
# 2.
# We leave some time for the coordinator to realise the replicas are down.
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
# 3.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
# 4.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data = [
("instance_1", "127.0.0.1:10011", False, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -3,38 +3,31 @@ ha_cluster: &ha_cluster
replica_1:
args: ["--bolt-port", "7688", "--log-level=TRACE", "--coordinator-server-port=10011"]
log_file: "replication-e2e-replica1.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"]
setup_queries: []
replica_2:
args: ["--bolt-port", "7689", "--log-level=TRACE", "--coordinator-server-port=10012"]
log_file: "replication-e2e-replica2.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
setup_queries: []
main:
args: ["--bolt-port", "7687", "--log-level=TRACE", "--coordinator-server-port=10013"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'",
]
setup_queries: []
coordinator:
args: ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"]
log_file: "replication-e2e-coordinator.log"
setup_queries: [
"REGISTER MAIN main WITH COORDINATOR SERVER ON '127.0.0.1:10013'",
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10011'",
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"SET INSTANCE instance_3 TO MAIN;"
]
workloads:
- name: "Coordinator"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/coordinator.py"]
<<: *ha_cluster
- name: "Show replication cluster"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/show_replication_cluster.py"]
- name: "Automatic failover"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/automatic_failover.py"]

194
tests/stress/test_config.py Normal file
View File

@ -0,0 +1,194 @@
import itertools
import os
from dataclasses import dataclass
from typing import List
# Directory that contains this config module; used to anchor generated files.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
# File into which the long_running workload writes its runtime statistics
# (passed below via the "--stats-file" option).
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
class DatasetConstants:
    """Dictionary keys used by the dataset descriptions below (SMALL_DATASET / LARGE_DATASET)."""

    TEST = "test"  # name of the test script/binary to run
    OPTIONS = "options"  # command-line options forwarded to the test
    TIMEOUT = "timeout"  # per-test timeout; presumably minutes — TODO confirm against the stress runner
    MODE = "mode"  # list of DatabaseMode combinations to run the test under
    MEMGRAPH_OPTIONS = "memgraph_options"  # extra memgraph startup flags (optional key)
@dataclass
class DatabaseMode:
    """Pairing of a storage mode and an isolation level a test runs under."""

    # One of StorageModeConstants.to_list()
    storage_mode: str
    # One of the IsolationLevelConstants values
    isolation_level: str
class StorageModeConstants:
    """Names of the supported storage modes."""

    IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
    IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return all storage mode names as a fresh list."""
        modes = (cls.IN_MEMORY_TRANSACTIONAL, cls.IN_MEMORY_ANALYTICAL)
        return list(modes)
class IsolationLevelConstants:
    """Names of the supported isolation levels.

    NOTE(review): the "COMMITED"/"UNCOMMITED" spellings are kept as-is because
    these strings are consumed as-is elsewhere — confirm against the values the
    server actually accepts before renaming.
    """

    SNAPSHOT_ISOLATION = "SNAPSHOT ISOLATION"
    READ_COMMITED = "READ COMMITED"
    READ_UNCOMMITED = "READ UNCOMMITED"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return all isolation level names as a fresh list."""
        # Bug fix: previously referenced the non-existent attribute
        # cls.SNAPSHOT_SERIALIZATION, so every call raised AttributeError
        # (which also broke get_all_database_modes()).
        return [cls.SNAPSHOT_ISOLATION, cls.READ_COMMITED, cls.READ_UNCOMMITED]
def get_default_database_mode() -> DatabaseMode:
    """Return the mode stress tests run under unless configured otherwise."""
    storage = StorageModeConstants.IN_MEMORY_TRANSACTIONAL
    isolation = IsolationLevelConstants.SNAPSHOT_ISOLATION
    return DatabaseMode(storage, isolation)
def get_all_database_modes() -> List[DatabaseMode]:
    """Return every (storage mode, isolation level) combination as DatabaseMode objects."""
    pairs = itertools.product(StorageModeConstants.to_list(), IsolationLevelConstants.to_list())
    return [DatabaseMode(storage, isolation) for storage, isolation in pairs]
# dataset calibrated for running on Apollo (total 4min)
# bipartite.py runs for approx. 30s
# create_match.py runs for approx. 30s
# long_running runs for 1min
# long_running runs for 2min
#
# Each entry describes one stress run: the test to execute, its CLI options,
# a timeout, the database mode(s) to run under and, optionally, extra
# memgraph startup flags (see DatasetConstants for the keys).
SMALL_DATASET = [
    {
        DatasetConstants.TEST: "bipartite.py",
        DatasetConstants.OPTIONS: ["--u-count", "100", "--v-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "detach_delete.py",
        DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "memory_tracker.py",
        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
        # memory-limit presumably in MiB — confirm against memgraph's flag docs.
        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
    },
    {
        DatasetConstants.TEST: "memory_limit.py",
        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
    },
    {
        DatasetConstants.TEST: "create_match.py",
        DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    # .cpp entries refer to compiled stress binaries, not Python scripts.
    {
        DatasetConstants.TEST: "parser.cpp",
        DatasetConstants.OPTIONS: ["--per-worker-query-count", "1000"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count",
            "1000",
            "--edge-count",
            "5000",
            "--max-time",
            "1",
            "--verify",
            "20",
        ],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count",
            "10000",
            "--edge-count",
            "50000",
            "--max-time",
            "2",
            "--verify",
            "30",
            # Only the final long_running run records stats.
            "--stats-file",
            STATS_FILE,
        ],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
]
# dataset calibrated for running on daily stress instance (total 9h)
# bipartite.py and create_match.py run for approx. 15min
# long_running runs for 5min x 6 times = 30min
# long_running runs for 8h
#
# Built as: small workloads + (one 5-minute long_running entry repeated 6
# times via list multiplication) + one final 8-hour long_running entry.
LARGE_DATASET = (
    [
        {
            DatasetConstants.TEST: "bipartite.py",
            DatasetConstants.OPTIONS: ["--u-count", "300", "--v-count", "300"],
            DatasetConstants.TIMEOUT: 30,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
        {
            DatasetConstants.TEST: "detach_delete.py",
            DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "300"],
            DatasetConstants.TIMEOUT: 5,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
        {
            DatasetConstants.TEST: "create_match.py",
            DatasetConstants.OPTIONS: ["--vertex-count", "500000", "--create-pack-size", "500"],
            DatasetConstants.TIMEOUT: 30,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
    ]
    + [
        {
            DatasetConstants.TEST: "long_running.cpp",
            DatasetConstants.OPTIONS: [
                "--vertex-count",
                "10000",
                "--edge-count",
                "40000",
                "--max-time",
                "5",
                "--verify",
                "60",
            ],
            DatasetConstants.TIMEOUT: 16,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
    ]
    # Repeat the 5-minute long_running workload six times (~30min total).
    # NOTE: list multiplication shares the same dict object six times.
    * 6
    + [
        {
            DatasetConstants.TEST: "long_running.cpp",
            DatasetConstants.OPTIONS: [
                "--vertex-count",
                "200000",
                "--edge-count",
                "1000000",
                "--max-time",
                "480",
                "--verify",
                "300",
                # Only the final 8h run records stats.
                "--stats-file",
                STATS_FILE,
            ],
            DatasetConstants.TIMEOUT: 500,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
    ]
)