Forbid two replicas to point to the same ip port (#406)
This commit is contained in:
parent
41d4185156
commit
589e0e098b
@ -1886,13 +1886,22 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
|
||||
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
|
||||
|
||||
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
|
||||
return std::any_of(clients.begin(), clients.end(), [&](auto &client) { return client->Name() == name; });
|
||||
return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; });
|
||||
});
|
||||
|
||||
if (name_exists) {
|
||||
return RegisterReplicaError::NAME_EXISTS;
|
||||
}
|
||||
|
||||
const auto end_point_exists = replication_clients_.WithLock([&endpoint](auto &clients) {
|
||||
return std::any_of(clients.begin(), clients.end(),
|
||||
[&endpoint](const auto &client) { return client->Endpoint() == endpoint; });
|
||||
});
|
||||
|
||||
if (end_point_exists) {
|
||||
return RegisterReplicaError::END_POINT_EXISTS;
|
||||
}
|
||||
|
||||
MG_ASSERT(replication_mode == replication::ReplicationMode::SYNC || !config.timeout,
|
||||
"Only SYNC mode can have a timeout set");
|
||||
|
||||
@ -1905,10 +1914,15 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
|
||||
// Another thread could have added a client with same name while
|
||||
// we were connecting to this client.
|
||||
if (std::any_of(clients.begin(), clients.end(),
|
||||
[&](auto &other_client) { return client->Name() == other_client->Name(); })) {
|
||||
[&](const auto &other_client) { return client->Name() == other_client->Name(); })) {
|
||||
return RegisterReplicaError::NAME_EXISTS;
|
||||
}
|
||||
|
||||
if (std::any_of(clients.begin(), clients.end(),
|
||||
[&client](const auto &other_client) { return client->Endpoint() == other_client->Endpoint(); })) {
|
||||
return RegisterReplicaError::END_POINT_EXISTS;
|
||||
}
|
||||
|
||||
clients.push_back(std::move(client));
|
||||
return {};
|
||||
});
|
||||
|
@ -411,7 +411,7 @@ class Storage final {
|
||||
|
||||
bool SetMainReplicationRole();
|
||||
|
||||
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, CONNECTION_FAILED };
|
||||
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED };
|
||||
|
||||
/// @pre The instance should have a MAIN role
|
||||
/// @pre Timeout can only be set for SYNC replication
|
||||
|
@ -32,6 +32,13 @@ class ReplicationTest : public ::testing::Test {
|
||||
|
||||
void TearDown() override { Clear(); }
|
||||
|
||||
memgraph::storage::Config configuration{
|
||||
.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}};
|
||||
|
||||
private:
|
||||
void Clear() {
|
||||
if (!std::filesystem::exists(storage_directory)) return;
|
||||
@ -40,19 +47,9 @@ class ReplicationTest : public ::testing::Test {
|
||||
};
|
||||
|
||||
TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
|
||||
memgraph::storage::Storage main_store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage main_store(configuration);
|
||||
|
||||
memgraph::storage::Storage replica_store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage replica_store(configuration);
|
||||
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
|
||||
|
||||
ASSERT_FALSE(main_store
|
||||
@ -483,19 +480,9 @@ TEST_F(ReplicationTest, RecoveryProcess) {
|
||||
}
|
||||
|
||||
TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
|
||||
memgraph::storage::Storage main_store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage main_store(configuration);
|
||||
|
||||
memgraph::storage::Storage replica_store_async(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage replica_store_async(configuration);
|
||||
|
||||
replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000});
|
||||
|
||||
@ -533,28 +520,13 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
|
||||
}
|
||||
|
||||
TEST_F(ReplicationTest, EpochTest) {
|
||||
memgraph::storage::Storage main_store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage main_store(configuration);
|
||||
|
||||
memgraph::storage::Storage replica_store1(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage replica_store1(configuration);
|
||||
|
||||
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
|
||||
|
||||
memgraph::storage::Storage replica_store2(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage replica_store2(configuration);
|
||||
|
||||
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10001});
|
||||
|
||||
@ -639,30 +611,15 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
}
|
||||
|
||||
TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
memgraph::storage::Storage main_store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage main_store(configuration);
|
||||
|
||||
memgraph::storage::Storage replica_store1(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
memgraph::storage::Storage replica_store1(configuration);
|
||||
|
||||
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10000};
|
||||
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
|
||||
replica_store1.SetReplicaRole(replica1_endpoint);
|
||||
|
||||
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10000};
|
||||
memgraph::storage::Storage replica_store2(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {
|
||||
.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
}});
|
||||
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
|
||||
memgraph::storage::Storage replica_store2(configuration);
|
||||
|
||||
replica_store2.SetReplicaRole(replica2_endpoint);
|
||||
|
||||
@ -700,3 +657,55 @@ TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
ASSERT_EQ(second_info.endpoint, replica2_endpoint);
|
||||
ASSERT_EQ(second_info.state, memgraph::storage::replication::ReplicaState::READY);
|
||||
}
|
||||
|
||||
TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
|
||||
memgraph::storage::Storage main_store(configuration);
|
||||
|
||||
memgraph::storage::Storage replica_store1(configuration);
|
||||
|
||||
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
|
||||
replica_store1.SetReplicaRole(replica1_endpoint);
|
||||
|
||||
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
|
||||
memgraph::storage::Storage replica_store2(configuration);
|
||||
|
||||
replica_store2.SetReplicaRole(replica2_endpoint);
|
||||
|
||||
const std::string replica1_name{"REPLICA1"};
|
||||
ASSERT_FALSE(main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint,
|
||||
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
|
||||
.HasError());
|
||||
|
||||
const std::string replica2_name{"REPLICA1"};
|
||||
ASSERT_TRUE(
|
||||
main_store
|
||||
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
|
||||
.GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS);
|
||||
}
|
||||
|
||||
TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
|
||||
memgraph::storage::Storage main_store(configuration);
|
||||
|
||||
memgraph::storage::Storage replica_store1(configuration);
|
||||
|
||||
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
|
||||
replica_store1.SetReplicaRole(replica1_endpoint);
|
||||
|
||||
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10001};
|
||||
memgraph::storage::Storage replica_store2(configuration);
|
||||
|
||||
replica_store2.SetReplicaRole(replica2_endpoint);
|
||||
|
||||
const std::string replica1_name{"REPLICA1"};
|
||||
ASSERT_FALSE(main_store
|
||||
.RegisterReplica(replica1_name, replica1_endpoint,
|
||||
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
|
||||
.HasError());
|
||||
|
||||
const std::string replica2_name{"REPLICA2"};
|
||||
ASSERT_TRUE(
|
||||
main_store
|
||||
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
|
||||
.GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user