Enable replication in community (#104)

* Enable replication in community
This commit is contained in:
antonio2368 2021-03-10 10:36:38 +01:00 committed by GitHub
parent a9f5f45b3d
commit 16715d5005
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 121 deletions

View File

@ -3,6 +3,10 @@
## Future
TODO: Don't forget to add items on the fly.
### Major Feature and Improvements
* Added replication to community version.
### Bug Fixes
* Fixed garbage collector by correctly marking the oldest current timestamp

View File

@ -10,11 +10,11 @@ add_subdirectory(telemetry)
add_subdirectory(communication)
add_subdirectory(storage/v2)
add_subdirectory(query)
add_subdirectory(slk)
add_subdirectory(rpc)
if (MG_ENTERPRISE)
add_subdirectory(audit)
add_subdirectory(auth)
add_subdirectory(slk)
add_subdirectory(rpc)
endif()
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)

View File

@ -172,7 +172,6 @@ TypedValue EvaluateOptionalExpression(Expression *expression, ExpressionEvaluato
return expression ? expression->Accept(*eval) : TypedValue();
}
#ifdef MG_ENTERPRISE
class ReplQueryHandler final : public query::ReplicationQueryHandler {
public:
explicit ReplQueryHandler(storage::Storage *db) : db_(db) {}
@ -290,37 +289,6 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
};
/// returns false if the replication role can't be set
/// @throw QueryRuntimeException if an error ocurred.
#else
class NoReplicationInCommunity : public query::QueryRuntimeException {
public:
NoReplicationInCommunity()
: query::QueryRuntimeException::QueryRuntimeException("Replication is not supported in Memgraph Community!") {}
};
class ReplQueryHandler : public query::ReplicationQueryHandler {
public:
// Dummy ctor - just there to make the replication query handler work
// in both community and enterprise versions.
explicit ReplQueryHandler(storage::Storage *db) {}
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional<int64_t> port) override {
throw NoReplicationInCommunity();
}
ReplicationQuery::ReplicationRole ShowReplicationRole() const override { throw NoReplicationInCommunity(); }
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) {
throw NoReplicationInCommunity();
}
void DropReplica(const std::string &replica_name) override { throw NoReplicationInCommunity(); }
using Replica = ReplicationQueryHandler::Replica;
std::vector<Replica> ShowReplicas() const override { throw NoReplicationInCommunity(); }
};
#endif
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters &parameters,
DbAccessor *db_accessor) {
@ -1494,14 +1462,12 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
UpdateTypeCount(rw_type);
#ifdef MG_ENTERPRISE
if (const auto query_type = query_execution->prepared_query->rw_type;
interpreter_context_->db->GetReplicationRole() == storage::ReplicationRole::REPLICA &&
(query_type == RWType::W || query_type == RWType::RW)) {
query_execution = nullptr;
throw QueryException("Write query forbidden on the replica!");
}
#endif
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
} catch (const utils::BasicException &) {

View File

@ -11,26 +11,26 @@ set(storage_v2_src_files
vertex_accessor.cpp
storage.cpp)
if(MG_ENTERPRISE)
define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files)
##### Replication #####
add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE)
define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files)
add_custom_target(generate_lcp_storage DEPENDS ${generated_lcp_storage_files})
add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE)
set(storage_v2_src_files
${storage_v2_src_files}
replication/replication_client.cpp
replication/replication_server.cpp
replication/serialization.cpp
replication/slk.cpp
${lcp_storage_cpp_files})
endif()
add_custom_target(generate_lcp_storage DEPENDS ${generated_lcp_storage_files})
set(storage_v2_src_files
${storage_v2_src_files}
replication/replication_client.cpp
replication/replication_server.cpp
replication/serialization.cpp
replication/slk.cpp
${lcp_storage_cpp_files})
#######################
add_library(mg-storage-v2 STATIC ${storage_v2_src_files})
target_link_libraries(mg-storage-v2 Threads::Threads mg-utils gflags)
if(MG_ENTERPRISE)
add_dependencies(mg-storage-v2 generate_lcp_storage)
target_link_libraries(mg-storage-v2 mg-rpc mg-slk)
endif()
add_dependencies(mg-storage-v2 generate_lcp_storage)
target_link_libraries(mg-storage-v2 mg-rpc mg-slk)

View File

@ -24,11 +24,10 @@
#include "utils/stat.hpp"
#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE
/// REPLICATION ///
#include "storage/v2/replication/replication_client.hpp"
#include "storage/v2/replication/replication_server.hpp"
#include "storage/v2/replication/rpc.hpp"
#endif
namespace storage {
@ -322,11 +321,9 @@ Storage::Storage(Config config)
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
timestamp_ = std::max(timestamp_, info->next_timestamp);
#if MG_ENTERPRISE
if (info->last_commit_timestamp) {
last_commit_timestamp_ = *info->last_commit_timestamp;
}
#endif
}
} else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit) {
@ -375,13 +372,11 @@ Storage::~Storage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
}
#ifdef MG_ENTERPRISE
{
// Clear replication data
replication_server_.reset();
replication_clients_.WithLock([&](auto &clients) { clients.clear(); });
}
#endif
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_ = std::nullopt;
@ -430,7 +425,6 @@ VertexAccessor Storage::Accessor::CreateVertex() {
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
}
#ifdef MG_ENTERPRISE
VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
// NOTE: When we update the next `vertex_id_` here we perform a RMW
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
@ -448,7 +442,6 @@ VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
delta->prev.Set(&*it);
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
}
#endif
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid, View view) {
auto acc = storage_->vertices_.access();
@ -592,7 +585,6 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
&storage_->constraints_, config_);
}
#ifdef MG_ENTERPRISE
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type,
storage::Gid gid) {
MG_ASSERT(from->transaction_ == to->transaction_,
@ -659,7 +651,6 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_,
&storage_->constraints_, config_);
}
#endif
Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
MG_ASSERT(edge->transaction_ == &transaction_,
@ -837,16 +828,11 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
// written before actually committing the transaction (before setting
// the commit timestamp) so that no other transaction can see the
// modifications before they are written to disk.
#ifdef MG_ENTERPRISE
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
storage_->AppendToWal(transaction_, commit_timestamp);
}
#else
storage_->AppendToWal(transaction_, commit_timestamp);
#endif
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
@ -856,14 +842,12 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
// of the commit timestamp
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
transaction_.commit_timestamp->store(commit_timestamp, std::memory_order_release);
#ifdef MG_ENTERPRISE
// Replica can only update the last commit timestamp with
// the commits received from main.
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
storage_->last_commit_timestamp_.store(commit_timestamp);
}
#endif
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
engine_guard.unlock();
@ -1076,9 +1060,7 @@ bool Storage::CreateIndex(LabelId label, const std::optional<uint64_t> desired_c
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1088,9 +1070,7 @@ bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optiona
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label, {property}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1100,9 +1080,7 @@ bool Storage::DropIndex(LabelId label, const std::optional<uint64_t> desired_com
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1114,9 +1092,7 @@ bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional<
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label, {property}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1133,9 +1109,7 @@ utils::BasicResult<ConstraintViolation, bool> Storage::CreateExistenceConstraint
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label, {property}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1146,9 +1120,7 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property,
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, {property}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return true;
}
@ -1162,9 +1134,7 @@ utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus> Stora
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label, properties, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return UniqueConstraints::CreationStatus::SUCCESS;
}
@ -1178,9 +1148,7 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label, properties, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
#ifdef MG_ENTERPRISE
last_commit_timestamp_ = commit_timestamp;
#endif
return UniqueConstraints::DeletionStatus::SUCCESS;
}
@ -1231,7 +1199,6 @@ Transaction Storage::CreateTransaction() {
{
std::lock_guard<utils::SpinLock> guard(engine_lock_);
transaction_id = transaction_id_++;
#ifdef MG_ENTERPRISE
// Replica should have only read queries and the write queries
// can come from main instance with any past timestamp.
// To preserve snapshot isolation we set the start timestamp
@ -1243,9 +1210,6 @@ Transaction Storage::CreateTransaction() {
} else {
start_timestamp = timestamp_++;
}
#else
start_timestamp = timestamp_++;
#endif
}
return {transaction_id, start_timestamp};
}
@ -1505,7 +1469,6 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire);
#ifdef MG_ENTERPRISE
if (replication_role_.load() == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
@ -1513,7 +1476,6 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
}
});
}
#endif
// Helper lambda that traverses the delta chain on order to find the first
// delta that should be processed and then appends all discovered deltas.
@ -1526,14 +1488,12 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
while (true) {
if (filter(delta->action)) {
wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
#ifdef MG_ENTERPRISE
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction(
[&](auto &stream) { stream.AppendDelta(*delta, parent, final_commit_timestamp); });
}
});
#endif
}
auto prev = delta->prev.Get();
if (prev.type != PreviousPtr::Type::DELTA) break;
@ -1667,21 +1627,18 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
FinalizeWalFile();
#ifdef MG_ENTERPRISE
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
client->FinalizeTransactionReplication();
}
});
#endif
}
void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) return;
wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp);
#ifdef MG_ENTERPRISE
{
if (replication_role_.load() == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
@ -1694,17 +1651,14 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId
});
}
}
#endif
FinalizeWalFile();
}
void Storage::CreateSnapshot() {
#ifdef MG_ENTERPRISE
if (replication_role_.load() != ReplicationRole::MAIN) {
spdlog::warn("Snapshots are disabled for replicas!");
return;
}
#endif
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
@ -1742,19 +1696,14 @@ bool Storage::UnlockPath() {
}
uint64_t Storage::CommitTimestamp(const std::optional<uint64_t> desired_commit_timestamp) {
#ifdef MG_ENTERPRISE
if (!desired_commit_timestamp) {
return timestamp_++;
} else {
timestamp_ = std::max(timestamp_, *desired_commit_timestamp + 1);
return *desired_commit_timestamp;
}
#else
return timestamp_++;
#endif
}
#ifdef MG_ENTERPRISE
bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (replication_role_ == ReplicationRole::REPLICA) {
@ -1862,6 +1811,5 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
return replica_info;
});
}
#endif
} // namespace storage

View File

@ -27,13 +27,12 @@
#include "utils/synchronized.hpp"
#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE
/// REPLICATION ///
#include "rpc/server.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#endif
namespace storage {
@ -169,9 +168,7 @@ struct StorageInfo {
uint64_t disk_usage;
};
#ifdef MG_ENTERPRISE
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
#endif
class Storage final {
public:
@ -303,13 +300,11 @@ class Storage final {
void Abort();
private:
#ifdef MG_ENTERPRISE
/// @throw std::bad_alloc
VertexAccessor CreateVertex(storage::Gid gid);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid);
#endif
Storage *storage_;
std::shared_lock<utils::RWLock> storage_guard_;
@ -389,7 +384,6 @@ class Storage final {
bool LockPath();
bool UnlockPath();
#if MG_ENTERPRISE
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config = {});
bool SetMainReplicationRole();
@ -417,7 +411,6 @@ class Storage final {
};
std::vector<ReplicaInfo> ReplicasInfo();
#endif
private:
Transaction CreateTransaction();
@ -437,9 +430,6 @@ class Storage final {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
#ifdef MG_ENTERPRISE
#endif
// Main storage lock.
//
// Accessors take a shared lock when starting, so it is possible to block
@ -535,8 +525,6 @@ class Storage final {
// Global locker that is used for clients file locking
utils::FileRetainer::FileLocker global_locker_;
// Replication
#ifdef MG_ENTERPRISE
// Last commited timestamp
std::atomic<uint64_t> last_commit_timestamp_{kTimestampInitialId};
@ -558,7 +546,6 @@ class Storage final {
ReplicationClientList replication_clients_;
std::atomic<ReplicationRole> replication_role_{ReplicationRole::MAIN};
#endif
};
} // namespace storage

View File

@ -264,10 +264,8 @@ target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt)
add_unit_test(storage_v2_wal_file.cpp)
target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt)
if (MG_ENTERPRISE)
add_unit_test(storage_v2_replication.cpp)
target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
endif()
# Test mg-auth