Split storage and shards (#519)

- Rename storage to shard
- Add primary label and range for shard
- Remove id_mapper functionality from shard
- Adapt tests
This commit is contained in:
Jure Bajic 2022-09-01 09:10:40 +02:00 committed by GitHub
parent efb3c8d03d
commit 7e84744d07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 2869 additions and 2775 deletions

View File

@ -16,7 +16,7 @@
#include <vector>
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/storage.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex_accessor.hpp"
#include "utils/temporal.hpp"
@ -64,16 +64,16 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
}
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const query::v2::VertexAccessor &vertex,
const storage::v3::Storage &db, storage::v3::View view) {
const storage::v3::Shard &db, storage::v3::View view) {
return ToBoltVertex(vertex.impl_, db, view);
}
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const query::v2::EdgeAccessor &edge,
const storage::v3::Storage &db, storage::v3::View view) {
const storage::v3::Shard &db, storage::v3::View view) {
return ToBoltEdge(edge.impl_, db, view);
}
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const storage::v3::Storage &db,
storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const storage::v3::Shard &db,
storage::v3::View view) {
switch (value.type()) {
case query::v2::TypedValue::Type::Null:
@ -132,7 +132,7 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
}
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3::VertexAccessor &vertex,
const storage::v3::Storage &db, storage::v3::View view) {
const storage::v3::Shard &db, storage::v3::View view) {
// TODO(jbajic) Fix bolt communication
auto id = communication::bolt::Id::FromUint(0);
auto maybe_labels = vertex.Labels(view);
@ -152,7 +152,7 @@ storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3:
}
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::EdgeAccessor &edge,
const storage::v3::Storage &db, storage::v3::View view) {
const storage::v3::Shard &db, storage::v3::View view) {
// TODO(jbajic) Fix bolt communication
auto id = communication::bolt::Id::FromUint(0);
auto from = communication::bolt::Id::FromUint(0);
@ -167,7 +167,7 @@ storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::Edg
return communication::bolt::Edge{id, from, to, type, properties};
}
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::Path &path, const storage::v3::Storage &db,
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::Path &path, const storage::v3::Shard &db,
storage::v3::View view) {
std::vector<communication::bolt::Vertex> vertices;
vertices.reserve(path.vertices().size());

View File

@ -28,36 +28,36 @@ namespace memgraph::glue::v2 {
/// @param storage::v3::VertexAccessor for converting to
/// communication::bolt::Vertex.
/// @param storage::v3::Storage for getting label and property names.
/// @param storage::v3::Shard for getting label and property names.
/// @param storage::v3::View for deciding which vertex attributes are visible.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3::VertexAccessor &vertex,
const storage::v3::Storage &db, storage::v3::View view);
const storage::v3::Shard &db, storage::v3::View view);
/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
/// @param storage::v3::Storage for getting edge type and property names.
/// @param storage::v3::Shard for getting edge type and property names.
/// @param storage::v3::View for deciding which edge attributes are visible.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::EdgeAccessor &edge,
const storage::v3::Storage &db, storage::v3::View view);
const storage::v3::Shard &db, storage::v3::View view);
/// @param query::v2::Path for converting to communication::bolt::Path.
/// @param storage::v3::Storage for ToBoltVertex and ToBoltEdge.
/// @param storage::v3::Shard for ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::Path &path, const storage::v3::Storage &db,
storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::Path &path, const storage::v3::Shard &db,
storage::v3::View view);
/// @param query::v2::TypedValue for converting to communication::bolt::Value.
/// @param storage::v3::Storage for ToBoltVertex and ToBoltEdge.
/// @param storage::v3::Shard for ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
storage::v3::Result<communication::bolt::Value> ToBoltValue(const query::v2::TypedValue &value,
const storage::v3::Storage &db, storage::v3::View view);
const storage::v3::Shard &db, storage::v3::View view);
query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);

View File

@ -212,7 +212,7 @@ inline VertexAccessor EdgeAccessor::From() const { return VertexAccessor(impl_.F
inline bool EdgeAccessor::IsCycle() const { return To() == From(); }
class DbAccessor final {
storage::v3::Storage::Accessor *accessor_;
storage::v3::Shard::Accessor *accessor_;
class VerticesIterable final {
storage::v3::VerticesIterable iterable_;
@ -244,7 +244,7 @@ class DbAccessor final {
};
public:
explicit DbAccessor(storage::v3::Storage::Accessor *accessor) : accessor_(accessor) {}
explicit DbAccessor(storage::v3::Shard::Accessor *accessor) : accessor_(accessor) {}
// TODO(jbajic) Fix Remove Gid
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
@ -348,11 +348,20 @@ class DbAccessor final {
return {std::make_optional<VertexAccessor>(*value)};
}
storage::v3::PropertyId NameToProperty(const std::string_view name) { return accessor_->NameToProperty(name); }
// TODO(jbajic) Query engine should have a map of labels, properties and edge
// types
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
storage::v3::PropertyId NameToProperty(const std::string_view /*name*/) {
return storage::v3::PropertyId::FromUint(0);
}
storage::v3::LabelId NameToLabel(const std::string_view name) { return accessor_->NameToLabel(name); }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
storage::v3::LabelId NameToLabel(const std::string_view /*name*/) { return storage::v3::LabelId::FromUint(0); }
storage::v3::EdgeTypeId NameToEdgeType(const std::string_view name) { return accessor_->NameToEdgeType(name); }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
storage::v3::EdgeTypeId NameToEdgeType(const std::string_view /*name*/) {
return storage::v3::EdgeTypeId::FromUint(0);
}
const std::string &PropertyToName(storage::v3::PropertyId prop) const { return accessor_->PropertyToName(prop); }

View File

@ -43,6 +43,7 @@
#include "query/v2/trigger.hpp"
#include "query/v2/typed_value.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/storage.hpp"
#include "utils/algorithm.hpp"
#include "utils/csv_parsing.hpp"
@ -127,7 +128,7 @@ std::optional<std::string> GetOptionalStringValue(query::v2::Expression *express
class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
public:
explicit ReplQueryHandler(storage::v3::Storage *db) : db_(db) {}
explicit ReplQueryHandler(storage::v3::Shard *db) : db_(db) {}
/// @throw QueryRuntimeException if an error ocurred.
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional<int64_t> port) override {
@ -255,7 +256,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
}
private:
storage::v3::Storage *db_;
storage::v3::Shard *db_;
};
/// returns false if the replication role can't be set
/// @throw QueryRuntimeException if an error ocurred.
@ -913,7 +914,7 @@ Callback HandleSchemaQuery(SchemaQuery *schema_query, InterpreterContext *interp
callback.header = {"property_name", "property_type"};
callback.fn = [interpreter_context, primary_label = schema_query->label_]() {
auto *db = interpreter_context->db;
const auto label = db->NameToLabel(primary_label.name);
const auto label = interpreter_context->NameToLabelId(primary_label.name);
const auto *schema = db->GetSchema(label);
std::vector<std::vector<TypedValue>> results;
if (schema) {
@ -938,11 +939,11 @@ Callback HandleSchemaQuery(SchemaQuery *schema_query, InterpreterContext *interp
callback.fn = [interpreter_context, primary_label = schema_query->label_,
schema_type_map = std::move(schema_type_map)]() {
auto *db = interpreter_context->db;
const auto label = db->NameToLabel(primary_label.name);
const auto label = interpreter_context->NameToLabelId(primary_label.name);
std::vector<storage::v3::SchemaProperty> schemas_types;
schemas_types.reserve(schema_type_map.size());
for (const auto &schema_type : schema_type_map) {
auto property_id = db->NameToProperty(schema_type.first.name);
auto property_id = interpreter_context->NameToPropertyId(schema_type.first.name);
schemas_types.push_back({property_id, schema_type.second});
}
if (!db->CreateSchema(label, schemas_types)) {
@ -957,7 +958,7 @@ Callback HandleSchemaQuery(SchemaQuery *schema_query, InterpreterContext *interp
case SchemaQuery::Action::DROP_SCHEMA: {
callback.fn = [interpreter_context, primary_label = schema_query->label_]() {
auto *db = interpreter_context->db;
const auto label = db->NameToLabel(primary_label.name);
const auto label = interpreter_context->NameToLabelId(primary_label.name);
if (!db->DropSchema(label)) {
throw QueryException(fmt::format("Schema on label :{} does not exist!", primary_label.name));
@ -1138,7 +1139,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
using RWType = plan::ReadWriteTypeChecker::RWType;
} // namespace
InterpreterContext::InterpreterContext(storage::v3::Storage *db, const InterpreterConfig config,
InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config,
const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
@ -1157,8 +1158,8 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
in_explicit_transaction_ = true;
expect_rollback_ = false;
db_accessor_ = std::make_unique<storage::v3::Storage::Accessor>(
interpreter_context_->db->Access(GetIsolationLevelOverride()));
db_accessor_ =
std::make_unique<storage::v3::Shard::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
execution_db_accessor_.emplace(db_accessor_.get());
if (interpreter_context_->trigger_store.HasTriggers()) {
@ -1427,14 +1428,14 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
}
};
auto label = interpreter_context->db->NameToLabel(index_query->label_.name);
auto label = interpreter_context->NameToLabelId(index_query->label_.name);
std::vector<storage::v3::PropertyId> properties;
std::vector<std::string> properties_string;
properties.reserve(index_query->properties_.size());
properties_string.reserve(index_query->properties_.size());
for (const auto &prop : index_query->properties_) {
properties.push_back(interpreter_context->db->NameToProperty(prop.name));
properties.push_back(interpreter_context->NameToPropertyId(prop.name));
properties_string.push_back(prop.name);
}
auto properties_stringified = utils::Join(properties_string, ", ");
@ -1842,7 +1843,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
[interpreter_context](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
if (auto maybe_error = interpreter_context->db->CreateSnapshot(); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::v3::Storage::CreateSnapshotError::DisabledForReplica:
case storage::v3::Shard::CreateSnapshotError::DisabledForReplica:
throw utils::BasicException(
"Failed to create a snapshot. Replica instances are not allowed to create them.");
}
@ -1897,8 +1898,8 @@ PreparedQuery PrepareVersionQuery(ParsedQuery parsed_query, const bool in_explic
}
PreparedQuery PrepareInfoQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
storage::v3::Storage *db, utils::MemoryResource *execution_memory) {
std::map<std::string, TypedValue> * /*summary*/, InterpreterContext *interpreter_context,
storage::v3::Shard *db, utils::MemoryResource * /*execution_memory*/) {
if (in_explicit_transaction) {
throw InfoInMulticommandTxException();
}
@ -1994,13 +1995,13 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
auto *constraint_query = utils::Downcast<ConstraintQuery>(parsed_query.query);
std::function<void(Notification &)> handler;
auto label = interpreter_context->db->NameToLabel(constraint_query->constraint_.label.name);
auto label = interpreter_context->NameToLabelId(constraint_query->constraint_.label.name);
std::vector<storage::v3::PropertyId> properties;
std::vector<std::string> properties_string;
properties.reserve(constraint_query->constraint_.properties.size());
properties_string.reserve(constraint_query->constraint_.properties.size());
for (const auto &prop : constraint_query->constraint_.properties) {
properties.push_back(interpreter_context->db->NameToProperty(prop.name));
properties.push_back(interpreter_context->NameToPropertyId(prop.name));
properties_string.push_back(prop.name);
}
auto properties_stringified = utils::Join(properties_string, ", ");
@ -2259,8 +2260,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
(utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
utils::Downcast<ProfileQuery>(parsed_query.query) || utils::Downcast<DumpQuery>(parsed_query.query) ||
utils::Downcast<TriggerQuery>(parsed_query.query))) {
db_accessor_ = std::make_unique<storage::v3::Storage::Accessor>(
interpreter_context_->db->Access(GetIsolationLevelOverride()));
db_accessor_ =
std::make_unique<storage::v3::Shard::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
execution_db_accessor_.emplace(db_accessor_.get());
if (utils::Downcast<CypherQuery>(parsed_query.query) && interpreter_context_->trigger_store.HasTriggers()) {

View File

@ -31,6 +31,7 @@
#include "query/v2/trigger.hpp"
#include "query/v2/typed_value.hpp"
#include "storage/v3/isolation_level.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "utils/event_counter.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"
@ -166,10 +167,10 @@ struct PreparedQuery {
* been passed to an `Interpreter` instance.
*/
struct InterpreterContext {
explicit InterpreterContext(storage::v3::Storage *db, InterpreterConfig config,
explicit InterpreterContext(storage::v3::Shard *db, InterpreterConfig config,
const std::filesystem::path &data_directory);
storage::v3::Storage *db;
storage::v3::Shard *db;
std::optional<double> tsc_frequency{utils::GetTSCFrequency()};
std::atomic<bool> is_shutting_down{false};
@ -186,6 +187,22 @@ struct InterpreterContext {
const InterpreterConfig config;
query::v2::stream::Streams streams;
storage::v3::LabelId NameToLabelId(std::string_view label_name) {
return storage::v3::LabelId::FromUint(query_id_mapper.NameToId(label_name));
}
storage::v3::PropertyId NameToPropertyId(std::string_view property_name) {
return storage::v3::PropertyId::FromUint(query_id_mapper.NameToId(property_name));
}
storage::v3::EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) {
return storage::v3::EdgeTypeId::FromUint(query_id_mapper.NameToId(edge_type_name));
}
private:
// TODO Replace with local map of labels, properties and edge type ids
storage::v3::NameIdMapper query_id_mapper;
};
/// Function that is used to tell all active interpreters that they should stop
@ -316,7 +333,7 @@ class Interpreter final {
// This cannot be std::optional because we need to move this accessor later on into a lambda capture
// which is assigned to std::function. std::function requires every object to be copyable, so we
// move this unique_ptr into a shrared_ptr.
std::unique_ptr<storage::v3::Storage::Accessor> db_accessor_;
std::unique_ptr<storage::v3::Shard::Accessor> db_accessor_;
std::optional<DbAccessor> execution_db_accessor_;
std::optional<TriggerContextCollector> trigger_context_collector_;
bool in_explicit_transaction_{false};

View File

@ -83,7 +83,7 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
template <typename TMessage>
void CallCustomTransformation(const std::string &transformation_name, const std::vector<TMessage> &messages,
mgp_result &result, storage::v3::Storage::Accessor &storage_accessor,
mgp_result &result, storage::v3::Shard::Accessor &storage_accessor,
utils::MemoryResource &memory_resource, const std::string &stream_name) {
DbAccessor db_accessor{&storage_accessor};
{

View File

@ -14,6 +14,7 @@ set(storage_v3_src_files
vertex_accessor.cpp
schemas.cpp
schema_validator.cpp
shard.cpp
storage.cpp)
# #### Replication #####

View File

@ -31,7 +31,7 @@ struct Constraints;
class EdgeAccessor final {
private:
friend class Storage;
friend class Shard;
public:
EdgeAccessor(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex, Vertex *to_vertex, Transaction *transaction,

View File

@ -30,10 +30,10 @@ template <typename>
} // namespace
////// ReplicationClient //////
Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage, const io::network::Endpoint &endpoint,
const replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
: name_(std::move(name)), storage_(storage), mode_(mode) {
Shard::ReplicationClient::ReplicationClient(std::string name, Shard *shard, const io::network::Endpoint &endpoint,
const replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
: name_(std::move(name)), shard_(shard), mode_(mode) {
if (config.ssl) {
rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file);
} else {
@ -54,14 +54,14 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
}
}
void Storage::ReplicationClient::TryInitializeClientAsync() {
void Shard::ReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClientSync();
});
}
void Storage::ReplicationClient::FrequentCheck() {
void Shard::ReplicationClient::FrequentCheck() {
const auto is_success = std::invoke([this]() {
try {
auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()};
@ -87,15 +87,15 @@ void Storage::ReplicationClient::FrequentCheck() {
}
/// @throws rpc::RpcFailedException
void Storage::ReplicationClient::InitializeClient() {
void Shard::ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
auto stream{rpc_client_->Stream<replication::HeartbeatRpc>(storage_->last_commit_timestamp_, storage_->epoch_id_)};
auto stream{rpc_client_->Stream<replication::HeartbeatRpc>(shard_->last_commit_timestamp_, shard_->epoch_id_)};
const auto response = stream.AwaitResponse();
std::optional<uint64_t> branching_point;
if (response.epoch_id != storage_->epoch_id_ && response.current_commit_timestamp != kTimestampInitialId) {
const auto &epoch_history = storage_->epoch_history_;
if (response.epoch_id != shard_->epoch_id_ && response.current_commit_timestamp != kTimestampInitialId) {
const auto &epoch_history = shard_->epoch_history_;
const auto epoch_info_iter =
std::find_if(epoch_history.crbegin(), epoch_history.crend(),
[&](const auto &epoch_info) { return epoch_info.first == response.epoch_id; });
@ -115,8 +115,8 @@ void Storage::ReplicationClient::InitializeClient() {
current_commit_timestamp = response.current_commit_timestamp;
spdlog::trace("Current timestamp on replica: {}", current_commit_timestamp);
spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_);
if (current_commit_timestamp == storage_->last_commit_timestamp_) {
spdlog::trace("Current timestamp on main: {}", shard_->last_commit_timestamp_);
if (current_commit_timestamp == shard_->last_commit_timestamp_) {
spdlog::debug("Replica '{}' up to date", name_);
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
@ -130,7 +130,7 @@ void Storage::ReplicationClient::InitializeClient() {
}
}
void Storage::ReplicationClient::TryInitializeClientSync() {
void Shard::ReplicationClient::TryInitializeClientSync() {
try {
InitializeClient();
} catch (const rpc::RpcFailedException &) {
@ -141,19 +141,19 @@ void Storage::ReplicationClient::TryInitializeClientSync() {
}
}
void Storage::ReplicationClient::HandleRpcFailure() {
void Shard::ReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
TryInitializeClientAsync();
}
replication::SnapshotRes Storage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
replication::SnapshotRes Shard::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
auto stream{rpc_client_->Stream<replication::SnapshotRpc>()};
replication::Encoder encoder(stream.GetBuilder());
encoder.WriteFile(path);
return stream.AwaitResponse();
}
replication::WalFilesRes Storage::ReplicationClient::TransferWalFiles(
replication::WalFilesRes Shard::ReplicationClient::TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files) {
MG_ASSERT(!wal_files.empty(), "Wal files list is empty!");
auto stream{rpc_client_->Stream<replication::WalFilesRpc>(wal_files.size())};
@ -166,7 +166,7 @@ replication::WalFilesRes Storage::ReplicationClient::TransferWalFiles(
return stream.AwaitResponse();
}
void Storage::ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
void Shard::ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_);
const auto status = replica_state_.load();
switch (status) {
@ -190,7 +190,7 @@ void Storage::ReplicationClient::StartTransactionReplication(const uint64_t curr
case replication::ReplicaState::READY:
MG_ASSERT(!replica_stream_);
try {
replica_stream_.emplace(ReplicaStream{this, storage_->last_commit_timestamp_, current_wal_seq_num});
replica_stream_.emplace(ReplicaStream{this, shard_->last_commit_timestamp_, current_wal_seq_num});
replica_state_.store(replication::ReplicaState::REPLICATING);
} catch (const rpc::RpcFailedException &) {
replica_state_.store(replication::ReplicaState::INVALID);
@ -200,7 +200,7 @@ void Storage::ReplicationClient::StartTransactionReplication(const uint64_t curr
}
}
void Storage::ReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback) {
void Shard::ReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback) {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be
@ -220,7 +220,7 @@ void Storage::ReplicationClient::IfStreamingTransaction(const std::function<void
}
}
void Storage::ReplicationClient::FinalizeTransactionReplication() {
void Shard::ReplicationClient::FinalizeTransactionReplication() {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be
@ -271,7 +271,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
}
}
void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
void Shard::ReplicationClient::FinalizeTransactionReplicationInternal() {
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
try {
auto response = replica_stream_->Finalize();
@ -293,9 +293,9 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
}
}
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
void Shard::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
while (true) {
auto file_locker = storage_->file_retainer_.AddLocker();
auto file_locker = shard_->file_retainer_.AddLocker();
const auto steps = GetRecoverySteps(replica_commit, &file_locker);
for (const auto &recovery_step : steps) {
@ -312,11 +312,11 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
auto response = TransferWalFiles(arg);
replica_commit = response.current_commit_timestamp;
} else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) {
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) {
storage_->wal_file_->DisableFlushing();
if (shard_->wal_file_ && shard_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) {
shard_->wal_file_->DisableFlushing();
spdlog::debug("Sending current wal file");
replica_commit = ReplicateCurrentWal();
storage_->wal_file_->EnableFlushing();
shard_->wal_file_->EnableFlushing();
}
} else {
static_assert(always_false_v<T>, "Missing type from variant visitor");
@ -345,20 +345,20 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
// By adding this lock, we can avoid that, and go to RECOVERY immediately.
std::unique_lock client_guard{client_lock_};
SPDLOG_INFO("Replica timestamp: {}", replica_commit);
SPDLOG_INFO("Last commit: {}", storage_->last_commit_timestamp_);
if (storage_->last_commit_timestamp_ == replica_commit) {
SPDLOG_INFO("Last commit: {}", shard_->last_commit_timestamp_);
if (shard_->last_commit_timestamp_ == replica_commit) {
replica_state_.store(replication::ReplicaState::READY);
return;
}
}
}
uint64_t Storage::ReplicationClient::ReplicateCurrentWal() {
const auto &wal_file = storage_->wal_file_;
uint64_t Shard::ReplicationClient::ReplicateCurrentWal() {
const auto &wal_file = shard_->wal_file_;
auto stream = TransferCurrentWalFile();
stream.AppendFilename(wal_file->Path().filename());
utils::InputFile file;
MG_ASSERT(file.Open(storage_->wal_file_->Path()), "Failed to open current WAL file!");
MG_ASSERT(file.Open(shard_->wal_file_->Path()), "Failed to open current WAL file!");
const auto [buffer, buffer_size] = wal_file->CurrentFileBuffer();
stream.AppendSize(file.GetSize() + buffer_size);
stream.AppendFileData(&file);
@ -387,23 +387,23 @@ uint64_t Storage::ReplicationClient::ReplicateCurrentWal() {
/// recovery steps, so we can safely send it to the replica.
/// We assume that the property of preserving at least 1 WAL before the snapshot
/// is satisfied as we extract the timestamp information from it.
std::vector<Storage::ReplicationClient::RecoveryStep> Storage::ReplicationClient::GetRecoverySteps(
std::vector<Shard::ReplicationClient::RecoveryStep> Shard::ReplicationClient::GetRecoverySteps(
const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) {
// First check if we can recover using the current wal file only
// otherwise save the seq_num of the current wal file
// This lock is also necessary to force the missed transaction to finish.
std::optional<uint64_t> current_wal_seq_num;
std::optional<uint64_t> current_wal_from_timestamp;
if (storage_->wal_file_) {
current_wal_seq_num.emplace(storage_->wal_file_->SequenceNumber());
current_wal_from_timestamp.emplace(storage_->wal_file_->FromTimestamp());
if (shard_->wal_file_) {
current_wal_seq_num.emplace(shard_->wal_file_->SequenceNumber());
current_wal_from_timestamp.emplace(shard_->wal_file_->FromTimestamp());
}
auto locker_acc = file_locker->Access();
auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_, current_wal_seq_num);
auto wal_files = durability::GetWalFiles(shard_->wal_directory_, shard_->uuid_, current_wal_seq_num);
MG_ASSERT(wal_files, "Wal files could not be loaded");
auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
auto snapshot_files = durability::GetSnapshotFiles(shard_->snapshot_directory_, shard_->uuid_);
std::optional<durability::SnapshotDurabilityInfo> latest_snapshot;
if (!snapshot_files.empty()) {
std::sort(snapshot_files.begin(), snapshot_files.end());
@ -529,13 +529,13 @@ std::vector<Storage::ReplicationClient::RecoveryStep> Storage::ReplicationClient
}
////// TimeoutDispatcher //////
void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
void Shard::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
// Wait for the previous timeout task to finish
std::unique_lock main_guard(main_lock);
main_cv.wait(main_guard, [&] { return finished; });
}
void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(const double timeout) {
void Shard::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(const double timeout) {
timeout_pool.AddTask([timeout, this] {
finished = false;
using std::chrono::steady_clock;
@ -553,65 +553,65 @@ void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(const doubl
});
}
////// ReplicaStream //////
Storage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self,
const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num)
Shard::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self,
const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num)
: self_(self),
stream_(self_->rpc_client_->Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) {
replication::Encoder encoder{stream_.GetBuilder()};
encoder.WriteString(self_->storage_->epoch_id_);
encoder.WriteString(self_->shard_->epoch_id_);
}
void Storage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) {
void Shard::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, &self_->storage_->name_id_mapper_, self_->storage_->config_.items, delta, vertex,
EncodeDelta(&encoder, &self_->shard_->name_id_mapper_, self_->shard_->config_.items, delta, vertex,
final_commit_timestamp);
}
void Storage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) {
void Shard::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, &self_->storage_->name_id_mapper_, delta, edge, final_commit_timestamp);
EncodeDelta(&encoder, &self_->shard_->name_id_mapper_, delta, edge, final_commit_timestamp);
}
void Storage::ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) {
void Shard::ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeTransactionEnd(&encoder, final_commit_timestamp);
}
void Storage::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties,
uint64_t timestamp) {
void Shard::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties,
uint64_t timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, &self_->storage_->name_id_mapper_, operation, label, properties, timestamp);
EncodeOperation(&encoder, &self_->shard_->name_id_mapper_, operation, label, properties, timestamp);
}
replication::AppendDeltasRes Storage::ReplicationClient::ReplicaStream::Finalize() { return stream_.AwaitResponse(); }
replication::AppendDeltasRes Shard::ReplicationClient::ReplicaStream::Finalize() { return stream_.AwaitResponse(); }
////// CurrentWalHandler //////
Storage::ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
Shard::ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_->Stream<replication::CurrentWalRpc>()) {}
void Storage::ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) {
void Shard::ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteString(filename);
}
void Storage::ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) {
void Shard::ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteUint(size);
}
void Storage::ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) {
void Shard::ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteFileData(file);
}
void Storage::ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) {
void Shard::ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteBuffer(buffer, buffer_size);
}
replication::CurrentWalRes Storage::ReplicationClient::CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); }
replication::CurrentWalRes Shard::ReplicationClient::CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); }
} // namespace memgraph::storage::v3

View File

@ -28,7 +28,7 @@
#include "storage/v3/replication/enums.hpp"
#include "storage/v3/replication/rpc.hpp"
#include "storage/v3/replication/serialization.hpp"
#include "storage/v3/storage.hpp"
#include "storage/v3/shard.hpp"
#include "utils/file.hpp"
#include "utils/file_locker.hpp"
#include "utils/spin_lock.hpp"
@ -37,9 +37,9 @@
namespace memgraph::storage::v3 {
class Storage::ReplicationClient {
class Shard::ReplicationClient {
public:
ReplicationClient(std::string name, Storage *storage, const io::network::Endpoint &endpoint,
ReplicationClient(std::string name, Shard *shard, const io::network::Endpoint &endpoint,
replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {});
// Handler used for transfering the current transaction.
@ -149,7 +149,7 @@ class Storage::ReplicationClient {
void HandleRpcFailure();
std::string name_;
Storage *storage_;
Shard *shard_;
std::optional<communication::ClientContext> rpc_context_;
std::optional<rpc::Client> rpc_client_;

View File

@ -40,9 +40,9 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder
};
} // namespace
Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config)
: storage_(storage) {
Shard::ReplicationServer::ReplicationServer(Shard *shard, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config)
: shard_(shard) {
// Create RPC server.
if (config.ssl) {
rpc_server_context_.emplace(config.ssl->key_file, config.ssl->cert_file, config.ssl->ca_file,
@ -84,21 +84,21 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
rpc_server_->Start();
}
void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void Shard::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::HeartbeatReq req;
slk::Load(&req, req_reader);
replication::HeartbeatRes res{true, storage_->last_commit_timestamp_, storage_->epoch_id_};
replication::HeartbeatRes res{true, shard_->last_commit_timestamp_, shard_->epoch_id_};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void Shard::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::FrequentHeartbeatReq req;
slk::Load(&req, req_reader);
replication::FrequentHeartbeatRes res{true};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void Shard::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::AppendDeltasReq req;
slk::Load(&req, req_reader);
@ -107,25 +107,25 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
auto maybe_epoch_id = decoder.ReadString();
MG_ASSERT(maybe_epoch_id, "Invalid replication message");
if (*maybe_epoch_id != storage_->epoch_id_) {
storage_->epoch_history_.emplace_back(std::move(storage_->epoch_id_), storage_->last_commit_timestamp_);
storage_->epoch_id_ = std::move(*maybe_epoch_id);
if (*maybe_epoch_id != shard_->epoch_id_) {
shard_->epoch_history_.emplace_back(std::move(shard_->epoch_id_), shard_->last_commit_timestamp_);
shard_->epoch_id_ = std::move(*maybe_epoch_id);
}
if (storage_->wal_file_) {
if (req.seq_num > storage_->wal_file_->SequenceNumber() || *maybe_epoch_id != storage_->epoch_id_) {
storage_->wal_file_->FinalizeWal();
storage_->wal_file_.reset();
storage_->wal_seq_num_ = req.seq_num;
if (shard_->wal_file_) {
if (req.seq_num > shard_->wal_file_->SequenceNumber() || *maybe_epoch_id != shard_->epoch_id_) {
shard_->wal_file_->FinalizeWal();
shard_->wal_file_.reset();
shard_->wal_seq_num_ = req.seq_num;
} else {
MG_ASSERT(storage_->wal_file_->SequenceNumber() == req.seq_num, "Invalid sequence number of current wal file");
storage_->wal_seq_num_ = req.seq_num + 1;
MG_ASSERT(shard_->wal_file_->SequenceNumber() == req.seq_num, "Invalid sequence number of current wal file");
shard_->wal_seq_num_ = req.seq_num + 1;
}
} else {
storage_->wal_seq_num_ = req.seq_num;
shard_->wal_seq_num_ = req.seq_num;
}
if (req.previous_commit_timestamp != storage_->last_commit_timestamp_) {
if (req.previous_commit_timestamp != shard_->last_commit_timestamp_) {
// Empty the stream
bool transaction_complete = false;
while (!transaction_complete) {
@ -134,82 +134,82 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
}
replication::AppendDeltasRes res{false, storage_->last_commit_timestamp_};
replication::AppendDeltasRes res{false, shard_->last_commit_timestamp_};
slk::Save(res, res_builder);
return;
}
ReadAndApplyDelta(&decoder);
replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_};
replication::AppendDeltasRes res{true, shard_->last_commit_timestamp_};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void Shard::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::SnapshotReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->snapshot_directory_);
utils::EnsureDirOrDie(shard_->snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(storage_->snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(shard_->snapshot_directory_);
MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
// Clear the database
storage_->vertices_.clear();
storage_->edges_.clear();
shard_->vertices_.clear();
shard_->edges_.clear();
storage_->constraints_ = Constraints();
storage_->indices_.label_index =
LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items, storage_->schema_validator_);
storage_->indices_.label_property_index = LabelPropertyIndex(&storage_->indices_, &storage_->constraints_,
storage_->config_.items, storage_->schema_validator_);
shard_->constraints_ = Constraints();
shard_->indices_.label_index =
LabelIndex(&shard_->indices_, &shard_->constraints_, shard_->config_.items, shard_->schema_validator_);
shard_->indices_.label_property_index =
LabelPropertyIndex(&shard_->indices_, &shard_->constraints_, shard_->config_.items, shard_->schema_validator_);
try {
spdlog::debug("Loading snapshot");
auto recovered_snapshot = durability::RecoveredSnapshot{};
// durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
// &storage_->epoch_history_,
// &storage_->name_id_mapper_, &storage_->edge_count_, storage_->config_.items);
// durability::LoadSnapshot(*maybe_snapshot_path, &shard_->vertices_, &shard_->edges_,
// &shard_->epoch_history_,
// &shard_->name_id_mapper_, &shard_->edge_count_, shard_->config_.items);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost
storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
shard_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
shard_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
const auto &recovery_info = recovered_snapshot.recovery_info;
storage_->edge_id_ = recovery_info.next_edge_id;
storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
shard_->edge_id_ = recovery_info.next_edge_id;
shard_->timestamp_ = std::max(shard_->timestamp_, recovery_info.next_timestamp);
// durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
// &storage_->constraints_, &storage_->vertices_);
// durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &shard_->indices_,
// &shard_->constraints_, &shard_->vertices_);
} catch (const durability::RecoveryFailure &e) {
LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
}
replication::SnapshotRes res{true, storage_->last_commit_timestamp_};
replication::SnapshotRes res{true, shard_->last_commit_timestamp_};
slk::Save(res, res_builder);
// Delete other durability files
auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
auto snapshot_files = durability::GetSnapshotFiles(shard_->snapshot_directory_, shard_->uuid_);
for (const auto &[path, uuid, _] : snapshot_files) {
if (path != *maybe_snapshot_path) {
storage_->file_retainer_.DeleteFile(path);
shard_->file_retainer_.DeleteFile(path);
}
}
auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_);
auto wal_files = durability::GetWalFiles(shard_->wal_directory_, shard_->uuid_);
if (wal_files) {
for (const auto &wal_file : *wal_files) {
storage_->file_retainer_.DeleteFile(wal_file.path);
shard_->file_retainer_.DeleteFile(wal_file.path);
}
storage_->wal_file_.reset();
shard_->wal_file_.reset();
}
}
void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void Shard::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::WalFilesReq req;
slk::Load(&req, req_reader);
@ -218,31 +218,31 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->wal_directory_);
utils::EnsureDirOrDie(shard_->wal_directory_);
for (auto i = 0; i < wal_file_number; ++i) {
LoadWal(&decoder);
}
replication::WalFilesRes res{true, storage_->last_commit_timestamp_};
replication::WalFilesRes res{true, shard_->last_commit_timestamp_};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void Shard::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::CurrentWalReq req;
slk::Load(&req, req_reader);
replication::Decoder decoder(req_reader);
utils::EnsureDirOrDie(storage_->wal_directory_);
utils::EnsureDirOrDie(shard_->wal_directory_);
LoadWal(&decoder);
replication::CurrentWalRes res{true, storage_->last_commit_timestamp_};
replication::CurrentWalRes res{true, shard_->last_commit_timestamp_};
slk::Save(res, res_builder);
}
void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
void Shard::ReplicationServer::LoadWal(replication::Decoder *decoder) {
const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory;
utils::EnsureDir(temp_wal_directory);
auto maybe_wal_path = decoder->ReadFile(temp_wal_directory);
@ -251,22 +251,22 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
try {
auto wal_info = durability::ReadWalInfo(*maybe_wal_path);
if (wal_info.seq_num == 0) {
storage_->uuid_ = wal_info.uuid;
shard_->uuid_ = wal_info.uuid;
}
if (wal_info.epoch_id != storage_->epoch_id_) {
storage_->epoch_history_.emplace_back(wal_info.epoch_id, storage_->last_commit_timestamp_);
storage_->epoch_id_ = std::move(wal_info.epoch_id);
if (wal_info.epoch_id != shard_->epoch_id_) {
shard_->epoch_history_.emplace_back(wal_info.epoch_id, shard_->last_commit_timestamp_);
shard_->epoch_id_ = std::move(wal_info.epoch_id);
}
if (storage_->wal_file_) {
if (storage_->wal_file_->SequenceNumber() != wal_info.seq_num) {
storage_->wal_file_->FinalizeWal();
storage_->wal_seq_num_ = wal_info.seq_num;
storage_->wal_file_.reset();
if (shard_->wal_file_) {
if (shard_->wal_file_->SequenceNumber() != wal_info.seq_num) {
shard_->wal_file_->FinalizeWal();
shard_->wal_seq_num_ = wal_info.seq_num;
shard_->wal_file_.reset();
}
} else {
storage_->wal_seq_num_ = wal_info.seq_num;
shard_->wal_seq_num_ = wal_info.seq_num;
}
durability::Decoder wal;
@ -285,20 +285,20 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
}
}
Storage::ReplicationServer::~ReplicationServer() {
Shard::ReplicationServer::~ReplicationServer() {
if (rpc_server_) {
rpc_server_->Shutdown();
rpc_server_->AwaitShutdown();
}
}
uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
auto edge_acc = storage_->edges_.access();
// auto vertex_acc = storage_->vertices_.access();
uint64_t Shard::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
auto edge_acc = shard_->edges_.access();
// auto vertex_acc = shard_->vertices_.access();
std::optional<std::pair<uint64_t, Storage::Accessor>> commit_timestamp_and_accessor;
std::optional<std::pair<uint64_t, Shard::Accessor>> commit_timestamp_and_accessor;
// auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) {
// if (!commit_timestamp_and_accessor) {
// commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access());
// commit_timestamp_and_accessor.emplace(commit_timestamp, shard_->Access());
// } else if (commit_timestamp_and_accessor->first != commit_timestamp) {
// throw utils::BasicException("Received more than one transaction!");
// }
@ -306,7 +306,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// };
uint64_t applied_deltas = 0;
auto max_commit_timestamp = storage_->last_commit_timestamp_;
auto max_commit_timestamp = shard_->last_commit_timestamp_;
for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) {
const auto [timestamp, delta] = ReadDelta(decoder);
@ -316,7 +316,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
if (timestamp < storage_->timestamp_) {
if (timestamp < shard_->timestamp_) {
continue;
}
@ -406,12 +406,12 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// spdlog::trace(" Edge {} set property {} to {}", delta.vertex_edge_set_property.gid.AsUint(),
// delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value);
// if (!storage_->config_.items.properties_on_edges)
// if (!shard_->config_.items.properties_on_edges)
// throw utils::BasicException(
// "Can't set properties on edges because properties on edges "
// "are disabled!");
// auto *transaction = get_transaction(timestamp);
// // auto *transaction = get_transaction(timestamp);
// // The following block of code effectively implements `FindEdge` and
// // yields an accessor that is only valid for managing the edge's
@ -456,10 +456,10 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// nullptr,
// nullptr,
// &transaction->transaction_,
// &storage_->indices_,
// &storage_->constraints_,
// storage_->config_.items,
// storage_->schema_validator_};
// &shard_->indices_,
// &shard_->constraints_,
// shard_->config_.items,
// shard_->schema_validator_};
// auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
// delta.vertex_edge_set_property.value);
@ -481,14 +481,14 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// spdlog::trace(" Create label index on :{}", delta.operation_label.label);
// // Need to send the timestamp
// if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// if (!storage_->CreateIndex(storage_->NameToLabel(delta.operation_label.label), timestamp))
// if (!shard_->CreateIndex(shard_->NameToLabel(delta.operation_label.label), timestamp))
// throw utils::BasicException("Invalid transaction!");
// break;
// }
// case durability::WalDeltaData::Type::LABEL_INDEX_DROP: {
// spdlog::trace(" Drop label index on :{}", delta.operation_label.label);
// if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// if (!storage_->DropIndex(storage_->NameToLabel(delta.operation_label.label), timestamp))
// if (!shard_->DropIndex(shard_->NameToLabel(delta.operation_label.label), timestamp))
// throw utils::BasicException("Invalid transaction!");
// break;
// }
@ -496,8 +496,8 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// spdlog::trace(" Create label+property index on :{} ({})", delta.operation_label_property.label,
// delta.operation_label_property.property);
// if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// if (!storage_->CreateIndex(storage_->NameToLabel(delta.operation_label_property.label),
// storage_->NameToProperty(delta.operation_label_property.property), timestamp))
// if (!shard_->CreateIndex(shard_->NameToLabel(delta.operation_label_property.label),
// shard_->NameToProperty(delta.operation_label_property.property), timestamp))
// throw utils::BasicException("Invalid transaction!");
// break;
// }
@ -505,8 +505,8 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// spdlog::trace(" Drop label+property index on :{} ({})", delta.operation_label_property.label,
// delta.operation_label_property.property);
// if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// if (!storage_->DropIndex(storage_->NameToLabel(delta.operation_label_property.label),
// storage_->NameToProperty(delta.operation_label_property.property), timestamp))
// if (!shard_->DropIndex(shard_->NameToLabel(delta.operation_label_property.label),
// shard_->NameToProperty(delta.operation_label_property.property), timestamp))
// throw utils::BasicException("Invalid transaction!");
// break;
// }
@ -514,9 +514,9 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// spdlog::trace(" Create existence constraint on :{} ({})", delta.operation_label_property.label,
// delta.operation_label_property.property);
// if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// auto ret = storage_->CreateExistenceConstraint(
// storage_->NameToLabel(delta.operation_label_property.label),
// storage_->NameToProperty(delta.operation_label_property.property), timestamp);
// auto ret = shard_->CreateExistenceConstraint(
// shard_->NameToLabel(delta.operation_label_property.label),
// shard_->NameToProperty(delta.operation_label_property.property), timestamp);
// if (!ret.HasValue() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!");
// break;
// }
@ -524,8 +524,8 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// spdlog::trace(" Drop existence constraint on :{} ({})", delta.operation_label_property.label,
// delta.operation_label_property.property);
// if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// if (!storage_->DropExistenceConstraint(storage_->NameToLabel(delta.operation_label_property.label),
// storage_->NameToProperty(delta.operation_label_property.property),
// if (!shard_->DropExistenceConstraint(shard_->NameToLabel(delta.operation_label_property.label),
// shard_->NameToProperty(delta.operation_label_property.property),
// timestamp))
// throw utils::BasicException("Invalid transaction!");
// break;
@ -537,9 +537,9 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// ss.str()); if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// std::set<PropertyId> properties;
// for (const auto &prop : delta.operation_label_properties.properties) {
// properties.emplace(storage_->NameToProperty(prop));
// properties.emplace(shard_->NameToProperty(prop));
// }
// auto ret = storage_->CreateUniqueConstraint(storage_->NameToLabel(delta.operation_label_properties.label),
// auto ret = shard_->CreateUniqueConstraint(shard_->NameToLabel(delta.operation_label_properties.label),
// properties, timestamp);
// if (!ret.HasValue() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
// throw utils::BasicException("Invalid transaction!");
@ -552,9 +552,9 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// ss.str()); if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!");
// std::set<PropertyId> properties;
// for (const auto &prop : delta.operation_label_properties.properties) {
// properties.emplace(storage_->NameToProperty(prop));
// properties.emplace(shard_->NameToProperty(prop));
// }
// auto ret = storage_->DropUniqueConstraint(storage_->NameToLabel(delta.operation_label_properties.label),
// auto ret = shard_->DropUniqueConstraint(shard_->NameToLabel(delta.operation_label_properties.label),
// properties, timestamp);
// if (ret != UniqueConstraints::DeletionStatus::SUCCESS) throw utils::BasicException("Invalid transaction!");
// break;
@ -564,7 +564,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid data!");
storage_->last_commit_timestamp_ = max_commit_timestamp;
shard_->last_commit_timestamp_ = max_commit_timestamp;
return applied_deltas;
}

View File

@ -11,13 +11,13 @@
#pragma once
#include "storage/v3/storage.hpp"
#include "storage/v3/shard.hpp"
namespace memgraph::storage::v3 {
class Storage::ReplicationServer {
class Shard::ReplicationServer {
public:
explicit ReplicationServer(Storage *storage, io::network::Endpoint endpoint,
explicit ReplicationServer(Shard *shard, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config);
ReplicationServer(const ReplicationServer &) = delete;
ReplicationServer(ReplicationServer &&) = delete;
@ -41,7 +41,7 @@ class Storage::ReplicationServer {
std::optional<communication::ServerContext> rpc_server_context_;
std::optional<rpc::Server> rpc_server_;
Storage *storage_;
Shard *shard_;
};
} // namespace memgraph::storage::v3

1849
src/storage/v3/shard.cpp Normal file

File diff suppressed because it is too large Load Diff

617
src/storage/v3/shard.hpp Normal file
View File

@ -0,0 +1,617 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <cstdint>
#include <filesystem>
#include <map>
#include <numeric>
#include <optional>
#include <shared_mutex>
#include <variant>
#include <vector>
#include "io/network/endpoint.hpp"
#include "kvstore/kvstore.hpp"
#include "storage/v3/commit_log.hpp"
#include "storage/v3/config.hpp"
#include "storage/v3/constraints.hpp"
#include "storage/v3/durability/metadata.hpp"
#include "storage/v3/durability/wal.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/isolation_level.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/lexicographically_ordered_vertex.hpp"
#include "storage/v3/mvcc.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_accessor.hpp"
#include "storage/v3/vertices_skip_list.hpp"
#include "utils/exceptions.hpp"
#include "utils/file_locker.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/rw_lock.hpp"
#include "utils/scheduler.hpp"
#include "utils/skip_list.hpp"
#include "utils/synchronized.hpp"
#include "utils/uuid.hpp"
/// REPLICATION ///
#include "rpc/server.hpp"
#include "storage/v3/replication/config.hpp"
#include "storage/v3/replication/enums.hpp"
#include "storage/v3/replication/rpc.hpp"
#include "storage/v3/replication/serialization.hpp"
namespace memgraph::storage::v3 {
// The storage is based on this paper:
// https://db.in.tum.de/~muehlbau/papers/mvcc.pdf
// The paper implements a fully serializable storage, in our implementation we
// only implement snapshot isolation for transactions.
/// Iterable for iterating through all vertices of a Storage.
///
/// An instance of this will be usually be wrapped inside VerticesIterable for
/// generic, public use.
class AllVerticesIterable final {
VerticesSkipList::Accessor vertices_accessor_;
Transaction *transaction_;
View view_;
Indices *indices_;
Constraints *constraints_;
Config::Items config_;
const SchemaValidator *schema_validator_;
const Schemas *schemas_;
std::optional<VertexAccessor> vertex_;
public:
class Iterator final {
AllVerticesIterable *self_;
VerticesSkipList::Iterator it_;
public:
Iterator(AllVerticesIterable *self, VerticesSkipList::Iterator it);
VertexAccessor operator*() const;
Iterator &operator++();
bool operator==(const Iterator &other) const { return self_ == other.self_ && it_ == other.it_; }
bool operator!=(const Iterator &other) const { return !(*this == other); }
};
AllVerticesIterable(VerticesSkipList::Accessor vertices_accessor, Transaction *transaction, View view,
Indices *indices, Constraints *constraints, Config::Items config,
const SchemaValidator &schema_validator)
: vertices_accessor_(std::move(vertices_accessor)),
transaction_(transaction),
view_(view),
indices_(indices),
constraints_(constraints),
config_(config),
schema_validator_{&schema_validator} {}
Iterator begin() { return {this, vertices_accessor_.begin()}; }
Iterator end() { return {this, vertices_accessor_.end()}; }
};
/// Generic access to different kinds of vertex iterations.
///
/// This class should be the primary type used by the client code to iterate
/// over vertices inside a Storage instance.
class VerticesIterable final {
enum class Type { ALL, BY_LABEL, BY_LABEL_PROPERTY };
Type type_;
union {
AllVerticesIterable all_vertices_;
LabelIndex::Iterable vertices_by_label_;
LabelPropertyIndex::Iterable vertices_by_label_property_;
};
public:
explicit VerticesIterable(AllVerticesIterable);
explicit VerticesIterable(LabelIndex::Iterable);
explicit VerticesIterable(LabelPropertyIndex::Iterable);
VerticesIterable(const VerticesIterable &) = delete;
VerticesIterable &operator=(const VerticesIterable &) = delete;
VerticesIterable(VerticesIterable &&) noexcept;
VerticesIterable &operator=(VerticesIterable &&) noexcept;
~VerticesIterable();
class Iterator final {
Type type_;
union {
AllVerticesIterable::Iterator all_it_;
LabelIndex::Iterable::Iterator by_label_it_;
LabelPropertyIndex::Iterable::Iterator by_label_property_it_;
};
void Destroy() noexcept;
public:
explicit Iterator(AllVerticesIterable::Iterator);
explicit Iterator(LabelIndex::Iterable::Iterator);
explicit Iterator(LabelPropertyIndex::Iterable::Iterator);
Iterator(const Iterator &);
Iterator &operator=(const Iterator &);
Iterator(Iterator &&) noexcept;
Iterator &operator=(Iterator &&) noexcept;
~Iterator();
VertexAccessor operator*() const;
Iterator &operator++();
bool operator==(const Iterator &other) const;
bool operator!=(const Iterator &other) const { return !(*this == other); }
};
Iterator begin();
Iterator end();
};
/// Structure used to return information about existing indices in the storage.
struct IndicesInfo {
std::vector<LabelId> label;
std::vector<std::pair<LabelId, PropertyId>> label_property;
};
/// Structure used to return information about existing constraints in the
/// storage.
struct ConstraintsInfo {
std::vector<std::pair<LabelId, PropertyId>> existence;
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
};
/// Structure used to return information about existing schemas in the storage
struct SchemasInfo {
Schemas::SchemasList schemas;
};
/// Structure used to return information about the storage.
struct StorageInfo {
uint64_t vertex_count;
uint64_t edge_count;
double average_degree;
uint64_t memory_usage;
uint64_t disk_usage;
};
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
class Shard final {
public:
/// @throw std::system_error
/// @throw std::bad_alloc
explicit Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
Config config = Config());
Shard(const Shard &) = delete;
Shard(Shard &&) noexcept = delete;
Shard &operator=(const Shard &) = delete;
Shard operator=(Shard &&) noexcept = delete;
~Shard();
class Accessor final {
private:
friend class Shard;
explicit Accessor(Shard *shard, IsolationLevel isolation_level);
public:
Accessor(const Accessor &) = delete;
Accessor &operator=(const Accessor &) = delete;
Accessor &operator=(Accessor &&other) = delete;
// NOTE: After the accessor is moved, all objects derived from it (accessors
// and iterators) are *invalid*. You have to get all derived objects again.
Accessor(Accessor &&other) noexcept;
~Accessor();
/// @throw std::bad_alloc
ResultSchema<VertexAccessor> CreateVertexAndValidate(
LabelId primary_label, const std::vector<LabelId> &labels,
const std::vector<std::pair<PropertyId, PropertyValue>> &properties);
std::optional<VertexAccessor> FindVertex(std::vector<PropertyValue> primary_key, View view);
VerticesIterable Vertices(View view) {
return VerticesIterable(AllVerticesIterable(shard_->vertices_.access(), &transaction_, view, &shard_->indices_,
&shard_->constraints_, shard_->config_.items,
shard_->schema_validator_));
}
VerticesIterable Vertices(LabelId label, View view);
VerticesIterable Vertices(LabelId label, PropertyId property, View view);
VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view);
VerticesIterable Vertices(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view);
/// Return approximate number of all vertices in the database.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount() const { return static_cast<int64_t>(shard_->vertices_.size()); }
/// Return approximate number of vertices with the given label.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label) const {
return shard_->indices_.label_index.ApproximateVertexCount(label);
}
/// Return approximate number of vertices with the given label and property.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property) const {
return shard_->indices_.label_property_index.ApproximateVertexCount(label, property);
}
/// Return approximate number of vertices with the given label and the given
/// value for the given property. Note that this is always an over-estimate
/// and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property, const PropertyValue &value) const {
return shard_->indices_.label_property_index.ApproximateVertexCount(label, property, value);
}
/// Return approximate number of vertices with the given label and value for
/// the given property in the range defined by provided upper and lower
/// bounds.
int64_t ApproximateVertexCount(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower,
const std::optional<utils::Bound<PropertyValue>> &upper) const {
return shard_->indices_.label_property_index.ApproximateVertexCount(label, property, lower, upper);
}
/// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex);
/// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex(
VertexAccessor *vertex);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type);
/// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge);
const std::string &LabelToName(LabelId label) const;
const std::string &PropertyToName(PropertyId property) const;
const std::string &EdgeTypeToName(EdgeTypeId edge_type) const;
bool LabelIndexExists(LabelId label) const { return shard_->indices_.label_index.IndexExists(label); }
bool LabelPropertyIndexExists(LabelId label, PropertyId property) const {
return shard_->indices_.label_property_index.IndexExists(label, property);
}
IndicesInfo ListAllIndices() const {
return {shard_->indices_.label_index.ListIndices(), shard_->indices_.label_property_index.ListIndices()};
}
ConstraintsInfo ListAllConstraints() const {
return {ListExistenceConstraints(shard_->constraints_),
shard_->constraints_.unique_constraints.ListConstraints()};
}
const SchemaValidator &GetSchemaValidator() const;
SchemasInfo ListAllSchemas() const { return {shard_->schemas_.ListSchemas()}; }
void AdvanceCommand();
/// Commit returns `ConstraintViolation` if the changes made by this
/// transaction violate an existence or unique constraint. In that case the
/// transaction is automatically aborted. Otherwise, void is returned.
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, void> Commit(std::optional<uint64_t> desired_commit_timestamp = {});
/// @throw std::bad_alloc
void Abort();
void FinalizeTransaction();
private:
/// @throw std::bad_alloc
VertexAccessor CreateVertex(Gid gid, LabelId primary_label);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, Gid gid);
Shard *shard_;
Transaction transaction_;
std::optional<uint64_t> commit_timestamp_;
bool is_transaction_active_;
Config::Items config_;
};
Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) {
return Accessor{this, override_isolation_level.value_or(isolation_level_)};
}
const std::string &LabelToName(LabelId label) const;
const std::string &PropertyToName(PropertyId property) const;
const std::string &EdgeTypeToName(EdgeTypeId edge_type) const;
/// @throw std::bad_alloc if unable to insert a new mapping
LabelId NameToLabel(std::string_view name);
/// @throw std::bad_alloc if unable to insert a new mapping
PropertyId NameToProperty(std::string_view name);
/// @throw std::bad_alloc if unable to insert a new mapping
EdgeTypeId NameToEdgeType(std::string_view name);
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, std::optional<uint64_t> desired_commit_timestamp = {});
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, PropertyId property, std::optional<uint64_t> desired_commit_timestamp = {});
bool DropIndex(LabelId label, std::optional<uint64_t> desired_commit_timestamp = {});
bool DropIndex(LabelId label, PropertyId property, std::optional<uint64_t> desired_commit_timestamp = {});
IndicesInfo ListAllIndices() const;
/// Creates an existence constraint. Returns true if the constraint was
/// successfully added, false if it already exists and a `ConstraintViolation`
/// if there is an existing vertex violating the constraint.
///
/// @throw std::bad_alloc
/// @throw std::length_error
utils::BasicResult<ConstraintViolation, bool> CreateExistenceConstraint(
LabelId label, PropertyId property, std::optional<uint64_t> desired_commit_timestamp = {});
/// Removes an existence constraint. Returns true if the constraint was
/// removed, and false if it doesn't exist.
bool DropExistenceConstraint(LabelId label, PropertyId property,
std::optional<uint64_t> desired_commit_timestamp = {});
/// Creates a unique constraint. In the case of two vertices violating the
/// constraint, it returns `ConstraintViolation`. Otherwise returns a
/// `UniqueConstraints::CreationStatus` enum with the following possibilities:
/// * `SUCCESS` if the constraint was successfully created,
/// * `ALREADY_EXISTS` if the constraint already existed,
/// * `EMPTY_PROPERTIES` if the property set is empty, or
// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the
// limit of maximum number of properties.
///
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus> CreateUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties, std::optional<uint64_t> desired_commit_timestamp = {});
/// Removes a unique constraint. Returns `UniqueConstraints::DeletionStatus`
/// enum with the following possibilities:
/// * `SUCCESS` if constraint was successfully removed,
/// * `NOT_FOUND` if the specified constraint was not found,
/// * `EMPTY_PROPERTIES` if the property set is empty, or
/// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the
// limit of maximum number of properties.
UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label, const std::set<PropertyId> &properties,
std::optional<uint64_t> desired_commit_timestamp = {});
ConstraintsInfo ListAllConstraints() const;
SchemasInfo ListAllSchemas() const;
const Schemas::Schema *GetSchema(LabelId primary_label) const;
bool CreateSchema(LabelId primary_label, const std::vector<SchemaProperty> &schemas_types);
bool DropSchema(LabelId primary_label);
StorageInfo GetInfo() const;
bool LockPath();
bool UnlockPath();
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config = {});
bool SetMainReplicationRole();
enum class RegisterReplicaError : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
CONNECTION_FAILED,
COULD_NOT_BE_PERSISTED
};
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
utils::BasicResult<RegisterReplicaError, void> RegisterReplica(
std::string name, io::network::Endpoint endpoint, replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config = {});
/// @pre The instance should have a MAIN role
bool UnregisterReplica(std::string_view name);
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
ReplicationRole GetReplicationRole() const;
struct ReplicaInfo {
std::string name;
replication::ReplicationMode mode;
std::optional<double> timeout;
io::network::Endpoint endpoint;
replication::ReplicaState state;
};
std::vector<ReplicaInfo> ReplicasInfo();
void FreeMemory();
void SetIsolationLevel(IsolationLevel isolation_level);
enum class CreateSnapshotError : uint8_t { DisabledForReplica };
utils::BasicResult<CreateSnapshotError> CreateSnapshot();
private:
Transaction CreateTransaction(IsolationLevel isolation_level);
/// The force parameter determines the behaviour of the garbage collector.
/// If it's set to true, it will behave as a global operation, i.e. it can't
/// be part of a transaction, and no other transaction can be active at the same time.
/// This allows it to delete immediately vertices without worrying that some other
/// transaction is possibly using it. If there are active transactions when this method
/// is called with force set to true, it will fallback to the same method with the force
/// set to false.
/// If it's set to false, it will execute in parallel with other transactions, ensuring
/// that no object in use can be deleted.
/// @throw std::system_error
/// @throw std::bad_alloc
template <bool force>
void CollectGarbage();
bool InitializeWalFile();
void FinalizeWalFile();
void AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp);
void AppendToWal(durability::StorageGlobalOperation operation, LabelId label, const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
// Main object storage
NameIdMapper name_id_mapper_;
LabelId primary_label_;
PrimaryKey min_primary_key_;
std::optional<PrimaryKey> max_primary_key_;
VerticesSkipList vertices_;
utils::SkipList<Edge> edges_;
uint64_t edge_id_{0};
// Even though the edge count is already kept in the `edges_` SkipList, the
// list is used only when properties are enabled for edges. Because of that we
// keep a separate count of edges that is always updated.
uint64_t edge_count_{0};
SchemaValidator schema_validator_;
Constraints constraints_;
Indices indices_;
Schemas schemas_;
// Transaction engine
uint64_t timestamp_{kTimestampInitialId};
uint64_t transaction_id_{kTransactionInitialId};
// TODO: This isn't really a commit log, it doesn't even care if a
// transaction commited or aborted. We could probably combine this with
// `timestamp_` in a sensible unit, something like TransactionClock or
// whatever.
std::optional<CommitLog> commit_log_;
std::list<Transaction> committed_transactions_;
IsolationLevel isolation_level_;
Config config_;
// Undo buffers that were unlinked and now are waiting to be freed.
std::list<std::pair<uint64_t, std::list<Delta>>> garbage_undo_buffers_;
// Vertices that are logically deleted but still have to be removed from
// indices before removing them from the main storage.
std::list<PrimaryKey> deleted_vertices_;
// Vertices that are logically deleted and removed from indices and now wait
// to be removed from the main storage.
std::list<std::pair<uint64_t, PrimaryKey>> garbage_vertices_;
// Edges that are logically deleted and wait to be removed from the main
// storage.
std::list<Gid> deleted_edges_;
// Durability
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
// UUID to distinguish different main instance runs for replication process
// on SAME storage.
// Multiple instances can have same storage UUID and be MAIN at the same time.
// We cannot compare commit timestamps of those instances if one of them
// becomes the replica of the other so we use epoch_id_ as additional
// discriminating property.
// Example of this:
// We have 2 instances of the same storage, S1 and S2.
// S1 and S2 are MAIN and accept their own commits and write them to the WAL.
// At the moment when S1 commited a transaction with timestamp 20, and S2
// a different transaction with timestamp 15, we change S2's role to REPLICA
// and register it on S1.
// Without using the epoch_id, we don't know that S1 and S2 have completely
// different transactions, we think that the S2 is behind only by 5 commits.
std::string epoch_id_;
// History of the previous epoch ids.
// Each value consists of the epoch id along the last commit belonging to that
// epoch.
std::deque<std::pair<std::string, uint64_t>> epoch_history_;
std::optional<durability::WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
utils::FileRetainer file_retainer_;
// Global locker that is used for clients file locking
utils::FileRetainer::FileLocker global_locker_;
// Last commited timestamp
uint64_t last_commit_timestamp_{kTimestampInitialId};
class ReplicationServer;
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
class ReplicationClient;
// We create ReplicationClient using unique_ptr so we can move
// newly created client into the vector.
// We cannot move the client directly because it contains ThreadPool
// which cannot be moved. Also, the move is necessary because
// we don't want to create the client directly inside the vector
// because that would require the lock on the list putting all
// commits (they iterate list of clients) to halt.
// This way we can initialize client in main thread which means
// that we can immediately notify the user if the initialization
// failed.
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
ReplicationClientList replication_clients_;
ReplicationRole replication_role_{ReplicationRole::MAIN};
};
} // namespace memgraph::storage::v3

File diff suppressed because it is too large Load Diff

View File

@ -11,609 +11,24 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <filesystem>
#include <map>
#include <numeric>
#include <optional>
#include <shared_mutex>
#include <variant>
#include <vector>
#include "io/network/endpoint.hpp"
#include "kvstore/kvstore.hpp"
#include "storage/v3/commit_log.hpp"
#include "storage/v3/config.hpp"
#include "storage/v3/constraints.hpp"
#include "storage/v3/durability/metadata.hpp"
#include "storage/v3/durability/wal.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/isolation_level.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/lexicographically_ordered_vertex.hpp"
#include "storage/v3/mvcc.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_accessor.hpp"
#include "storage/v3/vertices_skip_list.hpp"
#include "utils/exceptions.hpp"
#include "utils/file_locker.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/rw_lock.hpp"
#include "utils/scheduler.hpp"
#include "utils/skip_list.hpp"
#include "utils/synchronized.hpp"
#include "utils/uuid.hpp"
#include <boost/asio/thread_pool.hpp>
/// REPLICATION ///
#include "rpc/server.hpp"
#include "storage/v3/replication/config.hpp"
#include "storage/v3/replication/enums.hpp"
#include "storage/v3/replication/rpc.hpp"
#include "storage/v3/replication/serialization.hpp"
#include "storage/v3/shard.hpp"
namespace memgraph::storage::v3 {
// The storage is based on this paper:
// https://db.in.tum.de/~muehlbau/papers/mvcc.pdf
// The paper implements a fully serializable storage, in our implementation we
// only implement snapshot isolation for transactions.
/// Iterable for iterating through all vertices of a Storage.
///
/// An instance of this will be usually be wrapped inside VerticesIterable for
/// generic, public use.
class AllVerticesIterable final {
VerticesSkipList::Accessor vertices_accessor_;
Transaction *transaction_;
View view_;
Indices *indices_;
Constraints *constraints_;
Config::Items config_;
const SchemaValidator *schema_validator_;
const Schemas *schemas_;
std::optional<VertexAccessor> vertex_;
class Storage {
public:
class Iterator final {
AllVerticesIterable *self_;
VerticesSkipList::Iterator it_;
public:
Iterator(AllVerticesIterable *self, VerticesSkipList::Iterator it);
VertexAccessor operator*() const;
Iterator &operator++();
bool operator==(const Iterator &other) const { return self_ == other.self_ && it_ == other.it_; }
bool operator!=(const Iterator &other) const { return !(*this == other); }
};
AllVerticesIterable(VerticesSkipList::Accessor vertices_accessor, Transaction *transaction, View view,
Indices *indices, Constraints *constraints, Config::Items config,
const SchemaValidator &schema_validator)
: vertices_accessor_(std::move(vertices_accessor)),
transaction_(transaction),
view_(view),
indices_(indices),
constraints_(constraints),
config_(config),
schema_validator_{&schema_validator} {}
Iterator begin() { return {this, vertices_accessor_.begin()}; }
Iterator end() { return {this, vertices_accessor_.end()}; }
};
/// Generic access to different kinds of vertex iterations.
///
/// This class should be the primary type used by the client code to iterate
/// over vertices inside a Storage instance.
class VerticesIterable final {
enum class Type { ALL, BY_LABEL, BY_LABEL_PROPERTY };
Type type_;
union {
AllVerticesIterable all_vertices_;
LabelIndex::Iterable vertices_by_label_;
LabelPropertyIndex::Iterable vertices_by_label_property_;
};
public:
explicit VerticesIterable(AllVerticesIterable);
explicit VerticesIterable(LabelIndex::Iterable);
explicit VerticesIterable(LabelPropertyIndex::Iterable);
VerticesIterable(const VerticesIterable &) = delete;
VerticesIterable &operator=(const VerticesIterable &) = delete;
VerticesIterable(VerticesIterable &&) noexcept;
VerticesIterable &operator=(VerticesIterable &&) noexcept;
~VerticesIterable();
class Iterator final {
Type type_;
union {
AllVerticesIterable::Iterator all_it_;
LabelIndex::Iterable::Iterator by_label_it_;
LabelPropertyIndex::Iterable::Iterator by_label_property_it_;
};
void Destroy() noexcept;
public:
explicit Iterator(AllVerticesIterable::Iterator);
explicit Iterator(LabelIndex::Iterable::Iterator);
explicit Iterator(LabelPropertyIndex::Iterable::Iterator);
Iterator(const Iterator &);
Iterator &operator=(const Iterator &);
Iterator(Iterator &&) noexcept;
Iterator &operator=(Iterator &&) noexcept;
~Iterator();
VertexAccessor operator*() const;
Iterator &operator++();
bool operator==(const Iterator &other) const;
bool operator!=(const Iterator &other) const { return !(*this == other); }
};
Iterator begin();
Iterator end();
};
/// Structure used to return information about existing indices in the storage.
struct IndicesInfo {
std::vector<LabelId> label;
std::vector<std::pair<LabelId, PropertyId>> label_property;
};
/// Structure used to return information about existing constraints in the
/// storage.
struct ConstraintsInfo {
std::vector<std::pair<LabelId, PropertyId>> existence;
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
};
/// Structure used to return information about existing schemas in the storage
struct SchemasInfo {
Schemas::SchemasList schemas;
};
/// Structure used to return information about the storage.
struct StorageInfo {
uint64_t vertex_count;
uint64_t edge_count;
double average_degree;
uint64_t memory_usage;
uint64_t disk_usage;
};
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
class Storage final {
public:
/// @throw std::system_error
/// @throw std::bad_alloc
explicit Storage(Config config = Config());
~Storage();
class Accessor final {
private:
friend class Storage;
explicit Accessor(Storage *storage, IsolationLevel isolation_level);
public:
Accessor(const Accessor &) = delete;
Accessor &operator=(const Accessor &) = delete;
Accessor &operator=(Accessor &&other) = delete;
// NOTE: After the accessor is moved, all objects derived from it (accessors
// and iterators) are *invalid*. You have to get all derived objects again.
Accessor(Accessor &&other) noexcept;
~Accessor();
/// @throw std::bad_alloc
ResultSchema<VertexAccessor> CreateVertexAndValidate(
LabelId primary_label, const std::vector<LabelId> &labels,
const std::vector<std::pair<PropertyId, PropertyValue>> &properties);
std::optional<VertexAccessor> FindVertex(std::vector<PropertyValue> primary_key, View view);
VerticesIterable Vertices(View view) {
return VerticesIterable(AllVerticesIterable(storage_->vertices_.access(), &transaction_, view,
&storage_->indices_, &storage_->constraints_, storage_->config_.items,
storage_->schema_validator_));
}
VerticesIterable Vertices(LabelId label, View view);
VerticesIterable Vertices(LabelId label, PropertyId property, View view);
VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view);
VerticesIterable Vertices(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view);
/// Return approximate number of all vertices in the database.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount() const { return static_cast<int64_t>(storage_->vertices_.size()); }
/// Return approximate number of vertices with the given label.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label) const {
return storage_->indices_.label_index.ApproximateVertexCount(label);
}
/// Return approximate number of vertices with the given label and property.
/// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property) const {
return storage_->indices_.label_property_index.ApproximateVertexCount(label, property);
}
/// Return approximate number of vertices with the given label and the given
/// value for the given property. Note that this is always an over-estimate
/// and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property, const PropertyValue &value) const {
return storage_->indices_.label_property_index.ApproximateVertexCount(label, property, value);
}
/// Return approximate number of vertices with the given label and value for
/// the given property in the range defined by provided upper and lower
/// bounds.
int64_t ApproximateVertexCount(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower,
const std::optional<utils::Bound<PropertyValue>> &upper) const {
return storage_->indices_.label_property_index.ApproximateVertexCount(label, property, lower, upper);
}
/// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex);
/// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex(
VertexAccessor *vertex);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type);
/// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge);
const std::string &LabelToName(LabelId label) const;
const std::string &PropertyToName(PropertyId property) const;
const std::string &EdgeTypeToName(EdgeTypeId edge_type) const;
/// @throw std::bad_alloc if unable to insert a new mapping
LabelId NameToLabel(std::string_view name);
/// @throw std::bad_alloc if unable to insert a new mapping
PropertyId NameToProperty(std::string_view name);
/// @throw std::bad_alloc if unable to insert a new mapping
EdgeTypeId NameToEdgeType(std::string_view name);
bool LabelIndexExists(LabelId label) const { return storage_->indices_.label_index.IndexExists(label); }
bool LabelPropertyIndexExists(LabelId label, PropertyId property) const {
return storage_->indices_.label_property_index.IndexExists(label, property);
}
IndicesInfo ListAllIndices() const {
return {storage_->indices_.label_index.ListIndices(), storage_->indices_.label_property_index.ListIndices()};
}
ConstraintsInfo ListAllConstraints() const {
return {ListExistenceConstraints(storage_->constraints_),
storage_->constraints_.unique_constraints.ListConstraints()};
}
const SchemaValidator &GetSchemaValidator() const;
SchemasInfo ListAllSchemas() const { return {storage_->schemas_.ListSchemas()}; }
void AdvanceCommand();
/// Commit returns `ConstraintViolation` if the changes made by this
/// transaction violate an existence or unique constraint. In that case the
/// transaction is automatically aborted. Otherwise, void is returned.
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, void> Commit(std::optional<uint64_t> desired_commit_timestamp = {});
/// @throw std::bad_alloc
void Abort();
void FinalizeTransaction();
private:
/// @throw std::bad_alloc
VertexAccessor CreateVertex(Gid gid, LabelId primary_label);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, Gid gid);
Storage *storage_;
Transaction transaction_;
std::optional<uint64_t> commit_timestamp_;
bool is_transaction_active_;
Config::Items config_;
};
Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) {
return Accessor{this, override_isolation_level.value_or(isolation_level_)};
}
const std::string &LabelToName(LabelId label) const;
const std::string &PropertyToName(PropertyId property) const;
const std::string &EdgeTypeToName(EdgeTypeId edge_type) const;
/// @throw std::bad_alloc if unable to insert a new mapping
LabelId NameToLabel(std::string_view name);
/// @throw std::bad_alloc if unable to insert a new mapping
PropertyId NameToProperty(std::string_view name);
/// @throw std::bad_alloc if unable to insert a new mapping
EdgeTypeId NameToEdgeType(std::string_view name);
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, std::optional<uint64_t> desired_commit_timestamp = {});
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, PropertyId property, std::optional<uint64_t> desired_commit_timestamp = {});
bool DropIndex(LabelId label, std::optional<uint64_t> desired_commit_timestamp = {});
bool DropIndex(LabelId label, PropertyId property, std::optional<uint64_t> desired_commit_timestamp = {});
IndicesInfo ListAllIndices() const;
/// Creates an existence constraint. Returns true if the constraint was
/// successfully added, false if it already exists and a `ConstraintViolation`
/// if there is an existing vertex violating the constraint.
///
/// @throw std::bad_alloc
/// @throw std::length_error
utils::BasicResult<ConstraintViolation, bool> CreateExistenceConstraint(
LabelId label, PropertyId property, std::optional<uint64_t> desired_commit_timestamp = {});
/// Removes an existence constraint. Returns true if the constraint was
/// removed, and false if it doesn't exist.
bool DropExistenceConstraint(LabelId label, PropertyId property,
std::optional<uint64_t> desired_commit_timestamp = {});
/// Creates a unique constraint. In the case of two vertices violating the
/// constraint, it returns `ConstraintViolation`. Otherwise returns a
/// `UniqueConstraints::CreationStatus` enum with the following possibilities:
/// * `SUCCESS` if the constraint was successfully created,
/// * `ALREADY_EXISTS` if the constraint already existed,
/// * `EMPTY_PROPERTIES` if the property set is empty, or
// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the
// limit of maximum number of properties.
///
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus> CreateUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties, std::optional<uint64_t> desired_commit_timestamp = {});
/// Removes a unique constraint. Returns `UniqueConstraints::DeletionStatus`
/// enum with the following possibilities:
/// * `SUCCESS` if constraint was successfully removed,
/// * `NOT_FOUND` if the specified constraint was not found,
/// * `EMPTY_PROPERTIES` if the property set is empty, or
/// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the
// limit of maximum number of properties.
UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label, const std::set<PropertyId> &properties,
std::optional<uint64_t> desired_commit_timestamp = {});
ConstraintsInfo ListAllConstraints() const;
SchemasInfo ListAllSchemas() const;
const Schemas::Schema *GetSchema(LabelId primary_label) const;
bool CreateSchema(LabelId primary_label, const std::vector<SchemaProperty> &schemas_types);
bool DropSchema(LabelId primary_label);
StorageInfo GetInfo() const;
bool LockPath();
bool UnlockPath();
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config = {});
bool SetMainReplicationRole();
enum class RegisterReplicaError : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
CONNECTION_FAILED,
COULD_NOT_BE_PERSISTED
};
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
utils::BasicResult<RegisterReplicaError, void> RegisterReplica(
std::string name, io::network::Endpoint endpoint, replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config = {});
/// @pre The instance should have a MAIN role
bool UnregisterReplica(std::string_view name);
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
ReplicationRole GetReplicationRole() const;
struct ReplicaInfo {
std::string name;
replication::ReplicationMode mode;
std::optional<double> timeout;
io::network::Endpoint endpoint;
replication::ReplicaState state;
};
std::vector<ReplicaInfo> ReplicasInfo();
void FreeMemory();
void SetIsolationLevel(IsolationLevel isolation_level);
enum class CreateSnapshotError : uint8_t { DisabledForReplica };
utils::BasicResult<CreateSnapshotError> CreateSnapshot();
explicit Storage(Config config);
// Interface toward shard manipulation
// Shard handler -> will use rsm client
private:
Transaction CreateTransaction(IsolationLevel isolation_level);
/// The force parameter determines the behaviour of the garbage collector.
/// If it's set to true, it will behave as a global operation, i.e. it can't
/// be part of a transaction, and no other transaction can be active at the same time.
/// This allows it to delete immediately vertices without worrying that some other
/// transaction is possibly using it. If there are active transactions when this method
/// is called with force set to true, it will fallback to the same method with the force
/// set to false.
/// If it's set to false, it will execute in parallel with other transactions, ensuring
/// that no object in use can be deleted.
/// @throw std::system_error
/// @throw std::bad_alloc
template <bool force>
void CollectGarbage();
bool InitializeWalFile();
void FinalizeWalFile();
void AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp);
void AppendToWal(durability::StorageGlobalOperation operation, LabelId label, const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
// Main object storage
VerticesSkipList vertices_;
utils::SkipList<Edge> edges_;
uint64_t edge_id_{0};
// Even though the edge count is already kept in the `edges_` SkipList, the
// list is used only when properties are enabled for edges. Because of that we
// keep a separate count of edges that is always updated.
uint64_t edge_count_{0};
NameIdMapper name_id_mapper_;
SchemaValidator schema_validator_;
Constraints constraints_;
Indices indices_;
Schemas schemas_;
// Transaction engine
uint64_t timestamp_{kTimestampInitialId};
uint64_t transaction_id_{kTransactionInitialId};
// TODO: This isn't really a commit log, it doesn't even care if a
// transaction commited or aborted. We could probably combine this with
// `timestamp_` in a sensible unit, something like TransactionClock or
// whatever.
std::optional<CommitLog> commit_log_;
std::list<Transaction> committed_transactions_;
IsolationLevel isolation_level_;
std::vector<Shard> shards_;
boost::asio::thread_pool shard_handlers_;
Config config_;
// Undo buffers that were unlinked and now are waiting to be freed.
std::list<std::pair<uint64_t, std::list<Delta>>> garbage_undo_buffers_;
// Vertices that are logically deleted but still have to be removed from
// indices before removing them from the main storage.
std::list<PrimaryKey> deleted_vertices_;
// Vertices that are logically deleted and removed from indices and now wait
// to be removed from the main storage.
std::list<std::pair<uint64_t, PrimaryKey>> garbage_vertices_;
// Edges that are logically deleted and wait to be removed from the main
// storage.
std::list<Gid> deleted_edges_;
// Durability
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
// UUID to distinguish different main instance runs for replication process
// on SAME storage.
// Multiple instances can have same storage UUID and be MAIN at the same time.
// We cannot compare commit timestamps of those instances if one of them
// becomes the replica of the other so we use epoch_id_ as additional
// discriminating property.
// Example of this:
// We have 2 instances of the same storage, S1 and S2.
// S1 and S2 are MAIN and accept their own commits and write them to the WAL.
// At the moment when S1 commited a transaction with timestamp 20, and S2
// a different transaction with timestamp 15, we change S2's role to REPLICA
// and register it on S1.
// Without using the epoch_id, we don't know that S1 and S2 have completely
// different transactions, we think that the S2 is behind only by 5 commits.
std::string epoch_id_;
// History of the previous epoch ids.
// Each value consists of the epoch id along the last commit belonging to that
// epoch.
std::deque<std::pair<std::string, uint64_t>> epoch_history_;
std::optional<durability::WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
utils::FileRetainer file_retainer_;
// Global locker that is used for clients file locking
utils::FileRetainer::FileLocker global_locker_;
// Last commited timestamp
uint64_t last_commit_timestamp_{kTimestampInitialId};
class ReplicationServer;
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
class ReplicationClient;
// We create ReplicationClient using unique_ptr so we can move
// newly created client into the vector.
// We cannot move the client directly because it contains ThreadPool
// which cannot be moved. Also, the move is necessary because
// we don't want to create the client directly inside the vector
// because that would require the lock on the list putting all
// commits (they iterate list of clients) to halt.
// This way we can initialize client in main thread which means
// that we can immediately notify the user if the initialization
// failed.
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
ReplicationClientList replication_clients_;
ReplicationRole replication_role_{ReplicationRole::MAIN};
};
} // namespace memgraph::storage::v3

View File

@ -25,7 +25,7 @@
namespace memgraph::storage::v3 {
class EdgeAccessor;
class Storage;
class Shard;
struct Indices;
struct Constraints;
@ -46,7 +46,7 @@ class VertexAccessor final {
private:
const Vertex *vertex_;
};
friend class Storage;
friend class Shard;
public:
// Be careful when using VertexAccessor since it can be instantiated with

View File

@ -337,15 +337,15 @@ target_link_libraries(${test_prefix}storage_v3_key_store mg-storage-v3 rapidchec
add_unit_test(storage_v3_vertex_accessors.cpp)
target_link_libraries(${test_prefix}storage_v3_vertex_accessors mg-storage-v3)
# Test mg-query-v3
# Test mg-query-v2
add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)
target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication)
# add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2)
# add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2)
# # add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp)
# # target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2)
# add_unit_test(query_v2_query_plan_bag_semantics.cpp)
# target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2)

View File

@ -29,27 +29,29 @@
#include "query_v2_query_common.hpp"
#include "result_stream_faker.hpp"
#include "storage/v3/isolation_level.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/logging.hpp"
namespace memgraph::query::v2::tests {
// auto ToEdgeList(const memgraph::communication::bolt::Value &v) {
// std::vector<memgraph::communication::bolt::Edge> list;
// for (auto x : v.ValueList()) {
// list.push_back(x.ValueEdge());
// }
// return list;
// }
auto ToEdgeList(const memgraph::communication::bolt::Value &v) {
std::vector<memgraph::communication::bolt::Edge> list;
for (auto x : v.ValueList()) {
list.push_back(x.ValueEdge());
}
return list;
}
// auto StringToUnorderedSet(const std::string &element) {
// const auto element_split = memgraph::utils::Split(element, ", ");
// return std::unordered_set<std::string>(element_split.begin(), element_split.end());
// };
auto StringToUnorderedSet(const std::string &element) {
const auto element_split = memgraph::utils::Split(element, ", ");
return std::unordered_set<std::string>(element_split.begin(), element_split.end());
};
struct InterpreterFaker {
InterpreterFaker(memgraph::storage::v3::Storage *db, const memgraph::query::v2::InterpreterConfig config,
InterpreterFaker(memgraph::storage::v3::Shard *db, const memgraph::query::v2::InterpreterConfig config,
const std::filesystem::path &data_directory)
: interpreter_context(db, config, data_directory), interpreter(&interpreter_context) {
interpreter_context.auth_checker = &auth_checker;
@ -113,10 +115,20 @@ class InterpreterTest : public ::testing::Test {
return default_interpreter.Interpret(query, params);
}
memgraph::storage::v3::Storage db_;
storage::v3::LabelId NameToLabelId(std::string_view label_name) {
return storage::v3::LabelId::FromUint(id_mapper.NameToId(label_name));
}
storage::v3::PropertyId NameToPropertyId(std::string_view property_name) {
return storage::v3::PropertyId::FromUint(id_mapper.NameToId(property_name));
}
storage::v3::PrimaryKey pk{storage::v3::PropertyValue(0)};
memgraph::storage::v3::NameIdMapper id_mapper;
const storage::v3::LabelId label{NameToLabelId("label")};
storage::v3::Shard db_{label, pk, std::nullopt};
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_query_v2_interpreter"};
const storage::v3::LabelId label{db_.NameToLabel("label")};
const storage::v3::PropertyId property{db_.NameToProperty("property")};
const storage::v3::PropertyId property{NameToPropertyId("property")};
InterpreterFaker default_interpreter{&db_, {}, data_directory};
};
@ -150,8 +162,8 @@ TEST_F(InterpreterTest, DummyTestToForceQueryV2Compilation) {
// }
// }
// // Run query with different ast twice to see if query executes correctly when
// // ast is read from cache.
// Run query with different ast twice to see if query executes correctly when
// ast is read from cache.
// TEST_F(InterpreterTest, AstCache) {
// {
// auto stream = Interpret("RETURN 2 + 3");
@ -1447,8 +1459,8 @@ TEST_F(InterpreterTest, DummyTestToForceQueryV2Compilation) {
// auto notification = notifications[0].ValueMap();
// ASSERT_EQ(notification["severity"].ValueString(), "INFO");
// ASSERT_EQ(notification["code"].ValueString(), "ConstraintDoesNotExist");
// ASSERT_EQ(notification["title"].ValueString(), "Constraint EXISTS on label L1 on properties name doesn't
// exist."); ASSERT_EQ(notification["description"].ValueString(), "");
// ASSERT_EQ(notification["title"].ValueString(), "Constraint EXISTS on label L1 on properties name doesn'texist.");
// ASSERT_EQ(notification["description"].ValueString(), "");
// }
// }

View File

@ -26,7 +26,7 @@
*/
class ResultStreamFaker {
public:
explicit ResultStreamFaker(memgraph::storage::v3::Storage *store) : store_(store) {}
explicit ResultStreamFaker(memgraph::storage::v3::Shard *store) : store_(store) {}
ResultStreamFaker(const ResultStreamFaker &) = delete;
ResultStreamFaker &operator=(const ResultStreamFaker &) = delete;
@ -124,7 +124,7 @@ class ResultStreamFaker {
}
private:
memgraph::storage::v3::Storage *store_;
memgraph::storage::v3::Shard *store_;
// the data that the record stream can accept
std::vector<std::string> header_;
std::vector<std::vector<memgraph::communication::bolt::Value>> results_;

View File

@ -19,12 +19,14 @@
#include "storage/v3/delta.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/storage.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex_accessor.hpp"
#include "storage_v3_test_utils.hpp"
#include "utils/exceptions.hpp"
using testing::UnorderedElementsAre;
@ -37,7 +39,7 @@ class StorageV3 : public ::testing::Test {
store.CreateSchema(primary_label, {storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
}
VertexAccessor CreateVertexAndValidate(Storage::Accessor &acc, LabelId primary_label,
VertexAccessor CreateVertexAndValidate(Shard::Accessor &acc, LabelId primary_label,
const std::vector<LabelId> &labels,
const std::vector<std::pair<PropertyId, PropertyValue>> &properties) {
auto vtx = acc.CreateVertexAndValidate(primary_label, labels, properties);
@ -45,10 +47,21 @@ class StorageV3 : public ::testing::Test {
return *vtx;
}
Storage store;
const LabelId primary_label{store.NameToLabel("label")};
const PropertyId primary_property{store.NameToProperty("property")};
LabelId NameToLabelId(std::string_view label_name) { return LabelId::FromUint(id_mapper.NameToId(label_name)); }
PropertyId NameToPropertyId(std::string_view property_name) {
return PropertyId::FromUint(id_mapper.NameToId(property_name));
}
EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) {
return EdgeTypeId::FromUint(id_mapper.NameToId(edge_type_name));
}
NameIdMapper id_mapper;
const std::vector<PropertyValue> pk{PropertyValue{0}};
const LabelId primary_label{NameToLabelId("label")};
Shard store{primary_label, pk, std::nullopt};
const PropertyId primary_property{NameToPropertyId("property")};
};
// NOLINTNEXTLINE(hicpp-special-member-functions)
@ -236,7 +249,7 @@ TEST_F(StorageV3, AccessorMove) {
ASSERT_TRUE(acc.FindVertex(pk, View::NEW).has_value());
EXPECT_EQ(CountVertices(acc, View::NEW), 1U);
Storage::Accessor moved(std::move(acc));
Shard::Accessor moved(std::move(acc));
ASSERT_FALSE(moved.FindVertex(pk, View::OLD).has_value());
EXPECT_EQ(CountVertices(moved, View::OLD), 0U);
@ -600,7 +613,7 @@ TEST_F(StorageV3, VertexDeleteLabel) {
auto vertex = acc.FindVertex(pk, View::NEW);
ASSERT_TRUE(vertex);
auto label5 = acc.NameToLabel("label5");
auto label5 = NameToLabelId("label5");
// Check whether label 5 exists
ASSERT_FALSE(vertex->HasLabel(label5, View::OLD).GetValue());
@ -651,7 +664,7 @@ TEST_F(StorageV3, VertexDeleteLabel) {
auto vertex = acc.FindVertex(pk, View::NEW);
ASSERT_TRUE(vertex);
auto label5 = acc.NameToLabel("label5");
auto label5 = NameToLabelId("label5");
// Check whether label 5 exists
ASSERT_FALSE(vertex->HasLabel(label5, View::OLD).GetValue());
@ -744,7 +757,7 @@ TEST_F(StorageV3, VertexDeleteProperty) {
auto vertex = acc.FindVertex(pk, View::NEW);
ASSERT_TRUE(vertex);
auto property5 = acc.NameToProperty("property5");
auto property5 = NameToPropertyId("property5");
// Check whether property 5 exists
ASSERT_TRUE(vertex->GetProperty(property5, View::OLD)->IsNull());
@ -790,7 +803,7 @@ TEST_F(StorageV3, VertexDeleteProperty) {
auto vertex = acc.FindVertex(pk, View::NEW);
ASSERT_TRUE(vertex);
auto property5 = acc.NameToProperty("property5");
auto property5 = NameToPropertyId("property5");
// Check whether property 5 exists
ASSERT_TRUE(vertex->GetProperty(property5, View::OLD)->IsNull());
@ -866,7 +879,7 @@ TEST_F(StorageV3, VertexLabelCommit) {
auto acc = store.Access();
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue{0}}});
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_FALSE(vertex.HasLabel(label, View::NEW).GetValue());
ASSERT_EQ(vertex.Labels(View::NEW)->size(), 0);
@ -897,7 +910,7 @@ TEST_F(StorageV3, VertexLabelCommit) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_TRUE(vertex->HasLabel(label, View::OLD).GetValue());
{
@ -913,7 +926,7 @@ TEST_F(StorageV3, VertexLabelCommit) {
ASSERT_EQ(labels[0], label);
}
auto other_label = acc.NameToLabel("other");
auto other_label = NameToLabelId("other");
ASSERT_FALSE(vertex->HasLabel(other_label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(other_label, View::NEW).GetValue());
@ -925,7 +938,7 @@ TEST_F(StorageV3, VertexLabelCommit) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
{
auto res = vertex->RemoveLabelAndValidate(label);
@ -956,14 +969,14 @@ TEST_F(StorageV3, VertexLabelCommit) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_FALSE(vertex->HasLabel(label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(label, View::NEW).GetValue());
ASSERT_EQ(vertex->Labels(View::OLD)->size(), 0);
ASSERT_EQ(vertex->Labels(View::NEW)->size(), 0);
auto other_label = acc.NameToLabel("other");
auto other_label = NameToLabelId("other");
ASSERT_FALSE(vertex->HasLabel(other_label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(other_label, View::NEW).GetValue());
@ -987,7 +1000,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_FALSE(vertex->HasLabel(label, View::NEW).GetValue());
ASSERT_EQ(vertex->Labels(View::NEW)->size(), 0);
@ -1020,14 +1033,14 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_FALSE(vertex->HasLabel(label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(label, View::NEW).GetValue());
ASSERT_EQ(vertex->Labels(View::OLD)->size(), 0);
ASSERT_EQ(vertex->Labels(View::NEW)->size(), 0);
auto other_label = acc.NameToLabel("other");
auto other_label = NameToLabelId("other");
ASSERT_FALSE(vertex->HasLabel(other_label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(other_label, View::NEW).GetValue());
@ -1041,7 +1054,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_FALSE(vertex->HasLabel(label, View::NEW).GetValue());
ASSERT_EQ(vertex->Labels(View::NEW)->size(), 0);
@ -1074,7 +1087,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_TRUE(vertex->HasLabel(label, View::OLD).GetValue());
{
@ -1090,7 +1103,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
ASSERT_EQ(labels[0], label);
}
auto other_label = acc.NameToLabel("other");
auto other_label = NameToLabelId("other");
ASSERT_FALSE(vertex->HasLabel(other_label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(other_label, View::NEW).GetValue());
@ -1104,7 +1117,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
{
auto res = vertex->RemoveLabelAndValidate(label);
@ -1137,7 +1150,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_TRUE(vertex->HasLabel(label, View::OLD).GetValue());
{
@ -1153,7 +1166,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
ASSERT_EQ(labels[0], label);
}
auto other_label = acc.NameToLabel("other");
auto other_label = NameToLabelId("other");
ASSERT_FALSE(vertex->HasLabel(other_label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(other_label, View::NEW).GetValue());
@ -1167,7 +1180,7 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
{
auto res = vertex->RemoveLabelAndValidate(label);
@ -1200,14 +1213,14 @@ TEST_F(StorageV3, VertexLabelAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label = acc.NameToLabel("label5");
auto label = NameToLabelId("label5");
ASSERT_FALSE(vertex->HasLabel(label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(label, View::NEW).GetValue());
ASSERT_EQ(vertex->Labels(View::OLD)->size(), 0);
ASSERT_EQ(vertex->Labels(View::NEW)->size(), 0);
auto other_label = acc.NameToLabel("other");
auto other_label = NameToLabelId("other");
ASSERT_FALSE(vertex->HasLabel(other_label, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(other_label, View::NEW).GetValue());
@ -1232,8 +1245,8 @@ TEST_F(StorageV3, VertexLabelSerializationError) {
auto vertex = acc1.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label1 = acc1.NameToLabel("label1");
auto label2 = acc1.NameToLabel("label2");
auto label1 = NameToLabelId("label1");
auto label2 = NameToLabelId("label2");
ASSERT_FALSE(vertex->HasLabel(label1, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(label1, View::NEW).GetValue());
@ -1271,8 +1284,8 @@ TEST_F(StorageV3, VertexLabelSerializationError) {
auto vertex = acc2.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label1 = acc2.NameToLabel("label1");
auto label2 = acc2.NameToLabel("label2");
auto label1 = NameToLabelId("label1");
auto label2 = NameToLabelId("label2");
ASSERT_FALSE(vertex->HasLabel(label1, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(label1, View::NEW).GetValue());
@ -1297,8 +1310,8 @@ TEST_F(StorageV3, VertexLabelSerializationError) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto label1 = acc.NameToLabel("label1");
auto label2 = acc.NameToLabel("label2");
auto label1 = NameToLabelId("label1");
auto label2 = NameToLabelId("label2");
ASSERT_TRUE(vertex->HasLabel(label1, View::OLD).GetValue());
ASSERT_FALSE(vertex->HasLabel(label2, View::OLD).GetValue());
@ -1326,7 +1339,7 @@ TEST_F(StorageV3, VertexPropertyCommit) {
auto acc = store.Access();
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue{0}}});
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_TRUE(vertex.GetProperty(property, View::NEW)->IsNull());
ASSERT_EQ(vertex.Properties(View::NEW)->size(), 0);
@ -1364,7 +1377,7 @@ TEST_F(StorageV3, VertexPropertyCommit) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_EQ(vertex->GetProperty(property, View::OLD)->ValueString(), "nandare");
{
@ -1380,7 +1393,7 @@ TEST_F(StorageV3, VertexPropertyCommit) {
ASSERT_EQ(properties[property].ValueString(), "nandare");
}
auto other_property = acc.NameToProperty("other");
auto other_property = NameToPropertyId("other");
ASSERT_TRUE(vertex->GetProperty(other_property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(other_property, View::NEW)->IsNull());
@ -1392,7 +1405,7 @@ TEST_F(StorageV3, VertexPropertyCommit) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
{
auto old_value = vertex->SetPropertyAndValidate(property, PropertyValue());
@ -1423,14 +1436,14 @@ TEST_F(StorageV3, VertexPropertyCommit) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_TRUE(vertex->GetProperty(property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(property, View::NEW)->IsNull());
ASSERT_EQ(vertex->Properties(View::OLD)->size(), 0);
ASSERT_EQ(vertex->Properties(View::NEW)->size(), 0);
auto other_property = acc.NameToProperty("other");
auto other_property = NameToPropertyId("other");
ASSERT_TRUE(vertex->GetProperty(other_property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(other_property, View::NEW)->IsNull());
@ -1454,7 +1467,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_TRUE(vertex->GetProperty(property, View::NEW)->IsNull());
ASSERT_EQ(vertex->Properties(View::NEW)->size(), 0);
@ -1494,14 +1507,14 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_TRUE(vertex->GetProperty(property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(property, View::NEW)->IsNull());
ASSERT_EQ(vertex->Properties(View::OLD)->size(), 0);
ASSERT_EQ(vertex->Properties(View::NEW)->size(), 0);
auto other_property = acc.NameToProperty("other");
auto other_property = NameToPropertyId("other");
ASSERT_TRUE(vertex->GetProperty(other_property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(other_property, View::NEW)->IsNull());
@ -1515,7 +1528,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_TRUE(vertex->GetProperty(property, View::NEW)->IsNull());
ASSERT_EQ(vertex->Properties(View::NEW)->size(), 0);
@ -1555,7 +1568,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_EQ(vertex->GetProperty(property, View::OLD)->ValueString(), "nandare");
{
@ -1571,7 +1584,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
ASSERT_EQ(properties[property].ValueString(), "nandare");
}
auto other_property = acc.NameToProperty("other");
auto other_property = NameToPropertyId("other");
ASSERT_TRUE(vertex->GetProperty(other_property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(other_property, View::NEW)->IsNull());
@ -1585,7 +1598,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_EQ(vertex->GetProperty(property, View::OLD)->ValueString(), "nandare");
{
@ -1626,7 +1639,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_EQ(vertex->GetProperty(property, View::OLD)->ValueString(), "nandare");
{
@ -1642,7 +1655,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
ASSERT_EQ(properties[property].ValueString(), "nandare");
}
auto other_property = acc.NameToProperty("other");
auto other_property = NameToPropertyId("other");
ASSERT_TRUE(vertex->GetProperty(other_property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(other_property, View::NEW)->IsNull());
@ -1656,7 +1669,7 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_EQ(vertex->GetProperty(property, View::OLD)->ValueString(), "nandare");
{
@ -1697,14 +1710,14 @@ TEST_F(StorageV3, VertexPropertyAbort) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property = acc.NameToProperty("property5");
auto property = NameToPropertyId("property5");
ASSERT_TRUE(vertex->GetProperty(property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(property, View::NEW)->IsNull());
ASSERT_EQ(vertex->Properties(View::OLD)->size(), 0);
ASSERT_EQ(vertex->Properties(View::NEW)->size(), 0);
auto other_property = acc.NameToProperty("other");
auto other_property = NameToPropertyId("other");
ASSERT_TRUE(vertex->GetProperty(other_property, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(other_property, View::NEW)->IsNull());
@ -1729,8 +1742,8 @@ TEST_F(StorageV3, VertexPropertySerializationError) {
auto vertex = acc1.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property1 = acc1.NameToProperty("property1");
auto property2 = acc1.NameToProperty("property2");
auto property1 = NameToPropertyId("property1");
auto property2 = NameToPropertyId("property2");
ASSERT_TRUE(vertex->GetProperty(property1, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(property1, View::NEW)->IsNull());
@ -1762,8 +1775,8 @@ TEST_F(StorageV3, VertexPropertySerializationError) {
auto vertex = acc2.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property1 = acc2.NameToProperty("property1");
auto property2 = acc2.NameToProperty("property2");
auto property1 = NameToPropertyId("property1");
auto property2 = NameToPropertyId("property2");
ASSERT_TRUE(vertex->GetProperty(property1, View::OLD)->IsNull());
ASSERT_TRUE(vertex->GetProperty(property1, View::NEW)->IsNull());
@ -1788,8 +1801,8 @@ TEST_F(StorageV3, VertexPropertySerializationError) {
auto vertex = acc.FindVertex(pk, View::OLD);
ASSERT_TRUE(vertex);
auto property1 = acc.NameToProperty("property1");
auto property2 = acc.NameToProperty("property2");
auto property1 = NameToPropertyId("property1");
auto property2 = NameToPropertyId("property2");
ASSERT_EQ(vertex->GetProperty(property1, View::OLD)->ValueInt(), 123);
ASSERT_TRUE(vertex->GetProperty(property2, View::OLD)->IsNull());
@ -1816,8 +1829,8 @@ TEST_F(StorageV3, VertexLabelPropertyMixed) {
auto acc = store.Access();
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue{0}}});
auto label = acc.NameToLabel("label5");
auto property = acc.NameToProperty("property5");
auto label = NameToLabelId("label5");
auto property = NameToPropertyId("property5");
// Check whether label 5 and property 5 exist
ASSERT_FALSE(vertex.HasLabel(label, View::NEW).GetValue());
@ -2051,8 +2064,8 @@ TEST_F(StorageV3, VertexLabelPropertyMixed) {
}
TEST_F(StorageV3, VertexPropertyClear) {
auto property1 = store.NameToProperty("property1");
auto property2 = store.NameToProperty("property2");
auto property1 = NameToPropertyId("property1");
auto property2 = NameToPropertyId("property2");
{
auto acc = store.Access();
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue{0}}});
@ -2153,8 +2166,8 @@ TEST_F(StorageV3, VertexPropertyClear) {
}
TEST_F(StorageV3, VertexNonexistentLabelPropertyEdgeAPI) {
auto label1 = store.NameToLabel("label1");
auto property1 = store.NameToProperty("property1");
auto label1 = NameToLabelId("label1");
auto property1 = NameToPropertyId("property1");
auto acc = store.Access();
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue{0}}});
@ -2182,7 +2195,7 @@ TEST_F(StorageV3, VertexNonexistentLabelPropertyEdgeAPI) {
// Modify vertex.
ASSERT_TRUE(vertex.AddLabelAndValidate(label1).HasValue());
ASSERT_TRUE(vertex.SetPropertyAndValidate(property1, PropertyValue("value")).HasValue());
ASSERT_TRUE(acc.CreateEdge(&vertex, &vertex, acc.NameToEdgeType("edge")).HasValue());
ASSERT_TRUE(acc.CreateEdge(&vertex, &vertex, NameToEdgeTypeId("edge")).HasValue());
// Check state after (OLD view).
ASSERT_EQ(vertex.Labels(View::OLD).GetError(), Error::NONEXISTENT_OBJECT);
@ -2218,14 +2231,14 @@ TEST_F(StorageV3, VertexVisibilitySingleTransaction) {
EXPECT_FALSE(acc2.FindVertex(pk, View::OLD));
EXPECT_FALSE(acc2.FindVertex(pk, View::NEW));
ASSERT_TRUE(vertex.AddLabelAndValidate(acc1.NameToLabel("label1")).HasValue());
ASSERT_TRUE(vertex.AddLabelAndValidate(NameToLabelId("label1")).HasValue());
EXPECT_FALSE(acc1.FindVertex(pk, View::OLD));
EXPECT_TRUE(acc1.FindVertex(pk, View::NEW));
EXPECT_FALSE(acc2.FindVertex(pk, View::OLD));
EXPECT_FALSE(acc2.FindVertex(pk, View::NEW));
ASSERT_TRUE(vertex.SetPropertyAndValidate(acc1.NameToProperty("meaning"), PropertyValue(42)).HasValue());
ASSERT_TRUE(vertex.SetPropertyAndValidate(NameToPropertyId("meaning"), PropertyValue(42)).HasValue());
auto acc3 = store.Access();
@ -2302,7 +2315,7 @@ TEST_F(StorageV3, VertexVisibilityMultipleTransactions) {
EXPECT_TRUE(acc2.FindVertex(pk, View::OLD));
EXPECT_TRUE(acc2.FindVertex(pk, View::NEW));
ASSERT_TRUE(vertex->AddLabelAndValidate(acc1.NameToLabel("label1")).HasValue());
ASSERT_TRUE(vertex->AddLabelAndValidate(NameToLabelId("label1")).HasValue());
EXPECT_TRUE(acc1.FindVertex(pk, View::OLD));
EXPECT_TRUE(acc1.FindVertex(pk, View::NEW));
@ -2323,7 +2336,7 @@ TEST_F(StorageV3, VertexVisibilityMultipleTransactions) {
EXPECT_TRUE(acc2.FindVertex(pk, View::OLD));
EXPECT_TRUE(acc2.FindVertex(pk, View::NEW));
ASSERT_TRUE(vertex->SetPropertyAndValidate(acc1.NameToProperty("meaning"), PropertyValue(42)).HasValue());
ASSERT_TRUE(vertex->SetPropertyAndValidate(NameToPropertyId("meaning"), PropertyValue(42)).HasValue());
auto acc3 = store.Access();
@ -2497,7 +2510,7 @@ TEST_F(StorageV3, VertexVisibilityMultipleTransactions) {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_F(StorageV3, DeletedVertexAccessor) {
const auto property1 = store.NameToProperty("property1");
const auto property1 = NameToPropertyId("property1");
const PropertyValue property_value{"property_value"};
// Create the vertex
@ -2538,8 +2551,8 @@ TEST_F(StorageV3, DeletedVertexAccessor) {
TEST_F(StorageV3, TestCreateVertexAndValidate) {
{
auto acc = store.Access();
const auto label1 = store.NameToLabel("label1");
const auto prop1 = store.NameToProperty("prop1");
const auto label1 = NameToLabelId("label1");
const auto prop1 = NameToPropertyId("prop1");
auto vertex = acc.CreateVertexAndValidate(primary_label, {label1},
{{primary_property, PropertyValue(0)}, {prop1, PropertyValue(111)}});
ASSERT_TRUE(vertex.HasValue());
@ -2552,25 +2565,20 @@ TEST_F(StorageV3, TestCreateVertexAndValidate) {
(std::map<PropertyId, PropertyValue>{{prop1, PropertyValue(111)}}));
}
{
const auto label1 = store.NameToLabel("new_primary_label");
const auto prop1 = store.NameToProperty("key1");
const auto prop2 = store.NameToProperty("key2");
ASSERT_TRUE(store.CreateSchema(
label1, {SchemaProperty{prop1, common::SchemaType::INT}, SchemaProperty{prop2, common::SchemaType::STRING}}));
auto acc = store.Access();
auto vertex = acc.CreateVertexAndValidate(label1, {}, {{prop1, PropertyValue(21)}, {prop2, PropertyValue("test")}});
ASSERT_TRUE(vertex.HasValue());
ASSERT_TRUE(vertex->PrimaryLabel(View::NEW).HasValue());
EXPECT_EQ(vertex->PrimaryLabel(View::NEW).GetValue(), label1);
ASSERT_TRUE(vertex->PrimaryKey(View::NEW).HasValue());
EXPECT_EQ(vertex->PrimaryKey(View::NEW).GetValue(), (PrimaryKey{{PropertyValue(21), PropertyValue("test")}}));
ASSERT_TRUE(vertex->Properties(View::NEW).HasValue());
EXPECT_TRUE(vertex->Properties(View::NEW).GetValue().empty());
const auto label1 = NameToLabelId("label1");
const auto prop1 = NameToPropertyId("prop1");
EXPECT_THROW(
{
auto vertex = acc.CreateVertexAndValidate(
label1, {}, {{primary_property, PropertyValue(0)}, {prop1, PropertyValue(111)}});
},
utils::BasicException);
}
{
ASSERT_DEATH(
{
Storage store;
Shard store(primary_label, pk, std::nullopt);
ASSERT_TRUE(store.CreateSchema(primary_label,
{storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}));
auto acc = store.Access();

View File

@ -13,7 +13,7 @@
namespace memgraph::storage::v3::tests {
size_t CountVertices(Storage::Accessor &storage_accessor, View view) {
size_t CountVertices(Shard::Accessor &storage_accessor, View view) {
auto vertices = storage_accessor.Vertices(view);
size_t count = 0U;
for (auto it = vertices.begin(); it != vertices.end(); ++it, ++count)

View File

@ -16,6 +16,6 @@
namespace memgraph::storage::v3::tests {
size_t CountVertices(Storage::Accessor &storage_accessor, View view);
size_t CountVertices(Shard::Accessor &storage_accessor, View view);
} // namespace memgraph::storage::v3::tests

View File

@ -21,7 +21,7 @@
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/storage.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex_accessor.hpp"
#include "storage_v3_test_utils.hpp"
@ -35,7 +35,7 @@ class StorageV3Accessor : public ::testing::Test {
ASSERT_TRUE(storage.CreateSchema(primary_label, {SchemaProperty{primary_property, common::SchemaType::INT}}));
}
VertexAccessor CreateVertexAndValidate(Storage::Accessor &acc, LabelId primary_label,
VertexAccessor CreateVertexAndValidate(Shard::Accessor &acc, LabelId primary_label,
const std::vector<LabelId> &labels,
const std::vector<std::pair<PropertyId, PropertyValue>> &properties) {
auto vtx = acc.CreateVertexAndValidate(primary_label, labels, properties);
@ -43,9 +43,17 @@ class StorageV3Accessor : public ::testing::Test {
return *vtx;
}
Storage storage;
const LabelId primary_label{storage.NameToLabel("label")};
const PropertyId primary_property{storage.NameToProperty("property")};
LabelId NameToLabelId(std::string_view label_name) { return LabelId::FromUint(id_mapper.NameToId(label_name)); }
PropertyId NameToPropertyId(std::string_view property_name) {
return PropertyId::FromUint(id_mapper.NameToId(property_name));
}
const std::vector<PropertyValue> pk{PropertyValue{0}};
NameIdMapper id_mapper;
Shard storage{NameToLabelId("label"), pk, std::nullopt};
const LabelId primary_label{NameToLabelId("label")};
const PropertyId primary_property{NameToPropertyId("property")};
};
TEST_F(StorageV3Accessor, TestPrimaryLabel) {
@ -85,9 +93,9 @@ TEST_F(StorageV3Accessor, TestPrimaryLabel) {
TEST_F(StorageV3Accessor, TestAddLabels) {
{
auto acc = storage.Access();
const auto label1 = storage.NameToLabel("label1");
const auto label2 = storage.NameToLabel("label2");
const auto label3 = storage.NameToLabel("label3");
const auto label1 = NameToLabelId("label1");
const auto label2 = NameToLabelId("label2");
const auto label3 = NameToLabelId("label3");
const auto vertex =
CreateVertexAndValidate(acc, primary_label, {label1, label2, label3}, {{primary_property, PropertyValue(0)}});
ASSERT_TRUE(vertex.Labels(View::NEW).HasValue());
@ -96,9 +104,9 @@ TEST_F(StorageV3Accessor, TestAddLabels) {
}
{
auto acc = storage.Access();
const auto label1 = storage.NameToLabel("label1");
const auto label2 = storage.NameToLabel("label2");
const auto label3 = storage.NameToLabel("label3");
const auto label1 = NameToLabelId("label1");
const auto label2 = NameToLabelId("label2");
const auto label3 = NameToLabelId("label3");
auto vertex = CreateVertexAndValidate(acc, primary_label, {label1}, {{primary_property, PropertyValue(1)}});
ASSERT_TRUE(vertex.Labels(View::NEW).HasValue());
ASSERT_FALSE(vertex.Labels(View::OLD).HasValue());
@ -111,7 +119,7 @@ TEST_F(StorageV3Accessor, TestAddLabels) {
}
{
auto acc = storage.Access();
const auto label1 = storage.NameToLabel("label");
const auto label1 = NameToLabelId("label");
auto vertex = acc.CreateVertexAndValidate(primary_label, {label1}, {{primary_property, PropertyValue(2)}});
ASSERT_TRUE(vertex.HasError());
ASSERT_TRUE(std::holds_alternative<SchemaViolation>(vertex.GetError()));
@ -120,7 +128,7 @@ TEST_F(StorageV3Accessor, TestAddLabels) {
}
{
auto acc = storage.Access();
const auto label1 = storage.NameToLabel("label");
const auto label1 = NameToLabelId("label");
auto vertex = acc.CreateVertexAndValidate(primary_label, {}, {{primary_property, PropertyValue(3)}});
ASSERT_TRUE(vertex.HasValue());
const auto schema_violation = vertex->AddLabelAndValidate(label1);
@ -134,9 +142,9 @@ TEST_F(StorageV3Accessor, TestAddLabels) {
TEST_F(StorageV3Accessor, TestRemoveLabels) {
{
auto acc = storage.Access();
const auto label1 = storage.NameToLabel("label1");
const auto label2 = storage.NameToLabel("label2");
const auto label3 = storage.NameToLabel("label3");
const auto label1 = NameToLabelId("label1");
const auto label2 = NameToLabelId("label2");
const auto label3 = NameToLabelId("label3");
auto vertex =
CreateVertexAndValidate(acc, primary_label, {label1, label2, label3}, {{primary_property, PropertyValue(0)}});
ASSERT_TRUE(vertex.Labels(View::NEW).HasValue());
@ -157,7 +165,7 @@ TEST_F(StorageV3Accessor, TestRemoveLabels) {
}
{
auto acc = storage.Access();
const auto label1 = storage.NameToLabel("label1");
const auto label1 = NameToLabelId("label1");
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue(1)}});
ASSERT_TRUE(vertex.Labels(View::NEW).HasValue());
EXPECT_TRUE(vertex.Labels(View::NEW).GetValue().empty());
@ -179,7 +187,7 @@ TEST_F(StorageV3Accessor, TestRemoveLabels) {
TEST_F(StorageV3Accessor, TestSetKeysAndProperties) {
{
auto acc = storage.Access();
const PropertyId prop1{storage.NameToProperty("prop1")};
const PropertyId prop1{NameToPropertyId("prop1")};
auto vertex = CreateVertexAndValidate(acc, primary_label, {}, {{primary_property, PropertyValue(0)}});
const auto res = vertex.SetPropertyAndValidate(prop1, PropertyValue(1));
ASSERT_TRUE(res.HasValue());