Add replica state to SHOW REPLICAS (#379)

Marko Budiselić 2022-06-20 12:28:42 +02:00 committed by GitHub
parent 1fb49c4865
commit 599c0a641f
12 changed files with 423 additions and 50 deletions

View File

@ -2391,6 +2391,9 @@ cpp<#
(lcp:define-enum sync-mode
(sync async)
(:serialize))
(lcp:define-enum replica-state
(ready replicating recovery invalid)
(:serialize))
#>cpp
ReplicationQuery() = default;

View File

@ -231,7 +231,20 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
if (repl_info.timeout) {
replica.timeout = *repl_info.timeout;
}
switch (repl_info.state) {
case storage::replication::ReplicaState::READY:
replica.state = ReplicationQuery::ReplicaState::READY;
break;
case storage::replication::ReplicaState::REPLICATING:
replica.state = ReplicationQuery::ReplicaState::REPLICATING;
break;
case storage::replication::ReplicaState::RECOVERY:
replica.state = ReplicationQuery::ReplicaState::RECOVERY;
break;
case storage::replication::ReplicaState::INVALID:
replica.state = ReplicationQuery::ReplicaState::INVALID;
break;
}
return replica;
};
@ -486,6 +499,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
fmt::format("Replica {} is registered.", repl_query->replica_name_));
return callback;
}
case ReplicationQuery::Action::DROP_REPLICA: {
const auto &name = repl_query->replica_name_;
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name]() mutable {
@ -496,8 +510,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
fmt::format("Replica {} is dropped.", repl_query->replica_name_));
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
callback.header = {"name", "socket_address", "sync_mode", "timeout"};
callback.header = {"name", "socket_address", "sync_mode", "timeout", "state"};
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] {
const auto &replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
@ -508,6 +523,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue(replica.name));
typed_replica.emplace_back(TypedValue(replica.socket_address));
switch (replica.sync_mode) {
case ReplicationQuery::SyncMode::SYNC:
typed_replica.emplace_back(TypedValue("sync"));
@ -516,12 +532,28 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue("async"));
break;
}
if (replica.timeout) {
typed_replica.emplace_back(TypedValue(*replica.timeout));
} else {
typed_replica.emplace_back(TypedValue());
}
switch (replica.state) {
case ReplicationQuery::ReplicaState::READY:
typed_replica.emplace_back(TypedValue("ready"));
break;
case ReplicationQuery::ReplicaState::REPLICATING:
typed_replica.emplace_back(TypedValue("replicating"));
break;
case ReplicationQuery::ReplicaState::RECOVERY:
typed_replica.emplace_back(TypedValue("recovery"));
break;
case ReplicationQuery::ReplicaState::INVALID:
typed_replica.emplace_back(TypedValue("invalid"));
break;
}
typed_replicas.emplace_back(std::move(typed_replica));
}
return typed_replicas;
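For reference, a minimal sketch of the row shape this callback now produces, with the new "state" column appended after "timeout"; the values below are illustrative and mirror what the e2e test later in this commit asserts:

# Illustrative only: every SHOW REPLICAS row now carries five columns.
rows = [
    # (name,       socket_address,     sync_mode, timeout, state)
    ("replica_1", "127.0.0.1:10001", "sync",  0.0,  "ready"),
    ("replica_3", "127.0.0.1:10003", "async", None, "invalid"),
]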

View File

@ -127,6 +127,7 @@ class ReplicationQueryHandler {
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
ReplicationQuery::ReplicaState state;
};
/// @throw QueryRuntimeException if an error occurred.

View File

@ -306,6 +306,13 @@ Storage::Storage(Config config)
uuid_(utils::GenerateUUID()),
epoch_id_(utils::GenerateUUID()),
global_locker_(file_retainer_.AddLocker()) {
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED &&
replication_role_ == ReplicationRole::MAIN) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider "
"enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because "
"without write-ahead logs this instance is not replicating any data.");
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit || config_.durability.recover_on_startup) {
// Create the directory initially to crash the database in case of

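The warning above names the flags needed to enable durability on a MAIN instance. A hedged sketch of a cluster entry, in the same shape as the descriptions used by the e2e runner later in this commit, that passes them explicitly (the interval value is arbitrary):

# Hypothetical entry for an e2e cluster description; enables WAL and periodic
# snapshots so the durability warning above does not fire.
MAIN_WITH_DURABILITY = {
    "main": {
        "args": [
            "--bolt-port", "7687", "--log-level=TRACE",
            "--storage-wal-enabled=true",           # write-ahead logging
            "--storage-snapshot-interval-sec=300",  # snapshot every 5 minutes
        ],
        "log_file": "main.log",
        "setup_queries": [],
    },
}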
View File

@ -1,25 +1,32 @@
# Set up C++ functions for e2e tests
function(add_query_module target_name src)
add_library(${target_name} SHARED ${src})
SET_TARGET_PROPERTIES(${target_name} PROPERTIES PREFIX "")
target_include_directories(${target_name} PRIVATE ${CMAKE_SOURCE_DIR}/include)
endfunction()
function(copy_e2e_python_files TARGET_PREFIX FILE_NAME)
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME}
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
endfunction()
function(copy_e2e_python_files_from_parent_folder TARGET_PREFIX EXTRA_PATH FILE_NAME)
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_CURRENT_SOURCE_DIR}/${EXTRA_PATH}/${FILE_NAME}
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${EXTRA_PATH}/${FILE_NAME})
endfunction()
function(copy_e2e_cpp_files TARGET_PREFIX FILE_NAME)
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME}
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
endfunction()
add_subdirectory(server)

View File

@ -0,0 +1,219 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
# TODO(gitbuda): Add action to print the context/cluster.
# TODO(gitbuda): Add action to print logs of each Memgraph instance.
# TODO(gitbuda): Polish naming within script.
# TODO(gitbuda): Consider moving this somewhere higher in the project or even putting it inside GQLAlchemy.
# The idea here is to implement a simple interactive runner of Memgraph instances because:
# * it should be possible to manually create new test cases first, by just running this
#   script and executing commands by hand from e.g. mgconsole; running a single Memgraph
#   instance is easy, but running and controlling multiple instances is not,
# * it should be easy to create a new operational test without a huge knowledge overhead,
#   e.g. by calling `process_actions` from any e2e Python test; the test contains a
#   string with all the actions and should run the test code in a different thread.
#
# NOTE: The intention here is not to provide infrastructure for writing data-correctness
# tests or running any heavy workload; the intention is to be able to easily test e2e
# "operational" cases: a simple cluster setup and basic Memgraph operational queries.
# For any kind of data-correctness test, Jepsen or a similar approach has to be employed.
# NOTE: The instance description / context should be compatible with tests/e2e/runner.py.
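# A hypothetical usage sketch (all names are defined in this file): an e2e test can
# start the default cluster and then drive it with a space-separated action string:
#
#   import interactive_mg_runner
#   interactive_mg_runner.start_all(interactive_mg_runner.MEMGRAPH_INSTANCES_DESCRIPTION)
#   interactive_mg_runner.process_actions(
#       interactive_mg_runner.MEMGRAPH_INSTANCES_DESCRIPTION, "info sleep 1 stop replica1 info")
#   interactive_mg_runner.stop_all()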
import atexit
import logging
import os
import subprocess
from argparse import ArgumentParser
from pathlib import Path
import time
import sys
from inspect import signature
import yaml
from memgraph import MemgraphInstanceRunner
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")
# Cluster description, injectable as the context.
# If the script argument is not provided, the following will be used as a default.
MEMGRAPH_INSTANCES_DESCRIPTION = {
"replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
],
},
}
MEMGRAPH_INSTANCES = {}
ACTIONS = {
"info": lambda context: info(context),
"stop": lambda context, name: stop(context, name),
"start": lambda context, name: start(context, name),
"sleep": lambda context, delta: time.sleep(float(delta)),
"exit": lambda context: sys.exit(1),
"quit": lambda context: sys.exit(1),
}
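# Hypothetical extension: a new action only needs an entry in ACTIONS above;
# process_actions (defined below) dispatches on the number of non-context
# parameters, e.g.:
#   "restart": lambda context, name: (stop(context, name), start(context, name)),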
log = logging.getLogger("memgraph.tests.e2e")
def load_args():
parser = ArgumentParser()
parser.add_argument("--actions", required=False, help="What actions to run", default="")
parser.add_argument(
"--context-yaml",
required=False,
help="YAML file with the cluster description",
default="",
)
return parser.parse_args()
def _start_instance(name, args, log_file, queries, use_ssl, procdir):
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
MEMGRAPH_INSTANCES[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", log_file)
binary_args = args + ["--log-file", log_file_path]
if len(procdir) != 0:
binary_args.append("--query-modules-directory=" + procdir)
mg_instance.start(args=binary_args)
for query in queries:
mg_instance.query(query)
return mg_instance
def stop_all():
for mg_instance in MEMGRAPH_INSTANCES.values():
mg_instance.stop()
def stop_instance(context, name):
for key, _ in context.items():
if key != name:
continue
MEMGRAPH_INSTANCES[name].stop()
def stop(context, name):
if name != "all":
stop_instance(context, name)
return
stop_all()
@atexit.register
def cleanup():
stop_all()
def start_instance(context, name, procdir):
mg_instances = {}
for key, value in context.items():
if key != name:
continue
args = value["args"]
log_file = value["log_file"]
queries = []
if "setup_queries" in value:
queries = value["setup_queries"]
use_ssl = False
if "ssl" in value:
use_ssl = bool(value["ssl"])
value.pop("ssl")
instance = _start_instance(name, args, log_file, queries, use_ssl, procdir)
mg_instances[name] = instance
assert len(mg_instances) == 1
return mg_instances
def start_all(context, procdir=""):
mg_instances = {}
for key, _ in context.items():
mg_instances.update(start_instance(context, key, procdir))
return mg_instances
def start(context, name, procdir=""):
if name != "all":
return start_instance(context, name, procdir)
return start_all(context)
def info(context):
print("{:<15s}{:>6s}".format("NAME", "STATUS"))
for name, _ in context.items():
if name not in MEMGRAPH_INSTANCES:
continue
instance = MEMGRAPH_INSTANCES[name]
print("{:<15s}{:>6s}".format(name, "UP" if instance.is_running() else "DOWN"))
def process_actions(context, actions):
actions = actions.split(" ")
actions.reverse()
while len(actions) > 0:
name = actions.pop()
action = ACTIONS[name]
args_no = len(signature(action).parameters) - 1
assert (
args_no >= 0
), "Wrong action definition, each action has to accept at least 1 argument which is the context."
assert args_no <= 1, "Actions with more than one user argument are not yet supported"
if args_no == 0:
action(context)
if args_no == 1:
action(context, actions.pop())
if __name__ == "__main__":
args = load_args()
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(name)s] %(message)s")
if args.context_yaml == "":
context = MEMGRAPH_INSTANCES_DESCRIPTION
else:
with open(args.context_yaml, "r") as f:
context = yaml.load(f, Loader=yaml.FullLoader)
if args.actions != "":
process_actions(context, args.actions)
sys.exit(0)
while True:
choice = input("ACTION>")
process_actions(context, choice)

View File

@ -112,4 +112,4 @@ class MemgraphInstanceRunner:
return
self.proc_mg.kill()
code = self.proc_mg.wait()
assert code == 9, "The killed Memgraph process exited with non-nine!"
assert code == -9, "The killed Memgraph process exited with non-nine!"

View File

@ -9,3 +9,6 @@ target_link_libraries(memgraph__e2e__replication__read_write_benchmark gflags js
copy_e2e_python_files(replication_show common.py)
copy_e2e_python_files(replication_show conftest.py)
copy_e2e_python_files(replication_show show.py)
copy_e2e_python_files(replication_show show_while_creating_invalid_state.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py)

View File

@ -12,6 +12,7 @@
import sys
import pytest
from common import execute_and_fetch_all
@ -26,21 +27,5 @@ def test_show_replication_role(port, role, connection):
assert data[0][0] == role
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_column_names = {"name", "socket_address", "sync_mode", "timeout"}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0),
("replica_2", "127.0.0.1:10002", "sync", 1.0),
("replica_3", "127.0.0.1:10003", "async", None),
}
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,121 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import atexit
import os
import pytest
import time
from common import execute_and_fetch_all
import interactive_mg_runner
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
MEMGRAPH_INSTANCES_DESCRIPTION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica_2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"replica_3": {
"args": ["--bolt-port", "7690", "--log-level=TRACE"],
"log_file": "replica3.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"],
},
"replica_4": {
"args": ["--bolt-port", "7691", "--log-level=TRACE"],
"log_file": "replica4.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';",
"REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';",
],
},
}
def test_show_replicas(connection):
# The goal of this test is to check the SHOW REPLICAS command.
# 0/ We start all replicas manually: we want to be able to kill them ourselves, without relying on external tooling to kill the processes.
# 1/ We check that all replicas have the correct state: they should all be ready.
# 2/ We drop one replica. It should no longer appear in the SHOW REPLICAS output.
# 3/ We kill or stop the remaining replicas. Their state should become invalid in the SHOW REPLICAS output.
# 0/
atexit.register(
interactive_mg_runner.stop_all
) # Needed in case the test fails due to an assert. One still wants the instances to be stopped.
mg_instances = interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7687, "main").cursor()
# 1/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
EXPECTED_COLUMN_NAMES = {"name", "socket_address", "sync_mode", "timeout", "state"}
actual_column_names = {x.name for x in cursor.description}
assert EXPECTED_COLUMN_NAMES == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, "ready"),
}
assert expected_data == actual_data
# 2/
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, "ready"),
}
assert expected_data == actual_data
# 3/
mg_instances["replica_1"].kill()
mg_instances["replica_3"].kill()
mg_instances["replica_4"].stop()
# We leave some time for the main instance to realize the replicas are down.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", None, "invalid"),
("replica_4", "127.0.0.1:10004", "async", None, "invalid"),
}
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -74,3 +74,7 @@ workloads:
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]
validation_queries: []
- name: "Show while creating invalid state"
binary: "tests/e2e/pytest_runner.sh"
args: ["replication/show_while_creating_invalid_state.py"]

View File

@ -18,12 +18,11 @@ from pathlib import Path
import yaml
from memgraph import MemgraphInstanceRunner
import interactive_mg_runner
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")
log = logging.getLogger("memgraph.tests.e2e")
@ -58,30 +57,22 @@ def run(args):
for mg_instance in mg_instances.values():
mg_instance.stop()
for name, config in workload["cluster"].items():
use_ssl = False
if "ssl" in config:
use_ssl = bool(config["ssl"])
config.pop("ssl")
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
mg_instances[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", config["log_file"])
binary_args = config["args"] + ["--log-file", log_file_path]
if "cluster" in workload:
procdir = ""
if "proc" in workload:
procdir = "--query-modules-directory=" + os.path.join(BUILD_DIR, workload["proc"])
binary_args.append(procdir)
mg_instance.start(args=binary_args)
for query in config.get("setup_queries", []):
mg_instance.query(query)
procdir = os.path.join(BUILD_DIR, workload["proc"])
mg_instances = interactive_mg_runner.start_all(workload["cluster"], procdir)
# Test.
mg_test_binary = os.path.join(BUILD_DIR, workload["binary"])
subprocess.run([mg_test_binary] + workload["args"], check=True, stderr=subprocess.STDOUT)
# Validation.
for name, config in workload["cluster"].items():
for validation in config.get("validation_queries", []):
mg_instance = mg_instances[name]
data = mg_instance.query(validation["query"])[0][0]
assert data == validation["expected"]
if "cluster" in workload:
for name, config in workload["cluster"].items():
for validation in config.get("validation_queries", []):
mg_instance = mg_instances[name]
data = mg_instance.query(validation["query"])[0][0]
assert data == validation["expected"]
cleanup()
log.info("%s PASSED.", workload_name)