From cd528b9bd6f38bc0546965cc2666bc2805d237b6 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Sun, 11 Sep 2016 21:30:31 +0100 Subject: [PATCH] Uplifted int32 to int64 and float to double in csv_import. --- include/import/csv_import.hpp | 42 +++++++++++++++++----------- include/snapshot/snapshot_engine.hpp | 3 +- poc/profile.cpp | 17 +++++++++-- poc/profile.hpp | 2 +- src/snapshot/snapshot_engine.cpp | 35 ++++++++++++++--------- 5 files changed, 64 insertions(+), 35 deletions(-) diff --git a/include/import/csv_import.hpp b/include/import/csv_import.hpp index b72be4526..aeb799c96 100644 --- a/include/import/csv_import.hpp +++ b/include/import/csv_import.hpp @@ -38,6 +38,10 @@ using namespace std; constexpr char const *_string = "string"; +// Will change all int32 into int64, and all float into double from csv into +// database. Uplifting will occure even in arrays. +constexpr const bool UPLIFT_PRIMITIVES = true; + bool equal_str(const char *a, const char *b) { return strcasecmp(a, b) == 0; } // CSV importer for importing multiple files regarding same graph. @@ -243,7 +247,8 @@ private: new BoolFiller(property_key(name, Flags::Bool))); return make_option(std::move(f)); - } else if (equal_str(type, "double")) { + } else if (equal_str(type, "double") || + (UPLIFT_PRIMITIVES && equal_str(type, "float"))) { std::unique_ptr f( new DoubleFiller(property_key(name, Flags::Double))); return make_option(std::move(f)); @@ -253,16 +258,17 @@ private: new FloatFiller(property_key(name, Flags::Float))); return make_option(std::move(f)); + } else if (equal_str(type, "long") || + (UPLIFT_PRIMITIVES && equal_str(type, "int"))) { + std::unique_ptr f( + new Int64Filler(property_key(name, Flags::Int64))); + return make_option(std::move(f)); + } else if (equal_str(type, "int")) { std::unique_ptr f( new Int32Filler(property_key(name, Flags::Int32))); return make_option(std::move(f)); - } else if (equal_str(type, "long")) { - std::unique_ptr f( - new Int64Filler(property_key(name, Flags::Int64))); - return make_option(std::move(f)); - } else if (equal_str(type, "string")) { std::unique_ptr f( new StringFiller(property_key(name, Flags::String))); @@ -273,16 +279,25 @@ private: *this, property_key(name, Flags::ArrayBool), to_bool)); return make_option(std::move(f)); + } else if (equal_str(type, "double[]") || + (UPLIFT_PRIMITIVES && equal_str(type, "float[]"))) { + std::unique_ptr f( + make_array_filler( + *this, property_key(name, Flags::ArrayDouble), + to_double)); + return make_option(std::move(f)); + } else if (equal_str(type, "float[]")) { std::unique_ptr f(make_array_filler( *this, property_key(name, Flags::ArrayFloat), to_float)); return make_option(std::move(f)); - } else if (equal_str(type, "double[]")) { + } else if (equal_str(type, "long[]") || + (UPLIFT_PRIMITIVES && equal_str(type, "int[]"))) { std::unique_ptr f( - make_array_filler( - *this, property_key(name, Flags::ArrayDouble), - to_double)); + make_array_filler( + *this, property_key(name, Flags::ArrayInt64), + to_int64)); return make_option(std::move(f)); } else if (equal_str(type, "int[]")) { @@ -292,13 +307,6 @@ private: to_int32)); return make_option(std::move(f)); - } else if (equal_str(type, "long[]")) { - std::unique_ptr f( - make_array_filler( - *this, property_key(name, Flags::ArrayInt64), - to_int64)); - return make_option(std::move(f)); - } else if (equal_str(type, "string[]")) { std::unique_ptr f( make_array_filler( diff --git a/include/snapshot/snapshot_engine.hpp b/include/snapshot/snapshot_engine.hpp index c6e785265..448771122 100644 --- a/include/snapshot/snapshot_engine.hpp +++ b/include/snapshot/snapshot_engine.hpp @@ -10,6 +10,7 @@ class SnapshotEncoder; class SnapshotDecoder; class Db; class DbTransaction; +class DbAccessor; // Captures snapshots. Only one per database should exist. class SnapshotEngine @@ -39,7 +40,7 @@ private: tx::TransactionRead const &old_trans); // Loads snapshot. True if success - bool snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap); + bool snapshot_load(DbAccessor &t, SnapshotDecoder &snap); std::string snapshot_file(std::time_t const &now, const char *type); diff --git a/poc/profile.cpp b/poc/profile.cpp index 2b1b35c73..0d611918f 100644 --- a/poc/profile.cpp +++ b/poc/profile.cpp @@ -10,21 +10,28 @@ #include #include #include + #include "communication/bolt/v1/serialization/bolt_serializer.hpp" #include "import/csv_import.hpp" +#include "logging/default.hpp" +#include "logging/streams/stdout.hpp" #include "utils/command_line/arguments.hpp" using namespace std; +// Accepts flags for csv import. +// -db name # will create database with that name. +// -s true # will create snapshot of the database after import. int main(int argc, char **argv) { + logging::init_async(); + logging::log->pipe(std::make_unique()); + auto para = all_arguments(argc, argv); - Db db; + Db db(get_argument(para, "-db", "powerlinks_profile")); import_csv_from_arguments(db, para); - usleep(1000 * 1000 * 10); - { DbAccessor t(db); @@ -71,6 +78,10 @@ int main(int argc, char **argv) cout << endl << endl << "Compiler sum " << sum << endl; t.commit(); } + + if (get_argument(para, "-s", "false") == "true") { + db.snap_engine.make_snapshot(); + } // usleep(1000 * 1000 * 60); return 0; diff --git a/poc/profile.hpp b/poc/profile.hpp index ef4064c77..0c7efd8e7 100644 --- a/poc/profile.hpp +++ b/poc/profile.hpp @@ -189,7 +189,7 @@ auto query(DbAccessor &t, const Id &start_id) const EdgeType &type_created = t.type_find_or_create("Created"); auto prop_edge_status = t.edge_property_key("status"); - auto prop_edge_count = t.edge_property_key("count"); + auto prop_edge_count = t.edge_property_key("count"); auto prop_edge_feedback = t.edge_property_key("feedback"); auto prop_vertex_business_types = diff --git a/src/snapshot/snapshot_engine.cpp b/src/snapshot/snapshot_engine.cpp index 5f6c4dbde..45b605c90 100644 --- a/src/snapshot/snapshot_engine.cpp +++ b/src/snapshot/snapshot_engine.cpp @@ -95,7 +95,7 @@ bool SnapshotEngine::import() logger.info("Importing data from snapshot \"{}\"", snapshots.back()); - DbTransaction t(db); + DbAccessor t(db); try { std::ifstream snapshot_file(snapshots.back(), @@ -103,13 +103,20 @@ bool SnapshotEngine::import() SnapshotDecoder decoder(snapshot_file); if (snapshot_load(t, decoder)) { - t.trans.commit(); - logger.info("Succesfully imported snapshot \"{}\"", - snapshots.back()); - success = true; - break; + if (t.commit()) { + logger.info("Succesfully imported snapshot \"{}\"", + snapshots.back()); + success = true; + break; + + } else { + logger.info("Unuccesfully tryed to import snapshot " + "\"{}\" because indexes where unuccesfully " + "with updating", + snapshots.back()); + } } else { - t.trans.abort(); + t.abort(); logger.info("Unuccesfully tryed to import snapshot \"{}\"", snapshots.back()); } @@ -118,7 +125,7 @@ bool SnapshotEngine::import() logger.error("Error occured while importing snapshot \"{}\"", snapshots.back()); logger.error("{}", e.what()); - t.trans.abort(); + t.abort(); } snapshots.pop_back(); @@ -180,28 +187,30 @@ void SnapshotEngine::snapshot(DbTransaction const &dt, SnapshotEncoder &snap, snap.end(); } -bool SnapshotEngine::snapshot_load(DbTransaction const &dt, - SnapshotDecoder &snap) +bool SnapshotEngine::snapshot_load(DbAccessor &t, SnapshotDecoder &snap) { std::unordered_map vertices; - Db &db = dt.db; - DbAccessor t(db, dt.trans); - // Load names snap.load_init(); // Load vertices snap.begin_vertices(); + size_t v_count = 0; while (!snap.end_vertices()) { vertices.insert(serialization::deserialize_vertex(t, snap)); + v_count++; } + logger.info("Loaded {} vertices", v_count); // Load edges snap.begin_edges(); + size_t e_count = 0; while (!snap.end_edges()) { serialization::deserialize_edge(t, snap, vertices); + e_count++; } + logger.info("Loaded {} edges", e_count); // Load indexes snap.start_indexes();