Refactor global ids and prepare for distributed
Summary: Change ids to global ids Fix tests Reviewers: florijan, buda Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1019
This commit is contained in:
parent
e218bc1c69
commit
3ddbcad0d9
@ -35,8 +35,10 @@ void GenerateGraph(GraphDb &db) {
|
||||
// Randomize the sequence of IDs of created vertices and edges to simulate
|
||||
// real-world lack of locality.
|
||||
auto make_id_vector = [](size_t size) {
|
||||
std::vector<int64_t> ids(size);
|
||||
for (size_t i = 0; i < size; ++i) ids[i] = i;
|
||||
gid::GidGenerator generator{0};
|
||||
std::vector<gid::Gid> ids(size);
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
ids[i] = generator.Next(std::experimental::nullopt);
|
||||
std::random_shuffle(ids.begin(), ids.end());
|
||||
return ids;
|
||||
};
|
||||
@ -61,7 +63,7 @@ void GenerateGraph(GraphDb &db) {
|
||||
for (int j = i * batch_size; j < (i + 1) * batch_size; ++j) {
|
||||
auto vertex = dba.InsertVertex(vertex_ids[j]);
|
||||
vertex.add_label(label);
|
||||
vertex.PropsSet(property, vertex_ids[j]);
|
||||
vertex.PropsSet(property, static_cast<int64_t>(vertex_ids[j]));
|
||||
vertices_lock.lock();
|
||||
vertices.emplace_back(vertex);
|
||||
vertices_lock.unlock();
|
||||
@ -97,7 +99,7 @@ void GenerateGraph(GraphDb &db) {
|
||||
auto EdgeIteration(GraphDb &db) {
|
||||
GraphDbAccessor dba{db};
|
||||
int64_t sum{0};
|
||||
for (auto edge : dba.Edges(false)) sum += edge.from().id() + edge.to().id();
|
||||
for (auto edge : dba.Edges(false)) sum += edge.from().gid() + edge.to().gid();
|
||||
return sum;
|
||||
}
|
||||
|
||||
@ -105,7 +107,7 @@ auto VertexIteration(GraphDb &db) {
|
||||
GraphDbAccessor dba{db};
|
||||
int64_t sum{0};
|
||||
for (auto v : dba.Vertices(false))
|
||||
for (auto e : v.out()) sum += e.id() + e.to().id();
|
||||
for (auto e : v.out()) sum += e.gid() + e.to().gid();
|
||||
return sum;
|
||||
}
|
||||
|
||||
@ -113,7 +115,7 @@ auto ConnectedComponentsEdges(GraphDb &db) {
|
||||
UnionFind<int64_t> connectivity{FLAGS_vertex_count};
|
||||
GraphDbAccessor dba{db};
|
||||
for (auto edge : dba.Edges(false))
|
||||
connectivity.Connect(edge.from().id(), edge.to().id());
|
||||
connectivity.Connect(edge.from().gid(), edge.to().gid());
|
||||
return connectivity.Size();
|
||||
}
|
||||
|
||||
@ -122,7 +124,7 @@ auto ConnectedComponentsVertices(GraphDb &db) {
|
||||
GraphDbAccessor dba{db};
|
||||
for (auto from : dba.Vertices(false)) {
|
||||
for (auto out_edge : from.out())
|
||||
connectivity.Connect(from.id(), out_edge.to().id());
|
||||
connectivity.Connect(from.gid(), out_edge.to().gid());
|
||||
}
|
||||
return connectivity.Size();
|
||||
}
|
||||
@ -148,7 +150,7 @@ auto ConnectedComponentsVerticesParallel(GraphDb &db) {
|
||||
utils::MakeBoundExclusive(bounds[i + 1]), false)) {
|
||||
for (auto out_edge : from.out()) {
|
||||
std::lock_guard<SpinLock> lock{connectivity_lock};
|
||||
connectivity.Connect(from.id(), out_edge.to().id());
|
||||
connectivity.Connect(from.gid(), out_edge.to().gid());
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -163,20 +165,16 @@ auto Expansion(GraphDb &db) {
|
||||
std::stack<VertexAccessor> expansion_stack;
|
||||
GraphDbAccessor dba{db};
|
||||
for (auto v : dba.Vertices(false)) {
|
||||
if (component_ids[v.id()] != -1)
|
||||
continue;
|
||||
if (component_ids[v.gid()] != -1) continue;
|
||||
auto component_id = next_component_id++;
|
||||
expansion_stack.push(v);
|
||||
while (!expansion_stack.empty()) {
|
||||
auto next_v = expansion_stack.top();
|
||||
expansion_stack.pop();
|
||||
if (component_ids[next_v.id()] != -1)
|
||||
continue;
|
||||
component_ids[next_v.id()] = component_id;
|
||||
for (auto e : next_v.out())
|
||||
expansion_stack.push(e.to());
|
||||
for (auto e : next_v.in())
|
||||
expansion_stack.push(e.from());
|
||||
if (component_ids[next_v.gid()] != -1) continue;
|
||||
component_ids[next_v.gid()] = component_id;
|
||||
for (auto e : next_v.out()) expansion_stack.push(e.to());
|
||||
for (auto e : next_v.in()) expansion_stack.push(e.from());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ class DecodedValue;
|
||||
* The decoder writes data into this structure.
|
||||
*/
|
||||
struct DecodedVertex {
|
||||
int64_t id;
|
||||
gid::Gid id;
|
||||
std::vector<std::string> labels;
|
||||
std::map<std::string, DecodedValue> properties;
|
||||
};
|
||||
@ -29,7 +29,7 @@ struct DecodedVertex {
|
||||
* The decoder writes data into this structure.
|
||||
*/
|
||||
struct DecodedEdge {
|
||||
int64_t id;
|
||||
gid::Gid id;
|
||||
int64_t from;
|
||||
int64_t to;
|
||||
std::string type;
|
||||
@ -41,7 +41,7 @@ struct DecodedEdge {
|
||||
* The decoder writes data into this structure.
|
||||
*/
|
||||
struct DecodedUnboundedEdge {
|
||||
int64_t id;
|
||||
gid::Gid id;
|
||||
std::string type;
|
||||
std::map<std::string, DecodedValue> properties;
|
||||
};
|
||||
|
@ -40,7 +40,7 @@ class BaseEncoder : public PrimitiveEncoder<Buffer> {
|
||||
void WriteVertex(const VertexAccessor &vertex) {
|
||||
this->WriteRAW(underlying_cast(Marker::TinyStruct) + 3);
|
||||
this->WriteRAW(underlying_cast(Signature::Node));
|
||||
WriteUInt(vertex.id());
|
||||
WriteUInt(vertex.gid());
|
||||
|
||||
// write labels
|
||||
const auto &labels = vertex.labels();
|
||||
@ -62,10 +62,10 @@ class BaseEncoder : public PrimitiveEncoder<Buffer> {
|
||||
this->WriteRAW(underlying_cast(unbound ? Signature::UnboundRelationship
|
||||
: Signature::Relationship));
|
||||
|
||||
WriteUInt(edge.id());
|
||||
WriteUInt(edge.gid());
|
||||
if (!unbound) {
|
||||
WriteUInt(edge.from().id());
|
||||
WriteUInt(edge.to().id());
|
||||
WriteUInt(edge.from().gid());
|
||||
WriteUInt(edge.to().gid());
|
||||
}
|
||||
|
||||
// write type
|
||||
|
@ -1,7 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "cppitertools/filter.hpp"
|
||||
#include "cppitertools/imap.hpp"
|
||||
|
||||
@ -14,10 +12,10 @@
|
||||
#include "durability/wal.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
#include "storage/concurrent_id_mapper.hpp"
|
||||
#include "storage/concurrent_id_mapper_master.hpp"
|
||||
#include "storage/deferred_deleter.hpp"
|
||||
#include "storage/edge.hpp"
|
||||
#include "storage/garbage_collector.hpp"
|
||||
#include "storage/concurrent_id_mapper_master.hpp"
|
||||
#include "storage/vertex.hpp"
|
||||
#include "transactions/engine.hpp"
|
||||
#include "utils/scheduler.hpp"
|
||||
@ -96,17 +94,19 @@ class GraphDb {
|
||||
* Otherwise a WorkerEngine instance. */
|
||||
std::unique_ptr<tx::Engine> tx_engine_;
|
||||
|
||||
std::atomic<int64_t> next_vertex_id_{0};
|
||||
std::atomic<int64_t> next_edge_id_{0};
|
||||
int worker_id_{0};
|
||||
|
||||
gid::GidGenerator vertex_generator_{worker_id_};
|
||||
gid::GidGenerator edge_generator_{worker_id_};
|
||||
|
||||
// main storage for the graph
|
||||
ConcurrentMap<int64_t, mvcc::VersionList<Vertex> *> vertices_;
|
||||
ConcurrentMap<int64_t, mvcc::VersionList<Edge> *> edges_;
|
||||
ConcurrentMap<gid::Gid, mvcc::VersionList<Vertex> *> vertices_;
|
||||
ConcurrentMap<gid::Gid, mvcc::VersionList<Edge> *> edges_;
|
||||
|
||||
// Garbage collectors
|
||||
GarbageCollector<ConcurrentMap<int64_t, mvcc::VersionList<Vertex> *>, Vertex>
|
||||
GarbageCollector<ConcurrentMap<gid::Gid, mvcc::VersionList<Vertex> *>, Vertex>
|
||||
gc_vertices_;
|
||||
GarbageCollector<ConcurrentMap<int64_t, mvcc::VersionList<Edge> *>, Edge>
|
||||
GarbageCollector<ConcurrentMap<gid::Gid, mvcc::VersionList<Edge> *>, Edge>
|
||||
gc_edges_;
|
||||
|
||||
// Deleters for not relevant records
|
||||
@ -132,9 +132,7 @@ class GraphDb {
|
||||
KeyIndex<GraphDbTypes::Label, Vertex> labels_index_;
|
||||
LabelPropertyIndex label_property_index_;
|
||||
|
||||
/**
|
||||
* Set of transactions ids which are building indexes currently
|
||||
*/
|
||||
// Set of transactions ids which are building indexes currently
|
||||
ConcurrentSet<tx::transaction_id_t> index_build_tx_in_progress_;
|
||||
|
||||
durability::WriteAheadLog wal_;
|
||||
@ -146,6 +144,6 @@ class GraphDb {
|
||||
// time to stop their execution.
|
||||
Scheduler transaction_killer_;
|
||||
|
||||
/** DB level global counters, used in the "counter" function. */
|
||||
// DB level global counters, used in the "counter" function.
|
||||
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
|
||||
};
|
||||
|
@ -53,26 +53,29 @@ bool GraphDbAccessor::should_abort() const {
|
||||
durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal_; }
|
||||
|
||||
VertexAccessor GraphDbAccessor::InsertVertex(
|
||||
std::experimental::optional<int64_t> opt_id) {
|
||||
std::experimental::optional<gid::Gid> gid) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
|
||||
auto id = opt_id ? *opt_id : db_.next_vertex_id_++;
|
||||
if (opt_id) {
|
||||
utils::EnsureAtomicGe(db_.next_vertex_id_, id + 1);
|
||||
std::experimental::optional<uint64_t> next_id;
|
||||
if (gid) {
|
||||
CHECK(static_cast<int>(gid::WorkerId(*gid)) == db_.worker_id_)
|
||||
<< "Attempting to set incompatible worker id";
|
||||
next_id = gid::LocalId(*gid);
|
||||
}
|
||||
|
||||
auto id = db_.vertex_generator_.Next(next_id);
|
||||
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_, id);
|
||||
|
||||
bool success = db_.vertices_.access().insert(id, vertex_vlist).second;
|
||||
CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id;
|
||||
db_.wal_.CreateVertex(transaction_->id_, vertex_vlist->id_);
|
||||
db_.wal_.CreateVertex(transaction_->id_, vertex_vlist->gid_);
|
||||
return VertexAccessor(*vertex_vlist, *this);
|
||||
}
|
||||
|
||||
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
|
||||
int64_t id, bool current_state) {
|
||||
gid::Gid gid, bool current_state) {
|
||||
auto collection_accessor = db_.vertices_.access();
|
||||
auto found = collection_accessor.find(id);
|
||||
auto found = collection_accessor.find(gid);
|
||||
if (found == collection_accessor.end()) return std::experimental::nullopt;
|
||||
VertexAccessor record_accessor(*found->second, *this);
|
||||
if (!Visible(record_accessor, current_state))
|
||||
@ -81,9 +84,9 @@ std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
|
||||
}
|
||||
|
||||
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
|
||||
int64_t id, bool current_state) {
|
||||
gid::Gid gid, bool current_state) {
|
||||
auto collection_accessor = db_.edges_.access();
|
||||
auto found = collection_accessor.find(id);
|
||||
auto found = collection_accessor.find(gid);
|
||||
if (found == collection_accessor.end()) return std::experimental::nullopt;
|
||||
EdgeAccessor record_accessor(*found->second, *this);
|
||||
if (!Visible(record_accessor, current_state))
|
||||
@ -249,7 +252,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
|
||||
if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0)
|
||||
return false;
|
||||
|
||||
db_.wal_.RemoveVertex(transaction_->id_, vertex_accessor.vlist_->id_);
|
||||
db_.wal_.RemoveVertex(transaction_->id_, vertex_accessor.vlist_->gid_);
|
||||
|
||||
vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_);
|
||||
return true;
|
||||
@ -274,13 +277,17 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
|
||||
|
||||
EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType edge_type,
|
||||
std::experimental::optional<int64_t> opt_id) {
|
||||
std::experimental::optional<gid::Gid> gid) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
auto id = opt_id ? *opt_id : db_.next_edge_id_++;
|
||||
if (opt_id) {
|
||||
utils::EnsureAtomicGe(db_.next_edge_id_, id + 1);
|
||||
|
||||
std::experimental::optional<uint64_t> next_id;
|
||||
if (gid) {
|
||||
CHECK(static_cast<int>(gid::WorkerId(*gid)) == db_.worker_id_)
|
||||
<< "Attempting to set incompatible worker id";
|
||||
next_id = gid::LocalId(*gid);
|
||||
}
|
||||
|
||||
auto id = db_.edge_generator_.Next(next_id);
|
||||
auto edge_vlist = new mvcc::VersionList<Edge>(*transaction_, id, from.vlist_,
|
||||
to.vlist_, edge_type);
|
||||
// We need to insert edge_vlist to edges_ before calling update since update
|
||||
@ -298,8 +305,8 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
to.SwitchNew();
|
||||
to.update().in_.emplace(from.vlist_, edge_vlist, edge_type);
|
||||
|
||||
db_.wal_.CreateEdge(transaction_->id_, edge_vlist->id_, from.vlist_->id_,
|
||||
to.vlist_->id_, EdgeTypeName(edge_type));
|
||||
db_.wal_.CreateEdge(transaction_->id_, edge_vlist->gid_, from.vlist_->gid_,
|
||||
to.vlist_->gid_, EdgeTypeName(edge_type));
|
||||
return EdgeAccessor(*edge_vlist, *this, from.vlist_, to.vlist_, edge_type);
|
||||
}
|
||||
|
||||
@ -322,7 +329,7 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor,
|
||||
edge_accessor.to().update().in_.RemoveEdge(edge_accessor.vlist_);
|
||||
edge_accessor.vlist_->remove(edge_accessor.current_, *transaction_);
|
||||
|
||||
db_.wal_.RemoveEdge(transaction_->id_, edge_accessor.id());
|
||||
db_.wal_.RemoveEdge(transaction_->id_, edge_accessor.gid());
|
||||
}
|
||||
|
||||
GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) {
|
||||
|
@ -55,21 +55,21 @@ class GraphDbAccessor {
|
||||
|
||||
/**
|
||||
* Creates a new Vertex and returns an accessor to it. If the ID is
|
||||
* provided, the created Vertex will have that ID, and the ID counter will be
|
||||
* increased to it so collisions are avoided. This should only be used by
|
||||
* durability recovery, normal vertex creation should not provide the ID.
|
||||
* provided, the created Vertex will have that local ID, and the ID counter
|
||||
* will be increased to it so collisions are avoided. This should only be used
|
||||
* by durability recovery, normal vertex creation should not provide the ID.
|
||||
*
|
||||
* You should NOT make interleaved recovery and normal DB op calls to this
|
||||
* function. Doing so will likely mess up the ID generation and crash MG.
|
||||
* Always perform recovery only once, immediately when the database is
|
||||
* created, before any transactional ops start.
|
||||
*
|
||||
* @param opt_id The desired ID. Should only be provided when recovering from
|
||||
* durability.
|
||||
* @param gid The desired GID. Should only be provided
|
||||
* when recovering from durability, and should `belong` to this worker.
|
||||
* @return See above.
|
||||
*/
|
||||
VertexAccessor InsertVertex(
|
||||
std::experimental::optional<int64_t> opt_id = std::experimental::nullopt);
|
||||
std::experimental::optional<gid::Gid> gid = std::experimental::nullopt);
|
||||
|
||||
/**
|
||||
* Removes the vertex of the given accessor. If the vertex has any outgoing or
|
||||
@ -97,13 +97,13 @@ class GraphDbAccessor {
|
||||
* ID, or it's not visible to this accessor's transaction, nullopt is
|
||||
* returned.
|
||||
*
|
||||
* @param id - The ID of the sought vertex.
|
||||
* @param gid - The GID of the sought vertex.
|
||||
* @param current_state If true then the graph state for the
|
||||
* current transaction+command is returned (insertions, updates and
|
||||
* deletions performed in the current transaction+command are not
|
||||
* ignored).
|
||||
*/
|
||||
std::experimental::optional<VertexAccessor> FindVertex(int64_t id,
|
||||
std::experimental::optional<VertexAccessor> FindVertex(gid::Gid gid,
|
||||
bool current_state);
|
||||
|
||||
/**
|
||||
@ -260,13 +260,13 @@ class GraphDbAccessor {
|
||||
* @param from The 'from' vertex.
|
||||
* @param to The 'to' vertex'
|
||||
* @param type Edge type.
|
||||
* @param opt_id The desired ID. Should only be provided when recovering from
|
||||
* durability.
|
||||
* @param gid The desired GID. Should only be provided when
|
||||
* recovering from durability and should `belong` to this worker.
|
||||
* @return An accessor to the edge.
|
||||
*/
|
||||
EdgeAccessor InsertEdge(
|
||||
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType type,
|
||||
std::experimental::optional<int64_t> opt_id = std::experimental::nullopt);
|
||||
std::experimental::optional<gid::Gid> gid = std::experimental::nullopt);
|
||||
|
||||
/**
|
||||
* Removes an edge from the graph. Parameters can indicate if the edge should
|
||||
@ -288,13 +288,13 @@ class GraphDbAccessor {
|
||||
* ID, or it's not visible to this accessor's transaction, nullopt is
|
||||
* returned.
|
||||
*
|
||||
* @param id - The ID of the sought edge.
|
||||
* @param gid - The GID of the sought edge.
|
||||
* @param current_state If true then the graph state for the
|
||||
* current transaction+command is returned (insertions, updates and
|
||||
* deletions performed in the current transaction+command are not
|
||||
* ignored).
|
||||
*/
|
||||
std::experimental::optional<EdgeAccessor> FindEdge(int64_t id,
|
||||
std::experimental::optional<EdgeAccessor> FindEdge(gid::Gid gid,
|
||||
bool current_state);
|
||||
/**
|
||||
* Returns iterable over accessors to all the edges in the graph
|
||||
|
@ -176,14 +176,14 @@ void WriteAheadLog::TxAbort(tx::transaction_id_t tx_id) {
|
||||
}
|
||||
|
||||
void WriteAheadLog::CreateVertex(tx::transaction_id_t tx_id,
|
||||
int64_t vertex_id) {
|
||||
gid::Gid vertex_id) {
|
||||
Op op(Op::Type::CREATE_VERTEX, tx_id);
|
||||
op.vertex_id_ = vertex_id;
|
||||
Emplace(std::move(op));
|
||||
}
|
||||
|
||||
void WriteAheadLog::CreateEdge(tx::transaction_id_t tx_id, int64_t edge_id,
|
||||
int64_t vertex_from_id, int64_t vertex_to_id,
|
||||
void WriteAheadLog::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
|
||||
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
|
||||
const std::string &edge_type) {
|
||||
Op op(Op::Type::CREATE_EDGE, tx_id);
|
||||
op.edge_id_ = edge_id;
|
||||
@ -194,7 +194,7 @@ void WriteAheadLog::CreateEdge(tx::transaction_id_t tx_id, int64_t edge_id,
|
||||
}
|
||||
|
||||
void WriteAheadLog::PropsSetVertex(tx::transaction_id_t tx_id,
|
||||
int64_t vertex_id,
|
||||
gid::Gid vertex_id,
|
||||
const std::string &property,
|
||||
const PropertyValue &value) {
|
||||
Op op(Op::Type::SET_PROPERTY_VERTEX, tx_id);
|
||||
@ -204,7 +204,7 @@ void WriteAheadLog::PropsSetVertex(tx::transaction_id_t tx_id,
|
||||
Emplace(std::move(op));
|
||||
}
|
||||
|
||||
void WriteAheadLog::PropsSetEdge(tx::transaction_id_t tx_id, int64_t edge_id,
|
||||
void WriteAheadLog::PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
|
||||
const std::string &property,
|
||||
const PropertyValue &value) {
|
||||
Op op(Op::Type::SET_PROPERTY_EDGE, tx_id);
|
||||
@ -214,7 +214,7 @@ void WriteAheadLog::PropsSetEdge(tx::transaction_id_t tx_id, int64_t edge_id,
|
||||
Emplace(std::move(op));
|
||||
}
|
||||
|
||||
void WriteAheadLog::AddLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
void WriteAheadLog::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
const std::string &label) {
|
||||
Op op(Op::Type::ADD_LABEL, tx_id);
|
||||
op.vertex_id_ = vertex_id;
|
||||
@ -222,7 +222,7 @@ void WriteAheadLog::AddLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
Emplace(std::move(op));
|
||||
}
|
||||
|
||||
void WriteAheadLog::RemoveLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
void WriteAheadLog::RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
const std::string &label) {
|
||||
Op op(Op::Type::REMOVE_LABEL, tx_id);
|
||||
op.vertex_id_ = vertex_id;
|
||||
@ -231,13 +231,13 @@ void WriteAheadLog::RemoveLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
}
|
||||
|
||||
void WriteAheadLog::RemoveVertex(tx::transaction_id_t tx_id,
|
||||
int64_t vertex_id) {
|
||||
gid::Gid vertex_id) {
|
||||
Op op(Op::Type::REMOVE_VERTEX, tx_id);
|
||||
op.vertex_id_ = vertex_id;
|
||||
Emplace(std::move(op));
|
||||
}
|
||||
|
||||
void WriteAheadLog::RemoveEdge(tx::transaction_id_t tx_id, int64_t edge_id) {
|
||||
void WriteAheadLog::RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id) {
|
||||
Op op(Op::Type::REMOVE_EDGE, tx_id);
|
||||
op.edge_id_ = edge_id;
|
||||
Emplace(std::move(op));
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include "database/graph_db_datatypes.hpp"
|
||||
#include "durability/hashed_file_reader.hpp"
|
||||
#include "durability/hashed_file_writer.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/property_value.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/scheduler.hpp"
|
||||
@ -65,10 +66,10 @@ class WriteAheadLog {
|
||||
uint32_t hash_;
|
||||
|
||||
// Members valid only for some ops, see Op::Type comments above.
|
||||
int64_t vertex_id_;
|
||||
int64_t edge_id_;
|
||||
int64_t vertex_from_id_;
|
||||
int64_t vertex_to_id_;
|
||||
gid::Gid vertex_id_;
|
||||
gid::Gid edge_id_;
|
||||
gid::Gid vertex_from_id_;
|
||||
gid::Gid vertex_to_id_;
|
||||
std::string edge_type_;
|
||||
std::string property_;
|
||||
PropertyValue value_ = PropertyValue::Null;
|
||||
@ -97,20 +98,20 @@ class WriteAheadLog {
|
||||
void TxBegin(tx::transaction_id_t tx_id);
|
||||
void TxCommit(tx::transaction_id_t tx_id);
|
||||
void TxAbort(tx::transaction_id_t tx_id);
|
||||
void CreateVertex(tx::transaction_id_t tx_id, int64_t vertex_id);
|
||||
void CreateEdge(tx::transaction_id_t tx_id, int64_t edge_id,
|
||||
int64_t vertex_from_id, int64_t vertex_to_id,
|
||||
void CreateVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id);
|
||||
void CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
|
||||
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
|
||||
const std::string &edge_type);
|
||||
void PropsSetVertex(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
void PropsSetVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
const std::string &property, const PropertyValue &value);
|
||||
void PropsSetEdge(tx::transaction_id_t tx_id, int64_t edge_id,
|
||||
void PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
|
||||
const std::string &property, const PropertyValue &value);
|
||||
void AddLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
void AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
const std::string &label);
|
||||
void RemoveLabel(tx::transaction_id_t tx_id, int64_t vertex_id,
|
||||
void RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
const std::string &label);
|
||||
void RemoveVertex(tx::transaction_id_t tx_id, int64_t vertex_id);
|
||||
void RemoveEdge(tx::transaction_id_t tx_id, int64_t edge_id);
|
||||
void RemoveVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id);
|
||||
void RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id);
|
||||
void BuildIndex(tx::transaction_id_t tx_id, const std::string &label,
|
||||
const std::string &property);
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/locking/record_lock.hpp"
|
||||
#include "threading/sync/lockable.hpp"
|
||||
#include "transactions/transaction.hpp"
|
||||
@ -22,13 +23,14 @@ class VersionList {
|
||||
/**
|
||||
* @brief Constructor that is used to insert one item into VersionList.
|
||||
* @param t - transaction
|
||||
* @param id - Version list identifier. Uniqueness guaranteed by the code
|
||||
* @param gid - Version list identifier. Uniqueness guaranteed by the code
|
||||
* creating this version list.
|
||||
* @param args - args forwarded to constructor of item T (for
|
||||
* creating the first Record (Version) in this VersionList.
|
||||
*/
|
||||
template <typename... Args>
|
||||
VersionList(const tx::Transaction &t, int64_t id, Args &&... args) : id_(id) {
|
||||
VersionList(const tx::Transaction &t, gid::Gid gid, Args &&... args)
|
||||
: gid_(gid) {
|
||||
// TODO replace 'new' with something better
|
||||
auto *v1 = new T(std::forward<Args>(args)...);
|
||||
v1->mark_created(t);
|
||||
@ -214,7 +216,7 @@ class VersionList {
|
||||
record->mark_expired(t);
|
||||
}
|
||||
|
||||
const int64_t id_;
|
||||
const gid::Gid gid_;
|
||||
|
||||
private:
|
||||
void lock_and_validate(T *record, const tx::Transaction &t) {
|
||||
|
@ -679,9 +679,9 @@ size_t TypedValue::Hash::operator()(const TypedValue &value) const {
|
||||
return hash;
|
||||
}
|
||||
case TypedValue::Type::Vertex:
|
||||
return value.Value<VertexAccessor>().id();
|
||||
return value.Value<VertexAccessor>().gid();
|
||||
case TypedValue::Type::Edge:
|
||||
return value.Value<EdgeAccessor>().id();
|
||||
return value.Value<EdgeAccessor>().gid();
|
||||
case TypedValue::Type::Path:
|
||||
return FnvCollection<std::vector<VertexAccessor>, VertexAccessor>{}(
|
||||
value.ValuePath().vertices()) ^
|
||||
|
@ -3,37 +3,32 @@
|
||||
#include <cstdint>
|
||||
|
||||
#include "glog/logging.h"
|
||||
#include "storage/gid.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
/**
|
||||
* A data structure that tracks a Vertex/Edge location (address) that's either
|
||||
* local or remote. The remote address consists of a pair (shard_id, global_id),
|
||||
* while the local address is simply the memory address in the current local
|
||||
* process. Both types of address are stored in the same storage space, so an
|
||||
* Address always takes as much memory as a pointer does.
|
||||
* local or remote. The remote address is a global id, while the local address
|
||||
* is simply the memory address in the current local process. Both types of
|
||||
* address are stored in the same storage space, so an Address always takes as
|
||||
* much memory as a pointer does.
|
||||
*
|
||||
* The memory layout for storage is on x64 architecture is the following:
|
||||
* - the lowest bit stores 0 if address is local and 1 if address is global
|
||||
* - if the address is local all 64 bits store the local memory address
|
||||
* - if the address is global then:
|
||||
* - lower bits in [1, 1 + kShardIdSize] range contain the shard ID
|
||||
* - upper (64 - 1 - kShardIdSize) bits, which is the [2 + kShardIdSize,
|
||||
* 63] range contain the globally unique element ID
|
||||
* - if the address is global then most imporant 63 bits store the global id
|
||||
*
|
||||
* @tparam TRecord - Type of record this address points to. Either Vertex or
|
||||
* Edge.
|
||||
*/
|
||||
template <typename TLocalObj>
|
||||
class Address {
|
||||
static constexpr uintptr_t kTypeMask{1};
|
||||
using Storage = uint64_t;
|
||||
static constexpr uintptr_t kTypeMaskSize{1};
|
||||
static constexpr uintptr_t kTypeMask{(1ULL << kTypeMaskSize) - 1};
|
||||
static constexpr uintptr_t kLocal{0};
|
||||
static constexpr uintptr_t kRemote{1};
|
||||
static constexpr size_t kShardIdPos{1};
|
||||
// To modify memory layout only change kShardIdSize.
|
||||
static constexpr size_t kShardIdSize{10};
|
||||
static constexpr size_t KGlobalIdPos{kShardIdPos + kShardIdSize};
|
||||
static constexpr size_t kGlobalIdSize{64 - 1 - kShardIdSize};
|
||||
|
||||
public:
|
||||
// Constructor for local Address.
|
||||
@ -44,16 +39,12 @@ class Address {
|
||||
}
|
||||
|
||||
// Constructor for remote Address.
|
||||
Address(uint64_t shard_id, uint64_t global_id) {
|
||||
// TODO make a SSOT about max shard ID. Ensure that a shard with a larger ID
|
||||
// can't be created, and that this ID fits into kShardIdSize bits.
|
||||
CHECK(shard_id < (1ULL << (kShardIdSize - 1))) << "Shard ID too big.";
|
||||
CHECK(global_id < (1ULL << (kGlobalIdSize - 1)))
|
||||
<< "Global element ID too big.";
|
||||
Address(gid::Gid global_id) {
|
||||
CHECK(global_id < (1ULL << (sizeof(Storage) * 8 - kTypeMaskSize)))
|
||||
<< "Too large global id";
|
||||
|
||||
storage_ = kRemote;
|
||||
storage_ |= shard_id << kShardIdPos;
|
||||
storage_ |= global_id << KGlobalIdPos;
|
||||
storage_ |= global_id << kTypeMaskSize;
|
||||
}
|
||||
|
||||
bool is_local() const { return (storage_ & kTypeMask) == kLocal; }
|
||||
@ -64,14 +55,9 @@ class Address {
|
||||
return reinterpret_cast<TLocalObj *>(storage_);
|
||||
}
|
||||
|
||||
uint64_t shard_id() const {
|
||||
DCHECK(is_remote()) << "Attempting to get shard ID from local address";
|
||||
return (storage_ >> kShardIdPos) & ((1ULL << kShardIdSize) - 1);
|
||||
}
|
||||
|
||||
uint64_t global_id() const {
|
||||
gid::Gid global_id() const {
|
||||
DCHECK(is_remote()) << "Attempting to get global ID from local address";
|
||||
return (storage_ >> KGlobalIdPos) & ((1ULL << kGlobalIdSize) - 1);
|
||||
return storage_ >> kTypeMaskSize;
|
||||
}
|
||||
|
||||
bool operator==(const Address<TLocalObj> &other) const {
|
||||
@ -79,6 +65,6 @@ class Address {
|
||||
}
|
||||
|
||||
private:
|
||||
uintptr_t storage_{0};
|
||||
Storage storage_{0};
|
||||
};
|
||||
} // namespace storage
|
||||
|
@ -99,6 +99,6 @@ std::ostream &operator<<(std::ostream &, const EdgeAccessor &);
|
||||
namespace std {
|
||||
template <>
|
||||
struct hash<EdgeAccessor> {
|
||||
size_t operator()(const EdgeAccessor &e) const { return e.id(); };
|
||||
size_t operator()(const EdgeAccessor &e) const { return e.gid(); };
|
||||
};
|
||||
} // namespace std
|
||||
|
59
src/storage/gid.hpp
Normal file
59
src/storage/gid.hpp
Normal file
@ -0,0 +1,59 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <experimental/optional>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "utils/atomic.hpp"
|
||||
|
||||
namespace gid {
|
||||
/**
|
||||
* Global ids are created by taking both the `local` object id, and `worker` id.
|
||||
* A global ID has 64 bits. The lower kWorkerIdSize bits contain the worker ID.
|
||||
* All the other (upper) bits contain the local ID.
|
||||
*/
|
||||
using Gid = uint64_t;
|
||||
|
||||
static constexpr std::size_t kWorkerIdSize{10};
|
||||
|
||||
/// Returns worker id from global id.
|
||||
static inline Gid WorkerId(Gid gid) {
|
||||
return gid & ((1ULL << kWorkerIdSize) - 1);
|
||||
}
|
||||
|
||||
/// Returns `local` id from global id.
|
||||
static inline Gid LocalId(Gid gid) { return gid >> kWorkerIdSize; }
|
||||
|
||||
/// Returns global id from worker and local ids.
|
||||
static inline Gid Create(uint64_t worker_id, uint64_t local_id) {
|
||||
CHECK(worker_id < (1ULL << kWorkerIdSize));
|
||||
CHECK(local_id < (1ULL << (sizeof(Gid) * 8 - kWorkerIdSize)));
|
||||
return worker_id | (local_id << kWorkerIdSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief - Threadsafe generation of new global ids which belong to the
|
||||
* worker_id machine
|
||||
*/
|
||||
class GidGenerator {
|
||||
public:
|
||||
GidGenerator(int worker_id) : worker_id_(worker_id) {}
|
||||
/**
|
||||
* Returns a new globally unique identifier.
|
||||
* @param local_id - force local id instead of generating a new one
|
||||
*/
|
||||
gid::Gid Next(std::experimental::optional<Gid> local_id) {
|
||||
auto id = local_id ? *local_id : id_++;
|
||||
if (local_id) {
|
||||
utils::EnsureAtomicGe(id_, id + 1);
|
||||
}
|
||||
return gid::Create(worker_id_, id);
|
||||
}
|
||||
|
||||
private:
|
||||
int worker_id_;
|
||||
std::atomic<Gid> id_{0};
|
||||
};
|
||||
}; // namespace gid
|
@ -22,7 +22,7 @@ void RecordAccessor<Vertex>::PropsSet(GraphDbTypes::Property key,
|
||||
Vertex &vertex = update();
|
||||
vertex.properties_.set(key, value);
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->id_,
|
||||
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_,
|
||||
dba.PropertyName(key), value);
|
||||
db_accessor().UpdatePropertyIndex(key, *this, &vertex);
|
||||
}
|
||||
@ -32,14 +32,14 @@ void RecordAccessor<Edge>::PropsSet(GraphDbTypes::Property key,
|
||||
PropertyValue value) {
|
||||
update().properties_.set(key, value);
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->id_,
|
||||
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_,
|
||||
dba.PropertyName(key), value);
|
||||
}
|
||||
|
||||
template <>
|
||||
size_t RecordAccessor<Vertex>::PropsErase(GraphDbTypes::Property key) {
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->id_,
|
||||
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_,
|
||||
dba.PropertyName(key), PropertyValue::Null);
|
||||
return update().properties_.erase(key);
|
||||
}
|
||||
@ -47,7 +47,7 @@ size_t RecordAccessor<Vertex>::PropsErase(GraphDbTypes::Property key) {
|
||||
template <>
|
||||
size_t RecordAccessor<Edge>::PropsErase(GraphDbTypes::Property key) {
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->id_,
|
||||
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_,
|
||||
dba.PropertyName(key), PropertyValue::Null);
|
||||
return update().properties_.erase(key);
|
||||
}
|
||||
@ -57,7 +57,7 @@ void RecordAccessor<Vertex>::PropsClear() {
|
||||
auto &updated = update();
|
||||
auto &dba = db_accessor();
|
||||
for (const auto &kv : updated.properties_)
|
||||
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->id_,
|
||||
dba.wal().PropsSetVertex(dba.transaction_id(), vlist_->gid_,
|
||||
dba.PropertyName(kv.first), PropertyValue::Null);
|
||||
updated.properties_.clear();
|
||||
}
|
||||
@ -67,7 +67,7 @@ void RecordAccessor<Edge>::PropsClear() {
|
||||
auto &updated = update();
|
||||
auto &dba = db_accessor();
|
||||
for (const auto &kv : updated.properties_)
|
||||
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->id_,
|
||||
dba.wal().PropsSetEdge(dba.transaction_id(), vlist_->gid_,
|
||||
dba.PropertyName(kv.first), PropertyValue::Null);
|
||||
updated.properties_.clear();
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include "database/graph_db_datatypes.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/property_value.hpp"
|
||||
#include "storage/property_value_store.hpp"
|
||||
#include "utils/total_ordering.hpp"
|
||||
@ -115,10 +116,10 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
GraphDbAccessor &db_accessor() const;
|
||||
|
||||
/** Returns a database-unique index of this vertex or edge. Note that vertices
|
||||
* and edges have separate ID domains, there can be a vertex with ID X and an
|
||||
* edge with the same id.
|
||||
* and edges have separate GID domains, there can be a vertex with GID X and
|
||||
* an edge with the same gid.
|
||||
*/
|
||||
int64_t id() const { return vlist_->id_; }
|
||||
gid::Gid gid() const { return vlist_->gid_; }
|
||||
|
||||
/*
|
||||
* Switches this record accessor to use the latest
|
||||
|
@ -19,7 +19,7 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) {
|
||||
vertex.labels_.emplace_back(label);
|
||||
auto &dba = db_accessor();
|
||||
dba.UpdateLabelIndices(label, *this, &vertex);
|
||||
dba.wal().AddLabel(dba.transaction_id(), id(), dba.LabelName(label));
|
||||
dba.wal().AddLabel(dba.transaction_id(), gid(), dba.LabelName(label));
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ size_t VertexAccessor::remove_label(GraphDbTypes::Label label) {
|
||||
std::swap(*found, labels.back());
|
||||
labels.pop_back();
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().RemoveLabel(dba.transaction_id(), id(), dba.LabelName(label));
|
||||
dba.wal().RemoveLabel(dba.transaction_id(), gid(), dba.LabelName(label));
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -50,9 +50,10 @@ std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) {
|
||||
stream << va.db_accessor().LabelName(label);
|
||||
});
|
||||
os << " {";
|
||||
utils::PrintIterable(os, va.Properties(), ", ", [&](auto &stream,
|
||||
const auto &pair) {
|
||||
stream << va.db_accessor().PropertyName(pair.first) << ": " << pair.second;
|
||||
});
|
||||
utils::PrintIterable(os, va.Properties(), ", ",
|
||||
[&](auto &stream, const auto &pair) {
|
||||
stream << va.db_accessor().PropertyName(pair.first)
|
||||
<< ": " << pair.second;
|
||||
});
|
||||
return os << "})";
|
||||
}
|
||||
|
@ -175,6 +175,6 @@ std::ostream &operator<<(std::ostream &, const VertexAccessor &);
|
||||
namespace std {
|
||||
template <>
|
||||
struct hash<VertexAccessor> {
|
||||
size_t operator()(const VertexAccessor &v) const { return v.id(); };
|
||||
size_t operator()(const VertexAccessor &v) const { return v.gid(); };
|
||||
};
|
||||
} // namespace std
|
||||
|
@ -198,13 +198,13 @@ TEST(BoltEncoder, VertexAndEdge) {
|
||||
// and Memgraph now encodes IDs so we need to check the output
|
||||
// part by part.
|
||||
CheckOutput(output, vertexedge_encoded, 5, false);
|
||||
CheckInt(output, va1.id());
|
||||
CheckInt(output, va1.gid());
|
||||
CheckOutput(output, vertexedge_encoded + 6, 34, false);
|
||||
CheckInt(output, va2.id());
|
||||
CheckInt(output, va2.gid());
|
||||
CheckOutput(output, vertexedge_encoded + 41, 4, false);
|
||||
CheckInt(output, ea.id());
|
||||
CheckInt(output, va1.id());
|
||||
CheckInt(output, va2.id());
|
||||
CheckInt(output, ea.gid());
|
||||
CheckInt(output, va1.gid());
|
||||
CheckInt(output, va2.gid());
|
||||
CheckOutput(output, vertexedge_encoded + 48, 26);
|
||||
}
|
||||
|
||||
|
@ -54,7 +54,7 @@ class DbGenerator {
|
||||
|
||||
VertexAccessor InsertVertex() {
|
||||
auto vertex = dba_.InsertVertex();
|
||||
vertex_ids_.emplace_back(vertex.id());
|
||||
vertex_ids_.emplace_back(vertex.gid());
|
||||
return vertex;
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ class DbGenerator {
|
||||
auto from = RandomVertex();
|
||||
auto to = RandomVertex();
|
||||
auto edge = dba_.InsertEdge(from, to, EdgeType(RandomInt(kEdgeTypeCount)));
|
||||
edge_ids_.emplace_back(edge.id());
|
||||
edge_ids_.emplace_back(edge.gid());
|
||||
return edge;
|
||||
}
|
||||
|
||||
@ -188,8 +188,8 @@ void CompareDbs(GraphDb &a, GraphDb &b) {
|
||||
int vertices_a_count = 0;
|
||||
for (auto v_a : dba_a.Vertices(false)) {
|
||||
vertices_a_count++;
|
||||
auto v_b = dba_b.FindVertex(v_a.id(), false);
|
||||
ASSERT_TRUE(v_b) << "Vertex not found, id: " << v_a.id();
|
||||
auto v_b = dba_b.FindVertex(v_a.gid(), false);
|
||||
ASSERT_TRUE(v_b) << "Vertex not found, id: " << v_a.gid();
|
||||
ASSERT_EQ(v_a.labels().size(), v_b->labels().size());
|
||||
std::vector<std::string> v_a_labels;
|
||||
std::vector<std::string> v_b_labels;
|
||||
@ -207,13 +207,13 @@ void CompareDbs(GraphDb &a, GraphDb &b) {
|
||||
int edges_a_count = 0;
|
||||
for (auto e_a : dba_a.Edges(false)) {
|
||||
edges_a_count++;
|
||||
auto e_b = dba_b.FindEdge(e_a.id(), false);
|
||||
auto e_b = dba_b.FindEdge(e_a.gid(), false);
|
||||
ASSERT_TRUE(e_b);
|
||||
ASSERT_TRUE(e_b) << "Edge not found, id: " << e_a.id();
|
||||
ASSERT_TRUE(e_b) << "Edge not found, id: " << e_a.gid();
|
||||
EXPECT_EQ(dba_a.EdgeTypeName(e_a.EdgeType()),
|
||||
dba_b.EdgeTypeName(e_b->EdgeType()));
|
||||
EXPECT_EQ(e_a.from().id(), e_b->from().id());
|
||||
EXPECT_EQ(e_a.to().id(), e_b->to().id());
|
||||
EXPECT_EQ(e_a.from().gid(), e_b->from().gid());
|
||||
EXPECT_EQ(e_a.to().gid(), e_b->to().gid());
|
||||
EXPECT_TRUE(is_permutation_props(e_a.Properties(), e_b->Properties()));
|
||||
}
|
||||
auto edges_b = dba_b.Edges(false);
|
||||
@ -305,19 +305,21 @@ class Durability : public ::testing::Test {
|
||||
};
|
||||
|
||||
TEST_F(Durability, WalEncoding) {
|
||||
auto gid0 = gid::Create(0, 0);
|
||||
auto gid1 = gid::Create(0, 1);
|
||||
{
|
||||
auto config = DbConfig();
|
||||
config.durability_enabled = true;
|
||||
GraphDb db{config};
|
||||
GraphDbAccessor dba(db);
|
||||
auto v0 = dba.InsertVertex();
|
||||
ASSERT_EQ(v0.id(), 0);
|
||||
ASSERT_EQ(v0.gid(), gid0);
|
||||
v0.add_label(dba.Label("l0"));
|
||||
v0.PropsSet(dba.Property("p0"), 42);
|
||||
auto v1 = dba.InsertVertex();
|
||||
ASSERT_EQ(v1.id(), 1);
|
||||
ASSERT_EQ(v1.gid(), gid1);
|
||||
auto e0 = dba.InsertEdge(v0, v1, dba.EdgeType("et0"));
|
||||
ASSERT_EQ(e0.id(), 0);
|
||||
ASSERT_EQ(e0.gid(), gid0);
|
||||
e0.PropsSet(dba.Property("p0"), std::vector<PropertyValue>{1, 2, 3});
|
||||
dba.BuildIndex(dba.Label("l1"), dba.Property("p1"));
|
||||
dba.Commit();
|
||||
@ -347,28 +349,28 @@ TEST_F(Durability, WalEncoding) {
|
||||
EXPECT_EQ(ops[0].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[1].type_, Type::CREATE_VERTEX);
|
||||
EXPECT_EQ(ops[1].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[1].vertex_id_, 0);
|
||||
EXPECT_EQ(ops[1].vertex_id_, gid0);
|
||||
EXPECT_EQ(ops[2].type_, Type::ADD_LABEL);
|
||||
EXPECT_EQ(ops[2].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[2].label_, "l0");
|
||||
EXPECT_EQ(ops[3].type_, Type::SET_PROPERTY_VERTEX);
|
||||
EXPECT_EQ(ops[3].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[3].vertex_id_, 0);
|
||||
EXPECT_EQ(ops[3].vertex_id_, gid0);
|
||||
EXPECT_EQ(ops[3].property_, "p0");
|
||||
EXPECT_EQ(ops[3].value_.type(), PropertyValue::Type::Int);
|
||||
EXPECT_EQ(ops[3].value_.Value<int64_t>(), 42);
|
||||
EXPECT_EQ(ops[4].type_, Type::CREATE_VERTEX);
|
||||
EXPECT_EQ(ops[4].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[4].vertex_id_, 1);
|
||||
EXPECT_EQ(ops[4].vertex_id_, gid1);
|
||||
EXPECT_EQ(ops[5].type_, Type::CREATE_EDGE);
|
||||
EXPECT_EQ(ops[5].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[5].edge_id_, 0);
|
||||
EXPECT_EQ(ops[5].vertex_from_id_, 0);
|
||||
EXPECT_EQ(ops[5].vertex_to_id_, 1);
|
||||
EXPECT_EQ(ops[5].edge_id_, gid0);
|
||||
EXPECT_EQ(ops[5].vertex_from_id_, gid0);
|
||||
EXPECT_EQ(ops[5].vertex_to_id_, gid1);
|
||||
EXPECT_EQ(ops[5].edge_type_, "et0");
|
||||
EXPECT_EQ(ops[6].type_, Type::SET_PROPERTY_EDGE);
|
||||
EXPECT_EQ(ops[6].transaction_id_, 1);
|
||||
EXPECT_EQ(ops[6].edge_id_, 0);
|
||||
EXPECT_EQ(ops[6].edge_id_, gid0);
|
||||
EXPECT_EQ(ops[6].property_, "p0");
|
||||
EXPECT_EQ(ops[6].value_.type(), PropertyValue::Type::List);
|
||||
// The next two ops are the BuildIndex internal transactions.
|
||||
@ -386,22 +388,22 @@ TEST_F(Durability, SnapshotEncoding) {
|
||||
GraphDb db{DbConfig()};
|
||||
GraphDbAccessor dba(db);
|
||||
auto v0 = dba.InsertVertex();
|
||||
ASSERT_EQ(v0.id(), 0);
|
||||
ASSERT_EQ(v0.gid(), gid::Create(0, 0));
|
||||
v0.add_label(dba.Label("l0"));
|
||||
v0.PropsSet(dba.Property("p0"), 42);
|
||||
auto v1 = dba.InsertVertex();
|
||||
ASSERT_EQ(v1.id(), 1);
|
||||
ASSERT_EQ(v1.gid(), gid::Create(0, 1));
|
||||
v1.add_label(dba.Label("l0"));
|
||||
v1.add_label(dba.Label("l1"));
|
||||
auto v2 = dba.InsertVertex();
|
||||
ASSERT_EQ(v2.id(), 2);
|
||||
ASSERT_EQ(v2.gid(), gid::Create(0, 2));
|
||||
v2.PropsSet(dba.Property("p0"), true);
|
||||
v2.PropsSet(dba.Property("p1"), "Johnny");
|
||||
auto e0 = dba.InsertEdge(v0, v1, dba.EdgeType("et0"));
|
||||
ASSERT_EQ(e0.id(), 0);
|
||||
ASSERT_EQ(e0.gid(), gid::Create(0, 0));
|
||||
e0.PropsSet(dba.Property("p0"), std::vector<PropertyValue>{1, 2, 3});
|
||||
auto e1 = dba.InsertEdge(v2, v1, dba.EdgeType("et1"));
|
||||
ASSERT_EQ(e1.id(), 1);
|
||||
ASSERT_EQ(e1.gid(), gid::Create(0, 1));
|
||||
dba.BuildIndex(dba.Label("l1"), dba.Property("p1"));
|
||||
dba.Commit();
|
||||
MakeSnapshot(db);
|
||||
@ -438,8 +440,7 @@ TEST_F(Durability, SnapshotEncoding) {
|
||||
EXPECT_EQ(dv.ValueList()[0].ValueString(), "l1");
|
||||
EXPECT_EQ(dv.ValueList()[1].ValueString(), "p1");
|
||||
|
||||
std::map<int64_t, communication::bolt::DecodedVertex> decoded_vertices;
|
||||
std::map<int64_t, communication::bolt::DecodedEdge> decoded_edges;
|
||||
std::map<gid::Gid, communication::bolt::DecodedVertex> decoded_vertices;
|
||||
|
||||
// Decode vertices.
|
||||
for (int i = 0; i < vertex_count; ++i) {
|
||||
@ -448,15 +449,20 @@ TEST_F(Durability, SnapshotEncoding) {
|
||||
auto &vertex = dv.ValueVertex();
|
||||
decoded_vertices.emplace(vertex.id, vertex);
|
||||
}
|
||||
auto gid0 = gid::Create(0, 0);
|
||||
auto gid1 = gid::Create(0, 1);
|
||||
auto gid2 = gid::Create(0, 2);
|
||||
ASSERT_EQ(decoded_vertices.size(), 3);
|
||||
ASSERT_EQ(decoded_vertices[0].labels.size(), 1);
|
||||
EXPECT_EQ(decoded_vertices[0].labels[0], "l0");
|
||||
ASSERT_EQ(decoded_vertices[0].properties.size(), 1);
|
||||
EXPECT_EQ(decoded_vertices[0].properties["p0"].ValueInt(), 42);
|
||||
EXPECT_EQ(decoded_vertices[1].labels.size(), 2);
|
||||
EXPECT_EQ(decoded_vertices[1].properties.size(), 0);
|
||||
EXPECT_EQ(decoded_vertices[2].labels.size(), 0);
|
||||
EXPECT_EQ(decoded_vertices[2].properties.size(), 2);
|
||||
ASSERT_EQ(decoded_vertices[gid0].labels.size(), 1);
|
||||
EXPECT_EQ(decoded_vertices[gid0].labels[0], "l0");
|
||||
ASSERT_EQ(decoded_vertices[gid0].properties.size(), 1);
|
||||
EXPECT_EQ(decoded_vertices[gid0].properties["p0"].ValueInt(), 42);
|
||||
EXPECT_EQ(decoded_vertices[gid1].labels.size(), 2);
|
||||
EXPECT_EQ(decoded_vertices[gid1].properties.size(), 0);
|
||||
EXPECT_EQ(decoded_vertices[gid2].labels.size(), 0);
|
||||
EXPECT_EQ(decoded_vertices[gid2].properties.size(), 2);
|
||||
|
||||
std::map<gid::Gid, communication::bolt::DecodedEdge> decoded_edges;
|
||||
|
||||
// Decode edges.
|
||||
for (int i = 0; i < edge_count; ++i) {
|
||||
@ -465,15 +471,15 @@ TEST_F(Durability, SnapshotEncoding) {
|
||||
auto &edge = dv.ValueEdge();
|
||||
decoded_edges.emplace(edge.id, edge);
|
||||
}
|
||||
ASSERT_EQ(decoded_edges.size(), 2);
|
||||
ASSERT_EQ(decoded_edges[0].from, 0);
|
||||
ASSERT_EQ(decoded_edges[0].to, 1);
|
||||
ASSERT_EQ(decoded_edges[0].type, "et0");
|
||||
ASSERT_EQ(decoded_edges[0].properties.size(), 1);
|
||||
ASSERT_EQ(decoded_edges[1].from, 2);
|
||||
ASSERT_EQ(decoded_edges[1].to, 1);
|
||||
ASSERT_EQ(decoded_edges[1].type, "et1");
|
||||
ASSERT_EQ(decoded_edges[1].properties.size(), 0);
|
||||
EXPECT_EQ(decoded_edges.size(), 2);
|
||||
EXPECT_EQ(decoded_edges[gid0].from, gid0);
|
||||
EXPECT_EQ(decoded_edges[gid0].to, gid1);
|
||||
EXPECT_EQ(decoded_edges[gid0].type, "et0");
|
||||
EXPECT_EQ(decoded_edges[gid0].properties.size(), 1);
|
||||
EXPECT_EQ(decoded_edges[gid1].from, gid2);
|
||||
EXPECT_EQ(decoded_edges[gid1].to, gid1);
|
||||
EXPECT_EQ(decoded_edges[gid1].type, "et1");
|
||||
EXPECT_EQ(decoded_edges[gid1].properties.size(), 0);
|
||||
|
||||
// Vertex and edge counts are included in the hash. Re-read them to update the
|
||||
// hash.
|
||||
|
@ -18,13 +18,13 @@ TEST(GraphDbAccessorTest, InsertVertex) {
|
||||
|
||||
EXPECT_EQ(Count(accessor.Vertices(false)), 0);
|
||||
|
||||
EXPECT_EQ(accessor.InsertVertex().id(), 0);
|
||||
EXPECT_EQ(accessor.InsertVertex().gid(), 0);
|
||||
EXPECT_EQ(Count(accessor.Vertices(false)), 0);
|
||||
EXPECT_EQ(Count(accessor.Vertices(true)), 1);
|
||||
accessor.AdvanceCommand();
|
||||
EXPECT_EQ(Count(accessor.Vertices(false)), 1);
|
||||
|
||||
EXPECT_EQ(accessor.InsertVertex().id(), 1);
|
||||
EXPECT_EQ(accessor.InsertVertex().gid(), gid::Create(0, 1));
|
||||
EXPECT_EQ(Count(accessor.Vertices(false)), 1);
|
||||
EXPECT_EQ(Count(accessor.Vertices(true)), 2);
|
||||
accessor.AdvanceCommand();
|
||||
@ -40,7 +40,7 @@ TEST(GraphDbAccessorTest, UniqueVertexId) {
|
||||
threads.emplace_back([&db, &ids]() {
|
||||
GraphDbAccessor dba(db);
|
||||
auto access = ids.access();
|
||||
for (int i = 0; i < 200; i++) access.insert(dba.InsertVertex().id());
|
||||
for (int i = 0; i < 200; i++) access.insert(dba.InsertVertex().gid());
|
||||
});
|
||||
}
|
||||
|
||||
@ -146,7 +146,7 @@ TEST(GraphDbAccessorTest, UniqueEdgeId) {
|
||||
auto edge_type = dba.EdgeType("edge_type");
|
||||
auto access = ids.access();
|
||||
for (int i = 0; i < 200; i++)
|
||||
access.insert(dba.InsertEdge(v1, v2, edge_type).id());
|
||||
access.insert(dba.InsertEdge(v1, v2, edge_type).gid());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "storage/address.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
|
||||
using storage::Address;
|
||||
|
||||
@ -22,12 +23,13 @@ TEST(Address, CopyCompare) {
|
||||
}
|
||||
|
||||
TEST(Address, Global) {
|
||||
uint64_t shard_id{13};
|
||||
uint64_t global_id{31};
|
||||
Address<int> address{shard_id, global_id};
|
||||
uint64_t worker_id{13};
|
||||
uint64_t local_id{31};
|
||||
auto global_id = gid::Create(worker_id, local_id);
|
||||
Address<int> address{global_id};
|
||||
|
||||
EXPECT_TRUE(address.is_remote());
|
||||
EXPECT_FALSE(address.is_local());
|
||||
EXPECT_EQ(address.shard_id(), shard_id);
|
||||
EXPECT_EQ(gid::WorkerId(address.global_id()), worker_id);
|
||||
EXPECT_EQ(address.global_id(), global_id);
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ class MemgraphNodeIdMap {
|
||||
}
|
||||
|
||||
int64_t Insert(const NodeId &node_id) {
|
||||
int64_t id = mg_id_++;
|
||||
gid::Gid id = gid::Create(0, mg_id_++);
|
||||
node_id_to_mg_[node_id] = id;
|
||||
return id;
|
||||
}
|
||||
@ -197,7 +197,7 @@ void WriteNodeRow(const std::vector<Field> &fields,
|
||||
const std::vector<std::string> &row,
|
||||
MemgraphNodeIdMap &node_id_map,
|
||||
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
|
||||
std::experimental::optional<int64_t> id;
|
||||
std::experimental::optional<gid::Gid> id;
|
||||
std::vector<query::TypedValue> labels;
|
||||
std::map<std::string, query::TypedValue> properties;
|
||||
for (int i = 0; i < row.size(); ++i) {
|
||||
@ -256,7 +256,7 @@ auto ConvertNodes(const std::string &nodes_path, MemgraphNodeIdMap &node_id_map,
|
||||
|
||||
void WriteRelationshipsRow(
|
||||
const std::vector<Field> &fields, const std::vector<std::string> &row,
|
||||
const MemgraphNodeIdMap &node_id_map, int64_t relationship_id,
|
||||
const MemgraphNodeIdMap &node_id_map, gid::Gid relationship_id,
|
||||
communication::bolt::BaseEncoder<HashedFileWriter> &encoder) {
|
||||
std::experimental::optional<int64_t> start_id;
|
||||
std::experimental::optional<int64_t> end_id;
|
||||
@ -312,8 +312,8 @@ void ConvertRelationships(
|
||||
while (!row.empty()) {
|
||||
CHECK_EQ(row.size(), fields.size())
|
||||
<< "Expected as many values as there are header fields";
|
||||
WriteRelationshipsRow(fields, row, node_id_map, next_relationship_id++,
|
||||
encoder);
|
||||
WriteRelationshipsRow(fields, row, node_id_map,
|
||||
gid::Create(0, next_relationship_id++), encoder);
|
||||
row = ReadRow(relationships_file);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user