Remove TX info from HA snapshot
Reviewers: msantl, ipaljak Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1984
This commit is contained in:
parent
872b043bed
commit
6182312e3d
@ -1,12 +1,5 @@
|
||||
#include "durability/single_node_ha/paths.hpp"
|
||||
|
||||
#include <filesystem>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/timestamp.hpp"
|
||||
|
||||
@ -14,36 +7,20 @@ namespace durability {
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
std::string GetSnapshotFilename(tx::TransactionId tx_id) {
|
||||
std::string date_str =
|
||||
utils::Timestamp(utils::Timestamp::Now())
|
||||
.ToString("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
|
||||
return date_str + "_tx_" + std::to_string(tx_id);
|
||||
// This is the prefix used for WAL and Snapshot filenames. It is a timestamp
|
||||
// format that equals to: YYYYmmddHHMMSSffffff
|
||||
const std::string kTimestampFormat =
|
||||
"{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}{:06d}";
|
||||
|
||||
std::string GetSnapshotFilename(uint64_t last_included_term,
|
||||
uint64_t last_included_index) {
|
||||
std::string date_str = utils::Timestamp::Now().ToString(kTimestampFormat);
|
||||
return date_str + "_term_" + std::to_string(last_included_term) + "_index_" +
|
||||
std::to_string(last_included_index);
|
||||
}
|
||||
|
||||
fs::path MakeSnapshotPath(const fs::path &durability_dir,
|
||||
const std::string &snapshot_filename) {
|
||||
return durability_dir / kSnapshotDir / snapshot_filename;
|
||||
}
|
||||
|
||||
std::optional<tx::TransactionId> TransactionIdFromSnapshotFilename(
|
||||
const std::string &name) {
|
||||
auto nullopt = std::nullopt;
|
||||
auto file_name_split = utils::RSplit(name, "_tx_", 1);
|
||||
if (file_name_split.size() != 2) {
|
||||
LOG(WARNING) << "Unable to parse snapshot file name: " << name;
|
||||
return nullopt;
|
||||
}
|
||||
try {
|
||||
return std::stoll(file_name_split[1]);
|
||||
} catch (std::invalid_argument &) {
|
||||
LOG(WARNING) << "Unable to parse snapshot file name tx ID: "
|
||||
<< file_name_split[1];
|
||||
return nullopt;
|
||||
} catch (std::out_of_range &) {
|
||||
LOG(WARNING) << "Unable to parse snapshot file name tx ID: "
|
||||
<< file_name_split[1];
|
||||
return nullopt;
|
||||
}
|
||||
}
|
||||
} // namespace durability
|
||||
|
@ -1,26 +1,20 @@
|
||||
#pragma once
|
||||
|
||||
#include <filesystem>
|
||||
#include <optional>
|
||||
|
||||
#include "transactions/type.hpp"
|
||||
#include <string>
|
||||
|
||||
namespace durability {
|
||||
const std::string kSnapshotDir = "snapshots";
|
||||
const std::string kBackupDir = ".backup";
|
||||
|
||||
/// Generates a filename for a DB snapshot in the given folder in a well-defined
|
||||
/// sortable format with transaction from which the snapshot is created appended
|
||||
/// to the file name.
|
||||
std::string GetSnapshotFilename(tx::TransactionId tx_id);
|
||||
/// sortable format with last included term and last included index from which
|
||||
/// the snapshot is created appended to the file name.
|
||||
std::string GetSnapshotFilename(uint64_t last_included_term,
|
||||
uint64_t last_included_index);
|
||||
|
||||
/// Generates a full path for a DB snapshot.
|
||||
std::filesystem::path MakeSnapshotPath(
|
||||
const std::filesystem::path &durability_dir,
|
||||
const std::string &snapshot_filename);
|
||||
|
||||
/// Returns the transaction id contained in the file name. If the filename is
|
||||
/// not a parseable snapshot file name, nullopt is returned.
|
||||
std::optional<tx::TransactionId> TransactionIdFromSnapshotFilename(
|
||||
const std::string &name);
|
||||
} // namespace durability
|
||||
|
@ -64,15 +64,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
|
||||
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
|
||||
dv.ValueInt() == durability::kVersion);
|
||||
|
||||
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int));
|
||||
recovery_data->snapshooter_tx_id = dv.ValueInt();
|
||||
// Transaction snapshot of the transaction that created the snapshot.
|
||||
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
|
||||
for (const auto &value : dv.ValueList()) {
|
||||
RETURN_IF_NOT(value.IsInt());
|
||||
recovery_data->snapshooter_tx_snapshot.emplace_back(value.ValueInt());
|
||||
}
|
||||
|
||||
// A list of label+property indexes.
|
||||
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
|
||||
auto index_value = dv.ValueList();
|
||||
@ -131,16 +122,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
|
||||
return false;
|
||||
}
|
||||
|
||||
// Ensure that the next transaction ID in the recovered DB will be greater
|
||||
// than the latest one we have recovered. Do this to make sure that
|
||||
// subsequently created snapshots files will have transactional info
|
||||
// that does not interfere with that found in previous snapshots.
|
||||
tx::TransactionId max_id = recovery_data->snapshooter_tx_id;
|
||||
auto &snap = recovery_data->snapshooter_tx_snapshot;
|
||||
if (!snap.empty()) {
|
||||
max_id = std::max(max_id, *std::max_element(snap.begin(), snap.end()));
|
||||
}
|
||||
dba.db()->tx_engine().EnsureNextIdGreater(max_id);
|
||||
dba.Commit();
|
||||
return true;
|
||||
}
|
||||
|
@ -25,17 +25,9 @@ struct IndexRecoveryData {
|
||||
/// Data structure for exchanging info between main recovery function and
|
||||
/// snapshot recovery functions.
|
||||
struct RecoveryData {
|
||||
tx::TransactionId snapshooter_tx_id{0};
|
||||
std::vector<tx::TransactionId> snapshooter_tx_snapshot;
|
||||
// A collection into which the indexes should be added so they
|
||||
// can be rebuilt at the end of the recovery transaction.
|
||||
std::vector<IndexRecoveryData> indexes;
|
||||
|
||||
void Clear() {
|
||||
snapshooter_tx_id = 0;
|
||||
snapshooter_tx_snapshot.clear();
|
||||
indexes.clear();
|
||||
}
|
||||
};
|
||||
|
||||
/// Reads snapshot metadata from the end of the file without messing up the
|
||||
|
@ -17,7 +17,7 @@ namespace fs = std::filesystem;
|
||||
namespace durability {
|
||||
|
||||
// Snapshot layout is described in durability/version.hpp
|
||||
static_assert(durability::kVersion == 7,
|
||||
static_assert(durability::kVersion == 8,
|
||||
"Wrong snapshot version, please update!");
|
||||
|
||||
namespace {
|
||||
@ -32,18 +32,6 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
durability::kSnapshotMagic.size());
|
||||
encoder.WriteInt(durability::kVersion);
|
||||
|
||||
// Write the ID of the transaction doing the snapshot.
|
||||
encoder.WriteInt(dba.transaction_id());
|
||||
|
||||
// Write the transaction snapshot into the snapshot. It's used when
|
||||
// recovering from the combination of snapshot and write-ahead-log.
|
||||
{
|
||||
std::vector<communication::bolt::Value> tx_snapshot;
|
||||
for (int64_t tx : dba.transaction().snapshot())
|
||||
tx_snapshot.emplace_back(tx);
|
||||
encoder.WriteList(tx_snapshot);
|
||||
}
|
||||
|
||||
// Write label+property indexes as list ["label", "property", ...]
|
||||
{
|
||||
std::vector<communication::bolt::Value> index_vec;
|
||||
|
@ -11,30 +11,24 @@
|
||||
|
||||
namespace durability {
|
||||
|
||||
constexpr std::array<uint8_t, 4> kSnapshotMagic{{'M', 'G', 's', 'n'}};
|
||||
constexpr std::array<uint8_t, 4> kWalMagic{{'M', 'G', 'w', 'l'}};
|
||||
constexpr std::array<uint8_t, 6> kSnapshotMagic{{'M', 'G', 'H', 'A', 's', 'n'}};
|
||||
|
||||
// The current default version of snapshot and WAL encoding / decoding.
|
||||
constexpr int64_t kVersion{7};
|
||||
constexpr int64_t kVersion{8};
|
||||
|
||||
// Snapshot format (version 7):
|
||||
// Snapshot format (version 8):
|
||||
// 1) Magic number + snapshot version
|
||||
//
|
||||
// The following two entries are required when recovering from snapshot combined
|
||||
// with WAL to determine record visibility.
|
||||
// 2) Transactional ID of the snapshooter
|
||||
// 3) Transactional snapshot of the snapshooter
|
||||
// 2) A list of label+property indices.
|
||||
//
|
||||
// 4) A list of label+property indices.
|
||||
//
|
||||
// 5) Bolt encoded nodes. Each node is written in the following format:
|
||||
// 3) Bolt encoded nodes. Each node is written in the following format:
|
||||
// * gid, labels, properties
|
||||
// 6) Bolt encoded edges. Each edge is written in the following format:
|
||||
// 4) Bolt encoded edges. Each edge is written in the following format:
|
||||
// * gid
|
||||
// * from, to
|
||||
// * edge_type
|
||||
// * properties
|
||||
//
|
||||
// 7) Snapshot summary (number of nodes, number of edges, hash)
|
||||
// 5) Snapshot summary (number of nodes, number of edges, hash)
|
||||
|
||||
} // namespace durability
|
||||
|
@ -971,8 +971,8 @@ void RaftServer::SnapshotThread() {
|
||||
auto dba = db_->Access();
|
||||
last_included_term = GetLogEntry(last_applied_).term;
|
||||
last_included_index = last_applied_;
|
||||
snapshot_filename =
|
||||
durability::GetSnapshotFilename(dba.transaction_id());
|
||||
snapshot_filename = durability::GetSnapshotFilename(
|
||||
last_included_term, last_included_index);
|
||||
|
||||
lock.unlock();
|
||||
VLOG(40) << "[LogCompaction] Creating snapshot.";
|
||||
|
@ -190,11 +190,6 @@ Transaction *Engine::RunningTransaction(TransactionId tx_id) {
|
||||
return found->second.get();
|
||||
}
|
||||
|
||||
void Engine::EnsureNextIdGreater(TransactionId tx_id) {
|
||||
std::lock_guard<utils::SpinLock> guard(lock_);
|
||||
counter_ = std::max(tx_id, counter_);
|
||||
}
|
||||
|
||||
void Engine::Reset() {
|
||||
Snapshot wait_for_txs;
|
||||
{
|
||||
|
@ -49,7 +49,6 @@ class Engine final {
|
||||
TransactionId LocalOldestActive() const;
|
||||
void LocalForEachActiveTransaction(std::function<void(Transaction &)> f);
|
||||
Transaction *RunningTransaction(TransactionId tx_id);
|
||||
void EnsureNextIdGreater(TransactionId tx_id);
|
||||
void GarbageCollectCommitLog(TransactionId tx_id);
|
||||
|
||||
auto &local_lock_graph() { return local_lock_graph_; }
|
||||
|
Loading…
Reference in New Issue
Block a user