Uplifted int32 to int64 and float to double in csv_import.

This commit is contained in:
Kruno Tomola Fabro 2016-09-11 21:30:31 +01:00
parent 150f22ffef
commit cd528b9bd6
5 changed files with 64 additions and 35 deletions

View File

@ -38,6 +38,10 @@ using namespace std;
constexpr char const *_string = "string";
// Will change all int32 into int64, and all float into double from csv into
// database. Uplifting will occure even in arrays.
constexpr const bool UPLIFT_PRIMITIVES = true;
bool equal_str(const char *a, const char *b) { return strcasecmp(a, b) == 0; }
// CSV importer for importing multiple files regarding same graph.
@ -243,7 +247,8 @@ private:
new BoolFiller<TG>(property_key<TG>(name, Flags::Bool)));
return make_option(std::move(f));
} else if (equal_str(type, "double")) {
} else if (equal_str(type, "double") ||
(UPLIFT_PRIMITIVES && equal_str(type, "float"))) {
std::unique_ptr<Filler> f(
new DoubleFiller<TG>(property_key<TG>(name, Flags::Double)));
return make_option(std::move(f));
@ -253,16 +258,17 @@ private:
new FloatFiller<TG>(property_key<TG>(name, Flags::Float)));
return make_option(std::move(f));
} else if (equal_str(type, "long") ||
(UPLIFT_PRIMITIVES && equal_str(type, "int"))) {
std::unique_ptr<Filler> f(
new Int64Filler<TG>(property_key<TG>(name, Flags::Int64)));
return make_option(std::move(f));
} else if (equal_str(type, "int")) {
std::unique_ptr<Filler> f(
new Int32Filler<TG>(property_key<TG>(name, Flags::Int32)));
return make_option(std::move(f));
} else if (equal_str(type, "long")) {
std::unique_ptr<Filler> f(
new Int64Filler<TG>(property_key<TG>(name, Flags::Int64)));
return make_option(std::move(f));
} else if (equal_str(type, "string")) {
std::unique_ptr<Filler> f(
new StringFiller<TG>(property_key<TG>(name, Flags::String)));
@ -273,16 +279,25 @@ private:
*this, property_key<TG>(name, Flags::ArrayBool), to_bool));
return make_option(std::move(f));
} else if (equal_str(type, "double[]") ||
(UPLIFT_PRIMITIVES && equal_str(type, "float[]"))) {
std::unique_ptr<Filler> f(
make_array_filler<TG, double, ArrayDouble>(
*this, property_key<TG>(name, Flags::ArrayDouble),
to_double));
return make_option(std::move(f));
} else if (equal_str(type, "float[]")) {
std::unique_ptr<Filler> f(make_array_filler<TG, float, ArrayFloat>(
*this, property_key<TG>(name, Flags::ArrayFloat), to_float));
return make_option(std::move(f));
} else if (equal_str(type, "double[]")) {
} else if (equal_str(type, "long[]") ||
(UPLIFT_PRIMITIVES && equal_str(type, "int[]"))) {
std::unique_ptr<Filler> f(
make_array_filler<TG, double, ArrayDouble>(
*this, property_key<TG>(name, Flags::ArrayDouble),
to_double));
make_array_filler<TG, int64_t, ArrayInt64>(
*this, property_key<TG>(name, Flags::ArrayInt64),
to_int64));
return make_option(std::move(f));
} else if (equal_str(type, "int[]")) {
@ -292,13 +307,6 @@ private:
to_int32));
return make_option(std::move(f));
} else if (equal_str(type, "long[]")) {
std::unique_ptr<Filler> f(
make_array_filler<TG, int64_t, ArrayInt64>(
*this, property_key<TG>(name, Flags::ArrayInt64),
to_int64));
return make_option(std::move(f));
} else if (equal_str(type, "string[]")) {
std::unique_ptr<Filler> f(
make_array_filler<TG, std::string, ArrayString>(

View File

@ -10,6 +10,7 @@ class SnapshotEncoder;
class SnapshotDecoder;
class Db;
class DbTransaction;
class DbAccessor;
// Captures snapshots. Only one per database should exist.
class SnapshotEngine
@ -39,7 +40,7 @@ private:
tx::TransactionRead const &old_trans);
// Loads snapshot. True if success
bool snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap);
bool snapshot_load(DbAccessor &t, SnapshotDecoder &snap);
std::string snapshot_file(std::time_t const &now, const char *type);

View File

@ -10,21 +10,28 @@
#include <strings.h>
#include <unistd.h>
#include <unordered_map>
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
#include "import/csv_import.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "utils/command_line/arguments.hpp"
using namespace std;
// Accepts flags for csv import.
// -db name # will create database with that name.
// -s true # will create snapshot of the database after import.
int main(int argc, char **argv)
{
logging::init_async();
logging::log->pipe(std::make_unique<Stdout>());
auto para = all_arguments(argc, argv);
Db db;
Db db(get_argument(para, "-db", "powerlinks_profile"));
import_csv_from_arguments(db, para);
usleep(1000 * 1000 * 10);
{
DbAccessor t(db);
@ -71,6 +78,10 @@ int main(int argc, char **argv)
cout << endl << endl << "Compiler sum " << sum << endl;
t.commit();
}
if (get_argument(para, "-s", "false") == "true") {
db.snap_engine.make_snapshot();
}
// usleep(1000 * 1000 * 60);
return 0;

View File

@ -189,7 +189,7 @@ auto query(DbAccessor &t, const Id &start_id)
const EdgeType &type_created = t.type_find_or_create("Created");
auto prop_edge_status = t.edge_property_key<String>("status");
auto prop_edge_count = t.edge_property_key<Int32>("count");
auto prop_edge_count = t.edge_property_key<Int64>("count");
auto prop_edge_feedback = t.edge_property_key<String>("feedback");
auto prop_vertex_business_types =

View File

@ -95,7 +95,7 @@ bool SnapshotEngine::import()
logger.info("Importing data from snapshot \"{}\"",
snapshots.back());
DbTransaction t(db);
DbAccessor t(db);
try {
std::ifstream snapshot_file(snapshots.back(),
@ -103,13 +103,20 @@ bool SnapshotEngine::import()
SnapshotDecoder decoder(snapshot_file);
if (snapshot_load(t, decoder)) {
t.trans.commit();
logger.info("Succesfully imported snapshot \"{}\"",
snapshots.back());
success = true;
break;
if (t.commit()) {
logger.info("Succesfully imported snapshot \"{}\"",
snapshots.back());
success = true;
break;
} else {
logger.info("Unuccesfully tryed to import snapshot "
"\"{}\" because indexes where unuccesfully "
"with updating",
snapshots.back());
}
} else {
t.trans.abort();
t.abort();
logger.info("Unuccesfully tryed to import snapshot \"{}\"",
snapshots.back());
}
@ -118,7 +125,7 @@ bool SnapshotEngine::import()
logger.error("Error occured while importing snapshot \"{}\"",
snapshots.back());
logger.error("{}", e.what());
t.trans.abort();
t.abort();
}
snapshots.pop_back();
@ -180,28 +187,30 @@ void SnapshotEngine::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
snap.end();
}
bool SnapshotEngine::snapshot_load(DbTransaction const &dt,
SnapshotDecoder &snap)
bool SnapshotEngine::snapshot_load(DbAccessor &t, SnapshotDecoder &snap)
{
std::unordered_map<uint64_t, VertexAccessor> vertices;
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Load names
snap.load_init();
// Load vertices
snap.begin_vertices();
size_t v_count = 0;
while (!snap.end_vertices()) {
vertices.insert(serialization::deserialize_vertex(t, snap));
v_count++;
}
logger.info("Loaded {} vertices", v_count);
// Load edges
snap.begin_edges();
size_t e_count = 0;
while (!snap.end_edges()) {
serialization::deserialize_edge(t, snap, vertices);
e_count++;
}
logger.info("Loaded {} edges", e_count);
// Load indexes
snap.start_indexes();