Implement snapshots for storage v2
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2400
This commit is contained in:
parent
3756534490
commit
3adedc0679
@ -1,6 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <filesystem>
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -17,6 +19,19 @@ struct Config {
|
||||
struct Items {
|
||||
bool properties_on_edges{true};
|
||||
} items;
|
||||
|
||||
struct Durability {
|
||||
enum class SnapshotType { NONE, PERIODIC };
|
||||
|
||||
std::filesystem::path storage_directory{"storage"};
|
||||
|
||||
bool recover_on_startup{false};
|
||||
|
||||
SnapshotType snapshot_type{SnapshotType::NONE};
|
||||
std::chrono::milliseconds snapshot_interval{std::chrono::minutes(2)};
|
||||
uint64_t snapshot_retention_count{3};
|
||||
bool snapshot_on_exit{false};
|
||||
} durability;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -2,15 +2,29 @@
|
||||
|
||||
#include <cstdint>
|
||||
#include <filesystem>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/constraints.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/indices.hpp"
|
||||
#include "storage/v2/name_id_mapper.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/scheduler.hpp"
|
||||
#include "utils/skip_list.hpp"
|
||||
|
||||
namespace storage::durability {
|
||||
namespace storage {
|
||||
|
||||
static const std::string kSnapshotDirectory{"snapshots"};
|
||||
static const std::string kWalDirectory{"wal"};
|
||||
|
||||
static_assert(std::is_same_v<uint8_t, unsigned char>);
|
||||
|
||||
@ -110,4 +124,74 @@ class Decoder final {
|
||||
utils::InputFile file_;
|
||||
};
|
||||
|
||||
} // namespace storage::durability
|
||||
/// Exception used to handle errors during recovery.
|
||||
class RecoveryFailure : public utils::BasicException {
|
||||
using utils::BasicException::BasicException;
|
||||
};
|
||||
|
||||
/// Structure used to hold information about a snapshot.
|
||||
struct SnapshotInfo {
|
||||
uint64_t offset_edges;
|
||||
uint64_t offset_vertices;
|
||||
uint64_t offset_indices;
|
||||
uint64_t offset_constraints;
|
||||
uint64_t offset_mapper;
|
||||
uint64_t offset_metadata;
|
||||
|
||||
std::string uuid;
|
||||
uint64_t start_timestamp;
|
||||
uint64_t edges_count;
|
||||
uint64_t vertices_count;
|
||||
};
|
||||
|
||||
/// Function used to read information about the snapshot file.
|
||||
/// @throw RecoveryFailure
|
||||
SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
|
||||
|
||||
/// Durability class that is used to provide full durability functionality to
|
||||
/// the storage.
|
||||
class Durability final {
|
||||
public:
|
||||
struct RecoveryInfo {
|
||||
uint64_t next_vertex_id;
|
||||
uint64_t next_edge_id;
|
||||
uint64_t next_timestamp;
|
||||
};
|
||||
|
||||
Durability(Config::Durability config, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
|
||||
Indices *indices, Constraints *constraints, Config::Items items);
|
||||
|
||||
std::optional<RecoveryInfo> Initialize(
|
||||
std::function<void(std::function<void(Transaction *)>)>
|
||||
execute_with_transaction);
|
||||
|
||||
void Finalize();
|
||||
|
||||
private:
|
||||
void CreateSnapshot(Transaction *transaction);
|
||||
|
||||
std::optional<RecoveryInfo> RecoverData();
|
||||
|
||||
RecoveryInfo LoadSnapshot(const std::filesystem::path &path);
|
||||
|
||||
Config::Durability config_;
|
||||
|
||||
utils::SkipList<Vertex> *vertices_;
|
||||
utils::SkipList<Edge> *edges_;
|
||||
NameIdMapper *name_id_mapper_;
|
||||
Indices *indices_;
|
||||
Constraints *constraints_;
|
||||
Config::Items items_;
|
||||
|
||||
std::function<void(std::function<void(Transaction *)>)>
|
||||
execute_with_transaction_;
|
||||
|
||||
std::filesystem::path snapshot_directory_;
|
||||
utils::Scheduler snapshot_runner_;
|
||||
|
||||
// UUID used to distinguish snapshots and to link snapshots to WALs
|
||||
std::string uuid_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
@ -14,7 +14,7 @@ struct Vertex;
|
||||
|
||||
struct Edge {
|
||||
Edge(Gid gid, Delta *delta) : gid(gid), deleted(false), delta(delta) {
|
||||
CHECK(delta->action == Delta::Action::DELETE_OBJECT)
|
||||
CHECK(delta == nullptr || delta->action == Delta::Action::DELETE_OBJECT)
|
||||
<< "Edge must be created with an initial DELETE_OBJECT delta!";
|
||||
}
|
||||
|
||||
|
@ -118,6 +118,8 @@ class LabelIndex {
|
||||
return it->second.size();
|
||||
}
|
||||
|
||||
void Clear() { index_.clear(); }
|
||||
|
||||
private:
|
||||
std::map<LabelId, utils::SkipList<Entry>> index_;
|
||||
Indices *indices_;
|
||||
@ -248,6 +250,8 @@ class LabelPropertyIndex {
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper) const;
|
||||
|
||||
void Clear() { index_.clear(); }
|
||||
|
||||
private:
|
||||
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>> index_;
|
||||
Indices *indices_;
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "storage/v2/storage.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
@ -291,7 +292,29 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
|
||||
}
|
||||
}
|
||||
|
||||
Storage::Storage(Config config) : indices_(config.items), config_(config) {
|
||||
Storage::Storage(Config config)
|
||||
: indices_(config.items),
|
||||
config_(config),
|
||||
durability_(config.durability, &vertices_, &edges_, &name_id_mapper_,
|
||||
&indices_, &constraints_, config.items) {
|
||||
auto info = durability_.Initialize([this](auto callback) {
|
||||
// Take master RW lock (for reading).
|
||||
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
|
||||
// Create the transaction used to create the snapshot.
|
||||
auto transaction = CreateTransaction();
|
||||
|
||||
// Create snapshot.
|
||||
callback(&transaction);
|
||||
|
||||
// Finalize snapshot transaction.
|
||||
commit_log_.MarkFinished(transaction.start_timestamp);
|
||||
});
|
||||
if (info) {
|
||||
vertex_id_ = info->next_vertex_id;
|
||||
edge_id_ = info->next_edge_id;
|
||||
timestamp_ = std::max(timestamp_, info->next_timestamp);
|
||||
}
|
||||
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
|
||||
gc_runner_.Run("Storage GC", config_.gc.interval,
|
||||
[this] { this->CollectGarbage(); });
|
||||
@ -302,6 +325,7 @@ Storage::~Storage() {
|
||||
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
|
||||
gc_runner_.Stop();
|
||||
}
|
||||
durability_.Finalize();
|
||||
}
|
||||
|
||||
Storage::Accessor::Accessor(Storage *storage)
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "storage/v2/commit_log.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/constraints.hpp"
|
||||
#include "storage/v2/durability.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/indices.hpp"
|
||||
@ -418,6 +419,8 @@ class Storage final {
|
||||
// Edges that are logically deleted and wait to be removed from the main
|
||||
// storage.
|
||||
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;
|
||||
|
||||
Durability durability_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
@ -15,7 +15,7 @@ namespace storage {
|
||||
|
||||
struct Vertex {
|
||||
Vertex(Gid gid, Delta *delta) : gid(gid), deleted(false), delta(delta) {
|
||||
CHECK(delta->action == Delta::Action::DELETE_OBJECT)
|
||||
CHECK(delta == nullptr || delta->action == Delta::Action::DELETE_OBJECT)
|
||||
<< "Vertex must be created with an initial DELETE_OBJECT delta!";
|
||||
}
|
||||
|
||||
|
@ -344,6 +344,9 @@ target_link_libraries(${test_prefix}storage_v2_constraints mg-storage-v2)
|
||||
add_unit_test(storage_v2_decoder_encoder.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_decoder_encoder mg-storage-v2)
|
||||
|
||||
add_unit_test(storage_v2_durability.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_durability mg-storage-v2)
|
||||
|
||||
add_unit_test(storage_v2_edge.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_edge mg-storage-v2)
|
||||
|
||||
|
@ -36,9 +36,9 @@ class DecoderEncoderTest : public ::testing::Test {
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DecoderEncoderTest, ReadMarker) {
|
||||
{
|
||||
storage::durability::Encoder encoder;
|
||||
storage::Encoder encoder;
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
|
||||
for (const auto &item : storage::durability::kMarkersAll) {
|
||||
for (const auto &item : storage::kMarkersAll) {
|
||||
encoder.WriteMarker(item);
|
||||
}
|
||||
{
|
||||
@ -48,11 +48,11 @@ TEST_F(DecoderEncoderTest, ReadMarker) {
|
||||
encoder.Finalize();
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
for (const auto &item : storage::durability::kMarkersAll) {
|
||||
for (const auto &item : storage::kMarkersAll) {
|
||||
auto decoded = decoder.ReadMarker();
|
||||
ASSERT_TRUE(decoded);
|
||||
ASSERT_EQ(*decoded, item);
|
||||
@ -70,7 +70,7 @@ TEST_F(DecoderEncoderTest, ReadMarker) {
|
||||
TEST_F(DecoderEncoderTest, Read##name) { \
|
||||
std::vector<type> dataset{__VA_ARGS__}; \
|
||||
{ \
|
||||
storage::durability::Encoder encoder; \
|
||||
storage::Encoder encoder; \
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
|
||||
for (const auto &item : dataset) { \
|
||||
encoder.Write##name(item); \
|
||||
@ -82,7 +82,7 @@ TEST_F(DecoderEncoderTest, ReadMarker) {
|
||||
encoder.Finalize(); \
|
||||
} \
|
||||
{ \
|
||||
storage::durability::Decoder decoder; \
|
||||
storage::Decoder decoder; \
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic); \
|
||||
ASSERT_TRUE(version); \
|
||||
ASSERT_EQ(*version, kTestVersion); \
|
||||
@ -131,7 +131,7 @@ GENERATE_READ_TEST(
|
||||
TEST_F(DecoderEncoderTest, Skip##name) { \
|
||||
std::vector<type> dataset{__VA_ARGS__}; \
|
||||
{ \
|
||||
storage::durability::Encoder encoder; \
|
||||
storage::Encoder encoder; \
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
|
||||
for (const auto &item : dataset) { \
|
||||
encoder.Write##name(item); \
|
||||
@ -143,7 +143,7 @@ GENERATE_READ_TEST(
|
||||
encoder.Finalize(); \
|
||||
} \
|
||||
{ \
|
||||
storage::durability::Decoder decoder; \
|
||||
storage::Decoder decoder; \
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic); \
|
||||
ASSERT_TRUE(version); \
|
||||
ASSERT_EQ(*version, kTestVersion); \
|
||||
@ -177,7 +177,7 @@ GENERATE_SKIP_TEST(
|
||||
#define GENERATE_PARTIAL_READ_TEST(name, value) \
|
||||
TEST_F(DecoderEncoderTest, PartialRead##name) { \
|
||||
{ \
|
||||
storage::durability::Encoder encoder; \
|
||||
storage::Encoder encoder; \
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
|
||||
encoder.Write##name(value); \
|
||||
encoder.Finalize(); \
|
||||
@ -195,7 +195,7 @@ GENERATE_SKIP_TEST(
|
||||
ofile.Write(&byte, sizeof(byte)); \
|
||||
ofile.Sync(); \
|
||||
} \
|
||||
storage::durability::Decoder decoder; \
|
||||
storage::Decoder decoder; \
|
||||
auto version = decoder.Initialize(alternate_file, kTestMagic); \
|
||||
if (i < kTestMagic.size() + sizeof(kTestVersion)) { \
|
||||
ASSERT_FALSE(version); \
|
||||
@ -215,7 +215,7 @@ GENERATE_SKIP_TEST(
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
GENERATE_PARTIAL_READ_TEST(Marker, storage::durability::Marker::SECTION_VERTEX);
|
||||
GENERATE_PARTIAL_READ_TEST(Marker, storage::Marker::SECTION_VERTEX);
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
GENERATE_PARTIAL_READ_TEST(Bool, false);
|
||||
@ -243,7 +243,7 @@ GENERATE_PARTIAL_READ_TEST(
|
||||
#define GENERATE_PARTIAL_SKIP_TEST(name, value) \
|
||||
TEST_F(DecoderEncoderTest, PartialSkip##name) { \
|
||||
{ \
|
||||
storage::durability::Encoder encoder; \
|
||||
storage::Encoder encoder; \
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
|
||||
encoder.Write##name(value); \
|
||||
encoder.Finalize(); \
|
||||
@ -261,7 +261,7 @@ GENERATE_PARTIAL_READ_TEST(
|
||||
ofile.Write(&byte, sizeof(byte)); \
|
||||
ofile.Sync(); \
|
||||
} \
|
||||
storage::durability::Decoder decoder; \
|
||||
storage::Decoder decoder; \
|
||||
auto version = decoder.Initialize(alternate_file, kTestMagic); \
|
||||
if (i < kTestMagic.size() + sizeof(kTestVersion)) { \
|
||||
ASSERT_FALSE(version); \
|
||||
@ -294,7 +294,7 @@ GENERATE_PARTIAL_SKIP_TEST(
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
|
||||
{
|
||||
storage::durability::Encoder encoder;
|
||||
storage::Encoder encoder;
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
|
||||
encoder.WritePropertyValue(storage::PropertyValue(123L));
|
||||
encoder.Finalize();
|
||||
@ -302,51 +302,50 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
|
||||
{
|
||||
utils::OutputFile file;
|
||||
file.Open(storage_file, utils::OutputFile::Mode::OVERWRITE_EXISTING);
|
||||
for (auto marker : storage::durability::kMarkersAll) {
|
||||
for (auto marker : storage::kMarkersAll) {
|
||||
bool valid_marker;
|
||||
switch (marker) {
|
||||
case storage::durability::Marker::TYPE_NULL:
|
||||
case storage::durability::Marker::TYPE_BOOL:
|
||||
case storage::durability::Marker::TYPE_INT:
|
||||
case storage::durability::Marker::TYPE_DOUBLE:
|
||||
case storage::durability::Marker::TYPE_STRING:
|
||||
case storage::durability::Marker::TYPE_LIST:
|
||||
case storage::durability::Marker::TYPE_MAP:
|
||||
case storage::durability::Marker::TYPE_PROPERTY_VALUE:
|
||||
case storage::Marker::TYPE_NULL:
|
||||
case storage::Marker::TYPE_BOOL:
|
||||
case storage::Marker::TYPE_INT:
|
||||
case storage::Marker::TYPE_DOUBLE:
|
||||
case storage::Marker::TYPE_STRING:
|
||||
case storage::Marker::TYPE_LIST:
|
||||
case storage::Marker::TYPE_MAP:
|
||||
case storage::Marker::TYPE_PROPERTY_VALUE:
|
||||
valid_marker = true;
|
||||
break;
|
||||
|
||||
case storage::durability::Marker::SECTION_VERTEX:
|
||||
case storage::durability::Marker::SECTION_EDGE:
|
||||
case storage::durability::Marker::SECTION_MAPPER:
|
||||
case storage::durability::Marker::SECTION_METADATA:
|
||||
case storage::durability::Marker::SECTION_INDICES:
|
||||
case storage::durability::Marker::SECTION_CONSTRAINTS:
|
||||
case storage::durability::Marker::SECTION_OFFSETS:
|
||||
case storage::durability::Marker::VALUE_FALSE:
|
||||
case storage::durability::Marker::VALUE_TRUE:
|
||||
case storage::Marker::SECTION_VERTEX:
|
||||
case storage::Marker::SECTION_EDGE:
|
||||
case storage::Marker::SECTION_MAPPER:
|
||||
case storage::Marker::SECTION_METADATA:
|
||||
case storage::Marker::SECTION_INDICES:
|
||||
case storage::Marker::SECTION_CONSTRAINTS:
|
||||
case storage::Marker::SECTION_OFFSETS:
|
||||
case storage::Marker::VALUE_FALSE:
|
||||
case storage::Marker::VALUE_TRUE:
|
||||
valid_marker = false;
|
||||
break;
|
||||
}
|
||||
// We only run this test with invalid markers.
|
||||
if (valid_marker) continue;
|
||||
{
|
||||
file.SetPosition(
|
||||
utils::OutputFile::Position::RELATIVE_TO_END,
|
||||
-(sizeof(uint64_t) + sizeof(storage::durability::Marker)));
|
||||
file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END,
|
||||
-(sizeof(uint64_t) + sizeof(storage::Marker)));
|
||||
auto byte = static_cast<uint8_t>(marker);
|
||||
file.Write(&byte, sizeof(byte));
|
||||
file.Sync();
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
ASSERT_FALSE(decoder.SkipPropertyValue());
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
@ -355,22 +354,21 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
|
||||
}
|
||||
{
|
||||
{
|
||||
file.SetPosition(
|
||||
utils::OutputFile::Position::RELATIVE_TO_END,
|
||||
-(sizeof(uint64_t) + sizeof(storage::durability::Marker)));
|
||||
file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END,
|
||||
-(sizeof(uint64_t) + sizeof(storage::Marker)));
|
||||
uint8_t byte = 1;
|
||||
file.Write(&byte, sizeof(byte));
|
||||
file.Sync();
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
ASSERT_FALSE(decoder.SkipPropertyValue());
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
@ -383,13 +381,13 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DecoderEncoderTest, DecoderPosition) {
|
||||
{
|
||||
storage::durability::Encoder encoder;
|
||||
storage::Encoder encoder;
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
|
||||
encoder.WriteBool(true);
|
||||
encoder.Finalize();
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
@ -409,7 +407,7 @@ TEST_F(DecoderEncoderTest, DecoderPosition) {
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DecoderEncoderTest, EncoderPosition) {
|
||||
{
|
||||
storage::durability::Encoder encoder;
|
||||
storage::Encoder encoder;
|
||||
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
|
||||
encoder.WriteBool(false);
|
||||
encoder.SetPosition(kTestMagic.size() + sizeof(kTestVersion));
|
||||
@ -418,7 +416,7 @@ TEST_F(DecoderEncoderTest, EncoderPosition) {
|
||||
encoder.Finalize();
|
||||
}
|
||||
{
|
||||
storage::durability::Decoder decoder;
|
||||
storage::Decoder decoder;
|
||||
auto version = decoder.Initialize(storage_file, kTestMagic);
|
||||
ASSERT_TRUE(version);
|
||||
ASSERT_EQ(*version, kTestVersion);
|
||||
|
764
tests/unit/storage_v2_durability.cpp
Normal file
764
tests/unit/storage_v2_durability.cpp
Normal file
@ -0,0 +1,764 @@
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <filesystem>
|
||||
#include <thread>
|
||||
|
||||
#include "storage/v2/durability.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
#include "utils/file.hpp"
|
||||
|
||||
using testing::Contains;
|
||||
using testing::UnorderedElementsAre;
|
||||
|
||||
class DurabilityTest : public ::testing::TestWithParam<bool> {
|
||||
private:
|
||||
const uint64_t kNumBaseVertices = 1000;
|
||||
const uint64_t kNumBaseEdges = 10000;
|
||||
const uint64_t kNumExtendedVertices = 100;
|
||||
const uint64_t kNumExtendedEdges = 1000;
|
||||
|
||||
public:
|
||||
DurabilityTest()
|
||||
: base_vertex_gids_(kNumBaseVertices),
|
||||
base_edge_gids_(kNumBaseEdges),
|
||||
extended_vertex_gids_(kNumExtendedVertices),
|
||||
extended_edge_gids_(kNumExtendedEdges) {}
|
||||
|
||||
void SetUp() override { Clear(); }
|
||||
|
||||
void TearDown() override { Clear(); }
|
||||
|
||||
void CreateBaseDataset(storage::Storage *store, bool properties_on_edges) {
|
||||
auto label_indexed = store->NameToLabel("base_indexed");
|
||||
auto label_unindexed = store->NameToLabel("base_unindexed");
|
||||
auto property_id = store->NameToProperty("id");
|
||||
auto et1 = store->NameToEdgeType("base_et1");
|
||||
auto et2 = store->NameToEdgeType("base_et2");
|
||||
|
||||
// Create label index.
|
||||
ASSERT_TRUE(store->CreateIndex(label_unindexed));
|
||||
|
||||
// Create label+property index.
|
||||
ASSERT_TRUE(store->CreateIndex(label_indexed, property_id));
|
||||
|
||||
// Create existence constraint.
|
||||
ASSERT_FALSE(store->CreateExistenceConstraint(label_unindexed, property_id)
|
||||
.HasError());
|
||||
|
||||
// Create vertices.
|
||||
for (uint64_t i = 0; i < kNumBaseVertices; ++i) {
|
||||
auto acc = store->Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
base_vertex_gids_[i] = vertex.Gid();
|
||||
if (i < kNumBaseVertices / 2) {
|
||||
ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue());
|
||||
} else {
|
||||
ASSERT_TRUE(vertex.AddLabel(label_unindexed).HasValue());
|
||||
}
|
||||
if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) {
|
||||
ASSERT_TRUE(vertex
|
||||
.SetProperty(property_id, storage::PropertyValue(
|
||||
static_cast<int64_t>(i)))
|
||||
.HasValue());
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
// Create edges.
|
||||
for (uint64_t i = 0; i < kNumBaseEdges; ++i) {
|
||||
auto acc = store->Access();
|
||||
auto vertex1 = acc.FindVertex(
|
||||
base_vertex_gids_[(i / 2) % kNumBaseVertices], storage::View::OLD);
|
||||
ASSERT_TRUE(vertex1);
|
||||
auto vertex2 = acc.FindVertex(
|
||||
base_vertex_gids_[(i / 3) % kNumBaseVertices], storage::View::OLD);
|
||||
ASSERT_TRUE(vertex2);
|
||||
storage::EdgeTypeId et;
|
||||
if (i < kNumBaseEdges / 2) {
|
||||
et = et1;
|
||||
} else {
|
||||
et = et2;
|
||||
}
|
||||
auto edge = acc.CreateEdge(&*vertex1, &*vertex2, et);
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
base_edge_gids_[i] = edge->Gid();
|
||||
if (properties_on_edges) {
|
||||
ASSERT_TRUE(edge->SetProperty(property_id, storage::PropertyValue(
|
||||
static_cast<int64_t>(i)))
|
||||
.HasValue());
|
||||
} else {
|
||||
auto ret = edge->SetProperty(
|
||||
property_id, storage::PropertyValue(static_cast<int64_t>(i)));
|
||||
ASSERT_TRUE(ret.HasError());
|
||||
ASSERT_EQ(ret.GetError(), storage::Error::PROPERTIES_DISABLED);
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
void VerifyBaseDataset(storage::Storage *store, bool properties_on_edges,
|
||||
bool extended_dataset_exists) {
|
||||
auto label_indexed = store->NameToLabel("base_indexed");
|
||||
auto label_unindexed = store->NameToLabel("base_unindexed");
|
||||
auto property_id = store->NameToProperty("id");
|
||||
auto et1 = store->NameToEdgeType("base_et1");
|
||||
auto et2 = store->NameToEdgeType("base_et2");
|
||||
|
||||
// Verify indices info.
|
||||
{
|
||||
auto info = store->ListAllIndices();
|
||||
if (extended_dataset_exists) {
|
||||
ASSERT_THAT(info.label, Contains(label_unindexed));
|
||||
ASSERT_THAT(info.label_property,
|
||||
Contains(std::make_pair(label_indexed, property_id)));
|
||||
} else {
|
||||
ASSERT_THAT(info.label, UnorderedElementsAre(label_unindexed));
|
||||
ASSERT_THAT(info.label_property, UnorderedElementsAre(std::make_pair(
|
||||
label_indexed, property_id)));
|
||||
}
|
||||
}
|
||||
|
||||
// Verify constraints info.
|
||||
{
|
||||
auto info = store->ListAllConstraints();
|
||||
if (extended_dataset_exists) {
|
||||
ASSERT_THAT(info.existence,
|
||||
Contains(std::make_pair(label_unindexed, property_id)));
|
||||
} else {
|
||||
ASSERT_THAT(info.existence, UnorderedElementsAre(std::make_pair(
|
||||
label_unindexed, property_id)));
|
||||
}
|
||||
}
|
||||
|
||||
// Create storage accessor.
|
||||
auto acc = store->Access();
|
||||
|
||||
// Verify vertices.
|
||||
for (uint64_t i = 0; i < kNumBaseVertices; ++i) {
|
||||
auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD);
|
||||
ASSERT_TRUE(vertex);
|
||||
auto labels = vertex->Labels(storage::View::OLD);
|
||||
ASSERT_TRUE(labels.HasValue());
|
||||
if (i < kNumBaseVertices / 2) {
|
||||
ASSERT_THAT(*labels, UnorderedElementsAre(label_indexed));
|
||||
} else {
|
||||
ASSERT_THAT(*labels, UnorderedElementsAre(label_unindexed));
|
||||
}
|
||||
auto properties = vertex->Properties(storage::View::OLD);
|
||||
ASSERT_TRUE(properties.HasValue());
|
||||
if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) {
|
||||
ASSERT_EQ(properties->size(), 1);
|
||||
ASSERT_EQ((*properties)[property_id],
|
||||
storage::PropertyValue(static_cast<int64_t>(i)));
|
||||
} else {
|
||||
ASSERT_EQ(properties->size(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify edges.
|
||||
for (uint64_t i = 0; i < kNumBaseEdges; ++i) {
|
||||
auto find_edge =
|
||||
[&](const auto &edges) -> std::optional<storage::EdgeAccessor> {
|
||||
for (const auto &edge : edges) {
|
||||
if (edge.Gid() == base_edge_gids_[i]) {
|
||||
return edge;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
{
|
||||
auto vertex1 = acc.FindVertex(
|
||||
base_vertex_gids_[(i / 2) % kNumBaseVertices], storage::View::OLD);
|
||||
ASSERT_TRUE(vertex1);
|
||||
auto out_edges = vertex1->OutEdges({}, storage::View::OLD);
|
||||
ASSERT_TRUE(out_edges.HasValue());
|
||||
auto edge1 = find_edge(*out_edges);
|
||||
ASSERT_TRUE(edge1);
|
||||
if (i < kNumBaseEdges / 2) {
|
||||
ASSERT_EQ(edge1->EdgeType(), et1);
|
||||
} else {
|
||||
ASSERT_EQ(edge1->EdgeType(), et2);
|
||||
}
|
||||
auto properties = edge1->Properties(storage::View::OLD);
|
||||
ASSERT_TRUE(properties.HasValue());
|
||||
if (properties_on_edges) {
|
||||
ASSERT_EQ(properties->size(), 1);
|
||||
ASSERT_EQ((*properties)[property_id],
|
||||
storage::PropertyValue(static_cast<int64_t>(i)));
|
||||
} else {
|
||||
ASSERT_EQ(properties->size(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto vertex2 = acc.FindVertex(
|
||||
base_vertex_gids_[(i / 3) % kNumBaseVertices], storage::View::OLD);
|
||||
ASSERT_TRUE(vertex2);
|
||||
auto in_edges = vertex2->InEdges({}, storage::View::OLD);
|
||||
ASSERT_TRUE(in_edges.HasValue());
|
||||
auto edge2 = find_edge(*in_edges);
|
||||
ASSERT_TRUE(edge2);
|
||||
if (i < kNumBaseEdges / 2) {
|
||||
ASSERT_EQ(edge2->EdgeType(), et1);
|
||||
} else {
|
||||
ASSERT_EQ(edge2->EdgeType(), et2);
|
||||
}
|
||||
auto properties = edge2->Properties(storage::View::OLD);
|
||||
ASSERT_TRUE(properties.HasValue());
|
||||
if (properties_on_edges) {
|
||||
ASSERT_EQ(properties->size(), 1);
|
||||
ASSERT_EQ((*properties)[property_id],
|
||||
storage::PropertyValue(static_cast<int64_t>(i)));
|
||||
} else {
|
||||
ASSERT_EQ(properties->size(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify label indices.
|
||||
{
|
||||
std::vector<storage::VertexAccessor> vertices;
|
||||
vertices.reserve(kNumBaseVertices / 2);
|
||||
for (auto vertex : acc.Vertices(label_unindexed, storage::View::OLD)) {
|
||||
vertices.push_back(vertex);
|
||||
}
|
||||
ASSERT_EQ(vertices.size(), kNumBaseVertices / 2);
|
||||
std::sort(vertices.begin(), vertices.end(),
|
||||
[](const auto &a, const auto &b) { return a.Gid() < b.Gid(); });
|
||||
for (uint64_t i = 0; i < kNumBaseVertices / 2; ++i) {
|
||||
ASSERT_EQ(vertices[i].Gid(),
|
||||
base_vertex_gids_[kNumBaseVertices / 2 + i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify label+property index.
|
||||
{
|
||||
std::vector<storage::VertexAccessor> vertices;
|
||||
vertices.reserve(kNumBaseVertices / 3);
|
||||
for (auto vertex :
|
||||
acc.Vertices(label_indexed, property_id, storage::View::OLD)) {
|
||||
vertices.push_back(vertex);
|
||||
}
|
||||
ASSERT_EQ(vertices.size(), kNumBaseVertices / 3);
|
||||
std::sort(vertices.begin(), vertices.end(),
|
||||
[](const auto &a, const auto &b) { return a.Gid() < b.Gid(); });
|
||||
for (uint64_t i = 0; i < kNumBaseVertices / 3; ++i) {
|
||||
ASSERT_EQ(vertices[i].Gid(), base_vertex_gids_[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CreateExtendedDataset(storage::Storage *store) {
|
||||
auto label_indexed = store->NameToLabel("extended_indexed");
|
||||
auto label_unused = store->NameToLabel("extended_unused");
|
||||
auto property_count = store->NameToProperty("count");
|
||||
auto et3 = store->NameToEdgeType("extended_et3");
|
||||
auto et4 = store->NameToEdgeType("extended_et4");
|
||||
|
||||
// Create label index.
|
||||
ASSERT_TRUE(store->CreateIndex(label_unused));
|
||||
|
||||
// Create label+property index.
|
||||
ASSERT_TRUE(store->CreateIndex(label_indexed, property_count));
|
||||
|
||||
// Create existence constraint.
|
||||
ASSERT_FALSE(store->CreateExistenceConstraint(label_unused, property_count)
|
||||
.HasError());
|
||||
|
||||
// Create vertices.
|
||||
for (uint64_t i = 0; i < kNumExtendedVertices; ++i) {
|
||||
auto acc = store->Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
extended_vertex_gids_[i] = vertex.Gid();
|
||||
if (i < kNumExtendedVertices / 2) {
|
||||
ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue());
|
||||
}
|
||||
if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) {
|
||||
ASSERT_TRUE(
|
||||
vertex
|
||||
.SetProperty(property_count, storage::PropertyValue("nandare"))
|
||||
.HasValue());
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
// Create edges.
|
||||
for (uint64_t i = 0; i < kNumExtendedEdges; ++i) {
|
||||
auto acc = store->Access();
|
||||
auto vertex1 =
|
||||
acc.FindVertex(extended_vertex_gids_[(i / 5) % kNumExtendedVertices],
|
||||
storage::View::OLD);
|
||||
ASSERT_TRUE(vertex1);
|
||||
auto vertex2 =
|
||||
acc.FindVertex(extended_vertex_gids_[(i / 6) % kNumExtendedVertices],
|
||||
storage::View::OLD);
|
||||
ASSERT_TRUE(vertex2);
|
||||
storage::EdgeTypeId et;
|
||||
if (i < kNumExtendedEdges / 4) {
|
||||
et = et3;
|
||||
} else {
|
||||
et = et4;
|
||||
}
|
||||
auto edge = acc.CreateEdge(&*vertex1, &*vertex2, et);
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
extended_edge_gids_[i] = edge->Gid();
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
void VerifyExtendedDataset(storage::Storage *store) {
|
||||
auto label_indexed = store->NameToLabel("extended_indexed");
|
||||
auto label_unused = store->NameToLabel("extended_unused");
|
||||
auto property_count = store->NameToProperty("count");
|
||||
auto et3 = store->NameToEdgeType("extended_et3");
|
||||
auto et4 = store->NameToEdgeType("extended_et4");
|
||||
|
||||
// Verify indices info.
|
||||
{
|
||||
auto info = store->ListAllIndices();
|
||||
auto base_label_indexed = store->NameToLabel("base_indexed");
|
||||
auto base_label_unindexed = store->NameToLabel("base_unindexed");
|
||||
auto base_property_id = store->NameToProperty("id");
|
||||
ASSERT_THAT(info.label,
|
||||
UnorderedElementsAre(base_label_unindexed, label_unused));
|
||||
ASSERT_THAT(info.label_property,
|
||||
UnorderedElementsAre(
|
||||
std::make_pair(base_label_indexed, base_property_id),
|
||||
std::make_pair(label_indexed, property_count)));
|
||||
}
|
||||
|
||||
// Verify constraints info.
|
||||
{
|
||||
auto info = store->ListAllConstraints();
|
||||
auto base_label_unindexed = store->NameToLabel("base_unindexed");
|
||||
auto base_property_id = store->NameToProperty("id");
|
||||
ASSERT_THAT(info.existence,
|
||||
UnorderedElementsAre(
|
||||
std::make_pair(base_label_unindexed, base_property_id),
|
||||
std::make_pair(label_unused, property_count)));
|
||||
}
|
||||
|
||||
// Create storage accessor.
|
||||
auto acc = store->Access();
|
||||
|
||||
// Verify vertices.
|
||||
for (uint64_t i = 0; i < kNumExtendedVertices; ++i) {
|
||||
auto vertex =
|
||||
acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD);
|
||||
ASSERT_TRUE(vertex);
|
||||
auto labels = vertex->Labels(storage::View::OLD);
|
||||
ASSERT_TRUE(labels.HasValue());
|
||||
if (i < kNumExtendedVertices / 2) {
|
||||
ASSERT_THAT(*labels, UnorderedElementsAre(label_indexed));
|
||||
}
|
||||
auto properties = vertex->Properties(storage::View::OLD);
|
||||
ASSERT_TRUE(properties.HasValue());
|
||||
if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) {
|
||||
ASSERT_EQ(properties->size(), 1);
|
||||
ASSERT_EQ((*properties)[property_count],
|
||||
storage::PropertyValue("nandare"));
|
||||
} else {
|
||||
ASSERT_EQ(properties->size(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify edges.
|
||||
for (uint64_t i = 0; i < kNumExtendedEdges; ++i) {
|
||||
auto find_edge =
|
||||
[&](const auto &edges) -> std::optional<storage::EdgeAccessor> {
|
||||
for (const auto &edge : edges) {
|
||||
if (edge.Gid() == extended_edge_gids_[i]) {
|
||||
return edge;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
{
|
||||
auto vertex1 = acc.FindVertex(
|
||||
extended_vertex_gids_[(i / 5) % kNumExtendedVertices],
|
||||
storage::View::OLD);
|
||||
ASSERT_TRUE(vertex1);
|
||||
auto out_edges = vertex1->OutEdges({}, storage::View::OLD);
|
||||
ASSERT_TRUE(out_edges.HasValue());
|
||||
auto edge1 = find_edge(*out_edges);
|
||||
ASSERT_TRUE(edge1);
|
||||
if (i < kNumExtendedEdges / 4) {
|
||||
ASSERT_EQ(edge1->EdgeType(), et3);
|
||||
} else {
|
||||
ASSERT_EQ(edge1->EdgeType(), et4);
|
||||
}
|
||||
auto properties = edge1->Properties(storage::View::OLD);
|
||||
ASSERT_TRUE(properties.HasValue());
|
||||
ASSERT_EQ(properties->size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
auto vertex2 = acc.FindVertex(
|
||||
extended_vertex_gids_[(i / 6) % kNumExtendedVertices],
|
||||
storage::View::OLD);
|
||||
ASSERT_TRUE(vertex2);
|
||||
auto in_edges = vertex2->InEdges({}, storage::View::OLD);
|
||||
ASSERT_TRUE(in_edges.HasValue());
|
||||
auto edge2 = find_edge(*in_edges);
|
||||
ASSERT_TRUE(edge2);
|
||||
if (i < kNumExtendedEdges / 4) {
|
||||
ASSERT_EQ(edge2->EdgeType(), et3);
|
||||
} else {
|
||||
ASSERT_EQ(edge2->EdgeType(), et4);
|
||||
}
|
||||
auto properties = edge2->Properties(storage::View::OLD);
|
||||
ASSERT_TRUE(properties.HasValue());
|
||||
ASSERT_EQ(properties->size(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify label indices.
|
||||
{
|
||||
std::vector<storage::VertexAccessor> vertices;
|
||||
vertices.reserve(kNumExtendedVertices / 2);
|
||||
for (auto vertex : acc.Vertices(label_unused, storage::View::OLD)) {
|
||||
vertices.push_back(vertex);
|
||||
}
|
||||
ASSERT_EQ(vertices.size(), 0);
|
||||
}
|
||||
|
||||
// Verify label+property index.
|
||||
{
|
||||
std::vector<storage::VertexAccessor> vertices;
|
||||
vertices.reserve(kNumExtendedVertices / 3);
|
||||
for (auto vertex :
|
||||
acc.Vertices(label_indexed, property_count, storage::View::OLD)) {
|
||||
vertices.push_back(vertex);
|
||||
}
|
||||
ASSERT_EQ(vertices.size(), kNumExtendedVertices / 3);
|
||||
std::sort(vertices.begin(), vertices.end(),
|
||||
[](const auto &a, const auto &b) { return a.Gid() < b.Gid(); });
|
||||
for (uint64_t i = 0; i < kNumExtendedVertices / 3; ++i) {
|
||||
ASSERT_EQ(vertices[i].Gid(), extended_vertex_gids_[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::filesystem::path> GetSnapshotsList() {
|
||||
std::vector<std::filesystem::path> ret;
|
||||
for (auto &item : std::filesystem::directory_iterator(
|
||||
storage_directory / storage::kSnapshotDirectory)) {
|
||||
ret.push_back(item.path());
|
||||
}
|
||||
std::sort(ret.begin(), ret.end());
|
||||
std::reverse(ret.begin(), ret.end());
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::filesystem::path storage_directory{
|
||||
std::filesystem::temp_directory_path() /
|
||||
"MG_test_unit_storage_v2_durability"};
|
||||
|
||||
private:
|
||||
void Clear() {
|
||||
if (!std::filesystem::exists(storage_directory)) return;
|
||||
std::filesystem::remove_all(storage_directory);
|
||||
}
|
||||
|
||||
std::vector<storage::Gid> base_vertex_gids_;
|
||||
std::vector<storage::Gid> base_edge_gids_;
|
||||
std::vector<storage::Gid> extended_vertex_gids_;
|
||||
std::vector<storage::Gid> extended_edge_gids_;
|
||||
};
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(EdgesWithProperties, DurabilityTest,
|
||||
::testing::Values(true));
|
||||
INSTANTIATE_TEST_CASE_P(EdgesWithoutProperties, DurabilityTest,
|
||||
::testing::Values(false));
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, SnapshotOnExit) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_on_exit = true}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
VerifyBaseDataset(&store, GetParam(), false);
|
||||
CreateExtendedDataset(&store);
|
||||
VerifyBaseDataset(&store, GetParam(), true);
|
||||
VerifyExtendedDataset(&store);
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, GetParam(), true);
|
||||
VerifyExtendedDataset(&store);
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, SnapshotPeriodic) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_type =
|
||||
storage::Config::Durability::SnapshotType::PERIODIC,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, GetParam(), false);
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, SnapshotFallback) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_type =
|
||||
storage::Config::Durability::SnapshotType::PERIODIC,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
CreateExtendedDataset(&store);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
}
|
||||
|
||||
// Destroy last snapshot.
|
||||
{
|
||||
auto snapshots = GetSnapshotsList();
|
||||
ASSERT_GE(snapshots.size(), 2);
|
||||
|
||||
auto info = storage::ReadSnapshotInfo(*snapshots.begin());
|
||||
|
||||
LOG(INFO) << "Destroying snapshot " << *snapshots.begin();
|
||||
utils::OutputFile file;
|
||||
file.Open(*snapshots.begin(), utils::OutputFile::Mode::OVERWRITE_EXISTING);
|
||||
file.SetPosition(utils::OutputFile::Position::SET, info.offset_vertices);
|
||||
auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP);
|
||||
file.Write(&value, sizeof(value));
|
||||
file.Sync();
|
||||
file.Close();
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, GetParam(), false);
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, SnapshotRetention) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_type =
|
||||
storage::Config::Durability::SnapshotType::PERIODIC,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000),
|
||||
.snapshot_retention_count = 3}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
// Allow approximately 5 snapshots to be created.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
|
||||
}
|
||||
|
||||
// Verify that exactly 3 snapshots exist.
|
||||
{
|
||||
auto snapshots = GetSnapshotsList();
|
||||
ASSERT_EQ(snapshots.size(), 3);
|
||||
for (const auto &path : snapshots) {
|
||||
// This shouldn't throw.
|
||||
storage::ReadSnapshotInfo(path);
|
||||
}
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, GetParam(), false);
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DurabilityTest,
|
||||
SnapshotWithoutPropertiesOnEdgesRecoveryWithPropertiesOnEdges) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = false},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_on_exit = true}});
|
||||
CreateBaseDataset(&store, false);
|
||||
VerifyBaseDataset(&store, false, false);
|
||||
CreateExtendedDataset(&store);
|
||||
VerifyBaseDataset(&store, false, true);
|
||||
VerifyExtendedDataset(&store);
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = true},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, false, true);
|
||||
VerifyExtendedDataset(&store);
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DurabilityTest,
|
||||
SnapshotWithPropertiesOnEdgesRecoveryWithoutPropertiesOnEdges) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_on_exit = true}});
|
||||
CreateBaseDataset(&store, true);
|
||||
VerifyBaseDataset(&store, true, false);
|
||||
CreateExtendedDataset(&store);
|
||||
VerifyBaseDataset(&store, true, true);
|
||||
VerifyExtendedDataset(&store);
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = false},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
{
|
||||
std::vector<storage::VertexAccessor> vertices;
|
||||
auto acc = store.Access();
|
||||
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
||||
vertices.push_back(vertex);
|
||||
}
|
||||
ASSERT_EQ(vertices.size(), 0);
|
||||
}
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_F(DurabilityTest,
|
||||
SnapshotWithPropertiesOnEdgesButUnusedRecoveryWithoutPropertiesOnEdges) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = true},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_on_exit = true}});
|
||||
CreateBaseDataset(&store, true);
|
||||
VerifyBaseDataset(&store, true, false);
|
||||
CreateExtendedDataset(&store);
|
||||
VerifyBaseDataset(&store, true, true);
|
||||
VerifyExtendedDataset(&store);
|
||||
// Remove properties from edges.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
||||
auto in_edges = vertex.InEdges({}, storage::View::OLD);
|
||||
ASSERT_TRUE(in_edges.HasValue());
|
||||
for (auto edge : *in_edges) {
|
||||
// TODO (mferencevic): Replace with `ClearProperties()`
|
||||
auto props = edge.Properties(storage::View::NEW);
|
||||
ASSERT_TRUE(props.HasValue());
|
||||
for (const auto &prop : *props) {
|
||||
ASSERT_TRUE(edge.SetProperty(prop.first, storage::PropertyValue())
|
||||
.HasValue());
|
||||
}
|
||||
}
|
||||
auto out_edges = vertex.InEdges({}, storage::View::OLD);
|
||||
ASSERT_TRUE(out_edges.HasValue());
|
||||
for (auto edge : *out_edges) {
|
||||
// TODO (mferencevic): Replace with `ClearProperties()`
|
||||
auto props = edge.Properties(storage::View::NEW);
|
||||
ASSERT_TRUE(props.HasValue());
|
||||
for (const auto &prop : *props) {
|
||||
ASSERT_TRUE(edge.SetProperty(prop.first, storage::PropertyValue())
|
||||
.HasValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = false},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, false, true);
|
||||
VerifyExtendedDataset(&store);
|
||||
|
||||
// Try to use the storage.
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.CreateVertex();
|
||||
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
||||
ASSERT_TRUE(edge.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user