From 888c6a4bca9fbc033a2d49c947d3626e023a89ec Mon Sep 17 00:00:00 2001 From: Marko Budiselic <marko.budiselic@memgraph.io> Date: Wed, 4 Jul 2018 21:32:07 +0200 Subject: [PATCH] Add support for the id function Reviewers: dgleich, teon.banek, mferencevic Reviewed By: mferencevic Subscribers: dgleich, mferencevic, teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1462 --- config/community.conf | 11 ---- docs/user_technical/open-cypher.md | 3 +- src/communication/bolt/v1/codes.hpp | 4 +- src/communication/bolt/v1/decoder/decoder.hpp | 2 +- src/database/graph_db_accessor.cpp | 62 ++++++++----------- src/database/graph_db_accessor.hpp | 38 +++++++++--- src/database/state_delta.cpp | 40 ++++++------ src/database/state_delta.lcp | 8 ++- src/database/storage.hpp | 8 ++- src/durability/recovery.cpp | 17 ++++- src/durability/snapshooter.cpp | 3 +- src/durability/snapshot_decoded_value.hpp | 1 + src/durability/snapshot_decoder.hpp | 32 ++++++---- src/durability/snapshot_encoder.hpp | 15 +++-- src/durability/version.hpp | 23 +++++-- src/mvcc/version_list.hpp | 24 ++++++- .../interpret/awesome_memgraph_functions.cpp | 18 +----- src/storage/address.hpp | 6 +- src/storage/record_accessor.hpp | 7 +++ src/storage/vertex_accessor.hpp | 12 ++-- src/utils/cast.hpp | 37 +++++++++++ src/utils/underlying_cast.hpp | 12 ---- tests/benchmark/mvcc.cpp | 2 +- .../snapshot_generation/snapshot_writer.hpp | 8 ++- tests/unit/database_key_index.cpp | 10 +-- tests/unit/database_label_property_index.cpp | 2 +- tests/unit/distributed_serialization.cpp | 38 +++++++++--- tests/unit/durability.cpp | 8 ++- tests/unit/mvcc.cpp | 8 +-- tests/unit/mvcc_find_update_common.hpp | 2 +- tests/unit/mvcc_gc.cpp | 4 +- tests/unit/query_expression_evaluator.cpp | 43 +++---------- tests/unit/state_delta.cpp | 9 ++- tools/src/mg_import_csv/main.cpp | 17 +++-- 34 files changed, 316 insertions(+), 218 deletions(-) create mode 100644 src/utils/cast.hpp delete mode 100644 src/utils/underlying_cast.hpp diff --git a/config/community.conf b/config/community.conf index e572d897d..75ccb480b 100644 --- a/config/community.conf +++ b/config/community.conf @@ -48,17 +48,6 @@ # would be --properties-on-disk=biography,summary. #--properties-on-disk= -## Ids - -# Memgraph can generate an identifier for each vertex or edge. The -# generated ids are returned with the id function. - -# Memgraph can generate an identifier for each vertex. ---generate-vertex-ids=true - -# Memgraph can generate an identifier for each edge. ---generate-edge-ids=true - ## Query # # Various settings related to openCypher query execution. diff --git a/docs/user_technical/open-cypher.md b/docs/user_technical/open-cypher.md index d71bf37f2..249c7db0c 100644 --- a/docs/user_technical/open-cypher.md +++ b/docs/user_technical/open-cypher.md @@ -715,9 +715,8 @@ functions. `counter` | Generates integers that are guaranteed to be unique on the database level, for the given counter name. `counterSet` | Sets the counter with the given name to the given value. `indexInfo` | Returns a list of all the indexes available in the database. The list includes indexes that are not yet ready for use (they are concurrently being built by another transaction). - `id` | Returns identifier for a given node or edge. To enable automatic generation of the identifiers, `--generate-vertex-ids` and `--generate-edge-ids` parameters have to be set on `true` (enabled in the configuration by default). `timestamp` | Returns the difference, measured in milliseconds, between the current time and midnight, January 1, 1970 UTC. - + `id` | Returns identifier for a given node or edge. The identifier is generated during the initialization of node or edge and will be persisted through the durability mechanism. #### String Operators diff --git a/src/communication/bolt/v1/codes.hpp b/src/communication/bolt/v1/codes.hpp index b47cd87c7..be769ff5d 100644 --- a/src/communication/bolt/v1/codes.hpp +++ b/src/communication/bolt/v1/codes.hpp @@ -1,7 +1,7 @@ #pragma once #include <cstdint> -#include "utils/underlying_cast.hpp" +#include "utils/cast.hpp" namespace communication::bolt { @@ -82,4 +82,4 @@ static constexpr Marker Marker16[3] = {Marker::String16, Marker::List16, Marker::Map16}; static constexpr Marker Marker32[3] = {Marker::String32, Marker::List32, Marker::Map32}; -} +} // namespace communication::bolt diff --git a/src/communication/bolt/v1/decoder/decoder.hpp b/src/communication/bolt/v1/decoder/decoder.hpp index 1f09f8a29..a7bb44c39 100644 --- a/src/communication/bolt/v1/decoder/decoder.hpp +++ b/src/communication/bolt/v1/decoder/decoder.hpp @@ -7,7 +7,7 @@ #include "communication/bolt/v1/codes.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" #include "utils/bswap.hpp" -#include "utils/underlying_cast.hpp" +#include "utils/cast.hpp" namespace communication::bolt { diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index ffd9d1337..1e1cf1109 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -15,18 +15,6 @@ #include "utils/atomic.hpp" #include "utils/on_scope_exit.hpp" -// Autogenerate property with Vertex ID -DEFINE_bool( - generate_vertex_ids, false, - "If enabled database will automatically generate Ids as properties of " - "vertices, which can be accessed using the `Id` cypher function."); - -// Autogenerate property with Edge ID -DEFINE_bool( - generate_edge_ids, false, - "If enabled database will automatically generate Ids as properties of " - "edges, which can be accessed using the `Id` cypher function."); - namespace database { GraphDbAccessor::GraphDbAccessor(GraphDb &db) @@ -74,21 +62,22 @@ bool GraphDbAccessor::should_abort() const { durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal(); } VertexAccessor GraphDbAccessor::InsertVertex( - std::experimental::optional<gid::Gid> requested_gid) { + std::experimental::optional<gid::Gid> requested_gid, + std::experimental::optional<int64_t> cypher_id) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - int64_t gid = db_.storage().vertex_generator_.Next(requested_gid); - auto vertex_vlist = new mvcc::VersionList<Vertex>(transaction_, gid); + auto gid = db_.storage().vertex_generator_.Next(requested_gid); + if (!cypher_id) cypher_id = utils::MemcpyCast<int64_t>(gid); + auto vertex_vlist = + new mvcc::VersionList<Vertex>(transaction_, gid, *cypher_id); bool success = db_.storage().vertices_.access().insert(gid, vertex_vlist).second; CHECK(success) << "Attempting to insert a vertex with an existing GID: " << gid; - wal().Emplace( - database::StateDelta::CreateVertex(transaction_.id_, vertex_vlist->gid_)); - auto va = VertexAccessor(vertex_vlist, *this); - if (FLAGS_generate_vertex_ids) - va.PropsSet(Property(PropertyValueStore::IdPropertyName), gid); + wal().Emplace(database::StateDelta::CreateVertex( + transaction_.id_, vertex_vlist->gid_, vertex_vlist->cypher_id())); + auto va = VertexAccessor(storage::VertexAddress(vertex_vlist), *this); return va; } @@ -114,8 +103,8 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote( std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertexOptional( gid::Gid gid, bool current_state) { - VertexAccessor record_accessor(db_.storage().LocalAddress<Vertex>(gid), - *this); + VertexAccessor record_accessor( + storage::VertexAddress(db_.storage().LocalAddress<Vertex>(gid)), *this); if (!record_accessor.Visible(transaction(), current_state)) return std::experimental::nullopt; return record_accessor; @@ -129,7 +118,8 @@ VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) { std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdgeOptional( gid::Gid gid, bool current_state) { - EdgeAccessor record_accessor(db_.storage().LocalAddress<Edge>(gid), *this); + EdgeAccessor record_accessor( + storage::EdgeAddress(db_.storage().LocalAddress<Edge>(gid)), *this); if (!record_accessor.Visible(transaction(), current_state)) return std::experimental::nullopt; return record_accessor; @@ -395,7 +385,8 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) { EdgeAccessor GraphDbAccessor::InsertEdge( VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type, - std::experimental::optional<gid::Gid> requested_gid) { + std::experimental::optional<gid::Gid> requested_gid, + std::experimental::optional<int64_t> cypher_id) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; // The address of an edge we'll create. @@ -403,8 +394,8 @@ EdgeAccessor GraphDbAccessor::InsertEdge( Vertex *from_updated; if (from.is_local()) { - auto edge_accessor = - InsertOnlyEdge(from.address(), to.address(), edge_type, requested_gid); + auto edge_accessor = InsertOnlyEdge(from.address(), to.address(), edge_type, + requested_gid, cypher_id); edge_address = edge_accessor.address(), from.SwitchNew(); @@ -414,8 +405,8 @@ EdgeAccessor GraphDbAccessor::InsertEdge( // `CREATE_EDGE`, but always have it split into 3 parts (edge insertion, // in/out modification). wal().Emplace(database::StateDelta::CreateEdge( - transaction_.id_, edge_accessor.gid(), from.gid(), to.gid(), edge_type, - EdgeTypeName(edge_type))); + transaction_.id_, edge_accessor.gid(), edge_accessor.cypher_id(), + from.gid(), to.gid(), edge_type, EdgeTypeName(edge_type))); } else { edge_address = db().updates_clients().CreateEdge(transaction_id(), from, to, @@ -466,21 +457,22 @@ EdgeAccessor GraphDbAccessor::InsertEdge( EdgeAccessor GraphDbAccessor::InsertOnlyEdge( storage::VertexAddress from, storage::VertexAddress to, storage::EdgeType edge_type, - std::experimental::optional<gid::Gid> requested_gid) { + std::experimental::optional<gid::Gid> requested_gid, + std::experimental::optional<int64_t> cypher_id) { CHECK(from.is_local()) << "`from` address should be local when calling InsertOnlyEdge"; - int64_t gid = db_.storage().edge_generator_.Next(requested_gid); - auto edge_vlist = - new mvcc::VersionList<Edge>(transaction_, gid, from, to, edge_type); + auto gid = db_.storage().edge_generator_.Next(requested_gid); + if (!cypher_id) cypher_id = utils::MemcpyCast<int64_t>(gid); + auto edge_vlist = new mvcc::VersionList<Edge>(transaction_, gid, *cypher_id, + from, to, edge_type); // We need to insert edge_vlist to edges_ before calling update since update // can throw and edge_vlist will not be garbage collected if it is not in // edges_ skiplist. bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second; CHECK(success) << "Attempting to insert an edge with an existing GID: " << gid; - auto ea = EdgeAccessor(edge_vlist, *this, from, to, edge_type); - if (FLAGS_generate_edge_ids) - ea.PropsSet(Property(PropertyValueStore::IdPropertyName), gid); + auto ea = EdgeAccessor(storage::EdgeAddress(edge_vlist), *this, from, to, + edge_type); return ea; } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index bacb710d0..9de9f1b4d 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -73,10 +73,14 @@ class GraphDbAccessor { * * @param requested_gid The requested GID. Should only be provided when * recovering from durability. + * @param cypher_id Take a look under mvcc::VersionList::cypher_id + * * @return See above. */ VertexAccessor InsertVertex(std::experimental::optional<gid::Gid> - requested_gid = std::experimental::nullopt); + requested_gid = std::experimental::nullopt, + std::experimental::optional<int64_t> cypher_id = + std::experimental::nullopt); /** Creates a new Vertex on the given worker. It is NOT allowed to call this * function with this worker's id. */ @@ -149,7 +153,7 @@ class GraphDbAccessor { // wrap version lists into accessors, which will look for visible versions auto accessors = iter::imap( [this](auto id_vlist) { - return VertexAccessor(id_vlist.second, *this); + return VertexAccessor(storage::VertexAddress(id_vlist.second), *this); }, db_.storage().vertices_.access()); @@ -174,7 +178,9 @@ class GraphDbAccessor { auto Vertices(storage::Label label, bool current_state) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; return iter::imap( - [this](auto vlist) { return VertexAccessor(vlist, *this); }, + [this](auto vlist) { + return VertexAccessor(storage::VertexAddress(vlist), *this); + }, db_.storage().labels_index_.GetVlists(label, transaction_, current_state)); } @@ -198,7 +204,9 @@ class GraphDbAccessor { LabelPropertyIndex::Key(label, property))) << "Label+property index doesn't exist."; return iter::imap( - [this](auto vlist) { return VertexAccessor(vlist, *this); }, + [this](auto vlist) { + return VertexAccessor(storage::VertexAddress(vlist), *this); + }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), transaction_, current_state)); @@ -226,7 +234,9 @@ class GraphDbAccessor { CHECK(value.type() != PropertyValue::Type::Null) << "Can't query index for propery value type null."; return iter::imap( - [this](auto vlist) { return VertexAccessor(vlist, *this); }, + [this](auto vlist) { + return VertexAccessor(storage::VertexAddress(vlist), *this); + }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), value, transaction_, current_state)); @@ -269,7 +279,9 @@ class GraphDbAccessor { LabelPropertyIndex::Key(label, property))) << "Label+property index doesn't exist."; return iter::imap( - [this](auto vlist) { return VertexAccessor(vlist, *this); }, + [this](auto vlist) { + return VertexAccessor(storage::VertexAddress(vlist), *this); + }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), lower, upper, transaction_, current_state)); @@ -291,22 +303,30 @@ class GraphDbAccessor { * @param type Edge type. * @param requested_gid The requested GID. Should only be provided when * recovering from durability. + * @param cypher_id Take a look under mvcc::VersionList::cypher_id + * * @return An accessor to the edge. */ EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to, storage::EdgeType type, std::experimental::optional<gid::Gid> requested_gid = + std::experimental::nullopt, + std::experimental::optional<int64_t> cypher_id = std::experimental::nullopt); /** * Insert edge into main storage, but don't insert it into from and to * vertices edge lists. + * + * @param cypher_id Take a look under mvcc::VersionList::cypher_id */ EdgeAccessor InsertOnlyEdge(storage::VertexAddress from, storage::VertexAddress to, storage::EdgeType edge_type, std::experimental::optional<gid::Gid> - requested_gid = std::experimental::nullopt); + requested_gid = std::experimental::nullopt, + std::experimental::optional<int64_t> cypher_id = + std::experimental::nullopt); /** * Removes an edge from the graph. Parameters can indicate if the edge should @@ -364,7 +384,9 @@ class GraphDbAccessor { // wrap version lists into accessors, which will look for visible versions auto accessors = iter::imap( - [this](auto id_vlist) { return EdgeAccessor(id_vlist.second, *this); }, + [this](auto id_vlist) { + return EdgeAccessor(storage::EdgeAddress(id_vlist.second), *this); + }, db_.storage().edges_.access()); // filter out the accessors not visible to the current transaction diff --git a/src/database/state_delta.cpp b/src/database/state_delta.cpp index b10cf89cd..2d448f493 100644 --- a/src/database/state_delta.cpp +++ b/src/database/state_delta.cpp @@ -18,20 +18,22 @@ StateDelta StateDelta::TxAbort(tx::TransactionId tx_id) { return {StateDelta::Type::TRANSACTION_ABORT, tx_id}; } -StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, - gid::Gid vertex_id) { +StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, gid::Gid vertex_id, + int64_t cypher_id) { StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id); op.vertex_id = vertex_id; + op.cypher_id = cypher_id; return op; } StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, - gid::Gid vertex_from_id, + int64_t cypher_id, gid::Gid vertex_from_id, gid::Gid vertex_to_id, storage::EdgeType edge_type, const std::string &edge_type_name) { StateDelta op(StateDelta::Type::CREATE_EDGE, tx_id); op.edge_id = edge_id; + op.cypher_id = cypher_id; op.vertex_from_id = vertex_from_id; op.vertex_to_id = vertex_to_id; op.edge_type = edge_type; @@ -39,8 +41,7 @@ StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, return op; } -StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id, - gid::Gid vertex_id, +StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::VertexAddress vertex_to_address, storage::EdgeAddress edge_address, storage::EdgeType edge_type) { @@ -78,8 +79,7 @@ StateDelta StateDelta::AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, return op; } -StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id, - gid::Gid vertex_id, +StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::EdgeAddress edge_address) { CHECK(edge_address.is_remote()) << "WAL can only contain global addresses."; StateDelta op(StateDelta::Type::REMOVE_IN_EDGE, tx_id); @@ -101,8 +101,7 @@ StateDelta StateDelta::PropsSetVertex(tx::TransactionId tx_id, return op; } -StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id, - gid::Gid edge_id, +StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id, storage::Property property, const std::string &property_name, const PropertyValue &value) { @@ -124,8 +123,8 @@ StateDelta StateDelta::AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id, return op; } -StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id, - gid::Gid vertex_id, storage::Label label, +StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id, + storage::Label label, const std::string &label_name) { StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id); op.vertex_id = vertex_id; @@ -134,23 +133,21 @@ StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id, return op; } -StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id, - gid::Gid vertex_id, bool check_empty) { +StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id, + bool check_empty) { StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id); op.vertex_id = vertex_id; op.check_empty = check_empty; return op; } -StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id, - gid::Gid edge_id) { +StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id) { StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id); op.edge_id = edge_id; return op; } -StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, - storage::Label label, +StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label, const std::string &label_name, storage::Property property, const std::string &property_name) { @@ -175,9 +172,11 @@ void StateDelta::Encode( break; case Type::CREATE_VERTEX: encoder.WriteInt(vertex_id); + encoder.WriteInt(cypher_id); break; case Type::CREATE_EDGE: encoder.WriteInt(edge_id); + encoder.WriteInt(cypher_id); encoder.WriteInt(vertex_from_id); encoder.WriteInt(vertex_to_id); encoder.WriteInt(edge_type.Id()); @@ -267,9 +266,11 @@ std::experimental::optional<StateDelta> StateDelta::Decode( break; case Type::CREATE_VERTEX: DECODE_MEMBER(vertex_id, ValueInt) + DECODE_MEMBER(cypher_id, ValueInt) break; case Type::CREATE_EDGE: DECODE_MEMBER(edge_id, ValueInt) + DECODE_MEMBER(cypher_id, ValueInt) DECODE_MEMBER(vertex_from_id, ValueInt) DECODE_MEMBER(vertex_to_id, ValueInt) DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType) @@ -354,12 +355,13 @@ void StateDelta::Apply(GraphDbAccessor &dba) const { LOG(FATAL) << "Transaction handling not handled in Apply"; break; case Type::CREATE_VERTEX: - dba.InsertVertex(vertex_id); + dba.InsertVertex(vertex_id, cypher_id); break; case Type::CREATE_EDGE: { auto from = dba.FindVertex(vertex_from_id, true); auto to = dba.FindVertex(vertex_to_id, true); - dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id); + dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id, + cypher_id); break; } case Type::ADD_OUT_EDGE: diff --git a/src/database/state_delta.lcp b/src/database/state_delta.lcp index f395f4c3c..9d3daee26 100644 --- a/src/database/state_delta.lcp +++ b/src/database/state_delta.lcp @@ -39,6 +39,7 @@ cpp<# ;; only keep addresses. (vertex-id "gid::Gid") (edge-id "gid::Gid") + (cypher-id :int64_t) (edge-address "storage::EdgeAddress") (vertex-from-id "gid::Gid") (vertex-from-address "storage::VertexAddress") @@ -130,9 +131,12 @@ omitted in the comment.") static StateDelta TxCommit(tx::TransactionId tx_id); static StateDelta TxAbort(tx::TransactionId tx_id); static StateDelta CreateVertex(tx::TransactionId tx_id, - gid::Gid vertex_id); + gid::Gid vertex_id, + int64_t cypher_id); static StateDelta CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, - gid::Gid vertex_from_id, gid::Gid vertex_to_id, + int64_t cypher_id, + gid::Gid vertex_from_id, + gid::Gid vertex_to_id, storage::EdgeType edge_type, const std::string &edge_type_name); static StateDelta AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, diff --git a/src/database/storage.hpp b/src/database/storage.hpp index 1d5d50e96..d3f6b850a 100644 --- a/src/database/storage.hpp +++ b/src/database/storage.hpp @@ -28,7 +28,7 @@ struct RecoveryInfo; RecoveryInfo Recover(const std::experimental::filesystem::path &, database::GraphDb &, std::experimental::optional<RecoveryInfo>); -}; +}; // namespace durability namespace database { @@ -77,7 +77,11 @@ class Storage { storage::Address<mvcc::VersionList<TRecord>> address) const { if (address.is_local()) return address; if (address.worker_id() == worker_id_) { - return LocalAddress<TRecord>(address.gid()); + auto vlist = LocalAddress<TRecord>(address.gid()); + if constexpr (std::is_same<TRecord, Vertex>::value) + return storage::VertexAddress(vlist); + else + return storage::EdgeAddress(vlist); } return address; } diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index b09607dea..4006e2993 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -121,10 +121,13 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, std::unordered_map<gid::Gid, std::pair<storage::VertexAddress, storage::VertexAddress>> edge_gid_endpoints_mapping; + for (int64_t i = 0; i < vertex_count; ++i) { auto vertex = decoder.ReadSnapshotVertex(); RETURN_IF_NOT(vertex); - auto vertex_accessor = dba.InsertVertex(vertex->gid); + + auto vertex_accessor = + dba.InsertVertex(vertex->gid, vertex->cypher_id); for (const auto &label : vertex->labels) { vertex_accessor.add_label(dba.Label(label)); } @@ -169,10 +172,18 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, } }; + DecodedValue dv_cypher_id; + for (int64_t i = 0; i < edge_count; ++i) { RETURN_IF_NOT( decoder.ReadValue(&dv, communication::bolt::DecodedValue::Type::Edge)); auto &edge = dv.ValueEdge(); + + // Read cypher_id + RETURN_IF_NOT(decoder.ReadValue( + &dv_cypher_id, communication::bolt::DecodedValue::Type::Int)); + auto cypher_id = dv_cypher_id.ValueInt(); + // We have to take full edge endpoints from vertices since the endpoints // found here don't containt worker_id, and this can't be changed since this // edges must be bolt-compliant @@ -187,8 +198,8 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, vertex_transform_to_local_if_possible(from); vertex_transform_to_local_if_possible(to); - auto edge_accessor = - dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type), edge.id); + auto edge_accessor = dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type), + edge.id, cypher_id); for (const auto &property_pair : edge.properties) edge_accessor.PropsSet(dba.Property(property_pair.first), diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index c74a273a5..0b523a501 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -16,7 +16,7 @@ namespace fs = std::experimental::filesystem; namespace durability { // Snapshot layout is described in durability/version.hpp -static_assert(durability::kVersion == 5, +static_assert(durability::kVersion == 6, "Wrong snapshot version, please update!"); namespace { @@ -68,6 +68,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, } for (const auto &edge : dba.Edges(false)) { encoder.WriteEdge(edge); + encoder.WriteInt(edge.cypher_id()); edge_num++; } buffer.WriteValue(vertex_num); diff --git a/src/durability/snapshot_decoded_value.hpp b/src/durability/snapshot_decoded_value.hpp index a728e60c9..9f8f25202 100644 --- a/src/durability/snapshot_decoded_value.hpp +++ b/src/durability/snapshot_decoded_value.hpp @@ -22,6 +22,7 @@ struct DecodedInlinedVertexEdge; */ struct DecodedSnapshotVertex { gid::Gid gid; + int64_t cypher_id; std::vector<std::string> labels; std::map<std::string, communication::bolt::DecodedValue> properties; // Vector of edges without properties diff --git a/src/durability/snapshot_decoder.hpp b/src/durability/snapshot_decoder.hpp index 2a1382188..e6a9d45a5 100644 --- a/src/durability/snapshot_decoder.hpp +++ b/src/durability/snapshot_decoder.hpp @@ -17,37 +17,45 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> { communication::bolt::DecodedValue dv; DecodedSnapshotVertex vertex; + // Read global id, labels and properties of the vertex if (!communication::bolt::Decoder<Buffer>::ReadValue( &dv, communication::bolt::DecodedValue::Type::Vertex)) { DLOG(WARNING) << "Unable to read snapshot vertex"; return std::experimental::nullopt; } - auto &read_vertex = dv.ValueVertex(); - vertex.gid = read_vertex.id; + vertex.gid = static_cast<uint64_t>(read_vertex.id); vertex.labels = read_vertex.labels; vertex.properties = read_vertex.properties; + // Read cypher_id + if (!communication::bolt::Decoder<Buffer>::ReadValue( + &dv, communication::bolt::DecodedValue::Type::Int)) { + DLOG(WARNING) << "Unable to read vertex cypher_id"; + return std::experimental::nullopt; + } + vertex.cypher_id = dv.ValueInt(); + + // Read in edges if (!communication::bolt::Decoder<Buffer>::ReadValue( &dv, communication::bolt::DecodedValue::Type::Int)) { DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of in " "edges in vertex!"; return std::experimental::nullopt; } - for (int i = 0; i < dv.ValueInt(); ++i) { auto edge = ReadSnapshotEdge(); if (!edge) return std::experimental::nullopt; vertex.in.emplace_back(*edge); } + // Read out edges if (!communication::bolt::Decoder<Buffer>::ReadValue( &dv, communication::bolt::DecodedValue::Type::Int)) { DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of out " "edges in vertex!"; return std::experimental::nullopt; } - for (int i = 0; i < dv.ValueInt(); ++i) { auto edge = ReadSnapshotEdge(); if (!edge) return std::experimental::nullopt; @@ -65,23 +73,24 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> { VLOG(20) << "[ReadSnapshotEdge] Start"; - // read ID + // Read global id of this edge if (!communication::bolt::Decoder<Buffer>::ReadValue( &dv, communication::bolt::DecodedValue::Type::Int)) { - DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read ID!"; + DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read Global ID!"; return std::experimental::nullopt; } + edge.address = storage::EdgeAddress(static_cast<uint64_t>(dv.ValueInt())); - edge.address = dv.ValueInt(); - // read other side + // Read global vertex id of the other side of the edge + // (global id of from/to vertexes). if (!communication::bolt::Decoder<Buffer>::ReadValue( &dv, communication::bolt::DecodedValue::Type::Int)) { - DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from address!"; + DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from/to address!"; return std::experimental::nullopt; } - edge.vertex = dv.ValueInt(); + edge.vertex = storage::VertexAddress(static_cast<uint64_t>(dv.ValueInt())); - // read type + // Read edge type if (!communication::bolt::Decoder<Buffer>::ReadValue( &dv, communication::bolt::DecodedValue::Type::String)) { DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read type!"; @@ -90,7 +99,6 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> { edge.type = dv.ValueString(); VLOG(20) << "[ReadSnapshotEdge] Success"; - return edge; } }; diff --git a/src/durability/snapshot_encoder.hpp b/src/durability/snapshot_encoder.hpp index 8ebed3fc6..cbcf05a23 100644 --- a/src/durability/snapshot_encoder.hpp +++ b/src/durability/snapshot_encoder.hpp @@ -1,6 +1,7 @@ #pragma once #include "communication/bolt/v1/encoder/base_encoder.hpp" +#include "utils/cast.hpp" namespace durability { @@ -12,14 +13,17 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> { void WriteSnapshotVertex(const VertexAccessor &vertex) { communication::bolt::BaseEncoder<Buffer>::WriteVertex(vertex); - // write in edges without properties + // Write cypher_id + this->WriteInt(vertex.cypher_id()); + + // Write in edges without properties this->WriteUInt(vertex.in_degree()); auto edges_in = vertex.in(); for (const auto &edge : edges_in) { this->WriteSnapshotEdge(edge, true); } - // write out edges without properties + // Write out edges without properties this->WriteUInt(vertex.out_degree()); auto edges_out = vertex.out(); for (const auto &edge : edges_out) { @@ -29,18 +33,21 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> { private: void WriteUInt(const uint64_t &value) { - this->WriteInt(*reinterpret_cast<const int64_t *>(&value)); + this->WriteInt(utils::MemcpyCast<int64_t>(value)); } // Writes edge without properties void WriteSnapshotEdge(const EdgeAccessor &edge, bool write_from) { + // Write global id of the edge WriteUInt(edge.GlobalAddress().raw()); + + // Write to/from global id if (write_from) WriteUInt(edge.from().GlobalAddress().raw()); else WriteUInt(edge.to().GlobalAddress().raw()); - // write type + // Write type this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType())); } }; diff --git a/src/durability/version.hpp b/src/durability/version.hpp index ec372abd1..d4517b032 100644 --- a/src/durability/version.hpp +++ b/src/durability/version.hpp @@ -1,5 +1,11 @@ #pragma once +/// +/// +/// IMPORTANT: Please update this file for every snapshot format change!!! +/// TODO (buda): This is not rock solid. +/// + #include <array> #include <cstdint> @@ -8,9 +14,9 @@ namespace durability { constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}}; // The current default version of snapshot and WAL encoding / decoding. -constexpr int64_t kVersion{5}; +constexpr int64_t kVersion{6}; -// Snapshot format (version 5): +// Snapshot format (version 6): // 1) Magic number + snapshot version // 2) Distributed worker ID // @@ -29,9 +35,16 @@ constexpr int64_t kVersion{5}; // // We must inline edges with nodes because some edges might be stored on other // worker (edges are always stored only on the worker of the edge source). -// 8) Bolt encoded nodes + inlined edges (edge address, other endpoint address -// and edge type) -// 9) Bolt encoded edges +// 8) Bolt encoded nodes. Each node is written in the following format: +// * gid, labels, properties +// * cypher_id +// * inlined edges (edge address, other endpoint address and edge type) +// 9) Bolt encoded edges. Each edge is written in the following format: +// * gid +// * from, to +// * edge_type +// * properties +// * cypher_id // // 10) Snapshot summary (number of nodes, number of edges, hash) diff --git a/src/mvcc/version_list.hpp b/src/mvcc/version_list.hpp index 9f63e0fe5..73d186621 100644 --- a/src/mvcc/version_list.hpp +++ b/src/mvcc/version_list.hpp @@ -21,15 +21,18 @@ class VersionList { public: /** * @brief Constructor that is used to insert one item into VersionList. + * * @param t - transaction * @param gid - Version list identifier. Uniqueness guaranteed by the code * creating this version list. + * @param cypher_id - Number returned from the id function. * @param args - args forwarded to constructor of item T (for * creating the first Record (Version) in this VersionList. */ template <typename... Args> - VersionList(const tx::Transaction &t, gid::Gid gid, Args &&... args) - : gid_(gid) { + VersionList(const tx::Transaction &t, gid::Gid gid, int64_t cypher_id, + Args &&... args) + : gid_(gid), cypher_id_(cypher_id) { // TODO replace 'new' with something better auto *v1 = new T(std::forward<Args>(args)...); v1->mark_created(t); @@ -221,8 +224,13 @@ class VersionList { record->mark_expired(t); } + /** + * TODO (buda): Try to move git_ to storage::Address. + */ const gid::Gid gid_; + auto cypher_id() { return cypher_id_; } + private: void lock_and_validate(T *record, const tx::Transaction &t) { DCHECK(record != nullptr) << "Record is nullptr on lock and validation."; @@ -260,6 +268,18 @@ class VersionList { return updated; } + /** + * The following member is here because Memgraph supports ID function from + * the Cypher query language. If you have plans to change this you have to + * consider the following: + * * If the id has to be durable. -> Snapshot and WAL have to be updated. + * * Impact on query execution. | + * * Impact on the communication stack. |-> The id has to be returned + * to the client. + * * Import tools bacause of the dependencies on the durability stack. + * * Implications on the distributed system. + */ + int64_t cypher_id_{0}; std::atomic<T *> head_{nullptr}; RecordLock lock_; }; diff --git a/src/query/interpret/awesome_memgraph_functions.cpp b/src/query/interpret/awesome_memgraph_functions.cpp index 39abf6196..dbfe333d7 100644 --- a/src/query/interpret/awesome_memgraph_functions.cpp +++ b/src/query/interpret/awesome_memgraph_functions.cpp @@ -610,24 +610,10 @@ TypedValue Id(const std::vector<TypedValue> &args, Context *ctx) { auto &arg = args[0]; switch (arg.type()) { case TypedValue::Type::Vertex: { - auto id = arg.ValueVertex().PropsAt( - ctx->db_accessor_.Property(PropertyValueStore::IdPropertyName)); - if (id.IsNull()) { - throw QueryRuntimeException( - "IDs are not set on vertices, --generate-vertex-ids flag must be " - "set on startup to automatically generate them"); - } - return id.Value<int64_t>(); + return TypedValue(arg.ValueVertex().cypher_id()); } case TypedValue::Type::Edge: { - auto id = arg.ValueEdge().PropsAt( - ctx->db_accessor_.Property(PropertyValueStore::IdPropertyName)); - if (id.IsNull()) { - throw QueryRuntimeException( - "IDs are not set on edges, --generate-edge-ids flag must be set on " - "startup to automatically generate them"); - } - return id.Value<int64_t>(); + return TypedValue(arg.ValueEdge().cypher_id()); } default: throw QueryRuntimeException("id argument must be a vertex or an edge"); diff --git a/src/storage/address.hpp b/src/storage/address.hpp index ff6d7bd8a..954efbf1b 100644 --- a/src/storage/address.hpp +++ b/src/storage/address.hpp @@ -4,8 +4,8 @@ #include "glog/logging.h" -#include "storage/serialization.capnp.h" #include "storage/gid.hpp" +#include "storage/serialization.capnp.h" namespace storage { @@ -41,10 +41,10 @@ class Address { Address() {} // Constructor for raw address value - Address(StorageT storage) : storage_(storage) {} + explicit Address(StorageT storage) : storage_(storage) {} // Constructor for local Address. - Address(TLocalObj *ptr) { + explicit Address(TLocalObj *ptr) { uintptr_t ptr_no_type = reinterpret_cast<uintptr_t>(ptr); DCHECK((ptr_no_type & kTypeMask) == 0) << "Ptr has type_mask bit set"; storage_ = ptr_no_type | kLocal; diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index 6bb6c7d88..b95cf5407 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -150,6 +150,13 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> { * owner is some other worker in a distributed system. */ bool is_local() const { return address_.is_local(); } + int64_t cypher_id() const { + if (address_.is_local()) + return address_.local()->cypher_id(); + else + throw utils::NotYetImplemented("Fetch remote cypher_id"); + } + protected: /** * Sends delta for remote processing. diff --git a/src/storage/vertex_accessor.hpp b/src/storage/vertex_accessor.hpp index 3393d6f02..ae5a95652 100644 --- a/src/storage/vertex_accessor.hpp +++ b/src/storage/vertex_accessor.hpp @@ -100,9 +100,9 @@ class VertexAccessor : public RecordAccessor<Vertex> { * or empty, the parameter is ignored. */ auto in(const std::vector<storage::EdgeType> *edge_types) const { - return MakeAccessorIterator(current().in_.begin(nullptr, edge_types), - current().in_.end(), false, address(), - db_accessor()); + return MakeAccessorIterator( + current().in_.begin(storage::VertexAddress(nullptr), edge_types), + current().in_.end(), false, address(), db_accessor()); } /** Returns EdgeAccessors for all outgoing edges. */ @@ -133,9 +133,9 @@ class VertexAccessor : public RecordAccessor<Vertex> { * or empty, the parameter is ignored. */ auto out(const std::vector<storage::EdgeType> *edge_types) const { - return MakeAccessorIterator(current().out_.begin(nullptr, edge_types), - current().out_.end(), true, address(), - db_accessor()); + return MakeAccessorIterator( + current().out_.begin(storage::VertexAddress(nullptr), edge_types), + current().out_.end(), true, address(), db_accessor()); } /** Removes the given edge from the outgoing edges of this vertex. Note that diff --git a/src/utils/cast.hpp b/src/utils/cast.hpp new file mode 100644 index 000000000..d4886f1eb --- /dev/null +++ b/src/utils/cast.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include <cstdint> +#include <cstring> +#include <type_traits> + +namespace utils { + +template <typename T> +constexpr typename std::underlying_type<T>::type UnderlyingCast(T e) { + return static_cast<typename std::underlying_type<T>::type>(e); +} + +/** + * uint to int conversion in C++ is a bit tricky. Take a look here + * https://stackoverflow.com/questions/14623266/why-cant-i-reinterpret-cast-uint-to-int + * for more details. + * + * @tparam TDest Returned datatype. + * @tparam TSrc Input datatype. + * + * @return "copy casted" value. + */ +template <typename TDest, typename TSrc> +TDest MemcpyCast(TSrc src) { + TDest dest; + static_assert(sizeof(dest) == sizeof(src), + "MemcpyCast expects source and destination to be of same size"); + static_assert(std::is_arithmetic<TSrc>::value, + "MemcpyCast expects source is an arithmetic type"); + static_assert(std::is_arithmetic<TDest>::value, + "MemcypCast expects destination is an arithmetic type"); + std::memcpy(&dest, &src, sizeof(src)); + return dest; +} + +} // namespace utils diff --git a/src/utils/underlying_cast.hpp b/src/utils/underlying_cast.hpp deleted file mode 100644 index 19a0d3a5f..000000000 --- a/src/utils/underlying_cast.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include <type_traits> - -namespace utils { - -template <typename T> -constexpr typename std::underlying_type<T>::type UnderlyingCast(T e) { - return static_cast<typename std::underlying_type<T>::type>(e); -} - -} // namespace utils diff --git a/tests/benchmark/mvcc.cpp b/tests/benchmark/mvcc.cpp index 0bfffa23f..1f3ad3b28 100644 --- a/tests/benchmark/mvcc.cpp +++ b/tests/benchmark/mvcc.cpp @@ -21,7 +21,7 @@ void MvccMix(benchmark::State &state) { state.PauseTiming(); tx::SingleNodeEngine engine; auto t1 = engine.Begin(); - mvcc::VersionList<Prop> version_list(*t1, 0); + mvcc::VersionList<Prop> version_list(*t1, 0, 0); engine.Commit(*t1); auto t2 = engine.Begin(); diff --git a/tests/manual/snapshot_generation/snapshot_writer.hpp b/tests/manual/snapshot_generation/snapshot_writer.hpp index 2833f3a99..ec9f8d16e 100644 --- a/tests/manual/snapshot_generation/snapshot_writer.hpp +++ b/tests/manual/snapshot_generation/snapshot_writer.hpp @@ -14,7 +14,7 @@ namespace snapshot_generation { // Snapshot layout is described in durability/version.hpp -static_assert(durability::kVersion == 5, +static_assert(durability::kVersion == 6, "Wrong snapshot version, please update!"); class SnapshotWriter { @@ -69,6 +69,9 @@ class SnapshotWriter { WriteList(node.labels); encoder_.WriteMap(node.props); + // cypher_id + encoder_.WriteInt(utils::MemcpyCast<int64_t>(node.gid)); + encoder_.WriteInt(node.in_edges.size()); for (const auto &edge_idx : node.in_edges) { WriteInlineEdge(edges.at(edge_idx), true); @@ -93,6 +96,9 @@ class SnapshotWriter { encoder_.WriteString(edge.type); encoder_.WriteMap(edge.props); + // cypher_id + encoder_.WriteInt(utils::MemcpyCast<int64_t>(edge.gid)); + ++edges_written_; } diff --git a/tests/unit/database_key_index.cpp b/tests/unit/database_key_index.cpp index e4fb1bde7..c87b452a5 100644 --- a/tests/unit/database_key_index.cpp +++ b/tests/unit/database_key_index.cpp @@ -19,7 +19,7 @@ TEST(LabelsIndex, UniqueInsert) { tx::SingleNodeEngine engine; auto t1 = engine.Begin(); - mvcc::VersionList<Vertex> vlist(*t1, 0); + mvcc::VersionList<Vertex> vlist(*t1, 0, 0); engine.Commit(*t1); auto t2 = engine.Begin(); @@ -48,8 +48,8 @@ TEST(LabelsIndex, UniqueFilter) { tx::SingleNodeEngine engine; auto t1 = engine.Begin(); - mvcc::VersionList<Vertex> vlist1(*t1, 0); - mvcc::VersionList<Vertex> vlist2(*t1, 1); + mvcc::VersionList<Vertex> vlist1(*t1, 0, 0); + mvcc::VersionList<Vertex> vlist2(*t1, 1, 1); engine.Advance(t1->id_); auto r1v1 = vlist1.find(*t1); auto r1v2 = vlist2.find(*t1); @@ -89,8 +89,8 @@ TEST(LabelsIndex, Refresh) { // add two vertices to database auto t1 = engine.Begin(); - mvcc::VersionList<Vertex> vlist1(*t1, 0); - mvcc::VersionList<Vertex> vlist2(*t1, 1); + mvcc::VersionList<Vertex> vlist1(*t1, 0, 0); + mvcc::VersionList<Vertex> vlist2(*t1, 1, 1); engine.Advance(t1->id_); auto v1r1 = vlist1.find(*t1); diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp index c0d022d56..a22c9e1c9 100644 --- a/tests/unit/database_label_property_index.cpp +++ b/tests/unit/database_label_property_index.cpp @@ -24,7 +24,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test { index.IndexFinishedBuilding(*key); t = engine.Begin(); - vlist = new mvcc::VersionList<Vertex>(*t, 0); + vlist = new mvcc::VersionList<Vertex>(*t, 0, 0); engine.Advance(t->id_); vertex = vlist->find(*t); diff --git a/tests/unit/distributed_serialization.cpp b/tests/unit/distributed_serialization.cpp index 4f9cb83ef..9139d1153 100644 --- a/tests/unit/distributed_serialization.cpp +++ b/tests/unit/distributed_serialization.cpp @@ -121,26 +121,44 @@ class DistributedSerializationMvcc : public ::testing::Test { protected: tx::SingleNodeEngine engine; tx::Transaction *tx = engine.Begin(); - mvcc::VersionList<Vertex> v1_vlist{*tx, 0}; + mvcc::VersionList<Vertex> v1_vlist{*tx, 0, 0}; Vertex &v1 = *v1_vlist.Oldest(); - mvcc::VersionList<Vertex> v2_vlist{*tx, 1}; + mvcc::VersionList<Vertex> v2_vlist{*tx, 1, 1}; Vertex &v2 = *v2_vlist.Oldest(); - mvcc::VersionList<Edge> e1_vlist{*tx, 0, &v1_vlist, &v2_vlist, EdgeType(0)}; + mvcc::VersionList<Edge> e1_vlist{*tx, + 0, + 0, + storage::VertexAddress(&v1_vlist), + storage::VertexAddress(&v2_vlist), + EdgeType(0)}; Edge &e1 = *e1_vlist.Oldest(); - mvcc::VersionList<Edge> e2_vlist{*tx, 1, &v2_vlist, &v1_vlist, EdgeType(2)}; + mvcc::VersionList<Edge> e2_vlist{*tx, + 1, + 1, + storage::VertexAddress(&v2_vlist), + storage::VertexAddress(&v1_vlist), + EdgeType(2)}; Edge &e2 = *e2_vlist.Oldest(); }; TEST_F(DistributedSerializationMvcc, VertexEdges) { - UPDATE_AND_CHECK_V(v1, v1.out_.emplace(&v2_vlist, &e1_vlist, EdgeType(0))); - UPDATE_AND_CHECK_V(v2, v2.in_.emplace(&v1_vlist, &e1_vlist, EdgeType(0))); - UPDATE_AND_CHECK_V(v1, v1.in_.emplace(&v2_vlist, &e2_vlist, EdgeType(2))); - UPDATE_AND_CHECK_V(v2, v2.out_.emplace(&v1_vlist, &e2_vlist, EdgeType(2))); + UPDATE_AND_CHECK_V( + v1, v1.out_.emplace(storage::VertexAddress(&v2_vlist), + storage::EdgeAddress(&e1_vlist), EdgeType(0))); + UPDATE_AND_CHECK_V( + v2, v2.in_.emplace(storage::VertexAddress(&v1_vlist), + storage::EdgeAddress(&e1_vlist), EdgeType(0))); + UPDATE_AND_CHECK_V( + v1, v1.in_.emplace(storage::VertexAddress(&v2_vlist), + storage::EdgeAddress(&e2_vlist), EdgeType(2))); + UPDATE_AND_CHECK_V( + v2, v2.out_.emplace(storage::VertexAddress(&v1_vlist), + storage::EdgeAddress(&e2_vlist), EdgeType(2))); } TEST_F(DistributedSerializationMvcc, EdgeFromAndTo) { - UPDATE_AND_CHECK_E(e1, e1.from_ = &v2_vlist); - UPDATE_AND_CHECK_E(e1, e1.to_ = &v1_vlist); + UPDATE_AND_CHECK_E(e1, e1.from_ = storage::VertexAddress(&v2_vlist)); + UPDATE_AND_CHECK_E(e1, e1.to_ = storage::VertexAddress(&v1_vlist)); } TEST_F(DistributedSerializationMvcc, EdgeType) { diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 436f26f3c..f767a4dd8 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -483,6 +483,9 @@ TEST_F(Durability, SnapshotEncoding) { ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Edge); auto &edge = dv.ValueEdge(); decoded_edges.emplace(edge.id, edge); + // Read cypher_id. + decoder.ReadValue(&dv); + ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Int); } EXPECT_EQ(decoded_edges.size(), 2); EXPECT_EQ(decoded_edges[gid0].from, gid0); @@ -823,8 +826,9 @@ TEST_F(Durability, SequentialRecovery) { return threads; }; - auto make_updates = [&run_updates, this]( - database::GraphDb &db, bool snapshot_during, bool snapshot_after) { + auto make_updates = [&run_updates, this](database::GraphDb &db, + bool snapshot_during, + bool snapshot_after) { std::atomic<bool> keep_running{true}; auto update_theads = run_updates(db, keep_running); std::this_thread::sleep_for(25ms); diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp index 8f9c5b147..149f64f80 100644 --- a/tests/unit/mvcc.cpp +++ b/tests/unit/mvcc.cpp @@ -14,8 +14,8 @@ TEST(MVCC, Deadlock) { tx::SingleNodeEngine engine; auto t0 = engine.Begin(); - mvcc::VersionList<Prop> version_list1(*t0, 0); - mvcc::VersionList<Prop> version_list2(*t0, 1); + mvcc::VersionList<Prop> version_list1(*t0, 0, 0); + mvcc::VersionList<Prop> version_list2(*t0, 1, 1); engine.Commit(*t0); auto t1 = engine.Begin(); @@ -33,7 +33,7 @@ TEST(MVCC, UpdateDontDelete) { { tx::SingleNodeEngine engine; auto t1 = engine.Begin(); - mvcc::VersionList<DestrCountRec> version_list(*t1, 0, count); + mvcc::VersionList<DestrCountRec> version_list(*t1, 0, 0, count); engine.Commit(*t1); auto t2 = engine.Begin(); @@ -57,7 +57,7 @@ TEST(MVCC, UpdateDontDelete) { TEST(MVCC, Oldest) { tx::SingleNodeEngine engine; auto t1 = engine.Begin(); - mvcc::VersionList<Prop> version_list(*t1, 0); + mvcc::VersionList<Prop> version_list(*t1, 0, 0); auto first = version_list.Oldest(); EXPECT_NE(first, nullptr); // TODO Gleich: no need to do 10 checks of the same thing diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index 82a66a4a7..0586d38e6 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -59,7 +59,7 @@ class Mvcc : public ::testing::Test { int version_list_size = 0; tx::SingleNodeEngine engine; tx::Transaction *t1 = engine.Begin(); - mvcc::VersionList<TestClass> version_list{*t1, 0, version_list_size}; + mvcc::VersionList<TestClass> version_list{*t1, 0, 0, version_list_size}; TestClass *v1 = nullptr; tx::Transaction *t2 = nullptr; tx::TransactionId id0, id1, id2; diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp index d1e2f6226..a3b2bea3c 100644 --- a/tests/unit/mvcc_gc.cpp +++ b/tests/unit/mvcc_gc.cpp @@ -25,7 +25,7 @@ class MvccGcTest : public ::testing::Test { protected: std::atomic<int> record_destruction_count{0}; - mvcc::VersionList<DestrCountRec> version_list{*t0, 0, + mvcc::VersionList<DestrCountRec> version_list{*t0, 0, 0, record_destruction_count}; std::vector<tx::Transaction *> transactions{t0}; @@ -126,7 +126,7 @@ TEST(GarbageCollector, GcClean) { auto t1 = engine.Begin(); std::atomic<int> record_destruction_count{0}; auto vl = - new mvcc::VersionList<DestrCountRec>(*t1, 0, record_destruction_count); + new mvcc::VersionList<DestrCountRec>(*t1, 0, 0, record_destruction_count); auto access = collection.access(); access.insert(0, vl); engine.Commit(*t1); diff --git a/tests/unit/query_expression_evaluator.cpp b/tests/unit/query_expression_evaluator.cpp index 29a0b2bb9..2999df82b 100644 --- a/tests/unit/query_expression_evaluator.cpp +++ b/tests/unit/query_expression_evaluator.cpp @@ -4,7 +4,6 @@ #include <unordered_map> #include <vector> -#include "gflags/gflags.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -26,9 +25,6 @@ using query::test_common::ToList; using testing::ElementsAre; using testing::UnorderedElementsAre; -DECLARE_bool(generate_vertex_ids); -DECLARE_bool(generate_edge_ids); - namespace { struct NoContextExpressionEvaluator { @@ -1477,44 +1473,19 @@ TEST(ExpressionEvaluator, FunctionIndexInfo) { } } -TEST(ExpressionEvaluator, FunctionIdNoGeneration) { - NoContextExpressionEvaluator eval; - EXPECT_THROW(EvaluateFunction("ID", {}, &eval.ctx), QueryRuntimeException); - auto &dba = eval.dba; - - auto va = dba.InsertVertex(); - auto ea = dba.InsertEdge(va, va, dba.EdgeType("edge")); - // Unable to get id if they are not set - EXPECT_THROW(EvaluateFunction("ID", {va}, &eval.ctx), QueryRuntimeException); - EXPECT_THROW(EvaluateFunction("ID", {ea}, &eval.ctx), QueryRuntimeException); -} - -TEST(ExpressionEvaluator, FunctionIdGenerateVertexIds) { - FLAGS_generate_vertex_ids = true; +TEST(ExpressionEvaluator, FunctionId) { NoContextExpressionEvaluator eval; auto &dba = eval.dba; auto va = dba.InsertVertex(); auto ea = dba.InsertEdge(va, va, dba.EdgeType("edge")); - EXPECT_EQ(EvaluateFunction("ID", {va}, &eval.ctx).Value<int64_t>(), 0); - EXPECT_THROW(EvaluateFunction("ID", {ea}, &eval.ctx), QueryRuntimeException); - auto vb = dba.InsertVertex(); - EXPECT_EQ(EvaluateFunction("ID", {vb}, &eval.ctx).Value<int64_t>(), 1024); - FLAGS_generate_vertex_ids = false; -} - -TEST(ExpressionEvaluator, FunctionIdGenerateEdgeIds) { - FLAGS_generate_edge_ids = true; - NoContextExpressionEvaluator eval; - auto &dba = eval.dba; - auto va = dba.InsertVertex(); - auto ea = dba.InsertEdge(va, va, dba.EdgeType("edge")); - EXPECT_THROW(EvaluateFunction("ID", {va}, &eval.ctx), QueryRuntimeException); + EXPECT_EQ(EvaluateFunction("ID", {va}, &eval.ctx).Value<int64_t>(), 0); EXPECT_EQ(EvaluateFunction("ID", {ea}, &eval.ctx).Value<int64_t>(), 0); - - auto eb = dba.InsertEdge(va, va, dba.EdgeType("edge")); - EXPECT_EQ(EvaluateFunction("ID", {eb}, &eval.ctx).Value<int64_t>(), 1024); - FLAGS_generate_edge_ids = false; + EXPECT_EQ(EvaluateFunction("ID", {vb}, &eval.ctx).Value<int64_t>(), 1024); + EXPECT_THROW(EvaluateFunction("ID", {}, &eval.ctx), QueryRuntimeException); + EXPECT_THROW(EvaluateFunction("ID", {0}, &eval.ctx), QueryRuntimeException); + EXPECT_THROW(EvaluateFunction("ID", {va, ea}, &eval.ctx), + QueryRuntimeException); } TEST(ExpressionEvaluator, FunctionWorkerIdException) { diff --git a/tests/unit/state_delta.cpp b/tests/unit/state_delta.cpp index 008c1a314..f7e86e6e5 100644 --- a/tests/unit/state_delta.cpp +++ b/tests/unit/state_delta.cpp @@ -10,7 +10,8 @@ TEST(StateDelta, CreateVertex) { auto gid0 = generator.Next(); { database::GraphDbAccessor dba(db); - auto delta = database::StateDelta::CreateVertex(dba.transaction_id(), gid0); + auto delta = + database::StateDelta::CreateVertex(dba.transaction_id(), gid0, 0); delta.Apply(dba); dba.Commit(); } @@ -18,6 +19,7 @@ TEST(StateDelta, CreateVertex) { database::GraphDbAccessor dba(db); auto vertex = dba.FindVertexOptional(gid0, false); EXPECT_TRUE(vertex); + EXPECT_EQ(vertex->cypher_id(), 0); } } @@ -58,8 +60,9 @@ TEST(StateDelta, CreateEdge) { } { database::GraphDbAccessor dba(db); - auto delta = database::StateDelta::CreateEdge( - dba.transaction_id(), gid2, gid0, gid1, dba.EdgeType("edge"), "edge"); + auto delta = + database::StateDelta::CreateEdge(dba.transaction_id(), gid2, 0, gid0, + gid1, dba.EdgeType("edge"), "edge"); delta.Apply(dba); dba.Commit(); } diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp index 09700d98f..ef8bf72e4 100644 --- a/tools/src/mg_import_csv/main.cpp +++ b/tools/src/mg_import_csv/main.cpp @@ -114,7 +114,7 @@ class MemgraphNodeIdMap { return found_it->second; } - int64_t Insert(const NodeId &node_id) { + uint64_t Insert(const NodeId &node_id) { auto gid = generator_.Next(); node_id_to_mg_[node_id] = gid; return gid; @@ -250,7 +250,7 @@ void WriteNodeRow( } } CHECK(id) << "Node ID must be specified"; - partial_vertices[*id] = {*id, labels, properties, {}}; + partial_vertices[*id] = {*id, static_cast<int>(*id), labels, properties, {}}; } auto PassNodes(std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex> @@ -379,12 +379,12 @@ void Convert(const std::vector<std::string> &nodes, } for (auto edge : edges) { auto encoded = edge.second; + auto edge_address = storage::EdgeAddress(encoded.id, 0); vertices[encoded.from].out.push_back( - {storage::EdgeAddress(encoded.id, 0), - storage::VertexAddress(encoded.to, 0), encoded.type}); + {edge_address, storage::VertexAddress(encoded.to, 0), encoded.type}); vertices[encoded.to].in.push_back( - {storage::EdgeAddress(encoded.id, 0), - storage::VertexAddress(encoded.from, 0), encoded.type}); + {edge_address, storage::VertexAddress(encoded.from, 0), + encoded.type}); } for (auto vertex_pair : vertices) { auto &vertex = vertex_pair.second; @@ -403,6 +403,8 @@ void Convert(const std::vector<std::string> &nodes, encoder.WriteList(transformed); encoder.WriteMap(vertex.properties); + encoder.WriteInt(vertex.cypher_id); + encoder.WriteInt(vertex.in.size()); for (auto edge : vertex.in) { encoder.WriteInt(edge.address.raw()); @@ -429,6 +431,9 @@ void Convert(const std::vector<std::string> &nodes, encoder.WriteInt(edge.to); encoder.WriteString(edge.type); encoder.WriteMap(edge.properties); + + // cypher_id + encoder.WriteInt(edge.id); } buffer.WriteValue(node_count);