From f1bae7be4474b0165c02fb1d52886d65499187ab Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Tue, 16 Oct 2018 12:35:14 +0200
Subject: [PATCH] Reduce mg_import_csv memory usage

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1663
---
 tools/src/mg_import_csv/main.cpp | 39 ++++++++++++--------------------
 1 file changed, 14 insertions(+), 25 deletions(-)

diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp
index 113c41e83..0f99b8dea 100644
--- a/tools/src/mg_import_csv/main.cpp
+++ b/tools/src/mg_import_csv/main.cpp
@@ -260,7 +260,7 @@ std::string GetIdSpace(const std::string &type) {
 }
 
 void WriteNodeRow(
-    std::unordered_map<gid::Gid, communication::bolt::Vertex> &partial_vertices,
+    communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
     const std::vector<Field> &fields, const std::vector<std::string> &row,
     const std::vector<std::string> &additional_labels,
     MemgraphNodeIdMap &node_id_map) {
@@ -296,12 +296,12 @@ void WriteNodeRow(
   labels.insert(labels.end(), additional_labels.begin(),
                 additional_labels.end());
   CHECK(id) << "Node ID must be specified";
-  partial_vertices[*id] = {communication::bolt::Id::FromUint(*id), labels,
-                           properties};
+  encoder->WriteVertex(
+      {communication::bolt::Id::FromUint(*id), labels, properties});
 }
 
 auto PassNodes(
-    std::unordered_map<gid::Gid, communication::bolt::Vertex> &partial_vertices,
+    communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
     const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
     const std::vector<std::string> &additional_labels) {
   int64_t node_count = 0;
@@ -312,7 +312,7 @@ auto PassNodes(
   while (!row.empty()) {
     CHECK_EQ(row.size(), fields.size())
         << "Expected as many values as there are header fields";
-    WriteNodeRow(partial_vertices, fields, row, additional_labels, node_id_map);
+    WriteNodeRow(encoder, fields, row, additional_labels, node_id_map);
     // Increase count and move to next row.
     node_count += 1;
     row = ReadRow(nodes_file);
@@ -321,7 +321,7 @@ auto PassNodes(
 }
 
 void WriteRelationshipsRow(
-    std::unordered_map<gid::Gid, communication::bolt::Edge> &edges,
+    communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
     const std::vector<Field> &fields, const std::vector<std::string> &row,
     const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id) {
   std::experimental::optional<int64_t> start_id;
@@ -362,12 +362,12 @@ void WriteRelationshipsRow(
   auto bolt_id = communication::bolt::Id::FromUint(relationship_id);
   auto bolt_start_id = communication::bolt::Id::FromUint(*start_id);
   auto bolt_end_id = communication::bolt::Id::FromUint(*end_id);
-  edges[relationship_id] = {bolt_id, bolt_start_id, bolt_end_id,
-                            *relationship_type, properties};
+  encoder->WriteEdge(
+      {bolt_id, bolt_start_id, bolt_end_id, *relationship_type, properties});
 }
 
 int PassRelationships(
-    std::unordered_map<gid::Gid, communication::bolt::Edge> &edges,
+    communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
     const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
     gid::Generator &relationship_id_generator) {
   std::ifstream relationships_file(relationships_path);
@@ -379,7 +379,7 @@ int PassRelationships(
   while (!row.empty()) {
     CHECK_EQ(row.size(), fields.size())
         << "Expected as many values as there are header fields";
-    WriteRelationshipsRow(edges, fields, row, node_id_map,
+    WriteRelationshipsRow(encoder, fields, row, node_id_map,
                           relationship_id_generator.Next());
     ++relationships;
     row = ReadRow(relationships_file);
@@ -414,27 +414,16 @@ void Convert(const std::vector<std::string> &nodes,
     encoder.WriteInt(0);    // Id of transaction that is snapshooting.
     encoder.WriteList({});  // Transactional snapshot.
     encoder.WriteList({});  // Label + property indexes.
-    std::unordered_map<gid::Gid, communication::bolt::Vertex> vertices;
-    std::unordered_map<gid::Gid, communication::bolt::Edge> edges;
+    // PassNodes streams vertices to the encoder.
     for (const auto &nodes_file : nodes) {
       node_count +=
-          PassNodes(vertices, nodes_file, node_id_map, additional_labels);
+          PassNodes(&encoder, nodes_file, node_id_map, additional_labels);
     }
+    // PassEdges streams edges to the encoder.
     for (const auto &relationships_file : relationships) {
-      edge_count += PassRelationships(edges, relationships_file, node_id_map,
+      edge_count += PassRelationships(&encoder, relationships_file, node_id_map,
                                       relationship_id_generator);
     }
-
-    for (auto vertex_pair : vertices) {
-      auto &vertex = vertex_pair.second;
-      encoder.WriteVertex(vertex);
-    }
-
-    for (auto edge_pair : edges) {
-      auto &edge = edge_pair.second;
-      encoder.WriteEdge(edge);
-    }
-
     buffer.WriteValue(node_count);
     buffer.WriteValue(edge_count);
     buffer.WriteValue(buffer.hash());