diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 6264db977..79e8ef52d 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -10,5 +10,23 @@ set(storage_v2_src_files vertex_accessor.cpp storage.cpp) +if(MG_ENTERPRISE) + define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files) + + add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE) + + add_custom_target(generate_lcp_storage DEPENDS ${generated_lcp_storage_files}) + + set(storage_v2_src_files + ${storage_v2_src_files} + replication/slk.cpp + ${lcp_storage_cpp_files}) +endif() + add_library(mg-storage-v2 STATIC ${storage_v2_src_files}) target_link_libraries(mg-storage-v2 Threads::Threads mg-utils glog gflags) + +if(MG_ENTERPRISE) + add_dependencies(mg-storage-v2 generate_lcp_storage) + target_link_libraries(mg-storage-v2 mg-rpc mg-slk) +endif() diff --git a/src/storage/v2/property_value.hpp b/src/storage/v2/property_value.hpp index 0522dfb19..c2f7aeb7a 100644 --- a/src/storage/v2/property_value.hpp +++ b/src/storage/v2/property_value.hpp @@ -25,7 +25,15 @@ class PropertyValueException : public utils::BasicException { class PropertyValue { public: /// A value type, each type corresponds to exactly one C++ type. - enum class Type : unsigned char { Null, Bool, Int, Double, String, List, Map }; + enum class Type : uint8_t { + Null = 0, + Bool = 1, + Int = 2, + Double = 3, + String = 4, + List = 5, + Map = 6, + }; static bool AreComparableTypes(Type a, Type b) { return (a == b) || (a == Type::Int && b == Type::Double) || diff --git a/src/storage/v2/replication/.gitignore b/src/storage/v2/replication/.gitignore new file mode 100644 index 000000000..8fb0c720c --- /dev/null +++ b/src/storage/v2/replication/.gitignore @@ -0,0 +1,2 @@ +# autogenerated files +rpc.hpp diff --git a/src/storage/v2/replication/replication.hpp b/src/storage/v2/replication/replication.hpp new file mode 100644 index 000000000..b9ff60fda --- /dev/null +++ b/src/storage/v2/replication/replication.hpp @@ -0,0 +1,82 @@ +#pragma once + +#include "rpc/client.hpp" +#include "storage/v2/config.hpp" +#include "storage/v2/delta.hpp" +#include "storage/v2/durability/wal.hpp" +#include "storage/v2/id_types.hpp" +#include "storage/v2/mvcc.hpp" +#include "storage/v2/name_id_mapper.hpp" +#include "storage/v2/property_value.hpp" +#include "storage/v2/replication/rpc.hpp" +#include "storage/v2/replication/serialization.hpp" + +namespace storage::replication { + +class ReplicationClient { + public: + ReplicationClient(NameIdMapper *name_id_mapper, Config::Items items, + const io::network::Endpoint &endpoint, bool use_ssl) + : name_id_mapper_(name_id_mapper), + items_(items), + rpc_context_(use_ssl), + rpc_client_(endpoint, &rpc_context_) {} + + class Handler { + private: + friend class ReplicationClient; + + /// @throw rpc::RpcFailedException + explicit Handler(ReplicationClient *self) + : self_(self), stream_(self_->rpc_client_.Stream()) {} + + public: + /// @throw rpc::RpcFailedException + void AppendDelta(const Delta &delta, const Vertex &vertex, + uint64_t final_commit_timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeDelta(&encoder, self_->name_id_mapper_, self_->items_, delta, + vertex, final_commit_timestamp); + } + + /// @throw rpc::RpcFailedException + void AppendDelta(const Delta &delta, const Edge &edge, + uint64_t final_commit_timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeDelta(&encoder, self_->name_id_mapper_, delta, edge, + final_commit_timestamp); + } + + /// @throw rpc::RpcFailedException + void AppendTransactionEnd(uint64_t final_commit_timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeTransactionEnd(&encoder, final_commit_timestamp); + } + + /// @throw rpc::RpcFailedException + void AppendOperation(durability::StorageGlobalOperation operation, + LabelId label, const std::set &properties, + uint64_t timestamp) { + Encoder encoder(stream_.GetBuilder()); + EncodeOperation(&encoder, self_->name_id_mapper_, operation, label, + properties, timestamp); + } + + /// @throw rpc::RpcFailedException + void Finalize() { stream_.AwaitResponse(); } + + private: + ReplicationClient *self_; + rpc::Client::StreamHandler stream_; + }; + + Handler ReplicateTransaction() { return Handler(this); } + + private: + NameIdMapper *name_id_mapper_; + Config::Items items_; + communication::ClientContext rpc_context_; + rpc::Client rpc_client_; +}; + +} // namespace storage::replication diff --git a/src/storage/v2/replication/rpc.lcp b/src/storage/v2/replication/rpc.lcp new file mode 100644 index 000000000..15c31d836 --- /dev/null +++ b/src/storage/v2/replication/rpc.lcp @@ -0,0 +1,32 @@ +#>cpp +#pragma once + +#include +#include + +#include "rpc/messages.hpp" +#include "slk/serialization.hpp" +#include "slk/streams.hpp" +cpp<# + +;; TODO(mferencevic): Change namespace to `storage::replication` once LCP is +;; updated to support such namespaces. +(lcp:namespace storage) + +(lcp:define-rpc append-deltas + ;; The actual deltas are sent as additional data using the RPC client's + ;; streaming API for additional data. + (:request ()) + (:response + ((success :bool) + (term :uint64_t)))) + +(lcp:define-rpc heartbeat + (:request + ((leader-id :uint16_t) + (term :uint64_t))) + (:response + ((success :bool) + (term :uint64_t)))) + +(lcp:pop-namespace) ;; storage diff --git a/src/storage/v2/replication/serialization.hpp b/src/storage/v2/replication/serialization.hpp new file mode 100644 index 000000000..9d2db59c0 --- /dev/null +++ b/src/storage/v2/replication/serialization.hpp @@ -0,0 +1,124 @@ +#pragma once + +#include "slk/streams.hpp" +#include "storage/v2/durability/serialization.hpp" +#include "storage/v2/replication/slk.hpp" +#include "utils/cast.hpp" + +namespace storage::replication { + +class Encoder final : public durability::BaseEncoder { + public: + explicit Encoder(slk::Builder *builder) : builder_(builder) {} + + void WriteMarker(durability::Marker marker) override { + slk::Save(marker, builder_); + } + + void WriteBool(bool value) override { + WriteMarker(durability::Marker::TYPE_BOOL); + slk::Save(value, builder_); + } + + void WriteUint(uint64_t value) override { + WriteMarker(durability::Marker::TYPE_INT); + slk::Save(value, builder_); + } + + void WriteDouble(double value) override { + WriteMarker(durability::Marker::TYPE_DOUBLE); + slk::Save(value, builder_); + } + + void WriteString(const std::string_view &value) override { + WriteMarker(durability::Marker::TYPE_STRING); + slk::Save(value, builder_); + } + + void WritePropertyValue(const PropertyValue &value) override { + WriteMarker(durability::Marker::TYPE_PROPERTY_VALUE); + slk::Save(value, builder_); + } + + private: + slk::Builder *builder_; +}; + +class Decoder final : public durability::BaseDecoder { + public: + explicit Decoder(slk::Reader *reader) : reader_(reader) {} + + std::optional ReadMarker() override { + durability::Marker marker; + slk::Load(&marker, reader_); + return marker; + } + + std::optional ReadBool() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_BOOL) + return std::nullopt; + bool value; + slk::Load(&value, reader_); + return value; + } + + std::optional ReadUint() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_INT) + return std::nullopt; + uint64_t value; + slk::Load(&value, reader_); + return value; + } + + std::optional ReadDouble() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_DOUBLE) + return std::nullopt; + double value; + slk::Load(&value, reader_); + return value; + } + + std::optional ReadString() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_STRING) + return std::nullopt; + std::string value; + slk::Load(&value, reader_); + return std::move(value); + } + + std::optional ReadPropertyValue() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_PROPERTY_VALUE) + return std::nullopt; + PropertyValue value; + slk::Load(&value, reader_); + return std::move(value); + } + + bool SkipString() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_STRING) + return false; + std::string value; + slk::Load(&value, reader_); + return true; + } + + bool SkipPropertyValue() override { + if (const auto marker = ReadMarker(); + !marker || marker != durability::Marker::TYPE_PROPERTY_VALUE) + return false; + PropertyValue value; + slk::Load(&value, reader_); + return true; + } + + private: + slk::Reader *reader_; +}; + +} // namespace storage::replication diff --git a/src/storage/v2/replication/slk.cpp b/src/storage/v2/replication/slk.cpp new file mode 100644 index 000000000..2d961de8b --- /dev/null +++ b/src/storage/v2/replication/slk.cpp @@ -0,0 +1,161 @@ +#include "storage/v2/replication/slk.hpp" + +#include + +#include "utils/cast.hpp" + +namespace slk { + +void Save(const storage::Gid &gid, slk::Builder *builder) { + slk::Save(gid.AsUint(), builder); +} + +void Load(storage::Gid *gid, slk::Reader *reader) { + uint64_t value; + slk::Load(&value, reader); + *gid = storage::Gid::FromUint(value); +} + +void Save(const storage::PropertyValue::Type &type, slk::Builder *builder) { + slk::Save(utils::UnderlyingCast(type), builder); +} + +void Load(storage::PropertyValue::Type *type, slk::Reader *reader) { + using PVTypeUnderlyingType = + std::underlying_type_t; + PVTypeUnderlyingType value; + slk::Load(&value, reader); + bool valid; + switch (value) { + case utils::UnderlyingCast(storage::PropertyValue::Type::Null): + case utils::UnderlyingCast(storage::PropertyValue::Type::Bool): + case utils::UnderlyingCast(storage::PropertyValue::Type::Int): + case utils::UnderlyingCast(storage::PropertyValue::Type::Double): + case utils::UnderlyingCast(storage::PropertyValue::Type::String): + case utils::UnderlyingCast(storage::PropertyValue::Type::List): + case utils::UnderlyingCast(storage::PropertyValue::Type::Map): + valid = true; + break; + default: + valid = false; + break; + } + if (!valid) + throw slk::SlkDecodeException( + "Trying to load unknown storage::PropertyValue!"); + *type = static_cast(value); +} + +void Save(const storage::PropertyValue &value, slk::Builder *builder) { + switch (value.type()) { + case storage::PropertyValue::Type::Null: + slk::Save(storage::PropertyValue::Type::Null, builder); + return; + case storage::PropertyValue::Type::Bool: + slk::Save(storage::PropertyValue::Type::Bool, builder); + slk::Save(value.ValueBool(), builder); + return; + case storage::PropertyValue::Type::Int: + slk::Save(storage::PropertyValue::Type::Int, builder); + slk::Save(value.ValueInt(), builder); + return; + case storage::PropertyValue::Type::Double: + slk::Save(storage::PropertyValue::Type::Double, builder); + slk::Save(value.ValueDouble(), builder); + return; + case storage::PropertyValue::Type::String: + slk::Save(storage::PropertyValue::Type::String, builder); + slk::Save(value.ValueString(), builder); + return; + case storage::PropertyValue::Type::List: { + slk::Save(storage::PropertyValue::Type::List, builder); + const auto &values = value.ValueList(); + size_t size = values.size(); + slk::Save(size, builder); + for (const auto &v : values) { + slk::Save(v, builder); + } + return; + } + case storage::PropertyValue::Type::Map: { + slk::Save(storage::PropertyValue::Type::Map, builder); + const auto &map = value.ValueMap(); + size_t size = map.size(); + slk::Save(size, builder); + for (const auto &kv : map) { + slk::Save(kv, builder); + } + return; + } + } +} + +void Load(storage::PropertyValue *value, slk::Reader *reader) { + storage::PropertyValue::Type type; + slk::Load(&type, reader); + switch (type) { + case storage::PropertyValue::Type::Null: + *value = storage::PropertyValue(); + return; + case storage::PropertyValue::Type::Bool: { + bool v; + slk::Load(&v, reader); + *value = storage::PropertyValue(v); + return; + } + case storage::PropertyValue::Type::Int: { + int64_t v; + slk::Load(&v, reader); + *value = storage::PropertyValue(v); + return; + } + case storage::PropertyValue::Type::Double: { + double v; + slk::Load(&v, reader); + *value = storage::PropertyValue(v); + return; + } + case storage::PropertyValue::Type::String: { + std::string v; + slk::Load(&v, reader); + *value = storage::PropertyValue(std::move(v)); + return; + } + case storage::PropertyValue::Type::List: { + size_t size; + slk::Load(&size, reader); + std::vector list(size); + for (size_t i = 0; i < size; ++i) { + slk::Load(&list[i], reader); + } + *value = storage::PropertyValue(std::move(list)); + return; + } + case storage::PropertyValue::Type::Map: { + size_t size; + slk::Load(&size, reader); + std::map map; + for (size_t i = 0; i < size; ++i) { + std::pair kv; + slk::Load(&kv, reader); + map.insert(kv); + } + *value = storage::PropertyValue(std::move(map)); + return; + } + } +} + +void Save(const storage::durability::Marker &marker, slk::Builder *builder) { + slk::Save(utils::UnderlyingCast(marker), builder); +} + +void Load(storage::durability::Marker *marker, slk::Reader *reader) { + using PVTypeUnderlyingType = + std::underlying_type_t; + PVTypeUnderlyingType value; + slk::Load(&value, reader); + *marker = static_cast(value); +} + +} // namespace slk diff --git a/src/storage/v2/replication/slk.hpp b/src/storage/v2/replication/slk.hpp new file mode 100644 index 000000000..0652ae211 --- /dev/null +++ b/src/storage/v2/replication/slk.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include "slk/serialization.hpp" +#include "storage/v2/durability/marker.hpp" +#include "storage/v2/id_types.hpp" +#include "storage/v2/property_value.hpp" + +namespace slk { + +void Save(const storage::Gid &gid, slk::Builder *builder); +void Load(storage::Gid *gid, slk::Reader *reader); + +void Save(const storage::PropertyValue &value, slk::Builder *builder); +void Load(storage::PropertyValue *value, slk::Reader *reader); + +void Save(const storage::durability::Marker &marker, slk::Builder *builder); +void Load(storage::durability::Marker *marker, slk::Reader *reader); + +} // namespace slk diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index e348c5649..570f24a8e 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -1,7 +1,9 @@ #include "storage/v2/storage.hpp" #include +#include #include +#include #include #include @@ -10,9 +12,20 @@ #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/mvcc.hpp" +#include "utils/rw_lock.hpp" +#include "utils/spin_lock.hpp" #include "utils/stat.hpp" #include "utils/uuid.hpp" +#ifdef MG_ENTERPRISE +#include "storage/v2/replication/rpc.hpp" +#endif + +#ifdef MG_ENTERPRISE +DEFINE_bool(main, false, "Set to true to be the main"); +DEFINE_bool(replica, false, "Set to true to be the replica"); +#endif + namespace storage { auto AdvanceToVisibleVertex(utils::SkipList::Iterator it, @@ -401,12 +414,28 @@ Storage::Storage(Config config) gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); } + +#ifdef MG_ENTERPRISE + // For testing purposes until we can define the instance type from + // a query. + if (FLAGS_main) { + SetReplicationState(ReplicationState::MAIN); + } else if (FLAGS_replica) { + SetReplicationState(ReplicationState::REPLICA); + } +#endif } Storage::~Storage() { if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Stop(); } +#ifdef MG_ENTERPRISE + if (replication_server_) { + replication_server_->Shutdown(); + replication_server_->AwaitShutdown(); + } +#endif wal_file_ = std::nullopt; if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { @@ -467,6 +496,29 @@ VertexAccessor Storage::Accessor::CreateVertex() { &storage_->constraints_, config_); } +#ifdef MG_ENTERPRISE +VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) { + // NOTE: When we update the next `vertex_id_` here we perform a RMW + // (read-modify-write) operation that ISN'T atomic! But, that isn't an issue + // because this function is only called from the replication delta applier + // that runs single-threadedly and while this instance is set-up to apply + // threads (it is the replica), it is guaranteed that no other writes are + // possible. + storage_->vertex_id_.store( + std::max(storage_->vertex_id_.load(std::memory_order_acquire), + gid.AsUint() + 1), + std::memory_order_release); + auto acc = storage_->vertices_.access(); + auto delta = CreateDeleteObjectDelta(&transaction_); + auto [it, inserted] = acc.insert(Vertex{gid, delta}); + CHECK(inserted) << "The vertex must be inserted here!"; + CHECK(it != acc.end()) << "Invalid Vertex accessor!"; + delta->prev.Set(&*it); + return VertexAccessor(&*it, &transaction_, &storage_->indices_, + &storage_->constraints_, config_); +} +#endif + std::optional Storage::Accessor::FindVertex(Gid gid, View view) { auto acc = storage_->vertices_.access(); @@ -625,6 +677,84 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, &storage_->indices_, &storage_->constraints_, config_); } +#ifdef MG_ENTERPRISE +Result Storage::Accessor::CreateEdge(VertexAccessor *from, + VertexAccessor *to, + EdgeTypeId edge_type, + storage::Gid gid) { + CHECK(from->transaction_ == to->transaction_) + << "VertexAccessors must be from the same transaction when creating " + "an edge!"; + CHECK(from->transaction_ == &transaction_) + << "VertexAccessors must be from the same transaction in when " + "creating an edge!"; + + auto from_vertex = from->vertex_; + auto to_vertex = to->vertex_; + + // Obtain the locks by `gid` order to avoid lock cycles. + std::unique_lock guard_from(from_vertex->lock, + std::defer_lock); + std::unique_lock guard_to(to_vertex->lock, std::defer_lock); + if (from_vertex->gid < to_vertex->gid) { + guard_from.lock(); + guard_to.lock(); + } else if (from_vertex->gid > to_vertex->gid) { + guard_to.lock(); + guard_from.lock(); + } else { + // The vertices are the same vertex, only lock one. + guard_from.lock(); + } + + if (!PrepareForWrite(&transaction_, from_vertex)) + return Error::SERIALIZATION_ERROR; + if (from_vertex->deleted) return Error::DELETED_OBJECT; + + if (to_vertex != from_vertex) { + if (!PrepareForWrite(&transaction_, to_vertex)) + return Error::SERIALIZATION_ERROR; + if (to_vertex->deleted) return Error::DELETED_OBJECT; + } + + // NOTE: When we update the next `edge_id_` here we perform a RMW + // (read-modify-write) operation that ISN'T atomic! But, that isn't an issue + // because this function is only called from the replication delta applier + // that runs single-threadedly and while this instance is set-up to apply + // threads (it is the replica), it is guaranteed that no other writes are + // possible. + storage_->edge_id_.store( + std::max(storage_->edge_id_.load(std::memory_order_acquire), + gid.AsUint() + 1), + std::memory_order_release); + + EdgeRef edge(gid); + if (config_.properties_on_edges) { + auto acc = storage_->edges_.access(); + auto delta = CreateDeleteObjectDelta(&transaction_); + auto [it, inserted] = acc.insert(Edge(gid, delta)); + CHECK(inserted) << "The edge must be inserted here!"; + CHECK(it != acc.end()) << "Invalid Edge accessor!"; + edge = EdgeRef(&*it); + delta->prev.Set(&*it); + } + + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), + edge_type, to_vertex, edge); + from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); + + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), + edge_type, from_vertex, edge); + to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); + + // Increment edge count. + storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + + return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, + &storage_->indices_, &storage_->constraints_, config_); +} +#endif + Result Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { CHECK(edge->transaction_ == &transaction_) << "EdgeAccessor must be from the same transaction as the storage " @@ -743,7 +873,16 @@ EdgeTypeId Storage::Accessor::NameToEdgeType(const std::string_view &name) { void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; } +#ifdef MG_ENTERPRISE utils::BasicResult Storage::Accessor::Commit() { + return Commit(std::nullopt); +} + +utils::BasicResult Storage::Accessor::Commit( + std::optional desired_commit_timestamp) { +#else +utils::BasicResult Storage::Accessor::Commit() { +#endif CHECK(is_transaction_active_) << "The transaction is already terminated!"; CHECK(!transaction_.must_abort) << "The transaction can't be committed!"; @@ -780,7 +919,17 @@ utils::BasicResult Storage::Accessor::Commit() { { std::unique_lock engine_guard(storage_->engine_lock_); +#ifdef MG_ENTERPRISE + if (!desired_commit_timestamp) { + commit_timestamp = storage_->timestamp_++; + } else { + commit_timestamp = *desired_commit_timestamp; + storage_->timestamp_ = + std::max(storage_->timestamp_, *desired_commit_timestamp + 1); + } +#else commit_timestamp = storage_->timestamp_++; +#endif // Before committing and validating vertices against unique constraints, // we have to update unique constraints with the vertices that are going @@ -1223,7 +1372,15 @@ Transaction Storage::CreateTransaction() { { std::lock_guard guard(engine_lock_); transaction_id = transaction_id_++; +#ifdef MG_ENTERPRISE + if (replication_state_.load() != ReplicationState::REPLICA) { + start_timestamp = timestamp_++; + } else { + start_timestamp = timestamp_; + } +#else start_timestamp = timestamp_++; +#endif } return {transaction_id, start_timestamp}; } @@ -1490,6 +1647,20 @@ void Storage::AppendToWal(const Transaction &transaction, // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); + +#ifdef MG_ENTERPRISE + std::shared_lock replication_guard(replication_lock_); + std::optional stream; + if (replication_client_) { + try { + stream.emplace(replication_client_->ReplicateTransaction()); + } catch (const rpc::RpcFailedException &) { + LOG(FATAL) << "Couldn't replicate data!"; + } + } + replication_guard.unlock(); +#endif + // Helper lambda that traverses the delta chain on order to find the first // delta that should be processed and then appends all discovered deltas. auto find_and_apply_deltas = [&](const auto *delta, const auto &parent, @@ -1505,6 +1676,15 @@ void Storage::AppendToWal(const Transaction &transaction, while (true) { if (filter(delta->action)) { wal_file_->AppendDelta(*delta, parent, final_commit_timestamp); +#ifdef MG_ENTERPRISE + if (stream) { + try { + stream->AppendDelta(*delta, parent, final_commit_timestamp); + } catch (const rpc::RpcFailedException &) { + LOG(FATAL) << "Couldn't replicate data!"; + } + } +#endif } auto prev = delta->prev.Get(); if (prev.type != PreviousPtr::Type::DELTA) break; @@ -1637,6 +1817,17 @@ void Storage::AppendToWal(const Transaction &transaction, wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); + +#ifdef MG_ENTERPRISE + if (stream) { + try { + stream->AppendTransactionEnd(final_commit_timestamp); + stream->Finalize(); + } catch (const rpc::RpcFailedException &) { + LOG(FATAL) << "Couldn't replicate data!"; + } + } +#endif } void Storage::AppendToWal(durability::StorageGlobalOperation operation, @@ -1645,7 +1836,438 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, if (!InitializeWalFile()) return; wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp); +#ifdef MG_ENTERPRISE + std::shared_lock replication_guard(replication_lock_); + if (replication_client_) { + auto stream = replication_client_->ReplicateTransaction(); + try { + stream.AppendOperation(operation, label, properties, + final_commit_timestamp); + stream.Finalize(); + } catch (const rpc::RpcFailedException &) { + LOG(FATAL) << "Couldn't replicate data!"; + } + } + replication_guard.unlock(); +#endif FinalizeWalFile(); } +#ifdef MG_ENTERPRISE +void Storage::ConfigureReplica() { + // Create RPC server. + // TODO(mferencevic): Add support for SSL. + replication_server_context_.emplace(); + // NOTE: The replication server must have a single thread for processing + // because there is no need for more processing threads - each replica can + // have only a single main server. Also, the single-threaded guarantee + // simplifies the rest of the implementation. + // TODO(mferencevic): Make endpoint configurable. + replication_server_.emplace(io::network::Endpoint{"127.0.0.1", 10000}, + &*replication_server_context_, + /* workers_count = */ 1); + replication_server_->Register([this](auto *req_reader, + auto *res_builder) { + AppendDeltasReq req; + slk::Load(&req, req_reader); + + DLOG(INFO) << "Received AppendDeltasRpc:"; + + replication::Decoder decoder(req_reader); + + auto edge_acc = edges_.access(); + auto vertex_acc = vertices_.access(); + + std::optional> + commit_timestamp_and_accessor; + auto get_transaction = + [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) { + if (!commit_timestamp_and_accessor) { + commit_timestamp_and_accessor.emplace(commit_timestamp, Access()); + } else if (commit_timestamp_and_accessor->first != commit_timestamp) { + throw utils::BasicException("Received more than one transaction!"); + } + return &commit_timestamp_and_accessor->second; + }; + + bool transaction_complete = false; + for (uint64_t i = 0; !transaction_complete; ++i) { + uint64_t timestamp; + durability::WalDeltaData delta; + + try { + timestamp = ReadWalDeltaHeader(&decoder); + DLOG(INFO) << " Delta " << i; + DLOG(INFO) << " Timestamp " << timestamp; + delta = ReadWalDeltaData(&decoder); + } catch (const slk::SlkReaderException &) { + throw utils::BasicException("Missing data!"); + } catch (const durability::RecoveryFailure &) { + throw utils::BasicException("Invalid data!"); + } + + switch (delta.type) { + case durability::WalDeltaData::Type::VERTEX_CREATE: { + DLOG(INFO) << " Create vertex " + << delta.vertex_create_delete.gid.AsUint(); + auto transaction = get_transaction(timestamp); + transaction->CreateVertex(delta.vertex_create_delete.gid); + break; + } + case durability::WalDeltaData::Type::VERTEX_DELETE: { + DLOG(INFO) << " Delete vertex " + << delta.vertex_create_delete.gid.AsUint(); + auto transaction = get_transaction(timestamp); + auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid, + storage::View::NEW); + if (!vertex) throw utils::BasicException("Invalid transaction!"); + auto ret = transaction->DeleteVertex(&*vertex); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction!"); + break; + } + case durability::WalDeltaData::Type::VERTEX_ADD_LABEL: { + DLOG(INFO) << " Vertex " + << delta.vertex_add_remove_label.gid.AsUint() + << " add label " << delta.vertex_add_remove_label.label; + auto transaction = get_transaction(timestamp); + auto vertex = transaction->FindVertex( + delta.vertex_add_remove_label.gid, storage::View::NEW); + if (!vertex) throw utils::BasicException("Invalid transaction!"); + auto ret = vertex->AddLabel( + transaction->NameToLabel(delta.vertex_add_remove_label.label)); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction!"); + break; + } + case durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL: { + DLOG(INFO) << " Vertex " + << delta.vertex_add_remove_label.gid.AsUint() + << " remove label " << delta.vertex_add_remove_label.label; + auto transaction = get_transaction(timestamp); + auto vertex = transaction->FindVertex( + delta.vertex_add_remove_label.gid, storage::View::NEW); + if (!vertex) throw utils::BasicException("Invalid transaction!"); + auto ret = vertex->RemoveLabel( + transaction->NameToLabel(delta.vertex_add_remove_label.label)); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction!"); + break; + } + case durability::WalDeltaData::Type::VERTEX_SET_PROPERTY: { + DLOG(INFO) << " Vertex " + << delta.vertex_edge_set_property.gid.AsUint() + << " set property " + << delta.vertex_edge_set_property.property << " to " + << delta.vertex_edge_set_property.value; + auto transaction = get_transaction(timestamp); + auto vertex = transaction->FindVertex( + delta.vertex_edge_set_property.gid, storage::View::NEW); + if (!vertex) throw utils::BasicException("Invalid transaction!"); + auto ret = + vertex->SetProperty(transaction->NameToProperty( + delta.vertex_edge_set_property.property), + delta.vertex_edge_set_property.value); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction!"); + break; + } + case durability::WalDeltaData::Type::EDGE_CREATE: { + DLOG(INFO) << " Create edge " + << delta.edge_create_delete.gid.AsUint() << " of type " + << delta.edge_create_delete.edge_type << " from vertex " + << delta.edge_create_delete.from_vertex.AsUint() + << " to vertex " + << delta.edge_create_delete.to_vertex.AsUint(); + auto transaction = get_transaction(timestamp); + auto from_vertex = transaction->FindVertex( + delta.edge_create_delete.from_vertex, storage::View::NEW); + if (!from_vertex) throw utils::BasicException("Invalid transaction!"); + auto to_vertex = transaction->FindVertex( + delta.edge_create_delete.to_vertex, storage::View::NEW); + if (!to_vertex) throw utils::BasicException("Invalid transaction!"); + auto edge = transaction->CreateEdge( + &*from_vertex, &*to_vertex, + transaction->NameToEdgeType(delta.edge_create_delete.edge_type), + delta.edge_create_delete.gid); + if (edge.HasError()) + throw utils::BasicException("Invalid transaction!"); + break; + } + case durability::WalDeltaData::Type::EDGE_DELETE: { + DLOG(INFO) << " Delete edge " + << delta.edge_create_delete.gid.AsUint() << " of type " + << delta.edge_create_delete.edge_type << " from vertex " + << delta.edge_create_delete.from_vertex.AsUint() + << " to vertex " + << delta.edge_create_delete.to_vertex.AsUint(); + auto transaction = get_transaction(timestamp); + auto from_vertex = transaction->FindVertex( + delta.edge_create_delete.from_vertex, storage::View::NEW); + if (!from_vertex) throw utils::BasicException("Invalid transaction!"); + auto to_vertex = transaction->FindVertex( + delta.edge_create_delete.to_vertex, storage::View::NEW); + if (!to_vertex) throw utils::BasicException("Invalid transaction!"); + auto edges = from_vertex->OutEdges( + storage::View::NEW, + {transaction->NameToEdgeType(delta.edge_create_delete.edge_type)}, + &*to_vertex); + if (edges.HasError()) + throw utils::BasicException("Invalid transaction!"); + if (edges->size() != 1) + throw utils::BasicException("Invalid transaction!"); + auto &edge = (*edges)[0]; + auto ret = transaction->DeleteEdge(&edge); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction!"); + break; + } + case durability::WalDeltaData::Type::EDGE_SET_PROPERTY: { + DLOG(INFO) << " Edge " + << delta.vertex_edge_set_property.gid.AsUint() + << " set property " + << delta.vertex_edge_set_property.property << " to " + << delta.vertex_edge_set_property.value; + + if (!config_.items.properties_on_edges) + throw utils::BasicException( + "Can't set properties on edges because properties on edges " + "are disabled!"); + + auto transaction = get_transaction(timestamp); + + // The following block of code effectively implements `FindEdge` and + // yields an accessor that is only valid for managing the edge's + // properties. + auto edge = edge_acc.find(delta.vertex_edge_set_property.gid); + if (edge == edge_acc.end()) + throw utils::BasicException("Invalid transaction!"); + // The edge visibility check must be done here manually because we + // don't allow direct access to the edges through the public API. + { + bool is_visible = true; + Delta *delta = nullptr; + { + std::lock_guard guard(edge->lock); + is_visible = !edge->deleted; + delta = edge->delta; + } + ApplyDeltasForRead(&transaction->transaction_, delta, View::NEW, + [&is_visible](const Delta &delta) { + switch (delta.action) { + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + break; + case Delta::Action::RECREATE_OBJECT: { + is_visible = true; + break; + } + case Delta::Action::DELETE_OBJECT: { + is_visible = false; + break; + } + } + }); + if (!is_visible) + throw utils::BasicException("Invalid transaction!"); + } + EdgeRef edge_ref(&*edge); + // Here we create an edge accessor that we will use to get the + // properties of the edge. The accessor is created with an invalid + // type and invalid from/to pointers because we don't know them + // here, but that isn't an issue because we won't use that part of + // the API here. + auto ea = EdgeAccessor{edge_ref, + EdgeTypeId::FromUint(0UL), + nullptr, + nullptr, + &transaction->transaction_, + &indices_, + &constraints_, + config_.items}; + + auto ret = + ea.SetProperty(transaction->NameToProperty( + delta.vertex_edge_set_property.property), + delta.vertex_edge_set_property.value); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction!"); + break; + } + + case durability::WalDeltaData::Type::TRANSACTION_END: { + DLOG(INFO) << " Transaction end"; + if (!commit_timestamp_and_accessor || + commit_timestamp_and_accessor->first != timestamp) + throw utils::BasicException("Invalid data!"); + auto ret = commit_timestamp_and_accessor->second.Commit( + commit_timestamp_and_accessor->first); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction!"); + commit_timestamp_and_accessor = std::nullopt; + transaction_complete = true; + break; + } + + case durability::WalDeltaData::Type::LABEL_INDEX_CREATE: { + DLOG(INFO) << " Create label index on :" + << delta.operation_label.label; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + if (!CreateIndex(NameToLabel(delta.operation_label.label))) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::LABEL_INDEX_DROP: { + DLOG(INFO) << " Drop label index on :" + << delta.operation_label.label; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + if (!DropIndex(NameToLabel(delta.operation_label.label))) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: { + DLOG(INFO) << " Create label+property index on :" + << delta.operation_label_property.label << " (" + << delta.operation_label_property.property << ")"; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + if (!CreateIndex( + NameToLabel(delta.operation_label_property.label), + NameToProperty(delta.operation_label_property.property))) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: { + DLOG(INFO) << " Drop label+property index on :" + << delta.operation_label_property.label << " (" + << delta.operation_label_property.property << ")"; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + if (!DropIndex( + NameToLabel(delta.operation_label_property.label), + NameToProperty(delta.operation_label_property.property))) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: { + DLOG(INFO) << " Create existence constraint on :" + << delta.operation_label_property.label << " (" + << delta.operation_label_property.property << ")"; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + auto ret = CreateExistenceConstraint( + NameToLabel(delta.operation_label_property.label), + NameToProperty(delta.operation_label_property.property)); + if (!ret.HasValue() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { + DLOG(INFO) << " Drop existence constraint on :" + << delta.operation_label_property.label << " (" + << delta.operation_label_property.property << ")"; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + if (!DropExistenceConstraint( + NameToLabel(delta.operation_label_property.label), + NameToProperty(delta.operation_label_property.property))) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: { + std::stringstream ss; + utils::PrintIterable(ss, delta.operation_label_properties.properties); + DLOG(INFO) << " Create unique constraint on :" + << delta.operation_label_properties.label << " (" + << ss.str() << ")"; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + std::set properties; + for (const auto &prop : delta.operation_label_properties.properties) { + properties.emplace(NameToProperty(prop)); + } + auto ret = CreateUniqueConstraint( + NameToLabel(delta.operation_label_properties.label), properties); + if (!ret.HasValue() || + ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + case durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: { + std::stringstream ss; + utils::PrintIterable(ss, delta.operation_label_properties.properties); + DLOG(INFO) << " Drop unique constraint on :" + << delta.operation_label_properties.label << " (" + << ss.str() << ")"; + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid transaction!"); + std::set properties; + for (const auto &prop : delta.operation_label_properties.properties) { + properties.emplace(NameToProperty(prop)); + } + auto ret = DropUniqueConstraint( + NameToLabel(delta.operation_label_properties.label), properties); + if (ret != UniqueConstraints::DeletionStatus::SUCCESS) + throw utils::BasicException("Invalid transaction!"); + transaction_complete = true; + break; + } + } + } + + if (commit_timestamp_and_accessor) + throw utils::BasicException("Invalid data!"); + + AppendDeltasRes res; + slk::Save(res, res_builder); + }); + replication_server_->Start(); +} + +void Storage::ConfigureMain() { + replication_client_.emplace(&name_id_mapper_, config_.items, + io::network::Endpoint{"127.0.0.1", 10000}, false); +} + +void Storage::SetReplicationState(const ReplicationState state) { + if (replication_state_.load(std::memory_order_acquire) == state) { + return; + } + + std::unique_lock replication_guard(replication_lock_); + + replication_server_.reset(); + replication_server_context_.reset(); + replication_client_.reset(); + + switch (state) { + case ReplicationState::MAIN: + ConfigureMain(); + break; + case ReplicationState::REPLICA: + ConfigureReplica(); + break; + case ReplicationState::NONE: + default: + break; + } + + replication_state_.store(state, std::memory_order_release); +} +#endif + } // namespace storage diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index c72127b6f..5d2134761 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -22,6 +23,13 @@ #include "utils/skip_list.hpp" #include "utils/synchronized.hpp" +#ifdef MG_ENTERPRISE +#include "rpc/server.hpp" +#include "storage/v2/replication/replication.hpp" +#include "storage/v2/replication/rpc.hpp" +#include "storage/v2/replication/serialization.hpp" +#endif + namespace storage { // The storage is based on this paper: @@ -159,6 +167,10 @@ struct StorageInfo { uint64_t disk_usage; }; +#ifdef MG_ENTERPRISE +enum class ReplicationState : uint8_t { NONE, MAIN, REPLICA }; +#endif + class Storage final { public: /// @throw std::system_error @@ -305,6 +317,19 @@ class Storage final { void Abort(); private: +#ifdef MG_ENTERPRISE + /// @throw std::bad_alloc + VertexAccessor CreateVertex(storage::Gid gid); + + /// @throw std::bad_alloc + Result CreateEdge(VertexAccessor *from, VertexAccessor *to, + EdgeTypeId edge_type, storage::Gid gid); + + /// @throw std::bad_alloc + utils::BasicResult Commit( + std::optional desired_commit_timestamp); +#endif + Storage *storage_; std::shared_lock storage_guard_; Transaction transaction_; @@ -379,6 +404,10 @@ class Storage final { StorageInfo GetInfo() const; +#ifdef MG_ENTERPRISE + void SetReplicationState(ReplicationState state); +#endif + private: Transaction CreateTransaction(); @@ -395,6 +424,11 @@ class Storage final { const std::set &properties, uint64_t final_commit_timestamp); +#ifdef MG_ENTERPRISE + void ConfigureReplica(); + void ConfigureMain(); +#endif + // Main storage lock. // // Accessors take a shared lock when starting, so it is possible to block @@ -467,6 +501,16 @@ class Storage final { std::optional wal_file_; uint64_t wal_unsynced_transactions_{0}; + + // Replication +#ifdef MG_ENTERPRISE + utils::RWLock replication_lock_{utils::RWLock::Priority::WRITE}; + std::optional replication_server_context_; + std::optional replication_server_; + // TODO(mferencevic): Add support for multiple clients. + std::optional replication_client_; + std::atomic replication_state_{ReplicationState::NONE}; +#endif }; } // namespace storage diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index db97cb2d5..3501686ad 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -234,6 +234,10 @@ target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt) add_unit_test(storage_v2_wal_file.cpp) target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt) +if (MG_ENTERPRISE) +add_unit_test(storage_v2_replication.cpp) +target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt) +endif() # Test mg-auth @@ -246,10 +250,11 @@ endif() # Test mg-slk if (MG_ENTERPRISE) -## TODO: REPLACE single-node-ha -#add_unit_test(slk_advanced.cpp) -#target_link_libraries(${test_prefix}slk_advanced mg-single-node-ha mg-kvstore-dummy) +add_unit_test(slk_advanced.cpp) +target_link_libraries(${test_prefix}slk_advanced mg-storage-v2) +endif() +if (MG_ENTERPRISE) add_unit_test(slk_core.cpp) target_link_libraries(${test_prefix}slk_core mg-slk glog gflags fmt) diff --git a/tests/unit/slk_advanced.cpp b/tests/unit/slk_advanced.cpp index 03fb77291..e6e35fcae 100644 --- a/tests/unit/slk_advanced.cpp +++ b/tests/unit/slk_advanced.cpp @@ -1,6 +1,6 @@ #include -#include "storage/common/types/slk.hpp" +#include "storage/v2/replication/slk.hpp" #include "slk_common.hpp" diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp new file mode 100644 index 000000000..8a4dac764 --- /dev/null +++ b/tests/unit/storage_v2_replication.cpp @@ -0,0 +1,252 @@ +#include +#include + +#include +#include +#include +#include + +#include +#include + +using testing::UnorderedElementsAre; + +TEST(ReplicationTest, BasicSynchronousReplicationTest) { + std::filesystem::path storage_directory{ + std::filesystem::temp_directory_path() / + "MG_test_unit_storage_v2_replication"}; + + storage::Storage main_store( + {.items = {.properties_on_edges = true}, + .durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + }}); + main_store.SetReplicationState(storage::ReplicationState::MAIN); + + storage::Storage replica_store( + {.items = {.properties_on_edges = true}, + .durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + }}); + replica_store.SetReplicationState(storage::ReplicationState::REPLICA); + + // vertex create + // vertex add label + // vertex set property + const auto *vertex_label = "vertex_label"; + const auto *vertex_property = "vertex_property"; + const auto *vertex_property_value = "vertex_property_value"; + std::optional vertex_gid; + { + auto acc = main_store.Access(); + auto v = acc.CreateVertex(); + vertex_gid.emplace(v.Gid()); + ASSERT_TRUE(v.AddLabel(main_store.NameToLabel(vertex_label)).HasValue()); + ASSERT_TRUE(v.SetProperty(main_store.NameToProperty(vertex_property), + storage::PropertyValue(vertex_property_value)) + .HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } + + { + auto acc = replica_store.Access(); + const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + const auto labels = v->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_EQ(labels->size(), 1); + ASSERT_THAT(*labels, + UnorderedElementsAre(replica_store.NameToLabel(vertex_label))); + const auto properties = v->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_EQ(properties->size(), 1); + ASSERT_THAT(*properties, + UnorderedElementsAre(std::make_pair( + replica_store.NameToProperty(vertex_property), + storage::PropertyValue(vertex_property_value)))); + + ASSERT_FALSE(acc.Commit().HasError()); + } + + // vertex remove label + { + auto acc = main_store.Access(); + auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + ASSERT_TRUE( + v->RemoveLabel(main_store.NameToLabel(vertex_label)).HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } + + { + auto acc = replica_store.Access(); + const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + const auto labels = v->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_EQ(labels->size(), 0); + ASSERT_FALSE(acc.Commit().HasError()); + } + + // vertex delete + { + auto acc = main_store.Access(); + auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + ASSERT_TRUE(acc.DeleteVertex(&*v).HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } + + { + auto acc = replica_store.Access(); + const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_FALSE(v); + vertex_gid.reset(); + ASSERT_FALSE(acc.Commit().HasError()); + } + + // edge create + // edge set property + const auto *edge_type = "edge_type"; + const auto *edge_property = "edge_property"; + const auto *edge_property_value = "edge_property_value"; + std::optional edge_gid; + { + auto acc = main_store.Access(); + auto v = acc.CreateVertex(); + vertex_gid.emplace(v.Gid()); + auto edge = acc.CreateEdge(&v, &v, main_store.NameToEdgeType(edge_type)); + ASSERT_TRUE(edge.HasValue()); + ASSERT_TRUE(edge->SetProperty(main_store.NameToProperty(edge_property), + storage::PropertyValue(edge_property_value)) + .HasValue()); + edge_gid.emplace(edge->Gid()); + ASSERT_FALSE(acc.Commit().HasError()); + } + + const auto find_edge = + [&](const auto &edges, + const storage::Gid edge_gid) -> std::optional { + for (const auto &edge : edges) { + if (edge.Gid() == edge_gid) { + return edge; + } + } + return std::nullopt; + }; + + { + auto acc = replica_store.Access(); + const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + const auto out_edges = v->OutEdges(storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + const auto edge = find_edge(*out_edges, *edge_gid); + ASSERT_EQ(edge->EdgeType(), replica_store.NameToEdgeType(edge_type)); + const auto properties = edge->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_EQ(properties->size(), 1); + ASSERT_THAT(*properties, UnorderedElementsAre(std::make_pair( + replica_store.NameToProperty(edge_property), + storage::PropertyValue(edge_property_value)))); + ASSERT_FALSE(acc.Commit().HasError()); + } + + // delete edge + { + auto acc = main_store.Access(); + auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + auto out_edges = v->OutEdges(storage::View::OLD); + auto edge = find_edge(*out_edges, *edge_gid); + ASSERT_TRUE(edge); + ASSERT_TRUE(acc.DeleteEdge(&*edge).HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } + + { + auto acc = replica_store.Access(); + const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD); + ASSERT_TRUE(v); + const auto out_edges = v->OutEdges(storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + ASSERT_FALSE(find_edge(*out_edges, *edge_gid)); + ASSERT_FALSE(acc.Commit().HasError()); + } + + // label index create + // label property index create + // existence constraint create + // unique constriant create + const auto *label = "label"; + const auto *property = "property"; + const auto *property_extra = "property_extra"; + { + ASSERT_TRUE(main_store.CreateIndex(main_store.NameToLabel(label))); + ASSERT_TRUE(main_store.CreateIndex(main_store.NameToLabel(label), + main_store.NameToProperty(property))); + ASSERT_FALSE( + main_store + .CreateExistenceConstraint(main_store.NameToLabel(label), + main_store.NameToProperty(property)) + .HasError()); + ASSERT_FALSE( + main_store + .CreateUniqueConstraint(main_store.NameToLabel(label), + {main_store.NameToProperty(property), + main_store.NameToProperty(property_extra)}) + .HasError()); + } + + { + const auto indices = replica_store.ListAllIndices(); + ASSERT_THAT(indices.label, + UnorderedElementsAre(replica_store.NameToLabel(label))); + ASSERT_THAT(indices.label_property, + UnorderedElementsAre( + std::make_pair(replica_store.NameToLabel(label), + replica_store.NameToProperty(property)))); + + const auto constraints = replica_store.ListAllConstraints(); + ASSERT_THAT(constraints.existence, + UnorderedElementsAre( + std::make_pair(replica_store.NameToLabel(label), + replica_store.NameToProperty(property)))); + ASSERT_THAT(constraints.unique, + UnorderedElementsAre(std::make_pair( + replica_store.NameToLabel(label), + std::set{replica_store.NameToProperty(property), + replica_store.NameToProperty(property_extra)}))); + } + + // label index drop + // label property index drop + // existence constraint drop + // unique constriant drop + { + ASSERT_TRUE(main_store.DropIndex(main_store.NameToLabel(label))); + ASSERT_TRUE(main_store.DropIndex(main_store.NameToLabel(label), + main_store.NameToProperty(property))); + ASSERT_TRUE(main_store.DropExistenceConstraint( + main_store.NameToLabel(label), main_store.NameToProperty(property))); + ASSERT_EQ(main_store.DropUniqueConstraint( + main_store.NameToLabel(label), + {main_store.NameToProperty(property), + main_store.NameToProperty(property_extra)}), + storage::UniqueConstraints::DeletionStatus::SUCCESS); + } + + { + const auto indices = replica_store.ListAllIndices(); + ASSERT_EQ(indices.label.size(), 0); + ASSERT_EQ(indices.label_property.size(), 0); + + const auto constraints = replica_store.ListAllConstraints(); + ASSERT_EQ(constraints.existence.size(), 0); + ASSERT_EQ(constraints.unique.size(), 0); + } +}