Durability improvements (#1385)

This commit is contained in:
Andi 2023-11-07 11:37:54 +01:00 committed by GitHub
parent f4b97fc03d
commit 66487a6dce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 354 additions and 334 deletions

View File

@ -36,12 +36,11 @@ inline std::unique_ptr<storage::Storage> CreateInMemoryStorage(
// Connect replication state and storage
storage->CreateSnapshotHandler(
[storage = storage.get(),
&repl_state](bool is_periodic) -> utils::BasicResult<storage::InMemoryStorage::CreateSnapshotError> {
[storage = storage.get(), &repl_state]() -> utils::BasicResult<storage::InMemoryStorage::CreateSnapshotError> {
if (repl_state.IsReplica()) {
return storage::InMemoryStorage::CreateSnapshotError::DisabledForReplica;
}
return storage->CreateSnapshot(is_periodic);
return storage->CreateSnapshot();
});
if (allow_mt_repl || name == dbms::kDefaultDB) {

View File

@ -2750,7 +2750,8 @@ PreparedQuery PrepareStorageModeQuery(ParsedQuery parsed_query, const bool in_ex
"transactions using 'SHOW TRANSACTIONS' query and ensure no other transactions are active.");
}
callback = [requested_mode, storage = db_acc->storage()]() -> std::function<void()> {
callback = [requested_mode,
storage = static_cast<storage::InMemoryStorage *>(db_acc->storage())]() -> std::function<void()> {
// SetStorageMode will probably be handled at the Database level
return [storage, requested_mode] { storage->SetStorageMode(requested_mode); };
}();
@ -2813,15 +2814,11 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
std::move(parsed_query.required_privileges),
[storage](AnyStream * /*stream*/, std::optional<int> /*n*/) -> std::optional<QueryHandlerResult> {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(storage);
if (auto maybe_error = mem_storage->CreateSnapshot(false); maybe_error.HasError()) {
if (auto maybe_error = mem_storage->CreateSnapshot(); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::InMemoryStorage::CreateSnapshotError::DisabledForReplica:
throw utils::BasicException(
"Failed to create a snapshot. Replica instances are not allowed to create them.");
case storage::InMemoryStorage::CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/replication"));
break;
case storage::InMemoryStorage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;

File diff suppressed because it is too large Load Diff

View File

@ -129,7 +129,7 @@ InMemoryStorage::~InMemoryStorage() {
snapshot_runner_.Stop();
}
if (config_.durability.snapshot_on_exit && this->create_snapshot_handler) {
create_snapshot_handler(false);
create_snapshot_handler();
}
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
}
@ -1166,6 +1166,25 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S
return {transaction_id, start_timestamp, isolation_level, storage_mode, false};
}
void InMemoryStorage::SetStorageMode(StorageMode new_storage_mode) {
std::unique_lock main_guard{main_lock_};
MG_ASSERT(
(storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL || storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL) &&
(new_storage_mode == StorageMode::IN_MEMORY_ANALYTICAL ||
new_storage_mode == StorageMode::IN_MEMORY_TRANSACTIONAL));
if (storage_mode_ != new_storage_mode) {
if (new_storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
snapshot_runner_.Stop();
} else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval,
[this]() { this->create_snapshot_handler(); });
}
storage_mode_ = new_storage_mode;
FreeMemory(std::move(main_guard));
}
}
template <bool force>
void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard) {
// NOTE: You do not need to consider cleanup of deleted object that occurred in
@ -1854,7 +1873,7 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
return AppendToWalDataDefinition(operation, label, {}, {}, final_commit_timestamp);
}
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(bool is_periodic) {
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot() {
auto const &epoch = repl_storage_state_.epoch_;
auto snapshot_creator = [this, &epoch]() {
utils::Timer timer;
@ -1882,9 +1901,6 @@ utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::Create
} else {
std::unique_lock main_guard{main_lock_};
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
if (is_periodic) {
return CreateSnapshotError::DisabledForAnalyticsPeriodicCommit;
}
snapshot_creator();
return {};
}
@ -1969,17 +1985,13 @@ std::unique_ptr<Storage::Accessor> InMemoryStorage::UniqueAccess(std::optional<I
}
void InMemoryStorage::CreateSnapshotHandler(
std::function<utils::BasicResult<InMemoryStorage::CreateSnapshotError>(bool)> cb) {
create_snapshot_handler = [cb](bool is_periodic) {
if (auto maybe_error = cb(is_periodic); maybe_error.HasError()) {
std::function<utils::BasicResult<InMemoryStorage::CreateSnapshotError>()> cb) {
create_snapshot_handler = [cb]() {
if (auto maybe_error = cb(); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica:
spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
break;
case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/durability"));
break;
case CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
@ -1990,7 +2002,7 @@ void InMemoryStorage::CreateSnapshotHandler(
// Run the snapshot thread (if enabled)
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval,
[this]() { this->create_snapshot_handler(true); });
[this]() { this->create_snapshot_handler(); });
}
}
IndicesInfo InMemoryStorage::InMemoryAccessor::ListAllIndices() const {

View File

@ -47,11 +47,7 @@ class InMemoryStorage final : public Storage {
friend class InMemoryReplicationClient;
public:
enum class CreateSnapshotError : uint8_t {
DisabledForReplica,
DisabledForAnalyticsPeriodicCommit,
ReachedMaxNumTries
};
enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries };
/// @throw std::system_error
/// @throw std::bad_alloc
@ -329,9 +325,9 @@ class InMemoryStorage final : public Storage {
utils::FileRetainer::FileLockerAccessor::ret_type LockPath();
utils::FileRetainer::FileLockerAccessor::ret_type UnlockPath();
utils::BasicResult<InMemoryStorage::CreateSnapshotError> CreateSnapshot(bool is_periodic = false);
utils::BasicResult<InMemoryStorage::CreateSnapshotError> CreateSnapshot();
void CreateSnapshotHandler(std::function<utils::BasicResult<InMemoryStorage::CreateSnapshotError>(bool)> cb);
void CreateSnapshotHandler(std::function<utils::BasicResult<InMemoryStorage::CreateSnapshotError>()> cb);
using Storage::CreateTransaction;
Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode, bool is_main) override;
@ -340,6 +336,8 @@ class InMemoryStorage final : public Storage {
const memgraph::replication::ReplicationEpoch *current_epoch)
-> std::unique_ptr<ReplicationClient> override;
void SetStorageMode(StorageMode storage_mode);
private:
/// The force parameter determines the behaviour of the garbage collector.
/// If it's set to true, it will behave as a global operation, i.e. it can't
@ -454,7 +452,7 @@ class InMemoryStorage final : public Storage {
std::atomic<bool> gc_full_scan_edges_delete_ = false;
// Moved the create snapshot to a user defined handler so we can remove the global replication state from the storage
std::function<void(bool)> create_snapshot_handler{};
std::function<void()> create_snapshot_handler{};
};
} // namespace memgraph::storage

View File

@ -85,18 +85,6 @@ Storage::Accessor::Accessor(Accessor &&other) noexcept
other.commit_timestamp_.reset();
}
/// Main lock is taken by the caller.
void Storage::SetStorageMode(StorageMode storage_mode) {
std::unique_lock main_guard{main_lock_};
MG_ASSERT(
(storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL || storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL) &&
(storage_mode == StorageMode::IN_MEMORY_ANALYTICAL || storage_mode == StorageMode::IN_MEMORY_TRANSACTIONAL));
if (storage_mode_ != storage_mode) {
storage_mode_ = storage_mode;
FreeMemory(std::move(main_guard));
}
}
StorageMode Storage::GetStorageMode() const { return storage_mode_; }
IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; }

View File

@ -301,8 +301,6 @@ class Storage {
return EdgeTypeId::FromUint(name_id_mapper_->NameToId(name));
}
void SetStorageMode(StorageMode storage_mode);
StorageMode GetStorageMode() const;
virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) = 0;

View File

@ -519,7 +519,7 @@ void OutputFile::FlushBufferInternal() {
auto *buffer = buffer_;
auto buffer_position = buffer_position_.load();
while (buffer_position > 0) {
auto written = write(fd_, buffer, buffer_position_);
auto written = write(fd_, buffer, buffer_position);
if (written == -1 && errno == EINTR) {
continue;
}

View File

@ -43,7 +43,7 @@ TEST_P(StorageModeTest, Mode) {
std::make_unique<memgraph::storage::InMemoryStorage>(memgraph::storage::Config{
.transaction{.isolation_level = memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION}});
storage->SetStorageMode(storage_mode);
static_cast<memgraph::storage::InMemoryStorage *>(storage.get())->SetStorageMode(storage_mode);
auto creator = storage->Access();
auto other_analytics_mode_reader = storage->Access();