From c37bb87ed855d2a8d9c8f764cd943f31f6f98a03 Mon Sep 17 00:00:00 2001
From: Dominik Gleich <dominik.gleich@memgraph.io>
Date: Mon, 29 Jan 2018 19:16:06 +0100
Subject: [PATCH] Support snapshot creation and recovery in distributed

Summary:
Add custom encoder/decoder

Update snapshot recovery

Reviewers: florijan, teon.banek, mferencevic, mculinovic

Reviewed By: florijan

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1142
---
 .../bolt/v1/decoder/decoded_value.hpp         |   6 +-
 src/database/graph_db_accessor.cpp            |  19 +++
 src/database/graph_db_accessor.hpp            |  20 ++-
 src/distributed/serialization.hpp             |   2 +-
 src/durability/recovery.cpp                   | 109 +++++++++++---
 src/durability/snapshooter.cpp                |   6 +-
 src/durability/snapshot_decoded_value.hpp     |  43 ++++++
 src/durability/snapshot_decoder.hpp           |  92 ++++++++++++
 src/durability/snapshot_encoder.hpp           |  49 ++++++
 src/storage/address.hpp                       |  10 +-
 src/storage/edges.hpp                         |   7 +-
 src/storage/record_accessor.cpp               |  18 ++-
 src/storage/record_accessor.hpp               |   3 +
 tests/manual/generate_snapshot.cpp            |   1 +
 tests/unit/durability.cpp                     |  22 ++-
 tests/unit/storage_address.cpp                |   6 +-
 tools/src/mg_import_csv/main.cpp              | 139 ++++++++++++------
 17 files changed, 456 insertions(+), 96 deletions(-)
 create mode 100644 src/durability/snapshot_decoded_value.hpp
 create mode 100644 src/durability/snapshot_decoder.hpp
 create mode 100644 src/durability/snapshot_encoder.hpp

diff --git a/src/communication/bolt/v1/decoder/decoded_value.hpp b/src/communication/bolt/v1/decoder/decoded_value.hpp
index 43239bddb..56a2f5f68 100644
--- a/src/communication/bolt/v1/decoder/decoded_value.hpp
+++ b/src/communication/bolt/v1/decoder/decoded_value.hpp
@@ -19,7 +19,7 @@ class DecodedValue;
  * The decoder writes data into this structure.
  */
 struct DecodedVertex {
-  gid::Gid id;
+  int64_t id;
   std::vector<std::string> labels;
   std::map<std::string, DecodedValue> properties;
 };
@@ -29,7 +29,7 @@ struct DecodedVertex {
  * The decoder writes data into this structure.
  */
 struct DecodedEdge {
-  gid::Gid id;
+  int64_t id;
   int64_t from;
   int64_t to;
   std::string type;
@@ -41,7 +41,7 @@ struct DecodedEdge {
  * The decoder writes data into this structure.
  */
 struct DecodedUnboundedEdge {
-  gid::Gid id;
+  int64_t id;
   std::string type;
   std::map<std::string, DecodedValue> properties;
 };
diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp
index 0a9ef0a88..d513c1971 100644
--- a/src/database/graph_db_accessor.cpp
+++ b/src/database/graph_db_accessor.cpp
@@ -425,6 +425,25 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
                       edge_type);
 }
 
+EdgeAccessor GraphDbAccessor::InsertOnlyEdge(Edges::VertexAddress &from,
+                                             Edges::VertexAddress &to,
+                                             storage::EdgeType edge_type,
+                                             gid::Gid edge_gid) {
+  auto gid = db_.storage().edge_generator_.Next(edge_gid);
+  DCHECK(gid == edge_gid) << "Gid should be equal as edge gid since "
+                             "this edges are only added after vertices "
+                             "reference them by their gid";
+  auto edge_vlist =
+      new mvcc::VersionList<Edge>(transaction_, gid, from, to, edge_type);
+  // We need to insert edge_vlist to edges_ before calling update since update
+  // can throw and edge_vlist will not be garbage collected if it is not in
+  // edges_ skiplist.
+  bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second;
+  CHECK(success) << "Attempting to insert an edge with an existing GID: "
+                 << gid;
+  return EdgeAccessor(edge_vlist, *this, from, to, edge_type);
+}
+
 int64_t GraphDbAccessor::EdgesCount() const {
   DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
   return db_.storage().edges_.access().size();
diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp
index 81d91f354..22530bd6c 100644
--- a/src/database/graph_db_accessor.hpp
+++ b/src/database/graph_db_accessor.hpp
@@ -278,6 +278,14 @@ class GraphDbAccessor {
                           std::experimental::optional<gid::Gid> requested_gid =
                               std::experimental::nullopt);
 
+  /**
+   * Insert edge into main storage, but don't insert it into from and to
+   * vertices edge lists.
+   */
+  EdgeAccessor InsertOnlyEdge(Edges::VertexAddress &from,
+                              Edges::VertexAddress &to,
+                              storage::EdgeType edge_type, gid::Gid edge_gid);
+
   /**
    * Removes an edge from the graph. Parameters can indicate if the edge should
    * be removed from data structures in vertices it connects. When removing an
@@ -551,6 +559,12 @@ class GraphDbAccessor {
   template <typename TRecord>
   distributed::RemoteCache<TRecord> &remote_elements();
 
+  /// Gets the local address for the given gid. Fails if not present.
+  mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
+
+  /// Gets the local edge address for the given gid. Fails if not present.
+  mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
+
  private:
   GraphDb &db_;
   tx::Transaction &transaction_;
@@ -607,11 +621,5 @@ class GraphDbAccessor {
   void UpdatePropertyIndex(storage::Property property,
                            const RecordAccessor<Vertex> &vertex_accessor,
                            const Vertex *const vertex);
-
-  /// Gets the local address for the given gid. Fails if not present.
-  mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
-
-  /// Gets the local edge address for the given gid. Fails if not present.
-  mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
 };
 }  // namespace database
diff --git a/src/distributed/serialization.hpp b/src/distributed/serialization.hpp
index 912357fc1..91079c253 100644
--- a/src/distributed/serialization.hpp
+++ b/src/distributed/serialization.hpp
@@ -17,7 +17,7 @@ namespace impl {
 // global one, using the given worker_id.
 template <typename TArchive, typename TAddress>
 void SaveAddress(TArchive &ar, TAddress address, int worker_id) {
-  auto gid = address.is_remote() ? address.global_id() : address.local()->gid_;
+  auto gid = address.is_remote() ? address.gid() : address.local()->gid_;
   ar << gid;
   ar << worker_id;
 };
diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp
index 6951d08a1..1a713ddfa 100644
--- a/src/durability/recovery.cpp
+++ b/src/durability/recovery.cpp
@@ -3,10 +3,11 @@
 #include <limits>
 #include <unordered_map>
 
-#include "communication/bolt/v1/decoder/decoder.hpp"
 #include "database/graph_db_accessor.hpp"
 #include "durability/hashed_file_reader.hpp"
 #include "durability/paths.hpp"
+#include "durability/snapshot_decoded_value.hpp"
+#include "durability/snapshot_decoder.hpp"
 #include "durability/version.hpp"
 #include "durability/wal.hpp"
 #include "query/typed_value.hpp"
@@ -55,10 +56,9 @@ struct RecoveryData {
 bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
                      RecoveryData &recovery_data) {
   HashedFileReader reader;
-  communication::bolt::Decoder<HashedFileReader> decoder(reader);
+  SnapshotDecoder<HashedFileReader> decoder(reader);
 
   RETURN_IF_NOT(reader.Open(snapshot_file));
-  std::unordered_map<uint64_t, VertexAccessor> vertices;
 
   auto magic_number = durability::kMagicNumber;
   reader.Read(magic_number.data(), magic_number.size());
@@ -112,29 +112,75 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
   }
 
   database::GraphDbAccessor dba(db);
+  std::unordered_map<gid::Gid,
+                     std::pair<Edges::VertexAddress, Edges::VertexAddress>>
+      edge_gid_endpoints_mapping;
   for (int64_t i = 0; i < vertex_count; ++i) {
-    DecodedValue vertex_dv;
-    RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex));
-    auto &vertex = vertex_dv.ValueVertex();
-    auto vertex_accessor = dba.InsertVertex(vertex.id);
-    for (const auto &label : vertex.labels) {
+    auto vertex = decoder.ReadSnapshotVertex();
+    RETURN_IF_NOT(vertex);
+    auto vertex_accessor = dba.InsertVertex(vertex->gid);
+    for (const auto &label : vertex->labels) {
       vertex_accessor.add_label(dba.Label(label));
     }
-    for (const auto &property_pair : vertex.properties) {
+    for (const auto &property_pair : vertex->properties) {
       vertex_accessor.PropsSet(dba.Property(property_pair.first),
                                query::TypedValue(property_pair.second));
     }
-    vertices.insert({vertex.id, vertex_accessor});
+    auto vertex_record = vertex_accessor.GetNew();
+    for (const auto &edge : vertex->in) {
+      vertex_record->in_.emplace(edge.vertex, edge.address,
+                                 dba.EdgeType(edge.type));
+      edge_gid_endpoints_mapping[edge.address.gid()] = {
+          edge.vertex, vertex_accessor.GlobalAddress()};
+    }
+    for (const auto &edge : vertex->out) {
+      vertex_record->out_.emplace(edge.vertex, edge.address,
+                                  dba.EdgeType(edge.type));
+      edge_gid_endpoints_mapping[edge.address.gid()] = {
+          vertex_accessor.GlobalAddress(), edge.vertex};
+    }
   }
+
+  auto vertex_transform_to_local_if_possible =
+      [&db, &dba](Edges::VertexAddress &address) {
+        if (address.is_local()) return;
+        // If the worker id matches it should be a local apperance
+        if (address.worker_id() == db.WorkerId()) {
+          address = Edges::VertexAddress(dba.LocalVertexAddress(address.gid()));
+          CHECK(address.is_local()) << "Address should be local but isn't";
+        }
+      };
+
+  auto edge_transform_to_local_if_possible =
+      [&db, &dba](Edges::EdgeAddress &address) {
+        if (address.is_local()) return;
+        // If the worker id matches it should be a local apperance
+        if (address.worker_id() == db.WorkerId()) {
+          address = Edges::EdgeAddress(dba.LocalEdgeAddress(address.gid()));
+          CHECK(address.is_local()) << "Address should be local but isn't";
+        }
+      };
+
   for (int64_t i = 0; i < edge_count; ++i) {
-    DecodedValue edge_dv;
-    RETURN_IF_NOT(decoder.ReadValue(&edge_dv, DecodedValue::Type::Edge));
-    auto &edge = edge_dv.ValueEdge();
-    auto it_from = vertices.find(edge.from);
-    auto it_to = vertices.find(edge.to);
-    RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
-    auto edge_accessor = dba.InsertEdge(it_from->second, it_to->second,
-                                        dba.EdgeType(edge.type), edge.id);
+    RETURN_IF_NOT(
+        decoder.ReadValue(&dv, communication::bolt::DecodedValue::Type::Edge));
+    auto &edge = dv.ValueEdge();
+    // We have to take full edge endpoints from vertices since the endpoints
+    // found here don't containt worker_id, and this can't be changed since this
+    // edges must be bolt-compliant
+    auto &edge_endpoints = edge_gid_endpoints_mapping[edge.id];
+
+    Edges::VertexAddress from;
+    Edges::VertexAddress to;
+    std::tie(from, to) = edge_endpoints;
+
+    // From and to are written in the global_address format and we should
+    // convert them back to local format for speedup - if possible
+    vertex_transform_to_local_if_possible(from);
+    vertex_transform_to_local_if_possible(to);
+
+    auto edge_accessor =
+        dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type), edge.id);
 
     for (const auto &property_pair : edge.properties)
       edge_accessor.PropsSet(dba.Property(property_pair.first),
@@ -149,6 +195,33 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
     dba.Abort();
     return false;
   }
+
+  // We have to replace global_ids with local ids where possible for all edges
+  // in every vertex and this can only be done after we inserted the edges; this
+  // is to speedup execution
+  for (auto &vertex_accessor : dba.Vertices(true)) {
+    auto vertex = vertex_accessor.GetNew();
+    auto iterate_and_transform =
+        [vertex_transform_to_local_if_possible,
+         edge_transform_to_local_if_possible](Edges &edges) {
+          Edges transformed;
+          for (auto &element : edges) {
+            auto vertex = element.vertex;
+            vertex_transform_to_local_if_possible(vertex);
+
+            auto edge = element.edge;
+            edge_transform_to_local_if_possible(edge);
+
+            transformed.emplace(vertex, edge, element.edge_type);
+          }
+
+          return transformed;
+        };
+
+    vertex->in_ = iterate_and_transform(vertex->in_);
+    vertex->out_ = iterate_and_transform(vertex->out_);
+  }
+
   dba.Commit();
   return true;
 }
diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp
index 8940cff64..f57493f87 100644
--- a/src/durability/snapshooter.cpp
+++ b/src/durability/snapshooter.cpp
@@ -4,10 +4,10 @@
 
 #include "durability/snapshooter.hpp"
 
-#include "communication/bolt/v1/encoder/base_encoder.hpp"
 #include "database/graph_db_accessor.hpp"
 #include "durability/hashed_file_writer.hpp"
 #include "durability/paths.hpp"
+#include "durability/snapshot_encoder.hpp"
 #include "durability/version.hpp"
 #include "utils/datetime/timestamp.hpp"
 
@@ -20,7 +20,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
             database::GraphDbAccessor &dba) {
   try {
     HashedFileWriter buffer(snapshot_file);
-    communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
+    SnapshotEncoder<HashedFileWriter> encoder(buffer);
     int64_t vertex_num = 0, edge_num = 0;
 
     encoder.WriteRAW(durability::kMagicNumber.data(),
@@ -59,7 +59,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
     }
 
     for (const auto &vertex : dba.Vertices(false)) {
-      encoder.WriteVertex(vertex);
+      encoder.WriteSnapshotVertex(vertex);
       vertex_num++;
     }
     for (const auto &edge : dba.Edges(false)) {
diff --git a/src/durability/snapshot_decoded_value.hpp b/src/durability/snapshot_decoded_value.hpp
new file mode 100644
index 000000000..d081e4682
--- /dev/null
+++ b/src/durability/snapshot_decoded_value.hpp
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "communication/bolt/v1/decoder/decoded_value.hpp"
+#include "query/typed_value.hpp"
+#include "storage/property_value.hpp"
+#include "utils/algorithm.hpp"
+#include "utils/exceptions.hpp"
+
+namespace durability {
+
+/** Forward declartion of DecodedSnapshotEdge. */
+struct DecodedInlinedVertexEdge;
+
+/**
+ * Structure used when reading a Vertex with the decoder.
+ * The decoder writes data into this structure.
+ */
+struct DecodedSnapshotVertex {
+  gid::Gid gid;
+  std::vector<std::string> labels;
+  std::map<std::string, communication::bolt::DecodedValue> properties;
+  // Vector of edges without properties
+  std::vector<DecodedInlinedVertexEdge> in;
+  std::vector<DecodedInlinedVertexEdge> out;
+};
+
+/**
+ * Structure used when reading an Edge with the snapshot decoder.
+ * The decoder writes data into this structure.
+ */
+struct DecodedInlinedVertexEdge {
+  // Addresses down below must always be global_address and never direct
+  // pointers to a record.
+  Edges::EdgeAddress address;
+  Edges::VertexAddress vertex;
+  std::string type;
+};
+
+}  // namespace durability
diff --git a/src/durability/snapshot_decoder.hpp b/src/durability/snapshot_decoder.hpp
new file mode 100644
index 000000000..a6f8d530d
--- /dev/null
+++ b/src/durability/snapshot_decoder.hpp
@@ -0,0 +1,92 @@
+#pragma once
+
+#include <experimental/optional>
+
+#include "communication/bolt/v1/decoder/decoder.hpp"
+#include "durability/snapshot_decoded_value.hpp"
+
+namespace durability {
+
+using namespace communication::bolt;
+
+template <typename Buffer>
+class SnapshotDecoder : public Decoder<Buffer> {
+ public:
+  explicit SnapshotDecoder(Buffer &buffer) : Decoder<Buffer>(buffer) {}
+
+  std::experimental::optional<DecodedSnapshotVertex> ReadSnapshotVertex() {
+    DecodedValue dv;
+    DecodedSnapshotVertex vertex;
+
+    if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Vertex)) {
+      DLOG(WARNING) << "Unable to read snapshot vertex";
+      return std::experimental::nullopt;
+    }
+
+    auto &read_vertex = dv.ValueVertex();
+    vertex.gid = read_vertex.id;
+    vertex.labels = read_vertex.labels;
+    vertex.properties = read_vertex.properties;
+
+    if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
+      DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of in "
+                       "edges in vertex!";
+      return std::experimental::nullopt;
+    }
+
+    for (int i = 0; i < dv.ValueInt(); ++i) {
+      auto edge = ReadSnapshotEdge();
+      if (!edge) return std::experimental::nullopt;
+      vertex.in.emplace_back(*edge);
+    }
+
+    if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
+      DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of out "
+                       "edges in vertex!";
+      return std::experimental::nullopt;
+    }
+
+    for (int i = 0; i < dv.ValueInt(); ++i) {
+      auto edge = ReadSnapshotEdge();
+      if (!edge) return std::experimental::nullopt;
+      vertex.out.emplace_back(*edge);
+    }
+
+    VLOG(1) << "[ReadSnapshotVertex] Success";
+    return vertex;
+  }
+
+ private:
+  std::experimental::optional<DecodedInlinedVertexEdge> ReadSnapshotEdge() {
+    DecodedValue dv;
+    DecodedInlinedVertexEdge edge;
+
+    VLOG(1) << "[ReadSnapshotEdge] Start";
+
+    // read ID
+    if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
+      DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read ID!";
+      return std::experimental::nullopt;
+    }
+
+    edge.address = dv.ValueInt();
+    // read other side
+    if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::Int)) {
+      DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from address!";
+      return std::experimental::nullopt;
+    }
+    edge.vertex = dv.ValueInt();
+
+    // read type
+    if (!Decoder<Buffer>::ReadValue(&dv, DecodedValue::Type::String)) {
+      DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read type!";
+      return std::experimental::nullopt;
+    }
+    edge.type = dv.ValueString();
+
+    VLOG(1) << "[ReadSnapshotEdge] Success";
+
+    return edge;
+  }
+};
+};  // namespace durability
diff --git a/src/durability/snapshot_encoder.hpp b/src/durability/snapshot_encoder.hpp
new file mode 100644
index 000000000..33bc0e1f3
--- /dev/null
+++ b/src/durability/snapshot_encoder.hpp
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "communication/bolt/v1/encoder/base_encoder.hpp"
+
+namespace durability {
+
+using namespace communication::bolt;
+
+template <typename Buffer>
+class SnapshotEncoder : public BaseEncoder<Buffer> {
+ public:
+  explicit SnapshotEncoder(Buffer &buffer) : BaseEncoder<Buffer>(buffer) {}
+  void WriteSnapshotVertex(const VertexAccessor &vertex) {
+    BaseEncoder<Buffer>::WriteVertex(vertex);
+
+    // write in edges without properties
+    this->WriteUInt(vertex.in_degree());
+    auto edges_in = vertex.in();
+    for (const auto &edge : edges_in) {
+      this->WriteSnapshotEdge(edge, true);
+    }
+
+    // write out edges without properties
+    this->WriteUInt(vertex.out_degree());
+    auto edges_out = vertex.out();
+    for (const auto &edge : edges_out) {
+      this->WriteSnapshotEdge(edge, false);
+    }
+  }
+
+ private:
+  void WriteUInt(const uint64_t &value) {
+    this->WriteInt(*reinterpret_cast<const int64_t *>(&value));
+  }
+
+  // Writes edge without properties
+  void WriteSnapshotEdge(const EdgeAccessor &edge, bool write_from) {
+    WriteUInt(edge.GlobalAddress().raw());
+    if (write_from)
+      WriteUInt(edge.from().GlobalAddress().raw());
+    else
+      WriteUInt(edge.to().GlobalAddress().raw());
+
+    // write type
+    this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
+  }
+};
+
+}  // namespace durability
diff --git a/src/storage/address.hpp b/src/storage/address.hpp
index 228735644..f91bab9c2 100644
--- a/src/storage/address.hpp
+++ b/src/storage/address.hpp
@@ -35,6 +35,11 @@ class Address {
   static constexpr uint64_t kRemote{1};
 
  public:
+  Address() {}
+
+  // Constructor for raw address value
+  Address(Storage storage) : storage_(storage) {}
+
   // Constructor for local Address.
   Address(TLocalObj *ptr) {
     uintptr_t ptr_no_type = reinterpret_cast<uintptr_t>(ptr);
@@ -63,7 +68,7 @@ class Address {
     return reinterpret_cast<TLocalObj *>(storage_);
   }
 
-  gid::Gid global_id() const {
+  gid::Gid gid() const {
     DCHECK(is_remote()) << "Attempting to get global ID from local address";
     return storage_ >> (kTypeMaskSize + kWorkerIdSize);
   }
@@ -74,6 +79,9 @@ class Address {
     return (storage_ >> 1) & ((1ULL << kWorkerIdSize) - 1);
   }
 
+  /// Returns raw address value
+  Storage raw() const { return storage_; }
+
   bool operator==(const Address<TLocalObj> &other) const {
     return storage_ == other.storage_;
   }
diff --git a/src/storage/edges.hpp b/src/storage/edges.hpp
index 553693758..5f0d75b71 100644
--- a/src/storage/edges.hpp
+++ b/src/storage/edges.hpp
@@ -94,9 +94,10 @@ class Edges {
      * present in this iterator. */
     void update_position() {
       if (vertex_.local()) {
-        position_ = std::find_if(
-            position_, end_,
-            [v = this->vertex_](const Element &e) { return e.vertex == v; });
+        position_ = std::find_if(position_,
+                                 end_, [v = this->vertex_](const Element &e) {
+                                   return e.vertex == v;
+                                 });
       }
       if (edge_types_) {
         position_ = std::find_if(position_, end_, [this](const Element &e) {
diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp
index 2af5b69ab..4cc38bb9f 100644
--- a/src/storage/record_accessor.cpp
+++ b/src/storage/record_accessor.cpp
@@ -102,7 +102,7 @@ database::GraphDbAccessor &RecordAccessor<TRecord>::db_accessor() const {
 
 template <typename TRecord>
 gid::Gid RecordAccessor<TRecord>::gid() const {
-  return is_local() ? address_.local()->gid_ : address_.global_id();
+  return is_local() ? address_.local()->gid_ : address_.gid();
 }
 
 template <typename TRecord>
@@ -111,6 +111,14 @@ storage::Address<mvcc::VersionList<TRecord>> RecordAccessor<TRecord>::address()
   return address_;
 }
 
+template <typename TRecord>
+storage::Address<mvcc::VersionList<TRecord>>
+RecordAccessor<TRecord>::GlobalAddress() const {
+  return is_local() ? storage::Address<mvcc::VersionList<TRecord>>(
+                          gid(), db_accessor_->db_.WorkerId())
+                    : address_;
+}
+
 template <typename TRecord>
 RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
   if (is_local()) {
@@ -148,8 +156,8 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
     // we need to invalidate the RemoteCache and really get the latest stuff.
     // But only do that after the command has been advanced.
     db_accessor().template remote_elements<TRecord>().FindSetOldNew(
-        db_accessor().transaction().id_, address_.worker_id(),
-        address_.global_id(), old_, new_);
+        db_accessor().transaction().id_, address_.worker_id(), address_.gid(),
+        old_, new_);
   }
   current_ = old_ ? old_ : new_;
   return old_ != nullptr || new_ != nullptr;
@@ -210,7 +218,7 @@ RecordAccessor<Vertex>::AddressT RecordAccessor<Vertex>::NormalizedAddress(
     AddressT address) const {
   if (address.is_local()) return address;
   if (address.worker_id() == db_accessor().db_.WorkerId()) {
-    return AddressT(db_accessor().LocalVertexAddress(address.global_id()));
+    return AddressT(db_accessor().LocalVertexAddress(address.gid()));
   }
 
   return address;
@@ -221,7 +229,7 @@ RecordAccessor<Edge>::AddressT RecordAccessor<Edge>::NormalizedAddress(
     AddressT address) const {
   if (address.is_local()) return address;
   if (address.worker_id() == db_accessor().db_.WorkerId()) {
-    return AddressT(db_accessor().LocalEdgeAddress(address.global_id()));
+    return AddressT(db_accessor().LocalEdgeAddress(address.gid()));
   }
 
   return address;
diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp
index e9cf01ed1..df88ff5cc 100644
--- a/src/storage/record_accessor.hpp
+++ b/src/storage/record_accessor.hpp
@@ -96,6 +96,9 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
 
   AddressT address() const;
 
+  // Returns an address which is global - composed of gid and worker_id
+  AddressT GlobalAddress() const;
+
   /*
    * Switches this record accessor to use the latest version visible to the
    * current transaction+command.  Possibly the one that was created by this
diff --git a/tests/manual/generate_snapshot.cpp b/tests/manual/generate_snapshot.cpp
index bd54be486..c293eb7f3 100644
--- a/tests/manual/generate_snapshot.cpp
+++ b/tests/manual/generate_snapshot.cpp
@@ -273,6 +273,7 @@ nlohmann::json GetWithDefault(const nlohmann::json &object,
 }
 
 int main(int argc, char **argv) {
+  LOG(FATAL) << "Doesn't work with the newest format - waiting for refactor";
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp
index 940d6eb5e..055787122 100644
--- a/tests/unit/durability.cpp
+++ b/tests/unit/durability.cpp
@@ -9,7 +9,6 @@
 #include "glog/logging.h"
 #include "gtest/gtest.h"
 
-#include "communication/bolt/v1/decoder/decoder.hpp"
 #include "database/graph_db.hpp"
 #include "database/graph_db_accessor.hpp"
 #include "database/state_delta.hpp"
@@ -17,6 +16,7 @@
 #include "durability/paths.hpp"
 #include "durability/recovery.hpp"
 #include "durability/snapshooter.hpp"
+#include "durability/snapshot_decoder.hpp"
 #include "durability/version.hpp"
 #include "utils/string.hpp"
 
@@ -72,6 +72,14 @@ class DbGenerator {
     return edge;
   }
 
+  EdgeAccessor InsertCycleEdge() {
+    auto vertex = RandomVertex();
+    auto edge =
+        dba_.InsertEdge(vertex, vertex, EdgeType(RandomInt(kEdgeTypeCount)));
+    edge_ids_.emplace_back(edge.gid());
+    return edge;
+  }
+
   void RemoveEdge() {
     auto edge = RandomEdge(true);
     dba_.RemoveEdge(edge);
@@ -240,6 +248,7 @@ void MakeDb(database::GraphDbAccessor &dba, int scale,
   DbGenerator generator{dba};
   for (int i = 0; i < scale; i++) generator.InsertVertex();
   for (int i = 0; i < scale * 2; i++) generator.InsertEdge();
+  for (int i = 0; i < scale / 2; i++) generator.InsertCycleEdge();
   // Give the WAL some time to flush, we're pumping ops fast here.
   std::this_thread::sleep_for(std::chrono::milliseconds(30));
   for (int i = 0; i < scale * 3; i++) {
@@ -405,7 +414,7 @@ TEST_F(Durability, SnapshotEncoding) {
 
   auto snapshot = GetLastFile(snapshot_dir_);
   HashedFileReader buffer;
-  communication::bolt::Decoder<HashedFileReader> decoder(buffer);
+  durability::SnapshotDecoder<HashedFileReader> decoder(buffer);
 
   int64_t vertex_count, edge_count;
   uint64_t hash;
@@ -443,14 +452,13 @@ TEST_F(Durability, SnapshotEncoding) {
   EXPECT_EQ(dv.ValueList()[0].ValueString(), "l1");
   EXPECT_EQ(dv.ValueList()[1].ValueString(), "p1");
 
-  std::map<gid::Gid, communication::bolt::DecodedVertex> decoded_vertices;
+  std::map<gid::Gid, durability::DecodedSnapshotVertex> decoded_vertices;
 
   // Decode vertices.
   for (int i = 0; i < vertex_count; ++i) {
-    decoder.ReadValue(&dv);
-    ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Vertex);
-    auto &vertex = dv.ValueVertex();
-    decoded_vertices.emplace(vertex.id, vertex);
+    auto vertex = decoder.ReadSnapshotVertex();
+    ASSERT_NE(vertex, std::experimental::nullopt);
+    decoded_vertices.emplace(vertex->gid, *vertex);
   }
   ASSERT_EQ(decoded_vertices.size(), 3);
   ASSERT_EQ(decoded_vertices[gid0].labels.size(), 1);
diff --git a/tests/unit/storage_address.cpp b/tests/unit/storage_address.cpp
index 4a4bd768d..3747236bf 100644
--- a/tests/unit/storage_address.cpp
+++ b/tests/unit/storage_address.cpp
@@ -26,11 +26,11 @@ TEST(Address, Global) {
   int worker_id{17};
   uint64_t local_id{31};
   gid::Generator generator(13);
-  auto global_id = generator.Next(local_id);
-  Address<int> address{global_id, worker_id};
+  auto gid = generator.Next(local_id);
+  Address<int> address{gid, worker_id};
 
   EXPECT_TRUE(address.is_remote());
   EXPECT_FALSE(address.is_local());
-  EXPECT_EQ(address.global_id(), global_id);
+  EXPECT_EQ(address.gid(), gid);
   EXPECT_EQ(address.worker_id(), worker_id);
 }
diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp
index e703297e7..bb52cfb50 100644
--- a/tools/src/mg_import_csv/main.cpp
+++ b/tools/src/mg_import_csv/main.cpp
@@ -1,3 +1,4 @@
+#include <algorithm>
 #include <cstdio>
 #include <experimental/filesystem>
 #include <experimental/optional>
@@ -8,11 +9,12 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
-#include "communication/bolt/v1/encoder/base_encoder.hpp"
 #include "config.hpp"
 #include "durability/hashed_file_writer.hpp"
 #include "durability/paths.hpp"
 #include "durability/snapshooter.hpp"
+#include "durability/snapshot_decoded_value.hpp"
+#include "durability/snapshot_encoder.hpp"
 #include "durability/version.hpp"
 #include "utils/string.hpp"
 #include "utils/timer.hpp"
@@ -172,11 +174,12 @@ std::vector<Field> ReadHeader(std::istream &stream) {
   return fields;
 }
 
-query::TypedValue StringToTypedValue(const std::string &str,
-                                     const std::string &type) {
+communication::bolt::DecodedValue StringToDecodedValue(
+    const std::string &str, const std::string &type) {
   // Empty string signifies Null.
-  if (str.empty()) return query::TypedValue::Null;
-  auto convert = [](const auto &str, const auto &type) -> query::TypedValue {
+  if (str.empty()) return communication::bolt::DecodedValue();
+  auto convert = [](const auto &str,
+                    const auto &type) -> communication::bolt::DecodedValue {
     if (type == "int" || type == "long" || type == "byte" || type == "short") {
       std::istringstream ss(str);
       int64_t val;
@@ -190,14 +193,14 @@ query::TypedValue StringToTypedValue(const std::string &str,
       return str;
     }
     LOG(FATAL) << "Unexpected type: " << type;
-    return query::TypedValue::Null;
+    return communication::bolt::DecodedValue();
   };
   // Type *not* ending with '[]', signifies regular value.
   if (!utils::EndsWith(type, "[]")) return convert(str, type);
   // Otherwise, we have an array type.
   auto elem_type = type.substr(0, type.size() - 2);
   auto elems = utils::Split(str, FLAGS_array_delimiter);
-  std::vector<query::TypedValue> array;
+  std::vector<communication::bolt::DecodedValue> array;
   array.reserve(elems.size());
   for (const auto &elem : elems) {
     array.push_back(convert(utils::Trim(elem), elem_type));
@@ -211,13 +214,14 @@ std::string GetIdSpace(const std::string &type) {
   return type.substr(start + 1, type.size() - 1);
 }
 
-void WriteNodeRow(const std::vector<Field> &fields,
-                  const std::vector<std::string> &row,
-                  MemgraphNodeIdMap &node_id_map,
-                  communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
+void WriteNodeRow(
+    std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex>
+        &partial_vertices,
+    const std::vector<Field> &fields, const std::vector<std::string> &row,
+    MemgraphNodeIdMap &node_id_map) {
   std::experimental::optional<gid::Gid> id;
-  std::vector<query::TypedValue> labels;
-  std::map<std::string, query::TypedValue> properties;
+  std::vector<std::string> labels;
+  std::map<std::string, communication::bolt::DecodedValue> properties;
   for (int i = 0; i < row.size(); ++i) {
     const auto &field = fields[i];
     auto value = utils::Trim(row[i]);
@@ -241,21 +245,16 @@ void WriteNodeRow(const std::vector<Field> &fields,
         labels.emplace_back(utils::Trim(label));
       }
     } else if (field.type != "ignore") {
-      properties[field.name] = StringToTypedValue(value, field.type);
+      properties[field.name] = StringToDecodedValue(value, field.type);
     }
   }
   CHECK(id) << "Node ID must be specified";
-  // write node
-  encoder.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
-                   3);
-  encoder.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
-  encoder.WriteInt(*id);
-  encoder.WriteList(labels);
-  encoder.WriteMap(properties);
+  partial_vertices[*id] = {*id, labels, properties, {}};
 }
 
-auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
-                  communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
+auto PassNodes(std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex>
+                   &partial_vertices,
+               const std::string &nodes_path, MemgraphNodeIdMap &node_id_map) {
   int64_t node_count = 0;
   std::ifstream nodes_file(nodes_path);
   CHECK(nodes_file) << fmt::format("Unable to open '{}'", nodes_path);
@@ -264,7 +263,7 @@ auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
   while (!row.empty()) {
     CHECK_EQ(row.size(), fields.size())
         << "Expected as many values as there are header fields";
-    WriteNodeRow(fields, row, node_id_map, encoder);
+    WriteNodeRow(partial_vertices, fields, row, node_id_map);
     // Increase count and move to next row.
     node_count += 1;
     row = ReadRow(nodes_file);
@@ -273,13 +272,13 @@ auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
 }
 
 void WriteRelationshipsRow(
+    std::unordered_map<gid::Gid, communication::bolt::DecodedEdge> &edges,
     const std::vector<Field> &fields, const std::vector<std::string> &row,
-    const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id,
-    communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
+    const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id) {
   std::experimental::optional<int64_t> start_id;
   std::experimental::optional<int64_t> end_id;
   std::experimental::optional<std::string> relationship_type;
-  std::map<std::string, query::TypedValue> properties;
+  std::map<std::string, communication::bolt::DecodedValue> properties;
   for (int i = 0; i < row.size(); ++i) {
     const auto &field = fields[i];
     auto value = utils::Trim(row[i]);
@@ -300,27 +299,20 @@ void WriteRelationshipsRow(
           << "Only one relationship TYPE must be specified";
       relationship_type = value;
     } else if (field.type != "ignore") {
-      properties[field.name] = StringToTypedValue(value, field.type);
+      properties[field.name] = StringToDecodedValue(value, field.type);
     }
   }
   CHECK(start_id) << "START_ID must be set";
   CHECK(end_id) << "END_ID must be set";
   CHECK(relationship_type) << "Relationship TYPE must be set";
-  // write relationship
-  encoder.WriteRAW(underlying_cast(communication::bolt::Marker::TinyStruct) +
-                   5);
-  encoder.WriteRAW(
-      underlying_cast(communication::bolt::Signature::Relationship));
-  encoder.WriteInt(relationship_id);
-  encoder.WriteInt(*start_id);
-  encoder.WriteInt(*end_id);
-  encoder.WriteString(*relationship_type);
-  encoder.WriteMap(properties);
+
+  edges[relationship_id] = {(int64_t)relationship_id, *start_id, *end_id,
+                            *relationship_type, properties};
 }
 
-int ConvertRelationships(
+int PassRelationships(
+    std::unordered_map<gid::Gid, communication::bolt::DecodedEdge> &edges,
     const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
-    communication::bolt::BaseEncoder<HashedFileWriter> &encoder,
     gid::Generator &relationship_id_generator) {
   std::ifstream relationships_file(relationships_path);
   CHECK(relationships_file)
@@ -331,8 +323,8 @@ int ConvertRelationships(
   while (!row.empty()) {
     CHECK_EQ(row.size(), fields.size())
         << "Expected as many values as there are header fields";
-    WriteRelationshipsRow(fields, row, node_id_map,
-                          relationship_id_generator.Next(), encoder);
+    WriteRelationshipsRow(edges, fields, row, node_id_map,
+                          relationship_id_generator.Next());
     ++relationships;
     row = ReadRow(relationships_file);
   }
@@ -344,7 +336,7 @@ void Convert(const std::vector<std::string> &nodes,
              const std::string &output_path) {
   try {
     HashedFileWriter buffer(output_path);
-    communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
+    durability::SnapshotEncoder<HashedFileWriter> encoder(buffer);
     int64_t node_count = 0;
     int64_t edge_count = 0;
     gid::Generator relationship_id_generator(0);
@@ -375,13 +367,68 @@ void Convert(const std::vector<std::string> &nodes,
     encoder.WriteInt(0);    // Id of transaction that is snapshooting.
     encoder.WriteList({});  // Transactional snapshot.
     encoder.WriteList({});  // Label + property indexes.
+    std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex> vertices;
+    std::unordered_map<gid::Gid, communication::bolt::DecodedEdge> edges;
     for (const auto &nodes_file : nodes) {
-      node_count += ConvertNodes(nodes_file, node_id_map, encoder);
+      node_count += PassNodes(vertices, nodes_file, node_id_map);
     }
     for (const auto &relationships_file : relationships) {
-      edge_count += ConvertRelationships(relationships_file, node_id_map,
-                                         encoder, relationship_id_generator);
+      edge_count += PassRelationships(edges, relationships_file, node_id_map,
+                                      relationship_id_generator);
     }
+    for (auto edge : edges) {
+      auto encoded = edge.second;
+      vertices[encoded.from].out.push_back({Edges::EdgeAddress(encoded.id, 0),
+                                            Edges::VertexAddress(encoded.to, 0),
+                                            encoded.type});
+      vertices[encoded.to].in.push_back({Edges::EdgeAddress(encoded.id, 0),
+                                         Edges::VertexAddress(encoded.from, 0),
+                                         encoded.type});
+    }
+    for (auto vertex_pair : vertices) {
+      auto &vertex = vertex_pair.second;
+      // write node
+      encoder.WriteRAW(
+          underlying_cast(communication::bolt::Marker::TinyStruct) + 3);
+      encoder.WriteRAW(underlying_cast(communication::bolt::Signature::Node));
+
+      encoder.WriteInt(vertex.gid);
+      auto &labels = vertex.labels;
+      std::vector<query::TypedValue> transformed;
+      std::transform(
+          labels.begin(), labels.end(), std::back_inserter(transformed),
+          [](const std::string &str) -> query::TypedValue { return str; });
+      encoder.WriteList(transformed);
+      encoder.WriteMap(vertex.properties);
+
+      encoder.WriteInt(vertex.in.size());
+      for (auto edge : vertex.in) {
+        encoder.WriteInt(edge.address.raw());
+        encoder.WriteInt(edge.vertex.raw());
+        encoder.WriteString(edge.type);
+      }
+      encoder.WriteInt(vertex.out.size());
+      for (auto edge : vertex.out) {
+        encoder.WriteInt(edge.address.raw());
+        encoder.WriteInt(edge.vertex.raw());
+        encoder.WriteString(edge.type);
+      }
+    }
+
+    for (auto edge_pair : edges) {
+      auto &edge = edge_pair.second;
+      // write relationship
+      encoder.WriteRAW(
+          underlying_cast(communication::bolt::Marker::TinyStruct) + 5);
+      encoder.WriteRAW(
+          underlying_cast(communication::bolt::Signature::Relationship));
+      encoder.WriteInt(edge.id);
+      encoder.WriteInt(edge.from);
+      encoder.WriteInt(edge.to);
+      encoder.WriteString(edge.type);
+      encoder.WriteMap(edge.properties);
+    }
+
     buffer.WriteValue(node_count);
     buffer.WriteValue(edge_count);
     buffer.WriteValue(buffer.hash());