Merge branch 'master' into run-ci-in-mgbuilders

This commit is contained in:
Marko Barišić 2024-02-26 22:58:38 +01:00 committed by GitHub
commit f1dc2fc9aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1848 additions and 728 deletions

View File

@ -56,6 +56,15 @@ jobs:
- name: "Build package"
run: |
./release/package/run.sh package debian-10 $BUILD_TYPE
- name: Upload to S3
uses: jakejarvis/s3-sync-action@v0.5.1
env:
AWS_S3_BUCKET: "deps.memgraph.io"
AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: "eu-west-1"
SOURCE_DIR: "build/output"
DEST_DIR: "memgraph-unofficial/${{ github.ref_name }}/"
- name: "Upload package"
uses: actions/upload-artifact@v4
with:
@ -75,6 +84,15 @@ jobs:
- name: "Build package"
run: |
./release/package/run.sh package ubuntu-22.04 $BUILD_TYPE
- name: Upload to S3
uses: jakejarvis/s3-sync-action@v0.5.1
env:
AWS_S3_BUCKET: "deps.memgraph.io"
AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: "eu-west-1"
SOURCE_DIR: "build/output"
DEST_DIR: "memgraph-unofficial/${{ github.ref_name }}/"
- name: "Upload package"
uses: actions/upload-artifact@v4
with:
@ -86,7 +104,7 @@ jobs:
needs: [Ubuntu20_04]
runs-on: [self-hosted, DockerMgBuild, ARM64]
# M1 Mac mini is sometimes slower
timeout-minutes: 90
timeout-minutes: 150
steps:
- name: "Set up repository"
uses: actions/checkout@v4
@ -95,6 +113,15 @@ jobs:
- name: "Build package"
run: |
./release/package/run.sh package ubuntu-22.04-arm $BUILD_TYPE
- name: Upload to S3
uses: jakejarvis/s3-sync-action@v0.5.1
env:
AWS_S3_BUCKET: "deps.memgraph.io"
AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: "eu-west-1"
SOURCE_DIR: "build/output"
DEST_DIR: "memgraph-unofficial/${{ github.ref_name }}/"
- name: "Upload package"
uses: actions/upload-artifact@v4
with:
@ -114,6 +141,15 @@ jobs:
- name: "Build package"
run: |
./release/package/run.sh package debian-11 $BUILD_TYPE
- name: Upload to S3
uses: jakejarvis/s3-sync-action@v0.5.1
env:
AWS_S3_BUCKET: "deps.memgraph.io"
AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: "eu-west-1"
SOURCE_DIR: "build/output"
DEST_DIR: "memgraph-unofficial/${{ github.ref_name }}/"
- name: "Upload package"
uses: actions/upload-artifact@v4
with:
@ -125,7 +161,7 @@ jobs:
needs: [Debian10, Ubuntu20_04]
runs-on: [self-hosted, DockerMgBuild, ARM64]
# M1 Mac mini is sometimes slower
timeout-minutes: 90
timeout-minutes: 150
steps:
- name: "Set up repository"
uses: actions/checkout@v4
@ -134,23 +170,6 @@ jobs:
- name: "Build package"
run: |
./release/package/run.sh package debian-11-arm $BUILD_TYPE
- name: "Upload package"
uses: actions/upload-artifact@v4
with:
name: debian-11-aarch64
path: build/output/debian-11-arm/memgraph*.deb
PushToS3:
if: github.ref_type == 'tag'
needs: [PackageDebian10, PackageDebian11, PackageDebian11_ARM, PackageUbuntu20_04, PackageUbuntu20_04_ARM]
runs-on: ubuntu-latest
steps:
- name: Download artifacts
uses: actions/download-artifact@v4
with:
# name: # if name input parameter is not provided, all artifacts are downloaded
# and put in directories named after each one.
path: build/output/release
- name: Upload to S3
uses: jakejarvis/s3-sync-action@v0.5.1
env:
@ -158,5 +177,10 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: "eu-west-1"
SOURCE_DIR: "build/output/release"
SOURCE_DIR: "build/output"
DEST_DIR: "memgraph-unofficial/${{ github.ref_name }}/"
- name: "Upload package"
uses: actions/upload-artifact@v4
with:
name: debian-11-aarch64
path: build/output/debian-11-arm/memgraph*.deb

View File

@ -110,9 +110,9 @@ class Database {
* @param force_directory Use the configured directory, do not try to decipher the multi-db version
* @return DatabaseInfo
*/
DatabaseInfo GetInfo(bool force_directory, replication_coordination_glue::ReplicationRole replication_role) const {
DatabaseInfo GetInfo(replication_coordination_glue::ReplicationRole replication_role) const {
DatabaseInfo info;
info.storage_info = storage_->GetInfo(force_directory, replication_role);
info.storage_info = storage_->GetInfo(replication_role);
info.triggers = trigger_store_.GetTriggerInfo().size();
info.streams = streams_.GetStreamInfo().size();
return info;

View File

@ -302,7 +302,7 @@ class DbmsHandler {
auto db_acc_opt = db_gk.access();
if (db_acc_opt) {
auto &db_acc = *db_acc_opt;
const auto &info = db_acc->GetInfo(false, replication_role);
const auto &info = db_acc->GetInfo(replication_role);
const auto &storage_info = info.storage_info;
stats.num_vertex += storage_info.vertex_count;
stats.num_edges += storage_info.edge_count;
@ -338,7 +338,7 @@ class DbmsHandler {
auto db_acc_opt = db_gk.access();
if (db_acc_opt) {
auto &db_acc = *db_acc_opt;
res.push_back(db_acc->GetInfo(false, replication_role));
res.push_back(db_acc->GetInfo(replication_role));
}
}
return res;

View File

@ -3035,8 +3035,6 @@ class ReplicationQuery : public memgraph::query::Query {
enum class SyncMode { SYNC, ASYNC };
enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, DIVERGED_FROM_MAIN };
ReplicationQuery() = default;
DEFVISITABLE(QueryVisitor<void>);

View File

@ -297,16 +297,6 @@ inline auto convertToReplicationMode(const ReplicationQuery::SyncMode &sync_mode
class ReplQueryHandler {
public:
struct ReplicaInfo {
std::string name;
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
ReplicationQuery::ReplicaState state;
};
explicit ReplQueryHandler(query::ReplicationQueryHandler &replication_query_handler)
: handler_{&replication_query_handler} {}
@ -397,58 +387,16 @@ class ReplQueryHandler {
}
}
std::vector<ReplicaInfo> ShowReplicas(const dbms::Database &db) const {
if (handler_->IsReplica()) {
// replica can't show registered replicas (it shouldn't have any)
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
std::vector<ReplicasInfo> ShowReplicas() const {
auto info = handler_->ShowReplicas();
if (info.HasError()) {
switch (info.GetError()) {
case ShowReplicaError::NOT_MAIN:
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
}
}
// TODO: Combine results? Have a single place with clients???
// Also authentication checks (replica + database visibility)
const auto repl_infos = db.storage()->ReplicasInfo();
std::vector<ReplicaInfo> replicas;
replicas.reserve(repl_infos.size());
const auto from_info = [](const auto &repl_info) -> ReplicaInfo {
ReplicaInfo replica;
replica.name = repl_info.name;
replica.socket_address = repl_info.endpoint.SocketAddress();
switch (repl_info.mode) {
case replication_coordination_glue::ReplicationMode::SYNC:
replica.sync_mode = ReplicationQuery::SyncMode::SYNC;
break;
case replication_coordination_glue::ReplicationMode::ASYNC:
replica.sync_mode = ReplicationQuery::SyncMode::ASYNC;
break;
}
replica.current_timestamp_of_replica = repl_info.timestamp_info.current_timestamp_of_replica;
replica.current_number_of_timestamp_behind_master =
repl_info.timestamp_info.current_number_of_timestamp_behind_master;
switch (repl_info.state) {
case storage::replication::ReplicaState::READY:
replica.state = ReplicationQuery::ReplicaState::READY;
break;
case storage::replication::ReplicaState::REPLICATING:
replica.state = ReplicationQuery::ReplicaState::REPLICATING;
break;
case storage::replication::ReplicaState::RECOVERY:
replica.state = ReplicationQuery::ReplicaState::RECOVERY;
break;
case storage::replication::ReplicaState::MAYBE_BEHIND:
replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND;
break;
case storage::replication::ReplicaState::DIVERGED_FROM_MAIN:
replica.state = ReplicationQuery::ReplicaState::DIVERGED_FROM_MAIN;
break;
}
return replica;
};
std::transform(repl_infos.begin(), repl_infos.end(), std::back_inserter(replicas), from_info);
return replicas;
return info.GetValue().entries_;
}
private:
@ -1092,50 +1040,98 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
#endif
callback.header = {
"name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master",
"state"};
bool full_info = false;
#ifdef MG_ENTERPRISE
full_info = license::global_license_checker.IsEnterpriseValidFast();
#endif
callback.header = {"name", "socket_address", "sync_mode", "system_info", "data_info"};
callback.fn = [handler = ReplQueryHandler{replication_query_handler}, replica_nfields = callback.header.size(),
db_acc = current_db.db_acc_] {
const auto &replicas = handler.ShowReplicas(*db_acc->get());
full_info] {
auto const sync_mode_to_tv = [](memgraph::replication_coordination_glue::ReplicationMode sync_mode) {
using namespace std::string_view_literals;
switch (sync_mode) {
using enum memgraph::replication_coordination_glue::ReplicationMode;
case SYNC:
return TypedValue{"sync"sv};
case ASYNC:
return TypedValue{"async"sv};
}
};
auto const replica_sys_state_to_tv = [](memgraph::replication::ReplicationClient::State state) {
using namespace std::string_view_literals;
switch (state) {
using enum memgraph::replication::ReplicationClient::State;
case BEHIND:
return TypedValue{"invalid"sv};
case READY:
return TypedValue{"ready"sv};
case RECOVERY:
return TypedValue{"recovery"sv};
}
};
auto const sys_info_to_tv = [&](ReplicaSystemInfoState orig) {
auto info = std::map<std::string, TypedValue>{};
info.emplace("ts", TypedValue{static_cast<int64_t>(orig.ts_)});
// TODO: behind not implemented
info.emplace("behind", TypedValue{/* static_cast<int64_t>(orig.behind_) */});
info.emplace("status", replica_sys_state_to_tv(orig.state_));
return TypedValue{std::move(info)};
};
auto const replica_state_to_tv = [](memgraph::storage::replication::ReplicaState state) {
using namespace std::string_view_literals;
switch (state) {
using enum memgraph::storage::replication::ReplicaState;
case READY:
return TypedValue{"ready"sv};
case REPLICATING:
return TypedValue{"replicating"sv};
case RECOVERY:
return TypedValue{"recovery"sv};
case MAYBE_BEHIND:
return TypedValue{"invalid"sv};
case DIVERGED_FROM_MAIN:
return TypedValue{"diverged"sv};
}
};
auto const info_to_tv = [&](ReplicaInfoState orig) {
auto info = std::map<std::string, TypedValue>{};
info.emplace("ts", TypedValue{static_cast<int64_t>(orig.ts_)});
info.emplace("behind", TypedValue{static_cast<int64_t>(orig.behind_)});
info.emplace("status", replica_state_to_tv(orig.state_));
return TypedValue{std::move(info)};
};
auto const data_info_to_tv = [&](std::map<std::string, ReplicaInfoState> orig) {
auto data_info = std::map<std::string, TypedValue>{};
for (auto &[name, info] : orig) {
data_info.emplace(name, info_to_tv(info));
}
return TypedValue{std::move(data_info)};
};
auto replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
typed_replicas.reserve(replicas.size());
for (const auto &replica : replicas) {
for (auto &replica : replicas) {
std::vector<TypedValue> typed_replica;
typed_replica.reserve(replica_nfields);
typed_replica.emplace_back(replica.name);
typed_replica.emplace_back(replica.socket_address);
switch (replica.sync_mode) {
case ReplicationQuery::SyncMode::SYNC:
typed_replica.emplace_back("sync");
break;
case ReplicationQuery::SyncMode::ASYNC:
typed_replica.emplace_back("async");
break;
}
typed_replica.emplace_back(static_cast<int64_t>(replica.current_timestamp_of_replica));
typed_replica.emplace_back(static_cast<int64_t>(replica.current_number_of_timestamp_behind_master));
switch (replica.state) {
case ReplicationQuery::ReplicaState::READY:
typed_replica.emplace_back("ready");
break;
case ReplicationQuery::ReplicaState::REPLICATING:
typed_replica.emplace_back("replicating");
break;
case ReplicationQuery::ReplicaState::RECOVERY:
typed_replica.emplace_back("recovery");
break;
case ReplicationQuery::ReplicaState::MAYBE_BEHIND:
typed_replica.emplace_back("invalid");
break;
case ReplicationQuery::ReplicaState::DIVERGED_FROM_MAIN:
typed_replica.emplace_back("diverged");
break;
typed_replica.emplace_back(replica.name_);
typed_replica.emplace_back(replica.socket_address_);
typed_replica.emplace_back(sync_mode_to_tv(replica.sync_mode_));
if (full_info) {
typed_replica.emplace_back(sys_info_to_tv(replica.system_info_));
} else {
// Set to NULL
typed_replica.emplace_back(TypedValue{});
}
typed_replica.emplace_back(data_info_to_tv(replica.data_info_));
typed_replicas.emplace_back(std::move(typed_replica));
}

View File

@ -84,16 +84,6 @@ class CoordinatorQueryHandler {
CoordinatorQueryHandler(CoordinatorQueryHandler &&) = default;
CoordinatorQueryHandler &operator=(CoordinatorQueryHandler &&) = default;
struct Replica {
std::string name;
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
ReplicationQuery::ReplicaState state;
};
struct MainReplicaStatus {
std::string_view name;
std::string_view socket_address;

View File

@ -11,6 +11,8 @@
#pragma once
#include "replication/replication_client.hpp"
#include "replication_coordination_glue/mode.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/result.hpp"
#include "utils/uuid.hpp"
@ -31,6 +33,7 @@ enum class RegisterReplicaError : uint8_t {
COULD_NOT_BE_PERSISTED,
ERROR_ACCEPTING_MAIN
};
enum class UnregisterReplicaResult : uint8_t {
NOT_MAIN,
COULD_NOT_BE_PERSISTED,
@ -38,6 +41,47 @@ enum class UnregisterReplicaResult : uint8_t {
SUCCESS,
};
enum class ShowReplicaError : uint8_t {
NOT_MAIN,
};
struct ReplicaSystemInfoState {
uint64_t ts_;
uint64_t behind_;
replication::ReplicationClient::State state_;
};
struct ReplicaInfoState {
ReplicaInfoState(uint64_t ts, uint64_t behind, storage::replication::ReplicaState state)
: ts_(ts), behind_(behind), state_(state) {}
uint64_t ts_;
uint64_t behind_;
storage::replication::ReplicaState state_;
};
struct ReplicasInfo {
ReplicasInfo(std::string name, std::string socket_address, replication_coordination_glue::ReplicationMode sync_mode,
ReplicaSystemInfoState system_info, std::map<std::string, ReplicaInfoState> data_info)
: name_(std::move(name)),
socket_address_(std::move(socket_address)),
sync_mode_(sync_mode),
system_info_(std::move(system_info)),
data_info_(std::move(data_info)) {}
std::string name_;
std::string socket_address_;
memgraph::replication_coordination_glue::ReplicationMode sync_mode_;
ReplicaSystemInfoState system_info_;
std::map<std::string, ReplicaInfoState> data_info_;
};
struct ReplicasInfos {
explicit ReplicasInfos(std::vector<ReplicasInfo> entries) : entries_(std::move(entries)) {}
std::vector<ReplicasInfo> entries_;
};
/// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage
struct ReplicationQueryHandler {
virtual ~ReplicationQueryHandler() = default;
@ -66,6 +110,8 @@ struct ReplicationQueryHandler {
virtual auto GetRole() const -> memgraph::replication_coordination_glue::ReplicationRole = 0;
virtual bool IsMain() const = 0;
virtual bool IsReplica() const = 0;
virtual auto ShowReplicas() const -> utils::BasicResult<ShowReplicaError, ReplicasInfos> = 0;
};
} // namespace memgraph::query

View File

@ -14,7 +14,9 @@
#include "replication/config.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "rpc/client.hpp"
#include "utils/rw_lock.hpp"
#include "utils/scheduler.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"
#include "utils/thread_pool.hpp"
@ -114,8 +116,9 @@ struct ReplicationClient {
enum class State {
BEHIND,
READY,
RECOVERY,
};
utils::Synchronized<State> state_{State::BEHIND};
utils::Synchronized<State, utils::WritePrioritizedRWLock> state_{State::BEHIND};
replication_coordination_glue::ReplicationMode mode_{replication_coordination_glue::ReplicationMode::SYNC};
// This thread pool is used for background tasks so we don't

View File

@ -39,10 +39,12 @@ void SystemRestore(replication::ReplicationClient &client, system::System &syste
const utils::UUID &main_uuid, auth::SynchedAuth &auth) {
// Check if system is up to date
if (client.state_.WithLock(
[](auto &state) { return state == memgraph::replication::ReplicationClient::State::READY; }))
[](auto &state) { return state != memgraph::replication::ReplicationClient::State::BEHIND; }))
return;
// Try to recover...
client.state_.WithLock(
[](auto &state) { return state != memgraph::replication::ReplicationClient::State::RECOVERY; });
{
using enum memgraph::flags::Experiments;
bool full_system_replication =
@ -139,6 +141,9 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
bool IsMain() const override;
bool IsReplica() const override;
auto ShowReplicas() const
-> utils::BasicResult<memgraph::query::ShowReplicaError, memgraph::query::ReplicasInfos> override;
auto GetReplState() const -> const memgraph::replication::ReplicationState &;
auto GetReplState() -> memgraph::replication::ReplicationState &;

View File

@ -10,26 +10,28 @@
// licenses/APL.txt.
#include "replication_handler/replication_handler.hpp"
#include "dbms/constants.hpp"
#include "dbms/dbms_handler.hpp"
#include "replication/replication_client.hpp"
#include "replication_handler/system_replication.hpp"
namespace memgraph::replication {
namespace {
#ifdef MG_ENTERPRISE
void RecoverReplication(memgraph::replication::ReplicationState &repl_state, memgraph::system::System &system,
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::auth::SynchedAuth &auth) {
void RecoverReplication(replication::ReplicationState &repl_state, system::System &system,
dbms::DbmsHandler &dbms_handler, auth::SynchedAuth &auth) {
/*
* REPLICATION RECOVERY AND STARTUP
*/
// Startup replication state (if recovered at startup)
auto replica = [&dbms_handler, &auth, &system](memgraph::replication::RoleReplicaData &data) {
return memgraph::replication::StartRpcServer(dbms_handler, data, auth, system);
auto replica = [&dbms_handler, &auth, &system](replication::RoleReplicaData &data) {
return replication::StartRpcServer(dbms_handler, data, auth, system);
};
// Replication recovery and frequent check start
auto main = [&system, &dbms_handler, &auth](memgraph::replication::RoleMainData &mainData) {
auto main = [&system, &dbms_handler, &auth](replication::RoleMainData &mainData) {
for (auto &client : mainData.registered_replicas_) {
if (client.try_set_uuid &&
replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, mainData.uuid_)) {
@ -38,7 +40,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
SystemRestore(client, system, dbms_handler, mainData.uuid_, auth);
}
// DBMS here
dbms_handler.ForEach([&mainData](memgraph::dbms::DatabaseAccess db_acc) {
dbms_handler.ForEach([&mainData](dbms::DatabaseAccess db_acc) {
dbms::DbmsHandler::RecoverStorageReplication(std::move(db_acc), mainData);
});
@ -48,7 +50,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
// Warning
if (dbms_handler.default_config().durability.snapshot_wal_mode ==
memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED) {
storage::Config::Durability::SnapshotWalMode::DISABLED) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please "
"consider "
@ -59,19 +61,18 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state, mem
return true;
};
auto result = std::visit(memgraph::utils::Overloaded{replica, main}, repl_state.ReplicationData());
auto result = std::visit(utils::Overloaded{replica, main}, repl_state.ReplicationData());
MG_ASSERT(result, "Replica recovery failure!");
}
#else
void RecoverReplication(memgraph::replication::ReplicationState &repl_state,
memgraph::dbms::DbmsHandler &dbms_handler) {
void RecoverReplication(replication::ReplicationState &repl_state, dbms::DbmsHandler &dbms_handler) {
// Startup replication state (if recovered at startup)
auto replica = [&dbms_handler](memgraph::replication::RoleReplicaData &data) {
return memgraph::replication::StartRpcServer(dbms_handler, data);
auto replica = [&dbms_handler](replication::RoleReplicaData &data) {
return replication::StartRpcServer(dbms_handler, data);
};
// Replication recovery and frequent check start
auto main = [&dbms_handler](memgraph::replication::RoleMainData &mainData) {
auto main = [&dbms_handler](replication::RoleMainData &mainData) {
dbms::DbmsHandler::RecoverStorageReplication(dbms_handler.Get(), mainData);
for (auto &client : mainData.registered_replicas_) {
@ -79,12 +80,12 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state,
replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, mainData.uuid_)) {
client.try_set_uuid = false;
}
memgraph::replication::StartReplicaClient(client, dbms_handler, mainData.uuid_);
replication::StartReplicaClient(client, dbms_handler, mainData.uuid_);
}
// Warning
if (dbms_handler.default_config().durability.snapshot_wal_mode ==
memgraph::storage::Config::Durability::SnapshotWalMode::DISABLED) {
storage::Config::Durability::SnapshotWalMode::DISABLED) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please "
"consider "
@ -95,7 +96,7 @@ void RecoverReplication(memgraph::replication::ReplicationState &repl_state,
return true;
};
auto result = std::visit(memgraph::utils::Overloaded{replica, main}, repl_state.ReplicationData());
auto result = std::visit(utils::Overloaded{replica, main}, repl_state.ReplicationData());
MG_ASSERT(result, "Replica recovery failure!");
}
#endif
@ -133,20 +134,19 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port);
client.StartFrequentCheck([&, license = license::global_license_checker.IsEnterpriseValidFast(), main_uuid](
bool reconnect, replication::ReplicationClient &client) mutable {
if (client.try_set_uuid &&
memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, main_uuid)) {
if (client.try_set_uuid && replication_coordination_glue::SendSwapMainUUIDRpc(client.rpc_client_, main_uuid)) {
client.try_set_uuid = false;
}
// Working connection
// Check if system needs restoration
if (reconnect) {
client.state_.WithLock([](auto &state) { state = memgraph::replication::ReplicationClient::State::BEHIND; });
client.state_.WithLock([](auto &state) { state = replication::ReplicationClient::State::BEHIND; });
}
// Check if license has changed
const auto new_license = license::global_license_checker.IsEnterpriseValidFast();
if (new_license != license) {
license = new_license;
client.state_.WithLock([](auto &state) { state = memgraph::replication::ReplicationClient::State::BEHIND; });
client.state_.WithLock([](auto &state) { state = replication::ReplicationClient::State::BEHIND; });
}
#ifdef MG_ENTERPRISE
SystemRestore<true>(client, system, dbms_handler, main_uuid, auth);
@ -154,10 +154,10 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
// Check if any database has been left behind
dbms_handler.ForEach([&name = client.name_, reconnect](dbms::DatabaseAccess db_acc) {
// Specific database <-> replica client
db_acc->storage()->repl_storage_state_.WithClient(name, [&](storage::ReplicationStorageClient *client) {
if (reconnect || client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) {
db_acc->storage()->repl_storage_state_.WithClient(name, [&](storage::ReplicationStorageClient &client) {
if (reconnect || client.State() == storage::replication::ReplicaState::MAYBE_BEHIND) {
// Database <-> replica might be behind, check and recover
client->TryCheckReplicaStateAsync(db_acc->storage(), db_acc);
client.TryCheckReplicaStateAsync(db_acc->storage(), db_acc);
}
});
});
@ -165,9 +165,8 @@ void StartReplicaClient(replication::ReplicationClient &client, dbms::DbmsHandle
}
#ifdef MG_ENTERPRISE
ReplicationHandler::ReplicationHandler(memgraph::replication::ReplicationState &repl_state,
memgraph::dbms::DbmsHandler &dbms_handler, memgraph::system::System &system,
memgraph::auth::SynchedAuth &auth)
ReplicationHandler::ReplicationHandler(replication::ReplicationState &repl_state, dbms::DbmsHandler &dbms_handler,
system::System &system, auth::SynchedAuth &auth)
: repl_state_{repl_state}, dbms_handler_{dbms_handler}, system_{system}, auth_{auth} {
RecoverReplication(repl_state_, system_, dbms_handler_, auth_);
}
@ -179,20 +178,20 @@ ReplicationHandler::ReplicationHandler(replication::ReplicationState &repl_state
#endif
bool ReplicationHandler::SetReplicationRoleMain() {
auto const main_handler = [](memgraph::replication::RoleMainData &) {
auto const main_handler = [](replication::RoleMainData &) {
// If we are already MAIN, we don't want to change anything
return false;
};
auto const replica_handler = [this](memgraph::replication::RoleReplicaData const &) {
auto const replica_handler = [this](replication::RoleReplicaData const &) {
return DoReplicaToMainPromotion(utils::UUID{});
};
// TODO: under lock
return std::visit(memgraph::utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
}
bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
bool ReplicationHandler::SetReplicationRoleReplica(const replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) {
return SetReplicationRoleReplica_<true>(config, main_uuid);
}
@ -238,18 +237,16 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio
return RegisterReplica_<false>(config);
}
auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::query::UnregisterReplicaResult {
auto const replica_handler =
[](memgraph::replication::RoleReplicaData const &) -> memgraph::query::UnregisterReplicaResult {
return memgraph::query::UnregisterReplicaResult::NOT_MAIN;
auto ReplicationHandler::UnregisterReplica(std::string_view name) -> query::UnregisterReplicaResult {
auto const replica_handler = [](replication::RoleReplicaData const &) -> query::UnregisterReplicaResult {
return query::UnregisterReplicaResult::NOT_MAIN;
};
auto const main_handler =
[this, name](memgraph::replication::RoleMainData &mainData) -> memgraph::query::UnregisterReplicaResult {
auto const main_handler = [this, name](replication::RoleMainData &mainData) -> query::UnregisterReplicaResult {
if (!repl_state_.TryPersistUnregisterReplica(name)) {
return memgraph::query::UnregisterReplicaResult::COULD_NOT_BE_PERSISTED;
return query::UnregisterReplicaResult::COULD_NOT_BE_PERSISTED;
}
// Remove database specific clients
dbms_handler_.ForEach([name](memgraph::dbms::DatabaseAccess db_acc) {
dbms_handler_.ForEach([name](dbms::DatabaseAccess db_acc) {
db_acc->storage()->repl_storage_state_.replication_clients_.WithLock([&name](auto &clients) {
std::erase_if(clients, [name](const auto &client) { return client->Name() == name; });
});
@ -257,14 +254,14 @@ auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::q
// Remove instance level clients
auto const n_unregistered =
std::erase_if(mainData.registered_replicas_, [name](auto const &client) { return client.name_ == name; });
return n_unregistered != 0 ? memgraph::query::UnregisterReplicaResult::SUCCESS
: memgraph::query::UnregisterReplicaResult::CAN_NOT_UNREGISTER;
return n_unregistered != 0 ? query::UnregisterReplicaResult::SUCCESS
: query::UnregisterReplicaResult::CAN_NOT_UNREGISTER;
};
return std::visit(memgraph::utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData());
}
auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_glue::ReplicationRole {
auto ReplicationHandler::GetRole() const -> replication_coordination_glue::ReplicationRole {
return repl_state_.GetRole();
}
@ -275,10 +272,57 @@ auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
auto ReplicationHandler::GetReplState() const -> const memgraph::replication::ReplicationState & { return repl_state_; }
auto ReplicationHandler::GetReplState() -> memgraph::replication::ReplicationState & { return repl_state_; }
auto ReplicationHandler::GetReplState() -> replication::ReplicationState & { return repl_state_; }
bool ReplicationHandler::IsMain() const { return repl_state_.IsMain(); }
bool ReplicationHandler::IsReplica() const { return repl_state_.IsReplica(); }
auto ReplicationHandler::ShowReplicas() const -> utils::BasicResult<query::ShowReplicaError, query::ReplicasInfos> {
using res_t = utils::BasicResult<query::ShowReplicaError, query::ReplicasInfos>;
auto main = [this](RoleMainData const &main) -> res_t {
auto entries = std::vector<query::ReplicasInfo>{};
entries.reserve(main.registered_replicas_.size());
const bool full_info = license::global_license_checker.IsEnterpriseValidFast();
for (auto const &replica : main.registered_replicas_) {
// STEP 1: data_info
auto data_info = std::map<std::string, query::ReplicaInfoState>{};
this->dbms_handler_.ForEach([&](dbms::DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
// ATM we only support IN_MEMORY_TRANSACTIONAL
if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return;
if (!full_info && storage->name() == dbms::kDefaultDB) return;
auto ok =
storage->repl_storage_state_.WithClient(replica.name_, [&](storage::ReplicationStorageClient &client) {
auto ts_info = client.GetTimestampInfo(storage);
auto state = client.State();
data_info.emplace(storage->name(),
query::ReplicaInfoState{ts_info.current_timestamp_of_replica,
ts_info.current_number_of_timestamp_behind_main, state});
});
DMG_ASSERT(ok);
});
// STEP 2: system_info
#ifdef MG_ENTERPRISE
// Already locked on system transaction via the interpreter
const auto ts = system_.LastCommittedSystemTimestamp();
// NOTE: no system behind at the moment
query::ReplicaSystemInfoState system_info{ts, 0 /* behind ts not implemented */, *replica.state_.ReadLock()};
#else
query::ReplicaSystemInfoState system_info{};
#endif
// STEP 3: add entry
entries.emplace_back(replica.name_, replica.rpc_client_.Endpoint().SocketAddress(), replica.mode_, system_info,
std::move(data_info));
}
return query::ReplicasInfos{std::move(entries)};
};
auto replica = [](RoleReplicaData const &) -> res_t { return query::ShowReplicaError::NOT_MAIN; };
return std::visit(utils::Overloaded{main, replica}, repl_state_.ReplicationData());
}
} // namespace memgraph::replication

View File

@ -825,7 +825,7 @@ uint64_t DiskStorage::GetDiskSpaceUsage() const {
durability_disk_storage_size;
}
StorageInfo DiskStorage::GetBaseInfo(bool /* unused */) {
StorageInfo DiskStorage::GetBaseInfo() {
StorageInfo info{};
info.vertex_count = vertex_count_;
info.edge_count = edge_count_.load(std::memory_order_acquire);
@ -838,9 +838,8 @@ StorageInfo DiskStorage::GetBaseInfo(bool /* unused */) {
return info;
}
StorageInfo DiskStorage::GetInfo(bool force_dir,
memgraph::replication_coordination_glue::ReplicationRole replication_role) {
StorageInfo info = GetBaseInfo(force_dir);
StorageInfo DiskStorage::GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) {
StorageInfo info = GetBaseInfo();
{
auto access = Access(replication_role);
const auto &lbl = access->ListAllIndices();

View File

@ -307,9 +307,8 @@ class DiskStorage final : public Storage {
std::vector<std::pair<std::string, std::string>> SerializeVerticesForLabelPropertyIndex(LabelId label,
PropertyId property);
StorageInfo GetBaseInfo(bool force_directory) override;
StorageInfo GetInfo(bool force_directory,
memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
StorageInfo GetBaseInfo() override;
StorageInfo GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/) override {}

View File

@ -11,6 +11,7 @@
#include "storage/v2/inmemory/storage.hpp"
#include <algorithm>
#include <filesystem>
#include <functional>
#include <optional>
#include "dbms/constants.hpp"
@ -1758,7 +1759,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock>);
template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock>);
StorageInfo InMemoryStorage::GetBaseInfo(bool force_directory) {
StorageInfo InMemoryStorage::GetBaseInfo() {
StorageInfo info{};
info.vertex_count = vertices_.size();
info.edge_count = edge_count_.load(std::memory_order_acquire);
@ -1769,27 +1770,23 @@ StorageInfo InMemoryStorage::GetBaseInfo(bool force_directory) {
info.memory_res = utils::GetMemoryRES();
// Special case for the default database
auto update_path = [&](const std::filesystem::path &dir) {
if (!force_directory && std::filesystem::is_directory(dir) && dir.has_filename()) {
const auto end = dir.end();
auto it = end;
--it;
if (it != end) {
--it;
if (it != end && *it != "databases") {
// Default DB points to the root (for back-compatibility); update to the "database" dir
return dir / dbms::kMultiTenantDir / dbms::kDefaultDB;
}
#ifdef MG_ENTERPRISE
if (config_.salient.name == dbms::kDefaultDB) {
// Default DB points to the root (for back-compatibility); update to the "database" dir
std::filesystem::path new_dir = dir / "databases" / dbms::kDefaultDB;
if (std::filesystem::exists(new_dir) && std::filesystem::is_directory(new_dir)) {
return new_dir;
}
}
#endif
return dir;
};
info.disk_usage = utils::GetDirDiskUsage<false>(update_path(config_.durability.storage_directory));
return info;
}
StorageInfo InMemoryStorage::GetInfo(bool force_directory,
memgraph::replication_coordination_glue::ReplicationRole replication_role) {
StorageInfo info = GetBaseInfo(force_directory);
StorageInfo InMemoryStorage::GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) {
StorageInfo info = GetBaseInfo();
{
auto access = Access(replication_role); // TODO: override isolation level?
const auto &lbl = access->ListAllIndices();

View File

@ -368,9 +368,8 @@ class InMemoryStorage final : public Storage {
bool InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch);
void FinalizeWalFile();
StorageInfo GetBaseInfo(bool force_directory) override;
StorageInfo GetInfo(bool force_directory,
memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
StorageInfo GetBaseInfo() override;
StorageInfo GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
/// Return true in all cases excepted if any sync replicas have not sent confirmation.
[[nodiscard]] bool AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp,

View File

@ -26,7 +26,7 @@ namespace memgraph::storage {
struct TimestampInfo {
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
uint64_t current_number_of_timestamp_behind_main;
};
struct ReplicaInfo {

View File

@ -95,7 +95,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
TimestampInfo ReplicationStorageClient::GetTimestampInfo(Storage const *storage) {
TimestampInfo info;
info.current_timestamp_of_replica = 0;
info.current_number_of_timestamp_behind_master = 0;
info.current_number_of_timestamp_behind_main = 0;
try {
auto stream{client_.rpc_client_.Stream<replication::TimestampRpc>(main_uuid_, storage->uuid())};
@ -104,9 +104,9 @@ TimestampInfo ReplicationStorageClient::GetTimestampInfo(Storage const *storage)
auto main_time_stamp = storage->repl_storage_state_.last_commit_timestamp_.load();
info.current_timestamp_of_replica = response.current_commit_timestamp;
info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp;
info.current_number_of_timestamp_behind_main = response.current_commit_timestamp - main_time_stamp;
if (!is_success || info.current_number_of_timestamp_behind_master != 0) {
if (!is_success || info.current_number_of_timestamp_behind_main != 0) {
replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; });
LogRpcFailure();
}

View File

@ -63,7 +63,7 @@ struct ReplicationStorageState {
return replication_clients_.WithLock([replica_name, cb = std::forward<F>(callback)](auto &clients) {
for (const auto &client : clients) {
if (client->Name() == replica_name) {
cb(client.get());
cb(*client);
return true;
}
}

View File

@ -359,18 +359,9 @@ class Storage {
utils::BasicResult<SetIsolationLevelError> SetIsolationLevel(IsolationLevel isolation_level);
IsolationLevel GetIsolationLevel() const noexcept;
virtual StorageInfo GetBaseInfo(bool force_directory) = 0;
StorageInfo GetBaseInfo() {
#if MG_ENTERPRISE
const bool force_dir = false;
#else
const bool force_dir = true; //!< Use the configured directory (multi-tenancy reroutes to another dir)
#endif
return GetBaseInfo(force_dir);
}
virtual StorageInfo GetBaseInfo() = 0;
virtual StorageInfo GetInfo(bool force_directory,
memgraph::replication_coordination_glue::ReplicationRole replication_role) = 0;
virtual StorageInfo GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) = 0;
virtual Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode,
memgraph::replication_coordination_glue::ReplicationRole replication_role) = 0;

View File

@ -34,7 +34,7 @@ struct System {
if (!system_unique.try_lock_for(try_time)) {
return std::nullopt;
}
return Transaction{state_, std::move(system_unique), timestamp_++};
return Transaction{state_, std::move(system_unique), ++timestamp_};
}
// TODO: this and LastCommittedSystemTimestamp maybe not needed
@ -46,7 +46,7 @@ struct System {
private:
State state_;
std::timed_mutex mtx_{};
std::uint64_t timestamp_{};
std::uint64_t timestamp_{0};
};
} // namespace memgraph::system

View File

@ -1,7 +1,7 @@
#!/bin/bash
# Old v1 tests
run_v1.sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"
# New tests
pushd () { command pushd "$@" > /dev/null; }
@ -15,8 +15,9 @@ function wait_for_server {
sleep 1
}
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"
# Old v1 tests
tests_v1="$DIR/run_v1.sh"
$tests_v1
# Create a temporary directory.
tmpdir=/tmp/memgraph_drivers

View File

@ -312,8 +312,20 @@ def test_unregister_replicas(kill_instance):
]
expected_replicas = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
@ -330,7 +342,13 @@ def test_unregister_replicas(kill_instance):
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
@ -406,7 +424,13 @@ def test_unregister_main():
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
main_cursor = connect(host="localhost", port=7687).cursor()

View File

@ -17,7 +17,7 @@ import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -123,11 +123,23 @@ def test_distributed_automatic_failover():
main_cursor = connect(host="localhost", port=7689).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
assert actual_data_on_main == sorted(expected_data_on_main)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -152,18 +164,42 @@ def test_distributed_automatic_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
if __name__ == "__main__":

View File

@ -67,7 +67,13 @@ def test_replication_works_on_failover():
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("shared_replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
(
"shared_replica",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
@ -82,8 +88,20 @@ def test_replication_works_on_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("replica", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("shared_replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
(
"replica",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"shared_replica",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)

View File

@ -16,7 +16,7 @@ import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -106,8 +106,20 @@ def test_replication_works_on_failover():
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
@ -135,17 +147,41 @@ def test_replication_works_on_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
# 5
execute_and_fetch_all(new_main_cursor, "CREATE ();")
@ -173,8 +209,20 @@ def test_replication_works_on_replica_instance_restart():
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
@ -193,16 +241,28 @@ def test_replication_works_on_replica_instance_restart():
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
mg_sleep_and_assert_collection(expected_data_on_coord, retrieve_data_show_repl_cluster)
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
@ -217,10 +277,22 @@ def test_replication_works_on_replica_instance_restart():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 5.
@ -241,10 +313,22 @@ def test_replication_works_on_replica_instance_restart():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 2, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 6.
instance_2_cursor = connect(port=7689, host="localhost").cursor()
@ -313,11 +397,23 @@ def test_simple_automatic_failover():
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
assert actual_data_on_main == sorted(expected_data_on_main)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -340,18 +436,42 @@ def test_simple_automatic_failover():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_registering_replica_fails_name_exists():

View File

@ -15,3 +15,21 @@ def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=
result = function_to_retrieve_data()
return result
def mg_sleep_and_assert_collection(
expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2
):
result = function_to_retrieve_data()
start_time = time.time()
while len(result) != len(expected_value) or any((x not in result for x in expected_value)):
duration = time.time() - start_time
if duration > max_duration:
assert (
False
), f" mg_sleep_and_assert has tried for too long and did not get the expected result! Last result was: {result}"
time.sleep(time_between_attempt)
result = function_to_retrieve_data()
return result

View File

@ -14,7 +14,7 @@ import time
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert_collection
# BUGFIX: for issue https://github.com/memgraph/memgraph/issues/1515
@ -28,28 +28,52 @@ def test_replication_handles_delete_when_multiple_edges_of_same_type(connection)
conn = connection(7687, "main")
conn.autocommit = True
cursor = conn.cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
assert all([x in actual_data for x in expected_data])
# 1/
execute_and_fetch_all(cursor, "CREATE (a)-[r:X]->(b) CREATE (a)-[:X]->(b) DELETE r;")
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("replica_2", "127.0.0.1:10002", "async", 2, 0, "ready"),
}
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
),
]
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
return execute_and_fetch_all(cursor, "SHOW REPLICAS;")
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
actual_data = mg_sleep_and_assert_collection(expected_data, retrieve_data)
assert all([x in actual_data for x in expected_data])
if __name__ == "__main__":

View File

@ -10,12 +10,11 @@
# licenses/APL.txt.
import sys
import pytest
import time
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert_collection
@pytest.mark.parametrize(
@ -31,25 +30,42 @@ def test_show_replication_role(port, role, connection):
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
expected_column_names = {
"name",
"socket_address",
"sync_mode",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
"system_info",
"data_info",
}
actual_column_names = {x.name for x in cursor.description}
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
assert all([x in actual_data for x in expected_data])
def test_show_replicas_while_inserting_data(connection):
@ -62,49 +78,108 @@ def test_show_replicas_while_inserting_data(connection):
# 0/
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
expected_column_names = {
"name",
"socket_address",
"sync_mode",
"current_timestamp_of_replica",
"number_of_timestamp_behind_master",
"state",
"system_info",
"data_info",
}
actual_column_names = {x.name for x in cursor.description}
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
assert all([x in actual_data for x in expected_data])
# 1/
execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
# 2/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 4, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 4, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"),
}
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
]
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
return execute_and_fetch_all(cursor, "SHOW REPLICAS;")
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
actual_data = mg_sleep_and_assert_collection(expected_data, retrieve_data)
assert all([x in actual_data for x in expected_data])
# 3/
res = execute_and_fetch_all(cursor, "MATCH (node) return node;")
assert len(res) == 1
# 4/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
expected_data = [
(
"replica_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
(
"replica_3",
"127.0.0.1:10003",
"async",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 4, "behind": 0, "status": "ready"}},
),
]
actual_data = execute_and_fetch_all(cursor, "SHOW REPLICAS;")
assert all([x in actual_data for x in expected_data])
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,7 @@ import interactive_mg_runner
import mgclient
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -35,6 +35,10 @@ BOLT_PORTS = {"main": 7687, "replica_1": 7688, "replica_2": 7689}
REPLICATION_PORTS = {"replica_1": 10001, "replica_2": 10002}
def set_eq(actual, expected):
return len(actual) == len(expected) and all([x in actual for x in expected])
def create_memgraph_instances_with_role_recovery(data_directory: Any) -> Dict[str, Any]:
return {
"replica_1": {
@ -174,10 +178,9 @@ def setup_main(main_cursor):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'B'});")
def show_replicas_func(cursor, db_name):
def show_replicas_func(cursor):
def func():
execute_and_fetch_all(cursor, f"USE DATABASE {db_name};")
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
return execute_and_fetch_all(cursor, "SHOW REPLICAS;")
return func
@ -271,17 +274,31 @@ def test_manual_databases_create_multitenancy_replication(connection):
execute_and_fetch_all(cursor, "CREATE ()-[:EDGE]->();")
# 2/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 2, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 1, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 2, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 1, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 1
@ -523,11 +540,23 @@ def test_manual_databases_create_multitenancy_replication_main_behind(connection
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
# 2/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
databases_on_main = show_databases_func(main_cursor)()
@ -567,17 +596,31 @@ def test_automatic_databases_create_multitenancy_replication(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node)-[:EDGE]->(:Node)")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
@ -640,19 +683,26 @@ def test_automatic_databases_multitenancy_replication_predefined(connection):
execute_and_fetch_all(cursor, "CREATE ()-[:EDGE]->();")
# 2/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 2, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 1, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 1
assert get_number_of_edges_func(cursor_replica, "A")() == 0
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
assert get_number_of_edges_func(cursor_replica, "B")() == 1
def test_automatic_databases_create_multitenancy_replication_dirty_main(connection):
@ -698,10 +748,16 @@ def test_automatic_databases_create_multitenancy_replication_dirty_main(connecti
cursor = connection(BOLT_PORTS["main"], "main").cursor()
# 1/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 1, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(cursor))
cursor_replica = connection(BOLT_PORTS["replica_1"], "replica").cursor()
execute_and_fetch_all(cursor_replica, "USE DATABASE A;")
@ -740,31 +796,85 @@ def test_multitenancy_replication_restart_replica_w_fc(connection, replica_name)
time.sleep(3) # In order for the frequent check to run
# Check that the FC did invalidate
expected_data = {
"replica_1": {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "invalid"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
},
"replica_2": {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "invalid"),
},
"replica_1": [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 0, "behind": 0, "status": "invalid"},
"B": {"ts": 0, "behind": 0, "status": "invalid"},
"memgraph": {"ts": 0, "behind": 0, "status": "invalid"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
],
"replica_2": [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 0, "behind": 0, "status": "invalid"},
"B": {"ts": 0, "behind": 0, "status": "invalid"},
"memgraph": {"ts": 0, "behind": 0, "status": "invalid"},
},
),
],
}
assert expected_data[replica_name] == show_replicas_func(main_cursor, "A")()
assert set_eq(expected_data[replica_name], show_replicas_func(main_cursor)())
# Restart
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
# 4/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
@ -805,19 +915,33 @@ def test_multitenancy_replication_restart_replica_wo_fc(connection, replica_name
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, replica_name)
# 4/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
cursor_replica = connection(BOLT_PORTS[replica_name], replica_name).cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
assert get_number_of_edges_func(cursor_replica, "A")() == 3
assert get_number_of_nodes_func(cursor_replica, "B")() == 2
@ -899,17 +1023,28 @@ def test_multitenancy_replication_drop_replica(connection, replica_name):
)
# 4/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 7, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 7, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 3, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 3, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{
"A": {"ts": 7, "behind": 0, "status": "ready"},
"B": {"ts": 3, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
cursor_replica = connection(BOLT_PORTS[replica_name], "replica").cursor()
assert get_number_of_nodes_func(cursor_replica, "A")() == 7
@ -993,17 +1128,31 @@ def test_automatic_databases_drop_multitenancy_replication(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 4, "behind": None, "status": "ready"},
{
"A": {"ts": 1, "behind": 0, "status": "ready"},
"B": {"ts": 0, "behind": 0, "status": "ready"},
"memgraph": {"ts": 0, "behind": 0, "status": "ready"},
},
),
]
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor))
# 4/
execute_and_fetch_all(main_cursor, "USE DATABASE memgraph;")
@ -1088,11 +1237,23 @@ def test_multitenancy_drop_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
# 4/
replica1_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
@ -1110,11 +1271,23 @@ def test_multitenancy_drop_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE DATABASE B;")
execute_and_fetch_all(main_cursor, "USE DATABASE B;")
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "B"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 8, "behind": None, "status": "ready"},
{"B": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 8, "behind": None, "status": "ready"},
{"B": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor))
# 6/
assert execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN count(*);")[0][0] == 1
@ -1163,11 +1336,23 @@ def test_multitenancy_drop_and_recreate_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE (:Node{on:'A'});")
# 3/
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 1, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 1, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 3, "behind": None, "status": "ready"},
{"A": {"ts": 1, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data, show_replicas_func(main_cursor))
# 4/
replica1_cursor = connection(BOLT_PORTS["replica_1"], "replica").cursor()
@ -1184,11 +1369,23 @@ def test_multitenancy_drop_and_recreate_while_replica_using(connection):
execute_and_fetch_all(main_cursor, "CREATE DATABASE A;")
execute_and_fetch_all(main_cursor, "USE DATABASE A;")
expected_data = {
("replica_1", f"127.0.0.1:{REPLICATION_PORTS['replica_1']}", "sync", 0, 0, "ready"),
("replica_2", f"127.0.0.1:{REPLICATION_PORTS['replica_2']}", "async", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor, "A"))
expected_data = [
(
"replica_1",
f"127.0.0.1:{REPLICATION_PORTS['replica_1']}",
"sync",
{"ts": 8, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"replica_2",
f"127.0.0.1:{REPLICATION_PORTS['replica_2']}",
"async",
{"ts": 8, "behind": None, "status": "ready"},
{"A": {"ts": 0, "behind": 0, "status": "ready"}, "memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert(expected_data, show_replicas_func(main_cursor))
# 6/
assert execute_and_fetch_all(replica1_cursor, "MATCH(n) RETURN count(*);")[0][0] == 1

View File

@ -15,6 +15,7 @@
#include <optional>
#include "dbms/database.hpp"
#include "dbms/dbms_handler.hpp"
#include "disk_test_utils.hpp"
#include "query/interpret/awesome_memgraph_functions.hpp"
#include "query/interpreter_context.hpp"
@ -30,15 +31,31 @@ using namespace memgraph::storage;
constexpr auto testSuite = "database_v2_get_info";
const std::filesystem::path storage_directory{std::filesystem::temp_directory_path() / testSuite};
template <typename StorageType>
struct TestConfig {};
struct DefaultConfig : TestConfig {};
struct TenantConfig : TestConfig {};
template <typename TestType>
class InfoTest : public testing::Test {
using StorageType = typename TestType::first_type;
using ConfigType = typename TestType::second_type;
protected:
void SetUp() {
repl_state.emplace(memgraph::storage::ReplicationStateRootPath(config));
db_gk.emplace(config, *repl_state);
auto db_acc_opt = db_gk->access();
MG_ASSERT(db_acc_opt, "Failed to access db");
auto &db_acc = *db_acc_opt;
repl_state_.emplace(ReplicationStateRootPath(config));
#ifdef MG_ENTERPRISE
dbms_handler_.emplace(config, *repl_state_, auth_, false);
auto db_acc = dbms_handler_->Get(); // Default db
if (std::is_same_v<ConfigType, TenantConfig>) {
constexpr std::string_view db_name = "test_db";
MG_ASSERT(dbms_handler_->New(std::string{db_name}).HasValue(), "Failed to create database.");
db_acc = dbms_handler_->Get(db_name);
}
#else
dbms_handler_.emplace(config, *repl_state_);
auto db_acc = dbms_handler_->Get();
#endif
MG_ASSERT(db_acc, "Failed to access db");
MG_ASSERT(db_acc->GetStorageMode() == (std::is_same_v<StorageType, memgraph::storage::DiskStorage>
? memgraph::storage::StorageMode::ON_DISK_TRANSACTIONAL
: memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL),
@ -48,8 +65,8 @@ class InfoTest : public testing::Test {
void TearDown() {
db_acc_.reset();
db_gk.reset();
repl_state.reset();
dbms_handler_.reset();
repl_state_.reset();
if (std::is_same<StorageType, memgraph::storage::DiskStorage>::value) {
disk_test_utils::RemoveRocksDbDirs(testSuite);
}
@ -59,9 +76,9 @@ class InfoTest : public testing::Test {
StorageMode mode{std::is_same_v<StorageType, DiskStorage> ? StorageMode::ON_DISK_TRANSACTIONAL
: StorageMode::IN_MEMORY_TRANSACTIONAL};
std::optional<memgraph::replication::ReplicationState> repl_state;
std::optional<memgraph::dbms::DatabaseAccess> db_acc_;
std::optional<memgraph::utils::Gatekeeper<memgraph::dbms::Database>> db_gk;
#ifdef MG_ENTERPRISE
memgraph::auth::SynchedAuth auth_{storage_directory, memgraph::auth::Auth::Config {}};
#endif
memgraph::storage::Config config{
[&]() {
memgraph::storage::Config config{};
@ -69,18 +86,27 @@ class InfoTest : public testing::Test {
config.durability.snapshot_wal_mode =
memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL;
if constexpr (std::is_same_v<StorageType, memgraph::storage::DiskStorage>) {
config.disk = disk_test_utils::GenerateOnDiskConfig(testSuite).disk;
config.force_on_disk = true;
}
return config;
}() // iile
};
std::optional<memgraph::replication::ReplicationState> repl_state_;
std::optional<memgraph::dbms::DbmsHandler> dbms_handler_;
std::optional<memgraph::dbms::DatabaseAccess> db_acc_;
};
using StorageTypes = ::testing::Types<memgraph::storage::InMemoryStorage, memgraph::storage::DiskStorage>;
using TestTypes = ::testing::Types<std::pair<memgraph::storage::InMemoryStorage, DefaultConfig>,
std::pair<memgraph::storage::DiskStorage, DefaultConfig>
TYPED_TEST_CASE(InfoTest, StorageTypes);
// TYPED_TEST_CASE(IndexTest, InMemoryStorageType);
#ifdef MG_ENTERPRISE
,
std::pair<memgraph::storage::InMemoryStorage, TenantConfig>,
std::pair<memgraph::storage::DiskStorage, TenantConfig>
#endif
>;
TYPED_TEST_CASE(InfoTest, TestTypes);
// NOLINTNEXTLINE(hicpp-special-member-functions)
TYPED_TEST(InfoTest, InfoCheck) {
@ -166,13 +192,13 @@ TYPED_TEST(InfoTest, InfoCheck) {
}
const auto &info = db_acc->GetInfo(
true, memgraph::replication_coordination_glue::ReplicationRole::MAIN); // force to use configured directory
memgraph::replication_coordination_glue::ReplicationRole::MAIN); // force to use configured directory
ASSERT_EQ(info.storage_info.vertex_count, 5);
ASSERT_EQ(info.storage_info.edge_count, 2);
ASSERT_EQ(info.storage_info.average_degree, 0.8);
ASSERT_GT(info.storage_info.memory_res, 10'000'000); // 200MB < > 10MB
ASSERT_LT(info.storage_info.memory_res, 200'000'000);
ASSERT_GT(info.storage_info.memory_res, 10'000'000); // 250MB < > 10MB
ASSERT_LT(info.storage_info.memory_res, 250'000'000);
ASSERT_GT(info.storage_info.disk_usage, 100); // 1MB < > 100B
ASSERT_LT(info.storage_info.disk_usage, 1000'000);
ASSERT_EQ(info.storage_info.label_indices, 1);

View File

@ -13,12 +13,11 @@
#include <gtest/gtest.h>
#include <filesystem>
#include "disk_test_utils.hpp"
#include "dbms/constants.hpp"
#include "storage/v2/disk/storage.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/storage_error.hpp"
// NOLINTNEXTLINE(google-build-using-namespace)
using namespace memgraph::storage;
@ -31,6 +30,7 @@ class InfoTest : public testing::Test {
protected:
void SetUp() override {
std::filesystem::remove_all(storage_directory);
config_.salient.name = memgraph::dbms::kDefaultDB;
memgraph::storage::UpdatePaths(config_, storage_directory);
config_.durability.snapshot_wal_mode =
memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL;
@ -135,7 +135,7 @@ TYPED_TEST(InfoTest, InfoCheck) {
ASSERT_FALSE(unique_acc->Commit().HasError());
}
StorageInfo info = this->storage->GetInfo(true, ReplicationRole::MAIN); // force to use configured directory
StorageInfo info = this->storage->GetInfo(ReplicationRole::MAIN);
ASSERT_EQ(info.vertex_count, 5);
ASSERT_EQ(info.edge_count, 2);