Add implementation of synchronous replication ()

This implements the initial version of synchronous replication.
Currently, only one replica is supported and that isn't configurable.

To run the main instance use the following command:
```
./memgraph \
    --main \
    --data-directory main-data \
    --storage-properties-on-edges \
    --storage-wal-enabled \
    --storage-snapshot-interval-sec 300
```

To run the replica instance use the following command:
```
./memgraph \
    --replica \
    --data-directory replica-data \
    --storage-properties-on-edges \
    --bolt-port 7688
```

You can then write/read data to Bolt port 7687 (the main instance) and also you
can read the data from the replica instance using Bolt port 7688.

NOTE: The main instance *must* be started without any data and the replica
*must* be started before any data is added to the main instance.

* Add basic synchronous replication test
* Using RWLock for replication stuff

Co-authored-by: Matej Ferencevic <matej.ferencevic@memgraph.io>
Co-authored-by: Antonio Andelic <antonio.andelic@memgraph.io>
This commit is contained in:
Marko Budiselić 2020-10-27 10:11:43 +01:00 committed by Antonio Andelic
parent d5b02eafb1
commit c68ed8d94e
13 changed files with 1374 additions and 5 deletions

View File

@ -10,5 +10,23 @@ set(storage_v2_src_files
vertex_accessor.cpp
storage.cpp)
if(MG_ENTERPRISE)
define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files)
add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE)
add_custom_target(generate_lcp_storage DEPENDS ${generated_lcp_storage_files})
set(storage_v2_src_files
${storage_v2_src_files}
replication/slk.cpp
${lcp_storage_cpp_files})
endif()
add_library(mg-storage-v2 STATIC ${storage_v2_src_files})
target_link_libraries(mg-storage-v2 Threads::Threads mg-utils glog gflags)
if(MG_ENTERPRISE)
add_dependencies(mg-storage-v2 generate_lcp_storage)
target_link_libraries(mg-storage-v2 mg-rpc mg-slk)
endif()

View File

@ -25,7 +25,15 @@ class PropertyValueException : public utils::BasicException {
class PropertyValue {
public:
/// A value type, each type corresponds to exactly one C++ type.
enum class Type : unsigned char { Null, Bool, Int, Double, String, List, Map };
enum class Type : uint8_t {
Null = 0,
Bool = 1,
Int = 2,
Double = 3,
String = 4,
List = 5,
Map = 6,
};
static bool AreComparableTypes(Type a, Type b) {
return (a == b) || (a == Type::Int && b == Type::Double) ||

2
src/storage/v2/replication/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
# autogenerated files
rpc.hpp

View File

@ -0,0 +1,82 @@
#pragma once
#include "rpc/client.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
namespace storage::replication {
class ReplicationClient {
public:
ReplicationClient(NameIdMapper *name_id_mapper, Config::Items items,
const io::network::Endpoint &endpoint, bool use_ssl)
: name_id_mapper_(name_id_mapper),
items_(items),
rpc_context_(use_ssl),
rpc_client_(endpoint, &rpc_context_) {}
class Handler {
private:
friend class ReplicationClient;
/// @throw rpc::RpcFailedException
explicit Handler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<AppendDeltasRpc>()) {}
public:
/// @throw rpc::RpcFailedException
void AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->name_id_mapper_, self_->items_, delta,
vertex, final_commit_timestamp);
}
/// @throw rpc::RpcFailedException
void AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->name_id_mapper_, delta, edge,
final_commit_timestamp);
}
/// @throw rpc::RpcFailedException
void AppendTransactionEnd(uint64_t final_commit_timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeTransactionEnd(&encoder, final_commit_timestamp);
}
/// @throw rpc::RpcFailedException
void AppendOperation(durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties,
uint64_t timestamp) {
Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, self_->name_id_mapper_, operation, label,
properties, timestamp);
}
/// @throw rpc::RpcFailedException
void Finalize() { stream_.AwaitResponse(); }
private:
ReplicationClient *self_;
rpc::Client::StreamHandler<AppendDeltasRpc> stream_;
};
Handler ReplicateTransaction() { return Handler(this); }
private:
NameIdMapper *name_id_mapper_;
Config::Items items_;
communication::ClientContext rpc_context_;
rpc::Client rpc_client_;
};
} // namespace storage::replication

View File

@ -0,0 +1,32 @@
#>cpp
#pragma once
#include <cstring>
#include <cstdint>
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
#include "slk/streams.hpp"
cpp<#
;; TODO(mferencevic): Change namespace to `storage::replication` once LCP is
;; updated to support such namespaces.
(lcp:namespace storage)
(lcp:define-rpc append-deltas
;; The actual deltas are sent as additional data using the RPC client's
;; streaming API for additional data.
(:request ())
(:response
((success :bool)
(term :uint64_t))))
(lcp:define-rpc heartbeat
(:request
((leader-id :uint16_t)
(term :uint64_t)))
(:response
((success :bool)
(term :uint64_t))))
(lcp:pop-namespace) ;; storage

View File

@ -0,0 +1,124 @@
#pragma once
#include "slk/streams.hpp"
#include "storage/v2/durability/serialization.hpp"
#include "storage/v2/replication/slk.hpp"
#include "utils/cast.hpp"
namespace storage::replication {
class Encoder final : public durability::BaseEncoder {
public:
explicit Encoder(slk::Builder *builder) : builder_(builder) {}
void WriteMarker(durability::Marker marker) override {
slk::Save(marker, builder_);
}
void WriteBool(bool value) override {
WriteMarker(durability::Marker::TYPE_BOOL);
slk::Save(value, builder_);
}
void WriteUint(uint64_t value) override {
WriteMarker(durability::Marker::TYPE_INT);
slk::Save(value, builder_);
}
void WriteDouble(double value) override {
WriteMarker(durability::Marker::TYPE_DOUBLE);
slk::Save(value, builder_);
}
void WriteString(const std::string_view &value) override {
WriteMarker(durability::Marker::TYPE_STRING);
slk::Save(value, builder_);
}
void WritePropertyValue(const PropertyValue &value) override {
WriteMarker(durability::Marker::TYPE_PROPERTY_VALUE);
slk::Save(value, builder_);
}
private:
slk::Builder *builder_;
};
class Decoder final : public durability::BaseDecoder {
public:
explicit Decoder(slk::Reader *reader) : reader_(reader) {}
std::optional<durability::Marker> ReadMarker() override {
durability::Marker marker;
slk::Load(&marker, reader_);
return marker;
}
std::optional<bool> ReadBool() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_BOOL)
return std::nullopt;
bool value;
slk::Load(&value, reader_);
return value;
}
std::optional<uint64_t> ReadUint() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_INT)
return std::nullopt;
uint64_t value;
slk::Load(&value, reader_);
return value;
}
std::optional<double> ReadDouble() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_DOUBLE)
return std::nullopt;
double value;
slk::Load(&value, reader_);
return value;
}
std::optional<std::string> ReadString() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_STRING)
return std::nullopt;
std::string value;
slk::Load(&value, reader_);
return std::move(value);
}
std::optional<PropertyValue> ReadPropertyValue() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_PROPERTY_VALUE)
return std::nullopt;
PropertyValue value;
slk::Load(&value, reader_);
return std::move(value);
}
bool SkipString() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_STRING)
return false;
std::string value;
slk::Load(&value, reader_);
return true;
}
bool SkipPropertyValue() override {
if (const auto marker = ReadMarker();
!marker || marker != durability::Marker::TYPE_PROPERTY_VALUE)
return false;
PropertyValue value;
slk::Load(&value, reader_);
return true;
}
private:
slk::Reader *reader_;
};
} // namespace storage::replication

View File

@ -0,0 +1,161 @@
#include "storage/v2/replication/slk.hpp"
#include <type_traits>
#include "utils/cast.hpp"
namespace slk {
void Save(const storage::Gid &gid, slk::Builder *builder) {
slk::Save(gid.AsUint(), builder);
}
void Load(storage::Gid *gid, slk::Reader *reader) {
uint64_t value;
slk::Load(&value, reader);
*gid = storage::Gid::FromUint(value);
}
void Save(const storage::PropertyValue::Type &type, slk::Builder *builder) {
slk::Save(utils::UnderlyingCast(type), builder);
}
void Load(storage::PropertyValue::Type *type, slk::Reader *reader) {
using PVTypeUnderlyingType =
std::underlying_type_t<storage::PropertyValue::Type>;
PVTypeUnderlyingType value;
slk::Load(&value, reader);
bool valid;
switch (value) {
case utils::UnderlyingCast(storage::PropertyValue::Type::Null):
case utils::UnderlyingCast(storage::PropertyValue::Type::Bool):
case utils::UnderlyingCast(storage::PropertyValue::Type::Int):
case utils::UnderlyingCast(storage::PropertyValue::Type::Double):
case utils::UnderlyingCast(storage::PropertyValue::Type::String):
case utils::UnderlyingCast(storage::PropertyValue::Type::List):
case utils::UnderlyingCast(storage::PropertyValue::Type::Map):
valid = true;
break;
default:
valid = false;
break;
}
if (!valid)
throw slk::SlkDecodeException(
"Trying to load unknown storage::PropertyValue!");
*type = static_cast<storage::PropertyValue::Type>(value);
}
void Save(const storage::PropertyValue &value, slk::Builder *builder) {
switch (value.type()) {
case storage::PropertyValue::Type::Null:
slk::Save(storage::PropertyValue::Type::Null, builder);
return;
case storage::PropertyValue::Type::Bool:
slk::Save(storage::PropertyValue::Type::Bool, builder);
slk::Save(value.ValueBool(), builder);
return;
case storage::PropertyValue::Type::Int:
slk::Save(storage::PropertyValue::Type::Int, builder);
slk::Save(value.ValueInt(), builder);
return;
case storage::PropertyValue::Type::Double:
slk::Save(storage::PropertyValue::Type::Double, builder);
slk::Save(value.ValueDouble(), builder);
return;
case storage::PropertyValue::Type::String:
slk::Save(storage::PropertyValue::Type::String, builder);
slk::Save(value.ValueString(), builder);
return;
case storage::PropertyValue::Type::List: {
slk::Save(storage::PropertyValue::Type::List, builder);
const auto &values = value.ValueList();
size_t size = values.size();
slk::Save(size, builder);
for (const auto &v : values) {
slk::Save(v, builder);
}
return;
}
case storage::PropertyValue::Type::Map: {
slk::Save(storage::PropertyValue::Type::Map, builder);
const auto &map = value.ValueMap();
size_t size = map.size();
slk::Save(size, builder);
for (const auto &kv : map) {
slk::Save(kv, builder);
}
return;
}
}
}
void Load(storage::PropertyValue *value, slk::Reader *reader) {
storage::PropertyValue::Type type;
slk::Load(&type, reader);
switch (type) {
case storage::PropertyValue::Type::Null:
*value = storage::PropertyValue();
return;
case storage::PropertyValue::Type::Bool: {
bool v;
slk::Load(&v, reader);
*value = storage::PropertyValue(v);
return;
}
case storage::PropertyValue::Type::Int: {
int64_t v;
slk::Load(&v, reader);
*value = storage::PropertyValue(v);
return;
}
case storage::PropertyValue::Type::Double: {
double v;
slk::Load(&v, reader);
*value = storage::PropertyValue(v);
return;
}
case storage::PropertyValue::Type::String: {
std::string v;
slk::Load(&v, reader);
*value = storage::PropertyValue(std::move(v));
return;
}
case storage::PropertyValue::Type::List: {
size_t size;
slk::Load(&size, reader);
std::vector<storage::PropertyValue> list(size);
for (size_t i = 0; i < size; ++i) {
slk::Load(&list[i], reader);
}
*value = storage::PropertyValue(std::move(list));
return;
}
case storage::PropertyValue::Type::Map: {
size_t size;
slk::Load(&size, reader);
std::map<std::string, storage::PropertyValue> map;
for (size_t i = 0; i < size; ++i) {
std::pair<std::string, storage::PropertyValue> kv;
slk::Load(&kv, reader);
map.insert(kv);
}
*value = storage::PropertyValue(std::move(map));
return;
}
}
}
void Save(const storage::durability::Marker &marker, slk::Builder *builder) {
slk::Save(utils::UnderlyingCast(marker), builder);
}
void Load(storage::durability::Marker *marker, slk::Reader *reader) {
using PVTypeUnderlyingType =
std::underlying_type_t<storage::PropertyValue::Type>;
PVTypeUnderlyingType value;
slk::Load(&value, reader);
*marker = static_cast<storage::durability::Marker>(value);
}
} // namespace slk

View File

@ -0,0 +1,19 @@
#pragma once
#include "slk/serialization.hpp"
#include "storage/v2/durability/marker.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/property_value.hpp"
namespace slk {
void Save(const storage::Gid &gid, slk::Builder *builder);
void Load(storage::Gid *gid, slk::Reader *reader);
void Save(const storage::PropertyValue &value, slk::Builder *builder);
void Load(storage::PropertyValue *value, slk::Reader *reader);
void Save(const storage::durability::Marker &marker, slk::Builder *builder);
void Load(storage::durability::Marker *marker, slk::Reader *reader);
} // namespace slk

View File

@ -1,7 +1,9 @@
#include "storage/v2/storage.hpp"
#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include <gflags/gflags.h>
#include <glog/logging.h>
@ -10,9 +12,20 @@
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/mvcc.hpp"
#include "utils/rw_lock.hpp"
#include "utils/spin_lock.hpp"
#include "utils/stat.hpp"
#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE
#include "storage/v2/replication/rpc.hpp"
#endif
#ifdef MG_ENTERPRISE
DEFINE_bool(main, false, "Set to true to be the main");
DEFINE_bool(replica, false, "Set to true to be the replica");
#endif
namespace storage {
auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it,
@ -401,12 +414,28 @@ Storage::Storage(Config config)
gc_runner_.Run("Storage GC", config_.gc.interval,
[this] { this->CollectGarbage(); });
}
#ifdef MG_ENTERPRISE
// For testing purposes until we can define the instance type from
// a query.
if (FLAGS_main) {
SetReplicationState(ReplicationState::MAIN);
} else if (FLAGS_replica) {
SetReplicationState(ReplicationState::REPLICA);
}
#endif
}
Storage::~Storage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
}
#ifdef MG_ENTERPRISE
if (replication_server_) {
replication_server_->Shutdown();
replication_server_->AwaitShutdown();
}
#endif
wal_file_ = std::nullopt;
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
@ -467,6 +496,29 @@ VertexAccessor Storage::Accessor::CreateVertex() {
&storage_->constraints_, config_);
}
#ifdef MG_ENTERPRISE
VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
// NOTE: When we update the next `vertex_id_` here we perform a RMW
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
// because this function is only called from the replication delta applier
// that runs single-threadedly and while this instance is set-up to apply
// threads (it is the replica), it is guaranteed that no other writes are
// possible.
storage_->vertex_id_.store(
std::max(storage_->vertex_id_.load(std::memory_order_acquire),
gid.AsUint() + 1),
std::memory_order_release);
auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{gid, delta});
CHECK(inserted) << "The vertex must be inserted here!";
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
delta->prev.Set(&*it);
return VertexAccessor(&*it, &transaction_, &storage_->indices_,
&storage_->constraints_, config_);
}
#endif
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid,
View view) {
auto acc = storage_->vertices_.access();
@ -625,6 +677,84 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
&storage_->indices_, &storage_->constraints_, config_);
}
#ifdef MG_ENTERPRISE
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
VertexAccessor *to,
EdgeTypeId edge_type,
storage::Gid gid) {
CHECK(from->transaction_ == to->transaction_)
<< "VertexAccessors must be from the same transaction when creating "
"an edge!";
CHECK(from->transaction_ == &transaction_)
<< "VertexAccessors must be from the same transaction in when "
"creating an edge!";
auto from_vertex = from->vertex_;
auto to_vertex = to->vertex_;
// Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock,
std::defer_lock);
std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
if (from_vertex->gid < to_vertex->gid) {
guard_from.lock();
guard_to.lock();
} else if (from_vertex->gid > to_vertex->gid) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
if (!PrepareForWrite(&transaction_, from_vertex))
return Error::SERIALIZATION_ERROR;
if (from_vertex->deleted) return Error::DELETED_OBJECT;
if (to_vertex != from_vertex) {
if (!PrepareForWrite(&transaction_, to_vertex))
return Error::SERIALIZATION_ERROR;
if (to_vertex->deleted) return Error::DELETED_OBJECT;
}
// NOTE: When we update the next `edge_id_` here we perform a RMW
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
// because this function is only called from the replication delta applier
// that runs single-threadedly and while this instance is set-up to apply
// threads (it is the replica), it is guaranteed that no other writes are
// possible.
storage_->edge_id_.store(
std::max(storage_->edge_id_.load(std::memory_order_acquire),
gid.AsUint() + 1),
std::memory_order_release);
EdgeRef edge(gid);
if (config_.properties_on_edges) {
auto acc = storage_->edges_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Edge(gid, delta));
CHECK(inserted) << "The edge must be inserted here!";
CHECK(it != acc.end()) << "Invalid Edge accessor!";
edge = EdgeRef(&*it);
delta->prev.Set(&*it);
}
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(),
edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(),
edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_,
&storage_->indices_, &storage_->constraints_, config_);
}
#endif
Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
CHECK(edge->transaction_ == &transaction_)
<< "EdgeAccessor must be from the same transaction as the storage "
@ -743,7 +873,16 @@ EdgeTypeId Storage::Accessor::NameToEdgeType(const std::string_view &name) {
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; }
#ifdef MG_ENTERPRISE
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
return Commit(std::nullopt);
}
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
std::optional<uint64_t> desired_commit_timestamp) {
#else
utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
#endif
CHECK(is_transaction_active_) << "The transaction is already terminated!";
CHECK(!transaction_.must_abort) << "The transaction can't be committed!";
@ -780,7 +919,17 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
{
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
#ifdef MG_ENTERPRISE
if (!desired_commit_timestamp) {
commit_timestamp = storage_->timestamp_++;
} else {
commit_timestamp = *desired_commit_timestamp;
storage_->timestamp_ =
std::max(storage_->timestamp_, *desired_commit_timestamp + 1);
}
#else
commit_timestamp = storage_->timestamp_++;
#endif
// Before committing and validating vertices against unique constraints,
// we have to update unique constraints with the vertices that are going
@ -1223,7 +1372,15 @@ Transaction Storage::CreateTransaction() {
{
std::lock_guard<utils::SpinLock> guard(engine_lock_);
transaction_id = transaction_id_++;
#ifdef MG_ENTERPRISE
if (replication_state_.load() != ReplicationState::REPLICA) {
start_timestamp = timestamp_++;
} else {
start_timestamp = timestamp_;
}
#else
start_timestamp = timestamp_++;
#endif
}
return {transaction_id, start_timestamp};
}
@ -1490,6 +1647,20 @@ void Storage::AppendToWal(const Transaction &transaction,
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp =
transaction.commit_timestamp->load(std::memory_order_acquire);
#ifdef MG_ENTERPRISE
std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
std::optional<replication::ReplicationClient::Handler> stream;
if (replication_client_) {
try {
stream.emplace(replication_client_->ReplicateTransaction());
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
}
replication_guard.unlock();
#endif
// Helper lambda that traverses the delta chain on order to find the first
// delta that should be processed and then appends all discovered deltas.
auto find_and_apply_deltas = [&](const auto *delta, const auto &parent,
@ -1505,6 +1676,15 @@ void Storage::AppendToWal(const Transaction &transaction,
while (true) {
if (filter(delta->action)) {
wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
#ifdef MG_ENTERPRISE
if (stream) {
try {
stream->AppendDelta(*delta, parent, final_commit_timestamp);
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
}
#endif
}
auto prev = delta->prev.Get();
if (prev.type != PreviousPtr::Type::DELTA) break;
@ -1637,6 +1817,17 @@ void Storage::AppendToWal(const Transaction &transaction,
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
#ifdef MG_ENTERPRISE
if (stream) {
try {
stream->AppendTransactionEnd(final_commit_timestamp);
stream->Finalize();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
}
#endif
}
void Storage::AppendToWal(durability::StorageGlobalOperation operation,
@ -1645,7 +1836,438 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation,
if (!InitializeWalFile()) return;
wal_file_->AppendOperation(operation, label, properties,
final_commit_timestamp);
#ifdef MG_ENTERPRISE
std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
if (replication_client_) {
auto stream = replication_client_->ReplicateTransaction();
try {
stream.AppendOperation(operation, label, properties,
final_commit_timestamp);
stream.Finalize();
} catch (const rpc::RpcFailedException &) {
LOG(FATAL) << "Couldn't replicate data!";
}
}
replication_guard.unlock();
#endif
FinalizeWalFile();
}
#ifdef MG_ENTERPRISE
void Storage::ConfigureReplica() {
// Create RPC server.
// TODO(mferencevic): Add support for SSL.
replication_server_context_.emplace();
// NOTE: The replication server must have a single thread for processing
// because there is no need for more processing threads - each replica can
// have only a single main server. Also, the single-threaded guarantee
// simplifies the rest of the implementation.
// TODO(mferencevic): Make endpoint configurable.
replication_server_.emplace(io::network::Endpoint{"127.0.0.1", 10000},
&*replication_server_context_,
/* workers_count = */ 1);
replication_server_->Register<AppendDeltasRpc>([this](auto *req_reader,
auto *res_builder) {
AppendDeltasReq req;
slk::Load(&req, req_reader);
DLOG(INFO) << "Received AppendDeltasRpc:";
replication::Decoder decoder(req_reader);
auto edge_acc = edges_.access();
auto vertex_acc = vertices_.access();
std::optional<std::pair<uint64_t, storage::Storage::Accessor>>
commit_timestamp_and_accessor;
auto get_transaction =
[this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) {
if (!commit_timestamp_and_accessor) {
commit_timestamp_and_accessor.emplace(commit_timestamp, Access());
} else if (commit_timestamp_and_accessor->first != commit_timestamp) {
throw utils::BasicException("Received more than one transaction!");
}
return &commit_timestamp_and_accessor->second;
};
bool transaction_complete = false;
for (uint64_t i = 0; !transaction_complete; ++i) {
uint64_t timestamp;
durability::WalDeltaData delta;
try {
timestamp = ReadWalDeltaHeader(&decoder);
DLOG(INFO) << " Delta " << i;
DLOG(INFO) << " Timestamp " << timestamp;
delta = ReadWalDeltaData(&decoder);
} catch (const slk::SlkReaderException &) {
throw utils::BasicException("Missing data!");
} catch (const durability::RecoveryFailure &) {
throw utils::BasicException("Invalid data!");
}
switch (delta.type) {
case durability::WalDeltaData::Type::VERTEX_CREATE: {
DLOG(INFO) << " Create vertex "
<< delta.vertex_create_delete.gid.AsUint();
auto transaction = get_transaction(timestamp);
transaction->CreateVertex(delta.vertex_create_delete.gid);
break;
}
case durability::WalDeltaData::Type::VERTEX_DELETE: {
DLOG(INFO) << " Delete vertex "
<< delta.vertex_create_delete.gid.AsUint();
auto transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid,
storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = transaction->DeleteVertex(&*vertex);
if (ret.HasError() || !ret.GetValue())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::VERTEX_ADD_LABEL: {
DLOG(INFO) << " Vertex "
<< delta.vertex_add_remove_label.gid.AsUint()
<< " add label " << delta.vertex_add_remove_label.label;
auto transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(
delta.vertex_add_remove_label.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = vertex->AddLabel(
transaction->NameToLabel(delta.vertex_add_remove_label.label));
if (ret.HasError() || !ret.GetValue())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
DLOG(INFO) << " Vertex "
<< delta.vertex_add_remove_label.gid.AsUint()
<< " remove label " << delta.vertex_add_remove_label.label;
auto transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(
delta.vertex_add_remove_label.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = vertex->RemoveLabel(
transaction->NameToLabel(delta.vertex_add_remove_label.label));
if (ret.HasError() || !ret.GetValue())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::VERTEX_SET_PROPERTY: {
DLOG(INFO) << " Vertex "
<< delta.vertex_edge_set_property.gid.AsUint()
<< " set property "
<< delta.vertex_edge_set_property.property << " to "
<< delta.vertex_edge_set_property.value;
auto transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(
delta.vertex_edge_set_property.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret =
vertex->SetProperty(transaction->NameToProperty(
delta.vertex_edge_set_property.property),
delta.vertex_edge_set_property.value);
if (ret.HasError())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::EDGE_CREATE: {
DLOG(INFO) << " Create edge "
<< delta.edge_create_delete.gid.AsUint() << " of type "
<< delta.edge_create_delete.edge_type << " from vertex "
<< delta.edge_create_delete.from_vertex.AsUint()
<< " to vertex "
<< delta.edge_create_delete.to_vertex.AsUint();
auto transaction = get_transaction(timestamp);
auto from_vertex = transaction->FindVertex(
delta.edge_create_delete.from_vertex, storage::View::NEW);
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
auto to_vertex = transaction->FindVertex(
delta.edge_create_delete.to_vertex, storage::View::NEW);
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
auto edge = transaction->CreateEdge(
&*from_vertex, &*to_vertex,
transaction->NameToEdgeType(delta.edge_create_delete.edge_type),
delta.edge_create_delete.gid);
if (edge.HasError())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::EDGE_DELETE: {
DLOG(INFO) << " Delete edge "
<< delta.edge_create_delete.gid.AsUint() << " of type "
<< delta.edge_create_delete.edge_type << " from vertex "
<< delta.edge_create_delete.from_vertex.AsUint()
<< " to vertex "
<< delta.edge_create_delete.to_vertex.AsUint();
auto transaction = get_transaction(timestamp);
auto from_vertex = transaction->FindVertex(
delta.edge_create_delete.from_vertex, storage::View::NEW);
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
auto to_vertex = transaction->FindVertex(
delta.edge_create_delete.to_vertex, storage::View::NEW);
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
auto edges = from_vertex->OutEdges(
storage::View::NEW,
{transaction->NameToEdgeType(delta.edge_create_delete.edge_type)},
&*to_vertex);
if (edges.HasError())
throw utils::BasicException("Invalid transaction!");
if (edges->size() != 1)
throw utils::BasicException("Invalid transaction!");
auto &edge = (*edges)[0];
auto ret = transaction->DeleteEdge(&edge);
if (ret.HasError())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::EDGE_SET_PROPERTY: {
DLOG(INFO) << " Edge "
<< delta.vertex_edge_set_property.gid.AsUint()
<< " set property "
<< delta.vertex_edge_set_property.property << " to "
<< delta.vertex_edge_set_property.value;
if (!config_.items.properties_on_edges)
throw utils::BasicException(
"Can't set properties on edges because properties on edges "
"are disabled!");
auto transaction = get_transaction(timestamp);
// The following block of code effectively implements `FindEdge` and
// yields an accessor that is only valid for managing the edge's
// properties.
auto edge = edge_acc.find(delta.vertex_edge_set_property.gid);
if (edge == edge_acc.end())
throw utils::BasicException("Invalid transaction!");
// The edge visibility check must be done here manually because we
// don't allow direct access to the edges through the public API.
{
bool is_visible = true;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(edge->lock);
is_visible = !edge->deleted;
delta = edge->delta;
}
ApplyDeltasForRead(&transaction->transaction_, delta, View::NEW,
[&is_visible](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
case Delta::Action::RECREATE_OBJECT: {
is_visible = true;
break;
}
case Delta::Action::DELETE_OBJECT: {
is_visible = false;
break;
}
}
});
if (!is_visible)
throw utils::BasicException("Invalid transaction!");
}
EdgeRef edge_ref(&*edge);
// Here we create an edge accessor that we will use to get the
// properties of the edge. The accessor is created with an invalid
// type and invalid from/to pointers because we don't know them
// here, but that isn't an issue because we won't use that part of
// the API here.
auto ea = EdgeAccessor{edge_ref,
EdgeTypeId::FromUint(0UL),
nullptr,
nullptr,
&transaction->transaction_,
&indices_,
&constraints_,
config_.items};
auto ret =
ea.SetProperty(transaction->NameToProperty(
delta.vertex_edge_set_property.property),
delta.vertex_edge_set_property.value);
if (ret.HasError())
throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::TRANSACTION_END: {
DLOG(INFO) << " Transaction end";
if (!commit_timestamp_and_accessor ||
commit_timestamp_and_accessor->first != timestamp)
throw utils::BasicException("Invalid data!");
auto ret = commit_timestamp_and_accessor->second.Commit(
commit_timestamp_and_accessor->first);
if (ret.HasError())
throw utils::BasicException("Invalid transaction!");
commit_timestamp_and_accessor = std::nullopt;
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_INDEX_CREATE: {
DLOG(INFO) << " Create label index on :"
<< delta.operation_label.label;
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!CreateIndex(NameToLabel(delta.operation_label.label)))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_INDEX_DROP: {
DLOG(INFO) << " Drop label index on :"
<< delta.operation_label.label;
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!DropIndex(NameToLabel(delta.operation_label.label)))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: {
DLOG(INFO) << " Create label+property index on :"
<< delta.operation_label_property.label << " ("
<< delta.operation_label_property.property << ")";
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!CreateIndex(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property)))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: {
DLOG(INFO) << " Drop label+property index on :"
<< delta.operation_label_property.label << " ("
<< delta.operation_label_property.property << ")";
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!DropIndex(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property)))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: {
DLOG(INFO) << " Create existence constraint on :"
<< delta.operation_label_property.label << " ("
<< delta.operation_label_property.property << ")";
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
auto ret = CreateExistenceConstraint(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property));
if (!ret.HasValue() || !ret.GetValue())
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
DLOG(INFO) << " Drop existence constraint on :"
<< delta.operation_label_property.label << " ("
<< delta.operation_label_property.property << ")";
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
if (!DropExistenceConstraint(
NameToLabel(delta.operation_label_property.label),
NameToProperty(delta.operation_label_property.property)))
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: {
std::stringstream ss;
utils::PrintIterable(ss, delta.operation_label_properties.properties);
DLOG(INFO) << " Create unique constraint on :"
<< delta.operation_label_properties.label << " ("
<< ss.str() << ")";
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
std::set<PropertyId> properties;
for (const auto &prop : delta.operation_label_properties.properties) {
properties.emplace(NameToProperty(prop));
}
auto ret = CreateUniqueConstraint(
NameToLabel(delta.operation_label_properties.label), properties);
if (!ret.HasValue() ||
ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: {
std::stringstream ss;
utils::PrintIterable(ss, delta.operation_label_properties.properties);
DLOG(INFO) << " Drop unique constraint on :"
<< delta.operation_label_properties.label << " ("
<< ss.str() << ")";
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid transaction!");
std::set<PropertyId> properties;
for (const auto &prop : delta.operation_label_properties.properties) {
properties.emplace(NameToProperty(prop));
}
auto ret = DropUniqueConstraint(
NameToLabel(delta.operation_label_properties.label), properties);
if (ret != UniqueConstraints::DeletionStatus::SUCCESS)
throw utils::BasicException("Invalid transaction!");
transaction_complete = true;
break;
}
}
}
if (commit_timestamp_and_accessor)
throw utils::BasicException("Invalid data!");
AppendDeltasRes res;
slk::Save(res, res_builder);
});
replication_server_->Start();
}
void Storage::ConfigureMain() {
replication_client_.emplace(&name_id_mapper_, config_.items,
io::network::Endpoint{"127.0.0.1", 10000}, false);
}
void Storage::SetReplicationState(const ReplicationState state) {
if (replication_state_.load(std::memory_order_acquire) == state) {
return;
}
std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
replication_server_.reset();
replication_server_context_.reset();
replication_client_.reset();
switch (state) {
case ReplicationState::MAIN:
ConfigureMain();
break;
case ReplicationState::REPLICA:
ConfigureReplica();
break;
case ReplicationState::NONE:
default:
break;
}
replication_state_.store(state, std::memory_order_release);
}
#endif
} // namespace storage

View File

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <filesystem>
#include <optional>
#include <shared_mutex>
@ -22,6 +23,13 @@
#include "utils/skip_list.hpp"
#include "utils/synchronized.hpp"
#ifdef MG_ENTERPRISE
#include "rpc/server.hpp"
#include "storage/v2/replication/replication.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#endif
namespace storage {
// The storage is based on this paper:
@ -159,6 +167,10 @@ struct StorageInfo {
uint64_t disk_usage;
};
#ifdef MG_ENTERPRISE
enum class ReplicationState : uint8_t { NONE, MAIN, REPLICA };
#endif
class Storage final {
public:
/// @throw std::system_error
@ -305,6 +317,19 @@ class Storage final {
void Abort();
private:
#ifdef MG_ENTERPRISE
/// @throw std::bad_alloc
VertexAccessor CreateVertex(storage::Gid gid);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type, storage::Gid gid);
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, void> Commit(
std::optional<uint64_t> desired_commit_timestamp);
#endif
Storage *storage_;
std::shared_lock<utils::RWLock> storage_guard_;
Transaction transaction_;
@ -379,6 +404,10 @@ class Storage final {
StorageInfo GetInfo() const;
#ifdef MG_ENTERPRISE
void SetReplicationState(ReplicationState state);
#endif
private:
Transaction CreateTransaction();
@ -395,6 +424,11 @@ class Storage final {
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
#ifdef MG_ENTERPRISE
void ConfigureReplica();
void ConfigureMain();
#endif
// Main storage lock.
//
// Accessors take a shared lock when starting, so it is possible to block
@ -467,6 +501,16 @@ class Storage final {
std::optional<durability::WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
// Replication
#ifdef MG_ENTERPRISE
utils::RWLock replication_lock_{utils::RWLock::Priority::WRITE};
std::optional<communication::ServerContext> replication_server_context_;
std::optional<rpc::Server> replication_server_;
// TODO(mferencevic): Add support for multiple clients.
std::optional<replication::ReplicationClient> replication_client_;
std::atomic<ReplicationState> replication_state_{ReplicationState::NONE};
#endif
};
} // namespace storage

View File

@ -234,6 +234,10 @@ target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt)
add_unit_test(storage_v2_wal_file.cpp)
target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt)
if (MG_ENTERPRISE)
add_unit_test(storage_v2_replication.cpp)
target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
endif()
# Test mg-auth
@ -246,10 +250,11 @@ endif()
# Test mg-slk
if (MG_ENTERPRISE)
## TODO: REPLACE single-node-ha
#add_unit_test(slk_advanced.cpp)
#target_link_libraries(${test_prefix}slk_advanced mg-single-node-ha mg-kvstore-dummy)
add_unit_test(slk_advanced.cpp)
target_link_libraries(${test_prefix}slk_advanced mg-storage-v2)
endif()
if (MG_ENTERPRISE)
add_unit_test(slk_core.cpp)
target_link_libraries(${test_prefix}slk_core mg-slk glog gflags fmt)

View File

@ -1,6 +1,6 @@
#include <gtest/gtest.h>
#include "storage/common/types/slk.hpp"
#include "storage/v2/replication/slk.hpp"
#include "slk_common.hpp"

View File

@ -0,0 +1,252 @@
#include <chrono>
#include <thread>
#include <fmt/format.h>
#include <gmock/gmock-generated-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <storage/v2/property_value.hpp>
#include <storage/v2/storage.hpp>
using testing::UnorderedElementsAre;
TEST(ReplicationTest, BasicSynchronousReplicationTest) {
std::filesystem::path storage_directory{
std::filesystem::temp_directory_path() /
"MG_test_unit_storage_v2_replication"};
storage::Storage main_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
main_store.SetReplicationState(storage::ReplicationState::MAIN);
storage::Storage replica_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store.SetReplicationState(storage::ReplicationState::REPLICA);
// vertex create
// vertex add label
// vertex set property
const auto *vertex_label = "vertex_label";
const auto *vertex_property = "vertex_property";
const auto *vertex_property_value = "vertex_property_value";
std::optional<storage::Gid> vertex_gid;
{
auto acc = main_store.Access();
auto v = acc.CreateVertex();
vertex_gid.emplace(v.Gid());
ASSERT_TRUE(v.AddLabel(main_store.NameToLabel(vertex_label)).HasValue());
ASSERT_TRUE(v.SetProperty(main_store.NameToProperty(vertex_property),
storage::PropertyValue(vertex_property_value))
.HasValue());
ASSERT_FALSE(acc.Commit().HasError());
}
{
auto acc = replica_store.Access();
const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
const auto labels = v->Labels(storage::View::OLD);
ASSERT_TRUE(labels.HasValue());
ASSERT_EQ(labels->size(), 1);
ASSERT_THAT(*labels,
UnorderedElementsAre(replica_store.NameToLabel(vertex_label)));
const auto properties = v->Properties(storage::View::OLD);
ASSERT_TRUE(properties.HasValue());
ASSERT_EQ(properties->size(), 1);
ASSERT_THAT(*properties,
UnorderedElementsAre(std::make_pair(
replica_store.NameToProperty(vertex_property),
storage::PropertyValue(vertex_property_value))));
ASSERT_FALSE(acc.Commit().HasError());
}
// vertex remove label
{
auto acc = main_store.Access();
auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
ASSERT_TRUE(
v->RemoveLabel(main_store.NameToLabel(vertex_label)).HasValue());
ASSERT_FALSE(acc.Commit().HasError());
}
{
auto acc = replica_store.Access();
const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
const auto labels = v->Labels(storage::View::OLD);
ASSERT_TRUE(labels.HasValue());
ASSERT_EQ(labels->size(), 0);
ASSERT_FALSE(acc.Commit().HasError());
}
// vertex delete
{
auto acc = main_store.Access();
auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
ASSERT_TRUE(acc.DeleteVertex(&*v).HasValue());
ASSERT_FALSE(acc.Commit().HasError());
}
{
auto acc = replica_store.Access();
const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_FALSE(v);
vertex_gid.reset();
ASSERT_FALSE(acc.Commit().HasError());
}
// edge create
// edge set property
const auto *edge_type = "edge_type";
const auto *edge_property = "edge_property";
const auto *edge_property_value = "edge_property_value";
std::optional<storage::Gid> edge_gid;
{
auto acc = main_store.Access();
auto v = acc.CreateVertex();
vertex_gid.emplace(v.Gid());
auto edge = acc.CreateEdge(&v, &v, main_store.NameToEdgeType(edge_type));
ASSERT_TRUE(edge.HasValue());
ASSERT_TRUE(edge->SetProperty(main_store.NameToProperty(edge_property),
storage::PropertyValue(edge_property_value))
.HasValue());
edge_gid.emplace(edge->Gid());
ASSERT_FALSE(acc.Commit().HasError());
}
const auto find_edge =
[&](const auto &edges,
const storage::Gid edge_gid) -> std::optional<storage::EdgeAccessor> {
for (const auto &edge : edges) {
if (edge.Gid() == edge_gid) {
return edge;
}
}
return std::nullopt;
};
{
auto acc = replica_store.Access();
const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
const auto out_edges = v->OutEdges(storage::View::OLD);
ASSERT_TRUE(out_edges.HasValue());
const auto edge = find_edge(*out_edges, *edge_gid);
ASSERT_EQ(edge->EdgeType(), replica_store.NameToEdgeType(edge_type));
const auto properties = edge->Properties(storage::View::OLD);
ASSERT_TRUE(properties.HasValue());
ASSERT_EQ(properties->size(), 1);
ASSERT_THAT(*properties, UnorderedElementsAre(std::make_pair(
replica_store.NameToProperty(edge_property),
storage::PropertyValue(edge_property_value))));
ASSERT_FALSE(acc.Commit().HasError());
}
// delete edge
{
auto acc = main_store.Access();
auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
auto out_edges = v->OutEdges(storage::View::OLD);
auto edge = find_edge(*out_edges, *edge_gid);
ASSERT_TRUE(edge);
ASSERT_TRUE(acc.DeleteEdge(&*edge).HasValue());
ASSERT_FALSE(acc.Commit().HasError());
}
{
auto acc = replica_store.Access();
const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
ASSERT_TRUE(v);
const auto out_edges = v->OutEdges(storage::View::OLD);
ASSERT_TRUE(out_edges.HasValue());
ASSERT_FALSE(find_edge(*out_edges, *edge_gid));
ASSERT_FALSE(acc.Commit().HasError());
}
// label index create
// label property index create
// existence constraint create
// unique constriant create
const auto *label = "label";
const auto *property = "property";
const auto *property_extra = "property_extra";
{
ASSERT_TRUE(main_store.CreateIndex(main_store.NameToLabel(label)));
ASSERT_TRUE(main_store.CreateIndex(main_store.NameToLabel(label),
main_store.NameToProperty(property)));
ASSERT_FALSE(
main_store
.CreateExistenceConstraint(main_store.NameToLabel(label),
main_store.NameToProperty(property))
.HasError());
ASSERT_FALSE(
main_store
.CreateUniqueConstraint(main_store.NameToLabel(label),
{main_store.NameToProperty(property),
main_store.NameToProperty(property_extra)})
.HasError());
}
{
const auto indices = replica_store.ListAllIndices();
ASSERT_THAT(indices.label,
UnorderedElementsAre(replica_store.NameToLabel(label)));
ASSERT_THAT(indices.label_property,
UnorderedElementsAre(
std::make_pair(replica_store.NameToLabel(label),
replica_store.NameToProperty(property))));
const auto constraints = replica_store.ListAllConstraints();
ASSERT_THAT(constraints.existence,
UnorderedElementsAre(
std::make_pair(replica_store.NameToLabel(label),
replica_store.NameToProperty(property))));
ASSERT_THAT(constraints.unique,
UnorderedElementsAre(std::make_pair(
replica_store.NameToLabel(label),
std::set{replica_store.NameToProperty(property),
replica_store.NameToProperty(property_extra)})));
}
// label index drop
// label property index drop
// existence constraint drop
// unique constriant drop
{
ASSERT_TRUE(main_store.DropIndex(main_store.NameToLabel(label)));
ASSERT_TRUE(main_store.DropIndex(main_store.NameToLabel(label),
main_store.NameToProperty(property)));
ASSERT_TRUE(main_store.DropExistenceConstraint(
main_store.NameToLabel(label), main_store.NameToProperty(property)));
ASSERT_EQ(main_store.DropUniqueConstraint(
main_store.NameToLabel(label),
{main_store.NameToProperty(property),
main_store.NameToProperty(property_extra)}),
storage::UniqueConstraints::DeletionStatus::SUCCESS);
}
{
const auto indices = replica_store.ListAllIndices();
ASSERT_EQ(indices.label.size(), 0);
ASSERT_EQ(indices.label_property.size(), 0);
const auto constraints = replica_store.ListAllConstraints();
ASSERT_EQ(constraints.existence.size(), 0);
ASSERT_EQ(constraints.unique.size(), 0);
}
}