Merge branch 'master' into fix-commit-if-sync-replica-down

This commit is contained in:
Marko Budiselić 2022-06-06 15:20:56 +02:00 committed by GitHub
commit ed71d01aa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 235 additions and 21 deletions

View File

@ -88,4 +88,3 @@ CheckOptions:
- key: modernize-use-nullptr.NullMacros
value: 'NULL'
...

View File

@ -252,6 +252,11 @@ DEFINE_double(query_execution_timeout_sec, 600,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of 0 means no limit.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(replication_replica_check_frequency_sec, 1,
"The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: "
"The MAIN instance allocates a new thread for each REPLICA.");
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(
memory_limit, 0,
@ -1216,6 +1221,7 @@ int main(int argc, char **argv) {
&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
.replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec),
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
.default_pulsar_service_url = FLAGS_pulsar_service_url,
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,

View File

@ -21,6 +21,8 @@ struct InterpreterConfig {
// The default execution timeout is 10 minutes.
double execution_timeout_sec{600.0};
// The same as \ref memgraph::storage::replication::ReplicationClientConfig
std::chrono::seconds replication_replica_check_frequency{1};
std::string default_kafka_bootstrap_servers;
std::string default_pulsar_service_url;

View File

@ -160,7 +160,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) override {
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
const std::chrono::seconds replica_check_frequency) override {
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
// replica can't register another replica
throw QueryRuntimeException("Replica can't register another replica!");
@ -182,8 +183,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort);
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret =
db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, {.timeout = timeout, .ssl = std::nullopt});
auto ret = db_->RegisterReplica(
name, {std::move(ip), port}, repl_mode,
{.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
if (ret.HasError()) {
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
}
@ -448,7 +450,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: {
callback.header = {"replication mode"};
callback.header = {"replication role"};
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}] {
auto mode = handler.ShowReplicationRole();
switch (mode) {
@ -467,6 +469,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
const auto &sync_mode = repl_query->sync_mode_;
auto socket_address = repl_query->socket_address_->Accept(evaluator);
auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator);
const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency;
std::optional<double> maybe_timeout;
if (timeout.IsDouble()) {
maybe_timeout = timeout.ValueDouble();
@ -474,8 +477,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
maybe_timeout = static_cast<double>(timeout.ValueInt());
}
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
maybe_timeout]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout);
maybe_timeout, replica_check_frequency]() mutable {
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,
replica_check_frequency);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA,
@ -512,7 +516,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue("async"));
break;
}
typed_replica.emplace_back(TypedValue(static_cast<int64_t>(replica.sync_mode)));
if (replica.timeout) {
typed_replica.emplace_back(TypedValue(*replica.timeout));
} else {

View File

@ -137,7 +137,8 @@ class ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) = 0;
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
const std::chrono::seconds replica_check_frequency) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void DropReplica(const std::string &replica_name) = 0;

View File

@ -16,6 +16,10 @@
namespace memgraph::storage::replication {
struct ReplicationClientConfig {
std::optional<double> timeout;
// The default delay between main checking/pinging replicas is 1s because
// that seems like a reasonable timeframe in which main should notice a
// replica is down.
std::chrono::seconds replica_check_frequency{1};
struct SSL {
std::string key_file = "";

View File

@ -41,12 +41,49 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
}
rpc_client_.emplace(endpoint, &*rpc_context_);
TryInitializeClient();
TryInitializeClientSync();
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
timeout_.emplace(*config.timeout);
timeout_dispatcher_.emplace();
}
// Help the user to get the most accurate replica state possible.
if (config.replica_check_frequency > std::chrono::seconds(0)) {
replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); });
}
}
void Storage::ReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClientSync();
});
}
void Storage::ReplicationClient::FrequentCheck() {
const auto is_success = std::invoke([this]() {
try {
auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()};
const auto response = stream.AwaitResponse();
return response.success;
} catch (const rpc::RpcFailedException &) {
return false;
}
});
// States: READY, REPLICATING, RECOVERY, INVALID
// If success && ready, replicating, recovery -> stay the same because something good is going on.
// If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient.
// If fail -> [replica is not reachable at all] -> INVALID state.
// NOTE: TryInitializeClient might return nothing if there is a branching point.
// NOTE: The early return pattern simplified the code, but the behavior should be as explained.
if (!is_success) {
replica_state_.store(replication::ReplicaState::INVALID);
return;
}
if (replica_state_.load() == replication::ReplicaState::INVALID) {
TryInitializeClientAsync();
}
}
/// @throws rpc::RpcFailedException
@ -100,7 +137,7 @@ void Storage::ReplicationClient::InitializeClient() {
}
}
void Storage::ReplicationClient::TryInitializeClient() {
void Storage::ReplicationClient::TryInitializeClientSync() {
try {
InitializeClient();
} catch (const rpc::RpcFailedException &) {
@ -113,10 +150,7 @@ void Storage::ReplicationClient::TryInitializeClient() {
void Storage::ReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClient();
});
TryInitializeClientAsync();
}
replication::SnapshotRes Storage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {

View File

@ -142,16 +142,14 @@ class Storage::ReplicationClient {
std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker);
void FrequentCheck();
void InitializeClient();
void TryInitializeClient();
void TryInitializeClientSync();
void TryInitializeClientAsync();
void HandleRpcFailure();
std::string name_;
Storage *storage_;
std::optional<communication::ClientContext> rpc_context_;
std::optional<rpc::Client> rpc_client_;
@ -198,6 +196,8 @@ class Storage::ReplicationClient {
// to ignore concurrency problems inside the client.
utils::ThreadPool thread_pool_{1};
std::atomic<replication::ReplicaState> replica_state_{replication::ReplicaState::INVALID};
utils::Scheduler replica_checker_;
};
} // namespace memgraph::storage

View File

@ -60,6 +60,10 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
spdlog::debug("Received HeartbeatRpc");
this->HeartbeatHandler(req_reader, res_builder);
});
rpc_server_->Register<replication::FrequentHeartbeatRpc>([](auto *req_reader, auto *res_builder) {
spdlog::debug("Received FrequentHeartbeatRpc");
FrequentHeartbeatHandler(req_reader, res_builder);
});
rpc_server_->Register<replication::AppendDeltasRpc>([this](auto *req_reader, auto *res_builder) {
spdlog::debug("Received AppendDeltasRpc");
this->AppendDeltasHandler(req_reader, res_builder);
@ -86,6 +90,13 @@ void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::FrequentHeartbeatReq req;
slk::Load(&req, req_reader);
replication::FrequentHeartbeatRes res{true};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::AppendDeltasReq req;
slk::Load(&req, req_reader);

View File

@ -29,6 +29,7 @@ class Storage::ReplicationServer {
private:
// RPC handlers
void HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder);
static void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder);
void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);

View File

@ -43,6 +43,12 @@ cpp<#
(current-commit-timestamp :uint64_t)
(epoch-id "std::string"))))
;; FrequentHearthbeat is required because calling Heartbeat takes the storage lock.
;; Configured by `replication_replica_check_delay`.
(lcp:define-rpc frequent-heartbeat
(:request ())
(:response ((success :bool))))
(lcp:define-rpc snapshot
(:request ())
(:response

View File

@ -5,3 +5,7 @@ target_link_libraries(memgraph__e2e__replication__constraints gflags mgclient mg
add_executable(memgraph__e2e__replication__read_write_benchmark read_write_benchmark.cpp)
target_link_libraries(memgraph__e2e__replication__read_write_benchmark gflags json mgclient mg-utils mg-io Threads::Threads)
copy_e2e_python_files(replication_show common.py)
copy_e2e_python_files(replication_show conftest.py)
copy_e2e_python_files(replication_show show.py)

View File

@ -0,0 +1,26 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import mgclient
import typing
def execute_and_fetch_all(
cursor: mgclient.Cursor, query: str, params: dict = {}
) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(**kwargs)
connection.autocommit = True
return connection

View File

@ -0,0 +1,44 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import pytest
from common import execute_and_fetch_all, connect
# The fixture here is more complex because the connection has to be
# parameterized based on the test parameters (info has to be available on both
# sides).
#
# https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
# is not an elegant/feasible solution here.
#
# The solution was independently developed and then I stumbled upon the same
# approach here https://stackoverflow.com/a/68286553/4888809 which I think is
# optimal.
@pytest.fixture(scope="function")
def connection():
connection_holder = None
role_holder = None
def inner_connection(port, role):
nonlocal connection_holder, role_holder
connection_holder = connect(host="localhost", port=port)
role_holder = role
return connection_holder
yield inner_connection
# Only main instance can be cleaned up because replicas do NOT accept
# writes.
if role_holder == "main":
cursor = connection_holder.cursor()
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")

46
tests/e2e/replication/show.py Executable file
View File

@ -0,0 +1,46 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import execute_and_fetch_all
@pytest.mark.parametrize(
"port, role",
[(7687, "main"), (7688, "replica"), (7689, "replica"), (7690, "replica")],
)
def test_show_replication_role(port, role, connection):
cursor = connection(port, role).cursor()
data = execute_and_fetch_all(cursor, "SHOW REPLICATION ROLE;")
assert cursor.description[0].name == "replication role"
assert data[0][0] == role
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_column_names = {"name", "socket_address", "sync_mode", "timeout"}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0),
("replica_2", "127.0.0.1:10002", "sync", 1.0),
("replica_3", "127.0.0.1:10003", "async", None),
}
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -46,4 +46,31 @@ workloads:
args: []
<<: *template_cluster
- name: "Show"
binary: "tests/e2e/pytest_runner.sh"
args: ["replication/show.py"]
cluster:
replica_1:
args: ["--bolt-port", "7688", "--log-level=TRACE"]
log_file: "replication-e2e-replica1.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"]
validation_queries: []
replica_2:
args: ["--bolt-port", "7689", "--log-level=TRACE"]
log_file: "replication-e2e-replica2.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
validation_queries: []
replica_3:
args: ["--bolt-port", "7690", "--log-level=TRACE"]
log_file: "replication-e2e-replica3.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"]
validation_queries: []
main:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]
validation_queries: []