Fix CSV multiple edge file import

Summary:
The CSV importer used to generate non-unique edge IDs when
generating edges from multiple CSV files. This is incompatible with the
unique ID requirements introduced by the WAL. Tested and fixed in this
diff.

Reviewers: teon.banek, mferencevic

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D980
This commit is contained in:
florijan 2017-11-14 14:06:19 +01:00
parent 885d19401f
commit b7188c296c
5 changed files with 18 additions and 17 deletions

View File

@ -300,10 +300,10 @@ void WriteRelationshipsRow(
encoder.WriteMap(properties); encoder.WriteMap(properties);
} }
auto ConvertRelationships( void ConvertRelationships(
const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map, const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) { communication::bolt::BaseEncoder<HashedFileWriter> &encoder,
int64_t relationship_count = 0; int64_t &next_relationship_id) {
std::ifstream relationships_file(relationships_path); std::ifstream relationships_file(relationships_path);
CHECK(relationships_file) CHECK(relationships_file)
<< fmt::format("Unable to open '{}'", relationships_path); << fmt::format("Unable to open '{}'", relationships_path);
@ -312,13 +312,10 @@ auto ConvertRelationships(
while (!row.empty()) { while (!row.empty()) {
CHECK_EQ(row.size(), fields.size()) CHECK_EQ(row.size(), fields.size())
<< "Expected as many values as there are header fields"; << "Expected as many values as there are header fields";
auto relationship_id = relationship_count; WriteRelationshipsRow(fields, row, node_id_map, next_relationship_id++,
WriteRelationshipsRow(fields, row, node_id_map, relationship_id, encoder); encoder);
// Increase count and move to next row.
relationship_count += 1;
row = ReadRow(relationships_file); row = ReadRow(relationships_file);
} }
return relationship_count;
} }
void Convert(const std::vector<std::string> &nodes, void Convert(const std::vector<std::string> &nodes,
@ -328,7 +325,7 @@ void Convert(const std::vector<std::string> &nodes,
HashedFileWriter buffer(output_path); HashedFileWriter buffer(output_path);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer); communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
int64_t node_count = 0; int64_t node_count = 0;
int64_t relationship_count = 0; int64_t next_relationship_id = 0;
MemgraphNodeIdMap node_id_map; MemgraphNodeIdMap node_id_map;
// Snapshot file has the following contents in order: // Snapshot file has the following contents in order:
// 1) Magic number. // 1) Magic number.
@ -349,11 +346,11 @@ void Convert(const std::vector<std::string> &nodes,
node_count += ConvertNodes(nodes_file, node_id_map, encoder); node_count += ConvertNodes(nodes_file, node_id_map, encoder);
} }
for (const auto &relationships_file : relationships) { for (const auto &relationships_file : relationships) {
relationship_count += ConvertRelationships(relationships_file, node_id_map, encoder,
ConvertRelationships(relationships_file, node_id_map, encoder); next_relationship_id);
} }
buffer.WriteValue(node_count); buffer.WriteValue(node_count);
buffer.WriteValue(relationship_count); buffer.WriteValue(next_relationship_id);
buffer.WriteValue(buffer.hash()); buffer.WriteValue(buffer.hash());
} catch (const std::ios_base::failure &) { } catch (const std::ios_base::failure &) {
// Only HashedFileWriter sets the underlying fstream to throw. // Only HashedFileWriter sets the underlying fstream to throw.

View File

@ -25,7 +25,8 @@ target_link_libraries(mg_recovery_check gtest gtest_main)
# Copy CSV data to CMake build dir # Copy CSV data to CMake build dir
configure_file(csv/comment_nodes.csv csv/comment_nodes.csv COPYONLY) configure_file(csv/comment_nodes.csv csv/comment_nodes.csv COPYONLY)
configure_file(csv/forum_nodes.csv csv/forum_nodes.csv COPYONLY) configure_file(csv/forum_nodes.csv csv/forum_nodes.csv COPYONLY)
configure_file(csv/relationships.csv csv/relationships.csv COPYONLY) configure_file(csv/relationships_0.csv csv/relationships_0.csv COPYONLY)
configure_file(csv/relationships_1.csv csv/relationships_1.csv COPYONLY)
# Copy the actual runner to CMake build dir # Copy the actual runner to CMake build dir
configure_file(test_mg_import_csv test_mg_import_csv COPYONLY) configure_file(test_mg_import_csv test_mg_import_csv COPYONLY)

View File

@ -2,5 +2,3 @@
0|0|POSTED_ON 0|0|POSTED_ON
1|1|POSTED_ON 1|1|POSTED_ON
2|2|POSTED_ON 2|2|POSTED_ON
3|3|POSTED_ON
4|4|POSTED_ON
1 :START_ID(COMMENT_ID) :END_ID(FORUM_ID) :TYPE
2 0 0 POSTED_ON
3 1 1 POSTED_ON
4 2 2 POSTED_ON
3 3 POSTED_ON
4 4 POSTED_ON

View File

@ -0,0 +1,3 @@
:START_ID(COMMENT_ID)|:END_ID(FORUM_ID)|:TYPE
3|3|POSTED_ON
4|4|POSTED_ON
1 :START_ID(COMMENT_ID) :END_ID(FORUM_ID) :TYPE
2 3 3 POSTED_ON
3 4 4 POSTED_ON

View File

@ -23,11 +23,13 @@ def main():
args = parse_args() args = parse_args()
comment_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'comment_nodes.csv') comment_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'comment_nodes.csv')
forum_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'forum_nodes.csv') forum_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'forum_nodes.csv')
relationships = os.path.join(_SCRIPT_DIR, 'csv', 'relationships.csv') relationships_0 = os.path.join(_SCRIPT_DIR, 'csv', 'relationships_0.csv')
relationships_1 = os.path.join(_SCRIPT_DIR, 'csv', 'relationships_1.csv')
with tempfile.TemporaryDirectory(suffix='-snapshots', dir=_SCRIPT_DIR) as snapshot_dir: with tempfile.TemporaryDirectory(suffix='-snapshots', dir=_SCRIPT_DIR) as snapshot_dir:
out_snapshot = os.path.join(snapshot_dir, 'snapshot') out_snapshot = os.path.join(snapshot_dir, 'snapshot')
mg_import_csv = [args.mg_import_csv, '--nodes', comment_nodes, mg_import_csv = [args.mg_import_csv, '--nodes', comment_nodes,
'--nodes', forum_nodes, '--relationships', relationships, '--nodes', forum_nodes, '--relationships', relationships_0,
'--relationships', relationships_1,
'--out', out_snapshot, '--csv-delimiter=|', '--array-delimiter=;'] '--out', out_snapshot, '--csv-delimiter=|', '--array-delimiter=;']
subprocess.check_call(mg_import_csv) subprocess.check_call(mg_import_csv)
mg_recovery_check = [args.mg_recovery_check, '--snapshot-dir', snapshot_dir] mg_recovery_check = [args.mg_recovery_check, '--snapshot-dir', snapshot_dir]