Change gid bit size

Summary:
Change gid methods

Rename GidGenerator and add tests

Fix tools broken by gid changes

Reviewers: dgleich, buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1044
This commit is contained in:
Dominik Gleich 2017-12-28 10:35:12 +01:00 committed by florijan
parent 8952df06c1
commit 503381549e
14 changed files with 185 additions and 122 deletions

View File

@ -35,7 +35,7 @@ void GenerateGraph(GraphDb &db) {
// Randomize the sequence of IDs of created vertices and edges to simulate
// real-world lack of locality.
auto make_id_vector = [](size_t size) {
gid::GidGenerator generator{0};
gid::Generator generator{0};
std::vector<gid::Gid> ids(size);
for (size_t i = 0; i < size; ++i)
ids[i] = generator.Next(std::experimental::nullopt);

View File

@ -141,8 +141,8 @@ void GraphDb::CollectGarbage() {
// the ID of the oldest active transaction (or next active, if there
// are no currently active). That's legal because that was the
// last possible transaction that could have obtained pointers
// to those records. New snapshot can be used, different than one used
// for the first two phases of gc.
// to those records. New snapshot can be used, different than one used for
// first two phases of gc.
utils::Timer x;
const auto snapshot = tx_engine_->GlobalGcSnapshot();
edge_record_deleter_.FreeExpiredObjects(snapshot.back());
@ -169,8 +169,8 @@ GraphDb::~GraphDb() {
// Stop the gc scheduler to not run into race conditions for deletions.
gc_scheduler_.Stop();
// Stop the snapshot creator to avoid snapshooting while database is
// being deleted.
// Stop the snapshot creator to avoid snapshooting while database is being
// deleted.
snapshot_creator_.Stop();
// Stop transaction killer.
@ -194,8 +194,7 @@ GraphDb::~GraphDb() {
for (auto &id_vlist : vertices_.access()) delete id_vlist.second;
for (auto &id_vlist : edges_.access()) delete id_vlist.second;
// Free expired records with the maximal possible id from all the
// deleters.
// Free expired records with the maximal possible id from all the deleters.
edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());

View File

@ -103,8 +103,8 @@ class GraphDb {
void CollectGarbage();
gid::GidGenerator &VertexGenerator() { return vertex_generator_; }
gid::GidGenerator &EdgeGenerator() { return edge_generator_; }
gid::Generator &VertexGenerator() { return vertex_generator_; }
gid::Generator &EdgeGenerator() { return edge_generator_; }
/** When this is false, no new transactions should be created. */
std::atomic<bool> is_accepting_transactions_{true};
@ -121,8 +121,8 @@ class GraphDb {
int worker_id_{0};
gid::GidGenerator vertex_generator_{worker_id_};
gid::GidGenerator edge_generator_{worker_id_};
gid::Generator vertex_generator_{worker_id_};
gid::Generator edge_generator_{worker_id_};
// main storage for the graph
ConcurrentMap<gid::Gid, mvcc::VersionList<Vertex> *> vertices_;

View File

@ -47,21 +47,15 @@ bool GraphDbAccessor::should_abort() const {
durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal_; }
VertexAccessor GraphDbAccessor::InsertVertex(
std::experimental::optional<gid::Gid> gid) {
std::experimental::optional<gid::Gid> requested_gid) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
std::experimental::optional<uint64_t> next_id;
if (gid) {
CHECK(static_cast<int>(gid::WorkerId(*gid)) == db_.worker_id_)
<< "Attempting to set incompatible worker id";
next_id = gid::LocalId(*gid);
}
auto gid = db_.vertex_generator_.Next(requested_gid);
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_, gid);
auto id = db_.vertex_generator_.Next(next_id);
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_, id);
bool success = db_.vertices_.access().insert(id, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id;
bool success = db_.vertices_.access().insert(gid, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing GID: "
<< gid;
db_.wal_.Emplace(database::StateDelta::CreateVertex(transaction_->id_,
vertex_vlist->gid_));
return VertexAccessor(vertex_vlist, *this);
@ -305,7 +299,7 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
EdgeAccessor GraphDbAccessor::InsertEdge(
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType edge_type,
std::experimental::optional<gid::Gid> gid) {
std::experimental::optional<gid::Gid> requested_gid) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// An edge is created on the worker of it's "from" vertex.
if (!from.is_local()) {
@ -315,21 +309,15 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
// EdgeAccessor and return it. The remote InsertEdge(...) will be calling
// remote Connect(...) if "to" is not local to it.
}
std::experimental::optional<uint64_t> next_id;
if (gid) {
CHECK(static_cast<int>(gid::WorkerId(*gid)) == db_.worker_id_)
<< "Attempting to set incompatible worker id";
next_id = gid::LocalId(*gid);
}
auto id = db_.edge_generator_.Next(next_id);
auto gid = db_.edge_generator_.Next(requested_gid);
auto edge_vlist = new mvcc::VersionList<Edge>(
*transaction_, id, from.address(), to.address(), edge_type);
*transaction_, gid, from.address(), to.address(), edge_type);
// We need to insert edge_vlist to edges_ before calling update since update
// can throw and edge_vlist will not be garbage collected if it is not in
// edges_ skiplist.
bool success = db_.edges_.access().insert(id, edge_vlist).second;
CHECK(success) << "Attempting to insert an edge with an existing ID: " << id;
bool success = db_.edges_.access().insert(gid, edge_vlist).second;
CHECK(success) << "Attempting to insert an edge with an existing GID: "
<< gid;
// ensure that the "from" accessor has the latest version
from.SwitchNew();

View File

@ -113,12 +113,12 @@ class GraphDbAccessor {
* Always perform recovery only once, immediately when the database is
* created, before any transactional ops start.
*
* @param gid The desired GID. Should only be provided
* when recovering from durability, and should `belong` to this worker.
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @return See above.
*/
VertexAccessor InsertVertex(
std::experimental::optional<gid::Gid> gid = std::experimental::nullopt);
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt);
/**
* Removes the vertex of the given accessor. If the vertex has any outgoing or
@ -309,13 +309,14 @@ class GraphDbAccessor {
* @param from The 'from' vertex.
* @param to The 'to' vertex'
* @param type Edge type.
* @param gid The desired GID. Should only be provided when
* recovering from durability and should `belong` to this worker.
* @param requested_gid The requested GID. Should only be provided when
* recovering from durability.
* @return An accessor to the edge.
*/
EdgeAccessor InsertEdge(
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType type,
std::experimental::optional<gid::Gid> gid = std::experimental::nullopt);
EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to,
GraphDbTypes::EdgeType type,
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt);
/**
* Removes an edge from the graph. Parameters can indicate if the edge should
@ -612,7 +613,8 @@ class GraphDbAccessor {
/** Casts the DB's engine to MasterEngine and returns it. If the DB's engine
* is RemoteEngine, this function will crash MG. */
tx::MasterEngine &MasterEngine() {
auto *master_engine = dynamic_cast<tx::MasterEngine *>(db_.tx_engine_.get());
auto *master_engine =
dynamic_cast<tx::MasterEngine *>(db_.tx_engine_.get());
DCHECK(master_engine) << "Asked for MasterEngine on distributed worker";
return *master_engine;
}

View File

@ -9,15 +9,18 @@ namespace storage {
/**
* A data structure that tracks a Vertex/Edge location (address) that's either
* local or remote. The remote address is a global id, while the local address
* is simply the memory address in the current local process. Both types of
* address are stored in the same storage space, so an Address always takes as
* much memory as a pointer does.
* local or remote. The remote address is a global id alongside the id of the
* worker on which it's currently stored, while the local address is simply the
* memory address in the current local process. Both types of address are stored
* in the same storage space, so an Address always takes as much memory as a
* pointer does.
*
* The memory layout for storage is on x64 architecture is the following:
* - the lowest bit stores 0 if address is local and 1 if address is global
* - if the address is local all 64 bits store the local memory address
* - if the address is global then most imporant 63 bits store the global id
* - if the address is global then lowest bit stores 1. the following
* kWorkerIdSize bits contain the worker id and the final (upper) 64 - 1 -
* kWorkerIdSize bits contain the global id.
*
* @tparam TRecord - Type of record this address points to. Either Vertex or
* Edge.
@ -25,10 +28,11 @@ namespace storage {
template <typename TLocalObj>
class Address {
using Storage = uint64_t;
static constexpr uintptr_t kTypeMaskSize{1};
static constexpr uintptr_t kTypeMask{(1ULL << kTypeMaskSize) - 1};
static constexpr uintptr_t kLocal{0};
static constexpr uintptr_t kRemote{1};
static constexpr uint64_t kTypeMaskSize{1};
static constexpr uint64_t kTypeMask{(1ULL << kTypeMaskSize) - 1};
static constexpr uint64_t kWorkerIdSize{gid::kWorkerIdSize};
static constexpr uint64_t kLocal{0};
static constexpr uint64_t kRemote{1};
public:
// Constructor for local Address.
@ -38,13 +42,17 @@ class Address {
storage_ = ptr_no_type | kLocal;
}
// Constructor for remote Address.
Address(gid::Gid global_id) {
CHECK(global_id < (1ULL << (sizeof(Storage) * 8 - kTypeMaskSize)))
// Constructor for remote Address, takes worker_id which specifies the worker
// that is storing that vertex/edge
Address(gid::Gid global_id, int worker_id) {
CHECK(global_id <
(1ULL << (sizeof(Storage) * 8 - kWorkerIdSize - kTypeMaskSize)))
<< "Too large global id";
CHECK(worker_id < (1ULL << kWorkerIdSize)) << "Too larger worker id";
storage_ = kRemote;
storage_ |= global_id << kTypeMaskSize;
storage_ |= global_id << (kTypeMaskSize + kWorkerIdSize);
storage_ |= worker_id << kTypeMaskSize;
}
bool is_local() const { return (storage_ & kTypeMask) == kLocal; }
@ -57,7 +65,13 @@ class Address {
gid::Gid global_id() const {
DCHECK(is_remote()) << "Attempting to get global ID from local address";
return storage_ >> kTypeMaskSize;
return storage_ >> (kTypeMaskSize + kWorkerIdSize);
}
/// Returns worker id where the object is located
int worker_id() const {
DCHECK(is_remote()) << "Attempting to get worker ID from local address";
return (storage_ >> 1) & ((1ULL << kWorkerIdSize) - 1);
}
bool operator==(const Address<TLocalObj> &other) const {

View File

@ -18,19 +18,12 @@ using Gid = uint64_t;
static constexpr std::size_t kWorkerIdSize{10};
/// Returns worker id from global id.
static inline Gid WorkerId(Gid gid) {
return gid & ((1ULL << kWorkerIdSize) - 1);
}
/// Returns `local` id from global id.
static inline Gid LocalId(Gid gid) { return gid >> kWorkerIdSize; }
static inline uint64_t LocalId(Gid gid) { return gid >> kWorkerIdSize; }
/// Returns global id from worker and local ids.
static inline Gid Create(uint64_t worker_id, uint64_t local_id) {
CHECK(worker_id < (1ULL << kWorkerIdSize));
CHECK(local_id < (1ULL << (sizeof(Gid) * 8 - kWorkerIdSize)));
return worker_id | (local_id << kWorkerIdSize);
/// Returns id of the worker that created this gid.
static inline int CreatorWorker(Gid gid) {
return gid & ((1ULL << kWorkerIdSize) - 1);
}
/**
@ -41,21 +34,26 @@ static inline Gid Create(uint64_t worker_id, uint64_t local_id) {
* larger than the id set by SetId, we can ensure that by not allowing calls to
* SetId after Next which generated new id (incremented internal id counter).
*/
class GidGenerator {
class Generator {
public:
GidGenerator(int worker_id) : worker_id_(worker_id) {}
Generator(int worker_id) : worker_id_(worker_id) {}
/**
* Returns a new globally unique identifier.
* @param local_id - force local id instead of generating a new one
* Returns a globally unique identifier.
*
* @param requested_gid - The desired gid. If given, it will be returned and
* this generator's state updated accordingly.
*/
gid::Gid Next(std::experimental::optional<Gid> local_id) {
auto id = local_id ? *local_id : next_local_id_++;
if (local_id) {
utils::EnsureAtomicGe(next_local_id_, id + 1);
gid::Gid Next(std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt) {
if (requested_gid) {
if (gid::CreatorWorker(*requested_gid) == worker_id_)
utils::EnsureAtomicGe(next_local_id_, gid::LocalId(*requested_gid) + 1);
return *requested_gid;
} else {
generated_id_ = true;
return worker_id_ | next_local_id_++ << kWorkerIdSize;
}
return gid::Create(worker_id_, id);
}
/// Returns number of locally generated ids

View File

@ -130,8 +130,8 @@ class Writer {
}
private:
gid::GidGenerator node_generator_{0};
gid::GidGenerator edge_generator_{0};
gid::Generator node_generator_{0};
gid::Generator edge_generator_{0};
HashedFileWriter buffer_;
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{buffer_};
};

View File

@ -306,8 +306,9 @@ class Durability : public ::testing::Test {
// Tests wal encoder to encode correctly non-CRUD deltas, and that all deltas
// are written in the correct order
TEST_F(Durability, WalEncoding) {
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
gid::Generator generator(0);
auto gid0 = generator.Next();
auto gid1 = generator.Next();
{
auto config = DbConfig();
config.durability_enabled = true;
@ -372,26 +373,30 @@ TEST_F(Durability, WalEncoding) {
}
TEST_F(Durability, SnapshotEncoding) {
gid::Generator generator(0);
auto gid0 = generator.Next();
auto gid1 = generator.Next();
auto gid2 = generator.Next();
{
GraphDb db{DbConfig()};
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex();
ASSERT_EQ(v0.gid(), gid::Create(0, 0));
ASSERT_EQ(v0.gid(), gid0);
v0.add_label(dba.Label("l0"));
v0.PropsSet(dba.Property("p0"), 42);
auto v1 = dba.InsertVertex();
ASSERT_EQ(v1.gid(), gid::Create(0, 1));
ASSERT_EQ(v1.gid(), gid1);
v1.add_label(dba.Label("l0"));
v1.add_label(dba.Label("l1"));
auto v2 = dba.InsertVertex();
ASSERT_EQ(v2.gid(), gid::Create(0, 2));
ASSERT_EQ(v2.gid(), gid2);
v2.PropsSet(dba.Property("p0"), true);
v2.PropsSet(dba.Property("p1"), "Johnny");
auto e0 = dba.InsertEdge(v0, v1, dba.EdgeType("et0"));
ASSERT_EQ(e0.gid(), gid::Create(0, 0));
ASSERT_EQ(e0.gid(), gid0);
e0.PropsSet(dba.Property("p0"), std::vector<PropertyValue>{1, 2, 3});
auto e1 = dba.InsertEdge(v2, v1, dba.EdgeType("et1"));
ASSERT_EQ(e1.gid(), gid::Create(0, 1));
ASSERT_EQ(e1.gid(), gid1);
dba.BuildIndex(dba.Label("l1"), dba.Property("p1"));
dba.Commit();
MakeSnapshot(db);
@ -443,9 +448,6 @@ TEST_F(Durability, SnapshotEncoding) {
auto &vertex = dv.ValueVertex();
decoded_vertices.emplace(vertex.id, vertex);
}
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
ASSERT_EQ(decoded_vertices.size(), 3);
ASSERT_EQ(decoded_vertices[gid0].labels.size(), 1);
EXPECT_EQ(decoded_vertices[gid0].labels[0], "l0");

46
tests/unit/gid.cpp Normal file
View File

@ -0,0 +1,46 @@
#include "gtest/gtest.h"
#include "storage/gid.hpp"
TEST(Generator, GeneratedOnCorrectWorker) {
int my_worker = 775;
gid::Generator generator(my_worker);
auto gid = generator.Next();
EXPECT_EQ(gid::CreatorWorker(gid), my_worker);
}
TEST(Generator, DontReuseIds) {
gid::Generator generator(0);
auto gid = generator.Next();
auto gid_gt = generator.Next();
EXPECT_NE(gid_gt, gid);
}
TEST(Generator, FromOtherGenerator) {
gid::Generator generator_0(0);
gid::Generator generator_1(1);
for (int i = 0; i < 10; ++i) generator_1.Next();
auto gid1 = generator_1.Next();
auto gid = generator_0.Next(gid1);
EXPECT_EQ(gid::CreatorWorker(gid), 1);
EXPECT_EQ(gid, gid1);
}
TEST(Generator, FromSameGenerator) {
gid::Generator generator(0);
std::vector<gid::Gid> gids;
for (int i = 0; i < 10; ++i) gids.push_back(generator.Next());
for (int i = 0; i < 10; ++i) EXPECT_EQ(gids[i], generator.Next(gids[i]));
}
TEST(Generator, AdvanceGenerator) {
gid::Generator generator_00(0);
gid::Generator generator_01(0);
for (int i = 0; i < 10; ++i) generator_00.Next();
auto gid00 = generator_00.Next();
auto gid01 = generator_01.Next(gid00);
EXPECT_EQ(gid00, gid01);
auto gid10 = generator_00.Next();
auto gid11 = generator_01.Next();
EXPECT_EQ(gid10, gid11);
}

View File

@ -15,16 +15,17 @@ auto Count(TIterable iterable) {
TEST(GraphDbAccessorTest, InsertVertex) {
GraphDb db;
GraphDbAccessor accessor(db);
gid::Generator generator(0);
EXPECT_EQ(Count(accessor.Vertices(false)), 0);
EXPECT_EQ(accessor.InsertVertex().gid(), 0);
EXPECT_EQ(accessor.InsertVertex().gid(), generator.Next());
EXPECT_EQ(Count(accessor.Vertices(false)), 0);
EXPECT_EQ(Count(accessor.Vertices(true)), 1);
accessor.AdvanceCommand();
EXPECT_EQ(Count(accessor.Vertices(false)), 1);
EXPECT_EQ(accessor.InsertVertex().gid(), gid::Create(0, 1));
EXPECT_EQ(accessor.InsertVertex().gid(), generator.Next());
EXPECT_EQ(Count(accessor.Vertices(false)), 1);
EXPECT_EQ(Count(accessor.Vertices(true)), 2);
accessor.AdvanceCommand();

View File

@ -6,7 +6,8 @@
TEST(StateDelta, CreateVertex) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
gid::Generator generator(0);
auto gid0 = generator.Next();
{
GraphDbAccessor dba(db);
auto delta = database::StateDelta::CreateVertex(dba.transaction_id(), gid0);
@ -22,7 +23,8 @@ TEST(StateDelta, CreateVertex) {
TEST(StateDelta, RemoveVertex) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
gid::Generator generator(0);
auto gid0 = generator.Next();
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
@ -43,9 +45,10 @@ TEST(StateDelta, RemoveVertex) {
TEST(StateDelta, CreateEdge) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
gid::Generator generator(0);
auto gid0 = generator.Next();
auto gid1 = generator.Next();
auto gid2 = generator.Next();
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
@ -68,9 +71,10 @@ TEST(StateDelta, CreateEdge) {
TEST(StateDelta, RemoveEdge) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
gid::Generator generator(0);
auto gid0 = generator.Next();
auto gid1 = generator.Next();
auto gid2 = generator.Next();
{
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex(gid0);
@ -93,7 +97,8 @@ TEST(StateDelta, RemoveEdge) {
TEST(StateDelta, AddLabel) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
gid::Generator generator(0);
auto gid0 = generator.Next();
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
@ -118,7 +123,8 @@ TEST(StateDelta, AddLabel) {
TEST(StateDelta, RemoveLabel) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
gid::Generator generator(0);
auto gid0 = generator.Next();
{
GraphDbAccessor dba(db);
auto vertex = dba.InsertVertex(gid0);
@ -143,7 +149,8 @@ TEST(StateDelta, RemoveLabel) {
TEST(StateDelta, SetPropertyVertex) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
gid::Generator generator(0);
auto gid0 = generator.Next();
{
GraphDbAccessor dba(db);
dba.InsertVertex(gid0);
@ -167,9 +174,10 @@ TEST(StateDelta, SetPropertyVertex) {
TEST(StateDelta, SetPropertyEdge) {
GraphDb db;
auto gid0 = gid::Create(0, 0);
auto gid1 = gid::Create(0, 1);
auto gid2 = gid::Create(0, 2);
gid::Generator generator(0);
auto gid0 = generator.Next();
auto gid1 = generator.Next();
auto gid2 = generator.Next();
{
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex(gid0);

View File

@ -23,13 +23,14 @@ TEST(Address, CopyCompare) {
}
TEST(Address, Global) {
uint64_t worker_id{13};
int worker_id{17};
uint64_t local_id{31};
auto global_id = gid::Create(worker_id, local_id);
Address<int> address{global_id};
gid::Generator generator(13);
auto global_id = generator.Next(local_id);
Address<int> address{global_id, worker_id};
EXPECT_TRUE(address.is_remote());
EXPECT_FALSE(address.is_local());
EXPECT_EQ(gid::WorkerId(address.global_id()), worker_id);
EXPECT_EQ(address.global_id(), global_id);
EXPECT_EQ(address.worker_id(), worker_id);
}

View File

@ -111,13 +111,13 @@ class MemgraphNodeIdMap {
}
int64_t Insert(const NodeId &node_id) {
gid::Gid id = gid::Create(0, mg_id_++);
node_id_to_mg_[node_id] = id;
return id;
auto gid = generator_.Next();
node_id_to_mg_[node_id] = gid;
return gid;
}
private:
int64_t mg_id_ = 0;
gid::Generator generator_{0};
std::unordered_map<NodeId, int64_t> node_id_to_mg_;
};
@ -317,22 +317,25 @@ void WriteRelationshipsRow(
encoder.WriteMap(properties);
}
void ConvertRelationships(
int ConvertRelationships(
const std::string &relationships_path, const MemgraphNodeIdMap &node_id_map,
communication::bolt::BaseEncoder<HashedFileWriter> &encoder,
int64_t &next_relationship_id) {
gid::Generator &relationship_id_generator) {
std::ifstream relationships_file(relationships_path);
CHECK(relationships_file)
<< fmt::format("Unable to open '{}'", relationships_path);
auto fields = ReadHeader(relationships_file);
auto row = ReadRow(relationships_file);
int64_t relationships = 0;
while (!row.empty()) {
CHECK_EQ(row.size(), fields.size())
<< "Expected as many values as there are header fields";
WriteRelationshipsRow(fields, row, node_id_map,
gid::Create(0, next_relationship_id++), encoder);
relationship_id_generator.Next(), encoder);
++relationships;
row = ReadRow(relationships_file);
}
return relationships;
}
void Convert(const std::vector<std::string> &nodes,
@ -342,7 +345,8 @@ void Convert(const std::vector<std::string> &nodes,
HashedFileWriter buffer(output_path);
communication::bolt::BaseEncoder<HashedFileWriter> encoder(buffer);
int64_t node_count = 0;
int64_t next_relationship_id = 0;
int64_t edge_count = 0;
gid::Generator relationship_id_generator(0);
MemgraphNodeIdMap node_id_map;
// Snapshot file has the following contents in order:
// 1) Magic number.
@ -371,11 +375,11 @@ void Convert(const std::vector<std::string> &nodes,
node_count += ConvertNodes(nodes_file, node_id_map, encoder);
}
for (const auto &relationships_file : relationships) {
ConvertRelationships(relationships_file, node_id_map, encoder,
next_relationship_id);
edge_count += ConvertRelationships(relationships_file, node_id_map,
encoder, relationship_id_generator);
}
buffer.WriteValue(node_count);
buffer.WriteValue(next_relationship_id);
buffer.WriteValue(edge_count);
buffer.WriteValue(buffer.hash());
} catch (const std::ios_base::failure &) {
// Only HashedFileWriter sets the underlying fstream to throw.