Reduce mg_import_csv memory usage

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1663
This commit is contained in:
Matej Ferencevic 2018-10-16 12:35:14 +02:00
parent fb6284cb99
commit f1bae7be44

View File

@ -260,7 +260,7 @@ std::string GetIdSpace(const std::string &type) {
}
void WriteNodeRow(
std::unordered_map<gid::Gid, communication::bolt::Vertex> &partial_vertices,
communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
const std::vector<Field> &fields, const std::vector<std::string> &row,
const std::vector<std::string> &additional_labels,
MemgraphNodeIdMap &node_id_map) {
@ -296,12 +296,12 @@ void WriteNodeRow(
labels.insert(labels.end(), additional_labels.begin(),
additional_labels.end());
CHECK(id) << "Node ID must be specified";
partial_vertices[*id] = {communication::bolt::Id::FromUint(*id), labels,
properties};
encoder->WriteVertex(
{communication::bolt::Id::FromUint(*id), labels, properties});
}
auto PassNodes(
std::unordered_map<gid::Gid, communication::bolt::Vertex> &partial_vertices,
communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
const std::vector<std::string> &additional_labels) {
int64_t node_count = 0;
@ -312,7 +312,7 @@ auto PassNodes(
while (!row.empty()) {
CHECK_EQ(row.size(), fields.size())
<< "Expected as many values as there are header fields";
WriteNodeRow(partial_vertices, fields, row, additional_labels, node_id_map);
WriteNodeRow(encoder, fields, row, additional_labels, node_id_map);
// Increase count and move to next row.
node_count += 1;
row = ReadRow(nodes_file);
@ -321,7 +321,7 @@ auto PassNodes(
}
void WriteRelationshipsRow(
std::unordered_map<gid::Gid, communication::bolt::Edge> &edges,
communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
const std::vector<Field> &fields, const std::vector<std::string> &row,
const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id) {
std::experimental::optional<int64_t> start_id;
@ -362,12 +362,12 @@ void WriteRelationshipsRow(
auto bolt_id = communication::bolt::Id::FromUint(relationship_id);
auto bolt_start_id = communication::bolt::Id::FromUint(*start_id);
auto bolt_end_id = communication::bolt::Id::FromUint(*end_id);
edges[relationship_id] = {bolt_id, bolt_start_id, bolt_end_id,
*relationship_type, properties};
encoder->WriteEdge(
{bolt_id, bolt_start_id, bolt_end_id, *relationship_type, properties});
}
int PassRelationships(
std::unordered_map<gid::Gid, communication::bolt::Edge> &edges,
communication::bolt::BaseEncoder<HashedFileWriter> *encoder,
const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
gid::Generator &relationship_id_generator) {
std::ifstream relationships_file(relationships_path);
@ -379,7 +379,7 @@ int PassRelationships(
while (!row.empty()) {
CHECK_EQ(row.size(), fields.size())
<< "Expected as many values as there are header fields";
WriteRelationshipsRow(edges, fields, row, node_id_map,
WriteRelationshipsRow(encoder, fields, row, node_id_map,
relationship_id_generator.Next());
++relationships;
row = ReadRow(relationships_file);
@ -414,27 +414,16 @@ void Convert(const std::vector<std::string> &nodes,
encoder.WriteInt(0); // Id of transaction that is snapshooting.
encoder.WriteList({}); // Transactional snapshot.
encoder.WriteList({}); // Label + property indexes.
std::unordered_map<gid::Gid, communication::bolt::Vertex> vertices;
std::unordered_map<gid::Gid, communication::bolt::Edge> edges;
// PassNodes streams vertices to the encoder.
for (const auto &nodes_file : nodes) {
node_count +=
PassNodes(vertices, nodes_file, node_id_map, additional_labels);
PassNodes(&encoder, nodes_file, node_id_map, additional_labels);
}
// PassRelationships streams edges to the encoder.
for (const auto &relationships_file : relationships) {
edge_count += PassRelationships(edges, relationships_file, node_id_map,
edge_count += PassRelationships(&encoder, relationships_file, node_id_map,
relationship_id_generator);
}
for (auto vertex_pair : vertices) {
auto &vertex = vertex_pair.second;
encoder.WriteVertex(vertex);
}
for (auto edge_pair : edges) {
auto &edge = edge_pair.second;
encoder.WriteEdge(edge);
}
buffer.WriteValue(node_count);
buffer.WriteValue(edge_count);
buffer.WriteValue(buffer.hash());