From c12a87e9ca146e582f71d71b3967100de2731838 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Mon, 17 Aug 2020 10:30:36 +0200 Subject: [PATCH] Allow inheritance of storage durability decoder Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2810 --- src/storage/v2/durability/serialization.hpp | 36 ++++++++--- src/storage/v2/durability/wal.cpp | 67 +++++++++++---------- src/storage/v2/durability/wal.hpp | 6 +- 3 files changed, 64 insertions(+), 45 deletions(-) diff --git a/src/storage/v2/durability/serialization.hpp b/src/storage/v2/durability/serialization.hpp index 18018ea8c..bdd3cd74f 100644 --- a/src/storage/v2/durability/serialization.hpp +++ b/src/storage/v2/durability/serialization.hpp @@ -55,8 +55,26 @@ class Encoder final : public BaseEncoder { utils::OutputFile file_; }; +/// Decoder interface class. Used to implement streams from different sources +/// (e.g. file and network). +class BaseDecoder { + protected: + ~BaseDecoder() {} + + public: + virtual std::optional ReadMarker() = 0; + virtual std::optional ReadBool() = 0; + virtual std::optional ReadUint() = 0; + virtual std::optional ReadDouble() = 0; + virtual std::optional ReadString() = 0; + virtual std::optional ReadPropertyValue() = 0; + + virtual bool SkipString() = 0; + virtual bool SkipPropertyValue() = 0; +}; + /// Decoder that is used to read a generated snapshot/WAL. -class Decoder final { +class Decoder final : public BaseDecoder { public: std::optional Initialize(const std::filesystem::path &path, const std::string &magic); @@ -68,15 +86,15 @@ class Decoder final { std::optional PeekMarker(); - std::optional ReadMarker(); - std::optional ReadBool(); - std::optional ReadUint(); - std::optional ReadDouble(); - std::optional ReadString(); - std::optional ReadPropertyValue(); + std::optional ReadMarker() override; + std::optional ReadBool() override; + std::optional ReadUint() override; + std::optional ReadDouble() override; + std::optional ReadString() override; + std::optional ReadPropertyValue() override; - bool SkipString(); - bool SkipPropertyValue(); + bool SkipString() override; + bool SkipPropertyValue() override; std::optional GetSize(); std::optional GetPosition(); diff --git a/src/storage/v2/durability/wal.cpp b/src/storage/v2/durability/wal.cpp index ebf9c7ab3..49f1723b6 100644 --- a/src/storage/v2/durability/wal.cpp +++ b/src/storage/v2/durability/wal.cpp @@ -211,69 +211,69 @@ bool IsWalDeltaDataTypeTransactionEnd(WalDeltaData::Type type) { // be used. // @throw RecoveryFailure template -WalDeltaData ReadSkipWalDeltaData(Decoder *wal) { +WalDeltaData ReadSkipWalDeltaData(BaseDecoder *decoder) { WalDeltaData delta; - auto action = wal->ReadMarker(); + auto action = decoder->ReadMarker(); if (!action) throw RecoveryFailure("Invalid WAL data!"); delta.type = MarkerToWalDeltaDataType(*action); switch (delta.type) { case WalDeltaData::Type::VERTEX_CREATE: case WalDeltaData::Type::VERTEX_DELETE: { - auto gid = wal->ReadUint(); + auto gid = decoder->ReadUint(); if (!gid) throw RecoveryFailure("Invalid WAL data!"); delta.vertex_create_delete.gid = Gid::FromUint(*gid); break; } case WalDeltaData::Type::VERTEX_ADD_LABEL: case WalDeltaData::Type::VERTEX_REMOVE_LABEL: { - auto gid = wal->ReadUint(); + auto gid = decoder->ReadUint(); if (!gid) throw RecoveryFailure("Invalid WAL data!"); delta.vertex_add_remove_label.gid = Gid::FromUint(*gid); if constexpr (read_data) { - auto label = wal->ReadString(); + auto label = decoder->ReadString(); if (!label) throw RecoveryFailure("Invalid WAL data!"); delta.vertex_add_remove_label.label = std::move(*label); } else { - if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!"); } break; } case WalDeltaData::Type::VERTEX_SET_PROPERTY: case WalDeltaData::Type::EDGE_SET_PROPERTY: { - auto gid = wal->ReadUint(); + auto gid = decoder->ReadUint(); if (!gid) throw RecoveryFailure("Invalid WAL data!"); delta.vertex_edge_set_property.gid = Gid::FromUint(*gid); if constexpr (read_data) { - auto property = wal->ReadString(); + auto property = decoder->ReadString(); if (!property) throw RecoveryFailure("Invalid WAL data!"); delta.vertex_edge_set_property.property = std::move(*property); - auto value = wal->ReadPropertyValue(); + auto value = decoder->ReadPropertyValue(); if (!value) throw RecoveryFailure("Invalid WAL data!"); delta.vertex_edge_set_property.value = std::move(*value); } else { - if (!wal->SkipString() || !wal->SkipPropertyValue()) + if (!decoder->SkipString() || !decoder->SkipPropertyValue()) throw RecoveryFailure("Invalid WAL data!"); } break; } case WalDeltaData::Type::EDGE_CREATE: case WalDeltaData::Type::EDGE_DELETE: { - auto gid = wal->ReadUint(); + auto gid = decoder->ReadUint(); if (!gid) throw RecoveryFailure("Invalid WAL data!"); delta.edge_create_delete.gid = Gid::FromUint(*gid); if constexpr (read_data) { - auto edge_type = wal->ReadString(); + auto edge_type = decoder->ReadString(); if (!edge_type) throw RecoveryFailure("Invalid WAL data!"); delta.edge_create_delete.edge_type = std::move(*edge_type); } else { - if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!"); } - auto from_gid = wal->ReadUint(); + auto from_gid = decoder->ReadUint(); if (!from_gid) throw RecoveryFailure("Invalid WAL data!"); delta.edge_create_delete.from_vertex = Gid::FromUint(*from_gid); - auto to_gid = wal->ReadUint(); + auto to_gid = decoder->ReadUint(); if (!to_gid) throw RecoveryFailure("Invalid WAL data!"); delta.edge_create_delete.to_vertex = Gid::FromUint(*to_gid); break; @@ -283,11 +283,11 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) { case WalDeltaData::Type::LABEL_INDEX_CREATE: case WalDeltaData::Type::LABEL_INDEX_DROP: { if constexpr (read_data) { - auto label = wal->ReadString(); + auto label = decoder->ReadString(); if (!label) throw RecoveryFailure("Invalid WAL data!"); delta.operation_label.label = std::move(*label); } else { - if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!"); } break; } @@ -296,14 +296,14 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) { case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { if constexpr (read_data) { - auto label = wal->ReadString(); + auto label = decoder->ReadString(); if (!label) throw RecoveryFailure("Invalid WAL data!"); delta.operation_label_property.label = std::move(*label); - auto property = wal->ReadString(); + auto property = decoder->ReadString(); if (!property) throw RecoveryFailure("Invalid WAL data!"); delta.operation_label_property.property = std::move(*property); } else { - if (!wal->SkipString() || !wal->SkipString()) + if (!decoder->SkipString() || !decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!"); } break; @@ -311,23 +311,24 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) { case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: { if constexpr (read_data) { - auto label = wal->ReadString(); + auto label = decoder->ReadString(); if (!label) throw RecoveryFailure("Invalid WAL data!"); delta.operation_label_properties.label = std::move(*label); - auto properties_count = wal->ReadUint(); + auto properties_count = decoder->ReadUint(); if (!properties_count) throw RecoveryFailure("Invalid WAL data!"); for (uint64_t i = 0; i < *properties_count; ++i) { - auto property = wal->ReadString(); + auto property = decoder->ReadString(); if (!property) throw RecoveryFailure("Invalid WAL data!"); delta.operation_label_properties.properties.emplace( std::move(*property)); } } else { - if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); - auto properties_count = wal->ReadUint(); + if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + auto properties_count = decoder->ReadUint(); if (!properties_count) throw RecoveryFailure("Invalid WAL data!"); for (uint64_t i = 0; i < *properties_count; ++i) { - if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!"); + if (!decoder->SkipString()) + throw RecoveryFailure("Invalid WAL data!"); } } } @@ -493,26 +494,26 @@ bool operator!=(const WalDeltaData &a, const WalDeltaData &b) { // Function used to read the WAL delta header. The function returns the delta // timestamp. -uint64_t ReadWalDeltaHeader(Decoder *wal) { - auto marker = wal->ReadMarker(); +uint64_t ReadWalDeltaHeader(BaseDecoder *decoder) { + auto marker = decoder->ReadMarker(); if (!marker || *marker != Marker::SECTION_DELTA) throw RecoveryFailure("Invalid WAL data!"); - auto timestamp = wal->ReadUint(); + auto timestamp = decoder->ReadUint(); if (!timestamp) throw RecoveryFailure("Invalid WAL data!"); return *timestamp; } // Function used to read the current WAL delta data. The WAL delta header must // be read before calling this function. -WalDeltaData ReadWalDeltaData(Decoder *wal) { - return ReadSkipWalDeltaData(wal); +WalDeltaData ReadWalDeltaData(BaseDecoder *decoder) { + return ReadSkipWalDeltaData(decoder); } // Function used to skip the current WAL delta data. The WAL delta header must // be read before calling this function. -WalDeltaData::Type SkipWalDeltaData(Decoder *wal) { - auto delta = ReadSkipWalDeltaData(wal); +WalDeltaData::Type SkipWalDeltaData(BaseDecoder *decoder) { + auto delta = ReadSkipWalDeltaData(decoder); return delta.type; } diff --git a/src/storage/v2/durability/wal.hpp b/src/storage/v2/durability/wal.hpp index ae0f7c1b3..7c14aabfb 100644 --- a/src/storage/v2/durability/wal.hpp +++ b/src/storage/v2/durability/wal.hpp @@ -113,19 +113,19 @@ WalInfo ReadWalInfo(const std::filesystem::path &path); /// Function used to read the WAL delta header. The function returns the delta /// timestamp. /// @throw RecoveryFailure -uint64_t ReadWalDeltaHeader(Decoder *wal); +uint64_t ReadWalDeltaHeader(BaseDecoder *decoder); /// Function used to read the current WAL delta data. The function returns the /// read delta data. The WAL delta header must be read before calling this /// function. /// @throw RecoveryFailure -WalDeltaData ReadWalDeltaData(Decoder *wal); +WalDeltaData ReadWalDeltaData(BaseDecoder *decoder); /// Function used to skip the current WAL delta data. The function returns the /// skipped delta type. The WAL delta header must be read before calling this /// function. /// @throw RecoveryFailure -WalDeltaData::Type SkipWalDeltaData(Decoder *wal); +WalDeltaData::Type SkipWalDeltaData(BaseDecoder *decoder); /// Function used to encode a `Delta` that originated from a `Vertex`. void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,