Initial version of storage v2

Summary:
Initial implementation of new storage engine.  It implements snapshot isolation
for transactions.  All changes in the database are stored as deltas instead of
making full copies.  Currently, the storage supports full transaction
functionality (commit, abort, command advancement).  Also, support has been
implemented only for vertices that have only labels.

Reviewers: teon.banek, mtomic

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2138
This commit is contained in:
Matej Ferencevic 2019-06-26 16:01:51 +02:00
parent 70c14b2048
commit 54e1e4e1ff
11 changed files with 1307 additions and 0 deletions

55
src/storage/v2/delta.hpp Normal file
View File

@ -0,0 +1,55 @@
#pragma once
#include <atomic>
namespace storage {
struct Delta {
enum class Action {
DELETE_OBJECT,
ADD_LABEL,
REMOVE_LABEL,
};
Delta(Action action, uint64_t value, std::atomic<uint64_t> *timestamp,
uint64_t command_id)
: action(action),
value(value),
timestamp(timestamp),
command_id(command_id),
next(nullptr) {}
Delta(const Delta &) = delete;
Delta &operator=(const Delta &) = delete;
Delta(Delta &&other) noexcept
: action(other.action),
value(other.value),
timestamp(other.timestamp),
command_id(other.command_id),
next(other.next.load()) {}
Delta &operator=(Delta &&other) noexcept {
if (this == &other) return *this;
action = other.action;
value = other.value;
timestamp = other.timestamp;
command_id = other.command_id;
next = other.next.load();
return *this;
}
~Delta() {}
Action action;
uint64_t value;
// TODO: optimize with in-place copy
std::atomic<uint64_t> *timestamp;
uint64_t command_id;
std::atomic<Delta *> next;
};
} // namespace storage

45
src/storage/v2/gid.hpp Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include "utils/cast.hpp"
namespace storage {
class Gid final {
private:
explicit Gid(uint64_t gid) : gid_(gid) {}
public:
static Gid FromUint(uint64_t gid) { return Gid{gid}; }
static Gid FromInt(int64_t gid) {
return Gid{utils::MemcpyCast<uint64_t>(gid)};
}
uint64_t AsUint() const { return gid_; }
int64_t AsInt() const { return utils::MemcpyCast<int64_t>(gid_); }
private:
uint64_t gid_;
};
inline bool operator==(const Gid &first, const Gid &second) {
return first.AsUint() == second.AsUint();
}
inline bool operator!=(const Gid &first, const Gid &second) {
return first.AsUint() != second.AsUint();
}
inline bool operator<(const Gid &first, const Gid &second) {
return first.AsUint() < second.AsUint();
}
inline bool operator>(const Gid &first, const Gid &second) {
return first.AsUint() > second.AsUint();
}
inline bool operator<=(const Gid &first, const Gid &second) {
return first.AsUint() <= second.AsUint();
}
inline bool operator>=(const Gid &first, const Gid &second) {
return first.AsUint() >= second.AsUint();
}
} // namespace storage

70
src/storage/v2/mvcc.hpp Normal file
View File

@ -0,0 +1,70 @@
#pragma once
#include "storage/v2/delta.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp"
namespace storage {
/// This function iterates through the undo buffers from an object (starting
/// from the supplied delta) and determines what deltas should be applied to get
/// the currently visible version of the object. When the function finds a delta
/// that should be applied it calls the callback function with the delta that
/// should be applied passed as a parameter to the callback. It is up to the
/// caller to apply the deltas.
template <typename TCallback>
inline void ApplyDeltasForRead(Transaction *transaction, Delta *delta,
View view, const TCallback &callback) {
while (delta != nullptr) {
auto ts = delta->timestamp->load(std::memory_order_acquire);
auto cid = delta->command_id;
// This is a committed change that we see so we shouldn't undo it.
if (ts < transaction->start_timestamp) {
break;
}
// We shouldn't undo our newest changes because the user requested a NEW
// view of the database.
if (view == View::NEW && ts == transaction->transaction_id &&
cid <= transaction->command_id) {
break;
}
// We shouldn't undo our older changes because the user requested a OLD view
// of the database.
if (view == View::OLD && ts == transaction->transaction_id &&
cid < transaction->command_id) {
break;
}
// This delta must be applied, call the callback.
callback(*delta);
// Move to the next delta.
delta = delta->next.load(std::memory_order_acquire);
}
}
/// This function prepares the Vertex object for a write. It checks whether
/// there are any serialization errors in the process (eg. the object can't be
/// written to from this transaction because it is being written to from another
/// transaction) and returns a `bool` value indicating whether the caller can
/// proceed with a write operation.
inline bool PrepareForWrite(Transaction *transaction, Vertex *vertex) {
if (vertex->delta == nullptr) return true;
auto ts = vertex->delta->timestamp->load(std::memory_order_acquire);
if (ts == transaction->transaction_id || ts < transaction->start_timestamp) {
auto it = transaction->modified_vertices.rbegin();
if (it == transaction->modified_vertices.rend() || *it != vertex) {
transaction->modified_vertices.push_back(vertex);
}
return true;
}
transaction->must_abort = true;
return false;
}
} // namespace storage

43
src/storage/v2/result.hpp Normal file
View File

@ -0,0 +1,43 @@
#pragma once
#include <optional>
#include <glog/logging.h>
namespace storage {
enum class Error : uint8_t {
SERIALIZATION_ERROR,
};
template <typename TReturn>
class Result final {
public:
explicit Result(const TReturn &ret) : return_(ret) {}
explicit Result(TReturn &&ret) : return_(std::move(ret)) {}
explicit Result(const Error &error) : error_(error) {}
bool IsReturn() const { return return_.has_value(); }
bool IsError() const { return error_.has_value(); }
TReturn *GetReturn() {
CHECK(return_) << "The storage result is an error!";
return &*return_;
}
const TReturn *GetReturn() const {
CHECK(return_) << "The storage result is an error!";
return &*return_;
}
Error GetError() const {
CHECK(error_) << "The storage result is a return value!";
return *error_;
}
private:
std::optional<TReturn> return_;
std::optional<Error> error_;
};
} // namespace storage

176
src/storage/v2/storage.hpp Normal file
View File

@ -0,0 +1,176 @@
#pragma once
#include <memory>
#include <optional>
#include <glog/logging.h>
#include "utils/skip_list.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
namespace storage {
// The storage is based on this paper:
// https://db.in.tum.de/~muehlbau/papers/mvcc.pdf
// The paper implements a fully serializable storage, in our implementation we
// only implement snapshot isolation for transactions.
const uint64_t kTimestampInitialId = 0;
const uint64_t kTransactionInitialId = 1ULL << 63U;
class Storage final {
public:
class Accessor final {
public:
explicit Accessor(Storage *storage)
: storage_(storage), is_transaction_starter_(true) {
// We acquire the storage lock here because we access (and modify) the
// transaction engine variables (`transaction_id` and `timestamp`) below.
std::lock_guard<utils::SpinLock> guard(storage_->lock_);
auto acc = storage_->transactions_.access();
auto [it, inserted] = acc.insert(
Transaction{storage_->transaction_id_++, storage_->timestamp_++});
CHECK(inserted) << "The Transaction must be inserted here!";
CHECK(it != acc.end()) << "Invalid Transaction iterator!";
transaction_ = &*it;
}
Accessor(const Accessor &) = delete;
Accessor &operator=(const Accessor &) = delete;
Accessor(Accessor &&other) noexcept
: storage_(other.storage_),
transaction_(other.transaction_),
is_transaction_starter_(true) {
CHECK(other.is_transaction_starter_)
<< "The original accessor isn't valid!";
// Don't allow the other accessor to abort our transaction.
other.is_transaction_starter_ = false;
}
// This operator isn't `noexcept` because the `Abort` function isn't
// `noexcept`.
Accessor &operator=(Accessor &&other) {
if (this == &other) return *this;
if (is_transaction_starter_ && transaction_->is_active) {
Abort();
}
storage_ = other.storage_;
transaction_ = other.transaction_;
is_transaction_starter_ = true;
CHECK(other.is_transaction_starter_)
<< "The original accessor isn't valid!";
// Don't allow the other accessor to abort our transaction.
other.is_transaction_starter_ = false;
return *this;
}
~Accessor() {
if (is_transaction_starter_ && transaction_->is_active) {
Abort();
}
}
VertexAccessor CreateVertex() {
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access();
auto delta = transaction_->CreateDelta(Delta::Action::DELETE_OBJECT, 0);
auto [it, inserted] =
acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
CHECK(inserted) << "The vertex must be inserted here!";
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
return VertexAccessor::Create(&*it, transaction_, View::NEW).value();
}
std::optional<VertexAccessor> FindVertex(Gid gid, View view) {
auto acc = storage_->vertices_.access();
auto it = acc.find(gid);
if (it == acc.end()) return std::nullopt;
return VertexAccessor::Create(&*it, transaction_, view);
}
void AdvanceCommand() { ++transaction_->command_id; }
void Commit() {
CHECK(!transaction_->must_abort) << "The transaction can't be committed!";
CHECK(transaction_->is_active)
<< "The transaction is already terminated!";
if (transaction_->deltas.empty()) {
transaction_->commit_timestamp.store(transaction_->start_timestamp,
std::memory_order_release);
} else {
std::lock_guard<utils::SpinLock> guard(storage_->lock_);
transaction_->commit_timestamp.store(storage_->timestamp_++,
std::memory_order_release);
// TODO: release lock, and update all deltas to have an in-memory copy
// of the commit id
}
transaction_->is_active = false;
}
void Abort() {
CHECK(transaction_->is_active)
<< "The transaction is already terminated!";
for (Vertex *vertex : transaction_->modified_vertices) {
std::lock_guard<utils::SpinLock> guard(vertex->lock);
Delta *current = vertex->delta;
while (current != nullptr &&
current->timestamp->load(std::memory_order_acquire) ==
transaction_->transaction_id) {
switch (current->action) {
case Delta::Action::REMOVE_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
current->value);
CHECK(it != vertex->labels.end()) << "Invalid database state!";
std::swap(*it, *vertex->labels.rbegin());
vertex->labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
current->value);
CHECK(it == vertex->labels.end()) << "Invalid database state!";
vertex->labels.push_back(current->value);
break;
}
case Delta::Action::DELETE_OBJECT: {
auto acc = storage_->vertices_.access();
CHECK(acc.remove(vertex->gid)) << "Invalid database state!";
break;
}
}
current = current->next.load(std::memory_order_acquire);
}
vertex->delta = current;
}
transaction_->is_active = false;
}
private:
Storage *storage_;
Transaction *transaction_;
bool is_transaction_starter_;
};
Accessor Access() { return Accessor{this}; }
private:
// Main object storage
utils::SkipList<storage::Vertex> vertices_;
std::atomic<uint64_t> vertex_id_{0};
// Transaction engine
utils::SpinLock lock_;
uint64_t timestamp_{kTimestampInitialId};
uint64_t transaction_id_{kTransactionInitialId};
utils::SkipList<Transaction> transactions_;
};
} // namespace storage

View File

@ -0,0 +1,90 @@
#pragma once
#include <atomic>
#include <limits>
#include <list>
#include "utils/skip_list.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/view.hpp"
namespace storage {
struct Transaction {
Transaction(uint64_t transaction_id, uint64_t start_timestamp)
: transaction_id(transaction_id),
start_timestamp(start_timestamp),
commit_timestamp(transaction_id),
command_id(0),
is_active(true),
must_abort(false) {}
// Default constructor necessary for utils::SkipList.
Transaction()
: transaction_id(std::numeric_limits<uint64_t>::max()),
start_timestamp(std::numeric_limits<uint64_t>::max()),
commit_timestamp(std::numeric_limits<uint64_t>::max()),
command_id(std::numeric_limits<uint64_t>::max()),
is_active(true),
must_abort(false) {}
Transaction(const Transaction &) = delete;
Transaction &operator=(const Transaction &) = delete;
Transaction(Transaction &&other) noexcept
: transaction_id(other.transaction_id),
start_timestamp(other.start_timestamp),
commit_timestamp(other.commit_timestamp.load()),
command_id(other.command_id),
deltas(std::move(other.deltas)),
modified_vertices(std::move(other.modified_vertices)),
is_active(other.is_active),
must_abort(other.must_abort) {}
Transaction &operator=(Transaction &&other) noexcept {
if (this == &other) return *this;
transaction_id = other.transaction_id;
start_timestamp = other.start_timestamp;
commit_timestamp = other.commit_timestamp.load();
command_id = other.command_id;
deltas = std::move(other.deltas);
modified_vertices = std::move(other.modified_vertices);
is_active = other.is_active;
must_abort = other.must_abort;
return *this;
}
~Transaction() {}
Delta *CreateDelta(Delta::Action action, uint64_t value) {
return &deltas.emplace_back(action, value, &commit_timestamp, command_id);
}
uint64_t transaction_id;
uint64_t start_timestamp;
std::atomic<uint64_t> commit_timestamp;
uint64_t command_id;
std::list<Delta> deltas;
std::list<Vertex *> modified_vertices;
bool is_active;
bool must_abort;
};
inline bool operator==(const Transaction &first, const Transaction &second) {
return first.transaction_id == second.transaction_id;
}
inline bool operator<(const Transaction &first, const Transaction &second) {
return first.transaction_id < second.transaction_id;
}
inline bool operator==(const Transaction &first, const uint64_t &second) {
return first.transaction_id == second;
}
inline bool operator<(const Transaction &first, const uint64_t &second) {
return first.transaction_id < second;
}
} // namespace storage

50
src/storage/v2/vertex.hpp Normal file
View File

@ -0,0 +1,50 @@
#pragma once
#include <limits>
#include <unordered_map>
#include <vector>
#include "utils/spin_lock.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/gid.hpp"
namespace storage {
struct Vertex {
// Default constructor necessary for utils::SkipList.
Vertex()
: gid(storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())),
delta(nullptr) {}
Vertex(Gid gid, Delta *delta) : gid(gid), delta(delta) {
CHECK(delta->action == Delta::Action::DELETE_OBJECT)
<< "Vertex must be created with an initial DELETE_OBJECT delta!";
}
Gid gid;
std::vector<uint64_t> labels;
// TODO: add
// std::unordered_map<uint64_t, storage::PropertyValue> properties;
utils::SpinLock lock;
// uint32_t PAD;
Delta *delta;
};
inline bool operator==(const Vertex &first, const Vertex &second) {
return first.gid == second.gid;
}
inline bool operator<(const Vertex &first, const Vertex &second) {
return first.gid < second.gid;
}
inline bool operator==(const Vertex &first, const Gid &second) {
return first.gid == second;
}
inline bool operator<(const Vertex &first, const Gid &second) {
return first.gid < second;
}
} // namespace storage

View File

@ -0,0 +1,159 @@
#pragma once
#include <memory>
#include <optional>
#include "storage/v2/vertex.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/result.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp"
namespace storage {
class VertexAccessor final {
private:
VertexAccessor(Vertex *vertex, Transaction *transaction)
: vertex_(vertex), transaction_(transaction) {}
public:
static std::optional<VertexAccessor> Create(Vertex *vertex,
Transaction *transaction,
View view) {
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex->lock);
delta = vertex->delta;
}
bool is_visible = true;
ApplyDeltasForRead(transaction, delta, view,
[&is_visible](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
break;
case Delta::Action::DELETE_OBJECT: {
is_visible = false;
break;
}
}
});
if (!is_visible) return std::nullopt;
return VertexAccessor{vertex, transaction};
}
Result<bool> AddLabel(uint64_t label) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) !=
vertex_->labels.end())
return Result<bool>{false};
auto delta = transaction_->CreateDelta(Delta::Action::REMOVE_LABEL, label);
delta->next = vertex_->delta;
vertex_->delta = delta;
vertex_->labels.push_back(label);
return Result<bool>{true};
}
Result<bool> RemoveLabel(uint64_t label) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
if (it == vertex_->labels.end()) return Result<bool>{false};
auto delta = transaction_->CreateDelta(Delta::Action::ADD_LABEL, label);
delta->next = vertex_->delta;
vertex_->delta = delta;
std::swap(*it, *vertex_->labels.rbegin());
vertex_->labels.pop_back();
return Result<bool>{true};
}
bool HasLabel(uint64_t label, View view) {
bool has_label = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
has_label = std::find(vertex_->labels.begin(), vertex_->labels.end(),
label) != vertex_->labels.end();
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view,
[&has_label, label](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
if (delta.value == label) {
CHECK(has_label) << "Invalid database state!";
has_label = false;
}
break;
}
case Delta::Action::ADD_LABEL: {
if (delta.value == label) {
CHECK(!has_label) << "Invalid database state!";
has_label = true;
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
}
});
return has_label;
}
std::vector<uint64_t> Labels(View view) {
std::vector<uint64_t> labels;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
labels = vertex_->labels;
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&labels](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
// Remove the label because we don't see the addition.
auto it = std::find(labels.begin(), labels.end(), delta.value);
CHECK(it != labels.end()) << "Invalid database state!";
std::swap(*it, *labels.rbegin());
labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
// Add the label because we don't see the removal.
auto it = std::find(labels.begin(), labels.end(), delta.value);
CHECK(it == labels.end()) << "Invalid database state!";
labels.push_back(delta.value);
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
}
});
return labels;
}
Gid Gid() const { return vertex_->gid; }
private:
Vertex *vertex_;
Transaction *transaction_;
};
} // namespace storage

10
src/storage/v2/view.hpp Normal file
View File

@ -0,0 +1,10 @@
#pragma once
namespace storage {
enum class View {
OLD,
NEW,
};
} // namespace storage

View File

@ -374,6 +374,11 @@ target_link_libraries(${test_prefix}utils_watchdog mg-utils)
add_unit_test(auth.cpp)
target_link_libraries(${test_prefix}auth mg-auth kvstore_lib)
# Test storage v2
add_unit_test(storage_v2.cpp)
target_link_libraries(${test_prefix}storage_v2 mg-utils)
# Test LCP
add_custom_command(

604
tests/unit/storage_v2.cpp Normal file
View File

@ -0,0 +1,604 @@
#include <gtest/gtest.h>
#include <limits>
#include "storage/v2/storage.hpp"
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, Commit) {
storage::Storage store;
storage::Gid gid =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex = acc.CreateVertex();
gid = vertex.Gid();
ASSERT_FALSE(acc.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid, storage::View::NEW).has_value());
acc.Commit();
}
{
auto acc = store.Access();
ASSERT_TRUE(acc.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid, storage::View::NEW).has_value());
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, Abort) {
storage::Storage store;
storage::Gid gid =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex = acc.CreateVertex();
gid = vertex.Gid();
ASSERT_FALSE(acc.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid, storage::View::NEW).has_value());
acc.Abort();
}
{
auto acc = store.Access();
ASSERT_FALSE(acc.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_FALSE(acc.FindVertex(gid, storage::View::NEW).has_value());
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, AdvanceCommandCommit) {
storage::Storage store;
storage::Gid gid1 =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
storage::Gid gid2 =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex1 = acc.CreateVertex();
gid1 = vertex1.Gid();
ASSERT_FALSE(acc.FindVertex(gid1, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::NEW).has_value());
acc.AdvanceCommand();
auto vertex2 = acc.CreateVertex();
gid2 = vertex2.Gid();
ASSERT_FALSE(acc.FindVertex(gid2, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid2, storage::View::NEW).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::NEW).has_value());
acc.Commit();
}
{
auto acc = store.Access();
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::NEW).has_value());
ASSERT_TRUE(acc.FindVertex(gid2, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid2, storage::View::NEW).has_value());
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, AdvanceCommandAbort) {
storage::Storage store;
storage::Gid gid1 =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
storage::Gid gid2 =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex1 = acc.CreateVertex();
gid1 = vertex1.Gid();
ASSERT_FALSE(acc.FindVertex(gid1, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::NEW).has_value());
acc.AdvanceCommand();
auto vertex2 = acc.CreateVertex();
gid2 = vertex2.Gid();
ASSERT_FALSE(acc.FindVertex(gid2, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid2, storage::View::NEW).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid1, storage::View::NEW).has_value());
acc.Abort();
}
{
auto acc = store.Access();
ASSERT_FALSE(acc.FindVertex(gid1, storage::View::OLD).has_value());
ASSERT_FALSE(acc.FindVertex(gid1, storage::View::NEW).has_value());
ASSERT_FALSE(acc.FindVertex(gid2, storage::View::OLD).has_value());
ASSERT_FALSE(acc.FindVertex(gid2, storage::View::NEW).has_value());
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, SnapshotIsolation) {
storage::Storage store;
auto acc1 = store.Access();
auto acc2 = store.Access();
auto vertex = acc1.CreateVertex();
auto gid = vertex.Gid();
ASSERT_FALSE(acc2.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_FALSE(acc2.FindVertex(gid, storage::View::NEW).has_value());
acc1.Commit();
ASSERT_FALSE(acc2.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_FALSE(acc2.FindVertex(gid, storage::View::NEW).has_value());
acc2.Abort();
auto acc3 = store.Access();
ASSERT_TRUE(acc3.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(acc3.FindVertex(gid, storage::View::NEW).has_value());
acc3.Abort();
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, AccessorMove) {
storage::Storage store;
storage::Gid gid =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex = acc.CreateVertex();
gid = vertex.Gid();
ASSERT_FALSE(acc.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid, storage::View::NEW).has_value());
storage::Storage::Accessor moved(std::move(acc));
ASSERT_FALSE(moved.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(moved.FindVertex(gid, storage::View::NEW).has_value());
moved.Commit();
}
{
auto acc = store.Access();
ASSERT_TRUE(acc.FindVertex(gid, storage::View::OLD).has_value());
ASSERT_TRUE(acc.FindVertex(gid, storage::View::NEW).has_value());
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, VertexLabelCommit) {
storage::Storage store;
storage::Gid gid =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex = acc.CreateVertex();
gid = vertex.Gid();
ASSERT_FALSE(vertex.HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex.Labels(storage::View::NEW).size(), 0);
{
auto res = vertex.AddLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_TRUE(vertex.HasLabel(5, storage::View::NEW));
{
auto labels = vertex.Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
{
auto res = vertex.AddLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
acc.Commit();
}
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_TRUE(vertex->HasLabel(5, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::NEW));
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_FALSE(vertex->HasLabel(10, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(10, storage::View::NEW));
acc.Abort();
}
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
{
auto res = vertex->RemoveLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->RemoveLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
acc.Commit();
}
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(5, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::OLD).size(), 0);
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
ASSERT_FALSE(vertex->HasLabel(10, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(10, storage::View::NEW));
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, VertexLabelAbort) {
storage::Storage store;
storage::Gid gid =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
// Create the vertex.
{
auto acc = store.Access();
auto vertex = acc.CreateVertex();
gid = vertex.Gid();
acc.Commit();
}
// Add label 5, but abort the transaction.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->AddLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::NEW));
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
{
auto res = vertex->AddLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
acc.Abort();
}
// Check that label 5 doesn't exist.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(5, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::OLD).size(), 0);
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
ASSERT_FALSE(vertex->HasLabel(10, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(10, storage::View::NEW));
acc.Abort();
}
// Add label 5.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->AddLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::NEW));
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
{
auto res = vertex->AddLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
acc.Commit();
}
// Check that label 5 exists.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_TRUE(vertex->HasLabel(5, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::NEW));
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_FALSE(vertex->HasLabel(10, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(10, storage::View::NEW));
acc.Abort();
}
// Remove label 5, but abort the transaction.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
{
auto res = vertex->RemoveLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->RemoveLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
acc.Abort();
}
// Check that label 5 exists.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_TRUE(vertex->HasLabel(5, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::NEW));
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_FALSE(vertex->HasLabel(10, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(10, storage::View::NEW));
acc.Abort();
}
// Remove label 5.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
{
auto res = vertex->RemoveLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_TRUE(vertex->HasLabel(5, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 5);
}
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->RemoveLabel(5);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
acc.Commit();
}
// Check that label 5 doesn't exist.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(5, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(5, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::OLD).size(), 0);
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
ASSERT_FALSE(vertex->HasLabel(10, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(10, storage::View::NEW));
acc.Abort();
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(StorageV2, VertexLabelSerializationError) {
storage::Storage store;
storage::Gid gid =
storage::Gid::FromUint(std::numeric_limits<uint64_t>::max());
{
auto acc = store.Access();
auto vertex = acc.CreateVertex();
gid = vertex.Gid();
acc.Commit();
}
auto acc1 = store.Access();
auto acc2 = store.Access();
// Add label 10 in accessor 1.
{
auto vertex = acc1.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(1, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(1, storage::View::NEW));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::OLD).size(), 0);
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->AddLabel(1);
ASSERT_TRUE(res.IsReturn());
ASSERT_TRUE(*res.GetReturn());
}
ASSERT_FALSE(vertex->HasLabel(1, storage::View::OLD));
ASSERT_TRUE(vertex->HasLabel(1, storage::View::NEW));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::OLD).size(), 0);
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 1);
}
{
auto res = vertex->AddLabel(1);
ASSERT_TRUE(res.IsReturn());
ASSERT_FALSE(*res.GetReturn());
}
}
// Add label 2 in accessor 2.
{
auto vertex = acc2.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_FALSE(vertex->HasLabel(1, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(1, storage::View::NEW));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::NEW));
ASSERT_EQ(vertex->Labels(storage::View::OLD).size(), 0);
ASSERT_EQ(vertex->Labels(storage::View::NEW).size(), 0);
{
auto res = vertex->AddLabel(1);
ASSERT_TRUE(res.IsError());
ASSERT_EQ(res.GetError(), storage::Error::SERIALIZATION_ERROR);
}
}
// Finalize both accessors.
acc1.Commit();
acc2.Abort();
// Check which labels exist.
{
auto acc = store.Access();
auto vertex = acc.FindVertex(gid, storage::View::OLD);
ASSERT_TRUE(vertex);
ASSERT_TRUE(vertex->HasLabel(1, storage::View::OLD));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::OLD));
{
auto labels = vertex->Labels(storage::View::OLD);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 1);
}
ASSERT_TRUE(vertex->HasLabel(1, storage::View::NEW));
ASSERT_FALSE(vertex->HasLabel(2, storage::View::NEW));
{
auto labels = vertex->Labels(storage::View::NEW);
ASSERT_EQ(labels.size(), 1);
ASSERT_EQ(labels[0], 1);
}
acc.Abort();
}
}