10cc831ed2
Summary: Edge filters (edge type and destination vertex) are now handled natively in the storage API. The API is implemented to be the fastest possible when using the filters with the assumption that the number of edge types will be small. Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2576
2518 lines
91 KiB
C++
2518 lines
91 KiB
C++
#include <gmock/gmock.h>
|
|
#include <gtest/gtest.h>
|
|
|
|
#include <csignal>
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/wait.h>
|
|
#include <unistd.h>
|
|
|
|
#include <algorithm>
|
|
#include <chrono>
|
|
#include <filesystem>
|
|
#include <iostream>
|
|
#include <thread>
|
|
|
|
#include "storage/v2/durability.hpp"
|
|
#include "storage/v2/storage.hpp"
|
|
#include "utils/file.hpp"
|
|
#include "utils/timer.hpp"
|
|
|
|
using testing::Contains;
|
|
using testing::UnorderedElementsAre;
|
|
|
|
class DurabilityTest : public ::testing::TestWithParam<bool> {
|
|
protected:
|
|
const uint64_t kNumBaseVertices = 1000;
|
|
const uint64_t kNumBaseEdges = 10000;
|
|
const uint64_t kNumExtendedVertices = 100;
|
|
const uint64_t kNumExtendedEdges = 1000;
|
|
|
|
// We don't want to flush the WAL while we are doing operations because the
|
|
// flushing adds a large overhead that slows down execution.
|
|
const uint64_t kFlushWalEvery = (kNumBaseVertices + kNumBaseEdges +
|
|
kNumExtendedVertices + kNumExtendedEdges) *
|
|
2;
|
|
|
|
enum class DatasetType {
|
|
ONLY_BASE,
|
|
ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS,
|
|
ONLY_EXTENDED,
|
|
ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS,
|
|
BASE_WITH_EXTENDED,
|
|
};
|
|
|
|
public:
|
|
DurabilityTest()
|
|
: base_vertex_gids_(
|
|
kNumBaseVertices,
|
|
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())),
|
|
base_edge_gids_(
|
|
kNumBaseEdges,
|
|
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())),
|
|
extended_vertex_gids_(
|
|
kNumExtendedVertices,
|
|
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())),
|
|
extended_edge_gids_(
|
|
kNumExtendedEdges,
|
|
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())) {}
|
|
|
|
void SetUp() override { Clear(); }
|
|
|
|
void TearDown() override { Clear(); }
|
|
|
|
void CreateBaseDataset(storage::Storage *store, bool properties_on_edges) {
|
|
auto label_indexed = store->NameToLabel("base_indexed");
|
|
auto label_unindexed = store->NameToLabel("base_unindexed");
|
|
auto property_id = store->NameToProperty("id");
|
|
auto et1 = store->NameToEdgeType("base_et1");
|
|
auto et2 = store->NameToEdgeType("base_et2");
|
|
|
|
// Create label index.
|
|
ASSERT_TRUE(store->CreateIndex(label_unindexed));
|
|
|
|
// Create label+property index.
|
|
ASSERT_TRUE(store->CreateIndex(label_indexed, property_id));
|
|
|
|
// Create existence constraint.
|
|
ASSERT_FALSE(store->CreateExistenceConstraint(label_unindexed, property_id)
|
|
.HasError());
|
|
|
|
// Create vertices.
|
|
for (uint64_t i = 0; i < kNumBaseVertices; ++i) {
|
|
auto acc = store->Access();
|
|
auto vertex = acc.CreateVertex();
|
|
base_vertex_gids_[i] = vertex.Gid();
|
|
if (i < kNumBaseVertices / 2) {
|
|
ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue());
|
|
} else {
|
|
ASSERT_TRUE(vertex.AddLabel(label_unindexed).HasValue());
|
|
}
|
|
if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) {
|
|
ASSERT_TRUE(vertex
|
|
.SetProperty(property_id, storage::PropertyValue(
|
|
static_cast<int64_t>(i)))
|
|
.HasValue());
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
// Create edges.
|
|
for (uint64_t i = 0; i < kNumBaseEdges; ++i) {
|
|
auto acc = store->Access();
|
|
auto vertex1 = acc.FindVertex(
|
|
base_vertex_gids_[(i / 2) % kNumBaseVertices], storage::View::OLD);
|
|
ASSERT_TRUE(vertex1);
|
|
auto vertex2 = acc.FindVertex(
|
|
base_vertex_gids_[(i / 3) % kNumBaseVertices], storage::View::OLD);
|
|
ASSERT_TRUE(vertex2);
|
|
storage::EdgeTypeId et;
|
|
if (i < kNumBaseEdges / 2) {
|
|
et = et1;
|
|
} else {
|
|
et = et2;
|
|
}
|
|
auto edge = acc.CreateEdge(&*vertex1, &*vertex2, et);
|
|
ASSERT_TRUE(edge.HasValue());
|
|
base_edge_gids_[i] = edge->Gid();
|
|
if (properties_on_edges) {
|
|
ASSERT_TRUE(edge->SetProperty(property_id, storage::PropertyValue(
|
|
static_cast<int64_t>(i)))
|
|
.HasValue());
|
|
} else {
|
|
auto ret = edge->SetProperty(
|
|
property_id, storage::PropertyValue(static_cast<int64_t>(i)));
|
|
ASSERT_TRUE(ret.HasError());
|
|
ASSERT_EQ(ret.GetError(), storage::Error::PROPERTIES_DISABLED);
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
void CreateExtendedDataset(storage::Storage *store,
|
|
bool single_transaction = false) {
|
|
auto label_indexed = store->NameToLabel("extended_indexed");
|
|
auto label_unused = store->NameToLabel("extended_unused");
|
|
auto property_count = store->NameToProperty("count");
|
|
auto et3 = store->NameToEdgeType("extended_et3");
|
|
auto et4 = store->NameToEdgeType("extended_et4");
|
|
|
|
// Create label index.
|
|
ASSERT_TRUE(store->CreateIndex(label_unused));
|
|
|
|
// Create label+property index.
|
|
ASSERT_TRUE(store->CreateIndex(label_indexed, property_count));
|
|
|
|
// Create existence constraint.
|
|
ASSERT_FALSE(store->CreateExistenceConstraint(label_unused, property_count)
|
|
.HasError());
|
|
|
|
// Storage accessor.
|
|
std::optional<storage::Storage::Accessor> acc;
|
|
if (single_transaction) acc.emplace(store->Access());
|
|
|
|
// Create vertices.
|
|
for (uint64_t i = 0; i < kNumExtendedVertices; ++i) {
|
|
if (!single_transaction) acc.emplace(store->Access());
|
|
auto vertex = acc->CreateVertex();
|
|
extended_vertex_gids_[i] = vertex.Gid();
|
|
if (i < kNumExtendedVertices / 2) {
|
|
ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue());
|
|
}
|
|
if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) {
|
|
ASSERT_TRUE(
|
|
vertex
|
|
.SetProperty(property_count, storage::PropertyValue("nandare"))
|
|
.HasValue());
|
|
}
|
|
if (!single_transaction) ASSERT_FALSE(acc->Commit().HasError());
|
|
}
|
|
|
|
// Create edges.
|
|
for (uint64_t i = 0; i < kNumExtendedEdges; ++i) {
|
|
if (!single_transaction) acc.emplace(store->Access());
|
|
auto vertex1 =
|
|
acc->FindVertex(extended_vertex_gids_[(i / 5) % kNumExtendedVertices],
|
|
storage::View::NEW);
|
|
ASSERT_TRUE(vertex1);
|
|
auto vertex2 =
|
|
acc->FindVertex(extended_vertex_gids_[(i / 6) % kNumExtendedVertices],
|
|
storage::View::NEW);
|
|
ASSERT_TRUE(vertex2);
|
|
storage::EdgeTypeId et;
|
|
if (i < kNumExtendedEdges / 4) {
|
|
et = et3;
|
|
} else {
|
|
et = et4;
|
|
}
|
|
auto edge = acc->CreateEdge(&*vertex1, &*vertex2, et);
|
|
ASSERT_TRUE(edge.HasValue());
|
|
extended_edge_gids_[i] = edge->Gid();
|
|
if (!single_transaction) ASSERT_FALSE(acc->Commit().HasError());
|
|
}
|
|
|
|
if (single_transaction) ASSERT_FALSE(acc->Commit().HasError());
|
|
}
|
|
|
|
void VerifyDataset(storage::Storage *store, DatasetType type,
|
|
bool properties_on_edges) {
|
|
auto base_label_indexed = store->NameToLabel("base_indexed");
|
|
auto base_label_unindexed = store->NameToLabel("base_unindexed");
|
|
auto property_id = store->NameToProperty("id");
|
|
auto et1 = store->NameToEdgeType("base_et1");
|
|
auto et2 = store->NameToEdgeType("base_et2");
|
|
|
|
auto extended_label_indexed = store->NameToLabel("extended_indexed");
|
|
auto extended_label_unused = store->NameToLabel("extended_unused");
|
|
auto property_count = store->NameToProperty("count");
|
|
auto et3 = store->NameToEdgeType("extended_et3");
|
|
auto et4 = store->NameToEdgeType("extended_et4");
|
|
|
|
// Verify indices info.
|
|
{
|
|
auto info = store->ListAllIndices();
|
|
switch (type) {
|
|
case DatasetType::ONLY_BASE:
|
|
ASSERT_THAT(info.label, UnorderedElementsAre(base_label_unindexed));
|
|
ASSERT_THAT(info.label_property,
|
|
UnorderedElementsAre(
|
|
std::make_pair(base_label_indexed, property_id)));
|
|
break;
|
|
case DatasetType::ONLY_EXTENDED:
|
|
ASSERT_THAT(info.label, UnorderedElementsAre(extended_label_unused));
|
|
ASSERT_THAT(
|
|
info.label_property,
|
|
UnorderedElementsAre(
|
|
std::make_pair(base_label_indexed, property_id),
|
|
std::make_pair(extended_label_indexed, property_count)));
|
|
break;
|
|
case DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS:
|
|
case DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS:
|
|
case DatasetType::BASE_WITH_EXTENDED:
|
|
ASSERT_THAT(info.label, UnorderedElementsAre(base_label_unindexed,
|
|
extended_label_unused));
|
|
ASSERT_THAT(
|
|
info.label_property,
|
|
UnorderedElementsAre(
|
|
std::make_pair(base_label_indexed, property_id),
|
|
std::make_pair(extended_label_indexed, property_count)));
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Verify constraints info.
|
|
{
|
|
auto info = store->ListAllConstraints();
|
|
switch (type) {
|
|
case DatasetType::ONLY_BASE:
|
|
ASSERT_THAT(info.existence, UnorderedElementsAre(std::make_pair(
|
|
base_label_unindexed, property_id)));
|
|
break;
|
|
case DatasetType::ONLY_EXTENDED:
|
|
ASSERT_THAT(info.existence,
|
|
UnorderedElementsAre(std::make_pair(extended_label_unused,
|
|
property_count)));
|
|
break;
|
|
case DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS:
|
|
case DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS:
|
|
case DatasetType::BASE_WITH_EXTENDED:
|
|
ASSERT_THAT(
|
|
info.existence,
|
|
UnorderedElementsAre(
|
|
std::make_pair(base_label_unindexed, property_id),
|
|
std::make_pair(extended_label_unused, property_count)));
|
|
break;
|
|
}
|
|
}
|
|
|
|
bool have_base_dataset = false;
|
|
bool have_extended_dataset = false;
|
|
switch (type) {
|
|
case DatasetType::ONLY_BASE:
|
|
case DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS:
|
|
have_base_dataset = true;
|
|
break;
|
|
case DatasetType::ONLY_EXTENDED:
|
|
case DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS:
|
|
have_extended_dataset = true;
|
|
break;
|
|
case DatasetType::BASE_WITH_EXTENDED:
|
|
have_base_dataset = true;
|
|
have_extended_dataset = true;
|
|
break;
|
|
}
|
|
|
|
// Create storage accessor.
|
|
auto acc = store->Access();
|
|
|
|
// Verify base dataset.
|
|
if (have_base_dataset) {
|
|
// Verify vertices.
|
|
for (uint64_t i = 0; i < kNumBaseVertices; ++i) {
|
|
auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
auto labels = vertex->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
if (i < kNumBaseVertices / 2) {
|
|
ASSERT_THAT(*labels, UnorderedElementsAre(base_label_indexed));
|
|
} else {
|
|
ASSERT_THAT(*labels, UnorderedElementsAre(base_label_unindexed));
|
|
}
|
|
auto properties = vertex->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(properties.HasValue());
|
|
if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) {
|
|
ASSERT_EQ(properties->size(), 1);
|
|
ASSERT_EQ((*properties)[property_id],
|
|
storage::PropertyValue(static_cast<int64_t>(i)));
|
|
} else {
|
|
ASSERT_EQ(properties->size(), 0);
|
|
}
|
|
}
|
|
|
|
// Verify edges.
|
|
for (uint64_t i = 0; i < kNumBaseEdges; ++i) {
|
|
auto find_edge =
|
|
[&](const auto &edges) -> std::optional<storage::EdgeAccessor> {
|
|
for (const auto &edge : edges) {
|
|
if (edge.Gid() == base_edge_gids_[i]) {
|
|
return edge;
|
|
}
|
|
}
|
|
return std::nullopt;
|
|
};
|
|
|
|
{
|
|
auto vertex1 =
|
|
acc.FindVertex(base_vertex_gids_[(i / 2) % kNumBaseVertices],
|
|
storage::View::OLD);
|
|
ASSERT_TRUE(vertex1);
|
|
auto out_edges = vertex1->OutEdges(storage::View::OLD);
|
|
ASSERT_TRUE(out_edges.HasValue());
|
|
auto edge1 = find_edge(*out_edges);
|
|
ASSERT_TRUE(edge1);
|
|
if (i < kNumBaseEdges / 2) {
|
|
ASSERT_EQ(edge1->EdgeType(), et1);
|
|
} else {
|
|
ASSERT_EQ(edge1->EdgeType(), et2);
|
|
}
|
|
auto properties = edge1->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(properties.HasValue());
|
|
if (properties_on_edges) {
|
|
ASSERT_EQ(properties->size(), 1);
|
|
ASSERT_EQ((*properties)[property_id],
|
|
storage::PropertyValue(static_cast<int64_t>(i)));
|
|
} else {
|
|
ASSERT_EQ(properties->size(), 0);
|
|
}
|
|
}
|
|
|
|
{
|
|
auto vertex2 =
|
|
acc.FindVertex(base_vertex_gids_[(i / 3) % kNumBaseVertices],
|
|
storage::View::OLD);
|
|
ASSERT_TRUE(vertex2);
|
|
auto in_edges = vertex2->InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(in_edges.HasValue());
|
|
auto edge2 = find_edge(*in_edges);
|
|
ASSERT_TRUE(edge2);
|
|
if (i < kNumBaseEdges / 2) {
|
|
ASSERT_EQ(edge2->EdgeType(), et1);
|
|
} else {
|
|
ASSERT_EQ(edge2->EdgeType(), et2);
|
|
}
|
|
auto properties = edge2->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(properties.HasValue());
|
|
if (properties_on_edges) {
|
|
ASSERT_EQ(properties->size(), 1);
|
|
ASSERT_EQ((*properties)[property_id],
|
|
storage::PropertyValue(static_cast<int64_t>(i)));
|
|
} else {
|
|
ASSERT_EQ(properties->size(), 0);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify label indices.
|
|
{
|
|
std::vector<storage::VertexAccessor> vertices;
|
|
vertices.reserve(kNumBaseVertices / 2);
|
|
for (auto vertex :
|
|
acc.Vertices(base_label_unindexed, storage::View::OLD)) {
|
|
vertices.push_back(vertex);
|
|
}
|
|
ASSERT_EQ(vertices.size(), kNumBaseVertices / 2);
|
|
std::sort(
|
|
vertices.begin(), vertices.end(),
|
|
[](const auto &a, const auto &b) { return a.Gid() < b.Gid(); });
|
|
for (uint64_t i = 0; i < kNumBaseVertices / 2; ++i) {
|
|
ASSERT_EQ(vertices[i].Gid(),
|
|
base_vertex_gids_[kNumBaseVertices / 2 + i]);
|
|
}
|
|
}
|
|
|
|
// Verify label+property index.
|
|
{
|
|
std::vector<storage::VertexAccessor> vertices;
|
|
vertices.reserve(kNumBaseVertices / 3);
|
|
for (auto vertex : acc.Vertices(base_label_indexed, property_id,
|
|
storage::View::OLD)) {
|
|
vertices.push_back(vertex);
|
|
}
|
|
ASSERT_EQ(vertices.size(), kNumBaseVertices / 3);
|
|
std::sort(
|
|
vertices.begin(), vertices.end(),
|
|
[](const auto &a, const auto &b) { return a.Gid() < b.Gid(); });
|
|
for (uint64_t i = 0; i < kNumBaseVertices / 3; ++i) {
|
|
ASSERT_EQ(vertices[i].Gid(), base_vertex_gids_[i]);
|
|
}
|
|
}
|
|
} else {
|
|
// Verify vertices.
|
|
for (uint64_t i = 0; i < kNumBaseVertices; ++i) {
|
|
auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD);
|
|
ASSERT_FALSE(vertex);
|
|
}
|
|
|
|
if (type ==
|
|
DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS) {
|
|
// Verify label indices.
|
|
{
|
|
uint64_t count = 0;
|
|
auto iterable =
|
|
acc.Vertices(base_label_unindexed, storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_EQ(count, 0);
|
|
}
|
|
|
|
// Verify label+property index.
|
|
{
|
|
uint64_t count = 0;
|
|
auto iterable =
|
|
acc.Vertices(base_label_indexed, property_id, storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_EQ(count, 0);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify extended dataset.
|
|
if (have_extended_dataset) {
|
|
// Verify vertices.
|
|
for (uint64_t i = 0; i < kNumExtendedVertices; ++i) {
|
|
auto vertex =
|
|
acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
auto labels = vertex->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
if (i < kNumExtendedVertices / 2) {
|
|
ASSERT_THAT(*labels, UnorderedElementsAre(extended_label_indexed));
|
|
}
|
|
auto properties = vertex->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(properties.HasValue());
|
|
if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) {
|
|
ASSERT_EQ(properties->size(), 1);
|
|
ASSERT_EQ((*properties)[property_count],
|
|
storage::PropertyValue("nandare"));
|
|
} else {
|
|
ASSERT_EQ(properties->size(), 0);
|
|
}
|
|
}
|
|
|
|
// Verify edges.
|
|
for (uint64_t i = 0; i < kNumExtendedEdges; ++i) {
|
|
auto find_edge =
|
|
[&](const auto &edges) -> std::optional<storage::EdgeAccessor> {
|
|
for (const auto &edge : edges) {
|
|
if (edge.Gid() == extended_edge_gids_[i]) {
|
|
return edge;
|
|
}
|
|
}
|
|
return std::nullopt;
|
|
};
|
|
|
|
{
|
|
auto vertex1 = acc.FindVertex(
|
|
extended_vertex_gids_[(i / 5) % kNumExtendedVertices],
|
|
storage::View::OLD);
|
|
ASSERT_TRUE(vertex1);
|
|
auto out_edges = vertex1->OutEdges(storage::View::OLD);
|
|
ASSERT_TRUE(out_edges.HasValue());
|
|
auto edge1 = find_edge(*out_edges);
|
|
ASSERT_TRUE(edge1);
|
|
if (i < kNumExtendedEdges / 4) {
|
|
ASSERT_EQ(edge1->EdgeType(), et3);
|
|
} else {
|
|
ASSERT_EQ(edge1->EdgeType(), et4);
|
|
}
|
|
auto properties = edge1->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(properties.HasValue());
|
|
ASSERT_EQ(properties->size(), 0);
|
|
}
|
|
|
|
{
|
|
auto vertex2 = acc.FindVertex(
|
|
extended_vertex_gids_[(i / 6) % kNumExtendedVertices],
|
|
storage::View::OLD);
|
|
ASSERT_TRUE(vertex2);
|
|
auto in_edges = vertex2->InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(in_edges.HasValue());
|
|
auto edge2 = find_edge(*in_edges);
|
|
ASSERT_TRUE(edge2);
|
|
if (i < kNumExtendedEdges / 4) {
|
|
ASSERT_EQ(edge2->EdgeType(), et3);
|
|
} else {
|
|
ASSERT_EQ(edge2->EdgeType(), et4);
|
|
}
|
|
auto properties = edge2->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(properties.HasValue());
|
|
ASSERT_EQ(properties->size(), 0);
|
|
}
|
|
}
|
|
|
|
// Verify label indices.
|
|
{
|
|
std::vector<storage::VertexAccessor> vertices;
|
|
vertices.reserve(kNumExtendedVertices / 2);
|
|
for (auto vertex :
|
|
acc.Vertices(extended_label_unused, storage::View::OLD)) {
|
|
vertices.push_back(vertex);
|
|
}
|
|
ASSERT_EQ(vertices.size(), 0);
|
|
}
|
|
|
|
// Verify label+property index.
|
|
{
|
|
std::vector<storage::VertexAccessor> vertices;
|
|
vertices.reserve(kNumExtendedVertices / 3);
|
|
for (auto vertex : acc.Vertices(extended_label_indexed, property_count,
|
|
storage::View::OLD)) {
|
|
vertices.push_back(vertex);
|
|
}
|
|
ASSERT_EQ(vertices.size(), kNumExtendedVertices / 3);
|
|
std::sort(
|
|
vertices.begin(), vertices.end(),
|
|
[](const auto &a, const auto &b) { return a.Gid() < b.Gid(); });
|
|
for (uint64_t i = 0; i < kNumExtendedVertices / 3; ++i) {
|
|
ASSERT_EQ(vertices[i].Gid(), extended_vertex_gids_[i]);
|
|
}
|
|
}
|
|
} else {
|
|
// Verify vertices.
|
|
for (uint64_t i = 0; i < kNumExtendedVertices; ++i) {
|
|
auto vertex =
|
|
acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD);
|
|
ASSERT_FALSE(vertex);
|
|
}
|
|
|
|
if (type ==
|
|
DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS) {
|
|
// Verify label indices.
|
|
{
|
|
uint64_t count = 0;
|
|
auto iterable =
|
|
acc.Vertices(extended_label_unused, storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_EQ(count, 0);
|
|
}
|
|
|
|
// Verify label+property index.
|
|
{
|
|
uint64_t count = 0;
|
|
auto iterable = acc.Vertices(extended_label_indexed, property_count,
|
|
storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_EQ(count, 0);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
std::vector<std::filesystem::path> GetSnapshotsList() {
|
|
return GetFilesList(storage_directory / storage::kSnapshotDirectory);
|
|
}
|
|
|
|
std::vector<std::filesystem::path> GetBackupSnapshotsList() {
|
|
return GetFilesList(storage_directory / storage::kBackupDirectory /
|
|
storage::kSnapshotDirectory);
|
|
}
|
|
|
|
std::vector<std::filesystem::path> GetWalsList() {
|
|
return GetFilesList(storage_directory / storage::kWalDirectory);
|
|
}
|
|
|
|
std::vector<std::filesystem::path> GetBackupWalsList() {
|
|
return GetFilesList(storage_directory / storage::kBackupDirectory /
|
|
storage::kWalDirectory);
|
|
}
|
|
|
|
void RestoreBackups() {
|
|
{
|
|
auto backup_snapshots = GetBackupSnapshotsList();
|
|
for (const auto &item : backup_snapshots) {
|
|
std::filesystem::rename(
|
|
item,
|
|
storage_directory / storage::kSnapshotDirectory / item.filename());
|
|
}
|
|
}
|
|
{
|
|
auto backup_wals = GetBackupWalsList();
|
|
for (const auto &item : backup_wals) {
|
|
std::filesystem::rename(
|
|
item, storage_directory / storage::kWalDirectory / item.filename());
|
|
}
|
|
}
|
|
}
|
|
|
|
std::filesystem::path storage_directory{
|
|
std::filesystem::temp_directory_path() /
|
|
"MG_test_unit_storage_v2_durability"};
|
|
|
|
private:
|
|
std::vector<std::filesystem::path> GetFilesList(
|
|
const std::filesystem::path &path) {
|
|
std::vector<std::filesystem::path> ret;
|
|
std::error_code ec; // For exception suppression.
|
|
for (auto &item : std::filesystem::directory_iterator(path, ec)) {
|
|
ret.push_back(item.path());
|
|
}
|
|
std::sort(ret.begin(), ret.end());
|
|
std::reverse(ret.begin(), ret.end());
|
|
return ret;
|
|
}
|
|
|
|
void Clear() {
|
|
if (!std::filesystem::exists(storage_directory)) return;
|
|
std::filesystem::remove_all(storage_directory);
|
|
}
|
|
|
|
std::vector<storage::Gid> base_vertex_gids_;
|
|
std::vector<storage::Gid> base_edge_gids_;
|
|
std::vector<storage::Gid> extended_vertex_gids_;
|
|
std::vector<storage::Gid> extended_edge_gids_;
|
|
};
|
|
|
|
void DestroySnapshot(const std::filesystem::path &path) {
|
|
auto info = storage::ReadSnapshotInfo(path);
|
|
LOG(INFO) << "Destroying snapshot " << path;
|
|
utils::OutputFile file;
|
|
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
|
|
file.SetPosition(utils::OutputFile::Position::SET, info.offset_vertices);
|
|
auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP);
|
|
file.Write(&value, sizeof(value));
|
|
file.Sync();
|
|
file.Close();
|
|
}
|
|
|
|
void DestroyWalFirstDelta(const std::filesystem::path &path) {
|
|
auto info = storage::ReadWalInfo(path);
|
|
LOG(INFO) << "Destroying WAL " << path;
|
|
utils::OutputFile file;
|
|
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
|
|
file.SetPosition(utils::OutputFile::Position::SET, info.offset_deltas);
|
|
auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP);
|
|
file.Write(&value, sizeof(value));
|
|
file.Sync();
|
|
file.Close();
|
|
}
|
|
|
|
void DestroyWalSuffix(const std::filesystem::path &path) {
|
|
auto info = storage::ReadWalInfo(path);
|
|
LOG(INFO) << "Destroying WAL " << path;
|
|
utils::OutputFile file;
|
|
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
|
|
ASSERT_LT(
|
|
info.offset_deltas,
|
|
file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END, -100));
|
|
uint8_t value = 0;
|
|
for (size_t i = 0; i < 100; ++i) {
|
|
file.Write(&value, sizeof(value));
|
|
}
|
|
file.Sync();
|
|
file.Close();
|
|
}
|
|
|
|
INSTANTIATE_TEST_CASE_P(EdgesWithProperties, DurabilityTest,
|
|
::testing::Values(true));
|
|
INSTANTIATE_TEST_CASE_P(EdgesWithoutProperties, DurabilityTest,
|
|
::testing::Values(false));
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotOnExit) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
CreateExtendedDataset(&store);
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotPeriodic) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
|
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotFallback) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
|
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
CreateExtendedDataset(&store);
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 2);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Destroy last snapshot.
|
|
{
|
|
auto snapshots = GetSnapshotsList();
|
|
ASSERT_GE(snapshots.size(), 2);
|
|
DestroySnapshot(*snapshots.begin());
|
|
}
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotEverythingCorrupt) {
|
|
// Create unrelated snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Get unrelated UUID.
|
|
std::string unrelated_uuid;
|
|
{
|
|
auto snapshots = GetSnapshotsList();
|
|
ASSERT_EQ(snapshots.size(), 1);
|
|
auto info = storage::ReadSnapshotInfo(*snapshots.begin());
|
|
unrelated_uuid = info.uuid;
|
|
}
|
|
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
|
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
CreateExtendedDataset(&store);
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Restore unrelated snapshots.
|
|
RestoreBackups();
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 2);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Destroy all current snapshots.
|
|
{
|
|
auto snapshots = GetSnapshotsList();
|
|
ASSERT_GE(snapshots.size(), 2);
|
|
for (const auto &snapshot : snapshots) {
|
|
auto info = storage::ReadSnapshotInfo(snapshot);
|
|
if (info.uuid == unrelated_uuid) {
|
|
LOG(INFO) << "Skipping snapshot " << snapshot;
|
|
continue;
|
|
}
|
|
DestroySnapshot(snapshot);
|
|
}
|
|
}
|
|
|
|
// Recover snapshot.
|
|
ASSERT_DEATH(
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
},
|
|
"");
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotRetention) {
|
|
// Create unrelated snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
|
.snapshot_interval = std::chrono::milliseconds(2000),
|
|
.snapshot_retention_count = 3}});
|
|
// Restore unrelated snapshots after the database has been started.
|
|
RestoreBackups();
|
|
CreateBaseDataset(&store, GetParam());
|
|
// Allow approximately 5 snapshots to be created.
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1 + 3);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Verify that exactly 3 snapshots and 1 unrelated snapshot exist.
|
|
{
|
|
auto snapshots = GetSnapshotsList();
|
|
ASSERT_EQ(snapshots.size(), 1 + 3);
|
|
std::string uuid;
|
|
for (size_t i = 0; i < snapshots.size(); ++i) {
|
|
const auto &path = snapshots[i];
|
|
// This shouldn't throw.
|
|
auto info = storage::ReadSnapshotInfo(path);
|
|
if (i == 0) uuid = info.uuid;
|
|
if (i < snapshots.size() - 1) {
|
|
ASSERT_EQ(info.uuid, uuid);
|
|
} else {
|
|
ASSERT_NE(info.uuid, uuid);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotMixedUUID) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
CreateExtendedDataset(&store);
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
}
|
|
|
|
// Create another snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Restore unrelated snapshot.
|
|
RestoreBackups();
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 2);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotBackup) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Start storage without recovery.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
|
.snapshot_interval = std::chrono::minutes(20)}});
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_F(DurabilityTest,
|
|
SnapshotWithoutPropertiesOnEdgesRecoveryWithPropertiesOnEdges) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = false},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, false);
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, false);
|
|
CreateExtendedDataset(&store);
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, false);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = true},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, false);
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_F(DurabilityTest,
|
|
SnapshotWithPropertiesOnEdgesRecoveryWithoutPropertiesOnEdges) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = true},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, true);
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, true);
|
|
CreateExtendedDataset(&store);
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, true);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
ASSERT_DEATH(
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = false},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
},
|
|
"");
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_F(DurabilityTest,
|
|
SnapshotWithPropertiesOnEdgesButUnusedRecoveryWithoutPropertiesOnEdges) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = true},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, true);
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, true);
|
|
CreateExtendedDataset(&store);
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, true);
|
|
// Remove properties from edges.
|
|
{
|
|
auto acc = store.Access();
|
|
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
|
auto in_edges = vertex.InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(in_edges.HasValue());
|
|
for (auto edge : *in_edges) {
|
|
// TODO (mferencevic): Replace with `ClearProperties()`
|
|
auto props = edge.Properties(storage::View::NEW);
|
|
ASSERT_TRUE(props.HasValue());
|
|
for (const auto &prop : *props) {
|
|
ASSERT_TRUE(edge.SetProperty(prop.first, storage::PropertyValue())
|
|
.HasValue());
|
|
}
|
|
}
|
|
auto out_edges = vertex.InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(out_edges.HasValue());
|
|
for (auto edge : *out_edges) {
|
|
// TODO (mferencevic): Replace with `ClearProperties()`
|
|
auto props = edge.Properties(storage::View::NEW);
|
|
ASSERT_TRUE(props.HasValue());
|
|
for (const auto &prop : *props) {
|
|
ASSERT_TRUE(edge.SetProperty(prop.first, storage::PropertyValue())
|
|
.HasValue());
|
|
}
|
|
}
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
storage::Storage store({.items = {.properties_on_edges = false},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, false);
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalBasic) {
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
CreateExtendedDataset(&store);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalBackup) {
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
auto num_wals = GetWalsList().size();
|
|
ASSERT_GE(num_wals, 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Start storage without recovery.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20)}});
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), num_wals);
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalAppendToExisting) {
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
}
|
|
|
|
// Recover WALs and create more WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateExtendedDataset(&store);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalCreateInSingleTransaction) {
|
|
// NOLINTNEXTLINE(readability-isolate-declaration)
|
|
storage::Gid gid_v1, gid_v2, gid_e1, gid_v3;
|
|
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
auto acc = store.Access();
|
|
auto v1 = acc.CreateVertex();
|
|
gid_v1 = v1.Gid();
|
|
auto v2 = acc.CreateVertex();
|
|
gid_v2 = v2.Gid();
|
|
auto e1 = acc.CreateEdge(&v1, &v2, store.NameToEdgeType("e1"));
|
|
ASSERT_TRUE(e1.HasValue());
|
|
gid_e1 = e1->Gid();
|
|
ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l11")).HasValue());
|
|
ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l12")).HasValue());
|
|
ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l13")).HasValue());
|
|
if (GetParam()) {
|
|
ASSERT_TRUE(e1->SetProperty(store.NameToProperty("test"),
|
|
storage::PropertyValue("nandare"))
|
|
.HasValue());
|
|
}
|
|
ASSERT_TRUE(v2.AddLabel(store.NameToLabel("l21")).HasValue());
|
|
ASSERT_TRUE(v2.SetProperty(store.NameToProperty("hello"),
|
|
storage::PropertyValue("world"))
|
|
.HasValue());
|
|
auto v3 = acc.CreateVertex();
|
|
gid_v3 = v3.Gid();
|
|
ASSERT_TRUE(
|
|
v3.SetProperty(store.NameToProperty("v3"), storage::PropertyValue(42))
|
|
.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
{
|
|
auto indices = store.ListAllIndices();
|
|
ASSERT_EQ(indices.label.size(), 0);
|
|
ASSERT_EQ(indices.label_property.size(), 0);
|
|
auto constraints = store.ListAllConstraints();
|
|
ASSERT_EQ(constraints.existence.size(), 0);
|
|
auto acc = store.Access();
|
|
{
|
|
auto v1 = acc.FindVertex(gid_v1, storage::View::OLD);
|
|
ASSERT_TRUE(v1);
|
|
auto labels = v1->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
ASSERT_THAT(*labels, UnorderedElementsAre(store.NameToLabel("l11"),
|
|
store.NameToLabel("l12"),
|
|
store.NameToLabel("l13")));
|
|
auto props = v1->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(props.HasValue());
|
|
ASSERT_EQ(props->size(), 0);
|
|
auto in_edges = v1->InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(in_edges.HasValue());
|
|
ASSERT_EQ(in_edges->size(), 0);
|
|
auto out_edges = v1->OutEdges(storage::View::OLD);
|
|
ASSERT_TRUE(out_edges.HasValue());
|
|
ASSERT_EQ(out_edges->size(), 1);
|
|
const auto &edge = (*out_edges)[0];
|
|
ASSERT_EQ(edge.Gid(), gid_e1);
|
|
auto edge_props = edge.Properties(storage::View::OLD);
|
|
ASSERT_TRUE(edge_props.HasValue());
|
|
if (GetParam()) {
|
|
ASSERT_THAT(*edge_props, UnorderedElementsAre(std::make_pair(
|
|
store.NameToProperty("test"),
|
|
storage::PropertyValue("nandare"))));
|
|
} else {
|
|
ASSERT_EQ(edge_props->size(), 0);
|
|
}
|
|
}
|
|
{
|
|
auto v2 = acc.FindVertex(gid_v2, storage::View::OLD);
|
|
ASSERT_TRUE(v2);
|
|
auto labels = v2->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
ASSERT_THAT(*labels, UnorderedElementsAre(store.NameToLabel("l21")));
|
|
auto props = v2->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(props.HasValue());
|
|
ASSERT_THAT(*props, UnorderedElementsAre(
|
|
std::make_pair(store.NameToProperty("hello"),
|
|
storage::PropertyValue("world"))));
|
|
auto in_edges = v2->InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(in_edges.HasValue());
|
|
ASSERT_EQ(in_edges->size(), 1);
|
|
const auto &edge = (*in_edges)[0];
|
|
ASSERT_EQ(edge.Gid(), gid_e1);
|
|
auto edge_props = edge.Properties(storage::View::OLD);
|
|
ASSERT_TRUE(edge_props.HasValue());
|
|
if (GetParam()) {
|
|
ASSERT_THAT(*edge_props, UnorderedElementsAre(std::make_pair(
|
|
store.NameToProperty("test"),
|
|
storage::PropertyValue("nandare"))));
|
|
} else {
|
|
ASSERT_EQ(edge_props->size(), 0);
|
|
}
|
|
auto out_edges = v2->OutEdges(storage::View::OLD);
|
|
ASSERT_TRUE(out_edges.HasValue());
|
|
ASSERT_EQ(out_edges->size(), 0);
|
|
}
|
|
{
|
|
auto v3 = acc.FindVertex(gid_v3, storage::View::OLD);
|
|
ASSERT_TRUE(v3);
|
|
auto labels = v3->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
ASSERT_EQ(labels->size(), 0);
|
|
auto props = v3->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(props.HasValue());
|
|
ASSERT_THAT(*props,
|
|
UnorderedElementsAre(std::make_pair(
|
|
store.NameToProperty("v3"), storage::PropertyValue(42))));
|
|
auto in_edges = v3->InEdges(storage::View::OLD);
|
|
ASSERT_TRUE(in_edges.HasValue());
|
|
ASSERT_EQ(in_edges->size(), 0);
|
|
auto out_edges = v3->OutEdges(storage::View::OLD);
|
|
ASSERT_TRUE(out_edges.HasValue());
|
|
ASSERT_EQ(out_edges->size(), 0);
|
|
}
|
|
}
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalCreateAndRemoveEverything) {
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
CreateExtendedDataset(&store);
|
|
auto indices = store.ListAllIndices();
|
|
for (auto index : indices.label) {
|
|
ASSERT_TRUE(store.DropIndex(index));
|
|
}
|
|
for (auto index : indices.label_property) {
|
|
ASSERT_TRUE(store.DropIndex(index.first, index.second));
|
|
}
|
|
auto constraints = store.ListAllConstraints();
|
|
for (auto constraint : constraints.existence) {
|
|
ASSERT_TRUE(
|
|
store.DropExistenceConstraint(constraint.first, constraint.second));
|
|
}
|
|
auto acc = store.Access();
|
|
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
|
ASSERT_TRUE(acc.DetachDeleteVertex(&vertex).HasValue());
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
{
|
|
auto indices = store.ListAllIndices();
|
|
ASSERT_EQ(indices.label.size(), 0);
|
|
ASSERT_EQ(indices.label_property.size(), 0);
|
|
auto constraints = store.ListAllConstraints();
|
|
ASSERT_EQ(constraints.existence.size(), 0);
|
|
auto acc = store.Access();
|
|
uint64_t count = 0;
|
|
auto iterable = acc.Vertices(storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_EQ(count, 0);
|
|
}
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalTransactionOrdering) {
|
|
// NOLINTNEXTLINE(readability-isolate-declaration)
|
|
storage::Gid gid1, gid2, gid3;
|
|
|
|
// Create WAL.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery,
|
|
.wal_file_size_kibibytes = 100000}});
|
|
auto acc1 = store.Access();
|
|
auto acc2 = store.Access();
|
|
|
|
// Create vertex in transaction 2.
|
|
{
|
|
auto vertex2 = acc2.CreateVertex();
|
|
gid2 = vertex2.Gid();
|
|
ASSERT_TRUE(vertex2
|
|
.SetProperty(store.NameToProperty("id"),
|
|
storage::PropertyValue(2))
|
|
.HasValue());
|
|
}
|
|
|
|
auto acc3 = store.Access();
|
|
|
|
// Create vertex in transaction 3.
|
|
{
|
|
auto vertex3 = acc3.CreateVertex();
|
|
gid3 = vertex3.Gid();
|
|
ASSERT_TRUE(vertex3
|
|
.SetProperty(store.NameToProperty("id"),
|
|
storage::PropertyValue(3))
|
|
.HasValue());
|
|
}
|
|
|
|
// Create vertex in transaction 1.
|
|
{
|
|
auto vertex1 = acc1.CreateVertex();
|
|
gid1 = vertex1.Gid();
|
|
ASSERT_TRUE(vertex1
|
|
.SetProperty(store.NameToProperty("id"),
|
|
storage::PropertyValue(1))
|
|
.HasValue());
|
|
}
|
|
|
|
// Commit transaction 3, then 1, then 2.
|
|
ASSERT_FALSE(acc3.Commit().HasError());
|
|
ASSERT_FALSE(acc1.Commit().HasError());
|
|
ASSERT_FALSE(acc2.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Verify WAL data.
|
|
{
|
|
auto path = GetWalsList().front();
|
|
auto info = storage::ReadWalInfo(path);
|
|
storage::Decoder wal;
|
|
wal.Initialize(path, storage::kWalMagic);
|
|
wal.SetPosition(info.offset_deltas);
|
|
ASSERT_EQ(info.num_deltas, 9);
|
|
std::vector<std::pair<uint64_t, storage::WalDeltaData>> data;
|
|
for (uint64_t i = 0; i < info.num_deltas; ++i) {
|
|
auto timestamp = storage::ReadWalDeltaHeader(&wal);
|
|
data.emplace_back(timestamp, storage::ReadWalDeltaData(&wal));
|
|
}
|
|
// Verify timestamps.
|
|
ASSERT_EQ(data[1].first, data[0].first);
|
|
ASSERT_EQ(data[2].first, data[1].first);
|
|
ASSERT_GT(data[3].first, data[2].first);
|
|
ASSERT_EQ(data[4].first, data[3].first);
|
|
ASSERT_EQ(data[5].first, data[4].first);
|
|
ASSERT_GT(data[6].first, data[5].first);
|
|
ASSERT_EQ(data[7].first, data[6].first);
|
|
ASSERT_EQ(data[8].first, data[7].first);
|
|
// Verify transaction 3.
|
|
ASSERT_EQ(data[0].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
|
|
ASSERT_EQ(data[0].second.vertex_create_delete.gid, gid3);
|
|
ASSERT_EQ(data[1].second.type,
|
|
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
|
|
ASSERT_EQ(data[1].second.vertex_edge_set_property.gid, gid3);
|
|
ASSERT_EQ(data[1].second.vertex_edge_set_property.property, "id");
|
|
ASSERT_EQ(data[1].second.vertex_edge_set_property.value,
|
|
storage::PropertyValue(3));
|
|
ASSERT_EQ(data[2].second.type,
|
|
storage::WalDeltaData::Type::TRANSACTION_END);
|
|
// Verify transaction 1.
|
|
ASSERT_EQ(data[3].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
|
|
ASSERT_EQ(data[3].second.vertex_create_delete.gid, gid1);
|
|
ASSERT_EQ(data[4].second.type,
|
|
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
|
|
ASSERT_EQ(data[4].second.vertex_edge_set_property.gid, gid1);
|
|
ASSERT_EQ(data[4].second.vertex_edge_set_property.property, "id");
|
|
ASSERT_EQ(data[4].second.vertex_edge_set_property.value,
|
|
storage::PropertyValue(1));
|
|
ASSERT_EQ(data[5].second.type,
|
|
storage::WalDeltaData::Type::TRANSACTION_END);
|
|
// Verify transaction 2.
|
|
ASSERT_EQ(data[6].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
|
|
ASSERT_EQ(data[6].second.vertex_create_delete.gid, gid2);
|
|
ASSERT_EQ(data[7].second.type,
|
|
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
|
|
ASSERT_EQ(data[7].second.vertex_edge_set_property.gid, gid2);
|
|
ASSERT_EQ(data[7].second.vertex_edge_set_property.property, "id");
|
|
ASSERT_EQ(data[7].second.vertex_edge_set_property.value,
|
|
storage::PropertyValue(2));
|
|
ASSERT_EQ(data[8].second.type,
|
|
storage::WalDeltaData::Type::TRANSACTION_END);
|
|
}
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
{
|
|
auto acc = store.Access();
|
|
for (auto [gid, id] : std::vector<std::pair<storage::Gid, int64_t>>{
|
|
{gid1, 1}, {gid2, 2}, {gid3, 3}}) {
|
|
auto vertex = acc.FindVertex(gid, storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
auto labels = vertex->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
ASSERT_EQ(labels->size(), 0);
|
|
auto props = vertex->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(props.HasValue());
|
|
ASSERT_EQ(props->size(), 1);
|
|
ASSERT_EQ(props->at(store.NameToProperty("id")),
|
|
storage::PropertyValue(id));
|
|
}
|
|
}
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalCreateAndRemoveOnlyBaseDataset) {
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
CreateExtendedDataset(&store);
|
|
auto label_indexed = store.NameToLabel("base_indexed");
|
|
auto label_unindexed = store.NameToLabel("base_unindexed");
|
|
auto acc = store.Access();
|
|
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
|
auto has_indexed = vertex.HasLabel(label_indexed, storage::View::OLD);
|
|
ASSERT_TRUE(has_indexed.HasValue());
|
|
auto has_unindexed = vertex.HasLabel(label_unindexed, storage::View::OLD);
|
|
if (!*has_indexed && !*has_unindexed) continue;
|
|
ASSERT_TRUE(acc.DetachDeleteVertex(&vertex).HasValue());
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store,
|
|
DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS,
|
|
GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalDeathResilience) {
|
|
pid_t pid = fork();
|
|
if (pid == 0) {
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
// Create one million vertices.
|
|
for (uint64_t i = 0; i < 1000000; ++i) {
|
|
auto acc = store.Access();
|
|
acc.CreateVertex();
|
|
CHECK(!acc.Commit().HasError()) << "Couldn't commit transaction!";
|
|
}
|
|
}
|
|
} else if (pid > 0) {
|
|
// Wait for WALs to be created.
|
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
|
int status;
|
|
EXPECT_EQ(waitpid(pid, &status, WNOHANG), 0);
|
|
EXPECT_EQ(kill(pid, SIGKILL), 0);
|
|
EXPECT_EQ(waitpid(pid, &status, 0), pid);
|
|
EXPECT_NE(status, 0);
|
|
} else {
|
|
LOG(FATAL) << "Couldn't create process to execute test!";
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs and create more WALs.
|
|
const uint64_t kExtraItems = 1000;
|
|
uint64_t count = 0;
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery,
|
|
.recover_on_startup = true}});
|
|
{
|
|
auto acc = store.Access();
|
|
auto iterable = acc.Vertices(storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_GT(count, 0);
|
|
}
|
|
|
|
{
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < kExtraItems; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
{
|
|
uint64_t current = 0;
|
|
auto acc = store.Access();
|
|
auto iterable = acc.Vertices(storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++current;
|
|
}
|
|
ASSERT_EQ(count + kExtraItems, current);
|
|
}
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalMissingSecond) {
|
|
// Create unrelated WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
uint64_t unrelated_wals = GetWalsList().size();
|
|
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
const uint64_t kNumVertices = 1000;
|
|
std::vector<storage::Gid> gids;
|
|
gids.reserve(kNumVertices);
|
|
for (uint64_t i = 0; i < kNumVertices; ++i) {
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
gids.push_back(vertex.Gid());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
for (uint64_t i = 0; i < kNumVertices; ++i) {
|
|
auto acc = store.Access();
|
|
auto vertex = acc.FindVertex(gids[i], storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
ASSERT_TRUE(vertex
|
|
->SetProperty(store.NameToProperty("nandare"),
|
|
storage::PropertyValue("haihaihai!"))
|
|
.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_GE(GetBackupWalsList().size(), 1);
|
|
|
|
// Restore unrelated WALs.
|
|
RestoreBackups();
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Remove second WAL.
|
|
{
|
|
auto wals = GetWalsList();
|
|
ASSERT_GT(wals.size(), unrelated_wals + 2);
|
|
const auto &wal_file = wals[wals.size() - unrelated_wals - 2];
|
|
LOG(INFO) << "Deleting WAL file " << wal_file;
|
|
ASSERT_TRUE(std::filesystem::remove(wal_file));
|
|
}
|
|
|
|
// Recover WALs.
|
|
ASSERT_DEATH(
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
},
|
|
"");
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalCorruptSecond) {
|
|
// Create unrelated WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
uint64_t unrelated_wals = GetWalsList().size();
|
|
|
|
// Create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
const uint64_t kNumVertices = 1000;
|
|
std::vector<storage::Gid> gids;
|
|
gids.reserve(kNumVertices);
|
|
for (uint64_t i = 0; i < kNumVertices; ++i) {
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
gids.push_back(vertex.Gid());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
for (uint64_t i = 0; i < kNumVertices; ++i) {
|
|
auto acc = store.Access();
|
|
auto vertex = acc.FindVertex(gids[i], storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
ASSERT_TRUE(vertex
|
|
->SetProperty(store.NameToProperty("nandare"),
|
|
storage::PropertyValue("haihaihai!"))
|
|
.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_GE(GetBackupWalsList().size(), 1);
|
|
|
|
// Restore unrelated WALs.
|
|
RestoreBackups();
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Destroy second WAL.
|
|
{
|
|
auto wals = GetWalsList();
|
|
ASSERT_GT(wals.size(), unrelated_wals + 2);
|
|
const auto &wal_file = wals[wals.size() - unrelated_wals - 2];
|
|
DestroyWalFirstDelta(wal_file);
|
|
}
|
|
|
|
// Recover WALs.
|
|
ASSERT_DEATH(
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
},
|
|
"");
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalCorruptLastTransaction) {
|
|
// Create WALs
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
CreateExtendedDataset(&store, /* single_transaction = */ true);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Destroy last transaction in the latest WAL.
|
|
{
|
|
auto wals = GetWalsList();
|
|
ASSERT_GE(wals.size(), 2);
|
|
const auto &wal_file = wals.front();
|
|
DestroyWalSuffix(wal_file);
|
|
}
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
// The extended dataset shouldn't be recovered because its WAL transaction was
|
|
// corrupt.
|
|
VerifyDataset(&store,
|
|
DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS,
|
|
GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalAllOperationsInSingleTransaction) {
|
|
// Create WALs
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
auto acc = store.Access();
|
|
auto vertex1 = acc.CreateVertex();
|
|
auto vertex2 = acc.CreateVertex();
|
|
ASSERT_TRUE(vertex1.AddLabel(acc.NameToLabel("nandare")).HasValue());
|
|
ASSERT_TRUE(vertex2
|
|
.SetProperty(acc.NameToProperty("haihai"),
|
|
storage::PropertyValue(42))
|
|
.HasValue());
|
|
ASSERT_TRUE(vertex1.RemoveLabel(acc.NameToLabel("nandare")).HasValue());
|
|
auto edge1 = acc.CreateEdge(&vertex1, &vertex2, acc.NameToEdgeType("et1"));
|
|
ASSERT_TRUE(edge1.HasValue());
|
|
ASSERT_TRUE(
|
|
vertex2
|
|
.SetProperty(acc.NameToProperty("haihai"), storage::PropertyValue())
|
|
.HasValue());
|
|
auto vertex3 = acc.CreateVertex();
|
|
auto edge2 = acc.CreateEdge(&vertex3, &vertex3, acc.NameToEdgeType("et2"));
|
|
ASSERT_TRUE(edge2.HasValue());
|
|
if (GetParam()) {
|
|
ASSERT_TRUE(edge2
|
|
->SetProperty(acc.NameToProperty("meaning"),
|
|
storage::PropertyValue(true))
|
|
.HasValue());
|
|
ASSERT_TRUE(edge1
|
|
->SetProperty(acc.NameToProperty("hello"),
|
|
storage::PropertyValue("world"))
|
|
.HasValue());
|
|
ASSERT_TRUE(edge2
|
|
->SetProperty(acc.NameToProperty("meaning"),
|
|
storage::PropertyValue())
|
|
.HasValue());
|
|
}
|
|
ASSERT_TRUE(vertex3.AddLabel(acc.NameToLabel("test")).HasValue());
|
|
ASSERT_TRUE(vertex3
|
|
.SetProperty(acc.NameToProperty("nonono"),
|
|
storage::PropertyValue(-1))
|
|
.HasValue());
|
|
ASSERT_TRUE(
|
|
vertex3
|
|
.SetProperty(acc.NameToProperty("nonono"), storage::PropertyValue())
|
|
.HasValue());
|
|
if (GetParam()) {
|
|
ASSERT_TRUE(edge1
|
|
->SetProperty(acc.NameToProperty("hello"),
|
|
storage::PropertyValue())
|
|
.HasValue());
|
|
}
|
|
ASSERT_TRUE(vertex3.RemoveLabel(acc.NameToLabel("test")).HasValue());
|
|
ASSERT_TRUE(acc.DetachDeleteVertex(&vertex1).HasValue());
|
|
ASSERT_TRUE(acc.DeleteEdge(&*edge2).HasValue());
|
|
ASSERT_TRUE(acc.DeleteVertex(&vertex2).HasValue());
|
|
ASSERT_TRUE(acc.DeleteVertex(&vertex3).HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
{
|
|
auto acc = store.Access();
|
|
uint64_t count = 0;
|
|
auto iterable = acc.Vertices(storage::View::OLD);
|
|
for (auto it = iterable.begin(); it != iterable.end(); ++it) {
|
|
++count;
|
|
}
|
|
ASSERT_EQ(count, 0);
|
|
}
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalAndSnapshot) {
|
|
// Create snapshot and WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::milliseconds(2000),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
CreateExtendedDataset(&store);
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot and WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshot) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
}
|
|
|
|
// Recover snapshot and create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateExtendedDataset(&store);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot and WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshotAndWal) {
|
|
// Create snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_on_exit = true}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetWalsList().size(), 0);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam());
|
|
}
|
|
|
|
// Recover snapshot and create WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
CreateExtendedDataset(&store);
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot and WALs and create more WALs.
|
|
storage::Gid vertex_gid;
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
vertex_gid = vertex.Gid();
|
|
if (GetParam()) {
|
|
ASSERT_TRUE(vertex
|
|
.SetProperty(store.NameToProperty("meaning"),
|
|
storage::PropertyValue(42))
|
|
.HasValue());
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot and WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.FindVertex(vertex_gid, storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
auto labels = vertex->Labels(storage::View::OLD);
|
|
ASSERT_TRUE(labels.HasValue());
|
|
ASSERT_EQ(labels->size(), 0);
|
|
auto props = vertex->Properties(storage::View::OLD);
|
|
ASSERT_TRUE(props.HasValue());
|
|
if (GetParam()) {
|
|
ASSERT_THAT(*props, UnorderedElementsAre(
|
|
std::make_pair(store.NameToProperty("meaning"),
|
|
storage::PropertyValue(42))));
|
|
} else {
|
|
ASSERT_EQ(props->size(), 0);
|
|
}
|
|
}
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, WalAndSnapshotWalRetention) {
|
|
// Create unrelated WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::minutes(20),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
uint64_t unrelated_wals = GetWalsList().size();
|
|
|
|
uint64_t items_created = 0;
|
|
|
|
// Create snapshot and WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::seconds(2),
|
|
.wal_file_size_kibibytes = 1,
|
|
.wal_file_flush_every_n_tx = 1}});
|
|
// Restore unrelated snapshots after the database has been started.
|
|
RestoreBackups();
|
|
utils::Timer timer;
|
|
// Allow at least 6 snapshots to be created.
|
|
while (timer.Elapsed().count() < 13.0) {
|
|
auto acc = store.Access();
|
|
acc.CreateVertex();
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
++items_created;
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(GetSnapshotsList().size(), 3);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), unrelated_wals + 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
auto snapshots = GetSnapshotsList();
|
|
ASSERT_EQ(snapshots.size(), 3);
|
|
|
|
for (uint64_t i = 0; i < snapshots.size(); ++i) {
|
|
LOG(INFO) << "Recovery attempt " << i;
|
|
|
|
// Recover and verify data.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
auto acc = store.Access();
|
|
for (uint64_t j = 0; j < items_created; ++j) {
|
|
auto vertex =
|
|
acc.FindVertex(storage::Gid::FromUint(j), storage::View::OLD);
|
|
ASSERT_TRUE(vertex);
|
|
}
|
|
}
|
|
|
|
// Destroy current snapshot.
|
|
DestroySnapshot(snapshots[i]);
|
|
}
|
|
|
|
// Recover data after all of the snapshots have been destroyed. The recovery
|
|
// shouldn't be possible because the initial WALs are already deleted.
|
|
ASSERT_DEATH(
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
},
|
|
"");
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
|
TEST_P(DurabilityTest, SnapshotAndWalMixedUUID) {
|
|
// Create unrelated snapshot and WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::seconds(2)}});
|
|
auto acc = store.Access();
|
|
for (uint64_t i = 0; i < 1000; ++i) {
|
|
acc.CreateVertex();
|
|
}
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 1);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Create snapshot and WALs.
|
|
{
|
|
storage::Storage store(
|
|
{.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.snapshot_wal_mode = storage::Config::Durability::
|
|
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
|
.snapshot_interval = std::chrono::seconds(2)}});
|
|
CreateBaseDataset(&store, GetParam());
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
CreateExtendedDataset(&store);
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
}
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 1);
|
|
ASSERT_GE(GetBackupSnapshotsList().size(), 1);
|
|
ASSERT_GE(GetWalsList().size(), 1);
|
|
ASSERT_GE(GetBackupWalsList().size(), 1);
|
|
|
|
// Restore unrelated snapshots and WALs.
|
|
RestoreBackups();
|
|
|
|
ASSERT_GE(GetSnapshotsList().size(), 2);
|
|
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
|
|
ASSERT_GE(GetWalsList().size(), 2);
|
|
ASSERT_EQ(GetBackupWalsList().size(), 0);
|
|
|
|
// Recover snapshot and WALs.
|
|
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
|
.durability = {.storage_directory = storage_directory,
|
|
.recover_on_startup = true}});
|
|
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
|
|
|
// Try to use the storage.
|
|
{
|
|
auto acc = store.Access();
|
|
auto vertex = acc.CreateVertex();
|
|
auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et"));
|
|
ASSERT_TRUE(edge.HasValue());
|
|
ASSERT_FALSE(acc.Commit().HasError());
|
|
}
|
|
}
|