From 63de0b5db439116eb68ebe60f2ec9b20e834da93 Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Tue, 26 May 2020 15:28:24 +0200
Subject: [PATCH] Allow inheritance of storage durability encoder

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2809
---
 src/storage/v2/durability/serialization.hpp |  29 ++-
 src/storage/v2/durability/wal.cpp           | 274 +++++++++++---------
 src/storage/v2/durability/wal.hpp           |  18 ++
 3 files changed, 190 insertions(+), 131 deletions(-)

diff --git a/src/storage/v2/durability/serialization.hpp b/src/storage/v2/durability/serialization.hpp
index e0d8a9e59..18018ea8c 100644
--- a/src/storage/v2/durability/serialization.hpp
+++ b/src/storage/v2/durability/serialization.hpp
@@ -12,8 +12,23 @@
 
 namespace storage::durability {
 
+/// Encoder interface class. Used to implement streams to different targets
+/// (e.g. file and network).
+class BaseEncoder {
+ protected:
+  ~BaseEncoder() {}
+
+ public:
+  virtual void WriteMarker(Marker marker) = 0;
+  virtual void WriteBool(bool value) = 0;
+  virtual void WriteUint(uint64_t value) = 0;
+  virtual void WriteDouble(double value) = 0;
+  virtual void WriteString(const std::string_view &value) = 0;
+  virtual void WritePropertyValue(const PropertyValue &value) = 0;
+};
+
 /// Encoder that is used to generate a snapshot/WAL.
-class Encoder final {
+class Encoder final : public BaseEncoder {
  public:
   void Initialize(const std::filesystem::path &path,
                   const std::string_view &magic, uint64_t version);
@@ -22,12 +37,12 @@ class Encoder final {
   // directly.
   void Write(const uint8_t *data, uint64_t size);
 
-  void WriteMarker(Marker marker);
-  void WriteBool(bool value);
-  void WriteUint(uint64_t value);
-  void WriteDouble(double value);
-  void WriteString(const std::string_view &value);
-  void WritePropertyValue(const PropertyValue &value);
+  void WriteMarker(Marker marker) override;
+  void WriteBool(bool value) override;
+  void WriteUint(uint64_t value) override;
+  void WriteDouble(double value) override;
+  void WriteString(const std::string_view &value) override;
+  void WritePropertyValue(const PropertyValue &value) override;
 
   uint64_t GetPosition();
   void SetPosition(uint64_t position);
diff --git a/src/storage/v2/durability/wal.cpp b/src/storage/v2/durability/wal.cpp
index 1272f68ce..ebf9c7ab3 100644
--- a/src/storage/v2/durability/wal.cpp
+++ b/src/storage/v2/durability/wal.cpp
@@ -516,6 +516,151 @@ WalDeltaData::Type SkipWalDeltaData(Decoder *wal) {
   return delta.type;
 }
 
+void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
+                 Config::Items items, const Delta &delta, const Vertex &vertex,
+                 uint64_t timestamp) {
+  // When converting a Delta to a WAL delta the logic is inverted. That is
+  // because the Delta's represent undo actions and we want to store redo
+  // actions.
+  encoder->WriteMarker(Marker::SECTION_DELTA);
+  encoder->WriteUint(timestamp);
+  std::lock_guard<utils::SpinLock> guard(vertex.lock);
+  switch (delta.action) {
+    case Delta::Action::DELETE_OBJECT:
+    case Delta::Action::RECREATE_OBJECT: {
+      encoder->WriteMarker(VertexActionToMarker(delta.action));
+      encoder->WriteUint(vertex.gid.AsUint());
+      break;
+    }
+    case Delta::Action::SET_PROPERTY: {
+      encoder->WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY);
+      encoder->WriteUint(vertex.gid.AsUint());
+      encoder->WriteString(
+          name_id_mapper->IdToName(delta.property.key.AsUint()));
+      // The property value is the value that is currently stored in the
+      // vertex.
+      // TODO (mferencevic): Mitigate the memory allocation introduced here
+      // (with the `GetProperty` call). It is the only memory allocation in the
+      // entire WAL file writing logic.
+      encoder->WritePropertyValue(
+          vertex.properties.GetProperty(delta.property.key));
+      break;
+    }
+    case Delta::Action::ADD_LABEL:
+    case Delta::Action::REMOVE_LABEL: {
+      encoder->WriteMarker(VertexActionToMarker(delta.action));
+      encoder->WriteUint(vertex.gid.AsUint());
+      encoder->WriteString(name_id_mapper->IdToName(delta.label.AsUint()));
+      break;
+    }
+    case Delta::Action::ADD_OUT_EDGE:
+    case Delta::Action::REMOVE_OUT_EDGE: {
+      encoder->WriteMarker(VertexActionToMarker(delta.action));
+      if (items.properties_on_edges) {
+        encoder->WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint());
+      } else {
+        encoder->WriteUint(delta.vertex_edge.edge.gid.AsUint());
+      }
+      encoder->WriteString(
+          name_id_mapper->IdToName(delta.vertex_edge.edge_type.AsUint()));
+      encoder->WriteUint(vertex.gid.AsUint());
+      encoder->WriteUint(delta.vertex_edge.vertex->gid.AsUint());
+      break;
+    }
+    case Delta::Action::ADD_IN_EDGE:
+    case Delta::Action::REMOVE_IN_EDGE:
+      // These actions are already encoded in the *_OUT_EDGE actions. This
+      // function should never be called for this type of deltas.
+      LOG(FATAL) << "Invalid delta action!";
+  }
+}
+
+void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
+                 const Delta &delta, const Edge &edge, uint64_t timestamp) {
+  // When converting a Delta to a WAL delta the logic is inverted. That is
+  // because the Delta's represent undo actions and we want to store redo
+  // actions.
+  encoder->WriteMarker(Marker::SECTION_DELTA);
+  encoder->WriteUint(timestamp);
+  std::lock_guard<utils::SpinLock> guard(edge.lock);
+  switch (delta.action) {
+    case Delta::Action::SET_PROPERTY: {
+      encoder->WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
+      encoder->WriteUint(edge.gid.AsUint());
+      encoder->WriteString(
+          name_id_mapper->IdToName(delta.property.key.AsUint()));
+      // The property value is the value that is currently stored in the
+      // edge.
+      // TODO (mferencevic): Mitigate the memory allocation introduced here
+      // (with the `GetProperty` call). It is the only memory allocation in the
+      // entire WAL file writing logic.
+      encoder->WritePropertyValue(
+          edge.properties.GetProperty(delta.property.key));
+      break;
+    }
+    case Delta::Action::DELETE_OBJECT:
+    case Delta::Action::RECREATE_OBJECT:
+      // These actions are already encoded in vertex *_OUT_EDGE actions. Also,
+      // these deltas don't contain any information about the from vertex, to
+      // vertex or edge type so they are useless. This function should never
+      // be called for this type of deltas.
+      LOG(FATAL) << "Invalid delta action!";
+    case Delta::Action::ADD_LABEL:
+    case Delta::Action::REMOVE_LABEL:
+    case Delta::Action::ADD_OUT_EDGE:
+    case Delta::Action::REMOVE_OUT_EDGE:
+    case Delta::Action::ADD_IN_EDGE:
+    case Delta::Action::REMOVE_IN_EDGE:
+      // These deltas shouldn't appear for edges.
+      LOG(FATAL) << "Invalid database state!";
+  }
+}
+
+void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp) {
+  encoder->WriteMarker(Marker::SECTION_DELTA);
+  encoder->WriteUint(timestamp);
+  encoder->WriteMarker(Marker::DELTA_TRANSACTION_END);
+}
+
+void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
+                     StorageGlobalOperation operation, LabelId label,
+                     const std::set<PropertyId> &properties,
+                     uint64_t timestamp) {
+  encoder->WriteMarker(Marker::SECTION_DELTA);
+  encoder->WriteUint(timestamp);
+  switch (operation) {
+    case StorageGlobalOperation::LABEL_INDEX_CREATE:
+    case StorageGlobalOperation::LABEL_INDEX_DROP: {
+      CHECK(properties.empty()) << "Invalid function call!";
+      encoder->WriteMarker(OperationToMarker(operation));
+      encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
+      break;
+    }
+    case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
+    case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
+    case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
+    case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: {
+      CHECK(properties.size() == 1) << "Invalid function call!";
+      encoder->WriteMarker(OperationToMarker(operation));
+      encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
+      encoder->WriteString(
+          name_id_mapper->IdToName((*properties.begin()).AsUint()));
+      break;
+    }
+    case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
+    case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: {
+      CHECK(!properties.empty()) << "Invalid function call!";
+      encoder->WriteMarker(OperationToMarker(operation));
+      encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
+      encoder->WriteUint(properties.size());
+      for (const auto &property : properties) {
+        encoder->WriteString(name_id_mapper->IdToName(property.AsUint()));
+      }
+      break;
+    }
+  }
+}
+
 RecoveryInfo LoadWal(const std::filesystem::path &path,
                      RecoveredIndicesAndConstraints *indices_constraints,
                      std::optional<uint64_t> snapshot_timestamp,
@@ -884,145 +1029,26 @@ WalFile::~WalFile() {
 
 void WalFile::AppendDelta(const Delta &delta, const Vertex &vertex,
                           uint64_t timestamp) {
-  // When converting a Delta to a WAL delta the logic is inverted. That is
-  // because the Delta's represent undo actions and we want to store redo
-  // actions.
-  wal_.WriteMarker(Marker::SECTION_DELTA);
-  wal_.WriteUint(timestamp);
-  std::lock_guard<utils::SpinLock> guard(vertex.lock);
-  switch (delta.action) {
-    case Delta::Action::DELETE_OBJECT:
-    case Delta::Action::RECREATE_OBJECT: {
-      wal_.WriteMarker(VertexActionToMarker(delta.action));
-      wal_.WriteUint(vertex.gid.AsUint());
-      break;
-    }
-    case Delta::Action::SET_PROPERTY: {
-      wal_.WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY);
-      wal_.WriteUint(vertex.gid.AsUint());
-      wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
-      // The property value is the value that is currently stored in the
-      // vertex.
-      // TODO (mferencevic): Mitigate the memory allocation introduced here
-      // (with the `GetProperty` call). It is the only memory allocation in the
-      // entire WAL file writing logic.
-      wal_.WritePropertyValue(
-          vertex.properties.GetProperty(delta.property.key));
-      break;
-    }
-    case Delta::Action::ADD_LABEL:
-    case Delta::Action::REMOVE_LABEL: {
-      wal_.WriteMarker(VertexActionToMarker(delta.action));
-      wal_.WriteUint(vertex.gid.AsUint());
-      wal_.WriteString(name_id_mapper_->IdToName(delta.label.AsUint()));
-      break;
-    }
-    case Delta::Action::ADD_OUT_EDGE:
-    case Delta::Action::REMOVE_OUT_EDGE: {
-      wal_.WriteMarker(VertexActionToMarker(delta.action));
-      if (items_.properties_on_edges) {
-        wal_.WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint());
-      } else {
-        wal_.WriteUint(delta.vertex_edge.edge.gid.AsUint());
-      }
-      wal_.WriteString(
-          name_id_mapper_->IdToName(delta.vertex_edge.edge_type.AsUint()));
-      wal_.WriteUint(vertex.gid.AsUint());
-      wal_.WriteUint(delta.vertex_edge.vertex->gid.AsUint());
-      break;
-    }
-    case Delta::Action::ADD_IN_EDGE:
-    case Delta::Action::REMOVE_IN_EDGE:
-      // These actions are already encoded in the *_OUT_EDGE actions. This
-      // function should never be called for this type of deltas.
-      LOG(FATAL) << "Invalid delta action!";
-  }
+  EncodeDelta(&wal_, name_id_mapper_, items_, delta, vertex, timestamp);
   UpdateStats(timestamp);
 }
 
 void WalFile::AppendDelta(const Delta &delta, const Edge &edge,
                           uint64_t timestamp) {
-  // When converting a Delta to a WAL delta the logic is inverted. That is
-  // because the Delta's represent undo actions and we want to store redo
-  // actions.
-  wal_.WriteMarker(Marker::SECTION_DELTA);
-  wal_.WriteUint(timestamp);
-  std::lock_guard<utils::SpinLock> guard(edge.lock);
-  switch (delta.action) {
-    case Delta::Action::SET_PROPERTY: {
-      wal_.WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
-      wal_.WriteUint(edge.gid.AsUint());
-      wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
-      // The property value is the value that is currently stored in the
-      // edge.
-      // TODO (mferencevic): Mitigate the memory allocation introduced here
-      // (with the `GetProperty` call). It is the only memory allocation in the
-      // entire WAL file writing logic.
-      wal_.WritePropertyValue(edge.properties.GetProperty(delta.property.key));
-      break;
-    }
-    case Delta::Action::DELETE_OBJECT:
-    case Delta::Action::RECREATE_OBJECT:
-      // These actions are already encoded in vertex *_OUT_EDGE actions. Also,
-      // these deltas don't contain any information about the from vertex, to
-      // vertex or edge type so they are useless. This function should never
-      // be called for this type of deltas.
-      LOG(FATAL) << "Invalid delta action!";
-    case Delta::Action::ADD_LABEL:
-    case Delta::Action::REMOVE_LABEL:
-    case Delta::Action::ADD_OUT_EDGE:
-    case Delta::Action::REMOVE_OUT_EDGE:
-    case Delta::Action::ADD_IN_EDGE:
-    case Delta::Action::REMOVE_IN_EDGE:
-      // These deltas shouldn't appear for edges.
-      LOG(FATAL) << "Invalid database state!";
-  }
+  EncodeDelta(&wal_, name_id_mapper_, delta, edge, timestamp);
   UpdateStats(timestamp);
 }
 
 void WalFile::AppendTransactionEnd(uint64_t timestamp) {
-  wal_.WriteMarker(Marker::SECTION_DELTA);
-  wal_.WriteUint(timestamp);
-  wal_.WriteMarker(Marker::DELTA_TRANSACTION_END);
+  EncodeTransactionEnd(&wal_, timestamp);
   UpdateStats(timestamp);
 }
 
 void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label,
                               const std::set<PropertyId> &properties,
                               uint64_t timestamp) {
-  wal_.WriteMarker(Marker::SECTION_DELTA);
-  wal_.WriteUint(timestamp);
-  switch (operation) {
-    case StorageGlobalOperation::LABEL_INDEX_CREATE:
-    case StorageGlobalOperation::LABEL_INDEX_DROP: {
-      CHECK(properties.empty()) << "Invalid function call!";
-      wal_.WriteMarker(OperationToMarker(operation));
-      wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
-      break;
-    }
-    case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
-    case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
-    case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
-    case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: {
-      CHECK(properties.size() == 1) << "Invalid function call!";
-      wal_.WriteMarker(OperationToMarker(operation));
-      wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
-      wal_.WriteString(
-          name_id_mapper_->IdToName((*properties.begin()).AsUint()));
-      break;
-    }
-    case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
-    case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: {
-      CHECK(!properties.empty()) << "Invalid function call!";
-      wal_.WriteMarker(OperationToMarker(operation));
-      wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
-      wal_.WriteUint(properties.size());
-      for (const auto &property : properties) {
-        wal_.WriteString(name_id_mapper_->IdToName(property.AsUint()));
-      }
-      break;
-    }
-  }
+  EncodeOperation(&wal_, name_id_mapper_, operation, label, properties,
+                  timestamp);
   UpdateStats(timestamp);
 }
 
diff --git a/src/storage/v2/durability/wal.hpp b/src/storage/v2/durability/wal.hpp
index 12ecc0a65..ae0f7c1b3 100644
--- a/src/storage/v2/durability/wal.hpp
+++ b/src/storage/v2/durability/wal.hpp
@@ -127,6 +127,24 @@ WalDeltaData ReadWalDeltaData(Decoder *wal);
 /// @throw RecoveryFailure
 WalDeltaData::Type SkipWalDeltaData(Decoder *wal);
 
+/// Function used to encode a `Delta` that originated from a `Vertex`.
+void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
+                 Config::Items items, const Delta &delta, const Vertex &vertex,
+                 uint64_t timestamp);
+
+/// Function used to encode a `Delta` that originated from an `Edge`.
+void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
+                 const Delta &delta, const Edge &edge, uint64_t timestamp);
+
+/// Function used to encode the transaction end.
+void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp);
+
+/// Function used to encode non-transactional operation.
+void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper,
+                     StorageGlobalOperation operation, LabelId label,
+                     const std::set<PropertyId> &properties,
+                     uint64_t timestamp);
+
 /// Function used to load the WAL data into the storage.
 /// @throw RecoveryFailure
 RecoveryInfo LoadWal(const std::filesystem::path &path,