diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index afbcaa7d8..9a00ca87c 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -100,8 +100,7 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
         .health = "unknown"};  // TODO: (andi) Get this info from RAFT and test it or when we will move
                                // CoordinatorState to every instance, we can be smarter about this using our RPC.
   };
-
-  auto instances_status = utils::fmap(coord_instance_to_status, raft_state_.GetAllCoordinators());
+  auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status);
 
   if (raft_state_.IsLeader()) {
     auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp
index 8339b65b4..6a78977bb 100644
--- a/src/dbms/inmemory/replication_handlers.cpp
+++ b/src/dbms/inmemory/replication_handlers.cpp
@@ -118,9 +118,14 @@ void InMemoryReplicationHandlers::Register(dbms::DbmsHandler *dbms_handler, repl
       });
   server.rpc_server_.Register<replication_coordination_glue::SwapMainUUIDRpc>(
       [&data, dbms_handler](auto *req_reader, auto *res_builder) {
-        spdlog::debug("Received SwapMainUUIDHandler");
+        spdlog::debug("Received SwapMainUUIDRpc");
         InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms_handler, data, req_reader, res_builder);
       });
+  server.rpc_server_.Register<storage::replication::ForceResetStorageRpc>(
+      [&data, dbms_handler](auto *req_reader, auto *res_builder) {
+        spdlog::debug("Received ForceResetStorageRpc");
+        InMemoryReplicationHandlers::ForceResetStorageHandler(dbms_handler, data.uuid_, req_reader, res_builder);
+      });
 }
 
 void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_handler,
@@ -329,6 +334,78 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
   spdlog::debug("Replication recovery from snapshot finished!");
 }
 
+void InMemoryReplicationHandlers::ForceResetStorageHandler(dbms::DbmsHandler *dbms_handler,
+                                                           const std::optional<utils::UUID> &current_main_uuid,
+                                                           slk::Reader *req_reader, slk::Builder *res_builder) {
+  storage::replication::ForceResetStorageReq req;
+  slk::Load(&req, req_reader);
+  auto db_acc = GetDatabaseAccessor(dbms_handler, req.db_uuid);
+  if (!db_acc) {
+    storage::replication::ForceResetStorageRes res{false, 0};
+    slk::Save(res, res_builder);
+    return;
+  }
+  if (!current_main_uuid.has_value() || req.main_uuid != current_main_uuid) [[unlikely]] {
+    LogWrongMain(current_main_uuid, req.main_uuid, storage::replication::ForceResetStorageReq::kType.name);
+    storage::replication::ForceResetStorageRes res{false, 0};
+    slk::Save(res, res_builder);
+    return;
+  }
+
+  storage::replication::Decoder decoder(req_reader);
+
+  auto *storage = static_cast<storage::InMemoryStorage *>(db_acc->get()->storage());
+
+  auto storage_guard = std::unique_lock{storage->main_lock_};
+
+  // Clear the database
+  storage->vertices_.clear();
+  storage->edges_.clear();
+  storage->commit_log_.reset();
+  storage->commit_log_.emplace();
+
+  storage->constraints_.existence_constraints_ = std::make_unique<ExistenceConstraints>();
+  storage->constraints_.unique_constraints_ = std::make_unique<InMemoryUniqueConstraints>();
+  storage->indices_.label_index_ = std::make_unique<InMemoryLabelIndex>();
+  storage->indices_.label_property_index_ = std::make_unique<InMemoryLabelPropertyIndex>();
+
+  // A fresh random epoch with timestamp 0 is fine here: the first WAL push from MAIN force-overwrites it, which is
+  // exactly the state this instance would be in had it never acted as MAIN.
+  storage->repl_storage_state_.epoch_.SetEpoch(std::string(utils::UUID{}));
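+  // Resetting the commit timestamp to 0 should make this replica look fully behind on MAIN's next frequent check,
+  // so the regular recovery path (snapshot + WAL transfer) can repopulate it from scratch.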
+  storage->repl_storage_state_.last_commit_timestamp_ = 0;
+
+  storage->repl_storage_state_.history.clear();
+  storage->vertex_id_ = 0;
+  storage->edge_id_ = 0;
+  storage->timestamp_ = storage::kTimestampInitialId;
+
+  storage->CollectGarbage<true>(std::move(storage_guard), false);
+  storage->vertices_.run_gc();
+  storage->edges_.run_gc();
+
+  storage::replication::ForceResetStorageRes res{true, storage->repl_storage_state_.last_commit_timestamp_.load()};
+  slk::Save(res, res_builder);
+
+  spdlog::trace("Deleting old snapshot files.");
+  // Delete the old durability files
+  auto snapshot_files = storage::durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_);
+  for (const auto &[path, uuid, _] : snapshot_files) {
+    spdlog::trace("Deleting snapshot file {}", path);
+    storage->file_retainer_.DeleteFile(path);
+  }
+
+  spdlog::trace("Deleting old WAL files.");
+  auto wal_files = storage::durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_);
+  if (wal_files) {
+    for (const auto &wal_file : *wal_files) {
+      spdlog::trace("Deleting WAL file {}", wal_file.path);
+      storage->file_retainer_.DeleteFile(wal_file.path);
+    }
+
+    storage->wal_file_.reset();
+  }
+}
+
 void InMemoryReplicationHandlers::WalFilesHandler(dbms::DbmsHandler *dbms_handler,
                                                   const std::optional<utils::UUID> &current_main_uuid,
                                                   slk::Reader *req_reader, slk::Builder *res_builder) {
diff --git a/src/dbms/inmemory/replication_handlers.hpp b/src/dbms/inmemory/replication_handlers.hpp
index 4406b8338..aaa2d0755 100644
--- a/src/dbms/inmemory/replication_handlers.hpp
+++ b/src/dbms/inmemory/replication_handlers.hpp
@@ -48,6 +48,9 @@ class InMemoryReplicationHandlers {
   static void SwapMainUUIDHandler(dbms::DbmsHandler *dbms_handler, replication::RoleReplicaData &role_replica_data,
                                   slk::Reader *req_reader, slk::Builder *res_builder);
+  static void ForceResetStorageHandler(dbms::DbmsHandler *dbms_handler,
+                                       const std::optional<utils::UUID> &current_main_uuid, slk::Reader *req_reader,
+                                       slk::Builder *res_builder);
 
   static void LoadWal(storage::InMemoryStorage *storage, storage::replication::Decoder *decoder);
 
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index f7213bed1..e6d39ab9a 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -1263,7 +1263,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
                 TypedValue{status.coord_socket_address}, TypedValue{status.health},
                 TypedValue{status.cluster_role}};
       };
-      return utils::fmap(converter, instances);
+      return utils::fmap(instances, converter);
     };
     return callback;
   }
diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp
index d5c2bfa71..e1da19bfa 100644
--- a/src/replication_handler/include/replication_handler/replication_handler.hpp
+++ b/src/replication_handler/include/replication_handler/replication_handler.hpp
@@ -210,8 +210,13 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
     auto client = std::make_unique<storage::ReplicationStorageClient>(*instance_client_ptr, main_uuid);
     client->Start(storage, std::move(db_acc));
     bool const success = std::invoke([state = client->State()]() {
+      // In community builds a diverged replica cannot be registered; under the HA coordinator it is recoverable,
+      // because MAIN will forcefully reset its storage.
       if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) {
+#ifdef MG_ENTERPRISE
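+        // A non-zero coordinator port is effectively a proxy for "this MAIN runs under the HA coordinator", in
+        // which case the diverged replica will be force reset rather than rejected.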
+        return FLAGS_coordinator_server_port != 0;
+#else
         return false;
+#endif
       }
       return true;
     });
diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp
index fc3dd3da4..34ccdfc99 100644
--- a/src/replication_handler/replication_handler.cpp
+++ b/src/replication_handler/replication_handler.cpp
@@ -271,8 +271,7 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glu
   dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
     auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
 
-    std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(
-        [](const auto &elem) { return std::make_pair(elem.first, elem.second); }, repl_storage_state.history);
+    std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(repl_storage_state.history);
 
     history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
     replication_coordination_glue::DatabaseHistory repl{
diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp
index b81357902..a83313820 100644
--- a/src/storage/v2/durability/durability.cpp
+++ b/src/storage/v2/durability/durability.cpp
@@ -358,7 +358,6 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
     spdlog::warn(utils::MessageWithLink("No snapshot or WAL file found.", "https://memgr.ph/durability"));
     return std::nullopt;
   }
-  // TODO(antoniofilipovic) What is the logic here?
   std::sort(wal_files.begin(), wal_files.end());
   // UUID used for durability is the UUID of the last WAL file.
   // Same for the epoch id.
@@ -437,17 +436,13 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
       last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
     }
 
-    bool epoch_history_empty = epoch_history->empty();
-    bool epoch_not_recorded = !epoch_history_empty && epoch_history->back().first != wal_file.epoch_id;
     auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
-
-    if (epoch_history_empty || epoch_not_recorded) {
-      epoch_history->emplace_back(std::string(wal_file.epoch_id), last_loaded_timestamp_value);
-    }
-
-    auto last_epoch_updated = !epoch_history_empty && epoch_history->back().first == wal_file.epoch_id &&
-                              epoch_history->back().second < last_loaded_timestamp_value;
-    if (last_epoch_updated) {
+    if (epoch_history->empty() || epoch_history->back().first != wal_file.epoch_id) {
+      // no history or new epoch, add it
+      epoch_history->emplace_back(wal_file.epoch_id, last_loaded_timestamp_value);
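+      // Keeping epoch_ in step with the newest history entry means recovery should finish with the storage
+      // reporting the epoch of the last WAL file rather than a stale one.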
+      repl_storage_state.epoch_.SetEpoch(wal_file.epoch_id);
+    } else if (epoch_history->back().second < last_loaded_timestamp_value) {
+      // existing epoch, update with newer timestamp
       epoch_history->back().second = last_loaded_timestamp_value;
     }
 
@@ -469,11 +464,11 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
   memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
                              std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
 
-  spdlog::info("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
-               repl_storage_state.last_commit_timestamp_);
+  spdlog::trace("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
+                repl_storage_state.last_commit_timestamp_);
 
   std::for_each(repl_storage_state.history.begin(), repl_storage_state.history.end(), [](auto &history) {
-    spdlog::info("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
+    spdlog::trace("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
   });
   return recovery_info;
 }
diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp
index 1437524d6..3a4fa9b91 100644
--- a/src/storage/v2/inmemory/storage.cpp
+++ b/src/storage/v2/inmemory/storage.cpp
@@ -109,7 +109,7 @@ InMemoryStorage::InMemoryStorage(Config config)
       timestamp_ = std::max(timestamp_, info->next_timestamp);
       if (info->last_commit_timestamp) {
         repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
-        spdlog::info("Recovering last commit timestamp {}", *info->last_commit_timestamp);
+        spdlog::trace("Recovering last commit timestamp {}", *info->last_commit_timestamp);
       }
     }
   } else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index bd2c2cc7d..fb332672a 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -76,13 +76,37 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
     }
   }
   if (branching_point) {
-    spdlog::error(
-        "You cannot register Replica {} to this Main because at one point "
-        "Replica {} acted as the Main instance. Both the Main and Replica {} "
-        "now hold unique data. Please resolve data conflicts and start the "
-        "replication on a clean instance.",
-        client_.name_, client_.name_, client_.name_);
-    replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::DIVERGED_FROM_MAIN; });
+    auto replica_state = replica_state_.Lock();
+    if (*replica_state == replication::ReplicaState::DIVERGED_FROM_MAIN) {
+      return;
+    }
+    *replica_state = replication::ReplicaState::DIVERGED_FROM_MAIN;
+
+    auto log_error = [client_name = client_.name_]() {
+      spdlog::error(
+          "You cannot register Replica {} to this Main because at one point "
+          "Replica {} acted as the Main instance. Both the Main and Replica {} "
+          "now hold unique data. Please resolve data conflicts and start the "
+          "replication on a clean instance.",
+          client_name, client_name, client_name);
+    };
+#ifdef MG_ENTERPRISE
+    if (!FLAGS_coordinator_server_port) {
+      log_error();
+      return;
+    }
+    client_.thread_pool_.AddTask([storage, gk = std::move(db_acc), this] {
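+      // The reset runs on the client's thread pool so the blocking RPC round trip stays off the thread that
+      // detected the branching point; gk (the DatabaseAccessProtector) keeps the database alive until the task
+      // finishes.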
+      const auto [success, timestamp] = this->ForceResetStorage(storage);
+      if (success) {
+        spdlog::info("Successfully reset storage of REPLICA {} to timestamp {}.", client_.name_, timestamp);
+        return;
+      }
+      spdlog::error("You cannot register REPLICA {} to this MAIN because MAIN couldn't reset REPLICA's storage.",
+                    client_.name_);
+    });
+#else
+    log_error();
+#endif
     return;
   }
 
@@ -265,8 +289,6 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
   spdlog::debug("Starting replica recovery");
   auto *mem_storage = static_cast<InMemoryStorage *>(storage);
 
-  // TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
-  // only snapshot
   while (true) {
     auto file_locker = mem_storage->file_retainer_.AddLocker();
 
@@ -335,6 +357,21 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
     }
   }
 }
 
+std::pair<bool, uint64_t> ReplicationStorageClient::ForceResetStorage(memgraph::storage::Storage *storage) {
+  utils::OnScopeExit set_to_maybe_behind{
+      [this]() { replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::MAYBE_BEHIND; }); }};
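+  // Whatever the RPC outcome, end in MAYBE_BEHIND so the next frequent check re-evaluates this replica instead of
+  // leaving it parked in DIVERGED_FROM_MAIN.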
+  try {
+    auto stream{client_.rpc_client_.Stream<replication::ForceResetStorageRpc>(main_uuid_, storage->uuid())};
+    const auto res = stream.AwaitResponse();
+    return std::pair{res.success, res.current_commit_timestamp};
+  } catch (const rpc::RpcFailedException &) {
+    spdlog::error(
+        utils::MessageWithLink("Couldn't ForceReset data to {}.", client_.name_, "https://memgr.ph/replication"));
+  }
+
+  return {false, 0};
+}
+
 ////// ReplicaStream //////
 ReplicaStream::ReplicaStream(Storage *storage, rpc::Client &rpc_client, const uint64_t current_seq_num,
                              utils::UUID main_uuid)
diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp
index 3352bab65..063501111 100644
--- a/src/storage/v2/replication/replication_client.hpp
+++ b/src/storage/v2/replication/replication_client.hpp
@@ -188,6 +188,13 @@ class ReplicationStorageClient {
    */
   void UpdateReplicaState(Storage *storage, DatabaseAccessProtector db_acc);
 
+  /**
+   * @brief Forcefully resets the replica's storage to the state of a freshly started, empty instance.
+   *
+   * @param storage pointer to the storage associated with the client
+   */
+  std::pair<bool, uint64_t> ForceResetStorage(Storage *storage);
+
   void LogRpcFailure();
 
   /**
diff --git a/src/storage/v2/replication/rpc.cpp b/src/storage/v2/replication/rpc.cpp
index f523bb5d7..71a9ca65c 100644
--- a/src/storage/v2/replication/rpc.cpp
+++ b/src/storage/v2/replication/rpc.cpp
@@ -59,6 +59,19 @@ void TimestampRes::Save(const TimestampRes &self, memgraph::slk::Builder *builde
   memgraph::slk::Save(self, builder);
 }
 void TimestampRes::Load(TimestampRes *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(self, reader); }
+
+void ForceResetStorageReq::Save(const ForceResetStorageReq &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+void ForceResetStorageReq::Load(ForceResetStorageReq *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
+void ForceResetStorageRes::Save(const ForceResetStorageRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+void ForceResetStorageRes::Load(ForceResetStorageRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
 }  // namespace storage::replication
 
 constexpr utils::TypeInfo storage::replication::AppendDeltasReq::kType{utils::TypeId::REP_APPEND_DELTAS_REQ,
@@ -97,6 +110,12 @@ constexpr utils::TypeInfo storage::replication::TimestampReq::kType{utils::TypeI
 constexpr utils::TypeInfo storage::replication::TimestampRes::kType{utils::TypeId::REP_TIMESTAMP_RES, "TimestampRes",
                                                                     nullptr};
 
+constexpr utils::TypeInfo storage::replication::ForceResetStorageReq::kType{utils::TypeId::REP_FORCE_RESET_STORAGE_REQ,
+                                                                            "ForceResetStorageReq", nullptr};
+
+constexpr utils::TypeInfo storage::replication::ForceResetStorageRes::kType{utils::TypeId::REP_FORCE_RESET_STORAGE_RES,
+                                                                            "ForceResetStorageRes", nullptr};
+
 // Autogenerated SLK serialization code
 
 namespace slk {
 // Serialize code for TimestampRes
@@ -255,6 +274,30 @@ void Load(memgraph::storage::replication::AppendDeltasReq *self, memgraph::slk::
   memgraph::slk::Load(&self->seq_num, reader);
 }
 
+// Serialize code for ForceResetStorageReq
+
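+// NOTE: SLK encoding is positional, so the field order in Save and Load below (main_uuid, then db_uuid) must stay
+// in sync on both sides of the RPC.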
+void Save(const memgraph::storage::replication::ForceResetStorageReq &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self.main_uuid, builder);
+  memgraph::slk::Save(self.db_uuid, builder);
+}
+
+void Load(memgraph::storage::replication::ForceResetStorageReq *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(&self->main_uuid, reader);
+  memgraph::slk::Load(&self->db_uuid, reader);
+}
+
+// Serialize code for ForceResetStorageRes
+
+void Save(const memgraph::storage::replication::ForceResetStorageRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self.success, builder);
+  memgraph::slk::Save(self.current_commit_timestamp, builder);
+}
+
+void Load(memgraph::storage::replication::ForceResetStorageRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(&self->success, reader);
+  memgraph::slk::Load(&self->current_commit_timestamp, reader);
+}
+
 // Serialize SalientConfig
 void Save(const memgraph::storage::SalientConfig &self, memgraph::slk::Builder *builder) {
diff --git a/src/storage/v2/replication/rpc.hpp b/src/storage/v2/replication/rpc.hpp
index 67f98d0ae..fb19d82f2 100644
--- a/src/storage/v2/replication/rpc.hpp
+++ b/src/storage/v2/replication/rpc.hpp
@@ -210,6 +210,36 @@ struct TimestampRes {
 
 using TimestampRpc = rpc::RequestResponse<TimestampReq, TimestampRes>;
 
+struct ForceResetStorageReq {
+  static const utils::TypeInfo kType;
+  static const utils::TypeInfo &GetTypeInfo() { return kType; }
+
+  static void Load(ForceResetStorageReq *self, memgraph::slk::Reader *reader);
+  static void Save(const ForceResetStorageReq &self, memgraph::slk::Builder *builder);
+  ForceResetStorageReq() = default;
+  explicit ForceResetStorageReq(const utils::UUID &main_uuid, const utils::UUID &db_uuid)
+      : main_uuid{main_uuid}, db_uuid{db_uuid} {}
+
+  utils::UUID main_uuid;
+  utils::UUID db_uuid;
+};
+
+struct ForceResetStorageRes {
+  static const utils::TypeInfo kType;
+  static const utils::TypeInfo &GetTypeInfo() { return kType; }
+
+  static void Load(ForceResetStorageRes *self, memgraph::slk::Reader *reader);
+  static void Save(const ForceResetStorageRes &self, memgraph::slk::Builder *builder);
+  ForceResetStorageRes() = default;
+  ForceResetStorageRes(bool success, uint64_t current_commit_timestamp)
+      : success(success), current_commit_timestamp(current_commit_timestamp) {}
+
+  bool success;
+  uint64_t current_commit_timestamp;
+};
+
+using ForceResetStorageRpc = rpc::RequestResponse<ForceResetStorageReq, ForceResetStorageRes>;
+
 }  // namespace memgraph::storage::replication
 
 // SLK serialization declarations
@@ -267,4 +297,12 @@ void Save(const memgraph::storage::SalientConfig &self, memgraph::slk::Builder *
 
 void Load(memgraph::storage::SalientConfig *self, memgraph::slk::Reader *reader);
 
+void Save(const memgraph::storage::replication::ForceResetStorageReq &self, memgraph::slk::Builder *builder);
+
+void Load(memgraph::storage::replication::ForceResetStorageReq *self, memgraph::slk::Reader *reader);
+
+void Save(const memgraph::storage::replication::ForceResetStorageRes &self, memgraph::slk::Builder *builder);
+
+void Load(memgraph::storage::replication::ForceResetStorageRes *self, memgraph::slk::Reader *reader);
+
 }  // namespace memgraph::slk
diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp
index f5242944a..fe60edc5c 100644
--- a/src/utils/functional.hpp
+++ b/src/utils/functional.hpp
@@ -19,9 +19,9 @@ namespace memgraph::utils {
 
 template