Switch from Cache to LruCache

Summary: This diff contains the refactoring and various fixes needed to switch from Cache to LruCache.

Reviewers: msantl, ipaljak

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1889
Vinko Kasljevic 2019-03-12 15:26:43 +01:00
parent d3e00635c6
commit 1c768287e8
23 changed files with 656 additions and 358 deletions
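For context, the essence of the change: the per-transaction caches of remote
vertices/edges move from the unbounded utils::Cache to the size-bounded
utils::LruCache, and cached records are now held through std::shared_ptr so an
accessor can pin an entry across evictions. Below is a minimal sketch, under
the assumption that the real utils/cache.hpp differs in detail, of the
Find/Insert/Clear interface the new call sites rely on:

// Hypothetical sketch of an LRU cache exposing the interface used below.
#include <cstddef>
#include <list>
#include <unordered_map>
#include <utility>
#include <experimental/optional>

namespace utils {
template <typename TKey, typename TValue>
class LruCache {
 public:
  explicit LruCache(size_t capacity) : capacity_(capacity) {}

  // Returns the cached value (if any) and marks it as most recently used.
  std::experimental::optional<TValue> Find(const TKey &key) {
    auto it = map_.find(key);
    if (it == map_.end()) return std::experimental::nullopt;
    order_.splice(order_.begin(), order_, it->second);
    return it->second->second;
  }

  // Inserts a value, evicting the least recently used entry when full.
  void Insert(const TKey &key, const TValue &value) {
    auto it = map_.find(key);
    if (it != map_.end()) {
      order_.splice(order_.begin(), order_, it->second);
      it->second->second = value;
      return;
    }
    if (order_.size() >= capacity_) {
      map_.erase(order_.back().first);
      order_.pop_back();
    }
    order_.emplace_front(key, value);
    map_[key] = order_.begin();
  }

  void Clear() {
    order_.clear();
    map_.clear();
  }

 private:
  size_t capacity_;
  std::list<std::pair<TKey, TValue>> order_;
  std::unordered_map<TKey,
                     typename std::list<std::pair<TKey, TValue>>::iterator>
      map_;
};
}  // namespace utils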

View File

@@ -77,6 +77,14 @@ DEFINE_VALIDATED_int32(recovering_cluster_size, 0,
DEFINE_bool(dynamic_graph_partitioner_enabled, false,
"If the dynamic graph partitioner should be enabled.");
DEFINE_VALIDATED_uint64(vertex_cache_size, 5000,
"Size of cache used for storing remote vertices",
FLAG_IN_RANGE(1, std::numeric_limits<uint64_t>::max()));
DEFINE_VALIDATED_uint64(edge_cache_size, 5000,
"Size of cache used for storing remote edges",
FLAG_IN_RANGE(1, std::numeric_limits<uint64_t>::max()));
database::Config::Config()
// Durability flags.
: durability_enabled{FLAGS_durability_enabled},
@@ -101,4 +109,6 @@ database::Config::Config()
static_cast<uint16_t>(FLAGS_master_port)},
worker_endpoint{FLAGS_worker_host,
static_cast<uint16_t>(FLAGS_worker_port)},
recovering_cluster_size{FLAGS_recovering_cluster_size} {}
recovering_cluster_size{FLAGS_recovering_cluster_size},
vertex_cache_size{FLAGS_vertex_cache_size},
edge_cache_size{FLAGS_edge_cache_size} {}

View File

@@ -297,7 +297,9 @@ class Master {
distributed::PullRpcClients pull_clients_{&coordination_, &data_manager_};
distributed::UpdatesRpcServer updates_server_{self_, &coordination_};
distributed::UpdatesRpcClients updates_clients_{&coordination_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::DataManager data_manager_{*self_, data_clients_,
config_.vertex_cache_size,
config_.edge_cache_size};
distributed::ClusterDiscoveryMaster cluster_discovery_{
&coordination_, config_.durability_directory};
distributed::TokenSharingRpcServer token_sharing_server_{
@@ -667,7 +669,9 @@ class Worker {
distributed::IndexRpcServer index_rpc_server_{self_, &coordination_};
distributed::UpdatesRpcServer updates_server_{self_, &coordination_};
distributed::UpdatesRpcClients updates_clients_{&coordination_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::DataManager data_manager_{*self_, data_clients_,
config_.vertex_cache_size,
config_.edge_cache_size};
distributed::DurabilityRpcWorker durability_rpc_{self_, &coordination_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
&coordination_};

View File

@@ -66,6 +66,11 @@ struct Config {
io::network::Endpoint master_endpoint{"0.0.0.0", 0};
io::network::Endpoint worker_endpoint{"0.0.0.0", 0};
int recovering_cluster_size{0};
// Sizes of the caches that hold remote data.
// Default values are the same as in config.cpp.
size_t vertex_cache_size{5000};
size_t edge_cache_size{5000};
};
class GraphDbAccessor;

View File

@@ -225,7 +225,7 @@ void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) {
if (vertex.PropsAt(key.property_).type() == PropertyValue::Type::Null)
continue;
db_.storage().label_property_index_.UpdateOnLabelProperty(
vertex.address().local(), vertex.current_);
vertex.address().local(), vertex.GetCurrent());
}
}
@@ -328,7 +328,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
// it's possible the vertex was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
if (vertex_accessor.current().is_expired_by(transaction_)) return true;
if (vertex_accessor.GetCurrent()->is_expired_by(transaction_)) return true;
if (check_empty &&
vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0)
return false;
@@ -336,13 +336,14 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
auto *vlist_ptr = vertex_accessor.address().local();
wal().Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
vlist_ptr->remove(vertex_accessor.GetCurrent(), transaction_);
return true;
} else {
auto address = vertex_accessor.address();
updates_clients().RemoveVertex(address.worker_id(), transaction_id(),
address.gid(), check_empty);
updates_clients().RemoveVertex(worker_id(), address.worker_id(),
transaction_id(), address.gid(),
check_empty);
// We can't know if we are going to be able to remove the vertex until
// deferred updates on a remote worker are executed
return true;
@@ -391,7 +392,8 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom(
auto edge_address = edge_accessor.address();
from->SwitchNew();
auto from_updated = &from->update();
from->update();
auto from_updated = from->GetNew();
// TODO when preparing WAL for distributed, most likely never use
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
@@ -406,19 +408,19 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom(
return edge_address;
} else {
auto created_edge_info = updates_clients().CreateEdge(
transaction_id(), *from, *to, edge_type, cypher_id);
worker_id(), transaction_id(), *from, *to, edge_type, cypher_id);
auto edge_address = created_edge_info.edge_address;
auto *from_updated =
data_manager().FindNew<Vertex>(transaction_id(), from->gid());
// Create an Edge and insert it into the Cache so we see it locally.
auto guard = storage::GetDataLock(*from);
from->update();
from->GetNew()->out_.emplace(
db().storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
data_manager().Emplace<Edge>(
transaction_id(), edge_address.gid(),
distributed::CachedRecordData<Edge>(
created_edge_info.cypher_id, nullptr,
std::make_unique<Edge>(from->address(), to->address(), edge_type)));
from_updated->out_.emplace(
db().storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
return edge_address;
}
}
@@ -431,7 +433,8 @@ void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
// WARNING: Must do that after the above "from->update()" for cases when
// we are creating a cycle and "from" and "to" are the same vlist.
to->SwitchNew();
auto *to_updated = &to->update();
to->update();
auto *to_updated = to->GetNew();
to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from->address()), edge_address,
edge_type);
@@ -441,12 +444,13 @@ void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
if (from->is_local() ||
from->address().worker_id() != to->address().worker_id()) {
updates_clients().AddInEdge(
transaction_id(), *from,
worker_id(), transaction_id(), *from,
db().storage().GlobalizedAddress(edge_address), *to, edge_type);
}
auto *to_updated =
data_manager().FindNew<Vertex>(transaction_id(), to->gid());
to_updated->in_.emplace(
auto guard = storage::GetDataLock(*to);
to->update();
to->GetNew()->in_.emplace(
db().storage().LocalizedAddressIfPossible(from->address()),
edge_address, edge_type);
}
@@ -487,11 +491,11 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
edge.SwitchNew();
if (edge.current().is_expired_by(transaction_)) return;
if (edge.GetCurrent()->is_expired_by(transaction_)) return;
if (remove_out_edge) edge.from().RemoveOutEdge(edge.address());
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address().local()->remove(edge.current_, transaction_);
edge.address().local()->remove(edge.GetCurrent(), transaction_);
wal().Emplace(
database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
} else {
@@ -500,13 +504,15 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
CHECK(edge_addr.worker_id() == from_addr.worker_id())
<< "Edge and its 'from' vertex not on the same worker";
auto to_addr = db().storage().GlobalizedAddress(edge.to_addr());
updates_clients().RemoveEdge(transaction_id(), edge_addr.worker_id(),
edge_addr.gid(), from_addr.gid(), to_addr);
updates_clients().RemoveEdge(worker_id(), edge_addr.worker_id(),
transaction_id(), edge_addr.gid(),
from_addr.gid(), to_addr);
// Another RPC is necessary only if the first did not handle vertices on
// both sides.
if (edge_addr.worker_id() != to_addr.worker_id()) {
updates_clients().RemoveInEdge(transaction_id(), to_addr.worker_id(),
to_addr.gid(), edge_addr);
updates_clients().RemoveInEdge(worker_id(), to_addr.worker_id(),
transaction_id(), to_addr.gid(),
edge_addr);
}
}
}

View File

@@ -0,0 +1,23 @@
/// @file
#pragma once
#include <memory>
namespace distributed {
/// A wrapper for cached vertex/edge from other machines in the distributed
/// system.
///
/// @tparam TRecord Vertex or Edge
template <typename TRecord>
struct CachedRecordData {
CachedRecordData(int64_t cypher_id, std::unique_ptr<TRecord> old_record,
std::unique_ptr<TRecord> new_record)
: cypher_id(cypher_id),
old_record(std::move(old_record)),
new_record(std::move(new_record)) {}
int64_t cypher_id;
std::unique_ptr<TRecord> old_record;
std::unique_ptr<TRecord> new_record;
};
} // namespace distributed
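Storing std::shared_ptr<CachedRecordData<TRecord>> (rather than the data
itself) is what makes a bounded cache safe here: an accessor that is still
using a record keeps it alive even after the LRU evicts the entry. An
illustrative sketch (Example and its parameters are made up, not part of this
commit):

// Illustrative only: shared ownership survives LRU eviction.
void Example(utils::LruCache<gid::Gid,
                 std::shared_ptr<distributed::CachedRecordData<Vertex>>> &cache,
             gid::Gid gid) {
  auto found = cache.Find(gid);
  if (!found) return;
  auto data = *found;  // bump the reference count
  // Even if later inserts evict `gid` from the cache, `data` still owns the
  // CachedRecordData, so these pointers stay valid.
  Vertex *old_record = data->old_record.get();
  Vertex *new_record = data->new_record.get();
  (void)old_record;
  (void)new_record;
}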

View File

@@ -8,7 +8,7 @@ template <typename TCache>
void ClearCache(TCache &cache, tx::TransactionId tx_id) {
auto access = cache.access();
auto found = access.find(tx_id);
if (found != access.end()) found->second.clear();
if (found != access.end()) found->second.Clear();
}
template <typename TCache>
@@ -35,9 +35,23 @@ DataManager::CacheT<Edge> &DataManager::caches<Edge>() {
return edges_caches_;
}
template <>
size_t DataManager::GetInitSize<Vertex>() const {
return vertex_cache_size_;
}
template <>
size_t DataManager::GetInitSize<Edge>() const {
return edge_cache_size_;
}
DataManager::DataManager(database::GraphDb &db,
distributed::DataRpcClients &data_clients)
: db_(db), data_clients_(data_clients) {}
distributed::DataRpcClients &data_clients,
size_t vertex_cache_size, size_t edge_cache_size)
: vertex_cache_size_(vertex_cache_size),
edge_cache_size_(edge_cache_size),
db_(db),
data_clients_(data_clients) {}
std::mutex &DataManager::GetLock(tx::TransactionId tx_id) {
auto accessor = lock_store_.access();
@@ -81,4 +95,3 @@ void DataManager::ClearTransactionalCache(tx::TransactionId oldest_active) {
}
} // namespace distributed

View File

@@ -4,6 +4,7 @@
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/cached_record_data.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "transactions/type.hpp"
#include "utils/cache.hpp"
@@ -12,115 +13,55 @@ class Vertex;
class Edge;
namespace distributed {
/// A wrapper for cached vertex/edge from other machines in the distributed
/// system.
///
/// @tparam TRecord Vertex or Edge
template <typename TRecord>
struct CachedRecordData {
CachedRecordData(int64_t cypher_id, std::unique_ptr<TRecord> old_record,
std::unique_ptr<TRecord> new_record)
: cypher_id(cypher_id),
old_record(std::move(old_record)),
new_record(std::move(new_record)) {}
int64_t cypher_id;
std::unique_ptr<TRecord> old_record;
std::unique_ptr<TRecord> new_record;
};
/// Handles remote data caches for edges and vertices, per transaction.
class DataManager {
template <typename TRecord>
using CacheG = utils::Cache<gid::Gid, CachedRecordData<TRecord>>;
using CacheG =
utils::LruCache<gid::Gid, std::shared_ptr<CachedRecordData<TRecord>>>;
template <typename TRecord>
using CacheT = ConcurrentMap<tx::TransactionId, CacheG<TRecord>>;
public:
DataManager(database::GraphDb &db, distributed::DataRpcClients &data_clients);
/// Returns the new data for the given ID. Creates it (as copy of old) if
/// necessary.
template <typename TRecord>
TRecord *FindNew(tx::TransactionId tx_id, gid::Gid gid) {
auto &cache = GetCache<TRecord>(tx_id);
std::lock_guard<std::mutex> guard(GetLock(tx_id));
auto found = cache.find(gid);
DCHECK(found != cache.end())
<< "FindNew is called on uninitialized remote Vertex/Edge";
auto &data = found->second;
if (!data.new_record) {
data.new_record = std::unique_ptr<TRecord>(data.old_record->CloneData());
}
return data.new_record.get();
}
/// For the Vertex/Edge with the given global ID, looks for the data visible
/// from the given transaction's ID and command ID, and caches it. Sets the
/// given pointers to point to the fetched data. Analogue to
/// mvcc::VersionList::find_set_old_new.
// TODO (vkasljevic) remove this and use Find instead
template <typename TRecord>
void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
TRecord **old_record, TRecord **new_record) {
auto &cache = GetCache<TRecord>(tx_id);
auto &lock = GetLock(tx_id);
{
std::lock_guard<std::mutex> guard(lock);
auto found = cache.find(gid);
if (found != cache.end()) {
*old_record = found->second.old_record.get();
*new_record = found->second.new_record.get();
return;
}
}
auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
if (remote.old_record_ptr) LocalizeAddresses(*remote.old_record_ptr);
if (remote.new_record_ptr) LocalizeAddresses(*remote.new_record_ptr);
// This logic is a bit strange because we need to make sure that someone
// else didn't get a response and updated the cache before we did and we
// need a lock for that, but we also need to check if we can now return
// that result - otherwise we could get inconsistent results for remote
// FindSetOldNew
std::lock_guard<std::mutex> guard(lock);
auto it_pair = cache.emplace(
std::move(gid), CachedRecordData<TRecord>(
remote.cypher_id, std::move(remote.old_record_ptr),
std::move(remote.new_record_ptr)));
*old_record = it_pair.first->second.old_record.get();
*new_record = it_pair.first->second.new_record.get();
}
DataManager(database::GraphDb &db, distributed::DataRpcClients &data_clients,
size_t vertex_cache_size, size_t edge_cache_size);
/// Finds the cached element for the given transaction, worker and gid.
///
/// @tparam TRecord Vertex or Edge
template <typename TRecord>
const CachedRecordData<TRecord> &Find(tx::TransactionId tx_id, int worker_id,
gid::Gid gid) {
std::shared_ptr<CachedRecordData<TRecord>> Find(tx::TransactionId tx_id,
int from_worker_id,
int worker_id, gid::Gid gid,
bool to_update = false) {
auto &cache = GetCache<TRecord>(tx_id);
std::unique_lock<std::mutex> guard(GetLock(tx_id));
auto found = cache.find(gid);
if (found != cache.end()) {
return found->second;
auto found = cache.Find(gid);
if (found) {
auto data = *found;
if (to_update && !data->new_record) {
data->new_record.reset(data->old_record->CloneData());
}
return data;
} else {
guard.unlock();
auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
auto remote = data_clients_.RemoteElement<TRecord>(from_worker_id,
worker_id, tx_id, gid);
if (remote.old_record_ptr) LocalizeAddresses(*remote.old_record_ptr);
if (remote.new_record_ptr) LocalizeAddresses(*remote.new_record_ptr);
if (to_update && !remote.new_record_ptr) {
remote.new_record_ptr.reset(remote.old_record_ptr->CloneData());
}
guard.lock();
return cache
.emplace(std::move(gid),
CachedRecordData<TRecord>(remote.cypher_id,
std::move(remote.old_record_ptr),
std::move(remote.new_record_ptr)))
.first->second;
auto data =
std::make_shared<CachedRecordData<TRecord>>(CachedRecordData<TRecord>{
remote.cypher_id, std::move(remote.old_record_ptr),
std::move(remote.new_record_ptr)});
cache.Insert(gid, data);
return data;
}
}
@@ -128,16 +69,17 @@ class DataManager {
template <typename TRecord>
void Emplace(tx::TransactionId tx_id, gid::Gid gid,
CachedRecordData<TRecord> data) {
if (data.old_record) LocalizeAddresses(*data.old_record);
if (data.new_record) LocalizeAddresses(*data.new_record);
std::lock_guard<std::mutex> guard(GetLock(tx_id));
// We can't replace existing data because some accessors might be using
// it.
// TODO - consider if it's necessary and OK to copy just the data content.
auto &cache = GetCache<TRecord>(tx_id);
auto found = cache.find(gid);
if (found == cache.end()) cache.emplace(std::move(gid), std::move(data));
auto found = cache.Find(gid);
if (!found) {
if (data.old_record) LocalizeAddresses(*data.old_record);
if (data.new_record) LocalizeAddresses(*data.new_record);
cache.Insert(gid, std::make_shared<CachedRecordData<TRecord>>(std::move(data)));
}
}
/// Removes all the caches for a single transaction.
@@ -151,13 +93,18 @@ class DataManager {
template <typename TRecord>
void LocalizeAddresses(TRecord &record);
template <typename TRecord>
size_t GetInitSize() const;
template <typename TRecord>
CacheG<TRecord> &GetCache(tx::TransactionId tx_id) {
auto accessor = caches<TRecord>().access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
return accessor.emplace(tx_id, std::make_tuple(tx_id), std::make_tuple())
return accessor
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(GetInitSize<TRecord>()))
.first->second;
}
@@ -166,6 +113,9 @@ class DataManager {
template <typename TRecord>
CacheT<TRecord> &caches();
size_t vertex_cache_size_;
size_t edge_cache_size_;
database::GraphDb &db_;
DataRpcClients &data_clients_;
ConcurrentMap<tx::TransactionId, std::mutex> lock_store_;
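Taken together: a caller fetches remote data through DataManager::Find,
passing its own worker id first (so the owning worker can replay this
worker's pending deltas when serving the record) and to_update = true when it
intends to modify the record, which guarantees new_record is populated
(cloned from old_record if needed). A sketch modeled on the call sites in
record_accessor.cpp below; `dba` and `address` are assumed to be a
GraphDbAccessor and a remote vertex address:

// Sketch, assuming `dba` is a database::GraphDbAccessor.
auto data = dba.data_manager().Find<Vertex>(
    dba.transaction_id(), dba.worker_id(), address.worker_id(), address.gid(),
    /* to_update = */ true);
// `data` is a std::shared_ptr<CachedRecordData<Vertex>>; because to_update
// was set, data->new_record is non-null and safe to mutate locally.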

View File

@@ -9,22 +9,24 @@
namespace distributed {
template <>
RemoteElementInfo<Edge> DataRpcClients::RemoteElement(int worker_id,
RemoteElementInfo<Edge> DataRpcClients::RemoteElement(int from_worker_id,
int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response = coordination_->GetClientPool(worker_id)->Call<EdgeRpc>(
TxGidPair{tx_id, gid});
TxGidPair{tx_id, gid, from_worker_id});
return RemoteElementInfo<Edge>(response.cypher_id,
std::move(response.edge_old_output),
std::move(response.edge_new_output));
}
template <>
RemoteElementInfo<Vertex> DataRpcClients::RemoteElement(int worker_id,
RemoteElementInfo<Vertex> DataRpcClients::RemoteElement(int from_worker_id,
int worker_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response = coordination_->GetClientPool(worker_id)->Call<VertexRpc>(
TxGidPair{tx_id, gid});
TxGidPair{tx_id, gid, from_worker_id});
return RemoteElementInfo<Vertex>(response.cypher_id,
std::move(response.vertex_old_output),
std::move(response.vertex_new_output));

View File

@@ -46,7 +46,7 @@ class DataRpcClients {
/// That worker must own the vertex/edge for the given id, and that vertex
/// must be visible in given transaction.
template <typename TRecord>
RemoteElementInfo<TRecord> RemoteElement(int worker_id,
RemoteElementInfo<TRecord> RemoteElement(int from_worker_id, int worker_id,
tx::TransactionId tx_id,
gid::Gid gid);

View File

@@ -22,7 +22,8 @@ cpp<#
(lcp:define-struct tx-gid-pair ()
((tx-id "tx::TransactionId" :capnp-type "UInt64")
(gid "gid::Gid" :capnp-type "UInt64"))
(gid "gid::Gid" :capnp-type "UInt64")
(from-worker-id :int64_t))
(:serialize (:slk) (:capnp)))
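For readers unfamiliar with LCP: the define-struct above should generate a
plain C++ struct along these lines (a sketch; the real generated code also
contains the SLK and Cap'n Proto serialization declared by :serialize):

struct TxGidPair {
  tx::TransactionId tx_id;
  gid::Gid gid;
  int64_t from_worker_id;
};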
(lcp:define-rpc vertex

View File

@@ -4,6 +4,7 @@
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "distributed/data_rpc_messages.hpp"
namespace distributed {
@@ -15,18 +16,32 @@ DataRpcServer::DataRpcServer(database::GraphDb *db,
[this](const auto &req_reader, auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto vertex = dba->FindVertexRaw(req_reader.getMember().getGid());
VertexRes response(vertex.CypherId(), vertex.GetOld(),
vertex.GetNew(), db_->WorkerId());
auto *old = vertex.GetOld();
auto *newr = vertex.GetNew() ? vertex.GetNew()->CloneData() : nullptr;
db_->updates_server().ApplyDeltasToRecord(
dba->transaction().id_, req_reader.getMember().getGid(),
req_reader.getMember().getFromWorkerId(), &old, &newr);
VertexRes response(vertex.CypherId(), old, newr, db_->WorkerId());
Save(response, res_builder);
delete newr;
});
coordination->Register<EdgeRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto edge = dba->FindEdgeRaw(req_reader.getMember().getGid());
EdgeRes response(edge.CypherId(), edge.GetOld(),
edge.GetNew(), db_->WorkerId());
auto *old = edge.GetOld();
auto *newr = edge.GetNew() ? edge.GetNew()->CloneData() : nullptr;
db_->updates_server().ApplyDeltasToRecord(
dba->transaction().id_, req_reader.getMember().getGid(),
req_reader.getMember().getFromWorkerId(), &old, &newr);
EdgeRes response(edge.CypherId(), old, newr, db_->WorkerId());
Save(response, res_builder);
delete newr;
});
coordination->Register<VertexCountRpc>(

View File

@@ -25,9 +25,11 @@ void RaiseIfRemoteError(UpdateResult result) {
}
} // namespace
UpdateResult UpdatesRpcClients::Update(int worker_id,
UpdateResult UpdatesRpcClients::Update(int this_worker_id, int to_worker_id,
const database::StateDelta &delta) {
return coordination_->GetClientPool(worker_id)->Call<UpdateRpc>(delta).member;
return coordination_->GetClientPool(to_worker_id)
->Call<UpdateRpc>(delta, this_worker_id)
.member;
}
CreatedVertexInfo UpdatesRpcClients::CreateVertex(
@@ -42,7 +44,7 @@ CreatedVertexInfo UpdatesRpcClients::CreateVertex(
return CreatedVertexInfo(res.member.cypher_id, res.member.gid);
}
CreatedEdgeInfo UpdatesRpcClients::CreateEdge(
CreatedEdgeInfo UpdatesRpcClients::CreateEdge(int this_worker_id,
tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type,
std::experimental::optional<int64_t> cypher_id) {
@@ -50,14 +52,15 @@ CreatedEdgeInfo UpdatesRpcClients::CreateEdge(
int from_worker = from.address().worker_id();
auto res =
coordination_->GetClientPool(from_worker)
->Call<CreateEdgeRpc>(CreateEdgeReqData{
->Call<CreateEdgeRpc>(CreateEdgeReqData{this_worker_id,
from.gid(), to.GlobalAddress(), edge_type, tx_id, cypher_id});
RaiseIfRemoteError(res.member.result);
return CreatedEdgeInfo(res.member.cypher_id,
storage::EdgeAddress{res.member.gid, from_worker});
}
void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from,
void UpdatesRpcClients::AddInEdge(int this_worker_id, tx::TransactionId tx_id,
VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
storage::EdgeType edge_type) {
@@ -67,32 +70,38 @@ void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from,
"`from` is not on the same worker as `to`.";
auto worker_id = to.GlobalAddress().worker_id();
auto res = coordination_->GetClientPool(worker_id)->Call<AddInEdgeRpc>(
AddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), edge_type,
tx_id});
AddInEdgeReqData{this_worker_id, from.GlobalAddress(), edge_address,
to.gid(), edge_type, tx_id});
RaiseIfRemoteError(res.member);
}
void UpdatesRpcClients::RemoveVertex(int worker_id, tx::TransactionId tx_id,
gid::Gid gid, bool check_empty) {
auto res = coordination_->GetClientPool(worker_id)->Call<RemoveVertexRpc>(
RemoveVertexReqData{gid, tx_id, check_empty});
void UpdatesRpcClients::RemoveVertex(int this_worker_id, int to_worker_id,
tx::TransactionId tx_id, gid::Gid gid,
bool check_empty) {
auto res = coordination_->GetClientPool(to_worker_id)->Call<RemoveVertexRpc>(
RemoveVertexReqData{this_worker_id, gid, tx_id, check_empty});
RaiseIfRemoteError(res.member);
}
void UpdatesRpcClients::RemoveEdge(tx::TransactionId tx_id, int worker_id,
gid::Gid edge_gid, gid::Gid vertex_from_id,
void UpdatesRpcClients::RemoveEdge(int this_worker_id, int to_worker_id,
tx::TransactionId tx_id, gid::Gid edge_gid,
gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr) {
auto res = coordination_->GetClientPool(worker_id)->Call<RemoveEdgeRpc>(
RemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr});
auto res =
coordination_->GetClientPool(to_worker_id)
->Call<RemoveEdgeRpc>(RemoveEdgeData{this_worker_id, tx_id, edge_gid,
vertex_from_id, vertex_to_addr});
RaiseIfRemoteError(res.member);
}
void UpdatesRpcClients::RemoveInEdge(tx::TransactionId tx_id, int worker_id,
void UpdatesRpcClients::RemoveInEdge(int this_worker_id, int to_worker_id,
tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote()) << "RemoveInEdge edge_address is local.";
auto res = coordination_->GetClientPool(worker_id)->Call<RemoveInEdgeRpc>(
RemoveInEdgeData{tx_id, vertex_id, edge_address});
auto res = coordination_->GetClientPool(to_worker_id)
->Call<RemoveInEdgeRpc>(RemoveInEdgeData{
this_worker_id, tx_id, vertex_id, edge_address});
RaiseIfRemoteError(res.member);
}

View File

@@ -24,14 +24,14 @@ class UpdatesRpcClients {
: coordination_(coordination) {}
/// Sends an update delta to the given worker.
UpdateResult Update(int worker_id, const database::StateDelta &delta);
UpdateResult Update(int this_worker_id, int to_worker_id,
const database::StateDelta &delta);
/// Creates a vertex on the given worker and returns its id.
CreatedVertexInfo CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, PropertyValue>
&properties,
const std::unordered_map<storage::Property, PropertyValue> &properties,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
@@ -40,8 +40,9 @@ class UpdatesRpcClients {
/// handled by a call to this function. Otherwise a separate call to
/// `AddInEdge` might be necessary. Throws all the exceptions that can
/// occur remotely as a result of updating a vertex.
CreatedEdgeInfo CreateEdge(tx::TransactionId tx_id, VertexAccessor &from,
VertexAccessor &to, storage::EdgeType edge_type,
CreatedEdgeInfo CreateEdge(int this_worker_id, tx::TransactionId tx_id,
VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type,
std::experimental::optional<int64_t> cypher_id =
std::experimental::nullopt);
// TODO (buda): Another machine in the cluster is asked to create an edge.
@@ -50,24 +51,25 @@ class UpdatesRpcClients {
/// Adds the edge with the given address to the `to` vertex as an incoming
/// edge. Only used when `to` is remote and not on the same worker as `from`.
void AddInEdge(tx::TransactionId tx_id, VertexAccessor &from,
storage::EdgeAddress edge_address, VertexAccessor &to,
storage::EdgeType edge_type);
void AddInEdge(int this_worker_id, tx::TransactionId tx_id,
VertexAccessor &from, storage::EdgeAddress edge_address,
VertexAccessor &to, storage::EdgeType edge_type);
/// Removes a vertex from the other worker.
void RemoveVertex(int worker_id, tx::TransactionId tx_id, gid::Gid gid,
bool check_empty);
void RemoveVertex(int this_worker_id, int to_worker_id,
tx::TransactionId tx_id, gid::Gid gid, bool check_empty);
/// Removes an edge on another worker. This also handles the `from` vertex
/// outgoing edge, as that vertex is on the same worker as the edge. If the
/// `to` vertex is on the same worker, then that side is handled too by the
/// single RPC call, otherwise a separate call has to be made to
/// RemoveInEdge.
void RemoveEdge(tx::TransactionId tx_id, int worker_id, gid::Gid edge_gid,
gid::Gid vertex_from_id,
void RemoveEdge(int this_worker_id, int to_worker_id, tx::TransactionId tx_id,
gid::Gid edge_gid, gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr);
void RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id,
void RemoveInEdge(int this_worker_id, int to_worker_id,
tx::TransactionId tx_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address);
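The pattern across this class: every mutating RPC now takes the id of the
worker issuing the call as its first argument, so the receiving
UpdatesRpcServer can record which worker produced each delta and later replay
exactly those deltas in ApplyDeltasToRecord. The call-site change, taken from
graph_db_accessor.cpp above:

// before:
updates_clients().RemoveVertex(address.worker_id(), transaction_id(),
                               address.gid(), check_empty);
// after:
updates_clients().RemoveVertex(worker_id(), address.worker_id(),
                               transaction_id(), address.gid(), check_empty);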
/// Calls for all the workers (except the given one) to apply their updates

View File

@@ -40,7 +40,8 @@ cpp<#
(:serialize))
(lcp:define-rpc update
(:request ((member "database::StateDelta" :capnp-type "Db.StateDelta")))
(:request ((member "database::StateDelta" :capnp-type "Db.StateDelta")
(worker-id :int64_t)))
(:response ((member "UpdateResult"))))
(lcp:define-rpc update-apply
@@ -114,7 +115,8 @@ cpp<#
(:response ((member "CreateResult"))))
(lcp:define-struct create-edge-req-data ()
((from "gid::Gid")
((worker-id :int64_t)
(from "gid::Gid")
(to "storage::VertexAddress")
(edge-type "storage::EdgeType")
(tx-id "tx::TransactionId")
@@ -145,7 +147,8 @@ cpp<#
(:response ((member "CreateResult"))))
(lcp:define-struct add-in-edge-req-data ()
((from "storage::VertexAddress")
((worker-id :int64_t)
(from "storage::VertexAddress")
(edge-address "storage::EdgeAddress")
(to "gid::Gid")
(edge-type "storage::EdgeType")
@@ -157,7 +160,8 @@ cpp<#
(:response ((member "UpdateResult"))))
(lcp:define-struct remove-vertex-req-data ()
((gid "gid::Gid")
((worker-id :int64_t)
(gid "gid::Gid")
(tx-id "tx::TransactionId")
(check-empty :bool))
(:serialize (:slk) (:capnp)))
@@ -167,7 +171,8 @@ cpp<#
(:response ((member "UpdateResult"))))
(lcp:define-struct remove-edge-data ()
((tx-id "tx::TransactionId")
((worker-id :int64_t)
(tx-id "tx::TransactionId")
(edge-id "gid::Gid")
(vertex-from-id "gid::Gid")
(vertex-to-address "storage::VertexAddress"))
@@ -178,7 +183,8 @@ cpp<#
(:response ((member "UpdateResult"))))
(lcp:define-struct remove-in-edge-data ()
((tx-id "tx::TransactionId")
((worker-id :int64_t)
(tx-id "tx::TransactionId")
(vertex "gid::Gid")
(edge-address "storage::EdgeAddress"))
(:serialize (:slk) (:capnp)))

View File

@@ -10,21 +10,20 @@ namespace distributed {
template <typename TRecordAccessor>
UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
const database::StateDelta &delta) {
const database::StateDelta &delta, int worker_id) {
auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value
? delta.vertex_id
: delta.edge_id;
std::lock_guard<utils::SpinLock> guard{lock_};
auto found = deltas_.find(gid);
if (found == deltas_.end()) {
found =
deltas_
.emplace(gid, std::make_pair(FindAccessor(gid),
std::vector<database::StateDelta>{}))
.first;
found = deltas_
.emplace(gid, std::make_pair(FindAccessor(gid),
std::vector<DeltaPair>{}))
.first;
}
found->second.second.emplace_back(delta);
found->second.second.emplace_back(delta, worker_id);
// TODO call `RecordAccessor::update` to force serialization errors to
// fail-fast (as opposed to when all the deltas get applied).
@@ -70,7 +69,7 @@ CreatedInfo UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
std::lock_guard<utils::SpinLock> guard{lock_};
deltas_.emplace(result.gid(),
std::make_pair(result, std::vector<database::StateDelta>{}));
std::make_pair(result, std::vector<DeltaPair>{}));
return CreatedInfo(result.CypherId(), result.gid());
}
@@ -86,7 +85,7 @@ CreatedInfo UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
from_addr, to_addr, edge_type, std::experimental::nullopt, cypher_id);
std::lock_guard<utils::SpinLock> guard{lock_};
deltas_.emplace(edge.gid(),
std::make_pair(edge, std::vector<database::StateDelta>{}));
std::make_pair(edge, std::vector<DeltaPair>{}));
return CreatedInfo(edge.CypherId(), edge.gid());
}
@@ -98,7 +97,8 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
// We need to reconstruct the record as in the meantime some local
// update might have updated it.
record_accessor.Reconstruct();
for (database::StateDelta &delta : kv.second.second) {
for (auto &pair : kv.second.second) {
auto delta = pair.delta;
try {
auto &dba = *db_accessor_;
switch (delta.type) {
@@ -130,7 +130,8 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
.remove_label(delta.label);
break;
case database::StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
record_accessor.update();
reinterpret_cast<Vertex &>(*record_accessor.GetNew())
.out_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_to_address),
dba.db().storage().LocalizedAddressIfPossible(
@@ -139,7 +140,8 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
record_accessor.update();
reinterpret_cast<Vertex &>(*record_accessor.GetNew())
.in_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_from_address),
dba.db().storage().LocalizedAddressIfPossible(
@@ -176,6 +178,76 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
return UpdateResult::DONE;
}
template <typename TRecordAccessor>
void UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::ApplyDeltasToRecord(
gid::Gid gid, int worker_id, TRecord **old, TRecord **newr) {
std::lock_guard<utils::SpinLock> guard{lock_};
auto found = deltas_.find(gid);
if (found == deltas_.end()) return;
auto update = [](auto **old, auto **newr) {
if (!*newr) {
DCHECK(*old) << "Trying to create new record but pointer to old record "
"is nullptr.";
*newr = (*old)->CloneData();
}
};
for (auto &pair : found->second.second) {
auto delta = pair.delta;
if (worker_id != pair.worker_id) continue;
switch (delta.type) {
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
update(old, newr);
(*newr)->properties_.set(delta.property, delta.value);
break;
case database::StateDelta::Type::ADD_LABEL: {
update(old, newr);
auto &labels = reinterpret_cast<Vertex *>(*newr)->labels_;
if (!utils::Contains(labels, delta.label)) {
labels.emplace_back(delta.label);
}
break;
}
case database::StateDelta::Type::REMOVE_LABEL: {
update(old, newr);
auto &labels = reinterpret_cast<Vertex *>(*newr)->labels_;
auto found = std::find(labels.begin(), labels.end(), delta.label);
if (found == labels.end()) continue;
std::swap(*found, labels.back());
labels.pop_back();
break;
}
case database::StateDelta::Type::ADD_OUT_EDGE:
update(old, newr);
reinterpret_cast<Vertex *>(*newr)->out_.emplace(
delta.vertex_to_address, delta.edge_address, delta.edge_type);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
update(old, newr);
reinterpret_cast<Vertex *>(*newr)->in_.emplace(
delta.vertex_from_address, delta.edge_address, delta.edge_type);
break;
case database::StateDelta::Type::REMOVE_OUT_EDGE:
update(old, newr);
reinterpret_cast<Vertex *>(*newr)->out_.RemoveEdge(delta.edge_address);
break;
case database::StateDelta::Type::REMOVE_IN_EDGE:
update(old, newr);
reinterpret_cast<Vertex *>(*newr)->in_.RemoveEdge(delta.edge_address);
break;
default:
// Effects of REMOVE VERTEX and REMOVE EDGE aren't visible in the
// current command id so we can safely ignore this case.
// Other deltas we're ignoring don't update the record.
break;
}
}
}
UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
distributed::Coordination *coordination)
: db_(db) {
@@ -191,14 +263,14 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
case DeltaType::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE: {
UpdateRes res(
GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta));
UpdateRes res(GetUpdates(vertex_updates_, delta.transaction_id)
.Emplace(delta, req.worker_id));
Save(res, res_builder);
return;
}
case DeltaType::SET_PROPERTY_EDGE: {
UpdateRes res(
GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta));
UpdateRes res(GetUpdates(edge_updates_, delta.transaction_id)
.Emplace(delta, req.worker_id));
Save(res, res_builder);
return;
}
@@ -243,7 +315,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
data.tx_id, data.to.gid(), {data.from, db_->WorkerId()},
{creation_result.gid, db_->WorkerId()}, data.edge_type);
creation_result.result =
GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta);
GetUpdates(vertex_updates_, data.tx_id)
.Emplace(to_delta, data.worker_id);
}
CreateEdgeRes res(creation_result);
@@ -257,8 +330,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
auto to_delta = database::StateDelta::AddInEdge(
req.member.tx_id, req.member.to, req.member.from,
req.member.edge_address, req.member.edge_type);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
auto result = GetUpdates(vertex_updates_, req.member.tx_id)
.Emplace(to_delta, req.member.worker_id);
AddInEdgeRes res(result);
Save(res, res_builder);
});
@@ -269,8 +342,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
Load(&req, req_reader);
auto to_delta = database::StateDelta::RemoveVertex(
req.member.tx_id, req.member.gid, req.member.check_empty);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
auto result = GetUpdates(vertex_updates_, req.member.tx_id)
.Emplace(to_delta, req.member.worker_id);
RemoveVertexRes res(result);
Save(res, res_builder);
});
@@ -288,9 +361,11 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb *db,
RemoveInEdgeReq req;
Load(&req, req_reader);
auto data = req.member;
RemoveInEdgeRes res(GetUpdates(vertex_updates_, data.tx_id)
.Emplace(database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex, data.edge_address)));
RemoveInEdgeRes res(
GetUpdates(vertex_updates_, data.tx_id)
.Emplace(database::StateDelta::RemoveInEdge(data.tx_id, data.vertex,
data.edge_address),
data.worker_id));
Save(res, res_builder);
});
}
@@ -314,6 +389,28 @@ UpdateResult UpdatesRpcServer::Apply(tx::TransactionId tx_id) {
return UpdateResult::DONE;
}
template <>
void UpdatesRpcServer::ApplyDeltasToRecord<Vertex>(tx::TransactionId tx_id,
gid::Gid gid, int worker_id,
Vertex **old,
Vertex **newr) {
auto access = vertex_updates_.access();
auto found = access.find(tx_id);
if (found != access.end())
found->second.ApplyDeltasToRecord(gid, worker_id, old, newr);
}
template <>
void UpdatesRpcServer::ApplyDeltasToRecord<Edge>(tx::TransactionId tx_id,
gid::Gid gid, int worker_id,
Edge **old,
Edge **newr) {
auto access = edge_updates_.access();
auto found = access.find(tx_id);
if (found != access.end())
found->second.ApplyDeltasToRecord(gid, worker_id, old, newr);
}
void UpdatesRpcServer::ClearTransactionalCache(
tx::TransactionId oldest_active) {
auto vertex_access = vertex_updates_.access();
@@ -350,7 +447,8 @@ CreateResult UpdatesRpcServer::CreateEdge(const CreateEdgeReqData &req) {
auto from_delta = database::StateDelta::AddOutEdge(
req.tx_id, req.from, req.to, {ids.gid, db_->WorkerId()}, req.edge_type);
auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta);
auto result = GetUpdates(vertex_updates_, req.tx_id)
.Emplace(from_delta, req.worker_id);
return {result, ids.cypher_id, ids.gid};
}
@@ -358,13 +456,15 @@ UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) {
// Edge removal.
auto deletion_delta =
database::StateDelta::RemoveEdge(data.tx_id, data.edge_id);
auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta);
auto result = GetUpdates(edge_updates_, data.tx_id)
.Emplace(deletion_delta, data.worker_id);
// Out-edge removal, for sure is local.
if (result == UpdateResult::DONE) {
auto remove_out_delta = database::StateDelta::RemoveOutEdge(
data.tx_id, data.vertex_from_id, {data.edge_id, db_->WorkerId()});
result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta);
result = GetUpdates(vertex_updates_, data.tx_id)
.Emplace(remove_out_delta, data.worker_id);
}
// In-edge removal, might not be local.
@@ -373,7 +473,8 @@ UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) {
auto remove_in_delta = database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex_to_address.gid(),
{data.edge_id, db_->WorkerId()});
result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_in_delta);
result = GetUpdates(vertex_updates_, data.tx_id)
.Emplace(remove_in_delta, data.worker_id);
}
return result;

View File

@@ -33,7 +33,18 @@ class UpdatesRpcServer {
// Remote updates for one transaction.
template <typename TRecordAccessor>
class TransactionUpdates {
struct DeltaPair {
DeltaPair(const database::StateDelta &delta, int worker_id)
: delta(delta), worker_id(worker_id) {}
database::StateDelta delta;
int worker_id;
};
public:
using TRecord = typename std::remove_pointer<decltype(
std::declval<TRecordAccessor>().GetNew())>::type;
TransactionUpdates(database::GraphDb *db,
tx::TransactionId tx_id)
: db_accessor_(db->Access(tx_id)) {}
@@ -41,7 +52,7 @@ class UpdatesRpcServer {
/// Adds a delta and returns the result. Does not modify the state (data)
/// of the graph element the update is for, but calls the `update` method
/// to fail-fast on serialization and update-after-delete errors.
UpdateResult Emplace(const database::StateDelta &delta);
UpdateResult Emplace(const database::StateDelta &delta, int worker_id);
/// Creates a new vertex and returns its cypher_id and gid.
CreatedInfo CreateVertex(
@@ -60,12 +71,20 @@ class UpdatesRpcServer {
/// Applies all the deltas on the record.
UpdateResult Apply();
/// Applies all deltas made by a certain worker to the given old and new
/// records. This method may change the newr pointer, and if it does, it
/// won't free that memory. If the update method needs to be called on a
/// record, a new record is created by calling CloneData on the old record.
/// The caller has to make sure to free that memory.
void ApplyDeltasToRecord(gid::Gid gid, int worker_id, TRecord **old,
TRecord **newr);
auto &db_accessor() { return *db_accessor_; }
private:
std::unique_ptr<database::GraphDbAccessor> db_accessor_;
std::unordered_map<
gid::Gid, std::pair<TRecordAccessor, std::vector<database::StateDelta>>>
std::unordered_map<gid::Gid,
std::pair<TRecordAccessor, std::vector<DeltaPair>>>
deltas_;
// Multiple workers might be sending remote updates concurrently.
utils::SpinLock lock_;
@@ -83,6 +102,15 @@ class UpdatesRpcServer {
/// cache after applying them, regardless of the result.
UpdateResult Apply(tx::TransactionId tx_id);
/// Applies all deltas made by a certain worker to the given old and new
/// records. This method may change the newr pointer, and if it does, it
/// won't free that memory. If the update method needs to be called on a
/// record, a new record is created by calling CloneData on the old record.
/// The caller has to make sure to free that memory.
template <typename TRecord>
void ApplyDeltasToRecord(tx::TransactionId tx_id, gid::Gid, int worker_id,
TRecord **old, TRecord **newr);
/// Clears the cache of local transactions that are completed. The signature
/// of this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::TransactionId oldest_active);
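To make the ownership contract concrete, this is the calling pattern used in
data_rpc_server.cpp above (gid and from_worker_id stand in for the values
read from the request): the caller clones the accessor's new record, lets the
server apply the issuing worker's deltas (which may allocate a fresh newr),
and frees the clone itself:

auto *old = vertex.GetOld();
auto *newr = vertex.GetNew() ? vertex.GetNew()->CloneData() : nullptr;
db_->updates_server().ApplyDeltasToRecord(
    dba->transaction().id_, gid, from_worker_id, &old, &newr);
// ... build and Save the RPC response from `old` and `newr` ...
delete newr;  // caller-owned; may have been allocated inside the call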

View File

@@ -10,11 +10,12 @@ EdgeAccessor::EdgeAccessor(EdgeAddress address,
from_(nullptr),
to_(nullptr),
edge_type_() {
auto guard = storage::GetDataLock(*this);
RecordAccessor::Reconstruct();
if (current_ != nullptr) {
from_ = current_->from_;
to_ = current_->to_;
edge_type_ = current_->edge_type_;
if (GetCurrent() != nullptr) {
from_ = GetCurrent()->from_;
to_ = GetCurrent()->to_;
edge_type_ = GetCurrent()->edge_type_;
}
}

View File

@@ -16,6 +16,7 @@ EdgesIterable::EdgesIterable(
const VertexAccessor &va, bool from, const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types) {
auto sptr = std::make_shared<VertexAccessor>(va);
sptr->HoldCachedData();
begin_.emplace(GetBegin(sptr, from, dest.address(), edge_types));
end_.emplace(GetEnd(sptr, from));
}
@@ -24,6 +25,7 @@ EdgesIterable::EdgesIterable(
const VertexAccessor &va, bool from,
const std::vector<storage::EdgeType> *edge_types) {
auto sptr = std::make_shared<VertexAccessor>(va);
sptr->HoldCachedData();
begin_.emplace(GetBegin(sptr, from, std::experimental::nullopt, edge_types));
end_.emplace(GetEnd(sptr, from));
}
@@ -35,9 +37,9 @@ EdgeAccessorIterator EdgesIterable::GetBegin(
const Edges *edges;
if (from) {
edges = &va->current().out_;
edges = &va->GetCurrent()->out_;
} else {
edges = &va->current().in_;
edges = &va->GetCurrent()->in_;
}
return EdgeAccessorIterator(edges->begin(dest, edge_types), va, from);
@@ -46,10 +48,10 @@ EdgeAccessorIterator EdgesIterable::GetBegin(
EdgeAccessorIterator EdgesIterable::GetEnd(std::shared_ptr<VertexAccessor> va,
bool from) {
if (from) {
auto iter = va->current().out_.end();
auto iter = va->GetCurrent()->out_.end();
return EdgeAccessorIterator(iter, va, from);
} else {
auto iter = va->current().in_.end();
auto iter = va->GetCurrent()->in_.end();
return EdgeAccessorIterator(iter, va, from);
}
};
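HoldCachedData pins the shared_ptr to the remote record inside the accessor
(a no-op for local records), so the LRU cache can evict the entry without
invalidating pointers the iterators hand out; ReleaseCachedData drops the
pin. The storage::GetDataLock guard used throughout presumably pairs the two
RAII-style, roughly like this sketch (the real guard's name and shape may
differ):

// Hedged sketch of a GetDataLock-style guard.
template <typename TAccessor>
class RemoteDataLock {
 public:
  explicit RemoteDataLock(const TAccessor &accessor) : accessor_(accessor) {
    accessor_.HoldCachedData();  // fetch and pin remote data; no-op if local
  }
  ~RemoteDataLock() { accessor_.ReleaseCachedData(); }

  RemoteDataLock(const RemoteDataLock &) = delete;
  RemoteDataLock &operator=(const RemoteDataLock &) = delete;

 private:
  const TAccessor &accessor_;
};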

View File

@@ -17,12 +17,69 @@ RecordAccessor<TRecord>::RecordAccessor(AddressT address,
database::GraphDbAccessor &db_accessor)
: db_accessor_(&db_accessor),
address_(db_accessor.db().storage().LocalizedAddressIfPossible(address)) {
if (is_local()) {
new (&local_) Local();
} else {
new (&remote_) Remote();
}
}
template <typename TRecord>
RecordAccessor<TRecord>::RecordAccessor(const RecordAccessor &other)
: db_accessor_(other.db_accessor_), address_(other.address_) {
is_initialized_ = other.is_initialized_;
if (other.is_local()) {
new (&local_) Local();
local_.old = other.local_.old;
local_.newr = other.local_.newr;
} else {
DCHECK(other.remote_.lock_counter == 0);
new (&remote_) Remote();
remote_.has_updated = other.remote_.has_updated;
}
current_ = other.current_;
}
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::operator=(
const RecordAccessor &other) {
if (is_local()) {
local_.~Local();
} else {
DCHECK(remote_.lock_counter == 0);
remote_.~Remote();
}
db_accessor_ = other.db_accessor_;
is_initialized_ = other.is_initialized_;
if (other.is_local()) {
new (&local_) Local();
local_.old = other.local_.old;
local_.newr = other.local_.newr;
} else {
new (&remote_) Remote();
remote_.has_updated = other.remote_.has_updated;
}
address_ = other.address_;
current_ = other.current_;
return *this;
}
template <typename TRecord>
RecordAccessor<TRecord>::~RecordAccessor() {
if (is_local()) {
local_.~Local();
} else {
remote_.~Remote();
}
}
template <typename TRecord>
PropertyValue RecordAccessor<TRecord>::PropsAt(storage::Property key) const {
auto guard = storage::GetDataLock(*this);
return current().properties_.at(key);
return GetCurrent()->properties_.at(key);
}
template <>
@@ -32,9 +89,10 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key,
auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), value);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, value);
update();
GetNew()->properties_.set(key, value);
if (is_local()) {
dba.UpdatePropertyIndex(key, *this, &update());
dba.UpdatePropertyIndex(key, *this, GetNew());
}
ProcessDelta(delta);
}
@@ -47,7 +105,8 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key,
dba.PropertyName(key), value);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, value);
update();
GetNew()->properties_.set(key, value);
ProcessDelta(delta);
}
@@ -58,7 +117,8 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, PropertyValue::Null);
update();
GetNew()->properties_.set(key, PropertyValue::Null);
ProcessDelta(delta);
}
@@ -70,7 +130,8 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
dba.PropertyName(key), PropertyValue::Null);
auto guard = storage::GetDataLock(*this);
update().properties_.set(key, PropertyValue::Null);
update();
GetNew()->properties_.set(key, PropertyValue::Null);
ProcessDelta(delta);
}
@@ -78,16 +139,17 @@ template <typename TRecord>
void RecordAccessor<TRecord>::PropsClear() {
std::vector<storage::Property> to_remove;
auto guard = storage::GetDataLock(*this);
for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first);
update();
for (const auto &kv : GetNew()->properties_) to_remove.emplace_back(kv.first);
for (const auto &prop : to_remove) {
PropsErase(prop);
}
}
template <typename TRecord>
const PropertyValueStore &RecordAccessor<TRecord>::Properties() const {
PropertyValueStore RecordAccessor<TRecord>::Properties() const {
auto guard = storage::GetDataLock(*this);
return current().properties_;
return GetCurrent()->properties_;
}
template <typename TRecord>
@@ -123,95 +185,128 @@ RecordAccessor<TRecord>::GlobalAddress() const {
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchNew() {
auto guard = storage::GetDataLock(*this);
if (is_local()) {
if (!new_) {
// if new_ is not set yet, look for it
// we can just Reconstruct the pointers, old_ will get initialized
// to the same value as it has now, and the amount of work is the
// same as just looking for a new_ record
if (!local_.newr) {
// if the local new record is not set yet, look for it; we can just
// Reconstruct the pointers, the local old record will get initialized to
// the same value it has now, and the amount of work is the same as just
// looking for a new record
if (!Reconstruct())
DLOG(FATAL)
<< "RecordAccessor::SwitchNew - accessor invalid after Reconstruct";
}
current_ = local_.newr ? CurrentRecord::NEW : CurrentRecord::OLD;
} else {
// A remote record only sees local updates, until the command is advanced.
// So this does nothing, as the old/new switch happens below.
auto guard = storage::GetDataLock(*this);
current_ =
remote_.data->new_record ? CurrentRecord::NEW : CurrentRecord::OLD;
}
current_ = new_ ? new_ : old_;
return *this;
}
template <typename TRecord>
TRecord *RecordAccessor<TRecord>::GetNew() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Remote data is missing";
if (is_local()) {
return local_.newr;
} else {
DCHECK(remote_.data) << "Remote data is missing";
return remote_.data->new_record.get();
}
return new_;
}
template <typename TRecord>
RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() {
current_ = old_ ? old_ : new_;
if (is_local()) {
current_ = local_.old ? CurrentRecord::OLD : CurrentRecord::NEW;
} else {
auto guard = storage::GetDataLock(*this);
current_ = remote_.data->old_record.get() ? CurrentRecord::OLD
: CurrentRecord::NEW;
}
return *this;
}
template <typename TRecord>
TRecord *RecordAccessor<TRecord>::GetOld() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Remote data is missing";
if (is_local()) {
return local_.old;
} else {
DCHECK(remote_.data) << "Remote data is missing";
return remote_.data->old_record.get();
}
return old_;
}
template <typename TRecord>
bool RecordAccessor<TRecord>::Reconstruct() const {
is_initialized_ = true;
auto &dba = db_accessor();
auto guard = storage::GetDataLock(*this);
if (is_local()) {
address().local()->find_set_old_new(dba.transaction(), &old_, &new_);
address().local()->find_set_old_new(dba.transaction(), &local_.old,
&local_.newr);
current_ = local_.old ? CurrentRecord::OLD : CurrentRecord::NEW;
return local_.old != nullptr || local_.newr != nullptr;
} else {
// It's not possible that we have a global address for a graph element
// that's local, because that is resolved in the constructor.
// TODO in write queries it's possible the command has been advanced and
// we need to invalidate the Cache and really get the latest stuff.
// But only do that after the command has been advanced.
dba.data_manager().template FindSetOldNew<TRecord>(
dba.transaction_id(), address().worker_id(), gid(), &old_, &new_);
auto guard = storage::GetDataLock(*this);
TRecord *old_ = remote_.data->old_record.get();
TRecord *new_ = remote_.data->new_record.get();
current_ = old_ ? CurrentRecord::OLD : CurrentRecord::NEW;
return old_ != nullptr || new_ != nullptr;
}
current_ = old_ ? old_ : new_;
return old_ != nullptr || new_ != nullptr;
}
template <typename TRecord>
TRecord &RecordAccessor<TRecord>::update() const {
auto &dba = db_accessor();
auto guard = storage::GetDataLock(*this);
void RecordAccessor<TRecord>::update() const {
// Edges have lazily initialized mutable, versioned data (properties).
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
if (std::is_same<TRecord, Edge>::value && is_initialized_ == false) {
bool reconstructed = Reconstruct();
DCHECK(reconstructed) << "Unable to initialize record";
}
auto &dba = db_accessor();
auto guard = storage::GetDataLock(*this);
const auto &t = dba.transaction();
if (!new_ && old_->is_expired_by(t))
TRecord *old = is_local() ? local_.old : remote_.data->old_record.get();
TRecord *newr = is_local() ? local_.newr : remote_.data->new_record.get();
if (!newr && old->is_expired_by(t))
throw RecordDeletedError();
else if (new_ && new_->is_expired_by(t))
else if (newr && newr->is_expired_by(t))
throw RecordDeletedError();
if (new_) return *new_;
if (newr) return;
if (is_local()) {
local_.newr = address().local()->update(t);
DCHECK(local_.newr != nullptr)
<< "RecordAccessor.new_ is null after update";
if (address().is_local()) {
new_ = address().local()->update(dba.transaction());
} else {
new_ = dba.data_manager().template FindNew<TRecord>(dba.transaction_id(),
address().gid());
}
remote_.has_updated = true;
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
return *new_;
if (remote_.lock_counter > 0) {
remote_.data = db_accessor_->data_manager().Find<TRecord>(
dba.transaction_id(), dba.worker_id(), address().worker_id(),
address().gid(), remote_.has_updated);
}
DCHECK(remote_.data->new_record)
<< "RecordAccessor.new_ is null after update";
}
}
template <typename TRecord>
bool RecordAccessor<TRecord>::Visible(const tx::Transaction &t,
bool current_state) const {
auto guard = storage::GetDataLock(*this);
TRecord *old = is_local() ? local_.old : remote_.data->old_record.get();
TRecord *newr = is_local() ? local_.newr : remote_.data->new_record.get();
return (old && !(current_state && old->is_expired_by(t))) ||
(current_state && newr && !newr->is_expired_by(t));
}
template <typename TRecord>
@@ -228,52 +323,57 @@ int64_t RecordAccessor<TRecord>::CypherId() const {
// TODO (buda): If we save cypher_id similar/next to edge_type we would save
// a network call.
return db_accessor_->data_manager()
.template Find<TRecord>(dba.transaction().id_, address().worker_id(),
gid())
.cypher_id;
.template Find<TRecord>(dba.transaction().id_, dba.worker_id(),
address().worker_id(), gid())
->cypher_id;
}
template <typename TRecord>
void RecordAccessor<TRecord>::HoldCachedData() const {
if (!is_local()) {
if (remote_.lock_counter == 0) {
// TODO (vkasljevic) uncomment once Remote has been implemented
// remote_.data = db_accessor_->data_manager().template Find<TRecord>(
// db_accessor_->transaction().id_, Address().worker_id(),
// Address().gid(), remote_.has_updated);
remote_.data = db_accessor_->data_manager().template Find<TRecord>(
db_accessor_->transaction().id_, db_accessor_->worker_id(),
address().worker_id(), address().gid(), remote_.has_updated);
}
++remote_.lock_counter;
DCHECK(remote_.lock_counter <= 10000)
<< "Something wrong with lock counter";
<< "Something wrong with RemoteDataLock";
}
}
template <typename TRecord>
void RecordAccessor<TRecord>::ReleaseCachedData() const {
if (!is_local()) {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Lock should exist at this point";
--remote_.lock_counter;
if (remote_.lock_counter == 0) {
// TODO (vkasljevic) uncomment once Remote has been implemented
// remote_.data = nullptr;
remote_.data = nullptr;
}
}
}
template <typename TRecord>
const TRecord &RecordAccessor<TRecord>::current() const {
if (!is_local()) {
DCHECK(remote_.lock_counter > 0) << "Remote data is missing";
TRecord *RecordAccessor<TRecord>::GetCurrent() const {
// Edges have lazily initialized mutable, versioned data (properties).
if (std::is_same<TRecord, Edge>::value && is_initialized_ == false) {
Reconstruct();
}
// Edges have lazily initialized mutable, versioned data (properties).
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
bool reconstructed = Reconstruct();
DCHECK(reconstructed) << "Unable to initialize record";
if (is_local()) {
return current_ == CurrentRecord::OLD ? local_.old : local_.newr;
} else {
DCHECK(remote_.data) << "CachedDataRecord is missing";
if (current_ == CurrentRecord::NEW) {
if (!remote_.data->new_record && remote_.data->old_record) {
current_ = CurrentRecord::OLD;
return remote_.data->old_record.get();
}
return remote_.data->new_record.get();
}
return remote_.data->old_record.get();
}
DCHECK(current_ != nullptr) << "RecordAccessor.current_ pointer is nullptr";
return *current_;
}
template <typename TRecord>
@@ -283,14 +383,19 @@ void RecordAccessor<TRecord>::ProcessDelta(
db_accessor().wal().Emplace(delta);
} else {
SendDelta(delta);
// This is needed because the update() method creates a new record if one
// doesn't exist. If that record is evicted from the cache and fetched again,
// it won't have a new record. Once the delta has been sent the record will
// have a new one, so we don't have to update anymore.
remote_.has_updated = false;
}
}
template <typename TRecord>
void RecordAccessor<TRecord>::SendDelta(
const database::StateDelta &delta) const {
auto result =
db_accessor_->updates_clients().Update(address().worker_id(), delta);
auto result = db_accessor_->updates_clients().Update(
db_accessor_->worker_id(), address().worker_id(), delta);
switch (result) {
case distributed::UpdateResult::DONE:
break;
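The copy constructor, copy assignment and destructor above hand-manage an
anonymous union of Local and Remote state, which is also why the defaulted
special members disappear from the header below. A condensed, self-contained
sketch of that pattern (types simplified):

#include <memory>
#include <new>

struct Local { int *old = nullptr; int *newr = nullptr; };
struct Remote { std::shared_ptr<int> data; int lock_counter = 0; };

class Holder {
 public:
  explicit Holder(bool is_local) : is_local_(is_local) {
    // The active union member must be constructed with placement new...
    if (is_local_)
      new (&local_) Local();
    else
      new (&remote_) Remote();
  }
  ~Holder() {
    // ...and destroyed explicitly, because the union itself cannot know
    // which member is alive.
    if (is_local_)
      local_.~Local();
    else
      remote_.~Remote();
  }

 private:
  bool is_local_;
  union {
    Local local_;
    Remote remote_;
  };
};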

View File

@@ -3,12 +3,13 @@
#include <glog/logging.h>
#include "storage/distributed/mvcc/version_list.hpp"
#include "distributed/cached_record_data.hpp"
#include "storage/common/types/property_value.hpp"
#include "storage/common/types/property_value_store.hpp"
#include "storage/common/types/types.hpp"
#include "storage/distributed/address.hpp"
#include "storage/distributed/gid.hpp"
#include "storage/distributed/mvcc/version_list.hpp"
namespace database {
class GraphDbAccessor;
@@ -30,17 +31,17 @@ class RecordAccessor {
using AddressT = storage::Address<mvcc::VersionList<TRecord>>;
// copying is customized below; move construction and assignment are deleted
RecordAccessor(const RecordAccessor &other) = default;
RecordAccessor(RecordAccessor &&other) = default;
RecordAccessor &operator=(const RecordAccessor &other) = default;
RecordAccessor &operator=(RecordAccessor &&other) = default;
RecordAccessor(const RecordAccessor &other);
RecordAccessor(RecordAccessor &&other) = delete;
RecordAccessor &operator=(const RecordAccessor &other);
RecordAccessor &operator=(RecordAccessor &&other) = delete;
protected:
/**
* Protected destructor because we allow inheritance, but nobody should own a
* pointer to plain RecordAccessor.
*/
~RecordAccessor() = default;
~RecordAccessor();
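// Editor's note: with a protected destructor, deleting through a base
// pointer won't compile (illustrative):
//   RecordAccessor<Vertex> *p = /* some derived accessor */;
//   delete p;  // error: ~RecordAccessor() is protected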
/**
* Only derived types may allow construction.
@@ -66,7 +67,7 @@ class RecordAccessor {
void PropsClear();
/** Returns the properties of this record. */
const PropertyValueStore &Properties() const;
PropertyValueStore Properties() const;
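// Editor's note (hedged): returning by value here presumably avoids handing
// out a reference into a CachedRecordData entry that the LruCache could
// evict while the caller still holds it.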
bool operator==(const RecordAccessor &other) const;
@@ -133,7 +134,7 @@ class RecordAccessor {
*
* @throws RecordDeletedError
*/
TRecord &update() const;
void update() const;
/**
* Returns true if the given accessor is visible to the given transaction.
@@ -143,10 +144,7 @@
* deletions performed in the current transaction+command are not
* ignored).
*/
bool Visible(const tx::Transaction &t, bool current_state) const {
return (old_ && !(current_state && old_->is_expired_by(t))) ||
(current_state && new_ && !new_->is_expired_by(t));
}
bool Visible(const tx::Transaction &t, bool current_state) const;
// TODO: This shouldn't be here, because it's only relevant in distributed.
/** Indicates if this accessor represents a local Vertex/Edge, or one whose
@@ -173,7 +171,7 @@
/** Returns the current version (either new_ or old_) set on this
* RecordAccessor. */
const TRecord &current() const;
TRecord *GetCurrent() const;
protected:
/**
@@ -192,53 +190,67 @@
void SendDelta(const database::StateDelta &delta) const;
/**
* Pointer to the version (either old_ or new_) that READ operations
* in the accessor should take data from. Note that WRITE operations
* should always use new_.
*
* This pointer can be null if created by an accessor which lazily reads from
* mvcc.
*/
// TODO (vkasljevic) remove this
mutable TRecord *current_{nullptr};
private:
enum class CurrentRecord : bool { OLD, NEW };
struct Local {
/**
* Version that has been modified (created or updated) by the current
* transaction+command.
*
* Can be null when the record has not been modified in the current
* transaction+command. It is also possible that the modification
* has happened, but this RecordAccessor does not know this. To
* ensure correctness, the `SwitchNew` function must check if this
* is null, and if it is it must check with the vlist_ if there is
* an update.
*/
TRecord *newr{nullptr};
/**
* Latest version which is visible to the current transaction+command
* but has not been created nor modified by the current transaction+command.
*
* Can be null only when the record itself (the version-list) has
* been created by the current transaction+command.
*/
TRecord *old{nullptr};
};
struct Remote {
/* TODO (vkasljevic) possible improvement (to discuss):
 * change to std::unique_ptr, borrow it from the data manager,
 * and later return it to the data manager.
 */
std::shared_ptr<distributed::CachedRecordData<TRecord>> data;
/* Keeps track of how many times HoldCachedData was called. */
unsigned short lock_counter{0};
/* Has Update() been called. This is needed because the Update() method
 * creates a new record if it doesn't exist. If that record is evicted
 * from the cache and fetched again it won't have a new record.
*/
bool has_updated{false};
};
// The database accessor for which this record accessor is created
// Provides means of getting to the transaction and database functions.
// Immutable, set in the constructor and never changed.
database::GraphDbAccessor *db_accessor_;
AddressT address_;
struct Remote {
/* Keeps track of how many times HoldRemoteData was called. */
mutable unsigned short lock_counter{0};
union {
Local local_;
Remote remote_;
};
Remote remote_;
mutable CurrentRecord current_ = CurrentRecord::OLD;
/**
* Latest version which is visible to the current transaction+command
* but has not been created nor modified by the current transaction+command.
*
* Can be null only when the record itself (the version-list) has
* been created by the current transaction+command.
* Flag that indicates whether Reconstruct() was called.
* This is needed because edges are lazily initialized.
*/
mutable TRecord *old_{nullptr};
/**
* Version that has been modified (created or updated) by the current
* transaction+command.
*
* Can be null when the record has not been modified in the current
* transaction+command. It is also possible that the modification
* has happened, but this RecordAccessor does not know this. To
* ensure correctness, the `SwitchNew` function must check if this
* is null, and if it is it must check with the vlist_ if there is
* an update.
*/
mutable TRecord *new_{nullptr};
mutable bool is_initialized_ = false;
};
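// Editor's sketch (not part of this diff): the vertex_cache_size and
// edge_cache_size flags bound an LRU cache of CachedRecordData entries in
// the DataManager. Below is a minimal list + hash-map LRU, assuming the
// classic design; the actual LruCache in the tree may differ in interface
// and locking.
#include <cstddef>
#include <list>
#include <optional>
#include <unordered_map>
#include <utility>

template <typename TKey, typename TValue>
class MinimalLruCache {
  using Entry = std::pair<TKey, TValue>;
  using Iterator = typename std::list<Entry>::iterator;

 public:
  explicit MinimalLruCache(size_t capacity) : capacity_(capacity) {}

  void Insert(const TKey &key, TValue value) {
    auto it = index_.find(key);
    // Replace an existing entry instead of duplicating the key.
    if (it != index_.end()) entries_.erase(it->second);
    entries_.emplace_front(key, std::move(value));
    index_[key] = entries_.begin();
    if (entries_.size() > capacity_) {
      // Evict the least recently used entry (back of the list).
      index_.erase(entries_.back().first);
      entries_.pop_back();
    }
  }

  std::optional<TValue> Find(const TKey &key) {
    auto it = index_.find(key);
    if (it == index_.end()) return std::nullopt;
    // Move the hit to the front so it becomes the most recently used.
    entries_.splice(entries_.begin(), entries_, it->second);
    return entries_.front().second;
  }

 private:
  size_t capacity_;
  std::list<Entry> entries_;
  std::unordered_map<TKey, Iterator> index_;
};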
/** Error when trying to update a deleted record */

View File

@@ -12,26 +12,31 @@ VertexAccessor::VertexAccessor(VertexAddress address,
Reconstruct();
}
size_t VertexAccessor::out_degree() const { return current().out_.size(); }
size_t VertexAccessor::out_degree() const {
auto guard = storage::GetDataLock(*this);
return GetCurrent()->out_.size();
}
size_t VertexAccessor::in_degree() const { return current().in_.size(); }
size_t VertexAccessor::in_degree() const {
auto guard = storage::GetDataLock(*this);
return GetCurrent()->in_.size();
}
void VertexAccessor::add_label(storage::Label label) {
auto &dba = db_accessor();
auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label));
auto guard = storage::GetDataLock(*this);
auto &vertex = update();
update();
auto &vertex = *GetNew();
// not a duplicate label, add it
if (!utils::Contains(vertex.labels_, label)) {
vertex.labels_.emplace_back(label);
if (is_local()) {
dba.wal().Emplace(delta);
dba.UpdateLabelIndices(label, *this, &vertex);
}
ProcessDelta(delta);
}
if (!is_local()) SendDelta(delta);
}
void VertexAccessor::remove_label(storage::Label label) {
@@ -39,29 +44,26 @@ void VertexAccessor::remove_label(storage::Label label) {
auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
label, dba.LabelName(label));
auto guard = storage::GetDataLock(*this);
auto &vertex = update();
update();
auto &vertex = *GetNew();
if (utils::Contains(vertex.labels_, label)) {
auto &labels = vertex.labels_;
auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back());
labels.pop_back();
if (is_local()) {
dba.wal().Emplace(delta);
}
ProcessDelta(delta);
}
if (!is_local()) SendDelta(delta);
}
bool VertexAccessor::has_label(storage::Label label) const {
auto guard = storage::GetDataLock(*this);
auto &labels = this->current().labels_;
auto &labels = GetCurrent()->labels_;
return std::find(labels.begin(), labels.end(), label) != labels.end();
}
std::vector<storage::Label> VertexAccessor::labels() const {
auto guard = storage::GetDataLock(*this);
return this->current().labels_;
return GetCurrent()->labels_;
}
void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) {
@@ -71,9 +73,11 @@ void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) {
SwitchNew();
auto guard = storage::GetDataLock(*this);
if (current().is_expired_by(dba.transaction())) return;
if (GetCurrent()->is_expired_by(dba.transaction())) return;
update().out_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));
update();
GetNew()->out_.RemoveEdge(
dba.db().storage().LocalizedAddressIfPossible(edge));
ProcessDelta(delta);
}
@@ -84,9 +88,10 @@ void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) {
SwitchNew();
auto guard = storage::GetDataLock(*this);
if (current().is_expired_by(dba.transaction())) return;
if (GetCurrent()->is_expired_by(dba.transaction())) return;
update().in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));
update();
GetNew()->in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));
ProcessDelta(delta);
}

View File

@@ -46,7 +46,6 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
/** Returns EdgeAccessors for all incoming edges. */
EdgesIterable in() const {
auto guard = storage::GetDataLock(*this);
return EdgesIterable(*this, false);
}
@@ -59,7 +58,6 @@
*/
EdgesIterable in(const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr) const {
auto guard = storage::GetDataLock(*this);
return EdgesIterable(*this, false, dest, edge_types);
}
@@ -70,13 +68,11 @@
* or empty, the parameter is ignored.
*/
EdgesIterable in(const std::vector<storage::EdgeType> *edge_types) const {
auto guard = storage::GetDataLock(*this);
return EdgesIterable(*this, false, edge_types);
}
/** Returns EdgeAccessors for all outgoing edges. */
EdgesIterable out() const {
auto guard = storage::GetDataLock(*this);
return EdgesIterable(*this, true);
}
@@ -90,7 +86,6 @@
*/
EdgesIterable out(const VertexAccessor &dest,
const std::vector<storage::EdgeType> *edge_types = nullptr) const {
auto guard = storage::GetDataLock(*this);
return EdgesIterable(*this, true, dest, edge_types);
}
@@ -101,7 +96,6 @@
* or empty, the parameter is ignored.
*/
EdgesIterable out(const std::vector<storage::EdgeType> *edge_types) const {
auto guard = storage::GetDataLock(*this);
return EdgesIterable(*this, true, edge_types);
}

View File

@@ -53,6 +53,8 @@ class DistributedGraphDbTest : public ::testing::Test {
master_config.master_endpoint = {kLocal, 0};
master_config.query_execution_time_sec = QueryExecutionTimeSec(0);
master_config.durability_directory = GetDurabilityDirectory(0);
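// Editor's note: a capacity of 1 makes the LruCache evict on essentially
// every remote access, so these tests exercise the evict-and-refetch path
// instead of the warm-cache path.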
master_config.vertex_cache_size = 1;
master_config.edge_cache_size = 1;
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(0);
// This is semantically wrong since this is not a cluster of size 1 but of
@@ -67,6 +69,8 @@ class DistributedGraphDbTest : public ::testing::Test {
auto worker_config = [this](int worker_id) {
database::Config config;
config.worker_id = worker_id;
config.vertex_cache_size = 1;
config.edge_cache_size = 1;
config.master_endpoint = master_->endpoint();
config.durability_directory = GetDurabilityDirectory(worker_id);
config.worker_endpoint = {kLocal, 0};