System replication experimental flag (#1702)
- Remove the compile time control - Introduce the runtime control flag New flag `--experimental-enabled=system-replication`
This commit is contained in:
parent
4a7c7f0898
commit
f48151576b
64
.github/workflows/diff.yaml
vendored
64
.github/workflows/diff.yaml
vendored
@ -383,70 +383,6 @@ jobs:
|
||||
# multiple paths could be defined
|
||||
build/logs
|
||||
|
||||
experimental_build_mt:
|
||||
name: "MultiTenancy replication build"
|
||||
runs-on: [self-hosted, Linux, X64, Diff]
|
||||
env:
|
||||
THREADS: 24
|
||||
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
|
||||
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
|
||||
|
||||
steps:
|
||||
- name: Set up repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
# Number of commits to fetch. `0` indicates all history for all
|
||||
# branches and tags. (default: 1)
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
- name: Build release binaries
|
||||
run: |
|
||||
# Activate toolchain.
|
||||
source /opt/toolchain-v4/activate
|
||||
|
||||
# Initialize dependencies.
|
||||
./init
|
||||
|
||||
# Build MT replication experimental binaries.
|
||||
cd build
|
||||
cmake -DCMAKE_BUILD_TYPE=Release -D MG_EXPERIMENTAL_REPLICATION_MULTITENANCY=ON ..
|
||||
make -j$THREADS
|
||||
|
||||
- name: Run unit tests
|
||||
run: |
|
||||
# Activate toolchain.
|
||||
source /opt/toolchain-v4/activate
|
||||
|
||||
# Run unit tests.
|
||||
cd build
|
||||
ctest -R memgraph__unit --output-on-failure -j$THREADS
|
||||
|
||||
- name: Run e2e tests
|
||||
run: |
|
||||
cd tests
|
||||
./setup.sh /opt/toolchain-v4/activate
|
||||
source ve3/bin/activate_e2e
|
||||
cd e2e
|
||||
|
||||
# Just the replication based e2e tests
|
||||
./run.sh "Replicate multitenancy"
|
||||
./run.sh "Show"
|
||||
./run.sh "Show while creating invalid state"
|
||||
./run.sh "Delete edge replication"
|
||||
./run.sh "Read-write benchmark"
|
||||
./run.sh "Index replication"
|
||||
./run.sh "Constraints"
|
||||
|
||||
- name: Save test data
|
||||
uses: actions/upload-artifact@v4
|
||||
if: always()
|
||||
with:
|
||||
name: "Test data(MultiTenancy replication build)"
|
||||
path: |
|
||||
# multiple paths could be defined
|
||||
build/logs
|
||||
|
||||
release_jepsen_test:
|
||||
name: "Release Jepsen Test"
|
||||
runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl]
|
||||
|
@ -291,16 +291,6 @@ option(TSAN "Build with Thread Sanitizer. To get a reasonable performance option
|
||||
option(UBSAN "Build with Undefined Behaviour Sanitizer" OFF)
|
||||
|
||||
# Build feature flags
|
||||
option(MG_EXPERIMENTAL_REPLICATION_MULTITENANCY "Feature flag for experimental replicaition of multitenacy" OFF)
|
||||
|
||||
if (NOT MG_ENTERPRISE AND MG_EXPERIMENTAL_REPLICATION_MULTITENANCY)
|
||||
set(MG_EXPERIMENTAL_REPLICATION_MULTITENANCY OFF)
|
||||
message(FATAL_ERROR "MG_EXPERIMENTAL_REPLICATION_MULTITENANCY with community edition build isn't possible")
|
||||
endif ()
|
||||
|
||||
if (MG_EXPERIMENTAL_REPLICATION_MULTITENANCY)
|
||||
add_compile_definitions(MG_EXPERIMENTAL_REPLICATION_MULTITENANCY)
|
||||
endif ()
|
||||
|
||||
if (TEST_COVERAGE)
|
||||
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
|
||||
|
@ -16,10 +16,4 @@ namespace memgraph::dbms {
|
||||
constexpr std::string_view kDefaultDB = "memgraph"; //!< Name of the default database
|
||||
constexpr std::string_view kMultiTenantDir = "databases"; //!< Name of the multi-tenant directory
|
||||
|
||||
#ifdef MG_EXPERIMENTAL_REPLICATION_MULTITENANCY
|
||||
constexpr bool allow_mt_repl = true;
|
||||
#else
|
||||
constexpr bool allow_mt_repl = false;
|
||||
#endif
|
||||
|
||||
} // namespace memgraph::dbms
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
#include "dbms/constants.hpp"
|
||||
#include "dbms/global.hpp"
|
||||
#include "flags/experimental.hpp"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "system/include/system/system.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
@ -158,9 +159,9 @@ struct Durability {
|
||||
}
|
||||
};
|
||||
|
||||
DbmsHandler::DbmsHandler(storage::Config config, memgraph::system::System &system,
|
||||
replication::ReplicationState &repl_state, auth::SynchedAuth &auth, bool recovery_on_startup)
|
||||
: default_config_{std::move(config)}, auth_{auth}, repl_state_{repl_state}, system_{&system} {
|
||||
DbmsHandler::DbmsHandler(storage::Config config, replication::ReplicationState &repl_state, auth::SynchedAuth &auth,
|
||||
bool recovery_on_startup)
|
||||
: default_config_{std::move(config)}, auth_{auth}, repl_state_{repl_state} {
|
||||
// TODO: Decouple storage config from dbms config
|
||||
// TODO: Save individual db configs inside the kvstore and restore from there
|
||||
|
||||
@ -419,7 +420,10 @@ void DbmsHandler::UpdateDurability(const storage::Config &config, std::optional<
|
||||
#endif
|
||||
|
||||
void DbmsHandler::RecoverStorageReplication(DatabaseAccess db_acc, replication::RoleMainData &role_main_data) {
|
||||
if (allow_mt_repl || db_acc->name() == dbms::kDefaultDB) {
|
||||
using enum memgraph::flags::Experiments;
|
||||
auto const is_enterprise = license::global_license_checker.IsEnterpriseValidFast();
|
||||
auto experimental_system_replication = flags::AreExperimentsEnabled(SYSTEM_REPLICATION);
|
||||
if ((is_enterprise && experimental_system_replication) || db_acc->name() == dbms::kDefaultDB) {
|
||||
// Handle global replication state
|
||||
spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash.");
|
||||
// RECOVER REPLICA CONNECTIONS
|
||||
|
@ -107,8 +107,7 @@ class DbmsHandler {
|
||||
* @param auth pointer to the global authenticator
|
||||
* @param recovery_on_startup restore databases (and its content) and authentication data
|
||||
*/
|
||||
DbmsHandler(storage::Config config, memgraph::system::System &system, replication::ReplicationState &repl_state,
|
||||
auth::SynchedAuth &auth,
|
||||
DbmsHandler(storage::Config config, replication::ReplicationState &repl_state, auth::SynchedAuth &auth,
|
||||
bool recovery_on_startup); // TODO If more arguments are added use a config struct
|
||||
#else
|
||||
/**
|
||||
@ -116,9 +115,8 @@ class DbmsHandler {
|
||||
*
|
||||
* @param configs storage configuration
|
||||
*/
|
||||
DbmsHandler(storage::Config config, memgraph::system::System &system, replication::ReplicationState &repl_state)
|
||||
DbmsHandler(storage::Config config, replication::ReplicationState &repl_state)
|
||||
: repl_state_{repl_state},
|
||||
system_{&system},
|
||||
db_gatekeeper_{[&] {
|
||||
config.salient.name = kDefaultDB;
|
||||
return std::move(config);
|
||||
@ -272,6 +270,20 @@ class DbmsHandler {
|
||||
// coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Return all active databases.
|
||||
*
|
||||
* @return std::vector<std::string>
|
||||
*/
|
||||
auto Count() const -> std::size_t {
|
||||
#ifdef MG_ENTERPRISE
|
||||
std::shared_lock<LockT> rd(lock_);
|
||||
return db_handler_.size();
|
||||
#else
|
||||
return 1;
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Return the statistics all databases.
|
||||
*
|
||||
@ -587,9 +599,6 @@ class DbmsHandler {
|
||||
// current replication role. TODO: make Database Access explicit about the role and remove this from
|
||||
// dbms stuff
|
||||
replication::ReplicationState &repl_state_; //!< Ref to global replication state
|
||||
public:
|
||||
// TODO fix to be non public/remove from dbms....maybe
|
||||
system::System *system_;
|
||||
|
||||
#ifndef MG_ENTERPRISE
|
||||
mutable utils::Gatekeeper<Database> db_gatekeeper_; //!< Single databases gatekeeper
|
||||
|
@ -144,6 +144,8 @@ class Handler {
|
||||
auto cbegin() const { return items_.cbegin(); }
|
||||
auto cend() const { return items_.cend(); }
|
||||
|
||||
auto size() const { return items_.size(); }
|
||||
|
||||
struct string_hash {
|
||||
using is_transparent = void;
|
||||
[[nodiscard]] size_t operator()(const char *s) const { return std::hash<std::string_view>{}(s); }
|
||||
|
@ -1,12 +1,17 @@
|
||||
add_library(mg-flags STATIC audit.cpp
|
||||
bolt.cpp
|
||||
general.cpp
|
||||
isolation_level.cpp
|
||||
log_level.cpp
|
||||
memory_limit.cpp
|
||||
run_time_configurable.cpp
|
||||
storage_mode.cpp
|
||||
query.cpp
|
||||
replication.cpp)
|
||||
target_include_directories(mg-flags PUBLIC ${CMAKE_SOURCE_DIR}/include)
|
||||
target_link_libraries(mg-flags PUBLIC spdlog::spdlog mg-settings mg-utils)
|
||||
add_library(mg-flags STATIC
|
||||
audit.cpp
|
||||
bolt.cpp
|
||||
general.cpp
|
||||
isolation_level.cpp
|
||||
log_level.cpp
|
||||
memory_limit.cpp
|
||||
run_time_configurable.cpp
|
||||
storage_mode.cpp
|
||||
query.cpp
|
||||
replication.cpp
|
||||
experimental.cpp
|
||||
experimental.hpp)
|
||||
target_include_directories(mg-flags PUBLIC include)
|
||||
target_link_libraries(mg-flags
|
||||
PUBLIC spdlog::spdlog mg-settings mg-utils
|
||||
PRIVATE lib::rangev3)
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
#include "flags/audit.hpp"
|
||||
#include "flags/bolt.hpp"
|
||||
#include "flags/experimental.hpp"
|
||||
#include "flags/general.hpp"
|
||||
#include "flags/isolation_level.hpp"
|
||||
#include "flags/log_level.hpp"
|
||||
|
67
src/flags/experimental.cpp
Normal file
67
src/flags/experimental.cpp
Normal file
@ -0,0 +1,67 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "flags/experimental.hpp"
|
||||
#include "range/v3/all.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
#include <map>
|
||||
#include <string_view>
|
||||
|
||||
// Bolt server flags.
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_string(experimental_enabled, "",
|
||||
"Experimental features to be used, comma seperated. Options [system-replication]");
|
||||
|
||||
using namespace std::string_view_literals;
|
||||
|
||||
namespace memgraph::flags {
|
||||
|
||||
auto const mapping = std::map{std::pair{"system-replication"sv, Experiments::SYSTEM_REPLICATION}};
|
||||
|
||||
auto ExperimentsInstance() -> Experiments & {
|
||||
static auto instance = Experiments{};
|
||||
return instance;
|
||||
}
|
||||
|
||||
bool AreExperimentsEnabled(Experiments experiments) {
|
||||
using t = std::underlying_type_t<Experiments>;
|
||||
|
||||
auto actual = static_cast<t>(ExperimentsInstance());
|
||||
auto check = static_cast<t>(experiments);
|
||||
|
||||
return (actual & check) == check;
|
||||
}
|
||||
|
||||
void InitializeExperimental() {
|
||||
namespace rv = ranges::views;
|
||||
|
||||
auto const connonicalize_string = [](auto &&rng) {
|
||||
auto const is_space = [](auto c) { return c == ' '; };
|
||||
auto const to_lower = [](unsigned char c) { return std::tolower(c); };
|
||||
|
||||
return rng | rv::drop_while(is_space) | rv::take_while(std::not_fn(is_space)) | rv::transform(to_lower) |
|
||||
ranges::to<std::string>;
|
||||
};
|
||||
|
||||
auto const mapping_end = mapping.cend();
|
||||
using underlying_type = std::underlying_type_t<Experiments>;
|
||||
auto to_set = underlying_type{};
|
||||
for (auto &&experiment : FLAGS_experimental_enabled | rv::split(',') | rv::transform(connonicalize_string)) {
|
||||
if (auto it = mapping.find(experiment); it != mapping_end) {
|
||||
to_set |= static_cast<underlying_type>(it->second);
|
||||
}
|
||||
}
|
||||
|
||||
ExperimentsInstance() = static_cast<Experiments>(to_set);
|
||||
}
|
||||
|
||||
} // namespace memgraph::flags
|
32
src/flags/experimental.hpp
Normal file
32
src/flags/experimental.hpp
Normal file
@ -0,0 +1,32 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
// Short help flag.
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_string(experimental_enabled);
|
||||
|
||||
namespace memgraph::flags {
|
||||
|
||||
// Each bit is an enabled experiment
|
||||
// old experiments can be reused once code cleanup has happened
|
||||
enum class Experiments : uint8_t {
|
||||
SYSTEM_REPLICATION = 1 << 0,
|
||||
};
|
||||
|
||||
bool AreExperimentsEnabled(Experiments experiments);
|
||||
|
||||
void InitializeExperimental();
|
||||
|
||||
} // namespace memgraph::flags
|
@ -134,6 +134,7 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
|
||||
memgraph::flags::InitializeLogger();
|
||||
memgraph::flags::InitializeExperimental();
|
||||
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&memgraph::utils::TerminateHandler);
|
||||
@ -396,7 +397,7 @@ int main(int argc, char **argv) {
|
||||
memgraph::coordination::CoordinatorState coordinator_state;
|
||||
#endif
|
||||
|
||||
memgraph::dbms::DbmsHandler dbms_handler(db_config, system, repl_state
|
||||
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
auth_, FLAGS_data_recovery_on_startup
|
||||
@ -409,7 +410,7 @@ int main(int argc, char **argv) {
|
||||
auto replication_handler = memgraph::replication::ReplicationHandler{repl_state, dbms_handler
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
&system, auth_
|
||||
system, auth_
|
||||
#endif
|
||||
};
|
||||
|
||||
|
@ -109,6 +109,7 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "coordination/constants.hpp"
|
||||
#include "flags/experimental.hpp"
|
||||
#endif
|
||||
|
||||
namespace memgraph::metrics {
|
||||
@ -3881,7 +3882,9 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur
|
||||
if (current_db.in_explicit_db_) {
|
||||
throw QueryException("Database switching is prohibited if session explicitly defines the used database");
|
||||
}
|
||||
if (!dbms::allow_mt_repl && is_replica) {
|
||||
|
||||
using enum memgraph::flags::Experiments;
|
||||
if (!flags::AreExperimentsEnabled(SYSTEM_REPLICATION) && is_replica) {
|
||||
throw QueryException("Query forbidden on the replica!");
|
||||
}
|
||||
return PreparedQuery{{"STATUS"},
|
||||
@ -4535,7 +4538,8 @@ void Interpreter::Commit() {
|
||||
auto const main_commit = [&](replication::RoleMainData &mainData) {
|
||||
// Only enterprise can do system replication
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
using enum memgraph::flags::Experiments;
|
||||
if (flags::AreExperimentsEnabled(SYSTEM_REPLICATION) && license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
return system_transaction_->Commit(memgraph::system::DoReplication{mainData});
|
||||
}
|
||||
#endif
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "dbms/dbms_handler.hpp"
|
||||
#include "flags/experimental.hpp"
|
||||
#include "replication/include/replication/state.hpp"
|
||||
#include "replication_handler/system_rpc.hpp"
|
||||
#include "utils/result.hpp"
|
||||
@ -22,8 +23,8 @@ inline std::optional<query::RegisterReplicaError> HandleRegisterReplicaStatus(
|
||||
utils::BasicResult<replication::RegisterReplicaError, replication::ReplicationClient *> &instance_client);
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandler &dbms_handler, utils::UUID main_uuid,
|
||||
system::System *system, auth::SynchedAuth &auth);
|
||||
void StartReplicaClient(replication::ReplicationClient &client, system::System &system, dbms::DbmsHandler &dbms_handler,
|
||||
utils::UUID main_uuid, auth::SynchedAuth &auth);
|
||||
#else
|
||||
void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandler &dbms_handler, utils::UUID main_uuid);
|
||||
#endif
|
||||
@ -33,8 +34,8 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
|
||||
// When being called by interpreter no need to gain lock, it should already be under a system transaction
|
||||
// But concurrently the FrequentCheck is running and will need to lock before reading last_committed_system_timestamp_
|
||||
template <bool REQUIRE_LOCK = false>
|
||||
void SystemRestore(replication::ReplicationClient &client, dbms::DbmsHandler &dbms_handler,
|
||||
const utils::UUID &main_uuid, system::System *system, auth::SynchedAuth &auth) {
|
||||
void SystemRestore(replication::ReplicationClient &client, system::System &system, dbms::DbmsHandler &dbms_handler,
|
||||
const utils::UUID &main_uuid, auth::SynchedAuth &auth) {
|
||||
// Check if system is up to date
|
||||
if (client.state_.WithLock(
|
||||
[](auto &state) { return state == memgraph::replication::ReplicationClient::State::READY; }))
|
||||
@ -42,6 +43,10 @@ void SystemRestore(replication::ReplicationClient &client, dbms::DbmsHandler &db
|
||||
|
||||
// Try to recover...
|
||||
{
|
||||
using enum memgraph::flags::Experiments;
|
||||
bool full_system_replication =
|
||||
flags::AreExperimentsEnabled(SYSTEM_REPLICATION) && license::global_license_checker.IsEnterpriseValidFast();
|
||||
// We still need to system replicate
|
||||
struct DbInfo {
|
||||
std::vector<storage::SalientConfig> configs;
|
||||
uint64_t last_committed_timestamp;
|
||||
@ -49,25 +54,25 @@ void SystemRestore(replication::ReplicationClient &client, dbms::DbmsHandler &db
|
||||
DbInfo db_info = std::invoke([&] {
|
||||
auto guard = std::invoke([&]() -> std::optional<memgraph::system::TransactionGuard> {
|
||||
if constexpr (REQUIRE_LOCK) {
|
||||
return system->GenTransactionGuard();
|
||||
return system.GenTransactionGuard();
|
||||
}
|
||||
return std::nullopt;
|
||||
});
|
||||
|
||||
if (license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
if (full_system_replication) {
|
||||
auto configs = std::vector<storage::SalientConfig>{};
|
||||
dbms_handler.ForEach([&configs](dbms::DatabaseAccess acc) { configs.emplace_back(acc->config().salient); });
|
||||
// TODO: This is `SystemRestore` maybe DbInfo is incorrect as it will need Auth also
|
||||
return DbInfo{configs, system->LastCommittedSystemTimestamp()};
|
||||
return DbInfo{configs, system.LastCommittedSystemTimestamp()};
|
||||
}
|
||||
|
||||
// No license -> send only default config
|
||||
return DbInfo{{dbms_handler.Get()->config().salient}, system->LastCommittedSystemTimestamp()};
|
||||
return DbInfo{{dbms_handler.Get()->config().salient}, system.LastCommittedSystemTimestamp()};
|
||||
});
|
||||
try {
|
||||
auto stream = std::invoke([&]() {
|
||||
// Handle only default database is no license
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
if (!full_system_replication) {
|
||||
return client.rpc_client_.Stream<replication::SystemRecoveryRpc>(
|
||||
main_uuid, db_info.last_committed_timestamp, std::move(db_info.configs), auth::Auth::Config{},
|
||||
std::vector<auth::User>{}, std::vector<auth::Role>{});
|
||||
@ -98,7 +103,7 @@ void SystemRestore(replication::ReplicationClient &client, dbms::DbmsHandler &db
|
||||
struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
#ifdef MG_ENTERPRISE
|
||||
explicit ReplicationHandler(memgraph::replication::ReplicationState &repl_state,
|
||||
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::system::System *system,
|
||||
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::system::System &system,
|
||||
memgraph::auth::SynchedAuth &auth);
|
||||
#else
|
||||
explicit ReplicationHandler(memgraph::replication::ReplicationState &repl_state,
|
||||
@ -155,7 +160,9 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
}
|
||||
}
|
||||
|
||||
if (!memgraph::dbms::allow_mt_repl && dbms_handler_.All().size() > 1) {
|
||||
using enum memgraph::flags::Experiments;
|
||||
bool system_replication_enabled = flags::AreExperimentsEnabled(SYSTEM_REPLICATION);
|
||||
if (!system_replication_enabled && dbms_handler_.Count() > 1) {
|
||||
spdlog::warn("Multi-tenant replication is currently not supported!");
|
||||
}
|
||||
const auto main_uuid =
|
||||
@ -170,7 +177,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
// Update system before enabling individual storage <-> replica clients
|
||||
SystemRestore(*maybe_client.GetValue(), dbms_handler_, main_uuid, system_, auth_);
|
||||
SystemRestore(*maybe_client.GetValue(), system_, dbms_handler_, main_uuid, auth_);
|
||||
#endif
|
||||
|
||||
const auto dbms_error = HandleRegisterReplicaStatus(maybe_client);
|
||||
@ -183,7 +190,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
// Add database specific clients (NOTE Currently all databases are connected to each replica)
|
||||
dbms_handler_.ForEach([&](dbms::DatabaseAccess db_acc) {
|
||||
auto *storage = db_acc->storage();
|
||||
if (!dbms::allow_mt_repl && storage->name() != dbms::kDefaultDB) {
|
||||
if (!system_replication_enabled && storage->name() != dbms::kDefaultDB) {
|
||||
return;
|
||||
}
|
||||
// TODO: ATM only IN_MEMORY_TRANSACTIONAL, fix other modes
|
||||
@ -215,7 +222,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
|
||||
// No client error, start instance level client
|
||||
#ifdef MG_ENTERPRISE
|
||||
StartReplicaClient(*instance_client_ptr, dbms_handler_, main_uuid, system_, auth_);
|
||||
StartReplicaClient(*instance_client_ptr, system_, dbms_handler_, main_uuid, auth_);
|
||||
#else
|
||||
StartReplicaClient(*instance_client_ptr, dbms_handler_, main_uuid);
|
||||
#endif
|
||||
@ -226,7 +233,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
memgraph::dbms::DbmsHandler &dbms_handler_;
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::system::System *system_;
|
||||
memgraph::system::System &system_;
|
||||
memgraph::auth::SynchedAuth &auth_;
|
||||
#endif
|
||||
};
|
||||
|
@ -27,11 +27,16 @@ inline void LogWrongMain(const std::optional<utils::UUID> ¤t_main_uuid, co
|
||||
#ifdef MG_ENTERPRISE
|
||||
void SystemHeartbeatHandler(uint64_t ts, const std::optional<utils::UUID> ¤t_main_uuid, slk::Reader *req_reader,
|
||||
slk::Builder *res_builder);
|
||||
|
||||
void SystemRecoveryHandler(memgraph::system::ReplicaHandlerAccessToState &system_state_access,
|
||||
std::optional<utils::UUID> ¤t_main_uuid, dbms::DbmsHandler &dbms_handler,
|
||||
auth::SynchedAuth &auth, slk::Reader *req_reader, slk::Builder *res_builder);
|
||||
void Register(replication::RoleReplicaData const &data, dbms::DbmsHandler &dbms_handler, auth::SynchedAuth &auth);
|
||||
bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaData &data, auth::SynchedAuth &auth);
|
||||
|
||||
void Register(replication::RoleReplicaData const &data, system::System &system, dbms::DbmsHandler &dbms_handler,
|
||||
auth::SynchedAuth &auth);
|
||||
|
||||
bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaData &data, auth::SynchedAuth &auth,
|
||||
system::System &system);
|
||||
#else
|
||||
bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaData &data);
|
||||
#endif
|
||||
|
@ -17,25 +17,25 @@ namespace memgraph::replication {
|
||||
|
||||
namespace {
|
||||
#ifdef MG_ENTERPRISE
|
||||
void RecoverReplication(memgraph::replication::ReplicationState &repl_state, memgraph::system::System *system,
|
||||
void RecoverReplication(memgraph::replication::ReplicationState &repl_state, memgraph::system::System &system,
|
||||
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::auth::SynchedAuth &auth) {
|
||||
/*
|
||||
* REPLICATION RECOVERY AND STARTUP
|
||||
*/
|
||||
|
||||
// Startup replication state (if recovered at startup)
|
||||
auto replica = [&dbms_handler, &auth](memgraph::replication::RoleReplicaData &data) {
|
||||
return StartRpcServer(dbms_handler, data, auth);
|
||||
auto replica = [&dbms_handler, &auth, &system](memgraph::replication::RoleReplicaData &data) {
|
||||
return memgraph::replication::StartRpcServer(dbms_handler, data, auth, system);
|
||||
};
|
||||
|
||||
// Replication recovery and frequent check start
|
||||
auto main = [system, &dbms_handler, &auth](memgraph::replication::RoleMainData &mainData) {
|
||||
auto main = [&system, &dbms_handler, &auth](memgraph::replication::RoleMainData &mainData) {
|
||||
for (auto &client : mainData.registered_replicas_) {
|
||||
if (client.try_set_uuid &&
|
||||
replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, mainData.uuid_)) {
|
||||
client.try_set_uuid = false;
|
||||
}
|
||||
SystemRestore(client, dbms_handler, mainData.uuid_, system, auth);
|
||||
SystemRestore(client, system, dbms_handler, mainData.uuid_, auth);
|
||||
}
|
||||
// DBMS here
|
||||
dbms_handler.ForEach([&mainData](memgraph::dbms::DatabaseAccess db_acc) {
|
||||
@ -43,7 +43,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
|
||||
});
|
||||
|
||||
for (auto &client : mainData.registered_replicas_) {
|
||||
StartReplicaClient(client, dbms_handler, mainData.uuid_, system, auth);
|
||||
StartReplicaClient(client, system, dbms_handler, mainData.uuid_, auth);
|
||||
}
|
||||
|
||||
// Warning
|
||||
@ -120,8 +120,8 @@ inline std::optional<query::RegisterReplicaError> HandleRegisterReplicaStatus(
|
||||
}
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandler &dbms_handler, utils::UUID main_uuid,
|
||||
system::System *system, auth::SynchedAuth &auth) {
|
||||
void StartReplicaClient(replication::ReplicationClient &client, system::System &system, dbms::DbmsHandler &dbms_handler,
|
||||
utils::UUID main_uuid, auth::SynchedAuth &auth) {
|
||||
#else
|
||||
void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandler &dbms_handler,
|
||||
utils::UUID main_uuid) {
|
||||
@ -129,12 +129,8 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
|
||||
// No client error, start instance level client
|
||||
auto const &endpoint = client.rpc_client_.Endpoint();
|
||||
spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port);
|
||||
client.StartFrequentCheck([&,
|
||||
#ifdef MG_ENTERPRISE
|
||||
system = system,
|
||||
#endif
|
||||
license = license::global_license_checker.IsEnterpriseValidFast(),
|
||||
main_uuid](bool reconnect, replication::ReplicationClient &client) mutable {
|
||||
client.StartFrequentCheck([&, license = license::global_license_checker.IsEnterpriseValidFast(), main_uuid](
|
||||
bool reconnect, replication::ReplicationClient &client) mutable {
|
||||
if (client.try_set_uuid &&
|
||||
memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, main_uuid)) {
|
||||
client.try_set_uuid = false;
|
||||
@ -151,7 +147,7 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
|
||||
client.state_.WithLock([](auto &state) { state = memgraph::replication::ReplicationClient::State::BEHIND; });
|
||||
}
|
||||
#ifdef MG_ENTERPRISE
|
||||
SystemRestore<true>(client, dbms_handler, main_uuid, system, auth);
|
||||
SystemRestore<true>(client, system, dbms_handler, main_uuid, auth);
|
||||
#endif
|
||||
// Check if any database has been left behind
|
||||
dbms_handler.ForEach([&name = client.name_, reconnect](dbms::DatabaseAccess db_acc) {
|
||||
@ -168,7 +164,7 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
ReplicationHandler::ReplicationHandler(memgraph::replication::ReplicationState &repl_state,
|
||||
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::system::System *system,
|
||||
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::system::System &system,
|
||||
memgraph::auth::SynchedAuth &auth)
|
||||
: repl_state_{repl_state}, dbms_handler_{dbms_handler}, system_{system}, auth_{auth} {
|
||||
RecoverReplication(repl_state_, system_, dbms_handler_, auth_);
|
||||
@ -216,18 +212,19 @@ bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::
|
||||
repl_state_.SetReplicationRoleReplica(config, main_uuid);
|
||||
|
||||
// Start
|
||||
const auto success = std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) {
|
||||
// ASSERT
|
||||
return false;
|
||||
},
|
||||
[this](memgraph::replication::RoleReplicaData &data) {
|
||||
const auto success =
|
||||
std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) {
|
||||
// ASSERT
|
||||
return false;
|
||||
},
|
||||
[this](memgraph::replication::RoleReplicaData &data) {
|
||||
#ifdef MG_ENTERPRISE
|
||||
return StartRpcServer(dbms_handler_, data, auth_);
|
||||
return StartRpcServer(dbms_handler_, data, auth_, system_);
|
||||
#else
|
||||
return StartRpcServer(dbms_handler_, data);
|
||||
return StartRpcServer(dbms_handler_, data);
|
||||
#endif
|
||||
}},
|
||||
repl_state_.ReplicationData());
|
||||
}},
|
||||
repl_state_.ReplicationData());
|
||||
// TODO Handle error (restore to main?)
|
||||
return success;
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
#include "auth/replication_handlers.hpp"
|
||||
#include "dbms/replication_handlers.hpp"
|
||||
#include "flags/experimental.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "replication_handler/system_rpc.hpp"
|
||||
|
||||
@ -56,10 +57,24 @@ void SystemRecoveryHandler(memgraph::system::ReplicaHandlerAccessToState &system
|
||||
memgraph::replication::SystemRecoveryReq req;
|
||||
memgraph::slk::Load(&req, req_reader);
|
||||
|
||||
using enum memgraph::flags::Experiments;
|
||||
auto experimental_system_replication = flags::AreExperimentsEnabled(SYSTEM_REPLICATION);
|
||||
|
||||
// validate
|
||||
if (!current_main_uuid.has_value() || req.main_uuid != current_main_uuid) [[unlikely]] {
|
||||
LogWrongMain(current_main_uuid, req.main_uuid, SystemRecoveryReq::kType.name);
|
||||
return;
|
||||
}
|
||||
if (!experimental_system_replication) {
|
||||
if (req.database_configs.size() != 1 && req.database_configs[0].name != dbms::kDefaultDB) {
|
||||
// a partial system recovery should be only be updating the default database uuid
|
||||
return; // Failure sent on exit
|
||||
}
|
||||
if (!req.users.empty() || !req.roles.empty()) {
|
||||
// a partial system recovery should not be updating any users or roles
|
||||
return; // Failure sent on exit
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* DBMS
|
||||
@ -69,7 +84,9 @@ void SystemRecoveryHandler(memgraph::system::ReplicaHandlerAccessToState &system
|
||||
/*
|
||||
* AUTH
|
||||
*/
|
||||
if (!auth::SystemRecoveryHandler(auth, req.auth_config, req.users, req.roles)) return; // Failure sent on exit
|
||||
if (experimental_system_replication) {
|
||||
if (!auth::SystemRecoveryHandler(auth, req.auth_config, req.users, req.roles)) return; // Failure sent on exit
|
||||
}
|
||||
|
||||
/*
|
||||
* SUCCESSFUL RECOVERY
|
||||
@ -79,35 +96,44 @@ void SystemRecoveryHandler(memgraph::system::ReplicaHandlerAccessToState &system
|
||||
res = SystemRecoveryRes(SystemRecoveryRes::Result::SUCCESS);
|
||||
}
|
||||
|
||||
void Register(replication::RoleReplicaData const &data, dbms::DbmsHandler &dbms_handler, auth::SynchedAuth &auth) {
|
||||
void Register(replication::RoleReplicaData const &data, system::System &system, dbms::DbmsHandler &dbms_handler,
|
||||
auth::SynchedAuth &auth) {
|
||||
// NOTE: Register even without license as the user could add a license at run-time
|
||||
// TODO: fix Register when system is removed from DbmsHandler
|
||||
|
||||
auto system_state_access = dbms_handler.system_->CreateSystemStateAccess();
|
||||
auto system_state_access = system.CreateSystemStateAccess();
|
||||
|
||||
using enum memgraph::flags::Experiments;
|
||||
auto experimental_system_replication = flags::AreExperimentsEnabled(SYSTEM_REPLICATION);
|
||||
|
||||
// System
|
||||
// TODO: remove, as this is not used
|
||||
data.server->rpc_server_.Register<replication::SystemHeartbeatRpc>(
|
||||
[&data, system_state_access](auto *req_reader, auto *res_builder) {
|
||||
spdlog::debug("Received SystemHeartbeatRpc");
|
||||
SystemHeartbeatHandler(system_state_access.LastCommitedTS(), data.uuid_, req_reader, res_builder);
|
||||
});
|
||||
if (experimental_system_replication) {
|
||||
data.server->rpc_server_.Register<replication::SystemHeartbeatRpc>(
|
||||
[&data, system_state_access](auto *req_reader, auto *res_builder) {
|
||||
spdlog::debug("Received SystemHeartbeatRpc");
|
||||
SystemHeartbeatHandler(system_state_access.LastCommitedTS(), data.uuid_, req_reader, res_builder);
|
||||
});
|
||||
}
|
||||
// Needed even with experimental_system_replication=false becasue
|
||||
// need to tell REPLICA the uuid to use for "memgraph" default database
|
||||
data.server->rpc_server_.Register<replication::SystemRecoveryRpc>(
|
||||
[&data, system_state_access, &dbms_handler, &auth](auto *req_reader, auto *res_builder) mutable {
|
||||
spdlog::debug("Received SystemRecoveryRpc");
|
||||
SystemRecoveryHandler(system_state_access, data.uuid_, dbms_handler, auth, req_reader, res_builder);
|
||||
});
|
||||
|
||||
// DBMS
|
||||
dbms::Register(data, system_state_access, dbms_handler);
|
||||
if (experimental_system_replication) {
|
||||
// DBMS
|
||||
dbms::Register(data, system_state_access, dbms_handler);
|
||||
|
||||
// Auth
|
||||
auth::Register(data, system_state_access, auth);
|
||||
// Auth
|
||||
auth::Register(data, system_state_access, auth);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaData &data, auth::SynchedAuth &auth) {
|
||||
bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaData &data, auth::SynchedAuth &auth,
|
||||
system::System &system) {
|
||||
#else
|
||||
bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaData &data) {
|
||||
#endif
|
||||
@ -115,7 +141,7 @@ bool StartRpcServer(dbms::DbmsHandler &dbms_handler, replication::RoleReplicaDat
|
||||
dbms::InMemoryReplicationHandlers::Register(&dbms_handler, data);
|
||||
#ifdef MG_ENTERPRISE
|
||||
// Register system handlers
|
||||
Register(data, dbms_handler, auth);
|
||||
Register(data, system, dbms_handler, auth);
|
||||
#endif
|
||||
// Start server
|
||||
if (!data.server->Start()) {
|
||||
|
@ -26,12 +26,14 @@ struct ISystemAction {
|
||||
/// Durability step which is defered until commit time
|
||||
virtual void DoDurability() = 0;
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
/// Prepare the RPC payload that will be sent to all replicas clients
|
||||
virtual bool DoReplication(memgraph::replication::ReplicationClient &client, const utils::UUID &main_uuid,
|
||||
memgraph::replication::ReplicationEpoch const &epoch,
|
||||
Transaction const &system_tx) const = 0;
|
||||
|
||||
virtual void PostReplication(memgraph::replication::RoleMainData &main_data) const = 0;
|
||||
#endif
|
||||
|
||||
virtual ~ISystemAction() = default;
|
||||
};
|
||||
|
@ -57,11 +57,13 @@ struct Transaction {
|
||||
/// durability
|
||||
action->DoDurability();
|
||||
|
||||
/// replication prep
|
||||
#ifdef MG_ENTERPRISE
|
||||
/// replication
|
||||
auto action_sync_status = handler.ApplyAction(*action, *this);
|
||||
if (action_sync_status != AllSyncReplicaStatus::AllCommitsConfirmed) {
|
||||
sync_status = AllSyncReplicaStatus::SomeCommitsUnconfirmed;
|
||||
}
|
||||
#endif
|
||||
|
||||
actions_.pop_front();
|
||||
}
|
||||
@ -93,6 +95,7 @@ struct Transaction {
|
||||
std::list<std::unique_ptr<ISystemAction>> actions_;
|
||||
};
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
struct DoReplication {
|
||||
explicit DoReplication(replication::RoleMainData &main_data) : main_data_{main_data} {}
|
||||
auto ApplyAction(ISystemAction const &action, Transaction const &system_tx) -> AllSyncReplicaStatus {
|
||||
@ -113,6 +116,7 @@ struct DoReplication {
|
||||
replication::RoleMainData &main_data_;
|
||||
};
|
||||
static_assert(ReplicationPolicy<DoReplication>);
|
||||
#endif
|
||||
|
||||
struct DoNothing {
|
||||
auto ApplyAction(ISystemAction const & /*action*/, Transaction const & /*system_tx*/) -> AllSyncReplicaStatus {
|
||||
|
@ -81,10 +81,7 @@ if (MG_EXPERIMENTAL_HIGH_AVAILABILITY)
|
||||
add_subdirectory(high_availability_experimental)
|
||||
endif ()
|
||||
|
||||
|
||||
if (MG_EXPERIMENTAL_REPLICATION_MULTITENANCY)
|
||||
add_subdirectory(replication_experimental)
|
||||
endif ()
|
||||
add_subdirectory(replication_experimental)
|
||||
|
||||
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
|
||||
copy_e2e_python_files(x x.sh "")
|
||||
|
@ -222,4 +222,9 @@ startup_config_dict = {
|
||||
"128",
|
||||
"The threshold for when to cache long delta chains. This is used for heavy read + write workloads where repeated processing of delta chains can become costly.",
|
||||
),
|
||||
"experimental_enabled": (
|
||||
"",
|
||||
"",
|
||||
"Experimental features to be used, comma seperated. Options [system-replication]",
|
||||
),
|
||||
}
|
||||
|
@ -147,6 +147,7 @@ def test_auth_queries_on_replica(connection):
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -160,6 +161,7 @@ def test_auth_queries_on_replica(connection):
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -173,6 +175,7 @@ def test_auth_queries_on_replica(connection):
|
||||
},
|
||||
"main": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
@ -213,6 +216,7 @@ def test_manual_users_recovery(connection):
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -227,6 +231,7 @@ def test_manual_users_recovery(connection):
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -241,6 +246,7 @@ def test_manual_users_recovery(connection):
|
||||
},
|
||||
"main": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
@ -287,6 +293,7 @@ def test_env_users_recovery(connection):
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -300,6 +307,7 @@ def test_env_users_recovery(connection):
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -315,6 +323,7 @@ def test_env_users_recovery(connection):
|
||||
"username": "user1",
|
||||
"password": "password",
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
@ -368,6 +377,7 @@ def test_manual_roles_recovery(connection):
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -382,6 +392,7 @@ def test_manual_roles_recovery(connection):
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -398,6 +409,7 @@ def test_manual_roles_recovery(connection):
|
||||
},
|
||||
"main": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
@ -455,6 +467,7 @@ def test_auth_config_recovery(connection):
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -475,6 +488,7 @@ def test_auth_config_recovery(connection):
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -497,6 +511,7 @@ def test_auth_config_recovery(connection):
|
||||
},
|
||||
"main": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
@ -550,6 +565,7 @@ def test_auth_replication(connection):
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -563,6 +579,7 @@ def test_auth_replication(connection):
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -576,6 +593,7 @@ def test_auth_replication(connection):
|
||||
},
|
||||
"main": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
|
@ -39,6 +39,7 @@ def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[st
|
||||
return {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level",
|
||||
@ -53,6 +54,7 @@ def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[st
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -65,7 +67,12 @@ def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[st
|
||||
"data_directory": f"{data_directory}/replica_2",
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [],
|
||||
},
|
||||
@ -98,6 +105,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY = {
|
||||
"replica_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
@ -109,6 +117,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY = {
|
||||
},
|
||||
"replica_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
@ -120,6 +129,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION_WITH_RECOVERY = {
|
||||
},
|
||||
"main": {
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
@ -206,7 +216,12 @@ def test_manual_databases_create_multitenancy_replication(connection):
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -215,7 +230,12 @@ def test_manual_databases_create_multitenancy_replication(connection):
|
||||
],
|
||||
},
|
||||
"replica_2": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica2.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -224,7 +244,12 @@ def test_manual_databases_create_multitenancy_replication(connection):
|
||||
],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -281,7 +306,12 @@ def test_manual_databases_create_multitenancy_replication_branching(connection):
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -294,7 +324,12 @@ def test_manual_databases_create_multitenancy_replication_branching(connection):
|
||||
],
|
||||
},
|
||||
"replica_2": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica2.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -307,7 +342,12 @@ def test_manual_databases_create_multitenancy_replication_branching(connection):
|
||||
],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -354,7 +394,12 @@ def test_manual_databases_create_multitenancy_replication_dirty_replica(connecti
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -364,7 +409,12 @@ def test_manual_databases_create_multitenancy_replication_dirty_replica(connecti
|
||||
],
|
||||
},
|
||||
"replica_2": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica2.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -374,7 +424,12 @@ def test_manual_databases_create_multitenancy_replication_dirty_replica(connecti
|
||||
],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -416,7 +471,12 @@ def test_manual_databases_create_multitenancy_replication_main_behind(connection
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -426,7 +486,12 @@ def test_manual_databases_create_multitenancy_replication_main_behind(connection
|
||||
],
|
||||
},
|
||||
"replica_2": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_2']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_2']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica2.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -436,7 +501,12 @@ def test_manual_databases_create_multitenancy_replication_main_behind(connection
|
||||
],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
f"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:{REPLICATION_PORTS['replica_1']}';",
|
||||
@ -532,14 +602,24 @@ def test_automatic_databases_multitenancy_replication_predefined(connection):
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": [
|
||||
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||||
],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
@ -585,14 +665,24 @@ def test_automatic_databases_create_multitenancy_replication_dirty_main(connecti
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION_MANUAL = {
|
||||
"replica_1": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['replica_1']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['replica_1']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "replica1.log",
|
||||
"setup_queries": [
|
||||
f"SET REPLICATION ROLE TO REPLICA WITH PORT {REPLICATION_PORTS['replica_1']};",
|
||||
],
|
||||
},
|
||||
"main": {
|
||||
"args": ["--bolt-port", f"{BOLT_PORTS['main']}", "--log-level=TRACE"],
|
||||
"args": [
|
||||
"--experimental-enabled=system-replication",
|
||||
"--bolt-port",
|
||||
f"{BOLT_PORTS['main']}",
|
||||
"--log-level=TRACE",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [
|
||||
"CREATE DATABASE A;",
|
||||
|
@ -44,7 +44,7 @@ int main(int argc, char **argv) {
|
||||
memgraph::replication::ReplicationState repl_state(ReplicationStateRootPath(db_config));
|
||||
|
||||
memgraph::system::System system_state;
|
||||
memgraph::dbms::DbmsHandler dbms_handler(db_config, system_state, repl_state
|
||||
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
auth_, false
|
||||
|
@ -71,8 +71,7 @@ class TestEnvironment : public ::testing::Environment {
|
||||
memgraph::auth::Auth::Config{/* default */});
|
||||
system_state = std::make_unique<memgraph::system::System>();
|
||||
repl_state = std::make_unique<memgraph::replication::ReplicationState>(ReplicationStateRootPath(storage_conf));
|
||||
ptr_ = std::make_unique<memgraph::dbms::DbmsHandler>(storage_conf, *system_state.get(), *repl_state.get(),
|
||||
*auth.get(), false);
|
||||
ptr_ = std::make_unique<memgraph::dbms::DbmsHandler>(storage_conf, *repl_state.get(), *auth.get(), false);
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
|
@ -55,7 +55,7 @@ class TestEnvironment : public ::testing::Environment {
|
||||
memgraph::auth::Auth::Config{/* default */});
|
||||
system_state = std::make_unique<memgraph::system::System>();
|
||||
repl_state = std::make_unique<memgraph::replication::ReplicationState>(ReplicationStateRootPath(storage_conf));
|
||||
ptr_ = std::make_unique<memgraph::dbms::DbmsHandler>(storage_conf, *system_state.get(), *repl_state.get());
|
||||
ptr_ = std::make_unique<memgraph::dbms::DbmsHandler>(storage_conf, *repl_state.get());
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
|
@ -101,7 +101,7 @@ class MultiTenantTest : public ::testing::Test {
|
||||
explicit MinMemgraph(const memgraph::storage::Config &conf)
|
||||
: auth{conf.durability.storage_directory / "auth", memgraph::auth::Auth::Config{/* default */}},
|
||||
repl_state{ReplicationStateRootPath(conf)},
|
||||
dbms{conf, system, repl_state, auth, true},
|
||||
dbms{conf, repl_state, auth, true},
|
||||
interpreter_context{{},
|
||||
&dbms,
|
||||
&repl_state,
|
||||
|
@ -113,7 +113,7 @@ struct MinMemgraph {
|
||||
MinMemgraph(const memgraph::storage::Config &conf)
|
||||
: auth{conf.durability.storage_directory / "auth", memgraph::auth::Auth::Config{/* default */}},
|
||||
repl_state{ReplicationStateRootPath(conf)},
|
||||
dbms{conf, system_, repl_state
|
||||
dbms{conf, repl_state
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
auth, true
|
||||
@ -124,7 +124,7 @@ struct MinMemgraph {
|
||||
repl_handler(repl_state, dbms
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
&system_, auth
|
||||
system_, auth
|
||||
#endif
|
||||
) {
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user