From 2e0586e182009702789b05f3fe353318bef72a5e Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Mon, 28 Oct 2019 14:52:41 +0100 Subject: [PATCH] Implement single function for WAL loading Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2513 --- src/storage/v2/durability.cpp | 362 ++++++++++++++++++++++------- src/storage/v2/durability.hpp | 78 +++++++ tests/unit/storage_v2_wal_file.cpp | 129 +++++++++- 3 files changed, 482 insertions(+), 87 deletions(-) diff --git a/src/storage/v2/durability.cpp b/src/storage/v2/durability.cpp index f64110efa..95ef2ba19 100644 --- a/src/storage/v2/durability.cpp +++ b/src/storage/v2/durability.cpp @@ -434,10 +434,6 @@ bool Decoder::SetPosition(uint64_t position) { ///////////////////////////// namespace { -// Magic values written to the start of a snapshot/WAL file to identify it. -const std::string kSnapshotMagic{"MGsn"}; -const std::string kWalMagic{"MGwl"}; - // The current version of snapshot and WAL encoding / decoding. // IMPORTANT: Please bump this version for every snapshot and/or WAL format // change!!! @@ -614,6 +610,204 @@ Marker VertexActionToMarker(Delta::Action action) { return Marker::DELTA_EDGE_CREATE; } } + +// This function convertes a Marker to a WalDeltaData::Type. It checks for the +// validity of the marker and throws if an invalid marker is specified. +// @throw RecoveryFailure +WalDeltaData::Type MarkerToWalDeltaDataType(Marker marker) { + switch (marker) { + case Marker::DELTA_VERTEX_CREATE: + return WalDeltaData::Type::VERTEX_CREATE; + case Marker::DELTA_VERTEX_DELETE: + return WalDeltaData::Type::VERTEX_DELETE; + case Marker::DELTA_VERTEX_ADD_LABEL: + return WalDeltaData::Type::VERTEX_ADD_LABEL; + case Marker::DELTA_VERTEX_REMOVE_LABEL: + return WalDeltaData::Type::VERTEX_REMOVE_LABEL; + case Marker::DELTA_EDGE_CREATE: + return WalDeltaData::Type::EDGE_CREATE; + case Marker::DELTA_EDGE_DELETE: + return WalDeltaData::Type::EDGE_DELETE; + case Marker::DELTA_VERTEX_SET_PROPERTY: + return WalDeltaData::Type::VERTEX_SET_PROPERTY; + case Marker::DELTA_EDGE_SET_PROPERTY: + return WalDeltaData::Type::EDGE_SET_PROPERTY; + case Marker::DELTA_TRANSACTION_END: + return WalDeltaData::Type::TRANSACTION_END; + case Marker::DELTA_LABEL_INDEX_CREATE: + return WalDeltaData::Type::LABEL_INDEX_CREATE; + case Marker::DELTA_LABEL_INDEX_DROP: + return WalDeltaData::Type::LABEL_INDEX_DROP; + case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE: + return WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE; + case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP: + return WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP; + case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE: + return WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE; + case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP: + return WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP; + + case Marker::TYPE_NULL: + case Marker::TYPE_BOOL: + case Marker::TYPE_INT: + case Marker::TYPE_DOUBLE: + case Marker::TYPE_STRING: + case Marker::TYPE_LIST: + case Marker::TYPE_MAP: + case Marker::TYPE_PROPERTY_VALUE: + case Marker::SECTION_VERTEX: + case Marker::SECTION_EDGE: + case Marker::SECTION_MAPPER: + case Marker::SECTION_METADATA: + case Marker::SECTION_INDICES: + case Marker::SECTION_CONSTRAINTS: + case Marker::SECTION_DELTA: + case Marker::SECTION_OFFSETS: + case Marker::VALUE_FALSE: + case Marker::VALUE_TRUE: + throw RecoveryFailure("Invalid WAL data!"); + } +} + +bool IsWalDeltaDataTypeTransactionEnd(WalDeltaData::Type type) { + switch (type) { + // These delta actions are all found inside transactions so they don't + // indicate a transaction end. + case WalDeltaData::Type::VERTEX_CREATE: + case WalDeltaData::Type::VERTEX_DELETE: + case WalDeltaData::Type::VERTEX_ADD_LABEL: + case WalDeltaData::Type::VERTEX_REMOVE_LABEL: + case WalDeltaData::Type::EDGE_CREATE: + case WalDeltaData::Type::EDGE_DELETE: + case WalDeltaData::Type::VERTEX_SET_PROPERTY: + case WalDeltaData::Type::EDGE_SET_PROPERTY: + return false; + + // This delta explicitly indicates that a transaction is done. + case WalDeltaData::Type::TRANSACTION_END: + return true; + + // These operations aren't transactional and they are encoded only using + // a single delta, so they each individually mark the end of their + // 'transaction'. + case WalDeltaData::Type::LABEL_INDEX_CREATE: + case WalDeltaData::Type::LABEL_INDEX_DROP: + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: + return true; + } +} + +// Function used to either read or skip the current WAL delta data. The WAL +// delta header must be read before calling this function. If the delta data is +// read then the data returned is valid, if the delta data is skipped then the +// returned data is not guaranteed to be set (it could be empty) and shouldn't +// be used. +// @throw RecoveryFailure +template +WalDeltaData ReadSkipWalDeltaData(Decoder *wal) { + WalDeltaData delta; + + auto action = wal->ReadMarker(); + if (!action) throw RecoveryFailure("Invalid WAL data!"); + delta.type = MarkerToWalDeltaDataType(*action); + + switch (delta.type) { + case WalDeltaData::Type::VERTEX_CREATE: + case WalDeltaData::Type::VERTEX_DELETE: { + auto gid = wal->ReadUint(); + if (!gid) throw RecoveryFailure("Invalid WAL data!"); + delta.vertex_create_delete.gid = Gid::FromUint(*gid); + break; + } + case WalDeltaData::Type::VERTEX_ADD_LABEL: + case WalDeltaData::Type::VERTEX_REMOVE_LABEL: { + auto gid = wal->ReadUint(); + if (!gid) throw RecoveryFailure("Invalid WAL data!"); + delta.vertex_add_remove_label.gid = Gid::FromUint(*gid); + if constexpr (read_data) { + auto label = wal->ReadString(); + if (!label) throw RecoveryFailure("Invalid WAL data!"); + delta.vertex_add_remove_label.label = std::move(*label); + } else { + if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + } + break; + } + case WalDeltaData::Type::VERTEX_SET_PROPERTY: + case WalDeltaData::Type::EDGE_SET_PROPERTY: { + auto gid = wal->ReadUint(); + if (!gid) throw RecoveryFailure("Invalid WAL data!"); + delta.vertex_edge_set_property.gid = Gid::FromUint(*gid); + if constexpr (read_data) { + auto property = wal->ReadString(); + if (!property) throw RecoveryFailure("Invalid WAL data!"); + delta.vertex_edge_set_property.property = std::move(*property); + auto value = wal->ReadPropertyValue(); + if (!value) throw RecoveryFailure("Invalid WAL data!"); + delta.vertex_edge_set_property.value = std::move(*value); + } else { + if (!wal->SkipString() || !wal->SkipPropertyValue()) + throw RecoveryFailure("Invalid WAL data!"); + } + break; + } + case WalDeltaData::Type::EDGE_CREATE: + case WalDeltaData::Type::EDGE_DELETE: { + auto gid = wal->ReadUint(); + if (!gid) throw RecoveryFailure("Invalid WAL data!"); + delta.edge_create_delete.gid = Gid::FromUint(*gid); + if constexpr (read_data) { + auto edge_type = wal->ReadString(); + if (!edge_type) throw RecoveryFailure("Invalid WAL data!"); + delta.edge_create_delete.edge_type = std::move(*edge_type); + } else { + if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + } + auto from_gid = wal->ReadUint(); + if (!from_gid) throw RecoveryFailure("Invalid WAL data!"); + delta.edge_create_delete.from_vertex = Gid::FromUint(*from_gid); + auto to_gid = wal->ReadUint(); + if (!to_gid) throw RecoveryFailure("Invalid WAL data!"); + delta.edge_create_delete.to_vertex = Gid::FromUint(*to_gid); + break; + } + case WalDeltaData::Type::TRANSACTION_END: + break; + case WalDeltaData::Type::LABEL_INDEX_CREATE: + case WalDeltaData::Type::LABEL_INDEX_DROP: { + if constexpr (read_data) { + auto label = wal->ReadString(); + if (!label) throw RecoveryFailure("Invalid WAL data!"); + delta.operation_label.label = std::move(*label); + } else { + if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + } + break; + } + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { + if constexpr (read_data) { + auto label = wal->ReadString(); + if (!label) throw RecoveryFailure("Invalid WAL data!"); + delta.operation_label_property.label = std::move(*label); + auto property = wal->ReadString(); + if (!property) throw RecoveryFailure("Invalid WAL data!"); + delta.operation_label_property.property = std::move(*property); + } else { + if (!wal->SkipString() || !wal->SkipString()) + throw RecoveryFailure("Invalid WAL data!"); + } + break; + } + } + + return delta; +} } // namespace // Function used to read information about the snapshot file. @@ -737,87 +931,13 @@ WalInfo ReadWalInfo(const std::filesystem::path &path) { // Read deltas. info.num_deltas = 0; auto validate_delta = [&wal]() -> std::optional> { - auto marker = wal.ReadMarker(); - if (!marker || *marker != Marker::SECTION_DELTA) return std::nullopt; - - auto timestamp = wal.ReadUint(); - if (!timestamp) return std::nullopt; - - auto action = wal.ReadMarker(); - if (!action) return std::nullopt; - - bool is_transaction_end; - switch (*action) { - // These delta actions are all found inside transactions so they don't - // indicate a transaction end. - case Marker::DELTA_VERTEX_CREATE: - case Marker::DELTA_VERTEX_DELETE: - if (!wal.ReadUint()) return std::nullopt; - is_transaction_end = false; - break; - case Marker::DELTA_VERTEX_ADD_LABEL: - case Marker::DELTA_VERTEX_REMOVE_LABEL: - if (!wal.ReadUint() || !wal.SkipString()) return std::nullopt; - is_transaction_end = false; - break; - case Marker::DELTA_EDGE_CREATE: - case Marker::DELTA_EDGE_DELETE: - if (!wal.ReadUint() || !wal.SkipString() || !wal.ReadUint() || - !wal.ReadUint()) - return std::nullopt; - is_transaction_end = false; - break; - case Marker::DELTA_VERTEX_SET_PROPERTY: - case Marker::DELTA_EDGE_SET_PROPERTY: - if (!wal.ReadUint() || !wal.SkipString() || !wal.SkipPropertyValue()) - return std::nullopt; - is_transaction_end = false; - break; - - // This delta explicitly indicates that a transaction is done. - case Marker::DELTA_TRANSACTION_END: - is_transaction_end = true; - break; - - // These operations aren't transactional and they are encoded only using - // a single delta, so they each individually mark the end of their - // 'transaction'. - case Marker::DELTA_LABEL_INDEX_CREATE: - case Marker::DELTA_LABEL_INDEX_DROP: - if (!wal.SkipString()) return std::nullopt; - is_transaction_end = true; - break; - case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE: - case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP: - case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE: - case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP: - if (!wal.SkipString() || !wal.SkipString()) return std::nullopt; - is_transaction_end = true; - break; - - // These markers aren't delta actions. - case Marker::TYPE_NULL: - case Marker::TYPE_BOOL: - case Marker::TYPE_INT: - case Marker::TYPE_DOUBLE: - case Marker::TYPE_STRING: - case Marker::TYPE_LIST: - case Marker::TYPE_MAP: - case Marker::TYPE_PROPERTY_VALUE: - case Marker::SECTION_VERTEX: - case Marker::SECTION_EDGE: - case Marker::SECTION_MAPPER: - case Marker::SECTION_METADATA: - case Marker::SECTION_INDICES: - case Marker::SECTION_CONSTRAINTS: - case Marker::SECTION_DELTA: - case Marker::SECTION_OFFSETS: - case Marker::VALUE_FALSE: - case Marker::VALUE_TRUE: - return std::nullopt; + try { + auto timestamp = ReadWalDeltaHeader(&wal); + auto type = SkipWalDeltaData(&wal); + return {{timestamp, IsWalDeltaDataTypeTransactionEnd(type)}}; + } catch (const RecoveryFailure &) { + return std::nullopt; } - - return {{*timestamp, is_transaction_end}}; }; auto size = wal.GetSize(); // Here we read the whole file and determine the number of valid deltas. A @@ -855,6 +975,80 @@ WalInfo ReadWalInfo(const std::filesystem::path &path) { return info; } +bool operator==(const WalDeltaData &a, const WalDeltaData &b) { + if (a.type != b.type) return false; + switch (a.type) { + case WalDeltaData::Type::VERTEX_CREATE: + case WalDeltaData::Type::VERTEX_DELETE: + return a.vertex_create_delete.gid == b.vertex_create_delete.gid; + + case WalDeltaData::Type::VERTEX_ADD_LABEL: + case WalDeltaData::Type::VERTEX_REMOVE_LABEL: + return a.vertex_add_remove_label.gid == b.vertex_add_remove_label.gid && + a.vertex_add_remove_label.label == b.vertex_add_remove_label.label; + + case WalDeltaData::Type::VERTEX_SET_PROPERTY: + case WalDeltaData::Type::EDGE_SET_PROPERTY: + return a.vertex_edge_set_property.gid == b.vertex_edge_set_property.gid && + a.vertex_edge_set_property.property == + b.vertex_edge_set_property.property && + a.vertex_edge_set_property.value == + b.vertex_edge_set_property.value; + + case WalDeltaData::Type::EDGE_CREATE: + case WalDeltaData::Type::EDGE_DELETE: + return a.edge_create_delete.gid == b.edge_create_delete.gid && + a.edge_create_delete.edge_type == b.edge_create_delete.edge_type && + a.edge_create_delete.from_vertex == + b.edge_create_delete.from_vertex && + a.edge_create_delete.to_vertex == b.edge_create_delete.to_vertex; + + case WalDeltaData::Type::TRANSACTION_END: + return true; + + case WalDeltaData::Type::LABEL_INDEX_CREATE: + case WalDeltaData::Type::LABEL_INDEX_DROP: + return a.operation_label.label == b.operation_label.label; + + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: + return a.operation_label_property.label == + b.operation_label_property.label && + a.operation_label_property.property == + b.operation_label_property.property; + } +} +bool operator!=(const WalDeltaData &a, const WalDeltaData &b) { + return !(a == b); +} + +// Function used to read the WAL delta header. The function returns the delta +// timestamp. +uint64_t ReadWalDeltaHeader(Decoder *wal) { + auto marker = wal->ReadMarker(); + if (!marker || *marker != Marker::SECTION_DELTA) + throw RecoveryFailure("Invalid WAL data!"); + + auto timestamp = wal->ReadUint(); + if (!timestamp) throw RecoveryFailure("Invalid WAL data!"); + return *timestamp; +} + +// Function used to either read the current WAL delta data. The WAL delta header +// must be read before calling this function. +WalDeltaData ReadWalDeltaData(Decoder *wal) { + return ReadSkipWalDeltaData(wal); +} + +// Function used to either skip the current WAL delta data. The WAL delta header +// must be read before calling this function. +WalDeltaData::Type SkipWalDeltaData(Decoder *wal) { + auto delta = ReadSkipWalDeltaData(wal); + return delta.type; +} + WalFile::WalFile(const std::filesystem::path &wal_directory, const std::string &uuid, Config::Items items, NameIdMapper *name_id_mapper, uint64_t seq_num) diff --git a/src/storage/v2/durability.hpp b/src/storage/v2/durability.hpp index 8430d0579..94155b9d7 100644 --- a/src/storage/v2/durability.hpp +++ b/src/storage/v2/durability.hpp @@ -27,6 +27,10 @@ namespace storage { static const std::string kSnapshotDirectory{"snapshots"}; static const std::string kWalDirectory{"wal"}; +// Magic values written to the start of a snapshot/WAL file to identify it. +const std::string kSnapshotMagic{"MGsn"}; +const std::string kWalMagic{"MGwl"}; + static_assert(std::is_same_v); /// Markers that are used to indicate crucial parts of the snapshot/WAL. @@ -208,6 +212,80 @@ struct WalInfo { /// @throw RecoveryFailure WalInfo ReadWalInfo(const std::filesystem::path &path); +/// Structure used to return loaded WAL delta data. +struct WalDeltaData { + enum class Type { + VERTEX_CREATE, + VERTEX_DELETE, + VERTEX_ADD_LABEL, + VERTEX_REMOVE_LABEL, + VERTEX_SET_PROPERTY, + EDGE_CREATE, + EDGE_DELETE, + EDGE_SET_PROPERTY, + TRANSACTION_END, + LABEL_INDEX_CREATE, + LABEL_INDEX_DROP, + LABEL_PROPERTY_INDEX_CREATE, + LABEL_PROPERTY_INDEX_DROP, + EXISTENCE_CONSTRAINT_CREATE, + EXISTENCE_CONSTRAINT_DROP, + }; + + Type type{Type::TRANSACTION_END}; + + struct { + Gid gid; + } vertex_create_delete; + + struct { + Gid gid; + std::string label; + } vertex_add_remove_label; + + struct { + Gid gid; + std::string property; + PropertyValue value; + } vertex_edge_set_property; + + struct { + Gid gid; + std::string edge_type; + Gid from_vertex; + Gid to_vertex; + } edge_create_delete; + + struct { + std::string label; + } operation_label; + + struct { + std::string label; + std::string property; + } operation_label_property; +}; + +bool operator==(const WalDeltaData &a, const WalDeltaData &b); +bool operator!=(const WalDeltaData &a, const WalDeltaData &b); + +/// Function used to read the WAL delta header. The function returns the delta +/// timestamp. +/// @throw RecoveryFailure +uint64_t ReadWalDeltaHeader(Decoder *wal); + +/// Function used to either read the current WAL delta data. The function +/// returns the read delta data. The WAL delta header must be read before +/// calling this function. +/// @throw RecoveryFailure +WalDeltaData ReadWalDeltaData(Decoder *wal); + +/// Function used to either skip the current WAL delta data. The function +/// returns the skipped delta type. The WAL delta header must be read before +/// calling this function. +/// @throw RecoveryFailure +WalDeltaData::Type SkipWalDeltaData(Decoder *wal); + /// Enum used to indicate a global database operation that isn't transactional. enum class StorageGlobalOperation { LABEL_INDEX_CREATE, diff --git a/tests/unit/storage_v2_wal_file.cpp b/tests/unit/storage_v2_wal_file.cpp index 5755164b2..afe889180 100644 --- a/tests/unit/storage_v2_wal_file.cpp +++ b/tests/unit/storage_v2_wal_file.cpp @@ -12,6 +12,25 @@ #include "utils/file.hpp" #include "utils/uuid.hpp" +// Helper function used to convert between enum types. +storage::WalDeltaData::Type StorageGlobalOperationToWalDeltaDataType( + storage::StorageGlobalOperation operation) { + switch (operation) { + case storage::StorageGlobalOperation::LABEL_INDEX_CREATE: + return storage::WalDeltaData::Type::LABEL_INDEX_CREATE; + case storage::StorageGlobalOperation::LABEL_INDEX_DROP: + return storage::WalDeltaData::Type::LABEL_INDEX_DROP; + case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE: + return storage::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE; + case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP: + return storage::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP; + case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE: + return storage::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE; + case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: + return storage::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP; + } +} + // This class mimics the internals of the storage to generate the deltas. class DeltaGenerator final { public: @@ -28,12 +47,24 @@ class DeltaGenerator final { auto delta = storage::CreateDeleteObjectDelta(&transaction_); auto &it = gen_->vertices_.emplace_back(gid, delta); delta->prev.Set(&it); + { + storage::WalDeltaData data; + data.type = storage::WalDeltaData::Type::VERTEX_CREATE; + data.vertex_create_delete.gid = gid; + data_.push_back(data); + } return ⁢ } void DeleteVertex(storage::Vertex *vertex) { storage::CreateAndLinkDelta(&transaction_, &*vertex, storage::Delta::RecreateObjectTag()); + { + storage::WalDeltaData data; + data.type = storage::WalDeltaData::Type::VERTEX_DELETE; + data.vertex_create_delete.gid = vertex->gid; + data_.push_back(data); + } } void AddLabel(storage::Vertex *vertex, const std::string &label) { @@ -41,6 +72,13 @@ class DeltaGenerator final { vertex->labels.push_back(label_id); storage::CreateAndLinkDelta(&transaction_, &*vertex, storage::Delta::RemoveLabelTag(), label_id); + { + storage::WalDeltaData data; + data.type = storage::WalDeltaData::Type::VERTEX_ADD_LABEL; + data.vertex_add_remove_label.gid = vertex->gid; + data.vertex_add_remove_label.label = label; + data_.push_back(data); + } } void RemoveLabel(storage::Vertex *vertex, const std::string &label) { @@ -49,6 +87,13 @@ class DeltaGenerator final { std::find(vertex->labels.begin(), vertex->labels.end(), label_id)); storage::CreateAndLinkDelta(&transaction_, &*vertex, storage::Delta::AddLabelTag(), label_id); + { + storage::WalDeltaData data; + data.type = storage::WalDeltaData::Type::VERTEX_REMOVE_LABEL; + data.vertex_add_remove_label.gid = vertex->gid; + data.vertex_add_remove_label.label = label; + data_.push_back(data); + } } void SetProperty(storage::Vertex *vertex, const std::string &property, @@ -74,6 +119,17 @@ class DeltaGenerator final { props.erase(it); } } + { + storage::WalDeltaData data; + data.type = storage::WalDeltaData::Type::VERTEX_SET_PROPERTY; + data.vertex_edge_set_property.gid = vertex->gid; + data.vertex_edge_set_property.property = property; + // We don't store the property value here. That is because the storage + // generates multiple `SetProperty` deltas using only the final values + // of the property. The intermediate values aren't encoded. The value is + // later determined in the `Finalize` function. + data_.push_back(data); + } } void Finalize(bool append_transaction_end = true) { @@ -93,7 +149,33 @@ class DeltaGenerator final { } if (append_transaction_end) { gen_->wal_file_.AppendTransactionEnd(commit_timestamp); - gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1); + if (gen_->valid_) { + gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1); + for (auto &data : data_) { + if (data.type == storage::WalDeltaData::Type::VERTEX_SET_PROPERTY) { + // We need to put the final property value into the SET_PROPERTY + // delta. + auto vertex = + std::find(gen_->vertices_.begin(), gen_->vertices_.end(), + data.vertex_edge_set_property.gid); + ASSERT_NE(vertex, gen_->vertices_.end()); + auto property_id = + storage::PropertyId::FromUint(gen_->mapper_.NameToId( + data.vertex_edge_set_property.property)); + auto &props = vertex->properties; + auto it = props.find(property_id); + if (it == props.end()) { + data.vertex_edge_set_property.value = storage::PropertyValue(); + } else { + data.vertex_edge_set_property.value = it->second; + } + } + gen_->data_.emplace_back(commit_timestamp, data); + } + storage::WalDeltaData data{ + .type = storage::WalDeltaData::Type::TRANSACTION_END}; + gen_->data_.emplace_back(commit_timestamp, data); + } } else { gen_->valid_ = false; } @@ -102,8 +184,11 @@ class DeltaGenerator final { private: DeltaGenerator *gen_; storage::Transaction transaction_; + std::vector data_; }; + using DataT = std::vector>; + DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num) : uuid_(utils::GenerateUUID()), @@ -129,7 +214,24 @@ class DeltaGenerator final { property_id = storage::PropertyId::FromUint(mapper_.NameToId(*property)); } wal_file_.AppendOperation(operation, label_id, property_id, timestamp_); - UpdateStats(timestamp_, 1); + if (valid_) { + UpdateStats(timestamp_, 1); + storage::WalDeltaData data; + data.type = StorageGlobalOperationToWalDeltaDataType(operation); + switch (operation) { + case storage::StorageGlobalOperation::LABEL_INDEX_CREATE: + case storage::StorageGlobalOperation::LABEL_INDEX_DROP: + data.operation_label.label = label; + break; + case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE: + case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP: + case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE: + case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: + data.operation_label_property.label = label; + data.operation_label_property.property = *property; + } + data_.emplace_back(timestamp_, data); + } } uint64_t GetPosition() { return wal_file_.GetSize(); } @@ -144,9 +246,10 @@ class DeltaGenerator final { .num_deltas = deltas_count_}; } + DataT GetData() { return data_; } + private: void UpdateStats(uint64_t timestamp, uint64_t count) { - if (!valid_) return; if (deltas_count_ == 0) { tx_from_ = timestamp; } @@ -165,6 +268,8 @@ class DeltaGenerator final { storage::WalFile wal_file_; + DataT data_; + uint64_t deltas_count_{0}; uint64_t tx_from_{0}; uint64_t tx_to_{0}; @@ -191,6 +296,21 @@ void AssertWalInfoEqual(const storage::WalInfo &a, const storage::WalInfo &b) { ASSERT_EQ(a.num_deltas, b.num_deltas); } +void AssertWalDataEqual(const DeltaGenerator::DataT &data, + const std::filesystem::path &path) { + auto info = storage::ReadWalInfo(path); + storage::Decoder wal; + wal.Initialize(path, storage::kWalMagic); + wal.SetPosition(info.offset_deltas); + DeltaGenerator::DataT current; + for (uint64_t i = 0; i < info.num_deltas; ++i) { + auto timestamp = storage::ReadWalDeltaHeader(&wal); + current.emplace_back(timestamp, storage::ReadWalDeltaData(&wal)); + } + ASSERT_EQ(data.size(), current.size()); + ASSERT_EQ(data, current); +} + class WalFileTest : public ::testing::TestWithParam { public: WalFileTest() {} @@ -236,11 +356,13 @@ TEST_P(WalFileTest, EmptyFile) { #define GENERATE_SIMPLE_TEST(name, ops) \ TEST_P(WalFileTest, name) { \ storage::WalInfo info; \ + DeltaGenerator::DataT data; \ \ { \ DeltaGenerator gen(storage_directory, GetParam(), 5); \ ops; \ info = gen.GetInfo(); \ + data = gen.GetData(); \ } \ \ auto wal_files = GetFilesList(); \ @@ -251,6 +373,7 @@ TEST_P(WalFileTest, EmptyFile) { storage::RecoveryFailure); \ } else { \ AssertWalInfoEqual(info, storage::ReadWalInfo(wal_files.front())); \ + AssertWalDataEqual(data, wal_files.front()); \ } \ }