Add support for the id function
Reviewers: dgleich, teon.banek, mferencevic Reviewed By: mferencevic Subscribers: dgleich, mferencevic, teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1462
This commit is contained in:
parent
5dad16712e
commit
888c6a4bca
@ -48,17 +48,6 @@
|
||||
# would be --properties-on-disk=biography,summary.
|
||||
#--properties-on-disk=
|
||||
|
||||
## Ids
|
||||
|
||||
# Memgraph can generate an identifier for each vertex or edge. The
|
||||
# generated ids are returned with the id function.
|
||||
|
||||
# Memgraph can generate an identifier for each vertex.
|
||||
--generate-vertex-ids=true
|
||||
|
||||
# Memgraph can generate an identifier for each edge.
|
||||
--generate-edge-ids=true
|
||||
|
||||
## Query
|
||||
#
|
||||
# Various settings related to openCypher query execution.
|
||||
|
@ -715,9 +715,8 @@ functions.
|
||||
`counter` | Generates integers that are guaranteed to be unique on the database level, for the given counter name.
|
||||
`counterSet` | Sets the counter with the given name to the given value.
|
||||
`indexInfo` | Returns a list of all the indexes available in the database. The list includes indexes that are not yet ready for use (they are concurrently being built by another transaction).
|
||||
`id` | Returns identifier for a given node or edge. To enable automatic generation of the identifiers, `--generate-vertex-ids` and `--generate-edge-ids` parameters have to be set on `true` (enabled in the configuration by default).
|
||||
`timestamp` | Returns the difference, measured in milliseconds, between the current time and midnight, January 1, 1970 UTC.
|
||||
|
||||
`id` | Returns identifier for a given node or edge. The identifier is generated during the initialization of node or edge and will be persisted through the durability mechanism.
|
||||
|
||||
#### String Operators
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include "utils/underlying_cast.hpp"
|
||||
#include "utils/cast.hpp"
|
||||
|
||||
namespace communication::bolt {
|
||||
|
||||
@ -82,4 +82,4 @@ static constexpr Marker Marker16[3] = {Marker::String16, Marker::List16,
|
||||
Marker::Map16};
|
||||
static constexpr Marker Marker32[3] = {Marker::String32, Marker::List32,
|
||||
Marker::Map32};
|
||||
}
|
||||
} // namespace communication::bolt
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include "communication/bolt/v1/codes.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "utils/bswap.hpp"
|
||||
#include "utils/underlying_cast.hpp"
|
||||
#include "utils/cast.hpp"
|
||||
|
||||
namespace communication::bolt {
|
||||
|
||||
|
@ -15,18 +15,6 @@
|
||||
#include "utils/atomic.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
|
||||
// Autogenerate property with Vertex ID
|
||||
DEFINE_bool(
|
||||
generate_vertex_ids, false,
|
||||
"If enabled database will automatically generate Ids as properties of "
|
||||
"vertices, which can be accessed using the `Id` cypher function.");
|
||||
|
||||
// Autogenerate property with Edge ID
|
||||
DEFINE_bool(
|
||||
generate_edge_ids, false,
|
||||
"If enabled database will automatically generate Ids as properties of "
|
||||
"edges, which can be accessed using the `Id` cypher function.");
|
||||
|
||||
namespace database {
|
||||
|
||||
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
|
||||
@ -74,21 +62,22 @@ bool GraphDbAccessor::should_abort() const {
|
||||
durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal(); }
|
||||
|
||||
VertexAccessor GraphDbAccessor::InsertVertex(
|
||||
std::experimental::optional<gid::Gid> requested_gid) {
|
||||
std::experimental::optional<gid::Gid> requested_gid,
|
||||
std::experimental::optional<int64_t> cypher_id) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
|
||||
int64_t gid = db_.storage().vertex_generator_.Next(requested_gid);
|
||||
auto vertex_vlist = new mvcc::VersionList<Vertex>(transaction_, gid);
|
||||
auto gid = db_.storage().vertex_generator_.Next(requested_gid);
|
||||
if (!cypher_id) cypher_id = utils::MemcpyCast<int64_t>(gid);
|
||||
auto vertex_vlist =
|
||||
new mvcc::VersionList<Vertex>(transaction_, gid, *cypher_id);
|
||||
|
||||
bool success =
|
||||
db_.storage().vertices_.access().insert(gid, vertex_vlist).second;
|
||||
CHECK(success) << "Attempting to insert a vertex with an existing GID: "
|
||||
<< gid;
|
||||
wal().Emplace(
|
||||
database::StateDelta::CreateVertex(transaction_.id_, vertex_vlist->gid_));
|
||||
auto va = VertexAccessor(vertex_vlist, *this);
|
||||
if (FLAGS_generate_vertex_ids)
|
||||
va.PropsSet(Property(PropertyValueStore::IdPropertyName), gid);
|
||||
wal().Emplace(database::StateDelta::CreateVertex(
|
||||
transaction_.id_, vertex_vlist->gid_, vertex_vlist->cypher_id()));
|
||||
auto va = VertexAccessor(storage::VertexAddress(vertex_vlist), *this);
|
||||
return va;
|
||||
}
|
||||
|
||||
@ -114,8 +103,8 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
|
||||
|
||||
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertexOptional(
|
||||
gid::Gid gid, bool current_state) {
|
||||
VertexAccessor record_accessor(db_.storage().LocalAddress<Vertex>(gid),
|
||||
*this);
|
||||
VertexAccessor record_accessor(
|
||||
storage::VertexAddress(db_.storage().LocalAddress<Vertex>(gid)), *this);
|
||||
if (!record_accessor.Visible(transaction(), current_state))
|
||||
return std::experimental::nullopt;
|
||||
return record_accessor;
|
||||
@ -129,7 +118,8 @@ VertexAccessor GraphDbAccessor::FindVertex(gid::Gid gid, bool current_state) {
|
||||
|
||||
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdgeOptional(
|
||||
gid::Gid gid, bool current_state) {
|
||||
EdgeAccessor record_accessor(db_.storage().LocalAddress<Edge>(gid), *this);
|
||||
EdgeAccessor record_accessor(
|
||||
storage::EdgeAddress(db_.storage().LocalAddress<Edge>(gid)), *this);
|
||||
if (!record_accessor.Visible(transaction(), current_state))
|
||||
return std::experimental::nullopt;
|
||||
return record_accessor;
|
||||
@ -395,7 +385,8 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
|
||||
|
||||
EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid> requested_gid) {
|
||||
std::experimental::optional<gid::Gid> requested_gid,
|
||||
std::experimental::optional<int64_t> cypher_id) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
|
||||
// The address of an edge we'll create.
|
||||
@ -403,8 +394,8 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
|
||||
Vertex *from_updated;
|
||||
if (from.is_local()) {
|
||||
auto edge_accessor =
|
||||
InsertOnlyEdge(from.address(), to.address(), edge_type, requested_gid);
|
||||
auto edge_accessor = InsertOnlyEdge(from.address(), to.address(), edge_type,
|
||||
requested_gid, cypher_id);
|
||||
edge_address = edge_accessor.address(),
|
||||
|
||||
from.SwitchNew();
|
||||
@ -414,8 +405,8 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
|
||||
// in/out modification).
|
||||
wal().Emplace(database::StateDelta::CreateEdge(
|
||||
transaction_.id_, edge_accessor.gid(), from.gid(), to.gid(), edge_type,
|
||||
EdgeTypeName(edge_type)));
|
||||
transaction_.id_, edge_accessor.gid(), edge_accessor.cypher_id(),
|
||||
from.gid(), to.gid(), edge_type, EdgeTypeName(edge_type)));
|
||||
|
||||
} else {
|
||||
edge_address = db().updates_clients().CreateEdge(transaction_id(), from, to,
|
||||
@ -466,21 +457,22 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
EdgeAccessor GraphDbAccessor::InsertOnlyEdge(
|
||||
storage::VertexAddress from, storage::VertexAddress to,
|
||||
storage::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid> requested_gid) {
|
||||
std::experimental::optional<gid::Gid> requested_gid,
|
||||
std::experimental::optional<int64_t> cypher_id) {
|
||||
CHECK(from.is_local())
|
||||
<< "`from` address should be local when calling InsertOnlyEdge";
|
||||
int64_t gid = db_.storage().edge_generator_.Next(requested_gid);
|
||||
auto edge_vlist =
|
||||
new mvcc::VersionList<Edge>(transaction_, gid, from, to, edge_type);
|
||||
auto gid = db_.storage().edge_generator_.Next(requested_gid);
|
||||
if (!cypher_id) cypher_id = utils::MemcpyCast<int64_t>(gid);
|
||||
auto edge_vlist = new mvcc::VersionList<Edge>(transaction_, gid, *cypher_id,
|
||||
from, to, edge_type);
|
||||
// We need to insert edge_vlist to edges_ before calling update since update
|
||||
// can throw and edge_vlist will not be garbage collected if it is not in
|
||||
// edges_ skiplist.
|
||||
bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second;
|
||||
CHECK(success) << "Attempting to insert an edge with an existing GID: "
|
||||
<< gid;
|
||||
auto ea = EdgeAccessor(edge_vlist, *this, from, to, edge_type);
|
||||
if (FLAGS_generate_edge_ids)
|
||||
ea.PropsSet(Property(PropertyValueStore::IdPropertyName), gid);
|
||||
auto ea = EdgeAccessor(storage::EdgeAddress(edge_vlist), *this, from, to,
|
||||
edge_type);
|
||||
return ea;
|
||||
}
|
||||
|
||||
|
@ -73,10 +73,14 @@ class GraphDbAccessor {
|
||||
*
|
||||
* @param requested_gid The requested GID. Should only be provided when
|
||||
* recovering from durability.
|
||||
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
|
||||
*
|
||||
* @return See above.
|
||||
*/
|
||||
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
|
||||
requested_gid = std::experimental::nullopt);
|
||||
requested_gid = std::experimental::nullopt,
|
||||
std::experimental::optional<int64_t> cypher_id =
|
||||
std::experimental::nullopt);
|
||||
|
||||
/** Creates a new Vertex on the given worker. It is NOT allowed to call this
|
||||
* function with this worker's id. */
|
||||
@ -149,7 +153,7 @@ class GraphDbAccessor {
|
||||
// wrap version lists into accessors, which will look for visible versions
|
||||
auto accessors = iter::imap(
|
||||
[this](auto id_vlist) {
|
||||
return VertexAccessor(id_vlist.second, *this);
|
||||
return VertexAccessor(storage::VertexAddress(id_vlist.second), *this);
|
||||
},
|
||||
db_.storage().vertices_.access());
|
||||
|
||||
@ -174,7 +178,9 @@ class GraphDbAccessor {
|
||||
auto Vertices(storage::Label label, bool current_state) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
[this](auto vlist) {
|
||||
return VertexAccessor(storage::VertexAddress(vlist), *this);
|
||||
},
|
||||
db_.storage().labels_index_.GetVlists(label, transaction_,
|
||||
current_state));
|
||||
}
|
||||
@ -198,7 +204,9 @@ class GraphDbAccessor {
|
||||
LabelPropertyIndex::Key(label, property)))
|
||||
<< "Label+property index doesn't exist.";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
[this](auto vlist) {
|
||||
return VertexAccessor(storage::VertexAddress(vlist), *this);
|
||||
},
|
||||
db_.storage().label_property_index_.GetVlists(
|
||||
LabelPropertyIndex::Key(label, property), transaction_,
|
||||
current_state));
|
||||
@ -226,7 +234,9 @@ class GraphDbAccessor {
|
||||
CHECK(value.type() != PropertyValue::Type::Null)
|
||||
<< "Can't query index for propery value type null.";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
[this](auto vlist) {
|
||||
return VertexAccessor(storage::VertexAddress(vlist), *this);
|
||||
},
|
||||
db_.storage().label_property_index_.GetVlists(
|
||||
LabelPropertyIndex::Key(label, property), value, transaction_,
|
||||
current_state));
|
||||
@ -269,7 +279,9 @@ class GraphDbAccessor {
|
||||
LabelPropertyIndex::Key(label, property)))
|
||||
<< "Label+property index doesn't exist.";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
[this](auto vlist) {
|
||||
return VertexAccessor(storage::VertexAddress(vlist), *this);
|
||||
},
|
||||
db_.storage().label_property_index_.GetVlists(
|
||||
LabelPropertyIndex::Key(label, property), lower, upper,
|
||||
transaction_, current_state));
|
||||
@ -291,22 +303,30 @@ class GraphDbAccessor {
|
||||
* @param type Edge type.
|
||||
* @param requested_gid The requested GID. Should only be provided when
|
||||
* recovering from durability.
|
||||
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
|
||||
*
|
||||
* @return An accessor to the edge.
|
||||
*/
|
||||
EdgeAccessor InsertEdge(VertexAccessor &from, VertexAccessor &to,
|
||||
storage::EdgeType type,
|
||||
std::experimental::optional<gid::Gid> requested_gid =
|
||||
std::experimental::nullopt,
|
||||
std::experimental::optional<int64_t> cypher_id =
|
||||
std::experimental::nullopt);
|
||||
|
||||
/**
|
||||
* Insert edge into main storage, but don't insert it into from and to
|
||||
* vertices edge lists.
|
||||
*
|
||||
* @param cypher_id Take a look under mvcc::VersionList::cypher_id
|
||||
*/
|
||||
EdgeAccessor InsertOnlyEdge(storage::VertexAddress from,
|
||||
storage::VertexAddress to,
|
||||
storage::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid>
|
||||
requested_gid = std::experimental::nullopt);
|
||||
requested_gid = std::experimental::nullopt,
|
||||
std::experimental::optional<int64_t> cypher_id =
|
||||
std::experimental::nullopt);
|
||||
|
||||
/**
|
||||
* Removes an edge from the graph. Parameters can indicate if the edge should
|
||||
@ -364,7 +384,9 @@ class GraphDbAccessor {
|
||||
|
||||
// wrap version lists into accessors, which will look for visible versions
|
||||
auto accessors = iter::imap(
|
||||
[this](auto id_vlist) { return EdgeAccessor(id_vlist.second, *this); },
|
||||
[this](auto id_vlist) {
|
||||
return EdgeAccessor(storage::EdgeAddress(id_vlist.second), *this);
|
||||
},
|
||||
db_.storage().edges_.access());
|
||||
|
||||
// filter out the accessors not visible to the current transaction
|
||||
|
@ -18,20 +18,22 @@ StateDelta StateDelta::TxAbort(tx::TransactionId tx_id) {
|
||||
return {StateDelta::Type::TRANSACTION_ABORT, tx_id};
|
||||
}
|
||||
|
||||
StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id,
|
||||
gid::Gid vertex_id) {
|
||||
StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
int64_t cypher_id) {
|
||||
StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id);
|
||||
op.vertex_id = vertex_id;
|
||||
op.cypher_id = cypher_id;
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
|
||||
gid::Gid vertex_from_id,
|
||||
int64_t cypher_id, gid::Gid vertex_from_id,
|
||||
gid::Gid vertex_to_id,
|
||||
storage::EdgeType edge_type,
|
||||
const std::string &edge_type_name) {
|
||||
StateDelta op(StateDelta::Type::CREATE_EDGE, tx_id);
|
||||
op.edge_id = edge_id;
|
||||
op.cypher_id = cypher_id;
|
||||
op.vertex_from_id = vertex_from_id;
|
||||
op.vertex_to_id = vertex_to_id;
|
||||
op.edge_type = edge_type;
|
||||
@ -39,8 +41,7 @@ StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id,
|
||||
gid::Gid vertex_id,
|
||||
StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
storage::VertexAddress vertex_to_address,
|
||||
storage::EdgeAddress edge_address,
|
||||
storage::EdgeType edge_type) {
|
||||
@ -78,8 +79,7 @@ StateDelta StateDelta::AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id,
|
||||
gid::Gid vertex_id,
|
||||
StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
storage::EdgeAddress edge_address) {
|
||||
CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";
|
||||
StateDelta op(StateDelta::Type::REMOVE_IN_EDGE, tx_id);
|
||||
@ -101,8 +101,7 @@ StateDelta StateDelta::PropsSetVertex(tx::TransactionId tx_id,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id,
|
||||
gid::Gid edge_id,
|
||||
StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id,
|
||||
storage::Property property,
|
||||
const std::string &property_name,
|
||||
const PropertyValue &value) {
|
||||
@ -124,8 +123,8 @@ StateDelta StateDelta::AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id,
|
||||
gid::Gid vertex_id, storage::Label label,
|
||||
StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
storage::Label label,
|
||||
const std::string &label_name) {
|
||||
StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id);
|
||||
op.vertex_id = vertex_id;
|
||||
@ -134,23 +133,21 @@ StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id,
|
||||
gid::Gid vertex_id, bool check_empty) {
|
||||
StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
bool check_empty) {
|
||||
StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id);
|
||||
op.vertex_id = vertex_id;
|
||||
op.check_empty = check_empty;
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id,
|
||||
gid::Gid edge_id) {
|
||||
StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id) {
|
||||
StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id);
|
||||
op.edge_id = edge_id;
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id,
|
||||
storage::Label label,
|
||||
StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label,
|
||||
const std::string &label_name,
|
||||
storage::Property property,
|
||||
const std::string &property_name) {
|
||||
@ -175,9 +172,11 @@ void StateDelta::Encode(
|
||||
break;
|
||||
case Type::CREATE_VERTEX:
|
||||
encoder.WriteInt(vertex_id);
|
||||
encoder.WriteInt(cypher_id);
|
||||
break;
|
||||
case Type::CREATE_EDGE:
|
||||
encoder.WriteInt(edge_id);
|
||||
encoder.WriteInt(cypher_id);
|
||||
encoder.WriteInt(vertex_from_id);
|
||||
encoder.WriteInt(vertex_to_id);
|
||||
encoder.WriteInt(edge_type.Id());
|
||||
@ -267,9 +266,11 @@ std::experimental::optional<StateDelta> StateDelta::Decode(
|
||||
break;
|
||||
case Type::CREATE_VERTEX:
|
||||
DECODE_MEMBER(vertex_id, ValueInt)
|
||||
DECODE_MEMBER(cypher_id, ValueInt)
|
||||
break;
|
||||
case Type::CREATE_EDGE:
|
||||
DECODE_MEMBER(edge_id, ValueInt)
|
||||
DECODE_MEMBER(cypher_id, ValueInt)
|
||||
DECODE_MEMBER(vertex_from_id, ValueInt)
|
||||
DECODE_MEMBER(vertex_to_id, ValueInt)
|
||||
DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
|
||||
@ -354,12 +355,13 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
|
||||
LOG(FATAL) << "Transaction handling not handled in Apply";
|
||||
break;
|
||||
case Type::CREATE_VERTEX:
|
||||
dba.InsertVertex(vertex_id);
|
||||
dba.InsertVertex(vertex_id, cypher_id);
|
||||
break;
|
||||
case Type::CREATE_EDGE: {
|
||||
auto from = dba.FindVertex(vertex_from_id, true);
|
||||
auto to = dba.FindVertex(vertex_to_id, true);
|
||||
dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id);
|
||||
dba.InsertEdge(from, to, dba.EdgeType(edge_type_name), edge_id,
|
||||
cypher_id);
|
||||
break;
|
||||
}
|
||||
case Type::ADD_OUT_EDGE:
|
||||
|
@ -39,6 +39,7 @@ cpp<#
|
||||
;; only keep addresses.
|
||||
(vertex-id "gid::Gid")
|
||||
(edge-id "gid::Gid")
|
||||
(cypher-id :int64_t)
|
||||
(edge-address "storage::EdgeAddress")
|
||||
(vertex-from-id "gid::Gid")
|
||||
(vertex-from-address "storage::VertexAddress")
|
||||
@ -130,9 +131,12 @@ omitted in the comment.")
|
||||
static StateDelta TxCommit(tx::TransactionId tx_id);
|
||||
static StateDelta TxAbort(tx::TransactionId tx_id);
|
||||
static StateDelta CreateVertex(tx::TransactionId tx_id,
|
||||
gid::Gid vertex_id);
|
||||
gid::Gid vertex_id,
|
||||
int64_t cypher_id);
|
||||
static StateDelta CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
|
||||
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
|
||||
int64_t cypher_id,
|
||||
gid::Gid vertex_from_id,
|
||||
gid::Gid vertex_to_id,
|
||||
storage::EdgeType edge_type,
|
||||
const std::string &edge_type_name);
|
||||
static StateDelta AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
|
||||
|
@ -28,7 +28,7 @@ struct RecoveryInfo;
|
||||
RecoveryInfo Recover(const std::experimental::filesystem::path &,
|
||||
database::GraphDb &,
|
||||
std::experimental::optional<RecoveryInfo>);
|
||||
};
|
||||
}; // namespace durability
|
||||
|
||||
namespace database {
|
||||
|
||||
@ -77,7 +77,11 @@ class Storage {
|
||||
storage::Address<mvcc::VersionList<TRecord>> address) const {
|
||||
if (address.is_local()) return address;
|
||||
if (address.worker_id() == worker_id_) {
|
||||
return LocalAddress<TRecord>(address.gid());
|
||||
auto vlist = LocalAddress<TRecord>(address.gid());
|
||||
if constexpr (std::is_same<TRecord, Vertex>::value)
|
||||
return storage::VertexAddress(vlist);
|
||||
else
|
||||
return storage::EdgeAddress(vlist);
|
||||
}
|
||||
return address;
|
||||
}
|
||||
|
@ -121,10 +121,13 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
std::unordered_map<gid::Gid,
|
||||
std::pair<storage::VertexAddress, storage::VertexAddress>>
|
||||
edge_gid_endpoints_mapping;
|
||||
|
||||
for (int64_t i = 0; i < vertex_count; ++i) {
|
||||
auto vertex = decoder.ReadSnapshotVertex();
|
||||
RETURN_IF_NOT(vertex);
|
||||
auto vertex_accessor = dba.InsertVertex(vertex->gid);
|
||||
|
||||
auto vertex_accessor =
|
||||
dba.InsertVertex(vertex->gid, vertex->cypher_id);
|
||||
for (const auto &label : vertex->labels) {
|
||||
vertex_accessor.add_label(dba.Label(label));
|
||||
}
|
||||
@ -169,10 +172,18 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
}
|
||||
};
|
||||
|
||||
DecodedValue dv_cypher_id;
|
||||
|
||||
for (int64_t i = 0; i < edge_count; ++i) {
|
||||
RETURN_IF_NOT(
|
||||
decoder.ReadValue(&dv, communication::bolt::DecodedValue::Type::Edge));
|
||||
auto &edge = dv.ValueEdge();
|
||||
|
||||
// Read cypher_id
|
||||
RETURN_IF_NOT(decoder.ReadValue(
|
||||
&dv_cypher_id, communication::bolt::DecodedValue::Type::Int));
|
||||
auto cypher_id = dv_cypher_id.ValueInt();
|
||||
|
||||
// We have to take full edge endpoints from vertices since the endpoints
|
||||
// found here don't containt worker_id, and this can't be changed since this
|
||||
// edges must be bolt-compliant
|
||||
@ -187,8 +198,8 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
vertex_transform_to_local_if_possible(from);
|
||||
vertex_transform_to_local_if_possible(to);
|
||||
|
||||
auto edge_accessor =
|
||||
dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type), edge.id);
|
||||
auto edge_accessor = dba.InsertOnlyEdge(from, to, dba.EdgeType(edge.type),
|
||||
edge.id, cypher_id);
|
||||
|
||||
for (const auto &property_pair : edge.properties)
|
||||
edge_accessor.PropsSet(dba.Property(property_pair.first),
|
||||
|
@ -16,7 +16,7 @@ namespace fs = std::experimental::filesystem;
|
||||
namespace durability {
|
||||
|
||||
// Snapshot layout is described in durability/version.hpp
|
||||
static_assert(durability::kVersion == 5,
|
||||
static_assert(durability::kVersion == 6,
|
||||
"Wrong snapshot version, please update!");
|
||||
|
||||
namespace {
|
||||
@ -68,6 +68,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
|
||||
}
|
||||
for (const auto &edge : dba.Edges(false)) {
|
||||
encoder.WriteEdge(edge);
|
||||
encoder.WriteInt(edge.cypher_id());
|
||||
edge_num++;
|
||||
}
|
||||
buffer.WriteValue(vertex_num);
|
||||
|
@ -22,6 +22,7 @@ struct DecodedInlinedVertexEdge;
|
||||
*/
|
||||
struct DecodedSnapshotVertex {
|
||||
gid::Gid gid;
|
||||
int64_t cypher_id;
|
||||
std::vector<std::string> labels;
|
||||
std::map<std::string, communication::bolt::DecodedValue> properties;
|
||||
// Vector of edges without properties
|
||||
|
@ -17,37 +17,45 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> {
|
||||
communication::bolt::DecodedValue dv;
|
||||
DecodedSnapshotVertex vertex;
|
||||
|
||||
// Read global id, labels and properties of the vertex
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::Vertex)) {
|
||||
DLOG(WARNING) << "Unable to read snapshot vertex";
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
|
||||
auto &read_vertex = dv.ValueVertex();
|
||||
vertex.gid = read_vertex.id;
|
||||
vertex.gid = static_cast<uint64_t>(read_vertex.id);
|
||||
vertex.labels = read_vertex.labels;
|
||||
vertex.properties = read_vertex.properties;
|
||||
|
||||
// Read cypher_id
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::Int)) {
|
||||
DLOG(WARNING) << "Unable to read vertex cypher_id";
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
vertex.cypher_id = dv.ValueInt();
|
||||
|
||||
// Read in edges
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::Int)) {
|
||||
DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of in "
|
||||
"edges in vertex!";
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dv.ValueInt(); ++i) {
|
||||
auto edge = ReadSnapshotEdge();
|
||||
if (!edge) return std::experimental::nullopt;
|
||||
vertex.in.emplace_back(*edge);
|
||||
}
|
||||
|
||||
// Read out edges
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::Int)) {
|
||||
DLOG(WARNING) << "[ReadSnapshotVertex] Couldn't read number of out "
|
||||
"edges in vertex!";
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dv.ValueInt(); ++i) {
|
||||
auto edge = ReadSnapshotEdge();
|
||||
if (!edge) return std::experimental::nullopt;
|
||||
@ -65,23 +73,24 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> {
|
||||
|
||||
VLOG(20) << "[ReadSnapshotEdge] Start";
|
||||
|
||||
// read ID
|
||||
// Read global id of this edge
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::Int)) {
|
||||
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read ID!";
|
||||
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read Global ID!";
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
edge.address = storage::EdgeAddress(static_cast<uint64_t>(dv.ValueInt()));
|
||||
|
||||
edge.address = dv.ValueInt();
|
||||
// read other side
|
||||
// Read global vertex id of the other side of the edge
|
||||
// (global id of from/to vertexes).
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::Int)) {
|
||||
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from address!";
|
||||
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read from/to address!";
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
edge.vertex = dv.ValueInt();
|
||||
edge.vertex = storage::VertexAddress(static_cast<uint64_t>(dv.ValueInt()));
|
||||
|
||||
// read type
|
||||
// Read edge type
|
||||
if (!communication::bolt::Decoder<Buffer>::ReadValue(
|
||||
&dv, communication::bolt::DecodedValue::Type::String)) {
|
||||
DLOG(WARNING) << "[ReadSnapshotEdge] Couldn't read type!";
|
||||
@ -90,7 +99,6 @@ class SnapshotDecoder : public communication::bolt::Decoder<Buffer> {
|
||||
edge.type = dv.ValueString();
|
||||
|
||||
VLOG(20) << "[ReadSnapshotEdge] Success";
|
||||
|
||||
return edge;
|
||||
}
|
||||
};
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/bolt/v1/encoder/base_encoder.hpp"
|
||||
#include "utils/cast.hpp"
|
||||
|
||||
namespace durability {
|
||||
|
||||
@ -12,14 +13,17 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> {
|
||||
void WriteSnapshotVertex(const VertexAccessor &vertex) {
|
||||
communication::bolt::BaseEncoder<Buffer>::WriteVertex(vertex);
|
||||
|
||||
// write in edges without properties
|
||||
// Write cypher_id
|
||||
this->WriteInt(vertex.cypher_id());
|
||||
|
||||
// Write in edges without properties
|
||||
this->WriteUInt(vertex.in_degree());
|
||||
auto edges_in = vertex.in();
|
||||
for (const auto &edge : edges_in) {
|
||||
this->WriteSnapshotEdge(edge, true);
|
||||
}
|
||||
|
||||
// write out edges without properties
|
||||
// Write out edges without properties
|
||||
this->WriteUInt(vertex.out_degree());
|
||||
auto edges_out = vertex.out();
|
||||
for (const auto &edge : edges_out) {
|
||||
@ -29,18 +33,21 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder<Buffer> {
|
||||
|
||||
private:
|
||||
void WriteUInt(const uint64_t &value) {
|
||||
this->WriteInt(*reinterpret_cast<const int64_t *>(&value));
|
||||
this->WriteInt(utils::MemcpyCast<int64_t>(value));
|
||||
}
|
||||
|
||||
// Writes edge without properties
|
||||
void WriteSnapshotEdge(const EdgeAccessor &edge, bool write_from) {
|
||||
// Write global id of the edge
|
||||
WriteUInt(edge.GlobalAddress().raw());
|
||||
|
||||
// Write to/from global id
|
||||
if (write_from)
|
||||
WriteUInt(edge.from().GlobalAddress().raw());
|
||||
else
|
||||
WriteUInt(edge.to().GlobalAddress().raw());
|
||||
|
||||
// write type
|
||||
// Write type
|
||||
this->WriteString(edge.db_accessor().EdgeTypeName(edge.EdgeType()));
|
||||
}
|
||||
};
|
||||
|
@ -1,5 +1,11 @@
|
||||
#pragma once
|
||||
|
||||
///
|
||||
///
|
||||
/// IMPORTANT: Please update this file for every snapshot format change!!!
|
||||
/// TODO (buda): This is not rock solid.
|
||||
///
|
||||
|
||||
#include <array>
|
||||
#include <cstdint>
|
||||
|
||||
@ -8,9 +14,9 @@ namespace durability {
|
||||
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
|
||||
|
||||
// The current default version of snapshot and WAL encoding / decoding.
|
||||
constexpr int64_t kVersion{5};
|
||||
constexpr int64_t kVersion{6};
|
||||
|
||||
// Snapshot format (version 5):
|
||||
// Snapshot format (version 6):
|
||||
// 1) Magic number + snapshot version
|
||||
// 2) Distributed worker ID
|
||||
//
|
||||
@ -29,9 +35,16 @@ constexpr int64_t kVersion{5};
|
||||
//
|
||||
// We must inline edges with nodes because some edges might be stored on other
|
||||
// worker (edges are always stored only on the worker of the edge source).
|
||||
// 8) Bolt encoded nodes + inlined edges (edge address, other endpoint address
|
||||
// and edge type)
|
||||
// 9) Bolt encoded edges
|
||||
// 8) Bolt encoded nodes. Each node is written in the following format:
|
||||
// * gid, labels, properties
|
||||
// * cypher_id
|
||||
// * inlined edges (edge address, other endpoint address and edge type)
|
||||
// 9) Bolt encoded edges. Each edge is written in the following format:
|
||||
// * gid
|
||||
// * from, to
|
||||
// * edge_type
|
||||
// * properties
|
||||
// * cypher_id
|
||||
//
|
||||
// 10) Snapshot summary (number of nodes, number of edges, hash)
|
||||
|
||||
|
@ -21,15 +21,18 @@ class VersionList {
|
||||
public:
|
||||
/**
|
||||
* @brief Constructor that is used to insert one item into VersionList.
|
||||
*
|
||||
* @param t - transaction
|
||||
* @param gid - Version list identifier. Uniqueness guaranteed by the code
|
||||
* creating this version list.
|
||||
* @param cypher_id - Number returned from the id function.
|
||||
* @param args - args forwarded to constructor of item T (for
|
||||
* creating the first Record (Version) in this VersionList.
|
||||
*/
|
||||
template <typename... Args>
|
||||
VersionList(const tx::Transaction &t, gid::Gid gid, Args &&... args)
|
||||
: gid_(gid) {
|
||||
VersionList(const tx::Transaction &t, gid::Gid gid, int64_t cypher_id,
|
||||
Args &&... args)
|
||||
: gid_(gid), cypher_id_(cypher_id) {
|
||||
// TODO replace 'new' with something better
|
||||
auto *v1 = new T(std::forward<Args>(args)...);
|
||||
v1->mark_created(t);
|
||||
@ -221,8 +224,13 @@ class VersionList {
|
||||
record->mark_expired(t);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO (buda): Try to move git_ to storage::Address.
|
||||
*/
|
||||
const gid::Gid gid_;
|
||||
|
||||
auto cypher_id() { return cypher_id_; }
|
||||
|
||||
private:
|
||||
void lock_and_validate(T *record, const tx::Transaction &t) {
|
||||
DCHECK(record != nullptr) << "Record is nullptr on lock and validation.";
|
||||
@ -260,6 +268,18 @@ class VersionList {
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
|
||||
* The following member is here because Memgraph supports ID function from
|
||||
* the Cypher query language. If you have plans to change this you have to
|
||||
* consider the following:
|
||||
* * If the id has to be durable. -> Snapshot and WAL have to be updated.
|
||||
* * Impact on query execution. |
|
||||
* * Impact on the communication stack. |-> The id has to be returned
|
||||
* to the client.
|
||||
* * Import tools bacause of the dependencies on the durability stack.
|
||||
* * Implications on the distributed system.
|
||||
*/
|
||||
int64_t cypher_id_{0};
|
||||
std::atomic<T *> head_{nullptr};
|
||||
RecordLock lock_;
|
||||
};
|
||||
|
@ -610,24 +610,10 @@ TypedValue Id(const std::vector<TypedValue> &args, Context *ctx) {
|
||||
auto &arg = args[0];
|
||||
switch (arg.type()) {
|
||||
case TypedValue::Type::Vertex: {
|
||||
auto id = arg.ValueVertex().PropsAt(
|
||||
ctx->db_accessor_.Property(PropertyValueStore::IdPropertyName));
|
||||
if (id.IsNull()) {
|
||||
throw QueryRuntimeException(
|
||||
"IDs are not set on vertices, --generate-vertex-ids flag must be "
|
||||
"set on startup to automatically generate them");
|
||||
}
|
||||
return id.Value<int64_t>();
|
||||
return TypedValue(arg.ValueVertex().cypher_id());
|
||||
}
|
||||
case TypedValue::Type::Edge: {
|
||||
auto id = arg.ValueEdge().PropsAt(
|
||||
ctx->db_accessor_.Property(PropertyValueStore::IdPropertyName));
|
||||
if (id.IsNull()) {
|
||||
throw QueryRuntimeException(
|
||||
"IDs are not set on edges, --generate-edge-ids flag must be set on "
|
||||
"startup to automatically generate them");
|
||||
}
|
||||
return id.Value<int64_t>();
|
||||
return TypedValue(arg.ValueEdge().cypher_id());
|
||||
}
|
||||
default:
|
||||
throw QueryRuntimeException("id argument must be a vertex or an edge");
|
||||
|
@ -4,8 +4,8 @@
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "storage/serialization.capnp.h"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/serialization.capnp.h"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -41,10 +41,10 @@ class Address {
|
||||
Address() {}
|
||||
|
||||
// Constructor for raw address value
|
||||
Address(StorageT storage) : storage_(storage) {}
|
||||
explicit Address(StorageT storage) : storage_(storage) {}
|
||||
|
||||
// Constructor for local Address.
|
||||
Address(TLocalObj *ptr) {
|
||||
explicit Address(TLocalObj *ptr) {
|
||||
uintptr_t ptr_no_type = reinterpret_cast<uintptr_t>(ptr);
|
||||
DCHECK((ptr_no_type & kTypeMask) == 0) << "Ptr has type_mask bit set";
|
||||
storage_ = ptr_no_type | kLocal;
|
||||
|
@ -150,6 +150,13 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
||||
* owner is some other worker in a distributed system. */
|
||||
bool is_local() const { return address_.is_local(); }
|
||||
|
||||
int64_t cypher_id() const {
|
||||
if (address_.is_local())
|
||||
return address_.local()->cypher_id();
|
||||
else
|
||||
throw utils::NotYetImplemented("Fetch remote cypher_id");
|
||||
}
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Sends delta for remote processing.
|
||||
|
@ -100,9 +100,9 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
* or empty, the parameter is ignored.
|
||||
*/
|
||||
auto in(const std::vector<storage::EdgeType> *edge_types) const {
|
||||
return MakeAccessorIterator(current().in_.begin(nullptr, edge_types),
|
||||
current().in_.end(), false, address(),
|
||||
db_accessor());
|
||||
return MakeAccessorIterator(
|
||||
current().in_.begin(storage::VertexAddress(nullptr), edge_types),
|
||||
current().in_.end(), false, address(), db_accessor());
|
||||
}
|
||||
|
||||
/** Returns EdgeAccessors for all outgoing edges. */
|
||||
@ -133,9 +133,9 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
* or empty, the parameter is ignored.
|
||||
*/
|
||||
auto out(const std::vector<storage::EdgeType> *edge_types) const {
|
||||
return MakeAccessorIterator(current().out_.begin(nullptr, edge_types),
|
||||
current().out_.end(), true, address(),
|
||||
db_accessor());
|
||||
return MakeAccessorIterator(
|
||||
current().out_.begin(storage::VertexAddress(nullptr), edge_types),
|
||||
current().out_.end(), true, address(), db_accessor());
|
||||
}
|
||||
|
||||
/** Removes the given edge from the outgoing edges of this vertex. Note that
|
||||
|
37
src/utils/cast.hpp
Normal file
37
src/utils/cast.hpp
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <type_traits>
|
||||
|
||||
namespace utils {
|
||||
|
||||
template <typename T>
|
||||
constexpr typename std::underlying_type<T>::type UnderlyingCast(T e) {
|
||||
return static_cast<typename std::underlying_type<T>::type>(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* uint to int conversion in C++ is a bit tricky. Take a look here
|
||||
* https://stackoverflow.com/questions/14623266/why-cant-i-reinterpret-cast-uint-to-int
|
||||
* for more details.
|
||||
*
|
||||
* @tparam TDest Returned datatype.
|
||||
* @tparam TSrc Input datatype.
|
||||
*
|
||||
* @return "copy casted" value.
|
||||
*/
|
||||
template <typename TDest, typename TSrc>
|
||||
TDest MemcpyCast(TSrc src) {
|
||||
TDest dest;
|
||||
static_assert(sizeof(dest) == sizeof(src),
|
||||
"MemcpyCast expects source and destination to be of same size");
|
||||
static_assert(std::is_arithmetic<TSrc>::value,
|
||||
"MemcpyCast expects source is an arithmetic type");
|
||||
static_assert(std::is_arithmetic<TDest>::value,
|
||||
"MemcypCast expects destination is an arithmetic type");
|
||||
std::memcpy(&dest, &src, sizeof(src));
|
||||
return dest;
|
||||
}
|
||||
|
||||
} // namespace utils
|
@ -1,12 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
namespace utils {
|
||||
|
||||
template <typename T>
|
||||
constexpr typename std::underlying_type<T>::type UnderlyingCast(T e) {
|
||||
return static_cast<typename std::underlying_type<T>::type>(e);
|
||||
}
|
||||
|
||||
} // namespace utils
|
@ -21,7 +21,7 @@ void MvccMix(benchmark::State &state) {
|
||||
state.PauseTiming();
|
||||
tx::SingleNodeEngine engine;
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Prop> version_list(*t1, 0);
|
||||
mvcc::VersionList<Prop> version_list(*t1, 0, 0);
|
||||
|
||||
engine.Commit(*t1);
|
||||
auto t2 = engine.Begin();
|
||||
|
@ -14,7 +14,7 @@
|
||||
namespace snapshot_generation {
|
||||
|
||||
// Snapshot layout is described in durability/version.hpp
|
||||
static_assert(durability::kVersion == 5,
|
||||
static_assert(durability::kVersion == 6,
|
||||
"Wrong snapshot version, please update!");
|
||||
|
||||
class SnapshotWriter {
|
||||
@ -69,6 +69,9 @@ class SnapshotWriter {
|
||||
WriteList(node.labels);
|
||||
encoder_.WriteMap(node.props);
|
||||
|
||||
// cypher_id
|
||||
encoder_.WriteInt(utils::MemcpyCast<int64_t>(node.gid));
|
||||
|
||||
encoder_.WriteInt(node.in_edges.size());
|
||||
for (const auto &edge_idx : node.in_edges) {
|
||||
WriteInlineEdge(edges.at(edge_idx), true);
|
||||
@ -93,6 +96,9 @@ class SnapshotWriter {
|
||||
encoder_.WriteString(edge.type);
|
||||
encoder_.WriteMap(edge.props);
|
||||
|
||||
// cypher_id
|
||||
encoder_.WriteInt(utils::MemcpyCast<int64_t>(edge.gid));
|
||||
|
||||
++edges_written_;
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ TEST(LabelsIndex, UniqueInsert) {
|
||||
tx::SingleNodeEngine engine;
|
||||
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Vertex> vlist(*t1, 0);
|
||||
mvcc::VersionList<Vertex> vlist(*t1, 0, 0);
|
||||
engine.Commit(*t1);
|
||||
auto t2 = engine.Begin();
|
||||
|
||||
@ -48,8 +48,8 @@ TEST(LabelsIndex, UniqueFilter) {
|
||||
tx::SingleNodeEngine engine;
|
||||
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Vertex> vlist1(*t1, 0);
|
||||
mvcc::VersionList<Vertex> vlist2(*t1, 1);
|
||||
mvcc::VersionList<Vertex> vlist1(*t1, 0, 0);
|
||||
mvcc::VersionList<Vertex> vlist2(*t1, 1, 1);
|
||||
engine.Advance(t1->id_);
|
||||
auto r1v1 = vlist1.find(*t1);
|
||||
auto r1v2 = vlist2.find(*t1);
|
||||
@ -89,8 +89,8 @@ TEST(LabelsIndex, Refresh) {
|
||||
|
||||
// add two vertices to database
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Vertex> vlist1(*t1, 0);
|
||||
mvcc::VersionList<Vertex> vlist2(*t1, 1);
|
||||
mvcc::VersionList<Vertex> vlist1(*t1, 0, 0);
|
||||
mvcc::VersionList<Vertex> vlist2(*t1, 1, 1);
|
||||
engine.Advance(t1->id_);
|
||||
|
||||
auto v1r1 = vlist1.find(*t1);
|
||||
|
@ -24,7 +24,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
|
||||
index.IndexFinishedBuilding(*key);
|
||||
|
||||
t = engine.Begin();
|
||||
vlist = new mvcc::VersionList<Vertex>(*t, 0);
|
||||
vlist = new mvcc::VersionList<Vertex>(*t, 0, 0);
|
||||
engine.Advance(t->id_);
|
||||
|
||||
vertex = vlist->find(*t);
|
||||
|
@ -121,26 +121,44 @@ class DistributedSerializationMvcc : public ::testing::Test {
|
||||
protected:
|
||||
tx::SingleNodeEngine engine;
|
||||
tx::Transaction *tx = engine.Begin();
|
||||
mvcc::VersionList<Vertex> v1_vlist{*tx, 0};
|
||||
mvcc::VersionList<Vertex> v1_vlist{*tx, 0, 0};
|
||||
Vertex &v1 = *v1_vlist.Oldest();
|
||||
mvcc::VersionList<Vertex> v2_vlist{*tx, 1};
|
||||
mvcc::VersionList<Vertex> v2_vlist{*tx, 1, 1};
|
||||
Vertex &v2 = *v2_vlist.Oldest();
|
||||
mvcc::VersionList<Edge> e1_vlist{*tx, 0, &v1_vlist, &v2_vlist, EdgeType(0)};
|
||||
mvcc::VersionList<Edge> e1_vlist{*tx,
|
||||
0,
|
||||
0,
|
||||
storage::VertexAddress(&v1_vlist),
|
||||
storage::VertexAddress(&v2_vlist),
|
||||
EdgeType(0)};
|
||||
Edge &e1 = *e1_vlist.Oldest();
|
||||
mvcc::VersionList<Edge> e2_vlist{*tx, 1, &v2_vlist, &v1_vlist, EdgeType(2)};
|
||||
mvcc::VersionList<Edge> e2_vlist{*tx,
|
||||
1,
|
||||
1,
|
||||
storage::VertexAddress(&v2_vlist),
|
||||
storage::VertexAddress(&v1_vlist),
|
||||
EdgeType(2)};
|
||||
Edge &e2 = *e2_vlist.Oldest();
|
||||
};
|
||||
|
||||
TEST_F(DistributedSerializationMvcc, VertexEdges) {
|
||||
UPDATE_AND_CHECK_V(v1, v1.out_.emplace(&v2_vlist, &e1_vlist, EdgeType(0)));
|
||||
UPDATE_AND_CHECK_V(v2, v2.in_.emplace(&v1_vlist, &e1_vlist, EdgeType(0)));
|
||||
UPDATE_AND_CHECK_V(v1, v1.in_.emplace(&v2_vlist, &e2_vlist, EdgeType(2)));
|
||||
UPDATE_AND_CHECK_V(v2, v2.out_.emplace(&v1_vlist, &e2_vlist, EdgeType(2)));
|
||||
UPDATE_AND_CHECK_V(
|
||||
v1, v1.out_.emplace(storage::VertexAddress(&v2_vlist),
|
||||
storage::EdgeAddress(&e1_vlist), EdgeType(0)));
|
||||
UPDATE_AND_CHECK_V(
|
||||
v2, v2.in_.emplace(storage::VertexAddress(&v1_vlist),
|
||||
storage::EdgeAddress(&e1_vlist), EdgeType(0)));
|
||||
UPDATE_AND_CHECK_V(
|
||||
v1, v1.in_.emplace(storage::VertexAddress(&v2_vlist),
|
||||
storage::EdgeAddress(&e2_vlist), EdgeType(2)));
|
||||
UPDATE_AND_CHECK_V(
|
||||
v2, v2.out_.emplace(storage::VertexAddress(&v1_vlist),
|
||||
storage::EdgeAddress(&e2_vlist), EdgeType(2)));
|
||||
}
|
||||
|
||||
TEST_F(DistributedSerializationMvcc, EdgeFromAndTo) {
|
||||
UPDATE_AND_CHECK_E(e1, e1.from_ = &v2_vlist);
|
||||
UPDATE_AND_CHECK_E(e1, e1.to_ = &v1_vlist);
|
||||
UPDATE_AND_CHECK_E(e1, e1.from_ = storage::VertexAddress(&v2_vlist));
|
||||
UPDATE_AND_CHECK_E(e1, e1.to_ = storage::VertexAddress(&v1_vlist));
|
||||
}
|
||||
|
||||
TEST_F(DistributedSerializationMvcc, EdgeType) {
|
||||
|
@ -483,6 +483,9 @@ TEST_F(Durability, SnapshotEncoding) {
|
||||
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Edge);
|
||||
auto &edge = dv.ValueEdge();
|
||||
decoded_edges.emplace(edge.id, edge);
|
||||
// Read cypher_id.
|
||||
decoder.ReadValue(&dv);
|
||||
ASSERT_EQ(dv.type(), communication::bolt::DecodedValue::Type::Int);
|
||||
}
|
||||
EXPECT_EQ(decoded_edges.size(), 2);
|
||||
EXPECT_EQ(decoded_edges[gid0].from, gid0);
|
||||
@ -823,8 +826,9 @@ TEST_F(Durability, SequentialRecovery) {
|
||||
return threads;
|
||||
};
|
||||
|
||||
auto make_updates = [&run_updates, this](
|
||||
database::GraphDb &db, bool snapshot_during, bool snapshot_after) {
|
||||
auto make_updates = [&run_updates, this](database::GraphDb &db,
|
||||
bool snapshot_during,
|
||||
bool snapshot_after) {
|
||||
std::atomic<bool> keep_running{true};
|
||||
auto update_theads = run_updates(db, keep_running);
|
||||
std::this_thread::sleep_for(25ms);
|
||||
|
@ -14,8 +14,8 @@ TEST(MVCC, Deadlock) {
|
||||
tx::SingleNodeEngine engine;
|
||||
|
||||
auto t0 = engine.Begin();
|
||||
mvcc::VersionList<Prop> version_list1(*t0, 0);
|
||||
mvcc::VersionList<Prop> version_list2(*t0, 1);
|
||||
mvcc::VersionList<Prop> version_list1(*t0, 0, 0);
|
||||
mvcc::VersionList<Prop> version_list2(*t0, 1, 1);
|
||||
engine.Commit(*t0);
|
||||
|
||||
auto t1 = engine.Begin();
|
||||
@ -33,7 +33,7 @@ TEST(MVCC, UpdateDontDelete) {
|
||||
{
|
||||
tx::SingleNodeEngine engine;
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<DestrCountRec> version_list(*t1, 0, count);
|
||||
mvcc::VersionList<DestrCountRec> version_list(*t1, 0, 0, count);
|
||||
engine.Commit(*t1);
|
||||
|
||||
auto t2 = engine.Begin();
|
||||
@ -57,7 +57,7 @@ TEST(MVCC, UpdateDontDelete) {
|
||||
TEST(MVCC, Oldest) {
|
||||
tx::SingleNodeEngine engine;
|
||||
auto t1 = engine.Begin();
|
||||
mvcc::VersionList<Prop> version_list(*t1, 0);
|
||||
mvcc::VersionList<Prop> version_list(*t1, 0, 0);
|
||||
auto first = version_list.Oldest();
|
||||
EXPECT_NE(first, nullptr);
|
||||
// TODO Gleich: no need to do 10 checks of the same thing
|
||||
|
@ -59,7 +59,7 @@ class Mvcc : public ::testing::Test {
|
||||
int version_list_size = 0;
|
||||
tx::SingleNodeEngine engine;
|
||||
tx::Transaction *t1 = engine.Begin();
|
||||
mvcc::VersionList<TestClass> version_list{*t1, 0, version_list_size};
|
||||
mvcc::VersionList<TestClass> version_list{*t1, 0, 0, version_list_size};
|
||||
TestClass *v1 = nullptr;
|
||||
tx::Transaction *t2 = nullptr;
|
||||
tx::TransactionId id0, id1, id2;
|
||||
|
@ -25,7 +25,7 @@ class MvccGcTest : public ::testing::Test {
|
||||
|
||||
protected:
|
||||
std::atomic<int> record_destruction_count{0};
|
||||
mvcc::VersionList<DestrCountRec> version_list{*t0, 0,
|
||||
mvcc::VersionList<DestrCountRec> version_list{*t0, 0, 0,
|
||||
record_destruction_count};
|
||||
std::vector<tx::Transaction *> transactions{t0};
|
||||
|
||||
@ -126,7 +126,7 @@ TEST(GarbageCollector, GcClean) {
|
||||
auto t1 = engine.Begin();
|
||||
std::atomic<int> record_destruction_count{0};
|
||||
auto vl =
|
||||
new mvcc::VersionList<DestrCountRec>(*t1, 0, record_destruction_count);
|
||||
new mvcc::VersionList<DestrCountRec>(*t1, 0, 0, record_destruction_count);
|
||||
auto access = collection.access();
|
||||
access.insert(0, vl);
|
||||
engine.Commit(*t1);
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
#include "gmock/gmock.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
@ -26,9 +25,6 @@ using query::test_common::ToList;
|
||||
using testing::ElementsAre;
|
||||
using testing::UnorderedElementsAre;
|
||||
|
||||
DECLARE_bool(generate_vertex_ids);
|
||||
DECLARE_bool(generate_edge_ids);
|
||||
|
||||
namespace {
|
||||
|
||||
struct NoContextExpressionEvaluator {
|
||||
@ -1477,44 +1473,19 @@ TEST(ExpressionEvaluator, FunctionIndexInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST(ExpressionEvaluator, FunctionIdNoGeneration) {
|
||||
NoContextExpressionEvaluator eval;
|
||||
EXPECT_THROW(EvaluateFunction("ID", {}, &eval.ctx), QueryRuntimeException);
|
||||
auto &dba = eval.dba;
|
||||
|
||||
auto va = dba.InsertVertex();
|
||||
auto ea = dba.InsertEdge(va, va, dba.EdgeType("edge"));
|
||||
// Unable to get id if they are not set
|
||||
EXPECT_THROW(EvaluateFunction("ID", {va}, &eval.ctx), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("ID", {ea}, &eval.ctx), QueryRuntimeException);
|
||||
}
|
||||
|
||||
TEST(ExpressionEvaluator, FunctionIdGenerateVertexIds) {
|
||||
FLAGS_generate_vertex_ids = true;
|
||||
TEST(ExpressionEvaluator, FunctionId) {
|
||||
NoContextExpressionEvaluator eval;
|
||||
auto &dba = eval.dba;
|
||||
auto va = dba.InsertVertex();
|
||||
auto ea = dba.InsertEdge(va, va, dba.EdgeType("edge"));
|
||||
EXPECT_EQ(EvaluateFunction("ID", {va}, &eval.ctx).Value<int64_t>(), 0);
|
||||
EXPECT_THROW(EvaluateFunction("ID", {ea}, &eval.ctx), QueryRuntimeException);
|
||||
|
||||
auto vb = dba.InsertVertex();
|
||||
EXPECT_EQ(EvaluateFunction("ID", {vb}, &eval.ctx).Value<int64_t>(), 1024);
|
||||
FLAGS_generate_vertex_ids = false;
|
||||
}
|
||||
|
||||
TEST(ExpressionEvaluator, FunctionIdGenerateEdgeIds) {
|
||||
FLAGS_generate_edge_ids = true;
|
||||
NoContextExpressionEvaluator eval;
|
||||
auto &dba = eval.dba;
|
||||
auto va = dba.InsertVertex();
|
||||
auto ea = dba.InsertEdge(va, va, dba.EdgeType("edge"));
|
||||
EXPECT_THROW(EvaluateFunction("ID", {va}, &eval.ctx), QueryRuntimeException);
|
||||
EXPECT_EQ(EvaluateFunction("ID", {va}, &eval.ctx).Value<int64_t>(), 0);
|
||||
EXPECT_EQ(EvaluateFunction("ID", {ea}, &eval.ctx).Value<int64_t>(), 0);
|
||||
|
||||
auto eb = dba.InsertEdge(va, va, dba.EdgeType("edge"));
|
||||
EXPECT_EQ(EvaluateFunction("ID", {eb}, &eval.ctx).Value<int64_t>(), 1024);
|
||||
FLAGS_generate_edge_ids = false;
|
||||
EXPECT_EQ(EvaluateFunction("ID", {vb}, &eval.ctx).Value<int64_t>(), 1024);
|
||||
EXPECT_THROW(EvaluateFunction("ID", {}, &eval.ctx), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("ID", {0}, &eval.ctx), QueryRuntimeException);
|
||||
EXPECT_THROW(EvaluateFunction("ID", {va, ea}, &eval.ctx),
|
||||
QueryRuntimeException);
|
||||
}
|
||||
|
||||
TEST(ExpressionEvaluator, FunctionWorkerIdException) {
|
||||
|
@ -10,7 +10,8 @@ TEST(StateDelta, CreateVertex) {
|
||||
auto gid0 = generator.Next();
|
||||
{
|
||||
database::GraphDbAccessor dba(db);
|
||||
auto delta = database::StateDelta::CreateVertex(dba.transaction_id(), gid0);
|
||||
auto delta =
|
||||
database::StateDelta::CreateVertex(dba.transaction_id(), gid0, 0);
|
||||
delta.Apply(dba);
|
||||
dba.Commit();
|
||||
}
|
||||
@ -18,6 +19,7 @@ TEST(StateDelta, CreateVertex) {
|
||||
database::GraphDbAccessor dba(db);
|
||||
auto vertex = dba.FindVertexOptional(gid0, false);
|
||||
EXPECT_TRUE(vertex);
|
||||
EXPECT_EQ(vertex->cypher_id(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,8 +60,9 @@ TEST(StateDelta, CreateEdge) {
|
||||
}
|
||||
{
|
||||
database::GraphDbAccessor dba(db);
|
||||
auto delta = database::StateDelta::CreateEdge(
|
||||
dba.transaction_id(), gid2, gid0, gid1, dba.EdgeType("edge"), "edge");
|
||||
auto delta =
|
||||
database::StateDelta::CreateEdge(dba.transaction_id(), gid2, 0, gid0,
|
||||
gid1, dba.EdgeType("edge"), "edge");
|
||||
delta.Apply(dba);
|
||||
dba.Commit();
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ class MemgraphNodeIdMap {
|
||||
return found_it->second;
|
||||
}
|
||||
|
||||
int64_t Insert(const NodeId &node_id) {
|
||||
uint64_t Insert(const NodeId &node_id) {
|
||||
auto gid = generator_.Next();
|
||||
node_id_to_mg_[node_id] = gid;
|
||||
return gid;
|
||||
@ -250,7 +250,7 @@ void WriteNodeRow(
|
||||
}
|
||||
}
|
||||
CHECK(id) << "Node ID must be specified";
|
||||
partial_vertices[*id] = {*id, labels, properties, {}};
|
||||
partial_vertices[*id] = {*id, static_cast<int>(*id), labels, properties, {}};
|
||||
}
|
||||
|
||||
auto PassNodes(std::unordered_map<gid::Gid, durability::DecodedSnapshotVertex>
|
||||
@ -379,12 +379,12 @@ void Convert(const std::vector<std::string> &nodes,
|
||||
}
|
||||
for (auto edge : edges) {
|
||||
auto encoded = edge.second;
|
||||
auto edge_address = storage::EdgeAddress(encoded.id, 0);
|
||||
vertices[encoded.from].out.push_back(
|
||||
{storage::EdgeAddress(encoded.id, 0),
|
||||
storage::VertexAddress(encoded.to, 0), encoded.type});
|
||||
{edge_address, storage::VertexAddress(encoded.to, 0), encoded.type});
|
||||
vertices[encoded.to].in.push_back(
|
||||
{storage::EdgeAddress(encoded.id, 0),
|
||||
storage::VertexAddress(encoded.from, 0), encoded.type});
|
||||
{edge_address, storage::VertexAddress(encoded.from, 0),
|
||||
encoded.type});
|
||||
}
|
||||
for (auto vertex_pair : vertices) {
|
||||
auto &vertex = vertex_pair.second;
|
||||
@ -403,6 +403,8 @@ void Convert(const std::vector<std::string> &nodes,
|
||||
encoder.WriteList(transformed);
|
||||
encoder.WriteMap(vertex.properties);
|
||||
|
||||
encoder.WriteInt(vertex.cypher_id);
|
||||
|
||||
encoder.WriteInt(vertex.in.size());
|
||||
for (auto edge : vertex.in) {
|
||||
encoder.WriteInt(edge.address.raw());
|
||||
@ -429,6 +431,9 @@ void Convert(const std::vector<std::string> &nodes,
|
||||
encoder.WriteInt(edge.to);
|
||||
encoder.WriteString(edge.type);
|
||||
encoder.WriteMap(edge.properties);
|
||||
|
||||
// cypher_id
|
||||
encoder.WriteInt(edge.id);
|
||||
}
|
||||
|
||||
buffer.WriteValue(node_count);
|
||||
|
Loading…
Reference in New Issue
Block a user