diff --git a/src/storage/v2/durability/serialization.hpp b/src/storage/v2/durability/serialization.hpp index e0d8a9e59..18018ea8c 100644 --- a/src/storage/v2/durability/serialization.hpp +++ b/src/storage/v2/durability/serialization.hpp @@ -12,8 +12,23 @@ namespace storage::durability { +/// Encoder interface class. Used to implement streams to different targets +/// (e.g. file and network). +class BaseEncoder { + protected: + ~BaseEncoder() {} + + public: + virtual void WriteMarker(Marker marker) = 0; + virtual void WriteBool(bool value) = 0; + virtual void WriteUint(uint64_t value) = 0; + virtual void WriteDouble(double value) = 0; + virtual void WriteString(const std::string_view &value) = 0; + virtual void WritePropertyValue(const PropertyValue &value) = 0; +}; + /// Encoder that is used to generate a snapshot/WAL. -class Encoder final { +class Encoder final : public BaseEncoder { public: void Initialize(const std::filesystem::path &path, const std::string_view &magic, uint64_t version); @@ -22,12 +37,12 @@ class Encoder final { // directly. void Write(const uint8_t *data, uint64_t size); - void WriteMarker(Marker marker); - void WriteBool(bool value); - void WriteUint(uint64_t value); - void WriteDouble(double value); - void WriteString(const std::string_view &value); - void WritePropertyValue(const PropertyValue &value); + void WriteMarker(Marker marker) override; + void WriteBool(bool value) override; + void WriteUint(uint64_t value) override; + void WriteDouble(double value) override; + void WriteString(const std::string_view &value) override; + void WritePropertyValue(const PropertyValue &value) override; uint64_t GetPosition(); void SetPosition(uint64_t position); diff --git a/src/storage/v2/durability/wal.cpp b/src/storage/v2/durability/wal.cpp index 1272f68ce..ebf9c7ab3 100644 --- a/src/storage/v2/durability/wal.cpp +++ b/src/storage/v2/durability/wal.cpp @@ -516,6 +516,151 @@ WalDeltaData::Type SkipWalDeltaData(Decoder *wal) { return delta.type; } +void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, + Config::Items items, const Delta &delta, const Vertex &vertex, + uint64_t timestamp) { + // When converting a Delta to a WAL delta the logic is inverted. That is + // because the Delta's represent undo actions and we want to store redo + // actions. + encoder->WriteMarker(Marker::SECTION_DELTA); + encoder->WriteUint(timestamp); + std::lock_guard guard(vertex.lock); + switch (delta.action) { + case Delta::Action::DELETE_OBJECT: + case Delta::Action::RECREATE_OBJECT: { + encoder->WriteMarker(VertexActionToMarker(delta.action)); + encoder->WriteUint(vertex.gid.AsUint()); + break; + } + case Delta::Action::SET_PROPERTY: { + encoder->WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY); + encoder->WriteUint(vertex.gid.AsUint()); + encoder->WriteString( + name_id_mapper->IdToName(delta.property.key.AsUint())); + // The property value is the value that is currently stored in the + // vertex. + // TODO (mferencevic): Mitigate the memory allocation introduced here + // (with the `GetProperty` call). It is the only memory allocation in the + // entire WAL file writing logic. + encoder->WritePropertyValue( + vertex.properties.GetProperty(delta.property.key)); + break; + } + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: { + encoder->WriteMarker(VertexActionToMarker(delta.action)); + encoder->WriteUint(vertex.gid.AsUint()); + encoder->WriteString(name_id_mapper->IdToName(delta.label.AsUint())); + break; + } + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: { + encoder->WriteMarker(VertexActionToMarker(delta.action)); + if (items.properties_on_edges) { + encoder->WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint()); + } else { + encoder->WriteUint(delta.vertex_edge.edge.gid.AsUint()); + } + encoder->WriteString( + name_id_mapper->IdToName(delta.vertex_edge.edge_type.AsUint())); + encoder->WriteUint(vertex.gid.AsUint()); + encoder->WriteUint(delta.vertex_edge.vertex->gid.AsUint()); + break; + } + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + // These actions are already encoded in the *_OUT_EDGE actions. This + // function should never be called for this type of deltas. + LOG(FATAL) << "Invalid delta action!"; + } +} + +void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, + const Delta &delta, const Edge &edge, uint64_t timestamp) { + // When converting a Delta to a WAL delta the logic is inverted. That is + // because the Delta's represent undo actions and we want to store redo + // actions. + encoder->WriteMarker(Marker::SECTION_DELTA); + encoder->WriteUint(timestamp); + std::lock_guard guard(edge.lock); + switch (delta.action) { + case Delta::Action::SET_PROPERTY: { + encoder->WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY); + encoder->WriteUint(edge.gid.AsUint()); + encoder->WriteString( + name_id_mapper->IdToName(delta.property.key.AsUint())); + // The property value is the value that is currently stored in the + // edge. + // TODO (mferencevic): Mitigate the memory allocation introduced here + // (with the `GetProperty` call). It is the only memory allocation in the + // entire WAL file writing logic. + encoder->WritePropertyValue( + edge.properties.GetProperty(delta.property.key)); + break; + } + case Delta::Action::DELETE_OBJECT: + case Delta::Action::RECREATE_OBJECT: + // These actions are already encoded in vertex *_OUT_EDGE actions. Also, + // these deltas don't contain any information about the from vertex, to + // vertex or edge type so they are useless. This function should never + // be called for this type of deltas. + LOG(FATAL) << "Invalid delta action!"; + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + // These deltas shouldn't appear for edges. + LOG(FATAL) << "Invalid database state!"; + } +} + +void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp) { + encoder->WriteMarker(Marker::SECTION_DELTA); + encoder->WriteUint(timestamp); + encoder->WriteMarker(Marker::DELTA_TRANSACTION_END); +} + +void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, + StorageGlobalOperation operation, LabelId label, + const std::set &properties, + uint64_t timestamp) { + encoder->WriteMarker(Marker::SECTION_DELTA); + encoder->WriteUint(timestamp); + switch (operation) { + case StorageGlobalOperation::LABEL_INDEX_CREATE: + case StorageGlobalOperation::LABEL_INDEX_DROP: { + CHECK(properties.empty()) << "Invalid function call!"; + encoder->WriteMarker(OperationToMarker(operation)); + encoder->WriteString(name_id_mapper->IdToName(label.AsUint())); + break; + } + case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE: + case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP: + case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE: + case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: { + CHECK(properties.size() == 1) << "Invalid function call!"; + encoder->WriteMarker(OperationToMarker(operation)); + encoder->WriteString(name_id_mapper->IdToName(label.AsUint())); + encoder->WriteString( + name_id_mapper->IdToName((*properties.begin()).AsUint())); + break; + } + case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE: + case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: { + CHECK(!properties.empty()) << "Invalid function call!"; + encoder->WriteMarker(OperationToMarker(operation)); + encoder->WriteString(name_id_mapper->IdToName(label.AsUint())); + encoder->WriteUint(properties.size()); + for (const auto &property : properties) { + encoder->WriteString(name_id_mapper->IdToName(property.AsUint())); + } + break; + } + } +} + RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConstraints *indices_constraints, std::optional snapshot_timestamp, @@ -884,145 +1029,26 @@ WalFile::~WalFile() { void WalFile::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp) { - // When converting a Delta to a WAL delta the logic is inverted. That is - // because the Delta's represent undo actions and we want to store redo - // actions. - wal_.WriteMarker(Marker::SECTION_DELTA); - wal_.WriteUint(timestamp); - std::lock_guard guard(vertex.lock); - switch (delta.action) { - case Delta::Action::DELETE_OBJECT: - case Delta::Action::RECREATE_OBJECT: { - wal_.WriteMarker(VertexActionToMarker(delta.action)); - wal_.WriteUint(vertex.gid.AsUint()); - break; - } - case Delta::Action::SET_PROPERTY: { - wal_.WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY); - wal_.WriteUint(vertex.gid.AsUint()); - wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint())); - // The property value is the value that is currently stored in the - // vertex. - // TODO (mferencevic): Mitigate the memory allocation introduced here - // (with the `GetProperty` call). It is the only memory allocation in the - // entire WAL file writing logic. - wal_.WritePropertyValue( - vertex.properties.GetProperty(delta.property.key)); - break; - } - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: { - wal_.WriteMarker(VertexActionToMarker(delta.action)); - wal_.WriteUint(vertex.gid.AsUint()); - wal_.WriteString(name_id_mapper_->IdToName(delta.label.AsUint())); - break; - } - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: { - wal_.WriteMarker(VertexActionToMarker(delta.action)); - if (items_.properties_on_edges) { - wal_.WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint()); - } else { - wal_.WriteUint(delta.vertex_edge.edge.gid.AsUint()); - } - wal_.WriteString( - name_id_mapper_->IdToName(delta.vertex_edge.edge_type.AsUint())); - wal_.WriteUint(vertex.gid.AsUint()); - wal_.WriteUint(delta.vertex_edge.vertex->gid.AsUint()); - break; - } - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - // These actions are already encoded in the *_OUT_EDGE actions. This - // function should never be called for this type of deltas. - LOG(FATAL) << "Invalid delta action!"; - } + EncodeDelta(&wal_, name_id_mapper_, items_, delta, vertex, timestamp); UpdateStats(timestamp); } void WalFile::AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp) { - // When converting a Delta to a WAL delta the logic is inverted. That is - // because the Delta's represent undo actions and we want to store redo - // actions. - wal_.WriteMarker(Marker::SECTION_DELTA); - wal_.WriteUint(timestamp); - std::lock_guard guard(edge.lock); - switch (delta.action) { - case Delta::Action::SET_PROPERTY: { - wal_.WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY); - wal_.WriteUint(edge.gid.AsUint()); - wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint())); - // The property value is the value that is currently stored in the - // edge. - // TODO (mferencevic): Mitigate the memory allocation introduced here - // (with the `GetProperty` call). It is the only memory allocation in the - // entire WAL file writing logic. - wal_.WritePropertyValue(edge.properties.GetProperty(delta.property.key)); - break; - } - case Delta::Action::DELETE_OBJECT: - case Delta::Action::RECREATE_OBJECT: - // These actions are already encoded in vertex *_OUT_EDGE actions. Also, - // these deltas don't contain any information about the from vertex, to - // vertex or edge type so they are useless. This function should never - // be called for this type of deltas. - LOG(FATAL) << "Invalid delta action!"; - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - // These deltas shouldn't appear for edges. - LOG(FATAL) << "Invalid database state!"; - } + EncodeDelta(&wal_, name_id_mapper_, delta, edge, timestamp); UpdateStats(timestamp); } void WalFile::AppendTransactionEnd(uint64_t timestamp) { - wal_.WriteMarker(Marker::SECTION_DELTA); - wal_.WriteUint(timestamp); - wal_.WriteMarker(Marker::DELTA_TRANSACTION_END); + EncodeTransactionEnd(&wal_, timestamp); UpdateStats(timestamp); } void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label, const std::set &properties, uint64_t timestamp) { - wal_.WriteMarker(Marker::SECTION_DELTA); - wal_.WriteUint(timestamp); - switch (operation) { - case StorageGlobalOperation::LABEL_INDEX_CREATE: - case StorageGlobalOperation::LABEL_INDEX_DROP: { - CHECK(properties.empty()) << "Invalid function call!"; - wal_.WriteMarker(OperationToMarker(operation)); - wal_.WriteString(name_id_mapper_->IdToName(label.AsUint())); - break; - } - case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE: - case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP: - case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE: - case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: { - CHECK(properties.size() == 1) << "Invalid function call!"; - wal_.WriteMarker(OperationToMarker(operation)); - wal_.WriteString(name_id_mapper_->IdToName(label.AsUint())); - wal_.WriteString( - name_id_mapper_->IdToName((*properties.begin()).AsUint())); - break; - } - case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE: - case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: { - CHECK(!properties.empty()) << "Invalid function call!"; - wal_.WriteMarker(OperationToMarker(operation)); - wal_.WriteString(name_id_mapper_->IdToName(label.AsUint())); - wal_.WriteUint(properties.size()); - for (const auto &property : properties) { - wal_.WriteString(name_id_mapper_->IdToName(property.AsUint())); - } - break; - } - } + EncodeOperation(&wal_, name_id_mapper_, operation, label, properties, + timestamp); UpdateStats(timestamp); } diff --git a/src/storage/v2/durability/wal.hpp b/src/storage/v2/durability/wal.hpp index 12ecc0a65..ae0f7c1b3 100644 --- a/src/storage/v2/durability/wal.hpp +++ b/src/storage/v2/durability/wal.hpp @@ -127,6 +127,24 @@ WalDeltaData ReadWalDeltaData(Decoder *wal); /// @throw RecoveryFailure WalDeltaData::Type SkipWalDeltaData(Decoder *wal); +/// Function used to encode a `Delta` that originated from a `Vertex`. +void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, + Config::Items items, const Delta &delta, const Vertex &vertex, + uint64_t timestamp); + +/// Function used to encode a `Delta` that originated from an `Edge`. +void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, + const Delta &delta, const Edge &edge, uint64_t timestamp); + +/// Function used to encode the transaction end. +void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp); + +/// Function used to encode non-transactional operation. +void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, + StorageGlobalOperation operation, LabelId label, + const std::set &properties, + uint64_t timestamp); + /// Function used to load the WAL data into the storage. /// @throw RecoveryFailure RecoveryInfo LoadWal(const std::filesystem::path &path,