Prepare record accessor for distributed
Summary: What's done: - `RecordAccessor` can represent remote data - `GraphDbAccessor` manages remote data - Cleanup: different `EdgeAccessor lazyness (@dgleich: take a look), unused methods, documentation... - `TODO` placeholders for remote implementation What's not done: - RPC and data transfer - how exactly remote errors are handled - not sure if any MVCC Record info for remote data should be tracked - WAL and RPC Deltas properly handled (Gleich working on extracting `Wal::Op`) This implementation should not break single-node execution, and should provide good abstractions and placeholders for distributed. Once that's satisfied, it should land. Reviewers: dgleich, buda, mislav.bradac Reviewed By: dgleich Subscribers: dgleich, pullbot Differential Revision: https://phabricator.memgraph.io/D1030
This commit is contained in:
parent
03db948d7e
commit
f5c0455af4
@ -70,7 +70,7 @@ VertexAccessor GraphDbAccessor::InsertVertex(
|
||||
CHECK(success) << "Attempting to insert a vertex with an existing ID: " << id;
|
||||
db_.wal_.Emplace(database::StateDelta::CreateVertex(transaction_->id_,
|
||||
vertex_vlist->gid_));
|
||||
return VertexAccessor(*vertex_vlist, *this);
|
||||
return VertexAccessor(vertex_vlist, *this);
|
||||
}
|
||||
|
||||
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
|
||||
@ -78,8 +78,8 @@ std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
|
||||
auto collection_accessor = db_.vertices_.access();
|
||||
auto found = collection_accessor.find(gid);
|
||||
if (found == collection_accessor.end()) return std::experimental::nullopt;
|
||||
VertexAccessor record_accessor(*found->second, *this);
|
||||
if (!Visible(record_accessor, current_state))
|
||||
VertexAccessor record_accessor(found->second, *this);
|
||||
if (!record_accessor.Visible(transaction(), current_state))
|
||||
return std::experimental::nullopt;
|
||||
return record_accessor;
|
||||
}
|
||||
@ -89,8 +89,8 @@ std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
|
||||
auto collection_accessor = db_.edges_.access();
|
||||
auto found = collection_accessor.find(gid);
|
||||
if (found == collection_accessor.end()) return std::experimental::nullopt;
|
||||
EdgeAccessor record_accessor(*found->second, *this);
|
||||
if (!Visible(record_accessor, current_state))
|
||||
EdgeAccessor record_accessor(found->second, *this);
|
||||
if (!record_accessor.Visible(transaction(), current_state))
|
||||
return std::experimental::nullopt;
|
||||
return record_accessor;
|
||||
}
|
||||
@ -140,7 +140,7 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
|
||||
// CreateIndex.
|
||||
GraphDbAccessor dba(db_);
|
||||
for (auto vertex : dba.Vertices(label, false)) {
|
||||
db_.label_property_index_.UpdateOnLabelProperty(vertex.vlist_,
|
||||
db_.label_property_index_.UpdateOnLabelProperty(vertex.address().local(),
|
||||
vertex.current_);
|
||||
}
|
||||
// Commit transaction as we finished applying method on newest visible
|
||||
@ -162,17 +162,19 @@ void GraphDbAccessor::UpdateLabelIndices(const GraphDbTypes::Label &label,
|
||||
const VertexAccessor &vertex_accessor,
|
||||
const Vertex *const vertex) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
db_.labels_index_.Update(label, vertex_accessor.vlist_, vertex);
|
||||
db_.label_property_index_.UpdateOnLabel(label, vertex_accessor.vlist_,
|
||||
vertex);
|
||||
DCHECK(vertex_accessor.is_local()) << "Only local vertices belong in indexes";
|
||||
auto *vlist_ptr = vertex_accessor.address().local();
|
||||
db_.labels_index_.Update(label, vlist_ptr, vertex);
|
||||
db_.label_property_index_.UpdateOnLabel(label, vlist_ptr, vertex);
|
||||
}
|
||||
|
||||
void GraphDbAccessor::UpdatePropertyIndex(
|
||||
const GraphDbTypes::Property &property,
|
||||
const RecordAccessor<Vertex> &record_accessor, const Vertex *const vertex) {
|
||||
const RecordAccessor<Vertex> &vertex_accessor, const Vertex *const vertex) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
db_.label_property_index_.UpdateOnProperty(property, record_accessor.vlist_,
|
||||
vertex);
|
||||
DCHECK(vertex_accessor.is_local()) << "Only local vertices belong in indexes";
|
||||
db_.label_property_index_.UpdateOnProperty(
|
||||
property, vertex_accessor.address().local(), vertex);
|
||||
}
|
||||
|
||||
int64_t GraphDbAccessor::VerticesCount() const {
|
||||
@ -245,23 +247,39 @@ int64_t GraphDbAccessor::VerticesCount(
|
||||
|
||||
bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
|
||||
if (!vertex_accessor.is_local()) {
|
||||
LOG(ERROR) << "Remote vertex deletion not implemented";
|
||||
// TODO support distributed
|
||||
// call remote RemoveVertex(gid), return it's result. The result can be
|
||||
// (true, false), or an error can occur (serialization, timeout). In case
|
||||
// of error the remote worker will be asking for a transaction abort,
|
||||
// not sure what to do here.
|
||||
return false;
|
||||
}
|
||||
vertex_accessor.SwitchNew();
|
||||
// it's possible the vertex was removed already in this transaction
|
||||
// due to it getting matched multiple times by some patterns
|
||||
// we can only delete it once, so check if it's already deleted
|
||||
if (vertex_accessor.current_->is_expired_by(*transaction_)) return true;
|
||||
if (vertex_accessor.current().is_expired_by(*transaction_)) return true;
|
||||
if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0)
|
||||
return false;
|
||||
|
||||
db_.wal_.Emplace(database::StateDelta::RemoveVertex(
|
||||
transaction_->id_, vertex_accessor.vlist_->gid_));
|
||||
|
||||
vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_);
|
||||
auto *vlist_ptr = vertex_accessor.address().local();
|
||||
db_.wal_.Emplace(
|
||||
database::StateDelta::RemoveVertex(transaction_->id_, vlist_ptr->gid_));
|
||||
vlist_ptr->remove(vertex_accessor.current_, *transaction_);
|
||||
return true;
|
||||
}
|
||||
|
||||
void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
if (!vertex_accessor.is_local()) {
|
||||
LOG(ERROR) << "Remote vertex deletion not implemented";
|
||||
// TODO support distributed
|
||||
// call remote DetachRemoveVertex(gid). It can either succeed or an error
|
||||
// can occur. See discussion in the RemoveVertex method above.
|
||||
}
|
||||
vertex_accessor.SwitchNew();
|
||||
for (auto edge_accessor : vertex_accessor.in())
|
||||
RemoveEdge(edge_accessor, true, false);
|
||||
@ -273,15 +291,23 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
|
||||
// it's possible the vertex was removed already in this transaction
|
||||
// due to it getting matched multiple times by some patterns
|
||||
// we can only delete it once, so check if it's already deleted
|
||||
if (!vertex_accessor.current_->is_expired_by(*transaction_))
|
||||
vertex_accessor.vlist_->remove(vertex_accessor.current_, *transaction_);
|
||||
if (!vertex_accessor.current().is_expired_by(*transaction_))
|
||||
vertex_accessor.address().local()->remove(vertex_accessor.current_,
|
||||
*transaction_);
|
||||
}
|
||||
|
||||
EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid> gid) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
|
||||
// An edge is created on the worker of it's "from" vertex.
|
||||
if (!from.is_local()) {
|
||||
LOG(ERROR) << "Remote edge insertion not implemented.";
|
||||
// TODO call remote InsertEdge(...)->gid. Possible outcomes are successful
|
||||
// creation or an error (serialization, timeout). If successful, create an
|
||||
// EdgeAccessor and return it. The remote InsertEdge(...) will be calling
|
||||
// remote Connect(...) if "to" is not local to it.
|
||||
}
|
||||
std::experimental::optional<uint64_t> next_id;
|
||||
if (gid) {
|
||||
CHECK(static_cast<int>(gid::WorkerId(*gid)) == db_.worker_id_)
|
||||
@ -290,8 +316,8 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
}
|
||||
|
||||
auto id = db_.edge_generator_.Next(next_id);
|
||||
auto edge_vlist = new mvcc::VersionList<Edge>(*transaction_, id, from.vlist_,
|
||||
to.vlist_, edge_type);
|
||||
auto edge_vlist = new mvcc::VersionList<Edge>(
|
||||
*transaction_, id, from.address(), to.address(), edge_type);
|
||||
// We need to insert edge_vlist to edges_ before calling update since update
|
||||
// can throw and edge_vlist will not be garbage collected if it is not in
|
||||
// edges_ skiplist.
|
||||
@ -300,17 +326,25 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
|
||||
// ensure that the "from" accessor has the latest version
|
||||
from.SwitchNew();
|
||||
from.update().out_.emplace(to.vlist_, edge_vlist, edge_type);
|
||||
// ensure that the "to" accessor has the latest version
|
||||
// WARNING: must do that after the above "from.update()" for cases when
|
||||
// we are creating a cycle and "from" and "to" are the same vlist
|
||||
to.SwitchNew();
|
||||
to.update().in_.emplace(from.vlist_, edge_vlist, edge_type);
|
||||
from.update().out_.emplace(to.address(), edge_vlist, edge_type);
|
||||
|
||||
// It is possible that the "to" accessor is remote.
|
||||
if (to.is_local()) {
|
||||
// ensure that the "to" accessor has the latest version (Switch new)
|
||||
// WARNING: must do that after the above "from.update()" for cases when
|
||||
// we are creating a cycle and "from" and "to" are the same vlist
|
||||
to.SwitchNew();
|
||||
to.update().in_.emplace(from.address(), edge_vlist, edge_type);
|
||||
} else {
|
||||
LOG(ERROR) << "Connecting to a remote vertex not implemented.";
|
||||
// TODO call remote Connect(from_gid, edge_gid, to_gid, edge_type). Possible
|
||||
// outcomes are success or error (serialization, timeout).
|
||||
}
|
||||
db_.wal_.Emplace(database::StateDelta::CreateEdge(
|
||||
transaction_->id_, edge_vlist->gid_, from.vlist_->gid_, to.vlist_->gid_,
|
||||
transaction_->id_, edge_vlist->gid_, from.gid(), to.gid(),
|
||||
EdgeTypeName(edge_type)));
|
||||
return EdgeAccessor(*edge_vlist, *this, from.vlist_, to.vlist_, edge_type);
|
||||
return EdgeAccessor(edge_vlist, *this, from.address(), to.address(),
|
||||
edge_type);
|
||||
}
|
||||
|
||||
int64_t GraphDbAccessor::EdgesCount() const {
|
||||
@ -321,17 +355,23 @@ int64_t GraphDbAccessor::EdgesCount() const {
|
||||
void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor,
|
||||
bool remove_from_from, bool remove_from_to) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
if (!edge_accessor.is_local()) {
|
||||
LOG(ERROR) << "Remote edge deletion not implemented";
|
||||
// TODO support distributed
|
||||
// call remote RemoveEdge(gid, true, true). It can either succeed or an
|
||||
// error can occur. See discussion in the RemoveVertex method above.
|
||||
}
|
||||
// it's possible the edge was removed already in this transaction
|
||||
// due to it getting matched multiple times by some patterns
|
||||
// we can only delete it once, so check if it's already deleted
|
||||
edge_accessor.SwitchNew();
|
||||
if (edge_accessor.current().is_expired_by(*transaction_)) return;
|
||||
if (remove_from_from)
|
||||
edge_accessor.from().update().out_.RemoveEdge(edge_accessor.vlist_);
|
||||
edge_accessor.from().update().out_.RemoveEdge(edge_accessor.address());
|
||||
if (remove_from_to)
|
||||
edge_accessor.to().update().in_.RemoveEdge(edge_accessor.vlist_);
|
||||
edge_accessor.vlist_->remove(edge_accessor.current_, *transaction_);
|
||||
|
||||
edge_accessor.to().update().in_.RemoveEdge(edge_accessor.address());
|
||||
edge_accessor.address().local()->remove(edge_accessor.current_,
|
||||
*transaction_);
|
||||
db_.wal_.Emplace(
|
||||
database::StateDelta::RemoveEdge(transaction_->id_, edge_accessor.gid()));
|
||||
}
|
||||
@ -388,13 +428,21 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
|
||||
for (GraphDbTypes::Label label : db_.labels_index_.Keys()) {
|
||||
info.emplace_back(":" + LabelName(label));
|
||||
}
|
||||
|
||||
// Edge indices are not shown because they are never used.
|
||||
|
||||
for (LabelPropertyIndex::Key key : db_.label_property_index_.Keys()) {
|
||||
info.emplace_back(fmt::format(":{}({})", LabelName(key.label_),
|
||||
PropertyName(key.property_)));
|
||||
}
|
||||
|
||||
return info;
|
||||
}
|
||||
auto &GraphDbAccessor::remote_vertices() { return remote_vertices_; }
|
||||
auto &GraphDbAccessor::remote_edges() { return remote_edges_; }
|
||||
|
||||
template <>
|
||||
GraphDbAccessor::RemoteCache<Vertex> &GraphDbAccessor::remote_elements() {
|
||||
return remote_vertices();
|
||||
}
|
||||
|
||||
template <>
|
||||
GraphDbAccessor::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
|
||||
return remote_edges();
|
||||
}
|
||||
|
@ -1,12 +1,7 @@
|
||||
//
|
||||
// Copyright 2017 Memgraph
|
||||
// Created by Florijan Stamenkovic on 03.02.17.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <experimental/optional>
|
||||
#include <random>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "cppitertools/filter.hpp"
|
||||
#include "cppitertools/imap.hpp"
|
||||
@ -39,6 +34,60 @@ class GraphDbAccessor {
|
||||
friend class VertexAccessor;
|
||||
friend class EdgeAccessor;
|
||||
|
||||
/**
|
||||
* Used for caching Vertices and Edges that are stored on another worker in a
|
||||
* distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
|
||||
* pairs. It is possible that either "old" or "new" are nullptrs, but at
|
||||
* least one must be not-null. The RemoteCache is the owner of TRecord
|
||||
* objects it points to.
|
||||
*
|
||||
* @tparam TRecord - Edge or Vertex
|
||||
*/
|
||||
template <typename TRecord>
|
||||
class RemoteCache {
|
||||
public:
|
||||
~RemoteCache() {
|
||||
for (const auto &pair : cache_) {
|
||||
delete pair.second.first;
|
||||
delete pair.second.second;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the "new" Vertex/Edge for the given gid.
|
||||
*
|
||||
* @param gid - global ID.
|
||||
* @param init_if_necessary - If "new" is not initialized and this flag is
|
||||
* set, then "new" is initialized with a copy of "old" before returning.
|
||||
*/
|
||||
TRecord *FindNew(gid::Gid gid, bool init_if_necessary) {
|
||||
auto found = cache_.find(gid);
|
||||
DCHECK(found != cache_.end()) << "Uninitialized remote Vertex/Edge";
|
||||
auto &pair = found->second;
|
||||
if (!pair.second && init_if_necessary) {
|
||||
pair.second = pair.first->CloneData();
|
||||
}
|
||||
return pair.second;
|
||||
}
|
||||
|
||||
/**
|
||||
* For the Vertex/Edge with the given global ID, looks for the data visible
|
||||
* from the given transaction's ID and command ID, and caches it. Sets the
|
||||
* given pointers to point to the fetched data. Analogue to
|
||||
* mvcc::VersionList::find_set_old_new.
|
||||
*/
|
||||
void FindSetOldNew(const tx::Transaction &, gid::Gid, TRecord *&,
|
||||
TRecord *&) {
|
||||
LOG(ERROR) << "Remote data storage not yet implemented";
|
||||
// TODO fetch data for (gid, t.id_, t.cmd_id()) from remote worker.
|
||||
// Set that data in the cache.
|
||||
// Set the pointers to the new data.
|
||||
}
|
||||
|
||||
private:
|
||||
std::unordered_map<gid::Gid, std::pair<TRecord *, TRecord *>> cache_;
|
||||
};
|
||||
|
||||
public:
|
||||
/**
|
||||
* Creates an accessor for the given database.
|
||||
@ -120,14 +169,14 @@ class GraphDbAccessor {
|
||||
// wrap version lists into accessors, which will look for visible versions
|
||||
auto accessors = iter::imap(
|
||||
[this](auto id_vlist) {
|
||||
return VertexAccessor(*id_vlist.second, *this);
|
||||
return VertexAccessor(id_vlist.second, *this);
|
||||
},
|
||||
db_.vertices_.access());
|
||||
|
||||
// filter out the accessors not visible to the current transaction
|
||||
return iter::filter(
|
||||
[this, current_state](const VertexAccessor &accessor) {
|
||||
return Visible(accessor, current_state);
|
||||
return accessor.Visible(transaction(), current_state);
|
||||
},
|
||||
std::move(accessors));
|
||||
}
|
||||
@ -145,7 +194,7 @@ class GraphDbAccessor {
|
||||
auto Vertices(const GraphDbTypes::Label &label, bool current_state) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
db_.labels_index_.GetVlists(label, *transaction_, current_state));
|
||||
}
|
||||
|
||||
@ -168,7 +217,7 @@ class GraphDbAccessor {
|
||||
LabelPropertyIndex::Key(label, property)))
|
||||
<< "Label+property index doesn't exist.";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
db_.label_property_index_.GetVlists(
|
||||
LabelPropertyIndex::Key(label, property), *transaction_,
|
||||
current_state));
|
||||
@ -197,7 +246,7 @@ class GraphDbAccessor {
|
||||
CHECK(value.type() != PropertyValue::Type::Null)
|
||||
<< "Can't query index for propery value type null.";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
db_.label_property_index_.GetVlists(
|
||||
LabelPropertyIndex::Key(label, property), value, *transaction_,
|
||||
current_state));
|
||||
@ -240,7 +289,7 @@ class GraphDbAccessor {
|
||||
LabelPropertyIndex::Key(label, property)))
|
||||
<< "Label+property index doesn't exist.";
|
||||
return iter::imap(
|
||||
[this](auto vlist) { return VertexAccessor(*vlist, *this); },
|
||||
[this](auto vlist) { return VertexAccessor(vlist, *this); },
|
||||
db_.label_property_index_.GetVlists(
|
||||
LabelPropertyIndex::Key(label, property), lower, upper,
|
||||
*transaction_, current_state));
|
||||
@ -310,13 +359,13 @@ class GraphDbAccessor {
|
||||
|
||||
// wrap version lists into accessors, which will look for visible versions
|
||||
auto accessors = iter::imap(
|
||||
[this](auto id_vlist) { return EdgeAccessor(*id_vlist.second, *this); },
|
||||
[this](auto id_vlist) { return EdgeAccessor(id_vlist.second, *this); },
|
||||
db_.edges_.access());
|
||||
|
||||
// filter out the accessors not visible to the current transaction
|
||||
return iter::filter(
|
||||
[this, current_state](const EdgeAccessor &accessor) {
|
||||
return Visible(accessor, current_state);
|
||||
return accessor.Visible(transaction(), current_state);
|
||||
},
|
||||
std::move(accessors));
|
||||
}
|
||||
@ -343,7 +392,7 @@ class GraphDbAccessor {
|
||||
if (accessor.db_accessor_ == this)
|
||||
return std::experimental::make_optional(accessor);
|
||||
|
||||
TAccessor accessor_in_this(*accessor.vlist_, *this);
|
||||
TAccessor accessor_in_this(accessor.address(), *this);
|
||||
if (accessor_in_this.current_)
|
||||
return std::experimental::make_optional(std::move(accessor_in_this));
|
||||
else
|
||||
@ -526,11 +575,16 @@ class GraphDbAccessor {
|
||||
*/
|
||||
void CounterSet(const std::string &name, int64_t value);
|
||||
|
||||
/*
|
||||
* Returns a list of index names present in the database.
|
||||
*/
|
||||
/* Returns a list of index names present in the database. */
|
||||
std::vector<std::string> IndexInfo() const;
|
||||
|
||||
auto &remote_vertices();
|
||||
auto &remote_edges();
|
||||
|
||||
/** Gets remote_vertices or remote_edges, depending on type param. */
|
||||
template <typename TRecord>
|
||||
RemoteCache<TRecord> &remote_elements();
|
||||
|
||||
private:
|
||||
/**
|
||||
* Insert this vertex into corresponding label and label+property (if it
|
||||
@ -548,27 +602,15 @@ class GraphDbAccessor {
|
||||
* Insert this vertex into corresponding any label + 'property' index.
|
||||
* @param property - vertex will be inserted into indexes which contain this
|
||||
* property
|
||||
* @param record_accessor - record_accessor to insert
|
||||
* @param vertex_accessor - vertex accessor to insert
|
||||
* @param vertex - vertex to insert
|
||||
*/
|
||||
void UpdatePropertyIndex(const GraphDbTypes::Property &property,
|
||||
const RecordAccessor<Vertex> &record_accessor,
|
||||
const RecordAccessor<Vertex> &vertex_accessor,
|
||||
const Vertex *const vertex);
|
||||
|
||||
/** Returns true if the given accessor (made with this GraphDbAccessor) is
|
||||
* visible given the `current_state` flag. */
|
||||
template <typename TRecord>
|
||||
bool Visible(const RecordAccessor<TRecord> &accessor,
|
||||
bool current_state) const {
|
||||
return (accessor.old_ &&
|
||||
!(current_state && accessor.old_->is_expired_by(*transaction_))) ||
|
||||
(current_state && accessor.new_ &&
|
||||
!accessor.new_->is_expired_by(*transaction_));
|
||||
}
|
||||
|
||||
/** Casts the DB's engine to MasterEngine and returns it. If the DB's engine
|
||||
* is
|
||||
* RemoteEngine, this function will crash MG. */
|
||||
* is RemoteEngine, this function will crash MG. */
|
||||
tx::MasterEngine &MasterEngine() {
|
||||
auto *local_engine = dynamic_cast<tx::MasterEngine *>(db_.tx_engine_.get());
|
||||
DCHECK(local_engine) << "Asked for MasterEngine on distributed worker";
|
||||
@ -582,4 +624,7 @@ class GraphDbAccessor {
|
||||
|
||||
bool commited_{false};
|
||||
bool aborted_{false};
|
||||
|
||||
RemoteCache<Vertex> remote_vertices_;
|
||||
RemoteCache<Edge> remote_edges_;
|
||||
};
|
||||
|
@ -1572,7 +1572,7 @@ void ReconstructTypedValue(TypedValue &value) {
|
||||
throw QueryRuntimeException(vertex_error_msg);
|
||||
break;
|
||||
case TypedValue::Type::Edge:
|
||||
if (!value.Value<VertexAccessor>().Reconstruct())
|
||||
if (!value.Value<EdgeAccessor>().Reconstruct())
|
||||
throw QueryRuntimeException(edge_error_msg);
|
||||
break;
|
||||
case TypedValue::Type::List:
|
||||
|
@ -7,19 +7,19 @@
|
||||
GraphDbTypes::EdgeType EdgeAccessor::EdgeType() const { return edge_type_; }
|
||||
|
||||
VertexAccessor EdgeAccessor::from() const {
|
||||
return VertexAccessor(*from_.local(), db_accessor());
|
||||
return VertexAccessor(from_, db_accessor());
|
||||
}
|
||||
|
||||
bool EdgeAccessor::from_is(const VertexAccessor &v) const {
|
||||
return v == from_.local();
|
||||
return v.address() == from_;
|
||||
}
|
||||
|
||||
VertexAccessor EdgeAccessor::to() const {
|
||||
return VertexAccessor(*to_.local(), db_accessor());
|
||||
return VertexAccessor(to_, db_accessor());
|
||||
}
|
||||
|
||||
bool EdgeAccessor::to_is(const VertexAccessor &v) const {
|
||||
return v == to_.local();
|
||||
return v.address() == to_;
|
||||
}
|
||||
|
||||
bool EdgeAccessor::is_cycle() const { return to_ == from_; }
|
||||
@ -34,19 +34,3 @@ std::ostream &operator<<(std::ostream &os, const EdgeAccessor &ea) {
|
||||
});
|
||||
return os << "}]";
|
||||
}
|
||||
|
||||
const Edge &EdgeAccessor::current() {
|
||||
if (current_ == nullptr) RecordAccessor::Reconstruct();
|
||||
return *current_;
|
||||
}
|
||||
|
||||
const PropertyValueStore<GraphDbTypes::Property> &EdgeAccessor::Properties()
|
||||
const {
|
||||
if (current_ == nullptr) RecordAccessor::Reconstruct();
|
||||
return RecordAccessor::Properties();
|
||||
}
|
||||
|
||||
const PropertyValue &EdgeAccessor::PropsAt(GraphDbTypes::Property key) const {
|
||||
if (current_ == nullptr) RecordAccessor::Reconstruct();
|
||||
return RecordAccessor::PropsAt(key);
|
||||
}
|
||||
|
@ -8,21 +8,25 @@
|
||||
class VertexAccessor;
|
||||
|
||||
/**
|
||||
* Provides ways for the client programmer (i.e. code generated
|
||||
* by the compiler) to interact with an Edge.
|
||||
* Provides ways for the client programmer (i.e. code generated by the compiler)
|
||||
* to interact with an Edge.
|
||||
*
|
||||
* This class indirectly inherits MVCC data structures and
|
||||
* takes care of MVCC versioning.
|
||||
* Note that EdgeAccessors do not necessary read versioned (MVCC) data. This is
|
||||
* possible because edge endpoints (from and to), as well as the edge type, are
|
||||
* all immutable. These are the most often used aspects of an edge, and are
|
||||
* stored also in the vertex endpoints of the edge. Using them when creating an
|
||||
* EdgeAccessor means that data does not have to be read from a random memory
|
||||
* location, which is often a performance bottleneck in traversals.
|
||||
*/
|
||||
class EdgeAccessor : public RecordAccessor<Edge> {
|
||||
using VertexAddress = storage::Address<mvcc::VersionList<Vertex>>;
|
||||
using EdgeAddress = storage::Address<mvcc::VersionList<Edge>>;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Create a new EdgeAccessor and reads data from edge mvcc
|
||||
*/
|
||||
EdgeAccessor(mvcc::VersionList<Edge> &edge, GraphDbAccessor &db_accessor)
|
||||
: RecordAccessor(edge, db_accessor),
|
||||
/** Constructor that reads data from the random memory location (lower
|
||||
* performance, see class docs). */
|
||||
EdgeAccessor(EdgeAddress address, GraphDbAccessor &db_accessor)
|
||||
: RecordAccessor(address, db_accessor),
|
||||
from_(nullptr),
|
||||
to_(nullptr),
|
||||
edge_type_() {
|
||||
@ -34,39 +38,29 @@ class EdgeAccessor : public RecordAccessor<Edge> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new EdgeAccessor without invoking mvcc methods
|
||||
*/
|
||||
EdgeAccessor(mvcc::VersionList<Edge> &edge, GraphDbAccessor &db_accessor,
|
||||
/** Constructor that does NOT data from the random memory location (better
|
||||
* performance, see class docs). */
|
||||
EdgeAccessor(EdgeAddress address, GraphDbAccessor &db_accessor,
|
||||
VertexAddress from, VertexAddress to,
|
||||
GraphDbTypes::EdgeType edge_type)
|
||||
: RecordAccessor(edge, db_accessor),
|
||||
: RecordAccessor(address, db_accessor),
|
||||
from_(from),
|
||||
to_(to),
|
||||
edge_type_(edge_type) {}
|
||||
|
||||
/**
|
||||
* Returns the edge type.
|
||||
* @return
|
||||
*/
|
||||
GraphDbTypes::EdgeType EdgeType() const;
|
||||
|
||||
/**
|
||||
* Returns an accessor to the originating Vertex of this edge.
|
||||
* @return
|
||||
*/
|
||||
/** Returns an accessor to the originating Vertex of this edge. */
|
||||
VertexAccessor from() const;
|
||||
|
||||
/** Checks if the given vertex is the source of this edge, without
|
||||
* creating an additional accessor to perform the check. */
|
||||
bool from_is(const VertexAccessor &v) const;
|
||||
|
||||
/**
|
||||
* Returns an accessor to the destination Vertex of this edge.
|
||||
*/
|
||||
/** Returns an accessor to the destination Vertex of this edge. */
|
||||
VertexAccessor to() const;
|
||||
|
||||
/** Checks ig the given vertex is the destination of this edge, without
|
||||
/** Checks if the given vertex is the destination of this edge, without
|
||||
* creating an additional accessor to perform the check. */
|
||||
bool to_is(const VertexAccessor &v) const;
|
||||
|
||||
@ -74,19 +68,6 @@ class EdgeAccessor : public RecordAccessor<Edge> {
|
||||
* the same. */
|
||||
bool is_cycle() const;
|
||||
|
||||
/** Returns current edge
|
||||
*/
|
||||
const Edge ¤t();
|
||||
|
||||
/** Returns edge properties
|
||||
*/
|
||||
const PropertyValueStore<GraphDbTypes::Property> &Properties() const;
|
||||
|
||||
/* Returns property at key.
|
||||
* @param key - Property key
|
||||
*/
|
||||
const PropertyValue &PropsAt(GraphDbTypes::Property key) const;
|
||||
|
||||
private:
|
||||
VertexAddress from_;
|
||||
VertexAddress to_;
|
||||
|
@ -5,10 +5,12 @@
|
||||
#include "storage/record_accessor.hpp"
|
||||
#include "storage/vertex.hpp"
|
||||
|
||||
using database::StateDelta;
|
||||
|
||||
template <typename TRecord>
|
||||
RecordAccessor<TRecord>::RecordAccessor(mvcc::VersionList<TRecord> &vlist,
|
||||
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
|
||||
GraphDbAccessor &db_accessor)
|
||||
: vlist_(&vlist), db_accessor_(&db_accessor) {}
|
||||
: db_accessor_(&db_accessor), address_(address) {}
|
||||
|
||||
template <typename TRecord>
|
||||
const PropertyValue &RecordAccessor<TRecord>::PropsAt(
|
||||
@ -22,9 +24,12 @@ void RecordAccessor<Vertex>::PropsSet(GraphDbTypes::Property key,
|
||||
Vertex &vertex = update();
|
||||
vertex.properties_.set(key, value);
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().Emplace(database::StateDelta::PropsSetVertex(
|
||||
dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), value));
|
||||
db_accessor().UpdatePropertyIndex(key, *this, &vertex);
|
||||
// TODO use the delta for handling.
|
||||
dba.wal().Emplace(StateDelta::PropsSetVertex(dba.transaction_id(), gid(),
|
||||
dba.PropertyName(key), value));
|
||||
if (is_local()) {
|
||||
db_accessor().UpdatePropertyIndex(key, *this, &vertex);
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
@ -32,36 +37,38 @@ void RecordAccessor<Edge>::PropsSet(GraphDbTypes::Property key,
|
||||
PropertyValue value) {
|
||||
update().properties_.set(key, value);
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().Emplace(database::StateDelta::PropsSetEdge(
|
||||
dba.transaction_id(), vlist_->gid_, dba.PropertyName(key), value));
|
||||
// TODO use the delta for handling.
|
||||
dba.wal().Emplace(StateDelta::PropsSetEdge(dba.transaction_id(), gid(),
|
||||
dba.PropertyName(key), value));
|
||||
}
|
||||
|
||||
template <>
|
||||
size_t RecordAccessor<Vertex>::PropsErase(GraphDbTypes::Property key) {
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().Emplace(database::StateDelta::PropsSetVertex(
|
||||
dba.transaction_id(), vlist_->gid_, dba.PropertyName(key),
|
||||
PropertyValue::Null));
|
||||
// TODO use the delta for handling.
|
||||
dba.wal().Emplace(StateDelta::PropsSetVertex(
|
||||
dba.transaction_id(), gid(), dba.PropertyName(key), PropertyValue::Null));
|
||||
return update().properties_.erase(key);
|
||||
}
|
||||
|
||||
template <>
|
||||
size_t RecordAccessor<Edge>::PropsErase(GraphDbTypes::Property key) {
|
||||
auto &dba = db_accessor();
|
||||
dba.wal().Emplace(database::StateDelta::PropsSetEdge(
|
||||
dba.transaction_id(), vlist_->gid_, dba.PropertyName(key),
|
||||
PropertyValue::Null));
|
||||
// TODO use the delta for handling.
|
||||
dba.wal().Emplace(StateDelta::PropsSetEdge(
|
||||
dba.transaction_id(), gid(), dba.PropertyName(key), PropertyValue::Null));
|
||||
return update().properties_.erase(key);
|
||||
}
|
||||
|
||||
template <>
|
||||
void RecordAccessor<Vertex>::PropsClear() {
|
||||
auto &updated = update();
|
||||
// TODO use the delta for handling.
|
||||
auto &dba = db_accessor();
|
||||
for (const auto &kv : updated.properties_)
|
||||
dba.wal().Emplace(database::StateDelta::PropsSetVertex(
|
||||
dba.transaction_id(), vlist_->gid_, dba.PropertyName(kv.first),
|
||||
PropertyValue::Null));
|
||||
dba.wal().Emplace(StateDelta::PropsSetVertex(dba.transaction_id(), gid(),
|
||||
dba.PropertyName(kv.first),
|
||||
PropertyValue::Null));
|
||||
updated.properties_.clear();
|
||||
}
|
||||
|
||||
@ -69,10 +76,11 @@ template <>
|
||||
void RecordAccessor<Edge>::PropsClear() {
|
||||
auto &updated = update();
|
||||
auto &dba = db_accessor();
|
||||
// TODO use the delta for handling.
|
||||
for (const auto &kv : updated.properties_)
|
||||
dba.wal().Emplace(database::StateDelta::PropsSetEdge(
|
||||
dba.transaction_id(), vlist_->gid_, dba.PropertyName(kv.first),
|
||||
PropertyValue::Null));
|
||||
dba.wal().Emplace(StateDelta::PropsSetEdge(dba.transaction_id(), gid(),
|
||||
dba.PropertyName(kv.first),
|
||||
PropertyValue::Null));
|
||||
updated.properties_.clear();
|
||||
}
|
||||
|
||||
@ -82,21 +90,48 @@ const PropertyValueStore<GraphDbTypes::Property>
|
||||
return current().properties_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
bool RecordAccessor<TRecord>::operator==(const RecordAccessor &other) const {
|
||||
DCHECK(db_accessor_ == other.db_accessor_) << "Not in the same transaction.";
|
||||
return address_ == other.address_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
GraphDbAccessor &RecordAccessor<TRecord>::db_accessor() const {
|
||||
return *db_accessor_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
gid::Gid RecordAccessor<TRecord>::gid() const {
|
||||
return is_local() ? address_.local()->gid_ : address_.global_id();
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
storage::Address<mvcc::VersionList<TRecord>> RecordAccessor<TRecord>::address()
|
||||
const {
|
||||
return address_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
|
||||
if (!new_) {
|
||||
// if new_ is not set yet, look for it
|
||||
// we can just Reconstruct the pointers, old_ will get initialized
|
||||
// to the same value as it has now, and the amount of work is the
|
||||
// same as just looking for a new_ record
|
||||
if (!Reconstruct())
|
||||
DLOG(FATAL)
|
||||
<< "RecordAccessor::SwitchNew - accessor invalid after Reconstruct";
|
||||
if (is_local()) {
|
||||
if (!new_) {
|
||||
// if new_ is not set yet, look for it
|
||||
// we can just Reconstruct the pointers, old_ will get initialized
|
||||
// to the same value as it has now, and the amount of work is the
|
||||
// same as just looking for a new_ record
|
||||
if (!Reconstruct())
|
||||
DLOG(FATAL)
|
||||
<< "RecordAccessor::SwitchNew - accessor invalid after Reconstruct";
|
||||
}
|
||||
} else {
|
||||
// TODO If we have distributed execution, here it's necessary to load the
|
||||
// data from the it's home worker. When only storage is distributed, it's
|
||||
// enough just to switch to the new record if we have it.
|
||||
if (!new_) {
|
||||
new_ = db_accessor().template remote_elements<TRecord>().FindNew(
|
||||
address_.global_id(), false);
|
||||
}
|
||||
}
|
||||
current_ = new_ ? new_ : old_;
|
||||
return *this;
|
||||
@ -110,26 +145,25 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() {
|
||||
|
||||
template <typename TRecord>
|
||||
bool RecordAccessor<TRecord>::Reconstruct() const {
|
||||
vlist_->find_set_old_new(db_accessor_->transaction(), old_, new_);
|
||||
if (is_local()) {
|
||||
address_.local()->find_set_old_new(db_accessor_->transaction(), old_, new_);
|
||||
} else {
|
||||
db_accessor().template remote_elements<TRecord>().FindSetOldNew(
|
||||
db_accessor().transaction(), address_.global_id(), old_, new_);
|
||||
}
|
||||
current_ = old_ ? old_ : new_;
|
||||
return old_ != nullptr || new_ != nullptr;
|
||||
// We should never use a record accessor that does not have either old_ or
|
||||
// new_ (both are null), but we can't assert that here because we construct
|
||||
// such an accessor and filter it out in GraphDbAccessor::[Vertices|Edges].
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
TRecord &RecordAccessor<TRecord>::update() const {
|
||||
// If the current is not set we probably created the accessor with a lazy
|
||||
// constructor which didn't call Reconstruct on creation
|
||||
if (!current_) {
|
||||
// Edges have lazily initialize mutable, versioned data (properties).
|
||||
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
|
||||
bool reconstructed = Reconstruct();
|
||||
DCHECK(reconstructed) << "Unable to initialize record";
|
||||
}
|
||||
|
||||
auto &t = db_accessor_->transaction();
|
||||
// can't update a deleted record if:
|
||||
// - we only have old_ and it hasn't been deleted
|
||||
// - we have new_ and it hasn't been deleted
|
||||
if (!new_) {
|
||||
DCHECK(!old_->is_expired_by(t))
|
||||
<< "Can't update a record deleted in the current transaction+commad";
|
||||
@ -138,16 +172,37 @@ TRecord &RecordAccessor<TRecord>::update() const {
|
||||
<< "Can't update a record deleted in the current transaction+command";
|
||||
}
|
||||
|
||||
if (!new_) new_ = vlist_->update(t);
|
||||
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
|
||||
if (new_) return *new_;
|
||||
|
||||
if (is_local()) {
|
||||
new_ = address_.local()->update(t);
|
||||
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
|
||||
} else {
|
||||
new_ = db_accessor().template remote_elements<TRecord>().FindNew(
|
||||
address_.global_id(), true);
|
||||
}
|
||||
return *new_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
const TRecord &RecordAccessor<TRecord>::current() const {
|
||||
// Edges have lazily initialize mutable, versioned data (properties).
|
||||
if (std::is_same<TRecord, Edge>::value && current_ == nullptr)
|
||||
RecordAccessor::Reconstruct();
|
||||
DCHECK(current_ != nullptr) << "RecordAccessor.current_ pointer is nullptr";
|
||||
return *current_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
void RecordAccessor<TRecord>::ProcessDelta(const GraphStateDelta &) const {
|
||||
LOG(ERROR) << "Delta processing not yet implemented";
|
||||
if (is_local()) {
|
||||
// TODO write delta to WAL
|
||||
} else {
|
||||
// TODO use the delta to perform a remote update.
|
||||
// TODO check for results (success, serialization_error, ...)
|
||||
}
|
||||
}
|
||||
|
||||
template class RecordAccessor<Vertex>;
|
||||
template class RecordAccessor<Edge>;
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include "database/graph_db_datatypes.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
#include "storage/address.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/property_value.hpp"
|
||||
#include "storage/property_value_store.hpp"
|
||||
@ -11,6 +12,20 @@
|
||||
|
||||
class GraphDbAccessor;
|
||||
|
||||
/// Mock class for a DB delta.
|
||||
// TODO replace with the real thing.
|
||||
class GraphStateDelta {
|
||||
public:
|
||||
/// Indicates what the result of applying the delta to the remote worker
|
||||
/// (owner of the Vertex/Edge the delta affects).
|
||||
enum class RemoteResult {
|
||||
SUCCES,
|
||||
SERIALIZATION_ERROR,
|
||||
LOCK_TIMEOUT_ERROR
|
||||
// TODO: network error?
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* An accessor to a database record (an Edge or a Vertex).
|
||||
*
|
||||
@ -22,7 +37,7 @@ class GraphDbAccessor;
|
||||
*/
|
||||
template <typename TRecord>
|
||||
class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
public:
|
||||
using AddressT = storage::Address<mvcc::VersionList<TRecord>>;
|
||||
/**
|
||||
* The GraphDbAccessor is friend to this accessor so it can
|
||||
* operate on it's data (mvcc version-list and the record itself).
|
||||
@ -33,12 +48,13 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
*/
|
||||
friend GraphDbAccessor;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @param vlist MVCC record that this accessor wraps.
|
||||
* @param address Address (local or global) of the Vertex/Edge of this
|
||||
* accessor.
|
||||
* @param db_accessor The DB accessor that "owns" this record accessor.
|
||||
*/
|
||||
RecordAccessor(mvcc::VersionList<TRecord> &vlist,
|
||||
GraphDbAccessor &db_accessor);
|
||||
RecordAccessor(AddressT address, GraphDbAccessor &db_accessor);
|
||||
|
||||
// this class is default copyable, movable and assignable
|
||||
RecordAccessor(const RecordAccessor &other) = default;
|
||||
@ -46,85 +62,39 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
RecordAccessor &operator=(const RecordAccessor &other) = default;
|
||||
RecordAccessor &operator=(RecordAccessor &&other) = default;
|
||||
|
||||
/**
|
||||
* Gets the property for the given key.
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
/** Gets the property for the given key. */
|
||||
const PropertyValue &PropsAt(GraphDbTypes::Property key) const;
|
||||
|
||||
/**
|
||||
* Sets a value on the record for the given property, operates on edge.
|
||||
*
|
||||
* @param key Property key.
|
||||
* @param value The value to set.
|
||||
*/
|
||||
/** Sets a value on the record for the given property. */
|
||||
void PropsSet(GraphDbTypes::Property key, PropertyValue value);
|
||||
|
||||
/**
|
||||
* Erases the property for the given key.
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
/** Erases the property for the given key. */
|
||||
size_t PropsErase(GraphDbTypes::Property key);
|
||||
|
||||
/**
|
||||
* Removes all the properties from this record.
|
||||
*/
|
||||
/** Removes all the properties from this record. */
|
||||
void PropsClear();
|
||||
|
||||
/**
|
||||
* Returns the properties of this record.
|
||||
* @return
|
||||
*/
|
||||
/** Returns the properties of this record. */
|
||||
const PropertyValueStore<GraphDbTypes::Property> &Properties() const;
|
||||
|
||||
void PropertiesAccept(std::function<void(const GraphDbTypes::Property key,
|
||||
const PropertyValue &prop)>
|
||||
handler,
|
||||
std::function<void()> finish = {}) const;
|
||||
bool operator==(const RecordAccessor &other) const;
|
||||
|
||||
/**
|
||||
* This should be used with care as it's comparing vlist_ pointer records and
|
||||
* not actual values inside RecordAccessors.
|
||||
*/
|
||||
bool operator<(const RecordAccessor &other) const {
|
||||
DCHECK(db_accessor_ == other.db_accessor_)
|
||||
<< "Not in the same transaction.";
|
||||
return vlist_ < other.vlist_;
|
||||
}
|
||||
|
||||
bool operator==(const RecordAccessor &other) const {
|
||||
DCHECK(db_accessor_ == other.db_accessor_)
|
||||
<< "Not in the same transaction.";
|
||||
return vlist_ == other.vlist_;
|
||||
}
|
||||
|
||||
/** Enables equality check against a version list pointer. This makes it
|
||||
* possible to check if an accessor and a vlist ptr represent the same graph
|
||||
* element without creating an accessor (not very cheap). */
|
||||
bool operator==(const mvcc::VersionList<TRecord> *other_vlist) const {
|
||||
return vlist_ == other_vlist;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a GraphDB accessor of this record accessor.
|
||||
*
|
||||
* @return See above.
|
||||
*/
|
||||
/** Returns a GraphDB accessor of this record accessor. */
|
||||
GraphDbAccessor &db_accessor() const;
|
||||
|
||||
/** Returns a database-unique index of this vertex or edge. Note that vertices
|
||||
* and edges have separate GID domains, there can be a vertex with GID X and
|
||||
* an edge with the same gid.
|
||||
/**
|
||||
* Returns a globally-unique ID of this vertex or edge. Note that vertices
|
||||
* and edges have separate ID domains, there can be a vertex with ID X and an
|
||||
* edge with the same id.
|
||||
*/
|
||||
gid::Gid gid() const { return vlist_->gid_; }
|
||||
gid::Gid gid() const;
|
||||
|
||||
AddressT address() const;
|
||||
|
||||
/*
|
||||
* Switches this record accessor to use the latest
|
||||
* version visible to the current transaction+command.
|
||||
* Possibly the one that was created by this transaction+command.
|
||||
* Switches this record accessor to use the latest version visible to the
|
||||
* current transaction+command. Possibly the one that was created by this
|
||||
* transaction+command.
|
||||
*
|
||||
* @return A reference to this.
|
||||
*/
|
||||
@ -135,38 +105,35 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
* the current transaction+command. If that is not possible (vertex/edge was
|
||||
* created by the current transaction/command), it does nothing (current
|
||||
* remains pointing to the new version).
|
||||
*
|
||||
* @return A reference to this.
|
||||
*/
|
||||
RecordAccessor<TRecord> &SwitchOld();
|
||||
|
||||
/**
|
||||
Reconstructs the internal state of the record accessor so it uses the
|
||||
versions appropriate to this transaction+command.
|
||||
* Reconstructs the internal state of the record accessor so it uses the
|
||||
* versions appropriate to this transaction+command.
|
||||
*
|
||||
@return True if this accessor is valid after reconstruction. This means that
|
||||
at least one record pointer was found (either new_ or old_), possibly both.
|
||||
* @return True if this accessor is valid after reconstruction. This means
|
||||
* that at least one record pointer was found (either new_ or old_), possibly
|
||||
* both.
|
||||
*/
|
||||
bool Reconstruct() const;
|
||||
|
||||
/**
|
||||
* Returns true if the given accessor is visible to the given transaction.
|
||||
*
|
||||
* @param current_state If true then the graph state for the
|
||||
* current transaction+command is returned (insertions, updates and
|
||||
* deletions performed in the current transaction+command are not
|
||||
* ignored).
|
||||
*/
|
||||
bool Visible(const tx::Transaction &t, bool current_state) const {
|
||||
return (old_ && !(current_state && old_->is_expired_by(t))) ||
|
||||
(current_state && new_ && !new_->is_expired_by(t));
|
||||
}
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Ensures there is an updateable version of the record in the version_list,
|
||||
* and that the `new_` pointer points to it. Returns a reference to that
|
||||
* version.
|
||||
*
|
||||
* It is not legal to call this function on a Vertex/Edge that has been
|
||||
* deleted in the current transaction+command.
|
||||
*/
|
||||
TRecord &update() const;
|
||||
|
||||
/**
|
||||
* Returns the current version (either new_ or old_)
|
||||
* set on this RecordAccessor.
|
||||
*
|
||||
* @return See above.
|
||||
*/
|
||||
const TRecord ¤t() const;
|
||||
|
||||
/**
|
||||
* Pointer to the version (either old_ or new_) that READ operations
|
||||
* in the accessor should take data from. Note that WRITE operations
|
||||
@ -177,9 +144,33 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
*/
|
||||
mutable TRecord *current_{nullptr};
|
||||
|
||||
// The record (edge or vertex) this accessor provides access to.
|
||||
// Immutable, set in the constructor and never changed.
|
||||
mvcc::VersionList<TRecord> *vlist_;
|
||||
/**
|
||||
* Ensures there is an updateable version of the record in the version_list,
|
||||
* and that the `new_` pointer points to it. Returns a reference to that
|
||||
* version.
|
||||
*
|
||||
* It is not legal to call this function on a Vertex/Edge that has been
|
||||
* deleted in the current transaction+command.
|
||||
*/
|
||||
TRecord &update() const;
|
||||
|
||||
/** Returns the current version (either new_ or old_) set on this
|
||||
* RecordAccessor. */
|
||||
const TRecord ¤t() const;
|
||||
|
||||
/** Indicates if this accessor represents a local Vertex/Edge, or one whose
|
||||
* owner is some other worker in a distributed system. */
|
||||
bool is_local() const { return address_.is_local(); }
|
||||
|
||||
/**
|
||||
* Processes the delta that's a consequence of changes in this accessor. If
|
||||
* the accessor is local that means writing the delta to the write-ahead log.
|
||||
* If it's remote, then the delta needs to be sent to it's owner for
|
||||
* processing.
|
||||
*
|
||||
* @param delta The delta to process.
|
||||
*/
|
||||
void ProcessDelta(const GraphStateDelta &delta) const;
|
||||
|
||||
private:
|
||||
// The database accessor for which this record accessor is created
|
||||
@ -187,6 +178,8 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
// Immutable, set in the constructor and never changed.
|
||||
GraphDbAccessor *db_accessor_;
|
||||
|
||||
AddressT address_;
|
||||
|
||||
/**
|
||||
* Latest version which is visible to the current transaction+command
|
||||
* but has not been created nor modified by the current transaction+command.
|
||||
|
@ -19,6 +19,7 @@ bool VertexAccessor::add_label(GraphDbTypes::Label label) {
|
||||
vertex.labels_.emplace_back(label);
|
||||
auto &dba = db_accessor();
|
||||
dba.UpdateLabelIndices(label, *this, &vertex);
|
||||
// TODO support distributed.
|
||||
dba.wal().Emplace(database::StateDelta::AddLabel(dba.transaction_id(), gid(),
|
||||
dba.LabelName(label)));
|
||||
return true;
|
||||
@ -32,6 +33,7 @@ size_t VertexAccessor::remove_label(GraphDbTypes::Label label) {
|
||||
std::swap(*found, labels.back());
|
||||
labels.pop_back();
|
||||
auto &dba = db_accessor();
|
||||
// TODO support distributed.
|
||||
dba.wal().Emplace(database::StateDelta::RemoveLabel(
|
||||
dba.transaction_id(), gid(), dba.LabelName(label)));
|
||||
return 1;
|
||||
@ -52,10 +54,9 @@ std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) {
|
||||
stream << va.db_accessor().LabelName(label);
|
||||
});
|
||||
os << " {";
|
||||
utils::PrintIterable(os, va.Properties(), ", ",
|
||||
[&](auto &stream, const auto &pair) {
|
||||
stream << va.db_accessor().PropertyName(pair.first)
|
||||
<< ": " << pair.second;
|
||||
});
|
||||
utils::PrintIterable(os, va.Properties(), ", ", [&](auto &stream,
|
||||
const auto &pair) {
|
||||
stream << va.db_accessor().PropertyName(pair.first) << ": " << pair.second;
|
||||
});
|
||||
return os << "})";
|
||||
}
|
||||
|
@ -35,13 +35,11 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
GraphDbAccessor &db_accessor) {
|
||||
return iter::imap(
|
||||
[from, vertex, &db_accessor](auto &edges_element) {
|
||||
// Currently only local storage is supported.
|
||||
if (from) {
|
||||
return EdgeAccessor(*edges_element.edge.local(), db_accessor,
|
||||
vertex, edges_element.vertex,
|
||||
edges_element.edge_type);
|
||||
return EdgeAccessor(edges_element.edge.local(), db_accessor, vertex,
|
||||
edges_element.vertex, edges_element.edge_type);
|
||||
} else {
|
||||
return EdgeAccessor(*edges_element.edge.local(), db_accessor,
|
||||
return EdgeAccessor(edges_element.edge.local(), db_accessor,
|
||||
edges_element.vertex, vertex,
|
||||
edges_element.edge_type);
|
||||
}
|
||||
@ -51,58 +49,36 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
}
|
||||
|
||||
public:
|
||||
VertexAccessor(mvcc::VersionList<Vertex> &vertex,
|
||||
GraphDbAccessor &db_accessor)
|
||||
: RecordAccessor(vertex, db_accessor) {
|
||||
VertexAccessor(VertexAddress address, GraphDbAccessor &db_accessor)
|
||||
: RecordAccessor(address, db_accessor) {
|
||||
RecordAccessor::Reconstruct();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of outgoing edges.
|
||||
* @return
|
||||
*/
|
||||
/** Returns the number of outgoing edges. */
|
||||
size_t out_degree() const;
|
||||
|
||||
/**
|
||||
* Returns the number of incoming edges.
|
||||
* @return
|
||||
*/
|
||||
/** Returns the number of incoming edges. */
|
||||
size_t in_degree() const;
|
||||
|
||||
/**
|
||||
* Adds a label to the Vertex. If the Vertex already
|
||||
* has that label the call has no effect.
|
||||
* @param label A label.
|
||||
* @return If or not a new Label was set on this Vertex.
|
||||
*/
|
||||
/** Adds a label to the Vertex. If the Vertex already has that label the call
|
||||
* has no effect. */
|
||||
// TODO revise return value, is it necessary?
|
||||
bool add_label(GraphDbTypes::Label label);
|
||||
|
||||
/**
|
||||
* Removes a label from the Vertex.
|
||||
* @param label The label to remove.
|
||||
* @return The number of removed labels (can be 0 or 1).
|
||||
*/
|
||||
/** Removes a label from the Vertex. Return number of removed (0, 1). */
|
||||
// TODO reves return value, is it necessary?
|
||||
size_t remove_label(GraphDbTypes::Label label);
|
||||
|
||||
/**
|
||||
* Indicates if the Vertex has the given label.
|
||||
* @param label A label.
|
||||
* @return
|
||||
*/
|
||||
/** Indicates if the Vertex has the given label. */
|
||||
bool has_label(GraphDbTypes::Label label) const;
|
||||
|
||||
/**
|
||||
* Returns all the Labels of the Vertex.
|
||||
* @return
|
||||
*/
|
||||
/** Returns all the Labels of the Vertex. */
|
||||
const std::vector<GraphDbTypes::Label> &labels() const;
|
||||
|
||||
/**
|
||||
* Returns EdgeAccessors for all incoming edges.
|
||||
*/
|
||||
/** Returns EdgeAccessors for all incoming edges. */
|
||||
auto in() const {
|
||||
return MakeAccessorIterator(current().in_.begin(), current().in_.end(),
|
||||
false, vlist_, db_accessor());
|
||||
false, address(), db_accessor());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -115,8 +91,8 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
auto in(
|
||||
const VertexAccessor &dest,
|
||||
const std::vector<GraphDbTypes::EdgeType> *edge_types = nullptr) const {
|
||||
return MakeAccessorIterator(current().in_.begin(dest.vlist_, edge_types),
|
||||
current().in_.end(), false, vlist_,
|
||||
return MakeAccessorIterator(current().in_.begin(dest.address(), edge_types),
|
||||
current().in_.end(), false, address(),
|
||||
db_accessor());
|
||||
}
|
||||
|
||||
@ -128,16 +104,14 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
*/
|
||||
auto in(const std::vector<GraphDbTypes::EdgeType> *edge_types) const {
|
||||
return MakeAccessorIterator(current().in_.begin(nullptr, edge_types),
|
||||
current().in_.end(), false, vlist_,
|
||||
current().in_.end(), false, address(),
|
||||
db_accessor());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns EdgeAccessors for all outgoing edges.
|
||||
*/
|
||||
/** Returns EdgeAccessors for all outgoing edges. */
|
||||
auto out() const {
|
||||
return MakeAccessorIterator(current().out_.begin(), current().out_.end(),
|
||||
true, vlist_, db_accessor());
|
||||
true, address(), db_accessor());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -151,9 +125,9 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
auto out(
|
||||
const VertexAccessor &dest,
|
||||
const std::vector<GraphDbTypes::EdgeType> *edge_types = nullptr) const {
|
||||
return MakeAccessorIterator(current().out_.begin(dest.vlist_, edge_types),
|
||||
current().out_.end(), true, vlist_,
|
||||
db_accessor());
|
||||
return MakeAccessorIterator(
|
||||
current().out_.begin(dest.address(), edge_types), current().out_.end(),
|
||||
true, address(), db_accessor());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -164,7 +138,7 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
||||
*/
|
||||
auto out(const std::vector<GraphDbTypes::EdgeType> *edge_types) const {
|
||||
return MakeAccessorIterator(current().out_.begin(nullptr, edge_types),
|
||||
current().out_.end(), true, vlist_,
|
||||
current().out_.end(), true, address(),
|
||||
db_accessor());
|
||||
}
|
||||
};
|
||||
|
@ -1,4 +1,3 @@
|
||||
#include <map>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
@ -17,9 +16,8 @@
|
||||
* It is possible to run test with custom seed with:
|
||||
* RC_PARAMS="seed=1" ./random_graph
|
||||
*/
|
||||
RC_GTEST_PROP(RandomGraph, RandomGraph,
|
||||
(std::vector<std::string> vertex_labels,
|
||||
std::vector<std::string> edge_types)) {
|
||||
RC_GTEST_PROP(RandomGraph, RandomGraph, (std::vector<std::string> vertex_labels,
|
||||
std::vector<std::string> edge_types)) {
|
||||
RC_PRE(!vertex_labels.empty());
|
||||
RC_PRE(!edge_types.empty());
|
||||
|
||||
@ -28,8 +26,8 @@ RC_GTEST_PROP(RandomGraph, RandomGraph,
|
||||
|
||||
GraphDb db;
|
||||
std::vector<VertexAccessor> vertices;
|
||||
std::map<VertexAccessor, std::string> vertex_label_map;
|
||||
std::map<EdgeAccessor, std::string> edge_type_map;
|
||||
std::unordered_map<VertexAccessor, std::string> vertex_label_map;
|
||||
std::unordered_map<EdgeAccessor, std::string> edge_type_map;
|
||||
|
||||
GraphDbAccessor dba(db);
|
||||
|
||||
|
@ -58,24 +58,6 @@ TEST(RecordAccessor, RecordEquality) {
|
||||
EXPECT_NE(e1, e2);
|
||||
}
|
||||
|
||||
TEST(RecordAccessor, RecordLessThan) {
|
||||
GraphDb db;
|
||||
GraphDbAccessor dba(db);
|
||||
|
||||
auto v1 = dba.InsertVertex();
|
||||
auto v2 = dba.InsertVertex();
|
||||
EXPECT_NE(v1, v2);
|
||||
EXPECT_TRUE(v1 < v2 || v2 < v1);
|
||||
EXPECT_FALSE(v1 < v1);
|
||||
EXPECT_FALSE(v2 < v2);
|
||||
auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("type"));
|
||||
auto e2 = dba.InsertEdge(v1, v2, dba.EdgeType("type"));
|
||||
EXPECT_NE(e1, e2);
|
||||
EXPECT_TRUE(e1 < e2 || e2 < e1);
|
||||
EXPECT_FALSE(e1 < e1);
|
||||
EXPECT_FALSE(e2 < e2);
|
||||
}
|
||||
|
||||
TEST(RecordAccessor, SwitchOldAndSwitchNewMemberFunctionTest) {
|
||||
GraphDb db;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user